Java apiを呼び出してHDFSにアクセス
に頼る
ファイルのアップロード
ファイルの作成と書き込み
ファイル/ディレクトリの削除
ファイルの読み込み
ディレクトリの作成
ディレクトリ内のすべてのファイルを読み込む
ローカルファイルの内容をhdfsファイルにコピーする
<dependency>
<groupId> org.apache.hadoop</groupId>
<artifactId>hadoop-client </artifactId>
<version>2.6.0</version>
</dependency>
ファイルのアップロード
/**
* hdfs
*
* @param s
* @param d
* @throws IOException
* @see [ 、 # 、 # ]
*/
public static void uploadLocalFile2HDFS(String s, String d)
throws IOException
{
Configuration config = new Configuration();
Path dstDir = new Path(d);
FileSystem hdfs = dstDir.getFileSystem(config);
Path src = new Path(s);
hdfs.copyFromLocalFile(src, dstDir);
hdfs.close();
}
ファイルの作成と書き込み
/**
*
* < >
* @param toCreateFilePath 【 】
* @param content
* @param hdfsAddress HDFS
* @throws IOException
* @see [ 、 # 、 # ]
*/
public static void createNewHDFSFile(String toCreateFilePath, String content,String hdfsAddress) throws IOException
{
Configuration config = new Configuration();
Path dstDir = new Path(hdfsAddress);
FileSystem hdfs = dstDir.getFileSystem(config);
FSDataOutputStream os = hdfs.create(new Path(toCreateFilePath));
os.write(content.getBytes("UTF-8"));
os.close();
hdfs.close();
}
ファイル/ディレクトリの削除
/**
* /
* < >
* @param dst
* @param hdfsAddress
* @return
* @throws IOException
* @see [ 、 # 、 # ]
*/
public static boolean deleteHDFSFile(String dst,String hdfsAddress) throws IOException
{
Configuration config = new Configuration();
Path dstDir = new Path(hdfsAddress);
FileSystem hdfs = dstDir.getFileSystem(config);
Path dstPath = new Path(dst);
boolean isDeleted = hdfs.deleteOnExit(dstPath);
hdfs.close();
return isDeleted;
}
ファイルの読み込み
/**
*
* < >
* @param dst
* @param hdfsAddress
* @return
* @throws Exception
* @see [ 、 # 、 # ]
*/
public static byte[] readHDFSFile(String dst,String hdfsAddress) throws Exception
{
Configuration config = new Configuration();
Path dstDir = new Path(hdfsAddress);
FileSystem fs = dstDir.getFileSystem(config);
// check if the file exists
Path path = new Path(dst);
if ( fs.exists(path) )
{
FSDataInputStream is = fs.open(path);
// get the file info to create the buffer
FileStatus stat = fs.getFileStatus(path);
// create the buffer
byte[] buffer = new byte[Integer.parseInt(String.valueOf(stat.getLen()))];
is.readFully(0, buffer);
is.close();
fs.close();
return buffer;
}
else
{
throw new Exception("the file is not found .");
}
}
ディレクトリの作成
/**
*
*
* @param dir
* @param hdfsAddress
* @throws IOException
* @see [ 、 # 、 # ]
*/
public static void mkdir(String dir,String hdfsAddress) throws IOException
{
Configuration config = new Configuration();
Path dstDir = new Path(hdfsAddress);
FileSystem fs = dstDir.getFileSystem(config);
fs.mkdirs(new Path(dir));
fs.close();
}
ディレクトリ内のすべてのファイルを読み込む
/**
*
* < >
* @param dir
* @param hdfsAddress
* @throws IOException
* @see [ 、 # 、 # ]
*/
public static void listAll(String dir,String hdfsAddress) throws IOException
{
Configuration config = new Configuration();
Path dstDir = new Path(hdfsAddress);
FileSystem fs = dstDir.getFileSystem(config);
FileStatus[] stats = fs.listStatus(new Path(dir));
for(int i = 0; i < stats.length; ++i)
{
if (stats[i].isFile())
{
// regular file
System.out.println(stats[i].getPath().toString());
}
else if (stats[i].isDirectory())
{
// dir
System.out.println(stats[i].getPath().toString());
}
else if(stats[i].isSymlink())
{
// is s symlink in linux
System.out.println(stats[i].getPath().toString());
}
}
fs.close();
}
ローカルファイルの内容をhdfsファイルにコピーする
/**
* copy hdfs
* < >
* @param localFilePath
* @param hdfsPath hdfs
* @throws FileNotFoundException
* @throws IOException
* @see [ 、 # 、 # ]
*/
public static void FileCopyWhitProcess(String localFilePath, String hdfsPath) throws FileNotFoundException{
InputStream in = new BufferedInputStream(new FileInputStream(localFilePath));
Configuration config = new Configuration();
FileSystem fs;
FSDataOutputStream out;
try
{
fs = FileSystem.get(URI.create(hdfsPath), config);
out= fs.create(new Path(hdfsPath), new Progressable()
{
@Override
public void progress()
{
System.out.println(".");
}
});
IOUtils.copyBytes(in, out, 4096, true);
}
catch (IOException e)
{
// TODO Auto-generated catch block
e.printStackTrace();
}finally{
IOUtils.closeStream(in);
}
}