Python Plugin Development

The Python implementation provides a Pythonic interface to the DroneEngage DataBus using the same UDP-based protocol as the C++ and Node.js versions. It is ideal for rapid prototyping, scripting, sensor integration, and AI/ML pipelines. Source files are in droneengage_databus/python/.

Features

  • Python 3.7+ — Tested with standard library only (colorama optional)

  • __new__-based SingletonCModule() always returns the same instance; thread-safe via threading.Lock

  • snake_case methodsadd_module_features, set_hardware, send_error_message, etc.

  • m_OnReceive callback — Direct function reference, same pattern as C++

  • Binary Message SupportsendBMSG with bytes payload

  • Config File SupportConfigFile and LocalConfigFile with hot-reload and C-style comment parsing

  • Extensible Parser — Subclass AndruavMessageParserBase to handle commands

  • Colored Console OutputColors utility for styled terminal messages


Quick Start

Installation

cd droneengage_databus/python
pip install colorama       # optional — only needed for colored output

As an editable package:

pip install -e .

Run

python python_client.py --help
python python_client.py MyModule 60000 61111    # name, de_comm port, listen port
python python_client.py MyModule               # uses defaults (60000 / 61111)

Minimal Working Module

from de_module import CModule
from de_facade_base import CFacade_Base
from messages import *

DEFAULT_UDP_DATABUS_PACKET_SIZE = 8192

c_module   = CModule()
base_facade = CFacade_Base()
base_facade.set_module(c_module)

c_module.defineModule(
    "gen",             # MODULE_CLASS_GENERIC
    "MyModule",        # module_id (display name)
    "unique-key-001",  # module_key (persistent GUID)
    "1.0.0",           # version
    []                 # message_filter (empty = receive nothing)
)

c_module.add_module_features("T")   # MODULE_FEATURE_SENDING_TELEMETRY
c_module.add_module_features("R")   # MODULE_FEATURE_RECEIVING_TELEMETRY

c_module.init("0.0.0.0", 60000, "0.0.0.0", 61111, DEFAULT_UDP_DATABUS_PACKET_SIZE)

import time
while True:
    time.sleep(1)
    base_facade.send_error_message("", ERROR_USER_DEFINED, NOTIFICATION_TYPE_NOTICE,
                                   NOTIFICATION_TYPE_INFO, "Hello from Python")

Core Concepts

Singleton Pattern

CModule uses __new__ for thread-safe singleton access:

c_module_a = CModule()
c_module_b = CModule()
assert c_module_a is c_module_b   # True — same instance

Receive Callback

def on_receive(message, msg_len, j_msg):
    msg_type = j_msg.get(ANDRUAV_PROTOCOL_MESSAGE_TYPE)
    cmd      = j_msg.get(ANDRUAV_PROTOCOL_MESSAGE_CMD, {})
    # dispatch on msg_type ...

c_module.m_OnReceive = on_receive

Callback Chain

CUDPClient (chunk reassembly)
  → CModule.onReceive(message, len)
      → validates routing / handles TYPE_AndruavModule_ID (sets m_party_id / m_group_id)
      → calls m_OnReceive(message, len, jMsg)    ← your callback
          → optional: parser.parse_message(jMsg, message, len)
              → parse_command() / parse_remote_execute()  ← your overrides

Thread Safety

All send operations use threading.RLock:

with self.m_lock:
    # build message, call sendMSG

The singleton __new__ gate uses a class-level threading.Lock:

class CModule:
    _instance = None
    _lock = threading.Lock()

    def __new__(cls, *args, **kwargs):
        if cls._instance is None:
            with cls._lock:
                if cls._instance is None:
                    cls._instance = super().__new__(cls)
        return cls._instance

Complete API Reference

CModule

from de_module import CModule
c_module = CModule()   # singleton

Initialization

