Flink 1.10: Cannot find compatible factory for specified execution.target (=local)


While testing a local demo on Flink 1.10 I ran into a problem. The code is a very simple Word Count:
package flink.io
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._

object flink_demo {
  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.createLocalEnvironment(2)
    val text = env.fromElements(
      "Who's there?",
      "I think I hear them. Stand, ho! Who's there?")

    val counts = text.flatMap {
      _.toLowerCase.split("\\W+").filter {
        _.nonEmpty
      }
    }.map {
      (_, 1)    

    }
      .groupBy(0)
      .sum(1)

    counts.print()
  }

}


The Maven dependencies are as follows:
<properties>
        <scala.version>2.11.12</scala.version>
    </properties>


    <dependencies>

        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-scala -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_2.11</artifactId>
            <version>1.10.0</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.11</artifactId>
            <version>1.10.0</version>
            <scope>provided</scope>
        </dependency>
  
    </dependencies>

The code itself is simple (it is an example from the official site), but running it produced an error:
Exception in thread "main" java.lang.NullPointerException: Cannot find compatible factory for specified execution.target (=local)
	at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:104)
	at org.apache.flink.api.java.ExecutionEnvironment.executeAsync(ExecutionEnvironment.java:937)
	at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:860)
	at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:844)
	at org.apache.flink.api.java.DataSet.collect(DataSet.java:413)
	at org.apache.flink.api.java.DataSet.print(DataSet.java:1652)
	at org.apache.flink.api.scala.DataSet.print(DataSet.scala:1864)
	at flink.io.flink_demo$.main(flink_demo.scala:28)
	at flink.io.flink_demo.main(flink_demo.scala)


The troubleshooting went as follows:
  • The error message says that no compatible factory can be found for the execution.target parameter.
  • The line reported in the stack trace is the DataSet.print() call, so let's look at the implementation of print():
  • /**
       * Prints the elements in a DataSet to the standard output stream [[System.out]] of the
       * JVM that calls the print() method. For programs that are executed in a cluster, this
       * method needs to gather the contents of the DataSet back to the client, to print it
       * there.
       *
       * The string written for each element is defined by the [[AnyRef.toString]] method.
       *
       * This method immediately triggers the program execution, similar to the
       * [[collect()]] and [[count()]] methods.
       */
     def print(): Unit = {
        javaSet.print()
      }
    

    This delegates to the Java DataSet.print():
     * Prints the elements in a DataSet to the standard output stream {@link System#out} of the JVM that calls
    	 * the print() method. For programs that are executed in a cluster, this method needs
    	 * to gather the contents of the DataSet back to the client, to print it there.
    	 *
    	 * <p>The string written for each element is defined by the {@link Object#toString()} method.
    	 *
    	 * <p>This method immediately triggers the program execution, similar to the
    	 * {@link #collect()} and {@link #count()} methods.
    	 *
    	 * @see #printToErr()
    	 * @see #printOnTaskManager(String)
    	 */
    	public void print() throws Exception {
    		List<T> elements = collect();
    		for (T e: elements) {
    			System.out.println(e);
    		}
    	}
    

    So print() actually calls collect() and prints the returned elements. Now let's look at the implementation of collect():
    /**
    	 * Convenience method to get the elements of a DataSet as a List.
    	 * As DataSet can contain a lot of data, this method should be used with caution.
    	 *
    	 * @return A List containing the elements of the DataSet
    	 */
    	public List<T> collect() throws Exception {
    		final String id = new AbstractID().toString();
    		final TypeSerializer<T> serializer = getType().createSerializer(getExecutionEnvironment().getConfig());
    
    		this.output(new Utils.CollectHelper<>(id, serializer)).name("collect()");
    		JobExecutionResult res = getExecutionEnvironment().execute();
    
    		ArrayList<byte[]> accResult = res.getAccumulatorResult(id);
    		if (accResult != null) {
    			try {
    				return SerializedListAccumulator.deserializeList(accResult, serializer);
    			} catch (ClassNotFoundException e) {
    				throw new RuntimeException("Cannot find type class of collected data type.", e);
    			} catch (IOException e) {
    				throw new RuntimeException("Serialization error while deserializing collected data", e);
    			}
    		} else {
    			throw new RuntimeException("The call to collect() could not retrieve the DataSet.");
    		}
    	}
    

    Note the line `JobExecutionResult res = getExecutionEnvironment().execute();`. Here is the implementation of execute():
    /**
    	 * Triggers the program execution. The environment will execute all parts of the program that have
    	 * resulted in a "sink" operation. Sink operations are for example printing results ({@link DataSet#print()},
    	 * writing results (e.g. {@link DataSet#writeAsText(String)},
    	 * {@link DataSet#write(org.apache.flink.api.common.io.FileOutputFormat, String)}, or other generic
    	 * data sinks created with {@link DataSet#output(org.apache.flink.api.common.io.OutputFormat)}.
    	 *
    	 * <p>The program execution will be logged and displayed with a generated default name.
    	 *
    	 * @return The result of the job execution, containing elapsed time and accumulators.
    	 * @throws Exception Thrown, if the program executions fails.
    	 */
    	public JobExecutionResult execute() throws Exception {
    		return execute(getDefaultName());
    	}

    Continuing into execute(String jobName):
    /**
    	 * Triggers the program execution. The environment will execute all parts of the program that have
    	 * resulted in a "sink" operation. Sink operations are for example printing results ({@link DataSet#print()},
    	 * writing results (e.g. {@link DataSet#writeAsText(String)},
    	 * {@link DataSet#write(org.apache.flink.api.common.io.FileOutputFormat, String)}, or other generic
    	 * data sinks created with {@link DataSet#output(org.apache.flink.api.common.io.OutputFormat)}.
    	 *
    	 * <p>The program execution will be logged and displayed with the given job name.
    	 *
    	 * @return The result of the job execution, containing elapsed time and accumulators.
    	 * @throws Exception Thrown, if the program executions fails.
    	 */
    	public JobExecutionResult execute(String jobName) throws Exception {
    		final JobClient jobClient = executeAsync(jobName);

    		try {
    			if (configuration.getBoolean(DeploymentOptions.ATTACHED)) {
    				lastJobExecutionResult = jobClient.getJobExecutionResult(userClassloader).get();
    			} else {
    				lastJobExecutionResult = new DetachedJobExecutionResult(jobClient.getJobID());
    			}

    			jobListeners.forEach(
    					jobListener -> jobListener.onJobExecuted(lastJobExecutionResult, null));

    		} catch (Throwable t) {
    			jobListeners.forEach(jobListener -> {
    				jobListener.onJobExecuted(null, ExceptionUtils.stripExecutionException(t));
    			});
    			ExceptionUtils.rethrowException(t);
    		}

    		return lastJobExecutionResult;
    	}

    The first thing execute() does is call executeAsync():
    /**
    	 * Triggers the program execution asynchronously. The environment will execute all parts of the program that have
    	 * resulted in a "sink" operation. Sink operations are for example printing results ({@link DataSet#print()},
    	 * writing results (e.g. {@link DataSet#writeAsText(String)},
    	 * {@link DataSet#write(org.apache.flink.api.common.io.FileOutputFormat, String)}, or other generic
    	 * data sinks created with {@link DataSet#output(org.apache.flink.api.common.io.OutputFormat)}.
    	 *
    	 * <p>The program execution will be logged and displayed with the given job name.
    	 *
    	 * @return A {@link JobClient} that can be used to communicate with the submitted job,
    	 *         completed on submission succeeded.
    	 * @throws Exception Thrown, if the program submission fails.
    	 */
    	@PublicEvolving
    	public JobClient executeAsync(String jobName) throws Exception {
    		checkNotNull(configuration.get(DeploymentOptions.TARGET), "No execution.target specified in your configuration file.");

    		final Plan plan = createProgramPlan(jobName);

    		final PipelineExecutorFactory executorFactory =
    			executorServiceLoader.getExecutorFactory(configuration);

    		checkNotNull(
    			executorFactory,
    			"Cannot find compatible factory for specified execution.target (=%s)",
    			configuration.get(DeploymentOptions.TARGET));

    		CompletableFuture<JobClient> jobClientFuture = executorFactory
    			.getExecutor(configuration)
    			.execute(plan, configuration);

    		try {
    			JobClient jobClient = jobClientFuture.get();
    			jobListeners.forEach(jobListener -> jobListener.onJobSubmitted(jobClient, null));
    			return jobClient;
    		} catch (Throwable t) {
    			jobListeners.forEach(jobListener -> jobListener.onJobSubmitted(null, t));
    			ExceptionUtils.rethrow(t);

    			// make javac happy, this code path will not be reached
    			return null;
    		}
    	}

    Here is the source of the error. The code above lives in org.apache.flink.api.java.ExecutionEnvironment.java, and in our case the second checkNotNull fails: executorFactory is null. executorFactory is obtained via executorServiceLoader.getExecutorFactory(configuration), where configuration is a field declared on the class; the lookup returned null because no PipelineExecutorFactory compatible with the configured execution.target (=local) was found on the classpath. With only the dependencies listed above, the org.apache.flink.api.scala.ExecutionEnvironment our code uses cannot resolve one.
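    To see why the lookup can return null at all: Flink 1.10 discovers PipelineExecutorFactory implementations from the classpath via java.util.ServiceLoader and asks each discovered factory whether it is compatible with the configured execution.target. The sketch below mimics that selection logic with a hypothetical ExecutorFactory interface and findFactory helper (not Flink's actual classes); if no jar on the classpath contributes a compatible factory, the result is null and the caller's checkNotNull produces exactly the NPE we saw.

```java
import java.util.Arrays;
import java.util.List;

// Minimal sketch (hypothetical types) of the ServiceLoader-style factory
// lookup: each factory discovered on the classpath is asked whether it can
// handle the configured execution.target; no match means null.
public class ExecutorLookupSketch {

    interface ExecutorFactory {
        boolean isCompatibleWith(String target);
    }

    // Stand-in for DefaultExecutorServiceLoader.getExecutorFactory(...):
    // 'discovered' plays the role of what ServiceLoader found on the classpath.
    static ExecutorFactory findFactory(List<ExecutorFactory> discovered, String target) {
        for (ExecutorFactory f : discovered) {
            if (f.isCompatibleWith(target)) {
                return f;
            }
        }
        return null; // nothing compatible -> the caller's checkNotNull throws
    }

    public static void main(String[] args) {
        // Pretend only a "remote"-capable factory is on the classpath.
        ExecutorFactory remoteOnly = target -> "remote".equals(target);

        System.out.println(findFactory(Arrays.asList(remoteOnly), "local") == null);
        System.out.println(findFactory(Arrays.asList(remoteOnly), "remote") != null);
    }
}
```

    In the real failure, the jar that would have contributed a local-target factory simply was not on the classpath, so the loop above had nothing compatible to return.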
  • Add the Java dependency to the Maven pom:
  •         <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-streaming-java_2.11</artifactId>
                <version>1.10.0</version>
            </dependency>
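    An aside on why this works: in Flink 1.10 the executor factories for the local target live in flink-clients, which flink-streaming-java pulls in transitively. If that is accurate for your setup, depending on flink-clients directly (version and Scala suffix assumed to match the pom above) should be a minimal alternative fix:

```
        <!-- Alternative fix (sketch): flink-clients provides the executor
             factories that the ServiceLoader lookup needs on the classpath. -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>1.10.0</version>
        </dependency>
```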
    
  • Run again; the exception is gone:
  • SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
    SLF4J: Defaulting to no-operation (NOP) logger implementation
    SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
    (hear,1)
    (s,2)
    (them,1)
    (ho,1)
    (i,2)
    (stand,1)
    (there,2)
    (think,1)
    (who,2)
    
    Process finished with exit code 0
    

    Summary: frankly this does not feel very well designed; a missing-dependency situation like this should not surface as a NullPointerException out of the box...