# -*- coding: utf-8 -*-

# +
# Get the metadata for the YB
# -

import csv
import datetime
import urllib.parse
import xml.etree.ElementTree as ET
from functools import reduce
from io import StringIO
from operator import add

import requests
import yaml
from openpyxl import Workbook

# Parameters
# Files are read inside `with` blocks so the handles are closed right away.
with open("credentials.yaml", "r", encoding="utf-8") as credentials_file:
    credentials = yaml.safe_load(credentials_file)
with open("./query", "r", encoding="utf-8") as query_file:
    query = query_file.read()

api_key = credentials["ncbi_api_key"]
email = credentials["email"]
query_uri = "https://eutils.ncbi.nlm.nih.gov/entrez/eutils"

# Dictionary of data to extract from the API: maps an output column name to
# either a key of the esummary record or a "#..." directive.
with open("./variables_dict.yaml", "r", encoding="utf-8") as dictionary_file:
    data_dictionnary = yaml.safe_load(dictionary_file)


def query_pmi(tool, param, email, api_key, timeout=60):
    """Send a GET request to an E-utilities endpoint and return the response.

    Args:
        tool: Endpoint name without the ``.fcgi`` suffix. This script uses
            ``"esearch"``, ``"esummary"`` and ``"efetch"``. The name is not
            validated locally; an unknown endpoint surfaces as an HTTP error.
        param: Mapping of query-string parameters. Values are converted with
            ``str`` before being URL-encoded.
        email: Contact e-mail appended to the query string.
        api_key: API key appended to the query string.
        timeout: Seconds to wait for the server before giving up.

    Returns:
        The ``requests.Response`` of the call.

    Raises:
        requests.HTTPError: If the server answers with a 4xx/5xx status.
        requests.RequestException: On connection problems or timeout.
    """
    # Encode every field, including email and api_key: a "+" or "&" in either
    # of them would otherwise corrupt the query string.
    fields = [(str(key), str(value)) for key, value in param.items()]
    fields.append(("email", str(email)))
    fields.append(("api_key", str(api_key)))
    encoded = urllib.parse.urlencode(fields, quote_via=urllib.parse.quote)

    url = f"{query_uri}/{tool}.fcgi?{encoded}"
    res = requests.get(url, timeout=timeout)
    # Fail here with a clear HTTP error instead of letting the caller trip
    # over an unexpected payload in .json() or the XML parser.
    res.raise_for_status()
    return res


# +
# Get article list
# NOTE(review): retmax is far above what the server may actually return in a
# single esearch call - confirm the id list is not silently truncated.
search_query = {
    "db": "pubmed",
    "term": query,
    "retmax": 100000,
    "rettype": "uilist",
    "retmode": "json",
}
articles_pcmid = query_pmi("esearch", search_query, email, api_key).json()["esearchresult"]["idlist"]
print(f"Extracted {len(articles_pcmid)} PMCIDs for current query")

# +
# Get metadata
summaries = []
abstracts = []

# Looping over the articles, `step` ids per request
n_articles = len(articles_pcmid)
step = 100
for start in range(0, n_articles, step):
    print(f"Getting summaries {start} to {start + step}")
    ids = articles_pcmid[start:start + step]
    id_list = ",".join(str(pmid) for pmid in ids)

    # NOTE(review): "retstart" is sent together with an explicit id list and
    # is always equal to `step`; it looks unintended - confirm and remove.
    summary_query = {
        "db": "pubmed",
        "id": id_list,
        "retstart": step,
        "retmode": "json",
    }
    summaries.append(
        query_pmi("esummary", summary_query, email, api_key).json()["result"]
    )

    abstract_query = {
        "db": "pubmed",
        "id": id_list,
        "retmode": "xml",
        "rettype": "abstract",
    }
    abstracts.append(
        query_pmi("efetch", abstract_query, email, api_key).text
    )
# -
# Merging all summaries together
# Each batch is a dict keyed by PMID plus a "uids" entry listing its ids.
summaries_all = {}
for batch in summaries:
    summaries_all.update(batch)
# An empty result set has no "uids" entry, so do not insist on it.
summaries_all.pop("uids", None)

# Explicit check rather than `assert`, which is stripped under `python -O`.
if len(summaries_all) != len(articles_pcmid):
    raise RuntimeError(
        f"Got {len(summaries_all)} summaries for {len(articles_pcmid)} requested articles"
    )

