Skip to main content

MQTT to InfluxDB and Grafana

Architecture Overview

The full telemetry pipeline runs: Meshtastic node → MQTT broker → Telegraf (or Python subscriber) → InfluxDB 2.x → Grafana dashboard. Each component is independently replaceable, so you can swap Telegraf for a custom Python script without touching InfluxDB or Grafana.

InfluxDB 2.x Setup

Install InfluxDB 2.x on a server or Raspberry Pi:

# Debian / Ubuntu / Raspberry Pi OS
wget https://dl.influxdata.com/influxdb/releases/influxdb2-2.7.1-amd64.deb
sudo dpkg -i influxdb2-2.7.1-amd64.deb
sudo systemctl enable --now influxdb

After installation, visit http://<host>:8086 to complete the setup wizard. Create an organisation (e.g. meshamerica), a bucket (e.g. meshtastic), and generate an API token with write access to that bucket. Store the token - you will need it for both Telegraf and the Python client.

Telegraf MQTT Consumer

Telegraf is the easiest path from MQTT to InfluxDB. Install it, then add a stanza to /etc/telegraf/telegraf.conf:

[[inputs.mqtt_consumer]]
 servers = ["tcp://localhost:1883"]
 topics = ["msh/#"]
 qos = 0
 data_format = "json"
 json_string_fields = ["node_id", "node_name"]

[[outputs.influxdb_v2]]
 urls = ["http://localhost:8086"]
 token = "YOUR_INFLUXDB_TOKEN"
 org = "meshamerica"
 bucket = "meshtastic"

Telegraf will subscribe to the msh/# wildcard, parse each arriving JSON payload, and write every numeric field directly to InfluxDB. Restart Telegraf after any config change: sudo systemctl restart telegraf.

Python Subscriber (Alternative)

If you need to decode Meshtastic protobuf packets or apply custom logic, use a Python subscriber instead:

pip install paho-mqtt meshtastic influxdb-client
import paho.mqtt.client as mqtt
from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import SYNCHRONOUS
import meshtastic.mesh_pb2 as mesh_pb2, json, os

INFLUX_URL = "http://localhost:8086"
INFLUX_TOKEN = os.environ["INFLUX_TOKEN"]
INFLUX_ORG = "meshamerica"
INFLUX_BUCKET = "meshtastic"

client_db = InfluxDBClient(url=INFLUX_URL, token=INFLUX_TOKEN, org=INFLUX_ORG)
write_api = client_db.write_api(write_options=SYNCHRONOUS)

def on_message(client, userdata, msg):
 try:
 payload = json.loads(msg.payload)
 node_id = payload.get("from", "unknown")
 node_name = payload.get("fromName", node_id)
 tel = payload.get("payload", {})
 point = (
 Point("telemetry")
 .tag("node_id", node_id)
 .tag("node_name", node_name)
 .field("temperature", tel.get("temperature"))
 .field("humidity", tel.get("relativeHumidity"))
 .field("battery_voltage", tel.get("voltage"))
 .field("snr", payload.get("rxSnr"))
 .field("rssi", payload.get("rxRssi"))
 )
 write_api.write(bucket=INFLUX_BUCKET, record=point)
 except Exception as e:
 print(f"Parse error: {e}")

mqttc = mqtt.Client()
mqttc.on_message = on_message
mqttc.connect("localhost", 1883)
mqttc.subscribe("msh/#")
mqttc.loop_forever()

InfluxDB Line Protocol

Whether you use Telegraf or Python, each telemetry reading lands in InfluxDB as a measurement with tags and fields:

telemetry,node_id=!abc12345,node_name=ridge-repeater temperature=22.4,humidity=61.0,battery_voltage=3.91,snr=8.5,rssi=-87 1746230400000000000
  • Measurement: telemetry
  • Tags: node_id, node_name (indexed, low-cardinality)
  • Fields: temperature, humidity, battery_voltage, snr, rssi

Grafana Setup

Install Grafana on the same host or a separate machine, then add InfluxDB as a data source using the Flux query language:

  1. Configuration → Data Sources → Add data source → InfluxDB
  2. Set Query Language to Flux
  3. URL: http://localhost:8086
  4. Paste your InfluxDB org, bucket, and token
  • Temperature over time (line graph per node)
  • Battery voltage trends (multi-node time series)
  • Node count (stat panel - unique node_id values in last hour)
  • Signal quality map (table of last RSSI/SNR per node)

Example Flux query - temperature trend for last 24 hours:

from(bucket: "meshtastic")
 |> range(start: -24h)
 |> filter(fn: (r) => r._measurement == "telemetry" and r._field == "temperature")
 |> aggregateWindow(every: 5m, fn: mean, createEmpty: false)

Retention Policies and Downsampling

In InfluxDB 2.x, retention is set per bucket. For raw telemetry data set the bucket retention to 90 days. Create a second bucket (e.g. meshtastic_hourly) with indefinite retention and use a Flux task to downsample:

// Task: run every 1h - downsample telemetry to hourly averages
option task = { name: "downsample_telemetry", every: 1h }

from(bucket: "meshtastic")
 |> range(start: -2h)
 |> filter(fn: (r) => r._measurement == "telemetry")
 |> aggregateWindow(every: 1h, fn: mean, createEmpty: false)
 |> to(bucket: "meshtastic_hourly", org: "meshamerica")

Alerting

Grafana alerting fires when conditions are met. Useful rules for mesh deployments:

  • Low battery: battery_voltage < 3.5 for any node - triggers SMS or email alert.
  • Temperature exceeded: temperature > 50 - useful for enclosure health monitoring in summer.
  • Node offline: no telemetry from a given node_id for more than N hours - check the "Last seen" derived field using a Flux last() query against the current time.