Dive into Apache Arrow(その4)- pg2arrow で追記モード

先日、Apache Arrow 東京ミートアップ 2019というイベントに参加させていただいた。

発表時の様子(photo by 畔勝さん)

発表自体は、SSD-to-GPU Direct SQLからArrow_Fdw、4GPU+16SSDによる最近のベンチマークの紹介などで、目新しいものというよりは総集編であるが、懇親会の際にこんな質問をいただいた。

Apache Arrowファイルの追記ってどうやってるんですか?』

曰く、FluentdのApache Arrow出力用プラグインにおいて、集積したログの書き出しのタイミングとその際のI/O負荷の問題だそうな。
確かに Apache Arrow は列志向なので、一見、既に100万行分のデータを保持しているファイルに、新たに10万行のデータを追加するには、ファイル全体の再編成と書き直しが必要に見えるかもしれない。
しかし、この人の内部フォーマットをよく読んでみると、データの追記に関してはかなり低いコストで実行する事ができるように設計されており、上の例で言えば、追記処理に関しては『既に書き込んだ100万行は全く変更せず、10万行分の書き込み+α程度』のコストで実行できる。

今回は、PostgreSQLのテーブル(クエリ結果)を Apache Arrow ファイルとして保存する pg2arrow コマンドにこの追記モードを付けてみたという話を書いてみる事にする。

内部データ構造 Record Batch

今年の頭に、pg2arrowArrow_Fdwの機能を実装するために Apache Arrow のファイル形式を調査してみた。割と読みごたえのあるバイナリであるが、追記処理を実装するにあたって重要なポイントは一つ。『Arrowファイルは尻から読む
kaigai.hatenablog.com

Apache Arrowファイルの内部構造を順に追っていくと、ざっくり、先頭から以下の順に並んでいる。

  • ヘッダ("ARROW1\0\0"
  • スキーマ定義(列の名前やデータ型など)
  • ディクショナリ・バッチ(辞書圧縮用の辞書;複数)
  • レコード・バッチ(N件分のデータを列形式で記録;複数)
  • フッタ(スキーマ定義の複製と、ディクショナリ/レコードバッチの位置情報)

意外かもしれないが、例えば100万件分のデータ要素を持つApache Arrow形式ファイルであっても、内部のデータ構造は100万個のXXX型配列が並んでいる・・・とは必ずしも言い切れない。(もちろん、そういう編成にもできるが)
Apache Arrow形式ファイルの内側には、任意の件数のデータを列形式で束ねたレコード・バッチ(Record Batch)と呼ばれる領域がある。例えば1万件ごとにRecord Batchで切るとすると、Record Batchの内側にはA列の要素が1万個、B列の要素が1万個、C列の要素が1万個・・・と並んでおり、全体で100万件なら、このRecord Batchが100個順番に並んでいるという形になる。
もちろん、100万件のデータを持つRecord Batchを1個で、総計100万件のデータを含むApache Arrowファイルを構成する事もできる。この場合は、単純にA列の要素が100万個、B列の要素が100万個・・・と配置される事になる。

Arrowファイルは尻から読む

Apache Arrow形式ファイルの『どこからどこまで』がレコード・バッチなのかという情報は、ファイルの一番最後、フッタ領域に書かれている。例えば『ファイルの先頭から1200バイト、以降10MBはRecord Batch-0である』というノリである。*1

さて、このフッタ領域は、レコード・バッチの並びの直後に存在する。
例えば、レコード・バッチを100個持つApache Arrowファイルのフッタには、レコード・バッチ領域を指し示す(オフセット・サイズ)の組が100個書き込まれているわけだが、ここで100番目のレコード・バッチの直後に101番目のレコード・バッチを追加してみる事にする。
そうすると、フッタ領域は当然ながら上書きされてしまう。南無。
そしてその後ろに(オフセット・サイズ)の組を101個持つフッタを新たに書き込んでやると、ファイルの前半への書き込み操作を一切行う事なく、Apache Arrow形式ファイルへの追記操作を行えることになる。

これを、pg2arrowコマンドに実装してみた。

pg2arrowによるテーブルのダンプ(Arrow形式)

pg2arrowコマンドは、PG-Stromのユーティリティとして配布しているコマンドで、pg_dumpに似たノリでSQLの実行結果をPostgreSQLからダンプし、Apache Arrow形式ファイルとして保存するためのツールである。
使い方は以下の通り。

$ ./pg2arrow --help
Usage:
  pg2arrow [OPTION]... [DBNAME [USERNAME]]

General options:
  -d, --dbname=DBNAME     database name to connect to
  -c, --command=COMMAND   SQL command to run
  -f, --file=FILENAME     SQL command from file
      (-c and -f are exclusive, either of them must be specified)
  -o, --output=FILENAME   result file in Apache Arrow format
      --append=FILENAME   result file to be appended

      --output and --append are exclusive to use at the same time.
      If neither of them are specified, it creates a temporary file.)

