123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509 |
- import re
- import os
- import pandas as pd
- import numpy as np
- def _getDictionnaryKeys(dictionnary):
- """
- Function that get keys from a dict object and flatten sub dict.
- """
-
- keys_array = []
- for key in dictionnary.keys():
- keys_array.append(key)
- if (type(dictionnary[key]) == type({})):
- keys_array = keys_array+_getDictionnaryKeys(dictionnary[key])
- return(keys_array)
class pandasToBrat:

    """
    Class for Pandas brat folder management.
    For each brat folder, there is an instance of pandasToBrat.
    It supports importation and exportation of configurations for relations and entities.
    Documents importation and exportation.
    Annotations and entities importation and exportation.

    Only text-bound annotations (ids starting with "T") and relations (ids
    starting with "R") are handled: any other line of an .ann file is ignored
    when reading and is therefore not written back when the file is rewritten.

    Inputs :
        folder, str : path of brat folder
    """

    # Encoding used for every file read or written in the brat folder.
    _ENCODING = "utf-8"

    def __init__(self, folder):
        if not folder:
            raise ValueError('A non empty brat folder path is expected.')

        self.folder = folder
        self.conf_file = 'annotation.conf'

        # Columns of the dataframes returned when there is nothing to read
        self.emptyDFCols = {
            "annotations": ["id", "type_id", "word", "label", "start", "end"],
            "relations": ["id", "type_id", "relation", "Arg1", "Arg2"]
        }

        # Adding '/' to folder path if missing
        if not self.folder.endswith('/'):
            self.folder += '/'

        # Creating folder (and missing parents) if it does not exist
        os.makedirs(self.folder, exist_ok=True)

        # Loading conf file if exists | creating empty conf file if not
        self.read_conf()

    def _emptyData(self):
        """
        Delete every .txt and .ann file of the folder after an interactive
        confirmation (the user has to type "y").
        """
        fileList = self._getFileList()
        nb_files = fileList.shape[0]

        confirmation = input("Deleting all data ({} files), press y to confirm :".format(nb_files))
        if confirmation == 'y':
            for filename in fileList["filename"]:
                os.remove(self.folder + filename)
            print("{} files deleted.".format(nb_files))

    def _generateEntitiesStr(self, conf, data='', level=0):
        """
        Serialize an entities dict to the text of the [entities] section.

        Inputs :
            conf, dict : entity name -> True (entity), False (written with a
                         leading "!") or a sub dict of children entities
            data, str : text generated so far, the new lines are appended to it
            level, int : nesting level, i.e. number of tabs before each name
        Output :
            str, data followed by one "\\n" prefixed line per entity
        """

        if not isinstance(conf, dict):
            return data

        indent = level * '\t'
        for key, value in conf.items():
            if isinstance(value, dict):
                # Parent entity : its children are written one level deeper
                data += '\n' + indent + key
                data = self._generateEntitiesStr(value, data, level + 1)
            elif value == True:
                data += '\n' + indent + key
            elif value == False:
                data += '\n' + indent + '!' + key
        return data

    def _writeEntitiesLevel(self, conf, data, last_n=-1):
        """
        Parse the lines of the [entities] section into a nested dict.

        The nesting level of a line is its number of tabs. A line followed by
        a deeper line becomes a parent (its value is a dict), any other line
        is stored as True, or as False when its name starts with "!".
        Blank lines and lines starting with "#" are skipped.

        Inputs :
            conf, list of str : lines of the section
            data, dict : dict filled in place with the parsed entities
            last_n, int : index of the last line already handled, parsing
                          starts on the following line
        Output :
            tuple (index of the last line of conf, data)
        """

        # Keeping only meaningful lines as (level, name) pairs
        entries = []
        for line in conf[max(last_n + 1, 0):]:
            stripped = line.strip()
            if not stripped or stripped.startswith('#'):
                continue
            name = line.split("\t")[-1]
            if name:
                entries.append((line.count("\t"), name))

        # open_dicts[-1] is the dict receiving the current line,
        # open_levels holds the level of each parent still open.
        open_levels = []
        open_dicts = [data]
        for position, (level, name) in enumerate(entries):
            # Closing every parent which is not strictly above this line
            while open_levels and open_levels[-1] >= level:
                open_levels.pop()
                open_dicts.pop()
            target = open_dicts[-1]

            has_children = (position + 1 < len(entries)
                            and entries[position + 1][0] > level)
            if has_children:
                target[name] = {}
                open_levels.append(level)
                open_dicts.append(target[name])
            elif name[0] == '!':
                target[name[1:]] = False
            else:
                target[name] = True

        return (len(conf) - 1, data)

    def _readRelations(self, relations, entities=None):
        """
        Parse the text of the [relations] section.

        Each line is expected as "name<TAB>Arg1:entity, Arg2:entity". Lines
        without a tab and lines starting with "#" are skipped. Only the
        arguments listed in entities are kept and a relation without any
        known argument is dropped.

        Inputs :
            relations, str : content of the section
            entities, list : known entity names
        Output :
            dict, relation name -> {"args": [entity, ...]}
        """
        entities = [] if entities is None else entities
        data = {}
        for relation in relations.split("\n"):
            fields = relation.split("\t")
            if len(fields) < 2 or relation.lstrip().startswith('#'):
                continue
            args = [arg.split(":")[1]
                    for arg in re.split(r",\s*", fields[1]) if ":" in arg]
            args_valid = [arg for arg in args if arg in entities]
            if args_valid:
                data[fields[0]] = {"args": args_valid}

        return data

    def _writeRelations(self, relations, entities=None):
        """
        Serialize a relations dict to the text of the [relations] section.

        Inputs :
            relations, dict : relation name -> {"args": [entity, ...]}
            entities, list : known entity names, other arguments are ignored
        Output :
            str, one "\\n" prefixed line per relation having at least one
            known argument : "name<TAB>Arg1:entity, Arg2:entity"
        """
        entities = [] if entities is None else entities
        data = ''
        for relation in relations:
            args_array = [arg for arg in relations[relation]["args"] if arg in entities]
            if args_array:
                args_str = ', '.join('Arg{}:{}'.format(position, arg)
                                     for position, arg in enumerate(args_array, start=1))
                data += '\n' + relation + '\t' + args_str

        return data

    def read_conf(self):
        """
        Get the current Brat configuration.
        The configuration file is created (empty) when it does not exist.
        Output :
            Dict containing "entities" and "relations" configurations.
        """

        conf_path = self.folder + self.conf_file

        # Creating an empty conf file if missing
        if not os.path.isfile(conf_path):
            self.write_conf()

        # Reading file
        with open(conf_path, encoding=self._ENCODING) as file:
            conf_str = file.read()

        # Splitting conf_str : the capturing group makes re.split return
        # [prefix, name, content, name, content, ...]
        parts = re.split(r"\[([a-zA-Z]+)\]", conf_str)
        sections = dict(zip(parts[1::2], parts[2::2]))

        data = {}

        # Reading entities
        data["entities"] = self._writeEntitiesLevel(sections.get("entities", "").split("\n"), {})[1]

        # Reading relations
        entitiesKeys = _getDictionnaryKeys(data["entities"])
        data["relations"] = self._readRelations(sections.get("relations", ""), entitiesKeys)

        return data

    def write_conf(self, entities=None, relations=None, events=None, attributes=None):
        """
        Write or overwrite configuration file.
        It actually doesn't support events and attributes configuration data.
        inputs :
            entities, dict : dict containing the entities. If an entity has children, its value is another dict, otherwise, it is set as True.
            relations, dict : dict containing the relations between entities, each key is a relation name, the value is a dict with a "args" key containing the list of related entities.
            events, attributes : ignored, empty sections are written
        """

        # TODO : Add events and attributes support.
        entities = {} if entities is None else entities
        relations = {} if relations is None else relations

        entitiesKeys = _getDictionnaryKeys(entities)
        conf_str = ''.join([
            '\n\n[entities]', self._generateEntitiesStr(entities),
            '\n\n[relations]', self._writeRelations(relations, entitiesKeys),
            '\n\n[attributes]',
            '\n\n[events]',
        ])

        # Write conf file
        with open(self.folder + self.conf_file, 'w', encoding=self._ENCODING) as file:
            file.write(conf_str)

    def _getFileList(self):
        """
        List the .txt and .ann files of the folder.
        Output :
            Pandas dataframe with the columns "filename", "id" (file name
            without its extension) and "filetype" ("txt" or "ann").
        """
        rows = []
        for filename in sorted(os.listdir(self.folder)):
            # splitext only cuts the last extension, so ids may contain dots
            root, extension = os.path.splitext(filename)
            filetype = extension[1:]
            if filetype in ("txt", "ann"):
                rows.append({"filename": filename, "id": root, "filetype": filetype})

        return pd.DataFrame(rows, columns=["filename", "id", "filetype"])

    def _read_file(self, filename):
        """
        Return the content of a file of the folder, or NaN when filename is
        not a string (document without annotation file).
        """
        if not isinstance(filename, str):
            return np.nan
        with open(self.folder + filename, encoding=self._ENCODING) as file:
            return file.read()

    def _parseData(self):
        """
        Load every document of the folder.
        Output :
            Pandas dataframe with one row per .txt file and the columns "id",
            "text_data" (content of the .txt file) and "annotation" (content
            of the .ann file with the same id, NaN if there is none).
        """

        # Listing files
        filesDF = self._getFileList()
        is_txt = filesDF["filetype"] == "txt"
        ann_files = dict(zip(filesDF.loc[~is_txt, "id"], filesDF.loc[~is_txt, "filename"]))

        # Getting data from txt and ann
        records = [
            {
                "id": doc_id,
                "text_data": self._read_file(filename),
                "annotation": self._read_file(ann_files.get(doc_id)),
            }
            for doc_id, filename in zip(filesDF.loc[is_txt, "id"], filesDF.loc[is_txt, "filename"])
        ]

        return pd.DataFrame(records, columns=["id", "text_data", "annotation"])

    def read_text(self):
        """
        read_text
        Get a pandas DataFrame containing the brat documents.
        Input : None
        Output : Pandas dataframe with the columns "id" and "text_data"
        """

        dataDF = self._parseData()

        return dataDF[["id", "text_data"]]

    def read_annotation(self, ids=None):
        """
        read_annotation
        Get annotations from the brat folder.
        You can get specific annotation by filtering by id.
        input :
            ids, list (optional) : list of id for which you want the annotation data, if empty all annotations are returned.
        output :
            dict containing an "annotations" and a "relations" dataframe.
            annotations columns : id, type_id, word, label, start, end
            (start and end are strings, for a discontinuous span end holds
            the whole remaining offsets).
            relations columns : id, type_id, relation, Arg1, Arg2, ...
            (arguments are named after their position).
        """

        dataDF = self._parseData()[["id", "annotation"]]
        dataDF = dataDF[dataDF["annotation"].notna() & (dataDF["annotation"] != '')]  # Removing empty annotation

        # Filtering by ids (len() is used because ids can be a numpy array)
        if ids is not None and len(ids) > 0:
            dataDF = dataDF[dataDF["id"].isin(pd.Series(ids).astype(str))]

        annotation_rows = []
        relation_rows = []
        max_args = 0
        for doc_id, content in zip(dataDF["id"], dataDF["annotation"]):
            for line in content.split("\n"):
                fields = line.split("\t")
                if len(fields) < 2:
                    continue  # blank or malformed line
                type_id, payload = fields[0], fields[1]

                if type_id.startswith('T'):
                    ## Annotations : "label start end"
                    tokens = payload.split(" ", 2)
                    tokens += [None] * (3 - len(tokens))
                    annotation_rows.append({
                        "id": doc_id,
                        "type_id": type_id,
                        "word": fields[2] if len(fields) > 2 else None,
                        "label": tokens[0],
                        "start": tokens[1],
                        "end": tokens[2],
                    })
                elif type_id.startswith('R'):
                    ## Relations : "relation Arg1:Tx Arg2:Ty"
                    tokens = [token for token in payload.split(" ") if token]
                    if not tokens:
                        continue
                    row = {"id": doc_id, "type_id": type_id, "relation": tokens[0]}
                    for position, token in enumerate(tokens[1:], start=1):
                        role, separator, target = token.partition(":")
                        row["Arg{}".format(position)] = target if separator else token
                    max_args = max(max_args, len(tokens) - 1)
                    relation_rows.append(row)

        data = {}
        data["annotations"] = pd.DataFrame(annotation_rows, columns=self.emptyDFCols["annotations"])
        if relation_rows:
            relation_columns = ["id", "type_id", "relation"] + ["Arg{}".format(position) for position in range(1, max_args + 1)]
            data["relations"] = pd.DataFrame(relation_rows, columns=relation_columns)
        else:
            data["relations"] = pd.DataFrame(columns=self.emptyDFCols["relations"])

        return data

    def _write_function(self, x, filetype="txt", overwrite=False):
        """
        Write one document file.
        inputs :
            x, mapping : "filename" (document id) and "content" (text to write)
            filetype, str : "txt", "ann" or "both"
            overwrite, boolean : if False an existing file is left untouched
        """

        extensions = []
        if filetype in ('txt', 'both'):
            extensions.append('.txt')
        if filetype in ('ann', 'both'):
            extensions.append('.ann')

        for extension in extensions:
            filename = self.folder + str(x["filename"]) + extension
            if overwrite or not os.path.exists(filename):
                with open(filename, "w", encoding=self._ENCODING) as file:
                    file.write(x["content"])

    def write_text(self, text_id, text, empty=False, overWriteAnnotations=False):

        """
        write_text
        Send text data to the brat folder.
        Existing text files are always replaced. An empty annotation file is
        created for each document which does not have one.
        input :
            text_id, pd.Series : pandas series containing documents ids
            text, pd.Series : pandas series containing documents text in the same order as text_id
            empty, boolean : if True the brat folder is emptied of all but configuration data (text and ann files) before writing
            overWriteAnnotations, boolean : if True, the current annotation files are replaced by blank ones
        raises :
            ValueError if the inputs are not two series of the same length or
            if the ids are longer than the texts (probably inverted).
        """
        # Only the exact value True enables the overwriting of annotations
        overwriteAnn = overWriteAnnotations == True

        if not (isinstance(text, pd.Series) and isinstance(text_id, pd.Series) and text.shape[0] == text_id.shape[0]):
            raise ValueError('Incorrect variable type, expected two Pandas Series of same shape.')

        # ID check : ids should be smaller than text : check if not inverted
        if not (text_id.astype(str).str.len().max() < text.astype(str).str.len().max()):
            raise ValueError('ID is larger than text, maybe you inverted them.')

        # empty : option to erase existing data
        if empty:
            self._emptyData()

        # Writing data
        print("Writting data")
        df_text = pd.DataFrame({"filename": text_id, "content": text})
        for record in df_text.to_dict("records"):
            self._write_function(record, filetype="txt", overwrite=True)
            self._write_function({"filename": record["filename"], "content": ""}, filetype="ann", overwrite=overwriteAnn)
        print("data written.")

    @staticmethod
    def _max_type_ids(frame):
        """
        Return a dict document id -> highest number used in the "type_id"
        column of frame (the leading letter of the type_id is ignored).
        """
        highest = {}
        for doc_id, type_id in zip(frame["id"], frame["type_id"]):
            highest[doc_id] = max(int(str(type_id)[1:]), highest.get(doc_id, 0))
        return highest

    @staticmethod
    def _merge_new_rows(current, new):
        """
        Append the rows of new to current, skipping the new rows which are
        identical (type_id apart, values compared as strings) to a previous
        row. Rows of current are never removed, as other annotations can
        refer to them.
        """
        frames = [frame for frame in (current, new) if not frame.empty]
        if not frames:
            return new.reset_index(drop=True)

        merged = pd.concat(frames, sort=False).reset_index(drop=True)
        compared_columns = [column for column in merged.columns if column != "type_id"]
        is_new = np.arange(len(merged)) >= len(current)
        is_duplicate = merged[compared_columns].astype(str).duplicated().values
        return merged[~(is_duplicate & is_new)].reset_index(drop=True)

    def write_annotations(self, df, text_id, word, label, start, end, overwrite=False):

        """
        write_annotations
        Send annotation data to the brat folder. Useful to pre-annotate some data.
        Only the documents listed in df are rewritten. Nothing is done if df is empty.
        input :
            df, pd.Dataframe : dataframe containing annotations data, should contains the text id, the annotated word, the annotated label, the start and end offset.
            text_id, str : name of the column in df which contains the document id
            word, str : name of the column in df which contains the annotated word
            label, str : name of the column in df which contains the label of the annotated word
            start, str : name of the column in df which contains the start offset
            end, str : name of the column in df which contains the end offset
            overwrite, boolean : if True, the current annotation files are replaced by new data, otherwise, the new annotations are merged with existing one
        raises :
            ValueError if df is not a pandas dataframe.
        """

        # Checking data types
        if not isinstance(df, pd.DataFrame):
            raise ValueError('Incorrect variable type, expected a Pandas DF.')
        if df.empty:
            return

        # Loading df (rename returns a copy, the caller's dataframe is untouched)
        df = df.rename(columns={text_id: "id", word: "word", label: "label", start: "start", end: "end"})
        # Ids read from the folder are strings, same type is needed to match them
        df["id"] = df["id"].astype(str)
        df["type_id"] = df.groupby("id").cumcount() + 1

        # Loading current data of the documents of df
        current_annotation = self.read_annotation(df["id"].unique())
        current_annotations = current_annotation["annotations"]

        if overwrite == True:
            df["type_id"] = "T" + df["type_id"].astype(str)
            new_annotations = self._merge_new_rows(df.iloc[0:0], df)
        else:
            # New ids are numbered after the highest existing one of the document
            highest = self._max_type_ids(current_annotations)
            df["type_id"] = ["T{}".format(number + highest.get(doc_id, 0))
                             for doc_id, number in zip(df["id"], df["type_id"])]
            new_annotations = self._merge_new_rows(current_annotations, df[self.emptyDFCols["annotations"]])

        # Injecting new annotations
        current_annotation["annotations"] = new_annotations

        # Calling write function
        self._write_annotation(current_annotation["annotations"], current_annotation["relations"])

    def write_relations(self, df, text_id, relation, overwrite=False):

        """
        write_relations
        Send relation data to the brat folder.
        Only the documents listed in df are rewritten. Nothing is done if df is empty.
        input :
            df, pd.Dataframe : dataframe containing relations data. Every column other than text_id and relation is an argument of the relation, they are named Arg1, Arg2, ... following the order of the columns.
            text_id, str : name of the column in df which contains the document id
            relation, str : name of the column in df which contains the relation name
            overwrite, boolean : if True, the current relations are replaced by new data, otherwise, the new relations are merged with existing one
        raises :
            ValueError if df is not a pandas dataframe.
        """

        # Checking data types
        if not isinstance(df, pd.DataFrame):
            raise ValueError('Incorrect variable type, expected a Pandas DF.')
        if df.empty:
            return

        # Loading df (rename returns a copy, the caller's dataframe is untouched)
        df = df.rename(columns={text_id: "id", relation: "relation"})
        # Ids read from the folder are strings, same type is needed to match them
        df["id"] = df["id"].astype(str)
        df["type_id"] = df.groupby("id").cumcount() + 1  # type_id

        # Columns names
        old_columns = [column for column in df.columns if column not in ("id", "relation", "type_id")]
        df = df.rename(columns={old: "Arg{}".format(position) for position, old in enumerate(old_columns, start=1)})

        # Loading current data of the documents of df
        current_annotation = self.read_annotation(df["id"].unique())
        current_relations = current_annotation["relations"]

        if overwrite == True:
            df["type_id"] = "R" + df["type_id"].astype(str)
            new_relations = self._merge_new_rows(df.iloc[0:0], df)
        else:
            # New ids are numbered after the highest existing one of the document
            highest = self._max_type_ids(current_relations)
            df["type_id"] = ["R{}".format(number + highest.get(doc_id, 0))
                             for doc_id, number in zip(df["id"], df["type_id"])]
            # Columns missing on one side are filled with NaN by the concatenation
            new_relations = self._merge_new_rows(current_relations, df)

        # Injecting new relations
        current_annotation["relations"] = new_relations

        # Calling write function
        self._write_annotation(current_annotation["annotations"], current_annotation["relations"])

    @staticmethod
    def _format_field(value):
        """
        Return value as a string, integral floats (5.0) are written without
        decimal part as offsets have to be integers.
        """
        if isinstance(value, float) and value.is_integer():
            return str(int(value))
        return str(value)

    def _generate_annotations_str(self, annotations):
        """
        Generate the .ann lines of the annotations of one document.
        input :
            annotations, pd.Dataframe : columns type_id, label, start, end, word
        output :
            str, one line per row : "type_id<TAB>label start end<TAB>word"
        """

        lines = []
        for record in annotations.to_dict("records"):
            label_span = ' '.join(self._format_field(record[column]) for column in ("label", "start", "end"))
            lines.append('\t'.join([str(record["type_id"]), label_span, str(record["word"])]))
        return '\n'.join(lines)

    def _generate_relations_str(self, relations):
        """
        Generate the .ann lines of the relations of one document.
        Every column other than id, type_id and relation is an argument,
        empty arguments (NaN, '' or 'nan') are not written.
        input :
            relations, pd.Dataframe : columns type_id, relation and arguments
        output :
            str, one line per row : "type_id<TAB>relation Arg1:value Arg2:value",
            which is the layout parsed by read_annotation.
        """

        columns = [column for column in relations.columns if column not in ("id", "type_id", "relation")]

        lines = []
        for record in relations.to_dict("records"):
            parts = [str(record["relation"])]
            for column in columns:
                value = record[column]
                if pd.isna(value) or str(value) in ('', 'nan'):
                    continue
                parts.append("{}:{}".format(column, value))
            lines.append("{}\t{}".format(record["type_id"], ' '.join(parts)))
        return '\n'.join(lines)

    def _write_file(self, data):
        """
        Write data["str_to_write"] in the .ann file of the document data["id"].
        """
        with open(self.folder + str(data["id"]) + ".ann", "w", encoding=self._ENCODING) as file:
            file.write(data["str_to_write"])

    @staticmethod
    def _render_by_document(frame, renderer):
        """
        Apply renderer to the rows of each document of frame.
        Output : pandas series of strings indexed by document id.
        """
        rendered = {doc_id: renderer(group) for doc_id, group in frame.groupby("id")}
        return pd.Series(rendered, dtype=object)

    def _write_annotation(self, annotations, relations):
        """
        Rewrite the .ann file of every document found in annotations or relations.
        input :
            annotations, pd.Dataframe : annotations, as returned by read_annotation
            relations, pd.Dataframe : relations, as returned by read_annotation
        output :
            Pandas dataframe with the columns id, annotations, relations and
            str_to_write (content written in the file of the document).
        raises :
            ValueError if the inputs are not pandas dataframes.
        """

        # Checking data types
        if not (isinstance(annotations, pd.DataFrame) and isinstance(relations, pd.DataFrame)):
            raise ValueError('Incorrect variable type, expected a Pandas DF.')

        # Generating str
        data_annotations = self._render_by_document(annotations, self._generate_annotations_str)
        data_relations = self._render_by_document(relations, self._generate_relations_str)

        # Merging data
        data = pd.DataFrame({"annotations": data_annotations, "relations": data_relations}).fillna('')
        contents = []
        for annotations_str, relations_str in zip(data["annotations"], data["relations"]):
            content = '\n'.join(part for part in (annotations_str, relations_str) if part)
            contents.append(content + '\n' if content else content)
        data["str_to_write"] = contents
        data = data.rename_axis("id").reset_index()

        # Writing files
        for record in data.to_dict("records"):
            self._write_file(record)

        return data
|