# Browse AI-generated trading strategies shared by the community. Fork, learn, and build on each other's work.
# ╔══════════════════════════════════════════════════════════════╗
# ║ STRATEGY REQUEST LOG ║
# ╚══════════════════════════════════════════════════════════════╝
# Generated : 2026-04-03 12:31:35
# Model : Random Forest
# Feature Eng. : SMA (20, 50, 200), RSI 14
# Signal / Entry : —
# Optimization : —
# Risk Mgmt : —
# Risk Filter : —
# ══════════════════════════════════════════════════════════════
# ============================================================
# SECTION 0 — IMPORTS & CONSTANTS
# ============================================================
import numpy as np
import pandas as pd
from sklearn.ensemble import RandomForestClassifier
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import confusion_matrix
import plotly.graph_objects as go
from model_wrapper import ModelWrapper
DATA_PATH = "/Users/malco/Desktop/QuantifyMe/data/EURUSD_ticks.csv"
START_DATE = "2026-02-24 18:30:00"
END_DATE = "2026-03-26"
VALIDATION_DATE = "2026-03-23 22:15:00"
TRAIN_SPLIT = 0.9165
STARTING_CAPITAL = 10_000
TRADE_COST = 2e-5 # round-trip cost per trade
# ============================================================
# SECTION 2 — FEATURE ENGINEERING
# ============================================================
def feature_engineering(df, close, open_, high, low):
    """Attach trend, momentum, and range features to a copy of `df`.

    Adds SMAs (20/50/200), price-minus-SMA distances, SMA spreads, an
    EWM-based RSI(14), 1/4/8-bar returns, 20-bar return volatility, and
    the high-low bar range (absolute and relative to close).
    `open_` is accepted for interface symmetry but not used here.
    """
    df = df.copy()
    # Simple moving averages for the three trend horizons, then the
    # distance of price from each one (same column order as before).
    smas = {win: close.rolling(win).mean() for win in (20, 50, 200)}
    for win, sma in smas.items():
        df[f"sma_{win}"] = sma
    for win, sma in smas.items():
        df[f"close_minus_sma{win}"] = close - sma
    # SMA crossover spreads.
    df['sma20_minus_sma50'] = df['sma_20'] - df['sma_50']
    df['sma50_minus_sma200'] = df['sma_50'] - df['sma_200']
    # RSI 14 — EWM flavour (com=13 == alpha 1/14); zero average loss is
    # mapped to NaN so the ratio does not divide by zero.
    move = close.diff()
    up_move = move.clip(lower=0)
    down_move = -move.clip(upper=0)
    mean_up = up_move.ewm(com=13, min_periods=14).mean()
    mean_down = down_move.ewm(com=13, min_periods=14).mean()
    strength = mean_up / mean_down.replace(0, np.nan)
    df['rsi_14'] = 100 - (100 / (1 + strength))
    # Short-horizon momentum.
    for lag in (1, 4, 8):
        df[f"ret_{lag}"] = close.pct_change(lag)
    # Realised volatility of 1-bar returns over 20 bars.
    df['vol_20'] = df['ret_1'].rolling(20).std()
    # Bar range, absolute and as a fraction of the close.
    df['hl_range'] = high - low
    df['hl_range_pct'] = df['hl_range'] / close
    return df
# ============================================================
# SECTION 3 — ML MODEL
# ============================================================
def build_model(X_train, y_train):
    """Fit a RandomForest with the tuned hyper-parameters and return it
    wrapped in ModelWrapper (which carries the original class labels
    {-1, 0, 1} and the feature count).
    """
    params = optimization_config()
    forest = RandomForestClassifier(
        n_estimators=params['n_estimators'],
        max_depth=params['max_depth'],
        min_samples_leaf=params['min_samples_leaf'],
        class_weight=params['class_weight'],
        random_state=42,
        n_jobs=-1,
    )
    forest.fit(X_train, y_train)
    # Encoder only records the canonical label ordering for the wrapper.
    encoder = LabelEncoder().fit([-1, 0, 1])
    return ModelWrapper(
        forest,
        original_classes=encoder.classes_,
        n_features=X_train.shape[1],
    )
# ============================================================
# SECTION 4 — SIGNAL / ENTRY LOGIC
# ============================================================
def generate_signals(model, X, thresh):
    """Convert class probabilities into directional signals.

    Returns (signal, p_pos, p_neg): `signal` is a Series in
    {-1.0, 0.0, 1.0} indexed like X; p_pos / p_neg are the probability
    arrays for the +1 and -1 classes (zeros if a class is absent).
    When both sides clear `thresh`, the higher probability wins
    (ties go long).
    """
    proba = model.predict_proba(X)
    class_list = list(model.classes_)

    def _prob_for(label):
        # Probability column for `label`, or zeros if the model never
        # saw that class.
        if label in class_list:
            return proba[:, class_list.index(label)]
        return np.zeros(len(X))

    p_pos = _prob_for(1)
    p_neg = _prob_for(-1)
    values = np.zeros(len(X))
    values[p_pos >= thresh] = 1.0
    values[p_neg >= thresh] = -1.0
    # Conflicting entries: side with the larger probability wins.
    tie = (p_pos >= thresh) & (p_neg >= thresh)
    values[tie] = np.where(p_pos[tie] >= p_neg[tie], 1.0, -1.0)
    return pd.Series(values, index=X.index), p_pos, p_neg
# ============================================================
# SECTION 5 — OPTIMISATION TARGET
# ============================================================
def optimization_config():
    """Hyper-parameter set targeting a high Sharpe ratio.

    Consumed by build_model; only the four RandomForest keys are used
    as estimator arguments.
    """
    notes = (
        "Random Forest tuned for Sharpe: balanced class weights to avoid "
        "bias, moderate depth to prevent overfitting, high n_estimators for "
        "stable probability estimates."
    )
    return dict(
        objective="Maximize Sharpe ratio",
        notes=notes,
        n_estimators=300,
        max_depth=8,
        min_samples_leaf=20,
        class_weight="balanced",
    )
# ============================================================
# SECTION 6 — RISK MANAGEMENT
# ============================================================
def apply_risk(signal, close, pos_size=1.0):
    """Scale the raw directional signal by a fixed position size.

    `close` is unused today; it stays in the signature so richer risk
    rules (stops, volatility targeting) can be added without touching
    callers.
    """
    sized = signal * pos_size
    return sized
# ============================================================
# SECTION 7 — BACKTEST ENGINE
# ============================================================
def train_and_backtest():
    """End-to-end pipeline: load EURUSD ticks, resample to 15-min bars,
    engineer features, train the classifier, derive signals, backtest
    with per-trade costs, and assemble a JSON-serializable payload of
    charts and metrics for the dashboard.
    """
    # ------------------------------------------------------------------
    # Load data
    # ------------------------------------------------------------------
    # Tick CSV is expected to contain 'Time', 'Bid', 'Ask' columns.
    df_raw = pd.read_csv(DATA_PATH, parse_dates=['Time'])
    df_raw.set_index('Time', inplace=True)
    df_raw.sort_index(inplace=True)
    # Mid-price -> 15-minute OHLC bars; empty buckets are dropped.
    mid = (df_raw['Bid'] + df_raw['Ask']) / 2.0
    mid = mid.resample('15min').ohlc()
    mid.columns = ['open', 'high', 'low', 'close']
    mid.dropna(inplace=True)
    if START_DATE:
        mid = mid[mid.index >= START_DATE]
    if END_DATE:
        mid = mid[mid.index <= END_DATE]
    close = mid['close']
    open_ = mid['open']
    high = mid['high']
    low = mid['low']
    # ------------------------------------------------------------------
    # Feature engineering on full dataset
    # ------------------------------------------------------------------
    df_feat = mid.copy()
    df_feat = feature_engineering(df_feat, close, open_, high, low)
    # ------------------------------------------------------------------
    # Target
    # ------------------------------------------------------------------
    # Label = sign of the price move 4 bars ahead; the last 4 rows are
    # NaN and removed by the mask below.
    target = np.sign(close.shift(-4) - close)
    mask = target.notna()
    df_feat = df_feat[mask]
    target = target[mask]
    close_full = close[mask]
    open_full = open_[mask]
    high_full = high[mask]
    low_full = low[mask]
    # ------------------------------------------------------------------
    # Feature columns
    # ------------------------------------------------------------------
    feature_cols = [
        'sma_20', 'sma_50', 'sma_200',
        'close_minus_sma20', 'close_minus_sma50', 'close_minus_sma200',
        'sma20_minus_sma50', 'sma50_minus_sma200',
        'rsi_14',
        'ret_1', 'ret_4', 'ret_8',
        'vol_20',
        'hl_range', 'hl_range_pct'
    ]
    # NOTE(review): bfill() pulls *future* values into the rolling
    # warm-up rows — a mild look-ahead bias; confirm this is intended.
    df_feat = df_feat.bfill().ffill()
    df_feat.dropna(subset=feature_cols, inplace=True)
    valid_idx = df_feat.index
    target = target.loc[valid_idx]
    close_full = close_full.loc[valid_idx]
    open_full = open_full.loc[valid_idx]
    high_full = high_full.loc[valid_idx]
    low_full = low_full.loc[valid_idx]
    X = df_feat[feature_cols]
    # ------------------------------------------------------------------
    # Train/test split  (chronological; no shuffling)
    # ------------------------------------------------------------------
    n = len(df_feat)
    if VALIDATION_DATE:
        split_idx = len(df_feat[df_feat.index <= VALIDATION_DATE])
    else:
        split_idx = int(n * TRAIN_SPLIT)
    # Clamp so both halves are non-empty.
    split_idx = max(1, min(split_idx, n - 1))
    X_train = X.iloc[:split_idx]
    X_test = X.iloc[split_idx:]
    y_train = target.iloc[:split_idx]
    y_test = target.iloc[split_idx:]
    close_train = close_full.iloc[:split_idx]
    close_test = close_full.iloc[split_idx:]
    split_dt = str(df_feat.index[split_idx])
    # ------------------------------------------------------------------
    # Label encoding  ({-1, 0, 1} -> {0, 1, 2})
    # ------------------------------------------------------------------
    enc = LabelEncoder()
    enc.fit([-1, 0, 1])
    y_train_enc = enc.transform(y_train)
    y_test_enc = enc.transform(y_test)
    # ------------------------------------------------------------------
    # Build model
    # ------------------------------------------------------------------
    model = build_model(X_train, y_train_enc)
    # ------------------------------------------------------------------
    # Generate signals on full dataset (train + test)
    # ------------------------------------------------------------------
    # NOTE(review): the equity curve below therefore mixes in-sample
    # (train) and out-of-sample (test) signals.
    signal_train, p_pos_train, p_neg_train = generate_signals(model, X_train, thresh=0.55)
    signal_test, p_pos_test, p_neg_test = generate_signals(model, X_test, thresh=0.55)
    # NOTE(review): risk sizing is applied to the test leg only; the
    # train leg enters signal_full unsized — confirm the asymmetry.
    signal_test = apply_risk(signal_test, close_test)
    signal_full = pd.concat([signal_train, signal_test])
    # ------------------------------------------------------------------
    # Overlays on full dataset
    # ------------------------------------------------------------------
    # Bollinger bands (20-bar, 2 sigma) and three moving averages for
    # the price chart.
    bb_mid = close_full.rolling(20).mean()
    bb_std = close_full.rolling(20).std()
    bb_upper = bb_mid + 2 * bb_std
    bb_lower = bb_mid - 2 * bb_std
    ma50 = close_full.rolling(50).mean()
    ma100 = close_full.rolling(100).mean()
    ma200 = close_full.rolling(200).mean()

    def _to_list_or_none(series):
        # JSON-safe conversion: NaN/inf -> None, everything else -> float.
        out = []
        for v in series:
            if pd.isna(v) or np.isinf(v):
                out.append(None)
            else:
                out.append(float(v))
        return out
    # ------------------------------------------------------------------
    # Equity curve — trade-level P&L with costs
    # ------------------------------------------------------------------
    # A "trade" opens when the signal direction changes; P&L is realised
    # at the close of the bar where the next direction change occurs.
    signals_arr = signal_full.values
    close_arr = close_full.values
    dates_full = [str(d) for d in close_full.index]
    capital = float(STARTING_CAPITAL)
    equity_strategy = [capital]
    equity_bh_start = close_arr[0]
    equity_bh = [capital]
    last_dir = None
    entry_price = None
    ret_dist = []  # per-trade net returns (all trades)
    ret_dist_long = []  # long trades only
    ret_dist_short = []  # short trades only
    n_trades = 0
    position_returns = []  # NOTE(review): never appended to — dead accumulator
    for i in range(len(signals_arr)):
        sig = signals_arr[i]
        price = close_arr[i]
        if sig != 0 and sig != last_dir:
            # Close previous trade
            if last_dir is not None and entry_price is not None:
                raw_ret = last_dir * (price - entry_price) / entry_price
                net_ret = raw_ret - TRADE_COST
                ret_dist.append(float(net_ret))
                if last_dir == 1:
                    ret_dist_long.append(float(net_ret))
                else:
                    ret_dist_short.append(float(net_ret))
                capital *= (1 + net_ret)
            # Open new trade
            last_dir = sig
            entry_price = price
            n_trades += 1
        equity_strategy.append(float(capital))
        bh_ret_val = (price - equity_bh_start) / equity_bh_start
        equity_bh.append(float(capital * (1 + bh_ret_val) / 1.0))
    # Align equity length with dates
    # NOTE(review): equity_strategy/equity_bh from the loop above are
    # discarded in favour of the recomputed arrays below — redundant
    # work kept as-is.
    equity_strategy = equity_strategy[1:]
    equity_bh_arr = []
    for i in range(len(close_arr)):
        bh_val = STARTING_CAPITAL * (close_arr[i] / close_arr[0])
        equity_bh_arr.append(float(bh_val))
    equity_strategy_arr = []
    running_cap = float(STARTING_CAPITAL)
    last_dir2 = None
    entry_price2 = None
    # Second pass: identical trade logic, but records equity on every
    # bar so the curve aligns 1:1 with dates_full.
    for i in range(len(signals_arr)):
        sig = signals_arr[i]
        price = close_arr[i]
        if sig != 0 and sig != last_dir2:
            if last_dir2 is not None and entry_price2 is not None:
                raw_ret = last_dir2 * (price - entry_price2) / entry_price2
                net_ret = raw_ret - TRADE_COST
                running_cap *= (1 + net_ret)
            last_dir2 = sig
            entry_price2 = price
        equity_strategy_arr.append(float(running_cap))
    # ------------------------------------------------------------------
    # Metrics
    # ------------------------------------------------------------------
    total_ret = (equity_strategy_arr[-1] - STARTING_CAPITAL) / STARTING_CAPITAL if equity_strategy_arr else 0.0
    bh_ret = (close_arr[-1] - close_arr[0]) / close_arr[0] if len(close_arr) > 0 else 0.0
    # Sharpe on test period
    test_signals_arr = signal_test.values
    test_close_arr = close_test.values
    bar_rets = []
    ld = None
    ep = None
    # Segment returns between signal changes; flat bars while a position
    # is open contribute 0.0.
    for i in range(len(test_signals_arr)):
        sig = test_signals_arr[i]
        price = test_close_arr[i]
        if sig != 0:
            if ld is not None and ep is not None:
                bar_ret = ld * (price - ep) / ep
                bar_rets.append(bar_ret)
            ld = sig
            ep = price
        elif ld is not None and ep is not None:
            bar_rets.append(0.0)
    if len(bar_rets) > 1:
        ret_series = pd.Series(bar_rets)
        std_val = ret_series.std()
        if std_val == 0 or np.isnan(std_val):
            sharpe_strat = 0.0
        else:
            # NOTE(review): 252*26 implies 26 bars/day; 15-min bars on a
            # 24h FX feed are ~96/day — verify the annualisation factor.
            sharpe_strat = float((ret_series.mean() / std_val) * np.sqrt(252 * 26))
    else:
        sharpe_strat = 0.0
    # BH Sharpe
    bh_bar_rets = pd.Series(test_close_arr).pct_change().dropna()
    if len(bh_bar_rets) > 1 and bh_bar_rets.std() != 0:
        sharpe_bh = float((bh_bar_rets.mean() / bh_bar_rets.std()) * np.sqrt(252 * 26))
    else:
        sharpe_bh = 0.0
    # Max drawdown
    eq_series = pd.Series(equity_strategy_arr)
    roll_max = eq_series.cummax()
    dd_series = (eq_series - roll_max) / roll_max
    mdd = float(dd_series.min()) if len(dd_series) > 0 else 0.0
    # ------------------------------------------------------------------
    # Confusion matrix (test set)
    # ------------------------------------------------------------------
    # Assumes model.predict returns decoded labels in {-1, 0, 1} via
    # ModelWrapper — TODO confirm against model_wrapper.
    pred_test = model.predict(X_test)
    y_test_arr = np.asarray(y_test)
    all_labels = [-1, 0, 1]
    try:
        cm = confusion_matrix(y_test_arr, pred_test, labels=all_labels).tolist()
    except Exception:
        # Fall back to an empty 3x3 matrix rather than failing the run.
        cm = [[0, 0, 0], [0, 0, 0], [0, 0, 0]]
    # ------------------------------------------------------------------
    # Rolling accuracy (test period, 30-bar window, active signals only)
    # ------------------------------------------------------------------
    active_mask = pred_test != 0
    correct = (pred_test == y_test_arr).astype(float)
    correct_series = pd.Series(correct, index=X_test.index)
    active_series = pd.Series(active_mask.astype(float), index=X_test.index)
    # Inactive bars become NaN so they are excluded from the window mean.
    roll_correct = correct_series.where(active_series.astype(bool)).rolling(30, min_periods=1).mean()
    rolling_acc_dates = [str(d) for d in X_test.index]
    rolling_acc_values = []
    for v in roll_correct:
        if pd.isna(v) or np.isinf(v):
            rolling_acc_values.append(None)
        else:
            rolling_acc_values.append(float(v))
    # ------------------------------------------------------------------
    # Feature importance (top 15)
    # ------------------------------------------------------------------
    fi = model.feature_importances_
    # Ascending sort, keep the 15 largest (bar chart renders bottom-up).
    fi_pairs = sorted(zip(feature_cols, fi), key=lambda x: x[1])[-15:]
    fi_names = [p[0] for p in fi_pairs]
    fi_values = [float(p[1]) for p in fi_pairs]
    # ------------------------------------------------------------------
    # Drawdown series
    # ------------------------------------------------------------------
    dd_values = []
    for v in dd_series:
        if pd.isna(v) or np.isinf(v):
            dd_values.append(None)
        else:
            dd_values.append(float(v))
    # ------------------------------------------------------------------
    # Custom figures
    # ------------------------------------------------------------------
    custom_figs = []
    # --- SMA chart ---
    fig_sma = go.Figure()
    fig_sma.add_trace(go.Scatter(
        x=dates_full, y=_to_list_or_none(close_full),
        name='Close', line=dict(color='#d1d4dc', width=1)
    ))
    fig_sma.add_trace(go.Scatter(
        x=dates_full, y=_to_list_or_none(close_full.rolling(20).mean()),
        name='SMA 20', line=dict(color='#f59e0b', width=1.2)
    ))
    fig_sma.add_trace(go.Scatter(
        x=dates_full, y=_to_list_or_none(close_full.rolling(50).mean()),
        name='SMA 50', line=dict(color='#3b82f6', width=1.2)
    ))
    fig_sma.add_trace(go.Scatter(
        x=dates_full, y=_to_list_or_none(close_full.rolling(200).mean()),
        name='SMA 200', line=dict(color='#ef4444', width=1.2)
    ))
    fig_sma.update_layout(
        title='SMA Overlay (20 / 50 / 200)',
        paper_bgcolor="#131722", plot_bgcolor="#131722",
        font_color="#d1d4dc",
        margin=dict(l=40, r=20, t=30, b=30),
        legend=dict(bgcolor="rgba(0,0,0,0)")
    )
    custom_figs.append(fig_sma.to_dict())
    # --- RSI chart ---
    rsi_full = df_feat['rsi_14']
    fig_rsi = go.Figure()
    fig_rsi.add_trace(go.Scatter(
        x=dates_full, y=_to_list_or_none(rsi_full),
        name='RSI 14', line=dict(color='#a78bfa', width=1.2)
    ))
    fig_rsi.add_hline(y=70, line_dash='dash', line_color='#ef4444', annotation_text='Overbought 70')
    fig_rsi.add_hline(y=30, line_dash='dash', line_color='#22c55e', annotation_text='Oversold 30')
    fig_rsi.add_hline(y=50, line_dash='dot', line_color='#6b7280')
    fig_rsi.update_layout(
        title='RSI 14',
        paper_bgcolor="#131722", plot_bgcolor="#131722",
        font_color="#d1d4dc",
        margin=dict(l=40, r=20, t=30, b=30),
        legend=dict(bgcolor="rgba(0,0,0,0)"),
        yaxis=dict(range=[0, 100])
    )
    custom_figs.append(fig_rsi.to_dict())
    # ------------------------------------------------------------------
    # Register model
    # ------------------------------------------------------------------
    # NOTE(review): `register_model` is neither defined nor imported in
    # this file — presumably injected by the host runtime; confirm,
    # otherwise this line raises NameError instead of skipping.
    if register_model is not None:
        register_model(model)
    # ------------------------------------------------------------------
    # Build return dict
    # ------------------------------------------------------------------
    def _clean(lst):
        # JSON-safe pass-through: NaN/inf -> None; None preserved;
        # non-float values passed unchanged.
        out = []
        for v in lst:
            if v is None:
                out.append(None)
            elif isinstance(v, float) and (np.isnan(v) or np.isinf(v)):
                out.append(None)
            else:
                out.append(v)
        return out
    return {
        "ohlc": {
            "dates": [str(d) for d in close_full.index],
            "open": _to_list_or_none(open_full),
            "high": _to_list_or_none(high_full),
            "low": _to_list_or_none(low_full),
            "close": _to_list_or_none(close_full),
        },
        "signals": {
            "dates": [str(d) for d in signal_full.index],
            "values": [float(v) for v in signal_full.values],
        },
        "bb": {
            "upper": _to_list_or_none(bb_upper),
            "mid": _to_list_or_none(bb_mid),
            "lower": _to_list_or_none(bb_lower),
        },
        "ma": {
            "ma50": _to_list_or_none(ma50),
            "ma100": _to_list_or_none(ma100),
            "ma200": _to_list_or_none(ma200),
        },
        "equity": {
            "dates": dates_full,
            "strategy": _clean(equity_strategy_arr),
            "bh": _clean(equity_bh_arr),
        },
        "feature_importance": {
            "names": fi_names,
            "values": fi_values,
        },
        "conf_matrix": cm,
        "conf_hist": {
            "p_pos": p_pos_test.tolist(),
            "p_neg": p_neg_test.tolist(),
        },
        "rolling_acc": {
            "dates": rolling_acc_dates,
            "values": rolling_acc_values,
        },
        "drawdown": {
            "dates": dates_full,
            "values": _clean(dd_values),
        },
        "ret_dist": ret_dist,
        "ret_dist_long": ret_dist_long,
        "ret_dist_short": ret_dist_short,
        "metrics": {
            "total_ret": float(total_ret),
            "bh_ret": float(bh_ret),
            "sharpe_strat": float(sharpe_strat),
            "sharpe_bh": float(sharpe_bh),
            "mdd": float(mdd),
            "n_trades": int(n_trades),
        },
        "split_dt": split_dt,
        "split_idx": int(split_idx),
        "n_train": int(split_idx),
        "n_test": int(n - split_idx),
        "feature_cols": feature_cols,
        "custom_figs": custom_figs,
    }
