#!/usr/bin/env python3
"""
upbit_backtest_web.py — Upbit scalping backtest & dashboard
=========================================================
Run:  python3 upbit_backtest_web.py
Open: http://localhost:6060

Tab 1. Live-trade analysis -> actual trading results from trade_history
Tab 2. Tail-catch backtest -> price-replay backtest on upbit_candles minute bars
Tab 3. ENV settings        -> view / edit env_config (Upbit-specific parameters)
Tab 4. Candle collection   -> fetch minute candles via the Upbit REST API into the DB

DB: MariaDB 192.168.0.141:3306/upbit_quant_db
"""

import sys
import os
import json
import time
import logging
import threading
import hashlib
import uuid
from datetime import datetime, timedelta
from typing import List, Dict, Optional
from urllib.parse import urlencode

import requests
import pymysql
import pymysql.cursors
from flask import Flask, jsonify, request, render_template_string

logging.basicConfig(level=logging.INFO,
                    format="[%(asctime)s] %(message)s",
                    datefmt="%H:%M:%S")
logger = logging.getLogger("upbit_backtest_web")

app = Flask(__name__)

# ── MariaDB connection settings ────────────────────────────────────────────
# Every value can be overridden through an environment variable so that the
# credentials do not have to live in the source; the defaults are the values
# this file has always used.
_DB_CFG = dict(
    host=os.environ.get("UPBIT_DB_HOST", "192.168.0.141"),
    port=int(os.environ.get("UPBIT_DB_PORT", "3306")),
    user=os.environ.get("UPBIT_DB_USER", "jae"),
    password=os.environ.get("UPBIT_DB_PASSWORD", "1234"),
    database=os.environ.get("UPBIT_DB_NAME", "upbit_quant_db"),
    charset="utf8mb4",
    autocommit=True,
    cursorclass=pymysql.cursors.DictCursor,
    connect_timeout=10,
)


def _db():
    """Open and return a new MariaDB connection (one per request/worker)."""
    return pymysql.connect(**_DB_CFG)


def _stringify_fields(rows, keys) -> None:
    """Replace the truthy values of *keys* in every row with ``str(value)``.

    Used to turn DB datetime columns into plain strings before ``jsonify``.
    Rows are modified in place.
    """
    for row in rows:
        for key in keys:
            if row.get(key):
                row[key] = str(row[key])


# ── Upbit REST client (used for candle collection) ─────────────────────────
_UPBIT_BASE = "https://api.upbit.com/v1"
_MIN_REQ_INTERVAL = 0.12     # minimum spacing between requests, in seconds
_last_req_t = 0.0
# Request handlers and the background fetch worker call _upbit_get from
# different threads, so the shared timestamp is guarded by a lock.
_req_lock = threading.Lock()


def _upbit_get(path: str, params: dict = None,
               auth_key: str = "", auth_secret: str = "") -> Optional[dict]:
    """Perform a GET against the Upbit REST API.

    Requests are spaced at least 0.12 s apart, HTTP 429 responses and
    transport errors are retried (up to 4 attempts in total).

    Args:
        path: API path appended to the base URL, e.g. ``"/market/all"``.
        params: Query parameters.
        auth_key: Access key; a JWT ``Authorization`` header is attached only
            when both *auth_key* and *auth_secret* are given.
        auth_secret: Secret key used to sign the JWT.

    Returns:
        The decoded JSON body on HTTP 200 (the shape depends on the endpoint),
        or ``None`` on any other status or when all attempts fail.
    """
    global _last_req_t
    with _req_lock:
        wait = _MIN_REQ_INTERVAL - (time.time() - _last_req_t)
        if wait > 0:
            time.sleep(wait)
        _last_req_t = time.time()

    url = f"{_UPBIT_BASE}{path}"
    hdrs = {}
    if auth_key and auth_secret:
        try:
            import jwt as pyjwt
        except ImportError:
            # Best effort: without PyJWT the request is sent unauthenticated.
            logger.warning("PyJWT is not installed; sending %s without auth", path)
        else:
            payload = {"access_key": auth_key, "nonce": str(uuid.uuid4())}
            qs = urlencode(params or {}, doseq=True)
            if qs:
                payload["query_hash"] = hashlib.sha512(qs.encode()).hexdigest()
                payload["query_hash_alg"] = "SHA512"
            token = pyjwt.encode(payload, auth_secret, algorithm="HS256")
            if isinstance(token, bytes):     # PyJWT 1.x returns bytes
                token = token.decode()
            hdrs["Authorization"] = f"Bearer {token}"

    for attempt in range(4):
        try:
            r = requests.get(url, params=params, headers=hdrs, timeout=10)
            if r.status_code == 429:
                time.sleep(1.5 + attempt)
                continue
            if r.status_code == 200:
                return r.json()
            logger.warning("Upbit GET %s returned HTTP %s", path, r.status_code)
            return None
        except (requests.RequestException, ValueError) as e:
            # Transport failure or an undecodable body: back off and retry.
            logger.warning("Upbit GET %s failed (attempt %d/4): %s",
                           path, attempt + 1, e)
            time.sleep((2 ** attempt) * 0.5)
    return None


