O_DIRECT Write with RocksDB

What exactly is O_DIRECT? I trust most readers already know this flag, so I won't belabor it; in a nutshell, see the figure below:

[figure: O_DIRECT bypassing the page cache]

If an O_DIRECT request goes through a local filesystem, the filesystem under the VFS will likely hand it straight to the block layer, which imposes logical-block alignment requirements, e.g. alignment to 512/4096 bytes. If the request is sent to FUSE, it depends on what the FUSE side implements. Either way, the point is to bypass the page cache.

This may not look like a strong constraint for reads, but for writes things get considerably more complicated. Consider a fairly simple scenario: log writing. If each batch does not accumulate to a full BlockSize, we do no caching, and we expose only an Appender abstraction to the upper layer, then the next write needs a read-modify-write sequence on its first (partial) block. This is not hard to implement, but it still has to be accounted for. There is also a small design question here: suppose the user Appends 11 blocks + 1 byte of data in one write. We then find that this actually implies a need for buffering. Without buffering, we either pay one extra IO (first rewrite the partial first block, then direct-write the following blocks, trading an extra IO for not buffering), or we buffer to minimize that overhead. Alternatively, we can write strictly in page units, the way MySQL writes its data pages (excluding the WAL). One more wrinkle: the buffer itself also needs to be aligned to some extent, so you may have to allocate an aligned buffer.

Another interesting point: in the write phase, when we reach the last block, we pad it to block alignment and then close the file. At that point, what is the fs close supposed to do... surely you don't want to keep those trailer padding bytes? So we can add an fs truncate here, setting the file to its exact length at close time.

All of the above can be found in https://man7.org/linux/man-pages/man2/openat.2.html . These problems exist precisely because we are "leaning on the operating system"; if the storage backend offered Append Only Storage, we would not necessarily have to go down this path.

RocksDB supports O_DIRECT IO: https://github.com/facebook/rocksdb/wiki/Direct-IO . Let's take a quick look at how this code is written. RocksDB can roughly be viewed as doing append writes; we won't concern ourselves with checksums and similar features.

The relevant filesystem-side support is shown below:

// A file abstraction for sequential writing.  The implementation
// must provide buffering since callers may append small fragments
// at a time to the file.
class FSWritableFile {
 public:
  /// ...

  // Indicates the upper layers if the current WritableFile implementation
  // uses direct IO.
  virtual bool use_direct_io() const;
};

class FileSystem : public Customizable {
 public:
  // Truncate the named file to the specified size.
  virtual IOStatus Truncate(const std::string& /*fname*/, size_t /*size*/,
                            const IOOptions& /*options*/,
                            IODebugContext* /*dbg*/);
};

// WritableFileWriter is a wrapper on top of Env::WritableFile. It provides
// facilities to:
// - Handle Buffered and Direct writes.
// - Rate limit writes.
// - Flush and Sync the data to the underlying filesystem.
// - Notify any interested listeners on the completion of a write.
// - Update IO stats.
class WritableFileWriter {
 public:
  // When this Append API is called, if the crc32c_checksum is not provided, we
  // will calculate the checksum internally.
  IOStatus Append(const IOOptions& opts, const Slice& data,
                  uint32_t crc32c_checksum = 0);

  IOStatus Pad(const IOOptions& opts, const size_t pad_bytes);

  IOStatus Flush(const IOOptions& opts);

  IOStatus Close(const IOOptions& opts);

  IOStatus Sync(const IOOptions& opts, bool use_fsync);

  // Sync only the data that was already Flush()ed. Safe to call concurrently
  // with Append() and Flush(). If !writable_file_->IsSyncThreadSafe(),
  // returns NotSupported status.
  IOStatus SyncWithoutFlush(const IOOptions& opts, bool use_fsync);

  // Size including unflushed data written to this writer. If the next op is
  // a successful Close, the file size will be this.
  uint64_t GetFileSize() const {
    return filesize_.load(std::memory_order_acquire);
  }

  // Returns the size of data flushed to the underlying `FSWritableFile`.
  // Expected to match `writable_file()->GetFileSize()`.
  // The return value can serve as a lower-bound for the amount of data synced
  // by a future call to `SyncWithoutFlush()`.
  uint64_t GetFlushedSize() const {
    return flushed_size_.load(std::memory_order_acquire);
  }

  IOStatus InvalidateCache(size_t offset, size_t length) {
    return writable_file_->InvalidateCache(offset, length);
  }

