High-Speed IOC Correlation: A Python Pipeline for Analysts & Researchers
Correlate millions of logs in seconds with a schema-validated, idempotent pipeline using Pandas, DuckDB, and Pandera.
Abstract
A compact, vendor-agnostic pipeline that correlates large endpoint-log batches with IOC feeds in seconds.
It emphasises: schema validation (fail-fast), vectorised set matching (deterministic and fast), idempotent ingestion (safe re-runs), CMDB enrichment (context), a transparent triage policy (auditable), and a tiny operational footprint (DuckDB + systemd + Slack).
Objective
Engineer a reproducible IOC-correlation workflow that:
rejects bad inputs via strict contracts
matches at O(n+m) using set membership (no row loops)
enriches with CMDB for user / department context
scores hits with a clear, explainable policy
writes idempotently with a high-water mark
alerts only on signal (non-empty high-priority results)
Everything below ships as a single Python CLI.
Pipeline at a Glance
1. The Complete Pipeline Script (hunt.py)
Save as hunt.py, chmod +x hunt.py, and run.
#!/usr/bin/env python3
import os
import sys
import argparse
import json
from pathlib import Path
from ipaddress import ip_address, ip_network
import pandas as pd
import pandera as pa
from pandera import typing as pat
import requests
import duckdb
# ---------------------------
# 1. SCHEMA DEFINITIONS (PANDERA)
# ---------------------------
class IOCs(pa.SchemaModel):
    """Pandera contract for the IOC feed: one indicator per row plus its kind."""

    # Raw indicator value as a string.
    indicator: pat.Series[str]
    # Indicator kind; only these three lower-case values are accepted.
    type: pat.Series[str] = pa.Field(isin=["ip", "hash", "domain"])
class Logs(pa.SchemaModel):
    """Pandera contract for endpoint log rows.

    NOTE(review): main() reads CSVs with ``dtype=str``, so ``timestamp`` only
    satisfies this schema after qa_logs() has parsed it into datetimes.
    """

    timestamp: pat.Series[pa.DateTime]
    hostname: pat.Series[str]
    # Observables and process name may be missing on any given row.
    destination_ip: pat.Series[str] = pa.Field(nullable=True)
    file_hash: pat.Series[str] = pa.Field(nullable=True)
    process_name: pat.Series[str] = pa.Field(nullable=True)
class Assets(pa.SchemaModel):
    """Pandera contract for the CMDB asset export, keyed by ``hostname``."""

    hostname: pat.Series[str]
    # Context columns are optional per asset.
    primary_user: pat.Series[str] = pa.Field(nullable=True)
    department: pat.Series[str] = pa.Field(nullable=True)
    os_version: pat.Series[str] = pa.Field(nullable=True)
    # Kept as a string here; enrich() parses it into a date to derive patch_age.
    last_patch: pat.Series[str] = pa.Field(nullable=True)
# ---------------------------
# 2. CORE HELPERS
# ---------------------------
# Options shared by every pd.read_csv call in main(): read all columns as
# strings and treat only the listed tokens as missing values.
READ_OPTS = dict(
    dtype=str,
    keep_default_na=False,
    na_values=["", "null", "NULL", "NaN"],
)
# Process names (compared lower-cased) that add one point in triage().
SUSPICIOUS = {"powershell.exe", "wscript.exe", "cmd.exe", "curl", "rundll32.exe"}
# Departments that add one point in triage().
SENSITIVE = {"Finance", "HR", "Exec", "Legal"}
def normalize_iocs(df: pd.DataFrame):
    """Split an IOC feed into per-type lookup sets.

    Args:
        df: IOC feed with ``indicator`` and ``type`` columns.

    Returns:
        A ``(ip_set, hash_set, domain_set)`` tuple of whitespace-stripped
        indicator strings. ``type`` is compared case-insensitively.
    """
    # Drop rows with a missing indicator or type *before* casting to str;
    # otherwise NaN becomes the literal string "nan" and lands in a set.
    df = df.dropna(subset=["indicator", "type"])
    indicators = df["indicator"].astype(str).str.strip()
    kinds = df["type"].astype(str).str.lower().str.strip()

    # An empty indicator can never be a meaningful match.
    non_empty = indicators != ""
    indicators, kinds = indicators[non_empty], kinds[non_empty]

    ip_set = set(indicators[kinds == "ip"])
    hash_set = set(indicators[kinds == "hash"])
    domain_set = set(indicators[kinds == "domain"])
    return ip_set, hash_set, domain_set
