Published on

Catching Credential Stuffing Before It Catches You (a.k.a. Finding the Needle That Logs In)

Authors

Raw auth logs are where good intentions go to die. Somewhere in that firehose of failed logins is a real attacker patiently spraying passwords across your user base — and you’re supposed to spot them by squinting at a wall of timestamps. Hard pass.

credential-stuffing-detection.png

So I wrote a detection that does the squinting for me: it watches for an account getting hammered with failed logins from a rotating set of IPs, and when it sees the pattern, it opens a Jira ticket for the SOC. No more needle-in-haystack. Just a ticket that says “hey, look at this account.”

Why Credential Stuffing Is Annoying to Catch

Credential stuffing (and its noisier cousin, password spray) is sneaky precisely because each individual event looks boring. A single failed login? Happens constantly. People fat-finger passwords, password managers go stale, someone’s phone is retrying an old credential in their pocket. The signal isn’t any one event — it’s the shape of a cluster of events.

The thing that makes credential stuffing distinct:

  • Volume against one account. An attacker replaying a leaked credential list hits the same username over and over.
  • Rotating source IPs. They spread requests across a proxy pool or botnet so no single IP trips a rate limit.
  • A tight time window. This is automated, so it’s fast — bursts, not a slow trickle.

Any one of those in isolation is noise. All three together, in a five-minute window? That’s a story. The trick is writing something that only fires when the story shows up.

The Detection Logic

The rule I settled on is deliberately simple: flag any account with 20+ failed logins from rotating IPs inside a 5-minute window. The “rotating IPs” part is what keeps it from screaming every time one misconfigured service account retries a thousand times from the same box (that’s a different problem, and a less interesting one).

It’s Python running against our Okta auth logs — pulled straight from the Okta System Log API — on a schedule. Nothing exotic: no streaming pipeline, no ML, no vendor with a quadrant. Just a job that wakes up, pulls a recent window of authentication events, groups them by account, and checks each group against the threshold.

from collections import defaultdict
from datetime import timedelta

FAILED_LOGIN_THRESHOLD = 20
WINDOW = timedelta(minutes=5)
MIN_DISTINCT_IPS = 2  # "rotating" — 2+ IPs means it's not just one box

def find_credential_stuffing(events):
    """
    events: iterable of dicts with keys:
      username, source_ip, outcome, timestamp (datetime)
    Returns a list of suspicious accounts.
    """
    # Only care about failures
    failures = [e for e in events if e["outcome"] == "FAILURE"]

    by_account = defaultdict(list)
    for e in failures:
        by_account[e["username"]].append(e)

    hits = []
    for username, evs in by_account.items():
        evs.sort(key=lambda e: e["timestamp"])
        # slide a 5-minute window across this account's failures
        if _has_burst(evs):
            hits.append(_summarize(username, evs))
    return hits

The actual “is this a burst” check is a sliding window over that account’s failures. For each event, look forward until the timestamp gap exceeds five minutes, count how many failures and how many distinct IPs fell inside, and compare against the thresholds.

def _has_burst(sorted_failures):
    n = len(sorted_failures)
    left = 0
    for right in range(n):
        # shrink window from the left until it fits in WINDOW
        while sorted_failures[right]["timestamp"] - \
              sorted_failures[left]["timestamp"] > WINDOW:
            left += 1

        window = sorted_failures[left:right + 1]
        distinct_ips = {e["source_ip"] for e in window}

        if len(window) >= FAILED_LOGIN_THRESHOLD and \
           len(distinct_ips) >= MIN_DISTINCT_IPS:
            return True
    return False

Is this the most elegant windowing code ever written? No. Does it find the thing? Yes. I set MIN_DISTINCT_IPS to 2 deliberately: one box hammering a single account is just brute force — a different, more boring problem — but the moment the failures come from two or more IPs, you’re looking at rotation, which is the actual signature of stuffing. Two is low enough to catch a slow rotation and high enough to ignore the noisy single-source retries.

Turning a Hit Into a Ticket

A detection that fires into the void is just a cron job with self-esteem issues. The whole point is to put something in front of a human who can act. So every hit gets summarized into a payload and pushed to Jira as a ticket the SOC can triage.

