生産者消費者モデル-Java実装(投稿)

58832 ワード

詳細
転載先:https://www.cnblogs.com/chentingk/p/6497107.html
センシングフェーズソフトウェア業界の発展に伴い、インターネットユーザーの増加に伴い、この芸術の興起は合理的なようだ.毎日のPV 10億本以上の淘宝は、同時処理の手段が業界一流と言える.ユーザーがタオバオのトップページにアクセスする平均待ち時間は数秒しかありませんが、サーバーが処理する流れは非常に複雑です.まずトップページを担当するサーバは数千台あり,計算によりユーザルーティングに最も近いサーバをトップページに戻す.次はウェブページ上のリソースで、JSとCSSファイルだけで百以上あり、画像リソースなどもあります.数秒でアリの数千人のトップエンジニアの知恵がどのように頂点に達しているかを見ることができます.
大手電子商取引サイトでは、彼らのサービスやアプリケーションがデカップリングされた後、メッセージキューを通じて互いに通信しています.メッセージキューとアプリケーション間のアーキテクチャ関係は、生産者消費者モデルである.
紹介する前に、まず現実間のモデルを探します.筆者は最近,多くの技術モデルが生活中のモデルと密接に関係していることに気づいた.多くの人がケンタッキーやマクドナルドで消費したことがあると信じていますが、筆者が店に入って消費したとき、注文の流れと同時モデルが非常に近いことに気づきました.店ごとに流れは違いますが、2つのモデルしかありません.ケンタッキーでは、注文した後、注文した食べ物をパッケージして前に持ってきて、お会計させます.時間がかかると、操作が完了しないとテーブル番号が残って後で送られてくることがあります.マクドナルドでの注文モデルは、ファーストフードを注文した後、すぐに支払うように要求し、支払いが終わった後、次の席で食事を取り、食事を取るのはそばで待っていて、もう一人の従業員が食事の手配を担当しています.
ケンタッキープロセス
マクドナルドの注文図
同時モデルでは、ケンタッキーは1つのスレッドですべてのサービスを完了する傾向があり、マクドナルドはサービスのデカップリングに傾き、自分のビジネスに集中させる傾向があります.ケンタッキーのモデルはBIOサーバーのモデル設計と似ており、マクドナルドのモデルは生産者の消費者モデルとよく似ている.
生産消費者モデル生産者消費者モデルは具体的には、1つのシステムにおいて、生産者と消費者の2つの役割が存在し、彼らはメモリバッファを通じて通信を行い、生産者は消費者が必要とする資料を生産し、消費者は資料を製品にする.生産消費者モデルは以下の図である.
              
ますます発展しているサービスタイプでは、例えばユーザーを登録するサービスは、いくつかの独立したサービス(アカウント検証、メールボックス検証コード、携帯電話メールコードなど)にデカップリングされる可能性があります.消費者として、ユーザーがデータを入力するのを待っていて、フロントのデータが提出された後、分解されて各サービスがあるurlに送信され、配布される役割は生産者に相当します.消費者は、データを取得するときに一度に処理できない可能性があります.では、メモリバッファというリクエストキューがあります.この仕事をするフレームワークをメッセージキューと言います.
生産者消費者モデルの実現生産者はスレッドの山であり、消費者は別のスレッドであり、メモリバッファはList配列キューを使用することができ、データ型は簡単なクラスを定義するだけでよい.重要なのは、マルチスレッド間のコラボレーションをどのように処理するかです.これもマルチスレッド通信の一例です.
このモデルでは、メモリバッファが空の場合、消費者は待たなければならないが、メモリバッファがいっぱいの場合、生産者は待たなければならない.他の場合はダイナミックバランスにすることができます.マルチスレッドによる臨界領域リソースの操作は、読み書き中に1つのスレッドしか存在しないことを保証しなければならないので、ロックのポリシーを設計する必要があることに注意してください.
次の例は本で紹介したもので、生産者は数字を生産してバッファに格納し、消費者はバッファからデータを取り出し、その平方を求めて出力する.
package ProducterAndConsumer.Version1;

import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
 *  
 * @author ctk
 *  
 */

public class Producer implements Runnable {
    private volatile boolean isRunning = true;
    private BlockingQueue queue;//  
    private static AtomicInteger count = new AtomicInteger();//    
    private static final int SLEEPTIME = 1000;

