Wednesday, May 16, 2012

Understanding Message Queues in Linux

Many times we see Inter Process Communication happening in our applications and amongst the big list, one of them is Message Queues.

Here is an attempt to help understand how you can create and use your own message queue.


A Message queue is used to transmit "messages" over the IPC between processes. To achieve this, we first need to 'create' the message queue and then 'send' from one process and 'receive' at the other end.

Lets see how we create a message queue.

mqd_t mq_open(const char *name, int oflag);
mqd_t mq_open(const char *name, int oflag, mode_t mode,
                     struct mq_attr *attr);
Ex:
MQDes = mq_open(MQName, O_RDWR|O_CREAT|O_EXCL, 777, NULL);

Once you have created your message queue and kept it 'open' to write, you are now ready to 'send' your message.

int mq_send(mqd_t mqdes, const char *msg_ptr, size_t msg_len, unsigned msg_prio);

Ex:
ret = mq_send(MQDes, Message, sizeof(Message), MESSAGE_PRIO);

With this, the server is ready to send messages on the queue.

At the client end, we now need to 'receive' the message.

ssize_t mq_receive(mqd_t mqdes, char *msg_ptr, size_t msg_len, unsigned *msg_prio); 

Ex:
NoOfBytesRx = mq_receive(MQDes, Message, (MQStat.mq_msgsize) , MESSAGE_PRIO);

Make sure you have opened the same message queue in read mode in the client.

These three calls will help you accomplish the Message Queue you intended to create. 

You might have noticed the 'MQStat.mq_msgsize' in the mq_receive call. Now where did that come from. 

int mq_getattr(mqd_t mqdes, struct mq_attr *attr); 
int mq_setattr(mqd_t mqdes, struct mq_attr *newattr, struct mq_attr *oldattr);

Ex:
ret = mq_getattr(MQDes, &MQStat);

Once you are done, you can close & unlink the message queue with the below calls.

int mq_close(mqd_t mqdes);
Ex:
ret = mq_close(MQDes);

int mq_unlink(const char *name);
Ex:
ret = mq_unlink(MQName);


You might want the client to receive the message only when there is a notification for a new message waiting on the queue. You can subscribe to this with the notify call.

int mq_notify(mqd_t mqdes, const struct sigevent *sevp);

Ex:
SIGNAL.sigev_notify = SIGEV_THREAD;
SIGNAL.sigev_notify_function = RxMessage;
SIGNAL.sigev_notify_attributes = NULL;
ret = mq_notify(MQDes, &SIGNAL);


We are now familiar with all the calls required to make a message queue.

With this information you should be able to successfully write your first Message Queue and test.

Here are some possible caveats that you might fall into.

  • When you create a new Queue, make sure you use the correct prototype. You need to provide the mode & attributes. If you give NULL as the attribute, you accept the default attributes.
  • Make sure you add the "\" in the queue name. ex:"/MQServer"
  • In the mq_receive call, you might run into issues with the message size. It is always advisable to do a 'getattr' and accordingly set the size for receiving. 
  • Unless otherwise stated, the message queue will be in Blocking mode. If you need it to be non-blocking use the O_NONBLOCK flag during open/create.
  • You need to register mq_notify for every message that you receive. Otherwise, after processing the first message, the notification will stop.
  • Note that once the message is consumed from the queue, it is no longer available in the queue and there is no way of reading it back.

Sample Code
Server :

Client:


 
 


No comments:

Post a Comment