top of page

Building a Multi-Agent Trading System with Python

  • Writer: Nikhil Adithyan
    Nikhil Adithyan
  • 2 days ago
  • 13 min read

A comprehensive approach to building a unified framework


ree

Introduction

Most trading bots try to handle everything at once. They trade stocks, forex, and crypto in the same way, which makes them rigid and fragile. Markets behave differently, and one strategy rarely fits all.


Hedge funds solve this by using multiple agents. Each one specializes in a market, and the system coordinates them together. This design is more resilient and adapts better to shocks.


A recent paper, HedgeAgents: A Balanced-aware Multi-agent Financial Trading System, takes this idea to the extreme with reinforcement learning. Our goal here is more practical: build a simplified, production-like foundation that captures the same multi-agent spirit.


We will use EODHD’s historical and real-time data to power this framework. This article is about the architecture, not live execution. Think of it as the blueprint for a system you can later extend into production.


Why Multi-Agent?

When a single trading bot is asked to handle every market, it often breaks down. Stocks, forex, and crypto behave differently, and forcing one system to manage them all usually means it underperforms everywhere.


Take stocks for example. They often reward trend-following behavior. Forex, on the other hand, gravitates toward mean reversion where price swings eventually pull back. Crypto is far more erratic, moving in bursts that defy both patterns.


A multi-agent system accepts this reality. Instead of squeezing everything into one box, it creates specialists. Each agent focuses on its own market, learns the rhythms there, and executes with strategies that fit.


The real strength of this design is how these agents come together. By running side by side, they cover weaknesses and balance each other out. And with EODHD’s data across all three asset classes, building such a system in practice becomes much more accessible.


Setting Up the Foundation

Before building agents, we need a clean foundation. That means importing only the packages we will actually use. No extras, no clutter.



# 1. IMPORTING PACKAGES

import asyncio
import json
import time
from datetime import datetime, timedelta

import pandas as pd
import numpy as np
import requests
import websockets

This set gives us everything required for async tasks, data requests, and handling time series. Pandas and NumPy form the backbone for analysis, while requests and websockets handle the two ways we connect to EODHD’s API.


With the basics ready, the next step is configuration. Here we define our API token, endpoints, symbol mappings, and strategy parameters.



# 2. CONFIGURATION & SYMBOL MAPPING

EODHD_API_TOKEN = "YOUR EODHD API KEY"
API_BASE_REST = "https://eodhd.com/api"

WS_ENDPOINTS = {
    "us":     "wss://ws.eodhistoricaldata.com/ws/us",
    "forex":  "wss://ws.eodhistoricaldata.com/ws/forex",
    "crypto": "wss://ws.eodhistoricaldata.com/ws/crypto",
}

RUN_MODE = {"backtest": True, "live_ws": False, "live_poll_rest": False}
assert sum(int(v) for v in RUN_MODE.values()) == 1

INTRADAY_INTERVAL = "5m"
BAR_SECONDS = 5 * 60
LOOKBACK_DAYS = 30
TIMEZONE = "UTC"

SYMBOLS = {
    "stocks": {"rest": "AAPL.US",    "ws": "AAPL"},
    "forex":  {"rest": "EUR.FOREX",  "ws": "EURUSD"},
    "crypto": {"rest": "BTC-USD.CC", "ws": "BTC-USD"},
}

STRATEGY_PARAMS = {
    "stocks": {"type": "sma_cross", "fast": 20, "slow": 50, "confirm_above_fast": True},
    "forex":  {"type": "rsi_mean_revert", "period": 14, "entry_rsi": 30, "exit_rsi": 50},
    "crypto": {"type": "donchian_breakout", "window": 20, "exit_rule": "midline", "use_trailing_atr": False},
}

RISK = {
    "risk_per_trade": 0.01,
    "max_leverage": 1.0,
    "stop_pct": 0.01,
    "take_profit_pct": 0.02,
}

PORTFOLIO = {
    "initial_equity": 100000.0,
    "allocation": "equal",
    "allow_short": False,
}

NET = {"rest_timeout": 20, "max_retries": 3, "retry_backoff_sec": 1.5}

LOGGING = {"trades_csv": True, "equity_csv": True}

