Integration Patterns

Common messaging patterns with Fila.

Producer / Consumer

The most basic pattern: one service produces messages, another consumes them.

┌──────────┐    enqueue    ┌──────┐    consume    ┌──────────┐
│ Producer │ ────────────► │ Fila │ ────────────► │ Consumer │
└──────────┘               └──────┘               └──────────┘

Use when: You want to decouple a producer from a consumer — the producer doesn’t need to wait for processing to complete.

# producer.py
import asyncio
from fila import FilaClient

async def produce():
    client = await FilaClient.connect("localhost:5555")
    for i in range(100):
        await client.enqueue("orders",
            headers={"tenant": "acme"},
            payload=f"order-{i}".encode())

asyncio.run(produce())

# consumer.py
import asyncio
from fila import FilaClient

async def consume():
    client = await FilaClient.connect("localhost:5555")
    async for msg in client.consume("orders"):
        print(f"processing: {msg.payload}")
        # ... do work ...
        await client.ack("orders", msg.id)

asyncio.run(consume())
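The decoupling itself can be seen without a broker at all — a minimal in-memory stand-in (using Python's built-in asyncio.Queue in place of Fila, so no Fila API is involved) showing that the producer returns as soon as its messages are buffered, independent of consumer progress:

```python
import asyncio

async def produce(queue: asyncio.Queue) -> None:
    # The producer only waits for the enqueue, never for processing.
    for i in range(5):
        await queue.put(f"order-{i}")
    await queue.put(None)  # sentinel: no more work

async def consume(queue: asyncio.Queue) -> list:
    processed = []
    while (msg := await queue.get()) is not None:
        processed.append(msg)  # ... do work ...
    return processed

async def main() -> list:
    queue: asyncio.Queue = asyncio.Queue()
    # Run both concurrently; produce() completes without waiting
    # for consume() to finish draining the queue.
    _, processed = await asyncio.gather(produce(queue), consume(queue))
    return processed

print(asyncio.run(main()))  # ['order-0', 'order-1', 'order-2', 'order-3', 'order-4']
```

With Fila the buffer is durable and survives restarts, but the control flow is the same shape: enqueue returns once the message is accepted, not once it is processed.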

Fan-out

Multiple consumers process different types of work from a shared queue. Fila’s fairness keys ensure each workload type gets its fair share.

                              ┌──────────────┐
                         ┌──► │ Consumer (A) │
┌──────────┐  enqueue    │    └──────────────┘
│ Producer │ ──────► ┌──┴───┐
└──────────┘         │ Fila │  fairness_key routing
                     └──┬───┘
                         └──► ┌──────────────┐
                              │ Consumer (B) │
                              └──────────────┘

Use when: Different message types need fair scheduling. Without fairness keys, a burst of type-A messages would starve type-B consumers.

# Producer assigns fairness keys via Lua on_enqueue
# Queue created with:
# fila queue create work --on-enqueue '
#   function on_enqueue(msg)
#     return { fairness_key = msg.headers["type"] or "default" }
#   end
# '

# producer.py
import asyncio
from fila import FilaClient

async def produce():
    client = await FilaClient.connect("localhost:5555")
    # Type A messages (high volume)
    for i in range(1000):
        await client.enqueue("work",
            headers={"type": "email"}, payload=f"email-{i}".encode())
    # Type B messages (low volume but equally important)
    for i in range(10):
        await client.enqueue("work",
            headers={"type": "sms"}, payload=f"sms-{i}".encode())
    # Fila's DRR scheduler ensures SMS messages aren't starved by emails

asyncio.run(produce())
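The scheduling behavior this relies on can be illustrated without Fila. Below is a simplified deficit-round-robin sketch over per-key sub-queues (unit message cost, quantum of 1 — a deliberate simplification of any real DRR implementation), showing that a large backlog under one fairness key does not delay another key's messages:

```python
from collections import defaultdict, deque

def drr_drain(messages, quantum=1):
    """Interleave messages fairly by key using deficit round robin.

    Each message is a (fairness_key, payload) pair with unit cost.
    """
    queues = defaultdict(deque)
    for key, payload in messages:
        queues[key].append(payload)

    deficits = {key: 0 for key in queues}
    order = []
    while any(queues.values()):
        for key in list(queues):
            if not queues[key]:
                continue
            deficits[key] += quantum          # top up this key's budget
            while queues[key] and deficits[key] >= 1:
                deficits[key] -= 1            # spend one unit per message
                order.append(queues[key].popleft())
    return order

# 6 emails enqueued before 2 SMS messages: plain FIFO would emit
# every email first; DRR alternates keys, so SMS is served at once.
backlog = [("email", f"email-{i}") for i in range(6)]
backlog += [("sms", f"sms-{i}") for i in range(2)]
print(drr_drain(backlog))
# ['email-0', 'sms-0', 'email-1', 'sms-1', 'email-2', 'email-3', 'email-4', 'email-5']
```

Once a key's sub-queue empties, the remaining keys absorb the full throughput, so fairness costs nothing when there is no contention.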

Request-Reply

Implement synchronous-style request/reply over async messaging using a correlation ID and a reply queue.

┌─────────┐  enqueue(request)  ┌──────┐  consume   ┌─────────┐
│ Client  │ ─────────────────► │ Fila │ ─────────► │ Service │
│         │ ◄───────────────── │      │ ◄───────── │         │
└─────────┘  consume(reply)    └──────┘  enqueue   └─────────┘
                                         (reply)

Use when: A service needs a response but you still want the decoupling and reliability benefits of message queues.

// client.go — sends request, waits for reply
package main

import (
    "context"
    "fmt"
    "log"

    "github.com/google/uuid"
    fila "github.com/faiscadev/fila-go"
)

func main() {
    client, err := fila.Connect("localhost:5555")
    if err != nil {
        log.Fatal(err)
    }
    ctx := context.Background()

    correlationID := uuid.New().String()
    replyQueue := "replies-" + correlationID

    // Create a temporary reply queue
    client.CreateQueue(ctx, replyQueue)
    defer client.DeleteQueue(ctx, replyQueue)

    // Send request with correlation ID
    client.Enqueue(ctx, "requests",
        map[string]string{
            "correlation_id": correlationID,
            "reply_to":       replyQueue,
        },
        []byte("what is 2+2?"))

    // Wait for reply
    stream, _ := client.Consume(ctx, replyQueue)
    reply, _ := stream.Recv()
    fmt.Printf("reply: %s\n", reply.Payload)
    client.Ack(ctx, replyQueue, reply.ID)
}

// service.go — processes requests, sends replies
package main

import (
    "context"
    "fmt"
    "log"

    fila "github.com/faiscadev/fila-go"
)

func main() {
    client, _ := fila.Connect("localhost:5555")
    ctx := context.Background()

    stream, _ := client.Consume(ctx, "requests")
    for {
        msg, err := stream.Recv()
        if err != nil {
            log.Fatal(err)
        }

        // Process request
        answer := fmt.Sprintf("answer: 4 (to: %s)", string(msg.Payload))

        // Send reply to the caller's reply queue
        replyTo := msg.Headers["reply_to"]
        client.Enqueue(ctx, replyTo,
            map[string]string{
                "correlation_id": msg.Headers["correlation_id"],
            },
            []byte(answer))

        client.Ack(ctx, "requests", msg.ID)
    }
}
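The per-request reply queue above keeps the example simple, at the cost of a queue create/delete per request. A common variant is a single shared reply queue per client process, with a dispatcher that matches replies to waiting callers by correlation ID. The sketch below shows only that matching logic in plain Python (futures and dictionaries — it does not use Fila's API, and the queue names are illustrative):

```python
import asyncio
import uuid

class ReplyDispatcher:
    """Match replies arriving on one shared queue to waiting callers."""

    def __init__(self):
        self.pending: dict = {}  # correlation_id -> Future

    def request(self, payload: str):
        # Register a future keyed by a fresh correlation ID, and return
        # the headers a producer would attach when enqueueing the request.
        correlation_id = str(uuid.uuid4())
        future = asyncio.get_running_loop().create_future()
        self.pending[correlation_id] = future
        headers = {"correlation_id": correlation_id, "reply_to": "replies"}
        return headers, future

    def on_reply(self, headers: dict, payload: str) -> None:
        # Called for each message consumed from the shared reply queue;
        # unknown or duplicate correlation IDs are ignored.
        future = self.pending.pop(headers["correlation_id"], None)
        if future is not None and not future.done():
            future.set_result(payload)

async def main() -> str:
    dispatcher = ReplyDispatcher()
    headers, future = dispatcher.request("what is 2+2?")
    # Simulate the service echoing the correlation ID back on "replies".
    dispatcher.on_reply({"correlation_id": headers["correlation_id"]}, "4")
    return await future

print(asyncio.run(main()))  # prints 4
```

This trades queue churn for a little client-side state, and lets many in-flight requests share one consumer stream.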