Real-time monitoring tysięcy urządzeń - Elixir w Przemyśle 4.0
Fabryka komponentów samochodowych pod Poznaniem - 340 urządzeń na hali produkcyjnej: prasy, roboty spawalnicze, transportery, czujniki temperatury i wibracji. Każde urządzenie wysyła dane co 1-5 sekund. To 200-400 wiadomości na sekundę, 24 godziny na dobę, 7 dni w tygodniu.
Dotychczasowy system (Java + Kafka + InfluxDB + Grafana) wymagał 5 serwerów i zespołu 3 osób do utrzymania. Średni czas od zdarzenia do alertu na ekranie operatora: 12-45 sekund.
Nowy system na Elixirze: 1 serwer, czas od zdarzenia do ekranu: < 200 ms. Jak to działa?
Dlaczego BEAM jest idealny do IoT
BEAM (maszyna wirtualna Erlanga/Elixira) został zbudowany do obsługi milionów jednoczesnych połączeń w centralach telefonicznych. Każde urządzenie IoT to - z perspektywy serwera - połączenie, które wysyła dane i czeka na komendy. Dokładnie ten sam problem.
| Cecha BEAM | Znaczenie dla IoT |
|---|---|
| Lekkie procesy (2.6 KB) | 1 proces per urządzenie = 340 procesów = 884 KB |
| Preemptive scheduling | Jedno zablokowane urządzenie nie wpływa na inne |
| Fault tolerance | Awaria parsera danych z jednego czujnika nie zabija systemu |
| Hot code reload | Aktualizacja logiki bez restartu - urządzenia nie tracą połączenia |
| Distributed Erlang | Klaster serwerów bez zewnętrznego message brokera |
Architektura: jedno urządzenie = jeden proces
┌─────────────────────────────────────┐
MQTT/TCP/HTTP │ Phoenix App │
│ │
┌──────┐ │ ┌────────────┐ ┌──────────────┐ │
│Czuj. 1│──────────▶│ │ DeviceProc │──▶│ AlertEngine │ │
└──────┘ │ │ (prasa-01) │ │ │ │
│ └────────────┘ │ Reguły: │ │
┌──────┐ │ ┌────────────┐ │ temp > 85°C │ │
│Czuj. 2│──────────▶│ │ DeviceProc │──▶│ vibr > 4g │──▶ PubSub
└──────┘ │ │ (robot-07) │ │ offline > 30s│ │ │
│ └────────────┘ └──────────────┘ │ │
┌──────┐ │ ┌────────────┐ │ │
│Czuj. 3│──────────▶│ │ DeviceProc │──▶ TimescaleDB │ │
└──────┘ │ │ (temp-142) │ (historia) │ │
│ └────────────┘ │ │
...×340 │ │ │
└─────────────────────────────────────┘ │
│
┌─────────────────────────────────────┐ │
│ LiveView Dashboard │◀───┘
│ ┌─────────┐ ┌─────────┐ ┌───────┐ │
│ │ Mapa │ │ Alerty │ │Wykresy│ │
│ │ hali │ │ live │ │ live │ │
│ └─────────┘ └─────────┘ └───────┘ │
└─────────────────────────────────────┘GenServer per urządzenie
defmodule MyApp.Device do
use GenServer
@offline_threshold_ms 30_000
@persist_interval_ms 10_000
defstruct [
:device_id, :type, :location,
:last_reading, :last_seen_at,
:status, :alert_state,
readings_buffer: []
]
# --- API ---
def start_link(device_config) do
GenServer.start_link(__MODULE__, device_config,
name: via(device_config.device_id))
end
def push_reading(device_id, reading) do
GenServer.cast(via(device_id), {:reading, reading})
end
def get_status(device_id) do
GenServer.call(via(device_id), :status)
end
def send_command(device_id, command) do
GenServer.call(via(device_id), {:command, command})
end
defp via(device_id) do
{:via, Registry, {MyApp.DeviceRegistry, device_id}}
end
# --- Implementacja ---
@impl true
def init(config) do
# Timer: sprawdzaj czy urządzenie jest online
Process.send_after(self(), :check_alive, @offline_threshold_ms)
# Timer: zapisuj bufor do bazy co 10 sekund
Process.send_after(self(), :persist, @persist_interval_ms)
state = %__MODULE__{
device_id: config.device_id,
type: config.type,
location: config.location,
status: :online,
alert_state: :normal
}
{:ok, state}
end
@impl true
def handle_cast({:reading, reading}, state) do
now = DateTime.utc_now()
state = %{state |
last_reading: reading,
last_seen_at: now,
status: :online,
readings_buffer: [reading | state.readings_buffer]
}
# Sprawdź reguły alertów
state = check_alerts(state, reading)
# Powiadom dashboard w czasie rzeczywistym
Phoenix.PubSub.broadcast(MyApp.PubSub,
"device:#{state.device_id}",
{:reading, state.device_id, reading})
Phoenix.PubSub.broadcast(MyApp.PubSub,
"floor:#{state.location.floor}",
{:device_update, state.device_id, reading})
{:noreply, state}
end
@impl true
def handle_info(:check_alive, state) do
Process.send_after(self(), :check_alive, @offline_threshold_ms)
if state.last_seen_at &&
DateTime.diff(DateTime.utc_now(), state.last_seen_at, :millisecond) > @offline_threshold_ms do
state = %{state | status: :offline}
trigger_alert(state, :device_offline)
{:noreply, state}
else
{:noreply, state}
end
end
@impl true
def handle_info(:persist, state) do
Process.send_after(self(), :persist, @persist_interval_ms)
if state.readings_buffer != [] do
# Bulk insert do TimescaleDB - wydajne
MyApp.Telemetry.bulk_insert(state.device_id, state.readings_buffer)
{:noreply, %{state | readings_buffer: []}}
else
{:noreply, state}
end
end
defp check_alerts(state, reading) do
rules = MyApp.AlertRules.for_device_type(state.type)
Enum.reduce(rules, state, fn rule, acc ->
if rule.check.(reading) do
trigger_alert(acc, rule.type)
%{acc | alert_state: :alerting}
else
acc
end
end)
end
defp trigger_alert(state, alert_type) do
Phoenix.PubSub.broadcast(MyApp.PubSub, "alerts",
{:alert, %{
device_id: state.device_id,
type: alert_type,
location: state.location,
reading: state.last_reading,
at: DateTime.utc_now()
}})
end
end340 urządzeń = 340 procesów GenServer. Każdy:
- Przechowuje ostatni odczyt w pamięci (natychmiastowy dostęp)
- Buforuje odczyty i zapisuje do bazy co 10 sekund (redukcja I/O o 90%)
- Sam wykrywa offline (timer per urządzenie)
- Sam sprawdza reguły alertów (bez centralnego silnika)
- Broadcast'uje zmiany na dashboard (PubSub, < 1 ms)
Reguły alertów
defmodule MyApp.AlertRules do
def for_device_type(:hydraulic_press) do
[
%{type: :overtemp, check: fn r -> r.temperature > 85.0 end},
%{type: :overpressure, check: fn r -> r.pressure > 250.0 end},
%{type: :vibration, check: fn r -> r.vibration_g > 4.0 end},
%{type: :cycle_slow, check: fn r -> r.cycle_time_ms > 5000 end}
]
end
def for_device_type(:welding_robot) do
[
%{type: :current_spike, check: fn r -> r.current_a > 400.0 end},
%{type: :position_drift, check: fn r -> r.position_error_mm > 0.5 end},
%{type: :gas_low, check: fn r -> r.gas_flow_lpm < 8.0 end}
]
end
def for_device_type(:temperature_sensor) do
[
%{type: :overtemp, check: fn r -> r.temperature > 40.0 end},
%{type: :undertemp, check: fn r -> r.temperature < 15.0 end}
]
end
endReguły to zwykłe funkcje Elixira - łatwe do testowania, łatwe do zmiany. Hot code reload pozwala zmienić próg alertu bez restartu systemu.
LiveView Dashboard - hala produkcyjna w przeglądarce
defmodule MyAppWeb.FloorDashboardLive do
use MyAppWeb, :live_view
@impl true
def mount(%{"floor" => floor}, _session, socket) do
# Subskrybuj aktualizacje z całego piętra
Phoenix.PubSub.subscribe(MyApp.PubSub, "floor:#{floor}")
Phoenix.PubSub.subscribe(MyApp.PubSub, "alerts")
devices = MyApp.DeviceManager.list_devices_on_floor(floor)
alerts = MyApp.Alerts.recent(floor, limit: 20)
socket =
socket
|> assign(floor: floor, devices: devices, alerts: alerts)
|> assign(stats: calculate_stats(devices))
{:ok, socket}
end
# Aktualizacja odczytu urządzenia - UI odświeża się natychmiast
@impl true
def handle_info({:device_update, device_id, reading}, socket) do
devices = update_device_in_list(socket.assigns.devices, device_id, reading)
socket =
socket
|> assign(devices: devices)
|> assign(stats: calculate_stats(devices))
{:noreply, socket}
end
# Nowy alert - pojawia się na dashboardzie w czasie rzeczywistym
@impl true
def handle_info({:alert, alert}, socket) do
if alert.location.floor == socket.assigns.floor do
alerts = [alert | Enum.take(socket.assigns.alerts, 19)]
{:noreply, assign(socket, alerts: alerts)}
else
{:noreply, socket}
end
end
defp calculate_stats(devices) do
%{
total: length(devices),
online: Enum.count(devices, &(&1.status == :online)),
alerting: Enum.count(devices, &(&1.alert_state == :alerting)),
offline: Enum.count(devices, &(&1.status == :offline))
}
end
endOperator widzi na ekranie:
- Mapę hali z kolorowymi pinami (zielony = OK, żółty = alert, czerwony = offline)
- Alerty w czasie rzeczywistym - pojawiają się < 200 ms od zdarzenia
- Statystyki - ile urządzeń online, ile w alarmie
- Drill-down - kliknięcie w urządzenie pokazuje historię odczytów
Wszystko bez JavaScriptu (poza minimalnym klientem LiveView), bez WebSocket boilerplate'u, bez Redux/React. Serwer renderuje HTML i wysyła diffa przez WebSocket.
Skala: od 340 do 10 000 urządzeń
Co się stanie, gdy fabryka rośnie?
| Skala | Procesy BEAM | RAM | CPU (1 serwer) | Uwagi |
|---|---|---|---|---|
| 340 urządzeń | 340 | ~2 MB | < 5% | Jeden serwer, zapas 95% |
| 1 000 urządzeń | 1 000 | ~5 MB | ~10% | Jeden serwer |
| 5 000 urządzeń | 5 000 | ~25 MB | ~30% | Jeden serwer |
| 10 000 urządzeń | 10 000 | ~50 MB | ~50% | Jeden serwer lub klaster |
| 50 000 urządzeń | 50 000 | ~250 MB | Klaster | 2-3 serwery (Distributed Erlang) |
Przy 10 000 urządzeń wysyłających dane co 2 sekundy = 5 000 wiadomości/sekundę. Jeden serwer z 8 rdzeniami obsługuje to bez problemu.
Dla porównania - tradycyjny stack:
| Skala | Java + Kafka + InfluxDB | Elixir + PostgreSQL/TimescaleDB |
|---|---|---|
| 1 000 urządzeń | 3 serwery | 1 serwer |
| 5 000 urządzeń | 5 serwerów | 1 serwer |
| 10 000 urządzeń | 7-10 serwerów | 1-2 serwery |
| Koszt infrastruktury/mies. | 8 000 - 25 000 PLN | 2 000 - 5 000 PLN |
| Latencja zdarzenie → ekran | 5-45 sek. | < 200 ms |
Protokoły komunikacji
Urządzenia IoT komunikują się różnymi protokołami. Elixir obsługuje wszystkie:
MQTT - najpopularniejszy w IoT
defmodule MyApp.MqttHandler do
use Tortoise311.Handler
@impl true
def handle_message(["devices", device_id, "telemetry"], payload, state) do
reading = Jason.decode!(payload)
# Przekaż do procesu urządzenia
MyApp.Device.push_reading(device_id, reading)
{:ok, state}
end
@impl true
def handle_message(["devices", device_id, "status"], payload, state) do
status = Jason.decode!(payload)
MyApp.Device.update_status(device_id, status)
{:ok, state}
end
end
# Topic structure:
# devices/{device_id}/telemetry → dane pomiarowe
# devices/{device_id}/status → status urządzenia
# devices/{device_id}/commands → komendy do urządzeniaTCP/Binary - urządzenia przemysłowe (Modbus, OPC-UA)
defmodule MyApp.ModbusListener do
use GenServer
def init(port) do
{:ok, listen_socket} = :gen_tcp.listen(port, [
:binary, active: true, reuseaddr: true
])
{:ok, %{listen_socket: listen_socket}}
end
def handle_info({:tcp, socket, data}, state) do
# Parsuj ramkę Modbus
case MyApp.Modbus.parse_frame(data) do
{:ok, device_id, registers} ->
reading = MyApp.Modbus.registers_to_reading(registers)
MyApp.Device.push_reading(device_id, reading)
{:error, reason} ->
Logger.warning("Invalid Modbus frame: #{reason}")
end
{:noreply, state}
end
endHTTP/Webhook - nowoczesne czujniki z API
# Phoenix controller dla urządzeń wysyłających HTTP POST
defmodule MyAppWeb.DeviceTelemetryController do
use MyAppWeb, :controller
def create(conn, %{"device_id" => device_id, "readings" => readings}) do
Enum.each(readings, fn reading ->
MyApp.Device.push_reading(device_id, reading)
end)
json(conn, %{status: "ok", received: length(readings)})
end
endJeden system, wiele protokołów. Procesowi GenServer nie obchodzi, skąd przyszły dane - obsługuje je identycznie.
Predykcja awarii (predictive maintenance)
Mając historię odczytów, można wykrywać anomalie zanim dojdzie do awarii:
defmodule MyApp.PredictiveMaintenance do
@doc "Sprawdź trend wibracji - rosnący trend = zbliżająca się awaria"
def check_vibration_trend(device_id, hours \\ 24) do
readings = MyApp.Telemetry.get_readings(device_id, last_hours: hours)
vibrations = Enum.map(readings, & &1.vibration_g)
avg_first_half = average(Enum.take(vibrations, div(length(vibrations), 2)))
avg_second_half = average(Enum.drop(vibrations, div(length(vibrations), 2)))
trend = (avg_second_half - avg_first_half) / max(avg_first_half, 0.001)
cond do
trend > 0.3 ->
{:warning, "Wibracje wzrosły o #{Float.round(trend * 100, 1)}% w ciągu #{hours}h"}
trend > 0.5 ->
{:critical, "Wibracje wzrosły o #{Float.round(trend * 100, 1)}% - zaplanuj przegląd"}
true ->
:ok
end
end
@doc "Sprawdź degradację wydajności prasy"
def check_press_efficiency(device_id) do
recent = MyApp.Telemetry.avg_cycle_time(device_id, last_hours: 8)
baseline = MyApp.Telemetry.avg_cycle_time(device_id, last_days: 30)
if recent > baseline * 1.15 do
{:warning, "Czas cyklu wzrósł o #{Float.round((recent / baseline - 1) * 100, 1)}% - możliwe zużycie narzędzia"}
else
:ok
end
end
end
# Uruchamiane cyklicznie przez Oban:
defmodule MyApp.Workers.PredictiveCheck do
use Oban.Worker, queue: :maintenance
@impl true
def perform(%{args: %{"device_id" => device_id}}) do
case MyApp.PredictiveMaintenance.check_vibration_trend(device_id) do
{:warning, message} ->
MyApp.Alerts.create_maintenance_alert(device_id, message)
{:critical, message} ->
MyApp.Alerts.create_critical_alert(device_id, message)
:ok ->
:ok
end
end
endPorównanie ze stosem tradycyjnym
| Komponent | Tradycyjny IoT stack | Elixir stack |
|---|---|---|
| Ingestion | Kafka / RabbitMQ | GenServer per device |
| Processing | Java / Python workers | Procesy BEAM |
| Storage | InfluxDB / TimescaleDB | TimescaleDB (ten sam PostgreSQL) |
| Alerting | Osobny serwis | AlertEngine (proces BEAM) |
| Dashboard | Grafana | Phoenix LiveView |
| Message broker | Kafka (3+ serwery) | Phoenix PubSub (wbudowany) |
| Serwery | 5-10 | 1-2 |
| Zespół utrzymania | 2-3 osoby | 1 osoba |
| Latencja | 5-45 sekund | < 200 ms |
| Koszt/mies. | 12 000 - 30 000 PLN | 2 000 - 5 000 PLN |
Dlaczego nie Kafka + Java
Kafka jest świetna do strumieniowania danych między systemami. Ale jeśli masz jeden system monitorujący urządzenia, Kafka to overengineering:
- Kafka wymaga klastra ZooKeeper/KRaft (3+ serwery)
- Kafka dodaje latencję (batching, partitions, consumers)
- Kafka nie daje Ci stanu urządzenia w pamięci (musisz odpytywać bazę)
- Kafka nie daje Ci dashboardu (potrzebujesz Grafany)
- Kafka nie daje Ci alertów (potrzebujesz osobnego serwisu)
BEAM daje to wszystko w jednym procesie runtime'u: ingestion, processing, state, alerting, dashboard. Mniej ruchomych części = mniej awarii = mniej kosztów.
Co to oznacza dla fabryki
| Metryka | Przed (Java + Kafka) | Po (Elixir + LiveView) |
|---|---|---|
| Czas reakcji na alert | 12-45 sek. | < 200 ms |
| Nieplanowane przestoje/mies. | 4-6 | 1-2 (predictive maintenance) |
| Koszt przestoju/godz. | 15 000 - 50 000 PLN | - |
| Oszczędność/rok (mniej przestojów) | - | 200 000 - 600 000 PLN |
| Koszt infrastruktury/rok | 180 000 - 360 000 PLN | 24 000 - 60 000 PLN |
| Koszt utrzymania/rok | 300 000 PLN (3 osoby) | 120 000 PLN (1 osoba) |
Alert, który dociera do operatora w 200 ms zamiast 45 sekund, to 44 sekundy na reakcję, zanim prasa uszkodzi formę wartą 80 000 PLN. Jeden uniknięty przestój zwraca koszt całego systemu.
Chcesz monitoring produkcji w czasie rzeczywistym, bez Kafki i Grafany? Porozmawiajmy - zaprojektujemy system dopasowany do Twojej hali.