Linux IPCプロセス間通信_メッセージ伝達.カーネルソース分析

13995 ワード

Linux IPCメッセージキューの分析
——Linuxソース解読報告
注:転載自己:http://zhoutall.com/archives/383
Linuxのソースコードを確認するサイトを紹介します.http://lxr.free-electrons.com/ident
一.Linux IPC概要
Linux IPCは主に4つの部分を含みます.最初のUNIX IPC、System V IPC、Posix IPC、およびsocketに基づくIPC.
Linux IPC进程间通信_消息传递_内核源码分析_第1张图片
プロセス間通信には主に以下のような形式があります.信号signalパイプラインpipeによるFIFO信号量Semaphores共有メモリshared memoryメッセージキューmessageソケットsocket
二.Linux IPCニュースキューのソースコード分析
A.ファイルディレクトリ構造include/linux/msg.h
ipc/msg.c
ipc/msguttil.c
B.主要データ構造分析
msg_msg、メッセージの基本データ構造、各msg_msgは、1つのpageのコンテンツを占有し、そのうちの1つは、構造体を格納することに加えて、残りの部分はメッセージの内容を直接記憶するために使用される.ここでは、メッセージのtypeとsizeが記録されています.nextポインタはmsg_を指すために使用されます.msgseg構造
メッセージ・キューの設計では、メッセージの内容が一つのpageより大きいと、チェーンのような構造を使用するが、後のノードはtypeやsizeなどのデータをマークする必要がなく、後のノードはmsg_を使用する.msgsegを表示します
/* one msg_msg structure for each message */
struct msg_msg {
    struct list_head m_list;
    long  m_type;
    int m_ts;           /* message text size */
    struct msg_msgseg* next;
    void *security;
    /* the actual message follows immediately */
};
msg_msgsegは次のチェーンブロックを指すポインタだけを記憶すればいいです.各msg_msgsegも一つのpageの空間を占有します.一つのpageはこの構造体を記憶する以外に、残りの部分はmessageのデータを記憶するために使います.
struct msg_msgseg {
    struct msg_msgseg* next;
    /* the next part of the message follows immediately */
};
メッセージキューmsg_queue
のデータ構造には、message、receiver、sender列のチェーンポインタが含まれ、他の関連データも含まれています.
/* one msq_queue structure for each present queue on the system */
struct msg_queue {
    struct kern_ipc_perm q_perm;
    time_t q_stime;         /* last msgsnd time */
    time_t q_rtime;         /* last msgrcv time */
    time_t q_ctime;         /* last change time */
    unsigned long q_cbytes;     /* current number of bytes on queue */
    unsigned long q_qnum;       /* number of messages in queue */
    unsigned long q_qbytes;     /* max number of bytes on queue */
    pid_t q_lspid;          /* pid of last msgsnd */
    pid_t q_lrpid;          /* last receive pid */
 
    struct list_head q_messages;
    struct list_head q_receivers;
    struct list_head q_senders;
};
msqidudsはmsg_ですqueue上msg_メッセージキューのデータ構造
struct msqid_ds {
    struct ipc_perm msg_perm;
    struct msg *msg_first;      /* first message on queue,unused  */
    struct msg *msg_last;       /* last message in queue,unused */
    __kernel_time_t msg_stime;  /* last msgsnd time */
    __kernel_time_t msg_rtime;  /* last msgrcv time */
    __kernel_time_t msg_ctime;  /* last change time */
    unsigned long  msg_lcbytes; /* Reuse junk fields for 32 bit */
    unsigned long  msg_lqbytes; /* ditto */
    unsigned short msg_cbytes;  /* current number of bytes on queue */
    unsigned short msg_qnum;    /* number of messages in queue */
    unsigned short msg_qbytes;  /* max number of bytes on queue */
    __kernel_ipc_pid_t msg_lspid;   /* pid of last msgsnd */
    __kernel_ipc_pid_t msg_lrpid;   /* last receive pid */
};
キューで眠るreceiverデータ構造
/* one msg_receiver structure for each sleeping receiver */
struct msg_receiver {
    struct list_head    r_list;
    struct task_struct  *r_tsk; //     
 
    int         r_mode;
    long            r_msgtype;
    long            r_maxsize;
 
