Apache Arrowの統計情報を使ったログ検索の爆速化

PostgreSQLにはBRINインデックス(Block Range Index)という機能があり、ログデータに付属するタイムスタンプ値など、近しい値を持ったデータが物理的に近接するという特徴を持っているとき、検索範囲を効率的に絞り込むために使用する事ができる。

この機能はPG-Stromでも対応しており、その詳細は以前のエントリでも解説している。

kaigai.hatenablog.com

かいつまんで説明すると、時系列のログデータのように大半が追記(Insert-Only)であり、かつタイムスタンプ値のように近しい値同士が近接している場合、1MBのブロック((pages_per_rangeがデフォルトの128の場合、8kB * 128 = 1MB))ごとにその最小値/最大値を記録しておくことで『明らかに検索条件にマッチしない範囲』を読み飛ばす事ができる。

例えば以下の例であれば、WHERE ymd BETWEEN '2016-01-01' AND '2016-12-31'という検索条件は、ブロック内に含まれる個々の行をチェックしなくとも、最小値・最大値を参照すれば1番目と5番目のブロックに該当する行が無いのは自明である。そのため、検索条件にマッチする可能性のある2,3,4番目のブロックのみを読み出せばよい、というのがBRINインデックスの考え方である。

なお、B-treeインデックスとは異なり、BRINインデックスは個々の行を指し示すデータを持っていないため、1MBのブロック単位で読む/読まないを決めた後は、全件スキャンと同じ処理となる。

Apache Arrowに統計情報を埋め込む

話を Apache Arrow 形式に戻す。

Apache Arrow は列フォーマットの構造化データ形式であるが、例えば『このArrowファイルは1億件のデータを保持している』と言っても、単純に要素数1億の配列が並んでいるわけではない。
内部的にはRecord-Batchと呼ばれるブロックが複数並んだ構造となっており、このブロックの内側に、例えば1億件のうち100万件といったより小規模なデータの配列を保持している。(こうしたデータ構造を取る事により、より追記を行いやすくするという狙いがあるものと思われる。)

ファイルの末尾にはフッタ(Footer)があり、ここを読めば、ファイルのどこにRecord-Batchが配置されているかが分かる。
それと同時に、フッタにはデータ構造の定義を記述する Schema Definition の部分がある。これは基本的にはファイル先頭の Schema Definition のコピーであるものの、custom_metadataフィールドに独自のKey-Value値を埋め込んで書き込む程度の事は認められているようである。

このcustom_metadataフィールドは、テーブルだけでなく、カラムにも付与する事ができる。
そうすると、どういった使い方ができるか。フィールド毎のKey-Value値として、Record-Batchごとの最小値/最大値の配列(以下、統計情報)をApache Arrowファイルに埋め込む事ができる。

しかもこれはフッタ領域なので、Apache Arrowファイルを追記して Record-Batch を更新するたびに、統計情報をアップデートして書き直す事までできる。Yeah!!

pg2arrow の --stat オプション

早速、統計情報付きのApache Arrowファイルを生成してみる事にする。
これにはpg2arrowコマンドの--statオプションを使用する事ができ、統計情報を埋め込む対象列を指定する。現在のところ、Int、FloatingPoint、Decimal、Date、Time、Timestampの各データ型に対応している。

$ pg2arrow -d postgres -o /dev/shm/flineorder_sort.arrow -t lineorder_sort --stat=lo_orderdate --progress
RecordBatch[0]: offset=1640 length=268437080 (meta=920, body=268436160) nitems=1303085
RecordBatch[1]: offset=268438720 length=268437080 (meta=920, body=268436160) nitems=1303085
RecordBatch[2]: offset=536875800 length=268437080 (meta=920, body=268436160) nitems=1303085
RecordBatch[3]: offset=805312880 length=268437080 (meta=920, body=268436160) nitems=1303085
RecordBatch[4]: offset=1073749960 length=268437080 (meta=920, body=268436160) nitems=1303085
RecordBatch[5]: offset=1342187040 length=268437080 (meta=920, body=268436160) nitems=1303085
RecordBatch[6]: offset=1610624120 length=268437080 (meta=920, body=268436160) nitems=1303085
RecordBatch[7]: offset=1879061200 length=268437080 (meta=920, body=268436160) nitems=1303085
RecordBatch[8]: offset=2147498280 length=268437080 (meta=920, body=268436160) nitems=1303085
RecordBatch[9]: offset=2415935360 length=55668888 (meta=920, body=55667968) nitems=270231
$ ls -lh /dev/shm/flineorder_sort.arrow
-rw-r--r--. 1 kaigai users 2.4G  7月 19 10:45 /dev/shm/flineorder_sort.arrow

上記の例は、タイムスタンプに相当するlo_orderdate列でソート済みの*1テーブルをダンプし、Apache Arrowファイルとして保存したもの。

生のメタデータを可読な形式でダンプしてみると、確かにlo_orderdate列のフィールド定義にmin_valuesおよびmax_valuesメタデータが埋め込まれ、それぞれ19920101とか19920919とか「それっぽい」値が見えている。

なお、custom-metadataを用いたデータの埋め込みはApache Arrow形式で認められているものなので、例えば、PG-Stromのコードベースと全く関係のない PyArrow を使って上記のファイルを読み出す場合でも、きちんとメタデータとして認識される。
(特にスキャンの最適化などにも使われないが)

$ python3
Python 3.6.8 (default, Aug 24 2020, 17:57:11)
[GCC 8.3.1 20191121 (Red Hat 8.3.1-5)] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> import pyarrow as pa
>>> X = pa.RecordBatchFileReader('/dev/shm/flineorder_sort.arrow')
>>> X.schema
lo_orderkey: decimal(30, 8)
lo_linenumber: int32
lo_custkey: decimal(30, 8)
lo_partkey: int32
lo_suppkey: decimal(30, 8)
lo_orderdate: int32
  -- field metadata --
  min_values: '19920101,19920919,19930608,19940223,19941111,19950730,1996' + 31
  max_values: '19920919,19930608,19940223,19941111,19950730,19960417,1997' + 31
lo_orderpriority: fixed_size_binary[15]
lo_shippriority: fixed_size_binary[1]
lo_quantity: decimal(30, 8)
lo_extendedprice: decimal(30, 8)
lo_ordertotalprice: decimal(30, 8)
lo_discount: decimal(30, 8)
lo_revenue: decimal(30, 8)
lo_supplycost: decimal(30, 8)
lo_tax: decimal(30, 8)
lo_commit_date: fixed_size_binary[8]
lo_shipmode: fixed_size_binary[10]
-- schema metadata --
sql_command: 'SELECT * FROM lineorder_sort'

統計情報を使って Apache Arrow のスキャンを最適化する

さて、埋め込んだ統計情報も、集計処理の段でこれを参照しなければカピバラに小判、カピバラに真珠である。

という事で、Apache ArrowファイルをPostgreSQLからスキャンするためのArrow_Fdwに統計情報を参照するための機能を付加してみた。
検索条件が統計情報を含む列を参照し、その大小関係に基づく絞り込みを行う場合、統計情報に基づいて『明らかに無駄なRecord-Batch』の読み出しをスキップする。

postgres=# IMPORT FOREIGN SCHEMA flineorder_sort
           FROM SERVER arrow_fdw INTO public
           OPTIONS (file '/dev/shm/flineorder_sort.arrow');
IMPORT FOREIGN SCHEMA

postgres=# EXPLAIN ANALYZE
           SELECT count(*) FROM flineorder_sort
            WHERE lo_orderpriority = '2-HIGH'
              AND lo_orderdate BETWEEN 19940101 AND 19940630;

                                                                 QUERY PLAN
------------------------------------------------------------------------------------------
 Aggregate  (cost=33143.09..33143.10 rows=1 width=8) (actual time=115.591..115.593 rows=1loops=1)
   ->  Custom Scan (GpuPreAgg) on flineorder_sort  (cost=33139.52..33142.07 rows=204 width=8) (actual time=115.580..115.585 rows=1 loops=1)
         Reduction: NoGroup
         Outer Scan: flineorder_sort  (cost=4000.00..33139.42 rows=300 width=0) (actual time=10.682..21.555 rows=2606170 loops=1)
         Outer Scan Filter: ((lo_orderdate >= 19940101) AND (lo_orderdate <= 19940630) AND (lo_orderpriority = '2-HIGH'::bpchar))
         Rows Removed by Outer Scan Filter: 2425885
         referenced: lo_orderdate, lo_orderpriority
         Stats-Hint: (lo_orderdate >= 19940101), (lo_orderdate <= 19940630)  [loaded: 2, skipped: 8]
         files0: /dev/shm/flineorder_sort.arrow (read: 217.52MB, size: 2357.11MB)
 Planning Time: 0.210 ms
 Execution Time: 153.508 ms
(11 rows)

postgres=# SELECT count(*) FROM flineorder_sort
            WHERE lo_orderpriority = '2-HIGH'
              AND lo_orderdate BETWEEN 19940101 AND 19940630;
 count
--------
 180285
(1 row)

上の実行計画は、先ほどのApache ArrowファイルをPostgreSQLから外部テーブルとして参照できるようにしたflineorder_sortに、lo_orderdate列とそれ以外の検索条件を付加して検索したものである。

実行計画のStats-Hintを見ると、lo_orderdate BETWEEN 19940101 AND 19940630を展開した(lo_orderdate >= 19940101), (lo_orderdate <= 19940630)という条件を用いて不要Record-Batchの読み飛ばしを行い、全体で10個のRecord-Batchがある中で8個をスキップ、2個だけを読み出したと出力されている。

その結果、読み出した2個のRecord-BatchをGPUで評価し、2425885行を除去して残りが180285行であると集計している。

統計情報を使わない場合はどうなるのか?
以下の実行結果をご覧頂きたい。

postgres=# SET arrow_fdw.stats_hint_enabled = off;
SET
postgres=# EXPLAIN ANALYZE
           SELECT count(*) FROM flineorder_sort
            WHERE lo_orderpriority = '2-HIGH'
              AND lo_orderdate BETWEEN 19940101 AND 19940630;

                                                                 QUERY PLAN
------------------------------------------------------------------------------------------
 Aggregate  (cost=33143.09..33143.10 rows=1 width=8) (actual time=185.985..185.986 rows=1 loops=1)
   ->  Custom Scan (GpuPreAgg) on flineorder_sort  (cost=33139.52..33142.07 rows=204 width=8) (actual time=185.974..185.979 rows=1 loops=1)
         Reduction: NoGroup
         Outer Scan: flineorder_sort  (cost=4000.00..33139.42 rows=300 width=0) (actual time=10.626..100.734 rows=11997996 loops=1)
         Outer Scan Filter: ((lo_orderdate >= 19940101) AND (lo_orderdate <= 19940630) AND (lo_orderpriority = '2-HIGH'::bpchar))
         Rows Removed by Outer Scan Filter: 11817711
         referenced: lo_orderdate, lo_orderpriority
         files0: /dev/shm/flineorder_sort.arrow (read: 217.52MB, size: 2357.11MB)
 Planning Time: 0.186 ms
 Execution Time: 231.942 ms
(10 rows)

postgres=# SELECT count(*) FROM flineorder_sort
            WHERE lo_orderpriority = '2-HIGH'
              AND lo_orderdate BETWEEN 19940101 AND 19940630;
 count
--------
 180285
(1 row)

arrow_fdw.stats_hint_enabledにoffを設定すると、統計情報を使わなくなる。
その場合、Rows Removed by Outer Scan Filterが2,425,885から11,817,711へと増加しているが、条件に該当しない行を除いた最終結果は180285で同一である。
つまり、”明らかにマッチする行が一つもない” Record-Batch を読み出した挙句、検索条件をGPUで評価してフィルタしているワケである。大変にご苦労様なことである。

サイズの大きなApache Arrowファイルによるベンチマーク

