class GridTrendMultiplier: """ Expert4x Grid Trend Multiplier Strategy
metrics = strategy.execute_strategy(df)
The strategy automatically adapts to market conditions, increasing exposure during strong trends while maintaining strict risk controls through position sizing and stop losses.
Core Features: - Dynamic grid levels based on ATR - Trend detection using multiple timeframes - Position size multiplier based on trend strength - Martingale-style recovery with risk management - Auto grid adjustment during strong trends """ expert4x grid trend multiplier
print("\n" + "="*50) print("GRID TREND MULTIPLIER STRATEGY RESULTS") print("="*50) for key, value in metrics.items(): if isinstance(value, float): print(f"{key.replace('_', ' ').title()}: {value:.2f}") else: print(f"{key.replace('_', ' ').title()}: {value}") return strategy, metrics if == " main ": strategy, metrics = run_backtest()
def update_multiplier(self, trend_strength: float):
    """
    Scale the running position multiplier up or down with trend strength.

    A strength above 50 multiplies ``self.total_multiplier`` by
    ``self.trend_multiplier`` (capped at ``self.max_multiplier``). A strength
    below 25 divides it by the same factor (floored at 1.0). Strengths from
    25 to 50 inclusive leave the multiplier unchanged.

    Args:
        trend_strength: Trend strength score to react to.
    """
    if trend_strength > 50:
        # Strong trend - scale up, but never past the configured ceiling.
        self.total_multiplier = min(
            self.max_multiplier,
            self.total_multiplier * self.trend_multiplier,
        )
    elif trend_strength < 25:
        # Weak trend - scale back down, but never below the 1.0 baseline.
        self.total_multiplier = max(
            1.0,
            self.total_multiplier / self.trend_multiplier,
        )


def check_grid_execution(self, current_price: float, grid_levels: List[float],
                         atr: float) -> Optional[Dict]:
    """
    Build an order for the first grid level the current price is sitting on.

    A level counts as hit when the price is within 0.01% of it. The order
    side follows ``self.current_trend`` (BUY when bullish, SELL when bearish);
    for any other trend value the side alternates with the parity of the
    number of open positions. The stop loss is 2% away from the level and
    the take profit is ``self.grid_distance_pct`` percent away.

    Args:
        current_price: Latest price to test against the grid.
        grid_levels: Candidate price levels, checked in order.
        atr: Accepted for interface compatibility; not used by this method.

    Returns:
        Order details for the first matching level, or None if no level
        matches.
    """
    match_tolerance = 0.0001  # relative distance, i.e. within 0.01%
    stop_loss_pct = 0.02      # 2% stop loss
    grid_step = self.grid_distance_pct / 100

    for level in grid_levels:
        # A non-positive level cannot be used as the base of a percentage
        # comparison: zero divides by zero and a negative level would make
        # the ratio negative and therefore always "match".
        if level <= 0:
            continue
        if not (abs(current_price - level) / level < match_tolerance):
            continue

        # Pick the side from the trend.
        if self.current_trend == "BULLISH":
            direction = "BUY"
        elif self.current_trend == "BEARISH":
            direction = "SELL"
        else:
            # Neutral - alternate sides with the number of open positions.
            direction = "BUY" if len(self.open_positions) % 2 == 0 else "SELL"

        # Protective levels mirror each other around the entry price.
        if direction == "BUY":
            stop_loss = level * (1 - stop_loss_pct)
            take_profit = level * (1 + grid_step)
        else:
            stop_loss = level * (1 + stop_loss_pct)
            take_profit = level * (1 - grid_step)

        return {
            'type': direction,
            'entry_price': level,
            'position_size': self.calculate_position_size(level),
            'stop_loss': stop_loss,
            'take_profit': take_profit,
            'timestamp': datetime.now(),  # wall-clock time, not bar time
            'grid_level': level,
            'multiplier': self.total_multiplier,
        }

    return None