# ── Background candle-collection job state, keyed by job id ────────────────
_fetch_jobs: Dict[str, dict] = {}


# ────────────────────────────────────────────────────────────────────────────
# Shared helpers
# ────────────────────────────────────────────────────────────────────────────
def _compute_rsi_series(closes: list, period: int = 14) -> list:
    """Compute an RSI series with Wilder smoothing.

    Args:
        closes: Close prices in chronological order.
        period: RSI look-back length.

    Returns:
        A list as long as *closes*. Entries before index *period* are ``None``;
        the whole list is ``None`` when there are fewer than ``period + 1``
        closes or *period* is below 1. RSI is 100.0 when the average loss is 0.
    """
    rsi_list = [None] * len(closes)
    if period < 1 or len(closes) < period + 1:
        return rsi_list

    deltas = [closes[i] - closes[i - 1] for i in range(1, len(closes))]
    gains = [max(d, 0) for d in deltas]
    losses = [max(-d, 0) for d in deltas]

    # Seed with the simple average of the first `period` changes.
    avg_g = sum(gains[:period]) / period
    avg_l = sum(losses[:period]) / period

    for i in range(period, len(closes)):
        if i > period:
            # deltas[i - 1] is the change that ends at closes[i].
            avg_g = (avg_g * (period - 1) + gains[i - 1]) / period
            avg_l = (avg_l * (period - 1) + losses[i - 1]) / period
        if avg_l > 0:
            rsi_list[i] = 100 - (100 / (1 + avg_g / avg_l))
        else:
            rsi_list[i] = 100.0
    return rsi_list


def _summarize_trades(pnls: List[float], hold_minutes: List[float]) -> dict:
    """Build the ``summary`` payload shared by the live and backtest reports.

    Args:
        pnls: Per-trade profit/loss in chronological order.
        hold_minutes: Per-trade holding time in minutes.

    Returns:
        Dict with trade counts, win rate (%), total P&L, average holding time,
        profit factor (9999.0 when there are no losing trades) and the maximum
        drawdown of the cumulative P&L curve.
    """
    total = len(pnls)
    win_vals = [p for p in pnls if p > 0]
    loss_vals = [p for p in pnls if p < 0]
    loss_pnl = sum(loss_vals)
    pf = round(abs(sum(win_vals) / loss_pnl), 2) if loss_pnl != 0 else 9999.0

    peak, mdd, cum = 0.0, 0.0, 0.0
    for p in pnls:
        cum += p
        peak = max(peak, cum)
        mdd = max(mdd, peak - cum)

    avg_hold = (sum(hold_minutes) / total) if total else 0
    return {
        "total_trades": total,
        "win_trades": len(win_vals),
        "loss_trades": len(loss_vals),
        "win_rate": round(len(win_vals) / total * 100, 1) if total else 0,
        "total_pnl": round(sum(pnls)),
        "avg_hold_min": round(avg_hold, 1),
        "profit_factor": pf,
        "max_drawdown": round(mdd),
    }


