特集12 IPCのメッセージキュー


1.概要
メッセージキューは、カーネルアドレス空間内の内部チェーンテーブルであり、Linuxカーネルを介して各プロセス間でコンテンツが伝達されます.メッセージはメッセージキューに順次送信され、各メッセージキューはIPC識別子で一意に識別することができるいくつかの異なる方法でキューから取得される.カーネル内のメッセージキューはIPCの識別子によって区別され、異なるメッセージキュー間は相対的に独立している.各メッセージキュー内のメッセージは、独立したチェーンテーブルを構成します.
2.メッセージ・バッファ構造
メッセージ・バッファ構造(テンプレート)は、次のようになります.
struct msgbuf
{
long mtype;//メッセージ・タイプ
char mtext[1];//メッセージデータ、このドメインは必ずしもcharまたは長さが1である必要はなく、実際の状況に応じて設定することができる.
};
カスタム・メッセージ・バッファの例:
struct msgbuf
{
long mtype;
char mtext[10];
long length;
};
3.構造msgid_ds
struct msgid_ds
{
struct ipc_perm msg_perm;//キューのライセンス情報の保存
time_t msg_stime;//キューの最後のメッセージを送信するタイムスタンプ.
time_t msg_rtime;//メッセージキューから最後のメッセージのタイムスタンプを取得します.
time_t msg_ctime;//キューを最後に変更するタイムスタンプ.
unsigned long  __msg_cbytes;//キューに存在するバイトの合計数.(つまり、すべてのメッセージサイズの合計)
msgqnum_t msg_qnum;//現在キューにあるメッセージの数
msglen_t msg_qbytes;//キュー内のメッセージに格納されるバイトの最大数
pid_t msg_lspid;//最後のメッセージプロセスを送信するPID
pid_t msg_lrpid;//最後のメッセージプロセスを受信するPID
};
4.構造ipc_perm
struct ipc_perm
{
key_t key;/*関数msgget()で使用するキー値*/
uid_t uid;/*ユーザのUID*/
gid_t gid;/*ユーザのGID*/
uid_t cuid;/*作成者のUID*/
gid_t cgid;/*ブロックを作成するGID*/
unsigned short mode;/*権限*/
unsigned short seq;/*シリアル番号*/
};
5.キー値構築ftok()関数
ftok()関数は、パス名と項目の表示子をシステムVのIPCキー値に変換し、その原型は以下の通りである.
#include
#include
key_t ftok(const char *pathname, int proj_id);
注意:IPC_を使用することもできます.PRIVATEはキー値を生成します.
id = msgget(IPC_PRIVATE, S_IRUSR | S_IWUSR);
6.メッセージキューに関する関数のプロトタイプ
   
int msgget(key_t key, int msgflg);//メッセージキューの作成
int msgsnd(int msqid, const void *msgp, size_t msgsz, int msgflg);//メッセージの送信
msgrcv(int msqid, void *msgq, size_t msgsz, long msgtyp, int msgflg);//メッセージの受信
msgctl(int msgqid, int cmd, struct msqid_ds *buf);//メッセージキュー制御
7.メッセージキュー操作の例
#include
#include
#include
#include
#include
#include
#include
#define BUFSZ 512
struct message
{
long msg_type;
char msg_text[BUFSZ];
};
int main()
{
int qid;
key_t key;
int len;
struct message msg;
//キー値の生成
if((key= ftok(".", 'a')) == -1)
{
perror("ftok");
exit(1);
}
if((qid= msgget(key, IPC_CREAT | 0666)) == -1)
{
perror("msgget");
exit(1);
}
printf("openedqueue %d", qid);
puts("Pleaseenter the message to queue:");
if((fgets(msg.msg_text,BUFSZ, stdin)) == NULL)
{
puts("nomessage");
exit(1);
}
msg.msg_type=getpid();
len= strlen(msg.msg_text);
if(msgsnd(qid,&msg,len,0)) < 0)
{
perror("messageposted");
exit(1);
}
printf("messageis :%s", msg.msg_text);
if(msgctl(qid,IPC_RMID,NULL)) < 0)
{
perror("msgctl");
exit(1);
}
exit(0);
}
コード参照:
Clientはserverにファイルの内容の読み取りを要求する
#include <sys/types.h>
#include <stdlib.h>
#include <sys/msg.h>
#include <sys/stat.h>
#include <stddef.h>     /* For definition of offsetof() */
#include <limits.h>
#include <fcntl.h>
#include <signal.h>
#include <sys/wait.h>
#include <string.h>
#include <errno.h>

#define SERVER_KEY 0x1aaaaaa1     /* Key for server's message queue */

struct requestMsg {                /* Requests (client to server) */
	long mtype;                   /* unused */
	int clientId;                 /* ID of client's message queue */
	char pathname[PATH_MAX];      /* File to be returned */
}; 

#define REQ_MSG_SIZE (offsetof(struct requestMsg, pathname) - \
					offsetof(struct requestMsg, clientId) + PATH_MAX)

#define RESP_MSG_SIZE 8192

struct responseMsg {
	long mtype;
	char data[RESP_MSG_SIZE];
};

