""" KIS Long Trading Bot Ver1 - 늘림목 전략용 한투 API 트레이딩 시스템 - 한국투자증권(KIS) Open API 사용 - 장기 보유 + 분할 매수(늘림목) 전략 - PER/PEG 기반 밸류에이션 - kis_long_term_checker.py의 장기 전략을 참고하되, 늘림목 전략으로 수정 """ import os import json import ㅁㅁ import random import logging import datetime import hashlib import hmac import base64 import warnings import asyncio from datetime import datetime as dt, timedelta from pathlib import Path from typing import List, Dict, Optional import pandas as pd import requests # DB 모듈 임포트 from database import TradeDB # 로깅 설정 (먼저 초기화) logging.basicConfig( format='[%(asctime)s] %(message)s', datefmt='%H:%M:%S', level=logging.INFO ) logger = logging.getLogger("KISLongBot") # Gemini API (AI 리포트용) warnings.filterwarnings("ignore", message=".*google.generativeai.*") try: import google.generativeai as genai GEMINI_AVAILABLE = True except ImportError: GEMINI_AVAILABLE = False logger.warning("⚠️ google-generativeai 미설치 - AI 리포트 기능 사용 불가") # ML 예측 (선택적) try: from ml_predictor import MLPredictor ML_AVAILABLE = True except ImportError: ML_AVAILABLE = False logger.warning("⚠️ ml_predictor 미설치 - ML 예측 기능 사용 불가") # DB 초기화 SCRIPT_DIR = Path(__file__).resolve().parent db = TradeDB(db_path=str(SCRIPT_DIR / "quant_bot.db")) # DB에서 환경변수 로드 (초기 버전: Mattermost/Gemini 설정용) def get_env_from_db(key, default=""): """DB에서 환경변수 읽기 (초기화 단계)""" env_data = db.get_latest_env() if env_data and env_data.get("snapshot"): return env_data["snapshot"].get(key, default) return default # Mattermost 설정 MM_SERVER_URL = get_env_from_db("MM_SERVER_URL", "https://mattermost.hoonfam.org") MM_BOT_TOKEN = get_env_from_db("MM_BOT_TOKEN_", "").strip() MM_CONFIG_FILE = SCRIPT_DIR / "mm_config.json" # 기본 채널(alias) + 롱/늘림목 봇 전용 채널(alias) MM_CHANNEL_DEFAULT = get_env_from_db("MATTERMOST_CHANNEL", "stock") MM_CHANNEL_LONG = get_env_from_db("KIS_LONG_MM_CHANNEL", MM_CHANNEL_DEFAULT) # Gemini API 설정 (기존 google.generativeai 그대로 유지; 추후 google.genai로 마이그레이션 가능) GEMINI_API_KEY = get_env_from_db("GEMINI_API_KEY", "").strip() if GEMINI_API_KEY and 'GEMINI_AVAILABLE' in globals() and GEMINI_AVAILABLE: try: genai.configure(api_key=GEMINI_API_KEY) gemini_model = genai.GenerativeModel('gemini-2.5-flash') except Exception as e: logger.warning(f"⚠️ Gemini 초기화 실패: {e}") gemini_model = None else: gemini_model = None def get_env_float(key, default): """환경변수를 float로 변환 (DB 우선)""" value = get_env_from_db(key, str(default)) if isinstance(value, str) and "#" in value: value = value.split("#")[0].strip() try: return float(value) if value else default except (ValueError, TypeError): return default def get_env_int(key, default): """환경변수를 int로 변환 (DB 우선)""" value = get_env_from_db(key, str(default)) if isinstance(value, str) and "#" in value: value = value.split("#")[0].strip() try: return int(value) if value else default except (ValueError, TypeError): return default def get_env_bool(key, default=False): """환경변수를 bool로 변환 (DB 우선)""" value = get_env_from_db(key, str(default)).lower() return value in ("true", "1", "yes") # ============================================================ # 한투(KIS) API 클라이언트 (kis_long_term_checker.py 참고) # ============================================================ KIS_TOKEN_CACHE_PATH = SCRIPT_DIR / ".kis_token_cache.json" KIS_TOKEN_EXPIRE_MARGIN_SEC = 300 def _load_kis_token_cache(mock): """캐시 파일에서 토큰 로드""" if not KIS_TOKEN_CACHE_PATH.exists(): return None try: with open(KIS_TOKEN_CACHE_PATH, "r", encoding="utf-8") as f: cache = json.load(f) if cache.get("mock") != mock: return None token = cache.get("access_token") expired_str = cache.get("access_token_token_expired") or cache.get("expires_at") if not token or not expired_str: return None try: expired_dt = dt.strptime(expired_str.strip()[:19], "%Y-%m-%d %H:%M:%S") except ValueError: return None if dt.now() >= expired_dt - timedelta(seconds=KIS_TOKEN_EXPIRE_MARGIN_SEC): return None return token except Exception: return None def _save_kis_token_cache(access_token, access_token_token_expired, mock): """토큰 캐시 저장""" try: with open(KIS_TOKEN_CACHE_PATH, "w", encoding="utf-8") as f: json.dump({ "access_token": access_token, "access_token_token_expired": access_token_token_expired, "mock": mock, }, f, ensure_ascii=False, indent=0) except Exception: pass class KISClientWithOrder: """주문 기능이 추가된 KIS 클라이언트 (env 키는 단타 봇과 동일: KIS_APP_KEY_MOCK/REAL, KIS_ACCOUNT_NO_MOCK/REAL)""" def __init__(self, mock=None): # 모의 여부 결정 (단타 봇과 동일) if mock is not None: use_mock = mock else: use_mock = get_env_bool("KIS_MOCK", True) # 앱키/시크릿: 모의·실전 분리 (단타와 동일 키 사용 → DB 한 세트로 둘 다 실행 가능) if use_mock: self.app_key = (get_env_from_db("KIS_APP_KEY_MOCK", "") or get_env_from_db("KIS_APP_KEY", "")).strip() self.app_secret = (get_env_from_db("KIS_APP_SECRET_MOCK", "") or get_env_from_db("KIS_APP_SECRET", "")).strip() if not self.app_key or not self.app_secret: logger.error("❌ 모의투자용 APP KEY/SECRET이 DB에 없습니다. KIS_APP_KEY_MOCK, KIS_APP_SECRET_MOCK 설정 필요.") raise ValueError("모의투자 KIS_APP_KEY_MOCK / KIS_APP_SECRET_MOCK 미설정") else: self.app_key = (get_env_from_db("KIS_APP_KEY_REAL", "") or get_env_from_db("KIS_APP_KEY", "")).strip() self.app_secret = (get_env_from_db("KIS_APP_SECRET_REAL", "") or get_env_from_db("KIS_APP_SECRET", "")).strip() # 계좌번호: 모의/실전 분리 (단타와 동일) if use_mock: raw_no = (get_env_from_db("KIS_ACCOUNT_NO_MOCK", "") or get_env_from_db("KIS_ACCOUNT_NO", "")).strip() raw_code = (get_env_from_db("KIS_ACCOUNT_CODE_MOCK", "") or get_env_from_db("KIS_ACCOUNT_CODE", "01")).strip() if not raw_code: raw_code = "01" else: raw_no = (get_env_from_db("KIS_ACCOUNT_NO_REAL", "") or get_env_from_db("KIS_ACCOUNT_NO", "")).strip() raw_code = (get_env_from_db("KIS_ACCOUNT_CODE_REAL", "") or get_env_from_db("KIS_ACCOUNT_CODE", "01")).strip() if not raw_code: raw_code = "01" # 10자리면 앞 8 / 뒤 2 분리 if len(raw_no) >= 10: self.acc_no = raw_no[:8] self.acc_code = raw_no[8:10] else: self.acc_no = raw_no self.acc_code = raw_code[:2] if len(raw_code) >= 2 else "01" if len(self.acc_no) != 8: logger.warning("⚠️ 계좌번호 CANO 8자리 아님: '%s'(%s자리). DB 확인.", self.acc_no, len(self.acc_no)) if len(self.acc_no) != 8 or len(self.acc_code) != 2: logger.error( "❌ 계좌번호 형식 오류: CANO=%s(%s자리), ACNT_PRDT_CD=%s(%s자리) → OPSQ2000 발생. " "모의면 KIS_ACCOUNT_NO_MOCK/KIS_ACCOUNT_CODE_MOCK, 실전이면 KIS_ACCOUNT_NO_REAL 또는 KIS_ACCOUNT_NO 확인.", self.acc_no, len(self.acc_no), self.acc_code, len(self.acc_code) ) else: logger.info("✅ 한투 계좌 CANO=%s, ACNT_PRDT_CD=%s (모의=%s)", self.acc_no, self.acc_code, use_mock) self.mock = use_mock # 모의(True)=모의 URL, 실전(False)=실전 URL (self.mock 사용, 인자 mock은 None일 수 있음) if self.mock is True: self.base_url = "https://openapivts.koreainvestment.com:29443" else: self.base_url = "https://openapi.koreainvestment.com:9443" self.access_token = None self._auth() def _auth(self): """접근 토큰 발급""" if not self.app_key or not self.app_secret: hint = "KIS_APP_KEY_MOCK, KIS_APP_SECRET_MOCK" if self.mock else "KIS_APP_KEY_REAL, KIS_APP_SECRET_REAL (또는 KIS_APP_KEY, KIS_APP_SECRET)" raise ValueError(f"한투 앱키 미설정: {hint}") cached = _load_kis_token_cache(self.mock) if cached: self.access_token = cached logger.info("한투 토큰 캐시 사용 (%s)", "모의" if self.mock else "실전") return url = f"{self.base_url}/oauth2/tokenP" body = {"grant_type": "client_credentials", "appkey": self.app_key, "appsecret": self.app_secret} try: r = requests.post(url, json=body, timeout=10) data = r.json() if "access_token" in data: self.access_token = data["access_token"] expired = data.get("access_token_token_expired") or "" _save_kis_token_cache(self.access_token, expired, self.mock) logger.info("한투 토큰 발급 완료 (%s)", "모의" if self.mock else "실전") else: raise RuntimeError("한투 토큰 발급 실패") except Exception as e: logger.error("한투 인증 예외: %s", e) raise def _get_hashkey(self, body): """해시키(Hashkey) 발급 - POST 요청 시 body 무결성 검증용""" try: url = f"{self.base_url}/uapi/hashkey" headers = { "content-type": "application/json", "appkey": self.app_key, "appsecret": self.app_secret, } r = requests.post(url, headers=headers, json=body, timeout=5) if r.status_code == 200: data = r.json() if data.get("rt_cd") == "0": return data.get("HASH") return None except Exception as e: logger.debug(f"해시키 발급 실패: {e}") return None def _headers(self, tr_id, hashkey=None): """API 호출용 헤더""" headers = { "content-type": "application/json; charset=utf-8", "authorization": f"Bearer {self.access_token}", "appkey": self.app_key, "appsecret": self.app_secret, "tr_id": tr_id, } if hashkey: headers["hashkey"] = hashkey return headers def _get(self, path, tr_id, params, max_retries=3): """GET 요청. 429 시 지수 백오프 재시도""" url = f"{self.base_url}{path}" for attempt in range(max_retries): try: r = requests.get(url, headers=self._headers(tr_id), params=params, timeout=15) if r.status_code == 429: wait_time = (2 ** attempt) + random.uniform(0.5, 1.5) logger.warning(f"⏳ API 호출 제한 (429) -> {wait_time:.1f}초 대기 후 재시도 ({attempt+1}/{max_retries})") time.sleep(wait_time) continue if r.status_code == 200: j = r.json() if j.get("rt_cd") == "0": return r elif "초과" in str(j.get("msg1", "")) or "과부하" in str(j.get("msg1", "")): wait_time = (2 ** attempt) + random.uniform(0.5, 1.5) logger.warning(f"⏳ API 과부하 -> {wait_time:.1f}초 대기 후 재시도") time.sleep(wait_time) continue return r except requests.exceptions.RequestException as e: if attempt < max_retries - 1: wait_time = (2 ** attempt) + random.uniform(0.5, 1.5) logger.warning(f"⚠️ 네트워크 에러 -> {wait_time:.1f}초 대기 후 재시도: {e}") time.sleep(wait_time) else: logger.error(f"❌ GET 요청 실패 ({path}): {e}") return r def inquire_price(self, stock_code): """주식 현재가 시세 조회 [v1_국내주식-008]""" r = self._get( "/uapi/domestic-stock/v1/quotations/inquire-price", "FHKST01010100", {"FID_COND_MRKT_DIV_CODE": "J", "FID_INPUT_ISCD": stock_code}, ) if r.status_code != 200: return None j = r.json() if j.get("rt_cd") != "0": return None return j.get("output") def get_orderbook(self, stock_code): """호가 조회 [v1_국내주식-009]""" try: r = self._get( "/uapi/domestic-stock/v1/quotations/inquire-asking-price-exp-ccn", "FHKST01010200", { "FID_COND_MRKT_DIV_CODE": "J", "FID_INPUT_ISCD": stock_code, }, ) if r.status_code != 200: return None j = r.json() if j.get("rt_cd") != "0": return None return j.get("output") except Exception as e: logger.error(f"호가 조회 실패({stock_code}): {e}") return None def get_account_evaluation(self): """계좌 잔고 조회 (단타와 동일: inquire-balance, VTTC8434R/TTTC8434R).""" if self.mock is True: tr_id = "VTTC8434R" else: tr_id = "TTTC8434R" params = { "CANO": self.acc_no, "ACNT_PRDT_CD": self.acc_code, "AFHR_FLPR_YN": "N", "OFL_YN": "", "INQR_DVSN": "01", "UNPR_DVSN": "01", "FUND_STTL_ICLD_YN": "N", "FNCG_AMT_AUTO_RDPT_YN": "N", "PRCS_DVSN": "00", "CTX_AREA_FK100": "", "CTX_AREA_NK100": "", } try: r = self._get( "/uapi/domestic-stock/v1/trading/inquire-balance", tr_id, params, ) if r.status_code != 200: logger.warning( f"💵 [예수금] 잔고 API HTTP 오류: status={r.status_code}, body={getattr(r, 'text', '')[:200]} | TR={tr_id}, CANO={self.acc_no}, 모의={self.mock}" ) return None j = r.json() if j.get("rt_cd") != "0": msg_cd = j.get("msg_cd", "") msg1 = (j.get("msg1") or "")[:200] logger.warning( f"💵 [예수금] 잔고 API 응답 오류: rt_cd={j.get('rt_cd')}, msg_cd={msg_cd}, msg1={msg1} | " f"TR={tr_id}, CANO={self.acc_no}, ACNT_PRDT_CD={self.acc_code}, 모의={self.mock}" ) if "OPSQ2000" in str(msg_cd) or "INVALID" in msg1.upper(): logger.warning( "💵 [예수금] 계좌번호 검증 실패일 수 있음. 모의면 KIS_ACCOUNT_NO_MOCK/KIS_ACCOUNT_CODE_MOCK 확인." ) return None return j except Exception as e: logger.error(f"💵 [예수금] 잔고 조회 실패: {e} | CANO={self.acc_no}, 모의={self.mock}") return None def inquire_daily_itemchartprice(self, stock_code, start_ymd, end_ymd, period="D", adj="1"): """국내주식 기간별 시세(일봉)""" r = self._get( "/uapi/domestic-stock/v1/quotations/inquire-daily-itemchartprice", "FHKST03010100", { "FID_COND_MRKT_DIV_CODE": "J", "FID_INPUT_ISCD": stock_code, "FID_INPUT_DATE_1": start_ymd, "FID_INPUT_DATE_2": end_ymd, "FID_PERIOD_DIV_CODE": period, "FID_ORG_ADJ_PRC": adj, }, ) if r.status_code != 200: return [], [] j = r.json() if j.get("rt_cd") != "0": return [], [] out1 = j.get("output1") or {} out2 = j.get("output2") or [] return out1, out2 def finance_financial_ratio(self, stock_code, fid_div_cls_code="0"): """국내주식 재무비율""" try: r = self._get( "/uapi/domestic-stock/v1/finance/financial-ratio", "FHKST66430300", { "FID_DIV_CLS_CODE": fid_div_cls_code, "FID_COND_MRKT_DIV_CODE": "J", "FID_INPUT_ISCD": stock_code, }, ) if r.status_code != 200: return None j = r.json() if j.get("rt_cd") != "0": return None out = j.get("output") if isinstance(out, list) and out: return out[0] return out if isinstance(out, dict) else None except Exception: return None def finance_growth_ratio(self, stock_code, fid_div_cls_code="0"): """국내주식 성장성비율""" try: r = self._get( "/uapi/domestic-stock/v1/finance/growth-ratio", "FHKST66430800", { "FID_INPUT_ISCD": stock_code, "FID_DIV_CLS_CODE": fid_div_cls_code, "FID_COND_MRKT_DIV_CODE": "J", }, ) if r.status_code != 200: return None j = r.json() if j.get("rt_cd") != "0": return None out = j.get("output") if isinstance(out, list) and out: return out[0] return out if isinstance(out, dict) else None except Exception: return None def _post(self, path, tr_id, body, use_hashkey=True, max_retries=3): """POST 요청. 해시키 사용 및 429 시 지수 백오프 재시도""" url = f"{self.base_url}{path}" hashkey = None if use_hashkey: hashkey = self._get_hashkey(body) if not hashkey: logger.debug("해시키 발급 실패, 해시키 없이 진행") for attempt in range(max_retries): try: r = requests.post(url, headers=self._headers(tr_id, hashkey), json=body, timeout=15) if r.status_code == 429: wait_time = (2 ** attempt) + random.uniform(0.5, 1.5) logger.warning(f"⏳ API 호출 제한 (429) -> {wait_time:.1f}초 대기 후 재시도 ({attempt+1}/{max_retries})") time.sleep(wait_time) if use_hashkey: hashkey = self._get_hashkey(body) continue if r.status_code == 200: j = r.json() if j.get("rt_cd") == "0": return r elif "초과" in str(j.get("msg1", "")) or "과부하" in str(j.get("msg1", "")): wait_time = (2 ** attempt) + random.uniform(0.5, 1.5) logger.warning(f"⏳ API 과부하 -> {wait_time:.1f}초 대기 후 재시도") time.sleep(wait_time) if use_hashkey: hashkey = self._get_hashkey(body) continue return r except requests.exceptions.RequestException as e: if attempt < max_retries - 1: wait_time = (2 ** attempt) + random.uniform(0.5, 1.5) logger.warning(f"⚠️ 네트워크 에러 -> {wait_time:.1f}초 대기 후 재시도: {e}") time.sleep(wait_time) else: logger.error(f"❌ POST 요청 실패 ({path}): {e}") return r def buy_order(self, code, qty, price=0, order_type="01"): """매수 주문 (모의=VTTC0802U, 실전=TTTC0802U) - 고급 주문 타입 지원""" try: if self.mock: tr_id = "VTTC0802U" else: tr_id = "TTTC0802U" body = { "CANO": self.acc_no, "ACNT_PRDT_CD": self.acc_code, "PDNO": code, "ORD_DVSN": order_type, "ORD_QTY": str(qty), "ORD_UNPR": str(price) if price > 0 else "0", } r = self._post( "/uapi/domestic-stock/v1/trading/order-cash", tr_id, body, use_hashkey=True ) if r.status_code != 200: logger.error(f"매수 주문 HTTP 에러: {r.status_code}") return False j = r.json() if j.get("rt_cd") == "0": ord_no = j.get("output", {}).get("ODNO", "") logger.info(f"✅ 매수 주문 성공: {code} {qty}주 (주문번호: {ord_no})") return True else: logger.error(f"매수 주문 실패: {j.get('msg1', '알 수 없음')}") return False except Exception as e: logger.error(f"매수 주문 예외({code}): {e}") return False def buy_market_order(self, code, qty): """시장가 매수 주문 (간편 메서드)""" return self.buy_order(code, qty, price=0, order_type="01") def sell_order(self, code, qty, price=0, order_type="01"): """매도 주문 (모의=VTTC0801U, 실전=TTTC0801U) - 고급 주문 타입 지원""" try: if self.mock: tr_id = "VTTC0801U" else: tr_id = "TTTC0801U" body = { "CANO": self.acc_no, "ACNT_PRDT_CD": self.acc_code, "PDNO": code, "ORD_DVSN": order_type, "ORD_QTY": str(qty), "ORD_UNPR": str(price) if price > 0 else "0", } r = self._post( "/uapi/domestic-stock/v1/trading/order-cash", tr_id, body, use_hashkey=True ) if r.status_code != 200: logger.error(f"매도 주문 HTTP 에러: {r.status_code}") return False j = r.json() if j.get("rt_cd") == "0": ord_no = j.get("output", {}).get("ODNO", "") logger.info(f"✅ 매도 주문 성공: {code} {qty}주 (주문번호: {ord_no})") return True else: logger.error(f"매도 주문 실패: {j.get('msg1', '알 수 없음')}") return False except Exception as e: logger.error(f"매도 주문 예외({code}): {e}") return False def sell_market_order(self, code, qty): """시장가 매도 주문 (간편 메서드)""" return self.sell_order(code, qty, price=0, order_type="01") def inquire_prices_batch(self, stock_codes: List[str]): """다중 종목 현재가 일괄 조회 (최대 20개)""" if not stock_codes or len(stock_codes) > 20: logger.warning("종목 코드는 1~20개만 가능") return {} result = {} # 한투 API는 다중 종목 조회를 지원하지 않으므로 순차 조회 for code in stock_codes: try: price_data = self.inquire_price(code) if price_data: result[code] = price_data time.sleep(random.uniform(0.1, 0.3)) except Exception as e: logger.debug(f"종목 조회 실패({code}): {e}") continue return result def get_investor_trend(self, stock_code, days=5): """외국인/기관 매매 동향 조회""" try: end_dt = dt.now() start_dt = end_dt - timedelta(days=days + 10) start_ymd = start_dt.strftime("%Y%m%d") end_ymd = end_dt.strftime("%Y%m%d") _, out2 = self.inquire_daily_itemchartprice(stock_code, start_ymd, end_ymd, "D", "1") if not out2: return None foreign_sum = 0 org_sum = 0 for item in out2[:days]: try: foreign_raw = item.get("frgn_ntby_qty") or item.get("frgn_ntby_shnu") or "0" foreign_net = int(float(str(foreign_raw).replace(",", "").replace("+", "").replace("-", ""))) if str(foreign_raw).startswith("-"): foreign_net = -foreign_net org_raw = item.get("orgn_ntby_qty") or "0" org_net = int(float(str(org_raw).replace(",", "").replace("+", "").replace("-", ""))) if str(org_raw).startswith("-"): org_net = -org_net foreign_sum += foreign_net org_sum += org_net except: continue return { "foreign_net_buy": foreign_sum, "org_net_buy": org_sum, "total_net_buy": foreign_sum + org_sum, } except Exception as e: logger.error(f"외국인/기관 동향 조회 실패({stock_code}): {e}") return None # ============================================================ # 헬퍼 함수들 (kis_long_term_checker.py 참고) # ============================================================ def get_kis_daily_chart(client, stock_code, max_days=65, exchange=None): """한투 일봉 조회 후 DataFrame 반환""" end_dt = dt.now() end_ymd = end_dt.strftime("%Y%m%d") if exchange: return pd.DataFrame() # 해외 종목은 미구현 start_dt = end_dt - timedelta(days=max_days + 30) start_ymd = start_dt.strftime("%Y%m%d") _, out2 = client.inquire_daily_itemchartprice(stock_code, start_ymd, end_ymd, "D", "1") if not out2: return pd.DataFrame() rows = [] for b in out2: try: date_str = (b.get("stck_bsop_date") or "").strip() close_p = abs(float(str(b.get("stck_clpr") or "0").replace(",", ""))) open_p = abs(float(str(b.get("stck_oprc") or "0").replace(",", ""))) high_p = abs(float(str(b.get("stck_hgpr") or "0").replace(",", ""))) low_p = abs(float(str(b.get("stck_lwpr") or "0").replace(",", ""))) vol = int(float(str(b.get("acml_vol") or "0").replace(",", ""))) except (TypeError, ValueError): continue if date_str and close_p > 0: rows.append({"dt": date_str, "open": open_p, "high": high_p, "low": low_p, "close": close_p, "volume": vol}) if not rows: return pd.DataFrame() df = pd.DataFrame(rows) df = df.sort_values("dt").reset_index(drop=True) df["MA20"] = df["close"].rolling(window=20).mean() df["MA60"] = df["close"].rolling(window=60).mean() return df def calculate_volatility(df, period=20): """변동성 계산 (20일 표준편차 / 평균 * 100)""" if df is None or df.empty or len(df) < period: return None try: returns = df["close"].pct_change().dropna() volatility = returns.tail(period).std() * 100 return round(volatility, 2) if pd.notna(volatility) else None except Exception: return None def get_kis_per_eps_peg(client, stock_code, current_price): """한투 재무비율·성장성비율로 PER, EPS, 성장률, PEG 계산""" try: fin = client.finance_financial_ratio(stock_code, "0") growth = client.finance_growth_ratio(stock_code, "0") time.sleep(random.uniform(1, 2)) per, eps = None, None if fin: try: eps_raw = fin.get("eps") or fin.get("EPS") if eps_raw is not None: eps = float(str(eps_raw).replace(",", "").strip()) per_raw = fin.get("per") or fin.get("PER") if per_raw is not None: per = float(str(per_raw).replace(",", "").strip()) except (TypeError, ValueError): pass if per is None and eps is not None and eps > 0 and current_price and current_price > 0: per = current_price / eps if per is not None and per <= 0: per = None growth_pct = None if growth: for key in ("bsop_prfi_inrt", "ntin_inrt", "grs"): val = growth.get(key) if val is not None: try: growth_pct = float(str(val).replace(",", "").strip()) if growth_pct != 0: break except (TypeError, ValueError): continue if fin and growth_pct is None: growth_pct = fin.get("ntin_inrt") or fin.get("bsop_prfi_inrt") if growth_pct is not None: try: growth_pct = float(str(growth_pct).replace(",", "").strip()) except (TypeError, ValueError): growth_pct = None peg = None if per is not None and growth_pct is not None and growth_pct > 0: peg = round(per / growth_pct, 2) return {"per": per, "eps": eps, "growth_pct": growth_pct, "peg": peg} except Exception as e: logger.error(f"PER/PEG 계산 실패 {stock_code}: {e}") return {"per": None, "eps": None, "growth_pct": None, "peg": None} # ============================================================ # Mattermost 봇 클래스 # ============================================================ class MattermostBot: """Mattermost 알림 봇""" def __init__(self): self.api_url = f"{MM_SERVER_URL.rstrip('/')}/api/v4/posts" self.headers = { "Authorization": f"Bearer {MM_BOT_TOKEN}", "Content-Type": "application/json" } self.channels = self._load_channels() def _load_channels(self): """채널 설정 로드""" try: if MM_CONFIG_FILE.exists(): with open(MM_CONFIG_FILE, 'r', encoding='utf-8') as f: return json.load(f).get("channels", {}) return {} except Exception as e: logger.error(f"⚠️ MM 설정 로드 실패: {e}") return {} def send(self, channel_alias, message): """메시지 전송""" channel_id = self.channels.get(channel_alias) if not channel_id: logger.warning(f"❌ '{channel_alias}' 채널 ID 없음") return False payload = {"channel_id": channel_id, "message": message} try: res = requests.post(self.api_url, headers=self.headers, json=payload, timeout=3) res.raise_for_status() return True except Exception as e: logger.error(f"❌ MM 전송 에러: {e}") return False # ============================================================ # 늘림목 트레이딩 봇 # ============================================================ class LongTradingBot: """늘림목 전략 트레이딩 봇 - 장기 보유 + 분할 매수 (단타 봇과 동일 env 키 사용: KIS_MOCK, KIS_APP_KEY_MOCK/REAL, KIS_ACCOUNT_NO_MOCK/REAL, TOTAL_DEPOSIT, MAX_STOCKS 등) """ def __init__(self): self.db = db # 자산·리포트·루프용 변수 (단타 봇과 동일 이름) self.today_date = dt.now().strftime("%Y-%m-%d") self.morning_report_sent = False self.closing_report_sent = False self.final_report_sent = False self.ai_report_sent = False self.start_day_asset = 0 self.current_cash = 0 self.current_total_asset = 0 self.d2_excc_amt = 0 # D+2 예수금 (전일 정산 수령 예정, 매매 가능 판단 참고) self.total_deposit = get_env_float("TOTAL_DEPOSIT", 0) self.max_stocks = get_env_int("MAX_STOCKS", 5) # 일일 매수 한도 (0 = 총자산의 30% 자동, 양수면 해당 금액 한도). 누적은 DB buy_execution_log(산 날짜 기준) self.daily_buy_limit = get_env_float("DAILY_BUY_LIMIT", 0) self.client = KISClientWithOrder() # Mattermost 초기화 self.mm = MattermostBot() # 롱/늘림목 봇 전용 채널(alias) 우선 사용, 없으면 기본 채널 사용 self.mm_channel = MM_CHANNEL_LONG # 전략 파라미터 (DB에서 읽기) self.max_per = get_env_float("MAX_PER", 25) self.max_peg = get_env_float("MAX_PEG", 1.5) self.min_growth_pct = get_env_float("MIN_GROWTH_PCT", 10) # 늘림목 파라미터 dca_intervals_str = get_env_from_db("DCA_INTERVALS", "0,-0.05,-0.10,-0.15,-0.20") self.dca_intervals = [float(x.strip()) for x in dca_intervals_str.split(",") if x.strip()] dca_amounts_str = get_env_from_db("DCA_AMOUNTS", "100000,100000,100000,100000,100000") self.dca_amounts = [int(x.strip()) for x in dca_amounts_str.split(",") if x.strip()] self.max_position_pct = get_env_float("MAX_POSITION_PCT", 0.20) # 손절/익절 self.stop_loss_pct = get_env_float("STOP_LOSS_PCT", -0.30) self.take_profit_pct = get_env_float("TAKE_PROFIT_PCT", 0.10) # 매매 루프 체크 간격 (늘림목은 주가 변동성 적음 → 5~10분 권장) self.loop_interval_min_low = get_env_float("LOOP_INTERVAL_MIN_LOW", 5) self.loop_interval_min_high = get_env_float("LOOP_INTERVAL_MIN_HIGH", 10) # DB에서 활성 트레이드 로드 (늘림목만: LONG_INITIAL, LONG_DCA - 단타와 섞이지 않도록) self.holdings = {} active_trades = self.db.get_active_trades(strategy_prefix="LONG") for code, trade in active_trades.items(): # 분할 매수 정보 복원 (간단화) self.holdings[code] = { "buy_prices": [trade.get("avg_buy_price", 0)], "qtys": [trade.get("current_qty", 0)], "total_qty": trade.get("current_qty", 0), "avg_price": trade.get("avg_buy_price", 0), "first_buy_date": trade.get("buy_date", dt.now().strftime("%Y-%m-%d %H:%M:%S")), "name": trade.get("name", code), } # 관심 종목 리스트 (JSON 파일에서 로드) self.watchlist_path = SCRIPT_DIR / "long_term_watchlist.json" self.watchlist = self._load_watchlist() # 초기 자산 조회는 하지 않음 → 루프 진입 시 "예수금 될 때까지 3초 간격 재시도"로만 수행 (중복/EGW00201 방지) # 비동기 태스크 관리 self._report_task = None self._asset_task = None # 잔고 API 초당 제한(EGW00201) 방지: 마지막 조회 시각 self._last_balance_fetch_time = 0.0 def _load_watchlist(self): """관심 종목 리스트 로드""" if not self.watchlist_path.exists(): logger.warning(f"관심 종목 파일 없음: {self.watchlist_path}") return [] try: with open(self.watchlist_path, "r", encoding="utf-8") as f: data = json.load(f) return data.get("items", []) except Exception as e: logger.error(f"관심 종목 로드 실패: {e}") return [] def _merge_watchlist_holdings_from_balance(self, balance): """ watchlist에 strategy "LONG" 또는 use "dca"로 넣어둔 종목 중, 계좌에는 보유 중인데 DB(봇)에는 없는 종목을 holdings에 합침 (단타와 동일 inquire-balance → output1=종목 리스트). """ positions_list = balance.get("output1") or [] if isinstance(positions_list, dict): positions_list = [positions_list] if not isinstance(positions_list, list): positions_list = [] watchlist_dca_codes = { (item.get("code") or "").strip() for item in self.watchlist if (item.get("strategy") == "LONG" or item.get("use") == "dca") } watchlist_dca_codes.discard("") if not watchlist_dca_codes: return for row in positions_list: code = (row.get("pdno") or "").strip() if not code or code in self.holdings or code not in watchlist_dca_codes: continue try: qty = int(float(row.get("hldg_qty") or row.get("ord_psbl_qty") or 0)) if qty <= 0: continue evlu_amt = float(row.get("evlu_amt") or 0) evlu_pfls = float(row.get("evlu_pfls_amt") or 0) pchs_avg = row.get("pchs_avg_pric") if pchs_avg is not None and str(pchs_avg).strip() != "": avg_price = abs(float(str(pchs_avg).replace(",", "").strip())) else: # 매입금액 = 평가금액 - 평가손익 → 평단 = 매입금액/수량 cost = evlu_amt - evlu_pfls avg_price = cost / qty if qty else 0 if avg_price <= 0: continue name = (row.get("prdt_name") or row.get("prdt_name_eng") or code).strip() self.holdings[code] = { "buy_prices": [avg_price], "qtys": [qty], "total_qty": qty, "avg_price": avg_price, "first_buy_date": dt.now().strftime("%Y-%m-%d %H:%M:%S"), "name": name or code, } logger.info( f"📂 [늘림목] watchlist LONG 반영: {name} ({code}) {qty}주 평단 {avg_price:,.0f}원 (계좌 보유)" ) except Exception as e: logger.debug(f"watchlist 병합 스킵 {code}: {e}") def _update_assets(self): """자산 정보 업데이트 (단타와 동일: inquire-balance 응답에서 output2로 예수금)""" try: balance = self.client.get_account_evaluation() if not balance: return # watchlist에 LONG/dca로 넣어둔 계좌 보유 종목을 holdings에 합침 self._merge_watchlist_holdings_from_balance(balance) # 단타와 동일: output2=예수금 요약 out2 = balance.get("output2") if isinstance(out2, list) and out2: out2 = out2[0] out2 = out2 if isinstance(out2, dict) else {} ord_psbl = out2.get("ord_psbl_cash") or out2.get("dnca_tot_amt") if ord_psbl is not None and str(ord_psbl).strip() != "": self.current_cash = float(str(ord_psbl).replace(",", "").strip()) # 보유 종목 평가액 (output1=종목별 잔고 리스트에서 evlu_amt 사용, 없으면 현재가 조회) output1_list = balance.get("output1") or [] if isinstance(output1_list, dict): output1_list = [output1_list] holdings_value = 0 for code, holding in self.holdings.items(): for item in output1_list: if (item.get("pdno") or "").strip() == code: holdings_value += float(item.get("evlu_amt", 0) or 0) break else: price_data = self.client.inquire_price(code) if price_data: holdings_value += abs(float(price_data.get("stck_prpr", 0) or 0)) * holding["total_qty"] self.current_total_asset = self.current_cash + holdings_value if self.start_day_asset == 0: self.start_day_asset = self.current_total_asset self._last_balance_fetch_time = time.time() except Exception as e: logger.error(f"자산 정보 업데이트 실패: {e}") def _update_account_light(self, profit_val=0): """ 경량 계좌 갱신 (매수/매도 직후 즉시 호출!) - API 부하를 줄이기 위해 예수금 + 보유 종목 평가액만 빠르게 계산 - 총자산 = 예수금 + 보유 종목 평가액 (+ 손익 반영) """ try: balance = self.client.get_account_evaluation() if not balance: logger.warning("💵 [예수금] 잔고 API 응답 없음 → 예수금 갱신 스킵 (TR=VTTC8434R/TTTC8434R 확인)") return False self._merge_watchlist_holdings_from_balance(balance) # 단타와 동일: output2=예수금 요약, output1=종목별 잔고 리스트 def _cash_block(obj): if not obj: return {} if isinstance(obj, list) and obj: return obj[0] return obj if isinstance(obj, dict) else {} def _parse_amt(v): if v is None or str(v).strip() == "": return None return float(str(v).replace(",", "").strip()) out2 = _cash_block(balance.get("output2")) ord_psbl_val = _parse_amt(out2.get("ord_psbl_cash")) dnca_tot_val = _parse_amt(out2.get("dnca_tot_amt")) or 0 new_cash = ord_psbl_val if ord_psbl_val is not None else dnca_tot_val if new_cash > 0 or self.current_cash == 0: self.current_cash = new_cash prvs_rcdl = _parse_amt(out2.get("prvs_rcdl_excc_amt")) self.d2_excc_amt = prvs_rcdl if prvs_rcdl is not None else 0 # 보유 종목 평가액: inquire-balance는 output1이 종목별 잔고(리스트) holdings_value = 0 output1_list = balance.get("output1") or [] if isinstance(output1_list, dict): output1_list = [output1_list] for code, holding in self.holdings.items(): for item in output1_list: if (item.get("pdno") or "").strip() == code: holdings_value += float(item.get("evlu_amt", 0) or 0) break else: price_data = self.client.inquire_price(code) if price_data: holdings_value += abs(float(price_data.get("stck_prpr", 0) or 0)) * holding["total_qty"] self.current_total_asset = self.current_cash + holdings_value if profit_val != 0: self.current_total_asset += profit_val if self.start_day_asset == 0: self.start_day_asset = self.current_total_asset self._last_balance_fetch_time = time.time() logger.debug(f"💵 [경량갱신] 예수금: {self.current_cash:,.0f}원 | 총자산: {self.current_total_asset:,.0f}원") return True except Exception as e: logger.error(f"❌ 경량 갱신 실패: {e}") return False def _update_cash_only(self): """예수금만 빠르게 업데이트 (하위 호환성용, _update_account_light 사용 권장)""" return self._update_account_light(profit_val=0) def _ensure_account_balance(self, profit_val=0, min_interval_sec=0): """예수금 조회 성공할 때까지 재시도. min_interval_sec>0이면 그 초 미만일 때 캐시 사용(EGW00201 방지). 매수 직전에는 0으로 호출해 항상 최신 조회(단타와 공유 예수금).""" if min_interval_sec > 0: now = time.time() if now - getattr(self, "_last_balance_fetch_time", 0) < min_interval_sec: if getattr(self, "_last_balance_fetch_time", 0) > 0: return True retry_interval_sec = 3 while True: if self._update_account_light(profit_val=profit_val): return True logger.warning(f"💵 [예수금] 조회 실패 → {retry_interval_sec}초 후 재시도 (성공할 때까지)") time.sleep(retry_interval_sec) def _seconds_until_next_5min(self): """다음 5분 정각까지 남은 초 계산""" now = dt.now() next_min = ((now.minute // 5) + 1) * 5 if next_min >= 60: next_time = now.replace(hour=now.hour + 1, minute=0, second=0, microsecond=0) else: next_time = now.replace(minute=next_min, second=0, microsecond=0) return (next_time - now).total_seconds() async def _report_scheduler(self): """리포트 전송 스케줄러 (비동기 백그라운드)""" while True: try: await asyncio.sleep(60) # 1분마다 체크 now = dt.now() # 13:00 - 오전 리포트 + AI 리포트 if now.hour == 13 and now.minute == 0 and not self.morning_report_sent: loop = asyncio.get_event_loop() await loop.run_in_executor(None, self.send_morning_report) await loop.run_in_executor(None, self.send_ai_report) # 15:15 - 장마감 전 리포트 elif now.hour == 15 and now.minute == 15 and not self.closing_report_sent: loop = asyncio.get_event_loop() await loop.run_in_executor(None, self.send_closing_report) # 15:35 - 최종 리포트 elif now.hour == 15 and now.minute == 35 and not self.final_report_sent: loop = asyncio.get_event_loop() await loop.run_in_executor(None, self.send_final_report) except asyncio.CancelledError: break except Exception as e: logger.error(f"❌ [리포트 스케줄러] 에러: {e}") await asyncio.sleep(60) async def _asset_update_scheduler(self): """자산 정보 업데이트 스케줄러 (30분마다, 비동기 백그라운드)""" while True: try: await asyncio.sleep(60) # 1분마다 체크 now = dt.now() # 30분마다 자산 업데이트 if now.minute % 30 == 0: loop = asyncio.get_event_loop() await loop.run_in_executor(None, self._update_assets) await asyncio.sleep(60) # 업데이트 후 1분 대기 (중복 방지) except asyncio.CancelledError: break except Exception as e: logger.error(f"❌ [자산 업데이트 스케줄러] 에러: {e}") await asyncio.sleep(60) def send_mm(self, msg): """Mattermost 알림 전송""" try: self.mm.send(self.mm_channel, msg) except Exception as e: logger.error(f"❌ MM 전송 에러: {e}") def check_market_status(self): """장 운영 시간 체크""" # FORCE_MARKET_OPEN 플래그 확인 (테스트용) force_open = get_env_bool("FORCE_MARKET_OPEN", False) if force_open: logger.debug("🔓 FORCE_MARKET_OPEN=true - 장 상태 무시하고 계속 진행") return True # 정상 장 운영 시간 체크 now = dt.now() if not (datetime.time(8, 30) <= now.time() <= datetime.time(16, 0)): return False if now.weekday() >= 5: # 주말 return False return True def send_morning_report(self): """오전 장 뜸할 때 리포트 (13:00)""" if self.morning_report_sent: return self._update_assets() day_pnl = self.current_total_asset - self.start_day_asset day_pnl_pct = (day_pnl / self.start_day_asset * 100) if self.start_day_asset > 0 else 0 msg = f"""📊 **[오전 장 현황 - 13:00]** - 당일 시작: {self.start_day_asset:,.0f}원 - 현재 자산: {self.current_total_asset:,.0f}원 - 당일 손익: {day_pnl:+,.0f}원 ({day_pnl_pct:+.2f}%) - 보유 종목: {len(self.holdings)}개""" self.send_mm(msg) self.morning_report_sent = True logger.info("📊 오전 리포트 전송 완료") def send_closing_report(self): """장마감 전 리포트 (15:15)""" if self.closing_report_sent: return self._update_assets() day_pnl = self.current_total_asset - self.start_day_asset day_pnl_pct = (day_pnl / self.start_day_asset * 100) if self.start_day_asset > 0 else 0 msg = f"""📈 **[장마감 전 현황 - 15:15]** - 당일 손익: {day_pnl:+,.0f}원 ({day_pnl_pct:+.2f}%) - 현재 자산: {self.current_total_asset:,.0f}원 - 보유 종목: {len(self.holdings)}개 - 예수금: {self.current_cash:,.0f}원""" self.send_mm(msg) self.closing_report_sent = True logger.info("📈 장마감 전 리포트 전송 완료") def send_final_report(self): """장마감 후 최종 리포트 (15:35)""" if self.final_report_sent: return self._update_assets() # 당일 손익 day_pnl = self.current_total_asset - self.start_day_asset day_pnl_pct = (day_pnl / self.start_day_asset * 100) if self.start_day_asset > 0 else 0 # 누적 손익 cumulative_pnl = self.current_total_asset - self.total_deposit cumulative_pnl_pct = (cumulative_pnl / self.total_deposit * 100) if self.total_deposit > 0 else 0 # 오늘 거래 내역 today_trades = self.db.get_trades_by_date(self.today_date) msg = f"""🏁 **[장마감 최종 보고 - 15:35]** ━━━━━━━━━━━━━━━━━━━━ 📅 **당일 손익** - 시작: {self.start_day_asset:,.0f}원 - 종료: {self.current_total_asset:,.0f}원 - 손익: {day_pnl:+,.0f}원 ({day_pnl_pct:+.2f}%) 💰 **누적 손익 (총 입금액 대비)** - 총 입금: {self.total_deposit:,.0f}원 - 현재 자산: {self.current_total_asset:,.0f}원 - 누적 손익: {cumulative_pnl:+,.0f}원 ({cumulative_pnl_pct:+.2f}%) 📊 **거래 현황** - 오늘 매매: {len(today_trades)}건 - 보유 종목: {len(self.holdings)}개 - 예수금: {self.current_cash:,.0f}원 ━━━━━━━━━━━━━━━━━━━━""" self.send_mm(msg) self.final_report_sent = True logger.info("🏁 장마감 최종 리포트 전송 완료") def send_ai_report(self): """AI 분석 리포트 (13:00)""" if self.ai_report_sent or not gemini_model: return try: # 최근 거래 내역 조회 recent_trades = [] try: conn = self.db.conn cursor = conn.execute(""" SELECT code, name, buy_price, sell_price, qty, profit_rate, realized_pnl, strategy, sell_reason, buy_date, sell_date, hold_minutes FROM trade_history WHERE strategy LIKE 'LONG%' ORDER BY id DESC LIMIT 10 """) for row in cursor.fetchall(): recent_trades.append({ 'code': row[0], 'name': row[1], 'buy_price': row[2], 'sell_price': row[3], 'qty': row[4], 'profit_rate': row[5], 'realized_pnl': row[6], 'strategy': row[7], 'sell_reason': row[8], 'buy_date': row[9], 'sell_date': row[10], 'hold_minutes': row[11] or 0 }) except Exception as e: logger.error(f"거래 내역 조회 실패: {e}") return if not recent_trades: return # 통계 계산 total = len(recent_trades) wins = sum(1 for t in recent_trades if t['profit_rate'] > 0) losses = total - wins win_rate = wins / total * 100 if total > 0 else 0 avg_profit = sum(t['profit_rate'] for t in recent_trades) / total total_pnl = sum(t['realized_pnl'] for t in recent_trades) avg_hold_days = sum(t['hold_minutes'] for t in recent_trades) / total / 1440 # 분을 일로 변환 # AI 분석 trades_text = "" for i, t in enumerate(recent_trades, 1): trades_text += f""" [거래 {i}] {t['name']} ({t['strategy']}) - 매수: {t['buy_price']:,.0f}원 × {t['qty']}주 - 매도: {t['sell_price']:,.0f}원 - 손익: {t['profit_rate']:+.2f}% ({t['realized_pnl']:,.0f}원) - 보유: {avg_hold_days:.1f}일 - 사유: {t['sell_reason']} """ prompt = f"""당신은 퀀트 트레이딩 전문가입니다. 다음은 최근 {total}건의 장기/늘림목 거래 내역입니다: {trades_text} 📊 통계: - 승률: {win_rate:.1f}% ({wins}승 {losses}패) - 평균 수익률: {avg_profit:.2f}% - 총 손익: {total_pnl:,.0f}원 - 평균 보유: {avg_hold_days:.1f}일 **당신의 임무:** 1. 문제점 3가지 진단 (구체적으로) 2. DB 설정 수정 권장사항 (변수명=값 형식) 3. 예상 효과 **출력 형식:** ## 🔍 문제점 1. [구체적 문제 1] 2. [구체적 문제 2] 3. [구체적 문제 3] ## 💡 권장 수정사항 ``` MAX_PER=XX MAX_PEG=X.XX DCA_INTERVALS=X,-X.XX,-X.XX ... ``` ## 📈 예상 효과 - [효과 1] - [효과 2] **간결하고 명확하게 답변하세요.** """ response = gemini_model.generate_content(prompt) analysis = response.text summary = f"""📊 최근 {total}건 거래 통계 - 승률: {win_rate:.1f}% ({wins}승 {losses}패) - 평균 수익률: {avg_profit:.2f}% - 총 손익: {total_pnl:,.0f}원 - 평균 보유: {avg_hold_days:.1f}일""" message = f"""🤖 **[13시 AI 자동 분석]** {summary} {analysis} --- 💬 늘림목 전략 최적화를 위한 AI 분석입니다. """ self.send_mm(message) self.ai_report_sent = True logger.info("🤖 AI 리포트 전송 완료") except Exception as e: logger.error(f"AI 리포트 생성 실패: {e}") def analyze_stock_value(self, code, name): """종목 밸류에이션 분석 - 고도화된 분석""" try: # 현재가 조회 price_data = self.client.inquire_price(code) if not price_data: return None current_price = abs(float(price_data.get("stck_prpr", 0))) if current_price == 0: return None # PER/PEG 조회 fund_data = get_kis_per_eps_peg(self.client, code, current_price) per = fund_data.get("per") peg = fund_data.get("peg") growth_pct = fund_data.get("growth_pct") # 차트 분석 df = get_kis_daily_chart(self.client, code, max_days=65) if df.empty: return None # RSI 계산 delta = df["close"].diff(1) gain = delta.where(delta > 0, 0).rolling(window=14).mean() loss = (-delta.where(delta < 0, 0)).rolling(window=14).mean() rs = gain / loss.replace(0, float("nan")) rsi = 100 - (100 / (1 + rs)) current_rsi = float(rsi.iloc[-1]) if len(rsi) > 0 else None # [필터 1] RSI 과열 체크 rsi_threshold = get_env_float("RSI_OVERHEAT_THRESHOLD", 78.0) if current_rsi and current_rsi >= rsi_threshold: logger.info(f"🔍 [Pass-RSI] {name} {code}: RSI 과열 ({current_rsi:.1f} >= {rsi_threshold})") return None # 변동성 volatility = calculate_volatility(df) # 이동평균 ma20 = float(df["MA20"].iloc[-1]) if "MA20" in df.columns and len(df) >= 20 else None ma60 = float(df["MA60"].iloc[-1]) if "MA60" in df.columns and len(df) >= 60 else None # [필터 2] MA20 체크 if ma20 and current_price < ma20: logger.info(f"🔍 [Pass-MA20] {name} {code}: 현재가({current_price:.0f}) < MA20({ma20:.2f})") return None # 외국인/기관 동향 분석 investor_trend = self.client.get_investor_trend(code, days=5) investor_score = 0 investor_signal = "중립" if investor_trend: total_net = investor_trend.get("total_net_buy", 0) if total_net > 20000: investor_score = 25 investor_signal = "강한 매수세" elif total_net > 5000: investor_score = 15 investor_signal = "매수세" elif total_net < -20000: investor_score = -25 investor_signal = "강한 매도세" elif total_net < -5000: investor_score = -15 investor_signal = "매도세" # 종합 점수 계산 score = 50 # 기본 점수 # PER 점수 if per is not None: if per < 10: score += 30 elif per < 15: score += 20 elif per < 20: score += 10 elif per > 30: score -= 20 # PEG 점수 if peg is not None: if peg < 1.0: score += 30 elif peg < 1.5: score += 15 elif peg > 2.0: score -= 20 # RSI 점수 if current_rsi is not None: if current_rsi < 30: score += 20 # 과매도 구간 elif current_rsi < 40: score += 10 elif current_rsi > 70: score -= 20 # 과매수 구간 # 이동평균 점수 if ma20 is not None and ma60 is not None: if ma20 > ma60: score += 10 # 정배열 ma_gap_pct = ((ma20 - ma60) / ma60) * 100 if ma_gap_pct > 5: score += 5 # 강한 상승세 # 외국인/기관 점수 추가 score += investor_score analysis_result = { "code": code, "name": name, "current_price": current_price, "per": per, "peg": peg, "growth_pct": growth_pct, "rsi": current_rsi, "volatility": volatility, "ma20": ma20, "ma60": ma60, "investor_trend": investor_trend, "investor_signal": investor_signal, "score": score, "is_buyable": ( (per is None or per <= self.max_per) and (peg is None or peg <= self.max_peg) and (growth_pct is None or growth_pct >= self.min_growth_pct) and score >= 60 and investor_score >= -10 # 강한 매도세는 제외 ), } # 필터링 로그 출력 (return 전에 실행되도록 수정) if not analysis_result["is_buyable"]: reasons = [] if per and per > self.max_per: reasons.append(f"PER {per:.1f} > {self.max_per}") if peg and peg > self.max_peg: reasons.append(f"PEG {peg:.2f} > {self.max_peg}") if growth_pct and growth_pct < self.min_growth_pct: reasons.append(f"성장률 {growth_pct:.1f}% < {self.min_growth_pct}%") if score < 60: reasons.append(f"점수 {score:.1f} < 60") if investor_score < -10: reasons.append(f"수급 점수 {investor_score} < -10") if reasons: logger.info(f"🔍 [Pass-밸류] {name} {code}: {', '.join(reasons)}") return analysis_result except Exception as e: logger.error(f"종목 분석 실패({code}): {e}") return None def check_dca_opportunity(self, code, holding): """늘림목(분할 매수) 기회 체크""" if code not in self.holdings: return None try: price_data = self.client.inquire_price(code) if not price_data: return None current_price = abs(float(price_data.get("stck_prpr", 0))) avg_price = holding["avg_price"] # 평단가 대비 하락률 계산 drop_pct = (current_price - avg_price) / avg_price # 분할 매수 구간 확인 (구간별 1회만 매수 - i번째 구간은 len(buy_prices) > i 이면 이미 매수 완료) buy_prices = holding.get("buy_prices", []) for i, interval in enumerate(self.dca_intervals): if drop_pct <= interval: # i번째 DCA 구간 이미 매수했는지 확인 (매수 횟수로 구간 추적) if len(buy_prices) > i: continue # 이미 이 구간 매수 완료 amount = self.dca_amounts[i] if i < len(self.dca_amounts) else self.dca_amounts[-1] qty = int(amount / current_price) if qty > 0: return { "code": code, "name": holding.get("name", code), "price": current_price, "qty": qty, "drop_pct": drop_pct, "interval": interval, } return None except Exception as e: logger.error(f"늘림목 체크 실패({code}): {e}") return None def check_sell_signals(self): """매도 신호 체크 (손절/익절)""" if not self.holdings: return [] sell_signals = [] for code, holding in list(self.holdings.items()): try: price_data = self.client.inquire_price(code) if not price_data: continue current_price = abs(float(price_data.get("stck_prpr", 0))) avg_price = holding["avg_price"] profit_pct = (current_price - avg_price) / avg_price # 손절 또는 익절 체크 if profit_pct <= self.stop_loss_pct: sell_signals.append({ "code": code, "name": holding.get("name", code), "reason": "손절", "profit_pct": profit_pct, "qty": holding["total_qty"], }) elif profit_pct >= self.take_profit_pct: sell_signals.append({ "code": code, "name": holding.get("name", code), "reason": "익절", "profit_pct": profit_pct, "qty": holding["total_qty"], }) time.sleep(random.uniform(0.3, 0.7)) except Exception as e: logger.error(f"매도 신호 체크 실패({code}): {e}") continue return sell_signals def _check_daily_limits(self, amount): """일일 한도 체크 - 매수 전 반드시 호출. '하루' = 산 날짜(buy_date) 기준, DB에서 조회.""" today = dt.now().strftime("%Y-%m-%d") if today != self.today_date: self.today_date = today self.morning_report_sent = False self.closing_report_sent = False self.final_report_sent = False # 오늘(산 시점 날짜) 매수 누적은 DB에서 조회 (재시작해도 유지) daily_buy_amount, daily_buy_count = self.db.get_daily_buy_amount(today, "LONG") if len(self.holdings) >= self.max_stocks: logger.info(f"🚫 최대 종목 수 도달: {len(self.holdings)}/{self.max_stocks}개") return False, "max_stocks" daily_limit = ( self.daily_buy_limit if self.daily_buy_limit > 0 else self.current_total_asset * 0.30 ) if daily_buy_amount + amount > daily_limit: remain = daily_limit - daily_buy_amount logger.info( f"🚫 일일 매수 한도 도달: " f"누적 {daily_buy_amount:,.0f}원 / 한도 {daily_limit:,.0f}원 (잔여 {remain:,.0f}원)" ) self.send_mm( f"⛔ **일일 매수 한도 도달**\n" f"- 오늘 매수: {daily_buy_amount:,.0f}원\n" f"- 한도: {daily_limit:,.0f}원\n" f"- 오늘 추가 매수 중단 (매도/손절 감시는 계속)" ) return False, "daily_limit" return True, "ok" def _after_buy(self, code, amount, name, strategy, qty): """매수 성공 후 DB에 체결 이력 저장 (산 시점 날짜 기준 → 재시작해도 한도 유지)""" self.db.insert_buy_execution(code=code, name=name, strategy=strategy, amount=amount, qty=qty) today = dt.now().strftime("%Y-%m-%d") daily_buy_amount, daily_buy_count = self.db.get_daily_buy_amount(today, "LONG") logger.info( f"📊 일일 매수 현황: {daily_buy_count}건 / {daily_buy_amount:,.0f}원 (오늘, DB 기준)" ) def execute_buy(self, signal, is_dca=False): """매수 실행""" code = signal["code"] name = signal["name"] price = signal["price"] qty = signal["qty"] # 🔥 매수 직전 예수금 실시간 조회 (단타와 같이 돌릴 때 공유 예수금 반영, 캐시 없이 항상 조회) self._ensure_account_balance(profit_val=0, min_interval_sec=0) # 예수금 부족 체크 (수수료 포함 여유분 5% 고려) required_amount = price * qty * 1.05 # 수수료 포함 order_amount = int(price * qty) # 1회 주문 최소 10만 원 (수수료 비율 감안). 0원이면 가격/수량 확인 필요 if order_amount < 100_000: logger.warning( f"⚠️ [{name}] 1회 주문 최소 10만 원 미만: {order_amount:,}원 (price={price:,.0f}, qty={qty}) -> 매수 스킵" ) return False ok, reason = self._check_daily_limits(order_amount) if not ok: return False if self.current_cash < required_amount: logger.warning( f"⚠️ [{name}] 예수금 부족: 필요 {required_amount:,.0f}원 / " f"보유 {self.current_cash:,.0f}원 -> 매수 스킵" ) return False # 매수 주문 success = self.client.buy_market_order(code, qty) if success: # 매수 후 예수금 + 총자산 즉시 업데이트 (다음 매수 시 정확한 예수금 확인) self._update_account_light(profit_val=0) buy_time = dt.now().strftime("%Y-%m-%d %H:%M:%S") if code not in self.holdings: self.holdings[code] = { "buy_prices": [], "qtys": [], "total_qty": 0, "avg_price": 0, "first_buy_date": buy_time, "name": name, } holding = self.holdings[code] holding["buy_prices"].append(price) holding["qtys"].append(qty) holding["total_qty"] += qty # 평단가 재계산 total_cost = sum(p * q for p, q in zip(holding["buy_prices"], holding["qtys"])) holding["avg_price"] = total_cost / holding["total_qty"] # DB에 저장/업데이트 self.db.upsert_trade({ "code": code, "name": name, "strategy": "LONG_DCA" if is_dca else "LONG_INITIAL", "avg_buy_price": holding["avg_price"], "current_price": price, "target_qty": holding["total_qty"], "current_qty": holding["total_qty"], "status": "HOLDING", "buy_date": holding["first_buy_date"], }) self._after_buy( code, int(price * qty), name=name, strategy="LONG_DCA" if is_dca else "LONG_INITIAL", qty=qty, ) action = "늘림목 매수" if is_dca else "초기 매수" logger.info(f"💰 [{action}] {name} ({code}): {price:,.0f}원 × {qty}주 (평단: {holding['avg_price']:,.0f}원)") return True return False def execute_sell(self, signal): """매도 실행""" code = signal["code"] name = signal["name"] qty = signal["qty"] if code not in self.holdings: logger.warning(f"⚠️ [{name}] 보유 종목 아님") return False # 매도 주문 success = self.client.sell_market_order(code, qty) if success: # 현재가 조회 price_data = self.client.inquire_price(code) sell_price = abs(float(price_data.get("stck_prpr", 0))) if price_data else signal.get("price", 0) holding = self.holdings[code] profit_pct = signal["profit_pct"] # DB에서 매도 처리 self.db.close_trade( code=code, sell_price=sell_price, sell_reason=signal['reason'], ) # 손익 계산 (매도 후 총자산 반영용) profit_val = (sell_price - holding["avg_price"]) * qty # 손익 금액 del self.holdings[code] # 🔥 매도 후 예수금 + 총자산 즉시 업데이트 (손익 반영) self._update_account_light(profit_val=profit_val) logger.info( f"💸 [매도 체결] {name} ({code}): {qty}주 " f"({signal['reason']}, {profit_pct*100:+.2f}%, 평단: {holding['avg_price']:,.0f}원)" ) return True return False def run(self): """메인 루프 (진입점). 내부적으로 asyncio.run(_run_async()) 호출.""" asyncio.run(self._run_async()) async def _run_async(self): """비동기 메인 루프 - 백그라운드 태스크 시작 후 동기 매매 루프 실행""" logger.info("🚀 늘림목 트레이딩 봇 시작 (비동기 백그라운드 작업 활성화)") # 백그라운드 태스크 시작 self._report_task = asyncio.create_task(self._report_scheduler()) self._asset_task = asyncio.create_task(self._asset_update_scheduler()) logger.info("✅ 백그라운드 태스크 시작 완료 (리포트, 자산 업데이트)") # 동기 매매 루프는 별도 스레드에서 실행 (메인 이벤트 루프 블로킹 방지) loop = asyncio.get_event_loop() await loop.run_in_executor(None, self._sync_trading_loop) def _sync_trading_loop(self): """동기 매매 루프 (메인 로직) - 백그라운드 작업과 분리""" logger.info("📈 매매 루프 시작 (동기 모드)") # 최초 예수금 조회: 성공할 때까지 3초 간격 재시도 (매매는 그 다음 5~10분 간격) self._ensure_account_balance(profit_val=0) logger.info( f"💵 예수금 조회 완료 → 매매 루프 진입 | 예수금(주문가능): {self.current_cash:,.0f}원 | D+2: {self.d2_excc_amt:,.0f}원" ) while True: try: now = dt.now() current_date = now.strftime("%Y-%m-%d") # 날짜 변경 시 리포트 플래그 리셋 if current_date != self.today_date: self.today_date = current_date self.morning_report_sent = False self.closing_report_sent = False self.final_report_sent = False self.ai_report_sent = False self.start_day_asset = 0 logger.info(f"📅 날짜 변경: {current_date}") # 리포트 전송은 백그라운드 태스크에서 처리하므로 여기서는 제거 # (기존 코드와의 호환성을 위해 주석 처리) # if now.hour == 13 and now.minute == 0: # self.send_morning_report() # self.send_ai_report() # elif now.hour == 15 and now.minute == 15: # self.send_closing_report() # elif now.hour == 15 and now.minute == 35: # self.send_final_report() # 장 상태 체크 if not self.check_market_status(): logger.info("⏸ 장 시간 아님 - 대기 중...") time.sleep(60) continue # 자산 정보 업데이트는 백그라운드 태스크에서 처리하므로 여기서는 제거 # (기존 코드와의 호환성을 위해 주석 처리) # if now.minute % 30 == 0: # 30분마다 # self._update_assets() # 매도 신호 체크 (우선) - 메인 루프에서 처리 sell_signals = self.check_sell_signals() for signal in sell_signals: self.execute_sell(signal) time.sleep(random.uniform(1, 2)) # 늘림목 기회 체크 for code, holding in list(self.holdings.items()): dca_signal = self.check_dca_opportunity(code, holding) if dca_signal: self.execute_buy(dca_signal, is_dca=True) time.sleep(random.uniform(1, 2)) # 신규 매수 기회 스캔 (관심 종목 리스트 기준) active_count = len(self.holdings) if len(self.watchlist) > 0: logger.info(f"🔍 [매수 기회 탐색] 타겟:{len(self.watchlist)}개 | 보유:{active_count}/{self.max_stocks}") for item in self.watchlist: code = item.get("code", "").strip() name = item.get("name", code) if not code or code in self.holdings: continue # 밸류에이션 분석 analysis = self.analyze_stock_value(code, name) if not analysis or not analysis["is_buyable"]: continue # 초기 매수 실행 signal = { "code": code, "name": name, "price": analysis["current_price"], "qty": int(100000 / analysis["current_price"]), # 10만원 분할 } self.execute_buy(signal, is_dca=False) time.sleep(random.uniform(2, 4)) break # 한 번에 하나씩만 # 대기 (늘림목은 변동성 적음 → 5~10분 간격, env: LOOP_INTERVAL_MIN_LOW / LOOP_INTERVAL_MIN_HIGH) wait_sec = random.uniform( max(60, self.loop_interval_min_low * 60), max(60, self.loop_interval_min_high * 60), ) logger.info(f"⏳ 다음 체크까지 {wait_sec/60:.1f}분 대기 (로그 멈춤 아님)") time.sleep(wait_sec) except KeyboardInterrupt: logger.info("⏹ 봇 종료") # 백그라운드 태스크 취소 if self._report_task: self._report_task.cancel() if self._asset_task: self._asset_task.cancel() break except Exception as e: logger.error(f"❌ 루프 에러: {e}") import traceback logger.error(traceback.format_exc()) time.sleep(60) if __name__ == "__main__": bot = LongTradingBot() bot.run()