月曜日, 6月 23, 2025
月曜日, 6月 23, 2025
- Advertisement -
ホーム > ニュース > テックニュース > Python×株式投資:従来の100倍!銘柄選抜のバックテストを高速化した話 #初心者 - Qiita

Python×株式投資:従来の100倍!銘柄選抜のバックテストを高速化した話 #初心者 – Qiita



Python×株式投資:従来の100倍!銘柄選抜のバックテストを高速化した話 #初心者 - Qiita

# -----------------------------
# 2nd screening V3
# -----------------------------
import time

# Wall-clock start of the whole run; read again at the very end of the script
# to report the total elapsed time.
global_start_time = time.time()

# Mount Google Drive so that the /content/drive/MyDrive/... paths used below resolve.
from google.colab import drive
drive.mount('/content/drive')

import pandas as pd
import numpy as np
import os
from tqdm.notebook import tqdm
import yfinance as yf
from curl_cffi import requests
# HTTP session handed to yf.download() by calculate_market_sentiment_score().
# NOTE(review): impersonate="safari15_5" presumably makes requests look like a
# browser so Yahoo Finance does not reject them — confirm against curl_cffi docs.
session = requests.Session(impersonate="safari15_5")

# --------------------------------------------------
# ヘルパー関数定義セクション 
# --------------------------------------------------

def calculate_market_sentiment_score(ticker_symbol: str, start_date: str, end_date: str, session) -> pd.DataFrame:
    """Download daily prices for a market proxy and derive a sentiment score.

    Args:
        ticker_symbol: Ticker passed to ``yf.download`` (e.g. a TOPIX-linked ETF).
        start_date: Start of the download window. Anything ``pd.Timestamp``
            accepts works (string or Timestamp); it is formatted as YYYY-MM-DD.
        end_date: End of the download window, same accepted forms as
            ``start_date``.
        session: HTTP session object forwarded to ``yf.download``.

    Returns:
        A DataFrame indexed by ``Date`` holding the downloaded columns plus:
        ``LogReturn``, ``MA25``, ``MA25_diff``, ``RSI_14``, ``score_today``
        (integer in [-3, 3]), ``score_mean`` / ``score_std`` (10-row rolling
        statistics of ``score_today``), ``score_trend`` (10-row rolling mean
        of the day-over-day score change), ``MACD``, ``Signal`` and
        ``MACD_Hist``. Rows containing any NaN (the indicator warm-up period)
        are dropped. An empty DataFrame is returned when the download yields
        no rows.
    """
    print(f"🌀 地合いスコア計算のため、{ticker_symbol} のデータをダウンロード中...")
    start_str = pd.Timestamp(start_date).strftime('%Y-%m-%d')
    end_str = pd.Timestamp(end_date).strftime('%Y-%m-%d')
    market_data = yf.download(ticker_symbol, start=start_str, end=end_str, interval="1d", session=session)

    if market_data.empty:
        print(f"⚠️ {ticker_symbol} のデータが見つかりません。地合いスコア計算をスキップします。")
        return pd.DataFrame()

    # A MultiIndex column layout is flattened to its first level so that the
    # plain "Close" lookups below work.
    if isinstance(market_data.columns, pd.MultiIndex):
        market_data.columns = market_data.columns.get_level_values(0)

    market_data["LogReturn"] = np.log(market_data["Close"] / market_data["Close"].shift(1))
    market_data["MA25"] = market_data["Close"].rolling(25).mean()
    market_data["MA25_diff"] = (market_data["Close"] - market_data["MA25"]) / market_data["MA25"]

    # RSI built from simple 14-row rolling means of gains and losses
    # (not Wilder smoothing). The 1e-10 term avoids division by zero when
    # there were no losses in the window.
    delta = market_data["Close"].diff()
    gain = np.where(delta > 0, delta, 0)
    loss = np.where(delta < 0, -delta, 0)
    avg_gain = pd.Series(gain, index=market_data.index).rolling(14).mean()
    avg_loss = pd.Series(loss, index=market_data.index).rolling(14).mean()
    rs = avg_gain / (avg_loss + 1e-10)
    market_data["RSI_14"] = 100 - (100 / (1 + rs))

    # Daily score: +1 for each bullish signal, -1 for each bearish mirror signal.
    log_return_5d = market_data["LogReturn"].rolling(5).sum()
    market_data["score_today"] = 0
    market_data["score_today"] += (log_return_5d > 0.01).astype(int)
    market_data["score_today"] += (market_data["MA25_diff"] > 0).astype(int)
    market_data["score_today"] += (market_data["RSI_14"] > 55).astype(int)
    market_data["score_today"] -= (market_data["RSI_14"] < 45).astype(int)
    market_data["score_today"] -= (log_return_5d < -0.01).astype(int)
    market_data["score_today"] -= (market_data["MA25_diff"] < 0).astype(int)

    market_data["score_mean"] = market_data["score_today"].rolling(10).mean()
    market_data["score_std"] = market_data["score_today"].rolling(10).std()
    market_data["score_trend"] = market_data["score_today"].diff().rolling(10).mean()

    # MACD (12/26 EMA difference), its 9-span EMA signal line, and the histogram.
    ema12 = market_data["Close"].ewm(span=12, adjust=False).mean()
    ema26 = market_data["Close"].ewm(span=26, adjust=False).mean()
    market_data["MACD"] = ema12 - ema26
    market_data["Signal"] = market_data["MACD"].ewm(span=9, adjust=False).mean()
    market_data["MACD_Hist"] = market_data["MACD"] - market_data["Signal"]

    # Callers merge on a column named 'Date' after reset_index().
    market_data.index.name = 'Date'
    market_df = market_data.dropna().copy()
    print(f"✅ 地合いスコア計算完了 ({ticker_symbol})。")
    return market_df

