Skip to main content

Automating Meshtastic: Practical Scripts

Automating Meshtastic: Practical Scripts

The Meshtastic Python library enables powerful automation workflows. This page provides four complete, ready-to-use scripts: a position logger, a message forwarder to Telegram, a battery monitor with alerts, and an automated network health reporter. Each script is self-contained and includes setup instructions.

Script 1: Position Logger to CSV

Logs GPS position updates from all mesh nodes to a CSV file in real time. Useful for tracking mobile assets, recording deployment surveys, or building a trace of network coverage over time.

# position_logger.py
# Logs all position packets from the Meshtastic mesh to a CSV file.
#
# Usage:
#     pip install meshtastic
#     python position_logger.py [--port /dev/ttyUSB0 | --host 192.168.1.42] [--output positions.csv]
import argparse
import csv
import datetime
import os
import sys
import time

import meshtastic
import meshtastic.serial_interface
import meshtastic.tcp_interface
from pubsub import pub

# Default CSV path, used when --output is not given on the command line.
OUTPUT_FILE = "positions.csv"

# Column names, in the order they are written to every CSV row.
CSV_FIELDS = [
    "timestamp",
    "node_id",
    "long_name",
    "short_name",
    "latitude",
    "longitude",
    "altitude_m",
    "speed_kmh",
    "heading_deg",
    "snr_db",
    "hop_count",
]


def get_interface(args):
    """Open the Meshtastic connection selected by the parsed CLI arguments.

    A truthy ``args.host`` selects a TCP connection to that host; otherwise a
    serial connection is opened on ``args.port``.
    """
    if not args.host:
        # An empty or missing --port is normalised to None before being passed
        # on (presumably so the library chooses a port itself - confirm).
        serial_path = args.port if args.port else None
        return meshtastic.serial_interface.SerialInterface(devPath=serial_path)
    return meshtastic.tcp_interface.TCPInterface(hostname=args.host)


def main():
    """Log every position packet heard on the mesh to a CSV file until Ctrl-C.

    Parses ``--port`` / ``--host`` / ``--output``, opens the output file in
    append mode (writing the header row when the file is new or empty),
    subscribes a callback to the ``meshtastic.receive.position`` pubsub topic,
    connects to the device and then idles while the callback writes one row
    per received packet.
    """
    parser = argparse.ArgumentParser(description="Log Meshtastic positions to CSV")
    parser.add_argument("--port",   help="Serial port (e.g. /dev/ttyUSB0 or COM4)")
    parser.add_argument("--host",   help="TCP hostname or IP address")
    parser.add_argument("--output", default=OUTPUT_FILE, help="Output CSV file path")
    args = parser.parse_args()

    # An existing but empty file (e.g. left by an aborted run) still needs a header.
    needs_header = (not os.path.isfile(args.output)
                    or os.path.getsize(args.output) == 0)

    # The context manager closes the file on every exit path, including a
    # failed connection attempt further down.
    with open(args.output, "a", newline="", encoding="utf-8") as outfile:
        writer = csv.DictWriter(outfile, fieldnames=CSV_FIELDS)
        if needs_header:
            writer.writeheader()

        def on_position(packet, interface):
            """Append one CSV row for a received position packet."""
            pos = packet.get("decoded", {}).get("position", {})
            lat_i = pos.get("latitudeI")
            if not lat_i:
                return  # no valid fix
            # Only the latitude is validated above, so read the longitude
            # defensively instead of risking a KeyError inside the callback.
            lon_i = pos.get("longitudeI", 0)

            node_id   = packet.get("fromId") or "unknown"
            nodes     = interface.nodes or {}
            node_info = nodes.get(node_id, {})
            user      = node_info.get("user", {})

            lat     = lat_i / 1e7
            lon     = lon_i / 1e7
            alt     = pos.get("altitude", 0)
            speed   = pos.get("groundSpeed", 0)
            heading = pos.get("groundTrack", 0)
            snr     = node_info.get("snr", "")

            # Without hopStart the subtraction would yield a negative count,
            # so the column is left blank instead.
            # NOTE(review): presumably older firmware omits hopStart - confirm.
            hop_start = packet.get("hopStart")
            if hop_start is None:
                hops = ""
            else:
                hops = hop_start - packet.get("hopLimit", 0)

            # Naive UTC timestamp, same text format as utcnow().isoformat().
            stamp = (datetime.datetime.now(datetime.timezone.utc)
                     .replace(tzinfo=None).isoformat())

            row = {
                "timestamp":   stamp,
                "node_id":     node_id,
                "long_name":   user.get("longName", ""),
                "short_name":  user.get("shortName", ""),
                "latitude":    f"{lat:.7f}",
                "longitude":   f"{lon:.7f}",
                "altitude_m":  alt,
                # The conversions imply m/s and 1/100 degree inputs (TODO confirm
                # against the Position protobuf); rounding avoids float noise
                # such as 25.200000000000003 in the CSV.
                "speed_kmh":   round(speed * 3.6, 1) if speed else 0,
                "heading_deg": round(heading / 100, 2) if heading else 0,
                "snr_db":      snr,
                "hop_count":   hops,
            }
            writer.writerow(row)
            outfile.flush()  # keep the file current if the process is killed
            print(f"[{row['timestamp']}] {node_id} ({row['long_name']}) "
                  f"@ {lat:.5f},{lon:.5f} alt={alt}m")

        # Subscribe before connecting so the callback is registered by the
        # time the interface starts delivering packets.
        pub.subscribe(on_position, "meshtastic.receive.position")

        iface = get_interface(args)
        print(f"Logging positions to {args.output}. Press Ctrl-C to stop.")

        try:
            while True:
                time.sleep(1)
        except KeyboardInterrupt:
            pass
        finally:
            iface.close()
            print("Logger stopped.")