/* Types for response messages sent from server to client */
#define RESP_MT_FAILURE 1       /* File couldn't be opened */
#define RESP_MT_DATA    2      /* Message contains file data */
#define RESP_MT_END     3     /* File data complete */

server:
#include "svmsg_file.h"

static void               /* SIGCHLD handler */
grimReaper(int sig)
{
	int savedErrno;
	savedErrno = errno;  /* waitpid() might change 'errno' */
	while (waitpid(-1, NULL, WNOHANG) > 0)
		continue;
	errno = savedErrno;
}

static void
serveRequest(const struct requestMsg *req)  /* Executed in child process: serve a single client */
{
	int fd;
	ssize_t numRead;
	struct responseMsg resp;
	
	fd = open(req->pathname, O_RDONLY);
	if (fd == -1) {                          /* Open failed: send error text */
		resp.mtype = RESP_MT_FAILURE;
		snprintf(resp.data, sizeof(resp.data), "%s", "Couldn't open");
		msgsnd(req->clientId, &resp, strlen(resp.data) + 1, 0);
		exit(EXIT_FAILURE);
	}
	
	resp.mtype = RESP_MT_DATA;
	while ((numRead = read(fd, resp.data, RESP_MSG_SIZE)) > 0)
		if (msgsnd(req->clientId, &resp, numRead, 0) == -1)
			break;
	/* Send a message of type RESP_MT_END to signify end-of-file */
	resp.mtype = RESP_MT_END;
	msgsnd(req->clientId, &resp, 0, 0);    /* Zero-length mtext */
}

int
main(int argc, char *argv[])
{
	struct requestMsg req;
	pid_t pid;
	ssize_t msgLen;
	int serverId;
	struct sigaction sa;
	
	/* Create server message queue */
	serverId = msgget(SERVER_KEY, IPC_CREAT | IPC_EXCL |
						S_IRUSR | S_IWUSR | S_IWGRP);
	if (serverId == -1)
		perror("msgget");
	
	/* Establish SIGCHLD handler to reap terminated children */
	sigemptyset(&sa.sa_mask);
	sa.sa_flags = SA_RESTART;
	sa.sa_handler = grimReaper;
	if (sigaction(SIGCHLD, &sa, NULL) == -1)
		perror("sigaction error!");
	/* Read requests, handle each in a separate child process */
	for (;;) {
		msgLen = msgrcv(serverId, &req, REQ_MSG_SIZE, 0, 0);
		if (msgLen == -1) {
			if (errno == EINTR)
				continue;
			perror("msgrcv error");
			break;
		}
		
		pid = fork(); 
		if (pid == -1) { 
			perror("fork"); 
			break; 
		} 

		if (pid == 0) { 
			serveRequest(&req); 
			_exit(EXIT_SUCCESS); 
		} 
		
	}
	
	/* If msgrcv() or fork() fails, remove server MQ and exit */
	if (msgctl(serverId, IPC_RMID, NULL) == -1)
		perror("msgctl");
	exit(EXIT_SUCCESS);

}

client:
#include "svmsg_file.h"

static int clientId;

static void
removeQueue(void)
{
	if (msgctl(clientId, IPC_RMID, NULL) == -1)
		perror("msgctl");

}

int
main(int argc, char *argv[])
{
	struct requestMsg req;
	struct responseMsg resp;
	int serverId, numMsgs;
	ssize_t msgLen, totBytes;
	
	if (argc != 2 || strcmp(argv[1], "--help") == 0)
		perror("%s pathname
", argv[0]); if (strlen(argv[1]) > sizeof(req.pathname) - 1) perror("pathname too long (max: %ld bytes)
", (long) sizeof(req.pathname) - 1); /* Get server's queue identifier; create queue for response */ serverId = msgget(SERVER_KEY, S_IWUSR); if (serverId == -1) perror("msgget - server message queue"); clientId = msgget(IPC_PRIVATE, S_IRUSR | S_IWUSR | S_IWGRP); if (clientId == -1) perror("msgget - client message queue"); if (atexit(removeQueue) != 0) perror("atexit"); /* Send message asking for file named in argv[1] */ req.mtype = 1; /* Any type will do */ req.clientId = clientId; strncpy(req.pathname, argv[1], sizeof(req.pathname) - 1); req.pathname[sizeof(req.pathname) - 1] = '\0'; if (msgsnd(serverId, &req, REQ_MSG_SIZE, 0) == -1) perror("msgsnd"); /* Get first response, which may be failure notification */ msgLen = msgrcv(clientId, &resp, RESP_MSG_SIZE, 0, 0); if (msgLen == -1) perror("msgrcv"); if (resp.mtype == RESP_MT_FAILURE) { printf("%s
", resp.data); if (msgctl(clientId, IPC_RMID, NULL) == -1) perror("msgctl"); exit(EXIT_FAILURE); } totBytes = msgLen; for (numMsgs = 1; resp.mtype == RESP_MT_DATA; numMsgs++) { msgLen = msgrcv(clientId, &resp, RESP_MSG_SIZE, 0, 0); if (msgLen == -1) perror("msgrcv"); totBytes += msgLen; } printf("Received %ld bytes (%d messages)
", (long) totBytes, numMsgs); exit(EXIT_SUCCESS); }