/// \brief A FileSystem implementation that delegates to another /// implementation but inserts latencies at various points. classARROW_EXPORT SlowFileSystem : public FileSystem { public: SlowFileSystem(std::shared_ptr<FileSystem> base_fs, std::shared_ptr<io::LatencyGenerator> latencies); SlowFileSystem(std::shared_ptr<FileSystem> base_fs, double average_latency); SlowFileSystem(std::shared_ptr<FileSystem> base_fs, double average_latency, int32_t seed);
MockFileSystem: 把所有内容都保存在内存中的 FileSystem,应该是给测试用的
SubTreeFileSystem: 包装某个 base filesystem,把其中的一个子路径当作根目录的 “SubTree” 文件系统(访问时会自动加上这个固定的路径前缀)
/// EXPERIMENTAL: options provider for IO tasks /// /// Includes an Executor (which will be used to execute asynchronous reads), /// a MemoryPool (which will be used to allocate buffers when zero copy reads /// are not possible), and an external id (in case the executor receives tasks from /// multiple sources and must distinguish tasks associated with this IOContext). structARROW_EXPORT IOContext { private: MemoryPool* pool_; ::arrow::internal::Executor* executor_; int64_t external_id_; StopToken stop_token_; };
别的都很好理解,external_id_ 是一个类似 IOTag 的东西。我看它这里没有做很细的 tag,就基于 id 判断一下。这里它把 SubmitIO 包装成异步的了
// Hints about a task that may be used by an Executor.
// They are ignored by the provided ThreadPool implementation.
struct TaskHints {
  // The lower, the more urgent
  int32_t priority = 0;
  // The IO transfer size in bytes (-1 when not provided)
  int64_t io_size = -1;
  // The approximate CPU cost in number of instructions (-1 when not provided)
  int64_t cpu_cost = -1;
  // An application-specific ID (-1 when not provided)
  int64_t external_id = -1;
};
classARROW_EXPORT Executor { public: using StopCallback = internal::FnOnce<void(const Status&)>;
virtual ~Executor();
// Spawn a fire-and-forget task. template <typename Function> Status Spawn(...); // Transfers a future to this executor. Any continuations added to the // returned future will run in this executor. Otherwise they would run // on the same thread that called MarkFinished. // // This is necessary when (for example) an I/O task is completing a future. // The continuations of that future should run on the CPU thread pool keeping // CPU heavy work off the I/O thread pool. So the I/O task should transfer // the future to the CPU executor before returning. // // By default this method will only transfer if the future is not already completed. If // the future is already completed then any callback would be run synchronously and so // no transfer is typically necessary. However, in cases where you want to force a // transfer (e.g. to help the scheduler break up units of work across multiple cores) // then you can override this behavior with `always_transfer`. // // Transfer 完成的是一个这样的逻辑,比方说用户的线程是一个 IO 线程,那么可能这里会访问的时候不太希望是在 // CPU Thread 去接受. 当然 Transfer 对于已经完成的 Future 可能会有点奇怪, 因为它实现是直接挂 callback // 的. template <typename T> Future<T> Transfer(Future<T> future);
// Overload of Transfer which will always schedule callbacks on new threads even if the // future is finished when the callback is added. // // This can be useful in cases where you want to ensure parallelism // // 强制 Transfer 操作. template <typename T> Future<T> TransferAlways(Future<T> future); // Return the level of parallelism (the number of tasks that may be executed // concurrently). This may be an approximate number. virtualintGetCapacity()= 0;
// Return true if the thread from which this function is called is owned by this // Executor. Returns false if this Executor does not support this property. virtualboolOwnsThisThread(){ returnfalse; }
// Return true if this is the current executor being called // n.b. this defaults to just calling OwnsThisThread // unless the threadpool is disabled virtualboolIsCurrentExecutor(){ returnOwnsThisThread(); }
/// \brief An interface to represent something with a custom destructor /// /// \see KeepAlive classARROW_EXPORT Resource { public: virtual ~Resource() = default; };
/// \brief Keep a resource alive until all executor threads have terminated /// /// Executors may have static storage duration. In particular, the CPU and I/O /// executors are currently implemented this way. These threads may access other /// objects with static storage duration such as the OpenTelemetry runtime context /// the default memory pool, or other static executors. /// /// The order in which these objects are destroyed is difficult to control. In order /// to ensure those objects remain alive until all threads have finished those objects /// should be wrapped in a Resource object and passed into this method. The given /// shared_ptr will be kept alive until all threads have finished their worker loops. virtualvoidKeepAlive(std::shared_ptr<Resource> resource); protected: // Subclassing API virtual Status SpawnReal(TaskHints hints, FnOnce<void()> task, StopToken, StopCallback&&)= 0; };
// We could use AddCallback's ShouldSchedule::IfUnfinished but we can save a bit of // work by doing the test here. auto callback = [this, transferred](const FTSync& result) mutable { auto spawn_status = Spawn([transferred, result]() mutable { transferred.MarkFinished(result); }); if (!spawn_status.ok()) { transferred.MarkFinished(spawn_status); } }; auto callback_factory = [&callback]() { return callback; }; if (future.TryAddCallback(callback_factory)) { return transferred; } // If the future is already finished and we aren't going to force spawn a thread // then we don't need to add another layer of callback and can return the original // future return future; }
/// Get info for the given target. /// /// Any symlink is automatically dereferenced, recursively. /// A nonexistent or unreachable file returns an Ok status and /// has a FileType of value NotFound. An error status indicates /// a truly exceptional condition (low-level I/O error, etc.). virtual Result<FileInfo> GetFileInfo(const std::string& path)= 0; /// Same, for many targets at once. virtual Result<FileInfoVector> GetFileInfo(const std::vector<std::string>& paths); /// Same, according to a selector. /// /// The selector's base directory will not be part of the results, even if /// it exists. /// If it doesn't exist, see `FileSelector::allow_not_found`. virtual Result<FileInfoVector> GetFileInfo(const FileSelector& select)= 0;
/// Async version of GetFileInfo virtual Future<FileInfoVector> GetFileInfoAsync(const std::vector<std::string>& paths);
/// Streaming async version of GetFileInfo /// /// The returned generator is not async-reentrant, i.e. you need to wait for /// the returned future to complete before calling the generator again. virtual FileInfoGenerator GetFileInfoGenerator(const FileSelector& select);
这里观察到有一个 FileSelector 接口,实际上这里也会支持一些过滤规则,然后尽量下推
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/// \brief File selector for filesystem APIs structARROW_EXPORT FileSelector { /// The directory in which to select files. /// If the path exists but doesn't point to a directory, this should be an error. std::string base_dir; /// The behavior if `base_dir` isn't found in the filesystem. If false, /// an error is returned. If true, an empty selection is returned. bool allow_not_found; /// Whether to recurse into subdirectories. bool recursive; /// The maximum number of subdirectories to recurse into. int32_t max_recursion;
/// Create a directory and subdirectories. /// /// This function succeeds if the directory already exists. virtual Status CreateDir(const std::string& path, bool recursive = true)= 0;
/// Delete a directory and its contents, recursively. virtual Status DeleteDir(const std::string& path)= 0;
/// Delete a directory's contents, recursively. /// /// Like DeleteDir, but doesn't delete the directory itself. /// Passing an empty path ("" or "/") is disallowed, see DeleteRootDirContents. virtual Status DeleteDirContents(const std::string& path, bool missing_dir_ok = false)= 0;
/// Async version of DeleteDirContents. virtual Future<> DeleteDirContentsAsync(const std::string& path, bool missing_dir_ok = false);
/// EXPERIMENTAL: Delete the root directory's contents, recursively. /// /// Implementations may decide to raise an error if this operation is /// too dangerous. // NOTE: may decide to remove this if it's deemed not useful virtual Status DeleteRootDirContents()= 0;
/// Delete a file. virtual Status DeleteFile(const std::string& path)= 0; /// Delete many files. /// /// The default implementation issues individual delete operations in sequence. virtual Status DeleteFiles(const std::vector<std::string>& paths);
/// Move / rename a file or directory. /// /// If the destination exists: /// - if it is a non-empty directory, an error is returned /// - otherwise, if it has the same type as the source, it is replaced /// - otherwise, behavior is unspecified (implementation-dependent). virtual Status Move(const std::string& src, const std::string& dest)= 0;
/// Copy a file. /// /// If the destination exists and is a directory, an error is returned. /// Otherwise, it is replaced. virtual Status CopyFile(const std::string& src, const std::string& dest)= 0;
/// Open an input stream for sequential reading. virtual Result<std::shared_ptr<io::InputStream>> OpenInputStream( const std::string& path) = 0; /// Open an input stream for sequential reading. /// /// This override assumes the given FileInfo validly represents the file's /// characteristics, and may optimize access depending on them (for example /// avoid querying the file size or its existence). virtual Result<std::shared_ptr<io::InputStream>> OpenInputStream(const FileInfo& info);
/// Open an input file for random access reading. virtual Result<std::shared_ptr<io::RandomAccessFile>> OpenInputFile( const std::string& path) = 0; /// Open an input file for random access reading. /// /// This override assumes the given FileInfo validly represents the file's /// characteristics, and may optimize access depending on them (for example /// avoid querying the file size or its existence). virtual Result<std::shared_ptr<io::RandomAccessFile>> OpenInputFile( const FileInfo& info);
/// Async version of OpenInputStream virtual Future<std::shared_ptr<io::InputStream>> OpenInputStreamAsync( const std::string& path); /// Async version of OpenInputStream virtual Future<std::shared_ptr<io::InputStream>> OpenInputStreamAsync( const FileInfo& info);
/// Async version of OpenInputFile virtual Future<std::shared_ptr<io::RandomAccessFile>> OpenInputFileAsync( const std::string& path); /// Async version of OpenInputFile virtual Future<std::shared_ptr<io::RandomAccessFile>> OpenInputFileAsync( const FileInfo& info);
/// Open an output stream for sequential writing. /// /// If the target already exists, existing data is truncated. virtual Result<std::shared_ptr<io::OutputStream>> OpenOutputStream( const std::string& path, const std::shared_ptr<const KeyValueMetadata>& metadata) = 0; Result<std::shared_ptr<io::OutputStream>> OpenOutputStream(const std::string& path);
/// Open an output stream for appending. /// /// If the target doesn't exist, a new empty file is created. /// /// Note: some filesystem implementations do not support efficient appending /// to an existing file, in which case this method will return NotImplemented. /// Consider writing to multiple files (using e.g. the dataset layer) instead. virtual Result<std::shared_ptr<io::OutputStream>> OpenAppendStream( const std::string& path, const std::shared_ptr<const KeyValueMetadata>& metadata) = 0; Result<std::shared_ptr<io::OutputStream>> OpenAppendStream(const std::string& path);
这里它打开文件,本身都是 Sync 或者 Async 的,而打开后的接口,可能有不同的 Sync Async 模型。根据我个人的理解,这个可能是个开发者责任制。比如某个接口在非关键路径上不是 async,就没有开发者愿意改它,然后这玩意就都是 Sync 的了。反正 Sync 是第一需要支持的东西。然后需要注意的是,似乎 FileSystem 这套接口使用的时候会尽量保证是 hold by shared_ptr 的。
// A RandomAccessFile that reads from a S3 object classObjectInputFilefinal : public io::RandomAccessFile { Status Init(){ // Issue a HEAD Object to get the content-length and ensure any // errors (e.g. file not found) don't wait until the first Read() call. // ... } };
Future<> CloseAsync() override { if (closed_) return Status::OK();
if (current_part_) { // Upload last part RETURN_NOT_OK(CommitCurrentPart()); }
// S3 mandates at least one part, upload an empty one if necessary if (part_number_ == 1) { RETURN_NOT_OK(UploadPart("", 0)); }
// Wait for in-progress uploads to finish (if async writes are enabled) returnFlushAsync().Then([this]() { ARROW_ASSIGN_OR_RAISE(auto client_lock, holder_->Lock());
// At this point, all part uploads have finished successfully DCHECK_GT(part_number_, 1); DCHECK_EQ(upload_state_->completed_parts.size(), static_cast<size_t>(part_number_ - 1));
/// \brief The maximum distance in bytes between two consecutive
///   ranges; beyond this value, ranges are not combined
int64_t hole_size_limit;
/// \brief The maximum size in bytes of a combined range; if
///   combining two consecutive ranges would produce a range of a
///   size greater than this, they are not combined
int64_t range_size_limit;
/// \brief A lazy cache does not perform any I/O until requested.
bool lazy;
/// \brief The maximum number of ranges to be prefetched. This is only used
///   for lazy cache to asynchronously read some ranges after reading the target range.
int64_t prefetch_limit = 0;
/// \brief Construct CacheOptions from network storage metrics (e.g. S3). /// /// \param[in] time_to_first_byte_millis Seek-time or Time-To-First-Byte (TTFB) in /// milliseconds, also called call setup latency of a new S3 request. /// The value is a positive integer. /// \param[in] transfer_bandwidth_mib_per_sec Data transfer Bandwidth (BW) in MiB/sec. /// The value is a positive integer. /// \param[in] ideal_bandwidth_utilization_frac Transfer bandwidth utilization fraction /// (per connection) to maximize the net data load. /// The value is a positive double precision number less than 1. /// \param[in] max_ideal_request_size_mib The maximum single data request size (in MiB) /// to maximize the net data load. /// The value is a positive integer. /// \return A new instance of CacheOptions. static CacheOptions MakeFromNetworkMetrics( int64_t time_to_first_byte_millis, int64_t transfer_bandwidth_mib_per_sec, double ideal_bandwidth_utilization_frac = kDefaultIdealBandwidthUtilizationFrac, int64_t max_ideal_request_size_mib = kDefaultMaxIdealRequestSizeMib);