Notes on a JGroups Cluster with the TCP_NIO Transport
Our application needs to meet reliable-communication and minimal-data-loss requirements for its JGroups cluster. The JGroups 2.6.1 release provides some out-of-the-box configurations in XML format. I chose the tcp-nio.xml configuration because of the inefficiency of the traditional I/O model and the unreliable data transmission of UDP. In this article I will try to explain why we prefer the TCP_NIO approach and then go through the configuration details of tcp-nio.xml.
1
1.1 Why TCP_NIO?
Traditional I/O classes have primarily been stream-oriented, often invoking methods on several layers of objects to handle individual bytes or characters.
This object-oriented approach of composing behaviors by plugging I/O objects together offers tremendous flexibility, but can be a performance killer when large amounts of data must be handled. Efficiency is the goal of I/O, and efficient I/O often doesn't map well to objects. Efficient I/O usually means that you must take the shortest path from Point A to Point B. Complexity destroys performance when doing high-volume I/O.
The traditional I/O abstractions of the Java platform have served well and are appropriate for a wide range of uses. But these classes do not scale well when moving large amounts of data, nor do they provide some common I/O functionality widely available on most operating systems today. These features — such as file locking, non-blocking I/O, readiness selection, and memory mapping — are essential for scalability and may be required to interact properly with non-Java applications, especially at the enterprise level. The classic Java I/O mechanism doesn't model these common I/O services.
Java Specification Request#51 (http://jcp.org/jsr/detail/51.jsp) details the need for high-speed, scalable I/O, which better leverages the I/O capabilities of the underlying operating system.
—from O’Reilly Java NIO
1.2 Why not UDP?
Our application might need to run over a WAN, and there is a chance that some routers along the way become overloaded and start discarding packets. The order in which a router drops packets is broadcast first, then multicast, and finally unicast.
2
2.1 Overview
JGroups uses a JChannel as the main API to connect to a cluster, send and receive messages, and to register listeners that are called when things (such as member joins) happen.
What is sent around are Messages, which contain a byte buffer (the payload), plus the sender's and receiver's address. Addresses are subclasses of org.jgroups.Address, and usually contain an IP address plus a port.
To join a cluster, we'll use a JChannel. An instance of JChannel is created with a configuration (e.g. an XML file) which defines the properties of the channel.
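A minimal sketch of this API, assuming the JGroups 2.6 classes and that tcp-nio.xml is on the classpath (the cluster name "demo-cluster" and the class name are just placeholders):

import org.jgroups.JChannel;
import org.jgroups.Message;
import org.jgroups.ReceiverAdapter;
import org.jgroups.View;

public class DemoNode {
    public static void main(String[] args) throws Exception {
        // Build the channel from the tcp-nio.xml protocol stack definition
        JChannel channel = new JChannel("tcp-nio.xml");

        // Listener callbacks: invoked for incoming messages and membership (view) changes
        channel.setReceiver(new ReceiverAdapter() {
            public void receive(Message msg) {
                System.out.println("received " + msg.getObject() + " from " + msg.getSrc());
            }
            public void viewAccepted(View newView) {
                System.out.println("members now: " + newView.getMembers());
            }
        });

        channel.connect("demo-cluster");                  // join (or create) the cluster
        channel.send(new Message(null, null, "hello"));   // null destination = send to all members
        Thread.sleep(5000);                               // give the message time to be delivered
        channel.close();
    }
}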
The protocol stack contains a number of protocol layers in a bidirectional list. All messages sent and received over the channel have to pass through the protocol stack. Every layer may modify, reorder, pass or drop a message, or add a header to a message.
The protocol stack in the XML configuration file lists the protocols bottom-up. Take tcp-nio.xml for example: the bottom protocol is TCP_NIO and the topmost protocol is pbcast.STREAMING_STATE_TRANSFER. The order should not be changed.
2.2 The tcp-nio.xml configuration
Let’s take a look at the configuration:
<config>
<TCP_NIO
bind_addr="localhost"
start_port="7800"
loopback="true"
recv_buf_size="20000000"
send_buf_size="640000"
discard_incompatible_packets="true"
max_bundle_size="64000"
max_bundle_timeout="30"
use_incoming_packet_handler="true"
enable_bundling="true"
use_send_queues="false"
sock_conn_timeout="300"
skip_suspected_members="true"
use_concurrent_stack="true"
thread_pool.enabled="true"
thread_pool.min_threads="1"
thread_pool.max_threads="25"
thread_pool.keep_alive_time="5000"
thread_pool.queue_enabled="false"
thread_pool.queue_max_size="100"
thread_pool.rejection_policy="Run"
oob_thread_pool.enabled="true"
oob_thread_pool.min_threads="1"
oob_thread_pool.max_threads="8"
oob_thread_pool.keep_alive_time="5000"
oob_thread_pool.queue_enabled="false"
oob_thread_pool.queue_max_size="100"
oob_thread_pool.rejection_policy="Run"
reader_threads="3"
writer_threads="3"
processor_threads="0"
processor_minThreads="0"
processor_maxThreads="0"
processor_queueSize="100"
processor_keepAliveTime="9223372036854775807"/>
<TCPPING timeout="3000"
initial_hosts="${jgroups.tcpping.initial_hosts:localhost[7800],localhost[7801]}"
port_range="1"
num_initial_members="3"/>
<MERGE2 max_interval="100000"
min_interval="20000"/>
<FD_SOCK/>
<FD timeout="10000" max_tries="5" shun="true"/>
<VERIFY_SUSPECT timeout="1500" />
<pbcast.NAKACK
use_mcast_xmit="false" gc_lag="0"
retransmit_timeout="300,600,1200,2400,4800"
discard_delivered_msgs="true"/>
<pbcast.STABLE stability_delay="1000" desired_avg_gossip="50000"
max_bytes="400000"/>
<VIEW_SYNC avg_send_interval="60000"/>
<pbcast.GMS print_local_addr="true" join_timeout="3000"
shun="true"
view_bundling="true"/>
<FC max_credits="2000000"
min_threshold="0.10"/>
<FRAG2 frag_size="60000" />
<pbcast.STREAMING_STATE_TRANSFER/>
</config>
1) TCP is a replacement for UDP as the bottom layer in cases where IP multicast (which is based on UDP) is not desired. This may be the case when operating over a WAN, where routers will discard IP multicast packets. As a rule of thumb, UDP is used as the transport for LANs, whereas TCP is used for WANs. TCP_NIO in turn is a replacement for TCP that performs much better, because it takes full advantage of the NIO services.
2) TCPPING The TCPPING protocol layer retrieves the initial membership in answer to the GMS's FIND_INITIAL_MBRS event.
3) MERGE2 If a group gets split for some reason (e.g. a network partition), this protocol merges the subgroups back into one group.
4) FD_SOCK Failure detection protocol based on a ring of TCP sockets created between group members.
5) FD Failure detection based on heartbeat messages. If a reply is not received within timeout ms for max_tries attempts in a row, a member is declared suspected and will be excluded by GMS. If we use FD_SOCK instead, then we don't send heartbeats, but establish TCP sockets and declare a member dead only when its socket is closed.
6) VERIFY_SUSPECT Verifies that a suspected member is really dead by pinging that member once again. Drops the suspect message if the member does respond. Tries to minimize false suspicions.
7) pbcast.NAKACK Lossless and FIFO delivery of multicast messages, using negative acks. E.g. when receiving P:1, P:3, P:4, a receiver asks P for retransmission of message 2.
8) VIEW_SYNC Periodically sends the view to the group. When a view is received which is greater than the current view, we install it. Otherwise we simply discard it. This is used to solve the problem for unreliable view dissemination outlined in JGroups/doc/ReliableViewInstallation.txt. This protocol is supposed to be just below GMS.
9) pbcast.GMS Group Membership Service. Responsible for joining/leaving members. Also handles suspected members, and excludes them from the membership. Sends Views (topology configuration) to all members when a membership change has occurred.
10) FC Credit based flow control protocol. When senders send faster than receivers can process the messages, over time the receivers will run out of memory. Therefore the senders have to be throttled to the fastest rate at which all receivers can process messages.
11) FRAG2 Fragments messages larger than frag_size bytes
12) pbcast.STREAMING_STATE_TRANSFER In order to transfer application state to a joining member of a group, pbcast.STATE_TRANSFER has to load the entire state into memory and send it to the joining member. The major limitation of this approach is that a very large state (>1 GB) would likely result in an OutOfMemoryError. To alleviate this problem, a new state transfer methodology based on streaming the state was introduced: pbcast.STREAMING_STATE_TRANSFER passes the state through input/output streams in chunks, so it never has to fit into memory in one piece (see the sketch below).
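As an illustrative sketch only, assuming the JGroups 2.6 ExtendedReceiverAdapter streaming callbacks (getState(OutputStream) / setState(InputStream)); the class name and logic are placeholders:

import java.io.InputStream;
import java.io.OutputStream;
import org.jgroups.ExtendedReceiverAdapter;

public class StreamingStateHandler extends ExtendedReceiverAdapter {
    // Called on the member that provides the state: write it to the stream in chunks,
    // so the full state never has to exist as one large in-memory byte array.
    public void getState(OutputStream ostream) {
        // e.g. serialize application records one at a time to ostream
    }

    // Called on the joining member: read and apply the state incrementally.
    public void setState(InputStream istream) {
        // e.g. deserialize records one at a time from istream and apply them
    }
}

// A joining member registers the handler via channel.setReceiver(new StreamingStateHandler()),
// connects, and then requests the state with channel.getState(null, 10000).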
Detailed descriptions of each protocol and its configuration parameters can be found on the JGroups WIKI.
2.3 Notes on some important parameters
2.3.1 bind_addr on TCP_NIO
The address on which JGroups creates the server socket. Though optional, it is useful on a multi-homed machine when you want the socket bound to one specific address. If not specified, the server socket will be created on all available interfaces.
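For example (the address below is only illustrative), binding the transport to one specific interface of a multi-homed machine would look like this in tcp-nio.xml, with the remaining TCP_NIO attributes kept exactly as in the full configuration above:

<TCP_NIO
bind_addr="192.168.1.1"
start_port="7800"
.../>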
2.3.2 initial_hosts on TCPPING
The comma-separated list of host names and ports used to connect to in order to get the initial membership. Note that there should not be spaces between the entries in the list. TCPPING.initial_hosts should contain the universe of all possible members. A new member that isn't listed in initial_hosts will still be able to join, but the other members cannot discover it through this list, which can cause problems later (for example during discovery or when subgroups need to be merged).
For example, if the cluster consists of 192.168.1.1, 192.168.1.2, 192.168.1.3 and 192.168.1.4, each with JGroups starting at port 7800, your TCPPING configuration should be:
<TCPPING timeout="3000"
initial_hosts="${jgroups.tcpping.initial_hosts: 192.168.1.1 [7800], 192.168.1.2 [7800], 192.168.1.3 [7800], 192.168.1.4 [7800]}"
port_range="1"
num_initial_members="4"/>
All the other protocols and their respective parameters can remain untouched.
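Note that because initial_hosts is written with the ${jgroups.tcpping.initial_hosts:...} placeholder, the member list can also be supplied as a system property at JVM startup instead of being hard-coded in the XML file; the main class name below is only a placeholder:

java -Djgroups.tcpping.initial_hosts=192.168.1.1[7800],192.168.1.2[7800],192.168.1.3[7800],192.168.1.4[7800] com.example.DemoNode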