def qa_logs(df: pd.DataFrame) -> pd.DataFrame:
    """Clean raw endpoint logs before matching.

    Drops rows that carry no observable (both ``destination_ip`` and
    ``file_hash`` missing) and rows whose ``timestamp`` cannot be parsed,
    converts ``timestamp`` to tz-aware UTC, and strips whitespace from the
    text columns. The input frame is not modified.

    Args:
        df: Raw log rows with at least ``timestamp``, ``destination_ip`` and
            ``file_hash`` columns.

    Returns:
        A new, cleaned DataFrame.
    """
    df = df.dropna(subset=["destination_ip", "file_hash"], how="all")
    parsed = pd.to_datetime(df["timestamp"], errors="coerce", utc=True)
    valid = parsed.notna()
    # Copy once after filtering so the column assignments below never write
    # into a view of the caller's frame.
    df = df.loc[valid].copy()
    df["timestamp"] = parsed[valid]

    for col in ("hostname", "destination_ip", "file_hash", "process_name"):
        if col in df.columns:
            present = df[col].notna()
            # Strip only real values: a blanket astype(str) would turn missing
            # cells into the string "nan", which downstream notna()/isin()
            # checks would then treat as genuine data.
            df[col] = df[col].astype(str).str.strip().where(present)
    return df
def match(logs: pd.DataFrame, ip_set: set, hash_set: set) -> pd.DataFrame:
    """Return the de-duplicated log rows whose observables appear in an IOC set.

    A row is a hit when its ``destination_ip`` is in ``ip_set`` or its
    ``file_hash`` is in ``hash_set``. Missing values never match. A column
    that is absent from ``logs`` simply contributes no hits.

    Args:
        logs: Cleaned log rows.
        ip_set: Known-bad IP indicators.
        hash_set: Known-bad file-hash indicators.

    Returns:
        The matching rows of ``logs`` with exact duplicate rows removed.
    """
    hit = pd.Series(False, index=logs.index, dtype=bool)
    if "destination_ip" in logs.columns:
        hit |= logs["destination_ip"].isin(ip_set)
    if "file_hash" in logs.columns:
        hit |= logs["file_hash"].isin(hash_set)
    return logs[hit].drop_duplicates()
def enrich(hits: pd.DataFrame, assets: pd.DataFrame) -> pd.DataFrame:
    """Attach CMDB context to hits and derive the patch age in days.

    Args:
        hits: Matched log rows; must contain ``hostname``.
        assets: CMDB export keyed by ``hostname``. Only the first row per
            hostname is used.

    Returns:
        A frame with a fixed column order. Hits whose hostname is unknown to
        the CMDB are kept (left join) with missing context, and any report
        column absent from both inputs is filled with ``pd.NA``.
    """
    assets = assets.drop_duplicates("hostname")
    merged = hits.merge(assets, on="hostname", how="left")

    if "last_patch" in merged.columns:
        # utc=True is required: "now" is tz-aware, and pandas refuses to
        # subtract tz-naive from tz-aware datetimes.
        patched = pd.to_datetime(merged["last_patch"], errors="coerce", utc=True)
        merged["patch_age"] = (pd.Timestamp.now(tz="UTC") - patched).dt.days
    else:
        merged["patch_age"] = float("nan")

    cols = ["timestamp", "hostname", "primary_user", "department",
            "process_name", "destination_ip", "file_hash",
            "os_version", "patch_age"]
    return merged.reindex(columns=cols, fill_value=pd.NA)