# ────────────────────────────────────────────────────────────────────────────
# API: live-trade analysis
# ────────────────────────────────────────────────────────────────────────────
@app.route("/api/actual", methods=["GET"])
def api_actual():
    """Analyse real trading performance from ``trade_history``.

    Query args: ``strategy`` (default ``UPBIT_TAIL_CATCH``), ``start`` and
    ``end`` (date strings, each optional, applied to ``sell_date``).
    Returns summary stats, equity curve, daily P&L, sell-reason counts, the
    top 10 codes by P&L and the last 200 trades.
    """
    strategy = request.args.get("strategy", "UPBIT_TAIL_CATCH")
    start = request.args.get("start", "")
    end = request.args.get("end", "")

    conn = _db()
    try:
        cur = conn.cursor()
        sql = "SELECT * FROM trade_history WHERE strategy = %s"
        params = [strategy]
        if start:
            sql += " AND sell_date >= %s"
            params.append(start + " 00:00:00")
        if end:
            sql += " AND sell_date <= %s"
            params.append(end + " 23:59:59")
        sql += " ORDER BY sell_date ASC"
        cur.execute(sql, params)
        trades = cur.fetchall() or []

        # Make the date columns JSON-friendly.
        _stringify_fields(trades, ("buy_date", "sell_date"))

        pnls = [float(t.get("realized_pnl") or 0) for t in trades]
        holds = [float(t.get("hold_minutes") or 0) for t in trades]

        # Cumulative P&L curve.
        equity, cum_pnl = [], 0.0
        for t, pnl in zip(trades, pnls):
            cum_pnl += pnl
            equity.append({
                "date": str(t.get("sell_date") or "")[:10],
                "cum_pnl": round(cum_pnl),
                "pnl": round(pnl),
            })

        # Trade count per sell reason.
        reasons: Dict[str, int] = {}
        for t in trades:
            r = t.get("sell_reason") or "기타"
            reasons[r] = reasons.get(r, 0) + 1

        # Daily P&L.
        daily: Dict[str, float] = {}
        for t, pnl in zip(trades, pnls):
            day = str(t.get("sell_date") or "")[:10]
            if day:
                daily[day] = daily.get(day, 0) + pnl
        daily_list = [{"date": d, "pnl": round(v)} for d, v in sorted(daily.items())]

        # Top 10 codes by total P&L.
        code_pnl: Dict[str, float] = {}
        code_name: Dict[str, str] = {}
        for t, pnl in zip(trades, pnls):
            c = t["code"]
            code_pnl[c] = code_pnl.get(c, 0) + pnl
            code_name[c] = t.get("name") or c
        top_codes = sorted(code_pnl.items(), key=lambda x: x[1], reverse=True)[:10]
        top_list = [{"code": c, "name": code_name[c], "pnl": round(v)}
                    for c, v in top_codes]

        return jsonify({
            "summary": _summarize_trades(pnls, holds),
            "equity": equity,
            "daily": daily_list,
            "reasons": reasons,
            "top_codes": top_list,
            "trades": trades[-200:],
        })
    finally:
        conn.close()


# ────────────────────────────────────────────────────────────────────────────
# API: tail-catch backtest (based on upbit_candles)
# ────────────────────────────────────────────────────────────────────────────
def _parse_candle_time(t: str) -> datetime:
    """Convert a ``YYYYMMDDHHMM`` candle key into a ``datetime``."""
    return datetime.strptime(t, "%Y%m%d%H%M")


def _parse_tail_params(args) -> dict:
    """Read the backtest parameters from the query string.

    Percent-style inputs are converted to fractions. Raises ``ValueError``
    (or ``TypeError``) when a value cannot be parsed or is out of range.
    """
    def _pct(name, default):
        return float(args.get(name, default)) / 100

    cfg = {
        "rsi_period": int(args.get("rsi_period", 14)),
        "rsi_threshold": float(args.get("rsi_threshold", 78)),    # RSI overheat level
        "min_drop_rate": _pct("min_drop_rate", 3.0),
        "min_recovery_ratio": _pct("min_recovery_ratio", 50),
        "max_rec": _pct("max_rec", 80),
        "tail_ratio_min": float(args.get("tail_ratio_min", 1.5)),
        "tail_pct_min": _pct("tail_pct_min", 0.3),
        "sl_pct": _pct("sl_pct", 3.0),
        "tp_pct": _pct("tp_pct", 5.0),
        "shoulder_min_high": _pct("shoulder_min_high", 1.5),
        "shoulder_cut_pct": _pct("shoulder_cut_pct", 3.0),
        "high_chase_thr": _pct("high_chase_thr", 96.0),
        "slot_money": float(args.get("slot_money", 100_000)),
        "fee_rate": _pct("fee_rate", 0.05),                       # 0.05% per side
        "cooldown_min": int(args.get("cooldown_min", 30)),
        "max_hold_hours": float(args.get("max_hold_hours", 24)),  # forced exit
        "max_daily": int(args.get("max_daily", 5)),
        "timeframe": int(args.get("timeframe", 3)),               # minute-bar unit
    }
    if cfg["rsi_period"] < 1:
        raise ValueError("rsi_period must be >= 1")
    if cfg["timeframe"] < 1:
        raise ValueError("timeframe must be >= 1")
    return cfg


