# runqy Documentation > Documentation for runqy - a distributed task queue system with server-driven bootstrap architecture # Getting Started ## Introduction # Introduction runqy is a distributed task queue system designed for machine learning inference and other workloads that require: - **Stateless workers** that receive configuration at startup - **Server-driven bootstrap** for centralized control - **Long-running Python processes** that stay warm between tasks ## Key Concepts ### Server-Driven Bootstrap Unlike traditional task queues where workers are pre-configured, runqy workers start with minimal configuration (just the server URL and API key). At startup, they: 1. Register with the server via `POST /worker/register` 2. Receive Redis credentials and deployment configuration 3. Clone the task code from a git repository 4. Set up a Python virtual environment 5. Start the Python process and wait for it to be ready ### One Worker = One Parent Queue = One Process Each worker instance processes tasks from a single parent queue using a single supervised Python process. Sub-queues of the same parent (e.g., `inference.high` and `inference.low`) share the same code deployment and runtime. 
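The `{parent}.{sub_queue}` naming convention above can be made concrete with a small helper (a standalone illustration, not part of the runqy SDK — runqy performs this resolution internally):

```python
def parse_queue(name: str) -> tuple[str, str]:
    """Split a runqy queue name into (parent, sub_queue).

    Queue names use the format ``{parent}.{sub_queue}``; a bare parent
    name implies the ``default`` sub-queue, mirroring runqy's fallback.
    """
    parent, _, sub = name.partition(".")
    return parent, sub or "default"

# Sub-queues of the same parent share one deployment and one worker process:
assert parse_queue("inference.high")[0] == parse_queue("inference.low")[0]
assert parse_queue("inference") == ("inference", "default")
```

Because a worker binds to a single parent queue, both `inference.high` and `inference.low` resolve to the same parent and therefore run on the same supervised Python process.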
**Queue configuration options:** - `queue: "inference"` — Listen on **all** sub-queues of `inference` - `queues: [inference.high, inference.low]` — Listen **only** on specific sub-queues This constraint ensures: - Predictable resource usage - Simple failure isolation - Easy horizontal scaling (add more workers for more capacity) - Efficient resource sharing across sub-queues with different priorities ### Long-Running vs One-Shot Modes runqy supports two execution modes: - **Long-running** (`long_running`): The Python process stays alive between tasks, ideal for ML inference where model loading is expensive - **One-shot** (`one_shot`): A new Python process is spawned for each task ## Next Steps - [Quick Start](quickstart.md) — Set up a local development environment - [Architecture](architecture.md) — Deep dive into how the components interact ## Installation # Installation This guide covers all methods for installing runqy server and worker. ## Quick Install (Recommended) The fastest way to get started is using the install script. === "Linux/macOS (Server)" ```bash curl -fsSL https://raw.githubusercontent.com/publikey/runqy/main/install.sh | sh ``` === "Windows (Server)" ```powershell iwr https://raw.githubusercontent.com/publikey/runqy/main/install.ps1 -useb | iex ``` === "Linux/macOS (Worker)" ```bash curl -fsSL https://raw.githubusercontent.com/publikey/runqy-worker/main/install.sh | sh ``` === "Windows (Worker)" ```powershell iwr https://raw.githubusercontent.com/publikey/runqy-worker/main/install.ps1 -useb | iex ``` !!! 
tip "Verify installation" After installation, verify with: ```bash runqy --version runqy-worker --version ``` ## Docker Compose Quickstart The fastest way to run the full stack without cloning the repo: ```bash # Download the quickstart compose file curl -O https://raw.githubusercontent.com/Publikey/runqy/main/docker-compose.quickstart.yml # Start all services docker-compose -f docker-compose.quickstart.yml up -d # View logs docker-compose -f docker-compose.quickstart.yml logs -f ``` ## Docker Compose (Full Repo) For a complete local environment with source code access: ```bash # Clone the monorepo git clone https://github.com/publikey/runqy.git cd runqy # Start all services docker-compose up -d # View logs docker-compose logs -f ``` This starts: | Service | Port | Description | |---------|------|-------------| | `runqy-server` | 3000 | API server and dashboard | | `runqy-worker` | - | Task processor | | `redis` | 6379 | Task queue backend | | `postgres` | 5432 | Configuration storage | Access the dashboard at [http://localhost:3000/monitoring/](http://localhost:3000/monitoring/). ### Docker Images Pre-built images are available on GitHub Container Registry: ```bash # Server docker pull ghcr.io/publikey/runqy:latest # Worker (minimal - Alpine-based) docker pull ghcr.io/publikey/runqy-worker:latest # Worker (inference - PyTorch + CUDA) docker pull ghcr.io/publikey/runqy-worker:inference ``` #### Server Image ```bash docker run -d \ -p 3000:3000 \ -e REDIS_HOST=your-redis-host \ -e REDIS_PASSWORD=your-redis-password \ -e RUNQY_API_KEY=your-api-key \ ghcr.io/publikey/runqy:latest serve --sqlite ``` #### Worker Images **Minimal (default)** — Lightweight Alpine-based image. You install your own runtime. 
- Image: `ghcr.io/publikey/runqy-worker:latest` or `:minimal`
- Base: Alpine 3.19 with git, curl, ca-certificates
- Platforms: linux/amd64, linux/arm64

```bash
docker run -v $(pwd)/config.yml:/app/config.yml ghcr.io/publikey/runqy-worker:latest
```

**Inference** — Pre-configured for ML workloads with PyTorch and CUDA.

- Image: `ghcr.io/publikey/runqy-worker:inference`
- Base: PyTorch 2.1.0 + CUDA 11.8
- Platform: linux/amd64 only
- Includes: Python 3, pip, PyTorch, CUDA runtime

```bash
docker run --gpus all -v $(pwd)/config.yml:/app/config.yml ghcr.io/publikey/runqy-worker:inference
```

#### Worker Image Tags

| Tag | Description |
|-----|-------------|
| `latest`, `minimal` | Minimal Alpine image (multi-arch) |
| `inference` | PyTorch + CUDA image (amd64 only) |
| `{version}` | Specific version, minimal base |
| `{version}-minimal` | Specific version, minimal base |
| `{version}-inference` | Specific version, inference base |

## Manual Binary Download

Download pre-built binaries from GitHub Releases:

- **Server**: [github.com/publikey/runqy/releases](https://github.com/publikey/runqy/releases)
- **Worker**: [github.com/publikey/runqy-worker/releases](https://github.com/publikey/runqy-worker/releases)

### Linux (amd64)

```bash
# Server
curl -LO https://github.com/publikey/runqy/releases/latest/download/runqy_latest_linux_amd64.tar.gz
tar xzf runqy_latest_linux_amd64.tar.gz
sudo mv runqy /usr/local/bin/

# Worker
curl -LO https://github.com/publikey/runqy-worker/releases/latest/download/runqy-worker_latest_linux_amd64.tar.gz
tar xzf runqy-worker_latest_linux_amd64.tar.gz
sudo mv runqy-worker /usr/local/bin/
```

### macOS (Apple Silicon)

```bash
# Server
curl -LO https://github.com/publikey/runqy/releases/latest/download/runqy_latest_darwin_arm64.tar.gz
tar xzf runqy_latest_darwin_arm64.tar.gz
sudo mv runqy /usr/local/bin/

# Worker
curl -LO https://github.com/publikey/runqy-worker/releases/latest/download/runqy-worker_latest_darwin_arm64.tar.gz
tar xzf runqy-worker_latest_darwin_arm64.tar.gz
sudo mv runqy-worker /usr/local/bin/
```

### Windows (amd64)

```powershell
# Server
Invoke-WebRequest -Uri https://github.com/publikey/runqy/releases/latest/download/runqy_latest_windows_amd64.zip -OutFile runqy.zip
Expand-Archive runqy.zip -DestinationPath .

# Worker
Invoke-WebRequest -Uri https://github.com/publikey/runqy-worker/releases/latest/download/runqy-worker_latest_windows_amd64.zip -OutFile runqy-worker.zip
Expand-Archive runqy-worker.zip -DestinationPath .
```

Add the extracted directory to your system PATH.

### Available Archives

**Server (`runqy`):**

- `runqy_{version}_linux_amd64.tar.gz`
- `runqy_{version}_linux_arm64.tar.gz`
- `runqy_{version}_darwin_amd64.tar.gz`
- `runqy_{version}_darwin_arm64.tar.gz`
- `runqy_{version}_windows_amd64.zip`
- `runqy_{version}_windows_arm64.zip`

**Worker (`runqy-worker`):**

- `runqy-worker_{version}_linux_amd64.tar.gz`
- `runqy-worker_{version}_linux_arm64.tar.gz`
- `runqy-worker_{version}_darwin_amd64.tar.gz`
- `runqy-worker_{version}_darwin_arm64.tar.gz`
- `runqy-worker_{version}_windows_amd64.zip`
- `runqy-worker_{version}_windows_arm64.zip`

## From Source

Build from source if you need the latest development version or want to modify the code.

### Prerequisites

- Go 1.24+
- Git

### Server

```bash
git clone https://github.com/publikey/runqy.git
cd runqy/app
go build -o runqy .

# Optional: Install to PATH
sudo mv runqy /usr/local/bin/
```

### Worker

```bash
git clone https://github.com/publikey/runqy-worker.git
cd runqy-worker
go build -o runqy-worker ./cmd/worker

# Optional: Install to PATH
sudo mv runqy-worker /usr/local/bin/
```

### Build with Version Info

```bash
# Server
go build -ldflags "-X main.Version=1.0.0 -X main.Commit=$(git rev-parse HEAD)" -o runqy .
# Worker go build -ldflags "-X main.Version=1.0.0 -X main.Commit=$(git rev-parse HEAD)" -o runqy-worker ./cmd/worker ``` ## Supported Platforms | Platform | Architecture | Server | Worker | |----------|-------------|--------|--------| | Linux | amd64 | ✅ | ✅ | | Linux | arm64 | ✅ | ✅ | | macOS | amd64 | ✅ | ✅ | | macOS | arm64 (Apple Silicon) | ✅ | ✅ | | Windows | amd64 | ✅ | ✅ | | Windows | arm64 | ✅ | ✅ | ## Environment Variables After installation, configure these environment variables before starting the server: | Variable | Required | Description | |----------|----------|-------------| | `REDIS_HOST` | Yes | Redis hostname | | `REDIS_PORT` | No | Redis port (default: 6379) | | `REDIS_PASSWORD` | Yes | Redis password (can be empty) | | `REDIS_TLS` | No | Enable TLS for Redis (default: false) | | `RUNQY_API_KEY` | Yes | API key for authentication | | `DATABASE_HOST` | No | PostgreSQL host (default: localhost, use `--sqlite` for dev) | | `DATABASE_PORT` | No | PostgreSQL port (default: 5432) | | `DATABASE_USER` | No | PostgreSQL username (default: postgres) | | `DATABASE_PASSWORD` | No | PostgreSQL password | | `DATABASE_DBNAME` | No | PostgreSQL database name (default: sdxl_queuing_dev) | | `DATABASE_SSL` | No | PostgreSQL SSL mode (default: disable) | See [Configuration](../server/configuration.md) for the full reference. ## Next Steps Once installed: 1. [Quick Start](quickstart.md) — Run through the complete setup tutorial 2. [CLI Reference](../server/cli.md) — Learn the available commands 3. [Python SDK](../python-sdk/index.md) — Write your first task handler ## Quick Start # Quick Start This guide walks you through setting up a local runqy environment with working examples. ## Prerequisites - Docker (for Redis, or use Docker Compose for full stack) - **One of:** - Pre-built binaries (recommended) - Go 1.24+ (to build from source) !!! note "No database required for development" SQLite is embedded in the server for development. 
PostgreSQL is only needed for production. ## Choose Your Installation Method === "Quick Install (Recommended)" The fastest way to get started: **Linux/macOS:** ```bash # Install server curl -fsSL https://raw.githubusercontent.com/publikey/runqy/main/install.sh | sh # Install worker curl -fsSL https://raw.githubusercontent.com/publikey/runqy-worker/main/install.sh | sh ``` **Windows (PowerShell):** ```powershell # Install server iwr https://raw.githubusercontent.com/publikey/runqy/main/install.ps1 -useb | iex # Install worker iwr https://raw.githubusercontent.com/publikey/runqy-worker/main/install.ps1 -useb | iex ``` === "Docker Compose Quickstart (Recommended)" Run the full stack without cloning the repo: ```bash curl -O https://raw.githubusercontent.com/Publikey/runqy/main/docker-compose.quickstart.yml docker-compose -f docker-compose.quickstart.yml up -d ``` This starts Redis, PostgreSQL, server, and worker. Skip to [Step 5](#5-enqueue-a-task). === "Docker Compose (Full Repo)" Clone the repo and run the full stack: ```bash git clone https://github.com/Publikey/runqy.git cd runqy docker-compose up -d ``` This starts Redis, PostgreSQL, server, and worker. Skip to [Step 5](#5-enqueue-a-task). === "From Source" Build from source: ```bash git clone https://github.com/Publikey/runqy.git git clone https://github.com/Publikey/runqy-worker.git # Build server cd runqy/app && go build -o runqy . # Build worker cd ../runqy-worker && go build -o runqy-worker ./cmd/worker ``` For more installation options, see the [Installation Guide](installation.md). ## 1. Start Redis ```bash docker run -d --name redis -p 6379:6379 redis:7-alpine ``` !!! warning "Redis 8.x is not supported" runqy uses [asynq](https://github.com/hibiken/asynq) which relies on Lua scripts that are incompatible with Redis 8.x. Use **Redis 7.x** (`redis:7-alpine`). The Docker Compose files already pin this version. ## 2. 
Start the Server

=== "Pre-built Binary"

    === "Linux/Mac"

        ```bash
        export REDIS_HOST=localhost
        export REDIS_PORT=6379
        export REDIS_PASSWORD=""
        export RUNQY_API_KEY=dev-api-key

        runqy serve --sqlite
        ```

    === "Windows (PowerShell)"

        ```powershell
        $env:REDIS_HOST = "localhost"
        $env:REDIS_PORT = "6379"
        $env:REDIS_PASSWORD = ""
        $env:RUNQY_API_KEY = "dev-api-key"

        runqy serve --sqlite
        ```

=== "From Source"

    === "Linux/Mac"

        ```bash
        cd runqy/app
        export REDIS_HOST=localhost
        export REDIS_PORT=6379
        export REDIS_PASSWORD=""
        export RUNQY_API_KEY=dev-api-key

        go run . serve --sqlite
        ```

    === "Windows (PowerShell)"

        ```powershell
        cd runqy/app
        $env:REDIS_HOST = "localhost"
        $env:REDIS_PORT = "6379"
        $env:REDIS_PASSWORD = ""
        $env:RUNQY_API_KEY = "dev-api-key"

        go run . serve --sqlite
        ```

The server starts on port 3000 by default.

!!! info "API Authentication"
    The server reads the API key from the `RUNQY_API_KEY` environment variable. HTTP clients (curl, SDKs) must send it as an `Authorization: Bearer {key}` header.

## 3. Deploy the Example Queues

In a new terminal:

=== "Pre-built Binary"

    === "Linux/Mac"

        ```bash
        # Download example config
        curl -fsSL https://raw.githubusercontent.com/Publikey/runqy/main/examples/quickstart.yaml -o quickstart.yaml

        runqy login -s http://localhost:3000 -k dev-api-key
        runqy config create -f quickstart.yaml
        ```

    === "Windows (PowerShell)"

        ```powershell
        # Download example config
        Invoke-WebRequest -Uri "https://raw.githubusercontent.com/Publikey/runqy/main/examples/quickstart.yaml" -OutFile "quickstart.yaml"

        runqy login -s http://localhost:3000 -k dev-api-key
        runqy config create -f quickstart.yaml
        ```

=== "From Source"

    === "Linux/Mac"

        ```bash
        cd runqy/app
        go build -o runqy .

        ./runqy login -s http://localhost:3000 -k dev-api-key
        ./runqy config create -f ../examples/quickstart.yaml
        ```

    === "Windows (PowerShell)"

        ```powershell
        cd runqy/app
        go build -o runqy.exe .
.\runqy.exe login -s http://localhost:3000 -k dev-api-key .\runqy.exe config create -f ..\examples\quickstart.yaml ``` This deploys two example queues: | Queue | Mode | Description | |-------|------|-------------| | `quickstart-oneshot` | one_shot | Spawns a new Python process per task | | `quickstart-longrunning` | long_running | Keeps Python process alive between tasks | !!! note "Sub-queue naming: the `.default` suffix" When a queue has no explicit sub-queues defined, runqy automatically appends `.default`. So `quickstart-oneshot` becomes `quickstart-oneshot.default` at runtime. You'll see this suffix in worker logs, Redis keys, and API responses. When enqueueing, you can use either the short name (`quickstart-oneshot`) or the full name (`quickstart-oneshot.default`). ## 4. Start a Worker In a new terminal: === "Pre-built Binary" === "Linux/Mac" ```bash # Download example config curl -fsSL https://raw.githubusercontent.com/publikey/runqy-worker/main/config.yml.example -o config.yml # Start worker runqy-worker -config config.yml ``` === "Windows (PowerShell)" ```powershell # Download example config Invoke-WebRequest -Uri "https://raw.githubusercontent.com/publikey/runqy-worker/main/config.yml.example" -OutFile "config.yml" # Start worker runqy-worker -config config.yml ``` === "From Source" === "Linux/Mac" ```bash cd runqy-worker cp config.yml.example config.yml go run ./cmd/worker ``` === "Windows (PowerShell)" ```powershell cd runqy-worker Copy-Item config.yml.example config.yml go run ./cmd/worker ``` The downloaded config is pre-configured for the quickstart (no changes needed): ```yaml server: url: "http://localhost:3000" api_key: "dev-api-key" worker: queues: - "quickstart-oneshot.default" - "quickstart-longrunning.default" ``` The worker will: 1. Register with the server 2. Clone the example task code from the runqy repo 3. Set up a Python virtual environment 4. Start the Python process 5. The worker is now ready to process tasks ## 5. 
Enqueue a Task In a new terminal: === "Linux/Mac" ```bash curl -X POST http://localhost:3000/queue/add \ -H "Authorization: Bearer dev-api-key" \ -H "Content-Type: application/json" \ -d '{ "queue": "quickstart-oneshot.default", "timeout": 60, "data": {"operation": "uppercase", "data": "hello world"} }' ``` === "Windows (PowerShell)" ```powershell curl.exe -X POST http://localhost:3000/queue/add ` -H "Authorization: Bearer dev-api-key" ` -H "Content-Type: application/json" ` -d '{\"queue\": \"quickstart-oneshot.default\", \"timeout\": 60, \"data\": {\"operation\": \"uppercase\", \"data\": \"hello world\"}}' ``` Response: ```json { "info": { "id": "abc123...", "state": "pending", "queue": "quickstart-oneshot.default", ... }, "data": {...} } ``` !!! tip "Task ID" Use the `id` from the response to check the result in the next step. !!! note "Queue name shorthand" You can omit the `.default` suffix when enqueueing. For example, `quickstart-oneshot` automatically resolves to `quickstart-oneshot.default`. ??? info "Request format: nested vs flat" The `/queue/add` endpoint accepts two formats: **Nested format** (wraps payload in `"data"`): ```json {"queue": "myqueue", "timeout": 60, "data": {"operation": "uppercase", "data": "hello"}} ``` **Flat format** (all extra fields become the payload): ```json {"queue": "myqueue", "timeout": 60, "operation": "uppercase", "data": "hello"} ``` Both are equivalent. In the nested format, the `"data"` wrapper contains your task payload as-is. Note that in the quickstart example, the inner `"data"` key is a task input field (not the wrapper) — this is valid but can look confusing at first glance. ### Try Long-Running Mode To try long-running mode, just enqueue to `quickstart-longrunning.default` — the worker already listens on both queues. ## 6. 
Check the Result Replace `{id}` with the task ID from the previous step: === "Linux/Mac" ```bash curl http://localhost:3000/queue/{id} ``` === "Windows (PowerShell)" ```powershell curl.exe http://localhost:3000/queue/{id} ``` Response: ```json { "info": { "state": "completed", "queue": "quickstart-oneshot.default", "result": {"result": "HELLO WORLD"} } } ``` ## 7. Monitor Visit [http://localhost:3000/monitoring/](http://localhost:3000/monitoring/) to see the web dashboard. ## Example Task Code The quickstart uses example tasks from `runqy/examples/`: === "One-Shot Mode" ```python title="examples/quickstart-oneshot/main.py" from runqy_python import task, run_once @task def process(payload: dict) -> dict: operation = payload.get("operation", "echo") data = payload.get("data") if operation == "echo": return {"result": data} elif operation == "uppercase": return {"result": data.upper() if isinstance(data, str) else data} elif operation == "double": return {"result": data * 2 if isinstance(data, (int, float)) else data} else: return {"error": f"Unknown operation: {operation}"} if __name__ == "__main__": run_once() ``` === "Long-Running Mode" ```python title="examples/quickstart-longrunning/main.py" from runqy_python import task, load, run @load def setup(): # Initialize resources once at startup return {"processor": SimpleProcessor()} @task def process(payload: dict, ctx: dict) -> dict: processor = ctx["processor"] operation = payload.get("operation", "echo") data = payload.get("data") result = processor.process(operation, data) return {"result": result, "calls": processor.call_count} if __name__ == "__main__": run() ``` ## Next Steps - [Architecture Overview](architecture.md) — Understand how the components work together - [Python SDK](../python-sdk/index.md) — Write your own task handlers - [Configuration](../server/configuration.md) — Configure the server for production ## Architecture # Architecture runqy uses a server-driven bootstrap architecture where workers are 
stateless and receive all configuration from a central server. ## System Overview ``` ┌─────────────────────┐ ┌─────────────────┐ ┌───────────────┐ │ runqy-server │ │ Redis │ │ Clients │ │ │───→│ │←───│ (enqueue) │ │ POST /worker/ │ │ - Task queues │ │ │ │ register │ │ - Worker state │ │ │ └─────────────────────┘ │ - Results │ └───────────────┘ │ └─────────────────┘ │ config + ↑ │ deployment │ dequeue/heartbeat ↓ │ ┌──────────────────────────────────────────────────────────────────┐ │ runqy-worker (Go) │ │ │ │ Bootstrap: │ │ 1. POST /worker/register → receive Redis creds + git repo │ │ 2. git clone deployment code │ │ 3. Create virtualenv, pip install │ │ 4. Spawn Python process (startup_cmd) │ │ 5. Wait for {"status": "ready"} on stdout │ │ │ │ Run loop: │ │ 1. Dequeue task from Redis │ │ 2. Send JSON to Python via stdin │ │ 3. Read JSON response from stdout │ │ 4. Write result back to Redis │ └──────────────────────────────────────────────────────────────────┘ │ │ stdin/stdout JSON ↓ ┌──────────────────────────────────────────────────────────────────┐ │ Python Task (runqy-python SDK) │ │ │ │ @load → runs once at startup, returns ctx (e.g., ML model) │ │ @task → handles each task, receives payload + ctx │ │ run() → enters stdin/stdout loop │ └──────────────────────────────────────────────────────────────────┘ ``` ## Component Responsibilities ### runqy-server The server is the central control plane: - **Worker registration**: Authenticates workers and provides them with Redis credentials and deployment configuration - **Queue configuration**: Defines which queues exist and how tasks should be routed - **Deployment specs**: Specifies which git repository, branch, and startup command each queue uses ### runqy-worker The worker is the execution engine: - **Bootstrap**: Fetches configuration from the server, deploys code, sets up the Python environment - **Task processing**: Dequeues tasks from Redis, forwards them to the Python process - **Process supervision**: 
Monitors the Python process health, reports status via heartbeat - **Result handling**: Writes task results back to Redis ### runqy-python (Python SDK) The SDK provides a simple interface for writing task handlers: - **`@load` decorator**: Marks a function that runs once at startup (e.g., loading an ML model) - **`@task` decorator**: Marks a function that handles each task - **`run()` function**: Enters the stdin/stdout loop for long-running mode - **`run_once()` function**: Processes a single task for one-shot mode ## Queue Organization ### Sub-Queues and Priority runqy supports sub-queues to handle different priority levels for the same task type. This is particularly useful when different user tiers or applications need the same processing but with different service levels. **Example: Paid vs Free Users** ``` inference.premium (priority: 10) ─┐ ├─→ Same task code (same deployment) inference.standard (priority: 3) ─┘ ``` Both sub-queues run identical task code, but workers prioritize `inference.premium` tasks. Your API routes users based on their tier: - Paid user request → enqueue to `inference.premium` - Free user request → enqueue to `inference.standard` When workers have capacity, they process higher-priority sub-queues first, ensuring paid users experience lower latency. ### Queue Naming Queues use the format `{parent}.{sub_queue}`: - `inference.premium` — High priority - `inference.standard` — Standard priority - `simple.default` — Default (when no sub-queue specified) When you reference a queue without a sub-queue suffix (e.g., `inference`), runqy automatically appends `.default` (→ `inference.default`). 
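The priority behaviour described above can be sketched with a strict-priority dequeue (an illustrative model only — runqy's actual scheduler is built on asynq, which may weight sub-queues rather than strictly order them):

```python
def next_task(pending: dict[str, list], priorities: dict[str, int]):
    """Pop a task from the highest-priority sub-queue that has work.

    Illustrative strict-priority model: `inference.premium` (priority 10)
    drains completely before `inference.standard` (priority 3) is touched.
    """
    for queue in sorted(pending, key=lambda q: priorities.get(q, 0), reverse=True):
        if pending[queue]:
            return queue, pending[queue].pop(0)
    return None  # no work anywhere

pending = {"inference.premium": ["p1"], "inference.standard": ["s1", "s2"]}
priorities = {"inference.premium": 10, "inference.standard": 3}

assert next_task(pending, priorities) == ("inference.premium", "p1")
assert next_task(pending, priorities) == ("inference.standard", "s1")
```

This is why paid users see lower latency: as long as the premium sub-queue has pending tasks, standard tasks wait.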
## Communication Protocol ### Worker ↔ Python Process Communication uses JSON over stdin/stdout: **Ready signal** (Python → Worker, after `@load` completes): ```json {"status": "ready"} ``` **Task input** (Worker → Python): ```json {"task_id": "abc123", "payload": {"msg": "hello"}} ``` **Task response** (Python → Worker): ```json {"task_id": "abc123", "result": {...}, "error": null, "retry": false} ``` ### Redis Key Format runqy uses an asynq-compatible key format: | Key | Purpose | |-----|---------| | `asynq:{queue}:pending` | List of pending task IDs | | `asynq:{queue}:active` | List of currently processing task IDs | | `asynq:t:{task_id}` | Hash containing task data | | `asynq:result:{task_id}` | Task result string | | `asynq:workers:{worker_id}` | Worker heartbeat hash | ## Failure Handling ### Python Process Crash If the supervised Python process crashes: 1. Worker detects the crash via process monitor 2. All in-flight tasks fail immediately (no retry) 3. Worker updates heartbeat with `healthy: false` 4. Worker continues running but won't process new tasks 5. Manual restart is required (no auto-recovery) ### Task Failure If a task returns an error: 1. Worker writes the error to Redis 2. If `retry: true` in response, task may be retried (up to `max_retry`) 3. Otherwise, task is marked as failed # Server ## Overview # runqy Server The runqy server (`runqy/app`) is the central control plane that manages worker registration, queue configuration, and provides monitoring capabilities. 
## Overview The server provides: - **Worker registration endpoint**: `POST /worker/register` - **Queue configuration**: Defines available queues and their deployment specs - **Redis credential distribution**: Securely provides Redis connection info to workers - **REST API**: Full API for managing queues, workers, and tasks - **CLI**: Comprehensive command-line interface for local and remote management - **Web dashboard**: Monitoring interface for task queues and workers - **Swagger documentation**: Interactive API documentation ## CLI The runqy server includes a powerful CLI for managing your task queue system. The CLI can operate in two modes: - **Local mode**: Direct access to Redis and PostgreSQL for fast operations - **Remote mode**: Connect to any runqy server via HTTP API with authentication ```bash # Local operations runqy queue list runqy task enqueue -q inference.high -p '{"msg":"hello"}' runqy worker list # Remote operations (with saved credentials) runqy login -s https://production.example.com:3000 -k your-api-key runqy queue list # Now operates on remote server ``` See the [CLI Reference](cli.md) for complete documentation. ## Installation ```bash git clone https://github.com/Publikey/runqy.git cd runqy ``` ## Configuration The server uses environment variables for configuration. 
Create a `.env.secret` file: ```bash cp .env.secret.sample .env.secret ``` | Variable | Required | Default | Description | |----------|----------|---------|-------------| | `PORT` | No | 3000 | HTTP server port | | `REDIS_HOST` | Yes | localhost | Redis hostname | | `REDIS_PORT` | No | 6379 | Redis port | | `REDIS_PASSWORD` | Yes | - | Redis password | | `REDIS_TLS` | No | false | Enable TLS for Redis connection | | `DATABASE_HOST` | No | localhost | PostgreSQL hostname | | `DATABASE_PORT` | No | 5432 | PostgreSQL port | | `DATABASE_USER` | No | postgres | PostgreSQL username | | `DATABASE_PASSWORD` | Yes | - | PostgreSQL password | | `DATABASE_DBNAME` | No | sdxl_queuing_dev | PostgreSQL database name | | `DATABASE_SSL` | No | disable | PostgreSQL SSL mode | | `RUNQY_API_KEY` | Yes | - | API key for authenticated endpoints | ## Running ```bash cd app && go run . ``` Or build and run: ```bash cd app && go build -o runqy && ./runqy ``` ## Source Code The server is implemented in Go with a modular architecture: ``` runqy/app/ ├── main.go # Entry point ├── api/ # REST API handlers ├── models/ # Data models ├── monitoring/ # Web dashboard └── config/ # Configuration loading ``` Queue worker configurations are stored in `runqy/deployment/` as YAML files or in PostgreSQL. ## Related - [Configuration Reference](configuration.md) - [API Reference](api.md) ## Configuration # Server Configuration The server is configured via **environment variables** and **CLI flags**. CLI flags take priority over environment variables, which take priority over defaults. ## Configuration Methods ### 1. Environment Variables (`.env.secret` file) Create a `.env.secret` file in the parent directory of `app/`: ```bash # .env.secret RUNQY_API_KEY=your-secret-api-key REDIS_HOST=localhost REDIS_PORT=6379 REDIS_PASSWORD=your-redis-password DATABASE_HOST=localhost DATABASE_PORT=5432 DATABASE_USER=postgres DATABASE_PASSWORD=your-db-password DATABASE_DBNAME=sdxl_queuing_dev ``` ### 2. 
CLI Flags ```bash runqy serve --port 8080 --api-key my-key --redis-uri redis://:password@localhost:6379 runqy serve --db-host pg.example.com --db-name mydb --db-user admin runqy serve --sqlite --sqlite-path ./data/runqy.db ``` ### 3. Priority Order ``` CLI flags > Environment variables > Defaults ``` ## Environment Variables Reference ### Server | Variable | Default | CLI Flag | Description | |----------|---------|----------|-------------| | `PORT` | `3000` | `--port` | HTTP server port | | `RUNQY_API_KEY` | — | `--api-key` (global) | API key for authenticated endpoints | ### Redis | Variable | Default | CLI Flag | Description | |----------|---------|----------|-------------| | `REDIS_HOST` | `localhost` | — | Redis hostname | | `REDIS_PORT` | `6379` | — | Redis port | | `REDIS_PASSWORD` | — | — | Redis password | | `REDIS_TLS` | `false` | `--redis-tls` | Enable TLS for Redis connection | | — | — | `--redis-uri` (global) | Redis URI (overrides host/port/password/tls). Format: `redis[s]://[:password@]host[:port]` | ### PostgreSQL | Variable | Default | CLI Flag | Description | |----------|---------|----------|-------------| | `DATABASE_HOST` | `localhost` | `--db-host` | PostgreSQL hostname | | `DATABASE_PORT` | `5432` | `--db-port` | PostgreSQL port | | `DATABASE_USER` | `postgres` | `--db-user` | PostgreSQL username | | `DATABASE_PASSWORD` | — | `--db-password` | PostgreSQL password | | `DATABASE_DBNAME` | `sdxl_queuing_dev` | `--db-name` | PostgreSQL database name | | `DATABASE_SSL` | `disable` | `--db-ssl` | PostgreSQL SSL mode | ### SQLite (alternative to PostgreSQL) | Variable | Default | CLI Flag | Description | |----------|---------|----------|-------------| | `SQLITE_DB_PATH` | `runqy.db` | `--sqlite-path` | SQLite database file path | | — | — | `--sqlite` | Use SQLite instead of PostgreSQL | ### Monitoring & Security | Variable | Default | CLI Flag | Description | |----------|---------|----------|-------------| | `ASYNQ_READ_ONLY` | `false` | — | Restrict 
monitoring UI to read-only mode |
| `PROMETHEUS_ADDRESS` | — | — | Prometheus server URL for time-series charts |
| `RUNQY_JWT_SECRET` | (auto-generated) | — | JWT secret for dashboard authentication. If not set, a random secret is generated on each restart (sessions won't survive restarts) |
| `RUNQY_VAULT_MASTER_KEY` | — | — | Base64-encoded 32-byte key for vault encryption. If not set, the vaults feature is disabled |

### Additional Server Flags

| Flag | Description |
|------|-------------|
| `--config {dir}` | Path to queue workers config directory (overrides `QUEUE_WORKERS_DIR`) |
| `--watch` | Enable file/git watching for config auto-reload |
| `--config-repo {url}` | GitHub repo URL for configs |
| `--config-branch {branch}` | Git branch (default: main) |
| `--no-ui` | Disable the monitoring web dashboard |
| `--debug` | Enable verbose logging |

## Queue Worker Definitions (YAML)

!!! note "Optional"
    YAML-based queue definitions are optional. In most cases, queues are created and managed after startup using the CLI (`runqy config create`) or the monitoring dashboard GUI.

YAML files in the `deployment/` directory (or a custom directory specified with `--config`) allow you to pre-configure queues so that a fresh server starts with all queues already set up — useful for automated deployments or spinning up new environments.
```yaml
queues:
  inference:
    priority: 6
    mode: long_running  # or one_shot
    deployment:
      git_url: "https://github.com/example/repo.git"
      branch: "main"
      startup_cmd: "python main.py"
      startup_timeout_secs: 300
      requirements_file: "requirements.txt"  # Optional

  simple:
    priority: 3
    mode: one_shot
    deployment:
      git_url: "https://github.com/example/simple-tasks.git"
      branch: "main"
      startup_cmd: "python task.py"
      startup_timeout_secs: 60
```

### Queue Options

| Option | Type | Required | Description |
|--------|------|----------|-------------|
| `priority` | int | Yes | Queue priority (higher = more important) |
| `mode` | string | No | Execution mode: `long_running` (default) or `one_shot` |
| `deployment` | object | Yes | Deployment configuration |

### Deployment Options

| Option | Type | Required | Description |
|--------|------|----------|-------------|
| `git_url` | string | Yes | Git repository URL for task code |
| `branch` | string | Yes | Git branch to clone |
| `code_path` | string | No | Path of the task code within the repository (e.g., `simple-task`) |
| `startup_cmd` | string | Yes | Command to start the Python process |
| `startup_timeout_secs` | int | Yes | Timeout for process startup |
| `requirements_file` | string | No | Path to requirements.txt (default: `requirements.txt`) |
| `vaults` | list | No | List of vault names to inject as environment variables |
| `redis_storage` | bool | No | If `true`, task results are written to Redis. If `false`, results are not stored in Redis and must be managed by the task itself (e.g., via webhook). Default: `false` |
| `git_token` | string | No | Vault reference to your GitHub PAT, in the format `vault://{VAULT-NAME}/{KEY}` (e.g., `vault://github_pats/SIMPLE_TASK`) |

## Sub-Queues

Sub-queues allow you to assign different priorities to the same task type based on the caller's tier or urgency. This is useful when multiple clients submit identical tasks but require different service levels.
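The required options in the tables above can be checked mechanically before deploying. A quick sanity-check helper (illustrative only, not shipped with runqy — the key names mirror the option tables):

```python
# Required keys, taken from the Queue Options and Deployment Options tables.
REQUIRED_QUEUE_KEYS = {"priority", "deployment"}
REQUIRED_DEPLOYMENT_KEYS = {"git_url", "branch", "startup_cmd", "startup_timeout_secs"}

def validate_queue(name: str, spec: dict) -> list[str]:
    """Return a list of human-readable problems with a queue definition."""
    problems = [f"{name}: missing '{k}'" for k in REQUIRED_QUEUE_KEYS - spec.keys()]
    dep = spec.get("deployment", {})
    problems += [f"{name}.deployment: missing '{k}'"
                 for k in REQUIRED_DEPLOYMENT_KEYS - dep.keys()]
    if spec.get("mode") not in (None, "long_running", "one_shot"):
        problems.append(f"{name}: mode must be 'long_running' or 'one_shot'")
    return problems

# The 'inference' example from the YAML above passes validation:
spec = {"priority": 6, "mode": "long_running",
        "deployment": {"git_url": "https://github.com/example/repo.git",
                       "branch": "main", "startup_cmd": "python main.py",
                       "startup_timeout_secs": 300}}
assert validate_queue("inference", spec) == []
```

A definition missing `startup_cmd` or with a typo in `mode` would return a non-empty problem list instead of failing at deploy time.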
### Use Case: Paid vs Free Users A common pattern is routing tasks from paid users to a high-priority sub-queue while free users go to a lower-priority sub-queue: ```yaml queues: inference: sub_queues: - name: premium # Paid users — processed first priority: 10 - name: standard # Free users — processed when premium queue is empty priority: 3 deployment: git_url: "https://github.com/example/inference.git" branch: "main" startup_cmd: "python main.py" startup_timeout_secs: 300 ``` Your API backend then routes tasks based on user tier: ```python # In your API backend if user.is_premium: client.enqueue("inference.premium", payload) else: client.enqueue("inference.standard", payload) ``` Both sub-queues run the **same task code** (same deployment), but workers prioritize `inference.premium` tasks over `inference.standard` tasks. ### Naming Format Queues use the format `{parent}.{sub_queue}`: - `inference.premium` — High priority inference tasks - `inference.standard` — Standard priority inference tasks - `simple.default` — Default simple tasks Workers register for a parent queue and can process tasks from any of its sub-queues based on priority. ### Queue Creation Behavior When you define a queue in the configuration, the sub-queues created depend on whether you explicitly define them: | Configuration | Sub-queues created | |---------------|-------------------| | No `sub_queues` defined | `.default` is created automatically | | `sub_queues` explicitly defined | Only the listed sub-queues are created (NO `.default`) | **Example 1: No sub-queues → `.default` is created** ```yaml queues: myqueue: deployment: ... # Creates: myqueue.default ``` **Example 2: Explicit sub-queues → only those are created** ```yaml queues: myqueue: sub_queues: - name: high priority: 10 - name: low priority: 3 deployment: ... # Creates: myqueue.high, myqueue.low # Does NOT create: myqueue.default ``` !!! 
warning "Fallback to `.default` may fail" If you reference `myqueue` (without sub-queue suffix) and `.default` doesn't exist (because you defined explicit sub-queues), the operation will fail. Always use the full queue name when sub-queues are explicitly defined. ### Automatic Default Fallback When a queue name is referenced without a sub-queue suffix, runqy automatically appends `.default`: | You provide | Resolves to | |-------------|-------------| | `inference` | `inference.default` | | `simple` | `simple.default` | | `inference.high` | `inference.high` (unchanged) | This applies to: - **Workers:** When a worker registers for queue `inference`, it actually registers for `inference.default` - **Task enqueueing:** When you enqueue a task to `inference`, it goes to `inference.default` - **Task retrieval:** When you query queue `inference`, it looks up `inference.default` !!! warning "Queue must exist" If the resolved queue (e.g., `inference.default`) doesn't exist in the configuration, an error is returned. The fallback only adds the `.default` suffix—it doesn't create the queue automatically. ## Monitoring ### Environment Variables | Variable | Required | Description | |----------|----------|-------------| | `PROMETHEUS_ADDRESS` | No | Prometheus server URL (e.g., `http://localhost:9090`). Enables time-series charts in the dashboard. If not set, the dashboard uses Redis-based historical data. | The server always exposes Prometheus metrics at `/metrics` regardless of this setting. See the [Monitoring Guide](../guides/monitoring.md) for full setup instructions including dashboard authentication. ## Vaults Vaults provide secure storage for secrets that are injected into workers as environment variables. ### Environment Variables | Variable | Required | Description | |----------|----------|-------------| | `RUNQY_VAULT_MASTER_KEY` | No | Base64-encoded 32-byte key for encrypting vault entries. If not set, the vaults feature is disabled. 
| ### Generating a Master Key ```bash # Generate a secure 256-bit key openssl rand -base64 32 # Output: YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXoxMjM0NTY= # Set as environment variable export RUNQY_VAULT_MASTER_KEY="YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXoxMjM0NTY=" ``` ### Using Vaults in Queue Configuration ```yaml queues: inference: priority: 6 deployment: git_url: "https://github.com/example/repo.git" branch: "main" startup_cmd: "python main.py" startup_timeout_secs: 300 vaults: - api-keys - model-credentials ``` When a worker registers for this queue, all entries from the specified vaults are decrypted and sent to the worker. The worker then injects them as environment variables before starting the Python process. ### Accessing Secrets in Python ```python import os # Access vault entries as environment variables openai_key = os.environ.get("OPENAI_API_KEY") hf_token = os.environ.get("HF_TOKEN") ``` !!! warning "Security Note" Vault entries are transmitted to workers during bootstrap. Ensure workers are running in a trusted environment and the connection to the server is secured (HTTPS). ## CLI Reference # CLI Reference The runqy server includes a comprehensive command-line interface for managing queues, tasks, workers, and configurations. The CLI can operate locally (direct database access) or remotely (via HTTP API). ## Installation Build the CLI from source: ```bash cd runqy/app go build -o runqy . 
``` ## Command Overview | Command | Description | |---------|-------------| | `runqy` | Start the HTTP server (default, same as `runqy serve`) | | `runqy serve` | Start the HTTP server | | `runqy queue` | Queue management commands | | `runqy task` | Task management commands | | `runqy worker` | Worker management commands | | `runqy config` | Configuration management commands | | `runqy vault` | Vault management commands (secrets) | | `runqy login` | Save server credentials for remote mode | | `runqy logout` | Remove saved credentials | | `runqy auth` | Authentication management (status, list, switch) | ## Server Commands ### Start the Server ```bash # Start with defaults runqy serve # Or simply (serve is the default command) runqy # Start with custom config directory runqy serve --config ./my-deployment # Start with git-based config and auto-reload runqy serve --config-repo https://github.com/org/configs.git --watch # Start without monitoring dashboard (API only) runqy serve --no-ui ``` **Serve Flags:** | Flag | Description | Default | |------|-------------|---------| | `--config` | Path to queue workers config directory | `QUEUE_WORKERS_DIR` env | | `--watch` | Enable file/git watching for config auto-reload | `false` | | `--no-ui` | Disable the monitoring web dashboard | `false` | | `--config-repo` | GitHub repo URL for configs | - | | `--config-branch` | Git branch | `main` | | `--config-path` | Path within repo to YAML files | - | | `--clone-dir` | Directory to clone repo into | - | | `--watch-interval` | Git polling interval in seconds | - | ## Queue Commands ### List Queues ```bash runqy queue list ``` Output: ``` QUEUE PENDING ACTIVE SCHEDULED RETRY ARCHIVED COMPLETED PAUSED inference.high 5 2 0 1 0 150 no inference.low 12 0 3 0 0 89 no ``` ### Inspect Queue ```bash runqy queue inspect inference.high ``` Shows detailed queue information including status, pause state, memory usage, and task counts. 
### Pause/Unpause Queue ```bash # Pause a queue (stops processing new tasks) runqy queue pause inference.high # Resume a paused queue runqy queue unpause inference.high ``` ## Task Commands ### Enqueue a Task ```bash # Enqueue a task with JSON payload runqy task enqueue --queue inference.high --payload '{"prompt":"Hello world","width":1024}' # Short flags runqy task enqueue -q inference.high -p '{"msg":"test"}' # With custom timeout (seconds) runqy task enqueue -q inference.high -p '{"data":"value"}' --timeout 300 ``` **Enqueue Flags:** | Flag | Description | Default | |------|-------------|---------| | `-q, --queue` | Queue name (required) | - | | `-p, --payload` | JSON payload | `{}` | | `-t, --timeout` | Task timeout in seconds | `600` | ### List Tasks ```bash # List pending tasks in a queue runqy task list inference.high # List tasks by state runqy task list inference.high --state pending runqy task list inference.high --state active runqy task list inference.high --state scheduled runqy task list inference.high --state retry runqy task list inference.high --state archived runqy task list inference.high --state completed # Limit number of results runqy task list inference.high --state pending --limit 20 ``` ### Get Task Details ```bash runqy task get inference.high abc123-task-id ``` Output: ``` Task ID: abc123-task-id Type: task Queue: inference.high State: completed Max Retry: 3 Retried: 0 Timeout: 10m0s ... 
``` ### Cancel and Delete Tasks ```bash # Cancel a running task runqy task cancel abc123-task-id # Delete a task from a queue runqy task delete inference.high abc123-task-id ``` ## Worker Commands ### List Workers ```bash runqy worker list ``` Output: ``` WORKER_ID STATUS QUEUES CONCURRENCY LAST_BEAT STALE worker-abc123-def456 ready inference.high 1 5s no worker-xyz789-uvw012 ready inference.low 1 3s no ``` ### Get Worker Info ```bash runqy worker info worker-abc123-def456 ``` Output: ``` Worker ID: worker-abc123-def456 Status: ready Queues: inference.high Concurrency: 1 Started At: 2024-01-15 10:30:00 Last Beat: 2024-01-15 10:35:45 (5s ago) ``` ## Config Commands ### List Queue Configurations ```bash runqy config list ``` Output: ``` NAME PRIORITY PROVIDER MODE GIT_URL inference.high 10 worker long_running https://github.com/org/worker.git inference.low 5 worker long_running https://github.com/org/worker.git simple.default 1 worker one_shot https://github.com/org/simple.git ``` ### Reload Configurations ```bash # Reload configs from default directory (QUEUE_WORKERS_DIR) runqy config reload # Reload from a specific directory runqy config reload --dir ./my-deployment ``` ### Validate Configuration Files ```bash # Validate YAML files without loading into database runqy config validate # Validate from a specific directory runqy config validate --dir ./my-deployment ``` !!! note `config validate` is local-only and does not work in remote mode. 
### Create Queue Configuration ```bash # From YAML file runqy config create -f ./my-queue.yaml # Inline parameters runqy config create --name myqueue --priority 5 \ --git-url https://github.com/org/repo.git \ --startup-cmd "python main.py" # Update existing queue (use --force) runqy config create -f ./queue.yaml --force ``` **Create Flags:** | Flag | Description | Default | |------|-------------|---------| | `-f, --file` | YAML config file path | - | | `--name` | Queue name | - | | `--priority` | Queue priority | `1` | | `--git-url` | Git repository URL | - | | `--branch` | Git branch | `main` | | `--startup-cmd` | Startup command | - | | `--mode` | Mode: `long_running` or `one_shot` | `long_running` | | `--code-path` | Path within repo to the code | - | | `--force` | Update existing queue if it already exists | `false` | ### Remove Queue Configuration ```bash runqy config remove myqueue # Or with flag runqy config remove --name myqueue ``` ## Vault Commands Vaults store encrypted secrets that are injected into workers as environment variables. !!! note "Prerequisite" The vaults feature requires the `RUNQY_VAULT_MASTER_KEY` environment variable to be set on the server. ### List Vaults ```bash runqy vault list ``` Output: ``` NAME DESCRIPTION ENTRIES api-keys API keys for external services 3 credentials Database credentials 2 ``` ### Create a Vault ```bash # Create with description runqy vault create api-keys -d "API keys for external services" # Create without description runqy vault create my-secrets ``` ### Show Vault Details ```bash runqy vault show api-keys ``` Output: ``` Vault: api-keys Description: API keys for external services Created: 2024-01-15T10:30:00Z Updated: 2024-01-15T12:45:00Z Entries (3): KEY VALUE SECRET OPENAI_API_KEY sk****yz yes HF_TOKEN hf****ab yes DEBUG_MODE true no ``` Secret values are masked in the output for security. 
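The masked form shown above (`sk****yz`) keeps only the first and last two characters of a secret. As an illustrative sketch (the exact masking rules runqy applies may differ):

```python
def mask_secret(value, keep=2):
    """Mask a secret, keeping `keep` characters at each end (illustrative only)."""
    if len(value) <= keep * 2:
        return "*" * len(value)  # too short to safely reveal anything
    return value[:keep] + "****" + value[-keep:]

print(mask_secret("sk-abc123xyz456-yz"))  # → sk****yz
```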
### Delete a Vault ```bash # With confirmation prompt runqy vault delete api-keys # Skip confirmation runqy vault delete api-keys --force ``` ### Set a Vault Entry ```bash # Set a secret entry (default) runqy vault set api-keys OPENAI_API_KEY sk-your-key-here # Set a non-secret entry (visible in API responses) runqy vault set api-keys DEBUG_MODE true --no-secret ``` **Set Flags:** | Flag | Description | Default | |------|-------------|---------| | `--no-secret` | Store as non-secret (visible in API) | `false` | ### Get a Vault Entry ```bash runqy vault get api-keys OPENAI_API_KEY ``` Output: ``` sk-your-actual-key-here ``` !!! warning "Local Only" The `vault get` command only works in local mode for security reasons. It returns the decrypted value. ### Remove a Vault Entry ```bash runqy vault unset api-keys OPENAI_API_KEY ``` ### List Vault Entries ```bash runqy vault entries api-keys ``` Output: ``` KEY VALUE SECRET UPDATED OPENAI_API_KEY sk****yz yes 2024-01-15T12:45:00Z HF_TOKEN hf****ab yes 2024-01-15T10:30:00Z DEBUG_MODE true no 2024-01-15T11:00:00Z ``` ## Remote Mode The CLI can operate in two modes: 1. **Local mode** (default): Connects directly to Redis/PostgreSQL 2. **Remote mode**: Connects to a runqy server via HTTP API Use remote mode to manage a runqy server from a different machine. 
### Usage ```bash # Remote mode - specify server URL and API key runqy --server https://runqy.example.com:3000 --api-key YOUR_API_KEY queue list # Short flags runqy -s https://runqy.example.com:3000 -k YOUR_API_KEY queue list # API key can also be set via environment variable export RUNQY_API_KEY=YOUR_API_KEY runqy -s https://runqy.example.com:3000 queue list ``` ### Remote Mode Examples ```bash # List queues on remote server runqy -s https://server:3000 -k API_KEY queue list # Enqueue a task on remote server runqy -s https://server:3000 -k API_KEY task enqueue -q inference.high -p '{"msg":"hello"}' # List workers on remote server runqy -s https://server:3000 -k API_KEY worker list # Trigger config reload on remote server runqy -s https://server:3000 -k API_KEY config reload ``` ### Remote Mode Support | Command | Remote Support | Notes | |---------|---------------|-------| | `queue list/inspect/pause/unpause` | Yes | Full support | | `task enqueue/list/get/cancel/delete` | Yes | Full support | | `worker list/info` | Yes | Full support | | `config list/reload/create/remove` | Yes | Full support | | `config validate` | No | Local-only (validates local YAML files) | | `vault list/create/show/delete` | Yes | Full support | | `vault set/unset/entries` | Yes | Full support | | `vault get` | No | Local-only (returns decrypted secrets) | | `serve` | No | Server command, not applicable | ## Authentication Persistence Save server credentials so you don't need to specify `--server` and `--api-key` for every command. ### Login and Save Credentials ```bash # Save credentials for a server (saved as "default" profile) runqy login -s https://production.example.com:3000 -k prod-api-key # Save with a custom profile name runqy login -s https://staging.example.com:3000 -k staging-key --name staging # API key can be prompted interactively runqy login -s https://server:3000 # API Key: ``` Credentials are stored in `~/.runqy/credentials.json` with restricted permissions (0600). 
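What `runqy login` enables can be pictured as a small resolution function: explicit flags beat environment variables, which beat the saved profile. The helper below is illustrative, not runqy source, and the exact layout of `credentials.json` is an assumption:

```python
# Illustrative sketch of CLI credential resolution (hypothetical helper names).
import json
import os
from pathlib import Path

CRED_FILE = Path.home() / ".runqy" / "credentials.json"

def resolve_credentials(flag_server=None, flag_key=None, env=os.environ):
    server = flag_server or env.get("RUNQY_SERVER")
    key = flag_key or env.get("RUNQY_API_KEY")
    if not server and CRED_FILE.exists():
        saved = json.loads(CRED_FILE.read_text())
        # File layout is assumed here: {"current": "...", "<profile>": {...}}.
        profile = saved.get(saved.get("current", "default"), {})
        server = profile.get("url")
        key = key or profile.get("api_key")
    if server:
        return {"mode": "remote", "server": server, "api_key": key}
    return {"mode": "local"}  # direct Redis/PostgreSQL access

# With flags present, everything else is ignored:
print(resolve_credentials(flag_server="https://runqy.example.com:3000", flag_key="KEY"))
```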
### Using Saved Credentials After logging in, commands work without flags: ```bash # Before (verbose) runqy --server https://server:3000 --api-key KEY queue list # After login (simple) runqy queue list runqy task enqueue -q myqueue -p '{"msg":"hello"}' runqy worker list ``` ### Manage Multiple Servers ```bash # List all saved servers runqy auth list ``` Output: ``` NAME URL CURRENT default https://production.example.com:3000 * staging https://staging.example.com:3000 ``` ```bash # Show current connection runqy auth status ``` Output: ``` Current server: default URL: https://production.example.com:3000 API Key: prod...key ``` ```bash # Switch to different server runqy auth switch staging # Switched to "staging" ``` ### Logout ```bash # Remove current profile runqy logout # Remove specific profile runqy logout --name staging # Remove all saved credentials runqy logout --all ``` ### Credential Priority Credentials are resolved in this order (highest to lowest): 1. Command-line flags (`--server`, `--api-key`) 2. Environment variables (`RUNQY_SERVER`, `RUNQY_API_KEY`) 3. Saved credentials (`~/.runqy/credentials.json`) 4. 
Local mode (direct Redis/PostgreSQL access) ## Global Flags These flags are available for all commands: | Flag | Description | |------|-------------| | `-s, --server` | Remote server URL for CLI-over-HTTP mode | | `-k, --api-key` | API key for authentication (or set `RUNQY_API_KEY` env var) | | `--redis-uri` | Redis URI (overrides `REDIS_HOST`/`REDIS_PORT`) | | `-v, --version` | Print version information | | `-h, --help` | Help for the command | ## Shell Completion Generate shell completion scripts for your shell: === "Bash" ```bash runqy completion bash > /etc/bash_completion.d/runqy ``` === "Zsh" ```bash runqy completion zsh > "${fpath[1]}/_runqy" ``` === "Fish" ```bash runqy completion fish > ~/.config/fish/completions/runqy.fish ``` === "PowerShell" ```powershell runqy completion powershell > runqy.ps1 ``` ## API Reference # Server API Reference ## Authentication Most endpoints require Bearer token authentication: ``` Authorization: Bearer {api_key} ``` The API key is configured via the `RUNQY_API_KEY` environment variable on the server. ## Worker Endpoints ### `POST /worker/register` Registers a worker and returns configuration including Redis credentials and deployment specs. 
**Request Headers:** | Header | Description | |--------|-------------| | `Authorization` | `Bearer {api_key}` | | `Content-Type` | `application/json` | **Request Body:** ```json { "worker_id": "worker-abc123", "queue": "inference" } ``` | Field | Type | Description | |-------|------|-------------| | `worker_id` | string | Unique identifier for this worker | | `queue` | string | Queue name to process tasks from | **Response:** ```json { "redis": { "addr": "localhost:6379", "password": "", "db": 0 }, "queue": { "name": "inference", "priority": 6, "mode": "long_running" }, "deployment": { "git_url": "https://github.com/example/repo.git", "branch": "main", "startup_cmd": "python main.py", "startup_timeout_secs": 300, "requirements_file": "requirements.txt", "vaults": ["api-keys"] }, "vaults": { "OPENAI_API_KEY": "sk-actual-decrypted-key", "HF_TOKEN": "hf-actual-decrypted-token" } } ``` The `vaults` field in the response contains decrypted key-value pairs from all vaults referenced in the deployment configuration. These are injected as environment variables by the worker. **Error Responses:** | Status | Description | |--------|-------------| | 401 | Invalid or missing API key | | 400 | Invalid request body | | 404 | Queue not found | | 500 | Internal server error | --- ## Queue Endpoints ### `POST /queue/add` Enqueue a new task. **Request Body:** ```json { "queue": "inference.high", "timeout": 300, "data": { "prompt": "Hello world", "width": 1024 } } ``` **Response:** ```json { "info": { "id": "task-uuid", "queue": "inference.high", "state": "pending" } } ``` ### `POST /queue/add-batch` Enqueue multiple tasks in a single request. Uses Redis pipelining for high-throughput job submission. 
**Request Body:** ```json { "queue": "inference.high", "timeout": 300, "jobs": [ {"data": {"prompt": "Hello"}}, {"data": {"prompt": "World"}}, {"data": {"prompt": "Foo"}, "timeout": 600} ] } ``` | Field | Type | Required | Description | |-------|------|----------|-------------| | `queue` | string | Yes | Target queue name | | `timeout` | int | No | Default timeout for all jobs (seconds) | | `jobs` | array | Yes | Array of job objects | | `jobs[].data` | object | Yes | Task payload | | `jobs[].timeout` | int | No | Override timeout for this job | **Response:** ```json { "enqueued": 3, "failed": 0, "task_ids": ["uuid-1", "uuid-2", "uuid-3"], "errors": [] } ``` | Field | Type | Description | |-------|------|-------------| | `enqueued` | int | Number of successfully enqueued tasks | | `failed` | int | Number of failed tasks | | `task_ids` | array | List of task IDs (in order) | | `errors` | array | Error messages for failed tasks | **Performance:** | Method | Throughput | |--------|------------| | Single `POST /queue/add` calls | ~800-1,000 jobs/s | | Batch endpoint (100 jobs/request) | ~35,000-50,000 jobs/s | !!! tip "Optimal batch size" Batch sizes of 50-200 jobs per request offer the best balance of throughput and latency. --- ### `GET /queue/{task_id}` Get task status and result. The queue is automatically determined from the task's metadata stored in Redis. **Response:** ```json { "info": { "id": "task-uuid", "queue": "inference.high", "state": "completed", "result": "..." } } ``` **Error Responses:** | Status | Description | |--------|-------------| | 404 | Task not found | | 400 | Invalid request | | 500 | Internal server error | --- ## CLI API Endpoints These endpoints are used by the CLI for remote management. All require Bearer token authentication. 
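Putting the public queue endpoints together, a short client can submit work in batches. The sketch below chunks jobs into groups of 100 (within the 50-200 range recommended above) and posts each group to `POST /queue/add-batch`. The request and response shapes follow the tables above, but the helper functions themselves are illustrative, not part of runqy:

```python
# Illustrative batch-enqueue client for POST /queue/add-batch.
import json
import urllib.request

def batch_bodies(queue, payloads, batch_size=100, timeout=300):
    """Split payloads into add-batch request bodies of at most batch_size jobs."""
    for i in range(0, len(payloads), batch_size):
        yield {
            "queue": queue,
            "timeout": timeout,
            "jobs": [{"data": p} for p in payloads[i : i + batch_size]],
        }

def enqueue_batches(server, api_key, queue, payloads):
    """POST each batch and collect task IDs from the responses."""
    task_ids = []
    for body in batch_bodies(queue, payloads):
        req = urllib.request.Request(
            f"{server}/queue/add-batch",
            data=json.dumps(body).encode(),
            headers={
                "Authorization": f"Bearer {api_key}",
                "Content-Type": "application/json",
            },
        )
        with urllib.request.urlopen(req) as resp:
            task_ids.extend(json.load(resp)["task_ids"])
    return task_ids

# Example (requires a running server):
# ids = enqueue_batches("http://localhost:3000", "your-api-key",
#                       "inference.high", [{"prompt": "hello"}] * 250)  # 3 requests
```

Individual task status can then be polled with `GET /queue/{task_id}` using the returned IDs.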
### Queue Management | Method | Endpoint | Description | |--------|----------|-------------| | `GET` | `/api/cli/queues` | List all queues with statistics | | `GET` | `/api/cli/queues/{queue}` | Get detailed queue information | | `POST` | `/api/cli/queues/{queue}/pause` | Pause a queue | | `POST` | `/api/cli/queues/{queue}/unpause` | Unpause a queue | ### Task Management | Method | Endpoint | Description | |--------|----------|-------------| | `POST` | `/api/cli/tasks` | Enqueue a new task | | `GET` | `/api/cli/queues/{queue}/tasks` | List tasks in a queue | | `GET` | `/api/cli/queues/{queue}/tasks/{task_id}` | Get task details | | `DELETE` | `/api/cli/tasks/{task_id}` | Cancel a task | | `DELETE` | `/api/cli/queues/{queue}/tasks/{task_id}` | Delete a task | ### Worker Management | Method | Endpoint | Description | |--------|----------|-------------| | `GET` | `/api/cli/workers` | List all workers | | `GET` | `/api/cli/workers/{worker_id}` | Get worker details | ### Config Management | Method | Endpoint | Description | |--------|----------|-------------| | `GET` | `/api/cli/configs` | List all queue configurations | | `POST` | `/api/cli/configs/reload` | Reload configurations from YAML | | `POST` | `/api/cli/configs` | Create a new queue configuration | | `DELETE` | `/api/cli/configs/{name}` | Delete a queue configuration | --- ## Vault Endpoints Vault endpoints manage encrypted secrets. All require Bearer token authentication. !!! note "Prerequisite" The vaults feature must be enabled by setting the `RUNQY_VAULT_MASTER_KEY` environment variable. If not set, vault endpoints return `503 Service Unavailable`. ### `GET /api/vaults` List all vaults with entry counts. **Response:** ```json { "vaults": [ { "name": "api-keys", "description": "API keys for external services", "entry_count": 3 } ], "count": 1 } ``` ### `POST /api/vaults` Create a new vault. 
**Request Body:** ```json { "name": "api-keys", "description": "API keys for external services" } ``` **Response:** ```json { "message": "vault created successfully", "vault": { "name": "api-keys", "description": "API keys for external services" } } ``` ### `GET /api/vaults/{name}` Get vault details with entries (secret values masked). **Response:** ```json { "name": "api-keys", "description": "API keys for external services", "entries": [ { "key": "OPENAI_API_KEY", "value": "sk****yz", "is_secret": true, "created_at": "2024-01-15T10:30:00Z", "updated_at": "2024-01-15T12:45:00Z" } ], "created_at": "2024-01-15T10:30:00Z", "updated_at": "2024-01-15T12:45:00Z" } ``` ### `DELETE /api/vaults/{name}` Delete a vault and all its entries. **Response:** ```json { "message": "vault deleted successfully" } ``` ### `POST /api/vaults/{name}/entries` Set or update a vault entry. **Request Body:** ```json { "key": "OPENAI_API_KEY", "value": "sk-your-key-here", "is_secret": true } ``` | Field | Type | Required | Description | |-------|------|----------|-------------| | `key` | string | Yes | Entry key (environment variable name) | | `value` | string | Yes | Entry value | | `is_secret` | boolean | No | Mask value in API responses (default: `true`) | **Response:** ```json { "message": "entry set successfully" } ``` ### `GET /api/vaults/{name}/entries` List all entries in a vault (secret values masked). **Response:** ```json { "entries": [ { "key": "OPENAI_API_KEY", "value": "sk****yz", "is_secret": true, "created_at": "2024-01-15T10:30:00Z", "updated_at": "2024-01-15T12:45:00Z" } ], "count": 1 } ``` ### `DELETE /api/vaults/{name}/entries/{key}` Delete a vault entry. 
**Response:** ```json { "message": "entry deleted successfully" } ``` --- ## Swagger Documentation Interactive API documentation is available at: ``` http://localhost:3000/swagger/index.html ``` # Worker ## Overview # runqy Worker The runqy worker (`runqy-worker`) is a Go binary that processes tasks from Redis and supervises Python processes. ## Prerequisites The worker requires a Python runtime to execute task code: | Requirement | Description | |-------------|-------------| | **Python 3.8+** | With the `venv` module installed | | **Git** | For cloning task code repositories | ### Python venv module The worker creates virtual environments for task isolation. On some Linux distributions, the `venv` module is a separate package: === "Debian/Ubuntu" ```bash sudo apt install python3-venv ``` === "Fedora/RHEL" ```bash sudo dnf install python3-venv ``` === "macOS/Windows" The `venv` module is included by default with Python installations from python.org. ### Alternative: Use uv (recommended) [uv](https://docs.astral.sh/uv/) is a fast Python package installer that can also create virtual environments. If installed, the worker will automatically use it instead of `python -m venv`: ```bash # Install uv (cross-platform) curl -LsSf https://astral.sh/uv/install.sh | sh # Or with pip pip install uv ``` **Benefits of uv:** - Much faster venv creation - Doesn't require the `venv` module - Faster dependency installation when used with `requirements.txt` ## Overview The worker: - **Bootstraps** by registering with the server and deploying task code - **Supervises** a Python process for task execution - **Processes** tasks from Redis queues - **Reports** health status via heartbeat ## Installation ### Binary Download Download pre-built binaries from the [GitHub releases page](https://github.com/publikey/runqy-worker/releases). 
=== "Linux (amd64)" ```bash curl -LO https://github.com/publikey/runqy-worker/releases/latest/download/runqy-worker_latest_linux_amd64.tar.gz tar -xzf runqy-worker_latest_linux_amd64.tar.gz ./runqy-worker -config config.yml ``` === "macOS (Apple Silicon)" ```bash curl -LO https://github.com/publikey/runqy-worker/releases/latest/download/runqy-worker_latest_darwin_arm64.tar.gz tar -xzf runqy-worker_latest_darwin_arm64.tar.gz ./runqy-worker -config config.yml ``` === "Windows (amd64)" ```powershell Invoke-WebRequest -Uri https://github.com/publikey/runqy-worker/releases/latest/download/runqy-worker_latest_windows_amd64.zip -OutFile runqy-worker.zip Expand-Archive runqy-worker.zip -DestinationPath . .\runqy-worker.exe -config config.yml ``` ### Docker Docker images are available at `ghcr.io/publikey/runqy-worker`. === "Minimal (default)" Lightweight Alpine-based image. You install your own runtime (Python, Node, etc.). - **Image:** `ghcr.io/publikey/runqy-worker:latest` or `:minimal` - **Base:** Alpine 3.19 with git, curl, ca-certificates - **Platforms:** linux/amd64, linux/arm64 ```bash docker run -v $(pwd)/config.yml:/app/config.yml ghcr.io/publikey/runqy-worker:latest ``` === "Inference" Pre-configured for ML workloads with PyTorch and CUDA. 
    - **Image:** `ghcr.io/publikey/runqy-worker:inference`
    - **Base:** PyTorch 2.1.0 + CUDA 11.8
    - **Platform:** linux/amd64 only
    - **Includes:** Python 3, pip, PyTorch, CUDA runtime

    ```bash
    docker run --gpus all -v $(pwd)/config.yml:/app/config.yml ghcr.io/publikey/runqy-worker:inference
    ```

=== "Docker Compose"

    ```yaml
    version: "3.8"
    services:
      worker:
        image: ghcr.io/publikey/runqy-worker:latest
        volumes:
          - ./config.yml:/app/config.yml
          - ./deployment:/app/deployment
        restart: unless-stopped
    ```

#### Available Tags

| Tag | Description |
|-----|-------------|
| `latest`, `minimal` | Minimal Alpine image (multi-arch) |
| `inference` | PyTorch + CUDA image (amd64 only) |
| `{version}` | Specific version, minimal base |
| `{version}-minimal` | Specific version, minimal base |
| `{version}-inference` | Specific version, inference base |

### From Source

```bash
git clone https://github.com/Publikey/runqy-worker.git
cd runqy-worker
go build -o runqy-worker ./cmd/worker
```

## Running

```bash
./runqy-worker -config config.yml
```

## Lifecycle

1. **Register**: Worker calls `POST /worker/register` on the server
2. **Deploy**: Clone git repository and set up Python environment
3. **Start**: Spawn Python process using `startup_cmd`
4. **Wait**: Wait for `{"status": "ready"}` from Python
5. **Process**: Dequeue tasks, send to Python, write results to Redis
6. **Heartbeat**: Periodically update worker status in Redis

## Source Code

The worker is implemented in Go (~2500 lines of code):

```
runqy-worker/
├── cmd/worker/     # Binary entry point
├── internal/
│   ├── bootstrap/  # server_client, code_deploy, process_supervisor
│   ├── handler/    # stdio_handler, oneshot_handler
│   └── redis/      # Redis operations
├── worker.go       # Main Worker struct
├── processor.go    # Task dequeue loop
└── *.go            # handler, task, heartbeat, retry, config
```

## Related

- [Configuration Reference](configuration.md)
- [Deployment Guide](deployment.md)

## Configuration

# Worker Configuration

The worker is configured via a YAML file.
## Configuration File ```yaml server: url: "http://localhost:8081" api_key: "your-secret-api-key" worker: queue: "inference" concurrency: 1 shutdown_timeout: 30s deployment: dir: "./deployment" use_system_site_packages: true # Set to false for isolated virtualenv recovery: enabled: true # Auto-restart crashed Python processes (default: true) max_restarts: 5 # Circuit breaker threshold (default: 5) cooldown_period: 10m # Stable run time to reset counter (default: 10m) ``` ## Configuration Options ### `server` | Option | Type | Required | Description | |--------|------|----------|-------------| | `url` | string | Yes | runqy server URL | | `api_key` | string | Yes | API key for authentication | ### `worker` | Option | Type | Required | Description | |--------|------|----------|-------------| | `queue` | string | Yes | Queue name to process tasks from | | `concurrency` | int | No | Number of concurrent task processors (default: 1) | | `shutdown_timeout` | duration | No | Graceful shutdown timeout (default: 30s) | ### `deployment` | Option | Type | Required | Description | |--------|------|----------|-------------| | `dir` | string | Yes | Directory for cloning task code | | `use_system_site_packages` | bool | No | Inherit packages from base Python environment (default: `true`). Set to `false` for isolated virtualenv | ### `recovery` Controls auto-recovery when a supervised Python process crashes. Enabled by default. 
| Option | Type | Required | Description | |--------|------|----------|-------------| | `enabled` | bool | No | Enable auto-recovery (default: `true`) | | `max_restarts` | int | No | Max consecutive restarts before entering degraded state (default: `5`) | | `initial_delay` | duration | No | Delay before the first restart attempt (default: `1s`) | | `max_delay` | duration | No | Maximum delay between restart attempts (default: `5m`) | | `backoff_factor` | float | No | Multiplier for exponential backoff between restarts (default: `2.0`) | | `cooldown_period` | duration | No | Time without crash to reset the failure counter (default: `10m`) | ```yaml recovery: enabled: true max_restarts: 5 initial_delay: "1s" max_delay: "5m" backoff_factor: 2.0 cooldown_period: "10m" ``` !!! info "How auto-recovery works" When a Python process crashes, the worker automatically restarts it with exponential backoff. If the process keeps crashing (reaching `max_restarts` without a stable run), the worker enters **degraded state** and stops retrying — manual restart is required. If the process runs successfully for `cooldown_period`, the failure counter resets to zero. 
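The interplay of these options reduces to a simple schedule: the delay before restart attempt `n` is `min(initial_delay * backoff_factor**n, max_delay)`, for at most `max_restarts` attempts. A quick sketch with the defaults (whether the exponent starts at 0 or 1 is an implementation detail; 0 is assumed here):

```python
# Restart delays implied by the recovery defaults (illustrative, not runqy source):
# delay(n) = min(initial_delay * backoff_factor**n, max_delay), up to max_restarts.
def restart_delays(initial=1.0, factor=2.0, max_delay=300.0, max_restarts=5):
    """Seconds to wait before each restart attempt."""
    return [min(initial * factor**n, max_delay) for n in range(max_restarts)]

print(restart_delays())                     # → [1.0, 2.0, 4.0, 8.0, 16.0]
print(restart_delays(max_restarts=12)[-1])  # → 300.0 (later attempts hit max_delay)
```

With the defaults, all five attempts happen within about half a minute; raising `max_restarts` lets the delay grow until it is capped by `max_delay`.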
## Environment Variables All configuration values can be set via environment variables, which take priority over `config.yml`: | Variable | Description | Default | |----------|-------------|---------| | `RUNQY_SERVER_URL` | Server URL | - | | `RUNQY_API_KEY` | API key for authentication | - | | `RUNQY_QUEUES` | Queues to listen on (comma-separated) | - | | `RUNQY_CONCURRENCY` | Number of concurrent tasks | `1` | | `RUNQY_SHUTDOWN_TIMEOUT` | Graceful shutdown timeout | `30s` | | `RUNQY_BOOTSTRAP_RETRIES` | Number of bootstrap retry attempts | `3` | | `RUNQY_BOOTSTRAP_RETRY_DELAY` | Delay between bootstrap retries | `5s` | | `RUNQY_GIT_SSH_KEY` | Path to SSH private key for git clone | - | | `RUNQY_GIT_TOKEN` | Git PAT token for HTTPS clone | - | | `RUNQY_DEPLOYMENT_DIR` | Local deployment directory | `./deployment` | | `RUNQY_USE_SYSTEM_SITE_PACKAGES` | Inherit packages from base Python (`true`/`false`) | `true` | | `RUNQY_MAX_RETRY` | Max task retries | `3` | | `RUNQY_RECOVERY_ENABLED` | Enable process auto-recovery (`true`/`false`) | `true` | | `RUNQY_RECOVERY_MAX_RESTARTS` | Max consecutive restarts before degraded state | `5` | | `RUNQY_RECOVERY_INITIAL_DELAY` | Initial delay before restart attempt | `1s` | | `RUNQY_RECOVERY_MAX_DELAY` | Maximum backoff delay between restarts | `5m` | | `RUNQY_RECOVERY_COOLDOWN` | Stable run time to reset failure counter | `10m` | ### Examples === "PowerShell" ```powershell $env:RUNQY_SERVER_URL = "http://localhost:3000" $env:RUNQY_API_KEY = "your-api-key" $env:RUNQY_QUEUES = "inference" runqy-worker ``` === "Bash" ```bash export RUNQY_SERVER_URL="http://localhost:3000" export RUNQY_API_KEY="your-api-key" export RUNQY_QUEUES="inference" runqy-worker ``` === "Docker" ```bash docker run \ -e RUNQY_SERVER_URL=http://host.docker.internal:3000 \ -e RUNQY_API_KEY=your-api-key \ -e RUNQY_QUEUES=inference \ runqy-worker ``` !!! 
tip "No config.yml needed" When using environment variables, you can run the worker without any `config.yml` file. ## Vault Injection If the queue's deployment configuration references vaults, the worker automatically: 1. Receives decrypted vault entries from the server during bootstrap 2. Injects them as environment variables before starting the Python process 3. Makes them available to Python code via `os.environ` ```python import os # Access secrets from vaults api_key = os.environ.get("OPENAI_API_KEY") db_password = os.environ.get("DB_PASSWORD") ``` Vaults are configured on the server side in the queue's deployment YAML. See the [Vaults Guide](../guides/vaults.md) for details. ## Key Constraint **One worker = one parent queue = one supervised Python process** Each worker instance: - Processes tasks from a single parent queue (including all of its sub-queues) - Runs a single Python process - Has a concurrency of 1 for the Python process (though the worker can manage queue operations concurrently) To scale, run multiple worker instances. ## Degraded State If the supervised Python process crashes repeatedly and exceeds `max_restarts` without a stable run: 1. Worker enters **degraded state** — no more restart attempts 2. Heartbeat reports `healthy: false` with recovery state `degraded` 3. Tasks are returned to the queue for retry (but will keep failing on this worker) 4. **Manual restart of the worker is required** to recover To disable auto-recovery entirely and revert to the old behavior (immediate degraded state on first crash), set `recovery.enabled: false`. ## Deployment # Worker Deployment ## Docker ```dockerfile FROM golang:1.21 AS builder WORKDIR /app COPY . . RUN go build -o worker ./cmd/worker FROM python:3.11-slim # Install git for code deployment RUN apt-get update && apt-get install -y git && rm -rf /var/lib/apt/lists/* WORKDIR /app COPY --from=builder /app/worker . COPY config.yml . CMD ["./worker", "-config", "config.yml"] ``` ## Docker Compose ```yaml version: '3.8' services: worker: build: . 
environment: - RUNQY_SERVER_URL=http://server:8081 - RUNQY_API_KEY=your-api-key - RUNQY_QUEUES=inference volumes: - ./deployment:/app/deployment depends_on: - redis restart: unless-stopped redis: image: redis:7-alpine ports: - "6379:6379" ``` ## Kubernetes ```yaml apiVersion: apps/v1 kind: Deployment metadata: name: runqy-worker spec: replicas: 3 selector: matchLabels: app: runqy-worker template: metadata: labels: app: runqy-worker spec: containers: - name: worker image: your-registry/runqy-worker:latest env: - name: RUNQY_SERVER_URL value: "http://runqy-server:8081" - name: RUNQY_API_KEY valueFrom: secretKeyRef: name: runqy-secrets key: api-key - name: RUNQY_QUEUES value: "inference" resources: requests: memory: "256Mi" cpu: "100m" limits: memory: "1Gi" cpu: "1000m" ``` ## Queue Configuration Workers can listen on queues in two ways: ### Listen on all sub-queues ```yaml worker: queue: "inference" # Listens on ALL sub-queues (.high, .low, .default, etc.) ``` ### Listen on specific sub-queues only ```yaml worker: queues: - inference.high - inference.low ``` This listens **only** on `inference.high` and `inference.low`, ignoring other sub-queues like `inference.medium`. !!! note "Shared Runtime" Sub-queues of the same parent always share one code deployment and one runtime process. Configuring `[inference.high, inference.low]` deploys code once and starts one Python process. ## Scaling To handle more tasks, run multiple worker instances: - Each worker processes one parent queue with one Python process - Sub-queues of the same parent share a single runtime - Workers are stateless and can be scaled horizontally - Use container orchestration (Kubernetes, Docker Swarm) for auto-scaling ## Health Checks Workers report health via Redis heartbeat at `asynq:workers:{worker_id}`: ```bash redis-cli HGETALL asynq:workers:worker-abc123 ``` The `healthy` field indicates if the Python process is running correctly. 
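The heartbeat check can also be scripted. A minimal sketch — the `asynq:workers:*` key pattern and the `healthy` field come from this page, while the helper names and the string encoding of `healthy` are assumptions:

```python
def worker_health(heartbeat: dict) -> bool:
    """True if a heartbeat hash reports a healthy Python process.

    Assumes the `healthy` field is stored as the string "true"/"false".
    """
    return heartbeat.get("healthy") == "true"

def check_workers(r) -> dict:
    """Map each heartbeat key to its health flag.

    `r` is any redis-py-style client (provides hgetall / scan_iter).
    """
    return {key: worker_health(r.hgetall(key))
            for key in r.scan_iter(match="asynq:workers:*")}

# Usage (assumes redis-py and a reachable Redis):
# import redis
# r = redis.Redis(decode_responses=True)
# for key, ok in check_workers(r).items():
#     print(key, "OK" if ok else "UNHEALTHY")
```

A check like this can back a container liveness probe or a cron-driven alert.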
# Python SDK ## Overview # runqy-python SDK The `runqy-python` SDK provides simple decorators for writing task handlers that run on runqy workers. It also includes a client for enqueuing tasks. ## Overview **Task Handlers** — The core of the SDK: - **`@load` decorator**: Define a function that runs once at startup (e.g., load ML models) - **`@task` decorator**: Define a function that handles each incoming task - **`run()` function**: Enter the stdin/stdout loop for long-running mode - **`run_once()` function**: Process a single task and exit (one-shot mode) **Client** — Also included for convenience: - **`RunqyClient` class**: HTTP client for enqueuing tasks - **`enqueue()` function**: Quick enqueue without creating a client instance - **`enqueue_batch()` function**: High-throughput batch enqueue (35,000+ jobs/s) ## Installation ```bash pip install runqy-python ``` Or install from source: ```bash pip install git+https://github.com/Publikey/runqy-python.git ``` ## Task Handler Example ```python from runqy_python import task, load, run @load def setup(): """Runs once at startup. Return value is passed to task handler.""" model = load_my_model() return {"model": model} @task def handle(payload: dict, ctx: dict) -> dict: """Handles each task. 
Receives payload and context from @load.""" model = ctx["model"] result = model.predict(payload["input"]) return {"prediction": result} if __name__ == "__main__": run() ``` ## Client Example The SDK also includes a client for enqueuing tasks from Python: ```python from runqy_python import RunqyClient client = RunqyClient("http://localhost:3000", api_key="your-api-key") # Enqueue a task task = client.enqueue("inference.default", {"input": "hello"}) print(f"Task ID: {task.task_id}") # Check result result = client.get_task(task.task_id) print(f"State: {result.state}, Result: {result.result}") ``` ## Batch Enqueue Example For high-throughput scenarios, use `enqueue_batch()` to submit thousands of jobs per second: ```python from runqy_python import RunqyClient client = RunqyClient("http://localhost:3000", api_key="your-api-key") # Enqueue 1000 jobs in a single request jobs = [{"input": f"job-{i}"} for i in range(1000)] result = client.enqueue_batch("inference.default", jobs) print(f"Enqueued: {result.enqueued}") print(f"Failed: {result.failed}") print(f"First 5 task IDs: {result.task_ids[:5]}") ``` !!! tip "Performance" Batch enqueue achieves ~35,000-50,000 jobs/s compared to ~800-1,000 jobs/s with individual `enqueue()` calls. ## Source Code ``` runqy-python/ └── src/runqy_python/ ├── __init__.py # Package exports ├── decorator.py # @task, @load decorators ├── runner.py # run(), run_once() functions └── client.py # RunqyClient for enqueuing tasks ``` ## Related - [Installation](installation.md) - [Decorators Reference](decorators.md) - [Examples](examples.md) ## Installation # Installation ## From PyPI ```bash pip install runqy-python ``` ## From Source ```bash pip install git+https://github.com/Publikey/runqy-python.git ``` ## For Development ```bash git clone https://github.com/Publikey/runqy-python.git cd runqy-python pip install -e . 
``` ## Requirements - Python 3.9+ - No external dependencies (uses only standard library) ## Verify Installation ```python from runqy_python import task, load, run print("runqy-python installed successfully!") ``` ## Decorators # Decorators Reference ## `@load` Marks a function that runs once at startup. The return value is passed to the task handler as context. ```python from runqy_python import load @load def setup(): """Initialize resources (models, connections, etc.)""" model = load_model("model.pt") db = connect_to_database() return {"model": model, "db": db} ``` **Signature:** ```python def setup() -> Any ``` **Returns:** Any value that will be passed to `@task` handlers as the `ctx` parameter. **Use cases:** - Loading ML models - Establishing database connections - Reading configuration files - Any expensive initialization that should happen once ## `@task` Marks a function that handles each incoming task. ```python from runqy_python import task @task def handle(payload: dict, ctx: dict) -> dict: """Process a single task.""" result = ctx["model"].predict(payload["input"]) return {"prediction": result} ``` **Signature:** ```python def handle(payload: dict, ctx: Any) -> dict ``` **Parameters:** | Parameter | Type | Description | |-----------|------|-------------| | `payload` | dict | The task payload from Redis | | `ctx` | Any | The return value from `@load` (or `None` if no `@load` is defined) | **Returns:** A dictionary that will be serialized as the task result. **Error handling:** To signal that a task failed but should be retried, raise an exception. The worker will handle retry logic based on the task's `max_retry` setting. ## `run()` Enters the stdin/stdout loop for long-running mode. ```python from runqy_python import run if __name__ == "__main__": run() ``` This function: 1. Calls the `@load` function (if defined) 2. Outputs `{"status": "ready"}` to signal the worker 3. Reads JSON tasks from stdin 4. Calls the `@task` function for each task 5. 
Outputs JSON responses to stdout 6. Loops forever until the process is terminated ## `run_once()` Processes a single task for one-shot mode. ```python from runqy_python import run_once if __name__ == "__main__": run_once() ``` This function: 1. Calls the `@load` function (if defined) 2. Reads a single JSON task from stdin 3. Calls the `@task` function 4. Outputs the JSON response to stdout 5. Exits ## Protocol ### Input Format (stdin) ```json {"task_id": "abc123", "payload": {"msg": "hello"}} ``` ### Output Format (stdout) **Ready signal (after `@load`):** ```json {"status": "ready"} ``` **Task response:** ```json {"task_id": "abc123", "result": {"output": "world"}, "error": null, "retry": false} ``` **Error response:** ```json {"task_id": "abc123", "result": null, "error": "Something went wrong", "retry": true} ``` ## Examples # Examples ## Basic Task Handler ```python from runqy_python import task, run @task def handle(payload: dict, ctx: dict) -> dict: name = payload.get("name", "World") return {"message": f"Hello, {name}!"} if __name__ == "__main__": run() ``` ## ML Inference Task ```python from runqy_python import task, load, run import torch @load def setup(): """Load model once at startup.""" model = torch.load("model.pt") model.eval() return {"model": model} @task def handle(payload: dict, ctx: dict) -> dict: """Run inference on each request.""" model = ctx["model"] input_tensor = torch.tensor(payload["input"]) with torch.no_grad(): output = model(input_tensor) return {"prediction": output.tolist()} if __name__ == "__main__": run() ``` ## One-Shot Task For tasks that don't benefit from keeping the process warm: ```python from runqy_python import task, run_once @task def handle(payload: dict, ctx: dict) -> dict: # Process data result = expensive_computation(payload["data"]) return {"result": result} if __name__ == "__main__": run_once() ``` ## Task with Database Connection ```python from runqy_python import task, load, run import psycopg2 @load def setup(): 
"""Establish database connection once.""" conn = psycopg2.connect( host="localhost", database="mydb", user="user", password="password" ) return {"db": conn} @task def handle(payload: dict, ctx: dict) -> dict: """Query database for each task.""" cursor = ctx["db"].cursor() cursor.execute("SELECT * FROM users WHERE id = %s", (payload["user_id"],)) user = cursor.fetchone() cursor.close() return {"user": user} if __name__ == "__main__": run() ``` ## Task with Webhook Delivery For tasks that need to deliver results asynchronously (recommended pattern since results are not stored in Redis by default): ```python from runqy_python import task, run import requests import logging @task def handle(payload: dict, ctx: dict) -> dict: # Extract webhook URL from payload webhook_url = payload.get("webhook_url") # Do the work result = process_data(payload["input"]) # Deliver via webhook if webhook_url: try: response = requests.post( webhook_url, json={"task_id": payload.get("task_id"), "result": result}, timeout=30 ) response.raise_for_status() except Exception as e: logging.error(f"Webhook delivery failed: {e}") # Optionally: raise to trigger retry, or handle gracefully # No need to return data when redis_storage=false (default) return if __name__ == "__main__": run() ``` See [Result Delivery Patterns](../guides/result-delivery.md) for more delivery options including S3 upload and database writes. ## Testing Locally You can test your task handler without a worker: ```bash echo '{"task_id":"t1","payload":{"name":"Alice"}}' | python your_task.py ``` Expected output: ```json {"status": "ready"} {"task_id": "t1", "result": {"message": "Hello, Alice!"}, "error": null, "retry": false} ``` # Guides ## Local Development # Local Development This guide covers setting up a complete local development environment for runqy. ## Prerequisites - Go 1.21+ - Python 3.9+ - Redis - Git ## Setup ### 1. 
Start Redis === "Docker" ```bash docker run -d --name redis -p 6379:6379 redis:7-alpine ``` === "macOS" ```bash brew install redis brew services start redis ``` === "Linux" ```bash sudo apt install redis-server sudo systemctl start redis ``` ### 2. Clone Repositories ```bash mkdir runqy-dev && cd runqy-dev git clone https://github.com/Publikey/runqy.git git clone https://github.com/Publikey/runqy-worker.git ``` ### 3. Start PostgreSQL (optional, for full features) === "Docker" ```bash docker run -d --name postgres -p 5432:5432 \ -e POSTGRES_PASSWORD=devpassword \ postgres:15-alpine ``` === "Skip" PostgreSQL is optional for basic local development. The server can function with Redis-only configuration. ### 4. Configure and Start Server ```bash cd runqy # Create environment file cp .env.secret.sample .env.secret # Edit .env.secret with your credentials: cat > .env.secret << 'EOF' REDIS_HOST=localhost REDIS_PASSWORD= DATABASE_HOST=localhost DATABASE_PORT=5432 DATABASE_USER=postgres DATABASE_PASSWORD=devpassword DATABASE_DBNAME=runqy_dev RUNQY_API_KEY=dev-api-key EOF cd app && go run . ``` The server starts on port 3000 by default. ### 5. Configure and Start Worker In a new terminal: ```bash cd runqy-worker cat > config.yml << 'EOF' server: url: "http://localhost:3000" api_key: "dev-api-key" worker: queue: "inference" concurrency: 1 shutdown_timeout: 30s deployment: dir: "./deployment" EOF go run ./cmd/worker ``` ## Testing ### Test Python Task Directly ```bash cd test-dummy-task echo '{"task_id":"t1","payload":{"foo":"bar"}}' | python python/hello_world/dummy_task.py ``` ### Enqueue a Task ```bash redis-cli HSET asynq:t:test-1 type task payload '{"msg":"hello"}' retry 0 max_retry 2 queue inference.default LPUSH asynq:inference.default:pending test-1 ``` ### Check Results ```bash # Only populated when the queue enables redis_storage redis-cli GET asynq:result:test-1 ``` ### Monitor Worker Health ```bash # HGETALL takes a single key, so list the heartbeat keys first redis-cli --scan --pattern "asynq:workers:*" redis-cli HGETALL asynq:workers:<worker-id> ``` ## Debugging ### View Worker Logs The worker outputs logs to stdout. 
Look for: - `Registered with server` — Successful server connection - `Deployed code` — Git clone completed - `Python process ready` — Received `{"status": "ready"}` - `Processing task` — Dequeued a task - `Task completed` — Task finished successfully ### View Python Process Output In long-running mode, the Python process communicates via stdin/stdout. To debug: 1. Add logging to stderr in your task code: ```python import sys print("Debug message", file=sys.stderr) ``` 2. The worker will forward stderr to its own logs. ### Common Issues **Worker can't connect to server:** - Check server is running on the configured port - Verify API key matches between server and worker configs **Python process won't start:** - Check `startup_cmd` is correct - Verify Python dependencies are installed - Look for syntax errors in task code **Tasks stuck in pending:** - Check worker is healthy (Redis heartbeat) - Verify queue names match (including sub-queues) ## Enqueueing Tasks # Enqueueing Tasks runqy uses Redis for task storage. This guide shows how to enqueue tasks from various languages. ## Quick Comparison | Method | Throughput | Use Case | |--------|------------|----------| | HTTP API (`POST /queue/add`) | ~800-1,000/s | Simple integrations | | HTTP Batch API (`POST /queue/add-batch`) | ~35,000-50,000/s | High-throughput from any language | | Direct Redis (pipelined) | ~40,000-80,000/s | Maximum performance | For most use cases, the **Batch API** offers the best balance of simplicity and performance. 
## Redis Key Format runqy uses an asynq-compatible key format: | Key | Type | Description | |-----|------|-------------| | `asynq:t:{task_id}` | Hash | Task data | | `asynq:{queue}:pending` | List | Pending task IDs | ## Task Data Fields | Field | Type | Description | |-------|------|-------------| | `type` | string | Task type (usually `"task"`) | | `payload` | string | JSON-encoded payload | | `retry` | string | Current retry count | | `max_retry` | string | Maximum retry attempts | | `queue` | string | Queue name (including sub-queue) | ## Queue Naming Sub-queues let you assign different priorities to the same task type. A common use case is routing paid users to a high-priority sub-queue while free users go to a lower-priority sub-queue—both execute the same task code, but paid users get processed first. Queues use the format `{parent}.{sub_queue}`: - `inference.premium` — High priority (paid users) - `inference.standard` — Standard priority (free users) - `simple.default` — Default simple queue Workers register for a parent queue (e.g., `inference`) and process tasks from all its sub-queues, prioritizing higher-priority sub-queues first. ### Automatic Default Fallback When you specify a queue name without the sub-queue suffix, runqy automatically appends `.default`: | You provide | Resolves to | |-------------|-------------| | `inference` | `inference.default` | | `simple` | `simple.default` | | `inference.high` | `inference.high` (unchanged) | This works in the API, CLI, and direct Redis operations (the server normalizes the queue name before processing). !!! warning "Queue must exist" If the resolved queue (e.g., `inference.default`) doesn't exist in the configuration, the operation fails with an error. ## HTTP Batch API The batch endpoint is the recommended approach for high-throughput job submission. It uses Redis pipelining internally for optimal performance. 
### Python (using SDK) ```python from runqy_python import RunqyClient client = RunqyClient("http://localhost:3000", api_key="your-api-key") # Submit 1000 jobs in one request jobs = [{"prompt": f"Generate image {i}"} for i in range(1000)] result = client.enqueue_batch("inference.default", jobs) print(f"Enqueued: {result.enqueued} jobs") ``` ### cURL ```bash curl -X POST http://localhost:3000/queue/add-batch \ -H "Authorization: Bearer your-api-key" \ -H "Content-Type: application/json" \ -d '{ "queue": "inference.default", "jobs": [ {"data": {"prompt": "Hello"}}, {"data": {"prompt": "World"}} ] }' ``` ### Node.js ```javascript const response = await fetch('http://localhost:3000/queue/add-batch', { method: 'POST', headers: { 'Authorization': 'Bearer your-api-key', 'Content-Type': 'application/json', }, body: JSON.stringify({ queue: 'inference.default', jobs: [ { data: { prompt: 'Hello' } }, { data: { prompt: 'World' } }, ], }), }); const result = await response.json(); console.log(`Enqueued: ${result.enqueued}`); ``` --- ## Direct Redis Access For maximum performance or when you need direct Redis access, you can enqueue tasks directly. 
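To approach the pipelined throughput quoted in the comparison table, batch the `HSET`/`LPUSH` pairs into a single round trip. A sketch using a redis-py-style pipeline — the key format follows this guide, but actual throughput depends on payload size and network latency:

```python
import json
import uuid

def enqueue_batch_direct(r, queue: str, payloads, max_retry: int = 3) -> list:
    """Enqueue many tasks in one pipelined round trip.

    `r` is a redis-py-style client; the key layout follows the
    asynq-compatible format described in this guide.
    """
    task_ids = []
    pipe = r.pipeline(transaction=False)
    for payload in payloads:
        task_id = str(uuid.uuid4())
        pipe.hset(f"asynq:t:{task_id}", mapping={
            "type": "task",
            "payload": json.dumps(payload),
            "retry": "0",
            "max_retry": str(max_retry),
            "queue": queue,
        })
        pipe.lpush(f"asynq:{queue}:pending", task_id)
        task_ids.append(task_id)
    pipe.execute()  # one round trip for all HSET/LPUSH pairs
    return task_ids

# Usage (assumes redis-py):
# import redis
# r = redis.Redis(host="localhost", port=6379)
# ids = enqueue_batch_direct(r, "inference.default",
#                            [{"input": i} for i in range(10_000)])
```

Note that, unlike the HTTP endpoints, direct Redis access bypasses the server's queue-name normalization, so pass the fully qualified sub-queue name (e.g. `inference.default`).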
### Redis CLI ```bash redis-cli # Create task data HSET asynq:t:my-task-id \ type task \ payload '{"input": "hello world"}' \ retry 0 \ max_retry 3 \ queue inference.default # Push to pending queue LPUSH asynq:inference.default:pending my-task-id ``` ### Python ```python import redis import json import uuid def enqueue_task(queue: str, payload: dict, max_retry: int = 3) -> str: """Enqueue a task and return its ID.""" r = redis.Redis(host='localhost', port=6379) task_id = str(uuid.uuid4()) # Create task hash r.hset(f"asynq:t:{task_id}", mapping={ "type": "task", "payload": json.dumps(payload), "retry": "0", "max_retry": str(max_retry), "queue": queue }) # Push to pending list r.lpush(f"asynq:{queue}:pending", task_id) return task_id # Usage task_id = enqueue_task("inference.default", {"input": "hello"}) print(f"Enqueued task: {task_id}") ``` ### Node.js ```javascript const Redis = require('ioredis'); const { v4: uuidv4 } = require('uuid'); const redis = new Redis(); async function enqueueTask(queue, payload, maxRetry = 3) { const taskId = uuidv4(); // Create task hash await redis.hset(`asynq:t:${taskId}`, { type: 'task', payload: JSON.stringify(payload), retry: '0', max_retry: String(maxRetry), queue: queue }); // Push to pending list await redis.lpush(`asynq:${queue}:pending`, taskId); return taskId; } // Usage const taskId = await enqueueTask('inference.default', { input: 'hello' }); console.log(`Enqueued task: ${taskId}`); ``` ### Go ```go package main import ( "context" "encoding/json" "fmt" "github.com/google/uuid" "github.com/redis/go-redis/v9" ) func enqueueTask(ctx context.Context, rdb *redis.Client, queue string, payload map[string]any, maxRetry int) (string, error) { taskID := uuid.New().String() payloadJSON, err := json.Marshal(payload) if err != nil { return "", err } // Create task hash err = rdb.HSet(ctx, fmt.Sprintf("asynq:t:%s", taskID), map[string]any{ "type": "task", "payload": string(payloadJSON), "retry": "0", "max_retry": fmt.Sprintf("%d", 
maxRetry), "queue": queue, }).Err() if err != nil { return "", err } // Push to pending list err = rdb.LPush(ctx, fmt.Sprintf("asynq:%s:pending", queue), taskID).Err() if err != nil { return "", err } return taskID, nil } ``` ## Checking Results !!! warning "Default: Results Not Stored in Redis" By default, workers do **not** store task results in Redis (`redis_storage: false`). Only task success/failure status is tracked. Your Python task code should handle result delivery (webhook, storage, etc.). See [Result Delivery Patterns](result-delivery.md). Enable `redis_storage: true` in your queue deployment config (server-side) if you need Redis result storage. ### Poll for Result (when redis_storage is enabled) ```python import redis import json import time def wait_for_result(task_id: str, timeout: int = 30) -> dict: """Wait for task result with timeout.""" r = redis.Redis(host='localhost', port=6379) key = f"asynq:result:{task_id}" start = time.time() while time.time() - start < timeout: result = r.get(key) if result: return json.loads(result) time.sleep(0.1) raise TimeoutError(f"Task {task_id} did not complete within {timeout}s") # Usage result = wait_for_result(task_id) print(f"Result: {result}") ``` ### Check Task Status ```bash # View task data redis-cli HGETALL asynq:t:my-task-id # Check if task is pending redis-cli LRANGE asynq:inference.default:pending 0 -1 # Check if task is active (being processed) redis-cli LRANGE asynq:inference.default:active 0 -1 # Get result redis-cli GET asynq:result:my-task-id ``` ## Result Delivery # Result Delivery Patterns By default, runqy workers do **not** store task results in Redis. This guide explains why and how to handle result delivery in your tasks. ## Default Behavior When a task completes, the worker only tracks success or failure status in Redis. The full result payload is **not stored** by default (`redis_storage: false`). **Why?** Task results can be large—videos, images, ML model outputs, generated files. 
Storing these in Redis would: - Bloat memory usage on your Redis instance - Require manual cleanup of stale results - Create bottlenecks for large payloads Instead, your Python task code is responsible for delivering results to clients. ## Your Task Handles Delivery The recommended pattern is to include delivery instructions in your task payload. Your task code then sends results directly to their destination. Common patterns: | Pattern | Best For | |---------|----------| | Webhook callback | Real-time notifications, async workflows | | Blob storage (S3/GCS) | Large files, media, ML outputs | | Database write | Structured data, audit trails | | Message queue | Event-driven architectures | ## Pattern 1: Webhook Callback Pass a `webhook_url` in your task payload. When the task completes, POST the result to that URL. ```python from runqy_python import task, run import requests import logging @task def handle(payload: dict, ctx: dict) -> dict: webhook_url = payload.get("webhook_url") # Do the work result = process_data(payload["input"]) # Deliver via webhook if webhook_url: try: response = requests.post( webhook_url, json={ "task_id": payload.get("task_id"), "status": "completed", "result": result }, timeout=30 ) response.raise_for_status() except requests.RequestException as e: logging.error(f"Webhook delivery failed: {e}") # Option 1: Raise to trigger retry # raise # Option 2: Handle gracefully, log for investigation # Return value not stored when redis_storage=false (default) # Just return to signal success return if __name__ == "__main__": run() ``` !!! note "Return Values When redis_storage is Disabled" When `redis_storage: false` (the default), the return value is not stored anywhere. The worker only needs to know success vs failure, so `return` or `return None` is sufficient. The SDK sends `{"result": null, "error": null}` to signal success. 
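If you sign webhook payloads so that receivers can verify authenticity, the standard library's `hmac` is enough. A sketch — the header name and hex-digest scheme below are illustrative choices, not part of runqy; the secret is shared with the receiver out of band:

```python
import hashlib
import hmac
import json

# Sketch: HMAC-sign webhook bodies so receivers can verify authenticity.
# The header name and hex-digest scheme are illustrative, not part of runqy.

def sign_payload(secret: bytes, body: dict) -> tuple:
    """Return (raw_bytes, hex_signature) for a webhook body."""
    raw = json.dumps(body, sort_keys=True).encode()
    return raw, hmac.new(secret, raw, hashlib.sha256).hexdigest()

def verify_payload(secret: bytes, raw: bytes, signature: str) -> bool:
    """Receiver side: constant-time comparison against the expected HMAC."""
    expected = hmac.new(secret, raw, hashlib.sha256).hexdigest()
    return hmac.compare_digest(expected, signature)

# Worker side (inside the @task handler):
# raw, sig = sign_payload(b"shared-secret", {"task_id": tid, "result": result})
# requests.post(webhook_url, data=raw, timeout=30,
#               headers={"Content-Type": "application/json",
#                        "X-Signature-SHA256": sig})
```

Signing the serialized bytes (rather than re-serializing on the receiver) avoids mismatches from differing JSON key ordering.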
**Enqueueing with webhook:** ```python task_id = enqueue_task("inference.default", { "input": "process this", "webhook_url": "https://api.example.com/webhooks/task-complete" }) ``` ### Webhook Best Practices - **Retry failed deliveries**: Implement exponential backoff for transient failures - **Verify signatures**: Sign webhook payloads so receivers can verify authenticity - **Handle timeouts**: Set reasonable timeouts (30s) to avoid blocking tasks - **Log failures**: Track failed deliveries for investigation ## Pattern 2: Blob Storage (S3/GCS) For large results like images or videos, upload to cloud storage and optionally notify via webhook with the URL. ```python from runqy_python import task, load, run import boto3 import requests import uuid @load def setup(): """Initialize S3 client once.""" s3 = boto3.client('s3') return {"s3": s3} @task def handle(payload: dict, ctx: dict) -> dict: s3 = ctx["s3"] bucket = "my-results-bucket" # Generate result (e.g., process image, run ML inference) output_data = generate_output(payload["input"]) # Upload to S3 key = f"results/{uuid.uuid4()}.png" s3.put_object( Bucket=bucket, Key=key, Body=output_data, ContentType="image/png" ) result_url = f"https://{bucket}.s3.amazonaws.com/{key}" # Notify via webhook if provided webhook_url = payload.get("webhook_url") if webhook_url: requests.post(webhook_url, json={ "task_id": payload.get("task_id"), "status": "completed", "result_url": result_url }, timeout=30) return if __name__ == "__main__": run() ``` ### Presigned URLs For private buckets, generate presigned URLs with expiration: ```python presigned_url = s3.generate_presigned_url( 'get_object', Params={'Bucket': bucket, 'Key': key}, ExpiresIn=3600 # 1 hour ) ``` ## Pattern 3: Database Write Write results directly to your application database: ```python from runqy_python import task, load, run import json import psycopg2 @load def setup(): conn = psycopg2.connect( host="db.example.com", database="myapp", user="worker", password="secret" ) 
return {"db": conn} @task def handle(payload: dict, ctx: dict) -> dict: task_id = payload.get("task_id") result = process_data(payload["input"]) cursor = ctx["db"].cursor() cursor.execute( "UPDATE tasks SET status = %s, result = %s, completed_at = NOW() WHERE id = %s", ("completed", json.dumps(result), task_id) ) ctx["db"].commit() cursor.close() return if __name__ == "__main__": run() ``` ## Pattern 4: Message Queue Publish results to a message queue for downstream processing: ```python from runqy_python import task, load, run import pika import json @load def setup(): connection = pika.BlockingConnection( pika.ConnectionParameters('rabbitmq.example.com') ) channel = connection.channel() channel.queue_declare(queue='task_results', durable=True) return {"channel": channel} @task def handle(payload: dict, ctx: dict) -> dict: result = process_data(payload["input"]) ctx["channel"].basic_publish( exchange='', routing_key='task_results', body=json.dumps({ "task_id": payload.get("task_id"), "result": result }), properties=pika.BasicProperties(delivery_mode=2) # persistent ) return if __name__ == "__main__": run() ``` ## When to Enable redis_storage Enable `redis_storage: true` in your queue deployment config (server-side) when: - Results are small (< 1MB) - Clients need to poll synchronously for results - Results are short-lived and you set appropriate TTLs - You're building simple integrations or prototypes ```yaml # Queue deployment YAML (server-side, e.g. runqy/deployment/simple.yml) queues: simple: deployment: redis_storage: true ``` The server transmits this setting to the worker during bootstrap registration. !!! warning "Memory Considerations" With `redis_storage: true`, results accumulate in Redis until they expire or are manually deleted. Monitor your Redis memory usage and set appropriate TTLs for result keys. ## Combining Patterns You can combine patterns. 
For example, upload to S3 for large files but also write metadata to your database: ```python @task def handle(payload: dict, ctx: dict) -> dict: # Generate large output video_data = generate_video(payload["input"]) # Upload to S3 s3_url = upload_to_s3(video_data) # Write metadata to database save_to_db(payload["task_id"], { "status": "completed", "result_url": s3_url, "size_kb": len(video_data) / 1000 }) # Notify client notify_webhook(payload["webhook_url"], { "task_id": payload["task_id"], "result_url": s3_url }) return ``` ## Vaults (Secrets) # Vaults: Secure Secret Management Vaults provide secure, encrypted storage for secrets that workers need to access external services. Vault entries are automatically injected as environment variables when workers bootstrap. ## Overview ```mermaid flowchart LR subgraph Server V[("Vaults
(Encrypted)")] DB[(Database)] V --> DB end subgraph Worker Bootstrap W[Worker] W -->|1. Register| Server Server -->|2. Decrypt & Send| W W -->|3. Inject as ENV| P[Python Process] end P -->|os.environ| API[External APIs] ``` ## Setup ### 1. Generate a Master Key The vault master key is used to encrypt all vault entries. Generate a secure 256-bit key: ```bash # Using OpenSSL openssl rand -base64 32 ``` Output: ``` YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXoxMjM0NTY= ``` !!! danger "Keep This Secret" - Store the master key securely (e.g., in a secrets manager like AWS Secrets Manager, HashiCorp Vault, or Kubernetes Secrets) - Never commit the master key to version control - If the key is lost, all vault entries become unrecoverable - If the key is compromised, rotate all secrets stored in vaults ### 2. Configure the Server Set the `RUNQY_VAULT_MASTER_KEY` environment variable: === "Environment Variable" ```bash export RUNQY_VAULT_MASTER_KEY="YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXoxMjM0NTY=" runqy serve ``` === "Docker" ```yaml services: runqy-server: image: runqy/server:latest environment: - RUNQY_VAULT_MASTER_KEY=YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXoxMjM0NTY= ``` === "Kubernetes" ```yaml apiVersion: v1 kind: Secret metadata: name: runqy-secrets type: Opaque data: vault-master-key: WVdKalpHVm1aMmhwYW10c2JXNXZjSEZ5YzNSMWRuZDRlWG94TWpNME5UWT0= --- apiVersion: apps/v1 kind: Deployment spec: template: spec: containers: - name: server env: - name: RUNQY_VAULT_MASTER_KEY valueFrom: secretKeyRef: name: runqy-secrets key: vault-master-key ``` If `RUNQY_VAULT_MASTER_KEY` is not set, the server starts with vaults disabled. Vault API endpoints return `503 Service Unavailable`. 
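If OpenSSL isn't available, Python's standard library can generate an equivalent key:

```python
import base64
import secrets

# Generate a 256-bit (32-byte) master key, base64-encoded —
# the same shape as `openssl rand -base64 32`.
def generate_master_key() -> str:
    return base64.b64encode(secrets.token_bytes(32)).decode()

print(generate_master_key())  # a 44-character base64 string
```

`secrets` draws from the OS's cryptographically secure random source, so the result is suitable for key material (unlike `random`).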
## Managing Vaults

### Create a Vault

Vaults are named collections of key-value pairs:

```bash
runqy vault create api-keys -d "API keys for external services"
runqy vault create model-credentials -d "ML model access tokens"
```

### Add Secrets

```bash
# Add API keys
runqy vault set api-keys OPENAI_API_KEY sk-proj-xxx
runqy vault set api-keys ANTHROPIC_API_KEY sk-ant-xxx
runqy vault set api-keys HF_TOKEN hf_xxx

# Add database credentials
runqy vault set model-credentials DB_PASSWORD supersecret123
```

By default, all entries are marked as secrets and their values are masked in API responses and CLI output.

### Add Non-Secret Values

For configuration values that don't need masking:

```bash
runqy vault set api-keys DEBUG_MODE true --no-secret
runqy vault set api-keys LOG_LEVEL info --no-secret
```

### View Vault Contents

```bash
runqy vault show api-keys
```

Output:

```
Vault: api-keys
Description: API keys for external services
Created: 2024-01-15T10:30:00Z
Updated: 2024-01-15T14:22:00Z

Entries (4):
KEY                VALUE     SECRET
OPENAI_API_KEY     sk****xx  yes
ANTHROPIC_API_KEY  sk****xx  yes
HF_TOKEN           hf****xx  yes
DEBUG_MODE         true      no
```

### Get Decrypted Values

To retrieve the actual decrypted value (local mode only):

```bash
runqy vault get api-keys OPENAI_API_KEY
```

Output:

```
sk-proj-actual-key-value-here
```

!!! note
    `vault get` only works in local mode for security reasons.

### Remove Entries

```bash
runqy vault unset api-keys OLD_KEY
```

### Delete a Vault

```bash
runqy vault delete api-keys --force
```

## Using Vaults with Queues

### Configure Queue Deployment

Reference vaults in your queue configuration:

```yaml
queues:
  inference:
    priority: 6
    deployment:
      git_url: "https://github.com/example/inference.git"
      branch: "main"
      startup_cmd: "python main.py"
      startup_timeout_secs: 300
      vaults:
        - api-keys
        - model-credentials
```

When a worker registers for this queue, the server:

1. Retrieves all entries from the specified vaults
2. Decrypts the values
3.
Sends them to the worker in the registration response
4. The worker injects them as environment variables

### Access in Python

```python
import os

from runqy_python import task, run


@task
def generate_text(payload, ctx):
    # Access vault entries as environment variables
    openai_key = os.environ.get("OPENAI_API_KEY")

    # Use the secrets
    from openai import OpenAI

    client = OpenAI(api_key=openai_key)
    response = client.chat.completions.create(
        model="gpt-4",
        messages=[{"role": "user", "content": payload["prompt"]}]
    )
    return {"text": response.choices[0].message.content}


if __name__ == "__main__":
    run()
```

## Multiple Vaults

When multiple vaults are specified, entries are merged. If the same key exists in multiple vaults, later vaults override earlier ones:

```yaml
deployment:
  vaults:
    - shared-secrets   # Applied first
    - project-secrets  # Overrides shared-secrets if keys conflict
```

## Security Best Practices

### 1. Use Separate Vaults for Different Concerns

```bash
runqy vault create shared-infrastructure -d "Shared infra credentials"
runqy vault create project-a-secrets -d "Project A specific secrets"
runqy vault create project-b-secrets -d "Project B specific secrets"
```

### 2. Limit Vault Access per Queue

Only reference the vaults a queue actually needs:

```yaml
queues:
  project-a:
    deployment:
      vaults:
        - shared-infrastructure
        - project-a-secrets  # Only project A secrets
  project-b:
    deployment:
      vaults:
        - shared-infrastructure
        - project-b-secrets  # Only project B secrets
```

### 3. Rotate Secrets Regularly

```bash
# Update a secret
runqy vault set api-keys OPENAI_API_KEY sk-new-key-value

# Workers receive the new value on next bootstrap/restart
```

### 4. Audit Vault Usage

Check which queues reference a vault before modifying:

```bash
runqy config list
# Review the deployment.vaults field for each queue
```

### 5. Secure the Master Key

- Store in a dedicated secrets manager
- Use different keys for different environments (dev, staging, prod)
- Implement key rotation procedures
- Monitor access to the master key

## Encryption Details

Vaults use AES-256-GCM encryption:

- **Algorithm**: AES-256 in GCM mode
- **Key**: 256-bit key derived from `RUNQY_VAULT_MASTER_KEY`
- **Nonce**: Random 12-byte nonce per encryption
- **Storage**: Encrypted ciphertext stored as binary blob in database

The encryption ensures:

- **Confidentiality**: Values cannot be read without the master key
- **Integrity**: Tampering with encrypted data is detected
- **Authenticity**: Only the holder of the master key can encrypt/decrypt

## Troubleshooting

### "vaults not configured" Error

The server returns `503 Service Unavailable` with the message "vaults not configured".

**Cause**: The `RUNQY_VAULT_MASTER_KEY` environment variable is not set or is invalid.

**Solution**: Set a valid base64-encoded 32-byte key.

### "vault not found" Error

**Cause**: The vault name doesn't exist.

**Solution**: Create the vault first with `runqy vault create`.

### Workers Not Receiving Secrets

**Possible causes**:

1. Vault names in queue config don't match actual vault names
2. Vaults are empty
3. Server's vault feature is disabled

**Debug steps**:

```bash
# Check vault exists and has entries
runqy vault show api-keys

# Check queue config references correct vaults
runqy config list

# Check server logs for vault-related warnings
```

### Key Rotation

To rotate the master key:

1. Export all vault entries (values will be decrypted)
2. Stop the server
3. Set the new `RUNQY_VAULT_MASTER_KEY`
4. Re-import all vault entries (values will be re-encrypted with the new key)
5. Restart workers to receive re-encrypted secrets

!!! warning
    There's no automatic key rotation. Plan for manual rotation if needed.

## Monitoring

# Monitoring

runqy provides multiple monitoring options, from simple Redis queries to full Prometheus/Grafana integration.
## Web Dashboard

The built-in web dashboard at `/monitoring` provides real-time visibility into your queues and workers.

### Dashboard Features

- **Task History**: View processed/failed task counts over time (Today, 7D, 30D)
- **Queue Sizes**: Live view of pending, active, retry, and archived tasks per queue
- **Worker Status**: Monitor worker health, active tasks, and heartbeats
- **Task Management**: Inspect, retry, or delete tasks directly from the UI

The dashboard works out of the box using Redis data. No additional setup required.

!!! tip "Headless Mode"
    For API-only deployments (e.g., headless servers, CI/CD pipelines), disable the dashboard with `--no-ui`:

    ```bash
    runqy serve --no-ui
    ```

    The REST API, Prometheus metrics (`/metrics`), and Swagger docs remain available.

### Authentication

The dashboard requires authentication to protect sensitive queue and task data.

#### First-Time Setup

On first access, you'll be prompted to create an admin account:

1. Navigate to `/monitoring`
2. You'll be redirected to `/monitoring/setup`
3. Enter your email and password (minimum 8 characters)
4. Click "Create Admin Account"

After setup, the dashboard is protected and requires login.

#### Login Flow

1. Navigate to `/monitoring`
2. Enter your email and password
3. You'll be logged in for 7 days (JWT cookie)

#### Session Management

- Sessions expire after 7 days
- Click "Logout" in the sidebar to end your session
- If your session expires, you'll be redirected to the login page

!!! note "Single Admin"
    The dashboard supports a single admin account. There is no password reset feature—if you forget your password, you'll need to delete the `admin_user` row from the database and re-run setup.

## Worker Health

Workers report their health status via Redis heartbeat.
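To sketch what consuming that heartbeat looks like, here is a small illustrative helper (not a runqy API) that classifies a worker from the heartbeat hash fields documented in the next section:

```python
def check_worker(heartbeat: dict) -> str:
    """Classify a worker from its Redis heartbeat hash.

    Redis hash fields come back as strings, so `healthy` is the
    literal string "true" or "false".
    """
    if heartbeat.get("healthy") != "true":
        return "degraded"  # Python process crashed; needs a manual restart
    if heartbeat.get("active_task"):
        return "busy"
    return "idle"


# Example heartbeat, shaped like HGETALL asynq:workers:<id>
hb = {"started": "2024-01-15T10:30:00Z", "healthy": "true",
      "queue": "inference", "active_task": ""}
print(check_worker(hb))  # idle
```

In a real script you would fetch the hash with a Redis client (e.g. redis-py's `hgetall`) instead of hard-coding it.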
### Check Worker Status

```bash
# List all workers
redis-cli KEYS "asynq:workers:*"

# Get worker details
redis-cli HGETALL asynq:workers:worker-abc123
```

The heartbeat hash includes:

| Field | Description |
|-------|-------------|
| `started` | Worker start timestamp |
| `healthy` | `true` if Python process is running |
| `queue` | Queue being processed |
| `active_task` | Currently processing task ID (if any) |

### Degraded State

When `healthy: false`:

- The supervised Python process has crashed
- Worker won't process new tasks
- Manual restart is required

## Queue Metrics

### Pending Tasks

```bash
# Count pending tasks
redis-cli LLEN asynq:inference.default:pending

# List pending task IDs
redis-cli LRANGE asynq:inference.default:pending 0 -1
```

### Active Tasks

```bash
# Count active tasks
redis-cli LLEN asynq:inference.default:active

# List active task IDs
redis-cli LRANGE asynq:inference.default:active 0 -1
```

## Task Inspection

### View Task Data

```bash
redis-cli HGETALL asynq:t:task-id-here
```

### View Task Result

```bash
redis-cli GET asynq:result:task-id-here
```

---

## Prometheus Integration (Optional)

For advanced monitoring with sub-second time-series data, runqy integrates with Prometheus. This is optional—the dashboard works without it.
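Prometheus scrapes the plain-text exposition format from `/metrics`. As a rough sketch of what those lines look like and how to pick values out of them (the sample lines and the parser below are illustrative, not runqy code):

```python
import re

# Illustrative lines in Prometheus text exposition format, shaped
# like the asynq metrics runqy exposes at /metrics
SAMPLE = """\
asynq_queue_size{queue="inference",state="pending"} 42
asynq_queue_size{queue="inference",state="active"} 3
"""

LINE = re.compile(r'^(\w+)\{(.*)\}\s+([0-9.eE+-]+)$')


def parse_metrics(text: str) -> list[tuple[str, dict, float]]:
    """Parse labeled metric lines into (name, labels, value) triples."""
    out = []
    for line in text.splitlines():
        m = LINE.match(line)
        if not m:
            continue  # skip comments, blank lines, unlabeled metrics
        name, label_str, value = m.groups()
        labels = dict(re.findall(r'(\w+)="([^"]*)"', label_str))
        out.append((name, labels, float(value)))
    return out


for name, labels, value in parse_metrics(SAMPLE):
    print(name, labels, value)
```

For anything beyond a quick script, the official `prometheus_client` library's parsers are a better fit than hand-rolled regexes.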
### What Prometheus Adds

| Feature | Without Prometheus | With Prometheus |
|---------|-------------------|-----------------|
| Task history | Daily aggregates (90 days) | Sub-second time-series |
| Real-time throughput | Snapshot totals | Rate per second |
| Custom dashboards | Basic dashboard | Full Grafana support |
| Alerting | Manual monitoring | AlertManager integration |
| Retention | 90 days in Redis | Configurable (years) |

### Architecture

```
┌─────────────────┐    scrape     ┌─────────────────┐
│  runqy server   │ ────────────> │   Prometheus    │
│  :3000/metrics  │   /metrics    │     :9090       │
└─────────────────┘               └─────────────────┘
                                          │
                                          │ query
                                          ▼
                                  ┌─────────────────┐
                                  │     Grafana     │
                                  │     :3001       │
                                  └─────────────────┘
```

### Setup

#### 1. Configure Prometheus to Scrape runqy

Add to your `prometheus.yml`:

```yaml
scrape_configs:
  - job_name: 'runqy'
    static_configs:
      - targets: ['localhost:3000']  # runqy server address
    scrape_interval: 15s
    metrics_path: /metrics
```

#### 2. Set the Prometheus Address (Optional)

To enable Prometheus-powered charts in the dashboard:

```bash
export PROMETHEUS_ADDRESS=http://localhost:9090
```

!!! note
    This environment variable is optional. Without it, the dashboard uses Redis data, which works perfectly for most use cases. Set this only if you want sub-second time-series data in the dashboard.
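For intuition, PromQL's `rate()` is essentially the increase of a counter divided by the elapsed time. A simplified sketch (illustrative only; the real `rate()` also handles counter resets and extrapolation at range boundaries):

```python
def per_second_rate(samples: list[tuple[float, float]]) -> float:
    """Approximate PromQL rate() over (timestamp, counter_value) samples:
    increase between the first and last sample divided by elapsed seconds.
    Counter resets are ignored for simplicity."""
    (t0, v0), (t1, v1) = samples[0], samples[-1]
    if t1 == t0:
        return 0.0
    return (v1 - v0) / (t1 - t0)


# asynq_tasks_processed_total went from 1000 to 1300 over 5 minutes
samples = [(0.0, 1000.0), (300.0, 1300.0)]
print(per_second_rate(samples))  # 1.0 task/s
```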
### Available Metrics

runqy exposes the following Prometheus metrics at `/metrics`:

#### Queue Metrics

| Metric | Type | Description |
|--------|------|-------------|
| `asynq_queue_size` | Gauge | Number of tasks in each state (pending, active, retry, archived) |
| `asynq_queue_latency_seconds` | Gauge | Time since oldest pending task was enqueued |
| `asynq_queue_memory_usage_approx_bytes` | Gauge | Approximate memory usage per queue |

#### Task Metrics

| Metric | Type | Description |
|--------|------|-------------|
| `asynq_tasks_processed_total` | Counter | Total tasks processed (labeled by queue) |
| `asynq_tasks_failed_total` | Counter | Total tasks failed (labeled by queue) |

### Example Queries

**Tasks processed per second:**

```promql
rate(asynq_tasks_processed_total[5m])
```

**Tasks failed per second:**

```promql
rate(asynq_tasks_failed_total[5m])
```

**Error rate percentage:**

```promql
rate(asynq_tasks_failed_total[5m]) / rate(asynq_tasks_processed_total[5m]) * 100
```

**Queue depth (pending tasks):**

```promql
asynq_queue_size{state="pending"}
```

**Queue latency:**

```promql
asynq_queue_latency_seconds
```

### Grafana Dashboard

Import the asynq Grafana dashboard for a pre-built visualization:

1. In Grafana, go to **Dashboards > Import**
2. Enter dashboard ID: `18863` (asynq dashboard)
3. Select your Prometheus data source
4. Click **Import**

Or create custom panels using the queries above.
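These queries can also be issued programmatically against Prometheus's HTTP API (`GET /api/v1/query?query=...`). A sketch of parsing an instant-query response, using a canned payload in the standard response shape (the `rates_by_queue` helper is illustrative, not part of runqy):

```python
import json

# Canned response in the shape of Prometheus's /api/v1/query endpoint,
# e.g. for the query rate(asynq_tasks_processed_total[5m])
RESPONSE = json.dumps({
    "status": "success",
    "data": {
        "resultType": "vector",
        "result": [
            {"metric": {"queue": "inference"}, "value": [1700000000, "12.5"]},
            {"metric": {"queue": "default"}, "value": [1700000000, "0.2"]},
        ],
    },
})


def rates_by_queue(body: str) -> dict[str, float]:
    """Extract per-queue values from an instant-query response body.

    Prometheus encodes sample values as strings, so they are cast to float.
    """
    data = json.loads(body)
    if data["status"] != "success":
        raise RuntimeError("query failed")
    return {r["metric"]["queue"]: float(r["value"][1])
            for r in data["data"]["result"]}


print(rates_by_queue(RESPONSE))
```

Against a live server you would fetch the body with an HTTP client (e.g. `urllib.request`) from `http://localhost:9090/api/v1/query` instead of using the canned string.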
### Docker Compose Example

Here's a complete setup with Prometheus and Grafana:

```yaml
version: '3.8'

services:
  runqy:
    image: ghcr.io/publikey/runqy:latest
    ports:
      - "3000:3000"
    environment:
      - REDIS_HOST=redis
      - REDIS_PASSWORD=
      - RUNQY_API_KEY=your-api-key
      - PROMETHEUS_ADDRESS=http://prometheus:9090  # Optional
    depends_on:
      - redis

  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"

  prometheus:
    image: prom/prometheus:latest
    ports:
      - "9090:9090"
    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml
    command:
      - '--config.file=/etc/prometheus/prometheus.yml'
      - '--storage.tsdb.retention.time=30d'

  grafana:
    image: grafana/grafana:latest
    ports:
      - "3001:3000"
    environment:
      - GF_SECURITY_ADMIN_PASSWORD=admin
    volumes:
      - grafana-data:/var/lib/grafana

volumes:
  grafana-data:
```

Create `prometheus.yml`:

```yaml
global:
  scrape_interval: 15s

scrape_configs:
  - job_name: 'runqy'
    static_configs:
      - targets: ['runqy:3000']
```

## Alerting

### Prometheus AlertManager

Example alert rules (`alerts.yml`):

```yaml
groups:
  - name: runqy
    rules:
      # High queue depth
      - alert: HighQueueDepth
        expr: asynq_queue_size{state="pending"} > 1000
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "High queue depth on {{ $labels.queue }}"
          description: "Queue {{ $labels.queue }} has {{ $value }} pending tasks"

      # High error rate
      - alert: HighErrorRate
        expr: >
          rate(asynq_tasks_failed_total[5m])
          / rate(asynq_tasks_processed_total[5m]) > 0.1
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "High error rate on {{ $labels.queue }}"
          description: "Queue {{ $labels.queue }} has {{ $value | humanizePercentage }} error rate"

      # Queue latency
      - alert: HighQueueLatency
        expr: asynq_queue_latency_seconds > 300
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "High latency on {{ $labels.queue }}"
          description: "Queue {{ $labels.queue }} has {{ $value | humanizeDuration }} latency"

      # No tasks processed (potential worker issue)
      - alert: NoTasksProcessed
        expr: >
          increase(asynq_tasks_processed_total[10m]) == 0
          and asynq_queue_size{state="pending"} > 0
        for: 10m
        labels:
          severity: critical
        annotations:
          summary: "No tasks processed on {{ $labels.queue }}"
          description: "Queue {{ $labels.queue }} has pending tasks but none processed in 10 minutes"
```

### Key Metrics to Monitor

| Metric | Alert Threshold | Description |
|--------|----------------|-------------|
| Queue depth | > 1000 pending | Tasks accumulating faster than processing |
| Error rate | > 10% | High failure rate indicates issues |
| Queue latency | > 5 minutes | Tasks waiting too long |
| Worker health | `healthy: false` | Worker process crashed |

## Best Practices

1. **Start with the dashboard** - The built-in dashboard covers most monitoring needs
2. **Add Prometheus for scale** - When you need historical data beyond 90 days or sub-second metrics
3. **Set up alerts early** - Don't wait for production issues to configure alerting
4. **Monitor worker health** - A crashed worker won't process tasks and won't recover automatically
5. **Track error rates** - Sudden spikes often indicate code bugs or upstream service issues
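The thresholds in the key-metrics table above can also be checked in plain code, e.g. from a periodic health script, when a full AlertManager setup is overkill. An illustrative sketch (the `evaluate` helper, its input keys, and the alert names are hypothetical, not a runqy API):

```python
def evaluate(metrics: dict) -> list[str]:
    """Apply the alert thresholds from the table above to a metrics
    snapshot and return the names of any triggered alerts."""
    alerts = []
    if metrics.get("pending", 0) > 1000:
        alerts.append("HighQueueDepth")
    processed = metrics.get("processed_rate", 0.0)
    failed = metrics.get("failed_rate", 0.0)
    if processed > 0 and failed / processed > 0.10:
        alerts.append("HighErrorRate")
    if metrics.get("latency_seconds", 0.0) > 300:
        alerts.append("HighQueueLatency")
    if metrics.get("healthy") is False:
        alerts.append("WorkerDown")
    return alerts


# 2500 pending tasks and a 20% failure rate trip two alerts
snapshot = {"pending": 2500, "processed_rate": 10.0,
            "failed_rate": 2.0, "latency_seconds": 30.0, "healthy": True}
print(evaluate(snapshot))  # ['HighQueueDepth', 'HighErrorRate']
```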