Broadway - jak przetwarzać miliony wiadomości bez utraty jednej i bez pisania infrastruktury

Kontrahent wysyła CSV z 2 milionami wierszy. Codziennie. O 6:00 rano. System musi zaimportować dane do PostgreSQL, przeliczyć stany magazynowe, wysłać powiadomienia o zmianach cen i wygenerować raport różnic. Wszystko przed 8:00, zanim zespół zacznie pracę.

Pierwsze podejście: Task.async w pętli. 2 miliony tasków. BEAM zjada 16 GB RAM-u. Baza danych dostaje 2 miliony INSERT-ów po jednym. Connection pool się wyczerpuje. Import trwa 4 godziny. Porażka.

Drugie podejście: GenServer z kolejką. Ręczny batching, ręczny rate limiting, ręczne retry, ręczne monitorowanie. 800 linii kodu infrastrukturalnego. Działa, ale każda zmiana wymaga chirurgicznej precyzji. Jeden bug w logice retry = duplikaty w bazie.

Trzecie podejście: Broadway. 60 linii kodu. Automatyczny backpressure, batching, rate limiting, fault tolerance, telemetria. Import trwa 12 minut. Zero duplikatów, zero utraconych wierszy, zero 16 GB RAM-u.

Czym jest Broadway

Broadway to framework do budowy wieloetapowych pipeline'ów przetwarzania danych. Zbudowany na GenStage (niskopoziomowa biblioteka backpressure od Dashbit), dodaje wszystko, czego potrzebujesz w produkcji: batching, acknowledgment, rate limiting, graceful shutdown i telemetrię.

Architektura pipeline'u

Producent(y) → Procesor(y) → Batcher(y) → BatchProcesor(y)
EtapRolaCallback
ProducerPobiera dane z zewnętrznego źródła (SQS, Kafka, RabbitMQ, CSV)GenStage handle_demand/2
ProcessorPrzetwarza pojedyncze wiadomości (walidacja, transformacja, routing)handle_message/3
BatcherGrupuje wiadomości po kluczu, rozmiarze lub timeoutAutomatyczny
BatchProcessorWykonuje operacje hurtowe (bulk insert, upload S3)handle_batch/4

Kluczowa cecha: demand-driven flow. Procesory proszą producenta o dane tylko wtedy, gdy mają wolne moce. Producent pobiera z kolejki tylko tyle, ile downstream jest w stanie przetworzyć. Automatycznie. Bez konfiguracji.

Jeśli handle_batch/4 jest wolny (baza jest obciążona), procesory zwalniają, producent przestaje pobierać z SQS. Kolejka rośnie w SQS, nie w pamięci BEAM. Gdy baza się odblokuje, pipeline automatycznie przyspiesza. Zero OOM, zero ręcznego throttlingu.

Pełny pipeline: import zamówień z RabbitMQ

defmodule MyApp.OrderPipeline do
  use Broadway

  def start_link(_opts) do
    Broadway.start_link(__MODULE__,
      name: __MODULE__,
      producer: [
        module: {BroadwayRabbitMQ.Producer, [
          queue: "orders",
          connection: [host: "localhost"],
          qos: [prefetch_count: 100],
          on_failure: :reject_and_requeue_once
        ]},
        concurrency: 2
      ],
      processors: [
        default: [
          concurrency: System.schedulers_online() * 2,
          max_demand: 10
        ]
      ],
      batchers: [
        db: [
          concurrency: 5,
          batch_size: 100,
          batch_timeout: 2_000
        ],
        notify: [
          concurrency: 2,
          batch_size: 50,
          batch_timeout: 1_000
        ]
      ]
    )
  end

  @impl true
  def handle_message(_processor, message, _context) do
    case Jason.decode(message.data) do
      {:ok, %{"type" => "order"} = data} ->
        message
        |> Broadway.Message.update_data(fn _ -> data end)
        |> Broadway.Message.put_batcher(:db)

      {:ok, %{"type" => "notification"} = data} ->
        message
        |> Broadway.Message.update_data(fn _ -> data end)
        |> Broadway.Message.put_batcher(:notify)

      {:error, _} ->
        Broadway.Message.failed(message, :invalid_json)
    end
  end

  @impl true
  def handle_batch(:db, messages, _batch_info, _context) do
    rows = Enum.map(messages, fn msg ->
      %{
        external_id: msg.data["id"],
        total: msg.data["total"],
        customer_id: msg.data["customer_id"],
        inserted_at: DateTime.utc_now(),
        updated_at: DateTime.utc_now()
      }
    end)

    MyApp.Repo.insert_all("orders", rows,
      on_conflict: :replace_all,
      conflict_target: :external_id
    )

    messages
  end

  @impl true
  def handle_batch(:notify, messages, _batch_info, _context) do
    Enum.each(messages, fn msg ->
      Phoenix.PubSub.broadcast(MyApp.PubSub, "orders", {:new_order, msg.data})
    end)

    messages
  end

  @impl true
  def handle_failed(messages, _context) do
    Enum.each(messages, fn msg ->
      require Logger
      Logger.error("Order processing failed: #{inspect(msg.status)}")
    end)

    messages
  end
