Flink 1.10: Cannot find compatible factory for specified execution.target (=local)
Here is a problem I ran into while testing a simple local demo on Flink 1.10. The code is a very plain Word Count:
package flink.io

import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._

object flink_demo {

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.createLocalEnvironment(2)

    val text = env.fromElements(
      "Who's there?",
      "I think I hear them. Stand, ho! Who's there?")

    val counts = text
      .flatMap { _.toLowerCase.split("\\W+").filter { _.nonEmpty } }
      .map { (_, 1) }
      .groupBy(0)
      .sum(1)

    counts.print()
  }
}
The Maven dependencies are as follows:
<properties>
    <scala.version>2.11.12</scala.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>${scala.version}</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-scala -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-scala_2.11</artifactId>
        <version>1.10.0</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_2.11</artifactId>
        <version>1.10.0</version>
        <scope>provided</scope>
    </dependency>
</dependencies>
The code itself is simple (it is essentially the example from the official documentation), but running it throws:
Exception in thread "main" java.lang.NullPointerException: Cannot find compatible factory for specified execution.target (=local)
    at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:104)
    at org.apache.flink.api.java.ExecutionEnvironment.executeAsync(ExecutionEnvironment.java:937)
    at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:860)
    at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:844)
    at org.apache.flink.api.java.DataSet.collect(DataSet.java:413)
    at org.apache.flink.api.java.DataSet.print(DataSet.java:1652)
    at org.apache.flink.api.scala.DataSet.print(DataSet.scala:1864)
    at flink.io.flink_demo$.main(flink_demo.scala:28)
    at flink.io.flink_demo.main(flink_demo.scala)
The error message says no compatible factory can be found for the execution.target parameter, and the line flagged in the trace is the DataSet.print() call. Let's look at the implementation of print:
/**
* Prints the elements in a DataSet to the standard output stream [[System.out]] of the
* JVM that calls the print() method. For programs that are executed in a cluster, this
* method needs to gather the contents of the DataSet back to the client, to print it
* there.
*
* The string written for each element is defined by the [[AnyRef.toString]] method.
*
* This method immediately triggers the program execution, similar to the
* [[collect()]] and [[count()]] methods.
*/
def print(): Unit = {
  javaSet.print()
}
Stepping into javaSet.print():
/**
 * Prints the elements in a DataSet to the standard output stream {@link System#out} of the JVM that calls
 * the print() method. For programs that are executed in a cluster, this method needs
 * to gather the contents of the DataSet back to the client, to print it there.
 *
 * <p>The string written for each element is defined by the {@link Object#toString()} method.
 *
 * <p>This method immediately triggers the program execution, similar to the
 * {@link #collect()} and {@link #count()} methods.
 *
 * @see #printToErr()
 * @see #printOnTaskManager(String)
 */
public void print() throws Exception {
    List<T> elements = collect();
    for (T e : elements) {
        System.out.println(e);
    }
}
So print() actually calls collect() and writes out whatever comes back. Let's look at the implementation of collect():
/**
* Convenience method to get the elements of a DataSet as a List.
* As DataSet can contain a lot of data, this method should be used with caution.
*
* @return A List containing the elements of the DataSet
*/
public List<T> collect() throws Exception {
    final String id = new AbstractID().toString();
    final TypeSerializer<T> serializer = getType().createSerializer(getExecutionEnvironment().getConfig());

    this.output(new Utils.CollectHelper<>(id, serializer)).name("collect()");
    JobExecutionResult res = getExecutionEnvironment().execute();

    ArrayList<byte[]> accResult = res.getAccumulatorResult(id);
    if (accResult != null) {
        try {
            return SerializedListAccumulator.deserializeList(accResult, serializer);
        } catch (ClassNotFoundException e) {
            throw new RuntimeException("Cannot find type class of collected data type.", e);
        } catch (IOException e) {
            throw new RuntimeException("Serialization error while deserializing collected data", e);
        }
    } else {
        throw new RuntimeException("The call to collect() could not retrieve the DataSet.");
    }
}
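As the snippet shows, collect() does not fetch results directly: it attaches a sink (Utils.CollectHelper) that serializes the elements into an accumulator under a random id, runs the job, and then deserializes that accumulator out of the JobExecutionResult. The same round-trip is available to user code; below is a minimal sketch of mine (the class and the accumulator name are made up for illustration):

import org.apache.flink.api.common.accumulators.IntCounter
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration

// Hypothetical mapper showing the accumulator mechanism collect() relies on:
// values accumulated on the TaskManagers travel back to the client inside
// the JobExecutionResult.
class LineCounter extends RichMapFunction[String, String] {
  private val counter = new IntCounter()

  override def open(parameters: Configuration): Unit =
    getRuntimeContext.addAccumulator("num-lines", counter)

  override def map(value: String): String = {
    counter.add(1)
    value
  }
}

// After the job runs, the client reads the accumulator from the result:
//   val res = env.execute("accumulator demo")
//   val numLines: Integer = res.getAccumulatorResult[Integer]("num-lines")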
Back in collect(), the line to note is JobExecutionResult res = getExecutionEnvironment().execute();. Here is the implementation of execute():
/**
* Triggers the program execution. The environment will execute all parts of the program that have
* resulted in a "sink" operation. Sink operations are for example printing results ({@link DataSet#print()},
* writing results (e.g. {@link DataSet#writeAsText(String)},
* {@link DataSet#write(org.apache.flink.api.common.io.FileOutputFormat, String)}, or other generic
* data sinks created with {@link DataSet#output(org.apache.flink.api.common.io.OutputFormat)}.
*
* The program execution will be logged and displayed with a generated default name.
*
* @return The result of the job execution, containing elapsed time and accumulators.
* @throws Exception Thrown, if the program executions fails.
*/
public JobExecutionResult execute() throws Exception {
    return execute(getDefaultName());
}
Stepping in further:
/**
* Triggers the program execution. The environment will execute all parts of the program that have
* resulted in a "sink" operation. Sink operations are for example printing results ({@link DataSet#print()},
* writing results (e.g. {@link DataSet#writeAsText(String)},
* {@link DataSet#write(org.apache.flink.api.common.io.FileOutputFormat, String)}, or other generic
* data sinks created with {@link DataSet#output(org.apache.flink.api.common.io.OutputFormat)}.
*
* The program execution will be logged and displayed with the given job name.
*
* @return The result of the job execution, containing elapsed time and accumulators.
* @throws Exception Thrown, if the program executions fails.
*/
public JobExecutionResult execute(String jobName) throws Exception {
    final JobClient jobClient = executeAsync(jobName);

    try {
        if (configuration.getBoolean(DeploymentOptions.ATTACHED)) {
            lastJobExecutionResult = jobClient.getJobExecutionResult(userClassloader).get();
        } else {
            lastJobExecutionResult = new DetachedJobExecutionResult(jobClient.getJobID());
        }

        jobListeners.forEach(
                jobListener -> jobListener.onJobExecuted(lastJobExecutionResult, null));

    } catch (Throwable t) {
        jobListeners.forEach(jobListener -> {
            jobListener.onJobExecuted(null, ExceptionUtils.stripExecutionException(t));
        });
        ExceptionUtils.rethrowException(t);
    }

    return lastJobExecutionResult;
}
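A side note on the branch above: whether execute() blocks for the final result or returns a DetachedJobExecutionResult is driven purely by configuration, through the DeploymentOptions it consults. A minimal sketch of those two options (my own illustration, not Flink code):

import org.apache.flink.configuration.{Configuration, DeploymentOptions}

object DeploymentOptionsDemo {
  def main(args: Array[String]): Unit = {
    val conf = new Configuration()

    // DeploymentOptions.TARGET backs the execution.target setting from the
    // error message; "local" selects the local executor.
    conf.setString(DeploymentOptions.TARGET, "local")

    // DeploymentOptions.ATTACHED is the flag execute() checks to choose
    // between blocking (attached) and detached execution.
    conf.setBoolean(DeploymentOptions.ATTACHED, true)

    println(conf.get(DeploymentOptions.TARGET))          // local
    println(conf.getBoolean(DeploymentOptions.ATTACHED)) // true
  }
}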
Back to execute(jobName): its very first line calls executeAsync. Here is its implementation:
/**
* Triggers the program execution asynchronously. The environment will execute all parts of the program that have
* resulted in a "sink" operation. Sink operations are for example printing results ({@link DataSet#print()},
* writing results (e.g. {@link DataSet#writeAsText(String)},
* {@link DataSet#write(org.apache.flink.api.common.io.FileOutputFormat, String)}, or other generic
* data sinks created with {@link DataSet#output(org.apache.flink.api.common.io.OutputFormat)}.
*
* The program execution will be logged and displayed with the given job name.
*
* @return A {@link JobClient} that can be used to communicate with the submitted job, completed on submission succeeded.
* @throws Exception Thrown, if the program submission fails.
*/
@PublicEvolving
public JobClient executeAsync(String jobName) throws Exception {
    checkNotNull(configuration.get(DeploymentOptions.TARGET), "No execution.target specified in your configuration file.");

    final Plan plan = createProgramPlan(jobName);
    final PipelineExecutorFactory executorFactory =
            executorServiceLoader.getExecutorFactory(configuration);

    checkNotNull(
            executorFactory,
            "Cannot find compatible factory for specified execution.target (=%s)",
            configuration.get(DeploymentOptions.TARGET));

    CompletableFuture<JobClient> jobClientFuture = executorFactory
            .getExecutor(configuration)
            .execute(plan, configuration);

    try {
        JobClient jobClient = jobClientFuture.get();
        jobListeners.forEach(jobListener -> jobListener.onJobSubmitted(jobClient, null));
        return jobClient;
    } catch (Throwable t) {
        jobListeners.forEach(jobListener -> jobListener.onJobSubmitted(null, t));
        ExceptionUtils.rethrow(t);

        // make javac happy, this code path will not be reached
        return null;
    }
}
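The executorServiceLoader here is, as far as I can tell from the 1.10 sources, backed by the standard JDK ServiceLoader SPI: it scans the classpath for PipelineExecutorFactory implementations and returns the one whose isCompatibleWith(configuration) accepts the configured target. A small diagnostic sketch of mine to list what the classpath actually provides:

import java.util.ServiceLoader

import org.apache.flink.core.execution.PipelineExecutorFactory

import scala.collection.JavaConverters._

object ListExecutorFactories {
  def main(args: Array[String]): Unit = {
    // Enumerate the PipelineExecutorFactory implementations registered on
    // the classpath via META-INF/services. With the original dependencies
    // (no flink-clients at runtime) this is presumably empty, which is why
    // executorFactory comes back null above.
    val factories = ServiceLoader.load(classOf[PipelineExecutorFactory]).asScala.toList
    if (factories.isEmpty) println("no PipelineExecutorFactory on the classpath")
    else factories.foreach(f => println(f.getClass.getName))
  }
}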
This is the source of our error. The code above lives in org.apache.flink.api.java.ExecutionEnvironment.java. In our case the executorFactory checked against execution.target is null: it is obtained from executorServiceLoader.getExecutorFactory(configuration), where configuration is a property declared on the class, and the org.apache.flink.api.scala.ExecutionEnvironment.scala that our code goes through supplies nothing that satisfies the lookup. Adding the Java module to the Maven dependencies fixes it:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.11</artifactId>
    <version>1.10.0</version>
</dependency>
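(An aside of mine, not from the original post: the factory matching execution.target=local is registered in flink-clients, which flink-streaming-java 1.10 pulls in transitively, so declaring that dependency directly should work as well:)

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.11</artifactId>
    <version>1.10.0</version>
</dependency>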
Re-running, the exception is gone:
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
(hear,1)
(s,2)
(them,1)
(ho,1)
(i,2)
(stand,1)
(there,2)
(think,1)
(who,2)
Process finished with exit code 0
To wrap up: this does not feel very reasonable; a missing piece like this should not be able to surface in such an obscure way...