Arrow format options:
  -s, --segment-size=SIZE size of record batch for each
      (default: 256MB)

Connection options:
  -h, --host=HOSTNAME     database server host
  -p, --port=PORT         database server port
  -U, --username=USERNAME database user name
  -w, --no-password       never prompt for password
  -W, --password          force password prompt

Debug options:
      --dump=FILENAME     dump information of arrow file
      --progress          shows progress of the job.

Report bugs to <pgstrom@heterodb.com>.

ある程度複雑なデータ構造の方が面白いので、列挙型(Enum)、複合型を含むテーブルを作成し、テストデータを投入してみた。

postgres=# create type label as enum ('Tokyo','Osaka','Nagoya','Yokohama','Kyoto');
CREATE TYPE
postgres=# create type comp as (x int, y real, z text);
CREATE TYPE
postgres=# create table t (id int, a float, b numeric, c comp, d label, e timestamp, f text);
CREATE TABLE

postgres=# insert into t (select x, 1000*pgstrom.random_float(2),
                                    1000*pgstrom.random_float(2),
                                    null, /* set later */
                                    (case (5*random())::int when 0 then 'Tokyo'
                                                            when 1 then 'Osaka'
                                                            when 2 then 'Nagoya'
                                                            when 3 then 'Yokohama'
                                                            when 4 then 'Kyoto'
                                                            else null end)::label,
                                    pgstrom.random_timestamp(2),
                                    md5(x::text)
                            from generate_series(1,1000) x);
INSERT 0 1000
postgres=# update t set c.x = pgstrom.random_int(2,-1000,1000),
                        c.y = 1000*pgstrom.random_float(2),
                        c.z = 'ROW#' || id::text;
UPDATE 1000

pgstrom.random_xxxx()というのはランダムなデータを時々NULLを混ぜつつ生成してくれる関数。
でき上ったテストデータはこういった感じになっている。

postgres=# select * from t order by id limit 8;
 id |        a         |        b         |          c           |    d     |             e              |                f