    struct msg_msg      *volatile r_msg; //            
};
列の上で眠るsenderのデータ構造
/* one msg_sender for each sleeping sender */
struct msg_sender {
    struct list_head    list;
    struct task_struct  *tsk; //     
};
C.ツール関数分析
ロード.msgは、メッセージを送る時に関数を簡略化して次のように分析します.
struct msg_msg *load_msg(const void __user *src, int len)
{
    struct msg_msg *msg;
    struct msg_msgseg **pseg;
    int alen;
  // #define DATALEN_MSG (PAGE_SIZE-sizeof(struct msg_msg))
  // DATALEN_MSG   page  msg_msg      ,     ,    page            
    //          page   
  alen = len;
    if (alen > DATALEN_MSG)
        alen = DATALEN_MSG;
  //kmalloc msg    
    msg = kmalloc(sizeof(*msg) + alen, GFP_KERNEL);
  //            ,      msg                 ,  msg+1       page      msg      
    copy_from_user(msg + 1, src, alen)
    //      msg       page    ,      page,     page    msg_msgseg   
  len -= alen;
    src = ((char __user *)src) + alen;
    pseg = &msg->next;
  //while  ,      page        text
    while (len > 0) {
        struct msg_msgseg *seg;
  //            ,         msg_msgseg
        alen = len;
        if (alen > DATALEN_SEG)
            alen = DATALEN_SEG;
        seg = kmalloc(sizeof(*seg) + alen, GFP_KERNEL);
        *pseg = seg;
        seg->next = NULL;
  //   copy_from_user   
        copy_from_user(seg + 1, src, alen)
        pseg = &seg->next;
        len -= alen;
        src = ((char __user *)src) + alen;
    }
  //     msg_msg   
  //       error(        ),    free_msg(msg)         ,        
    return msg;
}
storemsgは、メッセージを受信する時に使用し、主なロジックはロード(u)に類似しています.msg
簡略化されたコードは以下の通りです.
int store_msg(void __user *dest, struct msg_msg *msg, int len)
{
    int alen;
    struct msg_msgseg *seg;
 
    alen = len;
    if (alen > DATALEN_MSG)
        alen = DATALEN_MSG;
  //   copy_from_user,  copy_to_user                    ,         
    copy_to_user(dest, msg + 1, alen)
    len -= alen;
    dest = ((char __user *)dest) + alen;
    seg = msg->next;
    while (len > 0) {
        alen = len;
        if (alen > DATALEN_SEG)
            alen = DATALEN_SEG;
        copy_to_user(dest, seg + 1, alen)
        len -= alen;
        dest = ((char __user *)dest) + alen;
        seg = seg->next;
    }
    return 0;
}
free_msg、前にもう一度ロードを検討します.msgの時にfree_に言及しました.msgの作業は、エラーが発生したときに、msg構造に割り当てられているPagesを解放するために使用されます.
void free_msg(struct msg_msg *msg)
{
    struct msg_msgseg *seg;
  seg = msg->next;
  //kfree kmalloc     ,    msg        free,    next      NULL
    kfree(msg);
    while (seg != NULL) {
        struct msg_msgseg *tmp = seg->next;
        kfree(seg);
        seg = tmp;
    }
}
pipeline_sendでは、メッセージを送るときに呼び出します.条件は、送信プロセスが送信されるときに、たまたまプロセスを受信し、受信する準備ができていることです.
static inline int pipelined_send(struct msg_queue *msq, struct msg_msg *msg)
{
    struct list_head *tmp;
 
  //     receiver    receiver
    tmp = msq->q_receivers.next;
    while (tmp != &msq->q_receivers) {
        struct msg_receiver *msr;
  //  msg_receiver     
        msr = list_entry(tmp, struct msg_receiver, r_list);
        tmp = tmp->next;
  //testmsg     type mode                    
        if (testmsg(msg, msr->r_msgtype, msr->r_mode) &&
            !security_msg_queue_msgrcv(msq, msg, msr->r_tsk,
                           msr->r_msgtype, msr->r_mode)) {
 
  //        receiver
            list_del(&msr->r_list);
  //           ,        
            if (msr->r_maxsize < msg->m_ts) {
                msr->r_msg = NULL;
                wake_up_process(msr->r_tsk);
  //      ,         
                smp_mb();
                msr->r_msg = ERR_PTR(-E2BIG);
            } else {
  //      
                msr->r_msg = NULL;
  //msg_queue       
  //  last receieved pid
                msq->q_lrpid = task_pid_vnr(msr->r_tsk);
                //  last msgrcv time
  msq->q_rtime = get_seconds();
  //      
                wake_up_process(msr->r_tsk);
                smp_mb();
  //      
                msr->r_msg = msg;
                return 1;
            }
        }
    }
    return 0;
}
D.他のいくつかのツール関数は新しいmsg_を作成します.queue、主にいくつかの初期化操作を実行します.
static int newque(struct ipccance*ns、struct ipccas*params)
sender列にsender static inline void ss_を追加します.add(struct msguqueue*msq、struct msgusender*mss)
senderを削除して、senderの構造体を観察して、mss->listがこのsenderのlist上のノードstatic inline void ss_であることが分かります.del(struct msgusender*mss)
sender static void ss_を呼び覚まします.wakeup(struct listuh*h,int kill)
receiver static void expune_を呼び覚まします.all(struct msguqueue*msq,int res)
明晰msg_queue、回収キューの空間は主にmsg、sender、receiverの三つの列を一つのノードfreeにスタティックvoid freeque(struct ipcmunamespace*ns、struct kernupcccu*ipcp)がないです.
メッセージを受信する際に呼び出し、メッセージのtypeやmodeなどの情報で判断するとstatic int testmsg(stuct msgumg*msg、long type、int mode)を受信します.
メッセージマッチングのモードを設定し、msgrcvで呼び出し、取得したmodeはtestmsgでstatic inline int convert_を使用します.mode(long*msgtyp,int msgflig)
E.主要なシステム呼び出し
msggaetメッセージキューの取得または作成
SYSCALL_DEFINE2(msgget, key_t, key, int, msgflg)
{
    struct ipc_namespace *ns;
    struct ipc_ops msg_ops;
  struct ipc_params msg_params;
  //       
    ns = current->nsproxy->ipc_ns;
    msg_ops.getnew = newque;
    msg_ops.associate = msg_security;
    msg_ops.more_checks = NULL;
 
    msg_params.key = key;
    msg_params.flg = msgflg;
    ipcget  ,ipc      
    return ipcget(ns, &msg_ids(ns), &msg_ops, &msg_params);
}
msgsnd送信メッセージ
関数実行フロー:
現在のプロセスのipc namespaceを取得します.
ロード.msgはmsg構造を構築し、senderのメッセージ内容を保存し、メッセージはユーザー状態からカーネル状態に移行する.
msgのtypeとsizeを初期化します.
ipc権限チェック
判断msg_queueは十分な空間がありますか?新しいmsgを入れて、下に入れて直接pipelineにジャンプします.send
queueがいっぱいになる時は待ってください.flagGがIPC_かどうか確認してください.NOWAIT、そうですか?
ss_を呼び出しますadd,このsenderをsender列に追加します.
スケジュール待ちプロセスを呼び出します.
last msgsnd time、last msgsdpidを設定してpipelineを呼び出します.sendは直接プロセスを待ってメッセージを送ります.
直接送信に失敗した場合は、作成したmsgをメッセージ待ちキューに入れ、msg gageを修正します.queue対応変数
最後にfree_msg、解放空間
簡略化されたコード:
//       
long do_msgsnd(int msqid, long mtype, void __user *mtext, size_t msgsz, int msgflg)
{
    struct msg_queue *msq;
    struct msg_msg *msg;
    struct ipc_namespace *ns;
    ns = current->nsproxy->ipc_ns;
  //    load_msg
    msg = load_msg(mtext, msgsz);
    msg->m_type = mtype;
    msg->m_ts = msgsz;
    msq->q_lspid = task_tgid_vnr(current);
    msq->q_stime = get_seconds();
    if (!pipelined_send(msq, msg)) {
        /* noone is waiting for this message, enqueue it */
        list_add_tail(&msg->m_list, &msq->q_messages);
        msq->q_cbytes += msgsz;
        msq->q_qnum++;
        atomic_add(msgsz, &ns->msg_bytes);
        atomic_inc(&ns->msg_hdrs);
    }
    msg = NULL;
    if (msg != NULL)
        free_msg(msg);
  return 0;
}
//      
SYSCALL_DEFINE4(msgsnd, int, msqid, struct msgbuf __user *, msgp, size_t, msgsz, int, msgflg)
{
    long mtype;
 
    if (get_user(mtype, &msgp->mtype))
        return -EFAULT;
    return do_msgsnd(msqid, mtype, msgp->mtext, msgsz, msgflg);
}
msgrcv受信メッセージ
関数実行フロー:
convert_.modeは検索モードを設定し、ipcのnamespaceを取得します.
ipc権限チェック
遍歴するメッセージは、testmsgを使用して、要求に合致するmsgが存在するかどうかを確認し、直接設定された検索モードでマッチングする.
適切なメッセージを見つけて、そのmsgをリストから削除し、msg_を設定します.queue関連パラメータは、ss_を呼び出します.wakeupはsenderを呼び覚まして、storeにジャンプします.msg処
storeを使うmsgはメッセージをユーザ空間にコピーします.
free_msg、リリース操作
もし前に遍歴したらメッセージは適切なメッセージが見つからず、IPCを設定しているかどうかを判断します.NOWAIT、設定したらエラーです.
このreceiverをreceiversキューに追加します.
スケジュールを呼び出す
senderがmsgsndを呼び出してpipelineを行うまで待つ.send
簡略化されたコード:
//       
long do_msgrcv(int msqid, long *pmtype, void __user *mtext, size_t msgsz, long msgtyp, int msgflg)
{
    struct msg_queue *msq;
    struct msg_msg *msg;
    int mode;
    struct ipc_namespace *ns;
    mode = convert_mode(&msgtyp, msgflg);
    ns = current->nsproxy->ipc_ns;
    for (;;) {
        struct msg_receiver msr_d;
        struct list_head *tmp;
        tmp = msq->q_messages.next;
        while (tmp != &msq->q_messages) {
            struct msg_msg *walk_msg;
            walk_msg = list_entry(tmp, struct msg_msg, m_list);
            if (testmsg(walk_msg, msgtyp, mode)) {
                msg = walk_msg;
                if (mode == SEARCH_LESSEQUAL && walk_msg->m_type != 1) {
                    msg = walk_msg;
                    msgtyp = walk_msg->m_type - 1;
                } else {
                    msg = walk_msg;
                    break;
                }
            }
            tmp = tmp->next;
        }
        if (!IS_ERR(msg)) {
            //     message   
            list_del(&msg->m_list);
            msq->q_qnum--;
            msq->q_rtime = get_seconds();
            msq->q_lrpid = task_tgid_vnr(current);
            msq->q_cbytes -= msg->m_ts;
            atomic_sub(msg->m_ts, &ns->msg_bytes);
            atomic_dec(&ns->msg_hdrs);
            ss_wakeup(&msq->q_senders, 0);
            msg_unlock(msq);
            break;
        }
        //      message,receiver      , receiver    msg_queue  receiver  
        list_add_tail(&msr_d.r_list, &msq->q_receivers);
        msr_d.r_tsk = current;
        msr_d.r_msgtype = msgtyp;
        msr_d.r_mode = mode;
        msr_d.r_maxsize = msgsz;
        current->state = TASK_INTERRUPTIBLE;
        schedule();
  ....//                 
        list_del(&msr_d.r_list);
        if (signal_pending(current)) {
            break;
        }
    }
    msgsz = (msgsz > msg->m_ts) ? msg->m_ts : msgsz;
    *pmtype = msg->m_type;
    //    store_msg
  store_msg(mtext, msg, msgsz)
    free_msg(msg);
    return msgsz;
}
//      
SYSCALL_DEFINE5(msgrcv, int, msqid, struct msgbuf __user *, msgp, size_t, msgsz, long, msgtyp, int, msgflg)
{
    long mtype;
    do_msgrcv(msqid, &mtype, msgp->mtext, msgsz, msgtyp, msgflg);
    if (put_user(mtype, &msgp->mtype))
        return -EFAULT;
    return 0;
}
三.IPCメッセージキューのまとめ
メッセージ・キューは、Posixメッセージ・キューsystem Vメッセージ・キューを含むメッセージのリンク・テーブルである.十分な権限を持つプロセスは、メッセージをキューに追加し、読み取り権限を付与されたプロセスは、キューのメッセージを読むことができます.メッセージ・キューは、信号伝達情報が少なく、パイプラインは、フォーマットなしのバイト・ストリームおよびバッファサイズ制限などの欠点を運ぶだけであることを克服する.