# Parsing abstract: PMID -> list of abstract section texts
abstracts_parsed = {}
for xml_text in abstracts:
    for article in ET.parse(StringIO(xml_text)).findall("PubmedArticle"):
        # Read the article's own PMID. Picking the last PMID element of the
        # record can return the id of a cross-referenced article instead.
        pmid = article.findtext("MedlineCitation/PMID")
        if pmid is None:
            continue
        # itertext() keeps text nested in inline markup (italics, sub/sup);
        # .text alone stops at the first child element.
        abstracts_parsed[pmid] = [
            "".join(section.itertext()) for section in article.iter("AbstractText")
        ]

# Abstract in summaries
for pmid, summary in summaries_all.items():
    if pmid in abstracts_parsed:
        summary["abstract"] = ".".join(
            section for section in abstracts_parsed[pmid] if section
        )

# +
# Outputting result

# Season / quarter labels found in publication dates and the month
# abbreviation used in their place before parsing.
_SEASON_TO_MONTH = (
    ("Summer", "Jul"),
    ("Spring", "Apr"),
    ("Autumn", "Oct"),
    ("Fall", "Oct"),
    ("Winter", "Jan"),
    ("First Quarter", "Feb"),
)

# strptime format indexed by (number of date terms - 1).
_PUBDATE_FORMATS = ("%Y", "%Y %b", "%Y %b %d")


def _parse_pubdate(raw):
    """Parse a publication date string into a datetime, or return None.

    Only the part before the first "-" is used, so a range such as
    "2020 Jan-Feb" is read as "2020 Jan". The most precise format is tried
    first, then progressively shorter prefixes, so a date with an unknown
    trailing term still yields at least its year.
    """
    text = raw.split("-")[0]
    for label, month in _SEASON_TO_MONTH:
        text = text.replace(label, month)
    terms = text.split()
    for n_terms in range(min(len(terms), 3), 0, -1):
        candidate = " ".join(terms[:n_terms])
        try:
            return datetime.datetime.strptime(candidate, _PUBDATE_FORMATS[n_terms - 1])
        except ValueError:
            continue
    return None


def generate_output(metadata):
    """Build one output row from an article summary record.

    Args:
        metadata: Summary dict of one article, optionally carrying the
            "abstract" key added above.

    Returns:
        A dict with one entry per key of ``data_dictionnary``. A mapping value
        that is empty yields "", a plain value is looked up in ``metadata``
        (stringified, "" when absent), and a "#..." value selects a derived
        field. Unknown "#..." directives and data that cannot be derived
        yield "" so that every row has the same columns.
    """
    output = {}

    # Pre-processing of the date. Year-only dates parse to January 1st, so
    # "#monthpubdate" reports "January" for them.
    pub_date_object = _parse_pubdate(str(metadata.get("pubdate") or ""))
    authors = metadata.get("authors") or []

    for column, source in data_dictionnary.items():
        if not source:
            # Covers both '' and a YAML key left without a value (None).
            output[column] = ""
        elif not source.startswith("#"):
            output[column] = str(metadata[source]) if source in metadata else ""
        elif source == "#articleids":
            dois = [
                article_id["value"]
                for article_id in metadata.get("articleids") or []
                if article_id["idtype"] == "doi"
            ]
            output[column] = dois[0] if dois else ""
        elif source == "#yearpubdate":
            output[column] = pub_date_object.year if pub_date_object else ""
        elif source == "#monthpubdate":
            output[column] = pub_date_object.strftime("%B") if pub_date_object else ""
        elif source == "#authorsfirst":
            output[column] = authors[0]["name"] if authors else ""
        elif source == "#authors":
            output[column] = " | ".join(author["name"] for author in authors)
        else:
            output[column] = ""

    return output
# -


output = [generate_output(summary) for summary in summaries_all.values()]
# +
# CSV
# -

# Column order shared by both exports. With no articles, fall back to the
# configured column names so the files still get a header row.
header = list(output[0].keys()) if output else list(data_dictionnary.keys())

# newline="" hands line-ending control to the csv module; without it every
# row is followed by a blank line on Windows.
with open("yb_list.csv", "w", encoding="utf-8", newline="") as f:
    w = csv.DictWriter(f, header, delimiter=";")
    w.writeheader()
    w.writerows(output)

# +
# XLSX

# +
wb = Workbook()
ws = wb.active
ws.title = "Extraction"

ws.append(header)
for row in output:
    # Index by header so cells line up with their column regardless of the
    # key order inside each row.
    ws.append([row.get(column, "") for column in header])

wb.save("yb_list.xlsx")