
MQTT & latency alerts

Every app can publish its event stream to an MQTT broker, in addition to the existing signed webhooks. It’s a single tap on the same callbacks dispatcher every outbound event already flows through, so webhooks and MQTT always see exactly the same taxonomy — there is no separate emit path to keep in sync, and MQTT works even when no webhook URL is configured (an app can be MQTT-only). A broker outage can never break the emitting flow: publishing is best-effort and never throws, the same contract webhooks already have.

The same config block also carries stream latency alerting — a per-room health monitor that fires through both webhooks and MQTT.

# Per-app MQTT event publishing (in ADDITION to callbacks). The broker
# password NEVER lives here, only the `password_env` REFERENCE; the value is
# resolved from the environment / data/secrets.json (like the S3 credentials).
mqtt:
  enabled: false                        # master switch
  url: ""                               # mqtt:// | mqtts:// | ws:// | wss:// (path ok)
  username: ""
  password_env: APP_LIVE_MQTT_PASSWORD  # credential REFERENCE, not the value
  topic_prefix: streamhub/live          # topics: <prefix>/<category>/<event>
  qos: 0                                # 0 | 1 | 2
  tls: false                            # force TLS (mqtt:// upgraded to mqtts://)
  events: [all]                         # 'all' or an explicit list of event names
  logs:
    enabled: false                      # forward app logs to <prefix>/log/<level>
    level: info                         # minimum forwarded level
# Stream latency/health alerting: emits stream.latency_high /
# stream.latency_recovered through BOTH callbacks and MQTT.
latency_alert:
  enabled: false
  threshold_ms: 1000                    # probe-RTT breach threshold
  cooldown_seconds: 60                  # min seconds between alerts per room
  interval_seconds: 10                  # sampling interval

The configuration is read with GET /apps/{app}/mqtt (broker password masked, like the S3 credentials) and written with PUT /apps/{app}/mqtt (a partial update: omit password to keep the stored one). The dashboard exposes the same fields under the app’s Integrations → MQTT panel. An inline mqtt.password typed into the raw YAML editor is ignored with a warning; the password must be set through PUT /apps/{app}/mqtt.

| Field | Type | Default | Description |
| --- | --- | --- | --- |
| enabled | boolean | false | Master switch. Off = no client is ever created, nothing is published. |
| url | string | "" | Broker URL: mqtt://, mqtts://, ws:// or wss:// (a path is allowed, e.g. wss://mqtt.streamhub.studio/mqtt). Empty = off. |
| username | string | "" | Broker username (empty for anonymous brokers). |
| password_env | string | APP_<SLUG>_MQTT_PASSWORD | Name of the env var / secrets.json key holding the broker password. Set the actual value via PUT /apps/{app}/mqtt (password); it is never stored in the YAML, and reads mask it. |
| topic_prefix | string | streamhub/<app> | Topic root. Messages go to <prefix>/<category>/<event>. |
| qos | 0, 1 or 2 | 0 | Publish QoS for every message. |
| tls | boolean | false | Force TLS: upgrades mqtt:// to mqtts:// (and ws:// to wss://) before connecting. URLs that are already mqtts:// or wss:// use TLS regardless. |
| events | list | [all] | Event filter: all (the whole taxonomy) or an explicit list of event names, e.g. [vod_ready, stream.latency_high]. |
| logs.enabled | boolean | false | Forward the app’s log stream to <prefix>/log/<level>. |
| logs.level | level | info | Minimum forwarded level: trace, debug, info, warn, error or fatal. |
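
The tls and events fields are easiest to read as two small pure functions. The following is only a sketch of the behaviour the table describes, not StreamHub's actual code; the names effective_url and is_selected are hypothetical, and it assumes the events filter compares exact event names.

```python
from urllib.parse import urlsplit, urlunsplit

# Scheme upgrades described in the table: tls: true turns the plain
# schemes into their TLS counterparts.
_TLS_UPGRADE = {"mqtt": "mqtts", "ws": "wss"}


def effective_url(url: str, tls: bool) -> str:
    """Return the URL a client would connect to once the tls flag is applied."""
    parts = urlsplit(url)
    scheme = parts.scheme.lower()
    if tls and scheme in _TLS_UPGRADE:
        scheme = _TLS_UPGRADE[scheme]
    # mqtts:// and wss:// pass through unchanged whatever the flag says.
    return urlunsplit((scheme, parts.netloc, parts.path, parts.query, parts.fragment))


def is_selected(event: str, events: list[str]) -> bool:
    """Apply the events filter: 'all' passes everything, else exact names only (assumption)."""
    return "all" in events or event in events


if __name__ == "__main__":
    print(effective_url("mqtt://mqtt.example.com:1883", tls=True))
    print(is_selected("vod_ready", ["vod_ready", "stream.latency_high"]))
```
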
latency_alert fields:

| Field | Type | Default | Description |
| --- | --- | --- | --- |
| enabled | boolean | false | Master switch for the per-room latency monitor. |
| threshold_ms | number | 1000 | Breach threshold for the probe metric, in ms. |
| cooldown_seconds | number | 60 | Minimum seconds between successive stream.latency_high alerts for the same room. |
| interval_seconds | number | 10 | Sampling interval per app, in seconds (minimum 2). |
Write the configuration:

curl -s -X PUT "$BASE/apps/live/mqtt" \
  -H "Authorization: Bearer $TOKEN" -H 'Content-Type: application/json' \
  -d '{
    "enabled": true,
    "url": "mqtts://mqtt.example.com:8883",
    "username": "streamhub",
    "password": "a-long-random-secret",
    "topicPrefix": "streamhub/live",
    "qos": 0,
    "events": ["all"],
    "logs": { "enabled": false, "level": "info" },
    "latencyAlert": { "enabled": true, "thresholdMs": 800, "cooldownSeconds": 60, "intervalSeconds": 10 }
  }'

Read it back (the password is masked):

curl -s "$BASE/apps/live/mqtt" -H "Authorization: Bearer $TOKEN"
# { "data": { "enabled": true, "url": "mqtts://mqtt.example.com:8883",
#             "password": "a-***et", "hasPassword": true, "configured": true, ... } }

StreamHub defines the topic convention — the broker imposes none:

<topic_prefix>/<category>/<event> e.g. streamhub/live/vod/vod_ready
<topic_prefix>/log/<level> e.g. streamhub/live/log/error

| Category | Events |
| --- | --- |
| connection | room_*, participant_*, track_*, ingress_*, stream_started/stream_ended, hls_*, restream_* |
| vod | recording_*, vod_*, snapshot_taken, egress_* |
| plugin | plugin_worker_started, plugin_worker_stopped, plugin_worker_error |
| interaction | chat_message, reaction |
| alert | stream.latency_high, stream.latency_recovered |
| log | forwarded app log lines (event: "log") |
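
To make the convention concrete, here is a rough sketch that maps an event name to its category and builds the topic. The patterns are transcribed from the table above; the function names, the use of shell-style wildcards for the `*` entries, and returning None for an unlisted event are assumptions made for illustration, not a description of StreamHub internals.

```python
from fnmatch import fnmatchcase
from typing import Optional

# Category patterns transcribed from the table; "*" is treated as a
# shell-style wildcard (assumption).
CATEGORY_PATTERNS = {
    "connection": ["room_*", "participant_*", "track_*", "ingress_*",
                   "stream_started", "stream_ended", "hls_*", "restream_*"],
    "vod": ["recording_*", "vod_*", "snapshot_taken", "egress_*"],
    "plugin": ["plugin_worker_started", "plugin_worker_stopped", "plugin_worker_error"],
    "interaction": ["chat_message", "reaction"],
    "alert": ["stream.latency_high", "stream.latency_recovered"],
}


def category_of(event: str) -> Optional[str]:
    """Return the category whose patterns match the event, or None if none do."""
    for category, patterns in CATEGORY_PATTERNS.items():
        if any(fnmatchcase(event, pattern) for pattern in patterns):
            return category
    return None


def topic_for(prefix: str, event: str) -> str:
    """Build <prefix>/<category>/<event> for a known event."""
    category = category_of(event)
    if category is None:
        raise ValueError(f"no category for event {event!r}")
    return f"{prefix}/{category}/{event}"


if __name__ == "__main__":
    print(topic_for("streamhub/live", "vod_ready"))  # streamhub/live/vod/vod_ready
```
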

Every message is a JSON envelope (retain: false, QoS from config):

{
  "event": "stream_started",
  "app": "live",
  "timestamp": "2026-07-03T12:00:00.000Z",
  "data": { "room": "live-1", "streamId": "live-1/publisher", "type": "rtmp" }
}

data carries the same event-specific payload the webhook envelope carries in its own data field. Log lines use event: "log" and data: { level, source, message, meta? }.
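
On the consuming side, the envelope can be decoded with the standard library alone. This is a minimal sketch that assumes the four top-level fields shown above are always present; the Envelope class and parse_envelope function are illustrative names, not part of any StreamHub SDK.

```python
import json
from dataclasses import dataclass
from datetime import datetime
from typing import Any


@dataclass(frozen=True)
class Envelope:
    event: str
    app: str
    timestamp: datetime
    data: dict[str, Any]


def parse_envelope(payload: bytes) -> Envelope:
    """Decode one MQTT message payload into a typed envelope."""
    doc = json.loads(payload)
    # datetime.fromisoformat only accepts a trailing "Z" from Python 3.11 on,
    # so rewrite it as an explicit UTC offset first.
    timestamp = datetime.fromisoformat(doc["timestamp"].replace("Z", "+00:00"))
    return Envelope(doc["event"], doc["app"], timestamp, doc.get("data", {}))


if __name__ == "__main__":
    raw = (b'{"event": "stream_started", "app": "live", '
           b'"timestamp": "2026-07-03T12:00:00.000Z", '
           b'"data": {"room": "live-1", "streamId": "live-1/publisher", "type": "rtmp"}}')
    envelope = parse_envelope(raw)
    print(envelope.event, envelope.data["room"])  # stream_started live-1
```
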

Subscribe examples:

streamhub/live/#            everything for app "live"
streamhub/live/vod/#        VOD/recording pipeline only
streamhub/live/alert/#      latency alerts only
streamhub/live/log/error    error logs only
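
These filters rely on standard MQTT wildcards: `#` matches the rest of the topic tree and `+` matches exactly one level. As an illustration of how a broker would evaluate them, here is a small standalone matcher; it is a simplified sketch (it ignores the special handling of topics that start with `$`) and is not taken from StreamHub.

```python
def topic_matches(topic_filter: str, topic: str) -> bool:
    """Return True if an MQTT topic filter matches a concrete topic name."""
    filter_levels = topic_filter.split("/")
    topic_levels = topic.split("/")
    for i, level in enumerate(filter_levels):
        if level == "#":
            # Multi-level wildcard: only valid as the last level; it matches
            # the parent level and everything beneath it.
            return i == len(filter_levels) - 1
        if i >= len(topic_levels):
            return False
        if level != "+" and level != topic_levels[i]:
            return False
    # Without a trailing "#", both sides must have the same depth.
    return len(filter_levels) == len(topic_levels)


if __name__ == "__main__":
    topic = "streamhub/live/vod/vod_ready"
    print(topic_matches("streamhub/live/#", topic))        # True
    print(topic_matches("streamhub/live/alert/#", topic))  # False
```
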

When mqtt.logs.enabled is true, every log line attributed to the app (the same lines GET /apps/{app}/logs returns) at or above mqtt.logs.level is published to <prefix>/log/<level>. Lines emitted by the MQTT module itself are excluded (loop guard). Plugin worker stdout/stderr is included with source plugin:<id>, so a worker’s output can be followed over MQTT too.
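
The "at or above" rule can be sketched as a severity comparison over the six levels. This assumes the levels are ordered from least to most severe as trace, debug, info, warn, error, fatal (the order in which the reference lists them); log_topic is a hypothetical helper name.

```python
from typing import Optional

# Assumed severity order, least to most severe.
LEVELS = ("trace", "debug", "info", "warn", "error", "fatal")


def log_topic(prefix: str, level: str, minimum: str = "info") -> Optional[str]:
    """Return <prefix>/log/<level> if the line should be forwarded, else None."""
    if LEVELS.index(level) < LEVELS.index(minimum):
        return None  # below mqtt.logs.level: not forwarded
    return f"{prefix}/log/{level}"


if __name__ == "__main__":
    print(log_topic("streamhub/live", "error"))  # streamhub/live/log/error
    print(log_topic("streamhub/live", "debug"))  # None
```
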

When latency_alert.enabled is true, the monitor samples every active room of the app every interval_seconds and applies a latched threshold per room:

  • a sample above threshold_ms emits stream.latency_high once (latched while it stays high), with payload { room, rttMs, thresholdMs, metric, participants, publishers };
  • a sample back at or below the threshold emits stream.latency_recovered;
  • a re-breach within cooldown_seconds of the previous alert is suppressed.
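
The three rules above amount to a small per-room state machine. The sketch below is one possible reading, not the monitor's actual implementation: RoomLatch is a hypothetical name, and it assumes that a re-breach suppressed by the cooldown does not latch, so no stream.latency_recovered is emitted for an alert that was never sent.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class RoomLatch:
    threshold_ms: float
    cooldown_seconds: float
    high: bool = False                     # latched "currently high" state
    last_alert_at: Optional[float] = None  # time of the last stream.latency_high

    def observe(self, rtt_ms: float, now: float) -> Optional[str]:
        """Feed one sample; return the event to emit, or None."""
        if rtt_ms > self.threshold_ms:
            if self.high:
                return None  # already latched: emit only once per breach
            in_cooldown = (self.last_alert_at is not None
                           and now - self.last_alert_at < self.cooldown_seconds)
            if in_cooldown:
                return None  # suppressed re-breach (assumed not to latch)
            self.high = True
            self.last_alert_at = now
            return "stream.latency_high"
        if self.high:
            self.high = False
            return "stream.latency_recovered"
        return None


if __name__ == "__main__":
    latch = RoomLatch(threshold_ms=1000, cooldown_seconds=60)
    for now, rtt in [(0, 1200), (10, 1500), (20, 900), (30, 1100), (70, 1100)]:
        print(now, rtt, latch.observe(rtt, now))
```

With these samples the breach at t=30 falls inside the 60-second cooldown of the alert at t=0 and is dropped, while the one at t=70 fires again.
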

Both events go through the callbacks dispatcher, so they reach the app’s webhook AND MQTT (<prefix>/alert/...) with the same payload. See Webhooks → Events for the full event catalog.

Each sample times a LiveKit server API round-trip scoped to the room (RoomServiceClient.listParticipants(room)) from the core — that round-trip time is the alert metric, stamped into every alert payload as metric.

Why this metric, rather than per-viewer WebRTC RTT:

  • It’s actually available. The LiveKit server API doesn’t expose per-participant RTT/jitter to the management layer; connection quality lives client-side. The core’s real-time view of a stream is exactly its server API + webhooks, so this probe measures that path with zero new infrastructure.
  • It degrades with the right failure modes. In a self-hosted single-box or small-cluster deployment, end-to-end latency creep is dominated by SFU/host overload — CPU saturation from transcoding/egress, event-loop stalls, network pressure. Those same conditions inflate the SFU’s API response time, so the probe RTT is a faithful early-warning proxy.
  • It’s per-room and cheap. One tiny API call per active room per interval (capped at 25 rooms/app/pass) — safe to run continuously; the same call also yields participants/publishers, attached to the alert payload for context.

A failed probe (room gone, LiveKit restarting) is skipped — it never flips the alert state, so a LiveKit restart doesn’t fire a spurious recovered event.
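
The probe and the failure rule can be sketched as a timing wrapper. Here list_participants is a hypothetical callable standing in for the room-scoped LiveKit call; the wrapper returns the round-trip time in milliseconds, or None when the call fails so the caller can skip the sample without touching the alert state.

```python
import time
from typing import Callable, Optional


def probe_rtt_ms(list_participants: Callable[[str], list], room: str) -> Optional[float]:
    """Time one room-scoped API round-trip; None means the probe failed."""
    start = time.perf_counter()
    try:
        list_participants(room)
    except Exception:
        # Room gone, server restarting, etc.: skip this sample entirely.
        return None
    return (time.perf_counter() - start) * 1000.0


if __name__ == "__main__":
    def fake_ok(room: str) -> list:
        time.sleep(0.01)  # simulate a short API round-trip
        return ["publisher", "viewer"]

    def fake_down(room: str) -> list:
        raise ConnectionError("server restarting")

    print(probe_rtt_ms(fake_ok, "live-1"))    # some positive number of ms
    print(probe_rtt_ms(fake_down, "live-1"))  # None
```
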

  • enabled: false or an empty url → no client is created, nothing is published, and any live client is cleanly closed.
  • Broker down → the client buffers/reconnects with backoff; QoS 0 messages published while disconnected are dropped (fire-and-forget), QoS 1/2 are queued by the client. Delivery over MQTT is best-effort — the webhook pipeline (with its retries) remains the at-least-once channel.
  • Config change (PUT /apps/{app}/mqtt, or the raw editor + hot-reload) → the app’s client is dropped; the next publish reconnects with the new settings.
  • App delete → the client is closed immediately.

Connections use MQTT 3.1.1/5 with a 60s keepalive, clean sessions, and auto-reconnect with exponential backoff (1s → 2s → 4s … capped at 30s, reset on connect). Clients are created lazily on the first publish.
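
The reconnect schedule can be pictured as a generator. This sketch assumes the delay keeps doubling between the values listed above until it hits the 30-second cap; the reset on connect is modelled simply by starting a fresh generator. reconnect_delays is an illustrative name.

```python
from itertools import islice
from typing import Iterator


def reconnect_delays(initial: float = 1.0, cap: float = 30.0) -> Iterator[float]:
    """Yield reconnect delays in seconds: doubling from `initial`, capped at `cap`."""
    delay = initial
    while True:
        yield delay
        delay = min(delay * 2, cap)


if __name__ == "__main__":
    # First seven attempts after a disconnect.
    print(list(islice(reconnect_delays(), 7)))  # [1.0, 2.0, 4.0, 8.0, 16.0, 30.0, 30.0]
```
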