|
@@ -0,0 +1,170 @@
|
|
|
+# -*- coding: utf-8 -*-
|
|
|
+# +
|
|
|
+# Get the metadata for the YB
|
|
|
+# -
|
|
|
+
|
|
|
+import requests
|
|
|
+from functools import reduce
|
|
|
+from operator import add
|
|
|
+import urllib.parse
|
|
|
+import yaml
|
|
|
+import xml.etree.ElementTree as ET
|
|
|
+import datetime
|
|
|
+from io import StringIO
|
|
|
+import csv
|
|
|
+from openpyxl import Workbook
|
|
|
+
|
|
|
# Parameters
# Each input file is read inside a ``with`` block so its handle is closed as
# soon as the content has been loaded.
with open("credentials.yaml", "r", encoding="utf-8") as credentials_file:
    credentials = yaml.safe_load(credentials_file)
with open("./query", "r", encoding="utf-8") as query_file:
    query = query_file.read()
api_key = credentials["ncbi_api_key"]
email = credentials["email"]
query_uri = "https://eutils.ncbi.nlm.nih.gov/entrez/eutils"

# Dictionary of data to extract from the API: maps each output column name to
# the metadata key (or "#..." directive) that generate_output resolves.
with open("./variables_dict.yaml", "r", encoding="utf-8") as variables_file:
    data_dictionnary = yaml.safe_load(variables_file)
|
|
|
+
|
|
|
+
|
|
|
def query_pmi(tool, param, email, api_key, timeout=60):
    """Send a GET request to one of the E-utilities endpoints under ``query_uri``.

    Args:
        tool: Endpoint name without the ``.fcgi`` suffix; one of ``esearch``,
            ``efetch`` or ``esummary``.
        param: Mapping of query-string parameters; keys must be strings,
            values are converted with ``str``.
        email: Value sent as the ``email`` query parameter.
        api_key: Value sent as the ``api_key`` query parameter.
        timeout: Seconds ``requests`` waits for the server before giving up.

    Returns:
        The ``requests.Response`` object of the call.

    Raises:
        ValueError: If ``tool`` is not one of the supported endpoints.
        requests.HTTPError: If the server answers with an error status.
    """
    supported_tools = ("esearch", "efetch", "esummary")
    if tool not in supported_tools:
        raise ValueError(
            f"Unsupported tool {tool!r}; expected one of {', '.join(supported_tools)}"
        )

    # The credentials go through the same escaping as every other parameter so
    # that characters such as "+" or "&" cannot corrupt the query string.
    fields = list(param.items()) + [("email", email), ("api_key", api_key)]
    query_string = "&".join(
        f"{urllib.parse.quote(key)}={urllib.parse.quote(str(value))}"
        for key, value in fields
    )

    url = f"{query_uri}/{tool}.fcgi?{query_string}"
    res = requests.get(url, timeout=timeout)
    # Fail here with the HTTP status instead of later, when a caller tries to
    # decode an error page.
    res.raise_for_status()

    return res
|
|
|
+
|
|
|
+
|
|
|
+# +
|
|
|
# Get article list
# The search runs against the "pubmed" database, so the returned identifiers
# are PubMed IDs (PMIDs).
search_query = {"db": "pubmed", "term": query, "retmax": 100000, "rettype": "uilist", "retmode": "json"}
search_result = query_pmi("esearch", search_query, email, api_key).json()["esearchresult"]
articles_pcmid = search_result["idlist"]
print(f"Extracted {len(articles_pcmid)} PMIDs for current query")

# NOTE(review): assumes the esearch JSON reports the total number of matches
# under "count" and that the server may return fewer ids than requested;
# confirm both against the E-utilities reference.
total_matches = int(search_result.get("count", len(articles_pcmid)))
if total_matches > len(articles_pcmid):
    print(
        f"Warning: the query matches {total_matches} records but only "
        f"{len(articles_pcmid)} ids were returned"
    )
|
|
|
+
|
|
|
+# +
|
|
|
# Get metadata
summaries = []  # one esummary "result" mapping per batch
abstracts = []  # one efetch XML document (as text) per batch

# Looping over the articles, ``step`` ids per request
n_articles = len(articles_pcmid)
step = 100
for start in range(0, n_articles, step):
    ids = articles_pcmid[start:start + step]
    print(f"Getting summaries {start} to {start + len(ids)}")
    id_list = ",".join(str(x) for x in ids)

    # Each request names its ids explicitly, so no paging offset is sent.
    summary_query = {"db": "pubmed", "id": id_list, "retmode": "json"}
    summaries.append(
        query_pmi("esummary", summary_query, email, api_key).json()["result"]
    )

    abstract_query = {"db": "pubmed", "id": id_list, "retmode": "xml", "rettype": "abstract"}
    abstracts.append(
        query_pmi("efetch", abstract_query, email, api_key).text
    )
|
|
|
+# -
|
|
|
+
|
|
|
# Merging all summaries together
# Later batches overwrite earlier ones on equal keys, as a dict built from the
# concatenated items would; an empty list of batches gives an empty mapping.
summaries_all = {}
for batch in summaries:
    summaries_all.update(batch)

# "uids" is not an article record; tolerate its absence when nothing was fetched.
summaries_all.pop("uids", None)

# Explicit check rather than ``assert`` so that it still runs under ``python -O``.
if len(summaries_all) != len(articles_pcmid):
    raise RuntimeError(
        f"Got {len(summaries_all)} summaries for {len(articles_pcmid)} requested articles"
    )