数GB程度のスケールだと中々差が見えないので、60億件規模のlineorderテーブル((ログデータに似た構造とするため、予めlo_orderdateでソートしておいた))を検索するワークロードで、クエリの応答時間を比較してみた。
元のPostgreSQLテーブルでは875GBあり、これをApache Arrowにダンプすると682GBあった*2

単純にストレージからの読み出しの部分だけに絞って考えても、PostgreSQLテーブル(Heap)のような行データであれば、集計処理を行うために全てのデータを読み出さねばならない。
一方、Apache Arrowのような列形式データの場合は被参照列のみ読み出せば集計処理を行う事ができるため、相対的にI/O量が小さくなり、それが高速な処理速度の要因のひとつとなっている。

これに加えて、予め収集した統計情報を元にRecord-Batchを読み飛ばすという事は、列方向に加えて行方向に関してもI/O量を小さくするという事と同義である。どの程度、集計クエリの応答速度が短くなるのか実測してみた。

測定パターンの全てで概ね妥当なクエリ実行計画を生成するサンプルとして、Star Schema BenchmarkのQ1_2をピックアップしてみた。

select sum(lo_extendedprice*lo_discount) as revenue
  from flineorder_sort, date1
 where lo_orderdate = d_datekey
   and d_yearmonthnum = 199401
   and lo_discount between 4 and 6
   and lo_quantity between 26 and 35;

この人は、lo_orderdateが『1994年1月』のデータを抽出するように作られている。
ただ、これはdate1テーブルとのJOINによって記述されており、そのままでは統計情報を用いて不要Record-Batchをスキップする形には落ちてくれない。そのため、以下のように一部クエリを修正して条件を付加する事にした。

select sum(lo_extendedprice*lo_discount) as revenue
  from flineorder_sort, date1
 where lo_orderdate = d_datekey
   and d_yearmonthnum = 199401
   and lo_discount between 4 and 6
   and lo_quantity between 26 and 35;
   and lo_orderdate between 19940101 and 19940131;

その結果が、以下のグラフである。縦軸は Q1_2 クエリの応答時間であるので、結果が小さいほど性能が良いという事ができる。
また、行データをスキャンするパターン(青、橙)に関しては、Apache Arrowファイルをマップした flineorder_sort テーブルではなく、その元になった lineorder_sort テーブルをスキャンした。

ご覧の通り、I/O量の低減が功を奏してかなり極端な結果が出ている。
スケールが違いすぎて分かりにくいが、PostgreSQL v13.3で254秒を要した60億件のレコードのスキャンが、GPUDirect SQLApache Arrow、および統計情報による読み飛ばしの効果により1.2秒弱で終わっている。

以下のEXPLAIN ANALYZE結果を見ると、全体で2725個のRecord-Batchを含み、その大半(2688個)は検索条件に該当する行ナシとして捨ててしまっている。GPUが高速とはいえ、そもそも「何もしない」に勝る結果は無いのであるから、妥当な結果と言えるだろう。

postgres=# explain analyze
select sum(lo_extendedprice*lo_discount) as revenue
  from flineorder_sort, date1
 where lo_orderdate = d_datekey
    and d_yearmonthnum = 199401
    and lo_discount between 4 and 6
    and lo_quantity between 26 and 35
    and lo_orderdate between 19940101 and 19940131;
                                                                                           QUERY PLAN

------------------------------------------------------------------------------------------------------------------------------------------------------- Finalize Aggregate  (cost=15117511.53..15117511.54 rows=1 width=8) (actual time=1042.000..1092.485 rows=1 loops=1)
   ->  Gather  (cost=15117511.31..15117511.52 rows=2 width=8) (actual time=846.545..1092.475 rows=3 loops=1)
         Workers Planned: 2
         Workers Launched: 2
         ->  Partial Aggregate  (cost=15116511.31..15116511.32 rows=1 width=8) (actual time=781.303..781.308 rows=1 loops=3)
               ->  Hash Join  (cost=25060.71..15116511.29 rows=4 width=8) (actual time=435.810..719.086 rows=1404451 loops=3)
                     Hash Cond: (flineorder_sort.lo_orderdate = date1.d_datekey)
                     ->  Parallel Custom Scan (GpuScan) on flineorder_sort  (cost=24981.37..15116431.13 rows=312 width=12) (actual time=435.301..553.115 rows=1404451 loops=3)
                           GPU Filter: ((lo_discount >= 4) AND (lo_discount <= 6) AND (lo_quantity >= 26) AND (lo_quantity <= 35) AND (lo_orderdate >= 19940101) AND (lo_orderdate <= 1
9940131))
                           Rows Removed by GPU Filter: 77197339
                           GPU Preference: GPU0 (NVIDIA A100-PCIE-40GB) with GPUDirect SQL
                           referenced: lo_orderdate, lo_quantity, lo_extendedprice, lo_discount
                           Stats-Hint: (lo_orderdate >= 19940101), (lo_orderdate <= 19940131)  [loaded: 37, skipped: 2688]
                           files0: /opt/nvme0/flineorder_sort.arrow (read: 89.37GB, size: 681.05GB)
                     ->  Hash  (cost=78.95..78.95 rows=31 width=4) (actual time=0.472..0.473 rows=31 loops=3)
                           Buckets: 1024  Batches: 1  Memory Usage: 10kB
                           ->  Seq Scan on date1  (cost=0.00..78.95 rows=31 width=4) (actual time=0.140..0.464 rows=31 loops=3)
                                 Filter: (d_yearmonthnum = '199401'::numeric)
                                 Rows Removed by Filter: 2525
 Planning Time: 4.661 ms
 Execution Time: 1190.959 ms
(21 rows)

PostgreSQLパーティションとの比較

最後に、タイムスタンプ等を用いたデータ分散と集計処理の際の読み飛ばしという点で、類似する機能を有する PostgreSQL パーティションについても考察する。

PostgreSQLパーティションの場合、親テーブルと全く同一のスキーマ定義を持つ子テーブルに、他の子テーブルと重複しないようパーティションキー値の取り得る範囲を設定する。
通常のPostgreSQLテーブルの場合、これはINSERT/UPDATE/DELETE処理の際に自動的に子テーブル側の更新処理に振り分けられ、テーブルの内容に矛盾が生じる事はない。
一方、Apache Arrowファイルをマッピングする場合には、パーティション子テーブルとして振舞うArrow_Fdw外部テーブルに紐づいたApache Arrowファイルの内容に責任を持つのは、ユーザやDB管理者の役割である。

ログを保存するApache Arrowファイルを特定の時刻にバチッと切り替えるなら対応可能かもしれないが、例えば、一定のサイズに達したApache Arrowファイルは一旦クローズして新しいファイルを作ったり、非同期でマルチスレッドが次々とデータを書き出すために、明確なタイムスタンプの区切りを設けるのが難しい場合、パーティション設定と辻褄を合わせるのは中々難しい作業となる。

最小/最大値の統計情報であれば、機械的Apache Arrow ファイルに埋め込まれるだけなので、明確な境界値を設ける必要はなく、また、各Apache Arrowファイルに含まれるタイムスタンプ値に関しても、これを管理者が意識する必要はない。
つまり、どこかのディレクトリを『時系列ログをApache Arrow形式で放り込む場所』と決めてしまいさえすれば、あとはポンポンとファイルをコピーするだけで、それを分析する事ができるようになるのである。

現在のところ、Apache Arrowファイルに最小/最大値の統計情報を埋め込む事ができるのはpg2arrowだけであるが、Apache Arrowファイルの構造上、末尾のフッタ部分だけを書き換えれば統計情報を付加する事ができるようになるの。
したがって、他のツールによって生成されたファイルであっても、簡単なバッチ処理で統計情報を付加できるようにする事ができるハズであるし、そういったツールは追って提供する事としたい。

*1:ジェネレータで生成したデータはランダム分布なので、ログデータのようなタイムスタンプの局所性を満たさない

*2:行ごとのヘッダ

GPUDirect SQL on NFS-over-RDMAを試す

タイトルでほぼほぼ出オチですが、先日、NVIDIAからCUDA Toolkit 11.4と共にリリースされた新機能GPUDirect Storage 1.0のドキュメントを読んでいると、面白い記述を見つけた。

曰く、MOFEDドライバ5.3以降と、Mellanox Connect-X4/5の組み合わせで、NFS-over-RDMAとGPUDirect Storageを組み合わせ、リモートのNFS区画からローカルのGPUへと直接のデータ転送を行う事ができるようになる、と。

14.10. NFS Support with GPUDirect Storage
This section provides information about NFS support with GDS.

14.10.2. Install GPUDirect Storage Support for the NFS Client
Here is some information about installing GDS support for the NFS client.
To install a NFS client with GDS support complete the following steps:
Note: The client must have a Mellanox connect-X4/5 NIC with MLNX_OFED 5.3 or later installed.
:

結構な事である。
PG-Strom v3.0以前では、ローカルのNVME-SSDまたはリモートのNVME-oF区画(実験的)を Ext4 ファイルシステムで初期化したパターンに限って GPUDirect SQL が対応していたため、

  • 段階的にストレージを拡張するのに困難を伴った。
  • 共有ファイルシステムではないので、複数台のノードから書き込みができなかった。

という課題があった。NFS自体はものすごく高速なファイルシステム、というワケではないが、DB/GPUサーバからストレージを分離し、かつ複数のノードから書き込みができるのであれば、例えば、IoT/M2M系のワークロードでログデータを収集し、これをNFSサーバ上に置いておきさえすれば、DB/GPUサーバからこれを参照してGPUDirect SQLの処理スピードでもってコレを分析する事ができる。


結論:結構イケてる

セットアップ手順などは長くなるので後回しにするとして、ひとまずSSBM (Star Schema Benchmark) の結果を一言でまとめると「結構イケてる」という印象。

測定環境は以下の図の通りで、今回は1UサーバのSYS-1019GP-TTにNFSサーバになってもらった。この人には、エンクロージャ経由でNVME-SSDIntel DC P4510[1.0TB; U.2])を4台接続し、また Mellanox Connect-X5 という100Gb-NIC を接続している。
GPU/DBサーバには4UのSYS-4029GP-TRTを使い、この人には、同じPCI-Eスイッチの配下にGPUとConnect-X5を接続したペアと、もう一つGPUとNVME-SSD(同 DC P4510)を4台接続したペアを作った。これはローカルNVME-SSDとの性能比較用である。

NFSサーバは、SSD x4台をmd-raid0でストライピングした区画をNFSクライアントにエクスポートし、NFSクライアントは直結の100Gbネットワーク*1を介して、これをNFS-over-RDMAモードでマウント。

GPU/DBサーバ側では以下のようなストレージ構成となっている。
/opt/nvme0には、ローカルのNVME-SSD x4台をmd-raid0でストライピングした区画をマウント、/opt/nvme1には、1Uサーバ(192.168.80.106)のNFS区画が見えている。

[kaigai@kujira ~]$ df -h
Filesystem                   Size  Used Avail Use% Mounted on
devtmpfs                      94G     0   94G   0% /dev
tmpfs                         94G  257M   94G   1% /dev/shm
tmpfs                         94G   19M   94G   1% /run
tmpfs                         94G     0   94G   0% /sys/fs/cgroup
/dev/mapper/vg_disk-root     246G   15G  218G   7% /
/dev/nvme0n1p1               1.8T   35G  1.7T   2% /opt
/dev/md0p1                   3.6T  1.4T  2.1T  41% /opt/nvme0
/dev/sda2                    976M  189M  721M  21% /boot
/dev/mapper/vg_disk-home     393G   24G  349G   7% /home
/dev/sda1                    599M  6.9M  592M   2% /boot/efi
tmpfs                         19G     0   19G   0% /run/user/1000
192.168.80.106:/mnt/nfsroot  2.0T  1.2T  697G  64% /opt/nvme1

