学習ノート:UDPによるプロセスのハートビート(死活)監視の実装


考え方:
UDPサーバー側:サーバーはクライアントの連結リストを保持します。クライアントが接続(ログイン)してくると、新しいノードを作成してクライアント名とアドレスを保存し、あわせてハートビート時刻 heart_beat_time を記録します。
クライアントがハートビートの送信を始めると、サーバーはハートビートパケットを受信するたびに、該当するクライアントノードのハートビート時刻を更新します。サーバーは定期的に連結リスト内の各クライアントのハートビート時刻を検査し、ハートビート時刻と現在時刻の差が一定の時間間隔を超えた場合、そのクライアントはすでにオフラインであるとみなします。
UDPクライアント側:一定周期でハートビートパケットを送信します。
サーバー側コード:
#include   
#include   
#include   
#include   
#include   
#include   
#include   
#include 
#include 
#include 
#include 
#include 

/* UDP port the server socket is bound to (passed through htons() in main). */
#define UDP_PORT_NUM   (8888)
  
/* Values carried in message.type; recv_message() dispatches on them. */

/* Sent by clients. */
#define CLIENT_LOGIN    100  
#define CLIENT_CHAT     200  
#define CLIENT_QUIT     300  
  
/* Produced by the server console process (send_message). */
#define SERVER_CHAT     400  
#define SERVER_QUIT     500  
  
/* Client requests answered to one peer only rather than broadcast. */
#define PRINT_ONLINE    600  
#define PRIVATE_CHAT    700

/* Periodic keep-alive; refreshes the sender's heart_beat_time. */
#define HEART_BEAT      800

/* epoll instance the monitor thread blocks on; assigned in main() after
 * epoll_create(), -1 until then. */
static int s_epoll_fd = -1;

/*
 * Liveness state of a client list entry.
 * insert_list() and heart_beat() set E_CLIENT_ON_LINE; monitor_thread()
 * switches an entry to E_CLIENT_OFF_LINE when its last heartbeat is more
 * than 10 seconds old.
 */
typedef enum
{
	E_CLIENT_OFF_LINE = 0,
	E_CLIENT_ON_LINE
}eClientSatus;

/*
 * One entry of the server's singly linked client list.
 * The node returned by create_list() is a dummy head; real clients are
 * linked behind it by insert_list().
 */
struct node   
{  
    char name[20];					// client name, copied from message.name at login
    struct sockaddr_in client_addr; // address the login datagram came from; used as sendto() target
	eClientSatus client_status;		// on-line / off-line flag maintained by the heartbeat logic
	struct timespec heart_beat_time; 		// time of login or of the most recent HEART_BEAT message
    struct node *next;  
};  
  
/*
 * Datagram exchanged between client and server. The structure is sent and
 * received as raw bytes (sendto/recvfrom with sizeof(struct message)).
 * NOTE(review): 'long' differs in size between 32- and 64-bit builds, so
 * both ends presumably have to be built for the same ABI - confirm.
 */
struct message  
{  
    long type;  /* one of the CLIENT_xxx / SERVER_xxx / PRINT_ONLINE / PRIVATE_CHAT / HEART_BEAT codes */
    char name[20];  /* sender's name */
    char peer_name[20];  /* addressee, looked up by private_chat() */
    char mtext[512];  /* message text */
};  
  
/* Client list management (singly linked list behind a dummy head node). */
struct node *create_list(void);
void insert_list(struct node *, char *, struct sockaddr_in *);
void delete_list(struct node *, char *);

/* Main loops: recv_message() runs in the forked child and dispatches
 * datagrams, send_message() runs in the parent and reads the console. */
void recv_message(int , struct node *);
void send_message(int , struct sockaddr_in *, pid_t );

/* Handlers selected by message.type inside recv_message(). */
void client_login(int , struct node *, struct message *, struct sockaddr_in *);
void client_chat(int , struct node *, struct message *);
void client_quit(int , struct node *, struct message *);
void server_chat(int , struct node *, struct message *);
void server_quit(int , struct node *, struct message *);

void brocast_msg(int , struct node *, struct message *);
void print_online(int , struct node *, struct message *);
void private_chat(int , struct node *, struct message *);