def calculate_technical_indicators(df_group):
    """Add moving-average, RSI and slope columns to one ticker's price frame.

    Args:
        df_group: DataFrame for a single ticker with a ``Close`` column, rows
            in chronological order. The frame is modified in place.

    Returns:
        The same DataFrame object. When it has at least 75 rows, the columns
        ``MA_5``, ``MA_25``, ``MA_75``, ``RSI`` and ``MA_5_slope`` are added.
        Empty frames and frames shorter than 75 rows (too short for the
        75-row moving average) are returned unchanged, without those columns.
    """
    if df_group.empty or len(df_group) < 75:
        return df_group

    close = df_group["Close"]
    df_group["MA_5"] = close.rolling(window=5).mean()
    df_group["MA_25"] = close.rolling(window=25).mean()
    df_group["MA_75"] = close.rolling(window=75).mean()

    # RSI from simple 14-row rolling means of gains and losses; the 1e-10
    # term avoids division by zero when the window contains no losses.
    delta = close.diff()
    gain = np.where(delta > 0, delta, 0)
    loss = np.where(delta < 0, -delta, 0)
    avg_gain = pd.Series(gain, index=df_group.index).rolling(14).mean()
    avg_loss = pd.Series(loss, index=df_group.index).rolling(14).mean()
    rs = avg_gain / (avg_loss + 1e-10)
    df_group["RSI"] = 100 - (100 / (1 + rs))

    # Average per-row change of the 5-row moving average over the last 5 rows.
    df_group["MA_5_slope"] = df_group["MA_5"].diff(5) / 5

    return df_group

# --------------------------------------------------
# Settings and definitions section
# --------------------------------------------------

# --- Path settings ---
# Directory holding the 1st-screening results; the backtest loop reads
# `1st_filtered_<date>.csv` files from here.
input_base_dir = "/content/drive/MyDrive/stock_prediction/ver.1/results/Qiita/1st/J-Quants/Prime/selected_ajt_type2"
# Root directory for the 2nd-screening output; created up front if missing.
# NOTE(review): the "Speed _test" path segment contains a space — confirm it is intentional.
output_base_dir = "/content/drive/MyDrive/stock_prediction/ver.1/results/Qiita/2nd/Speed _test/mydataset/morerapid"
os.makedirs(output_base_dir, exist_ok=True)

# --- Data source settings ---
# Parquet file from which the Date/Ticker/Open/High/Low/Close/Volume columns are loaded.
PARQUET_PATH_FOR_PREDICTION = "/content/drive/MyDrive/stock_prediction/ver.1/Database/OHLCV/プライム/total_with_date/batch_size/ticker_combined_append_batch_PER.parquet"
MARKET_INDEX_TICKER = '1306.T'  # Ticker used for the market-sentiment calculation (e.g. a TOPIX-linked ETF)