で、それぞれの区画に保持されているlineorderテーブルへの参照を含むSSBMクエリの実行速度は以下の通り。
分かりやすいように、(総DBサイズ)÷(クエリ応答時間)で導出した『クエリ処理スループット』で表記している。

見ての通り、ローカルのNVME-SSDに比べるとNFS-over-RDMAは1割程度遅いと*2言えるが、これは、1割程度遅いだけでストレージの拡張性やリモートアクセスといった特性を得られるという事を意味する。

クエリ実行中のストレージからの読み出し速度を見てみても、クエリ実行中の100Gbのネットワークで8.0GB/s強を出せているので、まずまずのパフォーマンスと言える。
なお、ローカルのNVME-SSDの場合、後半で突然読み出し速度が増しで10.0GB/s程度まで増速しているが、これについては現時点で謎である…。


結論

  • PG-StromのGPUDirect SQLNFS-over-RDMAの併用、低コストのログ集積&分析基盤としては結構アリかもよ。
  • ログデータを Apache Arrow 形式で書き込んでおけば、データをインポートする必要すらなくなります。

NFS-over-RDMAのセットアップ手順

NFS-over-RDMAのセットアップ手順は、以下のブログを参考にした…というか、ほとんどそのまま。
https://community.mellanox.com/s/article/howto-configure-nfs-over-rdma--roce-x

ソフトウェアの構成はざっくり以下の通り

  • CentOS 8.3 (kernel-4.18.0-240.22.1.el8_3.x86_64)
  • CUDA Toolkit 11.4 (NVIDIA Driver R470.42.01)
  • MOFED 5.3-1.0.0.1 (RHEL8.3; x86_64)
  • PostgreSQL v13.3 (PG-Strom v3.0-3)

MOFEDOドライバのインストール

まず、MellanoxのサイトからMOFEDドライバの最新版をダウンロードする。

[Version]->[OS Distribution]->[OS Distribution Version]->[Architecture]と選択していくと、バイナリパッケージを含む tgz のパッケージと、ソースコードの tgz パッケージの両方が表示されるので、両方ともダウンロード。実はソースコードも後で使います。

tgzファイルをダウンロードすると、まず GPUDirect Storage のドキュメント通りにドライバのインストールを行う。
途中、不足するパッケージがある場合には、インストールスクリプトがサジェスト通りに`dnf install ...`すればよいので、その通りに進めればMOFEDドライバのインストールは行えるはず。

$ sudo ./mlnxofedinstall --with-nvmf --with-nfsrdma --enable-gds --add-kernel-support
Note: This program will create MLNX_OFED_LINUX TGZ for rhel8.3 under /tmp/MLNX_OFED_LINUX-5.3-1.0.0.1-4.18.0-240.22.1.el8_3.x86_64 directory.
See log file /tmp/MLNX_OFED_LINUX-5.3-1.0.0.1-4.18.0-240.22.1.el8_3.x86_64/mlnx_iso.225746_logs/mlnx_ofed_iso.225746.log

Checking if all needed packages are installed...
Building MLNX_OFED_LINUX RPMS . Please wait...
    :
  <snip>
    :
$ sudo dracut -f
$ sudo shutdown -r now

これを、NFSサーバ側、NFSクライアント側の両方で行い、システムを再起動。

NFSサーバの設定

1UサーバのSYS-1019GP-TT側では、ローカルのNVME-SSDを4本束ねたmd-raid0区画を`/mnt/nfsroot`にマウントしている。
これを以下の手順でNFS-over-RDMA区画としてエクスポートする。

1. IPアドレス他ネットワーク設定

今回は安直に192.168.80.0/24を直結用のネットワークとして使用。
静的に192.168.80.106/24をConnect-X5デバイスに設定し、MTU=9000でNICを有効化しました。

2. /etc/exportsを記述。特にセキュリティとか何も考えてない設定です。

# cat /etc/exports
/mnt/nfsroot *(rw,async,insecure,no_root_squash)

3. RDMA Transport Kernel Moduleをロード。これはMOFEDドライバによって提供されるモジュール。

# modprobe svcrdma
# modinfo svcrdma
filename:       /lib/modules/4.18.0-240.22.1.el8_3.x86_64/extra/mlnx-nfsrdma/svcrdma.ko
version:        2.0.1
license:        Dual BSD/GPL
description:    svcrdma dummy kernel module
author:         Alaa Hleihel
rhelversion:    8.3
srcversion:     F7C50654667EBC6F832D608
depends:        mlx_compat
name:           svcrdma
vermagic:       4.18.0-240.22.1.el8_3.x86_64 SMP mod_unload modversions

4. NFSサーバを起動

# systemctl start nfs-server

5. RDMA転送用のポート番号を設定。一応、任意のポート番号を使用できるが、20049というのがwell-known defaultとのこと。

# echo rdma 20049 > /proc/fs/nfsd/portlist
# cat /proc/fs/nfsd/portlist
rdma 20049
rdma 20049
tcp 2049
tcp 2049

NFSクライアントの設定

1. IPアドレス他ネットワーク設定

サーバー側と同様、静的に192.168.80.108/24をConnect-X5デバイスに設定し、MTU=9000でNICを有効化しました。
ネットワークの有効化が終わったら、pingなどで導通確認。

$ ping 192.168.80.106
PING 192.168.80.106 (192.168.80.106) 56(84) bytes of data.
64 bytes from 192.168.80.106: icmp_seq=1 ttl=64 time=0.178 ms
64 bytes from 192.168.80.106: icmp_seq=2 ttl=64 time=0.197 ms
^C

2. クライアント側のRDMA Transport Kernel Moduleをロード。これもMOFEDドライバに含まれるモジュール。

# modprobe rpcrdma
# modinfo rpcrdma
filename:       /lib/modules/4.18.0-240.22.1.el8_3.x86_64/extra/mlnx-nfsrdma/rpcrdma.ko
alias:          xprtrdma
alias:          svcrdma
license:        Dual BSD/GPL
description:    RPC/RDMA Transport
author:         Open Grid Computing and Network Appliance, Inc.
rhelversion:    8.3
srcversion:     EFB4ED2B09C65AA7DA8D887
depends:        ib_core,sunrpc,mlx_compat,rdma_cm
name:           rpcrdma
vermagic:       4.18.0-240.22.1.el8_3.x86_64 SMP mod_unload modversions

3. 前節でエクスポートしたNFS区画をマウント

# mount -o rdma,port=20049 192.168.80.106:/mnt/nfsroot /opt/nvme1
# df -h
Filesystem                   Size  Used Avail Use% Mounted on
devtmpfs                      94G     0   94G   0% /dev
tmpfs                         94G  257M   94G   1% /dev/shm
tmpfs                         94G   19M   94G   1% /run
tmpfs                         94G     0   94G   0% /sys/fs/cgroup
/dev/mapper/vg_disk-root     246G   15G  218G   7% /
/dev/nvme0n1p1               1.8T   35G  1.7T   2% /opt
/dev/md0p1                   3.6T  1.4T  2.1T  41% /opt/nvme0
/dev/sda2                    976M  189M  721M  21% /boot
/dev/mapper/vg_disk-home     393G   24G  349G   7% /home
/dev/sda1                    599M  6.9M  592M   2% /boot/efi
tmpfs                         19G     0   19G   0% /run/user/1000
192.168.80.106:/mnt/nfsroot  2.0T  1.2T  697G  64% /opt/nvme1

これで準備完了。
導通確認を兼ねて、巨大なファイルの転送を行ってみる。

# dd if=/opt/nvme1/100GB of=/dev/null iflag=direct bs=32M
3106+1 records in
3106+1 records out
104230305696 bytes (104 GB, 97 GiB) copied, 11.8926 s, 8.8 GB/s

これは速い! 8.8GB/s も出ている。

一方、NFS-over-RDMAを使わないパターンだと。

# mount 192.168.80.106:/mnt/nfsroot /mnt/
# dd if=/mnt/100GB of=/dev/null iflag=direct bs=32M
3106+1 records in
3106+1 records out
104230305696 bytes (104 GB, 97 GiB) copied, 32.6171 s, 3.2 GB/s

御意。

GPUDirect StorageでNFS区画⇒GPUへの直接Readを行う

続いて本番。GPUDirect Storageを使って、リモートのNFS区画からGPUへの直接Readを行う。

今現在、NFS区画からGPUDirect Storageによる直接読み出しが可能な状態になっているかどうか、CUDA 11.4に添付のgdscheckというコマンドで確認する事ができる。。。。が、あらら。Unsupportedと表示されている。

# /usr/local/cuda/gds/tools/gdscheck -p
 GDS release version: 1.0.0.82
 nvidia_fs version:  2.7 libcufile version: 2.4
 ============
 ENVIRONMENT:
 ============
 =====================
 DRIVER CONFIGURATION:
 =====================
 NVMe               : Supported
 NVMeOF             : Supported
 SCSI               : Unsupported
 ScaleFlux CSD      : Unsupported
 NVMesh             : Unsupported
 DDN EXAScaler      : Unsupported
 IBM Spectrum Scale : Unsupported
 NFS                : Unsupported
 WekaFS             : Unsupported
 Userspace RDMA     : Unsupported
 --Mellanox PeerDirect : Enabled
 --rdma library        : Not Loaded (libcufile_rdma.so)
 --rdma devices        : Not configured
 --rdma_device_status  : Up: 0 Down: 0
        :

これは2時間くらいかけて調べたところ、どうやら、MOFEDドライバでバイナリ配布されているrpcrdmaモジュールでGPUDirect Storage対応のコードが有効化されないままビルド、配布されてしまっているという事のようである。

MOFEDドライバのソースコードを見てみると、もしCONFIG_GPU_DIRECT_STORAGE=yつきでビルドされているのであれば、/proc/kallsymsnvfs_opsという関数ポインタ表が出現してしかるべきであるのだが、それが出現していない。

# grep nvfs_ops /proc/kallsyms
ffffffffc0c256c0 b nvfs_ops     [nvme_rdma]
ffffffffc00dc718 b nvfs_ops     [nvme]

という事で、当該モジュールを野良ビルドしてみる事にする。
(なお、NVIDIAの開発チームにはエスカレーション済み。Mellanoxへも展開してくれるでしょう。)

ソースコードの tgz には SRPM が含まれているので、rpcrdmaモジュールを含むmlnx-nfsrdmaSRPMを展開し、これにCONFIG_GPU_DIRECT_STORAGE=yを付加してビルドする。

これをinsmodしてみると、rpcrdmaモジュールにもnvfs_opsシンボルがエクスポートされているのがわかる。

$ wget http://www.mellanox.com/downloads/ofed/MLNX_OFED-5.3-1.0.0.1/MLNX_OFED_SRC-5.3-1.0.0.1.tgz
$ tar zxvf MLNX_OFED_SRC-5.3-1.0.0.1.tgz
$ cd MLNX_OFED_SRC-5.3-1.0.0.1
$ rpm2cpio SRPMS/mlnx-nfsrdma-5.3-OFED.5.3.0.3.8.1.src.rpm | cpio -idu
$ tar zxvf mlnx-nfsrdma-5.3.tgz
$ cd mlnx-nfsrdma-5.3
$ make CONFIG_GPU_DIRECT_STORAGE=y
$ sudo insmod rpcrdma.ko
$ sudo grep nvfs_ops /proc/kallsyms
ffffffffc319ddc8 b nvfs_ops     [rpcrdma]
ffffffffc0c256c0 b nvfs_ops     [nvme_rdma]
ffffffffc00dc718 b nvfs_ops     [nvme]

この状態で、再度gdscheckコマンドを実行してみると。

