Dive into Apache Arrow(その2)- pg2arrow
前回のエントリで Apache Arrow のフォーマットについて調べていたが、これのゴールは、外部テーブル(Foreign Table)を介してApache Arrowファイルを読み出し、高速に集計・解析処理を実行する事にある。
特にPG-Stromの場合はSSD-to-GPU Direct SQLという飛び道具が使えるため、NVME-SSD上のApache Arrowファイルを直接GPUへ転送し、列指向データをGPUの大量のコアでぐわーーっと処理するという構成が取れるハズである。

で、FDWモジュールがApache Arrowファイルを読むためには、まずメタデータを解読してどの列がどういったデータ型を持っており、どこにどういう形式で配置されているのか特定できる必要がある。
そのために書いたコードを元に、先ずPostgreSQLのデータをApache Arrowファイルとしてダンプするためのツールを作ってみた。
pg2arrowの使い方
ある程度 psql や pg_dump のオプションを参考にしたので、PostgreSQL使いの人ならそれほど違和感なく使えるはず。
データベースやユーザ名などの指定は共通。-c COMMANDか-f FILENAMEで指定したSQLを実行し、その結果を-o FILENAMEで指定したファイルへ書き出す。
$ ./pg2arrow --help
Usage:
pg2arrow [OPTION]... [DBNAME [USERNAME]]
General options:
-d, --dbname=DBNAME database name to connect to
-c, --command=COMMAND SQL command to run
-f, --file=FILENAME SQL command from file
(-c and -f are exclusive, either of them must be specified)
-o, --output=FILENAME result file in Apache Arrow format
(default creates a temporary file)
Arrow format options:
-s, --segment-size=SIZE size of record batch for each
(default is 512MB)
Connection options:
-h, --host=HOSTNAME database server host
-p, --port=PORT database server port
-U, --username=USERNAME database user name
-w, --no-password never prompt for password
-W, --password force password prompt
Debug options:
--dump=FILENAME dump information of arrow file
--progress shows progress of the job.
Report bugs to <pgstrom@heterodb.com>.
使用例
例として、hogehogeテーブルを日付順にソートしてダンプしてみる。
hogehogeテーブルの定義は以下の通り。c列は複合型(composite type)で、内部的にサブフィールドを持つ。
postgres=# \d hogehoge
Table "public.hogehoge"
Column | Type | Collation | Nullable | Default
--------+------------------+-----------+----------+---------
id | integer | | not null |
a | bigint | | not null |
b | double precision | | not null |
c | comp | | |
d | text | | |
e | double precision | | |
ymd | date | | |
Indexes:
"hogehoge_pkey" PRIMARY KEY, btree (id)
postgres=# \dS comp
Composite type "public.comp"
Column | Type | Collation | Nullable | Default
--------+------------------+-----------+----------+---------
x | integer | | |
y | double precision | | |
z | numeric | | |
memo | text | | |さっそく実行し、特にエラーもなく終了する。
$ ./pg2arrow postgres -o /tmp/hogehoge -c "SELECT * FROM hogehoge ORDER BY ymd"
PyArrowで結果を確認する
以下のように、PyArrowを用いて Apache Arrow 形式のファイルを読み出す事ができる。
>>> import pyarrow as pa
>>> X = pa.RecordBatchFileReader("/tmp/hogehoge").read_all()
>>> X.schema
id: int32
a: int64
b: double
c: struct<x: int32, y: double, z: decimal(30, 11), memo: string>
child 0, x: int32
child 1, y: double
child 2, z: decimal(30, 11)
child 3, memo: string
d: string
e: double
ymd: date32[day]DBテーブルの定義に準じて、Apache Arrowとしてのスキーマが作られている事が分かる。
ちなみに、Pandasのread_sqlを使ってテーブルを読み出すと、複合型であるcは文字列型にされてしまっている。これは流石にチョットツライ。
>>> A = pd.read_sql(sql="SELECT * FROM hogehoge LIMIT 1000", con="postgresql://localhost/postgres")
>>> B = pa.Table.from_pandas(A)
>>> B.schema
id: int64
a: int64
b: double
c: string
d: string
e: double
ymd: date32[day]
__index_level_0__: int64
metadata
--------
{b'pandas': b'{"index_columns": ["__index_level_0__"], "column_indexes": [{"fi'
b'eld_name": null, "name": null, "numpy_type": "object", "pandas_t'
b'ype": "unicode", "metadata": {"encoding": "UTF-8"}}], "columns":'
b' [{"field_name": "id", "name": "id", "numpy_type": "int64", "pan'
b'das_type": "int64", "metadata": null}, {"field_name": "a", "name'
b'": "a", "numpy_type": "int64", "pandas_type": "int64", "metadata'
b'": null}, {"field_name": "b", "name": "b", "numpy_type": "float6'
b'4", "pandas_type": "float64", "metadata": null}, {"field_name": '
b'"c", "name": "c", "numpy_type": "object", "pandas_type": "unicod'
b'e", "metadata": null}, {"field_name": "d", "name": "d", "numpy_t'
b'ype": "object", "pandas_type": "unicode", "metadata": null}, {"f'
b'ield_name": "e", "name": "e", "numpy_type": "float64", "pandas_t'
b'ype": "float64", "metadata": null}, {"field_name": "ymd", "name"'
b': "ymd", "numpy_type": "object", "pandas_type": "date", "metadat'
b'a": null}, {"field_name": "__index_level_0__", "name": null, "nu'
b'mpy_type": "int64", "pandas_type": "int64", "metadata": null}], '
b'"pandas_version": "0.22.0"}'}中身の方を見てみると、このような感じで正しくクエリの結果を保存できている事がわかる。
>>> X.to_pandas()
id a b c \
0 24 3379 96.200935 {'memo': '1ff1de774005f8da13f42943881c655f', '...
1 2041 2208 71.122772 {'memo': '3416a75f4cea9109507cacd8e2f2aefc', '...
2 2042 7040 54.081142 {'memo': 'a1d0c6e83f027327d8461063f4ac58a6', '...
3 2043 1635 92.302224 {'memo': '17e62166fc8586dfa4d1bc0e1742c08b', '...
4 2044 3295 58.273429 {'memo': 'f7177163c833dff4b38fc8d2872f1ec6', '...
5 2045 9671 58.085893 {'memo': '6c8349cc7260ae62e3b1396831a8398f', '...
: : :
d e ymd
0 f4c1893e352a4e08d1a3b3b444c2d692 34.916724 2025-08-08
1 13f5d46a9f51e30dac02b33b74e9043e 11.098757 2018-09-26
2 b041a9cb97b220c1b073266af31cb45f 81.570772 2025-03-02
3 77e882ed95bf5838abd1b4336a7d2fdc 16.990162 2016-12-29
4 f78d9fde23891ae293f07c576982155b 7.017451 2020-10-08
5 4dfe9131c7ee3a44583a8d21d5ca26a2 3.979350 2023-03-09
今後のロードマップ
ひとまず、pg2arrowを使う事で PostgreSQL ⇒ Apache Arrow へデータを変換する流れができた。
(と、同時に手を動かしてデータ形式を一通り勉強したといえる。えへん。)
次は本来の目標である、FDWを使って Apache Arrow ⇒ PostgreSQL へのデータの流れを作る事。
これによって、IoT/M2Mの領域で扱われるような大量データを、毎度DBへインポートする事なく集計・解析系のクエリで処理する事ができるようになる。
他に、既に個別に頂いているpg2arrowの拡張としては、既に存在するArrowファイルに差分だけを追加する--appendモードや、複数のArrowファイルをマージして一個の巨大なArrowファイルに再編する--mergeモードなど。この辺も追って作っていきたい。