There are a few key things happening here. Symbols are mapped differently for REST and WebSocket calls, so we define both. Each market also has its own strategy parameters. Stocks get a moving average crossover, forex relies on RSI, and crypto follows breakouts. Risk and portfolio settings keep the system controlled and realistic.


This configuration file is the heart of flexibility. With a single change here, we can adjust strategies, markets, or run mode without touching the rest of the code.


Data Layer: Historical (REST)

The first piece of our data layer is pulling historical intraday prices using EODHD’s Intraday Historical endpoint. This is what lets agents test strategies before they ever touch live markets.



# 3. DATA LAYER 1 (REST INTRADAY)

def _to_unix(dt: datetime) -> int:
    return int(dt.timestamp())

def _request_json(url: str, params: dict, timeout: int, retries: int, backoff: float):
    for i in range(retries):
        try:
            r = requests.get(url, params=params, timeout=timeout)
            if r.status_code == 200:
                return r.json()
        except requests.RequestException:
            pass
        time.sleep(backoff * (2 ** i))
    raise RuntimeError(f"Failed REST call: {url}")

def _normalize_intraday(payload) -> pd.DataFrame:
    if not payload:
        return pd.DataFrame(columns=["open","high","low","close","volume"])
    df = pd.DataFrame(payload)

    # timestamp field variants
    if "t" in df.columns:
        ts = pd.to_datetime(df["t"], unit="s", utc=True)
    elif "timestamp" in df.columns and np.issubdtype(df["timestamp"].dtype, np.integer):
        ts = pd.to_datetime(df["timestamp"], unit="s", utc=True)
    elif "datetime" in df.columns:
        ts = pd.to_datetime(df["datetime"], utc=True)
    else:
        raise ValueError("No recognizable time field in intraday payload")

    # price field variants
    o = df["open"]   if "open"   in df.columns else df.get("o")
    h = df["high"]   if "high"   in df.columns else df.get("h")
    l = df["low"]    if "low"    in df.columns else df.get("l")
    c = df["close"]  if "close"  in df.columns else df.get("c")
    v = df["volume"] if "volume" in df.columns else df.get("v", pd.Series([np.nan]*len(df)))

    out = pd.DataFrame(
        {"open": o, "high": h, "low": l, "close": c, "volume": v},
        index=ts
    ).astype({"open":"float64","high":"float64","low":"float64","close":"float64"})
    out.index.name = "time"
    out = out.sort_index()
    return out

def get_intraday_rest(symbol_rest: str, interval: str = INTRADAY_INTERVAL,
                      days: int = LOOKBACK_DAYS, api_token: str = EODHD_API_TOKEN,
                      base_url: str = API_BASE_REST, net: dict = NET) -> pd.DataFrame:
    to_dt = datetime.utcnow()
    from_dt = to_dt - timedelta(days=days)
    url = f"{base_url}/intraday/{symbol_rest}"
    params = {
        "api_token": api_token,
        "interval": interval,
        "from": _to_unix(from_dt),
        "to": _to_unix(to_dt),
        "fmt": "json",
        "order": "a",
    }
    payload = _request_json(url, params, net["rest_timeout"], net["max_retries"], net["retry_backoff_sec"])
    df = _normalize_intraday(payload)

    # enforce exact bar grid (guards against clock skew/gaps)
    rule = {"1m": "1min", "5m": "5min", "1h": "1H"}.get(interval, "5min")
    df = df.resample(rule, label="right", closed="right").agg(
        {"open":"first","high":"max","low":"min","close":"last","volume":"sum"}
    ).dropna(subset=["open","high","low","close"])

    return df

The function chain works like this. requestjson makes the API call with retries so temporary errors don’t kill the run. normalizeintraday cleans the raw payload since EODHD responses can vary in field names. Finally, get_intraday_rest ties it together by building the URL, pulling data, and resampling into fixed bars.


The result is a tidy DataFrame with consistent timestamps. This makes sure our agents backtest on an even grid, without missing bars or irregular gaps.


Data Layer: Real-Time (WebSocket)

If the REST layer handles history, the WebSocket layer is where agents breathe live market data. This piece listens to continuous tick feeds from stocks, forex, and crypto (extracted using EODHD WebSocket), then shapes them into bar data that strategies can actually use.



# 4. DATA LAYER 2 (WEBSOCKET)

def _ws_url(feed_key: str) -> str:
    return f"{WS_ENDPOINTS[feed_key]}?api_token={EODHD_API_TOKEN}"

def _subscribe_payload(symbols: list[str]) -> str:
    return json.dumps({"action": "subscribe", "symbols": ",".join(symbols)})

async def ws_stream(feed_key: str, symbols: list[str], out_tick_queue: asyncio.Queue):
    url = _ws_url(feed_key)
    sub_msg = _subscribe_payload(symbols)
    while True:
        try:
            async with websockets.connect(url, ping_interval=20, ping_timeout=20) as ws:
                await ws.send(sub_msg)
                async for raw in ws:
                    try:
                        msg = json.loads(raw)
                        sym = msg.get("s") or msg.get("symbol")
                        if not sym:
                            continue
                        ts = msg.get("t") or msg.get("timestamp") or msg.get("time")
                        if ts is None:
                            continue
                        if isinstance(ts, (int, float)):
                            ts = pd.to_datetime(ts, unit="s", utc=True)
                        else:
                            ts = pd.to_datetime(ts, utc=True)

                        price = (msg.get("p") or msg.get("price") or
                                 msg.get("last") or msg.get("c"))
                        if price is None:
                            continue
                        size = msg.get("q") or msg.get("size") or msg.get("volume") or 0
                        await out_tick_queue.put((sym, ts, float(price), float(size)))
                    except Exception:
                        continue
        except Exception:
            await asyncio.sleep(2.0)

class BarAggregator:
    def __init__(self, symbols: list[str], bar_seconds: int):
        self.bar_seconds = bar_seconds
        self.state = {s: None for s in symbols}

    def _bar_bucket_start(self, ts: pd.Timestamp) -> pd.Timestamp:
        epoch = int(ts.timestamp())
        start = epoch - (epoch % self.bar_seconds)
        return pd.to_datetime(start, unit="s", utc=True)

    def add_tick(self, symbol: str, ts: pd.Timestamp, price: float, size: float):
        start = self._bar_bucket_start(ts)
        cur = self.state.get(symbol)
        if cur is None or cur["start"] != start:
            if cur is not None:
                finished = cur.copy()
                finished["time"] = cur["start"] + pd.Timedelta(seconds=self.bar_seconds)
                finished.pop("start", None)
                self.state[symbol] = {"start": start, "o": price, "h": price, "l": price, "c": price, "v": size}
                return finished
            self.state[symbol] = {"start": start, "o": price, "h": price, "l": price, "c": price, "v": size}
            return None
        cur["c"] = price
        if price > cur["h"]:
            cur["h"] = price
        if price < cur["l"]:
            cur["l"] = price
        cur["v"] = cur.get("v", 0) + (size or 0)
        return None

async def ticks_to_bars(tick_queue: asyncio.Queue, bars_queue: asyncio.Queue, symbols: list[str], bar_seconds: int):
    agg = BarAggregator(symbols, bar_seconds)
    while True:
        sym, ts, price, size = await tick_queue.get()
        finished = agg.add_tick(sym, ts, price, size)
        if finished:
            bar = {
                "symbol": sym,
                "open": finished["o"],
                "high": finished["h"],
                "low": finished["l"],
                "close": finished["c"],
                "volume": finished.get("v", 0.0),
                "time": finished["time"],
            }
            await bars_queue.put(bar)

async def run_ws_layer(symbol_map: dict):
    tick_q = asyncio.Queue()
    bar_q = asyncio.Queue()

    tasks = [
        asyncio.create_task(ws_stream("us", [symbol_map["stocks"]["ws"]], tick_q)),
        asyncio.create_task(ws_stream("forex", [symbol_map["forex"]["ws"]], tick_q)),
        asyncio.create_task(ws_stream("crypto", [symbol_map["crypto"]["ws"]], tick_q)),
        asyncio.create_task(ticks_to_bars(tick_q, bar_q, [symbol_map["stocks"]["ws"],
                                                          symbol_map["forex"]["ws"],
                                                          symbol_map["crypto"]["ws"]], BAR_SECONDS)),
    ]
    return tick_q, bar_q, tasks

The ws_stream function subscribes to a feed and pipes raw ticks into a queue. Each tick carries a symbol, a timestamp, a trade price, and size. The BarAggregator then buckets these ticks into bars of fixed length, updating highs, lows, and volumes on the fly.


