""" KIS Long Watch Bot Ver2 - 위시리스트/늘림목 관찰 전용 리포트 봇 - 한국투자증권(KIS) Open API 사용 (kis_long_ver1의 KISClientWithOrder 재사용) - 매매는 하지 않고, 보유/관심 종목을 모니터링해서 리포트와 뉴스 기반 알림만 전송 - linux_bot 형식 장기 투자 체크 리포트 (💰📊📈💼🎯🤖 + AI 4줄) 장 시작/마감 전송 - 수치는 전부 DB/env에서 로드 (하드코딩 금지) """ import asyncio import logging import random import time from datetime import datetime as dt, timedelta from pathlib import Path from typing import Dict, List, Optional import pandas as pd from database import TradeDB from news_analyzer import NewsAnalyzer from kis_long_ver1 import ( KISClientWithOrder, MattermostBot, get_env_float, get_env_int, get_env_bool, get_env_from_db, get_kis_daily_chart, get_kis_per_eps_peg, calculate_volatility, MM_CHANNEL_LONG, MM_CHANNEL_DEFAULT, SCRIPT_DIR, db as shared_db, ) # Gemini AI (4줄 인사이트 생성용) try: from kis_long_ver1 import gemini_model except Exception: gemini_model = None # 로깅 설정 logging.basicConfig( format="[%(asctime)s] %(message)s", datefmt="%H:%M:%S", level=logging.INFO, ) logger = logging.getLogger("KISLongBotV2") class LongWatchBotV2: """ 위시리스트/보유 종목을 기반으로 장기 관찰 리포트와 뉴스 알림을 보내는 봇. - KIS API로 현재가/일봉 차트 조회 (6개월 등, env로 조절) - long_term_watchlist.json + LONG 전략 보유 종목 기준 - 매수/매도 주문은 하지 않음 (완전 관찰 전용) """ def __init__(self): # DB는 기존 공유 인스턴스를 재사용 (kis_long_ver1과 동일) self.db: TradeDB = shared_db # KIS 읽기 전용 클라이언트 (주문 메서드는 사용하지 않음) self.client = KISClientWithOrder() # Mattermost self.mm = MattermostBot() self.mm_channel = MM_CHANNEL_LONG or MM_CHANNEL_DEFAULT # 위시리스트 파일 self.watchlist_path: Path = SCRIPT_DIR / "long_term_watchlist.json" self.watchlist: List[Dict] = self._load_watchlist() # DB에 이미 LONG 보유 중인 종목 (늘림목 포지션) self.holdings: Dict[str, Dict] = {} active_trades = self.db.get_active_trades(strategy_prefix="LONG") for code, trade in active_trades.items(): qty = trade.get("current_qty", 0) or 0 if qty <= 0: continue self.holdings[code] = { "code": code, "name": trade.get("name", code), "avg_price": trade.get("avg_buy_price", 0), "qty": qty, "first_buy_date": trade.get("buy_date", ""), } # 자산·리포트용 상태 self.current_cash: float = 0 self.current_total_asset: float = 0 self.start_day_asset: float = 0 self.total_deposit: float = get_env_float("TOTAL_DEPOSIT", 0) self.today_date: str = dt.now().strftime("%Y-%m-%d") # 뉴스 분석기 (ANTHROPIC_API_KEY 필요, 없으면 내부에서 비활성) self.news_analyzer = NewsAnalyzer() # 리포트/뉴스 스케줄 설정 (전부 env/DB에서 로드) self.daily_lookback_days: int = get_env_int("LONG_DAILY_LOOKBACK_DAYS", 180) self.ma_short_days: int = get_env_int("LONG_MA_SHORT_DAYS", 5) self.ma_long_days: int = get_env_int("LONG_MA_LONG_DAYS", 20) self.report_am_hour: int = get_env_int("LONG_REPORT_AM_HOUR", 9) self.report_am_min: int = get_env_int("LONG_REPORT_AM_MIN", 5) self.report_pm_hour: int = get_env_int("LONG_REPORT_PM_HOUR", 15) self.report_pm_min: int = get_env_int("LONG_REPORT_PM_MIN", 35) self.news_enabled: bool = get_env_bool("LONG_NEWS_ENABLED", True) self.news_interval_min: int = get_env_int("LONG_NEWS_INTERVAL_MIN", 60) self.news_active_start_hour: int = get_env_int("LONG_NEWS_ACTIVE_START_HOUR", 9) self.news_active_end_hour: int = get_env_int("LONG_NEWS_ACTIVE_END_HOUR", 23) self.news_max_count: int = get_env_int("NEWS_MAX_COUNT", 5) # 종목별 분석 사이 딜레이 (초당 호출 분산용, 기본 1~2초) self.analysis_delay_min: float = get_env_float("LONG_ANALYSIS_DELAY_MIN_SEC", 1.0) self.analysis_delay_max: float = get_env_float("LONG_ANALYSIS_DELAY_MAX_SEC", 2.0) if self.analysis_delay_min < 0: self.analysis_delay_min = 0.0 if self.analysis_delay_max < self.analysis_delay_min: self.analysis_delay_max = self.analysis_delay_min # 플래그 self.morning_report_sent: bool = False self.closing_report_sent: bool = False # ------------------------------------------------------------------ # 기본 데이터 로드/갱신 # ------------------------------------------------------------------ def _load_watchlist(self) -> List[Dict]: """long_term_watchlist.json에서 관심 종목 리스트 로드.""" if not self.watchlist_path.exists(): logger.warning(f"관심 종목 파일 없음: {self.watchlist_path}") return [] try: with open(self.watchlist_path, "r", encoding="utf-8") as f: data = f.read().strip() if not data: return [] try: obj = __import__("json").loads(data) except Exception as e: logger.error(f"관심 종목 JSON 파싱 실패: {e}") return [] items = obj.get("items", []) if isinstance(obj, dict) else [] codes = [it.get("code") for it in items] logger.info(f"📂 위시리스트 로드 완료: {len(items)}개 (codes={','.join(filter(None, codes))})") return items except Exception as e: logger.error(f"관심 종목 로드 실패: {e}") return [] def _update_assets(self) -> None: """계좌 평가 정보 갱신 (예수금 + LONG 보유 종목 시가 평가).""" try: balance = self.client.get_account_evaluation() if not balance: logger.warning("💵 [롱봇V2] 계좌 평가 응답 없음 → 자산 갱신 스킵") return # 예수금 (output2) out2 = balance.get("output2") if isinstance(out2, list) and out2: out2 = out2[0] out2 = out2 if isinstance(out2, dict) else {} ord_psbl = out2.get("ord_psbl_cash") or out2.get("dnca_tot_amt") self.current_cash = float(str(ord_psbl or 0).replace(",", "").strip()) if ord_psbl is not None else 0 # 보유 종목 평가액 (output1) output1_list = balance.get("output1") or [] if isinstance(output1_list, dict): output1_list = [output1_list] holdings_value = 0.0 for code, h in self.holdings.items(): evlu = None for row in output1_list: if (row.get("pdno") or "").strip() == code: evlu = float(row.get("evlu_amt", 0) or 0) break if evlu is not None: holdings_value += evlu else: # 계좌 평가 응답에 없으면 현재가로 대략 평가 try: price_data = self.client.inquire_price(code) if price_data: p = abs(float(price_data.get("stck_prpr", 0) or 0)) holdings_value += p * (h.get("qty") or h.get("total_qty") or 0) except Exception as e: logger.debug(f"보유평가 현재가 조회 실패({code}): {e}") self.current_total_asset = self.current_cash + holdings_value except Exception as e: logger.error(f"자산 정보 업데이트 실패: {e}") # ------------------------------------------------------------------ # 개별 종목 목록 헬퍼 # ------------------------------------------------------------------ def _get_watch_items(self) -> List[Dict]: """보유(LONG) + 위시리스트를 합친 유니크 종목 목록.""" items: Dict[str, Dict] = {} for code, h in self.holdings.items(): items[code] = { "code": code, "name": h.get("name", code), "avg_price": h.get("avg_price", 0), "qty": h.get("qty") or h.get("total_qty") or 0, "first_buy_date": h.get("first_buy_date", ""), "source": "HOLDING", } for it in self.watchlist: code = (it.get("code") or "").strip() if not code: continue if code not in items: items[code] = { "code": code, "name": it.get("name", code), "avg_price": float(it.get("avg_price", 0) or 0), "qty": int(it.get("qty", 0) or 0), "first_buy_date": it.get("first_buy", ""), "source": "WATCHLIST", } else: items[code]["source"] = "HOLDING+WATCH" return list(items.values()) # ------------------------------------------------------------------ # 기존 일봉 분석 (MA·6M 기준 요약용 - 뉴스 필터에서 계속 사용) # ------------------------------------------------------------------ def _fetch_daily_analysis(self, code: str) -> Optional[Dict]: """KIS 일봉 기반 MA·6M 고저 위치 계산 (기존 요약 리포트용).""" try: max_days = int(self.daily_lookback_days) if self.daily_lookback_days > 0 else 180 except Exception: max_days = 180 try: df = get_kis_daily_chart(self.client, code, max_days=max_days) except Exception as e: logger.error(f"일봉 차트 조회 실패({code}): {e}") return None if df is None or df.empty: logger.warning(f"일봉 데이터 없음({code})") return None try: if self.ma_short_days > 0: df["MA_SHORT"] = df["close"].rolling(window=self.ma_short_days).mean() else: df["MA_SHORT"] = pd.NA if self.ma_long_days > 0: df["MA_LONG"] = df["close"].rolling(window=self.ma_long_days).mean() else: df["MA_LONG"] = pd.NA except Exception as e: logger.debug(f"MA 계산 실패({code}): {e}") last = df.iloc[-1] close = float(last["close"]) ma_short = float(last.get("MA_SHORT") or 0) ma_long = float(last.get("MA_LONG") or 0) ma20 = float(last.get("MA20") or 0) hi_6m = float(df["high"].tail(max_days).max()) lo_6m = float(df["low"].tail(max_days).min()) def _pct(a: float, b: float) -> float: if b == 0: return 0.0 return (a / b - 1.0) * 100.0 return { "last_close": close, "ma_short": ma_short, "ma_long": ma_long, "ma20": ma20, "pct_vs_ma_short": _pct(close, ma_short) if ma_short > 0 else 0.0, "pct_vs_ma_long": _pct(close, ma_long) if ma_long > 0 else 0.0, "pct_vs_ma20": _pct(close, ma20) if ma20 > 0 else 0.0, "hi_6m": hi_6m, "lo_6m": lo_6m, "pct_from_6m_hi": _pct(close, hi_6m) if hi_6m > 0 else 0.0, "pct_from_6m_lo": _pct(close, lo_6m) if lo_6m > 0 else 0.0, } # ------------------------------------------------------------------ # linux_bot 형식 장기 투자 체크 리포트용 분석 메서드들 # ------------------------------------------------------------------ def _is_domestic_code(self, code: str) -> bool: """국내 6자리 종목코드 여부.""" c = (code or "").strip() return len(c) == 6 and c.isdigit() def _compute_rsi(self, df: pd.DataFrame, period: int = 14) -> Optional[float]: """일봉 close 기준 RSI 계산 (RSI = 과매수/과매도 지표, 0~100).""" if df is None or df.empty or len(df) < period + 1: return None try: delta = df["close"].diff(1) gain = delta.where(delta > 0, 0.0).rolling(window=period).mean() loss = (-delta.where(delta < 0, 0.0)).rolling(window=period).mean() rs = gain / loss.replace(0, float("nan")) rsi = 100 - (100 / (1 + rs)) val = float(rsi.iloc[-1]) return val if pd.notna(val) else None except Exception as e: logger.debug(f"RSI 계산 실패: {e}") return None def _get_trade_history_summary(self, code: str, max_trades: int = 10) -> Optional[Dict]: """trade_history에서 해당 종목 최근 N건 승률·평균 수익률·주요 매도사유 요약.""" try: cursor = self.db.conn.execute( """ SELECT profit_rate, realized_pnl, sell_reason, hold_minutes FROM trade_history WHERE code = ? ORDER BY sell_date DESC LIMIT ? """, (code, max_trades), ) rows = [dict(zip([c[0] for c in cursor.description], r)) for r in cursor.fetchall()] except Exception as e: logger.debug(f"trade_history 조회 실패({code}): {e}") return None if not rows: return None total = len(rows) profits = [float(r.get("profit_rate") or 0) for r in rows] wins = len([p for p in profits if p > 0]) avg_profit = sum(profits) / total if total else 0.0 reason_count: Dict[str, int] = {} for r in rows: reason = (r.get("sell_reason") or "").strip() or "미기록" reason_count[reason] = reason_count.get(reason, 0) + 1 top_reasons = sorted(reason_count.items(), key=lambda x: x[1], reverse=True)[:3] return { "total": total, "wins": wins, "losses": total - wins, "avg_profit": avg_profit, "top_reasons": top_reasons, } def _full_stock_analysis(self, code: str, item: Dict) -> Optional[Dict]: """ 국내 종목 전체 분석: 가격·수익률·외인/기관·차트(RSI/변동성/정배열)·밸류(PER/PEG)·종합 판정 점수 """ name = item.get("name", code) avg_price = float(item.get("avg_price") or 0) qty = int(item.get("qty") or 0) is_holding = qty > 0 and avg_price > 0 # 현재가 조회 try: price_data = self.client.inquire_price(code) except Exception as e: logger.warning(f"현재가 조회 실패({code}): {e}") return {"code": code, "name": name, "error": "현재가 조회 실패"} if not price_data: return {"code": code, "name": name, "error": "현재가 조회 실패"} try: current_price = abs(float(price_data.get("stck_prpr") or 0)) prdy_ctrt_raw = (price_data.get("prdy_ctrt") or "0").strip().replace("+", "").replace("%", "") try: day_change_pct = float(prdy_ctrt_raw) except Exception: day_change_pct = 0.0 except Exception as e: logger.warning(f"가격 파싱 실패({code}): {e}") return {"code": code, "name": name, "error": "가격 파싱 실패"} if current_price <= 0: return {"code": code, "name": name, "error": "현재가 없음"} # 수익률 (보유 종목만) profit_val = (current_price - avg_price) * qty if is_holding else 0.0 profit_pct = (current_price - avg_price) / avg_price * 100 if is_holding else 0.0 # 일봉 차트 df = pd.DataFrame() try: df = get_kis_daily_chart(self.client, code, max_days=max(65, self.daily_lookback_days)) except Exception as e: logger.debug(f"일봉 조회 실패({code}): {e}") # RSI (과매수: 70+, 과매도: 30-) rsi = self._compute_rsi(df) if not df.empty else None # 변동성 (20일 수익률 표준편차 %) volatility = calculate_volatility(df) if not df.empty else None # 정배열/역배열 (MA20 > MA60 이면 정배열) ma20 = float(df["MA20"].iloc[-1]) if not df.empty and "MA20" in df.columns and pd.notna(df["MA20"].iloc[-1]) else None ma60 = float(df["MA60"].iloc[-1]) if not df.empty and "MA60" in df.columns and pd.notna(df["MA60"].iloc[-1]) else None if ma20 is not None and ma60 is not None: ma_arrangement = "정배열" if ma20 > ma60 else "역배열" else: ma_arrangement = "데이터없음" # PER / PEG (재무비율 API) per, peg = None, None try: fund = get_kis_per_eps_peg(self.client, code, current_price) if fund: per = fund.get("per") peg = fund.get("peg") time.sleep(random.uniform(0.3, 0.8)) except Exception as e: logger.debug(f"PER/PEG 조회 실패({code}): {e}") # 외인/기관 동향 (5일 순매수 합산) investor_text = "데이터 없음" investor_score = 0 try: trend = self.client.get_investor_trend(code, days=5) if trend is not None: total_net = trend.get("total_net_buy", 0) or 0 inv_strong_buy = get_env_float("LONG_INV_STRONG_BUY", 20000.0) inv_buy = get_env_float("LONG_INV_BUY", 5000.0) inv_strong_sell = get_env_float("LONG_INV_STRONG_SELL", -20000.0) inv_sell = get_env_float("LONG_INV_SELL", -5000.0) if total_net > inv_strong_buy: investor_text = "외인·기관 강한 매수세" investor_score = 25 elif total_net > inv_buy: investor_text = "외인·기관 소폭 매수" investor_score = 15 elif total_net < inv_strong_sell: investor_text = "외인·기관 강한 매도세" investor_score = -25 elif total_net < inv_sell: investor_text = "외인·기관 소폭 매도" investor_score = -15 else: investor_text = "외인·기관 중립" else: # 응답이 None이면 주말/공휴일 등 데이터 미존재 investor_text = "주말/공휴일 (거래 없음)" except Exception as e: logger.debug(f"외인/기관 조회 실패({code}): {e}") # 종합 판정 점수 (0~100) score = 50 # 기본값 per_high = get_env_float("PER_HIGH_THRESHOLD", 30.0) per_fair = get_env_float("PER_FAIR_THRESHOLD", 15.0) per_low = get_env_float("PER_LOW_THRESHOLD", 10.0) peg_great = get_env_float("PEG_GREAT_THRESHOLD", 1.0) peg_good = get_env_float("PEG_GOOD_THRESHOLD", 1.5) peg_bad = get_env_float("PEG_BAD_THRESHOLD", 2.0) rsi_oversold = get_env_float("RSI_OVERSOLD_THRESHOLD", 30.0) rsi_weak = get_env_float("RSI_WEAK_THRESHOLD", 40.0) rsi_overbought = get_env_float("RSI_OVERBOUGHT_THRESHOLD", 70.0) rsi_comment = "적정" if per is not None: if per < per_low: score += 30 elif per < per_fair: score += 20 elif per < per_high: score += 10 elif per >= per_high: score -= 20 if peg is not None: if peg < peg_great: score += 30 elif peg < peg_good: score += 15 elif peg > peg_bad: score -= 20 if rsi is not None: if rsi < rsi_oversold: score += 20 rsi_comment = "과매도" elif rsi < rsi_weak: score += 10 rsi_comment = "약세" elif rsi >= rsi_overbought: score -= 20 rsi_comment = "과매수" elif rsi > 60.0: rsi_comment = "과매수 근접" if ma20 is not None and ma60 is not None and ma20 > ma60: score += 10 score += investor_score score = max(0, min(100, int(score))) # 판정 라벨 및 한 줄 설명 (기준값 전부 env) v_full_score = get_env_float("LONG_VERDICT_FULL_SCORE", 75.0) v_full_profit = get_env_float("LONG_VERDICT_FULL_PROFIT_PCT", 50.0) v_partial_score = get_env_float("LONG_VERDICT_PARTIAL_SCORE", 60.0) v_partial_profit = get_env_float("LONG_VERDICT_PARTIAL_PROFIT_PCT", 20.0) v_stop_score = get_env_float("LONG_VERDICT_STOP_SCORE", 55.0) v_stop_loss = get_env_float("LONG_VERDICT_STOP_LOSS_PCT", -10.0) v_stop_mild = get_env_float("LONG_VERDICT_STOP_LOSS_MILD_PCT", -5.0) v_earn_profit = get_env_float("LONG_VERDICT_EARN_PROFIT_MIN", 30.0) v_earn_rsi = get_env_float("LONG_VERDICT_EARN_RSI_MIN", 65.0) if score >= v_full_score and profit_pct > v_full_profit: verdict_label = "🎯 전량 익절" verdict_reason = "고점 도달" elif score >= v_partial_score + 10 and profit_pct > v_partial_profit: verdict_label = "분할 익절" verdict_reason = "수익 일부 실현" elif score >= v_partial_score: verdict_label = "분할 익절" verdict_reason = "수익 일부 실현" elif score >= v_stop_score and profit_pct < v_stop_loss: verdict_label = "⚠️ 손절 고려 (확률 55%)" verdict_reason = "하락세 + 점수 낮음. 손절 후 바닥에서 재진입 검토" elif score <= v_stop_score and profit_pct < v_stop_mild: verdict_label = "🚨 손절 검토 (확률 65%)" verdict_reason = "손실 크고 하락 추세. 추가 손실 가능" elif profit_pct > v_earn_profit and rsi is not None and rsi > v_earn_rsi: verdict_label = "🎯 익절 권장" verdict_reason = "큰 수익 + 하락 전환" else: verdict_label = "관망" verdict_reason = "추가 관찰" return { "code": code, "name": name, "current_price": current_price, "day_change_pct": day_change_pct, "avg_price": avg_price, "qty": qty, "profit_val": profit_val, "profit_pct": profit_pct, "per": per, "peg": peg, "rsi": rsi, "rsi_comment": rsi_comment, "volatility": volatility, "ma_arrangement": ma_arrangement, "investor_text": investor_text, "score": score, "verdict_label": verdict_label, "verdict_reason": verdict_reason, } def _minimal_overseas_analysis(self, code: str, item: Dict) -> Optional[Dict]: """해외 종목: 현재가만 조회 후 최소 분석 (PER/RSI/외인 데이터 없음).""" name = item.get("name", code) avg_price = float(item.get("avg_price") or 0) qty = int(item.get("qty") or 0) is_holding = qty > 0 and avg_price > 0 try: price_data = self.client.inquire_price(code) except Exception as e: logger.warning(f"해외 현재가 조회 실패({code}): {e}") return {"code": code, "name": name, "error": "현재가 조회 실패"} if not price_data: return {"code": code, "name": name, "error": "현재가 조회 실패"} try: current_price = abs(float(price_data.get("stck_prpr") or 0)) prdy_ctrt_raw = (price_data.get("prdy_ctrt") or "0").strip().replace("+", "").replace("%", "") try: day_change_pct = float(prdy_ctrt_raw) except Exception: day_change_pct = 0.0 except Exception as e: return {"code": code, "name": name, "error": "가격 파싱 실패"} if current_price <= 0: return {"code": code, "name": name, "error": "현재가 없음"} profit_val = (current_price - avg_price) * qty if is_holding else 0.0 profit_pct = (current_price - avg_price) / avg_price * 100 if is_holding else 0.0 if profit_pct > 50: verdict_label, verdict_reason = "🎯 전량 익절", "고점 도달" elif profit_pct > 20: verdict_label, verdict_reason = "분할 익절", "수익 일부 실현" elif profit_pct < -20: verdict_label, verdict_reason = "🚨 손절 검토 (확률 65%)", "손실 크고 하락 추세. 추가 손실 가능" elif profit_pct < -10: verdict_label, verdict_reason = "⚠️ 손절 고려 (확률 55%)", "하락세 + 점수 낮음. 손절 후 바닥에서 재진입 검토" elif profit_pct > 30: verdict_label, verdict_reason = "🎯 익절 권장", "큰 수익 + 하락 전환" else: verdict_label, verdict_reason = "관망", "추가 관찰" return { "code": code, "name": name, "current_price": current_price, "day_change_pct": day_change_pct, "avg_price": avg_price, "qty": qty, "profit_val": profit_val, "profit_pct": profit_pct, "per": None, "peg": None, "rsi": None, "volatility": None, "ma_arrangement": "데이터없음", "investor_text": "데이터 없음 (해외종목 또는 미제공)", "score": 50, "verdict_label": verdict_label, "verdict_reason": verdict_reason, } def _format_one_stock_block( self, item: Dict, analysis: Dict, ai_text: str, currency: str = "KRW", ) -> str: """ 한 종목의 linux_bot 형식 블록 생성. 💰 수익률 → 📊 외인/기관 → 📈 차트 → 💼 밸류 → 🎯 판정 → 🤖 AI 분석 """ name = analysis.get("name", item.get("name", "")) code = analysis.get("code", item.get("code", "")) if analysis.get("error"): return f"{name} ({code}): {analysis['error']}\n" is_krw = currency == "KRW" unit = "원" if is_krw else " USD" profit_val = analysis.get("profit_val", 0) profit_pct = analysis.get("profit_pct", 0) qty = analysis.get("qty", 0) avg_price = analysis.get("avg_price", 0) current_price = analysis.get("current_price", 0) day_change_pct = analysis.get("day_change_pct", 0) # 💰 수익률 (보유 시) or 현재가 (관심만) if qty > 0 and avg_price > 0: if is_krw: line_profit = ( f"💰 수익률: {profit_val:+,.0f}{unit} | " f"보유 {profit_pct:+.1f}% ({profit_val:+,.0f}{unit}) | " f"전일대비 {day_change_pct:+.2f}%" ) else: line_profit = ( f"💰 수익률: {profit_val:+.0f}{unit} | " f"보유 {profit_pct:+.1f}% ({profit_val:+.0f}{unit}) | " f"전일대비 {day_change_pct:+.2f}%" ) else: if is_krw: line_profit = f"💰 현재가: {current_price:,.0f}{unit} | 전일대비 {day_change_pct:+.2f}%" else: line_profit = f"💰 현재가: {current_price:.2f}{unit} | 전일대비 {day_change_pct:+.2f}%" # 📊 외인/기관 line_inv = f"📊 외인/기관: {analysis.get('investor_text', '데이터 없음')}" # 📈 차트: RSI | 변동성 | 정배열/역배열 rsi = analysis.get("rsi") rsi_comment = analysis.get("rsi_comment", "적정") vol = analysis.get("volatility") arr = analysis.get("ma_arrangement", "") if rsi is not None: rsi_str = f"RSI {rsi:.1f} ({rsi_comment})" else: rsi_str = "RSI -" vol_str = f"{vol:.2f}%" if vol is not None else "-" line_chart = f"📈 차트: {rsi_str} | 변동성 {vol_str} | {arr}" # 💼 밸류: PER / PEG (없으면 줄 생략) per = analysis.get("per") peg = analysis.get("peg") per_high = get_env_float("PER_HIGH_THRESHOLD", 30.0) per_fair = get_env_float("PER_FAIR_THRESHOLD", 15.0) peg_great = get_env_float("PEG_GREAT_THRESHOLD", 1.0) line_value = None if per is not None: per_comment = "높음" if per > per_high else ("적정" if per > per_fair else "저평가") line_value = f"💼 밸류: PER {per:.1f} ({per_comment})" if peg is not None: peg_comment = "저평가" if peg < peg_great else "적정" line_value += f" | PEG {peg:.2f} ({peg_comment})" # 🎯 판정 score = analysis.get("score", 0) vlabel = analysis.get("verdict_label", "") vreason = analysis.get("verdict_reason", "") line_verdict = f"🎯 판정: {score}점 → {vlabel}\n └ {vreason}" # 🤖 AI 분석 line_ai = "🤖 AI 분석\n" + (ai_text.strip() if ai_text else " (분석 없음)") parts = [f"**{name} ({code})**", line_profit, "", line_inv, ""] parts.append(line_chart) if line_value: parts.append(line_value) parts.extend([line_verdict, "", line_ai, ""]) return "\n".join(parts) def _get_ai_4lines( self, name: str, code: str, analysis: Dict, trade_summary: Optional[Dict], news_summary: str, ) -> str: """ Gemini로 종목별 장기 투자 관점 4줄 생성: 📌 수치/흐름 / 📰 뉴스 관점 / 💡 판단 / ⚠️ 리스크 """ if not gemini_model: return " (Gemini 미설정)" try: rsi = analysis.get("rsi") vol = analysis.get("volatility") arr = analysis.get("ma_arrangement", "") per = analysis.get("per") peg = analysis.get("peg") score = analysis.get("score") verdict = analysis.get("verdict_label", "") profit_pct = analysis.get("profit_pct", 0) investor = analysis.get("investor_text", "") trade_str = "" if trade_summary: t = trade_summary trade_str = ( f"최근 매매: {t['total']}건, 승률 {t['wins']}/{t['total']}, " f"평균 수익률 {t['avg_profit']:.1f}%, 주요 사유 {[r[0] for r in t['top_reasons']]}." ) prompt = f"""다음 종목에 대해 장기 투자 관점으로 **4줄**만 작성하세요. 각 줄은 반드시 지정 접두사로 시작하세요. 종목: {name} ({code}) - RSI: {rsi} | 변동성: {vol}% | 추세: {arr} - PER: {per} | PEG: {peg} | 판정: {score}점 → {verdict} - 보유 수익률: {profit_pct:.1f}% | 외인/기관: {investor} {trade_str} 뉴스/시장 요약: {news_summary[:500] if news_summary else '없음'} 정확히 4줄, 아래 형식으로만 작성(한글): 📌 수치/흐름: (RSI·변동성·추세 한 문장) 📰 뉴스 관점: (호재/악재·수급 한 문장) 💡 판단: (매수/관망/익절/손절 중 하나 + 이유 한 문장) ⚠️ 리스크: (주요 리스크 한 문장) """ response = gemini_model.generate_content(prompt) if response and response.text: return " " + response.text.strip().replace("\n", "\n ") except Exception as e: logger.warning(f"AI 4줄 분석 실패({code}): {e}") return " (AI 분석 생성 실패)" # ------------------------------------------------------------------ # linux_bot 형식 장기 투자 체크 리포트 (핵심 메서드) # ------------------------------------------------------------------ def send_long_check_report(self, when_label: str = "정기") -> None: """ linux_bot 형식으로 전 종목 장기 투자 체크 리포트 생성 후 MM 전송. 종목당 블록: 💰 수익률 → 📊 외인/기관 → 📈 차트 → 💼 밸류 → 🎯 판정 → 🤖 AI 4줄 """ items = self._get_watch_items() max_ai = get_env_int("LONG_AI_MAX_STOCKS", 20) ai_recent_trades = get_env_int("LONG_AI_RECENT_TRADES", 10) # 뉴스 요약 수집 (AI 프롬프트에 시장 맥락으로 활용) news_summary = "" try: if self.news_analyzer and getattr(self.news_analyzer, "client", None): news_list = self.news_analyzer.crawl_naver_finance_news(max_news=3) if news_list: analysis = self.news_analyzer.analyze_news_with_claude(news_list) if analysis and isinstance(analysis, dict): raw = analysis.get("summary") or analysis.get("market_outlook") or "" news_summary = (raw[:800] if isinstance(raw, str) else str(raw)[:800]) except Exception as e: logger.debug(f"뉴스 요약 수집 실패(무시): {e}") blocks: List[str] = [] for i, item in enumerate(items): code = (item.get("code") or "").strip() name = item.get("name", code) if not code: continue # 국내 vs 해외 분기 if self._is_domestic_code(code): analysis = self._full_stock_analysis(code, item) currency = "KRW" else: analysis = self._minimal_overseas_analysis(code, item) currency = "USD" if analysis is None: blocks.append(f"{name} ({code}): 분석 실패\n") continue if analysis.get("error"): blocks.append(f"{name} ({code}): {analysis['error']}\n") continue # AI 4줄 분석 (상위 max_ai 종목만, Gemini 있을 때) trade_summary = self._get_trade_history_summary(code, max_trades=ai_recent_trades) if i < max_ai and gemini_model: ai_text = self._get_ai_4lines(name, code, analysis, trade_summary, news_summary) else: ai_text = " (AI 분석 생략)" block = self._format_one_stock_block(item, analysis, ai_text, currency=currency) blocks.append(block) # API Rate Limit 방지용 딜레이 (env 기반) try: if self.analysis_delay_max > 0: time.sleep(random.uniform(self.analysis_delay_min, self.analysis_delay_max)) except Exception: pass title = f"📊 장기 투자 체크 (한투 KIS API + AI 분석)" separator = "━" * 30 msg = f"{title}\n{separator}\n\n" + f"\n{separator}\n\n".join(b.strip() for b in blocks if b.strip()) try: self.mm.send(self.mm_channel, msg) logger.info(f"📊 롱봇V2 장기 투자 체크 리포트 전송 완료 ({len(blocks)}개 종목)") except Exception as e: logger.error(f"장기 투자 체크 리포트 전송 실패: {e}") # ------------------------------------------------------------------ # 뉴스 분석/알림 # ------------------------------------------------------------------ def _should_run_news_now(self, now: Optional[dt] = None) -> bool: if not self.news_enabled: return False now = now or dt.now() h = now.hour # 활동 시간대만 (예: 9~23시) if self.news_active_start_hour <= self.news_active_end_hour: return self.news_active_start_hour <= h < self.news_active_end_hour # 야간 랩어라운드 형태 (예: 20~5시)도 지원 return h >= self.news_active_start_hour or h < self.news_active_end_hour def _collect_watchlist_names(self) -> Dict[str, str]: m: Dict[str, str] = {} for it in self._get_watch_items(): code = it["code"] name = it.get("name", code) if code: m[code] = name return m def run_news_cycle_once(self) -> None: """뉴스 한 사이클 (크롤링 → Claude 분석 → watchlist 관련 이슈만 필터링 → MM 전송).""" if not self._should_run_news_now(): return if not self.news_analyzer or not getattr(self.news_analyzer, "client", None): logger.debug("뉴스 분석 비활성 (API 키/클라이언트 없음)") return try: logger.info("📰 [롱봇V2] 뉴스 분석 사이클 시작") news_list = self.news_analyzer.crawl_naver_finance_news(max_news=self.news_max_count) if not news_list: logger.info("📰 [롱봇V2] 크롤링된 뉴스 없음") return analysis = self.news_analyzer.analyze_news_with_claude(news_list) if not analysis: logger.info("📰 [롱봇V2] 뉴스 AI 분석 결과 없음") return watch_map = self._collect_watchlist_names() watch_codes = set(watch_map.keys()) related: List[Dict] = [] for stock in analysis.get("recommended_stocks", []): code = (stock.get("code") or "").strip() if code and code in watch_codes: related.append( { "code": code, "name": watch_map.get(code, stock.get("name", "")), "reason": stock.get("reason", ""), } ) mm_msg = self.news_analyzer.format_analysis_for_mattermost(analysis, news_list) if not mm_msg: logger.info("📰 [롱봇V2] 포맷된 뉴스 메시지 없음") return if related: extra_lines = ["\n**🎯 위시리스트 관련 종목**"] for r in related: extra_lines.append(f"- `{r['code']}` {r['name']}: {r['reason']}") mm_msg += "\n" + "\n".join(extra_lines) else: mm_msg += "\n_현재 위시리스트와 직접 매칭된 종목은 없음 (전체 시장 참고용)_" self.mm.send(self.mm_channel, mm_msg) logger.info(f"📰 [롱봇V2] 뉴스 분석 알림 전송 완료 (관련 종목 {len(related)}개)") except Exception as e: logger.error(f"❌ 롱봇V2 뉴스 분석 사이클 실패: {e}") # ------------------------------------------------------------------ # 메인 루프 # ------------------------------------------------------------------ async def _report_scheduler_loop(self) -> None: """장 시작/마감 시각에 맞춰 하루 2회 장기 투자 체크 리포트 전송.""" logger.info( f"📅 롱봇V2 리포트 스케줄러 시작 (오전 {self.report_am_hour:02d}:{self.report_am_min:02d}, " f"마감 {self.report_pm_hour:02d}:{self.report_pm_min:02d})" ) # 시작 직후 잔고 API 연타 방지 (숏봇·다른 프로세스와 같은 앱키 공유 시 EGW00201 완화) await asyncio.sleep(5) last_date = "" while True: try: now = dt.now() today = now.strftime("%Y-%m-%d") if today != last_date: last_date = today self.morning_report_sent = False self.closing_report_sent = False self.today_date = today if ( not self.morning_report_sent and now.hour == self.report_am_hour and now.minute == self.report_am_min ): self.send_long_check_report("장 시작") self.morning_report_sent = True if ( not self.closing_report_sent and now.hour == self.report_pm_hour and now.minute == self.report_pm_min ): self.send_long_check_report("장 마감") self.closing_report_sent = True except Exception as e: logger.error(f"리포트 스케줄러 루프 에러: {e}") await asyncio.sleep(30) async def _news_loop(self) -> None: """뉴스 감시 루프 (기본 1시간 간격, env로 조절).""" if not self.news_enabled: logger.info("📰 롱봇V2 뉴스 감시 비활성화 (LONG_NEWS_ENABLED=false)") return interval = self.news_interval_min if self.news_interval_min > 0 else 60 logger.info(f"📰 롱봇V2 뉴스 감시 시작 (interval={interval}분)") while True: try: if self._should_run_news_now(): self.run_news_cycle_once() else: logger.debug("📰 롱봇V2 뉴스 감시: 비활성 시간대 (sleep)") except Exception as e: logger.error(f"뉴스 감시 루프 에러: {e}") await asyncio.sleep(interval * 60) def run(self) -> None: """동기 진입점: asyncio 루프에서 리포트/뉴스 태스크를 함께 실행.""" async def _main(): tasks = [ asyncio.create_task(self._report_scheduler_loop()), asyncio.create_task(self._news_loop()), ] await asyncio.gather(*tasks) try: asyncio.run(_main()) except KeyboardInterrupt: logger.info("💤 롱봇V2 종료 (KeyboardInterrupt)") except Exception as e: logger.error(f"💥 롱봇V2 메인 루프 에러: {e}") def main(): logger.info("🚀 KIS Long Watch Bot Ver2 시작") bot = LongWatchBotV2() bot.run() if __name__ == "__main__": main()