【Hadoop】よく使われるHDFS操作に詳しい(Java実装)


一、実験目的
1.HDFSのHadoopアーキテクチャにおける役割を理解する.
2.HDFS操作でよく使われるShellコマンドを使いこなす.
3.HDFS操作でよく使われるJava APIに詳しい.
二、実験プラットフォーム
OS:Linux(deepin)
Hadoopバージョン:2.7.17
JDKバージョン:1.8
Java IDE:Eclipse
三、実験内容
1.分散ファイルシステムHDFSホームでjavaプログラミングを用いてそれぞれ実現する:
  • ファイルを表示します.txtが存在するかどうか、存在しない場合は作成します.(HdfsExpTask1)
  • hadoop.txt書き込み内容.(HdfsExpTask2)
  • hadoopを読み出す.txtファイルの内容.(HdfsExpTask3)
  • import java.io.BufferedReader;
    import java.io.FileInputStream;
    import java.io.IOException;
    import java.io.InputStreamReader;
    import java.net.URI;
    import java.net.URISyntaxException;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    
    
    public class HdfsExpTask1 {
    
    	public static void main(String[] args) {
    		try {
    			Path filePath = new Path("hadoop.txt");
    			Configuration config = new Configuration();
    			config.set("fs.defaultFS", "hdfs://localhost:9000/");
    			FileSystem fs = FileSystem.get(config);
    			if(fs.exists(filePath)) {
    				System.out.println(filePath+" exists.");
    			} else {
    				FSDataOutputStream ops = fs.create(filePath);
    				ops.close();
    				fs.close();
    			}
    		} catch(Exception e) {
    			e.printStackTrace();
    		}
    
    	}
    
    }
    
    import java.io.BufferedReader;
    import java.io.FileInputStream;
    import java.io.IOException;
    import java.io.InputStreamReader;
    import java.net.URI;
    import java.net.URISyntaxException;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    
    public class HdfsExpTask2 {
    
    	public static void main(String[] args) {
    		try {
    			Path filePath = new Path("hadoop.txt");
    			Configuration config = new Configuration();
    			config.set("fs.defaultFs", "hdfs://localhost:9000/");
    			FileSystem fs = FileSystem.get(config);
    			String content = "Task2: WRITE(201701060413)";
    			byte[] buff = content.getBytes();
    			FSDataOutputStream ops = fs.create(filePath);
    			ops.write(buff, 0, buff.length);
    			ops.close();
    			fs.close();
    		}catch(Exception e) {
    			e.printStackTrace();
    		}
    	}
    
    }
    
    import java.io.BufferedReader;
    
    public class HdfsExpTask3 {
    
    	public static void main(String[] args) {
    		try {
    			Path filePath = new Path("hadoop.txt");
    			Configuration config = new Configuration();
    			config.set("fs.defaultFS", "hdfs://localhost:9000/");
    			FileSystem fs = FileSystem.get(config);
    			
    			FSDataInputStream ips = fs.open(filePath);
    			BufferedReader reader = new BufferedReader(new InputStreamReader(ips));
    			String content = reader.readLine();
    			while(content != null) {
    				System.out.println(content);
    				content = reader.readLine();
    			}
    			reader.close();
    			ips.close();
    			fs.close();
    		}catch(Exception e) {
    			e.printStackTrace();
    		}
    
    	}
    
    }
    

     
     
    2.Javaプログラミングは以下の指定機能を実現する.
  • HDFSで指定されたファイルの内容を端末に出力する.
  • はHDFSのいずれかのディレクトリを与え、そのディレクトリの下のすべてのファイルの読み書き権限、サイズ、作成時間、パスなどの情報を出力し、そのファイルがディレクトリであれば、そのディレクトリの下のすべてのファイルに関する情報を再帰的に出力する.
  • は、HDFS内のファイルのパスを提供し、そのファイルを作成および削除する.ファイルが存在するディレクトリが存在しない場合は、自動的にディレクトリが作成されます.
  • HDFSで指定されたファイルにコンテンツを追加し、ユーザが指定したコンテンツを既存のファイルの先頭または末尾に追加する.
  • HDFSで指定されたファイルを削除します.
  • HDFSで指定されたディレクトリを削除し、ユーザーがディレクトリにファイルがある場合にディレクトリを削除するかどうかを指定する.
  • HDFSでは、ファイルをソースパスから宛先パスに移動します.
  • import java.io.BufferedInputStream;
    import java.io.BufferedReader;
    import java.io.FileInputStream;
    import java.io.IOException;
    import java.io.InputStreamReader;
    import java.net.URI;
    import java.net.URISyntaxException;
    import java.util.Scanner;
    
    import org.apache.commons.compress.utils.IOUtils;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.LocatedFileStatus;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.RemoteIterator;
    
    public class HdfsExpTask4 {
    	/*
    	 *2.1  HDFS               
    	 */
    	public static void cat(Configuration conf, String path) throws IOException{
    		FileSystem fs = FileSystem.get(conf);
    		Path filePath = new Path(path);
    		if(fs.exists(filePath)) {
    			FSDataInputStream in = fs.open(filePath);
    			BufferedReader br = new BufferedReader(new InputStreamReader(in));
    			String content = null;
    			while((content = br.readLine()) != null) {
    				System.out.println(content);
    			}
    			br.close();
    			fs.close();
    		}else {
    			System.out.println("file "+filePath+ "doesn't exist.");
    		}
    		fs.close();
    	}
    	
    	/*
    	 *2.2   HDFS      ,            
    	 *    、  、    、     ,        ,
    	 *                 ;
    	 */
    	//             
    	public static void printFileInformation(Configuration conf, String file)throws IOException {
    		FileSystem fs = FileSystem.get(conf);
    		FileStatus[] status = fs.listStatus(new Path(file));
    		for (FileStatus s:status) {
    			System.out.println("    :"+s.getPermission()+";     :"+s.getBlockSize()+";     :"
    					+s.getPath()+";       :"+s.getModificationTime());
    		}
    		fs.close();
    	}
    	//             
    	public static void printFileInfo(Configuration conf, String filePath) throws IOException {
    		FileSystem fs = FileSystem.get(conf);
    		Path path= new Path(filePath);
    		RemoteIterator iterator = fs.listFiles(path, true);
    		while(iterator.hasNext()) {
    			FileStatus s = iterator.next();
    			System.out.println("    :"+s.getPermission()+";     :"+s.getBlockSize()+";     :"
    					+s.getPath()+";       :"+s.getModificationTime());
    		}
    		fs.close();
    	}
    	/*
    	 * 2.3     HDFS       ,             。
    	 *                 ,       ;
    	 */
    	//     
    	public static void createFile(Configuration conf, String filePath) throws IOException {
    		FileSystem fs = FileSystem.get(conf);
    		Path path= new Path(filePath);
    		FSDataOutputStream ops = fs.create(path);
    		System.out.println("      :"+filePath);
    		ops.close();
    		fs.close();
    	}
    	// 2.5     
    	public static void deleteFile(Configuration conf, String filePath) throws IOException {
    		FileSystem fs = FileSystem.get(conf);
    		Path path= new Path(filePath);
    		if(fs.deleteOnExit(path)) {
    			System.out.println("      :"+filePath);
    		} else {
    			System.out.println("      :"+filePath);
    		}
    		fs.close();
    	}
    	/*
    	 * 2.4  HDFS          ,                    ;
    	 */
    	//          
    	public static void addContentToTail(Configuration conf, String filePath, String content, boolean head) throws IOException {
    		FileSystem fs = FileSystem.get(conf);
    		Path path= new Path(filePath);
    		FSDataOutputStream ops = fs.append(path);
    		ops.write(content.getBytes());
    		if (!head) {
    			System.out.println("        。");	
    		}
    		ops.close();
    		fs.close();
    	}
    	//        
    	public static void moveToLocalFile(Configuration conf, String remoteFilePath, String localFilePath) throws IOException, InterruptedException, URISyntaxException {
    		FileSystem fs = FileSystem.get(conf);			
    		Path remotePath = new Path(remoteFilePath); 
    		Path localPath = new Path(localFilePath); 
    		fs.moveToLocalFile(remotePath, localPath);
    	}
    	//                
    	public static void addFileToTail(Configuration conf, String localFilePath, String remoteFilePath) throws IOException {
    		FileSystem fs = FileSystem.get(conf);
    		Path remotePath = new Path(remoteFilePath);
    		//         (       )   
    		FileInputStream inps = new FileInputStream(localFilePath);
    		//          ,            
    		FSDataOutputStream ops = fs.append(remotePath);
    		byte[] buffer = new byte[1024];
    		int read = -1;
    		while((read = inps.read(buffer)) > 0) {
    			ops.write(buffer, 0, read);
    		}
    		ops.close();
    		inps.close();
    		fs.close();
    	}
    	//          
    	public static void addContentToHead(Configuration conf, String filePath, String content) throws IOException, InterruptedException, URISyntaxException {
    		//           
    		String localTmpPath = "tmp.txt";
    		//             
    		moveToLocalFile(conf, filePath, localTmpPath);
    		//       HDFS  (  )
    		createFile(conf, filePath);
    		addContentToTail(conf, filePath, content, true);
    		addFileToTail(conf, localTmpPath, filePath);
    		System.out.println("        。");
    	}
    	/*
    	 * 2.6   HDFS      ,                     ;
    	 */
    	public static void deleteDirectory(Configuration conf, String filePath) throws IOException {
    		FileSystem fs = FileSystem.get(conf);
    		Path path = new Path(filePath);
    		FileStatus[] status = fs.listStatus(path);
    		if(status.length > 0) {
    			System.out.println("        ,      Y/N");
    			Scanner cin = new Scanner(System.in);
    			String flag = cin.next();
    			if(flag.equals("Y")) {
    				fs.delete(path,true);
    				System.out.println("     .");
    			}else {
    				System.out.println("      .");
    			}
    		}else {
    			fs.delete(path,true);
    			System.out.println("     .");
    		}
    		fs.close();
    	}
    	
    	/*
    	 * 2.7  HDFS ,              。
    	 */
    	public static void moveFile(Configuration conf, String srcPath, String dirPath) throws IOException {
    		FileSystem fs = FileSystem.get(conf);
    		if(fs.exists(new Path(dirPath))) {
    			System.out.println("     。");
    			return;
    		}
    		if(fs.rename(new Path(srcPath), new Path(dirPath))) {
    			System.out.println("      。");
    		} else {
    			System.out.println("      。");
    		}
    	}
    	
    	public static void main(String[] args) throws IOException, InterruptedException, URISyntaxException {
    		Scanner cin = new Scanner(System.in);
    		Configuration conf = new Configuration();
    		conf.set("fs.defaultFS", "hdfs://localhost:9000");
    		conf.set("dfs.client.block.write.replace-datanode-on-failure.enable","true");
    		conf.set("dfs.client.block.write.replace-datanode-on-failure.policy", "NEVER");
    		conf.set("dfs.replication","1");
    		//     
    		String filePath = "hadoop.txt";
    		//   hdfs  
    		HdfsExpTask4.createFile(conf, filePath);
    		//            
    		HdfsExpTask4.addContentToTail(conf, filePath, "!!!hello!!!", false);
    		HdfsExpTask4.cat(conf, filePath);
    		//            
    		HdfsExpTask4.addContentToHead(conf, filePath, "hello hadoop!");
    		//          
    		HdfsExpTask4.cat(conf, filePath);
    		//           
    		HdfsExpTask4.printFileInformation(conf, filePath);
    		//       
    		HdfsExpTask4.moveFile(conf, filePath, "newHadoop.txt");
    		//     
    		HdfsExpTask4.deleteFile(conf, "newHadoop.txt");
    		//      
    		String dirPath = "test/hadoop/aaa.txt";
    		HdfsExpTask4.createFile(conf, dirPath);
    		HdfsExpTask4.printFileInfo(conf, "test");
    		//HdfsExpTask4.deleteDirectory(conf, "test");
    		System.out.println("end.");
    	}
    }