def calculate_position_size(self, price: float, stop_loss_pct: float = 0.02) -> float:
    """
    Calculate position size from account risk, trend multiplier and a cap.

    The risk budget is ``self.balance * self.risk_per_trade``. When the
    current trend is BULLISH or BEARISH the budget is scaled by
    ``self.total_multiplier``; otherwise it is used as is. The budget is
    divided by the stop-loss distance (``price * stop_loss_pct``) and the
    result is capped so that the position's notional value does not exceed
    10% of the balance.

    Args:
        price: Entry price; must be positive.
        stop_loss_pct: Stop loss as a fraction of price; must be positive.

    Returns:
        Position size in units, never negative.

    Raises:
        ValueError: If ``price`` or ``stop_loss_pct`` is not positive, since
            the stop-loss distance would then be zero or negative.
    """
    if price <= 0:
        raise ValueError(f"price must be positive, got {price!r}")
    if stop_loss_pct <= 0:
        raise ValueError(f"stop_loss_pct must be positive, got {stop_loss_pct!r}")

    # Base risk amount
    risk_amount = self.balance * self.risk_per_trade

    # Both trending states use the same multiplier; only a non-trending
    # market falls back to the 1.0 baseline.
    if self.current_trend in ("BULLISH", "BEARISH"):
        position_multiplier = self.total_multiplier
    else:
        position_multiplier = 1.0

    stop_loss_distance = price * stop_loss_pct
    position_size = (risk_amount * position_multiplier) / stop_loss_distance

    # Max 10% of balance per trade
    max_position = self.balance * 0.1 / price

    # A depleted (negative) balance must not produce a negative size.
    return max(0.0, min(position_size, max_position))
import pandas as pd import numpy as np from datetime import datetime from typing import Dict, List, Tuple, Optional import logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger()
I'll help you create an Expert4x Grid Trend Multiplier feature. This is a trading strategy that combines grid trading with trend detection and position sizing multipliers.
for i in range(1000): price += np.random.randn() * 0.5 if i > 200 and i < 600: # Uptrend price += 0.1 elif i > 600: # Downtrend price -= 0.05 prices.append(max(price, 10)) df = pd.DataFrame({ 'high': [p * (1 + abs(np.random.randn() * 0.002)) for p in prices], 'low': [p * (1 - abs(np.random.randn() * 0.002)) for p in prices], 'close': prices }, index=dates) 200 and i <
def update_positions(self, current_price: float) -> List[Dict]:
    """
    Close every open position whose take profit or stop loss has been hit.

    Take profit is checked before stop loss. A closed position is marked in
    place with ``exit_price``, ``profit``, ``exit_time`` and ``result``
    ('WIN' or 'LOSS'); its profit is the absolute move from entry to
    ``current_price`` times the position size, negated for a loss. Positions
    that hit neither level stay in ``self.open_positions``.

    The trade counters, balance, peak balance and maximum drawdown (in
    percent of the peak) are updated from the closed trades.

    Args:
        current_price: Price used both to test the levels and as exit price.

    Returns:
        List of closed trades
    """
    closed = []
    still_open = []

    for position in self.open_positions:
        is_buy = position['type'] == 'BUY'
        is_sell = position['type'] == 'SELL'

        # The move is measured the same way for both sides; only the sign
        # of the booked result differs between a win and a loss.
        move_value = abs(current_price - position['entry_price']) * position['position_size']

        if (is_buy and current_price >= position['take_profit']) or \
                (is_sell and current_price <= position['take_profit']):
            # Close with profit
            position['exit_price'] = current_price
            position['profit'] = move_value
            position['exit_time'] = datetime.now()
            position['result'] = 'WIN'
            closed.append(position)
            self.winning_trades += 1
        elif (is_buy and current_price <= position['stop_loss']) or \
                (is_sell and current_price >= position['stop_loss']):
            # Close with loss
            position['exit_price'] = current_price
            position['profit'] = -move_value
            position['exit_time'] = datetime.now()
            position['result'] = 'LOSS'
            closed.append(position)
            self.losing_trades += 1
        else:
            # Position still open
            still_open.append(position)

    self.open_positions = still_open
    self.total_trades += len(closed)

    # Book each trade separately so the peak and drawdown see every
    # intermediate balance.
    for trade in closed:
        self.balance += trade['profit']

        if self.balance > self.peak_balance:
            self.peak_balance = self.balance

        # Drawdown is a percentage of the peak, which is undefined for a
        # non-positive peak (e.g. an account opened with zero balance).
        if self.peak_balance > 0:
            current_drawdown = (self.peak_balance - self.balance) / self.peak_balance * 100
            self.max_drawdown = max(self.max_drawdown, current_drawdown)

    return closed
