""" mm_butler.py — 매터모스트 명령 허브 (상주 데몬) ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 역할: 매터모스트 채널에서 !명령어를 폴링하여 즉시 실행. kis_short_ver2 / kis_long_ver2 봇과 완전히 독립적으로 상주. 지원 명령어 (기본): !도움말 — 전체 명령어 목록 !클로드분석 — Claude AI 로 단타봇 거래 분석 + 수치 추천 (소스코드 포함) !애미분석 — Gemini AI 로 단타봇 거래 분석 + 수치 추천 (소스코드 포함) !오픈분석 — OpenRouter 경유 모델로 단타봇 거래 분석 + 수치 추천 !뉴스 — 네이버 금융 뉴스 AI 분석 (기본: Gemini, 위시리스트 관련 필터링 포함) !클로드뉴스 — Claude 로 뉴스 AI 분석 !오픈뉴스 — OpenRouter 로 뉴스 AI 분석 !적용 — 마지막 AI 추천 수치 전체 DB 반영 !설정 KEY=VALUE — 단일 설정값 DB 반영 !분석기록 [N] — 저장된 분석 기록 목록/상세 (나중에 '뭐라고 했지' 꺼내보기) 확장: 기능이 생길 때마다 _register_commands() 안에 핸들러 함수 하나 추가하면 됩니다. 다른 파일에서 임포트 후 register_command() 로도 동적 등록 가능합니다. 실행: python mm_butler.py # 포그라운드 또는 systemd 서비스로 등록 """ from __future__ import annotations import json import logging import os import re import subprocess import threading import time from pathlib import Path from typing import Callable, Dict, Optional, Tuple import requests # ------------------------------------------------------------------ # 내부 모듈 (kis_long_ver1 공용 함수·DB 재사용) # ------------------------------------------------------------------ from database import TradeDB, ENV_CONFIG_KEYS from kis_long_ver1 import ( get_env_from_db, get_env_int, get_env_float, get_env_bool, MM_SERVER_URL, MM_BOT_TOKEN, MM_CONFIG_FILE, SCRIPT_DIR, db as shared_db, ) from kis_long_ver2 import LongWatchBotV2 from news_analyzer import NewsAnalyzer # ------------------------------------------------------------------ # Claude 초기화 (클로드분석용) # ------------------------------------------------------------------ try: import anthropic _CLAUDE_AVAILABLE = True except ImportError: _CLAUDE_AVAILABLE = False CLAUDE_API_KEY = get_env_from_db("ANTHROPIC_API_KEY", "").strip() CLAUDE_MODEL_ID = get_env_from_db("CLAUDE_MODEL_ID", "claude-sonnet-4-5").strip() or "claude-sonnet-4-5" CLAUDE_MAX_TOKENS = get_env_int("CLAUDE_MAX_TOKENS", 8192) claude_client: Optional["anthropic.Anthropic"] = None if _CLAUDE_AVAILABLE and CLAUDE_API_KEY: try: claude_client = anthropic.Anthropic(api_key=CLAUDE_API_KEY) except Exception as _e: claude_client = None # ------------------------------------------------------------------ # Gemini 초기화 (애미분석용) google.genai 신규 SDK # ------------------------------------------------------------------ try: import google.genai as genai _GEMINI_AVAILABLE = True except ImportError: _GEMINI_AVAILABLE = False GEMINI_API_KEY = get_env_from_db("GEMINI_API_KEY", "").strip() GEMINI_MODEL_ID = get_env_from_db("GEMINI_MODEL_ID", "gemini-2.5-flash").strip() or "gemini-2.5-flash" gemini_client = None if _GEMINI_AVAILABLE and GEMINI_API_KEY: try: gemini_client = genai.Client(api_key=GEMINI_API_KEY) except Exception as _e: gemini_client = None # ------------------------------------------------------------------ # OpenRouter 초기화 (공용 분석/뉴스용) # ------------------------------------------------------------------ OPENROUTER_API_KEY = get_env_from_db("OPENROUTER_API_KEY", "").strip() OPENROUTER_MODEL_ID = get_env_from_db("OPENROUTER_MODEL_ID", "anthropic/claude-4.5-sonnet").strip() or "anthropic/claude-4.5-sonnet" # ------------------------------------------------------------------ # 로깅 # ------------------------------------------------------------------ logging.basicConfig( format="[%(asctime)s][%(name)s] %(message)s", datefmt="%H:%M:%S", level=logging.INFO, ) logger = logging.getLogger("MMButler") # ================================================================== # 유틸리티 함수 # ================================================================== def _get_env_numeric_snapshot(db: TradeDB) -> str: """DB 최신 env에서 계좌/키/토큰/URL 제외한 수치·설정만 반환 (KEY=값 줄 단위).""" EXCLUDE = { "MM_SERVER_URL", "MM_BOT_TOKEN_", "MATTERMOST_CHANNEL", "GEMINI_API_KEY", "GEMINI_MODEL_ID", "ANTHROPIC_API_KEY", "CLAUDE_API_KEY", "CLAUDE_MODEL_ID", "OPENROUTER_API_KEY", "OPENROUTER_MODEL_ID", "KIS_APP_KEY_REAL", "KIS_APP_SECRET_REAL", "KIS_APP_KEY_MOCK", "KIS_APP_SECRET_MOCK", "KIS_ACCOUNT_NO_REAL", "KIS_ACCOUNT_CODE_REAL", "KIS_ACCOUNT_NO_MOCK", "KIS_ACCOUNT_CODE_MOCK", "KIS_SHORT_MM_CHANNEL", "KIS_LONG_MM_CHANNEL", "MM_BUTLER_CHANNEL", } latest = db.get_latest_env() if not latest or not latest.get("snapshot"): return "" snap = latest["snapshot"] lines = [] for k, v in sorted(snap.items()): if k in EXCLUDE or v is None: continue v = str(v or "").strip() if "#" in v: v = v.split("#")[0].strip() if not v: continue lines.append(f"{k}={v}") return "\n".join(lines) def _get_journalctl_recent(lines: int = 500, unit: Optional[str] = None) -> str: """journalctl 최근 N줄. unit 있으면 -u unit 적용.""" cmd = ["journalctl", "-n", str(lines), "-o", "short-iso"] if unit: cmd = ["journalctl", "-u", unit, "-n", str(lines), "-o", "short-iso"] try: r = subprocess.run(cmd, capture_output=True, text=True, timeout=10) if r.returncode == 0 and r.stdout: return r.stdout.strip() except Exception as e: logger.warning("journalctl 조회 실패: %s", e) return "" def _save_ai_recommendations(db: TradeDB, analysis_text: str) -> None: """AI 분석문에서 'KEY=값' 추천 줄만 추출해 DB에 저장 (!적용 시 사용).""" if not analysis_text: return valid_keys = set(ENV_CONFIG_KEYS) lines = [] for line in analysis_text.splitlines(): line = line.strip() if not line or line.startswith("#"): continue m = re.match(r"^([A-Z][A-Z0-9_]*)=(.+)$", line) if m and m.group(1) in valid_keys: lines.append(f"{m.group(1)}={m.group(2).strip()}") if lines: db.set_last_ai_recommendations("\n".join(lines)) def _read_bot_source() -> str: """ kis_short_ver2.py 소스 전체를 읽어 반환. AI가 매매 로직을 이해하고 정확한 변수명·수치로 추천하기 위해 프롬프트에 첨부. AI_SOURCE_MAX_CHARS(env/DB) 로 최대 길이 제한 (기본 120,000자 ≈ Claude 30k 토큰). """ src_path = SCRIPT_DIR / "kis_short_ver2.py" if not src_path.exists(): return "(kis_short_ver2.py 파일 없음)" try: max_chars = get_env_int("AI_SOURCE_MAX_CHARS", 120000) content = src_path.read_text(encoding="utf-8") if len(content) > max_chars: content = content[:max_chars] + f"\n\n...(이하 {len(content) - max_chars:,}자 생략)..." return content except Exception as e: logger.warning("kis_short_ver2.py 읽기 실패: %s", e) return f"(소스 읽기 실패: {e})" def _build_analyze_context(db: TradeDB) -> Dict: """ 분석에 필요한 공통 컨텍스트 수집: env 수치, journalctl 로그, 최근 거래 내역, 유니버스 후보 수, 소스코드. 두 AI 핸들러가 이 함수를 공유하여 중복 코드 제거. """ env_lines = _get_env_numeric_snapshot(db) log_lines_cnt = get_env_int("AI_JOURNAL_LINES", 500) journal_unit = os.environ.get("JOURNALCTL_UNIT", "").strip() or None journal_log = _get_journalctl_recent(lines=log_lines_cnt, unit=journal_unit) or "(journalctl 로그 없음)" recent_trades = [] try: cursor = db.conn.execute( """ SELECT code, name, buy_price, sell_price, qty, profit_rate, realized_pnl, strategy, sell_reason, buy_date, sell_date, hold_minutes FROM trade_history ORDER BY id DESC LIMIT 10 """ ) for row in cursor.fetchall(): recent_trades.append({ "code": row[0], "name": row[1], "buy_price": row[2], "sell_price": row[3], "qty": row[4], "profit_rate": row[5], "realized_pnl": row[6], "strategy": row[7], "sell_reason": row[8], "buy_date": row[9], "sell_date": row[10], "hold_minutes": row[11] or 0, }) except Exception as e: logger.error("거래 내역 조회 실패: %s", e) try: row = db.conn.execute( "SELECT COUNT(*) FROM target_candidates WHERE scan_date = date('now')" ).fetchone() candidate_count = row[0] if row else 0 except Exception: candidate_count = 0 bot_source = _read_bot_source() return { "env_lines": env_lines, "journal_log": journal_log, "log_lines_cnt": log_lines_cnt, "recent_trades": recent_trades, "candidate_count": candidate_count, "bot_source": bot_source, } def _build_analyze_prompt(ctx: Dict) -> Tuple[str, str]: """ 컨텍스트로부터 AI 프롬프트 문자열 생성. kis_short_ver2.py 소스코드를 포함하여 AI가 변수명·로직을 정확히 파악하도록 함. Returns: (prompt 문자열, summary 문자열) """ env_lines = ctx["env_lines"] journal_log = ctx["journal_log"] log_lines_cnt = ctx["log_lines_cnt"] recent_trades = ctx["recent_trades"] candidate_count = ctx["candidate_count"] bot_source = ctx["bot_source"] source_section = f""" **단타봇 전체 소스코드 (kis_short_ver2.py) — 로직·변수명 참고용** ```python {bot_source} ``` """ if not recent_trades: summary = f"- 유니버스 후보: {candidate_count}개\n- 최근 거래: 없음" prompt = f"""당신은 퀀트 트레이딩 전문가입니다. 아래 단타봇 소스코드를 읽고 로직을 완전히 이해한 뒤, 설정 수치와 로그를 바탕으로 분석해 주세요. {source_section} **현재 상태** - 유니버스 후보: {candidate_count}개 - 최근 거래: 없음 **현재 DB 설정 수치 (계좌/키 제외)** ``` {env_lines} ``` **봇 최근 로그 (journalctl 최근 {log_lines_cnt}줄)** ``` {journal_log[:12000]} ``` **당신의 임무** 1. 소스코드의 매수 조건·필터 로직을 파악하고, 현재 설정이 너무 엄격하거나 느슨한 부분을 찾아 문제점 분석. 2. **추천**: 반드시 KEY=값 한 줄에 하나. 이유·주석 금지. 그대로 DB 복붙 가능해야 함. 변수명은 반드시 소스코드에 실제로 존재하는 것만 사용할 것. 3. 예상 효과 한두 줄. **출력 형식 (반드시 준수)** ## 🔍 문제점 1. [소스 로직 기반 구체적 문제 1] 2. [소스 로직 기반 구체적 문제 2] ## 💡 수치 추천 (KEY=값, 한 줄에 하나) KEY=값 (필요한 것만) ## 📈 예상 효과 - [효과] """ else: total = len(recent_trades) wins = sum(1 for t in recent_trades if t["profit_rate"] > 0) win_rate = wins / total * 100 if total else 0 avg_profit = sum(t["profit_rate"] for t in recent_trades) / total total_pnl = sum(t["realized_pnl"] for t in recent_trades) avg_hold = sum(t["hold_minutes"] for t in recent_trades) / total trades_text = "" for i, t in enumerate(recent_trades, 1): trades_text += ( f"\n[거래 {i}] {t['name']} ({t['strategy']})\n" f"- 매수: {t['buy_price']:,.0f}원 × {t['qty']}주 | 매도: {t['sell_price']:,.0f}원\n" f"- 손익: {t['profit_rate']:+.2f}% ({t['realized_pnl']:,.0f}원) | 보유: {t['hold_minutes']}분\n" f"- 사유: {t['sell_reason']}\n" ) summary = ( f"- 유니버스 후보: {candidate_count}개\n" f"- 최근 거래: {total}건 | 승률: {win_rate:.1f}%\n" f"- 평균 수익률: {avg_profit:.2f}% | 총 손익: {total_pnl:+,.0f}원" ) prompt = f"""당신은 퀀트 트레이딩 전문가입니다. 아래 단타봇 소스코드를 읽고 로직을 완전히 이해한 뒤, 거래 내역·설정 수치를 분석해 주세요. {source_section} **현재 상태** - 유니버스 후보: {candidate_count}개 - 최근 거래: {total}건 | 승률: {win_rate:.1f}% ({wins}승 {total - wins}패) - 평균 수익률: {avg_profit:.2f}% | 총 손익: {total_pnl:+,.0f}원 | 평균 보유: {avg_hold:.0f}분 **최근 거래 내역** {trades_text} **현재 DB 설정 수치 (계좌/키 제외)** ``` {env_lines} ``` **봇 최근 로그 (journalctl 최근 {log_lines_cnt}줄)** ``` {journal_log[:12000]} ``` **당신의 임무** 1. 소스코드의 매수·매도 로직과 거래 내역을 대조해 승률 하락 원인을 3가지 구체적으로 진단. (예: 어느 함수의 어떤 조건이 문제인지 명시) 2. **추천**: KEY=값 한 줄에 하나. 이유·주석 금지. DB 복붙 가능하게. 변수명은 반드시 소스코드에 실제로 존재하는 것만 사용할 것. 3. 예상 효과 한두 줄. **출력 형식 (반드시 준수)** ## 🔍 문제점 (승률 하락 원인) 1. [소스 로직 기반 구체적 문제 1] 2. [소스 로직 기반 구체적 문제 2] 3. [소스 로직 기반 구체적 문제 3] ## 💡 수치 추천 (KEY=값, 한 줄에 하나) KEY=값 (필요한 것만) ## 📈 예상 효과 - [효과] """ return prompt, summary def _call_claude(prompt: str) -> str: """Claude API 호출 후 텍스트 반환. 실패 시 오류 문자열 반환.""" if not claude_client: return "❌ Claude API 미설정 (ANTHROPIC_API_KEY 확인)" try: max_tok = get_env_int("CLAUDE_MAX_TOKENS", 8192) model = get_env_from_db("CLAUDE_MODEL_ID", CLAUDE_MODEL_ID).strip() or CLAUDE_MODEL_ID response = claude_client.messages.create( model=model, max_tokens=max_tok, messages=[{"role": "user", "content": prompt}], ) return response.content[0].text if response.content else "(응답 없음)" except Exception as e: logger.error("Claude 호출 실패: %s", e) return f"❌ Claude 호출 실패: {e}" def _call_gemini(prompt: str) -> str: """Gemini API 호출 후 텍스트 반환. 실패 시 오류 문자열 반환.""" if not gemini_client: return "❌ Gemini API 미설정 (GEMINI_API_KEY 확인)" try: model = get_env_from_db("GEMINI_MODEL_ID", GEMINI_MODEL_ID).strip() or GEMINI_MODEL_ID response = gemini_client.models.generate_content(model=model, contents=prompt) return ( getattr(response, "text", None) or (response.candidates[0].content.parts[0].text if response.candidates else "(응답 없음)") ) except Exception as e: logger.error("Gemini 호출 실패: %s", e) return f"❌ Gemini 호출 실패: {e}" def _call_openrouter(prompt: str, model: Optional[str] = None) -> str: """OpenRouter API 호출 후 텍스트 반환. 실패 시 오류 문자열 반환.""" if not OPENROUTER_API_KEY: return "❌ OpenRouter API 미설정 (OPENROUTER_API_KEY 확인)" use_model = (model or get_env_from_db("OPENROUTER_MODEL_ID", OPENROUTER_MODEL_ID)).strip() or OPENROUTER_MODEL_ID try: headers = { "Authorization": f"Bearer {OPENROUTER_API_KEY}", "Content-Type": "application/json", } payload = { "model": use_model, "messages": [{"role": "user", "content": prompt}], } r = requests.post("https://openrouter.ai/api/v1/chat/completions", headers=headers, json=payload, timeout=60) r.raise_for_status() data = r.json() choice = (data.get("choices") or [{}])[0] message = choice.get("message", {}) content = message.get("content") if isinstance(content, list): # 일부 OpenAI 호환 구현은 content를 조각 리스트로 반환 text = "".join( (part.get("text", "") if isinstance(part, dict) else str(part)) for part in content ) else: text = content or "" return text or "(응답 없음)" except Exception as e: logger.error("OpenRouter 호출 실패: %s", e) return f"❌ OpenRouter 호출 실패: {e}" # ================================================================== # 핸들러 함수들 (각 !명령어에 대응, CommandHub 에 등록) # ================================================================== def handler_help(args: str, db: TradeDB, hub: "CommandHub") -> str: """!도움말 — 등록된 모든 명령어와 설명 출력.""" lines = ["**🤖 MM Butler 명령어 목록**", ""] for cmd, (_, desc) in sorted(hub.registry.items()): lines.append(f"- `!{cmd}` — {desc}") lines.append("") lines.append("_채널에서 위 명령어를 입력하면 즉시 실행됩니다._") return "\n".join(lines) def handler_claude_analyze(args: str, db: TradeDB, hub: "CommandHub") -> str: """ !클로드분석 — Claude AI 로 단타봇 거래 분석 + 수치 추천. kis_short_ver2.py 전체 소스를 프롬프트에 포함하여 로직 기반의 정밀 분석. """ if not claude_client: return "❌ Claude API 키가 설정되어 있지 않습니다. `ANTHROPIC_API_KEY` 를 DB에 추가해 주세요." try: ctx = _build_analyze_context(db) except Exception as e: return f"❌ 컨텍스트 수집 실패: {e}" prompt, summary = _build_analyze_prompt(ctx) analysis = _call_claude(prompt) _save_ai_recommendations(db, analysis) db.insert_ai_analysis_log("claude", summary, analysis) return ( "🤖 **[클로드 분석]**\n\n" f"📊 **현재 상태**\n{summary}\n\n" f"{analysis}\n\n" "---\n_추천 수치는 `!적용` 으로 DB에 즉시 반영할 수 있습니다._" ) def handler_gemini_analyze(args: str, db: TradeDB, hub: "CommandHub") -> str: """ !애미분석 — Gemini AI 로 단타봇 거래 분석 + 수치 추천. kis_short_ver2.py 전체 소스를 프롬프트에 포함하여 로직 기반의 정밀 분석. Gemini 2.5 Flash는 100만 토큰 컨텍스트로 긴 소스도 부담 없이 처리. """ if not gemini_client: return "❌ Gemini API 키가 설정되어 있지 않습니다. `GEMINI_API_KEY` 를 DB에 추가해 주세요." try: ctx = _build_analyze_context(db) except Exception as e: return f"❌ 컨텍스트 수집 실패: {e}" prompt, summary = _build_analyze_prompt(ctx) analysis = _call_gemini(prompt) _save_ai_recommendations(db, analysis) db.insert_ai_analysis_log("gemini", summary, analysis) return ( "🤖 **[애미(Gemini) 분석]**\n\n" f"📊 **현재 상태**\n{summary}\n\n" f"{analysis}\n\n" "---\n_추천 수치는 `!적용` 으로 DB에 즉시 반영할 수 있습니다._" ) def handler_openrouter_analyze(args: str, db: TradeDB, hub: "CommandHub") -> str: """ !오픈분석 — OpenRouter 경유 모델로 단타봇 거래 분석 + 수치 추천. kis_short_ver2.py 전체 소스를 프롬프트에 포함하여 로직 기반의 정밀 분석. 기본 모델은 env/DB의 OPENROUTER_MODEL_ID (기본값: anthropic/claude-3.5-sonnet). """ if not OPENROUTER_API_KEY: return "❌ OpenRouter API 키가 설정되어 있지 않습니다. `OPENROUTER_API_KEY` 를 DB에 추가해 주세요." try: ctx = _build_analyze_context(db) except Exception as e: return f"❌ 컨텍스트 수집 실패: {e}" prompt, summary = _build_analyze_prompt(ctx) analysis = _call_openrouter(prompt) _save_ai_recommendations(db, analysis) db.insert_ai_analysis_log("openrouter", summary, analysis) return ( "🤖 **[오픈(OpenRouter) 분석]**\n\n" f"📊 **현재 상태**\n{summary}\n\n" f"{analysis}\n\n" "---\n_추천 수치는 `!적용` 으로 DB에 즉시 반영할 수 있습니다._" ) def _analyze_news_with_backend(news_list: list, backend: str) -> Optional[Dict]: """ 공통 뉴스 리스트를 받아 backend(gemini/claude/openrouter)로 분석 요청. JSON 파싱까지 수행하여 dict 반환. """ if not news_list: return None news_titles = "\n".join([f"- {item['title']}" for item in news_list]) prompt = f"""다음은 오늘의 주요 금융 뉴스 제목들입니다: {news_titles} 이 뉴스들을 분석하여 다음 정보를 JSON 형식으로 제공해주세요: 1. summary: 오늘의 주요 이슈를 2-3문장으로 요약 2. sectors: 관련 업종 리스트 (최대 3개, 예: ["반도체", "AI", "자동차"]) 3. sentiment: 전반적 시장 분위기 (positive/neutral/negative) 4. recommended_stocks: 관련 주요 종목 (최대 3개) - code: 종목코드 (6자리) - name: 종목명 - reason: 추천 이유 (한 줄) 반드시 유효한 JSON 형식으로만 응답하세요. 설명 없이 JSON만 출력하세요. """ if backend == "gemini": raw = _call_gemini(prompt) elif backend == "claude": raw = _call_claude(prompt) elif backend == "openrouter": raw = _call_openrouter(prompt) else: return None if not raw or raw.startswith("❌"): return None import json as _json text = raw.strip() if text.startswith("```"): parts = text.split("```") if len(parts) >= 2: text = parts[1] if text.lstrip().startswith("json"): text = text.lstrip()[4:] text = text.strip() try: result = _json.loads(text) return result if isinstance(result, dict) else None except Exception as e: logger.error("뉴스 JSON 파싱 실패(%s): %r", backend, e) return None def _run_news_with_backend(db: TradeDB, backend: str) -> str: """뉴스 크롤링 + 지정 백엔드로 분석 + 위시리스트 필터링까지 공통 처리.""" analyzer = NewsAnalyzer() try: max_news = get_env_int("NEWS_MAX_COUNT", 5) news_list = analyzer.crawl_naver_finance_news(max_news=max_news) if not news_list: return "📰 크롤링된 뉴스가 없습니다." analysis = _analyze_news_with_backend(news_list, backend) if not analysis: return "📰 뉴스 AI 분석 결과가 없습니다." # 위시리스트 관련 종목 필터링 watch_map: Dict[str, str] = {} try: watchlist_path = SCRIPT_DIR / "long_term_watchlist.json" if watchlist_path.exists(): with open(watchlist_path, "r", encoding="utf-8") as f: obj = json.load(f) for it in (obj.get("items", []) if isinstance(obj, dict) else []): code = (it.get("code") or "").strip() if code: watch_map[code] = it.get("name", code) active = db.get_active_trades(strategy_prefix="LONG") for code, trade in active.items(): watch_map.setdefault(code, trade.get("name", code)) except Exception as e: logger.debug("위시리스트 로드 실패(무시): %s", e) related = [] for stock in analysis.get("recommended_stocks", []): code = (stock.get("code") or "").strip() if code and code in watch_map: related.append(f"- `{code}` {watch_map[code]}: {stock.get('reason', '')}") mm_msg = analyzer.format_analysis_for_mattermost(analysis, news_list) or "" if not mm_msg: return "📰 포맷된 뉴스 메시지가 없습니다." if related: mm_msg += "\n\n**🎯 위시리스트 관련 종목**\n" + "\n".join(related) else: mm_msg += "\n\n_위시리스트와 직접 매칭된 종목 없음 (전체 시장 참고용)_" return mm_msg except Exception as e: logger.error("뉴스 핸들러 실패(%s): %s", backend, e) return f"❌ 뉴스 분석 실패: {e}" def handler_news(args: str, db: TradeDB, hub: "CommandHub") -> str: """ !뉴스 — 네이버 금융 뉴스 크롤링 → Gemini AI 분석 → 위시리스트 관련 필터링. (기본 백엔드: Gemini) """ if not gemini_client: return "❌ 뉴스 AI 분석 불가 (Gemini API 키 미설정)" return _run_news_with_backend(db, backend="gemini") def handler_news_claude(args: str, db: TradeDB, hub: "CommandHub") -> str: """!클로드뉴스 — 뉴스 크롤링 + Claude 분석.""" if not claude_client: return "❌ 뉴스 AI 분석 불가 (ANTHROPIC_API_KEY 미설정)" return _run_news_with_backend(db, backend="claude") def handler_news_openrouter(args: str, db: TradeDB, hub: "CommandHub") -> str: """!오픈뉴스 — 뉴스 크롤링 + OpenRouter 분석.""" if not OPENROUTER_API_KEY: return "❌ 뉴스 AI 분석 불가 (OPENROUTER_API_KEY 미설정)" return _run_news_with_backend(db, backend="openrouter") def handler_watchlist_analyze(args: str, db: TradeDB, hub: "CommandHub") -> str: """ !종목분석 — kis_long_ver2 장기 투자 체크 리포트를 즉시 1회 실행. - 장 시작/마감에 자동으로 나가는 리포트와 동일한 형식을, 사용자가 원할 때 수동으로 호출. - 실제 리포트 전송은 kis_long_ver2.LongWatchBotV2 의 Mattermost 설정(MM_CHANNEL_LONG 등)을 그대로 사용. """ try: bot = LongWatchBotV2() # 장 시작/마감과 동일 포맷, 레이블만 '수동'으로 구분 bot.send_long_check_report("수동 요청") return "📊 장기 투자 체크 리포트를 즉시 전송했습니다. (kis_long_ver2 형식 그대로)" except Exception as e: logger.error("종목분석(장기 리포트) 핸들러 실패: %s", e) return f"❌ 종목분석(장기 리포트) 실행 실패: {e}" def handler_analysis_log(args: str, db: TradeDB, hub: "CommandHub") -> str: """ !분석기록 [N] — 저장된 AI 분석 기록 조회. 인수 없음: 최근 5건 목록 (id, 시각, 모델, 요약). !분석기록 3 — id=3 건의 전체 응답 보기 (나중에 '뭐라고 했지' 꺼내볼 때). """ arg = (args or "").strip() if arg.isdigit(): log_id = int(arg) row = db.get_ai_analysis_log_by_id(log_id) if not row: return f"❌ id={log_id} 분석 기록이 없습니다." return ( f"📋 **분석기록 id={row['id']}** ({row['created_at']} | {row['model']})\n\n" f"**당시 상태**\n{row['context_summary'] or '(없음)'}\n\n" f"**AI 응답**\n{row['response'] or '(없음)'}\n\n" "---\n_목록: `!분석기록`_" ) # 목록 (최근 5건) items = db.get_ai_analysis_log_list(limit=5) if not items: return "저장된 분석 기록이 없습니다. `!클로드분석` 또는 `!애미분석` 을 먼저 실행해 보세요." lines = ["📋 **최근 분석기록 (최근 5건)**", ""] for it in items: ctx = (it["context_summary"] or "")[:120] if len(it["context_summary"] or "") > 120: ctx += "…" prev = (it["response_preview"] or "")[:150] if len(it["response_preview"] or "") > 150: prev += "…" lines.append(f"**id={it['id']}** {it['created_at']} | {it['model']}\n 상태: {ctx}\n 응답: {prev}") lines.append("") lines.append("_전체 보기: `!분석기록 3` (3을 원하는 id로 변경)_") return "\n".join(lines) def handler_apply(args: str, db: TradeDB, hub: "CommandHub") -> str: """!적용 — 마지막 AI 추천 수치 전체 DB 반영.""" text = db.get_last_ai_recommendations() if not text or not text.strip(): return "저장된 AI 추천이 없습니다. 먼저 `!클로드분석` 또는 `!애미분석` 을 실행해 주세요." valid_keys = set(ENV_CONFIG_KEYS) updates: Dict[str, str] = {} for line in text.splitlines(): line = line.strip() if not line: continue m = re.match(r"^([A-Z][A-Z0-9_]*)=(.+)$", line) if m and m.group(1) in valid_keys: updates[m.group(1)] = m.group(2).strip() if not updates: return "❌ 추천문에서 유효한 KEY=값을 찾지 못했습니다." latest = db.get_latest_env() if not latest or not latest.get("snapshot"): return "❌ 현재 env가 없습니다." snap = dict(latest["snapshot"]) old_snap = dict(snap) snap.update(updates) rid = db.insert_env_snapshot(snap) if rid is None: return "❌ DB 반영 실패." # 변경 전→후를 항목별로 나열하여 가독성 향상 lines = [f"⚙️ **AI 추천 일괄 적용 완료** ({len(updates)}건)\n"] for k in sorted(updates.keys()): old_v = old_snap.get(k) new_v = updates[k] old_str = f"`{old_v}`" if old_v not in (None, "") else "_없음_" lines.append(f"- `{k}` : {old_str} → **`{new_v}`**") lines.append("\n_봇 재시작 없이 다음 루프에서 자동 반영됩니다._") return "\n".join(lines) def handler_set(args: str, db: TradeDB, hub: "CommandHub") -> str: """!설정 KEY=값 또는 !설정 KEY 값 — 단일 설정값 DB 반영.""" rest = args.strip() if not rest: return "사용법: `!설정 KEY=값` 또는 `!설정 KEY 값`" m = re.match(r"^([A-Za-z_][A-Za-z0-9_]*)\s*=\s*(.+)$", rest) if m: key, val = m.group(1), m.group(2).strip() else: parts = rest.split(None, 1) if len(parts) < 2: return "사용법: `!설정 KEY=값` 또는 `!설정 KEY 값`" key, val = parts[0], parts[1] key = key.strip().upper() if key not in set(ENV_CONFIG_KEYS): similar = [k for k in ENV_CONFIG_KEYS if key in k or k.startswith(key[:4])] hint = f"\n혹시 이 키를 찾으시나요? {', '.join(f'`{k}`' for k in similar[:5])}" if similar else "" return f"❌ 알 수 없는 설정 키: `{key}`{hint}" latest = db.get_latest_env() if not latest or not latest.get("snapshot"): return "❌ 현재 env가 없습니다." snap = dict(latest["snapshot"]) old_val = snap.get(key) snap[key] = val.strip() rid = db.insert_env_snapshot(snap) if rid is None: return "❌ DB 반영 실패." # 이전값 → 새값 명시, 관련 경고 추가 old_str = f"`{old_val}`" if old_val not in (None, "") else "_없음_" warn = "" # STOP_LOSS_PCT 양수 입력 시 경고 (자동 부호반전 안내) if key == "STOP_LOSS_PCT": try: fval = float(val) if fval > 0: warn = "\n⚠️ 양수 값입니다. 손절은 음수여야 합니다 (예: `-0.02`). 봇 코드에서 자동 반전되지만 DB에는 음수로 저장하세요." except ValueError: pass # MAX_LOSS / STOP_LOSS 정합 힌트 if key in ("STOP_LOSS_PCT", "MAX_LOSS_PER_TRADE_KRW"): try: stop_pct = abs(float(snap.get("STOP_LOSS_PCT") or 0)) max_loss = float(snap.get("MAX_LOSS_PER_TRADE_KRW") or 0) if stop_pct > 0 and max_loss > 0: implied_pos = max_loss / stop_pct warn += f"\n📐 포지션 상한 자동계산: MAX_LOSS({max_loss:,.0f}원) ÷ |STOP_LOSS({stop_pct:.4f})| = **{implied_pos:,.0f}원**" except (ValueError, ZeroDivisionError): pass return ( f"⚙️ **설정 반영 완료**\n" f"`{key}` : {old_str} → **`{val}`**" f"{warn}\n" f"_봇 재시작 없이 다음 루프에서 자동 반영됩니다._" ) # ================================================================== # CommandHub — MM 폴링 + 명령어 라우터 # ================================================================== class CommandHub: """ 매터모스트 채널을 폴링해서 !명령어를 감지하고 등록된 핸들러를 실행. 핸들러 시그니처: def handler(args: str, db: TradeDB, hub: CommandHub) -> str: ... # 반환값이 MM 응답 메시지 """ KV_LAST_SEEN_TS = "mm_butler_last_seen_ts" def __init__( self, server_url: str, bot_token: str, channel_alias: str, db: TradeDB, poll_interval_sec: int = 15, ): self.server_url = server_url.rstrip("/") self.bot_token = bot_token self.channel_alias = channel_alias self.db = db self.poll_interval_sec = poll_interval_sec self._channel_id: Optional[str] = None self._bot_user_id: Optional[str] = None self._headers = { "Authorization": f"Bearer {bot_token}", "Content-Type": "application/json", } self._running = False self._thread: Optional[threading.Thread] = None # { cmd_name: (handler_fn, description) } self.registry: Dict[str, Tuple[Callable, str]] = {} self._register_commands() # ------------------------------------------------------------------ # 명령어 등록 # ------------------------------------------------------------------ def _register_commands(self) -> None: """기본 명령어 등록. 새 기능은 이 안에 한 줄 추가.""" self.register_command("도움말", handler_help, "전체 명령어 목록 출력") self.register_command("클로드분석", handler_claude_analyze, "Claude AI 로 소스코드 기반 거래 분석 + 수치 추천") self.register_command("애미분석", handler_gemini_analyze, "Gemini AI 로 소스코드 기반 거래 분석 + 수치 추천") self.register_command("오픈분석", handler_openrouter_analyze, "OpenRouter 로 소스코드 기반 거래 분석 + 수치 추천") self.register_command("종목분석", handler_watchlist_analyze, "long_term_watchlist 기반 장기 위시리스트 Gemini 분석") self.register_command("애미뉴스", handler_news, "네이버 금융 뉴스 AI 분석 (기본: Gemini, 위시리스트 필터링 포함)") self.register_command("클로드뉴스", handler_news_claude, "네이버 금융 뉴스 AI 분석 (Claude)") self.register_command("오픈뉴스", handler_news_openrouter, "네이버 금융 뉴스 AI 분석 (OpenRouter)") self.register_command("적용", handler_apply, "마지막 AI 추천 수치 전체 DB 반영") self.register_command("설정", handler_set, "단일 설정값 DB 반영 (예: !설정 MAX_STOCKS=4)") self.register_command("분석기록", handler_analysis_log, "저장된 AI 분석 기록 목록/상세 (나중에 꺼내보기)") def register_command(self, cmd: str, handler: Callable, description: str = "") -> None: """외부에서 동적으로 명령어 추가 가능.""" self.registry[cmd.strip()] = (handler, description) logger.debug("명령어 등록: !%s", cmd) # ------------------------------------------------------------------ # MM API 헬퍼 # ------------------------------------------------------------------ def _load_channel_id(self) -> Optional[str]: if self._channel_id: return self._channel_id try: if MM_CONFIG_FILE.exists(): with open(MM_CONFIG_FILE, "r", encoding="utf-8") as f: data = json.load(f) self._channel_id = data.get("channels", {}).get(self.channel_alias) except Exception as e: logger.warning("채널 ID 로드 실패: %s", e) return self._channel_id def _get_bot_user_id(self) -> Optional[str]: if self._bot_user_id: return self._bot_user_id try: r = requests.get( f"{self.server_url}/api/v4/users/me", headers=self._headers, timeout=5, ) r.raise_for_status() self._bot_user_id = r.json().get("id") except Exception as e: logger.warning("봇 user_id 조회 실패: %s", e) return self._bot_user_id def _fetch_posts(self) -> list: cid = self._load_channel_id() if not cid: return [] try: r = requests.get( f"{self.server_url}/api/v4/channels/{cid}/posts", params={"per_page": 30}, headers=self._headers, timeout=5, ) r.raise_for_status() data = r.json() order = data.get("order", []) posts = data.get("posts", {}) bot_id = self._get_bot_user_id() out = [] for pid in order: p = posts.get(pid) if p and not (bot_id and p.get("user_id") == bot_id): out.append(p) return out except Exception as e: logger.debug("게시물 조회 실패: %s", e) return [] def _post_reply(self, message: str, root_id: Optional[str] = None) -> bool: cid = self._load_channel_id() if not cid: return False payload = {"channel_id": cid, "message": message} if root_id: payload["root_id"] = root_id try: r = requests.post( f"{self.server_url}/api/v4/posts", headers=self._headers, json=payload, timeout=10, ) r.raise_for_status() return True except Exception as e: logger.error("MM 전송 실패: %s", e) return False # ------------------------------------------------------------------ # 명령어 처리 # ------------------------------------------------------------------ def _dispatch(self, message: str, post_id: str) -> Optional[str]: """!로 시작하는 메시지를 파싱해 핸들러 호출. 반환값 = 응답 메시지.""" msg = (message or "").strip() if not msg.startswith("!"): return None # "!명령어 나머지인수" 분리 parts = msg[1:].split(None, 1) cmd = parts[0].strip() args = parts[1] if len(parts) > 1 else "" entry = self.registry.get(cmd) if not entry: return None # 모르는 명령어는 무시 (다른 봇 명령 포함) handler_fn, _ = entry try: logger.info("명령 실행: !%s (args=%r)", cmd, args[:50]) return handler_fn(args, self.db, self) except Exception as e: logger.error("핸들러 !%s 실패: %s", cmd, e) return f"❌ `!{cmd}` 실행 중 오류: {e}" # ------------------------------------------------------------------ # 폴링 루프 # ------------------------------------------------------------------ def _poll_loop(self) -> None: ts_str = self.db.get_kv(self.KV_LAST_SEEN_TS) last_seen_ts = int(ts_str) if ts_str else int(time.time() * 1000) self.db.set_kv(self.KV_LAST_SEEN_TS, str(last_seen_ts)) while self._running: try: time.sleep(self.poll_interval_sec) if not self._running: break posts = self._fetch_posts() bot_id = self._get_bot_user_id() for p in posts: create_at = int(p.get("create_at", 0)) if create_at <= last_seen_ts: continue last_seen_ts = max(last_seen_ts, create_at) if bot_id and p.get("user_id") == bot_id: continue msg_text = (p.get("message") or "").strip() reply = self._dispatch(msg_text, p.get("id", "")) if reply: self._post_reply(reply, root_id=p.get("id")) logger.info("명령 응답 전송: %s -> %s…", msg_text[:30], reply[:60]) self.db.set_kv(self.KV_LAST_SEEN_TS, str(last_seen_ts)) except Exception as e: logger.warning("폴링 예외: %s", e) def start(self) -> None: """백그라운드 스레드로 폴링 시작.""" if self._running: return if not self.bot_token: logger.warning("MM_BOT_TOKEN 미설정 — Butler 리스너 미시작") return if not self._load_channel_id(): logger.warning("채널 ID 없음 (alias=%s) — Butler 리스너 미시작", self.channel_alias) return self._running = True self._thread = threading.Thread(target=self._poll_loop, daemon=True) self._thread.start() logger.info( "✅ MM Butler 시작 (채널=%s, 폴링=%ds, Claude=%s, Gemini=%s)", self.channel_alias, self.poll_interval_sec, "✓" if claude_client else "✗", "✓" if gemini_client else "✗", ) def stop(self) -> None: self._running = False if self._thread: self._thread.join(timeout=self.poll_interval_sec * 2) self._thread = None logger.info("MM Butler 종료") # ================================================================== # 진입점 # ================================================================== def main() -> None: channel_alias = get_env_from_db("MM_BUTLER_CHANNEL", "default").strip() or "default" poll_sec = get_env_int("MM_BUTLER_POLL_SEC", 15) hub = CommandHub( server_url=MM_SERVER_URL, bot_token=MM_BOT_TOKEN, channel_alias=channel_alias, db=shared_db, poll_interval_sec=poll_sec, ) hub.start() logger.info("MM Butler 대기 중… (Ctrl+C 로 종료)") try: while True: time.sleep(60) except KeyboardInterrupt: hub.stop() logger.info("종료") if __name__ == "__main__": main()