MQTT Adapter
The MQTT adapter enables Conduit to subscribe to MQTT topics, bringing data from IoT devices, gateways, and message brokers into the context mesh.
Overview
The MQTT adapter supports:
- MQTT 3.1.1 and 5.0: Full protocol support
- Wildcard Subscriptions:
+and#wildcards - Sparkplug B: Native Sparkplug decoding
- JSON Extraction: JSONPath for nested values
- QoS Levels: 0, 1, and 2
- TLS/SSL: Secure connections
Prerequisites
- MQTT Broker: Any compliant broker (Mosquitto, HiveMQ, EMQX, etc.)
- Network Access: Reach the broker (typically port 1883 or 8883)
- Topic Structure: Understanding of your topic hierarchy
Configuration
Basic Configuration
adapter:
type: mqtt
name: mqtt-plant-floor
broker:
host: mqtt.company.com
port: 1883
protocol: tcp # or ssl
credentials:
username: ${MQTT_USER}
password: ${MQTT_PASS}
subscriptions:
- topic: plant/+/temperature
qos: 1
TLS Configuration
broker:
host: mqtt.company.com
port: 8883
protocol: ssl
tls:
ca: /etc/conduit/certs/ca.pem
cert: /etc/conduit/certs/client.pem
key: /etc/conduit/certs/client.key
verify: true
Subscriptions
Simple Subscriptions
subscriptions:
- topic: sensors/temperature
qos: 1
- topic: sensors/pressure
qos: 0
Wildcard Subscriptions
subscriptions:
# Single-level wildcard (+)
- topic: plant/+/temperature
# Matches: plant/area1/temperature, plant/area2/temperature
# Multi-level wildcard (#)
- topic: plant/building1/#
# Matches: plant/building1/*, plant/building1/floor1/*, etc.
QoS Levels
| QoS | Delivery | Use Case | |-----|----------|----------| | 0 | At most once | Non-critical telemetry | | 1 | At least once | Important data, may duplicate | | 2 | Exactly once | Critical, ordered data |
Payload Parsing
JSON Payloads
subscriptions:
- topic: sensors/+/data
format: json
mapping:
value: "$.temperature"
timestamp: "$.ts"
quality: "$.quality"
# Extract multiple values from nested JSON
- topic: device/+/telemetry
format: json
extract:
- path: "$.sensors.temp"
name: "${topic[1]}_temperature"
- path: "$.sensors.humidity"
name: "${topic[1]}_humidity"
- path: "$.sensors.pressure"
name: "${topic[1]}_pressure"
Plain Text
subscriptions:
- topic: simple/temperature
format: text
dataType: float
Binary
subscriptions:
- topic: binary/sensor
format: binary
structure:
- name: temperature
type: float32
offset: 0
- name: pressure
type: float32
offset: 4
- name: status
type: uint8
offset: 8
Sparkplug B
Native support for the Sparkplug B industrial IoT specification:
subscriptions:
- topic: spBv1.0/#
format: sparkplug
sparkplug:
groupId: Plant1
edgeNodeId: Gateway1
metrics:
include:
- "Temperature/*"
- "Pressure/*"
exclude:
- "*/Internal/*"
Sparkplug Message Types
The adapter handles all Sparkplug message types:
| Type | Description | |------|-------------| | NBIRTH | Node birth certificate | | DBIRTH | Device birth certificate | | NDATA | Node data | | DDATA | Device data | | NCMD | Node command | | DCMD | Device command | | NDEATH | Node death | | DDEATH | Device death |
Topic-to-Tag Mapping
Pattern-Based Naming
subscriptions:
- topic: plant/+/+/temperature
format: json
mapping:
value: "$.value"
tagName: "${topic[1]}_${topic[2]}_Temperature"
# plant/area1/tank1/temperature -> area1_tank1_Temperature
# plant/area2/tank3/temperature -> area2_tank3_Temperature
Metadata Extraction
subscriptions:
- topic: devices/+/sensors/+
metadata:
device: "${topic[1]}"
sensor: "${topic[3]}"
source: "mqtt"
Message Processing
Deduplication
processing:
deduplicate:
enabled: true
window: 1000 # ms
key: [topic, value]
Rate Limiting
processing:
rateLimit:
enabled: true
maxPerSecond: 1000
strategy: drop # or buffer
Buffering
processing:
buffer:
enabled: true
maxSize: 10000
flushInterval: 1000 # ms
Connection Management
Keep-Alive
broker:
keepAlive: 60 # seconds
cleanSession: true
Reconnection
broker:
reconnect:
enabled: true
initialDelay: 1000 # ms
maxDelay: 30000
multiplier: 2
Last Will
broker:
lastWill:
topic: conduit/adapters/mqtt-plant-floor/status
message: "offline"
qos: 1
retain: true
Publishing (Optional)
Enable publishing for command responses:
publishing:
enabled: true
defaultQos: 1
defaultRetain: false
topics:
- pattern: conduit/responses/+
qos: 1
Performance Tuning
Connection Settings
broker:
connectionTimeout: 30000 # ms
maxInflight: 100
maxPacketSize: 1048576 # 1MB
Subscription Batching
subscriptions:
batchSize: 100
parallelProcessing: true
workers: 4
Troubleshooting
Connection Issues
Connection Refused
- Verify broker host and port
- Check firewall rules
- Confirm credentials
TLS Handshake Failed
- Verify certificate chain
- Check certificate expiration
- Confirm TLS version compatibility
Message Issues
No Messages Received
- Verify topic pattern matches published topics
- Check subscription QoS vs publish QoS
- Review broker ACLs
Parse Errors
- Verify payload format matches configuration
- Check JSON structure
- Review Sparkplug encoding
Performance Issues
High Latency
- Increase worker count
- Enable parallel processing
- Review message rate limits
Example: IoT Gateway Integration
adapter:
type: mqtt
name: mqtt-iot-gateway
broker:
host: iot-hub.company.com
port: 8883
protocol: ssl
tls:
verify: true
ca: /etc/conduit/certs/iot-ca.pem
credentials:
username: gateway-conduit
password: ${MQTT_PASSWORD}
subscriptions:
# Temperature sensors
- topic: sensors/+/temperature
format: json
mapping:
value: "$.value"
timestamp: "$.timestamp"
tagName: "IoT_${topic[1]}_Temperature"
unit: "°C"
# Sparkplug devices
- topic: spBv1.0/Plant1/#
format: sparkplug
# Raw binary sensors
- topic: legacy/+/data
format: binary
structure:
- name: temp
type: int16
scale: 0.1
- name: humidity
type: uint8
Next Steps
- Modbus Adapter - Connect to field devices
- OPC-UA Adapter - Standard industrial protocol
- Architecture - Adapter architecture