sparkの過程の中でScalaのマルチスレッドの小さい問題を覚えます
今回ThriftServerのソースコードを変更して、いくつかの業務を加えて、途中でこのような問題に遭遇して、非同期でタスクを提出する時マルチスレッドを作りたいと思って、最初は使用するscalaのActorで、SQLContextとsqlを伝えて、sparkSessionIdがずっと変化していることを発見して、毎回Actionを提出してトリガした後に発生するsessionIdはすべて一致しないで、これはどういうことですか、その後、マルチスレッド非同期の問題であることが判明し、sqlContextがスレッド側でタスクを実行している間にセッションを再起動します.どうすればいいのでしょうか.以下の方法で実現するしかありません.
現在のメソッド内でcontext変数を使用すればよい
もちろん皆さんの勉強を便利にするために、よくある書き方は以下の通りです.
Callable例
java.util.concurrent.ExecutorService executorService = Executors.newFixedThreadPool(2);
executorService.submit(new Callable() {
@Override
public Void call(){
df.rdd().saveAsTextFile(rb.getString("hdfspath") + "/file3",com.hadoop.compression.lzo.LzopCodec.class);
return null;
}
});
executorService.submit(new Callable() {
@Override
public Void call(){
df.rdd().saveAsTextFile(rb.getString("hdfspath") + "/file4",com.hadoop.compression.lzo.LzopCodec.class);
return null;
}
});
executorService.shutdown();
現在のメソッド内でcontext変数を使用すればよい
もちろん皆さんの勉強を便利にするために、よくある書き方は以下の通りです.
import java.util.concurrent.{Executors, ExecutorService}
object Test {
def main(args: Array[String]) {
//
val threadPool:ExecutorService=Executors.newFixedThreadPool(5)
try {
// 5
for(i
Callable例
import java.util.concurrent.{Callable, FutureTask, Executors, ExecutorService}
object Test {
def main(args: Array[String]) {
val threadPool:ExecutorService=Executors.newFixedThreadPool(3)
try {
val future=new FutureTask[String](new Callable[String] {
override def call(): String = {
Thread.sleep(100)
return "im result"
}
})
threadPool.execute(future)
println(future.get())
}finally {
threadPool.shutdown()
}
}
}