지난 포스팅까지 PostgreSQL 데이터베이스 서버를 리눅스 서버에 설치하고 기본적인 세팅 및 외부에서 접속할 수 있도록 설정을 하는 방법에 대해서 살펴 보았습니다.
이번 시간에는 업비트 웹소켓 데이터를 PostgreSQL DB에 저장하기 위해 테이블 및 인덱스를 생성하고 파이썬 프로그램을 이용해서 웹소켓 데이터를 저장하는 방법에 대해서 살펴 보겠습니다.
아직 PostgreSQL 설치를 하지 않으신 분들은 아래 포스팅을 순차적으로 참고 부탁 드립니다.
2022.02.07 - [코딩스토리/리눅스] - 오라클 리눅스 8 - PostgreSQL 13 데이터베이스 설치하기
2022.02.09 - [코딩스토리/리눅스] - 오라클 리눅스 8 - PostgreSQL 13 설정하기
2022.02.19 - [코딩스토리/리눅스] - 오라클 리눅스 8 - PostgreSQL 13 외부 접속하기
목차 - 클릭하면 이동합니다.
업비트 웹소켓 데이터 저장하기
테이블 생성하기
업비트 웹소켓 데이터를 PostgreSQL DB에 저장하기 위해서는 먼저 웹소켓 데이터를 저장할 테이블 및 인덱스를 만들어 주어야 합니다.
CREATE TABLE TICKER_DATA
(
DATETIME TIMESTAMP,
CODE VARCHAR(20),
OPENING_PRICE NUMERIC,
HIGH_PRICE NUMERIC,
LOW_PRICE NUMERIC,
TRADE_PRICE NUMERIC,
PREV_CLOSING_PRICE NUMERIC,
CHANGE VARCHAR(10),
CHANGE_PRICE NUMERIC,
SIGNED_CHANGE_PRICE NUMERIC,
CHANGE_RATE NUMERIC,
SIGNED_CHANGE_RATE NUMERIC,
TRADE_VOLUME NUMERIC,
ACC_TRADE_VOLUME NUMERIC,
ACC_TRADE_VOLUME_24H NUMERIC,
ACC_TRADE_PRICE NUMERIC,
ACC_TRADE_PRICE_24H NUMERIC,
TRADE_DATE VARCHAR(10),
TRADE_TIME VARCHAR(10),
TRADE_TIMESTAMP NUMERIC,
ASK_BID VARCHAR(10),
ACC_ASK_VOLUME NUMERIC,
ACC_BID_VOLUME NUMERIC,
HIGHEST_52_WEEK_PRICE NUMERIC,
HIGHEST_52_WEEK_DATE VARCHAR(10),
LOWEST_52_WEEK_PRICE NUMERIC,
LOWEST_52_WEEK_DATE VARCHAR(10),
MARKET_STATE VARCHAR(20),
IS_TRADING_SUSPENDED VARCHAR(10),
DELISTING_DATE TIMESTAMP,
MARKET_WARNING VARCHAR(10),
TIMESTAMP NUMERIC,
STREAM_TYPE VARCHAR(10),
SYS_DATETIME TIMESTAMP
)
테이블을 생성할때 컬럼 타입이 오라클과는 약간 다르긴 하지만 위와 같이 거의 비슷한 문법을 사용하여 테이블을 생성할 수 있습니다. 기본적으로 텍스트 형태는 VARCHAR, 숫자형태는 NUMERIC, 그리고 날짜 형식은 TIMESTAMP 형식으로 컬럼을 선언하면 됩니다.
인덱스 생성
CREATE INDEX IDX_TD_DT ON TICKER_DATA(DATETIME);
CREATE INDEX IDX_TD_CD ON TICKER_DATA(CODE);
CREATE INDEX IDX_TD_DT_CD ON TICKER_DATA(DATETIME, CODE);
CREATE INDEX IDX_TD_CD_DT ON TICKER_DATA(CODE, DATETIME);
인덱스는 추후 대량 데이터를 검색할 때 속도에 많은 차이를 가져오게 됩니다. 지금은 기본적으로 날짜와 코드에 대한 기본적인 인덱스만 생성하고 추후 테이블의 사용용도가 늘어나게 되면 그 때 플랜을 점검하면서 추가하거나 수정하는 것으로 하겠습니다.
웹소켓 데이터 저장 프로그램 - save_ticker_pg.py
import os
import sys
import time
import json
import datetime
import asyncio
import logging
import traceback
import websockets
import psycopg2
# 실행 환경에 따른 공통 모듈 Import
sys.path.append(os.path.dirname(os.path.dirname(__file__)))
from module import upbit
# 프로그램 정보
pgm_name = 'save_ticker_pg'
pgm_name_kr = '업비트 TICKER 웹소켓 데이터 저장(PostgreSQL)'
# -----------------------------------------------------------------------------
# - Name : get_subscribe_items
# - Desc : 구독 대상 종목 조회
# -----------------------------------------------------------------------------
def get_subscribe_items():
try:
subscribe_items = []
# KRW 마켓 전 종목 추출
items = upbit.get_items('KRW', '')
# 종목코드 배열로 변환
for item in items:
subscribe_items.append(item['market'])
return subscribe_items
# ---------------------------------------
# Exception 처리
# ----------------------------------------
except Exception:
raise
# -----------------------------------------------------------------------------
# - Name : upbit_ws_client
# - Desc : 업비트 웹소켓
# -----------------------------------------------------------------------------
async def upbit_ws_client():
try:
# 처리 Count 용
data_cnt = 0
# 중복 실행 방지용
seconds = 0
# 종목별 시퀀스
item_seq = {}
# 구독 데이터 조회
subscribe_items = get_subscribe_items()
logging.info('구독 종목 개수 : ' + str(len(subscribe_items)))
logging.info('구독 종목 : ' + str(subscribe_items))
# 구독 데이터 조립
subscribe_fmt = [
{"ticket": "test-websocket"},
{
"type": "ticker",
"codes": subscribe_items,
"isOnlyRealtime": True
},
{"format": "SIMPLE"}
]
subscribe_data = json.dumps(subscribe_fmt)
# PostgreSQL 데이터 베이스 연결
conn = psycopg2.connect(host=upbit.get_env_keyvalue('pg_host')
,dbname=upbit.get_env_keyvalue('pg_dbname')
,user=upbit.get_env_keyvalue('pg_userid')
,password=upbit.get_env_keyvalue('pg_passwd')
,port=upbit.get_env_keyvalue('pg_port'))
# 자동 커밋
conn.autocommit = True
# 커서 획득
cur = conn.cursor()
# INSERT SQL 준비
sql = "INSERT INTO TICKER_DATA (DATETIME,CODE,OPENING_PRICE,HIGH_PRICE,LOW_PRICE,TRADE_PRICE,PREV_CLOSING_PRICE,CHANGE,CHANGE_PRICE,SIGNED_CHANGE_PRICE \
,CHANGE_RATE,SIGNED_CHANGE_RATE,TRADE_VOLUME,ACC_TRADE_VOLUME,ACC_TRADE_VOLUME_24H,ACC_TRADE_PRICE,ACC_TRADE_PRICE_24H,TRADE_DATE,TRADE_TIME,TRADE_TIMESTAMP \
,ASK_BID,ACC_ASK_VOLUME,ACC_BID_VOLUME,HIGHEST_52_WEEK_PRICE,HIGHEST_52_WEEK_DATE,LOWEST_52_WEEK_PRICE,LOWEST_52_WEEK_DATE,MARKET_STATE,IS_TRADING_SUSPENDED,DELISTING_DATE \
,MARKET_WARNING,TIMESTAMP,STREAM_TYPE,SYS_DATETIME) \
VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,CURRENT_TIMESTAMP);"
async with websockets.connect(upbit.ws_url) as websocket:
await websocket.send(subscribe_data)
while True:
# 5초 단위 종목 개수 변동여부 확인 용
period = datetime.datetime.now()
data = await websocket.recv()
data = json.loads(data)
# 저장용 데이터 조립
args = (datetime.datetime.fromtimestamp(int(data['tms']/1000)), data['cd'], data['op'], data['hp'], data['lp'], data['tp'],data['pcp'], data['c'], data['cp'], data['scp']
,data['cr'], data['scr'], data['tv'], data['atv'], data['atv24h'], data['atp'], data['atp24h'], data['tdt'], data['ttm'], data['ttms']
,data['ab'], data['aav'], data['abv'], data['h52wp'], data['h52wdt'], data['l52wp'], data['l52wdt'], data['ms'], data['its'], data['dd']
,data['mw'], data['tms'], data['st'])
# 데이터 저장
cur.execute(sql, args)
# 데이터 저장 로깅
data_cnt = data_cnt + 1
if data_cnt % 1000 == 0:
logging.info("[" + str(datetime.datetime.now()) + "] [TICKER_DATA] Inserting Data....[" + str(data_cnt) + "]")
# 5초마다 종목 정보 재 조회 후 추가된 종목이 있으면 웹소켓 다시 시작
if (period.second % 5) == 0 and seconds != period.second:
# 중복 실행 방지
seconds = period.second
# 종목 재조회
re_subscribe_items = get_subscribe_items()
# 현재 종목과 다르면 웹소켓 다시 시작
if subscribe_items != re_subscribe_items:
logging.info('종목 달리짐! 웹소켓 다시 시작')
logging.info('\n\n')
logging.info('*************************************************')
logging.info('기존 종목[' + str(len(subscribe_items)) + '] : ' + str(subscribe_items))
logging.info('종목 재조회[' + str(len(re_subscribe_items)) + '] : ' + str(re_subscribe_items))
logging.info('*************************************************')
logging.info('\n\n')
await websocket.close()
time.sleep(1)
await upbit_ws_client()
# ----------------------------------------
# 모든 함수의 공통 부분(Exception 처리)
# ----------------------------------------
except Exception as e:
logging.error('Exception Raised!')
logging.error(e)
logging.error('Connect Again!')
# 웹소켓 다시 시작
await upbit_ws_client()
# -----------------------------------------------------------------------------
# - Name : main
# - Desc : 메인
# -----------------------------------------------------------------------------
async def main():
try:
# 웹소켓 시작
await upbit_ws_client()
except Exception as e:
logging.error('Exception Raised!')
logging.error(e)
# -----------------------------------------------------------------------------
# - Name : main
# - Desc : 메인
# -----------------------------------------------------------------------------
if __name__ == "__main__":
# noinspection PyBroadException
try:
print("***** USAGE ******")
print("[1] 로그레벨(D:DEBUG, E:ERROR, 그외:INFO)")
if sys.platform.startswith('win32'):
# 로그레벨(D:DEBUG, E:ERROR, 그외:INFO)
log_level = 'I'
upbit.set_loglevel(log_level)
else:
# 로그레벨(D:DEBUG, E:ERROR, 그외:INFO)
log_level = sys.argv[1].upper()
upbit.set_loglevel(log_level)
if log_level == '':
logging.error("입력값 오류!")
sys.exit(-1)
logging.info("***** INPUT ******")
logging.info("[1] 로그레벨(D:DEBUG, E:ERROR, 그외:INFO):" + str(log_level))
# ---------------------------------------------------------------------
# Logic Start!
# ---------------------------------------------------------------------
# 웹소켓 시작
asyncio.run(main())
except KeyboardInterrupt:
logging.error("KeyboardInterrupt Exception 발생!")
logging.error(traceback.format_exc())
sys.exit(-100)
except Exception:
logging.error("Exception 발생!")
logging.error(traceback.format_exc())
sys.exit(-200)
파이썬의 websocket을 이용하여 업비트의 현재가(Ticker) 정보를 구독하여 실시간으로 데이터를 수신하며 데이터가 수신될때마다 DB에 저장하는 로직입니다.
import psycopg2
파이썬에서 PostgreSQL DB에 접속하여 SQL을 수행하려면 위의 모듈이 필요합니다.
python -m pip install --upgrade pip
python -m pip install psycopg2-binary
만약 pip를 이용하여 위의 모듈을 설치할 때 오류가 발생한다면 위의 명령어를 사용하여 PIP 모듈을 최신으로 업그레이드 한 후 psycopg2 대신 psycopg2-binary를 설치해 보시기 바랍니다.
# PostgreSQL 데이터 베이스 연결
conn = psycopg2.connect(host=upbit.get_env_keyvalue('pg_host')
,dbname=upbit.get_env_keyvalue('pg_dbname')
,user=upbit.get_env_keyvalue('pg_userid')
,password=upbit.get_env_keyvalue('pg_passwd')
,port=upbit.get_env_keyvalue('pg_port'))
파이썬에서 psycopg2 모듈을 이용해 DB에 접속하는 부분입니다. 접속 정보는 노출되지 않도록 별도 파일(env.txt)에 보관하고 원하는 곳에서 파일을 읽어서 가져 오는 것이 좋습니다. 해당 방법은 아래 포스팅을 참고 부탁 드립니다.
2022.01.25 - [프로젝트/비트코인 자동매매] - 비트코인 자동매매 프로그램 환경변수 파일로 빼기 - 보안 강화
그 밖에도 새로운 종목이 상장되거나 폐지 될 가능성이 있어 5초 마다 구독 종목 개수를 비교하여 갱신하도록 구현되어 있으며 PostgreSQL의 AutoCommit 기능을 활성화하여 별도의 커밋 명령어 없이 데이터를 저장하도록 했습니다. 프로그램의 로직에 관련하여 궁금한 점이 있으시면 댓글로 문의 부탁 드리겠습니다.
프로그램에 사용된 공통 모듈은 위의 파일을 다운로드 받아 사용하시면 됩니다.
프로그램 실행
python save_ticker_pg.py I
프로그램을 구동하면 데이터가 DB에 저장되기 시작합니다.
마치며
과거에 비슷한 내용으로 오라클 DB에 데이터를 저장하는 방법을 포스팅한 적이 있습니다. 하지만 오라클 DB를 제약 없이 사용하기 위해서는 비용이 많이 발생하기 때문에 무료로 사용할 수 있는 PostgreSQL DB를 사용하기로 결정 했습니다.
다만 DB는 무료로 사용이 가능하다 하더라도 프리티어 무료 서버의 저장공간이 한정되어 있기 때문에 이 부분은 어쩔 수 없이 고려해야 할 것 같습니다.
DB에 저장을 시작한 현 시점(2022-02-23 06:35)에 89G의 저장공간 중 82G가 남아있는 상태이고 PostgreSQL 데이터는 68M 정도 쌓인 상태입니다. 하루에 어느정도의 로우가 쌓이는지, 용량은 얼마나 차지하는지 등을 분석해 보면 보유 가능한 데이터의 양을 계산할 수 있고 해당 내용을 기반으로 정기적으로 데이터를 정리해 주면 계속해서 무료로 사용이 가능할 것이라 판단 됩니다.
추후 용량을 모니터링 해 보고 PostgreSQL에서 데이터를 정리하고 Table Space의 용량을 reclaim하는 방법인 Vaccum을 진행하는 방법에 대해서도 살펴볼 수 있도록 하겠습니다.
블로그를 구독하시면 소식을 조금 더 빨리 받아보실 수 있습니다. 감사합니다.
'프로젝트 > 비트코인 자동매매' 카테고리의 다른 글
비트코인 자동매매 프로그램 만들기 시즌2 안내 (1) | 2024.04.30 |
---|---|
급등주 찾기 쿼리 - PostgreSQL 버전 (4) | 2022.05.03 |
비트코인 자동매매 프로그램 자주 발생하는 오류 및 대처 방법 (30) | 2022.03.12 |
업비트 공지사항 크롤링하여 텔레그램으로 메세지 알림받기 (21) | 2022.01.29 |
급등주 찾기 - 업비트 파이썬 비트코인 자동매매 프로그램 (19) | 2022.01.26 |
비트코인 자동매매 프로그램 환경변수 파일로 빼기 - 보안 강화 (5) | 2022.01.25 |