# -----------------------------
# 2nd screening V3
# -----------------------------
import time
# Record script start so total wall-clock runtime can be reported at the end.
global_start_time = time.time()
from google.colab import drive
# Mount Google Drive (Colab-only); all input/output paths below live under /content/drive.
drive.mount('/content/drive')
import pandas as pd
import numpy as np
import os
from tqdm.notebook import tqdm
import yfinance as yf
from curl_cffi import requests
# curl_cffi session impersonating Safari — presumably to avoid Yahoo Finance
# rejecting the default yfinance user agent; passed into yf.download below.
session = requests.Session(impersonate="safari15_5")
# --------------------------------------------------
# Helper function definitions
# --------------------------------------------------
def calculate_market_sentiment_score(ticker_symbol: str, start_date: str, end_date: str, session) -> pd.DataFrame:
    """
    Compute a daily market-sentiment score for the given ticker
    (e.g. a TOPIX-tracking ETF) over [start_date, end_date].

    The per-day score sums +1/-1 votes from three indicators (5-day log-return
    sum vs ±0.01, sign of the 25-day-MA deviation, and 14-day RSI vs 55/45),
    then 10-day rolling mean/std/trend of the score and MACD(12,26,9) columns
    are appended.

    Parameters
    ----------
    ticker_symbol : str   Yahoo Finance ticker to download.
    start_date, end_date : str or pd.Timestamp   Analysis window bounds.
    session               curl_cffi session passed through to yf.download.

    Returns
    -------
    pd.DataFrame indexed by 'Date' with NaN warm-up rows dropped, or an
    empty DataFrame when no data could be downloaded.
    """
    print(f"🌀 地合いスコア計算のため、{ticker_symbol} のデータをダウンロード中...")
    # Normalize Timestamps to the 'YYYY-MM-DD' strings yfinance expects.
    start_str = pd.Timestamp(start_date).strftime('%Y-%m-%d')
    end_str = pd.Timestamp(end_date).strftime('%Y-%m-%d')
    market_data = yf.download(ticker_symbol, start=start_str, end=end_str, interval="1d", session=session)
    if market_data.empty:
        print(f"⚠️ {ticker_symbol} のデータが見つかりません。地合いスコア計算をスキップします。")
        return pd.DataFrame()
    # Recent yfinance versions return MultiIndex columns even for one ticker;
    # flatten to plain column names.
    if isinstance(market_data.columns, pd.MultiIndex):
        market_data.columns = market_data.columns.get_level_values(0)
    market_data["LogReturn"] = np.log(market_data["Close"] / market_data["Close"].shift(1))
    market_data["MA25"] = market_data["Close"].rolling(25).mean()
    market_data["MA25_diff"] = (market_data["Close"] - market_data["MA25"]) / market_data["MA25"]
    # Simple-moving-average 14-day RSI.
    delta = market_data["Close"].diff()
    gain = np.where(delta > 0, delta, 0)
    loss = np.where(delta < 0, -delta, 0)  # FIX: restored '<' lost in extraction
    avg_gain = pd.Series(gain, index=market_data.index).rolling(14).mean()
    avg_loss = pd.Series(loss, index=market_data.index).rolling(14).mean()
    rs = avg_gain / (avg_loss + 1e-10)  # epsilon guards division by zero
    market_data["RSI_14"] = 100 - (100 / (1 + rs))
    # Sentiment score: +1 per bullish vote, -1 per bearish vote (range -3..+3).
    market_data["score_today"] = 0
    market_data["score_today"] += (market_data["LogReturn"].rolling(5).sum() > 0.01).astype(int)
    market_data["score_today"] += (market_data["MA25_diff"] > 0).astype(int)
    market_data["score_today"] += (market_data["RSI_14"] > 55).astype(int)
    market_data["score_today"] -= (market_data["RSI_14"] < 45).astype(int)                      # FIX: restored '<'
    market_data["score_today"] -= (market_data["LogReturn"].rolling(5).sum() < -0.01).astype(int)  # FIX: restored '<'
    market_data["score_today"] -= (market_data["MA25_diff"] < 0).astype(int)                    # FIX: restored '<'
    market_data["score_mean"] = market_data["score_today"].rolling(10).mean()
    market_data["score_std"] = market_data["score_today"].rolling(10).std()
    market_data["score_trend"] = market_data["score_today"].diff().rolling(10).mean()
    # Standard MACD(12, 26) with a 9-day signal line.
    ema12 = market_data["Close"].ewm(span=12, adjust=False).mean()
    ema26 = market_data["Close"].ewm(span=26, adjust=False).mean()
    market_data["MACD"] = ema12 - ema26
    market_data["Signal"] = market_data["MACD"].ewm(span=9, adjust=False).mean()
    market_data["MACD_Hist"] = market_data["MACD"] - market_data["Signal"]
    market_data.index.name = 'Date'
    # Drop warm-up rows where the rolling windows are still NaN.
    market_df = market_data.dropna().copy()
    print(f"✅ 地合いスコア計算完了 ({ticker_symbol})。")
    return market_df
