ネットゲームメモリデータベースの設計(2)


続き第1編では、先日コアストレージをいくつか修正しました.以前はリレーショナル・データベースの行と表に対応するつもりでしたが、valueタイプはarrayまたはlistしか使用できませんでした.
データベースをより汎用化するためにvalueがサポートするタイプにも7種類の基本タイプを追加する.
もちろん、これは本文の核心ではなく、本編では主にテストフロントエンドとテストのリモートコールプロトコルを紹介する.
テストフロントエンドのサーバコードを貼り付けます.
#include "netservice.h"
#include "msg_loop.h"
#include "datasocket.h"
#include "SysTime.h"
#include "db_protocal.h"

atomic_32_t wpacket_count = 0;
atomic_32_t rpacket_count = 0;
atomic_32_t buf_count = 0;

global_table_t gtb;
void server_process_packet(datasocket_t s,rpacket_t r)
{
    // 
    cache_protocal_t p;
    uint32_t coro_id = rpacket_read_uint32(r);
    uint8_t type = rpacket_read_uint8(r);
    switch(type)
    {
        case CACHE_GET:
            p = create_get();
            break;
        case CACHE_SET:
            p = create_set();
            break;
        case CACHE_DEL:
            p = create_del();
            break;            
    }
    wpacket_t ret = p->execute(gtb,r,coro_id);
    if(NULL != ret)
        data_send(s,ret);
    destroy_protocal(&p);
}

void process_new_connection(datasocket_t s)
{
    printf("w:%u,r:%u,b:%u
",wpacket_count,rpacket_count,buf_count); } void process_connection_disconnect(datasocket_t s,int32_t reason) { release_datasocket(&s); printf("w:%u,r:%u,b:%u
",wpacket_count,rpacket_count,buf_count); } void process_send_block(datasocket_t s) { // , close_datasocket(s); } const char *ip; uint32_t port; int main(int argc,char **argv) { init_net_service(); ip = argv[1]; port = atoi(argv[2]); netservice_t n = create_net_service(1); gtb = global_table_create(65536); int32_t i = 0; char key[64]; for( ; i < 1000000; ++i) { basetype_t a = basetype_create_int32(i); snprintf(key,64,"test%d",i); a = global_table_insert(gtb,key,a,global_hash(key)); if(!a) printf("error 1
"); basetype_release(&a); } net_add_listener(n,ip,port); msg_loop_t m = create_msg_loop(server_process_packet,process_new_connection,process_connection_disconnect,process_send_block); while(1) { msg_loop_once(m,n,100); } return 0; }

フロントエンドのネットワークモジュールは、前編で紹介するネットワークフレームワークを用いる、起動時にまず100 W本の32ビット型のレコードを挿入し、その後メッセージサイクルに入り、クライアントからの操作要求を絶えず処理する.
取得:CACHE_GET;追加/変更:CACHE_SET;削除さくじょ:CACHE_DEL.
サーバはプロトコルを処理し、結果をクライアントに返す.
次に、クライアントをテストします.
#include "db_protocal.h"
#include "dbtype.h"
#include <stdio.h>
#include "SocketWrapper.h"
#include "SysTime.h"
#include "KendyNet.h"
#include "Connector.h"
#include "Connection.h"
#include "common_define.h"
#include "netservice.h"
#include "msg_loop.h"
#include "co_sche.h"

sche_t g_sche = NULL;
uint32_t call_count = 0;

atomic_32_t wpacket_count = 0;
atomic_32_t rpacket_count = 0;
atomic_32_t buf_count = 0;
datasocket_t db_s;

int8_t test_select(const char *key,int32_t i)
{
    coro_t co = get_current_coro();
    wpacket_t wpk = get_wpacket(64);
    wpacket_write_uint32(wpk,(int32_t)co);
    wpacket_write_uint8(wpk,CACHE_GET);//ÉèÖÃ
    wpacket_write_string(wpk,key);
    data_send(db_s,wpk);
    coro_block(co);
    int8_t ret = rpacket_read_uint8(co->rpc_response);
    rpacket_read_uint8(co->rpc_response);
    int32_t val = rpacket_read_uint32(co->rpc_response);
    if(val != i)
        printf("error
"); //printf("begin
");
rpacket_destroy(&co->rpc_response); //printf("end
");
return ret; } void *test_coro_fun2(void *arg) { coro_t co = get_current_coro(); while(1) { char key[64]; int32_t i = rand()%1000000; snprintf(key,64,"test%d",100); if(0 == test_select(key,100)) ++call_count; } } void server_process_packet(datasocket_t s,rpacket_t r) { coro_t co = (coro_t)rpacket_read_uint32(r); co->rpc_response = rpacket_create_by_rpacket(r); coro_wakeup(co); } void process_new_connection(datasocket_t s) { printf("connect server
"); db_s = s; g_sche = sche_create(20000,65536,NULL,NULL); int i = 0; for(; i < 20000; ++i) { sche_spawn(g_sche,test_coro_fun2,NULL); } } void process_connection_disconnect(datasocket_t s,int32_t reason) { release_datasocket(&s); } void process_send_block(datasocket_t s) { //·¢ËÍ×èÈû,Ö±½Ó¹Ø±Õ close_datasocket(s); } int main(int argc,char **argv) { init_net_service(); const char *ip = argv[1]; uint32_t port = atoi(argv[2]); netservice_t n = create_net_service(1); net_connect(n,ip,port); msg_loop_t m = create_msg_loop(server_process_packet,process_new_connection,process_connection_disconnect,process_send_block); uint32_t tick = GetSystemMs(); while(1) { msg_loop_once(m,n,1); uint32_t now = GetSystemMs(); if(now - tick > 1000) { printf("call_count:%u
",(call_count*1000)/(now-tick)); tick = now; call_count = 0; } if(g_sche) sche_schedule(g_sche); } return 0; }

操作インタフェースはユーザーレベルスレッドを使用して実装され、同期呼び出しインタフェースをサポートします.ユーザーレベルスレッドは要求を発行すると、結果が戻るまで自分をブロックします.
キーセクションはtest_selectは、自分のcoroアドレスをidとしてプロトコルにパッケージし、サーバに送信し、coro_を呼び出します.ブロックブロックブロック.サーバが返すパケット
にも対応したcoro_が付いていましたidは、クライアントのスケジューリングシステムがどのcoroを起動すべきかを通知する.coroが起動する結果パケットから操作結果とデータを読み出し、上位呼び出し者に返す.
テスト結果から,1 W個のcoroクライアントを起動し,1秒間に平均50 W回の操作が可能となった.万人オンラインのMMORPGゲームにはもう十分だと思います.
まだ足りない場合は、表領域の分割によって、複数のメモリ・データベース・プロセスを起動してリクエストをサービスできます.
プロジェクトアドレス:https://github.com/sniperHW/kendylib/tree/master/dbcache