#!/usr/bin/env python3

import xml.etree.ElementTree as ET
from nltk.tokenize import RegexpTokenizer
from nltk.corpus import stopwords
from nltk.metrics import edit_distance
from pyxdameraulevenshtein import damerau_levenshtein_distance_ndarray, normalized_damerau_levenshtein_distance_ndarray
import numpy as np
import pandas as pd
import string
import re
import os, sys
sys.path.append('src/python/')
import utils_proc
import copy
import collections.abc
import datetime


# function to check whether a file contains discussions
# achieved by excluding title pages, tables of contents, etc.
# TODO: function works well for 1891 - 1900, not checked after that !!!
def check_if_discussion(path_meta_xml_file,
                        list_attributes=['TITEL_NORMAL_DE', 'TITEL_NORMAL_FR', 'TITEL_ORIGINAL_DE', 'TITEL_ORIGINAL_FR'],
                        list_nondiscussion=['inhaltsverzeichnis', 'inhaltsverzeichniss', 'jahresinhalt', 'rednerliste',
                                            'jahres-rednerliste', 'umschlag', 'sachregister', 'titelblatt', 'numerierung'],
                        list_nondiscussion2=['table', 'matières', 'répertoire', 'procès-verbaux']):

    # parse, get root and then part of interest
    XML_tree = ET.parse(path_meta_xml_file)
    XML_root = XML_tree.getroot()
    XML_poi = XML_root[0].find('ADS_TEXTEINHEIT')

    # for each title attribute
    for attribute in list_attributes:
        # if xml contains this attribute
        if attribute in XML_poi.attrib:
            # get title and generate set with lower case terms
            title = XML_poi.attrib[attribute]
            set_title = set([term.lower() for term in title.split()])

            # if one of the terms is in list_nondiscussion, return False
            if set_title.intersection(set(list_nondiscussion)):
                return False

            # if two terms are in list_nondiscussion2, also return False
            if len(set_title.intersection(set(list_nondiscussion2))) > 1:
                return False

    return True


# function to get document titles
def get_document_title_(path_meta_xml_file, list_attributes):
    # parse, get root and then part of interest
    XML_tree = ET.parse(path_meta_xml_file)
    XML_root = XML_tree.getroot()
    XML_poi = XML_root[0].find('ADS_TEXTEINHEIT')

    # get titles
    list_titles = []
    for attribute in list_attributes:
        if attribute in XML_poi.attrib:
            title = XML_poi.attrib[attribute]
            list_titles.append(title)
        else:
            list_titles.append('(empty)')

    return list_titles


# function to get council and date
def get_council_and_date(path_meta_xml_file):
    # parse, get root and then part of interest
    XML_tree = ET.parse(path_meta_xml_file)
    XML_root = XML_tree.getroot()
    XML_poi = XML_root[0]

    # get council and date
    str_council = XML_poi.find('META_FROM_DB').attrib['RAT']
    str_date = XML_poi.attrib['PUBLIKATIONS_DATUM']

    return (str_council, str_date)


# helper function to get text without font information
# example for font information: [font face="11.718" size="Times-Roman"] sometext [/font]
# input:
# - sometext: string
# output:
# - newtext: modified string
def get_text(sometext):
    # initialize
    newtext = ''

    # find text between font information
    for text in re.findall(r'\].*?\[', sometext):
        if text.startswith(']') and text.endswith('['):
            newtext += text[1:-1]

    return newtext
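
# Minimal usage sketch for get_text; the font markup mirrors the example
# given in the comment above (the face/size values are illustrative):
def _demo_get_text():
    marked_up = '[font face="11.718" size="Times-Roman"]sometext[/font]'
    assert get_text(marked_up) == 'sometext'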

# function to annotate the corrected XML
def get_annotated_xml(XML_root, df_lastnames, list_notnames, str_council, str_date, name_txt, entries, bln_print=False):

    # list of votation terms
    list_votationterms = ['Abstimmung', 'Schlussabstimmung', 'Generalabstimmung', 'Angenommen', 'Abgelehnt',
                          'Einverstanden', 'Stimmen', '(Eintreten)', '(Nichteintreten)',
                          'Votation', 'Vote', 'votation', '(Adoptés)', 'adoptés', 'adoptée', 'rejetée',
                          "D'accord", 'voix']

    # list of stopwords
    list_stopwords = stopwords.words('german')
    list_stopwords.extend(['dass', 'resp', 'daran', 'dr', 'herr', 'herrn', 'hr'])
    list_stopwords.extend(stopwords.words('french'))
    list_stopwords.extend(['ils', 'les', 'celle'])

    # create new XML as a copy of the corrected one
    XML_new = copy.deepcopy(XML_root)
    ind_last_page = len(XML_root) - 1

    # initialize flags to distinguish speeches from votes
    this_is_speech = False
    prev_is_speech = False
    this_is_vote = False

    # for every page
    for ind_p, page in enumerate(XML_root):
        if bln_print:
            print(page.tag, page.attrib)

        # for every textbox on that page
        for ind_t, textbox in enumerate(page):

            # specify start and end of document
            try:
                if ind_p == 0 and entries[0] == int(textbox.attrib['id']):
                    XML_new = label_docstartend(XML_new, ind_p, ind_t, 'doc_start')
                    with open(name_txt, 'a') as f:
                        f.write(' '.join(('<<<=====================', 'the document starts here', '\n\n')))
                if ind_p == ind_last_page and entries[1] == int(textbox.attrib['id']):
                    XML_new = label_docstartend(XML_new, ind_p, ind_t, 'doc_end')
                    with open(name_txt, 'a') as f:
                        f.write(' '.join(('=====================>>>', 'the document ends here', '\n\n')))
            except KeyError:
                pass

            if textbox.tag == 'textbox':
                if 'type_textbox' in textbox.attrib.keys():
                    if textbox.attrib['type_textbox'] == 'text':
                        if bln_print:
                            print(textbox.tag, textbox.attrib)

                        # get complete text of that textbox
                        complete_text, ind_tl_colon = get_complete_text(textbox)
                        if bln_print:
                            print(complete_text[:100])

                        # identify and label language in XML
                        dict_lang = identify_language(complete_text)
                        XML_new = label_language(XML_new, ind_p, ind_t, dict_lang)

                        # get texttype of that textbox by majority vote
                        # TODO add that type to XML
                        textbox_texttype = get_textbox_type(textbox)
                        if bln_print:
                            print(textbox_texttype)

                        if textbox_texttype in ['text_col1', 'text_col2']:
                            XML_new, this_is_speech = label_speechstart(XML_new, ind_p, ind_t, complete_text,
                                                                        ind_tl_colon, df_lastnames, list_stopwords,
                                                                        list_notnames, str_council, str_date,
                                                                        name_txt, bln_print=False)
                            if this_is_speech:
                                prev_is_speech = True
                                continue
                            XML_new, this_is_vote = label_votations(XML_new, ind_p, ind_t, complete_text,
                                                                    list_votationterms, name_txt, bln_print=False)
                            if this_is_vote:
                                prev_is_speech = False
                                continue
                            if prev_is_speech and (not this_is_vote):
                                XML_new = label_speechcont(XML_new, ind_p, ind_t)

    return XML_new


# helper function to get the type of a textbox
# corresponds to a majority vote over the types of its textlines
# input:
# - textbox
# output:
# - textbox_type: string
def get_textbox_type(textbox):

    # initialize empty dictionary
    dict_type = {}

    # for every textline in that textbox, count types
    for ind_tl, textline in enumerate(textbox):
        if textline.tag == 'textline':
            if textline.attrib['type'] not in dict_type.keys():
                dict_type[textline.attrib['type']] = 1
            else:
                dict_type[textline.attrib['type']] += 1

    # list of all types with maximum count
    list_types = [type_ for type_, count in dict_type.items() if count == max(dict_type.values())]

    # if only one type has the maximum count, take it;
    # if several share the maximum count, the type is not distinct
    if len(list_types) == 1:
        textbox_type = list_types[0]
    else:
        textbox_type = 'notdistinct'

    return textbox_type
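
# Minimal usage sketch for get_textbox_type, built from a hypothetical
# three-line textbox (the type attribute values are illustrative):
def _demo_get_textbox_type():
    textbox = ET.Element('textbox')
    for line_type in ['text_col1', 'text_col1', 'footnote']:
        ET.SubElement(textbox, 'textline', attrib={'type': line_type})
    # two of three textlines are 'text_col1', so the majority vote wins
    assert get_textbox_type(textbox) == 'text_col1'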

# helper function to get complete text of a textbox
# input:
# - textbox
# output:
# - complete_text: string
# - ind_tl_colon: index of textline with colon (needed for label_speechstart)
def get_complete_text(textbox):

    # helper function to get text without font information
    # (local duplicate of the module-level get_text)
    def get_text(sometext):
        newtext = ''
        for text in re.findall(r'\].*?\[', sometext):
            if text.startswith(']') and text.endswith('['):
                newtext += text[1:-1]
        return newtext

    # initialize empty string
    complete_text = ''

    # initialize index of textline with colon to impossible value
    ind_tl_colon = -1

    # for every textline in that textbox
    for ind_tl, textline in enumerate(textbox):
        if textline.tag == 'textline':
            # get that text
            thattext = get_text(textline.text)

            # append that text to string
            complete_text += thattext

            # in first two textlines of textbox, check for colon
            if ind_tl < 2:
                if ':' in thattext:
                    ind_tl_colon = ind_tl

    return complete_text, ind_tl_colon
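
# Minimal usage sketch for get_complete_text on a hypothetical two-line
# textbox ('Muster' is an invented speaker name, the font markup is illustrative):
def _demo_get_complete_text():
    textbox = ET.Element('textbox')
    line1 = ET.SubElement(textbox, 'textline')
    line1.text = '[font face="f1"]Muster, Berichterstatter:[/font]'
    line2 = ET.SubElement(textbox, 'textline')
    line2.text = '[font face="f1"] Die Kommission beantragt Eintreten.[/font]'
    text, ind_tl_colon = get_complete_text(textbox)
    # the colon sits on the first textline, the font markup is stripped
    assert ind_tl_colon == 0
    assert text.startswith('Muster')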

# function to label speech starts
# input:
# - text: string to be analyzed
# - df_names: dataframe of politicians
# - list_stopwords: list of German and French stopwords
# - bln_print: whether to print during execution, default False
# output:
# - XML_new: updated XML
# - this_is_speech: flag indicating whether a speech start was found
def label_speechstart(XML_new, ind_p, ind_t, text, ind_tl_colon, df_names, list_stopwords, list_notnames,
                      str_council, str_date, name_txt, bln_print=False):

    # lists of roles
    list_roles = ['Präsident', 'Präsidentin', 'Vizepräsident', 'Präsidium', 'Président', 'Présidente',
                  'président', 'présidente', 'Berichterstatter', 'Berichterstatterin', 'rapporteur',
                  'Sprecher', 'Sprecherin', 'porte-parole', 'porteparole', 'Bundesrat', 'Bundesrath',
                  'Bundesrätin', 'conseiller', 'fédéral', 'Bundespräsident']
    list_roles_ext = ['Mehrheit', 'Minderheit', 'majorité', 'minorité', 'deutscher', 'deutsche',
                      'français', 'française', 'Kommission', 'commission']

    # initialize flag
    this_is_speech = False

    # font text end
    fontend = '[/font]'

    # very consistently, a speaker can be identified by looking for a colon
    # at the beginning of a textbox and identifying a name or a role in front
    # of that colon
    if ind_tl_colon >= 0:
        # extract the index of the colon in the text
        colon_index_text = text.index(':')

        # look at first few terms of that textbox
        text_start = re.sub(r'[\(\)]', '', text[:colon_index_text])
        list_oi = tokenizer.tokenize(text_start)
        if bln_print:
            print('possible speech start: ', list_oi)

        # to avoid false positives, the number of elements in list_oi is checked:
        # - if it is too long, it is part of a speech and not a speech start
        # - for intermediate lengths between 5 and 8, it can either be a speech
        #   start (if it contains a role) or part of a speech
        # - short lengths typically indicate a speech start, but not always; these
        #   false positives cannot be avoided with this procedure
        if len(list_oi) < 9:
            if (len(list_oi) < 5) or (len(set(list_oi).intersection(list_roles)) > 0):
                # remove stopwords
                list_oi = [term for term in list_oi if term.lower() not in list_stopwords]

                # remove punctuation
                list_oi = [''.join(c for c in s if c not in string.punctuation) for s in list_oi]
                list_oi = [s for s in list_oi if s]

                # remove lower case terms
                # list_oi = [term for term in list_oi if not term.islower()]

                # remove numbers
                list_oi = [term for term in list_oi if not term.isdigit()]

                # remove single characters
                # TODO: might need to be changed for fractions (some fractions are abbreviated as single letters)
                # TODO: needs to be changed to include 'I' for Minderheit I 1891/20000093
                # TODO: maybe exclude I and A to account for Appenzell
                list_oi = [term for term in list_oi if len(term) > 1]

                # if possible, find a name from the list
                str_name, str_role, list_uniqueID, str_canton = find_names(list_oi, list_roles, list_roles_ext,
                                                                           df_names, list_notnames, str_council,
                                                                           str_date, bln_print=False)

                # get rid of role Bundesrat with no name associated to it
                # helps to reduce false positives
                # TODO: might lead to false negatives, i.e. if a person was not identified by name
                #       but is referenced as federal council
                if str_role == 'federalcouncil' and str_name == '':
                    str_role = ''

                # get rid of 'Präsident stimmt nicht Président ne vote pas'
                if set(str_role.split()).intersection(set(['Präsident', 'Präsidentin', 'Président', 'Présidente'])) and not str_name:
                    if set(['stimmt', 'nicht', 'vote', 'pas']).intersection(list_oi):
                        if bln_print:
                            print('get rid of Präsident stimmt nicht, Président ne vote pas', list_oi)
                        str_role = ''

                # get rid of 'Für den Antrag "Name" stimmen: Votent pour la proposition "Name":'
                if str_name:
                    if len(set(['Antrag', 'stimmen', 'Votent', 'proposition']).intersection(list_oi)) > 1:
                        if bln_print:
                            print('get rid of Für den Antrag <Name> stimmen: Votent pour la proposition <Name>:', list_oi)
                        str_name = ''

                with open(name_txt, 'a') as f:
                    f.write(' '.join(('page', str(ind_p + 1), str(list_oi), '\n')))
                    f.write(' '.join(('name:', str_name, '\n')))
                    f.write(' '.join(('role:', str_role, '\n')))
                    f.write(' '.join(('uniqueID(s):', str(list_uniqueID), '\n')))
                    f.write(' '.join(('text:', text[colon_index_text+1:colon_index_text+100], '\n\n')))

                # if a name or a role has been found, add it to XML_new
                if str_name or str_role:
                    # add attribute speech_start to textbox
                    XML_new[ind_p][ind_t].attrib['text_type'] = 'speech_start'

                    # add speaker as attribute to first textline
                    XML_new[ind_p][ind_t][0].attrib['speaker'] = (str_name, str_role, list_uniqueID, str_canton)

                    # update text of XML (speaker goes on the first line, the actual
                    # speech starts on the second line of the speech_start textbox)
                    # if colon is on first line
                    if ind_tl_colon == 0:
                        # get text of that line and colon index
                        thattext = XML_new[ind_p][ind_t][0].text
                        colon_index = thattext.index(':')

                        try:
                            # write speaker to first line
                            XML_new[ind_p][ind_t][0].text = thattext[:colon_index+1] + fontend

                            # get start of speech with correct font start
                            if thattext[colon_index+1:].startswith('[font'):
                                startspeech = thattext[colon_index+1:]
                            elif re.match(r'^[ ]?\[/font\]$', thattext[colon_index+1:]):
                                startspeech = ''
                            elif re.match(r'^[ ]?\[/font\]', thattext[colon_index+1:]):
                                startspeech = thattext[colon_index+8:]
                            else:
                                startspeech = thattext[colon_index+1:]

                            # write beginning of speech to second line
                            # (create new ET element if necessary)
                            if len(list(XML_new[ind_p][ind_t])) > 1:
                                XML_new[ind_p][ind_t][1].text = startspeech + ' ' + XML_new[ind_p][ind_t][1].text
                            else:
                                XML_new[ind_p][ind_t].append(copy.deepcopy(XML_new[ind_p][ind_t][0]))
                                XML_new[ind_p][ind_t][1].attrib.pop('speaker')
                                XML_new[ind_p][ind_t][1].text = startspeech
                        except Exception:
                            print('error in self.input_file when splitting speaker')
                            pass

                    # if colon is on second line
                    if ind_tl_colon == 1:
                        # get text of that line and colon index
                        thattext = XML_new[ind_p][ind_t][1].text
                        colon_index = thattext.index(':')

                        # get start of speech with correct font start
                        if thattext[colon_index+1:].startswith('[font'):
                            startspeech = thattext[colon_index+1:]
                        elif re.match(r'^[ ]?\[/font\]$', thattext[colon_index+1:]):
                            startspeech = ''
                        elif re.match(r'^[ ]?\[/font\]', thattext[colon_index+1:]):
                            startspeech = thattext[colon_index+8:]
                        else:
                            startspeech = thattext[colon_index+1:]

                        # write speaker to first line
                        XML_new[ind_p][ind_t][0].text = XML_new[ind_p][ind_t][0].text + ' ' + thattext[:colon_index+1] + fontend

                        # write beginning of speech to second line
                        XML_new[ind_p][ind_t][1].text = startspeech

                    # set flag
                    this_is_speech = True
                    if bln_print:
                        print('found a name:', text_start, list_oi, ind_tl_colon, str_name, str_role, list_uniqueID, '\n')

    return XML_new, this_is_speech
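
# Minimal sketch of the colon heuristic used above, on a hypothetical speech
# opening ('Muster' is an invented speaker; uses the module-level tokenizer
# defined further below):
def _demo_speechstart_heuristic():
    text = 'Muster, Berichterstatter: Die Kommission beantragt Eintreten.'
    text_start = text[:text.index(':')]
    list_oi = tokenizer.tokenize(text_start)
    # a short prefix before the colon marks a candidate speech start
    assert len(list_oi) < 9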

# function to label votation paragraphs
# !!! error prone, possible improvements see notebook extract_discussions
# input:
# - XML_new: XML file to update
# - text: string
# - list_votationterms: list of votation terms
# - bln_print: whether to print during execution, default True
# output:
# - XML_new: updated
# - this_is_vote: flag indicating whether a vote was found
def label_votations(XML_new, ind_p, ind_t, text, list_votationterms, name_txt, bln_print=True):

    # get first terms of that text
    list_oi = tokenizer.tokenize(text)[:15]

    # if there is an overlap with typical votation terms:
    # (a stricter alternative would require at least two overlapping terms:
    #  len(set(list_oi).intersection(set(list_votationterms))) > 1)
    if set(list_oi).intersection(set(list_votationterms)):
        # add attribute vote to textbox
        XML_new[ind_p][ind_t].attrib['text_type'] = 'vote'

        # set flag
        this_is_vote = True

        with open(name_txt, 'a') as f:
            f.write(' '.join(('page', str(ind_p + 1), text, '\n')))
            f.write(' '.join(('is a vote', '\n\n')))

        if bln_print:
            print('found a vote:', list_oi)
    else:
        # set flag
        this_is_vote = False
        if bln_print:
            print('not a vote', list_oi)

    return XML_new, this_is_vote


# function to label continuation of speech
# only adds label to corresponding textbox
def label_speechcont(XML_new, ind_p, ind_t):
    XML_new[ind_p][ind_t].attrib['text_type'] = 'speech_cont'
    return XML_new


# function to label start and end of document
# only adds label to corresponding textbox
# type_ is either 'doc_start' or 'doc_end'
def label_docstartend(XML_new, ind_p, ind_t, type_):
    XML_new[ind_p][ind_t].attrib[type_] = 'here'
    return XML_new


# helper function to flatten a nested irregular list
def flatten(l):
    for el in l:
        if isinstance(el, collections.abc.Iterable) and not isinstance(el, (str, bytes)):
            yield from flatten(el)
        else:
            yield el
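
# Minimal usage sketch for flatten (the nested list is illustrative;
# note that strings are kept whole, not flattened into characters):
def _demo_flatten():
    nested = [1, [2, [3, 4]], 'ab']
    assert list(flatten(nested)) == [1, 2, 3, 4, 'ab']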

# function to find names
# input:
# - list_oi: list of terms that might contain a name
# - list_roles, list_roles_ext: lists of role terms
# - df_names: yearly dataframe with all MPs
# - list_notnames: list of terms that are easily mistaken for names
# - str_council, str_date: council and date of the discussion
# output:
# - (str_name, str_role, list_uniqueID, str_canton): tuple with strings and IDs
def find_names(list_oi, list_roles, list_roles_ext, df_names, list_notnames, str_council, str_date, bln_print=False):

    def get_string(term, df_names, str_name, list_uniqueID):
        # get name type
        name_type = df_names['nameType'].loc[df_names['shortName']==term].iloc[0]
        if bln_print:
            if name_type != 'simple':
                print(df_names[df_names['shortName']==term])
                print(term, name_type)

        # extract uniqueID and complete name for this term
        list_temp = []
        if name_type in ['simple', 'double', 'comp']:
            list_temp = [df_names.loc[(df_names['nameType']==name_type) & (df_names['shortName']==term)].iat[0, df_names.columns.get_loc('uniqueIndex')]]
            str_completeName = df_names['completeName'].loc[df_names['shortName']==term].iloc[0]
        # TODO: how to handle people who are only mentioned in the text???
        elif name_type in ['canton']:
            list_temp = list(df_names.loc[(df_names['shortName']==term)].iloc[:, df_names.columns.get_loc('uniqueIndex')])
            str_completeName = term + ' (CANTON MISSING)'

        if bln_print:
            print(list_temp, str_completeName)

        # set or update unique ID and name
        # if no unique ID and name have been assigned so far
        if len(list_uniqueID) == 0 and str_name == '':
            list_uniqueID = list_temp
            str_name = add_to_string(str_name, str_completeName)
        # if one or several people have already been found
        else:
            # if it is a double name
            if name_type == 'double':
                if list_uniqueID == list_temp:
                    # do nothing if person has already been found
                    pass
                else:
                    # check whether we found a person with the same first part of the double last name
                    # and overwrite if this is the case,
                    # e.g. if we found a Meyer before we found a Meyer-Boller, e.g. 1971/20000010
                    if str_completeName.split('-')[0] == str_name.split(' ')[0]:
                        list_uniqueID = list_temp
                        str_name = add_to_string('', str_completeName)
            # if we have a new person (no overlap with the unique IDs found so far), we append
            elif len(set(list_temp).intersection(set(flatten(list_uniqueID)))) == 0:
                list_uniqueID.append(list_temp)
                str_name = add_to_string(str_name, str_completeName)

        return str_name, list_uniqueID, name_type

    def update_list_uniqueID(list_uniqueID, list_temp, name_type):
        # if name_type is canton, we override other entries by the correct one
        if name_type == 'canton' and len(list_temp) == 1 and list_temp[0] in list_uniqueID:
            list_uniqueID = list_temp
        return list_uniqueID

    # function to find the correct term (in case of misspellings, etc.)
    def get_approximate_term(term, array_all):
        # TODO: probably need to improve this procedure
        # - find better threshold values ....

        # initialize string
        term_approx = ''

        # get normalized array
        array_normalized = array_all[normalized_damerau_levenshtein_distance_ndarray(term, array_all) <= 0.35]
        array_normalized_values = normalized_damerau_levenshtein_distance_ndarray(term, array_normalized)

        # get absolute array
        array_absolute = array_all[damerau_levenshtein_distance_ndarray(term, array_all) <= 2]
        array_absolute_values = damerau_levenshtein_distance_ndarray(term, array_absolute)

        if bln_print:
            print(term)
            print(array_normalized, array_normalized_values)
            print(array_absolute, array_absolute_values)

        # intersection
        set_intersection = set(array_normalized).intersection(set(array_absolute))

        # if exactly one similar name was found
        if len(set_intersection) == 1:
            term_approx = list(set_intersection)[0]
        # or several
        elif len(set_intersection) > 1:
            # !!! we only look at normalized values
            # !!! we don't account for names with same values !!!
            array_min = array_normalized[array_normalized_values.argmin()]
            term_approx = array_min
            if bln_print:
                print('we found several possible names', set_intersection, 'and choose', array_min)

        return term_approx

    # small function to add a term to a string
    def add_to_string(str_in, term):
        if not str_in:
            str_in = term
        elif str_in == term == 'federalcouncil':
            pass
        else:
            str_in += ' ' + term
        return str_in

    # initialize strings and IDs
    str_name = ''
    str_role = ''
    list_uniqueID = []
    str_canton = ''
    name_type = ''
    str_firstname = ''

    # extract lists and arrays of names
    list_all_names = list(df_names['shortName'])
    array_all_names = np.array(df_names['shortName'])
    list_all_firstnames = list(df_names['FirstName'])

    # for every term
    for term in list_oi:
        term_approx_role = get_approximate_term(term, np.array(list_roles))
        if term in list_roles or term_approx_role:
            # update str_role
            # TODO: also look for similar terms (misspellings)
            # TODO: what about Bundespräsident?
            # TODO: is Berichterstatter the same as Sprecher?
            if term_approx_role:
                term_ = term_approx_role
            else:
                term_ = term

            # assign role in English
            if term_ in ['Präsident', 'Präsidentin', 'Präsidium', 'Président', 'Présidente', 'président', 'présidente']:
                str_assignedRole = 'president'
            elif term_ in ['Vizepräsident']:
                str_assignedRole = 'vice-president'
            elif term_ in ['Berichterstatter', 'Berichterstatterin', 'rapporteur', 'Sprecher', 'Sprecherin', 'porte-parole', 'porteparole']:
                str_assignedRole = 'reporter'
            elif term_ in ['Bundesrat', 'Bundesrath', 'Bundesrätin', 'conseiller', 'fédéral', 'Bundespräsident', 'Bundespräsidentin']:
                str_assignedRole = 'federalcouncil'
                str_council = 'Bundesrat'   # needs to be German to be used in dataframe

            # update str_role
            str_role = add_to_string(str_role, str_assignedRole)
            if bln_print:
                print('found a role', term, str_assignedRole)

        elif term in list_roles_ext:
            str_assignedRole = ''
            # get more details on reporter
            # TODO: could be refined for Minderheit I, II, III, etc...
            # TODO: add Italian
            if term in ['Mehrheit', 'majorité']:
                str_assignedRole = 'majority'
            elif term in ['Minderheit', 'minorité']:
                str_assignedRole = 'minority'
            elif term in ['deutscher', 'deutsch', 'allemand', 'allemande']:
                str_assignedRole = 'German'
            elif term in ['français', 'française', 'französischer', 'französische']:
                str_assignedRole = 'French'

            # update str_role
            str_role = add_to_string(str_role, str_assignedRole)

        # cannot happen for the first term in list_oi
        elif name_type == 'canton':
            list_cantons = get_list_cantons(df_names, str_name.split(' ')[0], str_council, str_firstname)
            canton_type = ''
            for list_, type_ in list_cantons:
                if term in list_:
                    str_canton = term
                    canton_type = type_
                    if bln_print:
                        print('!!! is a canton', term, list_oi, str_name, str_role)
                    break

            # if person was not uniquely identified, check for misspellings
            if not canton_type:
                # look for similar names based on (normalized) Damerau-Levenshtein distance
                # only look at canton name, citizenship, first name and additional info
                list_cantons_approx = [list_cantons[i] for i in (0, 2, 3, 4)]
                for list_, type_ in list_cantons_approx:
                    term_approx = get_approximate_term(term, np.array(list_))
                    if term_approx:
                        str_canton = term_approx
                        canton_type = type_
                        if bln_print:
                            print('!!! is a canton', term, list_oi, str_name, str_role)
                        break

            # if a canton or similar was found
            if canton_type:
                # get rid of CANTON MISSING
                str_name = str_name.split(' ')[0]
                df_temp = get_df_temp(df_names, str_name, str_council)

                # extract uniqueID
                # if Citizenship, get list of cities and compare each to term
                if canton_type == 'Citizenship':
                    # get list of cities
                    list_cities = [entry for entry in df_temp[canton_type] if str_canton in get_cities([entry])]
                    str_citizenship = ''
                    if len(list_cities) == 1:
                        str_citizenship = list_cities[0]
                        list_temp = list(df_temp.loc[(df_temp['shortName']==str_name) & (df_temp[canton_type]==str_citizenship)].iloc[:, df_temp.columns.get_loc('uniqueIndex')])
                        str_completeName = df_temp['completeName'].loc[(df_temp['shortName']==str_name) & (df_temp[canton_type]==str_citizenship)].iloc[0]
                    elif len(list_cities) > 1:
                        print('found more than one person with citizenship', str_canton, str_name, list_cities)
                        # TODO what happens with these:?
                        list_temp = list(df_names.loc[(df_names['shortName']==str_name)].iloc[:, df_names.columns.get_loc('uniqueIndex')])
                        str_completeName = str_name + ' (CANTON MISSING)'
                    else:
                        print('found no person with citizenship', str_canton, str_name, list_cities)
                else:
                    if bln_print:
                        print(canton_type, str_canton, str_name, df_temp)
                    list_temp = list(df_temp.loc[(df_temp['shortName']==str_name) & (df_temp[canton_type]==str_canton)].iloc[:, df_temp.columns.get_loc('uniqueIndex')])
                    str_completeName = df_temp['completeName'].loc[(df_temp['shortName']==str_name) & (df_temp[canton_type]==str_canton)].iloc[0]

                if bln_print:
                    print(list_temp, list_uniqueID, str_completeName)

                if len(list_temp) > 0:
                    list_uniqueID = update_list_uniqueID(list_uniqueID, list_temp, name_type)
                    if bln_print:
                        print(str_completeName)
                    if 'CANTON MISSING' in str_completeName:
                        str_name = add_to_string('', str_completeName)
                    elif str_completeName.split(' ')[0] == str_name:
                        str_name = add_to_string('', str_completeName)
                    else:
                        str_name = add_to_string(str_name, str_completeName)
            else:
                if bln_print:
                    print('could not be identified as a canton:', term, list_oi, str_name, str_role)

        # if term is a first name
        # needed when people are referenced by FirstName LastName, e.g. Simon Kohler
        elif term in list_all_firstnames:
            str_firstname = term
            if bln_print:
                print('found a first name', str_firstname)

        # if term is not easily mistaken for a name (avoid false positives)
        elif term not in list_notnames:
            # if term is in the list of all names
            if term in list_all_names:
                # if term is not in str_name already, e.g. if second part of double name is also a name
                # e.g. 1952/20035242 Widmer-Kunz (it is the same as Widmer)
                # TODO: maybe also add to term_approx??
                if term not in str_name:
                    # get correct name and uniqueID for that term
                    str_name, list_uniqueID, name_type = get_string(term, df_names, str_name, list_uniqueID)
            # if term is not in list_all_names
            else:
                # look for similar names based on (normalized) Damerau-Levenshtein distance
                term_approx = get_approximate_term(term, array_all_names)
                # if one was found, get correct name, etc.
                if term_approx:
                    str_name, list_uniqueID, name_type = get_string(term_approx, df_names, str_name, list_uniqueID)
                    if bln_print:
                        print('=== approximate name', str_name, term_approx)

    # additional checks for people that were not uniquely identified
    # TODO check for false positives of these procedures
    if name_type == 'canton':
        # check if person can be identified from first name
        if str_firstname:
            df_temp = df_names.loc[(df_names['shortName']==str_name.split(' ')[0]) & (df_names['FirstName']==str_firstname)]
            if df_temp.shape[0] == 1:
                list_temp = list(df_temp.loc[(df_temp['shortName']==str_name.split(' ')[0]) & (df_temp['FirstName']==str_firstname)].iloc[:, df_temp.columns.get_loc('uniqueIndex')])
                str_completeName = df_temp['completeName'].loc[(df_temp['shortName']==str_name.split(' ')[0]) & (df_temp['FirstName']==str_firstname)].iloc[0]
                list_uniqueID = update_list_uniqueID(list_uniqueID, list_temp, name_type)
                if str_completeName.split(' ')[0] == str_name.split(' ')[0] or str_completeName.split(' ')[1] == str_name.split(' ')[0]:
                    str_name = add_to_string('', str_completeName)
                else:
                    str_name = add_to_string(str_name, str_completeName)

        # check if person can be identified from council
        df_temp = df_names.loc[(df_names['nameType']==name_type) & (df_names['shortName']==str_name.split(' ')[0]) & (df_names['CouncilName']==str_council)]
        if df_temp.shape[0] == 1:
            list_temp = list(df_temp.loc[(df_temp['shortName']==str_name.split(' ')[0]) & (df_temp['CouncilName']==str_council)].iloc[:, df_temp.columns.get_loc('uniqueIndex')])
            str_completeName = df_temp['completeName'].loc[(df_temp['shortName']==str_name.split(' ')[0]) & (df_temp['CouncilName']==str_council)].iloc[0]
            list_uniqueID = update_list_uniqueID(list_uniqueID, list_temp, name_type)
            if str_completeName.split(' ')[0] == str_name.split(' ')[0] or str_completeName.split(' ')[1] == str_name.split(' ')[0]:
                str_name = add_to_string('', str_completeName)
            else:
                str_name = add_to_string(str_name, str_completeName)
        else:
            # check if person can be identified from date of discussion
            # exclude people that joined after the date of discussion
            df_temp_before = df_temp[pd.to_datetime(df_temp['DateJoining'], format='%d.%m.%Y') <= datetime.datetime.strptime(str_date, '%Y-%m-%d %H:%M')]
            if df_temp_before.shape[0] == 1:
                list_temp = list(df_temp_before['uniqueIndex'])
                str_completeName = df_temp_before['completeName'].iloc[0]
                list_uniqueID = update_list_uniqueID(list_uniqueID, list_temp, name_type)
                if str_completeName.split(' ')[0] == str_name.split(' ')[0]:
                    str_name = add_to_string('', str_completeName)
                else:
                    str_name = add_to_string(str_name, str_completeName)

            # exclude people that left before the date of discussion
            df_temp_after = df_temp[pd.to_datetime(df_temp['DateLeaving'], format='%d.%m.%Y') >= datetime.datetime.strptime(str_date, '%Y-%m-%d %H:%M')]
            if df_temp_after.shape[0] == 1:
                list_temp = list(df_temp_after['uniqueIndex'])
                str_completeName = df_temp_after['completeName'].iloc[0]
                list_uniqueID = update_list_uniqueID(list_uniqueID, list_temp, name_type)
                if str_completeName.split(' ')[0] == str_name.split(' ')[0]:
                    str_name = add_to_string('', str_completeName)
                else:
                    str_name = add_to_string(str_name, str_completeName)

            if bln_print:
                print(str_date, df_temp.shape, df_temp_before.shape, df_temp_after.shape)

    # TODO: function to update list_uniqueID and str_name
    # if a federal council member is referenced as "Name Bundesrat",
    # he or she is not found by the procedure above
    if str_council == 'Bundesrat' and 'CANTON MISSING' in str_name:
        # check if person can be identified from council
        df_temp = df_names.loc[(df_names['nameType']==name_type) & (df_names['shortName']==str_name.split(' ')[0]) & (df_names['CouncilName']==str_council)]
        if df_temp.shape[0] == 1:
            list_temp = list(df_names.loc[(df_names['nameType']==name_type) & (df_names['shortName']==str_name.split(' ')[0]) & (df_names['CouncilName']==str_council)].iloc[:, df_names.columns.get_loc('uniqueIndex')])
            str_completeName = df_names['completeName'].loc[(df_names['nameType']==name_type) & (df_names['shortName']==str_name.split(' ')[0]) & (df_names['CouncilName']==str_council)].iloc[0]
            list_uniqueID = update_list_uniqueID(list_uniqueID, list_temp, name_type)
            if str_completeName.split(' ')[0] == str_name.split(' ')[0] or str_completeName.split(' ')[1] == str_name.split(' ')[0]:
                str_name = add_to_string('', str_completeName)
            else:
                str_name = add_to_string(str_name, str_completeName)

    return str_name, str_role, list_uniqueID, str_canton


# two functions for language identification
# Author: Luis Salamanca
# small modifications by Lili Gasser
# using stopwords

# input:
# - text: string
# - valid_lang: tuple of valid languages
# output:
# - dict_language_counts: dictionary of stopword counts for each valid language
def identify_language(text, valid_lang=('german', 'french', 'italian')):

    # tokenize
    tokens = text.split(' ')

    # all lowercase
    test_words = [word.lower() for word in tokens]

    # make a set
    test_words_set = set(test_words)

    # initialize dictionary of language counts
    dict_language_counts = {}

    # iterate through languages of stopwords
    for language in stopwords.fileids():
        if language in valid_lang:
            # get stopword set
            stopwords_set = set(stopwords.words(language))

            # get intersection between text of interest and stopword set for this language
            common_elements = test_words_set.intersection(stopwords_set)

            # save number of common elements to dictionary
            dict_language_counts[language] = len(common_elements)

    return dict_language_counts
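
# Minimal usage sketch for identify_language (requires the NLTK stopwords
# corpus to be downloaded; the sentence is an illustrative German input):
def _demo_identify_language():
    counts = identify_language('der und die von einem Antrag')
    # German stopwords should dominate this sentence
    assert counts['german'] > counts['french']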

# given the stopword counts, assigns a language label to a specific textbox,
# also allowing for textboxes that mix languages
# (the value ratio_similar controls when two languages count as mixed)
# input:
# - XML_new: XML file to update
# - aux_dict_l: corresponds to dict_language_counts
# output:
# - XML_new: updated XML with language attribute
def label_language(XML_new, ind_p, ind_t, aux_dict_l):

    # specify a similarity ratio
    ratio_similar = 0.8

    # if there are counts, determine language
    if sum(aux_dict_l.values()):
        aux_dict_l_norm = {k: v / total for total in (sum(aux_dict_l.values()),) for k, v in aux_dict_l.items()}
        lang_max_aux = max(aux_dict_l_norm.keys(), key=(lambda key: aux_dict_l_norm[key]))
        lang_max = ''
        count_l = 0
        for lang in aux_dict_l_norm.keys():
            if aux_dict_l_norm[lang] > aux_dict_l_norm[lang_max_aux] * ratio_similar:
                if count_l > 0:
                    lang_max += '_'
                lang_max += lang
                count_l += 1
        if count_l > 1:
            lang_max = 'mixed_' + lang_max
    else:
        lang_max = 'languageNotIdentified'

    # add attribute to textbox
    XML_new[ind_p][ind_t].attrib['language'] = lang_max

    return XML_new


# small function to extract city names from citizenship entries
# (entries look like 'City (XY)', so the trailing ' (XY)' is stripped)
def get_cities(list_citizenship):
    return [city[:-5] for item in list_citizenship for city in item.split(',')]


def get_df_temp(df_names, str_name, str_council='', str_firstname=''):
    if str_firstname:
        if str_council in ['Nationalrat', 'Ständerat', 'Bundesrat']:
            df_temp = df_names.loc[(df_names['shortName']==str_name) & (df_names['FirstName']==str_firstname) & (df_names['CouncilName']==str_council)]
        else:
            df_temp = df_names.loc[(df_names['shortName']==str_name) & (df_names['FirstName']==str_firstname)]
    else:
        if str_council in ['Nationalrat', 'Ständerat', 'Bundesrat']:
            df_temp = df_names.loc[(df_names['shortName']==str_name) & (df_names['CouncilName']==str_council)]
        else:
            df_temp = df_names.loc[(df_names['shortName']==str_name)]
    return df_temp


# function to get list of places
def get_list_cantons(df_names, str_name, str_council='', str_firstname=''):

    # specify strings as they are used in Ratsmitglieder_1848_DE_corr.xlsx and therefore in df_names
    str_CantonName = 'CantonName'
    str_CantonAbbreviation = 'CantonAbbreviation'
    str_Citizenship = 'Citizenship'
    str_FirstName = 'FirstName'
    str_additionalInfo = 'additionalInfo'
    str_additionalInfo2 = 'additionalInfo2'
    str_additionalInfo3 = 'additionalInfo3'

    # get dataframe
    df_temp = get_df_temp(df_names, str_name, str_council, str_firstname)

    # list of cantons
    list_cantonname = list(df_temp[str_CantonName])
    # list of canton abbreviations
    list_cantonabbr = list(df_temp[str_CantonAbbreviation])
    # list of citizenships
    list_citizenship = list(df_temp[str_Citizenship])
    list_citizenship = get_cities(list_citizenship)
    # list of first names
    list_firstname = list(df_temp[str_FirstName])
    # lists of additional information
    list_additionalInfo = list(df_temp[str_additionalInfo])
    list_additionalInfo2 = list(df_temp[str_additionalInfo2])
    list_additionalInfo3 = list(df_temp[str_additionalInfo3])

    # generate list of cantons including the corresponding column name
    # additional info should come before citizenship (helps to find people who have
    # the same citizenship but a specified additional info)
    list_cantons = [(list_cantonname, str_CantonName),
                    (list_cantonabbr, str_CantonAbbreviation),
                    (list_additionalInfo, str_additionalInfo),
                    (list_additionalInfo2, str_additionalInfo2),
                    (list_additionalInfo3, str_additionalInfo3),
                    (list_citizenship, str_Citizenship),
                    (list_firstname, str_FirstName),
                    ]

    return list_cantons


# tokenizer: matches (possibly hyphenated) words and dollar amounts
# (the original pattern contained '-/w+', read here as a typo for '-\w+')
tokenizer = RegexpTokenizer(r'\w+(?:-\w+)*|\$[\d\.]+')
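
# Minimal sketch of the tokenizer on a hypothetical speaker line
# ('Widmer-Kunz' is the double-name example mentioned in label_speechstart;
# assumes the hyphen part of the pattern is meant to keep double names together):
def _demo_tokenizer():
    tokens = tokenizer.tokenize('Widmer-Kunz, Berichterstatter der Kommission')
    assert tokens[0] == 'Widmer-Kunz'
    assert 'Berichterstatter' in tokens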

# functions from hf_extractdiscussions
# ==============================================================================
# TODO: check whether they are still needed

# function to exclude overlapping textboxes between documents
# input:
# - dict_text: dictionary of texts of one document
# - dict_overlaps: dictionary with overlaps
# output:
# - dict_text: modified dict_text
def exclude_overlaps(dict_text, dict_overlaps):

    # initialize to impossible values
    first_entry = -1
    last_entry = 1000

    # get index of textbox from first and last page
    # the overlap dictionary only contains an entry if an overlap was detected
    for entry, array in dict_overlaps.items():
        if entry == 'first':
            first_entry = int(array[0])
        if entry == 'last':
            last_entry = int(array[0])

    # get list of keys for first and last page
    list_first_page = [key for key in dict_text if key.split(',')[1] == '0']
    last_page = max([int(key.split(',')[1]) for key in dict_text])
    list_last_page = [key for key in dict_text if key.split(',')[1] == str(last_page)]

    # modify dict_text on first page ...
    for key in list_first_page:
        if int(key.split(',')[2]) < first_entry:
            dict_text[key] = ''
    # ... and on last page
    for key in list_last_page:
        if int(key.split(',')[2]) > last_entry:
            dict_text[key] = ''

    return dict_text


# small function to get the first item of each tuple in a list
def get_first_item(list_tuples):
    list_first_item = [tupel[0] for tupel in list_tuples]
    return list_first_item


# small function to get the last two items of each tuple in a list
def get_last_item(list_tuples):
    list_last_item = [tupel[-2:] for tupel in list_tuples]
    return list_last_item


def tokenize_dictionary(dictionary, tokenizer, only_text=False):
    dictionary_tokenized = {}

    # if there is only text, e.g. when we look at all texts of a document at once
    # (level 2 in flattened dictionary)
    if only_text:
        for key, text in dictionary.items():
            dictionary_tokenized[key] = tokenizer.tokenize(text)
    # if the values are actually tuples (speaker, text), e.g. when a document
    # corresponds to what one person said
    else:
        for key, text in dictionary.items():
            dictionary_tokenized[key] = (text[0], tokenizer.tokenize(text[1]))

    return dictionary_tokenized


# the values of the dictionary need to be tokenized !!!!
def remove_stopwords_from_dictionary(dictionary, list_stopwords, only_text=False):
    dict_docs_afterswr = {}

    # if there is only text, e.g. when we look at all texts of a document at once
    # (level 2 in flattened dictionary)
    if only_text:
        for doc, text in dictionary.items():
            list_text = text
            list_words_tokenized = [word for word in list_text if word.lower() not in list_stopwords]
            dict_docs_afterswr[doc] = ' '.join(list_words_tokenized)
    # if the values are actually tuples (speaker, text), e.g. when a document
    # corresponds to what one person said
    else:
        for doc, tupel in dictionary.items():
            before_colon = tupel[0]
            list_text = tupel[1]
            list_words_tokenized = [word for word in list_text if word.lower() not in list_stopwords]
            dict_docs_afterswr[doc] = (before_colon, ' '.join(list_words_tokenized))

    return dict_docs_afterswr


def dict_only_text(dictionary):
    dictionary_only_text = {}
    for key, tupel in dictionary.items():
        dictionary_only_text[key] = tupel[1]
    return dictionary_only_text
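
# Minimal usage sketch chaining tokenize_dictionary and
# remove_stopwords_from_dictionary (the key, speaker and stopword list are
# illustrative; the real pipeline uses the NLTK stopword lists built above):
def _demo_dictionary_pipeline():
    docs = {'doc1,0,1': ('Muster:', 'und die Kommission beantragt Eintreten')}
    docs_tokenized = tokenize_dictionary(docs, tokenizer)
    docs_clean = remove_stopwords_from_dictionary(docs_tokenized, ['und', 'die'])
    assert docs_clean['doc1,0,1'] == ('Muster:', 'Kommission beantragt Eintreten')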