Keyboard shortcuts

Press ← or → to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

SDK Examples

Working code for every SDK showing the core enqueue -> consume -> ack flow.

Setup

All examples assume a running broker with a queue:

# Start the broker as a background job so the shell stays usable.
fila-server &
# Create the "demo" queue that every example below enqueues to and consumes from.
fila queue create demo

Rust

use std::collections::HashMap;
use std::time::Duration;

use fila_sdk::FilaClient;
use tokio_stream::StreamExt;

/// Walks through the core message lifecycle against the `demo` queue:
/// enqueue one message, consume one message, then acknowledge it.
///
/// # Errors
///
/// Returns an error if connecting, enqueueing, consuming, or acking fails,
/// if no message arrives within five seconds, or if the stream ends before
/// yielding a message.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let client = FilaClient::connect("http://localhost:5555").await?;

    // Enqueue
    let headers = HashMap::from([("tenant".to_string(), "acme".to_string())]);
    let id = client.enqueue("demo", headers, b"hello".to_vec()).await?;
    println!("enqueued: {id}");

    // Consume
    let mut stream = client.consume("demo").await?;
    // Three failure layers are unwrapped in order:
    //   1. `.await?`  - the five-second timeout elapsed,
    //   2. `.ok_or()` - the stream ended (`next()` returned `None`),
    //   3. final `?`  - the stream yielded an error item.
    // NOTE(review): assumes each stream item is a `Result` - confirm against fila_sdk.
    let msg = tokio::time::timeout(Duration::from_secs(5), stream.next())
        .await?
        .ok_or("stream ended before a message arrived")??;

    println!("received: {} ({})", msg.id, String::from_utf8_lossy(&msg.payload));

    // Ack
    client.ack("demo", &msg.id).await?;
    println!("acked: {}", msg.id);

    Ok(())
}

Add to Cargo.toml:

[dependencies]
# Client SDK; the example imports `fila_sdk::FilaClient` from it.
fila-sdk = "0.1"
# Async runtime; needed for `#[tokio::main]` and `tokio::time::timeout`.
tokio = { version = "1", features = ["full"] }
# Brings `StreamExt` into scope so the example can call `stream.next()`.
tokio-stream = "0.1"

Rust with API key authentication

#![allow(unused)]
fn main() {
let opts = ConnectOptions::new("127.0.0.1:5555")
    .with_api_key("my-secret-key");
let client = FilaClient::connect_with_options(opts).await?;
}

Go

package main

import (
	"context"
	"fmt"
	"log"

	fila "github.com/faiscadev/fila-go"
)

// main runs the demo flow once against the "demo" queue: connect, enqueue a
// message, receive a message, and acknowledge it. Any failure aborts the
// program via log.Fatal.
func main() {
	background := context.Background()

	broker, connectErr := fila.Connect("localhost:5555")
	if connectErr != nil {
		log.Fatal(connectErr)
	}
	defer broker.Close()

	// Enqueue
	headers := map[string]string{"tenant": "acme"}
	body := []byte("hello")
	messageID, enqueueErr := broker.Enqueue(background, "demo", headers, body)
	if enqueueErr != nil {
		log.Fatal(enqueueErr)
	}
	fmt.Println("enqueued:", messageID)

	// Consume
	deliveries, consumeErr := broker.Consume(background, "demo")
	if consumeErr != nil {
		log.Fatal(consumeErr)
	}

	received, nextErr := deliveries.Next()
	if nextErr != nil {
		log.Fatal(nextErr)
	}
	fmt.Printf("received: %s (%s)\n", received.ID, string(received.Payload))

	// Ack
	ackErr := broker.Ack(background, "demo", received.ID)
	if ackErr != nil {
		log.Fatal(ackErr)
	}
	fmt.Println("acked:", received.ID)
}

Python

from fila import FilaClient

# Connect to the broker that the Setup section started.
broker = FilaClient("localhost:5555")

# Enqueue
headers = {"tenant": "acme"}
enqueued_id = broker.enqueue("demo", headers, b"hello")
print(f"enqueued: {enqueued_id}")

# Consume
# Keep the stream bound to a name so it stays alive until the script ends.
deliveries = broker.consume("demo")
received = next(deliveries)
print(f"received: {received.id} ({received.payload.decode()})")

# Ack
broker.ack("demo", received.id)
print(f"acked: {received.id}")

JavaScript / Node.js

const { FilaClient } = require('fila-client');

/**
 * Runs the demo flow once against the "demo" queue: connect, enqueue a
 * message, then consume and acknowledge a single message.
 */
async function main() {
  const broker = await FilaClient.connect('localhost:5555');

  // Enqueue
  const headers = { tenant: 'acme' };
  const body = Buffer.from('hello');
  const messageId = await broker.enqueue('demo', headers, body);
  console.log('enqueued:', messageId);

  // Consume
  const deliveries = broker.consume('demo');
  for await (const received of deliveries) {
    console.log(`received: ${received.id} (${received.payload.toString()})`);

    // Ack
    await broker.ack('demo', received.id);
    console.log('acked:', received.id);

    // One message is enough for the demo; leaving the loop stops consuming.
    break;
  }
}

main().catch(console.error);

Ruby

require 'fila'

# Connect to the broker that the Setup section started.
broker = Fila::Client.new('localhost:5555')

# Enqueue
headers = { 'tenant' => 'acme' }
message_id = broker.enqueue('demo', headers, 'hello')
puts "enqueued: #{message_id}"

# Consume
broker.consume('demo') do |received|
  puts "received: #{received.id} (#{received.payload})"

  # Ack
  broker.ack('demo', received.id)
  puts "acked: #{received.id}"

  # One message is enough for the demo.
  break
end

Java

import dev.fila.client.FilaClient;
import dev.fila.client.Message;

/**
 * Minimal end-to-end example against the "demo" queue: enqueue one message,
 * consume one message, and acknowledge it.
 */
public class Demo {
    /**
     * Runs the demo flow once.
     *
     * @param args unused
     * @throws Exception if connecting, enqueueing, consuming, acking, or
     *                   closing the client fails
     */
    public static void main(String[] args) throws Exception {
        FilaClient client = FilaClient.connect("localhost:5555");
        try {
            // Enqueue
            // Encode explicitly as UTF-8 so the bytes do not depend on the
            // platform default charset.
            String id = client.enqueue("demo",
                java.util.Map.of("tenant", "acme"),
                "hello".getBytes(java.nio.charset.StandardCharsets.UTF_8));
            System.out.println("enqueued: " + id);

            // Consume
            var stream = client.consume("demo");
            Message msg = stream.next();
            System.out.printf("received: %s (%s)%n", msg.getId(),
                new String(msg.getPayload(), java.nio.charset.StandardCharsets.UTF_8));

            // Ack
            client.ack("demo", msg.getId());
            System.out.println("acked: " + msg.getId());
        } finally {
            // Close the client even when one of the steps above throws.
            client.close();
        }
    }
}