LevelDB Log: WAL of LSMTree C0

If the term WAL sounds unfamiliar... then again, I doubt anyone reading this has truly never heard of a write-ahead log...

In practice, a database's WAL can be quite hard to read. Take B+Tree with undo/redo logging as an example: the undo/redo logic, checkpointing, and log GC carry a fair amount of complexity on their own, which makes the code difficult to follow.

LevelDB is structured as an LSM-Tree. In the original LSM-Tree paper, the structure looks like this:

[Figure 2.1 from the LSM-Tree paper: the C0 tree in memory, the C1 tree on disk]

This is figure 2.1 from the paper; we can see that C0 lives in memory. In other words, LevelDB gets a relatively simple WAL: the log written before each write lands in the memtable.

What we need to look at:

  1. What does the log's structure look like?
  2. When does the log start being written, and when is it deleted?
  3. How does the log handle disk failures?
  4. What is the naming scheme for log files?

Log structure

For question 1, the best place to read is: https://github.com/google/leveldb/blob/master/doc/log_format.md

The logic is roughly as follows:

  1. The log file contents are a sequence of 32KB blocks. The only exception is that the tail of the file may contain a partial block.
  2. Each block consists of a sequence of records:
block := record* trailer?
record :=
  checksum: uint32  // crc32c of type and data[] ; little-endian
  length: uint16    // little-endian
  type: uint8       // One of FULL, FIRST, MIDDLE, LAST
  data: uint8[length]

To summarize:

  • The log file is a sequence of 32 KB blocks; only the tail of the file may be a partial block
  • Each block's structure reads a bit like a regular expression: zero or more records plus an optional trailer
  • Each record contains a crc32c, a little-endian length, a type, and the actual byte data

A record never starts within the last six bytes of a block (since it won’t fit). Any leftover bytes here form the trailer, which must consist entirely of zero bytes and must be skipped by readers.

  • As quoted above, the trailer is essentially tail padding: when fewer than 7 bytes (kHeaderSize) are left in a block, they are zero-filled and readers just skip them.

Why 7 bytes

Here is the code from db/log_format.h:

// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
//
// Log format information shared by reader and writer.
// See ../doc/log_format.md for more detail.

#ifndef STORAGE_LEVELDB_DB_LOG_FORMAT_H_
#define STORAGE_LEVELDB_DB_LOG_FORMAT_H_

namespace leveldb {
namespace log {

enum RecordType {
  // Zero is reserved for preallocated files
  kZeroType = 0,

  kFullType = 1,

  // For fragments
  kFirstType = 2,
  kMiddleType = 3,
  kLastType = 4
};
static const int kMaxRecordType = kLastType;

static const int kBlockSize = 32768;

// Header is checksum (4 bytes), length (2 bytes), type (1 byte).
static const int kHeaderSize = 4 + 2 + 1;

}  // namespace log
}  // namespace leveldb

#endif  // STORAGE_LEVELDB_DB_LOG_FORMAT_H_

  • kBlockSize is 32 KB, i.e. 32 * 1024 = 32768
  • RecordType is an enum with the types listed below. In C++ an enum defaults to int, but on disk the type only occupies a single byte (the uint8 type field)
    • kFullType: a complete record
    • kFirstType kMiddleType kLastType: the first, middle..., last fragments of a record that got split
    • kZeroType: per the comment, zero is reserved for preallocated files
  • kHeaderSize is checksum + length + type; the sizes are as described above (a small decoding sketch follows this list)
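
To make the 7-byte header concrete, here is a minimal decoding sketch. It is not LevelDB code (the real reader works on its internal buffer); it only illustrates the layout: a 4-byte masked crc32c, a little-endian uint16 length at bytes 4-5, and the type at byte 6.

#include <cstdint>

struct RecordHeader {
  uint32_t masked_crc;  // crc32c of type + data, masked for storage
  uint16_t length;      // payload length (little-endian on disk)
  uint8_t type;         // FULL / FIRST / MIDDLE / LAST
};

// Decode the 7 header bytes that precede every physical record.
RecordHeader DecodeHeader(const char* p) {
  const unsigned char* u = reinterpret_cast<const unsigned char*>(p);
  RecordHeader h;
  h.masked_crc = static_cast<uint32_t>(u[0]) |
                 (static_cast<uint32_t>(u[1]) << 8) |
                 (static_cast<uint32_t>(u[2]) << 16) |
                 (static_cast<uint32_t>(u[3]) << 24);
  h.length = static_cast<uint16_t>(u[4] | (u[5] << 8));
  h.type = u[6];
  return h;
}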

For FIRST/MIDDLE/LAST, the documentation also gives an example:

Example: consider a sequence of user records:

A: length 1000
B: length 97270
C: length 8000

A will be stored as a FULL record in the first block.

B will be split into three fragments: first fragment occupies the rest of the first block, second fragment occupies the entirety of the second block, and the third fragment occupies a prefix of the third block. This will leave six bytes free in the third block, which will be left empty as the trailer.

C will be stored as a FULL record in the fourth block.
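
We can check the "six bytes free" claim with a little arithmetic, assuming the three records go into a fresh log so that A starts at offset 0:

  • Block 1: A takes 7 + 1000 = 1007 bytes as a FULL record; the FIRST fragment of B gets the rest: 32768 - 1007 - 7 = 31754 bytes of payload.
  • Block 2: one MIDDLE fragment fills the whole block: 32768 - 7 = 32761 bytes of payload.
  • Block 3: B still needs 97270 - 31754 - 32761 = 32755 bytes, so the LAST fragment takes 7 + 32755 = 32762 bytes, leaving 32768 - 32762 = 6 bytes, exactly the trailer.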

(A quick aside: with a kv this large, the memory surely gets allocated in one go hhh)

filename

std::string LogFileName(const std::string& dbname, uint64_t number) {
  assert(number > 0);
  return MakeFileName(dbname, number, "log");
}

And then:

static std::string MakeFileName(const std::string& dbname, uint64_t number,
                                const char* suffix) {
  char buf[100];
  std::snprintf(buf, sizeof(buf), "/%06llu.%s",
                static_cast<unsigned long long>(number), suffix);
  return dbname + buf;
}

Files are therefore saved as dbname/number.log, with the number zero-padded to six digits.
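
A quick sanity check of the format (the path and number here are hypothetical; LogFileName is declared in the internal header db/filename.h):

#include "db/filename.h"

#include <cassert>
#include <string>

void FilenameExample() {
  // %06llu pads the file number to six digits.
  std::string name = leveldb::LogFileName("/tmp/testdb", 7);
  assert(name == "/tmp/testdb/000007.log");
}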

Log and Writer

namespace leveldb {

class WritableFile;

namespace log {

class Writer {
 public:
  // Create a writer that will append data to "*dest".
  // "*dest" must be initially empty.
  // "*dest" must remain live while this Writer is in use.
  explicit Writer(WritableFile* dest);

  // Create a writer that will append data to "*dest".
  // "*dest" must have initial length "dest_length".
  // "*dest" must remain live while this Writer is in use.
  Writer(WritableFile* dest, uint64_t dest_length);

  Writer(const Writer&) = delete;
  Writer& operator=(const Writer&) = delete;

  ~Writer();

  Status AddRecord(const Slice& slice);

 private:
  Status EmitPhysicalRecord(RecordType type, const char* ptr, size_t length);

  WritableFile* dest_;
  int block_offset_;  // Current offset in block

  // crc32c values for all supported record types.  These are
  // pre-computed to reduce the overhead of computing the crc of the
  // record type stored in the header.
  uint32_t type_crc_[kMaxRecordType + 1];
};

}  // namespace log
}  // namespace leveldb

  • dest_length effectively gives the file a built-in starting offset (for appending to a log that already has content)
  • AddRecord is public; callers hand records to it to be appended (a usage sketch follows this list)
  • EmitPhysicalRecord is the function that actually does the writing
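
Here is a minimal usage sketch. It assumes you are building inside the LevelDB source tree (db/log_writer.h is an internal header, not part of the public API), and WriteOneRecord plus the file name are made-up names:

#include "db/log_writer.h"
#include "leveldb/env.h"
#include "leveldb/slice.h"
#include "leveldb/status.h"

#include <string>

// Append a single record to a fresh log file via log::Writer.
leveldb::Status WriteOneRecord(const std::string& fname,
                               const leveldb::Slice& payload) {
  leveldb::WritableFile* file = nullptr;
  leveldb::Status s = leveldb::Env::Default()->NewWritableFile(fname, &file);
  if (!s.ok()) return s;

  leveldb::log::Writer writer(file);  // block_offset_ starts at 0 for an empty file
  s = writer.AddRecord(payload);      // fragments across 32 KB blocks if needed

  if (s.ok()) s = file->Close();
  delete file;
  return s;
}

Roughly, this is what DBImpl does on every write: the write batch is serialized, handed to log_->AddRecord(), and only then applied to the memtable.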

The code for appending a record is as follows:

Status Writer::AddRecord(const Slice& slice) {
  // left is how much of the slice is still unwritten.
  const char* ptr = slice.data();
  size_t left = slice.size();

  // Fragment the record if necessary and emit it.  Note that if slice
  // is empty, we still want to iterate once to emit a single
  // zero-length record
  Status s;
  bool begin = true;
  do {
    const int leftover = kBlockSize - block_offset_;
    assert(leftover >= 0);
    // Fewer than 7 bytes left: just pad them.
    if (leftover < kHeaderSize) {
      // Switch to a new block
      if (leftover > 0) {
        // Fill the trailer (literal below relies on kHeaderSize being 7)
        static_assert(kHeaderSize == 7, "");
        dest_->Append(Slice("\x00\x00\x00\x00\x00\x00", leftover));
      }
      // Move on to a new block.
      block_offset_ = 0;
    }

    // Invariant: we never leave < kHeaderSize bytes in a block.
    assert(kBlockSize - block_offset_ - kHeaderSize >= 0);

    // How much payload the current block can still hold.
    const size_t avail = kBlockSize - block_offset_ - kHeaderSize;
    // How much of the record we will write into this block.
    const size_t fragment_length = (left < avail) ? left : avail;

    RecordType type;
    // end tells us whether the record finishes within this block.
    const bool end = (left == fragment_length);
    if (begin && end) {
      type = kFullType;
    } else if (begin) {
      type = kFirstType;
    } else if (end) {
      type = kLastType;
    } else {
      type = kMiddleType;
    }

    s = EmitPhysicalRecord(type, ptr, fragment_length);
    ptr += fragment_length;
    left -= fragment_length;
    begin = false;
  } while (s.ok() && left > 0);
  return s;
}

Status Writer::EmitPhysicalRecord(RecordType t, const char* ptr,
                                  size_t length) {
  assert(length <= 0xffff);  // Must fit in two bytes
  assert(block_offset_ + kHeaderSize + length <= kBlockSize);

  // Format the header
  char buf[kHeaderSize];
  buf[4] = static_cast<char>(length & 0xff);
  buf[5] = static_cast<char>(length >> 8);
  buf[6] = static_cast<char>(t);

  // Compute the crc of the record type and the payload.
  uint32_t crc = crc32c::Extend(type_crc_[t], ptr, length);
  crc = crc32c::Mask(crc);  // Adjust for storage
  EncodeFixed32(buf, crc);

  // Write the header and the payload
  Status s = dest_->Append(Slice(buf, kHeaderSize));
  if (s.ok()) {
    s = dest_->Append(Slice(ptr, length));
    if (s.ok()) {
      s = dest_->Flush();
    }
  }
  block_offset_ += kHeaderSize + length;
  return s;
}

Honestly I don't have much to say; this code is genuinely easy to follow, and even the comments I added feel redundant...

If EmitPhysicalRecord fails (i.e. s.ok() does not hold), the loop stops and the rest of the record is simply abandoned; AddRecord returns the error.

Reader

The Reader code is considerably more complex; here is a brief tour:

class Reader {
 public:
  // Interface for reporting errors.
  class Reporter {
   public:
    virtual ~Reporter();

    // Some corruption was detected.  "size" is the approximate number
    // of bytes dropped due to the corruption.
    virtual void Corruption(size_t bytes, const Status& status) = 0;
  };

  // Create a reader that will return log records from "*file".
  // "*file" must remain live while this Reader is in use.
  //
  // If "reporter" is non-null, it is notified whenever some data is
  // dropped due to a detected corruption.  "*reporter" must remain
  // live while this Reader is in use.
  //
  // If "checksum" is true, verify checksums if available.
  //
  // The Reader will start reading at the first record located at physical
  // position >= initial_offset within the file.
  Reader(SequentialFile* file, Reporter* reporter, bool checksum,
         uint64_t initial_offset);

  Reader(const Reader&) = delete;
  Reader& operator=(const Reader&) = delete;

  ~Reader();

  // Read the next record into *record.  Returns true if read
  // successfully, false if we hit end of the input.  May use
  // "*scratch" as temporary storage.  The contents filled in *record
  // will only be valid until the next mutating operation on this
  // reader or the next mutation to *scratch.
  bool ReadRecord(Slice* record, std::string* scratch);

  // Returns the physical offset of the last record returned by ReadRecord.
  //
  // Undefined before the first call to ReadRecord.
  uint64_t LastRecordOffset();

 private:
  // Extend record types with the following special values
  enum {
    kEof = kMaxRecordType + 1,
    // Returned whenever we find an invalid physical record.
    // Currently there are three situations in which this happens:
    // * The record has an invalid CRC (ReadPhysicalRecord reports a drop)
    // * The record is a 0-length record (No drop is reported)
    // * The record is below constructor's initial_offset (No drop is reported)
    kBadRecord = kMaxRecordType + 2
  };

  // Skips all blocks that are completely before "initial_offset_".
  //
  // Returns true on success. Handles reporting.
  bool SkipToInitialBlock();

  // Returns one of the RecordType values, or one of the special values above
  // Return type, or one of the preceding special values
  unsigned int ReadPhysicalRecord(Slice* result);

  // Reports dropped bytes to the reporter.
  // buffer_ must be updated to remove the dropped bytes prior to invocation.
  void ReportCorruption(uint64_t bytes, const char* reason);
  void ReportDrop(uint64_t bytes, const Status& reason);

  // SequentialFile; "* const" means the pointer never points elsewhere
  SequentialFile* const file_;
  // Reporter that corruption events are reported to
  Reporter* const reporter_;
  // Whether to verify checksums
  bool const checksum_;
  // backing_store_ is a kBlockSize-sized array that holds
  // the bytes read from the file.
  char* const backing_store_;
  // Read buffer; points into backing_store_
  Slice buffer_;
  // EOF handling
  bool eof_;  // Last Read() indicated EOF by returning < kBlockSize

  // File offset of the last record returned by ReadRecord
  // Offset of the last record returned by ReadRecord.
  uint64_t last_record_offset_;

  // How far into the file the buffer reaches
  // Offset of the first location past the end of buffer_.
  uint64_t end_of_buffer_offset_;

  // The Reader will start reading at the first record located at physical
  // position >= initial_offset within the file.
  // Offset at which to start looking for the first record to return
  uint64_t const initial_offset_;

  // True if we are resynchronizing after a seek (initial_offset_ > 0).  In
  // particular, a run of kMiddleType and kLastType records can be silently
  // skipped in this mode
  // Whether we need to resynchronize, normally because of the initial_offset_ seek
  bool resyncing_;
};

  • Callers use ReadRecord (and LastRecordOffset) to pull records back out
  • backing_store_ / buffer_ hold the data that has been read; end_of_buffer_offset_ marks the file offset just past the end of buffer_
  • eof_ records whether reading has hit end of file
  • initial_offset_ is the offset to skip to initially. For alignment reasons the implementation does not seek straight to that position; it seeks to the start of the containing block and then relies on resyncing_ (see the sketch after this list)
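
The sketch mentioned in the last bullet: roughly, SkipToInitialBlock rounds initial_offset_ down to the start of its block (bumping to the next block if the offset lands in the trailer area) and then skips the file to that position. Paraphrased as a standalone helper, with the member names replaced by parameters:

#include <cstdint>

// Compute the file position SkipToInitialBlock seeks to for a given
// initial_offset: the start of the containing 32 KB block, or the next
// block if the offset points into the <= 6-byte trailer area.
uint64_t InitialBlockStart(uint64_t initial_offset, uint64_t block_size = 32768) {
  const uint64_t offset_in_block = initial_offset % block_size;
  uint64_t block_start = initial_offset - offset_in_block;
  if (offset_in_block > block_size - 6) {
    block_start += block_size;
  }
  return block_start;
}

From that block boundary on, the reader parses whole records again; resyncing_ makes it silently drop kMiddleType/kLastType fragments (per the comment in the header) until it reaches a proper record boundary.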

The Reader constructor looks like this:

Reader::Reader(SequentialFile* file, Reporter* reporter, bool checksum,
               uint64_t initial_offset)
    : file_(file),
      reporter_(reporter),
      checksum_(checksum),
      backing_store_(new char[kBlockSize]),
      buffer_(),
      eof_(false),
      last_record_offset_(0),
      end_of_buffer_offset_(0),
      initial_offset_(initial_offset),
      resyncing_(initial_offset > 0) {}

The somewhat interesting bit is the initialization of backing_store_: why allocate it on the heap? I'm not entirely sure either; presumably embedding a 32 KB array directly in the object would make every Reader that large.

The read flow is roughly as follows; see ReadRecord and ReadPhysicalRecord:

  • It first tries to seek to the start of a block, and every read then pulls in one block at a time. The data is read into backing_store_ and accessed through buffer_
  • While buffer_ still has enough content, records are parsed straight out of it
  • If a record is incomplete, its fragments are accumulated in scratch; ReadRecord implements a small state machine and eventually hands back the complete record (a replay sketch follows this list)
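
As promised above, a minimal replay sketch showing the ReadRecord loop. It again assumes the internal header db/log_reader.h; PrintReporter and ReplayLog are made-up names:

#include "db/log_reader.h"
#include "leveldb/env.h"
#include "leveldb/slice.h"
#include "leveldb/status.h"

#include <cstdio>
#include <string>

// Report corrupted/dropped byte ranges to stderr.
class PrintReporter : public leveldb::log::Reader::Reporter {
 public:
  void Corruption(size_t bytes, const leveldb::Status& status) override {
    std::fprintf(stderr, "dropped %zu bytes: %s\n", bytes,
                 status.ToString().c_str());
  }
};

// Read every record out of a log file, in order.
void ReplayLog(const std::string& fname) {
  leveldb::SequentialFile* file = nullptr;
  if (!leveldb::Env::Default()->NewSequentialFile(fname, &file).ok()) return;

  PrintReporter reporter;
  leveldb::log::Reader reader(file, &reporter, /*checksum=*/true,
                              /*initial_offset=*/0);

  leveldb::Slice record;
  std::string scratch;
  while (reader.ReadRecord(&record, &scratch)) {
    // record points either into the reader's block buffer or into scratch,
    // and is only valid until the next ReadRecord call.
    std::fprintf(stdout, "record of %zu bytes\n", record.size());
  }
  delete file;
}

This is essentially what recovery does: DBImpl::RecoverLogFile builds a log::Reader over each remaining .log file and replays every record into a fresh memtable.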

Logs and error handling

For most of the error handling, I think you can learn a lot just by reading the Reader code.
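
One concrete example: when checksum is enabled, ReadPhysicalRecord verifies every physical record before returning it. The check is roughly the following, paraphrased here as a standalone helper (the real code lives in log_reader.cc and uses the internal headers util/coding.h and util/crc32c.h):

#include <cstddef>
#include <cstdint>

#include "util/coding.h"  // leveldb::DecodeFixed32
#include "util/crc32c.h"  // leveldb::crc32c::Value / Unmask

// The stored CRC covers the type byte plus the payload and is masked on disk
// (the writer applied crc32c::Mask in EmitPhysicalRecord above).
bool HeaderChecksumOk(const char* header /* 7-byte header, payload follows */,
                      size_t payload_length) {
  const uint32_t expected_crc = leveldb::crc32c::Unmask(leveldb::DecodeFixed32(header));
  const uint32_t actual_crc = leveldb::crc32c::Value(header + 6, 1 + payload_length);
  return actual_crc == expected_crc;
}

On a mismatch the reader drops the rest of the current block and reports it through Reporter::Corruption, which is how half-written records left behind by a crash get skipped during recovery.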

The documentation also mentions a comparison with the RecordIO data format: http://mesos.apache.org/documentation/latest/recordio/