C++ Plugin Development

The C++ implementation provides high-performance, low-latency communication with the DroneEngage DataBus. It is ideal for performance-critical applications, hardware integration, and production drone modules. The library lives in droneengage_common/de_databus/ and is used by all C++ DroneEngage components including the full MAVLink bridge (droneengage_mavlink).

Features

  • C++17 Standard — Modern C++ with RAII, lambdas, and STL concurrency

  • Namespace de::comm — All library classes under de::comm::

  • Singleton PatternCModule, CFacade_Base are global singletons

  • Thread-Safe Sendstd::mutex-protected sendJMSG / sendBMSG

  • Automatic Chunking — Large messages split and reassembled transparently

  • Config-File IntegrationCConfigFile / CLocalConfigFile for production deployments

  • Extensible Parser — Subclass CAndruavMessageParserBase to handle any command


Quick Start

Build

cd droneengage_databus/client
mkdir build && cd build
cmake ..
make

Run

./client --help                          # Show help
./client MyModule 60000 61111            # Module name, DE comm port, listen port
./client MyModule                        # Uses defaults (60000 / 61111)

Minimal Working Module

#include "de_common/de_databus/de_module.hpp"
using namespace de::comm;
using Json_de = nlohmann::json;

int main() {
    CModule& cModule = CModule::getInstance();

    cModule.defineModule(
        MODULE_CLASS_GENERIC,   // module_class
        "MyModule",             // module_id  (display name)
        "unique-key-001",       // module_key (persistent GUID)
        "1.0.0",                // version
        Json_de::array()        // message_filter (empty = receive nothing)
    );

    cModule.addModuleFeatures(MODULE_FEATURE_SENDING_TELEMETRY);
    cModule.addModuleFeatures(MODULE_FEATURE_RECEIVING_TELEMETRY);

    cModule.init("0.0.0.0", 60000, "0.0.0.0", 61111,
                 DEFAULT_UDP_DATABUS_PACKET_SIZE);

    while (true) {
        std::this_thread::sleep_for(std::chrono::seconds(1));
        Json_de msg = {{"t", "hello"}};
        cModule.sendJMSG("", msg, TYPE_AndruavMessage_DUMMY, true);
    }
}

Core Concepts

Namespace

All classes are under de::comm::. Either use:

using namespace de;
using namespace de::comm;

or qualify fully: de::comm::CModule::getInstance().

Singleton Access

de::comm::CModule&      cModule = de::comm::CModule::getInstance();
de::comm::CFacade_Base& cFacade = de::comm::CFacade_Base::getInstance();

Callback Chain

CModule::onReceive()          ← called by CUDPClient after chunk reassembly
  → validates routing fields
  → handles TYPE_AndruavModule_ID (stores m_party_id / m_group_id)
  → calls m_OnReceive(message, len, jMsg)    ← your callback
      → your callback calls parser.parseMessage(jMsg, message, len)
          → parseRemoteExecute() or parseCommand()  ← your overrides

Registration Flow

After init(), the ID broadcaster sends TYPE_AndruavModule_ID every second. On the first communicator response, CModule::onReceive stores:

m_party_id  // accessible via cModule.getPartyId()
m_group_id  // accessible via cModule.getGroupId()

Complete API Reference

de::comm::CModule

Singleton module manager. Handles module registration, message routing, and send operations.

static CModule& getInstance();

Initialization

void defineModule(
    std::string module_class,    // MODULE_CLASS_* constant
    std::string module_id,       // Display name shown in WebClient
    std::string module_key,      // Unique persistent GUID
    std::string module_version,  // Version string, e.g. "1.0.0"
    Json_de     message_filter   // Array of TYPE_* ints; empty = receive none
);

bool init(
    const std::string targetIP,    // Communicator IP ("0.0.0.0" = localhost)
    int broadcatsPort,             // Communicator port (typically 60000)
    const std::string host,        // Local bind IP ("0.0.0.0")
    int listenningPort,            // Local listen port (unique per module)
    int chunkSize                  // Max UDP payload (DEFAULT_UDP_DATABUS_PACKET_SIZE)
);

bool uninit();   // Stop UDP threads and clean up

Module Configuration

void addModuleFeatures(const std::string feature);   // MODULE_FEATURE_* constant
void setHardware(const std::string hardware_serial,
                 const ENUM_HARDWARE_TYPE hardware_serial_type);
void setModuleId(const std::string module_id);
void setModuleClass(const std::string module_class);
void setModuleKey(const char* module_key);
void appendExtraField(const std::string name, const Json_de& ms);

