Scale detection throughput

Isaac 0.1 supports asynchronous and batched workflows so you can process inspection backlogs without spinning up additional infrastructure.

Async pipeline

The core detect call is synchronous, so we offload it to a worker thread with asyncio.to_thread and gather the resulting tasks; this keeps multiple requests in flight, overlapping CPU-bound preprocessing with Isaac's GPU inference.
import asyncio
from glob import glob
from perceptron import detect

async def detect_async(image_path, *, classes):
    # Run the blocking detect call in a worker thread so the event loop stays free.
    return await asyncio.to_thread(
        detect,
        image=image_path,
        classes=classes,
        expects="box",
    )

async def process_batch(image_paths, classes):
    # Schedule one detection per image and collect the results in order.
    tasks = [detect_async(path, classes=classes) for path in image_paths]
    return await asyncio.gather(*tasks)

results = asyncio.run(process_batch(glob("parts/*.jpg"), ["scratch"]))
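
Note that asyncio.gather starts every task at once, so a large glob can flood the API and the default thread pool. If you need to cap in-flight requests, a semaphore is the simplest guard. A minimal sketch, reusing detect_async from above; the limit of 8 is an arbitrary placeholder to tune against your quota:
import asyncio

async def process_batch_bounded(image_paths, classes, limit=8):
    semaphore = asyncio.Semaphore(limit)

    async def bounded_detect(path):
        # Each task waits for a free slot before issuing its request.
        async with semaphore:
            return await detect_async(path, classes=classes)

    return await asyncio.gather(*(bounded_detect(p) for p in image_paths))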

Queue with backoff

When the control plane returns a RateLimitError, retry with exponential backoff, honoring the server-supplied retry_after value when it is present, so long-running batches keep moving without hammering the API.
import time
from perceptron.errors import RateLimitError
from perceptron import detect

def detect_with_backoff(image, classes, attempts=3):
    for attempt in range(attempts):
        try:
            return detect(image=image, classes=classes, expects="box")
        except RateLimitError as err:
            if attempt == attempts - 1:
                raise  # Retries exhausted; let the caller handle it.
            # Honor the server-supplied delay; otherwise back off 1s, 2s, 4s, ...
            wait = float(err.retry_after or (2 ** attempt))
            time.sleep(wait)
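
The two patterns compose cleanly: wrap detect_with_backoff in the same asyncio.to_thread offload so every concurrent task also retries on rate limits. A minimal sketch, assuming the definitions above:
import asyncio

async def detect_async_with_backoff(image_path, *, classes):
    # Offload the blocking, retrying call to a worker thread.
    return await asyncio.to_thread(detect_with_backoff, image_path, classes)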

Stream results to S3

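Persist each payload as soon as it lands rather than holding the whole batch in memory: the helpers below flatten every bounding box into plain JSON and write one object per result to S3 under a collision-proof key.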
import boto3
import json
import uuid

s3 = boto3.client("s3")

def box_to_dict(box):
    # Flatten a bounding box into plain JSON-serializable values.
    return {
        "mention": box.mention,
        "top_left": {"x": box.top_left.x, "y": box.top_left.y},
        "bottom_right": {"x": box.bottom_right.x, "y": box.bottom_right.y},
    }

def store_result(result, bucket, key_prefix, *, width=None, height=None):
    points = result.points
    # Convert normalized coordinates to pixel space when dimensions are known.
    if width and height:
        points = result.points_to_pixels(width, height)

    payload = {
        "text": result.text,
        "detections": [box_to_dict(box) for box in points or []],
    }
    # A UUID key avoids collisions when many workers write concurrently.
    key = f"{key_prefix}/{uuid.uuid4()}.json"
    s3.put_object(Bucket=bucket, Key=key, Body=json.dumps(payload))
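
Wiring the pieces together might look like this, assuming the detect_with_backoff helper from above; the bucket name and key prefix are placeholders for your own:
from glob import glob

for frame in glob("parts/*.jpg"):
    result = detect_with_backoff(frame, ["scratch"])
    store_result(result, "inspection-results", "detections")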

Monitor throughput

Track a rolling distribution of per-frame latency so you can spot regressions or decide when to scale workers; median plus p99 usually tells you whether storage or inference is the bottleneck.
import statistics as stats
import time
from glob import glob
from perceptron import detect

def detect_with_timing(image_path, classes):
    start = time.perf_counter()
    result = detect(image=image_path, classes=classes, expects="box")
    duration_ms = (time.perf_counter() - start) * 1000
    return result, duration_ms

durations = []
for frame in glob("parts/*.jpg"):
    _, elapsed = detect_with_timing(frame, ["scratch"])
    durations.append(elapsed)

# quantiles(n=100) yields 99 cut points; index 98 is the 99th percentile.
print(f"Median inference (ms): {stats.median(durations):.1f}")
print(f"p99 inference (ms): {stats.quantiles(durations, n=100)[98]:.1f}")

Batched processing like this lets you handle tens of thousands of images per hour on a single GPU-backed worker, with no retraining and no extra orchestration.