def apply_allowlists(df: pd.DataFrame, cidrs=None, golden_hashes=None) -> pd.DataFrame:
    """Remove rows whose destination IP or file hash is allowlisted.

    Args:
        df: Hit rows.
        cidrs: Iterable of CIDR strings; rows whose ``destination_ip`` falls
            inside any of them are dropped. Host bits are tolerated
            (``10.0.0.1/8`` is read as ``10.0.0.0/8``).
        golden_hashes: Iterable of known-good hashes; rows whose ``file_hash``
            is among them are dropped.

    Returns:
        The rows of ``df`` that are not allowlisted.

    Raises:
        ValueError: If an entry of ``cidrs`` is not a valid network.
    """
    # NOTE(review): a row is dropped when its IP is allowlisted even if it was
    # matched on its file hash; confirm that is the intended policy.
    nets = [ip_network(c, strict=False) for c in (cidrs or [])]
    good = set(golden_hashes or [])

    def ip_ok(ip):
        try:
            addr = ip_address(ip)
        except ValueError:
            # Unparseable values are never treated as allowlisted.
            return False
        return any(addr in n for n in nets)

    if nets and "destination_ip" in df.columns:
        # Evaluate each distinct IP once instead of once per row.
        candidates = df["destination_ip"].dropna().unique()
        allowed = {ip for ip in candidates if ip_ok(ip)}
        df = df[~df["destination_ip"].isin(allowed)]
    if good and "file_hash" in df.columns:
        df = df[~df["file_hash"].isin(good)]
    return df
def triage(df: pd.DataFrame):
    """Score enriched hits and split them into high and medium priority.

    Scoring (additive):
        * +2 when ``file_hash`` is present
        * +1 when ``process_name`` (lower-cased) is in ``SUSPICIOUS``
        * +1 when ``department`` is in ``SENSITIVE``
        * +1 when ``patch_age`` is greater than 30

    Args:
        df: Enriched hits. ``hostname`` and ``timestamp`` are required for
            sorting; a missing scoring column contributes zero points.

    Returns:
        ``(hi, md)``: rows with score >= 3 and rows with score == 2, each
        sorted by hostname then timestamp and carrying a ``score`` column.
        Rows scoring below 2 are in neither frame.
    """
    if df.empty:
        # Both results get the same shape as the non-empty case.
        empty = df.assign(score=pd.Series(dtype="int64"))
        return empty, empty.copy()

    df = df.copy()

    def column(name, default):
        # DataFrame.get(name, scalar) would hand back the bare scalar, which
        # has no .str/.isin/.fillna; fall back to an aligned Series instead.
        if name in df.columns:
            return df[name]
        return pd.Series(default, index=df.index, dtype=object)

    hashes = column("file_hash", None)
    # Missing values that were cast to text earlier in the pipeline show up
    # as these tokens; they must not count as a real hash.
    placeholder = hashes.astype(str).str.strip().str.lower().isin({"", "nan", "none"})
    has_hash = hashes.notna() & ~placeholder

    process = column("process_name", "").fillna("").astype(str).str.lower()
    risky_process = process.isin(SUSPICIOUS)
    sensitive_dept = column("department", None).isin(SENSITIVE)
    patch_age = pd.to_numeric(column("patch_age", 0.0), errors="coerce").fillna(0)
    stale_patch = patch_age > 30

    df["score"] = (
        has_hash.astype(int) * 2
        + risky_process.astype(int)
        + sensitive_dept.astype(int)
        + stale_patch.astype(int)
    )
    hi = df[df["score"] >= 3].sort_values(["hostname", "timestamp"])
    md = df[df["score"] == 2].sort_values(["hostname", "timestamp"])
    return hi, md
def high_water_mark(path="out/manifest.json") -> pd.Timestamp:
    """Load the stored high-water mark.

    Args:
        path: Manifest JSON file holding a ``max_ts`` entry.

    Returns:
        The stored timestamp as tz-aware UTC, or the Unix epoch (UTC) when
        the manifest does not exist yet.

    Raises:
        ValueError: If the stored ``max_ts`` is not a usable timestamp.
    """
    try:
        raw = Path(path).read_text(encoding="utf-8")
    except FileNotFoundError:
        return pd.Timestamp("1970-01-01", tz="UTC")
    ts = pd.to_datetime(json.loads(raw)["max_ts"], utc=True)
    if pd.isna(ts):
        # Comparing against NaT is always False, so a NaT mark would make
        # every future run see "no new rows" without any error.
        raise ValueError(f"manifest {path} holds an invalid max_ts")
    return ts
def update_hwm(ts: pd.Timestamp, path="out/manifest.json"):
    """Persist ``ts`` as the new high-water mark.

    Args:
        ts: Timestamp to store under the ``max_ts`` key (ISO 8601).
        path: Manifest JSON file; its parent directory is created if needed.

    Raises:
        ValueError: If ``ts`` is missing (NaT/None).
    """
    if pd.isna(ts):
        raise ValueError("refusing to store a missing high-water mark")
    target = Path(path)
    target.parent.mkdir(parents=True, exist_ok=True)
    # Write to a sibling file and rename, so an interrupted run never leaves
    # a half-written manifest behind.
    tmp = target.with_name(target.name + ".tmp")
    tmp.write_text(json.dumps({"max_ts": ts.isoformat()}), encoding="utf-8")
    tmp.replace(target)
def write_duckdb(df: pd.DataFrame, db="out/risk.duckdb", table="current_risk"):
    """Append ``df`` to a DuckDB table, creating the table on first use.

    Args:
        df: Rows to append.
        db: Path of the DuckDB database file.
        table: Target table name. It is interpolated into the SQL text, so it
            must come from trusted code, never from user input.
    """
    con = duckdb.connect(db)
    try:
        # Register the frame before any statement refers to it by name.
        con.register("df", df)
        # LIMIT 0 copies only the column layout when the table is new.
        con.execute(f"CREATE TABLE IF NOT EXISTS {table} AS SELECT * FROM df LIMIT 0")
        con.execute(f"INSERT INTO {table} SELECT * FROM df")
    finally:
        # Always release the database file, even if a statement fails.
        con.close()
def send_slack_alert(df: pd.DataFrame, url: str):
    """Post a summary of high-priority hits to a Slack incoming webhook.

    The message reports the number of rows, the number of distinct hosts and
    the three hosts with the most hits. Delivery is best-effort: a missing
    URL or a failed request only prints a warning to stderr.

    Args:
        df: High-priority rows; must contain ``hostname``.
        url: Webhook URL. Falsy values skip the alert.
    """
    if not url:
        print("WARN: SLACK_WEBHOOK_URL not set; skipping alert.", file=sys.stderr)
        return
    if df.empty:
        # Nothing to report, so stay silent rather than post a zero-row alert.
        return

    n, h = len(df), df["hostname"].nunique()
    top = df.groupby("hostname").size().sort_values(ascending=False).head(3)
    hosts = "\n".join(f"• `{k}`: {v} hits" for k, v in top.items())
    payload = {
        # Plain-text fallback shown in notifications.
        "text": f"IOC correlation: {n} high-priority rows across {h} host(s)",
        "blocks": [
            {"type": "section", "text": {"type": "mrkdwn",
             "text": f":warning: *IOC correlation:* `{n}` rows / `{h}` host(s)"}},
            {"type": "section", "text": {"type": "mrkdwn",
             "text": f"*Top hosts:*\n{hosts}"}},
        ],
    }
    try:
        requests.post(url, json=payload, timeout=10).raise_for_status()
    except requests.RequestException as e:
        print(f"WARN Slack: {e}", file=sys.stderr)
