Hadoop TDG 2 – Development Environment

GenericOptionsParser, Tool, and ToolRunner


Hadoop comes with a few helper classes for making it easier to run jobs from the command line. GenericOptionsParser is a class that interprets common Hadoop command-line options and sets them on a Configuration object for your application to use as desired. You don’t usually use GenericOptionsParser directly, as it’s more convenient to implement the Tool interface and run your application with the ToolRunner, which uses GenericOptionsParser internally.
 
Table 5-1. GenericOptionsParser and ToolRunner options

-D property=value: Sets the given Hadoop configuration property to the given value. Overrides any default or site properties in the configuration, and any properties set via the -conf option.
-conf filename ...: Adds the given files to the list of resources in the configuration. This is a convenient way to set site properties or to set a number of properties at once.
-fs uri: Sets the default filesystem to the given URI. Shortcut for -D fs.default.name=uri
-jt host:port: Sets the jobtracker to the given host and port. Shortcut for -D mapred.job.tracker=host:port
-files file1,file2,...: Copies the specified files from the local filesystem (or any filesystem if a scheme is specified) to the shared filesystem used by the jobtracker (usually HDFS) and makes them available to MapReduce programs in the task’s working directory. (See “Distributed Cache” on page 288 for more on the distributed cache mechanism for copying files to tasktracker machines.)
-archives archive1,archive2,...: Copies the specified archives from the local filesystem (or any filesystem if a scheme is specified) to the shared filesystem used by the jobtracker (usually HDFS), unarchives them, and makes them available to MapReduce programs in the task’s working directory.
-libjars jar1,jar2,...: Copies the specified JAR files from the local filesystem (or any filesystem if a scheme is specified) to the shared filesystem used by the jobtracker (usually HDFS), and adds them to the MapReduce task’s classpath. This option is a useful way of shipping JAR files that a job is dependent on.
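To make the Tool and ToolRunner relationship concrete, here is a minimal sketch of a Tool run via ToolRunner, in the spirit of the book’s ConfigurationPrinter example (the two properties printed are just the Hadoop 1.x names from the table above):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

// ToolRunner runs GenericOptionsParser before calling run(), so getConf()
// already reflects any -D, -conf, -fs, and -jt options.
public class ConfigurationPrinter extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        System.out.println("fs.default.name = " + conf.get("fs.default.name"));
        System.out.println("mapred.job.tracker = " + conf.get("mapred.job.tracker"));
        return 0;
    }

    public static void main(String[] args) throws Exception {
        // ToolRunner strips the generic options and passes the rest to run().
        System.exit(ToolRunner.run(new ConfigurationPrinter(), args));
    }
}

Running it with, say, -fs file:/// or a -D override should show the corresponding values in the output.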
 

Writing a Unit Test


The map and reduce functions in MapReduce are easy to test in isolation, which is a consequence of their functional style. For known inputs, they produce known outputs. However, since outputs are written to a Context (or an OutputCollector in the old API), rather than simply being returned from the method call, the Context needs to be replaced with a mock so that its outputs can be verified. There are several Java mock object frameworks that can help build mocks; here we use Mockito, which is noted for its clean syntax, although any mock framework should work just as well. Beyond general-purpose unit testing packages, there is also the MRUnit project (http://incubator.apache.org/mrunit/), which aims to make unit testing MapReduce programs easier.

Mapper


The test for the mapper is shown in Example 5-4.
Example 5-4. Unit test for MaxTemperatureMapper
import static org.mockito.Mockito.*;
import java.io.IOException;
import org.apache.hadoop.io.*;
import org.junit.*;
public class MaxTemperatureMapperTest {
    @Test
    public void processesValidRecord() throws IOException, InterruptedException {
        MaxTemperatureMapper mapper = new MaxTemperatureMapper();
        Text value = new Text("0043011990999991950051518004+68750+023550FM-12+0382" +
                              // Year ^^^^
                              "99999V0203201N00261220001CN9999999N9-00111+99999999999");
                              // Temperature ^^^^^
        MaxTemperatureMapper.Context context =
            mock(MaxTemperatureMapper.Context.class);
        mapper.map(null, value, context);

        verify(context).write(new Text("1950"), new IntWritable(-11));
    }
}
The test creates a mock Context and then verifies that the mapper wrote the expected year and temperature to it.
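The mapper itself is not reproduced in these notes. A version consistent with the test above, parsing the NCDC record’s fixed-width fields (the offsets and quality-code filter follow the book’s example; treat this as a sketch), might look like:

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MaxTemperatureMapper
        extends Mapper<LongWritable, Text, Text, IntWritable> {

    private static final int MISSING = 9999;

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        String year = line.substring(15, 19);          // year field
        int airTemperature;
        if (line.charAt(87) == '+') {                  // parseInt doesn't like leading plus signs
            airTemperature = Integer.parseInt(line.substring(88, 92));
        } else {
            airTemperature = Integer.parseInt(line.substring(87, 92));
        }
        String quality = line.substring(92, 93);
        if (airTemperature != MISSING && quality.matches("[01459]")) {
            context.write(new Text(year), new IntWritable(airTemperature));
        }
    }
}

For the test record above, the year field is "1950" and the temperature field parses to -11 (tenths of a degree Celsius), which is exactly what the mock Context is expected to receive.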

 

Running Locally on Test Data


Now that we’ve got the mapper and reducer working on controlled inputs, the next step is to write a job driver and run it on some test data on a development machine.
Using the -fs and -jt options provided by GenericOptionsParser, we can run the driver with the local filesystem and the local job runner:
% hadoop v2.MaxTemperatureDriver -fs file:/// -jt local input/ncdc/micro output
This command executes MaxTemperatureDriver using input from the local input/ncdc/micro directory, producing output in the local output directory. Note that although we’ve set -fs so we use the local filesystem (file:///), the local job runner will actually work fine against any filesystem, including HDFS (and it can be handy to do this if you have a few files that are on HDFS).
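The driver class is not listed in these notes either. A hedged sketch of a Tool-based MaxTemperatureDriver consistent with the command above (it assumes the MaxTemperatureMapper and a MaxTemperatureReducer from this chapter) could look like:

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class MaxTemperatureDriver extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.printf("Usage: %s [generic options] <input> <output>\n",
                    getClass().getSimpleName());
            ToolRunner.printGenericCommandUsage(System.err);
            return -1;
        }

        // getConf() already reflects the -fs and -jt (or -conf) generic options.
        Job job = new Job(getConf(), "Max temperature");
        job.setJarByClass(getClass());

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setMapperClass(MaxTemperatureMapper.class);
        job.setCombinerClass(MaxTemperatureReducer.class);
        job.setReducerClass(MaxTemperatureReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new MaxTemperatureDriver(), args));
    }
}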
 

Running on a Cluster


Now that we are happy with the program running on a small test dataset, we are ready to try it on the full dataset on a Hadoop cluster.
 
Packaging
We don’t need to make any modifications to the program to run on a cluster rather than on a single machine, but we do need to package the program as a JAR file to send to the cluster.
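For example, if the compiled classes are in a classes/ directory (a typical layout; adjust paths to your own build), the JAR can be created with the standard jar tool:
% jar cvf hadoop-examples.jar -C classes/ .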
 
Launching a Job
To launch the job, we need to run the driver, specifying the cluster that we want to run the job on with the -conf option (we could equally have used the -fs and -jt options):
% hadoop jar hadoop-examples.jar v3.MaxTemperatureDriver -conf conf/hadoop-cluster.xml input/ncdc/all max-temp
Job, Task, and Task Attempt IDs
The format of a job ID is composed of the time that the jobtracker (not the job) started and an incrementing counter maintained by the jobtracker to uniquely identify the job to that instance of the jobtracker. So the job with this ID:
job_200904110811_0002 is the second (0002, job IDs are 1-based) job run by the jobtracker which started at 08:11 on April 11, 2009.
Tasks belong to a job, and their IDs are formed by replacing the job prefix of a job ID with a task prefix, and adding a suffix to identify the task within the job. For example:
task_200904110811_0002_m_000003 is the fourth (000003, task IDs are 0-based) map (m) task of the job with ID job_200904110811_0002.
 
Tasks may be executed more than once, due to failure (see “Task Failure” on page 200) or speculative execution (see “Speculative Execution” on page 213), so to identify different instances of a task execution, task attempts are given unique IDs on the jobtracker. For example:
attempt_200904110811_0002_m_000003_0 is the first (0, attempt IDs are 0-based) attempt at running task task_200904110811_0002_m_000003.
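These IDs can also be taken apart programmatically with Hadoop’s ID classes; a small, purely illustrative sketch parsing the example attempt ID back into its components:

import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskID;

public class IdFormats {
    public static void main(String[] args) {
        // Parse the example attempt ID from the text above.
        TaskAttemptID attempt =
            TaskAttemptID.forName("attempt_200904110811_0002_m_000003_0");

        TaskID task = attempt.getTaskID();   // task_200904110811_0002_m_000003
        JobID job = task.getJobID();         // job_200904110811_0002

        System.out.println("jobtracker start: " + job.getJtIdentifier()); // 200904110811
        System.out.println("job number:       " + job.getId());           // 2
        System.out.println("task number:      " + task.getId());          // 3
        System.out.println("attempt number:   " + attempt.getId());       // 0
    }
}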
 

Tuning a Job


After a job is working, the question many developers ask is, “Can I make it run faster?” There are a few Hadoop-specific “usual suspects” that are worth checking to see if they are responsible for a performance problem. You should run through the checklist in Table 5-3 before you start trying to profile or optimize at the task level.
 
Number of mappers: How long are your mappers running for? If they are only running for a few seconds on average, you should see if there’s a way to have fewer mappers and make them all run longer, a minute or so as a rule of thumb. The extent to which this is possible depends on the input format you are using. Refer to “Small files and CombineFileInputFormat” on page 237.
Number of reducers: For maximum performance, the number of reducers should be slightly less than the number of reduce slots in the cluster. This allows the reducers to finish in one wave and fully utilizes the cluster during the reduce phase. Refer to “Choosing the Number of Reducers” on page 229.
Combiners: Can your job take advantage of a combiner to reduce the amount of data passing through the shuffle? Refer to “Combiner Functions” on page 34.
Intermediate compression: Job execution time can almost always benefit from enabling map output compression. Refer to “Compressing map output” on page 94.
Custom serialization: If you are using your own custom Writable objects or custom comparators, make sure you have implemented RawComparator. Refer to “Implementing a RawComparator for speed” on page 108.
Shuffle tweaks: The MapReduce shuffle exposes around a dozen tuning parameters for memory management, which may help you eke out the last bit of performance. Refer to “Configuration Tuning” on page 209.
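Several of these checklist items can be applied per-run with the generic -D options from Table 5-1, without touching the driver code. A hedged example using the Hadoop 1.x property names (the combiner itself is set in the driver with job.setCombinerClass(), as in the driver sketch earlier):
% hadoop jar hadoop-examples.jar v3.MaxTemperatureDriver -D mapred.compress.map.output=true -D mapred.reduce.tasks=30 -conf conf/hadoop-cluster.xml input/ncdc/all max-temp
Here 30 is only a placeholder; pick a value slightly below the total number of reduce slots in your cluster.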
 

Apache Oozie


If you need to run a complex workflow, or one on a tight production schedule, or you have a large number of connected workflows with data dependencies between them, then a more sophisticated approach is required. Apache Oozie (http://incubator.apache.org/oozie/) fits the bill in any or all of these cases. It has been designed to manage the execution of thousands of dependent workflows, each composed of possibly thousands of constituent actions at the level of an individual MapReduce job.