MQTT Adapter

Subscribe to MQTT topics from any broker - supports JSON, Sparkplug B, and custom payload formats.

MQTT Adapter

The MQTT adapter enables Conduit to subscribe to MQTT topics, bringing data from IoT devices, gateways, and message brokers into the context mesh.

Overview

The MQTT adapter supports:

  • MQTT 3.1.1 and 5.0: Full protocol support
  • Wildcard Subscriptions: + and # wildcards
  • Sparkplug B: Native Sparkplug decoding
  • JSON Extraction: JSONPath for nested values
  • QoS Levels: 0, 1, and 2
  • TLS/SSL: Secure connections

Prerequisites

  1. MQTT Broker: Any compliant broker (Mosquitto, HiveMQ, EMQX, etc.)
  2. Network Access: Reach the broker (typically port 1883 or 8883)
  3. Topic Structure: Understanding of your topic hierarchy

Configuration

Basic Configuration

adapter:
  type: mqtt
  name: mqtt-plant-floor

  broker:
    host: mqtt.company.com
    port: 1883
    protocol: tcp  # or ssl

  credentials:
    username: ${MQTT_USER}
    password: ${MQTT_PASS}

  subscriptions:
    - topic: plant/+/temperature
      qos: 1

TLS Configuration

broker:
  host: mqtt.company.com
  port: 8883
  protocol: ssl

  tls:
    ca: /etc/conduit/certs/ca.pem
    cert: /etc/conduit/certs/client.pem
    key: /etc/conduit/certs/client.key
    verify: true

Subscriptions

Simple Subscriptions

subscriptions:
  - topic: sensors/temperature
    qos: 1

  - topic: sensors/pressure
    qos: 0

Wildcard Subscriptions

subscriptions:
  # Single-level wildcard (+)
  - topic: plant/+/temperature
    # Matches: plant/area1/temperature, plant/area2/temperature

  # Multi-level wildcard (#)
  - topic: plant/building1/#
    # Matches: plant/building1/*, plant/building1/floor1/*, etc.

QoS Levels

| QoS | Delivery | Use Case | |-----|----------|----------| | 0 | At most once | Non-critical telemetry | | 1 | At least once | Important data, may duplicate | | 2 | Exactly once | Critical, ordered data |

Payload Parsing

JSON Payloads

subscriptions:
  - topic: sensors/+/data
    format: json
    mapping:
      value: "$.temperature"
      timestamp: "$.ts"
      quality: "$.quality"

  # Extract multiple values from nested JSON
  - topic: device/+/telemetry
    format: json
    extract:
      - path: "$.sensors.temp"
        name: "${topic[1]}_temperature"
      - path: "$.sensors.humidity"
        name: "${topic[1]}_humidity"
      - path: "$.sensors.pressure"
        name: "${topic[1]}_pressure"

Plain Text

subscriptions:
  - topic: simple/temperature
    format: text
    dataType: float

Binary

subscriptions:
  - topic: binary/sensor
    format: binary
    structure:
      - name: temperature
        type: float32
        offset: 0
      - name: pressure
        type: float32
        offset: 4
      - name: status
        type: uint8
        offset: 8

Sparkplug B

Native support for the Sparkplug B industrial IoT specification:

subscriptions:
  - topic: spBv1.0/#
    format: sparkplug

sparkplug:
  groupId: Plant1
  edgeNodeId: Gateway1

  metrics:
    include:
      - "Temperature/*"
      - "Pressure/*"
    exclude:
      - "*/Internal/*"

Sparkplug Message Types

The adapter handles all Sparkplug message types:

| Type | Description | |------|-------------| | NBIRTH | Node birth certificate | | DBIRTH | Device birth certificate | | NDATA | Node data | | DDATA | Device data | | NCMD | Node command | | DCMD | Device command | | NDEATH | Node death | | DDEATH | Device death |

Topic-to-Tag Mapping

Pattern-Based Naming

subscriptions:
  - topic: plant/+/+/temperature
    format: json
    mapping:
      value: "$.value"
    tagName: "${topic[1]}_${topic[2]}_Temperature"

# plant/area1/tank1/temperature -> area1_tank1_Temperature
# plant/area2/tank3/temperature -> area2_tank3_Temperature

Metadata Extraction

subscriptions:
  - topic: devices/+/sensors/+
    metadata:
      device: "${topic[1]}"
      sensor: "${topic[3]}"
      source: "mqtt"

Message Processing

Deduplication

processing:
  deduplicate:
    enabled: true
    window: 1000  # ms
    key: [topic, value]

Rate Limiting

processing:
  rateLimit:
    enabled: true
    maxPerSecond: 1000
    strategy: drop  # or buffer

Buffering

processing:
  buffer:
    enabled: true
    maxSize: 10000
    flushInterval: 1000  # ms

Connection Management

Keep-Alive

broker:
  keepAlive: 60  # seconds
  cleanSession: true

Reconnection

broker:
  reconnect:
    enabled: true
    initialDelay: 1000  # ms
    maxDelay: 30000
    multiplier: 2

Last Will

broker:
  lastWill:
    topic: conduit/adapters/mqtt-plant-floor/status
    message: "offline"
    qos: 1
    retain: true

Publishing (Optional)

Enable publishing for command responses:

publishing:
  enabled: true
  defaultQos: 1
  defaultRetain: false

  topics:
    - pattern: conduit/responses/+
      qos: 1

Performance Tuning

Connection Settings

broker:
  connectionTimeout: 30000  # ms
  maxInflight: 100
  maxPacketSize: 1048576  # 1MB

Subscription Batching

subscriptions:
  batchSize: 100
  parallelProcessing: true
  workers: 4

Troubleshooting

Connection Issues

Connection Refused

  • Verify broker host and port
  • Check firewall rules
  • Confirm credentials

TLS Handshake Failed

  • Verify certificate chain
  • Check certificate expiration
  • Confirm TLS version compatibility

Message Issues

No Messages Received

  • Verify topic pattern matches published topics
  • Check subscription QoS vs publish QoS
  • Review broker ACLs

Parse Errors

  • Verify payload format matches configuration
  • Check JSON structure
  • Review Sparkplug encoding

Performance Issues

High Latency

  • Increase worker count
  • Enable parallel processing
  • Review message rate limits

Example: IoT Gateway Integration

adapter:
  type: mqtt
  name: mqtt-iot-gateway

  broker:
    host: iot-hub.company.com
    port: 8883
    protocol: ssl
    tls:
      verify: true
      ca: /etc/conduit/certs/iot-ca.pem

  credentials:
    username: gateway-conduit
    password: ${MQTT_PASSWORD}

  subscriptions:
    # Temperature sensors
    - topic: sensors/+/temperature
      format: json
      mapping:
        value: "$.value"
        timestamp: "$.timestamp"
      tagName: "IoT_${topic[1]}_Temperature"
      unit: "°C"

    # Sparkplug devices
    - topic: spBv1.0/Plant1/#
      format: sparkplug

    # Raw binary sensors
    - topic: legacy/+/data
      format: binary
      structure:
        - name: temp
          type: int16
          scale: 0.1
        - name: humidity
          type: uint8

Next Steps