Files
upbit_trader/upbit_candle_collector.py
2026-03-13 04:37:58 +09:00

440 lines
16 KiB
Python

#!/usr/bin/env python3
"""
upbit_candle_collector.py — 업비트 분봉 수집기 (과거 데이터 페이지네이션)
==========================================================================
업비트 REST API: GET /v1/candles/minutes/{unit}
- 인증 불필요 (Public API)
- 1회 최대 200봉 반환
- `to` 파라미터로 기준 시각 이전 봉 반환 → 줄여가며 전체 수집
지원 분봉 단위: 1, 3, 5, 10, 15, 30, 60, 240
실행 예시:
# KRW-BTC 3분봉, 최근 7일
python3 upbit_candle_collector.py --market KRW-BTC --unit 3
# 여러 마켓, 60분봉, 기간 지정
python3 upbit_candle_collector.py --market KRW-BTC,KRW-ETH,KRW-SOL --unit 60 --start 2026-01-01
# 전체 KRW 마켓 3분봉 자동 수집
python3 upbit_candle_collector.py --all-krw --unit 3 --start 2026-03-01
# DB 저장 없이 CSV만 출력
python3 upbit_candle_collector.py --market KRW-BTC --unit 1 --csv
레이트리밋 (업비트):
- 시세 API: 분당 600회 → 요청 간 0.12초 대기 (내장)
"""
import os
import sys
import time
import csv
import argparse
import logging
from datetime import datetime, timedelta
from typing import List, Optional
import requests
import pymysql
import pymysql.cursors
logging.basicConfig(
format="[%(asctime)s] %(message)s",
datefmt="%H:%M:%S",
level=logging.INFO,
)
logger = logging.getLogger("UpbitCollector")
# ── MariaDB 접속 정보 ──────────────────────────────────────────────────────
_DB_CFG = dict(
host="192.168.0.141", port=3306,
user="jae", password="1234",
database="upbit_quant_db",
charset="utf8mb4",
autocommit=True,
cursorclass=pymysql.cursors.DictCursor,
connect_timeout=10,
)
# 업비트 지원 분봉 단위
VALID_UNITS = {1, 3, 5, 10, 15, 30, 60, 240}
_UPBIT_BASE = "https://api.upbit.com/v1"
_last_req = 0.0
def _get(path: str, params: dict = None, max_retry: int = 5) -> Optional[list]:
"""
업비트 REST GET 요청
- 레이트리밋: 분당 600회 → 0.12초 간격 유지
- HTTP 429(Too Many Requests): 점진적 대기 후 재시도
- 네트워크 오류: 지수 백오프 재시도
"""
global _last_req
# 레이트리밋 보호: 0.12초 미만이면 대기
elapsed = time.time() - _last_req
if elapsed < 0.12:
time.sleep(0.12 - elapsed)
_last_req = time.time()
url = f"{_UPBIT_BASE}{path}"
for attempt in range(max_retry):
try:
r = requests.get(url, params=params, timeout=10)
# HTTP 429: Too Many Requests → 점진적 대기
if r.status_code == 429:
wait = 1.0 + attempt * 1.5
logger.warning(f"⏳ HTTP 429 — {wait:.1f}초 대기 (attempt {attempt+1})")
time.sleep(wait)
continue
if r.status_code == 200:
return r.json()
logger.warning(f"⚠️ HTTP {r.status_code}: {r.text[:200]}")
return None
except requests.RequestException as e:
wait = (2 ** attempt) * 0.5
logger.warning(f"⚠️ 네트워크 오류 ({attempt+1}/{max_retry}): {e}{wait:.1f}")
time.sleep(wait)
return None
def fetch_candles(
market: str,
unit: int,
start_dt: datetime,
end_dt: datetime,
) -> List[dict]:
"""
업비트 분봉 페이지네이션 수집
- 업비트는 최신봉이 index 0 (역순) 으로 반환
- `to` 파라미터: 이 시각(exclusive) 이전의 봉 반환
- 한 번에 최대 200봉 → `to`를 과거로 당겨가며 반복
Args:
market : KRW-BTC 등 마켓코드
unit : 분봉 단위 (1/3/5/10/15/30/60/240)
start_dt : 수집 시작 시각
end_dt : 수집 종료 시각
Returns:
[{candle_time, open, high, low, close, volume}, ...] (시간 오름차순)
"""
if unit not in VALID_UNITS:
raise ValueError(f"지원하지 않는 단위: {unit} (허용: {sorted(VALID_UNITS)})")
all_candles = []
# 수집 기준 시각: end_dt부터 역방향으로
to_dt = end_dt + timedelta(minutes=unit) # exclusive라 unit 하나 더함
logger.info(f"📥 [{market}] {unit}분봉 수집 시작: {start_dt:%Y-%m-%d} ~ {end_dt:%Y-%m-%d}")
page = 0
while to_dt > start_dt:
to_str = to_dt.strftime("%Y-%m-%dT%H:%M:%S")
data = _get(f"/candles/minutes/{unit}", {
"market": market,
"to": to_str,
"count": 200,
})
if not data:
logger.warning(f" [{market}] 데이터 없음 (to={to_str})")
break
batch = []
oldest = to_dt
for c in data:
# candle_date_time_kst: "2026-03-09T14:30:00"
kst_str = c.get("candle_date_time_kst", "")[:16]
if not kst_str:
continue
try:
cdt = datetime.strptime(kst_str, "%Y-%m-%dT%H:%M")
except ValueError:
continue
if cdt < start_dt:
continue
if cdt < oldest:
oldest = cdt
batch.append({
"code": market,
"candle_time": cdt.strftime("%Y%m%d%H%M"),
"timeframe": unit,
"open": float(c.get("opening_price", 0)),
"high": float(c.get("high_price", 0)),
"low": float(c.get("low_price", 0)),
"close": float(c.get("trade_price", 0)),
"volume": float(c.get("candle_acc_trade_volume", 0)),
})
all_candles.extend(batch)
page += 1
if batch:
logger.info(f" [{market}] 페이지 {page}: +{len(batch)}봉 (누계 {len(all_candles)}봉) oldest={oldest:%Y-%m-%d %H:%M}")
# 다음 페이지: 이번 배치에서 가장 과거 봉의 1분 전
if oldest <= start_dt:
break # start_dt 이전까지 수집 완료
to_dt = oldest - timedelta(minutes=1)
# API가 200봉 미만 반환 → 더 이상 데이터 없음
if len(data) < 200:
logger.info(f" [{market}] 200봉 미만 반환 → 수집 완료")
break
# 시간 오름차순 정렬
all_candles.sort(key=lambda x: x["candle_time"])
logger.info(f"✅ [{market}] {unit}분봉 {len(all_candles):,}봉 수집 완료")
return all_candles
def save_to_db(candles: List[dict], conn) -> int:
"""
수집한 캔들을 upbit_candles 테이블에 upsert 저장
중복 봉은 OHLCV 업데이트 (ON DUPLICATE KEY UPDATE)
"""
if not candles:
return 0
cur = conn.cursor()
sql = """
INSERT INTO upbit_candles
(code, candle_time, timeframe, open_price, high_price,
low_price, close_price, volume, is_confirmed)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, 1)
ON DUPLICATE KEY UPDATE
open_price = VALUES(open_price),
high_price = VALUES(high_price),
low_price = VALUES(low_price),
close_price = VALUES(close_price),
volume = VALUES(volume)
"""
batch = [
(c["code"], c["candle_time"], c["timeframe"],
c["open"], c["high"], c["low"], c["close"], c["volume"])
for c in candles
]
cur.executemany(sql, batch)
return len(batch)
def save_to_csv(candles: List[dict], path: str):
"""수집한 캔들을 CSV로 저장"""
if not candles:
return
with open(path, "w", newline="", encoding="utf-8-sig") as f:
writer = csv.DictWriter(f, fieldnames=["code","candle_time","timeframe","open","high","low","close","volume"])
writer.writeheader()
writer.writerows(candles)
logger.info(f"💾 CSV 저장: {path} ({len(candles):,}봉)")
def get_all_krw_markets() -> List[str]:
"""업비트 전체 KRW 마켓 목록 조회"""
data = _get("/market/all")
if not data:
return []
markets = [m["market"] for m in data if m["market"].startswith("KRW-")]
logger.info(f"📋 KRW 마켓 {len(markets)}개 로드")
return markets
def get_top_volume_markets(top_n: int) -> List[str]:
"""
24시간 거래대금 기준 상위 N개 KRW 마켓 반환
- /v1/ticker API: acc_trade_price_24h (24h KRW 거래대금) 기준 내림차순 정렬
- 백테스트용으로 유동성이 충분한 마켓만 선별하는 데 사용
"""
# 전체 KRW 마켓 목록 먼저 조회
all_markets = get_all_krw_markets()
if not all_markets:
return []
# ticker API는 한 번에 최대 100개 마켓 조회 가능 → 배치 처리
BATCH = 100
tickers = []
for i in range(0, len(all_markets), BATCH):
batch = all_markets[i:i + BATCH]
result = _get("/ticker", {"markets": ",".join(batch)})
if result:
tickers.extend(result)
time.sleep(0.15) # 레이트리밋 방지
if not tickers:
logger.warning("⚠️ 티커 조회 실패 — 전체 마켓 반환")
return all_markets
# 24h 거래대금 내림차순 정렬
tickers.sort(key=lambda x: float(x.get("acc_trade_price_24h", 0)), reverse=True)
top = [t["market"] for t in tickers[:top_n]]
logger.info(f"🏆 거래대금 상위 {top_n}개 마켓:")
for rank, t in enumerate(tickers[:top_n], 1):
vol_b = float(t.get("acc_trade_price_24h", 0)) / 1e8 # 억원
logger.info(f" {rank:>2}. {t['market']:<15} 24h거래대금 {vol_b:>8,.1f}억원")
return top
def _estimate_time(n_markets: int, unit: int, start_dt: datetime, end_dt: datetime) -> str:
"""수집 예상 시간 계산 (대략적인 추정)"""
days = (end_dt - start_dt).days + 1
candles = days * 24 * 60 // unit # 총 예상 봉 수
pages = candles / 200 # 페이지 수 (200봉/요청)
sec_each = pages * 0.13 # 요청당 ~0.13초
total_sec = n_markets * (sec_each + 0.5) # 마켓 간 0.5초 딜레이
if total_sec < 60:
return f"{total_sec:.0f}"
elif total_sec < 3600:
return f"{total_sec/60:.0f}"
else:
return f"{total_sec/3600:.1f}시간"
# ────────────────────────────────────────────────────────────────────────────
# CLI 진입점
# ────────────────────────────────────────────────────────────────────────────
def main():
today = datetime.now().strftime("%Y-%m-%d")
month_ago = (datetime.now() - timedelta(days=30)).strftime("%Y-%m-%d")
parser = argparse.ArgumentParser(
description="업비트 분봉 수집기 (과거 데이터 페이지네이션)",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
예시:
# ★ 가장 쉬운 백테스트 준비: 거래대금 상위 30개 마켓, 3분봉, 최근 30일
python3 upbit_candle_collector.py --top-volume 30 --unit 3
# 거래대금 상위 50개, 1분봉, 2개월치
python3 upbit_candle_collector.py --top-volume 50 --unit 1 --start 2026-01-01
# 특정 마켓만
python3 upbit_candle_collector.py --market KRW-BTC,KRW-ETH,KRW-SOL --unit 3
# 전체 KRW 마켓 (시간 매우 오래 걸림)
python3 upbit_candle_collector.py --all-krw --unit 3 --start 2026-03-01
# CSV로만 저장 (DB 저장 안 함)
python3 upbit_candle_collector.py --market KRW-BTC --unit 1 --csv --no-db
""",
)
parser.add_argument("--market", default="", help="마켓코드 (콤마 구분, 예: KRW-BTC,KRW-ETH)")
parser.add_argument("--all-krw", action="store_true", help="전체 KRW 마켓 수집")
parser.add_argument("--top-volume", default=0, type=int, metavar="N",
help="24h 거래대금 상위 N개 마켓 자동 선택 (백테스트 권장)")
parser.add_argument("--unit", default=3, type=int, help=f"분봉 단위 {sorted(VALID_UNITS)}")
parser.add_argument("--start", default=month_ago, help="시작일 (YYYY-MM-DD, 기본: 30일 전)")
parser.add_argument("--end", default=today, help="종료일 (YYYY-MM-DD, 기본: 오늘)")
parser.add_argument("--csv", action="store_true", help="CSV 파일로도 저장")
parser.add_argument("--no-db", action="store_true", help="DB 저장 안 함 (CSV만)")
args = parser.parse_args()
# 입력 검증
if args.unit not in VALID_UNITS:
print(f"❌ 지원하지 않는 단위: {args.unit} 허용값: {sorted(VALID_UNITS)}")
sys.exit(1)
# 마켓 목록 결정
if args.top_volume > 0:
# ★ 거래대금 상위 N개 자동 선택 (백테스트 권장)
markets = get_top_volume_markets(args.top_volume)
elif args.all_krw:
markets = get_all_krw_markets()
elif args.market:
markets = [m.strip().upper() for m in args.market.split(",") if m.strip()]
else:
print("❌ --top-volume N / --market / --all-krw 중 하나 필요")
parser.print_help()
sys.exit(1)
if not markets:
print("❌ 수집할 마켓이 없습니다.")
sys.exit(1)
start_dt = datetime.strptime(args.start, "%Y-%m-%d")
end_dt = datetime.strptime(args.end, "%Y-%m-%d") + timedelta(hours=23, minutes=59)
days = (end_dt - start_dt).days + 1
# 예상 시간 미리 출력
eta = _estimate_time(len(markets), args.unit, start_dt, end_dt)
logger.info(f"📅 수집 기간: {args.start} ~ {args.end} ({days}일)")
logger.info(f"📊 대상 마켓: {len(markets)}개 | 분봉: {args.unit}분 | 예상 소요: {eta}")
logger.info(f"📋 마켓 목록: {', '.join(markets)}")
# DB 연결 (--no-db 아닌 경우)
conn = None
if not args.no_db:
try:
conn = pymysql.connect(**_DB_CFG)
logger.info(f"✅ MariaDB 연결 완료 (upbit_quant_db)")
except Exception as e:
logger.error(f"❌ DB 연결 실패: {e}")
if not args.csv:
sys.exit(1)
total_saved = 0
csv_dir = os.path.dirname(os.path.abspath(__file__))
t_start = time.time()
try:
for i, market in enumerate(markets):
elapsed = time.time() - t_start
if i > 0:
avg_sec = elapsed / i
remain = avg_sec * (len(markets) - i)
eta_str = f" 남은시간 약 {remain/60:.0f}" if remain >= 60 else f" 남은시간 약 {remain:.0f}"
else:
eta_str = ""
logger.info(f"\n[{i+1}/{len(markets)}] ── {market} {args.unit}분봉 ──{eta_str}")
candles = fetch_candles(market, args.unit, start_dt, end_dt)
if not candles:
logger.warning(f" [{market}] 수집된 봉 없음 — 스킵")
continue
# DB 저장
if conn and not args.no_db:
saved = save_to_db(candles, conn)
total_saved += saved
logger.info(f" [{market}] DB 저장: {saved:,}")
# CSV 저장
if args.csv or args.no_db:
csv_path = os.path.join(
csv_dir,
f"{market.replace('-','_')}_{args.unit}m_{args.start}_{args.end}.csv"
)
save_to_csv(candles, csv_path)
# 마켓 간 딜레이 (API 부하 방지)
if i < len(markets) - 1:
time.sleep(0.5)
finally:
if conn:
conn.close()
elapsed_total = time.time() - t_start
logger.info(
f"\n🎉 완료! 총 DB 저장: {total_saved:,}봉 ({args.unit}분봉) "
f"| 소요시간: {elapsed_total/60:.1f}"
)
if __name__ == "__main__":
main()