440 lines
16 KiB
Python
440 lines
16 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
upbit_candle_collector.py — 업비트 분봉 수집기 (과거 데이터 페이지네이션)
|
|
==========================================================================
|
|
업비트 REST API: GET /v1/candles/minutes/{unit}
|
|
- 인증 불필요 (Public API)
|
|
- 1회 최대 200봉 반환
|
|
- `to` 파라미터로 기준 시각 이전 봉 반환 → 줄여가며 전체 수집
|
|
|
|
지원 분봉 단위: 1, 3, 5, 10, 15, 30, 60, 240
|
|
|
|
실행 예시:
|
|
# KRW-BTC 3분봉, 최근 7일
|
|
python3 upbit_candle_collector.py --market KRW-BTC --unit 3
|
|
|
|
# 여러 마켓, 60분봉, 기간 지정
|
|
python3 upbit_candle_collector.py --market KRW-BTC,KRW-ETH,KRW-SOL --unit 60 --start 2026-01-01
|
|
|
|
# 전체 KRW 마켓 3분봉 자동 수집
|
|
python3 upbit_candle_collector.py --all-krw --unit 3 --start 2026-03-01
|
|
|
|
# DB 저장 없이 CSV만 출력
|
|
python3 upbit_candle_collector.py --market KRW-BTC --unit 1 --csv
|
|
|
|
레이트리밋 (업비트):
|
|
- 시세 API: 분당 600회 → 요청 간 0.12초 대기 (내장)
|
|
"""
|
|
|
|
import os
|
|
import sys
|
|
import time
|
|
import csv
|
|
import argparse
|
|
import logging
|
|
from datetime import datetime, timedelta
|
|
from typing import List, Optional
|
|
|
|
import requests
|
|
import pymysql
|
|
import pymysql.cursors
|
|
|
|
logging.basicConfig(
|
|
format="[%(asctime)s] %(message)s",
|
|
datefmt="%H:%M:%S",
|
|
level=logging.INFO,
|
|
)
|
|
logger = logging.getLogger("UpbitCollector")
|
|
|
|
# ── MariaDB 접속 정보 ──────────────────────────────────────────────────────
|
|
_DB_CFG = dict(
|
|
host="192.168.0.141", port=3306,
|
|
user="jae", password="1234",
|
|
database="upbit_quant_db",
|
|
charset="utf8mb4",
|
|
autocommit=True,
|
|
cursorclass=pymysql.cursors.DictCursor,
|
|
connect_timeout=10,
|
|
)
|
|
|
|
# 업비트 지원 분봉 단위
|
|
VALID_UNITS = {1, 3, 5, 10, 15, 30, 60, 240}
|
|
|
|
_UPBIT_BASE = "https://api.upbit.com/v1"
|
|
_last_req = 0.0
|
|
|
|
|
|
def _get(path: str, params: dict = None, max_retry: int = 5) -> Optional[list]:
|
|
"""
|
|
업비트 REST GET 요청
|
|
- 레이트리밋: 분당 600회 → 0.12초 간격 유지
|
|
- HTTP 429(Too Many Requests): 점진적 대기 후 재시도
|
|
- 네트워크 오류: 지수 백오프 재시도
|
|
"""
|
|
global _last_req
|
|
# 레이트리밋 보호: 0.12초 미만이면 대기
|
|
elapsed = time.time() - _last_req
|
|
if elapsed < 0.12:
|
|
time.sleep(0.12 - elapsed)
|
|
_last_req = time.time()
|
|
|
|
url = f"{_UPBIT_BASE}{path}"
|
|
for attempt in range(max_retry):
|
|
try:
|
|
r = requests.get(url, params=params, timeout=10)
|
|
|
|
# HTTP 429: Too Many Requests → 점진적 대기
|
|
if r.status_code == 429:
|
|
wait = 1.0 + attempt * 1.5
|
|
logger.warning(f"⏳ HTTP 429 — {wait:.1f}초 대기 (attempt {attempt+1})")
|
|
time.sleep(wait)
|
|
continue
|
|
|
|
if r.status_code == 200:
|
|
return r.json()
|
|
|
|
logger.warning(f"⚠️ HTTP {r.status_code}: {r.text[:200]}")
|
|
return None
|
|
|
|
except requests.RequestException as e:
|
|
wait = (2 ** attempt) * 0.5
|
|
logger.warning(f"⚠️ 네트워크 오류 ({attempt+1}/{max_retry}): {e} → {wait:.1f}초")
|
|
time.sleep(wait)
|
|
|
|
return None
|
|
|
|
|
|
def fetch_candles(
|
|
market: str,
|
|
unit: int,
|
|
start_dt: datetime,
|
|
end_dt: datetime,
|
|
) -> List[dict]:
|
|
"""
|
|
업비트 분봉 페이지네이션 수집
|
|
- 업비트는 최신봉이 index 0 (역순) 으로 반환
|
|
- `to` 파라미터: 이 시각(exclusive) 이전의 봉 반환
|
|
- 한 번에 최대 200봉 → `to`를 과거로 당겨가며 반복
|
|
|
|
Args:
|
|
market : KRW-BTC 등 마켓코드
|
|
unit : 분봉 단위 (1/3/5/10/15/30/60/240)
|
|
start_dt : 수집 시작 시각
|
|
end_dt : 수집 종료 시각
|
|
|
|
Returns:
|
|
[{candle_time, open, high, low, close, volume}, ...] (시간 오름차순)
|
|
"""
|
|
if unit not in VALID_UNITS:
|
|
raise ValueError(f"지원하지 않는 단위: {unit} (허용: {sorted(VALID_UNITS)})")
|
|
|
|
all_candles = []
|
|
# 수집 기준 시각: end_dt부터 역방향으로
|
|
to_dt = end_dt + timedelta(minutes=unit) # exclusive라 unit 하나 더함
|
|
|
|
logger.info(f"📥 [{market}] {unit}분봉 수집 시작: {start_dt:%Y-%m-%d} ~ {end_dt:%Y-%m-%d}")
|
|
page = 0
|
|
|
|
while to_dt > start_dt:
|
|
to_str = to_dt.strftime("%Y-%m-%dT%H:%M:%S")
|
|
data = _get(f"/candles/minutes/{unit}", {
|
|
"market": market,
|
|
"to": to_str,
|
|
"count": 200,
|
|
})
|
|
|
|
if not data:
|
|
logger.warning(f" [{market}] 데이터 없음 (to={to_str})")
|
|
break
|
|
|
|
batch = []
|
|
oldest = to_dt
|
|
|
|
for c in data:
|
|
# candle_date_time_kst: "2026-03-09T14:30:00"
|
|
kst_str = c.get("candle_date_time_kst", "")[:16]
|
|
if not kst_str:
|
|
continue
|
|
try:
|
|
cdt = datetime.strptime(kst_str, "%Y-%m-%dT%H:%M")
|
|
except ValueError:
|
|
continue
|
|
|
|
if cdt < start_dt:
|
|
continue
|
|
|
|
if cdt < oldest:
|
|
oldest = cdt
|
|
|
|
batch.append({
|
|
"code": market,
|
|
"candle_time": cdt.strftime("%Y%m%d%H%M"),
|
|
"timeframe": unit,
|
|
"open": float(c.get("opening_price", 0)),
|
|
"high": float(c.get("high_price", 0)),
|
|
"low": float(c.get("low_price", 0)),
|
|
"close": float(c.get("trade_price", 0)),
|
|
"volume": float(c.get("candle_acc_trade_volume", 0)),
|
|
})
|
|
|
|
all_candles.extend(batch)
|
|
page += 1
|
|
|
|
if batch:
|
|
logger.info(f" [{market}] 페이지 {page}: +{len(batch)}봉 (누계 {len(all_candles)}봉) oldest={oldest:%Y-%m-%d %H:%M}")
|
|
|
|
# 다음 페이지: 이번 배치에서 가장 과거 봉의 1분 전
|
|
if oldest <= start_dt:
|
|
break # start_dt 이전까지 수집 완료
|
|
|
|
to_dt = oldest - timedelta(minutes=1)
|
|
|
|
# API가 200봉 미만 반환 → 더 이상 데이터 없음
|
|
if len(data) < 200:
|
|
logger.info(f" [{market}] 200봉 미만 반환 → 수집 완료")
|
|
break
|
|
|
|
# 시간 오름차순 정렬
|
|
all_candles.sort(key=lambda x: x["candle_time"])
|
|
logger.info(f"✅ [{market}] {unit}분봉 {len(all_candles):,}봉 수집 완료")
|
|
return all_candles
|
|
|
|
|
|
def save_to_db(candles: List[dict], conn) -> int:
|
|
"""
|
|
수집한 캔들을 upbit_candles 테이블에 upsert 저장
|
|
중복 봉은 OHLCV 업데이트 (ON DUPLICATE KEY UPDATE)
|
|
"""
|
|
if not candles:
|
|
return 0
|
|
|
|
cur = conn.cursor()
|
|
sql = """
|
|
INSERT INTO upbit_candles
|
|
(code, candle_time, timeframe, open_price, high_price,
|
|
low_price, close_price, volume, is_confirmed)
|
|
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, 1)
|
|
ON DUPLICATE KEY UPDATE
|
|
open_price = VALUES(open_price),
|
|
high_price = VALUES(high_price),
|
|
low_price = VALUES(low_price),
|
|
close_price = VALUES(close_price),
|
|
volume = VALUES(volume)
|
|
"""
|
|
batch = [
|
|
(c["code"], c["candle_time"], c["timeframe"],
|
|
c["open"], c["high"], c["low"], c["close"], c["volume"])
|
|
for c in candles
|
|
]
|
|
cur.executemany(sql, batch)
|
|
return len(batch)
|
|
|
|
|
|
def save_to_csv(candles: List[dict], path: str):
|
|
"""수집한 캔들을 CSV로 저장"""
|
|
if not candles:
|
|
return
|
|
with open(path, "w", newline="", encoding="utf-8-sig") as f:
|
|
writer = csv.DictWriter(f, fieldnames=["code","candle_time","timeframe","open","high","low","close","volume"])
|
|
writer.writeheader()
|
|
writer.writerows(candles)
|
|
logger.info(f"💾 CSV 저장: {path} ({len(candles):,}봉)")
|
|
|
|
|
|
def get_all_krw_markets() -> List[str]:
|
|
"""업비트 전체 KRW 마켓 목록 조회"""
|
|
data = _get("/market/all")
|
|
if not data:
|
|
return []
|
|
markets = [m["market"] for m in data if m["market"].startswith("KRW-")]
|
|
logger.info(f"📋 KRW 마켓 {len(markets)}개 로드")
|
|
return markets
|
|
|
|
|
|
def get_top_volume_markets(top_n: int) -> List[str]:
|
|
"""
|
|
24시간 거래대금 기준 상위 N개 KRW 마켓 반환
|
|
- /v1/ticker API: acc_trade_price_24h (24h KRW 거래대금) 기준 내림차순 정렬
|
|
- 백테스트용으로 유동성이 충분한 마켓만 선별하는 데 사용
|
|
"""
|
|
# 전체 KRW 마켓 목록 먼저 조회
|
|
all_markets = get_all_krw_markets()
|
|
if not all_markets:
|
|
return []
|
|
|
|
# ticker API는 한 번에 최대 100개 마켓 조회 가능 → 배치 처리
|
|
BATCH = 100
|
|
tickers = []
|
|
for i in range(0, len(all_markets), BATCH):
|
|
batch = all_markets[i:i + BATCH]
|
|
result = _get("/ticker", {"markets": ",".join(batch)})
|
|
if result:
|
|
tickers.extend(result)
|
|
time.sleep(0.15) # 레이트리밋 방지
|
|
|
|
if not tickers:
|
|
logger.warning("⚠️ 티커 조회 실패 — 전체 마켓 반환")
|
|
return all_markets
|
|
|
|
# 24h 거래대금 내림차순 정렬
|
|
tickers.sort(key=lambda x: float(x.get("acc_trade_price_24h", 0)), reverse=True)
|
|
|
|
top = [t["market"] for t in tickers[:top_n]]
|
|
logger.info(f"🏆 거래대금 상위 {top_n}개 마켓:")
|
|
for rank, t in enumerate(tickers[:top_n], 1):
|
|
vol_b = float(t.get("acc_trade_price_24h", 0)) / 1e8 # 억원
|
|
logger.info(f" {rank:>2}. {t['market']:<15} 24h거래대금 {vol_b:>8,.1f}억원")
|
|
return top
|
|
|
|
|
|
def _estimate_time(n_markets: int, unit: int, start_dt: datetime, end_dt: datetime) -> str:
|
|
"""수집 예상 시간 계산 (대략적인 추정)"""
|
|
days = (end_dt - start_dt).days + 1
|
|
candles = days * 24 * 60 // unit # 총 예상 봉 수
|
|
pages = candles / 200 # 페이지 수 (200봉/요청)
|
|
sec_each = pages * 0.13 # 요청당 ~0.13초
|
|
total_sec = n_markets * (sec_each + 0.5) # 마켓 간 0.5초 딜레이
|
|
if total_sec < 60:
|
|
return f"{total_sec:.0f}초"
|
|
elif total_sec < 3600:
|
|
return f"{total_sec/60:.0f}분"
|
|
else:
|
|
return f"{total_sec/3600:.1f}시간"
|
|
|
|
|
|
# ────────────────────────────────────────────────────────────────────────────
|
|
# CLI 진입점
|
|
# ────────────────────────────────────────────────────────────────────────────
|
|
|
|
def main():
|
|
today = datetime.now().strftime("%Y-%m-%d")
|
|
month_ago = (datetime.now() - timedelta(days=30)).strftime("%Y-%m-%d")
|
|
|
|
parser = argparse.ArgumentParser(
|
|
description="업비트 분봉 수집기 (과거 데이터 페이지네이션)",
|
|
formatter_class=argparse.RawDescriptionHelpFormatter,
|
|
epilog="""
|
|
예시:
|
|
# ★ 가장 쉬운 백테스트 준비: 거래대금 상위 30개 마켓, 3분봉, 최근 30일
|
|
python3 upbit_candle_collector.py --top-volume 30 --unit 3
|
|
|
|
# 거래대금 상위 50개, 1분봉, 2개월치
|
|
python3 upbit_candle_collector.py --top-volume 50 --unit 1 --start 2026-01-01
|
|
|
|
# 특정 마켓만
|
|
python3 upbit_candle_collector.py --market KRW-BTC,KRW-ETH,KRW-SOL --unit 3
|
|
|
|
# 전체 KRW 마켓 (시간 매우 오래 걸림)
|
|
python3 upbit_candle_collector.py --all-krw --unit 3 --start 2026-03-01
|
|
|
|
# CSV로만 저장 (DB 저장 안 함)
|
|
python3 upbit_candle_collector.py --market KRW-BTC --unit 1 --csv --no-db
|
|
""",
|
|
)
|
|
parser.add_argument("--market", default="", help="마켓코드 (콤마 구분, 예: KRW-BTC,KRW-ETH)")
|
|
parser.add_argument("--all-krw", action="store_true", help="전체 KRW 마켓 수집")
|
|
parser.add_argument("--top-volume", default=0, type=int, metavar="N",
|
|
help="24h 거래대금 상위 N개 마켓 자동 선택 (백테스트 권장)")
|
|
parser.add_argument("--unit", default=3, type=int, help=f"분봉 단위 {sorted(VALID_UNITS)}")
|
|
parser.add_argument("--start", default=month_ago, help="시작일 (YYYY-MM-DD, 기본: 30일 전)")
|
|
parser.add_argument("--end", default=today, help="종료일 (YYYY-MM-DD, 기본: 오늘)")
|
|
parser.add_argument("--csv", action="store_true", help="CSV 파일로도 저장")
|
|
parser.add_argument("--no-db", action="store_true", help="DB 저장 안 함 (CSV만)")
|
|
args = parser.parse_args()
|
|
|
|
# 입력 검증
|
|
if args.unit not in VALID_UNITS:
|
|
print(f"❌ 지원하지 않는 단위: {args.unit} 허용값: {sorted(VALID_UNITS)}")
|
|
sys.exit(1)
|
|
|
|
# 마켓 목록 결정
|
|
if args.top_volume > 0:
|
|
# ★ 거래대금 상위 N개 자동 선택 (백테스트 권장)
|
|
markets = get_top_volume_markets(args.top_volume)
|
|
elif args.all_krw:
|
|
markets = get_all_krw_markets()
|
|
elif args.market:
|
|
markets = [m.strip().upper() for m in args.market.split(",") if m.strip()]
|
|
else:
|
|
print("❌ --top-volume N / --market / --all-krw 중 하나 필요")
|
|
parser.print_help()
|
|
sys.exit(1)
|
|
|
|
if not markets:
|
|
print("❌ 수집할 마켓이 없습니다.")
|
|
sys.exit(1)
|
|
|
|
start_dt = datetime.strptime(args.start, "%Y-%m-%d")
|
|
end_dt = datetime.strptime(args.end, "%Y-%m-%d") + timedelta(hours=23, minutes=59)
|
|
days = (end_dt - start_dt).days + 1
|
|
|
|
# 예상 시간 미리 출력
|
|
eta = _estimate_time(len(markets), args.unit, start_dt, end_dt)
|
|
logger.info(f"📅 수집 기간: {args.start} ~ {args.end} ({days}일)")
|
|
logger.info(f"📊 대상 마켓: {len(markets)}개 | 분봉: {args.unit}분 | 예상 소요: {eta}")
|
|
logger.info(f"📋 마켓 목록: {', '.join(markets)}")
|
|
|
|
# DB 연결 (--no-db 아닌 경우)
|
|
conn = None
|
|
if not args.no_db:
|
|
try:
|
|
conn = pymysql.connect(**_DB_CFG)
|
|
logger.info(f"✅ MariaDB 연결 완료 (upbit_quant_db)")
|
|
except Exception as e:
|
|
logger.error(f"❌ DB 연결 실패: {e}")
|
|
if not args.csv:
|
|
sys.exit(1)
|
|
|
|
total_saved = 0
|
|
csv_dir = os.path.dirname(os.path.abspath(__file__))
|
|
t_start = time.time()
|
|
|
|
try:
|
|
for i, market in enumerate(markets):
|
|
elapsed = time.time() - t_start
|
|
if i > 0:
|
|
avg_sec = elapsed / i
|
|
remain = avg_sec * (len(markets) - i)
|
|
eta_str = f" 남은시간 약 {remain/60:.0f}분" if remain >= 60 else f" 남은시간 약 {remain:.0f}초"
|
|
else:
|
|
eta_str = ""
|
|
logger.info(f"\n[{i+1}/{len(markets)}] ── {market} {args.unit}분봉 ──{eta_str}")
|
|
|
|
candles = fetch_candles(market, args.unit, start_dt, end_dt)
|
|
|
|
if not candles:
|
|
logger.warning(f" [{market}] 수집된 봉 없음 — 스킵")
|
|
continue
|
|
|
|
# DB 저장
|
|
if conn and not args.no_db:
|
|
saved = save_to_db(candles, conn)
|
|
total_saved += saved
|
|
logger.info(f" [{market}] DB 저장: {saved:,}봉")
|
|
|
|
# CSV 저장
|
|
if args.csv or args.no_db:
|
|
csv_path = os.path.join(
|
|
csv_dir,
|
|
f"{market.replace('-','_')}_{args.unit}m_{args.start}_{args.end}.csv"
|
|
)
|
|
save_to_csv(candles, csv_path)
|
|
|
|
# 마켓 간 딜레이 (API 부하 방지)
|
|
if i < len(markets) - 1:
|
|
time.sleep(0.5)
|
|
|
|
finally:
|
|
if conn:
|
|
conn.close()
|
|
|
|
elapsed_total = time.time() - t_start
|
|
logger.info(
|
|
f"\n🎉 완료! 총 DB 저장: {total_saved:,}봉 ({args.unit}분봉) "
|
|
f"| 소요시간: {elapsed_total/60:.1f}분"
|
|
)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|