リングbuffer設計

8976 ワード

背景:
生産者は絶えず生産し、消費者は消費を続け、bufferを追加する必要があります.
チェーンテーブルを使えば、絶え間ない申請スペースが必要になり、解放スペースが必要になり、予め申請しても、専門的に管理する必要があり、一定の複雑さをもたらします.
このときring(リングbuffer)を考えることができます.特に生産者の消費者のシーンでは、鍵をかける必要はありません.
主な考えは次のとおりです.
ヘッダhead変数は生産オフセットを表します
テールtail変数は消費オフセットを表します
実装には少なくともhead tailの2つの変数があります
struct ring_buffer
{
unsigned int head;
unsigned int tail;
}
 
头と尾が等しくて空を表す!!        r.head == r.tail
 
头尾差1表示buffer満!!   
ここでは変数を導入する必要があります.mask、定義は次のようになります.
struct ring_buffer
{
unsigned int head;
unsigned int tail;
unsigned int mask;
}
ここでmaskはcount-1である.ちょうど最大下付き文字に対応します.
r.tail==(r.head+1)&maskという言葉は、headがtailの後ろにあることを意味し、満席を表し、&maskはmaskで割ったのと同じです.
maskには制限があり、2のn次方-1でなければならない.具体的な実装方式はマクロ制御である
#define POWEROF2(x) (0 == (((x) - 1) & (x)))
個数に制限がなければmask=2,tail=3 head=2では成立しない条件もある.
 
公式(出典不明)の実装を添付します.
//lock-free single producer and single consumer queue
#ifndef RING_H
#define RING_H

#include "common.h"
#include 

typedef struct ring
{
    unsigned int size;
    unsigned int mask;
    volatile unsigned int head;
    volatile unsigned int tail;
    void* volatile ring[0] __cache_aligned;
} ring_t;

static inline ring_t* ring_create(unsigned int count)
{
    if (! POWEROF2(count))
    {
        return NULL;
    }

    size_t ring_size = count * sizeof(void*) + sizeof(ring_t);
    ring_t* r = malloc(ring_size);
    if (NULL == r)
     {
        return NULL;
    }

    memset(r, 0, ring_size);
    r->size = count;
    r->mask = count - 1;
    return r;
}

static inline void ring_destroy(ring_t* r)
{
    if (NULL != r)
    {
        free(r);
    }
}

static inline unsigned int ring_len(ring_t* r)
{
    if (r->tail <= r->head)
    {
        return r->head - r->tail;
    }

    return r->size - r->tail + r->head;
}

static inline bool ring_full(ring_t* r)
{
    return r->tail == ((r->head + 1) & r->mask);
}

static inline bool ring_empty(ring_t* r)
{
    return r->head == r->tail;
}

static inline void* ring_peek(ring_t* r)
{
    if (ring_empty(r))
    {
        return NULL;
    }

    return r->ring[r->tail];
}

static inline bool ring_enqueue(ring_t* r, void* data)
{
    if (ring_full(r))
    {
        return false;
    }

    r->ring[r->head] = data;
    smp_wmb();
    r->head = (r->head + 1) & r->mask;
    return true;
}

static inline void* ring_dequeue(ring_t* r)
{
    if (ring_empty(r))
    {
        return NULL;
    }

    void* data = r->ring[r->tail];
    smp_rmb();
    r->tail = (r->tail + 1) & r->mask;
    return data;
}

#endif //RING_H
#ifndef COMMON_H
#define COMMON_H

#include 
#include 
#include 
#include 
#include 
#include 
#include 


#define SEC2USEC(sec) ((sec) * 1000000)

#define POWEROF2(x) (0 == (((x) - 1) & (x)))
#define CACHE_LINE_SIZE 64
#define __cache_aligned __attribute__((__aligned__(CACHE_LINE_SIZE)))
#define CACHE_LINE_MASK (CACHE_LINE_SIZE - 1)
#define CACHE_LINE_ROUNDUP(size) \
    (CACHE_LINE_SIZE * ((size + CACHE_LINE_SIZE - 1)/CACHE_LINE_SIZE))