/* Was missing: recv_message() calls heart_beat() before its definition,
 * which otherwise is an implicit declaration that conflicts with the
 * later 'void' definition. */
void heart_beat(int , struct node *, struct message *);
  
/*
 * Signal handler that intentionally does nothing; send_message() passes it
 * to signal().
 *
 * sig_no: number of the delivered signal (ignored).
 */
void father_func(int sig_no)
{
    (void)sig_no;
}

/* return ms */
/*
 * Millisecond stopwatch.
 *
 * pStartMs == NULL: returns a time stamp in milliseconds, reduced to the
 *                   range [0, 2^31) so that it always fits a non-negative int.
 * pStartMs != NULL: returns the milliseconds elapsed since the stamp stored
 *                   in *pStartMs (a value previously returned by this
 *                   function), also reduced modulo 2^31.
 *
 * Returns 0 if the clock cannot be read.
 *
 * The previous version multiplied tv_sec by 1000 in the type of time_t and
 * narrowed the result to int: signed overflow where time_t is 32 bits and
 * an implementation-defined conversion everywhere else. All arithmetic is
 * now done on unsigned values, where wrap-around is well defined.
 * timespec_get(TIME_UTC) is the ISO C11 way to read the wall clock.
 */
static int calcProcessTime(int *pStartMs)
{
	struct timespec now;

	if (timespec_get(&now, TIME_UTC) != TIME_UTC) {
		return 0;
	}

	uint64_t total_ms = (uint64_t)now.tv_sec * 1000u
			+ (uint64_t)(now.tv_nsec / 1000000L);
	uint32_t stamp = (uint32_t)(total_ms & 0x7fffffffu);

	if (pStartMs == NULL) {
		return (int)stamp;
	}
	/* Unsigned subtraction wraps; masking yields the difference mod 2^31. */
	return (int)((stamp - (uint32_t)*pStartMs) & 0x7fffffffu);
}

/*
 * Register addFd with the epoll instance epollFd.
 *
 * epollFd: descriptor returned by epoll_create().
 * addFd:   descriptor to watch; it is also stored in ev.data.fd so the
 *          waiter can tell which descriptor fired.
 * state:   epoll event mask, e.g. EPOLLIN.
 *
 * The function has no return value, so a failing epoll_ctl() is reported
 * on stderr instead of being dropped silently.
 */
static void epollAddEvent(int epollFd, int addFd, int state)
{
	/* Zero the whole structure: 'data' is a union and only .fd is set. */
	struct epoll_event ev = {0};

	ev.events = (uint32_t)state;
	ev.data.fd = addFd;
	if (epoll_ctl(epollFd, EPOLL_CTL_ADD, addFd, &ev) < 0) {
		perror("epoll_ctl(EPOLL_CTL_ADD)");
	}
}

