A Task is a concrete, runnable data point: an environment plus a task id, arguments, slug, and metadata. Calling an @env.template() function returns a Task. A Taskset is a named, ordered collection of tasks.
from hud import Environment, Taskset
from hud.eval import Task

Authoring Tasks

@env.template() registers an async-generator task on an Environment. The returned callable is the authoring handle; call it with arguments to create a public Task.
env = Environment("letter-count")

@env.template()
async def count_letter(word: str = "strawberry", letter: str = "r"):
    answer = yield f"How many '{letter}'s are in '{word}'?"
    yield 1.0 if answer == str(word.count(letter)) else 0.0

task = count_letter(word="raspberry")  # -> hud.eval.Task
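The two-yield shape of the template can be exercised without the SDK. The sketch below redefines count_letter as a plain async generator (no @env.template(), no hud import) and uses a hypothetical drive helper to show the protocol only: the first yield hands out the prompt, the answer is sent back in, and the second yield produces the reward. How the SDK actually drives the generator over the control channel is not shown here.

```python
import asyncio

# Plain async generator with the same two-yield shape as count_letter above.
# It is NOT registered on an Environment; it only illustrates the protocol.
async def count_letter(word: str = "strawberry", letter: str = "r"):
    answer = yield f"How many '{letter}'s are in '{word}'?"
    yield 1.0 if answer == str(word.count(letter)) else 0.0

async def drive(gen, answer: str):
    """Hypothetical driver: read the prompt, send the answer, read the reward."""
    prompt = await gen.__anext__()    # run to the first yield (the prompt)
    reward = await gen.asend(answer)  # resume with the answer, run to the second yield
    await gen.aclose()                # finalize the generator
    return prompt, reward

prompt, reward = asyncio.run(drive(count_letter(word="raspberry"), "3"))
print(prompt)  # How many 'r's are in 'raspberry'?
print(reward)  # 1.0
```

Note that the grader compares strings, so the answer must be sent as "3" rather than the integer 3.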

Task

Task is a Pydantic model — one portable, validated row of data:
| Field | Type | Description |
| --- | --- | --- |
| env | str | The name of the environment it belongs to. |
| id | str | The task id registered on the environment. |
| args | dict | Bound arguments. |
| slug | str \| None | Stable id for sync/filtering/registry. |
| columns | dict \| None | Metadata for filtering and leaderboards. |
| validation | list[dict] \| None | Sync/platform metadata. |
| agent_config | dict \| None | Per-task agent overrides (e.g. {"max_steps": 50}). Applied during hosted execution. |
The env on a task is a name, never a live object: it is the join key between the row and whatever placement can bring that environment up. Running a task never needs a live env in-process — the prompt and grade arrive over the wire from whatever substrate placement brought up.

Placement: where a task runs

Placement is decided at execution time with the runtime= parameter — a provider. A provider is called with the task row being placed and brings up one fresh substrate for it:
class Provider(Protocol):
    def __call__(self, task: Task, /) -> AbstractAsyncContextManager[Runtime]: ...
The contract is structural — a class holding real state (a platform session, an image cache, a warm pool) or a plain closure both qualify.
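To make the structural contract concrete, here is a minimal sketch in which both a stateful class and a plain decorated function satisfy the same Provider protocol. Task and Runtime are small stand-in dataclasses rather than the SDK types, and PooledProvider, closure_provider, place, and the URLs are hypothetical names chosen for illustration.

```python
import asyncio
from contextlib import AbstractAsyncContextManager, asynccontextmanager
from dataclasses import dataclass, field
from typing import Protocol

@dataclass
class Task:       # stand-in for the task row, not hud.eval.Task
    env: str
    id: str
    args: dict = field(default_factory=dict)

@dataclass
class Runtime:    # stand-in for a handle to a served control channel
    url: str

class Provider(Protocol):
    def __call__(self, task: Task, /) -> AbstractAsyncContextManager[Runtime]: ...

class PooledProvider:
    """Class form: keeps state (here, a counter) across placements."""
    def __init__(self) -> None:
        self.placed = 0

    @asynccontextmanager
    async def __call__(self, task: Task, /):
        self.placed += 1
        yield Runtime(url=f"tcp://pool/{task.env}/{self.placed}")

@asynccontextmanager
async def closure_provider(task: Task, /):
    """Function form: no state, one fresh Runtime per call."""
    yield Runtime(url=f"tcp://local/{task.env}")

async def place(provider: Provider, task: Task) -> str:
    # The caller only relies on "call with the row, get an async context manager".
    async with provider(task) as runtime:
        return runtime.url

task = Task(env="letter-count", id="count_letter")
pooled = PooledProvider()
urls = [asyncio.run(place(pooled, task)), asyncio.run(place(closure_provider, task))]
print(urls)
```

Neither provider inherits from Provider; a type checker accepts both because their call signatures match.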
| Provider | Description |
| --- | --- |
| LocalRuntime(path) | Serve the row’s env from a local .py source in a child process (the same serving path a container CMD runs). env= pins one explicitly. |
| DockerRuntime(image) | docker run a fresh container per rollout from an image whose CMD serves the control channel (the scaffolded Dockerfile.hud). port= (default 8765) is the in-container port; run_args= passes extra docker run flags. The control port is the only one published. |
| Runtime(url) | Attach to an already-served control channel (provisioned elsewhere; no lifecycle). |
| HUDRuntime() | Lease the environment on HUD infra but keep the agent loop local; the SDK opens a tunnel and drives the remote control channel through a local Runtime (the default when runtime= is omitted). |
| HostedRuntime() | Submit the whole rollout to the HUD platform so the agent runs remotely next to the env. |
from hud import DockerRuntime, HUDRuntime, HostedRuntime, LocalRuntime, Runtime

job = await task.run(agent, runtime=LocalRuntime("env.py"))          # local subprocess
job = await task.run(agent, runtime=DockerRuntime("my-env:latest"))  # fresh container
job = await task.run(agent, runtime=Runtime("tcp://host:8765"))  # already served
job = await task.run(agent, runtime=HUDRuntime())  # local agent, cloud env
job = await task.run(agent, runtime=HostedRuntime())  # remote agent + cloud env
Because the provider sees the row, placement can vary per task — heavier substrates for heavier rows, no engine involvement:
def placer(task):
    gpus = 4 if task.args.get("big_model") else 1
    return my_cloud(image=f"hud/{task.env}", gpus=gpus)

job = await taskset.run(agent, runtime=placer)

Running a Task

task.run(agent, runtime=...) executes the task end to end (provision, agent, grade) and returns a Job holding the graded Runs. It is the single-task form of Taskset.run(), with identical scheduling semantics (group=, max_concurrent=) and failure isolation: a crashed rollout comes back as a failed Run inside the job rather than raising. There are no standalone traces; every run reports under a job:
job = await count_letter(word="strawberry").run(agent, runtime=LocalRuntime("env.py"))
print(job.reward)           # mean reward across runs
print(job.runs[0].trace.content)
For manual control (custom drivers, no agent), compose the engine’s public pieces yourself — a provider, connect, and the Run lifecycle. Exiting the Run grades it; this path skips the trace reporting and failure isolation task.run() provides:
from hud import Run, connect

task = count_letter(word="strawberry")
async with LocalRuntime("env.py")(task) as runtime, connect(runtime) as client:
    async with Run(client, task.id, task.args) as run:
        run.trace.content = "3"  # your driver fills the trace
print(run.reward)                # graded on exit

Task Methods

| Method | Description |
| --- | --- |
| task.run(agent, runtime=..., group=..., max_concurrent=...) | Schedule through the rollout engine (single-task Taskset.run); returns a Job. |
| task.default_slug() | Stable slug from the task id and, when present, an args hash. |
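One way to derive a stable slug from a task id plus an args hash is sketched below. The function name sketch_slug, the SHA-256 digest, the 8-character truncation, and the id-digest layout are all assumptions made for illustration; the exact format that task.default_slug() produces is not specified here.

```python
import hashlib
import json
from typing import Optional

def sketch_slug(task_id: str, args: Optional[dict] = None) -> str:
    """Illustrative only: task id, plus a short hash of the args when present."""
    if not args:
        return task_id
    # sort_keys makes the encoding independent of dict insertion order,
    # so the same arguments always hash to the same digest.
    canonical = json.dumps(args, sort_keys=True, separators=(",", ":"))
    digest = hashlib.sha256(canonical.encode("utf-8")).hexdigest()[:8]
    return f"{task_id}-{digest}"

a = sketch_slug("count_letter", {"word": "strawberry", "letter": "r"})
b = sketch_slug("count_letter", {"letter": "r", "word": "strawberry"})
c = sketch_slug("count_letter", {"word": "raspberry", "letter": "r"})
print(a == b, a == c)  # True False
```

Hashing a canonical encoding rather than repr(args) is what keeps the slug stable across processes and key orderings.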
There is no bespoke serialization: the model is the row. task.model_dump() is the portable entry ({"env": name, "id": ..., "args": ...}) and Task.model_validate(data) rebuilds it — standard Pydantic.

Constructing Rows Directly

When you don’t have the task function in hand (data pipelines, generated tasksets), construct the model — fields and metadata are explicit:
from hud import Task

t = Task(env="letter-count", id="count_letter", args={"word": "strawberry"}, slug="count-straw")

Taskset

A named, ordered collection of tasks.
taskset = Taskset("letters", [
    count_letter(word="strawberry"),
    count_letter(word="raspberry"),
])

Sources

| Constructor / method | Description |
| --- | --- |
| Taskset(name, tasks) | Wrap an iterable of Tasks. |
| Taskset.from_file(path) | Load .py, directory, .json, or .jsonl sources. |
| Taskset.from_module(path) | Load public Task or Taskset objects from Python source. |
| Taskset.from_api(name) | Load a platform taskset by name or id. |
| taskset.to_file(path) | Write .json or .jsonl (hud sync tasks --export adds CSV). |

Collection Operations

| Operation | Description |
| --- | --- |
| len(taskset) / iter(taskset) | Count / iterate tasks. |
| taskset["slug"] | Lookup by slug. |
| taskset.filter(slugs) | Keep matching slugs. |
| taskset.exclude(slugs) | Drop matching slugs. |

Running

Taskset.run() expands each task group times, acquires a fresh substrate per rollout from the runtime= provider (called with that rollout’s task row, so one provider serves a mixed-env taskset), lets agent(run) fill the trace, grades on exit, and returns a Job.
job = await taskset.run(agent, runtime=LocalRuntime("env.py"), group=8, max_concurrent=10)
for run in job.runs:
    print(run.reward)
| Method | Description |
| --- | --- |
| await taskset.run(agent, runtime=None, group=1, max_concurrent=None, job=None) | Run the taskset and return Job (pass an open job to accumulate into it). |
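The scheduling behaviour described in this section (expand each task group times, cap concurrency, isolate failures, report a mean reward) can be approximated with plain asyncio. This is a sketch under stated assumptions, not the SDK's engine: RunResult, run_all, and fake_rollout are hypothetical names, tasks are plain strings, and scoring a crashed rollout as 0.0 is an assumption.

```python
import asyncio
from dataclasses import dataclass
from typing import Optional

@dataclass
class RunResult:              # stand-in for a graded Run
    task: str
    reward: float
    error: Optional[str] = None

async def run_all(tasks, rollout, group=1, max_concurrent=None):
    """Expand each task `group` times, bound concurrency, isolate failures."""
    limit = asyncio.Semaphore(max_concurrent) if max_concurrent else None

    async def one(task):
        try:
            if limit is None:
                reward = await rollout(task)
            else:
                async with limit:   # at most max_concurrent rollouts in flight
                    reward = await rollout(task)
            return RunResult(task, reward)
        except Exception as exc:    # a crash becomes a failed result, not a raise
            return RunResult(task, 0.0, error=repr(exc))

    expanded = [t for t in tasks for _ in range(group)]  # expansion order
    return await asyncio.gather(*(one(t) for t in expanded))  # gather keeps order

async def fake_rollout(task: str) -> float:
    await asyncio.sleep(0)
    if task == "boom":
        raise RuntimeError("substrate crashed")
    return 1.0

runs = asyncio.run(run_all(["a", "boom"], fake_rollout, group=2, max_concurrent=2))
mean_reward = sum(r.reward for r in runs) / len(runs)
print([r.task for r in runs], mean_reward)
```

Catching the exception inside each rollout, rather than around gather, is what lets the other rollouts finish when one crashes.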

Job

The platform receipt for one execution — there are no standalone traces, so every run (including a single task.run) reports under a job.
| Member | Type | Description |
| --- | --- | --- |
| id | str | HUD job id. |
| name | str | Display name. |
| runs | list[Run] | Runs in expansion order. |
| group | int | Runs per task. |
| reward | float | Mean reward across runs. |
| await Job.start(name, group=1) | Job | Open a job spanning multiple scheduler calls (a training session); pass it as job= to accumulate. |

Sync

hud.eval.sync.diff() compares local tasks to remote tasks and returns a SyncPlan.
from hud.eval.sync import diff

local = Taskset.from_file("tasks.py")
remote = Taskset.from_api("SheetBench-50")

plan = diff(local, remote)
print(plan.summary())
| Type / method | Description |
| --- | --- |
| SyncPlan.to_create | Local tasks not present remotely. |
| SyncPlan.to_update | Local tasks whose signature differs. |
| SyncPlan.unchanged | Matching tasks. |
| SyncPlan.remote_only | Remote tasks not present locally. |
Use hud sync tasks to upload a taskset to the platform.
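The four buckets of a sync plan can be illustrated with plain dictionaries. In this sketch, rows are matched by slug and a row's signature is assumed to be the canonical JSON of every field except the slug; Plan, signature, and sketch_diff are hypothetical names, and the matching key and signature that hud.eval.sync.diff() actually uses may differ.

```python
import json
from dataclasses import dataclass, field

@dataclass
class Plan:                   # stand-in with the same four buckets as SyncPlan
    to_create: list = field(default_factory=list)
    to_update: list = field(default_factory=list)
    unchanged: list = field(default_factory=list)
    remote_only: list = field(default_factory=list)

def signature(row: dict) -> str:
    # Assumed signature: canonical JSON of everything except the slug.
    return json.dumps({k: v for k, v in row.items() if k != "slug"}, sort_keys=True)

def sketch_diff(local: list, remote: list) -> Plan:
    remote_by_slug = {row["slug"]: row for row in remote}
    plan = Plan()
    for row in local:
        other = remote_by_slug.pop(row["slug"], None)
        if other is None:
            plan.to_create.append(row["slug"])
        elif signature(row) != signature(other):
            plan.to_update.append(row["slug"])
        else:
            plan.unchanged.append(row["slug"])
    plan.remote_only = list(remote_by_slug)  # whatever local never claimed
    return plan

local_rows = [
    {"slug": "count-straw", "id": "count_letter", "args": {"word": "strawberry"}},
    {"slug": "count-rasp", "id": "count_letter", "args": {"word": "raspberry"}},
    {"slug": "count-blue", "id": "count_letter", "args": {"word": "blueberry"}},
]
remote_rows = [
    {"slug": "count-straw", "id": "count_letter", "args": {"word": "strawberry"}},
    {"slug": "count-rasp", "id": "count_letter", "args": {"word": "raspberry", "letter": "p"}},
    {"slug": "count-old", "id": "count_letter", "args": {"word": "cranberry"}},
]
plan = sketch_diff(local_rows, remote_rows)
print(plan)
```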

See Also

Environment

Types: Run & Trace

Graders

Train on rewards