[トップ][caffe]ソース分析
105671 ワード
http://blog.csdn.net/chenriwei2/article/details/46368707
http://blog.csdn.net/qq_16055159/articategory/310775
http://blog.luoyetx.com/2015/10/reading-caffe-2/
caffe
1) i. using namespace caffe; ii. class Caffe { static void SetDevice(const int device_id); } → Caffe::SetDevice(gpus[0]); iii. package caffe; message Datum {…} → caffe::Datum datatest;
2) blob（http://blog.csdn.net/iamzhangzhuping/article/details/50445570）：なぜ data copy が必要な箇所があるのか？ gpu_data() and cpu_data() are used in cases where the data is used only as input and will not be modified by the algorithm. mutable_* is used when the data itself gets updated while running the algorithm. 次に、(1) Blob のデータに対する2回の操作が同じプロセッサ（CPU/GPU）で行われているかどうか、(2) 前の操作が Blob のデータを更新した可能性があるかどうか、に注目します。Whenever the data is called, check the previous function call and whether it used the same processor. If it uses the same processor, the data need not be copied. If it uses a different processor, there is a chance the data might have been altered by that call, and hence a data copy is required.
3) src/caffe/test 以下のファイルは、各 layer の順伝播（forward）・逆伝播（backward）が正しいかどうかをテストするためのものです。実行方法：make test、make runtest
4)データストアparallel.hpp
protoc でコンパイルすると caffe.pb.cc と caffe.pb.h が生成されます（ヘッダは ./build/include/caffe/proto にあります）。
関連関数
// Accessors generated by protoc for message Datum (caffe.proto).
caffe::Datum testdata;
testdata.set_channels(3);       // setter: takes the new value
testdata.has_channels();        // whether the optional field has been set
testdata.clear_channels();      // resets the field and its "has" bit
testdata.channels();            // getter
testdata.float_data_size();     // number of elements in repeated float_data
// mutable_ accessors exist only for repeated, string and message fields
// (e.g. float_data), not for scalar int32 fields such as channels.
testdata.mutable_float_data();
メイン関数
./tool/caffe.cpp
Step()関数では、反復ごとに
solver.cpp
./src/caffe/solver.cpp
gpusは4つ設けられています。0,1,2,3です
net.hpp
http://blog.csdn.net/iamzhangzhuping/article/details/50582503
継承:
Layer -> BaseDataLayer；(BaseDataLayer, InternalThread) -> BasePrefetchingDataLayer -> DataLayer
初期化:
//net.cpp
実行順序 //net.cpp
コード
bottom:empty top:2
Layer:
1. source が1つしかないなら、Body オブジェクトも1つしかない（Body は source ごとに1つだけ作られる）。Body
初期化
最初の solver の DataReader reader_ を初期化する時、1.
layer SetUp
http://www.boost.org/doc/libs/1_46_0/doc/html/boost_random/tutorial.html http://stackoverflow.com/questions/2254909/boost-random-number-generator
定義
common.hpp
shuffle the images
image_data_layer.cpp
data_transformer.cpp
http://blog.csdn.net/qq_16055159/articategory/310775
http://blog.luoyetx.com/2015/10/reading-caffe-2/
caffe
1) i. using namespace caffe; ii. class Caffe { static void SetDevice(const int device_id); } → Caffe::SetDevice(gpus[0]); iii. package caffe; message Datum {…} → caffe::Datum datatest;
2) blob（http://blog.csdn.net/iamzhangzhuping/article/details/50445570）：なぜ data copy が必要な箇所があるのか？ gpu_data() and cpu_data() are used in cases where the data is used only as input and will not be modified by the algorithm. mutable_* is used when the data itself gets updated while running the algorithm. 次に、(1) Blob のデータに対する2回の操作が同じプロセッサ（CPU/GPU）で行われているかどうか、(2) 前の操作が Blob のデータを更新した可能性があるかどうか、に注目します。Whenever the data is called, check the previous function call and whether it used the same processor. If it uses the same processor, the data need not be copied. If it uses a different processor, there is a chance the data might have been altered by that call, and hence a data copy is required.
3) src/caffe/test 以下のファイルは、各 layer の順伝播（forward）・逆伝播（backward）が正しいかどうかをテストするためのものです。実行方法：make test、make runtest
4)データストアparallel.hpp
// Params<Dtype> (parallel.hpp, excerpt): flat buffers of size_ elements that
// hold the network parameters (data_) and their gradients (diff_).
// The members are protected so that derived classes (P2PSync) can re-export
// them with `using Params<Dtype>::size_;` etc.; with the class default
// (private) access those using-declarations would not compile.
template <typename Dtype>
class Params {
 protected:
  const size_t size_;  // Size of buffers
  Dtype* data_;        // Network parameters
  Dtype* diff_;        // Gradient
};
// P2PSync<Dtype> (parallel.hpp, excerpt): one node of the multi-GPU tree.
// It is a GPUParams (owns the device buffers), a Solver callback (hooks
// on_start / on_gradients_ready) and an InternalThread (runs a worker solver).
template <typename Dtype>
class P2PSync : public GPUParams<Dtype>, public Solver<Dtype>::Callback,
    public InternalThread {
 public:
  // Builds the GPU tree for `gpus` and runs training on all of them.
  void run(const vector<int>& gpus);

 protected:
  shared_ptr<Solver<Dtype> > solver_;
  // Params<Dtype> is a dependent base class, so its members must be brought
  // into scope explicitly to be usable unqualified in member functions.
  using Params<Dtype>::size_;
  using Params<Dtype>::data_;
  using Params<Dtype>::diff_;
};
blob.hppclass Blob {
shared_ptr<SyncedMemory> data_;
shared_ptr<SyncedMemory> diff_;
};
layer.hppprotected:
/** The vector that stores the learnable parameters as a set of blobs. */
// , blob
vector<shared_ptr<Blob<Dtype> > > blobs_;
void SetUp(const vector<Blob<Dtype>*>& bottom,
const vector<Blob<Dtype>*>& top)
net.hpp /// @brief the blobs storing intermediate results between the layer.
vector<shared_ptr<Blob<Dtype> > > blobs_;
// net init ,AppendTop
/*shared_ptr<Blob<Dtype> > blob_pointer(new Blob<Dtype>()); const int blob_id = blobs_.size(); blobs_.push_back(blob_pointer);*/
caffe.proto（./src/caffe/proto）[google protobuf]：protoc でコンパイルすると caffe.pb.cc と caffe.pb.h が生成されます（ヘッダは ./build/include/caffe/proto にあります）。
// message Datum (caffe.proto): one training example.
message Datum {
  optional int32 channels = 1;
  optional int32 height = 2;
  optional int32 width = 3;
  // the actual image data, in bytes
  optional bytes data = 4;
  optional int32 label = 5;
  // Optionally, the datum could also hold float data.
  repeated float float_data = 6;
  // If true data contains an encoded image that need to be decoded
  optional bool encoded = 7 [default = false];
}
メッセージで定義されている各フィールドには一意の数字ラベル（フィールド番号）があります。このラベルはバイナリ形式の中でフィールドを一意に識別するためのもので、そのメッセージ型を使い始めたら変更すべきではありません。強調すべき点として、1~15のフィールド番号はエンコード後に1バイトしか占めません（この1バイトにフィールド番号とフィールドタイプの両方が含まれます）。16~2047のフィールド番号は2バイトを占めます。したがって、1~15の番号は最も頻繁に出現する要素に使うべきです。また、将来頻繁に出現する要素が追加される可能性を考え、1~15の番号を一度にすべて使い切らず、余裕を残しておくべきです。
関連関数
// Accessors generated by protoc for message Datum (caffe.proto).
caffe::Datum testdata;
testdata.set_channels(3);       // setter: takes the new value
testdata.has_channels();        // whether the optional field has been set
testdata.clear_channels();      // resets the field and its "has" bit
testdata.channels();            // getter
testdata.float_data_size();     // number of elements in repeated float_data
// mutable_ accessors exist only for repeated, string and message fields
// (e.g. float_data), not for scalar int32 fields such as channels.
testdata.mutable_float_data();
メイン関数
./tool/caffe.cpp
// Command-line flags of the `caffe` tool, defined with gflags
// (usage notes: http://dreamrunner.org/blog/2014/03/09/gflags-jian-ming-shi-yong/).
// Each DEFINE_xxx(name, default, help) creates a FLAGS_name variable. Adjacent
// string literals in a help text are concatenated by the compiler, so each
// piece must carry its own separating space.
DEFINE_string(gpu, "",
    "Optional; run in GPU mode on given device IDs separated by ','. "
    "Use '-gpu all' to run on all available GPUs. The effective training "
    "batch size is multiplied by the number of devices.");
DEFINE_string(solver, "",
    "The solver definition protocol buffer text file.");
DEFINE_string(model, "",
    "The model definition protocol buffer text file.");
DEFINE_string(snapshot, "",
    "Optional; the snapshot solver state to resume training.");
DEFINE_string(weights, "",
    "Optional; the pretrained weights to initialize finetuning, "
    "separated by ','. Cannot be set simultaneously with snapshot.");
DEFINE_int32(iterations, 50,
    "The number of iterations to run.");
DEFINE_string(sigint_effect, "stop",
    "Optional; action to take when a SIGINT signal is received: "
    "snapshot, stop or none.");
DEFINE_string(sighup_effect, "snapshot",
    "Optional; action to take when a SIGHUP signal is received: "
    "snapshot, stop or none.");
// Entry point of tools/caffe.cpp: dispatches `caffe <command>` to the matching
// brew function (train / test / device_query / time) or prints usage.
int main(int argc, char** argv) {
  // Print output to stderr (while still logging).
  FLAGS_alsologtostderr = 1;
  // Usage message. A string literal cannot span physical source lines, so
  // line breaks are written as "\n" escapes and the pieces are joined by
  // adjacent-literal concatenation.
  gflags::SetUsageMessage("command line brew\n"
      "usage: caffe <command> <args>\n\n"
      "commands:\n"
      "  train           train or finetune a model\n"
      "  test            score a model\n"
      "  device_query    show GPU diagnostic information\n"
      "  time            benchmark model execution time");
  // Run tool or show usage. GlobalInit is defined in common.cpp; presumably it
  // consumes the parsed --flags from argv, so argc == 2 means exactly one
  // command word remains.
  caffe::GlobalInit(&argc, &argv);
  if (argc == 2) {
    return GetBrewFunction(caffe::string(argv[1]))();
  } else {
    gflags::ShowUsageWithFlagsRestrict(argv[0], "tools/caffe");
  }
  // Falling off the end of main() returns 0.
}
// Excerpt of train() from tools/caffe.cpp: creates the solver, optionally
// restores a snapshot or copies pretrained weights, then trains on one device
// or on several GPUs through P2PSync. `gpus` and `solver_param` are set up in
// code elided from this excerpt.
int train() {
// One solver per GPU; the count is read back via Caffe::solver_count(), e.g. to
// average gradients in P2PSync::on_gradients_ready() and by DataReader.
Caffe::set_solver_count(gpus.size());
shared_ptr<caffe::Solver<float> > solver(caffe::SolverRegistry<float>::CreateSolver(solver_param));
// Solver constructors:
//explicit Solver(const SolverParameter& param, const Solver* root_solver = NULL);
//explicit Solver(const string& param_file, const Solver* root_solver = NULL);
// CreateSolver() builds the concrete solver type named in solver_param
// (e.g. SGD) via the solver registry; the result is held in a shared_ptr.
// Either resume the full solver state from a snapshot, or only initialize the
// net weights from pretrained model file(s).
if (FLAGS_snapshot.size()) {
LOG(INFO) << "Resuming from " << FLAGS_snapshot;
solver->Restore(FLAGS_snapshot.c_str());
} else if (FLAGS_weights.size()) {
CopyLayers(solver.get(), FLAGS_weights);
}
// Multi-GPU: P2PSync builds the GPU tree and runs one solver per GPU.
// Otherwise the single solver runs on this thread.
if (gpus.size() > 1) {
caffe::P2PSync<float> sync(solver, NULL, solver->param());
sync.run(gpus);
} else {
LOG(INFO) << "Starting Optimization";
solver->Solve();
}
LOG(INFO) << "Optimization Done.";
return 0;
}
Update：Step()関数では、反復ごとに
ApplyUpdate()
(class SGDSolver)->Update()
(class net)を呼び出します。template <typename Dtype>
void Net<Dtype>::Update() {
  // Calls Blob::Update() on every learnable parameter blob, in index order.
  // learnable_params_ lists only "owner" params, so shared params are
  // updated exactly once.
  const size_t num_learnable = learnable_params_.size();
  for (size_t idx = 0; idx != num_learnable; ++idx) {
    Blob<Dtype>* const param_blob = learnable_params_[idx];
    param_blob->Update();
  }
}
->Update()
(class Blob)
solver.cpp
./src/caffe/solver.cpp
// Excerpt of Solver::Init (solver.cpp).
template <typename Dtype>
void Solver<Dtype>::Init(const SolverParameter& param) {
  // Non-root (worker) solvers are created with a pointer to the root solver
  // (see the WorkerSolver construction in P2PSync's constructor); only the
  // root solver may leave root_solver_ unset.
  CHECK(Caffe::root_solver() || root_solver_)
      << "root_solver_ needs to be set for all non-root solvers";
  // ... (remainder of Init() elided in this excerpt)
}
// Caffe::root_solver(): http://blog.csdn.net/apsvvfb/article/details/50542863
//   root_solver() { return Get().root_solver_; }
//   Get() creates the Caffe instance (presumably only if none exists yet):
//     thread_instance_.reset(new Caffe());
// Excerpt of Solver::InitTrainNet (solver.cpp); net_param is built from the
// solver parameters in code elided here.
template <typename Dtype>
void Solver<Dtype>::InitTrainNet() {
  if (Caffe::root_solver()) {
    // Root solver: build a standalone training net.
    // Constructor (net.hpp):
    //   Net(const NetParameter& param, const Net* root_net = NULL);
    // net_ is a shared_ptr<Net<Dtype> >. reset() drops net_'s previous object
    // (destroying it if net_ was its last owner) and takes ownership of the
    // new one.
    net_.reset(new Net<Dtype>(net_param));
  } else {
    // Worker solver: pass the root solver's net so Net::Init can share
    // layers from it (its share_from_root branch).
    net_.reset(new Net<Dtype>(net_param, root_solver_->net_.get()));
  }
}
// Runs `iters` training iterations (excerpt of Solver::Step from solver.cpp;
// the excerpt may omit parts of the upstream function).
// Each iteration: clear gradients -> optional test pass (root solver only)
// -> on_start() callbacks -> iter_size x ForwardBackward() -> loss smoothing
// -> on_gradients_ready() callbacks -> ApplyUpdate() -> ++iter_.
// In multi-GPU training the callbacks are P2PSync::on_start (broadcast
// parameters down the GPU tree) and P2PSync::on_gradients_ready (reduce
// gradients up the tree).
template <typename Dtype>
void Solver<Dtype>::Step(int iters) {
// Never filled: ForwardBackward() gets no external input blobs, so the net
// presumably reads its input from its own data layers.
vector<Blob<Dtype>*> bottom_vec;
const int start_iter = iter_;
const int stop_iter = iter_ + iters;
// Window size for the smoothed (running average) loss.
int average_loss = this->param_.average_loss();
vector<Dtype> losses;
Dtype smoothed_loss = 0;
while (iter_ < stop_iter) {
// zero-init the params
net_->ClearParamDiffs();
if (param_.test_interval() && iter_ % param_.test_interval() == 0
&& (iter_ > 0 || param_.test_initialization())
&& Caffe::root_solver()) {
TestAll();
if (requested_early_exit_) {
// Break out of the while loop because stop was requested while testing.
break;
}
}
for (int i = 0; i < callbacks_.size(); ++i) {
callbacks_[i]->on_start();
}
const bool display = param_.display() && iter_ % param_.display() == 0;
net_->set_debug_info(display && param_.debug_info());
// accumulate the loss and gradient
Dtype loss = 0;
for (int i = 0; i < param_.iter_size(); ++i) {
loss += net_->ForwardBackward(bottom_vec);
// bottom_vec is empty (see above).
/* From net.hpp:
   Dtype ForwardBackward(const vector<Blob<Dtype>* > & bottom) {
     Dtype loss;
     Forward(bottom, &loss);  // defined in net.cpp
     Backward();
     return loss;
   } */
}
loss /= param_.iter_size();
// average the loss across iterations for smoothed reporting
// Until the window is full: running mean. Afterwards: losses is a ring
// buffer, and the slot being overwritten is swapped out of the mean.
if (losses.size() < average_loss) {
losses.push_back(loss);
int size = losses.size();
smoothed_loss = (smoothed_loss * (size - 1) + loss) / size;
} else {
int idx = (iter_ - start_iter) % average_loss;
smoothed_loss += (loss - losses[idx]) / average_loss;
losses[idx] = loss;
}
for (int i = 0; i < callbacks_.size(); ++i) {
callbacks_[i]->on_gradients_ready();
}
ApplyUpdate();
// Increment the internal iter_ counter -- its value should always indicate
// the number of times the weights have been updated.
++iter_;
}
}
4. parallel.hpp（GPU は4つ、0,1,2,3 とします）
// Members of Params<Dtype> (parallel.hpp), repeated from the class excerpt
// above: flat device buffers of size_ elements each.
const size_t size_; // Size of buffers
Dtype* data_; // Network parameters
Dtype* diff_; // Gradient
// Builds one node of the multi-GPU tree on device param.device_id().
// parent == NULL: root node, which reuses root_solver.
// Otherwise: creates a WorkerSolver for this device, enables peer access to
// the parent's device if possible, and allocates on the parent's device the
// buffer (parent_grads_) this node sends its gradients into.
template<typename Dtype>
P2PSync<Dtype>::P2PSync(shared_ptr<Solver<Dtype> > root_solver,
P2PSync<Dtype>* parent, const SolverParameter& param)
: GPUParams<Dtype>(root_solver, param.device_id()),
parent_(parent),
children_(),
queue_(),
initial_iter_(root_solver->iter()),
solver_() {
#ifndef CPU_ONLY
// Remember the caller's current device; it is restored at the end.
int initial_device;
CUDA_CHECK(cudaGetDevice(&initial_device));
const int self = param.device_id();
CUDA_CHECK(cudaSetDevice(self));
if (parent == NULL) {
solver_ = root_solver;
} else {
// Clear the root flag while the worker solver is constructed, so code that
// checks Caffe::root_solver() during construction (e.g. Solver::Init,
// Solver::InitTrainNet) takes the non-root path.
Caffe::set_root_solver(false);
solver_.reset(new WorkerSolver<Dtype>(param, root_solver.get()));
Caffe::set_root_solver(true);
}
// GPUParams::configure is not shown here; presumably it points the solver's
// parameter/gradient storage at this object's data_/diff_ buffers.
// TODO confirm against parallel.cpp.
this->configure(solver_.get());
// Register this object so Solver::Step() calls on_start() and
// on_gradients_ready() on it every iteration.
solver_->add_callback(this);
if (parent) {
// Enable p2p access between devices
const int peer = parent->solver_->param().device_id();
int access;
CUDA_CHECK(cudaDeviceCanAccessPeer(&access, self, peer));
if (access) {
CUDA_CHECK(cudaDeviceEnablePeerAccess(peer, 0));
} else {
LOG(INFO)<< "GPU " << self << " does not have p2p access to GPU " << peer;
}
// Allocate receiving buffer on parent
// (the parent adds this buffer into its diff_ in on_gradients_ready()).
CUDA_CHECK(cudaSetDevice(peer));
CUDA_CHECK(cudaMalloc(&parent_grads_, size_ * sizeof(Dtype)));
CUDA_CHECK(cudaSetDevice(self));
}
CUDA_CHECK(cudaSetDevice(initial_device));
#else
NO_GPU;
#endif
}
// Solver callback run at the start of each iteration: parameter broadcast
// down the GPU tree. A non-root node first blocks until its parent has copied
// the current parameters into this node's data_; every node then copies its
// data_ into each child's data_ and wakes that child.
template<typename Dtype>
void P2PSync<Dtype>::on_start() {
#ifndef CPU_ONLY
#ifdef DEBUG
int device;
CUDA_CHECK(cudaGetDevice(&device));
CHECK(device == solver_->param().device_id());
#else
// CHECK(false);
#endif
// Wait for update from parent
if (parent_) {
// Blocks (BlockingQueue::pop, blocking_queue.cpp) until the parent pushes
// itself onto this queue in the "Update children" loop below, i.e. after
// the parent's data_ has been copied into this node's data_.
// Worker nodes reach this from P2PSync::InternalThreadEntry(), which runs
// solver_->Step(solver_->param().max_iter() - initial_iter_) on the thread
// started by run(); the root node runs solver_->Solve() on the caller's
// thread (see run()).
// Example with 4 GPUs (tree 0->1, 0->2, 2->3): GPU1, GPU2 and GPU3 wait here.
// GPU0 has no parent, skips the wait, copies its data to GPU1 and GPU2 and
// pushes itself onto their queues. GPU2 then does the same for its child GPU3.
P2PSync<Dtype> *parent = queue_.pop();
CHECK(parent == parent_);
}
// Update children
for (int i = children_.size() - 1; i >= 0; i--) {
Dtype* src = data_;
Dtype* dst = children_[i]->data_;
// Copy this node's parameters into the child's parameter buffer.
CUDA_CHECK(cudaMemcpyAsync(dst, src, size_ * sizeof(Dtype),
cudaMemcpyDeviceToDevice, cudaStreamDefault));
CUDA_CHECK(cudaStreamSynchronize(cudaStreamDefault));
// Wake the child waiting in its own on_start().
children_[i]->queue_.push(this);
}
#endif
}
// Solver callback run after forward/backward: gradient reduction up the GPU
// tree. Waits for every child, adds each child's gradients (which the child
// copied into its parent_grads_ buffer on this device) into diff_. A non-root
// node then sends the sum to its parent; the root divides by the solver count.
template<typename Dtype>
void P2PSync<Dtype>::on_gradients_ready() {
#ifndef CPU_ONLY
#ifdef DEBUG
int device;
CUDA_CHECK(cudaGetDevice(&device));
CHECK(device == solver_->param().device_id());
#endif
// Sum children gradients as they appear in the queue
// (in order of arrival, not in children_ order).
for (int i = 0; i < children_.size(); ++i) {
P2PSync<Dtype> *child = queue_.pop();
Dtype* src = child->parent_grads_;
Dtype* dst = diff_;
#ifdef DEBUG
bool ok = false;
for (int j = 0; j < children_.size(); ++j) {
if (child == children_[j]) {
ok = true;
}
}
CHECK(ok);
cudaPointerAttributes attributes;
CUDA_CHECK(cudaPointerGetAttributes(&attributes, src));
CHECK(attributes.device == device);
CUDA_CHECK(cudaPointerGetAttributes(&attributes, dst));
CHECK(attributes.device == device);
#endif
caffe_gpu_add(size_, src, dst, dst);//dst=dst+src
}
// Send gradients to parent
if (parent_) {
Dtype* src = diff_;
Dtype* dst = parent_grads_;
// Copy the accumulated gradients into the buffer allocated on the parent's
// device, then wake the parent waiting in this loop on its own thread.
CUDA_CHECK(cudaMemcpyAsync(dst, src, size_ * sizeof(Dtype),
cudaMemcpyDeviceToDevice, cudaStreamDefault));
CUDA_CHECK(cudaStreamSynchronize(cudaStreamDefault));
parent_->queue_.push(this);
} else {
// Loss functions divide gradients by the batch size, so to compensate
// for split batch, the root solver divides by number of solvers.
caffe_gpu_scal(size_, Dtype(1.0 / Caffe::solver_count()), diff_);
}
#endif
}
// Builds the GPU tree (one P2PSync per extra GPU), starts a worker thread per
// extra GPU, runs the root solver on the calling thread, then stops the
// workers. `this` is the root node (created in caffe.cpp's train() as
// caffe::P2PSync<float> sync(solver, NULL, solver->param())).
template<typename Dtype>
void P2PSync<Dtype>::run(const vector<int>& gpus) {
// Pair devices for map-reduce synchronization
vector<DevicePair> pairs;//DevicePair[parent,device]
DevicePair::compute(gpus, &pairs);
// Example for 4 GPUs (0,1,2,3), as [parent, device]:
//   pairs = [(-1,0), (0,1), (2,3), (0,2)]
// (-1,0) marks GPU0 as the root. The actual pairing is chosen by
// DevicePair::compute() (not shown) and may depend on the hardware.
ostringstream s;
for (int i = 1; i < pairs.size(); ++i) {
s << (i == 1 ? "" : ", ") << pairs[i].parent() << ":" << pairs[i].device();
}
LOG(INFO)<< "GPUs pairs " << s.str();
SolverParameter param(solver_->param());
vector<shared_ptr<P2PSync<Dtype> > > syncs(gpus.size());
// Build the GPU tree by finding the parent for each solver
// Repeated passes: a node can only be created once its parent exists, so a
// pair whose parent has not been created yet is retried on a later pass.
for (int attempts = 0; attempts < pairs.size(); ++attempts) {
for (int i = 1; i < pairs.size(); ++i) {
if (!syncs[i].get()) {// syncs[i] is still empty (not created yet).
P2PSync<Dtype>* parent = NULL;
for (int j = 0; j < syncs.size(); ++j) {
// Index 0 stands for the root, i.e. `this`.
P2PSync<Dtype>* sync = j == 0 ? this : syncs[j].get();
if (sync) {
const SolverParameter& p = sync->solver()->param();
if (p.device_id() == pairs[i].parent()) {
parent = sync;
}
}
}
if (parent) {
param.set_device_id(pairs[i].device());
syncs[i].reset(new P2PSync<Dtype>(solver_, parent, param));
parent->children_.push_back((P2PSync<Dtype>*) syncs[i].get());
}
}
}
}
// Resulting tree for the example pairs above:
//   syncs[1]: GPU1, parent = this (GPU0)       (created on attempt 0)
//   syncs[3]: GPU2, parent = this (GPU0)       (created on attempt 0)
//   syncs[2]: GPU3, parent = syncs[3] (GPU2)   (created on attempt 1, because
//             GPU2 did not exist yet when i == 2 was tried on attempt 0)
//   syncs[0] stays empty: the root (GPU0) is `this`.
LOG(INFO)<< "Starting Optimization";
for (int i = 1; i < syncs.size(); ++i) {
syncs[i]->StartInternalThread();
// The thread (InternalThread::thread_, held by a shared_ptr) runs
// P2PSync<Dtype>::InternalThreadEntry() in parallel.cpp, which calls
// solver_->Step(solver_->param().max_iter() - initial_iter_).
}
// Run root solver on current thread
solver_->Solve();
for (int i = 1; i < syncs.size(); ++i) {
syncs[i]->StopInternalThread();
}
}
5.blocking_queue.cpptemplate<typename T>
class BlockingQueue<T>::sync {
public:
mutable boost::mutex mutex_;
boost::condition_variable condition_;
};
// Allocates the mutex/condition-variable state shared by push() and pop().
template<typename T>
BlockingQueue<T>::BlockingQueue()
: sync_(new sync()) {
}
// Appends t under the mutex, then wakes one thread blocked in pop(), if any.
template<typename T>
void BlockingQueue<T>::push(const T& t) {
boost::mutex::scoped_lock lock(sync_->mutex_);
queue_.push(t);
lock.unlock();
// Notify after releasing the lock so the woken consumer can take the mutex
// right away.
sync_->condition_.notify_one();
}
// Removes and returns the front element, blocking while the queue is empty.
// If log_on_wait is non-empty, it is logged while waiting, rate-limited with
// LOG_EVERY_N(INFO, 1000).
template<typename T>
T BlockingQueue<T>::pop(const string& log_on_wait) {
boost::mutex::scoped_lock lock(sync_->mutex_);
// A loop rather than an `if`: after every wake-up the emptiness check is
// repeated before proceeding.
while (queue_.empty()) {
if (!log_on_wait.empty()) {
LOG_EVERY_N(INFO, 1000)<< log_on_wait;
}
sync_->condition_.wait(lock);
// wait() releases the mutex while sleeping and holds it again on return.
}
// The queue is non-empty and the mutex is held: take the front element.
T t = queue_.front();
queue_.pop();
return t;
}
6. net（net.hpp）
// Excerpt of class Net (net.hpp): constructors and data members.
// root_net is NULL for the root solver's net; worker nets receive the root's
// net so layers can be shared (see root_net_ below).
explicit Net(const NetParameter& param, const Net* root_net = NULL);
explicit Net(const string& param_file, Phase phase,
const Net* root_net = NULL);
/// @brief The network name
string name_;
/// @brief The phase: TRAIN or TEST
Phase phase_;
/// @brief Individual layers in the net
vector<shared_ptr<Layer<Dtype> > > layers_;
vector<string> layer_names_;
map<string, int> layer_names_index_;
vector<bool> layer_need_backward_;
/// @brief the blobs storing intermediate results between the layer.
vector<shared_ptr<Blob<Dtype> > > blobs_;
vector<string> blob_names_;
map<string, int> blob_names_index_;
vector<bool> blob_need_backward_;
/// bottom_vecs stores the vectors containing the input for each layer.
/// They don't actually host the blobs (blobs_ does), so we simply store
/// pointers.
vector<vector<Blob<Dtype>*> > bottom_vecs_;
vector<vector<int> > bottom_id_vecs_;
vector<vector<bool> > bottom_need_backward_;
/// top_vecs stores the vectors containing the output for each layer
vector<vector<Blob<Dtype>*> > top_vecs_;
vector<vector<int> > top_id_vecs_;
/// Vector of weight in the loss (or objective) function of each net blob,
/// indexed by blob_id.
vector<Dtype> blob_loss_weights_;
vector<vector<int> > param_id_vecs_;
vector<int> param_owners_;
vector<string> param_display_names_;
vector<pair<int, int> > param_layer_indices_;
map<string, int> param_names_index_;
/// blob indices for the input and the output of the net
vector<int> net_input_blob_indices_;// the declared net inputs: top blobs of the fake layer -1 (see Net::Init)
vector<int> net_output_blob_indices_;
/* How the outputs are found during Net::Init:
   AppendTop(layer_id = -1) for each net input   // available_blobs->insert(blob_name)
   for (each layer) {
     AppendBottom();  // available_blobs->erase(blob_name): the blob is consumed
     AppendTop();     // available_blobs->insert(blob_name): the blob is produced
   }
   Blobs still left in available_blobs at the end become the net outputs
   (net_output_blob_indices_). */
vector<Blob<Dtype>*> net_input_blobs_;
vector<Blob<Dtype>*> net_output_blobs_;
/// The parameters in the network.
vector<shared_ptr<Blob<Dtype> > > params_;
vector<Blob<Dtype>*> learnable_params_;
/** * The mapping from params_ -> learnable_params_: we have * learnable_param_ids_.size() == params_.size(), * and learnable_params_[learnable_param_ids_[i]] == params_[i].get() * if and only if params_[i] is an "owner"; otherwise, params_[i] is a sharer * and learnable_params_[learnable_param_ids_[i]] gives its owner. */
vector<int> learnable_param_ids_;
/// the learning rate multipliers for learnable_params_
vector<float> params_lr_;
vector<bool> has_params_lr_;
/// the weight decay multipliers for learnable_params_
vector<float> params_weight_decay_;
vector<bool> has_params_decay_;
/// The bytes of memory used by this net
size_t memory_used_;
/// Whether to compute and display debug info for the net.
bool debug_info_;
/// The root net that actually holds the shared layers in data parallelism
const Net* const root_net_;
net.cpptemplate <typename Dtype>
Net<Dtype>::Net(const NetParameter& param, const Net* root_net)
: root_net_(root_net) {
Init(param);
}
// Excerpt of Net::Init (net.cpp): wires up the layers and their bottom/top
// blobs, runs each layer's SetUp, and records loss weights, params and
// backward flags.
template <typename Dtype>
void Net<Dtype>::Init(const NetParameter& in_param) {
  // NOTE(excerpt): code elided from this excerpt derives `param` from
  // `in_param` and declares `available_blobs`, `blob_name_to_idx`, and the
  // per-layer locals `layer_param`, `share_from_root` and `need_backward`
  // used below.

  // set the input blobs
  for (int input_id = 0; input_id < param.input_size(); ++input_id) {
    const int layer_id = -1;  // inputs have fake layer ID -1
    AppendTop(param, layer_id, input_id, &available_blobs, &blob_name_to_idx);
  }
  for (int layer_id = 0; layer_id < param.layer_size(); ++layer_id) {
    if (share_from_root) {
      // Worker nets reuse the root net's layer object.
      LOG(INFO) << "Sharing layer " << layer_param.name() << " from root net";
      layers_.push_back(root_net_->layers_[layer_id]);
      layers_[layer_id]->SetShared(true);
    } else {
      // Create the layer through the layer registry (layer_factory.hpp).
      layers_.push_back(LayerRegistry<Dtype>::CreateLayer(layer_param));
    }
    // Figure out this layer's input and output
    for (int bottom_id = 0; bottom_id < layer_param.bottom_size();
         ++bottom_id) {
      const int blob_id = AppendBottom(param, layer_id, bottom_id,
                                       &available_blobs, &blob_name_to_idx);
      // If a blob needs backward, this layer should provide it.
      need_backward |= blob_need_backward_[blob_id];
    }
    int num_top = layer_param.top_size();
    for (int top_id = 0; top_id < num_top; ++top_id) {
      AppendTop(param, layer_id, top_id, &available_blobs, &blob_name_to_idx);
    }
    // If the layer specifies that AutoTopBlobs() -> true and the LayerParameter
    // specified fewer than the required number (as specified by
    // ExactNumTopBlobs() or MinTopBlobs()), allocate them here.
    Layer<Dtype>* layer = layers_[layer_id].get();
    if (layer->AutoTopBlobs()) {
      const int needed_num_top =
          std::max(layer->MinTopBlobs(), layer->ExactNumTopBlobs());
      for (; num_top < needed_num_top; ++num_top) {
        // Add "anonymous" top blobs -- do not modify available_blobs or
        // blob_name_to_idx as we don't want these blobs to be usable as input
        // to other layers.
        AppendTop(param, layer_id, num_top, NULL, NULL);
      }
    }
    // After this layer is connected, set it up.
    if (share_from_root) {
      // ..... (shared-layer handling elided in this excerpt)
    } else {
      // Layer::SetUp (layer.hpp) runs the layer's fixed setup sequence.
      layers_[layer_id]->SetUp(bottom_vecs_[layer_id], top_vecs_[layer_id]);
    }
    LOG_IF(INFO, Caffe::root_solver())
        << "Setting up " << layer_names_[layer_id];
    for (int top_id = 0; top_id < top_vecs_[layer_id].size(); ++top_id) {
      if (blob_loss_weights_.size() <= top_id_vecs_[layer_id][top_id]) {
        blob_loss_weights_.resize(top_id_vecs_[layer_id][top_id] + 1, Dtype(0));
      }
      blob_loss_weights_[top_id_vecs_[layer_id][top_id]] = layer->loss(top_id);
      LOG_IF(INFO, Caffe::root_solver())
          << "Top shape: " << top_vecs_[layer_id][top_id]->shape_string();
      if (layer->loss(top_id)) {
        LOG_IF(INFO, Caffe::root_solver())
            << "    with loss weight " << layer->loss(top_id);
      }
      // memory_used_ counts elements; it is converted to bytes when logged.
      memory_used_ += top_vecs_[layer_id][top_id]->count();
    }
    LOG_IF(INFO, Caffe::root_solver())
        << "Memory required for data: " << memory_used_ * sizeof(Dtype);
    const int param_size = layer_param.param_size();
    const int num_param_blobs = layers_[layer_id]->blobs().size();
    CHECK_LE(param_size, num_param_blobs)
        << "Too many params specified for layer " << layer_param.name();
    ParamSpec default_param_spec;
    for (int param_id = 0; param_id < num_param_blobs; ++param_id) {
      const ParamSpec* param_spec = (param_id < param_size) ?
          &layer_param.param(param_id) : &default_param_spec;
      // A parameter with lr_mult == 0 is frozen and needs no gradient.
      const bool param_need_backward = param_spec->lr_mult() != 0;
      need_backward |= param_need_backward;
      layers_[layer_id]->set_param_propagate_down(param_id,
                                                  param_need_backward);
    }
    for (int param_id = 0; param_id < num_param_blobs; ++param_id) {
      AppendParam(param, layer_id, param_id);
    }
    // Finally, set the backward flag
    layer_need_backward_.push_back(need_backward);
    if (need_backward) {
      for (int top_id = 0; top_id < top_id_vecs_[layer_id].size(); ++top_id) {
        blob_need_backward_[top_id_vecs_[layer_id][top_id]] = true;
      }
    }
  }  // end of for (int layer_id = 0; ...)
  // ... (remainder of Init() elided in this excerpt)
}
// Copies the caller-supplied input blobs into the net's input blobs, then runs
// the forward pass via ForwardPrefilled(). When called from Solver::Step()
// (solver.cpp), `bottom` is empty, so nothing is copied.
template <typename Dtype>
const vector<Blob<Dtype>*>& Net<Dtype>::Forward(
    const vector<Blob<Dtype>*> & bottom, Dtype* loss) {
  // Copy bottom to internal bottom
  for (size_t i = 0; i < bottom.size(); ++i) {
    net_input_blobs_[i]->CopyFrom(*bottom[i]);
  }
  return ForwardPrefilled(loss);
}
// ForwardPrefilled() runs the whole net as ForwardFromTo(0, layers_.size() - 1).
// Runs Forward() on layers start..end (both inclusive), in order, and returns
// the sum of the losses they report.
template <typename Dtype>
Dtype Net<Dtype>::ForwardFromTo(int start, int end) {
  Dtype total_loss = 0;
  int layer_idx = start;
  while (layer_idx <= end) {
    total_loss += layers_[layer_idx]->Forward(bottom_vecs_[layer_idx],
                                              top_vecs_[layer_idx]);
    if (debug_info_) {
      ForwardDebugInfo(layer_idx);
    }
    ++layer_idx;
  }
  return total_loss;
}
7.layer_factory.hppnamespace caffe {
template <typename Dtype>
class LayerRegistry {
public:
typedef shared_ptr<Layer<Dtype> > (*Creator)(const LayerParameter&);
typedef std::map<string, Creator> CreatorRegistry;
//typedef http://www.kuqin.com/language/20090322/41866.html
//Creator , shared_ptr<Layer<Dtype> >
static CreatorRegistry& Registry() {
static CreatorRegistry* g_registry_ = new CreatorRegistry();
return *g_registry_;
}
// Adds a creator.
static void AddCreator(const string& type, Creator creator) {
CreatorRegistry& registry = Registry();
CHECK_EQ(registry.count(type), 0)
<< "Layer type " << type << " already registered.";
registry[type] = creator;
}
// Get a layer using a LayerParameter.
static shared_ptr<Layer<Dtype> > CreateLayer(const LayerParameter& param) {
if (Caffe::root_solver()) {
LOG(INFO) << "Creating layer " << param.name();
}
const string& type = param.type();
CreatorRegistry& registry = Registry();
CHECK_EQ(registry.count(type), 1) << "Unknown layer type: " << type
<< " (known types: " << LayerTypeListString() << ")";
return registry[type](param);
// new layer 。DataLayer,ConvolutionLayer,...
}
};
template <typename Dtype>
class LayerRegisterer {
public:
LayerRegisterer(const string& type,
shared_ptr<Layer<Dtype> > (*creator)(const LayerParameter&)) {
LayerRegistry<Dtype>::AddCreator(type, creator);
}
};
#define REGISTER_LAYER_CREATOR(type, creator) \
static LayerRegisterer<float> g_creator_f_##type(#type, creator<float>);\
static LayerRegisterer<double> g_creator_d_##type(#type, creator<double>)\
#define REGISTER_LAYER_CLASS(type)\
template <typename Dtype>\
shared_ptr<Layer<Dtype> > Creator_##type##Layer(const LayerParameter& param) \
{ \
return shared_ptr<Layer<Dtype> >(new type##Layer<Dtype>(param));\
}\
REGISTER_LAYER_CREATOR(type, Creator_##type##Layer)
// REGISTER_LAYER_CLASS,REGISTER_LAYER_CREATOR
// LayerRegisterer
//->AddCreator->registry[type] = creator
// , CreateLayer .return registry[type](param);
// : data_layer.cpp REGISTER_LAYER_CLASS(Data);
//REGISTER_LAYER_CLASS(Data) namespace caffe , static .
// namespace caffe , REGISTER_LAYER_CLASS(Data). static / (?)
} // namespace caffe
8.layer.hppclass Layer {
public:
explicit Layer(const LayerParameter& param)
: layer_param_(param), is_shared_(false) {
// Set phase and copy blobs (if there are any).
phase_ = param.phase();
if (layer_param_.blobs_size() > 0) {
blobs_.resize(layer_param_.blobs_size());
for (int i = 0; i < layer_param_.blobs_size(); ++i) {
blobs_[i].reset(new Blob<Dtype>());
blobs_[i]->FromProto(layer_param_.blobs(i));
}
}
}
void SetUp(const vector<Blob<Dtype>*>& bottom,
const vector<Blob<Dtype>*>& top) {
InitMutex();
CheckBlobCounts(bottom, top);
LayerSetUp(bottom, top);// 。 layer( ) 。
Reshape(bottom, top);
SetLossWeights(top);
}
9.blobclass Blob {
public:
const Dtype* cpu_data() const;
void set_cpu_data(Dtype* data);
const Dtype* gpu_data() const;
const Dtype* cpu_diff() const;
const Dtype* gpu_diff() const;
Dtype* mutable_cpu_data();
Dtype* mutable_gpu_data();
Dtype* mutable_cpu_diff();
Dtype* mutable_gpu_diff();
protected:
shared_ptr<SyncedMemory> data_;
shared_ptr<SyncedMemory> diff_;
}
Dtype* Blob<Dtype>::mutable_cpu_data() {
CHECK(data_);
return static_cast<Dtype*>(data_->mutable_cpu_data());
}
// Excerpt of class SyncedMemory (syncedmem.hpp): one buffer of size_ bytes
// mirrored in CPU (cpu_ptr_) and GPU (gpu_ptr_) memory. head_ records which
// copy is current; to_cpu()/to_gpu() copy only when needed. Some members used
// by the implementation (e.g. cpu_malloc_use_cuda_) are not shown here.
class SyncedMemory {
public:
const void* cpu_data();
void set_cpu_data(void* data);
const void* gpu_data();
void* mutable_cpu_data();
void* mutable_gpu_data();
// UNINITIALIZED: nothing allocated yet; HEAD_AT_CPU / HEAD_AT_GPU: that copy
// is the current one; SYNCED: both copies hold the same data.
enum SyncedHead { UNINITIALIZED, HEAD_AT_CPU, HEAD_AT_GPU, SYNCED };
SyncedHead head() { return head_; }
size_t size() { return size_; }
private:
void to_cpu();
void to_gpu();
void* cpu_ptr_;
void* gpu_ptr_;
size_t size_;
SyncedHead head_;
// Whether cpu_ptr_ was allocated by this object (vs. supplied via set_cpu_data).
bool own_cpu_data_;
DISABLE_COPY_AND_ASSIGN(SyncedMemory);
};
// Returns a writable CPU pointer. The CPU copy is first brought up to date
// (to_cpu()); head_ is then set to HEAD_AT_CPU because the caller may modify
// the data, which would leave any GPU copy stale.
void* SyncedMemory::mutable_cpu_data() {
to_cpu();
head_ = HEAD_AT_CPU;
return cpu_ptr_;
}
// Makes the CPU copy current, allocating and copying only when needed.
inline void SyncedMemory::to_cpu() {
switch (head_) {
// First access: allocate host memory, zero it, and make the CPU copy the head.
case UNINITIALIZED:
CaffeMallocHost(&cpu_ptr_, size_, &cpu_malloc_use_cuda_);
caffe_memset(size_, 0, cpu_ptr_);
head_ = HEAD_AT_CPU;
own_cpu_data_ = true;
break;
// GPU copy is newer: allocate host memory if needed, copy GPU -> CPU; both
// copies are then identical.
case HEAD_AT_GPU:
#ifndef CPU_ONLY
if (cpu_ptr_ == NULL) {
CaffeMallocHost(&cpu_ptr_, size_, &cpu_malloc_use_cuda_);
own_cpu_data_ = true;
}
caffe_gpu_memcpy(size_, gpu_ptr_, cpu_ptr_);
head_ = SYNCED;
#else
NO_GPU;
#endif
break;
// The CPU copy is already current: no copy needed.
case HEAD_AT_CPU:
case SYNCED:
break;
}
}
10. data_layer（http://blog.csdn.net/iamzhangzhuping/article/details/50582503）
継承:
Layer -> BaseDataLayer；(BaseDataLayer, InternalThread) -> BasePrefetchingDataLayer -> DataLayer
初期化:
//net.cpp
layers_.push_back(LayerRegistry<Dtype>::CreateLayer(layer_param));
// Construction runs, in order: (1) the BasePrefetchingDataLayer base
// constructor, then (2) the reader_ member (a DataReader).
template <typename Dtype>
DataLayer<Dtype>::DataLayer(const LayerParameter& param)
  : BasePrefetchingDataLayer<Dtype>(param),
    reader_(param) {
}
1.BasePrefetch DataLayerBasePrefetchingDataLayer<Dtype>::BasePrefetchingDataLayer(
const LayerParameter& param)
: BaseDataLayer<Dtype>(param),
prefetch_free_(), prefetch_full_() {
for (int i = 0; i < PREFETCH_COUNT; ++i) {
prefetch_free_.push(&prefetch_[i]);
}
}
実装順序は(1)BaseDataLayer(param)です。BaseDataLayer<Dtype>::BaseDataLayer(const LayerParameter& param)
: Layer<Dtype>(param),
transform_param_(param.transform_param()) {
}
(2)prefetch_free()prefetch_フルウ() // Prefetches batches (asynchronously if to GPU memory)
static const int PREFETCH_COUNT = 3;
Batch<Dtype> prefetch_[PREFETCH_COUNT];
BlockingQueue<Batch<Dtype>*> prefetch_free_;
BlockingQueue<Batch<Dtype>*> prefetch_full_;
(3)関数の内部: for (int i = 0; i < PREFETCH_COUNT; ++i) { prefetch_free_.push(&prefetch_[i]); }
2.reader_/************* data_reader.cpp *********************** /** * @brief Reads data from a source to queues available to data layers. * A single reading thread is created per source, even if multiple solvers * are running in parallel, e.g. for multi-GPU training. This makes sure * databases are read sequentially, and that each solver accesses a different * subset of the database. Data is distributed to solvers in a round-robin * way to keep parallel training deterministic. */
class DataReader {
public:
explicit DataReader(const LayerParameter& param);
~DataReader();
inline BlockingQueue<Datum*>& free() const {
return queue_pair_->free_;
}
inline BlockingQueue<Datum*>& full() const {
return queue_pair_->full_;
}
protected:
// Queue pairs are shared between a body and its readers
class QueuePair {
public:
explicit QueuePair(int size);
~QueuePair();
BlockingQueue<Datum*> free_;
BlockingQueue<Datum*> full_;
DISABLE_COPY_AND_ASSIGN(QueuePair);
};
// A single body is created per source
class Body : public InternalThread {
public:
explicit Body(const LayerParameter& param);
virtual ~Body();
protected:
void InternalThreadEntry();
void read_one(db::Cursor* cursor, QueuePair* qp);
const LayerParameter param_;
BlockingQueue<shared_ptr<QueuePair> > new_queue_pairs_;
friend class DataReader;
DISABLE_COPY_AND_ASSIGN(Body);
};
// A source is uniquely identified by its layer name + path, in case
// the same database is read from two different locations in the net.
static inline string source_key(const LayerParameter& param) {
return param.name() + ":" + param.data_param().source();
}
const shared_ptr<QueuePair> queue_pair_;
shared_ptr<Body> body_;
static map<const string, boost::weak_ptr<DataReader::Body> > bodies_;
DISABLE_COPY_AND_ASSIGN(DataReader);
};
/************* data_reader.cpp ***********************
using boost::weak_ptr;
map<const string, weak_ptr<DataReader::Body> > DataReader::bodies_;
static boost::mutex bodies_mutex_;
DataReader::DataReader(const LayerParameter& param)
// Prefetch queue (Number of batches to prefetch to host memory, increase if
// data access bandwidth varies).
// optional uint32 prefetch = 10 [default = 4];
: queue_pair_(new QueuePair(
param.data_param().prefetch() * param.data_param().batch_size())) {/*(1)*/
// Get or create a body
boost::mutex::scoped_lock lock(bodies_mutex_);
string key = source_key(param);
weak_ptr<Body>& weak = bodies_[key];
body_ = weak.lock();
if (!body_) {
body_.reset(new Body(param)); /* (2) */
bodies_[key] = weak_ptr<Body>(body_); /* (3) */
}
body_->new_queue_pairs_.push(queue_pair_); /* (4) */
}
(1)DataReader::QueuePair::QueuePair(int size) {
// Initialize the free queue with requested number of datums
for (int i = 0; i < size; ++i) {
free_.push(new Datum());
}
}
(2)DataReader::Body::Body(const LayerParameter& param)
: param_(param),
new_queue_pairs_() {
StartInternalThread();
}
// Reading-thread body: opens the database named by the layer parameters and
// feeds every registered QueuePair one record per round until the thread is
// asked to stop.
void DataReader::Body::InternalThreadEntry() {
  shared_ptr<db::DB> db(db::GetDB(param_.data_param().backend()));
  db->Open(param_.data_param().source(), db::READ);
  shared_ptr<db::Cursor> cursor(db->NewCursor());
  vector<shared_ptr<QueuePair> > qps;
  try {
    // Number of QueuePairs to serve: one per solver in TRAIN, otherwise one.
    int solver_count = param_.phase() == TRAIN ? Caffe::solver_count() : 1;
    // To ensure deterministic runs, only start running once all solvers
    // are ready. But solvers need to peek on one item during initialization,
    // so read one item, then wait for the next solver.
    for (int i = 0; i < solver_count; ++i) {
      shared_ptr<QueuePair> qp(new_queue_pairs_.pop());
      read_one(cursor.get(), qp.get());
      qps.push_back(qp);
    }
    // Main loop
    // Each round hands exactly one record to every solver, always in the same
    // order, so records are distributed deterministically.
    while (!must_stop()) {
      for (int i = 0; i < solver_count; ++i) {
        read_one(cursor.get(), qps[i].get());
      }
      // Check no additional readers have been created. This can happen if
      // more than one net is trained at a time per process, whether single
      // or multi solver. It might also happen if two data layers have same
      // name and same source.
      CHECK_EQ(new_queue_pairs_.size(), 0);
    }
  } catch (boost::thread_interrupted&) {
    // Interrupted exception is expected on shutdown
  }
}
// Moves one record into `qp`. It takes an empty Datum from qp->free_ (pop()
// blocks until one is available), parses the current record into it and
// publishes it on qp->full_. It then advances the cursor, rewinding to the
// first record once the end of the database is reached.
void DataReader::Body::read_one(db::Cursor* cursor, QueuePair* qp) {
  Datum* slot = qp->free_.pop();
  // TODO deserialize in-place instead of copy?
  slot->ParseFromString(cursor->value());
  qp->full_.push(slot);

  // go to the next iter; only a cursor that ran off the end needs rewinding
  cursor->Next();
  if (cursor->valid()) {
    return;
  }
  DLOG(INFO) << "Restarting data prefetching from start.";
  cursor->SeekToFirst();
}
SetUp の呼び出し順序 (net.cpp):
// net.cpp: each layer is set up with its own bottom and top blob vectors.
layers_[layer_id]->SetUp(bottom_vecs_[layer_id], top_vecs_[layer_id]);
#include <iostream>
using namespace std;
// Minimal stand-in for Caffe's InternalThread: only its position in the
// inheritance graph matters for this demo.
class InterThread{};

// Demo of the call order of Layer::SetUp() for a DataLayer. SetUp() is the
// non-virtual entry point; it calls the virtual LayerSetUp() hook, which each
// level of the data-layer hierarchy overrides or extends.
class layer{
 public:
  layer(){}
  // Virtual so a derived layer can be destroyed safely through a layer*;
  // deleting through a base without a virtual destructor is undefined.
  virtual ~layer(){}
  void SetUp( ) {
    std::cout<<"layer:SetUp():start"<<std::endl;
    LayerSetUp();
    std::cout<<"layer:SetUp():end"<<std::endl;
  }
  virtual void LayerSetUp(){}
};

// Mirrors BaseDataLayer: LayerSetUp() wraps the DataLayerSetUp() hook that
// concrete data layers implement.
class BaseDataLayer:public layer{
 public:
  BaseDataLayer():layer(){}
  virtual void LayerSetUp();
  virtual void DataLayerSetUp(){}
};
void BaseDataLayer::LayerSetUp( ) {
  std::cout<<"BaseDataLayer:LayerSetUp():start"<<std::endl;
  DataLayerSetUp( );
  std::cout<<"BaseDataLayer:LayerSetUp():end"<<std::endl;
}

// Mirrors BasePrefetchingDataLayer: it extends LayerSetUp() by explicitly
// calling the BaseDataLayer version (a qualified, non-virtual call).
class BasePrefetchingDataLayer :
    public BaseDataLayer,public InterThread {
 public:
  BasePrefetchingDataLayer():BaseDataLayer(),InterThread(){}
  // Overrides layer::LayerSetUp; it was already virtual through the base,
  // and the keyword is spelled out for readability.
  virtual void LayerSetUp( );
};
void BasePrefetchingDataLayer::LayerSetUp() {
  std::cout<<"BasePrefetchingDataLayer:LayerSetUp():start"<<std::endl;
  BaseDataLayer::LayerSetUp( );
  std::cout<<"BasePrefetchingDataLayer:LayerSetUp():end"<<std::endl;
}

// Mirrors DataLayer: only supplies the DataLayerSetUp() hook.
class DataLayer : public BasePrefetchingDataLayer {
 public:
  DataLayer():BasePrefetchingDataLayer(){}
  virtual void DataLayerSetUp();
};
void DataLayer::DataLayerSetUp( ) {
  std::cout<<"DataLayer:DataLayerSetUp()"<<std::endl;
}
int main()
{
layer *layer1;
layer1 = new DataLayer();
layer1->SetUp();
}
出力結果:
~~~~~~~~~~~~~
layer:SetUp():start
BasePrefetchingDataLayer:LayerSetUp():start
BaseDataLayer:LayerSetUp():start
DataLayer:DataLayerSetUp()
BaseDataLayer:LayerSetUp():end
BasePrefetchingDataLayer:LayerSetUp():end
layer:SetUp():end
~~~~~~~
コード
bottom:empty top:2
Layer:
// Layer::SetUp: the fixed set-up sequence every layer goes through.
// For a data layer, the LayerSetUp() call below is where
// BasePrefetchingDataLayer::LayerSetUp -> BaseDataLayer::LayerSetUp ->
// DataLayer::DataLayerSetUp runs.
void SetUp(const vector<Blob<Dtype>*>& bottom,
    const vector<Blob<Dtype>*>& top) {
  InitMutex();
  // Presumably validates the number of bottom/top blobs (defined elsewhere).
  CheckBlobCounts(bottom, top);
  LayerSetUp(bottom, top);
  Reshape(bottom, top);
  SetLossWeights(top);
}
BasePrefetch DataLayer:void BasePrefetchingDataLayer<Dtype>::LayerSetUp(
const vector<Blob<Dtype>*>& bottom, const vector<Blob<Dtype>*>& top) {
BaseDataLayer<Dtype>::LayerSetUp(bottom, top);
// Before starting the prefetch thread, we make cpu_data and gpu_data
// calls so that the prefetch thread does not accidentally make simultaneous
// cudaMalloc calls when the main thread is running. In some GPUs this
// seems to cause failures if we do not so.
for (int i = 0; i < PREFETCH_COUNT; ++i) {
prefetch_[i].data_.mutable_cpu_data();
if (this->output_labels_) {
prefetch_[i].label_.mutable_cpu_data();
}
}
#ifndef CPU_ONLY
if (Caffe::mode() == Caffe::GPU) {
for (int i = 0; i < PREFETCH_COUNT; ++i) {
prefetch_[i].data_.mutable_gpu_data();
if (this->output_labels_) {
prefetch_[i].label_.mutable_gpu_data();
}
}
}
#endif
DLOG(INFO) << "Initializing prefetch";
this->data_transformer_->InitRand();
StartInternalThread();
DLOG(INFO) << "Prefetch initialized.";
}
// Prefetch thread body. It repeatedly takes an empty Batch from
// prefetch_free_ and fills it with load_batch(). In GPU mode it then starts
// the host->device copy on a dedicated non-blocking stream and waits for it.
// Finally it hands the Batch to prefetch_full_ for the forward pass.
template <typename Dtype>
void BasePrefetchingDataLayer<Dtype>::InternalThreadEntry() {
#ifndef CPU_ONLY
  cudaStream_t stream;
  if (Caffe::mode() == Caffe::GPU) {
    CUDA_CHECK(cudaStreamCreateWithFlags(&stream, cudaStreamNonBlocking));
  }
#endif
  try {
    while (!must_stop()) {
      Batch<Dtype>* batch = prefetch_free_.pop();
      load_batch(batch);
#ifndef CPU_ONLY
      if (Caffe::mode() == Caffe::GPU) {
        batch->data_.data().get()->async_gpu_push(stream);
        // Push the labels as well. Otherwise label_ presumably stays host-side
        // and is copied to the device lazily (synchronously) when the forward
        // pass first reads it on the GPU. load_batch() wrote label_ via
        // mutable_cpu_data(), so it is in the same state as data_ here.
        if (this->output_labels_) {
          batch->label_.data().get()->async_gpu_push(stream);
        }
        CUDA_CHECK(cudaStreamSynchronize(stream));
      }
#endif
      prefetch_full_.push(batch);
    }
  } catch (boost::thread_interrupted&) {
    // Interrupted exception is expected on shutdown
  }
#ifndef CPU_ONLY
  if (Caffe::mode() == Caffe::GPU) {
    CUDA_CHECK(cudaStreamDestroy(stream));
  }
#endif
}
BaseDataLayervoid BaseDataLayer<Dtype>::LayerSetUp(const vector<Blob<Dtype>*>& bottom,
const vector<Blob<Dtype>*>& top) {
if (top.size() == 1) {
output_labels_ = false;
} else {
output_labels_ = true;
}
data_transformer_.reset(
new DataTransformer<Dtype>(transform_param_, this->phase_));
data_transformer_->InitRand();
// The subclasses should setup the size of bottom and top
DataLayerSetUp(bottom, top);
}
DataLayervoid DataLayer<Dtype>::DataLayerSetUp(const vector<Blob<Dtype>*>& bottom,
const vector<Blob<Dtype>*>& top) {
const int batch_size = this->layer_param_.data_param().batch_size();
// Read a data point, and use it to initialize the top blob.
Datum& datum = *(reader_.full().peek());
// Use data_transformer to infer the expected blob shape from datum.
vector<int> top_shape = this->data_transformer_->InferBlobShape(datum);
this->transformed_data_.Reshape(top_shape);
// Reshape top[0] and prefetch_data according to the batch_size.
top_shape[0] = batch_size;
top[0]->Reshape(top_shape);
for (int i = 0; i < this->PREFETCH_COUNT; ++i) {
this->prefetch_[i].data_.Reshape(top_shape);
}
LOG(INFO) << "output data size: " << top[0]->num() << ","
<< top[0]->channels() << "," << top[0]->height() << ","
<< top[0]->width();
// label
if (this->output_labels_) {
vector<int> label_shape(1, batch_size);
top[1]->Reshape(label_shape);
for (int i = 0; i < this->PREFETCH_COUNT; ++i) {
this->prefetch_[i].label_.Reshape(label_shape);
}
}
}
// Fills one prefetch Batch. It runs on the prefetch thread and is called from
// BasePrefetchingDataLayer::InternalThreadEntry. The batch is reshaped after
// the next Datum in the reader. Then batch_size Datums are popped from the
// reader's full queue and transformed straight into their slot of
// batch->data_, with labels copied when requested. Each Datum is returned to
// the reader's free queue for reuse.
template <typename Dtype>
void DataLayer<Dtype>::load_batch(Batch<Dtype>* batch) {
  CPUTimer batch_timer;
  batch_timer.Start();
  double read_time = 0;
  double trans_time = 0;
  CPUTimer timer;
  CHECK(batch->data_.count());
  CHECK(this->transformed_data_.count());

  // Reshape according to the first datum of each batch
  // on single input batches allows for inputs of varying dimension.
  const int batch_size = this->layer_param_.data_param().batch_size();
  // Named apart from the per-item `datum` in the loop so neither shadows.
  Datum& first_datum = *(reader_.full().peek());
  // Use data_transformer to infer the expected blob shape from datum.
  vector<int> top_shape = this->data_transformer_->InferBlobShape(first_datum);
  this->transformed_data_.Reshape(top_shape);
  // Reshape batch according to the batch_size.
  top_shape[0] = batch_size;
  batch->data_.Reshape(top_shape);

  Dtype* top_data = batch->data_.mutable_cpu_data();
  Dtype* top_label = NULL;  // suppress warnings about uninitialized variables
  if (this->output_labels_) {
    top_label = batch->label_.mutable_cpu_data();
  }
  for (int item_id = 0; item_id < batch_size; ++item_id) {
    timer.Start();
    // get a datum (waits until the reader thread has produced one)
    Datum* datum = reader_.full().pop("Waiting for data");
    read_time += timer.MicroSeconds();
    timer.Start();
    // Apply data transformations (mirror, scale, crop...)
    // transformed_data_ is pointed at this item's slot inside batch->data_,
    // so Transform writes directly into the batch.
    int offset = batch->data_.offset(item_id);
    this->transformed_data_.set_cpu_data(top_data + offset);
    this->data_transformer_->Transform(*datum, &(this->transformed_data_));
    // Copy label.
    if (this->output_labels_) {
      top_label[item_id] = datum->label();
    }
    trans_time += timer.MicroSeconds();
    // Return the Datum to the reader for reuse; it was never const, so the
    // pointer can be pushed back without a const_cast.
    reader_.free().push(datum);
  }
  timer.Stop();
  batch_timer.Stop();
  DLOG(INFO) << "Prefetch batch: " << batch_timer.MilliSeconds() << " ms.";
  DLOG(INFO) << " Read time: " << read_time / 1000 << " ms.";
  DLOG(INFO) << "Transform time: " << trans_time / 1000 << " ms.";
}
まとめ
1. source が一つしかなければ、Body オブジェクトも一つしかない。Body オブジェクト body_ のメンバー new_queue_pairs_ に入っている QueuePair の free_ と full_ キューの要素は Datum で、キューの長さは (prefetch x batch_size)、デフォルトでは (4 x batch_size) である。
2. BasePrefetchingDataLayer の prefetch_free_ と prefetch_full_ キューの要素は Batch で、キューの長さは 3 である。
3. 2 つの StartInternalThread は、どちらもそれぞれの InternalThreadEntry() 関数を呼び出す。この関数には while (!must_stop()) { xxxfree_.pop() } があり、この while ループは StopInternalThread() が呼ばれるまで続く。また xxxfree_.pop() は、キューに新しいデータが入るまで一時的にブロックされる。
初期化
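最初の solver の DataReader reader_ を初期化するとき:
1. queue_pair_(new QueuePair(param.data_param().prefetch() * param.data_param().batch_size())) により、(4 x batch_size) 個の Datum が free_ に入れられる。
2. まだ Body がないので、body_.reset(new Body(param)); で Body が作られる(BlockingQueue の動作は以下の分析を参照)。
3. body_->new_queue_pairs_.push(queue_pair_); で自分の QueuePair を Body に渡す。読み取りスレッド側は read_one の中で xxxfree_.pop() を行う。
残りの solver の DataReader reader_ を初期化するとき:
1. 同様に queue_pair_(new QueuePair(...)) により、(4 x batch_size) 個の Datum が free_ に入れられる。
2. 同じキー(レイヤー名 + source)の Body は既に存在するので新しくは作られず、それを共有する。
3. new_queue_pairs_ に自分の QueuePair を push する。
分析: body_.reset(new Body(param)); で呼ばれる Body のコンストラクタは StartInternalThread() を呼び、それが InternalThreadEntry()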
関数を呼び出します。void DataReader::Body::InternalThreadEntry() {
  // (Same function as shown earlier.) Open the DB named by the layer params.
  shared_ptr<db::DB> db(db::GetDB(param_.data_param().backend()));
  db->Open(param_.data_param().source(), db::READ);
  shared_ptr<db::Cursor> cursor(db->NewCursor());
  vector<shared_ptr<QueuePair> > qps;
  try {
    // Number of QueuePairs to serve: one per solver in TRAIN, otherwise one.
    int solver_count = param_.phase() == TRAIN ? Caffe::solver_count() : 1;
    // To ensure deterministic runs, only start running once all solvers
    // are ready. But solvers need to peek on one item during initialization,
    // so read one item, then wait for the next solver.
    for (int i = 0; i < solver_count; ++i) {
      shared_ptr<QueuePair> qp(new_queue_pairs_.pop());
      read_one(cursor.get(), qp.get());
      qps.push_back(qp);
    }
    // Main loop
    /* In this while loop, body_ calls read_one once for every solver, in order: */
    while (!must_stop()) {
      for (int i = 0; i < solver_count; ++i) {
        read_one(cursor.get(), qps[i].get());
      }
      // Check no additional readers have been created. This can happen if
      // more than one net is trained at a time per process, whether single
      // or multi solver. It might also happen if two data layers have same
      // name and same source.
      CHECK_EQ(new_queue_pairs_.size(), 0);
    }
  } catch (boost::thread_interrupted&) {
    // Interrupted exception is expected on shutdown
  }
}
// Moves one record into `qp`. It takes an empty Datum from qp->free_ (pop()
// blocks until one is available), parses the current record into it and
// publishes it on qp->full_. It then advances the cursor, rewinding to the
// first record once the end of the database is reached.
void DataReader::Body::read_one(db::Cursor* cursor, QueuePair* qp) {
  Datum* slot = qp->free_.pop();
  // TODO deserialize in-place instead of copy?
  slot->ParseFromString(cursor->value());
  qp->full_.push(slot);

  // go to the next iter; only a cursor that ran off the end needs rewinding
  cursor->Next();
  if (cursor->valid()) {
    return;
  }
  DLOG(INFO) << "Restarting data prefetching from start.";
  cursor->SeekToFirst();
}
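InternalThreadEntry() 関数では、new_queue_pairs_ から QueuePair が solver ごとに一つずつ pop されます。各 solver の DataReader は、queue_pair_(new QueuePair(param.data_param().prefetch() * param.data_param().batch_size())) で作った QueuePair を body_->new_queue_pairs_.push(queue_pair_); でキューに入れます。そのため、各 solver は先読み(プリフェッチ)できます。
StartInternalThread() が起動した InternalThreadEntry() の中では、read_one が free_ から Datum を pop し続けます。では、free_ が空になったら何が起こるでしょうか? read_one は free_.pop() で待つので、free_ に新しいデータが push されるまでしばらくブロックします(読み込まれた Datum は full_ に push されています)。
では、free_ に新しい Datum が push されるのはいつでしょうか? それは、layer SetUp の後でプリフェッチスレッドから呼ばれる DataLayer::load_batch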
関数には、for (int item_id = 0; item_id < batch_size; ++item_id) {
  // (Abridged from DataLayer::load_batch.) Every Datum taken from the
  // reader's full queue goes back to its free queue once it has been used.
  // get a datum
  Datum& datum = *(reader_.full().pop("Waiting for data"));
  reader_.free().push(const_cast<Datum*>(&datum));
}
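上記のまとめで触れた xxxfree_ のうち prefetch_free_ について: prefetch_free_ も free_ と同じく BlockingQueue です。次の BasePrefetchingDataLayer<Dtype>::InternalThreadEntry() は、prefetch_free_ から Batch を pop して DataLayer::load_batch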
の関数を呼び出す。BasePrefetchingDataLayer<Dtype>::InternalThreadEntry() {
  // (Abridged: the CUDA stream handling and the try/catch are omitted.)
  while (!must_stop()) {
    // Wait for an empty Batch, fill it, then publish it for the forward pass.
    Batch<Dtype>* batch = prefetch_free_.pop();
    load_batch(batch);
    prefetch_full_.push(batch);
  }
}
では、prefetch_free_
はいつまたプッシュしますか?
// Answer: at the end of Forward_cpu. Once a Batch's contents have been copied
// into the top blobs, it is returned to prefetch_free_ for the prefetch
// thread to refill.
template <typename Dtype>
void BasePrefetchingDataLayer<Dtype>::Forward_cpu(
    const vector<Blob<Dtype>*>& bottom, const vector<Blob<Dtype>*>& top) {
  // Take the next filled Batch produced by InternalThreadEntry().
  Batch<Dtype>* batch = prefetch_full_.pop("Data layer prefetch queue empty");
  // Reshape to loaded data.
  top[0]->ReshapeLike(batch->data_);
  // Copy the data
  caffe_copy(batch->data_.count(), batch->data_.cpu_data(),
      top[0]->mutable_cpu_data());
  DLOG(INFO) << "Prefetch copied";
  if (this->output_labels_) {
    // Reshape to loaded labels.
    top[1]->ReshapeLike(batch->label_);
    // Copy the labels.
    caffe_copy(batch->label_.count(), batch->label_.cpu_data(),
        top[1]->mutable_cpu_data());
  }
  // Recycle the Batch.
  prefetch_free_.push(batch);
}
RNG
http://www.boost.org/doc/libs/1_46_0/doc/html/boost_random/tutorial.html
http://stackoverflow.com/questions/2254909/boost-random-number-generator
定義
common.hpp
// Excerpt of class Caffe showing only its RNG-related members.
class Caffe {
 public:
  // This random number generator facade hides boost and CUDA rng
  // implementation from one another (for cross-platform compatibility).
  class RNG {
   public:
    RNG();
    explicit RNG(unsigned int seed);
    explicit RNG(const RNG&);
    RNG& operator=(const RNG&);
    // Returns the underlying engine type-erased as void*; callers cast it
    // back to caffe::rng_t* (as caffe_rng() does).
    void* generator();
   private:
    class Generator;
    shared_ptr<Generator> generator_;
  };

  // Getters for boost rng, curand, and cublas handles
  // Returns the RNG held by Get(). If set_random_seed() has not installed one
  // yet, a default-constructed RNG is created on first use.
  inline static RNG& rng_stream() {
    if (!Get().random_generator_) {
      Get().random_generator_.reset(new RNG());
    }
    return *(Get().random_generator_);
  }
  shared_ptr<RNG> random_generator_;
};  // A class definition must end with ';'.
common.cppclass Caffe::RNG::Generator {
public:
Generator() : rng_(new caffe::rng_t(cluster_seedgen())) {}
explicit Generator(unsigned int seed) : rng_(new caffe::rng_t(seed)) {}
caffe::rng_t* rng() { return rng_.get(); }
private:
shared_ptr<caffe::rng_t> rng_;
};
Caffe::RNG::RNG() : generator_(new Generator()) { }
Caffe::RNG::RNG(unsigned int seed) : generator_(new Generator(seed)) { }
Caffe::RNG& Caffe::RNG::operator=(const RNG& other) {
generator_ = other.generator_;
return *this;
}
void* Caffe::RNG::generator() {
return static_cast<void*>(generator_->rng());
}
初期化
(補足: prefetch_full_ は、BasePrefetchingDataLayer<Dtype>::LayerSetUp の中で呼ばれる StartInternalThread が起動したスレッドによって埋められます。)
RNG のシード設定は次の箇所
にあります。if (Caffe::root_solver() && param_.random_seed() >= 0) {
  // Seed only from the root solver, and only when random_seed is
  // non-negative. Otherwise rng_stream() later creates a default RNG whose
  // engine is seeded from cluster_seedgen().
  Caffe::set_random_seed(param_.random_seed());
}
common.cppvoid Caffe::set_random_seed(const unsigned int seed) {
// RNG seed
Get().random_generator_.reset(new RNG(seed));
}
rng.cppnamespace caffe {
typedef boost::mt19937 rng_t;
inline rng_t* caffe_rng() {
return static_cast<caffe::rng_t*>(Caffe::rng_stream().generator());
}
//...
}
これで初期化されました。保持関係は、Caffe クラスの random_generator_ → RNG クラスの generator_ → Generator クラスの rng_(new caffe::rng_t(seed)) です。
使い方1: shuffle the images
image_data_layer.cpp
//shared_ptr<Caffe::RNG> prefetch_rng_;
// Give the image shuffler its own RNG, seeded with one draw from Caffe's RNG
// via caffe_rng_rand(). The shuffle order therefore follows the solver's
// random_seed whenever one has been set. ShuffleImages() presumably draws
// from prefetch_rng_.
const unsigned int prefetch_rng_seed = caffe_rng_rand();
prefetch_rng_.reset(new Caffe::RNG(prefetch_rng_seed));
ShuffleImages();
mathfunctions.cppunsigned int caffe_rng_rand() {
return (*caffe_rng())();
}
rng.cppnamespace caffe {
typedef boost::mt19937 rng_t;
inline rng_t* caffe_rng() {
return static_cast<caffe::rng_t*>(Caffe::rng_stream().generator());
}
//...
}
common.hppinline static RNG& rng_stream() {
if (!Get().random_generator_) {
Get().random_generator_.reset(new RNG());
}
return *(Get().random_generator_);
}
void* Caffe::RNG::generator() {
return static_cast<void*>(generator_->rng());
}
// Generator::rng(): non-owning access to the engine held by rng_.
caffe::rng_t* rng() { return rng_.get(); }
使い方2
data_transformer.cpp
// Gives the transformer its own RNG, seeded from Caffe's RNG, but only when
// some transformation needs randomness: mirroring, or cropping in the TRAIN
// phase. Otherwise any existing RNG is released.
template <typename Dtype>
void DataTransformer<Dtype>::InitRand() {
  //shared_ptr<Caffe::RNG> rng_;
  bool needs_rand = param_.mirror();
  if (!needs_rand && phase_ == TRAIN) {
    needs_rand = (param_.crop_size() != 0);
  }
  if (!needs_rand) {
    rng_.reset();
    return;
  }
  const unsigned int seed = caffe_rng_rand();
  rng_.reset(new Caffe::RNG(seed));
}
// Returns a pseudo-random integer in [0, n) from the transformer's own RNG.
// Requires InitRand() to have created the RNG, and n > 0.
// NOTE(review): `% n` has a slight modulo bias when n does not divide the
// engine's range; kept as-is so the random stream is unchanged.
template <typename Dtype>
int DataTransformer<Dtype>::Rand(int n) {
  CHECK(rng_);
  CHECK_GT(n, 0);
  caffe::rng_t& engine = *static_cast<caffe::rng_t*>(rng_->generator());
  const caffe::rng_t::result_type draw = engine();
  return draw % n;
}
//to be continued////