# kiwoom_universe_scanner.py # 키움 REST API로 개미털기 유니버스 스캔 → MariaDB kis_quant_db 저장 # 매매 없음, 스캔 전용 단독 실행 파일 import time import json import datetime import os import logging import requests import random from dotenv import load_dotenv import sys current_dir = os.path.dirname(os.path.abspath(__file__)) # ── 키움 REST API 인증 .env 로드 ───────────────────────────────────── # kiwoom_rest_api/.env 에 KIWOOM_API_KEY / KIWOOM_API_SECRET / KIWOOM_USE_SANDBOX 저장 # (API 키는 보안상 .env 파일에만 보관, DB 저장 안 함) _env_path = os.path.join(current_dir, "kiwoom_rest_api", ".env") if not os.path.exists(_env_path): _env_path = os.path.join(current_dir, ".env") load_dotenv(_env_path) # ── 로거 ────────────────────────────────────────────── logging.basicConfig( format='%(asctime)s %(message)s', datefmt='%H:%M:%S', level=logging.INFO ) logger = logging.getLogger('UniverseScanner') logging.getLogger('urllib3').setLevel(logging.WARNING) logging.getLogger('requests').setLevel(logging.WARNING) # ── MariaDB 연동: 봇들과 동일한 DB에서 설정값 읽기 ───────────────────── # get_env_int/float("KEY", default) → DB env_config 최신 행 우선, 없으면 default try: from database import TradeDB as _TradeDB _db = _TradeDB() _env_snap = (_db.get_latest_env() or {}).get("snapshot", {}) logger.info("✅ MariaDB 설정 연동 완료 (env_config 최신 %d키 로드)", sum(1 for v in _env_snap.values() if v)) def _get_str(key: str, default: str = "") -> str: v = _env_snap.get(key, "") return str(v).split("#")[0].strip() if v else default def _get_int(key: str, default: int) -> int: try: return int(_get_str(key, str(default))) or default except (ValueError, TypeError): return default def _get_float(key: str, default: float) -> float: try: return float(_get_str(key, str(default))) or default except (ValueError, TypeError): return default except Exception as _db_e: logger.warning("⚠️ MariaDB 연동 실패 → os.environ fallback: %s", _db_e) _env_snap = {} def _get_str(key: str, default: str = "") -> str: return os.environ.get(key, default) def _get_int(key: str, default: int) -> int: try: return int(os.environ.get(key, str(default))) except (ValueError, TypeError): return default def _get_float(key: str, default: float) -> float: try: return float(os.environ.get(key, str(default))) except (ValueError, TypeError): return default # ── 설정값 (DB 우선, fallback: 기본값) ─────────────────────────────── MM_SERVER_URL = _get_str("MM_SERVER_URL", "https://mattermost.hoonfam.org") MM_BOT_TOKEN = _get_str("MM_BOT_TOKEN_", "").strip() # DB 키는 MM_BOT_TOKEN_ (언더스코어) MM_CONFIG_FILE = os.path.join(current_dir, 'mm_config.json') SCAN_INTERVAL = _get_int("SCAN_INTERVAL_SEC", 300) # 스캔 주기 (초), 기본 5분 # 유니버스 저장 필터 TOP_N = _get_int("UPDATE_UNIVERSE_TOP_N", 20) # 저장 상위 N개 MIN_SCORE = _get_float("UPDATE_UNIVERSE_MIN_SCORE", 4.0) # 최소 강도 점수 # ── 종목 단가 상한 (리스크 파라미터 자동 파생) ─────────────────────────── # 원리: 투자금 = MAX_LOSS ÷ |STOP_PCT| → qty = floor(투자금/price) # qty ≥ 1 이 되려면: price ≤ 투자금 = MAX_LOSS ÷ STOP_PCT # 봇과 동일한 DB값 사용 → 봇이 실제로 살 수 없는 종목은 유니버스에 애초에 넣지 않음 _max_loss_krw = _get_int("MAX_LOSS_PER_TRADE_KRW", 200000) _stop_loss_pct = abs(_get_float("STOP_LOSS_PCT", 0.03)) # 음수 저장 방지 _stop_loss_pct = _stop_loss_pct if _stop_loss_pct > 0 else 0.03 MAX_STOCK_PRICE_KRW = int(_max_loss_krw / _stop_loss_pct) # ── Kiwoom API import ────────────────────────────────── try: from kiwoom_rest_api.auth.token import TokenManager from kiwoom_rest_api.koreanstock.stockinfo import StockInfo from kiwoom_rest_api.koreanstock.chart import Chart from kiwoom_rest_api.koreanstock.order import Order from kiwoom_rest_api.koreanstock.rank_info import RankInfo from kiwoom_rest_api.koreanstock.account import Account from kiwoom_rest_api.koreanstock.market_condition import MarketCondition # 퀀트 트레이딩 필수 API 추가 from kiwoom_rest_api.koreanstock.sector import Sector from kiwoom_rest_api.koreanstock.foreign_institution import ForeignInstitution from kiwoom_rest_api.koreanstock.theme import Theme from kiwoom_rest_api.koreanstock.etf import ETF except ImportError as e: logger.critical(f"❌ 키움 REST API 모듈 임포트 실패: {e}") raise e # ── Mattermost ───────────────────────────────────────── class MattermostBot: def __init__(self): self.api_url = f'{MM_SERVER_URL.rstrip("/")}/api/v4/posts' self.headers = { 'Authorization': f'Bearer {MM_BOT_TOKEN}', 'Content-Type': 'application/json' } self.channels = self._load_channels() def _load_channels(self): try: if os.path.exists(MM_CONFIG_FILE): with open(MM_CONFIG_FILE, 'r', encoding='utf-8') as f: return json.load(f).get('channels', {}) except Exception as e: logger.error(f'MM 채널 로드 오류: {e}') return {} def send(self, channel_alias, message): channel_id = self.channels.get(channel_alias) if not channel_id: logger.warning(f'MM 채널 {channel_alias} ID 없음') return False try: res = requests.post( self.api_url, headers=self.headers, json={'channel_id': channel_id, 'message': message}, timeout=3 ) res.raise_for_status() return True except Exception as e: logger.error(f'MM 전송 오류: {e}') return False # ── Broker API (스캔 전용) ───────────────────────────── class BrokerAPI: def __init__(self): logger.info('키움 REST API 초기화 중...') try: self.token_manager = TokenManager() # access_token 프로퍼티 호출로 최초 토큰 발급 _tok = self.token_manager.access_token logger.info('키움 토큰 발급 완료 (len=%d)', len(_tok or '')) self.stock_info = StockInfo(token_manager=self.token_manager) self.chart = Chart(token_manager=self.token_manager) self.order = Order(token_manager=self.token_manager) self.rank = RankInfo(token_manager=self.token_manager) self.account = Account(token_manager=self.token_manager) self.market = MarketCondition(token_manager=self.token_manager) self.sector = Sector(token_manager=self.token_manager) self.foreign_inst = ForeignInstitution(token_manager=self.token_manager) self.theme = Theme(token_manager=self.token_manager) self.etf = ETF(token_manager=self.token_manager) self.acc_no = os.environ.get('KIWOOM_ACCOUNT_NO', '') logger.info(f'키움 API 초기화 완료 | 계좌: {self.acc_no}') except Exception as e: logger.critical(f'키움 API 초기화 실패: {e}') raise def _safe_request(self, func, *args, **kwargs): """ API 호출 안전장치 - return_code != '0' 이면 return_msg 로깅 후 재시도 여부 판단 - 429 / 초과 / 과부하 → 지수 백오프 재시도 - 인증 오류(8005) → 토큰 재발급 후 1회 재시도 """ full_name = func.__name__ api_id = full_name.split('_')[-1] max_retries = 3 for i in range(max_retries): try: time.sleep(0.5) result = func(*args, **kwargs) if isinstance(result, dict): return_code = str(result.get('return_code', '0')) return_msg = str(result.get('return_msg', '')) if return_code == '0': return result # ✅ 정상 # 토큰 만료(8005) → 재발급 후 1회 재시도 if '8005' in return_msg or 'Token' in return_msg: logger.warning(f"⚠️ [{api_id}] 토큰 만료 감지 → 재발급 후 재시도") self.token_manager._request_new_token() time.sleep(1) continue # API 호출 초과 / 과부하 if '초과' in return_msg or '과부하' in return_msg or '429' in return_msg: wait = (2 ** i) + random.uniform(0.5, 1.5) logger.warning(f"⚠️ [{api_id}] 호출 제한 → {wait:.1f}초 대기 ({i+1}/{max_retries})") time.sleep(wait) continue # 그 외 오류 → 로깅 후 빈 dict 반환 (재시도 불필요) logger.warning(f"⚠️ [{api_id}] API 오류 (code={return_code}): {return_msg[:80]}") return {} return result except Exception as e: msg = str(e) if ("429" in msg) or ("초과" in msg) or ("과부하" in msg): wait = (2 ** i) + random.uniform(0.5, 1.5) logger.warning(f"⚠️ [{api_id}] 예외 호출제한 → {wait:.1f}초 대기 ({i+1}/{max_retries})") time.sleep(wait) else: logger.error(f"❌ [{api_id}] 예외: {e}") return {} logger.error(f"💀 [{api_id}] {max_retries}회 재시도 모두 실패") return {} def _is_valid_stock(self, name, code): """종목 필터링 (스팩, ETN, 우선주 등 제외)""" if len(code) != 6 or not code.isdigit(): return False exclude = ['스팩', 'ETN', 'W', 'ELW', '채권', '레버리지', '인버스', '곱버스', '선물', '콜', '풋', '2X', '3X', '합성', 'H', 'B'] if any(k in name for k in exclude): return False if name.endswith('우') or name.endswith('우B'): return False return True def scan_ant_shaking_candidates(self): """ [조건검색 대체 로직] 개미털기(눌림목) 후보 종목 스캔 스캔 순서: 1. 거래대금/회전율 상위 종목 수집 2. [1차 컷] 가격 상한 필터 (MAX_STOCK_PRICE_KRW = MAX_LOSS ÷ STOP_PCT) → 이 이상 가격 종목은 1주라도 사면 손절 시 MAX_LOSS 초과 → 제외 3. [2차 계산] 상세 OHLCV → 낙폭·회복률·강도 점수 산정 → 가격컷을 강도계산 이전에 수행해 불필요한 API 호출 최소화 """ logger.info(f"🐜 [개미털기] 스캔 시작 (단가상한 {MAX_STOCK_PRICE_KRW:,}원 = 손실한도{_max_loss_krw:,}÷손절{_stop_loss_pct*100:.1f}%)") logger.info(f" 📡 수집 방식: 거래대금 + 회전율 (2가지 소스)") raw_codes_set = set() scan_strategies = [("3", "거래대금"), ("2", "회전율")] for sort_tp, desc in scan_strategies: logger.info(f" 🔍 [{desc}] 상위 종목 조회 중...") try: res = self._safe_request( self.rank.top_trading_volume_today_request_ka10030, mrkt_tp="000", sort_tp=sort_tp, mang_stk_incls="3", pric_tp="0", crd_tp="0", trde_qty_tp="0", trde_prica_tp="0", mrkt_open_tp="0", stex_tp="3", cont_yn="N" ) if not res or 'tdy_trde_qty_upper' not in res: logger.warning(f" ⚠️ [{desc}] 응답 없음") continue logger.info(f" ✅ [{desc}] {len(res['tdy_trde_qty_upper'])}개 수신") for stock in res['tdy_trde_qty_upper']: code = stock['stk_cd'].split('_')[0] try: price = abs(float(stock['cur_prc'])) except: continue # open_pric 없는 경우 현재가로 대체 (모의 API 일부 누락) raw_open = stock.get('open_pric') or stock.get('opmr_pred_rt') or 0 try: open_price = abs(float(raw_open)) if raw_open else price except (ValueError, TypeError): open_price = price change_rate = ((price - open_price) / open_price * 100) if open_price > 0 else 0 if price < 1000: # 동전주 제외 continue if change_rate > 20: # 상한가 근처 제외 continue # [1차 컷] 단가 상한: price > MAX_LOSS ÷ STOP_PCT 이면 qty=0이 되어 매수 불가 # → 강도 계산(2차) 이전에 먼저 제거해 불필요한 API 호출 방지 if price > MAX_STOCK_PRICE_KRW: continue name = stock['stk_nm'] if self._is_valid_stock(name, code): raw_codes_set.add(code) except Exception as e: logger.error(f"스캔({desc}) 중 에러: {e}") raw_codes = list(raw_codes_set) logger.info(f" 📥 후보 수집: {len(raw_codes)}개 -> 정밀 분석") final_list = [] chunk_size = 50 try: for i in range(0, len(raw_codes), chunk_size): chunk = raw_codes[i:i + chunk_size] code_str = '|'.join(chunk) res = self._safe_request( self.stock_info.watchlist_stock_information_request_ka10095, stock_code=code_str ) if res and 'atn_stk_infr' in res: for item in res['atn_stk_infr']: code = item['stk_cd'].strip() if code.startswith('A'): code = code[1:] try: op = abs(float(item.get('open_pric', 0))) hi = abs(float(item.get('high_pric', 0))) lo = abs(float(item.get('low_pric', 0))) cl = abs(float(item.get('cur_prc', 0))) if op == 0: continue # 가격 상한은 1차 수집 단계에서 이미 적용됨 → 중복 필터 불필요 drop_rate = (op - lo) / op total_range = hi - lo recovery_pos = (cl - lo) / total_range if total_range > 0 else 0 if total_range > 0 and drop_rate > 0.03: if recovery_pos > 0.5: score = drop_rate * 100 logger.info( f" 💎 {item['stk_nm'].strip()} {code}: " f"낙폭 {drop_rate*100:.1f}% | 회복 {recovery_pos*100:.0f}% | 점수 {score:.2f}" ) final_list.append({ 'code': code, 'name': item['stk_nm'].strip(), 'price': cl, 'score': score }) except Exception as e: logger.debug(f"종목 분석 오류({code}): {e}") continue time.sleep(0.5) final_list.sort(key=lambda x: x['score'], reverse=True) return final_list except Exception as e: logger.error(f"분석 중 치명적 에러: {e}") return [] def get_foreign_consecutive_buy(self, consecutive_days=3, market="001", limit=20): try: res = self._safe_request( self.rank.top_foreign_consecutive_net_buy_request_ka10035, mrkt_tp=market, trde_tp="2", base_dt_tp="1", stex_tp="1" ) stocks = res.get('for_cont_nettrde_upper', []) if res else [] return stocks[:limit] except Exception as e: logger.error(f"외국인 연속 순매수 조회 실패: {e}") return [] def get_institutional_buy_stocks(self, market="001", limit=20): try: from datetime import datetime as dt today = dt.now().strftime("%Y%m%d") res = self._safe_request( self.rank.same_day_net_buying_ranking_request_ka10062, strt_dt=today, mrkt_tp=market, trde_tp="1", sort_cnd="1", unit_tp="1", stex_tp="1" ) stocks = res.get('eql_nettrde_rank', []) if res else [] return [s for s in stocks if float(s.get('orgn_nettrde_qty', 0)) > 0][:limit] except Exception as e: logger.error(f"기관 순매수 조회 실패: {e}") return [] def get_volume_surge_stocks(self, market="001", min_volume="50", limit=20): try: res = self._safe_request( self.rank.sudden_increase_trading_volume_request_ka10023, mrkt_tp=market, sort_tp="1", tm_tp="2", trde_qty_tp=min_volume, stk_cnd="1", pric_tp="0", stex_tp="1" ) stocks = res.get('trde_qty_sdnin', []) if res else [] return stocks[:limit] except Exception as e: logger.error(f"거래량 급증 조회 실패: {e}") return [] def get_top_price_movers(self, market="001", sort_type="1", limit=20): try: res = self._safe_request( self.rank.top_day_over_day_change_rate_request_ka10027, mrkt_tp=market, sort_tp=sort_type, trde_qty_cnd="0000", stk_cnd="1", crd_cnd="0", updown_incls="1", pric_cnd="0", trde_prica_cnd="0", stex_tp="1" ) stocks = res.get('pred_pre_flu_rt_upper', []) if res else [] return stocks[:limit] except Exception as e: logger.error(f"등락률 조회 실패: {e}") return [] def check_market_status(self): now = datetime.datetime.now() if not (datetime.time(8, 30) <= now.time() <= datetime.time(16, 0)): return False if now.weekday() >= 5: return False return True # ── MariaDB 저장 ──────────────────────────────────────── def save_to_kis_db(candidates: list, db_path: str = None): """ target_candidates 테이블에 저장 (MariaDB kis_quant_db). db_path 인수는 하위 호환용으로 유지하되 무시됩니다. TradeDB.update_target_candidates() 를 사용합니다. """ try: from database import TradeDB db = TradeDB() ok = db.update_target_candidates(candidates) if ok: logger.info(f'💾 MariaDB 저장 완료: {len(candidates)}종목 → kis_quant_db.target_candidates') return ok except Exception as e: logger.error(f'MariaDB 저장 실패: {e}') return False # ── 유니버스 업데이트 ────────────────────────────────── def update_universe(api: BrokerAPI, mm: MattermostBot): logger.info('=' * 50) logger.info(f'🔄 유니버스 업데이트 시작 | {datetime.datetime.now().strftime("%H:%M:%S")}') all_candidates = {} # code → {code, name, price, basescore, bonusscore, fromant} # 1. 개미털기 스캔 (핵심) # 가격 상한(MAX_STOCK_PRICE_KRW)은 scan_ant_shaking_candidates 내부에서 1차로 적용됨 try: antshaking = api.scan_ant_shaking_candidates() for item in antshaking: code = item['code'] all_candidates[code] = { 'code': code, 'name': item['name'], 'price': item['price'], 'basescore': item['score'], 'bonusscore': 0.0, 'fromant': True } logger.info(f'🐜 개미털기 후보: {len(antshaking)}종목') except Exception as e: logger.warning(f'개미털기 스캔 오류: {e}') # 2. 외국인 연속 매수 보너스 (0.5점) try: foreign_buy = api.get_foreign_consecutive_buy(consecutive_days=2, limit=30) logger.info(f'🌍 외국인 연속매수: {len(foreign_buy)}건') for idx, item in enumerate(foreign_buy): code = item.get('stk_cd', '').strip() if len(code) != 6: continue bonus = (30 - idx) / 30.0 * 0.5 if code in all_candidates: all_candidates[code]['bonusscore'] += bonus else: all_candidates[code] = { 'code': code, 'name': item.get('stk_nm', code), 'price': 0, 'basescore': 3.0, 'bonusscore': bonus, 'fromant': False } except Exception as e: logger.warning(f'외국인 연속매수 오류: {e}') # 3. 거래량 급증 보너스 (0.3점) try: vol_surge = api.get_volume_surge_stocks(min_volume="50", limit=30) logger.info(f'📈 거래량 급증: {len(vol_surge)}건') for idx, item in enumerate(vol_surge): code = item.get('stk_cd', '').split('_')[0].strip() if len(code) != 6: continue bonus = (30 - idx) / 30.0 * 0.3 price = abs(float(item.get('cur_prc') or item.get('prpr') or 0)) if code in all_candidates: all_candidates[code]['bonusscore'] += bonus else: all_candidates[code] = { 'code': code, 'name': item.get('stk_nm', code), 'price': price, 'basescore': 2.5, 'bonusscore': bonus, 'fromant': False } except Exception as e: logger.warning(f'거래량 급증 오류: {e}') # 4. 기관 매수 보너스 (0.3점) try: inst_buy = api.get_institutional_buy_stocks(limit=30) logger.info(f'🏦 기관 매수: {len(inst_buy)}건') for idx, item in enumerate(inst_buy): code = item.get('stk_cd', '').strip() if len(code) != 6: continue bonus = (30 - idx) / 30.0 * 0.3 price = abs(float(item.get('cur_prc') or item.get('prpr') or 0)) if code in all_candidates: all_candidates[code]['bonusscore'] += bonus else: all_candidates[code] = { 'code': code, 'name': item.get('stk_nm', code), 'price': price, 'basescore': 2.5, 'bonusscore': bonus, 'fromant': False } except Exception as e: logger.warning(f'기관 매수 오류: {e}') # 5. 등락률 상위 보너스 (0.2점) try: price_movers = api.get_top_price_movers(sort_type='1', limit=30) logger.info(f'🚀 등락률 상위: {len(price_movers)}건') for idx, item in enumerate(price_movers): code = item.get('stk_cd', '').strip() if len(code) != 6: continue bonus = (30 - idx) / 30.0 * 0.2 price = abs(float(item.get('cur_prc') or item.get('prpr') or 0)) if code in all_candidates: all_candidates[code]['bonusscore'] += bonus else: all_candidates[code] = { 'code': code, 'name': item.get('stk_nm', code), 'price': price, 'basescore': 2.0, 'bonusscore': bonus, 'fromant': False } except Exception as e: logger.warning(f'등락률 순위 오류: {e}') # 6. 최종 후보 필터 (MIN_SCORE 이상만) now_str = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S') final_candidates = [] ant_count = 0 bonus_count = 0 for code, data in all_candidates.items(): total_score = data['basescore'] + data['bonusscore'] if total_score >= MIN_SCORE: final_candidates.append({ 'code': code, 'name': data['name'], 'score': round(total_score, 2), 'price': data['price'], 'scan_time': now_str }) if data['fromant']: ant_count += 1 else: bonus_count += 1 final_candidates.sort(key=lambda x: x['score'], reverse=True) final_candidates = final_candidates[:TOP_N] top5 = [f"{x['name']}({x['score']:.1f})" for x in final_candidates[:5]] logger.info(f'✅ 최종 후보: {len(final_candidates)}종목 (개미털기:{ant_count} 보너스:{bonus_count})') logger.info(f'🔝 Top5: {", ".join(top5)}') # 7. MariaDB 저장 if final_candidates: save_to_kis_db(final_candidates) else: logger.warning('⚠️ 최종 후보 0개 → DB 저장 생략') # 8. Mattermost 알림 try: msg = ( f'🔄 유니버스 업데이트 완료\n' f'- 후보: {len(final_candidates)}종목 (개미털기:{ant_count} 보너스:{bonus_count})\n' f'- Top5: {", ".join(top5)}' ) mm.send('stock', msg) except Exception as e: logger.debug(f'MM 알림 오류: {e}') logger.info('=' * 50) # ── 메인 루프 ────────────────────────────────────────── def main(): logger.info('🚀 키움 유니버스 스캐너 시작') logger.info(f' DB : MariaDB kis_quant_db (192.168.0.141)') logger.info(f' 스캔 주기 : {SCAN_INTERVAL}초') logger.info(f' 최소 점수 : {MIN_SCORE}') logger.info(f' 저장 Top N : {TOP_N}') api = BrokerAPI() mm = MattermostBot() is_first_run = True while True: try: is_open = api.check_market_status() now = datetime.datetime.now() # 장 중이거나 첫 실행이면 스캔 if is_open or is_first_run: update_universe(api, mm) is_first_run = False else: logger.info(f'⏸ 장 외 시간 ({now.strftime("%H:%M:%S")}) → 스캔 대기') # 다음 5분 경계까지 대기 next_min = (now.minute // 5 + 1) * 5 if next_min >= 60: next_time = now.replace(hour=now.hour + 1, minute=0, second=5, microsecond=0) else: next_time = now.replace(minute=next_min, second=5, microsecond=0) wait_sec = max(5, (next_time - now).total_seconds()) logger.info(f'⏳ 다음 스캔: {next_time.strftime("%H:%M:%S")} ({wait_sec:.0f}초 후)') time.sleep(wait_sec) except KeyboardInterrupt: logger.info('⛔ 종료 요청') break except Exception as e: logger.error(f'메인 루프 오류: {e}') time.sleep(60) if __name__ == '__main__': main()