# ╔══════════════════════════════════════════════════════════════╗
# ║ STRATEGY REQUEST LOG ║
# ╚══════════════════════════════════════════════════════════════╝
# Generated : 2026-04-03 08:22:31
# Model : Random Forest
# Feature Eng. : SMA (20, 50, 200), RSI 14
# Signal / Entry : —
# Optimization : —
# Risk Mgmt : —
# Risk Filter : —
# ══════════════════════════════════════════════════════════════
# ============================================================
# SECTION 0 — IMPORTS & CONSTANTS
# ============================================================
import numpy as np
import pandas as pd
from sklearn.ensemble import RandomForestClassifier
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import confusion_matrix
import plotly.graph_objects as go
from model_wrapper import ModelWrapper
DATA_PATH = "/Users/malco/Desktop/QuantifyMe/data/EURUSD_ticks.csv"
START_DATE = "2026-02-24 00:00:00"
END_DATE = "2026-03-26 00:00:00"
VALIDATION_DATE = ""
TRAIN_SPLIT = 0.6820973075106282
STARTING_CAPITAL = 10_000
COST_PER_TRADE = 2e-5
# ============================================================
# SECTION 2 — FEATURE ENGINEERING
# ============================================================
def feature_engineering(df, close, open_, high, low):
    """Attach SMA, RSI, momentum, and range features to a copy of `df`.

    Features: SMAs (20/50/200), relative price-to-SMA ratios, SMA
    crossover ratios, EWM RSI(14), 1/4/12-bar returns, 20-bar return
    volatility, and the relative high-low range. `open_` is accepted
    for interface symmetry but unused here.
    """
    df = df.copy()
    # Rolling means for the three trend horizons (column order kept).
    windows = (20, 50, 200)
    sma = {w: close.rolling(w).mean() for w in windows}
    for w in windows:
        df[f"sma_{w}"] = sma[w]
    # Relative distance of price from each SMA.
    for w in windows:
        df[f"price_vs_sma{w}"] = close / sma[w] - 1
    # Fast/slow SMA ratios (crossover strength).
    df["sma20_vs_sma50"] = sma[20] / sma[50] - 1
    df["sma50_vs_sma200"] = sma[50] / sma[200] - 1
    # RSI 14 — recursive EWM (com=13 == alpha 1/14, adjust=False); a
    # zero average loss maps to NaN instead of dividing by zero.
    move = close.diff()
    up_move = move.clip(lower=0)
    down_move = -move.clip(upper=0)
    ewm_up = up_move.ewm(com=13, adjust=False).mean()
    ewm_down = down_move.ewm(com=13, adjust=False).mean()
    ratio = ewm_up / ewm_down.replace(0, np.nan)
    df["rsi_14"] = 100 - (100 / (1 + ratio))
    # Short-horizon momentum.
    for lag in (1, 4, 12):
        df[f"ret_{lag}"] = close.pct_change(lag)
    # 20-bar realised volatility of 1-bar returns.
    df["volatility_20"] = close.pct_change().rolling(20).std()
    # Bar range normalised by the close.
    df["hl_range"] = (high - low) / close
    return df
# ============================================================
# SECTION 3 — ML MODEL
# ============================================================
def build_model(X_train, y_train):
    """Train the tuned RandomForest and return it wrapped in
    ModelWrapper, which carries the original class labels {-1, 0, 1}
    and the feature count.
    """
    cfg = optimization_config()
    forest = RandomForestClassifier(
        n_estimators=cfg["n_estimators"],
        max_depth=cfg["max_depth"],
        min_samples_leaf=cfg["min_samples_leaf"],
        class_weight=cfg["class_weight"],
        random_state=42,
        n_jobs=-1,
    )
    forest.fit(X_train, y_train)
    # The encoder only supplies the canonical label ordering.
    labels = LabelEncoder()
    labels.fit([-1, 0, 1])
    return ModelWrapper(forest, original_classes=labels.classes_, n_features=X_train.shape[1])
# ============================================================
# SECTION 4 — SIGNAL / ENTRY LOGIC
# ============================================================
def generate_signals(model, X, thresh):
    """Translate class probabilities into directional signals.

    Returns (signal, p_pos, p_neg): `signal` is a float Series in
    {-1.0, 0.0, 1.0} indexed like X; p_pos / p_neg are the probability
    arrays for the +1 / -1 classes (zeros when a class is absent).
    If both sides clear `thresh`, the larger probability wins and ties
    go long.
    """
    proba = model.predict_proba(X)
    classes = list(model.classes_)
    p_pos = proba[:, classes.index(1)] if 1 in classes else np.zeros(len(X))
    p_neg = proba[:, classes.index(-1)] if -1 in classes else np.zeros(len(X))
    long_hit = p_pos >= thresh
    short_hit = p_neg >= thresh
    # Longs first, shorts override, then overlaps resolved by
    # comparing the two probabilities.
    raw = np.where(long_hit, 1.0, 0.0)
    raw = np.where(short_hit, -1.0, raw)
    overlap = long_hit & short_hit
    raw = np.where(overlap & (p_pos >= p_neg), 1.0, raw)
    signal = pd.Series(raw, index=X.index)
    return signal, p_pos, p_neg
# ============================================================
# SECTION 5 — OPTIMISATION TARGET
# ============================================================
def optimization_config():
    """Hyper-parameters and rationale for the Sharpe-targeted forest.

    Only the four RandomForest keys are consumed by build_model;
    `learning_rate` is not applicable to RandomForest and is kept for
    schema parity.
    """
    config = {}
    config["objective"] = "Maximize Sharpe ratio"
    config["notes"] = "Balanced class weights to handle directional imbalance; conservative depth to reduce overfitting; more estimators for stability."
    config["n_estimators"] = 300
    config["max_depth"] = 6
    config["min_samples_leaf"] = 20
    config["class_weight"] = "balanced"
    config["learning_rate"] = None
    return config
# ============================================================
# SECTION 6 — RISK MANAGEMENT
# ============================================================
def apply_risk(signal, close, pos_size=1.0):
    """Size the directional signal by a constant position fraction.

    `close` is currently unused; the parameter exists so stop-loss or
    volatility-based sizing can be introduced without changing callers.
    """
    return pos_size * signal
# ============================================================
# SECTION 7 — BACKTEST ENGINE
# ============================================================
def train_and_backtest():
    """End-to-end pipeline: load EURUSD ticks, resample to 15-min bars,
    engineer features, train the classifier, generate signals, run a
    vectorised bar-return backtest with per-change costs, and return
    the dashboard payload dict.
    """
    # ── Load & resample ──────────────────────────────────────
    # Tick CSV is expected to contain 'Time', 'Bid', 'Ask' columns.
    df_raw = pd.read_csv(DATA_PATH, parse_dates=["Time"])
    df_raw = df_raw.sort_values("Time").set_index("Time")
    # Mid-price -> 15-minute OHLC bars; empty buckets dropped.
    df_raw["mid"] = (df_raw["Bid"] + df_raw["Ask"]) / 2
    ohlc = df_raw["mid"].resample("15min").ohlc()
    ohlc = ohlc.dropna()
    if START_DATE:
        ohlc = ohlc[ohlc.index >= START_DATE]
    if END_DATE:
        ohlc = ohlc[ohlc.index <= END_DATE]
    close = ohlc["close"]
    open_ = ohlc["open"]
    high = ohlc["high"]
    low = ohlc["low"]
    # ── Feature engineering ───────────────────────────────────
    df = pd.DataFrame(index=ohlc.index)
    df = feature_engineering(df, close, open_, high, low)
    # ── Target ───────────────────────────────────────────────
    # Label = sign of the move 4 bars ahead; last 4 rows are NaN and
    # removed by the mask.
    target = np.sign(close.shift(-4) - close)
    mask = target.notna()
    df = df[mask]
    target = target[mask]
    close = close[mask]
    open_ = open_[mask]
    high = high[mask]
    low = low[mask]
    # ── Drop NaN rows from features ───────────────────────────
    # Removes the rolling warm-up rows (SMA 200 etc.) from every series.
    feat_mask = df.notna().all(axis=1)
    df = df[feat_mask]
    target = target[feat_mask]
    close = close[feat_mask]
    open_ = open_[feat_mask]
    high = high[feat_mask]
    low = low[feat_mask]
    feature_cols = list(df.columns)
    # ── Overlays on full dataset ──────────────────────────────
    # Bollinger bands (20-bar, 2 sigma) plus three MAs for the chart.
    bb_mid = close.rolling(20).mean()
    bb_std = close.rolling(20).std()
    bb_upper = bb_mid + 2 * bb_std
    bb_lower = bb_mid - 2 * bb_std
    ma50 = close.rolling(50).mean()
    ma100 = close.rolling(100).mean()
    ma200 = close.rolling(200).mean()
    # ── Train/test split (chronological; no shuffling) ────────
    if VALIDATION_DATE:
        split_idx = len(df[df.index <= VALIDATION_DATE])
    else:
        split_idx = int(len(df) * TRAIN_SPLIT)
    # Clamp so both halves are non-empty.
    split_idx = max(1, min(split_idx, len(df) - 1))
    X_train = df.iloc[:split_idx]
    X_test = df.iloc[split_idx:]
    y_train = target.iloc[:split_idx]
    y_test = target.iloc[split_idx:]
    close_train = close.iloc[:split_idx]
    close_test = close.iloc[split_idx:]
    split_dt = str(df.index[split_idx])
    # ── Label encoding ({-1, 0, 1} -> {0, 1, 2}) ──────────────
    enc = LabelEncoder()
    enc.fit([-1, 0, 1])
    y_train_enc = enc.transform(y_train)
    y_test_enc = enc.transform(y_test)
    # ── Build model ───────────────────────────────────────────
    model = build_model(X_train, y_train_enc)
    # ── Generate signals on full dataset ─────────────────────
    # NOTE(review): the equity curve below mixes in-sample (train) and
    # out-of-sample (test) signals.
    thresh = 0.55
    signal_train, p_pos_train, p_neg_train = generate_signals(model, X_train, thresh)
    signal_test, p_pos_test, p_neg_test = generate_signals(model, X_test, thresh)
    signal_train = apply_risk(signal_train, close_train)
    signal_test = apply_risk(signal_test, close_test)
    signal_full = pd.concat([signal_train, signal_test])
    # ── Confusion matrix ──────────────────────────────────────
    # Assumes model.predict returns decoded labels in {-1, 0, 1} via
    # ModelWrapper — TODO confirm against model_wrapper.
    pred_test = model.predict(X_test)
    cm = confusion_matrix(y_test, pred_test, labels=[-1, 0, 1])
    conf_matrix_list = cm.tolist()
    # ── Rolling accuracy (test period, 30-bar window) ─────────
    # Only bars with an active (non-zero) signal contribute; with the
    # default min_periods the first 29 active bars stay NaN.
    active_mask = signal_test != 0
    correct = (pred_test == np.asarray(y_test)).astype(float)
    correct_series = pd.Series(correct, index=X_test.index)
    active_series = pd.Series(active_mask.values, index=X_test.index)  # NOTE(review): unused below
    rolling_correct = correct_series[active_mask].rolling(30).mean()
    roll_acc = pd.Series(np.nan, index=X_test.index)
    roll_acc[active_mask] = rolling_correct
    # ── Equity curve ──────────────────────────────────────────
    # Vectorised bar-return backtest: a signal of 0 means "hold the
    # current position" (ffill), and positions act from the next bar.
    full_close = close
    full_signal = signal_full
    rets = full_close.pct_change().fillna(0)
    # Position is held until signal changes; 0 means hold current
    position = full_signal.replace(0, np.nan).ffill().fillna(0)
    strategy_rets = position.shift(1).fillna(0) * rets
    # Apply transaction costs
    # Cost is charged per unit of position change (a flip -1 -> +1
    # costs twice the one-way amount).
    trade_changes = position.diff().abs()
    strategy_rets = strategy_rets - trade_changes * COST_PER_TRADE
    equity_strategy = STARTING_CAPITAL * (1 + strategy_rets).cumprod()
    equity_bh = STARTING_CAPITAL * (1 + rets).cumprod()
    # ── Trade-level metrics (direction flips only) ────────────
    # A "trade" spans from one direction change to the next; used for
    # the return-distribution histograms only.
    position_arr = position.values
    signal_arr = full_signal.values
    close_arr = full_close.values
    dates_arr = full_close.index
    last_dir = None
    entry_price = None
    entry_idx = None  # NOTE(review): tracked but never read
    trades = []
    long_trades = []
    short_trades = []
    for i in range(len(signal_arr)):
        sig = signal_arr[i]
        if sig == 0:
            continue
        if sig != last_dir:
            if last_dir is not None and entry_price is not None:
                raw_ret = last_dir * (close_arr[i] - entry_price) / entry_price
                raw_ret -= COST_PER_TRADE
                trades.append(raw_ret)
                if last_dir == 1:
                    long_trades.append(raw_ret)
                else:
                    short_trades.append(raw_ret)
            last_dir = sig
            entry_price = close_arr[i]
            entry_idx = i
    # Close last open trade
    # Marks the final open position to the last close so it is counted.
    if last_dir is not None and entry_price is not None:
        raw_ret = last_dir * (close_arr[-1] - entry_price) / entry_price
        raw_ret -= COST_PER_TRADE
        trades.append(raw_ret)
        if last_dir == 1:
            long_trades.append(raw_ret)
        else:
            short_trades.append(raw_ret)
    n_trades = len(trades)
    # ── Metrics ───────────────────────────────────────────────
    total_ret = float((equity_strategy.iloc[-1] / STARTING_CAPITAL) - 1)
    bh_ret = float((equity_bh.iloc[-1] / STARTING_CAPITAL) - 1)
    # Sharpe over the test slice only; annualised with 96 bars/day
    # (252 * 24 * 4). NOTE(review): the sibling pipeline above uses
    # 252 * 26 — the two are inconsistent; confirm which is intended.
    test_strat_rets = strategy_rets.iloc[split_idx:]
    if test_strat_rets.std() == 0 or test_strat_rets.empty:
        sharpe_strat = 0.0
    else:
        sharpe_strat = float(test_strat_rets.mean() / test_strat_rets.std() * np.sqrt(252 * 24 * 4))
    test_bh_rets = rets.iloc[split_idx:]
    if test_bh_rets.std() == 0 or test_bh_rets.empty:
        sharpe_bh = 0.0
    else:
        sharpe_bh = float(test_bh_rets.mean() / test_bh_rets.std() * np.sqrt(252 * 24 * 4))
    # Max drawdown from the running equity peak.
    rolling_max = equity_strategy.cummax()
    drawdown = (equity_strategy - rolling_max) / rolling_max
    mdd = float(drawdown.min())
    # ── Feature importance ────────────────────────────────────
    # Ascending sort, keep the 15 largest (bar chart renders bottom-up).
    importances = model.feature_importances_
    fi_pairs = sorted(zip(feature_cols, importances), key=lambda x: x[1])
    fi_pairs = fi_pairs[-15:]
    fi_names = [p[0] for p in fi_pairs]
    fi_values = [float(p[1]) for p in fi_pairs]
    # ── Helper to sanitize lists ──────────────────────────────
    def clean(lst):
        # JSON-safe: NaN/inf -> None, everything else -> float.
        return [None if (v is None or (isinstance(v, float) and (np.isnan(v) or np.isinf(v)))) else float(v) for v in lst]
    def clean_int(lst):
        # Integer variant of clean(). NOTE(review): never called below.
        return [None if (v is None or (isinstance(v, float) and (np.isnan(v) or np.isinf(v)))) else int(v) for v in lst]
    dates_str = [str(d) for d in full_close.index]
    signal_dates = [str(d) for d in full_signal.index]
    equity_dates = [str(d) for d in equity_strategy.index]
    drawdown_dates = [str(d) for d in drawdown.index]
    roll_acc_dates = [str(d) for d in roll_acc.index]
    test_dates = [str(d) for d in X_test.index]
    # ============================================================
    # SECTION 8 — CUSTOM FIGURES
    # ============================================================
    custom_figs = []
    # Shared dark-theme layout kwargs for both figures.
    dark = dict(
        paper_bgcolor="#131722",
        plot_bgcolor="#131722",
        font_color="#d1d4dc",
        margin=dict(l=40, r=20, t=30, b=30),
        legend=dict(bgcolor="rgba(0,0,0,0)"),
    )
    # — SMA Chart ——————————————————————————————————————————————
    fig_sma = go.Figure()
    fig_sma.add_trace(go.Scatter(
        x=dates_str, y=clean(close.tolist()),
        name="Close", line=dict(color="#d1d4dc", width=1)
    ))
    fig_sma.add_trace(go.Scatter(
        x=dates_str, y=clean(df["sma_20"].tolist()),
        name="SMA 20", line=dict(color="#2196F3", width=1.2)
    ))
    fig_sma.add_trace(go.Scatter(
        x=dates_str, y=clean(df["sma_50"].tolist()),
        name="SMA 50", line=dict(color="#FF9800", width=1.2)
    ))
    fig_sma.add_trace(go.Scatter(
        x=dates_str, y=clean(df["sma_200"].tolist()),
        name="SMA 200", line=dict(color="#E91E63", width=1.5)
    ))
    fig_sma.update_layout(title="SMA (20, 50, 200)", **dark)
    custom_figs.append(fig_sma.to_dict())
    # — RSI Chart ——————————————————————————————————————————————
    fig_rsi = go.Figure()
    fig_rsi.add_trace(go.Scatter(
        x=dates_str, y=clean(df["rsi_14"].tolist()),
        name="RSI 14", line=dict(color="#00BCD4", width=1.5)
    ))
    fig_rsi.add_hline(y=70, line_color="#E91E63", line_dash="dash", annotation_text="70")
    fig_rsi.add_hline(y=30, line_color="#4CAF50", line_dash="dash", annotation_text="30")
    fig_rsi.add_hline(y=50, line_color="#888888", line_dash="dot")
    fig_rsi.update_layout(title="RSI 14", yaxis=dict(range=[0, 100]), **dark)
    custom_figs.append(fig_rsi.to_dict())
    # ── Register model ────────────────────────────────────────
    # NOTE(review): `register_model` is neither defined nor imported in
    # this file — presumably injected by the host runtime; confirm,
    # otherwise this raises NameError rather than skipping.
    if register_model is not None:
        register_model(model)
    return {
        "ohlc": {
            "dates": dates_str,
            "open": clean(open_.tolist()),
            "high": clean(high.tolist()),
            "low": clean(low.tolist()),
            "close": clean(close.tolist()),
        },
        "signals": {
            "dates": signal_dates,
            "values": clean(signal_full.tolist()),
        },
        "bb": {
            "upper": clean(bb_upper.tolist()),
            "mid": clean(bb_mid.tolist()),
            "lower": clean(bb_lower.tolist()),
        },
        "ma": {
            "ma50": clean(ma50.tolist()),
            "ma100": clean(ma100.tolist()),
            "ma200": clean(ma200.tolist()),
        },
        "equity": {
            "dates": equity_dates,
            "strategy": clean(equity_strategy.tolist()),
            "bh": clean(equity_bh.tolist()),
        },
        "feature_importance": {
            "names": fi_names,
            "values": fi_values,
        },
        "conf_matrix": conf_matrix_list,
        "conf_hist": {
            "p_pos": clean(p_pos_test.tolist()),
            "p_neg": clean(p_neg_test.tolist()),
        },
        "rolling_acc": {
            "dates": roll_acc_dates,
            "values": clean(roll_acc.tolist()),
        },
        "drawdown": {
            "dates": drawdown_dates,
            "values": clean(drawdown.tolist()),
        },
        "ret_dist": clean(trades),
        "ret_dist_long": clean(long_trades),
        "ret_dist_short": clean(short_trades),
        "metrics": {
            "total_ret": total_ret,
            "bh_ret": bh_ret,
            "sharpe_strat": sharpe_strat,
            "sharpe_bh": sharpe_bh,
            "mdd": mdd,
            "n_trades": n_trades,
        },
        "split_dt": split_dt,
        "split_idx": split_idx,
        "n_train": len(X_train),
        "n_test": len(X_test),
        "feature_cols": feature_cols,
        "custom_figs": custom_figs,
    }