# --- Screening dates ---
# ISO-formatted (YYYY-MM-DD) dates, in ascending order, on which the backtest
# is run. Written as one whitespace-separated block and split into a list.
date_list = """
    2022-01-13 2022-01-24 2022-01-28 2022-02-01 2022-04-20
    2022-05-17 2022-05-27 2022-06-21 2022-07-21 2022-09-05
    2022-09-09 2022-11-28 2022-12-01 2022-12-21 2023-02-17
    2023-03-28 2023-04-24 2023-07-14 2023-08-04 2023-09-21
    2023-10-05 2023-11-09 2023-11-29 2023-12-04 2023-12-21
    2023-12-22 2024-03-05 2024-03-13 2024-03-26 2024-04-11
    2024-06-06 2024-06-07 2024-07-22 2024-07-26 2024-09-12
""".split()

# --- Screening condition grid ---
# Two RSI bands crossed with four moving-average convergence tolerances
# (ma_eps, as a fraction of the close price), numbered pattern1..pattern8 in
# that order. slope_period and slope_thresh are the same for every pattern.
screening_conditions = [
    {
        "name": f"pattern{number}",
        "rsi_range": rsi_band,
        "ma_eps": tolerance,
        "slope_period": 5,
        "slope_thresh": 0.04,
    }
    for number, (rsi_band, tolerance) in enumerate(
        (
            (band, eps)
            for band in ((25, 50), (30, 60))
            for eps in (0.02, 0.04, 0.06, 0.08)
        ),
        start=1,
    )
]


# ------------------------------------------------------------------------------
# Main processing
# ------------------------------------------------------------------------------

# -----------------------------
# 1. Data preparation (bulk load and up-front indicator calculation)
# -----------------------------
print("\n--- 全データセットの読み込みとテクニカル指標の事前計算を開始します ---")
global_data_load_start_time = time.time()

# Determine the analysis window. min()/max() are used instead of the first and
# last list entries so the window stays correct even if date_list is reordered.
screening_timestamps = pd.to_datetime(date_list)
earliest_screening_date = screening_timestamps.min()
latest_screening_date = screening_timestamps.max()
max_lookback_days = 250
# NOTE(review): this is 90 *calendar* days, while the backtest loop steps up to
# 90 *rows* forward; the latest screening dates may therefore end up with NaN
# long-horizon returns — confirm whether the window should be wider.
max_lookahead_days = 90
global_data_start_date = earliest_screening_date - pd.Timedelta(days=max_lookback_days)
global_data_end_date = latest_screening_date + pd.Timedelta(days=max_lookahead_days)
print(f"分析対象期間: {global_data_start_date.strftime('%Y-%m-%d')} から {global_data_end_date.strftime('%Y-%m-%d')}")

# Compute the market-sentiment score. On any failure (including an empty
# download, which makes the column selection raise) fall back to empty frames
# so the rest of the script can still run with NaN scores.
try:
    df_market_sentiment = calculate_market_sentiment_score(
        ticker_symbol=MARKET_INDEX_TICKER,
        start_date=global_data_start_date,
        end_date=global_data_end_date,
        session=session
    )
    sentiment_cols = ['score_today', 'score_mean', 'score_std', 'score_trend']
    df_market_sentiment_to_merge = df_market_sentiment[sentiment_cols].copy()
except Exception as e:
    print(f"致命的エラー: 地合いスコアの計算に失敗しました: {e}")
    df_market_sentiment = pd.DataFrame()  # referenced inside the backtest loop, so define it as empty
    df_market_sentiment_to_merge = pd.DataFrame(columns=['score_today', 'score_mean', 'score_std', 'score_trend'])

# Load OHLCV rows from the Parquet file, restricted to the analysis window.
try:
    df_all_data = pd.read_parquet(
        PARQUET_PATH_FOR_PREDICTION,
        columns=['Date', 'Ticker', 'Open', 'High', 'Low', 'Close', 'Volume'],
        filters=[('Date', '>=', pd.Timestamp(global_data_start_date)), ('Date', '<=', pd.Timestamp(global_data_end_date))]
    )
    df_all_data["Date"] = pd.to_datetime(df_all_data["Date"])
    # Strip the ".T" suffix so tickers match the codes in the 1st-screening CSVs.
    df_all_data["Ticker"] = df_all_data["Ticker"].str.replace(".T", "", regex=False)
