From 53b58500f7cdc05094d885ef003f9e7abab4dc75 Mon Sep 17 00:00:00 2001 From: henri Date: Wed, 19 Feb 2025 17:48:48 +1300 Subject: [PATCH 1/3] Added example script : meshtastic_serial_message_reader.py --- examples/meshtastic_serial_message_reader.py | 45 ++++++++++++++++++++ 1 file changed, 45 insertions(+) create mode 100644 examples/meshtastic_serial_message_reader.py diff --git a/examples/meshtastic_serial_message_reader.py b/examples/meshtastic_serial_message_reader.py new file mode 100644 index 000000000..bf0112e4f --- /dev/null +++ b/examples/meshtastic_serial_message_reader.py @@ -0,0 +1,45 @@ +#!/usr/bin/env python3 +# +# Released Under GNU GPLv3 +# Copyright 2025 Henri Shustak +# +# About : +# This script will print messages as they arrive from a meshtastic node connected via serial port USB. +# If you have multiple nodes attached, you will need to edit this script and specify the node to monitor. +# https://gist.github.com/henri/a6584d55813f971e5b1a4ee940c07d25 +# +# Requirements : +# You will need to install python meshtastic libraries : https://github.com/meshtastic/python +# +# Version History : +# 1.0 - initial release +# 1.1 - added support for sender id and bug fixs + +import time +import meshtastic +import meshtastic.serial_interface +from pubsub import pub + +def onReceive(packet, interface): + # DEBUGGING + # print(f"message arrived") + # print(f"{packet}") + try: + if packet['decoded']['portnum'] == 'TEXT_MESSAGE_APP': + message = packet['decoded']['text'] + channel_num = packet['channel'] + sender_id = packet['fromId'] + print(f"{channel_num} : {sender_id} : {message}") + except KeyError as e: + print(f"unable to decode message") + return + +#pub.subscribe(onReceive, "meshtastic.receive.text") +pub.subscribe(onReceive, "meshtastic.receive") + +# try to find a meshtastic device, otherwise provide a device path like /dev/ttyUSB0 +interface = meshtastic.serial_interface.SerialInterface() + +while True: + time.sleep(10) # wait for the next message + From acff793f28abf390b040ca278dcc6fc130cb5afe Mon Sep 17 00:00:00 2001 From: henri Date: Thu, 20 Feb 2025 08:44:32 +1300 Subject: [PATCH 2/3] Updates and bug fixes meshtastic_serial_message_reader.py --- examples/meshtastic_serial_message_reader.py | 35 ++++++++++++++------ 1 file changed, 24 insertions(+), 11 deletions(-) diff --git a/examples/meshtastic_serial_message_reader.py b/examples/meshtastic_serial_message_reader.py index bf0112e4f..c5d64b4bc 100644 --- a/examples/meshtastic_serial_message_reader.py +++ b/examples/meshtastic_serial_message_reader.py @@ -6,16 +6,18 @@ # About : # This script will print messages as they arrive from a meshtastic node connected via serial port USB. # If you have multiple nodes attached, you will need to edit this script and specify the node to monitor. -# https://gist.github.com/henri/a6584d55813f971e5b1a4ee940c07d25 # -# Requirements : +# Requirements : # You will need to install python meshtastic libraries : https://github.com/meshtastic/python # # Version History : # 1.0 - initial release -# 1.1 - added support for sender id and bug fixs +# 1.1 - added support for sender id and bug fixes +# 1.2 - added date and time reporting to each message +# 1.3 - bug fixes and improved error handling import time +from datetime import datetime, timezone import meshtastic import meshtastic.serial_interface from pubsub import pub @@ -24,22 +26,33 @@ def onReceive(packet, interface): # DEBUGGING # print(f"message arrived") # print(f"{packet}") + # print(f"-----------------------------------------------------------------") try: if packet['decoded']['portnum'] == 'TEXT_MESSAGE_APP': - message = packet['decoded']['text'] - channel_num = packet['channel'] - sender_id = packet['fromId'] - print(f"{channel_num} : {sender_id} : {message}") - except KeyError as e: - print(f"unable to decode message") + try: + message = packet['decoded']['text'] + try: + channel_num = packet['channel'] + except KeyError as e1: + channel_num = 0 + sender_id = packet['fromId'] + message_time = datetime.now().strftime(f"%a %b %d %Y %H:%M:%S {tz_name}") + print(f"{message_time} : {channel_num} : {sender_id} : {message}") + except KeyError as e2: + print(f"unable to decode message") + return + except KeyError as e3: return +# configure the local time zone +tz_name = time.tzname[time.localtime().tm_isdst > 0] + +# registrer for incomming messages #pub.subscribe(onReceive, "meshtastic.receive.text") pub.subscribe(onReceive, "meshtastic.receive") -# try to find a meshtastic device, otherwise provide a device path like /dev/ttyUSB0 +# attempt to locate a meshtastic device, otherwise provide a device path like /dev/ttyUSB0 interface = meshtastic.serial_interface.SerialInterface() while True: time.sleep(10) # wait for the next message - From 81ae8b6c87ba84a62661bd377275c7381407a991 Mon Sep 17 00:00:00 2001 From: Ian McEwen Date: Sun, 31 May 2026 17:40:19 -0700 Subject: [PATCH 3/3] Make examples more regularized and focused, and add contribution guidelines for the examples folder --- examples/CONTRIBUTING.md | 70 +++++++++++++ examples/get_hw.py | 51 +++++++--- examples/hello_world_serial.py | 43 +++++--- examples/info_example.py | 50 ++++++--- examples/meshtastic_serial_message_reader.py | 102 ++++++++++--------- examples/pub_sub_example.py | 30 ------ examples/pub_sub_example2.py | 42 -------- examples/replymessage.py | 20 ++-- examples/scan_for_devices.py | 56 +++++++--- examples/set_owner.py | 49 ++++++--- examples/show_ports.py | 22 +++- examples/tcp_connection_info_once.py | 41 ++++++++ examples/tcp_gps_example.py | 51 ++++++++-- examples/tcp_pubsub_send_and_receive.py | 54 ++++++++++ examples/textchat.py | 18 ++-- examples/waypoint.py | 61 +++++------ 16 files changed, 511 insertions(+), 249 deletions(-) create mode 100644 examples/CONTRIBUTING.md delete mode 100644 examples/pub_sub_example.py delete mode 100644 examples/pub_sub_example2.py create mode 100644 examples/tcp_connection_info_once.py create mode 100644 examples/tcp_pubsub_send_and_receive.py diff --git a/examples/CONTRIBUTING.md b/examples/CONTRIBUTING.md new file mode 100644 index 000000000..97bfccd99 --- /dev/null +++ b/examples/CONTRIBUTING.md @@ -0,0 +1,70 @@ +# Contributing Example Scripts + +Use this guide when adding or updating scripts in `examples/`. + +## Must-have checklist before opening a PR + +1. Script teaches one clear thing (its primary learning goal). +2. File name matches that goal. +3. Top docstring states purpose, transport scope, behavior, and expected output. +4. Script has safe shutdown (`with ...` or `finally`) and graceful `KeyboardInterrupt` handling. +5. Argument handling is clear (`argparse` preferred). +6. Errors are explicit (no bare `except:`). +7. The script is not a near-duplicate of an existing example. + +## Choose the right teaching goal + +Each example should have one primary lesson. Keep it focused. + +- Good: "Send one text message over serial." +- Good: "Print inbound text messages." +- Avoid: discovery + chat + config mutation all in one script unless that combined flow is the lesson. + +## Transport scope must be explicit + +State exactly what transports are supported and why. + +- Serial-only when that keeps the example simplest. +- Multi-transport (Serial/TCP/BLE) only when transport selection is part of the lesson. +- If TCP/BLE are supported, expose explicit flags (`--host`, `--ble`) and document defaults. + +## Behavior and output should be predictable + +Readers should know if the script sends, receives, mutates config, or combines those. + +- Receive examples: subscribe to the narrowest pubsub topic that matches the lesson. +- Send examples: clarify destination behavior (broadcast default vs explicit destination). +- Mutation examples: clearly document side effects. + +Output should make success easy to confirm: + +- Print concise, stable status/event lines. +- Avoid noisy debug output unless the script is specifically diagnostic-focused. + +## Cleanup and error handling + +- Use context managers where practical; otherwise close interfaces in `finally`. +- Handle `KeyboardInterrupt` cleanly. +- Exit non-zero for invalid args, connection/setup failures, or command failures. + +## Naming guidance + +Use descriptive names tied to the teaching goal. + +- Prefer names like `tcp_connection_info_once.py` over `pub_sub_example.py`. +- Prefer names like `tcp_pubsub_send_and_receive.py` over `pub_sub_example2.py`. +- Avoid generic names such as `example2.py`. + +Keep existing filenames only when compatibility or discoverability outweighs clarity. + +## New script vs extending an existing one + +Create a new script when: + +- The learning goal is genuinely distinct. +- Combining behaviors would make either example harder to understand. + +Extend an existing script when: + +- The change deepens the same lesson. +- The resulting script remains focused and readable. diff --git a/examples/get_hw.py b/examples/get_hw.py index cfa92bdbb..137796f04 100644 --- a/examples/get_hw.py +++ b/examples/get_hw.py @@ -1,21 +1,42 @@ -"""Simple program to demo how to use meshtastic library. - To run: python examples/get_hw.py +"""Print the local node hardware model. + +Purpose: show the narrowest read-only local hardware lookup. +Transport scope: Serial only. +Behavior: reads local node metadata and prints hwModel. +Expected output: one hardware model line, if available. +Cleanup/error handling: exits with code 3 for bad args and closes interface on exit. """ +import argparse import sys -import meshtastic import meshtastic.serial_interface -# simple arg check -if len(sys.argv) != 1: - print(f"usage: {sys.argv[0]}") - print("Print the hardware model for the local node.") - sys.exit(3) - -iface = meshtastic.serial_interface.SerialInterface() -if iface.nodes: - for n in iface.nodes.values(): - if n["num"] == iface.myInfo.my_node_num: - print(n["user"]["hwModel"]) -iface.close() + +def main() -> int: + """Print the hardware model for the local node.""" + if len(sys.argv) != 1: + print(f"usage: {sys.argv[0]}") + print("Print the hardware model for the local node.") + return 3 + + parser = argparse.ArgumentParser(description="Print local Meshtastic hardware model") + parser.parse_args() + + try: + with meshtastic.serial_interface.SerialInterface() as iface: + if iface.nodes: + for node in iface.nodes.values(): + if node["num"] == iface.myInfo.my_node_num: + print(node["user"]["hwModel"]) + break + except KeyboardInterrupt: + return 0 + except Exception as exc: + print(f"Error: Could not read hardware model: {exc}") + return 1 + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/examples/hello_world_serial.py b/examples/hello_world_serial.py index bed81db70..5f0b09b4a 100644 --- a/examples/hello_world_serial.py +++ b/examples/hello_world_serial.py @@ -1,19 +1,38 @@ -"""Simple program to demo how to use meshtastic library. - To run: python examples/hello_world_serial.py +"""Send one text message over serial. + +Purpose: minimal send-only example. +Transport scope: Serial only. +Behavior: sends one message and exits. +Expected output: no output on success. +Cleanup/error handling: exits with code 3 for bad args, closes interface on exit. """ +import argparse import sys -import meshtastic import meshtastic.serial_interface -# simple arg check -if len(sys.argv) < 2: - print(f"usage: {sys.argv[0]} message") - sys.exit(3) -# By default will try to find a meshtastic device, -# otherwise provide a device path like /dev/ttyUSB0 -iface = meshtastic.serial_interface.SerialInterface() -iface.sendText(sys.argv[1]) -iface.close() +def main() -> int: + """Parse arguments and send one text message.""" + if len(sys.argv) < 2: + print(f"usage: {sys.argv[0]} message") + return 3 + + parser = argparse.ArgumentParser(description="Send one Meshtastic text message over serial") + parser.add_argument("message", help="Message text to broadcast") + args = parser.parse_args() + + try: + with meshtastic.serial_interface.SerialInterface() as iface: + iface.sendText(args.message) + except KeyboardInterrupt: + return 0 + except Exception as exc: + print(f"Error: Could not send message: {exc}") + return 1 + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/examples/info_example.py b/examples/info_example.py index 7cc6a7195..ca6dabcc5 100644 --- a/examples/info_example.py +++ b/examples/info_example.py @@ -1,20 +1,46 @@ -"""Simple program to demo how to use meshtastic library. - To run: python examples/info.py +"""Show a concise local node summary over serial. + +Purpose: read local node identity and metadata in one place. +Transport scope: Serial only. +Behavior: reads node database, prints local node ID/name/hardware model. +Expected output: 1-3 summary lines describing the local node. +Cleanup/error handling: closes interface on exit and prints clear errors on failure. """ -import meshtastic import meshtastic.serial_interface -iface = meshtastic.serial_interface.SerialInterface() -# call showInfo() just to ensure values are populated -# info = iface.showInfo() +def main() -> int: + """Print local node summary fields.""" + try: + with meshtastic.serial_interface.SerialInterface() as iface: + local_num = iface.myInfo.my_node_num + local_node = None + if iface.nodes: + for node in iface.nodes.values(): + if node["num"] == local_num: + local_node = node + break + + if not local_node: + print(f"Local node not found in node database (node num: {local_num}).") + return 1 + user = local_node.get("user", {}) + print(f"Node number: {local_num}") + print(f"Node ID: {local_node.get('id', 'unknown')}") + print( + "Name: " + f"{user.get('longName', 'unknown')} ({user.get('shortName', 'unknown')})" + ) + print(f"Hardware model: {user.get('hwModel', 'unknown')}") + except KeyboardInterrupt: + return 0 + except Exception as exc: + print(f"Error: Could not read local node summary: {exc}") + return 1 + return 0 -if iface.nodes: - for n in iface.nodes.values(): - if n["num"] == iface.myInfo.my_node_num: - print(n["user"]["hwModel"]) - break -iface.close() +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/examples/meshtastic_serial_message_reader.py b/examples/meshtastic_serial_message_reader.py index c5d64b4bc..4cf81120c 100644 --- a/examples/meshtastic_serial_message_reader.py +++ b/examples/meshtastic_serial_message_reader.py @@ -1,58 +1,64 @@ #!/usr/bin/env python3 -# -# Released Under GNU GPLv3 -# Copyright 2025 Henri Shustak -# -# About : -# This script will print messages as they arrive from a meshtastic node connected via serial port USB. -# If you have multiple nodes attached, you will need to edit this script and specify the node to monitor. -# -# Requirements : -# You will need to install python meshtastic libraries : https://github.com/meshtastic/python -# -# Version History : -# 1.0 - initial release -# 1.1 - added support for sender id and bug fixes -# 1.2 - added date and time reporting to each message -# 1.3 - bug fixes and improved error handling +"""Passively monitor incoming text messages over serial. +Purpose: receive-only monitor for text messages. +Transport scope: Serial only. +Behavior: subscribes to text receive events and prints timestamp/channel/sender/text. +Expected output: one line per received text message. +Cleanup/error handling: graceful Ctrl+C exit and explicit connection errors. +""" + +import argparse import time -from datetime import datetime, timezone -import meshtastic -import meshtastic.serial_interface +from datetime import datetime +from typing import Any, Optional + from pubsub import pub +import meshtastic.serial_interface -def onReceive(packet, interface): - # DEBUGGING - # print(f"message arrived") - # print(f"{packet}") - # print(f"-----------------------------------------------------------------") - try: - if packet['decoded']['portnum'] == 'TEXT_MESSAGE_APP': - try: - message = packet['decoded']['text'] - try: - channel_num = packet['channel'] - except KeyError as e1: - channel_num = 0 - sender_id = packet['fromId'] - message_time = datetime.now().strftime(f"%a %b %d %Y %H:%M:%S {tz_name}") - print(f"{message_time} : {channel_num} : {sender_id} : {message}") - except KeyError as e2: - print(f"unable to decode message") - return - except KeyError as e3: +_TZ_NAME = time.tzname[time.localtime().tm_isdst > 0] + + +def on_receive(packet: dict[str, Any], interface: Any) -> None: # pylint: disable=unused-argument + """Print a compact line for each received text packet.""" + decoded = packet.get("decoded", {}) + if decoded.get("portnum") != "TEXT_MESSAGE_APP": return -# configure the local time zone -tz_name = time.tzname[time.localtime().tm_isdst > 0] + message = decoded.get("text") + if not message: + return + + channel_num = packet.get("channel", 0) + sender_id = packet.get("fromId", "unknown") + message_time = datetime.now().strftime(f"%a %b %d %Y %H:%M:%S {_TZ_NAME}") + print(f"{message_time} : {channel_num} : {sender_id} : {message}") + -# registrer for incomming messages -#pub.subscribe(onReceive, "meshtastic.receive.text") -pub.subscribe(onReceive, "meshtastic.receive") +def main() -> int: + """Connect over serial and print inbound text messages.""" + parser = argparse.ArgumentParser(description="Read incoming Meshtastic text over serial") + parser.add_argument("--port", default=None, help="Serial port path (default: auto-detect)") + args = parser.parse_args() + + pub.subscribe(on_receive, "meshtastic.receive") + + iface: Optional[meshtastic.serial_interface.SerialInterface] = None + try: + iface = meshtastic.serial_interface.SerialInterface(devPath=args.port) + print("Connected. Listening for text messages. Press Ctrl+C to exit.") + while True: + time.sleep(1) + except KeyboardInterrupt: + return 0 + except Exception as exc: + print(f"Error: Could not monitor serial messages: {exc}") + return 1 + finally: + if iface: + iface.close() + return 0 -# attempt to locate a meshtastic device, otherwise provide a device path like /dev/ttyUSB0 -interface = meshtastic.serial_interface.SerialInterface() -while True: - time.sleep(10) # wait for the next message +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/examples/pub_sub_example.py b/examples/pub_sub_example.py deleted file mode 100644 index fa3cb4161..000000000 --- a/examples/pub_sub_example.py +++ /dev/null @@ -1,30 +0,0 @@ -"""Simple program to demo how to use meshtastic library. - To run: python examples/pub_sub_example.py -""" - -import sys - -from pubsub import pub - -import meshtastic -import meshtastic.tcp_interface - -# simple arg check -if len(sys.argv) < 2: - print(f"usage: {sys.argv[0]} host") - sys.exit(1) - - -def onConnection(interface, topic=pub.AUTO_TOPIC): # pylint: disable=unused-argument - """This is called when we (re)connect to the radio.""" - print(interface.myInfo) - interface.close() - - -pub.subscribe(onConnection, "meshtastic.connection.established") - -try: - iface = meshtastic.tcp_interface.TCPInterface(sys.argv[1]) -except: - print(f"Error: Could not connect to {sys.argv[1]}") - sys.exit(1) diff --git a/examples/pub_sub_example2.py b/examples/pub_sub_example2.py deleted file mode 100644 index 64819be03..000000000 --- a/examples/pub_sub_example2.py +++ /dev/null @@ -1,42 +0,0 @@ -"""Simple program to demo how to use meshtastic library. - To run: python examples/pub_sub_example2.py -""" - -import sys -import time - -from pubsub import pub - -from meshtastic.tcp_interface import TCPInterface - -# simple arg check -if len(sys.argv) < 2: - print(f"usage: {sys.argv[0]} host") - sys.exit(1) - - -def onReceive(packet, interface): # pylint: disable=unused-argument - """called when a packet arrives""" - print(f"Received: {packet}") - - -def onConnection(interface, topic=pub.AUTO_TOPIC): # pylint: disable=unused-argument - """called when we (re)connect to the radio""" - # defaults to broadcast, specify a destination ID if you wish - interface.sendText("hello mesh") - - -pub.subscribe(onReceive, "meshtastic.receive") -pub.subscribe(onConnection, "meshtastic.connection.established") - -iface=None -try: - iface = TCPInterface(hostname=sys.argv[1]) - while True: - time.sleep(1000) -except Exception as ex: - print(f"Error: Could not connect to {sys.argv[1]} {ex}") - raise -finally: - if iface: - iface.close() diff --git a/examples/replymessage.py b/examples/replymessage.py index f6f48a286..d44c41206 100644 --- a/examples/replymessage.py +++ b/examples/replymessage.py @@ -1,7 +1,10 @@ -"""Reply message demo script. - To run: python examples/replymessage.py - To run with TCP: python examples/replymessage.py --host 192.168.1.5 - To run with BLE: python examples/replymessage.py --ble 24:62:AB:DD:DF:3A +"""Auto-reply to received text messages. + +Purpose: demonstrate receive callback + generated reply flow. +Transport scope: Serial default, optional TCP/BLE. +Behavior: listens for text, prints message metadata, sends one reply per text message. +Expected output: "Connected..." plus message/reply lines while running. +Cleanup/error handling: clear connect failures and graceful Ctrl+C close. """ import argparse @@ -13,7 +16,7 @@ import meshtastic.ble_interface from meshtastic.mesh_interface import MeshInterface -def onReceive(packet: dict, interface: MeshInterface) -> None: # pylint: disable=unused-argument +def onReceive(packet: dict, interface: MeshInterface) -> None: """Reply to every received packet with some info""" text: Optional[str] = packet.get("decoded", {}).get("text") if text: @@ -66,8 +69,5 @@ def onConnection(interface: MeshInterface, topic: Any = pub.AUTO_TOPIC) -> None: except KeyboardInterrupt: pass finally: - try: - if iface: - iface.close() - except AttributeError: - pass + if iface: + iface.close() diff --git a/examples/scan_for_devices.py b/examples/scan_for_devices.py index 8c6707abc..58cf27ce8 100644 --- a/examples/scan_for_devices.py +++ b/examples/scan_for_devices.py @@ -1,7 +1,13 @@ -"""Program to scan for hardware - To run: python examples/scan_for_devices.py +"""Scan host serial hardware for supported Meshtastic devices. + +Purpose: host-side discovery without opening a radio session. +Transport scope: none (OS/device scanning only). +Behavior: scans vendor IDs, lists matched devices, and candidate active ports. +Expected output: vendor ID list, zero-or-more detected devices, and port list. +Cleanup/error handling: exits with code 3 for bad args and code 1 on scan errors. """ +import argparse import sys from meshtastic.util import ( @@ -10,20 +16,38 @@ get_unique_vendor_ids, ) -# simple arg check -if len(sys.argv) != 1: - print(f"usage: {sys.argv[0]}") - print("Detect which device we might have.") - sys.exit(3) -vids = get_unique_vendor_ids() -print(f"Searching for all devices with these vendor ids {vids}") +def main() -> int: + """Run device detection and print candidate ports.""" + if len(sys.argv) != 1: + print(f"usage: {sys.argv[0]}") + print("Detect which device we might have.") + return 3 + + parser = argparse.ArgumentParser(description="Scan host for supported Meshtastic devices") + parser.parse_args() + + try: + vids = get_unique_vendor_ids() + print(f"Searching for all devices with these vendor ids {vids}") + + supported_devices = detect_supported_devices() + if supported_devices: + print("Detected possible devices:") + for device in supported_devices: + print( + f" name:{device.name}{device.version} firmware:{device.for_firmware}" + ) + else: + print("Detected possible devices: none") + + ports = active_ports_on_supported_devices(supported_devices) + print(f"ports:{ports}") + except Exception as exc: + print(f"Error: device scan failed: {exc}") + return 1 + return 0 -sds = detect_supported_devices() -if len(sds) > 0: - print("Detected possible devices:") -for d in sds: - print(f" name:{d.name}{d.version} firmware:{d.for_firmware}") -ports = active_ports_on_supported_devices(sds) -print(f"ports:{ports}") +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/examples/set_owner.py b/examples/set_owner.py index bc5fff43c..296c2b498 100644 --- a/examples/set_owner.py +++ b/examples/set_owner.py @@ -1,21 +1,40 @@ -"""Simple program to demo how to use meshtastic library. - To run: python examples/set_owner.py Bobby 333 +"""Set local owner long/short name over serial. + +Purpose: demonstrate a local config mutation workflow. +Transport scope: Serial only. +Behavior: updates owner long name and optional short name. +Expected output: prints the owner values being applied. +Cleanup/error handling: exits with code 3 for bad args and closes interface on exit. """ +import argparse import sys -import meshtastic import meshtastic.serial_interface -# simple arg check -if len(sys.argv) < 2: - print(f"usage: {sys.argv[0]} long_name [short_name]") - sys.exit(3) - -iface = meshtastic.serial_interface.SerialInterface() -long_name = sys.argv[1] -short_name = None -if len(sys.argv) > 2: - short_name = sys.argv[2] -iface.localNode.setOwner(long_name, short_name) -iface.close() + +def main() -> int: + """Parse args and set owner fields.""" + if len(sys.argv) < 2: + print(f"usage: {sys.argv[0]} long_name [short_name]") + return 3 + + parser = argparse.ArgumentParser(description="Set Meshtastic local owner information") + parser.add_argument("long_name", help="Owner long name") + parser.add_argument("short_name", nargs="?", default=None, help="Owner short name") + args = parser.parse_args() + + print(f"Setting owner long_name={args.long_name}, short_name={args.short_name}") + try: + with meshtastic.serial_interface.SerialInterface() as iface: + iface.localNode.setOwner(args.long_name, args.short_name) + except KeyboardInterrupt: + return 0 + except Exception as exc: + print(f"Error: Could not set owner: {exc}") + return 1 + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/examples/show_ports.py b/examples/show_ports.py index 30c9cd975..932d51a7f 100644 --- a/examples/show_ports.py +++ b/examples/show_ports.py @@ -1,6 +1,24 @@ -"""Simple program to show serial ports. +"""List serial ports currently visible to Meshtastic helpers. + +Purpose: fastest host-side serial port enumeration. +Transport scope: none (host serial listing only). +Behavior: prints result of `findPorts()`. +Expected output: list-like representation of available candidate ports. +Cleanup/error handling: exits with code 1 on unexpected scan error. """ from meshtastic.util import findPorts -print(findPorts()) + +def main() -> int: + """Print discovered serial ports.""" + try: + print(findPorts()) + except Exception as exc: + print(f"Error: Could not list ports: {exc}") + return 1 + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/examples/tcp_connection_info_once.py b/examples/tcp_connection_info_once.py new file mode 100644 index 000000000..2b00d4ef6 --- /dev/null +++ b/examples/tcp_connection_info_once.py @@ -0,0 +1,41 @@ +"""Connect over TCP, print connection info once, then exit. + +Purpose: demonstrate pubsub connection lifecycle callback. +Transport scope: TCP only. +Behavior: subscribe to `meshtastic.connection.established`, print `myInfo`, then close. +Expected output: one object/line showing local radio info after connect. +Cleanup/error handling: explicit connect failure message and clean close on callback. +""" + +import argparse + +from pubsub import pub +import meshtastic.tcp_interface + + +def on_connection(interface, topic=pub.AUTO_TOPIC): # pylint: disable=unused-argument + """Print local radio info when connected, then close.""" + print(interface.myInfo) + interface.close() + + +def main() -> int: + """Parse args, connect, and wait for established callback.""" + parser = argparse.ArgumentParser(description="Print radio info on TCP connect and exit") + parser.add_argument("host", help="TCP hostname or IP of the Meshtastic node") + args = parser.parse_args() + + pub.subscribe(on_connection, "meshtastic.connection.established") + + try: + meshtastic.tcp_interface.TCPInterface(args.host) + except KeyboardInterrupt: + return 0 + except Exception as exc: + print(f"Error: Could not connect to {args.host}: {exc}") + return 1 + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/examples/tcp_gps_example.py b/examples/tcp_gps_example.py index 399fb05a2..c3db9b0b9 100644 --- a/examples/tcp_gps_example.py +++ b/examples/tcp_gps_example.py @@ -1,14 +1,47 @@ -"""Demonstration of how to look up a radio's location via its LAN connection. - Before running, connect your machine to the same WiFi network as the radio. +"""Look up local node position over TCP. + +Purpose: demonstrate read-only position lookup via LAN/TCP. +Transport scope: TCP only. +Behavior: connects, reads local node position, prints it, then exits. +Expected output: position dict for local node. +Cleanup/error handling: explicit connect/read failures and clean close. """ +# pylint: disable=duplicate-code + +import argparse -import meshtastic import meshtastic.tcp_interface -radio_hostname = "meshtastic.local" # Can also be an IP -iface = meshtastic.tcp_interface.TCPInterface(radio_hostname) -my_node_num = iface.myInfo.my_node_num -pos = iface.nodesByNum[my_node_num]["position"] -print(pos) -iface.close() +def main() -> int: + """Connect over TCP and print local node position.""" + parser = argparse.ArgumentParser(description="Print local node position over TCP") + parser.add_argument( + "--host", + default="meshtastic.local", + help="TCP hostname or IP (default: meshtastic.local)", + ) + args = parser.parse_args() + + iface = None + try: + iface = meshtastic.tcp_interface.TCPInterface(args.host) + my_node_num = iface.myInfo.my_node_num + pos = iface.nodesByNum[my_node_num].get("position") + if pos is None: + print(f"No position available for local node {my_node_num}") + return 1 + print(pos) + except KeyboardInterrupt: + return 0 + except Exception as exc: + print(f"Error: Could not read position from {args.host}: {exc}") + return 1 + finally: + if iface: + iface.close() + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/examples/tcp_pubsub_send_and_receive.py b/examples/tcp_pubsub_send_and_receive.py new file mode 100644 index 000000000..b506852cb --- /dev/null +++ b/examples/tcp_pubsub_send_and_receive.py @@ -0,0 +1,54 @@ +"""Send once on connect and print received packets over TCP. + +Purpose: demonstrate pubsub send-on-connect plus receive callback flow. +Transport scope: TCP only. +Behavior: sends "hello mesh" at connect, prints packets while running. +Expected output: "Connected..." plus "Received: ..." lines for inbound packets. +Cleanup/error handling: graceful Ctrl+C exit and clean interface close. +""" + +import argparse +import time + +from pubsub import pub +from meshtastic.tcp_interface import TCPInterface + + +def on_receive(packet, interface): # pylint: disable=unused-argument + """Print each inbound packet.""" + print(f"Received: {packet}") + + +def on_connection(interface, topic=pub.AUTO_TOPIC): # pylint: disable=unused-argument + """Send a broadcast text when connected.""" + print("Connected. Sending one broadcast message.") + interface.sendText("hello mesh") + + +def main() -> int: + """Parse args, connect via TCP, and run callbacks.""" + parser = argparse.ArgumentParser(description="TCP pubsub send-and-receive example") + parser.add_argument("host", help="TCP hostname or IP of the Meshtastic node") + args = parser.parse_args() + + pub.subscribe(on_receive, "meshtastic.receive") + pub.subscribe(on_connection, "meshtastic.connection.established") + + iface = None + try: + iface = TCPInterface(hostname=args.host) + while True: + time.sleep(1) + except KeyboardInterrupt: + return 0 + except Exception as exc: + print(f"Error: Could not connect to {args.host}: {exc}") + return 1 + finally: + if iface: + iface.close() + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/examples/textchat.py b/examples/textchat.py index bfceb0b72..c42af397f 100644 --- a/examples/textchat.py +++ b/examples/textchat.py @@ -1,7 +1,10 @@ -"""Simple text chat demo for meshtastic. - To run: python examples/textchat.py - To run with TCP: python examples/textchat.py --host 192.168.1.5 - To run with BLE: python examples/textchat.py --ble 24:62:AB:DD:DF:3A +"""Interactive text chat demo. + +Purpose: demonstrate bidirectional text chat loop. +Transport scope: Serial default, optional TCP/BLE. +Behavior: prints incoming messages and sends each typed line as text. +Expected output: incoming sender/text lines and sent messages reaching peers. +Cleanup/error handling: explicit connect errors and graceful Ctrl+C / EOF close. """ import argparse @@ -66,8 +69,5 @@ def onConnection(interface: MeshInterface, topic: Any = pub.AUTO_TOPIC) -> None: except EOFError: pass finally: - try: - if iface: - iface.close() - except AttributeError: - pass + if iface: + iface.close() diff --git a/examples/waypoint.py b/examples/waypoint.py index 6e3a973d9..15a495fb7 100644 --- a/examples/waypoint.py +++ b/examples/waypoint.py @@ -1,7 +1,10 @@ -"""Program to create and delete waypoint - To run: - python3 examples/waypoint.py --port /dev/ttyUSB0 create 45 test the_desc_2 '2024-12-18T23:05:23' 48.74 7.35 - python3 examples/waypoint.py delete 45 +"""Create or delete a waypoint. + +Purpose: demonstrate waypoint mutation API (create/delete). +Transport scope: Serial only. +Behavior: sends waypoint create/delete request and prints API response. +Expected output: request response object printed to stdout. +Cleanup/error handling: explicit argument parsing and clean interface close. """ import argparse @@ -12,26 +15,28 @@ import meshtastic.serial_interface parser = argparse.ArgumentParser( - prog='waypoint', - description='Create and delete Meshtastic waypoint') -parser.add_argument('--port', default=None) -parser.add_argument('--debug', default=False, action='store_true') - -subparsers = parser.add_subparsers(dest='cmd') -parser_delete = subparsers.add_parser('delete', help='Delete a waypoint') -parser_delete.add_argument('id', help="id of the waypoint") - -parser_create = subparsers.add_parser('create', help='Create a new waypoint') -parser_create.add_argument('id', help="id of the waypoint") -parser_create.add_argument('name', help="name of the waypoint") -parser_create.add_argument('description', help="description of the waypoint") -parser_create.add_argument('icon', help="icon of the waypoint") -parser_create.add_argument('expire', help="expiration date of the waypoint as interpreted by datetime.fromisoformat") -parser_create.add_argument('latitude', help="latitude of the waypoint") -parser_create.add_argument('longitude', help="longitude of the waypoint") + prog="waypoint", description="Create and delete Meshtastic waypoint" +) +parser.add_argument("--port", default=None) +parser.add_argument("--debug", default=False, action="store_true") + +subparsers = parser.add_subparsers(dest="cmd", required=True) +parser_delete = subparsers.add_parser("delete", help="Delete a waypoint") +parser_delete.add_argument("id", type=int, help="ID of the waypoint") + +parser_create = subparsers.add_parser("create", help="Create a new waypoint") +parser_create.add_argument("id", type=int, help="ID of the waypoint") +parser_create.add_argument("name", help="Name of the waypoint") +parser_create.add_argument("description", help="Description of the waypoint") +parser_create.add_argument("icon", help="Icon of the waypoint") +parser_create.add_argument( + "expire", + help="Expiration time as ISO timestamp accepted by datetime.fromisoformat", +) +parser_create.add_argument("latitude", type=float, help="Latitude of the waypoint") +parser_create.add_argument("longitude", type=float, help="Longitude of the waypoint") args = parser.parse_args() -print(args) # By default will try to find a meshtastic device, # otherwise provide a device path like /dev/ttyUSB0 @@ -40,18 +45,16 @@ else: d = None with meshtastic.serial_interface.SerialInterface(args.port, debugOut=d) as iface: - if args.cmd == 'create': + if args.cmd == "create": p = iface.sendWaypoint( - waypoint_id=int(args.id), + waypoint_id=args.id, name=args.name, description=args.description, icon=args.icon, expire=int(datetime.datetime.fromisoformat(args.expire).timestamp()), - latitude=float(args.latitude), - longitude=float(args.longitude), + latitude=args.latitude, + longitude=args.longitude, ) else: - p = iface.deleteWaypoint(int(args.id)) + p = iface.deleteWaypoint(args.id) print(p) - -# iface.close()