|
|
|
+
|
|
|
# Parsing abstract
# Maps each article's PMID to the list of its non-empty abstract sections.
abstracts_parsed = {}
for xml_chunk in abstracts:
    for article in ET.fromstring(xml_chunk).findall("PubmedArticle"):
        # Read the article's own identifier from its citation header.
        # NOTE(review): per the PubMed DTD a record may hold further PMID
        # elements that point at related records, so a positional pick over
        # every PMID element is unreliable; confirm against the DTD.
        pmid = article.findtext("MedlineCitation/PMID")
        if pmid is None:
            continue
        # itertext() keeps the text that sits inside and after inline markup
        # (italics, sub/superscripts), which ``.text`` alone would cut off.
        sections = ["".join(node.itertext()) for node in article.iter("AbstractText")]
        abstracts_parsed[pmid] = [text for text in sections if text]

# Abstract in summaries
for pmid, summary in summaries_all.items():
    if pmid in abstracts_parsed:
        summary['abstract'] = ".".join(abstracts_parsed[pmid])
|
|
|
+
|
|
|
+
|
|
|
+# +
|
|
|
+# Outputting result
|
|
|
+
|
|
|
# Season and quarter labels that may stand in for a month in a publication
# date, mapped to a month abbreviation that strptime's %b can read.
_PUBDATE_ALIASES = (
    ("First Quarter", "Feb"),
    ("Second Quarter", "May"),
    ("Third Quarter", "Aug"),
    ("Fourth Quarter", "Nov"),
    ("Summer", "Jul"),
    ("Spring", "Apr"),
    ("Autumn", "Oct"),
    ("Fall", "Oct"),
    ("Winter", "Jan"),
)

# strptime format to use for a date made of 1, 2 or 3 space-separated terms.
_PUBDATE_FORMATS = {1: "%Y", 2: "%Y %b", 3: "%Y %b %d"}


def _parse_pubdate(raw):
    """Return a datetime for a publication-date string, or None if unreadable."""
    # Keep only what precedes the first "-" (start of a range such as
    # "2020 Jan-Feb"), then turn season/quarter labels into month names.
    cleaned = raw.split("-")[0]
    for label, month in _PUBDATE_ALIASES:
        cleaned = cleaned.replace(label, month)

    # Try the most precise reading first and drop trailing terms on failure,
    # so an unexpected day or suffix still yields the year (and month).
    terms = cleaned.split()
    for count in range(min(len(terms), 3), 0, -1):
        try:
            return datetime.datetime.strptime(
                " ".join(terms[:count]), _PUBDATE_FORMATS[count]
            )
        except ValueError:
            continue
    return None


def generate_output(metadata):
    """Build one output row from an article's metadata mapping.

    The columns come from the module-level ``data_dictionnary``: each key is a
    column name and each value says where the cell comes from. An empty value
    gives an empty cell. A value starting with ``#`` is a directive
    (``#articleids``: first DOI, ``#yearpubdate``: publication year,
    ``#monthpubdate``: full month name, ``#authorsfirst``: first author name,
    ``#authors``: all author names joined with " | "); an unrecognised
    directive produces no column. Any other value is looked up as a key of
    ``metadata`` and converted with ``str``.

    Args:
        metadata: Mapping describing one article. Missing keys, an unreadable
            ``pubdate`` and empty lists all result in empty cells.

    Returns:
        A dict mapping column names to cell values.
    """
    # Pre-processing of the date
    pub_date_object = _parse_pubdate(str(metadata.get("pubdate") or ""))

    author_names = [author.get("name", "") for author in metadata.get("authors") or []]
    dois = [
        article_id.get("value", "")
        for article_id in metadata.get("articleids") or []
        if article_id.get("idtype") == "doi"
    ]

    # Cell value for every supported "#..." directive.
    computed = {
        "#articleids": dois[0] if dois else "",
        "#yearpubdate": pub_date_object.year if pub_date_object else "",
        "#monthpubdate": pub_date_object.strftime("%B") if pub_date_object else "",
        "#authorsfirst": author_names[0] if author_names else "",
        "#authors": " | ".join(author_names),
    }

    output = {}
    for column, source in data_dictionnary.items():
        if not source:
            # Covers both an empty string and a value left blank (None).
            output[column] = ""
        elif source.startswith("#"):
            if source in computed:
                output[column] = computed[source]
        elif source in metadata:
            output[column] = str(metadata[source])
        else:
            output[column] = ""

    return output
|
|
|
+
|
|
|
+
|
|
|
+# -
|
|
|
+
|
|
|
# One output row per merged article summary, in the order of summaries_all.
output = [generate_output(summary) for summary in summaries_all.values()]
|
|
|
+
|
|
|
+# +
|
|
|
+# CSV
|
|
|
+# -
|
|
|
+
|
|
|
# Column names come from the first row; with no rows at all, fall back to the
# configured columns so the file still gets a header.
fieldnames = list(output[0]) if output else list(data_dictionnary)

# newline="" lets the csv writer control line endings itself, as the csv
# module documentation asks for files handed to a writer.
with open("yb_list.csv", "w", encoding="utf-8", newline="") as f:
    w = csv.DictWriter(f, fieldnames, delimiter=";")
    w.writeheader()
    w.writerows(output)
|
|
|
+
|
|
|
+# +
|
|
|
+# XLSX
|
|
|
+
|
|
|
+# +
|
|
|
wb = Workbook()
ws = wb.active
ws.title = "Extraction"

# Header from the first row; with no rows at all, fall back to the configured
# columns so the sheet still gets a header line.
header = list(output[0]) if output else list(data_dictionnary)
ws.append(header)

# Cells are picked by column name so every row lines up with the header.
for row in output:
    ws.append([row.get(column, "") for column in header])

wb.save("yb_list.xlsx")
|