  FSWritableFile* writable_file() const { return writable_file_.get(); }

  bool use_direct_io() { return writable_file_->use_direct_io(); }

 private:
  // Used when os buffering is OFF and we are writing
  // DMA such as in Direct I/O mode
  // `opts` should've been called with `FinalizeIOOptions()` before passing in
  IOStatus WriteDirect(const IOOptions& opts);
};

Here a WritableFileWriter is eventually created, wrapping an FSWritableFile, and Append implements the append-write abstraction.

Another important type here is AlignedBuffer, a key helper for direct IO; you can think of this direct IO as buffered direct IO. AlignedBuffer's most important features are Padding and RefitTail. Padding aligns the last block, and other helpers tell you how many bytes that padding added (and how many bytes of the last block remain). On a write, if there is a trailing partial block, RefitTail may be called to move that tail block to the beginning of the buffer.

// This file contains utilities to handle the alignment of pages and buffers.

// Truncate to a multiple of page_size, which is also a page boundary. This
// helps in figuring out the right alignment.
// Example:
//   TruncateToPageBoundary(4096, 5000)  => 4096
//   TruncateToPageBoundary(4096, 10000) => 8192
inline size_t TruncateToPageBoundary(size_t page_size, size_t s) {
  s -= (s & (page_size - 1));
  assert((s % page_size) == 0);
  return s;
}

// Round up x to a multiple of y.
// Example:
//   Roundup(13, 5)   => 15
//   Roundup(201, 16) => 208
inline size_t Roundup(size_t x, size_t y) { return ((x + y - 1) / y) * y; }

// Round down x to a multiple of y.
// Example:
//   Rounddown(13, 5)   => 10
//   Rounddown(201, 16) => 192
inline size_t Rounddown(size_t x, size_t y) { return (x / y) * y; }

// AlignedBuffer manages a buffer by taking alignment into consideration, and
// aligns the buffer start and end positions. It is mainly used for direct I/O,
// though it can be used for other purposes as well.
// It also supports expanding the managed buffer, and copying whole or part of
// the data from old buffer into the new expanded buffer. Such a copy especially
// helps in cases avoiding an IO to re-fetch the data from disk.
//
// Example:
//   AlignedBuffer buf;
//   buf.Alignment(alignment);
//   buf.AllocateNewBuffer(user_requested_buf_size);
//   ...
//   buf.AllocateNewBuffer(2 * user_requested_buf_size, /*copy_data*/ true,
//                         copy_offset, copy_len);
class AlignedBuffer {
  size_t alignment_;
  FSAllocationPtr buf_;
  size_t capacity_;
  size_t cursize_;
  char* bufstart_;

 public:
  size_t Alignment() const { return alignment_; }
  size_t Capacity() const { return capacity_; }
  size_t CurrentSize() const { return cursize_; }
  const char* BufferStart() const { return bufstart_; }
  char* BufferStart() { return bufstart_; }

  // Switch to a new buffer.
  void AllocateNewBuffer(size_t requested_capacity, bool copy_data = false,
                         uint64_t copy_offset = 0, size_t copy_len = 0);

  // Append to the buffer.
  //
  // src         : source to copy the data from.
  // append_size : number of bytes to copy from src.
  // Returns the number of bytes appended.
  //
  // If append_size is more than the remaining buffer size only the
  // remaining-size worth of bytes are copied.
  size_t Append(const char* src, size_t append_size);

  // Pad to the end of alignment with "padding"
  void PadToAlignmentWith(int padding);

  // After a partial flush move the tail to the beginning of the buffer.
  void RefitTail(size_t tail_offset, size_t tail_size);
};

Below is an excerpt of the Append code (with the non-direct-IO parts and the crc-related parts removed):

