Open Source Strategies

Community Scripts

Browse AI-generated trading strategies shared by the community. Fork, learn, and build on each other's work.

8
Published Scripts
EURUSD
Instrument
15m
Timeframe
Sort by · 8 results
M
bad
@malco · 2026-04-03
+25.69%
Return
-1.22
Sharpe
0.3%
Max DD
103
Trades
</> View Code
# ╔══════════════════════════════════════════════════════════════╗
# ║                  STRATEGY REQUEST LOG                       ║
# ╚══════════════════════════════════════════════════════════════╝
# Generated        : 2026-04-03 12:31:35
# Model            : Random Forest
# Feature Eng.     : SMA (20, 50, 200), RSI 14
# Signal / Entry   : —
# Optimization     : —
# Risk Mgmt        : —
# Risk Filter      : —
# ══════════════════════════════════════════════════════════════

# ============================================================
# SECTION 0 — IMPORTS & CONSTANTS
# ============================================================

import numpy as np
import pandas as pd
from sklearn.ensemble import RandomForestClassifier
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import confusion_matrix
import plotly.graph_objects as go
from model_wrapper import ModelWrapper

# Machine-specific absolute path to the raw EURUSD tick file (expects 'Time', 'Bid', 'Ask' columns).
DATA_PATH = "/Users/malco/Desktop/QuantifyMe/data/EURUSD_ticks.csv"
# Inclusive date window applied to the resampled 15-minute bars.
START_DATE = "2026-02-24 18:30:00"
END_DATE = "2026-03-26"
# Train/test boundary: rows at or before this timestamp form the training set.
VALIDATION_DATE = "2026-03-23 22:15:00"
# Fallback fractional split, used only when VALIDATION_DATE is empty.
TRAIN_SPLIT = 0.9165
# Backtest starting equity in account currency units.
STARTING_CAPITAL = 10_000
TRADE_COST = 2e-5  # round-trip cost per trade


# ============================================================
# SECTION 2 — FEATURE ENGINEERING
# ============================================================

def feature_engineering(df, close, open_, high, low):
    """Append SMA, RSI, return, volatility and range features to *df*.

    Parameters
    ----------
    df : pd.DataFrame
        Frame indexed like the price series; a copy is returned.
    close, open_, high, low : pd.Series
        OHLC price series aligned with *df*. ``open_`` is accepted for
        interface symmetry but is not used by any feature here.

    Returns
    -------
    pd.DataFrame
        Copy of *df* with the engineered feature columns added.
    """
    out = df.copy()

    # Simple moving averages over three horizons.
    for window in (20, 50, 200):
        out[f'sma_{window}'] = close.rolling(window).mean()

    # Distance of the close from each SMA.
    out['close_minus_sma20'] = close - out['sma_20']
    out['close_minus_sma50'] = close - out['sma_50']
    out['close_minus_sma200'] = close - out['sma_200']

    # SMA crossover spreads.
    out['sma20_minus_sma50'] = out['sma_20'] - out['sma_50']
    out['sma50_minus_sma200'] = out['sma_50'] - out['sma_200']

    # Wilder-style RSI(14) via exponentially weighted means; a zero
    # average loss is mapped to NaN so the ratio never divides by zero.
    step = close.diff()
    up_move = step.clip(lower=0)
    down_move = -step.clip(upper=0)
    ew_up = up_move.ewm(com=13, min_periods=14).mean()
    ew_down = down_move.ewm(com=13, min_periods=14).mean()
    strength = ew_up / ew_down.replace(0, np.nan)
    out['rsi_14'] = 100 - (100 / (1 + strength))

    # Lagged percentage returns.
    for lag in (1, 4, 8):
        out[f'ret_{lag}'] = close.pct_change(lag)

    # 20-bar realised volatility of one-bar returns.
    out['vol_20'] = out['ret_1'].rolling(20).std()

    # Intrabar range, absolute and relative to the close.
    out['hl_range'] = high - low
    out['hl_range_pct'] = out['hl_range'] / close

    return out


# ============================================================
# SECTION 3 — ML MODEL
# ============================================================

def build_model(X_train, y_train):
    """Fit a Random Forest on the training data and wrap it for inference.

    Hyper-parameters are taken from optimization_config(). The fitted
    classifier is packaged in a ModelWrapper together with the original
    class labels (-1/0/1) and the training feature count.
    """
    params = optimization_config()
    forest = RandomForestClassifier(
        n_estimators=params['n_estimators'],
        max_depth=params['max_depth'],
        min_samples_leaf=params['min_samples_leaf'],
        class_weight=params['class_weight'],
        random_state=42,
        n_jobs=-1,
    )
    forest.fit(X_train, y_train)

    # Recover the canonical -1/0/1 label ordering for the wrapper.
    encoder = LabelEncoder()
    encoder.fit([-1, 0, 1])
    return ModelWrapper(forest, original_classes=encoder.classes_,
                        n_features=X_train.shape[1])


# ============================================================
# SECTION 4 — SIGNAL / ENTRY LOGIC
# ============================================================

def generate_signals(model, X, thresh):
    """Map class probabilities to directional trading signals.

    A long (+1) fires when P(class=+1) >= *thresh* and a short (-1)
    when P(class=-1) >= *thresh*; where both fire, the larger
    probability wins (long wins an exact tie). Everything else is
    flat (0).

    Returns a tuple of (signal Series indexed like *X*, p_pos array,
    p_neg array).
    """
    proba = model.predict_proba(X)
    labels = list(model.classes_)

    def _column_for(label):
        # Probability column for *label*, or zeros if the model never saw it.
        if label in labels:
            return proba[:, labels.index(label)]
        return np.zeros(len(X))

    p_pos = _column_for(1)
    p_neg = _column_for(-1)

    long_hit = p_pos >= thresh
    short_hit = p_neg >= thresh

    values = np.zeros(len(X))
    values[long_hit] = 1.0
    values[short_hit] = -1.0

    # Where both directions clear the threshold, keep the stronger one.
    overlap = long_hit & short_hit
    values[overlap] = np.where(p_pos[overlap] >= p_neg[overlap], 1.0, -1.0)

    return pd.Series(values, index=X.index), p_pos, p_neg


# ============================================================
# SECTION 5 — OPTIMISATION TARGET
# ============================================================

def optimization_config():
    """Return the hyper-parameter bundle used by build_model.

    Tuned for Sharpe ratio: balanced class weights, moderate depth, and
    a large forest for stable probability estimates.
    """
    cfg = {}
    cfg["objective"] = "Maximize Sharpe ratio"
    cfg["notes"] = (
        "Random Forest tuned for Sharpe: balanced class weights to avoid "
        "bias, moderate depth to prevent overfitting, high n_estimators for "
        "stable probability estimates."
    )
    cfg["n_estimators"] = 300
    cfg["max_depth"] = 8
    cfg["min_samples_leaf"] = 20
    cfg["class_weight"] = "balanced"
    return cfg


# ============================================================
# SECTION 6 — RISK MANAGEMENT
# ============================================================

def apply_risk(signal, close, pos_size=1.0):
    """Scale the trading signal by a fixed position size.

    *close* is part of the interface for price-aware sizing schemes but
    is not used by this flat-sizing implementation.
    """
    sized = pos_size * signal
    return sized


# ============================================================
# SECTION 7 — BACKTEST ENGINE
# ============================================================

def train_and_backtest():
    """Load ticks, engineer features, train the RF, and backtest.

    Reads the tick CSV at DATA_PATH, builds 15-minute mid-price OHLC
    bars, trains a Random Forest on the pre-split period, generates
    threshold signals, and returns a JSON-serialisable dict of OHLC
    data, signals, overlays, equity curves, diagnostics, metrics and
    Plotly figure dicts for the front-end.

    NOTE(review): several observations are flagged inline below
    (in-sample signals in the combined equity curve, a duplicated
    equity loop, backfill lookahead, and an undefined
    ``register_model`` reference).
    """
    # ------------------------------------------------------------------
    # Load data
    # ------------------------------------------------------------------
    df_raw = pd.read_csv(DATA_PATH, parse_dates=['Time'])
    df_raw.set_index('Time', inplace=True)
    df_raw.sort_index(inplace=True)

    # Mid-price from bid/ask, resampled into 15-minute OHLC bars.
    mid = (df_raw['Bid'] + df_raw['Ask']) / 2.0
    mid = mid.resample('15min').ohlc()
    mid.columns = ['open', 'high', 'low', 'close']
    mid.dropna(inplace=True)

    if START_DATE:
        mid = mid[mid.index >= START_DATE]
    if END_DATE:
        mid = mid[mid.index <= END_DATE]

    close  = mid['close']
    open_  = mid['open']
    high   = mid['high']
    low    = mid['low']

    # ------------------------------------------------------------------
    # Feature engineering on full dataset
    # ------------------------------------------------------------------
    df_feat = mid.copy()
    df_feat = feature_engineering(df_feat, close, open_, high, low)

    # ------------------------------------------------------------------
    # Target
    # ------------------------------------------------------------------
    # Direction of the close 4 bars (one hour on 15m bars) ahead: +1/-1/0.
    target = np.sign(close.shift(-4) - close)
    mask = target.notna()
    df_feat = df_feat[mask]
    target  = target[mask]

    close_full  = close[mask]
    open_full   = open_[mask]
    high_full   = high[mask]
    low_full    = low[mask]

    # ------------------------------------------------------------------
    # Feature columns
    # ------------------------------------------------------------------
    feature_cols = [
        'sma_20', 'sma_50', 'sma_200',
        'close_minus_sma20', 'close_minus_sma50', 'close_minus_sma200',
        'sma20_minus_sma50', 'sma50_minus_sma200',
        'rsi_14',
        'ret_1', 'ret_4', 'ret_8',
        'vol_20',
        'hl_range', 'hl_range_pct'
    ]

    # NOTE(review): bfill() propagates later feature values into earlier
    # rows (e.g. the warm-up NaNs of sma_200), which leaks future data
    # into the training period — confirm this is intended.
    df_feat = df_feat.bfill().ffill()
    df_feat.dropna(subset=feature_cols, inplace=True)
    valid_idx = df_feat.index
    target = target.loc[valid_idx]
    close_full = close_full.loc[valid_idx]
    open_full  = open_full.loc[valid_idx]
    high_full  = high_full.loc[valid_idx]
    low_full   = low_full.loc[valid_idx]

    X = df_feat[feature_cols]

    # ------------------------------------------------------------------
    # Train/test split
    # ------------------------------------------------------------------
    n = len(df_feat)
    if VALIDATION_DATE:
        split_idx = len(df_feat[df_feat.index <= VALIDATION_DATE])
    else:
        split_idx = int(n * TRAIN_SPLIT)

    # Keep at least one row on each side of the split.
    split_idx = max(1, min(split_idx, n - 1))

    X_train = X.iloc[:split_idx]
    X_test  = X.iloc[split_idx:]
    y_train = target.iloc[:split_idx]
    y_test  = target.iloc[split_idx:]
    close_train = close_full.iloc[:split_idx]
    close_test  = close_full.iloc[split_idx:]

    split_dt = str(df_feat.index[split_idx])

    # ------------------------------------------------------------------
    # Label encoding
    # ------------------------------------------------------------------
    # Maps the directional labels -1/0/1 onto 0/1/2 for the classifier.
    enc = LabelEncoder()
    enc.fit([-1, 0, 1])
    y_train_enc = enc.transform(y_train)
    y_test_enc  = enc.transform(y_test)

    # ------------------------------------------------------------------
    # Build model
    # ------------------------------------------------------------------
    model = build_model(X_train, y_train_enc)

    # ------------------------------------------------------------------
    # Generate signals on full dataset (train + test)
    # ------------------------------------------------------------------
    # NOTE(review): train-period signals are in-sample predictions; they
    # feed the combined equity curve below, so the full-sample equity
    # and total return are partly in-sample.
    signal_train, p_pos_train, p_neg_train = generate_signals(model, X_train, thresh=0.55)
    signal_test,  p_pos_test,  p_neg_test  = generate_signals(model, X_test,  thresh=0.55)

    # NOTE(review): risk sizing is applied to the test signals only —
    # confirm the asymmetry with the (unscaled) train signals is intended.
    signal_test = apply_risk(signal_test, close_test)

    signal_full = pd.concat([signal_train, signal_test])

    # ------------------------------------------------------------------
    # Overlays on full dataset
    # ------------------------------------------------------------------
    bb_mid   = close_full.rolling(20).mean()
    bb_std   = close_full.rolling(20).std()
    bb_upper = bb_mid + 2 * bb_std
    bb_lower = bb_mid - 2 * bb_std
    ma50     = close_full.rolling(50).mean()
    ma100    = close_full.rolling(100).mean()
    ma200    = close_full.rolling(200).mean()

    def _to_list_or_none(series):
        # JSON-safe conversion: NaN/inf become None, everything else float.
        out = []
        for v in series:
            if pd.isna(v) or np.isinf(v):
                out.append(None)
            else:
                out.append(float(v))
        return out

    # ------------------------------------------------------------------
    # Equity curve — trade-level P&L with costs
    # ------------------------------------------------------------------
    signals_arr = signal_full.values
    close_arr   = close_full.values
    dates_full  = [str(d) for d in close_full.index]

    capital = float(STARTING_CAPITAL)
    equity_strategy = [capital]
    equity_bh_start = close_arr[0]
    equity_bh = [capital]

    last_dir = None
    entry_price = None
    ret_dist = []
    ret_dist_long = []
    ret_dist_short = []
    n_trades = 0

    position_returns = []

    # A "trade" opens whenever the signal flips to a new non-zero
    # direction; the previous trade is closed at the same bar's close.
    for i in range(len(signals_arr)):
        sig = signals_arr[i]
        price = close_arr[i]

        if sig != 0 and sig != last_dir:
            # Close previous trade
            if last_dir is not None and entry_price is not None:
                raw_ret = last_dir * (price - entry_price) / entry_price
                net_ret = raw_ret - TRADE_COST
                ret_dist.append(float(net_ret))
                if last_dir == 1:
                    ret_dist_long.append(float(net_ret))
                else:
                    ret_dist_short.append(float(net_ret))
                capital *= (1 + net_ret)

            # Open new trade
            last_dir = sig
            entry_price = price
            n_trades += 1

        equity_strategy.append(float(capital))
        bh_ret_val = (price - equity_bh_start) / equity_bh_start
        equity_bh.append(float(capital * (1 + bh_ret_val) / 1.0))

    # Align equity length with dates
    # NOTE(review): the equity_strategy/equity_bh lists built above are
    # superseded by the *_arr versions recomputed below; this first loop
    # effectively only contributes ret_dist* and n_trades.
    equity_strategy = equity_strategy[1:]
    equity_bh_arr = []
    for i in range(len(close_arr)):
        bh_val = STARTING_CAPITAL * (close_arr[i] / close_arr[0])
        equity_bh_arr.append(float(bh_val))

    # Second pass: identical trade logic, producing an equity series
    # aligned one-to-one with close_full for plotting.
    equity_strategy_arr = []
    running_cap = float(STARTING_CAPITAL)
    last_dir2 = None
    entry_price2 = None
    for i in range(len(signals_arr)):
        sig = signals_arr[i]
        price = close_arr[i]
        if sig != 0 and sig != last_dir2:
            if last_dir2 is not None and entry_price2 is not None:
                raw_ret = last_dir2 * (price - entry_price2) / entry_price2
                net_ret = raw_ret - TRADE_COST
                running_cap *= (1 + net_ret)
            last_dir2 = sig
            entry_price2 = price
        equity_strategy_arr.append(float(running_cap))

    # ------------------------------------------------------------------
    # Metrics
    # ------------------------------------------------------------------
    total_ret = (equity_strategy_arr[-1] - STARTING_CAPITAL) / STARTING_CAPITAL if equity_strategy_arr else 0.0
    bh_ret    = (close_arr[-1] - close_arr[0]) / close_arr[0] if len(close_arr) > 0 else 0.0

    # Sharpe on test period
    test_signals_arr = signal_test.values
    test_close_arr   = close_test.values

    # NOTE(review): bar_rets records one return per signal *change* plus
    # zero entries on flat bars, not one return per bar, while the
    # sqrt(252 * 26) factor annualises per-bar returns — confirm the
    # intended sampling and the 26-bars-per-day assumption.
    bar_rets = []
    ld = None
    ep = None
    for i in range(len(test_signals_arr)):
        sig = test_signals_arr[i]
        price = test_close_arr[i]
        if sig != 0:
            if ld is not None and ep is not None:
                bar_ret = ld * (price - ep) / ep
                bar_rets.append(bar_ret)
            ld = sig
            ep = price
        elif ld is not None and ep is not None:
            bar_rets.append(0.0)

    if len(bar_rets) > 1:
        ret_series = pd.Series(bar_rets)
        std_val = ret_series.std()
        if std_val == 0 or np.isnan(std_val):
            sharpe_strat = 0.0
        else:
            sharpe_strat = float((ret_series.mean() / std_val) * np.sqrt(252 * 26))
    else:
        sharpe_strat = 0.0

    # BH Sharpe
    bh_bar_rets = pd.Series(test_close_arr).pct_change().dropna()
    if len(bh_bar_rets) > 1 and bh_bar_rets.std() != 0:
        sharpe_bh = float((bh_bar_rets.mean() / bh_bar_rets.std()) * np.sqrt(252 * 26))
    else:
        sharpe_bh = 0.0

    # Max drawdown
    eq_series = pd.Series(equity_strategy_arr)
    roll_max = eq_series.cummax()
    dd_series = (eq_series - roll_max) / roll_max
    mdd = float(dd_series.min()) if len(dd_series) > 0 else 0.0

    # ------------------------------------------------------------------
    # Confusion matrix (test set)
    # ------------------------------------------------------------------
    # NOTE(review): y_test holds original labels (-1/0/1) while the model
    # was fit on encoded labels (0/1/2); this comparison relies on
    # ModelWrapper mapping predictions back to original_classes — confirm.
    pred_test = model.predict(X_test)
    y_test_arr = np.asarray(y_test)
    all_labels = [-1, 0, 1]
    try:
        cm = confusion_matrix(y_test_arr, pred_test, labels=all_labels).tolist()
    except Exception:
        cm = [[0, 0, 0], [0, 0, 0], [0, 0, 0]]

    # ------------------------------------------------------------------
    # Rolling accuracy (test period, 30-bar window, active signals only)
    # ------------------------------------------------------------------
    active_mask = pred_test != 0
    correct = (pred_test == y_test_arr).astype(float)
    correct_series = pd.Series(correct, index=X_test.index)
    active_series  = pd.Series(active_mask.astype(float), index=X_test.index)

    # Inactive bars become NaN via where(); rolling mean skips them.
    roll_correct = correct_series.where(active_series.astype(bool)).rolling(30, min_periods=1).mean()

    rolling_acc_dates  = [str(d) for d in X_test.index]
    rolling_acc_values = []
    for v in roll_correct:
        if pd.isna(v) or np.isinf(v):
            rolling_acc_values.append(None)
        else:
            rolling_acc_values.append(float(v))

    # ------------------------------------------------------------------
    # Feature importance (top 15)
    # ------------------------------------------------------------------
    fi = model.feature_importances_
    fi_pairs = sorted(zip(feature_cols, fi), key=lambda x: x[1])[-15:]
    fi_names  = [p[0] for p in fi_pairs]
    fi_values = [float(p[1]) for p in fi_pairs]

    # ------------------------------------------------------------------
    # Drawdown series
    # ------------------------------------------------------------------
    dd_values = []
    for v in dd_series:
        if pd.isna(v) or np.isinf(v):
            dd_values.append(None)
        else:
            dd_values.append(float(v))

    # ------------------------------------------------------------------
    # Custom figures
    # ------------------------------------------------------------------
    custom_figs = []

    # --- SMA chart ---
    fig_sma = go.Figure()
    fig_sma.add_trace(go.Scatter(
        x=dates_full, y=_to_list_or_none(close_full),
        name='Close', line=dict(color='#d1d4dc', width=1)
    ))
    fig_sma.add_trace(go.Scatter(
        x=dates_full, y=_to_list_or_none(close_full.rolling(20).mean()),
        name='SMA 20', line=dict(color='#f59e0b', width=1.2)
    ))
    fig_sma.add_trace(go.Scatter(
        x=dates_full, y=_to_list_or_none(close_full.rolling(50).mean()),
        name='SMA 50', line=dict(color='#3b82f6', width=1.2)
    ))
    fig_sma.add_trace(go.Scatter(
        x=dates_full, y=_to_list_or_none(close_full.rolling(200).mean()),
        name='SMA 200', line=dict(color='#ef4444', width=1.2)
    ))
    fig_sma.update_layout(
        title='SMA Overlay (20 / 50 / 200)',
        paper_bgcolor="#131722", plot_bgcolor="#131722",
        font_color="#d1d4dc",
        margin=dict(l=40, r=20, t=30, b=30),
        legend=dict(bgcolor="rgba(0,0,0,0)")
    )
    custom_figs.append(fig_sma.to_dict())

    # --- RSI chart ---
    rsi_full = df_feat['rsi_14']
    fig_rsi = go.Figure()
    fig_rsi.add_trace(go.Scatter(
        x=dates_full, y=_to_list_or_none(rsi_full),
        name='RSI 14', line=dict(color='#a78bfa', width=1.2)
    ))
    fig_rsi.add_hline(y=70, line_dash='dash', line_color='#ef4444', annotation_text='Overbought 70')
    fig_rsi.add_hline(y=30, line_dash='dash', line_color='#22c55e', annotation_text='Oversold 30')
    fig_rsi.add_hline(y=50, line_dash='dot', line_color='#6b7280')
    fig_rsi.update_layout(
        title='RSI 14',
        paper_bgcolor="#131722", plot_bgcolor="#131722",
        font_color="#d1d4dc",
        margin=dict(l=40, r=20, t=30, b=30),
        legend=dict(bgcolor="rgba(0,0,0,0)"),
        yaxis=dict(range=[0, 100])
    )
    custom_figs.append(fig_rsi.to_dict())

    # ------------------------------------------------------------------
    # Register model
    # ------------------------------------------------------------------
    # NOTE(review): register_model is neither defined nor imported in
    # this file; the `is not None` guard does not prevent a NameError if
    # the name is truly absent — presumably it is injected by the host
    # runtime; confirm.
    if register_model is not None:
        register_model(model)

    # ------------------------------------------------------------------
    # Build return dict
    # ------------------------------------------------------------------
    def _clean(lst):
        # Replace NaN/inf floats with None so the payload is JSON-safe.
        out = []
        for v in lst:
            if v is None:
                out.append(None)
            elif isinstance(v, float) and (np.isnan(v) or np.isinf(v)):
                out.append(None)
            else:
                out.append(v)
        return out

    return {
        "ohlc": {
            "dates":  [str(d) for d in close_full.index],
            "open":   _to_list_or_none(open_full),
            "high":   _to_list_or_none(high_full),
            "low":    _to_list_or_none(low_full),
            "close":  _to_list_or_none(close_full),
        },
        "signals": {
            "dates":  [str(d) for d in signal_full.index],
            "values": [float(v) for v in signal_full.values],
        },
        "bb": {
            "upper": _to_list_or_none(bb_upper),
            "mid":   _to_list_or_none(bb_mid),
            "lower": _to_list_or_none(bb_lower),
        },
        "ma": {
            "ma50":  _to_list_or_none(ma50),
            "ma100": _to_list_or_none(ma100),
            "ma200": _to_list_or_none(ma200),
        },
        "equity": {
            "dates":    dates_full,
            "strategy": _clean(equity_strategy_arr),
            "bh":       _clean(equity_bh_arr),
        },
        "feature_importance": {
            "names":  fi_names,
            "values": fi_values,
        },
        "conf_matrix": cm,
        "conf_hist": {
            "p_pos": p_pos_test.tolist(),
            "p_neg": p_neg_test.tolist(),
        },
        "rolling_acc": {
            "dates":  rolling_acc_dates,
            "values": rolling_acc_values,
        },
        "drawdown": {
            "dates":  dates_full,
            "values": _clean(dd_values),
        },
        "ret_dist":       ret_dist,
        "ret_dist_long":  ret_dist_long,
        "ret_dist_short": ret_dist_short,
        "metrics": {
            "total_ret":    float(total_ret),
            "bh_ret":       float(bh_ret),
            "sharpe_strat": float(sharpe_strat),
            "sharpe_bh":    float(sharpe_bh),
            "mdd":          float(mdd),
            "n_trades":     int(n_trades),
        },
        "split_dt":    split_dt,
        "split_idx":   int(split_idx),
        "n_train":     int(split_idx),
        "n_test":      int(n - split_idx),
        "feature_cols": feature_cols,
        "custom_figs":  custom_figs,
    }
