Real-time monitoring tysięcy urządzeń - Elixir w Przemyśle 4.0

Fabryka komponentów samochodowych pod Poznaniem - 340 urządzeń na hali produkcyjnej: prasy, roboty spawalnicze, transportery, czujniki temperatury i wibracji. Każde urządzenie wysyła dane co 1-5 sekund. To 200-400 wiadomości na sekundę, 24 godziny na dobę, 7 dni w tygodniu.

Dotychczasowy system (Java + Kafka + InfluxDB + Grafana) wymagał 5 serwerów i zespołu 3 osób do utrzymania. Średni czas od zdarzenia do alertu na ekranie operatora: 12-45 sekund.

Nowy system na Elixirze: 1 serwer, czas od zdarzenia do ekranu: < 200 ms. Jak to działa?

Dlaczego BEAM jest idealny do IoT

BEAM (maszyna wirtualna Erlanga/Elixira) został zbudowany do obsługi milionów jednoczesnych połączeń w centralach telefonicznych. Każde urządzenie IoT to - z perspektywy serwera - połączenie, które wysyła dane i czeka na komendy. Dokładnie ten sam problem.

Cecha BEAMZnaczenie dla IoT
Lekkie procesy (2.6 KB)1 proces per urządzenie = 340 procesów = 884 KB
Preemptive schedulingJedno zablokowane urządzenie nie wpływa na inne
Fault toleranceAwaria parsera danych z jednego czujnika nie zabija systemu
Hot code reloadAktualizacja logiki bez restartu - urządzenia nie tracą połączenia
Distributed ErlangKlaster serwerów bez zewnętrznego message brokera

Architektura: jedno urządzenie = jeden proces

                    ┌─────────────────────────────────────┐
   MQTT/TCP/HTTP    │            Phoenix App               │
                    │                                     │
┌──────┐           │  ┌────────────┐   ┌──────────────┐  │
│Czuj. 1│──────────▶│  │ DeviceProc │──▶│ AlertEngine  │  │
└──────┘           │  │ (prasa-01) │   │              │  │
                    │  └────────────┘   │  Reguły:     │  │
┌──────┐           │  ┌────────────┐   │  temp > 85°C │  │
│Czuj. 2│──────────▶│  │ DeviceProc │──▶│  vibr > 4g   │──▶ PubSub
└──────┘           │  │ (robot-07) │   │  offline > 30s│  │    │
                    │  └────────────┘   └──────────────┘  │    │
┌──────┐           │  ┌────────────┐                      │    │
│Czuj. 3│──────────▶│  │ DeviceProc │──▶ TimescaleDB      │    │
└──────┘           │  │ (temp-142) │   (historia)         │    │
                    │  └────────────┘                      │    │
  ...×340           │                                     │    │
                    └─────────────────────────────────────┘    │

                    ┌─────────────────────────────────────┐    │
                    │         LiveView Dashboard          │◀───┘
                    │  ┌─────────┐ ┌─────────┐ ┌───────┐ │
                    │  │ Mapa    │ │ Alerty  │ │Wykresy│ │
                    │  │ hali    │ │ live    │ │ live  │ │
                    │  └─────────┘ └─────────┘ └───────┘ │
                    └─────────────────────────────────────┘

GenServer per urządzenie