def calculate_technical_indicators(df_group):
    """
    Add technical-indicator columns to one ticker's OHLCV DataFrame.

    Adds MA_5 / MA_25 / MA_75 (simple moving averages of Close), a 14-day
    SMA-based RSI, and MA_5_slope (5-day change of MA_5 divided by 5).

    Parameters
    ----------
    df_group : pd.DataFrame with at least a 'Close' column.

    Returns
    -------
    The same DataFrame with indicator columns added; returned unchanged when
    empty or shorter than the longest (75-day) moving-average window.
    """
    # Not enough history for MA_75 — return unchanged.  FIX: restored '<' lost in extraction.
    if df_group.empty or len(df_group) < 75:
        return df_group
    df_group["MA_5"] = df_group["Close"].rolling(window=5).mean()
    df_group["MA_25"] = df_group["Close"].rolling(window=25).mean()
    df_group["MA_75"] = df_group["Close"].rolling(window=75).mean()
    # Simple-moving-average 14-day RSI (same formulation as the sentiment helper).
    delta = df_group["Close"].diff()
    gain = np.where(delta > 0, delta, 0)
    loss = np.where(delta < 0, -delta, 0)  # FIX: restored '<'
    avg_gain = pd.Series(gain, index=df_group.index).rolling(14).mean()
    avg_loss = pd.Series(loss, index=df_group.index).rolling(14).mean()
    rs = avg_gain / (avg_loss + 1e-10)  # epsilon guards division by zero
    df_group["RSI"] = 100 - (100 / (1 + rs))
    # Average daily change of MA_5 over the past 5 rows.
    df_group["MA_5_slope"] = df_group["MA_5"].diff(5) / 5
    return df_group