def _simulate_tail_trades(code: str, candles: list, rsis: list, cfg: dict) -> List[Dict]:
    """Replay one market's candles and return the closed trades.

    Entry: intraday drop + recovery ratio + hammer-style lower tail + RSI
    below the overheat level; the fill is assumed at the NEXT candle's open.
    Exit: stop loss / take profit / trailing "shoulder" cut / max holding time.
    A position still open at the end of the data produces no trade.

    Args:
        code: Market code written into every trade record.
        candles: Chronological rows with ``candle_time`` (``YYYYMMDDHHMM``)
            and open/high/low/close price columns.
        rsis: RSI series aligned with *candles*.
        cfg: Parameters as produced by ``_parse_tail_params``.
    """
    sl_pct = cfg["sl_pct"]
    tp_pct = cfg["tp_pct"]
    fee_rate = cfg["fee_rate"]
    shoulder_min_high = cfg["shoulder_min_high"]
    shoulder_cut_pct = cfg["shoulder_cut_pct"]

    trades: List[Dict] = []
    position = None
    # A single timestamp rather than one per calendar day: the market trades
    # around the clock, so the cooldown must also hold across midnight.
    last_exit_at: Optional[datetime] = None
    daily_cnt: Dict[str, int] = {}

    # Running intraday OHLC built only from candles seen so far (no look-ahead).
    cur_day = None
    running_open = 0.0
    running_high = 0.0
    running_low = 0.0

    for i in range(cfg["rsi_period"] + 1, len(candles)):
        c = candles[i]
        ctime = c["candle_time"]
        day = ctime[:8]
        op = float(c["open_price"])
        hi = float(c["high_price"])
        lo = float(c["low_price"])
        cl = float(c["close_price"])

        # ── update the running intraday OHLC ───────────────────────────
        if day != cur_day:
            cur_day = day
            running_open = op
            running_high = hi
            running_low = lo if lo > 0 else hi
        else:
            running_high = max(running_high, hi)
            if lo > 0:
                running_low = min(running_low, lo)

        # ── holding a position: check the exits ────────────────────────
        if position is not None:
            now_dt = _parse_candle_time(ctime)
            held_sec = (now_dt - position["entry_dt"]).total_seconds()
            max_p = max(position["max_price"], hi)
            position["max_price"] = max_p

            reason = None
            exit_price = cl
            if lo <= position["stop"]:
                # Stop loss: the candle low touched the stop level.
                reason = "손절"
                exit_price = position["stop"]
            elif hi >= position["target"]:
                # Take profit: the candle high reached the target.
                reason = "익절"
                exit_price = position["target"]
            elif max_p >= position["entry_price"] * (1 + shoulder_min_high):
                # Trailing cut: close fell shoulder_cut_pct below the peak.
                if cl <= max_p * (1 - shoulder_cut_pct):
                    reason = "어깨매도"
                    exit_price = cl

            # No market close for coins, so cap the holding time instead.
            if reason is None and held_sec / 3600 >= cfg["max_hold_hours"]:
                reason = "보유시간초과"
                exit_price = cl

            if reason:
                qty = position["qty"]
                buy_amt = position["entry_price"] * qty
                sell_amt = exit_price * qty
                # Fee is charged on both the buy and the sell leg.
                pnl = sell_amt - buy_amt - buy_amt * fee_rate - sell_amt * fee_rate
                trades.append({
                    "code": code,
                    "buy_time": position["entry_time"],
                    "sell_time": ctime,
                    "buy_price": position["entry_price"],
                    "sell_price": round(exit_price, 4),
                    "qty": qty,
                    "pnl": round(pnl),
                    "profit_rate": round(
                        (exit_price - position["entry_price"])
                        / position["entry_price"] * 100, 2),
                    "hold_min": int(held_sec / 60),
                    "sell_reason": reason,
                    "rsi_entry": round(position["rsi"], 1),
                })
                last_exit_at = now_dt
                position = None
            continue

        # ── flat: check the entry signal ───────────────────────────────
        # Cooldown after the previous exit.
        if last_exit_at is not None:
            elapsed_min = (_parse_candle_time(ctime) - last_exit_at).total_seconds() / 60
            if elapsed_min < cfg["cooldown_min"]:
                continue
            last_exit_at = None      # time only moves forward; stop re-checking

        # Daily entry cap.
        if daily_cnt.get(day, 0) >= cfg["max_daily"]:
            continue

        # Skip overheated (or not yet available) RSI.
        rsi = rsis[i]
        if rsi is None or rsi >= cfg["rsi_threshold"]:
            continue

        # Intraday drop filter.
        if running_open <= 0:
            continue
        drop_rate = (running_open - running_low) / running_open
        if drop_rate < cfg["min_drop_rate"]:
            continue

        # Recovery filter: where the close sits inside the day's range.
        day_range = running_high - running_low
        if day_range <= 0:
            continue
        recovery = (cl - running_low) / day_range
        if not (cfg["min_recovery_ratio"] <= recovery <= cfg["max_rec"]):
            continue

        # Hammer tail: lower wick relative to the body and to the low.
        body_top = max(op, cl)
        body_bottom = min(op, cl)
        body_len = max(body_top - body_bottom, 1.0)
        tail_len = max(body_bottom - lo, 0.0)
        tail_ratio = tail_len / body_len
        tail_pct = tail_len / lo if lo > 0 else 0
        if tail_ratio < cfg["tail_ratio_min"] or tail_pct < cfg["tail_pct_min"]:
            continue

        # Do not chase prices close to the intraday high.
        if cl >= running_high * cfg["high_chase_thr"]:
            continue

        # Fill at the next candle's open (realistic execution assumption).
        if i + 1 >= len(candles):
            continue
        next_c = candles[i + 1]
        entry_price = float(next_c["open_price"])
        if entry_price <= 0:
            continue

        position = {
            "entry_price": entry_price,
            "entry_time": next_c["candle_time"],
            "entry_dt": _parse_candle_time(next_c["candle_time"]),
            "qty": cfg["slot_money"] / entry_price,
            "stop": entry_price * (1 - sl_pct),
            "target": entry_price * (1 + tp_pct),
            "max_price": entry_price,
            "rsi": rsi,
        }
        daily_cnt[day] = daily_cnt.get(day, 0) + 1

    return trades