----+------------------+------------------+----------------------+----------+----------------------------+----------------------------------
  1 |  793.46183025905 |  718.62576097186 | (649,104.976,ROW#1)  |          | 2016-10-20 02:42:38.101797 | c4ca4238a0b923820dcc509a6f75849b
  2 | 626.670837228499 | 913.748125505516 | (-582,598.061,ROW#2) | Yokohama | 2018-06-29 06:38:45.351404 | c81e728d9d4c2f636f067f89cc14862c
  3 | 862.318314082137 | 810.705138747909 | (419,382.42,ROW#3)   | Yokohama | 2017-03-19 19:20:20.993358 | eccbc87e4b5ce2fe28308fd9f2a7baf3
  4 | 686.473733599518 |                  | (4,176.449,ROW#4)    | Osaka    | 2022-01-10 23:46:09.343218 | a87ff679a2f3e71d9181a67b7542122c
  5 | 957.214601783647 | 324.905697873284 | (180,320.756,ROW#5)  |          | 2022-08-31 14:58:22.203866 | e4da3b7fbbce2345d7772b0674a318d5
  6 | 284.569805620504 | 32.4126081692114 | (585,601.726,ROW#6)  |          | 2015-09-09 13:00:28.160389 | 1679091c5a880faf6fb5e6087eb1b2dc
  7 | 595.694404372803 | 324.796066770701 | (663,489.07,ROW#7)   | Yokohama | 2019-07-28 00:20:45.679467 | 8f14e45fceea167a5a36dedd4bea2543
  8 | 770.799666070752 | 44.1467431579469 | (603,646.233,ROW#8)  | Osaka    | 2017-07-14 05:46:05.558446 | c9f0f895fb98ab9159f51fd0297e236d
(8 rows)

ゴチャゴチャしているが、c列は数値と文字列の複合型、d列は見た目文字列だが内部的には32bit整数値のEnum型のデータである。

このテーブルをpg2arrowを使ってダンプする。1000件程度なので、レコードバッチは1個だけ。

$ ./pg2arrow -h localhost -d postgres -c "SELECT * FROM t" -o /tmp/hoge.arrow

PythonApache Arrowバインディングである PyArrow を使って中身を検算してみる事にする。

$ python3
Python 3.6.8 (default, Oct  7 2019, 17:58:22)
[GCC 8.2.1 20180905 (Red Hat 8.2.1-3)] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> import pyarrow as pa
>>> import pandas as pd
>>> f = pa.ipc.open_file('/tmp/hoge.arrow')
>>> f.schema
id: int32
a: double
b: decimal(11, 30)
c: struct<x: int32, y: float, z: string>
  child 0, x: int32
  child 1, y: float
  child 2, z: string
d: dictionary<values=string, indices=int32, ordered=0>
e: timestamp[us]
f: string

ArrowのStruct型にマップされたPostgreSQLの複合型や、Dictionary付きのString型としてマップされたPostgreSQLの列挙型も含め、スキーマ定義はきちんと反映されている事が分かる。

続いて、中身を確認してみる。レコードバッチの数は一個。

>>> f.num_record_batches
1

Pandasデータフレームに変換してみる

>>> X = f.get_record_batch(0)
>>> Y = X.to_pandas()
>>> Y
       id  ...                                 f
0       1  ...  c4ca4238a0b923820dcc509a6f75849b
1       2  ...  c81e728d9d4c2f636f067f89cc14862c
2       3  ...  eccbc87e4b5ce2fe28308fd9f2a7baf3
3       4  ...  a87ff679a2f3e71d9181a67b7542122c
4       5  ...  e4da3b7fbbce2345d7772b0674a318d5
..    ...  ...                               ...
995   996  ...  0b8aff0438617c055eb55f0ba5d226fa
996   997  ...  ec5aa0b7846082a2415f0902f0da88f2
997   998  ...  9ab0d88431732957a618d4a469a0d4c3
998   999  ...  b706835de79a2b4e80506f582af3676a
999  1000  ...  a9b7ba70783b617e9998dc4dd82eb3c5

一応、md5チェックサムの値(文字列)をぶち込んだf列はSQLで見た値と同じものが入っている。

タイムスタンプ型であるe列の内容をSQLの出力と比較してみる。

>>> Y['e']
0     2016-10-20 02:42:38.101797
1     2018-06-29 06:38:45.351404
2     2017-03-19 19:20:20.993358
3     2022-01-10 23:46:09.343218
4     2022-08-31 14:58:22.203866
                 ...
995   2016-05-12 00:03:53.986811
996   2023-12-15 02:51:58.066008
997   2020-10-29 14:27:54.705099
998   2015-10-02 13:18:13.312924
999   2020-05-06 04:06:50.749883
Name: e, Length: 1000, dtype: datetime64[ns]
postgres=# select id,e from t order by id limit 8;
 id |             e
----+----------------------------
  1 | 2016-10-20 02:42:38.101797
  2 | 2018-06-29 06:38:45.351404
  3 | 2017-03-19 19:20:20.993358
  4 | 2022-01-10 23:46:09.343218
  5 | 2022-08-31 14:58:22.203866
  6 | 2015-09-09 13:00:28.160389
  7 | 2019-07-28 00:20:45.679467
  8 | 2017-07-14 05:46:05.558446
(8 rows)

pg2arrowによる追記

では、今回の新機能 --append モードを試してみる。

同じテーブルを再度追記し、レコードバッチを2個、計2,000行のApache Arrowファイルを作成する事にするが、その前に少し錯乱要因として、列挙型に新しいラベルを追加しておく事にする。

postgres=# alter type label add value 'Kobe';
ALTER TYPE
postgres=# update t set d = 'Kobe' where d is null;
UPDATE 94

今度は、先ほど-o /tmp/hoge.arrowと指定した部分を--append /tmp/hoge.arrowと変えてみる。

$ ./pg2arrow -h localhost -d postgres -c "SELECT * FROM t" --append /tmp/hoge.arrow

PyArrowを介して中身を見てみると、スキーマ定義は前の通り(あたり前)で、レコードバッチが2個に増えている。

$ python3
Python 3.6.8 (default, Oct  7 2019, 17:58:22)
[GCC 8.2.1 20180905 (Red Hat 8.2.1-3)] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> import pyarrow as pa
>>> import pandas as pd
>>> f = pa.ipc.open_file('/tmp/hoge.arrow')
>>> f.schema
id: int32
a: double
b: decimal(11, 30)
c: struct<x: int32, y: float, z: string>
  child 0, x: int32
  child 1, y: float
  child 2, z: string
d: dictionary<values=string, indices=int32, ordered=0>
e: timestamp[us]
f: string
>>> f.num_record_batches
2

追加した方のレコードバッチを確認すると、きちんと列挙型であるd列にKobeというラベルが出現している。

>>> X = f.get_record_batch(1)
>>> Y = X.to_pandas()
>>> Y['d']
0          Kobe
1      Yokohama
2      Yokohama
3         Osaka
4          Kobe
         ...
995      Nagoya
996        Kobe
997      Nagoya
998       Kyoto
999    Yokohama
Name: d, Length: 1000, dtype: category
Categories (6, object): [Tokyo, Osaka, Nagoya, Yokohama, Kyoto, Kobe]

もちろん、元から存在する方のレコードバッチで内容が化けたりは、ない。
(PandasではNULL値はNaNと表記されるようだ)

>>> X = f.get_record_batch(0)
>>> Y = X.to_pandas()
>>> Y['d']
0           NaN
1      Yokohama
2      Yokohama
3         Osaka
4           NaN
         ...
995      Nagoya
996         NaN
997      Nagoya
998       Kyoto
999    Yokohama
Name: d, Length: 1000, dtype: category
Categories (6, object): [Tokyo, Osaka, Nagoya, Yokohama, Kyoto, Kobe]

これは何の布石なのか?

元々PG-StromにはGstore_Fdwという機能があり、GPUメモリ上に列ストアを作って、そこにデータをINSERTする事ができるようになっている。
kaigai.hatenablog.com

ただ、これはNVIDIAがRAPIDSフレームワークを発表する前に設計・開発したもので、内部のデータ形式はPG-Stromの独自。Pythonとの連携はcuPyで行うか、あるいはPL/CUDAから参照するしか手がなかった。

もう一つのFDWであるArrow_Fdwは、元々SSD-to-GPU Direct SQLを使うレベルの大量データ処理において、列データ形式を採用する事でより効率的なI/Oを目指したものだが、

  • RAPIDSフレームワーク(cuDF)でも内部データ構造にArrowを使っている
  • データの置き場所を色々選択できるようにしたら、同じコードを流用できる
  • バルクデータの追記のみ(あと無条件DELETE)に限れば、書き込みもさほど無理が無い

という事で、Arrow_Fdwを拡張してGPUバイスメモリ、あるいは容量単価の安い(とされる)Persistent Memoryなどのデータストアとして利用可能とし、Pythonスクリプトなどから容易にデータ連携を可能にするという青写真を描いている。

そうすると、TBを越えてくるような大量データは、先ずSSD-to-GPU Direct SQL + Arrow_Fdwを用いて高速な集計・前処理を実行。SQLでゴニョっと前処理したデータを、今度はGPUバイスメモリ上のArrow_FdwにINSERTすれば、Pythonスクリプト側ではCSVファイルを読んだりすることなく、DBから直接バイナリのデータをインポートできる。
データセットを少し変えてみたくなった場合でも、WHERE句を少しいじれば異なるデータセットに対して機械学習エンジンがきちんと作用するかどうかを確認する事ができ、TRY&ERRORの生産性向上にかなりのレベルで寄与できるのではないかと考えている。

いかがなものだろうか? メリー・クリスマス

*1:この他には、ファイルの先頭にも書かれているスキーマ定義のコピーもフッタ領域に書かれている。おそらく、これはファイルの末尾と先頭を読むためにディスクをシークせずに済ますための工夫であろう