# --------------------------------------------------
# Configuration / definitions
# --------------------------------------------------
# --- Path settings ---
input_base_dir = "/content/drive/MyDrive/stock_prediction/ver.1/results/Qiita/1st/J-Quants/Prime/selected_ajt_type2"
output_base_dir = "/content/drive/MyDrive/stock_prediction/ver.1/results/Qiita/2nd/Speed _test/mydataset/morerapid"
os.makedirs(output_base_dir, exist_ok=True)
# --- Data source settings ---
PARQUET_PATH_FOR_PREDICTION = "/content/drive/MyDrive/stock_prediction/ver.1/Database/OHLCV/プライム/total_with_date/batch_size/ticker_combined_append_batch_PER.parquet"
MARKET_INDEX_TICKER = '1306.T' # TOPIX-tracking ETF etc. used for the sentiment score
# --- Screening date list (assumed sorted ascending: the analysis window below
#     is derived from the first and last entries) ---
date_list = [
'2022-01-13', '2022-01-24', '2022-01-28', '2022-02-01', '2022-04-20',
'2022-05-17', '2022-05-27', '2022-06-21', '2022-07-21', '2022-09-05',
'2022-09-09', '2022-11-28', '2022-12-01', '2022-12-21', '2023-02-17',
'2023-03-28', '2023-04-24', '2023-07-14', '2023-08-04', '2023-09-21',
'2023-10-05', '2023-11-09', '2023-11-29', '2023-12-04', '2023-12-21',
'2023-12-22', '2024-03-05', '2024-03-13', '2024-03-26', '2024-04-11',
'2024-06-06', '2024-06-07', '2024-07-22', '2024-07-26', '2024-09-12'
]
# --- Screening condition definitions: RSI band, MA-convergence tolerance
#     (ma_eps, as a fraction of price) and MA_5-slope threshold per pattern ---
screening_conditions = [
{"name": "pattern1", "rsi_range": (25, 50), "ma_eps": 0.02, "slope_period": 5, "slope_thresh": 0.04},
{"name": "pattern2", "rsi_range": (25, 50), "ma_eps": 0.04, "slope_period": 5, "slope_thresh": 0.04},
{"name": "pattern3", "rsi_range": (25, 50), "ma_eps": 0.06, "slope_period": 5, "slope_thresh": 0.04},
{"name": "pattern4", "rsi_range": (25, 50), "ma_eps": 0.08, "slope_period": 5, "slope_thresh": 0.04},
{"name": "pattern5", "rsi_range": (30, 60), "ma_eps": 0.02, "slope_period": 5, "slope_thresh": 0.04},
{"name": "pattern6", "rsi_range": (30, 60), "ma_eps": 0.04, "slope_period": 5, "slope_thresh": 0.04},
{"name": "pattern7", "rsi_range": (30, 60), "ma_eps": 0.06, "slope_period": 5, "slope_thresh": 0.04},
{"name": "pattern8", "rsi_range": (30, 60), "ma_eps": 0.08, "slope_period": 5, "slope_thresh": 0.04}
]
# ------------------------------------------------------------------------------
# Main processing
# ------------------------------------------------------------------------------
# -----------------------------
# 1. Data preparation (bulk load and pre-computation)
# -----------------------------
print("\n--- 全データセットの読み込みとテクニカル指標の事前計算を開始します ---")
global_data_load_start_time = time.time()
# Derive the analysis window: enough lookback history for the 75-day MA and
# rolling statistics, and enough lookahead to evaluate up-to-90-day returns.
earliest_screening_date = pd.Timestamp(date_list[0])
latest_screening_date = pd.Timestamp(date_list[-1])
max_lookback_days = 250
max_lookahead_days = 90
global_data_start_date = earliest_screening_date - pd.Timedelta(days=max_lookback_days)
global_data_end_date = latest_screening_date + pd.Timedelta(days=max_lookahead_days)
print(f"分析対象期間: {global_data_start_date.strftime('%Y-%m-%d')} から {global_data_end_date.strftime('%Y-%m-%d')}")
# Compute the market-sentiment score dynamically.
try:
    df_market_sentiment = calculate_market_sentiment_score(
        ticker_symbol=MARKET_INDEX_TICKER,
        start_date=global_data_start_date,
        end_date=global_data_end_date,
        session=session
    )
    sentiment_cols = ['score_today', 'score_mean', 'score_std', 'score_trend']
    df_market_sentiment_to_merge = df_market_sentiment[sentiment_cols].copy()
except Exception as e:
    print(f"致命的エラー: 地合いスコアの計算に失敗しました: {e}")
    df_market_sentiment = pd.DataFrame()  # defined empty because the backtest loop references it
    df_market_sentiment_to_merge = pd.DataFrame(columns=['score_today', 'score_mean', 'score_std', 'score_trend'])
# Load OHLCV data from the Parquet file, restricted to the analysis window.
try:
    df_all_data = pd.read_parquet(
        PARQUET_PATH_FOR_PREDICTION,
        columns=['Date', 'Ticker', 'Open', 'High', 'Low', 'Close', 'Volume'],
        # FIX: the second filter's '<=' operator was lost in extraction.
        filters=[('Date', '>=', pd.Timestamp(global_data_start_date)), ('Date', '<=', pd.Timestamp(global_data_end_date))]
    )
    df_all_data["Date"] = pd.to_datetime(df_all_data["Date"])
    # Strip the '.T' exchange suffix so tickers match the 4-digit J-Quants codes.
    df_all_data["Ticker"] = df_all_data["Ticker"].str.replace(".T", "", regex=False)
except Exception as e:
    print(f"致命的エラー: 全データセットの読み込みに失敗しました: {e}")
    exit()
# Attach the sentiment-score columns to every ticker row, keyed by date.
if not df_market_sentiment_to_merge.empty:
    print("🌀 銘柄データに地合いスコアを結合中...")
    df_all_data_processed = pd.merge(df_all_data, df_market_sentiment_to_merge.reset_index(), on='Date', how='left')
    df_all_data_processed = df_all_data_processed.set_index(["Date", "Ticker"]).sort_index()