# ╔══════════════════════════════════════════════════════════════╗
# ║ STRATEGY REQUEST LOG ║
# ╚══════════════════════════════════════════════════════════════╝
# Generated : 2026-04-02 17:56:59
# Model : Random Forest
# Feature Eng. : SMA (20, 50, 200), RSI 14
# Signal / Entry : —
# Optimization : —
# Risk Mgmt : —
# Risk Filter : —
# ══════════════════════════════════════════════════════════════
# ============================================================
# SECTION 0 — IMPORTS & CONSTANTS
# ============================================================
import numpy as np
import pandas as pd
from sklearn.ensemble import RandomForestClassifier
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import confusion_matrix
import plotly.graph_objects as go
from model_wrapper import ModelWrapper
DATA_PATH = "/Users/malco/Desktop/QuantifyMe/data/EURUSD_ticks.csv"  # tick CSV with 'Time', 'Bid', 'Ask' columns
START_DATE = "2026-02-24 08:00:00"  # inclusive lower bound for resampled bars
END_DATE = "2026-03-26 00:00:00"  # inclusive upper bound for resampled bars
VALIDATION_DATE = ""  # empty string → fall back to the TRAIN_SPLIT fraction
TRAIN_SPLIT = 0.6820973075106282  # train fraction used when VALIDATION_DATE is unset
STARTING_CAPITAL = 10_000  # backtest starting equity
ROUND_TRIP_COST = 2e-5  # round-trip transaction cost charged per closed trade
# ============================================================
# SECTION 2 — FEATURE ENGINEERING
# ============================================================
def feature_engineering(df, close, open_, high, low):
    """Attach SMA, RSI and auxiliary features and return an enriched copy.

    Parameters
    ----------
    df : pandas.DataFrame
        Bar frame to enrich. A copy is modified and returned; the
        caller's frame is left untouched.
    close, open_, high, low : pandas.Series
        OHLC series aligned with ``df``'s index.

    Returns
    -------
    pandas.DataFrame
        Copy of ``df`` with the feature columns appended.
    """
    # Work on a copy so the caller's DataFrame is not mutated in place
    # (matches the earlier revision of this function in this file).
    df = df.copy()
    # SMA 20, 50, 200
    df['sma_20'] = close.rolling(20).mean()
    df['sma_50'] = close.rolling(50).mean()
    df['sma_200'] = close.rolling(200).mean()
    # Price relative to SMAs (ratio minus one → scale-free)
    df['close_vs_sma20'] = close / df['sma_20'] - 1.0
    df['close_vs_sma50'] = close / df['sma_50'] - 1.0
    df['close_vs_sma200'] = close / df['sma_200'] - 1.0
    # SMA crossover features
    df['sma20_vs_sma50'] = df['sma_20'] / df['sma_50'] - 1.0
    df['sma50_vs_sma200'] = df['sma_50'] / df['sma_200'] - 1.0
    # RSI 14 — simple-moving-average flavour; a zero average loss is
    # mapped to NaN rather than dividing by zero.
    delta = close.diff()
    gain = delta.clip(lower=0)
    loss = -delta.clip(upper=0)
    avg_gain = gain.rolling(14).mean()
    avg_loss = loss.rolling(14).mean()
    rs = avg_gain / avg_loss.replace(0, np.nan)
    df['rsi_14'] = 100.0 - (100.0 / (1.0 + rs))
    # Additional useful features
    df['log_return'] = np.log(close / close.shift(1))
    df['hl_spread'] = (high - low) / close
    df['close_vs_open'] = (close - open_) / open_
    return df
# ============================================================
# SECTION 3 — ML MODEL
# ============================================================
def build_model(X_train, y_train):
    """Fit a Random Forest on (already label-encoded) targets and wrap it.

    The wrapper carries the original class labels [-1, 0, 1] so that
    downstream predictions can be decoded back from encoder space.
    """
    params = optimization_config()
    forest = RandomForestClassifier(
        n_estimators=params['n_estimators'],
        max_depth=params['max_depth'],
        min_samples_leaf=params['min_samples_leaf'],
        class_weight=params['class_weight'],
        random_state=42,
        n_jobs=-1,
    )
    forest.fit(X_train, y_train)
    encoder = LabelEncoder().fit([-1, 0, 1])
    return ModelWrapper(forest, original_classes=encoder.classes_, n_features=X_train.shape[1])
# ============================================================
# SECTION 4 — SIGNAL / ENTRY LOGIC
# ============================================================
def generate_signals(model, X, thresh):
    """Map class probabilities to a {-1, 0, +1} signal series.

    A long (short) signal fires when the probability of class +1 (-1)
    strictly exceeds ``thresh``; when both sides fire, the more confident
    side wins (ties go long).

    Returns the signal Series (indexed like ``X``) plus the raw p(+1)
    and p(-1) probability arrays.
    """
    proba = model.predict_proba(X)
    classes = list(model.classes_)

    def _class_prob(label):
        # Probability column for `label`, or zeros if the class is absent.
        if label in classes:
            return proba[:, classes.index(label)]
        return np.zeros(len(X))

    p_pos = _class_prob(1)
    p_neg = _class_prob(-1)
    long_hit = p_pos > thresh
    short_hit = p_neg > thresh
    signal_values = np.zeros(len(X))
    signal_values[long_hit] = 1.0
    signal_values[short_hit] = -1.0
    # Where both sides clear the threshold, keep the higher-probability side.
    contested = long_hit & short_hit
    signal_values[contested] = np.where(p_pos[contested] >= p_neg[contested], 1.0, -1.0)
    return pd.Series(signal_values, index=X.index), p_pos, p_neg
# ============================================================
# SECTION 5 — OPTIMISATION TARGET
# ============================================================
def optimization_config():
    """Hyper-parameter bundle targeting Sharpe-ratio maximisation."""
    notes = (
        "Random Forest tuned for Sharpe: balanced class weights to handle "
        "class imbalance, conservative depth to avoid overfitting, "
        "more estimators for stable probability estimates."
    )
    return {
        "objective": "Maximize Sharpe ratio",
        "notes": notes,
        "n_estimators": 300,
        "max_depth": 6,
        "min_samples_leaf": 20,
        "class_weight": "balanced",
    }
# ============================================================
# SECTION 6 — RISK MANAGEMENT
# ============================================================
def apply_risk(signal, close, pos_size=1.0):
    """Scale the raw signal by a fixed position size.

    ``close`` is accepted for interface symmetry with future risk
    filters; it is not used in this revision.
    """
    return pos_size * signal
# ============================================================
# SECTION 7 — BACKTEST ENGINE
# ============================================================
def train_and_backtest():
    """Run the full pipeline and return a serializable results payload.

    Steps: load tick CSV → 15-min OHLC bars → feature engineering →
    4-bars-ahead direction target → time-ordered train/test split →
    Random Forest → probability-threshold signals → test-window backtest
    with per-trade costs → metrics, figures and chart series.

    Returns
    -------
    dict
        OHLC/signal/band/equity series, confusion matrix, probability
        histograms, rolling accuracy, drawdown, trade-return
        distributions, summary metrics and custom Plotly figures.
    """
    # ── Load & resample ──────────────────────────────────────
    raw = pd.read_csv(DATA_PATH, parse_dates=['Time'])
    raw = raw.sort_values('Time').set_index('Time')
    # Mid price from bid/ask; the CSV must provide 'Bid' and 'Ask' columns.
    raw['mid'] = (raw['Bid'] + raw['Ask']) / 2.0
    ohlc_full = raw['mid'].resample('15min').ohlc()
    ohlc_full = ohlc_full.dropna()
    if START_DATE:
        ohlc_full = ohlc_full[ohlc_full.index >= START_DATE]
    if END_DATE:
        ohlc_full = ohlc_full[ohlc_full.index <= END_DATE]
    close = ohlc_full['close']
    open_ = ohlc_full['open']
    high = ohlc_full['high']
    low = ohlc_full['low']
    # ── Feature engineering ──────────────────────────────────
    df = ohlc_full.copy()
    df = feature_engineering(df, close, open_, high, low)
    # ── Target ───────────────────────────────────────────────
    # Sign of the close 4 bars ahead: -1 / 0 / +1.
    target = np.sign(close.shift(-4) - close)
    mask = target.notna()
    df = df[mask]
    target = target[mask]
    # Re-extract aligned series after mask
    close_aligned = df['close']
    open_aligned = df['open']
    high_aligned = df['high']
    low_aligned = df['low']
    # ── Feature columns ──────────────────────────────────────
    feature_cols = [
        'sma_20', 'sma_50', 'sma_200',
        'close_vs_sma20', 'close_vs_sma50', 'close_vs_sma200',
        'sma20_vs_sma50', 'sma50_vs_sma200',
        'rsi_14',
        'log_return', 'hl_spread', 'close_vs_open'
    ]
    df_features = df[feature_cols].copy()
    # NOTE(review): bfill copies later values into the SMA-200 warm-up
    # rows — a mild lookahead in the training features; confirm intended.
    df_features = df_features.bfill().ffill().dropna()
    target = target.loc[df_features.index]
    close_aligned = close_aligned.loc[df_features.index]
    # ── Train/test split ─────────────────────────────────────
    if VALIDATION_DATE:
        split_idx = len(df_features[df_features.index <= VALIDATION_DATE])
    else:
        split_idx = int(len(df_features) * TRAIN_SPLIT)
    X_train = df_features.iloc[:split_idx]
    X_test = df_features.iloc[split_idx:]
    y_train = target.iloc[:split_idx]
    y_test = target.iloc[split_idx:]
    close_train = close_aligned.iloc[:split_idx]
    close_test = close_aligned.iloc[split_idx:]
    split_dt = str(df_features.index[split_idx]) if split_idx < len(df_features) else str(df_features.index[-1])
    # ── Label encoding ───────────────────────────────────────
    enc = LabelEncoder()
    enc.fit([-1, 0, 1])
    y_train_enc = enc.transform(y_train)
    y_test_enc = enc.transform(y_test)
    # ── Build model ──────────────────────────────────────────
    model = build_model(X_train, y_train_enc)
    # ── Generate signals (train + test) ──────────────────────
    thresh = 0.55
    signal_train, p_pos_train, p_neg_train = generate_signals(model, X_train, thresh)
    signal_test, p_pos_test, p_neg_test = generate_signals(model, X_test, thresh)
    signal_train = apply_risk(signal_train, close_train)
    signal_test = apply_risk(signal_test, close_test)
    signal_full = pd.concat([signal_train, signal_test])
    # ── Confusion matrix (test only) ─────────────────────────
    # model.predict is expected to return decoded labels (-1/0/1);
    # ModelWrapper's decoding is assumed — verify against its source.
    pred_test = model.predict(X_test)
    y_test_arr = np.asarray(y_test)
    cm = confusion_matrix(y_test_arr, pred_test, labels=[-1, 0, 1])
    conf_matrix = cm.tolist()
    # ── Equity curve & metrics (test period only) ────────────
    returns_test = close_test.pct_change().fillna(0.0)
    # Build trade returns by tracking direction flips
    last_dir = None
    entry_price = None
    trades = []
    long_trades = []
    short_trades = []
    equity = STARTING_CAPITAL
    equity_curve = []
    strategy_rets = []
    sig_arr = signal_test.values
    close_arr = close_test.values
    ret_arr = returns_test.values
    for i in range(len(sig_arr)):
        s = sig_arr[i]
        bar_ret = 0.0
        # Bar P&L accrues with the direction held *entering* the bar.
        if last_dir is not None and last_dir != 0.0:
            bar_ret = last_dir * ret_arr[i]
        # Check for direction flip
        if s != 0.0 and s != last_dir:
            if last_dir is not None and last_dir != 0.0 and entry_price is not None:
                # Close previous trade
                trade_ret = last_dir * (close_arr[i] / entry_price - 1.0) - ROUND_TRIP_COST
                trades.append(trade_ret)
                if last_dir > 0:
                    long_trades.append(trade_ret)
                else:
                    short_trades.append(trade_ret)
            entry_price = close_arr[i]
            last_dir = s
        strategy_rets.append(bar_ret)
        equity_curve.append(equity * (1.0 + bar_ret))
        equity = equity_curve[-1]
    # Close final open trade
    if last_dir is not None and last_dir != 0.0 and entry_price is not None and len(close_arr) > 0:
        trade_ret = last_dir * (close_arr[-1] / entry_price - 1.0) - ROUND_TRIP_COST
        trades.append(trade_ret)
        if last_dir > 0:
            long_trades.append(trade_ret)
        else:
            short_trades.append(trade_ret)
    # Prepend starting capital
    equity_vals = [STARTING_CAPITAL] + equity_curve
    equity_dates_full = [str(close_test.index[0])] + [str(d) for d in close_test.index]
    # Buy-and-hold equity
    bh_rets = returns_test.values
    bh_equity = [STARTING_CAPITAL]
    for r in bh_rets:
        bh_equity.append(bh_equity[-1] * (1.0 + r))
    # Metrics
    strategy_rets_arr = np.array(strategy_rets)
    total_ret = (equity_vals[-1] - STARTING_CAPITAL) / STARTING_CAPITAL
    bh_ret = (bh_equity[-1] - STARTING_CAPITAL) / STARTING_CAPITAL
    # Sharpe (annualised, 15-min bars → ~26,280 bars/year)
    bars_per_year = 26280.0
    if len(strategy_rets_arr) > 1 and strategy_rets_arr.std() > 0:
        sharpe_strat = float(np.sqrt(bars_per_year) * strategy_rets_arr.mean() / strategy_rets_arr.std())
    else:
        sharpe_strat = 0.0
    bh_rets_arr = np.array(bh_rets)
    if len(bh_rets_arr) > 1 and bh_rets_arr.std() > 0:
        sharpe_bh = float(np.sqrt(bars_per_year) * bh_rets_arr.mean() / bh_rets_arr.std())
    else:
        sharpe_bh = 0.0
    # Max drawdown
    eq_arr = np.array(equity_vals)
    running_max = np.maximum.accumulate(eq_arr)
    dd_arr = (eq_arr - running_max) / running_max
    mdd = float(dd_arr.min())
    n_trades = len(trades)
    # ── Rolling accuracy (test, 30-bar window, non-flat only) ─
    active_mask = signal_test.values != 0.0
    correct = (signal_test.values == y_test_arr).astype(float)
    correct_series = pd.Series(correct, index=signal_test.index)
    active_series = pd.Series(active_mask.astype(float), index=signal_test.index)
    roll_correct = correct_series.where(active_series == 1).rolling(30, min_periods=1).mean()
    rolling_acc_vals = []
    for v in roll_correct.values:
        if np.isnan(v) or np.isinf(v):
            rolling_acc_vals.append(None)
        else:
            rolling_acc_vals.append(float(v))
    # ── Bollinger Bands & MAs (full dataset) ─────────────────
    close_full = close_aligned  # full aligned close
    bb_mid = close_full.rolling(20).mean()
    bb_std = close_full.rolling(20).std()
    bb_upper = bb_mid + 2.0 * bb_std
    bb_lower = bb_mid - 2.0 * bb_std
    ma50 = close_full.rolling(50).mean()
    ma100 = close_full.rolling(100).mean()
    ma200 = close_full.rolling(200).mean()
    def _series_to_list(s):
        # JSON-safe conversion: NaN/inf become None, everything else float.
        out = []
        for v in s.values:
            if v is None or (isinstance(v, float) and (np.isnan(v) or np.isinf(v))):
                out.append(None)
            else:
                out.append(float(v))
        return out
    full_dates = [str(d) for d in close_full.index]
    ohlc_open = _series_to_list(open_aligned)
    ohlc_high = _series_to_list(high_aligned.loc[close_full.index] if hasattr(high_aligned, 'loc') else high_aligned)
    ohlc_low = _series_to_list(low_aligned.loc[close_full.index] if hasattr(low_aligned, 'loc') else low_aligned)
    ohlc_close = _series_to_list(close_full)
    # ── Feature importance ───────────────────────────────────
    fi = model.feature_importances_
    fi_pairs = sorted(zip(feature_cols, fi), key=lambda x: x[1])
    fi_pairs = fi_pairs[-15:]  # top 15 ascending
    fi_names = [p[0] for p in fi_pairs]
    fi_values = [float(p[1]) for p in fi_pairs]
    # ── OHLC aligned to full dataset ─────────────────────────
    # NOTE(review): these are pre-dropna series and may be longer than
    # close_full; the ohlc_high/ohlc_low lists above are the aligned ones.
    ohlc_high_full = high_aligned
    ohlc_low_full = low_aligned
    # ── Drawdown series (test period, padded to full) ─────────
    dd_full = np.full(len(close_full), 0.0)
    test_start_loc = split_idx
    # dd_arr has len = len(equity_vals) = len(close_test)+1
    # align to test window
    if len(dd_arr) - 1 == len(close_test):
        dd_full[test_start_loc:test_start_loc + len(close_test)] = dd_arr[1:]
    drawdown_vals = []
    for v in dd_full:
        if np.isnan(v) or np.isinf(v):
            drawdown_vals.append(None)
        else:
            drawdown_vals.append(float(v))
    # ── Equity dates — full dataset aligned ──────────────────
    # For equity, fill train period with flat capital, test period with curve
    eq_full_strategy = [float(STARTING_CAPITAL)] * len(close_full)
    eq_full_bh = [float(STARTING_CAPITAL)] * len(close_full)
    # BH for full period
    close_full_arr = close_full.values
    for i in range(1, len(close_full_arr)):
        r = (close_full_arr[i] - close_full_arr[i-1]) / close_full_arr[i-1] if close_full_arr[i-1] != 0 else 0.0
        eq_full_bh[i] = eq_full_bh[i-1] * (1.0 + r)
    # Strategy equity: flat in train, then use computed curve for test
    if len(equity_curve) > 0:
        for i, idx_loc in enumerate(range(test_start_loc, min(test_start_loc + len(equity_curve), len(close_full)))):
            eq_full_strategy[idx_loc] = float(equity_curve[i])
    # Forward fill remaining if any
    last_val = eq_full_strategy[test_start_loc + len(equity_curve) - 1] if len(equity_curve) > 0 else STARTING_CAPITAL
    for idx_loc in range(test_start_loc + len(equity_curve), len(close_full)):
        eq_full_strategy[idx_loc] = last_val
    # ── Signals full ─────────────────────────────────────────
    signal_full_vals = []
    for v in signal_full.values:
        if np.isnan(v) or np.isinf(v):
            signal_full_vals.append(0.0)
        else:
            signal_full_vals.append(float(v))
    # ── conf_hist ────────────────────────────────────────────
    p_pos_list = [float(v) for v in p_pos_test.tolist()]
    p_neg_list = [float(v) for v in p_neg_test.tolist()]
    # ── ret_dist ─────────────────────────────────────────────
    ret_dist = [float(v) for v in trades]
    ret_dist_long = [float(v) for v in long_trades]
    ret_dist_short = [float(v) for v in short_trades]
    # ── SECTION 8 — CUSTOM FIGURES ───────────────────────────
    custom_figs = []
    # Figure 1: SMA overlay (20, 50, 200) on price
    fig_sma = go.Figure()
    fig_sma.add_trace(go.Scatter(
        x=full_dates, y=ohlc_close,
        mode='lines', name='Close',
        line=dict(color='#d1d4dc', width=1)
    ))
    # NOTE(review): the next trace is a placeholder (mislabelled 'SMA 20'
    # carrying ma50 data); it is stripped out again below via fig_sma.data.
    fig_sma.add_trace(go.Scatter(
        x=full_dates, y=_series_to_list(ma50 if 'ma50' in dir() else close_full.rolling(20).mean()),
        mode='lines', name='SMA 20',
        line=dict(color='#f7c948', width=1.5)
    ))
    sma20_full = close_full.rolling(20).mean()
    sma50_full = close_full.rolling(50).mean()
    sma200_full = close_full.rolling(200).mean()
    fig_sma.add_trace(go.Scatter(
        x=full_dates, y=_series_to_list(sma20_full),
        mode='lines', name='SMA 20',
        line=dict(color='#f7c948', width=1.5)
    ))
    fig_sma.add_trace(go.Scatter(
        x=full_dates, y=_series_to_list(sma50_full),
        mode='lines', name='SMA 50',
        line=dict(color='#2196F3', width=1.5)
    ))
    fig_sma.add_trace(go.Scatter(
        x=full_dates, y=_series_to_list(sma200_full),
        mode='lines', name='SMA 200',
        line=dict(color='#E91E63', width=1.5)
    ))
    fig_sma.update_layout(
        title='Price with SMA 20 / 50 / 200',
        paper_bgcolor="#131722", plot_bgcolor="#131722",
        font_color="#d1d4dc",
        margin=dict(l=40, r=20, t=30, b=30),
        legend=dict(bgcolor="rgba(0,0,0,0)"),
        xaxis=dict(gridcolor="#2a2e39"),
        yaxis=dict(gridcolor="#2a2e39")
    )
    # Remove duplicate SMA 20 trace (index 1 was placeholder)
    fig_sma.data = (fig_sma.data[0],) + fig_sma.data[2:]
    custom_figs.append(fig_sma.to_dict())
    # Figure 2: RSI 14
    rsi_full = df_features['rsi_14']
    fig_rsi = go.Figure()
    fig_rsi.add_trace(go.Scatter(
        x=full_dates, y=_series_to_list(rsi_full),
        mode='lines', name='RSI 14',
        line=dict(color='#9c27b0', width=1.5)
    ))
    fig_rsi.add_hline(y=70, line_dash='dash', line_color='#ef5350', annotation_text='Overbought 70')
    fig_rsi.add_hline(y=30, line_dash='dash', line_color='#26a69a', annotation_text='Oversold 30')
    fig_rsi.update_layout(
        title='RSI 14',
        paper_bgcolor="#131722", plot_bgcolor="#131722",
        font_color="#d1d4dc",
        margin=dict(l=40, r=20, t=30, b=30),
        legend=dict(bgcolor="rgba(0,0,0,0)"),
        xaxis=dict(gridcolor="#2a2e39"),
        yaxis=dict(gridcolor="#2a2e39", range=[0, 100])
    )
    custom_figs.append(fig_rsi.to_dict())
    # ── Register model ────────────────────────────────────────
    # NOTE(review): register_model is not defined or imported in this file;
    # presumably injected into globals by the hosting platform — otherwise
    # this line raises NameError. Confirm.
    if register_model is not None:
        register_model(model)
    # ── Build return dict ─────────────────────────────────────
    return {
        "ohlc": {
            "dates": full_dates,
            "open": ohlc_open,
            "high": _series_to_list(ohlc_high_full),
            "low": _series_to_list(ohlc_low_full),
            "close": ohlc_close,
        },
        "signals": {
            "dates": full_dates,
            "values": signal_full_vals,
        },
        "bb": {
            "upper": _series_to_list(bb_upper),
            "mid": _series_to_list(bb_mid),
            "lower": _series_to_list(bb_lower),
        },
        "ma": {
            "ma50": _series_to_list(ma50),
            "ma100": _series_to_list(ma100),
            "ma200": _series_to_list(ma200),
        },
        "equity": {
            "dates": full_dates,
            "strategy": eq_full_strategy,
            "bh": eq_full_bh,
        },
        "feature_importance": {
            "names": fi_names,
            "values": fi_values,
        },
        "conf_matrix": conf_matrix,
        "conf_hist": {
            "p_pos": p_pos_list,
            "p_neg": p_neg_list,
        },
        "rolling_acc": {
            "dates": [str(d) for d in signal_test.index],
            "values": rolling_acc_vals,
        },
        "drawdown": {
            "dates": full_dates,
            "values": drawdown_vals,
        },
        "ret_dist": ret_dist,
        "ret_dist_long": ret_dist_long,
        "ret_dist_short": ret_dist_short,
        "metrics": {
            "total_ret": float(total_ret),
            "bh_ret": float(bh_ret),
            "sharpe_strat": float(sharpe_strat),
            "sharpe_bh": float(sharpe_bh),
            "mdd": float(mdd),
            "n_trades": int(n_trades),
        },
        "split_dt": split_dt,
        "split_idx": int(split_idx),
        "n_train": int(len(X_train)),
        "n_test": int(len(X_test)),
        "feature_cols": feature_cols,
        "custom_figs": custom_figs,
    }
