프로젝트/비트코인 자동매매

업비트 웹소켓 데이터 PostgreSQL DB에 저장하기 - 파이썬 비트코인 자동매매

Tech&Fin 2022. 2. 23. 06:41
반응형

지난 포스팅까지 PostgreSQL 데이터베이스 서버를 리눅스 서버에 설치하고 기본적인 세팅 및 외부에서 접속할 수 있도록 설정을 하는 방법에 대해서 살펴 보았습니다.

 

이번 시간에는 업비트 웹소켓 데이터를 PostgreSQL DB에 저장하기 위해 테이블 및 인덱스를 생성하고 파이썬 프로그램을 이용해서 웹소켓 데이터를 저장하는 방법에 대해서 살펴 보겠습니다.

 

아직 PostgreSQL 설치를 하지 않으신 분들은 아래 포스팅을 순차적으로 참고 부탁 드립니다.

 

2022.02.07 - [코딩스토리/리눅스] - 오라클 리눅스 8 - PostgreSQL 13 데이터베이스 설치하기

 

오라클 리눅스 8 - PostgreSQL 13 데이터베이스 설치하기

이번 시간에는 앞으로 Tech&Fin에서 다루는 비트코인 자동매매 프로그램에서 사용할 PostgreSQL 데이터베이스 13버젼을 오라클 리눅스 8 환경에서 설치하는 방법에 대해서 살펴 보려고 합니다. Tech&Fin

technfin.tistory.com

 

2022.02.09 - [코딩스토리/리눅스] - 오라클 리눅스 8 - PostgreSQL 13 설정하기

 

오라클 리눅스 8 - PostgreSQL 13 설정하기

이번 시간에는 지난 포스팅에 이어 오라클 리눅스 8에 설치한 PostgreSQL의 DB를 설정하는 방법에 대해서 살펴 보겠습니다. 목차 - 클릭하면 이동합니다. PostgreSQL 설정하기 PostgreSQL을 설정하기 위해

technfin.tistory.com

 

2022.02.19 - [코딩스토리/리눅스] - 오라클 리눅스 8 - PostgreSQL 13 외부 접속하기

 

오라클 리눅스 8 - PostgreSQL 13 외부 접속하기

지난 포스팅에서 리눅스 서버에 PostgreSQL 13 버전을 설치하고 설정하는 방법에 대해서 살펴 보았습니다. 이어서 이번 시간에는 방화벽 설정등을 마무리 짓고 DBeaver를 통해 외부에서 DB에 접속하는

technfin.tistory.com

 

 

