runqy-python SDK¶
The runqy-python SDK provides simple decorators for writing task handlers that run on runqy workers. It also includes a client for enqueuing tasks.
Overview¶
Task Handlers — The core of the SDK:
@loaddecorator: Define a function that runs once at startup (e.g., load ML models)@taskdecorator: Define a function that handles each incoming taskrun()function: Enter the stdin/stdout loop for long-running moderun_once()function: Process a single task and exit (one-shot mode)
Client — Also included for convenience:
RunqyClientclass: HTTP client for enqueuing tasksenqueue()function: Quick enqueue without creating a client instanceenqueue_batch()function: High-throughput batch enqueue (35,000+ jobs/s)
Installation¶
Or install from source:
Task Handler Example¶
from runqy_python import task, load, run
@load
def setup():
"""Runs once at startup. Return value is passed to task handler."""
model = load_my_model()
return {"model": model}
@task
def handle(payload: dict, ctx: dict) -> dict:
"""Handles each task. Receives payload and context from @load."""
model = ctx["model"]
result = model.predict(payload["input"])
return {"prediction": result}
if __name__ == "__main__":
run()
Client Example¶
The SDK also includes a client for enqueuing tasks from Python:
from runqy_python import RunqyClient
client = RunqyClient("http://localhost:3000", api_key="your-api-key")
# Enqueue a task
task = client.enqueue("inference.default", {"input": "hello"})
print(f"Task ID: {task.task_id}")
# Check result
result = client.get_task(task.task_id)
print(f"State: {result.state}, Result: {result.result}")
Batch Enqueue Example¶
For high-throughput scenarios, use enqueue_batch() to submit thousands of jobs per second:
from runqy_python import RunqyClient
client = RunqyClient("http://localhost:3000", api_key="your-api-key")
# Enqueue 1000 jobs in a single request
jobs = [{"input": f"job-{i}"} for i in range(1000)]
result = client.enqueue_batch("inference.default", jobs)
print(f"Enqueued: {result.enqueued}")
print(f"Failed: {result.failed}")
print(f"First 5 task IDs: {result.task_ids[:5]}")
Performance
Batch enqueue achieves ~35,000-50,000 jobs/s compared to ~800-1,000 jobs/s with individual enqueue() calls.
Source Code¶
runqy-python/
└── src/runqy_python/
├── __init__.py # Package exports
├── decorator.py # @task, @load decorators
├── runner.py # run(), run_once() functions
└── client.py # RunqyClient for enqueuing tasks