# ╔══════════════════════════════════════════════════════════════╗
# ║ STRATEGY REQUEST LOG ║
# ╚══════════════════════════════════════════════════════════════╝
# Generated : 2026-04-02 14:41:40
# Model : Random Forest
# Feature Eng. : SMA (20, 50, 200), RSI 14
# Signal / Entry : —
# Optimization : —
# Risk Mgmt : —
# Risk Filter : —
# ══════════════════════════════════════════════════════════════
# ============================================================
# SECTION 0 — IMPORTS & CONSTANTS
# ============================================================
import numpy as np
import pandas as pd
from sklearn.ensemble import RandomForestClassifier
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import confusion_matrix
import plotly.graph_objects as go
from model_wrapper import ModelWrapper
DATA_PATH = "/Users/malco/Desktop/QuantifyMe/data/EURUSD_ticks.csv"  # tick CSV with 'Time', 'Bid', 'Ask' columns
START_DATE = "2026-03-03"  # inclusive lower bound for resampled bars
END_DATE = "2026-04-02"  # inclusive upper bound for resampled bars
CAPITAL = 10_000.0  # backtest starting equity
COST_RT = 2e-5 # round-trip transaction cost per trade
# ============================================================
# SECTION 2 — FEATURE ENGINEERING
# ============================================================
def feature_engineering(df, close, open_, high, low):
    """Add SMA, RSI, return, volatility and range features; return a copy.

    Parameters
    ----------
    df : pandas.DataFrame
        Bar frame to enrich. A copy is modified and returned; the
        caller's frame is left untouched.
    close, open_, high, low : pandas.Series
        OHLC series aligned with ``df``'s index.

    Returns
    -------
    pandas.DataFrame
        Copy of ``df`` with the feature columns appended.
    """
    # Copy to avoid mutating the caller's DataFrame in place.
    df = df.copy()
    # SMA 20, 50, 200
    df["sma_20"] = close.rolling(20).mean()
    df["sma_50"] = close.rolling(50).mean()
    df["sma_200"] = close.rolling(200).mean()
    # Price relative to SMAs
    df["close_sma20_ratio"] = close / df["sma_20"] - 1.0
    df["close_sma50_ratio"] = close / df["sma_50"] - 1.0
    df["close_sma200_ratio"] = close / df["sma_200"] - 1.0
    # SMA crossover signals
    df["sma20_50_diff"] = df["sma_20"] - df["sma_50"]
    df["sma50_200_diff"] = df["sma_50"] - df["sma_200"]
    # RSI 14 — exponential smoothing (ewm com=13); a zero average loss
    # is mapped to NaN instead of dividing by zero.
    delta = close.diff()
    gain = delta.clip(lower=0)
    loss = (-delta).clip(lower=0)
    avg_gain = gain.ewm(com=13, min_periods=14).mean()
    avg_loss = loss.ewm(com=13, min_periods=14).mean()
    rs = avg_gain / avg_loss.replace(0, np.nan)
    df["rsi_14"] = 100.0 - (100.0 / (1.0 + rs))
    # Returns over 1 / 4 / 8 bars
    df["ret_1"] = close.pct_change(1)
    df["ret_4"] = close.pct_change(4)
    df["ret_8"] = close.pct_change(8)
    # Volatility: 20-bar std of 1-bar returns
    df["vol_20"] = df["ret_1"].rolling(20).std()
    # High-Low range normalised
    df["hl_range"] = (high - low) / close
    return df
# ============================================================
# SECTION 3 — ML MODEL
# ============================================================
def build_model(X_train, y_train):
    """Fit the Random Forest and wrap it for label decoding.

    Parameters
    ----------
    X_train : pandas.DataFrame
        Feature matrix.
    y_train : array-like
        Targets already transformed into encoder space by the caller's
        LabelEncoder (see train_and_backtest).

    Returns
    -------
    ModelWrapper
        Fitted classifier carrying original_classes [-1, 0, 1].
    """
    cfg = optimization_config()
    clf = RandomForestClassifier(
        n_estimators = cfg["n_estimators"],
        max_depth = cfg["max_depth"],
        min_samples_leaf = cfg["min_samples_leaf"],
        class_weight = cfg["class_weight"],
        random_state = 42,
        n_jobs = -1,
    )
    clf.fit(X_train, y_train)
    # NOTE(review): if y_train lacks one of the three classes,
    # clf.classes_ will be shorter than original_classes; ModelWrapper is
    # assumed to reconcile the mapping — verify against its source.
    le = LabelEncoder()
    le.fit([-1, 0, 1])
    wrapper = ModelWrapper(clf, original_classes=le.classes_, n_features=X_train.shape[1])
    return wrapper
# ============================================================
# SECTION 4 — SIGNAL / ENTRY LOGIC
# ============================================================
def generate_signals(model, X, thresh):
    """Turn predicted class probabilities into -1/0/+1 entry signals.

    A side is taken when its class probability reaches ``thresh`` (>=);
    if both sides qualify, the larger probability wins (ties go long).
    Returns the signal Series plus the raw p(+1) and p(-1) arrays.
    """
    proba = model.predict_proba(X)  # shape (n, n_classes)
    class_list = list(model.classes_)
    n = len(X)
    # Probability columns for each side; zeros if the class is absent.
    p_pos = proba[:, class_list.index(1)] if 1 in class_list else np.zeros(n)
    p_neg = proba[:, class_list.index(-1)] if -1 in class_list else np.zeros(n)
    go_long = p_pos >= thresh
    go_short = p_neg >= thresh
    # Contested bars resolve to the more confident side (ties go long).
    values = np.where(go_long & go_short,
                      np.where(p_pos >= p_neg, 1.0, -1.0),
                      np.where(go_long, 1.0, np.where(go_short, -1.0, 0.0)))
    signal = pd.Series(values, index=X.index, dtype=float)
    return signal, p_pos.astype(float), p_neg.astype(float)
# ============================================================
# SECTION 5 — OPTIMISATION TARGET
# ============================================================
def optimization_config():
    """Hyper-parameter bundle targeting Sharpe-ratio maximisation."""
    notes = ("Balanced class weights to avoid majority-class bias; "
             "conservative depth to reduce overfitting; "
             "more estimators for stability.")
    return {
        "objective": "Maximize Sharpe ratio",
        "notes": notes,
        "n_estimators": 300,
        "max_depth": 8,
        "min_samples_leaf": 20,
        "class_weight": "balanced",
    }
# ============================================================
# SECTION 6 — RISK MANAGEMENT
# ============================================================
def apply_risk(signal, close, pos_size=1.0):
    """Return the signal scaled by position size.

    ``close`` is part of the interface for future risk filters and is
    not used in this revision.
    """
    sized = signal * pos_size
    return sized
# ============================================================
# SECTION 7 — BACKTEST ENGINE
# ============================================================
def train_and_backtest():
    """Run the full pipeline and return a serializable results payload.

    Steps: load tick CSV → 15-min OHLC bars → feature engineering →
    4-bars-ahead direction target → 70/30 time-ordered split → Random
    Forest → threshold signals on the test window → trade-level backtest
    with round-trip costs → metrics, figures and chart series.

    Returns
    -------
    dict
        OHLC/signal/band/equity series, confusion matrix, probability
        histograms, rolling accuracy, drawdown, trade-return
        distributions, summary metrics and custom Plotly figures.
    """
    # ------------------------------------------------------------------
    # Load & prepare data
    # ------------------------------------------------------------------
    raw = pd.read_csv(DATA_PATH, parse_dates=["Time"])
    raw = raw.sort_values("Time").set_index("Time")
    # Mid price from bid/ask; the CSV must provide 'Bid' and 'Ask' columns.
    mid = (raw["Bid"] + raw["Ask"]) / 2.0
    ohlc = mid.resample("15min").ohlc()
    ohlc.columns = ["open", "high", "low", "close"]
    ohlc = ohlc.dropna()
    if START_DATE:
        ohlc = ohlc[ohlc.index >= START_DATE]
    if END_DATE:
        ohlc = ohlc[ohlc.index <= END_DATE]
    close = ohlc["close"]
    open_ = ohlc["open"]
    high = ohlc["high"]
    low = ohlc["low"]
    df = ohlc.copy()
    # ------------------------------------------------------------------
    # Feature engineering
    # ------------------------------------------------------------------
    df = feature_engineering(df, close, open_, high, low)
    # ------------------------------------------------------------------
    # Target
    # ------------------------------------------------------------------
    # Sign of the close 4 bars ahead: -1 / 0 / +1.
    target = np.sign(close.shift(-4) - close)
    mask = target.notna()
    df = df[mask]
    target = target[mask]
    close = df["close"]
    open_ = df["open"]
    high = df["high"]
    low = df["low"]
    # ------------------------------------------------------------------
    # Feature columns
    # ------------------------------------------------------------------
    feature_cols = [
        "sma_20", "sma_50", "sma_200",
        "close_sma20_ratio", "close_sma50_ratio", "close_sma200_ratio",
        "sma20_50_diff", "sma50_200_diff",
        "rsi_14",
        "ret_1", "ret_4", "ret_8",
        "vol_20", "hl_range",
    ]
    # NOTE(review): bfill copies later values into the SMA-200 warm-up
    # rows — a mild lookahead in the training features; confirm intended.
    df_feat = df[feature_cols].copy().bfill().ffill()
    feat_mask = df_feat.notna().all(axis=1)
    df_feat = df_feat[feat_mask]
    target = target[feat_mask]
    close = close[feat_mask]
    open_ = open_[feat_mask]
    high = high[feat_mask]
    low = low[feat_mask]
    df = df[feat_mask]
    # ------------------------------------------------------------------
    # Train / test split (70/30 walk-forward)
    # ------------------------------------------------------------------
    n_total = len(df_feat)
    split_idx = int(n_total * 0.70)
    X_train = df_feat.iloc[:split_idx]
    X_test = df_feat.iloc[split_idx:]
    y_train = target.iloc[:split_idx]
    y_test = target.iloc[split_idx:]
    close_test = close.iloc[split_idx:]
    split_dt = str(df_feat.index[split_idx])
    n_train = split_idx
    n_test = n_total - split_idx
    # ------------------------------------------------------------------
    # Label encoding
    # ------------------------------------------------------------------
    enc = LabelEncoder()
    enc.fit([-1, 0, 1])
    y_train_enc = enc.transform(y_train)
    y_test_enc = enc.transform(y_test)
    # ------------------------------------------------------------------
    # Build model
    # ------------------------------------------------------------------
    model = build_model(X_train, y_train_enc)
    # ------------------------------------------------------------------
    # Signals
    # ------------------------------------------------------------------
    THRESH = 0.55
    signal_test, p_pos, p_neg = generate_signals(model, X_test, THRESH)
    signal_test = apply_risk(signal_test, close_test)
    # ------------------------------------------------------------------
    # Confusion matrix
    # ------------------------------------------------------------------
    pred_test = model.predict(X_test)  # already decoded: -1, 0, 1
    labels = [-1, 0, 1]
    cm = confusion_matrix(y_test, pred_test, labels=labels).tolist()
    # ------------------------------------------------------------------
    # Equity curve & trade metrics
    # ------------------------------------------------------------------
    # NOTE(review): strategy equity only steps when a trade is closed
    # (cap_strat is constant between direction flips), so intratrade
    # drawdown is understated relative to mark-to-market.
    close_arr = close_test.values
    signal_arr = signal_test.values
    n_bars = len(close_arr)
    equity_strategy = [CAPITAL]
    equity_bh = [CAPITAL]
    ret_all = []
    ret_long = []
    ret_short = []
    last_dir = None
    entry_price = None
    entry_capital = CAPITAL
    cap_strat = CAPITAL
    cap_bh = CAPITAL
    bh_entry = close_arr[0] if n_bars > 0 else 1.0
    for i in range(1, n_bars):
        bar_ret_bh = (close_arr[i] - close_arr[i - 1]) / close_arr[i - 1]
        cap_bh = cap_bh * (1.0 + bar_ret_bh)
        equity_bh.append(cap_bh)
        sig = signal_arr[i]
        # Detect direction flip → close previous trade, open new
        if sig != 0 and sig != last_dir:
            # Close previous trade
            if last_dir is not None and entry_price is not None:
                trade_ret = last_dir * (close_arr[i] - entry_price) / entry_price - COST_RT
                cap_strat = entry_capital * (1.0 + trade_ret)
                ret_all.append(trade_ret)
                if last_dir > 0:
                    ret_long.append(trade_ret)
                else:
                    ret_short.append(trade_ret)
            # Open new trade
            entry_price = close_arr[i]
            entry_capital = cap_strat
            last_dir = sig
        equity_strategy.append(cap_strat)
    # Close last open trade at final bar
    if last_dir is not None and entry_price is not None and n_bars > 1:
        final_ret = last_dir * (close_arr[-1] - entry_price) / entry_price - COST_RT
        cap_strat = entry_capital * (1.0 + final_ret)
        ret_all.append(final_ret)
        if last_dir > 0:
            ret_long.append(final_ret)
        else:
            ret_short.append(final_ret)
        # update last equity point
        equity_strategy[-1] = cap_strat
    eq_strat_arr = np.array(equity_strategy)
    eq_bh_arr = np.array(equity_bh)
    total_ret = (eq_strat_arr[-1] - CAPITAL) / CAPITAL if n_bars > 0 else 0.0
    bh_ret = (eq_bh_arr[-1] - CAPITAL) / CAPITAL if n_bars > 0 else 0.0
    # Sharpe (annualised, 15-min bars → 26240 bars/year approx)
    BARS_PER_YEAR = 26240.0
    ret_series = pd.Series(np.diff(eq_strat_arr) / eq_strat_arr[:-1]) if len(eq_strat_arr) > 1 else pd.Series([], dtype=float)
    if len(ret_series) > 0 and ret_series.std() > 0:
        sharpe_strat = float(ret_series.mean() / ret_series.std() * np.sqrt(BARS_PER_YEAR))
    else:
        sharpe_strat = 0.0
    bh_ret_series = pd.Series(np.diff(eq_bh_arr) / eq_bh_arr[:-1]) if len(eq_bh_arr) > 1 else pd.Series([], dtype=float)
    if len(bh_ret_series) > 0 and bh_ret_series.std() > 0:
        sharpe_bh = float(bh_ret_series.mean() / bh_ret_series.std() * np.sqrt(BARS_PER_YEAR))
    else:
        sharpe_bh = 0.0
    # Max drawdown
    running_max = np.maximum.accumulate(eq_strat_arr)
    dd_arr = (eq_strat_arr - running_max) / running_max
    mdd = float(dd_arr.min()) if len(dd_arr) > 0 else 0.0
    n_trades = len(ret_all)
    # ------------------------------------------------------------------
    # Rolling accuracy (30-bar window, active signals only)
    # ------------------------------------------------------------------
    pred_series = pd.Series(pred_test, index=X_test.index)
    target_test = y_test
    correct = (pred_series == target_test).astype(float)
    active_mask = pred_series != 0
    rolling_acc_vals = correct.where(active_mask).rolling(30, min_periods=1).mean()
    rolling_acc_vals = rolling_acc_vals.where(active_mask)
    roll_dates = [str(d) for d in rolling_acc_vals.index]
    roll_vals = [None if np.isnan(v) else float(v) for v in rolling_acc_vals.values]
    # ------------------------------------------------------------------
    # Drawdown series
    # ------------------------------------------------------------------
    dd_dates = [str(d) for d in close_test.index[:len(dd_arr)]]
    dd_values = [float(v) if np.isfinite(v) else None for v in dd_arr]
    # ------------------------------------------------------------------
    # Bollinger Bands & MAs (full test period for display)
    # ------------------------------------------------------------------
    close_full = close
    bb_mid = close_full.rolling(20).mean()
    bb_std = close_full.rolling(20).std()
    bb_upper = bb_mid + 2.0 * bb_std
    bb_lower = bb_mid - 2.0 * bb_std
    ma50 = close_full.rolling(50).mean()
    ma100 = close_full.rolling(100).mean()
    ma200 = close_full.rolling(200).mean()
    # Slice to test period for display
    test_index = X_test.index
    def _slice(s):
        # Reindex to the test window; fill edge NaNs for display only.
        return s.reindex(test_index).bfill().ffill()
    bb_upper_t = _slice(bb_upper)
    bb_mid_t = _slice(bb_mid)
    bb_lower_t = _slice(bb_lower)
    ma50_t = _slice(ma50)
    ma100_t = _slice(ma100)
    ma200_t = _slice(ma200)
    # ------------------------------------------------------------------
    # Feature importance (top 15 ascending)
    # ------------------------------------------------------------------
    fi_vals = model.feature_importances_
    fi_pairs = sorted(zip(feature_cols, fi_vals.tolist()), key=lambda x: x[1])[-15:]
    fi_names = [p[0] for p in fi_pairs]
    fi_values = [float(p[1]) for p in fi_pairs]
    # ------------------------------------------------------------------
    # OHLC for return dict (test period)
    # ------------------------------------------------------------------
    ohlc_test = df.loc[test_index]
    def _clean_list(arr):
        # JSON-safe conversion: None/NaN/inf become None, else float.
        return [None if (v is None or (isinstance(v, float) and not np.isfinite(v))) else float(v) for v in arr]
    # ------------------------------------------------------------------
    # SECTION 8 — CUSTOM FIGURES
    # ------------------------------------------------------------------
    custom_figs = []
    # Figure 1: SMA overlay (20, 50, 200)
    fig_sma = go.Figure()
    fig_sma.add_trace(go.Scatter(
        x=list(test_index.astype(str)),
        y=_clean_list(close_test.values),
        name="Close", line=dict(color="#d1d4dc", width=1)
    ))
    fig_sma.add_trace(go.Scatter(
        x=list(test_index.astype(str)),
        y=_clean_list(ma50_t.values),
        name="SMA 50", line=dict(color="#2196F3", width=1.2)
    ))
    # SMA 20 from feature columns
    sma20_t = _slice(df["sma_20"])
    fig_sma.add_trace(go.Scatter(
        x=list(test_index.astype(str)),
        y=_clean_list(sma20_t.values),
        name="SMA 20", line=dict(color="#FF9800", width=1.2)
    ))
    fig_sma.add_trace(go.Scatter(
        x=list(test_index.astype(str)),
        y=_clean_list(ma200_t.values),
        name="SMA 200", line=dict(color="#E040FB", width=1.2)
    ))
    fig_sma.update_layout(
        title="SMA Overlay (Test Period)",
        paper_bgcolor="#131722", plot_bgcolor="#131722",
        font_color="#d1d4dc",
        margin=dict(l=40, r=20, t=30, b=30),
        legend=dict(bgcolor="rgba(0,0,0,0)"),
        xaxis_title="Date", yaxis_title="Price",
    )
    custom_figs.append(fig_sma.to_dict())
    # Figure 2: RSI 14
    rsi_t = _slice(df["rsi_14"])
    fig_rsi = go.Figure()
    fig_rsi.add_trace(go.Scatter(
        x=list(test_index.astype(str)),
        y=_clean_list(rsi_t.values),
        name="RSI 14", line=dict(color="#26C6DA", width=1.2)
    ))
    fig_rsi.add_hline(y=70, line_dash="dash", line_color="#FF5252", annotation_text="Overbought 70")
    fig_rsi.add_hline(y=30, line_dash="dash", line_color="#69F0AE", annotation_text="Oversold 30")
    fig_rsi.update_layout(
        title="RSI 14 (Test Period)",
        paper_bgcolor="#131722", plot_bgcolor="#131722",
        font_color="#d1d4dc",
        margin=dict(l=40, r=20, t=30, b=30),
        legend=dict(bgcolor="rgba(0,0,0,0)"),
        xaxis_title="Date", yaxis_title="RSI",
        yaxis=dict(range=[0, 100]),
    )
    custom_figs.append(fig_rsi.to_dict())
    # ------------------------------------------------------------------
    # Register model
    # ------------------------------------------------------------------
    # NOTE(review): register_model is not defined or imported in this file;
    # presumably injected into globals by the hosting platform — otherwise
    # this line raises NameError. Confirm.
    if register_model is not None:
        register_model(model)
    # ------------------------------------------------------------------
    # Build return dict
    # ------------------------------------------------------------------
    equity_dates = [str(d) for d in close_test.index[:len(equity_strategy)]]
    result = {
        "ohlc": {
            "dates": [str(d) for d in ohlc_test.index],
            "open": _clean_list(ohlc_test["open"].values),
            "high": _clean_list(ohlc_test["high"].values),
            "low": _clean_list(ohlc_test["low"].values),
            "close": _clean_list(ohlc_test["close"].values),
        },
        "signals": {
            "dates": [str(d) for d in signal_test.index],
            "values": _clean_list(signal_test.values),
        },
        "bb": {
            "upper": _clean_list(bb_upper_t.values),
            "mid": _clean_list(bb_mid_t.values),
            "lower": _clean_list(bb_lower_t.values),
        },
        "ma": {
            "ma50": _clean_list(ma50_t.values),
            "ma100": _clean_list(ma100_t.values),
            "ma200": _clean_list(ma200_t.values),
        },
        "equity": {
            "dates": equity_dates,
            "strategy": _clean_list(eq_strat_arr.tolist()),
            "bh": _clean_list(eq_bh_arr.tolist()),
        },
        "feature_importance": {
            "names": fi_names,
            "values": fi_values,
        },
        "conf_matrix": cm,
        "conf_hist": {
            "p_pos": p_pos.tolist(),
            "p_neg": p_neg.tolist(),
        },
        "rolling_acc": {
            "dates": roll_dates,
            "values": roll_vals,
        },
        "drawdown": {
            "dates": dd_dates,
            "values": dd_values,
        },
        "ret_dist": [float(r) for r in ret_all],
        "ret_dist_long": [float(r) for r in ret_long],
        "ret_dist_short": [float(r) for r in ret_short],
        "metrics": {
            "total_ret": float(total_ret) if np.isfinite(total_ret) else 0.0,
            "bh_ret": float(bh_ret) if np.isfinite(bh_ret) else 0.0,
            "sharpe_strat": float(sharpe_strat) if np.isfinite(sharpe_strat) else 0.0,
            "sharpe_bh": float(sharpe_bh) if np.isfinite(sharpe_bh) else 0.0,
            "mdd": float(mdd) if np.isfinite(mdd) else 0.0,
            "n_trades": int(n_trades),
        },
        "split_dt": split_dt,
        "split_idx": int(split_idx),
        "n_train": int(n_train),
        "n_test": int(n_test),
        "feature_cols": feature_cols,
        "custom_figs": custom_figs,
    }
    return result
