Using MQTT with Memfault
There are many protocols for communicating between your remote devices and backend systems in the cloud. Devices commonly use HTTPS, MQTT, CoAP, LwM2M, or other proprietary protocols. Memfault's API supports HTTPS, but it's easy to form a bridge in your cloud between your device's protocol (e.g., MQTT, CoAP, LwM2M) and Memfault.
Particularly for IoT devices, MQTT is a popular choice to craft a data path to your cloud. MQTT follows a simple publish/subscribe model, allowing devices or services to connect to a broker. The broker facilitates receiving published messages and routing them to subscribers. As an example, a device publishes its sensor data to the broker on a specific topic, and a service subscribes to this same topic to process this data. In addition to the protocol's simplicity and improved efficiency, MQTT is beneficial for memory usage. Many devices have limited RAM to devote to multiple TCP + TLS connections. Using MQTT to send data from your application and the Memfault SDK saves resources because there is only one connection to manage.
This guide will outline the best practices for using MQTT with Memfault. We will cover the basic components of this system, parameters to configure for your setup, and a simple device example. Of particular focus will be features introduced in MQTT v5.0. We highly recommend upgrading your MQTT stack to support v5.0, as it adds several key features that improve efficiency.
MQTT Background
Before covering more advanced topics in MQTT, let's start with the basics. Devices (clients) send data by publishing messages to a server. Other clients receive these messages by creating subscriptions with the server. The server is also known as a broker, as it facilitates the transfer of messages from publishing clients to subscribing clients. MQTT generally uses TCP as its transport layer [1], providing reliable and ordered transport. Below is a summary of common operations within MQTT:
Operation | Description | Related Packet Types |
---|---|---|
Connect | Opens an MQTT connection with the server | CONNECT, CONNACK, DISCONNECT, AUTH |
Publish | Sends a message to the server. The server is responsible for forwarding to any active subscribers | PUBLISH, PUBACK, PUBREC, PUBREL, PUBCOMP |
Subscribe | Creates a new subscription to a topic with the server | SUBSCRIBE, SUBACK, UNSUBSCRIBE, UNSUBACK |
Ping | Sends a message to the server to indicate the connection is active | PINGREQ, PINGRESP |
The general sequence of operations for a device in the field follows:
- Connect to a server
- Publish data as needed
- Disconnect from the server when complete
A service processing sensor data would take these operations:
- Connect to a server
- Subscribe to any topics (device data)
- Receive published device data
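To make the publish/subscribe flow concrete, here is a toy in-process model of a broker, written in Python. It is only a sketch of the routing idea, not an MQTT implementation: the `ToyBroker` class, its methods, and the `sensors/device_1/temperature` topic are hypothetical names, there is no network involved, and topics are matched by exact string.

```python
from collections import defaultdict
from typing import Callable, DefaultDict, List, Tuple

Handler = Callable[[str, bytes], None]


class ToyBroker:
    """In-memory stand-in for a broker: routes messages by exact topic match."""

    def __init__(self) -> None:
        self._subscribers: DefaultDict[str, List[Handler]] = defaultdict(list)

    def subscribe(self, topic: str, handler: Handler) -> None:
        # A service registers interest in a topic
        self._subscribers[topic].append(handler)

    def publish(self, topic: str, payload: bytes) -> int:
        # Forward the message to every current subscriber of this topic
        handlers = self._subscribers.get(topic, [])
        for handler in handlers:
            handler(topic, payload)
        return len(handlers)


broker = ToyBroker()
received: List[Tuple[str, bytes]] = []

# Service side: "connect", then subscribe to the device data topic
broker.subscribe(
    "sensors/device_1/temperature",
    lambda topic, payload: received.append((topic, payload)),
)

# Device side: "connect", publish as needed, then "disconnect"
delivered = broker.publish("sensors/device_1/temperature", b"21.5")
```

A real broker additionally handles connections, acknowledgments, and wildcard filters, which the following sections cover.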
Now that we've reviewed MQTT, let's dive into a few interesting parameters we can tune!
MQTT Connections
MQTT uses TCP as a transport layer, requiring that we open a TCP connection before transmitting data to the broker. Once this connection opens, we can send a CONNECT packet to the MQTT broker and publish our data. TCP does not require a keep-alive feature, meaning connections can remain active indefinitely. MQTT (both v3.1.1 and v5.0) specifies a keep-alive interval, set by the client in its CONNECT packet, to allow clients and servers to improve connection management. Suppose a device opens an MQTT connection and then sends no packets to the broker (PUBLISH, SUBSCRIBE, PINGREQ, etc.). In that case, the broker will close the connection once 1.5 times the keep-alive interval has elapsed. Depending on the server, we can tune this value to reduce the overhead required to create new connections.
One thing to remember is that this parameter heavily depends on your MQTT server's capabilities. AWS IoT Core, in most instances, will default to 1200 seconds (20 minutes) [2]. Other MQTT brokers may support longer keep-alive intervals.
Even if our broker does not support long keep-alive intervals, we can keep the connection open with minimal data transmission through ping operations. Our device can periodically send PINGREQ before the keep-alive interval expires. This simple packet has a minimal header and no payload. The server will restart the associated timers with the device's connection and respond with a PINGRESP packet. A failed PINGREQ or missing PINGRESP signals that the device's connection is lost. As an added bonus, the device will know when a connection ends through protocol errors or timeouts.
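The timing rules above can be sketched in a few lines of Python. This is an illustration under stated assumptions, not a client implementation: the helper names and the `margin` safety factor are hypothetical, and most MQTT client libraries already send PINGREQ for you. The two packet encodings follow the MQTT specification (a one-byte packet type followed by a zero Remaining Length).

```python
PINGREQ = bytes([0xC0, 0x00])   # packet type 12, remaining length 0
PINGRESP = bytes([0xD0, 0x00])  # packet type 13, remaining length 0


def broker_timeout_s(keep_alive_s: int) -> float:
    """Idle time after which the broker may close the connection."""
    return keep_alive_s * 1.5


def ping_due(now_s: float, last_sent_s: float, keep_alive_s: int,
             margin: float = 0.5) -> bool:
    """Return True when the client should send a PINGREQ.

    `margin` is a hypothetical safety factor: ping once half of the
    keep-alive interval has passed without any outgoing packet.
    """
    if keep_alive_s == 0:
        # A keep-alive of 0 turns the mechanism off entirely
        return False
    return (now_s - last_sent_s) >= keep_alive_s * margin
```

With a 1200 second keep-alive, `broker_timeout_s(1200)` gives 1800 seconds, and `ping_due` starts returning True after 600 seconds of silence.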
Publishing QoS
MQTT provides three different grades of message reliability when publishing messages. These grades or levels are known as Quality of Service (QoS). The table below outlines the general differences between the levels:
QoS Value | Description | Comment |
---|---|---|
0 | Delivered once at most | ⚠️ may result in dropped data |
1 | Delivered at least once | ✅ Memfault tolerates duplicate packets |
2 | Delivered exactly once | ✅ Ideal QoS level, but adds an additional round trip |
Increasing QoS provides more message reliability at the cost of increased bandwidth and latency. Each message with a QoS > 0 requires additional acknowledgments sent between the device and the broker before a message is considered complete.
Your data path to Memfault requires reliable transport to ensure the correct decoding of Memfault data, so selecting a QoS ≥ 1 is recommended. QoS level 0 does not guarantee message delivery, which can result in data loss.
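As a rough illustration of the cost of each level, the sketch below lists the packets exchanged for a single published message and guards against publishing chunks at QoS 0. The packet sequences come from the MQTT specification; the names `QOS_EXCHANGE`, `packets_per_message`, and `check_chunk_qos` are hypothetical helpers for this example only.

```python
# Packets exchanged between sender and receiver for one message
QOS_EXCHANGE = {
    0: ["PUBLISH"],
    1: ["PUBLISH", "PUBACK"],
    2: ["PUBLISH", "PUBREC", "PUBREL", "PUBCOMP"],
}

# Minimum QoS recommended in this guide for Memfault chunks
MIN_MEMFAULT_QOS = 1


def packets_per_message(qos: int) -> int:
    """Number of packets needed to complete one publish at this QoS."""
    return len(QOS_EXCHANGE[qos])


def check_chunk_qos(qos: int) -> int:
    """Return `qos` if it is valid and reliable enough for chunk data."""
    if qos not in QOS_EXCHANGE:
        raise ValueError(f"invalid QoS: {qos}")
    if qos < MIN_MEMFAULT_QOS:
        raise ValueError("QoS 0 can drop chunks; use QoS 1 or 2")
    return qos
```

The packet count is a proxy for latency and bandwidth: QoS 1 doubles the number of packets per message compared to QoS 0, and QoS 2 doubles it again.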
Topic Architecture
MQTT's publish/subscribe operations revolve around the concept of topics. We define topic names as a string written like a filepath (e.g., /this/is/a/topic). Clients publish messages to topics, which the broker forwards to subscribed clients. There are a few things we should keep in mind when naming our topics:
- Topic strings are UTF-8 encoded
- Topic length is overhead when publishing our data
- Topic structure informs what filters we can use when creating subscriptions
Recommended Memfault Topic Structure
MQTT's topic structure provides a simple yet flexible way to structure the flow of data published by our devices and received by subscribed services. Your application's data should inform your topic structure. When constructing topics, it is beneficial to match a hierarchy. Let's envision our top level as all Memfault data for our example. Memfault contains projects, projects contain devices, and finally, devices produce chunks. Our example leads to the following topic template:
memfault/<project_slug|board_str>/<device_serial>/chunks
The template contains two parameters:
- project_slug or board_str: The URL-friendly version of your Memfault project name, or an identifying string for your hardware board
- device_serial: The device serial used to identify the device with Memfault
Of these two fields, device_serial is the most important, allowing the subscribed service to identify the device a chunk belongs to without encoding additional metadata in the published payload. The project_slug (or board_str) parameter is useful to handle messages by project selectively. The following section will outline how these two fields simplify subscription logic with wildcards.
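A minimal sketch of building and parsing this topic template in Python follows. The helper names (`ChunkTopic`, `build_chunks_topic`, `parse_chunks_topic`) are hypothetical, and rejecting `/`, `+`, and `#` inside a level is an assumption made here to keep the four-level structure intact.

```python
from typing import NamedTuple, Optional


class ChunkTopic(NamedTuple):
    project: str        # project_slug or board_str
    device_serial: str


def build_chunks_topic(project: str, device_serial: str) -> str:
    """Fill in the memfault/<project>/<device_serial>/chunks template."""
    for level in (project, device_serial):
        # '/' separates levels; '+' and '#' are reserved for wildcards
        if not level or any(c in level for c in "/+#"):
            raise ValueError(f"invalid topic level: {level!r}")
    return f"memfault/{project}/{device_serial}/chunks"


def parse_chunks_topic(topic: str) -> Optional[ChunkTopic]:
    """Recover the project and device serial, or None if the topic differs."""
    parts = topic.split("/")
    if len(parts) == 4 and parts[0] == "memfault" and parts[3] == "chunks":
        return ChunkTopic(project=parts[1], device_serial=parts[2])
    return None
```

Because the device serial travels in the topic, the payload can stay a raw Memfault chunk with no extra framing.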
When building your service on AWS IoT Core, using Basic Ingest Topics (topics starting with $aws/rules/) can help reduce your overall cost. See the AWS documentation on Basic Ingest for more info. All our example topics on this page will work with Basic Ingest.
Easy Subscription Rules With Wildcards
Using our previously defined topic string template, we can now define a subscription for our services to process Memfault chunks. MQTT provides a wildcard syntax to match multiple topics for a single subscription. In other words, we don't need to subscribe to each device's chunks topic. We use a wildcard to subscribe to all devices at once!
Per Project Chunks Subscription
The recommended topic to subscribe to all device chunks of a single project is:
memfault/<project_slug>/+/chunks
By using the + wildcard, we will receive all chunks from the devices in this project. The two pieces of information we'll need to send the chunks to Memfault are the device serial and the Project Key. Since we're only selecting a single project, injecting the Project Key as a single value into our service is a simple option. For the device serial, we will need to extract this from the packet metadata. The client library used with your service will have a field containing this information. We'll use the Project Key to set our HTTP header and the device serial to construct our URL.
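The sketch below shows how a single-project service might turn a received topic into the URL and headers for the chunks request. The endpoint and header names match the service example later in this guide; the function name `build_chunk_request` is hypothetical, and URL-quoting the serial is a defensive assumption rather than a documented requirement.

```python
from typing import Dict, Tuple
from urllib.parse import quote

CHUNKS_URL = "https://chunks.memfault.com/api/v0/chunks"


def build_chunk_request(topic: str, project_key: str) -> Tuple[str, Dict[str, str]]:
    """Return (url, headers) for posting a chunk received on `topic`."""
    parts = topic.split("/")
    if len(parts) != 4 or parts[0] != "memfault" or parts[3] != "chunks":
        raise ValueError(f"unexpected topic: {topic}")

    # The device serial is the third topic level
    device_serial = parts[2]
    url = f"{CHUNKS_URL}/{quote(device_serial, safe='')}"
    headers = {
        "Content-Type": "application/octet-stream",
        "Memfault-Project-Key": project_key,
    }
    return url, headers
```

The chunk itself is sent unmodified as the request body.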
Generic Chunks Subscription
Using a similar pattern, we can also subscribe to all chunks for all devices in any project! We need a second single-level wildcard:
memfault/+/+/chunks
With two + wildcards, our subscription will match the following topics:
- memfault/project-slug-a/device_1/chunks
- memfault/project-slug-b/device_2/chunks
We use the same logic to extract the device serial as before, but the service now uses a mapping to translate between the topic's project slug and the respective Project Key.
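To illustrate how the wildcard filter and the slug-to-key mapping fit together, here is a simplified matcher in Python. It is a sketch: `topic_matches` and `project_key_for` are hypothetical helpers, the keys in `PROJECT_KEYS` are placeholders, and the matcher ignores the special rules for topics that begin with `$` and for shared subscriptions. In practice the broker performs the matching for you.

```python
from typing import Optional

# Placeholder mapping from project slug to Project Key (not real keys)
PROJECT_KEYS = {
    "project-slug-a": "PROJECT_KEY_A",
    "project-slug-b": "PROJECT_KEY_B",
}


def topic_matches(topic_filter: str, topic: str) -> bool:
    """Simplified MQTT filter match supporting '+' and a trailing '#'."""
    filter_levels = topic_filter.split("/")
    topic_levels = topic.split("/")
    for i, level in enumerate(filter_levels):
        if level == "#":
            # Multi-level wildcard is only valid as the last filter level
            return i == len(filter_levels) - 1
        if i >= len(topic_levels):
            return False
        if level != "+" and level != topic_levels[i]:
            return False
    # Without '#', the filter and topic must have the same depth
    return len(filter_levels) == len(topic_levels)


def project_key_for(topic: str) -> Optional[str]:
    """Look up the Project Key for a chunks topic, or None if unknown."""
    if not topic_matches("memfault/+/+/chunks", topic):
        return None
    return PROJECT_KEYS.get(topic.split("/")[1])
```

Note that each `/` starts a new level, so a filter with a trailing slash such as `memfault/+/+/chunks/` has five levels and does not match the four-level chunk topics.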
Minimizing Publishing Overhead With Topic Aliases
MQTT topics use UTF-8 string encoding, which is great for readability but bad for efficiency. The topic string is required metadata in each PUBLISH packet, which adds overhead every time we send new data to the server. MQTT v5.0 provides a topic alias property to map this topic string to an integer. The first PUBLISH packet carries both the topic string and a topic alias integer, and the broker stores the mapping. Subsequent PUBLISH packets can then send an empty topic string with the same alias, which the broker maps back to the full topic string. The mapping only lasts for the lifetime of the connection. The diagram below outlines this process:
MQTT servers and clients each set a maximum number of aliases to control the size of the tables required to map the topic strings to their alias integers. AWS IoT Core limits this to 8 per connection, effectively the max per device connecting to the service.
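The client-side bookkeeping for topic aliases can be sketched as follows. `TopicAliasTable` is a hypothetical helper, not part of any MQTT library; it assumes aliases are handed out in order starting at 1 (an alias of 0 is not allowed in MQTT v5.0) and that the table is discarded on reconnect, since aliases do not survive the connection.

```python
from typing import Dict, Optional, Tuple


class TopicAliasTable:
    """Tracks which topics already have an alias on this connection."""

    def __init__(self, alias_max: int) -> None:
        # alias_max is the Topic Alias Maximum advertised by the server
        self.alias_max = alias_max
        self._aliases: Dict[str, int] = {}

    def publish_fields(self, topic: str) -> Tuple[str, Optional[int]]:
        """Return the (topic name, topic alias) for the next PUBLISH."""
        if topic in self._aliases:
            # Alias already established: send an empty topic name
            return "", self._aliases[topic]
        if len(self._aliases) < self.alias_max:
            alias = len(self._aliases) + 1
            self._aliases[topic] = alias
            # First use: send both so the broker can record the mapping
            return topic, alias
        # Table is full: fall back to sending the full topic name
        return topic, None
```

For a device that only publishes to its own chunks topic, the first PUBLISH pays for the full topic string and every later PUBLISH on that connection sends only the small alias property.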
Choosing An MQTT Payload Size
The specification allows for MQTT packets up to ~256 MB in length, but in practice, the client and server will define one much smaller. MQTT v5.0 configures this parameter when creating a new connection with the MQTT server. The server and client can set max sizes independently of the other. The primary constraint we must satisfy when calculating the maximum chunk size is producing a packet within the minimum Maximum Packet Size allowed by each client receiving chunks. You will want to confirm this for each client subscribing to your device's Memfault topics. The following formula represents this constraint:
Memfault Chunk Size ≤ MIN(MQTT Maximum Packet Size) - MQTT header size
For systems using MQTT v3.1.1 and below, unfortunately, the best way to determine the maximum packet size allowed is by consulting your broker's configuration and through experimentation. Clients and servers can drop packets that are too large. The most straightforward test is to publish packets of varying sizes and observe what packet size your service receives without error. Enabling a QoS > 0 helps diagnose these issues by surfacing publishing failures via acknowledgments.
Once we know our maximum packet size, we need to determine the header size of our packet. Multiple factors influence the size of the header, including:
- Variable-size integers
- Packet type (CONNECT/PUBLISH/SUBSCRIBE/etc)
- Variable header fields and properties
Of these three, the variable header fluctuates the most. In the case of PUBLISH packets, the topic name is part of the variable header. Since the topic name is a string, longer topic names produce larger packets. The topic name length is why Topic Aliases can be so valuable for cutting down packet overhead from long topic names.
For example, let's assume our maximum packet size is 1024 bytes (default packet size for ESP-IDF). We'll focus on a simple PUBLISH packet, as this is generally the largest packet sent and the Memfault chunk payload dominates its size. In this case, we budget at least:
- 1 byte for the Fixed Header
- 2 bytes for the Remaining Length varint (2 bytes are needed to encode a max length of 1024)
- 2 bytes of length prefix + the UTF-8 encoded length of the topic string
- 2 bytes for the Packet Identifier, which is present whenever QoS > 0
PUBLISH packets contain a topic name as part of their variable header section. For this example, let's assume our project slug is mqtt-project, and our device serial is mqtt-12345, which yields a topic string of memfault/mqtt-project/mqtt-12345/chunks that is 39 bytes encoded. Putting this all together for a QoS 1 publish, we have:
- 1 byte for the Fixed Header
- 2 bytes for the Remaining Length varint
- 2 bytes of length prefix + 39 bytes for the topic string
- 2 bytes for the Packet Identifier
The header size of 46 bytes leaves 978 bytes remaining for Memfault chunk data. With MQTT v5.0, budget at least one more byte for the Property Length field.
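The budget above can be reproduced with a small calculation. The following Python sketch assumes a PUBLISH packet with no MQTT v5.0 properties set; the function names are hypothetical. It accounts for the 2-byte Packet Identifier that PUBLISH packets carry when QoS > 0 and, optionally, the single Property Length byte that MQTT v5.0 adds.

```python
def varint_len(value: int) -> int:
    """Bytes needed to encode `value` as an MQTT Variable Byte Integer."""
    length = 1
    while value > 127:
        value >>= 7
        length += 1
    return length


def max_chunk_size(max_packet_size: int, topic: str, qos: int = 1,
                   mqtt_v5: bool = False) -> int:
    """Largest PUBLISH payload that fits within `max_packet_size` bytes."""
    topic_len = len(topic.encode("utf-8"))
    variable_header = 2 + topic_len      # 2-byte length prefix + topic name
    if qos > 0:
        variable_header += 2             # Packet Identifier
    if mqtt_v5:
        variable_header += 1             # Property Length, no properties set

    # Fixed header = 1 byte + Remaining Length varint (1 to 4 bytes).
    # Try the shortest varint first, since it leaves the most room.
    for varint_bytes in (1, 2, 3, 4):
        remaining = max_packet_size - 1 - varint_bytes
        if remaining >= 0 and varint_len(remaining) <= varint_bytes:
            return max(remaining - variable_header, 0)
    raise ValueError("packet size exceeds what MQTT can encode")


topic = "memfault/mqtt-project/mqtt-12345/chunks"
chunk_size = max_chunk_size(1024, topic, qos=1)
```

For the example topic at QoS 1 this yields 978 bytes, that is, 46 bytes of overhead within a 1024 byte packet. Shortening the topic or using a topic alias increases the room left for chunk data.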
While there are technically some optimizations in minimizing TCP segmentation, these gains, in practice, are minimal and out of the scope of this document. The tradeoff of increasing the total packets required to transmit all Memfault data may not be worth the decreased overhead.
Device and Service Examples
The following examples implement the basic recommendations discussed in this document and are a jumping-off point for your application.
Device Example (ESP-IDF)
Here's a simple implementation for forwarding chunks using ESP-IDF's MQTT client:
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

#include "memfault/components.h"
#include "mqtt_client.h"

static char memfault_mqtt_topic[128];

// construct the topic string; can be done once on system initialization
void initialize_mqtt_topic(void) {
  // could be used to control routing for different board types
  const char *board_str = "example";
  const char *device_serial = "DEMOSERIAL";
  // snprintf bounds the write to the size of the topic buffer
  snprintf(memfault_mqtt_topic, sizeof(memfault_mqtt_topic),
           "memfault/%s/%s/chunks", board_str, device_serial);
}

void publish_memfault_data(esp_mqtt_client_handle_t client) {
  // fetch memfault chunk data
  uint8_t chunk_buffer[128];  // size appropriately
  size_t chunk_len = sizeof(chunk_buffer);
  bool data_available = memfault_packetizer_get_chunk(chunk_buffer, &chunk_len);
  if (data_available) {
    // publish the data with QoS 1 and the retain flag cleared
    int msg_id = esp_mqtt_client_publish(client, memfault_mqtt_topic,
                                         (const char *)chunk_buffer,
                                         (int)chunk_len, 1, 0);
    // a negative message ID indicates the publish failed
    assert(msg_id >= 0);
  }
}
The example found in the memfault-firmware-sdk at examples/esp32/apps/memfault_demo_app [3] contains code to forward Memfault chunks periodically over MQTT. A Kconfig option controls the transport used (HTTP vs. MQTT). To build the MQTT version of the application, check out the README.md.
This example will publish Memfault chunks to the topic memfault/<project_slug>/<device_serial>/chunks. The chunk size is tuned to fit within our example maximum packet size. The MQTT client uses a QoS of 1 to transmit chunks.
Additional Device Examples
Here are some other example MQTT projects to get started posting Memfault data:
- Cypress PSoC64 + AWS IoT
- Ezurio Pinnacle™ 100 + AWS IoT (formerly Laird Connectivity Pinnacle™ 100)
Example Metrics
This example includes some recommended metrics to measure the performance and behavior of your MQTT device. Use these metrics as Heartbeat metrics, which the device measures and then resets at a consistent interval. This interval depends on your application, but one hour is a good rule of thumb.
- MQTT Publish Bytes: Measures the total size of data sent via PUBLISH packet payloads in bytes. Packet size is typically a pricing factor for MQTT services.
- MQTT Publish Count: Measures the number of PUBLISH packets sent to the broker. For many MQTT services, this is one of the key cost factors.
- MQTT Connection Error Duration: Measures unexpected time spent disconnected from the MQTT broker. Some devices may want to disconnect from the server, so this metric does not include these periods.
- MQTT Connection Duration: Measures time spent in active connection with the server. This metric can aid in determining service cost and as a proxy for battery usage.
If you'd like more best practices with metrics, check out our recommendations.
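The bookkeeping behind these four metrics can be sketched independently of any SDK. The Python model below is an assumption-laden illustration, not the Memfault SDK's metrics API: `MqttMetrics`, its state names, and the injected `clock` are all hypothetical. It shows the accumulate-then-reset behavior of a heartbeat and how intentional disconnects are kept out of the error duration.

```python
import time
from dataclasses import asdict, dataclass
from typing import Callable, Dict


@dataclass
class MqttHeartbeat:
    publish_bytes: int = 0
    publish_count: int = 0
    connection_error_s: float = 0.0
    connection_s: float = 0.0


class MqttMetrics:
    def __init__(self, clock: Callable[[], float] = time.monotonic) -> None:
        self._clock = clock
        self._hb = MqttHeartbeat()
        # "connected", "error" (unexpected loss), or "idle" (intentional)
        self._state = "idle"
        self._since = clock()

    def _accumulate(self) -> None:
        # Charge the elapsed time to whichever state we were in
        now = self._clock()
        elapsed = now - self._since
        if self._state == "connected":
            self._hb.connection_s += elapsed
        elif self._state == "error":
            self._hb.connection_error_s += elapsed
        self._since = now

    def set_state(self, state: str) -> None:
        self._accumulate()
        self._state = state

    def on_publish(self, payload_len: int) -> None:
        self._hb.publish_bytes += payload_len
        self._hb.publish_count += 1

    def collect(self) -> Dict[str, float]:
        """Return this interval's values and reset for the next one."""
        self._accumulate()
        snapshot = asdict(self._hb)
        self._hb = MqttHeartbeat()
        return snapshot
```

A caller would invoke `collect()` once per heartbeat interval (for example, hourly) and report the returned values.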
Broker Setup
Several options are available for setting up a broker to test your application. A simple way to get started is to run a local broker using the Cedalo MQTT Management Center [4]. This setup produces a series of containers running an MQTT broker and a management console web application on top of it. You can also set up a standalone Mosquitto broker or use an instance of AWS IoT Core (see our example CloudFormation setup in the Pinnacle 100 Guide [5]).
Service Examples
The final piece in our system will be a service that forwards Memfault chunks from the broker to Memfault over HTTP. Our service will act as a bridge, connecting the MQTT components (our device and broker) to Memfault over HTTP.
- Python w/ Paho
- AWS Lambda w/ JS
Below is a simple Python script using Paho that does the following:
- Connects to our MQTT broker
- Subscribes to all /chunks topics in all projects with the topic filter memfault/+/+/chunks
- Builds a chunks HTTP POST request by extracting the device serial from the topic and adding the Project Key as a header
- Posts the request to Memfault's chunks endpoint
from typing import Any

import paho.mqtt.client as mqtt
import requests

# MQTT broker configuration
broker_address = "localhost"  # Add your broker IP address
broker_port = 1883
broker_username = "your_username"
broker_password = "your_password"

# Fill in with your project and key mapping
memfault_project_key_dict = {"my_project": "your_project_key"}

# Memfault HTTP endpoint configuration
memfault_chunks_endpoint = "https://chunks.memfault.com/api/v0/chunks"


def on_connect(client: mqtt.Client, userdata: Any, flags: dict[str, int], rc: int):
    if rc == 0:
        print("Connected to MQTT broker")
        # Subscribe to all chunks sent to the broker, at QoS 1 so the broker
        # does not downgrade delivery to this service to QoS 0
        client.subscribe("memfault/+/+/chunks", qos=1)
    else:
        print(f"Connection failed with result code {rc}")


def on_message(client: mqtt.Client, userdata: Any, msg: mqtt.MQTTMessage):
    # Expected topic format: memfault/<project_slug>/<device_serial>/chunks
    topic_parts = msg.topic.split("/")
    if len(topic_parts) == 4 and topic_parts[0] == "memfault" and topic_parts[3] == "chunks":
        project_slug = topic_parts[1]
        device_serial = topic_parts[2]
        process_memfault_chunk(project_slug, device_serial, msg.payload)
    else:
        print(f"Ignoring message on unexpected topic {msg.topic}")


def process_memfault_chunk(project_slug: str, device_serial: str, chunk_data: bytes):
    # Look up the Project Key; skip chunks from projects we do not know about
    project_key = memfault_project_key_dict.get(project_slug)
    if project_key is None:
        print(f"No Project Key configured for project {project_slug}; dropping chunk")
        return

    # Construct the path for the Memfault chunks API endpoint based on the device serial number
    endpoint = f"{memfault_chunks_endpoint}/{device_serial}"

    # Forward the Memfault chunk data to the Memfault chunks API endpoint
    headers = {
        "Content-Type": "application/octet-stream",
        "Memfault-Project-Key": project_key,
    }
    response = requests.post(endpoint, data=chunk_data, headers=headers, timeout=10)
    if response.status_code == 202:
        print(f"Memfault chunk[len: {len(chunk_data)}] for device {device_serial} forwarded successfully")
    else:
        print(f"Failed to forward Memfault chunk for device {device_serial}: {response.text} with code {response.status_code}")


def main():
    # Written against the paho-mqtt 1.x API. With paho-mqtt 2.x, create the client
    # with mqtt.Client(mqtt.CallbackAPIVersion.VERSION1) to keep these callback signatures.
    client = mqtt.Client()
    client.username_pw_set(broker_username, broker_password)
    client.on_connect = on_connect
    client.on_message = on_message
    client.connect(broker_address, broker_port, 60)  # 60 second keep-alive
    client.loop_forever()


if __name__ == "__main__":
    main()
Forwarding data to Memfault from AWS IoT is very easy! You just need to configure an AWS IoT Rule for the topic and connect it to an AWS Lambda function. This can either be done manually in the AWS console or programmatically using CloudFormation.
- CloudFormation
- Manually
The CloudFormation steps below assume you have installed the AWS CLI and have set up credentials to access your AWS account.
- Copy and paste the contents below into
cfnTemplate_memfault.yml
- Deploy the configuration
$ aws cloudformation deploy --template-file cfnTemplate_memfault.yml --stack-name memfault-mqtt-chunk-proxy --capabilities CAPABILITY_NAMED_IAM
Waiting for changeset to be created..
Waiting for stack create/update to complete
cfnTemplate_memfault.yml file contents
AWSTemplateFormatVersion: "2010-09-09"
Transform: AWS::Serverless-2016-10-31
Description: Memfault MQTT Chunk Proxy
Parameters:
  ApplicationName:
    Type: String
    Default: memfault-chunk-proxy
Resources:
  LambdaIotRole:
    Type: AWS::IAM::Role
    Properties:
      RoleName: !Sub ${ApplicationName}-lambda-iot-role
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - lambda.amazonaws.com
            Action:
              - sts:AssumeRole
      Path: !Sub /${ApplicationName}/
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/AWSIoTFullAccess
      Policies:
        - PolicyName: LambdaIotPolicy
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              - Effect: Allow
                Action:
                  - logs:CreateLogStream
                  - logs:PutLogEvents
                Resource:
                  - arn:aws:logs:*:*:log-group:/aws/lambda/*
  #
  # Proxy Memfault "chunks" to Memfault cloud for processing
  # For more details about data transport see https://mflt.io/data-to-cloud
  #
  MemfaultProxyFunction:
    Type: AWS::Lambda::Function
    Properties:
      FunctionName: "MemfaultMqttProxyFunction"
      Code:
        ZipFile: |
          const https = require('https')
          exports.https = https
          exports.handler = (event, context, callback) => {
            // Update with your Lambda's environment variable names or use AWS KMS
            const projectKeys = {
              "project_a": process.env.MEMFAULT_PROJECT_A_KEY,
              "project_b": process.env.MEMFAULT_PROJECT_B_KEY,
            };
            // The IoT Rule delivers the chunk base64-encoded in `data`
            const data = Buffer.from(event.data, 'base64');
            // Topic Format: memfault/(<project_slug>|<board_str>)/<device_serial>/chunks
            const topicParams = event.topic.split('/')
            const deviceSerial = topicParams[2]
            const projectKey = projectKeys[topicParams[1]]
            const options = {
              hostname: 'chunks.memfault.com',
              port: 443,
              path: `/api/v0/chunks/${deviceSerial}`,
              method: 'POST',
              headers: {
                'Content-Type': 'application/octet-stream',
                'Content-Length': data.length,
                'Memfault-Project-Key': projectKey
              }
            }
            const req = https.request(options, res => {
              const response = {
                statusCode: res.statusCode
              };
              callback(null, response);
            })
            req.on('error', error => {
              console.error(error)
              // Report the failure so the invocation does not wait for the timeout
              callback(error)
            })
            req.write(data)
            req.end()
          }
      Handler: "index.handler"
      Role: !GetAtt LambdaIotRole.Arn
      Runtime: nodejs20.x
      Timeout: 15
      Environment:
        Variables:
          ApplicationName: !Ref ApplicationName
  MemfaultProxyFunctionLogGroup:
    Type: AWS::Logs::LogGroup
    DependsOn: MemfaultProxyFunction
    Properties:
      RetentionInDays: 14
      LogGroupName: !Join ["", ["/aws/lambda/", !Ref MemfaultProxyFunction]]
  MemfaultProxyFunctionPermission:
    Type: AWS::Lambda::Permission
    Properties:
      Action: lambda:InvokeFunction
      FunctionName: !GetAtt
        - MemfaultProxyFunction
        - Arn
      Principal: iot.amazonaws.com
      SourceArn: !Sub arn:aws:iot:${AWS::Region}:${AWS::AccountId}:rule/${MemfaultProxyRule}
  MemfaultProxyRule:
    Type: AWS::IoT::TopicRule
    Properties:
      RuleName: "MemfaultMqttProxyRule"
      TopicRulePayload:
        AwsIotSqlVersion: "2016-03-23"
        RuleDisabled: false
        Sql:
          !Sub SELECT encode(*, 'base64') AS data, topic() AS topic FROM
          'memfault/+/+/chunks'
        Actions:
          - Lambda:
              FunctionArn: !GetAtt MemfaultProxyFunction.Arn
Create AWS Lambda
To set up the bridge manually, first create an AWS Lambda function to forward data to Memfault.
const https = require("https");

exports.https = https;

exports.handler = (event, context, callback) => {
  // Update with your Lambda's environment variable names or use AWS KMS
  const projectKeys = {
    "project_a": process.env.MEMFAULT_PROJECT_A_KEY,
    "project_b": process.env.MEMFAULT_PROJECT_B_KEY,
  };

  // The IoT Rule delivers the chunk base64-encoded in `data`
  const data = Buffer.from(event.data, 'base64');

  // Topic Format: memfault/(<project_slug>|<board_str>)/<device_serial>/chunks
  const topicParams = event.topic.split('/');
  const deviceSerial = topicParams[2];
  const projectKey = projectKeys[topicParams[1]];

  const options = {
    hostname: 'chunks.memfault.com',
    port: 443,
    path: `/api/v0/chunks/${deviceSerial}`,
    method: 'POST',
    headers: {
      'Content-Type': 'application/octet-stream',
      'Content-Length': data.length,
      'Memfault-Project-Key': projectKey
    }
  };

  const req = https.request(options, res => {
    const response = {
      statusCode: res.statusCode
    };
    callback(null, response);
  });

  req.on('error', error => {
    console.error(error);
    // Report the failure so the invocation does not wait for the timeout
    callback(error);
  });

  req.write(data);
  req.end();
};
Create AWS IoT Rule
Next you will need to create a new AWS IoT Rule with the following SQL statement:
SELECT encode(*, 'base64') AS data, topic() AS topic FROM 'memfault/+/+/chunks'
Finally, as part of the setup, you will need to attach the Lambda you just created to the AWS IoT Rule so that the Lambda gets triggered each time new data arrives.
The simple examples we described should get any project started using Memfault with MQTT. The topic template described provides a flexible manner to add Memfault chunks to any broker, new or established. MQTT's publish/subscribe paradigm makes it easy to send chunks from your downstream devices up to a service for forwarding to Memfault. More advanced setups will enable load balancing across brokers and services or enable QoS 2 to ensure processing data exactly one time between multiple subscribers.