Files
kis_bot/scalping_engine.py
2026-03-17 12:33:30 +09:00

393 lines
16 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
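"""
scalping_engine.py — shared engine for scalping backtest, parameter search, and live trading
====================================================================
So that backtest and live-trading results match whenever the formulas match,
only the common logic lives in this module; backtest_web / param_search /
kis_scalping_ver2 all call this engine and nothing else.
■ Common logic inside the engine (identical for backtest, parameter search, and live trading)
- Buy: time window (time_start_hm~time_end_hm), cooldown (cooldown_min), daily entry count (max_daily),
RSI(3) oversold (rsi_oversold), previous candle bearish + current candle bullish, intraday drop (drop_rate), volume multiple (vol_mult).
- Sell: stop-loss price (sl_pct), take-profit price (tp_pct), trailing stop (trail_trigger/trail_stop), end-of-day liquidation (is_eod).
■ Logic outside the engine (live trading only) — not present in the backtest
- Extra buy filters: high-chase prevention (high_chase_thr), surging stocks (max_daily_chg), market/theme filters, ML, min_price, etc.
- Extra sell rules: breakeven protection (breakeven), absolute-loss cut (MAX_LOSS_PER_TRADE_KRW). (The backtest uses only % stop-loss / take-profit / trailing.)
Candle format: list of dict with keys candle_time(YYYYMMDDHHMI), open, high, low, close, volume
"""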
from datetime import datetime
from typing import List, Dict, Any, Optional, Tuple
# Load defaults from the DB (single source so backtest and param_search use the same values)
def get_scalping_defaults_from_db() -> Dict[str, Any]:
    """
    Load scalping defaults from the newest ``env_config`` row.

    The backtest API and param_search are meant to call only this function so
    that they run with the same values the live bot reads from the DB.

    Only the cooldown, fee, tax and slot size come from the DB; every other
    returned value is a constant defined here. Any failure (project
    ``database`` module not importable, query error, unparsable value) falls
    back to the built-in defaults instead of raising.

    Returns:
        Dict with keys ``cooldown_min`` (minutes), ``time_start_hm`` /
        ``time_end_hm`` and their aliases ``time_start`` / ``time_end``
        (HHMM ints), ``fee_rate`` and ``sell_tax`` (ratios: DB percent / 100),
        ``slot_money``, ``rsi_period``, ``vol_mult``, ``trail_trigger``,
        ``trail_stop`` and ``max_daily``.
    """
    # Built-in fallbacks, used when the DB has no row or anything goes wrong.
    cooldown_min, fee_pct, tax_pct, slot = 10, 0.015, 0.18, 300_000.0
    try:
        from database import TradeDB

        db = TradeDB()
        try:
            row = db.conn.execute(
                "SELECT * FROM env_config ORDER BY id DESC LIMIT 1"
            ).fetchone()
        finally:
            # Always release the connection, even when the query itself fails.
            db.close()
        if row:
            # assumes the row is mapping-like (e.g. sqlite3.Row) — TODO confirm
            r = dict(row)
            # SCALP_COOLDOWN_SEC (seconds) -> cooldown_min (minutes); same key
            # the live bot uses, with REENTRY_COOLDOWN_SEC as the fallback key.
            # NOTE(review): because of `or`, a falsy stored value (numeric 0,
            # empty string) falls through to the next key / the default.
            sec = r.get("SCALP_COOLDOWN_SEC") or r.get("REENTRY_COOLDOWN_SEC") or "600"
            cooldown_min = max(0, int(float(sec)) // 60)
            fee_pct = float(r.get("FEE_RATE_PCT") or 0.015)
            tax_pct = float(r.get("SELL_TAX_RATE_PCT") or 0.18)
            slot = float(r.get("SLOT_MONEY_DEFAULT") or 300_000)
    except Exception:
        # Deliberate best-effort: reset everything so a half-parsed row never
        # yields a mix of DB values and defaults.
        cooldown_min, fee_pct, tax_pct, slot = 10, 0.015, 0.18, 300_000.0
    return {
        "cooldown_min": cooldown_min,
        "time_start_hm": 900,
        "time_end_hm": 1530,
        "time_start": 900,
        "time_end": 1530,
        "fee_rate": fee_pct / 100,
        "sell_tax": tax_pct / 100,
        "slot_money": slot,
        "rsi_period": 3,
        "vol_mult": 0,
        "trail_trigger": 0.007,
        "trail_stop": 0.004,
        "max_daily": 3,
    }
def compute_rsi_series(closes: list, period: int = 3) -> list:
    """
    Compute an RSI value for every close, using Wilder smoothing.

    The first ``period`` entries of the result are ``None`` (not enough
    history). The entry at index ``period`` uses the plain average of the
    first ``period`` gains/losses; later entries smooth the running averages
    with weight ``(period - 1) / period``. When the average loss is zero the
    RSI is reported as ``100.0``.

    Args:
        closes: Closing prices in chronological order.
        period: RSI look-back length.

    Returns:
        A list the same length as ``closes`` holding ``None`` or a float RSI.
    """
    n = len(closes)
    out = [None] * n
    if n <= period:
        return out

    # Split each close-to-close change into its up and down component.
    ups = []
    downs = []
    for prev, cur in zip(closes, closes[1:]):
        change = cur - prev
        ups.append(max(change, 0))
        downs.append(max(-change, 0))

    mean_up = sum(ups[:period]) / period
    mean_down = sum(downs[:period]) / period

    for pos in range(period, n):
        if pos != period:
            # Change number pos-1 is the move from close[pos-1] to close[pos].
            mean_up = (mean_up * (period - 1) + ups[pos - 1]) / period
            mean_down = (mean_down * (period - 1) + downs[pos - 1]) / period
        if mean_down > 0:
            out[pos] = 100 - (100 / (1 + mean_up / mean_down))
        else:
            out[pos] = 100.0
    return out
def _t2dt(t: str) -> datetime:
    """Parse a ``candle_time`` string (``YYYYMMDDHHMM``) into a naive datetime."""
    candle_time_format = "%Y%m%d%H%M"
    parsed = datetime.strptime(t, candle_time_format)
    return parsed
def run_scalping_backtest(
    codes_candles: Dict[str, List[Dict]],
    params: Dict[str, Any],
) -> List[Dict]:
    """
    Run the scalping backtest over per-symbol candle lists.

    Entry (signal evaluated on candle ``i``, filled at the open of candle
    ``i + 1`` when that candle belongs to the same day):
      * candle time inside ``[time_start_hm, time_end_hm)``
      * at least ``cooldown_min`` minutes since the last exit of that day
      * fewer than ``max_daily`` entries so far that day
      * RSI at or below ``rsi_oversold``
      * previous candle bearish and current candle bullish
      * drop from the day's open to the day's low so far >= ``drop_rate``
      * when ``vol_mult > 0``: volume >= ``vol_mult`` x average of up to the
        20 preceding candles

    Exit (checked on every candle while a position is open, in this order):
    stop-loss, take-profit, trailing stop (only when ``trail_trigger > 0``),
    then forced liquidation on the last candle of the day.

    Args:
        codes_candles: Mapping of symbol code -> chronological candles. Each
            candle must be convertible with ``dict()`` and carry
            ``candle_time`` (YYYYMMDDHHMM), ``open``, ``high``, ``low``,
            ``close`` and optionally ``volume``.
        params: Strategy parameters. ``sl_pct`` / ``tp_pct`` / ``drop_rate`` /
            ``trail_trigger`` / ``trail_stop`` / ``fee_rate`` / ``sell_tax``
            are ratios (0.015 = 1.5%); ``time_start_hm`` / ``time_end_hm`` are
            HHMM integers; also ``rsi_period``, ``rsi_oversold``,
            ``slot_money``, ``cooldown_min``, ``max_daily``, ``vol_mult``.

    Returns:
        Closed trades sorted by ``sell_time``. Each trade is a dict with
        ``code``, ``buy_time``, ``sell_time``, ``buy_price``, ``sell_price``,
        ``qty``, ``pnl``, ``profit_rate`` (percent, before costs),
        ``hold_min``, ``sell_reason`` and ``rsi_entry``.
    """
    rsi_period = int(params.get("rsi_period", 3))
    rsi_oversold = float(params.get("rsi_oversold", 25))
    sl_pct = float(params.get("sl_pct", 0.015))
    tp_pct = float(params.get("tp_pct", 0.015))
    drop_rate = float(params.get("drop_rate", 0.015))
    slot_money = float(params.get("slot_money", 300_000))
    fee_rate = float(params.get("fee_rate", 0.00015))
    sell_tax = float(params.get("sell_tax", 0.0018))
    cooldown_min = float(params.get("cooldown_min", 10))
    trail_trigger = float(params.get("trail_trigger", 0))
    trail_stop = float(params.get("trail_stop", 0.004))
    time_start_hm = int(params.get("time_start_hm", 900))
    time_end_hm = int(params.get("time_end_hm", 1530))
    max_daily = int(params.get("max_daily", 3))
    vol_mult = float(params.get("vol_mult", 0))

    all_trades: List[Dict] = []
    for code, rows in codes_candles.items():
        if len(rows) < rsi_period + 5:
            continue
        candles = [dict(r) for r in rows]
        closes = [float(c["close"]) for c in candles]
        volumes = [float(c.get("volume", 0)) for c in candles]
        rsis = compute_rsi_series(closes, rsi_period)

        position: Optional[Dict] = None
        last_exit_dt: Dict[str, datetime] = {}
        daily_cnt: Dict[str, int] = {}
        cur_day = None
        running_open = 0.0
        running_low = 0.0
        first_tradable = rsi_period + 1
        last_idx = len(candles) - 1

        for i, c in enumerate(candles):
            day = c["candle_time"][:8]
            lo = float(c["low"])
            # Track the day's open and running low from the very first candle,
            # warm-up candles included, so the drop filter sees the same
            # day-open that check_buy_signal_live finds by walking back to the
            # first candle of the day.
            if day != cur_day:
                cur_day = day
                running_open = float(c["open"])
                running_low = lo
            else:
                running_low = min(running_low, lo)
            if i < first_tradable:
                # Warm-up: no signal or position handling on these candles.
                continue

            hm = int(c["candle_time"][8:12])
            cl = float(c["close"])
            hi = float(c["high"])
            vol = volumes[i]
            is_eod = i == last_idx or candles[i + 1]["candle_time"][:8] != day

            # ── Holding a position: check exits ──
            if position is not None:
                max_price = max(position["max_price"], hi)
                position["max_price"] = max_price
                reason = None
                exit_price = cl
                if lo <= position["stop"]:
                    # Stop is checked first, so a candle touching both stop
                    # and target is booked as a loss.
                    reason = "손절"
                    exit_price = position["stop"]
                elif hi >= position["target"]:
                    reason = "익절"
                    exit_price = position["target"]
                elif trail_trigger > 0 and max_price >= position["entry_price"] * (1 + trail_trigger):
                    ts = max_price * (1 - trail_stop)
                    if cl <= ts:
                        reason = "트레일링스탑"
                        exit_price = cl
                if reason is None and is_eod:
                    reason = "장마감청산"
                    exit_price = cl
                if reason:
                    qty = position["qty"]
                    buy_amt = position["entry_price"] * qty
                    sell_amt = exit_price * qty
                    # Fee on both legs, sell tax on the sell leg only.
                    pnl = (
                        sell_amt
                        - buy_amt
                        - buy_amt * fee_rate
                        - sell_amt * fee_rate
                        - sell_amt * sell_tax
                    )
                    hold_min = int(
                        (_t2dt(c["candle_time"]) - _t2dt(position["entry_time"])).total_seconds() / 60
                    )
                    all_trades.append({
                        "code": code,
                        "buy_time": position["entry_time"],
                        "sell_time": c["candle_time"],
                        "buy_price": position["entry_price"],
                        "sell_price": round(exit_price, 2),
                        "qty": qty,
                        "pnl": round(pnl),
                        "profit_rate": round(
                            (exit_price - position["entry_price"]) / position["entry_price"] * 100, 2
                        ),
                        "hold_min": hold_min,
                        "sell_reason": reason,
                        "rsi_entry": round(position["rsi"], 1),
                    })
                    last_exit_dt[day] = _t2dt(c["candle_time"])
                    position = None
                # Never evaluate a new entry on a candle that started with an
                # open position, whether or not it was just closed.
                continue

            # ── Flat: look for a buy signal ──
            if hm < time_start_hm or hm >= time_end_hm:
                continue
            if day in last_exit_dt:
                elapsed = (_t2dt(c["candle_time"]) - last_exit_dt[day]).total_seconds() / 60
                if elapsed < cooldown_min:
                    continue
            if daily_cnt.get(day, 0) >= max_daily:
                continue
            rsi = rsis[i]
            if rsi is None or rsi > rsi_oversold:
                continue
            prev_c = candles[i - 1]
            prev_bear = float(prev_c["close"]) < float(prev_c["open"])
            curr_bull = cl > float(c["open"])
            if not (prev_bear and curr_bull):
                continue
            if running_open <= 0:
                continue
            dr = (running_open - running_low) / running_open
            if dr < drop_rate:
                continue
            if vol_mult > 0:
                win = max(1, min(20, i))
                vol_avg = sum(volumes[i - win : i]) / win
                # A zero average disables the filter instead of blocking.
                if vol_avg > 0 and vol < vol_avg * vol_mult:
                    continue
            # Fill at the next candle's open; skip if there is none today.
            if i + 1 >= len(candles):
                continue
            next_c = candles[i + 1]
            if next_c["candle_time"][:8] != day:
                continue
            entry_price = float(next_c["open"])
            if entry_price <= 0:
                continue
            qty = max(1, int(slot_money / entry_price))
            stop = entry_price * (1 - sl_pct)
            target = entry_price * (1 + tp_pct)
            position = {
                "entry_price": entry_price,
                "entry_time": next_c["candle_time"],
                "qty": qty,
                "stop": stop,
                "target": target,
                "max_price": entry_price,
                "rsi": rsi,
            }
            daily_cnt[day] = daily_cnt.get(day, 0) + 1

    all_trades.sort(key=lambda x: x["sell_time"])
    return all_trades
# ── For the live bot: single-point-in-time buy/sell decisions (same rules as the backtest) ─────────────────
def check_buy_signal_live(
    candles: List[Dict],
    params: Dict[str, Any],
    state: Dict[str, Any],
) -> Tuple[Optional[str], Optional[str], Optional[Dict[str, Any]]]:
    """
    Live bot: decide whether the last candle in ``candles`` is a buy signal.

    Args:
        candles: Confirmed candles in chronological order, each with
            ``candle_time`` (YYYYMMDDHHMM), ``open``, ``high``, ``low``,
            ``close`` and optionally ``volume``.
        params: Strategy parameters; reads ``rsi_period``, ``rsi_oversold``,
            ``rsi_overbought``, ``drop_rate`` (ratio), ``time_start_hm`` /
            ``time_end_hm`` (HHMM ints), ``cooldown_min``, ``max_daily`` and
            ``vol_mult``.
        state: ``{"last_exit_dt": datetime or None, "daily_cnt": int}`` —
            time of today's last exit and number of entries already made today.

    Returns:
        ``(reject_reason, reject_msg, sig)``:
          * signal: ``(None, None, {"signal": True, "rsi": float})``
          * logged rejection: ``("탈락-XXX", "message", None)``
          * silent rejection (outside the time window, in cooldown, daily cap
            reached, non-positive day open): ``(None, None, None)``
    """
    if len(candles) < 4:
        return ("탈락-봉부족", "확정봉 4개 미만", None)

    rsi_period = int(params.get("rsi_period", 3))
    rsi_oversold = float(params.get("rsi_oversold", 25))
    rsi_overbought = float(params.get("rsi_overbought", 75.0))
    drop_rate = float(params.get("drop_rate", 0.015))
    time_start_hm = int(params.get("time_start_hm", 900))
    time_end_hm = int(params.get("time_end_hm", 1530))
    cooldown_min = float(params.get("cooldown_min", 10))
    max_daily = int(params.get("max_daily", 3))
    vol_mult = float(params.get("vol_mult", 0))

    i = len(candles) - 1
    c = candles[i]
    prev_c = candles[i - 1]
    day = c["candle_time"][:8]
    hm = int(c["candle_time"][8:12])
    cl = float(c["close"])
    lo = float(c["low"])

    # Silent gates: no log line is produced for these.
    if hm < time_start_hm or hm >= time_end_hm:
        return (None, None, None)
    last_exit_dt = state.get("last_exit_dt")
    if last_exit_dt is not None:
        elapsed = (_t2dt(c["candle_time"]) - last_exit_dt).total_seconds() / 60
        if elapsed < cooldown_min:
            return (None, None, None)
    if state.get("daily_cnt", 0) >= max_daily:
        return (None, None, None)

    closes = [float(x["close"]) for x in candles]
    rsi = compute_rsi_series(closes, rsi_period)[i]
    if rsi is None:
        return ("탈락-RSI없음", "RSI 미계산 (봉 축적 중)", None)
    # NOTE(review): an RSI of exactly 0.0 is rejected here as invalid, whereas
    # run_scalping_backtest accepts it (0 <= rsi_oversold) — confirm intended.
    if rsi <= 0.0:
        return ("탈락-RSI무효", "RSI3=0.0 (봉 부족, 계산 불가)", None)
    # The overbought check only selects a more specific log message; the
    # oversold check below would reject the same candle anyway.
    if rsi > rsi_overbought:
        return ("탈락-RSI과열", "RSI3=%.1f > %.0f" % (rsi, rsi_overbought), None)
    if rsi > rsi_oversold:
        return ("탈락-RSI조건", "RSI3=%.1f (과매도<%.0f 아님)" % (rsi, rsi_oversold), None)

    prev_bear = float(prev_c["close"]) < float(prev_c["open"])
    curr_bull = cl > float(c["open"])
    if not (prev_bear and curr_bull):
        return ("탈락-되돌림없음", "prev_bear=%s curr_bull=%s" % (prev_bear, curr_bull), None)

    # Walk back to today's first candle in the list to get the day's open and
    # the lowest low seen so far.
    running_open = float(c["open"])
    running_low = lo
    for earlier in reversed(candles[:i]):
        if earlier["candle_time"][:8] != day:
            break
        running_open = float(earlier["open"])
        running_low = min(running_low, float(earlier["low"]))
    if running_open <= 0:
        return (None, None, None)
    dr = (running_open - running_low) / running_open
    if dr < drop_rate:
        return ("탈락-낙폭", "%.2f%% < %.1f%%(SCALP_MIN_DROP_RATE)" % (dr * 100, drop_rate * 100), None)

    if vol_mult > 0:
        vol = float(c.get("volume", 0))
        win = max(1, min(20, i))
        # Average over up to the 20 candles preceding the current one.
        vol_avg = sum(float(x.get("volume", 0)) for x in candles[i - win : i]) / win
        # A zero average disables the filter instead of blocking.
        if vol_avg > 0 and vol < vol_avg * vol_mult:
            return ("탈락-거래량", "%.0f < 평균%.0f × %.1f" % (vol, vol_avg, vol_mult), None)

    return (None, None, {"signal": True, "rsi": rsi})
def check_sell_signal_live(
    position: Dict[str, Any],
    current_candle: Dict[str, Any],
    params: Dict[str, Any],
    is_eod: bool = False,
) -> Optional[tuple]:
    """
    Live bot: apply the backtest's exit rules to an open position.

    Checks run in this order and the first match wins: stop-loss,
    take-profit, trailing stop (only when ``trail_trigger > 0``), then
    end-of-day liquidation.

    Args:
        position: Dict with ``entry_price``, ``stop``, ``target`` and
            ``max_price`` (other keys are ignored here). Stop and target are
            taken from the position as stored, not recomputed from ``params``.
        current_candle: Dict with ``close`` and optionally ``high`` / ``low``
            (each falls back to ``close`` when missing).
        params: Reads ``trail_trigger`` and ``trail_stop`` (ratios).
        is_eod: When True and no other rule fired, liquidate at the close.

    Returns:
        ``(reason, exit_price)`` or ``None`` when the position should be kept.
    """
    trail_trigger = float(params.get("trail_trigger", 0))
    trail_stop = float(params.get("trail_stop", 0.004))

    cl = float(current_candle["close"])
    hi = float(current_candle.get("high", cl))
    lo = float(current_candle.get("low", cl))

    # NOTE(review): unlike run_scalping_backtest, the high-water mark is not
    # written back to position["max_price"]; presumably the caller keeps it
    # up to date — confirm.
    max_price = max(position["max_price"], hi)

    if lo <= position["stop"]:
        return ("손절", position["stop"])
    if hi >= position["target"]:
        return ("익절", position["target"])
    if trail_trigger > 0 and max_price >= position["entry_price"] * (1 + trail_trigger):
        # Armed once price has run trail_trigger above entry; fires when the
        # close gives back trail_stop from the high-water mark.
        if cl <= max_price * (1 - trail_stop):
            return ("트레일링스탑", cl)
    if is_eod:
        return ("장마감청산", cl)
    return None