自分で実装した Java のロック
Ajax による Web プッシュ機能を実装しているときに問題に遭遇しました。複数のユーザーが同時に同じ List や Map のデータにアクセスしている状態で、データの挿入や並べ替えを行うと、並行アクセスによる不具合が発生します。この問題を避けるために java.util.concurrent パッケージのいくつかのクラスを調べましたが、どの実装クラスもロックオブジェクトを外部に公開していませんでした。そこで、自分で書いてみることにしました。
インプリメンテーションコード
ロックインタフェース実装
キュー実装
主にLinkedQueueを使えばロックが使えます
基本的な考え方:あるスレッドがロックを保持している間に他のスレッドがロックを取得しようとすると、そのスレッドは一時停止され、待機キューに挿入されます。ロックが解放されると、待機キューからスレッドを 1 つ取り出し、そのスレッドを再開させます。
インプリメンテーションコード
ロックインタフェース実装
package com.fantasy.framework.util.concurrent;
import java.util.Calendar;
import java.util.Date;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import com.fantasy.framework.util.common.DateUtil;
/**
 * A small re-entrant mutual-exclusion lock whose owner is visible to classes in the same package.
 *
 * <p>Ownership changes are guarded by an internal monitor, so acquisition is atomic and waiting
 * threads are woken by {@code notifyAll()} instead of by thread interruption.
 *
 * <p>Deliberate deviations from the usual {@link Lock} conventions, kept for backward
 * compatibility with existing callers:
 * <ul>
 * <li>{@link #unlock()} called by a thread that does not own the lock is silently ignored.</li>
 * <li>{@link #newCondition()} always returns the same shared {@link ConditionObject}.</li>
 * <li>Waiting on that condition does <em>not</em> release the lock; {@link #owner} stays set
 * while the owner waits, and any thread may signal the condition without holding the lock.</li>
 * </ul>
 */
public class ClassLock implements Lock {

    /** Guards {@link #owner} and {@link #holds}; threads waiting for the lock park on it. */
    private final Object lockMonitor = new Object();

    /** The single condition handed out by {@link #newCondition()}. */
    private final ConditionObject condition = new ConditionObject();

    /** Thread currently holding the lock, or {@code null} when the lock is free. */
    transient volatile Thread owner = null;

    /** Number of acquisitions by {@link #owner} that have not been released yet. */
    private int holds = 0;

    /**
     * Acquires the lock, blocking until it is available. Re-entrant for the owning thread.
     * An interrupt received while waiting does not abort the wait; the interrupt status is
     * restored before returning.
     */
    public void lock() {
        final Thread current = Thread.currentThread();
        boolean interrupted = false;
        synchronized (lockMonitor) {
            while (!isFreeFor(current)) {
                try {
                    lockMonitor.wait();
                } catch (InterruptedException e) {
                    // lock() is not interruptible: remember the request and keep waiting.
                    interrupted = true;
                }
            }
            acquire(current);
        }
        if (interrupted) {
            current.interrupt();
        }
    }

    /**
     * Acquires the lock unless the current thread is interrupted.
     *
     * @throws InterruptedException if the thread is interrupted on entry or while waiting;
     *         the lock is not held in that case
     */
    public void lockInterruptibly() throws InterruptedException {
        final Thread current = Thread.currentThread();
        if (Thread.interrupted()) {
            throw new InterruptedException();
        }
        synchronized (lockMonitor) {
            while (!isFreeFor(current)) {
                lockMonitor.wait();
            }
            acquire(current);
        }
    }

    /**
     * Returns the condition associated with this lock. Every call returns the same instance,
     * so a signal sent through one call reaches a thread waiting through another.
     *
     * @return the shared condition of this lock
     */
    public Condition newCondition() {
        return this.condition;
    }

    /**
     * Acquires the lock only if it is free (or already held by the caller) at the time of the call.
     *
     * @return {@code true} if the lock was acquired, {@code false} otherwise
     */
    public boolean tryLock() {
        final Thread current = Thread.currentThread();
        synchronized (lockMonitor) {
            if (!isFreeFor(current)) {
                return false;
            }
            acquire(current);
            return true;
        }
    }