Finally, run_ws_layer wires everything together. It launches one WebSocket stream for each asset class and a background task that turns raw ticks into bars. The output queue gives agents a clean, real-time bar feed across stocks, forex, and crypto.


Indicators

Each agent relies on a few technical tools. We keep this lightweight, with one core indicator assigned to each market.


The simple moving average works best for stocks, offering a clean view of trend direction. RSI fits forex, where short bursts of overbought and oversold conditions are common. For crypto, Donchian channels give a breakout-friendly structure that matches its volatile nature.



# 5. INDICATORS

def sma(series: pd.Series, n: int) -> pd.Series:
    return series.rolling(n, min_periods=n).mean()

def rsi(close: pd.Series, n: int = 14) -> pd.Series:
    delta = close.diff()
    gain = delta.clip(lower=0.0)
    loss = -delta.clip(upper=0.0)
    avg_gain = gain.ewm(alpha=1/n, adjust=False).mean()
    avg_loss = loss.ewm(alpha=1/n, adjust=False).mean()
    rs = avg_gain / avg_loss.replace(0, np.nan)
    rsi = 100 - (100 / (1 + rs))
    return rsi.fillna(50.0)

def donchian(high: pd.Series, low: pd.Series, n: int = 20) -> pd.DataFrame:
    upper = high.rolling(n, min_periods=n).max()
    lower = low.rolling(n, min_periods=n).min()
    mid = (upper + lower) / 2.0
    return pd.DataFrame({"upper": upper, "lower": lower, "mid": mid})

These three are enough. They are simple to compute, well-tested, and map directly to the different market behaviors our agents face.


Strategies

Indicators by themselves do nothing. What matters is how we turn them into rules that the agents can follow. Each market gets a strategy that reflects its character.


For stocks, we use a simple moving average crossover. This gives clean entries when a fast line crosses above a slow line, with an optional confirmation that price stays above the fast line.


Forex gets a mean reversion play. Here the RSI looks for oversold dips, then holds the long until conditions normalize. It is a patient approach that fits currency pairs.


Crypto runs on Donchian breakouts. Price moving above the upper band starts a trade, and exits are flexible: either on a midline cross or a deeper retreat to the lower band.



# 6. STRATEGIES

def sma_cross_strategy(df: pd.DataFrame, fast: int, slow: int, confirm_above_fast: bool = True) -> pd.Series:
    fast_sma = sma(df["close"], fast)
    slow_sma = sma(df["close"], slow)
    signal = (fast_sma > slow_sma).astype(int)
    if confirm_above_fast:
        signal &= (df["close"] > fast_sma)
    return signal

def rsi_mean_revert_strategy(df: pd.DataFrame, period: int, entry_rsi: float, exit_rsi: float) -> pd.Series:
    rsi_vals = rsi_wilder(df["close"], period)
    long_cond = (rsi_vals < entry_rsi)
    exit_cond = (rsi_vals >= exit_rsi)
    signal = pd.Series(0, index=df.index)
    in_pos = False
    for i in range(len(df)):
        if not in_pos and long_cond.iat[i]:
            signal.iat[i] = 1
            in_pos = True
        elif in_pos and exit_cond.iat[i]:
            in_pos = False
        elif in_pos:
            signal.iat[i] = 1
    return signal

def donchian_breakout_strategy(df: pd.DataFrame, window: int, exit_rule: str = "midline") -> pd.Series:
    dc = donchian(df["high"], df["low"], window)
    long_cond = df["close"] > dc["upper"]
    if exit_rule == "midline":
        exit_cond = df["close"] < dc["mid"]
    else:
        exit_cond = df["close"] < dc["lower"]  # alternative exit
    signal = pd.Series(0, index=df.index)
    in_pos = False
    for i in range(len(df)):
        if not in_pos and long_cond.iat[i]:
            signal.iat[i] = 1
            in_pos = True
        elif in_pos and exit_cond.iat[i]:
            in_pos = False
        elif in_pos:
            signal.iat[i] = 1
    return signal

Each strategy reduces to a binary signal. The SMA crossover goes long when the fast average clears the slow, with an option to confirm above the fast line. The RSI strategy enters when RSI falls below a set level and holds until it exits above another, using a simple in-position flag to avoid constant flip-flops.