Sending

void sendJMSG(
    const std::string targetPartyID,  // "" = broadcast; or specific party ID
    const Json_de jmsg,               // Message payload object
    const int andruav_message_id,     // TYPE_AndruavMessage_* constant
    const bool internal_message       // true = intermodule; false = group/individual
);

void sendBMSG(
    const std::string& targetPartyID,
    const char* bmsg,                 // Binary payload bytes
    const int bmsg_length,
    const int& andruav_message_id,
    const bool& internal_message,
    const Json_de& message_cmd        // Metadata JSON in the header
);

void sendSYSMSG(const Json_de& jmsg, const int& andruav_message_id);
void sendMREMSG(const int& command_type);
void forwardMSG(const char* message, const std::size_t datalength);
void sendMSG(const char* msg, const int length);  // low-level raw send

Receiving

void setMessageOnReceive(void (*onReceive)(const char*, int len, Json_de jMsg));

Callback signature:

void onReceive(const char* message, int len, Json_de jMsg) {
    const int msgType = jMsg[ANDRUAV_PROTOCOL_MESSAGE_TYPE].get<int>();
    const Json_de& cmd = jMsg[ANDRUAV_PROTOCOL_MESSAGE_CMD];
    // dispatch on msgType ...
}

Getters

const std::string getModuleKey() const;
const std::string getPartyId()  const;   // Set after communicator responds
const std::string getGroupId()  const;   // Set after communicator responds
const Json_de     getModuleFeatures() const;

de::comm::CUDPClient

Low-level UDP transport — normally used indirectly through CModule.

void init(const char* targetIP, int broadcatsPort,
          const char* host, int listenningPort, int chunkSize);
void start();
void stop();
void setJsonId(std::string jsonID);   // Sets the ID packet broadcast periodically
void sendMSG(const char* msg, const int length);
bool isStarted() const;
ConstantValueDescription
DEFAULT_UDP_DATABUS_PACKET_SIZE8192Recommended chunk size
MAX_UDP_DATABUS_PACKET_SIZE0xFFFFMaximum allowed chunk size

Chunking protocol:

  • Each chunk carries a 2-byte little-endian header (chunk sequence number).

  • The final chunk is marked with header 0xFFFF.

  • ~10 ms delay between chunks to reduce packet loss under congestion.


de::comm::CFacade_Base

Singleton. Holds m_module reference to CModule. Provides high-level send helpers. Subclass to add domain-specific methods.

static CFacade_Base& getInstance();

void requestID(const std::string& target_party_id) const;

void sendErrorMessage(
    const std::string& target_party_id,
    const int& error_number,        // e.g. ERROR_USER_DEFINED
    const int& info_type,           // component reporting the error
    const int& notification_type,   // e.g. NOTIFICATION_TYPE_INFO
    const std::string& description
) const;

void API_sendConfigTemplate(
    const std::string& target_party_id,
    const std::string& module_key,
    const Json_de& json_file_content_json,
    const bool reply
);

Extending CFacade_Base:

class CMyFacade : public de::comm::CFacade_Base {
public:
    static CMyFacade& getInstance() {
        static CMyFacade instance;
        return instance;
    }

    void sendSensorReading(const std::string& target, double value) const {
        Json_de msg = {{"sensor_value", value}, {"ts", get_time_usec()}};
        m_module.sendJMSG(target, msg, TYPE_AndruavMessage_DUMMY, false);
    }
};

de::comm::CAndruavMessageParserBase

Abstract base for inbound message dispatch. Subclass and override the two pure-virtual methods.

// Call from your m_OnReceive callback:
void parseMessage(Json_de& andruav_message, const char* full_message,
                  const int& full_message_length);

Must override (pure virtual):

protected:
    virtual void parseRemoteExecute(Json_de& andruav_message) = 0;

    virtual void parseCommand(Json_de& andruav_message,
                              const char* full_message,
                              const int& full_message_length,
                              int messageType,
                              uint32_t permission) = 0;

State flags set before dispatch:

protected:
    bool m_is_binary;        // true if message contains binary payload
    bool m_is_system;        // true if sender is the communicator server
    bool m_is_inter_module;  // true if routing type == CMD_TYPE_INTERMODULE

    de::comm::CFacade_Base& m_facade = de::comm::CFacade_Base::getInstance();

parseMessage automatically handles TYPE_AndruavMessage_CONFIG_ACTION (restart, apply-config, fetch-config-template) before calling parseCommand.


