1084 lines
42 KiB
Python
1084 lines
42 KiB
Python
"""
|
||
mm_butler.py — 매터모스트 명령 허브 (상주 데몬)
|
||
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
|
||
역할: 매터모스트 채널에서 !명령어를 폴링하여 즉시 실행.
|
||
kis_short_ver2 / kis_long_ver2 봇과 완전히 독립적으로 상주.
|
||
|
||
지원 명령어 (기본):
|
||
!도움말 — 전체 명령어 목록
|
||
!클로드분석 — Claude AI 로 단타봇 거래 분석 + 수치 추천 (소스코드 포함)
|
||
!애미분석 — Gemini AI 로 단타봇 거래 분석 + 수치 추천 (소스코드 포함)
|
||
!오픈분석 — OpenRouter 경유 모델로 단타봇 거래 분석 + 수치 추천
|
||
!뉴스 — 네이버 금융 뉴스 AI 분석 (기본: Gemini, 위시리스트 관련 필터링 포함)
|
||
!클로드뉴스 — Claude 로 뉴스 AI 분석
|
||
!오픈뉴스 — OpenRouter 로 뉴스 AI 분석
|
||
!적용 — 마지막 AI 추천 수치 전체 DB 반영
|
||
!설정 KEY=VALUE — 단일 설정값 DB 반영
|
||
!분석기록 [N] — 저장된 분석 기록 목록/상세 (나중에 '뭐라고 했지' 꺼내보기)
|
||
|
||
확장:
|
||
기능이 생길 때마다 _register_commands() 안에 핸들러 함수 하나 추가하면 됩니다.
|
||
다른 파일에서 임포트 후 register_command() 로도 동적 등록 가능합니다.
|
||
|
||
실행:
|
||
python mm_butler.py # 포그라운드
|
||
또는 systemd 서비스로 등록
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import json
|
||
import logging
|
||
import os
|
||
import re
|
||
import subprocess
|
||
import threading
|
||
import time
|
||
from pathlib import Path
|
||
from typing import Callable, Dict, Optional, Tuple
|
||
|
||
import requests
|
||
|
||
# ------------------------------------------------------------------
|
||
# 내부 모듈 (kis_long_ver1 공용 함수·DB 재사용)
|
||
# ------------------------------------------------------------------
|
||
from database import TradeDB, ENV_CONFIG_KEYS
|
||
from kis_long_ver1 import (
|
||
get_env_from_db,
|
||
get_env_int,
|
||
get_env_float,
|
||
get_env_bool,
|
||
MM_SERVER_URL,
|
||
MM_BOT_TOKEN,
|
||
MM_CONFIG_FILE,
|
||
SCRIPT_DIR,
|
||
db as shared_db,
|
||
)
|
||
from kis_long_ver2 import LongWatchBotV2
|
||
from news_analyzer import NewsAnalyzer
|
||
|
||
# ------------------------------------------------------------------
|
||
# Claude 초기화 (클로드분석용)
|
||
# ------------------------------------------------------------------
|
||
try:
|
||
import anthropic
|
||
_CLAUDE_AVAILABLE = True
|
||
except ImportError:
|
||
_CLAUDE_AVAILABLE = False
|
||
|
||
CLAUDE_API_KEY = get_env_from_db("ANTHROPIC_API_KEY", "").strip()
|
||
CLAUDE_MODEL_ID = get_env_from_db("CLAUDE_MODEL_ID", "claude-sonnet-4-5").strip() or "claude-sonnet-4-5"
|
||
CLAUDE_MAX_TOKENS = get_env_int("CLAUDE_MAX_TOKENS", 8192)
|
||
|
||
claude_client: Optional["anthropic.Anthropic"] = None
|
||
if _CLAUDE_AVAILABLE and CLAUDE_API_KEY:
|
||
try:
|
||
claude_client = anthropic.Anthropic(api_key=CLAUDE_API_KEY)
|
||
except Exception as _e:
|
||
claude_client = None
|
||
|
||
# ------------------------------------------------------------------
|
||
# Gemini 초기화 (애미분석용) google.genai 신규 SDK
|
||
# ------------------------------------------------------------------
|
||
try:
|
||
import google.genai as genai
|
||
_GEMINI_AVAILABLE = True
|
||
except ImportError:
|
||
_GEMINI_AVAILABLE = False
|
||
|
||
GEMINI_API_KEY = get_env_from_db("GEMINI_API_KEY", "").strip()
|
||
GEMINI_MODEL_ID = get_env_from_db("GEMINI_MODEL_ID", "gemini-2.5-flash").strip() or "gemini-2.5-flash"
|
||
|
||
gemini_client = None
|
||
if _GEMINI_AVAILABLE and GEMINI_API_KEY:
|
||
try:
|
||
gemini_client = genai.Client(api_key=GEMINI_API_KEY)
|
||
except Exception as _e:
|
||
gemini_client = None
|
||
|
||
# ------------------------------------------------------------------
|
||
# OpenRouter 초기화 (공용 분석/뉴스용)
|
||
# ------------------------------------------------------------------
|
||
OPENROUTER_API_KEY = get_env_from_db("OPENROUTER_API_KEY", "").strip()
|
||
OPENROUTER_MODEL_ID = get_env_from_db("OPENROUTER_MODEL_ID", "anthropic/claude-4.5-sonnet").strip() or "anthropic/claude-4.5-sonnet"
|
||
|
||
# ------------------------------------------------------------------
|
||
# 로깅
|
||
# ------------------------------------------------------------------
|
||
logging.basicConfig(
|
||
format="[%(asctime)s][%(name)s] %(message)s",
|
||
datefmt="%H:%M:%S",
|
||
level=logging.INFO,
|
||
)
|
||
logger = logging.getLogger("MMButler")
|
||
|
||
|
||
# ==================================================================
|
||
# 유틸리티 함수
|
||
# ==================================================================
|
||
|
||
def _get_env_numeric_snapshot(db: TradeDB) -> str:
|
||
"""DB 최신 env에서 계좌/키/토큰/URL 제외한 수치·설정만 반환 (KEY=값 줄 단위)."""
|
||
EXCLUDE = {
|
||
"MM_SERVER_URL", "MM_BOT_TOKEN_", "MATTERMOST_CHANNEL",
|
||
"GEMINI_API_KEY", "GEMINI_MODEL_ID",
|
||
"ANTHROPIC_API_KEY", "CLAUDE_API_KEY", "CLAUDE_MODEL_ID",
|
||
"OPENROUTER_API_KEY", "OPENROUTER_MODEL_ID",
|
||
"KIS_APP_KEY_REAL", "KIS_APP_SECRET_REAL", "KIS_APP_KEY_MOCK", "KIS_APP_SECRET_MOCK",
|
||
"KIS_ACCOUNT_NO_REAL", "KIS_ACCOUNT_CODE_REAL", "KIS_ACCOUNT_NO_MOCK", "KIS_ACCOUNT_CODE_MOCK",
|
||
"KIS_SHORT_MM_CHANNEL", "KIS_LONG_MM_CHANNEL", "MM_BUTLER_CHANNEL",
|
||
}
|
||
latest = db.get_latest_env()
|
||
if not latest or not latest.get("snapshot"):
|
||
return ""
|
||
snap = latest["snapshot"]
|
||
lines = []
|
||
for k, v in sorted(snap.items()):
|
||
if k in EXCLUDE or v is None:
|
||
continue
|
||
v = str(v or "").strip()
|
||
if "#" in v:
|
||
v = v.split("#")[0].strip()
|
||
if not v:
|
||
continue
|
||
lines.append(f"{k}={v}")
|
||
return "\n".join(lines)
|
||
|
||
|
||
def _get_journalctl_recent(lines: int = 500, unit: Optional[str] = None) -> str:
|
||
"""journalctl 최근 N줄. unit 있으면 -u unit 적용."""
|
||
cmd = ["journalctl", "-n", str(lines), "-o", "short-iso"]
|
||
if unit:
|
||
cmd = ["journalctl", "-u", unit, "-n", str(lines), "-o", "short-iso"]
|
||
try:
|
||
r = subprocess.run(cmd, capture_output=True, text=True, timeout=10)
|
||
if r.returncode == 0 and r.stdout:
|
||
return r.stdout.strip()
|
||
except Exception as e:
|
||
logger.warning("journalctl 조회 실패: %s", e)
|
||
return ""
|
||
|
||
|
||
def _save_ai_recommendations(db: TradeDB, analysis_text: str) -> None:
|
||
"""AI 분석문에서 'KEY=값' 추천 줄만 추출해 DB에 저장 (!적용 시 사용)."""
|
||
if not analysis_text:
|
||
return
|
||
valid_keys = set(ENV_CONFIG_KEYS)
|
||
lines = []
|
||
for line in analysis_text.splitlines():
|
||
line = line.strip()
|
||
if not line or line.startswith("#"):
|
||
continue
|
||
m = re.match(r"^([A-Z][A-Z0-9_]*)=(.+)$", line)
|
||
if m and m.group(1) in valid_keys:
|
||
lines.append(f"{m.group(1)}={m.group(2).strip()}")
|
||
if lines:
|
||
db.set_last_ai_recommendations("\n".join(lines))
|
||
|
||
|
||
def _read_bot_source() -> str:
|
||
"""
|
||
kis_short_ver2.py 소스 전체를 읽어 반환.
|
||
AI가 매매 로직을 이해하고 정확한 변수명·수치로 추천하기 위해 프롬프트에 첨부.
|
||
AI_SOURCE_MAX_CHARS(env/DB) 로 최대 길이 제한 (기본 120,000자 ≈ Claude 30k 토큰).
|
||
"""
|
||
src_path = SCRIPT_DIR / "kis_short_ver2.py"
|
||
if not src_path.exists():
|
||
return "(kis_short_ver2.py 파일 없음)"
|
||
try:
|
||
max_chars = get_env_int("AI_SOURCE_MAX_CHARS", 120000)
|
||
content = src_path.read_text(encoding="utf-8")
|
||
if len(content) > max_chars:
|
||
content = content[:max_chars] + f"\n\n...(이하 {len(content) - max_chars:,}자 생략)..."
|
||
return content
|
||
except Exception as e:
|
||
logger.warning("kis_short_ver2.py 읽기 실패: %s", e)
|
||
return f"(소스 읽기 실패: {e})"
|
||
|
||
|
||
def _build_analyze_context(db: TradeDB) -> Dict:
|
||
"""
|
||
분석에 필요한 공통 컨텍스트 수집:
|
||
env 수치, journalctl 로그, 최근 거래 내역, 유니버스 후보 수, 소스코드.
|
||
두 AI 핸들러가 이 함수를 공유하여 중복 코드 제거.
|
||
"""
|
||
env_lines = _get_env_numeric_snapshot(db)
|
||
|
||
log_lines_cnt = get_env_int("AI_JOURNAL_LINES", 500)
|
||
journal_unit = os.environ.get("JOURNALCTL_UNIT", "").strip() or None
|
||
journal_log = _get_journalctl_recent(lines=log_lines_cnt, unit=journal_unit) or "(journalctl 로그 없음)"
|
||
|
||
recent_trades = []
|
||
try:
|
||
cursor = db.conn.execute(
|
||
"""
|
||
SELECT code, name, buy_price, sell_price, qty, profit_rate,
|
||
realized_pnl, strategy, sell_reason, buy_date, sell_date, hold_minutes
|
||
FROM trade_history
|
||
ORDER BY id DESC
|
||
LIMIT 10
|
||
"""
|
||
)
|
||
for row in cursor.fetchall():
|
||
recent_trades.append({
|
||
"code": row[0], "name": row[1],
|
||
"buy_price": row[2], "sell_price": row[3], "qty": row[4],
|
||
"profit_rate": row[5], "realized_pnl": row[6],
|
||
"strategy": row[7], "sell_reason": row[8],
|
||
"buy_date": row[9], "sell_date": row[10],
|
||
"hold_minutes": row[11] or 0,
|
||
})
|
||
except Exception as e:
|
||
logger.error("거래 내역 조회 실패: %s", e)
|
||
|
||
try:
|
||
row = db.conn.execute(
|
||
"SELECT COUNT(*) FROM target_candidates WHERE scan_date = date('now')"
|
||
).fetchone()
|
||
candidate_count = row[0] if row else 0
|
||
except Exception:
|
||
candidate_count = 0
|
||
|
||
bot_source = _read_bot_source()
|
||
|
||
return {
|
||
"env_lines": env_lines,
|
||
"journal_log": journal_log,
|
||
"log_lines_cnt": log_lines_cnt,
|
||
"recent_trades": recent_trades,
|
||
"candidate_count": candidate_count,
|
||
"bot_source": bot_source,
|
||
}
|
||
|
||
|
||
def _build_analyze_prompt(ctx: Dict) -> Tuple[str, str]:
|
||
"""
|
||
컨텍스트로부터 AI 프롬프트 문자열 생성.
|
||
kis_short_ver2.py 소스코드를 포함하여 AI가 변수명·로직을 정확히 파악하도록 함.
|
||
Returns: (prompt 문자열, summary 문자열)
|
||
"""
|
||
env_lines = ctx["env_lines"]
|
||
journal_log = ctx["journal_log"]
|
||
log_lines_cnt = ctx["log_lines_cnt"]
|
||
recent_trades = ctx["recent_trades"]
|
||
candidate_count = ctx["candidate_count"]
|
||
bot_source = ctx["bot_source"]
|
||
|
||
source_section = f"""
|
||
**단타봇 전체 소스코드 (kis_short_ver2.py) — 로직·변수명 참고용**
|
||
```python
|
||
{bot_source}
|
||
```
|
||
"""
|
||
|
||
if not recent_trades:
|
||
summary = f"- 유니버스 후보: {candidate_count}개\n- 최근 거래: 없음"
|
||
prompt = f"""당신은 퀀트 트레이딩 전문가입니다.
|
||
아래 단타봇 소스코드를 읽고 로직을 완전히 이해한 뒤, 설정 수치와 로그를 바탕으로 분석해 주세요.
|
||
{source_section}
|
||
|
||
**현재 상태**
|
||
- 유니버스 후보: {candidate_count}개
|
||
- 최근 거래: 없음
|
||
|
||
**현재 DB 설정 수치 (계좌/키 제외)**
|
||
```
|
||
{env_lines}
|
||
```
|
||
|
||
**봇 최근 로그 (journalctl 최근 {log_lines_cnt}줄)**
|
||
```
|
||
{journal_log[:12000]}
|
||
```
|
||
|
||
**당신의 임무**
|
||
1. 소스코드의 매수 조건·필터 로직을 파악하고, 현재 설정이 너무 엄격하거나 느슨한 부분을 찾아 문제점 분석.
|
||
2. **추천**: 반드시 KEY=값 한 줄에 하나. 이유·주석 금지. 그대로 DB 복붙 가능해야 함.
|
||
변수명은 반드시 소스코드에 실제로 존재하는 것만 사용할 것.
|
||
3. 예상 효과 한두 줄.
|
||
|
||
**출력 형식 (반드시 준수)**
|
||
## 🔍 문제점
|
||
1. [소스 로직 기반 구체적 문제 1]
|
||
2. [소스 로직 기반 구체적 문제 2]
|
||
|
||
## 💡 수치 추천 (KEY=값, 한 줄에 하나)
|
||
KEY=값
|
||
(필요한 것만)
|
||
|
||
## 📈 예상 효과
|
||
- [효과]
|
||
"""
|
||
else:
|
||
total = len(recent_trades)
|
||
wins = sum(1 for t in recent_trades if t["profit_rate"] > 0)
|
||
win_rate = wins / total * 100 if total else 0
|
||
avg_profit = sum(t["profit_rate"] for t in recent_trades) / total
|
||
total_pnl = sum(t["realized_pnl"] for t in recent_trades)
|
||
avg_hold = sum(t["hold_minutes"] for t in recent_trades) / total
|
||
|
||
trades_text = ""
|
||
for i, t in enumerate(recent_trades, 1):
|
||
trades_text += (
|
||
f"\n[거래 {i}] {t['name']} ({t['strategy']})\n"
|
||
f"- 매수: {t['buy_price']:,.0f}원 × {t['qty']}주 | 매도: {t['sell_price']:,.0f}원\n"
|
||
f"- 손익: {t['profit_rate']:+.2f}% ({t['realized_pnl']:,.0f}원) | 보유: {t['hold_minutes']}분\n"
|
||
f"- 사유: {t['sell_reason']}\n"
|
||
)
|
||
|
||
summary = (
|
||
f"- 유니버스 후보: {candidate_count}개\n"
|
||
f"- 최근 거래: {total}건 | 승률: {win_rate:.1f}%\n"
|
||
f"- 평균 수익률: {avg_profit:.2f}% | 총 손익: {total_pnl:+,.0f}원"
|
||
)
|
||
|
||
prompt = f"""당신은 퀀트 트레이딩 전문가입니다.
|
||
아래 단타봇 소스코드를 읽고 로직을 완전히 이해한 뒤, 거래 내역·설정 수치를 분석해 주세요.
|
||
{source_section}
|
||
|
||
**현재 상태**
|
||
- 유니버스 후보: {candidate_count}개
|
||
- 최근 거래: {total}건 | 승률: {win_rate:.1f}% ({wins}승 {total - wins}패)
|
||
- 평균 수익률: {avg_profit:.2f}% | 총 손익: {total_pnl:+,.0f}원 | 평균 보유: {avg_hold:.0f}분
|
||
|
||
**최근 거래 내역**
|
||
{trades_text}
|
||
|
||
**현재 DB 설정 수치 (계좌/키 제외)**
|
||
```
|
||
{env_lines}
|
||
```
|
||
|
||
**봇 최근 로그 (journalctl 최근 {log_lines_cnt}줄)**
|
||
```
|
||
{journal_log[:12000]}
|
||
```
|
||
|
||
**당신의 임무**
|
||
1. 소스코드의 매수·매도 로직과 거래 내역을 대조해 승률 하락 원인을 3가지 구체적으로 진단.
|
||
(예: 어느 함수의 어떤 조건이 문제인지 명시)
|
||
2. **추천**: KEY=값 한 줄에 하나. 이유·주석 금지. DB 복붙 가능하게.
|
||
변수명은 반드시 소스코드에 실제로 존재하는 것만 사용할 것.
|
||
3. 예상 효과 한두 줄.
|
||
|
||
**출력 형식 (반드시 준수)**
|
||
## 🔍 문제점 (승률 하락 원인)
|
||
1. [소스 로직 기반 구체적 문제 1]
|
||
2. [소스 로직 기반 구체적 문제 2]
|
||
3. [소스 로직 기반 구체적 문제 3]
|
||
|
||
## 💡 수치 추천 (KEY=값, 한 줄에 하나)
|
||
KEY=값
|
||
(필요한 것만)
|
||
|
||
## 📈 예상 효과
|
||
- [효과]
|
||
"""
|
||
|
||
return prompt, summary
|
||
|
||
|
||
def _call_claude(prompt: str) -> str:
|
||
"""Claude API 호출 후 텍스트 반환. 실패 시 오류 문자열 반환."""
|
||
if not claude_client:
|
||
return "❌ Claude API 미설정 (ANTHROPIC_API_KEY 확인)"
|
||
try:
|
||
max_tok = get_env_int("CLAUDE_MAX_TOKENS", 8192)
|
||
model = get_env_from_db("CLAUDE_MODEL_ID", CLAUDE_MODEL_ID).strip() or CLAUDE_MODEL_ID
|
||
response = claude_client.messages.create(
|
||
model=model,
|
||
max_tokens=max_tok,
|
||
messages=[{"role": "user", "content": prompt}],
|
||
)
|
||
return response.content[0].text if response.content else "(응답 없음)"
|
||
except Exception as e:
|
||
logger.error("Claude 호출 실패: %s", e)
|
||
return f"❌ Claude 호출 실패: {e}"
|
||
|
||
|
||
def _call_gemini(prompt: str) -> str:
|
||
"""Gemini API 호출 후 텍스트 반환. 실패 시 오류 문자열 반환."""
|
||
if not gemini_client:
|
||
return "❌ Gemini API 미설정 (GEMINI_API_KEY 확인)"
|
||
try:
|
||
model = get_env_from_db("GEMINI_MODEL_ID", GEMINI_MODEL_ID).strip() or GEMINI_MODEL_ID
|
||
response = gemini_client.models.generate_content(model=model, contents=prompt)
|
||
return (
|
||
getattr(response, "text", None)
|
||
or (response.candidates[0].content.parts[0].text if response.candidates else "(응답 없음)")
|
||
)
|
||
except Exception as e:
|
||
logger.error("Gemini 호출 실패: %s", e)
|
||
return f"❌ Gemini 호출 실패: {e}"
|
||
|
||
|
||
def _call_openrouter(prompt: str, model: Optional[str] = None) -> str:
|
||
"""OpenRouter API 호출 후 텍스트 반환. 실패 시 오류 문자열 반환."""
|
||
if not OPENROUTER_API_KEY:
|
||
return "❌ OpenRouter API 미설정 (OPENROUTER_API_KEY 확인)"
|
||
use_model = (model or get_env_from_db("OPENROUTER_MODEL_ID", OPENROUTER_MODEL_ID)).strip() or OPENROUTER_MODEL_ID
|
||
try:
|
||
headers = {
|
||
"Authorization": f"Bearer {OPENROUTER_API_KEY}",
|
||
"Content-Type": "application/json",
|
||
}
|
||
payload = {
|
||
"model": use_model,
|
||
"messages": [{"role": "user", "content": prompt}],
|
||
}
|
||
r = requests.post("https://openrouter.ai/api/v1/chat/completions", headers=headers, json=payload, timeout=60)
|
||
r.raise_for_status()
|
||
data = r.json()
|
||
choice = (data.get("choices") or [{}])[0]
|
||
message = choice.get("message", {})
|
||
content = message.get("content")
|
||
if isinstance(content, list):
|
||
# 일부 OpenAI 호환 구현은 content를 조각 리스트로 반환
|
||
text = "".join(
|
||
(part.get("text", "") if isinstance(part, dict) else str(part))
|
||
for part in content
|
||
)
|
||
else:
|
||
text = content or ""
|
||
return text or "(응답 없음)"
|
||
except Exception as e:
|
||
logger.error("OpenRouter 호출 실패: %s", e)
|
||
return f"❌ OpenRouter 호출 실패: {e}"
|
||
|
||
|
||
# ==================================================================
|
||
# 핸들러 함수들 (각 !명령어에 대응, CommandHub 에 등록)
|
||
# ==================================================================
|
||
|
||
def handler_help(args: str, db: TradeDB, hub: "CommandHub") -> str:
|
||
"""!도움말 — 등록된 모든 명령어와 설명 출력."""
|
||
lines = ["**🤖 MM Butler 명령어 목록**", ""]
|
||
for cmd, (_, desc) in sorted(hub.registry.items()):
|
||
lines.append(f"- `!{cmd}` — {desc}")
|
||
lines.append("")
|
||
lines.append("_채널에서 위 명령어를 입력하면 즉시 실행됩니다._")
|
||
return "\n".join(lines)
|
||
|
||
|
||
def handler_claude_analyze(args: str, db: TradeDB, hub: "CommandHub") -> str:
|
||
"""
|
||
!클로드분석 — Claude AI 로 단타봇 거래 분석 + 수치 추천.
|
||
kis_short_ver2.py 전체 소스를 프롬프트에 포함하여 로직 기반의 정밀 분석.
|
||
"""
|
||
if not claude_client:
|
||
return "❌ Claude API 키가 설정되어 있지 않습니다. `ANTHROPIC_API_KEY` 를 DB에 추가해 주세요."
|
||
|
||
try:
|
||
ctx = _build_analyze_context(db)
|
||
except Exception as e:
|
||
return f"❌ 컨텍스트 수집 실패: {e}"
|
||
|
||
prompt, summary = _build_analyze_prompt(ctx)
|
||
analysis = _call_claude(prompt)
|
||
_save_ai_recommendations(db, analysis)
|
||
db.insert_ai_analysis_log("claude", summary, analysis)
|
||
|
||
return (
|
||
"🤖 **[클로드 분석]**\n\n"
|
||
f"📊 **현재 상태**\n{summary}\n\n"
|
||
f"{analysis}\n\n"
|
||
"---\n_추천 수치는 `!적용` 으로 DB에 즉시 반영할 수 있습니다._"
|
||
)
|
||
|
||
|
||
def handler_gemini_analyze(args: str, db: TradeDB, hub: "CommandHub") -> str:
|
||
"""
|
||
!애미분석 — Gemini AI 로 단타봇 거래 분석 + 수치 추천.
|
||
kis_short_ver2.py 전체 소스를 프롬프트에 포함하여 로직 기반의 정밀 분석.
|
||
Gemini 2.5 Flash는 100만 토큰 컨텍스트로 긴 소스도 부담 없이 처리.
|
||
"""
|
||
if not gemini_client:
|
||
return "❌ Gemini API 키가 설정되어 있지 않습니다. `GEMINI_API_KEY` 를 DB에 추가해 주세요."
|
||
|
||
try:
|
||
ctx = _build_analyze_context(db)
|
||
except Exception as e:
|
||
return f"❌ 컨텍스트 수집 실패: {e}"
|
||
|
||
prompt, summary = _build_analyze_prompt(ctx)
|
||
analysis = _call_gemini(prompt)
|
||
_save_ai_recommendations(db, analysis)
|
||
db.insert_ai_analysis_log("gemini", summary, analysis)
|
||
|
||
return (
|
||
"🤖 **[애미(Gemini) 분석]**\n\n"
|
||
f"📊 **현재 상태**\n{summary}\n\n"
|
||
f"{analysis}\n\n"
|
||
"---\n_추천 수치는 `!적용` 으로 DB에 즉시 반영할 수 있습니다._"
|
||
)
|
||
|
||
|
||
def handler_openrouter_analyze(args: str, db: TradeDB, hub: "CommandHub") -> str:
|
||
"""
|
||
!오픈분석 — OpenRouter 경유 모델로 단타봇 거래 분석 + 수치 추천.
|
||
kis_short_ver2.py 전체 소스를 프롬프트에 포함하여 로직 기반의 정밀 분석.
|
||
기본 모델은 env/DB의 OPENROUTER_MODEL_ID (기본값: anthropic/claude-3.5-sonnet).
|
||
"""
|
||
if not OPENROUTER_API_KEY:
|
||
return "❌ OpenRouter API 키가 설정되어 있지 않습니다. `OPENROUTER_API_KEY` 를 DB에 추가해 주세요."
|
||
|
||
try:
|
||
ctx = _build_analyze_context(db)
|
||
except Exception as e:
|
||
return f"❌ 컨텍스트 수집 실패: {e}"
|
||
|
||
prompt, summary = _build_analyze_prompt(ctx)
|
||
analysis = _call_openrouter(prompt)
|
||
_save_ai_recommendations(db, analysis)
|
||
db.insert_ai_analysis_log("openrouter", summary, analysis)
|
||
|
||
return (
|
||
"🤖 **[오픈(OpenRouter) 분석]**\n\n"
|
||
f"📊 **현재 상태**\n{summary}\n\n"
|
||
f"{analysis}\n\n"
|
||
"---\n_추천 수치는 `!적용` 으로 DB에 즉시 반영할 수 있습니다._"
|
||
)
|
||
|
||
|
||
def _analyze_news_with_backend(news_list: list, backend: str) -> Optional[Dict]:
|
||
"""
|
||
공통 뉴스 리스트를 받아 backend(gemini/claude/openrouter)로 분석 요청.
|
||
JSON 파싱까지 수행하여 dict 반환.
|
||
"""
|
||
if not news_list:
|
||
return None
|
||
|
||
news_titles = "\n".join([f"- {item['title']}" for item in news_list])
|
||
prompt = f"""다음은 오늘의 주요 금융 뉴스 제목들입니다:
|
||
|
||
{news_titles}
|
||
|
||
이 뉴스들을 분석하여 다음 정보를 JSON 형식으로 제공해주세요:
|
||
|
||
1. summary: 오늘의 주요 이슈를 2-3문장으로 요약
|
||
2. sectors: 관련 업종 리스트 (최대 3개, 예: ["반도체", "AI", "자동차"])
|
||
3. sentiment: 전반적 시장 분위기 (positive/neutral/negative)
|
||
4. recommended_stocks: 관련 주요 종목 (최대 3개)
|
||
- code: 종목코드 (6자리)
|
||
- name: 종목명
|
||
- reason: 추천 이유 (한 줄)
|
||
|
||
반드시 유효한 JSON 형식으로만 응답하세요. 설명 없이 JSON만 출력하세요.
|
||
"""
|
||
|
||
if backend == "gemini":
|
||
raw = _call_gemini(prompt)
|
||
elif backend == "claude":
|
||
raw = _call_claude(prompt)
|
||
elif backend == "openrouter":
|
||
raw = _call_openrouter(prompt)
|
||
else:
|
||
return None
|
||
|
||
if not raw or raw.startswith("❌"):
|
||
return None
|
||
|
||
import json as _json
|
||
|
||
text = raw.strip()
|
||
if text.startswith("```"):
|
||
parts = text.split("```")
|
||
if len(parts) >= 2:
|
||
text = parts[1]
|
||
if text.lstrip().startswith("json"):
|
||
text = text.lstrip()[4:]
|
||
text = text.strip()
|
||
try:
|
||
result = _json.loads(text)
|
||
return result if isinstance(result, dict) else None
|
||
except Exception as e:
|
||
logger.error("뉴스 JSON 파싱 실패(%s): %r", backend, e)
|
||
return None
|
||
|
||
|
||
def _run_news_with_backend(db: TradeDB, backend: str) -> str:
|
||
"""뉴스 크롤링 + 지정 백엔드로 분석 + 위시리스트 필터링까지 공통 처리."""
|
||
analyzer = NewsAnalyzer()
|
||
try:
|
||
max_news = get_env_int("NEWS_MAX_COUNT", 5)
|
||
news_list = analyzer.crawl_naver_finance_news(max_news=max_news)
|
||
if not news_list:
|
||
return "📰 크롤링된 뉴스가 없습니다."
|
||
|
||
analysis = _analyze_news_with_backend(news_list, backend)
|
||
if not analysis:
|
||
return "📰 뉴스 AI 분석 결과가 없습니다."
|
||
|
||
# 위시리스트 관련 종목 필터링
|
||
watch_map: Dict[str, str] = {}
|
||
try:
|
||
watchlist_path = SCRIPT_DIR / "long_term_watchlist.json"
|
||
if watchlist_path.exists():
|
||
with open(watchlist_path, "r", encoding="utf-8") as f:
|
||
obj = json.load(f)
|
||
for it in (obj.get("items", []) if isinstance(obj, dict) else []):
|
||
code = (it.get("code") or "").strip()
|
||
if code:
|
||
watch_map[code] = it.get("name", code)
|
||
active = db.get_active_trades(strategy_prefix="LONG")
|
||
for code, trade in active.items():
|
||
watch_map.setdefault(code, trade.get("name", code))
|
||
except Exception as e:
|
||
logger.debug("위시리스트 로드 실패(무시): %s", e)
|
||
|
||
related = []
|
||
for stock in analysis.get("recommended_stocks", []):
|
||
code = (stock.get("code") or "").strip()
|
||
if code and code in watch_map:
|
||
related.append(f"- `{code}` {watch_map[code]}: {stock.get('reason', '')}")
|
||
|
||
mm_msg = analyzer.format_analysis_for_mattermost(analysis, news_list) or ""
|
||
if not mm_msg:
|
||
return "📰 포맷된 뉴스 메시지가 없습니다."
|
||
|
||
if related:
|
||
mm_msg += "\n\n**🎯 위시리스트 관련 종목**\n" + "\n".join(related)
|
||
else:
|
||
mm_msg += "\n\n_위시리스트와 직접 매칭된 종목 없음 (전체 시장 참고용)_"
|
||
|
||
return mm_msg
|
||
|
||
except Exception as e:
|
||
logger.error("뉴스 핸들러 실패(%s): %s", backend, e)
|
||
return f"❌ 뉴스 분석 실패: {e}"
|
||
|
||
|
||
def handler_news(args: str, db: TradeDB, hub: "CommandHub") -> str:
|
||
"""
|
||
!뉴스 — 네이버 금융 뉴스 크롤링 → Gemini AI 분석 → 위시리스트 관련 필터링.
|
||
(기본 백엔드: Gemini)
|
||
"""
|
||
if not gemini_client:
|
||
return "❌ 뉴스 AI 분석 불가 (Gemini API 키 미설정)"
|
||
return _run_news_with_backend(db, backend="gemini")
|
||
|
||
|
||
def handler_news_claude(args: str, db: TradeDB, hub: "CommandHub") -> str:
|
||
"""!클로드뉴스 — 뉴스 크롤링 + Claude 분석."""
|
||
if not claude_client:
|
||
return "❌ 뉴스 AI 분석 불가 (ANTHROPIC_API_KEY 미설정)"
|
||
return _run_news_with_backend(db, backend="claude")
|
||
|
||
|
||
def handler_news_openrouter(args: str, db: TradeDB, hub: "CommandHub") -> str:
|
||
"""!오픈뉴스 — 뉴스 크롤링 + OpenRouter 분석."""
|
||
if not OPENROUTER_API_KEY:
|
||
return "❌ 뉴스 AI 분석 불가 (OPENROUTER_API_KEY 미설정)"
|
||
return _run_news_with_backend(db, backend="openrouter")
|
||
|
||
|
||
def handler_watchlist_analyze(args: str, db: TradeDB, hub: "CommandHub") -> str:
|
||
"""
|
||
!종목분석 — kis_long_ver2 장기 투자 체크 리포트를 즉시 1회 실행.
|
||
- 장 시작/마감에 자동으로 나가는 리포트와 동일한 형식을, 사용자가 원할 때 수동으로 호출.
|
||
- 실제 리포트 전송은 kis_long_ver2.LongWatchBotV2 의 Mattermost 설정(MM_CHANNEL_LONG 등)을 그대로 사용.
|
||
"""
|
||
try:
|
||
bot = LongWatchBotV2()
|
||
# 장 시작/마감과 동일 포맷, 레이블만 '수동'으로 구분
|
||
bot.send_long_check_report("수동 요청")
|
||
return "📊 장기 투자 체크 리포트를 즉시 전송했습니다. (kis_long_ver2 형식 그대로)"
|
||
except Exception as e:
|
||
logger.error("종목분석(장기 리포트) 핸들러 실패: %s", e)
|
||
return f"❌ 종목분석(장기 리포트) 실행 실패: {e}"
|
||
|
||
|
||
def handler_analysis_log(args: str, db: TradeDB, hub: "CommandHub") -> str:
|
||
"""
|
||
!분석기록 [N] — 저장된 AI 분석 기록 조회.
|
||
인수 없음: 최근 5건 목록 (id, 시각, 모델, 요약).
|
||
!분석기록 3 — id=3 건의 전체 응답 보기 (나중에 '뭐라고 했지' 꺼내볼 때).
|
||
"""
|
||
arg = (args or "").strip()
|
||
if arg.isdigit():
|
||
log_id = int(arg)
|
||
row = db.get_ai_analysis_log_by_id(log_id)
|
||
if not row:
|
||
return f"❌ id={log_id} 분석 기록이 없습니다."
|
||
return (
|
||
f"📋 **분석기록 id={row['id']}** ({row['created_at']} | {row['model']})\n\n"
|
||
f"**당시 상태**\n{row['context_summary'] or '(없음)'}\n\n"
|
||
f"**AI 응답**\n{row['response'] or '(없음)'}\n\n"
|
||
"---\n_목록: `!분석기록`_"
|
||
)
|
||
# 목록 (최근 5건)
|
||
items = db.get_ai_analysis_log_list(limit=5)
|
||
if not items:
|
||
return "저장된 분석 기록이 없습니다. `!클로드분석` 또는 `!애미분석` 을 먼저 실행해 보세요."
|
||
lines = ["📋 **최근 분석기록 (최근 5건)**", ""]
|
||
for it in items:
|
||
ctx = (it["context_summary"] or "")[:120]
|
||
if len(it["context_summary"] or "") > 120:
|
||
ctx += "…"
|
||
prev = (it["response_preview"] or "")[:150]
|
||
if len(it["response_preview"] or "") > 150:
|
||
prev += "…"
|
||
lines.append(f"**id={it['id']}** {it['created_at']} | {it['model']}\n 상태: {ctx}\n 응답: {prev}")
|
||
lines.append("")
|
||
lines.append("_전체 보기: `!분석기록 3` (3을 원하는 id로 변경)_")
|
||
return "\n".join(lines)
|
||
|
||
|
||
def handler_apply(args: str, db: TradeDB, hub: "CommandHub") -> str:
|
||
"""!적용 — 마지막 AI 추천 수치 전체 DB 반영."""
|
||
text = db.get_last_ai_recommendations()
|
||
if not text or not text.strip():
|
||
return "저장된 AI 추천이 없습니다. 먼저 `!클로드분석` 또는 `!애미분석` 을 실행해 주세요."
|
||
|
||
valid_keys = set(ENV_CONFIG_KEYS)
|
||
updates: Dict[str, str] = {}
|
||
for line in text.splitlines():
|
||
line = line.strip()
|
||
if not line:
|
||
continue
|
||
m = re.match(r"^([A-Z][A-Z0-9_]*)=(.+)$", line)
|
||
if m and m.group(1) in valid_keys:
|
||
updates[m.group(1)] = m.group(2).strip()
|
||
|
||
if not updates:
|
||
return "❌ 추천문에서 유효한 KEY=값을 찾지 못했습니다."
|
||
|
||
latest = db.get_latest_env()
|
||
if not latest or not latest.get("snapshot"):
|
||
return "❌ 현재 env가 없습니다."
|
||
|
||
snap = dict(latest["snapshot"])
|
||
old_snap = dict(snap)
|
||
snap.update(updates)
|
||
rid = db.insert_env_snapshot(snap)
|
||
if rid is None:
|
||
return "❌ DB 반영 실패."
|
||
|
||
# 변경 전→후를 항목별로 나열하여 가독성 향상
|
||
lines = [f"⚙️ **AI 추천 일괄 적용 완료** ({len(updates)}건)\n"]
|
||
for k in sorted(updates.keys()):
|
||
old_v = old_snap.get(k)
|
||
new_v = updates[k]
|
||
old_str = f"`{old_v}`" if old_v not in (None, "") else "_없음_"
|
||
lines.append(f"- `{k}` : {old_str} → **`{new_v}`**")
|
||
lines.append("\n_봇 재시작 없이 다음 루프에서 자동 반영됩니다._")
|
||
return "\n".join(lines)
|
||
|
||
|
||
def handler_set(args: str, db: TradeDB, hub: "CommandHub") -> str:
|
||
"""!설정 KEY=값 또는 !설정 KEY 값 — 단일 설정값 DB 반영."""
|
||
rest = args.strip()
|
||
if not rest:
|
||
return "사용법: `!설정 KEY=값` 또는 `!설정 KEY 값`"
|
||
|
||
m = re.match(r"^([A-Za-z_][A-Za-z0-9_]*)\s*=\s*(.+)$", rest)
|
||
if m:
|
||
key, val = m.group(1), m.group(2).strip()
|
||
else:
|
||
parts = rest.split(None, 1)
|
||
if len(parts) < 2:
|
||
return "사용법: `!설정 KEY=값` 또는 `!설정 KEY 값`"
|
||
key, val = parts[0], parts[1]
|
||
|
||
key = key.strip().upper()
|
||
if key not in set(ENV_CONFIG_KEYS):
|
||
similar = [k for k in ENV_CONFIG_KEYS if key in k or k.startswith(key[:4])]
|
||
hint = f"\n혹시 이 키를 찾으시나요? {', '.join(f'`{k}`' for k in similar[:5])}" if similar else ""
|
||
return f"❌ 알 수 없는 설정 키: `{key}`{hint}"
|
||
|
||
latest = db.get_latest_env()
|
||
if not latest or not latest.get("snapshot"):
|
||
return "❌ 현재 env가 없습니다."
|
||
|
||
snap = dict(latest["snapshot"])
|
||
old_val = snap.get(key)
|
||
snap[key] = val.strip()
|
||
rid = db.insert_env_snapshot(snap)
|
||
if rid is None:
|
||
return "❌ DB 반영 실패."
|
||
|
||
# 이전값 → 새값 명시, 관련 경고 추가
|
||
old_str = f"`{old_val}`" if old_val not in (None, "") else "_없음_"
|
||
warn = ""
|
||
# STOP_LOSS_PCT 양수 입력 시 경고 (자동 부호반전 안내)
|
||
if key == "STOP_LOSS_PCT":
|
||
try:
|
||
fval = float(val)
|
||
if fval > 0:
|
||
warn = "\n⚠️ 양수 값입니다. 손절은 음수여야 합니다 (예: `-0.02`). 봇 코드에서 자동 반전되지만 DB에는 음수로 저장하세요."
|
||
except ValueError:
|
||
pass
|
||
# MAX_LOSS / STOP_LOSS 정합 힌트
|
||
if key in ("STOP_LOSS_PCT", "MAX_LOSS_PER_TRADE_KRW"):
|
||
try:
|
||
stop_pct = abs(float(snap.get("STOP_LOSS_PCT") or 0))
|
||
max_loss = float(snap.get("MAX_LOSS_PER_TRADE_KRW") or 0)
|
||
if stop_pct > 0 and max_loss > 0:
|
||
implied_pos = max_loss / stop_pct
|
||
warn += f"\n📐 포지션 상한 자동계산: MAX_LOSS({max_loss:,.0f}원) ÷ |STOP_LOSS({stop_pct:.4f})| = **{implied_pos:,.0f}원**"
|
||
except (ValueError, ZeroDivisionError):
|
||
pass
|
||
|
||
return (
|
||
f"⚙️ **설정 반영 완료**\n"
|
||
f"`{key}` : {old_str} → **`{val}`**"
|
||
f"{warn}\n"
|
||
f"_봇 재시작 없이 다음 루프에서 자동 반영됩니다._"
|
||
)
|
||
|
||
|
||
# ==================================================================
|
||
# CommandHub — MM 폴링 + 명령어 라우터
|
||
# ==================================================================
|
||
|
||
class CommandHub:
|
||
"""
|
||
매터모스트 채널을 폴링해서 !명령어를 감지하고 등록된 핸들러를 실행.
|
||
|
||
핸들러 시그니처:
|
||
def handler(args: str, db: TradeDB, hub: CommandHub) -> str:
|
||
... # 반환값이 MM 응답 메시지
|
||
"""
|
||
|
||
KV_LAST_SEEN_TS = "mm_butler_last_seen_ts"
|
||
|
||
def __init__(
|
||
self,
|
||
server_url: str,
|
||
bot_token: str,
|
||
channel_alias: str,
|
||
db: TradeDB,
|
||
poll_interval_sec: int = 15,
|
||
):
|
||
self.server_url = server_url.rstrip("/")
|
||
self.bot_token = bot_token
|
||
self.channel_alias = channel_alias
|
||
self.db = db
|
||
self.poll_interval_sec = poll_interval_sec
|
||
|
||
self._channel_id: Optional[str] = None
|
||
self._bot_user_id: Optional[str] = None
|
||
self._headers = {
|
||
"Authorization": f"Bearer {bot_token}",
|
||
"Content-Type": "application/json",
|
||
}
|
||
self._running = False
|
||
self._thread: Optional[threading.Thread] = None
|
||
|
||
# { cmd_name: (handler_fn, description) }
|
||
self.registry: Dict[str, Tuple[Callable, str]] = {}
|
||
self._register_commands()
|
||
|
||
# ------------------------------------------------------------------
|
||
# 명령어 등록
|
||
# ------------------------------------------------------------------
|
||
def _register_commands(self) -> None:
|
||
"""기본 명령어 등록. 새 기능은 이 안에 한 줄 추가."""
|
||
self.register_command("도움말", handler_help, "전체 명령어 목록 출력")
|
||
self.register_command("클로드분석", handler_claude_analyze, "Claude AI 로 소스코드 기반 거래 분석 + 수치 추천")
|
||
self.register_command("애미분석", handler_gemini_analyze, "Gemini AI 로 소스코드 기반 거래 분석 + 수치 추천")
|
||
self.register_command("오픈분석", handler_openrouter_analyze, "OpenRouter 로 소스코드 기반 거래 분석 + 수치 추천")
|
||
self.register_command("종목분석", handler_watchlist_analyze, "long_term_watchlist 기반 장기 위시리스트 Gemini 분석")
|
||
self.register_command("애미뉴스", handler_news, "네이버 금융 뉴스 AI 분석 (기본: Gemini, 위시리스트 필터링 포함)")
|
||
self.register_command("클로드뉴스", handler_news_claude, "네이버 금융 뉴스 AI 분석 (Claude)")
|
||
self.register_command("오픈뉴스", handler_news_openrouter, "네이버 금융 뉴스 AI 분석 (OpenRouter)")
|
||
self.register_command("적용", handler_apply, "마지막 AI 추천 수치 전체 DB 반영")
|
||
self.register_command("설정", handler_set, "단일 설정값 DB 반영 (예: !설정 MAX_STOCKS=4)")
|
||
self.register_command("분석기록", handler_analysis_log, "저장된 AI 분석 기록 목록/상세 (나중에 꺼내보기)")
|
||
|
||
def register_command(self, cmd: str, handler: Callable, description: str = "") -> None:
|
||
"""외부에서 동적으로 명령어 추가 가능."""
|
||
self.registry[cmd.strip()] = (handler, description)
|
||
logger.debug("명령어 등록: !%s", cmd)
|
||
|
||
# ------------------------------------------------------------------
|
||
# MM API 헬퍼
|
||
# ------------------------------------------------------------------
|
||
def _load_channel_id(self) -> Optional[str]:
|
||
if self._channel_id:
|
||
return self._channel_id
|
||
try:
|
||
if MM_CONFIG_FILE.exists():
|
||
with open(MM_CONFIG_FILE, "r", encoding="utf-8") as f:
|
||
data = json.load(f)
|
||
self._channel_id = data.get("channels", {}).get(self.channel_alias)
|
||
except Exception as e:
|
||
logger.warning("채널 ID 로드 실패: %s", e)
|
||
return self._channel_id
|
||
|
||
def _get_bot_user_id(self) -> Optional[str]:
|
||
if self._bot_user_id:
|
||
return self._bot_user_id
|
||
try:
|
||
r = requests.get(
|
||
f"{self.server_url}/api/v4/users/me",
|
||
headers=self._headers, timeout=5,
|
||
)
|
||
r.raise_for_status()
|
||
self._bot_user_id = r.json().get("id")
|
||
except Exception as e:
|
||
logger.warning("봇 user_id 조회 실패: %s", e)
|
||
return self._bot_user_id
|
||
|
||
def _fetch_posts(self) -> list:
|
||
cid = self._load_channel_id()
|
||
if not cid:
|
||
return []
|
||
try:
|
||
r = requests.get(
|
||
f"{self.server_url}/api/v4/channels/{cid}/posts",
|
||
params={"per_page": 30},
|
||
headers=self._headers, timeout=5,
|
||
)
|
||
r.raise_for_status()
|
||
data = r.json()
|
||
order = data.get("order", [])
|
||
posts = data.get("posts", {})
|
||
bot_id = self._get_bot_user_id()
|
||
out = []
|
||
for pid in order:
|
||
p = posts.get(pid)
|
||
if p and not (bot_id and p.get("user_id") == bot_id):
|
||
out.append(p)
|
||
return out
|
||
except Exception as e:
|
||
logger.debug("게시물 조회 실패: %s", e)
|
||
return []
|
||
|
||
def _post_reply(self, message: str, root_id: Optional[str] = None) -> bool:
|
||
cid = self._load_channel_id()
|
||
if not cid:
|
||
return False
|
||
payload = {"channel_id": cid, "message": message}
|
||
if root_id:
|
||
payload["root_id"] = root_id
|
||
try:
|
||
r = requests.post(
|
||
f"{self.server_url}/api/v4/posts",
|
||
headers=self._headers, json=payload, timeout=10,
|
||
)
|
||
r.raise_for_status()
|
||
return True
|
||
except Exception as e:
|
||
logger.error("MM 전송 실패: %s", e)
|
||
return False
|
||
|
||
# ------------------------------------------------------------------
|
||
# 명령어 처리
|
||
# ------------------------------------------------------------------
|
||
def _dispatch(self, message: str, post_id: str) -> Optional[str]:
|
||
"""!로 시작하는 메시지를 파싱해 핸들러 호출. 반환값 = 응답 메시지."""
|
||
msg = (message or "").strip()
|
||
if not msg.startswith("!"):
|
||
return None
|
||
|
||
# "!명령어 나머지인수" 분리
|
||
parts = msg[1:].split(None, 1)
|
||
cmd = parts[0].strip()
|
||
args = parts[1] if len(parts) > 1 else ""
|
||
|
||
entry = self.registry.get(cmd)
|
||
if not entry:
|
||
return None # 모르는 명령어는 무시 (다른 봇 명령 포함)
|
||
|
||
handler_fn, _ = entry
|
||
try:
|
||
logger.info("명령 실행: !%s (args=%r)", cmd, args[:50])
|
||
return handler_fn(args, self.db, self)
|
||
except Exception as e:
|
||
logger.error("핸들러 !%s 실패: %s", cmd, e)
|
||
return f"❌ `!{cmd}` 실행 중 오류: {e}"
|
||
|
||
# ------------------------------------------------------------------
|
||
# 폴링 루프
|
||
# ------------------------------------------------------------------
|
||
def _poll_loop(self) -> None:
|
||
ts_str = self.db.get_kv(self.KV_LAST_SEEN_TS)
|
||
last_seen_ts = int(ts_str) if ts_str else int(time.time() * 1000)
|
||
self.db.set_kv(self.KV_LAST_SEEN_TS, str(last_seen_ts))
|
||
|
||
while self._running:
|
||
try:
|
||
time.sleep(self.poll_interval_sec)
|
||
if not self._running:
|
||
break
|
||
|
||
posts = self._fetch_posts()
|
||
bot_id = self._get_bot_user_id()
|
||
for p in posts:
|
||
create_at = int(p.get("create_at", 0))
|
||
if create_at <= last_seen_ts:
|
||
continue
|
||
last_seen_ts = max(last_seen_ts, create_at)
|
||
if bot_id and p.get("user_id") == bot_id:
|
||
continue
|
||
|
||
msg_text = (p.get("message") or "").strip()
|
||
reply = self._dispatch(msg_text, p.get("id", ""))
|
||
if reply:
|
||
self._post_reply(reply, root_id=p.get("id"))
|
||
logger.info("명령 응답 전송: %s -> %s…", msg_text[:30], reply[:60])
|
||
|
||
self.db.set_kv(self.KV_LAST_SEEN_TS, str(last_seen_ts))
|
||
|
||
except Exception as e:
|
||
logger.warning("폴링 예외: %s", e)
|
||
|
||
def start(self) -> None:
|
||
"""백그라운드 스레드로 폴링 시작."""
|
||
if self._running:
|
||
return
|
||
if not self.bot_token:
|
||
logger.warning("MM_BOT_TOKEN 미설정 — Butler 리스너 미시작")
|
||
return
|
||
if not self._load_channel_id():
|
||
logger.warning("채널 ID 없음 (alias=%s) — Butler 리스너 미시작", self.channel_alias)
|
||
return
|
||
self._running = True
|
||
self._thread = threading.Thread(target=self._poll_loop, daemon=True)
|
||
self._thread.start()
|
||
logger.info(
|
||
"✅ MM Butler 시작 (채널=%s, 폴링=%ds, Claude=%s, Gemini=%s)",
|
||
self.channel_alias,
|
||
self.poll_interval_sec,
|
||
"✓" if claude_client else "✗",
|
||
"✓" if gemini_client else "✗",
|
||
)
|
||
|
||
def stop(self) -> None:
|
||
self._running = False
|
||
if self._thread:
|
||
self._thread.join(timeout=self.poll_interval_sec * 2)
|
||
self._thread = None
|
||
logger.info("MM Butler 종료")
|
||
|
||
|
||
# ==================================================================
|
||
# 진입점
|
||
# ==================================================================
|
||
|
||
def main() -> None:
|
||
channel_alias = get_env_from_db("MM_BUTLER_CHANNEL", "default").strip() or "default"
|
||
poll_sec = get_env_int("MM_BUTLER_POLL_SEC", 15)
|
||
|
||
hub = CommandHub(
|
||
server_url=MM_SERVER_URL,
|
||
bot_token=MM_BOT_TOKEN,
|
||
channel_alias=channel_alias,
|
||
db=shared_db,
|
||
poll_interval_sec=poll_sec,
|
||
)
|
||
hub.start()
|
||
|
||
logger.info("MM Butler 대기 중… (Ctrl+C 로 종료)")
|
||
try:
|
||
while True:
|
||
time.sleep(60)
|
||
except KeyboardInterrupt:
|
||
hub.stop()
|
||
logger.info("종료")
|
||
|
||
|
||
if __name__ == "__main__":
|
||
main()
|