end

Co się dzieje:

  1. Producer pobiera wiadomości z RabbitMQ (max 100 naraz, dzięki prefetch_count)
  2. Processor dekoduje JSON, routuje zamówienia do batchera :db, notyfikacje do :notify
  3. Batcher :db zbiera 100 wiadomości (lub czeka max 2 sekundy) i oddaje do BatchProcessora
  4. BatchProcessor :db robi jeden INSERT ALL zamiast 100 osobnych insertów
  5. Batcher :notify zbiera 50 wiadomości i broadcastuje przez PubSub
  6. RabbitMQ dostaje ACK dopiero po pomyślnym przetworzeniu. Jeśli coś się nie uda, wiadomość wraca do kolejki (raz)

100 wiadomości = 1 zapytanie SQL. Nie 100 zapytań. Batching zmienia O(n) operacji bazodanowych w O(1).

Źródła danych - producenci

Broadway ma oficjalne producenty dla najpopularniejszych brokerów:

Amazon SQS

# mix.exs: {:broadway_sqs, "~> 0.7"}

producer: [
  module: {BroadwaySQS.Producer, [
    queue_url: "https://sqs.eu-central-1.amazonaws.com/123/orders",
    config: [region: "eu-central-1"]
  ]},
  concurrency: 10
]

Apache Kafka

# mix.exs: {:broadway_kafka, "~> 0.4"}

producer: [
  module: {BroadwayKafka.Producer, [
    hosts: [localhost: 9092],
    group_id: "my-consumer-group",
    topics: ["events"]
  ]},
  concurrency: 1  # Kafka zarządza partycjami wewnętrznie
]

RabbitMQ

# mix.exs: {:broadway_rabbitmq, "~> 0.8"}

producer: [
  module: {BroadwayRabbitMQ.Producer, [
    queue: "my_queue",
    connection: [host: "localhost"],
    qos: [prefetch_count: 50],
    on_failure: :reject_and_requeue_once
  ]},
  concurrency: 5
]

Własny producent - polling z bazy danych

Nie masz kolejki? Nie potrzebujesz. Broadway obsługuje dowolne źródło danych:

defmodule MyApp.DbPoller do
  use GenStage

  @poll_interval 5_000

  @impl true
  def init(_opts) do
    schedule_poll()
    {:producer, %{demand: 0, buffer: []}}
  end

  @impl true
  def handle_demand(demand, state) do
    {to_send, remaining} = Enum.split(state.buffer, demand)
    {:noreply, to_send, %{state | buffer: remaining, demand: state.demand + demand - length(to_send)}}
  end

  @impl true
  def handle_info(:poll, state) do
    schedule_poll()

    rows = MyApp.Repo.all(
      from r in "pending_imports",
        where: r.status == "pending",
        limit: ^max(state.demand, 100),
        order_by: [asc: r.inserted_at]
    )

    messages = Enum.map(rows, fn row ->
      %Broadway.Message{
        data: row,
        acknowledger: {__MODULE__, :ack_ref, row.id}
      }
    end)

    {to_send, to_buffer} = Enum.split(messages, state.demand)
    {:noreply, to_send, %{state | buffer: to_buffer, demand: max(state.demand - length(to_send), 0)}}
  end

  def ack(:ack_ref, successful, failed) do
    success_ids = Enum.map(successful, fn %{acknowledger: {_, _, id}} -> id end)
    failed_ids = Enum.map(failed, fn %{acknowledger: {_, _, id}} -> id end)

    if success_ids != [] do
      MyApp.Repo.update_all(
        from(r in "pending_imports", where: r.id in ^success_ids),
        set: [status: "processed"]
      )
    end

    if failed_ids != [] do
      MyApp.Repo.update_all(
        from(r in "pending_imports", where: r.id in ^failed_ids),
        set: [status: "failed"]
      )
    end
  end

  defp schedule_poll, do: Process.send_after(self(), :poll, @poll_interval)
