LevelDB memtable

SkipList

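SkipList is covered in many places; section 9.2 of Deng Junhui's data structures textbook analyzes its time and space complexity. Compared with the binary search trees we commonly use, SkipList is somewhat less efficient in both time and space. But for implementing thread-safe ordered maps, SkipList is relatively simple, and it lends itself to some very nice lock-free optimizations.

In looking at LevelDB's SkipList, this article focuses on how it implements single-writer, multiple-reader semantics.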

https://github.com/google/leveldb/blob/master/db/skiplist.h#L8

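The comment there makes several points:

  • Writes require external synchronization, so only one thread writes at a time. We touched on how this is achieved when we covered Put earlier.
  • This SkipList only performs Insert and Get (nodes are never deleted). It allocates memory from an Arena and releases it all at once when the SkipList is no longer in use.
  • Data that has been inserted is never updated.

Also, before reading the code, note that the SkipList packs all of its information into the Key.

The interesting part is the memory barriers in Node: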

// Implementation details follow
template <typename Key, class Comparator>
struct SkipList<Key, Comparator>::Node {
  explicit Node(const Key& k) : key(k) {}

  Key const key;

  // Accessors/mutators for links. Wrapped in methods so we can
  // add the appropriate barriers as necessary.
  Node* Next(int n) {
    assert(n >= 0);
    // Use an 'acquire load' so that we observe a fully initialized
    // version of the returned Node.
    return next_[n].load(std::memory_order_acquire);
  }
  void SetNext(int n, Node* x) {
    assert(n >= 0);
    // Use a 'release store' so that anybody who reads through this
    // pointer observes a fully initialized version of the inserted node.
    next_[n].store(x, std::memory_order_release);
  }

  // No-barrier variants that can be safely used in a few locations.
  Node* NoBarrier_Next(int n) {
    assert(n >= 0);
    return next_[n].load(std::memory_order_relaxed);
  }
  void NoBarrier_SetNext(int n, Node* x) {
    assert(n >= 0);
    next_[n].store(x, std::memory_order_relaxed);
  }

 private:
  // Array of length equal to the node height. next_[0] is lowest level link.
  std::atomic<Node*> next_[1];
};
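
  • The NoBarrier_* variants operate with memory_order_relaxed.
  • The variants with a barrier operate with memory_order_acquire (loads) and memory_order_release (stores).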
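
To make the pairing concrete, here is a minimal sketch of the publish pattern, written for this article rather than taken from LevelDB. DemoNode, Publish and ReadFirst are hypothetical names. The writer fills in the new node with relaxed operations and only the final store that makes the node reachable uses release; the reader uses an acquire load on that same pointer.

```cpp
#include <atomic>
#include <cassert>

// Hypothetical node type for illustration; this is not LevelDB's Node.
struct DemoNode {
  int payload = 0;
  std::atomic<DemoNode*> next{nullptr};
};

// Writer side (assumed to be externally synchronized, one writer at a time).
// The node is filled in with plain and relaxed writes, then published with a
// release store so that those earlier writes are visible to any reader that
// acquire-loads the same pointer.
inline void Publish(DemoNode* head, DemoNode* fresh, int payload) {
  fresh->payload = payload;
  fresh->next.store(head->next.load(std::memory_order_relaxed),
                    std::memory_order_relaxed);
  head->next.store(fresh, std::memory_order_release);
}

// Reader side: the acquire load pairs with the release store in Publish().
inline const DemoNode* ReadFirst(const DemoNode* node) {
  return node->next.load(std::memory_order_acquire);
}
```

In a real program ReadFirst would run on reader threads while a single writer calls Publish; a reader that observes the new pointer is then guaranteed to observe the payload written before it.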

First, look at the crucial FindGreaterOrEqual:

// Return the earliest node that comes at or after key.
// Return nullptr if there is no such node.
//
// If prev is non-null, fills prev[level] with pointer to previous
// node at "level" for every level in [0..max_height_-1].
Node* FindGreaterOrEqual(const Key& key, Node** prev) const;

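When prev is non-null, FindGreaterOrEqual fills it in level by level: prev[4] is the last node at level 4 that sorts before key (the predecessor of the result at that level), and so on down to level 0.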
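
The role of prev is easier to see on a tiny example. The following is a simplified, single-threaded sketch with no atomics and no Arena; MiniNode, MiniSkipList and kDemoMaxHeight are made-up names, and the node height is passed in explicitly (instead of being random) so the result is deterministic.

```cpp
#include <cassert>
#include <vector>

// Hypothetical, single-threaded skip list used only to show what prev holds.
constexpr int kDemoMaxHeight = 4;

struct MiniNode {
  int key;
  MiniNode* next[kDemoMaxHeight] = {nullptr, nullptr, nullptr, nullptr};
  explicit MiniNode(int k) : key(k) {}
};

struct MiniSkipList {
  MiniNode head{-1};             // sentinel; its key is never compared
  int max_height = 1;
  std::vector<MiniNode*> owned;  // keeps nodes alive until the list dies

  ~MiniSkipList() {
    for (MiniNode* n : owned) delete n;
  }

  // Same control flow as the LevelDB function: walk right while the next
  // node sorts before key, otherwise record the predecessor and drop a level.
  MiniNode* FindGreaterOrEqual(int key, MiniNode** prev) {
    MiniNode* x = &head;
    int level = max_height - 1;
    while (true) {
      MiniNode* next = x->next[level];
      if (next != nullptr && next->key < key) {
        x = next;
      } else {
        if (prev != nullptr) prev[level] = x;
        if (level == 0) return next;
        level--;
      }
    }
  }

  // Height is an explicit argument here purely to keep the demo deterministic.
  void Insert(int key, int height) {
    MiniNode* prev[kDemoMaxHeight];
    FindGreaterOrEqual(key, prev);
    for (int i = max_height; i < height; i++) prev[i] = &head;
    if (height > max_height) max_height = height;
    MiniNode* x = new MiniNode(key);
    owned.push_back(x);
    for (int i = 0; i < height; i++) {
      x->next[i] = prev[i]->next[i];
      prev[i]->next[i] = x;
    }
  }
};
```

With keys 10 (height 1), 20 (height 3) and 30 (height 2) inserted, searching for 15 returns the node 20, with prev[0] pointing at node 10 and prev[1], prev[2] pointing at the head sentinel. Those are exactly the nodes whose links would have to be rewritten to insert 15.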

In the implementation, note that it calls x->Next(level), which is a load with memory order acquire.

template <typename Key, class Comparator>
typename SkipList<Key, Comparator>::Node*
SkipList<Key, Comparator>::FindGreaterOrEqual(const Key& key,
                                              Node** prev) const {
  Node* x = head_;
  int level = GetMaxHeight() - 1;
  while (true) {
    Node* next = x->Next(level);
    if (KeyIsAfterNode(key, next)) {
      // Keep searching in this list
      x = next;
    } else {
      if (prev != nullptr) prev[level] = x;
      if (level == 0) {
        return next;
      } else {
        // Switch to next list
        level--;
      }
    }
  }
}

So during Insert, given external synchronization, we can do the following:

template <typename Key, class Comparator>
void SkipList<Key, Comparator>::Insert(const Key& key) {
  // TODO(opt): We can use a barrier-free variant of FindGreaterOrEqual()
  // here since Insert() is externally synchronized.
  Node* prev[kMaxHeight];
  Node* x = FindGreaterOrEqual(key, prev);

  // Our data structure does not allow duplicate insertion
  assert(x == nullptr || !Equal(key, x->key));

  int height = RandomHeight();
  if (height > GetMaxHeight()) {
    for (int i = GetMaxHeight(); i < height; i++) {
      prev[i] = head_;
    }
    // It is ok to mutate max_height_ without any synchronization
    // with concurrent readers. A concurrent reader that observes
    // the new value of max_height_ will see either the old value of
    // new level pointers from head_ (nullptr), or a new value set in
    // the loop below. In the former case the reader will
    // immediately drop to the next level since nullptr sorts after all
    // keys. In the latter case the reader will use the new node.
    max_height_.store(height, std::memory_order_relaxed);
  }

  x = NewNode(key, height);
  for (int i = 0; i < height; i++) {
    // NoBarrier_SetNext() suffices since we will add a barrier when
    // we publish a pointer to "x" in prev[i].
    x->NoBarrier_SetNext(i, prev[i]->NoBarrier_Next(i));
    prev[i]->SetNext(i, x);
  }
}
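
  • Both the store and the load of max_height_ are relaxed. A concurrent reader may see either the old or the new height. If it sees the new height before the new node is linked in, the extra levels of head_ are still nullptr, so it simply drops to the next level.
  • Insertion links the node from level 0 up to the highest level, while FindGreaterOrEqual walks in the opposite direction, from the top level down.
  • Insertion calls x->NoBarrier_SetNext and then prev[i]->SetNext. The former is a relaxed store, the latter a release store.

Insert is single-threaded, but a SkipList Iterator may call FindGreaterOrEqual concurrently. Its Next is an acquire load, which pairs with the release store in SetNext, so a reader that sees the new pointer also sees a fully initialized node.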

Memtable

namespace leveldb {

class InternalKeyComparator;
class MemTableIterator;

class MemTable {
 public:
  // MemTables are reference counted. The initial reference count
  // is zero and the caller must call Ref() at least once.
  explicit MemTable(const InternalKeyComparator& comparator);

  MemTable(const MemTable&) = delete;
  MemTable& operator=(const MemTable&) = delete;

  // Increase reference count.
  void Ref() { ++refs_; }

  // Drop reference count. Delete if no more references exist.
  void Unref() {
    --refs_;
    assert(refs_ >= 0);
    if (refs_ <= 0) {
      delete this;
    }
  }

  // Returns an estimate of the number of bytes of data in use by this
  // data structure. It is safe to call when MemTable is being modified.
  size_t ApproximateMemoryUsage();

  // Return an iterator that yields the contents of the memtable.
  //
  // The caller must ensure that the underlying MemTable remains live
  // while the returned iterator is live. The keys returned by this
  // iterator are internal keys encoded by AppendInternalKey in the
  // db/format.{h,cc} module.
  Iterator* NewIterator();

  // Add an entry into memtable that maps key to value at the
  // specified sequence number and with the specified type.
  // Typically value will be empty if type==kTypeDeletion.
  void Add(SequenceNumber seq, ValueType type, const Slice& key,
           const Slice& value);

  // If memtable contains a value for key, store it in *value and return true.
  // If memtable contains a deletion for key, store a NotFound() error
  // in *status and return true.
  // Else, return false.
  bool Get(const LookupKey& key, std::string* value, Status* s);

 private:
  friend class MemTableIterator;
  friend class MemTableBackwardIterator;

  struct KeyComparator {
    const InternalKeyComparator comparator;
    explicit KeyComparator(const InternalKeyComparator& c) : comparator(c) {}
    int operator()(const char* a, const char* b) const;
  };

  typedef SkipList<const char*, KeyComparator> Table;

  ~MemTable();  // Private since only Unref() should be used to delete it

  KeyComparator comparator_;
  int refs_;
  Arena arena_;
  Table table_;
};

}  // namespace leveldb

The interface is fairly clear.
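
One convention worth spelling out is the reference counting: the count starts at zero, the destructor is private, and the object deletes itself in Unref(). The sketch below reproduces only that convention in a hypothetical class, RefCountedDemo; the destroyed counter exists solely so the example can observe the deletion. Like refs_ in MemTable, the counter is a plain int, so callers are assumed to serialize Ref() and Unref() themselves.

```cpp
#include <cassert>

// Hypothetical class mirroring only the Ref()/Unref() convention of MemTable.
class RefCountedDemo {
 public:
  explicit RefCountedDemo(int* destroyed) : destroyed_(destroyed) {}

  RefCountedDemo(const RefCountedDemo&) = delete;
  RefCountedDemo& operator=(const RefCountedDemo&) = delete;

  // Increase reference count.
  void Ref() { ++refs_; }

  // Drop reference count; the object deletes itself at zero.
  void Unref() {
    --refs_;
    assert(refs_ >= 0);
    if (refs_ <= 0) {
      delete this;
    }
  }

 private:
  // Private so that stack allocation and a direct delete do not compile;
  // Unref() is the only way to destroy the object.
  ~RefCountedDemo() { ++*destroyed_; }

  int refs_ = 0;    // starts at zero: the creator must call Ref() itself
  int* destroyed_;  // demo-only hook to observe destruction
};
```

A typical use is to allocate with new, call Ref() once per holder, and have each holder call Unref() when done; the last Unref() frees the object.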

Put Get Codec

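Understanding the codec requires understanding varint: https://developers.google.com/protocol-buffers/docs/encoding . A number is represented as a sequence of bytes, each holding a continuation flag in the most significant bit (MSB) plus 7 bits of payload. Because the values here are unsigned, no further processing is needed. For signed values, one might also consider ZigZag encoding.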
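
As an illustration of the format, here is a small sketch of a base-128 varint encoder and decoder for 32-bit unsigned values. AppendVarint32Sketch and ParseVarint32Sketch are names invented for this example; they are not LevelDB's EncodeVarint32 and GetVarint32Ptr, although they are meant to produce the same bytes.

```cpp
#include <cassert>
#include <cstdint>
#include <string>

// Appends v to dst, 7 bits per byte, least significant group first.
// The MSB of each byte is 1 when more bytes follow and 0 on the last byte.
inline void AppendVarint32Sketch(std::string* dst, uint32_t v) {
  while (v >= 0x80) {
    dst->push_back(static_cast<char>((v & 0x7f) | 0x80));
    v >>= 7;
  }
  dst->push_back(static_cast<char>(v));
}

// Decodes a varint32 from the start of src.
// Returns the number of bytes consumed, or 0 if the input is truncated or
// longer than the 5 bytes a 32-bit value can need.
inline size_t ParseVarint32Sketch(const std::string& src, uint32_t* out) {
  uint32_t result = 0;
  for (size_t i = 0; i < src.size() && i < 5; i++) {
    uint32_t byte = static_cast<unsigned char>(src[i]);
    result |= (byte & 0x7f) << (7 * i);
    if ((byte & 0x80) == 0) {
      *out = result;
      return i + 1;
    }
  }
  return 0;
}
```

For example, 300 is 0b10_0101100 in binary, so it encodes to the two bytes 0xAC 0x02, while any value below 128 fits in a single byte. This is why short keys and values cost only one byte of length prefix.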

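In LevelDB we call Add(SequenceNumber s, ValueType type, const Slice& key, const Slice& value). But as we saw, a SkipList Node holds only a Key. LevelDB encodes these four fields into a single buffer in a way that still allows efficient reads and writes and supports the Get function.

The entry is encoded as follows:

  • varint32 internal key length (key.size() + 8)
  • the key bytes (key.size() bytes)
  • (s << 8) | type, stored as a fixed 8-byte integer. type is a 1-byte enum indicating Put or Delete; the remaining 7 bytes hold s, the increasing sequence number of the operation.
  • varint32 value length
  • the value bytes
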
void MemTable::Add(SequenceNumber s, ValueType type, const Slice& key,
                   const Slice& value) {
  // Format of an entry is concatenation of:
  //  key_size     : varint32 of internal_key.size()
  //  key bytes    : char[internal_key.size()]
  //  value_size   : varint32 of value.size()
  //  value bytes  : char[value.size()]
  size_t key_size = key.size();
  size_t val_size = value.size();
  size_t internal_key_size = key_size + 8;
  const size_t encoded_len = VarintLength(internal_key_size) +
                             internal_key_size + VarintLength(val_size) +
                             val_size;
  char* buf = arena_.Allocate(encoded_len);
  char* p = EncodeVarint32(buf, internal_key_size);
  std::memcpy(p, key.data(), key_size);
  p += key_size;
  EncodeFixed64(p, (s << 8) | type);
  p += 8;
  p = EncodeVarint32(p, val_size);
  std::memcpy(p, value.data(), val_size);
  assert(p + val_size == buf + encoded_len);
  table_.Insert(buf);
}

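It is also worth looking at the Encode logic. It writes the least significant byte first, so the result is little-endian regardless of the host; on a little-endian CPU such as x86 the compiler can reduce it to a single store: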

inline void EncodeFixed64(char* dst, uint64_t value) {
  uint8_t* const buffer = reinterpret_cast<uint8_t*>(dst);

  // Recent clang and gcc optimize this to a single mov / str instruction.
  buffer[0] = static_cast<uint8_t>(value);
  buffer[1] = static_cast<uint8_t>(value >> 8);
  buffer[2] = static_cast<uint8_t>(value >> 16);
  buffer[3] = static_cast<uint8_t>(value >> 24);
  buffer[4] = static_cast<uint8_t>(value >> 32);
  buffer[5] = static_cast<uint8_t>(value >> 40);
  buffer[6] = static_cast<uint8_t>(value >> 48);
  buffer[7] = static_cast<uint8_t>(value >> 56);
}
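
To see how the tag survives a round trip through this byte layout, here is a short sketch. PutFixed64Sketch, GetFixed64Sketch and PackTag are illustrative names, not LevelDB functions; the decoder is simply the inverse of the loop-free encoder above, written as a loop.

```cpp
#include <cassert>
#include <cstdint>

// Writes value into dst[0..7], least significant byte first.
inline void PutFixed64Sketch(unsigned char* dst, uint64_t value) {
  for (int i = 0; i < 8; i++) {
    dst[i] = static_cast<unsigned char>(value >> (8 * i));
  }
}

// Reads the 8 bytes back; independent of the host's endianness.
inline uint64_t GetFixed64Sketch(const unsigned char* src) {
  uint64_t value = 0;
  for (int i = 0; i < 8; i++) {
    value |= static_cast<uint64_t>(src[i]) << (8 * i);
  }
  return value;
}

// Packs a sequence number (upper 7 bytes) and a type (lowest byte) into a tag.
inline uint64_t PackTag(uint64_t seq, uint8_t type) {
  return (seq << 8) | type;
}
```

With sequence number 5 and type 1, the tag is 0x501, so the first byte in memory is the type (0x01), the second is the low byte of the sequence number (0x05), and the rest are zero. Decoding and then applying tag >> 8 and tag & 0xff recovers both fields.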

Also note the concrete Comparator:

int InternalKeyComparator::Compare(const Slice& akey, const Slice& bkey) const {
  // Order by:
  //    increasing user key (according to user-supplied comparator)
  //    decreasing sequence number
  //    decreasing type (though sequence# should be enough to disambiguate)
  int r = user_comparator_->Compare(ExtractUserKey(akey), ExtractUserKey(bkey));
  if (r == 0) {
    const uint64_t anum = DecodeFixed64(akey.data() + akey.size() - 8);
    const uint64_t bnum = DecodeFixed64(bkey.data() + bkey.size() - 8);
    if (anum > bnum) {
      r = -1;
    } else if (anum < bnum) {
      r = +1;
    }
  }
  return r;
}
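
The effect of this ordering can be checked on already-decoded keys. The sketch below is a simplification under stated assumptions: DemoInternalKey is a hypothetical struct holding the user key and the 64-bit tag separately, and the user comparator is assumed to be plain bytewise std::string comparison.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <string>
#include <vector>

// Hypothetical decoded form of an internal key.
struct DemoInternalKey {
  std::string user_key;
  uint64_t tag;  // (sequence << 8) | type
};

// Same shape as the comparator above: user key ascending, then tag descending.
inline int CompareInternal(const DemoInternalKey& a, const DemoInternalKey& b) {
  int r = a.user_key.compare(b.user_key);
  if (r == 0) {
    if (a.tag > b.tag) {
      r = -1;
    } else if (a.tag < b.tag) {
      r = +1;
    }
  }
  return r;
}

// Returns the keys sorted in internal-key order.
inline std::vector<DemoInternalKey> SortedCopy(
    std::vector<DemoInternalKey> keys) {
  std::sort(keys.begin(), keys.end(),
            [](const DemoInternalKey& a, const DemoInternalKey& b) {
              return CompareInternal(a, b) < 0;
            });
  return keys;
}
```

Because the sequence number occupies the upper bytes of the tag, comparing tags compares sequence numbers first. For one user key, the newest write therefore sorts first, which is what lets a forward Seek stop at the most recent visible version.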

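For Get, the logic is as follows:

  • The length of a given key is fixed, so it is easy to construct the same key_length and key, then call Seek on the SkipList to find the Key.
  • The encoding uses a 7-byte seq and a 1-byte put/del flag, and entries with the same user key are sorted by decreasing seq, so Seek lands on the first entry at or after the lookup key:
    • If its user_key differs, the key does not exist in the memtable.
    • If it matches, Seek is guaranteed to have landed on the latest version, or the latest version at or below the given seq. Check the type and read out the value.
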
bool MemTable::Get(const LookupKey& key, std::string* value, Status* s) {
  Slice memkey = key.memtable_key();
  Table::Iterator iter(&table_);
  iter.Seek(memkey.data());
  if (iter.Valid()) {
    // entry format is:
    //    klength  varint32
    //    userkey  char[klength]
    //    tag      uint64
    //    vlength  varint32
    //    value    char[vlength]
    // Check that it belongs to same user key. We do not check the
    // sequence number since the Seek() call above should have skipped
    // all entries with overly large sequence numbers.
    const char* entry = iter.key();
    uint32_t key_length;
    const char* key_ptr = GetVarint32Ptr(entry, entry + 5, &key_length);
    if (comparator_.comparator.user_comparator()->Compare(
            Slice(key_ptr, key_length - 8), key.user_key()) == 0) {
      // Correct user key
      const uint64_t tag = DecodeFixed64(key_ptr + key_length - 8);
      switch (static_cast<ValueType>(tag & 0xff)) {
        case kTypeValue: {
          Slice v = GetLengthPrefixedSlice(key_ptr + key_length);
          value->assign(v.data(), v.size());
          return true;
        }
        case kTypeDeletion:
          *s = Status::NotFound(Slice());
          return true;
      }
    }
  }
  return false;
}
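
The Seek-then-check logic can be mimicked without a skip list. The following sketch stands in a sorted std::vector for the SkipList and std::lower_bound for Seek. All names (DemoEntry, DemoGet, DemoResult and so on) are hypothetical, entries are kept in decoded form, and the ordering ignores the type byte for simplicity, which is an assumption that holds as long as sequence numbers are unique.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <string>
#include <vector>

enum DemoType : uint8_t { kDemoDeletion = 0, kDemoValue = 1 };

// Hypothetical decoded memtable entry.
struct DemoEntry {
  std::string user_key;
  uint64_t seq;
  DemoType type;
  std::string value;
};

// Ordering: user key ascending, then sequence number descending.
inline bool EntryLess(const DemoEntry& a, const DemoEntry& b) {
  if (a.user_key != b.user_key) return a.user_key < b.user_key;
  return a.seq > b.seq;
}

enum class DemoResult { kMiss, kFound, kDeleted };

// Looks up user_key as of snapshot_seq. `table` must be sorted by EntryLess.
inline DemoResult DemoGet(const std::vector<DemoEntry>& table,
                          const std::string& user_key, uint64_t snapshot_seq,
                          std::string* value) {
  // The probe plays the role of the LookupKey: same user key, snapshot seq.
  DemoEntry probe{user_key, snapshot_seq, kDemoValue, ""};
  // "Seek": first entry that does not sort before the probe, i.e. the newest
  // entry for user_key with seq <= snapshot_seq, or some later user key.
  auto it = std::lower_bound(table.begin(), table.end(), probe, EntryLess);
  if (it == table.end() || it->user_key != user_key) {
    return DemoResult::kMiss;  // no visible version in this table
  }
  if (it->type == kDemoDeletion) {
    return DemoResult::kDeleted;  // newest visible version is a tombstone
  }
  *value = it->value;
  return DemoResult::kFound;
}
```

Note the three outcomes, which correspond to the three return paths of MemTable::Get: a value was found, a deletion marker was found (the search must stop and report NotFound), or the memtable has nothing for this key at this snapshot and the caller has to look elsewhere.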