def detect_trend(self, prices: pd.Series, volume: Optional[pd.Series] = None) -> Tuple[str, float]:
    """
    Classify the market trend from an EMA crossover and an ADX-style score.

    Direction comes from comparing the 20-span and 50-span EMAs of
    ``prices``. Strength is a directional-movement index built from 14-bar
    rolling highs and lows of ``prices`` itself, normalised by
    ``self.calculate_atr`` and smoothed with simple 14-bar rolling means.
    A trend is only reported when the strength exceeds 25; if the strength
    cannot be computed yet (NaN) it is treated as 25, which yields NEUTRAL.

    Args:
        prices: Price series, oldest first.
        volume: Accepted for interface compatibility; not used.

    Returns:
        (trend_direction, trend_strength) where the direction is "BULLISH",
        "BEARISH" or "NEUTRAL" and the strength is capped at 100 (0 when
        neutral).
    """
    # Nothing to read the last value from.
    if prices.empty:
        return "NEUTRAL", 0

    # Calculate EMAs
    ema_fast = prices.ewm(span=20, adjust=False).mean()
    ema_slow = prices.ewm(span=50, adjust=False).mean()

    # Directional movement from rolling extremes of the price series.
    high = prices.rolling(window=14).max()
    low = prices.rolling(window=14).min()

    # Only moves in the favourable direction count; the rest are zeroed.
    plus_dm = high.diff().clip(lower=0)
    minus_dm = (-low.diff()).clip(lower=0)

    tr = self.calculate_atr(high, low, prices)

    plus_di = 100 * (plus_dm.rolling(14).mean() / tr)
    minus_di = 100 * (minus_dm.rolling(14).mean() / tr)
    dx = 100 * (plus_di - minus_di).abs() / (plus_di + minus_di)
    adx = dx.rolling(14).mean()

    # Determine trend
    current_ema_fast = ema_fast.iloc[-1]
    current_ema_slow = ema_slow.iloc[-1]
    latest_adx = adx.iloc[-1]
    current_adx = 25 if pd.isna(latest_adx) else latest_adx

    if current_adx > 25:
        if current_ema_fast > current_ema_slow:
            return "BULLISH", min(100, current_adx)
        if current_ema_fast < current_ema_slow:
            return "BEARISH", min(100, current_adx)

    return "NEUTRAL", 0