#define smp_rmb() asm volatile("lfence":::"memory")
#define smp_mb()  asm volatile("mfence":::"memory")
#define smp_wmb() asm volatile("sfence":::"memory")
#define compile_barrier() asm volatile("":::"memory")

#define __LOCAL(var, line) __##var##line
#define _LOCAL(var, line) __LOCAL(var, line)
#define LOCAL(var) _LOCAL(var, __LINE__)

#define container_of(ptr, type, member) ({ \
            const typeof(((type*)0)->member) * __mptr = (ptr); \
            (type*)((char*)__mptr - offsetof(type, member)); \
        })

#define likely(x) __builtin_expect(!!(x), 1)
#define unlikely(x) __builtin_expect(!!(x), 0)

#define atomic_compare_and_swap __sync_bool_compare_and_swap
#define atomic_add __sync_fetch_and_add
#define atomic_sub __sync_fetch_and_sub

#define UNUSED(x) (void)(x)
#define DIV_ROUND_UP(n, d) (((n) + (d) - ) / (d))

/* // coulde use ntohl,ntohs,htons,htonl
#if __BYTE_ORDER == __LITTLE_ENDIAN
#define cpu_net_order_swap_16(x) ((uint16_t)((((x) & 0x00ffU) << 8) | (((x) & 0xff00U) >> 8)))
#define cpu_net_order_swap_32(x) \
        ((((x) & 0x000000ffUL) << 24) | \
		(((x) & 0x0000ff00UL) << 8) | \
		(((x) & 0x00ff0000UL) >> 8) | \
		(((x) & 0xff000000UL) >> 24))
#else // big endian
#define cpu_net_order_swap_16(x) (x)
#define cpu_net_order_swap_32(x) (x)
#endif
*/

static inline char*
strnstr(char* dst, unsigned long dlen, const char* src, unsigned long slen)
{
    if (unlikely(0 == slen))
    {
        return dst;
    }

    while (dlen >= slen)
    {
        if (*dst++ == *src)
        {
            if (0 == memcmp(dst, src + 1, slen - 1))
            {
                return --dst;
            }
        }

        --dlen;
    }

    return NULL;
}

static inline char* strnchr(char* dst, unsigned long dlen, char c)
{
    while (0 != dlen--)
    {
        if (*dst++ == c)
        {
            return --dst;
        }
    }
    return NULL;
}

static inline unsigned long long int timeval_to_us(const struct timeval* pt)
{
    return SEC2USEC(pt->tv_sec) + pt->tv_usec;
}

static inline unsigned long long int get_time_us()
{
    struct timeval tv;
    if (unlikely(-1 == gettimeofday(&tv, NULL)))
    {
        return 0;
    }
    
    return timeval_to_us(&tv);
}

// , 
#define STR_TOTAL_THREAD_COUNT "total_thread_count_xxx"

typedef void (*process_string_packet_t)(uint8_t*, uint32_t, unsigned int, unsigned long long, unsigned long long);


#endif //COMMON_H

 
政府の実現には、消費がなければ挿入されず、新生産は廃棄されるという問題がある.それではこのような情况があるかもしれなくて、消费の遅くなって、しかしやはり最新の挿入を后の消费に使うことができることを望んで、私个人は1版を実现して、この机能を持って、バグがあるかもしれなくて、伝言を残してください.
#ifndef RING_BUFFER_H
#define RING_BUFFER_H


#include "stdint.h"
#include "sys/types.h"



typedef struct dmtp_ring_buffer_entry_s
{
 uint8_t	 *data;
 size_t      len;
 size_t      sent;
}dmtp_ring_buffer_entry_t;

