Arrow Data System

  1. Apache Arrow 是由 Pandas 作者 Wes McKenny 主导的一款以内存为主体,辐射到数据交换、内存计算、多格式多平台转换的库。它的大概 Idea 可以见下图,写的非常直观。

    image

    我们简单介绍一下 Arrow 的内容:

    1. 一切的核心,Arrow 的内存 Columnar 结构(见官方介绍 [1] 和我之前的博客 [2])
      1. 在这上面的 ExtensionType,支持用户定义自己的类型,包括 Tensor 这样的类型
      2. IPC File
    2. 数据交换上成功的工具
      1. Arrow JDBC Converter [3]
      2. ADBC [4]
      3. Arrow Flight [5] 和基于 Arrow Flight SQL [6]
    3. 一些计算相关的 API
      1. 向量化计算函数的 Compute [7]
      2. Push-based Execution 的 Acero [8]
      3. LLVM Codegen 的执行 Gandiva [9] (已经快死了)
      4. SQL Planning 的 Datafusion [10]
      5. 分布式执行的 Ballista
    4. 一些数据源和写入相关的 API
      1. FileSystem: 对文件来源的层次的 Unify [11]
      2. Dataset API: 对某个 FileSystem 下的多个文件 / 分区 / 数据格式的抽象,也可以加上 Project, Filter 等操作 [12]

    就我个人评价而言,Arrow 在每个领域中很难说得上是性能最好的,但是胜在提供了比较完善的解决方案,api 相对侵入性也不会很大。很多地方扩展性比较强,例如提供了 ExtensionType(可以用 ExtensionType 去支持机器学习需要的 Tensor 这样的类型 [16] ),也允许用户按照 API 去 Inject 自己想要的东西。Arrow Flight 总体上也是比较开放的。这些合起来的原因和方便的生态导致 Arrow 其实被较为广泛的使用了,包括 Snowflake [13] 和 Bigquery [14] 这几家都在用,开源的数据项目,像 Ray 这样的也会积极拥抱 Arrow。DuckDB 甚至给支持 Arrow 的转换写了一篇博客 [15]。

    总的来说,Arrow 主要为生态开放的做了不少工作,虽然核心的性能不是最好的(参见 Velox 和诸多的大厂开放的引擎,一个比较经典的例子是 Firebolt,基于开源的引擎自己拼了一个数据仓库出来),但是因为很强的通用性和比较完善的生态,现在已经侵入了很多人的系统。

    Core: In Memory Format

    关于 Arrow 的 In Memory Format,之前尝试介绍过了:

    1. https://blog.mwish.me/2023/05/04/Type-and-Array-in-Columnar-System/
    2. https://blog.mwish.me/2022/10/04/Parquet-Part2-arrow-Parquet-code-path/

    这个地方本身没啥好说的,值得说的都在上面的内容了

    IPC Format

    IPC File 格式 基本等价于 内存中 Format 的 RecordBatch,这里它会尽量在 package 里面做 alignas (比较有意思和关联的 patch 看这个:https://github.com/apache/arrow/pull/35565/files ,在计算层要求输出是满足 align 的),然后让数据能够被 Zero-Copy 的解析到 Arrow 格式上。这里应该关心可能的数据和元数据

    IPC Format 有两种格式:

    1. Streaming Format: Appendable,必须整个解析
    2. Random Access: 一整个 RecordBatch 写的文件

    在协议上,这里允许(不定大小的)下列元素:

    1. Schema: 文件或者 Stream 的 Schema(这里需要注意,ipc 既可以写在文件里,又可以为了整个流来设置)
    2. DictionaryBatch
    3. RecordBatch

    在定义的格式中,基本的元素是 Message,下面是对应的 Message Format:

    ipc-file-format

    这里元数据由 FlatBuffer 组成( https://blog.mwish.me/2022/01/14/format-thinking/ )它的格式定义在 https://github.com/apache/arrow/blob/main/format/Message.fbs 中。这里包括:

    1
    2
    3
    4
    5
    6
    table Message {
    version: org.apache.arrow.flatbuf.MetadataVersion;
    header: MessageHeader;
    bodyLength: long;
    custom_metadata: [ KeyValue ];
    }

    Stream

    C9ACE6F5-A3F6-45B3-A4E6-169BFAAF96A0

    a7d9c337-14e0-44f8-8c1c-e8badd189e0d

    为什么之前强调了是文件/Stream的 Schema 呢,因为它本身是没有文件(内)级别的 Schema Evolution 的

    比较重要的是 Record Batch Message,这里会在写文件和读取的时候都保证 alignment (8B,也可以手动指定),这里有:https://arrow.apache.org/docs/format/Columnar.html#recordbatch-message

    这部分实现的内容很像 ORC 那套格式。

    同时,这部分字典格式是 INCREMENT 的(看上面的 DICTIONARY DELTA),增量的字典带来了一部分的扩展性:https://arrow.apache.org/docs/format/Columnar.html#dictionary-messages

    File

    9E626D1C-90D9-4261-80DD-761870A41306

    Extension Type

    用户可以以 Key-Value Metadata 的形式去做 Schema 上的扩展,GeoArrow,Tensor 这些,都可以收益于Arrow 包含的扩展:https://arrow.apache.org/docs/format/Columnar.html#extension-types

    数据交换的工具

    如果说之前的部分是内存和 Stream 层面的 kernel,那么下面来介绍数据交换。提到这部分,可能首先想起的是 IPC 层 CWI 的相关的论文 「Don’t Hold My Data Hostage – A Case For Client Protocol Redesign」。其实和文章结论比较接近,但是这部分其实比较有意思,也给用户可以科普一下 client-side 的 dirty work 和复杂度。

    Arrow Flight

    Arrow Flight 其实是在 grpc 和 protobuf 上开了一套洞的 Arrow 传输协议。

    Q: Arrow Flight 和 Arrow Streaming Format 有什么关系?

    A: Arrow Flight 主要工作在传输协议和分布式上

    如图,下面是 Arrow Flight 的工作模式

    flight-request-pattern

    如图,Arrow Flight 允许以如下方式去捞一个 Dataset,和这篇( https://zhuanlan.zhihu.com/p/340520316 )有异曲同工之妙(废话,AP 做法都差不多)

    flight-batch

    在代码实现上,Flight 在 Protobuf 代码上开了个洞,尽量让他解析 Arrow 数据的时候,被 Arrow 拦截,做到实现上的 Zero-copy (众所周知,pb 本身不是为了传输大对象设计的)。

    Flight 还支持一些非 gRPC 的访问,比如协议上准备兼容

    From JDBC Converter to Others

    JDBC / ODBC 实际上是 90 年代起数据交换的事实标准,如果考虑到使用的话,流程如左图:

    casting

    右图首先提供了一种 JDBC Client 的交互方式。即首先需要转一层。当然这并不能让人满意,所以接下来出现了下面的数据流

    data

    1. ADBC: 启发自 DuckDB 作者的论文 「Don’t Hold My Data Hostage – A Case For Client Protocol Redesign」。相当于列式的 JDBC。需要注意的是,这个格式虽然比较有潜力,但是才 0.4,可能需要等他自己发展一下
    2. Driver: 类似数据库的 Driver,能够把 Arrow 转成 JDBC / ADBC
    3. Flight SQL Protocol: Flight 之上的 Catalog / SQL 框架。与实现大概无关,愿景是一个通用协议层

    说实话这套东西竞争力还挺强的,BigQuery, Snowflake, DataBricks 都或多或少用了这套东西. 主要问题还是用数据库的 JDBC Driver 或者甚至行的接口来处理还是比较菜的。需要专门的接口来 handle 这套内容。

    计算相关的工具

    Compute & Acero

    在 Arrow 的核心中提供了 Arrow Compute 这样的工具,它的基本逻辑和定位见之前的博客:https://blog.mwish.me/2023/05/27/Arrow-Compute/

    Compute 定位的是向量化的算子( Function )。

    虽然 Compute 本身可能没有 Expression 的概念,但是 Arrow 为了别的地方使用,在 Compute 里面实现了 Expression。几乎所有数据库都有 Expression 相关的概念,它可以是:

    1. LiteralValue Or Datum
    2. FieldRef of input Datum
    3. Compute Function,Function Member 都是别的 Expression

    expressions

    Acero 是构建在这套系统上的 Operator 层。定位上,Acero 是一个 Push-Based 的算子层,虽然代码写的不是很细,但是比较易懂:

    acero-nodes

    这里可以根据 Declaration 生成一组对应的 ExecPlan / ExecNode,然后下层来驱动数据流的执行。

    Substrait

    Substrait 的定位是一个开放的 (可能被优化过的) Plan,后面可以是对应的执行器,前面产生对应的 Plan。Arrow C++ Acero 本身并不负责 Planning,只是一个执行的对应的流。

    B1AA6B5E-808A-43C6-97CC-D165C82B13A8

    Datafusion / Ballista

    D457D790-709D-4B27-846D-DFBDC4183DB2

    SQL 和执行对应的逻辑。

    Gandiva

    Gandiva 类似 Compute,是 Dremio 捐赠给项目的,对应的逻辑是 Codegen. 这个项目感觉差不多已经似了

    gandiva

    数据源和写入相关的 API

    0EC24B5E-7C33-4F1D-B813-B9C4BBA65D25

    Arrow 在对应的内容上有两层

    1. FileSystem: 处理 Local, S3, HDFS 等「存储后台」的文件
    2. DataSet: ORC/JSON/CSV 等文件是在 DataSet 层处理的,这层也使用了 FileSystem。有点像 Presto 的 Connector,可以 Filter Pushdown 和读/写

    References

    1. DuckDB quacks Arrow: A zero-copy data integration between Apache Arrow and DuckDB https://duckdb.org/2021/12/03/duck-arrow.html
    2. https://arrow.apache.org/docs/cpp/api/tensor.html 和 maillist 的讨论 https://lists.apache.org/thread/95ncmr4two34mxodlds7bzxvrrzmk1s6