import argparse import json import os import sqlite3 from typing import Dict, List from render import * import db as database from jinja2 import Environment, PackageLoader, select_autoescape import pandas as pd import tqdm class DataStore: def __init__(self) -> None: self.db = sqlite3.connect("stock.db") self.pricesCache: Dict[str,] = {} def getAllKRXCorp(self) -> List[database.KRXCorp]: return database.GetAllKRXCorp(self.db) def getStockPrice(self,code,length) -> pd.DataFrame: if code in self.pricesCache and len(self.pricesCache[code]) >= length: return self.pricesCache[code] else: s = GetStockPriceFrom(self.db,code,length) s = pd.DataFrame(s, columns=[s for s in database.STOCK_INDEX.__members__.keys()]) s.set_index("DATE", inplace=True) self.pricesCache[code] = s return self.pricesCache[code] def clearCache(self) -> None: self.pricesCache = {} def __del__(self) -> None: self.db.close() class OutputCollectorElement: def __init__(self, name: str, description: str) -> None: self.name = name self.description = description self.corpListByDate:Dict[str,database.KRXCorp] = {} def __str__(self) -> str: return f"OutputCollectorElement:{self.name}" def addCorp(self, date, corp): self.corpListByDate.setdefault(date, []).append(corp) def toDict(self) -> Dict: return { "name": self.name, "description": self.description, "corpListByDate": {k:[d.toDict() for d in v] for k,v in self.corpListByDate.items()} } class OutputCollector: def __init__(self) -> None: self.data: Dict[str,OutputCollectorElement] = {} def addResult(self, key, help = ""): """ add output category to collect """ self.data[key] = OutputCollectorElement(key, help) def collect(self, key, corp, date): self.data[key].addCorp(date, corp) def isVolumeNTimes(stock: pd.DataFrame, mul: float, nday:int, order=1) -> bool: return stock.iloc[nday]['VOLUME'] > stock.iloc[nday+order]['VOLUME'] * mul def isVolumeMulPriceGreaterThan(stock: pd.DataFrame, threshold: int, nday: int) -> bool: return stock.iloc[nday]['VOLUME'] * stock.iloc[nday]['CLOSE'] > threshold def isMACDCrossSignal(signal: pd.Series, macd: pd.Series, nday: int, order=1) -> bool: return (signal.iloc[nday] < macd.iloc[nday] and signal.iloc[nday+order] > macd.iloc[nday+order]) def isRelativeDiffLessThan(a:pd.Series,b:pd.Series, threshold: float,nday:int) -> bool: return (a.iloc[nday] - b.iloc[nday]) / b.iloc[nday] < threshold def isDiffGreaterThan(a:pd.Series,b:pd.Series, nday:int) -> bool: """a is bigger than b""" return (a.iloc[nday] > b.iloc[nday]) def prepareCollector(collector: OutputCollector) -> None: collector.addResult("cross 2", """\ 5일선과 20일선이 서로 만나는 시점 즉 상대 오차가 1% 이하이고 5일선과 60일선이 서로 만나는 시점을 찾습니다. """) collector.addResult("cross 3", """\ cross 2의 조건에서 더해서 거래량이 이전 날짜보다 3배 증가하고 100000 이상인 시점을 찾습니다. """) collector.addResult("cross 4", """\ 20일선과 60일선이 서로 만나는 시점 즉 상대 오차가 1% 이하이고 거래량이 1000000 이상인 시점을 찾습니다. """) collector.addResult("d20d5", """\ 5일선이 20선보다 큰 시점을 찾습니다. """) collector.addResult("d20d5VolumeX5", """\ d20d5의 조건에서 더해서 거래량이 이전 날짜보다 5배 증가한 시점을 찾습니다. """) collector.addResult("DiffDistance", """\ 5일선과 20일선이 서로 만나는 시점 즉 상대 오차가 3% 이하이고 5일선과 60일선이 서로 만나고 거래량이 이전 날짜보다 3배 증가한 시점을 찾습니다. """) collector.addResult("volume", """\ 거래량이 이전 날짜보다 3배 증가한 시점을 찾습니다. """) collector.addResult("volume5", """\ 거래량과 가격의 곱이 50,000,000,000 이상인 시점을 찾습니다. """) collector.addResult("volumeX5", """\ 거래량이 이전 날짜보다 5배 증가한 시점을 찾습니다. """) collector.addResult("macd", """\ signal과 macd가 서로 교차한 시점을 찾습니다. 즉 signal이 아래로 떨어지고 macd가 위로 올라가는 시점을 찾습니다. """) def collect(data: DataStore, collector: OutputCollector, corp: database.KRXCorp , nday: int) -> None: stock = data.getStockPrice(corp.Code,70) if len(stock) < 70: return d5 = stock["CLOSE"].loc[::-1].rolling(window=5 ).mean().dropna().loc[::-1] d20 = stock["CLOSE"].loc[::-1].rolling(window=20 ).mean().dropna().loc[::-1] d60 = stock["CLOSE"].loc[::-1].rolling(window=60 ).mean().dropna().loc[::-1] if (isRelativeDiffLessThan(d5, d20, 0.01, nday) and isRelativeDiffLessThan(d5, d60, 0.01, nday)): collector.collect("cross 2", corp, stock.index[nday]) if (isVolumeNTimes(stock, 3, 0) and isVolumeMulPriceGreaterThan(stock, 100000, nday)): collector.collect("cross 3", corp, stock.index[nday]) if (isRelativeDiffLessThan(d20, d60, 0.01, nday) and isVolumeMulPriceGreaterThan(stock, 1000000, nday)): collector.collect("cross 4", corp, stock.index[nday]) if (isDiffGreaterThan(d5, d20, nday)): collector.collect("d20d5", corp, stock.index[nday]) if (isVolumeNTimes(stock, 5, nday)): collector.collect("d20d5VolumeX5", corp, stock.index[nday]) if (isRelativeDiffLessThan(d5, d20, 0.03, nday) and isRelativeDiffLessThan(d5, d60, 0.03, nday) and isVolumeNTimes(stock, 3, nday)): collector.collect("DiffDistance", corp, stock.index[nday]) if (isVolumeNTimes(stock, 3, nday)): collector.collect("volume", corp, stock.index[nday]) if (isVolumeMulPriceGreaterThan(stock, 50000000, nday)): collector.collect("volume5", corp, stock.index[nday]) if (isVolumeNTimes(stock, 5, nday)): collector.collect("volumeX5", corp, stock.index[nday]) ewm12 = stock["CLOSE"].loc[::-1].ewm(span=12).mean().loc[::-1] ewm26 = stock["CLOSE"].loc[::-1].ewm(span=26).mean().loc[::-1] macd = (ewm12 - ewm26) signal = macd.ewm(span=9).mean() if (isMACDCrossSignal(macd, signal, nday)): collector.collect("macd", corp, stock.index[nday]) parser = argparse.ArgumentParser(description="주식 검색 정보를 출력합니다.") parser.add_argument("--format", "-f", choices=["json", "html"], default="html", help="출력 포맷을 지정합니다. 기본값은 html입니다.") parser.add_argument("--dir", "-d", default=".", help="출력할 폴더를 지정합니다.") parser.add_argument("--corp", "-c", help="주식 코드를 지정합니다. 지정하지 않으면 모든 주식을 검색합니다.") parser.add_argument("--printStdout", action="store_true", help="출력한 결과를 표준 출력으로 출력합니다.") parser.add_argument("--version", "-v", action="version", version="%(prog)s 1.0") parser.add_argument("--verbose", "-V", action="store_true", help="출력할 내용을 자세히 표시합니다.") if __name__ == "__main__": args = parser.parse_args() dataStore = DataStore() krx_corps = dataStore.getAllKRXCorp() if args.corp: krx_corps = [corp for corp in krx_corps if corp.Code == args.corp] env = Environment( loader=PackageLoader('render', 'templates'), autoescape=select_autoescape(['html', 'xml']) ) collector = OutputCollector() prepareCollector(collector) for corp in tqdm.tqdm(krx_corps): for nday in range(0, 5): collect(dataStore, collector, corp, nday) dataStore.clearCache() for k,v in collector.data.items(): if args.format == "json": data = json.dumps(v.toDict(), indent=4, ensure_ascii=False) if args.printStdout: print(k) print(data) else: with open(os.path.join(args.dir, k + ".json", encoding="UTF-8"), "w") as f: f.write(data) else: template = env.get_template("Lists.html") days = v.corpListByDate.keys() days = list(days) days.sort(reverse=True) days = days[:5] html = template.render(collected=v, title=k, days=days) if args.printStdout: print(html) else: with open(os.path.join(args.dir, k + ".html"), "w", encoding="UTF-8") as f: f.write(html)