TL;DR
Four primitives separate a weekend ingestion script from one that runs unattended for six months: a token-bucket rate limiter that respects vendor quotas, a resumable checkpoint that picks up where the last run stopped, idempotent writes so retries don't duplicate rows, and a dead-letter queue so transient failures don't poison the pipeline. Implemented correctly, all four fit in ~200 lines of Python with sqlite3, collections.deque, time.monotonic, and requests. Below: each primitive with runnable code, then a complete ingest.py that wires them together and survives the failure modes that kill naive scripts — rate-limit 429s, network timeouts, partial batches, process restarts, and vendor-side outages.
Why this is the hard problem
People underestimate data ingestion because the happy path is trivial: hit an endpoint, parse the JSON, insert into a table, sleep, loop. The happy path is maybe 30 lines of Python. Everything hard is in the corner cases.
- The vendor rate-limits you in a shape you didn't expect (per-minute vs per-second bucket, or a sliding window).
- The process crashes at row 47,000 of a 1,000,000-row backfill, and on restart it either re-ingests all 47,000 (duplicates) or skips them (gaps).
- A network blip returns a 500 on one symbol; the naive script treats the whole batch as failed and re-runs it, wasting quota and producing duplicates.
- A vendor change adds a new field, a few symbols return malformed data, and the whole loop halts on an unhandled exception.
Each of the four primitives below addresses one of these failure modes.
Primitive 1: the token-bucket rate limiter
A token bucket has a fixed capacity and refills at a constant rate. Each outbound request consumes one token; when the bucket is empty, the request waits until a token is available. This shape matches how most API vendors think about rate limits and avoids the "burst-then-wait" pattern of a simple sleep-per-call. The implementation below uses the equivalent sliding-window-log formulation: record the timestamp of each request, and block while the last `rate` requests all fall inside one `period`. It enforces the same quota with less state machinery.
import time
from collections import deque
class TokenBucket:
"""
    Allows at most `rate` requests in any rolling `period`-second window.
    Not thread-safe: guard acquire() with a threading.Lock if several
    threads share one limiter.
"""
def __init__(self, rate: int, period: float):
self.rate = rate
self.period = period
self.events: deque[float] = deque()
def acquire(self) -> None:
now = time.monotonic()
# Drop events outside the window.
while self.events and self.events[0] <= now - self.period:
self.events.popleft()
if len(self.events) >= self.rate:
# Sleep until the oldest event ages out of the window.
sleep_for = self.events[0] + self.period - now
if sleep_for > 0:
time.sleep(sleep_for)
now = time.monotonic()
while self.events and self.events[0] <= now - self.period:
self.events.popleft()
self.events.append(now)
Usage:
limiter = TokenBucket(rate=5, period=1.0) # 5 req/sec
for symbol in symbols:
limiter.acquire()
response = requests.get(url, params={"symbol": symbol}, timeout=10)
This is about 20 lines, and it handles the common case well: a known per-second or per-minute quota.
Three variants worth knowing:
- Per-endpoint limiters. Some vendors publish different quotas for different endpoints. Keep one TokenBucket per endpoint class.
- Adaptive limiters. If the vendor returns 429 with a Retry-After header, pause the bucket for that duration. This shape handles dynamic rate-limit changes; see the sketch after this list.
- Concurrent-request limiters. A threading.Semaphore capped at the vendor's concurrency limit. Combines with the token bucket: one limits requests per second, the other limits requests in flight at once.
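A minimal sketch of the second and third variants, assuming the TokenBucket class above is in scope. AdaptiveTokenBucket, penalize, guarded_get, and the concurrency cap of 4 are illustrative names and values, not a vendor API:
import threading
import time

import requests

class AdaptiveTokenBucket(TokenBucket):
    def __init__(self, rate: int, period: float):
        super().__init__(rate, period)
        self.resume_at = 0.0  # monotonic time before which all acquires block

    def penalize(self, seconds: float) -> None:
        # Call this when the vendor answers 429 with a Retry-After header.
        self.resume_at = max(self.resume_at, time.monotonic() + seconds)

    def acquire(self) -> None:
        wait = self.resume_at - time.monotonic()
        if wait > 0:
            time.sleep(wait)  # honor the vendor-imposed pause first
        super().acquire()     # then the normal sliding-window wait

# One limits requests per second, the other limits requests in flight.
in_flight = threading.Semaphore(4)

def guarded_get(limiter: AdaptiveTokenBucket, url: str, **kwargs):
    with in_flight:
        limiter.acquire()
        resp = requests.get(url, timeout=10, **kwargs)
        if resp.status_code == 429:
            limiter.penalize(float(resp.headers.get("Retry-After", "5")))
        return resp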
Primitive 2: resumable checkpoints
On every successful batch, persist a last_ingested_at marker to the same database as the data itself. On restart, read the marker and resume.
import sqlite3
from contextlib import contextmanager
@contextmanager
def db_tx(db_path: str):
conn = sqlite3.connect(db_path, timeout=30.0)
conn.execute("PRAGMA journal_mode=WAL")
conn.execute("PRAGMA synchronous=NORMAL")
try:
yield conn
conn.commit()
except Exception:
conn.rollback()
raise
finally:
conn.close()
def get_checkpoint(db_path: str, source: str) -> str | None:
with db_tx(db_path) as conn:
row = conn.execute(
"SELECT last_ingested_at FROM ingest_checkpoint WHERE source = ?",
(source,),
).fetchone()
return row[0] if row else None
def set_checkpoint(db_path: str, source: str, ts: str) -> None:
with db_tx(db_path) as conn:
conn.execute(
"""
INSERT INTO ingest_checkpoint (source, last_ingested_at)
VALUES (?, ?)
ON CONFLICT(source) DO UPDATE SET last_ingested_at=excluded.last_ingested_at
""",
(source, ts),
)
Schema:
CREATE TABLE IF NOT EXISTS ingest_checkpoint (
source TEXT PRIMARY KEY,
last_ingested_at TEXT NOT NULL
);
The critical discipline: update the checkpoint only after the batch has been persisted successfully. If the checkpoint update runs before the data insert, a crash between them causes permanent data loss — the checkpoint advances past data that was never written.
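A minimal sketch of that ordering, reusing db_tx from above and the bars table defined in Primitive 3 below. ingest_batch is a hypothetical helper; because both statements share one transaction, a crash leaves data and checkpoint either both updated or both untouched:
def ingest_batch(db_path: str, source: str, rows: list[tuple], batch_end_ts: str) -> None:
    # One transaction: db_tx commits only if both statements succeed,
    # and rolls back both if either raises. The checkpoint can never
    # advance past data that was not persisted.
    with db_tx(db_path) as conn:
        conn.executemany(
            "INSERT INTO bars (symbol, ts, open, high, low, close, volume, ingested_at) "
            "VALUES (?, ?, ?, ?, ?, ?, ?, ?) ON CONFLICT(symbol, ts) DO NOTHING",
            rows,
        )
        conn.execute(
            "INSERT INTO ingest_checkpoint (source, last_ingested_at) VALUES (?, ?) "
            "ON CONFLICT(source) DO UPDATE SET last_ingested_at=excluded.last_ingested_at",
            (source, batch_end_ts),
        )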
Patterns that go with this:
- Batch size trades off checkpoint granularity for throughput. Checkpointing after every row is safest but slowest. Checkpointing every 100-1000 rows is typical.
- Multi-source checkpoints. One row per data source in ingest_checkpoint, keyed by source. A single database can host multiple ingestion loops.
- Resume-from-max pattern. If your data table has a (symbol, ts) natural key, MAX(ts) per symbol is implicitly a per-symbol checkpoint. This can replace the explicit checkpoint table for simple cases; see the query sketch after this list.
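A sketch of the resume-from-max query, assuming the universe table used by ingest.py below and the bars table from Primitive 3:
-- Per-symbol resume point; NULL last_ts means the symbol was never ingested.
SELECT u.symbol, MAX(b.ts) AS last_ts
FROM universe u
LEFT JOIN bars b ON b.symbol = u.symbol
GROUP BY u.symbol;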
Primitive 3: idempotent writes
Every insert must be safe to retry. If the ingestion loop crashes after writing a batch but before acknowledging success, the next run will try to write the same rows. The database must shrug that off without erroring and without duplicating rows.
The SQLite pattern:
CREATE TABLE IF NOT EXISTS bars (
symbol TEXT NOT NULL,
ts TEXT NOT NULL, -- ISO 8601 UTC
open REAL NOT NULL,
high REAL NOT NULL,
low REAL NOT NULL,
close REAL NOT NULL,
volume INTEGER NOT NULL,
ingested_at TEXT NOT NULL,
PRIMARY KEY (symbol, ts)
);
Insert pattern:
def insert_bars(conn, bars: list[tuple]) -> int:
"""
bars: list of (symbol, ts, open, high, low, close, volume, ingested_at)
Returns number of newly inserted rows (duplicates counted as 0).
"""
before = conn.execute("SELECT total_changes()").fetchone()[0]
conn.executemany(
"""
INSERT INTO bars (symbol, ts, open, high, low, close, volume, ingested_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(symbol, ts) DO NOTHING
""",
bars,
)
after = conn.execute("SELECT total_changes()").fetchone()[0]
return after - before
ON CONFLICT DO NOTHING is the idempotency guarantee. Retrying the batch is a no-op at the row level. The PRIMARY KEY on (symbol, ts) is the uniqueness constraint that makes the pattern work — without it, duplicates proliferate.
For Postgres, ON CONFLICT (symbol, ts) DO NOTHING works verbatim. The closest MySQL equivalent is INSERT IGNORE, though it is blunter: it downgrades other errors (such as data-conversion failures) to warnings, not just duplicate keys.
An alternative pattern worth knowing: INSERT ... ON CONFLICT DO UPDATE, for when the incoming row might contain corrections. This is typical when a vendor revises historical bars (adjustments for splits, dividends, or late prints). Decide explicitly: DO NOTHING for write-once data, DO UPDATE for authoritative revisions, and document which vendors do which. A sketch of the revision-aware variant follows.
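A sketch against the bars schema above; whether ingested_at should record the original ingest or the latest revision is a policy choice:
INSERT INTO bars (symbol, ts, open, high, low, close, volume, ingested_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(symbol, ts) DO UPDATE SET
    open   = excluded.open,
    high   = excluded.high,
    low    = excluded.low,
    close  = excluded.close,
    volume = excluded.volume,
    ingested_at = excluded.ingested_at;  -- here: timestamp of the latest revision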
Primitive 4: the dead-letter queue
Some batches fail permanently: malformed rows, a symbol that's been delisted, a vendor that returns garbage for one specific date. Naive retry loops retry these forever, burning quota and blocking progress.
A dead-letter queue (DLQ) is a separate table where failed rows land with the reason and a retry counter. A DLQ worker processes these independently, with exponential backoff and a maximum retry count.
CREATE TABLE IF NOT EXISTS ingest_dlq (
id INTEGER PRIMARY KEY AUTOINCREMENT,
source TEXT NOT NULL,
payload_json TEXT NOT NULL, -- the raw request/response that failed
reason TEXT NOT NULL,
retry_count INTEGER NOT NULL DEFAULT 0,
next_retry_at TEXT NOT NULL,
created_at TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_dlq_next_retry ON ingest_dlq (next_retry_at);
Enqueue:
import json, datetime as dt
def dlq_enqueue(conn, source: str, payload: dict, reason: str) -> None:
now = dt.datetime.utcnow().isoformat(timespec="seconds") + "Z"
conn.execute(
"""
INSERT INTO ingest_dlq (source, payload_json, reason, next_retry_at, created_at)
VALUES (?, ?, ?, ?, ?)
""",
(source, json.dumps(payload), reason, now, now),
)
Retry worker, run on its own schedule (e.g., every 15 minutes via launchd):
MAX_RETRIES = 6  # backoff doubles from 1 minute; the final retry waits 32 minutes
def dlq_retry_once(conn, attempt_fn) -> int:
"""
attempt_fn(payload_dict) -> None on success, raises on failure.
Returns number of DLQ rows processed.
"""
now = dt.datetime.utcnow()
rows = conn.execute(
"""
SELECT id, source, payload_json, retry_count
FROM ingest_dlq
WHERE next_retry_at <= ? AND retry_count < ?
ORDER BY next_retry_at
LIMIT 50
""",
(now.isoformat(timespec="seconds") + "Z", MAX_RETRIES),
).fetchall()
processed = 0
for row_id, source, payload_json, retry_count in rows:
payload = json.loads(payload_json)
try:
attempt_fn(payload)
conn.execute("DELETE FROM ingest_dlq WHERE id = ?", (row_id,))
except Exception as e:
            # Exponential backoff: 1, 2, 4, 8, 16, 32 minutes.
backoff_min = 2 ** retry_count
next_retry = now + dt.timedelta(minutes=backoff_min)
conn.execute(
"""
UPDATE ingest_dlq
SET retry_count = retry_count + 1,
next_retry_at = ?,
reason = ?
WHERE id = ?
""",
(next_retry.isoformat(timespec="seconds") + "Z", str(e)[:500], row_id),
)
processed += 1
conn.commit()
return processed
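Wiring the worker to an actual retry: a hypothetical make_attempt_fn that replays a failed batch, using the fetch_bars and insert_bars helpers and the DLQ payload shape from the full ingest.py below:
def make_attempt_fn(conn, limiter, headers):
    # run_once (below) enqueues payloads shaped like
    # {"batch": [symbols...], "start": iso_ts, "end": iso_ts}.
    def attempt(payload: dict) -> None:
        data = fetch_bars(limiter, payload["batch"],
                          payload["start"] + "Z", payload["end"] + "Z", headers)
        ingested_at = dt.datetime.utcnow().isoformat(timespec="seconds") + "Z"
        rows = [
            (sym, b["t"], b["o"], b["h"], b["l"], b["c"], b["v"], ingested_at)
            for sym, bars in (data.get("bars") or {}).items()
            for b in bars
        ]
        if rows:
            insert_bars(conn, rows)  # idempotent, so replays are safe
    return attempt

# Usage: dlq_retry_once(conn, make_attempt_fn(conn, limiter, headers))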
Rows that exceed MAX_RETRIES stay in the DLQ indefinitely until a human reviews them. This is a feature, not a bug — permanent failures deserve a human eyeball before being discarded.
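The review itself can be as simple as a query for the exhausted rows (MAX_RETRIES = 6 as above):
SELECT id, source, reason, retry_count, created_at
FROM ingest_dlq
WHERE retry_count >= 6        -- MAX_RETRIES: out of automatic retries
ORDER BY created_at;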
Wiring it all together: ingest.py
The four primitives assembled into a single script you can point at a vendor API and leave running under launchd:
# ingest.py
import json, time, datetime as dt, logging
from collections import deque
from contextlib import contextmanager
import sqlite3
import requests
DB_PATH = "/Users/you/data/market.db"
SOURCE = "alpaca_bars_1min"
ENDPOINT = "https://data.alpaca.markets/v2/stocks/bars"
RATE_PER_SEC = 4
BATCH_SYMBOLS = 50
BATCH_LOOKBACK_MIN = 15
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
log = logging.getLogger("ingest")
class TokenBucket:
def __init__(self, rate, period):
self.rate = rate
self.period = period
self.events = deque()
def acquire(self):
now = time.monotonic()
while self.events and self.events[0] <= now - self.period:
self.events.popleft()
if len(self.events) >= self.rate:
sleep_for = self.events[0] + self.period - now
if sleep_for > 0:
time.sleep(sleep_for)
now = time.monotonic()
while self.events and self.events[0] <= now - self.period:
self.events.popleft()
self.events.append(now)
@contextmanager
def db_tx(path):
conn = sqlite3.connect(path, timeout=30.0)
conn.execute("PRAGMA journal_mode=WAL")
try:
yield conn
conn.commit()
except Exception:
conn.rollback()
raise
finally:
conn.close()
def get_checkpoint(conn, source):
row = conn.execute(
"SELECT last_ingested_at FROM ingest_checkpoint WHERE source = ?",
(source,),
).fetchone()
return row[0] if row else None
def set_checkpoint(conn, source, ts):
conn.execute(
"INSERT INTO ingest_checkpoint (source, last_ingested_at) VALUES (?, ?) "
"ON CONFLICT(source) DO UPDATE SET last_ingested_at=excluded.last_ingested_at",
(source, ts),
)
def insert_bars(conn, bars):
before = conn.execute("SELECT total_changes()").fetchone()[0]
conn.executemany(
"INSERT INTO bars (symbol, ts, open, high, low, close, volume, ingested_at) "
"VALUES (?, ?, ?, ?, ?, ?, ?, ?) ON CONFLICT(symbol, ts) DO NOTHING",
bars,
)
after = conn.execute("SELECT total_changes()").fetchone()[0]
return after - before
def dlq_enqueue(conn, source, payload, reason):
now = dt.datetime.utcnow().isoformat(timespec="seconds") + "Z"
conn.execute(
"INSERT INTO ingest_dlq (source, payload_json, reason, next_retry_at, created_at) "
"VALUES (?, ?, ?, ?, ?)",
(source, json.dumps(payload), reason, now, now),
)
def fetch_bars(limiter, symbols, start_iso, end_iso, headers):
limiter.acquire()
params = {"symbols": ",".join(symbols), "start": start_iso, "end": end_iso, "timeframe": "1Min"}
r = requests.get(ENDPOINT, params=params, headers=headers, timeout=15)
if r.status_code == 429:
retry_after = float(r.headers.get("Retry-After", "5"))
log.warning("429 received; sleeping %.1fs", retry_after)
time.sleep(retry_after)
raise RuntimeError("rate_limited")
r.raise_for_status()
return r.json()
def run_once(api_key_id, api_secret):
limiter = TokenBucket(rate=RATE_PER_SEC, period=1.0)
headers = {"APCA-API-KEY-ID": api_key_id, "APCA-API-SECRET-KEY": api_secret}
with db_tx(DB_PATH) as conn:
ckpt = get_checkpoint(conn, SOURCE)
if ckpt is None:
start = dt.datetime.utcnow() - dt.timedelta(days=1)
else:
            # Parse as UTC, then drop tzinfo so comparisons with the naive
            # utcnow() values below don't raise TypeError.
            start = dt.datetime.fromisoformat(ckpt.replace("Z", "+00:00")).replace(tzinfo=None)
end = min(dt.datetime.utcnow(), start + dt.timedelta(minutes=BATCH_LOOKBACK_MIN))
universe = [row[0] for row in conn.execute("SELECT symbol FROM universe ORDER BY symbol")]
ingested_at = dt.datetime.utcnow().isoformat(timespec="seconds") + "Z"
total_new = 0
for i in range(0, len(universe), BATCH_SYMBOLS):
batch = universe[i : i + BATCH_SYMBOLS]
try:
payload = fetch_bars(limiter, batch, start.isoformat() + "Z", end.isoformat() + "Z", headers)
rows = []
for sym, bars in (payload.get("bars") or {}).items():
for b in bars:
rows.append((sym, b["t"], b["o"], b["h"], b["l"], b["c"], b["v"], ingested_at))
if rows:
total_new += insert_bars(conn, rows)
except Exception as e:
log.exception("batch failed; enqueuing to DLQ")
dlq_enqueue(conn, SOURCE, {"batch": batch, "start": start.isoformat(), "end": end.isoformat()}, str(e)[:500])
set_checkpoint(conn, SOURCE, end.isoformat(timespec="seconds") + "Z")
log.info("run complete: %d new bars, checkpoint=%s", total_new, end.isoformat())
if __name__ == "__main__":
import os
run_once(os.environ["APCA_KEY_ID"], os.environ["APCA_SECRET"])
That's roughly 120 lines with comments stripped. It assumes the bars, universe, ingest_checkpoint, and ingest_dlq tables already exist; run the CREATE TABLE statements from the earlier sections once before the first run. Schedule it under launchd or systemd every 5-15 minutes. It is not safe to run concurrently unless you add a single-instance lock, sketched below.
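A minimal sketch of such a lock, POSIX-only and using a hypothetical /tmp/ingest.lock path; the flock is released automatically when the process exits:
import fcntl
import os
import sys

LOCK_PATH = "/tmp/ingest.lock"  # hypothetical location

def acquire_single_instance_lock():
    fh = open(LOCK_PATH, "w")
    try:
        # Non-blocking exclusive lock: fail fast if another run holds it.
        fcntl.flock(fh, fcntl.LOCK_EX | fcntl.LOCK_NB)
    except BlockingIOError:
        sys.exit("another ingest run is already active; exiting")
    fh.write(str(os.getpid()))
    fh.flush()
    return fh  # keep the handle referenced so the lock stays held

if __name__ == "__main__":
    _lock = acquire_single_instance_lock()
    run_once(os.environ["APCA_KEY_ID"], os.environ["APCA_SECRET"])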
Cost and reliability math
Assume a universe of 500 symbols, 1-minute bars, a run every 15 minutes, 8 trading hours/day (a runnable check follows the list):
- Requests per run: 500 / 50 = 10 batch calls, which the 4 req/sec limiter clears in about 2.5 seconds.
- Bars per run: ~500 symbols × 15 minutes = 7,500 bars.
- Bars per day: 32 runs × 7,500 = 240,000. Per year (250 trading days): ~60M bars at ~80 bytes per row, roughly 5 GB of raw data.
- SQLite handles this comfortably; a single-file database in the low tens of gigabytes is well within its operating range.
- DLQ volume under normal operation: 0-3 batches/day, resolved by the retry worker within an hour.
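The same arithmetic as a runnable check (the ~80 bytes per row is the estimate used in the list above):
symbols, run_minutes, trading_hours, trading_days = 500, 15, 8, 250
runs_per_day = trading_hours * 60 // run_minutes       # 32
bars_per_day = symbols * run_minutes * runs_per_day    # 240,000
bars_per_year = bars_per_day * trading_days            # 60,000,000
gb_per_year = bars_per_year * 80 / 1e9                 # ~4.8 GB
print(f"{bars_per_year:,} bars/year, ~{gb_per_year:.1f} GB")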
A naive version of this script — no rate limiter, no checkpoint, no idempotency, no DLQ — looks like it works for a week, then breaks the first time the network blips, the vendor rate-limits a burst, or the laptop restarts overnight. A week later you discover duplicate rows, a 6-hour gap, and a crash-looped cron that burned through your API quota with nothing to show for it.
The two reliability patterns this doesn't cover
For completeness, two patterns often paired with the four above but beyond the scope of this piece:
- Schema migrations. As the vendor evolves the response shape, the ingestion code must evolve. Keep schema version numbers in the ingest_checkpoint row, and run migration scripts that are themselves idempotent.
- Data quality gates. Bars with obviously wrong values (negative volume, high < low) should be flagged rather than silently persisted. A separate validation pass on each batch, logging anomalies to a data_quality table.
Both layer on top of the four primitives cleanly.
Connects to
- Market Data APIs Compared (2026) — choose the vendor whose rate limits match your token bucket.
- Heartbeats, Watchdogs, and Circuit Breakers for Trading Systems — the monitoring layer that sits above the ingestion loop.
- The $0/Month Trading Stack — where this ingestion loop lives in the bigger picture.
- Data-Vendor TCO Calculator — quantify the request volume and cost implied by your rate-limiter settings.
- Trading System Blueprinter — generate the ingestion + execution skeleton for a given strategy.
References
- SQLite documentation, "ON CONFLICT clause" and "WAL" (sqlite.org, retrieved April 2026).
- Floyd, S., & Jacobson, V. (1993). "Random Early Detection Gateways for Congestion Avoidance." IEEE/ACM Transactions on Networking 1(4). (Congestion-avoidance background.)
- Nygard, M. T. (2018). Release It! Design and Deploy Production-Ready Software. 2nd ed. Pragmatic Bookshelf. (Circuit breakers, DLQ patterns.)
- Hohpe, G., & Woolf, B. (2003). Enterprise Integration Patterns. Addison-Wesley. (Dead-letter channel, idempotent receiver.)
- Alpaca Data API documentation and rate-limit headers (retrieved April 2026).