""" KIS Short Trading Bot Ver3 — 꼬리잡기 봇 (tail_engine 공통 로직, 단독 실행) - Ver2와 동일한 한투 API/WS/봇 구조, 매수/매도 판단만 tail_engine 사용 (백테스트와 동일). - kis_short_ver2 import 없이 단독 실행 가능 (본 파일에 필요한 코드 포함). """ import os import re import json import time import random import logging import datetime import hashlib import hmac import base64 import warnings import asyncio import subprocess from datetime import datetime as dt from pathlib import Path from typing import List, Dict, Optional import pandas as pd import requests from database import TradeDB, ENV_CONFIG_KEYS import tail_engine as te # WebSocket 실시간 체결가 캐시 + 봉집계기 (REST 폴링 전면 대체) # CandleAggregator: 틱 → 3분봉 in-memory 집계 → check_buy_signal_tail_catch / check_sell_signals / execute_buy 에서 활용 try: from kis_ws import ( KISWebSocketPriceCache, CandleAggregator, get_kiwoom_candles_df, _get_kiwoom_creds, ) _KIS_WS_AVAILABLE = True except ImportError: _KIS_WS_AVAILABLE = False # 로깅 설정 logging.basicConfig( format='[%(asctime)s] %(message)s', datefmt='%H:%M:%S', level=logging.INFO, ) logger = logging.getLogger("KISShortBot") # 로그 색상 (ANSI) - 탈락/통과 구분 LOG_RED = "\033[91m" # 탈락 LOG_YELLOW = "\033[93m" # 탈락 (Pass-조건) LOG_GREEN = "\033[92m" # 통과 LOG_CYAN = "\033[96m" # 강조 LOG_RESET = "\033[0m" # DB 초기화 — MariaDB 192.168.0.141 (database.py 모듈 상수 사용) SCRIPT_DIR = Path(__file__).resolve().parent db = TradeDB() # db_path 인수 무시됨, MariaDB 직접 연결 # DB에서 환경변수 로드 def get_env_from_db(key, default=""): """DB에서 환경변수 읽기""" env_data = db.get_latest_env() if env_data and env_data.get("snapshot"): return env_data["snapshot"].get(key, default) return default def get_env_float(key, default): """환경변수를 float로 변환 (DB 우선)""" value = get_env_from_db(key, str(default)) if isinstance(value, str) and "#" in value: value = value.split("#")[0].strip() try: return float(value) if value else default except (ValueError, TypeError): return default def get_env_int(key, default): """환경변수를 int로 변환 (DB 우선)""" value = get_env_from_db(key, str(default)) if isinstance(value, str) and "#" in value: value = value.split("#")[0].strip() try: return int(value) if value else default except (ValueError, TypeError): return default def get_env_bool(key, default=False): """환경변수를 bool로 변환 (DB 우선)""" value = get_env_from_db(key, str(default)).lower() return value in ("true", "1", "yes") # Mattermost 설정 MM_SERVER_URL = get_env_from_db("MM_SERVER_URL", "https://mattermost.hoonfam.org") MM_BOT_TOKEN = get_env_from_db("MM_BOT_TOKEN_", "").strip() MM_CONFIG_FILE = SCRIPT_DIR / "mm_config.json" # 기본 채널(alias) + 단타 봇 전용 채널(alias) MM_CHANNEL_DEFAULT = get_env_from_db("MATTERMOST_CHANNEL", "stock") MM_CHANNEL_SHORT = get_env_from_db("KIS_SHORT_MM_CHANNEL", MM_CHANNEL_DEFAULT) # Gemini API (AI 리포트용) - google.genai 신규 SDK (Client 사용, configure 없음) try: import google.genai as genai GEMINI_AVAILABLE = True except ImportError: GEMINI_AVAILABLE = False logger.warning("⚠️ google-genai 미설치 - AI 리포트 기능 사용 불가") GEMINI_API_KEY = get_env_from_db("GEMINI_API_KEY", "").strip() GEMINI_MODEL_ID = "gemini-2.5-flash" # 또는 gemini-2.5-flash (모델명 확인) gemini_client = None if GEMINI_API_KEY and GEMINI_AVAILABLE: try: gemini_client = genai.Client(api_key=GEMINI_API_KEY) except Exception as e: logger.warning(f"⚠️ Gemini 초기화 실패: {e}") gemini_client = None else: gemini_client = None # ML 예측 (선택적) try: from ml_predictor import MLPredictor ML_AVAILABLE = True except ImportError: ML_AVAILABLE = False logger.warning("⚠️ ml_predictor 미설치 - ML 예측 기능 사용 불가") # RiskManager (변동성 기반 리스크 관리) try: from risk_manager import RiskManager RISK_MANAGER_AVAILABLE = True except ImportError: RISK_MANAGER_AVAILABLE = False logger.warning("⚠️ risk_manager 미설치 - 변동성 역가중 매수 금액 계산 불가") # ============================================================ # 한투(KIS) API 클라이언트 (kis_long_term_checker.py 참고) # ============================================================ # 모의계좌용 토큰 캐시 경로 KIS_TOKEN_CACHE_PATH_MOCK = SCRIPT_DIR / ".kis_token_cache_mock.json" # 실계좌용 토큰 캐시 경로 KIS_TOKEN_CACHE_PATH_REAL = SCRIPT_DIR / ".kis_token_cache_real.json" # 한투 접근 토큰 유효기간 24시간. 자주 발급하면 영구 제명될 수 있으므로 캐시 철저 재사용. # 만료 1분 전에만 재발급 (불필요한 발급 최소화) KIS_TOKEN_EXPIRE_MARGIN_SEC = 60 def _parse_kis_token_expired(expired_str): """한투 API 만료시간 문자열 파싱. 'YYYY-MM-DD HH:MM:SS' 또는 'YYYY-MM-DDTHH:MM:SS' 등 지원.""" if not expired_str or not isinstance(expired_str, str): return None s = expired_str.strip().replace("T", " ")[:19] if len(s) < 19: return None try: return dt.strptime(s, "%Y-%m-%d %H:%M:%S") except ValueError: return None def _load_kis_token_cache(mock): """캐시 파일에서 토큰 로드. 만료 1분 전까지 유효하면 재사용 (24h 토큰 자주 발급 시 영구 제명 주의).""" if mock: path = KIS_TOKEN_CACHE_PATH_MOCK else: path = KIS_TOKEN_CACHE_PATH_REAL if not path.exists(): logger.info("한투 토큰 캐시 없음 → API 발급 예정 (캐시 경로: %s)", path) return None try: logger.info("패스 %s", path) with open(path, "r", encoding="utf-8") as f: cache = json.load(f) if cache.get("mock") != mock: logger.info("한투 토큰 캐시 모의/실전 불일치 → API 발급 예정") return None token = cache.get("access_token") expired_str = cache.get("access_token_token_expired") or cache.get("expires_at") if not token or not expired_str: logger.info("한투 토큰 캐시 내용 불완전 → API 발급 예정") return None expired_dt = _parse_kis_token_expired(expired_str) if expired_dt is None: logger.info("한투 토큰 캐시 만료시간 파싱 실패(%s) → API 발급 예정", expired_str[:30]) return None if dt.now() >= expired_dt - datetime.timedelta(seconds=KIS_TOKEN_EXPIRE_MARGIN_SEC): logger.info("한투 토큰 캐시 만료 임박(%s) → API 발급 예정", expired_str[:19]) return None return token except Exception as e: logger.warning("한투 토큰 캐시 로드 실패(%s): %s", path, e) return None def _save_kis_token_cache(access_token, access_token_token_expired, mock): """발급받은 토큰을 캐시 파일에 저장.""" try: if mock: path = KIS_TOKEN_CACHE_PATH_MOCK else: path = KIS_TOKEN_CACHE_PATH_REAL with open(path, "w", encoding="utf-8") as f: json.dump({ "access_token": access_token, "access_token_token_expired": access_token_token_expired, "mock": mock, }, f, ensure_ascii=False, indent=2) logger.info("한투 토큰 캐시 저장 완료: %s", path) except Exception as e: logger.warning("한투 토큰 캐시 저장 실패: %s", e) def _invalidate_kis_token_cache(mock): """토큰 만료(EGW00123) 시 캐시 파일 삭제 → 다음 _auth()에서 새 토큰 발급.""" path = KIS_TOKEN_CACHE_PATH_MOCK if mock else KIS_TOKEN_CACHE_PATH_REAL try: if path.exists(): path.unlink() logger.info("한투 토큰 캐시 삭제 (만료 감지): %s", path) except Exception as e: logger.warning("한투 토큰 캐시 삭제 실패(%s): %s", path, e) def _is_token_expired_response(j): """응답이 '기간이 만료된 token' 오류(EGW00123)인지 여부.""" if not j or not isinstance(j, dict): return False msg_cd = j.get("msg_cd") or "" msg1 = str(j.get("msg1", "")) return msg_cd == "EGW00123" or "만료된 token" in msg1 or "만료" in msg1 class KISClient: """한국투자증권 Open API 클라이언트""" def __init__(self, mock=None): # 실전/모의 토큰 모두 최신 상태로 유지 (모드와 무관하게 양쪽 갱신) try: from kis_token_manager import ensure_both_tokens ensure_both_tokens() except Exception as _te: logger.warning(f"토큰 자동갱신 건너뜀: {_te}") # 모의 여부 결정 if mock is not None: use_mock = mock else: use_mock = get_env_bool("KIS_MOCK", True) # 모의투자는 MOCK 전용 키만 사용(실전 키로 폴백 안 함 → 토큰/캐시가 실전이랑 섞이지 않도록) if use_mock: self.app_key = get_env_from_db("KIS_APP_KEY_MOCK", "").strip() self.app_secret = get_env_from_db("KIS_APP_SECRET_MOCK", "").strip() if not self.app_key or not self.app_secret: logger.error("❌ 모의투자용 APP KEY/SECRET이 DB에 없습니다. KIS_APP_KEY_MOCK, KIS_APP_SECRET_MOCK 설정 필요.") raise ValueError("모의투자 KIS_APP_KEY_MOCK / KIS_APP_SECRET_MOCK 미설정") else: self.app_key = (get_env_from_db("KIS_APP_KEY_REAL", "") or get_env_from_db("KIS_APP_KEY", "")).strip() self.app_secret = (get_env_from_db("KIS_APP_SECRET_REAL", "") or get_env_from_db("KIS_APP_SECRET", "")).strip() # 계좌번호: 모의/실전 분리 if use_mock: raw_no = get_env_from_db("KIS_ACCOUNT_NO_MOCK", "").strip() raw_code = get_env_from_db("KIS_ACCOUNT_CODE_MOCK", "").strip() if not raw_code: raw_code = "01" else: raw_no = (get_env_from_db("KIS_ACCOUNT_NO_REAL", "") or get_env_from_db("KIS_ACCOUNT_NO", "")).strip() raw_code = (get_env_from_db("KIS_ACCOUNT_CODE_REAL", "") or get_env_from_db("KIS_ACCOUNT_CODE", "01")).strip() if not raw_code: raw_code = "01" # 10자리면 앞 8 / 뒤 2 분리 if len(raw_no) >= 10: self.acc_no = raw_no[:8] self.acc_code = raw_no[8:10] else: self.acc_no = raw_no self.acc_code = raw_code[:2] if len(raw_code) >= 2 else "01" if len(self.acc_no) != 8: logger.warning("⚠️ 계좌번호 CANO 8자리 아님: '%s'(%s자리). DB 확인.", self.acc_no, len(self.acc_no)) if len(self.acc_no) != 8 or len(self.acc_code) != 2: logger.error( "❌ 계좌번호 형식 오류: CANO=%s(%s자리), ACNT_PRDT_CD=%s(%s자리) → OPSQ2000 발생. " "모의면 KIS_ACCOUNT_NO_MOCK/KIS_ACCOUNT_CODE_MOCK, 실전이면 KIS_ACCOUNT_NO/KIS_ACCOUNT_CODE 확인.", self.acc_no, len(self.acc_no), self.acc_code, len(self.acc_code) ) else: logger.info("✅ 한투 계좌 CANO=%s, ACNT_PRDT_CD=%s (모의=%s)", self.acc_no, self.acc_code, use_mock) self.mock = use_mock if self.mock is True: self.base_url = "https://openapivts.koreainvestment.com:29443" else: self.base_url = "https://openapi.koreainvestment.com:9443" self.access_token = None # 매수 주문 실패 시 사유 저장 (매매불가 종목 당일 제외용) self._last_order_msg_cd = None self._last_order_msg1 = None # 현재가 API 캐시 (레이트리밋·지연 완화용) # {종목코드: (timestamp, output_dict)} self._price_cache = {} logger.info("한투 API 연결: 모의=%s → %s", self.mock, self.base_url) self._auth() def _auth(self): """접근 토큰 발급""" if not self.app_key or not self.app_secret: if self.mock: key_hint = "KIS_APP_KEY_MOCK, KIS_APP_SECRET_MOCK" else: key_hint = "KIS_APP_KEY_REAL, KIS_APP_SECRET_REAL (또는 KIS_APP_KEY, KIS_APP_SECRET)" logger.error("한투 API 키가 없습니다. DB env_config에 설정 필요: %s", key_hint) raise ValueError("KIS 앱키/시크릿 설정 필요 (모의=%s)" % self.mock) # ✅ path를 먼저 정의 (발급 성공/실패 양쪽에서 사용) path = KIS_TOKEN_CACHE_PATH_MOCK if self.mock else KIS_TOKEN_CACHE_PATH_REAL mode_str = "모의" if self.mock else "실전" cached = _load_kis_token_cache(self.mock) if cached: self.access_token = cached token_head = (cached[:8] + "…") if cached and len(cached) > 8 else "(없음)" logger.info("한투 토큰 캐시 사용 (%s) | 파일=%s | 토큰앞8=%s", mode_str, path, token_head) return # 캐시 없음/만료 → kis_token_manager 경로로만 발급 (잠금·1일1회 준수, SMS 시각 안정화) try: from kis_token_manager import ensure_token if ensure_token(self.mock): cached = _load_kis_token_cache(self.mock) if cached: self.access_token = cached token_head = (cached[:8] + "…") if len(cached) > 8 else "(없음)" logger.info("한투 토큰 발급 완료 (%s) | 캐시=%s | 토큰앞8=%s", mode_str, path, token_head) return except Exception as e: logger.warning("kis_token_manager 발급 실패, 재시도: %s", e) raise RuntimeError("한투 토큰 발급 실패 (캐시 없음·만료 시 kis_token_manager.ensure_token 사용)") def _get_hashkey(self, body): """해시키(Hashkey) 발급 - POST 요청 시 body 무결성 검증용""" try: url = f"{self.base_url}/uapi/hashkey" headers = { "content-type": "application/json", "appkey": self.app_key, "appsecret": self.app_secret, } r = requests.post(url, headers=headers, json=body, timeout=5) if r.status_code == 200: data = r.json() if data.get("rt_cd") == "0": return data.get("HASH") return None except Exception as e: logger.debug(f"해시키 발급 실패: {e}") return None def _headers(self, tr_id, hashkey=None): """ API 호출용 헤더 생성. KisTokenManager.get_token() 으로 매번 만료 여부 확인: - 유효하면 메모리에서 즉시 반환 (오버헤드 없음) - 만료 10분 전이면 선제 갱신 후 새 토큰 사용 → EGW00123(만료 에러) 없이 자동 교체 """ try: from kis_token_manager import KisTokenManager fresh = KisTokenManager.instance(is_mock=self.mock).get_token() if fresh: self.access_token = fresh except Exception: pass # 실패 시 기존 access_token 유지 headers = { "content-type": "application/json; charset=utf-8", "authorization": f"Bearer {self.access_token}", "appkey": self.app_key, "appsecret": self.app_secret, "tr_id": tr_id, } if hashkey: headers["hashkey"] = hashkey return headers def _get(self, path, tr_id, params, max_retries=5, tr_cont=None): """ GET 요청. EGW00201(초당 거래건수 초과) 시 점진적 대기 후 재시도 (200/500 공통). EGW00123(기간이 만료된 token) 시 캐시 삭제 후 새 토큰 발급·1회 재시도. - 한투 API 제한: 초당 20개 (실제로는 더 엄격, 모의투자는 초당 2~3회 권장) - 기본 호출 간격: 0.5초 이상 권장 """ url = f"{self.base_url}{path}" if tr_cont: headers_extra = {"tr_cont": tr_cont} else: headers_extra = {} logger.debug(f"[API호출] GET {path} TR_ID={tr_id} params={params} tr_cont={tr_cont}") time.sleep(0.5) token_refreshed = False for attempt in range(max_retries): try: headers = self._headers(tr_id) for k, v in headers_extra.items(): headers[k] = v r = requests.get(url, headers=headers, params=params, timeout=15) # HTTP 429 (Too Many Requests) if r.status_code == 429: wait_time = 1 + (attempt * 1) logger.warning( f"⏳ API 호출 제한 (429) -> {wait_time}초 대기 후 재시도 " f"({attempt+1}/{max_retries}) path={path}" ) time.sleep(wait_time) continue # 200/500 응답에서 EGW00201(초당 거래건수 초과)이면 대기 후 재시도 if r.status_code in (200, 500): try: j = r.json() except Exception: j = {} if r.status_code == 200 and j.get("rt_cd") == "0": logger.debug(f"[API성공] GET {path} TR_ID={tr_id} status=200 rt_cd=0") return r if j.get("msg_cd") == "EGW00201" or "초과" in str(j.get("msg1", "")) or "과부하" in str(j.get("msg1", "")): wait_time = 1.5 + (attempt * 1.0) logger.warning( f"⏳ API 과부하 (EGW00201) GET {path} TR_ID={tr_id} -> {wait_time:.1f}초 대기 후 재시도 " f"({attempt+1}/{max_retries}) msg1={j.get('msg1', '')}" ) time.sleep(wait_time) continue # EGW00123: 기간이 만료된 token → 캐시 삭제 후 새 토큰 발급, 1회만 재시도 if _is_token_expired_response(j) and not token_refreshed: token_refreshed = True _invalidate_kis_token_cache(self.mock) self._auth() logger.info("한투 토큰 만료(EGW00123) 감지 → 캐시 삭제 후 재발급, GET 재시도") time.sleep(0.5) continue # HTTP 200이 아니거나 rt_cd != "0"인 경우 try: body_preview = (r.text or "")[:500] except Exception: body_preview = "" logger.warning( f"[API실패] GET {path} TR_ID={tr_id} status={r.status_code} " f"params={params} body={body_preview}" ) return r except requests.exceptions.RequestException as e: if attempt < max_retries - 1: wait_time = (2 ** attempt) + random.uniform(0.5, 1.5) logger.warning(f"⚠️ 네트워크 에러 -> {wait_time:.1f}초 대기 후 재시도: {e}") time.sleep(wait_time) else: logger.error(f"❌ GET 요청 실패 ({path}): {e}") return r def _post(self, path, tr_id, body, use_hashkey=True, max_retries=3): """ POST 요청. EGW00201(초당 거래건수 초과) 시 점진적 대기 후 재시도. EGW00123(기간이 만료된 token) 시 캐시 삭제 후 새 토큰 발급·1회 재시도. - 한투 API 제한: 초당 20개 (실제로는 더 엄격) """ url = f"{self.base_url}{path}" body_preview = str(body)[:200] if body else "{}" logger.debug(f"[API호출] POST {path} TR_ID={tr_id} body={body_preview}...") time.sleep(0.5) token_refreshed = False for attempt in range(max_retries): try: hashkey = self._get_hashkey(body) if use_hashkey else None if use_hashkey and not hashkey: logger.debug("해시키 발급 실패, 해시키 없이 진행") r = requests.post(url, headers=self._headers(tr_id, hashkey), json=body, timeout=15) # HTTP 429 (Too Many Requests) if r.status_code == 429: wait_time = 5 + (attempt * 1) logger.warning( f"⏳ API 호출 제한 (429) -> {wait_time}초 대기 후 재시도 " f"({attempt+1}/{max_retries}) path={path}" ) time.sleep(wait_time) continue if r.status_code in (200, 500): try: j = r.json() except Exception: j = {} if r.status_code == 200 and j.get("rt_cd") == "0": logger.debug(f"[API성공] POST {path} TR_ID={tr_id} status=200 rt_cd=0") return r if j.get("msg_cd") == "EGW00201" or "초과" in str(j.get("msg1", "")) or "과부하" in str(j.get("msg1", "")): wait_time = 5 + (attempt * 1) logger.warning( f"⏳ API 과부하 (EGW00201) POST {path} TR_ID={tr_id} -> {wait_time}초 대기 후 재시도 " f"({attempt+1}/{max_retries}) rt_cd={j.get('rt_cd')} msg_cd={j.get('msg_cd')} msg1={j.get('msg1')}" ) time.sleep(wait_time) continue # EGW00123: 기간이 만료된 token → 캐시 삭제 후 새 토큰 발급, 1회만 재시도 if _is_token_expired_response(j) and not token_refreshed: token_refreshed = True _invalidate_kis_token_cache(self.mock) self._auth() logger.info("한투 토큰 만료(EGW00123) 감지 → 캐시 삭제 후 재발급, POST 재시도") time.sleep(0.5) continue try: body_preview = (r.text or "")[:500] except Exception: body_preview = "" logger.warning( f"[API실패] POST {path} TR_ID={tr_id} status={r.status_code} " f"body={body_preview}" ) return r except requests.exceptions.RequestException as e: if attempt < max_retries - 1: wait_time = (2 ** attempt) + random.uniform(0.5, 1.5) logger.warning(f"⚠️ 네트워크 에러 -> {wait_time:.1f}초 대기 후 재시도: {e}") time.sleep(wait_time) else: logger.error(f"❌ POST 요청 실패 ({path}): {e}") return r def inquire_price(self, stock_code): """ 주식 현재가 시세 조회 [v1_국내주식-008] (단건). output: stck_prpr(현재가) ✅, stck_oprc(시가), stck_hgpr(고가), stck_lwpr(저가) 등 당일 OHLC 포함. 실패 시 오류코드(rt_cd, msg_cd, msg1) 로깅. """ # 짧은 TTL 캐시로 동일 종목 반복 호출 시 레이트리밋·지연 완화 cache_ttl = get_env_float("KIS_PRICE_CACHE_TTL_SEC", 1.0) if cache_ttl > 0: now_ts = time.time() cached = self._price_cache.get(stock_code) if cached: ts, output = cached if now_ts - ts <= cache_ttl: logger.debug(f"[현재가API-캐시히트] code={stock_code} ttl={cache_ttl}s") return output path = "/uapi/domestic-stock/v1/quotations/inquire-price" tr_id = "FHKST01010100" params = {"FID_COND_MRKT_DIV_CODE": "J", "FID_INPUT_ISCD": stock_code} logger.debug(f"[현재가API] 호출 code={stock_code} path={path} TR_ID={tr_id}") r = self._get(path, tr_id, params) if r.status_code != 200: try: body_preview = (r.text or "")[:300] except Exception: body_preview = "" logger.warning( f"[현재가API] HTTP 실패 code={stock_code} path={path} TR_ID={tr_id} " f"status={r.status_code} body={body_preview}" ) return None try: j = r.json() except Exception as e: logger.warning( f"[현재가API] JSON 파싱 실패 code={stock_code} path={path} TR_ID={tr_id} exception={e}" ) return None if j.get("rt_cd") != "0": logger.warning( f"[현재가API] 오류 code={stock_code} path={path} TR_ID={tr_id} " f"rt_cd={j.get('rt_cd')} msg_cd={j.get('msg_cd')} msg1={j.get('msg1')}" ) return None output = j.get("output") if cache_ttl > 0 and output is not None: try: self._price_cache[stock_code] = (time.time(), output) except Exception: pass return output def inquire_multprice(self, stock_codes: List[str], max_per_call: int = 20): """ 다중 종목 현재가 조회 [intstock-multprice] - 한투 API: /uapi/domestic-stock/v1/quotations/intstock-multprice - 성공 시 {종목코드: output딕셔너리} 반환, 실패 시 None (오류 시 rt_cd/msg_cd/msg1 로깅) - TR_ID: FHKST01010600 - ⚠️ 배치 응답에는 stck_oprc(시가)/stck_hgpr(고가)/stck_lwpr(저가)가 없을 수 있음(API 스펙). 시가·고가·저가 필요 시 단건 inquire_price() 사용. """ if not stock_codes: return None codes = list(stock_codes)[: max_per_call * 10] result = {} for i in range(0, len(codes), max_per_call): chunk = codes[i : i + max_per_call] iscd = ",".join(chunk) path = "/uapi/domestic-stock/v1/quotations/intstock-multprice" tr_id = "FHKST01010600" params = {"FID_COND_MRKT_DIV_CODE": "J", "FID_INPUT_ISCD": iscd} r = self._get(path, tr_id, params) if r.status_code != 200: try: body_preview = (r.text or "")[:300] except Exception: body_preview = "" logger.warning( f"[다중시세API] HTTP 실패 status={r.status_code} body={body_preview}" ) continue try: j = r.json() except Exception as e: logger.warning(f"[다중시세API] JSON 파싱 실패 exception={e}") continue if j.get("rt_cd") != "0": logger.warning( f"[다중시세API] 오류 rt_cd={j.get('rt_cd')} " f"msg_cd={j.get('msg_cd')} msg1={j.get('msg1')}" ) continue out = j.get("output") if out is None: continue if isinstance(out, list): for item in out: if isinstance(item, dict): code = ( item.get("stck_shrn_iscd") or item.get("rsym") or item.get("FID_INPUT_ISCD") or item.get("mksc_shrn_iscd") ) if code: result[code] = item elif isinstance(out, dict): code = ( out.get("stck_shrn_iscd") or out.get("rsym") or out.get("FID_INPUT_ISCD") ) if code: result[code] = out time.sleep(random.uniform(0.2, 0.5)) return result if result else None def inquire_prices_batch(self, stock_codes: List[str]): """ 다중 종목 현재가 일괄 조회 - intstock-multprice API 우선 시도 후, 실패 시 순차 조회(inquire_price)로 fallback - 순차 조회 시 종목당 0.3~0.6초 딜레이 - 배치 응답에는 stck_prpr(현재가)는 있으나, stck_oprc/stck_hgpr/stck_lwpr 는 없을 수 있음. 시가·고가·저가 필요 시 배치 대신 종목별 inquire_price() 사용(개미털기 스캔은 이미 단건 사용). """ if not stock_codes: return {} multi = self.inquire_multprice(stock_codes) if multi: return multi result = {} for code in stock_codes: try: price_data = self.inquire_price(code) if price_data: result[code] = price_data time.sleep(random.uniform(0.3, 0.6)) except Exception as e: logger.warning(f"종목 조회 실패({code}) exception={e!r}") continue return result def get_account_balance(self): """계좌 잔고 조회 [v1_국내주식-010]. 모의/실전에 따라 TR ID 분기 (EGW2004 방지).""" if self.mock: tr_id = "VTTC8434R" else: tr_id = "TTTC8434R" params = { "CANO": self.acc_no, "ACNT_PRDT_CD": self.acc_code, "AFHR_FLPR_YN": "N", "OFL_YN": "", "INQR_DVSN": "01", # 01: 예수금/잔고 요약 (output2에 예수금) - 블로그·한투 문서 기준 "UNPR_DVSN": "01", "FUND_STTL_ICLD_YN": "N", "FNCG_AMT_AUTO_RDPT_YN": "N", "PRCS_DVSN": "00", # 00: 조회 (블로그 기준) "CTX_AREA_FK100": "", "CTX_AREA_NK100": "", } try: logger.info(f"💵 [예수금] 잔고 조회 요청: TR={tr_id}, CANO={self.acc_no}, ACNT_PRDT_CD={self.acc_code}, 모의={self.mock}") r = self._get( "/uapi/domestic-stock/v1/trading/inquire-balance", tr_id, params, ) if r.status_code != 200: logger.warning( f"💵 [예수금] 잔고 API HTTP 오류: status={r.status_code}, body={getattr(r, 'text', '')[:200]} | " f"TR={tr_id} (모의={self.mock}), CANO={self.acc_no}, ACNT_PRDT_CD={self.acc_code}. " f"EGW2004 시 모의면 VTTC8434R/실전이면 TTTC8434R 확인" ) return None j = r.json() if j.get("rt_cd") != "0": msg1 = (j.get("msg1") or "")[:150] msg_cd = j.get("msg_cd", "") logger.error( f"💵 [예수금] 잔고 API 응답 오류: rt_cd={j.get('rt_cd')}, msg_cd={msg_cd}, msg1={msg1} | " f"요청 파라미터: TR={tr_id}, CANO={self.acc_no}({len(self.acc_no)}자리), " f"ACNT_PRDT_CD={self.acc_code}({len(self.acc_code)}자리), 모의={self.mock} | " f"전체 응답: {j}" ) if "OPSQ2000" in str(msg_cd) or "INVALID_CHECK_ACNO" in msg1: logger.error( "💵 [예수금] OPSQ2000 = 계좌번호 검증 실패. " f"모의투자 서버({self.base_url})에 계좌번호 CANO={self.acc_no}, ACNT_PRDT_CD={self.acc_code}가 등록되어 있는지 확인. " f"한투 모의투자 앱/웹에서 계좌번호 확인 필요. DB의 KIS_ACCOUNT_NO_MOCK/KIS_ACCOUNT_CODE_MOCK 값 확인." ) return None logger.debug(f"💵 [예수금] 잔고 조회 성공: output2 keys={list(j.get('output2', [{}])[0].keys()) if isinstance(j.get('output2'), list) and j.get('output2') else []}") return j except Exception as e: logger.error(f"💵 [예수금] 잔고 조회 예외: {e} | CANO={self.acc_no}, ACNT_PRDT_CD={self.acc_code}, 모의={self.mock}") return None def get_account_evaluation(self): """계좌 평가 잔고 조회 [v1_국내주식-011]. 모의=VTTC8494R, 실전=TTTC8494R.""" if self.mock: tr_id = "VTTC8494R" else: tr_id = "TTTC8494R" try: logger.info(tr_id) r = self._get( "/uapi/domestic-stock/v1/trading/inquire-balance-rlz-pl", tr_id, { "CANO": self.acc_no, "ACNT_PRDT_CD": self.acc_code, "AFHR_FLPR_YN": "N", "OFL_YN": "", "INQR_DVSN": "01", "UNPR_DVSN": "01", "FUND_STTL_ICLD_YN": "N", "FNCG_AMT_AUTO_RDPT_YN": "N", "PRCS_DVSN": "01", "CTX_AREA_FK100": "", "CTX_AREA_NK100": "", }, ) if r.status_code != 200: return None j = r.json() if j.get("rt_cd") != "0": return None return j except Exception as e: logger.error(f"계좌 평가 조회 실패: {e}") return None def get_order_history(self, start_date=None, end_date=None): """주문 체결 내역 조회 [v1_국내주식-012] (모의=VTTC8001R, 실전=TTTC8001R)""" try: if self.mock: tr_id = "VTTC8001R" else: tr_id = "TTTC8001R" if not start_date: start_date = dt.now().strftime("%Y%m%d") if not end_date: end_date = dt.now().strftime("%Y%m%d") r = self._get( "/uapi/domestic-stock/v1/trading/inquire-daily-ccld", tr_id, { "CANO": self.acc_no, "ACNT_PRDT_CD": self.acc_code, "INQR_STRT_DT": start_date, "INQR_END_DT": end_date, "SLL_BUY_DVSN_CD": "00", # 00:전체 "INQR_DVSN": "00", # 00:역순 "PDNO": "", "CCLD_DVSN": "00", # 00:전체 "ORD_GNO_BRNO": "", "ODNO": "", "INQR_DVSN_3": "00", "INQR_DVSN_1": "", "CTX_AREA_FK100": "", "CTX_AREA_NK100": "", }, ) if r.status_code != 200: return None j = r.json() if j.get("rt_cd") != "0": return None return j except Exception as e: logger.error(f"주문 내역 조회 실패: {e}") return None def get_execution_by_odno(self, odno, code=None, wait_sec=2): """ 주문번호(ODNO)로 당일 체결 내역 조회. 시장가 매수 후 실제 체결가/체결수량 확인용. [v1_국내주식-012] inquire-daily-ccld 응답 output2에서 해당 주문 찾아 체결정보 반환. Args: odno: 주문번호 (buy_order 성공 시 반환값) code: 종목코드 (선택, 일치 행 필터용) wait_sec: 조회 전 대기 초 (체결 반영 지연 고려, 기본 2초) Returns: 성공 시 {"filled_qty": int, "avg_price": float}, 미체결/조회실패 시 None """ if not odno: return None time.sleep(max(0, wait_sec)) try: j = self.get_order_history() if not j: return None out2 = j.get("output2", []) if isinstance(out2, dict): out2 = [out2] for row in out2: row_odno = str(row.get("ODNO") or row.get("ord_no") or "").strip() row_pdno = str(row.get("PDNO") or row.get("pdno") or "").strip() if row_odno != str(odno).strip(): continue if code and row_pdno and row_pdno != str(code).strip(): continue # 총체결수량 / 체결평균가 (한투 문서 필드명 다양하므로 후보 나열) filled = row.get("tot_ccld_qty") or row.get("TOT_CCLD_QTY") or row.get("ccld_qty") or row.get("ord_qty") or row.get("ORD_QTY") avg_pr = row.get("avg_prvs") or row.get("AVG_PRVS") or row.get("rjct_avg_prvs") or row.get("RJCT_AVG_PRVS") or row.get("ord_unpr") or row.get("ORD_UNPR") if filled is not None and avg_pr is not None: qty = int(float(str(filled).replace(",", ""))) price = float(str(avg_pr).replace(",", "")) if qty > 0 and price > 0: logger.debug(f"[체결조회] ODNO={odno} -> 체결수량={qty}, 체결평균가={price:,.0f}") return {"filled_qty": qty, "avg_price": price} logger.debug(f"[체결조회] ODNO={odno} 해당 주문 체결 내역 없음 (output2 건수={len(out2)})") return None except Exception as e: logger.warning(f"주문번호 체결 조회 예외 ODNO={odno}: {e}") return None def get_volume_surge_stocks(self, market="J", min_volume_rate="50", limit=50): """거래량 급증 종목 조회 [v1_국내주식-023]""" try: r = self._get( "/uapi/domestic-stock/v1/quotations/inquire-time-itemchartprice", "FHKST03010200", { "FID_COND_MRKT_DIV_CODE": market, "FID_INPUT_ISCD": "", "FID_INPUT_HOUR_1": dt.now().strftime("%Y%m%d"), "FID_INPUT_HOUR_2": dt.now().strftime("%Y%m%d"), "FID_PW_DATA_INCU_YN": "Y", }, ) # 실제로는 거래량 급증 API를 사용해야 하지만, 여기서는 예시로 현재가 조회 활용 # 실제 구현 시: /uapi/domestic-stock/v1/quotations/inquire-time-itemchartprice 사용 return [] except Exception as e: logger.error(f"거래량 급증 종목 조회 실패: {e}") return [] def get_top_price_movers(self, market="J", sort_type="1", limit=50): """등락률 상위 종목 조회 [v1_국내주식-027]""" try: r = self._get( "/uapi/domestic-stock/v1/quotations/inquire-time-itemchartprice", "FHKST03010200", { "FID_COND_MRKT_DIV_CODE": market, "FID_INPUT_ISCD": "", "FID_INPUT_HOUR_1": dt.now().strftime("%Y%m%d"), "FID_INPUT_HOUR_2": dt.now().strftime("%Y%m%d"), "FID_PW_DATA_INCU_YN": "Y", }, ) # 실제 구현 필요 return [] except Exception as e: logger.error(f"등락률 상위 조회 실패: {e}") return [] def get_investor_trend(self, stock_code, days=5): """외국인/기관 매매 동향 조회""" path = "/uapi/domestic-stock/v1/quotations/inquire-daily-itemchartprice" tr_id = "FHKST03010100" try: # 일봉 데이터에서 외국인/기관 정보 추출 end_dt = dt.now() start_dt = end_dt - datetime.timedelta(days=days + 10) start_ymd = start_dt.strftime("%Y%m%d") end_ymd = end_dt.strftime("%Y%m%d") params = { "FID_COND_MRKT_DIV_CODE": "J", "FID_INPUT_ISCD": stock_code, "FID_INPUT_DATE_1": start_ymd, "FID_INPUT_DATE_2": end_ymd, "FID_PERIOD_DIV_CODE": "D", "FID_ORG_ADJ_PRC": "1", } logger.debug(f"[투자자동향] 호출 code={stock_code} path={path} TR_ID={tr_id}") r = self._get(path, tr_id, params) if r.status_code != 200: logger.warning(f"[투자자동향] HTTP 실패 code={stock_code} path={path} TR_ID={tr_id} status={r.status_code}") return None j = r.json() if j.get("rt_cd") != "0": logger.warning( f"[투자자동향] 오류 code={stock_code} path={path} TR_ID={tr_id} " f"rt_cd={j.get('rt_cd')} msg_cd={j.get('msg_cd')} msg1={j.get('msg1')}" ) return None out2 = j.get("output2", []) if not out2: return None # 최근 N일 외국인/기관 순매수 합계 foreign_sum = 0 org_sum = 0 for item in out2[:days]: try: foreign_raw = item.get("frgn_ntby_qty") or item.get("frgn_ntby_shnu") or "0" foreign_net = int(float(str(foreign_raw).replace(",", "").replace("+", "").replace("-", ""))) if str(foreign_raw).startswith("-"): foreign_net = -foreign_net org_raw = item.get("orgn_ntby_qty") or "0" org_net = int(float(str(org_raw).replace(",", "").replace("+", "").replace("-", ""))) if str(org_raw).startswith("-"): org_net = -org_net foreign_sum += foreign_net org_sum += org_net except: continue return { "foreign_net_buy": foreign_sum, "org_net_buy": org_sum, "total_net_buy": foreign_sum + org_sum, } except Exception as e: logger.error(f"외국인/기관 동향 조회 실패({stock_code}): {e}") return None def get_daily_chart(self, code, limit=10): """일봉 차트 조회 [v1_국내주식-017] - 거래대금(대/중/소형) 계산용""" path = "/uapi/domestic-stock/v1/quotations/inquire-daily-itemchartprice" tr_id = "FHKST03010100" try: end_dt = dt.now() start_dt = end_dt - datetime.timedelta(days=limit + 30) start_ymd = start_dt.strftime("%Y%m%d") end_ymd = end_dt.strftime("%Y%m%d") params = { "FID_COND_MRKT_DIV_CODE": "J", "FID_INPUT_ISCD": code, "FID_INPUT_DATE_1": start_ymd, "FID_INPUT_DATE_2": end_ymd, "FID_PERIOD_DIV_CODE": "D", "FID_ORG_ADJ_PRC": "1", } logger.debug(f"[일봉차트] 호출 code={code} path={path} TR_ID={tr_id}") r = self._get(path, tr_id, params) if r.status_code != 200: logger.warning(f"[일봉차트] HTTP 실패 code={code} path={path} TR_ID={tr_id} status={r.status_code}") return pd.DataFrame() j = r.json() if j.get("rt_cd") != "0": logger.warning( f"[일봉차트] 오류 code={code} path={path} TR_ID={tr_id} " f"rt_cd={j.get('rt_cd')} msg_cd={j.get('msg_cd')} msg1={j.get('msg1')}" ) return pd.DataFrame() data = j.get("output2", []) if not data: return pd.DataFrame() rows = [] for item in data: try: rows.append({ "date": item.get("stck_bsop_date", ""), "open": abs(float(item.get("stck_oprc", 0))), "high": abs(float(item.get("stck_hgpr", 0))), "low": abs(float(item.get("stck_lwpr", 0))), "close": abs(float(item.get("stck_clpr", 0))), "volume": int(item.get("acml_vol", 0)), }) except Exception: continue if not rows: return pd.DataFrame() df = pd.DataFrame(rows) df = df.sort_values("date").reset_index(drop=True) return df.tail(limit) except Exception as e: logger.error(f"일봉 조회 실패({code}): {e}") return pd.DataFrame() def buy_order(self, code, qty, price=0, order_type="01"): """ 매수 주문 (모의=VTTC0802U, 실전=TTTC0802U) Args: code: 종목코드 qty: 수량 price: 가격 (0이면 시장가) order_type: 주문구분 - "01": 시장가 - "00": 지정가 - "05": 조건부지정가 - "06": 최유리지정가 - "07": 최우선지정가 - "10": 보통(IOC) - "13": 시장가(IOC) - "16": 최유리(IOC) - "20": 보통(FOK) - "23": 시장가(FOK) - "26": 최유리(FOK) """ try: if self.mock: tr_id = "VTTC0802U" else: tr_id = "TTTC0802U" if price > 0: ord_unpr = str(price) else: ord_unpr = "0" path = "/uapi/domestic-stock/v1/trading/order-cash" body = { "CANO": self.acc_no, "ACNT_PRDT_CD": self.acc_code, "PDNO": code, "ORD_DVSN": order_type, "ORD_QTY": str(qty), "ORD_UNPR": ord_unpr, } logger.debug(f"[매수주문] 호출 code={code} qty={qty} price={price} path={path} TR_ID={tr_id}") r = self._post(path, tr_id, body, use_hashkey=True) if r.status_code != 200: logger.error(f"[매수주문] HTTP 에러 code={code} path={path} TR_ID={tr_id} status={r.status_code}") return False j = r.json() if j.get("rt_cd") == "0": self._last_order_msg_cd = None self._last_order_msg1 = None ord_no = j.get("output", {}).get("ODNO", "") logger.info(f"✅ 매수 주문 성공: {code} {qty}주 (주문번호: {ord_no})") # 체결 확인용으로 주문번호 반환 (실패 시 False) return ord_no if ord_no else True else: # 매매불가 등 실패 시 bot에서 당일 제외용으로 구분할 수 있도록 저장 self._last_order_msg_cd = j.get("msg_cd", "") self._last_order_msg1 = str(j.get("msg1", "") or "") logger.error( f"[매수주문] 실패 code={code} path={path} TR_ID={tr_id} " f"rt_cd={j.get('rt_cd')} msg_cd={self._last_order_msg_cd} msg1={self._last_order_msg1}" ) return False except Exception as e: self._last_order_msg_cd = None self._last_order_msg1 = None logger.error(f"매수 주문 예외({code}): {e}") return False def buy_market_order(self, code, qty): """ 시장가 매수 주문. - 모의투자: FOK/IOC 미지원이므로 무조건 "01" 일반 시장가. - 실전: USE_MARKET_IOC=true면 "13" 시장가 IOC, false면 "01" 일반 시장가. """ if self.mock: order_type = "01" else: use_ioc = get_env_from_db("USE_MARKET_IOC", "true").strip().lower() in ("true", "1", "yes") order_type = "13" if use_ioc else "01" return self.buy_order(code, qty, price=0, order_type=order_type) def sell_order(self, code, qty, price=0, order_type="01"): """ 매도 주문 (모의=VTTC0801U, 실전=TTTC0801U) Args: code: 종목코드 qty: 수량 price: 가격 (0이면 시장가) order_type: 주문구분 (buy_order와 동일) """ try: if self.mock: tr_id = "VTTC0801U" else: tr_id = "TTTC0801U" if price > 0: ord_unpr = str(price) else: ord_unpr = "0" path = "/uapi/domestic-stock/v1/trading/order-cash" body = { "CANO": self.acc_no, "ACNT_PRDT_CD": self.acc_code, "PDNO": code, "ORD_DVSN": order_type, "ORD_QTY": str(qty), "ORD_UNPR": ord_unpr, } logger.debug(f"[매도주문] 호출 code={code} qty={qty} price={price} path={path} TR_ID={tr_id}") r = self._post(path, tr_id, body, use_hashkey=True) if r.status_code != 200: logger.error(f"[매도주문] HTTP 에러 code={code} path={path} TR_ID={tr_id} status={r.status_code}") return False j = r.json() if j.get("rt_cd") == "0": self._last_sell_msg_cd = None self._last_sell_msg1 = None ord_no = j.get("output", {}).get("ODNO", "") logger.info(f"✅ 매도 주문 성공: {code} {qty}주 (주문번호: {ord_no})") return True else: # execute_sell 에서 실패 원인(영업일 아님 등) 구분할 수 있도록 저장 self._last_sell_msg_cd = j.get("msg_cd", "") self._last_sell_msg1 = str(j.get("msg1", "") or "") logger.error( f"[매도주문] 실패 code={code} path={path} TR_ID={tr_id} " f"rt_cd={j.get('rt_cd')} msg_cd={self._last_sell_msg_cd} msg1={self._last_sell_msg1}" ) return False except Exception as e: self._last_sell_msg_cd = None self._last_sell_msg1 = None logger.error(f"매도 주문 예외({code}): {e}") return False def sell_market_order(self, code, qty): """시장가 매도 주문 (간편 메서드)""" return self.sell_order(code, qty, price=0, order_type="01") def get_minute_chart(self, code, period="3", limit=100): """분봉 차트 조회 [v1_국내주식-017]""" path = "/uapi/domestic-stock/v1/quotations/inquire-time-itemchartprice" tr_id = "FHKST03010200" try: end_dt = dt.now() start_dt = end_dt - datetime.timedelta(days=1) start_ymd = start_dt.strftime("%Y%m%d") end_ymd = end_dt.strftime("%Y%m%d") # 분봉 코드: 1분=1, 3분=3, 5분=5, 10분=10, 30분=30, 60분=60 period_map = {"1": "1", "3": "3", "5": "5", "10": "10", "30": "30", "60": "60"} period_code = period_map.get(str(period), "3") params = { "FID_COND_MRKT_DIV_CODE": "J", "FID_INPUT_ISCD": code, "FID_INPUT_HOUR_1": start_ymd, "FID_INPUT_HOUR_2": end_ymd, "FID_PW_DATA_INCU_YN": "Y", "FID_ETC_CLS_CODE": "", # 기타분류코드 (필수 파라미터, 빈 값 가능) } logger.debug(f"[분봉차트] 호출 code={code} period={period} path={path} TR_ID={tr_id}") r = self._get(path, tr_id, params) if r.status_code != 200: logger.warning(f"[분봉차트] HTTP 실패 code={code} path={path} TR_ID={tr_id} status={r.status_code}") return pd.DataFrame() j = r.json() if j.get("rt_cd") != "0": logger.warning( f"[분봉차트] 오류 code={code} path={path} TR_ID={tr_id} " f"rt_cd={j.get('rt_cd')} msg_cd={j.get('msg_cd')} msg1={j.get('msg1')}" ) return pd.DataFrame() data = j.get("output2", []) if not data: return pd.DataFrame() rows = [] for item in data: try: # 정렬용: 영업일자+체결시각(있으면) → 마지막 봉이 실제 최신봉이 되도록 date_str = str(item.get("stck_bsop_date", "") or "") time_str = str(item.get("stck_cntg_hour", "") or item.get("cntg_hour", "") or "000000") sort_key = date_str + time_str rows.append({ "time": sort_key, "open": abs(float(item.get("stck_oprc", 0))), "high": abs(float(item.get("stck_hgpr", 0))), "low": abs(float(item.get("stck_lwpr", 0))), "close": abs(float(item.get("stck_clpr", 0))), "volume": int(item.get("acml_vol", 0)), }) except Exception: continue if not rows: return pd.DataFrame() df = pd.DataFrame(rows) df = df.sort_values("time").reset_index(drop=True) # 기술적 지표 추가 # RSI 기간: DB/env의 RSI_PERIOD 로 조절 (기본 14, 단타/스캘핑 시 3·5 권장) # RSI 수학적 안정화를 위해 호출 측에서 limit≥100 이상 요청하는 것이 전제 rsi_period = get_env_int("RSI_PERIOD", 14) if len(df) >= rsi_period: delta = df["close"].diff(1) gain = delta.where(delta > 0, 0).rolling(window=rsi_period).mean() loss = (-delta.where(delta < 0, 0)).rolling(window=rsi_period).mean() rs = gain / loss.replace(0, float("nan")) df["RSI"] = 100 - (100 / (1 + rs)) if len(df) >= 20: df["MA20"] = df["close"].rolling(window=20).mean() # MA5: check_buy_signal_tail_catch 에서 ma5_gap_pct 계산에 사용 (없으면 None으로 처리됨) if len(df) >= 5: df["MA5"] = df["close"].rolling(window=5).mean() return df.tail(limit) except Exception as e: logger.error(f"분봉 조회 실패({code}): {e}") return pd.DataFrame() def get_orderbook(self, stock_code): """호가 조회 [v1_국내주식-009]""" try: r = self._get( "/uapi/domestic-stock/v1/quotations/inquire-asking-price-exp-ccn", "FHKST01010200", { "FID_COND_MRKT_DIV_CODE": "J", "FID_INPUT_ISCD": stock_code, }, ) if r.status_code != 200: return None j = r.json() if j.get("rt_cd") != "0": return None return j.get("output") except Exception as e: logger.error(f"호가 조회 실패({stock_code}): {e}") return None # ============================================================ # 순위분석 API (키움 봇과 동일한 로직 + 레버리지/스팩/ETN 제외 옵션) # ============================================================ @staticmethod def _is_valid_stock(name: str, code: str) -> bool: """ 종목 필터링 (키움 kiwoom_trader_ver2와 동일, ETF는 포함) - 스팩, ETN, 우선주, 레버리지, 인버스 등만 제외 (ETF는 위험도 낮아 포함) """ if not code or len(code) != 6 or not code.isdigit(): return False name = (name or "").strip() exclude = [ "스팩", "SPAC", "ETN", "W", "ELW", "채권", "레버리지", "인버스", "곱버스", "선물", "콜", "풋", "2X", "3X", "합성", "H", "B", ] # ETF는 exclude 목록에 없음 → 일반 주식·ETF 모두 통과 if any(k in name for k in exclude): return False if name.endswith("우") or name.endswith("우B"): return False return True def _filter_rank_by_valid_stock(self, rank_list: list) -> list: """랭크 API 응답 리스트에서 스팩/ETN/레버리지 등 제외 (키움 옵션과 동일)""" if not rank_list: return [] filtered = [] for item in rank_list: code = (item.get("stk_cd") or item.get("mksc_shrn_iscd") or item.get("code") or "").strip() name = (item.get("stk_nm") or item.get("prst_name") or item.get("hts_kor_isnm") or "").strip() if self._is_valid_stock(name, code): filtered.append(item) return filtered def _fetch_volume_rank_paged( self, market: str, blng_cls_code: str, limit: int, exclude_spec_etn_leverage: bool, ) -> list: """ 거래량순위 API 1회 조회 (한투 volume-rank API는 다음 페이지 tr_cont 미지원 → 1회만 호출). """ path = "/uapi/domestic-stock/v1/quotations/volume-rank" tr_id = "FHPST01710000" params = { "FID_COND_MRKT_DIV_CODE": market, "FID_COND_SCR_DIV_CODE": "20171", "FID_INPUT_ISCD": "0000", "FID_DIV_CLS_CODE": "0", "FID_BLNG_CLS_CODE": blng_cls_code, "FID_TRGT_CLS_CODE": "111111111", "FID_TRGT_EXLS_CLS_CODE": "0000000000", "FID_INPUT_PRICE_1": "0", "FID_INPUT_PRICE_2": "0", "FID_VOL_CNT": "0", "FID_INPUT_DATE_1": "", } try: time.sleep(0.5) r = self._get(path, tr_id, params, tr_cont=None) if r.status_code != 200: return [] j = r.json() if j.get("rt_cd") != "0": return [] output = j.get("output", []) if exclude_spec_etn_leverage: output = self._filter_rank_by_valid_stock(output) logger.info(f" 📡 [순위API] 수신 {len(output)}건 (다음페이지 미지원 → 1회만 호출)") return output[:limit] except Exception as e: logger.debug(f"거래량순위 조회 실패: {e}") return [] def get_volume_rank( self, market: str = "J", limit: int = 50, exclude_spec_etn_leverage: bool = True, ): """ 거래량순위 조회 [v1_국내주식-047] (1회 호출, API가 반환한 건수만큼 수집) """ try: output = self._fetch_volume_rank_paged( market=market, blng_cls_code="0", limit=limit, exclude_spec_etn_leverage=exclude_spec_etn_leverage, ) return output except Exception as e: logger.debug(f"거래량순위 조회 실패: {e}") return [] def get_price_change_rank( self, market: str = "J", sort_type: str = "1", limit: int = 50, exclude_spec_etn_leverage: bool = True, ): """ 등락률순위 조회 (동일 volume-rank API, FID_BLNG_CLS_CODE로 등락 구분) sort_type: "1"=상승률 상위, "2"=하락률 상위(낙폭 큰 종목, N자 망치봉 스캔에 유리) """ # 한투 volume-rank API: 4=등락률(상승), 5=등락률(하락). 미지원 시 빈값/에러 가능. blng = "5" if sort_type == "2" else "4" try: out = self._fetch_volume_rank_paged( market=market, blng_cls_code=blng, limit=limit, exclude_spec_etn_leverage=exclude_spec_etn_leverage, ) if out: return out except Exception as e: logger.debug(f"등락률순위(blng={blng}) 조회 실패: {e}") # API가 4/5 미지원이면 기존처럼 거래량순위로 fallback (상승 위주 후보 확보) if sort_type != "2": return self.get_volume_rank(market=market, limit=limit, exclude_spec_etn_leverage=exclude_spec_etn_leverage) return [] def get_price_decline_rank( self, market: str = "J", limit: int = 100, exclude_spec_etn_leverage: bool = True, ): """하락률 순위(낙폭 큰 종목) 조회. N자 망치봉/개미털기 스캔 유니버스 확대용.""" return self.get_price_change_rank( market=market, sort_type="2", limit=limit, exclude_spec_etn_leverage=exclude_spec_etn_leverage, ) def get_trading_value_rank( self, market: str = "J", limit: int = 50, exclude_spec_etn_leverage: bool = True, ): """거래대금순위 조회 (연속 조회로 limit건까지). FID_BLNG_CLS_CODE=3""" try: return self._fetch_volume_rank_paged( market=market, blng_cls_code="3", limit=limit, exclude_spec_etn_leverage=exclude_spec_etn_leverage, ) except Exception as e: logger.debug(f"거래대금순위 조회 실패: {e}") return [] def get_turnover_rank( self, market: str = "J", limit: int = 50, exclude_spec_etn_leverage: bool = True, ): """회전율순위 조회 (연속 조회로 limit건까지). FID_BLNG_CLS_CODE=2""" try: return self._fetch_volume_rank_paged( market=market, blng_cls_code="2", limit=limit, exclude_spec_etn_leverage=exclude_spec_etn_leverage, ) except Exception as e: logger.debug(f"회전율순위 조회 실패: {e}") return [] def get_volume_growth_rank( self, market: str = "J", limit: int = 50, exclude_spec_etn_leverage: bool = True, ): """거래증가율순위 조회 (연속 조회로 limit건까지). FID_BLNG_CLS_CODE=1""" try: return self._fetch_volume_rank_paged( market=market, blng_cls_code="1", limit=limit, exclude_spec_etn_leverage=exclude_spec_etn_leverage, ) except Exception as e: logger.debug(f"거래증가율순위 조회 실패: {e}") return [] def get_execution_strength_rank( self, market: str = "J", limit: int = 200, exclude_spec_etn_leverage: bool = True, ): """체결강도 상위 순위 조회 (FHPST01710000, FID_BLNG_CLS_CODE=6). 매수세 강한 종목 필터용.""" try: return self._fetch_volume_rank_paged( market=market, blng_cls_code="6", limit=limit, exclude_spec_etn_leverage=exclude_spec_etn_leverage, ) except Exception as e: logger.debug(f"체결강도순위 조회 실패: {e}") return [] def get_execution_strength_map(self, market: str = "J", limit: int = 200): """ 체결강도 상위 API 조회 후 종목코드 -> 체결강도 값 매핑 반환. output 필드: cntr_str(체결강도) 등 한투 문서 기준으로 파싱. 미제공 시 0. """ strength_map = {} try: rows = self.get_execution_strength_rank(market=market, limit=limit, exclude_spec_etn_leverage=True) for item in (rows or []): code = (item.get("stk_cd") or item.get("mksc_shrn_iscd") or item.get("code") or "").strip() if not code or len(code) != 6: continue # 한투 volume-rank 체결강도: cntr_str 또는 유사 필드 (문서 확인 후 조정) raw = item.get("cntr_str") or item.get("exec_str") or item.get("strg_rt") or item.get("prdy_ctrt") or "" try: strength = float(str(raw).replace(",", "").strip()) if raw else 0 except (ValueError, TypeError): strength = 0 strength_map[code] = strength except Exception as e: logger.debug(f"체결강도 맵 조회 실패: {e}") return strength_map # ============================================================ # Mattermost 봇 클래스 # ============================================================ class MattermostBot: """Mattermost 알림 봇""" def __init__(self): self.api_url = f"{MM_SERVER_URL.rstrip('/')}/api/v4/posts" self.headers = { "Authorization": f"Bearer {MM_BOT_TOKEN}", "Content-Type": "application/json" } self.channels = self._load_channels() def _load_channels(self): """채널 설정 로드""" try: if MM_CONFIG_FILE.exists(): with open(MM_CONFIG_FILE, 'r', encoding='utf-8') as f: return json.load(f).get("channels", {}) return {} except Exception as e: logger.error(f"⚠️ MM 설정 로드 실패: {e}") return {} def send(self, channel_alias, message): """메시지 전송""" channel_id = self.channels.get(channel_alias) if not channel_id: logger.warning(f"❌ '{channel_alias}' 채널 ID 없음") return False payload = {"channel_id": channel_id, "message": message} try: res = requests.post(self.api_url, headers=self.headers, json=payload, timeout=3) res.raise_for_status() return True except Exception as e: logger.error(f"❌ MM 전송 에러: {e}") return False def _save_ai_recommendations_from_text(db_instance, analysis_text: str): """AI 분석문에서 'KEY=값' 추천 줄만 추출해 DB에 저장 (!적용 시 사용).""" if not analysis_text or not db_instance: return valid_keys = set(ENV_CONFIG_KEYS) lines = [] for line in analysis_text.splitlines(): line = line.strip() if not line or line.startswith("#"): continue m = re.match(r"^([A-Z][A-Z0-9_]*)=(.+)$", line) if m and m.group(1) in valid_keys: lines.append(f"{m.group(1)}={m.group(2).strip()}") if lines: db_instance.set_last_ai_recommendations("\n".join(lines)) # ============================================================ # 단타 트레이딩 봇 # ============================================================ class ShortTradingBot: """단타용 트레이딩 봇 - 개미털기(눌림목) 전략""" def __init__(self): self.db = db self.client = KISClient() # Mattermost 초기화 self.mm = MattermostBot() # 단타 봇 전용 채널(alias) 우선 사용, 없으면 기본 채널 사용 self.mm_channel = MM_CHANNEL_SHORT # ML 예측 초기화 (선택적) self.ml_predictor = None if ML_AVAILABLE: try: self.ml_predictor = MLPredictor() # MariaDB 내부 연결 if self.ml_predictor.should_retrain(): self.ml_predictor.train_model(retrain=True) except Exception as e: logger.warning(f"⚠️ ML 예측 초기화 실패: {e}") # RiskManager 초기화 (뼈대만 생성 → reload_config에서 DB 값으로 실시간 갱신) self.risk_mgr = None if RISK_MANAGER_AVAILABLE: self.risk_mgr = RiskManager( risk_pct_per_trade=get_env_float("RISK_PCT_PER_TRADE", 0.01), max_position_pct=get_env_float("MAX_POSITION_PCT", 0.15), min_position_amount=get_env_int("MIN_POSITION_AMOUNT", 50000), use_kelly=get_env_bool("USE_KELLY_FORMULA", True), kelly_multiplier=get_env_float("KELLY_MULTIPLIER", 0.25), slot_base_amount_cap=get_env_int("SLOT_BASE_AMOUNT_CAP", 0), # ── 무조건 깔고 가는 MAX_LOSS 기반 투자 상한 ───────────── # ATR 계산 결과가 아무리 커도 이 상한 초과 불가 max_loss_per_trade_krw=get_env_int("MAX_LOSS_PER_TRADE_KRW", 200000), stop_loss_pct=get_env_float("STOP_LOSS_PCT", -0.03), # ── 사이즈 클래스별 비율 (DB에서 주입) ─────────────────── size_small_ratio=get_env_float("SIZE_CLASS_SMALL_RATIO", 0.70), size_mid_ratio=get_env_float("SIZE_CLASS_MID_RATIO", 0.85), ) logger.info("✅ RiskManager 뼈대 생성 완료") else: logger.warning("⚠️ RiskManager 미사용: 고정 슬롯 금액 방식으로 폴백") # ★ [실시간 리로드] DB에서 최신 설정값을 불러와 봇·RiskManager에 반영 (재시작 없이 적용) self.reload_config() # 리포트 플래그 (reload_config 이후 유지) self.morning_report_sent = False self.closing_report_sent = False self.final_report_sent = False self.ai_report_sent = False # 자산 추적 self.today_date = dt.now().strftime("%Y-%m-%d") self.start_day_asset = 0 self.current_total_asset = 0 self.current_cash = 0 self.d2_excc_amt = 0 # D+2 예수금 (output2 prvs_rcdl_excc_amt) self.total_deposit = get_env_float("TOTAL_DEPOSIT", 0) # DB에서 활성 트레이드 로드 (단타만: SHORT_% - 늘림목과 섞이지 않도록) self.holdings = {} # 당일 매매불가로 확인된 종목 (같은 종목 반복 주문 방지 → 다음 후보로 넘어감) self.untradable_skip_set = set() # 최근 매도 종목 쿨다운 캐시 {code: 매도_timestamp} # 매도 직후 같은 종목을 즉시 재매수하는 반복매매 루프 방지. # 쿨다운 기간은 REENTRY_COOLDOWN_SEC(기본 5분)으로 조정. self.recently_sold: dict = {} # 매도 실패 백오프 캐시 {code: until_timestamp} # "영업일이 아닙니다" 등 일시적 API 거부 시 재시도 방지. # 재시도 대기 시간은 SELL_FAILURE_BACKOFF_SEC(기본 1800초=30분) 으로 조정. self._sell_backoff: dict = {} active_trades = self.db.get_active_trades(strategy_prefix="SHORT") for code, trade in active_trades.items(): self.holdings[code] = { "buy_price": trade.get("avg_buy_price", 0), "qty": trade.get("current_qty", 0), "buy_time": trade.get("buy_date", dt.now().strftime("%Y-%m-%d %H:%M:%S")), "name": trade.get("name", code), } # ★ 계좌 잔고 API와 동기화 (폰/타 앱으로 산 종목 반영) balance = self.client.get_account_balance() if balance is not None: self._sync_holdings_from_balance(balance) self._update_assets(balance=balance) # 이미 받은 잔고로 자산 갱신 (API 1회만) else: self._update_assets() # 비동기 태스크 관리 self._universe_task = None self._report_task = None self._asset_task = None self.is_first_run = True # ── WebSocket + CandleAggregator 초기화 ────────────────────────────── # 틱 수신 → 3분봉 in-memory 집계 → REST 폴링(get_minute_chart) 전면 대체 # KISWebSocketPriceCache: 실시간 체결가 수신 (check_sell_signals 현재가) # CandleAggregator : 3분봉 OHLCV·RSI 메모리 집계 (buy/ATR/RiskManager) # start() 실패 시 is_active=False → REST fallback 자동 적용 self.ws_cache: Optional["KISWebSocketPriceCache"] = None self.candle_agg: Optional["CandleAggregator"] = None self._init_websocket() # ── WebSocket + CandleAggregator 초기화 / 갭보정 / 구독 관리 ─────────── def _init_websocket(self): """WebSocket 시작 → CandleAggregator(3분봉) 연결 → 종목 구독 → 갭 보정.""" if not _KIS_WS_AVAILABLE: logger.info("ℹ️ kis_ws 모듈 없음 → REST inquire_price / get_minute_chart 폴링 유지") return try: self.ws_cache = KISWebSocketPriceCache( app_key = self.client.app_key, app_secret = self.client.app_secret, is_mock = self.client.mock, ) # CandleAggregator: 3분봉 집계 (buy 타점·ATR·RiskManager 전용) # 3분봉(주전략) + 15분봉 + 60분봉 — 추세 필터 / 다른 전략 확장용 self.candle_agg = CandleAggregator(db=self.db, timeframes=[3, 15, 60]) self.ws_cache.attach_candle_aggregator(self.candle_agg) ws_ok = self.ws_cache.start() if not ws_ok: logger.info("ℹ️ WebSocket 비활성 (모의 or 키 미설정) → REST fallback 유지") self.ws_cache = None self.candle_agg = None return # 봇 재시작 시 보유 종목 즉시 구독 for code in list(self.holdings.keys()): self.ws_cache.subscribe(code) # 유니버스 후보 종목도 미리 구독 (매수 타점 체크 전 봉 데이터 확보) candidates = self.db.get_target_candidates() for c in candidates: code = c.get("code") or c.get("stk_cd", "") if code and code not in self.holdings: self.ws_cache.subscribe(code) # ── 영구 구독 ETF: 시장 방향 필터용 (유니버스 변경과 무관하게 항상 유지) ── perm_raw = get_env_from_db("PERMANENT_WS_CODES", "069500,229200") self._permanent_ws_codes: set = { c.strip() for c in str(perm_raw).split(",") if c.strip() } for code in sorted(self._permanent_ws_codes): self.ws_cache.subscribe(code) logger.info("📡 [영구구독] %s (시장방향 ETF)", code) logger.info( "✅ WebSocket + CandleAggregator(3분봉) 활성 (구독 %d종목) " "— get_minute_chart REST 폴링 대체", len(self.ws_cache._subscribed), ) # 시작 시 REST 갭 보정 (봉 버퍼 비어있는 경우 RSI 안정화) self._fill_all_gaps() except Exception as _ws_e: logger.warning("⚠️ WebSocket 초기화 예외(무시): %s", _ws_e) self.ws_cache = None self.candle_agg = None def _fill_all_gaps(self): """ 봇 시작·재접속 후 구독 중인 모든 종목의 분봉 갭을 보정. RSI(14) 안정화를 위해 limit=120 사용. ▶ 키움 우선 전략: - 키움 ka10080 은 1회 호출에 최대 900봉(≈6개월치) 제공 → 장 초반에도 즉시 봉 확보 가능 - KIS get_minute_chart 는 당일봉만 제공 → 장 시작 직후 봉 부족 → 키움 우선 - 키움 키 없으면 KIS fallback (3분봉만, 15/60분봉은 KIS 지원 안 함) """ if not self.candle_agg or not self.ws_cache: return limit = get_env_int("SHORT_GAP_FILL_LIMIT", 120) with self.ws_cache._sub_lock: codes = set(self.ws_cache._subscribed) # ── 키움 크레덴셜 조회 ──────────────────────────────────────── kw_key, kw_secret, kw_mock = _get_kiwoom_creds(self.db) use_kiwoom = bool(kw_key and kw_secret) logger.info( "🔧 [갭보정] %d종목 분봉 로드 시작 (tfs=%s, limit=%d, kiwoom=%s)", len(codes), self.candle_agg.timeframes, limit, "✅" if use_kiwoom else "❌→KIS fallback", ) for code in sorted(codes): for tf in self.candle_agg.timeframes: df = None # 키움 우선 (토큰은 23시간 캐시 → au10001 한도 방지) if use_kiwoom: try: df = get_kiwoom_candles_df( code, tf, kw_key, kw_secret, is_mock=kw_mock, n=limit, ) except Exception as e: logger.debug("키움 갭보정 실패 (%s %dM): %s", code, tf, e) # KIS fallback: 당일봉만 → 3분봉에만 유효 if (df is None or df.empty) and tf <= 3: try: df = self.client.get_minute_chart( code, period=str(tf), limit=limit ) except Exception as e: logger.debug("KIS 갭보정 실패 (%s %dM): %s", code, tf, e) if df is not None and not df.empty: self.candle_agg.fill_gap_from_rest(code, tf, df) # 같은 종목 내 timeframe 전환: 짧은 딜레이 time.sleep(random.uniform(0.2, 0.4)) # 종목 간 딜레이 time.sleep(random.uniform(0.3, 0.6)) def _sync_subscriptions(self, candidates: list): """ target_candidates DB 목록과 WS 구독 목록 동기화. - 유니버스에서 빠진 종목(보유 중 아닌 것) → unsubscribe + RAM 정리 - 신규 종목 → subscribe + 3분봉 갭 보정 (봉 버퍼 즉시 확보). ※ 영구 구독 ETF(_permanent_ws_codes)는 절대 해제하지 않음 (시장 방향 필터용) """ if not self.ws_cache: return new_codes = {c.get("code") or c.get("stk_cd", "") for c in candidates if c} new_codes.discard("") # 현재 보유 종목은 매도 완료 전까지 반드시 유지 new_codes |= set(self.holdings.keys()) # 영구 구독 ETF는 유니버스와 무관하게 항상 유지 new_codes |= getattr(self, '_permanent_ws_codes', set()) with self.ws_cache._sub_lock: current_subs = set(self.ws_cache._subscribed) # ── 구독 해제: 유니버스에서 빠진 종목 ───────────────────────── # 보유 중 종목은 매도 감시를 위해 구독 유지 for code in sorted(current_subs - new_codes): self.ws_cache.unsubscribe(code) if self.candle_agg: self.candle_agg.remove_code(code) # ── 신규 구독: 유니버스에 새로 들어온 종목 ───────────────────── kw_key, kw_secret, kw_mock = _get_kiwoom_creds(self.db) use_kiwoom = bool(kw_key and kw_secret) for code in sorted(new_codes - current_subs): self.ws_cache.subscribe(code) # 신규 구독 즉시 갭 보정 (봉 없으면 매수 타점 체크 불가) — 키움 우선 if not self.candle_agg: continue lim = get_env_int("SHORT_GAP_FILL_LIMIT", 120) for tf in self.candle_agg.timeframes: df = None if use_kiwoom: try: df = get_kiwoom_candles_df( code, tf, kw_key, kw_secret, is_mock=kw_mock, n=lim, ) except Exception as e: logger.debug("키움 신규갭보정 실패 (%s %dM): %s", code, tf, e) if (df is None or df.empty) and tf <= 3: try: df = self.client.get_minute_chart( code, period=str(tf), limit=lim ) except Exception as e: logger.debug("KIS 신규갭보정 실패 (%s %dM): %s", code, tf, e) if df is not None and not df.empty: self.candle_agg.fill_gap_from_rest(code, tf, df) # tf 간 딜레이 (차트 API, 토큰은 캐시 재사용) time.sleep(random.uniform(0.2, 0.4)) def _get_candles_df(self, code: str, tf: int = 3, n: int = 20) -> Optional[pd.DataFrame]: """ CandleAggregator 메모리 봉 → DataFrame 변환 헬퍼. 확정봉(confirmed) + 진행봉(current, is_confirmed=0) 을 합쳐 get_minute_chart 와 동일한 컬럼(open/high/low/close/volume/RSI/MA5/MA20)을 반환. - 진행봉의 close = 현재가 (최신 틱) → 매수 타점 실시간 포착 - CandleAggregator 미사용·데이터 부족 시 None 반환 → 호출부에서 REST fallback Args: code : 종목코드 tf : 봉 주기(분), 꼬리잡기 전략은 항상 3 n : 반환할 최대 봉 수 (tail 기준) """ if not self.candle_agg: return None # 확정봉: RSI_PERIOD(14)보다 넉넉하게 가져와 RSI 안정화 rsi_period = get_env_int("RSI_PERIOD", 14) fetch_n = max(n + rsi_period + 5, n + 20) confirmed = self.candle_agg.get_candles(code, tf, fetch_n) if not confirmed: return None rows = list(confirmed) # 진행 중인 봉(최신 틱 close) 을 tail 에 추가 → 실시간 캔들 패턴 포착 current = self.candle_agg.get_current_candle(code, tf) if current and current.get("open", 0) > 0 and current.get("close", 0) > 0: rows.append(current) if len(rows) < 2: return None df = pd.DataFrame(rows)[["open", "high", "low", "close", "volume"]].copy() df = df.reset_index(drop=True) # 기술적 지표: get_minute_chart 와 동일 로직 if len(df) >= rsi_period: delta = df["close"].diff(1) gain = delta.where(delta > 0, 0).rolling(window=rsi_period).mean() loss = (-delta.where(delta < 0, 0)).rolling(window=rsi_period).mean() rs = gain / loss.replace(0, float("nan")) df["RSI"] = 100 - (100 / (1 + rs)) if len(df) >= 20: df["MA20"] = df["close"].rolling(window=20).mean() if len(df) >= 5: df["MA5"] = df["close"].rolling(window=5).mean() return df.tail(n).reset_index(drop=True) # ── 설정 리로드 ───────────────────────────────────────────────────────── def reload_config(self): """[실시간 리로드] DB(env) 설정을 봇에 반영. 메인 루프마다 호출 시 재시작 없이 적용.""" # [손절/익절 설정] self.stop_loss_pct = get_env_float("STOP_LOSS_PCT", -0.04) # ★ STOP_LOSS_PCT 부호 안전장치: 양수(0.02)로 입력 시 자동으로 음수(-0.02)로 변환. # 양수 값이 그대로 사용되면 profit_pct <= stop_loss_pct 조건이 손익분기 이상에서도 # 참이 돼 칼손절이 수익 구간에서도 발동하는 심각한 버그가 발생함. if self.stop_loss_pct > 0: logger.warning( "🚨 STOP_LOSS_PCT=%.4f 양수 감지 → 자동 부호 반전(%.4f). DB에 음수로 저장 권장 (!설정 STOP_LOSS_PCT=-%.4f)", self.stop_loss_pct, -self.stop_loss_pct, self.stop_loss_pct, ) self.stop_loss_pct = -self.stop_loss_pct self.take_profit_pct = get_env_float("TAKE_PROFIT_PCT", 0.05) self.max_stocks = get_env_int("MAX_STOCKS", 3) self.min_drop_rate = get_env_float("MIN_DROP_RATE", 0.03) self.min_recovery_ratio = get_env_float("MIN_RECOVERY_RATIO_SHORT", 0.5) self.stop_atr_multiplier = get_env_float("STOP_ATR_MULTIPLIER_TAIL", 2.5) self.target_atr_multiplier = get_env_float("TARGET_ATR_MULTIPLIER_TAIL", 8.0) self.min_hold_hours = get_env_float("MIN_HOLD_HOURS", 24.0) # [리스크 관리 설정] self.risk_pct_per_trade = get_env_float("RISK_PCT_PER_TRADE", 0.01) self.kelly_multiplier = get_env_float("KELLY_MULTIPLIER", 0.25) self.max_position_pct = get_env_float("MAX_POSITION_PCT", 0.15) self.min_position_amount = get_env_int("MIN_POSITION_AMOUNT", 50000) use_kelly = get_env_bool("USE_KELLY_FORMULA", True) if self.risk_mgr is not None: self.risk_mgr.risk_pct = self.risk_pct_per_trade self.risk_mgr.max_pos_pct = self.max_position_pct self.risk_mgr.min_amount = self.min_position_amount self.risk_mgr.use_kelly = use_kelly self.risk_mgr.kelly_mult = self.kelly_multiplier # ML 신호 필터링 self.use_ml_signal = get_env_bool("USE_ML_SIGNAL", False) self.ml_min_probability = get_env_float("ML_MIN_PROBABILITY", 0.57) # 자산 기준 (리포트용) self.total_deposit = get_env_float("TOTAL_DEPOSIT", 0) def _get_effective_slot_cap(self) -> float: """ 슬롯당 실투입 상한 금액 계산. - env: SLOT_BASE_AMOUNT_CAP (직접 지정) - env: MAX_LOSS_PER_TRADE_KRW + STOP_LOSS_PCT 로부터 역산 (MAX_LOSS_PER_TRADE_KRW / |STOP_LOSS_PCT|) 둘 다 설정되어 있으면 더 작은 값 사용. 설정이 없으면 0 리턴(제한 없음). """ slot_cap_env = get_env_float("SLOT_BASE_AMOUNT_CAP", 0.0) max_loss_krw = get_env_float("MAX_LOSS_PER_TRADE_KRW", 0.0) stop_abs = abs(self.stop_loss_pct) if self.stop_loss_pct != 0 else 0.0 derived_cap = 0.0 if max_loss_krw > 0 and stop_abs > 0: try: derived_cap = max_loss_krw / stop_abs except Exception: derived_cap = 0.0 candidates = [c for c in (slot_cap_env, derived_cap) if c and c > 0] if not candidates: return 0.0 return float(min(candidates)) def _seconds_until_next_5min(self): """다음 5분 정각까지 남은 초 계산""" now = dt.now() next_min = ((now.minute // 5) + 1) * 5 if next_min >= 60: next_time = now.replace(hour=now.hour + 1, minute=0, second=0, microsecond=0) else: next_time = now.replace(minute=next_min, second=0, microsecond=0) return (next_time - now).total_seconds() def update_universe(self): """ 유니버스 업데이트 (5분마다 호출) - Ver2: 스캔/API 미사용. 후보는 target_candidates 테이블에서만 읽음 (get_target_candidates). DB는 키움 등 외부에서 채우고, 여기서는 API 호출·DB 쓰기 없음. """ logger.info(f"🔄 [유니버스] DB 전용 모드 | 후보는 target_candidates 테이블에서만 조회 (API 미호출)") # 가져오기는 매매 루프에서 self.db.get_target_candidates() 로 유지 async def _universe_scan_scheduler(self): """5분마다 정각에 유니버스 스캔 실행 (비동기 백그라운드)""" loop = asyncio.get_event_loop() while True: try: if self.is_first_run: wait_sec = 0 # 첫 실행은 즉시 else: wait_sec = max(0, self._seconds_until_next_5min()) if wait_sec > 0: await asyncio.sleep(wait_sec) now = dt.now() logger.info(f"🔄 [스캔 주기] 정각 스캔 시작 | 시각:{now.hour:02d}:{now.minute:02d}:{now.second:02d}") # 동기 함수를 executor에서 실행 (메인 루프 블로킹 방지) await loop.run_in_executor(None, self.update_universe) self.is_first_run = False await asyncio.sleep(5) # 스캔 직후 5초 대기 (과부하 방지) except asyncio.CancelledError: break except Exception as e: logger.error(f"❌ [스캔 스케줄러] 에러: {e}") await asyncio.sleep(60) async def _report_scheduler(self): """리포트 전송 스케줄러 (비동기 백그라운드)""" while True: try: await asyncio.sleep(60) # 1분마다 체크 now = dt.now() # 13:00 - 오전 리포트 + AI 리포트 if now.hour == 13 and now.minute == 0 and not self.morning_report_sent: loop = asyncio.get_event_loop() await loop.run_in_executor(None, self.send_morning_report) await loop.run_in_executor(None, self.send_ai_report) # 15:15 - 장마감 전 리포트 elif now.hour == 15 and now.minute == 15 and not self.closing_report_sent: loop = asyncio.get_event_loop() await loop.run_in_executor(None, self.send_closing_report) # 15:35 - 최종 리포트 elif now.hour == 15 and now.minute == 35 and not self.final_report_sent: loop = asyncio.get_event_loop() await loop.run_in_executor(None, self.send_final_report) except asyncio.CancelledError: break except Exception as e: logger.error(f"❌ [리포트 스케줄러] 에러: {e}") await asyncio.sleep(60) async def _asset_update_scheduler(self): """자산 정보 업데이트 스케줄러 (30분마다, 비동기 백그라운드)""" while True: try: await asyncio.sleep(60) # 1분마다 체크 now = dt.now() # 30분마다 자산 업데이트 if now.minute % 30 == 0: loop = asyncio.get_event_loop() await loop.run_in_executor(None, self._update_assets) await asyncio.sleep(60) # 업데이트 후 1분 대기 (중복 방지) except asyncio.CancelledError: break except Exception as e: logger.error(f"❌ [자산 업데이트 스케줄러] 에러: {e}") await asyncio.sleep(60) def _sync_holdings_from_balance(self, balance): """ 계좌 잔고 API 응답(output1)으로 self.holdings 동기화. 폰·타 앱으로 매수한 종목이 계좌에 있으면 holdings에 반영하고, 매도한 종목은 제거. """ try: output1 = balance.get("output1") or [] if isinstance(output1, dict): output1 = [output1] if not isinstance(output1, list): return api_codes = set() for item in output1: code = (item.get("pdno") or item.get("PDNO") or "").strip() if not code: continue # 보유수량 (한투: hldg_qty 등) qty_val = item.get("hldg_qty") or item.get("HLDG_QTY") or item.get("ord_qty") or item.get("ORD_QTY") or 0 qty = int(float(str(qty_val).replace(",", ""))) if qty_val is not None else 0 if qty <= 0: continue api_codes.add(code) # 매입평균가 (한투 pchs_avg_pric). 없거나 0이면 평가금액-평가손익으로 역산 avg_pr_raw = item.get("pchs_avg_pric") or item.get("PCHS_AVG_PRIC") api_buy_price = 0.0 if avg_pr_raw is not None: avg_pr_str = str(avg_pr_raw).replace(",", "").strip() if avg_pr_str not in ("", "0", "0.0"): try: api_buy_price = abs(float(avg_pr_str)) except Exception: api_buy_price = 0.0 if api_buy_price <= 0: try: evlu_amt = float(item.get("evlu_amt") or item.get("EVLU_AMT") or 0) evlu_pfls = float(item.get("evlu_pfls_amt") or item.get("EVLU_PFLS_AMT") or 0) cost = evlu_amt - evlu_pfls api_buy_price = cost / qty if qty and cost > 0 else 0.0 except Exception: api_buy_price = 0.0 if api_buy_price <= 0: api_buy_price = 0.0 # 매수일시 (API에 있으면 그때로, 없으면 현재) buy_time_str = item.get("fstc_pchs_dt") or item.get("FSTC_PCHS_DT") or item.get("pchs_dt") or item.get("PCHS_DT") or item.get("ord_dt") or "" buy_time_str = (buy_time_str or "").strip().replace("-", "").replace(" ", "").replace(":", "") if len(buy_time_str) >= 14: try: buy_time = dt.strptime(buy_time_str[:14], "%Y%m%d%H%M%S").strftime("%Y-%m-%d %H:%M:%S") except Exception: buy_time = dt.now().strftime("%Y-%m-%d %H:%M:%S") elif len(buy_time_str) >= 8: try: buy_time = dt.strptime(buy_time_str[:8], "%Y%m%d").strftime("%Y-%m-%d %H:%M:%S") except Exception: buy_time = dt.now().strftime("%Y-%m-%d %H:%M:%S") else: buy_time = dt.now().strftime("%Y-%m-%d %H:%M:%S") name = (item.get("prdt_name") or item.get("PRDT_NAME") or item.get("prdt_name_eng") or code or "").strip() if not name: name = code existing = self.holdings.get(code) # ⇒ buy_price 결정: API > 기존 holdings > DB 순서로 폴백 buy_price = api_buy_price existing_buy_price = 0.0 if existing: try: existing_buy_price = float(existing.get("buy_price", 0) or 0) except Exception: existing_buy_price = 0.0 if buy_price <= 0 and existing_buy_price > 0: buy_price = existing_buy_price if buy_price <= 0: db_trade = None try: if hasattr(self.db, "get_active_trade"): db_trade = self.db.get_active_trade(code) except Exception as e: logger.debug(f"잔고동기화 DB 평단 조회 실패({code}): {e}") if db_trade: try: db_buy_price = float( db_trade.get("avg_buy_price") or db_trade.get("buy_price") or 0 ) except Exception: db_buy_price = 0.0 if db_buy_price > 0: buy_price = db_buy_price if existing: if buy_price <= 0: # 매입가를 신뢰할 수 없으면 기존 매입가를 보존하고 수량/이름만 갱신 logger.warning( f"⚠️ [잔고동기화] {name} ({code}) 매입가 복원 실패 → 기존 매입가 유지" ) existing["qty"] = qty if name: existing["name"] = name continue # 수량/매입가 API·DB 기준으로 갱신 (폰에서 추가 매수/일부 매도 반영) existing["qty"] = qty existing["buy_price"] = buy_price if name: existing["name"] = name continue # API에만 있는 종목(폰 등 외부 매수) → holdings에 추가, 기본 손절/목표가 적용 if buy_price <= 0: logger.warning( f"⚠️ [잔고동기화] {name} ({code}) 매입가 0/없음 → holdings 추가 스킵" ) continue stop_price = buy_price * (1 + self.stop_loss_pct) if buy_price > 0 else 0 target_price = buy_price * (1 + self.take_profit_pct) if buy_price > 0 else 0 self.holdings[code] = { "buy_price": buy_price, "qty": qty, "buy_time": buy_time, "name": name, "max_price": buy_price, "atr_entry": buy_price * 0.01 if buy_price > 0 else 0, "stop_price": stop_price, "target_price": target_price, } self.db.upsert_trade({ "code": code, "name": name, "strategy": "SHORT_ANT_SHAKING", "avg_buy_price": buy_price, "current_price": buy_price, "target_qty": qty, "current_qty": qty, "status": "HOLDING", "buy_date": buy_time, "stop_price": stop_price, "target_price": target_price, "atr_entry": self.holdings[code]["atr_entry"], }) logger.info(f"📲 [잔고동기화] 외부 매수 반영: {name} ({code}) {qty}주 @ {buy_price:,.0f}원") # API에 없는 종목 제거 (다른 경로에서 매도된 경우). output1이 리스트일 때만 적용 if isinstance(output1, list): for code in list(self.holdings.keys()): if code not in api_codes: name = self.holdings[code].get("name", code) del self.holdings[code] try: self.db.close_trade(code=code, sell_price=0, sell_reason="잔고동기화(외부매도)", strategy="SHORT_ANT_SHAKING") except Exception as e: logger.debug(f"잔고동기화 close_trade 스킵 {code}: {e}") logger.info(f"📲 [잔고동기화] 보유 제거: {name} ({code}) - 계좌에 없음") except Exception as e: logger.warning(f"잔고 동기화 예외: {e}") def _update_assets(self, balance=None): """자산 정보 업데이트 (잔고 동기화 포함). balance 생략 시 API 호출.""" try: if balance is None: balance = self.client.get_account_balance() if balance is None: logger.warning( "💵 [예수금] get_account_balance가 None 반환 → 예수금 갱신 스킵 " "(토큰·계좌·TR ID 확인. 모의=VTTC8434R, 실전=TTTC8434R)" ) return # 폰/타 앱 매매 반영: 잔고 API 기준으로 holdings 동기화 self._sync_holdings_from_balance(balance) # 한투 API: output1=주식 잔고(종목별), output2=예수금 관련(dnca_tot_amt 등) - 블로그·문서 기준 def _parse_amt(v): if v is None or str(v).strip() == "": return None return float(str(v).replace(",", "").strip()) def _cash_block(obj): if not obj: return {} if isinstance(obj, list) and obj: return obj[0] if isinstance(obj, dict): return obj return {} out2 = _cash_block(balance.get("output2")) if isinstance(balance.get("output1"), dict): out1 = balance.get("output1", {}) else: out1 = {} ord_psbl_val = _parse_amt(out2.get("ord_psbl_cash") or out1.get("ord_psbl_cash")) dnca_tot_val = _parse_amt(out2.get("dnca_tot_amt") or out1.get("dnca_tot_amt")) or 0 # 거래가능 = D+2 예수금 (prvs_rcdl_excc_amt). 없으면 예수금총액 fallback prvs_rcdl = _parse_amt(out2.get("prvs_rcdl_excc_amt")) if prvs_rcdl is not None: self.d2_excc_amt = prvs_rcdl else: self.d2_excc_amt = 0 if prvs_rcdl is not None and prvs_rcdl > 0: self.current_cash = prvs_rcdl logger.info(f"💵 [예수금] 거래가능=D+2예수금={self.current_cash:,.0f}원") else: self.current_cash = dnca_tot_val logger.info(f"💵 [예수금] 거래가능=예수금총액={self.current_cash:,.0f}원 (D+2 없음)") # 보유 종목 평가액 계산 holdings_value = 0 for code, holding in self.holdings.items(): price_data = self.client.inquire_price(code) if price_data: current_price = abs(float(price_data.get("stck_prpr", 0))) holdings_value += current_price * holding["qty"] self.current_total_asset = self.current_cash + holdings_value if self.start_day_asset == 0: self.start_day_asset = self.current_total_asset except Exception as e: logger.error(f"자산 정보 업데이트 실패: {e}") def _update_account_light(self, profit_val=0): """ 경량 계좌 갱신 (매수/매도 직후 즉시 호출!) - API 부하를 줄이기 위해 예수금 + 보유 종목 평가액만 빠르게 계산 - 총자산 = 예수금 + 보유 종목 평가액 (+ 손익 반영) """ try: balance = self.client.get_account_balance() if balance is None: logger.warning("💵 [예수금-경량] get_account_balance None → 예수금 갱신 스킵") return False def _parse_amt_light(v): if v is None or str(v).strip() == "": return None return float(str(v).replace(",", "").strip()) def _cash_block_light(obj): if not obj: return {} if isinstance(obj, list) and obj: return obj[0] if isinstance(obj, dict): return obj return {} out2 = _cash_block_light(balance.get("output2")) if isinstance(balance.get("output1"), dict): out1 = balance.get("output1", {}) else: out1 = {} ord_psbl_val = _parse_amt_light(out2.get("ord_psbl_cash") or out1.get("ord_psbl_cash")) dnca_tot_val = _parse_amt_light(out2.get("dnca_tot_amt") or out1.get("dnca_tot_amt")) or 0 prvs_rcdl = _parse_amt_light(out2.get("prvs_rcdl_excc_amt")) if prvs_rcdl is not None: self.d2_excc_amt = prvs_rcdl if prvs_rcdl is not None and prvs_rcdl > 0: new_cash = prvs_rcdl else: new_cash = dnca_tot_val logger.info( f"💵 [예수금-경량] 거래가능(D+2)={new_cash:,.0f}원 (이전={self.current_cash:,.0f})" ) if new_cash > 0 or self.current_cash == 0: self.current_cash = new_cash # 보유 종목 평가액: output1=주식 잔고(종목별), output2=예수금 요약 (블로그 기준) output1_list = balance.get("output1", []) if isinstance(output1_list, dict): output1_list = [output1_list] holdings_value = 0 for code, holding in self.holdings.items(): for item in output1_list: if (item.get("pdno") or "").strip() == code: evlu_amt = float(item.get("evlu_amt", 0)) holdings_value += evlu_amt break else: price_data = self.client.inquire_price(code) if price_data: current_price = abs(float(price_data.get("stck_prpr", 0))) holdings_value += current_price * holding["qty"] self.current_total_asset = self.current_cash + holdings_value if profit_val != 0: self.current_total_asset += profit_val logger.debug(f"💵 [경량갱신] 예수금: {self.current_cash:,.0f}원 | 총자산: {self.current_total_asset:,.0f}원") return True except Exception as e: logger.error(f"❌ 경량 갱신 실패: {e}") return False def _update_cash_only(self): """예수금만 빠르게 업데이트 (하위 호환성용, _update_account_light 사용 권장)""" return self._update_account_light(profit_val=0) def send_mm(self, msg): """Mattermost 알림 전송. 성공 시 True, 실패 시 False.""" try: return self.mm.send(self.mm_channel, msg) except Exception as e: logger.error(f"❌ MM 전송 에러: {e}") return False def check_market_status(self): """장 운영 시간 체크""" # FORCE_MARKET_OPEN 플래그 확인 (테스트용) force_open = get_env_bool("FORCE_MARKET_OPEN", False) if force_open: logger.debug("🔓 FORCE_MARKET_OPEN=true - 장 상태 무시하고 계속 진행") return True # 정상 장 운영 시간 체크 now = dt.now() if not (datetime.time(8, 30) <= now.time() <= datetime.time(16, 0)): return False if now.weekday() >= 5: # 주말 return False return True def send_morning_report(self): """오전 장 뜸할 때 리포트 (13:00)""" if self.morning_report_sent: return self._update_assets() day_pnl = self.current_total_asset - self.start_day_asset if self.start_day_asset > 0: day_pnl_pct = day_pnl / self.start_day_asset * 100 else: day_pnl_pct = 0 msg = f"""📊 **[오전 장 현황 - 13:00]** - 당일 시작: {self.start_day_asset:,.0f}원 - 현재 자산: {self.current_total_asset:,.0f}원 - 당일 손익: {day_pnl:+,.0f}원 ({day_pnl_pct:+.2f}%) - 보유 종목: {len(self.holdings)}개""" self.send_mm(msg) self.morning_report_sent = True logger.info("📊 오전 리포트 전송 완료") def send_closing_report(self): """장마감 전 리포트 (15:15)""" if self.closing_report_sent: return self._update_assets() day_pnl = self.current_total_asset - self.start_day_asset if self.start_day_asset > 0: day_pnl_pct = day_pnl / self.start_day_asset * 100 else: day_pnl_pct = 0 msg = f"""📈 **[장마감 전 현황 - 15:15]** - 당일 손익: {day_pnl:+,.0f}원 ({day_pnl_pct:+.2f}%) - 현재 자산: {self.current_total_asset:,.0f}원 - 보유 종목: {len(self.holdings)}개 - 예수금(주문가능): {self.current_cash:,.0f}원 | D+2예수금: {self.d2_excc_amt:,.0f}원""" self.send_mm(msg) self.closing_report_sent = True logger.info("📈 장마감 전 리포트 전송 완료") def send_final_report(self): """장마감 후 최종 리포트 (15:35)""" if self.final_report_sent: return self._update_assets() # 당일 손익 day_pnl = self.current_total_asset - self.start_day_asset if self.start_day_asset > 0: day_pnl_pct = day_pnl / self.start_day_asset * 100 else: day_pnl_pct = 0 # 누적 손익 cumulative_pnl = self.current_total_asset - self.total_deposit if self.total_deposit > 0: cumulative_pnl_pct = cumulative_pnl / self.total_deposit * 100 else: cumulative_pnl_pct = 0 # 오늘 거래 내역 today_trades = self.db.get_trades_by_date(self.today_date) msg = f"""🏁 **[장마감 최종 보고 - 15:35]** ━━━━━━━━━━━━━━━━━━━━ 📅 **당일 손익** - 시작: {self.start_day_asset:,.0f}원 - 종료: {self.current_total_asset:,.0f}원 - 손익: {day_pnl:+,.0f}원 ({day_pnl_pct:+.2f}%) 💰 **누적 손익 (총 입금액 대비)** - 총 입금: {self.total_deposit:,.0f}원 - 현재 자산: {self.current_total_asset:,.0f}원 - 누적 손익: {cumulative_pnl:+,.0f}원 ({cumulative_pnl_pct:+.2f}%) 📊 **거래 현황** - 오늘 매매: {len(today_trades)}건 - 보유 종목: {len(self.holdings)}개 - 예수금(주문가능): {self.current_cash:,.0f}원 | D+2예수금: {self.d2_excc_amt:,.0f}원 ━━━━━━━━━━━━━━━━━━━━""" self.send_mm(msg) self.final_report_sent = True logger.info("🏁 장마감 최종 리포트 전송 완료") def _get_env_numeric_snapshot(self): """DB 최신 env에서 계좌/키/토큰/URL 제외한 수치·설정만 반환 (키=값 줄 단위).""" EXCLUDE = { "MM_SERVER_URL", "MM_BOT_TOKEN_", "MATTERMOST_CHANNEL", "GEMINI_API_KEY", "KIS_APP_KEY_REAL", "KIS_APP_SECRET_REAL", "KIS_APP_KEY_MOCK", "KIS_APP_SECRET_MOCK", "KIS_ACCOUNT_NO_REAL", "KIS_ACCOUNT_CODE_REAL", "KIS_ACCOUNT_NO_MOCK", "KIS_ACCOUNT_CODE_MOCK", "KIS_SHORT_MM_CHANNEL", "KIS_LONG_MM_CHANNEL", } latest = self.db.get_latest_env() if not latest or not latest.get("snapshot"): return "" snap = latest["snapshot"] lines = [] for k, v in sorted(snap.items()): if k in EXCLUDE or v is None: continue v = (v or "").strip() if "#" in v: v = v.split("#")[0].strip() if not v: continue lines.append(f"{k}={v}") return "\n".join(lines) def _get_journalctl_recent(self, lines=200, unit=None): """journalctl 최근 N줄. unit 있으면 -u unit 적용 (예: kis_short_ver2.service).""" cmd = ["journalctl", "-n", str(lines), "-o", "short-iso"] if unit: cmd = ["journalctl", "-u", unit, "-n", str(lines), "-o", "short-iso"] try: r = subprocess.run(cmd, capture_output=True, text=True, timeout=10) if r.returncode == 0 and r.stdout: return r.stdout.strip() except Exception as e: logger.warning(f"⚠️ journalctl 조회 실패: {e}") return "" def send_ai_report(self): """AI 분석 리포트 (13:00) - DB 수치만 보여주고, 거래 내역으로 승률 분석·추천(설정수치=값 형식).""" if self.ai_report_sent or not gemini_client: return try: # DB에서 수치 설정만 (계좌/키/토큰 제외) env_lines = self._get_env_numeric_snapshot() # journalctl 최근 로그 (라인 수는 env/DB에서 로드) log_lines = get_env_int("AI_JOURNAL_LINES", 500) try: log_lines_int = int(log_lines) except Exception: log_lines_int = 500 if log_lines_int <= 0: log_lines_int = 500 journal_unit = os.environ.get("JOURNALCTL_UNIT", "kis_short_ver2.service").strip() or None journal_log = self._get_journalctl_recent(lines=log_lines_int, unit=journal_unit) if not journal_log: journal_log = "(journalctl 로그 없음)" # 최근 거래 내역 조회 recent_trades = [] try: conn = self.db.conn cursor = conn.execute(""" SELECT code, name, buy_price, sell_price, qty, profit_rate, realized_pnl, strategy, sell_reason, buy_date, sell_date, hold_minutes FROM trade_history ORDER BY id DESC LIMIT 10 """) for row in cursor.fetchall(): recent_trades.append({ 'code': row[0], 'name': row[1], 'buy_price': row[2], 'sell_price': row[3], 'qty': row[4], 'profit_rate': row[5], 'realized_pnl': row[6], 'strategy': row[7], 'sell_reason': row[8], 'buy_date': row[9], 'sell_date': row[10], 'hold_minutes': row[11] or 0 }) except Exception as e: logger.error(f"거래 내역 조회 실패: {e}") return db_candidates = self.db.get_target_candidates() candidate_count = len(db_candidates) # 거래 내역 없을 때: 수치만 보여주고 유니버스·추천 if not recent_trades: summary = f"""📊 **현재 상태** - 유니버스 후보: {candidate_count}개 - 최근 거래: 없음""" prompt = f"""당신은 퀀트 트레이딩 전문가입니다. **현재 상태** - 유니버스 후보: {candidate_count}개 - 최근 거래: 없음 **현재 DB 설정 수치 (일부만 표시됨, 계좌/키 등 제외)** ``` {env_lines} ``` **봇 최근 로그 (journalctl 최근 {log_lines_int}줄) – 탈락 사유·API 과부하·매수 시도 등 참고** ``` {journal_log[:15000]} ``` **당신의 임무** 1. 위 설정과 후보 수({candidate_count}개)를 보고 문제점 분석 (필터가 너무 까다로운지 등). 2. **추천**: 반드시 아래 형식으로만 한 줄에 하나씩 작성. 이유·주석 붙이지 말 것. 그대로 DB에 복붙해 적용할 수 있어야 함. - 예: MAX_STOCKS=4 - 예: MIN_DROP_RATE=0.025 - 예: MIN_RECOVERY_RATIO_SHORT=0.4 3. 예상 효과 한두 줄. **출력 형식 (반드시 준수)** ## 🔍 문제점 1. [구체적 문제 1] 2. [구체적 문제 2] ## 💡 수치 추천 (한 줄에 하나, 그대로 DB 적용 가능하게) MAX_STOCKS=4 MIN_DROP_RATE=0.025 (필요한 것만, 변수명은 위 설정 목록에 있는 것만 사용) ## 📈 예상 효과 - [효과 1] """ response = gemini_client.models.generate_content(model=GEMINI_MODEL_ID, contents=prompt) analysis = getattr(response, "text", None) or (response.candidates[0].content.parts[0].text if response.candidates else "") # 마지막 AI 추천문 저장 (!적용 시 매터모스트 원격 조종용) _save_ai_recommendations_from_text(self.db, analysis) message = f"""🤖 **[13시 AI 자동 분석 + 수치 추천]** {summary} {analysis} --- 💬 단타 전략 최적화. 추천 줄은 그대로 DB에 적용 가능합니다. """ self.send_mm(message) self.ai_report_sent = True logger.info("🤖 AI 리포트 전송 완료 (거래 내역 없음)") return # 통계 계산 total = len(recent_trades) wins = sum(1 for t in recent_trades if t['profit_rate'] > 0) losses = total - wins win_rate = (wins / total * 100) if total > 0 else 0 avg_profit = sum(t['profit_rate'] for t in recent_trades) / total total_pnl = sum(t['realized_pnl'] for t in recent_trades) avg_hold = sum(t['hold_minutes'] for t in recent_trades) / total trades_text = "" for i, t in enumerate(recent_trades, 1): trades_text += f""" [거래 {i}] {t['name']} ({t['strategy']}) - 매수: {t['buy_price']:,.0f}원 × {t['qty']}주 | 매도: {t['sell_price']:,.0f}원 - 손익: {t['profit_rate']:+.2f}% ({t['realized_pnl']:,.0f}원) | 보유: {t['hold_minutes']}분 - 사유: {t['sell_reason']} """ prompt = f"""당신은 퀀트 트레이딩 전문가입니다. **현재 상태** - 유니버스 후보: {candidate_count}개 - 최근 거래: {total}건 | 승률: {win_rate:.1f}% ({wins}승 {losses}패) - 평균 수익률: {avg_profit:.2f}% | 총 손익: {total_pnl:,.0f}원 | 평균 보유: {avg_hold:.0f}분 **최근 거래 내역** {trades_text} **현재 DB 설정 수치 (계좌/키 등 제외, 수치만)** ``` {env_lines} ``` **봇 최근 로그 (journalctl 최근 {log_lines_int}줄) – 탈락 사유·API 과부하·매수 시도 등 참고** ``` {journal_log[:15000]} ``` **당신의 임무** 1. **승률이 왜 떨어졌는지** 거래 내역과 설정 수치를 보고 구체적으로 3가지 진단 (손절/진입/보유 등). 2. **추천**: 반드시 "설정수치=값" 한 줄에 하나만. 이유·주석 붙이지 말 것. 그대로 DB에 복붙해 적용할 수 있어야 함. - 예: MAX_STOCKS=4 - 예: MIN_DROP_RATE=0.025 - 예: MIN_RECOVERY_RATIO_SHORT=0.4 변수명은 위 설정 목록에 있는 것만 사용. 3. 예상 효과 한두 줄. **출력 형식 (반드시 준수)** ## 🔍 문제점 (승률 하락 원인) 1. [구체적 문제 1] 2. [구체적 문제 2] 3. [구체적 문제 3] ## 💡 수치 추천 (한 줄에 하나, 그대로 DB 적용) MAX_STOCKS=4 MIN_DROP_RATE=0.025 (필요한 것만) ## 📈 예상 효과 - [효과 1] """ response = gemini_client.models.generate_content(model=GEMINI_MODEL_ID, contents=prompt) analysis = getattr(response, "text", None) or (response.candidates[0].content.parts[0].text if response.candidates else "") # 마지막 AI 추천문 저장 (!적용 시 매터모스트 원격 조종용) _save_ai_recommendations_from_text(self.db, analysis) summary = f"""📊 **현재 상태** - 유니버스 후보: {candidate_count}개 - 최근 거래: {total}건 | 승률: {win_rate:.1f}% ({wins}승 {losses}패) - 평균 수익률: {avg_profit:.2f}% | 총 손익: {total_pnl:+,.0f}원 | 평균 보유: {avg_hold:.0f}분""" message = f"""🤖 **[13시 AI 자동 분석 + 수치 추천]** {summary} {analysis} --- 💬 단타 전략 최적화. 추천 줄은 그대로 DB에 적용 가능합니다. """ self.send_mm(message) self.ai_report_sent = True logger.info("🤖 AI 리포트 전송 완료") except Exception as e: logger.error(f"AI 리포트 생성 실패: {e}") def _fetch_scan_universe_from_api(self, max_codes=500): """ KIS API로 스캔 대상 종목 리스트 조회 (6소스 각 100건 → 최대 500개). - 거래량·거래대금·회전율·등락률(상승)·등락률(하락)·거래증가율 각 100건 합산 후 중복 제거. - 리스트는 DB에 저장하지 않음. 스캔 끝난 뒤 후보만 DB에 한 번에 인서트. Returns: list[dict]: [{"code": "006자리", "name": "종목명"}, ...] (중복 제거, 최대 max_codes개) """ def _code_from_item(item): code = (item.get("stk_cd") or item.get("mksc_shrn_iscd") or item.get("code") or "").strip() return code if code and len(code) == 6 else None def _name_from_item(item): return ( (item.get("stk_nm") or item.get("prst_name") or item.get("hts_kor_isnm") or "").strip() or "" ) scan_list = [] seen = set() # 1) 거래량순위 100개 (키움은 거래대금+회전율만 사용, KIS는 거래량+거래대금+회전율 3가지로 풀 확대) try: time.sleep(random.uniform(0.5, 1.0)) vol_list = self.client.get_volume_rank(market="J", limit=100) for item in (vol_list or []): c = _code_from_item(item) if c and c not in seen: seen.add(c) scan_list.append({"code": c, "name": _name_from_item(item) or ""}) logger.info(f" 📡 [스캔유니버스] 거래량순위 API → {len(vol_list)}건 수신, 누적 {len(scan_list)}종목") except Exception as e: logger.warning(f" ⚠️ [스캔유니버스] 거래량순위 조회 실패: {e}") # 2) 거래대금순위 100개 (키움과 동일 소스) try: time.sleep(random.uniform(0.5, 1.0)) val_list = self.client.get_trading_value_rank(market="J", limit=100) for item in (val_list or []): c = _code_from_item(item) if c and c not in seen: seen.add(c) scan_list.append({"code": c, "name": _name_from_item(item) or ""}) logger.info(f" 📡 [스캔유니버스] 거래대금순위 API → {len(val_list)}건 수신, 누적 {len(scan_list)}종목") except Exception as e: logger.warning(f" ⚠️ [스캔유니버스] 거래대금순위 조회 실패: {e}") # 3) 회전율순위 100개 (키움 개미털기 2번째 소스와 동일) try: time.sleep(random.uniform(0.5, 1.0)) turn_list = self.client.get_turnover_rank(market="J", limit=100) for item in (turn_list or []): c = _code_from_item(item) if c and c not in seen: seen.add(c) scan_list.append({"code": c, "name": _name_from_item(item) or ""}) logger.info(f" 📡 [스캔유니버스] 회전율순위 API → {len(turn_list)}건 수신, 누적 {len(scan_list)}종목") except Exception as e: logger.warning(f" ⚠️ [스캔유니버스] 회전율순위 조회 실패: {e}") # 4) 등락률순위(상승) 100개 (기존 후보군 풀 확대) try: time.sleep(random.uniform(0.5, 1.0)) chg_list = self.client.get_price_change_rank(market="J", sort_type="1", limit=100) for item in (chg_list or []): c = _code_from_item(item) if c and c not in seen: seen.add(c) scan_list.append({"code": c, "name": _name_from_item(item) or ""}) logger.info(f" 📡 [스캔유니버스] 등락률순위(상승) API → {len(chg_list)}건 수신, 누적 {len(scan_list)}종목") except Exception as e: logger.warning(f" ⚠️ [스캔유니버스] 등락률순위(상승) 조회 실패: {e}") # 4-2) 등락률순위(하락) 100개 — 낙폭 큰 종목 직접 조회 → N자 망치봉 스캔 효율·Pass-낙폭 포착 try: time.sleep(random.uniform(0.5, 1.0)) decline_list = self.client.get_price_decline_rank(market="J", limit=100) for item in (decline_list or []): c = _code_from_item(item) if c and c not in seen: seen.add(c) scan_list.append({"code": c, "name": _name_from_item(item) or ""}) logger.info(f" 📡 [스캔유니버스] 등락률순위(하락) API → {len(decline_list)}건 수신, 누적 {len(scan_list)}종목") except Exception as e: logger.warning(f" ⚠️ [스캔유니버스] 등락률순위(하락) 조회 실패: {e}") # 5) 거래증가율순위 100개 (거래량/대금/회전율과 다른 풀 → 후보 다양화) try: time.sleep(random.uniform(0.5, 1.0)) growth_list = self.client.get_volume_growth_rank(market="J", limit=100) for item in (growth_list or []): c = _code_from_item(item) if c and c not in seen: seen.add(c) scan_list.append({"code": c, "name": _name_from_item(item) or ""}) logger.info(f" 📡 [스캔유니버스] 거래증가율순위 API → {len(growth_list)}건 수신, 누적 {len(scan_list)}종목 (6소스 합산 → 개미털기 필터)") except Exception as e: logger.warning(f" ⚠️ [스캔유니버스] 거래증가율순위 조회 실패: {e}") if not scan_list: logger.warning(" ⚠️ [스캔유니버스] API에서 0건 수신 → 스캔 불가 (권한/계정/시간 확인)") return [] scan_list = scan_list[:max_codes] # 종목명 비어 있으면 시세 배치로 채우기 (KIS volume-rank는 종목명 미제공 시 많음) need_name_codes = [x["code"] for x in scan_list[:20] if not (x.get("name") or "").strip()] if need_name_codes: try: time.sleep(random.uniform(0.2, 0.4)) batch = self.client.inquire_prices_batch(need_name_codes[:20]) name_map = {} _market_names = {"KOSPI", "KOSDAQ", "ETF", "KOSPI200", "KSQ150"} for code, out in (batch or {}).items(): n = (out.get("stck_kor_isnm") or out.get("rprs_mrkt_kor_name") or "").strip() if n and n not in _market_names: name_map[code] = n for x in scan_list: if not (x.get("name") or "").strip() and x["code"] in name_map: x["name"] = name_map[x["code"]] except Exception as e: logger.debug(f" 스캔 종목명 배치 조회 스킵: {e}") for x in scan_list: if not (x.get("name") or "").strip(): x["name"] = x["code"] logger.info( f" 📋 스캔 대상: {len(scan_list)}개 종목 (거래량·거래대금·회전율·등락률상승·등락률하락·거래증가율 각 100건 합산 → 개미털기 필터)" ) if scan_list: part = ", ".join(f"{x['code']} {x.get('name') or x['code']}" for x in scan_list[:15]) logger.info(f" 📋 스캔 대상(일부): {part}{' ...' if len(scan_list) > 15 else ''}") return scan_list def scan_ant_shaking_candidates(self, max_candidates=20): """ 개미털기(눌림목) 후보 종목 스캔 - kiwoom_trader_dual 방식: 유니버스 리스트만 빠르게 채움. - 스캔에서는 종목별 API 호출·낙폭/회복 필터·탈락 로그 없음. - 낙폭/회복/3분봉/RSI/피뢰침/ML 전부 메인 루프의 check_buy_signal_tail_catch에서만 체크. """ logger.info("🐜 [개미털기] 스캔 시작 (유니버스 리스트만 등록 → 매수 시 한곳에서 전 조건 체크)") top_n_light = get_env_int("CANDIDATE_LIST_TOP_N_LIGHT", 20) max_codes = get_env_int("SCAN_UNIVERSE_MAX_CODES", 150) scan_list = self._fetch_scan_universe_from_api(max_codes=max_codes) if not scan_list: logger.warning(" ⚠️ [개미털기] 스캔 대상 0개 → API에서 리스트를 받지 못함") return [] n_save = min(top_n_light, len(scan_list)) result = [] for i, x in enumerate(scan_list[:n_save]): name = (x.get("name") or x["code"]).strip() or x["code"] # DB 저장은 update_universe()에서 강도 계산·정렬 후 강도순 상위 N명만 함 (여기서는 리스트만 반환) result.append({ "code": x["code"], "name": name, "price": 0, "score": 0, "drop_rate": 0, "recovery": 0, }) part = ", ".join(f"{x.get('name') or x['code']}({x['code']})" for x in scan_list[:5]) logger.info(f" ✅ [개미털기] 후보 {n_save}명 수집 (강도·정렬·DB 반영은 유니버스 업데이트에서 일괄 처리) | 일부: {part}{' ...' if n_save > 5 else ''}") return result def calculate_atr(self, df, period=14): """ ATR (Average True Range) 계산 - 변동성 지표 - TR(True Range) = max(고가-저가, |고가-전일종가|, |저가-전일종가|) - ATR = TR의 14일 이동평균 """ try: if df is None or len(df) < period: return 0 df = df.copy() # True Range 계산 high_low = df['high'] - df['low'] high_close = (df['high'] - df['close'].shift()).abs() low_close = (df['low'] - df['close'].shift()).abs() df['tr'] = pd.concat([high_low, high_close, low_close], axis=1).max(axis=1) # ATR = TR의 14일 이동평균 atr = df['tr'].rolling(window=period).mean().iloc[-1] return float(atr) if not pd.isna(atr) and atr > 0 else 0 except Exception as e: logger.debug(f"ATR 계산 실패: {e}") return 0 def _df_to_engine_candles(self, df) -> List[Dict]: """DataFrame → tail_engine 형식 (candle_time YYYYMMDDHHMI, open, high, low, close, volume).""" candles = [] for idx in range(len(df)): row = df.iloc[idx] t = row.get("time") or row.get("candle_time") or "" if hasattr(t, "strftime"): t = t.strftime("%Y%m%d%H%M") if isinstance(t, str) and len(t) >= 12: t = t.replace("-", "").replace(" ", "").replace(":", "")[:12] candles.append({ "candle_time": t or f"{dt.now().strftime('%Y%m%d')}0930", "open": float(row.get("open", 0)), "high": float(row.get("high", 0)), "low": float(row.get("low", 0)), "close": float(row.get("close", 0)), "volume": float(row.get("volume", 0)), }) return candles def _force_buy_tail(self, code: str, name: str): """FORCE_BUY_TEST 시 현재가만 시그널.""" try: price_data = self.client.inquire_price(code) current_price = abs(float(str(price_data.get("stck_prpr", 0)).replace(",", ""))) if price_data else 0 if current_price <= 0: return None return {"code": code, "name": name, "price": current_price, "score": 5.0, "entry_features": {}} except Exception: return None def check_buy_signal_tail_catch(self, code: str, name: str): """tail_engine.check_buy_signal_live 사용 (확정봉만). V2 전용 필터(시장/테마/MA20/ML 등)만 추가.""" try: if get_env_bool("FORCE_BUY_TEST", False): return self._force_buy_tail(code, name) if get_env_bool("USE_MARKET_REGIME_FILTER", False): min_rsi = get_env_float("MARKET_REGIME_MIN_RSI", 48.0) regime = self.db.get_market_regime(tf=60) if not regime.get("is_bull") or regime.get("avg_rsi", 50) < min_rsi: logger.debug( "[시장필터] %s %s: 하락장 차단 (ETF RSI=%.1f < %.1f)", code, name, regime.get("avg_rsi", 0), min_rsi, ) return None if get_env_bool("USE_THEME_HEAT_FILTER", False): heat_max = get_env_float("THEME_HEAT_RSI_MAX", 72.0) meta = self.db.get_stock_meta(code) if meta and meta.get("theme"): momentum = self.db.get_theme_momentum(meta["theme"], tf=60) if momentum.get("count", 0) >= 3 and momentum.get("avg_rsi3", 0) > heat_max: logger.debug( "[테마필터] %s %s: 테마(%s) 과열 차단 (RSI=%.1f > %.1f)", code, name, meta["theme"], momentum["avg_rsi3"], heat_max, ) return None min_candle_len = get_env_int("MIN_CANDLE_LEN_TAIL", 14) min_price_tail = get_env_float("MIN_PRICE_TAIL", 1000.0) candles = None df = None # 확정봉만 사용 (백테스트와 동일: 신호봉 확정 후 다음 봉 시가 진입) if self.candle_agg: raw = self.candle_agg.get_candles(code, 3, 50) # 이미 확정봉만 반환 if raw and len(raw) >= 10: candles = [{"candle_time": c.get("candle_time", ""), "open": float(c.get("open", 0)), "high": float(c.get("high", 0)), "low": float(c.get("low", 0)), "close": float(c.get("close", 0)), "volume": float(c.get("volume", 0))} for c in raw] if not candles and self.db: ws = self.db.get_ws_candles(code, 3, limit=50, confirmed_only=True) if ws and len(ws) >= 10: candles = [{"candle_time": str(c.get("candle_time", "")), "open": float(c.get("open", 0)), "high": float(c.get("high", 0)), "low": float(c.get("low", 0)), "close": float(c.get("close", 0)), "volume": float(c.get("volume", 0))} for c in ws] if not candles: df = self._get_candles_df(code, tf=3, n=20) if df is None or df.empty: logger.debug("📡 [%s] WS봉 없음 → REST 3분봉 fallback", code) df = self.client.get_minute_chart(code, period="3", limit=20) if df is None or df.empty or len(df) < min_candle_len: logger.info(f"{LOG_YELLOW}🔍 [탈락-3분봉] {name} {code}: 봉수 부족 (len={len(df) if df is not None and not df.empty else 0}, 기준 {min_candle_len}){LOG_RESET}") return None if "close" not in df.columns or "high" not in df.columns or "low" not in df.columns or "open" not in df.columns: logger.info(f"{LOG_YELLOW}🔍 [탈락-3분봉] {name} {code}: OHLC 컬럼 없음{LOG_RESET}") return None # 진행봉(미확정) 제외: 마지막 행 제거 후 사용 → 확정봉만으로 신호 판단 if len(df) > 1: df = df.iloc[:-1].copy() if len(df) < min_candle_len: logger.info(f"{LOG_YELLOW}🔍 [탈락-3분봉] {name} {code}: 봉수 부족 (len={len(df)}, 기준 {min_candle_len}){LOG_RESET}") return None candles = self._df_to_engine_candles(df) if len(candles) < 10: return None current_price = float(candles[-1]["close"]) if current_price <= 0 or current_price < min_price_tail: logger.info(f"{LOG_YELLOW}🔍 [탈락-가격] {name} {code}: 시가/현재가 부적절 (현재 {current_price:,.0f}원, 최소 {min_price_tail:,.0f}){LOG_RESET}") return None today_yyyymmdd = dt.now().strftime("%Y%m%d") last_exit_dt = None if code in self.recently_sold: try: last_exit_dt = dt.fromtimestamp(self.recently_sold[code]) if last_exit_dt.strftime("%Y%m%d") != today_yyyymmdd: last_exit_dt = None except Exception: pass try: today_trades = self.db.get_trades_by_date(self.today_date) daily_cnt = len([t for t in today_trades if t.get("code") == code and str(t.get("strategy", "")).startswith("SHORT_")]) except Exception: daily_cnt = 0 state = {"last_exit_dt": last_exit_dt, "daily_cnt": daily_cnt} _d = te.get_tail_defaults_from_db(self.db) params = { **_d, "min_drop_rate": self.min_drop_rate, "min_recovery_ratio": self.min_recovery_ratio, "tail_ratio_min": get_env_float("TAIL_RATIO_MIN", 1.5), "tail_pct_min": get_env_float("TAIL_PCT_MIN", 0.003), "sl_pct": abs(self.stop_loss_pct) if self.stop_loss_pct < 0 else 0.03, "tp_pct": self.take_profit_pct, } reject_reason, reject_msg, sig = te.check_buy_signal_live(candles, params, state) if reject_reason: logger.info(f"{LOG_YELLOW}🔍 [{reject_reason}] {name} {code}: {reject_msg}{LOG_RESET}") return None if sig is None: return None # 엔진 통과 → 통과 로그 (백테스트와 동일 로직) logger.info(f"{LOG_GREEN}🔍 [통과-낙폭·회복] {name} {code} → 꼬리/피뢰침/RSI 체크{LOG_RESET}") # V2 전용: 피뢰침 급등주, MA20, ML (엔진에는 없음) — df 없으면 캔들 리스트에서 계산 if df is not None and not df.empty: day_open = float(df["open"].iloc[0]) day_high = float(df["high"].max()) else: day_open = float(candles[0]["open"]) day_high = max(float(c["high"]) for c in candles) try: if df is not None and not df.empty: lows = df["low"] valid = lows[lows > 0] day_low = float(valid.min()) if not valid.empty else float(df["low"].min()) else: day_low = min(float(c["low"]) for c in candles if float(c.get("low", 0)) > 0) if candles else 0 if day_low <= 0: day_low = float(candles[0]["low"]) if candles else 0 except Exception: day_low = float(candles[0]["low"]) if candles else 0 if self.ws_cache and self.ws_cache.is_active: w = self.ws_cache.get_price(code) if w: day_open = abs(float(str(w.get("stck_oprc", day_open)).replace(",", ""))) or day_open day_high = abs(float(str(w.get("stck_hgpr", day_high)).replace(",", ""))) or day_high day_low = abs(float(str(w.get("stck_lwpr", day_low)).replace(",", ""))) or day_low current_price = abs(float(str(w.get("stck_prpr", current_price)).replace(",", ""))) or current_price high_chase_threshold = get_env_float("HIGH_PRICE_CHASE_THRESHOLD", 0.96) if current_price >= day_high * high_chase_threshold: logger.info(f"{LOG_YELLOW}🔍 [탈락-피뢰침 고점추격] {name} {code}: 현재가 {current_price:,.0f} ≥ 고점대비 {high_chase_threshold*100:.0f}%{LOG_RESET}") return None if day_low > 0: range_change_pct = (day_high - day_low) / day_low * 100 else: range_change_pct = 0 max_daily_change = get_env_float("MAX_DAILY_CHANGE_PCT", 20.0) if range_change_pct > max_daily_change: logger.info(f"{LOG_YELLOW}🔍 [탈락-피뢰침 급등주] {name} {code}: 일일 변동폭 {range_change_pct:.1f}% > {max_daily_change:.0f}%{LOG_RESET}") return None if df is not None and not df.empty and "MA20" in df.columns and len(df) >= 20: ma20 = float(df["MA20"].iloc[-1]) if current_price < ma20: logger.info(f"{LOG_YELLOW}🔍 [탈락-MA20] {name} {code}: 현재가 {current_price:,.0f} < MA20 {ma20:,.0f}{LOG_RESET}") return None ma20_cap_pct = get_env_float("MA20_MAX_ABOVE_PCT", 3.0) if ma20 > 0 and current_price > ma20 * (1 + ma20_cap_pct / 100): logger.info(f"{LOG_YELLOW}🔍 [탈락-MA20초과] {name} {code}: MA20 대비 {ma20_cap_pct:.0f}% 초과{LOG_RESET}") return None tail_ratio = sig.get("tail_ratio", 0) recovery_pos = sig.get("recovery_pos", 0) rsi_val = sig.get("rsi_val", 50.0) tail_pct = sig.get("tail_pct", 0) entry_features = { "rsi": rsi_val, "volume_ratio": None, "tail_length_pct": tail_pct * 100, "ma5_gap_pct": None, "ma20_gap_pct": None, "foreign_net_buy": 0, "institution_net_buy": 0, "market_hour": dt.now().hour, } if self.use_ml_signal and getattr(self, "ml_predictor", None): try: ml_prob = getattr(self, "ml_predictor", None).predict_win_probability(entry_features) ml_min_prob = getattr(self, "ml_min_probability", 0.55) if ml_prob < ml_min_prob: logger.info(f"{LOG_YELLOW}🔍 [탈락-ML] {name} {code}: 승률 {ml_prob:.2%} < {ml_min_prob:.0%}{LOG_RESET}") return None except Exception: pass score_base = get_env_float("TAIL_SCORE_BASE", 5.0) score_ratio_mult = get_env_float("TAIL_SCORE_RATIO_MULT", 2.0) score = score_base + (tail_ratio * score_ratio_mult) logger.info( f"{LOG_CYAN}🎯 [무릎 타점] {name} | 가격:{current_price:,.0f} | " f"꼬리비율:{tail_ratio:.1f}배 | 회복:{recovery_pos*100:.0f}% | RSI:{rsi_val:.1f}{LOG_RESET}" ) return { "code": code, "name": name, "price": current_price, "score": score, "entry_features": entry_features, } except Exception as e: logger.info(f"{LOG_YELLOW}🔍 [탈락-예외] {name} {code}: {e}{LOG_RESET}") return None def check_sell_signals(self): """tail_engine.check_sell_signal_live 사용 (손절/익절/어깨컷/장마감) + 금액손실컷만 V2 유지.""" if not self.holdings: return [] sell_signals = [] min_hold_sec = get_env_float("MIN_HOLD_AFTER_BUY_SEC", 30.0) now = dt.now() is_eod = (now.hour == 15 and now.minute >= 25) or now.hour > 15 _params = te.get_tail_defaults_from_db(self.db) _params["sl_pct"] = abs(self.stop_loss_pct) if self.stop_loss_pct < 0 else 0.03 _params["tp_pct"] = self.take_profit_pct for code, holding in list(self.holdings.items()): try: name = holding.get("name", code) buy_price = holding["buy_price"] buy_time_str = holding.get("buy_time", "") qty = holding["qty"] if buy_time_str and min_hold_sec > 0: try: buy_time = dt.strptime(buy_time_str, "%Y-%m-%d %H:%M:%S") if (now - buy_time).total_seconds() < min_hold_sec: continue except Exception: pass price_data = None if self.ws_cache and self.ws_cache.is_active: price_data = self.ws_cache.get_price(code) if not price_data: price_data = self.client.inquire_price(code) if not price_data: continue current_price = abs(float(price_data.get("stck_prpr", 0))) if current_price == 0: continue max_price = holding.get("max_price", buy_price) if current_price > max_price: max_price = current_price self.holdings[code]["max_price"] = max_price profit_pct = (current_price - buy_price) / buy_price if buy_price > 0 else 0 profit_val = (current_price - buy_price) * qty # V2 전용: 금액손실컷 if profit_val <= -get_env_int("MAX_LOSS_PER_TRADE_KRW", 200000): sell_signals.append({ "code": code, "name": name, "current_price": current_price, "price": current_price, "qty": qty, "buy_price": buy_price, "profit_pct": profit_pct, "reason": f"금액손실컷 ({profit_val:,.0f}원)", }) continue position = { "entry_price": buy_price, "entry_time": holding.get("buy_time", ""), "stop": holding.get("stop_price", buy_price * (1 + self.stop_loss_pct)), "target": holding.get("target_price", buy_price * (1 + self.take_profit_pct)), "max_price": max_price, } current_candle = {"high": max_price, "low": current_price, "close": current_price} res = te.check_sell_signal_live(position, current_candle, _params, is_eod=is_eod) if res: reason, _ = res sell_signals.append({ "code": code, "name": name, "current_price": current_price, "price": current_price, "qty": qty, "buy_price": buy_price, "profit_pct": profit_pct, "reason": reason, }) except Exception as e: logger.error("매도 신호 체크 오류(%s): %s", code, e) return sell_signals def execute_buy(self, signal): """ 매수 실행 - 키움 봇과 동일하게 대형주/소형주 비율 맞추는 로직 포함 - 대형주: 기본 금액 100% / 중형주: 85% / 소형주: 70% - 켈리 기반 매수 금액 + 종목당 최대 15% 제한 """ code = signal["code"] name = signal["name"] price = signal["price"] # 이미 보유 중이면 스킵 if code in self.holdings: logger.warning(f"⚠️ [{name}] 이미 보유 중 -> 매수 스킵") return False # 최대 보유 종목 수 체크 if len(self.holdings) >= self.max_stocks: logger.warning(f"⚠️ 최대 보유 종목 수 도달 ({self.max_stocks}개)") return False # 🔥 매수 직전 예수금 실시간 확인 if not self._update_account_light(profit_val=0): logger.warning(f"⚠️ [{name}] 예수금 조회 실패 -> 매수 스킵") return False # ============================================================ # [대/중/소형주 구분] - 일봉 거래대금 평균 (키움 봇과 동일) # ============================================================ size_class = None try: df = self.client.get_daily_chart(code, limit=10) if not df.empty and "volume" in df.columns and "close" in df.columns: trade_values = df["volume"] * df["close"] avg_trade_value = trade_values.mean() large_min = get_env_float("SIZE_CLASS_LARGE_MIN", 5000000000) # 50억 (대형주) mid_min = get_env_float("SIZE_CLASS_MID_MIN", 500000000) # 5억 (중형주) if avg_trade_value >= large_min: size_class = "대" elif avg_trade_value >= mid_min: size_class = "중" else: size_class = "소" logger.info( f"📊 [{name}] 거래대금 평균 {avg_trade_value/1e8:.1f}억원 → {size_class}형주" ) except Exception as e: logger.debug(f"대/중/소형 조회 스킵({code}): {e}") # ============================================================ # [매수 금액] 변동성 역가중 (Volatility Inverse Weighting) # ============================================================ # ATR 계산용 분봉 데이터 — WS 메모리 봉 우선, 없으면 REST fallback df_minute = None try: df_minute = self._get_candles_df(code, tf=3, n=20) if df_minute is None or df_minute.empty: df_minute = self.client.get_minute_chart(code, period="3", limit=20) except Exception as e: logger.debug(f"분봉 조회 실패({code}): {e}") # RiskManager 사용 시: 변동성 역가중으로 매수 금액 계산 if self.risk_mgr is not None: # 켈리 비율 (DB에서 계산, 없으면 None) kelly_fraction = None if self.risk_mgr.use_kelly: try: kelly_fraction = self.db.calculate_half_kelly() except Exception as e: logger.debug(f"켈리 비율 계산 스킵: {e}") # 변동성 역가중 매수 금액 계산 amount = self.risk_mgr.get_position_size( stock_name=name, current_balance=self.current_cash, df=df_minute, # ATR 계산용 분봉 데이터 kelly_fraction=kelly_fraction, size_class=size_class, # 대/중/소형 구분 ) if amount <= 0: logger.warning(f"⚠️ [{name}] RiskManager 계산 금액 0원 -> 매수 스킵") return False # 수량 계산 (수수료 고려) qty = self.risk_mgr.calculate_quantity(price, amount) else: # 폴백: 기존 고정 슬롯 방식 (RiskManager 미사용 시, 수치: env/DB) slot_default = int(get_env_float("SLOT_MONEY_DEFAULT", 100000.0)) if self.max_stocks > 0: slot_money = int(self.current_cash * 0.9 / self.max_stocks) else: slot_money = slot_default base_amount = slot_money # 슬롯 캡(손절 기반 상한) 적용: MAX_LOSS_PER_TRADE_KRW / |STOP_LOSS_PCT| 와 SLOT_BASE_AMOUNT_CAP 중 작은 값 effective_slot_cap = self._get_effective_slot_cap() if effective_slot_cap > 0: base_amount = min(base_amount, int(effective_slot_cap)) if self.stop_loss_pct != 0: stop_pct_abs = abs(self.stop_loss_pct) else: stop_pct_abs = 0.04 if stop_pct_abs > 0: kelly_risk_amount = self.current_cash * self.risk_pct_per_trade * self.kelly_multiplier kelly_based_amount = int(kelly_risk_amount / stop_pct_abs) base_amount = min(base_amount, kelly_based_amount) small_ratio = get_env_float("SIZE_CLASS_SMALL_RATIO", 0.7) mid_ratio = get_env_float("SIZE_CLASS_MID_RATIO", 0.85) if size_class == "소": amount = int(base_amount * small_ratio) logger.info(f"💰 [{name}] 소형주 → 매수 금액 {small_ratio*100:.0f}%: {amount:,.0f}원") elif size_class == "중": amount = int(base_amount * mid_ratio) logger.info(f"💰 [{name}] 중형주 → 매수 금액 {mid_ratio*100:.0f}%: {amount:,.0f}원") else: amount = base_amount max_limit = int(self.current_cash * self.max_position_pct) if amount > max_limit: logger.info(f"📐 [{name}] 최대 포지션 제한: {amount:,.0f}원 → {max_limit:,.0f}원") amount = max_limit amount = max(amount, self.min_position_amount) qty = int(amount / price) if qty <= 0: logger.warning(f"⚠️ [{name}] 매수 수량 0 (가격: {price:,.0f}원, 금액: {amount:,.0f}원)") return False required_amount = price * qty * 1.05 if self.current_cash < required_amount: logger.warning( f"⚠️ [{name}] 예수금 부족: 필요 {required_amount:,.0f}원 / " f"보유 {self.current_cash:,.0f}원 -> 매수 스킵" ) return False # ATR 계산 (변동성 기반 손절가/목표가 설정용) # df_minute는 위에서 이미 조회했으므로 재사용 atr = 0 stop_price = price * (1 + self.stop_loss_pct) target_price = price * (1 + self.take_profit_pct) if df_minute is not None and not df_minute.empty: try: atr = self.calculate_atr(df_minute) if atr > 0: # ATR 기반 손절가/목표가 설정 stop_price = price - (atr * self.stop_atr_multiplier) target_price = price + (atr * self.target_atr_multiplier) logger.info(f"📊 [{name}] ATR 기반 손절가/목표가: ATR={atr:.0f}원, 손절={stop_price:,.0f}원, 목표={target_price:,.0f}원") except Exception as e: logger.debug(f"ATR 계산 스킵({code}): {e}") atr = price * 0.01 # 기본값 1% else: atr = price * 0.01 # 기본값 1% odno = self.client.buy_market_order(code, qty) if not odno: # 매매불가 종목이면 당일 제외 목록에 넣고 다음 후보로 넘어가기 msg_cd = getattr(self.client, "_last_order_msg_cd", None) or "" msg1 = getattr(self.client, "_last_order_msg1", "") or "" if msg_cd == "40070000" or "매매불가" in msg1: self.untradable_skip_set.add(code) logger.warning(f"🚫 [{name}] 매매불가 종목 -> 당일 매수 제외, 다음 후보로 넘어감") return False # 주문 접수 후 체결 조회 API로 실제 체결가/체결수량 확인 (주문만 보고 가정하지 않음) fill = self.client.get_execution_by_odno(odno, code=code, wait_sec=2) if fill: price = fill["avg_price"] qty = fill["filled_qty"] if qty <= 0: logger.warning(f"⚠️ [{name}] 체결 조회 수량 0 -> 매수 반영 스킵") return False else: # 체결 조회 실패 시 주문 수량/시그널가로 반영 (기존 동작, 로그로 구분) logger.info(f"📋 [{name}] 체결 조회 미확인 -> 주문기준으로 반영 (가격={price:,.0f}, 수량={qty})") self._update_account_light(profit_val=0) buy_time = dt.now().strftime("%Y-%m-%d %H:%M:%S") self.holdings[code] = { "buy_price": price, "qty": qty, "buy_time": buy_time, "name": name, "max_price": price, # 고점 추적 "atr_entry": atr, # 매수 시점 ATR 저장 "stop_price": stop_price, # 손절가 "target_price": target_price, # 목표가 } self.db.upsert_trade({ "code": code, "name": name, "strategy": "SHORT_ANT_SHAKING", "avg_buy_price": price, "current_price": price, "target_qty": qty, "current_qty": qty, "status": "HOLDING", "buy_date": buy_time, "stop_price": stop_price, "target_price": target_price, "atr_entry": atr, "entry_features": signal.get("entry_features"), }) if fill: logger.info(f"💰 [매수 체결] {name} ({code}): {price:,.0f}원 × {qty}주 (API 체결 확인) | 손절={stop_price:,.0f}원, 목표={target_price:,.0f}원") else: logger.info(f"💰 [매수 체결] {name} ({code}): {price:,.0f}원 × {qty}주 (주문기준) | 손절={stop_price:,.0f}원, 목표={target_price:,.0f}원") # 매수 후 WebSocket 구독 등록 → 이후 check_sell_signals에서 REST 없이 실시간 수신 if self.ws_cache and self.ws_cache.is_active: self.ws_cache.subscribe(code) # 신규 보유 종목 즉시 갭보정 (봉 버퍼 미확보 시 ATR 계산 즉시 가능하게) if self.candle_agg: try: lim = get_env_int("SHORT_GAP_FILL_LIMIT", 100) df_gap = self.client.get_minute_chart(code, period="3", limit=lim) if df_gap is not None and not df_gap.empty: self.candle_agg.fill_gap_from_rest(code, 3, df_gap) except Exception as _ge: logger.debug("매수후 갭보정 실패(%s): %s", code, _ge) # 체결 알림 (MM) — API 추가 호출 없이 메모리 값만 사용 try: invest_amt = price * qty cash_after = self.current_cash - invest_amt if self.current_cash >= invest_amt else 0 mm_msg = ( f"🟢 **매수 체결** {name} ({code})\n" f"{price:,.0f}원 × {qty:,}주 | 손절 {stop_price:,.0f}원 / 목표 {target_price:,.0f}원\n" f"예수금(예상) {cash_after:,.0f}원 | 보유 {len(self.holdings)}종목" ) self.send_mm(mm_msg) except Exception as e: logger.debug(f"매수 체결 MM 발송 스킵: {e}") return True def execute_sell(self, signal): """매도 실행""" code = signal["code"] name = signal["name"] qty = signal["qty"] if code not in self.holdings: logger.warning(f"⚠️ [{name}] 보유 종목 아님") return False # ★ 매도 실패 백오프 체크 (영업일 아님·시장 마감 등 일시 오류 반복 방지) backoff_until = self._sell_backoff.get(code, 0) if time.time() < backoff_until: remain_min = (backoff_until - time.time()) / 60 logger.debug("⏸ [%s(%s)] 매도 백오프 중 — %.0f분 후 재시도", name, code, remain_min) return False # 매도 주문 success = self.client.sell_market_order(code, qty) if not success: # 실패 원인 분석 → 영업일·시장마감 오류면 백오프 등록 msg_cd = getattr(self.client, "_last_sell_msg_cd", None) or "" msg1 = getattr(self.client, "_last_sell_msg1", "") or "" # KIS 비영업일·장마감 오류코드 (40100000=모의 영업일 아님, 40200000=실전 장외시간) non_biz_codes = {"40100000", "40200000"} if msg_cd in non_biz_codes or "영업일" in msg1 or "장외" in msg1 or "시장" in msg1: backoff_sec = get_env_int("SELL_FAILURE_BACKOFF_SEC", 1800) self._sell_backoff[code] = time.time() + backoff_sec logger.warning( "⏸ [%s(%s)] 매도 실패('%s') → %d분 후 재시도 (SELL_FAILURE_BACKOFF_SEC=%d)", name, code, msg1, backoff_sec // 60, backoff_sec, ) return False if success: # 현재가 조회 price_data = self.client.inquire_price(code) if price_data: sell_price = abs(float(price_data.get("stck_prpr", 0))) else: sell_price = signal.get("price", 0) # 손익 계산 (매도 후 총자산 반영용) holding = self.holdings.get(code, {}) buy_price = holding.get("buy_price", sell_price) profit_val = (sell_price - buy_price) * qty # 손익 금액 # DB에서 매도 처리 (strategy 지정 → 꼬리잡기봇 row만 삭제, 스캘핑봇 row 보호) self.db.close_trade( code=code, sell_price=sell_price, sell_reason=signal['reason'], strategy="SHORT_ANT_SHAKING", ) del self.holdings[code] # 재진입 쿨다운 기록 (REENTRY_COOLDOWN_SEC 동안 같은 종목 재매수 차단) self.recently_sold[code] = time.time() # 매도 후 WebSocket 구독 해제 → 불필요한 데이터 수신 차단 if self.ws_cache and self.ws_cache.is_active: self.ws_cache.unsubscribe(code) # 🔥 매도 후 예수금 + 총자산 즉시 업데이트 (손익 반영) self._update_account_light(profit_val=profit_val) logger.info(f"💸 [매도 체결] {name} ({code}): {qty}주 ({signal['reason']}, {signal['profit_pct']*100:+.2f}%)") # 체결 알림 (MM) — 매도 직후 _update_account_light로 갱신된 예수금/총자산 사용 (추가 API 없음) try: pct = signal['profit_pct'] * 100 cum_pnl = self.current_total_asset - self.total_deposit if self.total_deposit else 0 cum_pct = (cum_pnl / self.total_deposit * 100) if self.total_deposit and self.total_deposit > 0 else 0 # 당일 손익: 오늘 장 시작 시 총자산 대비 현재 총자산 차이 day_pnl = self.current_total_asset - self.start_day_asset if self.start_day_asset else 0 day_pct = (day_pnl / self.start_day_asset * 100) if self.start_day_asset > 0 else 0 mm_msg = ( f"🔴 **매도 체결** {name} ({code})\n" f"{sell_price:,.0f}원 × {qty:,}주 | {signal['reason']} | 수익률 {pct:+.2f}% (실현 {profit_val:+,.0f}원)\n" f"예수금 {self.current_cash:,.0f}원 | 총자산 {self.current_total_asset:,.0f}원 | 보유 {len(self.holdings)}종목\n" f"당일손익 {day_pnl:+,.0f}원 ({day_pct:+.2f}%) | 누적손익 {cum_pnl:+,.0f}원 ({cum_pct:+.2f}%)" ) self.send_mm(mm_msg) except Exception as e: logger.debug(f"매도 체결 MM 발송 스킵: {e}") return True return False def run(self): """메인 루프 (진입점). 내부적으로 asyncio.run(_run_async()) 호출.""" asyncio.run(self._run_async()) async def _run_async(self): """비동기 메인 루프 - 백그라운드 태스크 시작 후 동기 매매 루프 실행""" logger.info("🚀 단타 트레이딩 봇 시작 (비동기 백그라운드 작업 활성화)") # 매터모스트 원격 조종 리스너 (!적용 / !설정 → DB 반영 후 reload_config) try: from mm_remote import MattermostRemoteController self.mm_remote = MattermostRemoteController( server_url=MM_SERVER_URL, bot_token=MM_BOT_TOKEN, channel_alias=self.mm_channel, mm_config_path=MM_CONFIG_FILE, db=self.db, ) self.mm_remote.start() except Exception as e: logger.warning("⚠️ MM 원격 조종 리스너 시작 실패: %s", e) self.mm_remote = None # 백그라운드 태스크 시작 self._universe_task = asyncio.create_task(self._universe_scan_scheduler()) self._report_task = asyncio.create_task(self._report_scheduler()) self._asset_task = asyncio.create_task(self._asset_update_scheduler()) logger.info("✅ 백그라운드 태스크 시작 완료 (유니버스 스캔, 리포트, 자산 업데이트)") # 동기 매매 루프는 별도 스레드에서 실행 (메인 이벤트 루프 블로킹 방지) loop = asyncio.get_event_loop() await loop.run_in_executor(None, self._sync_trading_loop) def _sync_trading_loop(self): """동기 매매 루프 (메인 로직) - 백그라운드 작업과 분리""" logger.info("📈 매매 루프 시작 (동기 모드)") while True: try: # ★ DB 설정 실시간 반영 (재시작 없이 적용) self.reload_config() now = dt.now() current_date = now.strftime("%Y-%m-%d") # 날짜 변경 시 리포트 플래그 리셋 if current_date != self.today_date: self.today_date = current_date self.morning_report_sent = False self.closing_report_sent = False self.final_report_sent = False self.ai_report_sent = False self.start_day_asset = 0 logger.info(f"📅 날짜 변경: {current_date}") # 리포트 전송은 백그라운드 태스크에서 처리하므로 여기서는 제거 # (기존 코드와의 호환성을 위해 주석 처리) # if now.hour == 13 and now.minute == 0: # self.send_morning_report() # self.send_ai_report() # elif now.hour == 15 and now.minute == 15: # self.send_closing_report() # elif now.hour == 15 and now.minute == 35: # self.send_final_report() # 장 상태 체크 if not self.check_market_status(): logger.info("⏸ 장 시간 아님 - 대기 중...") time.sleep(60) continue # 자산 정보 업데이트는 백그라운드 태스크에서 처리하므로 여기서는 제거 # (기존 코드와의 호환성을 위해 주석 처리) # if now.minute % 30 == 0: # 30분마다 # self._update_assets() # 매도 신호 체크 (우선) sell_signals = self.check_sell_signals() for signal in sell_signals: self.execute_sell(signal) # 매수: 후보 종목을 3분봉으로 실시간 타점 체크 → 그 순간에만 매수 (키움 TAIL_CATCH 방식) today_str = dt.now().strftime("%Y-%m-%d") if today_str != self.today_date: self.today_date = today_str self.untradable_skip_set.clear() logger.debug("📅 날짜 변경 -> 매매불가 제외 목록 초기화") active_count = len(self.holdings) db_candidates = self.db.get_target_candidates() # 신규 후보 WS 구독 + 3분봉 갭보정 (봉 버퍼 미확보 종목 자동 보완) if db_candidates: self._sync_subscriptions(db_candidates) if db_candidates and active_count < self.max_stocks: strength_preview = " | 강도순: " + ", ".join( f"{c.get('name', c.get('code',''))} {c.get('score', 0):.1f}" for c in db_candidates[:5] ) if db_candidates else "" logger.info(f"🔍 [매수체크] 후보 {len(db_candidates)}명 순회 (보유 {active_count}/{self.max_stocks}){strength_preview}") signals_this_turn = 0 attempts_this_turn = 0 for db_item in db_candidates: code = db_item.get("code") or db_item.get("stk_cd", "") name = db_item.get("name") or db_item.get("stk_nm", code) if not code or code in self.holdings: continue # 매매불가 종목은 당일 재시도 안 함 → 다음 후보로 if code in self.untradable_skip_set: continue # ★ 재진입 쿨다운 체크: 최근 매도 종목은 일정 시간 동안 재매수 차단. # 손절 직후 즉시 재매수 → 손절 반복 루프를 근본 차단. reentry_cooldown = get_env_int("REENTRY_COOLDOWN_SEC", 300) elapsed_since_sell = time.time() - self.recently_sold.get(code, 0) if elapsed_since_sell < reentry_cooldown: remaining = int(reentry_cooldown - elapsed_since_sell) logger.info( "⏳ [재진입 차단] %s(%s) 매도 후 쿨다운 중 — 남은 시간 %d초/%d초", name, code, remaining, reentry_cooldown, ) continue signal = self.check_buy_signal_tail_catch(code, name) if signal: signals_this_turn += 1 attempts_this_turn += 1 logger.info(f"🛒 [매수 시도] {name} ({code}) {attempts_this_turn}건째") ok = self.execute_buy(signal) if ok: logger.info(f"✅ [매수체크] 이번 턴 시그널 {signals_this_turn}건, 시도 {attempts_this_turn}건, 성공 1건") time.sleep(random.uniform(1, 2)) break # 실패(매매불가, 금액0, 예수금 부족, API 오류 등) -> 다음 후보로 순회 time.sleep(random.uniform(0.3, 0.8)) continue time.sleep(random.uniform(0.5, 1.0)) if signals_this_turn == 0 and db_candidates: logger.info(f"🔍 [매수체크] 이번 순회 시그널 0건 (조건 통과한 후보 없음) → 다음 턴에 재시도") elif attempts_this_turn > 0 and len(self.holdings) == active_count: logger.info(f"🔍 [매수체크] 이번 턴 시그널 {signals_this_turn}건, 시도 {attempts_this_turn}건, 체결 0건 (실패 후 다음 턴)") elif not db_candidates and active_count == 0: logger.debug("🔍 [매수] 타겟 0개 (유니버스 스캔 대기 중)") # 대기 (너무 길면 매수 타점 놓침 → 2~4초로 단축) time.sleep(random.uniform(2, 4)) except KeyboardInterrupt: logger.info("⏹ 봇 종료") # 백그라운드 태스크 취소 if self._universe_task: self._universe_task.cancel() if self._report_task: self._report_task.cancel() if self._asset_task: self._asset_task.cancel() break except Exception as e: logger.error(f"❌ 루프 에러: {e}") time.sleep(5) if __name__ == "__main__": logger.info("🚀 KIS Short V3 (tail_engine 기반, 단독 실행)") bot = ShortTradingBot() # V3: 매수/매도 = tail_engine (백테스트와 동일 규칙) bot.run()