Uploading Data With MQTT
Introduction
Devices that already maintain an MQTT connection can reuse it to send Memfault chunk data. This has the following advantages:
- It doesn't require an HTTP client implementation on the device.
- It doesn't require a separate (and possibly concurrent) TLS session with its own certificate management, which can reduce memory requirements.
Since the data is not flowing directly from the device to Memfault's chunk REST endpoint, some additional work is required to forward the data from the MQTT server to Memfault.
Here are some example projects that use MQTT to post Memfault data:
Recommended QoS Level
We recommend using Quality of Service level 1 or 2:
QoS Level | Description | Comment |
---|---|---|
0 | At most once delivery | ⚠️ may result in dropped data |
1 | At least once delivery | ✅ Memfault tolerates duplicate packets |
2 | Exactly once delivery | ✅ Ideal QoS level, but adds an additional round trip |
For the formal definitions of the QoS levels, see the MQTT v3.1.1 specification: http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718099
Recommended Topic Format
We recommend the following topic format for publishing Memfault chunk data (the following examples assume this topic is used):
prod/<board>/<device_id>/memfault/<memfault_project_key>/chunk
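On the server side it can help to build and validate this topic in one place. The following is a minimal sketch, assuming Node.js; the helper names `isValidLevel`, `buildMemfaultTopic`, and `parseMemfaultTopic` are hypothetical and not part of any Memfault SDK. It rejects levels that are empty or that contain `/` or the MQTT wildcard characters `+` and `#`.

```javascript
// Hypothetical helpers (not part of any Memfault SDK) for the recommended topic:
//   prod/<board>/<device_id>/memfault/<memfault_project_key>/chunk

// A topic level must be a non-empty string without '/', '+', or '#'.
function isValidLevel(level) {
  return typeof level === "string" && level.length > 0 && !/[\/+#]/.test(level);
}

// Builds the topic string; throws if any level is unusable.
function buildMemfaultTopic(board, deviceId, projectKey) {
  for (const level of [board, deviceId, projectKey]) {
    if (!isValidLevel(level)) {
      throw new Error(`invalid topic level: ${JSON.stringify(level)}`);
    }
  }
  return `prod/${board}/${deviceId}/memfault/${projectKey}/chunk`;
}

// Returns { board, deviceId, projectKey }, or null if the topic has another shape.
function parseMemfaultTopic(topic) {
  const levels = topic.split("/");
  if (levels.length !== 6) return null;
  const [env, board, deviceId, marker, projectKey, kind] = levels;
  if (env !== "prod" || marker !== "memfault" || kind !== "chunk") return null;
  if (![board, deviceId, projectKey].every(isValidLevel)) return null;
  return { board, deviceId, projectKey };
}
```

Keeping the builder and the parser next to each other makes it harder for the device-side format and the forwarding code to drift apart.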
Publishing Memfault Chunks
An example of publishing Memfault chunks with an MQTT client (here it's based on the ESP32 MQTT client example):
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <stdio.h>

#include "memfault/components.h"
#include "mqtt_client.h"

static char memfault_mqtt_topic[128];

// construct the topic string; can be done once on system initialization
void initialize_mqtt_topic(void) {
  // can be used to control routing for different board types
  const char *board_str = "example";
  const char *device_serial = "DEMOSERIAL";
  const char *memfault_project_key = "<memfault project key>";
  // snprintf() bounds the write to the size of the topic buffer
  snprintf(memfault_mqtt_topic, sizeof(memfault_mqtt_topic),
           "prod/%s/%s/memfault/%s/chunk", board_str, device_serial,
           memfault_project_key);
}

void publish_memfault_data(esp_mqtt_client_handle_t client) {
  // fetch memfault chunk data
  uint8_t chunk_buffer[128];  // size appropriately
  size_t chunk_len = sizeof(chunk_buffer);
  bool data_available = memfault_packetizer_get_chunk(chunk_buffer, &chunk_len);
  if (data_available) {
    // publish the data with QoS 1 (at least once) and the retain flag cleared
    int msg_id = esp_mqtt_client_publish(client, memfault_mqtt_topic,
                                         (const char *)chunk_buffer,
                                         (int)chunk_len, 1, 0);
    assert(msg_id >= 0);
  }
}
Integration With Specific MQTT Brokers
AWS IoT
Forwarding data from AWS IoT to Memfault takes two pieces: an AWS IoT Rule for the topic and an AWS Lambda function that the rule invokes. You can set these up either manually in the AWS console or programmatically using CloudFormation.
CloudFormation
These steps assume you have installed the AWS CLI and have set up credentials to access your AWS account.
- Copy and paste the contents below into cfnTemplate_memfault.yml
- Deploy the configuration
$ aws cloudformation deploy --template-file cfnTemplate_memfault.yml --stack-name memfault-pinnacle-100-chunk-proxy --capabilities CAPABILITY_NAMED_IAM
Waiting for changeset to be created..
Waiting for stack create/update to complete
cfnTemplate_memfault.yml file contents
AWSTemplateFormatVersion: "2010-09-09"
Transform: AWS::Serverless-2016-10-31
Description: Memfault Pinnacle 100 Chunk Proxy
Parameters:
  ApplicationName:
    Type: String
    Default: memfault-chunk-proxy
Resources:
  LambdaIotRole:
    Type: AWS::IAM::Role
    Properties:
      RoleName: !Sub ${ApplicationName}-lambda-iot-role
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - lambda.amazonaws.com
            Action:
              - sts:AssumeRole
      Path: !Sub /${ApplicationName}/
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/AWSIoTFullAccess
      Policies:
        - PolicyName: LambdaIotPolicy
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              - Effect: Allow
                Action:
                  - logs:CreateLogStream
                  - logs:PutLogEvents
                Resource:
                  - arn:aws:logs:*:*:log-group:/aws/lambda/*
  #
  # Proxy Memfault "chunks" to Memfault cloud for processing
  # For more details about data transport see https://mflt.io/data-to-cloud
  #
  MemfaultProxyFunction:
    Type: AWS::Lambda::Function
    Properties:
      FunctionName: "MemfaultPinnacle100ProxyFunction"
      Code:
        ZipFile: |
          const https = require('https')
          exports.https = https
          exports.handler = (event, context, callback) => {
            const data = Buffer.from(event.data, 'base64');
            // Topic Format: prod/<board>/<device_id>/memfault/<memfault_project_key>/chunk
            const topicParams = event.topic.split('/')
            const deviceSerial = topicParams[2]
            const options = {
              hostname: 'chunks.memfault.com',
              port: 443,
              path: `/api/v0/chunks/${deviceSerial}`,
              method: 'POST',
              headers: {
                'Content-Type': 'application/octet-stream',
                'Content-Length': data.length,
                'Memfault-Project-Key': topicParams[4]
              }
            }
            const req = https.request(options, res => {
              const response = {
                statusCode: res.statusCode
              };
              callback(null, response);
            })
            req.on('error', error => {
              console.error(error)
              // report the failure so the invocation ends instead of timing out
              callback(error)
            })
            req.write(data)
            req.end()
          }
      Handler: "index.handler"
      Role: !GetAtt LambdaIotRole.Arn
      # nodejs12.x is deprecated on AWS Lambda; use a currently supported runtime
      Runtime: nodejs20.x
      Timeout: 15
      Environment:
        Variables:
          ApplicationName: !Ref ApplicationName
  MemfaultProxyFunctionLogGroup:
    Type: AWS::Logs::LogGroup
    DependsOn: MemfaultProxyFunction
    Properties:
      RetentionInDays: 14
      LogGroupName: !Join ["", ["/aws/lambda/", !Ref MemfaultProxyFunction]]
  MemfaultProxyFunctionPermission:
    Type: AWS::Lambda::Permission
    Properties:
      Action: lambda:InvokeFunction
      FunctionName: !GetAtt
        - MemfaultProxyFunction
        - Arn
      Principal: iot.amazonaws.com
      SourceArn: !Sub arn:aws:iot:${AWS::Region}:${AWS::AccountId}:rule/${MemfaultProxyRule}
  MemfaultProxyRule:
    Type: AWS::IoT::TopicRule
    Properties:
      RuleName: "MemfaultPinnacle100ProxyRule"
      TopicRulePayload:
        AwsIotSqlVersion: "2016-03-23"
        RuleDisabled: false
        Sql:
          !Sub SELECT encode(*, 'base64') AS data, topic() AS topic FROM
          'prod/+/+/memfault/#'
        Actions:
          - Lambda:
              FunctionArn: !GetAtt MemfaultProxyFunction.Arn
Manually
Create AWS Lambda
First, create an AWS Lambda function that forwards data to Memfault.
const https = require("https");
exports.https = https;

exports.handler = (event, context, callback) => {
  const data = Buffer.from(event.data, "base64");
  // Topic Format: prod/<board>/<device_id>/memfault/<memfault_project_key>/chunk
  const topicParams = event.topic.split("/");
  const deviceSerial = topicParams[2];

  const options = {
    hostname: "chunks.memfault.com",
    port: 443,
    path: `/api/v0/chunks/${deviceSerial}`,
    method: "POST",
    headers: {
      "Content-Type": "application/octet-stream",
      "Content-Length": data.length,
      "Memfault-Project-Key": topicParams[4],
    },
  };

  const req = https.request(options, (res) => {
    const response = {
      statusCode: res.statusCode,
    };
    callback(null, response);
  });

  req.on("error", (error) => {
    console.error(error);
    // report the failure so the invocation ends instead of timing out
    callback(error);
  });

  req.write(data);
  req.end();
};
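The handler builds its request directly from the rule event, so a malformed topic or an empty payload would still be sent on to Memfault. One possible refinement, sketched below, is to move request construction into a pure function that can be checked locally without any network access. The name `buildChunkRequest` is hypothetical, and both the use of `encodeURIComponent` on the device serial and the rejection of empty payloads are assumptions of this sketch rather than documented Memfault requirements.

```javascript
// Hypothetical helper: turns an AWS IoT rule event
//   { data: <base64 string>, topic: <string> }
// into the arguments for https.request(). Returns null if the event is unusable.
function buildChunkRequest(event) {
  if (!event || typeof event.data !== "string" || typeof event.topic !== "string") {
    return null;
  }

  // Topic Format: prod/<board>/<device_id>/memfault/<memfault_project_key>/chunk
  const levels = event.topic.split("/");
  const deviceSerial = levels[2];
  const projectKey = levels[4];
  if (!deviceSerial || !projectKey) {
    return null;
  }

  const body = Buffer.from(event.data, "base64");
  if (body.length === 0) {
    return null; // assumption: an empty chunk is not worth forwarding
  }

  return {
    options: {
      hostname: "chunks.memfault.com",
      port: 443,
      // assumption: escape the serial so it cannot alter the request path
      path: `/api/v0/chunks/${encodeURIComponent(deviceSerial)}`,
      method: "POST",
      headers: {
        "Content-Type": "application/octet-stream",
        "Content-Length": body.length,
        "Memfault-Project-Key": projectKey,
      },
    },
    body,
  };
}
```

A handler could call this first and return early when it yields `null`, then pass `options` to `https.request()` and write `body` as before.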
Create AWS IoT Rule
Next you will need to create a new AWS IoT Rule:
SELECT encode(*, 'base64') AS data, topic() AS topic FROM 'prod/+/+/memfault/#'
Finally, attach the Lambda function you just created to the AWS IoT rule as an action, so that the function is triggered each time new data arrives.
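To see which topics the rule's filter `prod/+/+/memfault/#` selects, the standard MQTT wildcard semantics can be sketched in a few lines: `+` matches exactly one topic level, and `#` matches any number of remaining levels and must be the last level of the filter. The function name `topicMatchesFilter` is hypothetical, and the sketch ignores broker-specific details such as reserved topics that begin with `$`.

```javascript
// Hypothetical helper implementing MQTT topic-filter matching:
//   '+' matches exactly one level, '#' matches all remaining levels.
function topicMatchesFilter(filter, topic) {
  const filterLevels = filter.split("/");
  const topicLevels = topic.split("/");

  for (let i = 0; i < filterLevels.length; i++) {
    if (filterLevels[i] === "#") {
      // the multi-level wildcard is only valid as the final filter level
      return i === filterLevels.length - 1;
    }
    if (i >= topicLevels.length) {
      return false; // topic is shorter than the filter
    }
    if (filterLevels[i] !== "+" && filterLevels[i] !== topicLevels[i]) {
      return false; // literal level differs
    }
  }
  // without a trailing '#', both must have the same number of levels
  return filterLevels.length === topicLevels.length;
}

const filter = "prod/+/+/memfault/#";
console.log(topicMatchesFilter(filter, "prod/example/DEMOSERIAL/memfault/KEY/chunk"));
console.log(topicMatchesFilter(filter, "prod/example/memfault/KEY/chunk"));
```

Because the filter ends in `#`, it also selects topics with additional or different levels after `memfault/`, so the Lambda function may be invoked for topics that do not end in `chunk`.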