123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170 |
- # -*- coding: utf-8 -*-
- # +
- # Get the metadata for the YB
- # -
- import requests
- from functools import reduce
- from operator import add
- import urllib.parse
- import yaml
- import xml.etree.ElementTree as ET
- import datetime
- from io import StringIO
- import csv
- from openpyxl import Workbook
# Parameters
# Each input file is opened in a `with` block so its handle is closed as soon
# as the content has been read.
# The credentials file must provide the "ncbi_api_key" and "email" keys read below.
with open("credentials.yaml", "r", encoding="utf-8") as credentials_file:
    credentials = yaml.safe_load(credentials_file)
# Search expression, sent later as the "term" parameter of the esearch request.
with open("./query", "r", encoding="utf-8") as query_file:
    query = query_file.read()
api_key = credentials["ncbi_api_key"]
email = credentials["email"]
# Base URL of the E-utilities endpoints; "<tool>.fcgi" is appended per request.
query_uri = "https://eutils.ncbi.nlm.nih.gov/entrez/eutils"
# Dictionary of data to extract from the API: maps each output column name to
# either a metadata key, a "#..." directive handled by generate_output, or an
# empty value for a blank column.
with open("./variables_dict.yaml", "r", encoding="utf-8") as variables_file:
    data_dictionnary = yaml.safe_load(variables_file)
def query_pmi(tool, param, email, api_key):
    """Send a GET request to one of the NCBI E-utilities endpoints.

    Args:
        tool: E-utility name without the ``.fcgi`` suffix; must be one of
            ``"esearch"``, ``"efetch"`` or ``"esummary"``.
        param: Mapping of query-string parameter names to values. Names and
            values are converted with ``str`` and URL-encoded.
        email: Value sent as the ``email`` query parameter.
        api_key: Value sent as the ``api_key`` query parameter.

    Returns:
        The ``requests`` response object, unmodified (no status check is
        performed here; callers decode ``.json()`` / ``.text`` themselves).

    Raises:
        ValueError: If ``tool`` is not one of the supported E-utilities.
    """
    supported_tools = ("esearch", "efetch", "esummary")
    if tool not in supported_tools:
        # Fail before any network traffic instead of silently requesting an
        # endpoint this script does not know how to handle.
        raise ValueError(
            f"Unsupported E-utility {tool!r}; expected one of {supported_tools}"
        )

    # Caller parameters come first, then the identification parameters, which
    # keeps the same order as before. Everything is percent-encoded, including
    # the e-mail and API key (a "+" or "&" in them would otherwise corrupt the
    # query string).
    pairs = list(param.items()) + [("email", email), ("api_key", api_key)]
    query_string = "&".join(
        f"{urllib.parse.quote(str(name))}={urllib.parse.quote(str(value))}"
        for name, value in pairs
    )

    url = f"{query_uri}/{tool}.fcgi?{query_string}"
    return requests.get(url)
# +
# Get article list
# NOTE(review): the ids come from db "pubmed" although the message below says
# "PMCIDs"; also confirm that the service honours a retmax this large.
search_query = {
    "db": "pubmed",
    "term": query,
    "retmax": 100000,
    "rettype": "uilist",
    "retmode": "json",
}
search_response = query_pmi("esearch", search_query, email, api_key)
articles_pcmid = search_response.json()["esearchresult"]["idlist"]
print(f"Extracted {len(articles_pcmid)} PMCIDs for current query")
# +
# Get metadata
summaries = []
abstracts = []
# Walk the id list in consecutive batches of `step` ids; the last batch may be
# shorter (the progress message still prints the nominal upper bound).
n_articles = len(articles_pcmid)
step = 100
for batch_start in range(0, n_articles, step):
    batch_end = batch_start + step
    print(f"Getting summaries {batch_start} to {batch_end}")

    ids = articles_pcmid[batch_start:batch_end]
    id_list = ",".join(map(str, ids))

    # JSON summaries for the batch; only the "result" object is kept.
    # NOTE(review): "retstart" is set to the batch size even though the ids are
    # listed explicitly — confirm this is intended.
    summary_query = {
        "db": "pubmed",
        "id": id_list,
        "retstart": step,
        "retmode": "json",
    }
    summary_response = query_pmi("esummary", summary_query, email, api_key)
    summaries.append(summary_response.json()["result"])

    # Raw XML for the same batch, kept as text and parsed later for abstracts.
    summary_query = {
        "db": "pubmed",
        "id": id_list,
        "retmode": "xml",
        "rettype": "abstract",
    }
    abstract_response = query_pmi("efetch", summary_query, email, api_key)
    abstracts.append(abstract_response.text)
# -
# Merging all summaries together
# Every element of `summaries` is the "result" mapping of one esummary batch.
# On duplicate keys the later batch wins, and an empty list of batches simply
# yields an empty dict.
summaries_all = {}
for batch_result in summaries:
    summaries_all.update(batch_result)
# Drop the "uids" entry so that only per-article entries remain (the count
# check below relies on this); tolerate its absence when no batch was fetched.
summaries_all.pop("uids", None)
# Explicit check instead of `assert`, which is stripped when running with -O.
if len(summaries_all) != len(articles_pcmid):
    raise RuntimeError(
        f"Expected {len(articles_pcmid)} article summaries, "
        f"got {len(summaries_all)}"
    )