# ╔══════════════════════════════════════════════════════════════╗
# ║ STRATEGY REQUEST LOG ║
# ╚══════════════════════════════════════════════════════════════╝
# Generated : 2026-04-02 14:13:23
# Model : Random Forest
# Feature Eng. : SMA (20, 50, 200), RSI 14
# Signal / Entry : —
# Optimization : —
# Risk Mgmt : —
# Risk Filter : —
# ══════════════════════════════════════════════════════════════
# ============================================================
# SECTION 0 — IMPORTS & CONSTANTS
# ============================================================
import numpy as np
import pandas as pd
from sklearn.ensemble import RandomForestClassifier
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import confusion_matrix
import plotly.graph_objects as go
from model_wrapper import ModelWrapper
# Tick data source and backtest window (string comparisons against the DatetimeIndex below).
DATA_PATH = "/Users/malco/Desktop/QuantifyMe/data/EURUSD_ticks.csv"
START_DATE = "2026-03-03"  # first bar kept (index >= START_DATE)
END_DATE = "2026-03-28"  # last bar kept (index <= END_DATE)
STARTING_CAPITAL = 10_000  # initial equity for both strategy and buy & hold curves
TRADE_COST = 2e-5  # round-trip transaction cost per trade, in return units
# ============================================================
# SECTION 2 — FEATURE ENGINEERING
# ============================================================
def feature_engineering(df, close, open_, high, low):
    """Return a copy of *df* with trend, momentum and range features attached.

    Parameters
    ----------
    df : pd.DataFrame
        Bar frame the feature columns are added to.
    close, open_, high, low : pd.Series
        OHLC series aligned with ``df``'s index.

    Returns
    -------
    pd.DataFrame
        New frame with sma_*, close_sma*_ratio, rsi_14, returns_* and
        range columns added. Only past data is used (no lookahead).
    """
    # Work on a copy so the caller's frame is not mutated
    # (consistent with the other feature_engineering variants in this file).
    df = df.copy()
    # SMA 20, 50, 200
    df['sma_20'] = close.rolling(20).mean()
    df['sma_50'] = close.rolling(50).mean()
    df['sma_200'] = close.rolling(200).mean()
    # Price relative to SMAs, expressed as dimensionless ratios
    df['close_sma20_ratio'] = close / df['sma_20'] - 1.0
    df['close_sma50_ratio'] = close / df['sma_50'] - 1.0
    df['close_sma200_ratio'] = close / df['sma_200'] - 1.0
    # RSI 14 — Wilder-style EWM smoothing (com=13 is alpha = 1/14)
    delta = close.diff()
    gain = delta.clip(lower=0)
    loss = (-delta).clip(lower=0)
    avg_gain = gain.ewm(com=13, min_periods=14).mean()
    avg_loss = loss.ewm(com=13, min_periods=14).mean()
    # replace(0, nan) yields NaN (not inf) RSI when there are no losses in the window
    rs = avg_gain / avg_loss.replace(0, np.nan)
    df['rsi_14'] = 100 - (100 / (1 + rs))
    # Additional derived features (no lookahead)
    df['returns_1'] = close.pct_change(1)
    df['returns_4'] = close.pct_change(4)
    df['hl_range'] = (high - low) / close
    df['oc_range'] = (close - open_) / close
    return df
# ============================================================
# SECTION 3 — ML MODEL
# ============================================================
def build_model(X_train, y_train):
    """Fit a RandomForest using optimization_config() hyper-parameters.

    Returns a ModelWrapper exposing the original class labels [-1, 0, 1]
    so downstream signal logic can look up probability columns by label.
    """
    params = optimization_config()
    forest = RandomForestClassifier(
        n_estimators=params['n_estimators'],
        max_depth=params['max_depth'],
        min_samples_leaf=params['min_samples_leaf'],
        class_weight=params['class_weight'],
        random_state=42,
        n_jobs=-1,
    )
    forest.fit(X_train, y_train)
    label_enc = LabelEncoder()
    label_enc.fit([-1, 0, 1])
    return ModelWrapper(forest, original_classes=label_enc.classes_, n_features=X_train.shape[1])
# ============================================================
# SECTION 4 — SIGNAL / ENTRY LOGIC
# ============================================================
def generate_signals(model, X, thresh):
    """Turn class probabilities into {-1, 0, +1} signals.

    A bar goes long when P(+1) >= thresh, short when P(-1) >= thresh;
    when both clear the threshold the larger probability wins (ties go long).
    Returns (signal Series aligned to X.index, p_pos array, p_neg array).
    """
    proba = model.predict_proba(X)  # shape (n, n_classes)
    class_list = list(model.classes_)
    n_rows = len(X)

    def _col(label):
        # Probability column for *label*, or zeros if the model never saw it.
        if label in class_list:
            return proba[:, class_list.index(label)]
        return np.zeros(n_rows)

    p_pos = _col(1)
    p_neg = _col(-1)
    vals = np.zeros(n_rows)
    long_hit = p_pos >= thresh
    short_hit = p_neg >= thresh
    vals[long_hit] = 1.0
    vals[short_hit] = -1.0
    # Where both sides cross the threshold, keep the more confident one.
    overlap = long_hit & short_hit
    vals[overlap] = np.where(p_pos[overlap] >= p_neg[overlap], 1.0, -1.0)
    return pd.Series(vals, index=X.index, dtype=float), p_pos, p_neg
# ============================================================
# SECTION 5 — OPTIMISATION TARGET
# ============================================================
def optimization_config():
    """Hyper-parameter bundle read by build_model().

    Many trees with moderate depth and large leaves damp variance,
    which serves the Sharpe-ratio objective.
    """
    return dict(
        objective="Maximize Sharpe ratio",
        notes="Balanced class weights, moderate depth, many estimators for stable Sharpe",
        n_estimators=300,
        max_depth=8,
        min_samples_leaf=20,
        class_weight="balanced",
        learning_rate=None,
    )
# ============================================================
# SECTION 6 — RISK MANAGEMENT
# ============================================================
def apply_risk(signal, close, pos_size=1.0):
    """Scale the directional signal by the position size.

    *close* is accepted for interface symmetry with future stop-loss /
    volatility-sizing rules; it is not used by this simple scaler.
    """
    sized = pos_size * signal
    return sized
# ============================================================
# SECTION 7 — BACKTEST ENGINE
# ============================================================
def train_and_backtest():
    """Run the full pipeline and return the dashboard payload dict.

    Steps: load tick CSV → 15-minute mid-price OHLC bars → feature
    engineering → 4-bar-ahead direction target → 70/30 walk-forward split →
    RandomForest training → thresholded signals → equity/metrics/figures.

    Returns a dict of JSON-serializable series and metrics; registers the
    trained model with the host app when a `register_model` hook is injected.
    """
    # ------------------------------------------------------------------
    # 1. Load & Resample
    # ------------------------------------------------------------------
    raw = pd.read_csv(DATA_PATH, parse_dates=['Time'])
    raw.set_index('Time', inplace=True)
    raw['mid'] = (raw['Bid'] + raw['Ask']) / 2.0
    ohlc = raw['mid'].resample('15min').ohlc()
    ohlc.dropna(inplace=True)
    if START_DATE:
        ohlc = ohlc[ohlc.index >= START_DATE]
    if END_DATE:
        ohlc = ohlc[ohlc.index <= END_DATE]
    close = ohlc['close']
    open_ = ohlc['open']
    high = ohlc['high']
    low = ohlc['low']
    df = ohlc.copy()
    # ------------------------------------------------------------------
    # 2. Feature Engineering
    # ------------------------------------------------------------------
    df = feature_engineering(df, close, open_, high, low)
    # ------------------------------------------------------------------
    # 3. Target — direction 1 hour ahead (4 bars)
    # ------------------------------------------------------------------
    target = np.sign(close.shift(-4) - close)
    mask = target.notna()
    df = df[mask]
    target = target[mask]
    close = close[mask]
    open_ = open_[mask]
    high = high[mask]
    low = low[mask]
    feature_cols = [
        'sma_20', 'sma_50', 'sma_200',
        'close_sma20_ratio', 'close_sma50_ratio', 'close_sma200_ratio',
        'rsi_14',
        'returns_1', 'returns_4',
        'hl_range', 'oc_range'
    ]
    df_feat = df[feature_cols].copy()
    # NOTE(review): bfill() fills the warm-up NaNs (e.g. first 199 bars of
    # sma_200) with FUTURE values, leaking look-ahead information into the
    # earliest training rows — consider dropping the warm-up period instead.
    # Left as-is to preserve row counts. TODO confirm intent.
    df_feat = df_feat.bfill().ffill()
    df_feat.dropna(inplace=True)
    # Align target and price to clean feature index
    target = target.reindex(df_feat.index)
    close = close.reindex(df_feat.index)
    open_ = open_.reindex(df_feat.index)
    high = high.reindex(df_feat.index)
    low = low.reindex(df_feat.index)
    df = df.reindex(df_feat.index)
    # ------------------------------------------------------------------
    # 4. Train / Test Split (70/30, no shuffle)
    # ------------------------------------------------------------------
    n_total = len(df_feat)
    split_idx = int(n_total * 0.70)
    X_train = df_feat.iloc[:split_idx]
    X_test = df_feat.iloc[split_idx:]
    y_train = target.iloc[:split_idx]
    y_test = target.iloc[split_idx:]
    close_test = close.iloc[split_idx:]
    split_dt = str(df_feat.index[split_idx])
    n_train = len(X_train)
    n_test = len(X_test)
    # ------------------------------------------------------------------
    # 5. Encode labels
    # ------------------------------------------------------------------
    enc = LabelEncoder()
    enc.fit([-1, 0, 1])
    y_train_enc = enc.transform(y_train)
    # ------------------------------------------------------------------
    # 6. Train Model
    # ------------------------------------------------------------------
    model = build_model(X_train, y_train_enc)
    # ------------------------------------------------------------------
    # 7. Generate Signals
    # ------------------------------------------------------------------
    thresh = 0.55
    signal_test, p_pos, p_neg = generate_signals(model, X_test, thresh)
    signal_test = apply_risk(signal_test, close_test)
    # ------------------------------------------------------------------
    # 8. Confusion Matrix
    # ------------------------------------------------------------------
    pred_test = model.predict(X_test)
    y_test_arr = np.asarray(y_test)
    pred_arr = np.asarray(pred_test)
    # NOTE(review): if ModelWrapper.predict returns the *encoded* labels
    # (0/1/2) rather than the original [-1, 0, 1], this matrix is skewed —
    # verify model_wrapper maps predictions back to original classes.
    cm = confusion_matrix(y_test_arr, pred_arr, labels=[-1, 0, 1])
    conf_matrix = cm.tolist()
    # ------------------------------------------------------------------
    # 9. Equity Curve & Metrics
    # ------------------------------------------------------------------
    close_arr = close_test.values
    signal_arr = signal_test.values
    # NOTE(review): bar return here is (p_t - p_{t-1}) / p_t (current-close
    # denominator), and the bar-t signal is credited with the bar-t return
    # that ended as the signal formed — both approximations; confirm intended.
    bar_ret = np.diff(close_arr, prepend=close_arr[0]) / close_arr
    bar_ret[0] = 0.0
    strat_ret = signal_arr * bar_ret
    # Subtract round-trip cost on position changes
    pos_changes = np.diff(signal_arr, prepend=signal_arr[0])
    strat_ret -= np.abs(pos_changes) * (TRADE_COST / 2)  # half on entry, scaled
    equity_strat = STARTING_CAPITAL * np.cumprod(1 + strat_ret)
    bh_ret_arr = close_arr / close_arr[0]
    equity_bh = STARTING_CAPITAL * bh_ret_arr
    # Collect per-trade returns (entry→exit) for the distribution charts
    ret_dist = []
    ret_dist_long = []
    ret_dist_short = []
    current_pos = 0.0
    entry_price = None
    for i in range(len(signal_arr)):
        new_pos = signal_arr[i]
        if new_pos != current_pos:
            if current_pos != 0.0 and entry_price is not None:
                exit_price = close_arr[i]
                raw_r = (exit_price - entry_price) / entry_price * current_pos
                r = raw_r - TRADE_COST
                ret_dist.append(float(r))
                if current_pos > 0:
                    ret_dist_long.append(float(r))
                else:
                    ret_dist_short.append(float(r))
            if new_pos != 0.0:
                entry_price = close_arr[i]
            else:
                entry_price = None
            current_pos = new_pos
    # Close last open position at the final bar
    if current_pos != 0.0 and entry_price is not None:
        exit_price = close_arr[-1]
        raw_r = (exit_price - entry_price) / entry_price * current_pos
        r = raw_r - TRADE_COST
        ret_dist.append(float(r))
        if current_pos > 0:
            ret_dist_long.append(float(r))
        else:
            ret_dist_short.append(float(r))
    n_trades = len(ret_dist)
    # Total return
    total_ret = float((equity_strat[-1] - STARTING_CAPITAL) / STARTING_CAPITAL)
    bh_total = float((equity_bh[-1] - STARTING_CAPITAL) / STARTING_CAPITAL)
    # Sharpe (annualised, 15min bars → 26040 bars/year)
    # NOTE(review): 26040 implies ~271 trading days × 96 bars — confirm convention.
    BARS_PER_YEAR = 26040
    active_mask = signal_arr != 0
    if active_mask.sum() > 1 and strat_ret[active_mask].std() > 0:
        sharpe_strat = float(
            np.mean(strat_ret[active_mask]) /
            np.std(strat_ret[active_mask]) *
            np.sqrt(BARS_PER_YEAR)
        )
    else:
        sharpe_strat = 0.0
    if bar_ret.std() > 0:
        sharpe_bh = float(np.mean(bar_ret) / np.std(bar_ret) * np.sqrt(BARS_PER_YEAR))
    else:
        sharpe_bh = 0.0
    # Max Drawdown
    running_max = np.maximum.accumulate(equity_strat)
    drawdown_arr = (equity_strat - running_max) / running_max
    mdd = float(drawdown_arr.min())
    # ------------------------------------------------------------------
    # 10. Rolling Accuracy (30-bar, non-flat signals, test period)
    # ------------------------------------------------------------------
    correct = (pred_arr == y_test_arr).astype(float)
    active_sig = signal_arr != 0
    roll_vals = []
    dates_test = X_test.index
    for i in range(len(correct)):
        start_i = max(0, i - 29)
        window_active = active_sig[start_i:i+1]
        if window_active.sum() == 0:
            roll_vals.append(None)
        else:
            roll_vals.append(float(correct[start_i:i+1][window_active].mean()))
    # ------------------------------------------------------------------
    # 11. Bollinger Bands & MAs on full test set
    # ------------------------------------------------------------------
    close_full = close
    bb_mid_s = close_full.rolling(20).mean()
    bb_std_s = close_full.rolling(20).std()
    bb_upper_s = bb_mid_s + 2 * bb_std_s
    bb_lower_s = bb_mid_s - 2 * bb_std_s
    ma50_s = close_full.rolling(50).mean()
    ma100_s = close_full.rolling(100).mean()
    ma200_s = close_full.rolling(200).mean()
    def _clean(series):
        # Restrict an indicator series to the test index and JSON-sanitize it.
        vals = series.reindex(close_test.index).tolist()
        return [None if (v is None or (isinstance(v, float) and (np.isnan(v) or np.isinf(v)))) else float(v) for v in vals]
    # ------------------------------------------------------------------
    # 12. Feature Importance (top 15 ascending)
    # ------------------------------------------------------------------
    fi_vals = model.feature_importances_
    fi_pairs = sorted(zip(feature_cols, fi_vals), key=lambda x: x[1])[-15:]
    fi_names = [p[0] for p in fi_pairs]
    fi_values = [float(p[1]) for p in fi_pairs]
    # ------------------------------------------------------------------
    # 13. OHLC for test period
    # ------------------------------------------------------------------
    test_dates = [str(d) for d in X_test.index]
    ohlc_test = df.reindex(X_test.index)
    def _list_clean(arr):
        # JSON-sanitize an array-like: NaN/inf → None, numbers → float.
        return [None if (v is None or (isinstance(v, float) and (np.isnan(v) or np.isinf(v)))) else float(v)
                for v in (arr.tolist() if hasattr(arr, 'tolist') else list(arr))]
    # ------------------------------------------------------------------
    # SECTION 8 — CUSTOM FIGURES
    # ------------------------------------------------------------------
    custom_figs = []
    # --- Chart 1: SMA 20 / 50 / 200 overlay on close (test period) ---
    sma20_clean = _clean(df['sma_20'])
    sma50_clean = _clean(df['sma_50'])
    sma200_clean = _clean(df['sma_200'])
    close_clean = _clean(close)
    fig_sma = go.Figure()
    fig_sma.add_trace(go.Scatter(
        x=test_dates, y=close_clean,
        name='Close', line=dict(color='#d1d4dc', width=1)
    ))
    fig_sma.add_trace(go.Scatter(
        x=test_dates, y=sma20_clean,
        name='SMA 20', line=dict(color='#f7c948', width=1.2)
    ))
    fig_sma.add_trace(go.Scatter(
        x=test_dates, y=sma50_clean,
        name='SMA 50', line=dict(color='#26a69a', width=1.2)
    ))
    fig_sma.add_trace(go.Scatter(
        x=test_dates, y=sma200_clean,
        name='SMA 200', line=dict(color='#ef5350', width=1.2)
    ))
    fig_sma.update_layout(
        title='SMA 20 / 50 / 200 — Test Period',
        paper_bgcolor="#131722", plot_bgcolor="#131722",
        font_color="#d1d4dc",
        margin=dict(l=40, r=20, t=30, b=30),
        legend=dict(bgcolor="rgba(0,0,0,0)")
    )
    custom_figs.append(fig_sma.to_dict())
    # --- Chart 2: RSI 14 (test period) ---
    rsi_clean = _clean(df['rsi_14'])
    fig_rsi = go.Figure()
    fig_rsi.add_trace(go.Scatter(
        x=test_dates, y=rsi_clean,
        name='RSI 14', line=dict(color='#ab47bc', width=1.5)
    ))
    fig_rsi.add_hline(y=70, line_dash='dash', line_color='#ef5350', opacity=0.6)
    fig_rsi.add_hline(y=30, line_dash='dash', line_color='#26a69a', opacity=0.6)
    fig_rsi.add_hline(y=50, line_dash='dot', line_color='#d1d4dc', opacity=0.3)
    fig_rsi.update_layout(
        title='RSI 14 — Test Period',
        yaxis=dict(range=[0, 100]),
        paper_bgcolor="#131722", plot_bgcolor="#131722",
        font_color="#d1d4dc",
        margin=dict(l=40, r=20, t=30, b=30),
        legend=dict(bgcolor="rgba(0,0,0,0)")
    )
    custom_figs.append(fig_rsi.to_dict())
    # ------------------------------------------------------------------
    # 14. Assemble result dict
    # ------------------------------------------------------------------
    equity_strat_list = [float(v) for v in equity_strat.tolist()]
    equity_bh_list = [float(v) for v in equity_bh.tolist()]
    drawdown_list = [float(v) for v in drawdown_arr.tolist()]
    def _sanitize_list(lst):
        # Final JSON guard: map NaN/inf entries to None, keep the rest.
        out = []
        for v in lst:
            if v is None:
                out.append(None)
            elif isinstance(v, float) and (np.isnan(v) or np.isinf(v)):
                out.append(None)
            else:
                out.append(v)
        return out
    result = {
        "ohlc": {
            "dates": test_dates,
            "open": _list_clean(ohlc_test['open']),
            "high": _list_clean(ohlc_test['high']),
            "low": _list_clean(ohlc_test['low']),
            "close": _list_clean(ohlc_test['close']),
        },
        "signals": {
            "dates": test_dates,
            "values": [float(v) for v in signal_test.values.tolist()]
        },
        "bb": {
            "upper": _sanitize_list(_clean(bb_upper_s)),
            "mid": _sanitize_list(_clean(bb_mid_s)),
            "lower": _sanitize_list(_clean(bb_lower_s)),
        },
        "ma": {
            "ma50": _sanitize_list(_clean(ma50_s)),
            "ma100": _sanitize_list(_clean(ma100_s)),
            "ma200": _sanitize_list(_clean(ma200_s)),
        },
        "equity": {
            "dates": test_dates,
            "strategy": _sanitize_list(equity_strat_list),
            "bh": _sanitize_list(equity_bh_list),
        },
        "feature_importance": {
            "names": fi_names,
            "values": fi_values,
        },
        "conf_matrix": conf_matrix,
        "conf_hist": {
            "p_pos": p_pos.tolist(),
            "p_neg": p_neg.tolist(),
        },
        "rolling_acc": {
            "dates": test_dates,
            "values": _sanitize_list(roll_vals),
        },
        "drawdown": {
            "dates": test_dates,
            "values": _sanitize_list(drawdown_list),
        },
        "ret_dist": ret_dist,
        "ret_dist_long": ret_dist_long,
        "ret_dist_short": ret_dist_short,
        "metrics": {
            "total_ret": total_ret,
            "bh_ret": bh_total,
            "sharpe_strat": sharpe_strat,
            "sharpe_bh": sharpe_bh,
            "mdd": mdd,
            "n_trades": n_trades,
        },
        "split_dt": split_dt,
        "split_idx": int(split_idx),
        "n_train": int(n_train),
        "n_test": int(n_test),
        "feature_cols": feature_cols,
        "custom_figs": custom_figs,
    }
    # Guard with `in globals()` — `register_model` is injected by the host
    # app and is otherwise undefined here (a bare name test raised NameError).
    if 'register_model' in globals() and register_model is not None:
        register_model(model)
    return result