    public Producer(BlockingQueue queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        PCData data = null;
        Random r = new Random();
        System.out.println("start producting id:" + Thread.currentThread().getId());
        try {
            while (isRunning) {
                Thread.sleep(r.nextInt(SLEEPTIME));
                data = new PCData(count.incrementAndGet());
                System.out.println(data + "  ");
                if (!queue.offer(data, 2, TimeUnit.SECONDS)) {
                    System.err.println("  ");
                }
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
            Thread.currentThread().interrupt();
        }

    }

    public void stop() {
        isRunning = false;
    }
}
package ProducterAndConsumer.Version1;
/**
 *  
 * @author ctk
 */
import java.text.MessageFormat;
import java.util.Random;
import java.util.concurrent.BlockingQueue;

public class Consumer implements Runnable{
    private BlockingQueue queue;
    private static final int SLEEPTIME = 1000;
    public Consumer(BlockingQueue queue){
        this.queue = queue;
    }

    @Override
    public void run() {
        System.out.println("start Consumer id :"+Thread.currentThread().getId());
        Random r = new Random();
        try{
            while(true){
                PCData data = queue.take();
                if(data != null)
                {
                    int re = data.getData() * data.getData();
                    System.out.println(MessageFormat.format("{0}*{1}={2}", data.getData(),data.getData(),re));
                    Thread.sleep(r.nextInt(SLEEPTIME));
                }
            }
        }catch (InterruptedException e) {
            e.printStackTrace();
            Thread.currentThread().interrupt();
        }
    }

}
package ProducterAndConsumer.Version1;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;

/**
 *  
 * @author ctk
 *
 */
public class Main {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue queue = new LinkedBlockingDeque<>(10);
        Producer p1 = new Producer(queue);
        Producer p2 = new Producer(queue);
        Producer p3 = new Producer(queue);
        Consumer c1 = new Consumer(queue);
        Consumer c2 = new Consumer(queue);
        Consumer c3 = new Consumer(queue);
        ExecutorService service = Executors.newCachedThreadPool();
        service.execute(p1);
        service.execute(p2);
        service.execute(p3);
        service.execute(c1);
        service.execute(c2);
        service.execute(c3);
        Thread.sleep(10*1000);
        p1.stop();
        p2.stop();
        p3.stop();
        Thread.sleep(3000);
        service.shutdown();
    }
}
package ProducterAndConsumer.Version1;
/**
 *  
 * @author ctk
 *
 */
public class PCData {
    private final int intData;
    public PCData(int d){
        intData = d;
    }
    public PCData(String d){
        intData = Integer.valueOf(d);
    }
    public int getData(){
        return intData;
    }
    @Override
    public String toString(){
        return "data:"+intData;
    }
}

BlockingQueueはブロックキューであるため、そのアクセスは1つのスレッドだけが進行していることを保証することができるので、論理によると、生産者はメモリがいっぱいになったときに待機し、消費者キューを呼び覚まし、逆に消費者は飢餓状態で生産者を待機させ、生産者を目覚めさせる.
次の2つのバージョンはnotify/wait()とawait()/signal()メソッドを使用して設計されています.構造的にはモデル図に従っている.
package ProducterAndConsumer.Version2;

import java.util.List;

/**
 *  
 * 
 * @author ctk
 *
 */

public class Consumer implements Runnable {
    private List queue;

    public Consumer(List queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            while (true) {
                if (Thread.currentThread().isInterrupted())
                    break;
                PCData data = null;
                synchronized (queue) {
                    if (queue.size() == 0) {
                        queue.wait();
                        queue.notifyAll();
                    }
                    data = queue.remove(0);
                }
                System.out.println(
                        Thread.currentThread().getId() + "  :" + data.get() + " result:" + (data.get() * data.get()));
                Thread.sleep(1000);
            }

        } catch (InterruptedException e) {
            e.printStackTrace();
        }

    }

}


package ProducterAndConsumer.Version2;

import java.util.List;
import java.util.Random;

/**
 *  
 * 
 * @author MacBook
 *
 */
public class Producer implements Runnable {
    private List queue;
    private int length;

    public Producer(List queue, int length) {
        this.queue = queue;
        this.length = length;
    }