IOStatus WritableFileWriter::Append(const IOOptions& opts, const Slice& data,
                                    uint32_t crc32c_checksum) {
  if (seen_error()) {
    return GetWriterHasPreviousErrorStatus();
  }

  StopWatch sw(clock_, stats_, hist_type_,
               GetFileWriteHistograms(hist_type_, opts.io_activity));

  const IOOptions io_options = FinalizeIOOptions(opts);
  const char* src = data.data();
  size_t left = data.size();
  IOStatus s;
  pending_sync_ = true;

  TEST_KILL_RANDOM_WITH_WEIGHT("WritableFileWriter::Append:0", REDUCE_ODDS2);

  // Calculate the checksum of appended data
  UpdateFileChecksum(data);

  {
    IOSTATS_TIMER_GUARD(prepare_write_nanos);
    TEST_SYNC_POINT("WritableFileWriter::Append:BeforePrepareWrite");
    writable_file_->PrepareWrite(static_cast<size_t>(GetFileSize()), left,
                                 io_options, nullptr);
  }

  // See whether we need to enlarge the buffer to avoid the flush
  if (buf_.Capacity() - buf_.CurrentSize() < left) {
    for (size_t cap = buf_.Capacity();
         cap < max_buffer_size_;  // There is still room to increase
         cap *= 2) {
      // See whether the next available size is large enough.
      // Buffer will never be increased to more than max_buffer_size_.
      size_t desired_capacity = std::min(cap * 2, max_buffer_size_);
      if (desired_capacity - buf_.CurrentSize() >= left ||
          (use_direct_io() && desired_capacity == max_buffer_size_)) {
        buf_.AllocateNewBuffer(desired_capacity, true);
        break;
      }
    }
  }

  // Flush only when buffered I/O
  if (!use_direct_io() && (buf_.Capacity() - buf_.CurrentSize()) < left) {
    if (buf_.CurrentSize() > 0) {
      if (!buffered_data_with_checksum_) {
        // If we're not calculating checksum of buffered data, fill the
        // buffer before flushing so that the writes are aligned. This will
        // benefit file system performance.
        size_t appended = buf_.Append(src, left);
        left -= appended;
        src += appended;
      }
      s = Flush(io_options);
      if (!s.ok()) {
        set_seen_error(s);
        return s;
      }
    }
    assert(buf_.CurrentSize() == 0);
  }

  {
    // In this case, either we do not need to do the data verification or
    // caller does not provide the checksum of the data (crc32c_checksum = 0).
    //
    // We never write directly to disk with direct I/O on.
    // or we simply use it for its original purpose to accumulate many small
    // chunks
    if (use_direct_io() || (buf_.Capacity() >= left)) {
      while (left > 0) {
        size_t appended = buf_.Append(src, left);
        left -= appended;
        src += appended;

        if (left > 0) {
          s = Flush(io_options);
          if (!s.ok()) {
            break;
          }
        }
      }
    }
  }

  TEST_KILL_RANDOM("WritableFileWriter::Append:1");
  if (s.ok()) {
    uint64_t cur_size = filesize_.load(std::memory_order_acquire);
    filesize_.store(cur_size + data.size(), std::memory_order_release);
  } else {
    set_seen_error(s);
  }
  return s;
}

Where does this buffer's Alignment come from? It is supplied by the file on the fs:

buf_.Alignment(writable_file_->GetRequiredBufferAlignment());
buf_.AllocateNewBuffer(std::min((size_t)65536, max_buffer_size_));

Under io_posix this comes from (the write path has an analogous GetLogicalBlockSizeForWriteIfNeeded):

size_t PosixFileSystem::GetLogicalBlockSizeForReadIfNeeded(
    const EnvOptions& options, const std::string& fname, int fd) {
  return options.use_direct_reads
             ? PosixFileSystem::GetLogicalBlockSize(fname, fd)
             : kDefaultPageSize;
}

The logic here is fairly direct: first check whether buf_ needs to grow (up to at most max_buffer_size_), then append into the buffer and drain it; the user's own buffer is never used for IO directly. Flush here calls WriteDirect almost straight through:

// This flushes the accumulated data in the buffer. We pad data with zeros if
// necessary to the whole page.
// However, during automatic flushes padding would not be necessary.
// We always use RateLimiter if available. We move (Refit) any buffer bytes
// that are left over the whole number of pages to be written again on the
// next flush because we can only write on aligned offsets.
IOStatus WritableFileWriter::WriteDirect(const IOOptions& opts) {
  if (seen_error()) {
    assert(false);

    return IOStatus::IOError("Writer has previous error.");
  }

  assert(use_direct_io());
  IOStatus s;
  const size_t alignment = buf_.Alignment();
  assert((next_write_offset_ % alignment) == 0);

  // Calculate whole page final file advance if all writes succeed
  //
  // Only the whole pages in the buffer are flushed here.
  const size_t file_advance =
      TruncateToPageBoundary(alignment, buf_.CurrentSize());

  // Calculate the leftover tail, we write it here padded with zeros BUT we
  // will write it again in the future either on Close() OR when the current
  // whole page fills out.
  //
  // Remember the tail.
  const size_t leftover_tail = buf_.CurrentSize() - file_advance;

  // Round up and pad
  //
  // Zero-pad the end of buf_.
  buf_.PadToAlignmentWith(0);

  const char* src = buf_.BufferStart();
  uint64_t write_offset = next_write_offset_;
  size_t left = buf_.CurrentSize();
  DataVerificationInfo v_info;
  char checksum_buf[sizeof(uint32_t)];
  Env::IOPriority rate_limiter_priority_used = opts.rate_limiter_priority;

  while (left > 0) {
    // Check how much is allowed
    size_t size = left;
    if (rate_limiter_ != nullptr &&
        rate_limiter_priority_used != Env::IO_TOTAL) {
      size = rate_limiter_->RequestToken(left, buf_.Alignment(),
                                         rate_limiter_priority_used, stats_,
                                         RateLimiter::OpType::kWrite);
    }

    {
      IOSTATS_TIMER_GUARD(write_nanos);
      TEST_SYNC_POINT("WritableFileWriter::Flush:BeforeAppend");
      s = writable_file_->PositionedAppend(Slice(src, size), write_offset,
                                           opts, nullptr);

      if (!s.ok()) {
        buf_.Size(file_advance + leftover_tail);
        set_seen_error(s);
        return s;
      }
    }

    IOSTATS_ADD(bytes_written, size);
    left -= size;
    src += size;
    write_offset += size;
    uint64_t cur_size = flushed_size_.load(std::memory_order_acquire);
    flushed_size_.store(cur_size + size, std::memory_order_release);
    assert((next_write_offset_ % alignment) == 0);
  }

  if (s.ok()) {
    // Move the tail to the beginning of the buffer
    // This never happens during normal Append but rather during
    // explicit call to Flush()/Sync() or Close()
    buf_.RefitTail(file_advance, leftover_tail);
    // This is where we start writing next time which may or not be
    // the actual file size on disk. They match if the buffer size
    // is a multiple of whole pages otherwise filesize_ is leftover_tail
    // behind
    next_write_offset_ += file_advance;
  } else {
    set_seen_error(s);
  }
  return s;
}

In effect we write out the padded pages and then RefitTail moves the last partial block to the front of the buffer. Finally, let's look at Close, which as expected is wrapped as Close + Truncate:

IOStatus WritableFileWriter::Close(const IOOptions& opts) {
  IOOptions io_options = FinalizeIOOptions(opts);
  if (seen_error()) {
    IOStatus interim;
    if (writable_file_.get() != nullptr) {
      interim = writable_file_->Close(io_options, nullptr);
      writable_file_.reset();
    }
    if (interim.ok()) {
      return IOStatus::IOError(
          "File is closed but data not flushed as writer has previous error.");
    } else {
      return interim;
    }
  }

  // Do not quit immediately on failure the file MUST be closed

  // Possible to close it twice now as we MUST close
  // in __dtor, simply flushing is not enough
  // Windows when pre-allocating does not fill with zeros
  // also with unbuffered access we also set the end of data.
  if (writable_file_.get() == nullptr) {
    return IOStatus::OK();
  }

  IOStatus s;
  s = Flush(io_options);  // flush cache to OS

  IOStatus interim;
  // In direct I/O mode we write whole pages so
  // we need to let the file know where data ends.
  if (use_direct_io()) {
    {
      uint64_t filesz = filesize_.load(std::memory_order_acquire);
      interim = writable_file_->Truncate(filesz, io_options, nullptr);
    }
    if (interim.ok()) {
      interim = writable_file_->Fsync(io_options, nullptr);
    }
    if (!interim.ok() && s.ok()) {
      s = interim;
    }
  }

  TEST_KILL_RANDOM("WritableFileWriter::Close:0");
  interim = writable_file_->Close(io_options, nullptr);
  if (!interim.ok() && s.ok()) {
    s = interim;
  }

  writable_file_.reset();
  TEST_KILL_RANDOM("WritableFileWriter::Close:1");

  if (s.ok()) {
    if (checksum_generator_ != nullptr && !checksum_finalized_) {
      checksum_generator_->Finalize();
      checksum_finalized_ = true;
    }
  } else {
    set_seen_error(s);
  }

  return s;
}