except Exception as e:
    print(f"致命的エラー: 全データセットの読み込みに失敗しました: {e}")
    # Nothing below can work without price data; stop with a non-zero status.
    raise SystemExit(1) from e

# Attach the market-sentiment columns to every price row, keyed by date.
if not df_market_sentiment_to_merge.empty:
    print("🌀 銘柄データに地合いスコアを結合中...")
    df_all_data_processed = pd.merge(df_all_data, df_market_sentiment_to_merge.reset_index(), on='Date', how='left')
    df_all_data_processed = df_all_data_processed.set_index(["Date", "Ticker"]).sort_index()
else:
    print("⚠️ 地合いスコアが利用できないため、スコア関連の列はNaNになります。")
    df_all_data_processed = df_all_data.set_index(["Date", "Ticker"]).sort_index()
    for col in ['score_today', 'score_mean', 'score_std', 'score_trend']:
        df_all_data_processed[col] = np.nan

# Compute technical indicators for every ticker. A single groupby pass hands
# out each ticker's rows instead of rescanning the whole index per ticker.
print("🌀 全銘柄のテクニカル指標を計算中...")
tickers_to_process = df_all_data_processed.index.get_level_values('Ticker').unique()
processed_data_parts = []
ticker_groups = df_all_data_processed.groupby(level='Ticker', sort=True)
for ticker_val, ticker_rows in tqdm(ticker_groups, total=len(tickers_to_process), desc="↳ 各銘柄のテクニカル指標計算"):
    # Copy so the in-place column additions never touch the source frame.
    ticker_df_part = ticker_rows.reset_index(level='Ticker', drop=True).copy()
    processed_df_part = calculate_technical_indicators(ticker_df_part)
    processed_df_part['Ticker'] = ticker_val
    # Re-key as (Ticker, Date): the backtest loop looks rows up by ticker first.
    processed_df_part = processed_df_part.set_index('Ticker', append=True).swaplevel(0, 1)
    processed_data_parts.append(processed_df_part)

df_all_data_processed = pd.concat(processed_data_parts).sort_index()

global_data_load_end_time = time.time()
print(f"✅ 全データセットの読み込みとテクニカル指標の事前計算が完了しました。所要時間: {global_data_load_end_time - global_data_load_start_time:.2f}")


