Dive into Apache Arrow(その2)- pg2arrow

前回のエントリApache Arrow のフォーマットについて調べていたが、これのゴールは、外部テーブル(Foreign Table)を介してApache Arrowファイルを読み出し、高速に集計・解析処理を実行する事にある。
特にPG-Stromの場合はSSD-to-GPU Direct SQLという飛び道具が使えるため、NVME-SSD上のApache Arrowファイルを直接GPUへ転送し、列指向データをGPUの大量のコアでぐわーーっと処理するという構成が取れるハズである。

で、FDWモジュールがApache Arrowファイルを読むためには、まずメタデータを解読してどの列がどういったデータ型を持っており、どこにどういう形式で配置されているのか特定できる必要がある。
そのために書いたコードを元に、先ずPostgreSQLのデータをApache Arrowファイルとしてダンプするためのツールを作ってみた。

github.com

pg2arrowの使い方

ある程度 psqlpg_dump のオプションを参考にしたので、PostgreSQL使いの人ならそれほど違和感なく使えるはず。
データベースやユーザ名などの指定は共通。-c COMMAND-f FILENAMEで指定したSQLを実行し、その結果を-o FILENAMEで指定したファイルへ書き出す。

$ ./pg2arrow --help
Usage:
  pg2arrow [OPTION]... [DBNAME [USERNAME]]

General options:
  -d, --dbname=DBNAME     database name to connect to
  -c, --command=COMMAND   SQL command to run
  -f, --file=FILENAME     SQL command from file
      (-c and -f are exclusive, either of them must be specified)
  -o, --output=FILENAME   result file in Apache Arrow format
      (default creates a temporary file)

Arrow format options:
  -s, --segment-size=SIZE size of record batch for each
      (default is 512MB)

Connection options:
  -h, --host=HOSTNAME     database server host
  -p, --port=PORT         database server port
  -U, --username=USERNAME database user name
  -w, --no-password       never prompt for password
  -W, --password          force password prompt

Debug options:
      --dump=FILENAME     dump information of arrow file
      --progress          shows progress of the job.

Report bugs to <pgstrom@heterodb.com>.

使用例

例として、hogehogeテーブルを日付順にソートしてダンプしてみる。

hogehogeテーブルの定義は以下の通り。c列は複合型(composite type)で、内部的にサブフィールドを持つ。

postgres=# \d hogehoge
                  Table "public.hogehoge"
 Column |       Type       | Collation | Nullable | Default
--------+------------------+-----------+----------+---------
 id     | integer          |           | not null |
 a      | bigint           |           | not null |
 b      | double precision |           | not null |
 c      | comp             |           |          |
 d      | text             |           |          |
 e      | double precision |           |          |
 ymd    | date             |           |          |
Indexes:
    "hogehoge_pkey" PRIMARY KEY, btree (id)

postgres=# \dS comp
                Composite type "public.comp"
 Column |       Type       | Collation | Nullable | Default
--------+------------------+-----------+----------+---------
 x      | integer          |           |          |
 y      | double precision |           |          |
 z      | numeric          |           |          |
 memo   | text             |           |          |

さっそく実行し、特にエラーもなく終了する。

$ ./pg2arrow postgres -o /tmp/hogehoge -c "SELECT * FROM hogehoge ORDER BY ymd"

PyArrowで結果を確認する

以下のように、PyArrowを用いて Apache Arrow 形式のファイルを読み出す事ができる。

>>> import pyarrow as pa
>>> X = pa.RecordBatchFileReader("/tmp/hogehoge").read_all()
>>> X.schema
id: int32
a: int64
b: double
c: struct<x: int32, y: double, z: decimal(30, 11), memo: string>
  child 0, x: int32
  child 1, y: double
  child 2, z: decimal(30, 11)
  child 3, memo: string
d: string
e: double
ymd: date32[day]

DBテーブルの定義に準じて、Apache Arrowとしてのスキーマが作られている事が分かる。

ちなみに、Pandasのread_sqlを使ってテーブルを読み出すと、複合型であるcは文字列型にされてしまっている。これは流石にチョットツライ。

