skynetソース分析(8)--skynetのネットワーク

8804 ワード

作成者:[email protected]、転載は著者のネットワーク部分がサーバーの最も基礎的な最も核心的な部分であることを明記してください.この技術もすでに非常に成熟しており、今では自分でネットワーク関連のライブラリを実現する人は少ないです.skynetのネットワークライブラリは自分で実現します.
ネットワークの下位層の技術はwindows上で完了ポート(IOCP)、linux上でEPOLL、mac/freebsd上でkqueueである.これらの技術は、高負荷高同時ネットワーク要求を負荷することができる.IOCPもEPOLLも非同期IOで、kqueueはよく知らないので、コメントできません.
実は雲風はepollとkqueueしか実現していませんが、windowsの変種は自分で検索してください.
epollとkqueueの実装はそれぞれskynet_epoll.hとepoll_kqueue.hの中.epollの関数は実はepoll_ですcreate/epoll_ctl/epoll_del/epoll_waitのいくつか、注意しなければならないのはskynetのepollです.createのパラメータは1024です.だから接続数が行かないとここが制限されている可能性が高いです.
skynetはskynet_poll.hにはプラットフォームによって異なるヘッダファイルが含まれており,プラットフォーム依存性を遮断している.そしてsocket_server.cではネットワークサービスの論理を実現した.
そしてskynetはskynet_socket.c中対socket_server.cの論理は再びカプセル化され,connect/send/closeのようなsocketクライアント関連の関数も追加された.
lua層のsocketの使用を容易にするためにlua-socket.cでskynet_socket.cは一度パッケージ化された.このパッケージはc言語層とlua言語層の相互変換である.現在はtcpとudpのみがサポートされており、tcp上のhttp/websocketなどに基づいてはサポートされていません.
前にsocketには個別のスレッドがあります.このスレッドのコードは次のとおりです.
static void *
thread_socket(void *p) {
    struct monitor * m = p;
    skynet_initthread(THREAD_SOCKET);
    for (;;) {
        int r = skynet_socket_poll(); // 
        if (r==0)
            break;
        if (r<0) {
            CHECK_ABORT
            continue;
        }
        wakeup(m,0);
    }
    return NULL;
}

socketスレッドはskynet_を呼び出し続けます.socket_Poll、これは普通のネットワークサーバの書き方と同じです.通常のネットワークサーバもsocketを作成し、socketをバインドし、epollに追加し、epoll_waitはイベントの発生を待つ.
skynetの接続にはステータスフローのプロセスがあり、ステータスマシンと理解できます.
#define SOCKET_TYPE_LISTEN 3  // 
#define SOCKET_TYPE_CONNECTING 4 // 
#define SOCKET_TYPE_CONNECTED 5 // 
#define SOCKET_TYPE_HALFCLOSE 6 // , 
#define SOCKET_TYPE_PACCEPT 7 // 
#define SOCKET_TYPE_BIND 8 // 

もう少しsocketを見てみましょうserver.cこのファイルは、多くのことをして、コードの量も比較的に大きくて、1800行以上あります.このファイルは詳しく言えば、1つは紙面が特に大きく、2つは大部分がネットワーク操作であり、skynet自体とはあまり関連性がない.だから詳しくは言わないで、私が重要だと思うものを選んで話すだけです.この記事では、主にネットワークの制御コマンドについて説明します.
 , , , 
/*
    The first byte is TYPE

    S Start socket
    B Bind socket
    L Listen socket
    K Close socket
    O Connect to (Open)
    X Exit
    D Send package (high)
    P Send package (low)
    A Send UDP package
    T Set opt
    U Create UDP socket
    C set udp address
 */
// , 
struct request_package {
    uint8_t header[8];  // 6 bytes dummy
    union {
        char buffer[256];
        struct request_open open;
        struct request_send send;
        struct request_send_udp send_udp;
        struct request_close close;
        struct request_listen listen;
        struct request_bind bind;
        struct request_start start;
        struct request_setopt setopt;
        struct request_udp udp;
        struct request_setudp set_udp;
    } u;
    uint8_t dummy[256];
};
 