# ---------------------------
# 3. ORCHESTRATOR
# ---------------------------
def main():
    """Run the pipeline once: read, clean, match, enrich, triage, persist, alert.

    Reads the three CSV inputs named on the command line, keeps only log rows
    newer than the stored high-water mark, and writes high-priority hits to a
    CSV and the ``current_risk`` DuckDB table and medium hits to the
    ``medium_risk`` table. The high-water mark then advances to the newest
    processed log timestamp, and Slack is alerted only for high-priority hits.
    """
    ap = argparse.ArgumentParser(description="IOC correlation pipeline")
    ap.add_argument("--assets", required=True, help="Path to CMDB assets CSV")
    ap.add_argument("--logs", required=True, help="Path to endpoint logs CSV")
    ap.add_argument("--ioc", required=True, help="Path to IOC feed CSV")
    ap.add_argument("--out", default="out/high_priority_incidents.csv",
                    help="Output path for high-priority CSV")
    ap.add_argument("--db", default="out/risk.duckdb",
                    help="Output path for DuckDB file")
    ap.add_argument("--since-file", default="out/manifest.json",
                    help="Path for manifest/high-water-mark file")
    ap.add_argument("--allow-cidr", nargs="*", default=[],
                    help="CIDR allowlist (e.g., 10.0.0.0/8)")
    ap.add_argument("--allow-hash", nargs="*", default=[],
                    help="Known-good hash allowlist")
    args = ap.parse_args()

    # READ
    assets = pd.read_csv(args.assets, **READ_OPTS)
    logs = pd.read_csv(args.logs, **READ_OPTS)
    iocs = pd.read_csv(args.ioc, **READ_OPTS)
    # NOTE: schema validation (IOCs / Logs / Assets .validate) is not wired in.
    # Logs expects a parsed datetime, so it could only run after qa_logs().

    # TRANSFORM
    max_ts = high_water_mark(args.since_file)
    logs = qa_logs(logs)
    logs = logs[logs["timestamp"] > max_ts]
    if logs.empty:
        print("OK: no new rows")
        return
    # Everything up to this timestamp is examined in this run, hit or not.
    batch_max_ts = logs["timestamp"].max()

    ip_set, hash_set, _ = normalize_iocs(iocs)
    hits = match(logs, ip_set, hash_set)
    hits = apply_allowlists(hits, cidrs=args.allow_cidr,
                            golden_hashes=args.allow_hash)
    rep = enrich(hits, assets)
    hi, md = triage(rep)

    # LOAD
    Path(args.out).parent.mkdir(parents=True, exist_ok=True)
    Path(args.db).parent.mkdir(parents=True, exist_ok=True)
    Path(args.since_file).parent.mkdir(parents=True, exist_ok=True)

    # High and medium rows are persisted independently. Writing medium rows
    # only when there are no high rows would drop them for good once the
    # high-water mark moves past their timestamps.
    if not hi.empty:
        hi.to_csv(args.out, index=False)
        write_duckdb(hi, db=args.db, table="current_risk")
    if not md.empty:
        write_duckdb(md, db=args.db, table="medium_risk")

    # Advance the mark only after all writes succeeded, and to the newest
    # processed row rather than the newest hit, so no row is scanned twice.
    update_hwm(batch_max_ts, args.since_file)

    if not hi.empty:
        send_slack_alert(hi, os.environ.get("SLACK_WEBHOOK_URL"))
        print(f"OK: {len(hi)} high-priority rows -> {args.out} & {args.db}")
    if not md.empty:
        print(f"OK: {len(md)} medium rows -> {args.db} (no alert)")
    if hi.empty and md.empty:
        print("OK: no new hits")
if __name__ == "__main__":
    main()

2) Deconstruction (Why Each Part Exists)
Inputs & Contracts: Fail-fast contracts prevent “garbage in” from contaminating results. Optional Pandera schemas make assumptions executable.
CSV Hygiene: `dtype=str` and custom `na_values` avoid pandas’ guesswork (e.g., hashes read as floats).
Deterministic Matching: IOCs $\rightarrow$ Python sets; vectorized `isin` $\rightarrow$ $O(n+m)$ membership checks, no row loops.
Enrichment Join: Left-join with CMDB keeps hits even if inventory is stale; compute `patch_age` for risk.
Noise Reduction: Pre-triage allowlists (CIDR & golden hashes) strip known-good traffic.
Transparent Triage: Rule-based, auditable scoring: hash hit > IP hit; suspicious process; sensitive department; patch age.
Idempotent Ingestion: High-water mark in `out/manifest.json` ensures safe re-runs; DuckDB is a fast, file-based sink.
Native Alerting: Slack webhook (env var) with concise, host-grouped summary; no `curl`.
3) Run & Schedule
Manual run
export SLACK_WEBHOOK_URL="https://hooks.slack.com/services/XXXX/YYYY/ZZZZ"
python3 hunt.py \
--assets ./data/cmdb_export.csv \
--logs ./data/endpoint_logs.csv \
--ioc ./data/ioc_feed.csv \
--out ./out/high_priority_incidents.csv \
--db ./out/risk.duckdb \
--since-file ./out/manifest.json \
--allow-cidr 10.0.0.0/8 192.168.0.0/16

