Broadway - jak przetwarzać miliony wiadomości bez utraty jednej i bez pisania infrastruktury
Kontrahent wysyła CSV z 2 milionami wierszy. Codziennie. O 6:00 rano. System musi zaimportować dane do PostgreSQL, przeliczyć stany magazynowe, wysłać powiadomienia o zmianach cen i wygenerować raport różnic. Wszystko przed 8:00, zanim zespół zacznie pracę.
Pierwsze podejście: Task.async w pętli. 2 miliony tasków. BEAM zjada 16 GB RAM-u. Baza danych dostaje 2 miliony INSERT-ów po jednym. Connection pool się wyczerpuje. Import trwa 4 godziny. Porażka.
Drugie podejście: GenServer z kolejką. Ręczny batching, ręczny rate limiting, ręczne retry, ręczne monitorowanie. 800 linii kodu infrastrukturalnego. Działa, ale każda zmiana wymaga chirurgicznej precyzji. Jeden bug w logice retry = duplikaty w bazie.
Trzecie podejście: Broadway. 60 linii kodu. Automatyczny backpressure, batching, rate limiting, fault tolerance, telemetria. Import trwa 12 minut. Zero duplikatów, zero utraconych wierszy, zero 16 GB RAM-u.
Czym jest Broadway
Broadway to framework do budowy wieloetapowych pipeline'ów przetwarzania danych. Zbudowany na GenStage (niskopoziomowa biblioteka backpressure od Dashbit), dodaje wszystko, czego potrzebujesz w produkcji: batching, acknowledgment, rate limiting, graceful shutdown i telemetrię.
Architektura pipeline'u
Producent(y) → Procesor(y) → Batcher(y) → BatchProcesor(y)| Etap | Rola | Callback |
|---|---|---|
| Producer | Pobiera dane z zewnętrznego źródła (SQS, Kafka, RabbitMQ, CSV) | GenStage handle_demand/2 |
| Processor | Przetwarza pojedyncze wiadomości (walidacja, transformacja, routing) | handle_message/3 |
| Batcher | Grupuje wiadomości po kluczu, rozmiarze lub timeout | Automatyczny |
| BatchProcessor | Wykonuje operacje hurtowe (bulk insert, upload S3) | handle_batch/4 |
Kluczowa cecha: demand-driven flow. Procesory proszą producenta o dane tylko wtedy, gdy mają wolne moce. Producent pobiera z kolejki tylko tyle, ile downstream jest w stanie przetworzyć. Automatycznie. Bez konfiguracji.
Jeśli handle_batch/4 jest wolny (baza jest obciążona), procesory zwalniają, producent przestaje pobierać z SQS. Kolejka rośnie w SQS, nie w pamięci BEAM. Gdy baza się odblokuje, pipeline automatycznie przyspiesza. Zero OOM, zero ręcznego throttlingu.
Pełny pipeline: import zamówień z RabbitMQ
defmodule MyApp.OrderPipeline do
use Broadway
def start_link(_opts) do
Broadway.start_link(__MODULE__,
name: __MODULE__,
producer: [
module: {BroadwayRabbitMQ.Producer, [
queue: "orders",
connection: [host: "localhost"],
qos: [prefetch_count: 100],
on_failure: :reject_and_requeue_once
]},
concurrency: 2
],
processors: [
default: [
concurrency: System.schedulers_online() * 2,
max_demand: 10
]
],
batchers: [
db: [
concurrency: 5,
batch_size: 100,
batch_timeout: 2_000
],
notify: [
concurrency: 2,
batch_size: 50,
batch_timeout: 1_000
]
]
)
end
@impl true
def handle_message(_processor, message, _context) do
case Jason.decode(message.data) do
{:ok, %{"type" => "order"} = data} ->
message
|> Broadway.Message.update_data(fn _ -> data end)
|> Broadway.Message.put_batcher(:db)
{:ok, %{"type" => "notification"} = data} ->
message
|> Broadway.Message.update_data(fn _ -> data end)
|> Broadway.Message.put_batcher(:notify)
{:error, _} ->
Broadway.Message.failed(message, :invalid_json)
end
end
@impl true
def handle_batch(:db, messages, _batch_info, _context) do
rows = Enum.map(messages, fn msg ->
%{
external_id: msg.data["id"],
total: msg.data["total"],
customer_id: msg.data["customer_id"],
inserted_at: DateTime.utc_now(),
updated_at: DateTime.utc_now()
}
end)
MyApp.Repo.insert_all("orders", rows,
on_conflict: :replace_all,
conflict_target: :external_id
)
messages
end
@impl true
def handle_batch(:notify, messages, _batch_info, _context) do
Enum.each(messages, fn msg ->
Phoenix.PubSub.broadcast(MyApp.PubSub, "orders", {:new_order, msg.data})
end)
messages
end
@impl true
def handle_failed(messages, _context) do
Enum.each(messages, fn msg ->
require Logger
Logger.error("Order processing failed: #{inspect(msg.status)}")
end)
messages
end
endCo się dzieje:
- Producer pobiera wiadomości z RabbitMQ (max 100 naraz, dzięki
prefetch_count) - Processor dekoduje JSON, routuje zamówienia do batchera
:db, notyfikacje do:notify - Batcher
:dbzbiera 100 wiadomości (lub czeka max 2 sekundy) i oddaje do BatchProcessora - BatchProcessor
:dbrobi jedenINSERT ALLzamiast 100 osobnych insertów - Batcher
:notifyzbiera 50 wiadomości i broadcastuje przez PubSub - RabbitMQ dostaje ACK dopiero po pomyślnym przetworzeniu. Jeśli coś się nie uda, wiadomość wraca do kolejki (raz)
100 wiadomości = 1 zapytanie SQL. Nie 100 zapytań. Batching zmienia O(n) operacji bazodanowych w O(1).
Źródła danych - producenci
Broadway ma oficjalne producenty dla najpopularniejszych brokerów:
Amazon SQS
# mix.exs: {:broadway_sqs, "~> 0.7"}
producer: [
module: {BroadwaySQS.Producer, [
queue_url: "https://sqs.eu-central-1.amazonaws.com/123/orders",
config: [region: "eu-central-1"]
]},
concurrency: 10
]Apache Kafka
# mix.exs: {:broadway_kafka, "~> 0.4"}
producer: [
module: {BroadwayKafka.Producer, [
hosts: [localhost: 9092],
group_id: "my-consumer-group",
topics: ["events"]
]},
concurrency: 1 # Kafka zarządza partycjami wewnętrznie
]RabbitMQ
# mix.exs: {:broadway_rabbitmq, "~> 0.8"}
producer: [
module: {BroadwayRabbitMQ.Producer, [
queue: "my_queue",
connection: [host: "localhost"],
qos: [prefetch_count: 50],
on_failure: :reject_and_requeue_once
]},
concurrency: 5
]Własny producent - polling z bazy danych
Nie masz kolejki? Nie potrzebujesz. Broadway obsługuje dowolne źródło danych:
defmodule MyApp.DbPoller do
use GenStage
@poll_interval 5_000
@impl true
def init(_opts) do
schedule_poll()
{:producer, %{demand: 0, buffer: []}}
end
@impl true
def handle_demand(demand, state) do
{to_send, remaining} = Enum.split(state.buffer, demand)
{:noreply, to_send, %{state | buffer: remaining, demand: state.demand + demand - length(to_send)}}
end
@impl true
def handle_info(:poll, state) do
schedule_poll()
rows = MyApp.Repo.all(
from r in "pending_imports",
where: r.status == "pending",
limit: ^max(state.demand, 100),
order_by: [asc: r.inserted_at]
)
messages = Enum.map(rows, fn row ->
%Broadway.Message{
data: row,
acknowledger: {__MODULE__, :ack_ref, row.id}
}
end)
{to_send, to_buffer} = Enum.split(messages, state.demand)
{:noreply, to_send, %{state | buffer: to_buffer, demand: max(state.demand - length(to_send), 0)}}
end
def ack(:ack_ref, successful, failed) do
success_ids = Enum.map(successful, fn %{acknowledger: {_, _, id}} -> id end)
failed_ids = Enum.map(failed, fn %{acknowledger: {_, _, id}} -> id end)
if success_ids != [] do
MyApp.Repo.update_all(
from(r in "pending_imports", where: r.id in ^success_ids),
set: [status: "processed"]
)
end
if failed_ids != [] do
MyApp.Repo.update_all(
from(r in "pending_imports", where: r.id in ^failed_ids),
set: [status: "failed"]
)
end
end
defp schedule_poll, do: Process.send_after(self(), :poll, @poll_interval)
endZero Kafki, zero SQS, zero RabbitMQ. Producent polluje tabelę PostgreSQL co 5 sekund, Broadway przetwarza wiersze z pełnym backpressure, batchingiem i acknowledgementa. Przetworzone wiersze oznaczane jako "processed", błędne jako "failed".
Backpressure - dlaczego to zmienia wszystko
Bez backpressure:
Źródło: 10 000 msg/s → Twój system: max 1 000 msg/s
Efekt: 9 000 msg/s ląduje w pamięci
Za 10 minut: 5.4M wiadomości w RAM
Za 15 minut: OOM killZ Broadway:
Źródło: 10 000 msg/s → Broadway: demand = 1 000/s
Efekt: źródło dostarcza tylko 1 000 msg/s
Reszta czeka w SQS/Kafka/RabbitMQ
RAM: stały, niezależnie od obciążeniaBroadway nie "buforuje" nadmiarowych wiadomości. On ich nie pobiera. Procesory sygnalizują demand do producenta, producent pobiera z kolejki dokładnie tyle, ile downstream potrzebuje. Nadmiar zostaje w kolejce - która jest zaprojektowana do przechowywania milionów wiadomości (SQS, Kafka) i nie zjada pamięci Twojego serwera.
Rate limiting
Broadway ma wbudowany rate limiting na poziomie producenta:
producer: [
module: {BroadwaySQS.Producer, [queue_url: "..."]},
concurrency: 5,
rate_limiting: [
allowed_messages: 2000,
interval: 1_000
]
]Niezależnie od tego ile producentów działa, maksymalnie 2000 wiadomości na sekundę wpłynie do pipeline'u. Przydatne, gdy downstream (baza danych, API) ma znane limity przepustowości.
Zmiana w runtime - bez restartu:
# Godziny szczytu - zwolnij
Broadway.update_rate_limiting(MyApp.OrderPipeline, allowed_messages: 500)
# Noc - przyspiesz
Broadway.update_rate_limiting(MyApp.OrderPipeline, allowed_messages: 5000)Batching - klucz do wydajności
Jedna z najważniejszych funkcji Broadway. Zamiast 1000 pojedynczych INSERT-ów, robisz 10 batchów po 100.
Routing do batcherów
@impl true
def handle_message(_, message, _) do
case message.data["type"] do
"payment" ->
Broadway.Message.put_batcher(message, :payments)
"log" ->
Broadway.Message.put_batcher(message, :logs)
_ ->
message # domyślny batcher
end
endSub-batching po kluczu
put_batch_key/2 tworzy oddzielne batche w ramach jednego batchera:
@impl true
def handle_message(_, message, _) do
message
|> Broadway.Message.put_batcher(:db)
|> Broadway.Message.put_batch_key(message.data["tenant_id"])
end
@impl true
def handle_batch(:db, messages, batch_info, _context) do
# Wszystkie wiadomości w tym batchu należą do tego samego tenanta
tenant_id = batch_info.batch_key
rows = Enum.map(messages, &transform_row/1)
MyApp.Repo.insert_all("events", rows, prefix: "tenant_#{tenant_id}")
messages
endDynamiczny rozmiar batcha
Od Broadway v1.1 - batch na podstawie rozmiaru danych, nie liczby wiadomości:
batchers: [
s3: [
batch_size: {0, fn message, acc ->
new_size = acc + byte_size(message.data)
if new_size >= 5_000_000, do: {:emit, 0}, else: {:cont, new_size}
end},
batch_timeout: 10_000
]
]Batch emitowany, gdy osiągnie 5 MB danych - niezależnie czy to 10 czy 10 000 wiadomości. Idealny dla S3 put_object, gdzie rozmiar payloadu ma znaczenie.
Obsługa błędów
Jawne oznaczanie wiadomości jako failed
@impl true
def handle_message(_, message, _) do
case validate(message.data) do
:ok -> message
{:error, reason} -> Broadway.Message.failed(message, reason)
end
endDead Letter Queue
Broadway nie ma wbudowanej DLQ, ale wzorzec jest prosty:
@impl true
def handle_failed(messages, _context) do
rows = Enum.map(messages, fn msg ->
%{
pipeline: "order_pipeline",
original_data: :erlang.term_to_binary(msg.data),
failure_reason: inspect(msg.status),
inserted_at: DateTime.utc_now()
}
end)
MyApp.Repo.insert_all("dead_letter_queue", rows)
messages
endWiadomości, które nie dały się przetworzyć, lądują w tabeli PostgreSQL. Oban worker może je potem ponownie przetworzyć:
defmodule MyApp.Workers.RetryDeadLetters do
use Oban.Worker, queue: :retries
@impl true
def perform(_job) do
MyApp.Repo.all(
from d in "dead_letter_queue",
where: d.retry_count < 3,
limit: 100
)
|> Enum.each(fn record ->
data = :erlang.binary_to_term(record.original_data)
MyApp.WebhookProducer.push_message(data)
end)
:ok
end
endBroadway vs Oban - kiedy co
Broadway i Oban to komplementarne narzędzia, nie konkurencja:
| Wymiar | Broadway | Oban |
|---|---|---|
| Cel | Ciągłe przetwarzanie strumieni | Pojedyncze zadania w tle |
| Źródło | Zewnętrzne kolejki (SQS, Kafka, RabbitMQ) | PostgreSQL |
| Persystencja | W brokerze | W bazie danych |
| Throughput | Miliony msg/s | Tysiące jobs/s |
| Batching | Natywny, first-class | Brak |
| Backpressure | Automatyczny | Brak (limity per-queue) |
| Retry | Zależy od producenta | Wbudowany z backoff |
| Scheduling | Brak | Cron, unikalne joby |
| UI | Brak (telemetria) | Oban Web |
Broadway: kontrahent wysyła 100 000 webhooków dziennie → przetwórz je strumieniowo z batchingiem
Oban: po przetworzeniu zamówienia → wyślij email, wygeneruj PDF, zaktualizuj raport
Razem:
@impl true
def handle_batch(:db, messages, _info, _context) do
# Broadway: hurtowy insert
rows = Enum.map(messages, &to_row/1)
MyApp.Repo.insert_all("orders", rows)
# Oban: follow-up per zamówienie
Enum.each(messages, fn msg ->
%{order_id: msg.data["id"]}
|> MyApp.Workers.SendConfirmation.new()
|> Oban.insert()
end)
messages
endBroadway przetwarza strumień zamówień z Kafki. Oban wysyła emaile potwierdzające. Każde narzędzie robi to, do czego zostało zaprojektowane.
Monitoring - telemetria
Broadway emituje zdarzenia :telemetry dla każdego etapu pipeline'u:
defmodule MyApp.BroadwayMetrics do
def setup do
:telemetry.attach_many("broadway", [
[:broadway, :processor, :message, :stop],
[:broadway, :processor, :message, :exception],
[:broadway, :batch_processor, :stop]
], &handle_event/4, nil)
end
def handle_event([:broadway, :processor, :message, :stop], %{duration: d}, meta, _) do
ms = System.convert_time_unit(d, :native, :millisecond)
if ms > 1000 do
require Logger
Logger.warning("Slow message: #{ms}ms in #{meta.topology_name}")
end
end
def handle_event([:broadway, :processor, :message, :exception], _, meta, _) do
require Logger
Logger.error("Broadway exception: #{inspect(meta.reason)}")
end
def handle_event([:broadway, :batch_processor, :stop], %{duration: d}, meta, _) do
:telemetry.execute(
[:my_app, :broadway, :batch],
%{
duration_ms: System.convert_time_unit(d, :native, :millisecond),
success: length(meta.successful_messages),
failed: length(meta.failed_messages)
},
%{batcher: meta.batch_info.batcher}
)
end
endPodłącz do Prometheus, StatsD, LiveDashboard - i masz pełną widoczność: ile wiadomości przetwarza pipeline, jak szybko, ile się nie udaje, gdzie są wąskie gardła.
Testowanie
Broadway ma wbudowane narzędzia testowe:
defmodule MyApp.OrderPipelineTest do
use ExUnit.Case
test "valid orders are inserted" do
{:ok, _} = MyApp.OrderPipeline.start_link(
name: :test_pipeline,
producer_module: {Broadway.DummyProducer, []}
)
ref = Broadway.test_message(:test_pipeline,
~s({"type": "order", "id": "123", "total": 99.90, "customer_id": "c1"})
)
assert_receive {:ack, ^ref, [_successful], []}, 5_000
end
test "invalid JSON is rejected" do
{:ok, _} = MyApp.OrderPipeline.start_link(
name: :test_pipeline_2,
producer_module: {Broadway.DummyProducer, []}
)
ref = Broadway.test_message(:test_pipeline_2, "not json")
assert_receive {:ack, ^ref, [], [_failed]}, 5_000
end
endBroadway.test_message/3 wstrzykuje wiadomość do pipeline'u bez prawdziwego brokera. DummyProducer zastępuje SQS/Kafka/RabbitMQ. Test jest szybki, deterministyczny i nie wymaga żadnej infrastruktury.
Tuning - ile procesów, jakie batche
producer: [
concurrency: 5 # Producenci: dopasuj do źródła
# SQS: 5-10, Kafka: 1 (partycje wewnętrzne)
],
processors: [
default: [
concurrency: 50, # Procesory: schedulers * 2 dla I/O-bound
max_demand: 10 # Ile wiadomości naraz z producenta
]
],
batchers: [
db: [
concurrency: 5, # BatchProcesory: dopasuj do bazy
batch_size: 100, # Wiadomości per batch
batch_timeout: 2_000 # Max czas oczekiwania na pełny batch
]
]Reguły:
- Procesory:
System.schedulers_online() * 2dla I/O-bound,schedulers_online()dla CPU-bound - batch_size: 100-500 dla bazy danych, 1000+ dla S3/plików
- batch_timeout: 1-5 sekund. Za krótki = małe batche. Za długi = latencja
- Partycjonowanie: jeśli kolejność ma znaczenie, użyj
partition_by- wszystkie wiadomości z tym samym kluczem trafią do tego samego procesora
Kiedy Broadway, kiedy nie
Broadway: ciągły strumień danych z zewnętrznego źródła, potrzebujesz backpressure, batchingu i fault tolerance
Nie-Broadway: jednorazowy import pliku (użyj Stream + Repo.insert_all), pojedyncze zadanie w tle (użyj Oban), prosty pub/sub (użyj Phoenix.PubSub)
Broadway rozwiązuje jeden problem i rozwiązuje go doskonale: przetwarzanie ciągłych strumieni danych z gwarancją, że żadna wiadomość nie zostanie utracona, żadna nie zostanie przetworzona dwa razy, i żaden spike ruchu nie położy systemu.
Masz kontrahenta, który zalewa Cię danymi, a Twój system nie nadąża? Porozmawiajmy - pokażemy, jak Broadway przetworzy Twój strumień danych bez utraty jednej wiadomości.