# ╔══════════════════════════════════════════════════════════════╗
# ║ STRATEGY REQUEST LOG ║
# ╚══════════════════════════════════════════════════════════════╝
# Generated : 2026-04-02 13:16:50
# Model : Random Forest
# Feature Eng. : SMA (20, 50, 200), RSI 14
# Signal / Entry : —
# Optimization : —
# Risk Mgmt : —
# Risk Filter : —
# ══════════════════════════════════════════════════════════════
# ============================================================
# SECTION 0 — IMPORTS & CONSTANTS
# ============================================================
import numpy as np
import pandas as pd
from sklearn.ensemble import RandomForestClassifier
from sklearn.preprocessing import LabelEncoder
from model_wrapper import ModelWrapper
import plotly.graph_objects as go
DATA_PATH = "/Users/malco/Desktop/QuantifyMe/data/EURUSD_ticks.csv"
# ============================================================
# SECTION 1 — MODEL WRAPPER
# ============================================================
# ModelWrapper is imported from model_wrapper module
# ============================================================
# SECTION 2 — FEATURE ENGINEERING
# ============================================================
def feature_engineering(df, close, open_, high, low) -> pd.DataFrame:
    """
    Attach trend (SMA 20/50/200) and momentum (RSI 14) columns to *df*.

    This RSI uses simple 14-bar rolling means of gains and losses
    (Cutler's variant) rather than Wilder's EWM smoothing.
    """
    # Simple moving averages over three horizons
    for win in (20, 50, 200):
        df[f'sma_{win}'] = close.rolling(window=win).mean()
    # RSI 14: ratio of average up-move to average down-move
    change = close.diff()
    avg_up = change.where(change > 0, 0).rolling(window=14).mean()
    avg_down = (-change.where(change < 0, 0)).rolling(window=14).mean()
    strength = avg_up / avg_down.replace(0, 1e-10)
    df['rsi_14'] = 100 - (100 / (1 + strength))
    return df
# ============================================================
# SECTION 3 — ML MODEL
# ============================================================
def build_model(X_train, y_train) -> ModelWrapper:
    """
    Train a Random Forest for the 3-class ([-1, 0, 1]) direction problem.

    Hyper-parameters come from optimization_config() (Sharpe-oriented);
    the fitted estimator is returned wrapped with the original class labels.
    """
    params = optimization_config()
    forest = RandomForestClassifier(
        n_estimators=params.get('n_estimators', 200),
        max_depth=params.get('max_depth', 10),
        min_samples_leaf=params.get('min_samples_leaf', 5),
        min_samples_split=params.get('min_samples_split', 10),
        class_weight=params.get('class_weight', 'balanced'),
        random_state=42,
        n_jobs=-1
    )
    forest.fit(X_train, y_train)
    # Wrap so callers see the original label set, not the encoded one
    return ModelWrapper(forest, original_classes=[-1, 0, 1], n_features=X_train.shape[1])
# ============================================================
# SECTION 4 — SIGNAL / ENTRY LOGIC
# ============================================================
def generate_signals(model, X, thresh) -> tuple:
    """
    Map class probabilities to trading signals.

    Long when P(+1) > thresh, short when P(-1) > thresh (short wins if
    both exceed it, since it is applied last), flat otherwise.
    Returns (signal Series in {-1.0, 0.0, 1.0}, p_pos array, p_neg array).
    """
    proba = model.predict_proba(X)
    labels = list(model.classes_)
    n = len(X)
    # Probability columns for the directional classes (zeros if absent)
    p_pos = proba[:, labels.index(1)] if 1 in labels else np.zeros(n)
    p_neg = proba[:, labels.index(-1)] if -1 in labels else np.zeros(n)
    # Build the signal vector; the short assignment overrides the long one
    # exactly as in the sequential-masking formulation.
    vals = np.where(p_pos > thresh, 1.0, 0.0)
    vals = np.where(p_neg > thresh, -1.0, vals)
    signal_series = pd.Series(vals, index=X.index, dtype=float)
    return signal_series, p_pos, p_neg
# ============================================================
# SECTION 5 — OPTIMISATION TARGET
# ============================================================
def optimization_config() -> dict:
    """
    Hyper-parameters serving the Sharpe-ratio objective.

    Conservative depth with a larger forest lowers variance and
    overfitting, favouring stable risk-adjusted returns.
    """
    return dict(
        objective='Maximize Sharpe ratio',
        notes='Random Forest with balanced class weights and conservative depth for stable risk-adjusted returns',
        n_estimators=200,
        max_depth=10,
        min_samples_leaf=5,
        min_samples_split=10,
        class_weight='balanced',
    )
# ============================================================
# SECTION 6 — RISK MANAGEMENT
# ============================================================
def apply_risk(signal, close, pos_size=1.0):
    """
    Position-sizing hook: multiply the signal by *pos_size*.

    *close* is part of the interface for future price-based risk rules
    (stops, volatility targeting) and is unused by this simple scaler.
    """
    return pos_size * signal
# ============================================================
# SECTION 7 — BACKTEST ENGINE
# ============================================================
def train_and_backtest() -> dict:
    """
    Full backtest pipeline: load data, engineer features, train model,
    generate signals, compute equity curve and metrics.

    Returns a dict of JSON-serializable series, metrics and Plotly figure
    dicts consumed by the dashboard. Registers the trained model with the
    host app when a `register_model` hook is present in globals().
    """
    # ─────────────────────────────────────────────────────────────
    # 1. Load and resample data
    # ─────────────────────────────────────────────────────────────
    df_raw = pd.read_csv(DATA_PATH)
    df_raw['Time'] = pd.to_datetime(df_raw['Time'])
    df_raw.set_index('Time', inplace=True)
    # Mid-price from bid/ask quotes
    df_raw['mid'] = (df_raw['Bid'] + df_raw['Ask']) / 2
    # Resample to 15-minute OHLC
    # NOTE(review): unlike the sibling variant above, empty 15-minute buckets
    # are NOT dropped here, so NaN bars can flow into features — confirm.
    ohlc_data = df_raw['mid'].resample('15min').ohlc()
    ohlc_data.columns = ['open', 'high', 'low', 'close']
    df = ohlc_data.copy()
    close = df['close']
    open_ = df['open']
    high = df['high']
    low = df['low']
    # ─────────────────────────────────────────────────────────────
    # 2. Create target: direction 1 hour ahead (4 × 15m bars)
    # ─────────────────────────────────────────────────────────────
    target = np.sign(close.shift(-4) - close)
    # Drop NaN from target BEFORE train/test split
    mask = target.notna()
    df = df[mask]
    target = target[mask]
    close = close[mask]
    open_ = open_[mask]
    high = high[mask]
    low = low[mask]
    # ─────────────────────────────────────────────────────────────
    # 3. Feature engineering
    # ─────────────────────────────────────────────────────────────
    df = feature_engineering(df, close, open_, high, low)
    # Define feature columns (in order used for training)
    feature_cols = ['sma_20', 'sma_50', 'sma_200', 'rsi_14']
    # Drop NaN from features (removes the rolling-window warm-up rows)
    mask_features = df[feature_cols].notna().all(axis=1)
    df = df[mask_features]
    target = target[mask_features]
    close = close[mask_features]
    open_ = open_[mask_features]
    high = high[mask_features]
    low = low[mask_features]
    # ─────────────────────────────────────────────────────────────
    # 4. Train/test split (70/30 walk-forward, no shuffle)
    # ─────────────────────────────────────────────────────────────
    n_total = len(df)
    split_idx = int(0.7 * n_total)
    split_dt = df.index[split_idx].strftime('%Y-%m-%d %H:%M:%S')
    X_train = df.iloc[:split_idx][feature_cols]
    X_test = df.iloc[split_idx:][feature_cols]
    y_train = target.iloc[:split_idx]
    y_test = target.iloc[split_idx:]
    close_train = close.iloc[:split_idx]
    close_test = close.iloc[split_idx:]
    open_test = open_.iloc[split_idx:]
    high_test = high.iloc[split_idx:]
    low_test = low.iloc[split_idx:]
    n_train = len(X_train)
    n_test = len(X_test)
    # ─────────────────────────────────────────────────────────────
    # 5. Encode labels
    # ─────────────────────────────────────────────────────────────
    le = LabelEncoder()
    le.fit([-1, 0, 1])  # Fit on ALL possible classes
    y_train_enc = le.transform(y_train)
    y_test_enc = le.transform(y_test)
    # ─────────────────────────────────────────────────────────────
    # 6. Build and train model
    # ─────────────────────────────────────────────────────────────
    model = build_model(X_train, y_train_enc)
    # ─────────────────────────────────────────────────────────────
    # 7. Generate signals on test set
    # ─────────────────────────────────────────────────────────────
    signal_test_raw, p_pos, p_neg = generate_signals(model, X_test, thresh=0.55)
    # Apply risk (position sizing)
    signal_test = apply_risk(signal_test_raw, close_test, pos_size=1.0)
    # ─────────────────────────────────────────────────────────────
    # 8. Compute equity curve and metrics
    # ─────────────────────────────────────────────────────────────
    # Strategy returns (per signal, scaled by next 1-hour price move)
    # NOTE(review): every bar books the FULL 4-bar-ahead move (divided by the
    # previous close), so consecutive signals compound overlapping returns —
    # this inflates the equity curve relative to a non-overlapping backtest.
    # TODO confirm this is intended.
    price_moves = close_test.shift(-4) - close_test
    strat_ret = signal_test * price_moves / close_test.shift(1)
    # Apply trading cost (2e-5 round-trip cost)
    signal_changes = signal_test.diff().abs()
    trading_cost = signal_changes * 2e-5
    strat_ret = strat_ret - trading_cost
    # Buy-and-hold returns
    bh_ret = price_moves / close_test.shift(1)
    # Equity curves (starting capital 10,000)
    capital = 10000
    strat_equity = [capital]
    bh_equity = [capital]
    # NaN bars (first bar, last 4 bars) hold equity flat instead of compounding
    for ret_s, ret_bh in zip(strat_ret, bh_ret):
        strat_equity.append(strat_equity[-1] * (1 + ret_s) if not np.isnan(ret_s) else strat_equity[-1])
        bh_equity.append(bh_equity[-1] * (1 + ret_bh) if not np.isnan(ret_bh) else bh_equity[-1])
    strat_equity = strat_equity[1:]
    bh_equity = bh_equity[1:]
    # Total returns (decimal ratio)
    total_ret = (strat_equity[-1] - capital) / capital
    bh_ret_total = (bh_equity[-1] - capital) / capital
    # Sharpe ratio (annualized, assuming 252*24*4 = 24192 15-min bars per year)
    ret_series = pd.Series(strat_ret.values, index=close_test.index)
    ret_series = ret_series.dropna()
    if len(ret_series) > 0 and ret_series.std() > 0:
        sharpe_strat = ret_series.mean() / ret_series.std() * np.sqrt(24192)
    else:
        sharpe_strat = 0.0
    ret_series_bh = pd.Series(bh_ret.values, index=close_test.index)
    ret_series_bh = ret_series_bh.dropna()
    if len(ret_series_bh) > 0 and ret_series_bh.std() > 0:
        sharpe_bh = ret_series_bh.mean() / ret_series_bh.std() * np.sqrt(24192)
    else:
        sharpe_bh = 0.0
    # Max Drawdown (on the NaN-as-zero compounded return path)
    cum_strat = np.cumprod(1 + ret_series.fillna(0))
    running_max = np.maximum.accumulate(cum_strat)
    drawdown = (cum_strat - running_max) / running_max
    mdd = float(np.min(drawdown)) if len(drawdown) > 0 else 0.0
    # Trade returns and statistics
    # NOTE(review): a trade is only closed when a DIFFERENT non-zero signal
    # appears; a final open position is never closed out, so n_trades can
    # undercount by one relative to the sibling variant above.
    trade_returns = []
    trade_returns_long = []
    trade_returns_short = []
    entry_price = None
    entry_signal = None
    n_trades = 0
    for i, (sig, ret) in enumerate(zip(signal_test, price_moves)):
        if sig != 0 and entry_signal != sig:
            if entry_signal is not None:
                # Close previous trade
                n_trades += 1
                trade_ret = (close_test.iloc[i] - entry_price) / entry_price * entry_signal
                trade_returns.append(trade_ret)
                if entry_signal == 1:
                    trade_returns_long.append(trade_ret)
                else:
                    trade_returns_short.append(trade_ret)
            entry_signal = sig
            entry_price = close_test.iloc[i]
    ret_dist = trade_returns if trade_returns else []
    ret_dist_long = trade_returns_long if trade_returns_long else []
    ret_dist_short = trade_returns_short if trade_returns_short else []
    # ─────────────────────────────────────────────────────────────
    # 9. Confusion matrix and predictions
    # ─────────────────────────────────────────────────────────────
    pred_test = model.predict(X_test)
    from sklearn.metrics import confusion_matrix
    # NOTE(review): if ModelWrapper.predict returns ENCODED labels (0/1/2)
    # instead of the original [-1, 0, 1], the labels=[-1, 0, 1] rows here
    # misattribute predictions — verify model_wrapper's mapping.
    cm = confusion_matrix(y_test, pred_test, labels=[-1, 0, 1])
    conf_matrix = cm.tolist()
    # ─────────────────────────────────────────────────────────────
    # 10. Rolling accuracy
    # ─────────────────────────────────────────────────────────────
    # 30-bar trailing window; accuracy counted only on non-flat signals,
    # comparing the signal direction to the realized target direction.
    rolling_acc_list = []
    rolling_dates = []
    for i in range(30, len(X_test)):
        window_signals = signal_test.iloc[i-30:i]
        window_actual = y_test.iloc[i-30:i]
        # Only count non-zero signals
        active_mask = window_signals != 0
        if active_mask.sum() > 0:
            acc = (window_signals[active_mask] == window_actual[active_mask]).sum() / active_mask.sum()
            rolling_acc_list.append(float(acc))
            rolling_dates.append(close_test.index[i].strftime('%Y-%m-%d %H:%M:%S'))
        else:
            rolling_acc_list.append(None)
            rolling_dates.append(close_test.index[i].strftime('%Y-%m-%d %H:%M:%S'))
    # ─────────────────────────────────────────────────────────────
    # 11. Bollinger Bands and Moving Averages
    # ─────────────────────────────────────────────────────────────
    # Computed on the TEST close only, so the first window lengths are NaN.
    bb_sma = close_test.rolling(20).mean()
    bb_std = close_test.rolling(20).std()
    bb_upper = bb_sma + 2 * bb_std
    bb_lower = bb_sma - 2 * bb_std
    ma_50 = close_test.rolling(50).mean()
    ma_100 = close_test.rolling(100).mean()
    ma_200 = close_test.rolling(200).mean()
    # ─────────────────────────────────────────────────────────────
    # 12. Drawdown series
    # ─────────────────────────────────────────────────────────────
    # NOTE(review): cum_returns is built from ret_series AFTER dropna, so
    # positional index i here may not line up with close_test.index[i] —
    # drawdown dates can be shifted by the number of dropped NaN bars.
    drawdown_series = []
    drawdown_dates = []
    cum_returns = [1.0]
    for ret in ret_series.fillna(0):
        cum_returns.append(cum_returns[-1] * (1 + ret))
    running_max = np.maximum.accumulate(cum_returns)
    for i, (cum, max_cum) in enumerate(zip(cum_returns[1:], running_max[1:])):
        dd = (cum - max_cum) / max_cum
        drawdown_series.append(float(dd))
        drawdown_dates.append(close_test.index[i].strftime('%Y-%m-%d %H:%M:%S'))
    # ─────────────────────────────────────────────────────────────
    # 13. Feature importance
    # ─────────────────────────────────────────────────────────────
    importances = model.feature_importances_
    feature_importance_dict = {name: imp for name, imp in zip(feature_cols, importances)}
    sorted_features = sorted(feature_importance_dict.items(), key=lambda x: x[1])
    fi_names = [x[0] for x in sorted_features[-15:]]
    fi_values = [float(x[1]) for x in sorted_features[-15:]]
    # ─────────────────────────────────────────────────────────────
    # 14. Build return dictionary (core metrics)
    # ─────────────────────────────────────────────────────────────
    result = {
        'ohlc': {
            'dates': [dt.strftime('%Y-%m-%d %H:%M:%S') for dt in close_test.index],
            'open': [float(o) if not np.isnan(o) else None for o in open_test],
            'high': [float(h) if not np.isnan(h) else None for h in high_test],
            'low': [float(l) if not np.isnan(l) else None for l in low_test],
            'close': [float(c) if not np.isnan(c) else None for c in close_test]
        },
        'signals': {
            'dates': [dt.strftime('%Y-%m-%d %H:%M:%S') for dt in signal_test.index],
            'values': [float(s) if not np.isnan(s) else None for s in signal_test]
        },
        'bb': {
            'upper': [float(u) if not np.isnan(u) else None for u in bb_upper],
            'mid': [float(m) if not np.isnan(m) else None for m in bb_sma],
            'lower': [float(l) if not np.isnan(l) else None for l in bb_lower]
        },
        'ma': {
            'ma50': [float(m) if not np.isnan(m) else None for m in ma_50],
            'ma100': [float(m) if not np.isnan(m) else None for m in ma_100],
            'ma200': [float(m) if not np.isnan(m) else None for m in ma_200]
        },
        'equity': {
            'dates': [dt.strftime('%Y-%m-%d %H:%M:%S') for dt in close_test.index],
            'strategy': [float(e) for e in strat_equity],
            'bh': [float(e) for e in bh_equity]
        },
        'feature_importance': {
            'names': fi_names,
            'values': fi_values
        },
        'conf_matrix': conf_matrix,
        'conf_hist': {
            'p_pos': [float(p) if not np.isnan(p) else None for p in p_pos],
            'p_neg': [float(p) if not np.isnan(p) else None for p in p_neg]
        },
        'rolling_acc': {
            'dates': rolling_dates,
            'values': rolling_acc_list
        },
        'drawdown': {
            'dates': drawdown_dates,
            'values': drawdown_series
        },
        'ret_dist': ret_dist,
        'ret_dist_long': ret_dist_long,
        'ret_dist_short': ret_dist_short,
        'metrics': {
            'total_ret': float(total_ret),
            'bh_ret': float(bh_ret_total),
            'sharpe_strat': float(sharpe_strat),
            'sharpe_bh': float(sharpe_bh),
            'mdd': float(mdd),
            'n_trades': int(n_trades)
        },
        'split_dt': split_dt,
        'split_idx': int(split_idx),
        'n_train': int(n_train),
        'n_test': int(n_test),
        'feature_cols': feature_cols,
        'custom_figs': []
    }
    # ─────────────────────────────────────────────────────────────
    # 15. Build custom figures (SMA and RSI)
    # ─────────────────────────────────────────────────────────────
    # SMA Chart — positional slice iloc[split_idx:] aligns with close_test
    fig_sma = go.Figure()
    fig_sma.add_trace(go.Scatter(
        x=close_test.index,
        y=close_test,
        name='Close',
        line=dict(color='#2962FF', width=1)
    ))
    fig_sma.add_trace(go.Scatter(
        x=close_test.index,
        y=df['sma_20'].iloc[split_idx:],
        name='SMA 20',
        line=dict(color='#FF6D00', width=1)
    ))
    fig_sma.add_trace(go.Scatter(
        x=close_test.index,
        y=df['sma_50'].iloc[split_idx:],
        name='SMA 50',
        line=dict(color='#00C853', width=1)
    ))
    fig_sma.add_trace(go.Scatter(
        x=close_test.index,
        y=df['sma_200'].iloc[split_idx:],
        name='SMA 200',
        line=dict(color='#D50000', width=1)
    ))
    fig_sma.update_layout(
        title='SMA (20, 50, 200)',
        xaxis_title='Date',
        yaxis_title='Price',
        template='plotly_dark',
        paper_bgcolor='#131722',
        plot_bgcolor='#131722',
        font_color='#d1d4dc',
        margin=dict(l=40, r=20, t=30, b=30),
        legend=dict(bgcolor='rgba(0,0,0,0)'),
        hovermode='x unified'
    )
    result['custom_figs'].append(fig_sma.to_dict())
    # RSI Chart
    fig_rsi = go.Figure()
    fig_rsi.add_trace(go.Scatter(
        x=close_test.index,
        y=df['rsi_14'].iloc[split_idx:],
        name='RSI 14',
        line=dict(color='#2962FF', width=2)
    ))
    fig_rsi.add_hline(y=70, line_dash='dash', line_color='#FF6D00', annotation_text='Overbought (70)')
    fig_rsi.add_hline(y=30, line_dash='dash', line_color='#00C853', annotation_text='Oversold (30)')
    fig_rsi.update_layout(
        title='RSI 14',
        xaxis_title='Date',
        yaxis_title='RSI',
        template='plotly_dark',
        paper_bgcolor='#131722',
        plot_bgcolor='#131722',
        font_color='#d1d4dc',
        margin=dict(l=40, r=20, t=30, b=30),
        legend=dict(bgcolor='rgba(0,0,0,0)'),
        yaxis=dict(range=[0, 100]),
        hovermode='x unified'
    )
    result['custom_figs'].append(fig_rsi.to_dict())
    # ─────────────────────────────────────────────────────────────
    # 16. Register model for prediction tab
    # ─────────────────────────────────────────────────────────────
    # `register_model` is injected by the host app; the globals() check keeps
    # standalone runs from raising NameError.
    if 'register_model' in globals() and register_model is not None:
        register_model(model)
    return result
