Result Delivery Patterns

By default, runqy workers do not store task results in Redis. This guide explains why and how to handle result delivery in your tasks.

Default Behavior

When a task completes, the worker only tracks success or failure status in Redis. The full result payload is not stored by default (redis_storage: false).

Why? Task results can be large—videos, images, ML model outputs, generated files. Storing these in Redis would:

  • Bloat memory usage on your Redis instance
  • Require manual cleanup of stale results
  • Create bottlenecks for large payloads

Instead, your Python task code is responsible for delivering results to clients.

Your Task Handles Delivery

The recommended pattern is to include delivery instructions in your task payload. Your task code then sends results directly to their destination.

Common patterns:

Pattern                  Best For
Webhook callback         Real-time notifications, async workflows
Blob storage (S3/GCS)    Large files, media, ML outputs
Database write           Structured data, audit trails
Message queue            Event-driven architectures

Pattern 1: Webhook Callback

Pass a webhook_url in your task payload. When the task completes, POST the result to that URL.

from runqy_python import task, run
import requests
import logging

@task
def handle(payload: dict, ctx: dict) -> dict:
    webhook_url = payload.get("webhook_url")

    # Do the work (process_data stands in for your own task logic)
    result = process_data(payload["input"])

    # Deliver via webhook
    if webhook_url:
        try:
            response = requests.post(
                webhook_url,
                json={
                    "task_id": payload.get("task_id"),
                    "status": "completed",
                    "result": result
                },
                timeout=30
            )
            response.raise_for_status()
        except requests.RequestException as e:
            logging.error(f"Webhook delivery failed: {e}")
            # Option 1: Raise to trigger retry
            # raise
            # Option 2: Handle gracefully, log for investigation

    # Return value not stored when redis_storage=false (default)
    # Just return to signal success
    return

if __name__ == "__main__":
    run()

Return Values When redis_storage is Disabled

When redis_storage: false (the default), the return value is not stored anywhere. The worker only needs to know success vs failure, so return or return None is sufficient. The SDK sends {"result": null, "error": null} to signal success.

Enqueueing with webhook:

task_id = enqueue_task("inference.default", {
    "input": "process this",
    "webhook_url": "https://api.example.com/webhooks/task-complete"
})

Webhook Best Practices

  • Retry failed deliveries: Implement exponential backoff for transient failures
  • Verify signatures: Sign webhook payloads so receivers can verify authenticity
  • Handle timeouts: Set reasonable timeouts (30s) to avoid blocking tasks
  • Log failures: Track failed deliveries for investigation
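
The retry and signing practices above can be sketched with the standard library alone. This is a minimal illustration, not part of the runqy SDK: the function names, the `X-Signature` header, and the shared secret are assumptions made for the example, and `send` is any callable that raises on failure (for instance a wrapper around `requests.post(...).raise_for_status()`).

```python
import hashlib
import hmac
import json
import time
from typing import Callable


def sign_body(secret: bytes, body: bytes) -> str:
    """Return a hex HMAC-SHA256 signature of the raw request body."""
    return hmac.new(secret, body, hashlib.sha256).hexdigest()


def verify_signature(secret: bytes, body: bytes, signature: str) -> bool:
    """Receiver side: compare signatures in constant time."""
    return hmac.compare_digest(sign_body(secret, body), signature)


def build_signed_request(secret: bytes, payload: dict) -> tuple[bytes, dict]:
    """Serialize the payload once and sign exactly the bytes that are sent."""
    body = json.dumps(payload, sort_keys=True).encode("utf-8")
    headers = {
        "Content-Type": "application/json",
        "X-Signature": sign_body(secret, body),  # hypothetical header name
    }
    return body, headers


def deliver_with_backoff(
    send: Callable[[], None],
    attempts: int = 4,
    base_delay: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
) -> bool:
    """Call send() until it succeeds or the attempts run out.

    Waits base_delay, 2 * base_delay, 4 * base_delay, ... between attempts.
    Returns True on success and False if every attempt raised.
    """
    for attempt in range(attempts):
        try:
            send()
            return True
        except Exception:
            if attempt == attempts - 1:
                return False
            sleep(base_delay * (2 ** attempt))
    return False
```

Inside a task, `send` could post the signed `body` and `headers` to `webhook_url` with a 30 second timeout. If `deliver_with_backoff` returns False, the task can either raise to trigger a queue-level retry or log the failure and continue, as in the webhook example above.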

Pattern 2: Blob Storage (S3/GCS)

For large results like images or videos, upload to cloud storage and optionally notify via webhook with the URL.

from runqy_python import task, load, run
import boto3
import requests
import uuid

@load
def setup():
    """Initialize S3 client once."""
    s3 = boto3.client('s3')
    return {"s3": s3}

@task
def handle(payload: dict, ctx: dict) -> dict:
    s3 = ctx["s3"]
    bucket = "my-results-bucket"

    # Generate result (e.g., process image, run ML inference)
    output_data = generate_output(payload["input"])

    # Upload to S3
    key = f"results/{uuid.uuid4()}.png"
    s3.put_object(
        Bucket=bucket,
        Key=key,
        Body=output_data,
        ContentType="image/png"
    )

    result_url = f"https://{bucket}.s3.amazonaws.com/{key}"

    # Notify via webhook if provided
    webhook_url = payload.get("webhook_url")
    if webhook_url:
        requests.post(webhook_url, json={
            "task_id": payload.get("task_id"),
            "status": "completed",
            "result_url": result_url
        }, timeout=30)

    return

if __name__ == "__main__":
    run()

Presigned URLs

For private buckets, generate presigned URLs with expiration:

presigned_url = s3.generate_presigned_url(
    'get_object',
    Params={'Bucket': bucket, 'Key': key},
    ExpiresIn=3600  # 1 hour
)

Pattern 3: Database Write

Write results directly to your application database:

from runqy_python import task, load, run
import json
import psycopg2

@load
def setup():
    conn = psycopg2.connect(
        host="db.example.com",
        database="myapp",
        user="worker",
        password="secret"
    )
    return {"db": conn}

@task
def handle(payload: dict, ctx: dict) -> dict:
    task_id = payload.get("task_id")
    result = process_data(payload["input"])

    cursor = ctx["db"].cursor()
    cursor.execute(
        "UPDATE tasks SET status = %s, result = %s, completed_at = NOW() WHERE id = %s",
        ("completed", json.dumps(result), task_id)
    )
    ctx["db"].commit()
    cursor.close()

    return

if __name__ == "__main__":
    run()

Pattern 4: Message Queue

Publish results to a message queue for downstream processing:

from runqy_python import task, load, run
import pika
import json

@load
def setup():
    connection = pika.BlockingConnection(
        pika.ConnectionParameters('rabbitmq.example.com')
    )
    channel = connection.channel()
    channel.queue_declare(queue='task_results', durable=True)
    return {"channel": channel}

@task
def handle(payload: dict, ctx: dict) -> dict:
    result = process_data(payload["input"])

    ctx["channel"].basic_publish(
        exchange='',
        routing_key='task_results',
        body=json.dumps({
            "task_id": payload.get("task_id"),
            "result": result
        }),
        properties=pika.BasicProperties(delivery_mode=2)  # persistent
    )

    return

if __name__ == "__main__":
    run()

When to Enable redis_storage

Enable redis_storage: true in your queue deployment config (server-side) when:

  • Results are small (< 1MB)
  • Clients need to poll synchronously for results
  • Results are short-lived and you set appropriate TTLs
  • You're building simple integrations or prototypes

# Queue deployment YAML (server-side, e.g. runqy/deployment/simple.yml)
queues:
  simple:
    deployment:
      redis_storage: true

The server transmits this setting to the worker during bootstrap registration.

Memory Considerations

With redis_storage: true, results accumulate in Redis until they expire or are manually deleted. Monitor your Redis memory usage and set appropriate TTLs for result keys.
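
One way to apply the size guideline is to measure the serialized result before deciding how to deliver it. The sketch below is an illustration only: the helper names and the 1,000,000-byte threshold are assumptions chosen to mirror the "< 1MB" rule of thumb, and it assumes results are stored as JSON.

```python
import json

# Assumption: roughly 1 MB, mirroring the "< 1MB" guideline.
MAX_INLINE_BYTES = 1_000_000


def serialized_size(result) -> int:
    """Size in bytes of the result once encoded as UTF-8 JSON."""
    return len(json.dumps(result).encode("utf-8"))


def should_return_inline(result, limit: int = MAX_INLINE_BYTES) -> bool:
    """True if the result is JSON-serializable and smaller than the limit.

    Results that cannot be serialized as JSON (for example raw bytes)
    are treated as unsuitable for inline storage.
    """
    try:
        return serialized_size(result) < limit
    except (TypeError, ValueError):
        return False
```

A task could return the result directly when `should_return_inline(result)` is true and fall back to blob storage otherwise.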

Combining Patterns

You can combine patterns. For example, upload to S3 for large files but also write metadata to your database:

# generate_video, upload_to_s3, save_to_db, and notify_webhook are
# placeholders for helpers you define yourself.
@task
def handle(payload: dict, ctx: dict) -> dict:
    # Generate large output
    video_data = generate_video(payload["input"])

    # Upload to S3
    s3_url = upload_to_s3(video_data)

    # Write metadata to database
    save_to_db(payload["task_id"], {
        "status": "completed",
        "result_url": s3_url,
        "size_bytes": len(video_data)
    })

    # Notify client only if a webhook URL was provided
    webhook_url = payload.get("webhook_url")
    if webhook_url:
        notify_webhook(webhook_url, {
            "task_id": payload["task_id"],
            "result_url": s3_url
        })

    return
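
The helpers in the combined example are left to the reader. As one possible shape for `notify_webhook`, here is a hedged sketch that uses only the standard library and reports failure through its return value instead of raising. The signature and return convention are assumptions for illustration, not something the runqy SDK provides.

```python
import http.client
import json
import logging
import urllib.request
from typing import Optional


def notify_webhook(url: Optional[str], body: dict, timeout: float = 30.0) -> bool:
    """POST body as JSON to url.

    Returns True if the server answered with a success status, and False if
    no URL was given, the URL was invalid, or the request failed.
    """
    if not url:
        return False
    try:
        request = urllib.request.Request(
            url,
            data=json.dumps(body).encode("utf-8"),
            headers={"Content-Type": "application/json"},
            method="POST",
        )
        # urlopen raises HTTPError (a subclass of OSError) on 4xx/5xx responses.
        with urllib.request.urlopen(request, timeout=timeout):
            return True
    except (OSError, ValueError, http.client.HTTPException) as exc:
        logging.error("Webhook delivery failed: %s", exc)
        return False
```

Because the database write happens before the notification, a False return here leaves the result retrievable from the database, so the task can decide whether a failed notification should fail the whole task or only be logged.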