M
ggg
@malco · 2026-04-03
+12.79%
Return
3.72
Sharpe
0.9%
Max DD
56
Trades
</> View Code
# ╔══════════════════════════════════════════════════════════════╗
# ║                  STRATEGY REQUEST LOG                       ║
# ╚══════════════════════════════════════════════════════════════╝
# Generated        : 2026-04-03 08:22:31
# Model            : Random Forest
# Feature Eng.     : SMA (20, 50, 200), RSI 14
# Signal / Entry   : —
# Optimization     : —
# Risk Mgmt        : —
# Risk Filter      : —
# ══════════════════════════════════════════════════════════════

# ============================================================
# SECTION 0 — IMPORTS & CONSTANTS
# ============================================================

import numpy as np
import pandas as pd
from sklearn.ensemble import RandomForestClassifier
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import confusion_matrix
import plotly.graph_objects as go
from model_wrapper import ModelWrapper

# Machine-specific absolute path to the raw EURUSD tick file (expects 'Time', 'Bid', 'Ask' columns).
DATA_PATH = "/Users/malco/Desktop/QuantifyMe/data/EURUSD_ticks.csv"
# Inclusive date window applied to the resampled 15-minute bars.
START_DATE = "2026-02-24 00:00:00"
END_DATE   = "2026-03-26 00:00:00"
# Empty string means: use TRAIN_SPLIT fraction instead of a date split.
VALIDATION_DATE = ""
# Fraction of rows assigned to the training set when VALIDATION_DATE is empty.
TRAIN_SPLIT = 0.6820973075106282
# Backtest starting equity in account currency units.
STARTING_CAPITAL = 10_000
# Cost deducted per trade (fractional return units).
COST_PER_TRADE = 2e-5


# ============================================================
# SECTION 2 — FEATURE ENGINEERING
# ============================================================

def feature_engineering(df, close, open_, high, low):
    """Attach SMA-ratio, RSI, return, volatility and range features to *df*.

    Parameters
    ----------
    df : pd.DataFrame
        Frame indexed like the price series; a copy is returned.
    close, open_, high, low : pd.Series
        OHLC price series aligned with *df*. ``open_`` is accepted for
        interface symmetry but unused.

    Returns
    -------
    pd.DataFrame
        Copy of *df* with the engineered feature columns appended.
    """
    out = df.copy()

    # Simple moving averages over three horizons.
    for w in (20, 50, 200):
        out[f"sma_{w}"] = close.rolling(w).mean()

    # Close expressed as a ratio to each SMA, minus one.
    out["price_vs_sma20"] = close / out["sma_20"] - 1
    out["price_vs_sma50"] = close / out["sma_50"] - 1
    out["price_vs_sma200"] = close / out["sma_200"] - 1

    # Relative SMA crossovers.
    out["sma20_vs_sma50"] = out["sma_20"] / out["sma_50"] - 1
    out["sma50_vs_sma200"] = out["sma_50"] / out["sma_200"] - 1

    # RSI(14) with Wilder-style ewm smoothing; a zero average loss is
    # mapped to NaN to avoid division by zero.
    move = close.diff()
    up_move = move.clip(lower=0)
    down_move = -move.clip(upper=0)
    smooth_up = up_move.ewm(com=13, adjust=False).mean()
    smooth_down = down_move.ewm(com=13, adjust=False).mean()
    ratio = smooth_up / smooth_down.replace(0, np.nan)
    out["rsi_14"] = 100 - (100 / (1 + ratio))

    # Lagged percentage returns.
    for lag in (1, 4, 12):
        out[f"ret_{lag}"] = close.pct_change(lag)

    # 20-bar realised volatility of one-bar returns.
    out["volatility_20"] = close.pct_change().rolling(20).std()

    # Intrabar range relative to the close.
    out["hl_range"] = (high - low) / close

    return out


# ============================================================
# SECTION 3 — ML MODEL
# ============================================================

def build_model(X_train, y_train):
    """Train a Random Forest with the tuned hyper-parameters and wrap it.

    The fitted classifier is packaged in a ModelWrapper together with
    the original class labels (-1/0/1) and the training feature count so
    predictions can be mapped back to directional labels.
    """
    params = optimization_config()
    forest = RandomForestClassifier(
        n_estimators=params["n_estimators"],
        max_depth=params["max_depth"],
        min_samples_leaf=params["min_samples_leaf"],
        class_weight=params["class_weight"],
        random_state=42,
        n_jobs=-1,
    )
    forest.fit(X_train, y_train)

    # Recover the canonical -1/0/1 label ordering for the wrapper.
    label_codec = LabelEncoder()
    label_codec.fit([-1, 0, 1])
    return ModelWrapper(forest, original_classes=label_codec.classes_,
                        n_features=X_train.shape[1])


# ============================================================
# SECTION 4 — SIGNAL / ENTRY LOGIC
# ============================================================

def generate_signals(model, X, thresh):
    """Convert predicted class probabilities into a -1/0/+1 signal Series.

    Long (+1) where P(class=+1) >= *thresh*, short (-1) where
    P(class=-1) >= *thresh*; bars where both sides clear the threshold
    resolve to the higher probability (long wins an exact tie).

    Returns (signal Series indexed like *X*, p_pos array, p_neg array).
    """
    proba = model.predict_proba(X)
    known = list(model.classes_)

    # Probability of each direction; zeros when the class is unknown.
    p_pos = proba[:, known.index(1)] if 1 in known else np.zeros(len(X))
    p_neg = proba[:, known.index(-1)] if -1 in known else np.zeros(len(X))

    go_long = p_pos >= thresh
    go_short = p_neg >= thresh

    signal = pd.Series(0.0, index=X.index)
    signal[go_long] = 1.0
    signal[go_short] = -1.0

    # Resolve bars where both sides fire by comparing probabilities.
    clash = go_long & go_short
    if clash.any():
        signal[clash] = np.where(p_pos[clash] >= p_neg[clash], 1.0, -1.0)

    return signal, p_pos, p_neg


# ============================================================
# SECTION 5 — OPTIMISATION TARGET
# ============================================================

def optimization_config():
    """Return the hyper-parameter bundle consumed by build_model.

    Sharpe-oriented tuning; ``learning_rate`` is carried as None since
    Random Forests have no such parameter.
    """
    config = dict(
        objective="Maximize Sharpe ratio",
        notes="Balanced class weights to handle directional imbalance; conservative depth to reduce overfitting; more estimators for stability.",
        n_estimators=300,
        max_depth=6,
        min_samples_leaf=20,
        class_weight="balanced",
        learning_rate=None,
    )
    return config


# ============================================================
# SECTION 6 — RISK MANAGEMENT
# ============================================================

def apply_risk(signal, close, pos_size=1.0):
    """Apply flat position sizing to the signal.

    *close* is reserved for future price-aware sizing and is not used.
    """
    scaled_signal = signal * pos_size
    return scaled_signal


# ============================================================
# SECTION 7 — BACKTEST ENGINE
# ============================================================

def train_and_backtest():
    # ── Load & resample ──────────────────────────────────────
    df_raw = pd.read_csv(DATA_PATH, parse_dates=["Time"])
    df_raw = df_raw.sort_values("Time").set_index("Time")
    df_raw["mid"] = (df_raw["Bid"] + df_raw["Ask"]) / 2

    ohlc = df_raw["mid"].resample("15min").ohlc()
    ohlc = ohlc.dropna()

    if START_DATE:
        ohlc = ohlc[ohlc.index >= START_DATE]
    if END_DATE:
        ohlc = ohlc[ohlc.index <= END_DATE]

    close  = ohlc["close"]
    open_  = ohlc["open"]
    high   = ohlc["high"]
    low    = ohlc["low"]

    # ── Feature engineering ───────────────────────────────────
    df = pd.DataFrame(index=ohlc.index)
    df = feature_engineering(df, close, open_, high, low)

    # ── Target ───────────────────────────────────────────────
    target = np.sign(close.shift(-4) - close)
    mask   = target.notna()
    df     = df[mask]
    target = target[mask]

    close  = close[mask]
    open_  = open_[mask]
    high   = high[mask]
    low    = low[mask]

    # ── Drop NaN rows from features ───────────────────────────
    feat_mask = df.notna().all(axis=1)
    df     = df[feat_mask]
    target = target[feat_mask]
    close  = close[feat_mask]
    open_  = open_[feat_mask]
    high   = high[feat_mask]
    low    = low[feat_mask]

    feature_cols = list(df.columns)

    # ── Overlays on full dataset ──────────────────────────────
    bb_mid   = close.rolling(20).mean()
    bb_std   = close.rolling(20).std()
    bb_upper = bb_mid + 2 * bb_std
    bb_lower = bb_mid - 2 * bb_std

    ma50  = close.rolling(50).mean()
    ma100 = close.rolling(100).mean()
    ma200 = close.rolling(200).mean()

    # ── Train/test split ──────────────────────────────────────
    if VALIDATION_DATE:
        split_idx = len(df[df.index <= VALIDATION_DATE])
    else:
        split_idx = int(len(df) * TRAIN_SPLIT)

    split_idx = max(1, min(split_idx, len(df) - 1))

    X_train = df.iloc[:split_idx]
    X_test  = df.iloc[split_idx:]
    y_train = target.iloc[:split_idx]
    y_test  = target.iloc[split_idx:]
    close_train = close.iloc[:split_idx]
    close_test  = close.iloc[split_idx:]

    split_dt = str(df.index[split_idx])

    # ── Label encoding ────────────────────────────────────────
    enc = LabelEncoder()
    enc.fit([-1, 0, 1])
    y_train_enc = enc.transform(y_train)
    y_test_enc  = enc.transform(y_test)

    # ── Build model ───────────────────────────────────────────
    model = build_model(X_train, y_train_enc)

    # ── Generate signals on full dataset ─────────────────────
    thresh = 0.55
    signal_train, p_pos_train, p_neg_train = generate_signals(model, X_train, thresh)
    signal_test,  p_pos_test,  p_neg_test  = generate_signals(model, X_test,  thresh)

    signal_train = apply_risk(signal_train, close_train)
    signal_test  = apply_risk(signal_test,  close_test)

    signal_full = pd.concat([signal_train, signal_test])

    # ── Confusion matrix ──────────────────────────────────────
    pred_test = model.predict(X_test)
    cm = confusion_matrix(y_test, pred_test, labels=[-1, 0, 1])
    conf_matrix_list = cm.tolist()

    # ── Rolling accuracy (test period, 30-bar window) ─────────
    active_mask = signal_test != 0
    correct = (pred_test == np.asarray(y_test)).astype(float)
    correct_series = pd.Series(correct, index=X_test.index)
    active_series  = pd.Series(active_mask.values, index=X_test.index)

    rolling_correct = correct_series[active_mask].rolling(30).mean()
    roll_acc = pd.Series(np.nan, index=X_test.index)
    roll_acc[active_mask] = rolling_correct

    # ── Equity curve ──────────────────────────────────────────
    full_close  = close
    full_signal = signal_full

    rets = full_close.pct_change().fillna(0)

    # Position is held until signal changes; 0 means hold current
    position = full_signal.replace(0, np.nan).ffill().fillna(0)

    strategy_rets = position.shift(1).fillna(0) * rets

    # Apply transaction costs
    trade_changes = position.diff().abs()
    strategy_rets = strategy_rets - trade_changes * COST_PER_TRADE

    equity_strategy = STARTING_CAPITAL * (1 + strategy_rets).cumprod()
    equity_bh       = STARTING_CAPITAL * (1 + rets).cumprod()

    # ── Trade-level metrics (direction flips only) ────────────
    position_arr = position.values
    signal_arr   = full_signal.values
    close_arr    = full_close.values
    dates_arr    = full_close.index

    last_dir   = None
    entry_price = None
    entry_idx   = None
    trades      = []
    long_trades  = []
    short_trades = []

    for i in range(len(signal_arr)):
        sig = signal_arr[i]
        if sig == 0:
            continue
        if sig != last_dir:
            if last_dir is not None and entry_price is not None:
                raw_ret = last_dir * (close_arr[i] - entry_price) / entry_price
                raw_ret -= COST_PER_TRADE
                trades.append(raw_ret)
                if last_dir == 1:
                    long_trades.append(raw_ret)
                else:
                    short_trades.append(raw_ret)
            last_dir    = sig
            entry_price = close_arr[i]
            entry_idx   = i

    # Close last open trade
    if last_dir is not None and entry_price is not None:
        raw_ret = last_dir * (close_arr[-1] - entry_price) / entry_price
        raw_ret -= COST_PER_TRADE
        trades.append(raw_ret)
        if last_dir == 1:
            long_trades.append(raw_ret)
        else:
            short_trades.append(raw_ret)

    n_trades = len(trades)

    # ── Metrics ───────────────────────────────────────────────
    total_ret = float((equity_strategy.iloc[-1] / STARTING_CAPITAL) - 1)
    bh_ret    = float((equity_bh.iloc[-1]       / STARTING_CAPITAL) - 1)

    test_strat_rets = strategy_rets.iloc[split_idx:]
    if test_strat_rets.std() == 0 or test_strat_rets.empty:
        sharpe_strat = 0.0
    else:
        sharpe_strat = float(test_strat_rets.mean() / test_strat_rets.std() * np.sqrt(252 * 24 * 4))

    test_bh_rets = rets.iloc[split_idx:]
    if test_bh_rets.std() == 0 or test_bh_rets.empty:
        sharpe_bh = 0.0
    else:
        sharpe_bh = float(test_bh_rets.mean() / test_bh_rets.std() * np.sqrt(252 * 24 * 4))

    rolling_max = equity_strategy.cummax()
    drawdown    = (equity_strategy - rolling_max) / rolling_max
    mdd         = float(drawdown.min())

    # ── Feature importance ────────────────────────────────────
    importances = model.feature_importances_
    fi_pairs = sorted(zip(feature_cols, importances), key=lambda x: x[1])
    fi_pairs = fi_pairs[-15:]
    fi_names  = [p[0] for p in fi_pairs]
    fi_values = [float(p[1]) for p in fi_pairs]

    # ── Helper to sanitize lists ──────────────────────────────
    def clean(lst):
        return [None if (v is None or (isinstance(v, float) and (np.isnan(v) or np.isinf(v)))) else float(v) for v in lst]

    def clean_int(lst):
        return [None if (v is None or (isinstance(v, float) and (np.isnan(v) or np.isinf(v)))) else int(v) for v in lst]

    dates_str      = [str(d) for d in full_close.index]
    signal_dates   = [str(d) for d in full_signal.index]
    equity_dates   = [str(d) for d in equity_strategy.index]
    drawdown_dates = [str(d) for d in drawdown.index]
    roll_acc_dates = [str(d) for d in roll_acc.index]
    test_dates     = [str(d) for d in X_test.index]

    # ============================================================
    # SECTION 8 — CUSTOM FIGURES
    # ============================================================

    custom_figs = []

    dark = dict(
        paper_bgcolor="#131722",
        plot_bgcolor="#131722",
        font_color="#d1d4dc",
        margin=dict(l=40, r=20, t=30, b=30),
        legend=dict(bgcolor="rgba(0,0,0,0)"),
    )

    # — SMA Chart ——————————————————————————————————————————————
    fig_sma = go.Figure()
    fig_sma.add_trace(go.Scatter(
        x=dates_str, y=clean(close.tolist()),
        name="Close", line=dict(color="#d1d4dc", width=1)
    ))
    fig_sma.add_trace(go.Scatter(
        x=dates_str, y=clean(df["sma_20"].tolist()),
        name="SMA 20", line=dict(color="#2196F3", width=1.2)
    ))
    fig_sma.add_trace(go.Scatter(
        x=dates_str, y=clean(df["sma_50"].tolist()),
        name="SMA 50", line=dict(color="#FF9800", width=1.2)
    ))
    fig_sma.add_trace(go.Scatter(
        x=dates_str, y=clean(df["sma_200"].tolist()),
        name="SMA 200", line=dict(color="#E91E63", width=1.5)
    ))
    fig_sma.update_layout(title="SMA (20, 50, 200)", **dark)
    custom_figs.append(fig_sma.to_dict())

    # — RSI Chart ——————————————————————————————————————————————
    fig_rsi = go.Figure()
    fig_rsi.add_trace(go.Scatter(
        x=dates_str, y=clean(df["rsi_14"].tolist()),
        name="RSI 14", line=dict(color="#00BCD4", width=1.5)
    ))
    fig_rsi.add_hline(y=70, line_color="#E91E63", line_dash="dash", annotation_text="70")
    fig_rsi.add_hline(y=30, line_color="#4CAF50", line_dash="dash", annotation_text="30")
    fig_rsi.add_hline(y=50, line_color="#888888", line_dash="dot")
    fig_rsi.update_layout(title="RSI 14", yaxis=dict(range=[0, 100]), **dark)
    custom_figs.append(fig_rsi.to_dict())

    # ── Register model ────────────────────────────────────────
    if register_model is not None:
        register_model(model)

    return {
        "ohlc": {
            "dates": dates_str,
            "open":  clean(open_.tolist()),
            "high":  clean(high.tolist()),
            "low":   clean(low.tolist()),
            "close": clean(close.tolist()),
        },
        "signals": {
            "dates":  signal_dates,
            "values": clean(signal_full.tolist()),
        },
        "bb": {
            "upper": clean(bb_upper.tolist()),
            "mid":   clean(bb_mid.tolist()),
            "lower": clean(bb_lower.tolist()),
        },
        "ma": {
            "ma50":  clean(ma50.tolist()),
            "ma100": clean(ma100.tolist()),
            "ma200": clean(ma200.tolist()),
        },
        "equity": {
            "dates":    equity_dates,
            "strategy": clean(equity_strategy.tolist()),
            "bh":       clean(equity_bh.tolist()),
        },
        "feature_importance": {
            "names":  fi_names,
            "values": fi_values,
        },
        "conf_matrix": conf_matrix_list,
        "conf_hist": {
            "p_pos": clean(p_pos_test.tolist()),
            "p_neg": clean(p_neg_test.tolist()),
        },
        "rolling_acc": {
            "dates":  roll_acc_dates,
            "values": clean(roll_acc.tolist()),
        },
        "drawdown": {
            "dates":  drawdown_dates,
            "values": clean(drawdown.tolist()),
        },
        "ret_dist":       clean(trades),
        "ret_dist_long":  clean(long_trades),
        "ret_dist_short": clean(short_trades),
        "metrics": {
            "total_ret":    total_ret,
            "bh_ret":       bh_ret,
            "sharpe_strat": sharpe_strat,
            "sharpe_bh":    sharpe_bh,
            "mdd":          mdd,
            "n_trades":     n_trades,
        },
        "split_dt":    split_dt,
        "split_idx":   split_idx,
        "n_train":     len(X_train),
        "n_test":      len(X_test),
        "feature_cols": feature_cols,
        "custom_figs": custom_figs,
    }
