###
#
# MIMIC-IV database creation for Datacamp 2022
#
###
import sqlite3
from io import StringIO

import pandas as pd
#
# Parameters
#
sqlite_path = "../data/mimic-iv.sqlite"
csv_ed_path = "../data/mimic-iv-ed/mimic-iv-ed-1.0/ed/"
csv_mimic_path = "../data/mimic-iv/mimiciv/1.0/"

# Opening the SQLite database (the file is created on first connection)
con = sqlite3.connect(sqlite_path)
# Saving the CSV files to the SQLite database
ed_csv = [
    "triage",
    "medrecon",
    "edstays",
    "diagnosis",
]
global_csv = [
    ("core/patients", "subject_id"),
    ("hosp/d_labitems", None),
]
datasets = {}
## ED data
for csv in ed_csv:
    print(f"Writing {csv}")
    datasets[csv] = pd.read_csv(f"{csv_ed_path}/{csv}.csv.gz")
    datasets[csv].to_sql(csv, con, if_exists="replace")

# Getting the list of patients seen in the ED, used to filter the other tables
patients_list = datasets["edstays"]["subject_id"].unique().tolist()
## Global MIMIC data
for path, filter_column in global_csv:
    print(f"Writing {path}")
    datasets[path] = pd.read_csv(f"{csv_mimic_path}/{path}.csv.gz")
    filename = path.split("/")[-1]
    # Keeping only the patients who had an ED stay
    if filter_column == "subject_id":
        datasets[path] = datasets[path][
            datasets[path]["subject_id"].isin(patients_list)
        ].reset_index(drop=True)
    datasets[path].to_sql(filename, con, if_exists="replace")
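# Optional sanity check (an assertion added here, not part of the original
# pipeline): after the filtering above, the patients table should only
# contain ED patients.
assert datasets["core/patients"]["subject_id"].isin(patients_list).all()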
# Special cases: microbiologyevents and labevents
## microbiologyevents
microbiology_df = pd.read_csv(f"{csv_mimic_path}/hosp/microbiologyevents.csv.gz")
microbiology_df = microbiology_df[
    microbiology_df["subject_id"].isin(patients_list)
].reset_index(drop=True)
# Attaching each microbiology event to the ED stay during which it was charted
microbiology_df_filter = pd.merge(
    datasets["edstays"][["subject_id", "intime", "outtime", "stay_id"]],
    microbiology_df[["microevent_id", "subject_id", "charttime"]],
    on="subject_id",
)
microbiology_df_filter = microbiology_df_filter[
    (microbiology_df_filter["charttime"] >= microbiology_df_filter["intime"])
    & (microbiology_df_filter["charttime"] <= microbiology_df_filter["outtime"])
]
microbiology_df = pd.merge(
    microbiology_df,
    microbiology_df_filter[["stay_id", "microevent_id"]],
    on="microevent_id",
    how="inner",
).drop_duplicates("microevent_id")
microbiology_df.to_sql("microbiologyevents", con, if_exists="replace")
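# Optional sanity check (a minimal sketch, not part of the original script):
# every event kept should now be linked to an ED stay.
check_micro = pd.read_sql_query(
    "SELECT COUNT(*) AS n_events, COUNT(DISTINCT stay_id) AS n_stays "
    "FROM microbiologyevents",
    con,
)
print(check_micro)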
## labevents
### The CSV is too big to fit in memory at once, so it is processed in batches
chunk_hint = int(1e9)  # byte hint passed to readlines(): roughly 1 GB per batch

# Dropping the table so that batches can be appended to a fresh table
cursor = con.cursor()
cursor.execute("DROP TABLE IF EXISTS labevents")
cursor.close()

with open(f"{csv_mimic_path}/hosp/labevents.csv", "r") as f:
    header = f.readline()
    i = 0
    while True:
        print(f"Writing batch {i}")
        i += 1
        # readlines(hint) reads whole lines until about chunk_hint bytes
        lines = f.readlines(chunk_hint)
        if len(lines) == 0:
            break
        # readline()/readlines() keep the trailing newlines, so a plain
        # concatenation rebuilds a valid CSV with its header
        temp_file = StringIO("".join([header] + lines))
        temp_df = pd.read_csv(temp_file)
        # Keeping only the patients who had an ED stay
        temp_df = temp_df[
            temp_df["subject_id"].astype("int64").isin(patients_list)
        ].reset_index(drop=True)
        # Attaching each lab event to the ED stay during which it was charted
        temp_df_filter = pd.merge(
            datasets["edstays"][["subject_id", "intime", "outtime", "stay_id"]],
            temp_df[["labevent_id", "subject_id", "charttime"]],
            on="subject_id",
        )
        temp_df_filter = temp_df_filter[
            (temp_df_filter["charttime"] >= temp_df_filter["intime"])
            & (temp_df_filter["charttime"] <= temp_df_filter["outtime"])
        ]
        temp_df = pd.merge(
            temp_df,
            temp_df_filter[["stay_id", "labevent_id"]],
            on="labevent_id",
            how="inner",
        )
        # Writing the filtered batch to the database
        temp_df.to_sql("labevents", con, if_exists="append")
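# Optional sanity check (added here, assuming the batches above completed):
# counting how many filtered lab events were written in total.
check_lab = pd.read_sql_query("SELECT COUNT(*) AS n_rows FROM labevents", con)
print(f"labevents rows written: {check_lab['n_rows'].iloc[0]}")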
# Creating indexes on the join / filter columns to speed up lookups
indexes = {
    "triage": ["subject_id", "stay_id"],
    "medrecon": ["subject_id", "stay_id"],
    "edstays": ["subject_id", "stay_id", "hadm_id"],
    "diagnosis": ["subject_id", "stay_id"],
    "patients": ["subject_id"],
    "d_labitems": ["itemid"],
    "microbiologyevents": ["microevent_id", "subject_id", "stay_id", "hadm_id", "micro_specimen_id"],
    "labevents": ["labevent_id", "subject_id", "stay_id", "hadm_id", "specimen_id", "itemid"],
}
cursor = con.cursor()
for table_name, index_columns in indexes.items():
    for index_column in index_columns:
        # IF NOT EXISTS makes the script safe to re-run
        index_query = (
            f"CREATE INDEX IF NOT EXISTS {table_name}_{index_column} "
            f"ON {table_name} ({index_column})"
        )
        cursor.execute(index_query)
cursor.close()
con.commit()
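# Optional: EXPLAIN QUERY PLAN shows whether a lookup uses the new indexes
# (a minimal sketch; the stay_id value and the plan wording are illustrative).
plan = pd.read_sql_query(
    "EXPLAIN QUERY PLAN SELECT * FROM labevents WHERE stay_id = 30000000",
    con,
)
print(plan)
con.close()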