    @Override
    public void run() {
        try {
            while (true) {

                if (Thread.currentThread().isInterrupted())
                    break;
                Random r = new Random();
                long temp = r.nextInt(100);
                System.out.println(Thread.currentThread().getId() + "  :" + temp);
                PCData data = new PCData();
                data.set(temp);
                synchronized (queue) {
                    if (queue.size() >= length) {
                        queue.notifyAll();
                        queue.wait();
                    } else
                        queue.add(data);
                }
                Thread.sleep(1000);
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

    }

}


package ProducterAndConsumer.Version2;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Main {
    public static void main(String[] args) {
        List queue = new ArrayList();
        int length = 10;
        Producer p1 = new Producer(queue,length);
        Producer p2 = new Producer(queue,length);
        Producer p3 = new Producer(queue,length);
        Consumer c1 = new Consumer(queue);
        Consumer c2 = new Consumer(queue);
        Consumer c3 = new Consumer(queue);
        ExecutorService service = Executors.newCachedThreadPool();
        service.execute(p1);
        service.execute(p2);
        service.execute(p3);
        service.execute(c1);
        service.execute(c2);
        service.execute(c3);
        
    }
}


package ProducterAndConsumer.Version2;
/**
 *  
 * @author ctk
 *
 */
public class PCData {
    private long value;
    public void set(long value){
        this.value = value;
        
    }
    public long get(){
        return value;
    }
}

    
    
package ProducterAndConsumer.Version3;

import java.util.List;
/**
 *  
 * @author ctk
 *
 */
public class Consumer implements Runnable{
    private List queue;
    public Consumer(List queue){
        this.queue = queue;
    }
    @Override
    public void run() {
        try {
            while (true) {
                if (Thread.currentThread().isInterrupted())
                    break;
                PCData data = null;
                Main.lock.lock();
                if (queue.size() == 0){
                    Main.full.signalAll();
                    Main.empty.await();
                }
                Thread.sleep(1000);
                data = queue.remove(0);
                Main.lock.unlock();
                System.out.println(" ID:"+Thread.currentThread().getId()+"  :"+data.getData()+" result:"+(data.getData()*data.getData()));
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    
}


package ProducterAndConsumer.Version3;

import java.util.List;
import java.util.Random;
/**
 *  
 * @author ctk
 *
 */
public class Producter implements Runnable{
    private List queue;
    private int len;
    public Producter(List queue,int len){
        this.queue = queue;
        this.len = len;
    }
    @Override
    public void run() {
        try{
            while(true){
                if(Thread.currentThread().isInterrupted())
                    break;
                Random r = new Random();
                PCData data = new PCData();
                data.setData(r.nextInt(500));
                Main.lock.lock();
                if(queue.size() >= len)
                {
                    Main.empty.signalAll();
                    Main.full.await();
                }
                Thread.sleep(1000);
                queue.add(data);
                Main.lock.unlock();
                System.out.println(" ID:"+Thread.currentThread().getId()+"  :"+data.getData());
            }
        }catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

}


package ProducterAndConsumer.Version3;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
public class Main {
    public static ReentrantLock lock = new ReentrantLock();
    public static Condition empty = lock.newCondition();
    public static Condition full = lock.newCondition();
    public static void main(String[] args) {
        List queue = new ArrayList();
        int length = 10;
        Producter p1 = new Producter(queue,length);
        Producter p2 = new Producter(queue,length);
        Producter p3 = new Producter(queue,length);
        Consumer c1 = new Consumer(queue);
        Consumer c2 = new Consumer(queue);
        Consumer c3 = new Consumer(queue);
        ExecutorService service = Executors.newCachedThreadPool();
        service.execute(p1);
        service.execute(p2);
        service.execute(p3);
        service.execute(c1);
        service.execute(c2);
        service.execute(c3);
    }
}


package ProducterAndConsumer.Version3;

public class PCData {
    private int data;

    public int getData() {
        return data;
    }

    public void setData(int data) {
        this.data = data;
    }
}

awaitのバージョンは個人的に書いた後、コンソールごとに1つの言葉しか出力されず、同じ時間に生産者や消費者が1つだけアクティブになっていることを示していますが、waitのバージョンでは、1回に複数の生成者がアクティブになる可能性があります.個人的にはwaitのバージョンが私の構想に近いと思います.
生産消費者モデル思考午後本をめくって、偶然並列計算の流水線の思考を発見しました.並列計算のポイントは分治法思考であり,分割した2つの部分が因果的に関連していないことを証明できれば並列計算が可能である.例えば本の例(A+B)*Cのように、この式は並列計算で分割することはできません.その結果はA+Bの後の結果にCを乗じたからです.しかし、並列流水線の思考は、私たちは2人の労働者を招待することができて、すべての労働者に1歩の処理を担当することができます.
分解後のアーキテクチャは、P 1:D=A+B;P2:R = D*3;
この2つのスレッド処理では因果が存在する必要はないので,並列に計算することができる.
このモデルは生産消費者モデルに基づいて設計され、流水ラインは流水ラインを使用して半製品を伝達する必要があり、流水ラインはメモリバッファであり、P 2にとってP 1は生産者であり、システムに必要な結果にとって、P 2は生産者である.
              
後記偶然1冊の本を読んで、上の言及した高速道路を創立する学習方法は非常に効率的な学習方法で、新しい技術を学ぶ時それらは多かれ少なかれ現実の中でマッピングがあって、だから万巻の本を読んで万里の道を歩いて、経験と学術の需要は並行して成長します.技術モデルは技術分野だけでなく、管理分野も参照して考えることができ、learn more、study less.