Module Class Constants

#define MODULE_CLASS_COMM          "comm"    // Communication module
#define MODULE_CLASS_FCB           "fcb"     // Flight control board
#define MODULE_CLASS_VIDEO         "camera"  // Camera / video
#define MODULE_CLASS_P2P           "p2p"     // Peer-to-peer
#define MODULE_CLASS_GENERIC       "gen"     // General purpose
#define MODULE_CLASS_GPIO          "gpio"    // GPIO hardware
#define MODULE_CLASS_A_RECOGNITION "ai_rec"  // AI recognition
#define MODULE_CLASS_TRACKING      "trk"     // Target tracking
#define MODULE_CLASS_VIEWLINK      "vlk"     // Viewlink camera

Module Feature Constants

#define MODULE_FEATURE_RECEIVING_TELEMETRY  "R"
#define MODULE_FEATURE_SENDING_TELEMETRY    "T"
#define MODULE_FEATURE_CAPTURE_IMAGE        "C"
#define MODULE_FEATURE_CAPTURE_VIDEO        "V"
#define MODULE_FEATURE_GPIO                 "G"
#define MODULE_FEATURE_AI_RECOGNITION       "A"
#define MODULE_FEATURE_TRACKING             "K"
#define MODULE_FEATURE_P2P                  "P"

Hardware Type Enum

typedef enum {
    HARDWARE_TYPE_UNDEFINED = 0,
    HARDWARE_TYPE_CPU       = 1
} ENUM_HARDWARE_TYPE;

Message System

Routing Logic in sendJMSG

internal_messagetargetPartyIDRouting type
trueanyCMD_TYPE_INTERMODULE
false""CMD_COMM_GROUP
falsenon-emptyCMD_COMM_INDIVIDUAL

Wire Protocol Fields

ConstantJSON keyMeaning
INTERMODULE_MODULE_KEY"mk"Sender module key
INTERMODULE_ROUTING_TYPE"ty"Routing type string
ANDRUAV_PROTOCOL_TARGET_ID"tp"Destination party ID
ANDRUAV_PROTOCOL_MESSAGE_TYPE"mt"Message type integer
ANDRUAV_PROTOCOL_MESSAGE_CMD"ms"Payload object

Custom User Message Types

#define TYPE_AndruavMessage_USER_RANGE_START  80000
#define TYPE_AndruavMessage_USER_RANGE_END    90000

#define TYPE_MY_SENSOR_DATA  TYPE_AndruavMessage_USER_RANGE_START + 0
#define TYPE_MY_CMD_ACK      TYPE_AndruavMessage_USER_RANGE_START + 1

Examples

1. Basic Module Communication (client.cpp)

./client MyModule 60000 61111
./client --help
CModule& cModule = CModule::getInstance();

cModule.defineModule(MODULE_CLASS_GENERIC, "MyModule",
                     generateRandomModuleID(), "0.0.1",
                     Json_de::array());
cModule.addModuleFeatures(MODULE_FEATURE_SENDING_TELEMETRY);
cModule.addModuleFeatures(MODULE_FEATURE_RECEIVING_TELEMETRY);
cModule.setHardware("123456", ENUM_HARDWARE_TYPE::HARDWARE_TYPE_CPU);
cModule.init("0.0.0.0", target_port, "0.0.0.0", listen_port,
             DEFAULT_UDP_DATABUS_PACKET_SIZE);

while (!exit_me) {
    std::this_thread::sleep_for(std::chrono::milliseconds(1000));
    Json_de message = {{"t", "THIS IS A TEST MESSAGE"}};
    cModule.sendJMSG("", message, TYPE_AndruavMessage_DUMMY, true);
}

2. Binary Data Transmission (image_sender.cpp)

./image_sender ./img.jpeg
char* content = nullptr;
int content_length = 0;
readBinaryFile(file_name, content, content_length);

Json_de msg_cmd;
msg_cmd["lat"] = 0; msg_cmd["lng"] = 0;
msg_cmd["alt"] = 0; msg_cmd["tim"] = get_time_usec();

// Wire: JSON header + '\0' + image bytes
cModule.sendBMSG("", content, content_length,
                 TYPE_AndruavMessage_IMG, false, msg_cmd);
delete[] content;


4. Adaptive Rate Control — Sender (sender_adapter.cpp)

