196 lines
4.9 KiB
Python
196 lines
4.9 KiB
Python
import enum
import json
from typing import Iterator, List, NamedTuple, Sequence, Tuple, TypeVar
|
|
|
|
# Directory prefix under which readKoreanDataAll looks for
# train.txt, dev.txt and test.txt.
KoreanBase = "[Ko, En] NER, POStag data/국문 NER, POS"
# Sibling directory prefix for the English data; nothing in the visible
# part of this file reads it.
EnglishBase = "[Ko, En] NER, POStag data/영문 NER, POS"
|
|
|
|
class Sentence(NamedTuple):
    """One sentence stored column-wise as four lists that grow in step.

    The tuple itself is immutable, but the lists it holds are mutated in
    place by ``append``.
    """

    # Word
    word: List[str]
    # POS
    pos: List[str]
    # Named Entity
    namedEntity: List[str]
    # Named Entity Detail
    detail: List[str]

    def append(self, word, pos, namedEntity, detail):
        """Push one entry onto the end of each of the four columns."""
        # Iterating a NamedTuple yields its fields in declaration order,
        # which is the same order as the arguments.
        for column, value in zip(self, (word, pos, namedEntity, detail)):
            column.append(value)
|
|
T = TypeVar('T')


def readDataList(lst: List[T]) -> Iterator[List[T]]:
    """Split ``lst`` into groups, treating every empty item as a separator.

    Items with ``len(item) > 0`` are collected into the current group.
    Each empty item closes the current group and yields it, so two
    separators in a row yield an empty group.

    Every yielded group is a fresh list, so callers may keep the groups
    (e.g. ``list(readDataList(rows))``) without them being emptied later.
    Items left over after the last separator are yielded as a final group
    instead of being dropped.

    :param lst: sized items (strings, lists, ...) to be grouped.
    :return: generator over the groups, in input order.
    """
    group: List[T] = []
    for item in lst:
        if len(item) > 0:
            group.append(item)
        else:
            yield group
            # Start a new list rather than clearing the one just handed out.
            group = []
    # Input that does not end with a separator still has a pending group.
    if group:
        yield group
|
|
|
|
def readKoreanData(path: str) -> List[Sentence]:
    """Parse a tab-separated corpus file into a list of ``Sentence`` objects.

    Each non-blank line is stripped and split on tabs; blank lines act as
    sentence separators. Grouping of rows into sentences is delegated to
    ``readDataList``. For every row, the part of column 0 before the first
    ``/`` becomes the word, and columns 1, 2 and 3 become ``pos``,
    ``namedEntity`` and ``detail`` respectively.

    :param path: path of the UTF-8 encoded file to read.
    :return: the sentences in file order.
    :raises OSError: if the file cannot be opened.
    :raises IndexError: if a row has fewer than four tab-separated columns.
    """
    rows = []
    # The context manager closes the file even if reading raises.
    with open(path, encoding="utf-8") as fp:
        for raw_line in fp:
            stripped = raw_line.strip()
            # "".split("\t") would give [""] (length 1), so blank lines are
            # mapped to [] explicitly to be recognised as separators.
            rows.append(stripped.split("\t") if stripped else [])

    # Do not use csv reader.
    ret = []
    for lines in readDataList(rows):
        sentence = Sentence([], [], [], [])
        for line in lines:
            word_pos: str = line[0]
            words = word_pos.split("/")
            sentence.append(words[0], line[1], line[2], line[3])
        ret.append(sentence)
    return ret
|
|
|
|
|
|
def readKoreanDataAll():
    """Load the dev, test and train files located under ``KoreanBase``.

    @return train, dev, test tuple

    Each element of the tuple is whatever ``readKoreanData`` returns for
    the corresponding ``.txt`` file.
    """
    # The files are read in the order dev, test, train; only the order of
    # the returned tuple is (train, dev, test).
    loaded = [
        readKoreanData(f"{KoreanBase}/dev.txt"),
        readKoreanData(f"{KoreanBase}/test.txt"),
        readKoreanData(f"{KoreanBase}/train.txt"),
    ]
    dev_split, test_split, train_split = loaded
    return train_split, dev_split, test_split
|
|
|
|
class TagIdConverter:
    """Two-way lookup between tag names and integer ids loaded from JSON.

    The JSON file must contain a list of objects, each with a ``"name"``
    and an ``"index"`` key. ``vocab`` maps name -> index and
    ``ids_to_token`` maps index -> name.
    """

    def __init__(self, dict_path = "tags.json") -> None:
        """Read the tag table from ``dict_path``.

        :param dict_path: path of the UTF-8 encoded JSON tag file.
        :raises OSError: if the file cannot be opened for reading.
        :raises KeyError: if an entry lacks ``"name"`` or ``"index"``.
        """
        # The file is only read, so open it read-only; "r+" would fail on
        # files the process is not allowed to write.
        with open(dict_path, "r", encoding="utf-8") as fp:
            data = json.load(fp)
        self.vocab = {item["name"]: item["index"] for item in data}
        self.ids_to_token = {item["index"]: item["name"] for item in data}

    @property
    def O_id(self):
        """Id of the ``"O"`` tag; raises KeyError if the table has none."""
        return self.vocab["O"]

    @property
    def pad_id(self):
        """Id of the ``"[PAD]"`` tag; raises KeyError if the table has none."""
        return self.vocab["[PAD]"]

    def convert_ids_to_tokens(self, ids: List[int]):
        """Return the tag name for every id in ``ids`` (KeyError if unknown)."""
        return [self.ids_to_token[tag_id] for tag_id in ids]

    def convert_tokens_to_ids(self, tokens: List[str]):
        """Return the id for every tag name in ``tokens`` (KeyError if unknown)."""
        return [self.vocab[tt] for tt in tokens]
