Arrow util: File & FileSystem

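FileSystem can be thought of as Arrow's data access layer: less a real fs than a virtual filesystem. It does not follow POSIX semantics; instead it defines its own set of interfaces on top of the underlying storage. For details, see: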

The relevant code lives in:

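  • cpp/src/arrow/io/interfaces.h: the abstract IO interfaces for files and streams (FileInterface, Readable, InputStream, RandomAccessFile, ...)
  • cpp/src/arrow/io/file.h: the local-file implementations of those interfaces (e.g. ReadableFile, FileOutputStream, MemoryMappedFile)
  • cpp/src/arrow/filesystem: the FileSystem interfaces and their implementations
    • arrow/io/hdfs.h also defines an arrow::io::FileSystem; that is a separate, unrelated abstraction.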

FileSystem

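The FileSystem abstraction captures file-like semantics, but its concepts do not go down to the level of an FS/VFS/inode. As I understand it, it covers List (with selection rules), Open, Delete, Create and GetInfo on files and directories, plus Copy and Move (Rename). It also offers some async and batch-style interfaces.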

FileSystem has the following subclasses:

Subclassed by arrow::fs::GcsFileSystem, arrow::fs::HadoopFileSystem, arrow::fs::internal::MockFileSystem, arrow::fs::LocalFileSystem, arrow::fs::S3FileSystem, arrow::fs::SlowFileSystem, arrow::fs::SubTreeFileSystem

Local, S3, GCS and Hadoop each have their own optimizations, but their names already tell you what they are. The other filesystems deserve a short explanation:

  • SlowFileSystem injects latency and is meant for testing:
```cpp
/// \brief A FileSystem implementation that delegates to another
/// implementation but inserts latencies at various points.
class ARROW_EXPORT SlowFileSystem : public FileSystem {
 public:
  SlowFileSystem(std::shared_ptr<FileSystem> base_fs,
                 std::shared_ptr<io::LatencyGenerator> latencies);
  SlowFileSystem(std::shared_ptr<FileSystem> base_fs, double average_latency);
  SlowFileSystem(std::shared_ptr<FileSystem> base_fs, double average_latency,
                 int32_t seed);
  // ...
};
```
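  • MockFileSystem: a FileSystem that keeps all its contents in memory, apparently meant for testing
  • SubTreeFileSystem: wraps a base filesystem and exposes one of its subdirectories (the "SubTree") as its root, prepending a fixed base path to every path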
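SlowFileSystem is essentially a decorator: it holds a base_fs and forwards each call after injecting a delay. The sketch below shows that pattern together with a MockFileSystem-style in-memory backend. MiniFs, InMemoryFs and SlowFs are hypothetical names with a deliberately tiny interface, not Arrow's API, and the sketch uses a fixed delay where Arrow's version takes a LatencyGenerator or an average latency.

```cpp
#include <cassert>
#include <chrono>
#include <map>
#include <memory>
#include <stdexcept>
#include <string>
#include <thread>
#include <utility>

// Hypothetical, heavily reduced filesystem interface (not arrow::fs::FileSystem).
class MiniFs {
 public:
  virtual ~MiniFs() = default;
  virtual void WriteFile(const std::string& path, std::string data) = 0;
  virtual std::string ReadFile(const std::string& path) = 0;
};

// MockFileSystem-like backend: every file lives in a std::map in memory.
class InMemoryFs : public MiniFs {
 public:
  void WriteFile(const std::string& path, std::string data) override {
    files_[path] = std::move(data);
  }
  std::string ReadFile(const std::string& path) override {
    auto it = files_.find(path);
    if (it == files_.end()) throw std::runtime_error("not found: " + path);
    return it->second;
  }

 private:
  std::map<std::string, std::string> files_;
};

// SlowFileSystem-like decorator: sleep for a fixed latency, then delegate.
class SlowFs : public MiniFs {
 public:
  SlowFs(std::shared_ptr<MiniFs> base, std::chrono::milliseconds latency)
      : base_(std::move(base)), latency_(latency) {}
  void WriteFile(const std::string& path, std::string data) override {
    std::this_thread::sleep_for(latency_);
    base_->WriteFile(path, std::move(data));
  }
  std::string ReadFile(const std::string& path) override {
    std::this_thread::sleep_for(latency_);
    return base_->ReadFile(path);
  }

 private:
  std::shared_ptr<MiniFs> base_;
  std::chrono::milliseconds latency_;
};
```

Because the decorator only depends on the interface, the same latency injection can be stacked on top of any backend under test.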

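Arrow also supports a unified URI scheme: FileSystemFromUri parses a URI such as s3://bucket/key or gs://bucket/object (gcs:// is accepted as well), returns the matching FileSystem, and can also return the path inside that filesystem. Note that local files use the file:// prefix.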
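To illustrate the dispatch idea, here is a hedged sketch that splits `scheme://` from the rest and picks a backend by scheme. ParseFsUri and FsKind are hypothetical; the real FileSystemFromUri does full URI parsing (hosts, query parameters, percent-encoding) and actually constructs the filesystem object.

```cpp
#include <cassert>
#include <string>

// Hypothetical enum naming the backend a URI would be routed to.
enum class FsKind { kLocal, kS3, kGcs, kHdfs, kUnknown };

struct ParsedUri {
  FsKind kind;
  std::string path;  // everything after "scheme://"
};

// Splits "scheme://rest" and chooses a backend by scheme. This only mimics the
// idea behind FileSystemFromUri; real URI handling is far more involved.
ParsedUri ParseFsUri(const std::string& uri) {
  const auto sep = uri.find("://");
  if (sep == std::string::npos) return {FsKind::kUnknown, uri};
  const std::string scheme = uri.substr(0, sep);
  const std::string rest = uri.substr(sep + 3);
  if (scheme == "file") return {FsKind::kLocal, rest};
  if (scheme == "s3") return {FsKind::kS3, rest};
  if (scheme == "gs" || scheme == "gcs") return {FsKind::kGcs, rest};
  if (scheme == "hdfs") return {FsKind::kHdfs, rest};
  return {FsKind::kUnknown, rest};
}
```

For example, `ParseFsUri("file:///tmp/x")` yields the local backend with path `/tmp/x`, which is why local files need the `file://` prefix to be recognized by scheme.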

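Each filesystem also has its own notion of files and paths, so how does Arrow map them? The FileSystem base class defines NormalizePath, which handles cases such as Windows paths and SubTreeFileSystem paths, so that path logic is normalized in one place. SubTreeFileSystem, for example, prepends its base, lets the base filesystem normalize, and strips the base again: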

```cpp
Result<std::string> SubTreeFileSystem::NormalizePath(std::string path) {
  ARROW_ASSIGN_OR_RAISE(auto real_path, PrependBase(path));
  ARROW_ASSIGN_OR_RAISE(auto normalized, base_fs_->NormalizePath(real_path));
  return StripBase(std::move(normalized));
}
```
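The PrependBase → base normalization → StripBase round trip can be sketched with plain strings. PrependBase, StripBase and NormalizeInSubtree below are hypothetical free functions (Arrow's are SubTreeFileSystem members returning Result), and CollapseSlashes is an assumed stand-in for whatever the base filesystem's NormalizePath does.

```cpp
#include <cassert>
#include <optional>
#include <string>

// The subtree sees "a/b"; the base filesystem sees "<base>/a/b".
std::string PrependBase(const std::string& base, const std::string& path) {
  if (path.empty()) return base;  // the subtree root maps to the base dir
  return base + "/" + path;
}

// Maps a base-filesystem path back into the subtree; nullopt if it escapes.
std::optional<std::string> StripBase(const std::string& base,
                                     const std::string& real_path) {
  if (real_path == base) return std::string();
  const std::string prefix = base + "/";
  if (real_path.compare(0, prefix.size(), prefix) != 0) return std::nullopt;
  return real_path.substr(prefix.size());
}

// Stand-in for the base filesystem's normalization: collapse "//" into "/".
std::string CollapseSlashes(const std::string& p) {
  std::string out;
  for (char c : p) {
    if (c == '/' && !out.empty() && out.back() == '/') continue;
    out += c;
  }
  return out;
}

// Same shape as SubTreeFileSystem::NormalizePath: prepend, normalize, strip.
std::optional<std::string> NormalizeInSubtree(const std::string& base,
                                              const std::string& path) {
  return StripBase(base, CollapseSlashes(PrependBase(base, path)));
}
```

The point of the round trip is that the subtree never reimplements normalization; it borrows the base filesystem's rules and only translates paths in and out.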

Before going through the FileSystem interfaces in detail, let's first look at the context they run in.

IOContext and Executor

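IOContext is the context for IO tasks (for example when reading Parquet files). C++17, which Arrow uses, has no standard async model with continuations (std::future has no then(), and coroutines only arrived in C++20), so Arrow essentially wraps synchronous interfaces with an IO thread pool.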

```cpp
/// EXPERIMENTAL: options provider for IO tasks
///
/// Includes an Executor (which will be used to execute asynchronous reads),
/// a MemoryPool (which will be used to allocate buffers when zero copy reads
/// are not possible), and an external id (in case the executor receives tasks from
/// multiple sources and must distinguish tasks associated with this IOContext).
struct ARROW_EXPORT IOContext {
  // ... constructors and accessors omitted ...

 private:
  MemoryPool* pool_;
  ::arrow::internal::Executor* executor_;
  int64_t external_id_;
  StopToken stop_token_;
};
```

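The other fields are self-explanatory. external_id_ is something like an IO tag: it is not a fine-grained tag, just an id the executor can use to tell tasks apart. SubmitIO uses it to submit work to the executor asynchronously: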

```cpp
template <typename... SubmitArgs>
auto SubmitIO(IOContext io_context, SubmitArgs&&... submit_args)
    -> decltype(std::declval<::arrow::internal::Executor*>()->Submit(submit_args...)) {
  ::arrow::internal::TaskHints hints;
  hints.external_id = io_context.external_id();
  return io_context.executor()->Submit(hints, io_context.stop_token(),
                                       std::forward<SubmitArgs>(submit_args)...);
}
```
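A minimal model of what SubmitIO does: copy the context's external id into the hints and hand the task to the executor, getting a future back. MiniTaskHints, MiniExecutor, MiniIOContext and MiniSubmitIO are hypothetical names, and std::async stands in for a real IO thread pool.

```cpp
#include <cassert>
#include <cstdint>
#include <future>
#include <utility>
#include <vector>

// Hypothetical stand-ins for TaskHints / Executor / IOContext.
struct MiniTaskHints {
  int32_t priority = 0;
  int64_t external_id = -1;
};

class MiniExecutor {
 public:
  // Runs `fn` on a new thread and remembers which external id submitted it.
  template <typename F>
  auto Submit(MiniTaskHints hints, F&& fn) -> std::future<decltype(fn())> {
    seen_ids_.push_back(hints.external_id);  // only the submitting thread touches this
    return std::async(std::launch::async, std::forward<F>(fn));
  }
  const std::vector<int64_t>& seen_ids() const { return seen_ids_; }

 private:
  std::vector<int64_t> seen_ids_;
};

struct MiniIOContext {
  MiniExecutor* executor;
  int64_t external_id;
};

// Like SubmitIO: tag the task with the context's external id, then submit it.
template <typename F>
auto MiniSubmitIO(const MiniIOContext& ctx, F&& fn) {
  MiniTaskHints hints;
  hints.external_id = ctx.external_id;
  return ctx.executor->Submit(hints, std::forward<F>(fn));
}
```

The caller only sees a future; whether and how the executor uses the tag is entirely up to the executor.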

When submitting IO there are also TaskHints, but it seems nobody really uses them; the comment itself says the bundled ThreadPool ignores them.

```cpp
// Hints about a task that may be used by an Executor.
// They are ignored by the provided ThreadPool implementation.
struct TaskHints {
  // The lower, the more urgent
  int32_t priority = 0;
  // The IO transfer size in bytes
  int64_t io_size = -1;
  // The approximate CPU cost in number of instructions
  int64_t cpu_cost = -1;
  // An application-specific ID
  int64_t external_id = -1;
};
```
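Since the bundled ThreadPool ignores TaskHints, here is a hedged sketch of what honoring the `priority` field could look like in a custom executor: a single-threaded queue where a lower value runs first and equal priorities stay FIFO. PriorityTaskQueue is a hypothetical name, not part of Arrow.

```cpp
#include <cassert>
#include <cstdint>
#include <functional>
#include <queue>
#include <utility>
#include <vector>

// Hypothetical queue that honors a TaskHints-style priority (lower = more urgent).
class PriorityTaskQueue {
 public:
  void Push(int32_t priority, std::function<void()> task) {
    queue_.push(Entry{priority, next_seq_++, std::move(task)});
  }
  // Runs every queued task, most urgent first; FIFO among equal priorities.
  void RunAll() {
    while (!queue_.empty()) {
      Entry e = queue_.top();
      queue_.pop();
      e.task();
    }
  }

 private:
  struct Entry {
    int32_t priority;
    uint64_t seq;  // insertion order, used as a tie-breaker
    std::function<void()> task;
  };
  // priority_queue pops the "largest" element, so "runs later" must compare larger.
  struct RunsLater {
    bool operator()(const Entry& a, const Entry& b) const {
      if (a.priority != b.priority) return a.priority > b.priority;
      return a.seq > b.seq;
    }
  };
  std::priority_queue<Entry, std::vector<Entry>, RunsLater> queue_;
  uint64_t next_seq_ = 0;
};
```

The sequence number matters: without it, std::priority_queue gives no ordering guarantee among tasks with equal priority.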

Executor can be treated as the interface wrapping a thread pool (the IO pool here, though the CPU pool is an Executor too):

```cpp
class ARROW_EXPORT Executor {
 public:
  using StopCallback = internal::FnOnce<void(const Status&)>;

  virtual ~Executor();

  // Spawn a fire-and-forget task.
  template <typename Function>
  Status Spawn(...);

  // Transfers a future to this executor. Any continuations added to the
  // returned future will run in this executor. Otherwise they would run
  // on the same thread that called MarkFinished.
  //
  // This is necessary when (for example) an I/O task is completing a future.
  // The continuations of that future should run on the CPU thread pool keeping
  // CPU heavy work off the I/O thread pool. So the I/O task should transfer
  // the future to the CPU executor before returning.
  //
  // By default this method will only transfer if the future is not already completed. If
  // the future is already completed then any callback would be run synchronously and so
  // no transfer is typically necessary. However, in cases where you want to force a
  // transfer (e.g. to help the scheduler break up units of work across multiple cores)
  // then you can override this behavior with `always_transfer`.
  //
  // (Author's note) Transfer changes where continuations run: if a future is completed
  // on an IO thread, Transfer lets its continuations run on another executor (e.g. the
  // CPU pool) instead of on that IO thread. Transfer on an already-completed future is
  // a bit odd, since it is implemented by attaching a callback.
  template <typename T>
  Future<T> Transfer(Future<T> future);

  // Overload of Transfer which will always schedule callbacks on new threads even if the
  // future is finished when the callback is added.
  //
  // This can be useful in cases where you want to ensure parallelism
  //
  // (Author's note) Forces the transfer.
  template <typename T>
  Future<T> TransferAlways(Future<T> future);

  // Return the level of parallelism (the number of tasks that may be executed
  // concurrently). This may be an approximate number.
  virtual int GetCapacity() = 0;

  // Return true if the thread from which this function is called is owned by this
  // Executor. Returns false if this Executor does not support this property.
  virtual bool OwnsThisThread() { return false; }

  // Return true if this is the current executor being called
  // n.b. this defaults to just calling OwnsThisThread
  // unless the threadpool is disabled
  virtual bool IsCurrentExecutor() { return OwnsThisThread(); }

  /// \brief An interface to represent something with a custom destructor
  ///
  /// \see KeepAlive
  class ARROW_EXPORT Resource {
   public:
    virtual ~Resource() = default;
  };

  /// \brief Keep a resource alive until all executor threads have terminated
  ///
  /// Executors may have static storage duration. In particular, the CPU and I/O
  /// executors are currently implemented this way. These threads may access other
  /// objects with static storage duration such as the OpenTelemetry runtime context
  /// the default memory pool, or other static executors.
  ///
  /// The order in which these objects are destroyed is difficult to control. In order
  /// to ensure those objects remain alive until all threads have finished those objects
  /// should be wrapped in a Resource object and passed into this method. The given
  /// shared_ptr will be kept alive until all threads have finished their worker loops.
  virtual void KeepAlive(std::shared_ptr<Resource> resource);

 protected:
  // Subclassing API
  virtual Status SpawnReal(TaskHints hints, FnOnce<void()> task, StopToken,
                           StopCallback&&) = 0;
};
```

These interfaces are all fairly easy to follow. A few notes on the simpler ones:

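  1. KeepAlive: keeps objects with static storage duration alive until all executor threads have exited, since their destruction order is otherwise hard to control.
  2. OwnsThisThread(): the implementation is interesting. ThreadPool records the owning pool in a thread_local variable: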
```cpp
thread_local ThreadPool* current_thread_pool_ = nullptr;

bool ThreadPool::OwnsThisThread() { return current_thread_pool_ == this; }

void ThreadPool::LaunchWorkersUnlocked(int threads) {
  std::shared_ptr<State> state = sp_state_;

  for (int i = 0; i < threads; i++) {
    state_->workers_.emplace_back();
    auto it = --(state_->workers_.end());
    *it = std::thread([this, state, it] {
      current_thread_pool_ = this;
      WorkerLoop(state, it);
    });
  }
}
```
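The thread_local trick can be reproduced in isolation: each worker writes its owner into a thread_local pointer when it starts, so OwnsThisThread() is a single pointer comparison with no locking. MiniPool and RunOnWorker are hypothetical and much simpler than ThreadPool (one fresh thread per task, joined immediately).

```cpp
#include <cassert>
#include <future>
#include <thread>
#include <utility>

class MiniPool;
// One slot per thread; stays nullptr on threads that no pool owns.
thread_local MiniPool* current_pool = nullptr;

class MiniPool {
 public:
  bool OwnsThisThread() const { return current_pool == this; }

  // Starts a worker thread, tags it with this pool, runs `f` there and waits.
  template <typename F>
  auto RunOnWorker(F f) -> decltype(f()) {
    std::packaged_task<decltype(f())()> task(std::move(f));
    auto result = task.get_future();
    std::thread worker([this, &task] {
      current_pool = this;  // visible only on this worker thread
      task();
    });
    worker.join();
    return result.get();
  }
};
```

Because the variable is per thread, the same check returns true inside the pool's own workers and false everywhere else, including other pools' workers.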

Now let's look at the implementation of Transfer, which is handled rather neatly:

```cpp
template <typename T, typename FT = Future<T>, typename FTSync = typename FT::SyncType>
Future<T> DoTransfer(Future<T> future, bool always_transfer = false) {
  auto transferred = Future<T>::Make();
  if (always_transfer) {
    CallbackOptions callback_options = CallbackOptions::Defaults();
    callback_options.should_schedule = ShouldSchedule::Always;
    callback_options.executor = this;
    auto sync_callback = [transferred](const FTSync& result) mutable {
      transferred.MarkFinished(result);
    };
    future.AddCallback(sync_callback, callback_options);
    return transferred;
  }

  // We could use AddCallback's ShouldSchedule::IfUnfinished but we can save a bit of
  // work by doing the test here.
  auto callback = [this, transferred](const FTSync& result) mutable {
    auto spawn_status =
        Spawn([transferred, result]() mutable { transferred.MarkFinished(result); });
    if (!spawn_status.ok()) {
      transferred.MarkFinished(spawn_status);
    }
  };
  auto callback_factory = [&callback]() { return callback; };
  if (future.TryAddCallback(callback_factory)) {
    return transferred;
  }
  // If the future is already finished and we aren't going to force spawn a thread
  // then we don't need to add another layer of callback and can return the original
  // future
  return future;
}
```

The logic is clear:

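  1. With always_transfer, it attaches a callback whose CallbackOptions use ShouldSchedule::Always with this executor, so the callback is always scheduled onto this executor, even if the future has already finished.
  2. Otherwise, it tries to attach (TryAddCallback) a callback that Spawns a task on this executor to MarkFinished the transferred future. If the future has already finished, TryAddCallback fails; no transfer is needed then, so the original future is returned.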
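The non-always branch can be modeled with a toy future: if the callback can still be attached, finishing the original only queues a task on the executor, and the transferred future completes when the executor runs it; if the original has already finished, it is returned unchanged. ToyFuture and QueueExecutor are hypothetical stand-ins for Arrow's Future and Executor, with int as the only value type and no error handling.

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <mutex>
#include <optional>
#include <utility>
#include <vector>

// Hypothetical toy future for int values: shared state plus pending callbacks.
class ToyFuture {
 public:
  ToyFuture() : state_(std::make_shared<State>()) {}

  bool is_finished() const {
    std::lock_guard<std::mutex> lock(state_->mu);
    return state_->value.has_value();
  }
  int value() const {  // only call once finished
    std::lock_guard<std::mutex> lock(state_->mu);
    return *state_->value;
  }
  // Callbacks run right here, on whichever thread calls MarkFinished.
  void MarkFinished(int v) {
    std::vector<std::function<void(int)>> callbacks;
    {
      std::lock_guard<std::mutex> lock(state_->mu);
      state_->value = v;
      callbacks.swap(state_->callbacks);
    }
    for (auto& cb : callbacks) cb(v);
  }
  // Like TryAddCallback: refuses (returns false) once the future has finished.
  bool TryAddCallback(std::function<void(int)> cb) {
    std::lock_guard<std::mutex> lock(state_->mu);
    if (state_->value.has_value()) return false;
    state_->callbacks.push_back(std::move(cb));
    return true;
  }
  bool SameStateAs(const ToyFuture& other) const { return state_ == other.state_; }

 private:
  struct State {
    std::mutex mu;
    std::optional<int> value;
    std::vector<std::function<void(int)>> callbacks;
  };
  std::shared_ptr<State> state_;
};

// Hypothetical executor that only queues tasks, so we can observe where work lands.
class QueueExecutor {
 public:
  void Spawn(std::function<void()> task) { tasks_.push_back(std::move(task)); }
  void RunAll() {
    std::vector<std::function<void()>> tasks;
    tasks.swap(tasks_);
    for (auto& t : tasks) t();
  }
  // Mirrors the non-always branch of DoTransfer.
  ToyFuture Transfer(ToyFuture future) {
    ToyFuture transferred;
    const bool added = future.TryAddCallback([this, transferred](int v) mutable {
      Spawn([transferred, v]() mutable { transferred.MarkFinished(v); });
    });
    // Already finished: nothing was attached, so hand back the original future.
    return added ? transferred : future;
  }

 private:
  std::vector<std::function<void()>> tasks_;
};
```

Note how the thread calling MarkFinished on the original (the "IO thread" in the comment above) only does the cheap Spawn; the continuation itself runs wherever the executor runs its tasks.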

FileSystem Interfaces

There are some fun parts here. Let's start with the FileSystem base class.

Listing Files

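The base class supports opening streams and files, and listing according to a FileSelector. Listing goes through GetFileInfo, which takes a single path, a batch of paths, or a FileSelector: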

```cpp
/// Get info for the given target.
///
/// Any symlink is automatically dereferenced, recursively.
/// A nonexistent or unreachable file returns an Ok status and
/// has a FileType of value NotFound. An error status indicates
/// a truly exceptional condition (low-level I/O error, etc.).
virtual Result<FileInfo> GetFileInfo(const std::string& path) = 0;
/// Same, for many targets at once.
virtual Result<FileInfoVector> GetFileInfo(const std::vector<std::string>& paths);
/// Same, according to a selector.
///
/// The selector's base directory will not be part of the results, even if
/// it exists.
/// If it doesn't exist, see `FileSelector::allow_not_found`.
virtual Result<FileInfoVector> GetFileInfo(const FileSelector& select) = 0;

/// Async version of GetFileInfo
virtual Future<FileInfoVector> GetFileInfoAsync(const std::vector<std::string>& paths);

/// Streaming async version of GetFileInfo
///
/// The returned generator is not async-reentrant, i.e. you need to wait for
/// the returned future to complete before calling the generator again.
virtual FileInfoGenerator GetFileInfoGenerator(const FileSelector& select);
```

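Note the FileSelector struct: it carries filtering rules (base directory, recursion, maximum depth) that implementations push down to the underlying storage where possible.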

```cpp
/// \brief File selector for filesystem APIs
struct ARROW_EXPORT FileSelector {
  /// The directory in which to select files.
  /// If the path exists but doesn't point to a directory, this should be an error.
  std::string base_dir;
  /// The behavior if `base_dir` isn't found in the filesystem. If false,
  /// an error is returned. If true, an empty selection is returned.
  bool allow_not_found;
  /// Whether to recurse into subdirectories.
  bool recursive;
  /// The maximum number of subdirectories to recurse into.
  int32_t max_recursion;

  FileSelector() : allow_not_found(false), recursive(false), max_recursion(INT32_MAX) {}
};
```
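A rough sketch of how the selector fields could be applied to a flat list of paths. It assumes that direct children of base_dir have depth 0 and each nested directory level adds one (so max_recursion = 1 includes grandchildren); that reading of max_recursion is my assumption. MiniSelector, DepthBelow and Select are hypothetical, and real implementations push this into the storage listing instead of filtering afterwards.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <string>
#include <vector>

// Hypothetical mirror of FileSelector's listing fields.
struct MiniSelector {
  std::string base_dir;
  bool recursive = false;
  int32_t max_recursion = INT32_MAX;
};

// Depth of `path` below `base` (0 = direct child), or -1 if not strictly below it.
int DepthBelow(const std::string& base, const std::string& path) {
  const std::string prefix = base + "/";
  if (path.size() <= prefix.size() || path.compare(0, prefix.size(), prefix) != 0) {
    return -1;
  }
  int depth = 0;
  for (std::size_t i = prefix.size(); i < path.size(); ++i) {
    if (path[i] == '/') ++depth;
  }
  return depth;
}

std::vector<std::string> Select(const std::vector<std::string>& all_paths,
                                const MiniSelector& sel) {
  std::vector<std::string> out;
  const int32_t limit = sel.recursive ? sel.max_recursion : 0;
  for (const auto& p : all_paths) {
    const int depth = DepthBelow(sel.base_dir, p);
    if (depth < 0) continue;  // base_dir itself and unrelated paths are skipped
    if (depth <= limit) out.push_back(p);
  }
  return out;
}
```

As in the GetFileInfo comment above, the base directory itself never appears in the result.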

Dir Operations

```cpp
/// Create a directory and subdirectories.
///
/// This function succeeds if the directory already exists.
virtual Status CreateDir(const std::string& path, bool recursive = true) = 0;

/// Delete a directory and its contents, recursively.
virtual Status DeleteDir(const std::string& path) = 0;

/// Delete a directory's contents, recursively.
///
/// Like DeleteDir, but doesn't delete the directory itself.
/// Passing an empty path ("" or "/") is disallowed, see DeleteRootDirContents.
virtual Status DeleteDirContents(const std::string& path,
                                 bool missing_dir_ok = false) = 0;

/// Async version of DeleteDirContents.
virtual Future<> DeleteDirContentsAsync(const std::string& path,
                                        bool missing_dir_ok = false);

/// EXPERIMENTAL: Delete the root directory's contents, recursively.
///
/// Implementations may decide to raise an error if this operation is
/// too dangerous.
// NOTE: may decide to remove this if it's deemed not useful
virtual Status DeleteRootDirContents() = 0;
```

File and Stream Operations

```cpp
/// Delete a file.
virtual Status DeleteFile(const std::string& path) = 0;
/// Delete many files.
///
/// The default implementation issues individual delete operations in sequence.
virtual Status DeleteFiles(const std::vector<std::string>& paths);

/// Move / rename a file or directory.
///
/// If the destination exists:
/// - if it is a non-empty directory, an error is returned
/// - otherwise, if it has the same type as the source, it is replaced
/// - otherwise, behavior is unspecified (implementation-dependent).
virtual Status Move(const std::string& src, const std::string& dest) = 0;

/// Copy a file.
///
/// If the destination exists and is a directory, an error is returned.
/// Otherwise, it is replaced.
virtual Status CopyFile(const std::string& src, const std::string& dest) = 0;

/// Open an input stream for sequential reading.
virtual Result<std::shared_ptr<io::InputStream>> OpenInputStream(
    const std::string& path) = 0;
/// Open an input stream for sequential reading.
///
/// This override assumes the given FileInfo validly represents the file's
/// characteristics, and may optimize access depending on them (for example
/// avoid querying the file size or its existence).
virtual Result<std::shared_ptr<io::InputStream>> OpenInputStream(const FileInfo& info);

/// Open an input file for random access reading.
virtual Result<std::shared_ptr<io::RandomAccessFile>> OpenInputFile(
    const std::string& path) = 0;
/// Open an input file for random access reading.
///
/// This override assumes the given FileInfo validly represents the file's
/// characteristics, and may optimize access depending on them (for example
/// avoid querying the file size or its existence).
virtual Result<std::shared_ptr<io::RandomAccessFile>> OpenInputFile(
    const FileInfo& info);

/// Async version of OpenInputStream
virtual Future<std::shared_ptr<io::InputStream>> OpenInputStreamAsync(
    const std::string& path);
/// Async version of OpenInputStream
virtual Future<std::shared_ptr<io::InputStream>> OpenInputStreamAsync(
    const FileInfo& info);

/// Async version of OpenInputFile
virtual Future<std::shared_ptr<io::RandomAccessFile>> OpenInputFileAsync(
    const std::string& path);
/// Async version of OpenInputFile
virtual Future<std::shared_ptr<io::RandomAccessFile>> OpenInputFileAsync(
    const FileInfo& info);

/// Open an output stream for sequential writing.
///
/// If the target already exists, existing data is truncated.
virtual Result<std::shared_ptr<io::OutputStream>> OpenOutputStream(
    const std::string& path,
    const std::shared_ptr<const KeyValueMetadata>& metadata) = 0;
Result<std::shared_ptr<io::OutputStream>> OpenOutputStream(const std::string& path);

/// Open an output stream for appending.
///
/// If the target doesn't exist, a new empty file is created.
///
/// Note: some filesystem implementations do not support efficient appending
/// to an existing file, in which case this method will return NotImplemented.
/// Consider writing to multiple files (using e.g. the dataset layer) instead.
virtual Result<std::shared_ptr<io::OutputStream>> OpenAppendStream(
    const std::string& path,
    const std::shared_ptr<const KeyValueMetadata>& metadata) = 0;
Result<std::shared_ptr<io::OutputStream>> OpenAppendStream(const std::string& path);
```

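Opening a file comes in both sync and async flavors, while the objects you get back may follow different sync/async models. My guess is that this comes down to whoever maintains each piece: if an interface is not on a critical path, nobody bothers to make it async, so it stays sync. Sync support is the baseline in any case. Also note that FileSystem objects are meant to be held by shared_ptr (the default async implementations call shared_from_this()).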

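The async implementation is fairly direct: by default it wraps the sync call with SubmitIO (or runs it inline when default_async_is_sync_ is set), and subclasses can override it with their own tricks. For example: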

```cpp
namespace {

template <typename DeferredFunc>
auto FileSystemDefer(FileSystem* fs, bool synchronous, DeferredFunc&& func)
    -> decltype(DeferNotOk(
        fs->io_context().executor()->Submit(func, std::shared_ptr<FileSystem>{}))) {
  auto self = fs->shared_from_this();
  if (synchronous) {
    return std::forward<DeferredFunc>(func)(std::move(self));
  }
  return DeferNotOk(io::internal::SubmitIO(
      fs->io_context(), std::forward<DeferredFunc>(func), std::move(self)));
}

}  // namespace

Future<std::shared_ptr<io::InputStream>> FileSystem::OpenInputStreamAsync(
    const std::string& path) {
  return FileSystemDefer(
      this, default_async_is_sync_,
      [path](std::shared_ptr<FileSystem> self) { return self->OpenInputStream(path); });
}
```
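The essence of FileSystemDefer is "run inline or submit to the IO executor" behind one return type. A hedged sketch using only the standard library: Defer is a hypothetical name, std::packaged_task produces an already-ready future for the synchronous path, and std::async stands in for SubmitIO.

```cpp
#include <cassert>
#include <future>
#include <thread>
#include <utility>

// Hypothetical analogue of FileSystemDefer. `synchronous` plays the role of
// default_async_is_sync_; std::async stands in for SubmitIO on the IO executor.
template <typename F>
auto Defer(bool synchronous, F&& func) -> std::future<decltype(func())> {
  using R = decltype(func());
  if (synchronous) {
    std::packaged_task<R()> task(std::forward<F>(func));
    std::future<R> result = task.get_future();
    task();  // runs on the calling thread; the future is ready on return
    return result;
  }
  return std::async(std::launch::async, std::forward<F>(func));
}
```

Either way the caller gets the same future type, which is what lets a subclass flip between sync and async behavior without changing any call sites.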

Why should opening a file be async at all? Take S3's Open as an example:

Init itself issues a HeadObject request, so it is best to open the file on an async (IO) thread.

```cpp
// A RandomAccessFile that reads from a S3 object
class ObjectInputFile final : public io::RandomAccessFile {
  Status Init() {
    // Issue a HEAD Object to get the content-length and ensure any
    // errors (e.g. file not found) don't wait until the first Read() call.
    // ...
  }
};
```

File

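The File side is split into several interfaces, and the code itself is very clear. The interfaces act like concepts: each one describes a single capability, and the concrete classes combine them: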

```
FileInterface (Close, CloseAsync, Abort, Tell; CloseAsync matters because even a file opened for reading may need to release resources on close)
Seekable (Seek)
Writable (Write, Flush; Write accepts a non-owned buffer, or a shared_ptr<Buffer> to avoid a copy)
Readable (Read; Read can copy into a user buffer or return a shared_ptr<Buffer>)
OutputStream: FileInterface, Writable
InputStream: FileInterface, Readable (Advance, Peek, supports_zero_copy, ReadMetadata, ReadMetadataAsync)
RandomAccessFile: InputStream, Seekable (GetStream, GetSize, ReadAt, ReadAsync, ReadManyAsync, WillNeed)
WritableFile: OutputStream, Seekable (WriteAt)
ReadWriteFileInterface: RandomAccessFile, WritableFile
```
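To make the capability composition concrete, here is a hypothetical, much-reduced hierarchy with a memory-backed file. As far as I can tell Arrow also inherits the shared base virtually; the sketch does the same so that a read-write class combining both sides would contain a single file-interface subobject. None of these Mini* names are Arrow's, and MemoryFile is only loosely in the spirit of arrow::io::BufferReader.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <cstring>
#include <string>
#include <utility>

// Hypothetical capability interfaces, one concern each.
struct MiniFileInterface {
  virtual ~MiniFileInterface() = default;
  virtual void Close() = 0;
  virtual int64_t Tell() const = 0;
};
struct MiniReadable {
  virtual ~MiniReadable() = default;
  // Copies up to nbytes into out; returns the number of bytes actually read.
  virtual int64_t Read(int64_t nbytes, void* out) = 0;
};
struct MiniSeekable {
  virtual ~MiniSeekable() = default;
  virtual void Seek(int64_t position) = 0;
};

// Composed interfaces; virtual bases avoid duplicate subobjects in diamonds.
struct MiniInputStream : virtual MiniFileInterface, virtual MiniReadable {};
struct MiniRandomAccessFile : MiniInputStream, MiniSeekable {
  virtual int64_t GetSize() const = 0;
};

// A memory-backed random access file.
class MemoryFile final : public MiniRandomAccessFile {
 public:
  explicit MemoryFile(std::string data) : data_(std::move(data)) {}
  void Close() override { closed_ = true; }
  int64_t Tell() const override { return pos_; }
  // Clamp into [0, size] so Read never indexes past the end.
  void Seek(int64_t position) override {
    pos_ = std::min(std::max<int64_t>(0, position), GetSize());
  }
  int64_t GetSize() const override { return static_cast<int64_t>(data_.size()); }
  int64_t Read(int64_t nbytes, void* out) override {
    const int64_t n = std::max<int64_t>(0, std::min(nbytes, GetSize() - pos_));
    std::memcpy(out, data_.data() + pos_, static_cast<size_t>(n));
    pos_ += n;
    return n;
  }
  bool closed() const { return closed_; }

 private:
  std::string data_;
  int64_t pos_ = 0;
  bool closed_ = false;
};
```

Callers that only need sequential reads can take a MiniInputStream&, and the same object also works wherever random access is required.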

For this part, the async implementations are what deserve attention. Let's start with the CloseAsync of S3's output stream:

```cpp
Future<> CloseAsync() override {
  if (closed_) return Status::OK();

  if (current_part_) {
    // Upload last part
    RETURN_NOT_OK(CommitCurrentPart());
  }

  // S3 mandates at least one part, upload an empty one if necessary
  if (part_number_ == 1) {
    RETURN_NOT_OK(UploadPart("", 0));
  }

  // Wait for in-progress uploads to finish (if async writes are enabled)
  return FlushAsync().Then([this]() {
    ARROW_ASSIGN_OR_RAISE(auto client_lock, holder_->Lock());

    // At this point, all part uploads have finished successfully
    DCHECK_GT(part_number_, 1);
    DCHECK_EQ(upload_state_->completed_parts.size(),
              static_cast<size_t>(part_number_ - 1));

    S3Model::CompletedMultipartUpload completed_upload;
    completed_upload.SetParts(upload_state_->completed_parts);
    S3Model::CompleteMultipartUploadRequest req;
    req.SetBucket(ToAwsString(path_.bucket));
    req.SetKey(ToAwsString(path_.key));
    req.SetUploadId(upload_id_);
    req.SetMultipartUpload(std::move(completed_upload));

    auto outcome =
        client_lock.Move()->CompleteMultipartUploadWithErrorFixup(std::move(req));
    if (!outcome.IsSuccess()) {
      return ErrorToStatus(
          std::forward_as_tuple("When completing multiple part upload for key '",
                                path_.key, "' in bucket '", path_.bucket, "': "),
          "CompleteMultipartUpload", outcome.GetError());
    }

    holder_ = nullptr;
    closed_ = true;
    return Status::OK();
  });
}
```

Closing has to flush pending parts and then send a CompleteMultipartUpload request, so it is a natural fit for an async operation.

For RandomAccessFile::ReadAt, the default implementation is rather crude: it takes a lock, Seeks, then Reads:

```cpp
Result<int64_t> RandomAccessFile::ReadAt(int64_t position, int64_t nbytes, void* out) {
  std::lock_guard<std::mutex> lock(interface_impl_->lock_);
  RETURN_NOT_OK(Seek(position));
  return Read(nbytes, out);
}

Result<std::shared_ptr<Buffer>> RandomAccessFile::ReadAt(int64_t position,
                                                         int64_t nbytes) {
  std::lock_guard<std::mutex> lock(interface_impl_->lock_);
  RETURN_NOT_OK(Seek(position));
  return Read(nbytes);
}
```

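The File classes also come with a few concurrency wrappers (e.g. RandomAccessFileConcurrencyWrapper), but in release builds they do not actually take any locks. A system that supports positional reads allows a better implementation: clients without pread (such as some HDFS clients) can fall back to the lock + Seek + Read path above, but a pread-based implementation will certainly perform better. S3's input file is one example: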

```cpp
// A RandomAccessFile that reads from a S3 object
class ObjectInputFile final : public io::RandomAccessFile {
  // ... overrides ReadAt() with a lock-free positional read ...
};
```

Here ReadAt is implemented as a lock-free pread, which speeds up these reads.
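The difference between the default ReadAt (lock, Seek, Read on a shared cursor) and a pread-style one can be shown with POSIX `pread(2)`, which takes the offset as an argument and leaves the descriptor's position untouched. This sketch assumes a POSIX system with a writable /tmp; MakeTempFile and PreadAt are hypothetical helpers.

```cpp
#include <unistd.h>  // pread, write, lseek, close, unlink (POSIX)

#include <cassert>
#include <cstdlib>  // mkstemp (POSIX)
#include <string>

// Writes `contents` into a new temp file; returns the fd and stores the path.
int MakeTempFile(const std::string& contents, std::string* path_out) {
  char name[] = "/tmp/readat_demo_XXXXXX";
  const int fd = mkstemp(name);
  assert(fd >= 0);
  const ssize_t written = write(fd, contents.data(), contents.size());
  assert(written == static_cast<ssize_t>(contents.size()));
  (void)written;  // silence unused-variable warnings when NDEBUG drops the assert
  *path_out = name;
  return fd;
}

// pread-style ReadAt: the offset is an argument and the fd's own position is
// never touched, so concurrent readers need no mutex around Seek + Read.
std::string PreadAt(int fd, off_t position, size_t nbytes) {
  std::string out(nbytes, '\0');
  const ssize_t n = pread(fd, &out[0], nbytes, position);
  out.resize(n > 0 ? static_cast<size_t>(n) : 0);
  return out;
}
```

Because nothing shared is mutated, several threads can call PreadAt on the same descriptor at once, which is exactly what the lock in the default ReadAt prevents.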

Buffered and Memory Layer

Buffered is Arrow's buffered IO layer, somewhat like the buffered IO in Rust's standard library, with some Arrow-specific logic on top.

BufferedOutputStream and BufferedInputStream implement buffered IO, buffering output and input respectively.
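The core idea of buffered output is to coalesce many small writes into fewer large ones. A minimal sketch, assuming a hypothetical RecordingSink that records each physical write; MiniBufferedWriter follows the same broad policy (flush when the buffer would overflow, pass large writes straight through), but it is not BufferedOutputStream's actual code.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical raw sink that records each physical write it receives.
struct RecordingSink {
  std::vector<std::string> writes;
  void Write(const std::string& data) { writes.push_back(data); }
};

// Small writes accumulate in memory; large ones bypass the buffer.
class MiniBufferedWriter {
 public:
  MiniBufferedWriter(RecordingSink* sink, size_t capacity)
      : sink_(sink), capacity_(capacity) {}

  void Write(const std::string& data) {
    if (buffer_.size() + data.size() > capacity_) Flush();  // make room first
    if (data.size() >= capacity_) {
      sink_->Write(data);  // too big to be worth copying into the buffer
    } else {
      buffer_ += data;
    }
  }

  void Flush() {
    if (!buffer_.empty()) {
      sink_->Write(buffer_);
      buffer_.clear();
    }
  }

 private:
  RecordingSink* sink_;
  size_t capacity_;
  std::string buffer_;
};
```

With an 8-byte buffer, five 3-byte writes plus a final Flush reach the sink as three physical writes instead of five.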

src/arrow/io/memory.h contains utilities that use in-memory Buffers and similar objects as input or output, which are very convenient in tests and similar code.

Transform

TransformInputStream lets the user plug in a transform function that is applied to the bytes read from an InputStream:

```cpp
class ARROW_EXPORT TransformInputStream : public InputStream {
 public:
  using TransformFunc =
      std::function<Result<std::shared_ptr<Buffer>>(const std::shared_ptr<Buffer>&)>;
};
```

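The transform probably has to be stateful: when parsing UTF-8, for example, a chunk boundary may split a character, so the returned buffer is not necessarily complete.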
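One way to make such a transform stateful is a functor that holds back an incomplete trailing UTF-8 sequence and prepends it to the next chunk. Since std::function stores a copy of the callable, the carried bytes live inside the function object. Utf8Realigner and IncompleteUtf8Tail are hypothetical names, and std::string stands in for shared_ptr<Buffer>:

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <string>

// Number of trailing bytes of `s` that begin a not-yet-complete UTF-8 sequence (0-3).
size_t IncompleteUtf8Tail(const std::string& s) {
  const size_t n = s.size();
  for (size_t back = 1; back <= 3 && back <= n; ++back) {
    const auto c = static_cast<unsigned char>(s[n - back]);
    if ((c & 0xC0) == 0x80) continue;  // continuation byte: keep looking for the lead
    size_t need = 1;                   // ASCII (or invalid lead): treat as complete
    if ((c & 0xE0) == 0xC0) {
      need = 2;
    } else if ((c & 0xF0) == 0xE0) {
      need = 3;
    } else if ((c & 0xF8) == 0xF0) {
      need = 4;
    }
    return need > back ? back : 0;
  }
  return 0;  // empty, or only continuation bytes (malformed): pass through
}

// Stateful transform: every output chunk ends on a character boundary
// (for valid input); the partial character waits for the next chunk.
class Utf8Realigner {
 public:
  std::string operator()(const std::string& chunk) {
    std::string buf = carry_ + chunk;
    const size_t tail = IncompleteUtf8Tail(buf);
    carry_ = buf.substr(buf.size() - tail);
    buf.resize(buf.size() - tail);
    return buf;
  }
  const std::string& pending() const { return carry_; }

 private:
  std::string carry_;
};
```

Wrapped as `std::function<std::string(const std::string&)> fn = Utf8Realigner{};`, the callable keeps its carry between calls, which is the kind of state a TransformFunc would need here.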

Memory Caching Layer

src/arrow/io/caching.h handles prefetching, caching and IO coalescing. ReadRangeCache prefetches and caches data in one of two modes:

  1. Lazy: IO is triggered only when the user reads
  2. Default: all IO is issued up front

In both modes, nearby ranges are coalesced into larger IOs, and once a large block has been read it is cached.
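The coalescing rule driven by hole_size_limit and range_size_limit (the CacheOptions fields below) can be sketched as a greedy merge over sorted ranges. This is a simplified illustration, not Arrow's exact algorithm; ReadRange and CoalesceRanges are hypothetical names.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <vector>

struct ReadRange {
  int64_t offset;
  int64_t length;
  int64_t end() const { return offset + length; }
};

// Greedy coalescing: sort by offset, then merge a range into the previous one
// when the gap is at most hole_size_limit and the merged range stays within
// range_size_limit.
std::vector<ReadRange> CoalesceRanges(std::vector<ReadRange> ranges,
                                      int64_t hole_size_limit,
                                      int64_t range_size_limit) {
  std::sort(ranges.begin(), ranges.end(),
            [](const ReadRange& a, const ReadRange& b) { return a.offset < b.offset; });
  std::vector<ReadRange> out;
  for (const ReadRange& r : ranges) {
    if (!out.empty()) {
      ReadRange& last = out.back();
      const int64_t hole = r.offset - last.end();  // negative when ranges overlap
      const int64_t merged_end = std::max(last.end(), r.end());
      if (hole <= hole_size_limit && merged_end - last.offset <= range_size_limit) {
        last.length = merged_end - last.offset;  // read through the hole
        continue;
      }
    }
    out.push_back(r);
  }
  return out;
}
```

The trade-off is explicit: bytes in a merged hole are read and thrown away, in exchange for one request instead of two.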

```cpp
struct ARROW_EXPORT CacheOptions {
  static constexpr double kDefaultIdealBandwidthUtilizationFrac = 0.9;
  static constexpr int64_t kDefaultMaxIdealRequestSizeMib = 64;

  /// \brief The maximum distance in bytes between two consecutive
  ///   ranges; beyond this value, ranges are not combined
  int64_t hole_size_limit;
  /// \brief The maximum size in bytes of a combined range; if
  ///   combining two consecutive ranges would produce a range of a
  ///   size greater than this, they are not combined
  int64_t range_size_limit;
  /// \brief A lazy cache does not perform any I/O until requested.
  bool lazy;
  /// \brief The maximum number of ranges to be prefetched. This is only used
  ///   for lazy cache to asynchronously read some ranges after reading the target range.
  int64_t prefetch_limit = 0;

  bool operator==(const CacheOptions& other) const {
    return hole_size_limit == other.hole_size_limit &&
           range_size_limit == other.range_size_limit && lazy == other.lazy &&
           prefetch_limit == other.prefetch_limit;
  }

  /// \brief Construct CacheOptions from network storage metrics (e.g. S3).
  ///
  /// \param[in] time_to_first_byte_millis Seek-time or Time-To-First-Byte (TTFB) in
  ///   milliseconds, also called call setup latency of a new S3 request.
  ///   The value is a positive integer.
  /// \param[in] transfer_bandwidth_mib_per_sec Data transfer Bandwidth (BW) in MiB/sec.
  ///   The value is a positive integer.
  /// \param[in] ideal_bandwidth_utilization_frac Transfer bandwidth utilization fraction
  ///   (per connection) to maximize the net data load.
  ///   The value is a positive double precision number less than 1.
  /// \param[in] max_ideal_request_size_mib The maximum single data request size (in MiB)
  ///   to maximize the net data load.
  ///   The value is a positive integer.
  /// \return A new instance of CacheOptions.
  static CacheOptions MakeFromNetworkMetrics(
      int64_t time_to_first_byte_millis, int64_t transfer_bandwidth_mib_per_sec,
      double ideal_bandwidth_utilization_frac = kDefaultIdealBandwidthUtilizationFrac,
      int64_t max_ideal_request_size_mib = kDefaultMaxIdealRequestSizeMib);

  static CacheOptions Defaults();
  static CacheOptions LazyDefaults();
};
```

A novel piece here is MakeFromNetworkMetrics, which derives the IO coalescing parameters from network characteristics.
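MakeFromNetworkMetrics can be understood as a bandwidth-delay-product argument: a gap smaller than TTFB × BW is cheaper to read through than to pay for a new request, and a request of size R achieves an effective bandwidth of R / (TTFB + R/BW); setting that equal to f·BW gives R = TTFB·BW·f/(1 − f). I believe this matches the reasoning documented in Arrow's implementation, but treat the exact formula, rounding and cap below as assumptions; LimitsFromNetworkMetrics and CoalesceLimits are hypothetical names.

```cpp
#include <algorithm>
#include <cassert>
#include <cmath>
#include <cstdint>

struct CoalesceLimits {
  int64_t hole_size_limit;   // bytes
  int64_t range_size_limit;  // bytes
};

// hole  = TTFB * BW            (bytes streamable while a new request is set up)
// range = hole * f / (1 - f)   (request size reaching utilization fraction f)
// range is then capped at max_request_mib.
CoalesceLimits LimitsFromNetworkMetrics(int64_t ttfb_millis, int64_t bw_mib_per_sec,
                                        double f = 0.9, int64_t max_request_mib = 64) {
  const double ttfb_sec = ttfb_millis / 1000.0;
  const int64_t bw_bytes_per_sec = bw_mib_per_sec * 1024 * 1024;
  const auto hole = static_cast<int64_t>(std::round(ttfb_sec * bw_bytes_per_sec));
  const auto ideal = static_cast<int64_t>(std::round(hole * f / (1 - f)));
  return {hole, std::min(max_request_mib * 1024 * 1024, ideal)};
}
```

With the defaults (f = 0.9, 64 MiB cap), a 100 ms TTFB at 100 MiB/s gives a 10 MiB hole limit and a 90 MiB ideal request, which the cap reduces to 64 MiB; a 10 ms TTFB gives 1 MiB and 9 MiB.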