CAS Exchanger

38871 ワード



package com.test.distrib;

import java.io.PrintStream;
import java.util.*;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

/**
 * Demo task that batches generated messages into one of two workers and swaps the
 * active worker with a compare-and-set once its counter reaches {@link #limits}.
 *
 * <p>Every instance owns two {@link DistributionStreamWorker}s. {@link #process} always
 * delivers to the worker currently referenced by {@code cworker}; when that worker is
 * full, the calling thread tries to CAS the reference over to the other worker and then
 * purges the one it retired. A background timer additionally calls {@link #window()}
 * every 100 ms to flush whatever the active worker holds.
 *
 * <p>The same instance is meant to be shared by several threads (see {@link #main}).
 */
public class FileDistributionStreamExchangeTask implements Runnable {

    // NOTE(review): counterGrp / counterName1 / counterName2 are not referenced anywhere
    // in this class; they are kept in case other tooling expects them.
    private static final String counterGrp = "edit-counters";
    private static final String storeName1 = "wikipedia-stats1";
    private static final String storeName2 = "wikipedia-stats2";
    private static final String storeKey1 = "Hello1";
    private static final String storeKey2 = "Hello2";
    private static final PrintStream OUTPUT_STREAM = System.out;
    private static final String counterName1 = "repeat-edits1";
    private static final String counterName2 = "repeat-edits2";

    /** Number of messages a worker may hold before it is swapped out and flushed. */
    private static final long limits = 2000;

    /**
     * Updater used to CAS {@link #cworker}. It is stateless with respect to any single
     * instance, so one shared copy is created for the class instead of one per object.
     */
    private static final AtomicReferenceFieldUpdater<FileDistributionStreamExchangeTask, DistributionStreamWorker> storeCAS =
            AtomicReferenceFieldUpdater.newUpdater(FileDistributionStreamExchangeTask.class,
                    DistributionStreamWorker.class,
                    "cworker");

    private DistributionStreamWorker worker1;
    private DistributionStreamWorker worker2;

    /** The worker currently receiving messages; only ever worker1 or worker2. */
    private volatile DistributionStreamWorker cworker;

    /** Timer driving the periodic {@link #window()} flush; replaced on every {@link #init()}. */
    private Timer flushTimer;

    /** Creates the task and immediately initialises its workers and flush timer. */
    public FileDistributionStreamExchangeTask() {
        init();
    }

    /**
     * Builds two fresh workers, makes the first one active and (re)starts the periodic
     * window flush.
     *
     * <p>Calling this again discards the previous workers and cancels the previous timer
     * so that repeated initialisation does not accumulate timer threads.
     */
    public void init() {
        Map<String, Map<String, List<String>>> taskContext = new HashMap<>();
        taskContext.put(storeName1, new HashMap<String, List<String>>());
        taskContext.put(storeName2, new HashMap<String, List<String>>());
        this.worker1 = new DistributionStreamWorker(storeKey1, new AtomicLong(), taskContext.get(storeName1));
        this.worker2 = new DistributionStreamWorker(storeKey2, new AtomicLong(), taskContext.get(storeName2));
        // Unconditional publish: the first worker is always the initial target.
        storeCAS.set(this, worker1);

        if (flushTimer != null) {
            flushTimer.cancel();
        }
        // Daemon thread: a background flush must not be the reason the JVM stays alive.
        flushTimer = new Timer("window-flush", true);
        flushTimer.schedule(new TimerTask() {
            @Override
            public void run() {
                window();
            }
        }, 0, 100);
    }

    /**
     * Routes one message to the active worker, swapping workers first if the active one
     * has reached {@link #limits}.
     *
     * @param envelope the message to store; its payload must contain an
     *                 {@code "orderDetails"} entry
     */
    public void process(IncomingMessageEnvelope envelope) {
        DistributionStreamWorker sWorker = cworker;
        while (sWorker.counter.get() >= limits) {// TODO 1
            // TODO 2
            DistributionStreamWorker next = (sWorker == worker1) ? worker2 : worker1;
            if (!storeCAS.compareAndSet(this, sWorker, next)) {
                // Another thread changed the active worker first; re-evaluate against it.
                sWorker = cworker;
                continue;
            }
            // Logged by the thread that actually performed the swap, once per swap.
            System.out.println(sWorker == worker1 ? "Swap 1 - 2" : "Swap 2 - 1");
            // The retired worker is flushed here; its counter drops to 0 and ends the loop.
            sWorker.purge(limits);
        }
        // TODO 5
        // Re-read the volatile reference so the message lands on the worker that is
        // active now, not on one that was just retired.
        cworker.handleMsg(envelope);
    }

