MQTT & latency alerts
Every app can publish its event stream to an MQTT broker, in addition to the existing signed webhooks. It’s a single tap on the same callbacks dispatcher every outbound event already flows through, so webhooks and MQTT always see exactly the same taxonomy — there is no separate emit path to keep in sync, and MQTT works even when no webhook URL is configured (an app can be MQTT-only). A broker outage can never break the emitting flow: publishing is best-effort and never throws, the same contract webhooks already have.
The same config block also carries stream latency alerting — a per-room health monitor that fires through both webhooks and MQTT.
Configuration
Section titled “Configuration”# Per-app MQTT event publishing (in ADDITION to callbacks). The broker# password NEVER lives here — only the `password_env` REFERENCE; the value is# resolved from the environment / data/secrets.json (like the S3 credentials).mqtt: enabled: false # master switch url: "" # mqtt:// | mqtts:// | ws:// | wss:// (path ok) username: "" password_env: APP_LIVE_MQTT_PASSWORD # credential REFERENCE, not the value topic_prefix: streamhub/live # topics: <prefix>/<category>/<event> qos: 0 # 0 | 1 | 2 tls: false # force TLS (mqtt:// upgraded to mqtts://) events: [all] # 'all' or an explicit list of event names logs: enabled: false # forward app logs to <prefix>/log/<level> level: info # minimum forwarded level
# Stream latency/health alerting: emits stream.latency_high /# stream.latency_recovered through BOTH callbacks and MQTT.latency_alert: enabled: false threshold_ms: 1000 # probe-RTT breach threshold cooldown_seconds: 60 # min seconds between alerts per room interval_seconds: 10 # sampling intervalRead with GET /apps/{app}/mqtt (broker password masked, like the S3
credentials) and written with PUT /apps/{app}/mqtt (partial update — omit
password to keep the stored one). The dashboard exposes the same fields under
the app’s Integrations → MQTT panel. An inline mqtt.password typed into the
raw YAML editor is ignored with a warning — it must go through PUT .../mqtt.
| Field | Type | Default | Description |
|---|---|---|---|
enabled |
boolean | false |
Master switch. Off = no client is ever created, nothing is published. |
url |
string | "" |
Broker URL: mqtt://, mqtts://, ws:// or wss:// (a path is allowed, e.g. wss://mqtt.streamhub.studio/mqtt). Empty = off. |
username |
string | "" |
Broker username (empty for anonymous brokers). |
password_env |
string | APP_<SLUG>_MQTT_PASSWORD |
Name of the env var / secrets.json key holding the broker password. Set the actual value via PUT /apps/{app}/mqtt (password) — it is never stored in the YAML, and reads mask it. |
topic_prefix |
string | streamhub/<app> |
Topic root. Messages go to <prefix>/<category>/<event>. |
qos |
0 | 1 | 2 |
0 |
Publish QoS for every message. |
tls |
boolean | false |
Force TLS: upgrades mqtt:// to mqtts:// (and ws:// to wss://) before connecting. mqtts:///wss:// URLs already use TLS regardless. |
events |
list | [all] |
Event filter: all (the whole taxonomy) or an explicit list of event names, e.g. [vod_ready, stream.latency_high]. |
logs.enabled |
boolean | false |
Forward the app’s log stream to <prefix>/log/<level>. |
logs.level |
level | info |
Minimum forwarded level: trace | debug | info | warn | error | fatal. |
latency_alert
Section titled “latency_alert”| Field | Type | Default | Description |
|---|---|---|---|
enabled |
boolean | false |
Master switch for the per-room latency monitor. |
threshold_ms |
number | 1000 |
Breach threshold for the probe metric, in ms. |
cooldown_seconds |
number | 60 |
Minimum seconds between successive stream.latency_high alerts for the same room. |
interval_seconds |
number | 10 |
Sampling interval per app, in seconds (minimum 2). |
Set it via the API
Section titled “Set it via the API”curl -s -X PUT $BASE/apps/live/mqtt \ -H "Authorization: Bearer $TOKEN" -H 'Content-Type: application/json' \ -d '{ "enabled": true, "url": "mqtts://mqtt.example.com:8883", "username": "streamhub", "password": "a-long-random-secret", "topicPrefix": "streamhub/live", "qos": 0, "events": ["all"], "logs": { "enabled": false, "level": "info" }, "latencyAlert": { "enabled": true, "thresholdMs": 800, "cooldownSeconds": 60, "intervalSeconds": 10 } }'curl -s $BASE/apps/live/mqtt -H "Authorization: Bearer $TOKEN"# { "data": { "enabled": true, "url": "mqtts://mqtt.example.com:8883",# "password": "a-***et", "hasPassword": true, "configured": true, ... } }Topics & envelope
Section titled “Topics & envelope”StreamHub defines the topic convention — the broker imposes none:
<topic_prefix>/<category>/<event> e.g. streamhub/live/vod/vod_ready<topic_prefix>/log/<level> e.g. streamhub/live/log/error| Category | Events |
|---|---|
connection |
room_*, participant_*, track_*, ingress_*, stream_started/stream_ended, hls_*, restream_* |
vod |
recording_*, vod_*, snapshot_taken, egress_* |
plugin |
plugin_worker_started, plugin_worker_stopped, plugin_worker_error |
interaction |
chat_message, reaction |
alert |
stream.latency_high, stream.latency_recovered |
log |
forwarded app log lines (event: "log") |
Every message is a JSON envelope (retain: false, QoS from config):
{ "event": "stream_started", "app": "live", "timestamp": "2026-07-03T12:00:00.000Z", "data": { "room": "live-1", "streamId": "live-1/publisher", "type": "rtmp" }}data carries the same event-specific payload the webhook envelope
carries in its own data field. Log lines use event: "log" and
data: { level, source, message, meta? }.
Subscribe examples:
streamhub/live/# everything for app "live"streamhub/live/vod/# VOD/recording pipeline onlystreamhub/live/alert/# latency alerts onlystreamhub/live/log/error error logs onlyApp log forwarding
Section titled “App log forwarding”When mqtt.logs.enabled is true, every log line attributed to the app (the
same lines GET /apps/{app}/logs returns) at or above mqtt.logs.level is
published to <prefix>/log/<level>. Lines emitted by the MQTT module itself are
excluded (loop guard). Plugin worker stdout/stderr is included with source
plugin:<id>, so a worker’s output can be followed over MQTT too.
High-latency alert
Section titled “High-latency alert”When latency_alert.enabled is true, the monitor samples every active room
of the app every interval_seconds and applies a latched threshold per room:
- a sample above
threshold_msemitsstream.latency_highonce (latched while it stays high), with payload{ room, rttMs, thresholdMs, metric, participants, publishers }; - a sample back at or below the threshold emits
stream.latency_recovered; - a re-breach within
cooldown_secondsof the previous alert is suppressed.
Both events go through the callbacks dispatcher, so they reach the app’s
webhook AND MQTT (<prefix>/alert/...) with the same payload. See
Webhooks → Events for the full event catalog.
The metric: livekit_room_probe_rtt_ms
Section titled “The metric: livekit_room_probe_rtt_ms”Each sample times a LiveKit server API round-trip scoped to the room
(RoomServiceClient.listParticipants(room)) from the core — that round-trip
time is the alert metric, stamped into every alert payload as metric.
Why this metric, rather than per-viewer WebRTC RTT:
- It’s actually available. The LiveKit server API doesn’t expose
per-participant RTT/jitter to the management layer; connection quality lives
client-side. The core’s real-time view of a stream is exactly its server API
- webhooks, so this probe measures that path with zero new infrastructure.
- It degrades with the right failure modes. In a self-hosted single-box or small-cluster deployment, end-to-end latency creep is dominated by SFU/host overload — CPU saturation from transcoding/egress, event-loop stalls, network pressure. Those same conditions inflate the SFU’s API response time, so the probe RTT is a faithful early-warning proxy.
- It’s per-room and cheap. One tiny API call per active room per interval
(capped at 25 rooms/app/pass) — safe to run continuously; the same call also
yields
participants/publishers, attached to the alert payload for context.
A failed probe (room gone, LiveKit restarting) is skipped — it never flips the
alert state, so a LiveKit restart doesn’t fire a spurious recovered event.
Failure semantics
Section titled “Failure semantics”enabled: falseor an emptyurl→ no client is created, nothing is published, and any live client is cleanly closed.- Broker down → the client buffers/reconnects with backoff; QoS
0messages published while disconnected are dropped (fire-and-forget), QoS1/2are queued by the client. Delivery over MQTT is best-effort — the webhook pipeline (with its retries) remains the at-least-once channel. - Config change (
PUT /apps/{app}/mqtt, or the raw editor + hot-reload) → the app’s client is dropped; the next publish reconnects with the new settings. - App delete → the client is closed immediately.
Connections use MQTT 3.1.1/5 with a 60s keepalive, clean sessions, and auto-reconnect with incremental backoff (1s → 2s → 4s … capped at 30s, reset on connect). Clients are created lazily on the first publish.