// return type
static int
ctrl_cmd(struct socket_server *ss, struct socket_message *result) {
    int fd = ss->recvctrl_fd;
    // the length of message is one byte, so 256+8 buffer size is enough.
    uint8_t buffer[256];
    uint8_t header[2];
    block_readpipe(fd, header, sizeof(header));
    int type = header[0];
    int len = header[1];
    block_readpipe(fd, buffer, len);
    // ctrl command only exist in local fd, so don't worry about endian.
    switch (type) {
    case 'S':
        return start_socket(ss,(struct request_start *)buffer, result);
    case 'B':
        return bind_socket(ss,(struct request_bind *)buffer, result);
    case 'L':
        return listen_socket(ss,(struct request_listen *)buffer, result);
    case 'K':
        return close_socket(ss,(struct request_close *)buffer, result);
    case 'O':
        return open_socket(ss, (struct request_open *)buffer, result);  // connect
    case 'X':
        result->opaque = 0;
        result->id = 0;
        result->ud = 0;
        result->data = NULL;
        return SOCKET_EXIT;
    case 'D':
        return send_socket(ss, (struct request_send *)buffer, result, PRIORITY_HIGH, NULL);
    case 'P':
        return send_socket(ss, (struct request_send *)buffer, result, PRIORITY_LOW, NULL);
    case 'A': {
        struct request_send_udp * rsu = (struct request_send_udp *)buffer;
        return send_socket(ss, &rsu->send, result, PRIORITY_HIGH, rsu->address);
    }
    case 'C':
        return set_udp_address(ss, (struct request_setudp *)buffer, result);
    case 'T':
        setopt_socket(ss, (struct request_setopt *)buffer);
        return -1;
    case 'U':
        add_udp_socket(ss, (struct request_udp *)buffer);
        return -1;
    default:
        fprintf(stderr, "socket-server: Unknown ctrl %c.
",type); return -1; }; return -1; }

「O」を例にこのプロセスを分析すると、skynetはメッセージに基づいており、これはその設計理念である.だからsocketの操作も、メッセージに基づいています.どうやって作ったのか、まずrecvctrlを作成します.fd、recvctrlにメッセージを送信します.fd.そして毎回socket_server_pollが呼び出されると、このrecvctrlがselectされます.fd、メッセージがあるかどうか見てください.ある場合は上のcrl_を呼び出しますcmd関数.
プロセスは大体はっきりしていますが、今から「O」というコマンド、つまり接続を開くのはどのようなプロセスなのかを見てみましょう.1.lua層のconnect関数はlconnect関数に対応し、
static int
lconnect(lua_State *L) {
    size_t sz = 0;
    const char * addr = luaL_checklstring(L,1,&sz);
    char tmp[sz];
    int port = 0;
    const char * host = address_port(L, tmp, addr, 2, &port);
    if (port == 0) {
        return luaL_error(L, "Invalid port");
    }
    struct skynet_context * ctx = lua_touserdata(L, lua_upvalueindex(1));
    int id = skynet_socket_connect(ctx, host, port);  // Lua c c 
    lua_pushinteger(L, id);

    return 1;
}


2.lconnect関数がskynetを呼び出しました_socket_connect関数.
int 
skynet_socket_connect(struct skynet_context *ctx, const char *host, int port) {
    uint32_t source = skynet_context_handle(ctx);
    return socket_server_connect(SOCKET_SERVER, source, host, port);
}

3.skynet_socket_connect関数はまたsocketを調整しましたserver_connect
int 
socket_server_connect(struct socket_server *ss, uintptr_t opaque, const char * addr, int port) {
    struct request_package request;
    int len = open_request(ss, &request, opaque, addr, port);
    if (len < 0)
        return -1;
    send_request(ss, &request, 'O', sizeof(request.u.open) + len); // 'O’
    return request.u.open.id;
}

4.socket_server_connectはsendという名前を呼び出しましたrequestの関数
static void
send_request(struct socket_server *ss, struct request_package *request, char type, int len) {
    request->header[6] = (uint8_t)type;
    request->header[7] = (uint8_t)len;
    for (;;) {
// write ss->send_ctrl_fd
        ssize_t n = write(ss->sendctrl_fd, &request->header[6], len+2);
        if (n<0) {
            if (errno != EINTR) {
                fprintf(stderr, "socket-server : send ctrl command error %s.
", strerror(errno)); } continue; } assert(n == len+2); return; } }

この関数以降、sendctrl_にデータが送信されることが明らかになった.fdという記述子に載っています.5.send_からrequestを見るとsendctrlにデータが届いているのにfdのデータはrecvctrlから取得されますfdで取ったのは、明らかに合わない.これはいったいどういうことですか.はい、確かにそうです.まだコードを見ていないと、これは確かに説明が通じません.
    int fd[2];
    poll_fd efd = sp_create();
    if (sp_invalid(efd)) {
        fprintf(stderr, "socket-server: create event pool failed.
"); return NULL; } if (pipe(fd)) { // ,fd[0] ,fd[1] sp_release(efd); fprintf(stderr, "socket-server: create socket pair failed.
"); return NULL; } if (sp_add(efd, fd[0], NULL)) { // add recvctrl_fd to event poll fprintf(stderr, "socket-server: can't add server fd to event pool.
"); close(fd[0]); close(fd[1]); sp_release(efd); return NULL; } struct socket_server *ss = MALLOC(sizeof(*ss)); ss->event_fd = efd; ss->recvctrl_fd = fd[0]; // ss->sendctrl_fd = fd[1]; // ss->checkctrl = 1;

6.上の数行のコードはsendctrl_を説明しています.fdはパイプの書き込み端、recvctrl_fdはパイプの読み取り端であり,上記5の疑問を説明した.パイプ書き込み端のデータが読み取り端に届くからです.だからsendctrlからfdが書き込まれ、recvctrlに着きます.fdの中.
7.socket_server_Pollでselect関数チェックrecvctrl_fd,メッセージがあればcase制御文に入り,'O',open_を呼び出すsocket、この関数では、おなじみのconnect関数が呼び出されます.ターゲットipとポートに接続する接続を開きます.
なぜ1つのネットワークを操作するのにこんなに苦労するのか、周りを回るのは非常に直感的ではありません.skynetはメッセージベースであり、各サービスにはmonitorがあるため、各メッセージを処理するときはできるだけ短くしなければならない.そうすれば、サービス内の他の要求をブロックしない.connectという明らかにブロックであり、もちろん非ブロックと書くこともできますが、非ブロックでは、非ブロックは実際にselect技術に基づいて実現されているため、絶えず掛ける必要があります.どんどん掛けると、これは面倒で、書くのが苦しくて間違いやすいです.そのため、クラウド風はこれらをネットワークスレッドに配置し、作業スレッドに影響を与えません.しかし、ネットワークスレッドがブロックされる可能性があり、ネットワークスレッドがブロックされるとサービスが応答しないという欠点もあります.あるいは大量のパケット蓄積をもたらし,ピークを引き起こす.