def defineModule(self, module_class, module_id, module_key, module_version, message_filter):
ParameterDescription
module_class"gen", "fcb", "camera", etc.
module_idDisplay name shown in WebClient
module_keyUnique persistent GUID
module_versionVersion string e.g. "1.0.0"
message_filterList of TYPE_* ints; [] = receive none
def init(self, target_ip, broadcasts_port, host, listening_port, chunk_size):
ParameterDescription
target_ipCommunicator IP ("0.0.0.0" = localhost)
broadcasts_portCommunicator port (typically 60000)
hostLocal bind address ("0.0.0.0")
listening_portLocal receive port (unique per module)
chunk_sizeMax UDP payload (use 8192)
def uninit(self):   # stops UDP threads; call before exit

Module Configuration

def add_module_features(self, feature: str):     # "R", "T", "C", "V", "G", "A", "K", "P"
def set_hardware(self, hardware_serial: str, hardware_serial_type: int):  # 0=undef, 1=CPU
def appendExtraField(self, name: str, ms):        # add custom fields to ID broadcast

Sending

def sendJMSG(self, targetPartyID: str, jmsg: dict, andruav_message_id: int, internal_message: bool):
ParameterDescription
targetPartyID"" = broadcast; or specific party ID
jmsgDict — the message payload
andruav_message_idTYPE_* constant from messages.py
internal_messageTrue = intermodule; False = group/individual
def sendBMSG(self, targetPartyID, bmsg, bmsg_length, andruav_message_id, internal_message, message_cmd):
ParameterDescription
bmsgbytes — binary payload
bmsg_lengthLength of bmsg (or 0 for JSON-only)
message_cmdMetadata dict placed in the JSON header

Wire format: JSON_header.encode() + b'\0' + bmsg

def send_sys_msg(self, jmsg, andruav_message_id):   # system message to communicator
def sendMREMSG(self, command_type):                  # module remote-execute command
def forwardMSG(self, message, datalength):           # forward raw bytes
def sendMSG(self, msg, length):                      # low-level raw send

Receiving

# Assign your callback:
c_module.m_OnReceive = lambda message, msg_len, j_msg: ...

State

c_module.m_party_id   # set after communicator responds
c_module.m_group_id   # set after communicator responds
c_module.m_module_key

CFacade_Base (alias FacadeBase)

from de_facade_base import CFacade_Base
facade = CFacade_Base()   # singleton
facade.set_module(c_module)

Methods

def set_module(self, module):   # inject CModule reference

def request_id(self, target_party_id: str):

def send_error_message(self, target_party_id: str, error_number: int,
                       info_type: int, notification_type: int, description: str):
notification_typeValue
NOTIFICATION_TYPE_EMERGENCY0
NOTIFICATION_TYPE_ALERT1
NOTIFICATION_TYPE_CRITICAL2
NOTIFICATION_TYPE_ERROR3
NOTIFICATION_TYPE_WARNING4
NOTIFICATION_TYPE_NOTICE5
NOTIFICATION_TYPE_INFO6
NOTIFICATION_TYPE_DEBUG7
def api_send_config_template(self, target_party_id: str, module_key: str,
                             json_file_content_json: dict, reply: bool):

Extending CFacade_Base

from de_facade_base import CFacade_Base
from messages import TYPE_AndruavMessage_DUMMY

class MySensorFacade(CFacade_Base):
    def send_sensor_reading(self, target_party_id: str, value: float):
        import time
        self._module.sendJMSG(target_party_id,
            {"sensor_value": value, "ts": int(time.time() * 1e6)},
            TYPE_AndruavMessage_DUMMY, False)

facade = MySensorFacade()
facade.set_module(c_module)
facade.send_sensor_reading("", 23.5)

AndruavMessageParserBase (alias CAndruavMessageParserBase)

Abstract base for inbound message dispatch. Subclass and override the two abstract methods.

from de_message_parser_base import AndruavMessageParserBase