M
g
@malco · 2026-04-02
+2.98%
Return
11.42
Sharpe
0.8%
Max DD
15
Trades
</> View Code
# ╔══════════════════════════════════════════════════════════════╗
# ║                  STRATEGY REQUEST LOG                       ║
# ╚══════════════════════════════════════════════════════════════╝
# Generated        : 2026-04-02 17:56:59
# Model            : Random Forest
# Feature Eng.     : SMA (20, 50, 200), RSI 14
# Signal / Entry   : —
# Optimization     : —
# Risk Mgmt        : —
# Risk Filter      : —
# ══════════════════════════════════════════════════════════════

# ============================================================
# SECTION 0 — IMPORTS & CONSTANTS
# ============================================================

import numpy as np
import pandas as pd
from sklearn.ensemble import RandomForestClassifier
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import confusion_matrix
import plotly.graph_objects as go
from model_wrapper import ModelWrapper

# Absolute path to the raw EURUSD tick dump (expects Time, Bid, Ask columns).
DATA_PATH = "/Users/malco/Desktop/QuantifyMe/data/EURUSD_ticks.csv"
# Inclusive date window applied after resampling to 15-minute bars.
START_DATE = "2026-02-24 08:00:00"
END_DATE   = "2026-03-26 00:00:00"
# When non-empty, this timestamp overrides TRAIN_SPLIT as the train/test boundary.
VALIDATION_DATE = ""
# Fraction of bars used for training when VALIDATION_DATE is empty.
TRAIN_SPLIT = 0.6820973075106282
STARTING_CAPITAL = 10_000  # backtest starting equity
ROUND_TRIP_COST = 2e-5  # round-trip transaction cost deducted per closed trade


# ============================================================
# SECTION 2 — FEATURE ENGINEERING
# ============================================================

def feature_engineering(df, close, open_, high, low):
    """Attach SMA, RSI, and return/range features to a copy of *df*.

    Parameters
    ----------
    df : pd.DataFrame
        Bar-level frame the feature columns are appended to.
    close, open_, high, low : pd.Series
        OHLC component series aligned with ``df``'s index.

    Returns
    -------
    pd.DataFrame
        A copy of *df* with the engineered feature columns added.

    Fix: the original wrote straight into the caller's frame; a defensive
    copy is taken first so the caller's DataFrame is never mutated
    (consistent with the sibling implementation at the top of this file).
    """
    df = df.copy()

    # SMA 20, 50, 200
    df['sma_20'] = close.rolling(20).mean()
    df['sma_50'] = close.rolling(50).mean()
    df['sma_200'] = close.rolling(200).mean()

    # Price relative to SMAs (fractional distance from each average)
    df['close_vs_sma20'] = close / df['sma_20'] - 1.0
    df['close_vs_sma50'] = close / df['sma_50'] - 1.0
    df['close_vs_sma200'] = close / df['sma_200'] - 1.0

    # SMA crossover features
    df['sma20_vs_sma50'] = df['sma_20'] / df['sma_50'] - 1.0
    df['sma50_vs_sma200'] = df['sma_50'] / df['sma_200'] - 1.0

    # RSI 14 (simple rolling means; rs becomes NaN when there are no losses)
    delta = close.diff()
    gain = delta.clip(lower=0)
    loss = -delta.clip(upper=0)
    avg_gain = gain.rolling(14).mean()
    avg_loss = loss.rolling(14).mean()
    rs = avg_gain / avg_loss.replace(0, np.nan)
    df['rsi_14'] = 100.0 - (100.0 / (1.0 + rs))

    # Additional useful features
    df['log_return'] = np.log(close / close.shift(1))
    df['hl_spread'] = (high - low) / close
    df['close_vs_open'] = (close - open_) / open_

    return df


# ============================================================
# SECTION 3 — ML MODEL
# ============================================================

def build_model(X_train, y_train):
    """Fit a Random Forest on the training slice and wrap it.

    Hyper-parameters come from ``optimization_config`` so the model and
    the optimisation notes stay in sync. A LabelEncoder fitted on the
    canonical {-1, 0, 1} label set supplies the original class ordering
    that ``ModelWrapper`` carries alongside the fitted estimator.
    """
    params = optimization_config()
    forest = RandomForestClassifier(
        n_estimators=params['n_estimators'],
        max_depth=params['max_depth'],
        min_samples_leaf=params['min_samples_leaf'],
        class_weight=params['class_weight'],
        random_state=42,
        n_jobs=-1,
    )
    forest.fit(X_train, y_train)

    encoder = LabelEncoder()
    encoder.fit([-1, 0, 1])

    return ModelWrapper(forest,
                        original_classes=encoder.classes_,
                        n_features=X_train.shape[1])


# ============================================================
# SECTION 4 — SIGNAL / ENTRY LOGIC
# ============================================================

def generate_signals(model, X, thresh):
    """Map class probabilities onto a {-1, 0, +1} trading signal.

    A side fires only when its probability strictly exceeds *thresh*;
    when both sides fire on the same row the more confident side wins
    (ties go long). Returns ``(signal, p_pos, p_neg)`` where ``signal``
    is a Series indexed like *X* and the probabilities are arrays.
    """
    proba = model.predict_proba(X)
    class_order = list(model.classes_)
    n_rows = len(X)

    def _column_for(label):
        # Probability column for `label`; all zeros when the class is absent.
        if label in class_order:
            return proba[:, class_order.index(label)]
        return np.zeros(n_rows)

    p_pos = _column_for(1)
    p_neg = _column_for(-1)

    go_long = p_pos > thresh
    go_short = p_neg > thresh

    values = np.zeros(n_rows)
    values[go_long] = 1.0
    values[go_short] = -1.0
    # Rows where both sides clear the bar: keep the higher-probability side.
    contested = go_long & go_short
    values[contested] = np.where(p_pos[contested] >= p_neg[contested], 1.0, -1.0)

    return pd.Series(values, index=X.index), p_pos, p_neg


# ============================================================
# SECTION 5 — OPTIMISATION TARGET
# ============================================================

def optimization_config():
    """Hyper-parameter and objective sheet consumed by ``build_model``."""
    notes = (
        "Random Forest tuned for Sharpe: balanced class weights to handle "
        "class imbalance, conservative depth to avoid overfitting, "
        "more estimators for stable probability estimates."
    )
    config = {"objective": "Maximize Sharpe ratio", "notes": notes}
    config.update(
        n_estimators=300,
        max_depth=6,
        min_samples_leaf=20,
        class_weight="balanced",
    )
    return config


# ============================================================
# SECTION 6 — RISK MANAGEMENT
# ============================================================

def apply_risk(signal, close, pos_size=1.0):
    """Position-sizing hook: scale the raw signal by a fixed fraction.

    ``close`` is accepted for interface symmetry but not used here.
    """
    sized = signal * pos_size
    return sized


# ============================================================
# SECTION 7 — BACKTEST ENGINE
# ============================================================

def train_and_backtest():
    """End-to-end pipeline: load ticks, featurize, train, backtest, report.

    Resamples EURUSD ticks to 15-minute bars, engineers SMA/RSI features,
    labels each bar with the sign of the 4-bar-ahead close move, trains a
    Random Forest on the first TRAIN_SPLIT fraction of bars, generates
    threshold-gated long/short signals, and simulates a direction-flip
    backtest on the held-out test window. Returns a dict of JSON-friendly
    series, metrics, and Plotly figure dicts for the dashboard front-end.
    """
    # ------------------------------------------------------------------
    # Load & prepare data: mid price from bid/ask, resampled to 15m OHLC
    # ------------------------------------------------------------------
    raw = pd.read_csv(DATA_PATH, parse_dates=['Time'])
    raw = raw.sort_values('Time').set_index('Time')
    raw['mid'] = (raw['Bid'] + raw['Ask']) / 2.0

    ohlc_full = raw['mid'].resample('15min').ohlc()
    ohlc_full = ohlc_full.dropna()

    if START_DATE:
        ohlc_full = ohlc_full[ohlc_full.index >= START_DATE]
    if END_DATE:
        ohlc_full = ohlc_full[ohlc_full.index <= END_DATE]

    close  = ohlc_full['close']
    open_  = ohlc_full['open']
    high   = ohlc_full['high']
    low    = ohlc_full['low']

    # ── Feature engineering ──────────────────────────────────
    df = ohlc_full.copy()
    df = feature_engineering(df, close, open_, high, low)

    # ── Target ───────────────────────────────────────────────
    # Sign of the close-to-close move 4 bars (1 hour) ahead; the last 4
    # bars have no label and are dropped by the mask below.
    target = np.sign(close.shift(-4) - close)
    mask = target.notna()
    df     = df[mask]
    target = target[mask]

    # Re-extract aligned series after mask
    close_aligned  = df['close']
    open_aligned   = df['open']
    high_aligned   = df['high']
    low_aligned    = df['low']

    # ── Feature columns ──────────────────────────────────────
    feature_cols = [
        'sma_20', 'sma_50', 'sma_200',
        'close_vs_sma20', 'close_vs_sma50', 'close_vs_sma200',
        'sma20_vs_sma50', 'sma50_vs_sma200',
        'rsi_14',
        'log_return', 'hl_spread', 'close_vs_open'
    ]

    # NOTE(review): bfill before ffill leaks future values into the SMA
    # warm-up rows at the start of the series — confirm this is intended.
    df_features = df[feature_cols].copy()
    df_features = df_features.bfill().ffill().dropna()
    target = target.loc[df_features.index]
    close_aligned = close_aligned.loc[df_features.index]

    # ── Train/test split ─────────────────────────────────────
    # Chronological split: by timestamp if VALIDATION_DATE is set,
    # otherwise by the TRAIN_SPLIT fraction.
    if VALIDATION_DATE:
        split_idx = len(df_features[df_features.index <= VALIDATION_DATE])
    else:
        split_idx = int(len(df_features) * TRAIN_SPLIT)

    X_train = df_features.iloc[:split_idx]
    X_test  = df_features.iloc[split_idx:]
    y_train = target.iloc[:split_idx]
    y_test  = target.iloc[split_idx:]
    close_train = close_aligned.iloc[:split_idx]
    close_test  = close_aligned.iloc[split_idx:]

    split_dt = str(df_features.index[split_idx]) if split_idx < len(df_features) else str(df_features.index[-1])

    # ── Label encoding ───────────────────────────────────────
    # Encode {-1, 0, 1} → {0, 1, 2} for the classifier; ModelWrapper
    # (built in build_model) carries the original class order.
    enc = LabelEncoder()
    enc.fit([-1, 0, 1])
    y_train_enc = enc.transform(y_train)
    y_test_enc  = enc.transform(y_test)

    # ── Build model ──────────────────────────────────────────
    model = build_model(X_train, y_train_enc)

    # ── Generate signals (train + test) ──────────────────────
    thresh = 0.55
    signal_train, p_pos_train, p_neg_train = generate_signals(model, X_train, thresh)
    signal_test,  p_pos_test,  p_neg_test  = generate_signals(model, X_test,  thresh)

    signal_train = apply_risk(signal_train, close_train)
    signal_test  = apply_risk(signal_test,  close_test)

    signal_full = pd.concat([signal_train, signal_test])

    # ── Confusion matrix (test only) ─────────────────────────
    pred_test = model.predict(X_test)
    y_test_arr = np.asarray(y_test)
    cm = confusion_matrix(y_test_arr, pred_test, labels=[-1, 0, 1])
    conf_matrix = cm.tolist()

    # ── Equity curve & metrics (test period only) ────────────
    returns_test = close_test.pct_change().fillna(0.0)

    # Build trade returns by tracking direction flips: a position is held
    # until a non-zero signal in the opposite direction appears; each flip
    # closes the previous trade and opens a new one at the flip bar.
    last_dir = None
    entry_price = None
    trades = []
    long_trades = []
    short_trades = []
    equity = STARTING_CAPITAL
    equity_curve = []
    strategy_rets = []

    sig_arr   = signal_test.values
    close_arr = close_test.values
    ret_arr   = returns_test.values

    for i in range(len(sig_arr)):
        s = sig_arr[i]
        bar_ret = 0.0

        # Per-bar P&L of the position carried into this bar.
        if last_dir is not None and last_dir != 0.0:
            bar_ret = last_dir * ret_arr[i]

        # Check for direction flip
        if s != 0.0 and s != last_dir:
            if last_dir is not None and last_dir != 0.0 and entry_price is not None:
                # Close previous trade; charge the round-trip cost once per trade.
                trade_ret = last_dir * (close_arr[i] / entry_price - 1.0) - ROUND_TRIP_COST
                trades.append(trade_ret)
                if last_dir > 0:
                    long_trades.append(trade_ret)
                else:
                    short_trades.append(trade_ret)
            entry_price = close_arr[i]
            last_dir = s

        strategy_rets.append(bar_ret)
        equity_curve.append(equity * (1.0 + bar_ret))
        equity = equity_curve[-1]

    # Close final open trade at the last available bar
    if last_dir is not None and last_dir != 0.0 and entry_price is not None and len(close_arr) > 0:
        trade_ret = last_dir * (close_arr[-1] / entry_price - 1.0) - ROUND_TRIP_COST
        trades.append(trade_ret)
        if last_dir > 0:
            long_trades.append(trade_ret)
        else:
            short_trades.append(trade_ret)

    # Prepend starting capital so the equity series starts flat
    equity_vals = [STARTING_CAPITAL] + equity_curve
    equity_dates_full = [str(close_test.index[0])] + [str(d) for d in close_test.index]

    # Buy-and-hold equity over the same test window
    bh_rets = returns_test.values
    bh_equity = [STARTING_CAPITAL]
    for r in bh_rets:
        bh_equity.append(bh_equity[-1] * (1.0 + r))

    # Metrics
    strategy_rets_arr = np.array(strategy_rets)
    total_ret = (equity_vals[-1] - STARTING_CAPITAL) / STARTING_CAPITAL
    bh_ret    = (bh_equity[-1]   - STARTING_CAPITAL) / STARTING_CAPITAL

    # Sharpe (annualised, 15-min bars → ~26,280 bars/year)
    bars_per_year = 26280.0
    if len(strategy_rets_arr) > 1 and strategy_rets_arr.std() > 0:
        sharpe_strat = float(np.sqrt(bars_per_year) * strategy_rets_arr.mean() / strategy_rets_arr.std())
    else:
        sharpe_strat = 0.0

    bh_rets_arr = np.array(bh_rets)
    if len(bh_rets_arr) > 1 and bh_rets_arr.std() > 0:
        sharpe_bh = float(np.sqrt(bars_per_year) * bh_rets_arr.mean() / bh_rets_arr.std())
    else:
        sharpe_bh = 0.0

    # Max drawdown of the strategy equity curve
    eq_arr = np.array(equity_vals)
    running_max = np.maximum.accumulate(eq_arr)
    dd_arr = (eq_arr - running_max) / running_max
    mdd = float(dd_arr.min())

    n_trades = len(trades)

    # ── Rolling accuracy (test, 30-bar window, non-flat only) ─
    # Treat the signal as a per-bar prediction; flat bars are excluded
    # from the rolling hit-rate via the `where` mask below.
    active_mask = signal_test.values != 0.0
    correct = (signal_test.values == y_test_arr).astype(float)
    correct_series = pd.Series(correct, index=signal_test.index)
    active_series  = pd.Series(active_mask.astype(float), index=signal_test.index)

    roll_correct = correct_series.where(active_series == 1).rolling(30, min_periods=1).mean()
    rolling_acc_vals = []
    for v in roll_correct.values:
        if np.isnan(v) or np.isinf(v):
            rolling_acc_vals.append(None)
        else:
            rolling_acc_vals.append(float(v))

    # ── Bollinger Bands & MAs (full dataset) ─────────────────
    close_full = close_aligned  # full aligned close
    bb_mid   = close_full.rolling(20).mean()
    bb_std   = close_full.rolling(20).std()
    bb_upper = bb_mid + 2.0 * bb_std
    bb_lower = bb_mid - 2.0 * bb_std
    ma50  = close_full.rolling(50).mean()
    ma100 = close_full.rolling(100).mean()
    ma200 = close_full.rolling(200).mean()

    def _series_to_list(s):
        # JSON-safe conversion: NaN/inf → None, everything else → float.
        out = []
        for v in s.values:
            if v is None or (isinstance(v, float) and (np.isnan(v) or np.isinf(v))):
                out.append(None)
            else:
                out.append(float(v))
        return out

    full_dates = [str(d) for d in close_full.index]
    ohlc_open  = _series_to_list(open_aligned)
    ohlc_high  = _series_to_list(high_aligned.loc[close_full.index] if hasattr(high_aligned, 'loc') else high_aligned)
    ohlc_low   = _series_to_list(low_aligned.loc[close_full.index] if hasattr(low_aligned, 'loc') else low_aligned)
    ohlc_close = _series_to_list(close_full)

    # ── Feature importance ───────────────────────────────────
    fi = model.feature_importances_
    fi_pairs = sorted(zip(feature_cols, fi), key=lambda x: x[1])
    fi_pairs = fi_pairs[-15:]  # top 15, ascending by importance
    fi_names  = [p[0] for p in fi_pairs]
    fi_values = [float(p[1]) for p in fi_pairs]

    # ── OHLC aligned to full dataset ─────────────────────────
    ohlc_high_full = high_aligned
    ohlc_low_full  = low_aligned

    # ── Drawdown series (test period, padded to full) ─────────
    dd_full = np.full(len(close_full), 0.0)
    test_start_loc = split_idx
    # dd_arr has len = len(equity_vals) = len(close_test)+1;
    # drop its leading zero and align it to the test window.
    if len(dd_arr) - 1 == len(close_test):
        dd_full[test_start_loc:test_start_loc + len(close_test)] = dd_arr[1:]

    drawdown_vals = []
    for v in dd_full:
        if np.isnan(v) or np.isinf(v):
            drawdown_vals.append(None)
        else:
            drawdown_vals.append(float(v))

    # ── Equity dates — full dataset aligned ──────────────────
    # For equity, fill train period with flat capital, test period with curve
    eq_full_strategy = [float(STARTING_CAPITAL)] * len(close_full)
    eq_full_bh       = [float(STARTING_CAPITAL)] * len(close_full)

    # BH for full period
    close_full_arr = close_full.values
    for i in range(1, len(close_full_arr)):
        r = (close_full_arr[i] - close_full_arr[i-1]) / close_full_arr[i-1] if close_full_arr[i-1] != 0 else 0.0
        eq_full_bh[i] = eq_full_bh[i-1] * (1.0 + r)

    # Strategy equity: flat in train, then use computed curve for test
    if len(equity_curve) > 0:
        for i, idx_loc in enumerate(range(test_start_loc, min(test_start_loc + len(equity_curve), len(close_full)))):
            eq_full_strategy[idx_loc] = float(equity_curve[i])
        # Forward fill remaining if any
        last_val = eq_full_strategy[test_start_loc + len(equity_curve) - 1] if len(equity_curve) > 0 else STARTING_CAPITAL
        for idx_loc in range(test_start_loc + len(equity_curve), len(close_full)):
            eq_full_strategy[idx_loc] = last_val

    # ── Signals full ─────────────────────────────────────────
    # Unlike the other series, bad values map to 0.0 (flat), not None.
    signal_full_vals = []
    for v in signal_full.values:
        if np.isnan(v) or np.isinf(v):
            signal_full_vals.append(0.0)
        else:
            signal_full_vals.append(float(v))

    # ── conf_hist ────────────────────────────────────────────
    p_pos_list = [float(v) for v in p_pos_test.tolist()]
    p_neg_list = [float(v) for v in p_neg_test.tolist()]

    # ── ret_dist ─────────────────────────────────────────────
    ret_dist       = [float(v) for v in trades]
    ret_dist_long  = [float(v) for v in long_trades]
    ret_dist_short = [float(v) for v in short_trades]

    # ── SECTION 8 — CUSTOM FIGURES ───────────────────────────
    custom_figs = []

    # Figure 1: SMA overlay (20, 50, 200) on price
    fig_sma = go.Figure()
    fig_sma.add_trace(go.Scatter(
        x=full_dates, y=ohlc_close,
        mode='lines', name='Close',
        line=dict(color='#d1d4dc', width=1)
    ))
    # NOTE(review): placeholder trace — plots ma50 data under an 'SMA 20'
    # label ('ma50' is a local name, so the dir() branch is taken); it is
    # removed below by the fig_sma.data reassignment.
    fig_sma.add_trace(go.Scatter(
        x=full_dates, y=_series_to_list(ma50 if 'ma50' in dir() else close_full.rolling(20).mean()),
        mode='lines', name='SMA 20',
        line=dict(color='#f7c948', width=1.5)
    ))
    sma20_full = close_full.rolling(20).mean()
    sma50_full = close_full.rolling(50).mean()
    sma200_full = close_full.rolling(200).mean()
    fig_sma.add_trace(go.Scatter(
        x=full_dates, y=_series_to_list(sma20_full),
        mode='lines', name='SMA 20',
        line=dict(color='#f7c948', width=1.5)
    ))
    fig_sma.add_trace(go.Scatter(
        x=full_dates, y=_series_to_list(sma50_full),
        mode='lines', name='SMA 50',
        line=dict(color='#2196F3', width=1.5)
    ))
    fig_sma.add_trace(go.Scatter(
        x=full_dates, y=_series_to_list(sma200_full),
        mode='lines', name='SMA 200',
        line=dict(color='#E91E63', width=1.5)
    ))
    fig_sma.update_layout(
        title='Price with SMA 20 / 50 / 200',
        paper_bgcolor="#131722", plot_bgcolor="#131722",
        font_color="#d1d4dc",
        margin=dict(l=40, r=20, t=30, b=30),
        legend=dict(bgcolor="rgba(0,0,0,0)"),
        xaxis=dict(gridcolor="#2a2e39"),
        yaxis=dict(gridcolor="#2a2e39")
    )
    # Remove duplicate SMA 20 trace (index 1 was placeholder)
    fig_sma.data = (fig_sma.data[0],) + fig_sma.data[2:]
    custom_figs.append(fig_sma.to_dict())

    # Figure 2: RSI 14 with overbought/oversold guide lines
    rsi_full = df_features['rsi_14']
    fig_rsi = go.Figure()
    fig_rsi.add_trace(go.Scatter(
        x=full_dates, y=_series_to_list(rsi_full),
        mode='lines', name='RSI 14',
        line=dict(color='#9c27b0', width=1.5)
    ))
    fig_rsi.add_hline(y=70, line_dash='dash', line_color='#ef5350', annotation_text='Overbought 70')
    fig_rsi.add_hline(y=30, line_dash='dash', line_color='#26a69a', annotation_text='Oversold 30')
    fig_rsi.update_layout(
        title='RSI 14',
        paper_bgcolor="#131722", plot_bgcolor="#131722",
        font_color="#d1d4dc",
        margin=dict(l=40, r=20, t=30, b=30),
        legend=dict(bgcolor="rgba(0,0,0,0)"),
        xaxis=dict(gridcolor="#2a2e39"),
        yaxis=dict(gridcolor="#2a2e39", range=[0, 100])
    )
    custom_figs.append(fig_rsi.to_dict())

    # ── Register model ────────────────────────────────────────
    # NOTE(review): register_model is not defined or imported in this
    # file — presumably injected into globals by the hosting runtime;
    # confirm, otherwise this line raises NameError.
    if register_model is not None:
        register_model(model)

    # ── Build return dict ─────────────────────────────────────
    return {
        "ohlc": {
            "dates":  full_dates,
            "open":   ohlc_open,
            "high":   _series_to_list(ohlc_high_full),
            "low":    _series_to_list(ohlc_low_full),
            "close":  ohlc_close,
        },
        "signals": {
            "dates":  full_dates,
            "values": signal_full_vals,
        },
        "bb": {
            "upper": _series_to_list(bb_upper),
            "mid":   _series_to_list(bb_mid),
            "lower": _series_to_list(bb_lower),
        },
        "ma": {
            "ma50":  _series_to_list(ma50),
            "ma100": _series_to_list(ma100),
            "ma200": _series_to_list(ma200),
        },
        "equity": {
            "dates":    full_dates,
            "strategy": eq_full_strategy,
            "bh":       eq_full_bh,
        },
        "feature_importance": {
            "names":  fi_names,
            "values": fi_values,
        },
        "conf_matrix": conf_matrix,
        "conf_hist": {
            "p_pos": p_pos_list,
            "p_neg": p_neg_list,
        },
        "rolling_acc": {
            "dates":  [str(d) for d in signal_test.index],
            "values": rolling_acc_vals,
        },
        "drawdown": {
            "dates":  full_dates,
            "values": drawdown_vals,
        },
        "ret_dist":       ret_dist,
        "ret_dist_long":  ret_dist_long,
        "ret_dist_short": ret_dist_short,
        "metrics": {
            "total_ret":    float(total_ret),
            "bh_ret":       float(bh_ret),
            "sharpe_strat": float(sharpe_strat),
            "sharpe_bh":    float(sharpe_bh),
            "mdd":          float(mdd),
            "n_trades":     int(n_trades),
        },
        "split_dt":    split_dt,
        "split_idx":   int(split_idx),
        "n_train":     int(len(X_train)),
        "n_test":      int(len(X_test)),
        "feature_cols": feature_cols,
        "custom_figs":  custom_figs,
    }
