MQTT to InfluxDB and Grafana
Architecture Overview
The full telemetry pipeline runs: Meshtastic node → MQTT broker →
Telegraf (or Python subscriber) → InfluxDB 2.x → Grafana dashboard.
Each component is independently replaceable, so you can swap Telegraf for a
custom Python script without touching InfluxDB or Grafana. This pipeline
requires the gateway to publish decoded JSON (mqtt.json_enabled true);
note that JSON-over-MQTT is not supported on nRF52 gateways (RAK4631, etc.) -
use an ESP32 gateway.
InfluxDB 2.x Setup
Install InfluxDB 2.x on a server or Raspberry Pi:
# Debian / Ubuntu / Raspberry Pi OS
wget https://dl.influxdata.com/influxdb/releases/influxdb2-2.7.1-amd64.deb
sudo dpkg -i influxdb2-2.7.1-amd64.deb
sudo systemctl enable --now influxdb
After installation, visit http://<host>:8086 to complete
the setup wizard. Create an organisation (e.g.
meshamerica), a bucket (e.g.
meshtastic), and generate an API token with
write access to that bucket. Store the token - you will need it for both
Telegraf and the Python client.
Telegraf MQTT Consumer
Telegraf is a convenient path from MQTT to InfluxDB. Install it, then add a
stanza to /etc/telegraf/telegraf.conf. Subscribe only to the JSON
subtree - the bare msh/# wildcard also catches the binary
protobuf 2/e/ topics, which the JSON parser cannot decode:
[[inputs.mqtt_consumer]]
servers = ["tcp://localhost:1883"]
topics = ["msh/+/2/json/#"]
qos = 0
data_format = "json_v2"
[[inputs.mqtt_consumer.json_v2]]
[[inputs.mqtt_consumer.json_v2.object]]
path = "payload"
tags = []
# flattens the nested payload object (temperature, relative_humidity, ...)
[[outputs.influxdb_v2]]
urls = ["http://localhost:8086"]
token = "YOUR_INFLUXDB_TOKEN"
org = "meshamerica"
bucket = "meshtastic"
Meshtastic's JSON nests the numeric metrics under a payload
object, so a plain data_format = "json" will not reach
temperature/humidity by default - you must use the json_v2 parser
(or a path/query) to flatten payload.*, or use the Python
subscriber below for full control. Ensure mqtt.json_enabled true
is set on the gateway. Restart Telegraf after any config change:
sudo systemctl restart telegraf.
Python Subscriber (Alternative)
If you need to apply custom logic, use a Python subscriber instead. For a
JSON workflow (mqtt.json_enabled true) the gateway already
publishes decoded JSON, so no protobuf module is needed - parse the JSON
directly. (If you ever do need the protobuf ServiceEnvelope, the
current import path is from meshtastic.protobuf import mesh_pb2.)
pip install paho-mqtt influxdb-client
import paho.mqtt.client as mqtt
from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import SYNCHRONOUS
import json, os
INFLUX_URL = "http://localhost:8086"
INFLUX_TOKEN = os.environ["INFLUX_TOKEN"]
INFLUX_ORG = "meshamerica"
INFLUX_BUCKET = "meshtastic"
client_db = InfluxDBClient(url=INFLUX_URL, token=INFLUX_TOKEN, org=INFLUX_ORG)
write_api = client_db.write_api(write_options=SYNCHRONOUS)
# Optional: map node IDs to long names by caching NODEINFO_APP messages
# (payload.longname keyed by "from"); Meshtastic JSON has no top-level "fromName".
node_names = {}
def on_message(client, userdata, msg):
try:
envelope = json.loads(msg.payload)
node_id = envelope.get("from", "unknown")
# NODEINFO packets carry the long name in payload.longname
if envelope.get("type") == "nodeinfo":
ln = envelope.get("payload", {}).get("longname")
if ln:
node_names[node_id] = ln
return
if envelope.get("type") != "telemetry":
return
node_name = node_names.get(node_id, node_id)
tel = envelope.get("payload", {}) # telemetry fields are flat, snake_case
point = (
Point("telemetry")
.tag("node_id", node_id)
.tag("node_name", node_name)
.field("temperature", tel.get("temperature"))
.field("humidity", tel.get("relative_humidity"))
.field("battery_voltage", tel.get("voltage"))
)
write_api.write(bucket=INFLUX_BUCKET, record=point)
except Exception as e:
print(f"Parse error: {e}")
mqttc = mqtt.Client()
mqttc.on_message = on_message
mqttc.connect("localhost", 1883)
mqttc.subscribe("msh/+/2/json/#")
mqttc.loop_forever()
Note: Meshtastic's JSON envelope has no top-level
rxSnr/rxRssi (or snr/rssi)
fields, and telemetry metrics are flat snake_case under payload
(e.g. relative_humidity, not relativeHumidity). Add
signal metrics only if a downstream processor explicitly supplies them.
InfluxDB Line Protocol
Whether you use Telegraf or Python, each telemetry reading lands in InfluxDB as a measurement with tags and fields:
telemetry,node_id=!abc12345,node_name=ridge-repeater temperature=22.4,humidity=61.0,battery_voltage=3.91 1746230400000000000
- Measurement:
telemetry - Tags:
node_id,node_name(indexed, low-cardinality) - Fields:
temperature,humidity,battery_voltage(addsnr/rssionly if a downstream processor supplies them; they are not in the gateway's JSON)
Grafana Setup
Install Grafana on the same host or a separate machine, then add InfluxDB as a data source using the Flux query language:
- Configuration → Data Sources → Add data source → InfluxDB
- Set Query Language to Flux
- URL:
http://localhost:8086 - Paste your InfluxDB org, bucket, and token
Recommended dashboard panels:
- Temperature over time (line graph per node)
- Battery voltage trends (multi-node time series)
- Node count (stat panel - unique node_id values in last hour)
- Signal quality map (table of last RSSI/SNR per node)
Example Flux query - temperature trend for last 24 hours:
from(bucket: "meshtastic")
|> range(start: -24h)
|> filter(fn: (r) => r._measurement == "telemetry" and r._field == "temperature")
|> aggregateWindow(every: 5m, fn: mean, createEmpty: false)
Retention Policies and Downsampling
In InfluxDB 2.x, retention is set per bucket. For raw telemetry data set the
bucket retention to 90 days. Create a second bucket (e.g.
meshtastic_hourly) with indefinite retention and use a Flux task
to downsample:
// Task: run every 1h - downsample telemetry to hourly averages
option task = { name: "downsample_telemetry", every: 1h }
from(bucket: "meshtastic")
|> range(start: -2h)
|> filter(fn: (r) => r._measurement == "telemetry")
|> aggregateWindow(every: 1h, fn: mean, createEmpty: false)
|> to(bucket: "meshtastic_hourly", org: "meshamerica")
Alerting
Grafana alerting fires when conditions are met. Useful rules for mesh deployments:
- Low battery:
battery_voltage < 3.5for any node - triggers SMS or email alert. - Temperature exceeded:
temperature > 50- useful for enclosure health monitoring in summer. - Node offline: no telemetry from a given
node_idfor more than N hours - check the "Last seen" derived field using a Fluxlast()query against the current time.