# Run backtest
if __name__ == '__main__':
    # Script entry point: run the full train/backtest pipeline once and
    # report completion (the result dict itself is consumed by callers/UI).
    result = train_and_backtest()
    print("Backtest complete.")
# ╔══════════════════════════════════════════════════════════════╗
# ║ STRATEGY REQUEST LOG ║
# ╚══════════════════════════════════════════════════════════════╝
# Generated : 2026-03-23 07:21:48
# Model : Random Forest
# Feature Eng. : MACD (12,26,9), RSI 14
# Signal / Entry : RSI oversold/overbought
# Optimization : —
# Risk Mgmt : Max 3 trades/day
# Risk Filter : —
# ══════════════════════════════════════════════════════════════
import numpy as np
import pandas as pd
from sklearn.ensemble import RandomForestClassifier
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import confusion_matrix
import plotly.graph_objects as go
DATA_PATH = "/Users/malco/Desktop/QuantifyMe/data/EURUSD_ticks.csv"
# ============================================================
# SECTION 0 — MODEL WRAPPER
# ============================================================
class ModelWrapper:
    """Thin adapter around a fitted sklearn-style estimator.

    Exposes the ORIGINAL (pre-label-encoding) class labels via ``classes_``
    and a best-effort ``feature_importances_`` regardless of estimator type.
    """

    def __init__(self, model, original_classes, n_features=1):
        self._m = model
        # Original label space (e.g. [-1, 0, 1]), not the encoded one.
        self.classes_ = np.array(original_classes)
        self._n_features = n_features

    def predict_proba(self, X):
        """Delegate probability estimates to the wrapped estimator."""
        return self._m.predict_proba(X)

    def predict(self, X):
        """Delegate hard predictions to the wrapped estimator."""
        return self._m.predict(X)

    @property
    def feature_importances_(self):
        """Importances from the estimator, its sub-estimators, or a flat fallback."""
        est = self._m
        if hasattr(est, 'feature_importances_'):
            return est.feature_importances_
        if hasattr(est, 'coef_'):
            # Linear models: average absolute coefficients across classes.
            return np.abs(est.coef_).mean(axis=0)
        try:
            sub = [member.feature_importances_
                   for _, member in est.estimators_
                   if hasattr(member, 'feature_importances_')]
            if sub:
                return np.mean(sub, axis=0)
        except Exception:
            # No estimators_ attribute (or incompatible members): fall through.
            pass
        # Last resort: uniform importances so downstream plots still render.
        return np.ones(self._n_features)
# ============================================================
# SECTION 1 — FEATURE ENGINEERING
# ============================================================
def feature_engineering(df, close, open_, high, low):
    """Attach MACD(12,26,9), RSI(14) and derived columns to ``df`` in place.

    Returns the same DataFrame for chaining; ``open_``/``high``/``low`` are
    accepted for interface parity with other strategies but unused here.
    """
    # --- MACD family: fast/slow EMA spread, its signal line, and histogram ---
    fast_ema = close.ewm(span=12, adjust=False).mean()
    slow_ema = close.ewm(span=26, adjust=False).mean()
    df['macd'] = fast_ema - slow_ema
    df['macd_signal'] = df['macd'].ewm(span=9, adjust=False).mean()
    df['macd_hist'] = df['macd'] - df['macd_signal']
    # --- RSI 14 via Wilder-style EMA smoothing (com=13) ---
    move = close.diff()
    up = move.clip(lower=0)
    down = -move.clip(upper=0)
    avg_up = up.ewm(com=13, adjust=False).mean()
    avg_down = down.ewm(com=13, adjust=False).mean()
    # Zero average loss would divide by zero; NaN marks "undefined RSI".
    rs = avg_up / avg_down.replace(0, np.nan)
    df['rsi'] = 100 - (100 / (1 + rs))
    # --- Regime flags ---
    df['rsi_oversold'] = (df['rsi'] < 30).astype(float)
    df['rsi_overbought'] = (df['rsi'] > 70).astype(float)
    # Sign of the MACD histogram encodes which side of the signal line we are on.
    df['macd_cross'] = np.sign(df['macd_hist'])
    # --- One-bar lags so the model only sees information available at t-1 ---
    for col in ('rsi', 'macd', 'macd_hist'):
        df[col + '_lag1'] = df[col].shift(1)
    return df
# ============================================================
# SECTION 2 — SIGNAL / ENTRY LOGIC
# ============================================================
def generate_signals(model, X, thresh):
    """Threshold class probabilities into a -1/0/+1 trading signal.

    Short (-1) takes precedence when both the long and short probabilities
    clear ``thresh`` (the short assignment is applied last, matching the
    original overwrite order).  Returns (signal, P(up), P(down)).
    """
    proba = model.predict_proba(X)

    def _class_prob(label):
        # Probability column for `label`, or zeros when the class is absent.
        hits = np.flatnonzero(model.classes_ == label)
        return proba[:, hits[0]] if hits.size else np.zeros(len(X))

    p_pos = _class_prob(1)
    p_neg = _class_prob(-1)
    # Short wins on ties: evaluate the -1 condition first.
    signal_vals = np.where(p_neg >= thresh, -1.0,
                           np.where(p_pos >= thresh, 1.0, 0.0))
    signal = pd.Series(signal_vals, index=X.index, dtype=float)
    return signal, p_pos, p_neg
# ============================================================
# SECTION 3 — ML MODEL
# ============================================================
def build_model(X_train, y_train):
    """Fit a RandomForest on (label-encoded) targets and wrap it.

    Hyper-parameters come from ``optimization_config``; the returned
    ``ModelWrapper`` re-exposes the original label space [-1, 0, 1].
    """
    params = optimization_config()
    forest = RandomForestClassifier(
        n_estimators=params['n_estimators'],
        max_depth=params['max_depth'],
        min_samples_leaf=params['min_samples_leaf'],
        class_weight=params['class_weight'],
        random_state=42,  # reproducible fits
        n_jobs=-1,
    )
    forest.fit(X_train, y_train)
    return ModelWrapper(forest, original_classes=[-1, 0, 1],
                        n_features=X_train.shape[1])
# ============================================================
# SECTION 4 — OPTIMISATION TARGET
# ============================================================
def optimization_config():
    """Hyper-parameter set chosen to maximise the Sharpe ratio."""
    notes = (
        "Conservative depth and balanced class weights to prevent overfitting "
        "and reduce drawdowns, more trees for stable probability estimates."
    )
    config = {"objective": "Maximize Sharpe ratio", "notes": notes}
    # Forest shape: many shallow, well-populated trees.
    config["n_estimators"] = 300
    config["max_depth"] = 6
    config["min_samples_leaf"] = 20
    config["class_weight"] = "balanced"
    return config
# ============================================================
# SECTION 5 — RISK MANAGEMENT
# ============================================================
def apply_risk(signal, close, pos_size=1.0):
    """Cap the strategy at a maximum of 3 trade entries per calendar day.

    A "trade entry" is any bar whose (scaled) signal is non-zero and differs
    from the previous bar's final position.  From the 4th entry onward within
    a day the signal is forced flat (0.0).  Note that once an entry is zeroed,
    ``prev`` becomes 0, so every later non-flat bar that day also counts as a
    new entry and is zeroed — i.e. the day stays flat after the cap is hit.

    Parameters
    ----------
    signal : pd.Series
        Directional signal (-1/0/+1) indexed by a DatetimeIndex.
    close : pd.Series
        Close prices; unused, kept for interface compatibility.
    pos_size : float
        Position-size multiplier applied to the signal.

    Returns
    -------
    pd.Series
        Risk-adjusted signal.
    """
    days = signal.index.normalize()
    result = signal.copy() * pos_size
    for day in days.unique():
        # Entry counter and previous final position reset each day.
        trade_count = 0
        prev = 0.0
        for idx in result.index[days == day]:
            cur = result.loc[idx]
            if cur != 0.0 and cur != prev:
                trade_count += 1
                if trade_count > 3:
                    # Over the daily cap: force this bar flat.
                    result.loc[idx] = 0.0
            # Track the bar's FINAL position (possibly zeroed above).
            prev = result.loc[idx]
    return result
# ============================================================
# SECTION 6 — BACKTEST ENGINE
# ============================================================
def train_and_backtest():
    """Run the full pipeline: load ticks, build features, train the RF,
    trade the 30% hold-out period, and assemble the dashboard payload.

    Returns
    -------
    dict
        JSON-friendly payload: OHLC, signals, Bollinger/MA overlays, equity
        curves, feature importances, confusion matrix, probability
        histograms, rolling accuracy, drawdown, trade-return distributions,
        summary metrics, split metadata and custom Plotly figures.
    """
    # Load data (tick quotes with Bid/Ask columns and a Time column).
    df_raw = pd.read_csv(DATA_PATH, parse_dates=['Time'])
    df_raw = df_raw.sort_values('Time').set_index('Time')
    # Mid price from bid/ask quotes.
    df_raw['mid'] = (df_raw['Bid'] + df_raw['Ask']) / 2
    # Resample to 15-minute OHLC
    ohlc = df_raw['mid'].resample('15min').ohlc()
    ohlc = ohlc.dropna()
    close = ohlc['close']
    open_ = ohlc['open']
    high = ohlc['high']
    low = ohlc['low']
    # Feature engineering
    df_feat = ohlc.copy()
    df_feat = feature_engineering(df_feat, close, open_, high, low)
    # NOTE(review): bfill pulls later values into the indicator warm-up rows
    # at the very start of the series — a mild lookahead in the earliest
    # training bars; confirm this is acceptable for this strategy.
    df_feat = df_feat.bfill().ffill()
    # Target: direction 1 hour ahead (4 bars)
    target = np.sign(close.shift(-4) - close)
    # Drop last 4 rows (no valid target)
    df_feat = df_feat.iloc[:-4]
    target = target.iloc[:-4]
    close_aligned = close.iloc[:-4]
    open_aligned = open_.iloc[:-4]
    high_aligned = high.iloc[:-4]
    low_aligned = low.iloc[:-4]
    feature_cols = ['macd', 'macd_signal', 'macd_hist', 'rsi',
                    'rsi_oversold', 'rsi_overbought', 'macd_cross',
                    'rsi_lag1', 'macd_lag1', 'macd_hist_lag1']
    X = df_feat[feature_cols].copy()
    X = X.bfill().ffill().fillna(0)
    # Train/test split 70/30 (chronological, no shuffling)
    split_idx = int(len(X) * 0.70)
    X_train = X.iloc[:split_idx]
    X_test = X.iloc[split_idx:]
    y_train = target.iloc[:split_idx]
    y_test = target.iloc[split_idx:]
    close_test = close_aligned.iloc[split_idx:]
    close_train = close_aligned.iloc[:split_idx]
    # Label encoding: {-1, 0, 1} -> {0, 1, 2} (LabelEncoder sorts labels)
    enc = LabelEncoder()
    y_train_enc = enc.fit_transform(y_train)
    y_test_enc = enc.transform(y_test)
    # Build model (trained in the ENCODED label space)
    model = build_model(X_train, y_train_enc)
    # Generate signals from thresholded class probabilities
    thresh = 0.45
    signal_test, p_pos, p_neg = generate_signals(model, X_test, thresh)
    # Apply risk (max 3 trade entries per day)
    signal_test = apply_risk(signal_test, close_test)
    # Backtest returns: signal at bar t earns the t -> t+1 return
    price_ret = close_test.pct_change().shift(-1).fillna(0)
    strat_ret = signal_test * price_ret
    # Equity curve (no transaction costs applied in this variant)
    capital = 10000.0
    equity_strategy = (1 + strat_ret).cumprod() * capital
    equity_bh = (1 + price_ret).cumprod() * capital
    total_ret = float((equity_strategy.iloc[-1] / capital) - 1.0)
    bh_ret = float((equity_bh.iloc[-1] / capital) - 1.0)
    # Sharpe ratio (annualised, 15-min bars: ~26040 bars/year)
    bars_per_year = 252 * 26 * 4  # approx
    # Strategy Sharpe is computed over ACTIVE (non-flat) bars only.
    active_mask = signal_test != 0
    ret_series = strat_ret[active_mask]
    if len(ret_series) > 1 and ret_series.std() > 0:
        sharpe_strat = float((ret_series.mean() / ret_series.std()) * np.sqrt(bars_per_year))
    else:
        sharpe_strat = 0.0
    bh_std = price_ret.std()
    if bh_std > 0:
        sharpe_bh = float((price_ret.mean() / bh_std) * np.sqrt(bars_per_year))
    else:
        sharpe_bh = 0.0
    # Max drawdown (decimal, negative) from the running equity peak
    roll_max = equity_strategy.cummax()
    drawdown_series = (equity_strategy - roll_max) / roll_max
    mdd = float(drawdown_series.min())
    # Trade returns (per-bar returns while a position is held)
    all_trade_rets = strat_ret[active_mask].tolist()
    long_mask = (signal_test == 1.0) & active_mask
    short_mask = (signal_test == -1.0) & active_mask
    long_rets = strat_ret[long_mask].tolist()
    short_rets = strat_ret[short_mask].tolist()
    # NOTE: counts active BARS, not round-trip trades.
    n_trades = int(active_mask.sum())
    # Confusion matrix
    y_pred_raw = model.predict(X_test)
    # Map encoded predictions back — model.classes_ is [-1,0,1]
    # y_test_enc and y_pred_raw are in encoded space [0,1,2]
    cm = confusion_matrix(y_test_enc, y_pred_raw, labels=[0, 1, 2])
    conf_matrix = cm.tolist()
    # Rolling accuracy (30-bar window, non-flat signals)
    correct = (y_pred_raw == y_test_enc).astype(float)
    correct_series = pd.Series(correct, index=X_test.index)
    active_enc = pd.Series((signal_test != 0).values, index=X_test.index)
    # Flat bars become NaN so they are excluded from the rolling mean.
    correct_active = correct_series.where(active_enc)
    rolling_acc_raw = correct_active.rolling(30, min_periods=1).mean()
    rolling_acc_vals = [None if np.isnan(v) else float(v) for v in rolling_acc_raw]
    # Bollinger Bands (20, 2) on the test-period closes
    bb_close = close_aligned.iloc[split_idx:]
    bb_mid = bb_close.rolling(20).mean()
    bb_std = bb_close.rolling(20).std()
    bb_upper = bb_mid + 2 * bb_std
    bb_lower = bb_mid - 2 * bb_std

    def _clean_list(s):
        # JSON-safe conversion: None/NaN/Inf -> None, everything else -> float.
        return [None if (v is None or (isinstance(v, float) and (np.isnan(v) or np.isinf(v)))) else float(v)
                for v in s]

    # Moving averages (computed on the full series, then sliced to test)
    ma50 = close_aligned.rolling(50).mean().iloc[split_idx:]
    ma100 = close_aligned.rolling(100).mean().iloc[split_idx:]
    ma200 = close_aligned.rolling(200).mean().iloc[split_idx:]
    # Feature importance (top 15 ascending)
    importances = model.feature_importances_
    fi_pairs = sorted(zip(feature_cols, importances), key=lambda x: x[1])
    fi_pairs = fi_pairs[-15:] if len(fi_pairs) > 15 else fi_pairs
    fi_names = [p[0] for p in fi_pairs]
    fi_vals = [float(p[1]) for p in fi_pairs]
    # OHLC for test period
    test_dates = X_test.index
    ohlc_dates = [str(d) for d in test_dates]
    open_test = open_aligned.iloc[split_idx:]
    high_test = high_aligned.iloc[split_idx:]
    low_test = low_aligned.iloc[split_idx:]
    split_dt = str(X_test.index[0])
    # Custom figure: RSI oversold/overbought
    rsi_test = df_feat['rsi'].iloc[split_idx:]
    rsi_dates = [str(d) for d in rsi_test.index]
    fig_rsi = go.Figure()
    fig_rsi.add_trace(go.Scatter(
        x=rsi_dates,
        y=rsi_test.tolist(),
        mode='lines',
        name='RSI 14',
        line=dict(color='#2962ff', width=1.5)
    ))
    fig_rsi.add_hline(y=70, line_dash='dash', line_color='#ef5350',
                      annotation_text='Overbought (70)', annotation_position='top left')
    fig_rsi.add_hline(y=30, line_dash='dash', line_color='#26a69a',
                      annotation_text='Oversold (30)', annotation_position='bottom left')
    fig_rsi.add_hline(y=50, line_dash='dot', line_color='#888888', line_width=1)
    # Shade overbought/oversold regions
    fig_rsi.add_hrect(y0=70, y1=100, fillcolor='rgba(239,83,80,0.08)', line_width=0)
    fig_rsi.add_hrect(y0=0, y1=30, fillcolor='rgba(38,166,154,0.08)', line_width=0)
    fig_rsi.update_layout(
        title=dict(text='RSI 14 — Oversold / Overbought', font=dict(color='#d1d4dc')),
        paper_bgcolor='#131722',
        plot_bgcolor='#131722',
        font_color='#d1d4dc',
        margin=dict(l=40, r=20, t=30, b=30),
        legend=dict(bgcolor='rgba(0,0,0,0)'),
        xaxis=dict(gridcolor='#2a2e39', showgrid=True),
        yaxis=dict(gridcolor='#2a2e39', showgrid=True, range=[0, 100]),
    )
    custom_figs = [fig_rsi.to_dict()]
    # Assemble the dashboard payload.
    result = {
        "ohlc": {
            "dates": ohlc_dates,
            "open": _clean_list(open_test.tolist()),
            "high": _clean_list(high_test.tolist()),
            "low": _clean_list(low_test.tolist()),
            "close": _clean_list(close_test.tolist()),
        },
        "signals": {
            "dates": [str(d) for d in signal_test.index],
            "values": [float(v) for v in signal_test.tolist()],
        },
        "bb": {
            "upper": _clean_list(bb_upper.tolist()),
            "mid": _clean_list(bb_mid.tolist()),
            "lower": _clean_list(bb_lower.tolist()),
        },
        "ma": {
            "ma50": _clean_list(ma50.tolist()),
            "ma100": _clean_list(ma100.tolist()),
            "ma200": _clean_list(ma200.tolist()),
        },
        "equity": {
            "dates": [str(d) for d in equity_strategy.index],
            "strategy": _clean_list(equity_strategy.tolist()),
            "bh": _clean_list(equity_bh.tolist()),
        },
        "feature_importance": {
            "names": fi_names,
            "values": fi_vals,
        },
        "conf_matrix": conf_matrix,
        "conf_hist": {
            "p_pos": [float(v) for v in p_pos.tolist()],
            "p_neg": [float(v) for v in p_neg.tolist()],
        },
        "rolling_acc": {
            "dates": [str(d) for d in X_test.index],
            "values": rolling_acc_vals,
        },
        "drawdown": {
            "dates": [str(d) for d in drawdown_series.index],
            "values": _clean_list(drawdown_series.tolist()),
        },
        "ret_dist": [float(v) for v in all_trade_rets],
        "ret_dist_long": [float(v) for v in long_rets],
        "ret_dist_short": [float(v) for v in short_rets],
        "metrics": {
            "total_ret": total_ret,
            "bh_ret": bh_ret,
            "sharpe_strat": sharpe_strat,
            "sharpe_bh": sharpe_bh,
            "mdd": mdd,
            "n_trades": n_trades,
        },
        "split_dt": split_dt,
        "split_idx": split_idx,
        "n_train": len(X_train),
        "n_test": len(X_test),
        "custom_figs": custom_figs,
    }
    return result
