C++ゼロからブロックチェーンを開始:P 2 Pモジュールのノード登録と傍受
13010 ワード
ThreadPoolはスレッドプールで、具体的な実装は貼らないで、勝手にスレッドプールを探して実装すればいいです.ここをスタンプしてプログラムの完全なコードを見ることもできます.
P2PNode::P2PNode(const char *if_name)
{
m_sock = socket(AF_INET, SOCK_DGRAM, 0);//IPV4 SOCK_DGRAM (UDP )
if (m_sock < 0)
{
perror("socket
");
return ;
}
int val = 1;
if (setsockopt(m_sock, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val)) < 0)
{
perror("setsockopt");
}
struct timeval tv_out;
tv_out.tv_sec = 10;// 10
tv_out.tv_usec = 0;
if (setsockopt(m_sock, SOL_SOCKET, SO_RCVTIMEO, &tv_out, sizeof(tv_out)) < 0)
{
perror("setsockopt");
}
memset(&m_serverAddr, 0, sizeof(m_serverAddr));
m_serverAddr.sin_family = AF_INET;
m_serverAddr.sin_port = htons(SERVERPORT);
m_serverAddr.sin_addr.s_addr = inet_addr(SERVERIP);
socklen_t serverLen = sizeof(m_serverAddr);
memset(&m_localAddr, 0, sizeof(m_localAddr));
m_localAddr.sin_family = AF_INET;
m_localAddr.sin_port = htons(LOCALPORT);
m_localAddr.sin_addr.s_addr = htonl(INADDR_ANY);
if (bind(m_sock, (struct sockaddr*)&m_localAddr, sizeof(m_localAddr)) < 0)
{
perror("bind");
close(m_sock);
exit(2);
}
memset(&m_recvAddr, 0, sizeof(m_recvAddr));
socklen_t recvLen = sizeof(m_recvAddr);
NodeInfo sendNode, recvNode;
memset(&sendNode, 0, sizeof(NodeInfo));
memset(&recvNode, 0, sizeof(NodeInfo));
sendNode.cmd = cmd_register;
sendNode.node.recvPort = LOCALPORT;
get_local_ip(if_name, sendNode.node.recvIp);
while (1)
{
if (sendto(m_sock, &sendNode, sizeof(NodeInfo), 0, (struct sockaddr*)&m_serverAddr, serverLen) < 0)
{
perror("sendto");
close(m_sock);
exit(4);
}
memset(&m_recvAddr, 0, sizeof(m_recvAddr));
memset(&recvNode, 0, sizeof(NodeInfo));
if (recvfrom(m_sock, &recvNode, sizeof(NodeInfo), 0, (struct sockaddr*)&m_recvAddr, &recvLen) < 0)
{
if (errno == EINTR || errno == EAGAIN)
continue;
perror("recvfrom");
close(m_sock);
exit(2);
}
if (recvNode.cmd != cmd_register)
continue;
m_selfNode = recvNode.node;
break;
}
while (1)
{
memset(&sendNode, 0, sizeof(NodeInfo));
memset(&recvNode, 0, sizeof(NodeInfo));
memset(&m_recvAddr, 0, sizeof(m_recvAddr));
sendNode.cmd = cmd_getnode;
sendNode.node = m_selfNode;
if (sendto(m_sock, &sendNode, sizeof(NodeInfo), 0, (struct sockaddr*)&m_serverAddr, serverLen) < 0)
{
perror("sendto");
close(m_sock);
exit(4);
}
if (recvfrom(m_sock, &recvNode, sizeof(NodeInfo), 0, (struct sockaddr*)&m_recvAddr, &recvLen) < 0)
{
if (errno == EINTR || errno == EAGAIN)
continue;
perror("recvfrom");
close(m_sock);
exit(2);
}
if (recvNode.cmd != cmd_getnode)
continue;
m_otherNode = recvNode.node;
break;
}
// IP IP ,
// IP IP ,
// ,
if (strcmp(m_selfNode.queryIp, m_selfNode.recvIp) &&
!strcmp(m_selfNode.queryIp, m_otherNode.queryIp))
{
m_otherIP = m_otherNode.recvIp;
m_otherPort = m_otherNode.recvPort;
}
else
{
m_otherIP = m_otherNode.queryIp;
m_otherPort = m_otherNode.queryPort;
}
pthread_mutex_init(&m_mutexPack, NULL);
pthread_mutex_init(&m_mutexResult, NULL);
}
P2PNode::~P2PNode()
{
NodeInfo sendNode;
memset(&sendNode, 0, sizeof(NodeInfo));
socklen_t serverLen = sizeof(m_serverAddr);
sendNode.cmd = cmd_unregister;
sendNode.node = m_selfNode;
if (sendto(m_sock, &sendNode, sizeof(NodeInfo), 0, (struct sockaddr*)&m_serverAddr, serverLen) < 0)
{
perror("sendto");
close(m_sock);
exit(4);
}
close(m_sock);
pthread_mutex_destroy(&m_mutexPack);
pthread_mutex_destroy(&m_mutexResult);
}
P2PNode* P2PNode::Instance(const char *if_name)
{
static P2PNode node(if_name);
return &node;
}
void P2PNode::Listen()
{
if (pthread_create(&m_tid, NULL, threadFunc, this) != 0)
{
close(m_sock);
exit(2);
}
}
void *P2PNode::threadFunc(void *arg)
{
P2PNode *p = (P2PNode*)arg;
p->threadHandler();
return NULL;
}
void P2PNode::threadHandler()
{
sockaddr_in otherAddr;
memset(&otherAddr, 0, sizeof(otherAddr));
otherAddr.sin_family = AF_INET;
otherAddr.sin_port = htons(m_otherPort);
otherAddr.sin_addr.s_addr = inet_addr(m_otherIP);
P2PMessage recvMess;
P2PMessage sendMess;
P2PResult result;
socklen_t recvLen = sizeof(m_recvAddr);
ThreadPool tp(1);
tp.setTaskFunc(this, &P2PNode::combinationPackage);
tp.start();
while (1)
{
memset(&recvMess, 0, sizeof(P2PMessage));
memset(&sendMess, 0, sizeof(P2PMessage));
memset(&result, 0, sizeof(P2PResult));
memset(&m_recvAddr, 0, sizeof(m_recvAddr));
if (recvfrom(m_sock, &recvMess, sizeof(P2PMessage), 0, (struct sockaddr*)&m_recvAddr, &recvLen) < 0)
{
if (errno == EINTR || errno == EAGAIN)
continue;
perror("recvfrom");
tp.stop();
close(m_sock);
exit(2);
}
if (recvMess.cmd == p2p_result)
{
memcpy(&result, recvMess.mess, recvMess.length);
pthread_mutex_lock(&m_mutexResult);
m_lstResult.push_back(result);
pthread_mutex_unlock(&m_mutexResult);
continue;
}
tp.addTask(recvMess);
result.index = recvMess.index;
memcpy(result.messHash, recvMess.messHash, sizeof(recvMess.messHash));
sendMess.cmd = p2p_result;
sendMess.length = sizeof(result);
memcpy(sendMess.mess, &result, sendMess.length);
if (sendto(m_sock, &sendMess, sizeof(sendMess), 0, (struct sockaddr*)&m_recvAddr, sizeof(m_recvAddr)) < 0)
{
perror("sendto");
tp.stop();
close(m_sock);
exit(4);
}
}
tp.stop();
}