yb_metadata.py

# -*- coding: utf-8 -*-
# +
# Get the metadata for the YB
# -
import requests
from functools import reduce
from operator import add
import urllib.parse
import yaml
import xml.etree.ElementTree as ET
import datetime
from io import StringIO
import csv
from openpyxl import Workbook
# Parameters
with open("credentials.yaml", "r", encoding="utf-8") as f:
    credentials = yaml.safe_load(f)
with open("./query", "r", encoding="utf-8") as f:
    query = f.read()
api_key = credentials["ncbi_api_key"]
email = credentials["email"]
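# credentials.yaml only needs the two keys read above; a minimal sketch
# (the values below are illustrative placeholders):
#   ncbi_api_key: "0123456789abcdef0123456789abcdef0123"
#   email: "you@example.org"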
query_uri = "https://eutils.ncbi.nlm.nih.gov/entrez/eutils"
# Dictionary mapping output columns to the fields to extract from the API
with open("./variables_dict.yaml", "r", encoding="utf-8") as f:
    data_dictionnary = yaml.safe_load(f)
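# A minimal sketch of variables_dict.yaml (the column names are illustrative):
# each key is an output column; its value is either "" (leave the column
# empty), a plain esummary field copied verbatim, or a "#..." directive
# handled in generate_output below.
#   Title: "title"
#   Journal: "fulljournalname"
#   DOI: "#articleids"
#   Year: "#yearpubdate"
#   Authors: "#authors"
#   Notes: ""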
def query_pmi(tool, param, email, api_key):
    """Query an NCBI E-utilities endpoint and return the raw response."""
    # Only these three endpoints are supported
    if tool not in ["esearch", "efetch", "esummary"]:
        raise ValueError(f"Unsupported E-utilities tool: {tool}")
    tool = f"{tool}.fcgi"
    params = "&".join([f"{urllib.parse.quote(x)}={urllib.parse.quote(str(y))}" for x, y in param.items()])
    param2 = f"email={email}&api_key={api_key}"
    query = f"{query_uri}/{tool}?{params}&{param2}"
    res = requests.get(query)
    return res
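# Minimal usage sketch (the PMID below is a placeholder):
#   res = query_pmi("esummary", {"db": "pubmed", "id": "12345678", "retmode": "json"}, email, api_key)
#   res.json()["result"]["12345678"]["title"]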
# +
# Get article list
search_query = {"db": "pubmed", "term": query, "retmax": 100000, "rettype": "uilist", "retmode": "json"}
articles_pcmid = query_pmi("esearch", search_query, email, api_key).json()["esearchresult"]["idlist"]
print(f"Extracted {len(articles_pcmid)} PMIDs for current query")
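# Note: E-utilities caps a single esearch request at 10,000 UIDs, so the
# retmax above is effectively bounded; queries with more hits would need to
# be split or fetched via EDirect.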
# +
# Get metadata
summaries = []
abstracts = []
# Looping over the articles in batches of `step`
n_articles = len(articles_pcmid)
step = 100
for i in range((n_articles // step) + int((n_articles % step) != 0)):
    print(f"Getting summaries {i*step} to {(i+1)*step}")
    ids = articles_pcmid[i*step:(i+1)*step]
    # Each batch is already selected by slicing the id list, so no retstart is needed
    summary_query = {"db": "pubmed", "id": ",".join([str(x) for x in ids]), "retmode": "json"}
    summaries.append(
        query_pmi("esummary", summary_query, email, api_key).json()["result"]
    )
    summary_query = {"db": "pubmed", "id": ",".join([str(x) for x in ids]), "retmode": "xml", "rettype": "abstract"}
    abstracts.append(
        query_pmi("efetch", summary_query, email, api_key).text
    )
# -
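# Note: with an API key, NCBI allows up to 10 E-utilities requests per second;
# if the loop ever trips HTTP 429 responses, a short time.sleep() between
# batches is an easy guard.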
# Merging all summaries together: concatenate each batch's (pmid, record)
# pairs and rebuild a single dict
summaries_all = dict(reduce(add, [tuple(x.items()) for x in summaries]))
summaries_all.pop("uids")  # drop esummary's bookkeeping key, keeping only records
assert len(summaries_all) == len(articles_pcmid)
# Parsing abstracts
abstracts_parsed = [y for x in abstracts for y in ET.parse(StringIO(x)).findall("PubmedArticle")]
# Use the article's own PMID (MedlineCitation/PMID): iter("PMID") would also
# match PMIDs cited under CommentsCorrections
abstracts_parsed = dict([(y.find("MedlineCitation/PMID").text, [z.text for z in y.iter("AbstractText")]) for y in abstracts_parsed if y is not None])
# Attach abstracts to the summaries
for x in summaries_all.keys():
    if x in abstracts_parsed:
        summaries_all[x]["abstract"] = ".".join([y for y in abstracts_parsed[x] if y is not None])
# +
# Outputting the result
def generate_output(metadata):
    output = {}
    # Pre-processing of the publication date: map seasons to a representative month
    pub_date = metadata["pubdate"].split("-")[0] \
        .replace("Summer", "Jul") \
        .replace("Spring", "Apr") \
        .replace("Autumn", "Oct") \
        .replace("Winter", "Jan") \
        .replace("First Quarter", "Feb")
    n_terms = len(pub_date.split(" "))
    if n_terms == 1:
        pub_date_object = datetime.datetime.strptime(pub_date, "%Y")
    elif n_terms == 2:
        pub_date_object = datetime.datetime.strptime(pub_date, "%Y %b")
    else:
        pub_date_object = datetime.datetime.strptime(pub_date, "%Y %b %d")
    for x, y in data_dictionnary.items():
        if y == "":
            output[x] = ""
        elif y.startswith("#"):
            # Special directives: values computed from the metadata
            if y == "#articleids":
                doi = [z["value"] for z in metadata["articleids"] if z["idtype"] == "doi"]
                if len(doi) > 0:
                    output[x] = doi[0]
                else:
                    output[x] = ""
            elif y == "#yearpubdate":
                output[x] = pub_date_object.year
            elif y == "#monthpubdate":
                output[x] = datetime.datetime.strftime(pub_date_object, "%B")
            elif y == "#authorsfirst":
                if len(metadata["authors"]) > 0:
                    output[x] = metadata["authors"][0]["name"]
                else:
                    output[x] = ""
            elif y == "#authors":
                if len(metadata["authors"]) > 0:
                    output[x] = " | ".join([a["name"] for a in metadata["authors"]])
                else:
                    output[x] = ""
        else:
            # Plain field names are copied verbatim from the metadata
            if y in metadata.keys():
                output[x] = str(metadata[y])
            else:
                output[x] = ""
    return output
# -
output = list(map(generate_output, list(summaries_all.values())))
# +
# CSV
# -
with open("yb_list.csv", "w", encoding="utf-8", newline="") as f:
    w = csv.DictWriter(f, output[0].keys(), delimiter=";")
    w.writeheader()
    w.writerows(output)
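# The ";" delimiter follows the CSV convention Excel expects in locales that
# use "," as the decimal separator; adjust it if the file is consumed elsewhere.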
# +
# XLSX
# -
wb = Workbook()
ws = wb.active
ws.title = "Extraction"
header = list(output[0].keys())
ws.append(header)
for row in output:
    ws.append(list(row.values()))
wb.save("yb_list.xlsx")