@app.route("/api/backtest/tail", methods=["GET"])
def api_backtest_tail():
    """Price-replay backtest of the Upbit tail-catch strategy.

    Entry: intraday drop + recovery ratio + hammer tail + RSI.
    Exit: stop loss / take profit / shoulder cut (trailing) / forced exit
    after ``max_hold_hours`` (coins trade 24h, there is no session close).

    Query args: ``start``, ``end`` (``YYYY-MM-DD``), ``markets``
    (comma-separated, empty = all) and the strategy parameters read by
    ``_parse_tail_params``. Responds 400 when a parameter cannot be parsed.
    """
    start = request.args.get("start", "")
    end = request.args.get("end", "")
    # Comma-separated market filter; empty means every stored market.
    market_filter = request.args.get("markets", "").strip().upper()

    try:
        cfg = _parse_tail_params(request.args)
    except (TypeError, ValueError) as e:
        return jsonify({"error": f"잘못된 파라미터: {e}"}), 400

    rsi_period = cfg["rsi_period"]
    timeframe = cfg["timeframe"]

    conn = _db()
    try:
        start_key = (start.replace("-", "") + "0000") if start else "20260101"
        end_key = (end.replace("-", "") + "2359") if end else "99991231"

        cur = conn.cursor()
        filter_list = [m.strip() for m in market_filter.split(",") if m.strip()]
        if filter_list:
            fmt = ",".join(["%s"] * len(filter_list))
            cur.execute(
                f"SELECT DISTINCT code FROM upbit_candles "
                f"WHERE timeframe=%s AND candle_time >= %s AND candle_time <= %s "
                f"AND code IN ({fmt}) ORDER BY code",
                [timeframe, start_key, end_key] + filter_list
            )
        else:
            cur.execute(
                "SELECT DISTINCT code FROM upbit_candles "
                "WHERE timeframe=%s AND candle_time >= %s AND candle_time <= %s ORDER BY code",
                [timeframe, start_key, end_key]
            )
        codes = [r["code"] for r in (cur.fetchall() or [])]

        all_trades: List[Dict] = []
        for code in codes:
            cur.execute(
                "SELECT candle_time, open_price, high_price, low_price, close_price, volume "
                "FROM upbit_candles "
                "WHERE timeframe=%s AND code=%s "
                "AND candle_time >= %s AND candle_time <= %s "
                "AND is_confirmed=1 "
                "ORDER BY candle_time ASC",
                [timeframe, code, start_key, end_key]
            )
            candles = cur.fetchall() or []
            if len(candles) < rsi_period + 5:
                continue

            closes = [float(c["close_price"]) for c in candles]
            rsis = _compute_rsi_series(closes, rsi_period)
            all_trades.extend(_simulate_tail_trades(code, candles, rsis, cfg))

        # ── aggregate the results ──────────────────────────────────────
        all_trades.sort(key=lambda x: x["sell_time"])

        equity = []
        cum = 0.0
        for t in all_trades:
            cum += t["pnl"]
            equity.append({"date": t["sell_time"][:8],
                           "cum_pnl": round(cum),
                           "pnl": t["pnl"]})

        reasons: Dict[str, int] = {}
        for t in all_trades:
            reasons[t["sell_reason"]] = reasons.get(t["sell_reason"], 0) + 1

        daily: Dict[str, float] = {}
        for t in all_trades:
            d8 = t["sell_time"][:8]
            daily[d8] = daily.get(d8, 0) + t["pnl"]
        daily_list = [{"date": d[:4] + "-" + d[4:6] + "-" + d[6:], "pnl": round(v)}
                      for d, v in sorted(daily.items())]

        return jsonify({
            "params": {
                "rsi_period": rsi_period,
                "rsi_threshold": cfg["rsi_threshold"],
                "min_drop_rate": cfg["min_drop_rate"] * 100,
                "min_recovery": cfg["min_recovery_ratio"] * 100,
                "sl_pct": cfg["sl_pct"] * 100,
                "tp_pct": cfg["tp_pct"] * 100,
                "shoulder_cut": cfg["shoulder_cut_pct"] * 100,
                "slot_money": cfg["slot_money"],
                "fee_rate": cfg["fee_rate"] * 100,
                "timeframe": timeframe,
                "codes_analyzed": len(codes),
            },
            "summary": _summarize_trades(
                [t["pnl"] for t in all_trades],
                [t["hold_min"] for t in all_trades],
            ),
            "equity": equity,
            "daily": daily_list,
            "reasons": reasons,
            "trades": all_trades[-200:],
        })
    finally:
        conn.close()


