include/caffe/data_layers.hpp (8 additions, 28 deletions)
@@ -9,7 +9,6 @@
 
 #include "leveldb/db.h"
 #include "lmdb.h"
-#include "pthread.h"
 #include "hdf5.h"
 #include "boost/scoped_ptr.hpp"
 
@@ -18,6 +17,7 @@
 #include "caffe/filler.hpp"
 #include "caffe/layer.hpp"
 #include "caffe/proto/caffe.pb.h"
+#include "caffe/internal_thread.hpp"
 
 namespace caffe {
 
@@ -27,15 +27,8 @@ namespace caffe {
 // TODO: DataLayer, ImageDataLayer, and WindowDataLayer all have the
 // same basic structure and a lot of duplicated code.
 
-// This function is used to create a pthread that prefetches the data.
-template <typename Dtype>
-void* DataLayerPrefetch(void* layer_pointer);
-
 template <typename Dtype>
-class DataLayer : public Layer<Dtype> {
-  // The function used to perform prefetching.
-  friend void* DataLayerPrefetch<Dtype>(void* layer_pointer);
-
+class DataLayer : public Layer<Dtype>, public InternalThread {
  public:
   explicit DataLayer(const LayerParameter& param)
       : Layer<Dtype>(param) {}
@@ -63,6 +56,8 @@ class DataLayer : public Layer<Dtype> {
   virtual void CreatePrefetchThread();
   virtual void JoinPrefetchThread();
   virtual unsigned int PrefetchRand();
+  // The thread's function
+  virtual void InternalThreadEntry();
 
   shared_ptr<Caffe::RNG> prefetch_rng_;
 
@@ -80,7 +75,6 @@ class DataLayer : public Layer<Dtype> {
   int datum_height_;
   int datum_width_;
   int datum_size_;
-  pthread_t thread_;
   Blob<Dtype> prefetch_data_;
   Blob<Dtype> prefetch_label_;
   Blob<Dtype> data_mean_;
@@ -182,15 +176,8 @@ class HDF5OutputLayer : public Layer<Dtype> {
   Blob<Dtype> label_blob_;
 };
 
-// This function is used to create a pthread that prefetches the data.
-template <typename Dtype>
-void* ImageDataLayerPrefetch(void* layer_pointer);
-
 template <typename Dtype>
-class ImageDataLayer : public Layer<Dtype> {
-  // The function used to perform prefetching.
-  friend void* ImageDataLayerPrefetch<Dtype>(void* layer_pointer);
-
+class ImageDataLayer : public Layer<Dtype>, public InternalThread {
  public:
   explicit ImageDataLayer(const LayerParameter& param)
       : Layer<Dtype>(param) {}
@@ -219,6 +206,7 @@ class ImageDataLayer : public Layer<Dtype> {
   virtual void CreatePrefetchThread();
   virtual void JoinPrefetchThread();
   virtual unsigned int PrefetchRand();
+  virtual void InternalThreadEntry();
 
   shared_ptr<Caffe::RNG> prefetch_rng_;
   vector<std::pair<std::string, int> > lines_;
@@ -227,7 +215,6 @@ class ImageDataLayer : public Layer<Dtype> {
   int datum_height_;
   int datum_width_;
   int datum_size_;
-  pthread_t thread_;
   Blob<Dtype> prefetch_data_;
   Blob<Dtype> prefetch_label_;
   Blob<Dtype> data_mean_;
@@ -277,15 +264,8 @@ class MemoryDataLayer : public Layer<Dtype> {
   int pos_;
 };
 
-// This function is used to create a pthread that prefetches the window data.
-template <typename Dtype>
-void* WindowDataLayerPrefetch(void* layer_pointer);
-
 template <typename Dtype>
-class WindowDataLayer : public Layer<Dtype> {
-  // The function used to perform prefetching.
-  friend void* WindowDataLayerPrefetch<Dtype>(void* layer_pointer);
-
+class WindowDataLayer : public Layer<Dtype>, public InternalThread {
  public:
   explicit WindowDataLayer(const LayerParameter& param)
       : Layer<Dtype>(param) {}
@@ -312,9 +292,9 @@ class WindowDataLayer : public Layer<Dtype> {
   virtual void CreatePrefetchThread();
   virtual void JoinPrefetchThread();
   virtual unsigned int PrefetchRand();
+  virtual void InternalThreadEntry();
 
   shared_ptr<Caffe::RNG> prefetch_rng_;
-  pthread_t thread_;
   Blob<Dtype> prefetch_data_;
   Blob<Dtype> prefetch_label_;
   Blob<Dtype> data_mean_;
include/caffe/internal_thread.hpp (46 additions, 0 deletions)
@@ -0,0 +1,46 @@
+// Copyright 2014 BVLC and contributors.
+
+#ifndef CAFFE_INTERNAL_THREAD_HPP_
+#define CAFFE_INTERNAL_THREAD_HPP_
+
+#include <pthread.h>
+
+namespace caffe {
+
+/**
+ * A virtual base class that encapsulates a pthread.
+ * A child class acquires the ability to run a single internal thread
+ * by reimplementing the virtual function InternalThreadEntry.
+ */
+class InternalThread {
+ public:
+  InternalThread() {}
+  virtual ~InternalThread() {}
+
+  /** Returns true if the thread was successfully started. **/
+  bool StartInternalThread() {
+    return pthread_create(&_thread, NULL, InternalThreadEntryFunc, this) == 0;
+  }
+
+  /** Will not return until the internal thread has exited. True on success. */
+  bool WaitForInternalThreadToExit() {
+    return pthread_join(_thread, NULL) == 0;
+  }
+
+ protected:
+  /* Implement this method in your subclass
+     with the code you want your thread to run. */
+  virtual void InternalThreadEntry() = 0;
+
+ private:
+  static void* InternalThreadEntryFunc(void* This) {
+    reinterpret_cast<InternalThread*>(This)->InternalThreadEntry();
+    return NULL;
+  }
+
+  pthread_t _thread;
+};
+
+}  // namespace caffe
+
+#endif  // CAFFE_INTERNAL_THREAD_HPP_
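
For illustration only (not part of this PR): a minimal sketch of a subclass built on the header above. The class name PrefetchExample is hypothetical; it inherits InternalThread, implements InternalThreadEntry(), and drives the thread through the two public calls. Compile and link with -lpthread.

// Hypothetical example, assuming caffe/internal_thread.hpp is on the include path.
#include <cstdio>

#include "caffe/internal_thread.hpp"

class PrefetchExample : public caffe::InternalThread {
 protected:
  // Runs on the internal thread once StartInternalThread() is called.
  virtual void InternalThreadEntry() {
    std::printf("prefetching on the internal thread\n");
  }
};

int main() {
  PrefetchExample example;
  if (!example.StartInternalThread()) return 1;          // true on success
  if (!example.WaitForInternalThreadToExit()) return 1;  // blocks until exit
  return 0;
}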
src/caffe/layers/data_layer.cpp (38 additions, 45 deletions)
@@ -2,7 +2,6 @@
 
 #include <stdint.h>
 #include <leveldb/db.h>
-#include <pthread.h>
 
 #include <string>
 #include <vector>
@@ -16,46 +15,44 @@
 
 namespace caffe {
 
-// This function is used to create a pthread that prefetches the data.
 template <typename Dtype>
-void* DataLayerPrefetch(void* layer_pointer) {
-  CHECK(layer_pointer);
-  DataLayer<Dtype>* layer = static_cast<DataLayer<Dtype>*>(layer_pointer);
-  CHECK(layer);
+void DataLayer<Dtype>::InternalThreadEntry() {
   Datum datum;
-  CHECK(layer->prefetch_data_.count());
-  Dtype* top_data = layer->prefetch_data_.mutable_cpu_data();
+  CHECK(prefetch_data_.count());
+  Dtype* top_data = prefetch_data_.mutable_cpu_data();
   Dtype* top_label = NULL;  // suppress warnings about uninitialized variables
-  if (layer->output_labels_) {
-    top_label = layer->prefetch_label_.mutable_cpu_data();
+  if (output_labels_) {
+    top_label = prefetch_label_.mutable_cpu_data();
   }
-  const Dtype scale = layer->layer_param_.data_param().scale();
-  const int batch_size = layer->layer_param_.data_param().batch_size();
-  const int crop_size = layer->layer_param_.data_param().crop_size();
-  const bool mirror = layer->layer_param_.data_param().mirror();
+  const Dtype scale = this->layer_param_.data_param().scale();
+  const int batch_size = this->layer_param_.data_param().batch_size();
+  const int crop_size = this->layer_param_.data_param().crop_size();
+  const bool mirror = this->layer_param_.data_param().mirror();
 
   if (mirror && crop_size == 0) {
     LOG(FATAL) << "Current implementation requires mirror and crop_size to be "
         << "set at the same time.";
   }
   // datum scales
-  const int channels = layer->datum_channels_;
-  const int height = layer->datum_height_;
-  const int width = layer->datum_width_;
-  const int size = layer->datum_size_;
-  const Dtype* mean = layer->data_mean_.cpu_data();
+  const int channels = datum_channels_;
+  const int height = datum_height_;
+  const int width = datum_width_;
+  const int size = datum_size_;
+  const Dtype* mean = data_mean_.cpu_data();
   for (int item_id = 0; item_id < batch_size; ++item_id) {
     // get a blob
-    switch (layer->layer_param_.data_param().backend()) {
+    switch (this->layer_param_.data_param().backend()) {
     case DataParameter_DB_LEVELDB:
-      CHECK(layer->iter_);
-      CHECK(layer->iter_->Valid());
-      datum.ParseFromString(layer->iter_->value().ToString());
+      CHECK(iter_);
+      CHECK(iter_->Valid());
+      datum.ParseFromString(iter_->value().ToString());
       break;
     case DataParameter_DB_LMDB:
-      CHECK_EQ(mdb_cursor_get(layer->mdb_cursor_, &layer->mdb_key_,
-              &layer->mdb_value_, MDB_GET_CURRENT), MDB_SUCCESS);
-      datum.ParseFromArray(layer->mdb_value_.mv_data,
-          layer->mdb_value_.mv_size);
+      CHECK_EQ(mdb_cursor_get(mdb_cursor_, &mdb_key_,
+              &mdb_value_, MDB_GET_CURRENT), MDB_SUCCESS);
+      datum.ParseFromArray(mdb_value_.mv_data,
+          mdb_value_.mv_size);
       break;
     default:
       LOG(FATAL) << "Unknown database backend";
@@ -66,14 +63,14 @@ void* DataLayerPrefetch(void* layer_pointer) {
       CHECK(data.size()) << "Image cropping only support uint8 data";
       int h_off, w_off;
       // We only do random crop when we do training.
-      if (layer->phase_ == Caffe::TRAIN) {
-        h_off = layer->PrefetchRand() % (height - crop_size);
-        w_off = layer->PrefetchRand() % (width - crop_size);
+      if (phase_ == Caffe::TRAIN) {
+        h_off = PrefetchRand() % (height - crop_size);
+        w_off = PrefetchRand() % (width - crop_size);
       } else {
         h_off = (height - crop_size) / 2;
         w_off = (width - crop_size) / 2;
       }
-      if (mirror && layer->PrefetchRand() % 2) {
+      if (mirror && PrefetchRand() % 2) {
         // Copy mirrored version
         for (int c = 0; c < channels; ++c) {
           for (int h = 0; h < crop_size; ++h) {
@@ -118,34 +115,32 @@ void* DataLayerPrefetch(void* layer_pointer) {
       }
     }
 
-    if (layer->output_labels_) {
+    if (output_labels_) {
       top_label[item_id] = datum.label();
     }
     // go to the next iter
-    switch (layer->layer_param_.data_param().backend()) {
+    switch (this->layer_param_.data_param().backend()) {
    case DataParameter_DB_LEVELDB:
-      layer->iter_->Next();
-      if (!layer->iter_->Valid()) {
+      iter_->Next();
+      if (!iter_->Valid()) {
         // We have reached the end. Restart from the first.
         DLOG(INFO) << "Restarting data prefetching from start.";
-        layer->iter_->SeekToFirst();
+        iter_->SeekToFirst();
       }
       break;
     case DataParameter_DB_LMDB:
-      if (mdb_cursor_get(layer->mdb_cursor_, &layer->mdb_key_,
-              &layer->mdb_value_, MDB_NEXT) != MDB_SUCCESS) {
+      if (mdb_cursor_get(mdb_cursor_, &mdb_key_,
+              &mdb_value_, MDB_NEXT) != MDB_SUCCESS) {
         // We have reached the end. Restart from the first.
         DLOG(INFO) << "Restarting data prefetching from start.";
-        CHECK_EQ(mdb_cursor_get(layer->mdb_cursor_, &layer->mdb_key_,
-                &layer->mdb_value_, MDB_FIRST), MDB_SUCCESS);
+        CHECK_EQ(mdb_cursor_get(mdb_cursor_, &mdb_key_,
+                &mdb_value_, MDB_FIRST), MDB_SUCCESS);
       }
       break;
     default:
       LOG(FATAL) << "Unknown database backend";
     }
   }
-
-  return static_cast<void*>(NULL);
 }
 
 template <typename Dtype>
@@ -323,14 +318,12 @@ void DataLayer<Dtype>::CreatePrefetchThread() {
   } else {
     prefetch_rng_.reset();
   }
-  // Create the thread.
-  CHECK(!pthread_create(&thread_, NULL, DataLayerPrefetch<Dtype>,
-      static_cast<void*>(this))) << "Pthread execution failed.";
+  CHECK(StartInternalThread()) << "Thread execution failed.";
 }
 
 template <typename Dtype>
 void DataLayer<Dtype>::JoinPrefetchThread() {
-  CHECK(!pthread_join(thread_, NULL)) << "Pthread joining failed.";
+  CHECK(WaitForInternalThreadToExit()) << "Thread joining failed.";
 }
 
 template <typename Dtype>
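
The two wrappers above sit exactly where the raw pthread calls used to be: the forward pass joins the in-flight prefetch, consumes the batch, then immediately restarts the thread. A rough sketch of that lifecycle follows; this Forward_cpu is a simplified assumption rather than part of this diff, and the caffe_copy calls stand in for the layer's actual blob handling.

// Hedged sketch of the prefetch lifecycle around a forward pass.
template <typename Dtype>
Dtype DataLayer<Dtype>::Forward_cpu(const vector<Blob<Dtype>*>& bottom,
    vector<Blob<Dtype>*>* top) {
  // Wait for the background thread to finish filling prefetch_data_.
  JoinPrefetchThread();
  // Hand the prefetched batch to the top blobs.
  caffe_copy(prefetch_data_.count(), prefetch_data_.cpu_data(),
             (*top)[0]->mutable_cpu_data());
  if (output_labels_) {
    caffe_copy(prefetch_label_.count(), prefetch_label_.cpu_data(),
               (*top)[1]->mutable_cpu_data());
  }
  // Kick off prefetching of the next batch before returning.
  CreatePrefetchThread();
  return Dtype(0.);
}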