MQTT to InfluxDB and Grafana
Architecture Overview
The full telemetry pipeline runs: Meshtastic node → MQTT broker → Telegraf (or Python subscriber) → InfluxDB 2.x → Grafana dashboard. Each component is independently replaceable, so you can swap Telegraf for a custom Python script without touching InfluxDB or Grafana.
InfluxDB 2.x Setup
Install InfluxDB 2.x on a server or Raspberry Pi:
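# Debian / Ubuntu (x86-64). On 64-bit Raspberry Pi OS, download the arm64 package
# instead (influxdb2-2.7.1-arm64.deb); InfluxDB 2.x needs a 64-bit operating system.
wget https://dl.influxdata.com/influxdb/releases/influxdb2-2.7.1-amd64.deb
sudo dpkg -i influxdb2-2.7.1-amd64.deb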
sudo systemctl enable --now influxdb
After installation, visit http://<host>:8086 to complete the setup wizard. Create an organisation (e.g. meshamerica) and a bucket (e.g. meshtastic), then generate an API token with write access to that bucket. Store the token securely; you will need it for both Telegraf and the Python client. Grafana only reads data, so it needs a token with read access to the same bucket.
Telegraf MQTT Consumer
Telegraf is the easiest path from MQTT to InfluxDB. Install it, then add a stanza to /etc/telegraf/telegraf.conf:
[[inputs.mqtt_consumer]]
  servers = ["tcp://localhost:1883"]
  topics = ["msh/#"]
  qos = 0
  data_format = "json"
  json_string_fields = ["node_id", "node_name"]

[[outputs.influxdb_v2]]
  urls = ["http://localhost:8086"]
  token = "YOUR_INFLUXDB_TOKEN"
  org = "meshamerica"
  bucket = "meshtastic"
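Telegraf will subscribe to the msh/# wildcard, parse each arriving JSON payload, and write every numeric field to InfluxDB, along with the string fields listed in json_string_fields. Payloads that are not valid JSON (for example, raw protobuf packets published under the same topic tree) fail to parse and show up as errors in the Telegraf log, so narrow the topic filter if your broker carries both. Restart Telegraf after any config change: sudo systemctl restart telegraf.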
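To predict which field names will show up in InfluxDB, it can help to mimic the JSON parser's flattening offline. The sketch below is an approximation written for illustration, not Telegraf's own code: it assumes nested keys are joined with an underscore, numbers become floats, and strings and booleans are dropped unless the flattened name is listed in json_string_fields. The sample message and the flatten_json helper are hypothetical.

```python
from typing import Any


def flatten_json(obj: dict, string_fields: set, prefix: str = "") -> dict:
    """Approximate how a nested JSON object becomes flat metric fields."""
    fields: dict[str, Any] = {}
    for key, value in obj.items():
        name = f"{prefix}_{key}" if prefix else key
        if isinstance(value, dict):
            # Nested objects contribute fields named parent_child.
            fields.update(flatten_json(value, string_fields, name))
        elif isinstance(value, bool):
            continue  # booleans are assumed to be dropped by default
        elif isinstance(value, (int, float)):
            fields[name] = float(value)
        elif isinstance(value, str) and name in string_fields:
            fields[name] = value
        # Lists and unlisted strings are ignored in this sketch.
    return fields


# Hypothetical message shape, for illustration only.
sample = {
    "node_id": "!abc12345",
    "node_name": "ridge-repeater",
    "type": "telemetry",
    "payload": {"temperature": 22.4, "voltage": 3.91},
}
flat = flatten_json(sample, string_fields={"node_id", "node_name"})
print(flat)
```

If your messages nest readings under a payload key, expect field names such as payload_temperature rather than temperature, and adjust dashboard queries to match.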
Python Subscriber (Alternative)
If you need to decode Meshtastic protobuf packets or apply custom logic, use a Python subscriber instead:
pip install paho-mqtt meshtastic influxdb-client
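import json
import os

import paho.mqtt.client as mqtt
from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import SYNCHRONOUS

# To decode protobuf packets, also import mesh_pb2 from the meshtastic package
# (its module path differs between package versions). This script handles JSON only.

INFLUX_URL = "http://localhost:8086"
INFLUX_TOKEN = os.environ["INFLUX_TOKEN"]  # keep the token out of source control
INFLUX_ORG = "meshamerica"
INFLUX_BUCKET = "meshtastic"

client_db = InfluxDBClient(url=INFLUX_URL, token=INFLUX_TOKEN, org=INFLUX_ORG)
write_api = client_db.write_api(write_options=SYNCHRONOUS)


def on_connect(client, userdata, flags, reason_code, properties):
    # Subscribe here so the subscription is restored after a reconnect.
    client.subscribe("msh/#")


def on_message(client, userdata, msg):
    try:
        payload = json.loads(msg.payload)
        node_id = payload.get("from", "unknown")
        node_name = payload.get("fromName", node_id)
        tel = payload.get("payload", {})
        candidates = {
            "temperature": tel.get("temperature"),
            "humidity": tel.get("relativeHumidity"),
            "battery_voltage": tel.get("voltage"),
            "snr": payload.get("rxSnr"),
            "rssi": payload.get("rxRssi"),
        }
        # Keep only numeric values, cast to float so a field never changes type.
        fields = {k: float(v) for k, v in candidates.items() if isinstance(v, (int, float))}
        if not fields:
            return  # nothing to store, e.g. a text or position message
        point = Point("telemetry").tag("node_id", str(node_id)).tag("node_name", str(node_name))
        for name, value in fields.items():
            point.field(name, value)
        write_api.write(bucket=INFLUX_BUCKET, record=point)
    except Exception as e:
        print(f"Failed to process message on {msg.topic}: {e}")


# paho-mqtt 2.x requires an explicit callback API version.
mqttc = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)
mqttc.on_connect = on_connect
mqttc.on_message = on_message
mqttc.connect("localhost", 1883)
mqttc.loop_forever()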
InfluxDB Line Protocol
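Each telemetry reading lands in InfluxDB as a measurement with tags and fields. The Python subscriber writes points like the one below; Telegraf's defaults differ slightly (the measurement is named mqtt_consumer unless you set name_override, and JSON keys become tags only if they are listed in tag_keys):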
telemetry,node_id=!abc12345,node_name=ridge-repeater temperature=22.4,humidity=61.0,battery_voltage=3.91,snr=8.5,rssi=-87 1746230400000000000
- Measurement: telemetry
- Tags: node_id, node_name (indexed, low-cardinality)
- Fields: temperature, humidity, battery_voltage, snr, rssi
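The structure of a line protocol record can be made concrete with a small formatter. This is a minimal sketch, not the client library's serializer: the escape and to_line helpers are hypothetical names, every field is written as a float, and string fields are not handled.

```python
def escape(text: str, specials: str = ",= ") -> str:
    """Backslash-escape the characters that line protocol treats as delimiters."""
    for ch in specials:
        text = text.replace(ch, "\\" + ch)
    return text


def to_line(measurement: str, tags: dict, fields: dict, timestamp_ns: int) -> str:
    """Build 'measurement,tags fields timestamp' from plain dictionaries."""
    # Measurement names escape commas and spaces; keys and tag values also escape "=".
    head = escape(measurement, ", ")
    # Tags are sorted by key, which InfluxDB recommends for write performance.
    tag_part = "".join(f",{escape(k)}={escape(str(v))}" for k, v in sorted(tags.items()))
    # Missing readings are skipped; the rest are written as floats.
    field_part = ",".join(
        f"{escape(k)}={float(v)!r}" for k, v in fields.items() if v is not None
    )
    if not field_part:
        raise ValueError("a point needs at least one field")
    return f"{head}{tag_part} {field_part} {timestamp_ns}"


line = to_line(
    "telemetry",
    {"node_id": "!abc12345", "node_name": "ridge-repeater"},
    {"temperature": 22.4, "humidity": 61.0, "battery_voltage": 3.91, "snr": 8.5, "rssi": -87},
    1746230400000000000,
)
print(line)
```

Writing every value as a float is a deliberate choice in this sketch: InfluxDB rejects a write whose field type conflicts with the type already stored for that field, so an integer RSSI followed by a fractional one would otherwise fail.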
Grafana Setup
Install Grafana on the same host or a separate machine, then add InfluxDB as a data source using the Flux query language:
- Configuration → Data Sources → Add data source → InfluxDB
- Set Query Language to Flux
- URL: http://localhost:8086
- Paste your InfluxDB org, bucket, and token
Recommended dashboard panels:
- Temperature over time (line graph per node)
- Battery voltage trends (multi-node time series)
- Node count (stat panel — unique node_id values in last hour)
- Signal quality map (table of last RSSI/SNR per node)
Example Flux query — temperature trend for last 24 hours:
from(bucket: "meshtastic")
  |> range(start: -24h)
  |> filter(fn: (r) => r._measurement == "telemetry" and r._field == "temperature")
  |> aggregateWindow(every: 5m, fn: mean, createEmpty: false)
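For readers new to Flux, the windowing step in that query can be sketched in plain Python. This is an illustration of the idea rather than Flux's implementation: it assumes timestamps in whole seconds, windows aligned to multiples of the interval, each result stamped with the end of its window, and empty windows omitted (as with createEmpty: false). The aggregate_window name and the readings are made up.

```python
from collections import defaultdict
from statistics import mean


def aggregate_window(points, every_s):
    """Average (timestamp_s, value) pairs per fixed window, stamped at the window end."""
    buckets = defaultdict(list)
    for ts, value in points:
        window_start = ts - (ts % every_s)
        buckets[window_start + every_s].append(value)
    # Windows with no readings never get a key, so they are simply absent.
    return [(stop, mean(values)) for stop, values in sorted(buckets.items())]


# Made-up temperature readings: (seconds since start, degrees).
readings = [(0, 20.0), (120, 22.0), (299, 24.0), (300, 30.0), (900, 10.0)]
result = aggregate_window(readings, every_s=300)
print(result)
```

The first three readings share one 5-minute window and collapse into a single averaged point, which is why the panel shows a smoother line than the raw data.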
Retention Policies and Downsampling
In InfluxDB 2.x, retention is set per bucket. For raw telemetry, set the bucket retention to 90 days. Create a second bucket (e.g. meshtastic_hourly) with indefinite retention and use a Flux task to downsample:
// Task: runs every 1h and downsamples telemetry to hourly averages
option task = { name: "downsample_telemetry", every: 1h }

from(bucket: "meshtastic")
  |> range(start: -2h)
  |> filter(fn: (r) => r._measurement == "telemetry")
  |> aggregateWindow(every: 1h, fn: mean, createEmpty: false)
  |> to(bucket: "meshtastic_hourly", org: "meshamerica")
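The task looks back two hours but runs every hour, so each hourly window is computed twice. That overlap is harmless because InfluxDB overwrites a point that has the same measurement, tag set, field, and timestamp. The sketch below models this under simplifying assumptions: time is counted in hours, runs happen exactly on the hour, and the destination bucket is a plain dictionary keyed by timestamp. The hourly_means helper and the voltages are hypothetical.

```python
from statistics import mean


def hourly_means(raw, start_h, stop_h):
    """Mean per whole hour in [start_h, stop_h), stamped at the end of each hour."""
    out = {}
    for hour in range(start_h, stop_h):
        values = [v for t, v in raw if hour <= t < hour + 1]
        if values:
            out[hour + 1] = mean(values)
    return out


# Made-up readings: (time in hours, battery voltage).
raw = [(0.5, 4.0), (1.25, 3.75), (1.75, 3.25), (2.5, 3.5)]

bucket = {}
for run_at in (2, 3):  # two consecutive task runs, each looking back 2 hours
    bucket.update(hourly_means(raw, run_at - 2, run_at))

print(bucket)
```

The second run recomputes the window ending at hour 2 and writes the same value over the existing one, so the bucket ends up with one point per hour. The wider look-back also gives late-arriving readings a second chance to be included.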
Alerting
Grafana alert rules evaluate a query on a schedule and notify a contact point when a threshold condition is met. Useful rules for mesh deployments:
- Low battery: battery_voltage < 3.5 V for any node; triggers an SMS or email alert.
- Temperature exceeded: temperature > 50 °C; useful for enclosure health monitoring in summer.
- Node offline: no telemetry from a given node_id for more than N hours; derive a "last seen" time with a Flux last() query and compare it against the current time.
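The node-offline rule boils down to comparing each node's most recent timestamp with the current time. A minimal sketch of that comparison, assuming you already have a mapping from node ID to last-seen time (for example, from a last() query); the offline_nodes helper, node IDs, and the 3-hour threshold are illustrative only:

```python
from datetime import datetime, timedelta, timezone


def offline_nodes(last_seen, now, max_age):
    """Return node IDs whose most recent telemetry is older than max_age."""
    return sorted(node for node, seen in last_seen.items() if now - seen > max_age)


now = datetime(2025, 5, 3, 12, 0, tzinfo=timezone.utc)
last_seen = {
    "!abc12345": now - timedelta(minutes=20),  # reported recently
    "!def67890": now - timedelta(hours=5),     # silent for 5 hours
}
stale = offline_nodes(last_seen, now, max_age=timedelta(hours=3))
print(stale)
```

Choose N comfortably larger than the nodes' telemetry interval so that a single dropped packet does not raise an alert.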