def _summarize(username, failures):
    distinct_ips = sorted({e["source_ip"] for e in failures})
    first = min(e["timestamp"] for e in failures)
    last = max(e["timestamp"] for e in failures)
    return {
        "username": username,
        "failed_attempts": len(failures),
        "distinct_ips": distinct_ips,
        "first_seen": first.isoformat(),
        "last_seen": last.isoformat(),
    }
import requests

def open_jira_ticket(hit, jira_base, auth, project_key="SOC"):
    summary = (
        f"Possible credential stuffing against {hit['username']} "
        f"({hit['failed_attempts']} failures from {len(hit['distinct_ips'])} IPs)"
    )
    description = (
        f"Account *{hit['username']}* saw {hit['failed_attempts']} failed "
        f"logins between {hit['first_seen']} and {hit['last_seen']} "
        f"from {len(hit['distinct_ips'])} distinct source IPs.\n\n"
        f"Source IPs:\n" + "\n".join(f"- {ip}" for ip in hit["distinct_ips"])
    )
    payload = {
        "fields": {
            "project": {"key": project_key},
            "summary": summary,
            "description": description,
            "issuetype": {"name": "Task"},
        }
    }
    resp = requests.post(
        f"{jira_base}/rest/api/2/issue", json=payload, auth=auth, timeout=15
    )
    resp.raise_for_status()
    return resp.json()["key"]

Now the SOC analyst opens their queue and sees a ticket that already answers the first three questions they’d ask: which account, how bad, and where from. The IP list is right there for blocklisting or a threat-intel lookup.

Not Filing the Same Attack Every Five Minutes

Here’s the catch with a job that runs on a tight loop: an attacker who hammers an account for twenty minutes trips the detection on four consecutive runs — and nobody wants four identical tickets. So before filing, I fingerprint each hit into a stable hash and skip anything I’ve already seen.

import hashlib

def fingerprint(hit):
    # stable signature: the account plus the set of IPs coming after it
    ips = ",".join(hit["distinct_ips"])
    raw = f"{hit['username']}|{ips}"
    return hashlib.sha256(raw.encode()).hexdigest()[:16]

That fingerprint rides along on the Jira ticket as a label, and the filing step searches for the label before it creates anything. If a ticket with this signature already exists, the run skips it (or, if you’re feeling tidy, comments on the open one instead of spawning a new ticket). One account under sustained attack gets one ticket — not a pager full of duplicates.

Wiring It Up on a Schedule

There’s no webhook here, no always-on listener — it’s a batch job. It runs every five minutes: the script wakes up, pulls a recent slice of Okta auth events, runs the detection, and files whatever’s new.

def run():
    events = load_recent_auth_logs()  # last few minutes from the Okta System Log API
    hits = find_credential_stuffing(events)
    for hit in hits:
        key = open_jira_ticket(hit, JIRA_BASE, JIRA_AUTH)
        print(f"Filed {key} for {hit['username']}")

if __name__ == "__main__":
    run()

Cron fires it every five minutes. Each run pulls a slice a little longer than the five-minute window so a burst straddling two runs doesn’t get sliced in half — and because every hit is fingerprinted, that overlap re-detecting the same attack never turns into duplicate tickets. After that, it’s a detection-to-ticket pipeline that runs while you do literally anything else.

Why This Matters

Credential stuffing is one of those threats that’s both extremely common and extremely easy to miss, because the raw signal is buried under a mountain of perfectly normal failed logins. The win here isn’t a fancy algorithm — it’s turning “someone could go read the logs” into “a ticket showed up describing exactly what to look at.” That’s the difference between a control that exists on paper and one that actually fires.

Is it perfect? Not even close. A patient attacker who stays under 20 attempts, or rotates slowly enough to dodge the window, walks right past it. The thresholds that work for us won’t necessarily be yours — any threshold is a tradeoff between alert fatigue (too sensitive) and quiet misses (too loose). And it pairs best with the boring-but-essential stuff: MFA everywhere, breached-password checks, and rate limiting at the edge.

But as a tripwire that converts noise into an actionable ticket with zero human babysitting? It earns its keep. What threshold would you start with — and how long before you stopped trusting it and started tuning?