void* monitor_thread(void* user)
{
	struct node *head = (struct node *)user;
	struct epoll_event events[1] = {0};
	int cnt = 0;
	while(1)
	{
		int ms = 0;
		ms = calcProcessTime(NULL);
		int fireEvents = epoll_wait(s_epoll_fd, events, 1, -1);
		ms = calcProcessTime(&ms);
		if(fireEvents > 0){
			printf("monitor time out: %d ms
", ms); uint64_t exp; ssize_t size = read(events[0].data.fd, &exp, sizeof(uint64_t)); if(size != sizeof(uint64_t)) { printf("read error!
"); } struct node *p = head; struct timespec nowTime; clock_gettime(CLOCK_REALTIME, &nowTime); while (p != NULL){ /* if 10 second not receive client heart beat, we think the client is offline */ if(p->client_status == E_CLIENT_ON_LINE) { int duration = (nowTime.tv_sec * 1000 + nowTime.tv_nsec / 1000000) - (p->heart_beat_time.tv_sec * 1000 + p->heart_beat_time.tv_nsec / 1000000); if (duration > 10000) { p->client_status = E_CLIENT_OFF_LINE; printf("client: %s had off line
", p->name); } } p = p->next; } printf("All client check finished
"); } else{ printf("fireEvents = %d", fireEvents); } } } int main(int argc, const char *argv[]) { int socket_fd; pid_t pid; struct sockaddr_in server_addr; struct node *head; if (argc < 2){ fprintf(stderr, "usages : %s ip
", argv[0]); exit(-1); } /* create UDP socket */ if ((socket_fd = socket(AF_INET, SOCK_DGRAM, 0)) < 0){ perror("failed to create socket"); exit(-1); } /* create client list */ head = create_list(); server_addr.sin_family = AF_INET; server_addr.sin_port = htons(UDP_PORT_NUM); server_addr.sin_addr.s_addr = inet_addr(argv[1]); if (bind(socket_fd, (struct sockaddr *)&server_addr, sizeof(server_addr)) < 0){ perror("failed to bind"); exit(-1); } if ((pid = fork()) < 0){ perror("failed to fork pid"); exit(-1); } if (pid == 0){ /* create epoll */ int epollFd = -1; epollFd = epoll_create(1); s_epoll_fd = epollFd; /* create timer */ int timerFd = -1; timerFd = timerfd_create(CLOCK_MONOTONIC, TFD_CLOEXEC | TFD_NONBLOCK); /* add timer fd to epoll monitor event */ epollAddEvent(epollFd, timerFd, EPOLLIN); /* create thread to monitor */ pthread_t threadId = -1; pthread_create(&threadId, NULL, &monitor_thread, (void*)head); /* create timer, time out is 10 seconds */ struct itimerspec its; its.it_interval.tv_sec = 10; // 10 seconds its.it_interval.tv_nsec = 0; its.it_value.tv_sec = 10; its.it_value.tv_nsec = 0; if (timerfd_settime(timerFd, 0, &its, NULL) < 0) { fprintf(stderr, "timerfd_settime error
"); } printf("udp server running......
"); recv_message(socket_fd, head); } else{ printf("udp server running......
"); send_message(socket_fd, &server_addr, pid); } return 0; } struct node *create_list(void) { struct node *head; head = (struct node *)malloc(sizeof(struct node)); head->next = NULL; return head; } void insert_list(struct node *head, char *name, struct sockaddr_in *client_addr) { if(!head) return; struct node *new; new = (struct node *)malloc(sizeof(struct node)); strcpy(new->name, name); new->client_addr = *client_addr; new->client_status = E_CLIENT_ON_LINE; clock_gettime(CLOCK_REALTIME, &new->heart_beat_time); new->next = head->next; head->next = new; return ; } void delete_list(struct node *head, char *name) { if(!head) return; struct node *p = head->next; struct node *q = head; while (p != NULL){ if (strcmp(p->name, name) == 0) break; p = p->next; q = q->next; } q->next = p->next; p->next = NULL; free(p); return ; } void recv_message(int socket_fd, struct node *head) { struct message msg; struct sockaddr_in client_addr; int client_addrlen = sizeof(struct sockaddr); while (1) { memset(&msg, 0, sizeof(msg)); if (recvfrom(socket_fd, &msg, sizeof(msg), 0, (struct sockaddr *)&client_addr, &client_addrlen) < 0){ perror("failed to recvform client"); exit(-1); } switch(msg.type) { case CLIENT_LOGIN: client_login(socket_fd, head, &msg, &client_addr); break; case CLIENT_CHAT: client_chat(socket_fd, head, &msg); break; case CLIENT_QUIT: client_quit(socket_fd, head, &msg); break; case SERVER_CHAT: server_chat(socket_fd, head, &msg); break; case SERVER_QUIT: server_quit(socket_fd, head, &msg); break; case PRINT_ONLINE: print_online(socket_fd, head, &msg); break; case PRIVATE_CHAT: private_chat(socket_fd, head, &msg); break; case HEART_BEAT: heart_beat(socket_fd, head, &msg); break; default: break; } } return ; } void send_message(int socket_fd, struct sockaddr_in *server_addr, pid_t pid) { struct message msg; char buf[512]; signal(getppid(), father_func); while (1) { usleep(500); printf(">"); fgets(buf, sizeof(buf), stdin); buf[strlen(buf) - 1] = 0; strcpy(msg.mtext, buf); 
strcpy(msg.name , "server"); msg.type = SERVER_CHAT; if (strncmp(buf, "quit", 4) == 0){ msg.type = SERVER_QUIT; printf("udp server will quit, socket_fd = %d!!!!!
", socket_fd); if (sendto(socket_fd, &msg, sizeof(msg), 0, (struct sockaddr *)server_addr, sizeof(struct sockaddr)) < 0){ perror("failed to send server_quit message"); exit(-1); } //pause(); kill(pid, SIGKILL); waitpid(pid, NULL, 0); close(socket_fd); exit(-1); } if (sendto(socket_fd, &msg, sizeof(msg), 0, (struct sockaddr *)server_addr, sizeof(struct sockaddr)) < 0){ perror("failed to send server_chat message"); exit(-1); } } return ; } void client_login(int socket_fd, struct node *head, struct message *msg, struct sockaddr_in *client_addr) { printf("********Login In Notice********
"); printf("%s has Login In, you can talk to him/her.
", msg->name); printf("****************************
"); insert_list(head, msg->name, client_addr); brocast_msg(socket_fd, head, msg); return ; } void client_chat(int socket_fd, struct node *head, struct message *msg) { printf("********Group Chat********
"); printf("name: %s
", msg->name); printf("msg: %s
", msg->mtext); printf("**************************
"); brocast_msg(socket_fd, head, msg); return ; } void client_quit(int socket_fd, struct node *head, struct message *msg) { printf("*********Quit Msg********
"); printf("%s is Quit
", msg->name); printf("*************************
"); delete_list(head, msg->name); brocast_msg(socket_fd, head, msg); return ; } void server_chat(int socket_fd, struct node *head, struct message *msg) { printf("********Server Msg*********
"); printf("msg: %s
", msg->mtext); printf("***************************
"); brocast_msg(socket_fd, head, msg); return ; } void server_quit(int socket_fd, struct node *head, struct message *msg) { brocast_msg(socket_fd, head, msg); kill(getppid(), SIGUSR1); return ; } void print_online(int socket_fd, struct node *head, struct message *msg) { struct node *p = head->next; struct sockaddr_in my_addr; char buf[512]; printf("%s is request to print online client
", msg->name); memset(buf, 0, sizeof(buf)); while (p != NULL) { if (strcmp(p->name, msg->name) == 0){ my_addr = p->client_addr; p = p->next; continue; } strcat(buf, p->name); strcat(buf, " "); p = p->next; } strcpy(msg->mtext, buf); msg->type = PRINT_ONLINE; if (sendto(socket_fd, msg, sizeof(struct message), 0, (struct sockaddr *)&my_addr, sizeof(my_addr)) < 0){ perror("failed to send print_online message"); exit(-1); } return ; } void private_chat(int socket_fd, struct node *head, struct message *msg) { struct node *p = head->next; struct sockaddr_in *peer_addr = NULL; printf("******** Private Msg ********
"); printf("from %s
", msg->name); printf("to %s
", msg->peer_name); printf("msg: %s
", msg->mtext); printf("*****************************
"); while (p != NULL) { if (strcmp(p->name, msg->peer_name) == 0){ peer_addr = &(p->client_addr); break; } p = p->next; } if(!peer_addr){ printf("[Udp server] user %s is not in online list!!
", msg->peer_name); msg->type = SERVER_CHAT; sprintf(msg->mtext, "The one of %s who you want to talk is not exist!!!
", msg->peer_name); p = head->next; while (p != NULL){ if (strcmp(p->name, msg->name) == 0){ peer_addr = &(p->client_addr); break; } p = p->next; } } if(peer_addr){ if (sendto(socket_fd, msg, sizeof(struct message), 0, (struct sockaddr *)peer_addr, sizeof(struct sockaddr_in)) < 0){ perror("failed to send private message"); exit(-1); } } return ; } void heart_beat(int socket_fd, struct node *head, struct message *msg) { struct node *p = head->next; printf("%s heart beat ^^^
", msg->name); while (p != NULL) { if (strcmp(p->name, msg->name) == 0){ p->client_status = E_CLIENT_ON_LINE; clock_gettime(CLOCK_REALTIME, &p->heart_beat_time); } p = p->next; } return ; } void brocast_msg(int socket_fd, struct node *head, struct message *msg) { struct node *p = head->next; while(p != NULL) { if (msg->type == CLIENT_LOGIN){ if (strcmp(p->name, msg->name) == 0){ p = p->next; continue; } } sendto(socket_fd, msg, sizeof(struct message), 0, (struct sockaddr *)&(p->client_addr), sizeof(struct sockaddr)); p = p->next; } return ; }

クライアントコード:
#include   
#include   
#include   
#include   
#include   
#include   
#include   
#include  
#include 
#include 
#include 

/* UDP port of the server that datagrams are sent to (see main). */
#define UDP_PORT_NUM   (8888)

/* Values carried in message.type. recv_message() dispatches on the first
 * seven; the send paths below put them into outgoing messages. */
#define CLIENT_LOGIN    100  
#define CLIENT_CHAT     200  
#define CLIENT_QUIT     300  
  
#define SERVER_CHAT     400  
#define SERVER_QUIT     500  
  
#define PRINT_ONLINE    600  
#define PRIVATE_CHAT    700

/* Type of the periodic keep-alive built by heart_beat(). */
#define HEART_BEAT      800

/* This client's name, copied from argv[2] in main(); read by heart_beat()
 * and by private_chat(). */
static char s_my_name[20];
/* UDP socket created in main(); heart_beat() sends through it. */
static int s_server_fd;
/* epoll instance the heartbeat thread blocks on; -1 until main() sets it. */
static int s_epoll_fd = -1;

/*
 * Datagram exchanged with the server. The structure is sent and received as
 * raw bytes (sendto/recvfrom with sizeof(struct message)).
 * NOTE(review): 'long' differs in size between 32- and 64-bit builds, so
 * client and server presumably have to be built for the same ABI - confirm.
 */
struct message  
{  
    long type;  /* one of the message type codes defined above */
    char name[20];  /* sender's name */
    char peer_name[20];  /* addressee of a private chat message */
    char mtext[512];  /* message text */
};  
  
/* Main loops: recv_message() runs in the forked child, send_message() in
 * the parent (see main). */
void recv_message(int );  
void send_message(int , struct sockaddr_in *, char *, pid_t);  
  
/* Handlers for messages received from the server; selected by
 * message.type inside recv_message(). */
void login_msg(struct message *);  
void group_msg(struct message *);  
void quit_msg(struct message *);  
void server_msg(struct message *);  
void server_quit(void);  
void online_msg(struct message *);  
void private_msg(struct message *);  
  
/* Menu actions started from send_message(). */
void print_online(int , struct message *, struct sockaddr_in *);  
void group_chat(int , struct message *, struct sockaddr_in *);  
void private_chat(int ,struct message *, struct sockaddr_in *);  
void client_quit(int , struct message *, struct sockaddr_in *, pid_t );  

/*
 * Register addFd with the epoll instance epollFd.
 *
 * epollFd: descriptor returned by epoll_create().
 * addFd:   descriptor to watch; it is also stored in ev.data.fd so the
 *          waiter can tell which descriptor fired.
 * state:   epoll event mask, e.g. EPOLLIN.
 *
 * The function has no return value, so a failing epoll_ctl() is reported
 * on stderr instead of being dropped silently.
 */
static void epollAddEvent(int epollFd, int addFd, int state)
{
	/* Zero the whole structure: 'data' is a union and only .fd is set. */
	struct epoll_event ev = {0};

	ev.events = (uint32_t)state;
	ev.data.fd = addFd;
	if (epoll_ctl(epollFd, EPOLL_CTL_ADD, addFd, &ev) < 0) {
		perror("epoll_ctl(EPOLL_CTL_ADD)");
	}
}

/*
 * Send one HEART_BEAT datagram carrying this client's name (s_my_name) to
 * the server through s_server_fd.
 *
 * server_addr: address of the server.
 *
 * A failed send is reported with perror() and otherwise ignored; the next
 * timer expiry tries again.
 */
void heart_beat(struct sockaddr_in *server_addr)
{
    struct message msg;

    /* Zero the whole datagram: peer_name and mtext are unused here and
     * would otherwise leak stack contents onto the network. */
    memset(&msg, 0, sizeof(msg));
    msg.type = HEART_BEAT;
    snprintf(msg.name, sizeof(msg.name), "%s", s_my_name);

    if (sendto(s_server_fd, &msg, sizeof(msg), 0,
               (struct sockaddr *)server_addr, sizeof(*server_addr)) < 0) {
        perror("failed to send heart beat message");
    }
}

void* heart_beat_thread(void* user)
{
	struct sockaddr_in *server_addr = (struct sockaddr_in*)user;
	struct epoll_event events[1] = {0};
	int cnt = 0;
	while(1)
	{
		int fireEvents = epoll_wait(s_epoll_fd, events, 1, -1);
		if(fireEvents > 0){
			printf("I heart beat
"); uint64_t exp; ssize_t size = read(events[0].data.fd, &exp, sizeof(uint64_t)); if(size != sizeof(uint64_t)) { printf("read error!
"); } heart_beat(server_addr); } else{ printf("fireEvents = %d", fireEvents); } } return NULL; } int main(int argc, char *argv[]) { pid_t pid; int server_fd; struct sockaddr_in server_addr; if (argc < 4){ fprintf(stderr, "usages: %s ip name heart_beat(seconds)
", argv[0]); exit(-1); } int heart_beat_period = atoi(argv[3]); if ((server_fd = socket(AF_INET, SOCK_DGRAM, 0)) < 0){ perror("failed to create server_fd"); exit(-1); } s_server_fd = server_fd; server_addr.sin_family = AF_INET; server_addr.sin_port = htons(UDP_PORT_NUM); server_addr.sin_addr.s_addr = inet_addr(argv[1]); if ((pid = fork()) < 0){ perror("failed to fork pid"); exit(-1); } memset(s_my_name, '\0', 20); memcpy(s_my_name, argv[2], sizeof(char) * (strlen(argv[2]) + 1)); if (pid == 0) { /* create epoll */ int epollFd = -1; epollFd = epoll_create(1); s_epoll_fd = epollFd; /* create timer */ int timerFd = -1; timerFd = timerfd_create(CLOCK_MONOTONIC, TFD_CLOEXEC | TFD_NONBLOCK); /* add timer fd to epoll monitor event */ epollAddEvent(epollFd, timerFd, EPOLLIN); /* create heart beat thread */ pthread_t threadId = -1; pthread_create(&threadId, NULL, &heart_beat_thread, (void*)&server_addr); /* create timer, time out is heart beat seconds */ struct itimerspec its; its.it_interval.tv_sec = heart_beat_period; its.it_interval.tv_nsec = 0; its.it_value.tv_sec = heart_beat_period; its.it_value.tv_nsec = 0; if (timerfd_settime(timerFd, 0, &its, NULL) < 0) { fprintf(stderr, "timerfd_settime error
"); } recv_message(server_fd); } else { send_message(server_fd, &server_addr, argv[2], pid); } return 0; } void recv_message(int server_fd) { struct message msg; while (1) { memset(&msg, 0, sizeof(msg)); int nbytes = 0; if ((nbytes = recvfrom(server_fd, &msg, sizeof(msg), 0, NULL, NULL)) < 0){ perror("failed to recv server message"); exit(-1); } switch(msg.type) { case CLIENT_LOGIN: login_msg(&msg); break; case CLIENT_CHAT: group_msg(&msg); break; case CLIENT_QUIT: quit_msg(&msg); break; case SERVER_CHAT: server_msg(&msg); break; case SERVER_QUIT: server_quit(); break; case PRINT_ONLINE: online_msg(&msg); break; case PRIVATE_CHAT: private_msg(&msg); break; default: break; } } return ; } void send_message(int server_fd, struct sockaddr_in *server_addr, char *name, pid_t pid) { struct message msg; char buf[512]; int c; msg.type = CLIENT_LOGIN; strcpy(msg.name, name); if (sendto(server_fd, &msg, sizeof(msg), 0, (struct sockaddr *)server_addr, sizeof(struct sockaddr)) < 0){ perror("failed to send login message"); exit(-1); } while(1) { usleep(500); printf("#####################################
"); printf("the following options can be choose:
"); printf("input 1 is to print online client
"); printf("input 2 is to chat to all the client
"); printf("input 3 is to chat to only one client
"); printf("input q is to quit
"); printf("#####################################
"); printf(">"); c = getchar(); while (getchar() != '
'); switch(c) { case '1': print_online(server_fd, &msg, server_addr); break; case '2': printf("[Notice:]you can input cmd to close group chat....
"); group_chat(server_fd, &msg, server_addr); break; case '3': printf("[Notice:]you can input cmd to close private chat....
"); private_chat(server_fd, &msg, server_addr); break; case 'q': client_quit(server_fd, &msg, server_addr, pid); break; default: break; } } return ; } void login_msg(struct message *msg) { printf("********Login In Notice********
"); printf("%s has Login In, you can talk to him/her.
", msg->name); printf("****************************
"); return ; } void group_msg(struct message *msg) { printf("******** Group Msg ********
"); printf("name: %s
", msg->name); printf("msg: %s
", msg->mtext); printf("******** Group Msg ********
"); return ; } void quit_msg(struct message *msg) { printf("######## Quit Msg ########
"); printf("%s is Quit
", msg->name); printf("######## Quit Msg ########
"); return ; } void server_msg(struct message *msg) { printf("******** Server Msg ********
"); printf("msg: %s
", msg->mtext); printf("******** Server Msg ********
"); return ; } void server_quit(void ) { kill(getppid(), SIGKILL); exit(0); } void online_msg(struct message *msg) { printf("******** Clients you can talk to. ********
"); printf("Clients: %s
", msg->mtext); printf("******** Clients you can talk to. ********
"); return ; } void private_msg(struct message *msg) { printf("******** Private Msg ********
"); printf("[%s] say: %s
", msg->name, msg->mtext); printf("******** Private Msg ********
"); return ; } void print_online(int server_fd, struct message *msg, struct sockaddr_in *server_addr) { msg->type = PRINT_ONLINE; if (sendto(server_fd, msg, sizeof(struct message), 0, (struct sockaddr *)server_addr, sizeof(struct sockaddr)) < 0){ perror("failed to send online message"); exit(-1); } return ; } void group_chat(int server_fd, struct message *msg, struct sockaddr_in *server_addr) { char buf[512]; memset(buf, 0, sizeof(buf)); printf("*********** Group Chat Room ***********
"); printf("****** Welcome to group chat room ******
"); printf("* if you want to quit, please input quit
"); printf("****************************************
"); while(1) { memset(buf, 0, sizeof(buf)); usleep(500); printf(">"); fgets(buf, sizeof(buf), stdin); buf[strlen(buf) - 1] = 0; msg->type = CLIENT_CHAT; strcpy(msg->mtext, buf); if (strncmp(buf, "quit", 4) == 0) break; if (sendto(server_fd, msg, sizeof(struct message), 0, (struct sockaddr *)server_addr, sizeof(struct sockaddr)) < 0){ perror("failed to send group message"); exit(-1); } } return ; } void private_chat(int server_fd, struct message *msg, struct sockaddr_in *server_addr) { char name[20]; char buf[512]; memset(name, 0, sizeof(name)); printf("please input the peer_name
"); printf(">"); fgets(name, sizeof(name), stdin); name[strlen(name) - 1] = 0; if(strncmp(s_my_name, name, strlen(name)) == 0){ printf("you can not talk with yourself!!!!!
"); return; } strcpy(msg->peer_name, name); msg->type = PRIVATE_CHAT; printf("you want to talk to %s!
", msg->peer_name); while(1) { usleep(500); printf(">"); fgets(buf, sizeof(buf), stdin); buf[strlen(buf) - 1] = 0; strcpy(msg->mtext, buf); if (strncmp(buf, "quit", 4) == 0) break; if (sendto(server_fd, msg, sizeof(struct message), 0, (struct sockaddr *)server_addr, sizeof(struct sockaddr)) < 0){ perror("failed to send private message"); exit(-1); } } return ; } void client_quit(int server_fd, struct message *msg, struct sockaddr_in *server_addr, pid_t pid) { msg->type = CLIENT_QUIT; if (sendto(server_fd, msg, sizeof(struct message), 0, (struct sockaddr *)server_addr, sizeof(struct sockaddr)) < 0){ perror("failed to send quit message"); exit(-1); } kill(pid, SIGKILL); waitpid(pid, NULL, 0); exit(0); }