typedef struct dmtp_ring_buffer_s
{
 dmtp_ring_buffer_entry_t  *dmtp_ring_buffer_entry_ptr;
 long read_pos;
 long write_pos;
 int slot;
}dmtp_ring_buffer_t;


int dmtp_ring_buffer_init();
int dmtp_ring_buffer_push(uint8_t* data,size_t len);
int dmtp_ring_buffer_read(dmtp_ring_buffer_entry_t** entry);
int dmtp_ring_buffer_pop();

#endif
#include "ring_buffer.h"
#include "macro.h"


dmtp_ring_buffer_t dmtp_ring_buffer;



int dmtp_ring_buffer_init()
{
  dmtp_ring_buffer.read_pos = 0;
  dmtp_ring_buffer.write_pos = 0;
  dmtp_ring_buffer.slot = dmtp_get_rb_slot();
  dmtp_ring_buffer.dmtp_ring_buffer_entry_ptr = malloc(sizeof(dmtp_ring_buffer_entry_t) * (dmtp_ring_buffer.slot) );
  memset(dmtp_ring_buffer.dmtp_ring_buffer_entry_ptr,0,sizeof(dmtp_ring_buffer_entry_t) * (dmtp_ring_buffer.slot) );
  if(NULL == dmtp_ring_buffer.dmtp_ring_buffer_entry_ptr)
  {
	  return RING_BUFFER_INIT_FAILED;
  }
  return RING_BUFFER_OK;
}

int dmtp_ring_buffer_push(uint8_t* data,size_t len)
{
  int config_slot = dmtp_get_rb_slot();
  int temp = dmtp_ring_buffer.write_pos;
  int write_slot = (dmtp_ring_buffer.write_pos)%config_slot;
  //int read_slot = (dmtp_ring_buffer.read_pos)%config_slot;
/*  if( (dmtp_ring_buffer.write_pos != 0) && (write_slot == read_slot) )  
  {
	  return RING_BUFFER_PUSH_FAILED;
  }
  */
  (dmtp_ring_buffer.dmtp_ring_buffer_entry_ptr + write_slot)->data = data;
  (dmtp_ring_buffer.dmtp_ring_buffer_entry_ptr + write_slot)->len = len;
  
  temp++;
  dmtp_ring_buffer.write_pos = temp;
  return RING_BUFFER_OK;
  
}
int dmtp_ring_buffer_read(dmtp_ring_buffer_entry_t** entry)
{
  int config_slot = dmtp_get_rb_slot();
  int read_slot = (dmtp_ring_buffer.read_pos)%config_slot;
  if(dmtp_ring_buffer.read_pos == dmtp_ring_buffer.write_pos) 
  {
	  return RING_BUFFER_READ_NOT_ENOUGH;
  }
  *entry = dmtp_ring_buffer.dmtp_ring_buffer_entry_ptr+read_slot;
  if( ((*entry)->data == NULL) || ((*entry)->len == 0) )
  {
	  return RING_BUFFER_READ_CONTENT_ERROR;
  }
  
  return RING_BUFFER_OK; 
}

int dmtp_ring_buffer_pop()
{
  int config_slot = dmtp_get_rb_slot();
  int ret = RING_BUFFER_OK;
  int temp = dmtp_ring_buffer.read_pos;
  int read_slot = (dmtp_ring_buffer.read_pos)%config_slot;
  uint8_t* data = (dmtp_ring_buffer.dmtp_ring_buffer_entry_ptr+read_slot)->data;
  if(!data)
  {
	  ret=RING_BUFFER_POP_ERROR_DATA_NULL;
  }
  (dmtp_ring_buffer.dmtp_ring_buffer_entry_ptr+read_slot)->len = 0;
  (dmtp_ring_buffer.dmtp_ring_buffer_entry_ptr+read_slot)->sent = 0;
  temp++;
  dmtp_ring_buffer.read_pos = temp;
  return ret; 
}