if __name__ == "__main__":
    main()

Script 2: Message Forwarder to Telegram

Forwards all text messages received on the mesh to a Telegram chat. Requires a Telegram bot token (create one via @BotFather) and a chat ID. This is a popular pattern for monitoring a community mesh from a smartphone without needing Bluetooth or Wi-Fi proximity to a node.

# mesh_to_telegram.py
# Forwards Meshtastic text messages to a Telegram chat.
#
# Setup:
#     pip install meshtastic requests
#     Set environment variables:
#         TELEGRAM_TOKEN=<your bot token>
#         TELEGRAM_CHAT_ID=<chat id, e.g. -1001234567890>
#     python mesh_to_telegram.py [--host 192.168.1.42]
import os
import sys
import time
import datetime
import requests
import meshtastic
import meshtastic.serial_interface
import meshtastic.tcp_interface
from pubsub import pub

# Bot credentials are read from the environment. Surrounding whitespace
# (e.g. a trailing newline from a copy-pasted export) is stripped so it does
# not end up inside the request URL.
TELEGRAM_TOKEN   = os.environ.get("TELEGRAM_TOKEN", "").strip()
TELEGRAM_CHAT_ID = os.environ.get("TELEGRAM_CHAT_ID", "").strip()

if not TELEGRAM_TOKEN or not TELEGRAM_CHAT_ID:
    # sys.exit() with a string prints it to stderr and exits with status 1,
    # keeping the error out of stdout.
    sys.exit("ERROR: Set TELEGRAM_TOKEN and TELEGRAM_CHAT_ID environment variables.")


def send_telegram(text: str):
    """Send ``text`` to the configured Telegram chat.

    The message is posted with ``parse_mode="HTML"``. Request failures are
    reported on stdout and otherwise ignored (best effort), so one failed send
    does not stop the forwarder.
    """
    url  = f"https://api.telegram.org/bot{TELEGRAM_TOKEN}/sendMessage"
    data = {"chat_id": TELEGRAM_CHAT_ID, "text": text, "parse_mode": "HTML"}
    try:
        resp = requests.post(url, json=data, timeout=10)
        resp.raise_for_status()
    except requests.RequestException as exc:
        # The exception text can contain the request URL, which embeds the bot
        # token; redact it before it reaches the console or a log file.
        detail = str(exc)
        if TELEGRAM_TOKEN:
            detail = detail.replace(TELEGRAM_TOKEN, "<redacted>")
        print(f"Telegram send failed: {detail}")