M
g
@malco · 2026-04-02
+0.37%
Return
2.34
Sharpe
0.5%
Max DD
12
Trades
</> View Code
# ╔══════════════════════════════════════════════════════════════╗
# ║                  STRATEGY REQUEST LOG                       ║
# ╚══════════════════════════════════════════════════════════════╝
# Generated        : 2026-04-02 14:41:40
# Model            : Random Forest
# Feature Eng.     : SMA (20, 50, 200), RSI 14
# Signal / Entry   : —
# Optimization     : —
# Risk Mgmt        : —
# Risk Filter      : —
# ══════════════════════════════════════════════════════════════

# ============================================================
# SECTION 0 — IMPORTS & CONSTANTS
# ============================================================

import numpy as np
import pandas as pd
from sklearn.ensemble import RandomForestClassifier
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import confusion_matrix
import plotly.graph_objects as go
from model_wrapper import ModelWrapper

# Absolute path to the raw EURUSD tick dump (expects Time, Bid, Ask columns).
DATA_PATH  = "/Users/malco/Desktop/QuantifyMe/data/EURUSD_ticks.csv"
# Inclusive date window applied to the resampled 15-minute bars.
START_DATE = "2026-03-03"
END_DATE   = "2026-04-02"
CAPITAL    = 10_000.0  # presumably the backtest starting equity — confirm in the backtest section
COST_RT    = 2e-5   # round-trip transaction cost per trade

# ============================================================
# SECTION 2 — FEATURE ENGINEERING
# ============================================================

def feature_engineering(df, close, open_, high, low):
    """Attach SMA, EWM-RSI, multi-horizon return and volatility features.

    Parameters
    ----------
    df : pd.DataFrame
        Bar-level frame the feature columns are appended to.
    close, open_, high, low : pd.Series
        OHLC component series aligned with ``df``'s index.

    Returns
    -------
    pd.DataFrame
        A copy of *df* with the engineered feature columns added.

    Fix: the original mutated the caller's frame in place; a defensive
    copy is taken first (consistent with the sibling implementation at
    the top of this file).
    """
    df = df.copy()

    # SMA 20, 50, 200
    df["sma_20"]  = close.rolling(20).mean()
    df["sma_50"]  = close.rolling(50).mean()
    df["sma_200"] = close.rolling(200).mean()

    # Price relative to SMAs
    df["close_sma20_ratio"]  = close / df["sma_20"]  - 1.0
    df["close_sma50_ratio"]  = close / df["sma_50"]  - 1.0
    df["close_sma200_ratio"] = close / df["sma_200"] - 1.0

    # SMA crossover signals
    df["sma20_50_diff"]  = df["sma_20"]  - df["sma_50"]
    df["sma50_200_diff"] = df["sma_50"]  - df["sma_200"]

    # RSI 14 — EWM smoothing with com=13 (alpha = 1/14, Wilder-style);
    # rs is NaN when there have been no losses in the window.
    delta = close.diff()
    gain  = delta.clip(lower=0)
    loss  = (-delta).clip(lower=0)
    avg_gain = gain.ewm(com=13, min_periods=14).mean()
    avg_loss = loss.ewm(com=13, min_periods=14).mean()
    rs = avg_gain / avg_loss.replace(0, np.nan)
    df["rsi_14"] = 100.0 - (100.0 / (1.0 + rs))

    # Returns over 1, 4 and 8 bars
    df["ret_1"]  = close.pct_change(1)
    df["ret_4"]  = close.pct_change(4)
    df["ret_8"]  = close.pct_change(8)

    # Volatility
    df["vol_20"] = df["ret_1"].rolling(20).std()

    # High-Low range normalised
    df["hl_range"] = (high - low) / close

    return df

# ============================================================
# SECTION 3 — ML MODEL
# ============================================================

def build_model(X_train, y_train):
    """Train the Random Forest and return it wrapped for the runtime.

    Hyper-parameters are read from ``optimization_config``; a
    LabelEncoder fitted on {-1, 0, 1} supplies the original class order
    that ``ModelWrapper`` needs to decode predictions.
    """
    hp = optimization_config()
    forest = RandomForestClassifier(
        n_estimators=hp["n_estimators"],
        max_depth=hp["max_depth"],
        min_samples_leaf=hp["min_samples_leaf"],
        class_weight=hp["class_weight"],
        random_state=42,
        n_jobs=-1,
    )
    forest.fit(X_train, y_train)

    label_encoder = LabelEncoder()
    label_encoder.fit([-1, 0, 1])
    return ModelWrapper(forest,
                        original_classes=label_encoder.classes_,
                        n_features=X_train.shape[1])

# ============================================================
# SECTION 4 — SIGNAL / ENTRY LOGIC
# ============================================================

def generate_signals(model, X, thresh):
    """Convert class probabilities into a {-1, 0, +1} signal series.

    A side fires when its probability reaches *thresh* (inclusive); when
    both sides fire on the same row the more confident side wins, with
    ties going long. Returns ``(signal, p_pos, p_neg)`` — a float Series
    indexed like *X* plus the two probability arrays as floats.
    """
    proba = model.predict_proba(X)           # shape (n, n_classes)
    class_list = list(model.classes_)
    n_rows = len(X)

    def _prob_of(label):
        # Probability column for `label`; zeros if the class never appeared.
        if label in class_list:
            return proba[:, class_list.index(label)]
        return np.zeros(n_rows)

    p_pos = _prob_of(1)
    p_neg = _prob_of(-1)

    long_hit  = p_pos >= thresh
    short_hit = p_neg >= thresh

    # First matching condition wins: contested rows resolve by confidence
    # (ties go long), then pure long, then pure short, else flat.
    signal_vals = np.select(
        [long_hit & short_hit, long_hit, short_hit],
        [np.where(p_pos >= p_neg, 1.0, -1.0), 1.0, -1.0],
        default=0.0,
    )

    signal = pd.Series(signal_vals, index=X.index, dtype=float)
    return signal, p_pos.astype(float), p_neg.astype(float)

# ============================================================
# SECTION 5 — OPTIMISATION TARGET
# ============================================================

def optimization_config():
    """Sharpe-targeted Random Forest settings consumed by ``build_model``."""
    description = ("Balanced class weights to avoid majority-class bias; "
                   "conservative depth to reduce overfitting; "
                   "more estimators for stability.")
    return dict(
        objective="Maximize Sharpe ratio",
        notes=description,
        n_estimators=300,
        max_depth=8,
        min_samples_leaf=20,
        class_weight="balanced",
    )

# ============================================================
# SECTION 6 — RISK MANAGEMENT
# ============================================================

def apply_risk(signal, close, pos_size=1.0):
    """Scale the raw signal by a fixed position size; `close` is unused."""
    return pos_size * signal

# ============================================================
# SECTION 7 — BACKTEST ENGINE
# ============================================================

def train_and_backtest():
    """End-to-end pipeline: load ticks, build features, train, backtest.

    Resamples EURUSD ticks to 15-minute mid-price bars, engineers features,
    labels each bar with the sign of the close 4 bars (1 hour) ahead, trains
    a random forest on the first 70% of bars and trades the last 30% with a
    0.55 probability threshold.  Returns a JSON-serialisable dict of series,
    metrics and Plotly figures for the dashboard.

    BUGFIX: the original body referenced the undefined names ``CAPITAL`` and
    ``COST_RT``; the module-level constants are ``STARTING_CAPITAL`` and
    ``TRADE_COST``, so the backtest raised NameError at runtime.
    """
    # ------------------------------------------------------------------
    # Load & prepare data (mid price = (Bid + Ask) / 2, 15-minute OHLC)
    # ------------------------------------------------------------------
    raw = pd.read_csv(DATA_PATH, parse_dates=["Time"])
    raw = raw.sort_values("Time").set_index("Time")
    mid = (raw["Bid"] + raw["Ask"]) / 2.0

    ohlc = mid.resample("15min").ohlc()
    ohlc.columns = ["open", "high", "low", "close"]
    ohlc = ohlc.dropna()

    if START_DATE:
        ohlc = ohlc[ohlc.index >= START_DATE]
    if END_DATE:
        ohlc = ohlc[ohlc.index <= END_DATE]

    close  = ohlc["close"]
    open_  = ohlc["open"]
    high   = ohlc["high"]
    low    = ohlc["low"]

    df = ohlc.copy()

    # ------------------------------------------------------------------
    # Feature engineering
    # ------------------------------------------------------------------
    df = feature_engineering(df, close, open_, high, low)

    # ------------------------------------------------------------------
    # Target: sign of the move 4 bars ahead (last 4 bars are unlabeled)
    # ------------------------------------------------------------------
    target = np.sign(close.shift(-4) - close)
    mask   = target.notna()
    df     = df[mask]
    target = target[mask]

    close  = df["close"]
    open_  = df["open"]
    high   = df["high"]
    low    = df["low"]

    # ------------------------------------------------------------------
    # Feature columns — names must match those created in
    # feature_engineering (TODO confirm; only part of it is visible here)
    # ------------------------------------------------------------------
    feature_cols = [
        "sma_20", "sma_50", "sma_200",
        "close_sma20_ratio", "close_sma50_ratio", "close_sma200_ratio",
        "sma20_50_diff", "sma50_200_diff",
        "rsi_14",
        "ret_1", "ret_4", "ret_8",
        "vol_20", "hl_range",
    ]

    df_feat = df[feature_cols].copy().bfill().ffill()
    feat_mask = df_feat.notna().all(axis=1)
    df_feat = df_feat[feat_mask]
    target  = target[feat_mask]
    close   = close[feat_mask]
    open_   = open_[feat_mask]
    high    = high[feat_mask]
    low     = low[feat_mask]
    df      = df[feat_mask]

    # ------------------------------------------------------------------
    # Train / test split (70/30 walk-forward, no shuffle)
    # ------------------------------------------------------------------
    n_total   = len(df_feat)
    split_idx = int(n_total * 0.70)

    X_train = df_feat.iloc[:split_idx]
    X_test  = df_feat.iloc[split_idx:]
    y_train = target.iloc[:split_idx]
    y_test  = target.iloc[split_idx:]

    close_test = close.iloc[split_idx:]

    split_dt = str(df_feat.index[split_idx])
    n_train  = split_idx
    n_test   = n_total - split_idx

    # ------------------------------------------------------------------
    # Label encoding (-1/0/1 → 0/1/2 for sklearn)
    # ------------------------------------------------------------------
    enc = LabelEncoder()
    enc.fit([-1, 0, 1])
    y_train_enc = enc.transform(y_train)
    y_test_enc  = enc.transform(y_test)

    # ------------------------------------------------------------------
    # Build model
    # ------------------------------------------------------------------
    model = build_model(X_train, y_train_enc)

    # ------------------------------------------------------------------
    # Signals
    # ------------------------------------------------------------------
    THRESH = 0.55
    signal_test, p_pos, p_neg = generate_signals(model, X_test, THRESH)
    signal_test = apply_risk(signal_test, close_test)

    # ------------------------------------------------------------------
    # Confusion matrix
    # ------------------------------------------------------------------
    pred_test = model.predict(X_test)   # already decoded: -1, 0, 1
    labels    = [-1, 0, 1]
    cm = confusion_matrix(y_test, pred_test, labels=labels).tolist()

    # ------------------------------------------------------------------
    # Equity curve & trade metrics
    # ------------------------------------------------------------------
    close_arr  = close_test.values
    signal_arr = signal_test.values
    n_bars     = len(close_arr)

    equity_strategy = [STARTING_CAPITAL]
    equity_bh       = [STARTING_CAPITAL]

    ret_all   = []
    ret_long  = []
    ret_short = []

    last_dir      = None
    entry_price   = None
    entry_capital = STARTING_CAPITAL
    cap_strat     = STARTING_CAPITAL
    cap_bh        = STARTING_CAPITAL

    for i in range(1, n_bars):
        # Buy & hold compounds every bar.
        bar_ret_bh = (close_arr[i] - close_arr[i - 1]) / close_arr[i - 1]
        cap_bh     = cap_bh * (1.0 + bar_ret_bh)
        equity_bh.append(cap_bh)

        sig = signal_arr[i]

        # Detect direction flip → close previous trade, open new
        if sig != 0 and sig != last_dir:
            # Close previous trade; round-trip cost charged once per trade.
            if last_dir is not None and entry_price is not None:
                trade_ret = last_dir * (close_arr[i] - entry_price) / entry_price - TRADE_COST
                cap_strat = entry_capital * (1.0 + trade_ret)
                ret_all.append(trade_ret)
                if last_dir > 0:
                    ret_long.append(trade_ret)
                else:
                    ret_short.append(trade_ret)
            # Open new trade
            entry_price   = close_arr[i]
            entry_capital = cap_strat
            last_dir      = sig

        # NOTE(review): equity is stepped only at trade boundaries, so the
        # open trade's P&L is not marked to market bar by bar.
        equity_strategy.append(cap_strat)

    # Close last open trade at final bar
    if last_dir is not None and entry_price is not None and n_bars > 1:
        final_ret = last_dir * (close_arr[-1] - entry_price) / entry_price - TRADE_COST
        cap_strat = entry_capital * (1.0 + final_ret)
        ret_all.append(final_ret)
        if last_dir > 0:
            ret_long.append(final_ret)
        else:
            ret_short.append(final_ret)
        # update last equity point
        equity_strategy[-1] = cap_strat

    eq_strat_arr = np.array(equity_strategy)
    eq_bh_arr    = np.array(equity_bh)

    total_ret = (eq_strat_arr[-1] - STARTING_CAPITAL) / STARTING_CAPITAL if n_bars > 0 else 0.0
    bh_ret    = (eq_bh_arr[-1]    - STARTING_CAPITAL) / STARTING_CAPITAL if n_bars > 0 else 0.0

    # Sharpe (annualised, 15-min bars → 26240 bars/year approx)
    BARS_PER_YEAR = 26240.0
    ret_series = pd.Series(np.diff(eq_strat_arr) / eq_strat_arr[:-1]) if len(eq_strat_arr) > 1 else pd.Series([], dtype=float)
    if len(ret_series) > 0 and ret_series.std() > 0:
        sharpe_strat = float(ret_series.mean() / ret_series.std() * np.sqrt(BARS_PER_YEAR))
    else:
        sharpe_strat = 0.0

    bh_ret_series = pd.Series(np.diff(eq_bh_arr) / eq_bh_arr[:-1]) if len(eq_bh_arr) > 1 else pd.Series([], dtype=float)
    if len(bh_ret_series) > 0 and bh_ret_series.std() > 0:
        sharpe_bh = float(bh_ret_series.mean() / bh_ret_series.std() * np.sqrt(BARS_PER_YEAR))
    else:
        sharpe_bh = 0.0

    # Max drawdown: most negative excursion from the running equity peak.
    running_max = np.maximum.accumulate(eq_strat_arr)
    dd_arr      = (eq_strat_arr - running_max) / running_max
    mdd         = float(dd_arr.min()) if len(dd_arr) > 0 else 0.0

    n_trades = len(ret_all)

    # ------------------------------------------------------------------
    # Rolling accuracy (30-bar window, active signals only)
    # ------------------------------------------------------------------
    pred_series  = pd.Series(pred_test, index=X_test.index)
    target_test  = y_test
    correct      = (pred_series == target_test).astype(float)
    active_mask  = pred_series != 0
    rolling_acc_vals = correct.where(active_mask).rolling(30, min_periods=1).mean()
    rolling_acc_vals = rolling_acc_vals.where(active_mask)

    roll_dates = [str(d) for d in rolling_acc_vals.index]
    roll_vals  = [None if np.isnan(v) else float(v) for v in rolling_acc_vals.values]

    # ------------------------------------------------------------------
    # Drawdown series
    # ------------------------------------------------------------------
    dd_dates  = [str(d) for d in close_test.index[:len(dd_arr)]]
    dd_values = [float(v) if np.isfinite(v) else None for v in dd_arr]

    # ------------------------------------------------------------------
    # Bollinger Bands & MAs (computed on the full period, shown on test)
    # ------------------------------------------------------------------
    close_full = close
    bb_mid   = close_full.rolling(20).mean()
    bb_std   = close_full.rolling(20).std()
    bb_upper = bb_mid + 2.0 * bb_std
    bb_lower = bb_mid - 2.0 * bb_std

    ma50  = close_full.rolling(50).mean()
    ma100 = close_full.rolling(100).mean()
    ma200 = close_full.rolling(200).mean()

    # Slice to test period for display
    test_index = X_test.index

    def _slice(s):
        # Align to the test index and fill warm-up NaNs for display only.
        return s.reindex(test_index).bfill().ffill()

    bb_upper_t = _slice(bb_upper)
    bb_mid_t   = _slice(bb_mid)
    bb_lower_t = _slice(bb_lower)
    ma50_t     = _slice(ma50)
    ma100_t    = _slice(ma100)
    ma200_t    = _slice(ma200)

    # ------------------------------------------------------------------
    # Feature importance (top 15, ascending for horizontal bar charts)
    # ------------------------------------------------------------------
    fi_vals = model.feature_importances_
    fi_pairs = sorted(zip(feature_cols, fi_vals.tolist()), key=lambda x: x[1])[-15:]
    fi_names  = [p[0] for p in fi_pairs]
    fi_values = [float(p[1]) for p in fi_pairs]

    # ------------------------------------------------------------------
    # OHLC for return dict (test period)
    # ------------------------------------------------------------------
    ohlc_test = df.loc[test_index]

    def _clean_list(arr):
        # JSON-safe floats: NaN/inf become None.
        return [None if (v is None or (isinstance(v, float) and not np.isfinite(v))) else float(v) for v in arr]

    # ------------------------------------------------------------------
    # SECTION 8 — CUSTOM FIGURES
    # ------------------------------------------------------------------
    custom_figs = []

    # Figure 1: SMA overlay (20, 50, 200)
    fig_sma = go.Figure()
    fig_sma.add_trace(go.Scatter(
        x=list(test_index.astype(str)),
        y=_clean_list(close_test.values),
        name="Close", line=dict(color="#d1d4dc", width=1)
    ))
    fig_sma.add_trace(go.Scatter(
        x=list(test_index.astype(str)),
        y=_clean_list(ma50_t.values),
        name="SMA 50", line=dict(color="#2196F3", width=1.2)
    ))
    # SMA 20 from feature columns
    sma20_t = _slice(df["sma_20"])
    fig_sma.add_trace(go.Scatter(
        x=list(test_index.astype(str)),
        y=_clean_list(sma20_t.values),
        name="SMA 20", line=dict(color="#FF9800", width=1.2)
    ))
    fig_sma.add_trace(go.Scatter(
        x=list(test_index.astype(str)),
        y=_clean_list(ma200_t.values),
        name="SMA 200", line=dict(color="#E040FB", width=1.2)
    ))
    fig_sma.update_layout(
        title="SMA Overlay (Test Period)",
        paper_bgcolor="#131722", plot_bgcolor="#131722",
        font_color="#d1d4dc",
        margin=dict(l=40, r=20, t=30, b=30),
        legend=dict(bgcolor="rgba(0,0,0,0)"),
        xaxis_title="Date", yaxis_title="Price",
    )
    custom_figs.append(fig_sma.to_dict())

    # Figure 2: RSI 14
    rsi_t = _slice(df["rsi_14"])
    fig_rsi = go.Figure()
    fig_rsi.add_trace(go.Scatter(
        x=list(test_index.astype(str)),
        y=_clean_list(rsi_t.values),
        name="RSI 14", line=dict(color="#26C6DA", width=1.2)
    ))
    fig_rsi.add_hline(y=70, line_dash="dash", line_color="#FF5252", annotation_text="Overbought 70")
    fig_rsi.add_hline(y=30, line_dash="dash", line_color="#69F0AE", annotation_text="Oversold 30")
    fig_rsi.update_layout(
        title="RSI 14 (Test Period)",
        paper_bgcolor="#131722", plot_bgcolor="#131722",
        font_color="#d1d4dc",
        margin=dict(l=40, r=20, t=30, b=30),
        legend=dict(bgcolor="rgba(0,0,0,0)"),
        xaxis_title="Date", yaxis_title="RSI",
        yaxis=dict(range=[0, 100]),
    )
    custom_figs.append(fig_rsi.to_dict())

    # ------------------------------------------------------------------
    # Register model — register_model is presumably injected by the hosting
    # platform (it is never imported here); guard so the script also runs
    # standalone without NameError.
    # ------------------------------------------------------------------
    register = globals().get("register_model")
    if register is not None:
        register(model)

    # ------------------------------------------------------------------
    # Build return dict
    # ------------------------------------------------------------------
    equity_dates = [str(d) for d in close_test.index[:len(equity_strategy)]]

    result = {
        "ohlc": {
            "dates": [str(d) for d in ohlc_test.index],
            "open":  _clean_list(ohlc_test["open"].values),
            "high":  _clean_list(ohlc_test["high"].values),
            "low":   _clean_list(ohlc_test["low"].values),
            "close": _clean_list(ohlc_test["close"].values),
        },
        "signals": {
            "dates":  [str(d) for d in signal_test.index],
            "values": _clean_list(signal_test.values),
        },
        "bb": {
            "upper": _clean_list(bb_upper_t.values),
            "mid":   _clean_list(bb_mid_t.values),
            "lower": _clean_list(bb_lower_t.values),
        },
        "ma": {
            "ma50":  _clean_list(ma50_t.values),
            "ma100": _clean_list(ma100_t.values),
            "ma200": _clean_list(ma200_t.values),
        },
        "equity": {
            "dates":    equity_dates,
            "strategy": _clean_list(eq_strat_arr.tolist()),
            "bh":       _clean_list(eq_bh_arr.tolist()),
        },
        "feature_importance": {
            "names":  fi_names,
            "values": fi_values,
        },
        "conf_matrix": cm,
        "conf_hist": {
            "p_pos": p_pos.tolist(),
            "p_neg": p_neg.tolist(),
        },
        "rolling_acc": {
            "dates":  roll_dates,
            "values": roll_vals,
        },
        "drawdown": {
            "dates":  dd_dates,
            "values": dd_values,
        },
        "ret_dist":       [float(r) for r in ret_all],
        "ret_dist_long":  [float(r) for r in ret_long],
        "ret_dist_short": [float(r) for r in ret_short],
        "metrics": {
            "total_ret":    float(total_ret)    if np.isfinite(total_ret)    else 0.0,
            "bh_ret":       float(bh_ret)       if np.isfinite(bh_ret)       else 0.0,
            "sharpe_strat": float(sharpe_strat) if np.isfinite(sharpe_strat) else 0.0,
            "sharpe_bh":    float(sharpe_bh)    if np.isfinite(sharpe_bh)    else 0.0,
            "mdd":          float(mdd)          if np.isfinite(mdd)          else 0.0,
            "n_trades":     int(n_trades),
        },
        "split_dt":    split_dt,
        "split_idx":   int(split_idx),
        "n_train":     int(n_train),
        "n_test":      int(n_test),
        "feature_cols": feature_cols,
        "custom_figs": custom_figs,
    }

    return result
