2013-スレッド同期の使用


実際のプロジェクト開発では、効率を向上させるためにプログラムの同時進行を制御するためにスレッドクラスを使用する必要がある場合が多いが、スレッドセキュリティの問題にも関連しています.
簡単なスレッド制御Executorsクラスを使用して簡単に実装できます.具体的には、
記事1:Java ScheduledThreadPoolExecutorタスクの遅延または周期的な実行
記事2:ExecutorsとThreadPoolExecutor 2を使用してキューを使用
記事3:ForkJoin-タスク分割マージプロセッサJDK 7の新機能は、大きなタスクを分割して処理し、結果をマージすることができます.
  
しかし、いくつかのビジネスに関連するスレッド制御は依然として伝統的な方法で実現する必要があります.以下のコードの背景:大量の要求アクセスがあり、要求パラメータが多く重複し、キャッシュメカニズムに参加し、スレッドの安全を保証します.
サービスクラスとWorkerクラスで実装します.具体的なコードは次のとおりです.
  
package itour.cn.fare.gateway.service;

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


public class ObeQueryServiceImpl implements ObeQueryService {

	private static Logger log = LoggerFactory.getLogger(ObeQueryServiceImpl.class);
	private static ObeQueryServiceImpl service = new ObeQueryServiceImpl();
	private Map<String, Integer> avhTask = new HashMap<String, Integer>();//  , 
	private Map<String, Object> avhLock = new HashMap<String, Object>();//  , 


	private void getAvh(String key, String org, String dis, String date, String airCompany) {

		synchronized (this) {
			if (avhLock.get(key) == null) {//  , 

				avhLock.put(key, new byte[0]);

				log.debug(String.format("AV create lock:%s  ", key));
			}
		}

		synchronized (avhLock.get(key)) {

			//  , 
			if (avhTask.get(key) == null) {

				avhTask.put(key, new Integer(1));

				ObeAvhWorker worker = new ObeAvhWorker(key, org, dis, date, airCompany);

				log.debug(String.format("AV[%s]  AvWorker Start ", key));
				worker.start();

			}

			try {

				log.debug(String.format("AV[%s] Waitting AvWorker ", key));

				avhLock.get(key).wait();//  run 

			} catch (InterruptedException e) {
				log.error(e.getMessage(), e);
			}

			log.debug(String.format("AV[%s] AvWorker Notified", key));

		}
	}


	public Map<String, Integer> getAvhTask() {
		return avhTask;
	}

	public Map<String, Object> getAvhLock() {
		return avhLock;
	}

}

package itour.cn.fare.gateway; import itour.cn.fare.gateway.service.ObeQueryServiceImpl; import java.io.Serializable; import java.util.List; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.travelsky.sbeclient.obe.exceptions.ObeException; import com.travelsky.sbeclient.obe.response.AVResponse; import com.travelsky.sbeclient.obe.response.AvItem; public class ObeAvhWorker extends Thread { private static Logger log = LoggerFactory.getLogger(ObeAvhWorker.class); private ObeQueryServiceImpl service = ObeQueryServiceImpl.getInstance(); private String key; private String org; private String dis; private String date; private String airCompany; public ObeAvhWorker(String key, String org, String dis, String date, String airCompany) { this.key = key; this.org = org; this.dis = dis; this.date = date; this.airCompany = airCompany; } @Override public void run() { log.debug("ObeAvhWorker Thread Start for " + key); try { worker(); }catch (Exception e) { log.error(e.getMessage(), e); } finally { synchronized (service.getAvhLock().get(key)) { service.getAvhLock().get(key).notifyAll(); } service.getAvhTask().remove(key); } } private void worker() throws Exception { ObeSession session = new ObeSession(); AVResponse avr = session.avh(org, dis, date, airCompany); List<AvItem> avItems = avr.getAvItems(); //キャッシュに入れる if (avItems != null && avItems.size() > 0) { if (log.isDebugEnabled()) log.info(String.format("Avh[%s] put into cachePool total %d", key, avItems.size())); CachePoolManager.getInstance().putAvCV(key, (Serializable) avItems); } } } ワークベンチでタスクを実行したときは、ロックを解除し、タスクテーブルのタスクを削除することを忘れないでください。