import os
import zipfile

from configuration import get_aida_yago_tsv_file_path, get_resources_dir

TRAIN_START_LINE = "-DOCSTART- (1 EU)"
TESTA_START_LINE = "-DOCSTART- (947testa CRICKET)"
TESTB_START_LINE = "-DOCSTART- (1163testb SOCCER)"
CANONICAL_REDIRECTS = None


class AnnotationRecord:
    def __init__(self, line):
        """
        Lines with tabs are tokens that are part of a mention:
         - column 1 is the token
         - column 2 is either B (beginning of a mention) or I (continuation of a mention)
         - column 3 is the full mention used to find entity candidates
         - column 4 is the corresponding YAGO2 entity (in YAGO encoding, i.e. unicode
           characters are backslash encoded and spaces are replaced by underscores, see also
           the tools on the YAGO2 website), OR --NME--, denoting that there is no matching
           entity in YAGO2 for this particular mention, or that we are missing the connection
           between the mention string and the YAGO2 entity.
         - column 5 is the corresponding Wikipedia URL of the entity (added for convenience
           when evaluating against a Wikipedia based method)
         - column 6 is the corresponding Wikipedia ID of the entity (added for convenience
           when evaluating against a Wikipedia based method - the ID refers to the dump used
           for annotation, 2010-08-17)
         - column 7 is the corresponding Freebase mid, if there is one (thanks to
           Massimiliano Ciaramita from Google Zürich for creating the mapping and making it
           available to us)
        """
        data_columns = line.split('\t')
        self.token = None
        self.begin_inside_tag = None
        self.full_mention = None
        self.yago_entity = None
        self.wikipedia_url = None
        self.wikipedia_id = None
        self.freebase_mid = None
        self.candidates = None
        # Fill the attributes positionally; lines with fewer columns simply leave
        # the remaining attributes as None.
        field_names = ["token", "begin_inside_tag", "full_mention", "yago_entity",
                       "wikipedia_url", "wikipedia_id", "freebase_mid"]
        for name, value in zip(field_names, data_columns):
            setattr(self, name, value)

    def set_candidates(self, candidate_record):
        self.candidates = candidate_record
        # Each token of a multi-word mention consumes one slot of the mention's
        # word budget, so the same CandidateRecord is not matched more often
        # than its mention has words.
        self.candidates.non_considered_word_count -= 1

    def __str__(self):
        fields = [self.token, self.begin_inside_tag, self.full_mention, self.yago_entity,
                  self.wikipedia_url, self.wikipedia_id, self.freebase_mid]
        # Join only the non-empty fields; this also avoids indexing into an
        # empty result when no field is set.
        return "|".join(e for e in fields if e)


class Document:
    def __init__(self, document_id):
        self.document_id = document_id
        self.annotations = []
        self.current_annotation = []

    def add_annotation(self, line, candidates):
        # An empty line marks a sentence boundary; flush the tokens collected so far.
        if not line:
            self.flush_current_annotation()
        else:
            ar = AnnotationRecord(line)
            for c in candidates:
                # Skip candidate records whose mention words have all been consumed.
                if c.non_considered_word_count < 1:
                    continue
                if c.orig_text == ar.full_mention:
                    ar.set_candidates(c)
                    break
            self.current_annotation.append(ar)

    def flush_current_annotation(self):
        self.annotations.append(self.current_annotation)
        self.current_annotation = []
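

# A minimal sketch of how a Document is filled (the line content below is
# illustrative, not taken from the corpus):
#   doc = Document(0)
#   doc.add_annotation("Germany\tB\tGermany\t--NME--", candidates)
#   doc.add_annotation("", candidates)  # blank line: the sentence is flushed
#   doc.annotations  # -> [[<AnnotationRecord for "Germany">]]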


class Candidate:
    def __init__(self, candidate_line):
        self.id = ""
        self.in_count = 0
        self.out_count = 0
        self.links = 0
        self.url = ""
        self.name = ""
        self.normal_name = ""
        self.normal_wiki_title = ""
        self.predicted_type = ""
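        # The parser below expects tab-separated "key:value" items, e.g.
        # (hypothetical values, shape inferred from the parsing code rather
        # than quoted from PPRforNED itself):
        #   CANDIDATE\tid:123\tinCount:10\toutCount:5\tlinks:3\turl:http://en.wikipedia.org/wiki/X\t...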
        for item in candidate_line.split('\t'):
            if item == 'CANDIDATE' or not item.strip():
                continue
            elif item.startswith('id:'):
                self.id = item[3:]
            elif item.startswith('inCount:'):
                self.in_count = int(item[8:])
            elif item.startswith('outCount:'):
                self.out_count = int(item[9:])
            elif item.startswith('links:'):
                # Assumed numeric, matching the integer default above and the
                # in_count/out_count fields.
                self.links = int(item[6:])
            elif item.startswith('url:'):
                self.url = item[4:]
            elif item.startswith('name:'):
                self.name = item[5:]
            elif item.startswith('normalName:'):
                self.normal_name = item[11:]
            elif item.startswith('normalWikiTitle:'):
                self.normal_wiki_title = item[16:]
            elif item.startswith('predictedType:'):
                self.predicted_type = item[14:]
            else:
                raise ValueError(f"Undefined PPRforNED CANDIDATE column: {item}")

    def __str__(self):
        return f"id: {self.id}\twiki_page: {self.url.replace('http://en.wikipedia.org/wiki/', '')}"


class CandidateRecord:
    def __init__(self, entity_header):
        self.candidates = []
        self.text = ""
        self.normal_name = ""
        self.predicted_type = ""
        self.q = False
        self.qid = ""
        self.docid = -1
        self.orig_text = ""
        self.non_considered_word_count = 0
        self.url = ""
        for item in entity_header.split('\t'):
            if item == 'ENTITY':
                continue
            elif item.startswith('text:'):
                self.text = item[5:]
            elif item.startswith('normalName:'):
                self.normal_name = item[11:]
            elif item.startswith('predictedType:'):
                self.predicted_type = item[14:]
            elif item.startswith('q:'):
                # bool() on any non-empty string is True (including "false"),
                # so compare against the literal instead.
                self.q = item[2:].strip().lower() == 'true'
            elif item.startswith('qid:'):
                self.qid = item[4:]
            elif item.startswith('docId:'):
                self.docid = int(item[6:]) - 1
            elif item.startswith('origText:'):
                self.orig_text = item[9:]
                self.non_considered_word_count = len(self.orig_text.split())
            elif item.startswith('url:'):
                self.url = item[4:]
            else:
                raise ValueError(f"Undefined PPRforNED column: {item}")

    def add_candidate(self, candidate_line):
        self.candidates.append(Candidate(candidate_line))

    def __str__(self):
        cnds = '\n\t'.join(str(x) for x in self.candidates)
        return f"doc_id: {self.docid}\toriginal_text: {self.orig_text}\tcandidates:\n\t{cnds}"


def get_candidates(ppr_for_ned_candidates_zip, last_document_id):
    """Read and parse the PPRforNED candidate file for one document.

    Candidate files inside the zip are named by 1-based document id.
    """
    candidates_string = ppr_for_ned_candidates_zip.read(str(last_document_id + 1)).decode("utf-8").split("\n")
    candidates = []
    for c_line in candidates_string:
        if not c_line.strip():
            continue
        if c_line.startswith("ENTITY"):
            candidates.append(CandidateRecord(c_line))
        elif c_line.startswith("CANDIDATE"):
            assert len(candidates), "A CANDIDATE line must follow an ENTITY line"
            candidates[-1].add_candidate(c_line)
        else:
            raise ValueError(f"Unexpected line in candidate file: {c_line}")
    return candidates
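
# A hedged usage sketch (assumes PPRforNED.zip stores one member per document,
# named "1", "2", ..., which is what the code above relies on):
#   with zipfile.ZipFile(get_resources_dir() / "data" / "PPRforNED.zip") as zf:
#       first_doc_candidates = get_candidates(zf, last_document_id=0)  # reads member "1"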


class AIDADataset:
    def __init__(self):
        super().__init__()
        self.dataset = None
        self.data_path = str(get_aida_yago_tsv_file_path().absolute())
        assert os.path.exists(self.data_path), f"The passed dataset address: {self.data_path} does not exist"
        self.load_dataset()

    def load_dataset(self):
        annotations = [[], [], []]
        current_document = None
        current_document_candidates = None
        data_split_id = -1
        last_document_id = 0
        with zipfile.ZipFile(get_resources_dir() / "data" / "PPRforNED.zip", "r") as ppr_for_ned_candidates_zip, \
                open(self.data_path, "r", encoding="utf-8") as data_file:
            for line in data_file:
                line = line.strip()
                if line.startswith("-DOCSTART-"):
                    # A new document begins; store the previous one, if any.
                    if current_document:
                        annotations[data_split_id].append(current_document)
                        last_document_id += 1
                    # The three marker documents open the train/testa/testb splits.
                    if line in (TRAIN_START_LINE, TESTA_START_LINE, TESTB_START_LINE):
                        data_split_id += 1
                    current_document = Document(last_document_id)
                    current_document_candidates = get_candidates(ppr_for_ned_candidates_zip, last_document_id)
                else:
                    current_document.add_annotation(line, current_document_candidates)
            if current_document:
                annotations[data_split_id].append(current_document)
        self.dataset = {"train": annotations[0], "testa": annotations[1], "testb": annotations[2]}
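

# A minimal usage sketch, assuming the configured AIDA tsv file and
# resources/data/PPRforNED.zip are in place:
#   dataset = AIDADataset()
#   print(len(dataset.dataset["train"]),
#         len(dataset.dataset["testa"]),
#         len(dataset.dataset["testb"]))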