679 lines
29 KiB
Python
679 lines
29 KiB
Python
# kiwoom_universe_scanner.py
|
|
# 키움 REST API로 개미털기 유니버스 스캔 → MariaDB kis_quant_db 저장
|
|
# 매매 없음, 스캔 전용 단독 실행 파일
|
|
|
|
import time
|
|
import json
|
|
import datetime
|
|
import os
|
|
import logging
|
|
import requests
|
|
import random
|
|
from dotenv import load_dotenv
|
|
import sys
|
|
|
|
current_dir = os.path.dirname(os.path.abspath(__file__))
|
|
|
|
# ── 키움 REST API 인증 .env 로드 ─────────────────────────────────────
|
|
# kiwoom_rest_api/.env 에 KIWOOM_API_KEY / KIWOOM_API_SECRET / KIWOOM_USE_SANDBOX 저장
|
|
# (API 키는 보안상 .env 파일에만 보관, DB 저장 안 함)
|
|
_env_path = os.path.join(current_dir, "kiwoom_rest_api", ".env")
|
|
if not os.path.exists(_env_path):
|
|
_env_path = os.path.join(current_dir, ".env")
|
|
load_dotenv(_env_path)
|
|
|
|
# ── 로거 ──────────────────────────────────────────────
|
|
logging.basicConfig(
|
|
format='%(asctime)s %(message)s',
|
|
datefmt='%H:%M:%S',
|
|
level=logging.INFO
|
|
)
|
|
logger = logging.getLogger('UniverseScanner')
|
|
logging.getLogger('urllib3').setLevel(logging.WARNING)
|
|
logging.getLogger('requests').setLevel(logging.WARNING)
|
|
|
|
# ── MariaDB 연동: 봇들과 동일한 DB에서 설정값 읽기 ─────────────────────
|
|
# get_env_int/float("KEY", default) → DB env_config 최신 행 우선, 없으면 default
|
|
try:
|
|
from database import TradeDB as _TradeDB
|
|
_db = _TradeDB()
|
|
_env_snap = (_db.get_latest_env() or {}).get("snapshot", {})
|
|
logger.info("✅ MariaDB 설정 연동 완료 (env_config 최신 %d키 로드)",
|
|
sum(1 for v in _env_snap.values() if v))
|
|
|
|
def _get_str(key: str, default: str = "") -> str:
|
|
v = _env_snap.get(key, "")
|
|
return str(v).split("#")[0].strip() if v else default
|
|
|
|
def _get_int(key: str, default: int) -> int:
|
|
try:
|
|
return int(_get_str(key, str(default))) or default
|
|
except (ValueError, TypeError):
|
|
return default
|
|
|
|
def _get_float(key: str, default: float) -> float:
|
|
try:
|
|
return float(_get_str(key, str(default))) or default
|
|
except (ValueError, TypeError):
|
|
return default
|
|
|
|
except Exception as _db_e:
|
|
logger.warning("⚠️ MariaDB 연동 실패 → os.environ fallback: %s", _db_e)
|
|
_env_snap = {}
|
|
|
|
def _get_str(key: str, default: str = "") -> str:
|
|
return os.environ.get(key, default)
|
|
|
|
def _get_int(key: str, default: int) -> int:
|
|
try:
|
|
return int(os.environ.get(key, str(default)))
|
|
except (ValueError, TypeError):
|
|
return default
|
|
|
|
def _get_float(key: str, default: float) -> float:
|
|
try:
|
|
return float(os.environ.get(key, str(default)))
|
|
except (ValueError, TypeError):
|
|
return default
|
|
|
|
# ── 설정값 (DB 우선, fallback: 기본값) ───────────────────────────────
|
|
MM_SERVER_URL = _get_str("MM_SERVER_URL", "https://mattermost.hoonfam.org")
|
|
MM_BOT_TOKEN = _get_str("MM_BOT_TOKEN_", "").strip() # DB 키는 MM_BOT_TOKEN_ (언더스코어)
|
|
MM_CONFIG_FILE = os.path.join(current_dir, 'mm_config.json')
|
|
SCAN_INTERVAL = _get_int("SCAN_INTERVAL_SEC", 300) # 스캔 주기 (초), 기본 5분
|
|
|
|
# 유니버스 저장 필터
|
|
TOP_N = _get_int("UPDATE_UNIVERSE_TOP_N", 20) # 저장 상위 N개
|
|
MIN_SCORE = _get_float("UPDATE_UNIVERSE_MIN_SCORE", 4.0) # 최소 강도 점수
|
|
|
|
# ── 종목 단가 상한 (리스크 파라미터 자동 파생) ───────────────────────────
|
|
# 원리: 투자금 = MAX_LOSS ÷ |STOP_PCT| → qty = floor(투자금/price)
|
|
# qty ≥ 1 이 되려면: price ≤ 투자금 = MAX_LOSS ÷ STOP_PCT
|
|
# 봇과 동일한 DB값 사용 → 봇이 실제로 살 수 없는 종목은 유니버스에 애초에 넣지 않음
|
|
_max_loss_krw = _get_int("MAX_LOSS_PER_TRADE_KRW", 200000)
|
|
_stop_loss_pct = abs(_get_float("STOP_LOSS_PCT", 0.03)) # 음수 저장 방지
|
|
_stop_loss_pct = _stop_loss_pct if _stop_loss_pct > 0 else 0.03
|
|
MAX_STOCK_PRICE_KRW = int(_max_loss_krw / _stop_loss_pct)
|
|
|
|
# ── Kiwoom API import ──────────────────────────────────
|
|
try:
|
|
from kiwoom_rest_api.auth.token import TokenManager
|
|
from kiwoom_rest_api.koreanstock.stockinfo import StockInfo
|
|
from kiwoom_rest_api.koreanstock.chart import Chart
|
|
from kiwoom_rest_api.koreanstock.order import Order
|
|
from kiwoom_rest_api.koreanstock.rank_info import RankInfo
|
|
from kiwoom_rest_api.koreanstock.account import Account
|
|
from kiwoom_rest_api.koreanstock.market_condition import MarketCondition
|
|
# 퀀트 트레이딩 필수 API 추가
|
|
from kiwoom_rest_api.koreanstock.sector import Sector
|
|
from kiwoom_rest_api.koreanstock.foreign_institution import ForeignInstitution
|
|
from kiwoom_rest_api.koreanstock.theme import Theme
|
|
from kiwoom_rest_api.koreanstock.etf import ETF
|
|
except ImportError as e:
|
|
logger.critical(f"❌ 키움 REST API 모듈 임포트 실패: {e}")
|
|
raise e
|
|
|
|
|
|
# ── Mattermost ─────────────────────────────────────────
|
|
class MattermostBot:
|
|
def __init__(self):
|
|
self.api_url = f'{MM_SERVER_URL.rstrip("/")}/api/v4/posts'
|
|
self.headers = {
|
|
'Authorization': f'Bearer {MM_BOT_TOKEN}',
|
|
'Content-Type': 'application/json'
|
|
}
|
|
self.channels = self._load_channels()
|
|
|
|
def _load_channels(self):
|
|
try:
|
|
if os.path.exists(MM_CONFIG_FILE):
|
|
with open(MM_CONFIG_FILE, 'r', encoding='utf-8') as f:
|
|
return json.load(f).get('channels', {})
|
|
except Exception as e:
|
|
logger.error(f'MM 채널 로드 오류: {e}')
|
|
return {}
|
|
|
|
def send(self, channel_alias, message):
|
|
channel_id = self.channels.get(channel_alias)
|
|
if not channel_id:
|
|
logger.warning(f'MM 채널 {channel_alias} ID 없음')
|
|
return False
|
|
try:
|
|
res = requests.post(
|
|
self.api_url,
|
|
headers=self.headers,
|
|
json={'channel_id': channel_id, 'message': message},
|
|
timeout=3
|
|
)
|
|
res.raise_for_status()
|
|
return True
|
|
except Exception as e:
|
|
logger.error(f'MM 전송 오류: {e}')
|
|
return False
|
|
|
|
|
|
# ── Broker API (스캔 전용) ─────────────────────────────
|
|
class BrokerAPI:
|
|
def __init__(self):
|
|
logger.info('키움 REST API 초기화 중...')
|
|
try:
|
|
self.token_manager = TokenManager()
|
|
# access_token 프로퍼티 호출로 최초 토큰 발급
|
|
_tok = self.token_manager.access_token
|
|
logger.info('키움 토큰 발급 완료 (len=%d)', len(_tok or ''))
|
|
self.stock_info = StockInfo(token_manager=self.token_manager)
|
|
self.chart = Chart(token_manager=self.token_manager)
|
|
self.order = Order(token_manager=self.token_manager)
|
|
self.rank = RankInfo(token_manager=self.token_manager)
|
|
self.account = Account(token_manager=self.token_manager)
|
|
self.market = MarketCondition(token_manager=self.token_manager)
|
|
self.sector = Sector(token_manager=self.token_manager)
|
|
self.foreign_inst = ForeignInstitution(token_manager=self.token_manager)
|
|
self.theme = Theme(token_manager=self.token_manager)
|
|
self.etf = ETF(token_manager=self.token_manager)
|
|
self.acc_no = os.environ.get('KIWOOM_ACCOUNT_NO', '')
|
|
logger.info(f'키움 API 초기화 완료 | 계좌: {self.acc_no}')
|
|
except Exception as e:
|
|
logger.critical(f'키움 API 초기화 실패: {e}')
|
|
raise
|
|
|
|
def _safe_request(self, func, *args, **kwargs):
|
|
"""
|
|
API 호출 안전장치
|
|
- return_code != '0' 이면 return_msg 로깅 후 재시도 여부 판단
|
|
- 429 / 초과 / 과부하 → 지수 백오프 재시도
|
|
- 인증 오류(8005) → 토큰 재발급 후 1회 재시도
|
|
"""
|
|
full_name = func.__name__
|
|
api_id = full_name.split('_')[-1]
|
|
max_retries = 3
|
|
|
|
for i in range(max_retries):
|
|
try:
|
|
time.sleep(0.5)
|
|
result = func(*args, **kwargs)
|
|
|
|
if isinstance(result, dict):
|
|
return_code = str(result.get('return_code', '0'))
|
|
return_msg = str(result.get('return_msg', ''))
|
|
|
|
if return_code == '0':
|
|
return result # ✅ 정상
|
|
|
|
# 토큰 만료(8005) → 재발급 후 1회 재시도
|
|
if '8005' in return_msg or 'Token' in return_msg:
|
|
logger.warning(f"⚠️ [{api_id}] 토큰 만료 감지 → 재발급 후 재시도")
|
|
self.token_manager._request_new_token()
|
|
time.sleep(1)
|
|
continue
|
|
|
|
# API 호출 초과 / 과부하
|
|
if '초과' in return_msg or '과부하' in return_msg or '429' in return_msg:
|
|
wait = (2 ** i) + random.uniform(0.5, 1.5)
|
|
logger.warning(f"⚠️ [{api_id}] 호출 제한 → {wait:.1f}초 대기 ({i+1}/{max_retries})")
|
|
time.sleep(wait)
|
|
continue
|
|
|
|
# 그 외 오류 → 로깅 후 빈 dict 반환 (재시도 불필요)
|
|
logger.warning(f"⚠️ [{api_id}] API 오류 (code={return_code}): {return_msg[:80]}")
|
|
return {}
|
|
|
|
return result
|
|
|
|
except Exception as e:
|
|
msg = str(e)
|
|
if ("429" in msg) or ("초과" in msg) or ("과부하" in msg):
|
|
wait = (2 ** i) + random.uniform(0.5, 1.5)
|
|
logger.warning(f"⚠️ [{api_id}] 예외 호출제한 → {wait:.1f}초 대기 ({i+1}/{max_retries})")
|
|
time.sleep(wait)
|
|
else:
|
|
logger.error(f"❌ [{api_id}] 예외: {e}")
|
|
return {}
|
|
|
|
logger.error(f"💀 [{api_id}] {max_retries}회 재시도 모두 실패")
|
|
return {}
|
|
|
|
def _is_valid_stock(self, name, code):
|
|
"""종목 필터링 (스팩, ETN, 우선주 등 제외)"""
|
|
if len(code) != 6 or not code.isdigit():
|
|
return False
|
|
|
|
exclude = ['스팩', 'ETN', 'W', 'ELW', '채권', '레버리지', '인버스',
|
|
'곱버스', '선물', '콜', '풋', '2X', '3X', '합성', 'H', 'B']
|
|
|
|
if any(k in name for k in exclude):
|
|
return False
|
|
|
|
if name.endswith('우') or name.endswith('우B'):
|
|
return False
|
|
|
|
return True
|
|
|
|
def scan_ant_shaking_candidates(self):
|
|
"""
|
|
[조건검색 대체 로직] 개미털기(눌림목) 후보 종목 스캔
|
|
|
|
스캔 순서:
|
|
1. 거래대금/회전율 상위 종목 수집
|
|
2. [1차 컷] 가격 상한 필터 (MAX_STOCK_PRICE_KRW = MAX_LOSS ÷ STOP_PCT)
|
|
→ 이 이상 가격 종목은 1주라도 사면 손절 시 MAX_LOSS 초과 → 제외
|
|
3. [2차 계산] 상세 OHLCV → 낙폭·회복률·강도 점수 산정
|
|
→ 가격컷을 강도계산 이전에 수행해 불필요한 API 호출 최소화
|
|
"""
|
|
logger.info(f"🐜 [개미털기] 스캔 시작 (단가상한 {MAX_STOCK_PRICE_KRW:,}원 = 손실한도{_max_loss_krw:,}÷손절{_stop_loss_pct*100:.1f}%)")
|
|
logger.info(f" 📡 수집 방식: 거래대금 + 회전율 (2가지 소스)")
|
|
raw_codes_set = set()
|
|
scan_strategies = [("3", "거래대금"), ("2", "회전율")]
|
|
|
|
for sort_tp, desc in scan_strategies:
|
|
logger.info(f" 🔍 [{desc}] 상위 종목 조회 중...")
|
|
try:
|
|
res = self._safe_request(
|
|
self.rank.top_trading_volume_today_request_ka10030,
|
|
mrkt_tp="000",
|
|
sort_tp=sort_tp,
|
|
mang_stk_incls="3",
|
|
pric_tp="0",
|
|
crd_tp="0",
|
|
trde_qty_tp="0",
|
|
trde_prica_tp="0",
|
|
mrkt_open_tp="0",
|
|
stex_tp="3",
|
|
cont_yn="N"
|
|
)
|
|
|
|
if not res or 'tdy_trde_qty_upper' not in res:
|
|
logger.warning(f" ⚠️ [{desc}] 응답 없음")
|
|
continue
|
|
|
|
logger.info(f" ✅ [{desc}] {len(res['tdy_trde_qty_upper'])}개 수신")
|
|
|
|
for stock in res['tdy_trde_qty_upper']:
|
|
code = stock['stk_cd'].split('_')[0]
|
|
try:
|
|
price = abs(float(stock['cur_prc']))
|
|
except:
|
|
continue
|
|
|
|
# open_pric 없는 경우 현재가로 대체 (모의 API 일부 누락)
|
|
raw_open = stock.get('open_pric') or stock.get('opmr_pred_rt') or 0
|
|
try:
|
|
open_price = abs(float(raw_open)) if raw_open else price
|
|
except (ValueError, TypeError):
|
|
open_price = price
|
|
change_rate = ((price - open_price) / open_price * 100) if open_price > 0 else 0
|
|
|
|
if price < 1000: # 동전주 제외
|
|
continue
|
|
if change_rate > 20: # 상한가 근처 제외
|
|
continue
|
|
# [1차 컷] 단가 상한: price > MAX_LOSS ÷ STOP_PCT 이면 qty=0이 되어 매수 불가
|
|
# → 강도 계산(2차) 이전에 먼저 제거해 불필요한 API 호출 방지
|
|
if price > MAX_STOCK_PRICE_KRW:
|
|
continue
|
|
|
|
name = stock['stk_nm']
|
|
if self._is_valid_stock(name, code):
|
|
raw_codes_set.add(code)
|
|
except Exception as e:
|
|
logger.error(f"스캔({desc}) 중 에러: {e}")
|
|
|
|
raw_codes = list(raw_codes_set)
|
|
logger.info(f" 📥 후보 수집: {len(raw_codes)}개 -> 정밀 분석")
|
|
|
|
final_list = []
|
|
chunk_size = 50
|
|
|
|
try:
|
|
for i in range(0, len(raw_codes), chunk_size):
|
|
chunk = raw_codes[i:i + chunk_size]
|
|
code_str = '|'.join(chunk)
|
|
|
|
res = self._safe_request(
|
|
self.stock_info.watchlist_stock_information_request_ka10095,
|
|
stock_code=code_str
|
|
)
|
|
|
|
if res and 'atn_stk_infr' in res:
|
|
for item in res['atn_stk_infr']:
|
|
code = item['stk_cd'].strip()
|
|
if code.startswith('A'):
|
|
code = code[1:]
|
|
|
|
try:
|
|
op = abs(float(item.get('open_pric', 0)))
|
|
hi = abs(float(item.get('high_pric', 0)))
|
|
lo = abs(float(item.get('low_pric', 0)))
|
|
cl = abs(float(item.get('cur_prc', 0)))
|
|
|
|
if op == 0:
|
|
continue
|
|
# 가격 상한은 1차 수집 단계에서 이미 적용됨 → 중복 필터 불필요
|
|
|
|
drop_rate = (op - lo) / op
|
|
total_range = hi - lo
|
|
recovery_pos = (cl - lo) / total_range if total_range > 0 else 0
|
|
|
|
if total_range > 0 and drop_rate > 0.03:
|
|
if recovery_pos > 0.5:
|
|
score = drop_rate * 100
|
|
logger.info(
|
|
f" 💎 {item['stk_nm'].strip()} {code}: "
|
|
f"낙폭 {drop_rate*100:.1f}% | 회복 {recovery_pos*100:.0f}% | 점수 {score:.2f}"
|
|
)
|
|
final_list.append({
|
|
'code': code,
|
|
'name': item['stk_nm'].strip(),
|
|
'price': cl,
|
|
'score': score
|
|
})
|
|
except Exception as e:
|
|
logger.debug(f"종목 분석 오류({code}): {e}")
|
|
continue
|
|
|
|
time.sleep(0.5)
|
|
|
|
final_list.sort(key=lambda x: x['score'], reverse=True)
|
|
return final_list
|
|
except Exception as e:
|
|
logger.error(f"분석 중 치명적 에러: {e}")
|
|
return []
|
|
|
|
def get_foreign_consecutive_buy(self, consecutive_days=3, market="001", limit=20):
|
|
try:
|
|
res = self._safe_request(
|
|
self.rank.top_foreign_consecutive_net_buy_request_ka10035,
|
|
mrkt_tp=market,
|
|
trde_tp="2",
|
|
base_dt_tp="1",
|
|
stex_tp="1"
|
|
)
|
|
stocks = res.get('for_cont_nettrde_upper', []) if res else []
|
|
return stocks[:limit]
|
|
except Exception as e:
|
|
logger.error(f"외국인 연속 순매수 조회 실패: {e}")
|
|
return []
|
|
|
|
def get_institutional_buy_stocks(self, market="001", limit=20):
|
|
try:
|
|
from datetime import datetime as dt
|
|
today = dt.now().strftime("%Y%m%d")
|
|
res = self._safe_request(
|
|
self.rank.same_day_net_buying_ranking_request_ka10062,
|
|
strt_dt=today,
|
|
mrkt_tp=market,
|
|
trde_tp="1",
|
|
sort_cnd="1",
|
|
unit_tp="1",
|
|
stex_tp="1"
|
|
)
|
|
stocks = res.get('eql_nettrde_rank', []) if res else []
|
|
return [s for s in stocks if float(s.get('orgn_nettrde_qty', 0)) > 0][:limit]
|
|
except Exception as e:
|
|
logger.error(f"기관 순매수 조회 실패: {e}")
|
|
return []
|
|
|
|
def get_volume_surge_stocks(self, market="001", min_volume="50", limit=20):
|
|
try:
|
|
res = self._safe_request(
|
|
self.rank.sudden_increase_trading_volume_request_ka10023,
|
|
mrkt_tp=market,
|
|
sort_tp="1",
|
|
tm_tp="2",
|
|
trde_qty_tp=min_volume,
|
|
stk_cnd="1",
|
|
pric_tp="0",
|
|
stex_tp="1"
|
|
)
|
|
stocks = res.get('trde_qty_sdnin', []) if res else []
|
|
return stocks[:limit]
|
|
except Exception as e:
|
|
logger.error(f"거래량 급증 조회 실패: {e}")
|
|
return []
|
|
|
|
def get_top_price_movers(self, market="001", sort_type="1", limit=20):
|
|
try:
|
|
res = self._safe_request(
|
|
self.rank.top_day_over_day_change_rate_request_ka10027,
|
|
mrkt_tp=market,
|
|
sort_tp=sort_type,
|
|
trde_qty_cnd="0000",
|
|
stk_cnd="1",
|
|
crd_cnd="0",
|
|
updown_incls="1",
|
|
pric_cnd="0",
|
|
trde_prica_cnd="0",
|
|
stex_tp="1"
|
|
)
|
|
stocks = res.get('pred_pre_flu_rt_upper', []) if res else []
|
|
return stocks[:limit]
|
|
except Exception as e:
|
|
logger.error(f"등락률 조회 실패: {e}")
|
|
return []
|
|
|
|
def check_market_status(self):
|
|
now = datetime.datetime.now()
|
|
if not (datetime.time(8, 30) <= now.time() <= datetime.time(16, 0)):
|
|
return False
|
|
if now.weekday() >= 5:
|
|
return False
|
|
return True
|
|
|
|
|
|
# ── MariaDB 저장 ────────────────────────────────────────
|
|
def save_to_kis_db(candidates: list, db_path: str = None):
|
|
"""
|
|
target_candidates 테이블에 저장 (MariaDB kis_quant_db).
|
|
db_path 인수는 하위 호환용으로 유지하되 무시됩니다.
|
|
TradeDB.update_target_candidates() 를 사용합니다.
|
|
"""
|
|
try:
|
|
from database import TradeDB
|
|
db = TradeDB()
|
|
ok = db.update_target_candidates(candidates)
|
|
if ok:
|
|
logger.info(f'💾 MariaDB 저장 완료: {len(candidates)}종목 → kis_quant_db.target_candidates')
|
|
return ok
|
|
except Exception as e:
|
|
logger.error(f'MariaDB 저장 실패: {e}')
|
|
return False
|
|
|
|
|
|
# ── 유니버스 업데이트 ──────────────────────────────────
|
|
def update_universe(api: BrokerAPI, mm: MattermostBot):
|
|
logger.info('=' * 50)
|
|
logger.info(f'🔄 유니버스 업데이트 시작 | {datetime.datetime.now().strftime("%H:%M:%S")}')
|
|
|
|
all_candidates = {} # code → {code, name, price, basescore, bonusscore, fromant}
|
|
|
|
# 1. 개미털기 스캔 (핵심)
|
|
# 가격 상한(MAX_STOCK_PRICE_KRW)은 scan_ant_shaking_candidates 내부에서 1차로 적용됨
|
|
try:
|
|
antshaking = api.scan_ant_shaking_candidates()
|
|
for item in antshaking:
|
|
code = item['code']
|
|
all_candidates[code] = {
|
|
'code': code,
|
|
'name': item['name'],
|
|
'price': item['price'],
|
|
'basescore': item['score'],
|
|
'bonusscore': 0.0,
|
|
'fromant': True
|
|
}
|
|
logger.info(f'🐜 개미털기 후보: {len(antshaking)}종목')
|
|
except Exception as e:
|
|
logger.warning(f'개미털기 스캔 오류: {e}')
|
|
|
|
# 2. 외국인 연속 매수 보너스 (0.5점)
|
|
try:
|
|
foreign_buy = api.get_foreign_consecutive_buy(consecutive_days=2, limit=30)
|
|
logger.info(f'🌍 외국인 연속매수: {len(foreign_buy)}건')
|
|
for idx, item in enumerate(foreign_buy):
|
|
code = item.get('stk_cd', '').strip()
|
|
if len(code) != 6:
|
|
continue
|
|
bonus = (30 - idx) / 30.0 * 0.5
|
|
if code in all_candidates:
|
|
all_candidates[code]['bonusscore'] += bonus
|
|
else:
|
|
all_candidates[code] = {
|
|
'code': code, 'name': item.get('stk_nm', code),
|
|
'price': 0, 'basescore': 3.0, 'bonusscore': bonus, 'fromant': False
|
|
}
|
|
except Exception as e:
|
|
logger.warning(f'외국인 연속매수 오류: {e}')
|
|
|
|
# 3. 거래량 급증 보너스 (0.3점)
|
|
try:
|
|
vol_surge = api.get_volume_surge_stocks(min_volume="50", limit=30)
|
|
logger.info(f'📈 거래량 급증: {len(vol_surge)}건')
|
|
for idx, item in enumerate(vol_surge):
|
|
code = item.get('stk_cd', '').split('_')[0].strip()
|
|
if len(code) != 6:
|
|
continue
|
|
bonus = (30 - idx) / 30.0 * 0.3
|
|
price = abs(float(item.get('cur_prc') or item.get('prpr') or 0))
|
|
if code in all_candidates:
|
|
all_candidates[code]['bonusscore'] += bonus
|
|
else:
|
|
all_candidates[code] = {
|
|
'code': code, 'name': item.get('stk_nm', code),
|
|
'price': price,
|
|
'basescore': 2.5, 'bonusscore': bonus, 'fromant': False
|
|
}
|
|
except Exception as e:
|
|
logger.warning(f'거래량 급증 오류: {e}')
|
|
|
|
# 4. 기관 매수 보너스 (0.3점)
|
|
try:
|
|
inst_buy = api.get_institutional_buy_stocks(limit=30)
|
|
logger.info(f'🏦 기관 매수: {len(inst_buy)}건')
|
|
for idx, item in enumerate(inst_buy):
|
|
code = item.get('stk_cd', '').strip()
|
|
if len(code) != 6:
|
|
continue
|
|
bonus = (30 - idx) / 30.0 * 0.3
|
|
price = abs(float(item.get('cur_prc') or item.get('prpr') or 0))
|
|
if code in all_candidates:
|
|
all_candidates[code]['bonusscore'] += bonus
|
|
else:
|
|
all_candidates[code] = {
|
|
'code': code, 'name': item.get('stk_nm', code),
|
|
'price': price,
|
|
'basescore': 2.5, 'bonusscore': bonus, 'fromant': False
|
|
}
|
|
except Exception as e:
|
|
logger.warning(f'기관 매수 오류: {e}')
|
|
|
|
# 5. 등락률 상위 보너스 (0.2점)
|
|
try:
|
|
price_movers = api.get_top_price_movers(sort_type='1', limit=30)
|
|
logger.info(f'🚀 등락률 상위: {len(price_movers)}건')
|
|
for idx, item in enumerate(price_movers):
|
|
code = item.get('stk_cd', '').strip()
|
|
if len(code) != 6:
|
|
continue
|
|
bonus = (30 - idx) / 30.0 * 0.2
|
|
price = abs(float(item.get('cur_prc') or item.get('prpr') or 0))
|
|
if code in all_candidates:
|
|
all_candidates[code]['bonusscore'] += bonus
|
|
else:
|
|
all_candidates[code] = {
|
|
'code': code, 'name': item.get('stk_nm', code),
|
|
'price': price,
|
|
'basescore': 2.0, 'bonusscore': bonus, 'fromant': False
|
|
}
|
|
except Exception as e:
|
|
logger.warning(f'등락률 순위 오류: {e}')
|
|
|
|
# 6. 최종 후보 필터 (MIN_SCORE 이상만)
|
|
now_str = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
|
|
final_candidates = []
|
|
ant_count = 0
|
|
bonus_count = 0
|
|
for code, data in all_candidates.items():
|
|
total_score = data['basescore'] + data['bonusscore']
|
|
if total_score >= MIN_SCORE:
|
|
final_candidates.append({
|
|
'code': code,
|
|
'name': data['name'],
|
|
'score': round(total_score, 2),
|
|
'price': data['price'],
|
|
'scan_time': now_str
|
|
})
|
|
if data['fromant']:
|
|
ant_count += 1
|
|
else:
|
|
bonus_count += 1
|
|
|
|
final_candidates.sort(key=lambda x: x['score'], reverse=True)
|
|
final_candidates = final_candidates[:TOP_N]
|
|
|
|
top5 = [f"{x['name']}({x['score']:.1f})" for x in final_candidates[:5]]
|
|
logger.info(f'✅ 최종 후보: {len(final_candidates)}종목 (개미털기:{ant_count} 보너스:{bonus_count})')
|
|
logger.info(f'🔝 Top5: {", ".join(top5)}')
|
|
|
|
# 7. MariaDB 저장
|
|
if final_candidates:
|
|
save_to_kis_db(final_candidates)
|
|
else:
|
|
logger.warning('⚠️ 최종 후보 0개 → DB 저장 생략')
|
|
|
|
# 8. Mattermost 알림
|
|
try:
|
|
msg = (
|
|
f'🔄 유니버스 업데이트 완료\n'
|
|
f'- 후보: {len(final_candidates)}종목 (개미털기:{ant_count} 보너스:{bonus_count})\n'
|
|
f'- Top5: {", ".join(top5)}'
|
|
)
|
|
mm.send('stock', msg)
|
|
except Exception as e:
|
|
logger.debug(f'MM 알림 오류: {e}')
|
|
|
|
logger.info('=' * 50)
|
|
|
|
|
|
# ── 메인 루프 ──────────────────────────────────────────
|
|
def main():
|
|
logger.info('🚀 키움 유니버스 스캐너 시작')
|
|
logger.info(f' DB : MariaDB kis_quant_db (192.168.0.141)')
|
|
logger.info(f' 스캔 주기 : {SCAN_INTERVAL}초')
|
|
logger.info(f' 최소 점수 : {MIN_SCORE}')
|
|
logger.info(f' 저장 Top N : {TOP_N}')
|
|
|
|
api = BrokerAPI()
|
|
mm = MattermostBot()
|
|
|
|
is_first_run = True
|
|
while True:
|
|
try:
|
|
is_open = api.check_market_status()
|
|
now = datetime.datetime.now()
|
|
|
|
# 장 중이거나 첫 실행이면 스캔
|
|
if is_open or is_first_run:
|
|
update_universe(api, mm)
|
|
is_first_run = False
|
|
else:
|
|
logger.info(f'⏸ 장 외 시간 ({now.strftime("%H:%M:%S")}) → 스캔 대기')
|
|
|
|
# 다음 5분 경계까지 대기
|
|
next_min = (now.minute // 5 + 1) * 5
|
|
if next_min >= 60:
|
|
next_time = now.replace(hour=now.hour + 1, minute=0, second=5, microsecond=0)
|
|
else:
|
|
next_time = now.replace(minute=next_min, second=5, microsecond=0)
|
|
wait_sec = max(5, (next_time - now).total_seconds())
|
|
logger.info(f'⏳ 다음 스캔: {next_time.strftime("%H:%M:%S")} ({wait_sec:.0f}초 후)')
|
|
time.sleep(wait_sec)
|
|
|
|
except KeyboardInterrupt:
|
|
logger.info('⛔ 종료 요청')
|
|
break
|
|
except Exception as e:
|
|
logger.error(f'메인 루프 오류: {e}')
|
|
time.sleep(60)
|
|
|
|
|
|
if __name__ == '__main__':
|
|
main() |