$ /usr/local/cuda/gds/tools/gdscheck -p
 GDS release version: 1.0.0.82
 nvidia_fs version:  2.7 libcufile version: 2.4
 ============
 ENVIRONMENT:
 ============
 =====================
 DRIVER CONFIGURATION:
 =====================
 NVMe               : Supported
 NVMeOF             : Supported
 SCSI               : Unsupported
 ScaleFlux CSD      : Unsupported
 NVMesh             : Unsupported
 DDN EXAScaler      : Unsupported
 IBM Spectrum Scale : Unsupported
 NFS                : Supported
 WekaFS             : Unsupported
 Userspace RDMA     : Unsupported
 --Mellanox PeerDirect : Enabled
 --rdma library        : Not Loaded (libcufile_rdma.so)
 --rdma devices        : Not configured
 --rdma_device_status  : Up: 0 Down: 0
        :

イヤッホゥゥゥゥ!!!

早速、GPUDirect StorageのRaw-I/O性能を測定してみる事にする。

$ /usr/local/cuda/gds/tools/gdsio -x 0 -f /mnt/100GB -d 1 -s 96G -i 16M -w 6
IoType: READ XferType: GPUD Threads: 6 DataSetSize: 63143936/100663296(KiB) IOSize: 16384(KiB) Throughput: 7.642794 GiB/sec, Avg_Latency: 12874.833807 usecs ops: 3854 total_time 7.879154 secs

イヤッホゥゥゥゥ!!!

サーバ機材は有り合わせなので、もしかするとSkylake-SP内蔵のPCI-Eコントローラで詰まっているかも(帯域的にはそんな感じがしないでもない)しれないが、NFSという言葉から受ける印象とはずいぶん違ったレベルのパフォーマンスを出しているように見える。

さて、それでは、最も重要な PG-Strom でGPUDirect SQLを用いた場合のパフォーマンスを計測してみる事にする。
(⇒先頭に戻る)

8/21追記:5.4-1.0.3.0 ドライバでは直ってた

上記、rpcrdmaモジュールがGPUDirect Storage対応でビルドされていなかった問題ですが、本エントリを書いた時点のMOFEDドライバ(5.3-1.0.0.1)ではなく、最新の 5.4-1.0.3.0 を使用すれば GPUDirect Storage 関連の機能を有効にしてビルドされるようです。

ドライバ標準のインストールスクリプトを実行しただけの状態で

[root@magro ~]# modinfo rpcrdma
filename:       /lib/modules/4.18.0-305.12.1.el8_4.x86_64/extra/mlnx-nfsrdma/rpcrdma.ko
alias:          xprtrdma
alias:          svcrdma
license:        Dual BSD/GPL
description:    RPC/RDMA Transport
author:         Open Grid Computing and Network Appliance, Inc.
rhelversion:    8.4
srcversion:     6144CA5B71903B01293DD5F
depends:        ib_core,sunrpc,mlx_compat,rdma_cm
name:           rpcrdma
vermagic:       4.18.0-305.12.1.el8_4.x86_64 SMP mod_unload modversions
[root@magro ~]# modprobe rpcrdma
[root@magro ~]# grep nvfs_ops /proc/kallsyms
ffffffffc0f20dc8 b nvfs_ops     [rpcrdma]
ffffffffc0970700 b nvfs_ops     [nvme_rdma]
ffffffffc02ce718 b nvfs_ops     [nvme]
[root@magro ~]# /usr/local/cuda/gds/tools/gdscheck -p
 GDS release version: 1.0.1.3
 nvidia_fs version:  2.7 libcufile version: 2.4
 ============
 ENVIRONMENT:
 ============
 =====================
 DRIVER CONFIGURATION:
 =====================
 NVMe               : Supported
 NVMeOF             : Supported
 SCSI               : Unsupported
 ScaleFlux CSD      : Unsupported
 NVMesh             : Unsupported
 DDN EXAScaler      : Unsupported
 IBM Spectrum Scale : Unsupported
 NFS                : Supported
 WekaFS             : Unsupported
 Userspace RDMA     : Unsupported
 --Mellanox PeerDirect : Enabled
 --rdma library        : Not Loaded (libcufile_rdma.so)
 --rdma devices        : Not configured
 --rdma_device_status  : Up: 0 Down: 0
 =====================
 CUFILE CONFIGURATION:
 =====================
 properties.use_compat_mode : true
 properties.gds_rdma_write_support : true
 properties.use_poll_mode : false
 properties.poll_mode_max_size_kb : 4
 properties.max_batch_io_timeout_msecs : 5
 properties.max_direct_io_size_kb : 16384
 properties.max_device_cache_size_kb : 131072
 properties.max_device_pinned_mem_size_kb : 33554432
 properties.posix_pool_slab_size_kb : 4 1024 16384
 properties.posix_pool_slab_count : 128 64 32
 properties.rdma_peer_affinity_policy : RoundRobin
 properties.rdma_dynamic_routing : 0
 fs.generic.posix_unaligned_writes : false
 fs.lustre.posix_gds_min_kb: 0
 fs.weka.rdma_write_support: false
 profile.nvtx : false
 profile.cufile_stats : 0
 miscellaneous.api_check_aggressive : false
 =========
 GPU INFO:
 =========
 GPU index 0 Tesla V100-PCIE-16GB bar:1 bar size (MiB):16384 supports GDS
 ==============
 PLATFORM INFO:
 ==============
 IOMMU: disabled
 Platform verification succeeded

*1:100GbのN/Wスイッチて結構高いんです。涙。

*2:Q3_1で逆転している原因については調査中

Pg2Arrowに『ぐるぐるSQL』モードをつけてみた。

先月、ツイッタランドに『ぐるぐるSQL』なるワードが降臨した。

qiita.com

これは要するに、あるクエリの結果を取得しつつ、結果行から読み出した値をキーとして別のクエリを繰り返し実行するタイプのクエリを揶揄したもので、まぁ、通信遅延やパース処理、DB側での最適化が効かない諸々の理由で、遅いであろうというのは火を見るより明らかである。自分もまさか『ぐるぐるSQL』に付き合う事になるとは露ほども思わず、以下のようなキレッキレのダジャレを書き込んでニヤニヤしてる位である。

テーブルに定義できる列数の最大値は?

さて、話は変わるが、PostgreSQLのテーブルに定義する事のできる列数の最大値はいったいいくつだろうか?
答えは 1600 列。これはinclude/access/htup_details.hに次のように記載してある。

/*
 * MaxHeapAttributeNumber limits the number of (user) columns in a table.
 * This should be somewhat less than MaxTupleAttributeNumber.  It must be
 * at least one less, else we will fail to do UPDATEs on a maximal-width
 * table (because UPDATE has to form working tuples that include CTID).
 * In practice we want some additional daylight so that we can gracefully
 * support operations that add hidden "resjunk" columns, for example
 * SELECT * FROM wide_table ORDER BY foo, bar, baz.
 * In any case, depending on column data types you will likely be running
 * into the disk-block-based limit on overall tuple size if you have more
 * than a thousand or so columns.  TOAST won't help.
 */
#define MaxHeapAttributeNumber  1600    /* 8 * 200 */

この数字はどこから来ているかというと、今でこそ PostgreSQLストレージエンジンをプラガブルにできるようになったが、PostgreSQL v11以前は heap 形式が唯一にして絶対のテーブルデータ形式であった。

heapテーブルの各タプルはどのようなデータ形式を持っているかというと、以下のHeapTupleHeaderData構造体が表現するヘッダを持ち、その後ろにユーザデータがペイロードとして載るという構造を持っている。

struct HeapTupleHeaderData
{
    union
    {
        HeapTupleFields t_heap;
        DatumTupleFields t_datum;
    }           t_choice;
    ItemPointerData t_ctid;     /* current TID of this or newer tuple (or a
                                 * speculative insertion token) */
    uint16      t_infomask2;    /* number of attributes + various flags */
    uint16      t_infomask;     /* various flag bits, see below */
    uint8       t_hoff;         /* sizeof header incl. bitmap, padding */

    /* ^ - 23 bytes - ^ */
    bits8       t_bits[FLEXIBLE_ARRAY_MEMBER];  /* bitmap of NULLs */

    /* MORE DATA FOLLOWS AT END OF STRUCT */
};

t_choice.t_heapにはxmin, xmaxなどのMVCCに関連する情報が、t_infomaskおよびt_infomask2には様々なタプルの属性が記録されている。
このタプルがどこかにNULL値を含む場合、t_bitsからはじまる配列はNULL-bitmapとなり、どの列がNULL値であるのかを表現する。
また、t_hoffは上記のNULL-bitmapを含むヘッダ全体の大きさを格納する。つまり、ユーザデータが格納されたペイロード部分には((char *)htup + htup->t_hoff)で参照できることになり、この形式は広くPostgreSQLの実装で利用されている。

しかしである。t_hoffは8bit値であるため、(そうそう困ることはないものの)NULL-bitmapの長さは、この8bit値で表現できる長さに収まらねばならないという制限がある。
そうすると、t_hoffでポイントされる64bit-alignedな最大値248bytesから、他のヘッダ要素23bytes、および以前はヘッダ要素の一部だったOID列の4bytes分を引くと、(248 - 23 - 4) = 221bytes = 1768bits という事になり、あとは削除された列などのマージン分も含めて 1600 をリミットにすると記述がある。

つまり、PostgreSQLがテーブルに定義できる列数の上限は heap 形式のNULL-bitmap長によって制約されている、という事である。


だがしかし。駄菓子菓子。
CREATE TABLE(や、一部処理を共有する CREATE FOREIGN TABLE)やALTER TABLE ... ADD COLUMNでこれらの列数制限はチェックされているが、たとえば外部テーブルなど、heap形式を使わないテーブルに対してこのような制限を加えることは妥当であろうか?

試しに、列数が2000のArrowファイルをマップする外部テーブルを作成してみた。
普通にCREATE FOREIGN TABLEしても怒られるだけなので、特権ユーザでシステムカタログをゴニョゴニョしている*1

postgres=# \d widetest
                   Foreign table "public.widetest"
  Column   |   Type   | Collation | Nullable | Default | FDW options
-----------+----------+-----------+----------+---------+-------------
 object_id | integer  |           |          |         |
 c0000     | smallint |           |          |         |
 c0001     | smallint |           |          |         |
 c0002     | smallint |           |          |         |
    :          :          :      :      :
 c1998     | smallint |           |          |         |
 c1999     | smallint |           |          |         |
Server: arrow_fdw
FDW options: (file '/home/kaigai/wide2000.arrow')
postgres=# select * from widetest;
ERROR:  target lists can have at most 1664 entries

読み出そうとする列数が多すぎる場合、怒られが発生する。
(これはSQL処理の過程で中間データとしてHeapTupleを作成する事があり得るので妥当な制限)

postgres=# select object_id, c0164, c1275, c1878 from widetest where c1997 < 10;
 object_id | c0164 | c1275 | c1878
-----------+-------+-------+-------
        97 |     4 |     4 |     4
       136 |     2 |     2 |     2
       285 |     5 |     5 |     5
       311 |     6 |     6 |     6
       453 |     1 |     1 |     1
       623 |     9 |     9 |     9
       763 |     6 |     6 |     6
       859 |     6 |     6 |     6
       888 |     9 |     9 |     9
       915 |     9 |     9 |     9
(10 rows)

このように、読み出すべき列数を絞ってやると正しく動作する*2

Pg2Arrowの『ぐるぐるSQL』モード

さて、非常に列数の多いテーブルを使いたい時に、Arrow_Fdwのように内部データ形式が heap でない場合には、MaxHeapAttributeNumberを越える列数のテーブルを定義しても問題ない事が分かった。
一方でSELECT * FROM widetestがコケたように、PostgreSQLのデータをApache Arrow形式に変換するのは一苦労である。なにしろ、一度に1600列だけしか出力できないのであるので。

そもそもPostgreSQLのテーブルに格納されている時点で、たとえ生データが数千列を持つようなデータであっても、数百列ごとに複数のテーブルに分割されているハズで、例えば同じobject_idでテーブルA、テーブルB、テーブルC、...を検索して結合できるような構造になっているハズである。
それであれば、Pg2Arrowでテーブルをダンプしながら、読み出したキー値を元に他のテーブルと結合しながら処理を進めれば良いではないか、という点に思い至った。