def on_receive(packet, interface):
    """Forward a received mesh text message to Telegram.

    Packets whose decoded ``portnum`` is not ``TEXT_MESSAGE_APP`` are ignored.
    The forwarded message contains the receive time (UTC), the sender's long
    name and id, the destination, the hop count and the message text.
    """
    import html  # local import: only this handler needs it

    def esc(value):
        # The message is sent with parse_mode=HTML, so anything that
        # originates from the mesh must have <, > and & escaped.
        return html.escape(str(value), quote=False)

    decoded = packet.get("decoded", {})
    if decoded.get("portnum") != "TEXT_MESSAGE_APP":
        return

    text      = decoded.get("text", "")
    from_id   = packet.get("fromId") or "unknown"
    to_id     = packet.get("toId", "^all")
    timestamp = datetime.datetime.now(datetime.timezone.utc).strftime("%H:%M:%S UTC")

    # Without hopStart the subtraction would give a negative number, so the
    # count is shown as unknown instead.
    # NOTE(review): presumably older firmware omits hopStart - confirm.
    hop_start = packet.get("hopStart")
    hops = "?" if hop_start is None else hop_start - packet.get("hopLimit", 0)

    nodes     = interface.nodes or {}
    node_info = nodes.get(from_id, {})
    long_name = node_info.get("user", {}).get("longName", from_id)

    dest_str = "broadcast" if to_id in ("^all", "4294967295") else to_id
    tg_msg   = (
        f"Mesh Message [{timestamp}]\n"
        f"From: {esc(long_name)} ({esc(from_id)})\n"
        f"To: {esc(dest_str)}  |  {hops} hop(s)\n"
        f"\n{esc(text)}"
    )

    print(f"Forwarding to Telegram: {text!r} from {long_name}")
    send_telegram(tg_msg)


import argparse
# Connection options: --host selects TCP, otherwise a serial port is used.
parser = argparse.ArgumentParser()
parser.add_argument("--host", help="TCP hostname or IP")
parser.add_argument("--port", help="Serial port")
args = parser.parse_args()

# The handler is registered before the interface is opened.
pub.subscribe(on_receive, "meshtastic.receive.text")

iface = (
    meshtastic.tcp_interface.TCPInterface(hostname=args.host)
    if args.host
    else meshtastic.serial_interface.SerialInterface(devPath=args.port)
)

print(f"Forwarding mesh messages to Telegram chat {TELEGRAM_CHAT_ID}. Press Ctrl-C to stop.")
try:
    # Idle in the main thread until Ctrl-C; forwarding happens in on_receive.
    while True:
        time.sleep(1)
except KeyboardInterrupt:
    pass  # Ctrl-C is the normal way to stop
finally:
    iface.close()

Script 3: Battery Monitor with Alerts

Monitors battery levels of all nodes in the mesh and sends a text-message alert over the mesh when any node’s battery falls below a configurable threshold. Useful for solar-powered relay nodes where low battery means imminent network degradation.

# battery_monitor.py
# Sends a mesh alert when any node battery drops below a threshold.
#
# Usage:
#     python battery_monitor.py [--threshold 20] [--interval 300]
import argparse
import time
import meshtastic
import meshtastic.serial_interface
from pubsub import pub

ALERT_COOLDOWN = {}  # node_id -> last_alert_time to avoid spamming


