nettyベースのバイトストリーム転送ファイル
8772 ワード
public class FileUploadClientHandler extends ChannelInboundHandlerAdapter {
private int byteRead;
private volatile int start = 0;
private volatile int lastLength = 0;
public RandomAccessFile randomAccessFile;
private FileUploadFile fileUploadFile;//FileUploadFile : , , , ,
private final static Logger LOGGER = LoggerFactory.getLogger(FileUploadClientHandler.class);
public FileUploadClientHandler(FileUploadFile ef) {
if (ef.getFile().exists()) {
if (!ef.getFile().isFile()) {
System.out.println("Not a file :" + ef.getFile());
return;
}
}
this.fileUploadFile = ef;
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
// TODO Auto-generated method stub
super.channelInactive(ctx);
LOGGER.info(" channelInactive()");
}
public void channelActive(ChannelHandlerContext ctx) {
LOGGER.info(" channelActive() .....");
try {
randomAccessFile = new RandomAccessFile(fileUploadFile.getFile(), "r");
randomAccessFile.seek(fileUploadFile.getStarPos());
// lastLength = (int) randomAccessFile.length() / 10;
lastLength = 1024 * 10;
if(randomAccessFile.length() < lastLength){
lastLength = (int)randomAccessFile.length();
}
byte[] bytes = new byte[lastLength];
if ((byteRead = randomAccessFile.read(bytes)) != -1) {
fileUploadFile.setEndPos(byteRead);
fileUploadFile.setBytes(bytes);
ctx.writeAndFlush(fileUploadFile);//
} else {
}
LOGGER.info("channelActive() " + byteRead);
} catch (FileNotFoundException e) {
e.printStackTrace();
} catch (IOException i) {
i.printStackTrace();
}
LOGGER.info("channelActive() ");
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof Integer) {
start = (Integer) msg;
if (start != -1) {
randomAccessFile = new RandomAccessFile(fileUploadFile.getFile(), "r");
randomAccessFile.seek(start); // start
LOGGER.info(" :" + (randomAccessFile.length() - start));
int a = (int) (randomAccessFile.length() - start);
int b = (int) (randomAccessFile.length() / 1024 * 2);
if (a < lastLength) {
lastLength = a;
}
LOGGER.info(" :" + (randomAccessFile.length()) + ",start:" + start + ",a:" + a + ",b:" + b + ",lastLength:" + lastLength);
byte[] bytes = new byte[lastLength];
LOGGER.info("bytes ="+bytes.length);
if ((byteRead = randomAccessFile.read(bytes)) != -1 && (randomAccessFile.length() - start) > 0) {
LOGGER.info("byteRead = " + byteRead);
fileUploadFile.setEndPos(byteRead);
fileUploadFile.setBytes(bytes);
fileUploadFile.setFile_length(randomAccessFile.length());
try {
ctx.writeAndFlush(fileUploadFile);
} catch (Exception e) {
e.printStackTrace();
}
} else {
randomAccessFile.close();
LOGGER.info(" channelRead()--------" + byteRead);
}
}
}else if (msg instanceof String){
ctx.writeAndFlush("testtttttttt");
Thread.sleep(1000);
ctx.close();
}
}
//
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
package com.haoxy.netty.server.file.handler;
import com.haoxy.common.model.FileUploadFile;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.RandomAccessFile;
/**
* Created by haoxy on 2018/11/15.
* E-mail:[email protected]
* github:https://github.com/haoxiaoyong1014
*/
public class FileUploadServerHandler extends ChannelInboundHandlerAdapter {
private int byteRead;
private volatile int start = 0;
private String file_dir = "E:/tmp";
private final static Logger LOGGER = LoggerFactory.getLogger(FileUploadServerHandler.class);
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// TODO Auto-generated method stub
super.channelActive(ctx);
LOGGER.info(" :channelActive()");
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
// TODO Auto-generated method stub
super.channelInactive(ctx);
LOGGER.info(" :channelInactive()");
LOGGER.info(" ");
ctx.flush();
ctx.close();
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
LOGGER.info(" , ....");
if (msg instanceof FileUploadFile) {
FileUploadFile ef = (FileUploadFile) msg;
byte[] bytes = ef.getBytes();
byteRead = ef.getEndPos();
String md5 = ef.getFile_md5();//
String path = file_dir + File.separator + md5;
File file = new File(path);
RandomAccessFile randomAccessFile = new RandomAccessFile(file, "rw");//r: rw:
randomAccessFile.seek(start);// ,
randomAccessFile.write(bytes);// seek(start) , start 。 start
start = start + byteRead;
if (byteRead > 0) {
ctx.writeAndFlush(start);//
randomAccessFile.close();
// if (byteRead != 1024 * 10) {
// Thread.sleep(1000);
// channelInactive(ctx);
// }
} else {
ctx.close();
}
if (byteRead != 1024 * 10) {
randomAccessFile = new RandomAccessFile(file, "r");//
if (randomAccessFile.length() == ef.getFile_length()) {
LOGGER.info(" ");
} else {
LOGGER.info(" ");
}
ctx.writeAndFlush("zcj");
randomAccessFile.close();
}
LOGGER.info(" , :" + path + "," + byteRead);
} else {
LOGGER.info(msg.toString());
LOGGER.info(" " + ctx.name() + " ");
channelInactive(ctx);
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
LOGGER.info("FileUploadServerHandler--exceptionCaught()");
}
}
io.netty
netty-all
4.1.21.Final