Moving a Claude-driven research loop from notebook to production is not a modeling problem; it's an ops + safety problem. The working shape in April 2026: price-blind research prompt (Sonnet 4.6 with prompt caching) → structured probability → conviction-tier sizer → idempotent execution via Alpaca MCP V2 → append-only decision log + heartbeat + watchdog + circuit breaker. Under 600 lines of Python; infra runs on a small ARM box plus a free static-host dashboard, and a medium-universe loop typically lands in the low three figures of monthly LLM spend (your numbers will vary with universe size, retry rate, and cache hit rate; model it before you trust it). Below: the full scaffold walkthrough, file-by-file, with the specific tools on aifinhub.io that validate each layer.
What "production" means here
Not "HFT." Not "hedge fund infrastructure." The bar for a solo operator or 1–3 person team:
- Runs unattended on a cheap machine (a small home ARM box, a $5 VPS, or a single Cloudflare Worker).
- Correct under retry (no duplicate fills).
- Observable when it breaks (heartbeat + watchdog + Telegram).
- Auditable after the fact (append-only decision log).
- Cost-bounded (LLM budget ceiling + cost-per-validated-trade target).
- Vacation-proof (can run 4 weeks without intervention).
- Regulatory-safe (education, not advice — see BaFin + EU guide if EU-domiciled).
The shape of the project
A complete working scaffold breaks into five concerns. The exact filenames and folder names don't matter; the separation between them does.
- A pipeline of small numbered scripts (data fetch → research call → decision → execute → reconcile) so a cron/launchd job can run them in order and a failure on step N doesn't corrupt step N-1's output.
- A data folder with the OHLCV / fundamentals store (DuckDB or SQLite), a per-ticker research dossier directory, and two state files: heartbeat.json (last successful cycle timestamp) and circuit.json (pause flag + reason).
- A memory folder with an append-only decisions.jsonl: one JSON object per decision, never rewritten. This is the audit log.
- A schedules folder with the launchd plists or systemd units that run the pipeline + watchdog + resume-reminder.
- A tests folder with at least three: test_no_price_leak (the research-pack contains no price strings), test_sizing_caps (the sizer never exceeds the tier cap), test_idempotent_orders (the same idea on the same day produces one order, not many).
That's the whole scaffold. Everything below is what goes inside each piece.
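The "run them in order, stop on failure" behaviour of the pipeline can be sketched as below. This is a minimal sketch, assuming each step is a standalone script that exits non-zero when it fails; run_pipeline and the injectable run argument are illustrative names, not part of the scaffold.

```python
import subprocess
import sys
from pathlib import Path


def run_pipeline(steps, run=subprocess.run):
    """Run each script in order and stop at the first non-zero exit code.

    Returns the file names of the steps that completed, so the caller can
    log exactly where a cycle stopped.
    """
    completed = []
    for script in steps:
        result = run([sys.executable, str(script)])
        if result.returncode != 0:
            break  # later steps never run on top of a failed earlier step
        completed.append(Path(script).name)
    return completed
```

A cron or launchd entry point could call run_pipeline(sorted(Path("scripts").glob("0[0-9]_*.py"))), relying on the numeric prefixes to sort the steps into execution order.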
Layer 1: Data
Fetch OHLCV + fundamentals once per cycle into DuckDB. Pick a vendor via the Data-Vendor TCO Calculator.
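# scripts/01_fetch_bars.py
import os
from datetime import datetime, timedelta, timezone
import duckdb
from alpaca.data.historical import StockHistoricalDataClient
from alpaca.data.requests import StockBarsRequest
from alpaca.data.timeframe import TimeFrame
api_key = os.environ["ALPACA_API_KEY"]
api_secret = os.environ["ALPACA_API_SECRET"]
client = StockHistoricalDataClient(api_key, api_secret)
def fetch(tickers, lookback_days=60):
    req = StockBarsRequest(
        symbol_or_symbols=tickers,
        timeframe=TimeFrame.Day,
        # Request lookback_days of history ending now.
        start=datetime.now(timezone.utc) - timedelta(days=lookback_days),
    )
    # reset_index() turns the (symbol, timestamp) index into ordinary columns.
    bars = client.get_stock_bars(req).df.reset_index()
    with duckdb.connect("data/bars.duckdb") as con:
        con.register("incoming", bars)
        # Create the table on first run, then insert only unseen (timestamp, symbol) rows.
        con.execute("CREATE TABLE IF NOT EXISTS bars AS SELECT * FROM incoming LIMIT 0")
        con.execute("""
            INSERT INTO bars SELECT * FROM incoming
            WHERE (timestamp, symbol) NOT IN (SELECT timestamp, symbol FROM bars)
        """)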
DuckDB on disk is a good fit at this scale: ACID, columnar, zero-setup. Five years of daily data on 500 tickers is roughly 630,000 rows (5 × ~252 trading days × 500), which sits comfortably under 2GB.
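If you pick SQLite for the store instead (the scaffold allows either), the same "insert only what is new" behaviour can be sketched with a composite primary key and INSERT OR IGNORE. The table layout and the store_bars name are assumptions for illustration, not the article's schema.

```python
import sqlite3


def store_bars(con: sqlite3.Connection, rows) -> int:
    """Insert (timestamp, symbol, open, high, low, close, volume) rows.

    Rows whose (timestamp, symbol) pair already exists are skipped, so
    re-running a fetch after a crash cannot duplicate bars.
    Returns the number of rows actually added.
    """
    con.execute(
        """
        CREATE TABLE IF NOT EXISTS bars (
            timestamp TEXT NOT NULL,
            symbol    TEXT NOT NULL,
            open REAL, high REAL, low REAL, close REAL, volume REAL,
            PRIMARY KEY (timestamp, symbol)
        )
        """
    )
    before = con.total_changes
    with con:  # commits on success, rolls back on error
        con.executemany(
            "INSERT OR IGNORE INTO bars VALUES (?, ?, ?, ?, ?, ?, ?)", rows
        )
    return con.total_changes - before
```

The primary key does the deduplication inside the database, so the fetch script needs no "have I seen this bar" bookkeeping of its own.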
Layer 2: Research (price-blind)
The critical architectural boundary. The LLM never sees current price. It gets a research pack: filings excerpts, earnings transcripts, competitor context, macro regime summary. It returns a structured probability + thesis.
# scripts/02_research.py
import anthropic, json
from dataclasses import asdict, dataclass
SYSTEM = """You are an 8-step research analyst. Follow the template strictly.
NEVER mention prices, chart patterns, position sizes, or 'should buy/sell'.
Return only the JSON payload specified in step 8."""
@dataclass
class ResearchPack:
    ticker_anonymous: str  # e.g. SYNTHETIC_A
    filings_excerpts: list[str]
    earnings_transcripts: list[str]
    competitor_context: list[str]
    macro_regime_summary: str
    # No prices. No positions. No PnL.
def research(pack: ResearchPack) -> dict:
    client = anthropic.Anthropic()  # reads ANTHROPIC_API_KEY from the environment
    resp = client.messages.create(
        model="claude-sonnet-4-6-20260115",
        max_tokens=1500,
        temperature=0,
        # Mark the static system prompt as cacheable so repeat calls can reuse it.
        system=[{"type": "text", "text": SYSTEM, "cache_control": {"type": "ephemeral"}}],
        messages=[{"role": "user", "content": json.dumps(asdict(pack))}],
    )
    # The reply is expected to be the bare JSON payload from step 8.
    return json.loads(resp.content[0].text)
The cache_control: ephemeral setting on the system prompt is the cost lever: at a 50% cache hit rate the effective input cost drops by ~67%. See Token-Cost Optimizer to model your specific loop.
Validate this layer: paste a real research prompt into the Prompt Regression Tester and confirm it never references price across Opus 4.5/4.6/4.7. Run extractions through the Hallucination Detector to catch fabricated numbers.
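The research call hands back whatever json.loads produces, so a schema check before the sizer is cheap insurance. A minimal sketch, assuming the payload carries the three fields the decide step reads (probability_yes, thesis, invalidation_conditions); the function name and the raise-on-anything policy are illustrative choices.

```python
import json

REQUIRED_KEYS = ("probability_yes", "thesis", "invalidation_conditions")


def parse_research_output(raw: str) -> dict:
    """Parse the model's JSON reply and reject anything the sizer cannot use."""
    data = json.loads(raw)  # raises json.JSONDecodeError (a ValueError) on non-JSON text
    if not isinstance(data, dict):
        raise ValueError("expected a JSON object")
    missing = [key for key in REQUIRED_KEYS if key not in data]
    if missing:
        raise ValueError(f"missing keys: {missing}")
    p = data["probability_yes"]
    # bool is a subclass of int in Python, so exclude it explicitly.
    if isinstance(p, bool) or not isinstance(p, (int, float)) or not 0.0 <= p <= 1.0:
        raise ValueError(f"probability_yes must be a number in [0, 1], got {p!r}")
    return data
```

Failing loudly here means a malformed reply skips the cycle instead of reaching the sizer as a nonsense probability.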
Layer 3: Decide (sizing)
The LLM emits probability. Your code decides position size via conviction-scaled Kelly with a per-trade cap.
# scripts/03_decide.py
import json, time, uuid
from pathlib import Path
def conviction_tier(p: float) -> str:
    if p >= 0.85: return "SUPREME"
    if p >= 0.70: return "HIGH"
    if p >= 0.55: return "MEDIUM"
    return "LOW"
def conviction_fraction(tier: str) -> float:
    return {"SUPREME": 0.25, "HIGH": 0.15, "MEDIUM": 0.05, "LOW": 0.0}[tier]
def sized_fraction(p: float, b: float = 1.5, tier_cap: float = 0.04) -> float:
    tier = conviction_tier(p)
    # Full Kelly for win probability p and payoff ratio b, floored at zero.
    kelly = max(0.0, (b * p - (1 - p)) / b)
    # Scale Kelly down by conviction tier, then apply the hard per-trade cap.
    return min(kelly * conviction_fraction(tier), tier_cap)
def decide(research_output: dict, current_bankroll_usd: float) -> dict:
    p = research_output["probability_yes"]
    fraction = sized_fraction(p)
    notional = fraction * current_bankroll_usd
    decision = {
        "id": str(uuid.uuid4()),
        "at": time.time(),
        "ticker": research_output["ticker_real"],  # mapped back here
        "probability": p,
        "tier": conviction_tier(p),
        "fraction": fraction,
        "notional_usd": notional,
        "thesis": research_output["thesis"],
        "invalidation": research_output["invalidation_conditions"],
    }
    # Append-only log (critical for audit).
    Path("memory").mkdir(exist_ok=True)
    with open("memory/decisions.jsonl", "a") as f:
        f.write(json.dumps(decision) + "\n")
    return decision
Validate: stress-test the sizing via the Kelly Sizer across thousands of Monte Carlo paths. Confirm the drawdown distribution is acceptable. Verify risk metrics on historical data via Risk-Adjusted Returns.
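For a local feel of what that stress test measures, here is a rough stand-in using only the standard library. It assumes independent binary bets with a fixed win probability p, payoff ratio b, and a constant staked fraction. Real trades are neither independent nor binary, so treat the output as a sanity check, not a forecast. All function names are illustrative.

```python
import random


def max_drawdown(equity):
    """Largest peak-to-trough decline of an equity curve, as a fraction of the peak."""
    peak, worst = equity[0], 0.0
    for value in equity:
        peak = max(peak, value)
        worst = max(worst, (peak - value) / peak)
    return worst


def simulate_drawdowns(fraction, p, b=1.5, n_bets=250, n_paths=2000, seed=0):
    """Return the sorted max drawdowns of n_paths simulated equity curves."""
    rng = random.Random(seed)  # seeded so runs are reproducible
    results = []
    for _path in range(n_paths):
        equity = [1.0]
        for _bet in range(n_bets):
            stake = equity[-1] * fraction
            pnl = stake * b if rng.random() < p else -stake
            equity.append(equity[-1] + pnl)
        results.append(max_drawdown(equity))
    return sorted(results)


def percentile(sorted_values, q):
    """Nearest-rank percentile for q in [0, 1] over an already sorted list."""
    index = min(len(sorted_values) - 1, int(q * len(sorted_values)))
    return sorted_values[index]
```

Calling simulate_drawdowns(fraction=0.04, p=0.72) and reading percentile(result, 0.5) and percentile(result, 0.95) gives a median and a tail drawdown to compare against what you can tolerate.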
Layer 4: Execute (idempotent)
The ONE rule: every order carries a client-supplied idempotency key. Retry on error never produces double fills.
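# scripts/04_execute.py
import os
from alpaca.common.exceptions import APIError
from alpaca.trading.client import TradingClient
from alpaca.trading.enums import OrderSide, TimeInForce
from alpaca.trading.requests import LimitOrderRequest
api_key = os.environ["ALPACA_API_KEY"]
api_secret = os.environ["ALPACA_API_SECRET"]
client = TradingClient(api_key, api_secret, paper=True)  # start paper!
def execute(decision: dict, current_price: float):
    qty = int(decision["notional_usd"] / current_price)
    if qty == 0:
        return None
    req = LimitOrderRequest(
        symbol=decision["ticker"],
        qty=qty,
        side=OrderSide.BUY if decision["fraction"] > 0 else OrderSide.SELL,
        time_in_force=TimeInForce.DAY,
        limit_price=round(current_price * 1.001, 2),
        client_order_id=decision["id"],  # <-- IDEMPOTENCY KEY
    )
    try:
        return client.submit_order(req)
    except APIError:
        # A rejected retry usually means the key is already taken: return the
        # existing order. If no such order exists, this lookup raises as well.
        return client.get_order_by_client_id(decision["id"])
The client_order_id is the decision UUID from the log. Alpaca requires this key to be unique, so if the request network-errors and you retry, the second submit is rejected instead of creating a second order; execute then looks the existing order up by the same key and returns it. No duplicate fill.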
Validate: verify your MCP server / broker supports idempotency via the Finance MCP Directory. Alpaca V2 = grade A, supports it natively. Tradier community MCP = grade C, does not.
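The scaffold's third test asks that the same idea on the same day produce one order. A random UUID is stable across retries of one logged decision, but if the decide step itself reruns it mints a new ID. One way to cover that case, sketched here as an assumption rather than the article's method, is to derive the key from the ticker and trading date with uuid5. The namespace string and the order_key name are hypothetical.

```python
import uuid
from datetime import date

# Hypothetical namespace; any fixed UUID works as long as it never changes.
NAMESPACE = uuid.uuid5(uuid.NAMESPACE_DNS, "trader.example.invalid")


def order_key(ticker: str, trading_day: date, side: str = "buy") -> str:
    """Deterministic client_order_id: same idea on the same day gives the same key."""
    name = f"{ticker.upper()}:{trading_day.isoformat()}:{side.lower()}"
    return str(uuid.uuid5(NAMESPACE, name))
```

The trade-off is that this allows at most one order per ticker, side, and day; a deliberate second order would need an extra component in the key.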
Layer 5: Reconcile
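Post-trade: fetch actual fills and record the realized state in a separate fills log, so the decision log itself stays append-only.
# scripts/05_reconcile.py
import json, os
from pathlib import Path
from alpaca.trading.client import TradingClient
from alpaca.trading.enums import QueryOrderStatus
from alpaca.trading.requests import GetOrdersRequest
client = TradingClient(os.environ["ALPACA_API_KEY"], os.environ["ALPACA_API_SECRET"], paper=True)
FILLS = Path("memory/fills.jsonl")
def reconcile():
    # status=ALL: the default query returns open orders only, and filled orders are closed.
    orders = client.get_orders(GetOrdersRequest(status=QueryOrderStatus.ALL))
    by_client_id = {o.client_order_id: o for o in orders}
    with open("memory/decisions.jsonl") as f:
        decisions = [json.loads(line) for line in f]
    # decisions.jsonl is never rewritten, so "already reconciled" is tracked via fills.jsonl.
    done = set()
    if FILLS.exists():
        done = {json.loads(line)["decision_id"] for line in FILLS.read_text().splitlines() if line}
    with FILLS.open("a") as out:
        for d in decisions:
            order = by_client_id.get(d["id"])
            # Skip recorded fills, unknown orders, and orders not yet fully filled.
            if d["id"] in done or order is None or order.filled_at is None:
                continue
            out.write(json.dumps({
                "decision_id": d["id"],
                "filled_at": order.filled_at.isoformat(),
                "filled_qty": float(order.filled_qty),
                "filled_avg_price": float(order.filled_avg_price),
            }) + "\n")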
Layer 6: Robustness
The layer that determines whether you can sleep.
# scripts/heartbeat.py
import json, time
from pathlib import Path
def write():
    Path("data/heartbeat.json").write_text(json.dumps({
        "at": time.time(),
        "iso": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
        "status": "ok",
    }))
# scripts/watchdog.py (independent process)
import json, time, subprocess
from pathlib import Path
MAX_AGE = 900  # 15 min during market hours
hb = json.loads(Path("data/heartbeat.json").read_text())
age = time.time() - hb["at"]
if age > MAX_AGE:
    subprocess.run(["/usr/local/bin/telegram-notify", f"heartbeat stale: {age:.0f}s"])
    Path("data/circuit.json").write_text(json.dumps({
        "paused": True,
        "reason": f"stale heartbeat ({age:.0f}s)",
        "since": time.time(),
    }))
Every pipeline layer checks data/circuit.json at startup:
def check_circuit():
    if not Path("data/circuit.json").exists(): return True
    return not json.loads(Path("data/circuit.json").read_text()).get("paused", False)
Resume is deliberately manual. See Heartbeats, Watchdogs, and Circuit Breakers for the full pattern.
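Because the watchdog and the pipeline are separate processes reading the same state files, a reader can catch a file half-written. A common mitigation, sketched here on the assumption that the temporary file and the target share a directory (so os.replace is an atomic rename), is to write to a temp file and rename it into place. The resume helper illustrates one shape the manual reset could take; both function names are illustrative.

```python
import json
import os
import tempfile
import time
from pathlib import Path


def write_json_atomic(path, payload: dict) -> None:
    """Write JSON so readers see either the old file or the new one, never a partial."""
    path = Path(path)
    fd, tmp_name = tempfile.mkstemp(dir=path.parent, prefix=path.name, suffix=".tmp")
    try:
        with os.fdopen(fd, "w") as tmp:
            json.dump(payload, tmp)
            tmp.flush()
            os.fsync(tmp.fileno())  # push bytes to disk before the rename
        os.replace(tmp_name, path)  # atomic rename within the same directory
    except BaseException:
        os.unlink(tmp_name)  # clean up the temp file, then re-raise
        raise


def resume(circuit_path="data/circuit.json", note: str = "") -> None:
    """Manual resume: clear the pause flag and record why it was cleared."""
    write_json_atomic(circuit_path, {
        "paused": False,
        "reason": f"manual resume: {note}",
        "since": time.time(),
    })
```

Keeping resume as an explicit command a human runs, rather than something the watchdog does on its own, preserves the deliberately manual step.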
Layer 7: Schedule (launchd)
<!-- com.you.trader-run.plist, every 5 minutes during market hours -->
<plist>
<dict>
<key>Label</key><string>com.you.trader-run</string>
<key>ProgramArguments</key>
<array>
<string>/opt/homebrew/bin/python3</string>
<string>/Users/you/trader/scripts/run.py</string>
</array>
<key>StartCalendarInterval</key>
<array>
<!--... Entries for every 5 minutes 09:30-16:00 ET Mon-Fri... -->
</array>
</dict>
</plist>
Three plists: run (pipeline), watchdog (every 15 min, 24/7), notify-resume (daily 09:00). Total monthly cost: $0.
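The calendar entries for the run plist are tedious to write by hand. One way to generate them with Python's standard plistlib is sketched below. It assumes the machine's local clock is set to US Eastern time, since launchd evaluates StartCalendarInterval in local time, and it does not skip market holidays. The function name is illustrative.

```python
import plistlib


def market_hours_intervals(step_minutes=5, open_hm=(9, 30), close_hm=(16, 0)):
    """One launchd calendar entry per slot, Monday (1) through Friday (5)."""
    start = open_hm[0] * 60 + open_hm[1]
    end = close_hm[0] * 60 + close_hm[1]
    return [
        {"Weekday": weekday, "Hour": minute // 60, "Minute": minute % 60}
        for weekday in range(1, 6)
        for minute in range(start, end + 1, step_minutes)
    ]


plist = {
    "Label": "com.you.trader-run",
    "ProgramArguments": [
        "/opt/homebrew/bin/python3",
        "/Users/you/trader/scripts/run.py",
    ],
    "StartCalendarInterval": market_hours_intervals(),
}
xml_bytes = plistlib.dumps(plist)  # XML is plistlib's default output format
```

Writing xml_bytes to the plist file in your schedules folder gives launchd the full set of slots: 79 per day from 09:30 to 16:00 inclusive, 395 per week.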
Cost envelope
A representative shape, at Sonnet 4.6 with 50% prompt caching, running 10 ideas/day × 5 calls × 8K input / 1.5K output with 15% retry:
| Line | Monthly |
|---|---|
| Data (Alpaca Algo Trader Plus or Polygon Starter) | $29–99 |
| LLM (Sonnet 4.6, 50% cache) | low three figures, scales with universe + retry rate |
| Broker commissions | $0 (PFOF) |
| Infra (small ARM box + a free static host) | $0 |
Run your own numbers — the LLM line moves a lot with universe size, cache hit rate, and retry policy. If validated-trade rate × expected EV-per-trade exceeds total monthly run cost + execution costs + capital cost, the stack is paying for itself.
Verify via Token-Cost Optimizer.
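To run your own numbers locally, the arithmetic behind the LLM line can be sketched as below. Every price and multiplier is a placeholder argument, not a quoted rate; input_cost_multiplier stands for whatever effective input discount your measured cache hit rate delivers. Function and parameter names are illustrative.

```python
def monthly_llm_cost(
    ideas_per_day: float,
    calls_per_idea: float,
    input_tokens_per_call: float,
    output_tokens_per_call: float,
    retry_rate: float,
    days_per_month: int,
    input_price_per_mtok: float,
    output_price_per_mtok: float,
    input_cost_multiplier: float = 1.0,  # 1.0 means no caching benefit
) -> float:
    """Monthly LLM spend, in the currency of the per-million-token prices."""
    calls = ideas_per_day * calls_per_idea * (1 + retry_rate) * days_per_month
    input_cost = calls * input_tokens_per_call / 1e6 * input_price_per_mtok
    output_cost = calls * output_tokens_per_call / 1e6 * output_price_per_mtok
    return input_cost * input_cost_multiplier + output_cost


def pays_for_itself(validated_trades_per_month, ev_per_trade, monthly_costs) -> bool:
    """Break-even test from the text: expected monthly edge vs. summed monthly costs."""
    return validated_trades_per_month * ev_per_trade > sum(monthly_costs)
```

Pass the representative shape from this section (10 ideas/day, 5 calls, 8K input, 1.5K output, 15% retry) together with the current prices from your provider's pricing page, then feed the result into pays_for_itself alongside your data, execution, and capital costs.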
The tests that matter
# tests/test_no_price_leak.py
# fetch_research_pack, sized_fraction, and execute come from the pipeline scripts.
import json
from dataclasses import asdict
def test_research_pack_has_no_price():
    pack = fetch_research_pack("AAPL")  # internal ticker map
    payload = json.dumps(asdict(pack))
    assert "price" not in payload.lower()
    assert "$" not in payload
    assert "chart" not in payload.lower()
    assert "momentum" not in payload.lower()
# tests/test_sizing_caps.py
def test_sized_fraction_never_exceeds_tier_cap():
    for p in [0.5, 0.7, 0.85, 0.95, 0.99]:
        for b in [1.0, 1.5, 2.0, 3.0]:
            assert sized_fraction(p, b, tier_cap=0.04) <= 0.04
# tests/test_idempotent_orders.py
def test_same_client_order_id_never_double_fills():
    decision = {"id": "test-uuid-1", "ticker": "SYNTHETIC_A", "notional_usd": 1000, "fraction": 0.01}
    result1 = execute(decision, current_price=100)
    result2 = execute(decision, current_price=100)  # retry
    assert result1.id == result2.id
These three tests catch the three most dangerous failure modes. Run them in CI on every commit. If they don't exist in your production trader, you have a ticking time bomb.
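The substring checks in the first test are a blunt instrument: they would miss a figure written as "182.40 USD", for example. A slightly more targeted scan is sketched below. The patterns are illustrative and deliberately strict (any currency amount is flagged, as in the original test), so expect to tune them against your own research packs.

```python
import re

# Illustrative patterns only; extend them for your own data sources.
LEAK_PATTERNS = {
    "currency_amount": re.compile(
        r"[$€£]\s?\d|\b\d[\d,]*(?:\.\d+)?\s?(?:USD|EUR|GBP)\b", re.I
    ),
    "market_term": re.compile(
        r"\b(?:share price|stock price|price target|closing price|52-week|"
        r"all-time high|moving average|momentum|chart)\b",
        re.I,
    ),
}


def find_price_leaks(text: str) -> list[tuple[str, str]]:
    """Return (pattern_name, matched_text) pairs for every suspected leak."""
    hits = []
    for name, pattern in LEAK_PATTERNS.items():
        hits.extend((name, match.group(0)) for match in pattern.finditer(text))
    return hits
```

In a test, assert find_price_leaks(payload) == [] and print the returned pairs on failure, which shows exactly which excerpt leaked instead of only that something did.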
What changes for scale
The shape above works up to ~$500K bankroll. Beyond that:
- Data: move to Databento for tick granularity. See Data-Vendor TCO.
- Execution: layer smart-order routing on top of broker primitives — simple POV / TWAP wrappers.
- Research: staged cascade (Haiku filter → Sonnet extract → Opus synthesize): see Token-Cost Reality.
- Ops: move scheduler off launchd to managed cron (GitHub Actions scheduled workflow works for most solo scale); add Sentry for error tracking.
- Risk: layer Walk-Forward Validator into nightly CI — gate live trading on passing WF efficiency threshold.
The skip-to-the-end recipe
- Run paper for 90 days with the full pipeline above.
- Verify PBO (probability of backtest overfitting) < 0.3 and DSR (deflated Sharpe ratio) > 90% on the paper run.
- Confirm heartbeat + watchdog fire Telegram correctly via induced failure drills.
- Read the BaFin + EU guide if EU-domiciled and ensure Impressum + compliance banner are right.
- Go live with 10% of your intended bankroll. Hold for another 30 days. Scale up only if live matches paper within ±20%.
Connects to
- The 2026 Engineer's Guide to AI in Markets — the strategic overview this tactical guide operationalizes.
- Price-Blind LLM Research Harness, deeper dive on Layer 2.
- Heartbeats, Watchdogs, and Circuit Breakers — Layer 6 in detail.
- Conviction-Scaled Kelly — Layer 3 math.