import enum
import json
from io import TextIOWrapper
from typing import Iterable, Iterator, List, NamedTuple, Tuple, TypeVar
KoreanBase="[Ko, En] NER, POStag data/국문 NER, POS"
|
|
|
|
EnglishBase="[Ko, En] NER, POStag data/영문 NER, POS"
|
|
|
|
|
|
|
|
class Sentence(NamedTuple):
    word: List[str]
    """Surface form of each token."""
    pos: List[str]
    """POS tag of each token."""
    namedEntity: List[str]
    """Named-entity tag of each token."""
    detail: List[str]
    """Detailed named-entity tag of each token."""

    def append(self, word: str, pos: str, namedEntity: str, detail: str):
        """Append one token and its annotations, keeping the lists parallel."""
        self.word.append(word)
        self.pos.append(pos)
        self.namedEntity.append(namedEntity)
        self.detail.append(detail)


T = TypeVar('T')
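
# Usage sketch (hypothetical data): a Sentence holds four parallel lists,
# and append() keeps them in lockstep, one entry per token.
#
#   s = Sentence([], [], [], [])
#   s.append("Seoul", "NNP", "B", "B-LOC")
#   s.append("is", "VBZ", "O", "O")
#   s.word    # ["Seoul", "is"]
#   s.detail  # ["B-LOC", "O"]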


def readDataList(lst: Iterable[str], sep: str = "\t") -> Iterator[List[List[str]]]:
    """Group lines into records separated by blank lines; each non-blank
    line is split on `sep`."""
    ret: List[List[str]] = []
    for l in lst:
        l = l.strip()
        if l == "":
            yield ret
            # Hand out a fresh list instead of clearing the yielded one,
            # so callers that keep a reference are not affected.
            ret = []
        else:
            ret.append(l.split(sep))
    if ret:
        # Flush the last record when the input lacks a trailing blank line.
        yield ret
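
# Example (hypothetical input, a minimal sketch of the record grouping):
#
#   lines = ["Seoul\tNNP", "is\tVBZ", "", "Hi\tUH"]
#   list(readDataList(lines))
#   # [[["Seoul", "NNP"], ["is", "VBZ"]], [["Hi", "UH"]]]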


def readKoreanData(fp: TextIOWrapper) -> List[Sentence]:
    """Read the Korean data: tab-separated rows of `word/...`, POS, NE, and
    NE-detail columns; only the part of column 0 before "/" is kept."""
    ret = []
    # NOTE(monoid): Do not use csv reader.
    for lines in readDataList(fp):
        sentence = Sentence([], [], [], [])
        for line in lines:
            word_pos: str = line[0]
            words = word_pos.split("/")
            sentence.append(words[0], line[1], line[2], line[3])
        ret.append(sentence)
    return ret
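
# Sketch of one Korean data row this parser expects (hypothetical values):
#
#   "서울/NNP\tNNP\tB\tB-LOC"
#   # -> word "서울", pos "NNP", namedEntity "B", detail "B-LOC"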


def readEnglishData(fp: TextIOWrapper) -> List[Sentence]:
    """Read the English data: space-separated, CoNLL-2003-style columns."""
    ret = []
    for lines in readDataList(fp, sep=" "):
        # Skip document boundary markers.
        if len(lines) == 1 and lines[0][0] == "-DOCSTART-":
            continue
        sentence = Sentence([], [], [], [])
        for line in lines:
            sentence.append(line[0], line[1], line[2], line[3])
        ret.append(sentence)
    return ret
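
# Sketch of one English data row (hypothetical values, assuming CoNLL-2003
# column order word/POS/chunk/NE):
#
#   "Seoul NNP B-NP B-LOC"
#   # -> word "Seoul", pos "NNP", namedEntity "B-NP", detail "B-LOC"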


def readKoreanDataAll():
    """Read the whole Korean dataset.

    Each entry is a Sentence of parallel (word, POS, named entity,
    NE detail) lists.

    Return: (train, dev, test) tuple
    """
    with open(f"{KoreanBase}/dev.txt", encoding="utf-8") as fp:
        dev = readKoreanData(fp)
    with open(f"{KoreanBase}/test.txt", encoding="utf-8") as fp:
        test = readKoreanData(fp)
    with open(f"{KoreanBase}/train.txt", encoding="utf-8") as fp:
        train = readKoreanData(fp)
    return train, dev, test


def readEnglishDataAll():
    """Read the whole English dataset; the dev split is `valid.txt`.

    Return: (train, dev, test) tuple
    """
    with open(f"{EnglishBase}/valid.txt", encoding="utf-8") as fp:
        dev = readEnglishData(fp)
    with open(f"{EnglishBase}/test.txt", encoding="utf-8") as fp:
        test = readEnglishData(fp)
    with open(f"{EnglishBase}/train.txt", encoding="utf-8") as fp:
        train = readEnglishData(fp)
    return train, dev, test


class TagIdConverter:
    """Bidirectional mapping between tag names and integer ids, loaded from
    a JSON file of {"name": ..., "index": ...} entries."""

    def __init__(self, dict_path="tags.json") -> None:
        with open(dict_path, "r", encoding="utf-8") as fp:
            data = json.load(fp)
        self.vocab = {}
        self.ids_to_token = {}
        for item in data:
            self.vocab[item["name"]] = item["index"]
            self.ids_to_token[item["index"]] = item["name"]

    @property
    def O_id(self):
        return self.vocab["O"]

    @property
    def pad_id(self):
        return self.vocab["[PAD]"]

    def convert_ids_to_tokens(self, ids: List[int]):
        return [self.ids_to_token[i] for i in ids]

    def convert_tokens_to_ids(self, tokens: List[str]):
        return [self.vocab[t] for t in tokens]
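
# Usage sketch (assuming a tags.json like the one the __main__ block below
# builds, e.g. [{"name": "[PAD]", "index": 0}, {"name": "B-LOC", "index": 1}]):
#
#   conv = TagIdConverter("tags.json")
#   conv.convert_tokens_to_ids(["B-LOC", "[PAD]"])  # [1, 0]
#   conv.convert_ids_to_tokens([1, 0])              # ["B-LOC", "[PAD]"]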


class MatchState(enum.IntEnum):
    """Alignment state between an original token and tokenizer subwords."""
    MATCH = 0  # token equals the current subword
    BEGIN = 1  # token starts a run of several subwords
    INTER = 2  # token continues inside a run of subwords


def match_indexes(a: Iterable[str], b: Iterable[str]) -> Iterator[Tuple[MatchState, str, str]]:
    """Align tokens `a` with subword tokens `b` (WordPiece-style "##" pieces),
    yielding (state, token, subword) triples."""
    s = iter(b)
    try:
        v = next(s)
    except StopIteration:
        return
    for k in a:
        try:
            if k == v:
                yield MatchState.MATCH, k, v
                v = next(s)
            else:
                yield MatchState.BEGIN, k, v
                cum = v
                while True:
                    v = next(s)
                    # Strip the "##" continuation marker and accumulate
                    # until the pieces add back up to the original token.
                    cum += v.strip("#")
                    yield MatchState.INTER, k, v
                    if k == cum:
                        v = next(s)
                        break
        except StopIteration:
            break
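
# Example (hypothetical tokens, a minimal sketch of the alignment):
#
#   list(match_indexes(["playing", "go"], ["play", "##ing", "go"]))
#   # [(BEGIN, "playing", "play"), (INTER, "playing", "##ing"),
#   #  (MATCH, "go", "go")]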


def make_long_namedEntity(a: Iterable[str], b: Iterable[str], c: Iterable[str]):
    """Expand the word-level BIO tags `c` onto the subword tokens `b`, using
    the alignment of the original tokens `a` against `b`."""
    it = iter(c)
    ret = []
    entityType = ""
    o = False
    for s, _, _ in match_indexes(a, b):
        try:
            if s == MatchState.MATCH:
                v = next(it)
                ret.append(v)
            elif s == MatchState.BEGIN:
                v = next(it)
                ret.append(v)
                if v == "O":
                    o = True
                else:
                    # Remember the entity type so continuation subwords
                    # get tagged I-<type>.
                    vv = v.split("-")
                    entityType = vv[1]
                    o = False
            elif s == MatchState.INTER:
                if o:
                    ret.append("O")
                else:
                    ret.append(f"I-{entityType}")
        except StopIteration:
            break
    return ret
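
# Example (hypothetical tokens and tags, a minimal sketch):
#
#   make_long_namedEntity(["Seoul", "shining"],
#                         ["Seoul", "shin", "##ing"],
#                         ["B-LOC", "O"])
#   # ["B-LOC", "O", "O"]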


# Extracts and stores the tag set from the given data when run as a script.
if __name__ == "__main__":
    from tqdm import tqdm

    t = TagIdConverter()

    train, dev, test = readEnglishDataAll()
    vocab = set()

    def getTags(lst: List[Sentence]):
        """Collect every detailed NE tag that occurs in `lst` into `vocab`."""
        for s in tqdm(lst):
            for e in s.detail:
                vocab.add(e)

    print("get tags from train...")
    getTags(train)
    print("get tags from dev...")
    getTags(dev)
    print("get tags from test...")
    getTags(test)
    print(vocab)

    # Ensure every B-/I- tag has its counterpart, adding any missing pair.
    # Iterate over a snapshot: mutating a set while iterating it raises
    # RuntimeError.
    for v in vocab.copy():
        if v == "O":
            continue
        s = v.split("-")
        s[0] = {"B": "I", "I": "B"}[s[0]]
        v = "-".join(s)
        if v not in vocab:
            print("could not find pair", v)
            vocab.add(v)

    # Assign stable indexes: [PAD] first, then the sorted tags.
    tags = [{"name": "[PAD]", "index": 0}]
    for i, v in enumerate(sorted(vocab), start=1):
        tags.append({"name": v, "index": i})
    print(tags)
    # with open("tags.json", "w", encoding="utf-8") as fp:
    #     json.dump(tags, fp, ensure_ascii=False, indent=2)