Building a Multi-Agent Trading System with Python
- Nikhil Adithyan
- 2 days ago
A comprehensive approach to building a unified framework

Introduction
Most trading bots try to handle everything at once. They trade stocks, forex, and crypto in the same way, which makes them rigid and fragile. Markets behave differently, and one strategy rarely fits all.
Hedge funds solve this by using multiple agents. Each one specializes in a market, and the system coordinates them together. This design is more resilient and adapts better to shocks.
A recent paper, HedgeAgents: A Balanced-aware Multi-agent Financial Trading System, takes this idea to the extreme with reinforcement learning. Our goal here is more practical: build a simplified, production-like foundation that captures the same multi-agent spirit.
We will use EODHD’s historical and real-time data to power this framework. This article is about the architecture, not live execution. Think of it as the blueprint for a system you can later extend into production.
Why Multi-Agent?
When a single trading bot is asked to handle every market, it often breaks down. Stocks, forex, and crypto behave differently, and forcing one system to manage them all usually means it underperforms everywhere.
Take stocks for example. They often reward trend-following behavior. Forex, on the other hand, gravitates toward mean reversion where price swings eventually pull back. Crypto is far more erratic, moving in bursts that defy both patterns.
A multi-agent system accepts this reality. Instead of squeezing everything into one box, it creates specialists. Each agent focuses on its own market, learns the rhythms there, and executes with strategies that fit.
The real strength of this design is how these agents come together. By running side by side, they cover weaknesses and balance each other out. And with EODHD’s data across all three asset classes, building such a system in practice becomes much more accessible.
Setting Up the Foundation
Before building agents, we need a clean foundation. That means importing only the packages we will actually use. No extras, no clutter.
# 1. IMPORTING PACKAGES
import asyncio
import json
import time
from datetime import datetime, timedelta
import pandas as pd
import numpy as np
import requests
import websockets
This set gives us everything required for async tasks, data requests, and handling time series. Pandas and NumPy form the backbone for analysis, while requests and websockets handle the two ways we connect to EODHD’s API.
With the basics ready, the next step is configuration. Here we define our API token, endpoints, symbol mappings, and strategy parameters.
# 2. CONFIGURATION & SYMBOL MAPPING
EODHD_API_TOKEN = "YOUR EODHD API KEY"
API_BASE_REST = "https://eodhd.com/api"
WS_ENDPOINTS = {
"us": "wss://ws.eodhistoricaldata.com/ws/us",
"forex": "wss://ws.eodhistoricaldata.com/ws/forex",
"crypto": "wss://ws.eodhistoricaldata.com/ws/crypto",
}
RUN_MODE = {"backtest": True, "live_ws": False, "live_poll_rest": False}
assert sum(int(v) for v in RUN_MODE.values()) == 1
INTRADAY_INTERVAL = "5m"
BAR_SECONDS = 5 * 60
LOOKBACK_DAYS = 30
TIMEZONE = "UTC"
SYMBOLS = {
"stocks": {"rest": "AAPL.US", "ws": "AAPL"},
"forex": {"rest": "EUR.FOREX", "ws": "EURUSD"},
"crypto": {"rest": "BTC-USD.CC", "ws": "BTC-USD"},
}
STRATEGY_PARAMS = {
"stocks": {"type": "sma_cross", "fast": 20, "slow": 50, "confirm_above_fast": True},
"forex": {"type": "rsi_mean_revert", "period": 14, "entry_rsi": 30, "exit_rsi": 50},
"crypto": {"type": "donchian_breakout", "window": 20, "exit_rule": "midline", "use_trailing_atr": False},
}
RISK = {
"risk_per_trade": 0.01,
"max_leverage": 1.0,
"stop_pct": 0.01,
"take_profit_pct": 0.02,
}
PORTFOLIO = {
"initial_equity": 100000.0,
"allocation": "equal",
"allow_short": False,
}
NET = {"rest_timeout": 20, "max_retries": 3, "retry_backoff_sec": 1.5}
LOGGING = {"trades_csv": True, "equity_csv": True}
There are a few key things happening here. Symbols are mapped differently for REST and WebSocket calls, so we define both. Each market also has its own strategy parameters. Stocks get a moving average crossover, forex relies on RSI, and crypto follows breakouts. Risk and portfolio settings keep the system controlled and realistic.
This configuration file is the heart of flexibility. With a single change here, we can adjust strategies, markets, or run mode without touching the rest of the code.
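As a small illustration of how the run-mode switch could be consumed elsewhere, the sketch below resolves the single enabled mode. The helper name `active_mode` is hypothetical and not part of the original code; it simply replaces the bare `assert` with a readable error.

```python
RUN_MODE = {"backtest": True, "live_ws": False, "live_poll_rest": False}

def active_mode(run_mode: dict) -> str:
    """Return the one enabled mode, or raise if zero or several are enabled."""
    enabled = [name for name, on in run_mode.items() if on]
    if len(enabled) != 1:
        raise ValueError(f"Exactly one run mode must be enabled, got: {enabled}")
    return enabled[0]

print(active_mode(RUN_MODE))
```

A caller can then branch on the returned string instead of inspecting the dictionary in several places.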
Data Layer: Historical (REST)
The first piece of our data layer is pulling historical intraday prices using EODHD’s Intraday Historical endpoint. This is what lets agents test strategies before they ever touch live markets.
# 3. DATA LAYER 1 (REST INTRADAY)
from datetime import timezone

def _to_unix(dt: datetime) -> int:
    return int(dt.timestamp())

def _request_json(url: str, params: dict, timeout: int, retries: int, backoff: float):
    # retry with exponential backoff on network errors and non-200 responses
    for i in range(retries):
        try:
            r = requests.get(url, params=params, timeout=timeout)
            if r.status_code == 200:
                return r.json()
        except requests.RequestException:
            pass
        time.sleep(backoff * (2 ** i))
    raise RuntimeError(f"Failed REST call: {url}")

def _normalize_intraday(payload) -> pd.DataFrame:
    cols = ["open", "high", "low", "close", "volume"]
    if not payload:
        return pd.DataFrame(columns=cols, index=pd.DatetimeIndex([], tz="UTC", name="time"))
    df = pd.DataFrame(payload)
    # timestamp field variants
    if "t" in df.columns:
        ts = pd.to_datetime(df["t"], unit="s", utc=True)
    elif "timestamp" in df.columns and np.issubdtype(df["timestamp"].dtype, np.integer):
        ts = pd.to_datetime(df["timestamp"], unit="s", utc=True)
    elif "datetime" in df.columns:
        ts = pd.to_datetime(df["datetime"], utc=True)
    else:
        raise ValueError("No recognizable time field in intraday payload")

    # price field variants (long or short name); a missing column becomes NaN
    def _col(long_name: str, short_name: str) -> np.ndarray:
        for name in (long_name, short_name):
            if name in df.columns:
                return pd.to_numeric(df[name], errors="coerce").to_numpy(dtype="float64")
        return np.full(len(df), np.nan)

    # pass plain arrays: Series carrying the old integer index would be
    # re-aligned to the time index and come out as all-NaN
    out = pd.DataFrame(
        {"open": _col("open", "o"), "high": _col("high", "h"), "low": _col("low", "l"),
         "close": _col("close", "c"), "volume": _col("volume", "v")},
        index=pd.DatetimeIndex(ts, name="time"),
    )
    return out.sort_index()

def get_intraday_rest(symbol_rest: str, interval: str = INTRADAY_INTERVAL,
                      days: int = LOOKBACK_DAYS, api_token: str = EODHD_API_TOKEN,
                      base_url: str = API_BASE_REST, net: dict = NET) -> pd.DataFrame:
    # timezone-aware "now": a naive utcnow() is read as local time by .timestamp()
    to_dt = datetime.now(timezone.utc)
    from_dt = to_dt - timedelta(days=days)
    url = f"{base_url}/intraday/{symbol_rest}"
    params = {
        "api_token": api_token,
        "interval": interval,
        "from": _to_unix(from_dt),
        "to": _to_unix(to_dt),
        "fmt": "json",
        "order": "a",
    }
    payload = _request_json(url, params, net["rest_timeout"], net["max_retries"], net["retry_backoff_sec"])
    df = _normalize_intraday(payload)
    if df.empty:
        return df
    # enforce exact bar grid (guards against clock skew); empty intervals are dropped
    rule = {"1m": "1min", "5m": "5min", "1h": "60min"}.get(interval, "5min")
    df = df.resample(rule, label="right", closed="right").agg(
        {"open": "first", "high": "max", "low": "min", "close": "last", "volume": "sum"}
    ).dropna(subset=["open", "high", "low", "close"])
    return df
The function chain works like this. _request_json makes the API call with retries so temporary errors don’t kill the run. _normalize_intraday cleans the raw payload since EODHD responses can vary in field names. Finally, get_intraday_rest ties it together by building the URL, pulling data, and resampling into fixed bars.
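The result is a tidy DataFrame with consistent timestamps. Every bar lands on an even grid, so agents backtest on aligned data. Intervals with no data are dropped rather than filled, which means gaps in the source show up as missing bars instead of invented prices.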
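To make the resampling step concrete, here is a minimal sketch on made-up prices (the timestamps and values are invented for illustration). It uses the same `label="right", closed="right"` settings as the data layer, so each bar is stamped with the time at which it closes.

```python
import pandas as pd

# Irregular observations, invented for illustration.
idx = pd.to_datetime(
    ["2024-01-02 14:31", "2024-01-02 14:33", "2024-01-02 14:35",
     "2024-01-02 14:36", "2024-01-02 14:40"],
    utc=True,
)
raw = pd.DataFrame(
    {
        "open":   [100.0, 100.5, 101.5, 101.2, 102.5],
        "high":   [101.0, 102.0, 101.8, 103.0, 102.6],
        "low":    [99.0, 100.0, 101.0, 101.0, 102.0],
        "close":  [100.5, 101.5, 101.2, 102.5, 102.2],
        "volume": [10, 20, 5, 7, 3],
    },
    index=idx,
)

# Bins are (14:30, 14:35] and (14:35, 14:40], each labeled by its right edge.
bars = raw.resample("5min", label="right", closed="right").agg(
    {"open": "first", "high": "max", "low": "min", "close": "last", "volume": "sum"}
).dropna(subset=["open", "high", "low", "close"])

print(bars)
```

The five input rows collapse into two bars: the first takes its open from the 14:31 row and its close from the 14:35 row, and volume is summed within each bin.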
Data Layer: Real-Time (WebSocket)
If the REST layer handles history, the WebSocket layer is where agents breathe live market data. This piece listens to continuous tick feeds from stocks, forex, and crypto (extracted using EODHD WebSocket), then shapes them into bar data that strategies can actually use.
# 4. DATA LAYER 2 (WEBSOCKET)
def _ws_url(feed_key: str) -> str:
    return f"{WS_ENDPOINTS[feed_key]}?api_token={EODHD_API_TOKEN}"

def _subscribe_payload(symbols: list[str]) -> str:
    return json.dumps({"action": "subscribe", "symbols": ",".join(symbols)})

def _parse_tick_time(ts) -> pd.Timestamp:
    # numeric epochs above 1e11 are assumed to be milliseconds, smaller ones seconds
    if isinstance(ts, (int, float)):
        return pd.to_datetime(ts, unit="ms" if ts > 1e11 else "s", utc=True)
    return pd.to_datetime(ts, utc=True)

async def ws_stream(feed_key: str, symbols: list[str], out_tick_queue: asyncio.Queue):
    url = _ws_url(feed_key)
    sub_msg = _subscribe_payload(symbols)
    while True:  # reconnect loop
        try:
            async with websockets.connect(url, ping_interval=20, ping_timeout=20) as ws:
                await ws.send(sub_msg)
                async for raw in ws:
                    try:
                        msg = json.loads(raw)
                        sym = msg.get("s") or msg.get("symbol")
                        ts = msg.get("t") or msg.get("timestamp") or msg.get("time")
                        price = (msg.get("p") or msg.get("price") or
                                 msg.get("last") or msg.get("c"))
                        # skip status messages and incomplete ticks
                        if not sym or ts is None or price is None:
                            continue
                        size = msg.get("q") or msg.get("size") or msg.get("volume") or 0
                        await out_tick_queue.put((sym, _parse_tick_time(ts), float(price), float(size)))
                    except Exception:
                        continue  # malformed message: drop it and keep the stream alive
        except Exception:
            await asyncio.sleep(2.0)  # connection lost: wait, then reconnect

class BarAggregator:
    def __init__(self, symbols: list[str], bar_seconds: int):
        self.bar_seconds = bar_seconds
        self.state = {s: None for s in symbols}

    def _bar_bucket_start(self, ts: pd.Timestamp) -> pd.Timestamp:
        # floor the tick time to the start of its bar
        epoch = int(ts.timestamp())
        start = epoch - (epoch % self.bar_seconds)
        return pd.to_datetime(start, unit="s", utc=True)

    def add_tick(self, symbol: str, ts: pd.Timestamp, price: float, size: float):
        start = self._bar_bucket_start(ts)
        cur = self.state.get(symbol)
        if cur is None or cur["start"] != start:
            # first tick of a new bucket: open a new bar and hand back the finished one, if any
            finished = None
            if cur is not None:
                finished = cur.copy()
                finished["time"] = cur["start"] + pd.Timedelta(seconds=self.bar_seconds)
                finished.pop("start", None)
            self.state[symbol] = {"start": start, "o": price, "h": price, "l": price, "c": price, "v": size}
            return finished
        # same bucket: update the running bar
        cur["c"] = price
        if price > cur["h"]:
            cur["h"] = price
        if price < cur["l"]:
            cur["l"] = price
        cur["v"] = cur.get("v", 0) + (size or 0)
        return None

async def ticks_to_bars(tick_queue: asyncio.Queue, bars_queue: asyncio.Queue, symbols: list[str], bar_seconds: int):
    agg = BarAggregator(symbols, bar_seconds)
    while True:
        sym, ts, price, size = await tick_queue.get()
        finished = agg.add_tick(sym, ts, price, size)
        if finished:
            bar = {
                "symbol": sym,
                "open": finished["o"],
                "high": finished["h"],
                "low": finished["l"],
                "close": finished["c"],
                "volume": finished.get("v", 0.0),
                "time": finished["time"],
            }
            await bars_queue.put(bar)

async def run_ws_layer(symbol_map: dict):
    tick_q = asyncio.Queue()
    bar_q = asyncio.Queue()
    ws_symbols = [symbol_map[k]["ws"] for k in ("stocks", "forex", "crypto")]
    tasks = [
        asyncio.create_task(ws_stream("us", [symbol_map["stocks"]["ws"]], tick_q)),
        asyncio.create_task(ws_stream("forex", [symbol_map["forex"]["ws"]], tick_q)),
        asyncio.create_task(ws_stream("crypto", [symbol_map["crypto"]["ws"]], tick_q)),
        asyncio.create_task(ticks_to_bars(tick_q, bar_q, ws_symbols, BAR_SECONDS)),
    ]
    return tick_q, bar_q, tasks
The ws_stream function subscribes to a feed and pipes raw ticks into a queue. Each tick carries a symbol, a timestamp, a trade price, and size. The BarAggregator then buckets these ticks into bars of fixed length, updating highs, lows, and volumes on the fly.
Finally, run_ws_layer wires everything together. It launches one WebSocket stream for each asset class and a background task that turns raw ticks into bars. The output queue gives agents a clean, real-time bar feed across stocks, forex, and crypto.
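The bucketing idea can be tried without a network connection. The sketch below floors a few invented tick times to 5-minute bar starts and builds open/high/low/close values per bucket; `bucket_start` is a standalone stand-in for the aggregator's internal method, not the class itself.

```python
import pandas as pd

BAR_SECONDS = 5 * 60

def bucket_start(ts: pd.Timestamp, bar_seconds: int = BAR_SECONDS) -> pd.Timestamp:
    """Floor a timestamp to the start of its fixed-length bar."""
    epoch = int(ts.timestamp())
    return pd.to_datetime(epoch - epoch % bar_seconds, unit="s", utc=True)

# Invented ticks: (time, price)
ticks = [
    ("2024-01-02 14:30:05", 100.0),
    ("2024-01-02 14:32:40", 100.4),
    ("2024-01-02 14:34:59", 99.8),
    ("2024-01-02 14:35:00", 100.1),  # first tick of the next bar
]

bars = {}
for stamp, price in ticks:
    start = bucket_start(pd.Timestamp(stamp, tz="UTC"))
    bar = bars.setdefault(start, {"o": price, "h": price, "l": price, "c": price})
    bar["h"] = max(bar["h"], price)
    bar["l"] = min(bar["l"], price)
    bar["c"] = price

for start, bar in bars.items():
    print(start, bar)
```

One consequence of this design is worth keeping in mind: in the streaming aggregator a bar is only released when the first tick of the following bucket arrives, so a quiet symbol delays its last bar until trading resumes.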
Indicators
Each agent relies on a few technical tools. We keep this lightweight, with one core indicator assigned to each market.
The simple moving average works best for stocks, offering a clean view of trend direction. RSI fits forex, where short bursts of overbought and oversold conditions are common. For crypto, Donchian channels give a breakout-friendly structure that matches its volatile nature.
# 5. INDICATORS
def sma(series: pd.Series, n: int) -> pd.Series:
    return series.rolling(n, min_periods=n).mean()

def rsi(close: pd.Series, n: int = 14) -> pd.Series:
    delta = close.diff()
    gain = delta.clip(lower=0.0)
    loss = -delta.clip(upper=0.0)
    # Wilder-style smoothing via an exponential average with alpha = 1/n
    avg_gain = gain.ewm(alpha=1/n, adjust=False).mean()
    avg_loss = loss.ewm(alpha=1/n, adjust=False).mean()
    # same as 100 - 100 / (1 + avg_gain / avg_loss), but gives 100 instead of NaN when avg_loss is 0
    out = 100.0 * avg_gain / (avg_gain + avg_loss)
    return out.fillna(50.0)  # undefined values (first bar, flat prices) default to a neutral 50

def donchian(high: pd.Series, low: pd.Series, n: int = 20) -> pd.DataFrame:
    upper = high.rolling(n, min_periods=n).max()
    lower = low.rolling(n, min_periods=n).min()
    mid = (upper + lower) / 2.0
    return pd.DataFrame({"upper": upper, "lower": lower, "mid": mid})
These three are enough. They are simple to compute, well-tested, and map directly to the different market behaviors our agents face.
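For readers wondering why the RSI uses `ewm(alpha=1/n, adjust=False)`, the sketch below checks that this call matches the recursive update usually attributed to Wilder, avg_t = avg_(t-1) + (x_t - avg_(t-1)) / n. The input values are arbitrary, and `wilder_manual` is a hypothetical helper written only for this comparison.

```python
import numpy as np
import pandas as pd

def wilder_manual(values: np.ndarray, n: int) -> np.ndarray:
    """Recursive smoothing seeded with the first observation."""
    out = [values[0]]
    for v in values[1:]:
        out.append(out[-1] + (v - out[-1]) / n)
    return np.array(out)

x = pd.Series([1.0, 0.0, 2.0, 0.5, 0.0, 1.5])  # arbitrary sample values
n = 3

via_ewm = x.ewm(alpha=1 / n, adjust=False).mean().to_numpy()
via_loop = wilder_manual(x.to_numpy(), n)

print(np.allclose(via_ewm, via_loop))
```

Note that this form seeds the average with the first observation, whereas some charting packages seed it with a simple average of the first n values, so early readings can differ slightly between implementations.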
Strategies
Indicators by themselves do nothing. What matters is how we turn them into rules that the agents can follow. Each market gets a strategy that reflects its character.
For stocks, we use a simple moving average crossover. This gives clean entries when a fast line crosses above a slow line, with an optional confirmation that price stays above the fast line.
Forex gets a mean reversion play. Here the RSI looks for oversold dips, then holds the long until conditions normalize. It is a patient approach that fits currency pairs.
Crypto runs on Donchian breakouts. Price moving above the upper band starts a trade, and exits are flexible: either on a midline cross or a deeper retreat to the lower band.
# 6. STRATEGIES
def _hold_between(entry: pd.Series, exit_: pd.Series) -> pd.Series:
    # 1 from an entry bar up to (not including) the next exit bar, 0 otherwise
    signal = pd.Series(0, index=entry.index)
    in_pos = False
    for i in range(len(entry)):
        if not in_pos and entry.iat[i]:
            signal.iat[i] = 1
            in_pos = True
        elif in_pos and exit_.iat[i]:
            in_pos = False
        elif in_pos:
            signal.iat[i] = 1
    return signal

def sma_cross_strategy(df: pd.DataFrame, fast: int, slow: int, confirm_above_fast: bool = True) -> pd.Series:
    fast_sma = sma(df["close"], fast)
    slow_sma = sma(df["close"], slow)
    signal = fast_sma > slow_sma
    if confirm_above_fast:
        signal &= (df["close"] > fast_sma)
    return signal.astype(int)

def rsi_mean_revert_strategy(df: pd.DataFrame, period: int, entry_rsi: float, exit_rsi: float) -> pd.Series:
    rsi_vals = rsi(df["close"], period)
    long_cond = (rsi_vals < entry_rsi)
    exit_cond = (rsi_vals >= exit_rsi)
    return _hold_between(long_cond, exit_cond)

def donchian_breakout_strategy(df: pd.DataFrame, window: int, exit_rule: str = "midline") -> pd.Series:
    # use the channel of the previous bars: a channel that includes the current bar
    # has upper >= high >= close, so close > upper could never be true
    dc = donchian(df["high"], df["low"], window).shift(1)
    long_cond = df["close"] > dc["upper"]
    if exit_rule == "midline":
        exit_cond = df["close"] < dc["mid"]
    else:
        exit_cond = df["close"] < dc["lower"]  # alternative exit
    return _hold_between(long_cond, exit_cond)
Each strategy reduces to a binary signal. The SMA crossover goes long when the fast average clears the slow, with an option to confirm above the fast line. The RSI strategy enters when RSI falls below a set level and holds until it exits above another, using a simple in-position flag to avoid constant flip-flops.
The Donchian breakout triggers on a close above the channel’s upper band, with exits defined by either the midline or lower band. Window length is key here: shorter windows catch more moves, longer ones focus on bigger trends.
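One detail of the breakout rule is easy to get wrong, so a small check may help. A rolling maximum of highs that includes the current bar can never be exceeded by that bar's close, so the comparison only works against the channel of the preceding bars. The prices below are invented for illustration.

```python
import pandas as pd

high = pd.Series([10.0, 11.0, 12.0, 11.5, 13.0])
close = pd.Series([9.5, 10.8, 11.9, 11.2, 12.9])
n = 3

# Channel that includes the current bar: close <= high <= upper on every bar.
upper_incl = high.rolling(n, min_periods=n).max()
same_bar_breakout = close > upper_incl

# Channel built from the previous n bars only.
upper_prev = upper_incl.shift(1)
prior_bar_breakout = close > upper_prev

print(same_bar_breakout.tolist())
print(prior_bar_breakout.tolist())
```

Only the shifted version flags the final bar, where the close of 12.9 clears the prior three-bar high of 12.0.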
The Agent Class
An agent is a self-contained trader. It knows its market, pulls data, builds signals, and manages its own position. The class below is the core that ties strategy and execution together.
# 7. AGENT CLASS
class Agent:
    def __init__(self, name: str, market_key: str):
        self.name = name
        self.market = market_key
        self.rest_symbol = SYMBOLS[market_key]["rest"]
        self.ws_symbol = SYMBOLS[market_key]["ws"]
        self.params = STRATEGY_PARAMS[market_key]
        self.df = None           # historical bars
        self.signal = None       # 1/0 series aligned with df
        self.position = 0        # 0 or +1 (long only for clarity)
        self.entry_price = None
        self.stop = None
        self.take_profit = None

    def load_history(self):
        self.df = get_intraday_rest(self.rest_symbol, INTRADAY_INTERVAL, LOOKBACK_DAYS)
        return self.df

    def _build_signals(self, df: pd.DataFrame) -> pd.Series:
        t = self.params["type"]
        if t == "sma_cross":
            return sma_cross_strategy(df, self.params["fast"], self.params["slow"], self.params.get("confirm_above_fast", True))
        if t == "rsi_mean_revert":
            return rsi_mean_revert_strategy(df, self.params["period"], self.params["entry_rsi"], self.params["exit_rsi"])
        if t == "donchian_breakout":
            return donchian_breakout_strategy(df, self.params["window"], self.params.get("exit_rule", "midline"))
        raise ValueError(f"Unknown strategy type: {t}")

    def prepare(self, df: pd.DataFrame | None = None):
        if df is not None:
            self.df = df.copy()
        elif self.df is None:
            self.load_history()
        self.signal = self._build_signals(self.df)
        return self.signal

    def _risk_sizing(self, price: float, risk_per_trade: float, equity: float, stop_pct: float) -> float:
        risk_amount = equity * risk_per_trade
        stop_dist = max(price * stop_pct, 1e-8)
        qty = risk_amount / stop_dist
        return max(qty, 0.0)

    def _set_protections(self, entry: float):
        self.stop = entry * (1.0 - RISK["stop_pct"])
        self.take_profit = entry * (1.0 + RISK["take_profit_pct"])

    def _flat(self):
        # reset all position state after any exit
        self.position = 0
        self.entry_price = None
        self.stop = None
        self.take_profit = None

    # Live/on-bar execution: call at each completed bar
    # bar = {"symbol": str, "open": float, "high": float, "low": float, "close": float, "time": pd.Timestamp}
    def on_bar(self, bar: dict, equity: float) -> dict | None:
        if bar["symbol"] != self.ws_symbol:
            return None
        # Update rolling df with the latest bar for signal continuity
        row = pd.DataFrame(
            {"open": [bar["open"]], "high": [bar["high"]], "low": [bar["low"]],
             "close": [bar["close"]], "volume": [bar.get("volume", 0.0)]},
            index=pd.DatetimeIndex([bar["time"]], name="time")
        )
        self.df = pd.concat([self.df, row]).iloc[-5000:] if self.df is not None else row
        sig = self._build_signals(self.df).iloc[-1]
        events = None
        price = bar["close"]
        # Exit conditions (stop/TP) checked first
        if self.position == 1:
            if self.stop is not None and bar["low"] <= self.stop:
                events = {"agent": self.name, "action": "stop", "price": self.stop, "time": bar["time"]}
                self._flat()
                return events
            if self.take_profit is not None and bar["high"] >= self.take_profit:
                events = {"agent": self.name, "action": "take_profit", "price": self.take_profit, "time": bar["time"]}
                self._flat()
                return events
        # Strategy-driven exits/entries
        if self.position == 1 and sig == 0:
            events = {"agent": self.name, "action": "exit", "price": price, "time": bar["time"]}
            self._flat()
        elif self.position == 0 and sig == 1:
            qty = self._risk_sizing(price, RISK["risk_per_trade"], equity, RISK["stop_pct"])
            if qty > 0:
                self.position = 1
                self.entry_price = price
                self._set_protections(price)
                events = {"agent": self.name, "action": "entry", "price": price, "qty": qty, "time": bar["time"]}
        return events
The lifecycle is simple. prepare loads history, builds signals, and leaves the agent ready. In live contexts, on_bar receives a completed bar, updates the rolling DataFrame, recomputes the latest signal, and checks for exits first. Stops and take profit are enforced before any new decision. That makes the behavior deterministic and easy to reason about.
Sizing is risk based. The quantity is derived from account equity and a fixed percentage risk per trade. The stop distance defines how large a position can be without exceeding that risk. This gives the system consistent bet sizing across different price levels.
State is minimal on purpose. The agent tracks position, entry price, and two protective levels. It does not manage cash, fees, or slippage here. Those belong to a portfolio layer or an execution engine. This keeps the class focused and testable.
Finally, the agent is strategy agnostic. The same skeleton runs a moving average trader, an RSI mean reverter, or a Donchian breakout hunter. Swapping behavior is a matter of changing parameters, not rewriting code.
Backtesting
Backtesting lets each agent prove its logic on history before we think about live use. Signals are converted to positions on the next bar, then returns flow into an equity curve and basic metrics.
# 8. BACKTESTING
def _position_from_signal(sig: pd.Series) -> pd.Series:
    # enter/exit on next bar
    return sig.ffill().shift(1).fillna(0).astype(float)

def _bars_per_year(interval: str = INTRADAY_INTERVAL) -> int:
    # rough bars/year, assuming 6.5-hour sessions and 252 trading days
    minutes = int(interval[:-1]) * (60 if interval.endswith("h") else 1)
    return max(1, int((6.5 * 60) / minutes)) * 252

def _sharpe(ret: pd.Series, ann_factor: int) -> float:
    # annualized Sharpe: per-bar mean/std scaled by sqrt(bars per year)
    sd = ret.std(ddof=0)
    return float(ret.mean() / sd * np.sqrt(ann_factor)) if sd > 0 else 0.0

def _max_drawdown(equity: pd.Series) -> float:
    return float((equity / equity.cummax() - 1.0).min())

def backtest_agent(agent: Agent, initial_equity: float = 100_000.0) -> dict:
    if agent.df is None or agent.signal is None:
        agent.prepare()
    df = agent.df.copy()
    sig = agent.signal.reindex(df.index).fillna(0).astype(int)
    pos = _position_from_signal(sig)
    ret = df["close"].pct_change().fillna(0.0)
    strat_ret = pos * ret
    equity = (1.0 + strat_ret).cumprod() * initial_equity
    # metrics
    cum_ret = equity.iloc[-1] / equity.iloc[0] - 1.0
    out = pd.DataFrame({"close": df["close"], "signal": sig, "pos": pos, "ret": strat_ret, "equity": equity})
    result = {
        "agent": agent.name,
        "market": agent.market,
        "equity_curve": out,
        "metrics": {
            "cum_return": float(cum_ret),
            "sharpe": _sharpe(strat_ret, _bars_per_year()),
            "max_drawdown": _max_drawdown(equity),
        },
    }
    if LOGGING.get("equity_csv", False):
        out.to_csv(f"equity_{agent.name}.csv")
    return result

def combine_portfolio(results: list[dict], initial_equity: float = PORTFOLIO["initial_equity"]) -> dict:
    if not results:
        raise ValueError("No agent results to combine.")
    # equal weights
    n = len(results)
    weights = {r["agent"]: 1.0 / n for r in results}
    # align returns on a common index; a missing bar counts as a flat (zero) return
    rets = []
    for r in results:
        df = r["equity_curve"][["ret"]].rename(columns={"ret": r["agent"]})
        rets.append(df)
    rets = pd.concat(rets, axis=1).fillna(0.0)
    port_ret = sum(rets[col] * w for col, w in weights.items())
    port_equity = (1.0 + port_ret).cumprod() * initial_equity
    # metrics
    cum_ret = port_equity.iloc[-1] / port_equity.iloc[0] - 1.0
    port_df = pd.DataFrame({"ret": port_ret, "equity": port_equity})
    if LOGGING.get("equity_csv", False):
        port_df.to_csv("portfolio_equity.csv")
    return {
        "weights": weights,
        "equity_curve": port_df,
        "metrics": {
            "cum_return": float(cum_ret),
            "sharpe": _sharpe(port_ret, _bars_per_year()),
            "max_drawdown": _max_drawdown(port_equity),
        },
    }
Two choices stand out here. Positions follow signals with a one bar delay, which removes look ahead and keeps results honest. Metrics are kept to the essentials so readers can compare agents quickly.
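The effect of the one-bar delay is easiest to see on a toy series. In this sketch (prices and signals are invented), a signal computed from a bar's close is either applied to that same bar, which is look-ahead, or applied from the next bar onward.

```python
import pandas as pd

close = pd.Series([100.0, 105.0, 103.95, 106.029])
signal = pd.Series([0, 1, 1, 0])  # each value is computed from that bar's close

ret = close.pct_change().fillna(0.0)  # roughly 0, +5%, -1%, +2%

# Look-ahead version: the position earns the return of the bar that produced the signal.
same_bar_growth = (1.0 + signal * ret).prod()

# Honest version: the signal at bar t only takes effect from bar t+1.
pos = signal.shift(1).fillna(0)
lagged_growth = (1.0 + pos * ret).prod()

print(round(same_bar_growth, 4), round(lagged_growth, 4))
```

The same-bar version captures the +5% move that triggered the entry, a move no trader could have caught after seeing the close, and so it overstates the result relative to the lagged version.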
The portfolio combiner is intentionally plain. It aligns return series, applies equal weights, and reports aggregate metrics. This is enough to show the benefit of specialization without getting lost in allocation theory.
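The alignment step can be illustrated with two short return series that do not share all timestamps. The agent names and numbers below are placeholders; the point is that a bar missing for one agent is treated as a zero return before the equal weights are applied.

```python
import pandas as pd

idx = pd.date_range("2024-01-02 14:35", periods=4, freq="5min", tz="UTC")

# Placeholder per-bar returns; the second series starts one bar later.
stocks = pd.Series([0.01, -0.02, 0.00], index=idx[:3], name="stocks_agent")
crypto = pd.Series([0.03, 0.01, -0.01], index=idx[1:], name="crypto_agent")

# Outer-join on time, then treat missing bars as flat.
rets = pd.concat([stocks, crypto], axis=1).fillna(0.0)

# Equal weights are simply the row mean.
port_ret = rets.mean(axis=1)
port_equity = (1.0 + port_ret).cumprod() * 100_000.0

print(rets.assign(portfolio=port_ret, equity=port_equity))
```

Filling with zero is a simplification: it assumes an agent holds no exposure whenever its market has no bar, which is reasonable for closed sessions but hides gaps caused by missing data.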
Orchestration
This is the binder. We place agents, data pipes, and logging in one spot so the system reads like a single unit. Think of it as the control room that routes bars to the right agent and records what happens.
# 9. ORCHESTRATION
import os
from collections import defaultdict

class TradeLogger:
    def __init__(self, path="trades.csv"):
        self.path = path

    def log(self, row: dict):
        # write the header only when the file does not exist yet
        header = not os.path.exists(self.path)
        pd.DataFrame([row]).to_csv(self.path, mode="a", index=False, header=header)

async def live_ws_orchestrator():
    agents = [
        Agent("stocks_agent", "stocks"),
        Agent("forex_agent", "forex"),
        Agent("crypto_agent", "crypto"),
    ]
    # preload history for continuity
    for a in agents:
        a.load_history()
        a.prepare()
    alloc = PORTFOLIO["initial_equity"] / len(agents)
    qty_map = defaultdict(float)  # agent_name -> open qty
    entry_px = {}                 # agent_name -> entry price (on_bar clears agent.entry_price on exit)
    equity_map = {a.name: alloc for a in agents}
    sym_to_agent = {a.ws_symbol: a for a in agents}
    logger = TradeLogger("trades_live.csv")
    _, bars_q, tasks = await run_ws_layer(SYMBOLS)
    try:
        while True:
            bar = await bars_q.get()
            a = sym_to_agent.get(bar["symbol"])
            if a is None:
                continue
            ev = a.on_bar(bar, equity_map[a.name])
            if not ev:
                continue
            action = ev["action"]
            price = float(ev["price"])
            ts = pd.to_datetime(ev["time"])
            if action == "entry":
                qty = float(ev["qty"])
                qty_map[a.name] = qty
                entry_px[a.name] = price
                logger.log({"time": ts, "agent": a.name, "market": a.market, "action": "entry", "price": price, "qty": qty, "pnl": 0.0})
            else:  # exit/stop/take_profit
                qty = qty_map.get(a.name, 0.0)
                pnl = qty * (price - entry_px.pop(a.name, price))
                equity_map[a.name] += pnl
                qty_map[a.name] = 0.0
                logger.log({"time": ts, "agent": a.name, "market": a.market, "action": action, "price": price, "qty": qty, "pnl": pnl})
    finally:
        for t in tasks:
            t.cancel()
        await asyncio.gather(*tasks, return_exceptions=True)
The flow is straightforward. We create three agents, preload history for signal continuity, and give each an equal slice of portfolio equity. A symbol map points incoming bars to the correct agent.
run_ws_layer supplies a shared bar queue. Each completed bar is passed to on_bar, which may return an entry or exit event. Entries record size and price. Exits compute realized PnL and update the agent’s equity. Everything is written to a CSV through a tiny logger.
This section is architectural, not a live demo. It shows how the pieces fit so you can swap the source later. The same orchestration works with a REST polling loop or even a simulator feed, as long as bars arrive in the same format.
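As one possible simulator feed, the sketch below replays rows of a DataFrame into an `asyncio.Queue` using the same bar dictionary layout the agents expect. The function name `replay_bars`, the sample prices, and the `None` end-of-feed sentinel are all assumptions made for this illustration; the live loop in the article runs until cancelled and has no sentinel.

```python
import asyncio
import pandas as pd

async def replay_bars(df: pd.DataFrame, symbol: str, bars_queue: asyncio.Queue) -> None:
    """Push historical rows into the queue as bar dicts, oldest first."""
    for ts, row in df.iterrows():
        await bars_queue.put({
            "symbol": symbol,
            "open": float(row["open"]),
            "high": float(row["high"]),
            "low": float(row["low"]),
            "close": float(row["close"]),
            "volume": float(row["volume"]),
            "time": ts,
        })
    await bars_queue.put(None)  # hypothetical end-of-feed marker

async def main() -> list:
    idx = pd.date_range("2024-01-02 14:35", periods=3, freq="5min", tz="UTC")
    df = pd.DataFrame(
        {"open": [100.0, 100.5, 101.0], "high": [101.0, 101.5, 102.0],
         "low": [99.5, 100.0, 100.5], "close": [100.5, 101.0, 101.5],
         "volume": [10.0, 12.0, 8.0]},
        index=idx,
    )
    queue = asyncio.Queue()
    producer = asyncio.create_task(replay_bars(df, "AAPL", queue))
    seen = []
    while True:
        bar = await queue.get()
        if bar is None:
            break
        seen.append(bar)  # a real consumer would route the bar to an agent here
    await producer
    return seen

bars = asyncio.run(main())
print(len(bars), bars[0]["time"], bars[-1]["close"])
```

Because the consumer only depends on the bar format, the same loop body could be fed by the WebSocket layer, a REST poller, or a replay like this one.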
Wrapping Up
We started from the ground up: setting imports, defining configuration, building data layers, adding indicators, implementing strategies, wiring agents, backtesting them, and finally bringing everything together in orchestration. The goal was to outline a clean architecture for a multi-agent trading framework.
This is not a production-ready system. It is a blueprint. Running it live requires more: execution infrastructure, fuller risk management, and attention to scaling. Those are the natural next steps if you want to take this further.
All of it rests on reliable data. EODHD makes that part simple, providing the historical and live market feeds that plug directly into a setup like this. With that being said, you’ve reached the end of the article. Hope you learned something new and useful today.