```cpp
// A file abstraction for sequential writing. The implementation
// must provide buffering since callers may append small fragments
// at a time to the file.
class FSWritableFile {
 public:
  // ...

  // Indicates the upper layers if the current WritableFile implementation
  // uses direct IO.
  virtual bool use_direct_io() const;
};
```
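To show where this flag is consumed from the other side, here is a minimal sketch (mine, not from the post) of a wrapper file that pins the answer; it assumes RocksDB's `FSWritableFileWrapper` forwarding helper from `rocksdb/file_system.h`.

```cpp
#include "rocksdb/file_system.h"

// Hypothetical class, for illustration only: forwards every call to the
// wrapped file but reports that direct IO is not in use, so the upper layers
// (e.g. WritableFileWriter) take the buffered write path.
class NoDirectIOFile : public ROCKSDB_NAMESPACE::FSWritableFileWrapper {
 public:
  explicit NoDirectIOFile(ROCKSDB_NAMESPACE::FSWritableFile* target)
      : FSWritableFileWrapper(target) {}

  bool use_direct_io() const override { return false; }
};
```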
```cpp
class FileSystem : public Customizable {
 public:
  // Truncate the named file to the specified size.
  virtual IOStatus Truncate(const std::string& /*fname*/, size_t /*size*/,
                            const IOOptions& /*options*/,
                            IODebugContext* /*dbg*/);
};
```
```cpp
// WritableFileWriter is a wrapper on top of Env::WritableFile. It provides
// facilities to:
// - Handle Buffered and Direct writes.
// - Rate limit writes.
// - Flush and Sync the data to the underlying filesystem.
// - Notify any interested listeners on the completion of a write.
// - Update IO stats.
class WritableFileWriter {
 public:
  // When this Append API is called, if the crc32c_checksum is not provided, we
  // will calculate the checksum internally.
  IOStatus Append(const IOOptions& opts, const Slice& data,
                  uint32_t crc32c_checksum = 0);

  // Sync only the data that was already Flush()ed. Safe to call concurrently
  // with Append() and Flush(). If !writable_file_->IsSyncThreadSafe(),
  // returns NotSupported status.
  IOStatus SyncWithoutFlush(const IOOptions& opts, bool use_fsync);

  // Size including unflushed data written to this writer. If the next op is
  // a successful Close, the file size will be this.
  uint64_t GetFileSize() const {
    return filesize_.load(std::memory_order_acquire);
  }

  // Returns the size of data flushed to the underlying `FSWritableFile`.
  // Expected to match `writable_file()->GetFileSize()`.
  // The return value can serve as a lower-bound for the amount of data synced
  // by a future call to `SyncWithoutFlush()`.
  uint64_t GetFlushedSize() const {
    return flushed_size_.load(std::memory_order_acquire);
  }

  // Used when os buffering is OFF and we are writing
  // DMA such as in Direct I/O mode
  // `opts` should've been called with `FinalizeIOOptions()` before passing in
  IOStatus WriteDirect(const IOOptions& opts);
};
```
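For orientation, here is a rough sketch of the call sequence a caller typically drives on this writer. It is not code from RocksDB: the helper `WriteAndPersist` and its arguments are mine, and it assumes the writer was already constructed over an `FSWritableFile` and that `opts` has been finalized.

```cpp
#include "file/writable_file_writer.h"  // internal RocksDB header (assumed path)

using ROCKSDB_NAMESPACE::IOOptions;
using ROCKSDB_NAMESPACE::IOStatus;
using ROCKSDB_NAMESPACE::Slice;
using ROCKSDB_NAMESPACE::WritableFileWriter;

IOStatus WriteAndPersist(WritableFileWriter* writer, const IOOptions& opts,
                         const Slice& data) {
  // Append() only copies into the internal AlignedBuffer, or flushes when the
  // buffer runs out of room; nothing is guaranteed to reach the OS yet.
  IOStatus s = writer->Append(opts, data);
  if (!s.ok()) return s;

  // Flush() pushes the buffered bytes down to the FSWritableFile. In direct
  // I/O mode this is where WriteDirect() pads and writes whole pages.
  s = writer->Flush(opts);
  if (!s.ok()) return s;

  // Sync the flushed bytes; use_fsync=false allows an fdatasync-style sync.
  s = writer->Sync(opts, /*use_fsync=*/false);
  if (!s.ok()) return s;

  // Close() flushes once more, truncates away the zero padding in direct I/O
  // mode, and closes the underlying file.
  return writer->Close(opts);
}
```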
```cpp
// This file contains utilities to handle the alignment of pages and buffers.

// Truncate to a multiple of page_size, which is also a page boundary. This
// helps to figure out the right alignment.
// Example:
//   TruncateToPageBoundary(4096, 5000)  => 4096
//   TruncateToPageBoundary(4096, 10000) => 8192
inline size_t TruncateToPageBoundary(size_t page_size, size_t s) {
  s -= (s & (page_size - 1));
  assert((s % page_size) == 0);
  return s;
}

// Round up x to a multiple of y.
// Example:
//   Roundup(13, 5)   => 15
//   Roundup(201, 16) => 208
inline size_t Roundup(size_t x, size_t y) { return ((x + y - 1) / y) * y; }

// Round down x to a multiple of y.
// Example:
//   Rounddown(13, 5)   => 10
//   Rounddown(201, 16) => 192
inline size_t Rounddown(size_t x, size_t y) { return (x / y) * y; }
```
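As a quick sanity check (my own snippet, not from the post), here is how these helpers size a direct-I/O write for a 4 KiB page:

```cpp
#include <cassert>
#include <cstddef>

// Copied from above for self-containment.
inline size_t TruncateToPageBoundary(size_t page_size, size_t s) {
  s -= (s & (page_size - 1));
  return s;
}
inline size_t Roundup(size_t x, size_t y) { return ((x + y - 1) / y) * y; }
inline size_t Rounddown(size_t x, size_t y) { return (x / y) * y; }

int main() {
  constexpr size_t kPage = 4096;
  // 5000 buffered bytes: the first 4096 are page-aligned and can be written
  // as-is; the 904-byte tail must be zero-padded to a full page and will be
  // rewritten on a later flush.
  assert(TruncateToPageBoundary(kPage, 5000) == 4096);
  assert(Roundup(5000, kPage) == 8192);    // bytes on disk after padding
  assert(Rounddown(5000, kPage) == 4096);  // same as TruncateToPageBoundary
                                           // when page_size is a power of two
  return 0;
}
```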
```cpp
// AlignedBuffer manages a buffer by taking alignment into consideration, and
// aligns the buffer start and end positions. It is mainly used for direct I/O,
// though it can be used for other purposes as well.
// It also supports expanding the managed buffer, and copying whole or part of
// the data from old buffer into the new expanded buffer. Such a copy especially
// helps in cases avoiding an IO to re-fetch the data from disk.
//
// Example:
//   AlignedBuffer buf;
//   buf.Alignment(alignment);
//   buf.AllocateNewBuffer(user_requested_buf_size);
//   ...
//   buf.AllocateNewBuffer(2 * user_requested_buf_size, /*copy_data*/ true,
//                         copy_offset, copy_len);
class AlignedBuffer {
  size_t alignment_;
  FSAllocationPtr buf_;
  size_t capacity_;
  size_t cursize_;
  char* bufstart_;

 public:
  size_t Alignment() const { return alignment_; }
  size_t Capacity() const { return capacity_; }
  size_t CurrentSize() const { return cursize_; }
  const char* BufferStart() const { return bufstart_; }
  char* BufferStart() { return bufstart_; }

  // Switch to a newly allocated buffer (optionally copying part of the old
  // data over).
  void AllocateNewBuffer(size_t requested_capacity, bool copy_data = false,
                         uint64_t copy_offset = 0, size_t copy_len = 0);

  // Append to the buffer.
  //
  // src         : source to copy the data from.
  // append_size : number of bytes to copy from src.
  // Returns the number of bytes appended.
  //
  // If append_size is more than the remaining buffer size only the
  // remaining-size worth of bytes are copied.
  size_t Append(const char* src, size_t append_size);

  // Pad to the end of alignment with "padding"
  void PadToAlignmentWith(int padding);

  // After a partial flush move the tail to the beginning of the buffer.
  void RefitTail(size_t tail_offset, size_t tail_size);
};
```
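Here is a small sketch of my own (not from RocksDB) of the grow-and-copy pattern the class comment describes; it assumes the internal header `util/aligned_buffer.h` is on the include path.

```cpp
#include <cstring>

#include "util/aligned_buffer.h"  // internal RocksDB header (assumed path)

using ROCKSDB_NAMESPACE::AlignedBuffer;

void GrowExample() {
  AlignedBuffer buf;
  buf.Alignment(4096);             // match the device's logical sector size
  buf.AllocateNewBuffer(8 * 1024); // initial capacity: 8 KiB

  char payload[6 * 1024];
  std::memset(payload, 'x', sizeof(payload));
  buf.Append(payload, sizeof(payload));  // 6 KiB buffered, 2 KiB of room left

  // Not enough room for another 6 KiB, so grow the buffer and carry the
  // already-buffered bytes along instead of flushing them; this mirrors the
  // AllocateNewBuffer(desired_capacity, true) call Append() makes below.
  buf.AllocateNewBuffer(16 * 1024, /*copy_data=*/true);
  buf.Append(payload, sizeof(payload));  // now 12 KiB buffered
}
```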
Below is an excerpt of the Append code (with the parts unrelated to direct I/O, as well as the CRC-related parts, removed):
```cpp
// See whether we need to enlarge the buffer to avoid the flush
if (buf_.Capacity() - buf_.CurrentSize() < left) {
  for (size_t cap = buf_.Capacity();
       cap < max_buffer_size_;  // There is still room to increase
       cap *= 2) {
    // See whether the next available size is large enough.
    // Buffer will never be increased to more than max_buffer_size_.
    size_t desired_capacity = std::min(cap * 2, max_buffer_size_);
    if (desired_capacity - buf_.CurrentSize() >= left ||
        (use_direct_io() && desired_capacity == max_buffer_size_)) {
      buf_.AllocateNewBuffer(desired_capacity, true);
      break;
    }
  }
}

// Flush only when buffered I/O
if (!use_direct_io() && (buf_.Capacity() - buf_.CurrentSize()) < left) {
  if (buf_.CurrentSize() > 0) {
    if (!buffered_data_with_checksum_) {
      // If we're not calculating checksum of buffered data, fill the
      // buffer before flushing so that the writes are aligned. This will
      // benefit file system performance.
      size_t appended = buf_.Append(src, left);
      left -= appended;
      src += appended;
    }
    s = Flush(io_options);
    if (!s.ok()) {
      set_seen_error(s);
      return s;
    }
  }
  assert(buf_.CurrentSize() == 0);
}

{
  // In this case, either we do not need to do the data verification or
  // caller does not provide the checksum of the data (crc32c_checksum = 0).
  //
  // We never write directly to disk with direct I/O on.
  // or we simply use it for its original purpose to accumulate many small
  // chunks
  if (use_direct_io() || (buf_.Capacity() >= left)) {
    while (left > 0) {
      size_t appended = buf_.Append(src, left);
      left -= appended;
      src += appended;

      if (left > 0) {
        s = Flush(io_options);
        if (!s.ok()) {
          break;
        }
      }
    }
  }
}
```
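To make the growth loop concrete, here is a hand trace with numbers of my own (the direct-I/O escape clause, `desired_capacity == max_buffer_size_`, is omitted for brevity):

```cpp
#include <algorithm>
#include <cstddef>
#include <cstdio>

int main() {
  const size_t max_buffer_size = 1 << 20;  // max_buffer_size_  = 1 MiB
  size_t capacity = 64 << 10;              // buf_.Capacity()    = 64 KiB
  const size_t cursize = 60 << 10;         // buf_.CurrentSize() = 60 KiB
  const size_t left = 512 << 10;           // bytes still to append

  if (capacity - cursize < left) {
    for (size_t cap = capacity; cap < max_buffer_size; cap *= 2) {
      size_t desired = std::min(cap * 2, max_buffer_size);
      if (desired - cursize >= left) {
        capacity = desired;  // buf_.AllocateNewBuffer(desired, /*copy_data=*/true)
        break;
      }
    }
  }
  // Candidates checked: 128 KiB, 256 KiB, 512 KiB, 1 MiB. The loop settles on
  // 1 MiB, the first size with >= 512 KiB free after the 60 KiB already held.
  printf("new capacity = %zu KiB\n", capacity >> 10);
  return 0;
}
```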
```cpp
// This flushes the accumulated data in the buffer. We pad data with zeros if
// necessary to the whole page.
// However, during automatic flushes padding would not be necessary.
// We always use RateLimiter if available. We move (Refit) any buffer bytes
// that are left over the whole number of pages to be written again on the
// next flush because we can only write on aligned offsets.
IOStatus WritableFileWriter::WriteDirect(const IOOptions& opts) {
  if (seen_error()) {
    assert(false);
    return IOStatus::IOError("Writer has previous error.");
  }

  // Calculate whole page final file advance if all writes succeed
  //
  // Only the whole pages of buf_ can be flushed here.
  const size_t file_advance =
      TruncateToPageBoundary(alignment, buf_.CurrentSize());

  // Calculate the leftover tail, we write it here padded with zeros BUT we
  // will write it again in the future either on Close() OR when the current
  // whole page fills out.
  //
  // Remember the tail.
  const size_t leftover_tail = buf_.CurrentSize() - file_advance;

  // Round up and pad
  //
  // Zero-pad the end of buf_.
  buf_.PadToAlignmentWith(0);

  // ... (the aligned, rate-limited write of buf_ happens here; elided) ...

  if (s.ok()) {
    // Move the tail to the beginning of the buffer
    // This never happens during normal Append but rather during
    // explicit call to Flush()/Sync() or Close()
    buf_.RefitTail(file_advance, leftover_tail);
    // This is where we start writing next time which may or not be
    // the actual file size on disk. They match if the buffer size
    // is a multiple of whole pages otherwise filesize_ is leftover_tail
    // behind
    next_write_offset_ += file_advance;
  } else {
    set_seen_error(s);
  }
  return s;
}
```
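To put numbers on this bookkeeping (my own example, not from the post): with a 4 KiB alignment and 10,000 bytes buffered, the helper from earlier gives:

```cpp
#include <cstddef>
#include <cstdio>

// Redefined here for self-containment; identical to the helper shown earlier.
inline size_t TruncateToPageBoundary(size_t page_size, size_t s) {
  return s - (s & (page_size - 1));
}

int main() {
  const size_t alignment = 4096;
  const size_t buffered = 10000;  // buf_.CurrentSize()

  const size_t file_advance = TruncateToPageBoundary(alignment, buffered);  // 8192
  const size_t leftover_tail = buffered - file_advance;                     // 1808

  // PadToAlignmentWith(0) grows the buffer to 12288 bytes, so the direct
  // write covers three whole pages. The final page holds the 1808-byte tail
  // plus 2288 bytes of zero padding; RefitTail() then moves that tail back to
  // offset 0 so it is written again on the next flush or on Close().
  printf("advance=%zu tail=%zu padded_write=%zu\n", file_advance, leftover_tail,
         ((buffered + alignment - 1) / alignment) * alignment);
  return 0;
}
```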
So WriteDirect writes out the padded pages, then calls RefitTail to move the final block back to the beginning of the buffer. Finally, let's look at Close, which, as you can see, is wrapped up as Close + Truncate.
```cpp
IOStatus WritableFileWriter::Close(const IOOptions& opts) {
  IOOptions io_options = FinalizeIOOptions(opts);
  if (seen_error()) {
    IOStatus interim;
    if (writable_file_.get() != nullptr) {
      interim = writable_file_->Close(io_options, nullptr);
      writable_file_.reset();
    }
    if (interim.ok()) {
      return IOStatus::IOError(
          "File is closed but data not flushed as writer has previous error.");
    } else {
      return interim;
    }
  }

  // Do not quit immediately on failure the file MUST be closed

  // Possible to close it twice now as we MUST close
  // in __dtor, simply flushing is not enough
  // Windows when pre-allocating does not fill with zeros
  // also with unbuffered access we also set the end of data.
  if (writable_file_.get() == nullptr) {
    return IOStatus::OK();
  }

  IOStatus s;
  s = Flush(io_options);  // flush cache to OS

  IOStatus interim;
  // In direct I/O mode we write whole pages so
  // we need to let the file know where data ends.
  if (use_direct_io()) {
    {
      uint64_t filesz = filesize_.load(std::memory_order_acquire);
      interim = writable_file_->Truncate(filesz, io_options, nullptr);
    }
    if (interim.ok()) {
      {
        interim = writable_file_->Fsync(io_options, nullptr);
      }
    }
    if (!interim.ok() && s.ok()) {
      s = interim;
    }
  }

  TEST_KILL_RANDOM("WritableFileWriter::Close:0");
  {
    interim = writable_file_->Close(io_options, nullptr);
  }
  if (!interim.ok() && s.ok()) {
    s = interim;
  }
  // ...
```