Files
kis_bot/kiwoom_universe_scanner.py
2026-03-17 12:33:30 +09:00

679 lines
29 KiB
Python

# kiwoom_universe_scanner.py
# 키움 REST API로 개미털기 유니버스 스캔 → MariaDB kis_quant_db 저장
# 매매 없음, 스캔 전용 단독 실행 파일
import time
import json
import datetime
import os
import logging
import requests
import random
from dotenv import load_dotenv
import sys
current_dir = os.path.dirname(os.path.abspath(__file__))
# ── 키움 REST API 인증 .env 로드 ─────────────────────────────────────
# kiwoom_rest_api/.env 에 KIWOOM_API_KEY / KIWOOM_API_SECRET / KIWOOM_USE_SANDBOX 저장
# (API 키는 보안상 .env 파일에만 보관, DB 저장 안 함)
_env_path = os.path.join(current_dir, "kiwoom_rest_api", ".env")
if not os.path.exists(_env_path):
_env_path = os.path.join(current_dir, ".env")
load_dotenv(_env_path)
# ── 로거 ──────────────────────────────────────────────
logging.basicConfig(
format='%(asctime)s %(message)s',
datefmt='%H:%M:%S',
level=logging.INFO
)
logger = logging.getLogger('UniverseScanner')
logging.getLogger('urllib3').setLevel(logging.WARNING)
logging.getLogger('requests').setLevel(logging.WARNING)
# ── MariaDB 연동: 봇들과 동일한 DB에서 설정값 읽기 ─────────────────────
# get_env_int/float("KEY", default) → DB env_config 최신 행 우선, 없으면 default
try:
from database import TradeDB as _TradeDB
_db = _TradeDB()
_env_snap = (_db.get_latest_env() or {}).get("snapshot", {})
logger.info("✅ MariaDB 설정 연동 완료 (env_config 최신 %d키 로드)",
sum(1 for v in _env_snap.values() if v))
def _get_str(key: str, default: str = "") -> str:
v = _env_snap.get(key, "")
return str(v).split("#")[0].strip() if v else default
def _get_int(key: str, default: int) -> int:
try:
return int(_get_str(key, str(default))) or default
except (ValueError, TypeError):
return default
def _get_float(key: str, default: float) -> float:
try:
return float(_get_str(key, str(default))) or default
except (ValueError, TypeError):
return default
except Exception as _db_e:
logger.warning("⚠️ MariaDB 연동 실패 → os.environ fallback: %s", _db_e)
_env_snap = {}
def _get_str(key: str, default: str = "") -> str:
return os.environ.get(key, default)
def _get_int(key: str, default: int) -> int:
try:
return int(os.environ.get(key, str(default)))
except (ValueError, TypeError):
return default
def _get_float(key: str, default: float) -> float:
try:
return float(os.environ.get(key, str(default)))
except (ValueError, TypeError):
return default
# ── 설정값 (DB 우선, fallback: 기본값) ───────────────────────────────
MM_SERVER_URL = _get_str("MM_SERVER_URL", "https://mattermost.hoonfam.org")
MM_BOT_TOKEN = _get_str("MM_BOT_TOKEN_", "").strip() # DB 키는 MM_BOT_TOKEN_ (언더스코어)
MM_CONFIG_FILE = os.path.join(current_dir, 'mm_config.json')
SCAN_INTERVAL = _get_int("SCAN_INTERVAL_SEC", 300) # 스캔 주기 (초), 기본 5분
# 유니버스 저장 필터
TOP_N = _get_int("UPDATE_UNIVERSE_TOP_N", 20) # 저장 상위 N개
MIN_SCORE = _get_float("UPDATE_UNIVERSE_MIN_SCORE", 4.0) # 최소 강도 점수
# ── 종목 단가 상한 (리스크 파라미터 자동 파생) ───────────────────────────
# 원리: 투자금 = MAX_LOSS ÷ |STOP_PCT| → qty = floor(투자금/price)
# qty ≥ 1 이 되려면: price ≤ 투자금 = MAX_LOSS ÷ STOP_PCT
# 봇과 동일한 DB값 사용 → 봇이 실제로 살 수 없는 종목은 유니버스에 애초에 넣지 않음
_max_loss_krw = _get_int("MAX_LOSS_PER_TRADE_KRW", 200000)
_stop_loss_pct = abs(_get_float("STOP_LOSS_PCT", 0.03)) # 음수 저장 방지
_stop_loss_pct = _stop_loss_pct if _stop_loss_pct > 0 else 0.03
MAX_STOCK_PRICE_KRW = int(_max_loss_krw / _stop_loss_pct)
# ── Kiwoom API import ──────────────────────────────────
try:
from kiwoom_rest_api.auth.token import TokenManager
from kiwoom_rest_api.koreanstock.stockinfo import StockInfo
from kiwoom_rest_api.koreanstock.chart import Chart
from kiwoom_rest_api.koreanstock.order import Order
from kiwoom_rest_api.koreanstock.rank_info import RankInfo
from kiwoom_rest_api.koreanstock.account import Account
from kiwoom_rest_api.koreanstock.market_condition import MarketCondition
# 퀀트 트레이딩 필수 API 추가
from kiwoom_rest_api.koreanstock.sector import Sector
from kiwoom_rest_api.koreanstock.foreign_institution import ForeignInstitution
from kiwoom_rest_api.koreanstock.theme import Theme
from kiwoom_rest_api.koreanstock.etf import ETF
except ImportError as e:
logger.critical(f"❌ 키움 REST API 모듈 임포트 실패: {e}")
raise e
# ── Mattermost ─────────────────────────────────────────
class MattermostBot:
def __init__(self):
self.api_url = f'{MM_SERVER_URL.rstrip("/")}/api/v4/posts'
self.headers = {
'Authorization': f'Bearer {MM_BOT_TOKEN}',
'Content-Type': 'application/json'
}
self.channels = self._load_channels()
def _load_channels(self):
try:
if os.path.exists(MM_CONFIG_FILE):
with open(MM_CONFIG_FILE, 'r', encoding='utf-8') as f:
return json.load(f).get('channels', {})
except Exception as e:
logger.error(f'MM 채널 로드 오류: {e}')
return {}
def send(self, channel_alias, message):
channel_id = self.channels.get(channel_alias)
if not channel_id:
logger.warning(f'MM 채널 {channel_alias} ID 없음')
return False
try:
res = requests.post(
self.api_url,
headers=self.headers,
json={'channel_id': channel_id, 'message': message},
timeout=3
)
res.raise_for_status()
return True
except Exception as e:
logger.error(f'MM 전송 오류: {e}')
return False
# ── Broker API (스캔 전용) ─────────────────────────────
class BrokerAPI:
def __init__(self):
logger.info('키움 REST API 초기화 중...')
try:
self.token_manager = TokenManager()
# access_token 프로퍼티 호출로 최초 토큰 발급
_tok = self.token_manager.access_token
logger.info('키움 토큰 발급 완료 (len=%d)', len(_tok or ''))
self.stock_info = StockInfo(token_manager=self.token_manager)
self.chart = Chart(token_manager=self.token_manager)
self.order = Order(token_manager=self.token_manager)
self.rank = RankInfo(token_manager=self.token_manager)
self.account = Account(token_manager=self.token_manager)
self.market = MarketCondition(token_manager=self.token_manager)
self.sector = Sector(token_manager=self.token_manager)
self.foreign_inst = ForeignInstitution(token_manager=self.token_manager)
self.theme = Theme(token_manager=self.token_manager)
self.etf = ETF(token_manager=self.token_manager)
self.acc_no = os.environ.get('KIWOOM_ACCOUNT_NO', '')
logger.info(f'키움 API 초기화 완료 | 계좌: {self.acc_no}')
except Exception as e:
logger.critical(f'키움 API 초기화 실패: {e}')
raise
def _safe_request(self, func, *args, **kwargs):
"""
API 호출 안전장치
- return_code != '0' 이면 return_msg 로깅 후 재시도 여부 판단
- 429 / 초과 / 과부하 → 지수 백오프 재시도
- 인증 오류(8005) → 토큰 재발급 후 1회 재시도
"""
full_name = func.__name__
api_id = full_name.split('_')[-1]
max_retries = 3
for i in range(max_retries):
try:
time.sleep(0.5)
result = func(*args, **kwargs)
if isinstance(result, dict):
return_code = str(result.get('return_code', '0'))
return_msg = str(result.get('return_msg', ''))
if return_code == '0':
return result # ✅ 정상
# 토큰 만료(8005) → 재발급 후 1회 재시도
if '8005' in return_msg or 'Token' in return_msg:
logger.warning(f"⚠️ [{api_id}] 토큰 만료 감지 → 재발급 후 재시도")
self.token_manager._request_new_token()
time.sleep(1)
continue
# API 호출 초과 / 과부하
if '초과' in return_msg or '과부하' in return_msg or '429' in return_msg:
wait = (2 ** i) + random.uniform(0.5, 1.5)
logger.warning(f"⚠️ [{api_id}] 호출 제한 → {wait:.1f}초 대기 ({i+1}/{max_retries})")
time.sleep(wait)
continue
# 그 외 오류 → 로깅 후 빈 dict 반환 (재시도 불필요)
logger.warning(f"⚠️ [{api_id}] API 오류 (code={return_code}): {return_msg[:80]}")
return {}
return result
except Exception as e:
msg = str(e)
if ("429" in msg) or ("초과" in msg) or ("과부하" in msg):
wait = (2 ** i) + random.uniform(0.5, 1.5)
logger.warning(f"⚠️ [{api_id}] 예외 호출제한 → {wait:.1f}초 대기 ({i+1}/{max_retries})")
time.sleep(wait)
else:
logger.error(f"❌ [{api_id}] 예외: {e}")
return {}
logger.error(f"💀 [{api_id}] {max_retries}회 재시도 모두 실패")
return {}
def _is_valid_stock(self, name, code):
"""종목 필터링 (스팩, ETN, 우선주 등 제외)"""
if len(code) != 6 or not code.isdigit():
return False
exclude = ['스팩', 'ETN', 'W', 'ELW', '채권', '레버리지', '인버스',
'곱버스', '선물', '', '', '2X', '3X', '합성', 'H', 'B']
if any(k in name for k in exclude):
return False
if name.endswith('') or name.endswith('우B'):
return False
return True
def scan_ant_shaking_candidates(self):
"""
[조건검색 대체 로직] 개미털기(눌림목) 후보 종목 스캔
스캔 순서:
1. 거래대금/회전율 상위 종목 수집
2. [1차 컷] 가격 상한 필터 (MAX_STOCK_PRICE_KRW = MAX_LOSS ÷ STOP_PCT)
→ 이 이상 가격 종목은 1주라도 사면 손절 시 MAX_LOSS 초과 → 제외
3. [2차 계산] 상세 OHLCV → 낙폭·회복률·강도 점수 산정
→ 가격컷을 강도계산 이전에 수행해 불필요한 API 호출 최소화
"""
logger.info(f"🐜 [개미털기] 스캔 시작 (단가상한 {MAX_STOCK_PRICE_KRW:,}원 = 손실한도{_max_loss_krw:,}÷손절{_stop_loss_pct*100:.1f}%)")
logger.info(f" 📡 수집 방식: 거래대금 + 회전율 (2가지 소스)")
raw_codes_set = set()
scan_strategies = [("3", "거래대금"), ("2", "회전율")]
for sort_tp, desc in scan_strategies:
logger.info(f" 🔍 [{desc}] 상위 종목 조회 중...")
try:
res = self._safe_request(
self.rank.top_trading_volume_today_request_ka10030,
mrkt_tp="000",
sort_tp=sort_tp,
mang_stk_incls="3",
pric_tp="0",
crd_tp="0",
trde_qty_tp="0",
trde_prica_tp="0",
mrkt_open_tp="0",
stex_tp="3",
cont_yn="N"
)
if not res or 'tdy_trde_qty_upper' not in res:
logger.warning(f" ⚠️ [{desc}] 응답 없음")
continue
logger.info(f" ✅ [{desc}] {len(res['tdy_trde_qty_upper'])}개 수신")
for stock in res['tdy_trde_qty_upper']:
code = stock['stk_cd'].split('_')[0]
try:
price = abs(float(stock['cur_prc']))
except:
continue
# open_pric 없는 경우 현재가로 대체 (모의 API 일부 누락)
raw_open = stock.get('open_pric') or stock.get('opmr_pred_rt') or 0
try:
open_price = abs(float(raw_open)) if raw_open else price
except (ValueError, TypeError):
open_price = price
change_rate = ((price - open_price) / open_price * 100) if open_price > 0 else 0
if price < 1000: # 동전주 제외
continue
if change_rate > 20: # 상한가 근처 제외
continue
# [1차 컷] 단가 상한: price > MAX_LOSS ÷ STOP_PCT 이면 qty=0이 되어 매수 불가
# → 강도 계산(2차) 이전에 먼저 제거해 불필요한 API 호출 방지
if price > MAX_STOCK_PRICE_KRW:
continue
name = stock['stk_nm']
if self._is_valid_stock(name, code):
raw_codes_set.add(code)
except Exception as e:
logger.error(f"스캔({desc}) 중 에러: {e}")
raw_codes = list(raw_codes_set)
logger.info(f" 📥 후보 수집: {len(raw_codes)}개 -> 정밀 분석")
final_list = []
chunk_size = 50
try:
for i in range(0, len(raw_codes), chunk_size):
chunk = raw_codes[i:i + chunk_size]
code_str = '|'.join(chunk)
res = self._safe_request(
self.stock_info.watchlist_stock_information_request_ka10095,
stock_code=code_str
)
if res and 'atn_stk_infr' in res:
for item in res['atn_stk_infr']:
code = item['stk_cd'].strip()
if code.startswith('A'):
code = code[1:]
try:
op = abs(float(item.get('open_pric', 0)))
hi = abs(float(item.get('high_pric', 0)))
lo = abs(float(item.get('low_pric', 0)))
cl = abs(float(item.get('cur_prc', 0)))
if op == 0:
continue
# 가격 상한은 1차 수집 단계에서 이미 적용됨 → 중복 필터 불필요
drop_rate = (op - lo) / op
total_range = hi - lo
recovery_pos = (cl - lo) / total_range if total_range > 0 else 0
if total_range > 0 and drop_rate > 0.03:
if recovery_pos > 0.5:
score = drop_rate * 100
logger.info(
f" 💎 {item['stk_nm'].strip()} {code}: "
f"낙폭 {drop_rate*100:.1f}% | 회복 {recovery_pos*100:.0f}% | 점수 {score:.2f}"
)
final_list.append({
'code': code,
'name': item['stk_nm'].strip(),
'price': cl,
'score': score
})
except Exception as e:
logger.debug(f"종목 분석 오류({code}): {e}")
continue
time.sleep(0.5)
final_list.sort(key=lambda x: x['score'], reverse=True)
return final_list
except Exception as e:
logger.error(f"분석 중 치명적 에러: {e}")
return []
def get_foreign_consecutive_buy(self, consecutive_days=3, market="001", limit=20):
try:
res = self._safe_request(
self.rank.top_foreign_consecutive_net_buy_request_ka10035,
mrkt_tp=market,
trde_tp="2",
base_dt_tp="1",
stex_tp="1"
)
stocks = res.get('for_cont_nettrde_upper', []) if res else []
return stocks[:limit]
except Exception as e:
logger.error(f"외국인 연속 순매수 조회 실패: {e}")
return []
def get_institutional_buy_stocks(self, market="001", limit=20):
try:
from datetime import datetime as dt
today = dt.now().strftime("%Y%m%d")
res = self._safe_request(
self.rank.same_day_net_buying_ranking_request_ka10062,
strt_dt=today,
mrkt_tp=market,
trde_tp="1",
sort_cnd="1",
unit_tp="1",
stex_tp="1"
)
stocks = res.get('eql_nettrde_rank', []) if res else []
return [s for s in stocks if float(s.get('orgn_nettrde_qty', 0)) > 0][:limit]
except Exception as e:
logger.error(f"기관 순매수 조회 실패: {e}")
return []
def get_volume_surge_stocks(self, market="001", min_volume="50", limit=20):
try:
res = self._safe_request(
self.rank.sudden_increase_trading_volume_request_ka10023,
mrkt_tp=market,
sort_tp="1",
tm_tp="2",
trde_qty_tp=min_volume,
stk_cnd="1",
pric_tp="0",
stex_tp="1"
)
stocks = res.get('trde_qty_sdnin', []) if res else []
return stocks[:limit]
except Exception as e:
logger.error(f"거래량 급증 조회 실패: {e}")
return []
def get_top_price_movers(self, market="001", sort_type="1", limit=20):
try:
res = self._safe_request(
self.rank.top_day_over_day_change_rate_request_ka10027,
mrkt_tp=market,
sort_tp=sort_type,
trde_qty_cnd="0000",
stk_cnd="1",
crd_cnd="0",
updown_incls="1",
pric_cnd="0",
trde_prica_cnd="0",
stex_tp="1"
)
stocks = res.get('pred_pre_flu_rt_upper', []) if res else []
return stocks[:limit]
except Exception as e:
logger.error(f"등락률 조회 실패: {e}")
return []
def check_market_status(self):
now = datetime.datetime.now()
if not (datetime.time(8, 30) <= now.time() <= datetime.time(16, 0)):
return False
if now.weekday() >= 5:
return False
return True
# ── MariaDB 저장 ────────────────────────────────────────
def save_to_kis_db(candidates: list, db_path: str = None):
"""
target_candidates 테이블에 저장 (MariaDB kis_quant_db).
db_path 인수는 하위 호환용으로 유지하되 무시됩니다.
TradeDB.update_target_candidates() 를 사용합니다.
"""
try:
from database import TradeDB
db = TradeDB()
ok = db.update_target_candidates(candidates)
if ok:
logger.info(f'💾 MariaDB 저장 완료: {len(candidates)}종목 → kis_quant_db.target_candidates')
return ok
except Exception as e:
logger.error(f'MariaDB 저장 실패: {e}')
return False
# ── 유니버스 업데이트 ──────────────────────────────────
def update_universe(api: BrokerAPI, mm: MattermostBot):
logger.info('=' * 50)
logger.info(f'🔄 유니버스 업데이트 시작 | {datetime.datetime.now().strftime("%H:%M:%S")}')
all_candidates = {} # code → {code, name, price, basescore, bonusscore, fromant}
# 1. 개미털기 스캔 (핵심)
# 가격 상한(MAX_STOCK_PRICE_KRW)은 scan_ant_shaking_candidates 내부에서 1차로 적용됨
try:
antshaking = api.scan_ant_shaking_candidates()
for item in antshaking:
code = item['code']
all_candidates[code] = {
'code': code,
'name': item['name'],
'price': item['price'],
'basescore': item['score'],
'bonusscore': 0.0,
'fromant': True
}
logger.info(f'🐜 개미털기 후보: {len(antshaking)}종목')
except Exception as e:
logger.warning(f'개미털기 스캔 오류: {e}')
# 2. 외국인 연속 매수 보너스 (0.5점)
try:
foreign_buy = api.get_foreign_consecutive_buy(consecutive_days=2, limit=30)
logger.info(f'🌍 외국인 연속매수: {len(foreign_buy)}')
for idx, item in enumerate(foreign_buy):
code = item.get('stk_cd', '').strip()
if len(code) != 6:
continue
bonus = (30 - idx) / 30.0 * 0.5
if code in all_candidates:
all_candidates[code]['bonusscore'] += bonus
else:
all_candidates[code] = {
'code': code, 'name': item.get('stk_nm', code),
'price': 0, 'basescore': 3.0, 'bonusscore': bonus, 'fromant': False
}
except Exception as e:
logger.warning(f'외국인 연속매수 오류: {e}')
# 3. 거래량 급증 보너스 (0.3점)
try:
vol_surge = api.get_volume_surge_stocks(min_volume="50", limit=30)
logger.info(f'📈 거래량 급증: {len(vol_surge)}')
for idx, item in enumerate(vol_surge):
code = item.get('stk_cd', '').split('_')[0].strip()
if len(code) != 6:
continue
bonus = (30 - idx) / 30.0 * 0.3
price = abs(float(item.get('cur_prc') or item.get('prpr') or 0))
if code in all_candidates:
all_candidates[code]['bonusscore'] += bonus
else:
all_candidates[code] = {
'code': code, 'name': item.get('stk_nm', code),
'price': price,
'basescore': 2.5, 'bonusscore': bonus, 'fromant': False
}
except Exception as e:
logger.warning(f'거래량 급증 오류: {e}')
# 4. 기관 매수 보너스 (0.3점)
try:
inst_buy = api.get_institutional_buy_stocks(limit=30)
logger.info(f'🏦 기관 매수: {len(inst_buy)}')
for idx, item in enumerate(inst_buy):
code = item.get('stk_cd', '').strip()
if len(code) != 6:
continue
bonus = (30 - idx) / 30.0 * 0.3
price = abs(float(item.get('cur_prc') or item.get('prpr') or 0))
if code in all_candidates:
all_candidates[code]['bonusscore'] += bonus
else:
all_candidates[code] = {
'code': code, 'name': item.get('stk_nm', code),
'price': price,
'basescore': 2.5, 'bonusscore': bonus, 'fromant': False
}
except Exception as e:
logger.warning(f'기관 매수 오류: {e}')
# 5. 등락률 상위 보너스 (0.2점)
try:
price_movers = api.get_top_price_movers(sort_type='1', limit=30)
logger.info(f'🚀 등락률 상위: {len(price_movers)}')
for idx, item in enumerate(price_movers):
code = item.get('stk_cd', '').strip()
if len(code) != 6:
continue
bonus = (30 - idx) / 30.0 * 0.2
price = abs(float(item.get('cur_prc') or item.get('prpr') or 0))
if code in all_candidates:
all_candidates[code]['bonusscore'] += bonus
else:
all_candidates[code] = {
'code': code, 'name': item.get('stk_nm', code),
'price': price,
'basescore': 2.0, 'bonusscore': bonus, 'fromant': False
}
except Exception as e:
logger.warning(f'등락률 순위 오류: {e}')
# 6. 최종 후보 필터 (MIN_SCORE 이상만)
now_str = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
final_candidates = []
ant_count = 0
bonus_count = 0
for code, data in all_candidates.items():
total_score = data['basescore'] + data['bonusscore']
if total_score >= MIN_SCORE:
final_candidates.append({
'code': code,
'name': data['name'],
'score': round(total_score, 2),
'price': data['price'],
'scan_time': now_str
})
if data['fromant']:
ant_count += 1
else:
bonus_count += 1
final_candidates.sort(key=lambda x: x['score'], reverse=True)
final_candidates = final_candidates[:TOP_N]
top5 = [f"{x['name']}({x['score']:.1f})" for x in final_candidates[:5]]
logger.info(f'✅ 최종 후보: {len(final_candidates)}종목 (개미털기:{ant_count} 보너스:{bonus_count})')
logger.info(f'🔝 Top5: {", ".join(top5)}')
# 7. MariaDB 저장
if final_candidates:
save_to_kis_db(final_candidates)
else:
logger.warning('⚠️ 최종 후보 0개 → DB 저장 생략')
# 8. Mattermost 알림
try:
msg = (
f'🔄 유니버스 업데이트 완료\n'
f'- 후보: {len(final_candidates)}종목 (개미털기:{ant_count} 보너스:{bonus_count})\n'
f'- Top5: {", ".join(top5)}'
)
mm.send('stock', msg)
except Exception as e:
logger.debug(f'MM 알림 오류: {e}')
logger.info('=' * 50)
# ── 메인 루프 ──────────────────────────────────────────
def main():
logger.info('🚀 키움 유니버스 스캐너 시작')
logger.info(f' DB : MariaDB kis_quant_db (192.168.0.141)')
logger.info(f' 스캔 주기 : {SCAN_INTERVAL}')
logger.info(f' 최소 점수 : {MIN_SCORE}')
logger.info(f' 저장 Top N : {TOP_N}')
api = BrokerAPI()
mm = MattermostBot()
is_first_run = True
while True:
try:
is_open = api.check_market_status()
now = datetime.datetime.now()
# 장 중이거나 첫 실행이면 스캔
if is_open or is_first_run:
update_universe(api, mm)
is_first_run = False
else:
logger.info(f'⏸ 장 외 시간 ({now.strftime("%H:%M:%S")}) → 스캔 대기')
# 다음 5분 경계까지 대기
next_min = (now.minute // 5 + 1) * 5
if next_min >= 60:
next_time = now.replace(hour=now.hour + 1, minute=0, second=5, microsecond=0)
else:
next_time = now.replace(minute=next_min, second=5, microsecond=0)
wait_sec = max(5, (next_time - now).total_seconds())
logger.info(f'⏳ 다음 스캔: {next_time.strftime("%H:%M:%S")} ({wait_sec:.0f}초 후)')
time.sleep(wait_sec)
except KeyboardInterrupt:
logger.info('⛔ 종료 요청')
break
except Exception as e:
logger.error(f'메인 루프 오류: {e}')
time.sleep(60)
if __name__ == '__main__':
main()