Files
kis_bot/kis_short_ver1.py
2026-02-22 18:05:14 +09:00

3297 lines
153 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
KIS Short Trading Bot Ver1 - 단타용 한투 API 트레이딩 시스템
- 한국투자증권(KIS) Open API 사용
- 개미털기(눌림목) 전략 기반 단타 매매
- 빠른 손절/익절 로직
- kiwoom_trader_ver2.py의 단타 전략을 한투 API로 변환
"""
import os
import re
import json
import time
import random
import logging
import datetime
import hashlib
import hmac
import base64
import warnings
import asyncio
from datetime import datetime as dt
from pathlib import Path
from typing import List, Dict, Optional
import pandas as pd
import requests
from database import TradeDB
# 로깅 설정
logging.basicConfig(
format='[%(asctime)s] %(message)s',
datefmt='%H:%M:%S',
level=logging.INFO,
)
logger = logging.getLogger("KISShortBot")
# 로그 색상 (ANSI) - 탈락/통과 구분
LOG_RED = "\033[91m" # 탈락
LOG_YELLOW = "\033[93m" # 탈락 (Pass-조건)
LOG_GREEN = "\033[92m" # 통과
LOG_CYAN = "\033[96m" # 강조
LOG_RESET = "\033[0m"
# DB 초기화
SCRIPT_DIR = Path(__file__).resolve().parent
db = TradeDB(db_path=str(SCRIPT_DIR / "quant_bot.db"))
# DB에서 환경변수 로드
def get_env_from_db(key, default=""):
"""DB에서 환경변수 읽기"""
env_data = db.get_latest_env()
if env_data and env_data.get("snapshot"):
return env_data["snapshot"].get(key, default)
return default
def get_env_float(key, default):
"""환경변수를 float로 변환 (DB 우선)"""
value = get_env_from_db(key, str(default))
if isinstance(value, str) and "#" in value:
value = value.split("#")[0].strip()
try:
return float(value) if value else default
except (ValueError, TypeError):
return default
def get_env_int(key, default):
"""환경변수를 int로 변환 (DB 우선)"""
value = get_env_from_db(key, str(default))
if isinstance(value, str) and "#" in value:
value = value.split("#")[0].strip()
try:
return int(value) if value else default
except (ValueError, TypeError):
return default
def get_env_bool(key, default=False):
"""환경변수를 bool로 변환 (DB 우선)"""
value = get_env_from_db(key, str(default)).lower()
return value in ("true", "1", "yes")
# Mattermost 설정
MM_SERVER_URL = get_env_from_db("MM_SERVER_URL", "https://mattermost.hoonfam.org")
MM_BOT_TOKEN = get_env_from_db("MM_BOT_TOKEN_", "").strip()
MM_CONFIG_FILE = SCRIPT_DIR / "mm_config.json"
# 기본 채널(alias) + 단타 봇 전용 채널(alias)
MM_CHANNEL_DEFAULT = get_env_from_db("MATTERMOST_CHANNEL", "stock")
MM_CHANNEL_SHORT = get_env_from_db("KIS_SHORT_MM_CHANNEL", MM_CHANNEL_DEFAULT)
# Gemini API (AI 리포트용) - google.genai 신규 SDK (Client 사용, configure 없음)
try:
import google.genai as genai
GEMINI_AVAILABLE = True
except ImportError:
GEMINI_AVAILABLE = False
logger.warning("⚠️ google-genai 미설치 - AI 리포트 기능 사용 불가")
GEMINI_API_KEY = get_env_from_db("GEMINI_API_KEY", "").strip()
GEMINI_MODEL_ID = "gemini-2.5-flash" # 또는 gemini-2.5-flash (모델명 확인)
gemini_client = None
if GEMINI_API_KEY and GEMINI_AVAILABLE:
try:
gemini_client = genai.Client(api_key=GEMINI_API_KEY)
except Exception as e:
logger.warning(f"⚠️ Gemini 초기화 실패: {e}")
gemini_client = None
else:
gemini_client = None
# ML 예측 (선택적)
try:
from ml_predictor import MLPredictor
ML_AVAILABLE = True
except ImportError:
ML_AVAILABLE = False
logger.warning("⚠️ ml_predictor 미설치 - ML 예측 기능 사용 불가")
# RiskManager (변동성 기반 리스크 관리)
try:
from risk_manager import RiskManager
RISK_MANAGER_AVAILABLE = True
except ImportError:
RISK_MANAGER_AVAILABLE = False
logger.warning("⚠️ risk_manager 미설치 - 변동성 역가중 매수 금액 계산 불가")
# ============================================================
# 한투(KIS) API 클라이언트 (kis_long_term_checker.py 참고)
# ============================================================
# 모의계좌용 토큰 캐시 경로
KIS_TOKEN_CACHE_PATH_MOCK = SCRIPT_DIR / ".kis_token_cache_mock.json"
# 실계좌용 토큰 캐시 경로
KIS_TOKEN_CACHE_PATH_REAL = SCRIPT_DIR / ".kis_token_cache_real.json"
# 한투 접근 토큰 유효기간 24시간. 자주 발급하면 영구 제명될 수 있으므로 캐시 철저 재사용.
# 만료 1분 전에만 재발급 (불필요한 발급 최소화)
KIS_TOKEN_EXPIRE_MARGIN_SEC = 60
def _parse_kis_token_expired(expired_str):
"""한투 API 만료시간 문자열 파싱. 'YYYY-MM-DD HH:MM:SS' 또는 'YYYY-MM-DDTHH:MM:SS' 등 지원."""
if not expired_str or not isinstance(expired_str, str):
return None
s = expired_str.strip().replace("T", " ")[:19]
if len(s) < 19:
return None
try:
return dt.strptime(s, "%Y-%m-%d %H:%M:%S")
except ValueError:
return None
def _load_kis_token_cache(mock):
"""캐시 파일에서 토큰 로드. 만료 1분 전까지 유효하면 재사용 (24h 토큰 자주 발급 시 영구 제명 주의)."""
if mock:
path = KIS_TOKEN_CACHE_PATH_MOCK
else:
path = KIS_TOKEN_CACHE_PATH_REAL
if not path.exists():
logger.info("한투 토큰 캐시 없음 → API 발급 예정 (캐시 경로: %s)", path)
return None
try:
logger.info("패스 %s", path)
with open(path, "r", encoding="utf-8") as f:
cache = json.load(f)
if cache.get("mock") != mock:
logger.info("한투 토큰 캐시 모의/실전 불일치 → API 발급 예정")
return None
token = cache.get("access_token")
expired_str = cache.get("access_token_token_expired") or cache.get("expires_at")
if not token or not expired_str:
logger.info("한투 토큰 캐시 내용 불완전 → API 발급 예정")
return None
expired_dt = _parse_kis_token_expired(expired_str)
if expired_dt is None:
logger.info("한투 토큰 캐시 만료시간 파싱 실패(%s) → API 발급 예정", expired_str[:30])
return None
if dt.now() >= expired_dt - datetime.timedelta(seconds=KIS_TOKEN_EXPIRE_MARGIN_SEC):
logger.info("한투 토큰 캐시 만료 임박(%s) → API 발급 예정", expired_str[:19])
return None
return token
except Exception as e:
logger.warning("한투 토큰 캐시 로드 실패(%s): %s", path, e)
return None
def _save_kis_token_cache(access_token, access_token_token_expired, mock):
"""발급받은 토큰을 캐시 파일에 저장."""
try:
if mock:
path = KIS_TOKEN_CACHE_PATH_MOCK
else:
path = KIS_TOKEN_CACHE_PATH_REAL
with open(path, "w", encoding="utf-8") as f:
json.dump({
"access_token": access_token,
"access_token_token_expired": access_token_token_expired,
"mock": mock,
}, f, ensure_ascii=False, indent=2)
logger.info("한투 토큰 캐시 저장 완료: %s", path)
except Exception as e:
logger.warning("한투 토큰 캐시 저장 실패: %s", e)
class KISClient:
"""한국투자증권 Open API 클라이언트"""
def __init__(self, mock=None):
# 모의 여부 결정
if mock is not None:
use_mock = mock
else:
use_mock = get_env_bool("KIS_MOCK", True)
# 모의투자는 MOCK 전용 키만 사용(실전 키로 폴백 안 함 → 토큰/캐시가 실전이랑 섞이지 않도록)
if use_mock:
self.app_key = get_env_from_db("KIS_APP_KEY_MOCK", "").strip()
self.app_secret = get_env_from_db("KIS_APP_SECRET_MOCK", "").strip()
if not self.app_key or not self.app_secret:
logger.error("❌ 모의투자용 APP KEY/SECRET이 DB에 없습니다. KIS_APP_KEY_MOCK, KIS_APP_SECRET_MOCK 설정 필요.")
raise ValueError("모의투자 KIS_APP_KEY_MOCK / KIS_APP_SECRET_MOCK 미설정")
else:
self.app_key = (get_env_from_db("KIS_APP_KEY_REAL", "") or get_env_from_db("KIS_APP_KEY", "")).strip()
self.app_secret = (get_env_from_db("KIS_APP_SECRET_REAL", "") or get_env_from_db("KIS_APP_SECRET", "")).strip()
# 계좌번호: 모의/실전 분리
if use_mock:
raw_no = get_env_from_db("KIS_ACCOUNT_NO_MOCK", "").strip()
raw_code = get_env_from_db("KIS_ACCOUNT_CODE_MOCK", "").strip()
if not raw_code:
raw_code = "01"
else:
raw_no = (get_env_from_db("KIS_ACCOUNT_NO_REAL", "") or get_env_from_db("KIS_ACCOUNT_NO", "")).strip()
raw_code = (get_env_from_db("KIS_ACCOUNT_CODE_REAL", "") or get_env_from_db("KIS_ACCOUNT_CODE", "01")).strip()
if not raw_code:
raw_code = "01"
# 10자리면 앞 8 / 뒤 2 분리
if len(raw_no) >= 10:
self.acc_no = raw_no[:8]
self.acc_code = raw_no[8:10]
else:
self.acc_no = raw_no
self.acc_code = raw_code[:2] if len(raw_code) >= 2 else "01"
if len(self.acc_no) != 8:
logger.warning("⚠️ 계좌번호 CANO 8자리 아님: '%s'(%s자리). DB 확인.", self.acc_no, len(self.acc_no))
if len(self.acc_no) != 8 or len(self.acc_code) != 2:
logger.error(
"❌ 계좌번호 형식 오류: CANO=%s(%s자리), ACNT_PRDT_CD=%s(%s자리) → OPSQ2000 발생. "
"모의면 KIS_ACCOUNT_NO_MOCK/KIS_ACCOUNT_CODE_MOCK, 실전이면 KIS_ACCOUNT_NO/KIS_ACCOUNT_CODE 확인.",
self.acc_no, len(self.acc_no), self.acc_code, len(self.acc_code)
)
else:
logger.info("✅ 한투 계좌 CANO=%s, ACNT_PRDT_CD=%s (모의=%s)", self.acc_no, self.acc_code, use_mock)
self.mock = use_mock
if self.mock is True:
self.base_url = "https://openapivts.koreainvestment.com:29443"
else:
self.base_url = "https://openapi.koreainvestment.com:9443"
self.access_token = None
logger.info("한투 API 연결: 모의=%s%s", self.mock, self.base_url)
self._auth()
def _auth(self):
"""접근 토큰 발급"""
if not self.app_key or not self.app_secret:
if self.mock:
key_hint = "KIS_APP_KEY_MOCK, KIS_APP_SECRET_MOCK"
else:
key_hint = "KIS_APP_KEY_REAL, KIS_APP_SECRET_REAL (또는 KIS_APP_KEY, KIS_APP_SECRET)"
logger.error("한투 API 키가 없습니다. DB env_config에 설정 필요: %s", key_hint)
raise ValueError("KIS 앱키/시크릿 설정 필요 (모의=%s)" % self.mock)
# ✅ path를 먼저 정의 (발급 성공/실패 양쪽에서 사용)
path = KIS_TOKEN_CACHE_PATH_MOCK if self.mock else KIS_TOKEN_CACHE_PATH_REAL
mode_str = "모의" if self.mock else "실전"
cached = _load_kis_token_cache(self.mock)
if cached:
self.access_token = cached
token_head = (cached[:8] + "") if cached and len(cached) > 8 else "(없음)"
logger.info("한투 토큰 캐시 사용 (%s) | 파일=%s | 토큰앞8=%s", mode_str, path, token_head)
return
# 캐시 없음/만료 → API로 새 토큰 발급 (캐시 파일 없어도 자동 발급)
appkey_tail = (self.app_key[-4:] if len(self.app_key) >= 4 else self.app_key) or "????"
logger.info(
"한투 토큰 발급 요청 (%s) | 앱키 끝4자리=%s | 저장할 캐시=%s",
mode_str, appkey_tail, path,
)
url = f"{self.base_url}/oauth2/tokenP"
body = {"grant_type": "client_credentials", "appkey": self.app_key, "appsecret": self.app_secret}
try:
r = requests.post(url, json=body, timeout=10)
data = r.json()
if "access_token" in data:
self.access_token = data["access_token"]
expired = data.get("access_token_token_expired") or ""
_save_kis_token_cache(self.access_token, expired, self.mock)
token_head = (self.access_token[:8] + "") if self.access_token and len(self.access_token) > 8 else "(없음)"
logger.info(
"한투 토큰 발급 완료 (%s) | 캐시=%s | 앱키끝4=%s | 토큰앞8=%s",
mode_str, path, appkey_tail, token_head,
)
else:
logger.error("한투 토큰 발급 실패: %s", data)
if isinstance(data, dict) and data.get("error_code") == "EGW00133":
logger.warning("한투 1분당 1회 제한. 1분 후 재시도하거나 캐시 사용: %s", path)
raise RuntimeError("한투 토큰 발급 실패")
except Exception as e:
logger.error("한투 인증 예외: %s", e)
raise
def _get_hashkey(self, body):
"""해시키(Hashkey) 발급 - POST 요청 시 body 무결성 검증용"""
try:
url = f"{self.base_url}/uapi/hashkey"
headers = {
"content-type": "application/json",
"appkey": self.app_key,
"appsecret": self.app_secret,
}
r = requests.post(url, headers=headers, json=body, timeout=5)
if r.status_code == 200:
data = r.json()
if data.get("rt_cd") == "0":
return data.get("HASH")
return None
except Exception as e:
logger.debug(f"해시키 발급 실패: {e}")
return None
def _headers(self, tr_id, hashkey=None):
"""API 호출용 헤더 생성"""
headers = {
"content-type": "application/json; charset=utf-8",
"authorization": f"Bearer {self.access_token}",
"appkey": self.app_key,
"appsecret": self.app_secret,
"tr_id": tr_id,
}
if hashkey:
headers["hashkey"] = hashkey
return headers
def _get(self, path, tr_id, params, max_retries=3, tr_cont=None):
"""
GET 요청. EGW00201(초당 거래건수 초과) 시 점진적 대기 시간 증가 재시도
- 한투 API 제한: 초당 20개 (실제로는 더 엄격, 모의투자는 초당 2~3회 권장)
- EGW00201 감지 시: 5초 + (attempt * 1초) 대기 후 재시도
- 기본 호출 간격: 0.5초 이상 권장
"""
url = f"{self.base_url}{path}"
headers = self._headers(tr_id)
if tr_cont:
headers["tr_cont"] = tr_cont # 연속 조회 시 다음 페이지 요청 (한투: Response Header tr_cont=M 이면 Request Header tr_cont=N)
logger.debug(f"[API호출] GET {path} TR_ID={tr_id} params={params} tr_cont={tr_cont}")
time.sleep(0.5)
for attempt in range(max_retries):
try:
r = requests.get(url, headers=headers, params=params, timeout=15)
# HTTP 429 (Too Many Requests)
if r.status_code == 429:
wait_time = 1 + (attempt * 1) # 5초, 6초, 7초...
logger.warning(
f"⏳ API 호출 제한 (429) -> {wait_time}초 대기 후 재시도 "
f"({attempt+1}/{max_retries}) path={path}"
)
time.sleep(wait_time)
continue
if r.status_code == 200:
j = r.json()
if j.get("rt_cd") == "0":
logger.debug(f"[API성공] GET {path} TR_ID={tr_id} status=200 rt_cd=0")
return r
# EGW00201: 초당 거래건수 초과
elif j.get("msg_cd") == "EGW00201" or "초과" in str(j.get("msg1", "")) or "과부하" in str(j.get("msg1", "")):
wait_time = 1 + (attempt * 1) # 5초, 6초, 7초... (키움 봇 방식)
logger.warning(
f"⏳ API 과부하 (EGW00201) GET {path} TR_ID={tr_id} -> {wait_time}초 대기 후 재시도 "
f"({attempt+1}/{max_retries}) rt_cd={j.get('rt_cd')} msg_cd={j.get('msg_cd')} msg1={j.get('msg1')}"
)
time.sleep(wait_time)
continue
# HTTP 200이 아니거나 rt_cd != "0"인 경우
try:
body_preview = (r.text or "")[:500]
except Exception:
body_preview = ""
logger.warning(
f"[API실패] GET {path} TR_ID={tr_id} status={r.status_code} "
f"params={params} body={body_preview}"
)
return r
except requests.exceptions.RequestException as e:
if attempt < max_retries - 1:
wait_time = (2 ** attempt) + random.uniform(0.5, 1.5)
logger.warning(f"⚠️ 네트워크 에러 -> {wait_time:.1f}초 대기 후 재시도: {e}")
time.sleep(wait_time)
else:
logger.error(f"❌ GET 요청 실패 ({path}): {e}")
return r
def _post(self, path, tr_id, body, use_hashkey=True, max_retries=3):
"""
POST 요청. EGW00201(초당 거래건수 초과) 시 점진적 대기 시간 증가 재시도
- 한투 API 제한: 초당 20개 (실제로는 더 엄격)
- EGW00201 감지 시: 5초 + (attempt * 1초) 대기 후 재시도
"""
url = f"{self.base_url}{path}"
hashkey = None
# API 호출 정보 디버그 로그 (민감 정보는 일부만)
body_preview = str(body)[:200] if body else "{}"
logger.debug(f"[API호출] POST {path} TR_ID={tr_id} body={body_preview}...")
# 기본 안전 대기 (서버 부하 완화)
time.sleep(0.5)
# 해시키 발급 (선택적이지만 보안 강화)
if use_hashkey:
hashkey = self._get_hashkey(body)
if not hashkey:
logger.debug("해시키 발급 실패, 해시키 없이 진행")
for attempt in range(max_retries):
try:
r = requests.post(url, headers=self._headers(tr_id, hashkey), json=body, timeout=15)
# HTTP 429 (Too Many Requests)
if r.status_code == 429:
wait_time = 5 + (attempt * 1) # 5초, 6초, 7초...
logger.warning(
f"⏳ API 호출 제한 (429) -> {wait_time}초 대기 후 재시도 "
f"({attempt+1}/{max_retries}) path={path}"
)
time.sleep(wait_time)
# 해시키 재발급
if use_hashkey:
hashkey = self._get_hashkey(body)
continue
if r.status_code == 200:
j = r.json()
if j.get("rt_cd") == "0":
logger.debug(f"[API성공] POST {path} TR_ID={tr_id} status=200 rt_cd=0")
return r
# EGW00201: 초당 거래건수 초과
elif j.get("msg_cd") == "EGW00201" or "초과" in str(j.get("msg1", "")) or "과부하" in str(j.get("msg1", "")):
wait_time = 5 + (attempt * 1) # 5초, 6초, 7초... (키움 봇 방식)
logger.warning(
f"⏳ API 과부하 (EGW00201) POST {path} TR_ID={tr_id} -> {wait_time}초 대기 후 재시도 "
f"({attempt+1}/{max_retries}) rt_cd={j.get('rt_cd')} msg_cd={j.get('msg_cd')} msg1={j.get('msg1')}"
)
time.sleep(wait_time)
if use_hashkey:
hashkey = self._get_hashkey(body)
continue
# HTTP 200이 아니거나 rt_cd != "0"인 경우
try:
body_preview = (r.text or "")[:500]
except Exception:
body_preview = ""
logger.warning(
f"[API실패] POST {path} TR_ID={tr_id} status={r.status_code} "
f"body={body_preview}"
)
return r
except requests.exceptions.RequestException as e:
if attempt < max_retries - 1:
wait_time = (2 ** attempt) + random.uniform(0.5, 1.5)
logger.warning(f"⚠️ 네트워크 에러 -> {wait_time:.1f}초 대기 후 재시도: {e}")
time.sleep(wait_time)
else:
logger.error(f"❌ POST 요청 실패 ({path}): {e}")
return r
def inquire_price(self, stock_code):
"""
주식 현재가 시세 조회 [v1_국내주식-008] (단건).
output: stck_prpr(현재가) ✅, stck_oprc(시가), stck_hgpr(고가), stck_lwpr(저가) 등 당일 OHLC 포함.
실패 시 오류코드(rt_cd, msg_cd, msg1) 로깅.
"""
path = "/uapi/domestic-stock/v1/quotations/inquire-price"
tr_id = "FHKST01010100"
params = {"FID_COND_MRKT_DIV_CODE": "J", "FID_INPUT_ISCD": stock_code}
logger.debug(f"[현재가API] 호출 code={stock_code} path={path} TR_ID={tr_id}")
r = self._get(path, tr_id, params)
if r.status_code != 200:
try:
body_preview = (r.text or "")[:300]
except Exception:
body_preview = ""
logger.warning(
f"[현재가API] HTTP 실패 code={stock_code} path={path} TR_ID={tr_id} "
f"status={r.status_code} body={body_preview}"
)
return None
try:
j = r.json()
except Exception as e:
logger.warning(
f"[현재가API] JSON 파싱 실패 code={stock_code} path={path} TR_ID={tr_id} exception={e}"
)
return None
if j.get("rt_cd") != "0":
logger.warning(
f"[현재가API] 오류 code={stock_code} path={path} TR_ID={tr_id} "
f"rt_cd={j.get('rt_cd')} msg_cd={j.get('msg_cd')} msg1={j.get('msg1')}"
)
return None
return j.get("output")
def inquire_multprice(self, stock_codes: List[str], max_per_call: int = 20):
"""
다중 종목 현재가 조회 [intstock-multprice]
- 한투 API: /uapi/domestic-stock/v1/quotations/intstock-multprice
- 성공 시 {종목코드: output딕셔너리} 반환, 실패 시 None (오류 시 rt_cd/msg_cd/msg1 로깅)
- TR_ID: FHKST01010600
- ⚠️ 배치 응답에는 stck_oprc(시가)/stck_hgpr(고가)/stck_lwpr(저가)가 없을 수 있음(API 스펙).
시가·고가·저가 필요 시 단건 inquire_price() 사용.
"""
if not stock_codes:
return None
codes = list(stock_codes)[: max_per_call * 10]
result = {}
for i in range(0, len(codes), max_per_call):
chunk = codes[i : i + max_per_call]
iscd = ",".join(chunk)
path = "/uapi/domestic-stock/v1/quotations/intstock-multprice"
tr_id = "FHKST01010600"
params = {"FID_COND_MRKT_DIV_CODE": "J", "FID_INPUT_ISCD": iscd}
r = self._get(path, tr_id, params)
if r.status_code != 200:
try:
body_preview = (r.text or "")[:300]
except Exception:
body_preview = ""
logger.warning(
f"[다중시세API] HTTP 실패 status={r.status_code} body={body_preview}"
)
continue
try:
j = r.json()
except Exception as e:
logger.warning(f"[다중시세API] JSON 파싱 실패 exception={e}")
continue
if j.get("rt_cd") != "0":
logger.warning(
f"[다중시세API] 오류 rt_cd={j.get('rt_cd')} "
f"msg_cd={j.get('msg_cd')} msg1={j.get('msg1')}"
)
continue
out = j.get("output")
if out is None:
continue
if isinstance(out, list):
for item in out:
if isinstance(item, dict):
code = (
item.get("stck_shrn_iscd")
or item.get("rsym")
or item.get("FID_INPUT_ISCD")
or item.get("mksc_shrn_iscd")
)
if code:
result[code] = item
elif isinstance(out, dict):
code = (
out.get("stck_shrn_iscd")
or out.get("rsym")
or out.get("FID_INPUT_ISCD")
)
if code:
result[code] = out
time.sleep(random.uniform(0.2, 0.5))
return result if result else None
def inquire_prices_batch(self, stock_codes: List[str]):
"""
다중 종목 현재가 일괄 조회
- intstock-multprice API 우선 시도 후, 실패 시 순차 조회(inquire_price)로 fallback
- 순차 조회 시 종목당 0.3~0.6초 딜레이
- 배치 응답에는 stck_prpr(현재가)는 있으나, stck_oprc/stck_hgpr/stck_lwpr 는 없을 수 있음.
시가·고가·저가 필요 시 배치 대신 종목별 inquire_price() 사용(개미털기 스캔은 이미 단건 사용).
"""
if not stock_codes:
return {}
multi = self.inquire_multprice(stock_codes)
if multi:
return multi
result = {}
for code in stock_codes:
try:
price_data = self.inquire_price(code)
if price_data:
result[code] = price_data
time.sleep(random.uniform(0.3, 0.6))
except Exception as e:
logger.warning(f"종목 조회 실패({code}) exception={e!r}")
continue
return result
def get_account_balance(self):
"""계좌 잔고 조회 [v1_국내주식-010]. 모의/실전에 따라 TR ID 분기 (EGW2004 방지)."""
if self.mock:
tr_id = "VTTC8434R"
else:
tr_id = "TTTC8434R"
params = {
"CANO": self.acc_no,
"ACNT_PRDT_CD": self.acc_code,
"AFHR_FLPR_YN": "N",
"OFL_YN": "",
"INQR_DVSN": "01", # 01: 예수금/잔고 요약 (output2에 예수금) - 블로그·한투 문서 기준
"UNPR_DVSN": "01",
"FUND_STTL_ICLD_YN": "N",
"FNCG_AMT_AUTO_RDPT_YN": "N",
"PRCS_DVSN": "00", # 00: 조회 (블로그 기준)
"CTX_AREA_FK100": "",
"CTX_AREA_NK100": "",
}
try:
logger.info(f"💵 [예수금] 잔고 조회 요청: TR={tr_id}, CANO={self.acc_no}, ACNT_PRDT_CD={self.acc_code}, 모의={self.mock}")
r = self._get(
"/uapi/domestic-stock/v1/trading/inquire-balance",
tr_id,
params,
)
if r.status_code != 200:
logger.warning(
f"💵 [예수금] 잔고 API HTTP 오류: status={r.status_code}, body={getattr(r, 'text', '')[:200]} | "
f"TR={tr_id} (모의={self.mock}), CANO={self.acc_no}, ACNT_PRDT_CD={self.acc_code}. "
f"EGW2004 시 모의면 VTTC8434R/실전이면 TTTC8434R 확인"
)
return None
j = r.json()
if j.get("rt_cd") != "0":
msg1 = (j.get("msg1") or "")[:150]
msg_cd = j.get("msg_cd", "")
logger.error(
f"💵 [예수금] 잔고 API 응답 오류: rt_cd={j.get('rt_cd')}, msg_cd={msg_cd}, msg1={msg1} | "
f"요청 파라미터: TR={tr_id}, CANO={self.acc_no}({len(self.acc_no)}자리), "
f"ACNT_PRDT_CD={self.acc_code}({len(self.acc_code)}자리), 모의={self.mock} | "
f"전체 응답: {j}"
)
if "OPSQ2000" in str(msg_cd) or "INVALID_CHECK_ACNO" in msg1:
logger.error(
"💵 [예수금] OPSQ2000 = 계좌번호 검증 실패. "
f"모의투자 서버({self.base_url})에 계좌번호 CANO={self.acc_no}, ACNT_PRDT_CD={self.acc_code}가 등록되어 있는지 확인. "
f"한투 모의투자 앱/웹에서 계좌번호 확인 필요. DB의 KIS_ACCOUNT_NO_MOCK/KIS_ACCOUNT_CODE_MOCK 값 확인."
)
return None
logger.debug(f"💵 [예수금] 잔고 조회 성공: output2 keys={list(j.get('output2', [{}])[0].keys()) if isinstance(j.get('output2'), list) and j.get('output2') else []}")
return j
except Exception as e:
logger.error(f"💵 [예수금] 잔고 조회 예외: {e} | CANO={self.acc_no}, ACNT_PRDT_CD={self.acc_code}, 모의={self.mock}")
return None
def get_account_evaluation(self):
"""계좌 평가 잔고 조회 [v1_국내주식-011]. 모의=VTTC8494R, 실전=TTTC8494R."""
if self.mock:
tr_id = "VTTC8494R"
else:
tr_id = "TTTC8494R"
try:
logger.info(tr_id)
r = self._get(
"/uapi/domestic-stock/v1/trading/inquire-balance-rlz-pl",
tr_id,
{
"CANO": self.acc_no,
"ACNT_PRDT_CD": self.acc_code,
"AFHR_FLPR_YN": "N",
"OFL_YN": "",
"INQR_DVSN": "01",
"UNPR_DVSN": "01",
"FUND_STTL_ICLD_YN": "N",
"FNCG_AMT_AUTO_RDPT_YN": "N",
"PRCS_DVSN": "01",
"CTX_AREA_FK100": "",
"CTX_AREA_NK100": "",
},
)
if r.status_code != 200:
return None
j = r.json()
if j.get("rt_cd") != "0":
return None
return j
except Exception as e:
logger.error(f"계좌 평가 조회 실패: {e}")
return None
def get_order_history(self, start_date=None, end_date=None):
"""주문 체결 내역 조회 [v1_국내주식-012] (모의=VTTC8001R, 실전=TTTC8001R)"""
try:
if self.mock:
tr_id = "VTTC8001R"
else:
tr_id = "TTTC8001R"
if not start_date:
start_date = dt.now().strftime("%Y%m%d")
if not end_date:
end_date = dt.now().strftime("%Y%m%d")
r = self._get(
"/uapi/domestic-stock/v1/trading/inquire-daily-ccld",
tr_id,
{
"CANO": self.acc_no,
"ACNT_PRDT_CD": self.acc_code,
"INQR_STRT_DT": start_date,
"INQR_END_DT": end_date,
"SLL_BUY_DVSN_CD": "00", # 00:전체
"INQR_DVSN": "00", # 00:역순
"PDNO": "",
"CCLD_DVSN": "00", # 00:전체
"ORD_GNO_BRNO": "",
"ODNO": "",
"INQR_DVSN_3": "00",
"INQR_DVSN_1": "",
"CTX_AREA_FK100": "",
"CTX_AREA_NK100": "",
},
)
if r.status_code != 200:
return None
j = r.json()
if j.get("rt_cd") != "0":
return None
return j
except Exception as e:
logger.error(f"주문 내역 조회 실패: {e}")
return None
def get_volume_surge_stocks(self, market="J", min_volume_rate="50", limit=50):
"""거래량 급증 종목 조회 [v1_국내주식-023]"""
try:
r = self._get(
"/uapi/domestic-stock/v1/quotations/inquire-time-itemchartprice",
"FHKST03010200",
{
"FID_COND_MRKT_DIV_CODE": market,
"FID_INPUT_ISCD": "",
"FID_INPUT_HOUR_1": dt.now().strftime("%Y%m%d"),
"FID_INPUT_HOUR_2": dt.now().strftime("%Y%m%d"),
"FID_PW_DATA_INCU_YN": "Y",
},
)
# 실제로는 거래량 급증 API를 사용해야 하지만, 여기서는 예시로 현재가 조회 활용
# 실제 구현 시: /uapi/domestic-stock/v1/quotations/inquire-time-itemchartprice 사용
return []
except Exception as e:
logger.error(f"거래량 급증 종목 조회 실패: {e}")
return []
def get_top_price_movers(self, market="J", sort_type="1", limit=50):
"""등락률 상위 종목 조회 [v1_국내주식-027]"""
try:
r = self._get(
"/uapi/domestic-stock/v1/quotations/inquire-time-itemchartprice",
"FHKST03010200",
{
"FID_COND_MRKT_DIV_CODE": market,
"FID_INPUT_ISCD": "",
"FID_INPUT_HOUR_1": dt.now().strftime("%Y%m%d"),
"FID_INPUT_HOUR_2": dt.now().strftime("%Y%m%d"),
"FID_PW_DATA_INCU_YN": "Y",
},
)
# 실제 구현 필요
return []
except Exception as e:
logger.error(f"등락률 상위 조회 실패: {e}")
return []
def get_investor_trend(self, stock_code, days=5):
"""외국인/기관 매매 동향 조회"""
path = "/uapi/domestic-stock/v1/quotations/inquire-daily-itemchartprice"
tr_id = "FHKST03010100"
try:
# 일봉 데이터에서 외국인/기관 정보 추출
end_dt = dt.now()
start_dt = end_dt - datetime.timedelta(days=days + 10)
start_ymd = start_dt.strftime("%Y%m%d")
end_ymd = end_dt.strftime("%Y%m%d")
params = {
"FID_COND_MRKT_DIV_CODE": "J",
"FID_INPUT_ISCD": stock_code,
"FID_INPUT_DATE_1": start_ymd,
"FID_INPUT_DATE_2": end_ymd,
"FID_PERIOD_DIV_CODE": "D",
"FID_ORG_ADJ_PRC": "1",
}
logger.debug(f"[투자자동향] 호출 code={stock_code} path={path} TR_ID={tr_id}")
r = self._get(path, tr_id, params)
if r.status_code != 200:
logger.warning(f"[투자자동향] HTTP 실패 code={stock_code} path={path} TR_ID={tr_id} status={r.status_code}")
return None
j = r.json()
if j.get("rt_cd") != "0":
logger.warning(
f"[투자자동향] 오류 code={stock_code} path={path} TR_ID={tr_id} "
f"rt_cd={j.get('rt_cd')} msg_cd={j.get('msg_cd')} msg1={j.get('msg1')}"
)
return None
out2 = j.get("output2", [])
if not out2:
return None
# 최근 N일 외국인/기관 순매수 합계
foreign_sum = 0
org_sum = 0
for item in out2[:days]:
try:
foreign_raw = item.get("frgn_ntby_qty") or item.get("frgn_ntby_shnu") or "0"
foreign_net = int(float(str(foreign_raw).replace(",", "").replace("+", "").replace("-", "")))
if str(foreign_raw).startswith("-"):
foreign_net = -foreign_net
org_raw = item.get("orgn_ntby_qty") or "0"
org_net = int(float(str(org_raw).replace(",", "").replace("+", "").replace("-", "")))
if str(org_raw).startswith("-"):
org_net = -org_net
foreign_sum += foreign_net
org_sum += org_net
except:
continue
return {
"foreign_net_buy": foreign_sum,
"org_net_buy": org_sum,
"total_net_buy": foreign_sum + org_sum,
}
except Exception as e:
logger.error(f"외국인/기관 동향 조회 실패({stock_code}): {e}")
return None
def get_daily_chart(self, code, limit=10):
"""일봉 차트 조회 [v1_국내주식-017] - 거래대금(대/중/소형) 계산용"""
path = "/uapi/domestic-stock/v1/quotations/inquire-daily-itemchartprice"
tr_id = "FHKST03010100"
try:
end_dt = dt.now()
start_dt = end_dt - datetime.timedelta(days=limit + 30)
start_ymd = start_dt.strftime("%Y%m%d")
end_ymd = end_dt.strftime("%Y%m%d")
params = {
"FID_COND_MRKT_DIV_CODE": "J",
"FID_INPUT_ISCD": code,
"FID_INPUT_DATE_1": start_ymd,
"FID_INPUT_DATE_2": end_ymd,
"FID_PERIOD_DIV_CODE": "D",
"FID_ORG_ADJ_PRC": "1",
}
logger.debug(f"[일봉차트] 호출 code={code} path={path} TR_ID={tr_id}")
r = self._get(path, tr_id, params)
if r.status_code != 200:
logger.warning(f"[일봉차트] HTTP 실패 code={code} path={path} TR_ID={tr_id} status={r.status_code}")
return pd.DataFrame()
j = r.json()
if j.get("rt_cd") != "0":
logger.warning(
f"[일봉차트] 오류 code={code} path={path} TR_ID={tr_id} "
f"rt_cd={j.get('rt_cd')} msg_cd={j.get('msg_cd')} msg1={j.get('msg1')}"
)
return pd.DataFrame()
data = j.get("output2", [])
if not data:
return pd.DataFrame()
rows = []
for item in data:
try:
rows.append({
"date": item.get("stck_bsop_date", ""),
"open": abs(float(item.get("stck_oprc", 0))),
"high": abs(float(item.get("stck_hgpr", 0))),
"low": abs(float(item.get("stck_lwpr", 0))),
"close": abs(float(item.get("stck_clpr", 0))),
"volume": int(item.get("acml_vol", 0)),
})
except Exception:
continue
if not rows:
return pd.DataFrame()
df = pd.DataFrame(rows)
df = df.sort_values("date").reset_index(drop=True)
return df.tail(limit)
except Exception as e:
logger.error(f"일봉 조회 실패({code}): {e}")
return pd.DataFrame()
def buy_order(self, code, qty, price=0, order_type="01"):
"""
매수 주문 (모의=VTTC0802U, 실전=TTTC0802U)
Args:
code: 종목코드
qty: 수량
price: 가격 (0이면 시장가)
order_type: 주문구분
- "01": 시장가
- "00": 지정가
- "05": 조건부지정가
- "06": 최유리지정가
- "07": 최우선지정가
- "10": 보통(IOC)
- "13": 시장가(IOC)
- "16": 최유리(IOC)
- "20": 보통(FOK)
- "23": 시장가(FOK)
- "26": 최유리(FOK)
"""
try:
if self.mock:
tr_id = "VTTC0802U"
else:
tr_id = "TTTC0802U"
if price > 0:
ord_unpr = str(price)
else:
ord_unpr = "0"
path = "/uapi/domestic-stock/v1/trading/order-cash"
body = {
"CANO": self.acc_no,
"ACNT_PRDT_CD": self.acc_code,
"PDNO": code,
"ORD_DVSN": order_type,
"ORD_QTY": str(qty),
"ORD_UNPR": ord_unpr,
}
logger.debug(f"[매수주문] 호출 code={code} qty={qty} price={price} path={path} TR_ID={tr_id}")
r = self._post(path, tr_id, body, use_hashkey=True)
if r.status_code != 200:
logger.error(f"[매수주문] HTTP 에러 code={code} path={path} TR_ID={tr_id} status={r.status_code}")
return False
j = r.json()
if j.get("rt_cd") == "0":
ord_no = j.get("output", {}).get("ODNO", "")
logger.info(f"✅ 매수 주문 성공: {code} {qty}주 (주문번호: {ord_no})")
return True
else:
logger.error(
f"[매수주문] 실패 code={code} path={path} TR_ID={tr_id} "
f"rt_cd={j.get('rt_cd')} msg_cd={j.get('msg_cd')} msg1={j.get('msg1')}"
)
return False
except Exception as e:
logger.error(f"매수 주문 예외({code}): {e}")
return False
def buy_market_order(self, code, qty):
"""시장가 매수 주문 (간편 메서드)"""
return self.buy_order(code, qty, price=0, order_type="01")
def sell_order(self, code, qty, price=0, order_type="01"):
"""
매도 주문 (모의=VTTC0801U, 실전=TTTC0801U)
Args:
code: 종목코드
qty: 수량
price: 가격 (0이면 시장가)
order_type: 주문구분 (buy_order와 동일)
"""
try:
if self.mock:
tr_id = "VTTC0801U"
else:
tr_id = "TTTC0801U"
if price > 0:
ord_unpr = str(price)
else:
ord_unpr = "0"
path = "/uapi/domestic-stock/v1/trading/order-cash"
body = {
"CANO": self.acc_no,
"ACNT_PRDT_CD": self.acc_code,
"PDNO": code,
"ORD_DVSN": order_type,
"ORD_QTY": str(qty),
"ORD_UNPR": ord_unpr,
}
logger.debug(f"[매도주문] 호출 code={code} qty={qty} price={price} path={path} TR_ID={tr_id}")
r = self._post(path, tr_id, body, use_hashkey=True)
if r.status_code != 200:
logger.error(f"[매도주문] HTTP 에러 code={code} path={path} TR_ID={tr_id} status={r.status_code}")
return False
j = r.json()
if j.get("rt_cd") == "0":
ord_no = j.get("output", {}).get("ODNO", "")
logger.info(f"✅ 매도 주문 성공: {code} {qty}주 (주문번호: {ord_no})")
return True
else:
logger.error(
f"[매도주문] 실패 code={code} path={path} TR_ID={tr_id} "
f"rt_cd={j.get('rt_cd')} msg_cd={j.get('msg_cd')} msg1={j.get('msg1')}"
)
return False
except Exception as e:
logger.error(f"매도 주문 예외({code}): {e}")
return False
def sell_market_order(self, code, qty):
"""시장가 매도 주문 (간편 메서드)"""
return self.sell_order(code, qty, price=0, order_type="01")
def get_minute_chart(self, code, period="3", limit=100):
"""분봉 차트 조회 [v1_국내주식-017]"""
path = "/uapi/domestic-stock/v1/quotations/inquire-time-itemchartprice"
tr_id = "FHKST03010200"
try:
end_dt = dt.now()
start_dt = end_dt - datetime.timedelta(days=1)
start_ymd = start_dt.strftime("%Y%m%d")
end_ymd = end_dt.strftime("%Y%m%d")
# 분봉 코드: 1분=1, 3분=3, 5분=5, 10분=10, 30분=30, 60분=60
period_map = {"1": "1", "3": "3", "5": "5", "10": "10", "30": "30", "60": "60"}
period_code = period_map.get(str(period), "3")
params = {
"FID_COND_MRKT_DIV_CODE": "J",
"FID_INPUT_ISCD": code,
"FID_INPUT_HOUR_1": start_ymd,
"FID_INPUT_HOUR_2": end_ymd,
"FID_PW_DATA_INCU_YN": "Y",
"FID_ETC_CLS_CODE": "", # 기타분류코드 (필수 파라미터, 빈 값 가능)
}
logger.debug(f"[분봉차트] 호출 code={code} period={period} path={path} TR_ID={tr_id}")
r = self._get(path, tr_id, params)
if r.status_code != 200:
logger.warning(f"[분봉차트] HTTP 실패 code={code} path={path} TR_ID={tr_id} status={r.status_code}")
return pd.DataFrame()
j = r.json()
if j.get("rt_cd") != "0":
logger.warning(
f"[분봉차트] 오류 code={code} path={path} TR_ID={tr_id} "
f"rt_cd={j.get('rt_cd')} msg_cd={j.get('msg_cd')} msg1={j.get('msg1')}"
)
return pd.DataFrame()
data = j.get("output2", [])
if not data:
return pd.DataFrame()
rows = []
for item in data:
try:
rows.append({
"time": item.get("stck_bsop_date", ""),
"open": abs(float(item.get("stck_oprc", 0))),
"high": abs(float(item.get("stck_hgpr", 0))),
"low": abs(float(item.get("stck_lwpr", 0))),
"close": abs(float(item.get("stck_clpr", 0))),
"volume": int(item.get("acml_vol", 0)),
})
except:
continue
if not rows:
return pd.DataFrame()
df = pd.DataFrame(rows)
df = df.sort_values("time").reset_index(drop=True)
# 기술적 지표 추가
if len(df) >= 14:
delta = df["close"].diff(1)
gain = delta.where(delta > 0, 0).rolling(window=14).mean()
loss = (-delta.where(delta < 0, 0)).rolling(window=14).mean()
rs = gain / loss.replace(0, float("nan"))
df["RSI"] = 100 - (100 / (1 + rs))
if len(df) >= 20:
df["MA20"] = df["close"].rolling(window=20).mean()
return df.tail(limit)
except Exception as e:
logger.error(f"분봉 조회 실패({code}): {e}")
return pd.DataFrame()
def get_orderbook(self, stock_code):
"""호가 조회 [v1_국내주식-009]"""
try:
r = self._get(
"/uapi/domestic-stock/v1/quotations/inquire-asking-price-exp-ccn",
"FHKST01010200",
{
"FID_COND_MRKT_DIV_CODE": "J",
"FID_INPUT_ISCD": stock_code,
},
)
if r.status_code != 200:
return None
j = r.json()
if j.get("rt_cd") != "0":
return None
return j.get("output")
except Exception as e:
logger.error(f"호가 조회 실패({stock_code}): {e}")
return None
# ============================================================
# 순위분석 API (키움 봇과 동일한 로직 + 레버리지/스팩/ETN 제외 옵션)
# ============================================================
@staticmethod
def _is_valid_stock(name: str, code: str) -> bool:
"""
종목 필터링 (키움 kiwoom_trader_ver2와 동일, ETF는 포함)
- 스팩, ETN, 우선주, 레버리지, 인버스 등만 제외 (ETF는 위험도 낮아 포함)
"""
if not code or len(code) != 6 or not code.isdigit():
return False
name = (name or "").strip()
exclude = [
"스팩", "SPAC", "ETN", "W", "ELW", "채권",
"레버리지", "인버스", "곱버스", "선물", "", "",
"2X", "3X", "합성", "H", "B",
]
# ETF는 exclude 목록에 없음 → 일반 주식·ETF 모두 통과
if any(k in name for k in exclude):
return False
if name.endswith("") or name.endswith("우B"):
return False
return True
def _filter_rank_by_valid_stock(self, rank_list: list) -> list:
"""랭크 API 응답 리스트에서 스팩/ETN/레버리지 등 제외 (키움 옵션과 동일)"""
if not rank_list:
return []
filtered = []
for item in rank_list:
code = (item.get("stk_cd") or item.get("mksc_shrn_iscd") or item.get("code") or "").strip()
name = (item.get("stk_nm") or item.get("prst_name") or item.get("hts_kor_isnm") or "").strip()
if self._is_valid_stock(name, code):
filtered.append(item)
return filtered
def _fetch_volume_rank_paged(
self,
market: str,
blng_cls_code: str,
limit: int,
exclude_spec_etn_leverage: bool,
) -> list:
"""
거래량순위 API 연속 조회 (tr_cont)로 limit건까지 수집.
API가 한 번에 20~30건만 주므로, tr_cont='M'이면 다음 페이지 요청 반복.
"""
path = "/uapi/domestic-stock/v1/quotations/volume-rank"
tr_id = "FHPST01710000"
base = {
"FID_COND_MRKT_DIV_CODE": market,
"FID_COND_SCR_DIV_CODE": "20171",
"FID_INPUT_ISCD": "0000",
"FID_DIV_CLS_CODE": "0",
"FID_BLNG_CLS_CODE": blng_cls_code,
"FID_TRGT_CLS_CODE": "111111111",
"FID_TRGT_EXLS_CLS_CODE": "0000000000",
"FID_INPUT_PRICE_1": "0",
"FID_INPUT_PRICE_2": "0",
"FID_VOL_CNT": "0",
"FID_INPUT_DATE_1": "",
}
accumulated = []
tr_cont = ""
max_pages = 20 # 20페이지 이상이면 중단 (과부하 방지)
page = 0
try:
while len(accumulated) < limit and page < max_pages:
params = {**base}
time.sleep(0.5)
# 연속 조회 시 tr_cont는 요청 헤더로 전달 (한투 문서: Request Header tr_cont=N)
r = self._get(path, tr_id, params, tr_cont=tr_cont if tr_cont else None)
if r.status_code != 200:
break
j = r.json()
if j.get("rt_cd") != "0":
break
output = j.get("output", [])
if exclude_spec_etn_leverage:
output = self._filter_rank_by_valid_stock(output)
# 2차 이상 수신 시: 이번 output이 이미 누적된 종목과 완전 동일하면 서버가 같은 페이지를 반복 준 것 → 중복 누적·추가 요청 중단
def _codes_from_list(lst):
s = set()
for item in lst:
c = (item.get("stk_cd") or item.get("mksc_shrn_iscd") or item.get("code") or "").strip()
if c:
s.add(c)
return s
if page >= 1 and output:
already = _codes_from_list(accumulated)
this_codes = _codes_from_list(output)
if this_codes and this_codes <= already:
logger.info(f" 📡 [순위API] 2차 수신 {len(output)}건은 1차와 동일(중복) → 연속조회 중단 (API가 다음 페이지 미지원 또는 동일 데이터 반환)")
break
accumulated.extend(output)
# 연속 조회: tr_cont는 HTTP 응답 헤더에 있음 (한투 문서). 소문자/대문자 모두 확인
tr_cont_resp = ""
for k, v in (r.headers or {}).items():
if k.strip().lower() == "tr_cont" and v:
tr_cont_resp = v.strip() if isinstance(v, str) else str(v)
break
if not tr_cont_resp:
tr_cont_resp = (r.headers.get("tr_cont") or r.headers.get("TR_CONT") or "").strip()
if isinstance(tr_cont_resp, str):
tr_cont_resp = tr_cont_resp.strip()
# 페이지네이션 동작 확인용 INFO 로그 (기본 로그 레벨에서 보이도록)
header_keys = list((r.headers or {}).keys())
logger.info(f" 📡 [순위API] 1차 수신 {len(output)}건, 누적 {len(accumulated)}건 | 응답 tr_cont='{tr_cont_resp}' | 헤더키: {header_keys}")
# tr_cont=M이면 다음 페이지 있음. 또는 첫 페이지에서 누적이 limit 미만이면 한 번 더 시도 (서버가 tr_cont 없이도 다음 페이지 지원하는 경우 대비)
if tr_cont_resp == "M":
tr_cont = "N"
page += 1
logger.info(f" 📡 [연속조회] tr_cont=M → tr_cont=N으로 다음 페이지 요청 (페이지 {page})")
elif page == 0 and len(output) > 0 and len(accumulated) < limit:
tr_cont = "N"
page += 1
logger.info(f" 📡 [연속조회] 누적 {len(accumulated)}건 < {limit}건 → tr_cont=N으로 다음 페이지 1회 시도")
else:
break
time.sleep(random.uniform(0.8, 1.5))
return accumulated[:limit]
except Exception as e:
logger.debug(f"거래량순위 연속 조회 실패: {e}")
return accumulated[:limit]
def get_volume_rank(
self,
market: str = "J",
limit: int = 50,
exclude_spec_etn_leverage: bool = True,
):
"""
거래량순위 조회 [v1_국내주식-047] (연속 조회로 limit건까지 수집)
"""
try:
output = self._fetch_volume_rank_paged(
market=market,
blng_cls_code="0",
limit=limit,
exclude_spec_etn_leverage=exclude_spec_etn_leverage,
)
return output
except Exception as e:
logger.debug(f"거래량순위 조회 실패: {e}")
return []
def get_price_change_rank(
self,
market: str = "J",
sort_type: str = "1",
limit: int = 50,
exclude_spec_etn_leverage: bool = True,
):
"""
등락률순위 조회 (동일 volume-rank API, FID_BLNG_CLS_CODE로 등락 구분)
sort_type: "1"=상승률 상위, "2"=하락률 상위(낙폭 큰 종목, N자 망치봉 스캔에 유리)
"""
# 한투 volume-rank API: 4=등락률(상승), 5=등락률(하락). 미지원 시 빈값/에러 가능.
blng = "5" if sort_type == "2" else "4"
try:
out = self._fetch_volume_rank_paged(
market=market,
blng_cls_code=blng,
limit=limit,
exclude_spec_etn_leverage=exclude_spec_etn_leverage,
)
if out:
return out
except Exception as e:
logger.debug(f"등락률순위(blng={blng}) 조회 실패: {e}")
# API가 4/5 미지원이면 기존처럼 거래량순위로 fallback (상승 위주 후보 확보)
if sort_type != "2":
return self.get_volume_rank(market=market, limit=limit, exclude_spec_etn_leverage=exclude_spec_etn_leverage)
return []
def get_price_decline_rank(
self,
market: str = "J",
limit: int = 100,
exclude_spec_etn_leverage: bool = True,
):
"""하락률 순위(낙폭 큰 종목) 조회. N자 망치봉/개미털기 스캔 유니버스 확대용."""
return self.get_price_change_rank(
market=market,
sort_type="2",
limit=limit,
exclude_spec_etn_leverage=exclude_spec_etn_leverage,
)
def get_trading_value_rank(
self,
market: str = "J",
limit: int = 50,
exclude_spec_etn_leverage: bool = True,
):
"""거래대금순위 조회 (연속 조회로 limit건까지). FID_BLNG_CLS_CODE=3"""
try:
return self._fetch_volume_rank_paged(
market=market,
blng_cls_code="3",
limit=limit,
exclude_spec_etn_leverage=exclude_spec_etn_leverage,
)
except Exception as e:
logger.debug(f"거래대금순위 조회 실패: {e}")
return []
def get_turnover_rank(
self,
market: str = "J",
limit: int = 50,
exclude_spec_etn_leverage: bool = True,
):
"""회전율순위 조회 (연속 조회로 limit건까지). FID_BLNG_CLS_CODE=2"""
try:
return self._fetch_volume_rank_paged(
market=market,
blng_cls_code="2",
limit=limit,
exclude_spec_etn_leverage=exclude_spec_etn_leverage,
)
except Exception as e:
logger.debug(f"회전율순위 조회 실패: {e}")
return []
def get_volume_growth_rank(
self,
market: str = "J",
limit: int = 50,
exclude_spec_etn_leverage: bool = True,
):
"""거래증가율순위 조회 (연속 조회로 limit건까지). FID_BLNG_CLS_CODE=1"""
try:
return self._fetch_volume_rank_paged(
market=market,
blng_cls_code="1",
limit=limit,
exclude_spec_etn_leverage=exclude_spec_etn_leverage,
)
except Exception as e:
logger.debug(f"거래증가율순위 조회 실패: {e}")
return []
def get_execution_strength_rank(
self,
market: str = "J",
limit: int = 200,
exclude_spec_etn_leverage: bool = True,
):
"""체결강도 상위 순위 조회 (FHPST01710000, FID_BLNG_CLS_CODE=6). 매수세 강한 종목 필터용."""
try:
return self._fetch_volume_rank_paged(
market=market,
blng_cls_code="6",
limit=limit,
exclude_spec_etn_leverage=exclude_spec_etn_leverage,
)
except Exception as e:
logger.debug(f"체결강도순위 조회 실패: {e}")
return []
def get_execution_strength_map(self, market: str = "J", limit: int = 200):
"""
체결강도 상위 API 조회 후 종목코드 -> 체결강도 값 매핑 반환.
output 필드: cntr_str(체결강도) 등 한투 문서 기준으로 파싱. 미제공 시 0.
"""
strength_map = {}
try:
rows = self.get_execution_strength_rank(market=market, limit=limit, exclude_spec_etn_leverage=True)
for item in (rows or []):
code = (item.get("stk_cd") or item.get("mksc_shrn_iscd") or item.get("code") or "").strip()
if not code or len(code) != 6:
continue
# 한투 volume-rank 체결강도: cntr_str 또는 유사 필드 (문서 확인 후 조정)
raw = item.get("cntr_str") or item.get("exec_str") or item.get("strg_rt") or item.get("prdy_ctrt") or ""
try:
strength = float(str(raw).replace(",", "").strip()) if raw else 0
except (ValueError, TypeError):
strength = 0
strength_map[code] = strength
except Exception as e:
logger.debug(f"체결강도 맵 조회 실패: {e}")
return strength_map
# ============================================================
# Mattermost 봇 클래스
# ============================================================
class MattermostBot:
"""Mattermost 알림 봇"""
def __init__(self):
self.api_url = f"{MM_SERVER_URL.rstrip('/')}/api/v4/posts"
self.headers = {
"Authorization": f"Bearer {MM_BOT_TOKEN}",
"Content-Type": "application/json"
}
self.channels = self._load_channels()
def _load_channels(self):
"""채널 설정 로드"""
try:
if MM_CONFIG_FILE.exists():
with open(MM_CONFIG_FILE, 'r', encoding='utf-8') as f:
return json.load(f).get("channels", {})
return {}
except Exception as e:
logger.error(f"⚠️ MM 설정 로드 실패: {e}")
return {}
def send(self, channel_alias, message):
"""메시지 전송"""
channel_id = self.channels.get(channel_alias)
if not channel_id:
logger.warning(f"'{channel_alias}' 채널 ID 없음")
return False
payload = {"channel_id": channel_id, "message": message}
try:
res = requests.post(self.api_url, headers=self.headers, json=payload, timeout=3)
res.raise_for_status()
return True
except Exception as e:
logger.error(f"❌ MM 전송 에러: {e}")
return False
# ============================================================
# 단타 트레이딩 봇
# ============================================================
class ShortTradingBot:
"""단타용 트레이딩 봇 - 개미털기(눌림목) 전략"""
def __init__(self):
self.db = db
self.client = KISClient()
# Mattermost 초기화
self.mm = MattermostBot()
# 단타 봇 전용 채널(alias) 우선 사용, 없으면 기본 채널 사용
self.mm_channel = MM_CHANNEL_SHORT
# ML 예측 초기화 (선택적)
self.ml_predictor = None
if ML_AVAILABLE:
try:
self.ml_predictor = MLPredictor(db_path=str(SCRIPT_DIR / "quant_bot.db"))
if self.ml_predictor.should_retrain():
self.ml_predictor.train_model(retrain=True)
except Exception as e:
logger.warning(f"⚠️ ML 예측 초기화 실패: {e}")
# 전략 파라미터 (DB·env 연동)
# ============================================================
# [손절/익절 설정] - 대형주 기준 현실적인 값
# ============================================================
# 손절 라인: -4% (주가 기준, 대형주는 노이즈 감안해 넉넉히)
self.stop_loss_pct = get_env_float("STOP_LOSS_PCT", -0.04) # -4% 손절 (대형주 기준)
self.take_profit_pct = get_env_float("TAKE_PROFIT_PCT", 0.05)
self.max_stocks = get_env_int("MAX_STOCKS", 3)
# 개미털기 필터: 낙폭 3% 이상 + 회복률 50% 이상 통과 (후보가 너무 적으면 MIN_DROP_RATE=0.02, MIN_RECOVERY_RATIO_SHORT=0.32 등 완화)
self.min_drop_rate = get_env_float("MIN_DROP_RATE", 0.03)
self.min_recovery_ratio = get_env_float("MIN_RECOVERY_RATIO_SHORT", 0.5)
# ATR 기반 손절/익절 배수 (변동성 기반 동적 손절가/목표가)
self.stop_atr_multiplier = get_env_float("STOP_ATR_MULTIPLIER_TAIL", 2.5)
self.target_atr_multiplier = get_env_float("TARGET_ATR_MULTIPLIER_TAIL", 8.0)
# 24시간 보유 전략 (N자 패턴 등 다음날 상승 가능성 있는 종목)
self.min_hold_hours = get_env_float("MIN_HOLD_HOURS", 24.0) # 최소 보유 시간 (기본 24시간)
# ============================================================
# [리스크 관리 설정] - 변동성 역가중 (Volatility Inverse Weighting)
# ============================================================
self.risk_pct_per_trade = get_env_float("RISK_PCT_PER_TRADE", 0.01) # 1% (계좌 기준)
self.kelly_multiplier = get_env_float("KELLY_MULTIPLIER", 0.25) # 쿼터 켈리 (0.25)
self.max_position_pct = get_env_float("MAX_POSITION_PCT", 0.15) # 종목당 최대 15%
self.min_position_amount = get_env_int("MIN_POSITION_AMOUNT", 50000) # 최소 5만원
# RiskManager 초기화 (변동성 역가중 + 쿼터 켈리 매수 금액 계산)
self.risk_mgr = None
if RISK_MANAGER_AVAILABLE:
# 켈리 공식 사용 여부 (기본 ON - 쿼터 켈리 0.25 적용)
use_kelly = get_env_bool("USE_KELLY_FORMULA", True)
self.risk_mgr = RiskManager(
risk_pct_per_trade=self.risk_pct_per_trade,
max_position_pct=self.max_position_pct,
min_position_amount=self.min_position_amount,
use_kelly=use_kelly,
kelly_multiplier=self.kelly_multiplier, # 쿼터 켈리 0.25
)
kelly_status = f"켈리{'ON' if use_kelly else 'OFF'}(배율={self.kelly_multiplier})"
logger.info(f"✅ RiskManager 활성화: 변동성 역가중 + {kelly_status}")
else:
logger.warning("⚠️ RiskManager 미사용: 고정 슬롯 금액 방식으로 폴백")
# ML 신호 필터링 설정
self.use_ml_signal = get_env_bool("USE_ML_SIGNAL", False)
self.ml_min_probability = get_env_float("ML_MIN_PROBABILITY", 0.57)
# 리포트 플래그
self.morning_report_sent = False
self.closing_report_sent = False
self.final_report_sent = False
self.ai_report_sent = False
# 자산 추적
self.today_date = dt.now().strftime("%Y-%m-%d")
self.start_day_asset = 0
self.current_total_asset = 0
self.current_cash = 0
self.d2_excc_amt = 0 # D+2 예수금 (output2 prvs_rcdl_excc_amt)
self.total_deposit = get_env_float("TOTAL_DEPOSIT", 0)
# DB에서 활성 트레이드 로드
self.holdings = {}
active_trades = self.db.get_active_trades()
for code, trade in active_trades.items():
self.holdings[code] = {
"buy_price": trade.get("avg_buy_price", 0),
"qty": trade.get("current_qty", 0),
"buy_time": trade.get("buy_date", dt.now().strftime("%Y-%m-%d %H:%M:%S")),
"name": trade.get("name", code),
}
# 초기 자산 조회
self._update_assets()
# 비동기 태스크 관리
self._universe_task = None
self._report_task = None
self._asset_task = None
self.is_first_run = True
def _seconds_until_next_5min(self):
"""다음 5분 정각까지 남은 초 계산"""
now = dt.now()
next_min = ((now.minute // 5) + 1) * 5
if next_min >= 60:
next_time = now.replace(hour=now.hour + 1, minute=0, second=0, microsecond=0)
else:
next_time = now.replace(minute=next_min, second=0, microsecond=0)
return (next_time - now).total_seconds()
def update_universe(self):
"""
유니버스 업데이트 (5분마다 호출) - 키움 봇과 동일한 복합 스캔 로직
- 개미털기 우선 (원본 점수 유지)
- 외국인/거래량/상승률/기관 추가 (보너스 점수)
- 강도 순으로 정렬 → Top 30
"""
logger.info(f"🔄 [유니버스 업데이트] 시작 | 예수금: {self.current_cash:,.0f}")
logger.info("📡 [복합 스캔] 개미털기 우선 + 4가지 보너스 소스")
try:
# 매수 가능 금액 계산
if self.max_stocks > 0:
slot_money = int(self.current_cash * 0.9 / self.max_stocks)
else:
slot_money = 100000
all_candidates = {} # {code: {name, price, base_score, bonus_score, total_score}}
# 이번 스캔 주기용으로 후보 테이블 비움 (통과 시마다 add_target_candidate로 채워짐)
self.db.update_target_candidates([])
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
# 1. 개미털기 (눌림목) - 원본 점수 100% 유지! (통과 시마다 즉시 add_target_candidate)
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
try:
ant_shaking = self.scan_ant_shaking_candidates(max_candidates=50)
logger.info(f" ✅ [개미털기] {len(ant_shaking)}개 수집 (강도 원본 유지)")
for item in ant_shaking:
code = item['code']
all_candidates[code] = {
'code': code,
'name': item['name'],
'price': item['price'],
'base_score': item['score'], # 개미털기 원본 점수 (100%)
'bonus_score': 0.0, # 보너스 점수 (추가)
'from_ant': True,
'drop_rate': item.get('drop_rate', 0),
'recovery': item.get('recovery', 0),
}
except Exception as e:
logger.warning(f" ⚠️ [개미털기] 수집 실패: {e}")
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
# 2. 거래량순위 - 보너스 +0.3 (순위별 가산점)
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
try:
volume_rank = self.client.get_volume_rank(market="J", limit=50)
logger.info(f" ✅ [거래량순위] {len(volume_rank)}개 수집")
for idx, item in enumerate(volume_rank):
code = item.get('stk_cd', '').strip() or item.get('code', '').strip()
if not code or len(code) != 6:
continue
bonus = (50 - idx) / 50.0 * 0.3 # 순위별 보너스 (최대 +0.3)
if code in all_candidates:
all_candidates[code]['bonus_score'] += bonus
else:
# 신규 추가 (가격 확인 필요)
price_data = self.client.inquire_price(code)
if price_data:
current_price = abs(float(price_data.get("stck_prpr", 0)))
if current_price > 0 and current_price <= slot_money:
all_candidates[code] = {
'code': code,
'name': item.get('stk_nm', code),
'price': current_price,
'base_score': 0.0,
'bonus_score': bonus,
'from_ant': False,
}
except Exception as e:
logger.warning(f" ⚠️ [거래량순위] 수집 실패: {e}")
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
# 3. 등락률순위 (상승률) - 보너스 +0.2
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
try:
price_movers = self.client.get_price_change_rank(market="J", sort_type="1", limit=30)
logger.info(f" ✅ [등락률순위] {len(price_movers)}개 수집")
for idx, item in enumerate(price_movers):
code = item.get('stk_cd', '').strip() or item.get('code', '').strip()
if not code or len(code) != 6:
continue
bonus = (30 - idx) / 30.0 * 0.2 # 순위별 보너스 (최대 +0.2)
if code in all_candidates:
all_candidates[code]['bonus_score'] += bonus
else:
price_data = self.client.inquire_price(code)
if price_data:
current_price = abs(float(price_data.get("stck_prpr", 0)))
if current_price > 0 and current_price <= slot_money:
all_candidates[code] = {
'code': code,
'name': item.get('stk_nm', code),
'price': current_price,
'base_score': 0.0,
'bonus_score': bonus,
'from_ant': False,
}
except Exception as e:
logger.warning(f" ⚠️ [등락률순위] 수집 실패: {e}")
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
# 4. 거래대금순위 - 보너스 +0.2
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
try:
value_rank = self.client.get_trading_value_rank(market="J", limit=30)
logger.info(f" ✅ [거래대금순위] {len(value_rank)}개 수집")
for idx, item in enumerate(value_rank):
code = item.get('stk_cd', '').strip() or item.get('code', '').strip()
if not code or len(code) != 6:
continue
bonus = (30 - idx) / 30.0 * 0.2 # 순위별 보너스 (최대 +0.2)
if code in all_candidates:
all_candidates[code]['bonus_score'] += bonus
except Exception as e:
logger.warning(f" ⚠️ [거래대금순위] 수집 실패: {e}")
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
# 5. 외국인/기관 순매수 - 보너스 +0.3 (투자자 동향 기반)
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
# 개미털기 후보에 대해서만 투자자 동향 확인 (API 호출 최소화)
investor_check_count = 0
for code, candidate in list(all_candidates.items()):
if candidate.get('from_ant') and investor_check_count < 10: # 최대 10개만 체크
try:
investor_trend = self.client.get_investor_trend(code, days=2)
investor_check_count += 1
if investor_trend:
total_net = investor_trend.get("total_net_buy", 0)
if total_net > 20000:
candidate['bonus_score'] += 0.3 # 강한 매수세
elif total_net > 5000:
candidate['bonus_score'] += 0.15 # 매수세
except Exception as e:
logger.debug(f"투자자 동향 조회 실패({code}): {e}")
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
# 최종 점수 계산 및 정렬
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
final_candidates = []
for code, item in all_candidates.items():
total_score = item['base_score'] + item['bonus_score']
final_candidates.append({
'code': code,
'name': item['name'],
'price': item['price'],
'score': total_score,
'base_score': item['base_score'],
'bonus_score': item['bonus_score'],
'from_ant': item.get('from_ant', False),
'drop_rate': item.get('drop_rate', 0),
'recovery': item.get('recovery', 0),
})
# 강도 순 정렬 (total_score 기준)
final_candidates.sort(key=lambda x: x['score'], reverse=True)
# 강도 4.0 이상만 필터링 (키움 봇과 동일)
filtered = [c for c in final_candidates if c['score'] >= 4.0]
if not filtered:
logger.warning(f" ⚠️ 강도 4.0 이상 후보 없음 (전체: {len(final_candidates)}개)")
# 강도 낮춰서라도 상위 30개 저장
filtered = final_candidates[:30]
# DB 저장: 스캔 중 통과 시마다 이미 add_target_candidate()로 실시간 저장됨 → 여기서는 전체 교체 안 함
# 스캔 완료 후에는 정렬만 해서 로그만 찍음 (매매 루프는 DB에서 실시간으로 후보 읽음)
logger.info(f" 💾 매수 후보군: 스캔 중 통과 즉시 저장 완료, 최종 후보 {len(filtered)}개 (DB 반영됨, 정렬 순 로그만 출력)")
# Top 5 상세 로그 (강도 순, 종목명 표시)
logger.info(f" 🔝 [유니버스 Top 5] (강도 순)")
# Top 5 종목의 전일 대비 정보 조회 (API 문서: stck_prdy_clpr = 전일 종가)
def _safe_float(val):
if not val:
return 0.0
try:
return abs(float(str(val).replace(",", "").strip() or 0))
except:
return 0.0
prev_day_map = {}
for x in filtered[:5]:
code = x["code"]
name = x.get("name", code)
try:
price_data = self.client.inquire_price(code)
if not price_data:
logger.info(f" ⚠️ 전일대비: {name} {code} API응답없음")
continue
current_price = _safe_float(price_data.get("stck_prpr")) or _safe_float(x.get("price", 0))
prev_close = _safe_float(price_data.get("stck_prdy_clpr")) # API 문서 필드명 그대로 사용
if prev_close <= 0: # 없으면 일봉에서 전일 close
df_daily = self.client.get_daily_chart(code, limit=3)
if not df_daily.empty and len(df_daily) >= 2:
prev_close = _safe_float(df_daily["close"].iloc[-2])
logger.info(f" 📊 전일대비: {name} {code} 일봉에서 전일종가 조회 성공 ({prev_close:,.0f}원)")
else:
logger.info(f" ⚠️ 전일대비: {name} {code} 일봉데이터없음 (len={len(df_daily) if not df_daily.empty else 0})")
if prev_close > 0 and current_price > 0:
change = current_price - prev_close
change_pct = (change / prev_close) * 100
prev_day_map[code] = (change, change_pct)
else:
logger.info(f" ⚠️ 전일대비: {name} {code} prev_close={prev_close}, current={current_price}")
time.sleep(random.uniform(0.2, 0.3))
except Exception as e:
logger.warning(f" ⚠️ 전일대비 조회 실패({name} {code}): {e}")
for i, x in enumerate(filtered[:5], 1):
base = x.get('base_score', 0)
bonus = x.get('bonus_score', 0)
total = x['score']
drop_pct = x.get('drop_rate', 0) * 100
recovery_pct = x.get('recovery', 0) * 100
if x.get('from_ant'):
source = "개미털기"
else:
source = "랭킹"
if x.get('ml_probability'):
ml_info = f" | ML {x.get('ml_probability', 0):.1%}"
else:
ml_info = ""
# 전일 대비 등락률 표시
prev_day_info = ""
code = x['code']
if code in prev_day_map:
change, change_pct = prev_day_map[code]
sign = "+" if change >= 0 else ""
prev_day_info = f" | 어제보다 {sign}{change:,.0f}원 ({sign}{change_pct:.2f}%)"
logger.info(
f" {i}. {x['name']} {x['code']}: "
f"강도 {total:.1f} (기본 {base:.1f} + 보너스 {bonus:.1f}) | "
f"낙폭 {drop_pct:.1f}% | 회복 {recovery_pct:.0f}% | {source}{ml_info}{prev_day_info}"
)
logger.info(f" ✅ 최종 후보: {len(filtered)}개 (강도 4.0 이상: {len([c for c in final_candidates if c['score'] >= 4.0])}개)")
except Exception as e:
logger.error(f"❌ 유니버스 업데이트 실패: {e}")
import traceback
logger.error(traceback.format_exc())
async def _universe_scan_scheduler(self):
"""5분마다 정각에 유니버스 스캔 실행 (비동기 백그라운드)"""
loop = asyncio.get_event_loop()
while True:
try:
if self.is_first_run:
wait_sec = 0 # 첫 실행은 즉시
else:
wait_sec = max(0, self._seconds_until_next_5min())
if wait_sec > 0:
await asyncio.sleep(wait_sec)
now = dt.now()
logger.info(f"🔄 [스캔 주기] 정각 스캔 시작 | 시각:{now.hour:02d}:{now.minute:02d}:{now.second:02d}")
# 동기 함수를 executor에서 실행 (메인 루프 블로킹 방지)
await loop.run_in_executor(None, self.update_universe)
self.is_first_run = False
await asyncio.sleep(5) # 스캔 직후 5초 대기 (과부하 방지)
except asyncio.CancelledError:
break
except Exception as e:
logger.error(f"❌ [스캔 스케줄러] 에러: {e}")
await asyncio.sleep(60)
async def _report_scheduler(self):
"""리포트 전송 스케줄러 (비동기 백그라운드)"""
while True:
try:
await asyncio.sleep(60) # 1분마다 체크
now = dt.now()
# 13:00 - 오전 리포트 + AI 리포트
if now.hour == 13 and now.minute == 0 and not self.morning_report_sent:
loop = asyncio.get_event_loop()
await loop.run_in_executor(None, self.send_morning_report)
await loop.run_in_executor(None, self.send_ai_report)
# 15:15 - 장마감 전 리포트
elif now.hour == 15 and now.minute == 15 and not self.closing_report_sent:
loop = asyncio.get_event_loop()
await loop.run_in_executor(None, self.send_closing_report)
# 15:35 - 최종 리포트
elif now.hour == 15 and now.minute == 35 and not self.final_report_sent:
loop = asyncio.get_event_loop()
await loop.run_in_executor(None, self.send_final_report)
except asyncio.CancelledError:
break
except Exception as e:
logger.error(f"❌ [리포트 스케줄러] 에러: {e}")
await asyncio.sleep(60)
async def _asset_update_scheduler(self):
"""자산 정보 업데이트 스케줄러 (30분마다, 비동기 백그라운드)"""
while True:
try:
await asyncio.sleep(60) # 1분마다 체크
now = dt.now()
# 30분마다 자산 업데이트
if now.minute % 30 == 0:
loop = asyncio.get_event_loop()
await loop.run_in_executor(None, self._update_assets)
await asyncio.sleep(60) # 업데이트 후 1분 대기 (중복 방지)
except asyncio.CancelledError:
break
except Exception as e:
logger.error(f"❌ [자산 업데이트 스케줄러] 에러: {e}")
await asyncio.sleep(60)
def _update_assets(self):
"""자산 정보 업데이트"""
try:
balance = self.client.get_account_balance()
if balance is None:
logger.warning(
"💵 [예수금] get_account_balance가 None 반환 → 예수금 갱신 스킵 "
"(토큰·계좌·TR ID 확인. 모의=VTTC8434R, 실전=TTTC8434R)"
)
return
# 한투 API: output1=주식 잔고(종목별), output2=예수금 관련(dnca_tot_amt 등) - 블로그·문서 기준
def _parse_amt(v):
if v is None or str(v).strip() == "":
return None
return float(str(v).replace(",", "").strip())
def _cash_block(obj):
if not obj:
return {}
if isinstance(obj, list) and obj:
return obj[0]
if isinstance(obj, dict):
return obj
return {}
out2 = _cash_block(balance.get("output2"))
if isinstance(balance.get("output1"), dict):
out1 = balance.get("output1", {})
else:
out1 = {}
ord_psbl_val = _parse_amt(out2.get("ord_psbl_cash") or out1.get("ord_psbl_cash"))
dnca_tot_val = _parse_amt(out2.get("dnca_tot_amt") or out1.get("dnca_tot_amt")) or 0
# D+2 예수금 (전일 정산 수령 예정 금액) - 한투 output2 prvs_rcdl_excc_amt
prvs_rcdl = _parse_amt(out2.get("prvs_rcdl_excc_amt"))
if prvs_rcdl is not None:
self.d2_excc_amt = prvs_rcdl
else:
self.d2_excc_amt = 0
if ord_psbl_val is not None:
self.current_cash = ord_psbl_val
logger.info(
f"💵 [예수금] 주문가능(ord_psbl_cash)={self.current_cash:,.0f}원 | "
f"dnca_tot_amt={dnca_tot_val:,.0f} | D+2예수금(prvs_rcdl_excc_amt)={self.d2_excc_amt:,.0f}"
)
else:
self.current_cash = dnca_tot_val
logger.info(
f"💵 [예수금] dnca_tot_amt={self.current_cash:,.0f}원 | "
f"D+2예수금(prvs_rcdl_excc_amt)={self.d2_excc_amt:,.0f}원 (output2 keys={list(out2.keys()) if out2 else []})"
)
# 보유 종목 평가액 계산
holdings_value = 0
for code, holding in self.holdings.items():
price_data = self.client.inquire_price(code)
if price_data:
current_price = abs(float(price_data.get("stck_prpr", 0)))
holdings_value += current_price * holding["qty"]
self.current_total_asset = self.current_cash + holdings_value
if self.start_day_asset == 0:
self.start_day_asset = self.current_total_asset
except Exception as e:
logger.error(f"자산 정보 업데이트 실패: {e}")
def _update_account_light(self, profit_val=0):
"""
경량 계좌 갱신 (매수/매도 직후 즉시 호출!)
- API 부하를 줄이기 위해 예수금 + 보유 종목 평가액만 빠르게 계산
- 총자산 = 예수금 + 보유 종목 평가액 (+ 손익 반영)
"""
try:
balance = self.client.get_account_balance()
if balance is None:
logger.warning("💵 [예수금-경량] get_account_balance None → 예수금 갱신 스킵")
return False
def _parse_amt_light(v):
if v is None or str(v).strip() == "":
return None
return float(str(v).replace(",", "").strip())
def _cash_block_light(obj):
if not obj:
return {}
if isinstance(obj, list) and obj:
return obj[0]
if isinstance(obj, dict):
return obj
return {}
out2 = _cash_block_light(balance.get("output2"))
if isinstance(balance.get("output1"), dict):
out1 = balance.get("output1", {})
else:
out1 = {}
ord_psbl_val = _parse_amt_light(out2.get("ord_psbl_cash") or out1.get("ord_psbl_cash"))
dnca_tot_val = _parse_amt_light(out2.get("dnca_tot_amt") or out1.get("dnca_tot_amt")) or 0
prvs_rcdl = _parse_amt_light(out2.get("prvs_rcdl_excc_amt"))
if prvs_rcdl is not None:
self.d2_excc_amt = prvs_rcdl
if ord_psbl_val is not None:
new_cash = ord_psbl_val
else:
new_cash = dnca_tot_val
logger.info(
f"💵 [예수금-경량] 주문가능={new_cash:,.0f}원 (이전={self.current_cash:,.0f}) | D+2예수금={self.d2_excc_amt:,.0f}"
)
if new_cash > 0 or self.current_cash == 0:
self.current_cash = new_cash
# 보유 종목 평가액: output1=주식 잔고(종목별), output2=예수금 요약 (블로그 기준)
output1_list = balance.get("output1", [])
if isinstance(output1_list, dict):
output1_list = [output1_list]
holdings_value = 0
for code, holding in self.holdings.items():
for item in output1_list:
if (item.get("pdno") or "").strip() == code:
evlu_amt = float(item.get("evlu_amt", 0))
holdings_value += evlu_amt
break
else:
price_data = self.client.inquire_price(code)
if price_data:
current_price = abs(float(price_data.get("stck_prpr", 0)))
holdings_value += current_price * holding["qty"]
self.current_total_asset = self.current_cash + holdings_value
if profit_val != 0:
self.current_total_asset += profit_val
logger.debug(f"💵 [경량갱신] 예수금: {self.current_cash:,.0f}원 | 총자산: {self.current_total_asset:,.0f}")
return True
except Exception as e:
logger.error(f"❌ 경량 갱신 실패: {e}")
return False
def _update_cash_only(self):
"""예수금만 빠르게 업데이트 (하위 호환성용, _update_account_light 사용 권장)"""
return self._update_account_light(profit_val=0)
def send_mm(self, msg):
"""Mattermost 알림 전송"""
try:
self.mm.send(self.mm_channel, msg)
except Exception as e:
logger.error(f"❌ MM 전송 에러: {e}")
def check_market_status(self):
"""장 운영 시간 체크"""
# FORCE_MARKET_OPEN 플래그 확인 (테스트용)
force_open = get_env_bool("FORCE_MARKET_OPEN", False)
if force_open:
logger.debug("🔓 FORCE_MARKET_OPEN=true - 장 상태 무시하고 계속 진행")
return True
# 정상 장 운영 시간 체크
now = dt.now()
if not (datetime.time(8, 30) <= now.time() <= datetime.time(16, 0)):
return False
if now.weekday() >= 5: # 주말
return False
return True
def send_morning_report(self):
"""오전 장 뜸할 때 리포트 (13:00)"""
if self.morning_report_sent:
return
self._update_assets()
day_pnl = self.current_total_asset - self.start_day_asset
if self.start_day_asset > 0:
day_pnl_pct = day_pnl / self.start_day_asset * 100
else:
day_pnl_pct = 0
msg = f"""📊 **[오전 장 현황 - 13:00]**
- 당일 시작: {self.start_day_asset:,.0f}
- 현재 자산: {self.current_total_asset:,.0f}
- 당일 손익: {day_pnl:+,.0f}원 ({day_pnl_pct:+.2f}%)
- 보유 종목: {len(self.holdings)}"""
self.send_mm(msg)
self.morning_report_sent = True
logger.info("📊 오전 리포트 전송 완료")
def send_closing_report(self):
"""장마감 전 리포트 (15:15)"""
if self.closing_report_sent:
return
self._update_assets()
day_pnl = self.current_total_asset - self.start_day_asset
if self.start_day_asset > 0:
day_pnl_pct = day_pnl / self.start_day_asset * 100
else:
day_pnl_pct = 0
msg = f"""📈 **[장마감 전 현황 - 15:15]**
- 당일 손익: {day_pnl:+,.0f}원 ({day_pnl_pct:+.2f}%)
- 현재 자산: {self.current_total_asset:,.0f}
- 보유 종목: {len(self.holdings)}
- 예수금(주문가능): {self.current_cash:,.0f}원 | D+2예수금: {self.d2_excc_amt:,.0f}"""
self.send_mm(msg)
self.closing_report_sent = True
logger.info("📈 장마감 전 리포트 전송 완료")
def send_final_report(self):
"""장마감 후 최종 리포트 (15:35)"""
if self.final_report_sent:
return
self._update_assets()
# 당일 손익
day_pnl = self.current_total_asset - self.start_day_asset
if self.start_day_asset > 0:
day_pnl_pct = day_pnl / self.start_day_asset * 100
else:
day_pnl_pct = 0
# 누적 손익
cumulative_pnl = self.current_total_asset - self.total_deposit
if self.total_deposit > 0:
cumulative_pnl_pct = cumulative_pnl / self.total_deposit * 100
else:
cumulative_pnl_pct = 0
# 오늘 거래 내역
today_trades = self.db.get_trades_by_date(self.today_date)
msg = f"""🏁 **[장마감 최종 보고 - 15:35]**
━━━━━━━━━━━━━━━━━━━━
📅 **당일 손익**
- 시작: {self.start_day_asset:,.0f}
- 종료: {self.current_total_asset:,.0f}
- 손익: {day_pnl:+,.0f}원 ({day_pnl_pct:+.2f}%)
💰 **누적 손익 (총 입금액 대비)**
- 총 입금: {self.total_deposit:,.0f}
- 현재 자산: {self.current_total_asset:,.0f}
- 누적 손익: {cumulative_pnl:+,.0f}원 ({cumulative_pnl_pct:+.2f}%)
📊 **거래 현황**
- 오늘 매매: {len(today_trades)}
- 보유 종목: {len(self.holdings)}
- 예수금(주문가능): {self.current_cash:,.0f}원 | D+2예수금: {self.d2_excc_amt:,.0f}
━━━━━━━━━━━━━━━━━━━━"""
self.send_mm(msg)
self.final_report_sent = True
logger.info("🏁 장마감 최종 리포트 전송 완료")
def send_ai_report(self):
"""AI 분석 리포트 (13:00)"""
if self.ai_report_sent or not gemini_client:
return
try:
# 최근 거래 내역 조회
recent_trades = []
try:
conn = self.db.conn
cursor = conn.execute("""
SELECT code, name, buy_price, sell_price, qty, profit_rate,
realized_pnl, strategy, sell_reason, buy_date, sell_date, hold_minutes
FROM trade_history
ORDER BY id DESC
LIMIT 10
""")
for row in cursor.fetchall():
recent_trades.append({
'code': row[0],
'name': row[1],
'buy_price': row[2],
'sell_price': row[3],
'qty': row[4],
'profit_rate': row[5],
'realized_pnl': row[6],
'strategy': row[7],
'sell_reason': row[8],
'buy_date': row[9],
'sell_date': row[10],
'hold_minutes': row[11] or 0
})
except Exception as e:
logger.error(f"거래 내역 조회 실패: {e}")
return
# 현재 유니버스 상태
db_candidates = self.db.get_target_candidates()
candidate_count = len(db_candidates)
# 거래 내역이 없어도 유니버스 상태는 리포트에 포함
if not recent_trades:
# 거래 내역 없을 때도 유니버스 상태 리포트
summary = f"""📊 **현재 상태**
- 유니버스 후보: {candidate_count}
- 최근 거래: 없음"""
prompt = f"""당신은 퀀트 트레이딩 전문가입니다.
**현재 상태:**
- 유니버스 후보: {candidate_count}
- 최근 거래: 없음
**현재 설정값:**
- 최대 보유: {self.max_stocks}
- 최소 낙폭: {self.min_drop_rate*100:.1f}%
- 최소 회복률: {self.min_recovery_ratio*100:.0f}%
- ML 사용: {self.use_ml_signal}
- ML 최소 승률: {self.ml_min_probability:.1%}
**당신의 임무:**
1. 후보가 {candidate_count}개인 이유 분석 (필터 조건이 너무 까다로운지 등)
2. **수치 추천** (변수명=값 형식, 정확한 숫자 제시)
- 예: MIN_DROP_RATE=0.025 (2.5%)
- 예: MIN_RECOVERY_RATIO=0.40 (40%)
3. 예상 효과
**출력 형식:**
## 🔍 문제점
1. [구체적 문제 1]
2. [구체적 문제 2]
## 💡 수치 추천 (DB 설정 변경)
- 변수명1=값1 (이유: ...)
- 변수명2=값2 (이유: ...)
## 📈 예상 효과
- [효과 1]
- [효과 2]
"""
response = gemini_client.models.generate_content(model=GEMINI_MODEL_ID, contents=prompt)
analysis = getattr(response, "text", None) or (response.candidates[0].content.parts[0].text if response.candidates else "")
message = f"""🤖 **[13시 AI 자동 분석 + 수치 추천]**
{summary}
{analysis}
---
💬 단타 전략 최적화를 위한 AI 분석입니다. (수치 추천 포함)
"""
self.send_mm(message)
self.ai_report_sent = True
logger.info("🤖 AI 리포트 전송 완료 (거래 내역 없음, 유니버스 상태 포함)")
return
# 통계 계산
total = len(recent_trades)
wins = sum(1 for t in recent_trades if t['profit_rate'] > 0)
losses = total - wins
if total > 0:
win_rate = wins / total * 100
else:
win_rate = 0
avg_profit = sum(t['profit_rate'] for t in recent_trades) / total
total_pnl = sum(t['realized_pnl'] for t in recent_trades)
avg_hold = sum(t['hold_minutes'] for t in recent_trades) / total
# AI 분석
trades_text = ""
for i, t in enumerate(recent_trades, 1):
trades_text += f"""
[거래 {i}] {t['name']} ({t['strategy']})
- 매수: {t['buy_price']:,.0f}× {t['qty']}
- 매도: {t['sell_price']:,.0f}
- 손익: {t['profit_rate']:+.2f}% ({t['realized_pnl']:,.0f}원)
- 보유: {t['hold_minutes']}
- 사유: {t['sell_reason']}
"""
# 현재 유니버스 상태
db_candidates = self.db.get_target_candidates()
candidate_count = len(db_candidates)
prompt = f"""당신은 퀀트 트레이딩 전문가입니다.
**현재 상태:**
- 유니버스 후보: {candidate_count}
- 최근 거래: {total}
- 승률: {win_rate:.1f}% ({wins}{losses}패)
- 평균 수익률: {avg_profit:.2f}%
- 총 손익: {total_pnl:,.0f}
- 평균 보유: {avg_hold:.0f}
**최근 거래 내역:**
{trades_text}
**현재 설정값:**
- 최대 보유: {self.max_stocks}
- 최소 낙폭: {self.min_drop_rate*100:.1f}%
- 최소 회복률: {self.min_recovery_ratio*100:.0f}%
- ML 사용: {self.use_ml_signal}
- ML 최소 승률: {self.ml_min_probability:.1%}
**당신의 임무:**
1. 문제점 3가지 진단 (구체적으로)
- 특히 후보가 {candidate_count}개인 이유 분석
2. **수치 추천** (변수명=값 형식, 정확한 숫자 제시)
- 예: MIN_DROP_RATE=0.025 (2.5%)
- 예: MIN_RECOVERY_RATIO=0.40 (40%)
- 예: MAX_STOCKS=5
3. 예상 효과
**출력 형식:**
## 🔍 문제점
1. [구체적 문제 1]
2. [구체적 문제 2]
3. [구체적 문제 3]
## 💡 수치 추천 (DB 설정 변경)
- 변수명1=값1 (이유: ...)
- 변수명2=값2 (이유: ...)
## 📈 예상 효과
- [효과 1]
- [효과 2]
**간결하고 명확하게 답변하세요.**
"""
response = gemini_client.models.generate_content(model=GEMINI_MODEL_ID, contents=prompt)
analysis = getattr(response, "text", None) or (response.candidates[0].content.parts[0].text if response.candidates else "")
summary = f"""📊 **현재 상태**
- 유니버스 후보: {candidate_count}
- 최근 거래: {total}
- 승률: {win_rate:.1f}% ({wins}{losses}패)
- 평균 수익률: {avg_profit:.2f}%
- 총 손익: {total_pnl:+,.0f}
- 평균 보유: {avg_hold:.0f}"""
message = f"""🤖 **[13시 AI 자동 분석 + 수치 추천]**
{summary}
{analysis}
---
💬 단타 전략 최적화를 위한 AI 분석입니다. (수치 추천 포함)
"""
self.send_mm(message)
self.ai_report_sent = True
logger.info("🤖 AI 리포트 전송 완료")
except Exception as e:
logger.error(f"AI 리포트 생성 실패: {e}")
def _fetch_scan_universe_from_api(self, max_codes=500):
"""
KIS API로 스캔 대상 종목 리스트 조회 (6소스 각 100건 → 최대 500개).
- 거래량·거래대금·회전율·등락률(상승)·등락률(하락)·거래증가율 각 100건 합산 후 중복 제거.
- 리스트는 DB에 저장하지 않음. 스캔 끝난 뒤 후보만 DB에 한 번에 인서트.
Returns:
list[dict]: [{"code": "006자리", "name": "종목명"}, ...] (중복 제거, 최대 max_codes개)
"""
def _code_from_item(item):
code = (item.get("stk_cd") or item.get("mksc_shrn_iscd") or item.get("code") or "").strip()
return code if code and len(code) == 6 else None
def _name_from_item(item):
return (
(item.get("stk_nm") or item.get("prst_name") or item.get("hts_kor_isnm") or "").strip()
or ""
)
scan_list = []
seen = set()
# 1) 거래량순위 100개 (키움은 거래대금+회전율만 사용, KIS는 거래량+거래대금+회전율 3가지로 풀 확대)
try:
time.sleep(random.uniform(0.5, 1.0))
vol_list = self.client.get_volume_rank(market="J", limit=100)
for item in (vol_list or []):
c = _code_from_item(item)
if c and c not in seen:
seen.add(c)
scan_list.append({"code": c, "name": _name_from_item(item) or ""})
logger.info(f" 📡 [스캔유니버스] 거래량순위 API → {len(vol_list)}건 수신, 누적 {len(scan_list)}종목")
except Exception as e:
logger.warning(f" ⚠️ [스캔유니버스] 거래량순위 조회 실패: {e}")
# 2) 거래대금순위 100개 (키움과 동일 소스)
try:
time.sleep(random.uniform(0.5, 1.0))
val_list = self.client.get_trading_value_rank(market="J", limit=100)
for item in (val_list or []):
c = _code_from_item(item)
if c and c not in seen:
seen.add(c)
scan_list.append({"code": c, "name": _name_from_item(item) or ""})
logger.info(f" 📡 [스캔유니버스] 거래대금순위 API → {len(val_list)}건 수신, 누적 {len(scan_list)}종목")
except Exception as e:
logger.warning(f" ⚠️ [스캔유니버스] 거래대금순위 조회 실패: {e}")
# 3) 회전율순위 100개 (키움 개미털기 2번째 소스와 동일)
try:
time.sleep(random.uniform(0.5, 1.0))
turn_list = self.client.get_turnover_rank(market="J", limit=100)
for item in (turn_list or []):
c = _code_from_item(item)
if c and c not in seen:
seen.add(c)
scan_list.append({"code": c, "name": _name_from_item(item) or ""})
logger.info(f" 📡 [스캔유니버스] 회전율순위 API → {len(turn_list)}건 수신, 누적 {len(scan_list)}종목")
except Exception as e:
logger.warning(f" ⚠️ [스캔유니버스] 회전율순위 조회 실패: {e}")
# 4) 등락률순위(상승) 100개 (기존 후보군 풀 확대)
try:
time.sleep(random.uniform(0.5, 1.0))
chg_list = self.client.get_price_change_rank(market="J", sort_type="1", limit=100)
for item in (chg_list or []):
c = _code_from_item(item)
if c and c not in seen:
seen.add(c)
scan_list.append({"code": c, "name": _name_from_item(item) or ""})
logger.info(f" 📡 [스캔유니버스] 등락률순위(상승) API → {len(chg_list)}건 수신, 누적 {len(scan_list)}종목")
except Exception as e:
logger.warning(f" ⚠️ [스캔유니버스] 등락률순위(상승) 조회 실패: {e}")
# 4-2) 등락률순위(하락) 100개 — 낙폭 큰 종목 직접 조회 → N자 망치봉 스캔 효율·Pass-낙폭 포착
try:
time.sleep(random.uniform(0.5, 1.0))
decline_list = self.client.get_price_decline_rank(market="J", limit=100)
for item in (decline_list or []):
c = _code_from_item(item)
if c and c not in seen:
seen.add(c)
scan_list.append({"code": c, "name": _name_from_item(item) or ""})
logger.info(f" 📡 [스캔유니버스] 등락률순위(하락) API → {len(decline_list)}건 수신, 누적 {len(scan_list)}종목")
except Exception as e:
logger.warning(f" ⚠️ [스캔유니버스] 등락률순위(하락) 조회 실패: {e}")
# 5) 거래증가율순위 100개 (거래량/대금/회전율과 다른 풀 → 후보 다양화)
try:
time.sleep(random.uniform(0.5, 1.0))
growth_list = self.client.get_volume_growth_rank(market="J", limit=100)
for item in (growth_list or []):
c = _code_from_item(item)
if c and c not in seen:
seen.add(c)
scan_list.append({"code": c, "name": _name_from_item(item) or ""})
logger.info(f" 📡 [스캔유니버스] 거래증가율순위 API → {len(growth_list)}건 수신, 누적 {len(scan_list)}종목 (6소스 합산 → 개미털기 필터)")
except Exception as e:
logger.warning(f" ⚠️ [스캔유니버스] 거래증가율순위 조회 실패: {e}")
if not scan_list:
logger.warning(" ⚠️ [스캔유니버스] API에서 0건 수신 → 스캔 불가 (권한/계정/시간 확인)")
return []
scan_list = scan_list[:max_codes]
# 종목명 비어 있으면 시세 배치로 채우기 (KIS volume-rank는 종목명 미제공 시 많음)
need_name_codes = [x["code"] for x in scan_list[:20] if not (x.get("name") or "").strip()]
if need_name_codes:
try:
time.sleep(random.uniform(0.2, 0.4))
batch = self.client.inquire_prices_batch(need_name_codes[:20])
name_map = {}
_market_names = {"KOSPI", "KOSDAQ", "ETF", "KOSPI200", "KSQ150"}
for code, out in (batch or {}).items():
n = (out.get("stck_kor_isnm") or out.get("rprs_mrkt_kor_name") or "").strip()
if n and n not in _market_names:
name_map[code] = n
for x in scan_list:
if not (x.get("name") or "").strip() and x["code"] in name_map:
x["name"] = name_map[x["code"]]
except Exception as e:
logger.debug(f" 스캔 종목명 배치 조회 스킵: {e}")
for x in scan_list:
if not (x.get("name") or "").strip():
x["name"] = x["code"]
logger.info(
f" 📋 스캔 대상: {len(scan_list)}개 종목 (거래량·거래대금·회전율·등락률상승·등락률하락·거래증가율 각 100건 합산 → 개미털기 필터)"
)
if scan_list:
part = ", ".join(f"{x['code']} {x.get('name') or x['code']}" for x in scan_list[:15])
logger.info(f" 📋 스캔 대상(일부): {part}{' ...' if len(scan_list) > 15 else ''}")
return scan_list
def scan_ant_shaking_candidates(self, max_candidates=20):
"""개미털기(눌림목) 후보 종목 스캔 - KIS API로 스캔 대상 조회 후 필터링"""
logger.info("🐜 [개미털기] 고급 스캔 시작 (KIS API 스캔유니버스 사용)")
logger.info(
" 📌 스캔 대상 리스트는 DB에 저장되지 않음. "
"수치 체크(낙폭/회복률 등)는 이 리스트를 순회하며 개미털기 전략 필터를 적용한 결과입니다."
)
candidates = []
seen_codes = set()
# 필터별 탈락 건수 (0개 나오는 이유 확인용) - 세분화
filter_counts = {
"낙폭부족": 0,
"회복률부족": 0,
"피뢰침(고점근접)": 0,
"피뢰침(급등주)": 0,
"RSI과열": 0,
"MA20": 0,
"API응답없음": 0,
"API예외": 0,
"시가0": 0,
"동전주": 0,
"가격파싱오류": 0,
}
scan_list = self._fetch_scan_universe_from_api(max_codes=500)
scan_codes = [x["code"] for x in scan_list]
# scan_list를 딕셔너리로 변환하여 코드로 종목명을 빠르게 찾을 수 있게 함
scan_name_map = {x["code"]: x.get("name", "") for x in scan_list}
if not scan_codes:
logger.warning(" ⚠️ [개미털기] 스캔 대상 0개 → 스캔 생략 (API에서 종목 리스트를 받지 못함)")
return []
# 스캔 대상 리스트를 거래량/낙폭 체크 전에 한 번 출력 (종목명 · 코드)
# 참고: 위 [스캔유니버스] 6소스(거래량·거래대금·회전율·등락률상승·등락률하락·거래증가율) 합산 → 동일 개미털기 필터 적용
logger.info(f" 📋 [개미털기 스캔 대상] {len(scan_list)}개 (6소스 합산, 종목명 · 코드)")
for i, x in enumerate(scan_list):
logger.info(f" {i+1}. {x.get('name') or x['code']} {x['code']}")
# 체결강도 상위 API(FHPST01710000) 1회 조회 → 통과 종목 보너스(100 이상 +10, 120 이상 +20) 적용용
execution_strength_map = {}
try:
time.sleep(random.uniform(0.3, 0.6))
execution_strength_map = self.client.get_execution_strength_map(market="J", limit=200)
if execution_strength_map:
logger.info(f" 📡 [체결강도] 상위 {len(execution_strength_map)}종목 로드 (통과 시 100+ → +10점, 120+ → +20점)")
except Exception as e:
logger.debug(f"체결강도 맵 로드 스킵: {e}")
# 순차 조회 (한투 API는 다중 종목 조회 미지원 - 순차 조회 필수)
# 규칙: 일반 루프에는 random.sleep(1~3) 기본 적용 (서버 부하 방지)
total_scanned = 0
passed_filters = 0
for code in scan_codes:
total_scanned += 1
if code in seen_codes:
continue
seen_codes.add(code)
try:
# 순차 조회 (종목당 1회 API 호출)
price_data = self.client.inquire_price(code)
if not price_data:
filter_counts["API응답없음"] += 1
if code == "001510": # SK증권 추적
logger.warning(f" ⚠️ SK증권(001510) API응답없음 (상세는 위 [현재가API] 로그 참고)")
time.sleep(random.uniform(1.0, 2.0))
continue
time.sleep(random.uniform(1.0, 2.0))
# 가격 데이터 파싱
try:
current_price = abs(float(str(price_data.get("stck_prpr", 0)).replace(",", "")))
open_price = abs(float(str(price_data.get("stck_oprc", current_price)).replace(",", "")))
high_price = abs(float(str(price_data.get("stck_hgpr", current_price)).replace(",", "")))
low_price = abs(float(str(price_data.get("stck_lwpr", current_price)).replace(",", "")))
volume = int(float(str(price_data.get("acml_vol", 0)).replace(",", "")))
except (ValueError, TypeError) as e:
filter_counts["가격파싱오류"] += 1
logger.debug(f"가격 파싱 실패({code}): {e}")
continue
if open_price == 0:
filter_counts["시가0"] += 1
continue
if current_price < 1000: # 동전주 제외
filter_counts["동전주"] += 1
continue
# 종목명 가져오기: 시장명(KOSPI/KOSDAQ 등)이 아닌 진짜 종목명만 사용
# KIS API는 rprs_mrkt_kor_name에 시장구분을 넣는 경우가 있어, stck_kor_isnm(종목한글명) 우선 사용
_MARKET_NAMES = {"KOSPI", "KOSDAQ", "ETF", "KOSPI200", "KSQ150"}
name = scan_name_map.get(code, "").strip()
if not name or name in _MARKET_NAMES:
name = (price_data.get("stck_kor_isnm") or price_data.get("rprs_mrkt_kor_name") or "").strip()
if not name or name in _MARKET_NAMES:
name = code
# 낙폭 계산
if open_price > 0:
drop_rate = (open_price - low_price) / open_price
else:
drop_rate = 0
total_range = high_price - low_price
if total_range > 0:
recovery_pos = (current_price - low_price) / total_range
else:
recovery_pos = 0
# 필터 조건 수치 로그 (디버깅용)
logger.debug(
f" 📊 [{name} {code}] 수치: "
f"낙폭 {drop_rate*100:.2f}% (기준: {self.min_drop_rate*100:.1f}%) | "
f"회복 {recovery_pos*100:.1f}% (기준: {self.min_recovery_ratio*100:.0f}%) | "
f"고점 {high_price:,.0f}원 | 저점 {low_price:,.0f}원 | 현재 {current_price:,.0f}"
)
# [필터 1] 낙폭 체크
if drop_rate < self.min_drop_rate:
filter_counts["낙폭부족"] += 1
logger.info(
f"{LOG_YELLOW}🔍 [탈락-낙폭] {name} {code}: 낙폭 {drop_rate*100:.2f}% < {self.min_drop_rate*100:.1f}% "
f"(시가 {open_price:,.0f}원 → 저점 {low_price:,.0f}원){LOG_RESET}"
)
if code == "001510": # SK증권 추적
logger.warning(f" ⚠️ SK증권(001510) 낙폭부족으로 탈락: 낙폭={drop_rate*100:.2f}%, 기준={self.min_drop_rate*100:.1f}%")
continue
# [필터 2] 회복률 체크
if recovery_pos < self.min_recovery_ratio:
filter_counts["회복률부족"] += 1
logger.info(
f"{LOG_YELLOW}🔍 [탈락-회복률] {name} {code}: 회복률 {recovery_pos*100:.1f}% < {self.min_recovery_ratio*100:.0f}% "
f"(저점 {low_price:,.0f}원 → 현재 {current_price:,.0f}원 / 범위 {total_range:,.0f}원){LOG_RESET}"
)
if code == "001510": # SK증권 추적
logger.warning(f" ⚠️ SK증권(001510) 회복률부족으로 탈락: 회복률={recovery_pos*100:.1f}%, 기준={self.min_recovery_ratio*100:.0f}%")
continue
# [필터 3] 피뢰침 방지 - 고점 추격 매수 방지
high_chase_threshold = get_env_float("HIGH_PRICE_CHASE_THRESHOLD", 0.96)
if current_price >= high_price * high_chase_threshold:
filter_counts["피뢰침(고점근접)"] += 1
drop_from_high = (high_price - current_price) / high_price * 100
logger.info(f"{LOG_YELLOW}🔍 [탈락-피뢰침] {name} {code}: 고점 대비 {drop_from_high:.1f}% 조정 부족 (최소 4% 필요){LOG_RESET}")
continue
# [필터 4] 피뢰침 방지 - 급등주 제외
if low_price > 0:
daily_change_pct = (high_price - low_price) / low_price * 100
else:
daily_change_pct = 0
max_daily_change = get_env_float("MAX_DAILY_CHANGE_PCT", 20.0)
if daily_change_pct > max_daily_change:
filter_counts["피뢰침(급등주)"] += 1
logger.info(f"{LOG_YELLOW}🔍 [탈락-피뢰침] {name} {code}: 당일 변동폭 {daily_change_pct:.1f}% 과도 (최대 {max_daily_change}%){LOG_RESET}")
continue
# [필터 5] RSI 과열 체크 (분봉 데이터 필요)
try:
df = self.client.get_minute_chart(code, period="3", limit=20)
if not df.empty and len(df) >= 14 and "RSI" in df.columns:
rsi = float(df["RSI"].iloc[-1])
rsi_threshold = get_env_float("RSI_OVERHEAT_THRESHOLD", 78.0)
if rsi >= rsi_threshold:
filter_counts["RSI과열"] += 1
logger.info(f"{LOG_YELLOW}🔍 [탈락-RSI] {name} {code}: RSI 과열 ({rsi:.1f} >= {rsi_threshold}){LOG_RESET}")
continue
# [필터 6] MA20 체크
if "MA20" in df.columns and len(df) >= 20:
ma20 = float(df["MA20"].iloc[-1])
if current_price < ma20:
filter_counts["MA20"] += 1
logger.info(f"{LOG_YELLOW}🔍 [탈락-MA20] {name} {code}: 현재가({current_price:.0f}) < MA20({ma20:.2f}){LOG_RESET}")
continue
ma20_cap_pct = get_env_float("MA20_MAX_ABOVE_PCT", 3.0)
if ma20 > 0 and current_price > ma20 * (1 + ma20_cap_pct / 100):
filter_counts["MA20"] += 1
gap_pct = (current_price - ma20) / ma20 * 100
logger.info(f"{LOG_YELLOW}🔍 [탈락-MA20과열] {name} {code}: 20선 대비 {gap_pct:.1f}% 위 (최대 {ma20_cap_pct}%){LOG_RESET}")
continue
except Exception as e:
logger.debug(f"RSI/MA20 체크 실패({code}): {e}")
# 외국인/기관 동향 확인
investor_trend = self.client.get_investor_trend(code, days=3)
investor_score = 0
if investor_trend:
total_net = investor_trend.get("total_net_buy", 0)
if total_net > 10000:
investor_score = 20 # 강한 매수세
elif total_net > 0:
investor_score = 10 # 매수세
# 조건 통과: 낙폭 3% 이상 & 회복 50% 이상
if drop_rate >= self.min_drop_rate and recovery_pos >= self.min_recovery_ratio:
# ML 승률 예측 (USE_ML_SIGNAL=true일 때만)
ml_prob = None
if self.use_ml_signal and self.ml_predictor:
try:
# ML 피처 추출 (간단 버전 - 실제로는 더 많은 피처 필요)
# TODO: 실제 피처 데이터로 교체 필요 (RSI, 거래량비, 이동평균 등)
ml_features = {
"rsi": 50.0, # 임시값
"volume_ratio": 1.0,
"tail_length_pct": drop_rate * 100,
"ma5_gap_pct": 0.0,
"ma20_gap_pct": 0.0,
"foreign_net_buy": investor_trend.get("foreign_net_buy", 0) if investor_trend else 0,
"institution_net_buy": investor_trend.get("org_net_buy", 0) if investor_trend else 0,
"market_hour": dt.now().hour,
}
ml_prob = self.ml_predictor.predict_win_probability(ml_features)
# ML 임계값 미달 시 스킵
if ml_prob < self.ml_min_probability:
logger.info(
f"{LOG_YELLOW}🔍 [탈락-ML] {name} {code}: ML 승률 {ml_prob:.1%} < {self.ml_min_probability:.1%}{LOG_RESET}"
)
continue
except Exception as e:
logger.debug(f"ML 예측 실패({code}): {e}")
# 점수 계산: 낙폭 + 회복률 + 거래량 + 수급 + ML 승률
score = (drop_rate * 100) + (recovery_pos * 50) + investor_score
if volume > 1000000: # 거래량 100만주 이상 가산점
score += 10
if ml_prob is not None:
score += (ml_prob - 0.5) * 100 # ML 승률 가산점
# 체결강도 상위 API(FHPST01710000) 보너스: 100 이상 +10점, 120 이상 +20점
execution_strength = execution_strength_map.get(code, 0)
if execution_strength >= 120:
score += 20
elif execution_strength >= 100:
score += 10
candidate = {
"code": code,
"name": name,
"price": current_price,
"score": score,
"drop_rate": drop_rate,
"recovery": recovery_pos,
"volume": volume,
"investor_trend": investor_trend,
"execution_strength": execution_strength,
}
if ml_prob is not None:
candidate["ml_probability"] = ml_prob
candidates.append(candidate)
passed_filters += 1
# 통과 즉시 DB 저장 (매매 루프가 스캔 완료를 기다리지 않고 실시간으로 후보 읽기 위함)
# 같은 종목 중복 시 ON CONFLICT(code) DO UPDATE 로 최신 점수/가격으로 갱신됨
try:
self.db.add_target_candidate({
"code": code,
"name": name,
"score": score,
"price": current_price,
})
except Exception as e:
logger.debug(f"후보 즉시 저장 실패({code}): {e}")
ml_info = f" | ML {ml_prob:.1%}" if ml_prob is not None else ""
strength_info = f" | 체결강도 {execution_strength:.0f}(+{'20' if execution_strength >= 120 else '10'}점)" if execution_strength >= 100 else ""
logger.info(
f"{LOG_GREEN}✅ [통과] {name} {code}: 낙폭 {drop_rate*100:.1f}% | 회복 {recovery_pos*100:.0f}% | 강도 {score:.1f}{strength_info}{ml_info}{LOG_RESET}"
)
except Exception as e:
filter_counts["API예외"] = filter_counts.get("API예외", 0) + 1
logger.warning(
f"종목 스캔 예외 code={code} exception={e!r} type={type(e).__name__}"
)
time.sleep(random.uniform(1.0, 2.0))
continue
candidates.sort(key=lambda x: x["score"], reverse=True)
# 필터별 탈락/통과 요약 (색상: 탈락=노랑, 통과=초록)
summary = ", ".join(f"{k}={v}" for k, v in filter_counts.items() if v > 0)
logger.info(f" 📊 [필터 요약] 스캔 {total_scanned}개 중 {LOG_YELLOW}탈락: {summary or '없음'}{LOG_RESET} | {LOG_GREEN}통과: {len(candidates)}{LOG_RESET}")
count_ge4 = sum(1 for c in candidates if c.get("score", 0) >= 4.0)
logger.info(
f" ✅ 스캔 완료: 개미털기 {len(candidates)}개 통과 (강도 4.0 이상: {count_ge4}개) "
f"[스캔 {total_scanned}개 → 필터 통과 {passed_filters}개]"
)
# 통과 종목 전부 출력: 종목명 · 코드 · 강도 (몇 개 안 되므로 전부 표시)
if candidates:
logger.info(" 📌 [개미털기 통과 목록] 종목명 · 코드 · 강도")
for i, c in enumerate(candidates):
logger.info(f" {i+1}. {c['name']} {c['code']} 강도 {c['score']:.1f}")
# 강도순 상위 10개 → Mattermost 전송
top10 = candidates[:10]
lines = [f"🐜 **개미털기 강도순 TOP{len(top10)}** (스캔 {total_scanned}개 중 통과 {len(candidates)}개)"]
for i, c in enumerate(top10, 1):
name = (c.get("name") or c.get("code") or "").strip()
score = c.get("score", 0)
lines.append(f"{i}. 강도 **{score:.1f}** {name}")
try:
self.send_mm("\n".join(lines))
except Exception as e:
logger.debug(f"Mattermost 개미털기 TOP10 전송 스킵: {e}")
return candidates[:max_candidates]
def calculate_atr(self, df, period=14):
"""
ATR (Average True Range) 계산 - 변동성 지표
- TR(True Range) = max(고가-저가, |고가-전일종가|, |저가-전일종가|)
- ATR = TR의 14일 이동평균
"""
try:
if df is None or len(df) < period:
return 0
df = df.copy()
# True Range 계산
high_low = df['high'] - df['low']
high_close = (df['high'] - df['close'].shift()).abs()
low_close = (df['low'] - df['close'].shift()).abs()
df['tr'] = pd.concat([high_low, high_close, low_close], axis=1).max(axis=1)
# ATR = TR의 14일 이동평균
atr = df['tr'].rolling(window=period).mean().iloc[-1]
return float(atr) if not pd.isna(atr) and atr > 0 else 0
except Exception as e:
logger.debug(f"ATR 계산 실패: {e}")
return 0
def check_sell_signals(self):
"""
매도 신호 체크 (ATR 기반 변동성 매도 로직)
- 어깨 매도 (고점 대비 3% 하락)
- ATR 기반 스캘핑 (본절사수/익절보존)
- 순수익 보존
- 목표가/손절가
"""
if not self.holdings:
return []
sell_signals = []
for code, holding in list(self.holdings.items()):
try:
name = holding.get("name", code)
buy_price = holding["buy_price"]
buy_time_str = holding.get("buy_time", "")
qty = holding["qty"]
# 현재가 조회
price_data = self.client.inquire_price(code)
if not price_data:
continue
current_price = abs(float(price_data.get("stck_prpr", 0)))
if current_price == 0:
continue
# 고점 갱신 (holdings에 저장)
max_price = holding.get("max_price", buy_price)
if current_price > max_price:
max_price = current_price
self.holdings[code]["max_price"] = max_price
# 손익률 계산
profit_pct = (current_price - buy_price) / buy_price if buy_price > 0 else 0
profit_val = (current_price - buy_price) * qty
# ATR 조회 (DB 또는 재계산)
atr = holding.get("atr_entry", 0)
if atr == 0:
# ATR 재계산 (3분봉)
try:
df = self.client.get_minute_chart(code, period="3", limit=20)
if not df.empty:
atr = self.calculate_atr(df)
if atr > 0:
self.holdings[code]["atr_entry"] = atr
except Exception as e:
logger.debug(f"ATR 조회 실패({code}): {e}")
atr = buy_price * 0.01 # 기본값 1%
# 손절가/목표가 (ATR 기반 또는 기본값)
stop_price = holding.get("stop_price", buy_price * (1 + self.stop_loss_pct))
target_price_atr = holding.get("target_price", buy_price * (1 + self.take_profit_pct))
target_price_pct = buy_price * (1 + self.take_profit_pct) # 퍼센트 기반 목표가
# ATR 기반 손절가/목표가 계산
if atr > 0:
stop_price = buy_price - (atr * self.stop_atr_multiplier)
target_price_atr = buy_price + (atr * self.target_atr_multiplier)
# 목표가: ATR 기반과 퍼센트 기반 중 더 작은 값 사용 (둘 다 체크)
target_price = min(target_price_atr, target_price_pct)
# 매도 사유 판단
sell_reason = None
# ==========================================================
# [1] 어깨 매도 (Shoulder Cut) - 최우선!
# 고점 대비 3% 이상 빠지면 수익/손실 불문하고 즉시 탈출
# ==========================================================
shoulder_cut_pct = get_env_float("SHOULDER_CUT_PCT", 0.03)
drop_from_high = (max_price - current_price) / max_price if max_price > 0 else 0
if drop_from_high >= shoulder_cut_pct:
sell_reason = "어깨매도"
# ==========================================================
# [2] 금액 기준 손절 (원 단위)
# ==========================================================
max_loss_per_trade_krw = get_env_int("MAX_LOSS_PER_TRADE_KRW", 200000)
if not sell_reason and profit_val <= -max_loss_per_trade_krw:
sell_reason = "금액손실컷"
# ==========================================================
# [3] ATR 기반 스캘핑 로직
# ==========================================================
if not sell_reason and atr > 0:
# [스캘핑 1] 본절사수: 고점이 ATR 1배 이상 올랐는데 현재가가 ATR 0.2배 이하로 떨어짐
if (max_price >= buy_price + atr * 1.0) and (current_price <= buy_price + atr * 0.2):
sell_reason = "스캘핑_본절사수"
# [스캘핑 2] 익절보존: 수익 중인데 고점 대비 ATR 1배 이상 하락
if not sell_reason and current_price < (max_price - atr * 1.0) and profit_pct > 0:
sell_reason = "스캘핑_익절보존"
# 보유 시간 계산 (N자 패턴 등 다음날 상승 가능성 있는 종목 24시간 보유)
hours_passed = 0
if buy_time_str:
try:
buy_time = dt.strptime(buy_time_str, "%Y-%m-%d %H:%M:%S")
hours_passed = (dt.now() - buy_time).total_seconds() / 3600
except:
pass
# ==========================================================
# [4] 빠른 익절 보호 (매수 후 30분 이내)
# ==========================================================
if not sell_reason and hours_passed > 0:
use_quick_profit = get_env_bool("USE_QUICK_PROFIT_PROTECTION", True)
if use_quick_profit and hours_passed < 0.5:
if max_price >= buy_price * 1.005 and current_price <= buy_price * 1.0015:
sell_reason = "💨 작은수익보호"
# ==========================================================
# [5] 24시간 보유 전략 (N자 패턴 등 다음날 상승 가능성)
# ==========================================================
# 세력이 N자 만들어서 털어먹으려는 종목은 하루에 안 끝나고 다음날 오르는 경우가 있음
# 24시간 이내: 특정 조건에서만 매도 (보수적)
# 24시간 이후: 일반 매도 조건 적용
min_hold_hours = get_env_float("MIN_HOLD_HOURS", 24.0) # 최소 보유 시간 (기본 24시간)
if not sell_reason and hours_passed > 0:
if hours_passed < min_hold_hours:
# 24시간 이내: 큰 수익(5% 이상) 또는 고점 대비 큰 하락만 매도
if profit_pct > 0.05: # 5% 이상 수익
sell_reason = f"💰 {hours_passed:.1f}시간내 5%+ 익절"
elif max_price >= buy_price * 1.07 and current_price <= max_price * 0.97:
# 고점 7% 이상 찍고 고점 대비 3% 이상 하락
sell_reason = f"📈 {hours_passed:.1f}시간내 고점7%→3%하락"
# 그 외에는 24시간 보유 유지 (손절은 제외)
else:
# 24시간 이후: 일반 매도 조건 적용
if profit_pct > 0.02: # 2% 이상 수익
sell_reason = f"{hours_passed:.1f}시간 경과 2%+ 익절"
elif profit_pct > 0 and current_price < max_price * 0.97:
# 수익 중인데 고점 대비 3% 이상 하락
sell_reason = f"{hours_passed:.1f}시간 경과 익절보호"
# ==========================================================
# [6] 목표가 달성 및 손절 (24시간 보유 전략과 별개로 항상 체크)
# ==========================================================
if not sell_reason:
# 목표가 달성 체크 (ATR 기반 또는 퍼센트 기반 중 먼저 도달한 것)
if current_price >= target_price:
if current_price >= target_price_atr and current_price >= target_price_pct:
sell_reason = "목표달성(ATR+퍼센트)"
elif current_price >= target_price_atr:
sell_reason = "목표달성(ATR)"
else:
sell_reason = "목표달성(퍼센트)"
elif current_price <= stop_price:
sell_reason = "전략손절"
elif profit_pct <= self.stop_loss_pct:
sell_reason = f"칼손절({profit_pct * 100:.1f}%)"
elif profit_pct >= self.take_profit_pct:
sell_reason = "익절(퍼센트)"
# 매도 신호 추가
if sell_reason:
sell_signals.append({
"code": code,
"name": name,
"reason": sell_reason,
"profit_pct": profit_pct,
"qty": qty,
"price": current_price,
})
time.sleep(random.uniform(0.3, 0.7))
except Exception as e:
logger.error(f"매도 신호 체크 실패({code}): {e}")
continue
return sell_signals
def execute_buy(self, signal):
"""
매수 실행
- 키움 봇과 동일하게 대형주/소형주 비율 맞추는 로직 포함
- 대형주: 기본 금액 100% / 중형주: 85% / 소형주: 70%
- 켈리 기반 매수 금액 + 종목당 최대 15% 제한
"""
code = signal["code"]
name = signal["name"]
price = signal["price"]
# 이미 보유 중이면 스킵
if code in self.holdings:
logger.warning(f"⚠️ [{name}] 이미 보유 중 -> 매수 스킵")
return False
# 최대 보유 종목 수 체크
if len(self.holdings) >= self.max_stocks:
logger.warning(f"⚠️ 최대 보유 종목 수 도달 ({self.max_stocks}개)")
return False
# 🔥 매수 직전 예수금 실시간 확인
if not self._update_account_light(profit_val=0):
logger.warning(f"⚠️ [{name}] 예수금 조회 실패 -> 매수 스킵")
return False
# ============================================================
# [대/중/소형주 구분] - 일봉 거래대금 평균 (키움 봇과 동일)
# ============================================================
size_class = None
try:
df = self.client.get_daily_chart(code, limit=10)
if not df.empty and "volume" in df.columns and "close" in df.columns:
trade_values = df["volume"] * df["close"]
avg_trade_value = trade_values.mean()
large_min = get_env_float("SIZE_CLASS_LARGE_MIN", 5000000000) # 50억 (대형주)
mid_min = get_env_float("SIZE_CLASS_MID_MIN", 500000000) # 5억 (중형주)
if avg_trade_value >= large_min:
size_class = ""
elif avg_trade_value >= mid_min:
size_class = ""
else:
size_class = ""
logger.info(
f"📊 [{name}] 거래대금 평균 {avg_trade_value/1e8:.1f}억원 → {size_class}형주"
)
except Exception as e:
logger.debug(f"대/중/소형 조회 스킵({code}): {e}")
# ============================================================
# [매수 금액] 변동성 역가중 (Volatility Inverse Weighting)
# ============================================================
# ATR 계산용 분봉 데이터 (변동성 계산에 필요)
df_minute = None
try:
df_minute = self.client.get_minute_chart(code, period="3", limit=20)
except Exception as e:
logger.debug(f"분봉 조회 실패({code}): {e}")
# RiskManager 사용 시: 변동성 역가중으로 매수 금액 계산
if self.risk_mgr is not None:
# 켈리 비율 (DB에서 계산, 없으면 None)
kelly_fraction = None
if self.risk_mgr.use_kelly:
try:
kelly_fraction = self.db.calculate_half_kelly()
except Exception as e:
logger.debug(f"켈리 비율 계산 스킵: {e}")
# 변동성 역가중 매수 금액 계산
amount = self.risk_mgr.get_position_size(
stock_name=name,
current_balance=self.current_cash,
df=df_minute, # ATR 계산용 분봉 데이터
kelly_fraction=kelly_fraction,
size_class=size_class, # 대/중/소형 구분
)
if amount <= 0:
logger.warning(f"⚠️ [{name}] RiskManager 계산 금액 0원 -> 매수 스킵")
return False
# 수량 계산 (수수료 고려)
qty = self.risk_mgr.calculate_quantity(price, amount)
else:
# 폴백: 기존 고정 슬롯 방식 (RiskManager 미사용 시)
if self.max_stocks > 0:
slot_money = int(self.current_cash * 0.9 / self.max_stocks)
else:
slot_money = 100000
base_amount = min(slot_money, 100000)
if self.stop_loss_pct != 0:
stop_pct_abs = abs(self.stop_loss_pct)
else:
stop_pct_abs = 0.04
if stop_pct_abs > 0:
kelly_risk_amount = self.current_cash * self.risk_pct_per_trade * self.kelly_multiplier
kelly_based_amount = int(kelly_risk_amount / stop_pct_abs)
base_amount = min(base_amount, kelly_based_amount)
if size_class == "":
amount = int(base_amount * 0.7)
logger.info(f"💰 [{name}] 소형주 → 매수 금액 70%: {amount:,.0f}")
elif size_class == "":
amount = int(base_amount * 0.85)
logger.info(f"💰 [{name}] 중형주 → 매수 금액 85%: {amount:,.0f}")
else:
amount = base_amount
max_limit = int(self.current_cash * self.max_position_pct)
if amount > max_limit:
logger.info(f"📐 [{name}] 최대 포지션 제한: {amount:,.0f}원 → {max_limit:,.0f}")
amount = max_limit
amount = max(amount, self.min_position_amount)
qty = int(amount / price)
if qty <= 0:
logger.warning(f"⚠️ [{name}] 매수 수량 0 (가격: {price:,.0f}원, 금액: {amount:,.0f}원)")
return False
required_amount = price * qty * 1.05
if self.current_cash < required_amount:
logger.warning(
f"⚠️ [{name}] 예수금 부족: 필요 {required_amount:,.0f}원 / "
f"보유 {self.current_cash:,.0f}원 -> 매수 스킵"
)
return False
# ATR 계산 (변동성 기반 손절가/목표가 설정용)
# df_minute는 위에서 이미 조회했으므로 재사용
atr = 0
stop_price = price * (1 + self.stop_loss_pct)
target_price = price * (1 + self.take_profit_pct)
if df_minute is not None and not df_minute.empty:
try:
atr = self.calculate_atr(df_minute)
if atr > 0:
# ATR 기반 손절가/목표가 설정
stop_price = price - (atr * self.stop_atr_multiplier)
target_price = price + (atr * self.target_atr_multiplier)
logger.info(f"📊 [{name}] ATR 기반 손절가/목표가: ATR={atr:.0f}원, 손절={stop_price:,.0f}원, 목표={target_price:,.0f}")
except Exception as e:
logger.debug(f"ATR 계산 스킵({code}): {e}")
atr = price * 0.01 # 기본값 1%
else:
atr = price * 0.01 # 기본값 1%
success = self.client.buy_market_order(code, qty)
if success:
self._update_account_light(profit_val=0)
buy_time = dt.now().strftime("%Y-%m-%d %H:%M:%S")
self.holdings[code] = {
"buy_price": price,
"qty": qty,
"buy_time": buy_time,
"name": name,
"max_price": price, # 고점 추적
"atr_entry": atr, # 매수 시점 ATR 저장
"stop_price": stop_price, # 손절가
"target_price": target_price, # 목표가
}
self.db.upsert_trade({
"code": code,
"name": name,
"strategy": "SHORT_ANT_SHAKING",
"avg_buy_price": price,
"current_price": price,
"target_qty": qty,
"current_qty": qty,
"status": "HOLDING",
"buy_date": buy_time,
"stop_price": stop_price,
"target_price": target_price,
"atr_entry": atr,
})
logger.info(f"💰 [매수 체결] {name} ({code}): {price:,.0f}× {qty}주 | 손절={stop_price:,.0f}원, 목표={target_price:,.0f}")
return True
return False
def execute_sell(self, signal):
"""매도 실행"""
code = signal["code"]
name = signal["name"]
qty = signal["qty"]
if code not in self.holdings:
logger.warning(f"⚠️ [{name}] 보유 종목 아님")
return False
# 매도 주문
success = self.client.sell_market_order(code, qty)
if success:
# 현재가 조회
price_data = self.client.inquire_price(code)
if price_data:
sell_price = abs(float(price_data.get("stck_prpr", 0)))
else:
sell_price = signal.get("price", 0)
# 손익 계산 (매도 후 총자산 반영용)
holding = self.holdings.get(code, {})
buy_price = holding.get("buy_price", sell_price)
profit_val = (sell_price - buy_price) * qty # 손익 금액
# DB에서 매도 처리
self.db.close_trade(
code=code,
sell_price=sell_price,
sell_reason=signal['reason'],
)
del self.holdings[code]
# 🔥 매도 후 예수금 + 총자산 즉시 업데이트 (손익 반영)
self._update_account_light(profit_val=profit_val)
logger.info(f"💸 [매도 체결] {name} ({code}): {qty}주 ({signal['reason']}, {signal['profit_pct']*100:+.2f}%)")
return True
return False
def run(self):
"""메인 루프 (진입점). 내부적으로 asyncio.run(_run_async()) 호출."""
asyncio.run(self._run_async())
async def _run_async(self):
"""비동기 메인 루프 - 백그라운드 태스크 시작 후 동기 매매 루프 실행"""
logger.info("🚀 단타 트레이딩 봇 시작 (비동기 백그라운드 작업 활성화)")
# 백그라운드 태스크 시작
self._universe_task = asyncio.create_task(self._universe_scan_scheduler())
self._report_task = asyncio.create_task(self._report_scheduler())
self._asset_task = asyncio.create_task(self._asset_update_scheduler())
logger.info("✅ 백그라운드 태스크 시작 완료 (유니버스 스캔, 리포트, 자산 업데이트)")
# 동기 매매 루프는 별도 스레드에서 실행 (메인 이벤트 루프 블로킹 방지)
loop = asyncio.get_event_loop()
await loop.run_in_executor(None, self._sync_trading_loop)
def _sync_trading_loop(self):
"""동기 매매 루프 (메인 로직) - 백그라운드 작업과 분리"""
logger.info("📈 매매 루프 시작 (동기 모드)")
while True:
try:
now = dt.now()
current_date = now.strftime("%Y-%m-%d")
# 날짜 변경 시 리포트 플래그 리셋
if current_date != self.today_date:
self.today_date = current_date
self.morning_report_sent = False
self.closing_report_sent = False
self.final_report_sent = False
self.ai_report_sent = False
self.start_day_asset = 0
logger.info(f"📅 날짜 변경: {current_date}")
# 리포트 전송은 백그라운드 태스크에서 처리하므로 여기서는 제거
# (기존 코드와의 호환성을 위해 주석 처리)
# if now.hour == 13 and now.minute == 0:
# self.send_morning_report()
# self.send_ai_report()
# elif now.hour == 15 and now.minute == 15:
# self.send_closing_report()
# elif now.hour == 15 and now.minute == 35:
# self.send_final_report()
# 장 상태 체크
if not self.check_market_status():
logger.info("⏸ 장 시간 아님 - 대기 중...")
time.sleep(60)
continue
# 자산 정보 업데이트는 백그라운드 태스크에서 처리하므로 여기서는 제거
# (기존 코드와의 호환성을 위해 주석 처리)
# if now.minute % 30 == 0: # 30분마다
# self._update_assets()
# 매도 신호 체크 (우선) - 메인 루프에서 처리
sell_signals = self.check_sell_signals()
for signal in sell_signals:
self.execute_sell(signal)
db_candidates = self.db.get_target_candidates()
if db_candidates:
logger.info(f"🔍 [매수 기회 탐색] 타겟:{len(db_candidates)}개 | 보유:{active_count}/{self.max_stocks}")
# DB 후보군이 있으면 사용
for db_item in db_candidates[:1]: # 상위 1개만
code = db_item['code']
name = db_item['name']
# 실제 가격 확인 후 매수
price_data = self.client.inquire_price(code)
if price_data:
current_price = abs(float(price_data.get("stck_prpr", 0)))
candidate = {
'code': code,
'name': name,
'price': current_price,
'score': db_item.get('score', 0),
}
self.execute_buy(candidate)
time.sleep(random.uniform(1, 2))
break
else:
# DB 후보군이 없으면 대기 (유니버스 업데이트 대기)
# ⚠️ 직접 스캔하지 않음 - 백그라운드 태스크에서 5분마다 업데이트됨
if active_count == 0: # 첫 실행 시에만 로그
logger.info(f"🔍 [매수 기회 탐색] 타겟:0개 (유니버스 스캔 대기 중) | 보유:{active_count}/{self.max_stocks}")
# 대기
time.sleep(random.uniform(3, 5))
except KeyboardInterrupt:
logger.info("⏹ 봇 종료")
# 백그라운드 태스크 취소
if self._universe_task:
self._universe_task.cancel()
if self._report_task:
self._report_task.cancel()
if self._asset_task:
self._asset_task.cancel()
break
except Exception as e:
logger.error(f"❌ 루프 에러: {e}")
time.sleep(5)
if __name__ == "__main__":
bot = ShortTradingBot()
bot.run()