[トップ][caffe]ソース分析

105671 ワード

http://blog.csdn.net/chenriwei2/article/details/46368707
http://blog.csdn.net/qq_16055159/article/category/310775
http://blog.luoyetx.com/2015/10/reading-caffe-2/
caffe
1) caffe 名前空間の使い方: i. `using namespace caffe;` ii. `class Caffe { static void SetDevice(int i); };` で宣言された静的関数は `Caffe::SetDevice(gpus[0]);` のように呼び出す。 iii. caffe.proto の `package caffe; message Datum {…}` から生成された型は `caffe::Datum datatest;` のように使う。
2) Blob(http://blog.csdn.net/iamzhangzhuping/article/details/50445570) なぜデータのコピー(data copy)が必要な場合があるのでしょうか? "gpu_data() and cpu_data() are used in cases where the data is used only as input and will not be modified by the algorithm. mutable_* is used when the data itself gets updated while running the algorithm." そのうえで、(1) Blob に対する連続した 2 回の操作が同じプロセッサ(CPU/GPU)で行われているか、(2) 直前の操作がデータを更新した可能性があるか、に注目します。"Whenever the data is called, it checks whether the previous statement was a mutable_* function call and that too using the same processor. If it is using the same processor, data need not be copied. If it is a different processor, there is a chance that the data might have been updated in the previous mutable_* call and hence a data copy is required."
3) src/caffe/test 以下のファイルは、各 layer の順伝播(forward)・逆伝播(backward)が正しいかどうかをテストするためのものです。`make test` でビルドし、`make runtest` で実行します。
4) データの格納: parallel.hpp
/// Parameter/gradient buffers used by the parallel-training classes
/// (excerpt of parallel.hpp).
/// The members are protected rather than (implicitly) private because
/// derived classes such as P2PSync re-export them with
/// `using Params<Dtype>::size_;` etc., which needs at least protected access.
template <typename Dtype>
class Params {
 protected:
  const size_t size_;           // Size of buffers
  Dtype* data_;                 // Network parameters
  Dtype* diff_;                 // Gradient
};

/// Multi-GPU synchronizer (excerpt of parallel.hpp). Each instance wraps one
/// solver and, through the Solver callbacks, receives parameters from its
/// parent and sends gradients back up the GPU tree built by run().
template <typename Dtype>
class P2PSync : public GPUParams<Dtype>, public Solver<Dtype>::Callback,
    public InternalThread {
 public:
  // Builds the GPU tree for `gpus`, starts the worker threads and runs the
  // root solver on the calling thread.
  void run(const vector<int>& gpus);
 protected:
  shared_ptr<Solver<Dtype> > solver_;

  // Members of a dependent base class are not found by unqualified lookup
  // inside a class template, so re-export the Params buffers explicitly.
  using Params<Dtype>::size_;
  using Params<Dtype>::data_;
  using Params<Dtype>::diff_;
};
blob.hpp
/// N-dimensional data container (excerpt of blob.hpp). Values and gradients
/// are each held in a SyncedMemory, which mirrors the data between a CPU
/// and a GPU buffer.
template <typename Dtype>
class Blob {
 protected:
  shared_ptr<SyncedMemory> data_;  // values
  shared_ptr<SyncedMemory> diff_;  // gradients
};
layer.hpp
protected:
  /** The vector that stores the learnable parameters as a set of blobs. */
  //         ,                      blob  
  vector<shared_ptr<Blob<Dtype> > > blobs_;
void SetUp(const vector<Blob<Dtype>*>& bottom,
      const vector<Blob<Dtype>*>& top)
net.hpp
  /// @brief the blobs storing intermediate results between the layer.
  /// (Distinct from Layer::blobs_, which holds a layer's learnable parameters.)
  vector<shared_ptr<Blob<Dtype> > > blobs_;
  // Populated during Net::Init by AppendTop, which essentially does:
  /*shared_ptr<Blob<Dtype> > blob_pointer(new Blob<Dtype>()); const int blob_id = blobs_.size(); blobs_.push_back(blob_pointer);*/
caffe.proto(./src/caffe/proto)[google protobuf]
protoc でコンパイルして生成されるファイルは caffe.pb.cc と caffe.pb.h で、/build/include/caffe/proto にあります。
// Datum: one data record; DataReader passes Datum objects between its
// free_/full_ queues (data_reader.cpp). Excerpt — not every field is shown.
message Datum {
  optional int32 channels = 1;
  optional int32 height = 2;  
  // (field numbers 3 and 4 are not shown in this excerpt)
  optional int32 label = 5;   
  repeated float float_data = 6;  
  optional bool encoded = 7 [default = false];
  }
メッセージで定義されている各フィールドには一意の番号(タグ)が付いています。この番号はバイナリ形式の中でフィールドを識別するためのもので、メッセージ型を使い始めたら変更してはいけません。強調しておくべき点として、1〜15 の番号はエンコード後 1 バイト(番号とフィールド型を含む)しか占めませんが、16〜2047 の番号は 2 バイトを占めます。したがって、1〜15 の番号は最も頻繁に出現する要素に使うべきです。また、将来頻繁に出現する要素が追加される可能性を考えて、1〜15 の番号をすべて使い切らずに残しておくとよいでしょう。
関連する関数(protoc が生成するアクセサ)
// Accessors that protoc generates for the Datum message (C++ generated code).
// Scalar `optional` fields get set_/has_/clear_/getter; there is no
// mutable_ accessor for scalars — mutable_ exists for repeated, string and
// message fields only.
caffe::Datum testdata;
testdata.set_channels(3);        // setter (the value 3 is just an example)
testdata.has_channels();         // true once the field has been set
testdata.clear_channels();       // resets the value and its "has" bit
testdata.channels();             // getter
testdata.float_data_size();      // number of elements in the repeated field
testdata.mutable_float_data();   // writable access to the repeated field
メイン関数
./tools/caffe.cpp
// Command-line flags of the `caffe` tool, defined with gflags
// (gflags usage: http://dreamrunner.org/blog/2014/03/09/gflags-jian-ming-shi-yong/).
// Each DEFINE_xxx(name, default, help) creates a global FLAGS_name
// (e.g. FLAGS_snapshot, read by train()).
DEFINE_string(gpu, "",
    "Optional; run in GPU mode on given device IDs separated by ','. "
    "Use '-gpu all' to run on all available GPUs. The effective training "
    "batch size is multiplied by the number of devices.");
DEFINE_string(solver, "",
    "The solver definition protocol buffer text file.");
DEFINE_string(model, "",
    "The model definition protocol buffer text file.");
DEFINE_string(snapshot, "",
    "Optional; the snapshot solver state to resume training.");
DEFINE_string(weights, "",
    "Optional; the pretrained weights to initialize finetuning, "
    "separated by ','. Cannot be set simultaneously with snapshot.");
DEFINE_int32(iterations, 50,
    "The number of iterations to run.");
DEFINE_string(sigint_effect, "stop",
    "Optional; action to take when a SIGINT signal is received: "
    "snapshot, stop or none.");
DEFINE_string(sighup_effect, "snapshot",
    "Optional; action to take when a SIGHUP signal is received: "
    "snapshot, stop or none.");