    /**
     * Acquires the lock if it becomes available within the given waiting time.
     *
     * @param time maximum time to wait; a value of zero or less means "do not wait"
     * @param unit unit of {@code time}
     * @return {@code true} if the lock was acquired, {@code false} if the time elapsed first
     * @throws InterruptedException if the thread is interrupted on entry or while waiting
     */
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        final Thread current = Thread.currentThread();
        if (Thread.interrupted()) {
            throw new InterruptedException();
        }
        long remaining = unit.toNanos(time);
        final long deadline = System.nanoTime() + remaining;
        synchronized (lockMonitor) {
            while (!isFreeFor(current)) {
                if (remaining <= 0) {
                    return false;
                }
                TimeUnit.NANOSECONDS.timedWait(lockMonitor, remaining);
                remaining = deadline - System.nanoTime();
            }
            acquire(current);
            return true;
        }
    }

    /**
     * Releases one hold on the lock. The lock becomes free, and waiting threads are woken,
     * once every acquisition by the owner has been matched by an unlock. A call from a
     * thread that does not own the lock is ignored.
     */
    public void unlock() {
        synchronized (lockMonitor) {
            if (this.owner != Thread.currentThread()) {
                return;
            }
            holds--;
            if (holds == 0) {
                this.owner = null;
                lockMonitor.notifyAll();
            }
        }
    }

    /** Tells whether {@code candidate} may take the lock now; call with lockMonitor held. */
    private boolean isFreeFor(Thread candidate) {
        return owner == null || owner == candidate;
    }

    /** Records one more hold for {@code candidate}; call with lockMonitor held. */
    private void acquire(Thread candidate) {
        owner = candidate;
        holds++;
    }

    /**
     * Signalling channel attached to the enclosing lock.
     *
     * <p>Waiting does not release the enclosing lock, and signalling does not require holding it.
     * Because signallers may run without the lock, one signal sent while nobody is waiting is
     * remembered and consumed by the next waiter. Callers must therefore re-check their own
     * predicate after a wait returns, as with any condition that allows spurious wake-ups.
     */
    public class ConditionObject implements Condition {

        /** Guards {@link #waiters} and {@link #permits}; waiting threads park on it. */
        private final Object signalMonitor = new Object();

        /** Number of threads currently inside one of the await methods. */
        private int waiters = 0;

        /** Signals that have been sent but not yet consumed by a waiter. */
        private int permits = 0;

        /**
         * Waits until signalled.
         *
         * @throws InterruptedException if the thread is interrupted on entry or while waiting
         */
        public void await() throws InterruptedException {
            if (Thread.interrupted()) {
                throw new InterruptedException();
            }
            synchronized (signalMonitor) {
                waiters++;
                try {
                    while (permits == 0) {
                        signalMonitor.wait();
                    }
                    permits--;
                } finally {
                    leave();
                }
            }
        }

        /**
         * Waits until signalled or until the given time elapses.
         *
         * @param time maximum time to wait
         * @param unit unit of {@code time}
         * @return {@code true} if a signal was received, {@code false} if the time elapsed first
         * @throws InterruptedException if the thread is interrupted on entry or while waiting
         */
        public boolean await(long time, TimeUnit unit) throws InterruptedException {
            return awaitNanos(unit.toNanos(time)) > 0;
        }

        /**
         * Waits until signalled or until {@code nanosTimeout} nanoseconds elapse.
         *
         * @param nanosTimeout maximum time to wait, in nanoseconds
         * @return a positive estimate of the time left when a signal was received (at least 1),
         *         or a value less than or equal to zero when the wait timed out
         * @throws InterruptedException if the thread is interrupted on entry or while waiting
         */
        public long awaitNanos(long nanosTimeout) throws InterruptedException {
            if (Thread.interrupted()) {
                throw new InterruptedException();
            }
            final long deadline = System.nanoTime() + nanosTimeout;
            synchronized (signalMonitor) {
                waiters++;
                try {
                    long remaining = nanosTimeout;
                    while (permits == 0) {
                        if (remaining <= 0) {
                            return remaining;
                        }
                        TimeUnit.NANOSECONDS.timedWait(signalMonitor, remaining);
                        remaining = deadline - System.nanoTime();
                    }
                    permits--;
                    // A consumed signal must never be reported as a timeout.
                    return Math.max(1L, deadline - System.nanoTime());
                } finally {
                    leave();
                }
            }
        }

        /**
         * Waits until signalled or until the given deadline passes.
         *
         * @param deadline absolute time at which to stop waiting
         * @return {@code true} if a signal was received, {@code false} if the deadline passed first
         * @throws InterruptedException if the thread is interrupted on entry or while waiting
         */
        public boolean awaitUntil(Date deadline) throws InterruptedException {
            long millisLeft = deadline.getTime() - System.currentTimeMillis();
            return awaitNanos(TimeUnit.MILLISECONDS.toNanos(millisLeft)) > 0;
        }

        /**
         * Waits until signalled, ignoring interrupts. If the thread was interrupted while
         * waiting, its interrupt status is set again before this method returns.
         */
        public void awaitUninterruptibly() {
            boolean interrupted = false;
            synchronized (signalMonitor) {
                waiters++;
                try {
                    while (permits == 0) {
                        try {
                            signalMonitor.wait();
                        } catch (InterruptedException e) {
                            interrupted = true;
                        }
                    }
                    permits--;
                } finally {
                    leave();
                }
            }
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }

        /**
         * Wakes one waiting thread. When no thread is waiting, a single signal is remembered
         * for the next waiter; further signals are dropped until it is consumed.
         */
        public void signal() {
            synchronized (signalMonitor) {
                if (permits < Math.max(1, waiters)) {
                    permits++;
                    signalMonitor.notifyAll();
                }
            }
        }

        /**
         * Wakes every thread currently waiting. When no thread is waiting, a single signal is
         * remembered for the next waiter, exactly as {@link #signal()} does.
         */
        public void signalAll() {
            synchronized (signalMonitor) {
                permits = Math.max(1, waiters);
                signalMonitor.notifyAll();
            }
        }

        /**
         * Bookkeeping for a thread leaving a wait; call with signalMonitor held. Drops permits
         * that can no longer be matched by a waiter (e.g. after an interrupted wait), keeping
         * at most one remembered signal.
         */
        private void leave() {
            waiters--;
            int cap = Math.max(1, waiters);
            if (permits > cap) {
                permits = cap;
            }
        }
    }
}
キュー実装
package com.fantasy.framework.util.concurrent;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
 * Unbounded {@link BlockingQueue} backed by a {@link LinkedList} that exposes its two locks.
 *
 * <p>{@link #takeLock} serialises blocking consumers ({@link #take()}, timed {@link #poll(long, TimeUnit)})
 * and {@link #putLock} serialises blocking producers ({@link #put(Object)}, timed
 * {@link #offer(Object, long, TimeUnit)}). Callers can hold both through {@link #fullyLock()}.
 * Every access to the backing list is additionally synchronized on the list itself, so the
 * non-blocking methods are safe to call from any thread.
 *
 * <p>A consumer waiting for an element keeps {@code takeLock} and waits on that lock's condition;
 * producers wake it by signalling the condition whenever another thread owns {@code takeLock}.
 *
 * @param <E> element type
 */
public class LinkedQueue<E> implements BlockingQueue<E> {
    private static final Log logger = LogFactory.getLog(LinkedQueue.class);

    /** Backing storage; also the monitor guarding every read and write of the list. */
    private final LinkedList<E> items = new LinkedList<E>();

    /** Held by a consumer for the duration of a blocking removal. */
    protected final ClassLock takeLock = new ClassLock();

    /** Held by a producer for the duration of a blocking insertion. */
    protected final ClassLock putLock = new ClassLock();

    /** Acquires {@link #putLock} and then {@link #takeLock}. */
    public void fullyLock() {
        putLock.lock();
        takeLock.lock();
    }

    /** Releases {@link #takeLock} and then {@link #putLock}. */
    public void fullyUnlock() {
        takeLock.unlock();
        putLock.unlock();
    }

    /**
     * Returns the head without removing it.
     *
     * @throws java.util.NoSuchElementException if the queue is empty
     */
    public E element() {
        synchronized (items) {
            return items.element();
        }
    }

    /**
     * Appends {@code o} and wakes a consumer that may be waiting for an element.
     *
     * @return the result of the underlying list insertion
     */
    public boolean offer(E o) {
        boolean added;
        synchronized (items) {
            added = items.offer(o);
        }
        wakeTaker();
        return added;
    }

    /** Returns the head without removing it, or {@code null} if the queue is empty. */
    public E peek() {
        synchronized (items) {
            return items.peek();
        }
    }

    /** Removes and returns the head, or {@code null} if the queue is empty. Never blocks. */
    public E poll() {
        synchronized (items) {
            return items.poll();
        }
    }

    /**
     * Removes and returns the head.
     *
     * @throws java.util.NoSuchElementException if the queue is empty
     */
    public E remove() {
        synchronized (items) {
            return items.remove();
        }
    }

    /** Appends {@code o} and wakes a consumer that may be waiting for an element. */
    public boolean add(E o) {
        boolean added;
        synchronized (items) {
            added = items.add(o);
        }
        wakeTaker();
        return added;
    }

    /** Appends every element of {@code c} and wakes a consumer that may be waiting. */
    public boolean addAll(Collection<? extends E> c) {
        boolean changed;
        synchronized (items) {
            changed = items.addAll(c);
        }
        if (changed) {
            wakeTaker();
        }
        return changed;
    }

    /** Removes every element. */
    public void clear() {
        synchronized (items) {
            items.clear();
        }
    }

    public boolean contains(Object o) {
        synchronized (items) {
            return items.contains(o);
        }
    }

    public boolean containsAll(Collection<?> c) {
        synchronized (items) {
            return items.containsAll(c);
        }
    }

    public boolean isEmpty() {
        synchronized (items) {
            return items.isEmpty();
        }
    }

    /**
     * Returns an iterator over the live backing list. The iterator itself is not synchronized;
     * a caller iterating while other threads modify the queue must synchronize on the list
     * returned by {@link #toList()} for the whole iteration.
     */
    public Iterator<E> iterator() {
        return items.iterator();
    }

    public boolean remove(Object o) {
        synchronized (items) {
            return items.remove(o);
        }
    }

    public boolean removeAll(Collection<?> c) {
        synchronized (items) {
            return items.removeAll(c);
        }
    }

    public boolean retainAll(Collection<?> c) {
        synchronized (items) {
            return items.retainAll(c);
        }
    }

    public int size() {
        synchronized (items) {
            return items.size();
        }
    }

    public Object[] toArray() {
        synchronized (items) {
            return items.toArray();
        }
    }

    public <T> T[] toArray(T[] a) {
        synchronized (items) {
            return items.toArray(a);
        }
    }

    /**
     * Moves every queued element into {@code c}.
     *
     * @return the number of elements transferred
     * @throws NullPointerException if {@code c} is null
     * @throws IllegalArgumentException if {@code c} is this queue or its backing list
     */
    public int drainTo(Collection<? super E> c) {
        return drainTo(c, Integer.MAX_VALUE);
    }

    /**
     * Moves at most {@code maxElements} elements, in queue order, into {@code c}.
     *
     * @return the number of elements transferred
     * @throws NullPointerException if {@code c} is null
     * @throws IllegalArgumentException if {@code c} is this queue or its backing list
     */
    public int drainTo(Collection<? super E> c, int maxElements) {
        if (c == null) {
            throw new NullPointerException("c");
        }
        if (c == this || c == items) {
            throw new IllegalArgumentException("cannot drain a queue into itself");
        }
        int moved = 0;
        synchronized (items) {
            while (moved < maxElements && !items.isEmpty()) {
                // Add before removing so a failing add leaves the element in the queue.
                c.add(items.getFirst());
                items.removeFirst();
                moved++;
            }
        }
        return moved;
    }

    /**
     * Appends {@code o}, waiting up to the given time for {@link #putLock}.
     *
     * @return {@code true} if the element was added, {@code false} if the lock could not be
     *         obtained in time
     * @throws InterruptedException if interrupted while waiting for the lock
     */
    public boolean offer(E o, long timeout, TimeUnit unit)
            throws InterruptedException {
        if (!this.putLock.tryLock(timeout, unit)) {
            return false;
        }
        try {
            this.offer(o);
            return true;
        } finally {
            this.putLock.unlock();
        }
    }

    /**
     * Removes and returns the head, waiting up to the given time for {@link #takeLock} and
     * then for an element. The time budget covers both waits together.
     *
     * @return the head, or {@code null} if no element became available in time
     * @throws InterruptedException if interrupted while waiting
     */
    public E poll(long time, TimeUnit unit) throws InterruptedException {
        final long deadline = System.nanoTime() + unit.toNanos(time);
        if (!this.takeLock.tryLock(time, unit)) {
            return null;
        }
        try {
            E head = this.poll();
            while (head == null) {
                long remaining = deadline - System.nanoTime();
                if (remaining <= 0) {
                    break;
                }
                this.takeLock.newCondition().await(remaining, TimeUnit.NANOSECONDS);
                // Re-check after every wake-up: a signal does not guarantee an element is left.
                head = this.poll();
            }
            return head;
        } finally {
            this.takeLock.unlock();
        }
    }

    /** Small manual demonstration: one thread holds takeLock for five seconds, then adds an element. */
    public static void main(String[] args) throws Exception {
        final LinkedQueue<String> queue = new LinkedQueue<String>();
        (new Thread(new Runnable() {
            public void run() {
                queue.takeLock.lock();
                try {
                    System.out.println(" takeLock");
                    Thread.sleep(5000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    logger.error("demo thread interrupted while holding takeLock", e);
                } finally {
                    queue.takeLock.unlock();
                }
                queue.add("123123");
                System.out.println(" takeLock");
            }
        })).start();
        long start = System.currentTimeMillis();
        queue.poll(10000, TimeUnit.MILLISECONDS);
        long end = System.currentTimeMillis();
        System.out.println(" >" + (end - start));
    }

    /** Appends {@code o} while holding {@link #putLock}. Never waits for space; the queue is unbounded. */
    public void put(E o) {
        this.putLock.lock();
        try {
            this.offer(o);
        } finally {
            this.putLock.unlock();
        }
    }

    /** Always {@code Integer.MAX_VALUE}: the queue has no capacity limit. */
    public int remainingCapacity() {
        return Integer.MAX_VALUE;
    }

    /**
     * Removes and returns the head while holding {@link #takeLock}, waiting for an element if
     * the queue is empty. The wait ignores interrupts.
     */
    public E take() {
        this.takeLock.lock();
        try {
            // Loop: a wake-up may be stale or another thread may have removed the element.
            while (this.isEmpty()) {
                this.takeLock.newCondition().awaitUninterruptibly();
            }
            return this.poll();
        } finally {
            this.takeLock.unlock();
        }
    }

    /**
     * Returns the live backing list (not a copy). Changes made through it bypass the wake-up
     * of waiting consumers; synchronize on the returned list when using it concurrently.
     */
    public List<E> toList() {
        return this.items;
    }

    /**
     * Signals the takeLock condition when some other thread owns takeLock, i.e. when a
     * consumer may be waiting for an element. The calling thread is skipped because it
     * cannot be waiting while it is executing this method.
     */
    private void wakeTaker() {
        Thread holder = this.takeLock.owner;
        if (holder != null && holder != Thread.currentThread()) {
            this.takeLock.newCondition().signal();
        }
    }
}
ロックは主に LinkedQueue を通して利用します。使用例:
LinkedQueue<Message> queue = new LinkedQueue<Message>();
// Acquire before entering try, so the finally clause only runs once the locks are held.
queue.fullyLock();
try {
    // Both takeLock and putLock are held here; work on the queue contents.
} finally {
    queue.fullyUnlock();
}
基本的な考え方:あるスレッドがロックを保持している間に他のスレッドがロックを取得しようとすると、そのスレッドは一時停止され、待機キューに挿入されます。ロックが解放されると、待機キューからスレッドを 1 つ取り出し、そのスレッドを再開させます。