memcacheQueueキュー


add('1asdf');
$obj->getQueueLength();
$obj->read(11);
$obj->get(8);
*/
class memcacheQueue{
public static   $client;            //memcache     
public          $access;            //       
private         $currentSide;       //        :A/B
private         $lastSide;          //        :A/B
private         $sideAHead;         //A    
private         $sideATail;         //A    
private         $sideBHead;         //B    
private         $sideBTail;         //B    
private         $currentHead;       //     
private         $currentTail;       //     
private         $lastHead;          //     
private         $lastTail;          //     
private         $expire;            //    , ,1~2592000, 30  ;0     
private         $sleepTime;         //      ,  
private         $queueName;         //    ,   
private         $retryNum;          //    ,= 10 *      
const   MAXNUM      = 10;                 //(  )     ,    10K
const   HEAD_KEY    = '_lkkQueueHead_';     //   key
const   TAIL_KEY    = '_lkkQueueTail_';     //   key
const   VALU_KEY    = '_lkkQueueValu_';     //   key
const   LOCK_KEY    = '_lkkQueueLock_';     //   key
const   SIDE_KEY    = '_lkkQueueSide_';     //   key
/*
*     
* @param   [config]    array   memcache     
* @param   [queueName] string      
* @param   [expire]    string      
* @return  NULL
*/
public function __construct($queueName ='',$expire='',$config =''){
if(empty($config)){
self::$client = memcache_pconnect('localhost',11211);
}elseif(is_array($config)){//array('host'=>'127.0.0.1','port'=>'11211')
self::$client = memcache_pconnect($config['host'],$config['port']);
}elseif(is_string($config)){//"127.0.0.1:11211"
$tmp = explode(':',$config);
$conf['host'] = isset($tmp[0]) ? $tmp[0] : '127.0.0.1';
$conf['port'] = isset($tmp[1]) ? $tmp[1] : '11211';
self::$client = memcache_pconnect($conf['host'],$conf['port']);
}
if(!self::$client) return false;
ignore_user_abort(TRUE);//       ,      
set_time_limit(0);//          
$this->access = false;
$this->sleepTime = 1000;
$expire = (empty($expire) && $expire!=0) ? 3600 : (int)$expire;
$this->expire = $expire;
$this->queueName = $queueName;
$this->retryNum = 10000;
$side = memcache_add(self::$client, $queueName . self::SIDE_KEY, 'A',false, $expire);
$this->getHeadNTail($queueName);
if(!isset($this->sideAHead) || empty($this->sideAHead)) $this->sideAHead = 0;
if(!isset($this->sideATail) || empty($this->sideATail)) $this->sideATail = 0;
if(!isset($this->sideBHead) || empty($this->sideBHead)) $this->sideBHead = 0;
if(!isset($this->sideBHead) || empty($this->sideBHead)) $this->sideBHead = 0;
}
/*
*        
* @param   [queueName] string      
* @return  NULL
*/
private function getHeadNTail($queueName){
$this->sideAHead = (int)memcache_get(self::$client, $queueName.'A'. self::HEAD_KEY);
$this->sideATail = (int)memcache_get(self::$client, $queueName.'A'. self::TAIL_KEY);
$this->sideBHead = (int)memcache_get(self::$client, $queueName.'B'. self::HEAD_KEY);
$this->sideBTail = (int)memcache_get(self::$client, $queueName.'B'. self::TAIL_KEY);
}
/*
*           
* @return  string       
*/
public function getCurrentSide(){
$currentSide = memcache_get(self::$client, $this->queueName . self::SIDE_KEY);
if($currentSide == 'A'){
$this->currentSide = 'A';
$this->lastSide = 'B';
$this->currentHead   = $this->sideAHead;
$this->currentTail   = $this->sideATail;
$this->lastHead      = $this->sideBHead;
$this->lastTail      = $this->sideBTail;
}else{
$this->currentSide = 'B';
$this->lastSide = 'A';
$this->currentHead   = $this->sideBHead;
$this->currentTail   = $this->sideBTail;
$this->lastHead      = $this->sideAHead;
$this->lastTail      = $this->sideATail;
}
return $this->currentSide;
}
/*
*     
* @return boolean
*/
private function getLock(){
if($this->access === false){
while(!memcache_add(self::$client, $this->queueName .self::LOCK_KEY, 1, false, $this->expire) ){
usleep($this->sleepTime);
@$i++;
if($i > $this->retryNum){//    N 
return false;
break;
}
}
return $this->access = true;
}
return false;
}
/*
*     
* @return NULL
*/
private function unLock(){
memcache_delete(self::$client, $this->queueName .self::LOCK_KEY);
$this->access = false;
}
/*
*     
* @param   [data]       
* @return  boolean
*/
public function add($data){
$result = false;
if(!$this->getLock()){
return $result;
}
$this->getHeadNTail($this->queueName);
$this->getCurrentSide();
if($this->isFull()){
$this->unLock();
return false;
}
if($this->currentTail < self::MAXNUM){
$value_key = $this->queueName .$this->currentSide . self::VALU_KEY . $this->currentTail;
if(memcache_add(self::$client, $value_key, $data, false, $this->expire)){
$this->changeTail();
$result = true;
}
}else{//      ,     
$this->unLock();
$this->changeCurrentSide();
return $this->add($data);
}
$this->unLock();
return $result;
}
/*
*     
* @param   [length]    int      
* @return  array
*/
public function get($length=0){
if(!is_numeric($length)) return false;
if(empty($length)) $length = self::MAXNUM * 2;//      
if(!$this->getLock()) return false;
if($this->isEmpty()){
$this->unLock();
return false;
}
$keyArray   = $this->getKeyArray($length);
$lastKey    = $keyArray['lastKey'];
$currentKey = $keyArray['currentKey'];
$keys       = $keyArray['keys'];
$this->changeHead($this->lastSide,$lastKey);
$this->changeHead($this->currentSide,$currentKey);
$data   = @memcache_get(self::$client, $keys);
foreach($keys as $v){//      
@memcache_delete(self::$client, $v, 0);
}
$this->unLock();
return $data;
}
/*
*     
* @param   [length]    int      
* @return  array
*/
public function read($length=0){
if(!is_numeric($length)) return false;
if(empty($length)) $length = self::MAXNUM * 2;//      
$keyArray   = $this->getKeyArray($length);
$data   = @memcache_get(self::$client, $keyArray['keys']);
return $data;
}
/*
*          key  
* @param   [length]    int     
* @return  array
*/
private function getKeyArray($length){
$result = array('keys'=>array(),'lastKey'=>array(),'currentKey'=>array());
$this->getHeadNTail($this->queueName);
$this->getCurrentSide();
if(empty($length)) return $result;
//      key
$i = $result['lastKey'] = 0;
for($i=0;$ilastHead + $i;
if($result['lastKey'] >= $this->lastTail) break;
$result['keys'][] = $this->queueName .$this->lastSide . self::VALU_KEY . $result['lastKey'];
}
//      key
$j = $length - $i;
$k = $result['currentKey'] = 0;
for($k=0;$kcurrentHead + $k;
if($result['currentKey'] >= $this->currentTail) break;
$result['keys'][] = $this->queueName .$this->currentSide . self::VALU_KEY . $result['currentKey'];
}
return $result;
}
/*
*             
* @return  NULL
*/
private function changeTail(){
$tail_key = $this->queueName .$this->currentSide . self::TAIL_KEY;
memcache_add(self::$client, $tail_key, 0,false, $this->expire);//    ,   ;  false;
//memcache_increment(self::$client, $tail_key, 1);//   +1
$v = memcache_get(self::$client, $tail_key) +1;
memcache_set(self::$client, $tail_key,$v,false,$this->expire);
}
/*
*        
* @param   [side]      string       
* @param   [headValue] int          
* @return  NULL
*/
private function changeHead($side,$headValue){
if($headValue < 1) return false;
$head_key = $this->queueName .$side . self::HEAD_KEY;
$tail_key = $this->queueName .$side . self::TAIL_KEY;
$sideTail = memcache_get(self::$client, $tail_key);
if($headValue < $sideTail){
memcache_set(self::$client, $head_key,$headValue+1,false,$this->expire);
}elseif($headValue >= $sideTail){
$this->resetSide($side);
}
}
/*
*      ,         、     0
* @param   [side]  string       
* @return  NULL
*/
private function resetSide($side){
$head_key = $this->queueName .$side . self::HEAD_KEY;
$tail_key = $this->queueName .$side . self::TAIL_KEY;
memcache_set(self::$client, $head_key,0,false,$this->expire);
memcache_set(self::$client, $tail_key,0,false,$this->expire);
}
/*
*          
* @return  string
*/
private function changeCurrentSide(){
$currentSide = memcache_get(self::$client, $this->queueName . self::SIDE_KEY);
if($currentSide == 'A'){
memcache_set(self::$client, $this->queueName . self::SIDE_KEY,'B',false,$this->expire);
$this->currentSide = 'B';
}else{
memcache_set(self::$client, $this->queueName . self::SIDE_KEY,'A',false,$this->expire);
$this->currentSide = 'A';
}
return $this->currentSide;
}
/*
*           
* @return  boolean
*/
public function isFull(){
$result = false;
if($this->sideATail == self::MAXNUM && $this->sideBTail == self::MAXNUM){
$result = true;
}
return $result;
}
/*
*           
* @return  boolean
*/
public function isEmpty(){
$result = true;
if($this->sideATail > 0 || $this->sideBTail > 0){
$result = false;
}
return $result;
}
/*
*          
*         ,             ,            
* @return  int
*/
public function getQueueLength(){
$this->getHeadNTail($this->queueName);
$this->getCurrentSide();
$sideALength = $this->sideATail - $this->sideAHead;
$sideBLength = $this->sideBTail - $this->sideBHead;
$result = $sideALength + $sideBLength;
return $result;
}
/*
*         ,   HEAD_KEY、TAIL_KEY、SIDE_KEY  key
* @return  boolean
*/
public function clear(){
if(!$this->getLock()) return false;
for($i=0;$i<:maxnum>queueName.'A'. self::VALU_KEY .$i, 0);
@memcache_delete(self::$client, $this->queueName.'B'. self::VALU_KEY .$i, 0);
}
$this->unLock();
$this->resetSide('A');
$this->resetSide('B');
return true;
}
/*
*     memcache    
* @return  NULL
*/
public function memFlush(){
memcache_flush(self::$client);
}
}
?>

 
 