./sender_adapter sender_mod 60000 1000
#define TYPE_CUSTOM_DATA        TYPE_AndruavMessage_USER_RANGE_START + 0
#define TYPE_CUSTOM_CHANGE_RATE TYPE_AndruavMessage_USER_RANGE_START + 1

int delay = 1000; int counter = 0;

void onReceive(const char*, int, Json_de jMsg) {
    if (jMsg[ANDRUAV_PROTOCOL_MESSAGE_TYPE].get<int>() == TYPE_CUSTOM_CHANGE_RATE) {
        const int delta = jMsg["ms"]["value"].get<int>();
        delay = (delta == 0) ? delay / 2 : delay + delta;
    }
}

while (!exit_me) {
    std::this_thread::sleep_for(std::chrono::milliseconds(delay));
    Json_de msg = {{"t", "DATA"}, {"counter", ++counter}};
    cModule.sendJMSG("", msg, TYPE_CUSTOM_DATA, true);
}

5. Queue-Based Processing — Receiver (receiver_adapter.cpp)

./receiver_adapter receiver_mod 60000 1000
std::queue<std::vector<char>> messageQueue;
std::mutex queueMutex;
std::condition_variable queueCV;

void onReceive(const char* message, int len, Json_de) {
    std::unique_lock<std::mutex> lock(queueMutex, std::defer_lock);
    if (lock.try_lock()) {
        messageQueue.push(std::vector<char>(message, message + len));
        queueCV.notify_one();
    }
}

void processMessages() {
    while (!messageQueue.empty()) {
        std::this_thread::sleep_for(std::chrono::milliseconds(delay));
        // Backpressure: tell sender to slow down if queue is growing
        Json_de fb = {{"value", (diff > 0 && counter > 2) ? +2*(int)counter : 0}};
        cModule.sendJMSG("sender_mod", fb, TYPE_CUSTOM_CHANGE_RATE, false);
    }
}


Port Configuration

ExampleListen Port
client.cppUser-defined (default 61111)
image_sender.cpp50000
mavlink_listener.cpp70014
sender_adapter.cpp70034
receiver_adapter.cpp70024

Communicator (de_comm) listens on port 60000 by default.


Build Flags

FlagEffect
-DDEBUGEnable basic debug output
-DDDEBUGVerbose: print every sent/received message
-DDE_DISABLE_TRYDisable try-catch in onReceive (performance profiling)

Thread Safety

  • sendJMSG and sendBMSG hold std::lock_guard<std::mutex> on m_lock.

  • CUDPClient::sendMSG uses a separate m_lock2 mutex for the send socket.

  • Never call sendJMSG while holding m_lock inside a callback (deadlock risk).

  • For high-frequency receives, push to a std::queue in the callback and process in a separate thread (see receiver_adapter.cpp).

  • Use try_lock in callbacks to avoid blocking the receive thread.


Best Practices

  1. Persistent module key — Store in CLocalConfigFile so the communicator recognizes the module across restarts.

  2. Unique listen port — Each running module instance must bind a different port.

  3. Minimal message filter — List only TYPE_* values your module handles; reduces overhead.

  4. Delete binary buffersreadBinaryFile allocates with new[]; call delete[] after sendBMSG.

  5. Non-blocking receive callback — Push to a queue for slow processing; keep the callback fast.

  6. uninit() on shutdown — Always call cModule.uninit() before exit() to stop UDP threads.

  7. Random or timestamp module key — Use generateRandomModuleID() for testing; use CLocalConfigFile for production.


Troubleshooting

IssueSolution
Module not in WebClient DetailsVerify communicator is on port 60000; check getPartyId() is non-empty
Port already in use errorChoose a different listenPort
Messages not receivedEnsure TYPE_* is in your MESSAGE_FILTER array
Segfault in receive callbackCheck jMsg.contains(ANDRUAV_PROTOCOL_MESSAGE_CMD) before accessing
High CPU usageAdd sleep_for in main loop; avoid busy-wait
Binary message corruptVerify bmsg_length exactly matches the allocated buffer
Module disconnects after restartUse persistent module_key from CLocalConfigFile

Additional Resources

  • Overview page./de-custom-plugins.md

  • Python implementation./de-custom-plugins-python.md

  • Node.js implementation./de-custom-plugins-nodejs.md

  • Core library sourcedroneengage_common/de_databus/

  • Test examples sourcedroneengage_databus/client/test/

  • Production exampledroneengage_mavlink/src/

  • Message type definitionsde_common/de_databus/messages.hpp

  • Protocol internalsdroneengage_common/README.md