M
sharpe 28!
@malco · 2026-04-02
+1.24%
Return
28.57
Sharpe
0.4%
Max DD
74
Trades
</> View Code
# ╔══════════════════════════════════════════════════════════════╗
# ║                  STRATEGY REQUEST LOG                       ║
# ╚══════════════════════════════════════════════════════════════╝
# Generated        : 2026-04-02 14:13:23
# Model            : Random Forest
# Feature Eng.     : SMA (20, 50, 200), RSI 14
# Signal / Entry   : —
# Optimization     : —
# Risk Mgmt        : —
# Risk Filter      : —
# ══════════════════════════════════════════════════════════════

# ============================================================
# SECTION 0 — IMPORTS & CONSTANTS
# ============================================================

import numpy as np
import pandas as pd
from sklearn.ensemble import RandomForestClassifier
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import confusion_matrix
import plotly.graph_objects as go
from model_wrapper import ModelWrapper

DATA_PATH = "/Users/malco/Desktop/QuantifyMe/data/EURUSD_ticks.csv"
START_DATE = "2026-03-03"
END_DATE   = "2026-03-28"
STARTING_CAPITAL = 10_000
TRADE_COST = 2e-5


# ============================================================
# SECTION 2 — FEATURE ENGINEERING
# ============================================================

def feature_engineering(df, close, open_, high, low):
    """Return a copy of ``df`` extended with SMA, RSI and return/range features.

    Parameters
    ----------
    df : DataFrame of 15-minute bars; a copy is returned with feature columns.
    close, open_, high, low : price Series aligned to ``df``'s index.

    All features use only current and past bars (no lookahead).
    """
    # Work on a copy so the caller's frame is never mutated — consistent
    # with the sibling scripts' feature_engineering implementations.
    df = df.copy()

    # SMA 20, 50, 200
    df['sma_20']  = close.rolling(20).mean()
    df['sma_50']  = close.rolling(50).mean()
    df['sma_200'] = close.rolling(200).mean()

    # Price relative to SMAs
    df['close_sma20_ratio']  = close / df['sma_20']  - 1.0
    df['close_sma50_ratio']  = close / df['sma_50']  - 1.0
    df['close_sma200_ratio'] = close / df['sma_200'] - 1.0

    # RSI 14 — Wilder smoothing via ewm(com=13); a zero average loss maps
    # to NaN rather than a divide-by-zero.
    delta = close.diff()
    gain  = delta.clip(lower=0)
    loss  = (-delta).clip(lower=0)
    avg_gain = gain.ewm(com=13, min_periods=14).mean()
    avg_loss = loss.ewm(com=13, min_periods=14).mean()
    rs = avg_gain / avg_loss.replace(0, np.nan)
    df['rsi_14'] = 100 - (100 / (1 + rs))

    # Additional derived features (no lookahead)
    df['returns_1']  = close.pct_change(1)
    df['returns_4']  = close.pct_change(4)
    df['hl_range']   = (high - low) / close
    df['oc_range']   = (close - open_) / close

    return df


# ============================================================
# SECTION 3 — ML MODEL
# ============================================================

def build_model(X_train, y_train):
    """Fit a random forest on encoded labels and wrap it for decoding.

    ``y_train`` is expected label-encoded; the wrapper carries the original
    class labels (-1, 0, 1) so downstream predictions come back as
    trading directions.
    """
    params = optimization_config()
    forest = RandomForestClassifier(
        n_estimators=params['n_estimators'],
        max_depth=params['max_depth'],
        min_samples_leaf=params['min_samples_leaf'],
        class_weight=params['class_weight'],
        random_state=42,
        n_jobs=-1,
    )
    forest.fit(X_train, y_train)

    label_encoder = LabelEncoder().fit([-1, 0, 1])
    wrapped = ModelWrapper(
        forest,
        original_classes=label_encoder.classes_,
        n_features=X_train.shape[1],
    )
    return wrapped


# ============================================================
# SECTION 4 — SIGNAL / ENTRY LOGIC
# ============================================================

def generate_signals(model, X, thresh):
    """Threshold class probabilities into a -1 / 0 / +1 signal Series.

    Long where P(+1) >= thresh, short where P(-1) >= thresh; overlaps are
    resolved toward the larger probability (ties go long).  Returns the
    signal plus the raw p_pos / p_neg probability arrays.
    """
    proba = model.predict_proba(X)                    # shape (n, n_classes)
    label_order = list(model.classes_)
    count = len(X)

    p_pos = proba[:, label_order.index(1)] if 1 in label_order else np.zeros(count)
    p_neg = proba[:, label_order.index(-1)] if -1 in label_order else np.zeros(count)

    go_long = p_pos >= thresh
    go_short = p_neg >= thresh

    # Longs first, shorts overwrite, then overlaps resolved by confidence.
    values = np.where(go_long, 1.0, 0.0)
    values = np.where(go_short, -1.0, values)
    clash = go_long & go_short
    values[clash] = np.where(p_pos[clash] >= p_neg[clash], 1.0, -1.0)

    return pd.Series(values, index=X.index, dtype=float), p_pos, p_neg


# ============================================================
# SECTION 5 — OPTIMISATION TARGET
# ============================================================

def optimization_config():
    """Hyper-parameters for the random forest plus the tuning objective."""
    config = {}
    config["objective"] = "Maximize Sharpe ratio"
    config["notes"] = "Balanced class weights, moderate depth, many estimators for stable Sharpe"
    config["n_estimators"] = 300
    config["max_depth"] = 8
    config["min_samples_leaf"] = 20
    config["class_weight"] = "balanced"
    # Not applicable to random forests; kept for interface parity with
    # gradient-boosting configs.
    config["learning_rate"] = None
    return config


# ============================================================
# SECTION 6 — RISK MANAGEMENT
# ============================================================

def apply_risk(signal, close, pos_size=1.0):
    """Scale the directional signal by a constant position size.

    The ``close`` series is part of the risk-hook interface but unused by
    this flat-sizing rule.
    """
    return signal * pos_size


# ============================================================
# SECTION 7 — BACKTEST ENGINE
# ============================================================

def train_and_backtest():
    """Train the RF on 70% of 15-minute EURUSD bars and backtest the rest.

    Resamples ticks to 15-minute mid-price bars, engineers features, labels
    each bar with the direction 4 bars ahead, trains on the first 70% and
    trades the final 30% with a 0.55 probability threshold.  Returns a
    JSON-serialisable dict of series, metrics and Plotly figures.

    Cleanup in this revision: removed unused locals (``position``,
    ``trade_mask``, ``entry_idx``, ``dates_test``) and guarded the
    platform-injected ``register_model`` hook against NameError.
    """
    # ------------------------------------------------------------------
    # 1. Load & Resample (mid price = (Bid + Ask) / 2)
    # ------------------------------------------------------------------
    raw = pd.read_csv(DATA_PATH, parse_dates=['Time'])
    raw.set_index('Time', inplace=True)
    raw['mid'] = (raw['Bid'] + raw['Ask']) / 2.0

    ohlc = raw['mid'].resample('15min').ohlc()
    ohlc.dropna(inplace=True)

    if START_DATE:
        ohlc = ohlc[ohlc.index >= START_DATE]
    if END_DATE:
        ohlc = ohlc[ohlc.index <= END_DATE]

    close  = ohlc['close']
    open_  = ohlc['open']
    high   = ohlc['high']
    low    = ohlc['low']

    df = ohlc.copy()

    # ------------------------------------------------------------------
    # 2. Feature Engineering
    # ------------------------------------------------------------------
    df = feature_engineering(df, close, open_, high, low)

    # ------------------------------------------------------------------
    # 3. Target — direction 1 hour ahead (4 bars)
    # ------------------------------------------------------------------
    target = np.sign(close.shift(-4) - close)
    mask   = target.notna()
    df     = df[mask]
    target = target[mask]

    close  = close[mask]
    open_  = open_[mask]
    high   = high[mask]
    low    = low[mask]

    feature_cols = [
        'sma_20', 'sma_50', 'sma_200',
        'close_sma20_ratio', 'close_sma50_ratio', 'close_sma200_ratio',
        'rsi_14',
        'returns_1', 'returns_4',
        'hl_range', 'oc_range'
    ]

    df_feat = df[feature_cols].copy()
    df_feat = df_feat.bfill().ffill()
    df_feat.dropna(inplace=True)

    # Align target and price to clean feature index
    target = target.reindex(df_feat.index)
    close  = close.reindex(df_feat.index)
    open_  = open_.reindex(df_feat.index)
    high   = high.reindex(df_feat.index)
    low    = low.reindex(df_feat.index)
    df     = df.reindex(df_feat.index)

    # ------------------------------------------------------------------
    # 4. Train / Test Split (70/30, no shuffle)
    # ------------------------------------------------------------------
    n_total  = len(df_feat)
    split_idx = int(n_total * 0.70)

    X_train = df_feat.iloc[:split_idx]
    X_test  = df_feat.iloc[split_idx:]
    y_train = target.iloc[:split_idx]
    y_test  = target.iloc[split_idx:]
    close_test = close.iloc[split_idx:]

    split_dt = str(df_feat.index[split_idx])
    n_train  = len(X_train)
    n_test   = len(X_test)

    # ------------------------------------------------------------------
    # 5. Encode labels (-1/0/1 → 0/1/2 for sklearn)
    # ------------------------------------------------------------------
    enc = LabelEncoder()
    enc.fit([-1, 0, 1])
    y_train_enc = enc.transform(y_train)

    # ------------------------------------------------------------------
    # 6. Train Model
    # ------------------------------------------------------------------
    model = build_model(X_train, y_train_enc)

    # ------------------------------------------------------------------
    # 7. Generate Signals
    # ------------------------------------------------------------------
    thresh = 0.55
    signal_test, p_pos, p_neg = generate_signals(model, X_test, thresh)
    signal_test = apply_risk(signal_test, close_test)

    # ------------------------------------------------------------------
    # 8. Confusion Matrix
    # ------------------------------------------------------------------
    pred_test = model.predict(X_test)
    y_test_arr = np.asarray(y_test)
    pred_arr   = np.asarray(pred_test)

    cm = confusion_matrix(y_test_arr, pred_arr, labels=[-1, 0, 1])
    conf_matrix = cm.tolist()

    # ------------------------------------------------------------------
    # 9. Equity Curve & Metrics
    # ------------------------------------------------------------------
    close_arr  = close_test.values
    signal_arr = signal_test.values

    # Per-bar returns. NOTE(review): denominator is the *current* close,
    # a small approximation of (p_t - p_{t-1}) / p_{t-1} — confirm intent.
    bar_ret    = np.diff(close_arr, prepend=close_arr[0]) / close_arr
    bar_ret[0] = 0.0

    strat_ret  = signal_arr * bar_ret
    # Subtract cost on position changes: |Δpos| is 1 on entry/exit and 2 on
    # a direct flip, so a full round trip pays TRADE_COST in total.
    pos_changes = np.diff(signal_arr, prepend=signal_arr[0])
    strat_ret  -= np.abs(pos_changes) * (TRADE_COST / 2)  # half on entry, scaled

    equity_strat = STARTING_CAPITAL * np.cumprod(1 + strat_ret)
    bh_ret_arr   = close_arr / close_arr[0]
    equity_bh    = STARTING_CAPITAL * bh_ret_arr

    # Collect per-trade returns for the distribution charts
    ret_dist       = []
    ret_dist_long  = []
    ret_dist_short = []

    current_pos    = 0.0
    entry_price    = None

    for i in range(len(signal_arr)):
        new_pos = signal_arr[i]
        if new_pos != current_pos:
            # Close the running trade (if any) at this bar's close.
            if current_pos != 0.0 and entry_price is not None:
                exit_price = close_arr[i]
                raw_r      = (exit_price - entry_price) / entry_price * current_pos
                r          = raw_r - TRADE_COST
                ret_dist.append(float(r))
                if current_pos > 0:
                    ret_dist_long.append(float(r))
                else:
                    ret_dist_short.append(float(r))
            # Open the new position (or go flat).
            entry_price = close_arr[i] if new_pos != 0.0 else None
            current_pos = new_pos

    # Close last open position at the final bar
    if current_pos != 0.0 and entry_price is not None:
        exit_price = close_arr[-1]
        raw_r = (exit_price - entry_price) / entry_price * current_pos
        r     = raw_r - TRADE_COST
        ret_dist.append(float(r))
        if current_pos > 0:
            ret_dist_long.append(float(r))
        else:
            ret_dist_short.append(float(r))

    n_trades = len(ret_dist)

    # Total return
    total_ret = float((equity_strat[-1] - STARTING_CAPITAL) / STARTING_CAPITAL)
    bh_total  = float((equity_bh[-1]    - STARTING_CAPITAL) / STARTING_CAPITAL)

    # Sharpe (annualised, 15min bars → 26040 bars/year).
    # NOTE(review): the strategy Sharpe is computed on active (non-flat)
    # bars only, which inflates it versus a whole-period Sharpe — confirm
    # this is the intended definition before comparing across scripts.
    BARS_PER_YEAR = 26040
    active_mask = signal_arr != 0
    if active_mask.sum() > 1 and strat_ret[active_mask].std() > 0:
        sharpe_strat = float(
            np.mean(strat_ret[active_mask]) /
            np.std(strat_ret[active_mask])  *
            np.sqrt(BARS_PER_YEAR)
        )
    else:
        sharpe_strat = 0.0

    if bar_ret.std() > 0:
        sharpe_bh = float(np.mean(bar_ret) / np.std(bar_ret) * np.sqrt(BARS_PER_YEAR))
    else:
        sharpe_bh = 0.0

    # Max Drawdown: most negative excursion from the running equity peak
    running_max = np.maximum.accumulate(equity_strat)
    drawdown_arr = (equity_strat - running_max) / running_max
    mdd = float(drawdown_arr.min())

    # ------------------------------------------------------------------
    # 10. Rolling Accuracy (30-bar, non-flat signals, test period)
    # ------------------------------------------------------------------
    correct    = (pred_arr == y_test_arr).astype(float)
    active_sig = signal_arr != 0
    roll_vals  = []

    for i in range(len(correct)):
        start_i = max(0, i - 29)
        window_active = active_sig[start_i:i+1]
        if window_active.sum() == 0:
            roll_vals.append(None)
        else:
            roll_vals.append(float(correct[start_i:i+1][window_active].mean()))

    # ------------------------------------------------------------------
    # 11. Bollinger Bands & MAs (full-period rolling, sliced to test)
    # ------------------------------------------------------------------
    close_full = close
    bb_mid_s   = close_full.rolling(20).mean()
    bb_std_s   = close_full.rolling(20).std()
    bb_upper_s = bb_mid_s + 2 * bb_std_s
    bb_lower_s = bb_mid_s - 2 * bb_std_s

    ma50_s  = close_full.rolling(50).mean()
    ma100_s = close_full.rolling(100).mean()
    ma200_s = close_full.rolling(200).mean()

    def _clean(series):
        # Align to the test index; NaN/inf become None for JSON safety.
        vals = series.reindex(close_test.index).tolist()
        return [None if (v is None or (isinstance(v, float) and (np.isnan(v) or np.isinf(v)))) else float(v) for v in vals]

    # ------------------------------------------------------------------
    # 12. Feature Importance (top 15 ascending)
    # ------------------------------------------------------------------
    fi_vals = model.feature_importances_
    fi_pairs = sorted(zip(feature_cols, fi_vals), key=lambda x: x[1])[-15:]
    fi_names  = [p[0] for p in fi_pairs]
    fi_values = [float(p[1]) for p in fi_pairs]

    # ------------------------------------------------------------------
    # 13. OHLC for test period
    # ------------------------------------------------------------------
    test_dates = [str(d) for d in X_test.index]
    ohlc_test  = df.reindex(X_test.index)

    def _list_clean(arr):
        # JSON-safe floats for arbitrary array-likes.
        return [None if (v is None or (isinstance(v, float) and (np.isnan(v) or np.isinf(v)))) else float(v)
                for v in (arr.tolist() if hasattr(arr, 'tolist') else list(arr))]

    # ------------------------------------------------------------------
    # SECTION 8 — CUSTOM FIGURES
    # ------------------------------------------------------------------

    custom_figs = []

    # --- Chart 1: SMA 20 / 50 / 200 overlay on close (test period) ---
    sma20_clean  = _clean(df['sma_20'])
    sma50_clean  = _clean(df['sma_50'])
    sma200_clean = _clean(df['sma_200'])
    close_clean  = _clean(close)

    fig_sma = go.Figure()
    fig_sma.add_trace(go.Scatter(
        x=test_dates, y=close_clean,
        name='Close', line=dict(color='#d1d4dc', width=1)
    ))
    fig_sma.add_trace(go.Scatter(
        x=test_dates, y=sma20_clean,
        name='SMA 20', line=dict(color='#f7c948', width=1.2)
    ))
    fig_sma.add_trace(go.Scatter(
        x=test_dates, y=sma50_clean,
        name='SMA 50', line=dict(color='#26a69a', width=1.2)
    ))
    fig_sma.add_trace(go.Scatter(
        x=test_dates, y=sma200_clean,
        name='SMA 200', line=dict(color='#ef5350', width=1.2)
    ))
    fig_sma.update_layout(
        title='SMA 20 / 50 / 200 — Test Period',
        paper_bgcolor="#131722", plot_bgcolor="#131722",
        font_color="#d1d4dc",
        margin=dict(l=40, r=20, t=30, b=30),
        legend=dict(bgcolor="rgba(0,0,0,0)")
    )
    custom_figs.append(fig_sma.to_dict())

    # --- Chart 2: RSI 14 (test period) ---
    rsi_clean = _clean(df['rsi_14'])

    fig_rsi = go.Figure()
    fig_rsi.add_trace(go.Scatter(
        x=test_dates, y=rsi_clean,
        name='RSI 14', line=dict(color='#ab47bc', width=1.5)
    ))
    fig_rsi.add_hline(y=70, line_dash='dash', line_color='#ef5350', opacity=0.6)
    fig_rsi.add_hline(y=30, line_dash='dash', line_color='#26a69a', opacity=0.6)
    fig_rsi.add_hline(y=50, line_dash='dot',  line_color='#d1d4dc', opacity=0.3)
    fig_rsi.update_layout(
        title='RSI 14 — Test Period',
        yaxis=dict(range=[0, 100]),
        paper_bgcolor="#131722", plot_bgcolor="#131722",
        font_color="#d1d4dc",
        margin=dict(l=40, r=20, t=30, b=30),
        legend=dict(bgcolor="rgba(0,0,0,0)")
    )
    custom_figs.append(fig_rsi.to_dict())

    # ------------------------------------------------------------------
    # 14. Assemble result dict
    # ------------------------------------------------------------------
    equity_strat_list = [float(v) for v in equity_strat.tolist()]
    equity_bh_list    = [float(v) for v in equity_bh.tolist()]
    drawdown_list     = [float(v) for v in drawdown_arr.tolist()]

    def _sanitize_list(lst):
        # Final JSON guard: NaN/inf entries become None.
        out = []
        for v in lst:
            if v is None:
                out.append(None)
            elif isinstance(v, float) and (np.isnan(v) or np.isinf(v)):
                out.append(None)
            else:
                out.append(v)
        return out

    result = {
        "ohlc": {
            "dates": test_dates,
            "open":  _list_clean(ohlc_test['open']),
            "high":  _list_clean(ohlc_test['high']),
            "low":   _list_clean(ohlc_test['low']),
            "close": _list_clean(ohlc_test['close']),
        },
        "signals": {
            "dates":  test_dates,
            "values": [float(v) for v in signal_test.values.tolist()]
        },
        "bb": {
            "upper": _sanitize_list(_clean(bb_upper_s)),
            "mid":   _sanitize_list(_clean(bb_mid_s)),
            "lower": _sanitize_list(_clean(bb_lower_s)),
        },
        "ma": {
            "ma50":  _sanitize_list(_clean(ma50_s)),
            "ma100": _sanitize_list(_clean(ma100_s)),
            "ma200": _sanitize_list(_clean(ma200_s)),
        },
        "equity": {
            "dates":    test_dates,
            "strategy": _sanitize_list(equity_strat_list),
            "bh":       _sanitize_list(equity_bh_list),
        },
        "feature_importance": {
            "names":  fi_names,
            "values": fi_values,
        },
        "conf_matrix": conf_matrix,
        "conf_hist": {
            "p_pos": p_pos.tolist(),
            "p_neg": p_neg.tolist(),
        },
        "rolling_acc": {
            "dates":  test_dates,
            "values": _sanitize_list(roll_vals),
        },
        "drawdown": {
            "dates":  test_dates,
            "values": _sanitize_list(drawdown_list),
        },
        "ret_dist":       ret_dist,
        "ret_dist_long":  ret_dist_long,
        "ret_dist_short": ret_dist_short,
        "metrics": {
            "total_ret":    total_ret,
            "bh_ret":       bh_total,
            "sharpe_strat": sharpe_strat,
            "sharpe_bh":    sharpe_bh,
            "mdd":          mdd,
            "n_trades":     n_trades,
        },
        "split_dt":    split_dt,
        "split_idx":   int(split_idx),
        "n_train":     int(n_train),
        "n_test":      int(n_test),
        "feature_cols": feature_cols,
        "custom_figs":  custom_figs,
    }

    # register_model is presumably injected by the hosting platform (it is
    # never imported here); guard so standalone runs don't NameError.
    register = globals().get("register_model")
    if register is not None:
        register(model)

    return result
