簡単なロックなしキューの実現
Dispruptorはメモリがロックされていません。バッファとしてリング配列に基づいています。詳細はDispruptor-1.0をご参照ください。
以下は自分で設計した簡易版です。現在は衝突やエラーが発見されていません。みんなで一緒にテストしてもいいです。
以下は自分で設計した簡易版です。現在は衝突やエラーが発見されていません。みんなで一緒にテストしてもいいです。
package tianshui.lockfree.queue;
import java.io.Serializable;
import java.lang.reflect.Array;
import java.util.concurrent.atomic.AtomicInteger;
/**
* queue length, 2^16
* @author
* @date 2013-4-9 11:42:10
*/
public class RingBuffer implements Serializable {
/**
*
*/
private static final long serialVersionUID = 6976960108708949038L;
private volatile AtomicInteger head;
private volatile AtomicInteger tail;
private int length;
final T EMPTY = null;
private volatile T[] queue;
public RingBuffer(Class type, int length){
this.head = new AtomicInteger(0);
this.tail = new AtomicInteger(0);
this.length = length == 0 ? 2 << 16 : length; // 2^16
this.queue = (T[]) Array.newInstance(type, this.length);
}
public void enQueue(T t){
if(t == null) t= (T) new Object();
// --
while(this.getTail() - this.getHead() >= this.length);
int ctail = this.tail.getAndIncrement();
while(this.queue[this.getTail(ctail)] != EMPTY); //
this.queue[this.getTail(ctail)] = t;
}
public T deQueue(){
T t = null;
// --
while(this.head.get() >= this.tail.get());
int chead = this.head.getAndIncrement();
while(this.queue[this.getHead(chead)] == EMPTY); //
t = this.queue[this.getHead(chead)];
this.queue[this.getHead(chead)] = EMPTY;
return t;
}
public int getHead(int index){
return index & (this.length - 1);
}
public int getTail(int index) {
return index & (this.length - 1);
}
public int getHead() {
return head.get() & (this.length - 1);
}
public int getTail() {
return tail.get() & (this.length - 1);
}
public T[] getQueue() {
return queue;
}
public int getLength() {
return length;
}
public void setLength(int length) {
this.length = length;
}
}
以下はテストコードです。package tianshui.lockfree.queue;
import java.util.concurrent.atomic.AtomicInteger;
/**
* @author
* @date 2013-4-9 04:13:29
*/
public class TestBuffer {
public static AtomicInteger index = new AtomicInteger(0);
public static void main(String[] args){
int tCount = 10; // thread count
int length = 0; // buffer length -> 2^16
final RingBuffer buffer = new RingBuffer(Integer.class, length);
// provider
Runnable pr = new Runnable(){
@Override
public void run() {
while(true){
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
int tindex = index.getAndIncrement();
buffer.enQueue(tindex);
System.out.println("buffer enQueue: " + tindex);
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
};
// consumer
Runnable cr = new Runnable(){
@Override
public void run() {
while(true){
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
Integer cindex = buffer.deQueue();
System.out.println("buffer deQueue: " + cindex);
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
};
for(int i=0; i