오픈라우터 하기전

This commit is contained in:
2026-03-11 15:23:01 +09:00
parent 41a1a0fc4b
commit bc6fc1706f

View File

@@ -28,6 +28,17 @@ import requests
from database import TradeDB, ENV_CONFIG_KEYS
# WebSocket 실시간 체결가 캐시 + 봉집계기 (REST 폴링 전면 대체)
# CandleAggregator: 틱 → 3분봉 in-memory 집계 → check_buy_signal_tail_catch / check_sell_signals / execute_buy 에서 활용
try:
from kis_ws import (
KISWebSocketPriceCache, CandleAggregator,
get_kiwoom_candles_df, _get_kiwoom_creds,
)
_KIS_WS_AVAILABLE = True
except ImportError:
_KIS_WS_AVAILABLE = False
# 로깅 설정
logging.basicConfig(
format='[%(asctime)s] %(message)s',
@@ -43,9 +54,9 @@ LOG_GREEN = "\033[92m" # 통과
LOG_CYAN = "\033[96m" # 강조
LOG_RESET = "\033[0m"
# DB 초기화 (스크립트所在 디렉터리 기준 경로)
# DB 초기화 — MariaDB 192.168.0.141 (database.py 모듈 상수 사용)
SCRIPT_DIR = Path(__file__).resolve().parent
db = TradeDB(db_path=str(SCRIPT_DIR / "quant_bot.db"))
db = TradeDB() # db_path 인수 무시됨, MariaDB 직접 연결
# DB에서 환경변수 로드
def get_env_from_db(key, default=""):
@@ -227,6 +238,12 @@ def _is_token_expired_response(j):
class KISClient:
"""한국투자증권 Open API 클라이언트"""
def __init__(self, mock=None):
# 실전/모의 토큰 모두 최신 상태로 유지 (모드와 무관하게 양쪽 갱신)
try:
from kis_token_manager import ensure_both_tokens
ensure_both_tokens()
except Exception as _te:
logger.warning(f"토큰 자동갱신 건너뜀: {_te}")
# 모의 여부 결정
if mock is not None:
@@ -364,7 +381,20 @@ class KISClient:
return None
def _headers(self, tr_id, hashkey=None):
"""API 호출용 헤더 생성"""
"""
API 호출용 헤더 생성.
KisTokenManager.get_token() 으로 매번 만료 여부 확인:
- 유효하면 메모리에서 즉시 반환 (오버헤드 없음)
- 만료 10분 전이면 선제 갱신 후 새 토큰 사용
→ EGW00123(만료 에러) 없이 자동 교체
"""
try:
from kis_token_manager import KisTokenManager
fresh = KisTokenManager.instance(is_mock=self.mock).get_token()
if fresh:
self.access_token = fresh
except Exception:
pass # 실패 시 기존 access_token 유지
headers = {
"content-type": "application/json; charset=utf-8",
"authorization": f"Bearer {self.access_token}",
@@ -1122,16 +1152,23 @@ class KISClient:
return False
j = r.json()
if j.get("rt_cd") == "0":
self._last_sell_msg_cd = None
self._last_sell_msg1 = None
ord_no = j.get("output", {}).get("ODNO", "")
logger.info(f"✅ 매도 주문 성공: {code} {qty}주 (주문번호: {ord_no})")
return True
else:
# execute_sell 에서 실패 원인(영업일 아님 등) 구분할 수 있도록 저장
self._last_sell_msg_cd = j.get("msg_cd", "")
self._last_sell_msg1 = str(j.get("msg1", "") or "")
logger.error(
f"[매도주문] 실패 code={code} path={path} TR_ID={tr_id} "
f"rt_cd={j.get('rt_cd')} msg_cd={j.get('msg_cd')} msg1={j.get('msg1')}"
f"rt_cd={j.get('rt_cd')} msg_cd={self._last_sell_msg_cd} msg1={self._last_sell_msg1}"
)
return False
except Exception as e:
self._last_sell_msg_cd = None
self._last_sell_msg1 = None
logger.error(f"매도 주문 예외({code}): {e}")
return False
@@ -1203,16 +1240,22 @@ class KISClient:
df = df.sort_values("time").reset_index(drop=True)
# 기술적 지표 추가
if len(df) >= 14:
# RSI 기간: DB/env의 RSI_PERIOD 로 조절 (기본 14, 단타/스캘핑 시 3·5 권장)
# RSI 수학적 안정화를 위해 호출 측에서 limit≥100 이상 요청하는 것이 전제
rsi_period = get_env_int("RSI_PERIOD", 14)
if len(df) >= rsi_period:
delta = df["close"].diff(1)
gain = delta.where(delta > 0, 0).rolling(window=14).mean()
loss = (-delta.where(delta < 0, 0)).rolling(window=14).mean()
gain = delta.where(delta > 0, 0).rolling(window=rsi_period).mean()
loss = (-delta.where(delta < 0, 0)).rolling(window=rsi_period).mean()
rs = gain / loss.replace(0, float("nan"))
df["RSI"] = 100 - (100 / (1 + rs))
if len(df) >= 20:
df["MA20"] = df["close"].rolling(window=20).mean()
# MA5: check_buy_signal_tail_catch 에서 ma5_gap_pct 계산에 사용 (없으면 None으로 처리됨)
if len(df) >= 5:
df["MA5"] = df["close"].rolling(window=5).mean()
return df.tail(limit)
except Exception as e:
logger.error(f"분봉 조회 실패({code}): {e}")
@@ -1554,7 +1597,7 @@ class ShortTradingBot:
self.ml_predictor = None
if ML_AVAILABLE:
try:
self.ml_predictor = MLPredictor(db_path=str(SCRIPT_DIR / "quant_bot.db"))
self.ml_predictor = MLPredictor() # MariaDB 내부 연결
if self.ml_predictor.should_retrain():
self.ml_predictor.train_model(retrain=True)
except Exception as e:
@@ -1570,6 +1613,13 @@ class ShortTradingBot:
use_kelly=get_env_bool("USE_KELLY_FORMULA", True),
kelly_multiplier=get_env_float("KELLY_MULTIPLIER", 0.25),
slot_base_amount_cap=get_env_int("SLOT_BASE_AMOUNT_CAP", 0),
# ── 무조건 깔고 가는 MAX_LOSS 기반 투자 상한 ─────────────
# ATR 계산 결과가 아무리 커도 이 상한 초과 불가
max_loss_per_trade_krw=get_env_int("MAX_LOSS_PER_TRADE_KRW", 200000),
stop_loss_pct=get_env_float("STOP_LOSS_PCT", -0.03),
# ── 사이즈 클래스별 비율 (DB에서 주입) ───────────────────
size_small_ratio=get_env_float("SIZE_CLASS_SMALL_RATIO", 0.70),
size_mid_ratio=get_env_float("SIZE_CLASS_MID_RATIO", 0.85),
)
logger.info("✅ RiskManager 뼈대 생성 완료")
else:
@@ -1596,6 +1646,14 @@ class ShortTradingBot:
self.holdings = {}
# 당일 매매불가로 확인된 종목 (같은 종목 반복 주문 방지 → 다음 후보로 넘어감)
self.untradable_skip_set = set()
# 최근 매도 종목 쿨다운 캐시 {code: 매도_timestamp}
# 매도 직후 같은 종목을 즉시 재매수하는 반복매매 루프 방지.
# 쿨다운 기간은 REENTRY_COOLDOWN_SEC(기본 5분)으로 조정.
self.recently_sold: dict = {}
# 매도 실패 백오프 캐시 {code: until_timestamp}
# "영업일이 아닙니다" 등 일시적 API 거부 시 재시도 방지.
# 재시도 대기 시간은 SELL_FAILURE_BACKOFF_SEC(기본 1800초=30분) 으로 조정.
self._sell_backoff: dict = {}
active_trades = self.db.get_active_trades(strategy_prefix="SHORT")
for code, trade in active_trades.items():
self.holdings[code] = {
@@ -1619,10 +1677,251 @@ class ShortTradingBot:
self._asset_task = None
self.is_first_run = True
# ── WebSocket + CandleAggregator 초기화 ──────────────────────────────
# 틱 수신 → 3분봉 in-memory 집계 → REST 폴링(get_minute_chart) 전면 대체
# KISWebSocketPriceCache: 실시간 체결가 수신 (check_sell_signals 현재가)
# CandleAggregator : 3분봉 OHLCV·RSI 메모리 집계 (buy/ATR/RiskManager)
# start() 실패 시 is_active=False → REST fallback 자동 적용
self.ws_cache: Optional["KISWebSocketPriceCache"] = None
self.candle_agg: Optional["CandleAggregator"] = None
self._init_websocket()
# ── WebSocket + CandleAggregator 초기화 / 갭보정 / 구독 관리 ───────────
def _init_websocket(self):
"""WebSocket 시작 → CandleAggregator(3분봉) 연결 → 종목 구독 → 갭 보정."""
if not _KIS_WS_AVAILABLE:
logger.info(" kis_ws 모듈 없음 → REST inquire_price / get_minute_chart 폴링 유지")
return
try:
self.ws_cache = KISWebSocketPriceCache(
app_key = self.client.app_key,
app_secret = self.client.app_secret,
is_mock = self.client.mock,
)
# CandleAggregator: 3분봉 집계 (buy 타점·ATR·RiskManager 전용)
# 3분봉(주전략) + 15분봉 + 60분봉 — 추세 필터 / 다른 전략 확장용
self.candle_agg = CandleAggregator(db=self.db, timeframes=[3, 15, 60])
self.ws_cache.attach_candle_aggregator(self.candle_agg)
ws_ok = self.ws_cache.start()
if not ws_ok:
logger.info(" WebSocket 비활성 (모의 or 키 미설정) → REST fallback 유지")
self.ws_cache = None
self.candle_agg = None
return
# 봇 재시작 시 보유 종목 즉시 구독
for code in list(self.holdings.keys()):
self.ws_cache.subscribe(code)
# 유니버스 후보 종목도 미리 구독 (매수 타점 체크 전 봉 데이터 확보)
candidates = self.db.get_target_candidates()
for c in candidates:
code = c.get("code") or c.get("stk_cd", "")
if code and code not in self.holdings:
self.ws_cache.subscribe(code)
# ── 영구 구독 ETF: 시장 방향 필터용 (유니버스 변경과 무관하게 항상 유지) ──
perm_raw = get_env_from_db("PERMANENT_WS_CODES", "069500,229200")
self._permanent_ws_codes: set = {
c.strip() for c in str(perm_raw).split(",") if c.strip()
}
for code in sorted(self._permanent_ws_codes):
self.ws_cache.subscribe(code)
logger.info("📡 [영구구독] %s (시장방향 ETF)", code)
logger.info(
"✅ WebSocket + CandleAggregator(3분봉) 활성 (구독 %d종목) "
"— get_minute_chart REST 폴링 대체",
len(self.ws_cache._subscribed),
)
# 시작 시 REST 갭 보정 (봉 버퍼 비어있는 경우 RSI 안정화)
self._fill_all_gaps()
except Exception as _ws_e:
logger.warning("⚠️ WebSocket 초기화 예외(무시): %s", _ws_e)
self.ws_cache = None
self.candle_agg = None
def _fill_all_gaps(self):
"""
봇 시작·재접속 후 구독 중인 모든 종목의 분봉 갭을 보정.
RSI(14) 안정화를 위해 limit=120 사용.
▶ 키움 우선 전략:
- 키움 ka10080 은 1회 호출에 최대 900봉(≈6개월치) 제공 → 장 초반에도 즉시 봉 확보 가능
- KIS get_minute_chart 는 당일봉만 제공 → 장 시작 직후 봉 부족 → 키움 우선
- 키움 키 없으면 KIS fallback (3분봉만, 15/60분봉은 KIS 지원 안 함)
"""
if not self.candle_agg or not self.ws_cache:
return
limit = get_env_int("SHORT_GAP_FILL_LIMIT", 120)
with self.ws_cache._sub_lock:
codes = set(self.ws_cache._subscribed)
# ── 키움 크레덴셜 조회 ────────────────────────────────────────
kw_key, kw_secret, kw_mock = _get_kiwoom_creds(self.db)
use_kiwoom = bool(kw_key and kw_secret)
logger.info(
"🔧 [갭보정] %d종목 분봉 로드 시작 (tfs=%s, limit=%d, kiwoom=%s)",
len(codes), self.candle_agg.timeframes, limit, "" if use_kiwoom else "❌→KIS fallback",
)
for code in sorted(codes):
for tf in self.candle_agg.timeframes:
df = None
# 키움 우선 (토큰은 23시간 캐시 → au10001 한도 방지)
if use_kiwoom:
try:
df = get_kiwoom_candles_df(
code, tf, kw_key, kw_secret,
is_mock=kw_mock, n=limit,
)
except Exception as e:
logger.debug("키움 갭보정 실패 (%s %dM): %s", code, tf, e)
# KIS fallback: 당일봉만 → 3분봉에만 유효
if (df is None or df.empty) and tf <= 3:
try:
df = self.client.get_minute_chart(
code, period=str(tf), limit=limit
)
except Exception as e:
logger.debug("KIS 갭보정 실패 (%s %dM): %s", code, tf, e)
if df is not None and not df.empty:
self.candle_agg.fill_gap_from_rest(code, tf, df)
# 같은 종목 내 timeframe 전환: 짧은 딜레이
time.sleep(random.uniform(0.2, 0.4))
# 종목 간 딜레이
time.sleep(random.uniform(0.3, 0.6))
def _sync_subscriptions(self, candidates: list):
"""
target_candidates DB 목록과 WS 구독 목록 동기화.
- 유니버스에서 빠진 종목(보유 중 아닌 것) → unsubscribe + RAM 정리
- 신규 종목 → subscribe + 3분봉 갭 보정 (봉 버퍼 즉시 확보).
※ 영구 구독 ETF(_permanent_ws_codes)는 절대 해제하지 않음 (시장 방향 필터용)
"""
if not self.ws_cache:
return
new_codes = {c.get("code") or c.get("stk_cd", "") for c in candidates if c}
new_codes.discard("")
# 현재 보유 종목은 매도 완료 전까지 반드시 유지
new_codes |= set(self.holdings.keys())
# 영구 구독 ETF는 유니버스와 무관하게 항상 유지
new_codes |= getattr(self, '_permanent_ws_codes', set())
with self.ws_cache._sub_lock:
current_subs = set(self.ws_cache._subscribed)
# ── 구독 해제: 유니버스에서 빠진 종목 ─────────────────────────
# 보유 중 종목은 매도 감시를 위해 구독 유지
for code in sorted(current_subs - new_codes):
self.ws_cache.unsubscribe(code)
if self.candle_agg:
self.candle_agg.remove_code(code)
# ── 신규 구독: 유니버스에 새로 들어온 종목 ─────────────────────
kw_key, kw_secret, kw_mock = _get_kiwoom_creds(self.db)
use_kiwoom = bool(kw_key and kw_secret)
for code in sorted(new_codes - current_subs):
self.ws_cache.subscribe(code)
# 신규 구독 즉시 갭 보정 (봉 없으면 매수 타점 체크 불가) — 키움 우선
if not self.candle_agg:
continue
lim = get_env_int("SHORT_GAP_FILL_LIMIT", 120)
for tf in self.candle_agg.timeframes:
df = None
if use_kiwoom:
try:
df = get_kiwoom_candles_df(
code, tf, kw_key, kw_secret,
is_mock=kw_mock, n=lim,
)
except Exception as e:
logger.debug("키움 신규갭보정 실패 (%s %dM): %s", code, tf, e)
if (df is None or df.empty) and tf <= 3:
try:
df = self.client.get_minute_chart(
code, period=str(tf), limit=lim
)
except Exception as e:
logger.debug("KIS 신규갭보정 실패 (%s %dM): %s", code, tf, e)
if df is not None and not df.empty:
self.candle_agg.fill_gap_from_rest(code, tf, df)
# tf 간 딜레이 (차트 API, 토큰은 캐시 재사용)
time.sleep(random.uniform(0.2, 0.4))
def _get_candles_df(self, code: str, tf: int = 3, n: int = 20) -> Optional[pd.DataFrame]:
"""
CandleAggregator 메모리 봉 → DataFrame 변환 헬퍼.
확정봉(confirmed) + 진행봉(current, is_confirmed=0) 을 합쳐
get_minute_chart 와 동일한 컬럼(open/high/low/close/volume/RSI/MA5/MA20)을 반환.
- 진행봉의 close = 현재가 (최신 틱) → 매수 타점 실시간 포착
- CandleAggregator 미사용·데이터 부족 시 None 반환 → 호출부에서 REST fallback
Args:
code : 종목코드
tf : 봉 주기(분), 꼬리잡기 전략은 항상 3
n : 반환할 최대 봉 수 (tail 기준)
"""
if not self.candle_agg:
return None
# 확정봉: RSI_PERIOD(14)보다 넉넉하게 가져와 RSI 안정화
rsi_period = get_env_int("RSI_PERIOD", 14)
fetch_n = max(n + rsi_period + 5, n + 20)
confirmed = self.candle_agg.get_candles(code, tf, fetch_n)
if not confirmed:
return None
rows = list(confirmed)
# 진행 중인 봉(최신 틱 close) 을 tail 에 추가 → 실시간 캔들 패턴 포착
current = self.candle_agg.get_current_candle(code, tf)
if current and current.get("open", 0) > 0 and current.get("close", 0) > 0:
rows.append(current)
if len(rows) < 2:
return None
df = pd.DataFrame(rows)[["open", "high", "low", "close", "volume"]].copy()
df = df.reset_index(drop=True)
# 기술적 지표: get_minute_chart 와 동일 로직
if len(df) >= rsi_period:
delta = df["close"].diff(1)
gain = delta.where(delta > 0, 0).rolling(window=rsi_period).mean()
loss = (-delta.where(delta < 0, 0)).rolling(window=rsi_period).mean()
rs = gain / loss.replace(0, float("nan"))
df["RSI"] = 100 - (100 / (1 + rs))
if len(df) >= 20:
df["MA20"] = df["close"].rolling(window=20).mean()
if len(df) >= 5:
df["MA5"] = df["close"].rolling(window=5).mean()
return df.tail(n).reset_index(drop=True)
# ── 설정 리로드 ─────────────────────────────────────────────────────────
def reload_config(self):
"""[실시간 리로드] DB(env) 설정을 봇에 반영. 메인 루프마다 호출 시 재시작 없이 적용."""
# [손절/익절 설정]
self.stop_loss_pct = get_env_float("STOP_LOSS_PCT", -0.04)
# ★ STOP_LOSS_PCT 부호 안전장치: 양수(0.02)로 입력 시 자동으로 음수(-0.02)로 변환.
# 양수 값이 그대로 사용되면 profit_pct <= stop_loss_pct 조건이 손익분기 이상에서도
# 참이 돼 칼손절이 수익 구간에서도 발동하는 심각한 버그가 발생함.
if self.stop_loss_pct > 0:
logger.warning(
"🚨 STOP_LOSS_PCT=%.4f 양수 감지 → 자동 부호 반전(%.4f). DB에 음수로 저장 권장 (!설정 STOP_LOSS_PCT=-%.4f)",
self.stop_loss_pct, -self.stop_loss_pct, self.stop_loss_pct,
)
self.stop_loss_pct = -self.stop_loss_pct
self.take_profit_pct = get_env_float("TAKE_PROFIT_PCT", 0.05)
self.max_stocks = get_env_int("MAX_STOCKS", 3)
self.min_drop_rate = get_env_float("MIN_DROP_RATE", 0.03)
@@ -1911,7 +2210,7 @@ class ShortTradingBot:
name = self.holdings[code].get("name", code)
del self.holdings[code]
try:
self.db.close_trade(code=code, sell_price=0, sell_reason="잔고동기화(외부매도)")
self.db.close_trade(code=code, sell_price=0, sell_reason="잔고동기화(외부매도)", strategy="SHORT_ANT_SHAKING")
except Exception as e:
logger.debug(f"잔고동기화 close_trade 스킵 {code}: {e}")
logger.info(f"📲 [잔고동기화] 보유 제거: {name} ({code}) - 계좌에 없음")
@@ -2621,9 +2920,41 @@ MIN_DROP_RATE=0.025
logger.warning(f"🧪 FORCE_BUY_TEST 현재가 조회 실패: {e}")
return None
# ── 시장 방향 필터 (USE_MARKET_REGIME_FILTER=true 시 활성) ──
# KODEX200/KOSDAQ150 60분봉 RSI로 상승장 확인 → 하락장이면 롱 진입 차단
if get_env_bool("USE_MARKET_REGIME_FILTER", False):
min_rsi = get_env_float("MARKET_REGIME_MIN_RSI", 48.0)
regime = self.db.get_market_regime(tf=60)
if not regime.get("is_bull") or regime.get("avg_rsi", 50) < min_rsi:
logger.debug(
"[시장필터] %s %s: 하락장 차단 (ETF RSI=%.1f < %.1f)",
code, name, regime.get("avg_rsi", 0), min_rsi,
)
return None
# ── 테마 과열 필터 (USE_THEME_HEAT_FILTER=true 시 활성) ──
if get_env_bool("USE_THEME_HEAT_FILTER", False):
heat_max = get_env_float("THEME_HEAT_RSI_MAX", 72.0)
meta = self.db.get_stock_meta(code)
if meta and meta.get("theme"):
momentum = self.db.get_theme_momentum(meta["theme"], tf=60)
if momentum.get("count", 0) >= 3 and momentum.get("avg_rsi3", 0) > heat_max:
logger.debug(
"[테마필터] %s %s: 테마(%s) 과열 차단 (RSI=%.1f > %.1f)",
code, name, meta["theme"],
momentum["avg_rsi3"], heat_max,
)
return None
min_candle_len = get_env_int("MIN_CANDLE_LEN_TAIL", 14)
min_price_tail = get_env_float("MIN_PRICE_TAIL", 1000.0)
df = self.client.get_minute_chart(code, period="3", limit=20)
# [WebSocket 우선] 메모리 봉 사용 → WS 미활성 시 REST fallback
df = self._get_candles_df(code, tf=3, n=20)
if df is None or df.empty:
logger.debug("📡 [%s] WS봉 없음 → REST 3분봉 fallback", code)
df = self.client.get_minute_chart(code, period="3", limit=20)
if df is None or df.empty or len(df) < min_candle_len:
logger.info(f"{LOG_YELLOW}🔍 [탈락-3분봉] {name} {code}: 봉수 부족 (len={len(df) if df is not None and not df.empty else 0}, 기준 {min_candle_len}){LOG_RESET}")
return None
@@ -2638,16 +2969,25 @@ MIN_DROP_RATE=0.025
candle_low = float(candle["low"])
candle_close = float(candle["close"])
# 분봉 마지막 봉 close=0인 경우 (장 마감/미체결 봉 등) → 현재가 API로 보정
# 분봉 마지막 봉 close=0인 경우 (장 마감/미체결 봉 등) → WS 캐시 우선 보정, fallback=REST
if current_price <= 0 or candle_close <= 0:
try:
price_data = self.client.inquire_price(code)
if price_data:
current_price = abs(float(str(price_data.get("stck_prpr", 0)).replace(",", "")))
if current_price > 0:
candle_close = current_price
ws_price = self.ws_cache.get_price(code) if self.ws_cache else None
if ws_price:
p = abs(float(str(ws_price.get("stck_prpr", 0)).replace(",", "")))
if p > 0:
current_price = p
candle_close = p
df = df.copy()
df.loc[df.index[-1], "close"] = current_price
df.loc[df.index[-1], "close"] = p
else:
price_data = self.client.inquire_price(code)
if price_data:
current_price = abs(float(str(price_data.get("stck_prpr", 0)).replace(",", "")))
if current_price > 0:
candle_close = current_price
df = df.copy()
df.loc[df.index[-1], "close"] = current_price
except Exception as e:
logger.debug(f"현재가 보정 실패({code}): {e}")
if candle_open <= 0 and len(df) >= 2:
@@ -2659,7 +2999,10 @@ MIN_DROP_RATE=0.025
logger.info(f"{LOG_YELLOW}🔍 [탈락-가격] {name} {code}: 시가/현재가 부적절 (현재 {current_price:,.0f}원, 최소 {min_price_tail:,.0f}){LOG_RESET}")
return None
# [수정된 부분: 매수체크 시 당일 진짜 시가, 고가, 저가를 API로 호출해와서 낙폭 계산]
# 당일 시가·고가·저가 확보 우선순위:
# 1순위: WebSocket 캐시 (H0STCNT0 틱에 OPRC/HGPR/LWPR 포함 → REST 호출 없음)
# 2순위: 분봉 DataFrame (캐시 없을 때)
# 3순위: REST inquire_price (분봉만으로 부족한 경우 최후 fallback)
day_open = float(df["open"].iloc[0])
day_high = float(df["high"].max())
# 저가가 0인 분봉(비정상 값) 때문에 낙폭이 100%로 계산되는 것을 방지
@@ -2669,18 +3012,27 @@ MIN_DROP_RATE=0.025
day_low = float(valid_lows.min()) if not valid_lows.empty else float(lows.min())
except Exception:
day_low = float(df["low"].min())
try:
# 3분봉 20개(1시간) 한계를 넘어 하루 전체 기준을 잡기 위해 현재가 API 호출
today_price_data = self.client.inquire_price(code)
if today_price_data:
api_open = abs(float(str(today_price_data.get("stck_oprc", 0)).replace(",", "")))
api_high = abs(float(str(today_price_data.get("stck_hgpr", 0)).replace(",", "")))
api_low = abs(float(str(today_price_data.get("stck_lwpr", 0)).replace(",", "")))
if api_open > 0: day_open = api_open
if api_high > 0: day_high = api_high
if api_low > 0: day_low = api_low
# 1순위: WS 캐시에 당일 시고저 있으면 REST 호출 생략 (API 과부하 방지)
ws_data = self.ws_cache.get_price(code) if self.ws_cache else None
if ws_data:
ws_open = abs(float(str(ws_data.get("stck_oprc", 0)).replace(",", "")))
ws_high = abs(float(str(ws_data.get("stck_hgpr", 0)).replace(",", "")))
ws_low = abs(float(str(ws_data.get("stck_lwpr", 0)).replace(",", "")))
if ws_open > 0: day_open = ws_open
if ws_high > 0: day_high = ws_high
if ws_low > 0: day_low = ws_low
else:
# 2순위 fallback: REST inquire_price (WS 캐시 없거나 만료된 경우에만)
today_price_data = self.client.inquire_price(code)
if today_price_data:
api_open = abs(float(str(today_price_data.get("stck_oprc", 0)).replace(",", "")))
api_high = abs(float(str(today_price_data.get("stck_hgpr", 0)).replace(",", "")))
api_low = abs(float(str(today_price_data.get("stck_lwpr", 0)).replace(",", "")))
if api_open > 0: day_open = api_open
if api_high > 0: day_high = api_high
if api_low > 0: day_low = api_low
except Exception as e:
logger.debug(f"일일 시고저 보정 실패({code}): {e}")
@@ -2769,8 +3121,10 @@ MIN_DROP_RATE=0.025
return None
# RSI 과열 방지 (수치: env)
# RSI_PERIOD 는 get_minute_chart 에서 계산 시 사용한 기간과 동일해야 함
rsi_val = 50.0
if "RSI" in df.columns and len(df) >= 14:
rsi_period = get_env_int("RSI_PERIOD", 14)
if "RSI" in df.columns and len(df) >= rsi_period:
rsi_val = float(df["RSI"].iloc[-1])
rsi_threshold = get_env_float("RSI_OVERHEAT_THRESHOLD", 78.0)
if rsi_val >= rsi_threshold:
@@ -2874,11 +3228,20 @@ MIN_DROP_RATE=0.025
except Exception:
pass
# 현재가 조회
price_data = self.client.inquire_price(code)
# 현재가 조회 — WebSocket 캐시 우선, 없으면 REST fallback
# ws_cache.get_price() 는 5초 이내 수신된 tick 데이터만 유효 취급
price_data = None
_used_ws = False
if self.ws_cache and self.ws_cache.is_active:
price_data = self.ws_cache.get_price(code)
if price_data:
_used_ws = True
if not price_data:
# WebSocket 미연결·캐시 만료 → REST (기존 방식 그대로)
price_data = self.client.inquire_price(code)
if not price_data:
continue
current_price = abs(float(price_data.get("stck_prpr", 0)))
if current_price == 0:
continue
@@ -2896,10 +3259,12 @@ MIN_DROP_RATE=0.025
# ATR 조회 (DB 또는 재계산)
atr = holding.get("atr_entry", 0)
if atr == 0:
# ATR 재계산 (3분봉)
# ATR 재계산 — WS 메모리 봉 우선, 없으면 REST fallback
try:
df = self.client.get_minute_chart(code, period="3", limit=20)
if not df.empty:
df = self._get_candles_df(code, tf=3, n=20)
if df is None or df.empty:
df = self.client.get_minute_chart(code, period="3", limit=20)
if df is not None and not df.empty:
atr = self.calculate_atr(df)
if atr > 0:
self.holdings[code]["atr_entry"] = atr
@@ -3038,8 +3403,11 @@ MIN_DROP_RATE=0.025
"qty": qty,
"price": current_price,
})
time.sleep(random.uniform(0.3, 0.7))
# WebSocket으로 가격을 받아온 경우 REST API 딜레이 불필요.
# REST fallback 시에는 기존과 동일하게 딜레이 적용 (과부하 방지).
if not _used_ws:
time.sleep(random.uniform(0.3, 0.7))
except Exception as e:
logger.error(f"매도 신호 체크 실패({code}): {e}")
continue
@@ -3098,10 +3466,12 @@ MIN_DROP_RATE=0.025
# ============================================================
# [매수 금액] 변동성 역가중 (Volatility Inverse Weighting)
# ============================================================
# ATR 계산용 분봉 데이터 (변동성 계산에 필요)
# ATR 계산용 분봉 데이터 — WS 메모리 봉 우선, 없으면 REST fallback
df_minute = None
try:
df_minute = self.client.get_minute_chart(code, period="3", limit=20)
df_minute = self._get_candles_df(code, tf=3, n=20)
if df_minute is None or df_minute.empty:
df_minute = self.client.get_minute_chart(code, period="3", limit=20)
except Exception as e:
logger.debug(f"분봉 조회 실패({code}): {e}")
@@ -3248,6 +3618,20 @@ MIN_DROP_RATE=0.025
logger.info(f"💰 [매수 체결] {name} ({code}): {price:,.0f}× {qty}주 (API 체결 확인) | 손절={stop_price:,.0f}원, 목표={target_price:,.0f}")
else:
logger.info(f"💰 [매수 체결] {name} ({code}): {price:,.0f}× {qty}주 (주문기준) | 손절={stop_price:,.0f}원, 목표={target_price:,.0f}")
# 매수 후 WebSocket 구독 등록 → 이후 check_sell_signals에서 REST 없이 실시간 수신
if self.ws_cache and self.ws_cache.is_active:
self.ws_cache.subscribe(code)
# 신규 보유 종목 즉시 갭보정 (봉 버퍼 미확보 시 ATR 계산 즉시 가능하게)
if self.candle_agg:
try:
lim = get_env_int("SHORT_GAP_FILL_LIMIT", 100)
df_gap = self.client.get_minute_chart(code, period="3", limit=lim)
if df_gap is not None and not df_gap.empty:
self.candle_agg.fill_gap_from_rest(code, 3, df_gap)
except Exception as _ge:
logger.debug("매수후 갭보정 실패(%s): %s", code, _ge)
# 체결 알림 (MM) — API 추가 호출 없이 메모리 값만 사용
try:
invest_amt = price * qty
@@ -3271,9 +3655,31 @@ MIN_DROP_RATE=0.025
if code not in self.holdings:
logger.warning(f"⚠️ [{name}] 보유 종목 아님")
return False
# ★ 매도 실패 백오프 체크 (영업일 아님·시장 마감 등 일시 오류 반복 방지)
backoff_until = self._sell_backoff.get(code, 0)
if time.time() < backoff_until:
remain_min = (backoff_until - time.time()) / 60
logger.debug("⏸ [%s(%s)] 매도 백오프 중 — %.0f분 후 재시도", name, code, remain_min)
return False
# 매도 주문
success = self.client.sell_market_order(code, qty)
if not success:
# 실패 원인 분석 → 영업일·시장마감 오류면 백오프 등록
msg_cd = getattr(self.client, "_last_sell_msg_cd", None) or ""
msg1 = getattr(self.client, "_last_sell_msg1", "") or ""
# KIS 비영업일·장마감 오류코드 (40100000=모의 영업일 아님, 40200000=실전 장외시간)
non_biz_codes = {"40100000", "40200000"}
if msg_cd in non_biz_codes or "영업일" in msg1 or "장외" in msg1 or "시장" in msg1:
backoff_sec = get_env_int("SELL_FAILURE_BACKOFF_SEC", 1800)
self._sell_backoff[code] = time.time() + backoff_sec
logger.warning(
"⏸ [%s(%s)] 매도 실패('%s') → %d분 후 재시도 (SELL_FAILURE_BACKOFF_SEC=%d)",
name, code, msg1, backoff_sec // 60, backoff_sec,
)
return False
if success:
# 현재가 조회
price_data = self.client.inquire_price(code)
@@ -3287,15 +3693,23 @@ MIN_DROP_RATE=0.025
buy_price = holding.get("buy_price", sell_price)
profit_val = (sell_price - buy_price) * qty # 손익 금액
# DB에서 매도 처리
# DB에서 매도 처리 (strategy 지정 → 꼬리잡기봇 row만 삭제, 스캘핑봇 row 보호)
self.db.close_trade(
code=code,
sell_price=sell_price,
sell_reason=signal['reason'],
strategy="SHORT_ANT_SHAKING",
)
del self.holdings[code]
# 재진입 쿨다운 기록 (REENTRY_COOLDOWN_SEC 동안 같은 종목 재매수 차단)
self.recently_sold[code] = time.time()
# 매도 후 WebSocket 구독 해제 → 불필요한 데이터 수신 차단
if self.ws_cache and self.ws_cache.is_active:
self.ws_cache.unsubscribe(code)
# 🔥 매도 후 예수금 + 총자산 즉시 업데이트 (손익 반영)
self._update_account_light(profit_val=profit_val)
@@ -3303,12 +3717,16 @@ MIN_DROP_RATE=0.025
# 체결 알림 (MM) — 매도 직후 _update_account_light로 갱신된 예수금/총자산 사용 (추가 API 없음)
try:
pct = signal['profit_pct'] * 100
cum_pnl = self.current_total_asset - self.total_deposit if self.total_deposit else 0
cum_pct = (cum_pnl / self.total_deposit * 100) if self.total_deposit and self.total_deposit > 0 else 0
cum_pnl = self.current_total_asset - self.total_deposit if self.total_deposit else 0
cum_pct = (cum_pnl / self.total_deposit * 100) if self.total_deposit and self.total_deposit > 0 else 0
# 당일 손익: 오늘 장 시작 시 총자산 대비 현재 총자산 차이
day_pnl = self.current_total_asset - self.start_day_asset if self.start_day_asset else 0
day_pct = (day_pnl / self.start_day_asset * 100) if self.start_day_asset > 0 else 0
mm_msg = (
f"🔴 **매도 체결** {name} ({code})\n"
f"{sell_price:,.0f}× {qty:,}주 | {signal['reason']} | 수익률 {pct:+.2f}% (실현 {profit_val:+,.0f}원)\n"
f"예수금 {self.current_cash:,.0f}원 | 총자산 {self.current_total_asset:,.0f}원 | 누적손익 {cum_pnl:+,.0f}원 ({cum_pct:+.2f}%) | 보유 {len(self.holdings)}종목"
f"예수금 {self.current_cash:,.0f}원 | 총자산 {self.current_total_asset:,.0f}원 | 보유 {len(self.holdings)}종목\n"
f"당일손익 {day_pnl:+,.0f}원 ({day_pct:+.2f}%) | 누적손익 {cum_pnl:+,.0f}원 ({cum_pct:+.2f}%)"
)
self.send_mm(mm_msg)
except Exception as e:
@@ -3404,8 +3822,13 @@ MIN_DROP_RATE=0.025
self.today_date = today_str
self.untradable_skip_set.clear()
logger.debug("📅 날짜 변경 -> 매매불가 제외 목록 초기화")
active_count = len(self.holdings)
active_count = len(self.holdings)
db_candidates = self.db.get_target_candidates()
# 신규 후보 WS 구독 + 3분봉 갭보정 (봉 버퍼 미확보 종목 자동 보완)
if db_candidates:
self._sync_subscriptions(db_candidates)
if db_candidates and active_count < self.max_stocks:
strength_preview = " | 강도순: " + ", ".join(
f"{c.get('name', c.get('code',''))} {c.get('score', 0):.1f}" for c in db_candidates[:5]
@@ -3421,6 +3844,17 @@ MIN_DROP_RATE=0.025
# 매매불가 종목은 당일 재시도 안 함 → 다음 후보로
if code in self.untradable_skip_set:
continue
# ★ 재진입 쿨다운 체크: 최근 매도 종목은 일정 시간 동안 재매수 차단.
# 손절 직후 즉시 재매수 → 손절 반복 루프를 근본 차단.
reentry_cooldown = get_env_int("REENTRY_COOLDOWN_SEC", 300)
elapsed_since_sell = time.time() - self.recently_sold.get(code, 0)
if elapsed_since_sell < reentry_cooldown:
remaining = int(reentry_cooldown - elapsed_since_sell)
logger.info(
"⏳ [재진입 차단] %s(%s) 매도 후 쿨다운 중 — 남은 시간 %d초/%d",
name, code, remaining, reentry_cooldown,
)
continue
signal = self.check_buy_signal_tail_catch(code, name)
if signal:
signals_this_turn += 1