まさしく『ぐるぐるSQL』である。

追加したオプションは--inner-join=COMMAND--outer-join=COMMANDである。
COMMANDには$(Field_Name)という形式で、-cまたは-tで指定した問合せ結果のフィールド名を指定する。
その名の如く、--inner-joinの場合は従属問い合わせの結果が空であった場合には、その行を生成しない。--outer-joinの場合は、従属問い合わせの結果をNULL値として埋めるという違いがある。

簡単な例で試してみる。

以下のコマンドにより、テーブルt_aを読み出しつつ、その結果id列に等しいid値を持つテーブルt_bの行を読み出す。

$ ./pg2arrow -d postgres -t t_a --inner-join 'SELECT b1,b2,b3,b4 FROM t_b WHERE $(id) = id' -o /tmp/test1.arrow

以下のように、PyArrowを用いて読み出すと、id値の等しい行だけが--inner-joinによって結合され、Apache Arrow形式で書き込まれている事がわかる。

$ python3
Python 3.6.8 (default, Aug 24 2020, 17:57:11)
[GCC 8.3.1 20191121 (Red Hat 8.3.1-5)] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> import pyarrow as pa
>>> X = pa.RecordBatchFileReader('/tmp/test1.arrow')
>>> X.read_all().to_pandas()
     id         a1         a2         a3         a4         a5   b1  b2  b3  b4
0     2  34.893696  59.925064   6.358087  81.492798   6.011221   10  38  37  40
1     4  10.005774  77.520073  17.843210  67.404022  52.072567   41  23  65  53
2     6  26.413124  36.939953  94.206100  26.846878   7.516258   57  18  45  27
3     8  43.582317  33.237537  18.829145  55.289623  21.512911   71  16  17  66
4    10  73.309898  93.423172  87.080872  37.176331  87.864304   40  79  76  47
5    12  37.077366  26.679760  85.896881  37.653671   1.374519   39  33  66  10
6    14  61.082752  95.813309   9.475588  50.992413  62.433903   39  20  10  70
7    16  42.964298  88.846252  78.952682  24.310852  51.272732   63  80  63  97
8    18  69.875244  39.434425  58.692245  18.880169  74.676041   44  76   5  66
9    20  54.711720  22.910282  57.094353  37.765366  95.790314   67  27  99  29
10   22  83.051926  67.801826  74.100807  64.762413  27.869209  100  54  95  16
11   24  97.913574  84.459969  40.165981  34.431095  47.260651   56  23  26  14
 :     :           :                  :

一方、--outer-joinモードを使うと、id値に一致する従属問い合わせの結果が存在しない場合、そのフィールドがNULLで埋められる。
テーブルt_bのid値は偶数のみであるため、奇数に対応するものはNaNとなっている。

$ ./pg2arrow -d postgres -t t_a --outer-join 'SELECT b1,b2,b3,b4 FROM t_b WHERE $(id) = id' -o /tmp/test2.arrow

$ python3
Python 3.6.8 (default, Aug 24 2020, 17:57:11)
[GCC 8.3.1 20191121 (Red Hat 8.3.1-5)] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> import pyarrow as pa
>>> X = pa.RecordBatchFileReader('/tmp/test2.arrow')
>>> X.read_all().to_pandas()
     id         a1         a2         a3         a4         a5    b1    b2    b3    b4
0     2  34.893696  59.925064   6.358087  81.492798   6.011221  10.0  38.0  37.0  40.0
1     3  28.479965  70.522125  65.990173  47.762203  53.936710   NaN   NaN   NaN   NaN
2     4  10.005774  77.520073  17.843210  67.404022  52.072567  41.0  23.0  65.0  53.0
3     5  50.943336  67.333290  26.790262  72.249977  96.062378   NaN   NaN   NaN   NaN
4     6  26.413124  36.939953  94.206100  26.846878   7.516258  57.0  18.0  45.0  27.0
..  ...        ...        ...        ...        ...        ...   ...   ...   ...   ...

分野によっては非常に多くの列を定義する事がある。
その場合、PostgreSQLにデータを突っ込む場合だと1600、MySQLは少し余裕があり4000個*3Oracleだと1000個*4という事のようだが、Apache Arrow形式でデータを保存し、それをArrow_Fdwでマッピングするという形にすれば、元々は同じテーブルに置かれていたデータを実行時にJOINで再構築する手間が省けるほか、列データ形式によるI/O削減効果で、多くの場合は高速化も期待できる。

*1:良い子は真似してはいけません

*2:なお、データ作成の際に同じ列をひたすら複製したので、cXXXX列の値は全て同じである

*3:MySQL :: MySQL 8.0 Reference Manual :: 8.4.7 Limits on Table Column Count and Row Size

*4:Logical Database Limits

GPUメモリストア(Gstore_Fdw)

この記事は「PostgreSQL Advent Calendar 2020」の 16日目です。

GPUPostGISの他に、今年のPG-Stromの機能強化のうち比較的大きめのものについてもご紹介したいと思います。

GPUメモリストア(Gstore_Fdw)とは

GPUバイスメモリ上に予め確保した領域にデータを保存し、これをPostgreSQLのFDW(Foreign Data Wrapper)を通じて読み書きする機能。GpuScan/GpuJoin/GpuPreAggといったPG-Stromの提供する各種ロジックにおいてデータソースとして活用する事ができ、その場合、ストレージやホストRAM上のバッファからデータを読み出す必要がないため、その分の処理を節約する事ができる。

この手の機能を持ったGPU-DBというのは他にもあるが、Gstore_Fdwのポイントは更新系ワークロードもきちんと考慮している点。通常、GPUバイスメモリを更新するには、PCI-Eバスを経由してデータを転送する必要があるが、これのレイテンシが馬鹿にならない。
大雑把に言って、ホストRAMの更新が数十nsのオーダーである一方、PCI-Eバスを介したGPUバイスメモリの読み書きには数十usを要する。つまり千倍差である。*1

では Gstore_Fdw ではどうしているのか?
およそ数百~数万行分の更新ログを溜めておいて、あとで一気にGPUへ転送し、GPU側では数千コアを同時に動かして更新ログをGPUバイスメモリ上のデータストアに適用する。そうする事で、比較的大きなPCI-Eのレイテンシも一行あたりに直せば大した値ではなくなる。

更新ログをGPUに転送するのは以下の3つのタイミング。

  • 未適用の更新ログがあり、それが一定の閾値を越えた場合。
  • 未適用の更新ログがあり、最終更新から一定の時間を経過した場合。
  • GPUバイスメモリを参照する分析クエリを実行する場合。

3つ目がポイントで、ここでは暗黙の裡に分析クエリの実行頻度は更新クエリよりも遥かに少ないという仮定を置いているが、GPUバイスメモリ側の更新がどれだけ遅延したとしても、分析クエリの実行より前に最新状態にリフレッシュできていれば問題ない、という事である。

この機能の使い道として想定しているのは、携帯電話や自動車、ドローンなど、GPSによる位置情報を時々刻々更新するようなパターンのワークロードで、GPUPostGISと組み合わせて利用するパターンを念頭に置いている。
この手のログデータは意外とデータサイズは小さい*2が、更新の頻度が極めて高いという特徴を持つ。

Gstore_Fdw で外部テーブルを定義してみる。

では早速、Gstore_Fdwを用いた外部テーブルを定義してみることにする。
開発用サーバには2台のGPUが搭載されているので、PostgreSQLパーティション機能を用いて両方のGPUにデータを振り分ける。

まず、パーティションのRootとなるテーブルを定義。

=# create table fpoints (
     dev_id int,
     ts timestamp,
     x float,
     y float)
   partition by hash ( dev_id );
CREATE TABLE

続いて、パーティションLeafとしてGstore_Fdw外部テーブルを定義。

=# create foreign table fpoints_p0
     partition of fpoints for values with (modulus 2, remainder 0)
     server gstore_fdw
     options (base_file '/opt/pgdata12/fpoints_p0.base',
              redo_log_file '/opt/pmem/fpoints_p0.redo',
              gpu_device_id '0',
              max_num_rows '8000000',
              primary_key 'dev_id');
CREATE FOREIGN TABLE
=# create foreign table fpoints_p1
     partition of fpoints for values with (modulus 2, remainder 1)
     server gstore_fdw
     options (base_file '/opt/pgdata12/fpoints_p1.base',
              redo_log_file '/opt/pmem/fpoints_p1.redo',
              gpu_device_id '1',
              max_num_rows '8000000',
              primary_key 'dev_id');
CREATE FOREIGN TABLE
  • base_fileというのはデータストアを記録するためのファイルで、再起動やクラッシュ後のリカバリのために使用される。
  • redo_log_fileというのは更新ログを記録するためのファイルで、Persistent Memory領域を使用するのが推奨。今回は、Intel製DCPMM[128GB]をマウントした/opt/pmem以下のファイルを指定している。
  • gpu_device_idは使用するGPUを指定し、max_num_rowsはバッファを確保する最大行数を指定する。
  • primary_keyには主キーとして振る舞うカラムを指定する。

これらの外部テーブルを定義すると、ログには以下のように表示され、GPUバイスメモリをそれぞれ 300MB 程度確保した事がわかる。

2020-12-16 08:51:48.668 UTC [11130] LOG:  gstore_fdw: initial load [fpoints_p0] - main 324000640 bytes, extra 0 bytes
2020-12-16 08:51:58.265 UTC [11129] LOG:  gstore_fdw: initial load [fpoints_p1] - main 324000640 bytes, extra 0 bytes

初期データの投入と更新クエリの実行

続いて、GPUメモリストアに初期データの投入を行う。これはINSERTでもCOPY FROMでもよい。

=# insert into fpoints (select x, now(), 100*random(), 100*random() from generate_series(0,12000000) x);
INSERT 0 12000001

Hashパーティショニングを使用しているため、概ね均等にデータが分散されていることが分かる。

postgres=# select count(*) from fpoints;
  count
----------
 12000001
(1 row)

postgres=# select count(*) from fpoints_p0;
  count
---------
 5998934
(1 row)

postgres=# select count(*) from fpoints_p1;
  count
---------
 6001067
(1 row)

このデータに対して、位置情報とタイムスタンプの更新を想定し、以下のようなクエリをpgbenchで実行してみることにする。
tsはタイムスタンプの更新、x,yはそれぞれ倍精度浮動小数点による座標のイメージである。

UPDATE fpoints SET ts = now(),
                   x = random() * 100.0,
                   y = random() * 100.0
             WHERE dev_id = (select (random() * 250000)::int * 48 + :client_id)

実行結果は以下のような感じ。

$ pgbench -n -f mytest.sql -c 48 -j 48 -T 15 postgres
transaction type: mytest.sql
scaling factor: 1
query mode: simple
number of clients: 48
number of threads: 48
duration: 15 s
number of transactions actually processed: 1471332
latency average = 0.490 ms
tps = 97912.340285 (including connections establishing)
tps = 97950.056405 (excluding connections establishing)

これに伴い、ログにも溜まった更新ログをGPUへ転送し、これをGPUバイスメモリ上のデータストアに適用した事が出力されている。

2020-12-16 09:24:27.484 UTC [11129] LOG:  gstore_fdw: Log applied (nitems=1905688, length=124817704, pos 521268808 => 612741824)
2020-12-16 09:24:28.294 UTC [11130] LOG:  gstore_fdw: Log applied (nitems=2037144, length=131644512, pos 521086144 => 618869072)
2020-12-16 09:24:42.567 UTC [11129] LOG:  gstore_fdw: Log applied (nitems=823198, length=76151024, pos 612741824 => 652255360)
2020-12-16 09:24:43.390 UTC [11130] LOG:  gstore_fdw: Log applied (nitems=824461, length=76733600, pos 618869072 => 658443232)
2020-12-16 09:24:57.686 UTC [11129] LOG:  gstore_fdw: Log applied (nitems=1574755, length=118524736, pos 652255360 => 727843584)
2020-12-16 09:24:58.497 UTC [11130] LOG:  gstore_fdw: Log applied (nitems=1436306, length=111847320, pos 658443232 => 727385888)