class MyParser(AndruavMessageParserBase):
    def __init__(self, facade):
        super().__init__()
        self._facade = facade

    def parse_remote_execute(self, andruav_message: dict):
        # handle TYPE_AndruavMessage_RemoteExecute
        pass

    def parse_command(self, andruav_message, full_message, full_message_length,
                      message_type, permission):
        from messages import TYPE_AndruavMessage_GPS
        if message_type == TYPE_AndruavMessage_GPS:
            cmd = andruav_message.get(ANDRUAV_PROTOCOL_MESSAGE_CMD, {})
            print(f"GPS: {cmd.get('lat')}, {cmd.get('lng')}")

Wire up to receive:

parser = MyParser(facade)
c_module.m_OnReceive = lambda message, msg_len, j_msg: \
    parser.parse_message(j_msg, message, msg_len)

State properties:

parser.is_binary        # True if message contains binary payload
parser.is_system        # True if sender is the communicator server
parser.is_inter_module  # True if routing type == CMD_TYPE_INTERMODULE

parse_message automatically handles TYPE_AndruavMessage_CONFIG_ACTION (restart, apply-config, fetch-config-template) before calling parse_command.


Module Constants

# Module classes (de_module.py)
MODULE_CLASS_GENERIC = "gen"
MODULE_CLASS_FCB     = "fcb"
MODULE_CLASS_VIDEO   = "camera"
MODULE_CLASS_P2P     = "p2p"
MODULE_CLASS_COMM    = "comm"

# Module features
MODULE_FEATURE_RECEIVING_TELEMETRY = "R"
MODULE_FEATURE_SENDING_TELEMETRY   = "T"
MODULE_FEATURE_CAPTURE_IMAGE       = "C"
MODULE_FEATURE_CAPTURE_VIDEO       = "V"

# Hardware types
HARDWARE_TYPE_UNDEFINED = 0
HARDWARE_TYPE_CPU       = 1

ConfigFile