목차 - 클릭하면 이동합니다.

     

    업비트 웹소켓 데이터 저장하기

    테이블 생성하기

    업비트 웹소켓 데이터를 PostgreSQL DB에 저장하기 위해서는 먼저 웹소켓 데이터를 저장할 테이블 및 인덱스를 만들어 주어야 합니다.

     

    CREATE TABLE TICKER_DATA
    (
      DATETIME               TIMESTAMP,
      CODE                   VARCHAR(20),
      OPENING_PRICE          NUMERIC,
      HIGH_PRICE             NUMERIC,
      LOW_PRICE              NUMERIC,
      TRADE_PRICE            NUMERIC,
      PREV_CLOSING_PRICE     NUMERIC,
      CHANGE                 VARCHAR(10),
      CHANGE_PRICE           NUMERIC,
      SIGNED_CHANGE_PRICE    NUMERIC,
      CHANGE_RATE            NUMERIC,
      SIGNED_CHANGE_RATE     NUMERIC,
      TRADE_VOLUME           NUMERIC,
      ACC_TRADE_VOLUME       NUMERIC,
      ACC_TRADE_VOLUME_24H   NUMERIC,
      ACC_TRADE_PRICE        NUMERIC,
      ACC_TRADE_PRICE_24H    NUMERIC,
      TRADE_DATE             VARCHAR(10),
      TRADE_TIME             VARCHAR(10),
      TRADE_TIMESTAMP        NUMERIC,
      ASK_BID                VARCHAR(10),
      ACC_ASK_VOLUME         NUMERIC,
      ACC_BID_VOLUME         NUMERIC,
      HIGHEST_52_WEEK_PRICE  NUMERIC,
      HIGHEST_52_WEEK_DATE   VARCHAR(10),
      LOWEST_52_WEEK_PRICE   NUMERIC,
      LOWEST_52_WEEK_DATE    VARCHAR(10),
      MARKET_STATE           VARCHAR(20),
      IS_TRADING_SUSPENDED   VARCHAR(10),
      DELISTING_DATE         TIMESTAMP,
      MARKET_WARNING         VARCHAR(10),
      TIMESTAMP              NUMERIC,
      STREAM_TYPE            VARCHAR(10),
      SYS_DATETIME           TIMESTAMP 
    )

    테이블을 생성할때 컬럼 타입이 오라클과는 약간 다르긴 하지만 위와 같이 거의 비슷한 문법을 사용하여 테이블을 생성할 수 있습니다. 기본적으로 텍스트 형태는 VARCHAR, 숫자형태는 NUMERIC, 그리고 날짜 형식은 TIMESTAMP 형식으로 컬럼을 선언하면 됩니다.

     

    인덱스 생성

    CREATE INDEX IDX_TD_DT ON TICKER_DATA(DATETIME);
    CREATE INDEX IDX_TD_CD ON TICKER_DATA(CODE);
    CREATE INDEX IDX_TD_DT_CD ON TICKER_DATA(DATETIME, CODE);
    CREATE INDEX IDX_TD_CD_DT ON TICKER_DATA(CODE, DATETIME);

    인덱스는 추후 대량 데이터를 검색할 때 속도에 많은 차이를 가져오게 됩니다. 지금은 기본적으로 날짜와 코드에 대한 기본적인 인덱스만 생성하고 추후 테이블의 사용용도가 늘어나게 되면 그 때 플랜을 점검하면서 추가하거나 수정하는 것으로 하겠습니다.

     

    웹소켓 데이터 저장 프로그램 - save_ticker_pg.py

    import os
    import sys
    import time
    import json
    import datetime
    import asyncio
    import logging
    import traceback
    import websockets
    import psycopg2
    
    # 실행 환경에 따른 공통 모듈 Import
    sys.path.append(os.path.dirname(os.path.dirname(__file__)))
    from module import upbit
    
    # 프로그램 정보
    pgm_name = 'save_ticker_pg'
    pgm_name_kr = '업비트 TICKER 웹소켓 데이터 저장(PostgreSQL)'
    
    
    # -----------------------------------------------------------------------------
    # - Name : get_subscribe_items
    # - Desc : 구독 대상 종목 조회
    # -----------------------------------------------------------------------------
    def get_subscribe_items():
        try:
            subscribe_items = []
    
            # KRW 마켓 전 종목 추출
            items = upbit.get_items('KRW', '')
    
            # 종목코드 배열로 변환
            for item in items:
                subscribe_items.append(item['market'])
    
            return subscribe_items
    
        # ---------------------------------------
        # Exception 처리
        # ----------------------------------------
        except Exception:
            raise
    
    
    # -----------------------------------------------------------------------------
    # - Name : upbit_ws_client
    # - Desc : 업비트 웹소켓
    # -----------------------------------------------------------------------------
    async def upbit_ws_client():
        try:
    
            # 처리 Count 용
            data_cnt = 0
    
            # 중복 실행 방지용
            seconds = 0
    
            # 종목별 시퀀스
            item_seq = {}
    
            # 구독 데이터 조회
            subscribe_items = get_subscribe_items()
    
            logging.info('구독 종목 개수 : ' + str(len(subscribe_items)))
            logging.info('구독 종목 : ' + str(subscribe_items))
    
            # 구독 데이터 조립
            subscribe_fmt = [
                {"ticket": "test-websocket"},
                {
                    "type": "ticker",
                    "codes": subscribe_items,
                    "isOnlyRealtime": True
                },
                {"format": "SIMPLE"}
            ]
    
            subscribe_data = json.dumps(subscribe_fmt)
    
            # PostgreSQL 데이터 베이스 연결
            conn = psycopg2.connect(host=upbit.get_env_keyvalue('pg_host')
                                   ,dbname=upbit.get_env_keyvalue('pg_dbname')
                                   ,user=upbit.get_env_keyvalue('pg_userid')
                                   ,password=upbit.get_env_keyvalue('pg_passwd')
                                   ,port=upbit.get_env_keyvalue('pg_port'))
    
            # 자동 커밋
            conn.autocommit = True
    
            # 커서 획득
            cur = conn.cursor()
    
            # INSERT SQL 준비
            sql = "INSERT INTO TICKER_DATA (DATETIME,CODE,OPENING_PRICE,HIGH_PRICE,LOW_PRICE,TRADE_PRICE,PREV_CLOSING_PRICE,CHANGE,CHANGE_PRICE,SIGNED_CHANGE_PRICE \
                                           ,CHANGE_RATE,SIGNED_CHANGE_RATE,TRADE_VOLUME,ACC_TRADE_VOLUME,ACC_TRADE_VOLUME_24H,ACC_TRADE_PRICE,ACC_TRADE_PRICE_24H,TRADE_DATE,TRADE_TIME,TRADE_TIMESTAMP \
                                           ,ASK_BID,ACC_ASK_VOLUME,ACC_BID_VOLUME,HIGHEST_52_WEEK_PRICE,HIGHEST_52_WEEK_DATE,LOWEST_52_WEEK_PRICE,LOWEST_52_WEEK_DATE,MARKET_STATE,IS_TRADING_SUSPENDED,DELISTING_DATE \
                                           ,MARKET_WARNING,TIMESTAMP,STREAM_TYPE,SYS_DATETIME) \
                                    VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,CURRENT_TIMESTAMP);"
    
            async with websockets.connect(upbit.ws_url) as websocket:
    
                await websocket.send(subscribe_data)
    
                while True:
                    # 5초 단위 종목 개수 변동여부 확인 용
                    period = datetime.datetime.now()
    
                    data = await websocket.recv()
                    data = json.loads(data)
    
                    # 저장용 데이터 조립
                    args = (datetime.datetime.fromtimestamp(int(data['tms']/1000)), data['cd'], data['op'], data['hp'], data['lp'], data['tp'],data['pcp'], data['c'], data['cp'], data['scp']
                           ,data['cr'], data['scr'], data['tv'], data['atv'], data['atv24h'], data['atp'], data['atp24h'], data['tdt'], data['ttm'], data['ttms']
                           ,data['ab'], data['aav'], data['abv'], data['h52wp'], data['h52wdt'], data['l52wp'], data['l52wdt'], data['ms'], data['its'], data['dd']
                           ,data['mw'], data['tms'], data['st'])
    
                    # 데이터 저장
                    cur.execute(sql, args)
    
                    # 데이터 저장 로깅
                    data_cnt = data_cnt + 1
                    if data_cnt % 1000 == 0:
                        logging.info("[" + str(datetime.datetime.now()) + "] [TICKER_DATA] Inserting Data....[" + str(data_cnt) + "]")
    
                    # 5초마다 종목 정보 재 조회 후 추가된 종목이 있으면 웹소켓 다시 시작
                    if (period.second % 5) == 0 and seconds != period.second:
                        # 중복 실행 방지
                        seconds = period.second
    
                        # 종목 재조회
                        re_subscribe_items = get_subscribe_items()
    
                        # 현재 종목과 다르면 웹소켓 다시 시작
                        if subscribe_items != re_subscribe_items:
                            logging.info('종목 달리짐! 웹소켓 다시 시작')
                            logging.info('\n\n')
                            logging.info('*************************************************')
                            logging.info('기존 종목[' + str(len(subscribe_items)) + '] : ' + str(subscribe_items))
                            logging.info('종목 재조회[' + str(len(re_subscribe_items)) + '] : ' + str(re_subscribe_items))
                            logging.info('*************************************************')
                            logging.info('\n\n')
                            await websocket.close()
                            time.sleep(1)
                            await upbit_ws_client()
    
        # ----------------------------------------
        # 모든 함수의 공통 부분(Exception 처리)
        # ----------------------------------------
        except Exception as e:
            logging.error('Exception Raised!')
            logging.error(e)
            logging.error('Connect Again!')
    
            # 웹소켓 다시 시작
            await upbit_ws_client()
    
    
    # -----------------------------------------------------------------------------
    # - Name : main
    # - Desc : 메인
    # -----------------------------------------------------------------------------
    async def main():
        try:
            # 웹소켓 시작
            await upbit_ws_client()
    
        except Exception as e:
            logging.error('Exception Raised!')
            logging.error(e)
    
    
    # -----------------------------------------------------------------------------
    # - Name : main
    # - Desc : 메인
    # -----------------------------------------------------------------------------
    if __name__ == "__main__":
    
        # noinspection PyBroadException
        try:
            print("***** USAGE ******")
            print("[1] 로그레벨(D:DEBUG, E:ERROR, 그외:INFO)")
    
            if sys.platform.startswith('win32'):
                # 로그레벨(D:DEBUG, E:ERROR, 그외:INFO)
                log_level = 'I'
                upbit.set_loglevel(log_level)
            else:
                # 로그레벨(D:DEBUG, E:ERROR, 그외:INFO)
                log_level = sys.argv[1].upper()
                upbit.set_loglevel(log_level)
    
            if log_level == '':
                logging.error("입력값 오류!")
                sys.exit(-1)
    
            logging.info("***** INPUT ******")
            logging.info("[1] 로그레벨(D:DEBUG, E:ERROR, 그외:INFO):" + str(log_level))
    
            # ---------------------------------------------------------------------
            # Logic Start!
            # ---------------------------------------------------------------------
            # 웹소켓 시작
            asyncio.run(main())
    
        except KeyboardInterrupt:
            logging.error("KeyboardInterrupt Exception 발생!")
            logging.error(traceback.format_exc())
            sys.exit(-100)
    
        except Exception:
            logging.error("Exception 발생!")
            logging.error(traceback.format_exc())
            sys.exit(-200)

    파이썬의 websocket을 이용하여 업비트의 현재가(Ticker) 정보를 구독하여 실시간으로 데이터를 수신하며 데이터가 수신될때마다 DB에 저장하는 로직입니다.

     

    import psycopg2

    파이썬에서 PostgreSQL DB에 접속하여 SQL을 수행하려면 위의 모듈이 필요합니다. 

     

    python -m pip install --upgrade pip
    python -m pip install psycopg2-binary

     

    만약 pip를 이용하여 위의 모듈을 설치할 때 오류가 발생한다면 위의 명령어를 사용하여 PIP 모듈을 최신으로 업그레이드 한 후 psycopg2 대신 psycopg2-binary를 설치해 보시기 바랍니다.

     

    # PostgreSQL 데이터 베이스 연결
    conn = psycopg2.connect(host=upbit.get_env_keyvalue('pg_host')
                           ,dbname=upbit.get_env_keyvalue('pg_dbname')
                           ,user=upbit.get_env_keyvalue('pg_userid')
                           ,password=upbit.get_env_keyvalue('pg_passwd')
                           ,port=upbit.get_env_keyvalue('pg_port'))

    파이썬에서 psycopg2 모듈을 이용해 DB에 접속하는 부분입니다. 접속 정보는 노출되지 않도록 별도 파일(env.txt)에 보관하고 원하는 곳에서 파일을 읽어서 가져 오는 것이 좋습니다. 해당 방법은 아래 포스팅을 참고 부탁 드립니다.

     

    2022.01.25 - [프로젝트/비트코인 자동매매] - 비트코인 자동매매 프로그램 환경변수 파일로 빼기 - 보안 강화

    반응형
     

    비트코인 자동매매 프로그램 환경변수 파일로 빼기 - 보안 강화

    비트코인 자동매매 프로그램을 만들다보면 업비트 API 액세스 키 및 시크릿 키 그리고 텔레그램 키 등 여러가지 보안 정보들을 사용해야 합니다. 지금까지는 제일 간편한 방법으로 각종 Key를 공

    technfin.tistory.com

     

    그 밖에도 새로운 종목이 상장되거나 폐지 될 가능성이 있어 5초 마다 구독 종목 개수를 비교하여 갱신하도록 구현되어 있으며 PostgreSQL의 AutoCommit 기능을 활성화하여 별도의 커밋 명령어 없이 데이터를 저장하도록 했습니다. 프로그램의 로직에 관련하여 궁금한 점이 있으시면 댓글로 문의 부탁 드리겠습니다.

     

    upbit.py
    0.07MB

    프로그램에 사용된 공통 모듈은 위의 파일을 다운로드 받아 사용하시면 됩니다.

     

    프로그램 실행

    python save_ticker_pg.py I

    프로그램을 구동하면 데이터가 DB에 저장되기 시작합니다.

     

    마치며

    과거에 비슷한 내용으로 오라클 DB에 데이터를 저장하는 방법을 포스팅한 적이 있습니다. 하지만 오라클 DB를 제약 없이 사용하기 위해서는 비용이 많이 발생하기 때문에 무료로 사용할 수 있는 PostgreSQL DB를 사용하기로 결정 했습니다. 

     

    다만 DB는 무료로 사용이 가능하다 하더라도 프리티어 무료 서버의 저장공간이 한정되어 있기 때문에 이 부분은 어쩔 수 없이 고려해야 할 것 같습니다.

     

    DB에 저장을 시작한 현 시점(2022-02-23 06:35)에 89G의 저장공간 중 82G가 남아있는 상태이고 PostgreSQL 데이터는 68M 정도 쌓인 상태입니다. 하루에 어느정도의 로우가 쌓이는지, 용량은 얼마나 차지하는지 등을 분석해 보면 보유 가능한 데이터의 양을 계산할 수 있고 해당 내용을 기반으로 정기적으로 데이터를 정리해 주면 계속해서 무료로 사용이 가능할 것이라 판단 됩니다.

     

    추후 용량을 모니터링 해 보고 PostgreSQL에서 데이터를 정리하고 Table Space의 용량을 reclaim하는 방법인 Vaccum을 진행하는 방법에 대해서도 살펴볼 수 있도록 하겠습니다.

     

    블로그를 구독하시면 소식을 조금 더 빨리 받아보실 수 있습니다. 감사합니다.

    반응형