pg2arrowを並列化する

今回は皆さんが大好きな便利ツール「pg2arrow」のお話です。

PostgreSQLでポータブルな列指向データ形式 Apache Arrow を読み出すには、Arrow_Fdwを利用する事ができます。
PG-StromではGPU-Direct SQLにも対応していますし、列指向データという事もあって、被参照列しかI/Oが発生しない、同じ列のデータが近傍に固まっているという大量データ処理に適した特性を持ってもいます。

また、Apache Arrow形式のファイルを作成するにはPyArrowやPandasなど様々なツールがありますが、我々DB屋としてはPostgreSQLに格納されたトランザクショナルなデータを、分析用にApache Arrow形式として吐き出せるととても嬉しい。そんな時に使えるツールがpg2arrowなのです。

pg2arrowは、PostgreSQLにクエリを投げ、その問合せ結果をApache Arrow形式のファイルとして保存するためのツールで、PG-Stromと同梱して配布されています(ソースコードのかなりの部分を Arrow_Fdw と共用しているためです)。問い合わせに使用するSQLコマンドは、テーブルを単純にダンプするだけではなく、例えばWHERE句で抽出条件を指定したり、複数のテーブルをJOINした結果を含めるといった事も可能です。
GitHubのログを確認したところ、最初のコミットが2019年4月ですので、およそ4年ほど前に設計・開発したツールという事ですね。

一方、pg2arrowには並列動作をサポートしていないという弱点がありました。
そのため、テーブルをダンプするPostgreSQL側のCPUも、データを受け取って書き込むpg2arrow側のCPUも、またその間のネットワークもリソースに余裕があるにも関わらず結構な遊び状態となっており、『例えば1TBくらいのテーブルをArrowに変換してベンチマークを取るぜ!』といった場合でも、夜2:00頃*1にポチっとpg2arrowを走らせて、翌朝結果を確認するという事が常でした。

普通に考えて、並列化すれば大きく高速化するはずです。

データの重複を防ぐために

DBから読み出したデータを別の形式にして保存するだけですので、原理的には、複数のセッションを作成して、それぞれが並列に処理を行えば済む話です。
例えば、あるテーブルをフルダンプしてArrowに変換するとして、各DBクライアントが互いに重なり合わず、しかも漏れの無いような条件で問い合わせを実行すれば良いわけです。典型的には何かのフィールドにハッシュ関数を与え、クライアント数で割った時の剰余がクライアント番号に一致するものだけを取り出せば事足ります。
(これは PostgreSQL のパラレルクエリでも使われているアイデアです)

しかし、考慮すべき点はもう一つあります。
例えば、クライアントAがDBに接続した後、別の誰かが対象テーブルを100行更新し、その後にクライアントB、クライアントCがDBに接続したとします。
この時、更新された100行のうち30行はオレンジ色の、40行は水色の、30行はウグイス色の行だとします。そうすると、結果として生成されたArrowファイルは「部分的に更新された」不整合な状態となってしまいます。

ここでは古典的なテクニックであるスナップショット同期関数を使います。
最初のクライアントAがDBに接続し、トランザクションを開始した後、pg_export_snapshot()を呼び出します。この関数はトランザクションのスナップショットに紐づいたユニークな識別子を返しますが、これを他のセッションでインポートすると、異なるセッションであっても全く同じビューを再現する事ができます。
これはお馴染みpg_dumpの並列ダンプでも用いられており、実際の挙動は以下のようなイメージです。

クライアントA(最初に接続する)

ssbm=# BEGIN READ ONLY;
BEGIN
ssbm=*# SET TRANSACTION ISOLATION LEVEL REPEATABLE READ;
SET
ssbm=*# SELECT pg_catalog.pg_export_snapshot();
 pg_export_snapshot
---------------------
 0000000B-000000B0-1
(1 row)

クライアントB、C、...(ワーカー)

ssbm=# BEGIN READ ONLY;
BEGIN
ssbm=*# SET TRANSACTION ISOLATION LEVEL REPEATABLE READ;
SET
ssbm=*# SET TRANSACTION SNAPSHOT '0000000B-000000B0-1';
SET

