Nettyインスタンスの数
183695 ワード
Nettyインスタンスの数
NettyはJDK NIOベースのネットワークフレームワーク
NIOプログラミングをシンプル化し、プログラムを使わずにselectorを自分でメンテナンスし、ネットワーク通信とデータ処理の部分を分離
下層のデータ通信、心拍検出(keepalived)に多く用いられる
1.データ通信
1.1 Hello World
SO_BACKLOG詳細:サーバのTCPカーネルメンテナンスの2つのキューAとBクライアントがサービス側にconnectを要求した場合、SYN(1回目の握手)を送信サービス側がSYNを受け取った後、クライアントにSYN ACK(2回目の握手)を送信し、 TCPカーネルは、キューAクライアントに接続を入れて受信し、サービス側にACKを送信する(3回目の握手)、 TCPカーネルはA->B、acceptから接続を戻し、接続完了A/Bキューの長さとBACKLOGであり、accept速度が追いつかない場合、A/BキューがBACKLOGをいっぱいにすると、クライアント接続がTCPカーネルに拒否されてbacklogを大きくして緩和できるという現象を経験値~100
1.2パッケージの取り外しの問題
TCP/IPはパケットの転送,パケットの順序などを確保しているが,プログラミングではパケットの取り外し粘着問題の解決が必要である.
->受信した一連のパケットのデータ、処理の区切りはどこですか? 基本的なソリューション:
1)終了区切り文字として特殊文字
2)メッセージの長さを決める.パケットの長さを固定して、長さはスペースで補完するのに足りない.受信者はtrimを必要として、効率が高くなくて推薦しない
3)カスタムプロトコル.メッセージヘッダにメッセージの全長のフィールドを含める.セキュリティが必要な場合に考慮する.
特殊文字
ていちょう
1.3コーディング
すなわち、オブジェクトのネットワーク伝送とローカル持続化を実現するためにjavaのシーケンス化を用いると符号ストリームが大きいため、Marshalling,Kyro(Protobufベース)を用いることが多い
次の例では、符号化伝送javabean(Marshalling javabeanはserializableを実装する必要がある)を用いて、messageをgzip圧縮する
カスタムコーデック
javabean
GZip圧縮Util
サービス側とクライアント
1.4長接続/短接続
1.長い接続で、一貫して接続が自発的に中断されず、リアルタイム性が強い2.短い接続である.データはキャッシュに入れ、一度にすべてのデータを一括して提出し、サービス側が受信した後に接続を閉じる以上の2種類は、ChannelHandlerContextにChannelFutureListener.celOSEリスナーを追加するかどうかによって実現する
3.長い接続、一定時間アクティブでない場合は接続を閉じる.SocketChannelにReadTimeoutHandler実装を追加する.例は以下の通りである.
1.5 UDP使用(少ない使用)
2.心拍数検査
クラスタ内のプライマリサーバはスレーブサーバの状態を知る必要があるためclientは5~10秒おきにサーバにハートビートパケットを送信する
nettyとタイミングタスクで実現可能
3. HTTP
3.1 Hello World
3.2 HTTPダウンロードファイル
3.3 HTTPアップロードファイル(少ない使用)
実際のアプリケーションでは、ファイルアップロードサービス側に成熟したフレームワークfastDFS(小さなファイル)とHDFS(大きなファイル)があります.
ブレークポイントの継続を実現するには、アップロードの進捗を記録する必要があります.HTTPヘッダのRangeとContent-Langeを参照してください.
NettyはJDK NIOベースのネットワークフレームワーク
NIOプログラミングをシンプル化し、プログラムを使わずにselectorを自分でメンテナンスし、ネットワーク通信とデータ処理の部分を分離
下層のデータ通信、心拍検出(keepalived)に多く用いられる
1.データ通信
1.1 Hello World
public class Server {
public static void main(String[] args) throws Exception {
// 1
//
// ( )
EventLoopGroup pGroup = new NioEventLoopGroup();
EventLoopGroup cGroup = new NioEventLoopGroup();
// 2 ServerBootstrap,
ServerBootstrap b = new ServerBootstrap();
b.group(pGroup, cGroup) //
.channel(NioServerSocketChannel.class) // NIO .NioServerSocketChannel TCP, NioDatagramChannel UDP
.option(ChannelOption.SO_BACKLOG, 1024) // TCP
.option(ChannelOption.SO_SNDBUF, 32 * 1024) //
.option(ChannelOption.SO_RCVBUF, 32 * 1024) //
.option(ChannelOption.SO_KEEPALIVE, true) //
.childHandler(new ChannelInitializer() {
@Override
protected void initChannel(SocketChannel sc) throws Exception { //SocketChannel
// 3 , addLast ...
sc.pipeline().addLast(new ServerHandler());
}
});
// 4 , bind future( ), sync
ChannelFuture cf1 = b.bind(8765).sync();
//ChannelFuture cf2 = b.bind(8764).sync(); //
// 5 , sync
cf1.channel().closeFuture().sync();
//cf2.channel().closeFuture().sync();
pGroup.shutdownGracefully();
cGroup.shutdownGracefully();
}
}
SO_BACKLOG詳細:サーバのTCPカーネルメンテナンスの2つのキューAとBクライアントがサービス側にconnectを要求した場合、SYN(1回目の握手)を送信サービス側がSYNを受け取った後、クライアントにSYN ACK(2回目の握手)を送信し、 TCPカーネルは、キューAクライアントに接続を入れて受信し、サービス側にACKを送信する(3回目の握手)、 TCPカーネルはA->B、acceptから接続を戻し、接続完了A/Bキューの長さとBACKLOGであり、accept速度が追いつかない場合、A/BキューがBACKLOGをいっぱいにすると、クライアント接続がTCPカーネルに拒否されてbacklogを大きくして緩和できるという現象を経験値~100
public class ServerHandler extends ChannelHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("server channel active... ");
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
byte[] req = new byte[buf.readableBytes()];
buf.readBytes(req);
String body = new String(req, "utf-8");
System.out.println("Server :" + body );
String response = " :" + body ;
ctx.writeAndFlush(Unpooled.copiedBuffer(response.getBytes()));
// future , ( ). , server . ctx[.channel()].close()
//.addListener(ChannelFutureListener.CLOSE);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx)
throws Exception {
System.out.println(" ");
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable t)
throws Exception {
ctx.close();
}
}
public class Client {
public static void main(String[] args) throws Exception {
EventLoopGroup group = new NioEventLoopGroup();
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer() {
@Override
protected void initChannel(SocketChannel sc) throws Exception {
sc.pipeline().addLast(new ClientHandler());
}
});
ChannelFuture cf1 = b.connect("127.0.0.1", 8765).sync();
//ChannelFuture cf2 = b.connect("127.0.0.1", 8764).sync(); //
// , Buffer . write flush , writeFlush
cf1.channel().writeAndFlush(Unpooled.copiedBuffer("777".getBytes()));
cf1.channel().writeAndFlush(Unpooled.copiedBuffer("666".getBytes()));
Thread.sleep(2000);
cf1.channel().writeAndFlush(Unpooled.copiedBuffer("888".getBytes()));
//cf2.channel().writeAndFlush(Unpooled.copiedBuffer("999".getBytes()));
cf1.channel().closeFuture().sync();
//cf2.channel().closeFuture().sync();
group.shutdownGracefully();
}
}
public class ClientHandler extends ChannelHandlerAdapter{
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
try {
ByteBuf buf = (ByteBuf) msg;
byte[] req = new byte[buf.readableBytes()];
buf.readBytes(req);
String body = new String(req, "utf-8");
System.out.println("Client :" + body );
} finally {
// xxxHandler msg : (write) , msg ; ,!
ReferenceCountUtil.release(msg);
}
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
ctx.close();
}
}
1.2パッケージの取り外しの問題
TCP/IPはパケットの転送,パケットの順序などを確保しているが,プログラミングではパケットの取り外し粘着問題の解決が必要である.
->受信した一連のパケットのデータ、処理の区切りはどこですか? 基本的なソリューション:
1)終了区切り文字として特殊文字
2)メッセージの長さを決める.パケットの長さを固定して、長さはスペースで補完するのに足りない.受信者はtrimを必要として、効率が高くなくて推薦しない
3)カスタムプロトコル.メッセージヘッダにメッセージの全長のフィールドを含める.セキュリティが必要な場合に考慮する.
特殊文字
public class Server {
public static void main(String[] args) throws Exception {
EventLoopGroup pGroup = new NioEventLoopGroup();
EventLoopGroup cGroup = new NioEventLoopGroup();
ServerBootstrap b = new ServerBootstrap();
b.group(pGroup, cGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.option(ChannelOption.SO_SNDBUF, 32*1024)
.option(ChannelOption.SO_RCVBUF, 32*1024)
.childHandler(new ChannelInitializer() {
@Override
protected void initChannel(SocketChannel sc) throws Exception {
// DelimiterBasedFrameDecoder $_
ByteBuf buf = Unpooled.copiedBuffer("$_".getBytes());
sc.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, buf));
// . StringDecoder, Handler msg String ( ByteBuffer). ByteBuffer
sc.pipeline().addLast(new StringDecoder());
//
sc.pipeline().addLast(new ServerHandler());
}
});
//4
ChannelFuture cf = b.bind(8765).sync();
//
cf.channel().closeFuture().sync();
pGroup.shutdownGracefully();
cGroup.shutdownGracefully();
}
}
public class ServerHandler extends ChannelHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println(" server channel active... ");
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("Server :" + msg);
String response = " : " + msg + "$_";
ctx.writeAndFlush(Unpooled.copiedBuffer(response.getBytes()));
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable t) throws Exception {
ctx.close();
}
}
public class Client {
public static void main(String[] args) throws Exception {
EventLoopGroup group = new NioEventLoopGroup();
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer() {
@Override
protected void initChannel(SocketChannel sc) throws Exception {
ByteBuf buf = Unpooled.copiedBuffer("$_".getBytes());
sc.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, buf));
sc.pipeline().addLast(new StringDecoder());
sc.pipeline().addLast(new ClientHandler());
}
});
ChannelFuture cf = b.connect("127.0.0.1", 8765).sync();
cf.channel().writeAndFlush(Unpooled.wrappedBuffer("bbbb$_".getBytes()));
cf.channel().writeAndFlush(Unpooled.wrappedBuffer("cccc$_".getBytes()));
cf.channel().closeFuture().sync();
group.shutdownGracefully();
}
}
public class ClientHandler extends ChannelHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("client channel active... ");
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
try {
String response = (String) msg;
System.out.println("Client: " + response);
} finally {
ReferenceCountUtil.release(msg);
}
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
ていちょう
public class Server {
public static void main(String[] args) throws Exception{
EventLoopGroup pGroup = new NioEventLoopGroup();
EventLoopGroup cGroup = new NioEventLoopGroup();
ServerBootstrap b = new ServerBootstrap();
b.group(pGroup, cGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.option(ChannelOption.SO_SNDBUF, 32*1024)
.option(ChannelOption.SO_RCVBUF, 32*1024)
.childHandler(new ChannelInitializer() {
@Override
protected void initChannel(SocketChannel sc) throws Exception {
// , 5, 5
sc.pipeline().addLast(new FixedLengthFrameDecoder(5));
//
sc.pipeline().addLast(new StringDecoder());
sc.pipeline().addLast(new ServerHandler());
}
});
ChannelFuture cf = b.bind(8765).sync();
cf.channel().closeFuture().sync();
pGroup.shutdownGracefully();
cGroup.shutdownGracefully();
}
}
public class ServerHandler extends ChannelHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println(" server channel active... ");
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
String request = (String)msg;
System.out.println("Server :" + msg);
String response = request ;
ctx.writeAndFlush(Unpooled.copiedBuffer(response.getBytes()));
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable t) throws Exception {
}
}
public class Client {
public static void main(String[] args) throws Exception {
EventLoopGroup group = new NioEventLoopGroup();
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer() {
@Override
protected void initChannel(SocketChannel sc) throws Exception {
sc.pipeline().addLast(new FixedLengthFrameDecoder(5));
sc.pipeline().addLast(new StringDecoder());
sc.pipeline().addLast(new ClientHandler());
}
});
ChannelFuture cf = b.connect("127.0.0.1", 8765).sync();
cf.channel().writeAndFlush(Unpooled.wrappedBuffer("aaa".getBytes()));
cf.channel().writeAndFlush(Unpooled.copiedBuffer("bbccccc".getBytes()));
cf.channel().closeFuture().sync();
group.shutdownGracefully();
}
}
public class ClientHandler extends ChannelHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("client channel active... ");
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
String response = (String) msg;
System.out.println("Client: " + response);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
}
}
1.3コーディング
すなわち、オブジェクトのネットワーク伝送とローカル持続化を実現するためにjavaのシーケンス化を用いると符号ストリームが大きいため、Marshalling,Kyro(Protobufベース)を用いることが多い
次の例では、符号化伝送javabean(Marshalling javabeanはserializableを実装する必要がある)を用いて、messageをgzip圧縮する
カスタムコーデック
public final class MarshallingCodeCFactory {
//
public static MarshallingDecoder buildMarshallingDecoder() {
// , serial java
final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");
// , 5
final MarshallingConfiguration configuration = new MarshallingConfiguration();
configuration.setVersion(5);
// provider
UnmarshallerProvider provider = new DefaultUnmarshallerProvider(marshallerFactory, configuration);
// . provider, ,
MarshallingDecoder decoder = new MarshallingDecoder(provider, 1024 * 1024 * 1);
return decoder;
}
//
public static MarshallingEncoder buildMarshallingEncoder() {
final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");
final MarshallingConfiguration configuration = new MarshallingConfiguration();
configuration.setVersion(5);
MarshallerProvider provider = new DefaultMarshallerProvider(marshallerFactory, configuration);
// . Serializable JavaBean
MarshallingEncoder encoder = new MarshallingEncoder(provider);
return encoder;
}
}
javabean
public class Request implements Serializable { // Serializable
private String id ;
private String name ;
private String requestMessage ;
private byte[] attachment;
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getRequestMessage() {
return requestMessage;
}
public void setRequestMessage(String requestMessage) {
this.requestMessage = requestMessage;
}
public byte[] getAttachment() {
return attachment;
}
public void setAttachment(byte[] attachment) {
this.attachment = attachment;
}
}
public class Response implements Serializable { // Serializable
private String id;
private String name;
private String responseMessage;
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getResponseMessage() {
return responseMessage;
}
public void setResponseMessage(String responseMessage) {
this.responseMessage = responseMessage;
}
}
GZip圧縮Util
public class GzipUtils {
public static byte[] gzip(byte[] data) throws Exception {
ByteArrayOutputStream bos = new ByteArrayOutputStream();
GZIPOutputStream gzip = new GZIPOutputStream(bos);
gzip.write(data);
gzip.finish();
gzip.close();
byte[] ret = bos.toByteArray();
bos.close();
return ret;
}
public static byte[] ungzip(byte[] data) throws Exception{
ByteArrayInputStream bis = new ByteArrayInputStream(data);
GZIPInputStream gzip = new GZIPInputStream(bis);
byte[] buf = new byte[1024];
int num = -1;
ByteArrayOutputStream bos = new ByteArrayOutputStream();
while((num = gzip.read(buf)) != -1 ){
bos.write(buf, 0, num);
}
gzip.close();
bis.close();
byte[] ret = bos.toByteArray();
bos.close();
return ret;
}
}
サービス側とクライアント
public class Server {
public static void main(String[] args) throws Exception{
EventLoopGroup pGroup = new NioEventLoopGroup();
EventLoopGroup cGroup = new NioEventLoopGroup();
ServerBootstrap b = new ServerBootstrap();
b.group(pGroup, cGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
//
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer() {
protected void initChannel(SocketChannel sc) throws Exception {
// . , Handler msg
sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
sc.pipeline().addLast(new ServerHandler());
}
});
ChannelFuture cf = b.bind(8765).sync();
cf.channel().closeFuture().sync();
pGroup.shutdownGracefully();
cGroup.shutdownGracefully();
}
public class ServerHandler extends ChannelHandlerAdapter{
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
Request req = (Request)msg;
System.out.println("Server : " + req.getId() + ", " + req.getName() + ", " + req.getRequestMessage());
byte[] attachment = GzipUtils.ungzip(req.getAttachment());
String path = System.getProperty("user.dir") + File.separatorChar + "receive" + File.separatorChar + "001.jpg";
FileOutputStream fos = new FileOutputStream(path);
fos.write(attachment);
fos.close();
Response resp = new Response();
resp.setId(req.getId());
resp.setName("resp" + req.getId());
resp.setResponseMessage(" " + req.getId());
ctx.writeAndFlush(resp);//.addListener(ChannelFutureListener.CLOSE);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
public class Client {
public static void main(String[] args) throws Exception{
EventLoopGroup group = new NioEventLoopGroup();
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer() {
@Override
protected void initChannel(SocketChannel sc) throws Exception {
sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
sc.pipeline().addLast(new ClientHandler());
}
});
ChannelFuture cf = b.connect("127.0.0.1", 8765).sync();
for(int i = 0; i < 5; i++){
Request req = new Request();
req.setId("" + i);
req.setName("req" + i);
req.setRequestMessage(" " + i);
String path = System.getProperty("user.dir") + File.separatorChar + "sources" + File.separatorChar + "001.jpg";
File file = new File(path);
FileInputStream in = new FileInputStream(file);
byte[] data = new byte[in.available()];
in.read(data);
in.close();
req.setAttachment(GzipUtils.gzip(data)); //
cf.channel().writeAndFlush(req);
}
cf.channel().closeFuture().sync();
group.shutdownGracefully();
}
}
public class ClientHandler extends ChannelHandlerAdapter{
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
try {
Response resp = (Response) msg;
System.out.println("Client : " + resp.getId() + ", " + resp.getName() + ", " + resp.getResponseMessage());
} finally {
ReferenceCountUtil.release(msg);
}
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
1.4長接続/短接続
1.長い接続で、一貫して接続が自発的に中断されず、リアルタイム性が強い2.短い接続である.データはキャッシュに入れ、一度にすべてのデータを一括して提出し、サービス側が受信した後に接続を閉じる以上の2種類は、ChannelHandlerContextにChannelFutureListener.celOSEリスナーを追加するかどうかによって実現する
3.長い接続、一定時間アクティブでない場合は接続を閉じる.SocketChannelにReadTimeoutHandler実装を追加する.例は以下の通りである.
public final class MarshallingCodeCFactory {
public static MarshallingDecoder buildMarshallingDecoder() {
final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");
final MarshallingConfiguration configuration = new MarshallingConfiguration();
configuration.setVersion(5);
UnmarshallerProvider provider = new DefaultUnmarshallerProvider(marshallerFactory, configuration);
MarshallingDecoder decoder = new MarshallingDecoder(provider, 1024);
return decoder;
}
public static MarshallingEncoder buildMarshallingEncoder() {
final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");
final MarshallingConfiguration configuration = new MarshallingConfiguration();
configuration.setVersion(5);
MarshallerProvider provider = new DefaultMarshallerProvider(marshallerFactory, configuration);
MarshallingEncoder encoder = new MarshallingEncoder(provider);
return encoder;
}
}
public class Request implements Serializable{
private String id ;
private String name ;
private String requestMessage ;
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getRequestMessage() {
return requestMessage;
}
public void setRequestMessage(String requestMessage) {
this.requestMessage = requestMessage;
}
}
public class Response implements Serializable{
private String id;
private String name;
private String responseMessage;
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getResponseMessage() {
return responseMessage;
}
public void setResponseMessage(String responseMessage) {
this.responseMessage = responseMessage;
}
}
public class Server {
public static void main(String[] args) throws Exception{
EventLoopGroup pGroup = new NioEventLoopGroup();
EventLoopGroup cGroup = new NioEventLoopGroup();
ServerBootstrap b = new ServerBootstrap();
b.group(pGroup, cGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
//
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer() {
protected void initChannel(SocketChannel sc) throws Exception {
sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
sc.pipeline().addLast(new ReadTimeoutHandler(5)); // ,
sc.pipeline().addLast(new ServerHandler());
}
});
ChannelFuture cf = b.bind(8765).sync();
cf.channel().closeFuture().sync();
pGroup.shutdownGracefully();
cGroup.shutdownGracefully();
}
}
public class ServerHandler extends ChannelHandlerAdapter{
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
Request request = (Request) msg;
System.out.println("Server : " + request.getId() + ", " + request.getName() + ", " + request.getRequestMessage());
Response response = new Response();
response.setId(request.getId());
response.setName("response" + request.getId());
response.setResponseMessage(" " + request.getId());
ctx.writeAndFlush(response);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
public class Client {
private EventLoopGroup group;
private Bootstrap b;
private ChannelFuture cf ;
//
private static class SingletonHolder {
static final Client instance = new Client();
}
public static Client getInstance(){
return SingletonHolder.instance;
}
private Client(){
group = new NioEventLoopGroup();
b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.handler(new LoggingHandler(LogLevel.INFO))
.handler(new ChannelInitializer() {
@Override
protected void initChannel(SocketChannel sc) throws Exception {
sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
// handler( , )
sc.pipeline().addLast(new ReadTimeoutHandler(5)); // 5s,
sc.pipeline().addLast(new ClientHandler());
}
});
}
public void connect(){
try {
this.cf = b.connect("127.0.0.1", 8765).sync();
System.out.println(" , ");
} catch (Exception e) {
e.printStackTrace();
}
}
public ChannelFuture getChannelFuture(){
if(this.cf == null) { //
this.connect();
}
if(!this.cf.channel().isActive()){ //
this.connect();
}
return this.cf;
}
public static void main(String[] args) throws Exception{
final Client c = Client.getInstance();
ChannelFuture cf = c.getChannelFuture();
for(int i = 1; i <= 3; i++ ){
Request request = new Request();
request.setId("" + i);
request.setName("request" + i);
request.setRequestMessage(" " + i);
cf.channel().writeAndFlush(request);
TimeUnit.SECONDS.sleep(4); // 4s
}
cf.channel().closeFuture().sync(); //
//
new Thread(new Runnable() {
@Override
public void run() {
try {
System.out.println(" ");
ChannelFuture cf = c.getChannelFuture();
assert true == cf.channel().isActive(); //
//
Request request = new Request();
request.setId("" + 4);
request.setName("request" + 4);
request.setRequestMessage(" " + 4);
cf.channel().writeAndFlush(request);
cf.channel().closeFuture().sync();
System.out.println(" ");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
System.out.println(" , ..");
}
}
public class ClientHandler extends ChannelHandlerAdapter{
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
try {
Response resp = (Response) msg;
System.out.println("Client : " + resp.getId() + ", " + resp.getName() + ", " + resp.getResponseMessage());
} finally {
ReferenceCountUtil.release(msg);
}
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
1.5 UDP使用(少ない使用)
public class Server {
public void run(int port) throws Exception {
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group).channel(NioDatagramChannel.class) // UDP: NioDatagramChannel
.option(ChannelOption.SO_BROADCAST, true) //
.handler(new ServerHandler());
b.bind(port).sync().channel().closeFuture().await();
} finally {
group.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception {
new Server().run(8765);
}
}
public class ServerHandler extends SimpleChannelInboundHandler {
//
private static final String[] DICTIONARY = {
" , 。",
" , 。",
" , 。",
" , 。",
" , 。 , !"
};
private String nextQuote() {
int quoteId = ThreadLocalRandom.current().nextInt(DICTIONARY.length);
return DICTIONARY[quoteId];
}
@Override
public void messageReceived(ChannelHandlerContext ctx, DatagramPacket packet) throws Exception {
String req = packet.content().toString(CharsetUtil.UTF_8);
System.out.println(req);
if (" ?".equals(req)) {
ctx.writeAndFlush(new DatagramPacket(Unpooled.copiedBuffer(" : " + nextQuote(), CharsetUtil.UTF_8), packet.sender()));
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
ctx.close();
cause.printStackTrace();
}
}
public class Client {
public void run(int port) throws Exception {
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group).channel(NioDatagramChannel.class)
.option(ChannelOption.SO_BROADCAST, true)
.handler(new ClientHandler());
Channel ch = b.bind(0).sync().channel();
// UDP
ch.writeAndFlush(new DatagramPacket(Unpooled.copiedBuffer(" ?", CharsetUtil.UTF_8), new InetSocketAddress("255.255.255.255", port))).sync();
if (!ch.closeFuture().await(15000)) {
System.out.println(" !");
}
} finally {
group.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception {
new Client().run(8765);
}
}
public class ClientHandler extends SimpleChannelInboundHandler {
@Override
public void messageReceived(ChannelHandlerContext ctx, DatagramPacket msg) throws Exception {
String response = msg.content().toString(CharsetUtil.UTF_8);
if (response.startsWith(" : ")) {
System.out.println(response);
ctx.close();
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
2.心拍数検査
クラスタ内のプライマリサーバはスレーブサーバの状態を知る必要があるためclientは5~10秒おきにサーバにハートビートパケットを送信する
nettyとタイミングタスクで実現可能
public final class MarshallingCodeCFactory {
public static MarshallingDecoder buildMarshallingDecoder() {
final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");
final MarshallingConfiguration configuration = new MarshallingConfiguration();
configuration.setVersion(5);
UnmarshallerProvider provider = new DefaultUnmarshallerProvider(marshallerFactory, configuration);
MarshallingDecoder decoder = new MarshallingDecoder(provider, 1024 * 1024 * 1);
return decoder;
}
public static MarshallingEncoder buildMarshallingEncoder() {
final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");
final MarshallingConfiguration configuration = new MarshallingConfiguration();
configuration.setVersion(5);
MarshallerProvider provider = new DefaultMarshallerProvider(marshallerFactory, configuration);
MarshallingEncoder encoder = new MarshallingEncoder(provider);
return encoder;
}
}
public class RequestInfo implements Serializable {
private String ip ;
private HashMap cpuPercMap ;
private HashMap memoryMap;
public String getIp() {
return ip;
}
public void setIp(String ip) {
this.ip = ip;
}
public HashMap getCpuPercMap() {
return cpuPercMap;
}
public void setCpuPercMap(HashMap cpuPercMap) {
this.cpuPercMap = cpuPercMap;
}
public HashMap getMemoryMap() {
return memoryMap;
}
public void setMemoryMap(HashMap memoryMap) {
this.memoryMap = memoryMap;
}
}
public class Server {
public static void main(String[] args) throws Exception{
EventLoopGroup pGroup = new NioEventLoopGroup();
EventLoopGroup cGroup = new NioEventLoopGroup();
ServerBootstrap b = new ServerBootstrap();
b.group(pGroup, cGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
//
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer() {
protected void initChannel(SocketChannel sc) throws Exception {
sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
sc.pipeline().addLast(new ServerHeartBeatHandler());
}
});
ChannelFuture cf = b.bind(8765).sync();
cf.channel().closeFuture().sync();
pGroup.shutdownGracefully();
cGroup.shutdownGracefully();
}
}
public class ServerHeartBeatHandler extends ChannelHandlerAdapter {
private static HashMap AUTH_IP_MAP = new HashMap();
private static final String SUCCESS_KEY = "auth_success_key";
static {
AUTH_IP_MAP.put("127.0.0.1", "1234");
}
private boolean auth(ChannelHandlerContext ctx, Object msg){
String [] ret = ((String) msg).split(",");
String auth = AUTH_IP_MAP.get(ret[0]);
if(auth != null && auth.equals(ret[1])){
// ,
ctx.writeAndFlush(SUCCESS_KEY);
return true;
} else {
ctx.writeAndFlush("auth failure !").addListener(ChannelFutureListener.CLOSE);
return false;
}
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if(msg instanceof String){
auth(ctx, msg);
} else if (msg instanceof RequestInfo) {
RequestInfo info = (RequestInfo) msg;
System.out.println("--------------------------------------------");
System.out.println(" ip : " + info.getIp());
System.out.println(" cpu : ");
HashMap cpu = info.getCpuPercMap();
System.out.println(" : " + cpu.get("combined"));
System.out.println(" : " + cpu.get("user"));
System.out.println(" : " + cpu.get("sys"));
System.out.println(" : " + cpu.get("wait"));
System.out.println(" : " + cpu.get("idle"));
System.out.println(" memory : ");
HashMap memory = info.getMemoryMap();
System.out.println(" : " + memory.get("total"));
System.out.println(" : " + memory.get("used"));
System.out.println(" : " + memory.get("free"));
System.out.println("--------------------------------------------");
ctx.writeAndFlush("info received!");
} else {
ctx.writeAndFlush("connect failure!").addListener(ChannelFutureListener.CLOSE);
}
}
}
public class Client {
public static void main(String[] args) throws Exception {
EventLoopGroup group = new NioEventLoopGroup();
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer() {
@Override
protected void initChannel(SocketChannel sc) throws Exception {
sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
sc.pipeline().addLast(new ClienHeartBeatHandler());
}
});
ChannelFuture cf = b.connect("127.0.0.1", 8765).sync();
cf.channel().closeFuture().sync();
group.shutdownGracefully();
}
}
public class ClienHeartBeatHandler extends ChannelHandlerAdapter {
private ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
private ScheduledFuture> heartBeat; //
//
private InetAddress addr ;
private static final String SUCCESS_KEY = "auth_success_key";
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
addr = InetAddress.getLocalHost();
//String ip = addr.getHostAddress();
String ip = "127.0.0.1";
String key = "1234";
//
String auth = ip + "," + key;
//
ctx.writeAndFlush(auth);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
try {
if(msg instanceof String){
String ret = (String) msg;
if(SUCCESS_KEY.equals(ret)){
// , 5
this.heartBeat = this.scheduler.scheduleWithFixedDelay(new HeartBeatTask(ctx), 0, 5, TimeUnit.SECONDS);
System.out.println(msg);
} else {
//
System.out.println(msg);
}
}
} finally {
// ,
ReferenceCountUtil.release(msg);
}
}
private class HeartBeatTask implements Runnable {
private final ChannelHandlerContext ctx;
public HeartBeatTask(final ChannelHandlerContext ctx) {
this.ctx = ctx;
}
@Override
public void run() {
try {
RequestInfo info = new RequestInfo();
//ip
info.setIp(addr.getHostAddress());
Sigar sigar = new Sigar();
//cpu prec
CpuPerc cpuPerc = sigar.getCpuPerc();
HashMap cpuPercMap = new HashMap();
cpuPercMap.put("combined", cpuPerc.getCombined());
cpuPercMap.put("user", cpuPerc.getUser());
cpuPercMap.put("sys", cpuPerc.getSys());
cpuPercMap.put("wait", cpuPerc.getWait());
cpuPercMap.put("idle", cpuPerc.getIdle());
// memory
Mem mem = sigar.getMem();
HashMap memoryMap = new HashMap();
memoryMap.put("total", mem.getTotal() / 1024L);
memoryMap.put("used", mem.getUsed() / 1024L);
memoryMap.put("free", mem.getFree() / 1024L);
info.setCpuPercMap(cpuPercMap);
info.setMemoryMap(memoryMap);
ctx.writeAndFlush(info);
} catch (Exception e) {
e.printStackTrace();
}
}
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
//
if (heartBeat != null) {
heartBeat.cancel(true);
heartBeat = null;
}
ctx.fireExceptionCaught(cause);
}
}
}
3. HTTP
3.1 Hello World
public final class HttpHelloWorldServer {
static final boolean SSL = System.getProperty("ssl") != null;
static final int PORT = Integer.parseInt(System.getProperty("port", SSL? "8443" : "8080"));
public static void main(String[] args) throws Exception {
final SslContext sslCtx;
if (SSL) {
SelfSignedCertificate ssc = new SelfSignedCertificate();
sslCtx = SslContext.newServerContext(ssc.certificate(), ssc.privateKey());
} else {
sslCtx = null;
}
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.option(ChannelOption.SO_BACKLOG, 1024);
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new HttpHelloWorldServerInitializer(sslCtx));
Channel ch = b.bind(PORT).sync().channel();
System.err.println("Open your web browser and navigate to " +
(SSL? "https" : "http") + "://127.0.0.1:" + PORT + '/');
ch.closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
public class HttpHelloWorldServerInitializer extends ChannelInitializer {
private final SslContext sslCtx;
public HttpHelloWorldServerInitializer(SslContext sslCtx) {
this.sslCtx = sslCtx;
}
@Override
public void initChannel(SocketChannel ch) {
ChannelPipeline p = ch.pipeline();
if (sslCtx != null) {
p.addLast(sslCtx.newHandler(ch.alloc()));
}
p.addLast(new HttpServerCodec()); // ! http , HttpRequest HttpResponse
p.addLast(new HttpHelloWorldServerHandler());
}
}
public class HttpHelloWorldServerHandler extends ChannelHandlerAdapter {
private static final byte[] CONTENT = "HELLO WORLD".getBytes();
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
ctx.flush();
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
if (msg instanceof HttpRequest) {
HttpRequest req = (HttpRequest) msg;
if (HttpHeaderUtil.is100ContinueExpected(req)) {
ctx.write(new DefaultFullHttpResponse(HTTP_1_1, CONTINUE));
}
boolean keepAlive = HttpHeaderUtil.isKeepAlive(req);
//
FullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1, OK, Unpooled.wrappedBuffer(CONTENT));
response.headers().set(CONTENT_TYPE, "text/plain;charset=UTF-8");
response.headers().setInt(CONTENT_LENGTH, response.content().readableBytes());
if (!keepAlive) {
// Request ,
ctx.write(response).addListener(ChannelFutureListener.CLOSE);
} else {
// , response KEEP_ALIVE
response.headers().set(CONNECTION, HttpHeaderValues.KEEP_ALIVE);
ctx.write(response);
}
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
3.2 HTTPダウンロードファイル
public class HttpDownloadServer {
private static final String DEFAULT_URL = "/sources/";
public void run(final int port, final String url) throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
// addLast key,
// request
ch.pipeline().addLast("http-decoder", new HttpRequestDecoder());
// response
ch.pipeline().addLast("http-encoder", new HttpResponseEncoder());
// chunked, response
ch.pipeline().addLast("http-chunked", new ChunkedWriteHandler());
// ObjectAggregator, response FullHttpResponse
ch.pipeline().addLast("http-aggregator", new HttpObjectAggregator(65536));
// handler
ch.pipeline().addLast("fileServerHandler", new HttpDownoadServerHandler(url));
}
});
ChannelFuture future = b.bind("127.0.0.1", port).sync();
System.out.println("HTTP , : " + "http://localhost:" + port + url);
future.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception {
int port = 8765;
new HttpDownloadServer().run(port, DEFAULT_URL);
}
}
// SimpleChannelInboundHandler, , msg
public class HttpDownoadServerHandler extends SimpleChannelInboundHandler {
private final String url;
public HttpDownoadServerHandler(String url) {
this.url = url;
}
@Override
public void messageReceived(ChannelHandlerContext ctx, FullHttpRequest request) throws Exception {
// ( )
if (!request.decoderResult().isSuccess()) {
// 400
sendError(ctx, BAD_REQUEST);
return;
}
// : GET
if (request.method() != GET) {
// 405
sendError(ctx, METHOD_NOT_ALLOWED);
return;
}
// uri
final String uri = request.uri();
// url ,
final String path = parseURI(uri);
// , path null
if (path == null) {
//403
sendError(ctx, FORBIDDEN);
return;
}
// file
File file = new File(path);
//
if (file.isHidden() || !file.exists()) {
// 404
sendError(ctx, NOT_FOUND);
return;
}
//
if (file.isDirectory()) {
if (uri.endsWith("/")) {
// "/" :
sendListing(ctx, file);
} else {
// "/" , "/"
sendRedirect(ctx, uri + '/');
}
return;
}
// file
if (!file.isFile()) {
// 403
sendError(ctx, FORBIDDEN);
return;
}
//
RandomAccessFile randomAccessFile = null;
try {
randomAccessFile = new RandomAccessFile(file, "r");//
} catch (FileNotFoundException fnfe) {
// 404
sendError(ctx, NOT_FOUND);
return;
}
//
long fileLength = randomAccessFile.length();
//
HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK);
//
HttpHeaderUtil.setContentLength(response, fileLength);
// Content-Type
setContentTypeHeader(response, file);
// KeepAlive
if (HttpHeaderUtil.isKeepAlive(request)) {
response.headers().set(CONNECTION, HttpHeaderValues.KEEP_ALIVE);
}
// response header, HttpObjectAggregator
ctx.write(response);
// ChunkedFile. ChunkedFile RandomAccessFile . 8192
ChannelFuture sendFileFuture = ctx.write(new ChunkedFile(randomAccessFile, 0, fileLength, 8192), ctx.newProgressivePromise());
//
sendFileFuture.addListener(new ChannelProgressiveFutureListener() {
@Override
public void operationProgressed(ChannelProgressiveFuture future, long progress, long total) {
if (total < 0) {
System.err.println("Transfer progress: " + progress);
} else {
System.err.println("Transfer progress: " + progress + " / " + total);
}
}
@Override
public void operationComplete(ChannelProgressiveFuture future) throws Exception {
System.out.println("Transfer complete.");
}
});
// Chunked, !
ChannelFuture lastContentFuture = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
// Keep-Alive, ,
if (!HttpHeaderUtil.isKeepAlive(request)) {
lastContentFuture.addListener(ChannelFutureListener.CLOSE);
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
if (ctx.channel().isActive()) {
// 500
sendError(ctx, INTERNAL_SERVER_ERROR);
ctx.close();
}
}
// URI
private static final Pattern INSECURE_URI = Pattern.compile(".*[<>&\"].*");
private String parseURI(String uri) {
try {
// UTF-8
uri = URLDecoder.decode(uri, "UTF-8");
} catch (UnsupportedEncodingException e) {
try {
// ISO-8859-1
uri = URLDecoder.decode(uri, "ISO-8859-1");
} catch (UnsupportedEncodingException e1) {
//
throw new Error();
}
}
// uri :4
// step 1
if (!uri.startsWith(url)) {
return null;
}
// step 2
if (!uri.startsWith("/")) {
return null;
}
// step 3
uri = uri.replace('/', File.separatorChar);
// step 4
if (uri.contains(File.separator + '.') || uri.contains('.' + File.separator) ||
uri.startsWith(".") || uri.endsWith(".") ||
INSECURE_URI.matcher(uri).matches()) {
return null;
}
// + URI
return System.getProperty("user.dir") + File.separator + uri;
}
//
private static final Pattern ALLOWED_FILE_NAME = Pattern.compile("[A-Za-z0-9][-_A-Za-z0-9\\.]*");
// , html
private static void sendListing(ChannelHandlerContext ctx, File dir) {
//
FullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1, OK);
//
response.headers().set(CONTENT_TYPE, "text/html; charset=UTF-8");
//
StringBuilder ret = new StringBuilder();
String dirPath = dir.getPath();
ret.append("\r
");
ret.append(""<span style="color:#000000;">);
ret.append(dirPath);
ret.append(</span>" :"<span style="color:#000000;">);
ret.append(</span>" \r
");
ret.append("");
ret.append(dirPath).append(" :");
ret.append("
\r
");
ret.append("");
ret.append("- :..
\r
");
// ,
for (File f : dir.listFiles()) {
//step 1:
if (f.isHidden() || !f.canRead()) {
continue;
}
String name = f.getName();
//step 2:
if (!ALLOWED_FILE_NAME.matcher(name).matches()) {
continue;
}
ret.append("- :);
ret.append(name);
ret.append("\">");
ret.append(name);
ret.append("
\r
");
}
ret.append("
\r
");
// ByteBuf,
ByteBuf buffer = Unpooled.copiedBuffer(ret, CharsetUtil.UTF_8);
//
response.content().writeBytes(buffer);
// ByteBuf
buffer.release();
//
ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
}
//
private static void sendRedirect(ChannelHandlerContext ctx, String newUri) {
FullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1, FOUND);
response.headers().set(LOCATION, newUri);
ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
}
//
private static void sendError(ChannelHandlerContext ctx, HttpResponseStatus status) {
FullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1, status, Unpooled.copiedBuffer("Failure: " + status.toString()+ "\r
", CharsetUtil.UTF_8));
response.headers().set(CONTENT_TYPE, "text/plain; charset=UTF-8");
ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
}
private static void setContentTypeHeader(HttpResponse response, File file) {
// mime Content-Type
MimetypesFileTypeMap mimeTypesMap = new MimetypesFileTypeMap();
response.headers().set(CONTENT_TYPE, mimeTypesMap.getContentType(file.getPath()));
}
}
3.3 HTTPアップロードファイル(少ない使用)
実際のアプリケーションでは、ファイルアップロードサービス側に成熟したフレームワークfastDFS(小さなファイル)とHDFS(大きなファイル)があります.
ブレークポイントの継続を実現するには、アップロードの進捗を記録する必要があります.HTTPヘッダのRangeとContent-Langeを参照してください.
public final class HttpUploadServer {
static final boolean SSL = System.getProperty("ssl") != null;
static final int PORT = Integer.parseInt(System.getProperty("port", SSL? "8443" : "8080"));
public static void main(String[] args) throws Exception {
// Configure SSL.
final SslContext sslCtx;
if (SSL) {
SelfSignedCertificate ssc = new SelfSignedCertificate();
sslCtx = SslContext.newServerContext(ssc.certificate(), ssc.privateKey());
} else {
sslCtx = null;
}
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup);
b.channel(NioServerSocketChannel.class);
b.handler(new LoggingHandler(LogLevel.INFO));
b.childHandler(new HttpUploadServerInitializer(sslCtx));
Channel ch = b.bind(PORT).sync().channel();
System.err.println("Open your web browser and navigate to " + (SSL ? "https" : "http") + "://127.0.0.1:" + PORT + '/');
ch.closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
public class HttpUploadServerInitializer extends ChannelInitializer {
private final SslContext sslCtx;
public HttpUploadServerInitializer(SslContext sslCtx) {
this.sslCtx = sslCtx;
}
@Override
public void initChannel(SocketChannel ch) {
ChannelPipeline pipeline = ch.pipeline();
if (sslCtx != null) {
pipeline.addLast(sslCtx.newHandler(ch.alloc()));
}
pipeline.addLast(new HttpRequestDecoder());
pipeline.addLast(new HttpResponseEncoder());
//
pipeline.addLast(new HttpContentCompressor());
pipeline.addLast(new HttpUploadServerHandler());
}
}
public class HttpUploadServerHandler extends SimpleChannelInboundHandler {
private static final Logger logger = Logger.getLogger(HttpUploadServerHandler.class.getName());
private HttpRequest request;
private boolean readingChunks;
private final StringBuilder responseContent = new StringBuilder();
private static final HttpDataFactory factory = new DefaultHttpDataFactory(DefaultHttpDataFactory.MINSIZE); // minsize
private HttpPostRequestDecoder decoder;
static {
DiskFileUpload.deleteOnExitTemporaryFile = true; //
DiskFileUpload.baseDirectory = "D:" + File.separatorChar + "aa"; //
DiskAttribute.deleteOnExitTemporaryFile = true; //
DiskAttribute.baseDirectory = "D:" + File.separatorChar + "aa"; //
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
if (decoder != null) {
decoder.cleanFiles();
}
}
@Override
public void messageReceived(ChannelHandlerContext ctx, HttpObject msg) throws Exception {
if (msg instanceof HttpRequest) { // HttpRequest
HttpRequest request = this.request = (HttpRequest) msg;
URI uri = new URI(request.uri());
if (!uri.getPath().startsWith("/form")) {
//
writeMenu(ctx);
return;
}
//
responseContent.setLength(0);
responseContent.append("WELCOME TO THE WILD WILD WEB SERVER\r
");
responseContent.append("===================================\r
");
responseContent.append("VERSION: " + request.protocolVersion().text() + "\r
");
responseContent.append("REQUEST_URI: " + request.uri() + "\r
\r
");
responseContent.append("\r
\r
");
for (Entry entry : request.headers()) {
responseContent.append("HEADER: " + entry.getKey() + '=' + entry.getValue() + "\r
");
}
responseContent.append("\r
\r
");
Set cookies = null;
String value = request.headers().getAndConvert(HttpHeaderNames.COOKIE);
if (value == null) {
cookies = Collections.emptySet();
} else {
cookies = ServerCookieDecoder.decode(value);
}
for (Cookie cookie : cookies) {
responseContent.append("COOKIE: " + cookie + "\r
");
}
responseContent.append("\r
\r
");
QueryStringDecoder decoderQuery = new QueryStringDecoder(request.uri());
Map> uriAttributes = decoderQuery.parameters();
for (Entry> attr: uriAttributes.entrySet()) {
for (String attrVal: attr.getValue()) {
responseContent.append("URI: " + attr.getKey() + '=' + attrVal + "\r
");
}
}
responseContent.append("\r
\r
");
// GET , return
if (request.method().equals(HttpMethod.GET)) {
responseContent.append("\r
\r
END OF GET CONTENT\r
");
return;
}
// POST
try {
decoder = new HttpPostRequestDecoder(factory, request);
} catch (ErrorDataDecoderException e1) {
e1.printStackTrace();
responseContent.append(e1.getMessage());
writeResponse(ctx.channel());
ctx.channel().close();
return;
}
readingChunks = HttpHeaderUtil.isTransferEncodingChunked(request);
responseContent.append("Is Chunked: " + readingChunks + "\r
");
responseContent.append("IsMultipart: " + decoder.isMultipart() + "\r
");
if (readingChunks) {
responseContent.append("Chunks: ");
}
}
if (decoder != null) {
if (msg instanceof HttpContent) { //HttpContent
// chunk
HttpContent chunk = (HttpContent) msg;
try {
decoder.offer(chunk);
} catch (ErrorDataDecoderException e1) {
e1.printStackTrace();
responseContent.append(e1.getMessage());
writeResponse(ctx.channel());
ctx.channel().close();
return;
}
responseContent.append('o'); // chunk 'o'
readHttpDataChunkByChunk();
// chunk
if (chunk instanceof LastHttpContent) {
writeResponse(ctx.channel());
readingChunks = false;
reset();
}
}
} else {
writeResponse(ctx.channel());
}
}
private void reset() {
request = null;
decoder.destroy(); //
decoder = null;
}
private void readHttpDataChunkByChunk() throws Exception {
try {
while (decoder.hasNext()) {
InterfaceHttpData data = decoder.next();
if (data != null) {
try {
writeHttpData(data);
} finally {
data.release();
}
}
}
} catch (EndOfDataDecoderException e1) {
responseContent.append("\r
\r
END OF CONTENT CHUNK BY CHUNK\r
\r
");
}
}
private void writeHttpData(InterfaceHttpData data) throws Exception {
if (data.getHttpDataType() == HttpDataType.Attribute) {
Attribute attribute = (Attribute) data;
String value = null;
try {
value = attribute.getValue();
} catch (IOException e1) {
e1.printStackTrace();
responseContent.append("\r
BODY Attribute: " + attribute.getHttpDataType().name() + ": " + attribute.getName() + " Error while reading value: " + e1.getMessage() + "\r
");
return;
}
if (value.length() > 100) {
responseContent.append("\r
BODY Attribute: " + attribute.getHttpDataType().name() + ": " + attribute.getName() + " data too long\r
");
} else {
responseContent.append("\r
BODY Attribute: " + attribute.getHttpDataType().name() + ": " + attribute + "\r
");
}
} else {
responseContent.append("\r
-----------start-------------" + "\r
");
responseContent.append("\r
BODY FileUpload: " + data.getHttpDataType().name() + ": " + data + "\r
");
responseContent.append("\r
------------end------------" + "\r
");
if (data.getHttpDataType() == HttpDataType.FileUpload) {
FileUpload fileUpload = (FileUpload) data;
if (fileUpload.isCompleted()) {
System.out.println("file name : " + fileUpload.getFilename());
System.out.println("file length: " + fileUpload.length());
System.out.println("file maxSize : " + fileUpload.getMaxSize());
System.out.println("file path :" + fileUpload.getFile().getPath());
System.out.println("file absolutepath :" + fileUpload.getFile().getAbsolutePath());
System.out.println("parent path :" + fileUpload.getFile().getParentFile());
if (fileUpload.length() < 1024 * 1024 * 10) {
responseContent.append("\tContent of file\r
");
try {
responseContent.append(fileUpload.getString(fileUpload.getCharset()));
} catch (Exception e1) {
e1.printStackTrace();
}
responseContent.append("\r
");
} else {
responseContent.append("\tFile too long to be printed out:" + fileUpload.length() + "\r
");
}
fileUpload.renameTo(new File(fileUpload.getFile().getPath())); // ,
decoder.removeHttpDataFromClean(fileUpload);
} else {
responseContent.append("\tFile to be continued but should not!\r
");
}
}
}
}
private void writeResponse(Channel channel) {
ByteBuf buf = copiedBuffer(responseContent.toString(), CharsetUtil.UTF_8);
responseContent.setLength(0);
//
boolean close = request.headers().contains(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE, true)
|| request.protocolVersion().equals(HttpVersion.HTTP_1_0)
&& !request.headers().contains(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE, true);
FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, buf);
response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain; charset=UTF-8");
// Content-Length
if (!close) {
response.headers().setInt(HttpHeaderNames.CONTENT_LENGTH, buf.readableBytes());
}
Set cookies = null;
String value = request.headers().getAndConvert(HttpHeaderNames.COOKIE);
if (value == null) {
cookies = Collections.emptySet();
} else {
cookies = ServerCookieDecoder.decode(value);
}
if (!cookies.isEmpty()) {
for (Cookie cookie : cookies) {
response.headers().add(HttpHeaderNames.SET_COOKIE, ServerCookieEncoder.encode(cookie));
}
}
ChannelFuture future = channel.writeAndFlush(response);
if (close) {
future.addListener(ChannelFutureListener.CLOSE);
}
}
// html
private void writeMenu(ChannelHandlerContext ctx) {
responseContent.setLength(0);
// create Pseudo Menu
responseContent.append("");
responseContent.append("");
responseContent.append("Netty Test Form \r
");
responseContent.append("\r
");
responseContent.append("");
responseContent.append("");
responseContent.append("");
responseContent.append("");
responseContent.append("");
responseContent.append("");
responseContent.append("Netty Test Form");
responseContent.append("Choose one FORM");
responseContent.append("
\r
");
// GET
responseContent.append("GET FORM
");
responseContent.append("