Share data between C and Python with this messaging library


Prerequisites

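For this tutorial, you will need:
  • A C compiler (this tutorial uses Clang)
  • The ZeroMQ library
  • Python 3
  • ZeroMQ bindings for Python
Install them on Fedora with: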
dnf install clang zeromq zeromq-devel python3 python3-zmq
For Debian or Ubuntu:
apt-get install clang libzmq5 libzmq3-dev python3 python3-zmq
If you run into any issues, refer to each project's installation instructions.

Writing the hardware-interfacing library

Since this is a hypothetical scenario, this tutorial will write a fictitious library with two functions:
  • fancyhw_init() to initiate the (hypothetical) hardware
  • fancyhw_read_val() to return a value read from the hardware
Save the library's full source code to a file named libfancyhw.h:
#ifndef LIBFANCYHW_H
#define LIBFANCYHW_H

#include <stdlib.h>
#include <stdint.h>

// This is the fictitious hardware interfacing library

void fancyhw_init(unsigned int init_param)
{
    srand(init_param);
}

int16_t fancyhw_read_val(void)
{
    return (int16_t)rand();
}

#endif
This library can simulate the data you want to pass between languages, thanks to the random number generator.
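Because rand() can return values larger than a 16-bit integer can hold, the cast in fancyhw_read_val() discards the upper bits. The C standard leaves the result of such a cast implementation-defined; the sketch below assumes the two's-complement wraparound that GCC and Clang document. The helper name to_int16 is hypothetical and only illustrates why the simulated readings include negative numbers.

```python
def to_int16(value):
    """Emulate (int16_t)value, assuming two's-complement wraparound."""
    low = value & 0xFFFF                 # keep only the low 16 bits
    return low - 0x10000 if low >= 0x8000 else low


# A few made-up inputs standing in for rand() results
for raw in (32767, 32768, 65536, 0x12345678):
    print("{:>10d} -> {:d}".format(raw, to_int16(raw)))
```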

Designing a C interface

The following will go step-by-step through writing the C interface—from including the libraries to managing the data transfer.

Libraries

Begin by loading the necessary libraries (the purpose of each library is in a comment in the code):
// For printf()
#include <stdio.h>
// For EXIT_*
#include <stdlib.h>
// For memcpy()
#include <string.h>
// For sleep()
#include <unistd.h>

#include <zmq.h>

#include "libfancyhw.h"

Significant parameters

Define the main function and the significant parameters needed for the rest of the program:
int main(void)
{
    const unsigned int INIT_PARAM = 12345;
    const unsigned int REPETITIONS = 10;
    const unsigned int PACKET_SIZE = 16;
    const char *TOPIC = "fancyhw_data";

    ...

Initialization

Both libraries need some initialization. The fictitious one needs just one parameter:
fancyhw_init(INIT_PARAM);
The ZeroMQ library needs some real initialization. First, define a context—an object that manages all the sockets:
void *context = zmq_ctx_new();

if (!context) {
    printf("ERROR: ZeroMQ error occurred during zmq_ctx_new(): %s\n", zmq_strerror(errno));

    return EXIT_FAILURE;
}
Then define the socket used to deliver data. ZeroMQ supports several types of sockets, each suited to a particular messaging pattern. Use a publish socket (also known as a PUB socket), which delivers a copy of each message to every connected receiver. This approach lets you attach several receivers that all get the same messages. If there are no receivers, the messages are discarded (i.e., they are not queued). Do this with:
void *data_socket = zmq_socket(context, ZMQ_PUB);
The socket must be bound to an address so that the clients know where to connect. In this case, use the TCP transport layer (there are other options, but TCP is a good default choice):
const int rb = zmq_bind(data_socket, "tcp://*:5555");

if (rb != 0) {
    printf("ERROR: ZeroMQ error occurred during zmq_bind(): %s\n", zmq_strerror(errno));

    return EXIT_FAILURE;
}
Next, calculate some useful values that you will need later. Note TOPIC in the code below; PUB sockets need a topic to be associated with the messages they send. Topics can be used by the receivers to filter messages:
const size_t topic_size = strlen(TOPIC);
const size_t envelope_size = topic_size + 1 + PACKET_SIZE * sizeof(int16_t);

printf("Topic: %s; topic size: %zu; Envelope size: %zu\n", TOPIC, topic_size, envelope_size);
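As a quick cross-check of that arithmetic, the following Python sketch recomputes the envelope size from the same constants. The names mirror the C constants, and INT16_SIZE is an illustrative name that is not part of the original program.

```python
import struct

TOPIC = b"fancyhw_data"
PACKET_SIZE = 16
INT16_SIZE = struct.calcsize("<h")  # standard size of a 16-bit integer: 2 bytes

topic_size = len(TOPIC)
# topic bytes + one separator byte + the binary samples
envelope_size = topic_size + 1 + PACKET_SIZE * INT16_SIZE

print("topic size: {}; envelope size: {}".format(topic_size, envelope_size))
```

The result should agree with the sizes that the C program prints at startup.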

Sending messages

Start a loop that sends REPETITIONS messages:
for (unsigned int i = 0; i < REPETITIONS; i++) {
    ...
Before sending a message, fill a buffer of PACKET_SIZE values. The library provides 16-bit signed integers. Because the size of an int in C depends on the platform, use a fixed-width type (int16_t):
int16_t buffer[PACKET_SIZE];

for (unsigned int j = 0; j < PACKET_SIZE; j++) {
    buffer[j] = fancyhw_read_val();
}

printf("Read %u data values\n", PACKET_SIZE);
The first step in message preparation and delivery is creating a ZeroMQ message and allocating the memory necessary for your message. This empty message is an envelope to store the data you will ship:
zmq_msg_t envelope;

const int rmi = zmq_msg_init_size(&envelope, envelope_size);
if (rmi != 0) {
    printf("ERROR: ZeroMQ error occurred during zmq_msg_init_size(): %s\n", zmq_strerror(errno));

    zmq_msg_close(&envelope);

    break;
}
Now that the memory is allocated, store the data in the ZeroMQ message "envelope." The zmq_msg_data() function returns a pointer to the beginning of the buffer in the envelope. The first part is the topic, followed by a single space that separates it from the binary data. To move along the buffer, you have to play with casts and pointer arithmetic. (Thank you, C, for making things straightforward.) Do this with:
memcpy(zmq_msg_data(&envelope), TOPIC, topic_size);
memcpy((void*)((char*)zmq_msg_data(&envelope) + topic_size), " ", 1);
memcpy((void*)((char*)zmq_msg_data(&envelope) + 1 + topic_size), buffer, PACKET_SIZE * sizeof(int16_t));
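To make the resulting byte layout easier to picture, here is a minimal Python sketch that assembles the same three parts: topic, one space, then the packed samples. The function build_envelope and the sample values are made up for illustration and are not part of the original program. It uses the native "h" format, which matches what the C code copies from memory on the same machine.

```python
import struct

TOPIC = b"fancyhw_data"
values = list(range(-8, 8))  # 16 made-up int16 samples


def build_envelope(topic, samples):
    """Return topic + b' ' + samples packed as native-order int16 values."""
    payload = struct.pack("{:d}h".format(len(samples)), *samples)
    return topic + b" " + payload


envelope = build_envelope(TOPIC, values)

print("envelope size:", len(envelope))
print("topic part:", envelope[:len(TOPIC)])
print("separator:", envelope[len(TOPIC):len(TOPIC) + 1])
```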
Send the message through the data_socket:
const size_t rs = zmq_msg_send(&envelope, data_socket, 0);
if (rs != envelope_size) {
    printf("ERROR: ZeroMQ error occurred during zmq_msg_send(): %s\n", zmq_strerror(errno));

    zmq_msg_close(&envelope);

    break;
}
Make sure to dispose of the envelope after you use it:
zmq_msg_close(&envelope);

printf("Message sent; i: %u, topic: %s\n", i, TOPIC);

Clean it up

Because C does not provide garbage collection, you have to tidy up. After you are done sending your messages, close the program with the clean-up needed to release the used memory:
const int rc = zmq_close(data_socket);

if (rc != 0) {
    printf("ERROR: ZeroMQ error occurred during zmq_close(): %s\n", zmq_strerror(errno));

    return EXIT_FAILURE;
}

const int rd = zmq_ctx_destroy(context);

if (rd != 0) {
    printf("Error occurred during zmq_ctx_destroy(): %s\n", zmq_strerror(errno));

    return EXIT_FAILURE;
}

return EXIT_SUCCESS;

The entire C program

Save the full interface library below to a local file called hw_interface.c:
// For printf()
#include <stdio.h>
// For EXIT_*
#include <stdlib.h>
// For memcpy()
#include <string.h>
// For sleep()
#include <unistd.h>

#include <zmq.h>

#include "libfancyhw.h"

int main(void)
{
    const unsigned int INIT_PARAM = 12345;
    const unsigned int REPETITIONS = 10;
    const unsigned int PACKET_SIZE = 16;
    const char *TOPIC = "fancyhw_data";

    fancyhw_init(INIT_PARAM);

    void *context = zmq_ctx_new();

    if (!context)
    {
        printf("ERROR: ZeroMQ error occurred during zmq_ctx_new(): %s\n", zmq_strerror(errno));

        return EXIT_FAILURE;
    }

    void *data_socket = zmq_socket(context, ZMQ_PUB);

    const int rb = zmq_bind(data_socket, "tcp://*:5555");

    if (rb != 0)
    {
        printf("ERROR: ZeroMQ error occurred during zmq_bind(): %s\n", zmq_strerror(errno));

        return EXIT_FAILURE;
    }

    const size_t topic_size = strlen(TOPIC);
    const size_t envelope_size = topic_size + 1 + PACKET_SIZE * sizeof(int16_t);

    printf("Topic: %s; topic size: %zu; Envelope size: %zu\n", TOPIC, topic_size, envelope_size);

    for (unsigned int i = 0; i < REPETITIONS; i++)
    {
        int16_t buffer[PACKET_SIZE];

        for (unsigned int j = 0; j < PACKET_SIZE; j++)
        {
            buffer[j] = fancyhw_read_val();
        }

        printf("Read %u data values\n", PACKET_SIZE);

        zmq_msg_t envelope;

        const int rmi = zmq_msg_init_size(&envelope, envelope_size);
        if (rmi != 0)
        {
            printf("ERROR: ZeroMQ error occurred during zmq_msg_init_size(): %s\n", zmq_strerror(errno));

            zmq_msg_close(&envelope);

            break;
        }

        memcpy(zmq_msg_data(&envelope), TOPIC, topic_size);
        memcpy((void*)((char*)zmq_msg_data(&envelope) + topic_size), " ", 1);
        memcpy((void*)((char*)zmq_msg_data(&envelope) + 1 + topic_size), buffer, PACKET_SIZE * sizeof(int16_t));

        const size_t rs = zmq_msg_send(&envelope, data_socket, 0);
        if (rs != envelope_size)
        {
            printf("ERROR: ZeroMQ error occurred during zmq_msg_send(): %s\n", zmq_strerror(errno));

            zmq_msg_close(&envelope);

            break;
        }

        zmq_msg_close(&envelope);

        printf("Message sent; i: %u, topic: %s\n", i, TOPIC);

        sleep(1);
    }

    const int rc = zmq_close(data_socket);

    if (rc != 0)
    {
        printf("ERROR: ZeroMQ error occurred during zmq_close(): %s\n", zmq_strerror(errno));

        return EXIT_FAILURE;
    }

    const int rd = zmq_ctx_destroy(context);

    if (rd != 0)
    {
        printf("Error occurred during zmq_ctx_destroy(): %s\n", zmq_strerror(errno));

        return EXIT_FAILURE;
    }

    return EXIT_SUCCESS;
}
Compile using the command:
clang -std=c99 -I. hw_interface.c -lzmq -o hw_interface
If there are no compilation errors, you can run the interface. What's great is that ZeroMQ PUB sockets can run without any applications sending or retrieving data. That reduces complexity because there is no obligation in terms of which process needs to start first.
Run the interface:
$ ./hw_interface
Topic: fancyhw_data; topic size: 12; Envelope size: 45
Read 16 data values
Message sent; i: 0, topic: fancyhw_data
Read 16 data values
Message sent; i: 1, topic: fancyhw_data
Read 16 data values
...
...
The output shows the data being sent through ZeroMQ. Now you need an application to read the data.

Write a Python data processor

You are now ready to pass the data from C to a Python application.

Libraries

You need two libraries to help transfer data. First, you need ZeroMQ bindings in Python:
python3 -m pip install pyzmq
The other is the struct library, which decodes binary data. It is part of the Python standard library, so there's no need to pip install it.
The first part of the Python program imports both of these libraries:
import zmq
import struct

Significant parameters

To use ZeroMQ, you must subscribe to the same topic used in the constant TOPIC above:
topic = "fancyhw_data".encode('ascii')

print("Reading messages with topic: {}".format(topic))

Initialization

Next, initialize the context and the socket. Use a subscribe socket (also known as a SUB socket), which is the natural partner of the PUB socket. The socket also needs to subscribe to the right topic:
with zmq.Context() as context:
    socket = context.socket(zmq.SUB)

    socket.connect("tcp://127.0.0.1:5555")
    socket.setsockopt(zmq.SUBSCRIBE, topic)

    i = 0

    ...
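ZeroMQ subscriptions are byte prefixes: a SUB socket accepts a message when the message starts with one of the prefixes it subscribed to, and an empty prefix accepts everything. The sketch below imitates that rule in plain Python so you can try it without a running publisher. The helper matches_subscription and the sample messages are hypothetical and are not part of the ZeroMQ API.

```python
def matches_subscription(message, subscription):
    """Imitate the prefix filter that a SUB socket applies to incoming messages."""
    return message.startswith(subscription)


subscription = b"fancyhw_data"

# Made-up messages: each is a topic, a space, and some payload bytes
messages = [
    b"fancyhw_data \x01\x02",
    b"fancyhw_data_raw \x01\x02",  # a longer topic that shares the same prefix
    b"other_topic \x01\x02",
]

for message in messages:
    print(message, "->", matches_subscription(message, subscription))
```

Because matching is by prefix, a topic such as fancyhw_data_raw would also reach a receiver subscribed to fancyhw_data, so it helps to pick topic names that are not prefixes of one another.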

Receiving messages

Start an infinite loop that waits for new messages to be delivered to the SUB socket. The loop ends if you press Ctrl+C or if an error occurs:
    try:
        while True:
            ... # we will fill this in next

    except KeyboardInterrupt:
        socket.close()
    except Exception as error:
        print("ERROR: {}".format(error))
        socket.close()
The loop waits for new messages to arrive with the recv() method. Then it splits whatever is received at the first space to separate the topic from the content:
binary_topic, data_buffer = socket.recv().split(b' ', 1)
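The second argument to split() matters because the binary payload can itself contain bytes equal to a space (0x20). Here is a small sketch with a made-up payload, packed as little-endian for the sake of a predictable example, showing that limiting the split to one separator keeps the payload intact.

```python
import struct

topic = b"fancyhw_data"
# 32 is 0x0020 and 8224 is 0x2020, so three of these four bytes are spaces
payload = struct.pack("<2h", 32, 8224)
message = topic + b" " + payload

# Split only at the first space: the payload survives untouched
binary_topic, data_buffer = message.split(b" ", 1)
print(binary_topic, data_buffer == payload)

# Splitting at every space would cut the payload into pieces
print(len(message.split(b" ")), "pieces without the limit")
```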

Decoding messages

Python does not yet know that the topic is a string, so decode it using the standard ASCII encoding:
topic = binary_topic.decode(encoding = 'ascii')

print("Message {:d}:".format(i))
print("\ttopic: '{}'".format(topic))
The next step is to read the binary data using the struct library, which can convert raw binary blobs into meaningful values. First, calculate the number of values stored in the packet. This example uses 16-bit signed integers, which correspond to an "h" in the struct format:
packet_size = len(data_buffer) // struct.calcsize("h")

print("\tpacket size: {:d}".format(packet_size))
By knowing how many values are in the packet, you can define the format by preparing a string with the number of values and their types (e.g., "16h"):
struct_format = "{:d}h".format(packet_size)
Convert that binary blob to a series of numbers that you can immediately print:
data = struct.unpack(struct_format, data_buffer)

print("\tdata: {}".format(data))
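A format string without a prefix, such as "16h", tells struct to use the byte order of the machine running Python. That matches the C sender when both programs run on the same computer, as they do in this tutorial. If the two ends ever ran on machines with different byte orders, you would have to state the order explicitly. The sketch below illustrates the difference; decode_packet and the sample values are hypothetical and are not part of the original receiver.

```python
import struct
import sys


def decode_packet(data_buffer, byte_order="="):
    """Unpack a buffer of int16 values; "<" is little-endian, ">" is big-endian."""
    count = len(data_buffer) // struct.calcsize("h")
    return struct.unpack("{}{:d}h".format(byte_order, count), data_buffer)


# Made-up samples, packed explicitly as little-endian
sample = struct.pack("<3h", 1, -2, 300)

print("native byte order:", sys.byteorder)
print("read as little-endian:", decode_packet(sample, "<"))
print("read as big-endian:   ", decode_packet(sample, ">"))
```

Reading the same bytes with the wrong order produces different numbers rather than an error, so a mismatch is easy to miss.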

The full Python program

Here is the complete data receiver in Python:
#! /usr/bin/env python3

import zmq
import struct

topic = "fancyhw_data".encode('ascii')

print("Reading messages with topic: {}".format(topic))

with zmq.Context() as context:
    socket = context.socket(zmq.SUB)

    socket.connect("tcp://127.0.0.1:5555")
    socket.setsockopt(zmq.SUBSCRIBE, topic)

    i = 0

    try:
        while True:
            binary_topic, data_buffer = socket.recv().split(b' ', 1)

            topic = binary_topic.decode(encoding = 'ascii')

            print("Message {:d}:".format(i))
            print("\ttopic: '{}'".format(topic))

            packet_size = len(data_buffer) // struct.calcsize("h")

            print("\tpacket size: {:d}".format(packet_size))

            struct_format = "{:d}h".format(packet_size)

            data = struct.unpack(struct_format, data_buffer)

            print("\tdata: {}".format(data))

            i += 1

    except KeyboardInterrupt:
        socket.close()
    except Exception as error:
        print("ERROR: {}".format(error))
        socket.close()
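Save it to a file called online_analysis.py. Python does not need to be compiled, so you can run the program immediately: either make the file executable with chmod +x online_analysis.py, or run it with python3 online_analysis.py.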
Here is the output:
$ ./online_analysis.py
Reading messages with topic: b'fancyhw_data'
Message 0:
        topic: 'fancyhw_data'
        packet size: 16
        data: (20946, -23616986531416, -15911, -10845, -53322566210955, -32501, -18717, -24490, -16511, -288612420526568)
Message 1:
        topic: 'fancyhw_data'
        packet size: 16
        data: (125053135514083, -19654, -914114532, -255913120310428, -25564, -732, -79799529, -279822961030475)
...
...