# ────────────────────────────────────────────────────────────────────────────
# API: ENV settings (read / update)
# ────────────────────────────────────────────────────────────────────────────
@app.route("/api/env", methods=["GET"])
def api_env_get():
    """Return the latest ``env_config`` row without ``id`` / ``created_at``."""
    conn = _db()
    try:
        cur = conn.cursor()
        cur.execute("SELECT * FROM env_config ORDER BY id DESC LIMIT 1")
        row = cur.fetchone()
        if not row:
            return jsonify({})
        result = {k: v for k, v in row.items() if k not in ("id", "created_at")}
        return jsonify(result)
    finally:
        conn.close()


@app.route("/api/env", methods=["POST"])
def api_env_set():
    """Update ``env_config`` with the submitted fields only.

    The body must be a JSON object. Keys that are not columns of
    ``env_config`` (or are ``id`` / ``created_at``) are ignored. The latest
    row is updated; a new row is inserted when the table is empty.
    """
    body = request.get_json(force=True, silent=True)
    if not isinstance(body, dict) or not body:
        return jsonify({"error": "빈 요청"}), 400

    conn = _db()
    try:
        cur = conn.cursor()
        # Only real columns may be written (security): column names are
        # interpolated into the SQL text, so they must come from the schema.
        cur.execute("DESCRIBE env_config")
        allowed_cols = {r["Field"] for r in (cur.fetchall() or [])} - {"id", "created_at"}

        cols, vals = [], []
        for k, v in body.items():
            if k in allowed_cols:
                cols.append(f"`{k}`")
                vals.append(str(v))

        if not cols:
            return jsonify({"error": "유효한 필드 없음"}), 400

        cur.execute("SELECT id FROM env_config ORDER BY id DESC LIMIT 1")
        row = cur.fetchone()
        if row:
            assignments = ", ".join(f"{c} = %s" for c in cols)
            cur.execute(f"UPDATE env_config SET {assignments} WHERE id = %s",
                        vals + [row["id"]])
        else:
            marks = ", ".join(["%s"] * len(cols))
            cur.execute(f"INSERT INTO env_config ({', '.join(cols)}) VALUES ({marks})",
                        vals)

        return jsonify({"ok": True, "updated": len(cols)})
    finally:
        conn.close()


