Mark as Flow (Manual Flows) for Python

Overview

Hud automatically instruments your application's entry points — HTTP frameworks, message queues, background workers, and more — with no code changes. Manual flows let you extend this to entry points that automatic instrumentation doesn't cover.

When you mark a block of code as a flow, the Hud SDK collects flow metrics and forensics for it, and Hud treats it as a first-class entry point: it appears in the UI and MCP, and gets issue detection, error rates, duration graphs, and related issues — just like an auto-detected flow.

There are two kinds of manual flows in Python:

| Flow type | Use it for | Where it appears in Hud | flow_type |
| --- | --- | --- | --- |
| Custom | Everything that isn't a queue (e.g. scripts, scheduled tasks, cron jobs, background loops) | Custom Flows page | FlowType.CUSTOM (default) |
| Queue | Custom or in-house queue consumers | Queues page | FlowType.QUEUE |

All of the manual flow APIs are exported from hud_sdk and work together with set_context and set_failure.



Requirements

| SDK | Minimum version | Supported flow types |
| --- | --- | --- |
| Python | 0.4.10 | Custom, Queue |
Important Notes

  • All manual flow APIs must be called after register() — calling them during startup before Hud is initialized has no effect.
  • Flow names must be stable, non-empty strings. Avoid high-cardinality names (e.g. embedding user IDs or timestamps) — put variable data in set_context instead. The number of distinct custom flow names is capped.
  • Manual flows cannot be nested: an attempt to start a flow inside an already-active flow (or inside an HTTP request) is skipped, and the enclosing flow remains the active one.
  • We recommend using the decorator or context manager APIs rather than the manual start_flow / end_flow pair — they automatically end the flow and record exceptions for you.
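To illustrate the naming guidance above, here is a minimal sketch that keeps the flow name fixed and moves per-run values into a separate dictionary. The helper flow_identity and its arguments are hypothetical and not part of hud_sdk; in an application the returned name would go to one of the flow APIs and the dictionary to set_context.

```python
from typing import Any, Dict, Tuple


def flow_identity(job_kind: str, **variable_data: Any) -> Tuple[str, Dict[str, Any]]:
    """Return a stable flow name plus the per-run data kept out of the name.

    Hypothetical helper for illustration; not part of hud_sdk.
    """
    name = job_kind.strip()
    if not name:
        raise ValueError("flow name must be a non-empty string")
    # Variable values (user IDs, timestamps, ...) stay in the context dict,
    # so the set of distinct flow names stays small.
    return name, dict(variable_data)


# High-cardinality name to avoid: f"export-{user_id}-{timestamp}"
name, context = flow_identity(
    "export-user-data", user_id=4821, started="2026-06-17T02:00:00Z"
)
print(name)     # export-user-data
print(context)  # {'user_id': 4821, 'started': '2026-06-17T02:00:00Z'}
```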

Mark Custom flows

Use custom flows for code that isn't a queue — scripts, main functions, scheduled tasks, cron jobs, background loops, and so on.

Custom flows appear on the Custom Flows page, which includes a table of all custom flows plus a dedicated page per flow with detailed graphs, functions, forensics, and related issues.

There are three ways to define a custom flow. The decorator and context manager are recommended.

Option A: Decorator (recommended)

Decorate a function with hud_sdk.sync_flow(name) (or hud_sdk.async_flow(name) for async def functions). Every call to the function is tracked as a flow. If the function raises, the exception's class name is recorded as the flow's error and the failure triggers an investigation.

import hud_sdk

@hud_sdk.sync_flow("nightly-report")
def generate_nightly_report(date):
    data = collect_data(date)
    return build_report(data)

# Each call is tracked as the "nightly-report" flow:
generate_nightly_report("2026-06-17")

For async def functions:

import hud_sdk

@hud_sdk.async_flow("nightly-report")
async def generate_nightly_report(date):
    data = await collect_data(date)
    return build_report(data)
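Because the recorded error is the exception's class name, descriptive exception classes tend to give more useful flow errors than a bare Exception. The sketch below only shows what that label would look like, assuming "class name" means the unqualified type name (type(exc).__name__). ReportDataMissing and error_label are hypothetical names, and the snippet does not call the SDK.

```python
class ReportDataMissing(Exception):
    """Raised when the source data for a report is unavailable (hypothetical)."""


def error_label(exc: BaseException) -> str:
    # Assumption: "class name" means the unqualified name of the exception type.
    return type(exc).__name__


def generate_nightly_report(date: str) -> str:
    # Stand-in for a decorated flow function that fails.
    raise ReportDataMissing(f"no data for {date}")


try:
    generate_nightly_report("2026-06-17")
except Exception as exc:
    label = error_label(exc)

print(label)  # ReportDataMissing
```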

Option B: Context manager (recommended)

Wrap a block of code with with hud_sdk.sync_flow(name): (or async with hud_sdk.async_flow(name): in async code). The flow spans the body of the with block, and an uncaught exception is recorded as the flow's error.

import hud_sdk

with hud_sdk.sync_flow("cleanup-expired-sessions"):
    delete_expired_sessions()

In async code:

import hud_sdk

async with hud_sdk.async_flow("cleanup-expired-sessions"):
    await delete_expired_sessions()

Option C: start_flow / end_flow

When you can't wrap a block or a function, bracket the code manually with hud_sdk.start_flow(name) and hud_sdk.end_flow(). Pass an optional failure reason to end_flow to mark the flow as failed.

Unlike the decorator and context manager, the manual pair does not automatically record exceptions — use set_failure or pass a reason to end_flow yourself.

import hud_sdk

def run_cron_job():
    hud_sdk.start_flow("cleanup-expired-sessions")
    try:
        delete_expired_sessions()
    except Exception:
        hud_sdk.set_failure("CleanupFailed")
        raise
    finally:
        hud_sdk.end_flow()
Always pair every start_flow with an end_flow (including on the error path). An end_flow without a preceding start_flow is skipped.
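The pairing rule can be captured once in a small wrapper. The following is a sketch under stated assumptions: run_bracketed is a hypothetical name, and the start, end, and set_failure callables are injected so the snippet runs without the SDK installed. In an application they would be hud_sdk.start_flow, hud_sdk.end_flow, and hud_sdk.set_failure.

```python
from typing import Callable, List, TypeVar

T = TypeVar("T")


def run_bracketed(
    name: str,
    work: Callable[[], T],
    *,
    start: Callable[[str], None],
    end: Callable[[], None],
    set_failure: Callable[[str], None],
) -> T:
    """Run work() between start(name) and end(), ending the flow on every path."""
    start(name)
    try:
        return work()
    except Exception as exc:
        # The manual pair does not record exceptions itself, so do it here.
        set_failure(type(exc).__name__)
        raise
    finally:
        end()


# Demonstration with recording stand-ins instead of the real SDK functions.
calls: List[str] = []


def failing_job() -> None:
    raise TimeoutError("database unreachable")


try:
    run_bracketed(
        "cleanup-expired-sessions",
        failing_job,
        start=lambda name: calls.append(f"start:{name}"),
        end=lambda: calls.append("end"),
        set_failure=lambda reason: calls.append(f"failure:{reason}"),
    )
except TimeoutError:
    pass

print(calls)
# ['start:cleanup-expired-sessions', 'failure:TimeoutError', 'end']
```

Where a block or function can be wrapped directly, the decorator or context manager remains the simpler choice.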


Mark Queue flows

To instrument a custom or in-house queue consumer, pass flow_type=hud_sdk.FlowType.QUEUE. The flow appears on the Queues page as a first-class queue (not as a custom flow), and no SQS / Kafka dependency is required.

Set framework to a short string identifying your queue (e.g. "sqs", "rabbitmq"). It defaults to "custom".

For queue flows you can also pass two extra arguments:

  • enqueued_at — a timezone-aware datetime of when the message was enqueued. Hud uses it to record the end-to-end duration (how long the message waited in the queue before processing). A naive datetime is ignored.
  • message_count — the number of messages processed in this invocation (int).
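Since a naive datetime is ignored, it can help to check awareness before passing enqueued_at. The sketch below uses only the standard library. It assumes the queue reports the enqueue time as epoch milliseconds (SQS does this in its SentTimestamp attribute, for example); is_aware and enqueued_at_from_epoch_ms are hypothetical helper names.

```python
from datetime import datetime, timezone


def is_aware(dt: datetime) -> bool:
    """True when dt carries a usable UTC offset (the standard-library definition)."""
    return dt.tzinfo is not None and dt.tzinfo.utcoffset(dt) is not None


def enqueued_at_from_epoch_ms(epoch_ms: int) -> datetime:
    """Convert a millisecond epoch timestamp into an aware UTC datetime."""
    return datetime.fromtimestamp(epoch_ms / 1000, tz=timezone.utc)


aware = enqueued_at_from_epoch_ms(1_781_661_600_000)
naive = datetime(2026, 6, 17, 2, 0, 0)  # no tzinfo: would be ignored as enqueued_at

print(aware.isoformat())  # 2026-06-17T02:00:00+00:00
print(is_aware(aware))    # True
print(is_aware(naive))    # False
# Attaching a timezone makes the value aware (only correct if it really is UTC).
print(is_aware(naive.replace(tzinfo=timezone.utc)))  # True
```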

enqueued_at and message_count are accepted by start_flow and by the context manager form. The decorator form ignores them, because a decorated function has no per-call queue metadata to supply.

from datetime import datetime, timezone
import hud_sdk

with hud_sdk.sync_flow(
    "orders",
    flow_type=hud_sdk.FlowType.QUEUE,
    framework="sqs",
    enqueued_at=message.enqueued_at,  # a timezone-aware datetime
    message_count=1,
):
    process_order(message.payload)

With start_flow / end_flow:

from datetime import datetime, timezone
import hud_sdk

hud_sdk.start_flow(
    "orders",
    flow_type=hud_sdk.FlowType.QUEUE,
    framework="sqs",
    enqueued_at=message.enqueued_at,  # a timezone-aware datetime
    message_count=1,
)
try:
    process_order(message.payload)
finally:
    hud_sdk.end_flow()

With the decorator:

import hud_sdk

# enqueued_at / message_count are not available on the decorator.
@hud_sdk.async_flow("orders", flow_type=hud_sdk.FlowType.QUEUE, framework="sqs")
async def handle_order(message):
    await process_order(message.payload)
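When a consumer handles several messages per invocation, the two queue arguments have to be derived from the batch. The sketch below shows one way to do that. Using the earliest enqueue time for the whole batch is an assumption rather than documented Hud behavior, and Message and queue_flow_kwargs are hypothetical names.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, List


@dataclass
class Message:
    """Hypothetical message shape with an aware enqueue timestamp."""
    payload: str
    enqueued_at: datetime


def queue_flow_kwargs(batch: List[Message]) -> Dict[str, Any]:
    """Derive enqueued_at and message_count for one batch invocation."""
    if not batch:
        raise ValueError("batch must contain at least one message")
    return {
        # Assumption: the oldest message represents the batch's queue wait.
        "enqueued_at": min(m.enqueued_at for m in batch),
        "message_count": len(batch),
    }


now = datetime(2026, 6, 17, 2, 0, tzinfo=timezone.utc)
batch = [
    Message("order-1", now - timedelta(seconds=30)),
    Message("order-2", now - timedelta(seconds=5)),
]
kwargs = queue_flow_kwargs(batch)
print(kwargs["message_count"])            # 2
print(kwargs["enqueued_at"].isoformat())  # 2026-06-17T01:59:30+00:00

# In an application these would be passed to the context manager, e.g.:
# with hud_sdk.sync_flow("orders", flow_type=hud_sdk.FlowType.QUEUE,
#                        framework="sqs", **kwargs):
#     ...
```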

Combining with context and failures

Manual flows work with the same helpers as auto-detected flows:

  • set_context — attach metadata (e.g. order_id, tenant) to the current flow's forensics.
  • set_failure — explicitly mark the current flow as failed even when no exception was raised.

For example:

import hud_sdk

@hud_sdk.sync_flow("process-batch")
def process_batch(batch_id):
    hud_sdk.set_context(batch_id=batch_id)

    result = run_batch(batch_id)
    if result.rejected:
        hud_sdk.set_failure("BatchRejected", reason=result.reason)
    return result

API reference

# Custom and queue flows — decorator or context manager (recommended)
hud_sdk.sync_flow(
    name: str,
    *,
    flow_type: hud_sdk.FlowType = hud_sdk.FlowType.CUSTOM,
    framework: str = "custom",
    enqueued_at: datetime | None = None,   # queue flows only; ignored on the decorator
    message_count: int | None = None,      # queue flows only; ignored on the decorator
)

hud_sdk.async_flow(...)  # same signature as sync_flow, for async code

# Custom and queue flows — manual pair
hud_sdk.start_flow(
    name: str,
    *,
    flow_type: hud_sdk.FlowType = hud_sdk.FlowType.CUSTOM,
    framework: str = "custom",
    enqueued_at: datetime | None = None,   # queue flows only
    message_count: int | None = None,      # queue flows only
) -> None
hud_sdk.end_flow(failure_reason: str | None = None) -> None

# Flow types
hud_sdk.FlowType.CUSTOM
hud_sdk.FlowType.QUEUE