# -----------------------------
# 2. 日付ごとのバックテスト実行
# -----------------------------
for today_str in tqdm(date_list, desc="全体進捗 (日付別)"):
    today_process_start_time = time.time()
    today = pd.Timestamp(today_str)

    # 1stスクリーニング結果を読む
    input_csv_path = os.path.join(input_base_dir, f'1st_filtered_{today_str}.csv')
    try:
        df_list = pd.read_csv(input_csv_path)
    except FileNotFoundError:
        print(f"エラー: 1stスクリーニング結果ファイルが見つかりません: {input_csv_path}")
        continue

    # 出力先を作成
    output_dir = os.path.join(output_base_dir, f'2nd_16pts_slope5_{today_str}')
    os.makedirs(output_dir, exist_ok=True)

    # 地合いスコアの取得
    score_today_val, score_mean, score_std, score_trend = np.nan, np.nan, np.nan, np.nan
    try:
        if not df_market_sentiment.empty:
            if today in df_market_sentiment.index:
                score_row = df_market_sentiment.loc[today]
            else:
                temp_idx = df_market_sentiment.index.get_indexer([today], method="nearest")[0]
                temp_date = df_market_sentiment.index[temp_idx]
                score_date = temp_date if temp_date  today else (df_market_sentiment.index[temp_idx - 1] if temp_idx > 0 else None)
                score_row = df_market_sentiment.loc[score_date] if score_date else None

            if score_row is not None:
                score_today_val, score_mean, score_std, score_trend = score_row['score_today'], score_row['score_mean'], score_row['score_std'], score_row['score_trend']
    except Exception as e:
        print(f"地合いスコア取得中にエラー: {e}")

    # 条件別比較&バックテスト
    summary = []
    for cond in tqdm(screening_conditions, desc=f"  ↳ 日付 {today_str} のパターン処理", leave=False):
        result = []
        for _, row in df_list.iterrows():
            ticker = str(row["LocalCode"])[0:4]
            try:
                df = df_all_data_processed.loc[(ticker,slice(None)), :].reset_index(level='Ticker', drop=True).copy()
                if df.empty: continue

                screening_date = today
                if screening_date not in df.index:
                    temp_idx = df.index.get_indexer([screening_date], method="nearest")[0]
                    temp_date = df.index[temp_idx]
                    screening_date = temp_date if temp_date  screening_date else (df.index[temp_idx - 1] if temp_idx > 0 else None)
                if not screening_date: continue

                last_row = df.loc[screening_date]
                ma5_last, ma25_last, ma75_last = last_row["MA_5"], last_row["MA_25"], last_row["MA_75"]
                slope_last, rsi_last, close_last = last_row["MA_5_slope"], last_row["RSI"], last_row["Close"]

                epsilon = close_last * cond["ma_eps"]
                trend_flag = (abs(ma5_last - ma25_last)  epsilon and abs(ma25_last - ma75_last)  epsilon and slope_last > cond["slope_thresh"])
                rsi_flag = (cond["rsi_range"][0]  rsi_last  cond["rsi_range"][1])

                if trend_flag and rsi_flag:
                    df_trading_days = df.dropna(subset=['Close'])
                    if screening_date not in df_trading_days.index: continue
                    screening_idx_pos = df_trading_days.index.get_loc(screening_date)
                    returns = {}
                    for days in [14, 30, 60, 90]:
                        future_idx_pos = screening_idx_pos + days
                        returns[f"Return({days}d)%"] = ((df_trading_days["Close"].iloc[future_idx_pos] - close_last) / close_last * 100) if future_idx_pos  len(df_trading_days) else np.nan
                    result.append({"Ticker": ticker, "Name": row.get("Name", "NoName"), **returns})
            except (KeyError, IndexError, TypeError):
                continue

        df_result = pd.DataFrame(result)
        df_result.to_csv(f"{output_dir}/result_{cond['name']}_{today_str}.csv", index=False)

        # summary への追加
        mean_return14, win_rate14 = (df_result["Return(14d)%"].mean(), (df_result["Return(14d)%"] > 0).mean() * 100) if not df_result.empty else (np.nan, np.nan)
        mean_return30, win_rate30 = (df_result["Return(30d)%"].mean(), (df_result["Return(30d)%"] > 0).mean() * 100) if not df_result.empty else (np.nan, np.nan)
        mean_return60, win_rate60 = (df_result["Return(60d)%"].mean(), (df_result["Return(60d)%"] > 0).mean() * 100) if not df_result.empty else (np.nan, np.nan)
        mean_return90, win_rate90 = (df_result["Return(90d)%"].mean(), (df_result["Return(90d)%"] > 0).mean() * 100) if not df_result.empty else (np.nan, np.nan)

        summary.append({
            "Pattern": cond["name"], "銘柄数": len(df_result),
            "平均リターン(2週間)%": round(mean_return14, 2), "勝率(2週間後にプラス)%": round(win_rate14, 2),
            "平均リターン(30日)%": round(mean_return30, 2), "勝率(30日後にプラス)%": round(win_rate30, 2),
            "平均リターン(60日)%": round(mean_return60, 2), "勝率(60日後にプラス)%": round(win_rate60, 2),
            "平均リターン(90日)%": round(mean_return90, 2), "勝率(90日後にプラス)%": round(win_rate90, 2),
            "地合いスコア(当日)": int(score_today_val) if pd.notna(score_today_val) else np.nan,
            "地合いスコア平均": round(score_mean, 2), "地合いスコア標準偏差": round(score_std, 2), "地合いスコア傾向": round(score_trend, 3)
        })

    # summaryの保存
    df_summary = pd.DataFrame(summary)
    today_process_end_time = time.time()
    elapsed_time_today = today_process_end_time - today_process_start_time
    df_summary["この日の処理時間(秒)"] = elapsed_time_today
    df_summary.to_csv(f"{output_dir}/summary_16pts_slope5_{today_str}.csv", index=False)


global_end_time = time.time()
total_elapsed_time = global_end_time - global_start_time
print(f"\n全日付 完了!! 合計処理時間: {total_elapsed_time:.2f}")





Source link

Views: 0

RELATED ARTICLES

返事を書く

あなたのコメントを入力してください。
ここにあなたの名前を入力してください

- Advertisement -