Files
kis_bot/kis_short_ver2.py
2026-03-17 12:33:30 +09:00

3885 lines
183 KiB
Python
Raw Permalink Blame History

This file contains invisible Unicode characters
This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
KIS Short Trading Bot Ver1 - 단타용 한투 API 트레이딩 시스템
- 한국투자증권(KIS) Open API 사용
- 개미털기(눌림목) 전략 기반 단타 매매
- 빠른 손절/익절 로직
- kiwoom_trader_ver2.py의 단타 전략을 한투 API로 변환
"""
import os
import re
import json
import time
import random
import logging
import datetime
import hashlib
import hmac
import base64
import warnings
import asyncio
import subprocess
from datetime import datetime as dt
from pathlib import Path
from typing import List, Dict, Optional
import pandas as pd
import requests
from database import TradeDB, ENV_CONFIG_KEYS
# WebSocket 실시간 체결가 캐시 + 봉집계기 (REST 폴링 전면 대체)
# CandleAggregator: 틱 → 3분봉 in-memory 집계 → check_buy_signal_tail_catch / check_sell_signals / execute_buy 에서 활용
try:
from kis_ws import (
KISWebSocketPriceCache, CandleAggregator,
get_kiwoom_candles_df, _get_kiwoom_creds,
)
_KIS_WS_AVAILABLE = True
except ImportError:
_KIS_WS_AVAILABLE = False
# 로깅 설정
logging.basicConfig(
format='[%(asctime)s] %(message)s',
datefmt='%H:%M:%S',
level=logging.INFO,
)
logger = logging.getLogger("KISShortBot")
# 로그 색상 (ANSI) - 탈락/통과 구분
LOG_RED = "\033[91m" # 탈락
LOG_YELLOW = "\033[93m" # 탈락 (Pass-조건)
LOG_GREEN = "\033[92m" # 통과
LOG_CYAN = "\033[96m" # 강조
LOG_RESET = "\033[0m"
# DB 초기화 — MariaDB 192.168.0.141 (database.py 모듈 상수 사용)
SCRIPT_DIR = Path(__file__).resolve().parent
db = TradeDB() # db_path 인수 무시됨, MariaDB 직접 연결
# DB에서 환경변수 로드
def get_env_from_db(key, default=""):
"""DB에서 환경변수 읽기"""
env_data = db.get_latest_env()
if env_data and env_data.get("snapshot"):
return env_data["snapshot"].get(key, default)
return default
def get_env_float(key, default):
"""환경변수를 float로 변환 (DB 우선)"""
value = get_env_from_db(key, str(default))
if isinstance(value, str) and "#" in value:
value = value.split("#")[0].strip()
try:
return float(value) if value else default
except (ValueError, TypeError):
return default
def get_env_int(key, default):
"""환경변수를 int로 변환 (DB 우선)"""
value = get_env_from_db(key, str(default))
if isinstance(value, str) and "#" in value:
value = value.split("#")[0].strip()
try:
return int(value) if value else default
except (ValueError, TypeError):
return default
def get_env_bool(key, default=False):
"""환경변수를 bool로 변환 (DB 우선)"""
value = get_env_from_db(key, str(default)).lower()
return value in ("true", "1", "yes")
# Mattermost 설정
MM_SERVER_URL = get_env_from_db("MM_SERVER_URL", "https://mattermost.hoonfam.org")
MM_BOT_TOKEN = get_env_from_db("MM_BOT_TOKEN_", "").strip()
MM_CONFIG_FILE = SCRIPT_DIR / "mm_config.json"
# 기본 채널(alias) + 단타 봇 전용 채널(alias)
MM_CHANNEL_DEFAULT = get_env_from_db("MATTERMOST_CHANNEL", "stock")
MM_CHANNEL_SHORT = get_env_from_db("KIS_SHORT_MM_CHANNEL", MM_CHANNEL_DEFAULT)
# Gemini API (AI 리포트용) - google.genai 신규 SDK (Client 사용, configure 없음)
try:
import google.genai as genai
GEMINI_AVAILABLE = True
except ImportError:
GEMINI_AVAILABLE = False
logger.warning("⚠️ google-genai 미설치 - AI 리포트 기능 사용 불가")
GEMINI_API_KEY = get_env_from_db("GEMINI_API_KEY", "").strip()
GEMINI_MODEL_ID = "gemini-2.5-flash" # 또는 gemini-2.5-flash (모델명 확인)
gemini_client = None
if GEMINI_API_KEY and GEMINI_AVAILABLE:
try:
gemini_client = genai.Client(api_key=GEMINI_API_KEY)
except Exception as e:
logger.warning(f"⚠️ Gemini 초기화 실패: {e}")
gemini_client = None
else:
gemini_client = None
# ML 예측 (선택적)
try:
from ml_predictor import MLPredictor
ML_AVAILABLE = True
except ImportError:
ML_AVAILABLE = False
logger.warning("⚠️ ml_predictor 미설치 - ML 예측 기능 사용 불가")
# RiskManager (변동성 기반 리스크 관리)
try:
from risk_manager import RiskManager
RISK_MANAGER_AVAILABLE = True
except ImportError:
RISK_MANAGER_AVAILABLE = False
logger.warning("⚠️ risk_manager 미설치 - 변동성 역가중 매수 금액 계산 불가")
# ============================================================
# 한투(KIS) API 클라이언트 (kis_long_term_checker.py 참고)
# ============================================================
# 모의계좌용 토큰 캐시 경로
KIS_TOKEN_CACHE_PATH_MOCK = SCRIPT_DIR / ".kis_token_cache_mock.json"
# 실계좌용 토큰 캐시 경로
KIS_TOKEN_CACHE_PATH_REAL = SCRIPT_DIR / ".kis_token_cache_real.json"
# 한투 접근 토큰 유효기간 24시간. 자주 발급하면 영구 제명될 수 있으므로 캐시 철저 재사용.
# 만료 1분 전에만 재발급 (불필요한 발급 최소화)
KIS_TOKEN_EXPIRE_MARGIN_SEC = 60
def _parse_kis_token_expired(expired_str):
"""한투 API 만료시간 문자열 파싱. 'YYYY-MM-DD HH:MM:SS' 또는 'YYYY-MM-DDTHH:MM:SS' 등 지원."""
if not expired_str or not isinstance(expired_str, str):
return None
s = expired_str.strip().replace("T", " ")[:19]
if len(s) < 19:
return None
try:
return dt.strptime(s, "%Y-%m-%d %H:%M:%S")
except ValueError:
return None
def _load_kis_token_cache(mock):
"""캐시 파일에서 토큰 로드. 만료 1분 전까지 유효하면 재사용 (24h 토큰 자주 발급 시 영구 제명 주의)."""
if mock:
path = KIS_TOKEN_CACHE_PATH_MOCK
else:
path = KIS_TOKEN_CACHE_PATH_REAL
if not path.exists():
logger.info("한투 토큰 캐시 없음 → API 발급 예정 (캐시 경로: %s)", path)
return None
try:
logger.info("패스 %s", path)
with open(path, "r", encoding="utf-8") as f:
cache = json.load(f)
if cache.get("mock") != mock:
logger.info("한투 토큰 캐시 모의/실전 불일치 → API 발급 예정")
return None
token = cache.get("access_token")
expired_str = cache.get("access_token_token_expired") or cache.get("expires_at")
if not token or not expired_str:
logger.info("한투 토큰 캐시 내용 불완전 → API 발급 예정")
return None
expired_dt = _parse_kis_token_expired(expired_str)
if expired_dt is None:
logger.info("한투 토큰 캐시 만료시간 파싱 실패(%s) → API 발급 예정", expired_str[:30])
return None
if dt.now() >= expired_dt - datetime.timedelta(seconds=KIS_TOKEN_EXPIRE_MARGIN_SEC):
logger.info("한투 토큰 캐시 만료 임박(%s) → API 발급 예정", expired_str[:19])
return None
return token
except Exception as e:
logger.warning("한투 토큰 캐시 로드 실패(%s): %s", path, e)
return None
def _save_kis_token_cache(access_token, access_token_token_expired, mock):
"""발급받은 토큰을 캐시 파일에 저장."""
try:
if mock:
path = KIS_TOKEN_CACHE_PATH_MOCK
else:
path = KIS_TOKEN_CACHE_PATH_REAL
with open(path, "w", encoding="utf-8") as f:
json.dump({
"access_token": access_token,
"access_token_token_expired": access_token_token_expired,
"mock": mock,
}, f, ensure_ascii=False, indent=2)
logger.info("한투 토큰 캐시 저장 완료: %s", path)
except Exception as e:
logger.warning("한투 토큰 캐시 저장 실패: %s", e)
def _invalidate_kis_token_cache(mock):
"""토큰 만료(EGW00123) 시 캐시 파일 삭제 → 다음 _auth()에서 새 토큰 발급."""
path = KIS_TOKEN_CACHE_PATH_MOCK if mock else KIS_TOKEN_CACHE_PATH_REAL
try:
if path.exists():
path.unlink()
logger.info("한투 토큰 캐시 삭제 (만료 감지): %s", path)
except Exception as e:
logger.warning("한투 토큰 캐시 삭제 실패(%s): %s", path, e)
def _is_token_expired_response(j):
"""응답이 '기간이 만료된 token' 오류(EGW00123)인지 여부."""
if not j or not isinstance(j, dict):
return False
msg_cd = j.get("msg_cd") or ""
msg1 = str(j.get("msg1", ""))
return msg_cd == "EGW00123" or "만료된 token" in msg1 or "만료" in msg1
class KISClient:
"""한국투자증권 Open API 클라이언트"""
def __init__(self, mock=None):
# 실전/모의 토큰 모두 최신 상태로 유지 (모드와 무관하게 양쪽 갱신)
try:
from kis_token_manager import ensure_both_tokens
ensure_both_tokens()
except Exception as _te:
logger.warning(f"토큰 자동갱신 건너뜀: {_te}")
# 모의 여부 결정
if mock is not None:
use_mock = mock
else:
use_mock = get_env_bool("KIS_MOCK", True)
# 모의투자는 MOCK 전용 키만 사용(실전 키로 폴백 안 함 → 토큰/캐시가 실전이랑 섞이지 않도록)
if use_mock:
self.app_key = get_env_from_db("KIS_APP_KEY_MOCK", "").strip()
self.app_secret = get_env_from_db("KIS_APP_SECRET_MOCK", "").strip()
if not self.app_key or not self.app_secret:
logger.error("❌ 모의투자용 APP KEY/SECRET이 DB에 없습니다. KIS_APP_KEY_MOCK, KIS_APP_SECRET_MOCK 설정 필요.")
raise ValueError("모의투자 KIS_APP_KEY_MOCK / KIS_APP_SECRET_MOCK 미설정")
else:
self.app_key = (get_env_from_db("KIS_APP_KEY_REAL", "") or get_env_from_db("KIS_APP_KEY", "")).strip()
self.app_secret = (get_env_from_db("KIS_APP_SECRET_REAL", "") or get_env_from_db("KIS_APP_SECRET", "")).strip()
# 계좌번호: 모의/실전 분리
if use_mock:
raw_no = get_env_from_db("KIS_ACCOUNT_NO_MOCK", "").strip()
raw_code = get_env_from_db("KIS_ACCOUNT_CODE_MOCK", "").strip()
if not raw_code:
raw_code = "01"
else:
raw_no = (get_env_from_db("KIS_ACCOUNT_NO_REAL", "") or get_env_from_db("KIS_ACCOUNT_NO", "")).strip()
raw_code = (get_env_from_db("KIS_ACCOUNT_CODE_REAL", "") or get_env_from_db("KIS_ACCOUNT_CODE", "01")).strip()
if not raw_code:
raw_code = "01"
# 10자리면 앞 8 / 뒤 2 분리
if len(raw_no) >= 10:
self.acc_no = raw_no[:8]
self.acc_code = raw_no[8:10]
else:
self.acc_no = raw_no
self.acc_code = raw_code[:2] if len(raw_code) >= 2 else "01"
if len(self.acc_no) != 8:
logger.warning("⚠️ 계좌번호 CANO 8자리 아님: '%s'(%s자리). DB 확인.", self.acc_no, len(self.acc_no))
if len(self.acc_no) != 8 or len(self.acc_code) != 2:
logger.error(
"❌ 계좌번호 형식 오류: CANO=%s(%s자리), ACNT_PRDT_CD=%s(%s자리) → OPSQ2000 발생. "
"모의면 KIS_ACCOUNT_NO_MOCK/KIS_ACCOUNT_CODE_MOCK, 실전이면 KIS_ACCOUNT_NO/KIS_ACCOUNT_CODE 확인.",
self.acc_no, len(self.acc_no), self.acc_code, len(self.acc_code)
)
else:
logger.info("✅ 한투 계좌 CANO=%s, ACNT_PRDT_CD=%s (모의=%s)", self.acc_no, self.acc_code, use_mock)
self.mock = use_mock
if self.mock is True:
self.base_url = "https://openapivts.koreainvestment.com:29443"
else:
self.base_url = "https://openapi.koreainvestment.com:9443"
self.access_token = None
# 매수 주문 실패 시 사유 저장 (매매불가 종목 당일 제외용)
self._last_order_msg_cd = None
self._last_order_msg1 = None
# 현재가 API 캐시 (레이트리밋·지연 완화용)
# {종목코드: (timestamp, output_dict)}
self._price_cache = {}
logger.info("한투 API 연결: 모의=%s%s", self.mock, self.base_url)
self._auth()
def _auth(self):
"""접근 토큰 발급"""
if not self.app_key or not self.app_secret:
if self.mock:
key_hint = "KIS_APP_KEY_MOCK, KIS_APP_SECRET_MOCK"
else:
key_hint = "KIS_APP_KEY_REAL, KIS_APP_SECRET_REAL (또는 KIS_APP_KEY, KIS_APP_SECRET)"
logger.error("한투 API 키가 없습니다. DB env_config에 설정 필요: %s", key_hint)
raise ValueError("KIS 앱키/시크릿 설정 필요 (모의=%s)" % self.mock)
# ✅ path를 먼저 정의 (발급 성공/실패 양쪽에서 사용)
path = KIS_TOKEN_CACHE_PATH_MOCK if self.mock else KIS_TOKEN_CACHE_PATH_REAL
mode_str = "모의" if self.mock else "실전"
cached = _load_kis_token_cache(self.mock)
if cached:
self.access_token = cached
token_head = (cached[:8] + "") if cached and len(cached) > 8 else "(없음)"
logger.info("한투 토큰 캐시 사용 (%s) | 파일=%s | 토큰앞8=%s", mode_str, path, token_head)
return
# 캐시 없음/만료 → kis_token_manager 경로로만 발급 (잠금·1일1회 준수, SMS 시각 안정화)
try:
from kis_token_manager import ensure_token
if ensure_token(self.mock):
cached = _load_kis_token_cache(self.mock)
if cached:
self.access_token = cached
token_head = (cached[:8] + "") if len(cached) > 8 else "(없음)"
logger.info("한투 토큰 발급 완료 (%s) | 캐시=%s | 토큰앞8=%s", mode_str, path, token_head)
return
except Exception as e:
logger.warning("kis_token_manager 발급 실패, 재시도: %s", e)
raise RuntimeError("한투 토큰 발급 실패 (캐시 없음·만료 시 kis_token_manager.ensure_token 사용)")
def _get_hashkey(self, body):
"""해시키(Hashkey) 발급 - POST 요청 시 body 무결성 검증용"""
try:
url = f"{self.base_url}/uapi/hashkey"
headers = {
"content-type": "application/json",
"appkey": self.app_key,
"appsecret": self.app_secret,
}
r = requests.post(url, headers=headers, json=body, timeout=5)
if r.status_code == 200:
data = r.json()
if data.get("rt_cd") == "0":
return data.get("HASH")
return None
except Exception as e:
logger.debug(f"해시키 발급 실패: {e}")
return None
def _headers(self, tr_id, hashkey=None):
"""
API 호출용 헤더 생성.
KisTokenManager.get_token() 으로 매번 만료 여부 확인:
- 유효하면 메모리에서 즉시 반환 (오버헤드 없음)
- 만료 10분 전이면 선제 갱신 후 새 토큰 사용
→ EGW00123(만료 에러) 없이 자동 교체
"""
try:
from kis_token_manager import KisTokenManager
fresh = KisTokenManager.instance(is_mock=self.mock).get_token()
if fresh:
self.access_token = fresh
except Exception:
pass # 실패 시 기존 access_token 유지
headers = {
"content-type": "application/json; charset=utf-8",
"authorization": f"Bearer {self.access_token}",
"appkey": self.app_key,
"appsecret": self.app_secret,
"tr_id": tr_id,
}
if hashkey:
headers["hashkey"] = hashkey
return headers
def _get(self, path, tr_id, params, max_retries=5, tr_cont=None):
"""
GET 요청. EGW00201(초당 거래건수 초과) 시 점진적 대기 후 재시도 (200/500 공통).
EGW00123(기간이 만료된 token) 시 캐시 삭제 후 새 토큰 발급·1회 재시도.
- 한투 API 제한: 초당 20개 (실제로는 더 엄격, 모의투자는 초당 2~3회 권장)
- 기본 호출 간격: 0.5초 이상 권장
"""
url = f"{self.base_url}{path}"
if tr_cont:
headers_extra = {"tr_cont": tr_cont}
else:
headers_extra = {}
logger.debug(f"[API호출] GET {path} TR_ID={tr_id} params={params} tr_cont={tr_cont}")
time.sleep(0.5)
token_refreshed = False
for attempt in range(max_retries):
try:
headers = self._headers(tr_id)
for k, v in headers_extra.items():
headers[k] = v
r = requests.get(url, headers=headers, params=params, timeout=15)
# HTTP 429 (Too Many Requests)
if r.status_code == 429:
wait_time = 1 + (attempt * 1)
logger.warning(
f"⏳ API 호출 제한 (429) -> {wait_time}초 대기 후 재시도 "
f"({attempt+1}/{max_retries}) path={path}"
)
time.sleep(wait_time)
continue
# 200/500 응답에서 EGW00201(초당 거래건수 초과)이면 대기 후 재시도
if r.status_code in (200, 500):
try:
j = r.json()
except Exception:
j = {}
if r.status_code == 200 and j.get("rt_cd") == "0":
logger.debug(f"[API성공] GET {path} TR_ID={tr_id} status=200 rt_cd=0")
return r
if j.get("msg_cd") == "EGW00201" or "초과" in str(j.get("msg1", "")) or "과부하" in str(j.get("msg1", "")):
wait_time = 1.5 + (attempt * 1.0)
logger.warning(
f"⏳ API 과부하 (EGW00201) GET {path} TR_ID={tr_id} -> {wait_time:.1f}초 대기 후 재시도 "
f"({attempt+1}/{max_retries}) msg1={j.get('msg1', '')}"
)
time.sleep(wait_time)
continue
# EGW00123: 기간이 만료된 token → 캐시 삭제 후 새 토큰 발급, 1회만 재시도
if _is_token_expired_response(j) and not token_refreshed:
token_refreshed = True
_invalidate_kis_token_cache(self.mock)
self._auth()
logger.info("한투 토큰 만료(EGW00123) 감지 → 캐시 삭제 후 재발급, GET 재시도")
time.sleep(0.5)
continue
# HTTP 200이 아니거나 rt_cd != "0"인 경우
try:
body_preview = (r.text or "")[:500]
except Exception:
body_preview = ""
logger.warning(
f"[API실패] GET {path} TR_ID={tr_id} status={r.status_code} "
f"params={params} body={body_preview}"
)
return r
except requests.exceptions.RequestException as e:
if attempt < max_retries - 1:
wait_time = (2 ** attempt) + random.uniform(0.5, 1.5)
logger.warning(f"⚠️ 네트워크 에러 -> {wait_time:.1f}초 대기 후 재시도: {e}")
time.sleep(wait_time)
else:
logger.error(f"❌ GET 요청 실패 ({path}): {e}")
return r
def _post(self, path, tr_id, body, use_hashkey=True, max_retries=3):
"""
POST 요청. EGW00201(초당 거래건수 초과) 시 점진적 대기 후 재시도.
EGW00123(기간이 만료된 token) 시 캐시 삭제 후 새 토큰 발급·1회 재시도.
- 한투 API 제한: 초당 20개 (실제로는 더 엄격)
"""
url = f"{self.base_url}{path}"
body_preview = str(body)[:200] if body else "{}"
logger.debug(f"[API호출] POST {path} TR_ID={tr_id} body={body_preview}...")
time.sleep(0.5)
token_refreshed = False
for attempt in range(max_retries):
try:
hashkey = self._get_hashkey(body) if use_hashkey else None
if use_hashkey and not hashkey:
logger.debug("해시키 발급 실패, 해시키 없이 진행")
r = requests.post(url, headers=self._headers(tr_id, hashkey), json=body, timeout=15)
# HTTP 429 (Too Many Requests)
if r.status_code == 429:
wait_time = 5 + (attempt * 1)
logger.warning(
f"⏳ API 호출 제한 (429) -> {wait_time}초 대기 후 재시도 "
f"({attempt+1}/{max_retries}) path={path}"
)
time.sleep(wait_time)
continue
if r.status_code in (200, 500):
try:
j = r.json()
except Exception:
j = {}
if r.status_code == 200 and j.get("rt_cd") == "0":
logger.debug(f"[API성공] POST {path} TR_ID={tr_id} status=200 rt_cd=0")
return r
if j.get("msg_cd") == "EGW00201" or "초과" in str(j.get("msg1", "")) or "과부하" in str(j.get("msg1", "")):
wait_time = 5 + (attempt * 1)
logger.warning(
f"⏳ API 과부하 (EGW00201) POST {path} TR_ID={tr_id} -> {wait_time}초 대기 후 재시도 "
f"({attempt+1}/{max_retries}) rt_cd={j.get('rt_cd')} msg_cd={j.get('msg_cd')} msg1={j.get('msg1')}"
)
time.sleep(wait_time)
continue
# EGW00123: 기간이 만료된 token → 캐시 삭제 후 새 토큰 발급, 1회만 재시도
if _is_token_expired_response(j) and not token_refreshed:
token_refreshed = True
_invalidate_kis_token_cache(self.mock)
self._auth()
logger.info("한투 토큰 만료(EGW00123) 감지 → 캐시 삭제 후 재발급, POST 재시도")
time.sleep(0.5)
continue
try:
body_preview = (r.text or "")[:500]
except Exception:
body_preview = ""
logger.warning(
f"[API실패] POST {path} TR_ID={tr_id} status={r.status_code} "
f"body={body_preview}"
)
return r
except requests.exceptions.RequestException as e:
if attempt < max_retries - 1:
wait_time = (2 ** attempt) + random.uniform(0.5, 1.5)
logger.warning(f"⚠️ 네트워크 에러 -> {wait_time:.1f}초 대기 후 재시도: {e}")
time.sleep(wait_time)
else:
logger.error(f"❌ POST 요청 실패 ({path}): {e}")
return r
def inquire_price(self, stock_code):
"""
주식 현재가 시세 조회 [v1_국내주식-008] (단건).
output: stck_prpr(현재가) ✅, stck_oprc(시가), stck_hgpr(고가), stck_lwpr(저가) 등 당일 OHLC 포함.
실패 시 오류코드(rt_cd, msg_cd, msg1) 로깅.
"""
# 짧은 TTL 캐시로 동일 종목 반복 호출 시 레이트리밋·지연 완화
cache_ttl = get_env_float("KIS_PRICE_CACHE_TTL_SEC", 1.0)
if cache_ttl > 0:
now_ts = time.time()
cached = self._price_cache.get(stock_code)
if cached:
ts, output = cached
if now_ts - ts <= cache_ttl:
logger.debug(f"[현재가API-캐시히트] code={stock_code} ttl={cache_ttl}s")
return output
path = "/uapi/domestic-stock/v1/quotations/inquire-price"
tr_id = "FHKST01010100"
params = {"FID_COND_MRKT_DIV_CODE": "J", "FID_INPUT_ISCD": stock_code}
logger.debug(f"[현재가API] 호출 code={stock_code} path={path} TR_ID={tr_id}")
r = self._get(path, tr_id, params)
if r.status_code != 200:
try:
body_preview = (r.text or "")[:300]
except Exception:
body_preview = ""
logger.warning(
f"[현재가API] HTTP 실패 code={stock_code} path={path} TR_ID={tr_id} "
f"status={r.status_code} body={body_preview}"
)
return None
try:
j = r.json()
except Exception as e:
logger.warning(
f"[현재가API] JSON 파싱 실패 code={stock_code} path={path} TR_ID={tr_id} exception={e}"
)
return None
if j.get("rt_cd") != "0":
logger.warning(
f"[현재가API] 오류 code={stock_code} path={path} TR_ID={tr_id} "
f"rt_cd={j.get('rt_cd')} msg_cd={j.get('msg_cd')} msg1={j.get('msg1')}"
)
return None
output = j.get("output")
if cache_ttl > 0 and output is not None:
try:
self._price_cache[stock_code] = (time.time(), output)
except Exception:
pass
return output
def inquire_multprice(self, stock_codes: List[str], max_per_call: int = 20):
"""
다중 종목 현재가 조회 [intstock-multprice]
- 한투 API: /uapi/domestic-stock/v1/quotations/intstock-multprice
- 성공 시 {종목코드: output딕셔너리} 반환, 실패 시 None (오류 시 rt_cd/msg_cd/msg1 로깅)
- TR_ID: FHKST01010600
- ⚠️ 배치 응답에는 stck_oprc(시가)/stck_hgpr(고가)/stck_lwpr(저가)가 없을 수 있음(API 스펙).
시가·고가·저가 필요 시 단건 inquire_price() 사용.
"""
if not stock_codes:
return None
codes = list(stock_codes)[: max_per_call * 10]
result = {}
for i in range(0, len(codes), max_per_call):
chunk = codes[i : i + max_per_call]
iscd = ",".join(chunk)
path = "/uapi/domestic-stock/v1/quotations/intstock-multprice"
tr_id = "FHKST01010600"
params = {"FID_COND_MRKT_DIV_CODE": "J", "FID_INPUT_ISCD": iscd}
r = self._get(path, tr_id, params)
if r.status_code != 200:
try:
body_preview = (r.text or "")[:300]
except Exception:
body_preview = ""
logger.warning(
f"[다중시세API] HTTP 실패 status={r.status_code} body={body_preview}"
)
continue
try:
j = r.json()
except Exception as e:
logger.warning(f"[다중시세API] JSON 파싱 실패 exception={e}")
continue
if j.get("rt_cd") != "0":
logger.warning(
f"[다중시세API] 오류 rt_cd={j.get('rt_cd')} "
f"msg_cd={j.get('msg_cd')} msg1={j.get('msg1')}"
)
continue
out = j.get("output")
if out is None:
continue
if isinstance(out, list):
for item in out:
if isinstance(item, dict):
code = (
item.get("stck_shrn_iscd")
or item.get("rsym")
or item.get("FID_INPUT_ISCD")
or item.get("mksc_shrn_iscd")
)
if code:
result[code] = item
elif isinstance(out, dict):
code = (
out.get("stck_shrn_iscd")
or out.get("rsym")
or out.get("FID_INPUT_ISCD")
)
if code:
result[code] = out
time.sleep(random.uniform(0.2, 0.5))
return result if result else None
def inquire_prices_batch(self, stock_codes: List[str]):
"""
다중 종목 현재가 일괄 조회
- intstock-multprice API 우선 시도 후, 실패 시 순차 조회(inquire_price)로 fallback
- 순차 조회 시 종목당 0.3~0.6초 딜레이
- 배치 응답에는 stck_prpr(현재가)는 있으나, stck_oprc/stck_hgpr/stck_lwpr 는 없을 수 있음.
시가·고가·저가 필요 시 배치 대신 종목별 inquire_price() 사용(개미털기 스캔은 이미 단건 사용).
"""
if not stock_codes:
return {}
multi = self.inquire_multprice(stock_codes)
if multi:
return multi
result = {}
for code in stock_codes:
try:
price_data = self.inquire_price(code)
if price_data:
result[code] = price_data
time.sleep(random.uniform(0.3, 0.6))
except Exception as e:
logger.warning(f"종목 조회 실패({code}) exception={e!r}")
continue
return result
def get_account_balance(self):
"""계좌 잔고 조회 [v1_국내주식-010]. 모의/실전에 따라 TR ID 분기 (EGW2004 방지)."""
if self.mock:
tr_id = "VTTC8434R"
else:
tr_id = "TTTC8434R"
params = {
"CANO": self.acc_no,
"ACNT_PRDT_CD": self.acc_code,
"AFHR_FLPR_YN": "N",
"OFL_YN": "",
"INQR_DVSN": "01", # 01: 예수금/잔고 요약 (output2에 예수금) - 블로그·한투 문서 기준
"UNPR_DVSN": "01",
"FUND_STTL_ICLD_YN": "N",
"FNCG_AMT_AUTO_RDPT_YN": "N",
"PRCS_DVSN": "00", # 00: 조회 (블로그 기준)
"CTX_AREA_FK100": "",
"CTX_AREA_NK100": "",
}
try:
logger.info(f"💵 [예수금] 잔고 조회 요청: TR={tr_id}, CANO={self.acc_no}, ACNT_PRDT_CD={self.acc_code}, 모의={self.mock}")
r = self._get(
"/uapi/domestic-stock/v1/trading/inquire-balance",
tr_id,
params,
)
if r.status_code != 200:
logger.warning(
f"💵 [예수금] 잔고 API HTTP 오류: status={r.status_code}, body={getattr(r, 'text', '')[:200]} | "
f"TR={tr_id} (모의={self.mock}), CANO={self.acc_no}, ACNT_PRDT_CD={self.acc_code}. "
f"EGW2004 시 모의면 VTTC8434R/실전이면 TTTC8434R 확인"
)
return None
j = r.json()
if j.get("rt_cd") != "0":
msg1 = (j.get("msg1") or "")[:150]
msg_cd = j.get("msg_cd", "")
logger.error(
f"💵 [예수금] 잔고 API 응답 오류: rt_cd={j.get('rt_cd')}, msg_cd={msg_cd}, msg1={msg1} | "
f"요청 파라미터: TR={tr_id}, CANO={self.acc_no}({len(self.acc_no)}자리), "
f"ACNT_PRDT_CD={self.acc_code}({len(self.acc_code)}자리), 모의={self.mock} | "
f"전체 응답: {j}"
)
if "OPSQ2000" in str(msg_cd) or "INVALID_CHECK_ACNO" in msg1:
logger.error(
"💵 [예수금] OPSQ2000 = 계좌번호 검증 실패. "
f"모의투자 서버({self.base_url})에 계좌번호 CANO={self.acc_no}, ACNT_PRDT_CD={self.acc_code}가 등록되어 있는지 확인. "
f"한투 모의투자 앱/웹에서 계좌번호 확인 필요. DB의 KIS_ACCOUNT_NO_MOCK/KIS_ACCOUNT_CODE_MOCK 값 확인."
)
return None
logger.debug(f"💵 [예수금] 잔고 조회 성공: output2 keys={list(j.get('output2', [{}])[0].keys()) if isinstance(j.get('output2'), list) and j.get('output2') else []}")
return j
except Exception as e:
logger.error(f"💵 [예수금] 잔고 조회 예외: {e} | CANO={self.acc_no}, ACNT_PRDT_CD={self.acc_code}, 모의={self.mock}")
return None
def get_account_evaluation(self):
"""계좌 평가 잔고 조회 [v1_국내주식-011]. 모의=VTTC8494R, 실전=TTTC8494R."""
if self.mock:
tr_id = "VTTC8494R"
else:
tr_id = "TTTC8494R"
try:
logger.info(tr_id)
r = self._get(
"/uapi/domestic-stock/v1/trading/inquire-balance-rlz-pl",
tr_id,
{
"CANO": self.acc_no,
"ACNT_PRDT_CD": self.acc_code,
"AFHR_FLPR_YN": "N",
"OFL_YN": "",
"INQR_DVSN": "01",
"UNPR_DVSN": "01",
"FUND_STTL_ICLD_YN": "N",
"FNCG_AMT_AUTO_RDPT_YN": "N",
"PRCS_DVSN": "01",
"CTX_AREA_FK100": "",
"CTX_AREA_NK100": "",
},
)
if r.status_code != 200:
return None
j = r.json()
if j.get("rt_cd") != "0":
return None
return j
except Exception as e:
logger.error(f"계좌 평가 조회 실패: {e}")
return None
def get_order_history(self, start_date=None, end_date=None):
"""주문 체결 내역 조회 [v1_국내주식-012] (모의=VTTC8001R, 실전=TTTC8001R)"""
try:
if self.mock:
tr_id = "VTTC8001R"
else:
tr_id = "TTTC8001R"
if not start_date:
start_date = dt.now().strftime("%Y%m%d")
if not end_date:
end_date = dt.now().strftime("%Y%m%d")
r = self._get(
"/uapi/domestic-stock/v1/trading/inquire-daily-ccld",
tr_id,
{
"CANO": self.acc_no,
"ACNT_PRDT_CD": self.acc_code,
"INQR_STRT_DT": start_date,
"INQR_END_DT": end_date,
"SLL_BUY_DVSN_CD": "00", # 00:전체
"INQR_DVSN": "00", # 00:역순
"PDNO": "",
"CCLD_DVSN": "00", # 00:전체
"ORD_GNO_BRNO": "",
"ODNO": "",
"INQR_DVSN_3": "00",
"INQR_DVSN_1": "",
"CTX_AREA_FK100": "",
"CTX_AREA_NK100": "",
},
)
if r.status_code != 200:
return None
j = r.json()
if j.get("rt_cd") != "0":
return None
return j
except Exception as e:
logger.error(f"주문 내역 조회 실패: {e}")
return None
def get_execution_by_odno(self, odno, code=None, wait_sec=2):
"""
주문번호(ODNO)로 당일 체결 내역 조회. 시장가 매수 후 실제 체결가/체결수량 확인용.
[v1_국내주식-012] inquire-daily-ccld 응답 output2에서 해당 주문 찾아 체결정보 반환.
Args:
odno: 주문번호 (buy_order 성공 시 반환값)
code: 종목코드 (선택, 일치 행 필터용)
wait_sec: 조회 전 대기 초 (체결 반영 지연 고려, 기본 2초)
Returns:
성공 시 {"filled_qty": int, "avg_price": float}, 미체결/조회실패 시 None
"""
if not odno:
return None
time.sleep(max(0, wait_sec))
try:
j = self.get_order_history()
if not j:
return None
out2 = j.get("output2", [])
if isinstance(out2, dict):
out2 = [out2]
for row in out2:
row_odno = str(row.get("ODNO") or row.get("ord_no") or "").strip()
row_pdno = str(row.get("PDNO") or row.get("pdno") or "").strip()
if row_odno != str(odno).strip():
continue
if code and row_pdno and row_pdno != str(code).strip():
continue
# 총체결수량 / 체결평균가 (한투 문서 필드명 다양하므로 후보 나열)
filled = row.get("tot_ccld_qty") or row.get("TOT_CCLD_QTY") or row.get("ccld_qty") or row.get("ord_qty") or row.get("ORD_QTY")
avg_pr = row.get("avg_prvs") or row.get("AVG_PRVS") or row.get("rjct_avg_prvs") or row.get("RJCT_AVG_PRVS") or row.get("ord_unpr") or row.get("ORD_UNPR")
if filled is not None and avg_pr is not None:
qty = int(float(str(filled).replace(",", "")))
price = float(str(avg_pr).replace(",", ""))
if qty > 0 and price > 0:
logger.debug(f"[체결조회] ODNO={odno} -> 체결수량={qty}, 체결평균가={price:,.0f}")
return {"filled_qty": qty, "avg_price": price}
logger.debug(f"[체결조회] ODNO={odno} 해당 주문 체결 내역 없음 (output2 건수={len(out2)})")
return None
except Exception as e:
logger.warning(f"주문번호 체결 조회 예외 ODNO={odno}: {e}")
return None
def get_volume_surge_stocks(self, market="J", min_volume_rate="50", limit=50):
"""거래량 급증 종목 조회 [v1_국내주식-023]"""
try:
r = self._get(
"/uapi/domestic-stock/v1/quotations/inquire-time-itemchartprice",
"FHKST03010200",
{
"FID_COND_MRKT_DIV_CODE": market,
"FID_INPUT_ISCD": "",
"FID_INPUT_HOUR_1": dt.now().strftime("%Y%m%d"),
"FID_INPUT_HOUR_2": dt.now().strftime("%Y%m%d"),
"FID_PW_DATA_INCU_YN": "Y",
},
)
# 실제로는 거래량 급증 API를 사용해야 하지만, 여기서는 예시로 현재가 조회 활용
# 실제 구현 시: /uapi/domestic-stock/v1/quotations/inquire-time-itemchartprice 사용
return []
except Exception as e:
logger.error(f"거래량 급증 종목 조회 실패: {e}")
return []
def get_top_price_movers(self, market="J", sort_type="1", limit=50):
"""등락률 상위 종목 조회 [v1_국내주식-027]"""
try:
r = self._get(
"/uapi/domestic-stock/v1/quotations/inquire-time-itemchartprice",
"FHKST03010200",
{
"FID_COND_MRKT_DIV_CODE": market,
"FID_INPUT_ISCD": "",
"FID_INPUT_HOUR_1": dt.now().strftime("%Y%m%d"),
"FID_INPUT_HOUR_2": dt.now().strftime("%Y%m%d"),
"FID_PW_DATA_INCU_YN": "Y",
},
)
# 실제 구현 필요
return []
except Exception as e:
logger.error(f"등락률 상위 조회 실패: {e}")
return []
def get_investor_trend(self, stock_code, days=5):
"""외국인/기관 매매 동향 조회"""
path = "/uapi/domestic-stock/v1/quotations/inquire-daily-itemchartprice"
tr_id = "FHKST03010100"
try:
# 일봉 데이터에서 외국인/기관 정보 추출
end_dt = dt.now()
start_dt = end_dt - datetime.timedelta(days=days + 10)
start_ymd = start_dt.strftime("%Y%m%d")
end_ymd = end_dt.strftime("%Y%m%d")
params = {
"FID_COND_MRKT_DIV_CODE": "J",
"FID_INPUT_ISCD": stock_code,
"FID_INPUT_DATE_1": start_ymd,
"FID_INPUT_DATE_2": end_ymd,
"FID_PERIOD_DIV_CODE": "D",
"FID_ORG_ADJ_PRC": "1",
}
logger.debug(f"[투자자동향] 호출 code={stock_code} path={path} TR_ID={tr_id}")
r = self._get(path, tr_id, params)
if r.status_code != 200:
logger.warning(f"[투자자동향] HTTP 실패 code={stock_code} path={path} TR_ID={tr_id} status={r.status_code}")
return None
j = r.json()
if j.get("rt_cd") != "0":
logger.warning(
f"[투자자동향] 오류 code={stock_code} path={path} TR_ID={tr_id} "
f"rt_cd={j.get('rt_cd')} msg_cd={j.get('msg_cd')} msg1={j.get('msg1')}"
)
return None
out2 = j.get("output2", [])
if not out2:
return None
# 최근 N일 외국인/기관 순매수 합계
foreign_sum = 0
org_sum = 0
for item in out2[:days]:
try:
foreign_raw = item.get("frgn_ntby_qty") or item.get("frgn_ntby_shnu") or "0"
foreign_net = int(float(str(foreign_raw).replace(",", "").replace("+", "").replace("-", "")))
if str(foreign_raw).startswith("-"):
foreign_net = -foreign_net
org_raw = item.get("orgn_ntby_qty") or "0"
org_net = int(float(str(org_raw).replace(",", "").replace("+", "").replace("-", "")))
if str(org_raw).startswith("-"):
org_net = -org_net
foreign_sum += foreign_net
org_sum += org_net
except:
continue
return {
"foreign_net_buy": foreign_sum,
"org_net_buy": org_sum,
"total_net_buy": foreign_sum + org_sum,
}
except Exception as e:
logger.error(f"외국인/기관 동향 조회 실패({stock_code}): {e}")
return None
def get_daily_chart(self, code, limit=10):
"""일봉 차트 조회 [v1_국내주식-017] - 거래대금(대/중/소형) 계산용"""
path = "/uapi/domestic-stock/v1/quotations/inquire-daily-itemchartprice"
tr_id = "FHKST03010100"
try:
end_dt = dt.now()
start_dt = end_dt - datetime.timedelta(days=limit + 30)
start_ymd = start_dt.strftime("%Y%m%d")
end_ymd = end_dt.strftime("%Y%m%d")
params = {
"FID_COND_MRKT_DIV_CODE": "J",
"FID_INPUT_ISCD": code,
"FID_INPUT_DATE_1": start_ymd,
"FID_INPUT_DATE_2": end_ymd,
"FID_PERIOD_DIV_CODE": "D",
"FID_ORG_ADJ_PRC": "1",
}
logger.debug(f"[일봉차트] 호출 code={code} path={path} TR_ID={tr_id}")
r = self._get(path, tr_id, params)
if r.status_code != 200:
logger.warning(f"[일봉차트] HTTP 실패 code={code} path={path} TR_ID={tr_id} status={r.status_code}")
return pd.DataFrame()
j = r.json()
if j.get("rt_cd") != "0":
logger.warning(
f"[일봉차트] 오류 code={code} path={path} TR_ID={tr_id} "
f"rt_cd={j.get('rt_cd')} msg_cd={j.get('msg_cd')} msg1={j.get('msg1')}"
)
return pd.DataFrame()
data = j.get("output2", [])
if not data:
return pd.DataFrame()
rows = []
for item in data:
try:
rows.append({
"date": item.get("stck_bsop_date", ""),
"open": abs(float(item.get("stck_oprc", 0))),
"high": abs(float(item.get("stck_hgpr", 0))),
"low": abs(float(item.get("stck_lwpr", 0))),
"close": abs(float(item.get("stck_clpr", 0))),
"volume": int(item.get("acml_vol", 0)),
})
except Exception:
continue
if not rows:
return pd.DataFrame()
df = pd.DataFrame(rows)
df = df.sort_values("date").reset_index(drop=True)
return df.tail(limit)
except Exception as e:
logger.error(f"일봉 조회 실패({code}): {e}")
return pd.DataFrame()
def buy_order(self, code, qty, price=0, order_type="01"):
"""
매수 주문 (모의=VTTC0802U, 실전=TTTC0802U)
Args:
code: 종목코드
qty: 수량
price: 가격 (0이면 시장가)
order_type: 주문구분
- "01": 시장가
- "00": 지정가
- "05": 조건부지정가
- "06": 최유리지정가
- "07": 최우선지정가
- "10": 보통(IOC)
- "13": 시장가(IOC)
- "16": 최유리(IOC)
- "20": 보통(FOK)
- "23": 시장가(FOK)
- "26": 최유리(FOK)
"""
try:
if self.mock:
tr_id = "VTTC0802U"
else:
tr_id = "TTTC0802U"
if price > 0:
ord_unpr = str(price)
else:
ord_unpr = "0"
path = "/uapi/domestic-stock/v1/trading/order-cash"
body = {
"CANO": self.acc_no,
"ACNT_PRDT_CD": self.acc_code,
"PDNO": code,
"ORD_DVSN": order_type,
"ORD_QTY": str(qty),
"ORD_UNPR": ord_unpr,
}
logger.debug(f"[매수주문] 호출 code={code} qty={qty} price={price} path={path} TR_ID={tr_id}")
r = self._post(path, tr_id, body, use_hashkey=True)
if r.status_code != 200:
logger.error(f"[매수주문] HTTP 에러 code={code} path={path} TR_ID={tr_id} status={r.status_code}")
return False
j = r.json()
if j.get("rt_cd") == "0":
self._last_order_msg_cd = None
self._last_order_msg1 = None
ord_no = j.get("output", {}).get("ODNO", "")
logger.info(f"✅ 매수 주문 성공: {code} {qty}주 (주문번호: {ord_no})")
# 체결 확인용으로 주문번호 반환 (실패 시 False)
return ord_no if ord_no else True
else:
# 매매불가 등 실패 시 bot에서 당일 제외용으로 구분할 수 있도록 저장
self._last_order_msg_cd = j.get("msg_cd", "")
self._last_order_msg1 = str(j.get("msg1", "") or "")
logger.error(
f"[매수주문] 실패 code={code} path={path} TR_ID={tr_id} "
f"rt_cd={j.get('rt_cd')} msg_cd={self._last_order_msg_cd} msg1={self._last_order_msg1}"
)
return False
except Exception as e:
self._last_order_msg_cd = None
self._last_order_msg1 = None
logger.error(f"매수 주문 예외({code}): {e}")
return False
def buy_market_order(self, code, qty):
"""
시장가 매수 주문.
- 모의투자: FOK/IOC 미지원이므로 무조건 "01" 일반 시장가.
- 실전: USE_MARKET_IOC=true면 "13" 시장가 IOC, false면 "01" 일반 시장가.
"""
if self.mock:
order_type = "01"
else:
use_ioc = get_env_from_db("USE_MARKET_IOC", "true").strip().lower() in ("true", "1", "yes")
order_type = "13" if use_ioc else "01"
return self.buy_order(code, qty, price=0, order_type=order_type)
def sell_order(self, code, qty, price=0, order_type="01"):
"""
매도 주문 (모의=VTTC0801U, 실전=TTTC0801U)
Args:
code: 종목코드
qty: 수량
price: 가격 (0이면 시장가)
order_type: 주문구분 (buy_order와 동일)
"""
try:
if self.mock:
tr_id = "VTTC0801U"
else:
tr_id = "TTTC0801U"
if price > 0:
ord_unpr = str(price)
else:
ord_unpr = "0"
path = "/uapi/domestic-stock/v1/trading/order-cash"
body = {
"CANO": self.acc_no,
"ACNT_PRDT_CD": self.acc_code,
"PDNO": code,
"ORD_DVSN": order_type,
"ORD_QTY": str(qty),
"ORD_UNPR": ord_unpr,
}
logger.debug(f"[매도주문] 호출 code={code} qty={qty} price={price} path={path} TR_ID={tr_id}")
r = self._post(path, tr_id, body, use_hashkey=True)
if r.status_code != 200:
logger.error(f"[매도주문] HTTP 에러 code={code} path={path} TR_ID={tr_id} status={r.status_code}")
return False
j = r.json()
if j.get("rt_cd") == "0":
self._last_sell_msg_cd = None
self._last_sell_msg1 = None
ord_no = j.get("output", {}).get("ODNO", "")
logger.info(f"✅ 매도 주문 성공: {code} {qty}주 (주문번호: {ord_no})")
return True
else:
# execute_sell 에서 실패 원인(영업일 아님 등) 구분할 수 있도록 저장
self._last_sell_msg_cd = j.get("msg_cd", "")
self._last_sell_msg1 = str(j.get("msg1", "") or "")
logger.error(
f"[매도주문] 실패 code={code} path={path} TR_ID={tr_id} "
f"rt_cd={j.get('rt_cd')} msg_cd={self._last_sell_msg_cd} msg1={self._last_sell_msg1}"
)
return False
except Exception as e:
self._last_sell_msg_cd = None
self._last_sell_msg1 = None
logger.error(f"매도 주문 예외({code}): {e}")
return False
def sell_market_order(self, code, qty):
"""시장가 매도 주문 (간편 메서드)"""
return self.sell_order(code, qty, price=0, order_type="01")
def get_minute_chart(self, code, period="3", limit=100):
"""분봉 차트 조회 [v1_국내주식-017]"""
path = "/uapi/domestic-stock/v1/quotations/inquire-time-itemchartprice"
tr_id = "FHKST03010200"
try:
end_dt = dt.now()
start_dt = end_dt - datetime.timedelta(days=1)
start_ymd = start_dt.strftime("%Y%m%d")
end_ymd = end_dt.strftime("%Y%m%d")
# 분봉 코드: 1분=1, 3분=3, 5분=5, 10분=10, 30분=30, 60분=60
period_map = {"1": "1", "3": "3", "5": "5", "10": "10", "30": "30", "60": "60"}
period_code = period_map.get(str(period), "3")
params = {
"FID_COND_MRKT_DIV_CODE": "J",
"FID_INPUT_ISCD": code,
"FID_INPUT_HOUR_1": start_ymd,
"FID_INPUT_HOUR_2": end_ymd,
"FID_PW_DATA_INCU_YN": "Y",
"FID_ETC_CLS_CODE": "", # 기타분류코드 (필수 파라미터, 빈 값 가능)
}
logger.debug(f"[분봉차트] 호출 code={code} period={period} path={path} TR_ID={tr_id}")
r = self._get(path, tr_id, params)
if r.status_code != 200:
logger.warning(f"[분봉차트] HTTP 실패 code={code} path={path} TR_ID={tr_id} status={r.status_code}")
return pd.DataFrame()
j = r.json()
if j.get("rt_cd") != "0":
logger.warning(
f"[분봉차트] 오류 code={code} path={path} TR_ID={tr_id} "
f"rt_cd={j.get('rt_cd')} msg_cd={j.get('msg_cd')} msg1={j.get('msg1')}"
)
return pd.DataFrame()
data = j.get("output2", [])
if not data:
return pd.DataFrame()
rows = []
for item in data:
try:
# 정렬용: 영업일자+체결시각(있으면) → 마지막 봉이 실제 최신봉이 되도록
date_str = str(item.get("stck_bsop_date", "") or "")
time_str = str(item.get("stck_cntg_hour", "") or item.get("cntg_hour", "") or "000000")
sort_key = date_str + time_str
rows.append({
"time": sort_key,
"open": abs(float(item.get("stck_oprc", 0))),
"high": abs(float(item.get("stck_hgpr", 0))),
"low": abs(float(item.get("stck_lwpr", 0))),
"close": abs(float(item.get("stck_clpr", 0))),
"volume": int(item.get("acml_vol", 0)),
})
except Exception:
continue
if not rows:
return pd.DataFrame()
df = pd.DataFrame(rows)
df = df.sort_values("time").reset_index(drop=True)
# 기술적 지표 추가
# RSI 기간: DB/env의 RSI_PERIOD 로 조절 (기본 14, 단타/스캘핑 시 3·5 권장)
# RSI 수학적 안정화를 위해 호출 측에서 limit≥100 이상 요청하는 것이 전제
rsi_period = get_env_int("RSI_PERIOD", 14)
if len(df) >= rsi_period:
delta = df["close"].diff(1)
gain = delta.where(delta > 0, 0).rolling(window=rsi_period).mean()
loss = (-delta.where(delta < 0, 0)).rolling(window=rsi_period).mean()
rs = gain / loss.replace(0, float("nan"))
df["RSI"] = 100 - (100 / (1 + rs))
if len(df) >= 20:
df["MA20"] = df["close"].rolling(window=20).mean()
# MA5: check_buy_signal_tail_catch 에서 ma5_gap_pct 계산에 사용 (없으면 None으로 처리됨)
if len(df) >= 5:
df["MA5"] = df["close"].rolling(window=5).mean()
return df.tail(limit)
except Exception as e:
logger.error(f"분봉 조회 실패({code}): {e}")
return pd.DataFrame()
def get_orderbook(self, stock_code):
"""호가 조회 [v1_국내주식-009]"""
try:
r = self._get(
"/uapi/domestic-stock/v1/quotations/inquire-asking-price-exp-ccn",
"FHKST01010200",
{
"FID_COND_MRKT_DIV_CODE": "J",
"FID_INPUT_ISCD": stock_code,
},
)
if r.status_code != 200:
return None
j = r.json()
if j.get("rt_cd") != "0":
return None
return j.get("output")
except Exception as e:
logger.error(f"호가 조회 실패({stock_code}): {e}")
return None
# ============================================================
# 순위분석 API (키움 봇과 동일한 로직 + 레버리지/스팩/ETN 제외 옵션)
# ============================================================
@staticmethod
def _is_valid_stock(name: str, code: str) -> bool:
"""
종목 필터링 (키움 kiwoom_trader_ver2와 동일, ETF는 포함)
- 스팩, ETN, 우선주, 레버리지, 인버스 등만 제외 (ETF는 위험도 낮아 포함)
"""
if not code or len(code) != 6 or not code.isdigit():
return False
name = (name or "").strip()
exclude = [
"스팩", "SPAC", "ETN", "W", "ELW", "채권",
"레버리지", "인버스", "곱버스", "선물", "", "",
"2X", "3X", "합성", "H", "B",
]
# ETF는 exclude 목록에 없음 → 일반 주식·ETF 모두 통과
if any(k in name for k in exclude):
return False
if name.endswith("") or name.endswith("우B"):
return False
return True
def _filter_rank_by_valid_stock(self, rank_list: list) -> list:
"""랭크 API 응답 리스트에서 스팩/ETN/레버리지 등 제외 (키움 옵션과 동일)"""
if not rank_list:
return []
filtered = []
for item in rank_list:
code = (item.get("stk_cd") or item.get("mksc_shrn_iscd") or item.get("code") or "").strip()
name = (item.get("stk_nm") or item.get("prst_name") or item.get("hts_kor_isnm") or "").strip()
if self._is_valid_stock(name, code):
filtered.append(item)
return filtered
def _fetch_volume_rank_paged(
self,
market: str,
blng_cls_code: str,
limit: int,
exclude_spec_etn_leverage: bool,
) -> list:
"""
거래량순위 API 1회 조회 (한투 volume-rank API는 다음 페이지 tr_cont 미지원 → 1회만 호출).
"""
path = "/uapi/domestic-stock/v1/quotations/volume-rank"
tr_id = "FHPST01710000"
params = {
"FID_COND_MRKT_DIV_CODE": market,
"FID_COND_SCR_DIV_CODE": "20171",
"FID_INPUT_ISCD": "0000",
"FID_DIV_CLS_CODE": "0",
"FID_BLNG_CLS_CODE": blng_cls_code,
"FID_TRGT_CLS_CODE": "111111111",
"FID_TRGT_EXLS_CLS_CODE": "0000000000",
"FID_INPUT_PRICE_1": "0",
"FID_INPUT_PRICE_2": "0",
"FID_VOL_CNT": "0",
"FID_INPUT_DATE_1": "",
}
try:
time.sleep(0.5)
r = self._get(path, tr_id, params, tr_cont=None)
if r.status_code != 200:
return []
j = r.json()
if j.get("rt_cd") != "0":
return []
output = j.get("output", [])
if exclude_spec_etn_leverage:
output = self._filter_rank_by_valid_stock(output)
logger.info(f" 📡 [순위API] 수신 {len(output)}건 (다음페이지 미지원 → 1회만 호출)")
return output[:limit]
except Exception as e:
logger.debug(f"거래량순위 조회 실패: {e}")
return []
def get_volume_rank(
self,
market: str = "J",
limit: int = 50,
exclude_spec_etn_leverage: bool = True,
):
"""
거래량순위 조회 [v1_국내주식-047] (1회 호출, API가 반환한 건수만큼 수집)
"""
try:
output = self._fetch_volume_rank_paged(
market=market,
blng_cls_code="0",
limit=limit,
exclude_spec_etn_leverage=exclude_spec_etn_leverage,
)
return output
except Exception as e:
logger.debug(f"거래량순위 조회 실패: {e}")
return []
def get_price_change_rank(
self,
market: str = "J",
sort_type: str = "1",
limit: int = 50,
exclude_spec_etn_leverage: bool = True,
):
"""
등락률순위 조회 (동일 volume-rank API, FID_BLNG_CLS_CODE로 등락 구분)
sort_type: "1"=상승률 상위, "2"=하락률 상위(낙폭 큰 종목, N자 망치봉 스캔에 유리)
"""
# 한투 volume-rank API: 4=등락률(상승), 5=등락률(하락). 미지원 시 빈값/에러 가능.
blng = "5" if sort_type == "2" else "4"
try:
out = self._fetch_volume_rank_paged(
market=market,
blng_cls_code=blng,
limit=limit,
exclude_spec_etn_leverage=exclude_spec_etn_leverage,
)
if out:
return out
except Exception as e:
logger.debug(f"등락률순위(blng={blng}) 조회 실패: {e}")
# API가 4/5 미지원이면 기존처럼 거래량순위로 fallback (상승 위주 후보 확보)
if sort_type != "2":
return self.get_volume_rank(market=market, limit=limit, exclude_spec_etn_leverage=exclude_spec_etn_leverage)
return []
def get_price_decline_rank(
self,
market: str = "J",
limit: int = 100,
exclude_spec_etn_leverage: bool = True,
):
"""하락률 순위(낙폭 큰 종목) 조회. N자 망치봉/개미털기 스캔 유니버스 확대용."""
return self.get_price_change_rank(
market=market,
sort_type="2",
limit=limit,
exclude_spec_etn_leverage=exclude_spec_etn_leverage,
)
def get_trading_value_rank(
self,
market: str = "J",
limit: int = 50,
exclude_spec_etn_leverage: bool = True,
):
"""거래대금순위 조회 (연속 조회로 limit건까지). FID_BLNG_CLS_CODE=3"""
try:
return self._fetch_volume_rank_paged(
market=market,
blng_cls_code="3",
limit=limit,
exclude_spec_etn_leverage=exclude_spec_etn_leverage,
)
except Exception as e:
logger.debug(f"거래대금순위 조회 실패: {e}")
return []
def get_turnover_rank(
self,
market: str = "J",
limit: int = 50,
exclude_spec_etn_leverage: bool = True,
):
"""회전율순위 조회 (연속 조회로 limit건까지). FID_BLNG_CLS_CODE=2"""
try:
return self._fetch_volume_rank_paged(
market=market,
blng_cls_code="2",
limit=limit,
exclude_spec_etn_leverage=exclude_spec_etn_leverage,
)
except Exception as e:
logger.debug(f"회전율순위 조회 실패: {e}")
return []
def get_volume_growth_rank(
self,
market: str = "J",
limit: int = 50,
exclude_spec_etn_leverage: bool = True,
):
"""거래증가율순위 조회 (연속 조회로 limit건까지). FID_BLNG_CLS_CODE=1"""
try:
return self._fetch_volume_rank_paged(
market=market,
blng_cls_code="1",
limit=limit,
exclude_spec_etn_leverage=exclude_spec_etn_leverage,
)
except Exception as e:
logger.debug(f"거래증가율순위 조회 실패: {e}")
return []
def get_execution_strength_rank(
self,
market: str = "J",
limit: int = 200,
exclude_spec_etn_leverage: bool = True,
):
"""체결강도 상위 순위 조회 (FHPST01710000, FID_BLNG_CLS_CODE=6). 매수세 강한 종목 필터용."""
try:
return self._fetch_volume_rank_paged(
market=market,
blng_cls_code="6",
limit=limit,
exclude_spec_etn_leverage=exclude_spec_etn_leverage,
)
except Exception as e:
logger.debug(f"체결강도순위 조회 실패: {e}")
return []
def get_execution_strength_map(self, market: str = "J", limit: int = 200):
"""
체결강도 상위 API 조회 후 종목코드 -> 체결강도 값 매핑 반환.
output 필드: cntr_str(체결강도) 등 한투 문서 기준으로 파싱. 미제공 시 0.
"""
strength_map = {}
try:
rows = self.get_execution_strength_rank(market=market, limit=limit, exclude_spec_etn_leverage=True)
for item in (rows or []):
code = (item.get("stk_cd") or item.get("mksc_shrn_iscd") or item.get("code") or "").strip()
if not code or len(code) != 6:
continue
# 한투 volume-rank 체결강도: cntr_str 또는 유사 필드 (문서 확인 후 조정)
raw = item.get("cntr_str") or item.get("exec_str") or item.get("strg_rt") or item.get("prdy_ctrt") or ""
try:
strength = float(str(raw).replace(",", "").strip()) if raw else 0
except (ValueError, TypeError):
strength = 0
strength_map[code] = strength
except Exception as e:
logger.debug(f"체결강도 맵 조회 실패: {e}")
return strength_map
# ============================================================
# Mattermost 봇 클래스
# ============================================================
class MattermostBot:
"""Mattermost 알림 봇"""
def __init__(self):
self.api_url = f"{MM_SERVER_URL.rstrip('/')}/api/v4/posts"
self.headers = {
"Authorization": f"Bearer {MM_BOT_TOKEN}",
"Content-Type": "application/json"
}
self.channels = self._load_channels()
def _load_channels(self):
"""채널 설정 로드"""
try:
if MM_CONFIG_FILE.exists():
with open(MM_CONFIG_FILE, 'r', encoding='utf-8') as f:
return json.load(f).get("channels", {})
return {}
except Exception as e:
logger.error(f"⚠️ MM 설정 로드 실패: {e}")
return {}
def send(self, channel_alias, message):
"""메시지 전송"""
channel_id = self.channels.get(channel_alias)
if not channel_id:
logger.warning(f"'{channel_alias}' 채널 ID 없음")
return False
payload = {"channel_id": channel_id, "message": message}
try:
res = requests.post(self.api_url, headers=self.headers, json=payload, timeout=3)
res.raise_for_status()
return True
except Exception as e:
logger.error(f"❌ MM 전송 에러: {e}")
return False
def _save_ai_recommendations_from_text(db_instance, analysis_text: str):
"""AI 분석문에서 'KEY=값' 추천 줄만 추출해 DB에 저장 (!적용 시 사용)."""
if not analysis_text or not db_instance:
return
valid_keys = set(ENV_CONFIG_KEYS)
lines = []
for line in analysis_text.splitlines():
line = line.strip()
if not line or line.startswith("#"):
continue
m = re.match(r"^([A-Z][A-Z0-9_]*)=(.+)$", line)
if m and m.group(1) in valid_keys:
lines.append(f"{m.group(1)}={m.group(2).strip()}")
if lines:
db_instance.set_last_ai_recommendations("\n".join(lines))
# ============================================================
# 단타 트레이딩 봇
# ============================================================
class ShortTradingBot:
"""단타용 트레이딩 봇 - 개미털기(눌림목) 전략"""
def __init__(self):
self.db = db
self.client = KISClient()
# Mattermost 초기화
self.mm = MattermostBot()
# 단타 봇 전용 채널(alias) 우선 사용, 없으면 기본 채널 사용
self.mm_channel = MM_CHANNEL_SHORT
# ML 예측 초기화 (선택적)
self.ml_predictor = None
if ML_AVAILABLE:
try:
self.ml_predictor = MLPredictor() # MariaDB 내부 연결
if self.ml_predictor.should_retrain():
self.ml_predictor.train_model(retrain=True)
except Exception as e:
logger.warning(f"⚠️ ML 예측 초기화 실패: {e}")
# RiskManager 초기화 (뼈대만 생성 → reload_config에서 DB 값으로 실시간 갱신)
self.risk_mgr = None
if RISK_MANAGER_AVAILABLE:
self.risk_mgr = RiskManager(
risk_pct_per_trade=get_env_float("RISK_PCT_PER_TRADE", 0.01),
max_position_pct=get_env_float("MAX_POSITION_PCT", 0.15),
min_position_amount=get_env_int("MIN_POSITION_AMOUNT", 50000),
use_kelly=get_env_bool("USE_KELLY_FORMULA", True),
kelly_multiplier=get_env_float("KELLY_MULTIPLIER", 0.25),
slot_base_amount_cap=get_env_int("SLOT_BASE_AMOUNT_CAP", 0),
# ── 무조건 깔고 가는 MAX_LOSS 기반 투자 상한 ─────────────
# ATR 계산 결과가 아무리 커도 이 상한 초과 불가
max_loss_per_trade_krw=get_env_int("MAX_LOSS_PER_TRADE_KRW", 200000),
stop_loss_pct=get_env_float("STOP_LOSS_PCT", -0.03),
# ── 사이즈 클래스별 비율 (DB에서 주입) ───────────────────
size_small_ratio=get_env_float("SIZE_CLASS_SMALL_RATIO", 0.70),
size_mid_ratio=get_env_float("SIZE_CLASS_MID_RATIO", 0.85),
)
logger.info("✅ RiskManager 뼈대 생성 완료")
else:
logger.warning("⚠️ RiskManager 미사용: 고정 슬롯 금액 방식으로 폴백")
# ★ [실시간 리로드] DB에서 최신 설정값을 불러와 봇·RiskManager에 반영 (재시작 없이 적용)
self.reload_config()
# 리포트 플래그 (reload_config 이후 유지)
self.morning_report_sent = False
self.closing_report_sent = False
self.final_report_sent = False
self.ai_report_sent = False
# 자산 추적
self.today_date = dt.now().strftime("%Y-%m-%d")
self.start_day_asset = 0
self.current_total_asset = 0
self.current_cash = 0
self.d2_excc_amt = 0 # D+2 예수금 (output2 prvs_rcdl_excc_amt)
self.total_deposit = get_env_float("TOTAL_DEPOSIT", 0)
# DB에서 활성 트레이드 로드 (단타만: SHORT_% - 늘림목과 섞이지 않도록)
self.holdings = {}
# 당일 매매불가로 확인된 종목 (같은 종목 반복 주문 방지 → 다음 후보로 넘어감)
self.untradable_skip_set = set()
# 최근 매도 종목 쿨다운 캐시 {code: 매도_timestamp}
# 매도 직후 같은 종목을 즉시 재매수하는 반복매매 루프 방지.
# 쿨다운 기간은 REENTRY_COOLDOWN_SEC(기본 5분)으로 조정.
self.recently_sold: dict = {}
# 매도 실패 백오프 캐시 {code: until_timestamp}
# "영업일이 아닙니다" 등 일시적 API 거부 시 재시도 방지.
# 재시도 대기 시간은 SELL_FAILURE_BACKOFF_SEC(기본 1800초=30분) 으로 조정.
self._sell_backoff: dict = {}
active_trades = self.db.get_active_trades(strategy_prefix="SHORT")
for code, trade in active_trades.items():
self.holdings[code] = {
"buy_price": trade.get("avg_buy_price", 0),
"qty": trade.get("current_qty", 0),
"buy_time": trade.get("buy_date", dt.now().strftime("%Y-%m-%d %H:%M:%S")),
"name": trade.get("name", code),
}
# ★ 계좌 잔고 API와 동기화 (폰/타 앱으로 산 종목 반영)
balance = self.client.get_account_balance()
if balance is not None:
self._sync_holdings_from_balance(balance)
self._update_assets(balance=balance) # 이미 받은 잔고로 자산 갱신 (API 1회만)
else:
self._update_assets()
# 비동기 태스크 관리
self._universe_task = None
self._report_task = None
self._asset_task = None
self.is_first_run = True
# ── WebSocket + CandleAggregator 초기화 ──────────────────────────────
# 틱 수신 → 3분봉 in-memory 집계 → REST 폴링(get_minute_chart) 전면 대체
# KISWebSocketPriceCache: 실시간 체결가 수신 (check_sell_signals 현재가)
# CandleAggregator : 3분봉 OHLCV·RSI 메모리 집계 (buy/ATR/RiskManager)
# start() 실패 시 is_active=False → REST fallback 자동 적용
self.ws_cache: Optional["KISWebSocketPriceCache"] = None
self.candle_agg: Optional["CandleAggregator"] = None
self._init_websocket()
# ── WebSocket + CandleAggregator 초기화 / 갭보정 / 구독 관리 ───────────
def _init_websocket(self):
"""WebSocket 시작 → CandleAggregator(3분봉) 연결 → 종목 구독 → 갭 보정."""
if not _KIS_WS_AVAILABLE:
logger.info(" kis_ws 모듈 없음 → REST inquire_price / get_minute_chart 폴링 유지")
return
try:
self.ws_cache = KISWebSocketPriceCache(
app_key = self.client.app_key,
app_secret = self.client.app_secret,
is_mock = self.client.mock,
)
# CandleAggregator: 3분봉 집계 (buy 타점·ATR·RiskManager 전용)
# 3분봉(주전략) + 15분봉 + 60분봉 — 추세 필터 / 다른 전략 확장용
self.candle_agg = CandleAggregator(db=self.db, timeframes=[3, 15, 60])
self.ws_cache.attach_candle_aggregator(self.candle_agg)
ws_ok = self.ws_cache.start()
if not ws_ok:
logger.info(" WebSocket 비활성 (모의 or 키 미설정) → REST fallback 유지")
self.ws_cache = None
self.candle_agg = None
return
# 봇 재시작 시 보유 종목 즉시 구독
for code in list(self.holdings.keys()):
self.ws_cache.subscribe(code)
# 유니버스 후보 종목도 미리 구독 (매수 타점 체크 전 봉 데이터 확보)
candidates = self.db.get_target_candidates()
for c in candidates:
code = c.get("code") or c.get("stk_cd", "")
if code and code not in self.holdings:
self.ws_cache.subscribe(code)
# ── 영구 구독 ETF: 시장 방향 필터용 (유니버스 변경과 무관하게 항상 유지) ──
perm_raw = get_env_from_db("PERMANENT_WS_CODES", "069500,229200")
self._permanent_ws_codes: set = {
c.strip() for c in str(perm_raw).split(",") if c.strip()
}
for code in sorted(self._permanent_ws_codes):
self.ws_cache.subscribe(code)
logger.info("📡 [영구구독] %s (시장방향 ETF)", code)
logger.info(
"✅ WebSocket + CandleAggregator(3분봉) 활성 (구독 %d종목) "
"— get_minute_chart REST 폴링 대체",
len(self.ws_cache._subscribed),
)
# 시작 시 REST 갭 보정 (봉 버퍼 비어있는 경우 RSI 안정화)
self._fill_all_gaps()
except Exception as _ws_e:
logger.warning("⚠️ WebSocket 초기화 예외(무시): %s", _ws_e)
self.ws_cache = None
self.candle_agg = None
def _fill_all_gaps(self):
"""
봇 시작·재접속 후 구독 중인 모든 종목의 분봉 갭을 보정.
RSI(14) 안정화를 위해 limit=120 사용.
▶ 키움 우선 전략:
- 키움 ka10080 은 1회 호출에 최대 900봉(≈6개월치) 제공 → 장 초반에도 즉시 봉 확보 가능
- KIS get_minute_chart 는 당일봉만 제공 → 장 시작 직후 봉 부족 → 키움 우선
- 키움 키 없으면 KIS fallback (3분봉만, 15/60분봉은 KIS 지원 안 함)
"""
if not self.candle_agg or not self.ws_cache:
return
limit = get_env_int("SHORT_GAP_FILL_LIMIT", 120)
with self.ws_cache._sub_lock:
codes = set(self.ws_cache._subscribed)
# ── 키움 크레덴셜 조회 ────────────────────────────────────────
kw_key, kw_secret, kw_mock = _get_kiwoom_creds(self.db)
use_kiwoom = bool(kw_key and kw_secret)
logger.info(
"🔧 [갭보정] %d종목 분봉 로드 시작 (tfs=%s, limit=%d, kiwoom=%s)",
len(codes), self.candle_agg.timeframes, limit, "" if use_kiwoom else "❌→KIS fallback",
)
for code in sorted(codes):
for tf in self.candle_agg.timeframes:
df = None
# 키움 우선 (토큰은 23시간 캐시 → au10001 한도 방지)
if use_kiwoom:
try:
df = get_kiwoom_candles_df(
code, tf, kw_key, kw_secret,
is_mock=kw_mock, n=limit,
)
except Exception as e:
logger.debug("키움 갭보정 실패 (%s %dM): %s", code, tf, e)
# KIS fallback: 당일봉만 → 3분봉에만 유효
if (df is None or df.empty) and tf <= 3:
try:
df = self.client.get_minute_chart(
code, period=str(tf), limit=limit
)
except Exception as e:
logger.debug("KIS 갭보정 실패 (%s %dM): %s", code, tf, e)
if df is not None and not df.empty:
self.candle_agg.fill_gap_from_rest(code, tf, df)
# 같은 종목 내 timeframe 전환: 짧은 딜레이
time.sleep(random.uniform(0.2, 0.4))
# 종목 간 딜레이
time.sleep(random.uniform(0.3, 0.6))
def _sync_subscriptions(self, candidates: list):
"""
target_candidates DB 목록과 WS 구독 목록 동기화.
- 유니버스에서 빠진 종목(보유 중 아닌 것) → unsubscribe + RAM 정리
- 신규 종목 → subscribe + 3분봉 갭 보정 (봉 버퍼 즉시 확보).
※ 영구 구독 ETF(_permanent_ws_codes)는 절대 해제하지 않음 (시장 방향 필터용)
"""
if not self.ws_cache:
return
new_codes = {c.get("code") or c.get("stk_cd", "") for c in candidates if c}
new_codes.discard("")
# 현재 보유 종목은 매도 완료 전까지 반드시 유지
new_codes |= set(self.holdings.keys())
# 영구 구독 ETF는 유니버스와 무관하게 항상 유지
new_codes |= getattr(self, '_permanent_ws_codes', set())
with self.ws_cache._sub_lock:
current_subs = set(self.ws_cache._subscribed)
# ── 구독 해제: 유니버스에서 빠진 종목 ─────────────────────────
# 보유 중 종목은 매도 감시를 위해 구독 유지
for code in sorted(current_subs - new_codes):
self.ws_cache.unsubscribe(code)
if self.candle_agg:
self.candle_agg.remove_code(code)
# ── 신규 구독: 유니버스에 새로 들어온 종목 ─────────────────────
kw_key, kw_secret, kw_mock = _get_kiwoom_creds(self.db)
use_kiwoom = bool(kw_key and kw_secret)
for code in sorted(new_codes - current_subs):
self.ws_cache.subscribe(code)
# 신규 구독 즉시 갭 보정 (봉 없으면 매수 타점 체크 불가) — 키움 우선
if not self.candle_agg:
continue
lim = get_env_int("SHORT_GAP_FILL_LIMIT", 120)
for tf in self.candle_agg.timeframes:
df = None
if use_kiwoom:
try:
df = get_kiwoom_candles_df(
code, tf, kw_key, kw_secret,
is_mock=kw_mock, n=lim,
)
except Exception as e:
logger.debug("키움 신규갭보정 실패 (%s %dM): %s", code, tf, e)
if (df is None or df.empty) and tf <= 3:
try:
df = self.client.get_minute_chart(
code, period=str(tf), limit=lim
)
except Exception as e:
logger.debug("KIS 신규갭보정 실패 (%s %dM): %s", code, tf, e)
if df is not None and not df.empty:
self.candle_agg.fill_gap_from_rest(code, tf, df)
# tf 간 딜레이 (차트 API, 토큰은 캐시 재사용)
time.sleep(random.uniform(0.2, 0.4))
def _get_candles_df(self, code: str, tf: int = 3, n: int = 20) -> Optional[pd.DataFrame]:
"""
CandleAggregator 메모리 봉 → DataFrame 변환 헬퍼.
확정봉(confirmed) + 진행봉(current, is_confirmed=0) 을 합쳐
get_minute_chart 와 동일한 컬럼(open/high/low/close/volume/RSI/MA5/MA20)을 반환.
- 진행봉의 close = 현재가 (최신 틱) → 매수 타점 실시간 포착
- CandleAggregator 미사용·데이터 부족 시 None 반환 → 호출부에서 REST fallback
Args:
code : 종목코드
tf : 봉 주기(분), 꼬리잡기 전략은 항상 3
n : 반환할 최대 봉 수 (tail 기준)
"""
if not self.candle_agg:
return None
# 확정봉: RSI_PERIOD(14)보다 넉넉하게 가져와 RSI 안정화
rsi_period = get_env_int("RSI_PERIOD", 14)
fetch_n = max(n + rsi_period + 5, n + 20)
confirmed = self.candle_agg.get_candles(code, tf, fetch_n)
if not confirmed:
return None
rows = list(confirmed)
# 진행 중인 봉(최신 틱 close) 을 tail 에 추가 → 실시간 캔들 패턴 포착
current = self.candle_agg.get_current_candle(code, tf)
if current and current.get("open", 0) > 0 and current.get("close", 0) > 0:
rows.append(current)
if len(rows) < 2:
return None
df = pd.DataFrame(rows)[["open", "high", "low", "close", "volume"]].copy()
df = df.reset_index(drop=True)
# 기술적 지표: get_minute_chart 와 동일 로직
if len(df) >= rsi_period:
delta = df["close"].diff(1)
gain = delta.where(delta > 0, 0).rolling(window=rsi_period).mean()
loss = (-delta.where(delta < 0, 0)).rolling(window=rsi_period).mean()
rs = gain / loss.replace(0, float("nan"))
df["RSI"] = 100 - (100 / (1 + rs))
if len(df) >= 20:
df["MA20"] = df["close"].rolling(window=20).mean()
if len(df) >= 5:
df["MA5"] = df["close"].rolling(window=5).mean()
return df.tail(n).reset_index(drop=True)
# ── 설정 리로드 ─────────────────────────────────────────────────────────
def reload_config(self):
"""[실시간 리로드] DB(env) 설정을 봇에 반영. 메인 루프마다 호출 시 재시작 없이 적용."""
# [손절/익절 설정]
self.stop_loss_pct = get_env_float("STOP_LOSS_PCT", -0.04)
# ★ STOP_LOSS_PCT 부호 안전장치: 양수(0.02)로 입력 시 자동으로 음수(-0.02)로 변환.
# 양수 값이 그대로 사용되면 profit_pct <= stop_loss_pct 조건이 손익분기 이상에서도
# 참이 돼 칼손절이 수익 구간에서도 발동하는 심각한 버그가 발생함.
if self.stop_loss_pct > 0:
logger.warning(
"🚨 STOP_LOSS_PCT=%.4f 양수 감지 → 자동 부호 반전(%.4f). DB에 음수로 저장 권장 (!설정 STOP_LOSS_PCT=-%.4f)",
self.stop_loss_pct, -self.stop_loss_pct, self.stop_loss_pct,
)
self.stop_loss_pct = -self.stop_loss_pct
self.take_profit_pct = get_env_float("TAKE_PROFIT_PCT", 0.05)
self.max_stocks = get_env_int("MAX_STOCKS", 3)
self.min_drop_rate = get_env_float("MIN_DROP_RATE", 0.03)
self.min_recovery_ratio = get_env_float("MIN_RECOVERY_RATIO_SHORT", 0.5)
self.stop_atr_multiplier = get_env_float("STOP_ATR_MULTIPLIER_TAIL", 2.5)
self.target_atr_multiplier = get_env_float("TARGET_ATR_MULTIPLIER_TAIL", 8.0)
self.min_hold_hours = get_env_float("MIN_HOLD_HOURS", 24.0)
# [리스크 관리 설정]
self.risk_pct_per_trade = get_env_float("RISK_PCT_PER_TRADE", 0.01)
self.kelly_multiplier = get_env_float("KELLY_MULTIPLIER", 0.25)
self.max_position_pct = get_env_float("MAX_POSITION_PCT", 0.15)
self.min_position_amount = get_env_int("MIN_POSITION_AMOUNT", 50000)
use_kelly = get_env_bool("USE_KELLY_FORMULA", True)
if self.risk_mgr is not None:
self.risk_mgr.risk_pct = self.risk_pct_per_trade
self.risk_mgr.max_pos_pct = self.max_position_pct
self.risk_mgr.min_amount = self.min_position_amount
self.risk_mgr.use_kelly = use_kelly
self.risk_mgr.kelly_mult = self.kelly_multiplier
# ML 신호 필터링
self.use_ml_signal = get_env_bool("USE_ML_SIGNAL", False)
self.ml_min_probability = get_env_float("ML_MIN_PROBABILITY", 0.57)
# 자산 기준 (리포트용)
self.total_deposit = get_env_float("TOTAL_DEPOSIT", 0)
def _get_effective_slot_cap(self) -> float:
"""
슬롯당 실투입 상한 금액 계산.
- env: SLOT_BASE_AMOUNT_CAP (직접 지정)
- env: MAX_LOSS_PER_TRADE_KRW + STOP_LOSS_PCT 로부터 역산
(MAX_LOSS_PER_TRADE_KRW / |STOP_LOSS_PCT|)
둘 다 설정되어 있으면 더 작은 값 사용.
설정이 없으면 0 리턴(제한 없음).
"""
slot_cap_env = get_env_float("SLOT_BASE_AMOUNT_CAP", 0.0)
max_loss_krw = get_env_float("MAX_LOSS_PER_TRADE_KRW", 0.0)
stop_abs = abs(self.stop_loss_pct) if self.stop_loss_pct != 0 else 0.0
derived_cap = 0.0
if max_loss_krw > 0 and stop_abs > 0:
try:
derived_cap = max_loss_krw / stop_abs
except Exception:
derived_cap = 0.0
candidates = [c for c in (slot_cap_env, derived_cap) if c and c > 0]
if not candidates:
return 0.0
return float(min(candidates))
def _seconds_until_next_5min(self):
"""다음 5분 정각까지 남은 초 계산"""
now = dt.now()
next_min = ((now.minute // 5) + 1) * 5
if next_min >= 60:
next_time = now.replace(hour=now.hour + 1, minute=0, second=0, microsecond=0)
else:
next_time = now.replace(minute=next_min, second=0, microsecond=0)
return (next_time - now).total_seconds()
def update_universe(self):
"""
유니버스 업데이트 (5분마다 호출) - Ver2: 스캔/API 미사용.
후보는 target_candidates 테이블에서만 읽음 (get_target_candidates).
DB는 키움 등 외부에서 채우고, 여기서는 API 호출·DB 쓰기 없음.
"""
logger.info(f"🔄 [유니버스] DB 전용 모드 | 후보는 target_candidates 테이블에서만 조회 (API 미호출)")
# 가져오기는 매매 루프에서 self.db.get_target_candidates() 로 유지
async def _universe_scan_scheduler(self):
"""5분마다 정각에 유니버스 스캔 실행 (비동기 백그라운드)"""
loop = asyncio.get_event_loop()
while True:
try:
if self.is_first_run:
wait_sec = 0 # 첫 실행은 즉시
else:
wait_sec = max(0, self._seconds_until_next_5min())
if wait_sec > 0:
await asyncio.sleep(wait_sec)
now = dt.now()
logger.info(f"🔄 [스캔 주기] 정각 스캔 시작 | 시각:{now.hour:02d}:{now.minute:02d}:{now.second:02d}")
# 동기 함수를 executor에서 실행 (메인 루프 블로킹 방지)
await loop.run_in_executor(None, self.update_universe)
self.is_first_run = False
await asyncio.sleep(5) # 스캔 직후 5초 대기 (과부하 방지)
except asyncio.CancelledError:
break
except Exception as e:
logger.error(f"❌ [스캔 스케줄러] 에러: {e}")
await asyncio.sleep(60)
async def _report_scheduler(self):
"""리포트 전송 스케줄러 (비동기 백그라운드)"""
while True:
try:
await asyncio.sleep(60) # 1분마다 체크
now = dt.now()
# 13:00 - 오전 리포트 + AI 리포트
if now.hour == 13 and now.minute == 0 and not self.morning_report_sent:
loop = asyncio.get_event_loop()
await loop.run_in_executor(None, self.send_morning_report)
await loop.run_in_executor(None, self.send_ai_report)
# 15:15 - 장마감 전 리포트
elif now.hour == 15 and now.minute == 15 and not self.closing_report_sent:
loop = asyncio.get_event_loop()
await loop.run_in_executor(None, self.send_closing_report)
# 15:35 - 최종 리포트
elif now.hour == 15 and now.minute == 35 and not self.final_report_sent:
loop = asyncio.get_event_loop()
await loop.run_in_executor(None, self.send_final_report)
except asyncio.CancelledError:
break
except Exception as e:
logger.error(f"❌ [리포트 스케줄러] 에러: {e}")
await asyncio.sleep(60)
async def _asset_update_scheduler(self):
"""자산 정보 업데이트 스케줄러 (30분마다, 비동기 백그라운드)"""
while True:
try:
await asyncio.sleep(60) # 1분마다 체크
now = dt.now()
# 30분마다 자산 업데이트
if now.minute % 30 == 0:
loop = asyncio.get_event_loop()
await loop.run_in_executor(None, self._update_assets)
await asyncio.sleep(60) # 업데이트 후 1분 대기 (중복 방지)
except asyncio.CancelledError:
break
except Exception as e:
logger.error(f"❌ [자산 업데이트 스케줄러] 에러: {e}")
await asyncio.sleep(60)
def _sync_holdings_from_balance(self, balance):
"""
계좌 잔고 API 응답(output1)으로 self.holdings 동기화.
폰·타 앱으로 매수한 종목이 계좌에 있으면 holdings에 반영하고, 매도한 종목은 제거.
"""
try:
output1 = balance.get("output1") or []
if isinstance(output1, dict):
output1 = [output1]
if not isinstance(output1, list):
return
api_codes = set()
for item in output1:
code = (item.get("pdno") or item.get("PDNO") or "").strip()
if not code:
continue
# 보유수량 (한투: hldg_qty 등)
qty_val = item.get("hldg_qty") or item.get("HLDG_QTY") or item.get("ord_qty") or item.get("ORD_QTY") or 0
qty = int(float(str(qty_val).replace(",", ""))) if qty_val is not None else 0
if qty <= 0:
continue
api_codes.add(code)
# 매입평균가 (한투 pchs_avg_pric). 없거나 0이면 평가금액-평가손익으로 역산
avg_pr_raw = item.get("pchs_avg_pric") or item.get("PCHS_AVG_PRIC")
api_buy_price = 0.0
if avg_pr_raw is not None:
avg_pr_str = str(avg_pr_raw).replace(",", "").strip()
if avg_pr_str not in ("", "0", "0.0"):
try:
api_buy_price = abs(float(avg_pr_str))
except Exception:
api_buy_price = 0.0
if api_buy_price <= 0:
try:
evlu_amt = float(item.get("evlu_amt") or item.get("EVLU_AMT") or 0)
evlu_pfls = float(item.get("evlu_pfls_amt") or item.get("EVLU_PFLS_AMT") or 0)
cost = evlu_amt - evlu_pfls
api_buy_price = cost / qty if qty and cost > 0 else 0.0
except Exception:
api_buy_price = 0.0
if api_buy_price <= 0:
api_buy_price = 0.0
# 매수일시 (API에 있으면 그때로, 없으면 현재)
buy_time_str = item.get("fstc_pchs_dt") or item.get("FSTC_PCHS_DT") or item.get("pchs_dt") or item.get("PCHS_DT") or item.get("ord_dt") or ""
buy_time_str = (buy_time_str or "").strip().replace("-", "").replace(" ", "").replace(":", "")
if len(buy_time_str) >= 14:
try:
buy_time = dt.strptime(buy_time_str[:14], "%Y%m%d%H%M%S").strftime("%Y-%m-%d %H:%M:%S")
except Exception:
buy_time = dt.now().strftime("%Y-%m-%d %H:%M:%S")
elif len(buy_time_str) >= 8:
try:
buy_time = dt.strptime(buy_time_str[:8], "%Y%m%d").strftime("%Y-%m-%d %H:%M:%S")
except Exception:
buy_time = dt.now().strftime("%Y-%m-%d %H:%M:%S")
else:
buy_time = dt.now().strftime("%Y-%m-%d %H:%M:%S")
name = (item.get("prdt_name") or item.get("PRDT_NAME") or item.get("prdt_name_eng") or code or "").strip()
if not name:
name = code
existing = self.holdings.get(code)
# ⇒ buy_price 결정: API > 기존 holdings > DB 순서로 폴백
buy_price = api_buy_price
existing_buy_price = 0.0
if existing:
try:
existing_buy_price = float(existing.get("buy_price", 0) or 0)
except Exception:
existing_buy_price = 0.0
if buy_price <= 0 and existing_buy_price > 0:
buy_price = existing_buy_price
if buy_price <= 0:
db_trade = None
try:
if hasattr(self.db, "get_active_trade"):
db_trade = self.db.get_active_trade(code)
except Exception as e:
logger.debug(f"잔고동기화 DB 평단 조회 실패({code}): {e}")
if db_trade:
try:
db_buy_price = float(
db_trade.get("avg_buy_price")
or db_trade.get("buy_price")
or 0
)
except Exception:
db_buy_price = 0.0
if db_buy_price > 0:
buy_price = db_buy_price
if existing:
if buy_price <= 0:
# 매입가를 신뢰할 수 없으면 기존 매입가를 보존하고 수량/이름만 갱신
logger.warning(
f"⚠️ [잔고동기화] {name} ({code}) 매입가 복원 실패 → 기존 매입가 유지"
)
existing["qty"] = qty
if name:
existing["name"] = name
continue
# 수량/매입가 API·DB 기준으로 갱신 (폰에서 추가 매수/일부 매도 반영)
existing["qty"] = qty
existing["buy_price"] = buy_price
if name:
existing["name"] = name
continue
# API에만 있는 종목(폰 등 외부 매수) → holdings에 추가, 기본 손절/목표가 적용
if buy_price <= 0:
logger.warning(
f"⚠️ [잔고동기화] {name} ({code}) 매입가 0/없음 → holdings 추가 스킵"
)
continue
stop_price = buy_price * (1 + self.stop_loss_pct) if buy_price > 0 else 0
target_price = buy_price * (1 + self.take_profit_pct) if buy_price > 0 else 0
self.holdings[code] = {
"buy_price": buy_price,
"qty": qty,
"buy_time": buy_time,
"name": name,
"max_price": buy_price,
"atr_entry": buy_price * 0.01 if buy_price > 0 else 0,
"stop_price": stop_price,
"target_price": target_price,
}
self.db.upsert_trade({
"code": code,
"name": name,
"strategy": "SHORT_ANT_SHAKING",
"avg_buy_price": buy_price,
"current_price": buy_price,
"target_qty": qty,
"current_qty": qty,
"status": "HOLDING",
"buy_date": buy_time,
"stop_price": stop_price,
"target_price": target_price,
"atr_entry": self.holdings[code]["atr_entry"],
})
logger.info(f"📲 [잔고동기화] 외부 매수 반영: {name} ({code}) {qty}주 @ {buy_price:,.0f}")
# API에 없는 종목 제거 (다른 경로에서 매도된 경우). output1이 리스트일 때만 적용
if isinstance(output1, list):
for code in list(self.holdings.keys()):
if code not in api_codes:
name = self.holdings[code].get("name", code)
del self.holdings[code]
try:
self.db.close_trade(code=code, sell_price=0, sell_reason="잔고동기화(외부매도)", strategy="SHORT_ANT_SHAKING")
except Exception as e:
logger.debug(f"잔고동기화 close_trade 스킵 {code}: {e}")
logger.info(f"📲 [잔고동기화] 보유 제거: {name} ({code}) - 계좌에 없음")
except Exception as e:
logger.warning(f"잔고 동기화 예외: {e}")
def _update_assets(self, balance=None):
"""자산 정보 업데이트 (잔고 동기화 포함). balance 생략 시 API 호출."""
try:
if balance is None:
balance = self.client.get_account_balance()
if balance is None:
logger.warning(
"💵 [예수금] get_account_balance가 None 반환 → 예수금 갱신 스킵 "
"(토큰·계좌·TR ID 확인. 모의=VTTC8434R, 실전=TTTC8434R)"
)
return
# 폰/타 앱 매매 반영: 잔고 API 기준으로 holdings 동기화
self._sync_holdings_from_balance(balance)
# 한투 API: output1=주식 잔고(종목별), output2=예수금 관련(dnca_tot_amt 등) - 블로그·문서 기준
def _parse_amt(v):
if v is None or str(v).strip() == "":
return None
return float(str(v).replace(",", "").strip())
def _cash_block(obj):
if not obj:
return {}
if isinstance(obj, list) and obj:
return obj[0]
if isinstance(obj, dict):
return obj
return {}
out2 = _cash_block(balance.get("output2"))
if isinstance(balance.get("output1"), dict):
out1 = balance.get("output1", {})
else:
out1 = {}
ord_psbl_val = _parse_amt(out2.get("ord_psbl_cash") or out1.get("ord_psbl_cash"))
dnca_tot_val = _parse_amt(out2.get("dnca_tot_amt") or out1.get("dnca_tot_amt")) or 0
# 거래가능 = D+2 예수금 (prvs_rcdl_excc_amt). 없으면 예수금총액 fallback
prvs_rcdl = _parse_amt(out2.get("prvs_rcdl_excc_amt"))
if prvs_rcdl is not None:
self.d2_excc_amt = prvs_rcdl
else:
self.d2_excc_amt = 0
if prvs_rcdl is not None and prvs_rcdl > 0:
self.current_cash = prvs_rcdl
logger.info(f"💵 [예수금] 거래가능=D+2예수금={self.current_cash:,.0f}")
else:
self.current_cash = dnca_tot_val
logger.info(f"💵 [예수금] 거래가능=예수금총액={self.current_cash:,.0f}원 (D+2 없음)")
# 보유 종목 평가액 계산
holdings_value = 0
for code, holding in self.holdings.items():
price_data = self.client.inquire_price(code)
if price_data:
current_price = abs(float(price_data.get("stck_prpr", 0)))
holdings_value += current_price * holding["qty"]
self.current_total_asset = self.current_cash + holdings_value
if self.start_day_asset == 0:
self.start_day_asset = self.current_total_asset
except Exception as e:
logger.error(f"자산 정보 업데이트 실패: {e}")
def _update_account_light(self, profit_val=0):
"""
경량 계좌 갱신 (매수/매도 직후 즉시 호출!)
- API 부하를 줄이기 위해 예수금 + 보유 종목 평가액만 빠르게 계산
- 총자산 = 예수금 + 보유 종목 평가액 (+ 손익 반영)
"""
try:
balance = self.client.get_account_balance()
if balance is None:
logger.warning("💵 [예수금-경량] get_account_balance None → 예수금 갱신 스킵")
return False
def _parse_amt_light(v):
if v is None or str(v).strip() == "":
return None
return float(str(v).replace(",", "").strip())
def _cash_block_light(obj):
if not obj:
return {}
if isinstance(obj, list) and obj:
return obj[0]
if isinstance(obj, dict):
return obj
return {}
out2 = _cash_block_light(balance.get("output2"))
if isinstance(balance.get("output1"), dict):
out1 = balance.get("output1", {})
else:
out1 = {}
ord_psbl_val = _parse_amt_light(out2.get("ord_psbl_cash") or out1.get("ord_psbl_cash"))
dnca_tot_val = _parse_amt_light(out2.get("dnca_tot_amt") or out1.get("dnca_tot_amt")) or 0
prvs_rcdl = _parse_amt_light(out2.get("prvs_rcdl_excc_amt"))
if prvs_rcdl is not None:
self.d2_excc_amt = prvs_rcdl
if prvs_rcdl is not None and prvs_rcdl > 0:
new_cash = prvs_rcdl
else:
new_cash = dnca_tot_val
logger.info(
f"💵 [예수금-경량] 거래가능(D+2)={new_cash:,.0f}원 (이전={self.current_cash:,.0f})"
)
if new_cash > 0 or self.current_cash == 0:
self.current_cash = new_cash
# 보유 종목 평가액: output1=주식 잔고(종목별), output2=예수금 요약 (블로그 기준)
output1_list = balance.get("output1", [])
if isinstance(output1_list, dict):
output1_list = [output1_list]
holdings_value = 0
for code, holding in self.holdings.items():
for item in output1_list:
if (item.get("pdno") or "").strip() == code:
evlu_amt = float(item.get("evlu_amt", 0))
holdings_value += evlu_amt
break
else:
price_data = self.client.inquire_price(code)
if price_data:
current_price = abs(float(price_data.get("stck_prpr", 0)))
holdings_value += current_price * holding["qty"]
self.current_total_asset = self.current_cash + holdings_value
if profit_val != 0:
self.current_total_asset += profit_val
logger.debug(f"💵 [경량갱신] 예수금: {self.current_cash:,.0f}원 | 총자산: {self.current_total_asset:,.0f}")
return True
except Exception as e:
logger.error(f"❌ 경량 갱신 실패: {e}")
return False
def _update_cash_only(self):
"""예수금만 빠르게 업데이트 (하위 호환성용, _update_account_light 사용 권장)"""
return self._update_account_light(profit_val=0)
def send_mm(self, msg):
"""Mattermost 알림 전송. 성공 시 True, 실패 시 False."""
try:
return self.mm.send(self.mm_channel, msg)
except Exception as e:
logger.error(f"❌ MM 전송 에러: {e}")
return False
def check_market_status(self):
"""장 운영 시간 체크"""
# FORCE_MARKET_OPEN 플래그 확인 (테스트용)
force_open = get_env_bool("FORCE_MARKET_OPEN", False)
if force_open:
logger.debug("🔓 FORCE_MARKET_OPEN=true - 장 상태 무시하고 계속 진행")
return True
# 정상 장 운영 시간 체크
now = dt.now()
if not (datetime.time(8, 30) <= now.time() <= datetime.time(16, 0)):
return False
if now.weekday() >= 5: # 주말
return False
return True
def send_morning_report(self):
"""오전 장 뜸할 때 리포트 (13:00)"""
if self.morning_report_sent:
return
self._update_assets()
day_pnl = self.current_total_asset - self.start_day_asset
if self.start_day_asset > 0:
day_pnl_pct = day_pnl / self.start_day_asset * 100
else:
day_pnl_pct = 0
msg = f"""📊 **[오전 장 현황 - 13:00]**
- 당일 시작: {self.start_day_asset:,.0f}
- 현재 자산: {self.current_total_asset:,.0f}
- 당일 손익: {day_pnl:+,.0f}원 ({day_pnl_pct:+.2f}%)
- 보유 종목: {len(self.holdings)}"""
self.send_mm(msg)
self.morning_report_sent = True
logger.info("📊 오전 리포트 전송 완료")
def send_closing_report(self):
"""장마감 전 리포트 (15:15)"""
if self.closing_report_sent:
return
self._update_assets()
day_pnl = self.current_total_asset - self.start_day_asset
if self.start_day_asset > 0:
day_pnl_pct = day_pnl / self.start_day_asset * 100
else:
day_pnl_pct = 0
msg = f"""📈 **[장마감 전 현황 - 15:15]**
- 당일 손익: {day_pnl:+,.0f}원 ({day_pnl_pct:+.2f}%)
- 현재 자산: {self.current_total_asset:,.0f}
- 보유 종목: {len(self.holdings)}
- 예수금(주문가능): {self.current_cash:,.0f}원 | D+2예수금: {self.d2_excc_amt:,.0f}"""
self.send_mm(msg)
self.closing_report_sent = True
logger.info("📈 장마감 전 리포트 전송 완료")
def send_final_report(self):
"""장마감 후 최종 리포트 (15:35)"""
if self.final_report_sent:
return
self._update_assets()
# 당일 손익
day_pnl = self.current_total_asset - self.start_day_asset
if self.start_day_asset > 0:
day_pnl_pct = day_pnl / self.start_day_asset * 100
else:
day_pnl_pct = 0
# 누적 손익
cumulative_pnl = self.current_total_asset - self.total_deposit
if self.total_deposit > 0:
cumulative_pnl_pct = cumulative_pnl / self.total_deposit * 100
else:
cumulative_pnl_pct = 0
# 오늘 거래 내역
today_trades = self.db.get_trades_by_date(self.today_date)
msg = f"""🏁 **[장마감 최종 보고 - 15:35]**
━━━━━━━━━━━━━━━━━━━━
📅 **당일 손익**
- 시작: {self.start_day_asset:,.0f}
- 종료: {self.current_total_asset:,.0f}
- 손익: {day_pnl:+,.0f}원 ({day_pnl_pct:+.2f}%)
💰 **누적 손익 (총 입금액 대비)**
- 총 입금: {self.total_deposit:,.0f}
- 현재 자산: {self.current_total_asset:,.0f}
- 누적 손익: {cumulative_pnl:+,.0f}원 ({cumulative_pnl_pct:+.2f}%)
📊 **거래 현황**
- 오늘 매매: {len(today_trades)}
- 보유 종목: {len(self.holdings)}
- 예수금(주문가능): {self.current_cash:,.0f}원 | D+2예수금: {self.d2_excc_amt:,.0f}
━━━━━━━━━━━━━━━━━━━━"""
self.send_mm(msg)
self.final_report_sent = True
logger.info("🏁 장마감 최종 리포트 전송 완료")
def _get_env_numeric_snapshot(self):
"""DB 최신 env에서 계좌/키/토큰/URL 제외한 수치·설정만 반환 (키=값 줄 단위)."""
EXCLUDE = {
"MM_SERVER_URL", "MM_BOT_TOKEN_", "MATTERMOST_CHANNEL", "GEMINI_API_KEY",
"KIS_APP_KEY_REAL", "KIS_APP_SECRET_REAL", "KIS_APP_KEY_MOCK", "KIS_APP_SECRET_MOCK",
"KIS_ACCOUNT_NO_REAL", "KIS_ACCOUNT_CODE_REAL", "KIS_ACCOUNT_NO_MOCK", "KIS_ACCOUNT_CODE_MOCK",
"KIS_SHORT_MM_CHANNEL", "KIS_LONG_MM_CHANNEL",
}
latest = self.db.get_latest_env()
if not latest or not latest.get("snapshot"):
return ""
snap = latest["snapshot"]
lines = []
for k, v in sorted(snap.items()):
if k in EXCLUDE or v is None:
continue
v = (v or "").strip()
if "#" in v:
v = v.split("#")[0].strip()
if not v:
continue
lines.append(f"{k}={v}")
return "\n".join(lines)
def _get_journalctl_recent(self, lines=200, unit=None):
"""journalctl 최근 N줄. unit 있으면 -u unit 적용 (예: kis_short_ver2.service)."""
cmd = ["journalctl", "-n", str(lines), "-o", "short-iso"]
if unit:
cmd = ["journalctl", "-u", unit, "-n", str(lines), "-o", "short-iso"]
try:
r = subprocess.run(cmd, capture_output=True, text=True, timeout=10)
if r.returncode == 0 and r.stdout:
return r.stdout.strip()
except Exception as e:
logger.warning(f"⚠️ journalctl 조회 실패: {e}")
return ""
def send_ai_report(self):
"""AI 분석 리포트 (13:00) - DB 수치만 보여주고, 거래 내역으로 승률 분석·추천(설정수치=값 형식)."""
if self.ai_report_sent or not gemini_client:
return
try:
# DB에서 수치 설정만 (계좌/키/토큰 제외)
env_lines = self._get_env_numeric_snapshot()
# journalctl 최근 로그 (라인 수는 env/DB에서 로드)
log_lines = get_env_int("AI_JOURNAL_LINES", 500)
try:
log_lines_int = int(log_lines)
except Exception:
log_lines_int = 500
if log_lines_int <= 0:
log_lines_int = 500
journal_unit = os.environ.get("JOURNALCTL_UNIT", "kis_short_ver2.service").strip() or None
journal_log = self._get_journalctl_recent(lines=log_lines_int, unit=journal_unit)
if not journal_log:
journal_log = "(journalctl 로그 없음)"
# 최근 거래 내역 조회
recent_trades = []
try:
conn = self.db.conn
cursor = conn.execute("""
SELECT code, name, buy_price, sell_price, qty, profit_rate,
realized_pnl, strategy, sell_reason, buy_date, sell_date, hold_minutes
FROM trade_history
ORDER BY id DESC
LIMIT 10
""")
for row in cursor.fetchall():
recent_trades.append({
'code': row[0],
'name': row[1],
'buy_price': row[2],
'sell_price': row[3],
'qty': row[4],
'profit_rate': row[5],
'realized_pnl': row[6],
'strategy': row[7],
'sell_reason': row[8],
'buy_date': row[9],
'sell_date': row[10],
'hold_minutes': row[11] or 0
})
except Exception as e:
logger.error(f"거래 내역 조회 실패: {e}")
return
db_candidates = self.db.get_target_candidates()
candidate_count = len(db_candidates)
# 거래 내역 없을 때: 수치만 보여주고 유니버스·추천
if not recent_trades:
summary = f"""📊 **현재 상태**
- 유니버스 후보: {candidate_count}
- 최근 거래: 없음"""
prompt = f"""당신은 퀀트 트레이딩 전문가입니다.
**현재 상태**
- 유니버스 후보: {candidate_count}
- 최근 거래: 없음
**현재 DB 설정 수치 (일부만 표시됨, 계좌/키 등 제외)**
```
{env_lines}
```
**봇 최근 로그 (journalctl 최근 {log_lines_int}줄) 탈락 사유·API 과부하·매수 시도 등 참고**
```
{journal_log[:15000]}
```
**당신의 임무**
1. 위 설정과 후보 수({candidate_count}개)를 보고 문제점 분석 (필터가 너무 까다로운지 등).
2. **추천**: 반드시 아래 형식으로만 한 줄에 하나씩 작성. 이유·주석 붙이지 말 것. 그대로 DB에 복붙해 적용할 수 있어야 함.
- 예: MAX_STOCKS=4
- 예: MIN_DROP_RATE=0.025
- 예: MIN_RECOVERY_RATIO_SHORT=0.4
3. 예상 효과 한두 줄.
**출력 형식 (반드시 준수)**
## 🔍 문제점
1. [구체적 문제 1]
2. [구체적 문제 2]
## 💡 수치 추천 (한 줄에 하나, 그대로 DB 적용 가능하게)
MAX_STOCKS=4
MIN_DROP_RATE=0.025
(필요한 것만, 변수명은 위 설정 목록에 있는 것만 사용)
## 📈 예상 효과
- [효과 1]
"""
response = gemini_client.models.generate_content(model=GEMINI_MODEL_ID, contents=prompt)
analysis = getattr(response, "text", None) or (response.candidates[0].content.parts[0].text if response.candidates else "")
# 마지막 AI 추천문 저장 (!적용 시 매터모스트 원격 조종용)
_save_ai_recommendations_from_text(self.db, analysis)
message = f"""🤖 **[13시 AI 자동 분석 + 수치 추천]**
{summary}
{analysis}
---
💬 단타 전략 최적화. 추천 줄은 그대로 DB에 적용 가능합니다.
"""
self.send_mm(message)
self.ai_report_sent = True
logger.info("🤖 AI 리포트 전송 완료 (거래 내역 없음)")
return
# 통계 계산
total = len(recent_trades)
wins = sum(1 for t in recent_trades if t['profit_rate'] > 0)
losses = total - wins
win_rate = (wins / total * 100) if total > 0 else 0
avg_profit = sum(t['profit_rate'] for t in recent_trades) / total
total_pnl = sum(t['realized_pnl'] for t in recent_trades)
avg_hold = sum(t['hold_minutes'] for t in recent_trades) / total
trades_text = ""
for i, t in enumerate(recent_trades, 1):
trades_text += f"""
[거래 {i}] {t['name']} ({t['strategy']})
- 매수: {t['buy_price']:,.0f}× {t['qty']}주 | 매도: {t['sell_price']:,.0f}
- 손익: {t['profit_rate']:+.2f}% ({t['realized_pnl']:,.0f}원) | 보유: {t['hold_minutes']}
- 사유: {t['sell_reason']}
"""
prompt = f"""당신은 퀀트 트레이딩 전문가입니다.
**현재 상태**
- 유니버스 후보: {candidate_count}
- 최근 거래: {total}건 | 승률: {win_rate:.1f}% ({wins}{losses}패)
- 평균 수익률: {avg_profit:.2f}% | 총 손익: {total_pnl:,.0f}원 | 평균 보유: {avg_hold:.0f}
**최근 거래 내역**
{trades_text}
**현재 DB 설정 수치 (계좌/키 등 제외, 수치만)**
```
{env_lines}
```
**봇 최근 로그 (journalctl 최근 {log_lines_int}줄) 탈락 사유·API 과부하·매수 시도 등 참고**
```
{journal_log[:15000]}
```
**당신의 임무**
1. **승률이 왜 떨어졌는지** 거래 내역과 설정 수치를 보고 구체적으로 3가지 진단 (손절/진입/보유 등).
2. **추천**: 반드시 "설정수치=값" 한 줄에 하나만. 이유·주석 붙이지 말 것. 그대로 DB에 복붙해 적용할 수 있어야 함.
- 예: MAX_STOCKS=4
- 예: MIN_DROP_RATE=0.025
- 예: MIN_RECOVERY_RATIO_SHORT=0.4
변수명은 위 설정 목록에 있는 것만 사용.
3. 예상 효과 한두 줄.
**출력 형식 (반드시 준수)**
## 🔍 문제점 (승률 하락 원인)
1. [구체적 문제 1]
2. [구체적 문제 2]
3. [구체적 문제 3]
## 💡 수치 추천 (한 줄에 하나, 그대로 DB 적용)
MAX_STOCKS=4
MIN_DROP_RATE=0.025
(필요한 것만)
## 📈 예상 효과
- [효과 1]
"""
response = gemini_client.models.generate_content(model=GEMINI_MODEL_ID, contents=prompt)
analysis = getattr(response, "text", None) or (response.candidates[0].content.parts[0].text if response.candidates else "")
# 마지막 AI 추천문 저장 (!적용 시 매터모스트 원격 조종용)
_save_ai_recommendations_from_text(self.db, analysis)
summary = f"""📊 **현재 상태**
- 유니버스 후보: {candidate_count}
- 최근 거래: {total}건 | 승률: {win_rate:.1f}% ({wins}{losses}패)
- 평균 수익률: {avg_profit:.2f}% | 총 손익: {total_pnl:+,.0f}원 | 평균 보유: {avg_hold:.0f}"""
message = f"""🤖 **[13시 AI 자동 분석 + 수치 추천]**
{summary}
{analysis}
---
💬 단타 전략 최적화. 추천 줄은 그대로 DB에 적용 가능합니다.
"""
self.send_mm(message)
self.ai_report_sent = True
logger.info("🤖 AI 리포트 전송 완료")
except Exception as e:
logger.error(f"AI 리포트 생성 실패: {e}")
def _fetch_scan_universe_from_api(self, max_codes=500):
"""
KIS API로 스캔 대상 종목 리스트 조회 (6소스 각 100건 → 최대 500개).
- 거래량·거래대금·회전율·등락률(상승)·등락률(하락)·거래증가율 각 100건 합산 후 중복 제거.
- 리스트는 DB에 저장하지 않음. 스캔 끝난 뒤 후보만 DB에 한 번에 인서트.
Returns:
list[dict]: [{"code": "006자리", "name": "종목명"}, ...] (중복 제거, 최대 max_codes개)
"""
def _code_from_item(item):
code = (item.get("stk_cd") or item.get("mksc_shrn_iscd") or item.get("code") or "").strip()
return code if code and len(code) == 6 else None
def _name_from_item(item):
return (
(item.get("stk_nm") or item.get("prst_name") or item.get("hts_kor_isnm") or "").strip()
or ""
)
scan_list = []
seen = set()
# 1) 거래량순위 100개 (키움은 거래대금+회전율만 사용, KIS는 거래량+거래대금+회전율 3가지로 풀 확대)
try:
time.sleep(random.uniform(0.5, 1.0))
vol_list = self.client.get_volume_rank(market="J", limit=100)
for item in (vol_list or []):
c = _code_from_item(item)
if c and c not in seen:
seen.add(c)
scan_list.append({"code": c, "name": _name_from_item(item) or ""})
logger.info(f" 📡 [스캔유니버스] 거래량순위 API → {len(vol_list)}건 수신, 누적 {len(scan_list)}종목")
except Exception as e:
logger.warning(f" ⚠️ [스캔유니버스] 거래량순위 조회 실패: {e}")
# 2) 거래대금순위 100개 (키움과 동일 소스)
try:
time.sleep(random.uniform(0.5, 1.0))
val_list = self.client.get_trading_value_rank(market="J", limit=100)
for item in (val_list or []):
c = _code_from_item(item)
if c and c not in seen:
seen.add(c)
scan_list.append({"code": c, "name": _name_from_item(item) or ""})
logger.info(f" 📡 [스캔유니버스] 거래대금순위 API → {len(val_list)}건 수신, 누적 {len(scan_list)}종목")
except Exception as e:
logger.warning(f" ⚠️ [스캔유니버스] 거래대금순위 조회 실패: {e}")
# 3) 회전율순위 100개 (키움 개미털기 2번째 소스와 동일)
try:
time.sleep(random.uniform(0.5, 1.0))
turn_list = self.client.get_turnover_rank(market="J", limit=100)
for item in (turn_list or []):
c = _code_from_item(item)
if c and c not in seen:
seen.add(c)
scan_list.append({"code": c, "name": _name_from_item(item) or ""})
logger.info(f" 📡 [스캔유니버스] 회전율순위 API → {len(turn_list)}건 수신, 누적 {len(scan_list)}종목")
except Exception as e:
logger.warning(f" ⚠️ [스캔유니버스] 회전율순위 조회 실패: {e}")
# 4) 등락률순위(상승) 100개 (기존 후보군 풀 확대)
try:
time.sleep(random.uniform(0.5, 1.0))
chg_list = self.client.get_price_change_rank(market="J", sort_type="1", limit=100)
for item in (chg_list or []):
c = _code_from_item(item)
if c and c not in seen:
seen.add(c)
scan_list.append({"code": c, "name": _name_from_item(item) or ""})
logger.info(f" 📡 [스캔유니버스] 등락률순위(상승) API → {len(chg_list)}건 수신, 누적 {len(scan_list)}종목")
except Exception as e:
logger.warning(f" ⚠️ [스캔유니버스] 등락률순위(상승) 조회 실패: {e}")
# 4-2) 등락률순위(하락) 100개 — 낙폭 큰 종목 직접 조회 → N자 망치봉 스캔 효율·Pass-낙폭 포착
try:
time.sleep(random.uniform(0.5, 1.0))
decline_list = self.client.get_price_decline_rank(market="J", limit=100)
for item in (decline_list or []):
c = _code_from_item(item)
if c and c not in seen:
seen.add(c)
scan_list.append({"code": c, "name": _name_from_item(item) or ""})
logger.info(f" 📡 [스캔유니버스] 등락률순위(하락) API → {len(decline_list)}건 수신, 누적 {len(scan_list)}종목")
except Exception as e:
logger.warning(f" ⚠️ [스캔유니버스] 등락률순위(하락) 조회 실패: {e}")
# 5) 거래증가율순위 100개 (거래량/대금/회전율과 다른 풀 → 후보 다양화)
try:
time.sleep(random.uniform(0.5, 1.0))
growth_list = self.client.get_volume_growth_rank(market="J", limit=100)
for item in (growth_list or []):
c = _code_from_item(item)
if c and c not in seen:
seen.add(c)
scan_list.append({"code": c, "name": _name_from_item(item) or ""})
logger.info(f" 📡 [스캔유니버스] 거래증가율순위 API → {len(growth_list)}건 수신, 누적 {len(scan_list)}종목 (6소스 합산 → 개미털기 필터)")
except Exception as e:
logger.warning(f" ⚠️ [스캔유니버스] 거래증가율순위 조회 실패: {e}")
if not scan_list:
logger.warning(" ⚠️ [스캔유니버스] API에서 0건 수신 → 스캔 불가 (권한/계정/시간 확인)")
return []
scan_list = scan_list[:max_codes]
# 종목명 비어 있으면 시세 배치로 채우기 (KIS volume-rank는 종목명 미제공 시 많음)
need_name_codes = [x["code"] for x in scan_list[:20] if not (x.get("name") or "").strip()]
if need_name_codes:
try:
time.sleep(random.uniform(0.2, 0.4))
batch = self.client.inquire_prices_batch(need_name_codes[:20])
name_map = {}
_market_names = {"KOSPI", "KOSDAQ", "ETF", "KOSPI200", "KSQ150"}
for code, out in (batch or {}).items():
n = (out.get("stck_kor_isnm") or out.get("rprs_mrkt_kor_name") or "").strip()
if n and n not in _market_names:
name_map[code] = n
for x in scan_list:
if not (x.get("name") or "").strip() and x["code"] in name_map:
x["name"] = name_map[x["code"]]
except Exception as e:
logger.debug(f" 스캔 종목명 배치 조회 스킵: {e}")
for x in scan_list:
if not (x.get("name") or "").strip():
x["name"] = x["code"]
logger.info(
f" 📋 스캔 대상: {len(scan_list)}개 종목 (거래량·거래대금·회전율·등락률상승·등락률하락·거래증가율 각 100건 합산 → 개미털기 필터)"
)
if scan_list:
part = ", ".join(f"{x['code']} {x.get('name') or x['code']}" for x in scan_list[:15])
logger.info(f" 📋 스캔 대상(일부): {part}{' ...' if len(scan_list) > 15 else ''}")
return scan_list
def scan_ant_shaking_candidates(self, max_candidates=20):
"""
개미털기(눌림목) 후보 종목 스캔 - kiwoom_trader_dual 방식: 유니버스 리스트만 빠르게 채움.
- 스캔에서는 종목별 API 호출·낙폭/회복 필터·탈락 로그 없음.
- 낙폭/회복/3분봉/RSI/피뢰침/ML 전부 메인 루프의 check_buy_signal_tail_catch에서만 체크.
"""
logger.info("🐜 [개미털기] 스캔 시작 (유니버스 리스트만 등록 → 매수 시 한곳에서 전 조건 체크)")
top_n_light = get_env_int("CANDIDATE_LIST_TOP_N_LIGHT", 20)
max_codes = get_env_int("SCAN_UNIVERSE_MAX_CODES", 150)
scan_list = self._fetch_scan_universe_from_api(max_codes=max_codes)
if not scan_list:
logger.warning(" ⚠️ [개미털기] 스캔 대상 0개 → API에서 리스트를 받지 못함")
return []
n_save = min(top_n_light, len(scan_list))
result = []
for i, x in enumerate(scan_list[:n_save]):
name = (x.get("name") or x["code"]).strip() or x["code"]
# DB 저장은 update_universe()에서 강도 계산·정렬 후 강도순 상위 N명만 함 (여기서는 리스트만 반환)
result.append({
"code": x["code"],
"name": name,
"price": 0,
"score": 0,
"drop_rate": 0,
"recovery": 0,
})
part = ", ".join(f"{x.get('name') or x['code']}({x['code']})" for x in scan_list[:5])
logger.info(f" ✅ [개미털기] 후보 {n_save}명 수집 (강도·정렬·DB 반영은 유니버스 업데이트에서 일괄 처리) | 일부: {part}{' ...' if n_save > 5 else ''}")
return result
def calculate_atr(self, df, period=14):
"""
ATR (Average True Range) 계산 - 변동성 지표
- TR(True Range) = max(고가-저가, |고가-전일종가|, |저가-전일종가|)
- ATR = TR의 14일 이동평균
"""
try:
if df is None or len(df) < period:
return 0
df = df.copy()
# True Range 계산
high_low = df['high'] - df['low']
high_close = (df['high'] - df['close'].shift()).abs()
low_close = (df['low'] - df['close'].shift()).abs()
df['tr'] = pd.concat([high_low, high_close, low_close], axis=1).max(axis=1)
# ATR = TR의 14일 이동평균
atr = df['tr'].rolling(window=period).mean().iloc[-1]
return float(atr) if not pd.isna(atr) and atr > 0 else 0
except Exception as e:
logger.debug(f"ATR 계산 실패: {e}")
return 0
def check_buy_signal_tail_catch(self, code: str, name: str):
"""
[3분봉 망치봉 무릎 타점] N자 차트에서 밑꼬리 망치봉이 나온 뒤 무릎~어깨 구간에서만 매수.
- 피뢰침(고점 추격·급등주)은 여기서만 적용 (스캔에서는 후보만 넓게 수집).
- 수치는 전부 get_env_float / get_env_int 등 env·DB 사용.
"""
try:
# [테스트용] DB에서 FORCE_BUY_TEST=true 시 조건 무시하고 현재가로 시그널 반환 (매수 체결 테스트 후 반드시 false로 복구)
if get_env_bool("FORCE_BUY_TEST", False):
try:
price_data = self.client.inquire_price(code)
current_price = abs(float(str(price_data.get("stck_prpr", 0)).replace(",", ""))) if price_data else 0
if current_price <= 0:
return None
logger.info(f"{LOG_CYAN}🧪 [테스트] FORCE_BUY_TEST=true → 조건 건너뛰고 시그널 반환 {name} {code} @ {current_price:,.0f}{LOG_RESET}")
return {
"code": code,
"name": name,
"price": current_price,
"score": 5.0,
"entry_features": {},
}
except Exception as e:
logger.warning(f"🧪 FORCE_BUY_TEST 현재가 조회 실패: {e}")
return None
# ── 시장 방향 필터 (USE_MARKET_REGIME_FILTER=true 시 활성) ──
# KODEX200/KOSDAQ150 60분봉 RSI로 상승장 확인 → 하락장이면 롱 진입 차단
if get_env_bool("USE_MARKET_REGIME_FILTER", False):
min_rsi = get_env_float("MARKET_REGIME_MIN_RSI", 48.0)
regime = self.db.get_market_regime(tf=60)
if not regime.get("is_bull") or regime.get("avg_rsi", 50) < min_rsi:
logger.debug(
"[시장필터] %s %s: 하락장 차단 (ETF RSI=%.1f < %.1f)",
code, name, regime.get("avg_rsi", 0), min_rsi,
)
return None
# ── 테마 과열 필터 (USE_THEME_HEAT_FILTER=true 시 활성) ──
if get_env_bool("USE_THEME_HEAT_FILTER", False):
heat_max = get_env_float("THEME_HEAT_RSI_MAX", 72.0)
meta = self.db.get_stock_meta(code)
if meta and meta.get("theme"):
momentum = self.db.get_theme_momentum(meta["theme"], tf=60)
if momentum.get("count", 0) >= 3 and momentum.get("avg_rsi3", 0) > heat_max:
logger.debug(
"[테마필터] %s %s: 테마(%s) 과열 차단 (RSI=%.1f > %.1f)",
code, name, meta["theme"],
momentum["avg_rsi3"], heat_max,
)
return None
min_candle_len = get_env_int("MIN_CANDLE_LEN_TAIL", 14)
min_price_tail = get_env_float("MIN_PRICE_TAIL", 1000.0)
# [WebSocket 우선] 메모리 봉 사용 → WS 미활성 시 REST fallback
df = self._get_candles_df(code, tf=3, n=20)
if df is None or df.empty:
logger.debug("📡 [%s] WS봉 없음 → REST 3분봉 fallback", code)
df = self.client.get_minute_chart(code, period="3", limit=20)
if df is None or df.empty or len(df) < min_candle_len:
logger.info(f"{LOG_YELLOW}🔍 [탈락-3분봉] {name} {code}: 봉수 부족 (len={len(df) if df is not None and not df.empty else 0}, 기준 {min_candle_len}){LOG_RESET}")
return None
if "close" not in df.columns or "high" not in df.columns or "low" not in df.columns or "open" not in df.columns:
logger.info(f"{LOG_YELLOW}🔍 [탈락-3분봉] {name} {code}: OHLC 컬럼 없음{LOG_RESET}")
return None
current_price = float(df["close"].iloc[-1])
candle = df.iloc[-1]
candle_open = float(candle["open"])
candle_high = float(candle["high"])
candle_low = float(candle["low"])
candle_close = float(candle["close"])
# 분봉 마지막 봉 close=0인 경우 (장 마감/미체결 봉 등) → WS 캐시 우선 보정, fallback=REST
if current_price <= 0 or candle_close <= 0:
try:
ws_price = self.ws_cache.get_price(code) if self.ws_cache else None
if ws_price:
p = abs(float(str(ws_price.get("stck_prpr", 0)).replace(",", "")))
if p > 0:
current_price = p
candle_close = p
df = df.copy()
df.loc[df.index[-1], "close"] = p
else:
price_data = self.client.inquire_price(code)
if price_data:
current_price = abs(float(str(price_data.get("stck_prpr", 0)).replace(",", "")))
if current_price > 0:
candle_close = current_price
df = df.copy()
df.loc[df.index[-1], "close"] = current_price
except Exception as e:
logger.debug(f"현재가 보정 실패({code}): {e}")
if candle_open <= 0 and len(df) >= 2:
candle_open = float(df["open"].iloc[-2])
if candle_open <= 0:
candle_open = float(df["open"].iloc[0])
if candle_open <= 0 or current_price < min_price_tail:
logger.info(f"{LOG_YELLOW}🔍 [탈락-가격] {name} {code}: 시가/현재가 부적절 (현재 {current_price:,.0f}원, 최소 {min_price_tail:,.0f}){LOG_RESET}")
return None
# 당일 시가·고가·저가 확보 우선순위:
# 1순위: WebSocket 캐시 (H0STCNT0 틱에 OPRC/HGPR/LWPR 포함 → REST 호출 없음)
# 2순위: 분봉 DataFrame (캐시 없을 때)
# 3순위: REST inquire_price (분봉만으로 부족한 경우 최후 fallback)
day_open = float(df["open"].iloc[0])
day_high = float(df["high"].max())
# 저가가 0인 분봉(비정상 값) 때문에 낙폭이 100%로 계산되는 것을 방지
try:
lows = df["low"]
valid_lows = lows[lows > 0]
day_low = float(valid_lows.min()) if not valid_lows.empty else float(lows.min())
except Exception:
day_low = float(df["low"].min())
try:
# 1순위: WS 캐시에 당일 시고저 있으면 REST 호출 생략 (API 과부하 방지)
ws_data = self.ws_cache.get_price(code) if self.ws_cache else None
if ws_data:
ws_open = abs(float(str(ws_data.get("stck_oprc", 0)).replace(",", "")))
ws_high = abs(float(str(ws_data.get("stck_hgpr", 0)).replace(",", "")))
ws_low = abs(float(str(ws_data.get("stck_lwpr", 0)).replace(",", "")))
if ws_open > 0: day_open = ws_open
if ws_high > 0: day_high = ws_high
if ws_low > 0: day_low = ws_low
else:
# 2순위 fallback: REST inquire_price (WS 캐시 없거나 만료된 경우에만)
today_price_data = self.client.inquire_price(code)
if today_price_data:
api_open = abs(float(str(today_price_data.get("stck_oprc", 0)).replace(",", "")))
api_high = abs(float(str(today_price_data.get("stck_hgpr", 0)).replace(",", "")))
api_low = abs(float(str(today_price_data.get("stck_lwpr", 0)).replace(",", "")))
if api_open > 0: day_open = api_open
if api_high > 0: day_high = api_high
if api_low > 0: day_low = api_low
except Exception as e:
logger.debug(f"일일 시고저 보정 실패({code}): {e}")
if day_open > 0 and day_low > 0:
drop_rate = (day_open - day_low) / day_open
else:
drop_rate = 0
day_range = day_high - day_low
recovery_pos_day = (current_price - day_low) / day_range if day_range > 0 else 0
if drop_rate < self.min_drop_rate:
logger.info(
f"{LOG_YELLOW}🔍 [탈락-낙폭] {name} {code}: 낙폭 {drop_rate*100:.2f}% < {self.min_drop_rate*100:.1f}% "
f"(당일 시가 {day_open:,.0f}원 → 저점 {day_low:,.0f}원){LOG_RESET}"
)
return None
if recovery_pos_day < self.min_recovery_ratio:
logger.info(
f"{LOG_YELLOW}🔍 [탈락-회복률] {name} {code}: 회복률 {recovery_pos_day*100:.1f}% < {self.min_recovery_ratio*100:.0f}% "
f"(저점 {day_low:,.0f}원 → 현재 {current_price:,.0f}원 / 범위 {day_range:,.0f}원){LOG_RESET}"
)
return None
logger.info(f"{LOG_GREEN}🔍 [통과-낙폭·회복] {name} {code} → 꼬리/피뢰침/RSI 체크{LOG_RESET}")
# [핵심] 망치봉(밑꼬리) 계산: 꼬리 길이 / 몸통 길이
# 장 외에는 마지막 봉이 전일 마감봉(open=high=low=close)이라 꼬리 0 → 역순으로 밑꼬리 있는 최근 봉 사용
tail_lookback = get_env_int("TAIL_CANDLE_LOOKBACK", 5)
body_top = max(candle_open, candle_close)
body_bottom = min(candle_open, candle_close)
body_length = body_top - body_bottom
tail_length = body_bottom - candle_low
if tail_length < 0:
tail_length = 0.0
if body_length <= 0:
body_length = 1.0
tail_ratio = tail_length / body_length
tail_pct = (tail_length / candle_low) if (candle_low > 0 and tail_length > 0) else 0
# 마지막 봉에 밑꼬리가 없으면(장외/도지) 최근 N개 봉 중 밑꼬리 있는 봉으로 재계산
if tail_length <= 0 and len(df) >= 2:
for idx in range(len(df) - 2, max(-1, len(df) - 1 - tail_lookback), -1):
c = df.iloc[idx]
o, h, l, cl = float(c["open"]), float(c["high"]), float(c["low"]), float(c["close"])
if l <= 0:
continue
bt = max(o, cl)
bb = min(o, cl)
bl = bt - bb
tl = bb - l
if tl <= 0:
continue
if bl <= 0:
bl = 1.0
tail_ratio = tl / bl
tail_pct = (tl / l) if l > 0 else 0
tail_length = tl
break
tail_ratio_min = get_env_float("TAIL_RATIO_MIN", 1.5)
tail_pct_min = get_env_float("TAIL_PCT_MIN", 0.003)
if tail_ratio < tail_ratio_min or tail_pct < tail_pct_min:
logger.info(f"{LOG_YELLOW}🔍 [탈락-꼬리] {name} {code}: 꼬리비율 {tail_ratio:.2f} (기준 {tail_ratio_min}) 또는 꼬리% {tail_pct*100:.2f}% (기준 {tail_pct_min*100:.2f}%){LOG_RESET}")
return None
# [무릎 타점] 그 3분 봉 안에서의 회복률 (무릎~어깨 직전만) — 항상 마지막 봉 기준
total_range = candle_high - candle_low
recovery_pos = (current_price - candle_low) / total_range if total_range > 0 else 0
max_rec_3m = get_env_float("MAX_RECOVERY_RATIO_3M", 0.8)
if not (self.min_recovery_ratio <= recovery_pos <= max_rec_3m):
logger.info(f"{LOG_YELLOW}🔍 [탈락-회복3분] {name} {code}: 3분봉 회복률 {recovery_pos*100:.1f}% (기준 {self.min_recovery_ratio*100:.0f}~{max_rec_3m*100:.0f}%){LOG_RESET}")
return None
# [매수체크 전용] 피뢰침 - 고점 추격 방지 (당일 API 보정된 day_high/day_low 사용)
high_chase_threshold = get_env_float("HIGH_PRICE_CHASE_THRESHOLD", 0.96)
if current_price >= day_high * high_chase_threshold:
logger.info(f"{LOG_YELLOW}🔍 [탈락-피뢰침 고점추격] {name} {code}: 현재가 {current_price:,.0f} ≥ 고점대비 {high_chase_threshold*100:.0f}%{LOG_RESET}")
return None
# [매수체크 전용] 피뢰침 - 급등주 제외 (당일 API 보정된 day_high/day_low 사용)
if day_low > 0:
range_change_pct = (day_high - day_low) / day_low * 100
else:
range_change_pct = 0
max_daily_change = get_env_float("MAX_DAILY_CHANGE_PCT", 20.0)
if range_change_pct > max_daily_change:
logger.info(f"{LOG_YELLOW}🔍 [탈락-피뢰침 급등주] {name} {code}: 일일 변동폭 {range_change_pct:.1f}% > {max_daily_change:.0f}%{LOG_RESET}")
return None
# RSI 과열 방지 (수치: env)
# RSI_PERIOD 는 get_minute_chart 에서 계산 시 사용한 기간과 동일해야 함
rsi_val = 50.0
rsi_period = get_env_int("RSI_PERIOD", 14)
if "RSI" in df.columns and len(df) >= rsi_period:
rsi_val = float(df["RSI"].iloc[-1])
rsi_threshold = get_env_float("RSI_OVERHEAT_THRESHOLD", 78.0)
if rsi_val >= rsi_threshold:
logger.info(f"{LOG_YELLOW}🔍 [탈락-RSI] {name} {code}: RSI {rsi_val:.1f}{rsi_threshold:.0f}{LOG_RESET}")
return None
# MA20: 20일선 아래면 역배열로 패스 (수치: env)
ma5_gap_pct_val, ma20_gap_pct_val, volume_ratio_val = None, None, None
if "MA20" in df.columns and len(df) >= 20:
ma20 = float(df["MA20"].iloc[-1])
if current_price < ma20:
logger.info(f"{LOG_YELLOW}🔍 [탈락-MA20] {name} {code}: 현재가 {current_price:,.0f} < MA20 {ma20:,.0f}{LOG_RESET}")
return None
ma20_cap_pct = get_env_float("MA20_MAX_ABOVE_PCT", 3.0)
if ma20 > 0 and current_price > ma20 * (1 + ma20_cap_pct / 100):
logger.info(f"{LOG_YELLOW}🔍 [탈락-MA20초과] {name} {code}: MA20 대비 {ma20_cap_pct:.0f}% 초과{LOG_RESET}")
return None
ma20_gap_pct_val = (current_price - ma20) / ma20 * 100 if ma20 > 0 else None
if "MA5" in df.columns and len(df) >= 5:
ma5 = float(df["MA5"].iloc[-1])
ma5_gap_pct_val = (current_price - ma5) / ma5 * 100 if ma5 and float(ma5) > 0 else None
if "volume" in df.columns and df["volume"].mean() > 0:
volume_ratio_val = float(df["volume"].iloc[-1] / df["volume"].mean())
investor_trend = self.client.get_investor_trend(code, days=3)
entry_features = {
"rsi": rsi_val,
"volume_ratio": volume_ratio_val,
"tail_length_pct": tail_pct * 100,
"ma5_gap_pct": ma5_gap_pct_val,
"ma20_gap_pct": ma20_gap_pct_val,
"foreign_net_buy": investor_trend.get("foreign_net_buy", 0) if investor_trend else 0,
"institution_net_buy": investor_trend.get("org_net_buy", 0) if investor_trend else 0,
"market_hour": dt.now().hour,
}
# ML 승률 (설정 시 env/DB 기준)
if self.use_ml_signal and self.ml_predictor:
try:
ml_features = {
"rsi": entry_features["rsi"],
"volume_ratio": entry_features["volume_ratio"] if entry_features["volume_ratio"] is not None else 1.0,
"tail_length_pct": entry_features["tail_length_pct"],
"ma5_gap_pct": entry_features["ma5_gap_pct"] if entry_features["ma5_gap_pct"] is not None else 0.0,
"ma20_gap_pct": entry_features["ma20_gap_pct"] if entry_features["ma20_gap_pct"] is not None else 0.0,
"foreign_net_buy": entry_features["foreign_net_buy"],
"institution_net_buy": entry_features["institution_net_buy"],
"market_hour": entry_features["market_hour"],
}
ml_prob = self.ml_predictor.predict_win_probability(ml_features)
if ml_prob < self.ml_min_probability:
logger.info(f"{LOG_YELLOW}🔍 [탈락-ML] {name} {code}: 승률 {ml_prob:.2%} < {self.ml_min_probability:.0%}{LOG_RESET}")
return None
except Exception:
pass
score_base = get_env_float("TAIL_SCORE_BASE", 5.0)
score_ratio_mult = get_env_float("TAIL_SCORE_RATIO_MULT", 2.0)
score = score_base + (tail_ratio * score_ratio_mult)
logger.info(
f"{LOG_CYAN}🎯 [무릎 타점] {name} | 가격:{current_price:,.0f} | "
f"꼬리비율:{tail_ratio:.1f}배 | 회복:{recovery_pos*100:.0f}% | RSI:{rsi_val:.1f}{LOG_RESET}"
)
return {
"code": code,
"name": name,
"price": current_price,
"score": score,
"entry_features": entry_features,
}
except Exception as e:
logger.info(f"{LOG_YELLOW}🔍 [탈락-예외] {name} {code}: {e}{LOG_RESET}")
return None
def check_sell_signals(self):
"""
매도 신호 체크 (ATR 기반 변동성 매도 로직)
- 어깨 매도 (고점 대비 SHOULDER_CUT_PCT 하락, 단 고점이 매수가+SHOULDER_MIN_HIGH_PCT 이상일 때만 적용)
- ATR 기반 스캘핑 (본절사수/익절보존)
- 순수익 보존
- 목표가/손절가
"""
if not self.holdings:
return []
sell_signals = []
min_hold_sec = get_env_float("MIN_HOLD_AFTER_BUY_SEC", 30.0) # 매수 직후 N초 간 매도 신호 무시 (모의투자 잔고 반영 지연 대응)
for code, holding in list(self.holdings.items()):
try:
name = holding.get("name", code)
buy_price = holding["buy_price"]
buy_time_str = holding.get("buy_time", "")
qty = holding["qty"]
# 매수 직후 최소 보유 시간 미만이면 매도 체크 스킵 (모의투자 "잔고내역이 없습니다" 방지)
if buy_time_str and min_hold_sec > 0:
try:
buy_time = dt.strptime(buy_time_str, "%Y-%m-%d %H:%M:%S")
if (dt.now() - buy_time).total_seconds() < min_hold_sec:
continue
except Exception:
pass
# 현재가 조회 — WebSocket 캐시 우선, 없으면 REST fallback
# ws_cache.get_price() 는 5초 이내 수신된 tick 데이터만 유효 취급
price_data = None
_used_ws = False
if self.ws_cache and self.ws_cache.is_active:
price_data = self.ws_cache.get_price(code)
if price_data:
_used_ws = True
if not price_data:
# WebSocket 미연결·캐시 만료 → REST (기존 방식 그대로)
price_data = self.client.inquire_price(code)
if not price_data:
continue
current_price = abs(float(price_data.get("stck_prpr", 0)))
if current_price == 0:
continue
# 고점 갱신 (holdings에 저장)
max_price = holding.get("max_price", buy_price)
if current_price > max_price:
max_price = current_price
self.holdings[code]["max_price"] = max_price
# 손익률 계산
profit_pct = (current_price - buy_price) / buy_price if buy_price > 0 else 0
profit_val = (current_price - buy_price) * qty
# ATR 조회 (DB 또는 재계산)
atr = holding.get("atr_entry", 0)
if atr == 0:
# ATR 재계산 — WS 메모리 봉 우선, 없으면 REST fallback
try:
df = self._get_candles_df(code, tf=3, n=20)
if df is None or df.empty:
df = self.client.get_minute_chart(code, period="3", limit=20)
if df is not None and not df.empty:
atr = self.calculate_atr(df)
if atr > 0:
self.holdings[code]["atr_entry"] = atr
except Exception as e:
logger.debug(f"ATR 조회 실패({code}): {e}")
atr = buy_price * 0.01 # 기본값 1%
# 손절가/목표가 (ATR 기반 또는 기본값)
stop_price = holding.get("stop_price", buy_price * (1 + self.stop_loss_pct))
target_price_atr = holding.get("target_price", buy_price * (1 + self.take_profit_pct))
target_price_pct = buy_price * (1 + self.take_profit_pct) # 퍼센트 기반 목표가
# ATR 기반 손절가/목표가 계산
if atr > 0:
stop_price = buy_price - (atr * self.stop_atr_multiplier)
target_price_atr = buy_price + (atr * self.target_atr_multiplier)
# 목표가: ATR 기반과 퍼센트 기반 중 더 작은 값 사용 (둘 다 체크)
target_price = min(target_price_atr, target_price_pct)
# 매도 사유 판단
sell_reason = None
# ==========================================================
# [1] 어깨 매도 (Shoulder Cut) - 최우선! (개선안 B)
# 고점이 매수가 대비 일정 % 이상 올랐을 때만 적용.
# 고점(max_price) 대비 N% 하락 시 수익 구간에서만 익절 보존 목적로 매도.
# (고점이 매수가 근처면 손실로 어깨매도되는 것 방지)
#
# 수수료 반영: 어깨매도 체결가는 고점 대비 약 (1 - SHOULDER_CUT_PCT) 수준.
# 수수료(ROUND_TRIP_COST_PCT) 차감 후 SHOULDER_MIN_NET_PCT(예: 0.1%) 이상 남기려면
# 고점이 매수가 대비 최소 (1 + round_trip + min_net) / (1 - shoulder_cut) - 1 이상이어야 함.
# → 아래 min_high_required 로 하한을 보장하고, env 값과 max 취해 사용.
# ==========================================================
shoulder_cut_pct = get_env_float("SHOULDER_CUT_PCT", 0.03)
shoulder_min_high_pct = get_env_float("SHOULDER_MIN_HIGH_PCT", 0.01)
round_trip_cost_pct = get_env_float("ROUND_TRIP_COST_PCT", 0.0026)
shoulder_min_net_pct = get_env_float("SHOULDER_MIN_NET_PCT", 0.001)
if shoulder_cut_pct < 1.0 and (1.0 - shoulder_cut_pct) > 0:
min_high_required = (1.0 + round_trip_cost_pct + shoulder_min_net_pct) / (1.0 - shoulder_cut_pct) - 1.0
shoulder_min_high_pct = max(shoulder_min_high_pct, min_high_required)
drop_from_high = (max_price - current_price) / max_price if max_price > 0 else 0
high_above_entry = (max_price - buy_price) / buy_price if buy_price > 0 else 0
if drop_from_high >= shoulder_cut_pct and high_above_entry >= shoulder_min_high_pct:
sell_reason = "어깨매도"
# ==========================================================
# [2] 금액 기준 손절 (원 단위)
# 퍼센트는 작아도 물량이 크면 원화 손실이 커져서 걸림. 0이면 비활성화.
# ==========================================================
max_loss_per_trade_krw = get_env_int("MAX_LOSS_PER_TRADE_KRW", 200000)
if max_loss_per_trade_krw > 0 and not sell_reason and profit_val <= -max_loss_per_trade_krw:
sell_reason = "금액손실컷"
# ==========================================================
# [3] ATR 기반 스캘핑 로직 (수치: env/DB)
# ==========================================================
if not sell_reason and atr > 0:
scalp_up = get_env_float("SCALP_ATR_UP_MULT", 1.0)
scalp_down = get_env_float("SCALP_ATR_DOWN_MULT", 0.2)
scalp_drop = get_env_float("SCALP_ATR_DROP_MULT", 1.0)
if (max_price >= buy_price + atr * scalp_up) and (current_price <= buy_price + atr * scalp_down):
sell_reason = "스캘핑_본절사수"
if not sell_reason and current_price < (max_price - atr * scalp_drop) and profit_pct > 0:
sell_reason = "스캘핑_익절보존"
# 보유 시간 계산 (N자 패턴 등 다음날 상승 가능성 있는 종목 24시간 보유)
hours_passed = 0
if buy_time_str:
try:
buy_time = dt.strptime(buy_time_str, "%Y-%m-%d %H:%M:%S")
hours_passed = (dt.now() - buy_time).total_seconds() / 3600
except:
pass
# ==========================================================
# [4] 빠른 익절 보호 (매수 후 N시간 이내, 수치: env/DB)
# ==========================================================
if not sell_reason and hours_passed > 0:
use_quick_profit = get_env_bool("USE_QUICK_PROFIT_PROTECTION", True)
quick_hours = get_env_float("QUICK_PROFIT_PROTECT_HOURS", 0.5)
quick_max_ratio = get_env_float("QUICK_PROFIT_MAX_RATIO", 1.005)
quick_current_min = get_env_float("QUICK_PROFIT_CURRENT_MIN", 1.0015)
if use_quick_profit and hours_passed < quick_hours:
if max_price >= buy_price * quick_max_ratio and current_price <= buy_price * quick_current_min:
sell_reason = "💨 작은수익보호"
# ==========================================================
# [5] 24시간 보유 전략 (수치: env/DB)
# ==========================================================
min_hold_hours = get_env_float("MIN_HOLD_HOURS", 24.0)
early_take_pct = get_env_float("MIN_HOLD_EARLY_TAKE_PCT", 0.05)
hold_high_pct = get_env_float("MIN_HOLD_HIGH_PCT", 1.07)
hold_drop_from_high = get_env_float("MIN_HOLD_DROP_FROM_HIGH", 0.97)
post_hold_take_pct = get_env_float("POST_HOLD_TAKE_PCT", 0.02)
post_hold_drop = get_env_float("POST_HOLD_DROP_FROM_HIGH", 0.97)
if not sell_reason and hours_passed > 0:
if hours_passed < min_hold_hours:
if profit_pct > early_take_pct:
sell_reason = f"💰 {hours_passed:.1f}시간내 {early_take_pct*100:.0f}%+ 익절"
elif max_price >= buy_price * hold_high_pct and current_price <= max_price * hold_drop_from_high:
sell_reason = f"📈 {hours_passed:.1f}시간내 고점→하락"
else:
if profit_pct > post_hold_take_pct:
sell_reason = f"{hours_passed:.1f}시간 경과 {post_hold_take_pct*100:.0f}%+ 익절"
elif profit_pct > 0 and current_price < max_price * post_hold_drop:
sell_reason = f"{hours_passed:.1f}시간 경과 익절보호"
# ==========================================================
# [6] 목표가 달성 및 손절 (24시간 보유 전략과 별개로 항상 체크)
# ==========================================================
if not sell_reason:
# 목표가 달성 체크 (ATR 기반 또는 퍼센트 기반 중 먼저 도달한 것)
if current_price >= target_price:
if current_price >= target_price_atr and current_price >= target_price_pct:
sell_reason = "목표달성(ATR+퍼센트)"
elif current_price >= target_price_atr:
sell_reason = "목표달성(ATR)"
else:
sell_reason = "목표달성(퍼센트)"
elif current_price <= stop_price:
sell_reason = "전략손절"
elif profit_pct <= self.stop_loss_pct:
sell_reason = f"칼손절({profit_pct * 100:.1f}%)"
elif profit_pct >= self.take_profit_pct:
sell_reason = "익절(퍼센트)"
# 매도 신호 추가
if sell_reason:
sell_signals.append({
"code": code,
"name": name,
"reason": sell_reason,
"profit_pct": profit_pct,
"qty": qty,
"price": current_price,
})
# WebSocket으로 가격을 받아온 경우 REST API 딜레이 불필요.
# REST fallback 시에는 기존과 동일하게 딜레이 적용 (과부하 방지).
if not _used_ws:
time.sleep(random.uniform(0.3, 0.7))
except Exception as e:
logger.error(f"매도 신호 체크 실패({code}): {e}")
continue
return sell_signals
def execute_buy(self, signal):
"""
매수 실행
- 키움 봇과 동일하게 대형주/소형주 비율 맞추는 로직 포함
- 대형주: 기본 금액 100% / 중형주: 85% / 소형주: 70%
- 켈리 기반 매수 금액 + 종목당 최대 15% 제한
"""
code = signal["code"]
name = signal["name"]
price = signal["price"]
# 이미 보유 중이면 스킵
if code in self.holdings:
logger.warning(f"⚠️ [{name}] 이미 보유 중 -> 매수 스킵")
return False
# 최대 보유 종목 수 체크
if len(self.holdings) >= self.max_stocks:
logger.warning(f"⚠️ 최대 보유 종목 수 도달 ({self.max_stocks}개)")
return False
# 🔥 매수 직전 예수금 실시간 확인
if not self._update_account_light(profit_val=0):
logger.warning(f"⚠️ [{name}] 예수금 조회 실패 -> 매수 스킵")
return False
# ============================================================
# [대/중/소형주 구분] - 일봉 거래대금 평균 (키움 봇과 동일)
# ============================================================
size_class = None
try:
df = self.client.get_daily_chart(code, limit=10)
if not df.empty and "volume" in df.columns and "close" in df.columns:
trade_values = df["volume"] * df["close"]
avg_trade_value = trade_values.mean()
large_min = get_env_float("SIZE_CLASS_LARGE_MIN", 5000000000) # 50억 (대형주)
mid_min = get_env_float("SIZE_CLASS_MID_MIN", 500000000) # 5억 (중형주)
if avg_trade_value >= large_min:
size_class = ""
elif avg_trade_value >= mid_min:
size_class = ""
else:
size_class = ""
logger.info(
f"📊 [{name}] 거래대금 평균 {avg_trade_value/1e8:.1f}억원 → {size_class}형주"
)
except Exception as e:
logger.debug(f"대/중/소형 조회 스킵({code}): {e}")
# ============================================================
# [매수 금액] 변동성 역가중 (Volatility Inverse Weighting)
# ============================================================
# ATR 계산용 분봉 데이터 — WS 메모리 봉 우선, 없으면 REST fallback
df_minute = None
try:
df_minute = self._get_candles_df(code, tf=3, n=20)
if df_minute is None or df_minute.empty:
df_minute = self.client.get_minute_chart(code, period="3", limit=20)
except Exception as e:
logger.debug(f"분봉 조회 실패({code}): {e}")
# RiskManager 사용 시: 변동성 역가중으로 매수 금액 계산
if self.risk_mgr is not None:
# 켈리 비율 (DB에서 계산, 없으면 None)
kelly_fraction = None
if self.risk_mgr.use_kelly:
try:
kelly_fraction = self.db.calculate_half_kelly()
except Exception as e:
logger.debug(f"켈리 비율 계산 스킵: {e}")
# 변동성 역가중 매수 금액 계산
amount = self.risk_mgr.get_position_size(
stock_name=name,
current_balance=self.current_cash,
df=df_minute, # ATR 계산용 분봉 데이터
kelly_fraction=kelly_fraction,
size_class=size_class, # 대/중/소형 구분
)
if amount <= 0:
logger.warning(f"⚠️ [{name}] RiskManager 계산 금액 0원 -> 매수 스킵")
return False
# 수량 계산 (수수료 고려)
qty = self.risk_mgr.calculate_quantity(price, amount)
else:
# 폴백: 기존 고정 슬롯 방식 (RiskManager 미사용 시, 수치: env/DB)
slot_default = int(get_env_float("SLOT_MONEY_DEFAULT", 100000.0))
if self.max_stocks > 0:
slot_money = int(self.current_cash * 0.9 / self.max_stocks)
else:
slot_money = slot_default
base_amount = slot_money
# 슬롯 캡(손절 기반 상한) 적용: MAX_LOSS_PER_TRADE_KRW / |STOP_LOSS_PCT| 와 SLOT_BASE_AMOUNT_CAP 중 작은 값
effective_slot_cap = self._get_effective_slot_cap()
if effective_slot_cap > 0:
base_amount = min(base_amount, int(effective_slot_cap))
if self.stop_loss_pct != 0:
stop_pct_abs = abs(self.stop_loss_pct)
else:
stop_pct_abs = 0.04
if stop_pct_abs > 0:
kelly_risk_amount = self.current_cash * self.risk_pct_per_trade * self.kelly_multiplier
kelly_based_amount = int(kelly_risk_amount / stop_pct_abs)
base_amount = min(base_amount, kelly_based_amount)
small_ratio = get_env_float("SIZE_CLASS_SMALL_RATIO", 0.7)
mid_ratio = get_env_float("SIZE_CLASS_MID_RATIO", 0.85)
if size_class == "":
amount = int(base_amount * small_ratio)
logger.info(f"💰 [{name}] 소형주 → 매수 금액 {small_ratio*100:.0f}%: {amount:,.0f}")
elif size_class == "":
amount = int(base_amount * mid_ratio)
logger.info(f"💰 [{name}] 중형주 → 매수 금액 {mid_ratio*100:.0f}%: {amount:,.0f}")
else:
amount = base_amount
max_limit = int(self.current_cash * self.max_position_pct)
if amount > max_limit:
logger.info(f"📐 [{name}] 최대 포지션 제한: {amount:,.0f}원 → {max_limit:,.0f}")
amount = max_limit
amount = max(amount, self.min_position_amount)
qty = int(amount / price)
if qty <= 0:
logger.warning(f"⚠️ [{name}] 매수 수량 0 (가격: {price:,.0f}원, 금액: {amount:,.0f}원)")
return False
required_amount = price * qty * 1.05
if self.current_cash < required_amount:
logger.warning(
f"⚠️ [{name}] 예수금 부족: 필요 {required_amount:,.0f}원 / "
f"보유 {self.current_cash:,.0f}원 -> 매수 스킵"
)
return False
# ATR 계산 (변동성 기반 손절가/목표가 설정용)
# df_minute는 위에서 이미 조회했으므로 재사용
atr = 0
stop_price = price * (1 + self.stop_loss_pct)
target_price = price * (1 + self.take_profit_pct)
if df_minute is not None and not df_minute.empty:
try:
atr = self.calculate_atr(df_minute)
if atr > 0:
# ATR 기반 손절가/목표가 설정
stop_price = price - (atr * self.stop_atr_multiplier)
target_price = price + (atr * self.target_atr_multiplier)
logger.info(f"📊 [{name}] ATR 기반 손절가/목표가: ATR={atr:.0f}원, 손절={stop_price:,.0f}원, 목표={target_price:,.0f}")
except Exception as e:
logger.debug(f"ATR 계산 스킵({code}): {e}")
atr = price * 0.01 # 기본값 1%
else:
atr = price * 0.01 # 기본값 1%
odno = self.client.buy_market_order(code, qty)
if not odno:
# 매매불가 종목이면 당일 제외 목록에 넣고 다음 후보로 넘어가기
msg_cd = getattr(self.client, "_last_order_msg_cd", None) or ""
msg1 = getattr(self.client, "_last_order_msg1", "") or ""
if msg_cd == "40070000" or "매매불가" in msg1:
self.untradable_skip_set.add(code)
logger.warning(f"🚫 [{name}] 매매불가 종목 -> 당일 매수 제외, 다음 후보로 넘어감")
return False
# 주문 접수 후 체결 조회 API로 실제 체결가/체결수량 확인 (주문만 보고 가정하지 않음)
fill = self.client.get_execution_by_odno(odno, code=code, wait_sec=2)
if fill:
price = fill["avg_price"]
qty = fill["filled_qty"]
if qty <= 0:
logger.warning(f"⚠️ [{name}] 체결 조회 수량 0 -> 매수 반영 스킵")
return False
else:
# 체결 조회 실패 시 주문 수량/시그널가로 반영 (기존 동작, 로그로 구분)
logger.info(f"📋 [{name}] 체결 조회 미확인 -> 주문기준으로 반영 (가격={price:,.0f}, 수량={qty})")
self._update_account_light(profit_val=0)
buy_time = dt.now().strftime("%Y-%m-%d %H:%M:%S")
self.holdings[code] = {
"buy_price": price,
"qty": qty,
"buy_time": buy_time,
"name": name,
"max_price": price, # 고점 추적
"atr_entry": atr, # 매수 시점 ATR 저장
"stop_price": stop_price, # 손절가
"target_price": target_price, # 목표가
}
self.db.upsert_trade({
"code": code,
"name": name,
"strategy": "SHORT_ANT_SHAKING",
"avg_buy_price": price,
"current_price": price,
"target_qty": qty,
"current_qty": qty,
"status": "HOLDING",
"buy_date": buy_time,
"stop_price": stop_price,
"target_price": target_price,
"atr_entry": atr,
"entry_features": signal.get("entry_features"),
})
if fill:
logger.info(f"💰 [매수 체결] {name} ({code}): {price:,.0f}× {qty}주 (API 체결 확인) | 손절={stop_price:,.0f}원, 목표={target_price:,.0f}")
else:
logger.info(f"💰 [매수 체결] {name} ({code}): {price:,.0f}× {qty}주 (주문기준) | 손절={stop_price:,.0f}원, 목표={target_price:,.0f}")
# 매수 후 WebSocket 구독 등록 → 이후 check_sell_signals에서 REST 없이 실시간 수신
if self.ws_cache and self.ws_cache.is_active:
self.ws_cache.subscribe(code)
# 신규 보유 종목 즉시 갭보정 (봉 버퍼 미확보 시 ATR 계산 즉시 가능하게)
if self.candle_agg:
try:
lim = get_env_int("SHORT_GAP_FILL_LIMIT", 100)
df_gap = self.client.get_minute_chart(code, period="3", limit=lim)
if df_gap is not None and not df_gap.empty:
self.candle_agg.fill_gap_from_rest(code, 3, df_gap)
except Exception as _ge:
logger.debug("매수후 갭보정 실패(%s): %s", code, _ge)
# 체결 알림 (MM) — API 추가 호출 없이 메모리 값만 사용
try:
invest_amt = price * qty
cash_after = self.current_cash - invest_amt if self.current_cash >= invest_amt else 0
mm_msg = (
f"🟢 **매수 체결** {name} ({code})\n"
f"{price:,.0f}× {qty:,}주 | 손절 {stop_price:,.0f}원 / 목표 {target_price:,.0f}\n"
f"예수금(예상) {cash_after:,.0f}원 | 보유 {len(self.holdings)}종목"
)
self.send_mm(mm_msg)
except Exception as e:
logger.debug(f"매수 체결 MM 발송 스킵: {e}")
return True
def execute_sell(self, signal):
"""매도 실행"""
code = signal["code"]
name = signal["name"]
qty = signal["qty"]
if code not in self.holdings:
logger.warning(f"⚠️ [{name}] 보유 종목 아님")
return False
# ★ 매도 실패 백오프 체크 (영업일 아님·시장 마감 등 일시 오류 반복 방지)
backoff_until = self._sell_backoff.get(code, 0)
if time.time() < backoff_until:
remain_min = (backoff_until - time.time()) / 60
logger.debug("⏸ [%s(%s)] 매도 백오프 중 — %.0f분 후 재시도", name, code, remain_min)
return False
# 매도 주문
success = self.client.sell_market_order(code, qty)
if not success:
# 실패 원인 분석 → 영업일·시장마감 오류면 백오프 등록
msg_cd = getattr(self.client, "_last_sell_msg_cd", None) or ""
msg1 = getattr(self.client, "_last_sell_msg1", "") or ""
# KIS 비영업일·장마감 오류코드 (40100000=모의 영업일 아님, 40200000=실전 장외시간)
non_biz_codes = {"40100000", "40200000"}
if msg_cd in non_biz_codes or "영업일" in msg1 or "장외" in msg1 or "시장" in msg1:
backoff_sec = get_env_int("SELL_FAILURE_BACKOFF_SEC", 1800)
self._sell_backoff[code] = time.time() + backoff_sec
logger.warning(
"⏸ [%s(%s)] 매도 실패('%s') → %d분 후 재시도 (SELL_FAILURE_BACKOFF_SEC=%d)",
name, code, msg1, backoff_sec // 60, backoff_sec,
)
return False
if success:
# 현재가 조회
price_data = self.client.inquire_price(code)
if price_data:
sell_price = abs(float(price_data.get("stck_prpr", 0)))
else:
sell_price = signal.get("price", 0)
# 손익 계산 (매도 후 총자산 반영용)
holding = self.holdings.get(code, {})
buy_price = holding.get("buy_price", sell_price)
profit_val = (sell_price - buy_price) * qty # 손익 금액
# DB에서 매도 처리 (strategy 지정 → 꼬리잡기봇 row만 삭제, 스캘핑봇 row 보호)
self.db.close_trade(
code=code,
sell_price=sell_price,
sell_reason=signal['reason'],
strategy="SHORT_ANT_SHAKING",
)
del self.holdings[code]
# 재진입 쿨다운 기록 (REENTRY_COOLDOWN_SEC 동안 같은 종목 재매수 차단)
self.recently_sold[code] = time.time()
# 매도 후 WebSocket 구독 해제 → 불필요한 데이터 수신 차단
if self.ws_cache and self.ws_cache.is_active:
self.ws_cache.unsubscribe(code)
# 🔥 매도 후 예수금 + 총자산 즉시 업데이트 (손익 반영)
self._update_account_light(profit_val=profit_val)
logger.info(f"💸 [매도 체결] {name} ({code}): {qty}주 ({signal['reason']}, {signal['profit_pct']*100:+.2f}%)")
# 체결 알림 (MM) — 매도 직후 _update_account_light로 갱신된 예수금/총자산 사용 (추가 API 없음)
try:
pct = signal['profit_pct'] * 100
cum_pnl = self.current_total_asset - self.total_deposit if self.total_deposit else 0
cum_pct = (cum_pnl / self.total_deposit * 100) if self.total_deposit and self.total_deposit > 0 else 0
# 당일 손익: 오늘 장 시작 시 총자산 대비 현재 총자산 차이
day_pnl = self.current_total_asset - self.start_day_asset if self.start_day_asset else 0
day_pct = (day_pnl / self.start_day_asset * 100) if self.start_day_asset > 0 else 0
mm_msg = (
f"🔴 **매도 체결** {name} ({code})\n"
f"{sell_price:,.0f}× {qty:,}주 | {signal['reason']} | 수익률 {pct:+.2f}% (실현 {profit_val:+,.0f}원)\n"
f"예수금 {self.current_cash:,.0f}원 | 총자산 {self.current_total_asset:,.0f}원 | 보유 {len(self.holdings)}종목\n"
f"당일손익 {day_pnl:+,.0f}원 ({day_pct:+.2f}%) | 누적손익 {cum_pnl:+,.0f}원 ({cum_pct:+.2f}%)"
)
self.send_mm(mm_msg)
except Exception as e:
logger.debug(f"매도 체결 MM 발송 스킵: {e}")
return True
return False
def run(self):
"""메인 루프 (진입점). 내부적으로 asyncio.run(_run_async()) 호출."""
asyncio.run(self._run_async())
async def _run_async(self):
"""비동기 메인 루프 - 백그라운드 태스크 시작 후 동기 매매 루프 실행"""
logger.info("🚀 단타 트레이딩 봇 시작 (비동기 백그라운드 작업 활성화)")
# 매터모스트 원격 조종 리스너 (!적용 / !설정 → DB 반영 후 reload_config)
try:
from mm_remote import MattermostRemoteController
self.mm_remote = MattermostRemoteController(
server_url=MM_SERVER_URL,
bot_token=MM_BOT_TOKEN,
channel_alias=self.mm_channel,
mm_config_path=MM_CONFIG_FILE,
db=self.db,
)
self.mm_remote.start()
except Exception as e:
logger.warning("⚠️ MM 원격 조종 리스너 시작 실패: %s", e)
self.mm_remote = None
# 백그라운드 태스크 시작
self._universe_task = asyncio.create_task(self._universe_scan_scheduler())
self._report_task = asyncio.create_task(self._report_scheduler())
self._asset_task = asyncio.create_task(self._asset_update_scheduler())
logger.info("✅ 백그라운드 태스크 시작 완료 (유니버스 스캔, 리포트, 자산 업데이트)")
# 동기 매매 루프는 별도 스레드에서 실행 (메인 이벤트 루프 블로킹 방지)
loop = asyncio.get_event_loop()
await loop.run_in_executor(None, self._sync_trading_loop)
def _sync_trading_loop(self):
"""동기 매매 루프 (메인 로직) - 백그라운드 작업과 분리"""
logger.info("📈 매매 루프 시작 (동기 모드)")
while True:
try:
# ★ DB 설정 실시간 반영 (재시작 없이 적용)
self.reload_config()
now = dt.now()
current_date = now.strftime("%Y-%m-%d")
# 날짜 변경 시 리포트 플래그 리셋
if current_date != self.today_date:
self.today_date = current_date
self.morning_report_sent = False
self.closing_report_sent = False
self.final_report_sent = False
self.ai_report_sent = False
self.start_day_asset = 0
logger.info(f"📅 날짜 변경: {current_date}")
# 리포트 전송은 백그라운드 태스크에서 처리하므로 여기서는 제거
# (기존 코드와의 호환성을 위해 주석 처리)
# if now.hour == 13 and now.minute == 0:
# self.send_morning_report()
# self.send_ai_report()
# elif now.hour == 15 and now.minute == 15:
# self.send_closing_report()
# elif now.hour == 15 and now.minute == 35:
# self.send_final_report()
# 장 상태 체크
if not self.check_market_status():
logger.info("⏸ 장 시간 아님 - 대기 중...")
time.sleep(60)
continue
# 자산 정보 업데이트는 백그라운드 태스크에서 처리하므로 여기서는 제거
# (기존 코드와의 호환성을 위해 주석 처리)
# if now.minute % 30 == 0: # 30분마다
# self._update_assets()
# 매도 신호 체크 (우선)
sell_signals = self.check_sell_signals()
for signal in sell_signals:
self.execute_sell(signal)
# 매수: 후보 종목을 3분봉으로 실시간 타점 체크 → 그 순간에만 매수 (키움 TAIL_CATCH 방식)
today_str = dt.now().strftime("%Y-%m-%d")
if today_str != self.today_date:
self.today_date = today_str
self.untradable_skip_set.clear()
logger.debug("📅 날짜 변경 -> 매매불가 제외 목록 초기화")
active_count = len(self.holdings)
db_candidates = self.db.get_target_candidates()
# 신규 후보 WS 구독 + 3분봉 갭보정 (봉 버퍼 미확보 종목 자동 보완)
if db_candidates:
self._sync_subscriptions(db_candidates)
if db_candidates and active_count < self.max_stocks:
strength_preview = " | 강도순: " + ", ".join(
f"{c.get('name', c.get('code',''))} {c.get('score', 0):.1f}" for c in db_candidates[:5]
) if db_candidates else ""
logger.info(f"🔍 [매수체크] 후보 {len(db_candidates)}명 순회 (보유 {active_count}/{self.max_stocks}){strength_preview}")
signals_this_turn = 0
attempts_this_turn = 0
for db_item in db_candidates:
code = db_item.get("code") or db_item.get("stk_cd", "")
name = db_item.get("name") or db_item.get("stk_nm", code)
if not code or code in self.holdings:
continue
# 매매불가 종목은 당일 재시도 안 함 → 다음 후보로
if code in self.untradable_skip_set:
continue
# ★ 재진입 쿨다운 체크: 최근 매도 종목은 일정 시간 동안 재매수 차단.
# 손절 직후 즉시 재매수 → 손절 반복 루프를 근본 차단.
reentry_cooldown = get_env_int("REENTRY_COOLDOWN_SEC", 300)
elapsed_since_sell = time.time() - self.recently_sold.get(code, 0)
if elapsed_since_sell < reentry_cooldown:
remaining = int(reentry_cooldown - elapsed_since_sell)
logger.info(
"⏳ [재진입 차단] %s(%s) 매도 후 쿨다운 중 — 남은 시간 %d초/%d",
name, code, remaining, reentry_cooldown,
)
continue
signal = self.check_buy_signal_tail_catch(code, name)
if signal:
signals_this_turn += 1
attempts_this_turn += 1
logger.info(f"🛒 [매수 시도] {name} ({code}) {attempts_this_turn}건째")
ok = self.execute_buy(signal)
if ok:
logger.info(f"✅ [매수체크] 이번 턴 시그널 {signals_this_turn}건, 시도 {attempts_this_turn}건, 성공 1건")
time.sleep(random.uniform(1, 2))
break
# 실패(매매불가, 금액0, 예수금 부족, API 오류 등) -> 다음 후보로 순회
time.sleep(random.uniform(0.3, 0.8))
continue
time.sleep(random.uniform(0.5, 1.0))
if signals_this_turn == 0 and db_candidates:
logger.info(f"🔍 [매수체크] 이번 순회 시그널 0건 (조건 통과한 후보 없음) → 다음 턴에 재시도")
elif attempts_this_turn > 0 and len(self.holdings) == active_count:
logger.info(f"🔍 [매수체크] 이번 턴 시그널 {signals_this_turn}건, 시도 {attempts_this_turn}건, 체결 0건 (실패 후 다음 턴)")
elif not db_candidates and active_count == 0:
logger.debug("🔍 [매수] 타겟 0개 (유니버스 스캔 대기 중)")
# 대기 (너무 길면 매수 타점 놓침 → 2~4초로 단축)
time.sleep(random.uniform(2, 4))
except KeyboardInterrupt:
logger.info("⏹ 봇 종료")
# 백그라운드 태스크 취소
if self._universe_task:
self._universe_task.cancel()
if self._report_task:
self._report_task.cancel()
if self._asset_task:
self._asset_task.cancel()
break
except Exception as e:
logger.error(f"❌ 루프 에러: {e}")
time.sleep(5)
if __name__ == "__main__":
bot = ShortTradingBot()
bot.run()