def execute_strategy(self, price_data: pd.DataFrame, volume_data: Optional[pd.Series] = None) -> Dict:
    """
    Main strategy execution loop

    Walks the price data bar by bar. The first 50 bars are skipped so the
    indicators have history to work with. For every later bar the method
    re-detects the trend on the closes seen so far, updates the multiplier,
    recomputes the ATR and the grid around the current close, opens an order
    if a grid level is hit, and then lets ``update_positions`` close any
    position (including one opened on this same bar) whose take profit or
    stop loss is reached at the current close.

    Args:
        price_data: DataFrame with 'high', 'low', 'close' columns
        volume_data: Optional volume series; accepted but not used here

    Returns:
        Strategy performance metrics, as returned by
        ``get_performance_metrics``
    """
    logger.info("Starting Grid Trend Multiplier Strategy")

    warmup_bars = 50  # bars skipped before the first evaluation

    # Look the columns up once instead of on every bar.
    highs = price_data['high']
    lows = price_data['low']
    closes = price_data['close']

    for i in range(warmup_bars, len(price_data)):
        current_close = closes.iloc[i]

        # Get price series up to current point
        close_so_far = closes.iloc[:i+1]

        # Detect trend
        self.current_trend, self.trend_strength = self.detect_trend(close_so_far)

        # Update multiplier based on trend strength
        self.update_multiplier(self.trend_strength)

        # Calculate ATR; fall back to 1% of the close while it is undefined
        atr_series = self.calculate_atr(
            highs.iloc[:i+1],
            lows.iloc[:i+1],
            close_so_far
        )
        latest_atr = atr_series.iloc[-1]
        current_atr = current_close * 0.01 if pd.isna(latest_atr) else latest_atr

        # Calculate grid levels
        self.grid_levels = self.calculate_grid_levels(current_close, current_atr)

        # Check for grid execution
        order = self.check_grid_execution(current_close, self.grid_levels, current_atr)
        if order:
            self.open_positions.append(order)
            logger.info(f"Order executed: {order['type']} at {order['entry_price']:.4f} "
                        f"with multiplier {order['multiplier']:.2f}")

        # Update existing positions
        for trade in self.update_positions(current_close):
            logger.info(f"Trade closed: {trade['result']} with profit ${trade['profit']:.2f}")

    # Calculate final metrics
    return self.get_performance_metrics()
def __init__(self,
             initial_balance: float = 10000,
             grid_distance_pct: float = 0.5,
             max_grid_levels: int = 10,
             trend_multiplier: float = 1.5,
             max_multiplier: float = 5.0,
             atr_period: int = 14,
             risk_per_trade: float = 0.02):
    """
    Set up the strategy configuration and reset all runtime state.

    Args:
        initial_balance: Starting account balance
        grid_distance_pct: Distance between grid levels (% of price)
        max_grid_levels: Maximum grid levels
        trend_multiplier: Position size multiplier for trend direction
        max_multiplier: Maximum allowed multiplier
        atr_period: ATR calculation period
        risk_per_trade: Risk per trade (2% = 0.02)
    """
    # Configuration, stored exactly as given.
    self.initial_balance = self.balance = initial_balance
    self.grid_distance_pct = grid_distance_pct
    self.max_grid_levels = max_grid_levels
    self.trend_multiplier = trend_multiplier
    self.max_multiplier = max_multiplier
    self.atr_period = atr_period
    self.risk_per_trade = risk_per_trade

    # Strategy state: three independent, initially empty lists.
    self.grid_levels, self.open_positions, self.closed_trades = [], [], []
    self.current_trend = "NEUTRAL"  # BULLISH, BEARISH, NEUTRAL
    self.trend_strength = 0  # 0-100
    self.total_multiplier = 1.0

    # Performance metrics all start from zero; the peak starts at the
    # opening balance.
    self.total_trades = self.winning_trades = self.losing_trades = 0
    self.max_drawdown = 0
    self.peak_balance = initial_balance


def calculate_atr(self, high: pd.Series, low: pd.Series, close: pd.Series) -> pd.Series:
    """
    Calculate Average True Range.

    The true range of a bar is the largest of: high minus low, the absolute
    gap between the high and the previous close, and the absolute gap
    between the low and the previous close. The result is the simple rolling
    mean of the true range over ``self.atr_period`` bars.
    """
    previous_close = close.shift()
    range_candidates = pd.concat(
        [
            high - low,
            (high - previous_close).abs(),
            (low - previous_close).abs(),
        ],
        axis=1,
    )
    true_range = range_candidates.max(axis=1)
    return true_range.rolling(window=self.atr_period).mean()