nettyベースのバイトストリーム転送ファイル

8772 ワード

  • ビジネスシーンがマイクロサービスが横行している現在、AサービスとBサービスの間でデータ伝送が行われるのは避けられない.今回解決する問題a.ABの2つのサービス間でファイルを転送するb.転送完了時にデータ整合性判定を行うc.現在の接続チャネルに基づいてメッセージ(ファイルアドレスを携帯する、その他の業務に必要なパラメータ)を転送し、Bサービスに対してファイルを業務操作することを通知する
  • .
  • RandomAccessFile簡単に紹介RandomAccessFileはデータ記録を保存するファイルにアクセスするために使用されており、seek()メソッドで記録にアクセスし、読み書きすることができます.これらの記録の大きさは同じである必要はありません.しかし、その大きさと位置は知られていなければならない.ただし、このクラスは操作ファイルに限られます.RandomAccessFileはInputStreamとOutputStreamクラスに属しません.実際には、DataInputとDataOutputインタフェースの実装に加えて(DataInputStreamとDataOutputStreamもこの2つのインタフェースを実装している)、この2つのクラスとは無関係であり、InputStreamとOutputStreamクラスにすでに存在する任意の機能さえ使用しない.完全に独立したクラスであり、すべての方法(ほとんどはそれ自身に属している)はゼロから書かれています.これは、RandomAccessFileがファイル内を前後に移動できるため、他のI/Oクラスとは根本的に動作が異なる可能性があります.要するに、Objectを直接継承する独立したクラスです.基本的に、RandomAccessFileの働き方は、DataInputStreamとDataOutputStreamを組み合わせて、位置決め用のgetFilePointer()や、ファイル内で移動するためのseek()や、ファイルサイズを判断するlength()やskipBytes()が何バイトをスキップするかなど、独自の方法を加えることです.また、そのコンストラクション関数は、読み取り専用(「r」)か、読み取り専用(「rw」)かでファイルを開くパラメータ(Cのfopen()とそっくり)を表す.ファイルのみの書き込みはサポートされていません.RandomAccessFileのみseek検索メソッドがあり、このメソッドはファイルにのみ適用されます.
  • クライアントプライマリコード
  • public class FileUploadClientHandler extends ChannelInboundHandlerAdapter {
       private int byteRead;
       private volatile int start = 0;
       private volatile int lastLength = 0;
       public RandomAccessFile randomAccessFile;
       private FileUploadFile fileUploadFile;//FileUploadFile      :  ,   ,    ,      ,    
       private final static Logger LOGGER = LoggerFactory.getLogger(FileUploadClientHandler.class);
       public FileUploadClientHandler(FileUploadFile ef) {
           if (ef.getFile().exists()) {
               if (!ef.getFile().isFile()) {
                   System.out.println("Not a file :" + ef.getFile());
                   return;
               }
           }
           this.fileUploadFile = ef;
       }
    
       @Override
       public void channelInactive(ChannelHandlerContext ctx) throws Exception {
           // TODO Auto-generated method stub
           super.channelInactive(ctx);
           LOGGER.info("         channelInactive()");
       }
    
       public void channelActive(ChannelHandlerContext ctx) {
           LOGGER.info("    channelActive()  .....");
           try {
               randomAccessFile = new RandomAccessFile(fileUploadFile.getFile(), "r");
               randomAccessFile.seek(fileUploadFile.getStarPos());
               // lastLength = (int) randomAccessFile.length() / 10;
               lastLength = 1024 * 10;
               if(randomAccessFile.length() < lastLength){
                   lastLength = (int)randomAccessFile.length();
               }
               byte[] bytes = new byte[lastLength];
               if ((byteRead = randomAccessFile.read(bytes)) != -1) {
                   fileUploadFile.setEndPos(byteRead);
                   fileUploadFile.setBytes(bytes);
                   ctx.writeAndFlush(fileUploadFile);//        
               } else {
               }
               LOGGER.info("channelActive()       " + byteRead);
           } catch (FileNotFoundException e) {
               e.printStackTrace();
           } catch (IOException i) {
               i.printStackTrace();
           }
           LOGGER.info("channelActive()      ");
       }
       @Override
       public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
           if (msg instanceof Integer) {
               start = (Integer) msg;
               if (start != -1) {
                   randomAccessFile = new RandomAccessFile(fileUploadFile.getFile(), "r");
                   randomAccessFile.seek(start); //      start
                   LOGGER.info("  :" + (randomAccessFile.length() - start));
                   int a = (int) (randomAccessFile.length() - start);
                   int b = (int) (randomAccessFile.length() / 1024 * 2);
                   if (a < lastLength) {
                       lastLength = a;
                   }
                   LOGGER.info("    :" + (randomAccessFile.length()) + ",start:" + start + ",a:" + a + ",b:" + b + ",lastLength:" + lastLength);
                   byte[] bytes = new byte[lastLength];
                   LOGGER.info("bytes    ="+bytes.length);
                   if ((byteRead = randomAccessFile.read(bytes)) != -1 && (randomAccessFile.length() - start) > 0) {
                       LOGGER.info("byteRead = "  + byteRead);
                       fileUploadFile.setEndPos(byteRead);
                       fileUploadFile.setBytes(bytes);
                       fileUploadFile.setFile_length(randomAccessFile.length());
                       try {
                           ctx.writeAndFlush(fileUploadFile);
                       } catch (Exception e) {
                           e.printStackTrace();
                       }
                   } else {
                       randomAccessFile.close();
                       LOGGER.info("      channelRead()--------" + byteRead);
                   }
               }
           }else if (msg instanceof String){
               ctx.writeAndFlush("testtttttttt");
               Thread.sleep(1000);
               ctx.close();
           }
       }
       //    
       public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
           cause.printStackTrace();
           ctx.close();
       }
    }
    
    
  • サービス側メインコード
  • package com.haoxy.netty.server.file.handler;
    
    import com.haoxy.common.model.FileUploadFile;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.io.File;
    import java.io.RandomAccessFile;
    
    /**
     * Created by haoxy on 2018/11/15.
     * E-mail:[email protected]
     * github:https://github.com/haoxiaoyong1014
     */
    public class FileUploadServerHandler extends ChannelInboundHandlerAdapter {
        private int byteRead;
        private volatile int start = 0;
        private String file_dir = "E:/tmp";
        private final static Logger LOGGER = LoggerFactory.getLogger(FileUploadServerHandler.class);
    
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            // TODO Auto-generated method stub
            super.channelActive(ctx);
            LOGGER.info("   :channelActive()");
        }
    
        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
            // TODO Auto-generated method stub
            super.channelInactive(ctx);
            LOGGER.info("   :channelInactive()");
            LOGGER.info("           ");
            ctx.flush();
            ctx.close();
        }
    
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            LOGGER.info("          ,    ....");
            if (msg instanceof FileUploadFile) {
                FileUploadFile ef = (FileUploadFile) msg;
                byte[] bytes = ef.getBytes();
                byteRead = ef.getEndPos();
                String md5 = ef.getFile_md5();//   
                String path = file_dir + File.separator + md5;
                File file = new File(path);
                RandomAccessFile randomAccessFile = new RandomAccessFile(file, "rw");//r:      rw:    
                randomAccessFile.seek(start);//           ,
                randomAccessFile.write(bytes);//   seek(start)  ,             start     。        start       
                start = start + byteRead;
                if (byteRead > 0) {
                    ctx.writeAndFlush(start);//        
                    randomAccessFile.close();
    //                if (byteRead != 1024 * 10) {
    //               	Thread.sleep(1000);
    //                	channelInactive(ctx);
    //                }
                } else {
                    ctx.close();
                }
                if (byteRead != 1024 * 10) {
                    randomAccessFile = new RandomAccessFile(file, "r");//  
                    if (randomAccessFile.length() == ef.getFile_length()) {
                        LOGGER.info("      ");
                    } else {
                        LOGGER.info("          ");
                    }
                    ctx.writeAndFlush("zcj");
                    randomAccessFile.close();
                }
                LOGGER.info("    ,    :" + path + "," + byteRead);
            } else {
                LOGGER.info(msg.toString());
                LOGGER.info("    " + ctx.name() + "      ");
                channelInactive(ctx);
            }
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
            cause.printStackTrace();
            ctx.close();
            LOGGER.info("FileUploadServerHandler--exceptionCaught()");
        }
    
    }
    
    
  • pom依存
  • 
                io.netty
                netty-all
                4.1.21.Final
            
    
  • コードダウンロードリンク対応リソースパッケージダウンロード