The Donchian breakout triggers on a close above the channel’s upper band, with exits defined by either the midline or lower band. Window length is key here: shorter windows catch more moves, longer ones focus on bigger trends.


The Agent Class

An agent is a self-contained trader. It knows its market, pulls data, builds signals, and manages its own position. The class below is the core that ties strategy and execution together.



# 7. AGENT CLASS

class Agent:
    def __init__(self, name: str, market_key: str):
        self.name = name
        self.market = market_key
        self.rest_symbol = SYMBOLS[market_key]["rest"]
        self.ws_symbol = SYMBOLS[market_key]["ws"]
        self.params = STRATEGY_PARAMS[market_key]

        self.df = None            # historical bars
        self.signal = None        # 1/0 series aligned with df
        self.position = 0         # 0 or +1 (long only for clarity)
        self.entry_price = None
        self.stop = None
        self.take_profit = None

    def load_history(self):
        self.df = get_intraday_rest(self.rest_symbol, INTRADAY_INTERVAL, LOOKBACK_DAYS)
        return self.df

    def _build_signals(self, df: pd.DataFrame) -> pd.Series:
        t = self.params["type"]
        if t == "sma_cross":
            return sma_cross_strategy(df, self.params["fast"], self.params["slow"], self.params.get("confirm_above_fast", True))
        if t == "rsi_mean_revert":
            return rsi_mean_revert_strategy(df, self.params["period"], self.params["entry_rsi"], self.params["exit_rsi"])
        if t == "donchian_breakout":
            return donchian_breakout_strategy(df, self.params["window"], self.params.get("exit_rule", "midline"))
        raise ValueError(f"Unknown strategy type: {t}")

    def prepare(self, df: pd.DataFrame | None = None):
        if df is not None:
            self.df = df.copy()
        elif self.df is None:
            self.load_history()
        self.signal = self._build_signals(self.df)
        return self.signal

    def _risk_sizing(self, price: float, risk_per_trade: float, equity: float, stop_pct: float) -> float:
        risk_amount = equity * risk_per_trade
        stop_dist = max(price * stop_pct, 1e-8)
        qty = risk_amount / stop_dist
        return max(qty, 0.0)

    def _set_protections(self, entry: float):
        self.stop = entry * (1.0 - RISK["stop_pct"])
        self.take_profit = entry * (1.0 + RISK["take_profit_pct"])

    # Live/on-bar execution: call at each completed bar
    # bar = {"symbol": str, "open": float, "high": float, "low": float, "close": float, "time": pd.Timestamp}
    def on_bar(self, bar: dict, equity: float) -> dict | None:
        if bar["symbol"] != self.ws_symbol:
            return None

        # Update rolling df with the latest bar for signal continuity
        row = pd.DataFrame(
            {"open":[bar["open"]], "high":[bar["high"]], "low":[bar["low"]], "close":[bar["close"]], "volume":[bar.get("volume", 0.0)]},
            index=pd.DatetimeIndex([bar["time"]], name="time")
        )
        self.df = pd.concat([self.df, row]).iloc[-5000:] if self.df is not None else row
        sig = self._build_signals(self.df).iloc[-1]

        events = None
        price = bar["close"]

        # Exit conditions (stop/TP) checked first
        if self.position == 1:
            if self.stop is not None and bar["low"] <= self.stop:
                exit_px = self.stop
                events = {"agent": self.name, "action": "stop", "price": exit_px, "time": bar["time"]}
                self.position = 0; self.entry_price = None; self.stop = None; self.take_profit = None
                return events
            if self.take_profit is not None and bar["high"] >= self.take_profit:
                exit_px = self.take_profit
                events = {"agent": self.name, "action": "take_profit", "price": exit_px, "time": bar["time"]}
                self.position = 0; self.entry_price = None; self.stop = None; self.take_profit = None
                return events

        # Strategy-driven exits/entries
        if self.position == 1 and sig == 0:
            events = {"agent": self.name, "action": "exit", "price": price, "time": bar["time"]}
            self.position = 0; self.entry_price = None; self.stop = None; self.take_profit = None
        elif self.position == 0 and sig == 1:
            qty = self._risk_sizing(price, RISK["risk_per_trade"], equity, RISK["stop_pct"])
            if qty > 0:
                self.position = 1
                self.entry_price = price
                self._set_protections(price)
                events = {"agent": self.name, "action": "entry", "price": price, "qty": qty, "time": bar["time"]}
        return events