>>> A = pd.read_sql(sql="SELECT * FROM hogehoge LIMIT 1000", con="postgresql://localhost/postgres")
>>> B = pa.Table.from_pandas(A)
>>> B.schema
id: int64
a: int64
b: double
c: string
d: string
e: double
ymd: date32[day]
__index_level_0__: int64
metadata
--------
{b'pandas': b'{"index_columns": ["__index_level_0__"], "column_indexes": [{"fi'
            b'eld_name": null, "name": null, "numpy_type": "object", "pandas_t'
            b'ype": "unicode", "metadata": {"encoding": "UTF-8"}}], "columns":'
            b' [{"field_name": "id", "name": "id", "numpy_type": "int64", "pan'
            b'das_type": "int64", "metadata": null}, {"field_name": "a", "name'
            b'": "a", "numpy_type": "int64", "pandas_type": "int64", "metadata'
            b'": null}, {"field_name": "b", "name": "b", "numpy_type": "float6'
            b'4", "pandas_type": "float64", "metadata": null}, {"field_name": '
            b'"c", "name": "c", "numpy_type": "object", "pandas_type": "unicod'
            b'e", "metadata": null}, {"field_name": "d", "name": "d", "numpy_t'
            b'ype": "object", "pandas_type": "unicode", "metadata": null}, {"f'
            b'ield_name": "e", "name": "e", "numpy_type": "float64", "pandas_t'
            b'ype": "float64", "metadata": null}, {"field_name": "ymd", "name"'
            b': "ymd", "numpy_type": "object", "pandas_type": "date", "metadat'
            b'a": null}, {"field_name": "__index_level_0__", "name": null, "nu'
            b'mpy_type": "int64", "pandas_type": "int64", "metadata": null}], '
            b'"pandas_version": "0.22.0"}'}

中身の方を見てみると、このような感じで正しくクエリの結果を保存できている事がわかる。

>>> X.to_pandas()
       id     a          b                                                  c  \
0      24  3379  96.200935  {'memo': '1ff1de774005f8da13f42943881c655f', '...
1    2041  2208  71.122772  {'memo': '3416a75f4cea9109507cacd8e2f2aefc', '...
2    2042  7040  54.081142  {'memo': 'a1d0c6e83f027327d8461063f4ac58a6', '...
3    2043  1635  92.302224  {'memo': '17e62166fc8586dfa4d1bc0e1742c08b', '...
4    2044  3295  58.273429  {'memo': 'f7177163c833dff4b38fc8d2872f1ec6', '...
5    2045  9671  58.085893  {'memo': '6c8349cc7260ae62e3b1396831a8398f', '...
        :          :              :
                                    d          e        ymd
0    f4c1893e352a4e08d1a3b3b444c2d692  34.916724 2025-08-08
1    13f5d46a9f51e30dac02b33b74e9043e  11.098757 2018-09-26
2    b041a9cb97b220c1b073266af31cb45f  81.570772 2025-03-02
3    77e882ed95bf5838abd1b4336a7d2fdc  16.990162 2016-12-29
4    f78d9fde23891ae293f07c576982155b   7.017451 2020-10-08
5    4dfe9131c7ee3a44583a8d21d5ca26a2   3.979350 2023-03-09

今後のロードマップ

ひとまず、pg2arrowを使う事で PostgreSQLApache Arrow へデータを変換する流れができた。
(と、同時に手を動かしてデータ形式を一通り勉強したといえる。えへん。)

次は本来の目標である、FDWを使って Apache Arrow ⇒ PostgreSQL へのデータの流れを作る事。
これによって、IoT/M2Mの領域で扱われるような大量データを、毎度DBへインポートする事なく集計・解析系のクエリで処理する事ができるようになる。

他に、既に個別に頂いているpg2arrowの拡張としては、既に存在するArrowファイルに差分だけを追加する--appendモードや、複数のArrowファイルをマージして一個の巨大なArrowファイルに再編する--mergeモードなど。この辺も追って作っていきたい。