else:
    print("⚠️ 地合いスコアが利用できないため、スコア関連の列はNaNになります。")
    df_all_data_processed = df_all_data.set_index(["Date", "Ticker"]).sort_index()
    for col in ['score_today', 'score_mean', 'score_std', 'score_trend']:
        df_all_data_processed[col] = np.nan
# Pre-compute technical indicators for every ticker using the helper defined above.
print("🌀 全銘柄のテクニカル指標を計算中...")
tickers_to_process = df_all_data_processed.index.get_level_values('Ticker').unique()
processed_data_parts = []
for ticker_val in tqdm(tickers_to_process, desc="↳ 各銘柄のテクニカル指標計算"):
    # Slice one ticker's rows (Date-only index), compute indicators, then
    # rebuild a (Ticker, Date) MultiIndex so all parts can be concatenated.
    ticker_df_part = df_all_data_processed.loc[(slice(None), ticker_val), :].reset_index(level='Ticker', drop=True).copy()
    processed_df_part = calculate_technical_indicators(ticker_df_part)
    processed_df_part['Ticker'] = ticker_val
    processed_df_part = processed_df_part.set_index('Ticker', append=True).swaplevel(0,1)
    processed_data_parts.append(processed_df_part)
df_all_data_processed = pd.concat(processed_data_parts).sort_index()
global_data_load_end_time = time.time()
print(f"✅ 全データセットの読み込みとテクニカル指標の事前計算が完了しました。所要時間: {global_data_load_end_time - global_data_load_start_time:.2f} 秒")
# -----------------------------
# 2. Run the backtest for each screening date
# -----------------------------
for today_str in tqdm(date_list, desc="全体進捗 (日付別)"):
    today_process_start_time = time.time()
    today = pd.Timestamp(today_str)
    # Load the 1st-screening result for this date.
    input_csv_path = os.path.join(input_base_dir, f'1st_filtered_{today_str}.csv')
    try:
        df_list = pd.read_csv(input_csv_path)
    except FileNotFoundError:
        print(f"エラー: 1stスクリーニング結果ファイルが見つかりません: {input_csv_path}")
        continue
    # Create this date's output directory.
    output_dir = os.path.join(output_base_dir, f'2nd_16pts_slope5_{today_str}')
    os.makedirs(output_dir, exist_ok=True)
    # Fetch the sentiment score for 'today', falling back to the most recent
    # prior trading day when 'today' is not in the sentiment index.
    score_today_val, score_mean, score_std, score_trend = np.nan, np.nan, np.nan, np.nan
    try:
        if not df_market_sentiment.empty:
            if today in df_market_sentiment.index:
                score_row = df_market_sentiment.loc[today]
            else:
                temp_idx = df_market_sentiment.index.get_indexer([today], method="nearest")[0]
                temp_date = df_market_sentiment.index[temp_idx]
                # FIX: restored '<=' — never take a date AFTER today (avoids look-ahead).
                score_date = temp_date if temp_date <= today else (df_market_sentiment.index[temp_idx - 1] if temp_idx > 0 else None)
                score_row = df_market_sentiment.loc[score_date] if score_date else None
            if score_row is not None:
                score_today_val, score_mean, score_std, score_trend = score_row['score_today'], score_row['score_mean'], score_row['score_std'], score_row['score_trend']
    except Exception as e:
        print(f"地合いスコア取得中にエラー: {e}")
    # Compare conditions and backtest the matching tickers.
    summary = []
    for cond in tqdm(screening_conditions, desc=f" ↳ 日付 {today_str} のパターン処理", leave=False):
        result = []
        for _, row in df_list.iterrows():
            ticker = str(row["LocalCode"])[0:4]  # J-Quants LocalCode -> 4-digit ticker
            try:
                df = df_all_data_processed.loc[(ticker,slice(None)), :].reset_index(level='Ticker', drop=True).copy()
                if df.empty: continue
                # Snap the screening date to the most recent trading day on or
                # before 'today' when 'today' itself is not a trading day.
                screening_date = today
                if screening_date not in df.index:
                    temp_idx = df.index.get_indexer([screening_date], method="nearest")[0]
                    temp_date = df.index[temp_idx]
                    # FIX: restored '<=' — do not screen on a future date.
                    screening_date = temp_date if temp_date <= screening_date else (df.index[temp_idx - 1] if temp_idx > 0 else None)
                if not screening_date: continue
                last_row = df.loc[screening_date]
                ma5_last, ma25_last, ma75_last = last_row["MA_5"], last_row["MA_25"], last_row["MA_75"]
                slope_last, rsi_last, close_last = last_row["MA_5_slope"], last_row["RSI"], last_row["Close"]
                epsilon = close_last * cond["ma_eps"]
                # FIX: restored '<' twice — all three MAs must converge within epsilon
                # while MA_5 is rising faster than the slope threshold.
                trend_flag = (abs(ma5_last - ma25_last) < epsilon and abs(ma25_last - ma75_last) < epsilon and slope_last > cond["slope_thresh"])
                # FIX: restored '<=' chain — RSI must lie inside the pattern's band.
                rsi_flag = (cond["rsi_range"][0] <= rsi_last <= cond["rsi_range"][1])
                if trend_flag and rsi_flag:
                    # Forward returns are measured in trading days, not calendar days.
                    df_trading_days = df.dropna(subset=['Close'])
                    if screening_date not in df_trading_days.index: continue
                    screening_idx_pos = df_trading_days.index.get_loc(screening_date)
                    returns = {}
                    for days in [14, 30, 60, 90]:
                        future_idx_pos = screening_idx_pos + days
                        # FIX: restored '<' — guard against indexing past the data end.
                        returns[f"Return({days}d)%"] = ((df_trading_days["Close"].iloc[future_idx_pos] - close_last) / close_last * 100) if future_idx_pos < len(df_trading_days) else np.nan
                    result.append({"Ticker": ticker, "Name": row.get("Name", "NoName"), **returns})
            except (KeyError, IndexError, TypeError):
                # Missing ticker / incomplete indicator data — skip this symbol.
                continue
        df_result = pd.DataFrame(result)
        df_result.to_csv(f"{output_dir}/result_{cond['name']}_{today_str}.csv", index=False)
        # Per-pattern summary: mean return and win rate for each horizon.
        mean_return14, win_rate14 = (df_result["Return(14d)%"].mean(), (df_result["Return(14d)%"] > 0).mean() * 100) if not df_result.empty else (np.nan, np.nan)
        mean_return30, win_rate30 = (df_result["Return(30d)%"].mean(), (df_result["Return(30d)%"] > 0).mean() * 100) if not df_result.empty else (np.nan, np.nan)
        mean_return60, win_rate60 = (df_result["Return(60d)%"].mean(), (df_result["Return(60d)%"] > 0).mean() * 100) if not df_result.empty else (np.nan, np.nan)
        mean_return90, win_rate90 = (df_result["Return(90d)%"].mean(), (df_result["Return(90d)%"] > 0).mean() * 100) if not df_result.empty else (np.nan, np.nan)
        summary.append({
            "Pattern": cond["name"], "銘柄数": len(df_result),
            "平均リターン(2週間)%": round(mean_return14, 2), "勝率(2週間後にプラス)%": round(win_rate14, 2),
            "平均リターン(30日)%": round(mean_return30, 2), "勝率(30日後にプラス)%": round(win_rate30, 2),
            "平均リターン(60日)%": round(mean_return60, 2), "勝率(60日後にプラス)%": round(win_rate60, 2),
            "平均リターン(90日)%": round(mean_return90, 2), "勝率(90日後にプラス)%": round(win_rate90, 2),
            "地合いスコア(当日)": int(score_today_val) if pd.notna(score_today_val) else np.nan,
            "地合いスコア平均": round(score_mean, 2), "地合いスコア標準偏差": round(score_std, 2), "地合いスコア傾向": round(score_trend, 3)
        })
    # Save the per-date summary, stamped with this date's processing time.
    df_summary = pd.DataFrame(summary)
    today_process_end_time = time.time()
    elapsed_time_today = today_process_end_time - today_process_start_time
    df_summary["この日の処理時間(秒)"] = elapsed_time_today
    df_summary.to_csv(f"{output_dir}/summary_16pts_slope5_{today_str}.csv", index=False)
# Report total wall-clock runtime for the whole script (see global_start_time at the top).
global_end_time = time.time()
total_elapsed_time = global_end_time - global_start_time
print(f"\n全日付 完了!! 合計処理時間: {total_elapsed_time:.2f} 秒")
# (removed web-page scrape artifact: "Views: 0" — not part of the script)