defmodule MyApp.Device do
  use GenServer

  @offline_threshold_ms 30_000
  @persist_interval_ms 10_000

  defstruct [
    :device_id, :type, :location,
    :last_reading, :last_seen_at,
    :status, :alert_state,
    readings_buffer: []
  ]

  # --- API ---

  def start_link(device_config) do
    GenServer.start_link(__MODULE__, device_config,
      name: via(device_config.device_id))
  end

  def push_reading(device_id, reading) do
    GenServer.cast(via(device_id), {:reading, reading})
  end

  def get_status(device_id) do
    GenServer.call(via(device_id), :status)
  end

  def send_command(device_id, command) do
    GenServer.call(via(device_id), {:command, command})
  end

  defp via(device_id) do
    {:via, Registry, {MyApp.DeviceRegistry, device_id}}
  end

  # --- Implementacja ---

  @impl true
  def init(config) do
    # Timer: sprawdzaj czy urządzenie jest online
    Process.send_after(self(), :check_alive, @offline_threshold_ms)
    # Timer: zapisuj bufor do bazy co 10 sekund
    Process.send_after(self(), :persist, @persist_interval_ms)

    state = %__MODULE__{
      device_id: config.device_id,
      type: config.type,
      location: config.location,
      status: :online,
      alert_state: :normal
    }

    {:ok, state}
  end

  @impl true
  def handle_cast({:reading, reading}, state) do
    now = DateTime.utc_now()

    state = %{state |
      last_reading: reading,
      last_seen_at: now,
      status: :online,
      readings_buffer: [reading | state.readings_buffer]
    }

    # Sprawdź reguły alertów
    state = check_alerts(state, reading)

    # Powiadom dashboard w czasie rzeczywistym
    Phoenix.PubSub.broadcast(MyApp.PubSub,
      "device:#{state.device_id}",
      {:reading, state.device_id, reading})

    Phoenix.PubSub.broadcast(MyApp.PubSub,
      "floor:#{state.location.floor}",
      {:device_update, state.device_id, reading})

    {:noreply, state}
  end

  @impl true
  def handle_info(:check_alive, state) do
    Process.send_after(self(), :check_alive, @offline_threshold_ms)

    if state.last_seen_at &&
       DateTime.diff(DateTime.utc_now(), state.last_seen_at, :millisecond) > @offline_threshold_ms do
      state = %{state | status: :offline}
      trigger_alert(state, :device_offline)
      {:noreply, state}
    else
      {:noreply, state}
    end
  end

  @impl true
  def handle_info(:persist, state) do
    Process.send_after(self(), :persist, @persist_interval_ms)

    if state.readings_buffer != [] do
      # Bulk insert do TimescaleDB - wydajne
      MyApp.Telemetry.bulk_insert(state.device_id, state.readings_buffer)
      {:noreply, %{state | readings_buffer: []}}
    else
      {:noreply, state}
    end
  end

  defp check_alerts(state, reading) do
    rules = MyApp.AlertRules.for_device_type(state.type)

    Enum.reduce(rules, state, fn rule, acc ->
      if rule.check.(reading) do
        trigger_alert(acc, rule.type)
        %{acc | alert_state: :alerting}
      else
        acc
      end
    end)
  end

  defp trigger_alert(state, alert_type) do
    Phoenix.PubSub.broadcast(MyApp.PubSub, "alerts",
      {:alert, %{
        device_id: state.device_id,
        type: alert_type,
        location: state.location,
        reading: state.last_reading,
        at: DateTime.utc_now()
      }})
  end
end

340 urządzeń = 340 procesów GenServer. Każdy:

  • Przechowuje ostatni odczyt w pamięci (natychmiastowy dostęp)
  • Buforuje odczyty i zapisuje do bazy co 10 sekund (redukcja I/O o 90%)
  • Sam wykrywa offline (timer per urządzenie)
  • Sam sprawdza reguły alertów (bez centralnego silnika)
  • Broadcast'uje zmiany na dashboard (PubSub, < 1 ms)

Reguły alertów

defmodule MyApp.AlertRules do
  def for_device_type(:hydraulic_press) do
    [
      %{type: :overtemp, check: fn r -> r.temperature > 85.0 end},
      %{type: :overpressure, check: fn r -> r.pressure > 250.0 end},
      %{type: :vibration, check: fn r -> r.vibration_g > 4.0 end},
      %{type: :cycle_slow, check: fn r -> r.cycle_time_ms > 5000 end}
    ]
  end

  def for_device_type(:welding_robot) do
    [
      %{type: :current_spike, check: fn r -> r.current_a > 400.0 end},
      %{type: :position_drift, check: fn r -> r.position_error_mm > 0.5 end},
      %{type: :gas_low, check: fn r -> r.gas_flow_lpm < 8.0 end}
    ]
  end

  def for_device_type(:temperature_sensor) do
    [
      %{type: :overtemp, check: fn r -> r.temperature > 40.0 end},
      %{type: :undertemp, check: fn r -> r.temperature < 15.0 end}
    ]
  end
end

Reguły to zwykłe funkcje Elixira - łatwe do testowania, łatwe do zmiany. Hot code reload pozwala zmienić próg alertu bez restartu systemu.

LiveView Dashboard - hala produkcyjna w przeglądarce

defmodule MyAppWeb.FloorDashboardLive do
  use MyAppWeb, :live_view

  @impl true
  def mount(%{"floor" => floor}, _session, socket) do
    # Subskrybuj aktualizacje z całego piętra
    Phoenix.PubSub.subscribe(MyApp.PubSub, "floor:#{floor}")
    Phoenix.PubSub.subscribe(MyApp.PubSub, "alerts")

    devices = MyApp.DeviceManager.list_devices_on_floor(floor)
    alerts = MyApp.Alerts.recent(floor, limit: 20)

    socket =
      socket
      |> assign(floor: floor, devices: devices, alerts: alerts)
      |> assign(stats: calculate_stats(devices))

    {:ok, socket}
  end

  # Aktualizacja odczytu urządzenia - UI odświeża się natychmiast
  @impl true
  def handle_info({:device_update, device_id, reading}, socket) do
    devices = update_device_in_list(socket.assigns.devices, device_id, reading)

    socket =
      socket
      |> assign(devices: devices)
      |> assign(stats: calculate_stats(devices))

    {:noreply, socket}
  end

  # Nowy alert - pojawia się na dashboardzie w czasie rzeczywistym
  @impl true
  def handle_info({:alert, alert}, socket) do
    if alert.location.floor == socket.assigns.floor do
      alerts = [alert | Enum.take(socket.assigns.alerts, 19)]
      {:noreply, assign(socket, alerts: alerts)}
    else
      {:noreply, socket}
    end
  end

  defp calculate_stats(devices) do
    %{
      total: length(devices),
      online: Enum.count(devices, &(&1.status == :online)),
      alerting: Enum.count(devices, &(&1.alert_state == :alerting)),
      offline: Enum.count(devices, &(&1.status == :offline))
    }
  end