def check_battery(iface, threshold, interval, cooldown=3600):
    """Poll node battery levels forever and broadcast low-battery alerts.

    Every ``interval`` seconds all entries of ``iface.nodes`` are scanned. A
    node whose reported ``batteryLevel`` is below ``threshold`` triggers one
    ``iface.sendText`` broadcast, repeated at most once per ``cooldown``
    seconds per node. Nodes without battery telemetry, or reporting a level
    above 100, are skipped. This function never returns; run it in a thread.
    """
    while True:
        time.sleep(interval)
        now = time.time()

        # Scan a snapshot: this normally runs in a background thread, and a
        # node added to the live dict mid-scan would raise
        # "dictionary changed size during iteration" and end the monitor.
        snapshot = list((iface.nodes or {}).items())

        for node_id, node_data in snapshot:
            battery = node_data.get("deviceMetrics", {}).get("batteryLevel")

            if battery is None:
                continue  # no telemetry available

            # Skip nodes that are charging (level > 100 indicates USB power on some firmware)
            if battery > 100:
                continue

            if battery >= threshold:
                continue

            # Rate-limit alerts per node.
            if now - ALERT_COOLDOWN.get(node_id, 0) <= cooldown:
                continue

            long_name = node_data.get("user", {}).get("longName", node_id)
            alert_msg = (
                f"LOW BATTERY ALERT: {long_name} ({node_id}) "
                f"is at {battery}%"
            )
            print(alert_msg)
            try:
                # Broadcast alert on the primary channel
                iface.sendText(alert_msg)
            except Exception as exc:
                # A failed send must not end the monitor thread. The cooldown
                # is not recorded, so the alert is retried on the next pass.
                print(f"Failed to send alert for {node_id}: {exc}")
                continue
            ALERT_COOLDOWN[node_id] = now

def main():
    """Parse CLI options, connect to a node and run the battery monitor.

    The periodic check runs in a daemon thread while the main thread idles
    until Ctrl-C, after which the interface is closed.
    """
    import threading

    parser = argparse.ArgumentParser(description="Meshtastic battery monitor")
    parser.add_argument("--threshold", type=int, default=20,
                        help="Battery percentage threshold for alerts (default: 20)")
    parser.add_argument("--interval",  type=int, default=300,
                        help="Check interval in seconds (default: 300)")
    parser.add_argument("--port",      help="Serial port")
    parser.add_argument("--host",      help="TCP hostname")
    args = parser.parse_args()

    if args.host:
        # Import the class by name. A function-level
        # "import meshtastic.tcp_interface" would bind "meshtastic" as a local
        # variable of main(), and the serial branch below would then raise
        # UnboundLocalError whenever --host is not given.
        from meshtastic.tcp_interface import TCPInterface
        iface = TCPInterface(hostname=args.host)
    else:
        iface = meshtastic.serial_interface.SerialInterface(devPath=args.port)

    print(f"Battery monitor started. Alert threshold: {args.threshold}%. "
          f"Check interval: {args.interval}s.")

    # Daemon thread: it must not keep the process alive after Ctrl-C.
    t = threading.Thread(target=check_battery,
                         args=(iface, args.threshold, args.interval),
                         daemon=True)
    t.start()

    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        pass
    finally:
        iface.close()


if __name__ == "__main__":
    main()

Script 4: Automated Network Health Reporter

Generates a periodic network health report, prints it to the console and, optionally, sends a short summary as a mesh broadcast. The report summarizes node count, online/offline status, average SNR, and channel utilization, and lists any nodes not heard within the configured window.

# health_reporter.py
# Generates periodic Meshtastic network health reports.
#
# Usage:
#     python health_reporter.py [--interval 3600] [--broadcast]
import argparse
import time
import datetime
import meshtastic
import meshtastic.serial_interface
import meshtastic.tcp_interface