end

Zero Kafki, zero SQS, zero RabbitMQ. Producent polluje tabelę PostgreSQL co 5 sekund, Broadway przetwarza wiersze z pełnym backpressure, batchingiem i acknowledgementa. Przetworzone wiersze oznaczane jako "processed", błędne jako "failed".

Backpressure - dlaczego to zmienia wszystko

Bez backpressure:

Źródło: 10 000 msg/s → Twój system: max 1 000 msg/s
  Efekt: 9 000 msg/s ląduje w pamięci
  Za 10 minut: 5.4M wiadomości w RAM
  Za 15 minut: OOM kill

Z Broadway:

Źródło: 10 000 msg/s → Broadway: demand = 1 000/s
  Efekt: źródło dostarcza tylko 1 000 msg/s
  Reszta czeka w SQS/Kafka/RabbitMQ
  RAM: stały, niezależnie od obciążenia

Broadway nie "buforuje" nadmiarowych wiadomości. On ich nie pobiera. Procesory sygnalizują demand do producenta, producent pobiera z kolejki dokładnie tyle, ile downstream potrzebuje. Nadmiar zostaje w kolejce - która jest zaprojektowana do przechowywania milionów wiadomości (SQS, Kafka) i nie zjada pamięci Twojego serwera.

Rate limiting

Broadway ma wbudowany rate limiting na poziomie producenta:

producer: [
  module: {BroadwaySQS.Producer, [queue_url: "..."]},
  concurrency: 5,
  rate_limiting: [
    allowed_messages: 2000,
    interval: 1_000
  ]
]

Niezależnie od tego ile producentów działa, maksymalnie 2000 wiadomości na sekundę wpłynie do pipeline'u. Przydatne, gdy downstream (baza danych, API) ma znane limity przepustowości.

Zmiana w runtime - bez restartu:

# Godziny szczytu - zwolnij
Broadway.update_rate_limiting(MyApp.OrderPipeline, allowed_messages: 500)

# Noc - przyspiesz
Broadway.update_rate_limiting(MyApp.OrderPipeline, allowed_messages: 5000)

Batching - klucz do wydajności

Jedna z najważniejszych funkcji Broadway. Zamiast 1000 pojedynczych INSERT-ów, robisz 10 batchów po 100.

Routing do batcherów

@impl true
def handle_message(_, message, _) do
  case message.data["type"] do
    "payment" ->
      Broadway.Message.put_batcher(message, :payments)

    "log" ->
      Broadway.Message.put_batcher(message, :logs)

    _ ->
      message  # domyślny batcher
  end
end

Sub-batching po kluczu

put_batch_key/2 tworzy oddzielne batche w ramach jednego batchera:

@impl true
def handle_message(_, message, _) do
  message
  |> Broadway.Message.put_batcher(:db)
  |> Broadway.Message.put_batch_key(message.data["tenant_id"])
end

@impl true
def handle_batch(:db, messages, batch_info, _context) do
  # Wszystkie wiadomości w tym batchu należą do tego samego tenanta
  tenant_id = batch_info.batch_key

  rows = Enum.map(messages, &transform_row/1)
  MyApp.Repo.insert_all("events", rows, prefix: "tenant_#{tenant_id}")

  messages
