393 lines
16 KiB
Python
393 lines
16 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
scalping_engine.py — 스캘핑 백테스트·파라미터서치·실매매 공통 엔진
|
||
====================================================================
|
||
계산식만 같으면 백테스트와 실매매 결과가 같아지도록, 공통 로직만 이 모듈에 두고
|
||
backtest_web / param_search / kis_scalping_ver2 가 모두 이 엔진만 호출합니다.
|
||
|
||
■ 엔진에 들어간 공통 로직 (백테스트·파라미터서치·실매매 동일)
|
||
- 매수: 시간대(time_start_hm~time_end_hm), 쿨다운(cooldown_min), 일일 진입 횟수(max_daily),
|
||
RSI(3) 과매도(rsi_oversold), 이전봉 음봉+현재봉 양봉, 당일 낙폭(drop_rate), 거래량 배수(vol_mult).
|
||
- 매도: 손절가(sl_pct), 익절가(tp_pct), 트레일링스탑(trail_trigger/trail_stop), 장마감청산(is_eod).
|
||
|
||
■ 엔진 밖(실매매 전용) 로직 — 백테스트에는 없음
|
||
- 매수 추가 필터: 고점추격 방지(high_chase_thr), 급등주(max_daily_chg), 시장/테마 필터, ML, min_price 등.
|
||
- 매도 추가: 본절사수(breakeven), 금액손실컷(MAX_LOSS_PER_TRADE_KRW). (백테스트는 % 손절/익절/트레일만 사용)
|
||
|
||
캔들 형식: list of dict with keys candle_time(YYYYMMDDHHMI), open, high, low, close, volume
|
||
"""
|
||
|
||
from datetime import datetime
|
||
from typing import List, Dict, Any, Optional, Tuple
|
||
|
||
# DB 기본값 로드 (백테스트/param_search가 동일한 값 사용하도록 단일 소스)
|
||
def get_scalping_defaults_from_db() -> Dict[str, Any]:
|
||
"""
|
||
env_config 최신 행에서 스캘핑 관련 기본값 로드.
|
||
백테스트 API·param_search가 이 함수만 쓰면 실매매(DB)와 동일한 값으로 동작.
|
||
반환 키: cooldown_min, time_start_hm, time_end_hm, fee_rate, sell_tax,
|
||
slot_money, rsi_period, vol_mult, trail_trigger, trail_stop, max_daily
|
||
"""
|
||
try:
|
||
from database import TradeDB
|
||
db = TradeDB()
|
||
row = db.conn.execute("SELECT * FROM env_config ORDER BY id DESC LIMIT 1").fetchone()
|
||
db.close()
|
||
if row:
|
||
r = dict(row)
|
||
# SCALP_COOLDOWN_SEC(초) → cooldown_min(분). 실매매와 동일 키 사용
|
||
sec = r.get("SCALP_COOLDOWN_SEC") or r.get("REENTRY_COOLDOWN_SEC") or "600"
|
||
cooldown_min = max(0, int(float(sec)) // 60)
|
||
fee_pct = float(r.get("FEE_RATE_PCT") or 0.015)
|
||
tax_pct = float(r.get("SELL_TAX_RATE_PCT") or 0.18)
|
||
slot = float(r.get("SLOT_MONEY_DEFAULT") or 300_000)
|
||
else:
|
||
cooldown_min, fee_pct, tax_pct, slot = 10, 0.015, 0.18, 300_000.0
|
||
except Exception:
|
||
cooldown_min, fee_pct, tax_pct, slot = 10, 0.015, 0.18, 300_000.0
|
||
return {
|
||
"cooldown_min": cooldown_min,
|
||
"time_start_hm": 900,
|
||
"time_end_hm": 1530,
|
||
"time_start": 900,
|
||
"time_end": 1530,
|
||
"fee_rate": fee_pct / 100,
|
||
"sell_tax": tax_pct / 100,
|
||
"slot_money": slot,
|
||
"rsi_period": 3,
|
||
"vol_mult": 0,
|
||
"trail_trigger": 0.007,
|
||
"trail_stop": 0.004,
|
||
"max_daily": 3,
|
||
}
|
||
|
||
|
||
def compute_rsi_series(closes: list, period: int = 3) -> list:
|
||
"""RSI 시리즈 계산 (Wilder 스무딩). backtest_web과 동일."""
|
||
rsi_list = [None] * len(closes)
|
||
if len(closes) < period + 1:
|
||
return rsi_list
|
||
deltas = [closes[i] - closes[i - 1] for i in range(1, len(closes))]
|
||
gains = [max(d, 0) for d in deltas]
|
||
losses = [max(-d, 0) for d in deltas]
|
||
avg_gain = sum(gains[:period]) / period
|
||
avg_loss = sum(losses[:period]) / period
|
||
for i in range(period, len(closes)):
|
||
idx = i - 1
|
||
if i > period:
|
||
avg_gain = (avg_gain * (period - 1) + gains[idx]) / period
|
||
avg_loss = (avg_loss * (period - 1) + losses[idx]) / period
|
||
rs = avg_gain / avg_loss if avg_loss > 0 else float("inf")
|
||
rsi_val = 100 - (100 / (1 + rs)) if avg_loss > 0 else 100.0
|
||
rsi_list[i] = rsi_val
|
||
return rsi_list
|
||
|
||
|
||
def _t2dt(t: str) -> datetime:
|
||
"""candle_time 문자열 → datetime."""
|
||
return datetime.strptime(t, "%Y%m%d%H%M")
|
||
|
||
|
||
def run_scalping_backtest(
|
||
codes_candles: Dict[str, List[Dict]],
|
||
params: Dict[str, Any],
|
||
) -> List[Dict]:
|
||
"""
|
||
종목별 캔들에 대해 스캘핑 백테스트 실행. 실매매와 동일한 규칙 적용.
|
||
|
||
params:
|
||
rsi_period, rsi_oversold, sl_pct, tp_pct, drop_rate,
|
||
slot_money, fee_rate, sell_tax,
|
||
cooldown_min, trail_trigger, trail_stop,
|
||
time_start_hm, time_end_hm, max_daily, vol_mult
|
||
sl_pct/tp_pct/drop_rate/trail_trigger/trail_stop: 비율 (0.015 = 1.5%)
|
||
fee_rate, sell_tax: 비율 (0.00015, 0.0018)
|
||
time_start_hm, time_end_hm: HHMM 정수 (900, 1530)
|
||
"""
|
||
rsi_period = int(params.get("rsi_period", 3))
|
||
rsi_oversold = float(params.get("rsi_oversold", 25))
|
||
sl_pct = float(params.get("sl_pct", 0.015))
|
||
tp_pct = float(params.get("tp_pct", 0.015))
|
||
drop_rate = float(params.get("drop_rate", 0.015))
|
||
slot_money = float(params.get("slot_money", 300_000))
|
||
fee_rate = float(params.get("fee_rate", 0.00015))
|
||
sell_tax = float(params.get("sell_tax", 0.0018))
|
||
cooldown_min = float(params.get("cooldown_min", 10))
|
||
trail_trigger = float(params.get("trail_trigger", 0))
|
||
trail_stop = float(params.get("trail_stop", 0.004))
|
||
time_start_hm = int(params.get("time_start_hm", 900))
|
||
time_end_hm = int(params.get("time_end_hm", 1530))
|
||
max_daily = int(params.get("max_daily", 3))
|
||
vol_mult = float(params.get("vol_mult", 0))
|
||
|
||
all_trades: List[Dict] = []
|
||
|
||
for code, rows in codes_candles.items():
|
||
if len(rows) < rsi_period + 5:
|
||
continue
|
||
candles = [dict(r) for r in rows]
|
||
closes = [float(c["close"]) for c in candles]
|
||
volumes = [float(c.get("volume", 0)) for c in candles]
|
||
rsis = compute_rsi_series(closes, rsi_period)
|
||
|
||
position: Optional[Dict] = None
|
||
last_exit_dt: Dict[str, datetime] = {}
|
||
daily_cnt: Dict[str, int] = {}
|
||
cur_day = None
|
||
running_open = 0.0
|
||
running_low = 0.0
|
||
|
||
for i in range(rsi_period + 1, len(candles)):
|
||
c = candles[i]
|
||
day = c["candle_time"][:8]
|
||
hm = int(c["candle_time"][8:12])
|
||
cl = float(c["close"])
|
||
lo = float(c["low"])
|
||
hi = float(c["high"])
|
||
vol = volumes[i] if i < len(volumes) else 0
|
||
|
||
if day != cur_day:
|
||
cur_day = day
|
||
running_open = float(c["open"])
|
||
running_low = lo
|
||
else:
|
||
running_low = min(running_low, lo)
|
||
|
||
is_eod = (i == len(candles) - 1) or (candles[i + 1]["candle_time"][:8] != day)
|
||
|
||
# ── 포지션 보유 중: 청산 체크 ──
|
||
if position is not None:
|
||
max_price = max(position["max_price"], hi)
|
||
position["max_price"] = max_price
|
||
reason = None
|
||
exit_price = cl
|
||
|
||
if lo <= position["stop"]:
|
||
reason = "손절"
|
||
exit_price = position["stop"]
|
||
elif hi >= position["target"]:
|
||
reason = "익절"
|
||
exit_price = position["target"]
|
||
elif trail_trigger > 0 and max_price >= position["entry_price"] * (1 + trail_trigger):
|
||
ts = max_price * (1 - trail_stop)
|
||
if cl <= ts:
|
||
reason = "트레일링스탑"
|
||
exit_price = cl
|
||
if reason is None and is_eod:
|
||
reason = "장마감청산"
|
||
exit_price = cl
|
||
|
||
if reason:
|
||
qty = position["qty"]
|
||
buy_amt = position["entry_price"] * qty
|
||
sell_amt = exit_price * qty
|
||
pnl = (
|
||
sell_amt
|
||
- buy_amt
|
||
- buy_amt * fee_rate
|
||
- sell_amt * fee_rate
|
||
- sell_amt * sell_tax
|
||
)
|
||
hold_min = int(
|
||
(_t2dt(c["candle_time"]) - _t2dt(position["entry_time"])).total_seconds() / 60
|
||
)
|
||
all_trades.append({
|
||
"code": code,
|
||
"buy_time": position["entry_time"],
|
||
"sell_time": c["candle_time"],
|
||
"buy_price": position["entry_price"],
|
||
"sell_price": round(exit_price, 2),
|
||
"qty": qty,
|
||
"pnl": round(pnl),
|
||
"profit_rate": round(
|
||
(exit_price - position["entry_price"]) / position["entry_price"] * 100, 2
|
||
),
|
||
"hold_min": hold_min,
|
||
"sell_reason": reason,
|
||
"rsi_entry": round(position["rsi"], 1),
|
||
})
|
||
last_exit_dt[day] = _t2dt(c["candle_time"])
|
||
position = None
|
||
continue
|
||
|
||
# ── 포지션 없음: 매수 신호 ──
|
||
if hm < time_start_hm or hm >= time_end_hm:
|
||
continue
|
||
if day in last_exit_dt:
|
||
elapsed = (_t2dt(c["candle_time"]) - last_exit_dt[day]).total_seconds() / 60
|
||
if elapsed < cooldown_min:
|
||
continue
|
||
if daily_cnt.get(day, 0) >= max_daily:
|
||
continue
|
||
|
||
rsi = rsis[i]
|
||
if rsi is None or rsi > rsi_oversold:
|
||
continue
|
||
prev_c = candles[i - 1]
|
||
prev_bear = float(prev_c["close"]) < float(prev_c["open"])
|
||
curr_bull = cl > float(c["open"])
|
||
if not (prev_bear and curr_bull):
|
||
continue
|
||
if running_open <= 0:
|
||
continue
|
||
dr = (running_open - running_low) / running_open
|
||
if dr < drop_rate:
|
||
continue
|
||
if vol_mult > 0:
|
||
win = max(1, min(20, i))
|
||
vol_avg = sum(volumes[i - win : i]) / win
|
||
if vol_avg > 0 and vol < vol_avg * vol_mult:
|
||
continue
|
||
|
||
if i + 1 >= len(candles):
|
||
continue
|
||
next_c = candles[i + 1]
|
||
if next_c["candle_time"][:8] != day:
|
||
continue
|
||
entry_price = float(next_c["open"])
|
||
if entry_price <= 0:
|
||
continue
|
||
|
||
qty = max(1, int(slot_money / entry_price))
|
||
stop = entry_price * (1 - sl_pct)
|
||
target = entry_price * (1 + tp_pct)
|
||
position = {
|
||
"entry_price": entry_price,
|
||
"entry_time": next_c["candle_time"],
|
||
"qty": qty,
|
||
"stop": stop,
|
||
"target": target,
|
||
"max_price": entry_price,
|
||
"rsi": rsi,
|
||
}
|
||
daily_cnt[day] = daily_cnt.get(day, 0) + 1
|
||
|
||
all_trades.sort(key=lambda x: x["sell_time"])
|
||
return all_trades
|
||
|
||
|
||
# ── 실시간 봇용: 단일 시점 매수/매도 판단 (백테스트와 동일 규칙) ─────────────────
|
||
|
||
def check_buy_signal_live(
|
||
candles: List[Dict],
|
||
params: Dict[str, Any],
|
||
state: Dict[str, Any],
|
||
) -> Tuple[Optional[str], Optional[str], Optional[Dict[str, Any]]]:
|
||
"""
|
||
실시간 봇: 현재 캔들 리스트의 마지막 봉이 백테스트와 동일한 매수 신호인지 판단.
|
||
candles: 확정 봉 리스트 (candle_time, open, high, low, close, volume)
|
||
state: { "last_exit_dt": datetime or None (당일 마지막 청산 시각), "daily_cnt": int (당일 이미 진입한 횟수) }
|
||
반환: (reject_reason, reject_msg, sig) — 통과 시 (None, None, {"signal": True, "rsi": float}),
|
||
탈락 시 ("탈락-XXX", "메시지", None). 실매 ver2에서 ver1과 동일한 로그 출력용.
|
||
"""
|
||
if len(candles) < 4:
|
||
return ("탈락-봉부족", "확정봉 4개 미만", None)
|
||
rsi_period = int(params.get("rsi_period", 3))
|
||
rsi_oversold = float(params.get("rsi_oversold", 25))
|
||
rsi_overbought = float(params.get("rsi_overbought", 75.0))
|
||
sl_pct = float(params.get("sl_pct", 0.015))
|
||
tp_pct = float(params.get("tp_pct", 0.015))
|
||
drop_rate = float(params.get("drop_rate", 0.015))
|
||
time_start_hm = int(params.get("time_start_hm", 900))
|
||
time_end_hm = int(params.get("time_end_hm", 1530))
|
||
cooldown_min = float(params.get("cooldown_min", 10))
|
||
max_daily = int(params.get("max_daily", 3))
|
||
vol_mult = float(params.get("vol_mult", 0))
|
||
|
||
i = len(candles) - 1
|
||
c = candles[i]
|
||
day = c["candle_time"][:8]
|
||
hm = int(c["candle_time"][8:12])
|
||
cl = float(c["close"])
|
||
lo = float(c["low"])
|
||
prev_c = candles[i - 1]
|
||
|
||
if hm < time_start_hm or hm >= time_end_hm:
|
||
return (None, None, None)
|
||
last_exit_dt = state.get("last_exit_dt")
|
||
if last_exit_dt is not None:
|
||
elapsed = (_t2dt(c["candle_time"]) - last_exit_dt).total_seconds() / 60
|
||
if elapsed < cooldown_min:
|
||
return (None, None, None)
|
||
if state.get("daily_cnt", 0) >= max_daily:
|
||
return (None, None, None)
|
||
|
||
closes = [float(x["close"]) for x in candles]
|
||
rsis = compute_rsi_series(closes, rsi_period)
|
||
rsi = rsis[i] if i < len(rsis) else None
|
||
if rsi is None:
|
||
return ("탈락-RSI없음", "RSI 미계산 (봉 축적 중)", None)
|
||
if rsi <= 0.0:
|
||
return ("탈락-RSI무효", "RSI3=0.0 (봉 부족, 계산 불가)", None)
|
||
if rsi > rsi_overbought:
|
||
return ("탈락-RSI과열", "RSI3=%.1f > %.0f" % (rsi, rsi_overbought), None)
|
||
if rsi > rsi_oversold:
|
||
return ("탈락-RSI조건", "RSI3=%.1f (과매도<%.0f 아님)" % (rsi, rsi_oversold), None)
|
||
prev_bear = float(prev_c["close"]) < float(prev_c["open"])
|
||
curr_bull = cl > float(c["open"])
|
||
if not (prev_bear and curr_bull):
|
||
return ("탈락-되돌림없음", "prev_bear=%s curr_bull=%s" % (prev_bear, curr_bull), None)
|
||
|
||
running_open = float(c["open"])
|
||
running_low = lo
|
||
for j in range(i - 1, -1, -1):
|
||
if candles[j]["candle_time"][:8] != day:
|
||
break
|
||
running_open = float(candles[j]["open"])
|
||
running_low = min(running_low, float(candles[j]["low"]))
|
||
if running_open <= 0:
|
||
return (None, None, None)
|
||
dr = (running_open - running_low) / running_open
|
||
if dr < drop_rate:
|
||
return ("탈락-낙폭", "%.2f%% < %.1f%%(SCALP_MIN_DROP_RATE)" % (dr * 100, drop_rate * 100), None)
|
||
if vol_mult > 0:
|
||
volumes = [float(x.get("volume", 0)) for x in candles]
|
||
vol = volumes[i] if i < len(volumes) else 0
|
||
win = max(1, min(20, i))
|
||
vol_avg = sum(volumes[i - win : i]) / win
|
||
if vol_avg > 0 and vol < vol_avg * vol_mult:
|
||
return ("탈락-거래량", "%.0f < 평균%.0f × %.1f" % (vol, vol_avg, vol_mult), None)
|
||
return (None, None, {"signal": True, "rsi": rsi})
|
||
|
||
|
||
def check_sell_signal_live(
|
||
position: Dict[str, Any],
|
||
current_candle: Dict[str, Any],
|
||
params: Dict[str, Any],
|
||
is_eod: bool = False,
|
||
) -> Optional[tuple]:
|
||
"""
|
||
실시간 봇: 보유 포지션에 대해 백테스트와 동일한 청산 조건 판단.
|
||
position: { "entry_price", "entry_time", "qty", "stop", "target", "max_price", "rsi" }
|
||
current_candle: { "high", "low", "close" } (현재가 기준이면 high=low=close 또는 고점 갱신값)
|
||
is_eod: 장 마감 구간이면 True 시 장마감청산 반환
|
||
반환: (reason_str, exit_price) 또는 None
|
||
"""
|
||
sl_pct = float(params.get("sl_pct", 0.015))
|
||
tp_pct = float(params.get("tp_pct", 0.015))
|
||
trail_trigger = float(params.get("trail_trigger", 0))
|
||
trail_stop = float(params.get("trail_stop", 0.004))
|
||
hi = float(current_candle.get("high", current_candle["close"]))
|
||
lo = float(current_candle.get("low", current_candle["close"]))
|
||
cl = float(current_candle["close"])
|
||
max_price = max(position["max_price"], hi)
|
||
reason = None
|
||
exit_price = cl
|
||
if lo <= position["stop"]:
|
||
reason = "손절"
|
||
exit_price = position["stop"]
|
||
elif hi >= position["target"]:
|
||
reason = "익절"
|
||
exit_price = position["target"]
|
||
elif trail_trigger > 0 and max_price >= position["entry_price"] * (1 + trail_trigger):
|
||
ts = max_price * (1 - trail_stop)
|
||
if cl <= ts:
|
||
reason = "트레일링스탑"
|
||
exit_price = cl
|
||
if reason is None and is_eod:
|
||
reason = "장마감청산"
|
||
exit_price = cl
|
||
if reason:
|
||
return (reason, exit_price)
|
||
return None
|