end

Operator widzi na ekranie:

  • Mapę hali z kolorowymi pinami (zielony = OK, żółty = alert, czerwony = offline)
  • Alerty w czasie rzeczywistym - pojawiają się < 200 ms od zdarzenia
  • Statystyki - ile urządzeń online, ile w alarmie
  • Drill-down - kliknięcie w urządzenie pokazuje historię odczytów

Wszystko bez JavaScriptu (poza minimalnym klientem LiveView), bez WebSocket boilerplate'u, bez Redux/React. Serwer renderuje HTML i wysyła diffa przez WebSocket.

Skala: od 340 do 10 000 urządzeń

Co się stanie, gdy fabryka rośnie?

SkalaProcesy BEAMRAMCPU (1 serwer)Uwagi
340 urządzeń340~2 MB< 5%Jeden serwer, zapas 95%
1 000 urządzeń1 000~5 MB~10%Jeden serwer
5 000 urządzeń5 000~25 MB~30%Jeden serwer
10 000 urządzeń10 000~50 MB~50%Jeden serwer lub klaster
50 000 urządzeń50 000~250 MBKlaster2-3 serwery (Distributed Erlang)

Przy 10 000 urządzeń wysyłających dane co 2 sekundy = 5 000 wiadomości/sekundę. Jeden serwer z 8 rdzeniami obsługuje to bez problemu.

Dla porównania - tradycyjny stack:

SkalaJava + Kafka + InfluxDBElixir + PostgreSQL/TimescaleDB
1 000 urządzeń3 serwery1 serwer
5 000 urządzeń5 serwerów1 serwer
10 000 urządzeń7-10 serwerów1-2 serwery
Koszt infrastruktury/mies.8 000 - 25 000 PLN2 000 - 5 000 PLN
Latencja zdarzenie → ekran5-45 sek.< 200 ms

Protokoły komunikacji

Urządzenia IoT komunikują się różnymi protokołami. Elixir obsługuje wszystkie:

MQTT - najpopularniejszy w IoT

defmodule MyApp.MqttHandler do
  use Tortoise311.Handler

  @impl true
  def handle_message(["devices", device_id, "telemetry"], payload, state) do
    reading = Jason.decode!(payload)

    # Przekaż do procesu urządzenia
    MyApp.Device.push_reading(device_id, reading)

    {:ok, state}
  end

  @impl true
  def handle_message(["devices", device_id, "status"], payload, state) do
    status = Jason.decode!(payload)
    MyApp.Device.update_status(device_id, status)
    {:ok, state}
  end
end

# Topic structure:
# devices/{device_id}/telemetry  → dane pomiarowe
# devices/{device_id}/status     → status urządzenia
# devices/{device_id}/commands   → komendy do urządzenia

TCP/Binary - urządzenia przemysłowe (Modbus, OPC-UA)

defmodule MyApp.ModbusListener do
  use GenServer

  def init(port) do
    {:ok, listen_socket} = :gen_tcp.listen(port, [
      :binary, active: true, reuseaddr: true
    ])

    {:ok, %{listen_socket: listen_socket}}
  end

  def handle_info({:tcp, socket, data}, state) do
    # Parsuj ramkę Modbus
    case MyApp.Modbus.parse_frame(data) do
      {:ok, device_id, registers} ->
        reading = MyApp.Modbus.registers_to_reading(registers)
        MyApp.Device.push_reading(device_id, reading)

      {:error, reason} ->
        Logger.warning("Invalid Modbus frame: #{reason}")
    end

    {:noreply, state}
  end
end

HTTP/Webhook - nowoczesne czujniki z API

# Phoenix controller dla urządzeń wysyłających HTTP POST
defmodule MyAppWeb.DeviceTelemetryController do
  use MyAppWeb, :controller

  def create(conn, %{"device_id" => device_id, "readings" => readings}) do
    Enum.each(readings, fn reading ->
      MyApp.Device.push_reading(device_id, reading)
    end)

    json(conn, %{status: "ok", received: length(readings)})
  end
end

