先日、Apache Arrow 東京ミートアップ 2019というイベントに参加させていただいた。
発表自体は、SSD-to-GPU Direct SQLからArrow_Fdw、4GPU+16SSDによる最近のベンチマークの紹介などで、目新しいものというよりは総集編であるが、懇親会の際にこんな質問をいただいた。
『Apache Arrowファイルの追記ってどうやってるんですか?』
曰く、FluentdのApache Arrow出力用プラグインにおいて、集積したログの書き出しのタイミングとその際のI/O負荷の問題だそうな。
確かに Apache Arrow は列志向なので、一見、既に100万行分のデータを保持しているファイルに、新たに10万行のデータを追加するには、ファイル全体の再編成と書き直しが必要に見えるかもしれない。
しかし、この人の内部フォーマットをよく読んでみると、データの追記に関してはかなり低いコストで実行する事ができるように設計されており、上の例で言えば、追記処理に関しては『既に書き込んだ100万行は全く変更せず、10万行分の書き込み+α程度』のコストで実行できる。
今回は、PostgreSQLのテーブル(クエリ結果)を Apache Arrow ファイルとして保存する pg2arrow コマンドにこの追記モードを付けてみたという話を書いてみる事にする。
内部データ構造 Record Batch
今年の頭に、pg2arrow
やArrow_Fdw
の機能を実装するために Apache Arrow のファイル形式を調査してみた。割と読みごたえのあるバイナリであるが、追記処理を実装するにあたって重要なポイントは一つ。『Arrowファイルは尻から読む』
kaigai.hatenablog.com
Apache Arrowファイルの内部構造を順に追っていくと、ざっくり、先頭から以下の順に並んでいる。
- ヘッダ(
"ARROW1\0\0"
) - スキーマ定義(列の名前やデータ型など)
- ディクショナリ・バッチ(辞書圧縮用の辞書;複数)
- レコード・バッチ(N件分のデータを列形式で記録;複数)
- フッタ(スキーマ定義の複製と、ディクショナリ/レコードバッチの位置情報)
意外かもしれないが、例えば100万件分のデータ要素を持つApache Arrow形式ファイルであっても、内部のデータ構造は100万個のXXX型配列が並んでいる・・・とは必ずしも言い切れない。(もちろん、そういう編成にもできるが)
Apache Arrow形式ファイルの内側には、任意の件数のデータを列形式で束ねたレコード・バッチ(Record Batch)と呼ばれる領域がある。例えば1万件ごとにRecord Batchで切るとすると、Record Batchの内側にはA列の要素が1万個、B列の要素が1万個、C列の要素が1万個・・・と並んでおり、全体で100万件なら、このRecord Batchが100個順番に並んでいるという形になる。
もちろん、100万件のデータを持つRecord Batchを1個で、総計100万件のデータを含むApache Arrowファイルを構成する事もできる。この場合は、単純にA列の要素が100万個、B列の要素が100万個・・・と配置される事になる。
Arrowファイルは尻から読む
Apache Arrow形式ファイルの『どこからどこまで』がレコード・バッチなのかという情報は、ファイルの一番最後、フッタ領域に書かれている。例えば『ファイルの先頭から1200バイト、以降10MBはRecord Batch-0である』というノリである。*1
さて、このフッタ領域は、レコード・バッチの並びの直後に存在する。
例えば、レコード・バッチを100個持つApache Arrowファイルのフッタには、レコード・バッチ領域を指し示す(オフセット・サイズ)の組が100個書き込まれているわけだが、ここで100番目のレコード・バッチの直後に101番目のレコード・バッチを追加してみる事にする。
そうすると、フッタ領域は当然ながら上書きされてしまう。南無。
そしてその後ろに(オフセット・サイズ)の組を101個持つフッタを新たに書き込んでやると、ファイルの前半への書き込み操作を一切行う事なく、Apache Arrow形式ファイルへの追記操作を行えることになる。
これを、pg2arrow
コマンドに実装してみた。
pg2arrowによるテーブルのダンプ(Arrow形式)
pg2arrow
コマンドは、PG-Stromのユーティリティとして配布しているコマンドで、pg_dump
に似たノリでSQLの実行結果をPostgreSQLからダンプし、Apache Arrow形式ファイルとして保存するためのツールである。
使い方は以下の通り。
$ ./pg2arrow --help Usage: pg2arrow [OPTION]... [DBNAME [USERNAME]] General options: -d, --dbname=DBNAME database name to connect to -c, --command=COMMAND SQL command to run -f, --file=FILENAME SQL command from file (-c and -f are exclusive, either of them must be specified) -o, --output=FILENAME result file in Apache Arrow format --append=FILENAME result file to be appended --output and --append are exclusive to use at the same time. If neither of them are specified, it creates a temporary file.) Arrow format options: -s, --segment-size=SIZE size of record batch for each (default: 256MB) Connection options: -h, --host=HOSTNAME database server host -p, --port=PORT database server port -U, --username=USERNAME database user name -w, --no-password never prompt for password -W, --password force password prompt Debug options: --dump=FILENAME dump information of arrow file --progress shows progress of the job. Report bugs to <pgstrom@heterodb.com>.
ある程度複雑なデータ構造の方が面白いので、列挙型(Enum)、複合型を含むテーブルを作成し、テストデータを投入してみた。
postgres=# create type label as enum ('Tokyo','Osaka','Nagoya','Yokohama','Kyoto'); CREATE TYPE postgres=# create type comp as (x int, y real, z text); CREATE TYPE postgres=# create table t (id int, a float, b numeric, c comp, d label, e timestamp, f text); CREATE TABLE postgres=# insert into t (select x, 1000*pgstrom.random_float(2), 1000*pgstrom.random_float(2), null, /* set later */ (case (5*random())::int when 0 then 'Tokyo' when 1 then 'Osaka' when 2 then 'Nagoya' when 3 then 'Yokohama' when 4 then 'Kyoto' else null end)::label, pgstrom.random_timestamp(2), md5(x::text) from generate_series(1,1000) x); INSERT 0 1000 postgres=# update t set c.x = pgstrom.random_int(2,-1000,1000), c.y = 1000*pgstrom.random_float(2), c.z = 'ROW#' || id::text; UPDATE 1000
pgstrom.random_xxxx()
というのはランダムなデータを時々NULL
を混ぜつつ生成してくれる関数。
でき上ったテストデータはこういった感じになっている。
postgres=# select * from t order by id limit 8; id | a | b | c | d | e | f ----+------------------+------------------+----------------------+----------+----------------------------+---------------------------------- 1 | 793.46183025905 | 718.62576097186 | (649,104.976,ROW#1) | | 2016-10-20 02:42:38.101797 | c4ca4238a0b923820dcc509a6f75849b 2 | 626.670837228499 | 913.748125505516 | (-582,598.061,ROW#2) | Yokohama | 2018-06-29 06:38:45.351404 | c81e728d9d4c2f636f067f89cc14862c 3 | 862.318314082137 | 810.705138747909 | (419,382.42,ROW#3) | Yokohama | 2017-03-19 19:20:20.993358 | eccbc87e4b5ce2fe28308fd9f2a7baf3 4 | 686.473733599518 | | (4,176.449,ROW#4) | Osaka | 2022-01-10 23:46:09.343218 | a87ff679a2f3e71d9181a67b7542122c 5 | 957.214601783647 | 324.905697873284 | (180,320.756,ROW#5) | | 2022-08-31 14:58:22.203866 | e4da3b7fbbce2345d7772b0674a318d5 6 | 284.569805620504 | 32.4126081692114 | (585,601.726,ROW#6) | | 2015-09-09 13:00:28.160389 | 1679091c5a880faf6fb5e6087eb1b2dc 7 | 595.694404372803 | 324.796066770701 | (663,489.07,ROW#7) | Yokohama | 2019-07-28 00:20:45.679467 | 8f14e45fceea167a5a36dedd4bea2543 8 | 770.799666070752 | 44.1467431579469 | (603,646.233,ROW#8) | Osaka | 2017-07-14 05:46:05.558446 | c9f0f895fb98ab9159f51fd0297e236d (8 rows)
ゴチャゴチャしているが、c
列は数値と文字列の複合型、d
列は見た目文字列だが内部的には32bit整数値のEnum型のデータである。
このテーブルをpg2arrow
を使ってダンプする。1000件程度なので、レコードバッチは1個だけ。
$ ./pg2arrow -h localhost -d postgres -c "SELECT * FROM t" -o /tmp/hoge.arrow
PythonのApache Arrowバインディングである PyArrow を使って中身を検算してみる事にする。
$ python3 Python 3.6.8 (default, Oct 7 2019, 17:58:22) [GCC 8.2.1 20180905 (Red Hat 8.2.1-3)] on linux Type "help", "copyright", "credits" or "license" for more information. >>> import pyarrow as pa >>> import pandas as pd >>> f = pa.ipc.open_file('/tmp/hoge.arrow') >>> f.schema id: int32 a: double b: decimal(11, 30) c: struct<x: int32, y: float, z: string> child 0, x: int32 child 1, y: float child 2, z: string d: dictionary<values=string, indices=int32, ordered=0> e: timestamp[us] f: string
ArrowのStruct型にマップされたPostgreSQLの複合型や、Dictionary付きのString型としてマップされたPostgreSQLの列挙型も含め、スキーマ定義はきちんと反映されている事が分かる。
続いて、中身を確認してみる。レコードバッチの数は一個。
>>> f.num_record_batches 1
Pandasデータフレームに変換してみる
>>> X = f.get_record_batch(0) >>> Y = X.to_pandas() >>> Y id ... f 0 1 ... c4ca4238a0b923820dcc509a6f75849b 1 2 ... c81e728d9d4c2f636f067f89cc14862c 2 3 ... eccbc87e4b5ce2fe28308fd9f2a7baf3 3 4 ... a87ff679a2f3e71d9181a67b7542122c 4 5 ... e4da3b7fbbce2345d7772b0674a318d5 .. ... ... ... 995 996 ... 0b8aff0438617c055eb55f0ba5d226fa 996 997 ... ec5aa0b7846082a2415f0902f0da88f2 997 998 ... 9ab0d88431732957a618d4a469a0d4c3 998 999 ... b706835de79a2b4e80506f582af3676a 999 1000 ... a9b7ba70783b617e9998dc4dd82eb3c5
一応、md5チェックサムの値(文字列)をぶち込んだf
列はSQLで見た値と同じものが入っている。
タイムスタンプ型であるe
列の内容をSQLの出力と比較してみる。
>>> Y['e'] 0 2016-10-20 02:42:38.101797 1 2018-06-29 06:38:45.351404 2 2017-03-19 19:20:20.993358 3 2022-01-10 23:46:09.343218 4 2022-08-31 14:58:22.203866 ... 995 2016-05-12 00:03:53.986811 996 2023-12-15 02:51:58.066008 997 2020-10-29 14:27:54.705099 998 2015-10-02 13:18:13.312924 999 2020-05-06 04:06:50.749883 Name: e, Length: 1000, dtype: datetime64[ns]
postgres=# select id,e from t order by id limit 8; id | e ----+---------------------------- 1 | 2016-10-20 02:42:38.101797 2 | 2018-06-29 06:38:45.351404 3 | 2017-03-19 19:20:20.993358 4 | 2022-01-10 23:46:09.343218 5 | 2022-08-31 14:58:22.203866 6 | 2015-09-09 13:00:28.160389 7 | 2019-07-28 00:20:45.679467 8 | 2017-07-14 05:46:05.558446 (8 rows)
pg2arrowによる追記
では、今回の新機能 --append
モードを試してみる。
同じテーブルを再度追記し、レコードバッチを2個、計2,000行のApache Arrowファイルを作成する事にするが、その前に少し錯乱要因として、列挙型に新しいラベルを追加しておく事にする。
postgres=# alter type label add value 'Kobe'; ALTER TYPE postgres=# update t set d = 'Kobe' where d is null; UPDATE 94
今度は、先ほど-o /tmp/hoge.arrow
と指定した部分を--append /tmp/hoge.arrow
と変えてみる。
$ ./pg2arrow -h localhost -d postgres -c "SELECT * FROM t" --append /tmp/hoge.arrow
PyArrowを介して中身を見てみると、スキーマ定義は前の通り(あたり前)で、レコードバッチが2個に増えている。
$ python3 Python 3.6.8 (default, Oct 7 2019, 17:58:22) [GCC 8.2.1 20180905 (Red Hat 8.2.1-3)] on linux Type "help", "copyright", "credits" or "license" for more information. >>> import pyarrow as pa >>> import pandas as pd >>> f = pa.ipc.open_file('/tmp/hoge.arrow') >>> f.schema id: int32 a: double b: decimal(11, 30) c: struct<x: int32, y: float, z: string> child 0, x: int32 child 1, y: float child 2, z: string d: dictionary<values=string, indices=int32, ordered=0> e: timestamp[us] f: string >>> f.num_record_batches 2
追加した方のレコードバッチを確認すると、きちんと列挙型であるd
列にKobe
というラベルが出現している。
>>> X = f.get_record_batch(1) >>> Y = X.to_pandas() >>> Y['d'] 0 Kobe 1 Yokohama 2 Yokohama 3 Osaka 4 Kobe ... 995 Nagoya 996 Kobe 997 Nagoya 998 Kyoto 999 Yokohama Name: d, Length: 1000, dtype: category Categories (6, object): [Tokyo, Osaka, Nagoya, Yokohama, Kyoto, Kobe]
もちろん、元から存在する方のレコードバッチで内容が化けたりは、ない。
(PandasではNULL値はNaN
と表記されるようだ)
>>> X = f.get_record_batch(0) >>> Y = X.to_pandas() >>> Y['d'] 0 NaN 1 Yokohama 2 Yokohama 3 Osaka 4 NaN ... 995 Nagoya 996 NaN 997 Nagoya 998 Kyoto 999 Yokohama Name: d, Length: 1000, dtype: category Categories (6, object): [Tokyo, Osaka, Nagoya, Yokohama, Kyoto, Kobe]
これは何の布石なのか?
元々PG-StromにはGstore_Fdwという機能があり、GPUメモリ上に列ストアを作って、そこにデータをINSERTする事ができるようになっている。
kaigai.hatenablog.com
ただ、これはNVIDIAがRAPIDSフレームワークを発表する前に設計・開発したもので、内部のデータ形式はPG-Stromの独自。Pythonとの連携はcuPyで行うか、あるいはPL/CUDAから参照するしか手がなかった。
もう一つのFDWであるArrow_Fdwは、元々SSD-to-GPU Direct SQLを使うレベルの大量データ処理において、列データ形式を採用する事でより効率的なI/Oを目指したものだが、
- RAPIDSフレームワーク(cuDF)でも内部データ構造にArrowを使っている
- データの置き場所を色々選択できるようにしたら、同じコードを流用できる
- バルクデータの追記のみ(あと無条件DELETE)に限れば、書き込みもさほど無理が無い
という事で、Arrow_Fdwを拡張してGPUデバイスメモリ、あるいは容量単価の安い(とされる)Persistent Memoryなどのデータストアとして利用可能とし、Pythonスクリプトなどから容易にデータ連携を可能にするという青写真を描いている。
そうすると、TBを越えてくるような大量データは、先ずSSD-to-GPU Direct SQL + Arrow_Fdwを用いて高速な集計・前処理を実行。SQLでゴニョっと前処理したデータを、今度はGPUデバイスメモリ上のArrow_FdwにINSERTすれば、Pythonスクリプト側ではCSVファイルを読んだりすることなく、DBから直接バイナリのデータをインポートできる。
データセットを少し変えてみたくなった場合でも、WHERE句を少しいじれば異なるデータセットに対して機械学習エンジンがきちんと作用するかどうかを確認する事ができ、TRY&ERRORの生産性向上にかなりのレベルで寄与できるのではないかと考えている。
いかがなものだろうか? メリー・クリスマス