// Accessors/mutators for links.  Wrapped in methods so we can
// add the appropriate barriers as necessary.
Node* Next(int n) {
  assert(n >= 0);
  // Use an 'acquire load' so that we observe a fully initialized
  // version of the returned Node.
  return next_[n].load(std::memory_order_acquire);
}
void SetNext(int n, Node* x) {
  assert(n >= 0);
  // Use a 'release store' so that anybody who reads through this
  // pointer observes a fully initialized version of the inserted node.
  next_[n].store(x, std::memory_order_release);
}
// No-barrier variants that can be safely used in a few locations.
Node* NoBarrier_Next(int n) {
  assert(n >= 0);
  return next_[n].load(std::memory_order_relaxed);
}
void NoBarrier_SetNext(int n, Node* x) {
  assert(n >= 0);
  next_[n].store(x, std::memory_order_relaxed);
}
 private:
  // Array of length equal to the node height.  next_[0] is lowest level link.
  std::atomic<Node*> next_[1];
};
The NoBarrier_ variants operate with relaxed memory order;
the barrier variants (Next/SetNext) use acquire/release ordering.
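To see why the acquire/release pair matters, here is a minimal, self-contained sketch (not leveldb code) of publishing a freshly built node through an atomic pointer: the release store guarantees that every field written before the store is visible to any thread whose acquire load observes the pointer.

#include <atomic>
#include <cassert>
#include <thread>

struct Node {
  int key;                          // plain field, written before publication
  std::atomic<Node*> next{nullptr};
};

std::atomic<Node*> head{nullptr};

void writer() {
  Node* n = new Node;
  n->key = 42;                               // happens-before the release store below
  head.store(n, std::memory_order_release);  // publish: key becomes visible to acquirers
}

void reader() {
  Node* n = head.load(std::memory_order_acquire);  // pairs with the release store
  if (n != nullptr) {
    assert(n->key == 42);  // guaranteed once the acquire load sees the pointer
  }
}

int main() {
  std::thread t1(writer), t2(reader);
  t1.join();
  t2.join();
  delete head.load(std::memory_order_relaxed);
}

With relaxed ordering on both sides, the reader could see the pointer before the write to key, which is exactly why the skip list only uses the NoBarrier_ variants where a later release store republishes the same data.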
First, let's look at the crucial FindGreaterOrEqual:
// Return the earliest node that comes at or after key.
// Return nullptr if there is no such node.
//
// If prev is non-null, fills prev[level] with pointer to previous
// node at "level" for every level in [0..max_height_-1].
Node* FindGreaterOrEqual(const Key& key, Node** prev) const;
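Only the declaration is quoted above. The body descends one level each time the search can no longer advance to the right. The sketch below follows that shape; it assumes a KeyIsAfterNode(key, node) helper that returns true when node is non-null and its key sorts before key, so treat it as an illustration rather than a verbatim copy of skiplist.h.

template <typename Key, class Comparator>
typename SkipList<Key, Comparator>::Node*
SkipList<Key, Comparator>::FindGreaterOrEqual(const Key& key, Node** prev) const {
  Node* x = head_;
  int level = GetMaxHeight() - 1;            // start at the highest populated level
  while (true) {
    Node* next = x->Next(level);             // acquire load: next is fully initialized
    if (KeyIsAfterNode(key, next)) {
      x = next;                              // key is still ahead: keep moving right
    } else {
      if (prev != nullptr) prev[level] = x;  // remember the rightmost node before key
      if (level == 0) {
        return next;                         // bottom level: next is the answer (or nullptr)
      }
      level--;                               // otherwise drop down one level and continue
    }
  }
}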
template <typename Key, class Comparator>
void SkipList<Key, Comparator>::Insert(const Key& key) {
  // TODO(opt): We can use a barrier-free variant of FindGreaterOrEqual()
  // here since Insert() is externally synchronized.
  Node* prev[kMaxHeight];
  Node* x = FindGreaterOrEqual(key, prev);
  // Our data structure does not allow duplicate insertion
  assert(x == nullptr || !Equal(key, x->key));
  int height = RandomHeight();
  if (height > GetMaxHeight()) {
    for (int i = GetMaxHeight(); i < height; i++) {
      prev[i] = head_;
    }
    // It is ok to mutate max_height_ without any synchronization
    // with concurrent readers.  A concurrent reader that observes
    // the new value of max_height_ will see either the old value of
    // new level pointers from head_ (nullptr), or a new value set in
    // the loop below.  In the former case the reader will
    // immediately drop to the next level since nullptr sorts after all
    // keys.  In the latter case the reader will use the new node.
    max_height_.store(height, std::memory_order_relaxed);
  }
  x = NewNode(key, height);
  for (int i = 0; i < height; i++) {
    // NoBarrier_SetNext() suffices since we will add a barrier when
    // we publish a pointer to "x" in prev[i].
    x->NoBarrier_SetNext(i, prev[i]->NoBarrier_Next(i));
    prev[i]->SetNext(i, x);
  }
}
Both the store and the load of max_height_ are relaxed. At worst a concurrent reader sees a stale (smaller) height and simply starts its search at a lower level; even if it sees the new, larger height before the node is linked in, the head_ pointers at the new levels are still nullptr, which sorts after all keys, so the reader just drops down a level, as the comment above explains.
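This is what makes lock-free reads possible: a reader only ever follows acquire-loaded pointers and never writes. A reader-side lookup in this style is just a thin wrapper over FindGreaterOrEqual (a sketch consistent with the interfaces above):

template <typename Key, class Comparator>
bool SkipList<Key, Comparator>::Contains(const Key& key) const {
  // prev is not needed for a pure lookup, so pass nullptr.
  Node* x = FindGreaterOrEqual(key, nullptr);
  return x != nullptr && Equal(key, x->key);
}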
class MemTable {
 public:
  // MemTables are reference counted.  The initial reference count
  // is zero and the caller must call Ref() at least once.
  explicit MemTable(const InternalKeyComparator& comparator);
  // Drop reference count.  Delete if no more references exist.
  void Unref() {
    --refs_;
    assert(refs_ >= 0);
    if (refs_ <= 0) {
      delete this;
    }
  }
  // Returns an estimate of the number of bytes of data in use by this
  // data structure. It is safe to call when MemTable is being modified.
  size_t ApproximateMemoryUsage();
  // Return an iterator that yields the contents of the memtable.
  //
  // The caller must ensure that the underlying MemTable remains live
  // while the returned iterator is live.  The keys returned by this
  // iterator are internal keys encoded by AppendInternalKey in the
  // db/format.{h,cc} module.
  Iterator* NewIterator();
  // Add an entry into memtable that maps key to value at the
  // specified sequence number and with the specified type.
  // Typically value will be empty if type==kTypeDeletion.
  void Add(SequenceNumber seq, ValueType type, const Slice& key,
           const Slice& value);
  // If memtable contains a value for key, store it in *value and return true.
  // If memtable contains a deletion for key, store a NotFound() error
  // in *status and return true.
  // Else, return false.
  bool Get(const LookupKey& key, std::string* value, Status* s);
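Before looking at Get(), it helps to see how Add() lays an entry out in the arena. The sketch below matches the entry format described in the comment inside Get() (klength, userkey, tag, vlength, value); it is a simplified illustration rather than a verbatim copy of db/memtable.cc, and it assumes leveldb's usual coding helpers (VarintLength, EncodeVarint32, EncodeFixed64).

void MemTable::Add(SequenceNumber seq, ValueType type, const Slice& key,
                   const Slice& value) {
  // Entry layout (concatenated in one arena allocation):
  //   klength  : varint32 of (key.size() + 8)   -- internal key length
  //   userkey  : char[key.size()]
  //   tag      : fixed64, (seq << 8) | type
  //   vlength  : varint32 of value.size()
  //   value    : char[value.size()]
  size_t key_size = key.size();
  size_t val_size = value.size();
  size_t internal_key_size = key_size + 8;
  size_t encoded_len = VarintLength(internal_key_size) + internal_key_size +
                       VarintLength(val_size) + val_size;
  char* buf = arena_.Allocate(encoded_len);
  char* p = EncodeVarint32(buf, internal_key_size);
  std::memcpy(p, key.data(), key_size);
  p += key_size;
  EncodeFixed64(p, (seq << 8) | type);
  p += 8;
  p = EncodeVarint32(p, val_size);
  std::memcpy(p, value.data(), val_size);
  table_.Insert(buf);  // publish the encoded entry into the skip list
}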
bool MemTable::Get(const LookupKey& key, std::string* value, Status* s) {
  Slice memkey = key.memtable_key();
  Table::Iterator iter(&table_);
  iter.Seek(memkey.data());
  if (iter.Valid()) {
    // entry format is:
    //    klength  varint32
    //    userkey  char[klength]
    //    tag      uint64
    //    vlength  varint32
    //    value    char[vlength]
    // Check that it belongs to same user key.  We do not check the
    // sequence number since the Seek() call above should have skipped
    // all entries with overly large sequence numbers.
    const char* entry = iter.key();
    uint32_t key_length;
    const char* key_ptr = GetVarint32Ptr(entry, entry + 5, &key_length);
    if (comparator_.comparator.user_comparator()->Compare(
            Slice(key_ptr, key_length - 8), key.user_key()) == 0) {
      // Correct user key
      const uint64_t tag = DecodeFixed64(key_ptr + key_length - 8);
      switch (static_cast<ValueType>(tag & 0xff)) {
        case kTypeValue: {
          Slice v = GetLengthPrefixedSlice(key_ptr + key_length);
          value->assign(v.data(), v.size());
          return true;
        }
        case kTypeDeletion:
          *s = Status::NotFound(Slice());
          return true;
      }
    }
  }
  return false;
}
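The key_length read here is the internal key length, i.e. the user key plus an 8-byte tag, which is why the user key is Slice(key_ptr, key_length - 8). Splitting the tag back into its parts is simple bit manipulation; the helper below is hypothetical (not part of leveldb) and assumes the tag encoding (seq << 8) | type, which is consistent with the tag & 0xff check in Get():

// Hypothetical helper for illustration: split a tag into its components.
inline void ParseTag(uint64_t tag, SequenceNumber* seq, ValueType* type) {
  *seq = tag >> 8;                             // upper 56 bits: sequence number
  *type = static_cast<ValueType>(tag & 0xff);  // low 8 bits: kTypeValue / kTypeDeletion
}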