The lifecycle is simple. prepare loads history, builds signals, and leaves the agent ready. In live contexts, on_bar receives a completed bar, updates the rolling DataFrame, recomputes the latest signal, and checks for exits first. Stops and take profit are enforced before any new decision. That makes the behavior deterministic and easy to reason about.


Sizing is risk based. The quantity is derived from account equity and a fixed percentage risk per trade. The stop distance defines how large a position can be without exceeding that risk. This gives the system consistent bet sizing across different price levels.


State is minimal on purpose. The agent tracks position, entry price, and two protective levels. It does not manage cash, fees, or slippage here. Those belong to a portfolio layer or an execution engine. This keeps the class focused and testable.


Finally, the agent is strategy agnostic. The same skeleton runs a moving average trader, an RSI mean reverter, or a Donchian breakout hunter. Swapping behavior is a matter of changing parameters, not rewriting code.


Backtesting

Backtesting lets each agent prove its logic on history before we think about live use. Signals are converted to positions on the next bar, then returns flow into an equity curve and basic metrics.



# 8. BACKTESTING

def _position_from_signal(sig: pd.Series) -> pd.Series:
    # enter/exit on next bar
    return sig.ffill().shift(1).fillna(0).astype(float)

def backtest_agent(agent: Agent, initial_equity: float = 1_00000.0) -> dict:
    if agent.df is None or agent.signal is None:
        agent.prepare()

    df = agent.df.copy()
    sig = agent.signal.reindex(df.index).fillna(0).astype(int)
    pos = _position_from_signal(sig)

    ret = df["close"].pct_change().fillna(0.0)
    strat_ret = pos * ret

    equity = (1.0 + strat_ret).cumprod() * initial_equity

    # metrics
    cum_ret = equity.iloc[-1] / equity.iloc[0] - 1.0
    ann_factor = max(1, int((6.5*60) / int(INTRADAY_INTERVAL.replace("m","").replace("h","60")))) * 252  # rough bars/year
    sharpe = (strat_ret.mean() * ann_factor) / (strat_ret.std(ddof=0) + 1e-12) if strat_ret.std(ddof=0) > 0 else 0.0
    roll_max = equity.cummax()
    drawdown = equity / roll_max - 1.0
    max_dd = drawdown.min()

    out = pd.DataFrame({"close": df["close"], "signal": sig, "pos": pos, "ret": strat_ret, "equity": equity})
    result = {
        "agent": agent.name,
        "market": agent.market,
        "equity_curve": out,
        "metrics": {
            "cum_return": float(cum_ret),
            "sharpe": float(sharpe),
            "max_drawdown": float(max_dd),
        },
    }

    if LOGGING.get("equity_csv", False):
        out.to_csv(f"equity_{agent.name}.csv")

    return result

def combine_portfolio(results: list[dict], initial_equity: float = PORTFOLIO["initial_equity"]) -> dict:
    if not results:
        raise ValueError("No agent results to combine.")

    # equal weights
    n = len(results)
    weights = {r["agent"]: 1.0 / n for r in results}

    # align returns on a common index
    rets = []
    for r in results:
        df = r["equity_curve"][["ret"]].rename(columns={"ret": r["agent"]})
        rets.append(df)
    rets = pd.concat(rets, axis=1).fillna(0.0)

    port_ret = sum(rets[col] * w for col, w in weights.items())
    port_equity = (1.0 + port_ret).cumprod() * initial_equity

    # metrics
    ann_factor = max(1, int((6.5*60) / int(INTRADAY_INTERVAL.replace("m","").replace("h","60")))) * 252
    sharpe = (port_ret.mean() * ann_factor) / (port_ret.std(ddof=0) + 1e-12) if port_ret.std(ddof=0) > 0 else 0.0
    roll_max = port_equity.cummax()
    drawdown = port_equity / roll_max - 1.0
    max_dd = drawdown.min()
    cum_ret = port_equity.iloc[-1] / port_equity.iloc[0] - 1.0

    port_df = pd.DataFrame({"ret": port_ret, "equity": port_equity})
    if LOGGING.get("equity_csv", False):
        port_df.to_csv("portfolio_equity.csv")

    return {
        "weights": weights,
        "equity_curve": port_df,
        "metrics": {
            "cum_return": float(cum_ret),
            "sharpe": float(sharpe),
            "max_drawdown": float(max_dd),
        },
    }

