Python Plugin Development
The Python implementation provides a Pythonic interface to the DroneEngage DataBus using the same UDP-based protocol as the C++ and Node.js versions. It is ideal for rapid prototyping, scripting, sensor integration, and AI/ML pipelines. Source files are in droneengage_databus/python/.
Features
Python 3.7+ — Tested with standard library only (
coloramaoptional)__new__-based Singleton —CModule()always returns the same instance; thread-safe viathreading.Locksnake_case methods —
add_module_features,set_hardware,send_error_message, etc.m_OnReceivecallback — Direct function reference, same pattern as C++Binary Message Support —
sendBMSGwith bytes payloadConfig File Support —
ConfigFileandLocalConfigFilewith hot-reload and C-style comment parsingExtensible Parser — Subclass
AndruavMessageParserBaseto handle commandsColored Console Output —
Colorsutility for styled terminal messages
Quick Start
Installation
cd droneengage_databus/python
pip install colorama # optional — only needed for colored output
As an editable package:
pip install -e .
Run
python python_client.py --help
python python_client.py MyModule 60000 61111 # name, de_comm port, listen port
python python_client.py MyModule # uses defaults (60000 / 61111)
Minimal Working Module
from de_module import CModule
from de_facade_base import CFacade_Base
from messages import *
DEFAULT_UDP_DATABUS_PACKET_SIZE = 8192
c_module = CModule()
base_facade = CFacade_Base()
base_facade.set_module(c_module)
c_module.defineModule(
"gen", # MODULE_CLASS_GENERIC
"MyModule", # module_id (display name)
"unique-key-001", # module_key (persistent GUID)
"1.0.0", # version
[] # message_filter (empty = receive nothing)
)
c_module.add_module_features("T") # MODULE_FEATURE_SENDING_TELEMETRY
c_module.add_module_features("R") # MODULE_FEATURE_RECEIVING_TELEMETRY
c_module.init("0.0.0.0", 60000, "0.0.0.0", 61111, DEFAULT_UDP_DATABUS_PACKET_SIZE)
import time
while True:
time.sleep(1)
base_facade.send_error_message("", ERROR_USER_DEFINED, NOTIFICATION_TYPE_NOTICE,
NOTIFICATION_TYPE_INFO, "Hello from Python")
Core Concepts
Singleton Pattern
CModule uses __new__ for thread-safe singleton access:
c_module_a = CModule()
c_module_b = CModule()
assert c_module_a is c_module_b # True — same instance
Receive Callback
def on_receive(message, msg_len, j_msg):
msg_type = j_msg.get(ANDRUAV_PROTOCOL_MESSAGE_TYPE)
cmd = j_msg.get(ANDRUAV_PROTOCOL_MESSAGE_CMD, {})
# dispatch on msg_type ...
c_module.m_OnReceive = on_receive
Callback Chain
CUDPClient (chunk reassembly)
→ CModule.onReceive(message, len)
→ validates routing / handles TYPE_AndruavModule_ID (sets m_party_id / m_group_id)
→ calls m_OnReceive(message, len, jMsg) ← your callback
→ optional: parser.parse_message(jMsg, message, len)
→ parse_command() / parse_remote_execute() ← your overrides
Thread Safety
All send operations use threading.RLock:
with self.m_lock:
# build message, call sendMSG
The singleton __new__ gate uses a class-level threading.Lock:
class CModule:
_instance = None
_lock = threading.Lock()
def __new__(cls, *args, **kwargs):
if cls._instance is None:
with cls._lock:
if cls._instance is None:
cls._instance = super().__new__(cls)
return cls._instance
Complete API Reference
CModule
from de_module import CModule
c_module = CModule() # singleton
Initialization
def defineModule(self, module_class, module_id, module_key, module_version, message_filter):
| Parameter | Description |
|---|---|
module_class | "gen", "fcb", "camera", etc. |
module_id | Display name shown in WebClient |
module_key | Unique persistent GUID |
module_version | Version string e.g. "1.0.0" |
message_filter | List of TYPE_* ints; [] = receive none |
def init(self, target_ip, broadcasts_port, host, listening_port, chunk_size):
| Parameter | Description |
|---|---|
target_ip | Communicator IP ("0.0.0.0" = localhost) |
broadcasts_port | Communicator port (typically 60000) |
host | Local bind address ("0.0.0.0") |
listening_port | Local receive port (unique per module) |
chunk_size | Max UDP payload (use 8192) |
def uninit(self): # stops UDP threads; call before exit
Module Configuration
def add_module_features(self, feature: str): # "R", "T", "C", "V", "G", "A", "K", "P"
def set_hardware(self, hardware_serial: str, hardware_serial_type: int): # 0=undef, 1=CPU
def appendExtraField(self, name: str, ms): # add custom fields to ID broadcast
Sending
def sendJMSG(self, targetPartyID: str, jmsg: dict, andruav_message_id: int, internal_message: bool):
| Parameter | Description |
|---|---|
targetPartyID | "" = broadcast; or specific party ID |
jmsg | Dict — the message payload |
andruav_message_id | TYPE_* constant from messages.py |
internal_message | True = intermodule; False = group/individual |
def sendBMSG(self, targetPartyID, bmsg, bmsg_length, andruav_message_id, internal_message, message_cmd):
| Parameter | Description |
|---|---|
bmsg | bytes — binary payload |
bmsg_length | Length of bmsg (or 0 for JSON-only) |
message_cmd | Metadata dict placed in the JSON header |
Wire format: JSON_header.encode() + b'\0' + bmsg
def send_sys_msg(self, jmsg, andruav_message_id): # system message to communicator
def sendMREMSG(self, command_type): # module remote-execute command
def forwardMSG(self, message, datalength): # forward raw bytes
def sendMSG(self, msg, length): # low-level raw send
Receiving
# Assign your callback:
c_module.m_OnReceive = lambda message, msg_len, j_msg: ...
State
c_module.m_party_id # set after communicator responds
c_module.m_group_id # set after communicator responds
c_module.m_module_key
CFacade_Base (alias FacadeBase)
from de_facade_base import CFacade_Base
facade = CFacade_Base() # singleton
facade.set_module(c_module)
Methods
def set_module(self, module): # inject CModule reference
def request_id(self, target_party_id: str):
def send_error_message(self, target_party_id: str, error_number: int,
info_type: int, notification_type: int, description: str):
notification_type | Value |
|---|---|
NOTIFICATION_TYPE_EMERGENCY | 0 |
NOTIFICATION_TYPE_ALERT | 1 |
NOTIFICATION_TYPE_CRITICAL | 2 |
NOTIFICATION_TYPE_ERROR | 3 |
NOTIFICATION_TYPE_WARNING | 4 |
NOTIFICATION_TYPE_NOTICE | 5 |
NOTIFICATION_TYPE_INFO | 6 |
NOTIFICATION_TYPE_DEBUG | 7 |
def api_send_config_template(self, target_party_id: str, module_key: str,
json_file_content_json: dict, reply: bool):
Extending CFacade_Base
from de_facade_base import CFacade_Base
from messages import TYPE_AndruavMessage_DUMMY
class MySensorFacade(CFacade_Base):
def send_sensor_reading(self, target_party_id: str, value: float):
import time
self._module.sendJMSG(target_party_id,
{"sensor_value": value, "ts": int(time.time() * 1e6)},
TYPE_AndruavMessage_DUMMY, False)
facade = MySensorFacade()
facade.set_module(c_module)
facade.send_sensor_reading("", 23.5)
AndruavMessageParserBase (alias CAndruavMessageParserBase)
Abstract base for inbound message dispatch. Subclass and override the two abstract methods.
from de_message_parser_base import AndruavMessageParserBase
class MyParser(AndruavMessageParserBase):
def __init__(self, facade):
super().__init__()
self._facade = facade
def parse_remote_execute(self, andruav_message: dict):
# handle TYPE_AndruavMessage_RemoteExecute
pass
def parse_command(self, andruav_message, full_message, full_message_length,
message_type, permission):
from messages import TYPE_AndruavMessage_GPS
if message_type == TYPE_AndruavMessage_GPS:
cmd = andruav_message.get(ANDRUAV_PROTOCOL_MESSAGE_CMD, {})
print(f"GPS: {cmd.get('lat')}, {cmd.get('lng')}")
Wire up to receive:
parser = MyParser(facade)
c_module.m_OnReceive = lambda message, msg_len, j_msg: \
parser.parse_message(j_msg, message, msg_len)
State properties:
parser.is_binary # True if message contains binary payload
parser.is_system # True if sender is the communicator server
parser.is_inter_module # True if routing type == CMD_TYPE_INTERMODULE
parse_message automatically handles TYPE_AndruavMessage_CONFIG_ACTION (restart, apply-config, fetch-config-template) before calling parse_command.
Module Constants
# Module classes (de_module.py)
MODULE_CLASS_GENERIC = "gen"
MODULE_CLASS_FCB = "fcb"
MODULE_CLASS_VIDEO = "camera"
MODULE_CLASS_P2P = "p2p"
MODULE_CLASS_COMM = "comm"
# Module features
MODULE_FEATURE_RECEIVING_TELEMETRY = "R"
MODULE_FEATURE_SENDING_TELEMETRY = "T"
MODULE_FEATURE_CAPTURE_IMAGE = "C"
MODULE_FEATURE_CAPTURE_VIDEO = "V"
# Hardware types
HARDWARE_TYPE_UNDEFINED = 0
HARDWARE_TYPE_CPU = 1
ConfigFile
Reads a JSON configuration file with C-style comment support (// and /* */), file monitoring, backup creation, and hot-reload.
from configFile import ConfigFile
config = ConfigFile.get_instance()
config.init_config_file("mymodule.config.json")
json_data = config.GetConfigJSON()
port = json_data.get("s2s_udp_target_port", "60000")
LocalConfigFile
Stores per-instance data (e.g., the persistent module_key) in a local file.
from localConfigFile import LocalConfigFile
local_config = LocalConfigFile.get_instance()
local_config.InitConfigFile("mymodule.local")
module_key = local_config.getStringField("module_key")
if not module_key:
import time
module_key = str(int(time.time() * 1e6))
local_config.addStringField("module_key", module_key)
local_config.apply()
Package Structure
droneengage_databus/python/
├── __init__.py # Package init with re-exports
├── de_module.py # CModule — main module interface
├── udpClient.py # CUDPClient — UDP with chunking/reassembly
├── de_facade_base.py # FacadeBase (CFacade_Base) — high-level send API
├── de_message_parser_base.py # AndruavMessageParserBase — abstract parser
├── configFile.py # ConfigFile — JSON config with hot-reload
├── localConfigFile.py # LocalConfigFile — persistent local settings
├── messages.py # All TYPE_* and protocol constants
├── colors.py / console_colors.py # ANSI color helpers
└── python_client.py # Working example client
Message Handling
Message Filter
from messages import TYPE_AndruavMessage_GPS, TYPE_AndruavMessage_MAVLINK
c_module.defineModule("gen", "GPSMonitor", module_key, "1.0.0",
[TYPE_AndruavMessage_GPS, TYPE_AndruavMessage_MAVLINK])
Empty [] = receive no messages. None or omit = receive all.
Dispatch Pattern
from messages import (ANDRUAV_PROTOCOL_MESSAGE_TYPE, ANDRUAV_PROTOCOL_MESSAGE_CMD,
TYPE_AndruavMessage_GPS)
def on_receive(message, msg_len, j_msg):
try:
msg_type = j_msg.get(ANDRUAV_PROTOCOL_MESSAGE_TYPE)
cmd = j_msg.get(ANDRUAV_PROTOCOL_MESSAGE_CMD, {})
if msg_type == TYPE_AndruavMessage_GPS:
print(f"GPS: lat={cmd.get('lat')}, lng={cmd.get('lng')}")
except Exception as e:
print(f"Error: {e}")
c_module.m_OnReceive = on_receive
Binary Transmission
from messages import TYPE_AndruavMessage_IMG
import time
with open("photo.jpg", "rb") as f:
image_data = f.read()
metadata = {"lat": 31.5, "lng": 34.5, "alt": 100,
"tim": int(time.time() * 1e6)}
c_module.sendBMSG("", image_data, len(image_data),
TYPE_AndruavMessage_IMG, False, metadata)
Custom Message Types
from messages import TYPE_AndruavMessage_USER_RANGE_START
TYPE_MY_SENSOR_DATA = TYPE_AndruavMessage_USER_RANGE_START + 0
TYPE_MY_CMD_ACK = TYPE_AndruavMessage_USER_RANGE_START + 1
c_module.sendJMSG("", {"temperature": 25.5}, TYPE_MY_SENSOR_DATA, False)
Configuration Management
Reading Config Values
from configFile import ConfigFile
config = ConfigFile.get_instance()
config.init_config_file("mymodule.config.json")
json_cfg = config.GetConfigJSON()
target_ip = json_cfg.get("s2s_udp_target_ip", "0.0.0.0")
target_port = int(json_cfg.get("s2s_udp_target_port", 60000))
listen_port = int(json_cfg.get("s2s_udp_listening_port", 61111))
c_module.init(target_ip, target_port, "0.0.0.0", listen_port, 8192)
Hot-Reload Monitoring
ConfigFile supports watching the config file for changes:
config.startMonitoring()
Config Template
The parser’s parse_command can serve a template.json to the WebClient:
# Handled automatically by AndruavMessageParserBase._handle_config_action
# when TYPE_AndruavMessage_CONFIG_ACTION with action CONFIG_REQUEST_FETCH_CONFIG_TEMPLATE
# is received — it reads template.json and calls:
self._facade.api_send_config_template(sender, module_key, json_content, True)
Graceful Shutdown
import signal, sys
def signal_handler(signum, frame):
c_module.uninit()
sys.exit(0)
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
Console Colors
from console_colors import Colors
print(Colors.INFO_CONSOLE_BOLD_TEXT + "Starting module..." + Colors.NORMAL_CONSOLE_TEXT)
print(Colors.SUCCESS_CONSOLE_BOLD_TEXT + "Connected!" + Colors.NORMAL_CONSOLE_TEXT)
print(Colors.ERROR_CONSOLE_TEXT + "Connection failed" + Colors.NORMAL_CONSOLE_TEXT)
Best Practices
Persistent module key — Use
LocalConfigFileto generate and store amodule_keyon first run.Unique listen port — Each module instance must bind a different port.
Minimal message filter — List only
TYPE_*values you handle.Wrap
m_OnReceivein try/except — Exceptions inside will silently stop message delivery.Call
uninit()on shutdown — Stops UDP threads and sockets cleanly.Use
CFacade_Base— Extend it rather than callingsendJMSGdirectly from application code.add_module_featuresuses snake_case — notaddModuleFeatures.
Troubleshooting
| Issue | Solution |
|---|---|
| Module not in WebClient Details | Check c_module.m_party_id is non-empty after init |
| Messages not received | Verify TYPE_* is in message_filter list |
ModuleNotFoundError: messages | Run from python/ directory or install with pip install -e . |
| Binary data corrupt | Verify bmsg_length == len(bmsg) |
m_OnReceive not called | Ensure c_module.init(...) was called after m_OnReceive was set |
| Config not found | Pass absolute or cwd-relative path to init_config_file |