# ────────────────────────────────────────────────────────────────────────────
# API: candle collection (Upbit REST -> upbit_candles)
# ────────────────────────────────────────────────────────────────────────────
# Minute-candle units accepted by the Upbit REST API.
_UPBIT_MINUTE_UNITS = (1, 3, 5, 10, 15, 30, 60, 240)


@app.route("/api/candles/fetch", methods=["POST"])
def api_candles_fetch():
    """Start a background job that downloads minute candles into the DB.

    body: {market: "KRW-BTC", start: "2026-01-01", end: "2026-03-09", timeframe: 3}
    ``markets`` (a list, or a comma-separated string) may be given instead of
    ``market`` to collect several markets in one job. ``start`` defaults to
    7 days ago and ``end`` to today.

    Returns ``{"ok": True, "job_id": ..., "markets": [...]}``; progress is
    stored in ``_fetch_jobs`` under the returned job id. Responds 400 when
    the market, dates or timeframe are missing/invalid.
    """
    body = request.get_json(force=True, silent=True)
    if not isinstance(body, dict):
        body = {}

    market = str(body.get("market") or "").upper()
    markets = body.get("markets") or []
    if isinstance(markets, str):
        markets = markets.split(",")
    markets = [str(m).strip().upper() for m in markets if str(m).strip()]

    if not market and not markets:
        return jsonify({"error": "market 또는 markets 필드 필수"}), 400

    start_str = body.get("start") or ""
    end_str = body.get("end") or datetime.now().strftime("%Y-%m-%d")
    # Validate here so that bad input is reported to the caller instead of
    # failing later inside the worker thread.
    try:
        tf = int(body.get("timeframe", 3))
        if tf not in _UPBIT_MINUTE_UNITS:
            raise ValueError(f"timeframe must be one of {_UPBIT_MINUTE_UNITS}")
        if start_str:
            start_dt = datetime.strptime(start_str, "%Y-%m-%d")
        else:
            start_dt = datetime.now() - timedelta(days=7)
        # End date is inclusive, so page back from the following midnight.
        end_dt = datetime.strptime(end_str, "%Y-%m-%d") + timedelta(days=1)
    except (TypeError, ValueError) as e:
        return jsonify({"error": f"잘못된 파라미터: {e}"}), 400

    target_markets = markets if markets else [market]
    job_id = str(uuid.uuid4())[:8]
    _fetch_jobs[job_id] = {"status": "running", "progress": 0,
                           "total": len(target_markets), "saved": 0, "errors": []}

    def _worker():
        job = _fetch_jobs[job_id]
        total_saved = 0
        conn_w = None
        try:
            # Connect inside the try so that a DB failure still marks the
            # job as finished instead of leaving it "running" forever.
            conn_w = _db()
            cur_w = conn_w.cursor()
            for mkt in target_markets:
                # Upbit returns at most 200 candles per call, newest first;
                # page backwards by lowering the `to` bound.
                to_dt = end_dt
                while to_dt > start_dt:
                    # Candle times below are KST, so send `to` with an explicit
                    # +09:00 offset; without one Upbit reads the value as UTC.
                    # NOTE(review): the default dates come from datetime.now(),
                    # which is assumed to be KST wall-clock time — confirm.
                    to_str = to_dt.strftime("%Y-%m-%dT%H:%M:%S") + "+09:00"
                    data = _upbit_get(
                        f"/candles/minutes/{tf}",
                        {"market": mkt, "to": to_str, "count": 200}
                    )
                    if not data or not isinstance(data, list):
                        break

                    batch = []
                    oldest_dt = None
                    for c in data:
                        # candle_date_time_kst: "2026-03-09T14:30:00"
                        cdt_str = (c.get("candle_date_time_kst") or "")[:16]
                        try:
                            cdt = datetime.strptime(cdt_str, "%Y-%m-%dT%H:%M")
                        except ValueError:
                            continue
                        if oldest_dt is None or cdt < oldest_dt:
                            oldest_dt = cdt
                        if cdt < start_dt:
                            continue
                        batch.append((
                            mkt,
                            cdt.strftime("%Y%m%d%H%M"),
                            tf,
                            float(c.get("opening_price") or 0),
                            float(c.get("high_price") or 0),
                            float(c.get("low_price") or 0),
                            float(c.get("trade_price") or 0),
                            float(c.get("candle_acc_trade_volume") or 0),
                            1,
                        ))

                    if batch:
                        cur_w.executemany("""
                            INSERT INTO upbit_candles
                                (code, candle_time, timeframe,
                                 open_price, high_price, low_price, close_price,
                                 volume, is_confirmed)
                            VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s)
                            ON DUPLICATE KEY UPDATE
                                open_price=VALUES(open_price),
                                high_price=VALUES(high_price),
                                low_price=VALUES(low_price),
                                close_price=VALUES(close_price),
                                volume=VALUES(volume)
                        """, batch)
                        total_saved += len(batch)
                        job["saved"] = total_saved

                    # Stop when the page did not move the cursor back,
                    # otherwise the same page would be requested forever.
                    if oldest_dt is None or oldest_dt >= to_dt:
                        break
                    # `to` is exclusive, so the oldest candle of this page is
                    # the bound for the next one (no candle is skipped).
                    to_dt = oldest_dt
                    time.sleep(0.15)

                job["progress"] += 1
                job["saved"] = total_saved
        except Exception as e:
            # Top-level boundary of the worker thread: record and log.
            job["errors"].append(str(e))
            logger.error(f"캔들 수집 오류: {e}")
        finally:
            if conn_w is not None:
                conn_w.close()
            job["status"] = "done"

    t = threading.Thread(target=_worker, daemon=True)
    t.start()
    return jsonify({"ok": True, "job_id": job_id, "markets": target_markets})