systemd service (/etc/systemd/system/ioc-correlation.service)
[Unit]
Description=IOC Correlation Pipeline
[Service]
Type=oneshot
Environment=PYTHONUNBUFFERED=1
Environment="SLACK_WEBHOOK_URL=https://hooks.slack.com/services/XXXX/YYYY/ZZZZ"
WorkingDirectory=/opt/ioc
ExecStart=/usr/bin/python3 /opt/ioc/hunt.py \
--assets /opt/ioc/data/cmdb_export.csv \
--logs /opt/ioc/data/endpoint_logs.csv \
--ioc /opt/ioc/data/ioc_feed.csv \
--out /opt/ioc/out/high_priority_incidents.csv \
--db /opt/ioc/out/risk.duckdb \
--since-file /opt/ioc/out/manifest.json \
--allow-cidr 10.0.0.0/8 192.168.0.0/16
systemd timer (/etc/systemd/system/ioc-correlation.timer)
[Unit]
Description=Run IOC Correlation every 15 minutes
[Timer]
OnCalendar=*:0/15
Persistent=true
[Install]
WantedBy=timers.target

4) Performance, Testing, and Extensions
Benchmarks (laptop baseline)
Tips – install pyarrow for faster CSV IO; ≥50 M rows → switch to Polars or chunked reads; ≥1 M hash IOCs → front with a Bloom filter (~0.1 % FP).
Sanity tests
import pandas as pd, numpy as np
from hunt import qa_logs, triage
def test_drop_rows_without_observables():
    """qa_logs keeps only rows that carry at least one observable."""
    df = pd.DataFrame([
        # No destination IP and no file hash: must be dropped.
        {"timestamp": "2024-01-01T00:00:00Z", "hostname": "h",
         "destination_ip": np.nan, "file_hash": np.nan},
        # One observable present: must be kept.
        {"timestamp": "2024-01-01T00:00:01Z", "hostname": "h",
         "destination_ip": "1.2.3.4", "file_hash": np.nan},
    ])
    out = qa_logs(df)
    assert len(out) == 1 and out.iloc[0]["destination_ip"] == "1.2.3.4"
def test_triage_policy():
    """A row that meets every scoring criterion is classified as high priority."""
    rep = pd.DataFrame([{
        "timestamp": pd.Timestamp.now(tz="UTC"), "hostname": "h1",
        "primary_user": "u", "department": "Finance",
        "process_name": "powershell.exe", "destination_ip": "2.2.2.2",
        "file_hash": "a1b2", "os_version": "win", "patch_age": 45.0,
    }])
    hi, md = triage(rep)
    assert len(hi) == 1 and hi.iloc[0]["score"] >= 3

Security & ops hygiene
`chmod 600` on CSV/DB/manifest
Retain outputs 30–90 d; rotate
Version triage thresholds; keep changelog
Redact or hash `primary_user` in external alerts if policy requires
Extensions
Domains – add `destination_domain` + punycode normalisation; match against `domain_set`
ATT&CK – include `technique_id` in IOC feed; export Navigator layer JSON
Streaming – wrap core in a Kafka consumer; emit enriched hits downstream
SOAR – open tickets automatically for high-priority rows with CMDB context
#ThreatHunting #DetectionEngineering #CyberSecurity #InfoSec #Python #Pandas #DuckDB #DataPipeline #SecurityAutomation #IOC #LogAnalysis #ETL #ThreatIntel #EndpointSecurity #DataValidation #BlueTeam #SecurityAnalytics #Code #Scripting #DataQuality #Deterministic #Reproducibility #CyberResearch #SecurityResearch #ThreatResearch #DigitalForensics #SecurityTools #OpenSourceSecurity

