""" 매터모스트 원격 조종 (양방향 챗봇) - 채널 메시지를 폴링하여 !적용 / !설정 명령을 처리 - 수치만 DB(env_config)에 반영. 메인 루프가 매 사이클마다 reload_config() 하므로 별도 콜백 없음. """ import json import re import logging import threading import time from pathlib import Path from typing import Optional, Tuple import requests from database import TradeDB, ENV_CONFIG_KEYS logger = logging.getLogger("MMRemote") class MattermostRemoteController: """ 매터모스트 채널에서 !적용 / !설정 명령을 감지해 DB(env_config)에만 반영. 메인 매매 루프가 매 사이클마다 reload_config() 하므로 여기서는 수치만 저장. 별도 스레드에서 폴링하며 동작합니다. """ KV_LAST_SEEN_TS = "mm_remote_last_seen_ts" def __init__( self, server_url: str, bot_token: str, channel_alias: str, mm_config_path: Path, db: TradeDB, poll_interval_sec: int = 18, ): self.server_url = server_url.rstrip("/") self.bot_token = bot_token self.channel_alias = channel_alias self.mm_config_path = Path(mm_config_path) self.db = db self.poll_interval_sec = poll_interval_sec self._channel_id: Optional[str] = None self._bot_user_id: Optional[str] = None self._headers = { "Authorization": f"Bearer {bot_token}", "Content-Type": "application/json", } self._running = False self._thread: Optional[threading.Thread] = None def _load_channel_id(self) -> Optional[str]: if self._channel_id: return self._channel_id try: if self.mm_config_path.exists(): with open(self.mm_config_path, "r", encoding="utf-8") as f: data = json.load(f) channels = data.get("channels", {}) self._channel_id = channels.get(self.channel_alias) return self._channel_id except Exception as e: logger.warning("채널 ID 로드 실패: %s", e) return None def _get_bot_user_id(self) -> Optional[str]: if self._bot_user_id: return self._bot_user_id try: r = requests.get( f"{self.server_url}/api/v4/users/me", headers=self._headers, timeout=5, ) r.raise_for_status() self._bot_user_id = r.json().get("id") return self._bot_user_id except Exception as e: logger.warning("봇 사용자 ID 조회 실패: %s", e) return None def _fetch_posts(self) -> list: """채널 최근 게시물 목록 (최신순).""" cid = self._load_channel_id() if not cid: return [] try: r = requests.get( f"{self.server_url}/api/v4/channels/{cid}/posts", params={"per_page": 30}, headers=self._headers, timeout=5, ) r.raise_for_status() data = r.json() order = data.get("order", []) posts = data.get("posts", {}) bot_id = self._get_bot_user_id() out = [] for pid in order: p = posts.get(pid) if not p: continue if bot_id and p.get("user_id") == bot_id: continue out.append(p) return out except Exception as e: logger.debug("게시물 조회 실패: %s", e) return [] def _post_reply(self, message: str, root_id: Optional[str] = None) -> bool: cid = self._load_channel_id() if not cid: return False payload = {"channel_id": cid, "message": message} if root_id: payload["root_id"] = root_id try: r = requests.post( f"{self.server_url}/api/v4/posts", headers=self._headers, json=payload, timeout=5, ) r.raise_for_status() return True except Exception as e: logger.error("MM 전송 실패: %s", e) return False def _apply_all(self) -> Tuple[bool, str]: """마지막 AI 추천문을 파싱해 전부 env에 반영. (성공 여부, 요약 메시지)""" text = self.db.get_last_ai_recommendations() if not text or not text.strip(): return False, "저장된 AI 추천이 없습니다. 먼저 13시 AI 리포트를 받아주세요." valid_keys = set(ENV_CONFIG_KEYS) updates = {} for line in text.splitlines(): line = line.strip() if not line: continue m = re.match(r"^([A-Z][A-Z0-9_]*)=(.+)$", line) if m and m.group(1) in valid_keys: updates[m.group(1)] = m.group(2).strip() if not updates: return False, "추천문에서 유효한 KEY=값을 찾지 못했습니다." latest = self.db.get_latest_env() if not latest or not latest.get("snapshot"): return False, "현재 env가 없습니다." snap = dict(latest["snapshot"]) snap.update(updates) rid = self.db.insert_env_snapshot(snap) if rid is None: return False, "DB 반영 실패." summary = ", ".join(f"{k}={v}" for k, v in sorted(updates.items())[:10]) if len(updates) > 10: summary += f" 외 {len(updates) - 10}건" return True, f"✅ 적용 완료 ({len(updates)}건): {summary}" def _apply_one(self, key: str, value: str) -> Tuple[bool, str]: """단일 키만 env에 반영. (성공 여부, 요약 메시지)""" key = key.strip().upper() if key not in set(ENV_CONFIG_KEYS): return False, f"알 수 없는 설정 키: {key}" latest = self.db.get_latest_env() if not latest or not latest.get("snapshot"): return False, "현재 env가 없습니다." snap = dict(latest["snapshot"]) snap[key] = value.strip() rid = self.db.insert_env_snapshot(snap) if rid is None: return False, "DB 반영 실패." return True, f"✅ 설정 반영: {key}={value}" def _process_message(self, message: str, post_id: str) -> Optional[str]: """ 메시지에서 !적용 / !설정 처리. 처리 시 DB 반영 후 응답 문구 반환. """ msg = (message or "").strip() if not msg.startswith("!"): return None if msg == "!적용" or msg.startswith("!적용 "): ok, reply = self._apply_all() return reply if msg == "!설정": return "사용법: !설정 KEY 값 또는 !설정 KEY=값" if msg.startswith("!설정 "): rest = msg[3:].strip() # !설정 MAX_STOCKS 4 또는 !설정 MAX_STOCKS=4 m = re.match(r"^([A-Za-z_][A-Za-z0-9_]*)\s*=\s*(.+)$", rest) if m: key, val = m.group(1), m.group(2).strip() else: parts = rest.split(None, 1) if len(parts) < 2: return "사용법: !설정 KEY 값 또는 !설정 KEY=값" key, val = parts[0], parts[1] ok, reply = self._apply_one(key, val) return reply return None def _poll_loop(self): """폴링 루프: 마지막 처리 시각 이후 메시지만 처리 (재시작 시 DB에서 복원).""" ts_str = self.db.get_kv(self.KV_LAST_SEEN_TS) last_seen_ts = int(ts_str) if ts_str else int(time.time() * 1000) self.db.set_kv(self.KV_LAST_SEEN_TS, str(last_seen_ts)) while self._running: try: time.sleep(self.poll_interval_sec) if not self._running: break posts = self._fetch_posts() bot_id = self._get_bot_user_id() for p in posts: create_at = int(p.get("create_at", 0)) if create_at <= last_seen_ts: continue last_seen_ts = max(last_seen_ts, create_at) if bot_id and p.get("user_id") == bot_id: continue msg = (p.get("message") or "").strip() reply = self._process_message(msg, p.get("id", "")) if reply: self._post_reply(reply, root_id=p.get("id")) logger.info("MM 원격 명령 처리: %s -> %s", msg[:50], reply[:50]) self.db.set_kv(self.KV_LAST_SEEN_TS, str(last_seen_ts)) except Exception as e: logger.warning("MM 폴링 예외: %s", e) def start(self): """백그라운드 스레드로 폴링 시작. mm_butler.py 가 동일 채널을 이미 처리 중이면 중복 응답을 막기 위해 mm_remote 리스너를 시작하지 않습니다. mm_butler 가 없거나 다른 채널을 담당할 때만 활성화됩니다. """ if self._running: return if not self.bot_token or not self._load_channel_id(): logger.warning("MM 원격 조종: 토큰/채널 없음 - 리스너 미시작") return # mm_butler.py 와 동일 채널이면 중복 처리 방지 (mm_butler 가 우선) try: from kis_long_ver1 import get_env_from_db, MM_CONFIG_FILE import json as _json butler_alias = get_env_from_db("MM_BUTLER_CHANNEL", "default").strip() or "default" if MM_CONFIG_FILE.exists(): with open(MM_CONFIG_FILE, "r", encoding="utf-8") as _f: _cfg = _json.load(_f) butler_cid = _cfg.get("channels", {}).get(butler_alias) my_cid = self._load_channel_id() if butler_cid and my_cid and butler_cid == my_cid: logger.info( "ℹ️ mm_remote: mm_butler.py 가 동일 채널(%s) 담당 중 → mm_remote 리스너 비활성 (중복 응답 방지)", butler_alias, ) return except Exception: pass # 확인 실패 시 기존대로 시작 self._running = True self._thread = threading.Thread(target=self._poll_loop, daemon=True) self._thread.start() logger.info("MM 원격 조종 리스너 시작 (채널=%s, !적용 / !설정)", self.channel_alias) def stop(self): """폴링 중지.""" self._running = False if self._thread: self._thread.join(timeout=self.poll_interval_sec * 2) self._thread = None