闘地主ゲーム実現9:コードの4(データベース処理API)
20148 ワード
game_db.h
game_db.c
#ifndef GAMEDB_H
#define GAMEDB_H
typedef enum {SELECT,UPDATE,STMT_SELECT,STMT_UPDATE} task_type;
typedef struct _db_callback db_callback;
typedef struct _db_tasks db_tasks;
db_tasks *task_root,*task_last;
sem_t task_queue_count;
typedef int DB_HANDER(db_tasks*);
db_tasks *addDBTask(task_type,char *,int,DB_HANDER *);
db_tasks *addCallbackDBTask(task_type,char *,int,db_callback *);
db_tasks *doWaitDBTask(task_type ,char *,int,MYSQL_BIND *,MYSQL_BIND *,int,int);
MYSQL **sql_con;
/*MYSQL *maint_con;*/
int *dbstatus;
void freetask(db_tasks *);
int init_db_thread();
void releaseGameDb();
struct _db_tasks
{
int db_con;
task_type type;
char *sql;
unsigned int sql_size;
int exec_status;
int row_count;
MYSQL_RES *res_set;
db_callback *callback;
db_tasks *next;
db_tasks *pre;
MYSQL_STMT *stmt;
int param_count;
MYSQL_BIND *bind_param;
MYSQL_BIND *bind_result;
};
struct _db_callback
{
DB_HANDER *handler;
order_request_response *RS;
};
#endif /* GAMEDB_H */
game_db.c
/*
* DEBUG: section 81
*/
#include "game.h"
char *sql_user = NULL;
char *sql_password = NULL;
char *sql_host = NULL;
char *sql_database = NULL;
static unsigned int _sql_port = 0;
/* */
#define MAX_DBCONNS 10
int dbconnects = MAX_DBCONNS;
#define ERRORSLEEP 5
#define SLEEPSECS 30
sem_t task_queue_count;
pthread_mutex_t task_queue_mutex;
pthread_cond_t task_queue_cond;
int *threadDbId;
pthread_t *threadId;
int ph_sleep(unsigned int,unsigned int);
static int exitDbThreadFlag = 0;
static db_tasks *dodbTask(db_tasks *,int);
static MYSQL *sql_open_con()
{
MYSQL *_conn = NULL;
if (!(_conn = mysql_init(NULL)))
debug(81, 1) ("Unable to allocate MySQL data structure
");
#if MYSQL_VERSION_ID > 32349
mysql_options(_conn, MYSQL_READ_DEFAULT_GROUP, "client");
#endif
if (!(mysql_real_connect(_conn, sql_host, sql_user, sql_password, sql_database, _sql_port, NULL, 0))) {
debug(81, 1) ("Error connecting to MySQL server at %s:
%u (%s)
", sql_host,mysql_errno(_conn),mysql_error(_conn));
return NULL;
}
return _conn;
}
static MYSQL *sql_reopen(MYSQL * sql)
{
MYSQL *new_sql = NULL;
if (!(new_sql = mysql_init(NULL)))
return NULL;
#if MYSQL_VERSION_ID > 32349
mysql_options(new_sql, MYSQL_READ_DEFAULT_GROUP, "client");
#endif
if (!(mysql_real_connect(new_sql, sql_host, sql_user, sql_password, sql_database, _sql_port, NULL, 0)))
{
mysql_close(new_sql);
return NULL;
}
mysql_close(sql);
return new_sql;
}
void pthread_mutex_push_clean_rs(void *arg) {
debug(81, 1)("pthread_mutex_unlock
");
pthread_mutex_unlock((pthread_mutex_t *)arg);
}
void freetask(db_tasks *task) {
if(task) {
if(task->sql)
xfree(task->sql);
if(task->res_set)
mysql_free_result(task->res_set);
if(task->stmt)
mysql_stmt_close(task->stmt);
if(task->bind_param)
xfree(task->bind_param);
if(task->bind_result)
xfree(task->bind_result);
xfree(task);
}
}
db_tasks *addCallbackDBTask(task_type type,char *sqlst,int sql_size,db_callback *callback) {
db_tasks *task=(db_tasks*) xmalloc(sizeof(db_tasks));
debug(82, 2) ("%s(%s,%d)
",__FUNCTION__,__FILE__,__LINE__);
bzero(task,sizeof(db_tasks));
task->type=type;
task->sql = sqlst;
task->sql_size=sql_size;
task->callback = callback;
/*
pthread_cleanup_push(pthread_mutex_push_clean_rs,(void *)&task_queue_mutex);
*/
pthread_mutex_lock (&task_queue_mutex);
debug(81, 5)("start addTask
");
if(task_root == NULL) {
task_root = task;
task_last = task;
}
else {
task->pre = task_last;
task_last->next = task;
task_last = task;
}
task_root->pre = NULL;
task_last->next = NULL;
if(task)
sem_post (&task_queue_count);
debug(81, 5)("end addTask
");
pthread_mutex_unlock (&task_queue_mutex);
/*pthread_cleanup_pop(0);*/
return task;
}
db_tasks *addDBTask(task_type type,char *sqlst,int sql_size,DB_HANDER *dbHander) {
unsigned char *sql;
db_tasks *task=(db_tasks*) xmalloc(sizeof(db_tasks));
bzero(task,sizeof(db_tasks));
task->type=type;
sql = xmalloc(sql_size+1);
memcpy(sql,sqlst,sql_size);
sql[sql_size] = '\0';
task->sql = sql;
task->sql_size=sql_size;
task->callback = NULL;
pthread_cleanup_push(pthread_mutex_push_clean_rs,(void *)&task_queue_mutex);
pthread_mutex_lock (&task_queue_mutex);
debug(81, 5)("start addTask
");
if(task_root == NULL) {
task_root = task;
task_last = task;
}
else {
task->pre = task_last;
task_last->next = task;
task_last = task;
}
task_root->pre = NULL;
task_last->next = NULL;
if(task)
sem_post (&task_queue_count);
debug(81, 5)("end addTask
");
pthread_mutex_unlock (&task_queue_mutex);
pthread_cleanup_pop(0);
return task;
}
/*
void *_add_db_task_thread(int *arg) {
int i = arg[0];
int sleept =0;
int docount = 0;
while(docount < 1000) {
docount++;
addDBTask(SELECT,"select * from gamedb.game",strlen("select * from gamedb.game"),NULL);
sleept = rand()%100000;
ph_sleep(rand()%1,sleept);
}
return NULL;
}
*/
db_tasks *getFirstTask() {
db_tasks *task = task_root;
if(task == NULL)
return NULL;
task_root = task_root->next;
if(task_root != NULL)
task_root->pre = NULL;
task->next = NULL;
task->pre = NULL;
return task;
}
db_tasks *doWaitDBTask(task_type type,char *sqlst,int sql_size,MYSQL_BIND *bind_param,MYSQL_BIND *bind_result,int param_count,int db_con) {
unsigned char *sql;
db_tasks *task=(db_tasks*) xmalloc(sizeof(db_tasks));
bzero(task,sizeof(db_tasks));
task->type=type;
sql = xmalloc(sql_size+1);
memcpy(sql,sqlst,sql_size);
sql[sql_size] = '\0';
task->sql = sql;
task->sql_size=sql_size;
task->callback = NULL;
task->bind_param = bind_param;
task->bind_result = bind_result;
task->param_count = param_count;
return dodbTask(task,db_con);
}
static db_tasks *dodbTask(db_tasks *task,int db_con) {
int mysql_errsta = 0;
if(task == NULL)
return NULL;
while ((mysql_errsta = mysql_ping(sql_con[db_con])) != 0)
{
debug(81, 1) ("Error(%s) occurred when ping to MySQL server at %s(%s,%d)
",mysql_error(sql_con[db_con]),__FUNCTION__,__FILE__,__LINE__);
sleep(ERRORSLEEP);
if(task->stmt != NULL)
mysql_stmt_close(task->stmt);
task->stmt = NULL;
sql_con[db_con] = sql_reopen(sql_con[db_con]);
if(sql_con[db_con] != NULL)
mysql_select_db(sql_con[db_con],sql_database);
}
task->db_con = db_con;
task->exec_status = 1;
if(task->type == SELECT) {
if((task->exec_status = mysql_real_query(sql_con[db_con],task->sql,task->sql_size)) == 0) {
task->res_set=mysql_store_result(sql_con[db_con]);
task->row_count = mysql_num_rows(task->res_set);
}
else {
debug(81, 1) ("Error exec sql at %s:
%u (%s)
", sql_host,mysql_errno(sql_con[db_con]),mysql_error(sql_con[db_con]));
}
}
else if(task->type == UPDATE) {
if((task->exec_status = mysql_real_query(sql_con[db_con],task->sql,task->sql_size)) == 0) {
task->row_count = mysql_affected_rows(sql_con[db_con]);
if(task->row_count < 1) {
debug(81, 2)(">>>>>>>>>readd Task:%s
",task->sql);
}
}
}
else {
if(task->stmt == NULL) {
task->stmt = mysql_stmt_init(sql_con[db_con]);
if (!task->stmt){
debug(81, 2)(" mysql_stmt_init(), out of memory
");
return task;
}
if(mysql_stmt_prepare(task->stmt, task->sql, task->sql_size)) {
debug(81, 2)(" mysql_stmt_prepare() failed
");
debug(81, 2)(" %s
", mysql_stmt_error(task->stmt));
return task;
}
if(mysql_stmt_param_count(task->stmt) != task->param_count) {
debug(81, 2)(" invalid parameter count returned by MySQL
");
return task;
}
if (task->bind_param != NULL && mysql_stmt_bind_param(task->stmt, task->bind_param)) {
debug(81, 2)(" mysql_stmt_bind_param() failed
");
debug(81, 2)(" %s
", mysql_stmt_error(task->stmt));
return task;
}
}
task->exec_status = mysql_stmt_execute(task->stmt);
if(!task->exec_status) {
if(task->type == STMT_UPDATE)
task->row_count = mysql_stmt_affected_rows(task->stmt);
else task->row_count = mysql_stmt_num_rows(task->stmt);
if (task->bind_result != NULL && mysql_stmt_bind_result(task->stmt, task->bind_result)) {
debug(81, 2)(" mysql_stmt_bind_result() failed
");
debug(81, 2)(" %s
", mysql_stmt_error(task->stmt));
return task;
}
mysql_stmt_store_result(task->stmt);
}
}
return task;
}
void endTask(db_tasks *task,int tid) {
assert(task);
debug(82, 2) ("%s(%s,%d)
",__FUNCTION__,__FILE__,__LINE__);
/*
MYSQL_ROW row;
unsigned int i;
while((row = mysql_fetch_row(task->res_set)) != NULL) {
for(i = 0;i<mysql_num_fields(task->res_set);i++) {
if(i>0)
debug(81, 5)("\t");
debug(81, 5)("%s",row[i] != NULL?row[i]:"NULL");
}
}
mysql_free_result(task->res_set);
free(task->sql);
free(task);
*/
db_callback *callback = task->callback;
if(callback && callback->handler) {
callback->handler(task);
}
debug(82, 2) ("%s(%s,%d)
",__FUNCTION__,__FILE__,__LINE__);
freetask(task);
}
void *_db_task_thread(int *arg) {
int i = arg[0];
db_tasks *task;
debug(81, 2)("run thread %d
",i);
sql_con[i] = sql_open_con();
if(sql_con[i] == NULL) {
debug(81, 2)("sql_con[%d] is NULL
",i);
return NULL;
}
#if MYSQL_VERSION_ID > 50000
mysql_set_character_set(sql_con[i], "gbk");
#endif
mysql_select_db(sql_con[i],sql_database);
/*int sleept = 0;*/
pthread_cleanup_push(pthread_mutex_push_clean_rs,(void *)&task_queue_mutex);
while (1) {
task = NULL;
debug(81, 2)("thread %d sem_wait
",i);
sem_wait (&task_queue_count);
if(exitDbThreadFlag) {
pthread_exit(NULL);
}
int mysql_errsta = 0;
while ((mysql_errsta = mysql_ping(sql_con[i])) != 0)
{
debug(81, 1) ("Error(%s) occurred when ping to MySQL server at %s(%s,%d)
",mysql_error(sql_con[i]),__FUNCTION__,__FILE__,__LINE__);
ph_sleep(ERRORSLEEP,0);
sql_con[i] = sql_reopen(sql_con[i]);
if(sql_con[i] != NULL)
mysql_select_db(sql_con[i],sql_database);
}
//debug(81, 5)("thread %d pthread_mutex_lock
",i);
pthread_mutex_lock (&task_queue_mutex);
debug(81, 2)("thread %d getTask
",i);
task = getFirstTask();
//debug(81, 5)("thread %d getFirstTask sql: %s
",i,task->sql);
//pthread_cond_signal(&task_queue_cond);
pthread_mutex_unlock (&task_queue_mutex);
if(task) {
dodbTask(task,i);
endTask(task,i);
}
/*
sleept = rand()%100000;
ph_sleep(rand()%1,sleept);
*/
}
pthread_cleanup_pop(0);
}
int init_db_thread() {
int i = 0;
/*
sql_user = xstrdup("root");
sql_password = xstrdup("root");
sql_host = xstrdup("127.0.0.1");
sql_database = xstrdup("zsgame");
*/
sql_user = xstrdup("root");
sql_password = xstrdup("root");
sql_host = xstrdup("127.0.0.1");
sql_database = xstrdup("zsgame");
_sql_port = 3306;
threadDbId = xmalloc(sizeof(int)*(dbconnects+1));
threadId = xmalloc(sizeof(pthread_t)*(dbconnects+1));
sql_con = (MYSQL**)xmalloc(sizeof(MYSQL*)*dbconnects);
dbstatus = xmalloc(sizeof(int)*dbconnects);
sem_init (&task_queue_count, 0, 0);
pthread_mutex_init (&task_queue_mutex,NULL);
debug(81, 2)("init_db_thread
");
for(i = 0;i<dbconnects - 1;i++) {
threadDbId[i] = i;
debug(81, 2)("create thread %d
",threadDbId[i]);
pthread_create(&threadId[i],NULL,(void *)_db_task_thread,&threadDbId[i]);
pthread_detach(threadId[i]);
/*pthread_join(th,NULL);
printf("sleep 2
");*/
}
threadDbId[i] = i;
debug(81, 2)("create system config thread %d
",threadDbId[i]);
pthread_create(&threadId[i],NULL,(void *)_sys_config_thread,&threadDbId[i]);
pthread_detach(threadId[i]);
/*
maint_con = sql_open_con();
mysql_select_db(maint_con,sql_database);
*/
/*pthread_t th;
pthread_create(&th,NULL,(void *)_add_db_task_thread,&xxx[i]);
pthread_detach(th);
sleep(200);
*/
return 0;
}
static void getGameSetting(int dbcon) {
char *sqlst = "select game_rate from zsgame.t_game where game_id = 1";
db_tasks *task = doWaitDBTask(SELECT,sqlst,strlen(sqlst), NULL,NULL,0,dbcon);
if(task->row_count > 0) {
MYSQL_ROW row = mysql_fetch_row(task->res_set);
unsigned char *colvalue = row[0];
if(colvalue != NULL) {
game_rate = atof(colvalue);
}
}
freetask(task);
}
static void getGameBombsSetting(int dbcon) {
char *sqlst = "select total_game,lowest_game,every_game_bomb from zsgame.t_game_bomb_setting";
db_tasks *task = doWaitDBTask(SELECT,sqlst,strlen(sqlst), NULL,NULL,0,dbcon);
if(task->row_count > 0) {
MYSQL_ROW row = mysql_fetch_row(task->res_set);
unsigned char *colvalue = row[0];
if(colvalue != NULL) {
maxbombFrq = atoi(colvalue);
}
colvalue = row[1];
if(colvalue != NULL) {
minbombFrq = atoi(colvalue);
}
colvalue = row[2];
if(colvalue != NULL) {
minBombCount = atoi(colvalue);
maxBombCount = minBombCount;
}
}
freetask(task);
}
/*
static void testProcedure(int dbcon) {
char *sqlst = "call t_player_score_procedure(6, 3.5, 1,@abcd)";
char *sqlst2 = "SELECT @abcd";
db_tasks *task = doWaitDBTask(UPDATE,sqlst,strlen(sqlst), NULL,NULL,0,dbcon);
printf("task->exec_status = %d
",task->exec_status);
freetask(task);
task = doWaitDBTask(SELECT,sqlst2,strlen(sqlst2), NULL,NULL,0,dbcon);
printf("task->exec_status = %d
",task->exec_status);
printf("task->row_count = %d
",task->row_count);
if(task->row_count > 0) {
MYSQL_ROW row = mysql_fetch_row(task->res_set);
unsigned char *colvalue = row[0];
if(colvalue != NULL) {
printf("*******************************
");
printf("call t_player_score_procedure
value= %s
",colvalue);
printf("*******************************
");
}
}
freetask(task);
}
*/
void *_sys_config_thread(int *arg) {
int i = arg[0];
MYSQL_ROW row;
db_tasks *task;
unsigned char *colvalue;
sql_con[i] = sql_open_con();
if(sql_con[i] == NULL) {
debug(81, 2)("sql_con[%d] is NULL
",i);
return NULL;
}
#if MYSQL_VERSION_ID > 50000
mysql_set_character_set(sql_con[i], "gbk");
#endif
mysql_select_db(sql_con[i],sql_database);
getGameSetting(i);
getGameBombsSetting(i);
gameActiveSetting(i);
/*testProcedure(i);*/
while(1) {
char *sqlst = "select update_tables_id,table_name,table_id from zsgame.t_update_tables where is_update=1";
task = doWaitDBTask(SELECT,sqlst,strlen(sqlst), NULL,NULL,0,i);
while(task->row_count > 0) {
row = mysql_fetch_row(task->res_set);
colvalue = row[1];
if(colvalue) {
if(strcmp(colvalue,"t_game_bomb_setting") == 0)
getGameBombsSetting(i);
else if(strcmp(colvalue,"t_game") == 0)
getGameSetting(i);
else if(strcmp(colvalue,"t_active") == 0)
gameActiveSetting(i);
}
colvalue = row[0];
if(colvalue) {
char sql[512] = {'\0'};
strcat(sql,"delete from zsgame.t_update_tables where update_tables_id=");
strcat(sql,colvalue);
freetask(doWaitDBTask(UPDATE,sql,strlen(sql), NULL,NULL,0,i));
}
(task->row_count)--;
}
freetask(task);
debug(81, 2)("ph_sleep %d seconds
",SLEEPSECS);
ph_sleep(SLEEPSECS,0);
}
}
void releaseGameDb() {
int i;
exitDbThreadFlag = 1;
for(i = 0; i < dbconnects; i++) {
if(sql_con[i] != NULL)
mysql_close(sql_con[i]);
}
xfree(threadDbId);
xfree(sql_user);
xfree(sql_password);
xfree(sql_host);
xfree(sql_database);
xfree(threadId);
xfree(sql_con);
xfree(dbstatus);
sem_destroy(&task_queue_count);
pthread_mutex_destroy(&task_queue_mutex);
}
int ph_sleep(unsigned int sleepSecond,unsigned int sleepusec)
{
struct timeval t_timeval ={sleepSecond,sleepusec};
//struct timespec ts;
// ts.tv_sec = sleepSecond;
//ts.tv_nsec = sleepusec;
t_timeval.tv_sec = sleepSecond;
t_timeval.tv_usec = sleepusec;
select( 0, NULL, NULL, NULL, &t_timeval );
//pthread_cond_t mycond = PTHREAD_COND_INITIALIZER;
//pthread_mutex_t mymutex = PTHREAD_MUTEX_INITIALIZER;
//pthread_cond_timedwait(&mycond, &mymutex, &ts);
return 0;
}
/*
gcc game_db.c -o gamedb -lpthread `mysql_config --libs` `mysql_config --include`
*/
#ifndef GAME_H
int main(int argc, char *argv[])
{
srand((int) time(NULL));
init_db_thread();
int i = 0;
return ;
}
#endif /* GAME_H */