# ╔══════════════════════════════════════════════════════════════╗
# ║ STRATEGY REQUEST LOG ║
# ╚══════════════════════════════════════════════════════════════╝
# Generated : 2026-03-23 05:46:30
# Model : Gradient Boosting
# Feature Eng. : MACD (12,26,9)
# Signal / Entry : MA crossover
# Optimization : Minimize max drawdown
# Risk Mgmt : —
# Risk Filter : —
# ══════════════════════════════════════════════════════════════
import numpy as np
import pandas as pd
from sklearn.preprocessing import LabelEncoder
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.metrics import confusion_matrix
import plotly.graph_objects as go
DATA_PATH = "/Users/malco/Desktop/QuantifyMe/data/EURUSD_ticks.csv"
# ============================================================
# SECTION 0 — MODEL WRAPPER
# ============================================================
class ModelWrapper:
    """Adapter that pairs a fitted estimator with its original label space.

    ``classes_`` holds the pre-encoding labels (e.g. [-1, 0, 1]);
    ``feature_importances_`` is resolved on a best-effort basis.
    """

    # Sentinel distinguishing "attribute absent" from "attribute is None".
    _MISSING = object()

    def __init__(self, model, original_classes, n_features=1):
        self._m = model
        self.classes_ = np.array(original_classes)
        self._n_features = n_features

    def predict_proba(self, X):
        """Forward probability estimates from the wrapped estimator."""
        return self._m.predict_proba(X)

    def predict(self, X):
        """Forward hard predictions from the wrapped estimator."""
        return self._m.predict(X)

    @property
    def feature_importances_(self):
        """Resolve importances: direct attr, |coef_| mean, sub-estimator mean, or ones."""
        direct = getattr(self._m, 'feature_importances_', self._MISSING)
        if direct is not self._MISSING:
            return direct
        coefs = getattr(self._m, 'coef_', self._MISSING)
        if coefs is not self._MISSING:
            return np.abs(coefs).mean(axis=0)
        try:
            per_est = [est.feature_importances_
                       for _, est in self._m.estimators_
                       if hasattr(est, 'feature_importances_')]
            if per_est:
                return np.mean(per_est, axis=0)
        except Exception:
            pass  # no estimators_ (or incompatible shape): use the fallback
        return np.ones(self._n_features)
# ============================================================
# SECTION 1 — FEATURE ENGINEERING
# ============================================================
def feature_engineering(df, close, open_, high, low):
    """Populate ``df`` with MACD, RSI, Bollinger, MA, momentum and range features."""
    # MACD (12, 26, 9): fast/slow EMA spread plus signal line and histogram.
    fast_ema = close.ewm(span=12, adjust=False).mean()
    slow_ema = close.ewm(span=26, adjust=False).mean()
    df['macd'] = fast_ema - slow_ema
    df['macd_signal'] = df['macd'].ewm(span=9, adjust=False).mean()
    df['macd_hist'] = df['macd'] - df['macd_signal']
    # RSI (14) from EMA-smoothed gains/losses; epsilon guards divide-by-zero.
    move = close.diff()
    avg_gain = move.where(move > 0, 0).ewm(span=14, adjust=False).mean()
    avg_loss = (-move.where(move < 0, 0)).ewm(span=14, adjust=False).mean()
    df['rsi'] = 100 - (100 / (1 + avg_gain / avg_loss.replace(0, 1e-10)))
    # Bollinger Bands (20, 2).
    band_mid = close.rolling(20).mean()
    band_width = 2 * close.rolling(20).std()
    df['bb_upper'] = band_mid + band_width
    df['bb_mid'] = band_mid
    df['bb_lower'] = band_mid - band_width
    # Long-horizon moving averages.
    for window in (50, 100, 200):
        df['ma%d' % window] = close.rolling(window).mean()
    # Momentum features.
    df['returns'] = close.pct_change()
    df['roc10'] = (close - close.shift(10)) / close.shift(10)
    # Realised volatility of 1-bar returns.
    df['volatility'] = close.pct_change().rolling(20).std()
    # Intrabar range relative to close.
    df['hl_pct'] = (high - low) / close
    return df
# ============================================================
# SECTION 2 — SIGNAL / ENTRY LOGIC
# ============================================================
def generate_signals(model, X, thresh):
    """Generate trading signals from model predictions.

    BUG FIX: the wrapped estimator was fit on label-ENCODED targets
    (LabelEncoder maps [-1, 0, 1] -> [0, 1, 2]), so ``model.predict`` returns
    encoded values.  Previously those encoded values (0/1/2) were used
    directly as directional signals, turning "short" into "flat", "flat"
    into "long" and "long" into a double-size long.  They are now mapped
    back to the original label space via ``model.classes_``.

    Parameters
    ----------
    model : ModelWrapper
        Wrapper exposing ``predict``, ``predict_proba`` and the ORIGINAL
        ``classes_`` (e.g. [-1, 0, 1]).
    X : pd.DataFrame
        Feature matrix; its index is carried over to the signal.
    thresh : float
        Unused here (signals come from hard predictions); kept for
        interface compatibility with the other strategy variants.

    Returns
    -------
    (pd.Series, np.ndarray, np.ndarray)
        Directional signal in {-1.0, 0.0, 1.0}, plus P(up) and P(down).
    """
    proba = model.predict_proba(X)
    # Extract class probabilities using model.classes_
    class_to_idx = {c: i for i, c in enumerate(model.classes_)}
    p_pos = np.zeros(len(X))
    p_neg = np.zeros(len(X))
    if 1 in class_to_idx:
        p_pos = proba[:, class_to_idx[1]]
    if -1 in class_to_idx:
        p_neg = proba[:, class_to_idx[-1]]
    # Map encoded predictions (indices 0..k-1) back to original labels.
    # LabelEncoder sorts labels, so encoded value i indexes classes_[i].
    pred_enc = np.asarray(model.predict(X)).astype(int)
    pred_labels = np.asarray(model.classes_)[pred_enc]
    signal = pd.Series(pred_labels.astype(float), index=X.index)
    return signal, p_pos, p_neg
# ============================================================
# SECTION 3 — ML MODEL
# ============================================================
def build_model(X_train, y_train):
    """Fit a GradientBoostingClassifier per ``optimization_config`` and wrap it.

    ``y_train`` is expected in label-encoded form; the wrapper re-exposes
    the original label space [-1, 0, 1] for downstream mapping.
    """
    cfg = optimization_config()
    booster = GradientBoostingClassifier(
        n_estimators=cfg.get('n_estimators', 100),
        learning_rate=cfg.get('learning_rate', 0.05),
        max_depth=cfg.get('max_depth', 5),
        min_samples_leaf=cfg.get('min_samples_leaf', 20),
        subsample=cfg.get('subsample', 0.8),
        random_state=42,  # reproducible fits
    )
    booster.fit(X_train, y_train)
    return ModelWrapper(booster, original_classes=[-1, 0, 1],
                        n_features=X_train.shape[1])
# ============================================================
# SECTION 4 — OPTIMISATION TARGET
# ============================================================
def optimization_config():
    """Config optimized to minimize max drawdown."""
    cfg = {}
    cfg['objective'] = 'minimize_max_drawdown'
    cfg['notes'] = 'Conservative: high min_samples_leaf, low learning_rate, shallow depth'
    # Slow, regularised boosting to keep drawdowns contained.
    cfg['n_estimators'] = 200
    cfg['learning_rate'] = 0.02
    cfg['max_depth'] = 4
    cfg['min_samples_leaf'] = 30
    cfg['subsample'] = 0.75
    return cfg
# ============================================================
# SECTION 5 — RISK MANAGEMENT
# ============================================================
def apply_risk(signal, close, pos_size=1.0):
    """Scale the raw signal by the position size (``close`` is unused)."""
    scaled = signal * pos_size
    return scaled
# ============================================================
# SECTION 6 — BACKTEST ENGINE
# ============================================================
def train_and_backtest():
    """Full backtest: load data, train model, generate P&L, return metrics.

    Pipeline: resample ticks to 15-min OHLC, engineer features, label the
    4-bar-ahead direction, fit Gradient Boosting on the first 70%, trade the
    remaining 30% with a per-change transaction cost, and pack charts and
    metrics into a JSON-serializable dict.

    Fix applied: the confusion-matrix/accuracy section previously remapped
    ``y_test_enc`` and ``model.predict(X_test)`` through ``{-1:0, 0:1, 1:2}``
    — but both arrays are ALREADY label-encoded to {0, 1, 2}, so the old
    lookup raised ``KeyError: 2`` whenever the encoded "up" class appeared.
    The encoded arrays are now compared directly.

    Returns
    -------
    dict
        Payload with OHLC, signals, overlays, equity curves, feature
        importances, confusion matrix, rolling accuracy, drawdown,
        trade-return distributions, metrics and custom figures.
    """
    # Load and resample
    df_ticks = pd.read_csv(DATA_PATH, parse_dates=['Time'])
    df_ticks.set_index('Time', inplace=True)
    mid = (df_ticks['Bid'] + df_ticks['Ask']) / 2
    ohlc = mid.resample('15min').ohlc()
    close = ohlc['close']
    open_ = ohlc['open']
    high = ohlc['high']
    low = ohlc['low']
    # Remove NaN bars (intervals with no ticks)
    valid = close.notna()
    close = close[valid]
    open_ = open_[valid]
    high = high[valid]
    low = low[valid]
    # Feature engineering
    df_feat = pd.DataFrame(index=close.index)
    df_feat = feature_engineering(df_feat, close, open_, high, low)
    df_feat = df_feat.bfill().ffill()
    # Target: direction 1 hour ahead (4 bars of 15min)
    target = np.sign(close.shift(-4) - close)
    target = pd.Series(target, index=close.index)
    # Remove look-ahead NaNs (last 4 bars have no future close)
    valid_target = target.notna()
    df_feat = df_feat[valid_target]
    target = target[valid_target]
    close = close[valid_target]
    open_ = open_[valid_target]
    high = high[valid_target]
    low = low[valid_target]
    # Train/test split (70/30, walk-forward)
    split_idx = int(len(df_feat) * 0.7)
    split_dt = str(df_feat.index[split_idx])
    X_train = df_feat.iloc[:split_idx]
    y_train = target.iloc[:split_idx]
    X_test = df_feat.iloc[split_idx:]
    y_test = target.iloc[split_idx:]
    close_train = close.iloc[:split_idx]
    close_test = close.iloc[split_idx:]
    # Encode labels: [-1, 0, 1] → [0, 1, 2]
    enc = LabelEncoder()
    y_train_enc = enc.fit_transform(y_train)
    y_test_enc = enc.transform(y_test)
    # Build model (fit in the encoded label space)
    model = build_model(X_train, y_train_enc)
    # Generate signals on test set
    signal_test, p_pos, p_neg = generate_signals(model, X_test, thresh=0.5)
    signal_test = apply_risk(signal_test, close_test, pos_size=1.0)
    # P&L: trade signal at next bar, minus 2e-5 cost per signal change
    ret_test = close_test.pct_change()
    signal_lag = signal_test.shift(1).fillna(0)
    trade_cost = 2e-5 * np.abs(signal_test.diff().fillna(0))
    pnl = signal_lag * ret_test - trade_cost
    pnl = pnl.fillna(0)
    # Equity curves
    cum_ret_strat = (1 + pnl).cumprod() - 1
    equity_strat = 10000 * (1 + cum_ret_strat)
    # Buy & hold benchmark
    bh_rets = ret_test.fillna(0)
    cum_ret_bh = (1 + bh_rets).cumprod() - 1
    equity_bh = 10000 * (1 + cum_ret_bh)
    # Metrics: total return (decimal ratio, not percent)
    total_ret = float(cum_ret_strat.iloc[-1]) if len(cum_ret_strat) > 0 else 0.0
    bh_ret = float(cum_ret_bh.iloc[-1]) if len(cum_ret_bh) > 0 else 0.0
    # Sharpe ratio (annualized; 252 days x 24 h x 4 bars/h)
    pnl_std = pnl.std()
    if pnl_std > 0:
        sharpe_strat = float((pnl.mean() / pnl_std) * np.sqrt(252 * 24 * 4))
    else:
        sharpe_strat = 0.0
    bh_std = bh_rets.std()
    if bh_std > 0:
        sharpe_bh = float((bh_rets.mean() / bh_std) * np.sqrt(252 * 24 * 4))
    else:
        sharpe_bh = 0.0
    # Max drawdown (decimal ratio, negative)
    running_max = equity_strat.expanding().max()
    drawdown_series = (equity_strat - running_max) / running_max
    mdd = float(drawdown_series.min()) if len(drawdown_series) > 0 else 0.0
    # Trade returns: entry on signal change, exit on next change
    trade_list = []
    trade_long = []
    trade_short = []
    entry_idx = None
    entry_sig = 0
    for i in range(1, len(signal_test)):
        if signal_test.iloc[i] != entry_sig:
            if entry_idx is not None:
                # Close the previous position at the current bar's close.
                exit_ret = (close_test.iloc[i] - close_test.iloc[entry_idx]) / close_test.iloc[entry_idx]
                trade_ret = exit_ret * entry_sig - 2e-5
                trade_list.append(float(trade_ret))
                if entry_sig > 0:
                    trade_long.append(float(trade_ret))
                elif entry_sig < 0:
                    trade_short.append(float(trade_ret))
            entry_idx = i
            entry_sig = signal_test.iloc[i]
    # Feature importance: top 15 (ascending for horizontal bar charts)
    imp = model.feature_importances_
    feat_names = list(X_train.columns)
    imp_tuples = [(feat_names[j], float(imp[j])) for j in range(len(imp))]
    imp_tuples.sort(key=lambda x: x[1])
    if len(imp_tuples) > 15:
        imp_tuples = imp_tuples[-15:]
    # Confusion matrix — y_test_enc and model.predict() are both already in
    # the encoded space [0, 1, 2]; compare them directly.  (The previous
    # remapping through {-1:0, 0:1, 1:2} raised KeyError on encoded value 2.)
    y_pred = model.predict(X_test)
    y_test_mapped = np.asarray(y_test_enc, dtype=int)
    y_pred_mapped = np.asarray(y_pred, dtype=int)
    cm = confusion_matrix(y_test_mapped, y_pred_mapped, labels=[0, 1, 2])
    conf_mat = cm.tolist()
    # Rolling accuracy (30-bar window, test period only)
    rolling_accuracy = []
    for i in range(30, len(y_pred)):
        win = (y_pred_mapped[i-30:i] == y_test_mapped[i-30:i]).sum()
        acc = float(win) / 30.0
        rolling_accuracy.append(acc)
    # MA crossover custom figure
    custom_figs = []
    ma_short = close_test.rolling(10).mean()
    ma_long = close_test.rolling(20).mean()
    fig = go.Figure()
    fig.add_trace(go.Scatter(
        x=close_test.index, y=close_test, mode='lines',
        name='Close', line=dict(color='#2962FF', width=2)
    ))
    fig.add_trace(go.Scatter(
        x=ma_short.index, y=ma_short, mode='lines',
        name='MA(10)', line=dict(color='#FF6D00', width=1.5)
    ))
    fig.add_trace(go.Scatter(
        x=ma_long.index, y=ma_long, mode='lines',
        name='MA(20)', line=dict(color='#AB47BC', width=1.5)
    ))
    fig.update_layout(
        title='MA Crossover (Test Period)',
        xaxis_title='Date', yaxis_title='Price (EURUSD)',
        paper_bgcolor='#131722', plot_bgcolor='#131722',
        font_color='#d1d4dc',
        margin=dict(l=40, r=20, t=30, b=30),
        legend=dict(bgcolor='rgba(0,0,0,0)', x=0.01, y=0.99)
    )
    custom_figs.append(fig.to_dict())

    # Helper: convert NaN/Inf to None so the dict is JSON-serializable.
    def safe_tolist(arr):
        lst = np.where(np.isnan(arr) | np.isinf(arr), None, arr).tolist()
        return [None if (isinstance(x, float) and (np.isnan(x) or np.isinf(x))) else x for x in lst]

    # Build result dict
    result = {
        'ohlc': {
            'dates': [str(d.date()) + ' ' + str(d.time()) for d in close.index],
            'open': safe_tolist(open_.values),
            'high': safe_tolist(high.values),
            'low': safe_tolist(low.values),
            'close': safe_tolist(close.values)
        },
        'signals': {
            'dates': [str(d.date()) + ' ' + str(d.time()) for d in signal_test.index],
            'values': safe_tolist(signal_test.values)
        },
        'bb': {
            'upper': safe_tolist(df_feat['bb_upper'].values),
            'mid': safe_tolist(df_feat['bb_mid'].values),
            'lower': safe_tolist(df_feat['bb_lower'].values)
        },
        'ma': {
            'ma50': safe_tolist(df_feat['ma50'].values),
            'ma100': safe_tolist(df_feat['ma100'].values),
            'ma200': safe_tolist(df_feat['ma200'].values)
        },
        'equity': {
            'dates': [str(d.date()) + ' ' + str(d.time()) for d in equity_strat.index],
            'strategy': safe_tolist(equity_strat.values),
            'bh': safe_tolist(equity_bh.values)
        },
        'feature_importance': {
            'names': [name for name, _ in imp_tuples],
            'values': [val for _, val in imp_tuples]
        },
        'conf_matrix': conf_mat,
        'conf_hist': {
            'p_pos': safe_tolist(p_pos),
            'p_neg': safe_tolist(p_neg)
        },
        'rolling_acc': {
            'dates': [str(d.date()) + ' ' + str(d.time()) for d in close_test.index[30:]],
            'values': rolling_accuracy
        },
        'drawdown': {
            'dates': [str(d.date()) + ' ' + str(d.time()) for d in drawdown_series.index],
            'values': safe_tolist(drawdown_series.values)
        },
        'ret_dist': trade_list,
        'ret_dist_long': trade_long,
        'ret_dist_short': trade_short,
        'metrics': {
            'total_ret': total_ret,
            'bh_ret': bh_ret,
            'sharpe_strat': sharpe_strat,
            'sharpe_bh': sharpe_bh,
            'mdd': mdd,
            'n_trades': len(trade_list)
        },
        'split_dt': split_dt,
        'split_idx': int(split_idx),
        'n_train': int(split_idx),
        'n_test': int(len(X_test)),
        'custom_figs': custom_figs
    }
    return result
if __name__ == '__main__':
    # Script entry point: run the backtest and dump the payload as JSON
    # (default=str handles Timestamps and other non-JSON-native values).
    res = train_and_backtest()
    import json
    print(json.dumps(res, default=str, indent=2))