# Parsing abstract
# Maps each article's PMID to the list of its abstract section texts.
abstracts_parsed = {}
for xml_text in abstracts:
    for article in ET.parse(StringIO(xml_text)).findall("PubmedArticle"):
        # The article's own id is MedlineCitation/PMID. Other <PMID> elements
        # in the record (e.g. under CommentsCorrections) refer to *other*
        # articles, so "last PMID in the record" can pick the wrong id.
        pmid = article.findtext("MedlineCitation/PMID")
        if pmid is None:
            continue
        # itertext() keeps text located after inline markup (<i>, <sub>, ...),
        # which `.text` alone would drop.
        abstracts_parsed[pmid] = [
            "".join(section.itertext())
            for section in article.iter("AbstractText")
        ]
# Abstract in summaries
for pmid, record in summaries_all.items():
    if pmid in abstracts_parsed:
        # Empty sections are skipped; the rest are joined with "." as before.
        record["abstract"] = ".".join(
            text for text in abstracts_parsed[pmid] if text
        )
- # +
- # Outputting result
# Month abbreviations substituted for the non-month terms that can appear in a
# "pubdate" value, so that the date can then be parsed with "%b".
_PUBDATE_TERM_TO_MONTH = {
    "Summer": "Jul",
    "Spring": "Apr",
    "Autumn": "Oct",
    "Fall": "Oct",
    "Winter": "Jan",
    "First Quarter": "Feb",
    "Second Quarter": "May",
    "Third Quarter": "Aug",
    "Fourth Quarter": "Nov",
}

# strptime format to use for 1, 2 or 3 whitespace-separated date terms.
_PUBDATE_FORMATS = {1: "%Y", 2: "%Y %b", 3: "%Y %b %d"}


def _parse_pub_date(raw_date):
    """Return a ``datetime`` for a pubdate string, or ``None`` if unparsable."""
    # Only the start of a range such as "2020 Jan-Feb" is used.
    start = (raw_date or "").split("-")[0]
    for term, month in _PUBDATE_TERM_TO_MONTH.items():
        start = start.replace(term, month)
    terms = start.split()
    # Try the most precise format first, then fall back to coarser ones so a
    # single odd date degrades to "year only" instead of aborting the export.
    for n_terms in range(min(len(terms), 3), 0, -1):
        try:
            return datetime.datetime.strptime(
                " ".join(terms[:n_terms]), _PUBDATE_FORMATS[n_terms]
            )
        except ValueError:
            continue
    return None


def generate_output(metadata, fields=None):
    """Flatten one article metadata record into an output row.

    Args:
        metadata: Mapping describing one article. The keys read here are
            ``"pubdate"``, ``"articleids"`` (items with ``"idtype"`` and
            ``"value"``), ``"authors"`` (items with ``"name"``) and any key
            named in ``fields``. Missing keys yield blank cells.
        fields: Mapping of output column name to its source: a metadata key,
            one of the directives ``#articleids`` (first DOI),
            ``#yearpubdate``, ``#monthpubdate``, ``#authorsfirst``,
            ``#authors``, or an empty value for a blank column. Defaults to
            the module-level ``data_dictionnary``.

    Returns:
        dict mapping column names to values. The year is an ``int``; every
        other value is a string, and ``""`` marks missing data. Unknown
        ``#`` directives produce no column.
    """
    if fields is None:
        fields = data_dictionnary

    # Pre-processing of the date (None when it cannot be parsed).
    pub_date_object = _parse_pub_date(metadata.get("pubdate", ""))
    author_names = [author["name"] for author in metadata.get("authors") or []]

    output = {}
    for column, source in fields.items():
        if not source:
            # Covers both '' and a YAML key left without a value (None).
            output[column] = ""
        elif source.startswith("#"):
            if source == "#articleids":
                dois = [
                    article_id["value"]
                    for article_id in metadata.get("articleids") or []
                    if article_id["idtype"] == "doi"
                ]
                output[column] = dois[0] if dois else ""
            elif source == "#yearpubdate":
                output[column] = pub_date_object.year if pub_date_object else ""
            elif source == "#monthpubdate":
                # Full month name; a year-only date gives that year's first month.
                output[column] = (
                    pub_date_object.strftime("%B") if pub_date_object else ""
                )
            elif source == "#authorsfirst":
                output[column] = author_names[0] if author_names else ""
            elif source == "#authors":
                output[column] = " | ".join(author_names)
        elif source in metadata:
            output[column] = str(metadata[source])
        else:
            output[column] = ""

    return output
# -
# One flat row (column name -> value) per article record.
output = [generate_output(metadata) for metadata in summaries_all.values()]
# Column order comes from the first row; with no rows at all the files are
# written without columns instead of failing with an IndexError.
header = list(output[0].keys()) if output else []
# +
# CSV
# -
# newline="" hands line-ending control to the csv module, which avoids blank
# lines between rows on Windows.
with open("yb_list.csv", "w", encoding="utf-8", newline="") as f:
    w = csv.DictWriter(f, header, delimiter=";")
    w.writeheader()
    w.writerows(output)
# +
# XLSX
# +
wb = Workbook()
ws = wb.active
ws.title = "Extraction"
ws.append(header)
for row in output:
    # Values are emitted in header order so cells always line up with columns.
    ws.append([row.get(column, "") for column in header])

wb.save("yb_list.xlsx")
|