Scale detection throughput
Isaac 0.1 supports asynchronous and batched workflows so you can process inspection backlogs without spinning up additional infrastructure.
Async pipeline
The core detect call is synchronous, so we offload it to a worker thread with asyncio.to_thread and gather the results; keeping several requests in flight this way overlaps client-side image loading and preprocessing with Isaac's GPU inference.
import asyncio
from glob import glob

from perceptron import detect


async def detect_async(image_path, *, classes):
    # detect() blocks, so run it in a worker thread to keep the event loop free.
    return await asyncio.to_thread(
        detect,
        image=image_path,
        classes=classes,
        expects="box",
    )


async def process_batch(image_paths, classes):
    # One task per image; gather() waits for all of them and preserves order.
    tasks = [detect_async(path, classes=classes) for path in image_paths]
    return await asyncio.gather(*tasks)


results = asyncio.run(process_batch(glob("parts/*.jpg"), ["scratch"]))
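If the backlog is large you may not want every request in flight at once. A minimal sketch of bounding concurrency with an asyncio.Semaphore; the limit of 8 is an arbitrary placeholder to tune against your rate limits and worker capacity.

import asyncio
from glob import glob

from perceptron import detect


async def detect_bounded(image_path, classes, semaphore):
    # The semaphore caps how many detect() calls run concurrently.
    async with semaphore:
        return await asyncio.to_thread(
            detect, image=image_path, classes=classes, expects="box"
        )


async def process_backlog(image_paths, classes, max_in_flight=8):
    semaphore = asyncio.Semaphore(max_in_flight)
    tasks = [detect_bounded(path, classes, semaphore) for path in image_paths]
    return await asyncio.gather(*tasks)


results = asyncio.run(process_backlog(glob("parts/*.jpg"), ["scratch"]))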
Queue with backoff
When the control plane returns RateLimitError, retry with exponential backoff, or sleep for the exact retry_after value the error carries, so long-running batches keep moving without hammering the API.
import time

from perceptron import detect
from perceptron.errors import RateLimitError


def detect_with_backoff(image, classes, attempts=3):
    for attempt in range(attempts):
        try:
            return detect(image=image, classes=classes, expects="box")
        except RateLimitError as err:
            if attempt == attempts - 1:
                raise  # out of retries, surface the error to the caller
            # Prefer the server-suggested delay, otherwise back off exponentially.
            wait = float(err.retry_after or (2 ** attempt))
            time.sleep(wait)
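The queue itself can be as simple as a thread-safe queue.Queue feeding a few worker threads that each call detect_with_backoff. A minimal sketch under that assumption; the four workers and the shared results list are illustrative choices, not part of the SDK.

import queue
import threading
from glob import glob

work = queue.Queue()
results = []


def worker():
    while True:
        path = work.get()
        if path is None:  # sentinel tells the worker to exit
            return
        results.append(detect_with_backoff(path, ["scratch"]))


threads = [threading.Thread(target=worker) for _ in range(4)]
for t in threads:
    t.start()

for path in glob("parts/*.jpg"):
    work.put(path)
for _ in threads:
    work.put(None)  # one sentinel per worker
for t in threads:
    t.join()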
Stream results to S3
Serialize each result to JSON and write it to object storage as soon as it comes back, so downstream consumers never have to wait for the whole batch.
import json
import uuid

import boto3

s3 = boto3.client("s3")


def box_to_dict(box):
    # Flatten a bounding box into plain JSON-serializable values.
    return {
        "mention": box.mention,
        "top_left": {"x": box.top_left.x, "y": box.top_left.y},
        "bottom_right": {"x": box.bottom_right.x, "y": box.bottom_right.y},
    }


def store_result(result, bucket, key_prefix, *, width=None, height=None):
    points = result.points
    if width and height:
        # Convert normalized coordinates to pixel values when the frame size is known.
        points = result.points_to_pixels(width, height)
    payload = {
        "text": result.text,
        "detections": [box_to_dict(box) for box in points or []],
    }
    key = f"{key_prefix}/{uuid.uuid4()}.json"
    s3.put_object(Bucket=bucket, Key=key, Body=json.dumps(payload))
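Wiring it to a detection call looks like the following; the bucket name, key prefix, and 1920x1080 frame size are placeholders for your own environment.

from perceptron import detect

result = detect(image="parts/0001.jpg", classes=["scratch"], expects="box")
store_result(
    result,
    bucket="inspection-results",  # placeholder bucket
    key_prefix="line-a",          # placeholder prefix
    width=1920,
    height=1080,
)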
Monitor throughput
Track a rolling distribution of per-frame latency so you can spot regressions and decide when to scale workers; the median and p99 together show whether slowdowns are systemic or long-tail, and timing inference and storage separately tells you which one is the bottleneck.
import statistics as stats
import time
from glob import glob

from perceptron import detect


def detect_with_timing(image_path, classes):
    start = time.perf_counter()
    result = detect(image=image_path, classes=classes, expects="box")
    duration_ms = (time.perf_counter() - start) * 1000
    return result, duration_ms


durations = []
for frame in glob("parts/*.jpg"):
    _, elapsed = detect_with_timing(frame, ["scratch"])
    durations.append(elapsed)

# quantiles(n=100) returns 99 cut points; index 98 is the 99th percentile.
print(f"Median inference (ms): {stats.median(durations):.1f}")
print(f"p99 inference (ms): {stats.quantiles(durations, n=100)[98]:.1f}")
Batch mode lets you process tens of thousands of images per hour on a single GPU-backed worker—no retraining, no extra orchestration.
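Putting the pieces together end to end: run the async pipeline over a backlog, stream every payload to S3, and report effective throughput. A minimal sketch that reuses process_batch and store_result from the sections above; the bucket and prefix are placeholders.

import asyncio
import time
from glob import glob

frames = glob("parts/*.jpg")

start = time.perf_counter()
results = asyncio.run(process_batch(frames, ["scratch"]))
for result in results:
    store_result(result, bucket="inspection-results", key_prefix="batch")  # placeholders

elapsed_hours = (time.perf_counter() - start) / 3600
print(f"Processed {len(frames)} frames at {len(frames) / elapsed_hours:.0f} images/hour")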