    /** Flushes whatever the currently active worker holds, regardless of its counter. */
    public void window() {
        // Send the remains
        System.out.println("window flush==============================");
        cworker.purge(-1);
    }

    /**
     * Endless generator loop: builds a synthetic message tagged with the current thread
     * id and feeds it to {@link #process}. Never returns normally.
     */
    @Override
    public void run() {
        while (true) {
            IncomingMessageEnvelope msg = new IncomingMessageEnvelope();
            Map<String, Object> map = new HashMap<>();
            map.put("orderDetails", "Generator " + Thread.currentThread().getId());
            msg.setMessage(map);
            map.put("" + System.currentTimeMillis(), "Thread " + Thread.currentThread().getId() + ": some msg");
            process(msg);
        }
    }

    /** Minimal message holder: a mutable map payload. */
    private static class IncomingMessageEnvelope {
        private Map<String, Object> message;

        public Map<String, Object> getMessage() {
            return message;
        }

        public void setMessage(Map<String, Object> message) {
            this.message = message;
        }
    }

    /**
     * Accumulates messages under a single key of its store and tracks how many it holds.
     * {@link #purge} and {@link #handleMsg} are synchronized on the worker, so the store
     * and counter are only mutated by one thread at a time.
     */
    private static class DistributionStreamWorker {
        final AtomicLong counter;
        // Starts empty so that appending to it never yields a "null" prefix.
        volatile String batchInfo = "";
        final Map<String, List<String>> store;
        final String storeKey;

        private DistributionStreamWorker(String storeKey, AtomicLong counter, Map<String, List<String>> store) {
            this.counter = counter;
            this.store = store;
            this.storeKey = storeKey;
        }

        /**
         * Flushes the worker when forced or when it holds at least {@code limit} messages.
         *
         * @param limit threshold to compare the counter against, or {@code -1} to flush
         *              unconditionally
         */
        public synchronized void purge(long limit) {
            // TODO 3
            if (limit == -1 || counter.get() >= limit) {
                flush();
            }
        }

        /**
         * Drops the stored batch (after reporting its size) and resets counter and batch
         * info. Does nothing when no batch exists. Callers must hold the worker's monitor.
         */
        private void flush() {
            List<String> stack = this.store.get(storeKey);
            if (stack == null) {
                return;
            }
            OUTPUT_STREAM.println("Start to flush" + stack.size());
            this.store.remove(storeKey);
            counter.set(0L);
            batchInfo = "";
        }

        /**
         * Appends the message's {@code "orderDetails"} value to the batch, flushing first
         * if the batch is already full.
         *
         * @param envelope message whose payload contains {@code "orderDetails"}
         */
        public synchronized void handleMsg(IncomingMessageEnvelope envelope) {
            // TODO 4
            if (counter.get() >= limits) {
                flush();
            }
            System.out.println("Clean" + Thread.currentThread().getId());
            Map<String, Object> edit = envelope.getMessage();
            List<String> arr = this.store.get(storeKey);
            if (arr == null) {
                arr = new ArrayList<>();
                this.store.put(storeKey, arr);
            }
            String msg = edit.get("orderDetails").toString();
            long count = this.counter.get();
            if (count > limits) {
                // Invariant check of the demo: the batch must never exceed the limit.
                System.err.println(count + "Out of limists!!!!");
                System.exit(0);
            }
            System.out.println("Total :" + count + "Current Consumer ===" + Thread.currentThread().getId() +
                    "+++++++++++" + msg);
            arr.add(msg);
            // Update Batch here if possible
            batchInfo += 1;
            this.counter.getAndIncrement();
        }

    }

    /**
     * Starts two generator threads sharing one task and waits for them.
     *
     * @param args unused
     * @throws InterruptedException if the main thread is interrupted while waiting
     */
    public static void main(String[] args) throws InterruptedException {
        final FileDistributionStreamExchangeTask t = new FileDistributionStreamExchangeTask();
        Thread[] tgroup = new Thread[2];
        for (int i = 0; i < tgroup.length; i++) {
            tgroup[i] = new Thread(t);
            tgroup[i].start();
        }
        // Join only after every thread is running; joining an unstarted thread is a no-op.
        for (int i = 0; i < tgroup.length; i++) {
            tgroup[i].join();
        }
    }
}