分析クエリの実行はこのような形で、fpointsテーブル(実際にはその配下のfpoints_p0とfpoints_p1外部テーブル)から、指定範囲内に属する座標を抽出するというような形での利用を想定している。
これは別に単純検索である必要はなく、例えば、前回の記事で紹介したようなGiSTインデックスを用いた多対多の範囲検索であってもよい。

=# SELECT count(*) FROM fpoints
 WHERE st_contains('polygon ((10 10,90 10,90 12,12 12,12 88,90 88,90 90,10 90,10 10))', st_makepoint(x,y));
 count
--------
 565047
(1 row)

Time: 254.274 ms

Gstore_Fdwの今後

現在、PG-Strom v3.0のリリースに向けてマルチGPUの対応をはじめとした諸々の改善や、テストケースの作成を行っています。
乞うご期待!

ちなみに、Advent Calendarとしてはずいぶんギリギリの公開となってしまいましたが、こういった悲しい事故がありました。

*1:HPC-oriented Latency Numbers Every Programmer Should Know · GitHub

*2:例えば、1デバイスあたり1kBのデータを保持していても、100万台分で1GBにしかならない。

GPU版PostGISとGiSTインデックス対応

今年も早いもので気がついたら Advent Calendar の季節ですが、今回のこちらの記事は RDBMS-GIS(MySQL,PostgreSQLなど) Advent Calendar 2020 - Qiita の 12/5(土) のものです。

2020年はこの辺からスタートして、かなり性能的に面白いところまで持ってくることができたので、年末の締めにGPUPostGISとGiSTインデックス対応についてご紹介させていただこうかと。

何をやりたいか?

元々、PG-StromではIoT/M2M領域でありがちな『時間経過とともに溜まっていくログデータの処理、分析』にフォーカスを当てていたが、携帯電話や自動車といった移動体デバイスの生成するログを考えると、その中にGPSの生成する『緯度、経度』情報が含まれる事もしばしばである。
ホンマに商売になるのか分からないところではあったが、こういったデータをPostGISを使って分析したい、という問合せはちょくちょくもらっていたので、相応に需要はあるのだろうという事で、春先からGPUPostGIS関数の実装を進めていた。

ターゲットとするワークロードは以下のようなもので、携帯電話や自動車から収集したログデータ(これには、典型的にはデバイスID、タイムスタンプ、緯度・経度、その他の属性が含まれる)と、例えばプッシュ広告を配信する地域や、事故・渋滞情報を配信するエリアを定義するエリア情報を突合するという処理を想定している。
つまり、単純化すると多角形領域の中に含まれる点を抽出するという処理になる。

これをナイーブに実装するだけであれば、st_containsst_dwithinといった関数をGPUで実装するだけで*1よいのだが、例えば10万個の多角形領域 × 1000万個の座標データの組み合わせを検査するとなればその組み合わせは1兆通りにのぼり、1枚あたり数千コアを搭載するGPUといえども、なかなかにしんどい処理となる。

こういったケースで絞り込みを効率的に行うため、PostgreSQL/PostGISにはGiSTインデックスという仕組みが存在する。

GiSTインデックス(R木)

位置データの場合、緯度・経度という二次元的な広がりがあるため、ある値が別の値より大きいのか、小さいのかという関係を定義する事ができない。
そのため、値の大小関係を利用して目的の要素へと直線的に木構造を降下していくB木のようにはいかないが、それでも効率的に絞り込みを行うためR木と呼ばれるインデックス構造を作る事ができる。

R木インデックスでは、多角形要素(Polygon)はどれだけ複雑な形状を持っていようとも、その形状を完全に包含する長方形(Bounding-Box)であるとして扱われる。ある点要素がその長方形の中に含まれていれば多角形要素の中に含まれる可能性があるが、その長方形の外側であれば、複雑な多角形要素との当たり判定を行うまでもなく『包含されていない』と結果を返す事ができる。

以下の図のケースでは、R木インデックスのRoot要素であるR1はその子要素であるR3、R4、R5を完全に包含し、R2は同じくその子要素であるR6、R7を完全に包含する長方形として定義されてる。
もし以下の図の位置が検索キーとして与えられた場合、R1およびR2は共に検索キーを包含し、その子要素をチェックする事となる。

次に、R1の子要素R3、R4、R5と検索キーの重なりを検査する。すると、R4は検索キーを包含するが、R3とR5は検索キーを包含しない。
したがって、R3の子要素R8、R9、R10およびR5の子要素R13、R14はそもそもチェックするまでもなく、検索キーを包含しないという事になる。

次に、R4の子要素R11とR12と検索キーの重なりを検査する。すると、R12は検索キーを包含するが、R11は包含しない。
この階層はR木のLeaf要素であるのでこれ以上探索が深くなることはなく、R12がインデックスする多角形要素が本当に検索キーを包含するのかどうか、st_contains関数などを用いてチェックする。

一方、Root要素で検索キーを包含していたR2の子要素R6、R7は共に検索キーを包含しないため、ここでインデックスの探索は打ち切りとなる。

この例では単純化されているが、実際にはR木の一階層を降下する毎に100要素程度の『当たり判定』をシーケンシャルに行っていくため、実はインデックス探索とはいえ、そこそこ計算ヘビーな処理にはなってしまう。
一方でGPU向きの性質としては、突合処理中のR木インデックスはRead-Onlyなデータ構造であるため並列度を上げやすく、その気になれば数千コアを同時に稼働してのインデックス探索を行う事ができる。

簡単なベンチマーク

PG-Stromに実装したGPUPostGISの機能には、多角形や点などジオメトリ要素間の包含関係を判定するst_contains関数も含まれており、これを用いてのエリア定義情報×座標情報の突合処理のベンチマークを行ってみる事にする。

サンプルとして使用するのは、国土地理院の公開している市区町村の形状データと、日本列島を概ね包含する領域にランダムに打った点との突合処理。
この中で、東京都に属する市区町村に含まれるエリアに打たれた点の数を、市区町村ごとにカウントするものとする。

実行するクエリは以下の通り。geo_japanテーブルには市町村の形状データを、fgeopointテーブルにはランダムに生成した点のデータを挿入する。

SELECT n03_001,n03_004,count(*)
  FROM geo_japan j, fgeopoint p
 WHERE st_contains(j.geom, st_makepoint(x,y))
   AND j.n03_001 like '東京都’
GROUP BY n03_001,n03_004;

ランダムなデータの生成は以下の通り。

=# CREATE TABLE
    geopoint (
      gid int primary key,
      x   float8,
      y   float8);
CREATE TABLE
=# INSERT INTO geopoint (SELECT x, pgstrom.random_float(0, 123.0, 154.2),
                                   pgstrom.random_float(0, 20.0, 46.2)
                           FROM generate_series(1,10000000) x);
INSERT 0 10000000
postgres=# SELECT * FROM geopoint LIMIT 4;
 gid |         x          |         y
-----+--------------------+--------------------
   1 |  133.5737876430963 | 23.438477765972948
   2 | 133.47253950874904 | 22.512966607004856
   3 |  136.9879882356096 |  22.22637613640464
   4 |  126.3652188637132 | 27.186177021165463
(4 rows)

GPU側はこれと同じ内容をGPUメモリストアに挿入する。
GPUメモリストアに関しては、後日、PostgreSQL Advent Calendar 2020の16日目でも記載する予定ですので、そちらをご覧あれ。

=# CREATE FOREIGN TABLE
    fgeopoint (
      gid int,
      x   float,
      y   float)
    SERVER gstore_fdw
    OPTIONS (gpu_device_id '0',
             base_file '/opt/nvme/fgeopoint.base',
             redo_log_file '/opt/pmem/fgeopoint.redo',
             max_num_rows '12000000',
             primary_key 'gid');
CREATE FOREIGN TABLE
=# INSERT INTO fgeopoint (SELECT * FROM geopoint);
INSERT 0 10000000

これで準備が整った。さぁ、実行してみる事にしよう。
先ずはCPUのみのケース。予め並列度を上げておく。

=# set pg_strom.enabled = off;
SET
=# set max_parallel_workers_per_gather = 99;
SET
=# SELECT n03_001,n03_004,count(*)
  FROM geo_japan j, geopoint p
 WHERE st_contains(j.geom, st_makepoint(x,y))
   AND j.n03_001 like '東京都'
GROUP BY n03_001,n03_004;
 n03_001 |  n03_004   | count
---------+------------+-------
 東京都  | あきる野市 |    90
 東京都  | 三宅村     |    53
 東京都  | 三鷹市     |    14
    :            :
 東京都  | 青ヶ島村   |     4
 東京都  | 青梅市     |   109
(63 rows)

Time: 30539.097 ms (00:30.539)

約30秒と言ったところ。

なおEXPLAIN ANALYZEを見てみると、応答時間30.7秒のうち30.68秒をNested Loop+Index Scanの箇所で消費しており、ここがワークロードの肝である事がわかる。

postgres=# EXPLAIN ANALYZE
SELECT n03_001,n03_004,count(*)
  FROM geo_japan j, geopoint p
 WHERE st_contains(j.geom, st_makepoint(x,y))
   AND j.n03_001 like '東京都'
GROUP BY n03_001,n03_004;
                                                                                QUERY PLAN
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 Finalize GroupAggregate  (cost=7483675086.12..7483829745.31 rows=4858 width=29) (actual time=30709.855..30710.080 rows=63 loops=1)
   Group Key: j.n03_001, j.n03_004
   ->  Gather Merge  (cost=7483675086.12..7483829550.99 rows=19432 width=29) (actual time=30709.838..30732.270 rows=244 loops=1)
         Workers Planned: 4
         Workers Launched: 3
         ->  Partial GroupAggregate  (cost=7483674086.06..7483826236.39 rows=4858 width=29) (actual time=30687.466..30687.572 rows=61 loops=4)
               Group Key: j.n03_001, j.n03_004
               ->  Sort  (cost=7483674086.06..7483712111.50 rows=15210175 width=21) (actual time=30687.452..30687.475 rows=638 loops=4)
                     Sort Key: j.n03_001, j.n03_004
                     Sort Method: quicksort  Memory: 73kB
                     Worker 0:  Sort Method: quicksort  Memory: 74kB
                     Worker 1:  Sort Method: quicksort  Memory: 74kB
                     Worker 2:  Sort Method: quicksort  Memory: 76kB
                     ->  Nested Loop  (cost=0.41..7481859623.72 rows=15210175 width=21) (actual time=71.496..30686.278 rows=638 loops=4)
                           ->  Parallel Seq Scan on geopoint p  (cost=0.00..88695.29 rows=2500029 width=16) (actual time=0.012..207.553 rows=2500000 loops=4)
                           ->  Index Scan using geo_japan_geom_idx on geo_japan j  (cost=0.41..2992.66 rows=1 width=1868) (actual time=0.012..0.012 rows=0 loops=10000000)
                                 Index Cond: (geom ~ st_makepoint(p.x, p.y))
                                 Filter: (((n03_001)::text ~~ '東京都'::text) AND st_contains(geom, st_makepoint(p.x, p.y)))
                                 Rows Removed by Filter: 0
 Planning Time: 0.156 ms
 Execution Time: 30732.422 ms
(21 rows)

次に、GPUによるGiSTインデックスの探索を試してみる。
使用したのは、先週届いたばかりの最新鋭モデル、NVIDIA A100。

こやつを利用して実行した結果がコレ。

=# SELECT n03_001,n03_004,count(*)
  FROM geo_japan j, fgeopoint p
 WHERE st_contains(j.geom, st_makepoint(x,y))
   AND j.n03_001 like '東京都'