pg2arrowを並列に起動してみる。

pg2arrowの並列モードをサポートするために追加されたのは、-n|--num-workers=N_WORKERSオプションと、-k|--parallel-keys=PARALLEL_KEYSオプションの2つです。
これらは互いに排他的で、同じテーブルをスキャンする際に読み出すデータが重ならないよう検索条件を調整するための方法が若干異なってきます。
しかし、内部的な並列処理(ワーカースレッドの挙動)は変わりませんので、対象となるテーブルの設計やデータの特性によって使い分けてください。

$ ./pg2arrow --help
Usage:
  pg2arrow [OPTION] [database] [username]

General options:
  -d, --dbname=DBNAME   Database name to connect to
  -c, --command=COMMAND SQL command to run
  -t, --table=TABLENAME Equivalent to '-c SELECT * FROM TABLENAME'
      (-c and -t are exclusive, either of them must be given)
  -n, --num-workers=N_WORKERS    Enables parallel dump mode.
                        It requires the SQL command contains $(WORKER_ID)
                        and $(N_WORKERS), to be replaced by the numeric
                        worker-id and number of workers.
  -k, --parallel-keys=PARALLEL_KEYS Enables yet another parallel dump.
                        It requires the SQL command contains $(PARALLEL_KEY)
                        to be replaced by the comma separated token in the
                        PARALLEL_KEYS.
      (-n and -k are exclusive, either of them can be give if parallel dump.
       It is user's responsibility to avoid data duplication.)
      --inner-join=SUB_COMMAND
      --outer-join=SUB_COMMAND
  -o, --output=FILENAME result file in Apache Arrow format
      --append=FILENAME result Apache Arrow file to be appended
      (--output and --append are exclusive. If neither of them
       are given, it creates a temporary file.)
  -S, --stat[=COLUMNS] embeds min/max statistics for each record batch
                       COLUMNS is a comma-separated list of the target
                       columns if partially enabled.

Arrow format options:
  -s, --segment-size=SIZE size of record batch for each

Connection options:
  -h, --host=HOSTNAME  database server host
  -p, --port=PORT      database server port
  -u, --user=USERNAME  database user name
  -w, --no-password    never prompt for password
  -W, --password       force password prompt

Other options:
      --dump=FILENAME  dump information of arrow file
      --progress       shows progress of the job
      --set=NAME:VALUE config option to set before SQL execution
      --help           shows this message

Report bugs to <pgstrom@heterodb.com>.
-n|--num-workers=N_WORKERSオプション

-nオプションはシンプルにワーカー数を指定します。
この時、Arrowファイルの元となる問合せSQL文には$(WORKER_ID)$(N_WORKERS)というマクロを埋め込む事ができ、この部分は0から始まるユニークなワーカーIDと、オプションで指定した並列度にそれぞれ置き換えられます。
つまり、-cオプションで指定するSQLをこのように変えれば良いわけです。

オリジナル:

SELECT * FROM lineorder

修正後:

SELECT * FROM lineorder WHERE lo_orderkey % $(N_WORKERS) = $(WORKER_ID)
-k|--parallel-keys=PARALLEL_KEYSオプション

-kオプションはPARALLEL_KEYSに指定したカンマ区切りのトークン毎にワーカースレッドを起動し、そのトークンをそれぞれSQLコマンド中の$(PARALLEL_KEY)に置き換えます。

例を使って説明しましょう。以下のようにパーティション化されたテーブルが存在し、パーティションの子テーブル毎にワーカースレッドを起動して並列にテーブルを読み出したい場合、-kオプションを次のように使います。

ssbm=# \d+
                                                List of relations
 Schema |       Name       |       Type        | Owner  | Persistence | Access method |    Size    | Description
--------+------------------+-------------------+--------+-------------+---------------+------------+-------------
 public | lineorder        | partitioned table | kaigai | permanent   |               | 0 bytes    |
 public | lineorder__p1992 | table             | kaigai | permanent   | heap          | 13 GB      |
 public | lineorder__p1993 | table             | kaigai | permanent   | heap          | 13 GB      |
 public | lineorder__p1994 | table             | kaigai | permanent   | heap          | 13 GB      |
 public | lineorder__p1995 | table             | kaigai | permanent   | heap          | 13 GB      |
 public | lineorder__p1996 | table             | kaigai | permanent   | heap          | 13 GB      |
 public | lineorder__p1997 | table             | kaigai | permanent   | heap          | 13 GB      |
 public | lineorder__p1998 | table             | kaigai | permanent   | heap          | 7894 MB    |
 public | lineorder__p1999 | table             | kaigai | permanent   | heap          | 8192 bytes |
 public | lineorder_unsort | table             | kaigai | permanent   | heap          | 87 GB      |
(10 rows)

以下の例では、パーティション子テーブルのサフィックスである年号部分を$(PARALLEL_KEY)によって置き換えます。
そうすると、-kオプションで指定したカンマ区切りのキー値ごとにそれぞれワーカーが生成され、結果として、それが各々パーティション子テーブルの条件なしスキャンを実行しています。

$ pg2arrow -d ssbm -c 'SELECT * FROM lineorder__p$(PARALLEL_KEY)' -o /opt/hoge/f_lineorder.arrow -k=1992,1993,1994,1995,1996,1997,1998 --progress
worker:1 SQL=[SELECT * FROM lineorder__p1993]
worker:3 SQL=[SELECT * FROM lineorder__p1995]
worker:2 SQL=[SELECT * FROM lineorder__p1994]
worker:4 SQL=[SELECT * FROM lineorder__p1996]
worker:5 SQL=[SELECT * FROM lineorder__p1997]
worker:6 SQL=[SELECT * FROM lineorder__p1998]
2024-03-31 20:33:58 RecordBatch[0]: offset=1648 length=268436376 (meta=920, body=268435456) nitems=1303083 by worker:0
2024-03-31 20:33:59 RecordBatch[1]: offset=268438024 length=268436376 (meta=920, body=268435456) nitems=1303083 by worker:3
          :
          :
worker:0 merged pending results by worker:4
2024-03-31 20:38:15 RecordBatch[460]: offset=123480734608 length=127664216 (meta=920, body=127663296) nitems=619722 by worker:0
Total elapsed time: 00:04:26

ワーカーの動作について

ここで、大雑把な処理の流れにも触れておく事にします。
Pg2Arrowが並列モードで起動すると、mainスレッドであるworker-0がスキーマ定義を作成するなどの初期設定を行い、その後、他のワーカーを順次起動していきます。
各ワーカーはそれぞれPostgreSQLに接続し、それぞれ約256MBのバッファ((-sオプションで変更可))が埋まるたびに出力先のArrowファイルへと書き込みを行います。
Arrowファイルは内部がRecord Batchと呼ばれるブロックに分割されており、ファイルポインタを進める部分さえアトミックに行ってしまえば、以降の書込み処理はマルチスレッドが複数の別個の領域に独立して書き込む事ができます。そのため、シーケンシャルに実行しなければならないクリティカルセクションは最小限に抑えられています。

また、クエリの最後まで読み出したにも関わらず256MBのバッファを埋められなかった場合は、隣接スレッドのバッファにマージされ、最終的には worker-0 のバッファにマージされますので、並列ワーカーの数を増やしたとしても Record Batch の大きさが極端に小さい(= PG-Stromでの実行効率が低下する)データが作られるわけではありません。


生成した Arrow ファイルをFDW経由で参照する

それでは、生成した Arrow ファイルを参照してみる事にします。
個々のテーブル定義(カラム名、データ型)を指定するのは面倒ですので、IMPORT FOREIGN SCHEMA文を使用するのがお勧めです。

ssbm=# import foreign schema f_lineorder from server arrow_fdw into public options (file '/opt/hoge/f_lineorder.arrow');
IMPORT FOREIGN SCHEMA
ssbm=# \d f_lineorder
                        Foreign table "public.f_lineorder"
       Column       |     Type      | Collation | Nullable | Default | FDW options
--------------------+---------------+-----------+----------+---------+-------------
 lo_orderkey        | numeric       |           |          |         |
 lo_linenumber      | integer       |           |          |         |
 lo_custkey         | numeric       |           |          |         |
 lo_partkey         | integer       |           |          |         |
 lo_suppkey         | numeric       |           |          |         |
 lo_orderdate       | integer       |           |          |         |
 lo_orderpriority   | character(15) |           |          |         |
 lo_shippriority    | character(1)  |           |          |         |
 lo_quantity        | numeric       |           |          |         |
 lo_extendedprice   | numeric       |           |          |         |
 lo_ordertotalprice | numeric       |           |          |         |
 lo_discount        | numeric       |           |          |         |
 lo_revenue         | numeric       |           |          |         |
 lo_supplycost      | numeric       |           |          |         |
 lo_tax             | numeric       |           |          |         |
 lo_commit_date     | character(8)  |           |          |         |
 lo_shipmode        | character(10) |           |          |         |
Server: arrow_fdw
FDW options: (file '/opt/hoge/f_lineorder.arrow')

試しに、SSBMのQ1_2を走らせてみます。

ssbm=# select sum(lo_extendedprice*lo_discount) as revenue
from f_lineorder, date1
where lo_orderdate = d_datekey
  and d_yearmonthnum = 199401
  and lo_discount between 4 and 6
  and lo_quantity between 26 and 35;
    revenue
---------------
 9624332170119
(1 row)

もちろん、Arrowファイルの元となったlineorderテーブルを使っても同じ値が返ってきます。

ssbm=# select sum(lo_extendedprice*lo_discount) as revenue
from lineorder, date1
where lo_orderdate = d_datekey
  and d_yearmonthnum = 199401
  and lo_discount between 4 and 6
  and lo_quantity between 26 and 35;
    revenue
---------------
 9624332170119
(1 row)

並列Pg2Arrowのパフォーマンス

最後に、もちろん気になる並列Pg2Arrowのパフォーマンスを見てみる事にします。

測定環境のスペックは以下の通り。

以下のオプションで pg2arrow を起動し、87GBのlineorderテーブルを全てArrow形式に変換するまでの時間を計測しました。

$ pg2arrow -d ssbm -c 'SELECT * FROM lineorder_unsort WHERE lo_orderkey % $(N_WORKERS) = $(WORKER_ID)' -o /opt/hoge/f_lineorder.arrow -n N_WORKERS --progress

まず緑の縦棒が並列実行なしのpg2arrowで87GBのlineorderテーブルをArrowに変換した時のもので、1,520秒、おおむね25分ちょい要しています。
これを-nオプションで並列数を増やした場合、およそ400秒(6分40秒)を少し下回った辺りで頭打ちとなっているようです。

これは検索条件(lo_orderkey % $(N_WORKERS) = $(WORKER_ID))で重複行を排除しているため、クライアント数が増加するにしたがって、同時に lineorder テーブルを重複してスキャンしなければならなくなり、トータルのバッファからの読出し負荷が増えてしまったためと言えるでしょう。

一方、パーティション子テーブルの名前を一部を-kオプションで置き換え、それぞれのクライアントが完全に独立した領域をスキャンする事になったパターンでは、並列度が7である(しかもlineorder__p1998の容量は他の子テーブルの半分なので、実質6.5並列)にも関わらず、266秒と並列実行なしのパターンに比べて5.7倍の実行時間を記録しています。

この事から、並列Pg2Arrowのパフォーマンス向上を享受するには、以下の点に注意を払う必要があるでしょう。

  • 単純に並列度を増やすだけでも効果はあるが、同じ領域を重複してスキャンする処理が増えると、効果は限定的になりがち。
  • PostgreSQLがスキャンする量を減らすような検索条件の与え方が望ましい。パーティションなどで分割されていたら理想的。

*1:もっと早く寝ろw