Decorators Reference
@load
Marks a function that runs once at startup. The return value is passed to the task handler as context.
```python
from runqy_task import load

@load
def setup():
    """Initialize resources (models, connections, etc.)."""
    # load_model and connect_to_database stand in for your own helpers.
    model = load_model("model.pt")
    db = connect_to_database()
    return {"model": model, "db": db}
```
Signature:
Returns: Any value that will be passed to @task handlers as the ctx parameter.
Use cases:
- Loading ML models
- Establishing database connections
- Reading configuration files
- Any expensive initialization that should happen once
@task
Marks a function that handles each incoming task.
```python
from runqy_task import task

@task
def handle(payload: dict, ctx: dict) -> dict:
    """Process a single task."""
    result = ctx["model"].predict(payload["input"])
    return {"prediction": result}
```
Signature:
Parameters:
| Parameter | Type | Description |
|---|---|---|
| payload | dict | The task payload from Redis |
| ctx | Any | The return value from @load (or None if no @load is defined) |
Returns: A dictionary that will be serialized as the task result.
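Because responses are written out as JSON, the returned dictionary should contain only JSON-friendly values. The sketch below converts a `datetime` to a string before returning it. The key names are hypothetical, and the function is left undecorated so it runs standalone.

```python
import json
from datetime import datetime, timezone

# In a worker this function would carry the @task decorator.
def handle(payload: dict, ctx) -> dict:
    """Return only values that json.dumps can encode."""
    processed_at = datetime.now(timezone.utc)
    return {
        "echo": payload.get("input"),
        # datetime objects are not JSON serializable; use an ISO 8601 string.
        "processed_at": processed_at.isoformat(),
    }

result = handle({"input": "hello"}, None)
# json.dumps raises TypeError if the dictionary holds a non-serializable value.
encoded = json.dumps(result)
```

Checking a handler's output with `json.dumps` in a unit test is a cheap way to catch non-serializable values before deployment.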
Error handling:
To signal that a task failed but should be retried, raise an exception. The worker will handle retry logic based on the task's max_retry setting.
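A minimal sketch of a handler that lets a transient failure propagate as an exception. The `UpstreamUnavailable` class, the `fetch` entry in `ctx`, and the `url` payload field are all hypothetical names for the example. The `@task` decorator is omitted so the snippet runs without `runqy_task` installed.

```python
class UpstreamUnavailable(Exception):
    """Hypothetical error type for a transient failure."""


# In a worker this function would carry the @task decorator.
def handle(payload: dict, ctx: dict) -> dict:
    # ctx["fetch"] is a hypothetical callable provided by the load step.
    try:
        data = ctx["fetch"](payload["url"])
    except ConnectionError as exc:
        # Re-raise instead of returning a result, so the failure reaches
        # the worker rather than being reported as a success.
        raise UpstreamUnavailable(f"could not reach {payload['url']}") from exc
    return {"size": len(data)}
```

Catching the exception and returning a normal dictionary would hide the failure from the worker, so only catch what the handler can genuinely recover from.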
run()
Enters the stdin/stdout loop for long-running mode.
This function:
- Calls the @load function (if defined)
- Outputs {"status": "ready"} to signal the worker
- Reads JSON tasks from stdin
- Calls the @task function for each task
- Outputs JSON responses to stdout
- Loops until the process is terminated
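The steps above can be approximated with the stand-in below. It is not the library's implementation. It assumes one JSON object per line on stdin, treats each line as the payload itself, and writes the handler's return value back unchanged. The name `run_loop` and its parameters are hypothetical.

```python
import io
import json
from typing import Any, Callable, Optional, TextIO


def run_loop(
    task_fn: Callable[[dict, Any], dict],
    load_fn: Optional[Callable[[], Any]],
    stdin: TextIO,
    stdout: TextIO,
) -> None:
    """Hypothetical stand-in for run(), for illustration only."""
    ctx = load_fn() if load_fn is not None else None

    # Ready signal emitted once the load step has finished.
    stdout.write(json.dumps({"status": "ready"}) + "\n")
    stdout.flush()

    # Assumption: tasks arrive as one JSON object per line.
    for line in stdin:
        if not line.strip():
            continue
        payload = json.loads(line)
        result = task_fn(payload, ctx)
        # Assumption: the handler's return value is written back as-is.
        stdout.write(json.dumps(result) + "\n")
        stdout.flush()


# Usage with in-memory streams instead of the real stdin/stdout.
fake_in = io.StringIO('{"input": 2}\n{"input": 5}\n')
fake_out = io.StringIO()
run_loop(
    lambda payload, ctx: {"doubled": payload["input"] * ctx["factor"]},
    lambda: {"factor": 2},
    fake_in,
    fake_out,
)
lines = fake_out.getvalue().splitlines()
```

Unlike a real long-running worker, this sketch stops when the input stream is exhausted, which keeps it easy to test.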
run_once()
Processes a single task for one-shot mode.
This function:
- Calls the @load function (if defined)
- Reads a single JSON task from stdin
- Calls the @task function
- Outputs the JSON response to stdout
- Exits
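A comparable stand-in for the one-shot flow, under the assumptions that all of stdin holds a single JSON document and that the handler's return value is written back unchanged. The name `run_once_sketch` is hypothetical and this is not the library's implementation.

```python
import io
import json
from typing import Any, Callable, Optional, TextIO


def run_once_sketch(
    task_fn: Callable[[dict, Any], dict],
    load_fn: Optional[Callable[[], Any]],
    stdin: TextIO,
    stdout: TextIO,
) -> None:
    """Hypothetical stand-in for run_once(), for illustration only."""
    ctx = load_fn() if load_fn is not None else None
    # Assumption: the whole input stream is one JSON document.
    payload = json.loads(stdin.read())
    result = task_fn(payload, ctx)
    stdout.write(json.dumps(result) + "\n")
    stdout.flush()
    # Returning here corresponds to the "Exits" step in a real script.


# Usage with in-memory streams instead of the real stdin/stdout.
once_in = io.StringIO('{"input": "abc"}')
once_out = io.StringIO()
run_once_sketch(
    lambda payload, ctx: {"upper": payload["input"].upper()},
    None,
    once_in,
    once_out,
)
response = json.loads(once_out.getvalue())
```

Passing `None` for the load function mirrors the case where no @load function is defined, so the handler receives `None` as its context.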
Protocol
Input Format (stdin)
Output Format (stdout)
Ready signal (after @load): {"status": "ready"}
Task response:
Error response: