Java apiを呼び出してHDFSにアクセス


に頼る
<dependency>
			<groupId> org.apache.hadoop</groupId>
			<artifactId>hadoop-client </artifactId>
			<version>2.6.0</version>
</dependency>

ファイルのアップロード
/**
     *      hdfs  
     *            
     * @param s    
     * @param d     
     * @throws IOException
     * @see [ 、 #  、 #  ]
     */
    public static void uploadLocalFile2HDFS(String s, String d)
        throws IOException
    {
        
        Configuration config = new Configuration();
        Path dstDir = new Path(d); 
        FileSystem hdfs = dstDir.getFileSystem(config);
        
        Path src = new Path(s);
  
        
        hdfs.copyFromLocalFile(src, dstDir);
        
        hdfs.close();
    }

ファイルの作成と書き込み
/**
     *          
     * <      >
     * @param toCreateFilePath    【      】
     * @param content     
     * @param hdfsAddress    HDFS  
     * @throws IOException
     * @see [ 、 #  、 #  ]
     */
    public static void createNewHDFSFile(String toCreateFilePath, String content,String hdfsAddress) throws IOException
    {
        Configuration config = new Configuration();
        Path dstDir = new Path(hdfsAddress); 
        FileSystem hdfs = dstDir.getFileSystem(config);
        
        FSDataOutputStream os = hdfs.create(new Path(toCreateFilePath));

        os.write(content.getBytes("UTF-8"));
        
        os.close();
        
        hdfs.close();
    }

ファイル/ディレクトリの削除
/**
     *     /  
     * <      >
     * @param dst
     * @param hdfsAddress
     * @return
     * @throws IOException
     * @see [ 、 #  、 #  ]
     */
    public static boolean deleteHDFSFile(String dst,String hdfsAddress) throws IOException
    {
        Configuration config = new Configuration();
        Path dstDir = new Path(hdfsAddress); 
        FileSystem hdfs = dstDir.getFileSystem(config);
        
        Path dstPath = new Path(dst); 
        boolean isDeleted = hdfs.deleteOnExit(dstPath);
        
        hdfs.close();
        
        return isDeleted;
    }

ファイルの読み込み
/**
     *     
     * <      >
     * @param dst
     * @param hdfsAddress
     * @return
     * @throws Exception
     * @see [ 、 #  、 #  ]
     */
    public static byte[] readHDFSFile(String dst,String hdfsAddress) throws Exception
    {
        Configuration config = new Configuration();
        Path dstDir = new Path(hdfsAddress); 
        FileSystem fs = dstDir.getFileSystem(config);
        
        // check if the file exists
        Path path = new Path(dst);
        if ( fs.exists(path) )
        {
            FSDataInputStream is = fs.open(path);
            // get the file info to create the buffer
            FileStatus stat = fs.getFileStatus(path);
            
            // create the buffer
            byte[] buffer = new byte[Integer.parseInt(String.valueOf(stat.getLen()))];
            is.readFully(0, buffer);
            
            is.close();
            fs.close();
            
            return buffer;
        }
        else
        {
            throw new Exception("the file is not found .");
        }
    }

ディレクトリの作成
/**
     *        
     *           
     * @param dir
     * @param hdfsAddress
     * @throws IOException
     * @see [ 、 #  、 #  ]
     */
    public static void mkdir(String dir,String hdfsAddress) throws IOException
    {
        Configuration config = new Configuration();
        Path dstDir = new Path(hdfsAddress); 
        FileSystem fs = dstDir.getFileSystem(config);
        
        fs.mkdirs(new Path(dir));
        
        fs.close();
    }

ディレクトリ内のすべてのファイルを読み込む
/**
     *             
     * <      >
     * @param dir
     * @param hdfsAddress
     * @throws IOException
     * @see [ 、 #  、 #  ]
     */
    public static void listAll(String dir,String hdfsAddress) throws IOException
    {
        Configuration config = new Configuration();
        Path dstDir = new Path(hdfsAddress); 
        FileSystem fs = dstDir.getFileSystem(config);
        
        FileStatus[] stats = fs.listStatus(new Path(dir));
        
        for(int i = 0; i < stats.length; ++i)
        {
            if (stats[i].isFile())
            {
                // regular file
                System.out.println(stats[i].getPath().toString());
            }
            else if (stats[i].isDirectory())
            {
                // dir
                System.out.println(stats[i].getPath().toString());
            }
            else if(stats[i].isSymlink())
            {
                // is s symlink in linux
                System.out.println(stats[i].getPath().toString());
            }
                 
        }
        fs.close();
    }

ローカルファイルの内容をhdfsファイルにコピーする
/**
     *        copy hdfs   
     * <      >
     * @param localFilePath       
     * @param hdfsPath hdfs    
     * @throws FileNotFoundException 
     * @throws IOException
     * @see [ 、 #  、 #  ]
     */
    public static void FileCopyWhitProcess(String localFilePath, String hdfsPath) throws FileNotFoundException{
        InputStream in = new BufferedInputStream(new FileInputStream(localFilePath));
        Configuration config = new Configuration();
        FileSystem fs;
        FSDataOutputStream out;
        try
        {
           
            fs = FileSystem.get(URI.create(hdfsPath), config);
            out= fs.create(new Path(hdfsPath), new Progressable()
            {
                
                @Override
                public void progress()
                {
                    System.out.println(".");
                    
                }
            });
            IOUtils.copyBytes(in, out, 4096, true);
        }
        catch (IOException e)
        {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }finally{
            IOUtils.closeStream(in);
        }
        
    }