Reads a JSON configuration file with C-style comment support (// and /* */), file monitoring, backup creation, and hot-reload.

from configFile import ConfigFile

config = ConfigFile.get_instance()
config.init_config_file("mymodule.config.json")
json_data = config.GetConfigJSON()
port = json_data.get("s2s_udp_target_port", "60000")

LocalConfigFile

Stores per-instance data (e.g., the persistent module_key) in a local file.

from localConfigFile import LocalConfigFile

local_config = LocalConfigFile.get_instance()
local_config.InitConfigFile("mymodule.local")
module_key = local_config.getStringField("module_key")
if not module_key:
    import time
    module_key = str(int(time.time() * 1e6))
    local_config.addStringField("module_key", module_key)
    local_config.apply()

Package Structure

droneengage_databus/python/
├── __init__.py               # Package init with re-exports
├── de_module.py              # CModule — main module interface
├── udpClient.py              # CUDPClient — UDP with chunking/reassembly
├── de_facade_base.py         # FacadeBase (CFacade_Base) — high-level send API
├── de_message_parser_base.py # AndruavMessageParserBase — abstract parser
├── configFile.py             # ConfigFile — JSON config with hot-reload
├── localConfigFile.py        # LocalConfigFile — persistent local settings
├── messages.py               # All TYPE_* and protocol constants
├── colors.py / console_colors.py  # ANSI color helpers
└── python_client.py          # Working example client

Message Handling

Message Filter

from messages import TYPE_AndruavMessage_GPS, TYPE_AndruavMessage_MAVLINK

c_module.defineModule("gen", "GPSMonitor", module_key, "1.0.0",
    [TYPE_AndruavMessage_GPS, TYPE_AndruavMessage_MAVLINK])

Empty [] = receive no messages. None or omit = receive all.

Dispatch Pattern

from messages import (ANDRUAV_PROTOCOL_MESSAGE_TYPE, ANDRUAV_PROTOCOL_MESSAGE_CMD,
                      TYPE_AndruavMessage_GPS)

def on_receive(message, msg_len, j_msg):
    try:
        msg_type = j_msg.get(ANDRUAV_PROTOCOL_MESSAGE_TYPE)
        cmd      = j_msg.get(ANDRUAV_PROTOCOL_MESSAGE_CMD, {})
        if msg_type == TYPE_AndruavMessage_GPS:
            print(f"GPS: lat={cmd.get('lat')}, lng={cmd.get('lng')}")
    except Exception as e:
        print(f"Error: {e}")

c_module.m_OnReceive = on_receive

Binary Transmission

from messages import TYPE_AndruavMessage_IMG
import time

with open("photo.jpg", "rb") as f:
    image_data = f.read()

metadata = {"lat": 31.5, "lng": 34.5, "alt": 100,
            "tim": int(time.time() * 1e6)}

c_module.sendBMSG("", image_data, len(image_data),
                  TYPE_AndruavMessage_IMG, False, metadata)

Custom Message Types

from messages import TYPE_AndruavMessage_USER_RANGE_START

TYPE_MY_SENSOR_DATA = TYPE_AndruavMessage_USER_RANGE_START + 0
TYPE_MY_CMD_ACK     = TYPE_AndruavMessage_USER_RANGE_START + 1

c_module.sendJMSG("", {"temperature": 25.5}, TYPE_MY_SENSOR_DATA, False)

Configuration Management

Reading Config Values

from configFile import ConfigFile

config   = ConfigFile.get_instance()
config.init_config_file("mymodule.config.json")
json_cfg = config.GetConfigJSON()

target_ip   = json_cfg.get("s2s_udp_target_ip", "0.0.0.0")
target_port = int(json_cfg.get("s2s_udp_target_port", 60000))
listen_port = int(json_cfg.get("s2s_udp_listening_port", 61111))

c_module.init(target_ip, target_port, "0.0.0.0", listen_port, 8192)

Hot-Reload Monitoring

ConfigFile supports watching the config file for changes:

config.startMonitoring()

Config Template

The parser’s parse_command can serve a template.json to the WebClient:

# Handled automatically by AndruavMessageParserBase._handle_config_action
# when TYPE_AndruavMessage_CONFIG_ACTION with action CONFIG_REQUEST_FETCH_CONFIG_TEMPLATE
# is received — it reads template.json and calls:
self._facade.api_send_config_template(sender, module_key, json_content, True)

Graceful Shutdown

import signal, sys

def signal_handler(signum, frame):
    c_module.uninit()
    sys.exit(0)

signal.signal(signal.SIGINT,  signal_handler)
signal.signal(signal.SIGTERM, signal_handler)

Console Colors

from console_colors import Colors

print(Colors.INFO_CONSOLE_BOLD_TEXT + "Starting module..." + Colors.NORMAL_CONSOLE_TEXT)
print(Colors.SUCCESS_CONSOLE_BOLD_TEXT + "Connected!" + Colors.NORMAL_CONSOLE_TEXT)
print(Colors.ERROR_CONSOLE_TEXT + "Connection failed" + Colors.NORMAL_CONSOLE_TEXT)

Best Practices

  1. Persistent module key — Use LocalConfigFile to generate and store a module_key on first run.

  2. Unique listen port — Each module instance must bind a different port.

  3. Minimal message filter — List only TYPE_* values you handle.

  4. Wrap m_OnReceive in try/except — Exceptions inside will silently stop message delivery.

  5. Call uninit() on shutdown — Stops UDP threads and sockets cleanly.

  6. Use CFacade_Base — Extend it rather than calling sendJMSG directly from application code.

  7. add_module_features uses snake_case — not addModuleFeatures.


Troubleshooting

IssueSolution
Module not in WebClient DetailsCheck c_module.m_party_id is non-empty after init
Messages not receivedVerify TYPE_* is in message_filter list
ModuleNotFoundError: messagesRun from python/ directory or install with pip install -e .
Binary data corruptVerify bmsg_length == len(bmsg)
m_OnReceive not calledEnsure c_module.init(...) was called after m_OnReceive was set
Config not foundPass absolute or cwd-relative path to init_config_file

See Also