Developer guide · June 29, 2026

How to Build an Options Flow Scanner: DIY vs. API

Every developer who's seriously looked into unusual options activity eventually thinks: "Could I just build this myself?" The honest answer is yes, but it's significantly harder than it looks.

How an options flow scanner works

An options flow scanner does one thing: surface options prints that are statistically unusual relative to the existing open interest and typical volume for that contract. The pipeline has four stages:

  1. Ingest: receive options trade prints from an exchange feed or API (ticker, strike, expiry, volume, premium, side)
  2. Score: compute unusualness metrics (Vol/OI ratio, premium size, DTE urgency, OTM aggressiveness, aggressor side)
  3. Filter: apply thresholds (score above X, premium above $Y, sweeps only, etc.)
  4. Alert: push qualifying prints to Discord, Slack, Telegram, email, or a database
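The four stages map naturally onto four small functions. Here is a minimal sketch, assuming prints arrive as plain dicts. The `run_pipeline` name and the premium-only placeholder score are hypothetical stand-ins, not a real scoring model:

```python
from typing import Callable, Iterable

Print = dict  # one options trade print: ticker, strike, expiry, volume, premium, side

def run_pipeline(
    ingest: Callable[[], Iterable[Print]],
    score: Callable[[Print], float],
    passes_filter: Callable[[Print, float], bool],
    alert: Callable[[Print, float], None],
) -> int:
    """Run one pass of ingest -> score -> filter -> alert; return the number of alerts sent."""
    sent = 0
    for p in ingest():                 # 1. Ingest
        s = score(p)                   # 2. Score
        if passes_filter(p, s):        # 3. Filter
            alert(p, s)                # 4. Alert
            sent += 1
    return sent

# Hypothetical stand-ins for each stage
sample = [
    {"ticker": "NVDA", "strike": 150, "expiry": "2026-01-16", "volume": 900, "premium": 400_000, "side": "ask"},
    {"ticker": "AAPL", "strike": 200, "expiry": "2026-01-16", "volume": 50, "premium": 8_000, "side": "bid"},
]
alerts = []
count = run_pipeline(
    ingest=lambda: sample,
    score=lambda p: min(100.0, p["premium"] / 5_000),  # placeholder score, not a real model
    passes_filter=lambda p, s: s >= 55 and p["premium"] >= 100_000,
    alert=lambda p, s: alerts.append((p["ticker"], s)),
)
print(count, alerts)  # 1 [('NVDA', 80.0)]
```

Keeping the stages as separate callables makes it easy to swap the placeholder score for a real one, or the print-based alert for a Discord webhook, without touching the loop.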

The complexity is in stage 2. A naive scanner that just flags high-volume options contracts produces too many false positives: routine institutional rolls, index rebalancing, and market-maker hedging all show up as "unusual" if you only look at raw volume. A scanner's quality depends on how well it separates genuinely directional flow from noise.

Data sources: OPRA tape, Polygon, and alternatives

Options trades in the US are reported through the Options Price Reporting Authority (OPRA), which consolidates prints from every US options exchange. Every single options trade passes through OPRA; the challenge is accessing it:

Provider | Data type | Delay | Cost | Best for
Polygon.io Starter | REST snapshots, unlimited calls | 15 min | ~$29/mo | Learning, backtesting, delayed alerting
Polygon.io Advanced | WebSocket stream + REST | Real-time | ~$199/mo | Live scanner, production alerting
Interactive Brokers TWS API | Real-time via broker API | Real-time | Included with account + data subscriptions | Traders with an IB account
Cboe DataShop | Full OPRA tape, bulk files | EOD or real-time | Institutional pricing | Quant research, building commercial products
Tradier | REST + WebSocket | Delayed | Developer tier: free; production varies | Brokerage-integrated tools

For a DIY scanner on a budget, Polygon Starter is the practical starting point. The 15-minute delay means you can't catch intraday momentum plays, but you can identify unusual activity that persists through a session and validate your scoring logic before upgrading to real-time.

Computing Vol/OI correctly

Vol/OI is the most important metric in unusual options scanning, and the most frequently computed incorrectly.

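The correct formula: Vol/OI = today's volume for (ticker, strike, expiry, call/put) ÷ the prior day's open interest for the same contract. Open interest is only updated once a day, overnight, so the OI figure a provider reports during the session already reflects the prior day's close.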

Common error: Using total OI across all strikes for the ticker as the denominator. This produces much lower ratios and misses concentrated unusual activity at specific strikes.
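A toy example makes the gap concrete. The chain below is made up: one strike sees a burst of new volume while the rest of the chain is quiet.

```python
# Hypothetical chain: (strike, expiry, type) -> (today's volume, prior-day OI)
chain = {
    (150, "2026-01-16", "C"): (6_000, 1_000),
    (140, "2026-01-16", "C"): (500, 20_000),
    (130, "2026-01-16", "P"): (400, 15_000),
    (160, "2026-01-16", "C"): (300, 14_000),
}

# Correct: per-contract Vol/OI
per_contract = {k: vol / oi for k, (vol, oi) in chain.items() if oi > 0}

# Wrong: this strike's volume divided by total OI across the whole chain
total_oi = sum(oi for _, oi in chain.values())
wrong_ratio = chain[(150, "2026-01-16", "C")][0] / total_oi

print(per_contract[(150, "2026-01-16", "C")])  # 6.0
print(round(wrong_ratio, 2))                    # 0.12
```

The 150 call stands out at 6× when measured against its own OI; against the chain-wide denominator it drops to 0.12× and the signal disappears.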

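import requests

POLYGON_KEY = "YOUR_POLYGON_KEY"

def get_options_snapshot(ticker):
    """Get the full options chain snapshot (volume, OI) for an underlying."""
    url = f"https://api.polygon.io/v3/snapshot/options/{ticker}"
    params = {"apiKey": POLYGON_KEY, "limit": 250}
    results = []
    # Each page holds at most 250 contracts; follow next_url until the
    # chain is exhausted (large chains like NVDA span many pages)
    while url:
        r = requests.get(url, params=params, timeout=15)
        r.raise_for_status()
        payload = r.json()
        results.extend(payload.get("results", []))
        url = payload.get("next_url")
        params = {"apiKey": POLYGON_KEY}  # next_url already carries the cursor
    return results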

def compute_vol_oi(snapshot_results):
    """
    Compute Vol/OI per contract. Returns a list of dicts with keys:
    ticker, strike, expiry, type, volume, oi, vol_oi, premium.
    Skips contracts with zero OI (avoids div-by-zero) or zero volume.
    """
    signals = []
    for contract in snapshot_results:
        details = contract.get("details", {})
        day = contract.get("day", {})
        underlying = contract.get("underlying_asset", {})

        # The snapshot nests the underlying symbol under underlying_asset
        ticker = underlying.get("ticker", "")
        strike = details.get("strike_price", 0)
        expiry = details.get("expiration_date", "")
        contract_type = details.get("contract_type", "").upper()

        volume = day.get("volume", 0) or 0
        oi = contract.get("open_interest", 0) or 0
        premium = (day.get("vwap", 0) or 0) * volume * 100  # estimated dollar premium

        if oi == 0 or volume == 0:
            continue

        vol_oi = volume / oi

        signals.append({
            "ticker": ticker,
            "strike": strike,
            "expiry": expiry,
            "type": contract_type,
            "volume": volume,
            "oi": oi,
            "vol_oi": vol_oi,
            "premium": premium,
        })

    return signals

# Usage
snapshot = get_options_snapshot("NVDA")
contracts = compute_vol_oi(snapshot)

# Filter for unusual Vol/OI
unusual = [c for c in contracts if c["vol_oi"] >= 3.0 and c["premium"] >= 50_000]
unusual.sort(key=lambda x: x["vol_oi"], reverse=True)
print(f"Found {len(unusual)} unusual contracts for NVDA")

This gives you the Vol/OI ratio per contract. The threshold of 3.0 is a starting point. Calibrate against historical data to find the ratio that minimizes false positives for your target universe.
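One way to calibrate is to replay past signals that you've labeled after the fact (for example, whether the underlying moved in the option's direction over the next few sessions) and compare precision across candidate thresholds. A minimal sketch, assuming you already have such labels; the history below is made up, and `precision_by_threshold` and its `min_count` guard are hypothetical:

```python
from typing import List, Tuple

def precision_by_threshold(
    history: List[Tuple[float, bool]],  # (vol_oi, hit) pairs from past signals
    thresholds: List[float],
    min_count: int = 3,                 # skip thresholds with too few signals to judge
) -> List[Tuple[float, int, float]]:
    """For each threshold, return (threshold, signals kept, fraction that were hits)."""
    rows = []
    for t in thresholds:
        kept = [hit for ratio, hit in history if ratio >= t]
        if len(kept) >= min_count:
            rows.append((t, len(kept), sum(kept) / len(kept)))
    return rows

# Made-up labeled history for illustration only
history = [(1.5, False), (2.2, False), (2.8, True), (3.1, True), (3.4, False),
           (4.0, True), (5.5, True), (7.2, True), (9.0, False), (12.0, True)]

for t, n, p in precision_by_threshold(history, [2.0, 3.0, 4.0, 5.0]):
    print(f"Vol/OI >= {t}: {n} signals, {p:.0%} hit")
```

Raising the threshold typically trades alert count for precision; with real data, check that the surviving sample is large enough to trust before settling on a cut-off.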

Detecting sweeps

A sweep is a large order split across multiple exchanges simultaneously, indicating urgency: the buyer couldn't fill the full order on one exchange and hit every available offer. Sweep detection requires tick-level data: each individual trade print with its exchange code and timestamp.

from collections import defaultdict
from datetime import timedelta

def detect_sweeps(trade_stream, window_seconds=3, min_exchanges=3, min_prints=5):
    """
    Identify sweeps in a stream of options trade prints.
    A sweep here: at least min_prints prints of the same
    (ticker, strike, expiry, type) on at least min_exchanges different
    exchanges within window_seconds of the first print in the cluster.

    trade_stream: list of dicts with keys:
        ticker, strike, expiry, type, exchange, timestamp, volume, premium
        (timestamp must be a datetime, not an epoch integer)
    Returns: list of sweep events
    """
    # Group trades by contract
    by_contract = defaultdict(list)
    for trade in trade_stream:
        key = (trade["ticker"], trade["strike"], trade["expiry"], trade["type"])
        by_contract[key].append(trade)

    sweeps = []
    for contract_key, trades in by_contract.items():
        trades_sorted = sorted(trades, key=lambda x: x["timestamp"])

        # Sliding window: find clusters within window_seconds
        i = 0
        while i < len(trades_sorted):
            window_start = trades_sorted[i]["timestamp"]
            window_end = window_start + timedelta(seconds=window_seconds)

            cluster = [t for t in trades_sorted[i:]
                      if t["timestamp"] <= window_end]

            exchanges = {t["exchange"] for t in cluster}
            if len(exchanges) >= min_exchanges and len(cluster) >= min_prints:
                total_volume = sum(t["volume"] for t in cluster)
                total_premium = sum(t["premium"] for t in cluster)
                sweeps.append({
                    "ticker": contract_key[0],
                    "strike": contract_key[1],
                    "expiry": contract_key[2],
                    "type": contract_key[3],
                    "exchanges": list(exchanges),
                    "prints": len(cluster),
                    "volume": total_volume,
                    "premium": total_premium,
                    "start_time": window_start,
                })
                i += len(cluster)  # advance past the cluster
            else:
                i += 1

    return sweeps

Polygon Advanced provides this tick-level stream (individual trades with exchange codes) via WebSocket. The Starter tier provides snapshots, not individual ticks, so sweep detection isn't possible at the $29/mo tier.
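detect_sweeps adds a timedelta to each timestamp, so it needs datetime objects, while feeds usually deliver epoch integers. Polygon, for example, documents nanosecond `sip_timestamp` values on its REST trades endpoint and millisecond timestamps on its WebSocket trade messages; confirm the unit for your own feed. A small adapter, sketched under that assumption (`epoch_to_datetime` is a hypothetical helper):

```python
from datetime import datetime, timezone

# Divisor that turns an epoch integer in each unit into whole seconds
_UNIT_DIVISOR = {"s": 1, "ms": 1_000, "us": 1_000_000, "ns": 1_000_000_000}

def epoch_to_datetime(ts: int, unit: str) -> datetime:
    """Convert an integer epoch timestamp (s, ms, us, or ns) to an aware UTC datetime."""
    divisor = _UNIT_DIVISOR[unit]
    # Split with integer math so nanosecond values don't lose precision as floats
    seconds, remainder = divmod(ts, divisor)
    micros = remainder * 1_000_000 // divisor  # datetime only resolves microseconds
    return datetime.fromtimestamp(seconds, tz=timezone.utc).replace(microsecond=micros)

# Two prints 1.5 s apart, with millisecond timestamps
first = epoch_to_datetime(1_700_000_000_000, "ms")
second = epoch_to_datetime(1_700_000_001_500, "ms")
print((second - first).total_seconds())  # 1.5
```

Convert every print with the same function so all timestamps are timezone-aware; comparing aware and naive datetimes inside detect_sweeps would raise a TypeError.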

Composite scoring

Vol/OI alone generates too many false positives. A composite unusualness score combines multiple factors:

import math

def score_contract(vol_oi, premium, dte, otm_pct, is_sweep, is_ask_side):
    """
    Composite unusualness score (0-100).
    Weights are illustrative starting points for separating
    directional flow from routine hedging/rolls; calibrate them before use.
    """
    # Vol/OI component (40 points max)
    # Log scale gives diminishing returns: Vol/OI <= 1 → ~9 pts; 20x+ → 40 pts
    vol_oi_score = min(40, math.log(max(vol_oi, 1) + 1) / math.log(21) * 40)

    # Premium component (30 points max)
    # $25K or less → ~5.6 pts; $1M+ → 30 pts
    premium_score = min(30, math.log(max(premium / 25_000, 1) + 1) / math.log(41) * 30)

    # DTE urgency component (20 points max)
    # 0-7 DTE = max urgency; tapers to 0 by 92 DTE
    if dte is None or dte < 0:
        dte_score = 5
    elif dte <= 7:
        dte_score = 20
    elif dte <= 30:
        dte_score = 15
    elif dte <= 60:
        dte_score = 8
    else:
        # Keep tapering down from the 31-60 DTE bucket (8 pts) instead of
        # jumping back up to ~20 pts at 61 DTE
        dte_score = max(0, 8 - (dte - 60) / 4)

    # OTM aggressiveness (10 points max)
    # 0% OTM (ATM) = 0 pts; ~8.3%+ OTM = 10 pts
    otm_score = min(10, (otm_pct or 0) * 1.2)

    base_score = vol_oi_score + premium_score + dte_score + otm_score

    # Bonuses for execution quality
    if is_sweep:
        base_score = min(100, base_score * 1.15)
    if is_ask_side:
        base_score = min(100, base_score * 1.08)

    return round(base_score)

# Score flags
def score_flag(score):
    if score >= 85:
        return "EXTREME"
    elif score >= 70:
        return "ELEVATED"
    elif score >= 55:
        return "NOTABLE"
    else:
        return None
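As a worked example with made-up inputs (Vol/OI = 5, premium = $250,000, 10 DTE, 5% OTM, ask-side sweep), the components of score_contract come out as:

```latex
\begin{aligned}
V &= \min\!\left(40,\ 40\cdot\frac{\ln(5+1)}{\ln 21}\right) \approx 23.54 \\
P &= \min\!\left(30,\ 30\cdot\frac{\ln(250{,}000/25{,}000+1)}{\ln 41}\right) \approx 19.37 \\
D &= 15 \quad (7 < \text{DTE} \le 30) \\
O &= \min(10,\ 1.2\cdot 5) = 6 \\
B &= V + P + D + O \approx 63.91 \\
S &= \operatorname{round}\big(\min(100,\ B\cdot 1.15\cdot 1.08)\big) = \operatorname{round}(79.38) = 79
\end{aligned}
```

Applying the two capped multipliers in sequence is equivalent to a single cap on their product, so this print scores 79 and score_flag labels it ELEVATED.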

The weights here are a starting point. Calibrate them against historical data using the backtest methodology from the historical data guide: tune the weights to maximize the correlation between score tier and 3-day directional hit-rate in your target universe.
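A minimal sketch of the tier-level check, assuming you have past signals with their score and a label for whether the underlying moved in the signal's direction over the following three sessions (the outcomes below are made up). The tier cut-offs mirror score_flag; `hit_rate_by_tier` is a hypothetical helper:

```python
from collections import defaultdict
from typing import Dict, List, Tuple

def tier(score: int) -> str:
    """Same cut-offs as score_flag, with a bucket for sub-threshold prints."""
    if score >= 85:
        return "EXTREME"
    if score >= 70:
        return "ELEVATED"
    if score >= 55:
        return "NOTABLE"
    return "BELOW"

def hit_rate_by_tier(signals: List[Tuple[int, bool]]) -> Dict[str, float]:
    """signals: (score, hit_3d) pairs. Returns the hit rate for each tier seen."""
    counts = defaultdict(lambda: [0, 0])  # tier -> [hits, total]
    for score, hit in signals:
        bucket = counts[tier(score)]
        bucket[0] += int(hit)
        bucket[1] += 1
    return {t: hits / total for t, (hits, total) in counts.items()}

# Made-up outcomes for illustration only
past = [(91, True), (88, True), (86, True), (78, True), (74, False),
        (72, True), (60, False), (58, True), (57, False), (40, False)]
rates = hit_rate_by_tier(past)
print({t: round(r, 2) for t, r in rates.items()})
```

With a useful weight set, hit rates should rise from tier to tier. If they don't, adjust the weights and re-run, ideally checking the result on a held-out period so the weights aren't fitted to one stretch of history.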

Full Python implementation (Polygon delayed + Discord)

A working end-to-end scanner that polls Polygon's delayed feed and sends qualifying prints to Discord:

import requests
import time
import math
from datetime import datetime, date

POLYGON_KEY = "YOUR_POLYGON_KEY"
DISCORD_WEBHOOK = "YOUR_DISCORD_WEBHOOK_URL"

WATCHLIST = ["AAPL", "NVDA", "TSLA", "SPY", "QQQ", "META", "AMZN", "MSFT"]
PREMIUM_FLOOR = 100_000   # minimum dollar premium
SCORE_MIN = 65            # minimum score to alert
SCAN_INTERVAL_SECS = 300  # poll every 5 min (respect Polygon rate limits)

def score_contract(vol_oi, premium, dte, otm_pct):
    vol_oi_score = min(40, math.log(max(vol_oi, 1) + 1) / math.log(21) * 40)
    premium_score = min(30, math.log(max(premium / 25_000, 1) + 1) / math.log(41) * 30)
    if dte is None or dte < 0: dte_score = 5
    elif dte <= 7: dte_score = 20
    elif dte <= 30: dte_score = 15
    elif dte <= 60: dte_score = 8
    else: dte_score = max(0, 8 - (dte - 60) / 4)  # keep tapering from the 31-60 DTE bucket
    otm_score = min(10, (otm_pct or 0) * 1.2)
    return round(vol_oi_score + premium_score + dte_score + otm_score)

def days_to_expiry(expiry_str):
    try:
        exp = datetime.strptime(expiry_str, "%Y-%m-%d").date()
        return (exp - date.today()).days
    except Exception:
        return None

def send_discord_alert(signal):
    flag = "EXTREME" if signal["score"] >= 85 else "ELEVATED" if signal["score"] >= 70 else "NOTABLE"
    color = 0xFF2DAA if flag == "EXTREME" else 0xF5A623 if flag == "ELEVATED" else 0xAAAAAA
    arrow = "▲" if signal["type"] == "CALL" else "▼"
    embed = {
        "embeds": [{
            "title": f"{signal['ticker']} {arrow} {signal['type']} — {flag}",
            "color": color,
            "fields": [
                {"name": "Strike / Expiry", "value": f"${signal['strike']} · {signal['expiry']} ({signal['dte']}d)", "inline": True},
                {"name": "Premium", "value": f"${signal['premium']:,.0f}", "inline": True},
                {"name": "Vol/OI", "value": f"{signal['vol_oi']:.1f}×", "inline": True},
                {"name": "Score", "value": str(signal['score']), "inline": True},
            ],
            "footer": {"text": "15-min delayed · RadarPulse DIY scanner · Not financial advice"}
        }]
    }
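    try:
        requests.post(DISCORD_WEBHOOK, json=embed, timeout=10).raise_for_status()
    except requests.RequestException as e:
        # A failed webhook shouldn't crash the scan loop
        print(f"Discord alert failed: {e}")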

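def scan_ticker(ticker):
    url = f"https://api.polygon.io/v3/snapshot/options/{ticker}"
    params = {"apiKey": POLYGON_KEY, "limit": 250}
    results = []
    try:
        # Follow next_url until the chain is exhausted (max 250 contracts per page)
        while url:
            r = requests.get(url, params=params, timeout=15)
            r.raise_for_status()
            payload = r.json()
            results.extend(payload.get("results", []))
            url = payload.get("next_url")
            params = {"apiKey": POLYGON_KEY}  # next_url already carries the cursor
    except Exception as e:
        print(f"Error fetching {ticker}: {e}")
        return []

    signals = []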

    for contract in results:
        details = contract.get("details", {})
        day = contract.get("day", {})

        contract_type = details.get("contract_type", "").upper()
        strike = details.get("strike_price", 0)
        expiry = details.get("expiration_date", "")
        volume = day.get("volume", 0) or 0
        oi = contract.get("open_interest", 0) or 0
        vwap = day.get("vwap", 0) or 0

        if oi == 0 or volume == 0 or vwap == 0:
            continue

        vol_oi = volume / oi
        premium = vwap * volume * 100
        dte = days_to_expiry(expiry)

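        # OTM % relative to spot: calls are OTM above spot, puts below.
        # ITM contracts get 0 (abs() would wrongly count them as OTM).
        underlying = contract.get("underlying_asset", {})
        spot = underlying.get("price", 0) or 0
        if spot > 0 and strike > 0:
            if contract_type == "CALL":
                otm_pct = max(0.0, (strike - spot) / spot * 100)
            else:
                otm_pct = max(0.0, (spot - strike) / spot * 100)
        else:
            otm_pct = 5  # fallback when the spot price is unavailable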

        if vol_oi < 2.0 or premium < PREMIUM_FLOOR:
            continue

        score = score_contract(vol_oi, premium, dte, otm_pct)
        if score < SCORE_MIN:
            continue

        signals.append({
            "ticker": ticker,
            "type": contract_type,
            "strike": strike,
            "expiry": expiry,
            "dte": dte,
            "volume": volume,
            "oi": oi,
            "vol_oi": round(vol_oi, 1),
            "premium": premium,
            "score": score,
        })

    return signals

# Main scan loop
seen = set()  # contract keys already alerted today
seen_date = date.today()
print(f"Starting scanner - watching {', '.join(WATCHLIST)}")

while True:
    # Reset de-duplication at the start of each new day
    if date.today() != seen_date:
        seen.clear()
        seen_date = date.today()

    for ticker in WATCHLIST:
        signals = scan_ticker(ticker)
        for sig in signals:
            key = f"{sig['ticker']}:{sig['strike']}:{sig['expiry']}:{sig['type']}"
            if key not in seen:
                seen.add(key)
                print(f"ALERT: {sig['ticker']} {sig['type']} - score {sig['score']} - Vol/OI {sig['vol_oi']}× - ${sig['premium']:,.0f}")
                send_discord_alert(sig)
        # Brief pause between tickers. Starter doesn't cap calls, but the free
        # tier allows only 5 requests/min, so pace requests much more slowly there.
        time.sleep(1)

    print(f"Scan complete at {datetime.now().strftime('%H:%M:%S')} - sleeping {SCAN_INTERVAL_SECS}s")
    time.sleep(SCAN_INTERVAL_SECS)
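When many prints qualify in a single scan, Discord may answer with HTTP 429 (rate limited). A sketch of a retry wrapper, assuming Discord's `retry_after` field (seconds) in the 429 response body. `post_with_retry` is a hypothetical helper, and its `post`/`sleep` parameters exist only so it can be exercised without network access:

```python
import time
from typing import Callable

def post_with_retry(
    url: str,
    payload: dict,
    post: Callable,                           # e.g. requests.post
    sleep: Callable[[float], None] = time.sleep,
    max_attempts: int = 3,
) -> bool:
    """POST a webhook payload; on HTTP 429, wait and retry. Returns True on 2xx."""
    for attempt in range(max_attempts):
        resp = post(url, json=payload, timeout=10)
        if resp.status_code != 429:
            return 200 <= resp.status_code < 300
        if attempt == max_attempts - 1:
            break  # out of attempts; don't sleep for nothing
        # Use the server-provided wait; fall back to exponential backoff
        # if the body is missing or malformed
        try:
            delay = float(resp.json().get("retry_after", 2 ** attempt))
        except (ValueError, AttributeError):
            delay = 2 ** attempt
        sleep(delay)
    return False
```

In the scanner, `post_with_retry(DISCORD_WEBHOOK, embed, post=requests.post)` would take the place of the direct requests.post call inside send_discord_alert.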

Real costs: data, infra, and maintenance

Here's an honest accounting of what a DIY scanner costs:

Component | Delayed (Polygon Starter) | Real-time (Polygon Advanced)
Data subscription | ~$29/mo | ~$199/mo
Cloud hosting (Railway, Fly.io, etc.) | $5–10/mo (always-on instance) | $5–10/mo
Development time (initial build) | 2–6 weeks (part-time) | 4–10 weeks (sweep detection adds complexity)
Ongoing maintenance | 2–4 hours/mo (data format changes, bugs) | 4–8 hours/mo
Total year-1 cost | ~$410–$470 cash + ~200 hours | ~$2,450–$2,510 cash + ~400 hours
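The cash line is twelve months of data plus hosting. A quick sketch for rerunning the numbers with your own plan prices; the inputs below are the approximate figures from the table, not quotes:

```python
def year_one_cash(data_per_month: float, hosting_low: float, hosting_high: float) -> tuple:
    """Return (low, high) year-1 cash cost: 12 months of data plus hosting."""
    return (12 * (data_per_month + hosting_low), 12 * (data_per_month + hosting_high))

delayed = year_one_cash(29, 5, 10)     # Polygon Starter tier
realtime = year_one_cash(199, 5, 10)   # Polygon Advanced tier
print(delayed, realtime)  # (408, 468) (2448, 2508)
```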

The maintenance cost is often underestimated. Data providers change their API schemas, rate limits, and field names, and they don't always announce it in advance. Polygon has updated its options endpoint schema multiple times. Expect your scanner to break every few months and need debugging.
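One cheap defense is to validate each response against the fields your scanner actually reads and fail loudly when something moves. A minimal sketch; the required paths mirror fields the snapshot code in this guide reads, and `missing_fields` is a hypothetical helper:

```python
from typing import Iterable, List

# Dotted paths the scanner depends on (adjust to whatever your code reads)
REQUIRED_PATHS = [
    "details.strike_price",
    "details.expiration_date",
    "details.contract_type",
    "day.volume",
    "day.vwap",
    "open_interest",
]

def missing_fields(record: dict, paths: Iterable[str] = REQUIRED_PATHS) -> List[str]:
    """Return the dotted paths that are absent from one snapshot record."""
    missing = []
    for path in paths:
        node = record
        for key in path.split("."):
            if not isinstance(node, dict) or key not in node:
                missing.append(path)
                break
            node = node[key]
    return missing

# A record where the provider renamed open_interest (hypothetical)
rec = {"details": {"strike_price": 150, "expiration_date": "2026-01-16", "contract_type": "call"},
       "day": {"volume": 900, "vwap": 4.2}, "openInterest": 1200}
print(missing_fields(rec))  # ['open_interest']
```

Some fields may be legitimately absent on individual records (for example, a contract that hasn't traded yet today), so it's more robust to alert when a field is missing across most of a response than on a single record.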

What the DIY scanner does well: customization. If you have specific criteria that no commercial tool exposes (sector-only, specific DTE windows, cross-referencing with your own watchlist data), DIY gives you full control. If you're building a commercial product, you also need to own the data layer, because redistributing a provider's data usually requires a separate commercial license.

DIY vs. API: the honest comparison

Factor | DIY scanner | Options flow API (e.g., RadarPulse)
Time to first alert | Days to weeks | Minutes (copy-paste the webhook example)
Sweep detection | Requires tick data ($199/mo+) | Included at lower tiers
Scoring calibration | Manual (tune from scratch) | Calibrated against historical outcomes
Multi-leg filter | Complex (requires leg-matching logic) | Handled by provider
Congress / cross-domain overlap | Manual join from congressional (STOCK Act) disclosure data | Pre-tagged in the signal
Historical data | Requires separate archive build | Queryable via /flow/historical
Customization | Full control | Limited to exposed filter parameters
Operational burden | High (you maintain it) | Low (provider maintains it)
Cost (real-time) | ~$205–$210/mo + hours | Included in Elite subscription

The typical developer journey: start with a DIY scanner to understand the data, learn what the quality issues are, then switch to an API once they realize the edge is in signal quality rather than raw data access. The custom scoring model that felt important in week 1 usually converges to something very similar to the commercial scoring after a month of calibration.

Frequently asked questions

Can I use Interactive Brokers as a data source?

Yes. IBKR's Trader Workstation API (TWS API) provides real-time options data, including individual trade ticks, if you subscribe to the relevant options market data packages. IB provides official client libraries in several languages, including Java and Python, and ib_insync is a widely used third-party Python wrapper. The advantage: real-time data comes with the account and data subscriptions. The disadvantage: your scanner only runs while TWS or IB Gateway is running on your machine (or on a cloud VM), which makes it less stable than a pure WebSocket stream from a data vendor.

How do I handle options data normalization across expiry formats?

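Different providers use different date formats: 2026-01-16 (ISO 8601), 20260116 (YYYYMMDD), Jan 16 2026 (human-readable). Always parse to a consistent datetime.date object early in your pipeline. The OCC standardized options symbol format (NVDA260116C00150000 = NVDA, Jan 16 2026 Call, strike 150.00) is the canonical identifier: root symbol, YYMMDD expiry, C or P, then the strike × 1000 as eight digits. The official form pads the root to six characters with spaces, and vendors vary (Polygon, for example, drops the padding and adds an O: prefix), so strip those differences before matching. Use it as your primary key if your provider supports it, as it avoids ambiguous field matching across sources.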
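A sketch of both steps: parsing the three date formats above into datetime.date, and decoding an OCC-style symbol with or without root padding and an optional O: prefix. `parse_expiry` and `parse_osi` are hypothetical helpers, not library functions:

```python
import re
from datetime import date, datetime

DATE_FORMATS = ("%Y-%m-%d", "%Y%m%d", "%b %d %Y")  # ISO, YYYYMMDD, "Jan 16 2026"

def parse_expiry(text: str) -> date:
    """Parse any of the supported expiry formats into a datetime.date."""
    for fmt in DATE_FORMATS:
        try:
            return datetime.strptime(text.strip(), fmt).date()
        except ValueError:
            continue
    raise ValueError(f"Unrecognized expiry format: {text!r}")

# Optional O: prefix, root (1-6 chars, padding optional), YYMMDD, C/P, strike x 1000 in 8 digits
OSI_RE = re.compile(r"^(?:O:)?([A-Z0-9.]{1,6})\s*(\d{6})([CP])(\d{8})$")

def parse_osi(symbol: str) -> dict:
    """Decode an OCC-style option symbol into its components."""
    m = OSI_RE.match(symbol.strip())
    if not m:
        raise ValueError(f"Not an OCC option symbol: {symbol!r}")
    root, yymmdd, cp, strike = m.groups()
    return {
        "root": root,
        "expiry": datetime.strptime(yymmdd, "%y%m%d").date(),
        "type": "CALL" if cp == "C" else "PUT",
        "strike": int(strike) / 1000,
    }

print(parse_expiry("Jan 16 2026"), parse_osi("O:NVDA260116C00150000"))
```

Because the expiry, type, and strike always occupy the last 15 characters, the regex anchors make the root unambiguous even when it contains digits.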

How do I filter out market-maker hedges from a DIY scanner?

There is no perfectly reliable filter, but several heuristics help: (1) bid-side prints are more likely hedges than ask-side prints, so filter for ask-side or mid fills; (2) deep ITM contracts with large volume but low Vol/OI are often rolls or delta hedges, not directional bets; (3) simultaneous prints on a call and a put at the same strike and expiry are typically straddles or conversions, not directional trades; (4) prints with large volume but very little extrinsic (time) value per share (deep ITM, high delta) are likely synthetic equity positions. Together these heuristics remove much of the routine hedging noise without a full multi-leg detection engine.
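The four heuristics translate into simple predicates. A rough sketch, assuming each print carries side, delta, per-share price, spot, Vol/OI, and a timestamp in whole epoch seconds. Every threshold below (0.85 delta, 5% extrinsic, Vol/OI 1.0, same-second pairing) is an illustrative guess to calibrate, and all function names are hypothetical:

```python
from typing import Dict, List, Set

def intrinsic(p: dict) -> float:
    """Per-share intrinsic value from the underlying's spot price."""
    if p["type"] == "CALL":
        return max(0.0, p["spot"] - p["strike"])
    return max(0.0, p["strike"] - p["spot"])

def likely_hedge_or_structure(p: dict, paired: bool) -> bool:
    """True if a print matches any of the four hedge/structure heuristics."""
    # (1) bid-side fills are more often hedges than directional buys
    if p["side"] == "bid":
        return True
    deep_itm = abs(p["delta"]) >= 0.85
    # (2) deep ITM with low Vol/OI: likely a roll or delta hedge
    if deep_itm and p["vol_oi"] < 1.0:
        return True
    # (3) call and put at the same strike/expiry in the same second: a structure
    if paired:
        return True
    # (4) deep ITM with little extrinsic value relative to price: stock replacement
    extrinsic = p["price"] - intrinsic(p)
    return deep_itm and p["price"] > 0 and extrinsic / p["price"] < 0.05

def paired_keys(prints: List[dict]) -> Set[tuple]:
    """Keys (ticker, strike, expiry, second) that have both a call and a put print."""
    types: Dict[tuple, set] = {}
    for p in prints:
        key = (p["ticker"], p["strike"], p["expiry"], p["timestamp"])
        types.setdefault(key, set()).add(p["type"])
    return {k for k, t in types.items() if t == {"CALL", "PUT"}}

def directional_candidates(prints: List[dict]) -> List[dict]:
    """Keep only prints that match none of the heuristics."""
    pairs = paired_keys(prints)
    return [p for p in prints
            if not likely_hedge_or_structure(
                p, (p["ticker"], p["strike"], p["expiry"], p["timestamp"]) in pairs)]

# Made-up prints: one OTM call, one deep ITM call, and a same-second call/put pair
base = {"ticker": "NVDA", "expiry": "2026-01-16", "spot": 140.0, "timestamp": 100}
sample = [
    {**base, "strike": 150, "type": "CALL", "side": "ask", "delta": 0.35, "price": 3.0, "vol_oi": 5.0},
    {**base, "strike": 100, "type": "CALL", "side": "ask", "delta": 0.95, "price": 40.5, "vol_oi": 3.0},
    {**base, "strike": 140, "type": "CALL", "side": "ask", "delta": 0.50, "price": 5.0, "vol_oi": 4.0},
    {**base, "strike": 140, "type": "PUT", "side": "ask", "delta": -0.50, "price": 5.0, "vol_oi": 4.0},
]
kept = directional_candidates(sample)
print([(p["strike"], p["type"]) for p in kept])  # [(150, 'CALL')]
```

Only the out-of-the-money call survives: the deep ITM call is almost all intrinsic value, and the 140 call and put print together.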

Can I use the RadarPulse API instead of building my own?

Yes. The RadarPulse API (currently staged for launch) exposes scored options flow signals via REST and WebSocket, with pre-computed Vol/OI ratios, sweep detection, and composite unusualness scores. Elite subscribers get API key access, and the developers hub includes integration examples for Discord bots, Zapier, and webhook consumers. If you want to build on top of scored flow data rather than raw prints, this is the faster path.

Skip the raw OPRA tape. RadarPulse's developer API provides pre-scored, sweep-detected, cross-domain-tagged flow signals. Use the raw data guide above to understand how scoring works, then use the API to ship in days instead of weeks.

API access, webhook support, and historical data are available for Elite subscribers. Join the waitlist to be first in.