M
good!
@malco · 2026-04-02
+11.62%
Return
3.98
Sharpe
6.1%
Max DD
94
Trades
</> View Code
# ╔══════════════════════════════════════════════════════════════╗
# ║                  STRATEGY REQUEST LOG                       ║
# ╚══════════════════════════════════════════════════════════════╝
# Generated        : 2026-04-02 13:16:50
# Model            : Random Forest
# Feature Eng.     : SMA (20, 50, 200), RSI 14
# Signal / Entry   : —
# Optimization     : —
# Risk Mgmt        : —
# Risk Filter      : —
# ══════════════════════════════════════════════════════════════

# ============================================================
# SECTION 0 — IMPORTS & CONSTANTS
# ============================================================

import numpy as np
import pandas as pd
from sklearn.ensemble import RandomForestClassifier
from sklearn.preprocessing import LabelEncoder
from model_wrapper import ModelWrapper
import plotly.graph_objects as go

DATA_PATH = "/Users/malco/Desktop/QuantifyMe/data/EURUSD_ticks.csv"


# ============================================================
# SECTION 1 — MODEL WRAPPER
# ============================================================

# ModelWrapper is imported from model_wrapper module


# ============================================================
# SECTION 2 — FEATURE ENGINEERING
# ============================================================

def feature_engineering(df, close, open_, high, low) -> pd.DataFrame:
    """
    Add SMA (20, 50, 200) and RSI 14 feature columns.

    Parameters
    ----------
    df : pd.DataFrame
        Frame the feature columns are attached to.  A copy is returned;
        the caller's frame is left untouched.
    close : pd.Series
        Close prices aligned with df's index.
    open_, high, low : pd.Series
        Unused here; kept so all feature_engineering variants share one
        signature.

    Returns
    -------
    pd.DataFrame
        Copy of df with 'sma_20', 'sma_50', 'sma_200' and 'rsi_14'
        added.  Leading rows are NaN until each rolling window fills.
    """
    # Work on a copy so the caller's DataFrame is not mutated in place
    # (consistent with the other feature_engineering variants in this file).
    df = df.copy()

    # Simple moving averages over 20 / 50 / 200 bars.
    df['sma_20'] = close.rolling(window=20).mean()
    df['sma_50'] = close.rolling(window=50).mean()
    df['sma_200'] = close.rolling(window=200).mean()

    # RSI 14 (rolling-mean variant, not Wilder smoothing).
    delta = close.diff()
    gain = (delta.where(delta > 0, 0)).rolling(window=14).mean()
    loss = (-delta.where(delta < 0, 0)).rolling(window=14).mean()
    # Epsilon guard: when a window had no losses, rs blows up and RSI
    # saturates near 100 instead of dividing by zero.
    rs = gain / loss.replace(0, 1e-10)
    df['rsi_14'] = 100 - (100 / (1 + rs))

    return df


# ============================================================
# SECTION 3 — ML MODEL
# ============================================================

def build_model(X_train, y_train) -> ModelWrapper:
    """
    Fit a Random Forest classifier for the 3-class ([-1, 0, 1]) problem.

    Hyperparameters come from optimization_config() (tuned for the
    Sharpe-ratio objective); conservative fallbacks are used whenever a
    key is missing from the config.
    """
    cfg = optimization_config()

    forest = RandomForestClassifier(
        n_estimators=cfg.get('n_estimators', 200),
        max_depth=cfg.get('max_depth', 10),
        min_samples_leaf=cfg.get('min_samples_leaf', 5),
        min_samples_split=cfg.get('min_samples_split', 10),
        class_weight=cfg.get('class_weight', 'balanced'),
        random_state=42,
        n_jobs=-1,
    )
    forest.fit(X_train, y_train)

    # Hand back the fitted estimator tagged with the original labels.
    return ModelWrapper(forest, original_classes=[-1, 0, 1],
                        n_features=X_train.shape[1])


# ============================================================
# SECTION 4 — SIGNAL / ENTRY LOGIC
# ============================================================

def generate_signals(model, X, thresh) -> tuple:
    """
    Turn model confidences into trading signals.

    A bar gets +1 when P(class +1) > thresh, -1 when P(class -1) > thresh
    (the short condition is applied last, so it wins if both fire), and
    0 otherwise.

    Returns (signal, p_pos, p_neg):
      signal: pd.Series in {-1.0, 0.0, 1.0}, indexed like X
      p_pos:  1-D array of probability for class +1
      p_neg:  1-D array of probability for class -1
    """
    proba = model.predict_proba(X)

    # Locate the probability columns of the +1 / -1 classes; a class
    # absent from the model keeps an all-zero probability array.
    p_pos = np.zeros(len(X))
    p_neg = np.zeros(len(X))
    for col, cls in enumerate(model.classes_):
        if cls == 1:
            p_pos = proba[:, col]
        elif cls == -1:
            p_neg = proba[:, col]

    values = np.zeros(len(X))
    values[p_pos > thresh] = 1.0
    values[p_neg > thresh] = -1.0

    return pd.Series(values, index=X.index, dtype=float), p_pos, p_neg


# ============================================================
# SECTION 5 — OPTIMISATION TARGET
# ============================================================

def optimization_config() -> dict:
    """
    Hyperparameter set targeting the Sharpe-ratio objective.

    Conservative tree depth plus a larger ensemble keeps variance (and
    overfitting) down, which tends to improve risk-adjusted returns.
    """
    cfg = {
        'objective': 'Maximize Sharpe ratio',
        'notes': 'Random Forest with balanced class weights and conservative depth for stable risk-adjusted returns',
    }
    cfg.update(
        n_estimators=200,
        max_depth=10,
        min_samples_leaf=5,
        min_samples_split=10,
        class_weight='balanced',
    )
    return cfg


# ============================================================
# SECTION 6 — RISK MANAGEMENT
# ============================================================

def apply_risk(signal, close, pos_size=1.0):
    """
    Scale raw signals by the position size.

    The only risk rule currently implemented is linear sizing; `close`
    is accepted for interface compatibility but not used yet.
    """
    scaled = signal * pos_size
    return scaled


# ============================================================
# SECTION 7 — BACKTEST ENGINE
# ============================================================

def train_and_backtest() -> dict:
    """
    Full backtest pipeline: load data, engineer features, train model,
    generate signals, compute equity curve and metrics.

    Reads EURUSD ticks from DATA_PATH, resamples to 15-minute OHLC,
    builds a 1-hour-ahead direction target, trains a Random Forest on a
    70/30 chronological split, trades on high-confidence probabilities,
    and returns a dict of chart payloads (OHLC, signals, bands, equity,
    drawdown, Plotly figures) plus summary metrics.
    """
    
    # ─────────────────────────────────────────────────────────────
    # 1. Load and resample data
    # ─────────────────────────────────────────────────────────────
    df_raw = pd.read_csv(DATA_PATH)
    df_raw['Time'] = pd.to_datetime(df_raw['Time'])
    df_raw.set_index('Time', inplace=True)
    
    # Mid price from bid/ask quotes.
    df_raw['mid'] = (df_raw['Bid'] + df_raw['Ask']) / 2
    
    # Resample to 15-minute OHLC
    ohlc_data = df_raw['mid'].resample('15min').ohlc()
    ohlc_data.columns = ['open', 'high', 'low', 'close']
    
    df = ohlc_data.copy()
    close = df['close']
    open_ = df['open']
    high = df['high']
    low = df['low']
    
    # ─────────────────────────────────────────────────────────────
    # 2. Create target: direction 1 hour ahead (4 × 15m bars)
    # ─────────────────────────────────────────────────────────────
    target = np.sign(close.shift(-4) - close)
    
    # Drop NaN from target BEFORE train/test split
    mask = target.notna()
    df = df[mask]
    target = target[mask]
    close = close[mask]
    open_ = open_[mask]
    high = high[mask]
    low = low[mask]
    
    # ─────────────────────────────────────────────────────────────
    # 3. Feature engineering
    # ─────────────────────────────────────────────────────────────
    df = feature_engineering(df, close, open_, high, low)
    
    # Define feature columns (in order used for training)
    feature_cols = ['sma_20', 'sma_50', 'sma_200', 'rsi_14']
    
    # Drop NaN from features
    mask_features = df[feature_cols].notna().all(axis=1)
    df = df[mask_features]
    target = target[mask_features]
    close = close[mask_features]
    open_ = open_[mask_features]
    high = high[mask_features]
    low = low[mask_features]
    
    # ─────────────────────────────────────────────────────────────
    # 4. Train/test split (70/30 walk-forward, no shuffle)
    # ─────────────────────────────────────────────────────────────
    n_total = len(df)
    split_idx = int(0.7 * n_total)
    split_dt = df.index[split_idx].strftime('%Y-%m-%d %H:%M:%S')
    
    X_train = df.iloc[:split_idx][feature_cols]
    X_test = df.iloc[split_idx:][feature_cols]
    y_train = target.iloc[:split_idx]
    y_test = target.iloc[split_idx:]
    
    close_train = close.iloc[:split_idx]
    close_test = close.iloc[split_idx:]
    
    open_test = open_.iloc[split_idx:]
    high_test = high.iloc[split_idx:]
    low_test = low.iloc[split_idx:]
    
    n_train = len(X_train)
    n_test = len(X_test)
    
    # ─────────────────────────────────────────────────────────────
    # 5. Encode labels
    # ─────────────────────────────────────────────────────────────
    le = LabelEncoder()
    le.fit([-1, 0, 1])  # Fit on ALL possible classes
    y_train_enc = le.transform(y_train)
    y_test_enc = le.transform(y_test)
    
    # ─────────────────────────────────────────────────────────────
    # 6. Build and train model
    # ─────────────────────────────────────────────────────────────
    model = build_model(X_train, y_train_enc)
    
    # ─────────────────────────────────────────────────────────────
    # 7. Generate signals on test set
    # ─────────────────────────────────────────────────────────────
    signal_test_raw, p_pos, p_neg = generate_signals(model, X_test, thresh=0.55)
    
    # Apply risk (position sizing)
    signal_test = apply_risk(signal_test_raw, close_test, pos_size=1.0)
    
    # ─────────────────────────────────────────────────────────────
    # 8. Compute equity curve and metrics
    # ─────────────────────────────────────────────────────────────
    
    # Strategy returns (per signal, scaled by next 1-hour price move)
    # NOTE(review): the numerator is a 4-bar-ahead move while the
    # denominator is the PREVIOUS bar's close, and consecutive bars share
    # overlapping 4-bar moves — compounding these per bar likely inflates
    # the equity curve; confirm this is intended.
    price_moves = close_test.shift(-4) - close_test
    strat_ret = signal_test * price_moves / close_test.shift(1)
    
    # Apply trading cost (2e-5 round-trip cost)
    signal_changes = signal_test.diff().abs()
    trading_cost = signal_changes * 2e-5
    strat_ret = strat_ret - trading_cost
    
    # Buy-and-hold returns
    bh_ret = price_moves / close_test.shift(1)
    
    # Equity curves (starting capital 10,000)
    capital = 10000
    strat_equity = [capital]
    bh_equity = [capital]
    
    # NaN returns (warm-up / end-of-series bars) leave equity unchanged.
    for ret_s, ret_bh in zip(strat_ret, bh_ret):
        strat_equity.append(strat_equity[-1] * (1 + ret_s) if not np.isnan(ret_s) else strat_equity[-1])
        bh_equity.append(bh_equity[-1] * (1 + ret_bh) if not np.isnan(ret_bh) else bh_equity[-1])
    
    # Drop the seed value so the curves align 1:1 with the test bars.
    strat_equity = strat_equity[1:]
    bh_equity = bh_equity[1:]
    
    # Total returns (decimal ratio)
    total_ret = (strat_equity[-1] - capital) / capital
    bh_ret_total = (bh_equity[-1] - capital) / capital
    
    # Sharpe ratio (annualized, assuming 252*24*4 = 24192 15-min bars per year)
    ret_series = pd.Series(strat_ret.values, index=close_test.index)
    ret_series = ret_series.dropna()
    
    if len(ret_series) > 0 and ret_series.std() > 0:
        sharpe_strat = ret_series.mean() / ret_series.std() * np.sqrt(24192)
    else:
        sharpe_strat = 0.0
    
    ret_series_bh = pd.Series(bh_ret.values, index=close_test.index)
    ret_series_bh = ret_series_bh.dropna()
    
    if len(ret_series_bh) > 0 and ret_series_bh.std() > 0:
        sharpe_bh = ret_series_bh.mean() / ret_series_bh.std() * np.sqrt(24192)
    else:
        sharpe_bh = 0.0
    
    # Max Drawdown
    cum_strat = np.cumprod(1 + ret_series.fillna(0))
    running_max = np.maximum.accumulate(cum_strat)
    drawdown = (cum_strat - running_max) / running_max
    mdd = float(np.min(drawdown)) if len(drawdown) > 0 else 0.0
    
    # Trade returns and statistics
    # A trade is closed only when a NEW entry in a different direction
    # appears, so the final open trade is never closed and is excluded
    # from n_trades and the return distributions.
    trade_returns = []
    trade_returns_long = []
    trade_returns_short = []
    entry_price = None
    entry_signal = None
    n_trades = 0
    
    for i, (sig, ret) in enumerate(zip(signal_test, price_moves)):
        if sig != 0 and entry_signal != sig:
            if entry_signal is not None:
                # Close previous trade
                n_trades += 1
                trade_ret = (close_test.iloc[i] - entry_price) / entry_price * entry_signal
                trade_returns.append(trade_ret)
                if entry_signal == 1:
                    trade_returns_long.append(trade_ret)
                else:
                    trade_returns_short.append(trade_ret)
            
            entry_signal = sig
            entry_price = close_test.iloc[i]
    
    ret_dist = trade_returns if trade_returns else []
    ret_dist_long = trade_returns_long if trade_returns_long else []
    ret_dist_short = trade_returns_short if trade_returns_short else []
    
    # ─────────────────────────────────────────────────────────────
    # 9. Confusion matrix and predictions
    # ─────────────────────────────────────────────────────────────
    # NOTE(review): y_test holds raw labels {-1, 0, 1}; this assumes
    # ModelWrapper.predict maps encoded predictions back to those same
    # labels (not the encoded 0/1/2 space) — confirm in model_wrapper.
    pred_test = model.predict(X_test)
    
    from sklearn.metrics import confusion_matrix
    cm = confusion_matrix(y_test, pred_test, labels=[-1, 0, 1])
    conf_matrix = cm.tolist()
    
    # ─────────────────────────────────────────────────────────────
    # 10. Rolling accuracy
    # ─────────────────────────────────────────────────────────────
    # Hit-rate of non-flat signals vs. the realized direction (y_test)
    # over a trailing 30-bar window.
    rolling_acc_list = []
    rolling_dates = []
    
    for i in range(30, len(X_test)):
        window_signals = signal_test.iloc[i-30:i]
        window_actual = y_test.iloc[i-30:i]
        
        # Only count non-zero signals
        active_mask = window_signals != 0
        if active_mask.sum() > 0:
            acc = (window_signals[active_mask] == window_actual[active_mask]).sum() / active_mask.sum()
            rolling_acc_list.append(float(acc))
            rolling_dates.append(close_test.index[i].strftime('%Y-%m-%d %H:%M:%S'))
        else:
            rolling_acc_list.append(None)
            rolling_dates.append(close_test.index[i].strftime('%Y-%m-%d %H:%M:%S'))
    
    # ─────────────────────────────────────────────────────────────
    # 11. Bollinger Bands and Moving Averages
    # ─────────────────────────────────────────────────────────────
    bb_sma = close_test.rolling(20).mean()
    bb_std = close_test.rolling(20).std()
    bb_upper = bb_sma + 2 * bb_std
    bb_lower = bb_sma - 2 * bb_std
    
    ma_50 = close_test.rolling(50).mean()
    ma_100 = close_test.rolling(100).mean()
    ma_200 = close_test.rolling(200).mean()
    
    # ─────────────────────────────────────────────────────────────
    # 12. Drawdown series
    # ─────────────────────────────────────────────────────────────
    # NOTE(review): ret_series was dropna()'d above, so when NaNs were
    # removed these dates (taken positionally from close_test.index) can
    # drift out of alignment with the drawdown values.
    drawdown_series = []
    drawdown_dates = []
    cum_returns = [1.0]
    for ret in ret_series.fillna(0):
        cum_returns.append(cum_returns[-1] * (1 + ret))
    
    running_max = np.maximum.accumulate(cum_returns)
    for i, (cum, max_cum) in enumerate(zip(cum_returns[1:], running_max[1:])):
        dd = (cum - max_cum) / max_cum
        drawdown_series.append(float(dd))
        drawdown_dates.append(close_test.index[i].strftime('%Y-%m-%d %H:%M:%S'))
    
    # ─────────────────────────────────────────────────────────────
    # 13. Feature importance
    # ─────────────────────────────────────────────────────────────
    importances = model.feature_importances_
    feature_importance_dict = {name: imp for name, imp in zip(feature_cols, importances)}
    sorted_features = sorted(feature_importance_dict.items(), key=lambda x: x[1])
    
    # Keep at most the top 15, ascending (the chart expects this order).
    fi_names = [x[0] for x in sorted_features[-15:]]
    fi_values = [float(x[1]) for x in sorted_features[-15:]]
    
    # ─────────────────────────────────────────────────────────────
    # 14. Build return dictionary (core metrics)
    # ─────────────────────────────────────────────────────────────
    
    result = {
        'ohlc': {
            'dates': [dt.strftime('%Y-%m-%d %H:%M:%S') for dt in close_test.index],
            'open': [float(o) if not np.isnan(o) else None for o in open_test],
            'high': [float(h) if not np.isnan(h) else None for h in high_test],
            'low': [float(l) if not np.isnan(l) else None for l in low_test],
            'close': [float(c) if not np.isnan(c) else None for c in close_test]
        },
        'signals': {
            'dates': [dt.strftime('%Y-%m-%d %H:%M:%S') for dt in signal_test.index],
            'values': [float(s) if not np.isnan(s) else None for s in signal_test]
        },
        'bb': {
            'upper': [float(u) if not np.isnan(u) else None for u in bb_upper],
            'mid': [float(m) if not np.isnan(m) else None for m in bb_sma],
            'lower': [float(l) if not np.isnan(l) else None for l in bb_lower]
        },
        'ma': {
            'ma50': [float(m) if not np.isnan(m) else None for m in ma_50],
            'ma100': [float(m) if not np.isnan(m) else None for m in ma_100],
            'ma200': [float(m) if not np.isnan(m) else None for m in ma_200]
        },
        'equity': {
            'dates': [dt.strftime('%Y-%m-%d %H:%M:%S') for dt in close_test.index],
            'strategy': [float(e) for e in strat_equity],
            'bh': [float(e) for e in bh_equity]
        },
        'feature_importance': {
            'names': fi_names,
            'values': fi_values
        },
        'conf_matrix': conf_matrix,
        'conf_hist': {
            'p_pos': [float(p) if not np.isnan(p) else None for p in p_pos],
            'p_neg': [float(p) if not np.isnan(p) else None for p in p_neg]
        },
        'rolling_acc': {
            'dates': rolling_dates,
            'values': rolling_acc_list
        },
        'drawdown': {
            'dates': drawdown_dates,
            'values': drawdown_series
        },
        'ret_dist': ret_dist,
        'ret_dist_long': ret_dist_long,
        'ret_dist_short': ret_dist_short,
        'metrics': {
            'total_ret': float(total_ret),
            'bh_ret': float(bh_ret_total),
            'sharpe_strat': float(sharpe_strat),
            'sharpe_bh': float(sharpe_bh),
            'mdd': float(mdd),
            'n_trades': int(n_trades)
        },
        'split_dt': split_dt,
        'split_idx': int(split_idx),
        'n_train': int(n_train),
        'n_test': int(n_test),
        'feature_cols': feature_cols,
        'custom_figs': []
    }
    
    # ─────────────────────────────────────────────────────────────
    # 15. Build custom figures (SMA and RSI)
    # ─────────────────────────────────────────────────────────────
    
    # SMA Chart
    fig_sma = go.Figure()
    
    fig_sma.add_trace(go.Scatter(
        x=close_test.index,
        y=close_test,
        name='Close',
        line=dict(color='#2962FF', width=1)
    ))
    
    fig_sma.add_trace(go.Scatter(
        x=close_test.index,
        y=df['sma_20'].iloc[split_idx:],
        name='SMA 20',
        line=dict(color='#FF6D00', width=1)
    ))
    
    fig_sma.add_trace(go.Scatter(
        x=close_test.index,
        y=df['sma_50'].iloc[split_idx:],
        name='SMA 50',
        line=dict(color='#00C853', width=1)
    ))
    
    fig_sma.add_trace(go.Scatter(
        x=close_test.index,
        y=df['sma_200'].iloc[split_idx:],
        name='SMA 200',
        line=dict(color='#D50000', width=1)
    ))
    
    fig_sma.update_layout(
        title='SMA (20, 50, 200)',
        xaxis_title='Date',
        yaxis_title='Price',
        template='plotly_dark',
        paper_bgcolor='#131722',
        plot_bgcolor='#131722',
        font_color='#d1d4dc',
        margin=dict(l=40, r=20, t=30, b=30),
        legend=dict(bgcolor='rgba(0,0,0,0)'),
        hovermode='x unified'
    )
    
    result['custom_figs'].append(fig_sma.to_dict())
    
    # RSI Chart
    fig_rsi = go.Figure()
    
    fig_rsi.add_trace(go.Scatter(
        x=close_test.index,
        y=df['rsi_14'].iloc[split_idx:],
        name='RSI 14',
        line=dict(color='#2962FF', width=2)
    ))
    
    fig_rsi.add_hline(y=70, line_dash='dash', line_color='#FF6D00', annotation_text='Overbought (70)')
    fig_rsi.add_hline(y=30, line_dash='dash', line_color='#00C853', annotation_text='Oversold (30)')
    
    fig_rsi.update_layout(
        title='RSI 14',
        xaxis_title='Date',
        yaxis_title='RSI',
        template='plotly_dark',
        paper_bgcolor='#131722',
        plot_bgcolor='#131722',
        font_color='#d1d4dc',
        margin=dict(l=40, r=20, t=30, b=30),
        legend=dict(bgcolor='rgba(0,0,0,0)'),
        yaxis=dict(range=[0, 100]),
        hovermode='x unified'
    )
    
    result['custom_figs'].append(fig_rsi.to_dict())
    
    # ─────────────────────────────────────────────────────────────
    # 16. Register model for prediction tab
    # ─────────────────────────────────────────────────────────────
    # register_model is injected by the host app when available.
    if 'register_model' in globals() and register_model is not None:
        register_model(model)
    
    return result


