Mark as Flow (Manual Flows) for Python
Overview
Hud automatically instruments your application's entry points — HTTP frameworks, message queues, background workers, and more — with no code changes. Manual flows let you extend this to entry points that automatic instrumentation doesn't cover.
When you mark a block of code as a flow, the Hud SDK collects flow metrics and forensics for it, and Hud treats it as a first-class entry point: it appears in the UI and MCP, and gets issue detection, error rates, duration graphs, and related issues — just like an auto-detected flow.
There are two kinds of manual flows in Python:
| Flow type | Use it for | Where it appears in Hud | flow_type |
|---|---|---|---|
| Custom | Everything that isn't a queue (e.g. scripts, scheduled tasks, cron jobs, background loops) | Custom Flows page | FlowType.CUSTOM (default) |
| Queue | Custom or in-house queue consumers | Queues page | FlowType.QUEUE |
All of the manual flow APIs are exported from hud_sdk and work together with set_context and set_failure.
Requirements
| SDK | Minimum version | Supported flow types |
|---|---|---|
| Python | 0.4.10 | Custom, Queue |
Important Notes
- All manual flow APIs must be called after
register()— calling them during startup before Hud is initialized has no effect.- Flow names must be stable, non-empty strings. Avoid high-cardinality names (e.g. embedding user IDs or timestamps) — put variable data in
set_contextinstead. The number of distinct custom flow names is capped.- Manual flows cannot be nested: starting a flow inside an already-active flow (or inside an HTTP request) is skipped, and the parent flow is used.
- We recommend using the decorator or context manager APIs rather than the manual
start_flow/end_flowpair — they automatically end the flow and record exceptions for you.
Mark Custom flows
Use custom flows for code that isn't a queue — scripts, main functions, scheduled tasks, cron jobs, background loops, and so on.
Custom flows appear on the Custom Flows page, which includes a table of all custom flows plus a dedicated page per flow with detailed graphs, functions, forensics, and related issues.
There are three ways to define a custom flow. The decorator and context manager are recommended.
Option A: Decorator (recommended)
Decorate a function with hud_sdk.sync_flow(name) (or hud_sdk.async_flow(name) for async def functions). Every call to the function is tracked as a flow. If the function raises, the exception's class name is recorded as the flow's error and the failure triggers an investigation.
import hud_sdk
@hud_sdk.sync_flow("nightly-report")
def generate_nightly_report(date):
data = collect_data(date)
return build_report(data)
# Each call is tracked as the "nightly-report" flow:
generate_nightly_report("2026-06-17")import hud_sdk
@hud_sdk.async_flow("nightly-report")
async def generate_nightly_report(date):
data = await collect_data(date)
return build_report(data)Option B: Context manager (recommended)
Wrap a block of code with with hud_sdk.sync_flow(name): (or async with hud_sdk.async_flow(name): in async code). The flow spans the body of the with block, and an uncaught exception is recorded as the flow's error.
import hud_sdk
with hud_sdk.sync_flow("cleanup-expired-sessions"):
delete_expired_sessions()import hud_sdk
async with hud_sdk.async_flow("cleanup-expired-sessions"):
await delete_expired_sessions()Option C: start_flow / end_flow
start_flow / end_flowWhen you can't wrap a block or a function, bracket the code manually with hud_sdk.start_flow(name) and hud_sdk.end_flow(). Pass an optional failure reason to end_flow to mark the flow as failed.
Unlike the decorator and context manager, the manual pair does not automatically record exceptions — use set_failure or pass a reason to end_flow yourself.
import hud_sdk
def run_cron_job():
hud_sdk.start_flow("cleanup-expired-sessions")
try:
delete_expired_sessions()
except Exception:
hud_sdk.set_failure("CleanupFailed")
raise
finally:
hud_sdk.end_flow()Always pair every
start_flowwith anend_flow(including on the error path). Anend_flowwithout a precedingstart_flowis skipped.
Mark Queue flows
To instrument a custom or in-house queue consumer, pass flow_type=hud_sdk.FlowType.QUEUE. The flow appears on the Queues page as a first-class queue (not as a custom flow), and no SQS / Kafka dependency is required.
Set framework to a short string identifying your queue (e.g. "sqs", "rabbitmq"). It defaults to "custom".
For queue flows you can also pass two extra arguments:
enqueued_at— a timezone-awaredatetimeof when the message was enqueued. Hud uses it to record the end-to-end duration (how long the message waited in the queue before processing). A naive datetime is ignored.message_count— the number of messages processed in this invocation (int).
enqueued_at and message_count are supported by the manual start_flow and the context manager, but not by the decorator (a single decorated function has no per-call queue metadata), where they are ignored.
from datetime import datetime, timezone
import hud_sdk
with hud_sdk.sync_flow(
"orders",
flow_type=hud_sdk.FlowType.QUEUE,
framework="sqs",
enqueued_at=message.enqueued_at, # a timezone-aware datetime
message_count=1,
):
process_order(message.payload)from datetime import datetime, timezone
import hud_sdk
hud_sdk.start_flow(
"orders",
flow_type=hud_sdk.FlowType.QUEUE,
framework="sqs",
enqueued_at=message.enqueued_at, # a timezone-aware datetime
message_count=1,
)
try:
process_order(message.payload)
finally:
hud_sdk.end_flow()import hud_sdk
# enqueued_at / message_count are not available on the decorator.
@hud_sdk.async_flow("orders", flow_type=hud_sdk.FlowType.QUEUE, framework="sqs")
async def handle_order(message):
await process_order(message.payload)Combining with context and failures
Manual flows work with the same helpers as auto-detected flows:
set_context— attach metadata (e.g.order_id,tenant) to the current flow's forensics.set_failure— explicitly mark the current flow as failed even when no exception was raised.
import hud_sdk
@hud_sdk.sync_flow("process-batch")
def process_batch(batch_id):
hud_sdk.set_context(batch_id=batch_id)
result = run_batch(batch_id)
if result.rejected:
hud_sdk.set_failure("BatchRejected", reason=result.reason)
return resultAPI reference
# Custom and queue flows — decorator or context manager (recommended)
hud_sdk.sync_flow(
name: str,
*,
flow_type: hud_sdk.FlowType = hud_sdk.FlowType.CUSTOM,
framework: str = "custom",
enqueued_at: datetime | None = None, # queue flows only; ignored on the decorator
message_count: int | None = None, # queue flows only; ignored on the decorator
)
hud_sdk.async_flow(...) # same signature as sync_flow, for async code
# Custom and queue flows — manual pair
hud_sdk.start_flow(
name: str,
*,
flow_type: hud_sdk.FlowType = hud_sdk.FlowType.CUSTOM,
framework: str = "custom",
enqueued_at: datetime | None = None, # queue flows only
message_count: int | None = None, # queue flows only
) -> None
hud_sdk.end_flow(failure_reason: str | None = None) -> None
# Flow types
hud_sdk.FlowType.CUSTOM
hud_sdk.FlowType.QUEUE
