ColumnFamilyData* CreateColumnFamily(conststd::string& name, uint32_t id, Version* dummy_version, const ColumnFamilyOptions& options); iterator begin(){ return iterator(dummy_cfd_->next_); } iterator end(){ return iterator(dummy_cfd_); } ............................... private: friendclassColumnFamilyData; // helper function that gets called from cfd destructor // REQUIRES: DB mutex held voidRemoveColumnFamily(ColumnFamilyData* cfd);
// column_families_ and column_family_data_ need to be protected: // * when mutating both conditions have to be satisfied: // 1. DB mutex locked // 2. thread currently in single-threaded write thread // * when reading, at least one condition needs to be satisfied: // 1. DB mutex locked // 2. accessed from a single-threaded write thread std::unordered_map<std::string, uint32_t> column_families_; std::unordered_map<uint32_t, ColumnFamilyData*> column_family_data_;
uint32_t max_column_family_; ColumnFamilyData* dummy_cfd_; // We don't hold the refcount here, since default column family always exists // We are also not responsible for cleaning up default_cfd_cache_. This is // just a cache that makes common case (accessing default column family) // faster ColumnFamilyData* default_cfd_cache_;
// Ref() can only be called from a context where the caller can guarantee // that ColumnFamilyData is alive (while holding a non-zero ref already, // holding a DB mutex, or as the leader in a write batch group). voidRef(){ refs_.fetch_add(1, std::memory_order_relaxed); }
// Unref decreases the reference count, but does not handle deletion // when the count goes to 0. If this method returns true then the // caller should delete the instance immediately, or later, by calling // FreeDeadColumnFamilies(). Unref() can only be called while holding // a DB mutex, or during single-threaded recovery. boolUnref(){ int old_refs = refs_.fetch_sub(1, std::memory_order_relaxed); assert(old_refs > 0); return old_refs == 1; } ..............................
uint32_t id_; conststd::string name_; Version* dummy_versions_; // Head of circular doubly-linked list of versions. Version* current_; // == dummy_versions->prev_ ......................................................
// Thread's local copy of SuperVersion pointer // This needs to be destructed before mutex_ std::unique_ptr<ThreadLocalPtr> local_sv_;
// pointers for a circular linked list. we use it to support iterations over // all column families that are alive (note: dropped column families can also // be alive as long as client holds a reference) ColumnFamilyData* next_; ColumnFamilyData* prev_; ...................................
// ColumnFamilyHandleImpl is the class that clients use to access different // column families. It has non-trivial destructor, which gets called when client // is done using the column family classColumnFamilyHandleImpl :public ColumnFamilyHandle { public: // create while holding the mutex ColumnFamilyHandleImpl( ColumnFamilyData* cfd, DBImpl* db, InstrumentedMutex* mutex); // destroy without mutex virtual ~ColumnFamilyHandleImpl(); virtual ColumnFamilyData* cfd()const{ return cfd_; } ......................................
// LogAndApply will both write the creation in MANIFEST and create // ColumnFamilyData object { // write thread WriteThread::Writer w; write_thread_.EnterUnbatched(&w, &mutex_); // LogAndApply will both write the creation in MANIFEST and create // ColumnFamilyData object s = versions_->LogAndApply(nullptr, MutableCFOptions(cf_options), &edit, &mutex_, directories_.GetDbDir(), false, &cf_options); write_thread_.ExitUnbatched(&w); } if (s.ok()) { ........................................ *handle = new ColumnFamilyHandleImpl(cfd, this, &mutex_); ROCKS_LOG_INFO(immutable_db_options_.info_log, "Created column family [%s] (ID %u)", column_family_name.c_str(), (unsigned)cfd->GetID()); } ............................................. } // InstrumentedMutexLock l(&mutex_)
// under a DB mutex AND from a write thread void ColumnFamilySet::RemoveColumnFamily(ColumnFamilyData* cfd) { auto cfd_iter = column_family_data_.find(cfd->GetID()); assert(cfd_iter != column_family_data_.end()); column_family_data_.erase(cfd_iter); column_families_.erase(cfd->GetName()); }
// DB mutex held ColumnFamilyData::~ColumnFamilyData() { assert(refs_.load(std::memory_order_relaxed) == 0); // remove from linked list auto prev = prev_; auto next = next_; prev->next_ = next; next->prev_ = prev;
if (!dropped_ && column_family_set_ != nullptr) { // If it's dropped, it's already removed from column family set // If column_family_set_ == nullptr, this is dummy CFD and not in // ColumnFamilySet column_family_set_->RemoveColumnFamily(this); }
if (current_ != nullptr) { current_->Unref(); } .............................. }
// Replays records one at a time: each record is decoded into a VersionEdit and
// then handled as a column family add, a column family drop, or any other
// edit that is forwarded to the column family's builder.
while (reader.ReadRecord(&record, &scratch) && s.ok()) {
  VersionEdit edit;
  s = edit.DecodeFrom(record);
  if (!s.ok()) {
    break;
  }

  // Not found means that user didn't supply that column
  // family option AND we encountered column family add
  // record. Once we encounter column family drop record,
  // we will delete the column family from
  // column_families_not_found.
  bool cf_in_not_found = column_families_not_found.find(edit.column_family_) !=
                         column_families_not_found.end();
  // in builders means that user supplied that column family
  // option AND that we encountered column family add record
  bool cf_in_builders = builders.find(edit.column_family_) != builders.end();

  // they can't both be true
  assert(!(cf_in_not_found && cf_in_builders));

  ColumnFamilyData* cfd = nullptr;

  if (edit.is_column_family_add_) {
    if (cf_in_builders || cf_in_not_found) {
      s = Status::Corruption("Manifest adding the same column family twice");
      break;
    }
    auto cf_options = cf_name_to_options.find(edit.column_family_name_);
    if (cf_options == cf_name_to_options.end()) {
      // No options were supplied for this name: only remember that it exists.
      column_families_not_found.insert(
          {edit.column_family_, edit.column_family_name_});
    } else {
      cfd = CreateColumnFamily(cf_options->second, &edit);
      cfd->set_initialized();
      builders.insert(
          {edit.column_family_, new BaseReferencedVersionBuilder(cfd)});
    }
  } else if (edit.is_column_family_drop_) {
    if (cf_in_builders) {
      auto builder = builders.find(edit.column_family_);
      assert(builder != builders.end());
      delete builder->second;
      builders.erase(builder);
      cfd = column_family_set_->GetColumnFamily(edit.column_family_);
      // this should never happen since cf_in_builders is true
      assert(cfd != nullptr);
      if (cfd->Unref()) {
        delete cfd;
        cfd = nullptr;
      } else {
        // who else can have reference to cfd!?
        assert(false);
      }
    } else if (cf_in_not_found) {
      column_families_not_found.erase(edit.column_family_);
    } else {
      s = Status::Corruption("Manifest - dropping non-existing column family");
      break;
    }
  } else if (!cf_in_not_found) {
    if (!cf_in_builders) {
      s = Status::Corruption(
          "Manifest record referencing unknown column family");
      break;
    }

    cfd = column_family_set_->GetColumnFamily(edit.column_family_);
    // this should never happen since cf_in_builders is true
    assert(cfd != nullptr);

    // if it is not column family add or column family drop,
    // then it's a file add/delete, which should be forwarded
    // to builder
    auto builder = builders.find(edit.column_family_);
    assert(builder != builders.end());
    builder->second->version_builder()->Apply(&edit);
  }

  if (cfd != nullptr) {
    if (edit.has_log_number_) {
      if (cfd->GetLogNumber() > edit.log_number_) {
        // A log number going backwards is logged and otherwise ignored.
        ROCKS_LOG_WARN(
            db_options_->info_log,
            "MANIFEST corruption detected, but ignored - Log numbers in "
            "records NOT monotonically increasing");
      } else {
        cfd->SetLogNumber(edit.log_number_);
        have_log_number = true;
      }
    }
    // A comparator name recorded in the edit must match the column family's
    // comparator name; a mismatch stops the replay with InvalidArgument.
    if (edit.has_comparator_ &&
        edit.comparator_ != cfd->user_comparator()->Name()) {
      s = Status::InvalidArgument(
          cfd->user_comparator()->Name(),
          "does not match existing comparator " + edit.comparator_);
      break;
    }
  }

  // The fields below are copied from the edit whenever it carries them, so a
  // later record overwrites the value taken from an earlier one.
  if (edit.has_prev_log_number_) {
    previous_log_number = edit.prev_log_number_;
    have_prev_log_number = true;
  }

  if (edit.has_next_file_number_) {
    next_file = edit.next_file_number_;
    have_next_file = true;
  }

  if (edit.has_max_column_family_) {
    max_column_family = edit.max_column_family_;
  }