end

Dynamiczny rozmiar batcha

Od Broadway v1.1 - batch na podstawie rozmiaru danych, nie liczby wiadomości:

batchers: [
  s3: [
    batch_size: {0, fn message, acc ->
      new_size = acc + byte_size(message.data)
      if new_size >= 5_000_000, do: {:emit, 0}, else: {:cont, new_size}
    end},
    batch_timeout: 10_000
  ]
]

Batch emitowany, gdy osiągnie 5 MB danych - niezależnie czy to 10 czy 10 000 wiadomości. Idealny dla S3 put_object, gdzie rozmiar payloadu ma znaczenie.

Obsługa błędów

Jawne oznaczanie wiadomości jako failed

@impl true
def handle_message(_, message, _) do
  case validate(message.data) do
    :ok -> message
    {:error, reason} -> Broadway.Message.failed(message, reason)
  end
end

Dead Letter Queue

Broadway nie ma wbudowanej DLQ, ale wzorzec jest prosty:

@impl true
def handle_failed(messages, _context) do
  rows = Enum.map(messages, fn msg ->
    %{
      pipeline: "order_pipeline",
      original_data: :erlang.term_to_binary(msg.data),
      failure_reason: inspect(msg.status),
      inserted_at: DateTime.utc_now()
    }
  end)

  MyApp.Repo.insert_all("dead_letter_queue", rows)
  messages
end

Wiadomości, które nie dały się przetworzyć, lądują w tabeli PostgreSQL. Oban worker może je potem ponownie przetworzyć:

defmodule MyApp.Workers.RetryDeadLetters do
  use Oban.Worker, queue: :retries

  @impl true
  def perform(_job) do
    MyApp.Repo.all(
      from d in "dead_letter_queue",
        where: d.retry_count < 3,
        limit: 100
    )
    |> Enum.each(fn record ->
      data = :erlang.binary_to_term(record.original_data)
      MyApp.WebhookProducer.push_message(data)
    end)

    :ok
  end
end

Broadway vs Oban - kiedy co

Broadway i Oban to komplementarne narzędzia, nie konkurencja:

WymiarBroadwayOban
CelCiągłe przetwarzanie strumieniPojedyncze zadania w tle
ŹródłoZewnętrzne kolejki (SQS, Kafka, RabbitMQ)PostgreSQL
PersystencjaW brokerzeW bazie danych
ThroughputMiliony msg/sTysiące jobs/s
BatchingNatywny, first-classBrak
BackpressureAutomatycznyBrak (limity per-queue)
RetryZależy od producentaWbudowany z backoff
SchedulingBrakCron, unikalne joby
UIBrak (telemetria)Oban Web

Broadway: kontrahent wysyła 100 000 webhooków dziennie → przetwórz je strumieniowo z batchingiem

Oban: po przetworzeniu zamówienia → wyślij email, wygeneruj PDF, zaktualizuj raport

Razem:

@impl true
def handle_batch(:db, messages, _info, _context) do
  # Broadway: hurtowy insert
  rows = Enum.map(messages, &to_row/1)
  MyApp.Repo.insert_all("orders", rows)

  # Oban: follow-up per zamówienie
  Enum.each(messages, fn msg ->
    %{order_id: msg.data["id"]}
    |> MyApp.Workers.SendConfirmation.new()
    |> Oban.insert()
  end)

  messages
end

Broadway przetwarza strumień zamówień z Kafki. Oban wysyła emaile potwierdzające. Każde narzędzie robi to, do czego zostało zaprojektowane.

Monitoring - telemetria

Broadway emituje zdarzenia :telemetry dla każdego etapu pipeline'u:

