Stock/db.py

196 lines
6.6 KiB
Python
Raw Permalink Normal View History

2022-05-25 03:16:39 +09:00
import sqlite3
import datetime
import pandas as pd
import krx
import os
import argparse
import enum
import json
import tqdm
CREATE_STOCK_STATEMENT = """
CREATE TABLE "STOCK" (
"Code" TEXT,
"Date" TEXT,
"Close" INTEGER NOT NULL,
"Diff" INTEGER NOT NULL,
"Open" INTEGER NOT NULL,
"High" INTEGER NOT NULL,
"Low" INTEGER NOT NULL,
"Volume" INTEGER NOT NULL,
PRIMARY KEY("Date","Code")
);"""
@enum.unique
class STOCK_INDEX(enum.IntEnum):
CODE = 0
DATE = 1
CLOSE = 2
DIFF = 3
OPEN = 4
HIGH = 5
LOW = 6
VOLUME = 7
CREATE_KRXCorp_STATEMENT = """
CREATE TABLE "KRXCorp" (
"Name" TEXT,
"Code" TEXT,
"Sector" TEXT,
"Product" TEXT,
"ListingDay" TEXT,
"ClosingMonth" TEXT,
"Representative" TEXT,
"Homepage" TEXT,
"AddressArea" TEXT,
"LastUpdate" TEXT,
PRIMARY KEY("Code")
);
"""
CREATE_UNIQUE_Stock_INDEX_Statement = """
CREATE UNIQUE INDEX "STOCK_INDEX" ON "STOCK" (
"Code",
"Date"
)
"""
def create_table(nday:int = 90):
"""
Create table for stock data
:param nday: determine lastest crolling day.
"""
print("initialize table...")
with sqlite3.connect("stock.db") as db:
db.execute(CREATE_STOCK_STATEMENT)
db.execute(CREATE_KRXCorp_STATEMENT)
db.execute(CREATE_UNIQUE_Stock_INDEX_Statement)
code_df: pd.DataFrame = krx.load_from_krx_server()
code_df.to_csv("krx.csv")
print("compelete to download from krx")
code_df = code_df.rename(columns={'회사명': 'name', '종목코드': 'code'})
code_df_size = len(code_df)
last_updated = (datetime.date.today() - datetime.timedelta(nday)).isoformat()
for i,row in code_df.iterrows():
code = row["code"]
print(f"{code_df_size}/{i+1} : code { code }",end="\r")
db.execute("""
INSERT INTO "KRXCorp" (Name,Code,Sector,Product,ListingDay,ClosingMonth,Representative,Homepage,AddressArea,LastUpdate) VALUES (
?,?,?,?,?,?,?,?,?,?)
""",(row["name"],row["code"],row["업종"],row["주요제품"],row["상장일"],row["결산월"],row["대표자명"],row["홈페이지"],row["지역"]
, last_updated))
print("\nComplete!")
db.commit()
def update_krx(nday:int = 90):
print("update krx...")
with sqlite3.connect("stock.db") as db:
if os.path.exists("krx.csv"):
code_df = pd.read_csv("krx.csv", index_col=0, dtype=str)
else:
code_df: pd.DataFrame = krx.load_from_krx_server()
code_df.to_csv("krx.csv")
print("compelete to download from krx")
code_df = code_df.rename(columns={'회사명': 'name', '종목코드': 'code'})
pbar = tqdm.tqdm(code_df.iterrows(), total=len(code_df))
for _,row in pbar:
code:str = row["code"]
name:str = row["name"]
pbar.set_description(f"{ code }")
q = db.execute("SELECT COUNT(*) FROM KRXCorp Where Code = ?",[code])
a = q.fetchone()[0]
lastUpdate = (datetime.date.today() - datetime.timedelta(nday)).isoformat()
if a > 0:
db.execute("""
UPDATE "KRXCorp" Set Name = ?,
Sector = ?,
Product = ?,
ListingDay = ?,
ClosingMonth = ?,
Representative = ?,
Homepage = ?,
2023-07-22 16:10:40 +09:00
AddressArea = ?
2022-05-25 03:16:39 +09:00
WHERE Code = ?;
""",(row["name"],row["업종"],row["주요제품"],row["상장일"],row["결산월"],row["대표자명"],row["홈페이지"],row["지역"],code
))
else:
db.execute("""
INSERT INTO "KRXCorp" (Name,Code,Sector,Product,ListingDay,ClosingMonth,Representative,Homepage,AddressArea,LastUpdate) VALUES (
?,?,?,?,?,?,?,?,?,?)
""",(row["name"],row["code"],row["업종"],row["주요제품"],row["상장일"],row["결산월"],row["대표자명"],row["홈페이지"],row["지역"],lastUpdate
))
print("\nComplete!")
db.commit()
def get_data_from_krx(db,stock_code):
cursor = db.execute("""SELECT * FROM KRXCorp WHERE code = ?""", [stock_code])
return cursor.fetchone()
class KRXCorp:
__slots__ = ("Name","Code","Sector","Product","ListingDay",
"ClosingMonth","Representative","Homepage","AddressArea","LastUpdate")
def __init__(self,**kwargs):
super().__init__()
self.Name = kwargs["Name"]
self.Code = kwargs["Code"]
self.Sector = kwargs["Sector"]
self.Product = kwargs["Product"]
self.ListingDay = kwargs["ListingDay"]
self.ClosingMonth = kwargs["ClosingMonth"]
self.Representative = kwargs["Representative"]
self.Homepage = kwargs["Homepage"]
self.AddressArea = kwargs["AddressArea"]
self.LastUpdate = kwargs["LastUpdate"]
@classmethod
def from_db(cls,arr):
return cls(Name=arr[0], Code = arr[1],Sector =arr[2],Product =arr[3],
ListingDay =arr[4],ClosingMonth =arr[5],Representative= arr[6],
Homepage = arr[7],AddressArea = arr[8],LastUpdate = arr[9])
def __repr__(self):
return f"{{{self.Name}: {self.Code}}}"
def toDict(self):
return {
"Name": self.Name,
"Code": self.Code,
"Sector": self.Sector,
"Product": self.Product,
"ListingDay": self.ListingDay,
"ClosingMonth": self.ClosingMonth,
"Representative": self.Representative,
"Homepage": self.Homepage,
"AddressArea": self.AddressArea,
"LastUpdate": self.LastUpdate
}
def GetAllKRXCorp(db):
return [KRXCorp.from_db(c) for c in db.execute("""SELECT * From KRXCorp""")]
def GetAllStockCode(db):
return [c[0] for c in db.execute("""SELECT Code From KRXCorp""")]
parser = argparse.ArgumentParser(description="Stock DataBase")
parser.add_argument("--create",action="store_true",help="create table")
parser.add_argument("--update",action="store_true",help="update table")
parser.add_argument("--getAll",action="store_true",help="get data from krx")
if __name__ == "__main__":
args = parser.parse_args()
if args.create:
if os.path.exists("stock.db"):
print("db file already exists!")
else:
create_table()
if args.update:
update_krx()
if args.getAll:
with sqlite3.connect("stock.db") as db:
print(GetAllKRXCorp(db))