Two choices stand out here. Positions follow signals with a one bar delay, which removes look ahead and keeps results honest. Metrics are kept to the essentials so readers can compare agents quickly.


The portfolio combiner is intentionally plain. It aligns return series, applies equal weights, and reports aggregate metrics. This is enough to show the benefit of specialization without getting lost in allocation theory.


Orchestration

This is the binder. We place agents, data pipes, and logging in one spot so the system reads like a single unit. Think of it as the control room that routes bars to the right agent and records what happens.



# 9. ORCHESTRATION

import os
from collections import defaultdict

class TradeLogger:
    def __init__(self, path="trades.csv"):
        self.path = path
        self._init = False

    def log(self, row: dict):
        df = pd.DataFrame([row])
        header = not os.path.exists(self.path) or not self._init
        df.to_csv(self.path, mode="a", index=False, header=header)
        self._init = True

async def live_ws_orchestrator():
    agents = [
        Agent("stocks_agent", "stocks"),
        Agent("forex_agent",  "forex"),
        Agent("crypto_agent", "crypto"),
    ]
    # preload history for continuity
    for a in agents:
        a.load_history()
        a.prepare()

    alloc = PORTFOLIO["initial_equity"] / len(agents)
    qty_map = defaultdict(float)   # agent_name -> open qty
    equity_map = {a.name: alloc for a in agents}
    sym_to_agent = {a.ws_symbol: a for a in agents}
    logger = TradeLogger("trades_live.csv")

    _, bars_q, tasks = await run_ws_layer(SYMBOLS)

    try:
        while True:
            bar = await bars_q.get()
            a = sym_to_agent.get(bar["symbol"])
            if a is None:
                continue
            ev = a.on_bar(bar, equity_map[a.name])
            if not ev:
                continue

            action = ev["action"]
            price = float(ev["price"])
            ts = pd.to_datetime(ev["time"])

            if action == "entry":
                qty = float(ev["qty"])
                qty_map[a.name] = qty
                logger.log({"time": ts, "agent": a.name, "market": a.market, "action": "entry", "price": price, "qty": qty})
            else:  # exit/stop/take_profit
                qty = qty_map.get(a.name, 0.0)
                pnl = qty * (price - (a.entry_price or price))
                equity_map[a.name] += pnl
                qty_map[a.name] = 0.0
                logger.log({"time": ts, "agent": a.name, "market": a.market, "action": action, "price": price, "qty": qty, "pnl": pnl})
    finally:
        for t in tasks:
            t.cancel()
        await asyncio.gather(*tasks, return_exceptions=True)

The flow is straightforward. We create three agents, preload history for signal continuity, and give each an equal slice of portfolio equity. A symbol map points incoming bars to the correct agent.


run_ws_layer supplies a shared bar queue. Each completed bar is passed to on_bar, which may return an entry or exit event. Entries record size and price. Exits compute realized PnL and update the agent’s equity. Everything is written to a CSV through a tiny logger.


This section is architectural, not a live demo. It shows how the pieces fit so you can swap the source later. The same orchestration works with a REST polling loop or even a simulator feed, as long as bars arrive in the same format.


Wrapping Up

We started from the ground up: setting imports, defining configuration, building data layers, adding indicators, implementing strategies, wiring agents, backtesting them, and finally bringing everything together in orchestration. The goal was to outline a clean architecture for a multi-agent trading framework.


This is not a production-ready system. It is a blueprint. Running it live requires more like execution infrastructure, risk management, and scaling considerations. Those are the natural next steps if you want to take this further.


All of it rests on reliable data. EODHD makes that part simple, providing the historical and live market feeds that plug directly into a setup like this. With that being said, you’ve reached the end of the article. Hope you learned something new and useful today.

Bring information-rich articles and research works straight to your inbox (it's not that hard). 

Thanks for subscribing!

© 2023 by InsightBig. Powered and secured by Wix

bottom of page