MQTT Proxy

Background

  • MQTT is a standard messaging protocol for IoT (Internet of Things) which is extremely lightweight and used by a wide variety of industries.
  • By supporting MQTT Proxy in Easegress, MQTT clients can produce messages to backend through publish packet pipeline.
  • We also provide the HTTP endpoint to allow the backend to send messages to MQTT clients.

Design

  • Use github.com/eclipse/paho.mqtt.golang/packets to parse MQTT packet. paho.mqtt.golang is a MQTT 3.1.1 go client introduced by Eclipse Foundation (who also introduced the most widely used MQTT broker mosquitto).
  • As a MQTT proxy, we support MQTT clients to publish messages to backend through publish packet pipeline.
  • As Pipeline is protocol independent, it can use MQTT filters to do things like user authentication or topic mapping (map MQTT multi-level topic into single topic and key-value headers).
  • We also support MQTT clients to subscribe topics (wildcard is supported) and send messages back to the MQTT clients through the HTTP endpoint.

By default:

             publish msg                       publish pipeline
MQTT client ------------> Easegress MQTTProxy ----------------> Backend like Kafka

all published msg will go to Backend, will not send to other MQTT clients.

             subscribe msg
MQTT client <---------------- Easegress MQTT HTTP Endpoint <---- Backend Server

all msg send back to MQTT clients come from HTTP endpoint.
  • We assume that IoT devices (use MQTT client) report their status to the backend (through tools like Kafka), and backend process these messages and send instructions back to IoT devices.

Using brokerMode:

             publish msg                       publish pipeline
MQTT client ------------> Easegress MQTTProxy ----------------> Backend like Kafka
                              |
                              | also send msg to subscribers
                              |
             subscribe msg            send msg through http endpoint
MQTT client <------------ Easegress <------------------------------- Backend Server

By setting brokerMode to true. MQTTProxy can both send msg to backend and subscribers. Users can also send msg to clients by using HTTP endpoint.

Example

Save following yaml to file mqttproxy.yaml and then run

egctl create -f mqttproxy.yaml
kind: MQTTProxy
name: mqttproxy
port: 1883  # tcp port for mqtt clients to connect
useTLS: true
certificate:
- name: cert1
  cert: balabala
  key: keyForbalabala
- name: cert2
  cert: foo
  key: bar
rules:
- when:
    packetType: Connect
  pipeline: pipeline-mqtt-auth
- when:
    packetType: Publish
  pipeline: pipeline-mqtt-publish
---
title: by default, brokerMode is disabled.
linkTitle: by default, brokerMode is disabled.
weight: 1
---
brokerMode: true

---

name: pipeline-mqtt-auth
kind: Pipeline
protocol: MQTT
flow:
- filter: auth
filters:
- name: auth
  kind: MQTTClientAuth
  salt: salt
  auth:
  # username and password are both test
  - username: test
    saltedSha256Pass: 1bc1a361f17092bc7af4b2f82bf9194ea9ee2ca49eb2e53e39f555bc1eeaed74

---

name: pipeline-mqtt-publish
kind: Pipeline
protocol: MQTT
flow:
- filter: publish-kafka-backend
filters:
- name: publish-kafka-backend
  kind: KafkaMQTT
  backend: ["127.0.0.1:9092"]
  topic:
    default: kafka-topic

In this example, we use pipeline to process MQTT Connect packet (check username and password) and Publish packet (send to Kafka backend).

Now, we support following filters for MQTTProxy:

  • TopicMapper: map MQTT Publish packet multi-level topic into single topic and key-value headers.
  • MQTTClientAuth: provide username and password checking for MQTT Connect packet.
  • KafkaMQTT: send MQTT Publish message to Kafka backend. By default, KafkaMQTT filter will add clientID, username, mqttTopic to Kafka message headers.

Topic Mapping

In MQTT, there are multi-levels in a topic. Topic mapping is used to map MQTT topic to a single topic with headers. For example:

MQTT multi-level topics:
- beijing/car/123/log
- shanghai/tv/234/status
- nanjing/phone/456/error

with corresponding pattern:
- loc/device/ID/event

with Topic mapper, may produce Kafka topic:
- topic: iot_device, headers: {loc: beijing, device: car, ID: 123, event: log}
- topic: iot_device, headers: {loc: shanghai, device: tv, ID: 234, event: status}
- topic: iot_device, headers: {loc: nanjing, device: phone, ID: 456, event: error}

Topic mapping can make processing MQTT messages easier. In Easegress, we use filter TopicMapper to do topic mapping.

Here’s a simple example:

name: pipeline-mqtt-publish
kind: Pipeline
protocol: MQTT
flow:
- filter: topic-mapper
- filter: publish-kafka-backend
filters:
- name: topic-mapper
  kind: TopicMapper
  setKV:  # setKV set topic and header map into MQTT Context
    topic: kafka-topic
    headers: kafka-headers
  # matchIndex and route will decide which policy we use to do the mapping for MQTT topic
  matchIndex: 0
  route:
  - name: gateway
      matchExpr: "gate*"
  - name: direct
      matchExpr: "dir*"
  # policies define how to create topic and header map by using MQTT publish topic.
  policies:
    - name: direct
      topicIndex: 1
      route:
      - topic: iot_phone
        exprs: ["iphone", "xiaomi", "oppo", "pixel"]
      - topic: iot_other
        exprs: [".*"]
      headers:
      0: direct
      1: device
      2: status
    - name: gateway
      topicIndex: 3
      route:
      - topic: iot_phone
        exprs: ["iphone", "xiaomi", "oppo", "pixel"]
      - topic: iot_other
        exprs: [".*"]
      headers:
      0: gateway
      1: gatewayID
      2: device
      3: status
- name: publish-kafka-backend
  kind: KafkaMQTT
  backend: ["my-cluster-kafka-bootstrap.kafka:9092"]
  topic:
    default: kafka-topic
  mqtt:
    # since we don't use original topic name in MQTT Publish packet.
    # we need keys to get topic and header map from MQTT Context.
    topicKey: kafka-topic
    headerKey: kafka-headers

Match different topic mapping policy

Consider there may be multiple schemas for your MQTT topic, so we first provide a router to route your MQTT topic to different mapping policies and then do the map in that policy.

For example,

schema1: gateway/gatewayID/device/status
schema2: direct/device/status

...
    # matchIndex is the MQTT topic level used to match the route policy
    matchIndex: 0
    route:
    - name: gateway
        matchExpr: "gate*"
    - name: direct
        matchExpr: "dir*"
...

means that we use MQTT topic level 0 to match matchExpr to find a corresponding policy. In this case, gateway/gate123/iphone/log will match policy gateway, direct/iphone/log will match policy direct.

Detail of single policy

  policies:
    - name: direct
      topicIndex: 1
      route:
      - topic: iot_phone
        exprs: ["iphone", "xiaomi", "oppo", "pixel"]
      - topic: iot_other
        exprs: [".*"]
      headers:
      0: direct
      1: device
      2: status

topicIndex is the MQTT topic level used to produce Kafka topic (Regular expressions supported), in this case, direct/iphone/... will produce Kafka topic iot_phone, but direct/car/... will produce Kafka topic iot_other. headers used to produce Kafka headers.

More example about topic mapper:

use yaml above:
MQTT topic:
pattern1: gateway/gatewayID/device/status -> match policy gateway
pattern2: direct/device/status -> match policy direct

example1: "gateway/gate123/iphone/log"
topic and header map:
    topic: iot_phone
    headers:
        gateway: gateway
        gatewayID: gate123
        device: iphone
        status: log

example2: "direct/xiaomi/status"
topic and header map:
    topic: iot_phone
    headers:
        direct: direct
        device: xiaomi
        status: status

example3: "direct/tv/log"
topic and header map:
    topic: iot_other
    headers:
        direct: direct
        device: tv
        status: log

Empty topicMapper means there is no map between the MQTT topic and Kafka topic.

Note: For MQTT topic "gateway/gate123/iphone/log", index 0 is "gateway". For "/gateway/gate123/iphone/log" index 0 is still "gateway" not "". So, index 0 is the first non-empty level of multi-level MQTT topic.

HTTP endpoint

We support the backend to send messages back to MQTT clients through the HTTP endpoint.

API for http endpoint:

  • Host: Easegress IP, for example http://127.0.0.1
  • Port: Easegress API address, by default, :2381
  • Path: apis/v1/mqttproxy/{name}/topics/publish, where name is the name of MQTT proxy
  • Method: POST
  • Body:
{
  "topic": "yourTopicName",
  "qos": 1,
  "payload": "dataPayload",
  "base64": false
}

Note: Currently, the QoS only support 0 and 1

To send binary data, you can encode your binary data base64 and send base64 flag to true. Your client will receive the original binary data, we will do the decode.

  • Status code:
    • 200: Success
    • 400: StatusBadRequest, may wrong http method, or wrong data (qos send to illegal number) etc.

The HTTP endpoint schema also works for multi-node deployment. Say you have 3 Easegress instances called eg-0, eg-1, eg-2, and your MQTT client connects to eg-0, if you send messages to eg-1, your client will receive the message too.

We also support wildcard subscriptions. For example,

POST http://127.0.0.1:2381/apis/v2/mqttproxy/mqttproxy/topics/publish
{
  "topic": "Beijing/Phone/Update",
  "qos": 1, // currently only support 0 and 1
  "payload": "time to update",
  "base64": false
}

the clients subscribe following topics will receive the message:
"Beijing/Phone/Update"
"Beijing/+/Update"
"Beijing/Phone/+"
"+/Phone/Update"
"Beijing/+/+"
"Beijing/#"
"+/+/+"

References

  1. https://github.com/eclipse/paho.mqtt.golang
  2. http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html