TL;DR

Four primitives separate a weekend ingestion script from one that runs unattended for six months: a token-bucket rate limiter that respects vendor quotas, a resumable checkpoint that picks up where the last run stopped, idempotent writes so retries don't duplicate rows, and a dead-letter queue so transient failures don't poison the pipeline. Implemented correctly, all four fit in ~200 lines of Python with sqlite3, collections.deque, time.monotonic, and requests. Below: each primitive with runnable code, then a complete ingest.py that wires them together and survives the failure modes that kill naive scripts — rate-limit 429s, network timeouts, partial batches, process restarts, and vendor-side outages.

Why this is the hard problem

People underestimate data ingestion because the happy path is trivial: hit an endpoint, parse the JSON, insert into a table, sleep, loop. The happy path is maybe 30 lines of Python. Everything hard is in the corner cases.

  • The vendor rate-limits you in a shape you didn't expect (per-minute vs per-second bucket, or a sliding window).
  • The process crashes at row 47,000 of a 1,000,000-row backfill, and on restart it either re-ingests all 47,000 (duplicates) or skips them (gaps).
  • A network blip returns a 500 on one symbol; the naive script treats the whole batch as failed and re-runs it, wasting quota and producing duplicates.
  • A vendor change adds a new field, a few symbols return malformed data, and the whole loop halts on an unhandled exception.

Each of the four primitives below addresses one of these failure modes.

Primitive 1: the token-bucket rate limiter

A token bucket has a fixed capacity and refills at a constant rate. Each outbound request consumes one token; when the bucket is empty, the request waits until a token is available. This shape matches how most API vendors think about rate limits and avoids the "burst-then-wait" pattern of a simple sleep-per-call. The implementation below uses an equivalent sliding-window log (a deque of recent request timestamps): it enforces the same bound, at most `rate` requests in any `period`-second window, without needing a background refill task.

import time
from collections import deque

class TokenBucket:
    """
    Allows at most `rate` requests in any sliding `period`-second window.
    Not thread-safe as written; wrap acquire() in a threading.Lock for multi-threaded use.
    """
    def __init__(self, rate: int, period: float):
        self.rate = rate
        self.period = period
        self.events: deque[float] = deque()

    def acquire(self) -> None:
        now = time.monotonic()
        # Drop events outside the window.
        while self.events and self.events[0] <= now - self.period:
            self.events.popleft()
        if len(self.events) >= self.rate:
            # Sleep until the oldest event ages out of the window.
            sleep_for = self.events[0] + self.period - now
            if sleep_for > 0:
                time.sleep(sleep_for)
            now = time.monotonic()
            while self.events and self.events[0] <= now - self.period:
                self.events.popleft()
        self.events.append(now)

Usage:

limiter = TokenBucket(rate=5, period=1.0)  # 5 req/sec
for symbol in symbols:
    limiter.acquire()
    response = requests.get(url, params={"symbol": symbol}, timeout=10)

This is 20 lines. It handles the common case perfectly — a known per-second or per-minute quota.
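
A quick way to convince yourself the limiter smooths traffic rather than just sleeping per call is a self-contained timing check. The rate and period here are scaled down so the demo runs in under a second; the class is repeated verbatim so the snippet stands alone:

```python
import time
from collections import deque

class TokenBucket:
    def __init__(self, rate: int, period: float):
        self.rate = rate
        self.period = period
        self.events: deque = deque()

    def acquire(self) -> None:
        now = time.monotonic()
        while self.events and self.events[0] <= now - self.period:
            self.events.popleft()
        if len(self.events) >= self.rate:
            sleep_for = self.events[0] + self.period - now
            if sleep_for > 0:
                time.sleep(sleep_for)
            now = time.monotonic()
            while self.events and self.events[0] <= now - self.period:
                self.events.popleft()
        self.events.append(now)

# 5 requests per 0.2s window: 11 calls must span at least two full windows,
# so the loop cannot finish in much under 0.4 seconds.
limiter = TokenBucket(rate=5, period=0.2)
t0 = time.monotonic()
for _ in range(11):
    limiter.acquire()
elapsed = time.monotonic() - t0
print(f"11 acquires took {elapsed:.2f}s")
```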

Three variants worth knowing:

  • Per-endpoint limiters. Some vendors publish different quotas for different endpoints. Keep one TokenBucket per endpoint class.
  • Adaptive limiters. If the vendor returns 429 with a Retry-After header, pause the bucket for that duration. This shape handles dynamic rate-limit changes.
  • Concurrent-request limiters. A threading.Semaphore capped at the vendor's concurrency limit. Combines with the token bucket — one limits requests/sec, the other limits in-flight at once.
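
The adaptive variant can be sketched as a small extension of the same class. RetryAfterBucket and its penalize method are hypothetical names of ours, not a vendor or stdlib API; the idea is simply that a 429's Retry-After pauses the whole bucket, not just the caller that saw it:

```python
import time
from collections import deque

class RetryAfterBucket:
    """Sliding-window limiter that can be paused when the vendor says to back off."""
    def __init__(self, rate: int, period: float):
        self.rate = rate
        self.period = period
        self.events: deque = deque()
        self.paused_until = 0.0  # monotonic deadline set by penalize()

    def penalize(self, retry_after_sec: float) -> None:
        # Call this when a 429 arrives; every subsequent acquire() waits it out.
        self.paused_until = max(self.paused_until, time.monotonic() + retry_after_sec)

    def acquire(self) -> None:
        now = time.monotonic()
        if now < self.paused_until:
            time.sleep(self.paused_until - now)
        now = time.monotonic()
        # Same sliding-window logic as TokenBucket above.
        while self.events and self.events[0] <= now - self.period:
            self.events.popleft()
        if len(self.events) >= self.rate:
            sleep_for = self.events[0] + self.period - now
            if sleep_for > 0:
                time.sleep(sleep_for)
            now = time.monotonic()
        self.events.append(now)
```

In the request loop, the 429 branch becomes `limiter.penalize(float(r.headers.get("Retry-After", "5")))` followed by a retry, instead of an unconditional sleep.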

Primitive 2: resumable checkpoints

On every successful batch, persist a last_ingested_at marker to the same database as the data itself. On restart, read the marker and resume.

import sqlite3
from contextlib import contextmanager

@contextmanager
def db_tx(db_path: str):
    conn = sqlite3.connect(db_path, timeout=30.0)
    conn.execute("PRAGMA journal_mode=WAL")
    conn.execute("PRAGMA synchronous=NORMAL")
    try:
        yield conn
        conn.commit()
    except Exception:
        conn.rollback()
        raise
    finally:
        conn.close()

def get_checkpoint(db_path: str, source: str) -> str | None:
    with db_tx(db_path) as conn:
        row = conn.execute(
            "SELECT last_ingested_at FROM ingest_checkpoint WHERE source = ?",
            (source,),
        ).fetchone()
    return row[0] if row else None

def set_checkpoint(db_path: str, source: str, ts: str) -> None:
    with db_tx(db_path) as conn:
        conn.execute(
            """
            INSERT INTO ingest_checkpoint (source, last_ingested_at)
            VALUES (?, ?)
            ON CONFLICT(source) DO UPDATE SET last_ingested_at=excluded.last_ingested_at
            """,
            (source, ts),
        )

Schema:

CREATE TABLE IF NOT EXISTS ingest_checkpoint (
  source             TEXT PRIMARY KEY,
  last_ingested_at   TEXT NOT NULL
);

The critical discipline: update the checkpoint only after the batch has been persisted successfully. If the checkpoint update runs before the data insert, a crash between them causes permanent data loss — the checkpoint advances past data that was never written.
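
In code, the safe ordering is: same transaction, data first, checkpoint last, one commit. A minimal sketch against an in-memory database with trimmed-down versions of the two tables:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE bars (symbol TEXT, ts TEXT, close REAL, PRIMARY KEY (symbol, ts));
    CREATE TABLE ingest_checkpoint (source TEXT PRIMARY KEY, last_ingested_at TEXT NOT NULL);
""")

batch = [
    ("AAPL", "2026-04-01T14:30:00Z", 171.2),
    ("AAPL", "2026-04-01T14:31:00Z", 171.4),
]

# Both statements ride the same implicit transaction; a crash before commit()
# rolls back both, so the checkpoint can never advance past unwritten data.
conn.executemany(
    "INSERT INTO bars VALUES (?, ?, ?) ON CONFLICT(symbol, ts) DO NOTHING", batch
)
conn.execute(
    "INSERT INTO ingest_checkpoint VALUES (?, ?) "
    "ON CONFLICT(source) DO UPDATE SET last_ingested_at=excluded.last_ingested_at",
    ("demo_source", batch[-1][1]),
)
conn.commit()
```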

Patterns that go with this:

  • Batch size trades off checkpoint granularity for throughput. Checkpointing after every row is safest but slowest. Checkpointing every 100-1000 rows is typical.
  • Multi-source checkpoints. One row per data source in ingest_checkpoint, keyed by source. A single database can host multiple ingestion loops.
  • Resume-from-max pattern. If your data table has a (symbol, ts) natural key, MAX(ts) per symbol is implicitly a per-symbol checkpoint. This can replace the explicit checkpoint table for simple cases.
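
The resume-from-max pattern in query form: one round trip recovers a per-symbol high-water mark with no checkpoint table at all. A sketch against a trimmed bars schema:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE bars (symbol TEXT, ts TEXT, close REAL, PRIMARY KEY (symbol, ts))")
conn.executemany(
    "INSERT INTO bars VALUES (?, ?, ?)",
    [
        ("AAPL", "2026-04-01T14:30:00Z", 171.2),
        ("AAPL", "2026-04-01T14:31:00Z", 171.4),
        ("MSFT", "2026-04-01T14:30:00Z", 402.1),
    ],
)

# ISO-8601 UTC strings sort lexicographically, so MAX(ts) is the latest bar.
# Each symbol's next fetch starts just after its own watermark.
watermarks = dict(conn.execute("SELECT symbol, MAX(ts) FROM bars GROUP BY symbol"))
```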

Primitive 3: idempotent writes

Every insert must be safe to retry. If the ingestion loop crashes after writing a batch but before acknowledging success, the next run will try to write the same rows. The database must shrug that off without erroring and without duplicating rows.

The SQLite pattern:

CREATE TABLE IF NOT EXISTS bars (
  symbol      TEXT NOT NULL,
  ts          TEXT NOT NULL,       -- ISO 8601 UTC
  open        REAL NOT NULL,
  high        REAL NOT NULL,
  low         REAL NOT NULL,
  close       REAL NOT NULL,
  volume      INTEGER NOT NULL,
  ingested_at TEXT NOT NULL,
  PRIMARY KEY (symbol, ts)
);

Insert pattern:

def insert_bars(conn, bars: list[tuple]) -> int:
    """
    bars: list of (symbol, ts, open, high, low, close, volume, ingested_at)
    Returns number of newly inserted rows (duplicates counted as 0).
    """
    before = conn.execute("SELECT total_changes()").fetchone()[0]
    conn.executemany(
        """
        INSERT INTO bars (symbol, ts, open, high, low, close, volume, ingested_at)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?)
        ON CONFLICT(symbol, ts) DO NOTHING
        """,
        bars,
    )
    after = conn.execute("SELECT total_changes()").fetchone()[0]
    return after - before

ON CONFLICT DO NOTHING is the idempotency guarantee. Retrying the batch is a no-op at the row level. The PRIMARY KEY on (symbol, ts) is the uniqueness constraint that makes the pattern work — without it, duplicates proliferate.

For Postgres, the same ON CONFLICT (symbol, ts) DO NOTHING syntax works unchanged. For MySQL, the closest equivalent is INSERT IGNORE, though it is looser: it suppresses other errors besides key conflicts (data truncation, for one), so prefer a targeted conflict clause where the engine offers it.

An alternative pattern worth knowing: INSERT ... ON CONFLICT DO UPDATE when the incoming row might contain corrections. Typical for when a vendor revises historical bars (adjustments for splits, dividends, or late prints). Decide explicitly — DO NOTHING for write-once, DO UPDATE for authoritative-revisions, and document which vendors do which.
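
A sketch of the corrections variant: same natural key, but a revised row overwrites the price fields instead of being dropped. Trimmed schema for brevity:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE bars (
      symbol TEXT NOT NULL, ts TEXT NOT NULL,
      close REAL NOT NULL, volume INTEGER NOT NULL,
      PRIMARY KEY (symbol, ts)
    )
""")

UPSERT = """
    INSERT INTO bars (symbol, ts, close, volume) VALUES (?, ?, ?, ?)
    ON CONFLICT(symbol, ts) DO UPDATE SET
      close = excluded.close,
      volume = excluded.volume
"""

conn.execute(UPSERT, ("AAPL", "2026-04-01T14:30:00Z", 171.20, 1000))
# The vendor later revises the same bar (say, a late print changes close and volume).
conn.execute(UPSERT, ("AAPL", "2026-04-01T14:30:00Z", 171.25, 1180))

row = conn.execute("SELECT close, volume FROM bars").fetchone()
count = conn.execute("SELECT COUNT(*) FROM bars").fetchone()[0]
```

Retrying either statement is still idempotent; the last write for a key simply wins.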

Primitive 4: the dead-letter queue

Some batches fail permanently: malformed rows, a symbol that's been delisted, a vendor that returns garbage for one specific date. Naive retry loops retry these forever, burning quota and blocking progress.

A dead-letter queue (DLQ) is a separate table where failed rows land with the reason and a retry counter. A DLQ worker processes these independently, with exponential backoff and a maximum retry count.

CREATE TABLE IF NOT EXISTS ingest_dlq (
  id            INTEGER PRIMARY KEY AUTOINCREMENT,
  source        TEXT NOT NULL,
  payload_json  TEXT NOT NULL,         -- the raw request/response that failed
  reason        TEXT NOT NULL,
  retry_count   INTEGER NOT NULL DEFAULT 0,
  next_retry_at TEXT NOT NULL,
  created_at    TEXT NOT NULL
);

CREATE INDEX IF NOT EXISTS idx_dlq_next_retry ON ingest_dlq (next_retry_at);

Enqueue:

import json, datetime as dt

def dlq_enqueue(conn, source: str, payload: dict, reason: str) -> None:
    now = dt.datetime.utcnow().isoformat(timespec="seconds") + "Z"
    conn.execute(
        """
        INSERT INTO ingest_dlq (source, payload_json, reason, next_retry_at, created_at)
        VALUES (?, ?, ?, ?, ?)
        """,
        (source, json.dumps(payload), reason, now, now),
    )

Retry worker, run on its own schedule (e.g., every 15 minutes via launchd):

import json, datetime as dt

MAX_RETRIES = 6  # retry_count < 6 below, so backoff runs 1, 2, 4, 8, 16, 32 minutes

def dlq_retry_once(conn, attempt_fn) -> int:
    """
    attempt_fn(payload_dict) -> None on success, raises on failure.
    Returns number of DLQ rows processed.
    """
    now = dt.datetime.utcnow()
    rows = conn.execute(
        """
        SELECT id, source, payload_json, retry_count
        FROM ingest_dlq
        WHERE next_retry_at <= ? AND retry_count < ?
        ORDER BY next_retry_at
        LIMIT 50
        """,
        (now.isoformat(timespec="seconds") + "Z", MAX_RETRIES),
    ).fetchall()

    processed = 0
    for row_id, source, payload_json, retry_count in rows:
        payload = json.loads(payload_json)
        try:
            attempt_fn(payload)
            conn.execute("DELETE FROM ingest_dlq WHERE id = ?", (row_id,))
        except Exception as e:
            # Exponential backoff: 1, 2, 4, 8, 16, 32 minutes across the six attempts
            backoff_min = 2 ** retry_count
            next_retry = now + dt.timedelta(minutes=backoff_min)
            conn.execute(
                """
                UPDATE ingest_dlq
                SET retry_count = retry_count + 1,
                    next_retry_at = ?,
                    reason = ?
                WHERE id = ?
                """,
                (next_retry.isoformat(timespec="seconds") + "Z", str(e)[:500], row_id),
            )
        processed += 1
    conn.commit()
    return processed

Rows that exceed MAX_RETRIES stay in the DLQ indefinitely until a human reviews them. This is a feature, not a bug — permanent failures deserve a human eyeball before being discarded.
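
End to end in miniature, the DLQ looks like this. This is a compact harness with a trimmed schema and a simplified retry pass restated inline so it runs standalone; one payload succeeds and is deleted, one keeps failing and accumulates backoff:

```python
import json
import sqlite3
import datetime as dt

conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE ingest_dlq (
      id INTEGER PRIMARY KEY AUTOINCREMENT,
      source TEXT NOT NULL, payload_json TEXT NOT NULL, reason TEXT NOT NULL,
      retry_count INTEGER NOT NULL DEFAULT 0,
      next_retry_at TEXT NOT NULL, created_at TEXT NOT NULL
    )
""")

def enqueue(payload: dict, reason: str) -> None:
    now = dt.datetime.utcnow().isoformat(timespec="seconds") + "Z"
    conn.execute(
        "INSERT INTO ingest_dlq (source, payload_json, reason, next_retry_at, created_at) "
        "VALUES ('demo', ?, ?, ?, ?)",
        (json.dumps(payload), reason, now, now),
    )

def retry_pass(attempt_fn) -> None:
    # Simplified: no MAX_RETRIES cutoff or LIMIT, just due-row processing.
    now = dt.datetime.utcnow()
    due = conn.execute(
        "SELECT id, payload_json, retry_count FROM ingest_dlq WHERE next_retry_at <= ?",
        (now.isoformat(timespec="seconds") + "Z",),
    ).fetchall()
    for row_id, payload_json, retry_count in due:
        try:
            attempt_fn(json.loads(payload_json))
            conn.execute("DELETE FROM ingest_dlq WHERE id = ?", (row_id,))
        except Exception as e:
            nxt = now + dt.timedelta(minutes=2 ** retry_count)
            conn.execute(
                "UPDATE ingest_dlq SET retry_count = retry_count + 1, "
                "next_retry_at = ?, reason = ? WHERE id = ?",
                (nxt.isoformat(timespec="seconds") + "Z", str(e)[:500], row_id),
            )
    conn.commit()

enqueue({"batch": ["AAPL"]}, "timeout")
enqueue({"batch": ["ZZZZ"]}, "bad symbol")

def attempt(payload):
    if payload["batch"] == ["ZZZZ"]:
        raise ValueError("still malformed")  # keeps accumulating backoff

retry_pass(attempt)
```

After the pass, the AAPL batch is gone and the ZZZZ batch sits with retry_count = 1 and a next_retry_at one minute out.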

Wiring it all together: ingest.py

The four primitives assembled into a single script you can point at a vendor API and leave running under launchd:

# ingest.py
import json, time, datetime as dt, logging, sys
from collections import deque
from contextlib import contextmanager
import sqlite3
import requests

DB_PATH = "/Users/you/data/market.db"
SOURCE = "alpaca_bars_1min"
ENDPOINT = "https://data.alpaca.markets/v2/stocks/bars"
RATE_PER_SEC = 4
BATCH_SYMBOLS = 50
BATCH_LOOKBACK_MIN = 15

logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
log = logging.getLogger("ingest")


class TokenBucket:
    def __init__(self, rate, period):
        self.rate = rate
        self.period = period
        self.events = deque()

    def acquire(self):
        now = time.monotonic()
        while self.events and self.events[0] <= now - self.period:
            self.events.popleft()
        if len(self.events) >= self.rate:
            sleep_for = self.events[0] + self.period - now
            if sleep_for > 0:
                time.sleep(sleep_for)
            now = time.monotonic()
            while self.events and self.events[0] <= now - self.period:
                self.events.popleft()
        self.events.append(now)


@contextmanager
def db_tx(path):
    conn = sqlite3.connect(path, timeout=30.0)
    conn.execute("PRAGMA journal_mode=WAL")
    try:
        yield conn
        conn.commit()
    except Exception:
        conn.rollback()
        raise
    finally:
        conn.close()


def get_checkpoint(conn, source):
    row = conn.execute(
        "SELECT last_ingested_at FROM ingest_checkpoint WHERE source = ?",
        (source,),
    ).fetchone()
    return row[0] if row else None


def set_checkpoint(conn, source, ts):
    conn.execute(
        "INSERT INTO ingest_checkpoint (source, last_ingested_at) VALUES (?, ?) "
        "ON CONFLICT(source) DO UPDATE SET last_ingested_at=excluded.last_ingested_at",
        (source, ts),
    )


def insert_bars(conn, bars):
    before = conn.execute("SELECT total_changes()").fetchone()[0]
    conn.executemany(
        "INSERT INTO bars (symbol, ts, open, high, low, close, volume, ingested_at) "
        "VALUES (?, ?, ?, ?, ?, ?, ?, ?) ON CONFLICT(symbol, ts) DO NOTHING",
        bars,
    )
    after = conn.execute("SELECT total_changes()").fetchone()[0]
    return after - before


def dlq_enqueue(conn, source, payload, reason):
    now = dt.datetime.utcnow().isoformat(timespec="seconds") + "Z"
    conn.execute(
        "INSERT INTO ingest_dlq (source, payload_json, reason, next_retry_at, created_at) "
        "VALUES (?, ?, ?, ?, ?)",
        (source, json.dumps(payload), reason, now, now),
    )


def fetch_bars(limiter, symbols, start_iso, end_iso, headers):
    limiter.acquire()
    params = {"symbols": ",".join(symbols), "start": start_iso, "end": end_iso, "timeframe": "1Min"}
    r = requests.get(ENDPOINT, params=params, headers=headers, timeout=15)
    if r.status_code == 429:
        retry_after = float(r.headers.get("Retry-After", "5"))
        log.warning("429 received; sleeping %.1fs", retry_after)
        time.sleep(retry_after)
        # Raise so the caller DLQs this batch; the retry worker replays it later.
        raise RuntimeError("rate_limited")
    r.raise_for_status()
    return r.json()


def run_once(api_key_id, api_secret):
    limiter = TokenBucket(rate=RATE_PER_SEC, period=1.0)
    headers = {"APCA-API-KEY-ID": api_key_id, "APCA-API-SECRET-KEY": api_secret}
    with db_tx(DB_PATH) as conn:
        ckpt = get_checkpoint(conn, SOURCE)
        if ckpt is None:
            start = dt.datetime.utcnow() - dt.timedelta(days=1)
        else:
            # Parse as naive UTC so the min() with utcnow() below doesn't mix
            # aware and naive datetimes (that mix raises TypeError).
            start = dt.datetime.fromisoformat(ckpt.replace("Z", "+00:00")).replace(tzinfo=None)
        end = min(dt.datetime.utcnow(), start + dt.timedelta(minutes=BATCH_LOOKBACK_MIN))
        universe = [row[0] for row in conn.execute("SELECT symbol FROM universe ORDER BY symbol")]

        ingested_at = dt.datetime.utcnow().isoformat(timespec="seconds") + "Z"
        total_new = 0
        for i in range(0, len(universe), BATCH_SYMBOLS):
            batch = universe[i : i + BATCH_SYMBOLS]
            try:
                payload = fetch_bars(limiter, batch, start.isoformat() + "Z", end.isoformat() + "Z", headers)
                rows = []
                for sym, bars in (payload.get("bars") or {}).items():
                    for b in bars:
                        rows.append((sym, b["t"], b["o"], b["h"], b["l"], b["c"], b["v"], ingested_at))
                if rows:
                    total_new += insert_bars(conn, rows)
            except Exception as e:
                log.exception("batch failed; enqueuing to DLQ")
                dlq_enqueue(conn, SOURCE, {"batch": batch, "start": start.isoformat(), "end": end.isoformat()}, str(e)[:500])

        set_checkpoint(conn, SOURCE, end.isoformat(timespec="seconds") + "Z")
        log.info("run complete: %d new bars, checkpoint=%s", total_new, end.isoformat())


if __name__ == "__main__":
    import os
    run_once(os.environ["APCA_KEY_ID"], os.environ["APCA_SECRET"])

That's roughly 120 lines with comments stripped. Schedule it under launchd or systemd every 5-15 minutes. It is not safe to run two instances concurrently against the same checkpoint; add a PID-file lock (omitted here for brevity) if overlapping runs are possible.

Cost and reliability math

Assume a universe of 500 symbols, 1-minute bars, run every 15 minutes, 8 trading hours/day:

  • Requests per run: 500 / 50 = 10 batch calls; at 4 req/sec the limiter clears them in about 2.5 seconds.
  • Bars per run: ~500 symbols × 15 minutes = 7,500 bars.
  • Bars per day: 32 runs × 7,500 = 240,000 bars; per year (250 trading days), ~60M bars × ~80 bytes ≈ 5 GB of raw data.
  • SQLite handles this comfortably; single-file WAL-mode databases growing a few GB per year are well within its range.
  • DLQ volume under normal operation: 0-3 batches/day, resolved by the retry worker within an hour.
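
The arithmetic behind those bullets, spelled out as back-of-envelope code under the same assumptions:

```python
symbols, batch = 500, 50
runs_per_day = 8 * 60 // 15          # every 15 minutes over 8 trading hours = 32 runs
requests_per_run = symbols // batch  # 10 batch calls per run
bars_per_run = symbols * 15          # one 1-minute bar per symbol per minute of lookback
bars_per_day = bars_per_run * runs_per_day
bars_per_year = bars_per_day * 250   # trading days per year
raw_gb = bars_per_year * 80 / 1e9    # ~80 bytes per row

print(requests_per_run, bars_per_run, bars_per_day, bars_per_year, round(raw_gb, 1))
```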

A naive version of this script — no rate limiter, no checkpoint, no idempotency, no DLQ — looks like it works for a week, then breaks the first time the network blips, the vendor rate-limits a burst, or the laptop restarts overnight. A week later you discover duplicate rows, a 6-hour gap, and a crash-looped cron that burned through your API quota with nothing to show for it.

The two reliability patterns this doesn't cover

For completeness, two patterns often paired with the four above but beyond the scope of this piece:

  1. Schema migrations. As the vendor evolves the response shape, the ingestion code must evolve. Keep schema version numbers in the ingest_checkpoint row, and run migration scripts that are themselves idempotent.
  2. Data quality gates. Bars with obviously wrong values (negative volume, high < low) should be flagged rather than silently persisted. A separate validation pass on each batch, logging anomalies to a data_quality table.

Both layer on top of the four primitives cleanly.

References

  • SQLite documentation, "ON CONFLICT clause" and "WAL" (sqlite.org, retrieved April 2026).
  • Floyd, S., & Jacobson, V. (1993). "Random Early Detection Gateways for Congestion Avoidance." IEEE/ACM Transactions on Networking 1(4). (Congestion-control background; the token bucket itself comes from earlier traffic-shaping work.)
  • Nygard, M. T. (2018). Release It! Design and Deploy Production-Ready Software. 2nd ed. Pragmatic Bookshelf. (Circuit breakers, DLQ patterns.)
  • Hohpe, G., & Woolf, B. (2003). Enterprise Integration Patterns. Addison-Wesley. (Dead-letter channel, idempotent receiver.)
  • Alpaca Data API documentation and rate-limit headers (retrieved April 2026).