【Scala】処理中のものを処理し終わった上で終了するProducer-Consumerパターン


並行処理のデザインパターンでよくあるProducer-Consumerパターンですが、
このパターンに、タイトルの通りの動きのロジックを作成しましたので、紹介いたします。

コード

import java.util.concurrent.ArrayBlockingQueue

class ProducerConsumer[E <: AnyRef](producerConcurrentNumber: Int, producer: () => E, consumerConcurrentNumber: Int, consumer: E => Unit, sentinelObject: E) {
  @volatile private[this] var terminateProducer: Boolean = _

  private[this] val queue = new ArrayBlockingQueue[E](consumerConcurrentNumber * 2)

  private[this] val producerThreadList = List.fill(producerConcurrentNumber) {
    new Thread(() => {
      while (!terminateProducer) queue.put(producer())
    })
  }
  private[this] val consumerThreadList = List.fill(producerConcurrentNumber){
    new Thread(() => {
      //whileを使うと微妙なようなので、再帰関数にして、それに末尾再帰最適化を効かせました
      //var item: E = sentinelObject//ignore value
      //while( {item = queue.take();item} ne sentinelObject)
      //  consumer(item)
      @tailrec
      def consumeLoop(): Unit = {
        val item = queue.take()
        if(sentinelObject eq item) return
        consumer(item)
        consumeLoop()
      }
      consumeLoop()
    })
  }

  def start() {
    producerThreadList.foreach(_.start())
    consumerThreadList.foreach(_.start())
  }

  def stop() {
    terminateProducer = true
    producerThreadList.foreach(_.join())

    1 to producerConcurrentNumber foreach(_ => queue.put(sentinelObject))
    consumerThreadList.foreach(_.join())
  }
}

1秒間に64スレッドで生産して、128スレッドで消費するパターンでの使用例

使用例
import java.util.concurrent.TimeUnit

object Main extends App {
  import java.util.concurrent.atomic.AtomicLong
  val prodCnt = new AtomicLong()
  val consCnt = new AtomicLong()
  val cc: ProducerConsumer[String] = new ProducerConsumer(
    64,
    () =>
      "aaa"+prodCnt.getAndIncrement()
    ,
    128,
    _a => consCnt.getAndIncrement(),
    new String(""))//終了処理の判定だけに使う番兵オブジェクト
  cc.start()
  Thread.sleep(TimeUnit.SECONDS.toMillis(1))
  cc.stop()
  println("----- finish -----")
  println(f"生産した個数: ${prodCnt.get}%,d")
  println(f"消費した個数: ${consCnt.get}%,d")
  assert( prodCnt.get == consCnt.get)
}

解説

Producerの終了方法

terminateProducerをtrueにすると、whileループから外れて生産しなくなって終了
terminateProducerはマルチスレッドから呼ばれるため、@volatileをつけておいたが、
つけなくても終了処理はされるため、外してもよい。

Consumerの終了方法

Producerのスレッドが全部終了し、queueへの追加が行われなくなった後に、
番兵オブジェクト(sentinelObject)をConsumerのスレッド数分だけ投入する。

Consumerスレッドでは、sentinelObjectが来たら終了という判定を行っている。
番兵オブジェクト(sentinelObject)の判定は、!=ではダメでneで同じ参照値かのチェックで判定を行う。

その他

ConsumerスレッドのロジックがScalaっぽくないコードになってしまった。
Scalaでは、ローカル変数宣言時の初期値Must、代入式の結果を利用できないため…。
こういう場合だけは、Javaのほうが短く書ける。

こういうところでwhileを使うと微妙なようなので、再帰関数にして、それに末尾再帰最適化を効かせました。
たぶん、Scalaではこう書くもんなんだと思われる。

参考資料

番兵 - Wikipedia