# Run the backtest only when executed as a script (not on import).
if __name__ == '__main__':
    result = train_and_backtest()
    print("Backtest complete.")
M
decent!
@malco · 2026-03-22
+6.86%
Return
5.84
Sharpe
1.6%
Max DD
50
Trades
</> View Code
# ╔══════════════════════════════════════════════════════════════╗
# ║                  STRATEGY REQUEST LOG                       ║
# ╚══════════════════════════════════════════════════════════════╝
# Generated        : 2026-03-23 07:21:48
# Model            : Random Forest
# Feature Eng.     : MACD (12,26,9), RSI 14
# Signal / Entry   : RSI oversold/overbought
# Optimization     : —
# Risk Mgmt        : Max 3 trades/day
# Risk Filter      : —
# ══════════════════════════════════════════════════════════════

import numpy as np
import pandas as pd
from sklearn.ensemble import RandomForestClassifier
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import confusion_matrix
import plotly.graph_objects as go

DATA_PATH = "/Users/malco/Desktop/QuantifyMe/data/EURUSD_ticks.csv"

# ============================================================
# SECTION 0 — MODEL WRAPPER
# ============================================================

class ModelWrapper:
    """
    Thin adapter around a fitted sklearn-style estimator.

    Exposes predict / predict_proba unchanged, publishes the original
    class labels via `classes_`, and offers a best-effort
    `feature_importances_` with fallbacks for linear models and
    composite estimators.
    """

    def __init__(self, model, original_classes, n_features=1):
        self._m = model
        self.classes_ = np.array(original_classes)
        self._n_features = n_features

    def predict_proba(self, X):
        """Delegate probability prediction to the wrapped model."""
        return self._m.predict_proba(X)

    def predict(self, X):
        """Delegate class prediction to the wrapped model."""
        return self._m.predict(X)

    @property
    def feature_importances_(self):
        """Importances: native attr, mean |coef_|, averaged sub-estimators, else ones."""
        inner = self._m
        if hasattr(inner, 'feature_importances_'):
            return inner.feature_importances_
        if hasattr(inner, 'coef_'):
            return np.abs(inner.coef_).mean(axis=0)
        try:
            collected = [est.feature_importances_
                         for _, est in inner.estimators_
                         if hasattr(est, 'feature_importances_')]
            if collected:
                return np.mean(collected, axis=0)
        except Exception:
            pass
        # Nothing usable on the wrapped model: flat, uninformative importances.
        return np.ones(self._n_features)


# ============================================================
# SECTION 1 — FEATURE ENGINEERING
# ============================================================

def feature_engineering(df, close, open_, high, low):
    """
    Attach MACD (12, 26, 9) and RSI 14 columns to df.

    df is modified in place and also returned.  `open_`, `high`, `low`
    are unused but kept for a uniform signature.
    """
    # MACD: fast/slow EMA spread, its signal line, and the histogram.
    fast = close.ewm(span=12, adjust=False).mean()
    slow = close.ewm(span=26, adjust=False).mean()
    line = fast - slow
    sig_line = line.ewm(span=9, adjust=False).mean()
    hist = line - sig_line

    df['macd'] = line
    df['macd_signal'] = sig_line
    df['macd_hist'] = hist

    # RSI 14 via EMA smoothing; com=13 gives alpha = 1/14 (Wilder-style).
    change = close.diff()
    up = change.clip(lower=0)
    down = -change.clip(upper=0)
    up_avg = up.ewm(com=13, adjust=False).mean()
    down_avg = down.ewm(com=13, adjust=False).mean()
    # Zero average loss becomes NaN rather than dividing by zero.
    strength = up_avg / down_avg.replace(0, np.nan)
    df['rsi'] = 100 - (100 / (1 + strength))

    # Binary oversold/overbought flags from the classic 30/70 bands.
    df['rsi_oversold'] = (df['rsi'] < 30).astype(float)
    df['rsi_overbought'] = (df['rsi'] > 70).astype(float)

    # Sign of the histogram marks which side of the signal line we are on.
    df['macd_cross'] = np.sign(hist)

    # One-bar lags so the model can also see the previous bar's values.
    df['rsi_lag1'] = df['rsi'].shift(1)
    df['macd_lag1'] = df['macd'].shift(1)
    df['macd_hist_lag1'] = df['macd_hist'].shift(1)

    return df


# ============================================================
# SECTION 2 — SIGNAL / ENTRY LOGIC
# ============================================================

def generate_signals(model, X, thresh):
    """
    Map class probabilities to {-1, 0, +1} signals.

    +1 where P(+1) >= thresh, -1 where P(-1) >= thresh (short overrides
    long when both clear the bar), 0 elsewhere.  Returns the signal
    Series plus the raw +1 / -1 probability arrays.
    """
    proba = model.predict_proba(X)
    labels = model.classes_

    n = len(X)
    # Column index of each directional class; an absent class keeps an
    # all-zero probability array.
    where_pos = np.where(labels == 1)[0]
    where_neg = np.where(labels == -1)[0]
    p_pos = proba[:, where_pos[0]] if len(where_pos) > 0 else np.zeros(n)
    p_neg = proba[:, where_neg[0]] if len(where_neg) > 0 else np.zeros(n)

    raw = np.zeros(n)
    raw[p_pos >= thresh] = 1.0
    raw[p_neg >= thresh] = -1.0

    return pd.Series(raw, index=X.index, dtype=float), p_pos, p_neg


# ============================================================
# SECTION 3 — ML MODEL
# ============================================================

def build_model(X_train, y_train):
    """
    Train the Random Forest on encoded labels and wrap it so the rest
    of the pipeline sees the original class order [-1, 0, 1].
    """
    params = optimization_config()
    forest = RandomForestClassifier(
        n_estimators=params['n_estimators'],
        max_depth=params['max_depth'],
        min_samples_leaf=params['min_samples_leaf'],
        class_weight=params['class_weight'],
        random_state=42,
        n_jobs=-1,
    )
    forest.fit(X_train, y_train)
    return ModelWrapper(forest, original_classes=[-1, 0, 1],
                        n_features=X_train.shape[1])


# ============================================================
# SECTION 4 — OPTIMISATION TARGET
# ============================================================

def optimization_config():
    """
    Sharpe-oriented Random Forest settings: shallow trees, large leaves
    and balanced class weights to curb overfitting and drawdowns.
    """
    notes = ("Conservative depth and balanced class weights to prevent overfitting "
             "and reduce drawdowns, more trees for stable probability estimates.")
    return {
        "objective": "Maximize Sharpe ratio",
        "notes": notes,
        "n_estimators": 300,
        "max_depth": 6,
        "min_samples_leaf": 20,
        "class_weight": "balanced",
    }


# ============================================================
# SECTION 5 — RISK MANAGEMENT
# ============================================================

def apply_risk(signal, close, pos_size=1.0):
    """
    Risk layer: scale by position size and cap entries at 3 trades/day.

    An "entry" is a bar whose scaled signal is non-zero and differs from
    the previous bar's final (post-cap) signal.  From the 4th entry of a
    calendar day onward the signal is forced to 0.

    Parameters: signal (pd.Series on a DatetimeIndex), close (unused),
    pos_size (linear scaling factor).  Returns a new Series; the input
    Series is not modified.
    """
    # Max 3 trades per day
    # normalize() truncates each timestamp to midnight -> calendar-day key.
    dates = signal.index.normalize()
    result = signal.copy() * pos_size

    unique_days = dates.unique()
    for day in unique_days:
        day_mask = dates == day
        # Boolean indexing yields a snapshot, so `cur` below keeps seeing
        # the day's pre-cap values even after `result` entries are zeroed.
        day_signals = result[day_mask]

        # Count trade entries (signal changes from flat or changes direction)
        trade_count = 0
        prev = 0.0
        for idx in day_signals.index:
            cur = day_signals[idx]
            if cur != 0.0 and cur != prev:
                trade_count += 1
                if trade_count > 3:
                    result[idx] = 0.0
            elif cur == 0.0:
                pass
            # `prev` reads the possibly-zeroed value back from `result`, so
            # continuation bars of a blocked trade also register as fresh
            # entries and get zeroed — NOTE(review): confirm this is the
            # intended behaviour rather than an off-by-one in the cap.
            prev = result[idx]

    return result


# ============================================================
# SECTION 6 — BACKTEST ENGINE
# ============================================================

def train_and_backtest():
    """
    End-to-end backtest: tick load → 15-minute OHLC → MACD/RSI features →
    Random Forest on encoded 3-class labels → confidence-threshold signals
    with a 3-trades/day cap → per-bar equity, Sharpe, drawdown and chart
    payloads, returned as one dict.
    """
    # Load data
    df_raw = pd.read_csv(DATA_PATH, parse_dates=['Time'])
    df_raw = df_raw.sort_values('Time').set_index('Time')
    df_raw['mid'] = (df_raw['Bid'] + df_raw['Ask']) / 2

    # Resample mid price to 15-minute OHLC bars.
    ohlc = df_raw['mid'].resample('15min').ohlc()
    ohlc = ohlc.dropna()

    close = ohlc['close']
    open_ = ohlc['open']
    high = ohlc['high']
    low = ohlc['low']

    # Feature engineering
    df_feat = ohlc.copy()
    df_feat = feature_engineering(df_feat, close, open_, high, low)
    # NOTE(review): bfill pulls later values into the indicator warm-up
    # rows — a mild lookahead at the very start of the sample.
    df_feat = df_feat.bfill().ffill()

    # Target: direction 1 hour ahead (4 bars)
    target = np.sign(close.shift(-4) - close)

    # Drop last 4 rows (no valid target)
    df_feat = df_feat.iloc[:-4]
    target = target.iloc[:-4]
    close_aligned = close.iloc[:-4]
    open_aligned = open_.iloc[:-4]
    high_aligned = high.iloc[:-4]
    low_aligned = low.iloc[:-4]

    feature_cols = ['macd', 'macd_signal', 'macd_hist', 'rsi',
                    'rsi_oversold', 'rsi_overbought', 'macd_cross',
                    'rsi_lag1', 'macd_lag1', 'macd_hist_lag1']

    X = df_feat[feature_cols].copy()
    X = X.bfill().ffill().fillna(0)

    # Train/test split 70/30 (chronological, no shuffle)
    split_idx = int(len(X) * 0.70)
    X_train = X.iloc[:split_idx]
    X_test = X.iloc[split_idx:]
    y_train = target.iloc[:split_idx]
    y_test = target.iloc[split_idx:]
    close_test = close_aligned.iloc[split_idx:]
    close_train = close_aligned.iloc[:split_idx]

    # Label encoding: {-1, 0, 1} -> {0, 1, 2}
    enc = LabelEncoder()
    y_train_enc = enc.fit_transform(y_train)
    y_test_enc = enc.transform(y_test)

    # Build model
    model = build_model(X_train, y_train_enc)

    # Generate signals
    thresh = 0.45
    signal_test, p_pos, p_neg = generate_signals(model, X_test, thresh)

    # Apply risk (3 trades/day cap)
    signal_test = apply_risk(signal_test, close_test)

    # Backtest returns
    # NOTE(review): the label is the 4-bar-ahead direction, but P&L uses
    # only the NEXT bar's return (shift(-1)) — horizon mismatch to confirm.
    price_ret = close_test.pct_change().shift(-1).fillna(0)
    strat_ret = signal_test * price_ret

    # Equity curve (compounded from per-bar returns)
    capital = 10000.0
    equity_strategy = (1 + strat_ret).cumprod() * capital
    equity_bh = (1 + price_ret).cumprod() * capital

    total_ret = float((equity_strategy.iloc[-1] / capital) - 1.0)
    bh_ret = float((equity_bh.iloc[-1] / capital) - 1.0)

    # Sharpe ratio (annualised; 252 * 26 * 4 = 26,208 15-min bars/year)
    # NOTE(review): FX trades ~24 h/day (96 bars/day, 252 * 96 = 24,192);
    # the 26-hour day implied here looks like a slip — confirm the constant.
    bars_per_year = 252 * 26 * 4  # approx
    # Strategy Sharpe is computed over in-position bars only.
    active_mask = signal_test != 0
    ret_series = strat_ret[active_mask]

    if len(ret_series) > 1 and ret_series.std() > 0:
        sharpe_strat = float((ret_series.mean() / ret_series.std()) * np.sqrt(bars_per_year))
    else:
        sharpe_strat = 0.0

    bh_std = price_ret.std()
    if bh_std > 0:
        sharpe_bh = float((price_ret.mean() / bh_std) * np.sqrt(bars_per_year))
    else:
        sharpe_bh = 0.0

    # Max drawdown from the running equity peak
    roll_max = equity_strategy.cummax()
    drawdown_series = (equity_strategy - roll_max) / roll_max
    mdd = float(drawdown_series.min())

    # Per-bar returns of in-position bars (used as the trade-return proxy)
    all_trade_rets = strat_ret[active_mask].tolist()
    long_mask = (signal_test == 1.0) & active_mask
    short_mask = (signal_test == -1.0) & active_mask
    long_rets = strat_ret[long_mask].tolist()
    short_rets = strat_ret[short_mask].tolist()

    # NOTE: counts bars holding a position, not round-trip trades.
    n_trades = int(active_mask.sum())

    # Confusion matrix
    y_pred_raw = model.predict(X_test)
    # Map encoded predictions back — model.classes_ is [-1,0,1]
    # y_test_enc and y_pred_raw are in encoded space [0,1,2]
    cm = confusion_matrix(y_test_enc, y_pred_raw, labels=[0, 1, 2])
    conf_matrix = cm.tolist()

    # Rolling accuracy (30-bar window, non-flat signals)
    correct = (y_pred_raw == y_test_enc).astype(float)
    correct_series = pd.Series(correct, index=X_test.index)
    active_enc = pd.Series((signal_test != 0).values, index=X_test.index)
    # Mask out flat bars, then average hit-rate over a trailing window.
    correct_active = correct_series.where(active_enc)
    rolling_acc_raw = correct_active.rolling(30, min_periods=1).mean()
    rolling_acc_vals = [None if np.isnan(v) else float(v) for v in rolling_acc_raw]

    # Bollinger Bands (20, 2)
    bb_close = close_aligned.iloc[split_idx:]
    bb_mid = bb_close.rolling(20).mean()
    bb_std = bb_close.rolling(20).std()
    bb_upper = bb_mid + 2 * bb_std
    bb_lower = bb_mid - 2 * bb_std

    # Replace NaN/inf with None so the payload is JSON-serialisable.
    def _clean_list(s):
        return [None if (v is None or (isinstance(v, float) and (np.isnan(v) or np.isinf(v)))) else float(v)
                for v in s]

    # Moving averages (computed on the full series, then sliced to test)
    ma50 = close_aligned.rolling(50).mean().iloc[split_idx:]
    ma100 = close_aligned.rolling(100).mean().iloc[split_idx:]
    ma200 = close_aligned.rolling(200).mean().iloc[split_idx:]

    # Feature importance (top 15 ascending)
    importances = model.feature_importances_
    fi_pairs = sorted(zip(feature_cols, importances), key=lambda x: x[1])
    fi_pairs = fi_pairs[-15:] if len(fi_pairs) > 15 else fi_pairs
    fi_names = [p[0] for p in fi_pairs]
    fi_vals = [float(p[1]) for p in fi_pairs]

    # OHLC for test period
    test_dates = X_test.index
    ohlc_dates = [str(d) for d in test_dates]
    open_test = open_aligned.iloc[split_idx:]
    high_test = high_aligned.iloc[split_idx:]
    low_test = low_aligned.iloc[split_idx:]

    split_dt = str(X_test.index[0])

    # Custom figure: RSI oversold/overbought
    rsi_test = df_feat['rsi'].iloc[split_idx:]
    rsi_dates = [str(d) for d in rsi_test.index]

    fig_rsi = go.Figure()
    fig_rsi.add_trace(go.Scatter(
        x=rsi_dates,
        y=rsi_test.tolist(),
        mode='lines',
        name='RSI 14',
        line=dict(color='#2962ff', width=1.5)
    ))
    fig_rsi.add_hline(y=70, line_dash='dash', line_color='#ef5350',
                      annotation_text='Overbought (70)', annotation_position='top left')
    fig_rsi.add_hline(y=30, line_dash='dash', line_color='#26a69a',
                      annotation_text='Oversold (30)', annotation_position='bottom left')
    fig_rsi.add_hline(y=50, line_dash='dot', line_color='#888888', line_width=1)

    # Shade overbought/oversold regions
    fig_rsi.add_hrect(y0=70, y1=100, fillcolor='rgba(239,83,80,0.08)', line_width=0)
    fig_rsi.add_hrect(y0=0, y1=30, fillcolor='rgba(38,166,154,0.08)', line_width=0)

    fig_rsi.update_layout(
        title=dict(text='RSI 14 — Oversold / Overbought', font=dict(color='#d1d4dc')),
        paper_bgcolor='#131722',
        plot_bgcolor='#131722',
        font_color='#d1d4dc',
        margin=dict(l=40, r=20, t=30, b=30),
        legend=dict(bgcolor='rgba(0,0,0,0)'),
        xaxis=dict(gridcolor='#2a2e39', showgrid=True),
        yaxis=dict(gridcolor='#2a2e39', showgrid=True, range=[0, 100]),
    )
    custom_figs = [fig_rsi.to_dict()]

    # Chart/metric payload consumed by the dashboard.
    result = {
        "ohlc": {
            "dates": ohlc_dates,
            "open": _clean_list(open_test.tolist()),
            "high": _clean_list(high_test.tolist()),
            "low": _clean_list(low_test.tolist()),
            "close": _clean_list(close_test.tolist()),
        },
        "signals": {
            "dates": [str(d) for d in signal_test.index],
            "values": [float(v) for v in signal_test.tolist()],
        },
        "bb": {
            "upper": _clean_list(bb_upper.tolist()),
            "mid": _clean_list(bb_mid.tolist()),
            "lower": _clean_list(bb_lower.tolist()),
        },
        "ma": {
            "ma50": _clean_list(ma50.tolist()),
            "ma100": _clean_list(ma100.tolist()),
            "ma200": _clean_list(ma200.tolist()),
        },
        "equity": {
            "dates": [str(d) for d in equity_strategy.index],
            "strategy": _clean_list(equity_strategy.tolist()),
            "bh": _clean_list(equity_bh.tolist()),
        },
        "feature_importance": {
            "names": fi_names,
            "values": fi_vals,
        },
        "conf_matrix": conf_matrix,
        "conf_hist": {
            "p_pos": [float(v) for v in p_pos.tolist()],
            "p_neg": [float(v) for v in p_neg.tolist()],
        },
        "rolling_acc": {
            "dates": [str(d) for d in X_test.index],
            "values": rolling_acc_vals,
        },
        "drawdown": {
            "dates": [str(d) for d in drawdown_series.index],
            "values": _clean_list(drawdown_series.tolist()),
        },
        "ret_dist": [float(v) for v in all_trade_rets],
        "ret_dist_long": [float(v) for v in long_rets],
        "ret_dist_short": [float(v) for v in short_rets],
        "metrics": {
            "total_ret": total_ret,
            "bh_ret": bh_ret,
            "sharpe_strat": sharpe_strat,
            "sharpe_bh": sharpe_bh,
            "mdd": mdd,
            "n_trades": n_trades,
        },
        "split_dt": split_dt,
        "split_idx": split_idx,
        "n_train": len(X_train),
        "n_test": len(X_test),
        "custom_figs": custom_figs,
    }

    return result
