291 lines
11 KiB
Python
291 lines
11 KiB
Python
"""
|
||
매터모스트 원격 조종 (양방향 챗봇)
|
||
- 채널 메시지를 폴링하여 !적용 / !설정 명령을 처리
|
||
- 수치만 DB(env_config)에 반영. 메인 루프가 매 사이클마다 reload_config() 하므로 별도 콜백 없음.
|
||
"""
|
||
import json
|
||
import re
|
||
import logging
|
||
import threading
|
||
import time
|
||
from pathlib import Path
|
||
from typing import Optional, Tuple
|
||
|
||
import requests
|
||
|
||
from database import TradeDB, ENV_CONFIG_KEYS
|
||
|
||
logger = logging.getLogger("MMRemote")
|
||
|
||
|
||
class MattermostRemoteController:
|
||
"""
|
||
매터모스트 채널에서 !적용 / !설정 명령을 감지해 DB(env_config)에만 반영.
|
||
메인 매매 루프가 매 사이클마다 reload_config() 하므로 여기서는 수치만 저장.
|
||
별도 스레드에서 폴링하며 동작합니다.
|
||
"""
|
||
|
||
KV_LAST_SEEN_TS = "mm_remote_last_seen_ts"
|
||
|
||
def __init__(
|
||
self,
|
||
server_url: str,
|
||
bot_token: str,
|
||
channel_alias: str,
|
||
mm_config_path: Path,
|
||
db: TradeDB,
|
||
poll_interval_sec: int = 18,
|
||
):
|
||
self.server_url = server_url.rstrip("/")
|
||
self.bot_token = bot_token
|
||
self.channel_alias = channel_alias
|
||
self.mm_config_path = Path(mm_config_path)
|
||
self.db = db
|
||
self.poll_interval_sec = poll_interval_sec
|
||
|
||
self._channel_id: Optional[str] = None
|
||
self._bot_user_id: Optional[str] = None
|
||
self._headers = {
|
||
"Authorization": f"Bearer {bot_token}",
|
||
"Content-Type": "application/json",
|
||
}
|
||
self._running = False
|
||
self._thread: Optional[threading.Thread] = None
|
||
|
||
def _load_channel_id(self) -> Optional[str]:
|
||
if self._channel_id:
|
||
return self._channel_id
|
||
try:
|
||
if self.mm_config_path.exists():
|
||
with open(self.mm_config_path, "r", encoding="utf-8") as f:
|
||
data = json.load(f)
|
||
channels = data.get("channels", {})
|
||
self._channel_id = channels.get(self.channel_alias)
|
||
return self._channel_id
|
||
except Exception as e:
|
||
logger.warning("채널 ID 로드 실패: %s", e)
|
||
return None
|
||
|
||
def _get_bot_user_id(self) -> Optional[str]:
|
||
if self._bot_user_id:
|
||
return self._bot_user_id
|
||
try:
|
||
r = requests.get(
|
||
f"{self.server_url}/api/v4/users/me",
|
||
headers=self._headers,
|
||
timeout=5,
|
||
)
|
||
r.raise_for_status()
|
||
self._bot_user_id = r.json().get("id")
|
||
return self._bot_user_id
|
||
except Exception as e:
|
||
logger.warning("봇 사용자 ID 조회 실패: %s", e)
|
||
return None
|
||
|
||
def _fetch_posts(self) -> list:
|
||
"""채널 최근 게시물 목록 (최신순)."""
|
||
cid = self._load_channel_id()
|
||
if not cid:
|
||
return []
|
||
try:
|
||
r = requests.get(
|
||
f"{self.server_url}/api/v4/channels/{cid}/posts",
|
||
params={"per_page": 30},
|
||
headers=self._headers,
|
||
timeout=5,
|
||
)
|
||
r.raise_for_status()
|
||
data = r.json()
|
||
order = data.get("order", [])
|
||
posts = data.get("posts", {})
|
||
bot_id = self._get_bot_user_id()
|
||
out = []
|
||
for pid in order:
|
||
p = posts.get(pid)
|
||
if not p:
|
||
continue
|
||
if bot_id and p.get("user_id") == bot_id:
|
||
continue
|
||
out.append(p)
|
||
return out
|
||
except Exception as e:
|
||
logger.debug("게시물 조회 실패: %s", e)
|
||
return []
|
||
|
||
def _post_reply(self, message: str, root_id: Optional[str] = None) -> bool:
|
||
cid = self._load_channel_id()
|
||
if not cid:
|
||
return False
|
||
payload = {"channel_id": cid, "message": message}
|
||
if root_id:
|
||
payload["root_id"] = root_id
|
||
try:
|
||
r = requests.post(
|
||
f"{self.server_url}/api/v4/posts",
|
||
headers=self._headers,
|
||
json=payload,
|
||
timeout=5,
|
||
)
|
||
r.raise_for_status()
|
||
return True
|
||
except Exception as e:
|
||
logger.error("MM 전송 실패: %s", e)
|
||
return False
|
||
|
||
def _apply_all(self) -> Tuple[bool, str]:
|
||
"""마지막 AI 추천문을 파싱해 전부 env에 반영. (성공 여부, 요약 메시지)"""
|
||
text = self.db.get_last_ai_recommendations()
|
||
if not text or not text.strip():
|
||
return False, "저장된 AI 추천이 없습니다. 먼저 13시 AI 리포트를 받아주세요."
|
||
|
||
valid_keys = set(ENV_CONFIG_KEYS)
|
||
updates = {}
|
||
for line in text.splitlines():
|
||
line = line.strip()
|
||
if not line:
|
||
continue
|
||
m = re.match(r"^([A-Z][A-Z0-9_]*)=(.+)$", line)
|
||
if m and m.group(1) in valid_keys:
|
||
updates[m.group(1)] = m.group(2).strip()
|
||
|
||
if not updates:
|
||
return False, "추천문에서 유효한 KEY=값을 찾지 못했습니다."
|
||
|
||
latest = self.db.get_latest_env()
|
||
if not latest or not latest.get("snapshot"):
|
||
return False, "현재 env가 없습니다."
|
||
|
||
snap = dict(latest["snapshot"])
|
||
snap.update(updates)
|
||
rid = self.db.insert_env_snapshot(snap)
|
||
if rid is None:
|
||
return False, "DB 반영 실패."
|
||
|
||
summary = ", ".join(f"{k}={v}" for k, v in sorted(updates.items())[:10])
|
||
if len(updates) > 10:
|
||
summary += f" 외 {len(updates) - 10}건"
|
||
return True, f"✅ 적용 완료 ({len(updates)}건): {summary}"
|
||
|
||
def _apply_one(self, key: str, value: str) -> Tuple[bool, str]:
|
||
"""단일 키만 env에 반영. (성공 여부, 요약 메시지)"""
|
||
key = key.strip().upper()
|
||
if key not in set(ENV_CONFIG_KEYS):
|
||
return False, f"알 수 없는 설정 키: {key}"
|
||
|
||
latest = self.db.get_latest_env()
|
||
if not latest or not latest.get("snapshot"):
|
||
return False, "현재 env가 없습니다."
|
||
|
||
snap = dict(latest["snapshot"])
|
||
snap[key] = value.strip()
|
||
rid = self.db.insert_env_snapshot(snap)
|
||
if rid is None:
|
||
return False, "DB 반영 실패."
|
||
return True, f"✅ 설정 반영: {key}={value}"
|
||
|
||
def _process_message(self, message: str, post_id: str) -> Optional[str]:
|
||
"""
|
||
메시지에서 !적용 / !설정 처리. 처리 시 DB 반영 후 응답 문구 반환.
|
||
"""
|
||
msg = (message or "").strip()
|
||
if not msg.startswith("!"):
|
||
return None
|
||
|
||
if msg == "!적용" or msg.startswith("!적용 "):
|
||
ok, reply = self._apply_all()
|
||
return reply
|
||
|
||
if msg == "!설정":
|
||
return "사용법: !설정 KEY 값 또는 !설정 KEY=값"
|
||
|
||
if msg.startswith("!설정 "):
|
||
rest = msg[3:].strip()
|
||
# !설정 MAX_STOCKS 4 또는 !설정 MAX_STOCKS=4
|
||
m = re.match(r"^([A-Za-z_][A-Za-z0-9_]*)\s*=\s*(.+)$", rest)
|
||
if m:
|
||
key, val = m.group(1), m.group(2).strip()
|
||
else:
|
||
parts = rest.split(None, 1)
|
||
if len(parts) < 2:
|
||
return "사용법: !설정 KEY 값 또는 !설정 KEY=값"
|
||
key, val = parts[0], parts[1]
|
||
ok, reply = self._apply_one(key, val)
|
||
return reply
|
||
|
||
return None
|
||
|
||
def _poll_loop(self):
|
||
"""폴링 루프: 마지막 처리 시각 이후 메시지만 처리 (재시작 시 DB에서 복원)."""
|
||
ts_str = self.db.get_kv(self.KV_LAST_SEEN_TS)
|
||
last_seen_ts = int(ts_str) if ts_str else int(time.time() * 1000)
|
||
self.db.set_kv(self.KV_LAST_SEEN_TS, str(last_seen_ts))
|
||
|
||
while self._running:
|
||
try:
|
||
time.sleep(self.poll_interval_sec)
|
||
if not self._running:
|
||
break
|
||
|
||
posts = self._fetch_posts()
|
||
bot_id = self._get_bot_user_id()
|
||
for p in posts:
|
||
create_at = int(p.get("create_at", 0))
|
||
if create_at <= last_seen_ts:
|
||
continue
|
||
last_seen_ts = max(last_seen_ts, create_at)
|
||
if bot_id and p.get("user_id") == bot_id:
|
||
continue
|
||
msg = (p.get("message") or "").strip()
|
||
reply = self._process_message(msg, p.get("id", ""))
|
||
if reply:
|
||
self._post_reply(reply, root_id=p.get("id"))
|
||
logger.info("MM 원격 명령 처리: %s -> %s", msg[:50], reply[:50])
|
||
|
||
self.db.set_kv(self.KV_LAST_SEEN_TS, str(last_seen_ts))
|
||
except Exception as e:
|
||
logger.warning("MM 폴링 예외: %s", e)
|
||
|
||
def start(self):
|
||
"""백그라운드 스레드로 폴링 시작.
|
||
|
||
mm_butler.py 가 동일 채널을 이미 처리 중이면 중복 응답을 막기 위해
|
||
mm_remote 리스너를 시작하지 않습니다.
|
||
mm_butler 가 없거나 다른 채널을 담당할 때만 활성화됩니다.
|
||
"""
|
||
if self._running:
|
||
return
|
||
if not self.bot_token or not self._load_channel_id():
|
||
logger.warning("MM 원격 조종: 토큰/채널 없음 - 리스너 미시작")
|
||
return
|
||
|
||
# mm_butler.py 와 동일 채널이면 중복 처리 방지 (mm_butler 가 우선)
|
||
try:
|
||
from kis_long_ver1 import get_env_from_db, MM_CONFIG_FILE
|
||
import json as _json
|
||
butler_alias = get_env_from_db("MM_BUTLER_CHANNEL", "default").strip() or "default"
|
||
if MM_CONFIG_FILE.exists():
|
||
with open(MM_CONFIG_FILE, "r", encoding="utf-8") as _f:
|
||
_cfg = _json.load(_f)
|
||
butler_cid = _cfg.get("channels", {}).get(butler_alias)
|
||
my_cid = self._load_channel_id()
|
||
if butler_cid and my_cid and butler_cid == my_cid:
|
||
logger.info(
|
||
"ℹ️ mm_remote: mm_butler.py 가 동일 채널(%s) 담당 중 → mm_remote 리스너 비활성 (중복 응답 방지)",
|
||
butler_alias,
|
||
)
|
||
return
|
||
except Exception:
|
||
pass # 확인 실패 시 기존대로 시작
|
||
|
||
self._running = True
|
||
self._thread = threading.Thread(target=self._poll_loop, daemon=True)
|
||
self._thread.start()
|
||
logger.info("MM 원격 조종 리스너 시작 (채널=%s, !적용 / !설정)", self.channel_alias)
|
||
|
||
def stop(self):
|
||
"""폴링 중지."""
|
||
self._running = False
|
||
if self._thread:
|
||
self._thread.join(timeout=self.poll_interval_sec * 2)
|
||
self._thread = None
|