GROUP BY n03_001,n03_004;
 n03_001 |  n03_004   | count
---------+------------+-------
 東京都  | あきる野市 |    90
 東京都  | 三宅村     |    53
    :            :
 東京都  | 青ヶ島村   |     4
 東京都  | 青梅市     |   109
(63 rows)

Time: 316.673 ms

どやっ!!316msで100倍近い高速化!

EXPLAIN ANALYZEを覗いてみると、GpuJoinのdepth=1でGpuGiSTJoinが選択され、13.28MBのGiSTインデックスをGPUへロードして結合処理を行っている事がわかる。

=# EXPLAIN ANALYZE
SELECT n03_001,n03_004,count(*)
  FROM geo_japan j, fgeopoint p
 WHERE st_contains(j.geom, st_makepoint(x,y))
   AND j.n03_001 like '東京都'
GROUP BY n03_001,n03_004;
                                                                QUERY PLAN
------------------------------------------------------------------------------------------------------------------------------------------
 GroupAggregate  (cost=6141933.29..6142042.59 rows=4858 width=29) (actual time=329.118..329.139 rows=63 loops=1)
   Group Key: j.n03_001, j.n03_004
   ->  Sort  (cost=6141933.29..6141945.43 rows=4858 width=29) (actual time=329.107..329.110 rows=63 loops=1)
         Sort Key: j.n03_001, j.n03_004
         Sort Method: quicksort  Memory: 29kB
         ->  Custom Scan (GpuPreAgg)  (cost=6141575.10..6141635.83 rows=4858 width=29) (actual time=328.902..328.911 rows=63 loops=1)
               Reduction: Local
               Combined GpuJoin: enabled
               ->  Custom Scan (GpuJoin) on fgeopoint p  (cost=3759781.06..6712457.81 rows=60840000 width=21) (never executed)
                     Outer Scan: fgeopoint p  (cost=0.00..100000.00 rows=10000000 width=16) (never executed)
                     Depth 1: GpuGiSTJoin(plan nrows: 10000000...60840000, actual nrows: 10000000...2553)
                              HeapSize: 7841.91KB (estimated: 3113.70KB), IndexSize: 13.28MB
                              IndexFilter: (j.geom ~ st_makepoint(p.x, p.y)) on geo_japan_geom_idx
                              Rows Fetched by Index: 4952
                              JoinQuals: st_contains(j.geom, st_makepoint(p.x, p.y))
                     ->  Seq Scan on geo_japan j  (cost=0.00..8928.24 rows=6084 width=1868) (actual time=0.164..17.723 rows=6173 loops=1)
                           Filter: ((n03_001)::text ~~ '東京都'::text)
                           Rows Removed by Filter: 112726
 Planning Time: 0.344 ms
 Execution Time: 340.415 ms
(20 rows)

まとめ

ワークロードによってはPostGIS関数の処理を大幅に高速化できる可能性があるので、PG-StromとGPUPostGIS、ぜひ使ってください。

*1:いや、そこそこ大変ではあるが…。

mysql2arrowでMySQLからデータを抜く

以前からPG-Stromのパッケージにpg2arrowというユーティリティを同梱しており、これを使うと、PostgreSQLに投げたクエリからApache Arrow形式のファイルを作成する事ができる。

kaigai.hatenablog.com
qiita.com

昨年、当初のバージョンを作った時から、内部的には色々ゴチャゴチャ変わっていて*1、Arrow_Fdwとコードを共有するための改良や、RDBMSへの接続に固有の部分だけを別ファイルに切り出すという事をやっていた。
これは、PostgreSQLだけをデータソースにするのではなく、Webアプリやゲームの業界でよく使われる MySQL や、将来的にはNoSQLなどへも簡易に対応できるようにという意味での基礎工事のようなものである。今回はまず、これを MySQL に対応させてみた。

MySQLからWebアプリやゲームのログ情報を Apache Arrow 形式で抜き出し、これを単純なファイルとして NVME-SSD 上のボリュームに保存する。
これらのファイルを Arrow_Fdw 外部テーブルを用いて PostgreSQLマッピングすれば、解析系DBにわざわざデータを再度インポートしなくても、Webアプリやゲームのログを集計処理や異常検知に回す事ができるようになる。
加えて、PG-StromであればArrow_Fdw外部テーブルに対してSSD-to-GPU Direct SQLを実行する事ができるので、きちんとシステムを設計してやれば、秒速で10億レコード超を処理する事だって不可能ではない。

使い方自体はそれほど複雑なものではない。
大半のオプションが pg2arrow と共通*2で、今回の機能強化に合わせて-tオプションを追加した程度である。

$ mysql2arrow --help
Usage:
  mysql2arrow [OPTION] [database] [username]

General options:
  -d, --dbname=DBNAME   Database name to connect to
  -c, --command=COMMAND SQL command to run
  -t, --table=TABLENAME Table name to be dumped
      (-c and -t are exclusive, either of them must be given)
  -o, --output=FILENAME result file in Apache Arrow format
      --append=FILENAME result Apache Arrow file to be appended
      (--output and --append are exclusive. If neither of them
       are given, it creates a temporary file.)

Arrow format options:
  -s, --segment-size=SIZE size of record batch for each

Connection options:
  -h, --host=HOSTNAME  database server host
  -p, --port=PORT      database server port
  -u, --user=USERNAME  database user name
  -P, --password=PASS  Password to use when connecting to server

Other options:
      --dump=FILENAME  dump information of arrow file
      --progress       shows progress of the job
      --set=NAME:VALUE config option to set before SQL execution
      --help           shows this message

Report bugs to <pgstrom@heterodb.com>.

簡単な例でデータを抽出してみる。
なお-tオプションは、SELECT * FROM tablenameの省略形。

$ mysql2arrow -d mysql -u root -t t1 -o /dev/shm/hoge.arrow

生成された Apache Arrow ファイルのスキーマ定義、データの配置はこんな感じ

$ mysql2arrow --dump /dev/shm/hoge.arrow
[Footer]
{Footer: version=V4, schema={Schema: endianness=little, fields=[{Field: name="id", nullable=true, type={Int32}, children=[], custom_metadata=[]}, {Field: name="a", nullable=true, type={Int32}, children=[], custom_metadata=[]}, {Field: name="b", nullable=true, type={Float64}, children=[], custom_metadata=[]}, {Field: name="c", nullable=true, type={Utf8}, children=[], custom_metadata=[]}, {Field: name="d", nullable=true, type={Timestamp: unit=sec}, children=[], custom_metadata=[]}], custom_metadata=[{KeyValue: key="sql_command" value="SELECT * FROM t1"}]}, dictionaries=[], recordBatches=[{Block: offset=472, metaDataLength=360 bodyLength=60480}]}
[Record Batch 0]
{Block: offset=472, metaDataLength=360 bodyLength=60480}
{Message: version=V4, body={RecordBatch: length=1000, nodes=[{FieldNode: length=1000, null_count=0}, {FieldNode: length=1000, null_count=26}, {FieldNode: length=1000, null_count=18}, {FieldNode: length=1000, null_count=0}, {FieldNode: length=1000, null_count=17}], buffers=[{Buffer: offset=0, length=0}, {Buffer: offset=0, length=4032}, {Buffer: offset=4032, length=128}, {Buffer: offset=4160, length=4032}, {Buffer: offset=8192, length=128}, {Buffer: offset=8320, length=8000}, {Buffer: offset=16320, length=0}, {Buffer: offset=16320, length=4032}, {Buffer: offset=20352, length=32000}, {Buffer: offset=52352, length=128}, {Buffer: offset=52480, length=8000}]}, bodyLength=60480}

Python (PyArrow) で読み込んでみるとこんな感じですね。

$ python
Python 3.6.8 (default, Oct  7 2019, 17:58:22)
[GCC 8.2.1 20180905 (Red Hat 8.2.1-3)] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> import pyarrow as pa
>>> f = pa.ipc.open_file('/dev/shm/hoge.arrow')
>>> f.get_record_batch(0).to_pandas()
       id      a           b                                 c                   d
0       1  750.0  884.851090  c4ca4238a0b923820dcc509a6f75849b 2020-11-25 17:14:56
1       2  962.0  373.533847  c81e728d9d4c2f636f067f89cc14862c 2019-03-26 01:19:29
2       3  287.0  384.895995  eccbc87e4b5ce2fe28308fd9f2a7baf3 2018-11-04 21:55:32
3       4  573.0  890.063600  a87ff679a2f3e71d9181a67b7542122c 2023-05-14 15:24:37
4       5  948.0  778.885925  e4da3b7fbbce2345d7772b0674a318d5 2023-01-18 16:41:12
..    ...    ...         ...                               ...                 ...
995   996 -295.0  424.007169  0b8aff0438617c055eb55f0ba5d226fa 2017-08-15 01:40:31
996   997 -849.0  648.545034  ec5aa0b7846082a2415f0902f0da88f2 2023-05-23 08:58:55
997   998  530.0  865.244230  9ab0d88431732957a618d4a469a0d4c3 2024-07-20 14:13:06
998   999  244.0   96.534528  b706835de79a2b4e80506f582af3676a 2018-08-10 01:42:04
999  1000  997.0  157.958900  a9b7ba70783b617e9998dc4dd82eb3c5 2016-12-27 08:06:44

[1000 rows x 5 columns]

ビルドは PG-Strom のモジュールと一緒にやればよいのですが、mysql-develパッケージをインストールしていない人もいるという想定で((なおpostgresql-develパッケージは全人類がインストールするという想定で))、makeの実行時にWITH_MYSQL2ARROW=1を付加します。

$ make WITH_MYSQL2ARROW=1
gcc -D__MYSQL2ARROW__=1 -D_GNU_SOURCE -g -Wall -I ../src -I ../utils -I /usr/local/pgsql-11/include/server -I/usr/include/mysql -m64  -L/usr/lib64/mysql  -Wl,-rpath,-L/usr/lib64/mysql ../utils/sql2arrow.c ../utils/mysql_client.c ../src/arrow_nodes.c ../src/arrow_write.c -o ../utils/mysql2arrow -lmysqlclient

追記モードで異常終了した時のファイルの回復

もう一点。MySQL対応にするついでに、以前からあった設計上の問題の修正を行っている。

pg2arrowmysql2arrow--appendを指定し、追記モードでSQLの処理結果をApache Arrowファイルに追加する場合、以前のエントリで紹介したように、ファイル末尾のフッター領域を上書きして新しいデータを追加し、最後にフッター領域を再構築する。

kaigai.hatenablog.com

この時、SQLの異常終了やコマンド自体のバグによってプロセスが異常終了してしまったら、元々のApache Arrowファイルが破損したまま残ってしまう事になっていた。
これを修正するため、最新版では元々のApache Arrowファイルのフッタ領域の内容(このサイズ自体は大した量ではないので)を別の領域に退避し、シグナルハンドラとon_exit()ハンドラを用いて、終了コード 0 以外でプロセスが exit したり、SIGSEGVやSIGBUSを受け取った場合にはこれを元の位置に書き戻すという処理を行っている。

例えば、6GB程度の大きさがあるテーブル t0 から100行だけ取り出す。これは生成された Apache Arrow ファイルも5kB程度のもの。

$ pg2arrow -d postgres -c "SELECT * FROM t0 LIMIT 100" -o /dev/shm/monu.arrow
$ ls -l /dev/shm/monu.arrow
-rw-r--r--. 1 kaigai users 4934 Mar 25 12:41 /dev/shm/monu.arrow

ここに、今度はテーブル全体を追記中にコマンドを ctrl-c で中断してみる事にする。

$ pg2arrow -d postgres -c "SELECT * FROM t0" --append /dev/shm/monu.arrow
^C

別ターミナルでファイルの大きさを観察してみると、確かに途中までデータが書き込まれ、順調にApache Arrowファイルが肥大化している事が分かるが、pg2arrowの異常終了後、最終的には元の大きさに戻っている。