M
good!
@malco · 2026-03-22
+6.86%
Return
5.84
Sharpe
1.6%
Max DD
50
Trades
</> View Code
# ╔══════════════════════════════════════════════════════════════╗
# ║                  STRATEGY REQUEST LOG                       ║
# ╚══════════════════════════════════════════════════════════════╝
# Generated        : 2026-03-23 05:46:30
# Model            : Gradient Boosting
# Feature Eng.     : MACD (12,26,9)
# Signal / Entry   : MA crossover
# Optimization     : Minimize max drawdown
# Risk Mgmt        : —
# Risk Filter      : —
# ══════════════════════════════════════════════════════════════

import numpy as np
import pandas as pd
from sklearn.preprocessing import LabelEncoder
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.metrics import confusion_matrix
import plotly.graph_objects as go

DATA_PATH = "/Users/malco/Desktop/QuantifyMe/data/EURUSD_ticks.csv"

# ============================================================
# SECTION 0 — MODEL WRAPPER
# ============================================================

class ModelWrapper:
    """Thin adapter around a fitted sklearn-style estimator.

    Stores the ORIGINAL (pre-encoding) class labels in ``classes_`` so that
    downstream signal logic can map predicted class indices back to trading
    directions, and exposes a best-effort ``feature_importances_`` property
    that works across tree ensembles, linear models, and meta-estimators.
    """

    def __init__(self, model, original_classes, n_features=1):
        self._m = model
        # Original label order (e.g. [-1, 0, 1]), not the encoded order.
        self.classes_ = np.array(original_classes)
        self._n_features = n_features

    def predict_proba(self, X):
        """Delegate probability prediction to the wrapped estimator."""
        return self._m.predict_proba(X)

    def predict(self, X):
        """Delegate class prediction to the wrapped estimator."""
        return self._m.predict(X)

    @property
    def feature_importances_(self):
        """Best-effort per-feature importances.

        Resolution order: native ``feature_importances_`` → mean absolute
        ``coef_`` (linear models) → averaged importances of sub-estimators
        → uniform ones as a last-resort placeholder.
        """
        if hasattr(self._m, 'feature_importances_'):
            return self._m.feature_importances_
        if hasattr(self._m, 'coef_'):
            return np.abs(self._m.coef_).mean(axis=0)
        try:
            per_estimator = [est.feature_importances_
                             for _, est in self._m.estimators_
                             if hasattr(est, 'feature_importances_')]
        except Exception:
            # estimators_ missing or not iterable as (name, estimator) pairs.
            per_estimator = []
        if per_estimator:
            return np.mean(per_estimator, axis=0)
        return np.ones(self._n_features)

# ============================================================
# SECTION 1 — FEATURE ENGINEERING
# ============================================================

def feature_engineering(df, close, open_, high, low):
    """Return a copy of *df* with MACD(12,26,9) and technical indicators added.

    FIX: work on a copy instead of mutating the caller's DataFrame in place,
    matching the convention of the other `feature_engineering` in this file.
    Callers that rebind the return value (as `train_and_backtest` does) are
    unaffected.

    Args:
        df:    DataFrame (indexed like the price series) to receive features.
        close, open_, high, low: aligned OHLC price Series (open_ is unused
            here but kept for interface symmetry).

    Returns:
        New DataFrame with indicator columns; leading rolling-window rows
        contain NaN and are expected to be filled by the caller.
    """
    df = df.copy()

    # MACD (12, 26, 9): fast/slow EMA spread plus its signal line.
    ema12 = close.ewm(span=12, adjust=False).mean()
    ema26 = close.ewm(span=26, adjust=False).mean()
    macd = ema12 - ema26
    signal_line = macd.ewm(span=9, adjust=False).mean()
    macd_hist = macd - signal_line

    df['macd'] = macd
    df['macd_signal'] = signal_line
    df['macd_hist'] = macd_hist

    # RSI (14) — EMA-smoothed gains/losses; tiny epsilon avoids div-by-zero
    # when there are no losses in the window.
    delta = close.diff()
    gain = (delta.where(delta > 0, 0)).ewm(span=14, adjust=False).mean()
    loss = (-delta.where(delta < 0, 0)).ewm(span=14, adjust=False).mean()
    rs = gain / loss.replace(0, 1e-10)
    df['rsi'] = 100 - (100 / (1 + rs))

    # Bollinger Bands (20, 2)
    ma20 = close.rolling(20).mean()
    std20 = close.rolling(20).std()
    df['bb_upper'] = ma20 + 2 * std20
    df['bb_mid'] = ma20
    df['bb_lower'] = ma20 - 2 * std20

    # Moving averages
    df['ma50'] = close.rolling(50).mean()
    df['ma100'] = close.rolling(100).mean()
    df['ma200'] = close.rolling(200).mean()

    # Price momentum
    df['returns'] = close.pct_change()
    df['roc10'] = (close - close.shift(10)) / close.shift(10)

    # Volatility: 20-bar std of simple returns.
    df['volatility'] = close.pct_change().rolling(20).std()

    # High-Low range relative to close.
    df['hl_pct'] = (high - low) / close

    return df

# ============================================================
# SECTION 2 — SIGNAL / ENTRY LOGIC
# ============================================================

def generate_signals(model, X, thresh):
    """Generate trading signals (-1 short, 0 flat, +1 long) from the model.

    BUG FIX: the underlying estimator is trained on LabelEncoder-encoded
    labels {0, 1, 2}, so ``model.predict`` returns encoded indices — NOT the
    original {-1, 0, 1} directions. The previous version used those raw
    indices as positions, producing 0/1/2 signals (never short, occasionally
    double-long). We now map predicted indices back through
    ``model.classes_``, which the ModelWrapper stores in original label
    order (sorted, matching LabelEncoder's encoding order).

    Args:
        model: ModelWrapper with ``classes_``, ``predict``, ``predict_proba``.
        X:     feature DataFrame (index becomes the signal index).
        thresh: unused; kept for interface compatibility with callers.

    Returns:
        (signal Series of -1.0/0.0/1.0, p_pos array, p_neg array)
    """
    proba = model.predict_proba(X)

    # Column lookup by ORIGINAL label; because LabelEncoder encodes sorted
    # labels to 0..n-1, position i in classes_ is also probability column i.
    class_to_idx = {c: i for i, c in enumerate(model.classes_)}

    p_pos = np.zeros(len(X))
    p_neg = np.zeros(len(X))

    if 1 in class_to_idx:
        p_pos = proba[:, class_to_idx[1]]
    if -1 in class_to_idx:
        p_neg = proba[:, class_to_idx[-1]]

    # Decode encoded predictions (0/1/2) back to directions (-1/0/+1).
    pred = np.asarray(model.predict(X)).astype(int)
    decoded = np.asarray(model.classes_)[pred]
    signal = pd.Series(decoded.astype(float), index=X.index)

    return signal, p_pos, p_neg

# ============================================================
# SECTION 3 — ML MODEL
# ============================================================

def build_model(X_train, y_train):
    """Fit a Gradient Boosting classifier configured by optimization_config().

    Args:
        X_train: training feature DataFrame.
        y_train: LabelEncoder-encoded targets (0/1/2 for -1/0/+1).

    Returns:
        ModelWrapper around the fitted classifier; ``classes_`` carries the
        ORIGINAL label order [-1, 0, 1] for downstream decoding.
    """
    cfg = optimization_config()

    # Fall back to sensible defaults for any key the config omits.
    defaults = {
        'n_estimators': 100,
        'learning_rate': 0.05,
        'max_depth': 5,
        'min_samples_leaf': 20,
        'subsample': 0.8,
    }
    params = {key: cfg.get(key, default) for key, default in defaults.items()}

    clf = GradientBoostingClassifier(random_state=42, **params)
    clf.fit(X_train, y_train)

    return ModelWrapper(clf, original_classes=[-1, 0, 1],
                        n_features=X_train.shape[1])

# ============================================================
# SECTION 4 — OPTIMISATION TARGET
# ============================================================

def optimization_config():
    """Hyper-parameter profile aimed at minimizing max drawdown.

    Conservative settings: many shallow trees, a slow learning rate, large
    leaves, and row subsampling to damp variance.
    """
    return dict(
        objective='minimize_max_drawdown',
        notes='Conservative: high min_samples_leaf, low learning_rate, shallow depth',
        n_estimators=200,
        learning_rate=0.02,
        max_depth=4,
        min_samples_leaf=30,
        subsample=0.75,
    )

# ============================================================
# SECTION 5 — RISK MANAGEMENT
# ============================================================

def apply_risk(signal, close, pos_size=1.0):
    """Scale the raw signal by a fixed position size.

    ``close`` is accepted for interface symmetry with richer risk models but
    is not used by this simple scaler.
    """
    scaled = pos_size * signal
    return scaled

# ============================================================
# SECTION 6 — BACKTEST ENGINE
# ============================================================

def train_and_backtest():
    """Run the full pipeline: load ticks, train the model, backtest, report.

    Steps: resample ticks to 15-minute mid-price OHLC → engineer features →
    label 4-bars-ahead direction → 70/30 chronological split → train on the
    first segment → trade the second → compute equity/metrics/figures.

    Returns:
        JSON-serializable dict of series, metrics, and Plotly figure dicts.

    FIX: the confusion-matrix section previously re-mapped `y_test_enc` and
    `y_pred` through `{-1: 0, 0: 1, 1: 2}`, but both arrays are ALREADY
    LabelEncoder-encoded to {0, 1, 2} — looking up key 2 raised KeyError and
    keys 0/1 were mis-mapped. The encoded arrays are now used directly.
    """

    # ---- Load ticks and resample the bid/ask mid to 15-minute OHLC ----
    df_ticks = pd.read_csv(DATA_PATH, parse_dates=['Time'])
    df_ticks.set_index('Time', inplace=True)
    mid = (df_ticks['Bid'] + df_ticks['Ask']) / 2

    ohlc = mid.resample('15min').ohlc()
    close = ohlc['close']
    open_ = ohlc['open']
    high = ohlc['high']
    low = ohlc['low']

    # Drop bars with no ticks (weekend gaps etc.).
    valid = close.notna()
    close = close[valid]
    open_ = open_[valid]
    high = high[valid]
    low = low[valid]

    # ---- Feature engineering ----
    df_feat = pd.DataFrame(index=close.index)
    df_feat = feature_engineering(df_feat, close, open_, high, low)
    df_feat = df_feat.bfill().ffill()

    # Target: sign of the move 1 hour ahead (4 bars of 15 min).
    target = np.sign(close.shift(-4) - close)
    target = pd.Series(target, index=close.index)

    # Drop the trailing rows whose 4-bar-ahead label is unknown.
    valid_target = target.notna()
    df_feat = df_feat[valid_target]
    target = target[valid_target]
    close = close[valid_target]
    open_ = open_[valid_target]
    high = high[valid_target]
    low = low[valid_target]

    # ---- Chronological 70/30 train/test split (no shuffling) ----
    split_idx = int(len(df_feat) * 0.7)
    split_dt = str(df_feat.index[split_idx])

    X_train = df_feat.iloc[:split_idx]
    y_train = target.iloc[:split_idx]
    X_test = df_feat.iloc[split_idx:]
    y_test = target.iloc[split_idx:]

    close_train = close.iloc[:split_idx]
    close_test = close.iloc[split_idx:]

    # Encode labels: [-1, 0, 1] → [0, 1, 2] (LabelEncoder sorts labels).
    enc = LabelEncoder()
    y_train_enc = enc.fit_transform(y_train)
    y_test_enc = enc.transform(y_test)

    # ---- Train model ----
    model = build_model(X_train, y_train_enc)

    # ---- Signals on the test set ----
    signal_test, p_pos, p_neg = generate_signals(model, X_test, thresh=0.5)
    signal_test = apply_risk(signal_test, close_test, pos_size=1.0)

    # P&L: act on the previous bar's signal; charge 2e-5 per unit of
    # position change (cost booked at the change bar).
    ret_test = close_test.pct_change()
    signal_lag = signal_test.shift(1).fillna(0)
    trade_cost = 2e-5 * np.abs(signal_test.diff().fillna(0))
    pnl = signal_lag * ret_test - trade_cost
    pnl = pnl.fillna(0)

    # ---- Equity curves (compounded, 10k starting capital) ----
    cum_ret_strat = (1 + pnl).cumprod() - 1
    equity_strat = 10000 * (1 + cum_ret_strat)

    # Buy & hold benchmark.
    bh_rets = ret_test.fillna(0)
    cum_ret_bh = (1 + bh_rets).cumprod() - 1
    equity_bh = 10000 * (1 + cum_ret_bh)

    # ---- Metrics (decimal ratios, not percent) ----
    total_ret = float(cum_ret_strat.iloc[-1]) if len(cum_ret_strat) > 0 else 0.0
    bh_ret = float(cum_ret_bh.iloc[-1]) if len(cum_ret_bh) > 0 else 0.0

    # Annualized Sharpe; sqrt(252*24*4) assumes 96 fifteen-minute bars/day
    # over 252 trading days (FX trades ~24h). Guard against zero std.
    pnl_std = pnl.std()
    if pnl_std > 0:
        sharpe_strat = float((pnl.mean() / pnl_std) * np.sqrt(252 * 24 * 4))
    else:
        sharpe_strat = 0.0

    bh_std = bh_rets.std()
    if bh_std > 0:
        sharpe_bh = float((bh_rets.mean() / bh_std) * np.sqrt(252 * 24 * 4))
    else:
        sharpe_bh = 0.0

    # Max drawdown (negative decimal ratio vs running equity peak).
    running_max = equity_strat.expanding().max()
    drawdown_series = (equity_strat - running_max) / running_max
    mdd = float(drawdown_series.min()) if len(drawdown_series) > 0 else 0.0

    # ---- Per-trade returns: enter on a signal change, exit on the next ----
    trade_list = []
    trade_long = []
    trade_short = []

    entry_idx = None
    entry_sig = 0
    for i in range(1, len(signal_test)):
        if signal_test.iloc[i] != entry_sig:
            if entry_idx is not None:
                exit_ret = (close_test.iloc[i] - close_test.iloc[entry_idx]) / close_test.iloc[entry_idx]
                trade_ret = exit_ret * entry_sig - 2e-5
                trade_list.append(float(trade_ret))
                if entry_sig > 0:
                    trade_long.append(float(trade_ret))
                elif entry_sig < 0:
                    trade_short.append(float(trade_ret))
            entry_idx = i
            entry_sig = signal_test.iloc[i]

    # ---- Feature importance: keep the top 15 (ascending for plotting) ----
    imp = model.feature_importances_
    feat_names = list(X_train.columns)
    imp_tuples = [(feat_names[j], float(imp[j])) for j in range(len(imp))]
    imp_tuples.sort(key=lambda x: x[1])
    if len(imp_tuples) > 15:
        imp_tuples = imp_tuples[-15:]

    # ---- Confusion matrix (encoded label space: 0 = -1, 1 = 0, 2 = +1) ----
    # y_test_enc and y_pred are both already encoded to {0, 1, 2}; use them
    # directly (see FIX note in the docstring).
    y_pred = model.predict(X_test)
    y_test_mapped = np.asarray(y_test_enc, dtype=int)
    y_pred_mapped = np.asarray(y_pred, dtype=int)
    cm = confusion_matrix(y_test_mapped, y_pred_mapped, labels=[0, 1, 2])
    conf_mat = cm.tolist()

    # ---- Rolling accuracy (30-bar window, test period only) ----
    rolling_accuracy = []
    for i in range(30, len(y_pred)):
        win = (y_pred_mapped[i-30:i] == y_test_mapped[i-30:i]).sum()
        acc = float(win) / 30.0
        rolling_accuracy.append(acc)

    # ---- MA crossover custom figure (dark theme to match the UI) ----
    custom_figs = []
    ma_short = close_test.rolling(10).mean()
    ma_long = close_test.rolling(20).mean()

    fig = go.Figure()
    fig.add_trace(go.Scatter(
        x=close_test.index, y=close_test, mode='lines',
        name='Close', line=dict(color='#2962FF', width=2)
    ))
    fig.add_trace(go.Scatter(
        x=ma_short.index, y=ma_short, mode='lines',
        name='MA(10)', line=dict(color='#FF6D00', width=1.5)
    ))
    fig.add_trace(go.Scatter(
        x=ma_long.index, y=ma_long, mode='lines',
        name='MA(20)', line=dict(color='#AB47BC', width=1.5)
    ))

    fig.update_layout(
        title='MA Crossover (Test Period)',
        xaxis_title='Date', yaxis_title='Price (EURUSD)',
        paper_bgcolor='#131722', plot_bgcolor='#131722',
        font_color='#d1d4dc',
        margin=dict(l=40, r=20, t=30, b=30),
        legend=dict(bgcolor='rgba(0,0,0,0)', x=0.01, y=0.99)
    )
    custom_figs.append(fig.to_dict())

    # Helper: replace NaN/Inf with None so the result is JSON-safe.
    def safe_tolist(arr):
        lst = np.where(np.isnan(arr) | np.isinf(arr), None, arr).tolist()
        return [None if (isinstance(x, float) and (np.isnan(x) or np.isinf(x))) else x for x in lst]

    # ---- Assemble the JSON-serializable result payload ----
    result = {
        'ohlc': {
            'dates': [str(d.date()) + ' ' + str(d.time()) for d in close.index],
            'open': safe_tolist(open_.values),
            'high': safe_tolist(high.values),
            'low': safe_tolist(low.values),
            'close': safe_tolist(close.values)
        },
        'signals': {
            'dates': [str(d.date()) + ' ' + str(d.time()) for d in signal_test.index],
            'values': safe_tolist(signal_test.values)
        },
        'bb': {
            'upper': safe_tolist(df_feat['bb_upper'].values),
            'mid': safe_tolist(df_feat['bb_mid'].values),
            'lower': safe_tolist(df_feat['bb_lower'].values)
        },
        'ma': {
            'ma50': safe_tolist(df_feat['ma50'].values),
            'ma100': safe_tolist(df_feat['ma100'].values),
            'ma200': safe_tolist(df_feat['ma200'].values)
        },
        'equity': {
            'dates': [str(d.date()) + ' ' + str(d.time()) for d in equity_strat.index],
            'strategy': safe_tolist(equity_strat.values),
            'bh': safe_tolist(equity_bh.values)
        },
        'feature_importance': {
            'names': [name for name, _ in imp_tuples],
            'values': [val for _, val in imp_tuples]
        },
        'conf_matrix': conf_mat,
        'conf_hist': {
            'p_pos': safe_tolist(p_pos),
            'p_neg': safe_tolist(p_neg)
        },
        'rolling_acc': {
            'dates': [str(d.date()) + ' ' + str(d.time()) for d in close_test.index[30:]],
            'values': rolling_accuracy
        },
        'drawdown': {
            'dates': [str(d.date()) + ' ' + str(d.time()) for d in drawdown_series.index],
            'values': safe_tolist(drawdown_series.values)
        },
        'ret_dist': trade_list,
        'ret_dist_long': trade_long,
        'ret_dist_short': trade_short,
        'metrics': {
            'total_ret': total_ret,
            'bh_ret': bh_ret,
            'sharpe_strat': sharpe_strat,
            'sharpe_bh': sharpe_bh,
            'mdd': mdd,
            'n_trades': len(trade_list)
        },
        'split_dt': split_dt,
        'split_idx': int(split_idx),
        'n_train': int(split_idx),
        'n_test': int(len(X_test)),
        'custom_figs': custom_figs
    }

    return result


if __name__ == '__main__':
    import json

    # Run the full backtest and print the result payload as pretty JSON.
    payload = train_and_backtest()
    print(json.dumps(payload, default=str, indent=2))