def generate_report(iface, offline_threshold_minutes=60) -> str:
    """Build a multi-line health report from ``iface.nodes``.

    Args:
        iface: Object exposing a ``nodes`` mapping of node id to node info
            dict (may be ``None`` or empty).
        offline_threshold_minutes: A node counts as online when its
            ``lastHeard`` timestamp is newer than this many minutes.

    Returns:
        The report as a newline-joined string: a header line, node totals,
        average channel utilization and SNR when available, and one line per
        offline node.
    """
    # Work on a snapshot so a node added to the live dict while the report
    # is being built cannot break the iteration.
    snapshot = list((iface.nodes or {}).items())
    now      = time.time()
    total    = len(snapshot)
    online   = []
    offline  = []
    snr_values  = []
    util_values = []

    for node_id, node_data in snapshot:
        last_heard = node_data.get("lastHeard", 0)
        # A missing or zero lastHeard means the node was never heard.
        age_min    = (now - last_heard) / 60 if last_heard else None
        long_name  = node_data.get("user", {}).get("longName", node_id)
        snr        = node_data.get("snr")
        util       = node_data.get("deviceMetrics", {}).get("channelUtilization")

        if snr is not None:
            snr_values.append(snr)
        if util is not None:
            util_values.append(util)

        if age_min is not None and age_min < offline_threshold_minutes:
            online.append((long_name, age_min, snr))
        else:
            offline.append((long_name, age_min))

    avg_snr  = sum(snr_values)  / len(snr_values)  if snr_values  else None
    avg_util = sum(util_values) / len(util_values) if util_values else None

    stamp = datetime.datetime.now(datetime.timezone.utc).strftime('%Y-%m-%d %H:%M UTC')
    lines = [
        f"=== Mesh Health Report {stamp} ===",
        f"Total nodes: {total}  |  Online (<{offline_threshold_minutes}m): {len(online)}"
        f"  |  Offline: {len(offline)}",
    ]

    if avg_util is not None:
        health = "OK" if avg_util < 15 else "WARN" if avg_util < 25 else "HIGH"
        lines.append(f"Avg channel utilization: {avg_util:.1f}%  [{health}]")

    if avg_snr is not None:
        lines.append(f"Avg SNR (last heard): {avg_snr:.1f} dB")

    if offline:
        lines.append(f"Offline nodes ({len(offline)}):")
        for name, age in offline:
            age_str = f"{age:.0f}m ago" if age is not None else "never"
            lines.append(f"  - {name}: last heard {age_str}")

    return "\n".join(lines)


def main():
    """Run the periodic network health reporter until Ctrl-C.

    Every ``--interval`` seconds the full report is printed; with
    ``--broadcast`` a one-line summary is also sent as a mesh text message.
    """
    parser = argparse.ArgumentParser(description="Meshtastic network health reporter")
    parser.add_argument("--interval",  type=int, default=3600,
                        help="Report interval in seconds (default: 3600)")
    parser.add_argument("--broadcast", action="store_true",
                        help="Broadcast report as mesh text message")
    parser.add_argument("--offline",   type=int, default=60,
                        help="Minutes without a packet to consider a node offline (default: 60)")
    parser.add_argument("--port",      help="Serial port")
    parser.add_argument("--host",      help="TCP hostname")
    args = parser.parse_args()

    if args.host:
        iface = meshtastic.tcp_interface.TCPInterface(hostname=args.host)
    else:
        iface = meshtastic.serial_interface.SerialInterface(devPath=args.port)

    print("Health reporter running. First report in", args.interval, "seconds.")

    try:
        while True:
            time.sleep(args.interval)
            report = generate_report(iface, offline_threshold_minutes=args.offline)
            print(report)
            print()

            if args.broadcast:
                # Mesh messages are limited to ~228 bytes; send a summary
                # Take one snapshot of the node list: it is scanned twice
                # below, and a node added to the live dict mid-scan would
                # raise RuntimeError.
                node_list = list((iface.nodes or {}).values())
                now       = time.time()
                online = sum(
                    1 for n in node_list
                    if (now - n.get("lastHeard", 0)) / 60 < args.offline
                )
                utils = [
                    util
                    for util in (n.get("deviceMetrics", {}).get("channelUtilization")
                                 for n in node_list)
                    if util is not None
                ]
                avg_util = sum(utils) / len(utils) if utils else 0
                short_report = (
                    f"Mesh status: {online}/{len(node_list)} online, "
                    f"chan util {avg_util:.1f}%"
                )
                iface.sendText(short_report)
                print(f"Broadcast: {short_report!r}")
    except KeyboardInterrupt:
        pass
    finally:
        iface.close()


if __name__ == "__main__":
    main()