Prerequisites
For this tutorial, you will need:
- A C compiler (e.g., GCC or Clang)
- The libzmq library
- Python 3
- ZeroMQ bindings for Python
Install them on Fedora with:
$ dnf install clang zeromq zeromq-devel python3 python3-zmq
For Debian or Ubuntu:
$ apt-get install clang libzmq5 libzmq3-dev python3 python3-zmq
If you run into any issues, refer to each project's installation instructions (which are linked above).
Writing the hardware-interfacing library
Since this is a hypothetical scenario, this tutorial uses a fictitious library with two functions:
- fancyhw_init() to initialize the (hypothetical) hardware
- fancyhw_read_val() to return a value read from the hardware
Save the library's full source code to a file named libfancyhw.h:
This library can simulate the data you want to pass between languages, thanks to the random number generator.
Designing a C interface
The following will go step-by-step through writing the C interface—from including the libraries to managing the data transfer.
Libraries
Begin by loading the necessary libraries (the purpose of each library is in a comment in the code):
// For printf()
#include <stdio.h>
// For EXIT_*
#include <stdlib.h>
// For memcpy() and strlen()
#include <string.h>
// For sleep()
#include <unistd.h>

#include <zmq.h>

#include "libfancyhw.h"
Significant parameters
Define the main function and the significant parameters needed for the rest of the program:
int main(void) {
    const unsigned int INIT_PARAM = 12345;
    const unsigned int REPETITIONS = 10;
    const unsigned int PACKET_SIZE = 16;
    const char *TOPIC = "fancyhw_data";

    ...
Initialization
Both libraries need some initialization. The fictitious one needs just one parameter:
fancyhw_init(INIT_PARAM);
The ZeroMQ library needs some real initialization. First, define a context—an object that manages all the sockets:
void *context = zmq_ctx_new();

if (!context) {
    printf("ERROR: ZeroMQ error occurred during zmq_ctx_new(): %s\n", zmq_strerror(errno));

    return EXIT_FAILURE;
}
Then define the socket used to deliver data. ZeroMQ supports several types of sockets, each suited to a particular application. Use a publish socket (also known as a PUB socket), which can deliver copies of a message to multiple receivers. This approach enables you to attach several receivers that will all get the same messages. If there are no receivers, the messages are discarded (i.e., they are not queued). Do this with:
void *data_socket = zmq_socket(context, ZMQ_PUB);
The socket must be bound to an address so that the clients know where to connect. In this case, use the TCP transport layer (there are other options, but TCP is a good default choice):
const int rb = zmq_bind(data_socket, "tcp://*:5555");

if (rb != 0) {
    printf("ERROR: ZeroMQ error occurred during zmq_bind(): %s\n", zmq_strerror(errno));

    return EXIT_FAILURE;
}
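Next, calculate some useful values that you will need later. Note TOPIC in the code below; messages sent by PUB sockets begin with a topic, which receivers can use to filter messages. The extra byte in envelope_size is for the separator between the topic and the data:
const size_t topic_size = strlen(TOPIC);
const size_t envelope_size = topic_size + 1 + PACKET_SIZE * sizeof(int16_t);

printf("Topic: %s; topic size: %zu; Envelope size: %zu\n", TOPIC, topic_size, envelope_size);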
Sending messages
Start a loop that sends REPETITIONS messages:
for (unsigned int i = 0; i < REPETITIONS; i++) {
    ...
Before sending a message, fill a buffer of PACKET_SIZE values. The library provides signed integers of 16 bits. Since the size of an int in C depends on the platform, use an integer type with a fixed width:
int16_t buffer[PACKET_SIZE];

for (unsigned int j = 0; j < PACKET_SIZE; j++) {
    buffer[j] = fancyhw_read_val();
}

printf("Read %u data values\n", PACKET_SIZE);
The first step in message preparation and delivery is creating a ZeroMQ message and allocating the memory necessary for your message. This empty message is an envelope to store the data you will ship:
zmq_msg_t envelope;

const int rmi = zmq_msg_init_size(&envelope, envelope_size);
if (rmi != 0) {
    printf("ERROR: ZeroMQ error occurred during zmq_msg_init_size(): %s\n", zmq_strerror(errno));

    zmq_msg_close(&envelope);

    break;
}
Now that the memory is allocated, store the data in the ZeroMQ message "envelope." The zmq_msg_data() function returns a pointer to the beginning of the buffer in the envelope. The first part is the topic, followed by a single space that acts as a separator, then the binary data. To move along the buffer, you have to play with casts and pointer arithmetic. (Thank you, C, for making things straightforward.) Do this with:
memcpy(zmq_msg_data(&envelope), TOPIC, topic_size);
memcpy((void*)((char*)zmq_msg_data(&envelope) + topic_size), " ", 1);
memcpy((void*)((char*)zmq_msg_data(&envelope) + 1 + topic_size), buffer, PACKET_SIZE * sizeof(int16_t));
Send the message through the data_socket:
// zmq_msg_send() returns the number of bytes sent as an int, or -1 on failure
const int rs = zmq_msg_send(&envelope, data_socket, 0);
if (rs < 0 || (size_t)rs != envelope_size) {
    printf("ERROR: ZeroMQ error occurred during zmq_msg_send(): %s\n", zmq_strerror(errno));

    zmq_msg_close(&envelope);

    break;
}
Make sure to dispose of the envelope after you use it:
zmq_msg_close(&envelope);
Clean it up
Because C does not provide garbage collection, you have to tidy up. After you are done sending your messages, close the program with the clean-up needed to release the used memory:
const int rc = zmq_close(data_socket);

if (rc != 0) {
    printf("ERROR: ZeroMQ error occurred during zmq_close(): %s\n", zmq_strerror(errno));

    return EXIT_FAILURE;
}

const int rd = zmq_ctx_destroy(context);

if (rd != 0) {
    printf("Error occurred during zmq_ctx_destroy(): %s\n", zmq_strerror(errno));

    return EXIT_FAILURE;
}

return EXIT_SUCCESS;
The entire C program
Save the full interface library below to a local file called hw_interface.c:
// For printf()
#include <stdio.h>
// For EXIT_*
#include <stdlib.h>
// For memcpy() and strlen()
#include <string.h>
// For sleep()
#include <unistd.h>

#include <zmq.h>

#include "libfancyhw.h"

int main(void) {
    const unsigned int INIT_PARAM = 12345;
    const unsigned int REPETITIONS = 10;
    const unsigned int PACKET_SIZE = 16;
    const char *TOPIC = "fancyhw_data";

    fancyhw_init(INIT_PARAM);

    void *context = zmq_ctx_new();

    if (!context) {
        printf("ERROR: ZeroMQ error occurred during zmq_ctx_new(): %s\n", zmq_strerror(errno));

        return EXIT_FAILURE;
    }

    void *data_socket = zmq_socket(context, ZMQ_PUB);

    const int rb = zmq_bind(data_socket, "tcp://*:5555");

    if (rb != 0) {
        printf("ERROR: ZeroMQ error occurred during zmq_bind(): %s\n", zmq_strerror(errno));

        return EXIT_FAILURE;
    }

    const size_t topic_size = strlen(TOPIC);
    const size_t envelope_size = topic_size + 1 + PACKET_SIZE * sizeof(int16_t);

    printf("Topic: %s; topic size: %zu; Envelope size: %zu\n", TOPIC, topic_size, envelope_size);

    for (unsigned int i = 0; i < REPETITIONS; i++) {
        int16_t buffer[PACKET_SIZE];

        for (unsigned int j = 0; j < PACKET_SIZE; j++) {
            buffer[j] = fancyhw_read_val();
        }

        printf("Read %u data values\n", PACKET_SIZE);

        zmq_msg_t envelope;

        const int rmi = zmq_msg_init_size(&envelope, envelope_size);
        if (rmi != 0) {
            printf("ERROR: ZeroMQ error occurred during zmq_msg_init_size(): %s\n", zmq_strerror(errno));

            zmq_msg_close(&envelope);

            break;
        }

        // Layout: topic, one space as separator, then the binary data
        memcpy(zmq_msg_data(&envelope), TOPIC, topic_size);
        memcpy((void*)((char*)zmq_msg_data(&envelope) + topic_size), " ", 1);
        memcpy((void*)((char*)zmq_msg_data(&envelope) + 1 + topic_size), buffer, PACKET_SIZE * sizeof(int16_t));

        // zmq_msg_send() returns the number of bytes sent as an int, or -1 on failure
        const int rs = zmq_msg_send(&envelope, data_socket, 0);
        if (rs < 0 || (size_t)rs != envelope_size) {
            printf("ERROR: ZeroMQ error occurred during zmq_msg_send(): %s\n", zmq_strerror(errno));

            zmq_msg_close(&envelope);

            break;
        }

        zmq_msg_close(&envelope);

        printf("Message sent; i: %u, topic: %s\n", i, TOPIC);

        sleep(1);
    }

    const int rc = zmq_close(data_socket);

    if (rc != 0) {
        printf("ERROR: ZeroMQ error occurred during zmq_close(): %s\n", zmq_strerror(errno));

        return EXIT_FAILURE;
    }

    const int rd = zmq_ctx_destroy(context);

    if (rd != 0) {
        printf("Error occurred during zmq_ctx_destroy(): %s\n", zmq_strerror(errno));

        return EXIT_FAILURE;
    }

    return EXIT_SUCCESS;
}
Compile using the command:
$ clang -std=c99 -I. hw_interface.c -lzmq -o hw_interface
If there are no compilation errors, you can run the interface. What's great is that ZeroMQ PUB sockets can run without any applications sending or retrieving data. That reduces complexity because there is no obligation in terms of which process needs to start first.
Run the interface:
$ ./hw_interface
Topic: fancyhw_data; topic size: 12; Envelope size: 45
Read 16 data values
Message sent; i: 0, topic: fancyhw_data
Read 16 data values
Message sent; i: 1, topic: fancyhw_data
Read 16 data values
...
...
The output shows the data being sent through ZeroMQ. Now you need an application to read the data.
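The envelope size of 45 bytes reported in the output follows from the message layout: 12 bytes of topic, one separator byte, and 16 values of 2 bytes each. As a minimal sketch, the same layout can be rebuilt in Python with the standard struct module. The sample values are made up for illustration and stand in for real readings from fancyhw_read_val():

```python
import struct

TOPIC = b"fancyhw_data"   # same topic as in the C program
PACKET_SIZE = 16          # same packet size as in the C program

# Hypothetical sample values standing in for fancyhw_read_val() results
samples = list(range(-8, 8))

# Topic, one space as separator, then the values as native 16-bit signed integers
payload = struct.pack("{:d}h".format(PACKET_SIZE), *samples)
envelope = TOPIC + b" " + payload

print("Topic size: {:d}; payload size: {:d}; envelope size: {:d}".format(
    len(TOPIC), len(payload), len(envelope)))
```

Knowing this layout is all a receiver needs: everything before the first space is the topic, and everything after it is a sequence of 16-bit integers.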
Write a Python data processor
You are now ready to pass the data from C to a Python application.
Libraries
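You need two libraries to help transfer data. The first is the ZeroMQ bindings for Python, published on PyPI as pyzmq. If you did not install them with your distribution's package manager, use pip:
$ python3 -m pip install pyzmq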
The other is the struct library, which decodes binary data. It is part of the Python standard library, so there's no need to pip install it.
The first part of the Python program imports both of these libraries:
import zmq
import struct
Significant parameters
To use ZeroMQ, you must subscribe to the same topic used in the constant TOPIC above:
topic = "fancyhw_data".encode('ascii')

print("Reading messages with topic: {}".format(topic))
Initialization
Next, initialize the context and the socket. Use a subscribe socket (also known as a SUB socket), which is the natural partner of the PUB socket. The socket also needs to subscribe to the right topic:
with zmq.Context() as context:
    socket = context.socket(zmq.SUB)

    socket.connect("tcp://127.0.0.1:5555")
    socket.setsockopt(zmq.SUBSCRIBE, topic)

    i = 0

    ...
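A detail worth knowing about subscriptions is that ZeroMQ SUB sockets compare the subscription with the beginning of each message, so the filter is a prefix match and not an exact topic comparison. The sketch below mimics that rule in plain Python to show which messages would pass. The matches() helper and the sample messages are hypothetical, and no ZeroMQ code is involved:

```python
def matches(subscription, message):
    """Mimic the prefix rule: a message passes if it starts with the subscription."""
    return message.startswith(subscription)

subscription = "fancyhw_data".encode("ascii")

# Hypothetical messages from different publishers
messages = [
    b"fancyhw_data \x01\x00",
    b"fancyhw_data_raw \x02\x00",
    b"other_topic \x03\x00",
]

for message in messages:
    print(message, matches(subscription, message))
```

Under this rule, a topic that merely starts with fancyhw_data also passes the filter, which is one reason the receiver still splits the topic from the payload and can inspect it after receiving.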
Receiving messages
Start an infinite loop that waits for new messages to be delivered to the SUB socket. The loop will be closed if you press Ctrl+C or if an error occurs:
try:
    while True:

        ... # we will fill this in next

except KeyboardInterrupt:
    socket.close()
except Exception as error:
    print("ERROR: {}".format(error))
    socket.close()
The loop waits for new messages to arrive with the recv() method. Then it splits whatever is received at the first space to separate the topic from the content:
binary_topic, data_buffer = socket.recv().split(b' ', 1)
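The second argument to split() matters here. The payload is binary, so it can contain the byte 0x20, which is also the ASCII code of the space character. A short sketch with a made-up message shows the difference. It uses an explicit little-endian format so the bytes are the same on every machine:

```python
import struct

# Hypothetical message: the topic, a space, then two values packed as
# little-endian 16-bit integers. The value 32 is encoded as b'\x20\x00',
# and 0x20 is the ASCII code of the space character.
message = b"fancyhw_data " + struct.pack("<2h", 32, 7)

# Limiting the split to one occurrence keeps the payload intact
binary_topic, data_buffer = message.split(b" ", 1)

# An unlimited split would also cut the payload at the 0x20 byte
pieces = message.split(b" ")

print(binary_topic, len(data_buffer), len(pieces))
```

With the limit, the payload keeps all four bytes. Without it, the message breaks into three pieces and the data can no longer be decoded.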
Decoding messages
Python does not yet know that the topic is a string, so decode it using the standard ASCII encoding:
topic = binary_topic.decode(encoding='ascii')

print("Message {:d}:".format(i))
print("\ttopic: '{}'".format(topic))
The next step is to read the binary data using the struct library, which can convert shapeless binary blobs to significant values. First, calculate the number of values stored in the packet. This example uses 16-bit signed integers that correspond to an "h" in the struct format:
packet_size = len(data_buffer) // struct.calcsize("h")

print("\tpacket size: {:d}".format(packet_size))
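A bare "h" uses the byte order of the machine running Python. That matches the C sender when both programs run on the same machine, as in this tutorial. If the two ends ever run on machines with different byte orders, the format string needs an explicit prefix ("<" for little-endian, ">" for big-endian). The following sketch, with arbitrary sample values, shows the difference:

```python
import struct
import sys

values = (1, -2, 300)

# Pack the same values with explicit little-endian and big-endian layouts
little = struct.pack("<3h", *values)
big = struct.pack(">3h", *values)

# A bare "3h" follows the byte order of the machine running the interpreter
native = struct.pack("3h", *values)

print("native byte order:", sys.byteorder)
print("native matches little-endian:", native == little)

# Decoding with the wrong byte order silently yields different numbers
wrong = struct.unpack(">3h", little)
print(wrong)
```

No error is raised when the byte order is wrong, so a mismatch only shows up as implausible values.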
By knowing how many values are in the packet, you can define the format by preparing a string with the number of values and their types (e.g., "16h"):
struct_format = "{:d}h".format(packet_size)
Convert that binary blob to a series of numbers that you can immediately print:
data = struct.unpack(struct_format, data_buffer)

print("\tdata: {}".format(data))
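The integer division used to compute the packet size rounds down, so a truncated payload with an odd number of bytes would make struct.unpack() raise struct.error. One possible way to make that failure explicit is to check the length first. The decode_payload() helper below is a hypothetical name used for illustration, not part of the original program:

```python
import struct

def decode_payload(data_buffer):
    """Decode a payload of native 16-bit signed integers into a tuple."""
    item_size = struct.calcsize("h")

    # Reject payloads that do not hold a whole number of values
    if len(data_buffer) % item_size != 0:
        raise ValueError("payload length {:d} is not a multiple of {:d}".format(
            len(data_buffer), item_size))

    packet_size = len(data_buffer) // item_size

    return struct.unpack("{:d}h".format(packet_size), data_buffer)

print(decode_payload(struct.pack("4h", 10, -20, 30, -40)))
```

Whether a malformed packet should stop the receiver or just be skipped depends on the application. The tutorial's receiver stops on any exception.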
The full Python program
Here is the complete data receiver in Python:
#!/usr/bin/env python3

import zmq
import struct

topic = "fancyhw_data".encode('ascii')

print("Reading messages with topic: {}".format(topic))

with zmq.Context() as context:
    socket = context.socket(zmq.SUB)

    socket.connect("tcp://127.0.0.1:5555")
    socket.setsockopt(zmq.SUBSCRIBE, topic)

    i = 0

    try:
        while True:
            binary_topic, data_buffer = socket.recv().split(b' ', 1)

            topic = binary_topic.decode(encoding='ascii')

            print("Message {:d}:".format(i))
            print("\ttopic: '{}'".format(topic))

            packet_size = len(data_buffer) // struct.calcsize("h")

            print("\tpacket size: {:d}".format(packet_size))

            struct_format = "{:d}h".format(packet_size)

            data = struct.unpack(struct_format, data_buffer)

            print("\tdata: {}".format(data))

            i += 1

    except KeyboardInterrupt:
        socket.close()
    except Exception as error:
        print("ERROR: {}".format(error))
        socket.close()
Save it to a file called online_analysis.py. Python does not need to be compiled, so you can run the program as soon as the file is executable (for example, after running chmod +x online_analysis.py).
Here is the output:
$ ./online_analysis.py
Reading messages with topic: b'fancyhw_data'
Message 0:
    topic: 'fancyhw_data'
    packet size: 16
    data: (20946, -23616, 9865, 31416, -15911, -10845, -5332, 25662, 10955, -32501, -18717, -24490, -16511, -28861, 24205, 26568)
Message 1:
    topic: 'fancyhw_data'
    packet size: 16
    data: (12505, 31355, 14083, -19654, -9141, 14532, -25591, 31203, 10428, -25564, -732, -7979, 9529, -27982, 29610, 30475)
...
...