database_constitution.py

###
#
# MIMIC-IV database creation for Datacamp 2022
#
###
import pandas as pd
import sqlite3
from io import StringIO
#
# Parameters
#
sqlite_path = "../data/mimic-iv.sqlite"
csv_ed_path = "../data/mimic-iv-ed/mimic-iv-ed-1.0/ed/"
csv_mimic_path = "../data/mimic-iv/mimiciv/1.0/"

# Opening the SQLite database (the file is created on first connect)
con = sqlite3.connect(sqlite_path)
# CSV tables to save into the SQLite database
ed_csv = [
    "triage",
    "medrecon",
    "edstays",
    "diagnosis",
]
global_csv = [
    ["core/patients", "subject_id"],
    ["hosp/d_labitems", None],
]
datasets = {}

## ED data
for csv in ed_csv:
    print(f"Writing {csv}")
    datasets[csv] = pd.read_csv(f"{csv_ed_path}/{csv}.csv.gz")
    datasets[csv].to_sql(csv, con, if_exists="replace")

# Getting the list of patients seen in the ED
patients_list = datasets["edstays"]["subject_id"].unique().tolist()
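# Tables filtered on subject_id below are restricted to this list, so the
# database only keeps rows for patients who have at least one ED stay.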
## Global MIMIC data
for path, filter_col in global_csv:
    print(f"Writing {path}")
    datasets[path] = pd.read_csv(f"{csv_mimic_path}/{path}.csv.gz")
    filename = path.split("/")[-1]
    # Filtering on the ED patients list when a filter column is set
    if filter_col == "subject_id":
        datasets[path] = datasets[path][
            datasets[path]["subject_id"].isin(patients_list)
        ].reset_index(drop=True)
    datasets[path].to_sql(filename, con, if_exists="replace")
# Special cases: microbiologyevents and labevents
## microbiologyevents
microbiology_df = pd.read_csv(f"{csv_mimic_path}/hosp/microbiologyevents.csv.gz")
microbiology_df = microbiology_df[
    microbiology_df["subject_id"].isin(patients_list)
].reset_index(drop=True)
# Matching each event to an ED stay: keeping events charted between intime and outtime
microbiology_df_filter = pd.merge(
    datasets["edstays"][["subject_id", "intime", "outtime", "stay_id"]],
    microbiology_df[["microevent_id", "subject_id", "charttime"]],
    on="subject_id",
)
microbiology_df_filter = microbiology_df_filter[
    (microbiology_df_filter["charttime"] >= microbiology_df_filter["intime"])
    & (microbiology_df_filter["charttime"] <= microbiology_df_filter["outtime"])
]
microbiology_df = pd.merge(
    microbiology_df,
    microbiology_df_filter[["stay_id", "microevent_id"]],
    on="microevent_id",
    how="inner",
).drop_duplicates("microevent_id")
microbiology_df.to_sql("microbiologyevents", con, if_exists="replace")
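# Note: if a chart time falls within several of a patient's ED stays,
# drop_duplicates() keeps only the first matching stay_id.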
## labevents
### The file is too big to fit in memory at once, so it is processed in batches
chunk_bytes = int(1e9)  # readlines() takes a byte-size hint, not a line count

# Dropping the table so the batches are appended to a clean slate
cursor = con.cursor()
cursor.execute("DROP TABLE IF EXISTS labevents")
cursor.close()

with open(f"{csv_mimic_path}/hosp/labevents.csv", "r") as f:
    header = f.readline()
    i = 0
    while True:
        lines = f.readlines(chunk_bytes)
        if not lines:
            break
        print(f"Writing batch {i}")
        i += 1
        # readlines() keeps trailing newlines, so plain concatenation is enough
        temp_file = StringIO(header + "".join(lines))
        temp_df = pd.read_csv(temp_file)
        # Filtering on the ED patients list
        temp_df = temp_df[
            temp_df["subject_id"].astype("int64").isin(patients_list)
        ].reset_index(drop=True)
        # Matching each event to an ED stay: keeping events charted between intime and outtime
        temp_df_filter = pd.merge(
            datasets["edstays"][["subject_id", "intime", "outtime", "stay_id"]],
            temp_df[["labevent_id", "subject_id", "charttime"]],
            on="subject_id",
        )
        temp_df_filter = temp_df_filter[
            (temp_df_filter["charttime"] >= temp_df_filter["intime"])
            & (temp_df_filter["charttime"] <= temp_df_filter["outtime"])
        ]
        temp_df = pd.merge(
            temp_df,
            temp_df_filter[["stay_id", "labevent_id"]],
            on="labevent_id",
            how="inner",
        )
        # Appending the filtered batch to the labevents table
        temp_df.to_sql("labevents", con, if_exists="append")
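# Note: pandas' chunked reader would be an equivalent alternative here; a
# minimal sketch (the chunksize value is an assumption, tune it to memory):
# for temp_df in pd.read_csv(f"{csv_mimic_path}/hosp/labevents.csv", chunksize=5_000_000):
#     ...same filtering as above, then temp_df.to_sql("labevents", con, if_exists="append")
# The readlines() loop is kept because it bounds batches by size in bytes.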
# Creating indexes
indexes = {
    "triage": ["subject_id", "stay_id"],
    "medrecon": ["subject_id", "stay_id"],
    "edstays": ["subject_id", "stay_id", "hadm_id"],
    "diagnosis": ["subject_id", "stay_id"],
    "patients": ["subject_id"],
    "d_labitems": ["itemid"],
    "microbiologyevents": ["microevent_id", "subject_id", "stay_id", "hadm_id", "micro_specimen_id"],
    "labevents": ["labevent_id", "subject_id", "stay_id", "hadm_id", "specimen_id", "itemid"],
}
cursor = con.cursor()
for table_name, index_columns in indexes.items():
    for index_column in index_columns:
        index_query = f"CREATE INDEX IF NOT EXISTS {table_name}_{index_column} ON {table_name} ({index_column})"
        cursor.execute(index_query)
cursor.close()
con.commit()
con.close()
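# Optional sanity check, assuming all tables above were created: list every
# table and its row count (read-only, safe to re-run).
con = sqlite3.connect(sqlite_path)
tables = pd.read_sql("SELECT name FROM sqlite_master WHERE type = 'table'", con)
for name in tables["name"]:
    count = pd.read_sql(f"SELECT COUNT(*) AS n FROM {name}", con)["n"].iloc[0]
    print(f"{name}: {count} rows")
con.close()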