Jeden system, wiele protokołów. Procesowi GenServer nie obchodzi, skąd przyszły dane - obsługuje je identycznie.

Predykcja awarii (predictive maintenance)

Mając historię odczytów, można wykrywać anomalie zanim dojdzie do awarii:

defmodule MyApp.PredictiveMaintenance do
  @doc "Sprawdź trend wibracji - rosnący trend = zbliżająca się awaria"
  def check_vibration_trend(device_id, hours \\ 24) do
    readings = MyApp.Telemetry.get_readings(device_id, last_hours: hours)

    vibrations = Enum.map(readings, & &1.vibration_g)
    avg_first_half = average(Enum.take(vibrations, div(length(vibrations), 2)))
    avg_second_half = average(Enum.drop(vibrations, div(length(vibrations), 2)))

    trend = (avg_second_half - avg_first_half) / max(avg_first_half, 0.001)

    cond do
      trend > 0.3 ->
        {:warning, "Wibracje wzrosły o #{Float.round(trend * 100, 1)}% w ciągu #{hours}h"}

      trend > 0.5 ->
        {:critical, "Wibracje wzrosły o #{Float.round(trend * 100, 1)}% - zaplanuj przegląd"}

      true ->
        :ok
    end
  end

  @doc "Sprawdź degradację wydajności prasy"
  def check_press_efficiency(device_id) do
    recent = MyApp.Telemetry.avg_cycle_time(device_id, last_hours: 8)
    baseline = MyApp.Telemetry.avg_cycle_time(device_id, last_days: 30)

    if recent > baseline * 1.15 do
      {:warning, "Czas cyklu wzrósł o #{Float.round((recent / baseline - 1) * 100, 1)}% - możliwe zużycie narzędzia"}
    else
      :ok
    end
  end
end

# Uruchamiane cyklicznie przez Oban:
defmodule MyApp.Workers.PredictiveCheck do
  use Oban.Worker, queue: :maintenance

  @impl true
  def perform(%{args: %{"device_id" => device_id}}) do
    case MyApp.PredictiveMaintenance.check_vibration_trend(device_id) do
      {:warning, message} ->
        MyApp.Alerts.create_maintenance_alert(device_id, message)
      {:critical, message} ->
        MyApp.Alerts.create_critical_alert(device_id, message)
      :ok ->
        :ok
    end
  end
end

Porównanie ze stosem tradycyjnym

KomponentTradycyjny IoT stackElixir stack
IngestionKafka / RabbitMQGenServer per device
ProcessingJava / Python workersProcesy BEAM
StorageInfluxDB / TimescaleDBTimescaleDB (ten sam PostgreSQL)
AlertingOsobny serwisAlertEngine (proces BEAM)
DashboardGrafanaPhoenix LiveView
Message brokerKafka (3+ serwery)Phoenix PubSub (wbudowany)
Serwery5-101-2
Zespół utrzymania2-3 osoby1 osoba
Latencja5-45 sekund< 200 ms
Koszt/mies.12 000 - 30 000 PLN2 000 - 5 000 PLN

Dlaczego nie Kafka + Java

Kafka jest świetna do strumieniowania danych między systemami. Ale jeśli masz jeden system monitorujący urządzenia, Kafka to overengineering:

  • Kafka wymaga klastra ZooKeeper/KRaft (3+ serwery)
  • Kafka dodaje latencję (batching, partitions, consumers)
  • Kafka nie daje Ci stanu urządzenia w pamięci (musisz odpytywać bazę)
  • Kafka nie daje Ci dashboardu (potrzebujesz Grafany)
  • Kafka nie daje Ci alertów (potrzebujesz osobnego serwisu)

BEAM daje to wszystko w jednym procesie runtime'u: ingestion, processing, state, alerting, dashboard. Mniej ruchomych części = mniej awarii = mniej kosztów.

Co to oznacza dla fabryki

MetrykaPrzed (Java + Kafka)Po (Elixir + LiveView)
Czas reakcji na alert12-45 sek.< 200 ms
Nieplanowane przestoje/mies.4-61-2 (predictive maintenance)
Koszt przestoju/godz.15 000 - 50 000 PLN-
Oszczędność/rok (mniej przestojów)-200 000 - 600 000 PLN
Koszt infrastruktury/rok180 000 - 360 000 PLN24 000 - 60 000 PLN
Koszt utrzymania/rok300 000 PLN (3 osoby)120 000 PLN (1 osoba)

Alert, który dociera do operatora w 200 ms zamiast 45 sekund, to 44 sekundy na reakcję, zanim prasa uszkodzi formę wartą 80 000 PLN. Jeden uniknięty przestój zwraca koszt całego systemu.


Chcesz monitoring produkcji w czasie rzeczywistym, bez Kafki i Grafany? Porozmawiajmy - zaprojektujemy system dopasowany do Twojej hali.