int main(int argc, char** argv) {
  // Print output to stderr (while still logging).
  FLAGS_alsologtostderr = 1;
  // Usage message.
  gflags::SetUsageMessage("command line brew
"
"usage: caffe <command> <args>

"
"commands:
"
" train train or finetune a model
"
" test score a model
"
" device_query show GPU diagnostic information
"
" time benchmark model execution time"); // Run tool or show usage. caffe::GlobalInit(&argc, &argv);// common.cpp if (argc == 2) { return GetBrewFunction(caffe::string(argv[1]))(); } else { gflags::ShowUsageWithFlagsRestrict(argv[0], "tools/caffe"); } }
// `caffe train`: creates the solver described by solver_param and runs it,
// either directly (one device) or through P2PSync (several GPUs).
// NOTE(review): excerpt — `gpus` and `solver_param` are set up in lines
// elided from this listing.
int train() {

  // Record how many solvers will run in parallel (one per entry in `gpus`).
  Caffe::set_solver_count(gpus.size());

  // Create the solver through the solver registry; the concrete solver type
  // is selected by solver_param (presumably SGD by default — TODO confirm).
  // It is held by a (boost) shared_ptr because P2PSync shares it below.
  shared_ptr<caffe::Solver<float> > solver(caffe::SolverRegistry<float>::CreateSolver(solver_param));
  // Solver constructors declared in solver.hpp:
  //explicit Solver(const SolverParameter& param, const Solver* root_solver = NULL);
  //explicit Solver(const string& param_file, const Solver* root_solver = NULL);


  // Resume from a solver snapshot, or else initialize from pretrained
  // weights (fine-tuning) when -weights is given.
  if (FLAGS_snapshot.size()) {
  LOG(INFO) << "Resuming from " << FLAGS_snapshot;
  solver->Restore(FLAGS_snapshot.c_str());
  } else if (FLAGS_weights.size()) {
      CopyLayers(solver.get(), FLAGS_weights);
  }
  // Several GPUs: this `sync` becomes the root of the GPU tree and run()
  // drives all solvers. Otherwise run the single solver on this thread.
  if (gpus.size() > 1) {
     caffe::P2PSync<float> sync(solver, NULL, solver->param());
     sync.run(gpus);
  } else {
    LOG(INFO) << "Starting Optimization";
    solver->Solve();
  }
  LOG(INFO) << "Optimization Done.";
  return 0;

}
Update
Step()関数では、反復ごとにApplyUpdate()(class SGDSolver)->Update()(class net)を呼び出します。
template <typename Dtype>
void Net<Dtype>::Update() {
  // Let every learnable parameter blob apply its own update (Blob::Update).
  // Called by the solver's ApplyUpdate() once per iteration.
  int param_index = 0;
  while (param_index < learnable_params_.size()) {
    learnable_params_[param_index]->Update();
    ++param_index;
  }
}
->Update()(class blob)
solver.cpp
./src/caffe/solver.cpp
// Solver initialization (excerpt: the template header and the rest of the
// body are elided in this listing).
void Solver<Dtype>::Init(const SolverParameter& param) {  
// Only the root solver may exist without root_solver_; every worker
// (non-root) solver must be given a pointer to the root solver.
CHECK(Caffe::root_solver() || root_solver_)      
<< "root_solver_ needs to be set for all non-root solvers";...}

//Caffe::root_solver()  http://blog.csdn.net/apsvvfb/article/details/50542863

//root_solver() { return Get().root_solver_; } 
//->Get(){ thread_instance_.reset(new Caffe())}

// Creates the training net (excerpt: net_param is built in elided lines).
// The root solver builds a standalone Net; a worker solver also passes the
// root solver's net so Net::Init can share layers from it.
void Solver<Dtype>::InitTrainNet() {
  if (Caffe::root_solver()) {  
  net_.reset(new Net<Dtype>(net_param));
  //net.hpp:Net(const NetParameter& param, const Net* root_net = NULL); 
// shared_ptr<Net<Dtype> >net_;
// shared_ptr::reset(p) releases the previously owned object (deleting it if
// this was its last owner) and takes ownership of p.
  } else {    
  net_.reset(new Net<Dtype>(net_param, root_solver_->net_.get()));
  }  
}
// Runs `iters` training iterations. Each iteration:
//   clear the parameter diffs,
//   optionally run the test nets,
//   fire the on_start() callbacks,
//   run iter_size forward/backward passes (accumulating loss and gradients),
//   update the smoothed loss,
//   fire the on_gradients_ready() callbacks,
//   apply the parameter update.
// NOTE(review): `display` is only used for debug_info here; the logging that
// normally uses it appears to be elided from this excerpt.
template <typename Dtype>
void Solver<Dtype>::Step(int iters) {
  vector<Blob<Dtype>*> bottom_vec;
  const int start_iter = iter_;
  const int stop_iter = iter_ + iters;
  int average_loss = this->param_.average_loss();
  vector<Dtype> losses;
  Dtype smoothed_loss = 0;

  while (iter_ < stop_iter) {
    // zero-init the params (diffs are cleared once per iteration, so the
    // iter_size passes below accumulate into them)
    net_->ClearParamDiffs();
    // Test every test_interval iterations (iteration 0 only if
    // test_initialization is set); only the root solver runs the tests.
    if (param_.test_interval() && iter_ % param_.test_interval() == 0
        && (iter_ > 0 || param_.test_initialization())
        && Caffe::root_solver()) {
      TestAll();
      if (requested_early_exit_) {
        // Break out of the while loop because stop was requested while testing.
        break;
      }
    }

    // In multi-GPU training the callback is P2PSync::on_start(), which waits
    // for the parent's parameters and forwards them to the children.
    for (int i = 0; i < callbacks_.size(); ++i) {
      callbacks_[i]->on_start();
    }
    const bool display = param_.display() && iter_ % param_.display() == 0;
    net_->set_debug_info(display && param_.debug_info());
    // accumulate the loss and gradient
    Dtype loss = 0;
    for (int i = 0; i < param_.iter_size(); ++i) {
      loss += net_->ForwardBackward(bottom_vec);
      // bottom_vec is an empty vector, so Net::Forward copies no external
      // inputs into the net.
      /* net.hpp:
         Dtype ForwardBackward(const vector<Blob<Dtype>* > & bottom) {
           Dtype loss;
           Forward(bottom, &loss);  // defined in net.cpp
           Backward();
           return loss;
         } */
    }
    loss /= param_.iter_size();
    // average the loss across iterations for smoothed reporting:
    // while fewer than average_loss values are stored, keep a running mean;
    // afterwards `losses` is a ring buffer whose oldest entry is replaced.
    // NOTE(review): the modulo below assumes average_loss > 0.
    if (losses.size() < average_loss) {
      losses.push_back(loss);
      int size = losses.size();
      smoothed_loss = (smoothed_loss * (size - 1) + loss) / size;
    } else {
      int idx = (iter_ - start_iter) % average_loss;
      smoothed_loss += (loss - losses[idx]) / average_loss;
      losses[idx] = loss;
    }
    // In multi-GPU training the callback is P2PSync::on_gradients_ready(),
    // which sums gradients up the GPU tree before the update is applied.
    for (int i = 0; i < callbacks_.size(); ++i) {
      callbacks_[i]->on_gradients_ready();
    }
    ApplyUpdate();

    // Increment the internal iter_ counter -- its value should always indicate
    // the number of times the weights have been updated.
    ++iter_;
  }
}
4. parallel.hpp
以下では GPU が 4 つ(0, 1, 2, 3)ある場合を例に説明します。
  // Params members: data_ and diff_ each hold size_ Dtype elements and are
  // copied/added as single contiguous buffers in P2PSync.
  const size_t size_;           // Size of buffers
  Dtype* data_;                 // Network parameters
  Dtype* diff_;                 // Gradient
// Creates the synchronizer for device param.device_id().
// The root (parent == NULL) reuses root_solver. Every other node constructs
// a WorkerSolver while temporarily marking the thread as non-root. A child
// also enables peer access to its parent's GPU when possible and allocates
// parent_grads_ on the parent's device; its gradients are copied into that
// buffer in on_gradients_ready().
template<typename Dtype>
P2PSync<Dtype>::P2PSync(shared_ptr<Solver<Dtype> > root_solver,
                        P2PSync<Dtype>* parent, const SolverParameter& param)
    : GPUParams<Dtype>(root_solver, param.device_id()),
      parent_(parent),
      children_(),
      queue_(),
      initial_iter_(root_solver->iter()),
      solver_() {
#ifndef CPU_ONLY
  // Remember the current device so it can be restored at the end.
  int initial_device;
  CUDA_CHECK(cudaGetDevice(&initial_device));
  const int self = param.device_id();
  CUDA_CHECK(cudaSetDevice(self));

  if (parent == NULL) {
    solver_ = root_solver;
  } else {
    // Construct the worker as a non-root solver (cf. the root_solver_ CHECK
    // in Solver::Init), then restore the root flag for this thread.
    Caffe::set_root_solver(false);
    solver_.reset(new WorkerSolver<Dtype>(param, root_solver.get()));
    Caffe::set_root_solver(true);
  }
  this->configure(solver_.get());
  // Register on_start()/on_gradients_ready() as callbacks of this solver.
  solver_->add_callback(this);

  if (parent) {
    // Enable p2p access between devices
    const int peer = parent->solver_->param().device_id();
    int access;
    CUDA_CHECK(cudaDeviceCanAccessPeer(&access, self, peer));
    if (access) {
      CUDA_CHECK(cudaDeviceEnablePeerAccess(peer, 0));
    } else {
      LOG(INFO)<< "GPU " << self << " does not have p2p access to GPU " << peer;
    }
    // Allocate receiving buffer on parent
    CUDA_CHECK(cudaSetDevice(peer));
    CUDA_CHECK(cudaMalloc(&parent_grads_, size_ * sizeof(Dtype)));
    CUDA_CHECK(cudaSetDevice(self));
  }

  CUDA_CHECK(cudaSetDevice(initial_device));
#else
  NO_GPU;
#endif
}
// Solver callback run at the start of every iteration (from Solver::Step):
// a non-root node first waits for its parent's fresh parameters; every node
// then copies its parameters to each child and signals that child.
template<typename Dtype>
void P2PSync<Dtype>::on_start() {
#ifndef CPU_ONLY
#ifdef DEBUG
  int device;
  CUDA_CHECK(cudaGetDevice(&device));
  CHECK(device == solver_->param().device_id());
#else
// CHECK(false);
#endif

  // Wait for update from parent
  if (parent_) {
    P2PSync<Dtype> *parent = queue_.pop();
    // queue_.pop() blocks while the queue is empty (see blocking_queue.cpp).
    //
    // Worker GPUs (1, 2, 3 in the 4-GPU example) run in their own threads:
    //   syncs[i]->StartInternalThread()
    //   -> void P2PSync<Dtype>::InternalThreadEntry()
    //   -> solver_->Step(solver_->param().max_iter() - initial_iter_);
    // When a worker reaches on_start() its queue_ is still empty, so it
    // blocks here.
    //
    // The root (GPU0) runs on the calling thread; in P2PSync<Dtype>::run:
    //   for (int i = 1; i < syncs.size(); ++i) {
    //     syncs[i]->StartInternalThread();
    //   }
    //   solver_->Solve();
    // The root has no parent and skips this wait. It copies its data to each
    // child and calls children_[i]->queue_.push(this), waking its direct
    // children (GPU1 and GPU2 in the example tree). GPU3's parent is GPU2,
    // so GPU3 wakes up only after GPU2 runs the loop below and pushes
    // itself onto GPU3's queue_.

    CHECK(parent == parent_);
  }

  // Update children
  for (int i = children_.size() - 1; i >= 0; i--) {
    Dtype* src = data_;
    Dtype* dst = children_[i]->data_;

    // Copy this node's parameters to the child (device to device).
    CUDA_CHECK(cudaMemcpyAsync(dst, src, size_ * sizeof(Dtype),
        cudaMemcpyDeviceToDevice, cudaStreamDefault));
    // Wait for the copy to finish before telling the child its data is ready.
    CUDA_CHECK(cudaStreamSynchronize(cudaStreamDefault));
    children_[i]->queue_.push(this);
  }
#endif
}
// Solver callback run after the forward/backward passes (from Solver::Step).
// Waits for every child to deliver its gradients (in whatever order they
// arrive) and adds them into this node's diff_. A non-root node then sends
// the sum to its parent; the root rescales it by 1 / solver_count.
template<typename Dtype>
void P2PSync<Dtype>::on_gradients_ready() {
#ifndef CPU_ONLY
#ifdef DEBUG
  int device;
  CUDA_CHECK(cudaGetDevice(&device));
  CHECK(device == solver_->param().device_id());
#endif

  // Sum children gradients as they appear in the queue
  for (int i = 0; i < children_.size(); ++i) {
    P2PSync<Dtype> *child = queue_.pop();
    // The child's parent_grads_ buffer was allocated on this (the parent's)
    // device in the child's constructor.
    Dtype* src = child->parent_grads_;
    Dtype* dst = diff_;

#ifdef DEBUG
    bool ok = false;
    for (int j = 0; j < children_.size(); ++j) {
      if (child == children_[j]) {
        ok = true;
      }
    }
    CHECK(ok);
    cudaPointerAttributes attributes;
    CUDA_CHECK(cudaPointerGetAttributes(&attributes, src));
    CHECK(attributes.device == device);
    CUDA_CHECK(cudaPointerGetAttributes(&attributes, dst));
    CHECK(attributes.device == device);
#endif

    caffe_gpu_add(size_, src, dst, dst);//dst=dst+src

  }

  // Send gradients to parent
  if (parent_) {
    Dtype* src = diff_;
    Dtype* dst = parent_grads_;

    CUDA_CHECK(cudaMemcpyAsync(dst, src, size_ * sizeof(Dtype),  //
        cudaMemcpyDeviceToDevice, cudaStreamDefault));
    // Finish the copy before notifying the parent that it may read it.
    CUDA_CHECK(cudaStreamSynchronize(cudaStreamDefault));
    parent_->queue_.push(this);
  } else {
    // Loss functions divide gradients by the batch size, so to compensate
    // for split batch, the root solver divides by number of solvers.
    caffe_gpu_scal(size_, Dtype(1.0 / Caffe::solver_count()), diff_);
  }
#endif
}
// Builds a tree of P2PSync nodes over `gpus`, with this object as the root.
// Starts one worker thread per non-root GPU, runs the root solver on the
// calling thread, then stops the workers.
template<typename Dtype>
void P2PSync<Dtype>::run(const vector<int>& gpus) {
  // Pair devices for map-reduce synchronization
  vector<DevicePair> pairs;//DevicePair[parent,device]
  DevicePair::compute(gpus, &pairs);
  // Example with 4 GPUs (0, 1, 2, 3): compute() returns the [parent, device]
  // pairs [-1,0; 0,1; 2,3; 0,2]. [-1,0] marks device 0 as the root (it has
  // no parent); the log below prints the remaining pairs: 0:1, 2:3, 0:2.
  ostringstream s;
  for (int i = 1; i < pairs.size(); ++i) {
    s << (i == 1 ? "" : ", ") << pairs[i].parent() << ":" << pairs[i].device();
  }
  LOG(INFO)<< "GPUs pairs " << s.str();

  SolverParameter param(solver_->param());
  vector<shared_ptr<P2PSync<Dtype> > > syncs(gpus.size());

  // Build the GPU tree by finding the parent for each solver
  // A node can only be created once its parent's sync exists, so the tree
  // is built over several passes (up to pairs.size()).
  for (int attempts = 0; attempts < pairs.size(); ++attempts) {
    for (int i = 1; i < pairs.size(); ++i) {
      if (!syncs[i].get()) {// Not created yet (see [1]): syncs[i].get() is NULL.
        P2PSync<Dtype>* parent = NULL;
        for (int j = 0; j < syncs.size(); ++j) {
          P2PSync<Dtype>* sync = j == 0 ? this : syncs[j].get();
          // j == 0 stands for `this`: run() is called from train() in
          // caffe.cpp on the root sync created there:
          //caffe::P2PSync<float> sync(solver, NULL, solver->param());
          if (sync) {
            const SolverParameter& p = sync->solver()->param();
            if (p.device_id() == pairs[i].parent()) {
              parent = sync;
            }
          }
        }
        if (parent) {
          param.set_device_id(pairs[i].device());
          syncs[i].reset(new P2PSync<Dtype>(solver_, parent, param));// [1]
          parent->children_.push_back((P2PSync<Dtype>*) syncs[i].get());
        }
      }
    }
  }

// Resulting tree for the 4-GPU example:
//syncs[1]: GPU1, its parent is "this" (GPU0)
//syncs[3]: GPU2, its parent is "this"; its child is syncs[2]
//syncs[2]: GPU3, its parent is syncs[3] (GPU2); it is created on the second
//          pass, once GPU2's sync exists
//syncs[0] is never set; `this` (the root_solver, GPU0) plays that role.

  LOG(INFO)<< "Starting Optimization";

  for (int i = 1; i < syncs.size(); ++i) {
    syncs[i]->StartInternalThread();
    // Starts the InternalThread's worker thread, which runs
    // P2PSync<Dtype>::InternalThreadEntry() (parallel.cpp), i.e.
    //    solver_->Step(solver_->param().max_iter() - initial_iter_);
  }

  // Run root solver on current thread
  solver_->Solve();

  for (int i = 1; i < syncs.size(); ++i) {
    syncs[i]->StopInternalThread();
  }
}
5.blocking_queue.cpp
// Synchronization state of a BlockingQueue: the mutex that guards queue_ and
// the condition variable that push() signals to wake a thread blocked in pop().
template<typename T>
class BlockingQueue<T>::sync {
 public:
  // mutable, presumably so const member functions can also lock it.
  mutable boost::mutex mutex_;
  boost::condition_variable condition_;
};

// Constructs an empty queue; the mutex/condition pair lives in a separately
// allocated sync object.
template<typename T>
BlockingQueue<T>::BlockingQueue()
    : sync_(new sync()) {
}

template<typename T>
void BlockingQueue<T>::push(const T& t) {
  // Enqueue under the lock, release it, then wake one waiting consumer.
  // Signalling after unlocking lets the woken thread take the mutex without
  // immediately blocking on it again.
  {
    boost::mutex::scoped_lock guard(sync_->mutex_);
    queue_.push(t);
  }
  sync_->condition_.notify_one();
}


template<typename T>
T BlockingQueue<T>::pop(const string& log_on_wait) {
  boost::mutex::scoped_lock guard(sync_->mutex_);

  // Sleep until a producer has pushed something. Re-checking after every
  // wake-up also covers spurious wake-ups of the condition variable.
  for (;;) {
    if (!queue_.empty()) {
      break;
    }
    if (!log_on_wait.empty()) {
      LOG_EVERY_N(INFO, 1000)<< log_on_wait;
    }
    sync_->condition_.wait(guard);
  }

  // The lock is still held here, so taking the front element is safe.
  T front_item = queue_.front();
  queue_.pop();
  return front_item;
}
6.net
net.hpp
// Net constructors and data members (excerpt of net.hpp).
// root_net is non-NULL for worker nets in multi-GPU training.
explicit Net(const NetParameter& param, const Net* root_net = NULL);
explicit Net(const string& param_file, Phase phase,
      const Net* root_net = NULL);
  /// @brief The network name
  string name_;
  /// @brief The phase: TRAIN or TEST
  Phase phase_;
  /// @brief Individual layers in the net
  vector<shared_ptr<Layer<Dtype> > > layers_;
  vector<string> layer_names_;
  map<string, int> layer_names_index_;
  vector<bool> layer_need_backward_;
  /// @brief the blobs storing intermediate results between the layer.
  vector<shared_ptr<Blob<Dtype> > > blobs_;
  vector<string> blob_names_;
  map<string, int> blob_names_index_;
  vector<bool> blob_need_backward_;
  /// bottom_vecs stores the vectors containing the input for each layer.
  /// They don't actually host the blobs (blobs_ does), so we simply store
  /// pointers.
  vector<vector<Blob<Dtype>*> > bottom_vecs_;
  vector<vector<int> > bottom_id_vecs_;
  vector<vector<bool> > bottom_need_backward_;
  /// top_vecs stores the vectors containing the output for each layer
  vector<vector<Blob<Dtype>*> > top_vecs_;
  vector<vector<int> > top_id_vecs_;
  /// Vector of weight in the loss (or objective) function of each net blob,
  /// indexed by blob_id.
  vector<Dtype> blob_loss_weights_;
  vector<vector<int> > param_id_vecs_;
  vector<int> param_owners_;
  vector<string> param_display_names_;
  vector<pair<int, int> > param_layer_indices_;
  map<string, int> param_names_index_;
  /// blob indices for the input and the output of the net
  vector<int> net_input_blob_indices_;// presumably the tops created by AppendTop for the layer_id == -1 inputs in Init — TODO confirm
  vector<int> net_output_blob_indices_;
  /* How net_output_blob_indices_ is obtained during Net::Init:
       AppendTop(layer_id = -1)      // available_blobs->insert(blob_name)
       for each layer:
         AppendBottom();             // available_blobs->erase(blob_name)
         AppendTop();                // available_blobs->insert(blob_name)
     The blobs still left in available_blobs at the end (produced but never
     consumed) become net_output_blob_indices_. */
  vector<Blob<Dtype>*> net_input_blobs_;
  vector<Blob<Dtype>*> net_output_blobs_;
  /// The parameters in the network.
  vector<shared_ptr<Blob<Dtype> > > params_;
  /// The blobs updated by Net::Update().
  vector<Blob<Dtype>*> learnable_params_;
  /**
   * The mapping from params_ -> learnable_params_: we have
   * learnable_param_ids_.size() == params_.size(),
   * and learnable_params_[learnable_param_ids_[i]] == params_[i].get()
   * if and only if params_[i] is an "owner"; otherwise, params_[i] is a sharer
   * and learnable_params_[learnable_param_ids_[i]] gives its owner.
   */
  vector<int> learnable_param_ids_;
  /// the learning rate multipliers for learnable_params_
  vector<float> params_lr_;
  vector<bool> has_params_lr_;
  /// the weight decay multipliers for learnable_params_
  vector<float> params_weight_decay_;
  vector<bool> has_params_decay_;
  /// Memory used by this net's top blobs, counted in Dtype elements: Init adds
  /// each top blob's count() and logs memory_used_ * sizeof(Dtype) as bytes.
  size_t memory_used_;
  /// Whether to compute and display debug info for the net.
  bool debug_info_;
  /// The root net that actually holds the shared layers in data parallelism
  const Net* const root_net_;
net.cpp
// Builds the net from `param`. root_net is non-NULL for worker nets in
// multi-GPU training; Init() can then share layers from it.
template <typename Dtype>
Net<Dtype>::Net(const NetParameter& param, const Net* root_net)
    : root_net_(root_net) {
  Init(param);
}

// Builds the net: appends the net inputs, then for every layer creates (or
// shares from the root net) the layer object, wires its bottom/top blobs,
// runs SetUp, records loss weights and memory use, registers its parameters
// and computes whether it needs backward.
// Excerpt: `param`, `available_blobs` and `blob_name_to_idx` (and, inside the
// loop, `share_from_root`, `layer_param` and `need_backward`) are declared in
// lines elided from this listing.
template <typename Dtype>
void Net<Dtype>::Init(const NetParameter& in_param) {

  // set the input blobs
  for (int input_id = 0; input_id < param.input_size(); ++input_id) {
    const int layer_id = -1;  // inputs have fake layer ID -1
    AppendTop(param, layer_id, input_id, &available_blobs, &blob_name_to_idx);
  }

  for (int layer_id = 0; layer_id < param.layer_size(); ++layer_id) {

    if (share_from_root) {
      LOG(INFO) << "Sharing layer " << layer_param.name() << " from root net";
      layers_.push_back(root_net_->layers_[layer_id]);
      layers_[layer_id]->SetShared(true);
    } else {
      layers_.push_back(LayerRegistry<Dtype>::CreateLayer(layer_param));
      // CreateLayer is where the concrete layer object is constructed;
      // see layer_factory.hpp.
    }
    // Figure out this layer's input and output
    for (int bottom_id = 0; bottom_id < layer_param.bottom_size();
         ++bottom_id) {
      const int blob_id = AppendBottom(param, layer_id, bottom_id,
                                       &available_blobs, &blob_name_to_idx);
      // If a blob needs backward, this layer should provide it.
      need_backward |= blob_need_backward_[blob_id];
    }
    int num_top = layer_param.top_size();
    for (int top_id = 0; top_id < num_top; ++top_id) {
      AppendTop(param, layer_id, top_id, &available_blobs, &blob_name_to_idx);
    }
    // If the layer specifies that AutoTopBlobs() -> true and the LayerParameter
    // specified fewer than the required number (as specified by
    // ExactNumTopBlobs() or MinTopBlobs()), allocate them here.
    Layer<Dtype>* layer = layers_[layer_id].get();
    if (layer->AutoTopBlobs()) {
      const int needed_num_top =
          std::max(layer->MinTopBlobs(), layer->ExactNumTopBlobs());
      for (; num_top < needed_num_top; ++num_top) {
        // Add "anonymous" top blobs -- do not modify available_blobs or
        // blob_name_to_idx as we don't want these blobs to be usable as input
        // to other layers.
        AppendTop(param, layer_id, num_top, NULL, NULL);
      }
    }
    // After this layer is connected, set it up.
    if (share_from_root) {
    // .....
    } else {
      layers_[layer_id]->SetUp(bottom_vecs_[layer_id], top_vecs_[layer_id]);
      // Layer::SetUp is defined in layer.hpp.
    }
    LOG_IF(INFO, Caffe::root_solver())
        << "Setting up " << layer_names_[layer_id];
    for (int top_id = 0; top_id < top_vecs_[layer_id].size(); ++top_id) {
      if (blob_loss_weights_.size() <= top_id_vecs_[layer_id][top_id]) {
        blob_loss_weights_.resize(top_id_vecs_[layer_id][top_id] + 1, Dtype(0));
      }
      blob_loss_weights_[top_id_vecs_[layer_id][top_id]] = layer->loss(top_id);
      LOG_IF(INFO, Caffe::root_solver())
          << "Top shape: " << top_vecs_[layer_id][top_id]->shape_string();
      if (layer->loss(top_id)) {
        LOG_IF(INFO, Caffe::root_solver())
            << " with loss weight " << layer->loss(top_id);
      }
      // Counted in elements; converted to bytes only for the log below.
      memory_used_ += top_vecs_[layer_id][top_id]->count();
    }
    LOG_IF(INFO, Caffe::root_solver())
        << "Memory required for data: " << memory_used_ * sizeof(Dtype);
    const int param_size = layer_param.param_size();
    const int num_param_blobs = layers_[layer_id]->blobs().size();
    CHECK_LE(param_size, num_param_blobs)
        << "Too many params specified for layer " << layer_param.name();
    ParamSpec default_param_spec;
    for (int param_id = 0; param_id < num_param_blobs; ++param_id) {
      const ParamSpec* param_spec = (param_id < param_size) ?
          &layer_param.param(param_id) : &default_param_spec;
      // A parameter with lr_mult == 0 is frozen and needs no gradient.
      const bool param_need_backward = param_spec->lr_mult() != 0;
      need_backward |= param_need_backward;
      layers_[layer_id]->set_param_propagate_down(param_id,
                                                  param_need_backward);
    }
    for (int param_id = 0; param_id < num_param_blobs; ++param_id) {
      AppendParam(param, layer_id, param_id);
    }
    // Finally, set the backward flag
    layer_need_backward_.push_back(need_backward);
    if (need_backward) {
      for (int top_id = 0; top_id < top_id_vecs_[layer_id].size(); ++top_id) {
        blob_need_backward_[top_id_vecs_[layer_id][top_id]] = true;
      }
    }
  }// end of for(int layer_id=0;....

  /*........................... (remainder of Init elided) ...........*/
}
// Copies each blob in `bottom` into the corresponding net input blob, then
// delegates to ForwardPrefilled(loss) and returns its result.
template <typename Dtype>
const vector<Blob<Dtype>*>& Net<Dtype>::Forward(
    const vector<Blob<Dtype>*> & bottom, Dtype* loss) {
  // When called from Solver::Step (via ForwardBackward), bottom is an empty
  // vector, so nothing is copied here.
  // Copy bottom to internal bottom
  for (int i = 0; i < bottom.size(); ++i) {
    net_input_blobs_[i]->CopyFrom(*bottom[i]);
  }
  return ForwardPrefilled(loss);
}

// Runs Layer::Forward on layers start..end (inclusive), in order, and returns
// the sum of their losses. Presumably invoked by ForwardPrefilled() as
// ForwardFromTo(0, layers_.size() - 1).
template <typename Dtype>
Dtype Net<Dtype>::ForwardFromTo(int start, int end) {
  Dtype total_loss = 0;
  for (int layer_idx = start; layer_idx <= end; ++layer_idx) {
    // LOG(ERROR) << "Forwarding " << layer_names_[layer_idx];
    const Dtype layer_loss =
        layers_[layer_idx]->Forward(bottom_vecs_[layer_idx], top_vecs_[layer_idx]);
    total_loss += layer_loss;
    if (debug_info_) {
      ForwardDebugInfo(layer_idx);
    }
  }
  return total_loss;
}
7.layer_factory.hpp
namespace caffe {
/// Registry mapping layer type names (LayerParameter::type()) to creator
/// functions. Net::Init creates each non-shared layer via CreateLayer().
template <typename Dtype>
class LayerRegistry {
 public:
  // Creator: pointer to a function that takes a const LayerParameter& and
  // returns a shared_ptr<Layer<Dtype> >.
  // (typedef syntax: http://www.kuqin.com/language/20090322/41866.html)
  typedef shared_ptr<Layer<Dtype> > (*Creator)(const LayerParameter&);
  typedef std::map<string, Creator> CreatorRegistry;

  // The map is allocated on first use and intentionally never deleted,
  // presumably so the static LayerRegisterer objects can use it regardless
  // of static initialization/destruction order.
  static CreatorRegistry& Registry() {
    static CreatorRegistry* g_registry_ = new CreatorRegistry();
    return *g_registry_;
  }

  // Adds a creator. Registering the same type twice fails the CHECK_EQ.
  static void AddCreator(const string& type, Creator creator) {
    CreatorRegistry& registry = Registry();
    CHECK_EQ(registry.count(type), 0)
        << "Layer type " << type << " already registered.";
    registry[type] = creator;
  }

  // Get a layer using a LayerParameter.
  static shared_ptr<Layer<Dtype> > CreateLayer(const LayerParameter& param) {
    if (Caffe::root_solver()) {
      LOG(INFO) << "Creating layer " << param.name();
    }
    const string& type = param.type();
    CreatorRegistry& registry = Registry();
    CHECK_EQ(registry.count(type), 1) << "Unknown layer type: " << type
        << " (known types: " << LayerTypeListString() << ")";
    // The registered creator news the concrete layer class
    // (DataLayer, ConvolutionLayer, ...).
    return registry[type](param);
  }
};

/// Registers `creator` under `type` as a side effect of its construction.
template <typename Dtype>
class LayerRegisterer {
 public:
  LayerRegisterer(const string& type,
                  shared_ptr<Layer<Dtype> > (*creator)(const LayerParameter&)) {
    LayerRegistry<Dtype>::AddCreator(type, creator);
  }
};

// Defines two namespace-scope static LayerRegisterer objects (float and
// double) whose constructors register `creator` under the name #type.
// Every continuation backslash in these macros must be the LAST character on
// its line: before C++23, "backslash + whitespace + newline" is not a line
// splice in standard C++ (GCC/Clang only accept it with a warning).
#define REGISTER_LAYER_CREATOR(type, creator) \
  static LayerRegisterer<float> g_creator_f_##type(#type, creator<float>);\
  static LayerRegisterer<double> g_creator_d_##type(#type, creator<double>)\

// Defines Creator_<type>Layer, which news a <type>Layer<Dtype>, and
// registers it for float and double.
#define REGISTER_LAYER_CLASS(type)\
  template <typename Dtype>\
  shared_ptr<Layer<Dtype> > Creator_##type##Layer(const LayerParameter& param)\
  { \
    return shared_ptr<Layer<Dtype> >(new type##Layer<Dtype>(param));\
  }\
  REGISTER_LAYER_CREATOR(type, Creator_##type##Layer)
  // How registration works:
  //   REGISTER_LAYER_CLASS -> REGISTER_LAYER_CREATOR
  //   -> defines static LayerRegisterer objects
  //   -> their constructors call AddCreator -> registry[type] = creator
  //   Later, CreateLayer looks the creator up: return registry[type](param);
  // Example: data_layer.cpp contains REGISTER_LAYER_CLASS(Data); inside
  // namespace caffe. g_creator_f_Data / g_creator_d_Data are namespace-scope
  // static objects, so they are constructed during program start-up
  // (typically before main), and that construction is what registers "Data".
  // NOTE(review): if Caffe is linked as a static library, the linker may drop
  // object files whose symbols are never referenced, skipping these
  // registrations — confirm the build links the whole archive.

}  // namespace caffe
8.layer.hpp
// Base class of all layers (excerpt of layer.hpp; the template header and the
// rest of the class are elided in this listing).
class Layer {
 public:
  // Copies the layer parameters and the phase; if the LayerParameter carries
  // serialized blobs, restores them into blobs_ via Blob::FromProto.
  explicit Layer(const LayerParameter& param)
    : layer_param_(param), is_shared_(false) {
      // Set phase and copy blobs (if there are any).
      phase_ = param.phase();
      if (layer_param_.blobs_size() > 0) {
        blobs_.resize(layer_param_.blobs_size());
        for (int i = 0; i < layer_param_.blobs_size(); ++i) {
          blobs_[i].reset(new Blob<Dtype>());
          blobs_[i]->FromProto(layer_param_.blobs(i));
        }
      }
    }

  // Common setup entry point; Net::Init calls it for every non-shared layer.
  void SetUp(const vector<Blob<Dtype>*>& bottom,
      const vector<Blob<Dtype>*>& top) {
    InitMutex();
    CheckBlobCounts(bottom, top);
    LayerSetUp(bottom, top);// layer-specific setup, presumably overridden by each concrete layer class
    Reshape(bottom, top);
    SetLossWeights(top);
  }
9.blob
/// Blob accessors (excerpt of blob.hpp). The const cpu_/gpu_ accessors are
/// for read-only use; the mutable_* variants hand out a writable pointer
/// (for the CPU side, SyncedMemory::mutable_cpu_data then treats the CPU
/// copy as the current one).
template <typename Dtype>
class Blob {
 public:
  const Dtype* cpu_data() const;
  void set_cpu_data(Dtype* data);
  const Dtype* gpu_data() const;
  const Dtype* cpu_diff() const;
  const Dtype* gpu_diff() const;
  Dtype* mutable_cpu_data();
  Dtype* mutable_gpu_data();
  Dtype* mutable_cpu_diff();
  Dtype* mutable_gpu_diff();


 protected:
  shared_ptr<SyncedMemory> data_;  // values
  shared_ptr<SyncedMemory> diff_;  // gradients
};

Dtype* Blob<Dtype>::mutable_cpu_data() {
  CHECK(data_);
  return static_cast<Dtype*>(data_->mutable_cpu_data());
}
// Memory block mirrored between a host buffer (cpu_ptr_) and a device buffer
// (gpu_ptr_). head_ records which copy holds the latest data; to_cpu() only
// copies when the device copy is newer.
class SyncedMemory {
 public:
  const void* cpu_data();
  void set_cpu_data(void* data);
  const void* gpu_data();
  void* mutable_cpu_data();
  void* mutable_gpu_data();
  // UNINITIALIZED: nothing allocated yet; HEAD_AT_CPU / HEAD_AT_GPU: that
  // side holds the latest data; SYNCED: both copies agree.
  enum SyncedHead { UNINITIALIZED, HEAD_AT_CPU, HEAD_AT_GPU, SYNCED };
  SyncedHead head() { return head_; }
  size_t size() { return size_; }

 private:
  void to_cpu();
  void to_gpu();
  void* cpu_ptr_;
  void* gpu_ptr_;
  size_t size_;
  SyncedHead head_;
  bool own_cpu_data_;
  // Out-parameter of CaffeMallocHost in to_cpu(); presumably records whether
  // the host buffer came from a CUDA allocator so it is freed the same way —
  // TODO confirm against syncedmem.cpp.
  bool cpu_malloc_use_cuda_;

  DISABLE_COPY_AND_ASSIGN(SyncedMemory);
};

void* SyncedMemory::mutable_cpu_data() {
  to_cpu();
  head_ = HEAD_AT_CPU;
  return cpu_ptr_;
}

// Makes the host copy current. On first use, allocates host memory and fills
// it via caffe_memset(size_, 0, ...). If the device copy is newer, copies it
// back and marks the state SYNCED. Otherwise there is nothing to do.
inline void SyncedMemory::to_cpu() {
  switch (head_) {
  case UNINITIALIZED:
    // First access: allocate the host buffer and zero it.
    CaffeMallocHost(&cpu_ptr_, size_, &cpu_malloc_use_cuda_);
    caffe_memset(size_, 0, cpu_ptr_);
    head_ = HEAD_AT_CPU;
    own_cpu_data_ = true;
    break;
  case HEAD_AT_GPU:
#ifndef CPU_ONLY
    // The device copy is newer: allocate a host buffer if there is none yet,
    // then copy device -> host.
    if (cpu_ptr_ == NULL) {
      CaffeMallocHost(&cpu_ptr_, size_, &cpu_malloc_use_cuda_);
      own_cpu_data_ = true;
    }
    caffe_gpu_memcpy(size_, gpu_ptr_, cpu_ptr_);
    head_ = SYNCED;
#else
    NO_GPU;
#endif
    break;
  case HEAD_AT_CPU:
  case SYNCED:
    // The host copy is already current.
    break;
  }
}
10.data_layer
http://blog.csdn.net/iamzhangzhuping/article/details/50582503
継承:
Layer → BaseDataLayer、(BaseDataLayer, InternalThread) → BasePrefetchingDataLayer → DataLayer
初期化:
//net.cpp
// Net::Init creates each layer through the registry; for a "Data" layer this
// ends up calling the DataLayer constructor below.
layers_.push_back(LayerRegistry<Dtype>::CreateLayer(layer_param));
// Constructs a DataLayer. Initialization order: 1. the
// BasePrefetchingDataLayer base, 2. reader_ (a DataReader built from param).
template <typename Dtype>
DataLayer<Dtype>::DataLayer(const LayerParameter& param)
  : BasePrefetchingDataLayer<Dtype>(param),
    reader_(param) {
}
1. BasePrefetchingDataLayer
// Initializes the BaseDataLayer base and both batch queues, then puts all
// PREFETCH_COUNT batch buffers into the free queue.
template <typename Dtype>
BasePrefetchingDataLayer<Dtype>::BasePrefetchingDataLayer(
    const LayerParameter& param)
    : BaseDataLayer<Dtype>(param),
      prefetch_free_(), prefetch_full_() {
  for (int i = 0; i < PREFETCH_COUNT; ++i) {
    prefetch_free_.push(&prefetch_[i]);
  }
}
実行順序: (1) BaseDataLayer(param)
// Forwards param to the Layer base and keeps a copy of its transform_param.
template <typename Dtype>
BaseDataLayer<Dtype>::BaseDataLayer(const LayerParameter& param)
    : Layer<Dtype>(param),
      transform_param_(param.transform_param()) {
}
(2) prefetch_free_() と prefetch_full_()
  // Prefetches batches (asynchronously if to GPU memory)
  // A fixed pool of PREFETCH_COUNT batch buffers; the constructor puts them
  // all into prefetch_free_, and prefetch_full_ presumably holds the filled
  // ones waiting to be consumed.
  static const int PREFETCH_COUNT = 3;

  Batch<Dtype> prefetch_[PREFETCH_COUNT];
  BlockingQueue<Batch<Dtype>*> prefetch_free_;
  BlockingQueue<Batch<Dtype>*> prefetch_full_;
(3) コンストラクタ本体:
     // Every prefetch_ buffer starts out in the free queue.
     for (int i = 0; i < PREFETCH_COUNT; ++i) { prefetch_free_.push(&prefetch_[i]); }
2.reader_
/************* data_reader.hpp ***********************/
/**
 * @brief Reads data from a source to queues available to data layers.
 * A single reading thread is created per source, even if multiple solvers
 * are running in parallel, e.g. for multi-GPU training. This makes sure
 * databases are read sequentially, and that each solver accesses a different
 * subset of the database. Data is distributed to solvers in a round-robin
 * way to keep parallel training deterministic.
 */
class DataReader {
 public:
  explicit DataReader(const LayerParameter& param);
  ~DataReader();

  // Queue of empty Datum objects (pre-filled by the QueuePair constructor).
  inline BlockingQueue<Datum*>& free() const {
    return queue_pair_->free_;
  }
  // Queue of Datum objects read from the source (presumably filled by the
  // Body thread's read_one()).
  inline BlockingQueue<Datum*>& full() const {
    return queue_pair_->full_;
  }

 protected:
  // Queue pairs are shared between a body and its readers
  class QueuePair {
   public:
    explicit QueuePair(int size);
    ~QueuePair();

    BlockingQueue<Datum*> free_;
    BlockingQueue<Datum*> full_;

  DISABLE_COPY_AND_ASSIGN(QueuePair);
  };

  // A single body is created per source
  // It is an InternalThread; its constructor starts the reading thread.
  class Body : public InternalThread {
   public:
    explicit Body(const LayerParameter& param);
    virtual ~Body();

   protected:
    void InternalThreadEntry();
    void read_one(db::Cursor* cursor, QueuePair* qp);

    const LayerParameter param_;
    // QueuePairs handed over by new DataReaders (one per solver).
    BlockingQueue<shared_ptr<QueuePair> > new_queue_pairs_;

    friend class DataReader;

  DISABLE_COPY_AND_ASSIGN(Body);
  };

  // A source is uniquely identified by its layer name + path, in case
  // the same database is read from two different locations in the net.
  static inline string source_key(const LayerParameter& param) {
    return param.name() + ":" + param.data_param().source();
  }

  const shared_ptr<QueuePair> queue_pair_;
  shared_ptr<Body> body_;

  // source_key -> Body. Held through weak_ptr so the registry alone does not
  // keep a Body alive; the DataReaders' body_ pointers own it.
  static map<const string, boost::weak_ptr<DataReader::Body> > bodies_;

DISABLE_COPY_AND_ASSIGN(DataReader);
};
/************* data_reader.cpp ***********************
using boost::weak_ptr;

// Registry of reader bodies keyed by source_key(). Holding weak_ptr means the
// registry never keeps a Body alive by itself.
map<const string, weak_ptr<DataReader::Body> > DataReader::bodies_;
// Serializes access to bodies_ (a static shared by every DataReader).
static boost::mutex bodies_mutex_;

/**
 * Creates this reader's private QueuePair and attaches it to the Body that
 * reads `param.data_param().source()`, creating that Body on first use.
 *
 * @param param layer parameters; data_param().prefetch() and batch_size()
 *              size the free queue, name() + source() select the Body.
 */
DataReader::DataReader(const LayerParameter& param)
  // Prefetch queue (Number of batches to prefetch to host memory, increase if
  // data access bandwidth varies).
  // optional uint32 prefetch = 10 [default = 4];
  // => the free queue starts with prefetch * batch_size empty Datum objects.
    : queue_pair_(new QueuePair(
        param.data_param().prefetch() * param.data_param().batch_size())) {/*(1)*/
  // Get or create a body
  boost::mutex::scoped_lock lock(bodies_mutex_);
  string key = source_key(param);
  // operator[] inserts an empty weak_ptr the first time a key is seen.
  weak_ptr<Body>& weak = bodies_[key];
  body_ = weak.lock();
  if (!body_) {
    body_.reset(new Body(param));            /* (2) */
    // Store through the reference obtained above rather than indexing the map
    // a second time; std::map references stay valid and nothing is inserted
    // in between.
    weak = body_;                            /* (3) */
  }
  // Hand our queues to the Body's thread (it pops them in InternalThreadEntry).
  body_->new_queue_pairs_.push(queue_pair_); /* (4) */
}
(1)
DataReader::QueuePair::QueuePair(int size) {
  // Seed the free queue with `size` blank Datum objects for the reading
  // thread to fill; a non-positive size leaves it empty.
  for (int remaining = size; remaining > 0; --remaining) {
    Datum* const blank = new Datum();
    free_.push(blank);
  }
}
(2)
DataReader::Body::Body(const LayerParameter& param)
    : param_(param), new_queue_pairs_() {
  // Launch the reading thread right away; InternalThreadEntry() then waits on
  // new_queue_pairs_ until DataReaders register their QueuePairs.
  StartInternalThread();
}
// Reading thread: opens the database, waits until every expected solver has
// registered its QueuePair, then fills their queues one record at a time in a
// fixed solver order until must_stop() is true or the thread is interrupted.
void DataReader::Body::InternalThreadEntry() {
  shared_ptr<db::DB> db(db::GetDB(param_.data_param().backend()));
  db->Open(param_.data_param().source(), db::READ);
  shared_ptr<db::Cursor> cursor(db->NewCursor());
  vector<shared_ptr<QueuePair> > qps;
  try {
    // Several consumers are expected only in the TRAIN phase; otherwise one.
    int solver_count = param_.phase() == TRAIN ? Caffe::solver_count() : 1;

    // To ensure deterministic runs, only start running once all solvers
    // are ready. But solvers need to peek on one item during initialization,
    // so read one item, then wait for the next solver.
    for (int i = 0; i < solver_count; ++i) {
      // Waits for the next DataReader to push its QueuePair.
      shared_ptr<QueuePair> qp(new_queue_pairs_.pop());
      read_one(cursor.get(), qp.get());
      qps.push_back(qp);
    }
    // Main loop
    while (!must_stop()) {
      // One record per solver per pass, always in the same solver order.
      for (int i = 0; i < solver_count; ++i) {
        read_one(cursor.get(), qps[i].get());
      }
      // Check no additional readers have been created. This can happen if
      // more than one net is trained at a time per process, whether single
      // or multi solver. It might also happen if two data layers have same
      // name and same source.
      CHECK_EQ(new_queue_pairs_.size(), 0);
    }
  } catch (boost::thread_interrupted&) {
    // Interrupted exception is expected on shutdown
  }
}

void DataReader::Body::read_one(db::Cursor* cursor, QueuePair* qp) {
  // Take an empty Datum (waits if the consumer has not returned one yet),
  // fill it from the current record and publish it on the full queue.
  Datum* const slot = qp->free_.pop();
  // TODO deserialize in-place instead of copy?
  slot->ParseFromString(cursor->value());
  qp->full_.push(slot);

  // Advance; at the end of the database wrap around to the first record.
  cursor->Next();
  if (cursor->valid()) {
    return;
  }
  DLOG(INFO) << "Restarting data prefetching from start.";
  cursor->SeekToFirst();
}
セットアップ
呼び出し順序 // net.cpp
// net.cpp: SetUp() is invoked on layer `layer_id` with that layer's bottom/top
// blob vectors (presumably inside the Net's loop over all layers).
layers_[layer_id]->SetUp(bottom_vecs_[layer_id], top_vecs_[layer_id]);
#include <iostream>
using namespace std;

// Empty stand-in for Caffe's InternalThread base class; it only reproduces
// the multiple-inheritance shape of BasePrefetchingDataLayer.
class InterThread{};

// Simplified model of caffe::Layer. SetUp() is the non-virtual entry point;
// it traces its start/end and dispatches to the virtual LayerSetUp() hook.
class layer{
public:
    layer(){}
    // Polymorphic base handled through layer* pointers: the destructor must be
    // virtual so that deleting a derived object via a base pointer is defined.
    virtual ~layer(){}
    void SetUp( ) {
        std::cout<<"layer:SetUp():start"<<std::endl;
        LayerSetUp();
        std::cout<<"layer:SetUp():end"<<std::endl;
    }
    virtual void LayerSetUp(){}
};

// Models caffe::BaseDataLayer: its LayerSetUp() wraps the DataLayerSetUp() hook.
class BaseDataLayer:public layer{
public:
    BaseDataLayer():layer(){}
    virtual void LayerSetUp();
    virtual void DataLayerSetUp(){}
};
void BaseDataLayer::LayerSetUp( ) {
    std::cout<<"BaseDataLayer:LayerSetUp():start"<<std::endl;
    DataLayerSetUp( );
    std::cout<<"BaseDataLayer:LayerSetUp():end"<<std::endl;
}

// Models caffe::BasePrefetchingDataLayer: overrides LayerSetUp() and chains to
// BaseDataLayer::LayerSetUp() explicitly.
class BasePrefetchingDataLayer :
        public BaseDataLayer,public InterThread {
public:
    BasePrefetchingDataLayer():BaseDataLayer(),InterThread(){}
    // Overrides layer::LayerSetUp; spelled `virtual` to make that explicit.
    virtual void LayerSetUp( );
};
void BasePrefetchingDataLayer::LayerSetUp() {
    std::cout<<"BasePrefetchingDataLayer:LayerSetUp():start"<<std::endl;
    BaseDataLayer::LayerSetUp( );
    std::cout<<"BasePrefetchingDataLayer:LayerSetUp():end"<<std::endl;
}

// Models caffe::DataLayer: supplies only the innermost DataLayerSetUp() hook,
// so SetUp() on a DataLayer runs layer -> BasePrefetchingDataLayer ->
// BaseDataLayer -> DataLayer and unwinds in reverse.
class DataLayer : public BasePrefetchingDataLayer {
public:
    DataLayer():BasePrefetchingDataLayer(){}
    virtual void DataLayerSetUp();
};
void DataLayer::DataLayerSetUp( ) {
    std::cout<<"DataLayer:DataLayerSetUp()"<<std::endl;
}

int main()
{
    layer *layer1;
    layer1 = new DataLayer();
    layer1->SetUp();
}
出力結果:
~~~~~~~~~~~~~
layer:SetUp():start
BasePrefetchingDataLayer:LayerSetUp():start
BaseDataLayer:LayerSetUp():start
DataLayer:DataLayerSetUp()
BaseDataLayer:LayerSetUp():end
BasePrefetchingDataLayer:LayerSetUp():end
layer:SetUp():end
~~~~~~~~~~~~~
コード
bottom:empty top:2
Layer:
  // Common setup entry point for every layer; the steps below run in this
  // fixed order:
  //   InitMutex()       - NOTE(review): presumably initializes the layer's
  //                       lock; body not shown here.
  //   CheckBlobCounts() - presumably validates bottom/top blob counts for this
  //                       layer type; body not shown here.
  //   LayerSetUp()      - virtual, layer-specific one-time setup (for data
  //                       layers: BasePrefetchingDataLayer::LayerSetUp).
  //   Reshape()         - presumably sizes top blobs from the bottoms.
  //   SetLossWeights()  - presumably applies loss weights to the top blobs.
  void SetUp(const vector<Blob<Dtype>*>& bottom,
      const vector<Blob<Dtype>*>& top) {
    InitMutex();
    CheckBlobCounts(bottom, top);
    LayerSetUp(bottom, top);
    Reshape(bottom, top);
    SetLossWeights(top);
  }
BasePrefetchingDataLayer:
// Runs the base data-layer setup (which shapes top and the prefetch buffers),
// touches every prefetch buffer so its memory is allocated on this (main)
// thread, and only then starts the prefetch thread.
void BasePrefetchingDataLayer<Dtype>::LayerSetUp(
    const vector<Blob<Dtype>*>& bottom, const vector<Blob<Dtype>*>& top) {
  BaseDataLayer<Dtype>::LayerSetUp(bottom, top);
  // Before starting the prefetch thread, we make cpu_data and gpu_data
  // calls so that the prefetch thread does not accidentally make simultaneous
  // cudaMalloc calls when the main thread is running. In some GPUs this
  // seems to cause failures if we do not so.
  for (int i = 0; i < PREFETCH_COUNT; ++i) {
    prefetch_[i].data_.mutable_cpu_data();
    if (this->output_labels_) {
      prefetch_[i].label_.mutable_cpu_data();
    }
  }
#ifndef CPU_ONLY
  if (Caffe::mode() == Caffe::GPU) {
    for (int i = 0; i < PREFETCH_COUNT; ++i) {
      prefetch_[i].data_.mutable_gpu_data();
      if (this->output_labels_) {
        prefetch_[i].label_.mutable_gpu_data();
      }
    }
  }
#endif
  DLOG(INFO) << "Initializing prefetch";
  // BaseDataLayer::LayerSetUp already called InitRand(); calling it again
  // draws another seed from the global RNG stream (caffe_rng_rand) when random
  // transforms are enabled. NOTE(review): this extra draw affects the RNG
  // sequence, so keep it unless reproducibility across versions is not needed.
  this->data_transformer_->InitRand();
  StartInternalThread();
  DLOG(INFO) << "Prefetch initialized.";
}
// Prefetch thread body: repeatedly takes an empty Batch from prefetch_free_,
// fills it with load_batch(), and publishes it on prefetch_full_, until
// must_stop() is true or the thread is interrupted.
void BasePrefetchingDataLayer<Dtype>::InternalThreadEntry() {
#ifndef CPU_ONLY
  // Dedicated non-blocking stream for this thread's async_gpu_push calls.
  // Only initialized (and later destroyed) in GPU mode.
  cudaStream_t stream;
  if (Caffe::mode() == Caffe::GPU) {
    CUDA_CHECK(cudaStreamCreateWithFlags(&stream, cudaStreamNonBlocking));
  }
#endif

  try {
    while (!must_stop()) {
      // Blocks until a Batch is returned to prefetch_free_ (Forward_cpu pushes
      // consumed batches back there).
      Batch<Dtype>* batch = prefetch_free_.pop();
      load_batch(batch);
#ifndef CPU_ONLY
      if (Caffe::mode() == Caffe::GPU) {
        // Push the batch data to the device on `stream` and wait, so the batch
        // is complete before it becomes visible on prefetch_full_.
        // Only data_ is pushed here; NOTE(review): label_ is presumably copied
        // to the device later, on demand — confirm.
        batch->data_.data().get()->async_gpu_push(stream);
        CUDA_CHECK(cudaStreamSynchronize(stream));
      }
#endif
      prefetch_full_.push(batch);
    }
  } catch (boost::thread_interrupted&) {
    // Interrupted exception is expected on shutdown
  }
#ifndef CPU_ONLY
  if (Caffe::mode() == Caffe::GPU) {
    CUDA_CHECK(cudaStreamDestroy(stream));
  }
#endif
}
BaseDataLayer
void BaseDataLayer<Dtype>::LayerSetUp(const vector<Blob<Dtype>*>& bottom,
      const vector<Blob<Dtype>*>& top) {
  // Labels are produced whenever the layer has anything other than exactly
  // one top blob (top[1] then carries the labels).
  output_labels_ = (top.size() != 1);
  // Build a transformer for this layer's phase and seed its RNG immediately.
  DataTransformer<Dtype>* const transformer =
      new DataTransformer<Dtype>(transform_param_, this->phase_);
  data_transformer_.reset(transformer);
  data_transformer_->InitRand();
  // The subclasses should setup the size of bottom and top
  DataLayerSetUp(bottom, top);
}
DataLayer
void DataLayer<Dtype>::DataLayerSetUp(const vector<Blob<Dtype>*>& bottom,
      const vector<Blob<Dtype>*>& top) {
  const int batch_size = this->layer_param_.data_param().batch_size();
  // Peek (without consuming) at the first datum and derive the per-datum shape.
  Datum& sample = *(reader_.full().peek());
  vector<int> shape = this->data_transformer_->InferBlobShape(sample);
  this->transformed_data_.Reshape(shape);

  // Same shape with the leading dimension widened to a whole batch, applied
  // to top[0] and to every prefetch buffer.
  shape[0] = batch_size;
  top[0]->Reshape(shape);
  for (int slot = 0; slot < this->PREFETCH_COUNT; ++slot) {
    this->prefetch_[slot].data_.Reshape(shape);
  }
  LOG(INFO) << "output data size: " << top[0]->num() << ","
      << top[0]->channels() << "," << top[0]->height() << ","
      << top[0]->width();

  // Labels: one value per item in the batch.
  if (!this->output_labels_) {
    return;
  }
  vector<int> label_shape(1, batch_size);
  top[1]->Reshape(label_shape);
  for (int slot = 0; slot < this->PREFETCH_COUNT; ++slot) {
    this->prefetch_[slot].label_.Reshape(label_shape);
  }
}
// Fills one prefetch Batch on the prefetch thread (called from
// InternalThreadEntry): reshapes it from the shape of the next available
// datum, then pops batch_size Datum objects from the reader, transforms each
// into its slot of batch->data_ (copying its label if labels are produced),
// and returns each Datum to the reader's free queue.
void DataLayer<Dtype>::load_batch(Batch<Dtype>* batch) {
  CPUTimer batch_timer;
  batch_timer.Start();
  double read_time = 0;
  double trans_time = 0;
  CPUTimer timer;
  CHECK(batch->data_.count());
  CHECK(this->transformed_data_.count());

  // Reshape according to the first datum of each batch
  // on single input batches allows for inputs of varying dimension.
  const int batch_size = this->layer_param_.data_param().batch_size();
  // peek() does not consume this datum; it is popped again in the loop below.
  // Named first_datum so the loop variable no longer shadows it.
  Datum& first_datum = *(reader_.full().peek());
  // Use data_transformer to infer the expected blob shape from datum.
  vector<int> top_shape = this->data_transformer_->InferBlobShape(first_datum);
  this->transformed_data_.Reshape(top_shape);
  // Reshape batch according to the batch_size.
  top_shape[0] = batch_size;
  batch->data_.Reshape(top_shape);

  Dtype* top_data = batch->data_.mutable_cpu_data();
  Dtype* top_label = NULL;  // suppress warnings about uninitialized variables

  if (this->output_labels_) {
    top_label = batch->label_.mutable_cpu_data();
  }
  for (int item_id = 0; item_id < batch_size; ++item_id) {
    timer.Start();
    // get a datum
    Datum& datum = *(reader_.full().pop("Waiting for data"));
    read_time += timer.MicroSeconds();
    timer.Start();
    // Apply data transformations (mirror, scale, crop...)
    // transformed_data_ is pointed at this item's slot of the batch buffer,
    // so Transform() output lands directly in the batch.
    int offset = batch->data_.offset(item_id);
    this->transformed_data_.set_cpu_data(top_data + offset);
    this->data_transformer_->Transform(datum, &(this->transformed_data_));
    // Copy label.
    if (this->output_labels_) {
      top_label[item_id] = datum.label();
    }
    trans_time += timer.MicroSeconds();

    // Hand the Datum back to the reading thread. &datum is already a
    // non-const Datum*, so no cast is needed.
    reader_.free().push(&datum);
  }
  timer.Stop();
  batch_timer.Stop();
  DLOG(INFO) << "Prefetch batch: " << batch_timer.MilliSeconds() << " ms.";
  DLOG(INFO) << " Read time: " << read_time / 1000 << " ms.";
  DLOG(INFO) << "Transform time: " << trans_time / 1000 << " ms.";
}
締め括りをつける
1. source が一つしかなければ、Body オブジェクトも一つしかない。body_ のメンバ new_queue_pairs_ に入る各 QueuePair の free_ / full_ キューの要素は Datum で、キューの長さは (prefetch × batch_size)、既定値では (4 × batch_size) である。2. BasePrefetchingDataLayer の prefetch_free_ / prefetch_full_ キューの要素は Batch で、キューの長さは PREFETCH_COUNT(= 3)である。3. 2 つの StartInternalThread() はどちらも InternalThreadEntry() を呼び出す。この関数には while (!must_stop()) { xxx_free_.pop(); ... } というループがあり、このループはデストラクタで StopInternalThread() が呼ばれるまで回り続ける。また、pop() はキューが空のとき、そのキューに新しいデータが push されるまで一時的にブロックする。
初期化
最初の solver の DataReader reader_ の初期化時には、次の 3 つが行われる。1. queue_pair_(new QueuePair(
param.data_param().prefetch() * param.data_param().batch_size()))
により、free_ に (prefetch × batch_size)、既定値では (4 × batch_size) 個の Datum が入れられる。2. body_.reset(new Body(param)); により Body のコンストラクタが呼ばれ、その中の StartInternalThread() が読み込みスレッド(次の InternalThreadEntry() 関数)を起動する。3. body_->new_queue_pairs_.push(queue_pair_); により、自分の queue_pair_ を Body に登録する(BlockingQueue については以下の分析を参照)。残りの solver の DataReader reader_ の初期化時には、同じ source の Body が bodies_ から取得されるため 2. は行われず、1. と 3. だけが行われる。
// Reading thread: opens the database, waits until every expected solver has
// registered its QueuePair, then fills their queues one record at a time in a
// fixed solver order until must_stop() is true or the thread is interrupted.
void DataReader::Body::InternalThreadEntry() {
  shared_ptr<db::DB> db(db::GetDB(param_.data_param().backend()));
  db->Open(param_.data_param().source(), db::READ);
  shared_ptr<db::Cursor> cursor(db->NewCursor());
  vector<shared_ptr<QueuePair> > qps;
  try {
    // Several consumers are expected only in the TRAIN phase; otherwise one.
    int solver_count = param_.phase() == TRAIN ? Caffe::solver_count() : 1;

    // To ensure deterministic runs, only start running once all solvers
    // are ready. But solvers need to peek on one item during initialization,
    // so read one item, then wait for the next solver.
    for (int i = 0; i < solver_count; ++i) {
      // Waits for the next DataReader to push its QueuePair.
      shared_ptr<QueuePair> qp(new_queue_pairs_.pop());
      read_one(cursor.get(), qp.get());
      qps.push_back(qp);
    }
    // Main loop
    /* In this while loop the body_ thread calls read_one() once for each
       solver's QueuePair, in turn: */
    while (!must_stop()) {
      for (int i = 0; i < solver_count; ++i) {
        read_one(cursor.get(), qps[i].get());
      }
      // Check no additional readers have been created. This can happen if
      // more than one net is trained at a time per process, whether single
      // or multi solver. It might also happen if two data layers have same
      // name and same source.
      CHECK_EQ(new_queue_pairs_.size(), 0);
    }
  } catch (boost::thread_interrupted&) {
    // Interrupted exception is expected on shutdown
  }
}
void DataReader::Body::read_one(db::Cursor* cursor, QueuePair* qp) {
  // Take an empty Datum (waits if the consumer has not returned one yet),
  // fill it from the current record and publish it on the full queue.
  Datum* const slot = qp->free_.pop();
  // TODO deserialize in-place instead of copy?
  slot->ParseFromString(cursor->value());
  qp->full_.push(slot);

  // Advance; at the end of the database wrap around to the first record.
  cursor->Next();
  if (cursor->valid()) {
    return;
  }
  DLOG(INFO) << "Restarting data prefetching from start.";
  cursor->SeekToFirst();
}
StartInternalThread() から呼ばれる InternalThreadEntry() では、まず new_queue_pairs_ から QueuePair を一つ pop します。これは body_->new_queue_pairs_.push(queue_pair_); で push されたもので、その queue_pair_ は queue_pair_(new QueuePair(
param.data_param().prefetch() * param.data_param().batch_size()))
で作られています。各 solver の QueuePair について read_one() で 1 件ずつ先読みした後、メインループで read_one() を繰り返します。read_one() は free_ から Datum を pop し、DB から読み込んだ値を詰めて full_ に push します。では、free_ が空になったら何が起きるでしょうか? free_.pop() は、次に free_ に新しいデータが push されるまでブロックします。それでは、free_ にはいつ新しいデータが push されるのでしょうか?
DataLayer::load_batch() 関数には、次のループがあります:
for (int item_id = 0; item_id < batch_size; ++item_id) {
    // get a datum (waits until the reading thread has pushed one to full_)
    Datum& datum = *(reader_.full().pop("Waiting for data"));
    // ... transform / label copy omitted in this excerpt ...
    // Return the Datum to the reading thread; &datum is already a Datum*,
    // so no cast is needed.
    reader_.free().push(&datum);
  }
つまり、上の問いへの答えとして、free_ への push は DataLayer::load_batch() の中で行われます。そして load_batch() は、次の BasePrefetchingDataLayer<Dtype>::InternalThreadEntry() から呼び出されます:
BasePrefetchingDataLayer<Dtype>::InternalThreadEntry() {
while (!must_stop()) {
      Batch<Dtype>* batch = prefetch_free_.pop();
      load_batch(batch);
      prefetch_full_.push(batch);
    }
  }
では、prefetch_free_ にはいつ Batch が push し直されるのでしょうか? 答えは Forward_cpu() の末尾です:
void BasePrefetchingDataLayer<Dtype>::Forward_cpu(
    const vector<Blob<Dtype>*>& bottom, const vector<Blob<Dtype>*>& top) {
  // Take the oldest ready batch (may wait, logging the message, if none is).
  Batch<Dtype>* const batch =
      prefetch_full_.pop("Data layer prefetch queue empty");

  // Data: resize top[0] to the batch and copy it over.
  Blob<Dtype>* const data_top = top[0];
  data_top->ReshapeLike(batch->data_);
  caffe_copy(batch->data_.count(), batch->data_.cpu_data(),
             data_top->mutable_cpu_data());
  DLOG(INFO) << "Prefetch copied";

  // Labels: same treatment for top[1] when the layer produces them.
  if (this->output_labels_) {
    Blob<Dtype>* const label_top = top[1];
    label_top->ReshapeLike(batch->label_);
    caffe_copy(batch->label_.count(), batch->label_.cpu_data(),
        label_top->mutable_cpu_data());
  }

  // Give the buffer back so the prefetch thread can refill it.
  prefetch_free_.push(batch);
}
RNG
http://www.boost.org/doc/libs/1_46_0/doc/html/boost_random/tutorial.html http://stackoverflow.com/questions/2254909/boost-random-number-generator
定義
common.hpp
// Excerpt of class Caffe (common.hpp); unrelated members such as Get() are
// omitted here.
class Caffe {
 public:
  // This random number generator facade hides boost and CUDA rng
  // implementation from one another (for cross-platform compatibility).
  class RNG {
   public:
    RNG();
    explicit RNG(unsigned int seed);
    // Copies must be written explicitly (RNG b(a)); implicit copies are disallowed.
    explicit RNG(const RNG&);
    // Assignment makes this RNG share `other`'s generator.
    RNG& operator=(const RNG&);
    // Type-erased pointer to the underlying engine; callers cast it back.
    void* generator();
   private:
    class Generator;
    shared_ptr<Generator> generator_;
  };

  // Getters for boost rng, curand, and cublas handles
  // rng_stream() lazily creates a default-constructed RNG on first use.
  inline static RNG& rng_stream() {
    if (!Get().random_generator_) {
      Get().random_generator_.reset(new RNG());
    }
    return *(Get().random_generator_);
  }

  // Global host RNG: replaced by set_random_seed(), otherwise created lazily
  // by rng_stream().
  shared_ptr<RNG> random_generator_;
};
common.cpp
class Caffe::RNG::Generator {
 private:
  // Owning handle to the Boost engine.
  shared_ptr<caffe::rng_t> rng_;

 public:
  // Engine seeded from cluster_seedgen() (defined elsewhere).
  Generator()
      : rng_(new caffe::rng_t(cluster_seedgen())) {
  }
  // Engine seeded with a caller-supplied value, for reproducible runs.
  explicit Generator(unsigned int seed)
      : rng_(new caffe::rng_t(seed)) {
  }
  // Non-owning access to the engine owned by rng_.
  caffe::rng_t* rng() {
    caffe::rng_t* const engine = rng_.get();
    return engine;
  }
};

Caffe::RNG::RNG() : generator_(new Generator()) { }

Caffe::RNG::RNG(unsigned int seed) : generator_(new Generator(seed)) { }

Caffe::RNG& Caffe::RNG::operator=(const RNG& other) {
  generator_ = other.generator_;
  return *this;
}

void* Caffe::RNG::generator() {
  return static_cast<void*>(generator_->rng());
}
初期化: 乱数シードの設定は、Solver の初期化処理(solver.cpp)にある次のコードで行われます。
// Only the root solver, and only when a non-negative random_seed is
// configured, replaces the global RNG with a seeded one; otherwise
// Caffe::rng_stream() lazily creates a default-constructed RNG on first use.
if (Caffe::root_solver() && param_.random_seed() >= 0) {
    Caffe::set_random_seed(param_.random_seed());
  }
common.cpp
void Caffe::set_random_seed(const unsigned int seed) {
  // RNG seed
  Get().random_generator_.reset(new RNG(seed));
}
rng.hpp
namespace caffe {

// Engine type wrapped by Caffe::RNG::Generator.
typedef boost::mt19937 rng_t;

// Returns the engine behind Caffe::rng_stream() (created on first use).
inline rng_t* caffe_rng() {
    void* const raw_engine = Caffe::rng_stream().generator();
    return static_cast<rng_t*>(raw_engine);
}
    //...
}
これで初期化されます: random_generator_ [class Caffe] → generator_ [class RNG] → rng_(new caffe::rng_t(seed)) [class Generator]
使い方1: shuffle the images
image_data_layer.cpp
//shared_ptr<Caffe::RNG> prefetch_rng_;
// Give the image list its own RNG, seeded with one draw from the global
// Caffe RNG stream (caffe_rng_rand), then shuffle.
const unsigned int prefetch_rng_seed = caffe_rng_rand();
prefetch_rng_.reset(new Caffe::RNG(prefetch_rng_seed));
// NOTE(review): presumably ShuffleImages() draws from prefetch_rng_; its body
// is not shown here.
ShuffleImages();
math_functions.cpp
unsigned int caffe_rng_rand() {
  // One raw draw from the engine behind Caffe::rng_stream().
  return caffe_rng()->operator()();
}
rng.hpp
namespace caffe {

// Engine type wrapped by Caffe::RNG::Generator.
typedef boost::mt19937 rng_t;

// Returns the engine behind Caffe::rng_stream() (created on first use).
inline rng_t* caffe_rng() {
    void* const raw_engine = Caffe::rng_stream().generator();
    return static_cast<rng_t*>(raw_engine);
}
    //...
}
common.hpp
inline static RNG& rng_stream() {
    // Lazily create a default-constructed RNG the first time it is needed.
    if (!Get().random_generator_.get()) {
      RNG* const fresh = new RNG();
      Get().random_generator_.reset(fresh);
    }
    return *(Get().random_generator_);
  }

void* Caffe::RNG::generator() {
  return static_cast<void*>(generator_->rng());
}

// Non-owning pointer to the engine held by rng_.
caffe::rng_t* rng() {
  caffe::rng_t* const engine = rng_.get();
  return engine;
}
使い方2
data_transformer.cpp
template <typename Dtype>
void DataTransformer<Dtype>::InitRand() {
  // Randomness is needed for mirroring, or for random crops in TRAIN.
  const bool needs_rand = param_.mirror() ||
      (phase_ == TRAIN && param_.crop_size());
  //shared_ptr<Caffe::RNG> rng_;
  if (!needs_rand) {
    // No random transform configured: drop any previous generator.
    rng_.reset();
    return;
  }
  // Seed a private generator with one draw from the global Caffe RNG stream.
  const unsigned int rng_seed = caffe_rng_rand();
  rng_.reset(new Caffe::RNG(rng_seed));
}

template <typename Dtype>
int DataTransformer<Dtype>::Rand(int n) {
  // Hard preconditions: InitRand() must have created rng_, and the range
  // [0, n) must be non-empty.
  CHECK(rng_);
  CHECK_GT(n, 0);
  caffe::rng_t& engine = *static_cast<caffe::rng_t*>(rng_->generator());
  // NOTE(review): plain modulo is slightly biased toward small values when n
  // does not divide the engine's range; kept as-is so seeded runs reproduce.
  return static_cast<int>(engine() % n);
}
//to be continued////