$ ls -lh /dev/shm/monu.arrow
-rw-r--r--. 1 kaigai users 769M Mar 25 12:46 /dev/shm/monu.arrow
$ ls -lh /dev/shm/monu.arrow
-rw-r--r--. 1 kaigai users 1.1G Mar 25 12:47 /dev/shm/monu.arrow
$ ls -lh /dev/shm/monu.arrow
-rw-r--r--. 1 kaigai users 1.3G Mar 25 12:47 /dev/shm/monu.arrow
$ ls -l /dev/shm/monu.arrow
-rw-r--r--. 1 kaigai users 4934 Mar 25 12:47 /dev/shm/monu.arrow

PyArrowで当該ファイルをオープンしてみても、元通り100行のデータを含む Apache Arrow ファイルである。

$ python
>>> import pyarrow as pa
>>> f = pa.ipc.open_file('/dev/shm/monu.arrow')
>>> f.get_record_batch(0).num_rows
100
>>> f.num_record_batches
1
>>> f.get_record_batch(0).num_rows
100

SIGKILLで強制終了した場合など救えないケースもあるが、一応、こういった運用面での安定性に寄与する機能も強化されているという事で。

*1:リファクタリングと呼ぼう!

*2:PostgreSQL系コマンドは-Wでパスワードのプロンプトを出すが、MySQL系はコマンドラインでパスワードを与えるお作法のよう

Writable Arrow_Fdwと、PL/CUDAがお払い箱になる話

昨年ラストのブログ記事は、pg2arrowに--appendモードを付けてApache Arrowファイルへの追記を行うというトピックだった。

kaigai.hatenablog.com

実は内部的には、PG-StromのArrow_Fdwとpg2arrowのコードは大半を共有していて*1、入り口がスタンドアロンlibpqを使うツールなのか、PostgreSQLのFDW APIなのかという程度の違いしかない。
そこで、Arrow_Fdw外部テーブルに対してINSERT文を実行できるようにして、PostgreSQL側でもApache Arrowファイルへの追記をできるようにしてみた。これは後述の、Python向け各種モジュールとのデータ交換を目的とした機能強化である。

Writable Arrow_Fdw

Arrow_Fdw外部テーブルを書き込み可能にするには、テーブルオプションに writable を付与する。

=# CREATE FOREIGN TABLE ft (
  id    int,
  x     real,
  y     real,
  z     real
) SERVER arrow_fdw
  OPTIONS (file '/dev/shm/ft.arrow', writable 'true');
CREATE FOREIGN TABLE

外部テーブルを定義する時点でfileで指定する Apache Arrow ファイルが存在している必要はないが、writableオプションを指定した場合は、外部テーブルの背後に複数の Apache Arrow ファイルを配置する事はできない。これは1個でないと、どのファイルに書き込むべきかを特定できないため。

ひとまず 1,000 行ほどデータを挿入してみる。

=# INSERT INTO ft (
  SELECT x, pgstrom.random_float() * 100.0,
            pgstrom.random_float() * 100.0,
            pgstrom.random_float() * 100.0
    FROM generate_series(1,1000) x);
INSERT 0 1000

このように、指定したパスに Apache Arrow ファイルが作成され、1,000行分のデータが書き込まれている事がわかる。

$ ls -l /dev/shm/ft.arrow
-rw-------. 1 kaigai users 17166 Feb 17 18:04 /dev/shm/ft.arrow
$ ./utils/pg2arrow --dump /dev/shm/ft.arrow
[Footer]
{Footer: version=V4, schema={Schema: endianness=little, fields=[{Field: name="id", nullable=true, type={Int32}, children=[], custom_metadata=[]}, {Field: name="x", nullable=true, type={Float32}, children=[], custom_metadata=[]}, {Field: name="y", nullable=true, type={Float32}, children=[], custom_metadata=[]}, {Field: name="z", nullable=true, type={Float32}, children=[], custom_metadata=[]}], custom_metadata=[]}, dictionaries=[], recordBatches=[{Block: offset=352, metaDataLength=296 bodyLength=16128}]}
[Record Batch 0]
{Block: offset=352, metaDataLength=296 bodyLength=16128}
{Message: version=V4, body={RecordBatch: length=1000, nodes=[{FieldNode: length=1000, null_count=0}, {FieldNode: length=1000, null_count=0}, {FieldNode: length=1000, null_count=0}, {FieldNode: length=1000, null_count=0}], buffers=[{Buffer: offset=0, length=0}, {Buffer: offset=0, length=4032}, {Buffer: offset=4032, length=0}, {Buffer: offset=4032, length=4032}, {Buffer: offset=8064, length=0}, {Buffer: offset=8064, length=4032}, {Buffer: offset=12096, length=0}, {Buffer: offset=12096, length=4032}]}, bodyLength=16128}

なお、PostgreSQLのFDWモジュールとして動作するからには、トランザクション制御の諸々に従う必要がある。
以下のように未コミットの書き込みに関しては、ABORTROLLBACKで取り消す事ができる。
ただし、効率的にトランザクションを実装するため、INSERTを実行できるのは同時に1トランザクションのみ。要は、少し強めのShareRowExclusiveLockを取っているので、その辺はご注意を。

postgres=# SELECT count(*) FROM ft;
 count
-------
  1000
(1 row)

postgres=# BEGIN;
BEGIN
postgres=# INSERT INTO ft (
  SELECT x, pgstrom.random_float() * 100.0,
            pgstrom.random_float() * 100.0,
            pgstrom.random_float() * 100.0
    FROM generate_series(1,300) x);
INSERT 0 300
postgres=# SELECT count(*) FROM ft;
 count
-------
  1300
(1 row)

postgres=# ABORT;
ROLLBACK
postgres=# SELECT count(*) FROM ft;
 count
-------
  1000
(1 row)

PL/CUDAがお払い箱になる話

Arrow_FdwのREAD系に関しては列データによるGPUへの高速なデータ供給という設計意図があるのだが、WRITE系に関しては少し異なる思惑がある。

f:id:kaigai:20200217182318p:plain

バイスから集まってくるログデータ、いわゆるIoT/M2M系のワークロードを処理する事を考えると、生データは割と簡単にTB級のデータサイズに膨れ上がってしまい、何らかの集計や前処理を行わないと機械学習・統計解析のエンジンに渡す事ができない。少なくともGBの単位まで落とす必要はあるだろうと考えており、おそらくその辺は、既存のSSD-to-GPU Direct SQLの役割となる。
問題は、前処理を終えたデータで、これを Python スクリプトで動く機械学習エンジンに渡す時に、いったんCSVで吐き出してから再度 Text -> Binary 変換というのは効率が悪い。できればバイナリのまま受け渡す方が効率的で、スマートであろう。

新たに追加した関数、pgstrom.arrow_fdw_export_cupy()を使えば、Arrow_Fdw外部テーブルに格納されたデータのうち、指定された列だけを抽出してcuPyのndarrayと呼ばれるデータフレームと同じ形式でGPUバイスメモリにロードする事ができる。この関数はGPUバイスメモリを外部からマップするための識別子を返すので、これを利用すれば、Zero-copyで*2PostgreSQLPythonスクリプトの間のデータ交換が可能になる。

加えて、PostgreSQLにはPL/Pythonという、Python言語でユーザ定義関数を記述するための機能があり、これを利用すれば、元々PL/CUDAでユーザにベタっとCUDA Cのコードを書いてもらっていた*3ところを、もっと一般的な Python + cuPy という形で代替できる。

cuPyにもカスタムGPUカーネルを記述する機能があり、cupy.RawKernelクラスを利用する。この人は裏でNVRTCを利用して、GPUカーネルの実行時コンパイルができるので、感覚としてはLL言語スクリプトを書くのとあまり大差ない。

以下にコードのサンプルを置いてみる。
このSQL関数は、①PL/Pythonを利用してcuPyの機能を呼び出し、②Arrow_Fdwからロードしたデータフレームの値の平均値を列ごとに導出する。

CREATE OR REPLACE FUNCTION custom_average(x_ident text)
RETURNS float[] AS
$$
import cupy
import cupy_strom

X = cupy_strom.ipc_import(x_ident)
nattrs = X.shape[0]
nitems = X.shape[1]
gridSz = (nitems + 2047) >> 11;

Y = cupy.zeros((nattrs))

source='''
extern "C" __global__
           __launch_bounds__(1024)
void
kern_gpu_sum(double *y, const float *x, int nitems)
{
    __shared__ float lvalues[2048];
    int     gridSz = (nitems + 2047) / 2048;
    int     colIdx = blockIdx.x / gridSz;
    int     rowBase = (blockIdx.x % gridSz) * 2048;
    int     localId = 2 * threadIdx.x;
    int     i, k;

    // Load values to local shared buffer
    x += colIdx * nitems;
    for (i=threadIdx.x; i < 2048; i+=blockDim.x)
        lvalues[i] = (rowBase + i < nitems ? x[rowBase + i] : 0.0);
    __syncthreads();

    // Run reduction operations
    for (k=0; k < 11; k++)
    {
        int     mask = ((1 << k) - 1);

        if ((threadIdx.x & mask) == 0)
            lvalues[localId] += lvalues[localId + (1<<k)];
        __syncthreads();
    }
    // Write back the total sum
    if (threadIdx.x == 0)
        atomicAdd(&y[colIdx], lvalues[0]);
}
'''
kern = cupy.RawKernel(source, 'kern_gpu_sum')
kern.__call__((gridSz * nattrs,0,0),
              (1024,0,0),
              (Y,X,nitems))
X = 0   # unmap GPU memory

return Y / nitems
$$ LANGUAGE 'plpython3u';

細かい説明は省略するが、入力値Xを2048要素ごとに領域分割し、各領域ごとに1024個のスレッドが協調して11ステップで総和を計算し、出力バッファYに書き出すというモノである。

分かりやすいように、x列、y列、z列の値がそれぞれ異なる分布を取るように初期化する。

=# INSERT INTO ft (SELECT x, pgstrom.random_int(0,1,10000)::float/100.0,
                             pgstrom.random_int(0,-7500,2500)::float/100.0,
                             pgstrom.random_int(0,5000,15000)::float/100.0
                     FROM generate_series(1,1000000) x);
=# SELECT avg(x), avg(y), avg(z) FROM ft;
       avg        |        avg        |       avg
------------------+-------------------+-----------------
 49.9972925601087 | -24.9707353391815 | 99.982088626751
(1 row)

で、件のPL/Pythonユーザ定義関数を呼び出す。
pgstrom.arrow_fdw_export_cupyがftテーブルのx列、y列、z列を抽出して作成したGPUバッファの識別子を、そのままPL/Pythonユーザ定義関数に入力し、GPUカーネルを呼び出して平均値を計算している。

=# SELECT custom_average(pgstrom.arrow_fdw_export_cupy('ft','{x,y,z}'::text[]));
                    custom_average
------------------------------------------------------
 {49.9972926015625,-24.9707353193359,99.982088671875}
(1 row)

上記のように同じ結果が出力された事と、もう一点、PL/CUDAと同じようにユーザ定義のGPUカーネル関数をPL/Python + cuPyの組み合わせで実行できることが実証できた。
イマドキだと、なかなかCUDA CでベタにGPUカーネルを書くという人は少ないらしいので、それよりはより間口の広いPython環境の道具を使えるようにしつつ、必要に応じてPL/CUDA相当のカリカリチューニングなコードを書けるようにする、というのがベターな方向性であろう。

今後は、PL/CUDAからPL/Python + cuPyやその他のPython向けモジュールという形態を推奨するようにしたい。

*1:そりゃそうだ

*2:厳密には外部テーブル⇒GPUへのデータロードが最初の一回だけ

*3:いや、しかし、PL/CUDAのコードを書いていた人なんて自分意外にいるの?いるの?