|
|
|
|
class MatchState(enum.IntEnum):
    """Tag carried by every tuple that ``match_indexes`` yields."""

    MATCH = 0  # the piece is equal to the whole word
    BEGIN = 1  # first piece of a word that did not match as a whole
    INTER = 2  # any further piece consumed for that same word


def match_indexes(a, b) -> Iterator[Tuple[MatchState, str, str]]:
    """Align the words in ``a`` with the (sub-word) pieces in ``b``.

    Yields ``(state, word, piece)`` once per consumed piece:

    * ``MATCH`` when the current piece equals the current word;
    * ``BEGIN`` for the first piece of a word that differs from it,
      followed by ``INTER`` for each further piece until the pieces,
      concatenated, equal the word.

    A leading ``"##"`` on a continuation piece is removed before
    concatenation. NOTE(review): this assumes WordPiece-style ``##``
    continuation markers -- confirm against the tokenizer in use. Only
    that prefix is removed, so ``#`` characters that belong to the word
    itself still take part in the comparison.

    Generation stops as soon as ``b`` is exhausted. If the pieces never add
    up to the word, all remaining pieces are reported as ``INTER`` for it.

    :param a: iterable of whole words.
    :param b: iterable of pieces covering the same text.
    """
    pieces = iter(b)
    try:
        piece = next(pieces)
    except StopIteration:
        # No pieces at all: nothing to align.
        return
    for word in a:
        try:
            if word == piece:
                yield MatchState.MATCH, word, piece
                piece = next(pieces)
            else:
                yield MatchState.BEGIN, word, piece
                joined = piece
                while True:
                    piece = next(pieces)
                    # Drop only the continuation prefix, never other '#'.
                    joined += piece[2:] if piece.startswith("##") else piece
                    yield MatchState.INTER, word, piece
                    if word == joined:
                        # Word complete: pre-fetch the piece for the next word.
                        piece = next(pieces)
                        break
        except StopIteration:
            # Ran out of pieces; whatever was yielded so far is the result.
            break
|
|
|
|
def make_long_namedEntity(a,b,c):
    """Expand per-word tags ``c`` to one tag per piece of ``b``.

    ``a`` and ``b`` are aligned with ``match_indexes``. For every ``MATCH``
    or ``BEGIN`` event the next tag from ``c`` is emitted unchanged. For
    every ``INTER`` event a tag is derived from the tag that opened the
    word: ``"O"`` stays ``"O"``, anything else becomes ``"I-<type>"``,
    where ``<type>`` is everything after the first ``-`` of the opening
    tag (so types that themselves contain ``-`` are kept whole).

    :param a: iterable of whole words.
    :param b: iterable of pieces covering the same words.
    :param c: iterable of tags, one per word in ``a``.
    :return: list of tags; it ends early if ``b`` or ``c`` runs out.
    :raises IndexError: if a non-``"O"`` tag opening a multi-piece word
        contains no ``-``.
    """
    tags = iter(c)
    ret = []
    entityType = ""
    outside = False  # True while the current multi-piece word is tagged "O"
    for state, _, _ in match_indexes(a, b):
        if state == MatchState.INTER:
            # Continuation pieces do not consume a tag from ``c``.
            ret.append("O" if outside else f"I-{entityType}")
            continue
        try:
            tag = next(tags)
        except StopIteration:
            break
        ret.append(tag)
        if state == MatchState.BEGIN:
            if tag == "O":
                outside = True
            else:
                # maxsplit=1 keeps hyphenated entity types intact.
                entityType = tag.split("-", 1)[1]
                outside = False
    return ret
|
|
|
|
|
|
|
|
"""
|
|
extracts and stores tags set from the given data.
|
|
"""
|
|
if __name__ == "__main__":
|
|
from tqdm import tqdm
|
|
t = TagIdConverter()
|
|
|
|
train, dev, test = readKoreanDataAll()
|
|
vocab = set()
|
|
def getTags(lst: List[Sentence]):
|
|
for s in tqdm(lst):
|
|
for e in s.detail:
|
|
if not e in vocab:
|
|
vocab.add(e)
|
|
print("get tags from train...")
|
|
getTags(train)
|
|
print("get tags from dev...")
|
|
getTags(dev)
|
|
print("get tags from test...")
|
|
getTags(test)
|
|
print(vocab)
|
|
|
|
for v in vocab:
|
|
if v == "O":
|
|
continue
|
|
s = v.split("-")
|
|
s[0] = {"B":"I","I":"B"}[(s[0])]
|
|
v:str = "-".join(s)
|
|
if not v in vocab:
|
|
print("could not found pair " ,v)
|
|
|
|
tags = [{"name":"[PAD]","index":0}]
|
|
i = 1
|
|
vocab_list = [*vocab]
|
|
vocab_list.sort()
|
|
for v in vocab_list:
|
|
tags.append({"name":v,"index":i})
|
|
i += 1
|
|
print(tags)
|
|
with open("tags.json","w",encoding="utf-8") as fp:
|
|
json.dump(tags,fp,ensure_ascii=False, indent=2) |