簡単にmemcacheキュー関数を実現し、A/Bブロック切替
getCurrentSide());
//exit;
for($i=0;$i<30;$i++){
$v1=$obj->add('  '."**".$i);
}
//var_dump($v1);
$v2=$obj->getQueueLength();
var_dump($v2);
$v3=$obj->read(30);
var_dump($v3);
$v4=$obj->get(8);
var_dump($v4);
$v5=$obj->read(15);
var_dump($v5);
$obj->clear();
?>

 
結果:
string 'A' (length=1)
int 20
array
'SSSA_lkkQueueValu_0' => string '  **0' (length=7)
'SSSA_lkkQueueValu_1' => string '  **1' (length=7)
'SSSA_lkkQueueValu_2' => string '  **2' (length=7)
'SSSA_lkkQueueValu_3' => string '  **3' (length=7)
'SSSA_lkkQueueValu_4' => string '  **4' (length=7)
'SSSA_lkkQueueValu_5' => string '  **5' (length=7)
'SSSA_lkkQueueValu_6' => string '  **6' (length=7)
'SSSA_lkkQueueValu_7' => string '  **7' (length=7)
'SSSA_lkkQueueValu_8' => string '  **8' (length=7)
'SSSA_lkkQueueValu_9' => string '  **9' (length=7)
'SSSB_lkkQueueValu_0' => string '  **10' (length=8)
'SSSB_lkkQueueValu_1' => string '  **11' (length=8)
'SSSB_lkkQueueValu_2' => string '  **12' (length=8)
'SSSB_lkkQueueValu_3' => string '  **13' (length=8)
'SSSB_lkkQueueValu_4' => string '  **14' (length=8)
'SSSB_lkkQueueValu_5' => string '  **15' (length=8)
'SSSB_lkkQueueValu_6' => string '  **16' (length=8)
'SSSB_lkkQueueValu_7' => string '  **17' (length=8)
'SSSB_lkkQueueValu_8' => string '  **18' (length=8)
'SSSB_lkkQueueValu_9' => string '  **19' (length=8)
array
'SSSA_lkkQueueValu_0' => string '  **0' (length=7)
'SSSA_lkkQueueValu_1' => string '  **1' (length=7)
'SSSA_lkkQueueValu_2' => string '  **2' (length=7)
'SSSA_lkkQueueValu_3' => string '  **3' (length=7)
'SSSA_lkkQueueValu_4' => string '  **4' (length=7)
'SSSA_lkkQueueValu_5' => string '  **5' (length=7)
'SSSA_lkkQueueValu_6' => string '  **6' (length=7)
'SSSA_lkkQueueValu_7' => string '  **7' (length=7)
array
'SSSA_lkkQueueValu_8' => string '  **8' (length=7)
'SSSA_lkkQueueValu_9' => string '  **9' (length=7)
'SSSB_lkkQueueValu_0' => string '  **10' (length=8)
'SSSB_lkkQueueValu_1' => string '  **11' (length=8)
'SSSB_lkkQueueValu_2' => string '  **12' (length=8)
'SSSB_lkkQueueValu_3' => string '  **13' (length=8)
'SSSB_lkkQueueValu_4' => string '  **14' (length=8)
'SSSB_lkkQueueValu_5' => string '  **15' (length=8)
'SSSB_lkkQueueValu_6' => string '  **16' (length=8)
'SSSB_lkkQueueValu_7' => string '  **17' (length=8)
'SSSB_lkkQueueValu_8' => string '  **18' (length=8)
'SSSB_lkkQueueValu_9' => string '  **19' (length=8)