@app.route("/api/candles/fetch/status/<job_id>", methods=["GET"])
def api_fetch_status(job_id: str):
    """Return the progress record of a candle-collection job (404 if unknown)."""
    job = _fetch_jobs.get(job_id)
    if not job:
        return jsonify({"error": "job_id 없음"}), 404
    return jsonify(job)


@app.route("/api/candles/markets", methods=["GET"])
def api_candles_markets():
    """List the markets stored in ``upbit_candles`` for one timeframe.

    Query arg ``timeframe`` defaults to ``"3"``. Each entry carries the candle
    count and the first / last candle time.
    """
    tf = request.args.get("timeframe", "3")
    conn = _db()
    try:
        cur = conn.cursor()
        cur.execute(
            "SELECT code, COUNT(*) as cnt, MIN(candle_time) as first_dt, MAX(candle_time) as last_dt "
            "FROM upbit_candles WHERE timeframe=%s GROUP BY code ORDER BY code",
            [tf]
        )
        return jsonify(cur.fetchall() or [])
    finally:
        conn.close()


@app.route("/api/candles/upbit_markets", methods=["GET"])
def api_upbit_markets():
    """Return the live list of Upbit ``KRW-`` market codes (empty on failure)."""
    data = _upbit_get("/market/all")
    if not data:
        return jsonify([])
    return jsonify([m["market"] for m in data
                    if str(m.get("market", "")).startswith("KRW-")])


# ────────────────────────────────────────────────────────────────────────────
# API: holdings / candidate list
# ────────────────────────────────────────────────────────────────────────────
@app.route("/api/holdings", methods=["GET"])
def api_holdings():
    """Return the ``active_trades`` rows with status ``HOLDING``, newest first."""
    conn = _db()
    try:
        cur = conn.cursor()
        cur.execute("SELECT * FROM active_trades WHERE status='HOLDING' ORDER BY buy_date DESC")
        rows = cur.fetchall() or []
        _stringify_fields(rows, ("buy_date", "updated_at"))
        return jsonify(rows)
    finally:
        conn.close()


@app.route("/api/candidates", methods=["GET"])
def api_candidates():
    """Return the 50 highest-scoring rows of ``target_candidates``."""
    conn = _db()
    try:
        cur = conn.cursor()
        cur.execute("SELECT * FROM target_candidates ORDER BY score DESC LIMIT 50")
        rows = cur.fetchall() or []
        _stringify_fields(rows, ("scan_time", "updated_at"))
        return jsonify(rows)
    finally:
        conn.close()


# ────────────────────────────────────────────────────────────────────────────
# HTML dashboard template
# ────────────────────────────────────────────────────────────────────────────
# NOTE(review): only the text labels of the template are visible in this copy
# of the file; the HTML markup / scripts appear to be missing — confirm against
# the original before deploying.
HTML_TEMPLATE = """
Upbit 단타 봇 V2 — 대시보드
누적 손익 (원)
일별 P&L
최근 매매 내역 (최대 200건)
매수시각매도시각종목 매수가매도가수량 손익(원)수익률보유(분)사유
"""


@app.route("/")
def index():
    """Serve the dashboard page."""
    return render_template_string(HTML_TEMPLATE)


if __name__ == "__main__":
    app.run(host="0.0.0.0", port=6060, debug=False)