defmodule MyApp.BroadwayMetrics do
  def setup do
    :telemetry.attach_many("broadway", [
      [:broadway, :processor, :message, :stop],
      [:broadway, :processor, :message, :exception],
      [:broadway, :batch_processor, :stop]
    ], &handle_event/4, nil)
  end

  def handle_event([:broadway, :processor, :message, :stop], %{duration: d}, meta, _) do
    ms = System.convert_time_unit(d, :native, :millisecond)

    if ms > 1000 do
      require Logger
      Logger.warning("Slow message: #{ms}ms in #{meta.topology_name}")
    end
  end

  def handle_event([:broadway, :processor, :message, :exception], _, meta, _) do
    require Logger
    Logger.error("Broadway exception: #{inspect(meta.reason)}")
  end

  def handle_event([:broadway, :batch_processor, :stop], %{duration: d}, meta, _) do
    :telemetry.execute(
      [:my_app, :broadway, :batch],
      %{
        duration_ms: System.convert_time_unit(d, :native, :millisecond),
        success: length(meta.successful_messages),
        failed: length(meta.failed_messages)
      },
      %{batcher: meta.batch_info.batcher}
    )
  end
end

Podłącz do Prometheus, StatsD, LiveDashboard - i masz pełną widoczność: ile wiadomości przetwarza pipeline, jak szybko, ile się nie udaje, gdzie są wąskie gardła.

Testowanie

Broadway ma wbudowane narzędzia testowe:

defmodule MyApp.OrderPipelineTest do
  use ExUnit.Case

  test "valid orders are inserted" do
    {:ok, _} = MyApp.OrderPipeline.start_link(
      name: :test_pipeline,
      producer_module: {Broadway.DummyProducer, []}
    )

    ref = Broadway.test_message(:test_pipeline,
      ~s({"type": "order", "id": "123", "total": 99.90, "customer_id": "c1"})
    )

    assert_receive {:ack, ^ref, [_successful], []}, 5_000
  end

  test "invalid JSON is rejected" do
    {:ok, _} = MyApp.OrderPipeline.start_link(
      name: :test_pipeline_2,
      producer_module: {Broadway.DummyProducer, []}
    )

    ref = Broadway.test_message(:test_pipeline_2, "not json")

    assert_receive {:ack, ^ref, [], [_failed]}, 5_000
  end
end

Broadway.test_message/3 wstrzykuje wiadomość do pipeline'u bez prawdziwego brokera. DummyProducer zastępuje SQS/Kafka/RabbitMQ. Test jest szybki, deterministyczny i nie wymaga żadnej infrastruktury.

Tuning - ile procesów, jakie batche

producer: [
  concurrency: 5        # Producenci: dopasuj do źródła
                          # SQS: 5-10, Kafka: 1 (partycje wewnętrzne)
],
processors: [
  default: [
    concurrency: 50,     # Procesory: schedulers * 2 dla I/O-bound
    max_demand: 10        # Ile wiadomości naraz z producenta
  ]
],
batchers: [
  db: [
    concurrency: 5,      # BatchProcesory: dopasuj do bazy
    batch_size: 100,      # Wiadomości per batch
    batch_timeout: 2_000  # Max czas oczekiwania na pełny batch
  ]
]

Reguły:

  • Procesory: System.schedulers_online() * 2 dla I/O-bound, schedulers_online() dla CPU-bound
  • batch_size: 100-500 dla bazy danych, 1000+ dla S3/plików
  • batch_timeout: 1-5 sekund. Za krótki = małe batche. Za długi = latencja
  • Partycjonowanie: jeśli kolejność ma znaczenie, użyj partition_by - wszystkie wiadomości z tym samym kluczem trafią do tego samego procesora

Kiedy Broadway, kiedy nie

Broadway: ciągły strumień danych z zewnętrznego źródła, potrzebujesz backpressure, batchingu i fault tolerance

Nie-Broadway: jednorazowy import pliku (użyj Stream + Repo.insert_all), pojedyncze zadanie w tle (użyj Oban), prosty pub/sub (użyj Phoenix.PubSub)

Broadway rozwiązuje jeden problem i rozwiązuje go doskonale: przetwarzanie ciągłych strumieni danych z gwarancją, że żadna wiadomość nie zostanie utracona, żadna nie zostanie przetworzona dwa razy, i żaden spike ruchu nie położy systemu.

Masz kontrahenta, który zalewa Cię danymi, a Twój system nie nadąża? Porozmawiajmy - pokażemy, jak Broadway przetworzy Twój strumień danych bez utraty jednej wiadomości.