Pg2Arrowに『ぐるぐるSQL』モードをつけてみた。

先月、ツイッタランドに『ぐるぐるSQL』なるワードが降臨した。

qiita.com

これは要するに、あるクエリの結果を取得しつつ、結果行から読み出した値をキーとして別のクエリを繰り返し実行するタイプのクエリを揶揄したもので、まぁ、通信遅延やパース処理、DB側での最適化が効かない諸々の理由で、遅いであろうというのは火を見るより明らかである。自分もまさか『ぐるぐるSQL』に付き合う事になるとは露ほども思わず、以下のようなキレッキレのダジャレを書き込んでニヤニヤしてる位である。

テーブルに定義できる列数の最大値は?

さて、話は変わるが、PostgreSQLのテーブルに定義する事のできる列数の最大値はいったいいくつだろうか?
答えは 1600 列。これはinclude/access/htup_details.hに次のように記載してある。

/*
 * MaxHeapAttributeNumber limits the number of (user) columns in a table.
 * This should be somewhat less than MaxTupleAttributeNumber.  It must be
 * at least one less, else we will fail to do UPDATEs on a maximal-width
 * table (because UPDATE has to form working tuples that include CTID).
 * In practice we want some additional daylight so that we can gracefully
 * support operations that add hidden "resjunk" columns, for example
 * SELECT * FROM wide_table ORDER BY foo, bar, baz.
 * In any case, depending on column data types you will likely be running
 * into the disk-block-based limit on overall tuple size if you have more
 * than a thousand or so columns.  TOAST won't help.
 */
#define MaxHeapAttributeNumber  1600    /* 8 * 200 */

この数字はどこから来ているかというと、今でこそ PostgreSQLストレージエンジンをプラガブルにできるようになったが、PostgreSQL v11以前は heap 形式が唯一にして絶対のテーブルデータ形式であった。

heapテーブルの各タプルはどのようなデータ形式を持っているかというと、以下のHeapTupleHeaderData構造体が表現するヘッダを持ち、その後ろにユーザデータがペイロードとして載るという構造を持っている。

struct HeapTupleHeaderData
{
    union
    {
        HeapTupleFields t_heap;
        DatumTupleFields t_datum;
    }           t_choice;
    ItemPointerData t_ctid;     /* current TID of this or newer tuple (or a
                                 * speculative insertion token) */
    uint16      t_infomask2;    /* number of attributes + various flags */
    uint16      t_infomask;     /* various flag bits, see below */
    uint8       t_hoff;         /* sizeof header incl. bitmap, padding */

    /* ^ - 23 bytes - ^ */
    bits8       t_bits[FLEXIBLE_ARRAY_MEMBER];  /* bitmap of NULLs */

    /* MORE DATA FOLLOWS AT END OF STRUCT */
};

t_choice.t_heapにはxmin, xmaxなどのMVCCに関連する情報が、t_infomaskおよびt_infomask2には様々なタプルの属性が記録されている。
このタプルがどこかにNULL値を含む場合、t_bitsからはじまる配列はNULL-bitmapとなり、どの列がNULL値であるのかを表現する。
また、t_hoffは上記のNULL-bitmapを含むヘッダ全体の大きさを格納する。つまり、ユーザデータが格納されたペイロード部分には((char *)htup + htup->t_hoff)で参照できることになり、この形式は広くPostgreSQLの実装で利用されている。

しかしである。t_hoffは8bit値であるため、(そうそう困ることはないものの)NULL-bitmapの長さは、この8bit値で表現できる長さに収まらねばならないという制限がある。
そうすると、t_hoffでポイントされる64bit-alignedな最大値248bytesから、他のヘッダ要素23bytes、および以前はヘッダ要素の一部だったOID列の4bytes分を引くと、(248 - 23 - 4) = 221bytes = 1768bits という事になり、あとは削除された列などのマージン分も含めて 1600 をリミットにすると記述がある。

つまり、PostgreSQLがテーブルに定義できる列数の上限は heap 形式のNULL-bitmap長によって制約されている、という事である。


だがしかし。駄菓子菓子。
CREATE TABLE(や、一部処理を共有する CREATE FOREIGN TABLE)やALTER TABLE ... ADD COLUMNでこれらの列数制限はチェックされているが、たとえば外部テーブルなど、heap形式を使わないテーブルに対してこのような制限を加えることは妥当であろうか?

試しに、列数が2000のArrowファイルをマップする外部テーブルを作成してみた。
普通にCREATE FOREIGN TABLEしても怒られるだけなので、特権ユーザでシステムカタログをゴニョゴニョしている*1

postgres=# \d widetest
                   Foreign table "public.widetest"
  Column   |   Type   | Collation | Nullable | Default | FDW options
-----------+----------+-----------+----------+---------+-------------
 object_id | integer  |           |          |         |
 c0000     | smallint |           |          |         |
 c0001     | smallint |           |          |         |
 c0002     | smallint |           |          |         |
    :          :          :      :      :
 c1998     | smallint |           |          |         |
 c1999     | smallint |           |          |         |
Server: arrow_fdw
FDW options: (file '/home/kaigai/wide2000.arrow')
postgres=# select * from widetest;
ERROR:  target lists can have at most 1664 entries

読み出そうとする列数が多すぎる場合、怒られが発生する。
(これはSQL処理の過程で中間データとしてHeapTupleを作成する事があり得るので妥当な制限)

postgres=# select object_id, c0164, c1275, c1878 from widetest where c1997 < 10;
 object_id | c0164 | c1275 | c1878
-----------+-------+-------+-------
        97 |     4 |     4 |     4
       136 |     2 |     2 |     2
       285 |     5 |     5 |     5
       311 |     6 |     6 |     6
       453 |     1 |     1 |     1
       623 |     9 |     9 |     9
       763 |     6 |     6 |     6
       859 |     6 |     6 |     6
       888 |     9 |     9 |     9
       915 |     9 |     9 |     9
(10 rows)

このように、読み出すべき列数を絞ってやると正しく動作する*2

Pg2Arrowの『ぐるぐるSQL』モード

さて、非常に列数の多いテーブルを使いたい時に、Arrow_Fdwのように内部データ形式が heap でない場合には、MaxHeapAttributeNumberを越える列数のテーブルを定義しても問題ない事が分かった。
一方でSELECT * FROM widetestがコケたように、PostgreSQLのデータをApache Arrow形式に変換するのは一苦労である。なにしろ、一度に1600列だけしか出力できないのであるので。

そもそもPostgreSQLのテーブルに格納されている時点で、たとえ生データが数千列を持つようなデータであっても、数百列ごとに複数のテーブルに分割されているハズで、例えば同じobject_idでテーブルA、テーブルB、テーブルC、...を検索して結合できるような構造になっているハズである。
それであれば、Pg2Arrowでテーブルをダンプしながら、読み出したキー値を元に他のテーブルと結合しながら処理を進めれば良いではないか、という点に思い至った。

まさしく『ぐるぐるSQL』である。

追加したオプションは--inner-join=COMMAND--outer-join=COMMANDである。
COMMANDには$(Field_Name)という形式で、-cまたは-tで指定した問合せ結果のフィールド名を指定する。
その名の如く、--inner-joinの場合は従属問い合わせの結果が空であった場合には、その行を生成しない。--outer-joinの場合は、従属問い合わせの結果をNULL値として埋めるという違いがある。

簡単な例で試してみる。

以下のコマンドにより、テーブルt_aを読み出しつつ、その結果id列に等しいid値を持つテーブルt_bの行を読み出す。

$ ./pg2arrow -d postgres -t t_a --inner-join 'SELECT b1,b2,b3,b4 FROM t_b WHERE $(id) = id' -o /tmp/test1.arrow

以下のように、PyArrowを用いて読み出すと、id値の等しい行だけが--inner-joinによって結合され、Apache Arrow形式で書き込まれている事がわかる。

$ python3
Python 3.6.8 (default, Aug 24 2020, 17:57:11)
[GCC 8.3.1 20191121 (Red Hat 8.3.1-5)] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> import pyarrow as pa
>>> X = pa.RecordBatchFileReader('/tmp/test1.arrow')
>>> X.read_all().to_pandas()
     id         a1         a2         a3         a4         a5   b1  b2  b3  b4
0     2  34.893696  59.925064   6.358087  81.492798   6.011221   10  38  37  40
1     4  10.005774  77.520073  17.843210  67.404022  52.072567   41  23  65  53
2     6  26.413124  36.939953  94.206100  26.846878   7.516258   57  18  45  27
3     8  43.582317  33.237537  18.829145  55.289623  21.512911   71  16  17  66
4    10  73.309898  93.423172  87.080872  37.176331  87.864304   40  79  76  47
5    12  37.077366  26.679760  85.896881  37.653671   1.374519   39  33  66  10
6    14  61.082752  95.813309   9.475588  50.992413  62.433903   39  20  10  70
7    16  42.964298  88.846252  78.952682  24.310852  51.272732   63  80  63  97
8    18  69.875244  39.434425  58.692245  18.880169  74.676041   44  76   5  66
9    20  54.711720  22.910282  57.094353  37.765366  95.790314   67  27  99  29
10   22  83.051926  67.801826  74.100807  64.762413  27.869209  100  54  95  16
11   24  97.913574  84.459969  40.165981  34.431095  47.260651   56  23  26  14
 :     :           :                  :

一方、--outer-joinモードを使うと、id値に一致する従属問い合わせの結果が存在しない場合、そのフィールドがNULLで埋められる。
テーブルt_bのid値は偶数のみであるため、奇数に対応するものはNaNとなっている。

$ ./pg2arrow -d postgres -t t_a --outer-join 'SELECT b1,b2,b3,b4 FROM t_b WHERE $(id) = id' -o /tmp/test2.arrow

$ python3
Python 3.6.8 (default, Aug 24 2020, 17:57:11)
[GCC 8.3.1 20191121 (Red Hat 8.3.1-5)] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> import pyarrow as pa
>>> X = pa.RecordBatchFileReader('/tmp/test2.arrow')
>>> X.read_all().to_pandas()
     id         a1         a2         a3         a4         a5    b1    b2    b3    b4
0     2  34.893696  59.925064   6.358087  81.492798   6.011221  10.0  38.0  37.0  40.0
1     3  28.479965  70.522125  65.990173  47.762203  53.936710   NaN   NaN   NaN   NaN
2     4  10.005774  77.520073  17.843210  67.404022  52.072567  41.0  23.0  65.0  53.0
3     5  50.943336  67.333290  26.790262  72.249977  96.062378   NaN   NaN   NaN   NaN
4     6  26.413124  36.939953  94.206100  26.846878   7.516258  57.0  18.0  45.0  27.0
..  ...        ...        ...        ...        ...        ...   ...   ...   ...   ...

分野によっては非常に多くの列を定義する事がある。
その場合、PostgreSQLにデータを突っ込む場合だと1600、MySQLは少し余裕があり4000個*3Oracleだと1000個*4という事のようだが、Apache Arrow形式でデータを保存し、それをArrow_Fdwでマッピングするという形にすれば、元々は同じテーブルに置かれていたデータを実行時にJOINで再構築する手間が省けるほか、列データ形式によるI/O削減効果で、多くの場合は高速化も期待できる。

*1:良い子は真似してはいけません

*2:なお、データ作成の際に同じ列をひたすら複製したので、cXXXX列の値は全て同じである

*3:MySQL :: MySQL 8.0 Reference Manual :: 8.4.7 Limits on Table Column Count and Row Size

*4:Logical Database Limits

GPUメモリストア(Gstore_Fdw)

この記事は「PostgreSQL Advent Calendar 2020」の 16日目です。

GPUPostGISの他に、今年のPG-Stromの機能強化のうち比較的大きめのものについてもご紹介したいと思います。

GPUメモリストア(Gstore_Fdw)とは

GPUバイスメモリ上に予め確保した領域にデータを保存し、これをPostgreSQLのFDW(Foreign Data Wrapper)を通じて読み書きする機能。GpuScan/GpuJoin/GpuPreAggといったPG-Stromの提供する各種ロジックにおいてデータソースとして活用する事ができ、その場合、ストレージやホストRAM上のバッファからデータを読み出す必要がないため、その分の処理を節約する事ができる。

この手の機能を持ったGPU-DBというのは他にもあるが、Gstore_Fdwのポイントは更新系ワークロードもきちんと考慮している点。通常、GPUバイスメモリを更新するには、PCI-Eバスを経由してデータを転送する必要があるが、これのレイテンシが馬鹿にならない。
大雑把に言って、ホストRAMの更新が数十nsのオーダーである一方、PCI-Eバスを介したGPUバイスメモリの読み書きには数十usを要する。つまり千倍差である。*1

では Gstore_Fdw ではどうしているのか?
およそ数百~数万行分の更新ログを溜めておいて、あとで一気にGPUへ転送し、GPU側では数千コアを同時に動かして更新ログをGPUバイスメモリ上のデータストアに適用する。そうする事で、比較的大きなPCI-Eのレイテンシも一行あたりに直せば大した値ではなくなる。

更新ログをGPUに転送するのは以下の3つのタイミング。

  • 未適用の更新ログがあり、それが一定の閾値を越えた場合。
  • 未適用の更新ログがあり、最終更新から一定の時間を経過した場合。
  • GPUバイスメモリを参照する分析クエリを実行する場合。

3つ目がポイントで、ここでは暗黙の裡に分析クエリの実行頻度は更新クエリよりも遥かに少ないという仮定を置いているが、GPUバイスメモリ側の更新がどれだけ遅延したとしても、分析クエリの実行より前に最新状態にリフレッシュできていれば問題ない、という事である。

この機能の使い道として想定しているのは、携帯電話や自動車、ドローンなど、GPSによる位置情報を時々刻々更新するようなパターンのワークロードで、GPUPostGISと組み合わせて利用するパターンを念頭に置いている。
この手のログデータは意外とデータサイズは小さい*2が、更新の頻度が極めて高いという特徴を持つ。

Gstore_Fdw で外部テーブルを定義してみる。

では早速、Gstore_Fdwを用いた外部テーブルを定義してみることにする。
開発用サーバには2台のGPUが搭載されているので、PostgreSQLパーティション機能を用いて両方のGPUにデータを振り分ける。

まず、パーティションのRootとなるテーブルを定義。

=# create table fpoints (
     dev_id int,
     ts timestamp,
     x float,
     y float)
   partition by hash ( dev_id );
CREATE TABLE

続いて、パーティションLeafとしてGstore_Fdw外部テーブルを定義。

=# create foreign table fpoints_p0
     partition of fpoints for values with (modulus 2, remainder 0)
     server gstore_fdw
     options (base_file '/opt/pgdata12/fpoints_p0.base',
              redo_log_file '/opt/pmem/fpoints_p0.redo',
              gpu_device_id '0',
              max_num_rows '8000000',
              primary_key 'dev_id');
CREATE FOREIGN TABLE
=# create foreign table fpoints_p1
     partition of fpoints for values with (modulus 2, remainder 1)
     server gstore_fdw
     options (base_file '/opt/pgdata12/fpoints_p1.base',
              redo_log_file '/opt/pmem/fpoints_p1.redo',
              gpu_device_id '1',
              max_num_rows '8000000',
              primary_key 'dev_id');
CREATE FOREIGN TABLE
  • base_fileというのはデータストアを記録するためのファイルで、再起動やクラッシュ後のリカバリのために使用される。
  • redo_log_fileというのは更新ログを記録するためのファイルで、Persistent Memory領域を使用するのが推奨。今回は、Intel製DCPMM[128GB]をマウントした/opt/pmem以下のファイルを指定している。
  • gpu_device_idは使用するGPUを指定し、max_num_rowsはバッファを確保する最大行数を指定する。
  • primary_keyには主キーとして振る舞うカラムを指定する。

これらの外部テーブルを定義すると、ログには以下のように表示され、GPUバイスメモリをそれぞれ 300MB 程度確保した事がわかる。

2020-12-16 08:51:48.668 UTC [11130] LOG:  gstore_fdw: initial load [fpoints_p0] - main 324000640 bytes, extra 0 bytes
2020-12-16 08:51:58.265 UTC [11129] LOG:  gstore_fdw: initial load [fpoints_p1] - main 324000640 bytes, extra 0 bytes

初期データの投入と更新クエリの実行

続いて、GPUメモリストアに初期データの投入を行う。これはINSERTでもCOPY FROMでもよい。

=# insert into fpoints (select x, now(), 100*random(), 100*random() from generate_series(0,12000000) x);
INSERT 0 12000001

Hashパーティショニングを使用しているため、概ね均等にデータが分散されていることが分かる。

postgres=# select count(*) from fpoints;
  count
----------
 12000001
(1 row)

postgres=# select count(*) from fpoints_p0;
  count
---------
 5998934
(1 row)

postgres=# select count(*) from fpoints_p1;
  count
---------
 6001067
(1 row)

このデータに対して、位置情報とタイムスタンプの更新を想定し、以下のようなクエリをpgbenchで実行してみることにする。
tsはタイムスタンプの更新、x,yはそれぞれ倍精度浮動小数点による座標のイメージである。

UPDATE fpoints SET ts = now(),
                   x = random() * 100.0,
                   y = random() * 100.0
             WHERE dev_id = (select (random() * 250000)::int * 48 + :client_id)

実行結果は以下のような感じ。

$ pgbench -n -f mytest.sql -c 48 -j 48 -T 15 postgres
transaction type: mytest.sql
scaling factor: 1
query mode: simple
number of clients: 48
number of threads: 48
duration: 15 s
number of transactions actually processed: 1471332
latency average = 0.490 ms
tps = 97912.340285 (including connections establishing)
tps = 97950.056405 (excluding connections establishing)

これに伴い、ログにも溜まった更新ログをGPUへ転送し、これをGPUバイスメモリ上のデータストアに適用した事が出力されている。

2020-12-16 09:24:27.484 UTC [11129] LOG:  gstore_fdw: Log applied (nitems=1905688, length=124817704, pos 521268808 => 612741824)
2020-12-16 09:24:28.294 UTC [11130] LOG:  gstore_fdw: Log applied (nitems=2037144, length=131644512, pos 521086144 => 618869072)
2020-12-16 09:24:42.567 UTC [11129] LOG:  gstore_fdw: Log applied (nitems=823198, length=76151024, pos 612741824 => 652255360)
2020-12-16 09:24:43.390 UTC [11130] LOG:  gstore_fdw: Log applied (nitems=824461, length=76733600, pos 618869072 => 658443232)
2020-12-16 09:24:57.686 UTC [11129] LOG:  gstore_fdw: Log applied (nitems=1574755, length=118524736, pos 652255360 => 727843584)
2020-12-16 09:24:58.497 UTC [11130] LOG:  gstore_fdw: Log applied (nitems=1436306, length=111847320, pos 658443232 => 727385888)

分析クエリの実行はこのような形で、fpointsテーブル(実際にはその配下のfpoints_p0とfpoints_p1外部テーブル)から、指定範囲内に属する座標を抽出するというような形での利用を想定している。
これは別に単純検索である必要はなく、例えば、前回の記事で紹介したようなGiSTインデックスを用いた多対多の範囲検索であってもよい。

=# SELECT count(*) FROM fpoints
 WHERE st_contains('polygon ((10 10,90 10,90 12,12 12,12 88,90 88,90 90,10 90,10 10))', st_makepoint(x,y));
 count
--------
 565047
(1 row)

Time: 254.274 ms

Gstore_Fdwの今後

現在、PG-Strom v3.0のリリースに向けてマルチGPUの対応をはじめとした諸々の改善や、テストケースの作成を行っています。
乞うご期待!

ちなみに、Advent Calendarとしてはずいぶんギリギリの公開となってしまいましたが、こういった悲しい事故がありました。

*1:HPC-oriented Latency Numbers Every Programmer Should Know · GitHub

*2:例えば、1デバイスあたり1kBのデータを保持していても、100万台分で1GBにしかならない。

GPU版PostGISとGiSTインデックス対応

今年も早いもので気がついたら Advent Calendar の季節ですが、今回のこちらの記事は RDBMS-GIS(MySQL,PostgreSQLなど) Advent Calendar 2020 - Qiita の 12/5(土) のものです。

2020年はこの辺からスタートして、かなり性能的に面白いところまで持ってくることができたので、年末の締めにGPUPostGISとGiSTインデックス対応についてご紹介させていただこうかと。

何をやりたいか?

元々、PG-StromではIoT/M2M領域でありがちな『時間経過とともに溜まっていくログデータの処理、分析』にフォーカスを当てていたが、携帯電話や自動車といった移動体デバイスの生成するログを考えると、その中にGPSの生成する『緯度、経度』情報が含まれる事もしばしばである。
ホンマに商売になるのか分からないところではあったが、こういったデータをPostGISを使って分析したい、という問合せはちょくちょくもらっていたので、相応に需要はあるのだろうという事で、春先からGPUPostGIS関数の実装を進めていた。

ターゲットとするワークロードは以下のようなもので、携帯電話や自動車から収集したログデータ(これには、典型的にはデバイスID、タイムスタンプ、緯度・経度、その他の属性が含まれる)と、例えばプッシュ広告を配信する地域や、事故・渋滞情報を配信するエリアを定義するエリア情報を突合するという処理を想定している。
つまり、単純化すると多角形領域の中に含まれる点を抽出するという処理になる。

これをナイーブに実装するだけであれば、st_containsst_dwithinといった関数をGPUで実装するだけで*1よいのだが、例えば10万個の多角形領域 × 1000万個の座標データの組み合わせを検査するとなればその組み合わせは1兆通りにのぼり、1枚あたり数千コアを搭載するGPUといえども、なかなかにしんどい処理となる。

こういったケースで絞り込みを効率的に行うため、PostgreSQL/PostGISにはGiSTインデックスという仕組みが存在する。

GiSTインデックス(R木)

位置データの場合、緯度・経度という二次元的な広がりがあるため、ある値が別の値より大きいのか、小さいのかという関係を定義する事ができない。
そのため、値の大小関係を利用して目的の要素へと直線的に木構造を降下していくB木のようにはいかないが、それでも効率的に絞り込みを行うためR木と呼ばれるインデックス構造を作る事ができる。

R木インデックスでは、多角形要素(Polygon)はどれだけ複雑な形状を持っていようとも、その形状を完全に包含する長方形(Bounding-Box)であるとして扱われる。ある点要素がその長方形の中に含まれていれば多角形要素の中に含まれる可能性があるが、その長方形の外側であれば、複雑な多角形要素との当たり判定を行うまでもなく『包含されていない』と結果を返す事ができる。

以下の図のケースでは、R木インデックスのRoot要素であるR1はその子要素であるR3、R4、R5を完全に包含し、R2は同じくその子要素であるR6、R7を完全に包含する長方形として定義されてる。
もし以下の図の位置が検索キーとして与えられた場合、R1およびR2は共に検索キーを包含し、その子要素をチェックする事となる。

次に、R1の子要素R3、R4、R5と検索キーの重なりを検査する。すると、R4は検索キーを包含するが、R3とR5は検索キーを包含しない。
したがって、R3の子要素R8、R9、R10およびR5の子要素R13、R14はそもそもチェックするまでもなく、検索キーを包含しないという事になる。

次に、R4の子要素R11とR12と検索キーの重なりを検査する。すると、R12は検索キーを包含するが、R11は包含しない。
この階層はR木のLeaf要素であるのでこれ以上探索が深くなることはなく、R12がインデックスする多角形要素が本当に検索キーを包含するのかどうか、st_contains関数などを用いてチェックする。

一方、Root要素で検索キーを包含していたR2の子要素R6、R7は共に検索キーを包含しないため、ここでインデックスの探索は打ち切りとなる。

この例では単純化されているが、実際にはR木の一階層を降下する毎に100要素程度の『当たり判定』をシーケンシャルに行っていくため、実はインデックス探索とはいえ、そこそこ計算ヘビーな処理にはなってしまう。
一方でGPU向きの性質としては、突合処理中のR木インデックスはRead-Onlyなデータ構造であるため並列度を上げやすく、その気になれば数千コアを同時に稼働してのインデックス探索を行う事ができる。

簡単なベンチマーク

PG-Stromに実装したGPUPostGISの機能には、多角形や点などジオメトリ要素間の包含関係を判定するst_contains関数も含まれており、これを用いてのエリア定義情報×座標情報の突合処理のベンチマークを行ってみる事にする。

サンプルとして使用するのは、国土地理院の公開している市区町村の形状データと、日本列島を概ね包含する領域にランダムに打った点との突合処理。
この中で、東京都に属する市区町村に含まれるエリアに打たれた点の数を、市区町村ごとにカウントするものとする。

実行するクエリは以下の通り。geo_japanテーブルには市町村の形状データを、fgeopointテーブルにはランダムに生成した点のデータを挿入する。

SELECT n03_001,n03_004,count(*)
  FROM geo_japan j, fgeopoint p
 WHERE st_contains(j.geom, st_makepoint(x,y))
   AND j.n03_001 like '東京都’
GROUP BY n03_001,n03_004;

ランダムなデータの生成は以下の通り。

=# CREATE TABLE
    geopoint (
      gid int primary key,
      x   float8,
      y   float8);
CREATE TABLE
=# INSERT INTO geopoint (SELECT x, pgstrom.random_float(0, 123.0, 154.2),
                                   pgstrom.random_float(0, 20.0, 46.2)
                           FROM generate_series(1,10000000) x);
INSERT 0 10000000
postgres=# SELECT * FROM geopoint LIMIT 4;
 gid |         x          |         y
-----+--------------------+--------------------
   1 |  133.5737876430963 | 23.438477765972948
   2 | 133.47253950874904 | 22.512966607004856
   3 |  136.9879882356096 |  22.22637613640464
   4 |  126.3652188637132 | 27.186177021165463
(4 rows)

GPU側はこれと同じ内容をGPUメモリストアに挿入する。
GPUメモリストアに関しては、後日、PostgreSQL Advent Calendar 2020の16日目でも記載する予定ですので、そちらをご覧あれ。

=# CREATE FOREIGN TABLE
    fgeopoint (
      gid int,
      x   float,
      y   float)
    SERVER gstore_fdw
    OPTIONS (gpu_device_id '0',
             base_file '/opt/nvme/fgeopoint.base',
             redo_log_file '/opt/pmem/fgeopoint.redo',
             max_num_rows '12000000',
             primary_key 'gid');
CREATE FOREIGN TABLE
=# INSERT INTO fgeopoint (SELECT * FROM geopoint);
INSERT 0 10000000

これで準備が整った。さぁ、実行してみる事にしよう。
先ずはCPUのみのケース。予め並列度を上げておく。

=# set pg_strom.enabled = off;
SET
=# set max_parallel_workers_per_gather = 99;
SET
=# SELECT n03_001,n03_004,count(*)
  FROM geo_japan j, geopoint p
 WHERE st_contains(j.geom, st_makepoint(x,y))
   AND j.n03_001 like '東京都'
GROUP BY n03_001,n03_004;
 n03_001 |  n03_004   | count
---------+------------+-------
 東京都  | あきる野市 |    90
 東京都  | 三宅村     |    53
 東京都  | 三鷹市     |    14
    :            :
 東京都  | 青ヶ島村   |     4
 東京都  | 青梅市     |   109
(63 rows)

Time: 30539.097 ms (00:30.539)

約30秒と言ったところ。

なおEXPLAIN ANALYZEを見てみると、応答時間30.7秒のうち30.68秒をNested Loop+Index Scanの箇所で消費しており、ここがワークロードの肝である事がわかる。

postgres=# EXPLAIN ANALYZE
SELECT n03_001,n03_004,count(*)
  FROM geo_japan j, geopoint p
 WHERE st_contains(j.geom, st_makepoint(x,y))
   AND j.n03_001 like '東京都'
GROUP BY n03_001,n03_004;
                                                                                QUERY PLAN
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 Finalize GroupAggregate  (cost=7483675086.12..7483829745.31 rows=4858 width=29) (actual time=30709.855..30710.080 rows=63 loops=1)
   Group Key: j.n03_001, j.n03_004
   ->  Gather Merge  (cost=7483675086.12..7483829550.99 rows=19432 width=29) (actual time=30709.838..30732.270 rows=244 loops=1)
         Workers Planned: 4
         Workers Launched: 3
         ->  Partial GroupAggregate  (cost=7483674086.06..7483826236.39 rows=4858 width=29) (actual time=30687.466..30687.572 rows=61 loops=4)
               Group Key: j.n03_001, j.n03_004
               ->  Sort  (cost=7483674086.06..7483712111.50 rows=15210175 width=21) (actual time=30687.452..30687.475 rows=638 loops=4)
                     Sort Key: j.n03_001, j.n03_004
                     Sort Method: quicksort  Memory: 73kB
                     Worker 0:  Sort Method: quicksort  Memory: 74kB
                     Worker 1:  Sort Method: quicksort  Memory: 74kB
                     Worker 2:  Sort Method: quicksort  Memory: 76kB
                     ->  Nested Loop  (cost=0.41..7481859623.72 rows=15210175 width=21) (actual time=71.496..30686.278 rows=638 loops=4)
                           ->  Parallel Seq Scan on geopoint p  (cost=0.00..88695.29 rows=2500029 width=16) (actual time=0.012..207.553 rows=2500000 loops=4)
                           ->  Index Scan using geo_japan_geom_idx on geo_japan j  (cost=0.41..2992.66 rows=1 width=1868) (actual time=0.012..0.012 rows=0 loops=10000000)
                                 Index Cond: (geom ~ st_makepoint(p.x, p.y))
                                 Filter: (((n03_001)::text ~~ '東京都'::text) AND st_contains(geom, st_makepoint(p.x, p.y)))
                                 Rows Removed by Filter: 0
 Planning Time: 0.156 ms
 Execution Time: 30732.422 ms
(21 rows)

次に、GPUによるGiSTインデックスの探索を試してみる。
使用したのは、先週届いたばかりの最新鋭モデル、NVIDIA A100。

こやつを利用して実行した結果がコレ。

=# SELECT n03_001,n03_004,count(*)
  FROM geo_japan j, fgeopoint p
 WHERE st_contains(j.geom, st_makepoint(x,y))
   AND j.n03_001 like '東京都'
GROUP BY n03_001,n03_004;
 n03_001 |  n03_004   | count
---------+------------+-------
 東京都  | あきる野市 |    90
 東京都  | 三宅村     |    53
    :            :
 東京都  | 青ヶ島村   |     4
 東京都  | 青梅市     |   109
(63 rows)

Time: 316.673 ms

どやっ!!316msで100倍近い高速化!

EXPLAIN ANALYZEを覗いてみると、GpuJoinのdepth=1でGpuGiSTJoinが選択され、13.28MBのGiSTインデックスをGPUへロードして結合処理を行っている事がわかる。

=# EXPLAIN ANALYZE
SELECT n03_001,n03_004,count(*)
  FROM geo_japan j, fgeopoint p
 WHERE st_contains(j.geom, st_makepoint(x,y))
   AND j.n03_001 like '東京都'
GROUP BY n03_001,n03_004;
                                                                QUERY PLAN
------------------------------------------------------------------------------------------------------------------------------------------
 GroupAggregate  (cost=6141933.29..6142042.59 rows=4858 width=29) (actual time=329.118..329.139 rows=63 loops=1)
   Group Key: j.n03_001, j.n03_004
   ->  Sort  (cost=6141933.29..6141945.43 rows=4858 width=29) (actual time=329.107..329.110 rows=63 loops=1)
         Sort Key: j.n03_001, j.n03_004
         Sort Method: quicksort  Memory: 29kB
         ->  Custom Scan (GpuPreAgg)  (cost=6141575.10..6141635.83 rows=4858 width=29) (actual time=328.902..328.911 rows=63 loops=1)
               Reduction: Local
               Combined GpuJoin: enabled
               ->  Custom Scan (GpuJoin) on fgeopoint p  (cost=3759781.06..6712457.81 rows=60840000 width=21) (never executed)
                     Outer Scan: fgeopoint p  (cost=0.00..100000.00 rows=10000000 width=16) (never executed)
                     Depth 1: GpuGiSTJoin(plan nrows: 10000000...60840000, actual nrows: 10000000...2553)
                              HeapSize: 7841.91KB (estimated: 3113.70KB), IndexSize: 13.28MB
                              IndexFilter: (j.geom ~ st_makepoint(p.x, p.y)) on geo_japan_geom_idx
                              Rows Fetched by Index: 4952
                              JoinQuals: st_contains(j.geom, st_makepoint(p.x, p.y))
                     ->  Seq Scan on geo_japan j  (cost=0.00..8928.24 rows=6084 width=1868) (actual time=0.164..17.723 rows=6173 loops=1)
                           Filter: ((n03_001)::text ~~ '東京都'::text)
                           Rows Removed by Filter: 112726
 Planning Time: 0.344 ms
 Execution Time: 340.415 ms
(20 rows)

まとめ

ワークロードによってはPostGIS関数の処理を大幅に高速化できる可能性があるので、PG-StromとGPUPostGIS、ぜひ使ってください。

*1:いや、そこそこ大変ではあるが…。

mysql2arrowでMySQLからデータを抜く

以前からPG-Stromのパッケージにpg2arrowというユーティリティを同梱しており、これを使うと、PostgreSQLに投げたクエリからApache Arrow形式のファイルを作成する事ができる。

kaigai.hatenablog.com
qiita.com

昨年、当初のバージョンを作った時から、内部的には色々ゴチャゴチャ変わっていて*1、Arrow_Fdwとコードを共有するための改良や、RDBMSへの接続に固有の部分だけを別ファイルに切り出すという事をやっていた。
これは、PostgreSQLだけをデータソースにするのではなく、Webアプリやゲームの業界でよく使われる MySQL や、将来的にはNoSQLなどへも簡易に対応できるようにという意味での基礎工事のようなものである。今回はまず、これを MySQL に対応させてみた。

MySQLからWebアプリやゲームのログ情報を Apache Arrow 形式で抜き出し、これを単純なファイルとして NVME-SSD 上のボリュームに保存する。
これらのファイルを Arrow_Fdw 外部テーブルを用いて PostgreSQLマッピングすれば、解析系DBにわざわざデータを再度インポートしなくても、Webアプリやゲームのログを集計処理や異常検知に回す事ができるようになる。
加えて、PG-StromであればArrow_Fdw外部テーブルに対してSSD-to-GPU Direct SQLを実行する事ができるので、きちんとシステムを設計してやれば、秒速で10億レコード超を処理する事だって不可能ではない。

使い方自体はそれほど複雑なものではない。
大半のオプションが pg2arrow と共通*2で、今回の機能強化に合わせて-tオプションを追加した程度である。

$ mysql2arrow --help
Usage:
  mysql2arrow [OPTION] [database] [username]

General options:
  -d, --dbname=DBNAME   Database name to connect to
  -c, --command=COMMAND SQL command to run
  -t, --table=TABLENAME Table name to be dumped
      (-c and -t are exclusive, either of them must be given)
  -o, --output=FILENAME result file in Apache Arrow format
      --append=FILENAME result Apache Arrow file to be appended
      (--output and --append are exclusive. If neither of them
       are given, it creates a temporary file.)

Arrow format options:
  -s, --segment-size=SIZE size of record batch for each

Connection options:
  -h, --host=HOSTNAME  database server host
  -p, --port=PORT      database server port
  -u, --user=USERNAME  database user name
  -P, --password=PASS  Password to use when connecting to server

Other options:
      --dump=FILENAME  dump information of arrow file
      --progress       shows progress of the job
      --set=NAME:VALUE config option to set before SQL execution
      --help           shows this message

Report bugs to <pgstrom@heterodb.com>.

簡単な例でデータを抽出してみる。
なお-tオプションは、SELECT * FROM tablenameの省略形。

$ mysql2arrow -d mysql -u root -t t1 -o /dev/shm/hoge.arrow

生成された Apache Arrow ファイルのスキーマ定義、データの配置はこんな感じ

$ mysql2arrow --dump /dev/shm/hoge.arrow
[Footer]
{Footer: version=V4, schema={Schema: endianness=little, fields=[{Field: name="id", nullable=true, type={Int32}, children=[], custom_metadata=[]}, {Field: name="a", nullable=true, type={Int32}, children=[], custom_metadata=[]}, {Field: name="b", nullable=true, type={Float64}, children=[], custom_metadata=[]}, {Field: name="c", nullable=true, type={Utf8}, children=[], custom_metadata=[]}, {Field: name="d", nullable=true, type={Timestamp: unit=sec}, children=[], custom_metadata=[]}], custom_metadata=[{KeyValue: key="sql_command" value="SELECT * FROM t1"}]}, dictionaries=[], recordBatches=[{Block: offset=472, metaDataLength=360 bodyLength=60480}]}
[Record Batch 0]
{Block: offset=472, metaDataLength=360 bodyLength=60480}
{Message: version=V4, body={RecordBatch: length=1000, nodes=[{FieldNode: length=1000, null_count=0}, {FieldNode: length=1000, null_count=26}, {FieldNode: length=1000, null_count=18}, {FieldNode: length=1000, null_count=0}, {FieldNode: length=1000, null_count=17}], buffers=[{Buffer: offset=0, length=0}, {Buffer: offset=0, length=4032}, {Buffer: offset=4032, length=128}, {Buffer: offset=4160, length=4032}, {Buffer: offset=8192, length=128}, {Buffer: offset=8320, length=8000}, {Buffer: offset=16320, length=0}, {Buffer: offset=16320, length=4032}, {Buffer: offset=20352, length=32000}, {Buffer: offset=52352, length=128}, {Buffer: offset=52480, length=8000}]}, bodyLength=60480}

Python (PyArrow) で読み込んでみるとこんな感じですね。

$ python
Python 3.6.8 (default, Oct  7 2019, 17:58:22)
[GCC 8.2.1 20180905 (Red Hat 8.2.1-3)] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> import pyarrow as pa
>>> f = pa.ipc.open_file('/dev/shm/hoge.arrow')
>>> f.get_record_batch(0).to_pandas()
       id      a           b                                 c                   d
0       1  750.0  884.851090  c4ca4238a0b923820dcc509a6f75849b 2020-11-25 17:14:56
1       2  962.0  373.533847  c81e728d9d4c2f636f067f89cc14862c 2019-03-26 01:19:29
2       3  287.0  384.895995  eccbc87e4b5ce2fe28308fd9f2a7baf3 2018-11-04 21:55:32
3       4  573.0  890.063600  a87ff679a2f3e71d9181a67b7542122c 2023-05-14 15:24:37
4       5  948.0  778.885925  e4da3b7fbbce2345d7772b0674a318d5 2023-01-18 16:41:12
..    ...    ...         ...                               ...                 ...
995   996 -295.0  424.007169  0b8aff0438617c055eb55f0ba5d226fa 2017-08-15 01:40:31
996   997 -849.0  648.545034  ec5aa0b7846082a2415f0902f0da88f2 2023-05-23 08:58:55
997   998  530.0  865.244230  9ab0d88431732957a618d4a469a0d4c3 2024-07-20 14:13:06
998   999  244.0   96.534528  b706835de79a2b4e80506f582af3676a 2018-08-10 01:42:04
999  1000  997.0  157.958900  a9b7ba70783b617e9998dc4dd82eb3c5 2016-12-27 08:06:44

[1000 rows x 5 columns]

ビルドは PG-Strom のモジュールと一緒にやればよいのですが、mysql-develパッケージをインストールしていない人もいるという想定で((なおpostgresql-develパッケージは全人類がインストールするという想定で))、makeの実行時にWITH_MYSQL2ARROW=1を付加します。

$ make WITH_MYSQL2ARROW=1
gcc -D__MYSQL2ARROW__=1 -D_GNU_SOURCE -g -Wall -I ../src -I ../utils -I /usr/local/pgsql-11/include/server -I/usr/include/mysql -m64  -L/usr/lib64/mysql  -Wl,-rpath,-L/usr/lib64/mysql ../utils/sql2arrow.c ../utils/mysql_client.c ../src/arrow_nodes.c ../src/arrow_write.c -o ../utils/mysql2arrow -lmysqlclient

追記モードで異常終了した時のファイルの回復

もう一点。MySQL対応にするついでに、以前からあった設計上の問題の修正を行っている。

pg2arrowmysql2arrow--appendを指定し、追記モードでSQLの処理結果をApache Arrowファイルに追加する場合、以前のエントリで紹介したように、ファイル末尾のフッター領域を上書きして新しいデータを追加し、最後にフッター領域を再構築する。

kaigai.hatenablog.com

この時、SQLの異常終了やコマンド自体のバグによってプロセスが異常終了してしまったら、元々のApache Arrowファイルが破損したまま残ってしまう事になっていた。
これを修正するため、最新版では元々のApache Arrowファイルのフッタ領域の内容(このサイズ自体は大した量ではないので)を別の領域に退避し、シグナルハンドラとon_exit()ハンドラを用いて、終了コード 0 以外でプロセスが exit したり、SIGSEGVやSIGBUSを受け取った場合にはこれを元の位置に書き戻すという処理を行っている。

例えば、6GB程度の大きさがあるテーブル t0 から100行だけ取り出す。これは生成された Apache Arrow ファイルも5kB程度のもの。

$ pg2arrow -d postgres -c "SELECT * FROM t0 LIMIT 100" -o /dev/shm/monu.arrow
$ ls -l /dev/shm/monu.arrow
-rw-r--r--. 1 kaigai users 4934 Mar 25 12:41 /dev/shm/monu.arrow

ここに、今度はテーブル全体を追記中にコマンドを ctrl-c で中断してみる事にする。

$ pg2arrow -d postgres -c "SELECT * FROM t0" --append /dev/shm/monu.arrow
^C

別ターミナルでファイルの大きさを観察してみると、確かに途中までデータが書き込まれ、順調にApache Arrowファイルが肥大化している事が分かるが、pg2arrowの異常終了後、最終的には元の大きさに戻っている。

$ ls -lh /dev/shm/monu.arrow
-rw-r--r--. 1 kaigai users 769M Mar 25 12:46 /dev/shm/monu.arrow
$ ls -lh /dev/shm/monu.arrow
-rw-r--r--. 1 kaigai users 1.1G Mar 25 12:47 /dev/shm/monu.arrow
$ ls -lh /dev/shm/monu.arrow
-rw-r--r--. 1 kaigai users 1.3G Mar 25 12:47 /dev/shm/monu.arrow
$ ls -l /dev/shm/monu.arrow
-rw-r--r--. 1 kaigai users 4934 Mar 25 12:47 /dev/shm/monu.arrow

PyArrowで当該ファイルをオープンしてみても、元通り100行のデータを含む Apache Arrow ファイルである。

$ python
>>> import pyarrow as pa
>>> f = pa.ipc.open_file('/dev/shm/monu.arrow')
>>> f.get_record_batch(0).num_rows
100
>>> f.num_record_batches
1
>>> f.get_record_batch(0).num_rows
100

SIGKILLで強制終了した場合など救えないケースもあるが、一応、こういった運用面での安定性に寄与する機能も強化されているという事で。

*1:リファクタリングと呼ぼう!

*2:PostgreSQL系コマンドは-Wでパスワードのプロンプトを出すが、MySQL系はコマンドラインでパスワードを与えるお作法のよう

Writable Arrow_Fdwと、PL/CUDAがお払い箱になる話

昨年ラストのブログ記事は、pg2arrowに--appendモードを付けてApache Arrowファイルへの追記を行うというトピックだった。

kaigai.hatenablog.com

実は内部的には、PG-StromのArrow_Fdwとpg2arrowのコードは大半を共有していて*1、入り口がスタンドアロンlibpqを使うツールなのか、PostgreSQLのFDW APIなのかという程度の違いしかない。
そこで、Arrow_Fdw外部テーブルに対してINSERT文を実行できるようにして、PostgreSQL側でもApache Arrowファイルへの追記をできるようにしてみた。これは後述の、Python向け各種モジュールとのデータ交換を目的とした機能強化である。

Writable Arrow_Fdw

Arrow_Fdw外部テーブルを書き込み可能にするには、テーブルオプションに writable を付与する。

=# CREATE FOREIGN TABLE ft (
  id    int,
  x     real,
  y     real,
  z     real
) SERVER arrow_fdw
  OPTIONS (file '/dev/shm/ft.arrow', writable 'true');
CREATE FOREIGN TABLE

外部テーブルを定義する時点でfileで指定する Apache Arrow ファイルが存在している必要はないが、writableオプションを指定した場合は、外部テーブルの背後に複数の Apache Arrow ファイルを配置する事はできない。これは1個でないと、どのファイルに書き込むべきかを特定できないため。

ひとまず 1,000 行ほどデータを挿入してみる。

=# INSERT INTO ft (
  SELECT x, pgstrom.random_float() * 100.0,
            pgstrom.random_float() * 100.0,
            pgstrom.random_float() * 100.0
    FROM generate_series(1,1000) x);
INSERT 0 1000

このように、指定したパスに Apache Arrow ファイルが作成され、1,000行分のデータが書き込まれている事がわかる。

$ ls -l /dev/shm/ft.arrow
-rw-------. 1 kaigai users 17166 Feb 17 18:04 /dev/shm/ft.arrow
$ ./utils/pg2arrow --dump /dev/shm/ft.arrow
[Footer]
{Footer: version=V4, schema={Schema: endianness=little, fields=[{Field: name="id", nullable=true, type={Int32}, children=[], custom_metadata=[]}, {Field: name="x", nullable=true, type={Float32}, children=[], custom_metadata=[]}, {Field: name="y", nullable=true, type={Float32}, children=[], custom_metadata=[]}, {Field: name="z", nullable=true, type={Float32}, children=[], custom_metadata=[]}], custom_metadata=[]}, dictionaries=[], recordBatches=[{Block: offset=352, metaDataLength=296 bodyLength=16128}]}
[Record Batch 0]
{Block: offset=352, metaDataLength=296 bodyLength=16128}
{Message: version=V4, body={RecordBatch: length=1000, nodes=[{FieldNode: length=1000, null_count=0}, {FieldNode: length=1000, null_count=0}, {FieldNode: length=1000, null_count=0}, {FieldNode: length=1000, null_count=0}], buffers=[{Buffer: offset=0, length=0}, {Buffer: offset=0, length=4032}, {Buffer: offset=4032, length=0}, {Buffer: offset=4032, length=4032}, {Buffer: offset=8064, length=0}, {Buffer: offset=8064, length=4032}, {Buffer: offset=12096, length=0}, {Buffer: offset=12096, length=4032}]}, bodyLength=16128}

なお、PostgreSQLのFDWモジュールとして動作するからには、トランザクション制御の諸々に従う必要がある。
以下のように未コミットの書き込みに関しては、ABORTROLLBACKで取り消す事ができる。
ただし、効率的にトランザクションを実装するため、INSERTを実行できるのは同時に1トランザクションのみ。要は、少し強めのShareRowExclusiveLockを取っているので、その辺はご注意を。

postgres=# SELECT count(*) FROM ft;
 count
-------
  1000
(1 row)

postgres=# BEGIN;
BEGIN
postgres=# INSERT INTO ft (
  SELECT x, pgstrom.random_float() * 100.0,
            pgstrom.random_float() * 100.0,
            pgstrom.random_float() * 100.0
    FROM generate_series(1,300) x);
INSERT 0 300
postgres=# SELECT count(*) FROM ft;
 count
-------
  1300
(1 row)

postgres=# ABORT;
ROLLBACK
postgres=# SELECT count(*) FROM ft;
 count
-------
  1000
(1 row)

PL/CUDAがお払い箱になる話

Arrow_FdwのREAD系に関しては列データによるGPUへの高速なデータ供給という設計意図があるのだが、WRITE系に関しては少し異なる思惑がある。

f:id:kaigai:20200217182318p:plain

バイスから集まってくるログデータ、いわゆるIoT/M2M系のワークロードを処理する事を考えると、生データは割と簡単にTB級のデータサイズに膨れ上がってしまい、何らかの集計や前処理を行わないと機械学習・統計解析のエンジンに渡す事ができない。少なくともGBの単位まで落とす必要はあるだろうと考えており、おそらくその辺は、既存のSSD-to-GPU Direct SQLの役割となる。
問題は、前処理を終えたデータで、これを Python スクリプトで動く機械学習エンジンに渡す時に、いったんCSVで吐き出してから再度 Text -> Binary 変換というのは効率が悪い。できればバイナリのまま受け渡す方が効率的で、スマートであろう。

新たに追加した関数、pgstrom.arrow_fdw_export_cupy()を使えば、Arrow_Fdw外部テーブルに格納されたデータのうち、指定された列だけを抽出してcuPyのndarrayと呼ばれるデータフレームと同じ形式でGPUバイスメモリにロードする事ができる。この関数はGPUバイスメモリを外部からマップするための識別子を返すので、これを利用すれば、Zero-copyで*2PostgreSQLPythonスクリプトの間のデータ交換が可能になる。

加えて、PostgreSQLにはPL/Pythonという、Python言語でユーザ定義関数を記述するための機能があり、これを利用すれば、元々PL/CUDAでユーザにベタっとCUDA Cのコードを書いてもらっていた*3ところを、もっと一般的な Python + cuPy という形で代替できる。

cuPyにもカスタムGPUカーネルを記述する機能があり、cupy.RawKernelクラスを利用する。この人は裏でNVRTCを利用して、GPUカーネルの実行時コンパイルができるので、感覚としてはLL言語スクリプトを書くのとあまり大差ない。

以下にコードのサンプルを置いてみる。
このSQL関数は、①PL/Pythonを利用してcuPyの機能を呼び出し、②Arrow_Fdwからロードしたデータフレームの値の平均値を列ごとに導出する。

CREATE OR REPLACE FUNCTION custom_average(x_ident text)
RETURNS float[] AS
$$
import cupy
import cupy_strom

X = cupy_strom.ipc_import(x_ident)
nattrs = X.shape[0]
nitems = X.shape[1]
gridSz = (nitems + 2047) >> 11;

Y = cupy.zeros((nattrs))

source='''
extern "C" __global__
           __launch_bounds__(1024)
void
kern_gpu_sum(double *y, const float *x, int nitems)
{
    __shared__ float lvalues[2048];
    int     gridSz = (nitems + 2047) / 2048;
    int     colIdx = blockIdx.x / gridSz;
    int     rowBase = (blockIdx.x % gridSz) * 2048;
    int     localId = 2 * threadIdx.x;
    int     i, k;

    // Load values to local shared buffer
    x += colIdx * nitems;
    for (i=threadIdx.x; i < 2048; i+=blockDim.x)
        lvalues[i] = (rowBase + i < nitems ? x[rowBase + i] : 0.0);
    __syncthreads();

    // Run reduction operations
    for (k=0; k < 11; k++)
    {
        int     mask = ((1 << k) - 1);

        if ((threadIdx.x & mask) == 0)
            lvalues[localId] += lvalues[localId + (1<<k)];
        __syncthreads();
    }
    // Write back the total sum
    if (threadIdx.x == 0)
        atomicAdd(&y[colIdx], lvalues[0]);
}
'''
kern = cupy.RawKernel(source, 'kern_gpu_sum')
kern.__call__((gridSz * nattrs,0,0),
              (1024,0,0),
              (Y,X,nitems))
X = 0   # unmap GPU memory

return Y / nitems
$$ LANGUAGE 'plpython3u';

細かい説明は省略するが、入力値Xを2048要素ごとに領域分割し、各領域ごとに1024個のスレッドが協調して11ステップで総和を計算し、出力バッファYに書き出すというモノである。

分かりやすいように、x列、y列、z列の値がそれぞれ異なる分布を取るように初期化する。

=# INSERT INTO ft (SELECT x, pgstrom.random_int(0,1,10000)::float/100.0,
                             pgstrom.random_int(0,-7500,2500)::float/100.0,
                             pgstrom.random_int(0,5000,15000)::float/100.0
                     FROM generate_series(1,1000000) x);
=# SELECT avg(x), avg(y), avg(z) FROM ft;
       avg        |        avg        |       avg
------------------+-------------------+-----------------
 49.9972925601087 | -24.9707353391815 | 99.982088626751
(1 row)

で、件のPL/Pythonユーザ定義関数を呼び出す。
pgstrom.arrow_fdw_export_cupyがftテーブルのx列、y列、z列を抽出して作成したGPUバッファの識別子を、そのままPL/Pythonユーザ定義関数に入力し、GPUカーネルを呼び出して平均値を計算している。

=# SELECT custom_average(pgstrom.arrow_fdw_export_cupy('ft','{x,y,z}'::text[]));
                    custom_average
------------------------------------------------------
 {49.9972926015625,-24.9707353193359,99.982088671875}
(1 row)

上記のように同じ結果が出力された事と、もう一点、PL/CUDAと同じようにユーザ定義のGPUカーネル関数をPL/Python + cuPyの組み合わせで実行できることが実証できた。
イマドキだと、なかなかCUDA CでベタにGPUカーネルを書くという人は少ないらしいので、それよりはより間口の広いPython環境の道具を使えるようにしつつ、必要に応じてPL/CUDA相当のカリカリチューニングなコードを書けるようにする、というのがベターな方向性であろう。

今後は、PL/CUDAからPL/Python + cuPyやその他のPython向けモジュールという形態を推奨するようにしたい。

*1:そりゃそうだ

*2:厳密には外部テーブル⇒GPUへのデータロードが最初の一回だけ

*3:いや、しかし、PL/CUDAのコードを書いていた人なんて自分意外にいるの?いるの?

Dive into Apache Arrow(その4)- pg2arrow で追記モード

先日、Apache Arrow 東京ミートアップ 2019というイベントに参加させていただいた。

発表時の様子(photo by 畔勝さん)

発表自体は、SSD-to-GPU Direct SQLからArrow_Fdw、4GPU+16SSDによる最近のベンチマークの紹介などで、目新しいものというよりは総集編であるが、懇親会の際にこんな質問をいただいた。

Apache Arrowファイルの追記ってどうやってるんですか?』

曰く、FluentdのApache Arrow出力用プラグインにおいて、集積したログの書き出しのタイミングとその際のI/O負荷の問題だそうな。
確かに Apache Arrow は列志向なので、一見、既に100万行分のデータを保持しているファイルに、新たに10万行のデータを追加するには、ファイル全体の再編成と書き直しが必要に見えるかもしれない。
しかし、この人の内部フォーマットをよく読んでみると、データの追記に関してはかなり低いコストで実行する事ができるように設計されており、上の例で言えば、追記処理に関しては『既に書き込んだ100万行は全く変更せず、10万行分の書き込み+α程度』のコストで実行できる。

今回は、PostgreSQLのテーブル(クエリ結果)を Apache Arrow ファイルとして保存する pg2arrow コマンドにこの追記モードを付けてみたという話を書いてみる事にする。

内部データ構造 Record Batch

今年の頭に、pg2arrowArrow_Fdwの機能を実装するために Apache Arrow のファイル形式を調査してみた。割と読みごたえのあるバイナリであるが、追記処理を実装するにあたって重要なポイントは一つ。『Arrowファイルは尻から読む
kaigai.hatenablog.com

Apache Arrowファイルの内部構造を順に追っていくと、ざっくり、先頭から以下の順に並んでいる。

  • ヘッダ("ARROW1\0\0"
  • スキーマ定義(列の名前やデータ型など)
  • ディクショナリ・バッチ(辞書圧縮用の辞書;複数)
  • レコード・バッチ(N件分のデータを列形式で記録;複数)
  • フッタ(スキーマ定義の複製と、ディクショナリ/レコードバッチの位置情報)

意外かもしれないが、例えば100万件分のデータ要素を持つApache Arrow形式ファイルであっても、内部のデータ構造は100万個のXXX型配列が並んでいる・・・とは必ずしも言い切れない。(もちろん、そういう編成にもできるが)
Apache Arrow形式ファイルの内側には、任意の件数のデータを列形式で束ねたレコード・バッチ(Record Batch)と呼ばれる領域がある。例えば1万件ごとにRecord Batchで切るとすると、Record Batchの内側にはA列の要素が1万個、B列の要素が1万個、C列の要素が1万個・・・と並んでおり、全体で100万件なら、このRecord Batchが100個順番に並んでいるという形になる。
もちろん、100万件のデータを持つRecord Batchを1個で、総計100万件のデータを含むApache Arrowファイルを構成する事もできる。この場合は、単純にA列の要素が100万個、B列の要素が100万個・・・と配置される事になる。

Arrowファイルは尻から読む

Apache Arrow形式ファイルの『どこからどこまで』がレコード・バッチなのかという情報は、ファイルの一番最後、フッタ領域に書かれている。例えば『ファイルの先頭から1200バイト、以降10MBはRecord Batch-0である』というノリである。*1

さて、このフッタ領域は、レコード・バッチの並びの直後に存在する。
例えば、レコード・バッチを100個持つApache Arrowファイルのフッタには、レコード・バッチ領域を指し示す(オフセット・サイズ)の組が100個書き込まれているわけだが、ここで100番目のレコード・バッチの直後に101番目のレコード・バッチを追加してみる事にする。
そうすると、フッタ領域は当然ながら上書きされてしまう。南無。
そしてその後ろに(オフセット・サイズ)の組を101個持つフッタを新たに書き込んでやると、ファイルの前半への書き込み操作を一切行う事なく、Apache Arrow形式ファイルへの追記操作を行えることになる。

これを、pg2arrowコマンドに実装してみた。

pg2arrowによるテーブルのダンプ(Arrow形式)

pg2arrowコマンドは、PG-Stromのユーティリティとして配布しているコマンドで、pg_dumpに似たノリでSQLの実行結果をPostgreSQLからダンプし、Apache Arrow形式ファイルとして保存するためのツールである。
使い方は以下の通り。

$ ./pg2arrow --help
Usage:
  pg2arrow [OPTION]... [DBNAME [USERNAME]]

General options:
  -d, --dbname=DBNAME     database name to connect to
  -c, --command=COMMAND   SQL command to run
  -f, --file=FILENAME     SQL command from file
      (-c and -f are exclusive, either of them must be specified)
  -o, --output=FILENAME   result file in Apache Arrow format
      --append=FILENAME   result file to be appended

      --output and --append are exclusive to use at the same time.
      If neither of them are specified, it creates a temporary file.)

Arrow format options:
  -s, --segment-size=SIZE size of record batch for each
      (default: 256MB)

Connection options:
  -h, --host=HOSTNAME     database server host
  -p, --port=PORT         database server port
  -U, --username=USERNAME database user name
  -w, --no-password       never prompt for password
  -W, --password          force password prompt

Debug options:
      --dump=FILENAME     dump information of arrow file
      --progress          shows progress of the job.

Report bugs to <pgstrom@heterodb.com>.

ある程度複雑なデータ構造の方が面白いので、列挙型(Enum)、複合型を含むテーブルを作成し、テストデータを投入してみた。

postgres=# create type label as enum ('Tokyo','Osaka','Nagoya','Yokohama','Kyoto');
CREATE TYPE
postgres=# create type comp as (x int, y real, z text);
CREATE TYPE
postgres=# create table t (id int, a float, b numeric, c comp, d label, e timestamp, f text);
CREATE TABLE

postgres=# insert into t (select x, 1000*pgstrom.random_float(2),
                                    1000*pgstrom.random_float(2),
                                    null, /* set later */
                                    (case (5*random())::int when 0 then 'Tokyo'
                                                            when 1 then 'Osaka'
                                                            when 2 then 'Nagoya'
                                                            when 3 then 'Yokohama'
                                                            when 4 then 'Kyoto'
                                                            else null end)::label,
                                    pgstrom.random_timestamp(2),
                                    md5(x::text)
                            from generate_series(1,1000) x);
INSERT 0 1000
postgres=# update t set c.x = pgstrom.random_int(2,-1000,1000),
                        c.y = 1000*pgstrom.random_float(2),
                        c.z = 'ROW#' || id::text;
UPDATE 1000

pgstrom.random_xxxx()というのはランダムなデータを時々NULLを混ぜつつ生成してくれる関数。
でき上ったテストデータはこういった感じになっている。

postgres=# select * from t order by id limit 8;
 id |        a         |        b         |          c           |    d     |             e              |                f
----+------------------+------------------+----------------------+----------+----------------------------+----------------------------------
  1 |  793.46183025905 |  718.62576097186 | (649,104.976,ROW#1)  |          | 2016-10-20 02:42:38.101797 | c4ca4238a0b923820dcc509a6f75849b
  2 | 626.670837228499 | 913.748125505516 | (-582,598.061,ROW#2) | Yokohama | 2018-06-29 06:38:45.351404 | c81e728d9d4c2f636f067f89cc14862c
  3 | 862.318314082137 | 810.705138747909 | (419,382.42,ROW#3)   | Yokohama | 2017-03-19 19:20:20.993358 | eccbc87e4b5ce2fe28308fd9f2a7baf3
  4 | 686.473733599518 |                  | (4,176.449,ROW#4)    | Osaka    | 2022-01-10 23:46:09.343218 | a87ff679a2f3e71d9181a67b7542122c
  5 | 957.214601783647 | 324.905697873284 | (180,320.756,ROW#5)  |          | 2022-08-31 14:58:22.203866 | e4da3b7fbbce2345d7772b0674a318d5
  6 | 284.569805620504 | 32.4126081692114 | (585,601.726,ROW#6)  |          | 2015-09-09 13:00:28.160389 | 1679091c5a880faf6fb5e6087eb1b2dc
  7 | 595.694404372803 | 324.796066770701 | (663,489.07,ROW#7)   | Yokohama | 2019-07-28 00:20:45.679467 | 8f14e45fceea167a5a36dedd4bea2543
  8 | 770.799666070752 | 44.1467431579469 | (603,646.233,ROW#8)  | Osaka    | 2017-07-14 05:46:05.558446 | c9f0f895fb98ab9159f51fd0297e236d
(8 rows)

ゴチャゴチャしているが、c列は数値と文字列の複合型、d列は見た目文字列だが内部的には32bit整数値のEnum型のデータである。

このテーブルをpg2arrowを使ってダンプする。1000件程度なので、レコードバッチは1個だけ。

$ ./pg2arrow -h localhost -d postgres -c "SELECT * FROM t" -o /tmp/hoge.arrow

PythonApache Arrowバインディングである PyArrow を使って中身を検算してみる事にする。

$ python3
Python 3.6.8 (default, Oct  7 2019, 17:58:22)
[GCC 8.2.1 20180905 (Red Hat 8.2.1-3)] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> import pyarrow as pa
>>> import pandas as pd
>>> f = pa.ipc.open_file('/tmp/hoge.arrow')
>>> f.schema
id: int32
a: double
b: decimal(11, 30)
c: struct<x: int32, y: float, z: string>
  child 0, x: int32
  child 1, y: float
  child 2, z: string
d: dictionary<values=string, indices=int32, ordered=0>
e: timestamp[us]
f: string

ArrowのStruct型にマップされたPostgreSQLの複合型や、Dictionary付きのString型としてマップされたPostgreSQLの列挙型も含め、スキーマ定義はきちんと反映されている事が分かる。

続いて、中身を確認してみる。レコードバッチの数は一個。

>>> f.num_record_batches
1

Pandasデータフレームに変換してみる

>>> X = f.get_record_batch(0)
>>> Y = X.to_pandas()
>>> Y
       id  ...                                 f
0       1  ...  c4ca4238a0b923820dcc509a6f75849b
1       2  ...  c81e728d9d4c2f636f067f89cc14862c
2       3  ...  eccbc87e4b5ce2fe28308fd9f2a7baf3
3       4  ...  a87ff679a2f3e71d9181a67b7542122c
4       5  ...  e4da3b7fbbce2345d7772b0674a318d5
..    ...  ...                               ...
995   996  ...  0b8aff0438617c055eb55f0ba5d226fa
996   997  ...  ec5aa0b7846082a2415f0902f0da88f2
997   998  ...  9ab0d88431732957a618d4a469a0d4c3
998   999  ...  b706835de79a2b4e80506f582af3676a
999  1000  ...  a9b7ba70783b617e9998dc4dd82eb3c5

一応、md5チェックサムの値(文字列)をぶち込んだf列はSQLで見た値と同じものが入っている。

タイムスタンプ型であるe列の内容をSQLの出力と比較してみる。

>>> Y['e']
0     2016-10-20 02:42:38.101797
1     2018-06-29 06:38:45.351404
2     2017-03-19 19:20:20.993358
3     2022-01-10 23:46:09.343218
4     2022-08-31 14:58:22.203866
                 ...
995   2016-05-12 00:03:53.986811
996   2023-12-15 02:51:58.066008
997   2020-10-29 14:27:54.705099
998   2015-10-02 13:18:13.312924
999   2020-05-06 04:06:50.749883
Name: e, Length: 1000, dtype: datetime64[ns]
postgres=# select id,e from t order by id limit 8;
 id |             e
----+----------------------------
  1 | 2016-10-20 02:42:38.101797
  2 | 2018-06-29 06:38:45.351404
  3 | 2017-03-19 19:20:20.993358
  4 | 2022-01-10 23:46:09.343218
  5 | 2022-08-31 14:58:22.203866
  6 | 2015-09-09 13:00:28.160389
  7 | 2019-07-28 00:20:45.679467
  8 | 2017-07-14 05:46:05.558446
(8 rows)

pg2arrowによる追記

では、今回の新機能 --append モードを試してみる。

同じテーブルを再度追記し、レコードバッチを2個、計2,000行のApache Arrowファイルを作成する事にするが、その前に少し錯乱要因として、列挙型に新しいラベルを追加しておく事にする。

postgres=# alter type label add value 'Kobe';
ALTER TYPE
postgres=# update t set d = 'Kobe' where d is null;
UPDATE 94

今度は、先ほど-o /tmp/hoge.arrowと指定した部分を--append /tmp/hoge.arrowと変えてみる。

$ ./pg2arrow -h localhost -d postgres -c "SELECT * FROM t" --append /tmp/hoge.arrow

PyArrowを介して中身を見てみると、スキーマ定義は前の通り(あたり前)で、レコードバッチが2個に増えている。

$ python3
Python 3.6.8 (default, Oct  7 2019, 17:58:22)
[GCC 8.2.1 20180905 (Red Hat 8.2.1-3)] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> import pyarrow as pa
>>> import pandas as pd
>>> f = pa.ipc.open_file('/tmp/hoge.arrow')
>>> f.schema
id: int32
a: double
b: decimal(11, 30)
c: struct<x: int32, y: float, z: string>
  child 0, x: int32
  child 1, y: float
  child 2, z: string
d: dictionary<values=string, indices=int32, ordered=0>
e: timestamp[us]
f: string
>>> f.num_record_batches
2

追加した方のレコードバッチを確認すると、きちんと列挙型であるd列にKobeというラベルが出現している。

>>> X = f.get_record_batch(1)
>>> Y = X.to_pandas()
>>> Y['d']
0          Kobe
1      Yokohama
2      Yokohama
3         Osaka
4          Kobe
         ...
995      Nagoya
996        Kobe
997      Nagoya
998       Kyoto
999    Yokohama
Name: d, Length: 1000, dtype: category
Categories (6, object): [Tokyo, Osaka, Nagoya, Yokohama, Kyoto, Kobe]

もちろん、元から存在する方のレコードバッチで内容が化けたりは、ない。
(PandasではNULL値はNaNと表記されるようだ)

>>> X = f.get_record_batch(0)
>>> Y = X.to_pandas()
>>> Y['d']
0           NaN
1      Yokohama
2      Yokohama
3         Osaka
4           NaN
         ...
995      Nagoya
996         NaN
997      Nagoya
998       Kyoto
999    Yokohama
Name: d, Length: 1000, dtype: category
Categories (6, object): [Tokyo, Osaka, Nagoya, Yokohama, Kyoto, Kobe]

これは何の布石なのか?

元々PG-StromにはGstore_Fdwという機能があり、GPUメモリ上に列ストアを作って、そこにデータをINSERTする事ができるようになっている。
kaigai.hatenablog.com

ただ、これはNVIDIAがRAPIDSフレームワークを発表する前に設計・開発したもので、内部のデータ形式はPG-Stromの独自。Pythonとの連携はcuPyで行うか、あるいはPL/CUDAから参照するしか手がなかった。

もう一つのFDWであるArrow_Fdwは、元々SSD-to-GPU Direct SQLを使うレベルの大量データ処理において、列データ形式を採用する事でより効率的なI/Oを目指したものだが、

  • RAPIDSフレームワーク(cuDF)でも内部データ構造にArrowを使っている
  • データの置き場所を色々選択できるようにしたら、同じコードを流用できる
  • バルクデータの追記のみ(あと無条件DELETE)に限れば、書き込みもさほど無理が無い

という事で、Arrow_Fdwを拡張してGPUバイスメモリ、あるいは容量単価の安い(とされる)Persistent Memoryなどのデータストアとして利用可能とし、Pythonスクリプトなどから容易にデータ連携を可能にするという青写真を描いている。

そうすると、TBを越えてくるような大量データは、先ずSSD-to-GPU Direct SQL + Arrow_Fdwを用いて高速な集計・前処理を実行。SQLでゴニョっと前処理したデータを、今度はGPUバイスメモリ上のArrow_FdwにINSERTすれば、Pythonスクリプト側ではCSVファイルを読んだりすることなく、DBから直接バイナリのデータをインポートできる。
データセットを少し変えてみたくなった場合でも、WHERE句を少しいじれば異なるデータセットに対して機械学習エンジンがきちんと作用するかどうかを確認する事ができ、TRY&ERRORの生産性向上にかなりのレベルで寄与できるのではないかと考えている。

いかがなものだろうか? メリー・クリスマス

*1:この他には、ファイルの先頭にも書かれているスキーマ定義のコピーもフッタ領域に書かれている。おそらく、これはファイルの末尾と先頭を読むためにディスクをシークせずに済ますための工夫であろう

CitusDB + PG-StromでScale-up+outする。

PostgreSQL Advent Calendar 2019の14日目です。

PG-Stromの開発をやってると、しばしば聞かれるのが

『マルチノードの並列処理って対応してるんですか?』

という質問。

まぁ、『対応しておりませんし、対応する予定もございません』という回答になるんですが、別にこれはウチのやる気の問題ではなく、PG-StromはPostgreSQLの拡張モジュールとして設計されているため、並列分散処理に関しては他のメカニズムに任せてしまえばよい、というだけの話である。

そこで、今回は同じくPostgreSQLの拡張モジュールとして実装されているスケールアウト機能の Citus と、PG-Stromを組み合わせてちゃんと動作するんですよという事を検証してみる事にする。

Citusとは?

PostgreSQLにデータ分散と並列処理機構を付加する拡張モジュールで、PostgreSQLの拡張モジュールとして実装されている。*1
この人はコーディネータ+多数のワーカーノードという構成になっており、指定したテーブルを複数のワーカーノードに分割して保存するシャーディング機能を持っている。したがって、テーブルサイズが肥大化しても、一台のワーカーノードで処理すべきデータ量を抑える事ができる。
また、巨大テーブルをスキャンする集計クエリを受け取ると、コーディネータはそれを各ワーカーノードで分割実行できるよう切り分け、ワーカーノードは自らの担当範囲だけを集計した上で、コーディネータが最終結果を生成してユーザに返すというMap-Reduceチックな並列処理を行う事ができる。

一部機能を限定はしているものの、Citusの拡張モジュールはオープンソースとして公開されており、お手軽に試すことができる。
開発元の CitusData社 は2011年創業のスタートアップで、2019年には Microsoft に買収される(パチパチ)。現在は Azure上で HyperScale というブランドで展開されている。

検証構成

CitusをPG-Stromを組み合わせて使うケースで最も素直な構成は、各ワーカーノードが GPU + NVME を持ち、そこでPG-Stromが動作するという構成である。
Citusのコーディネータがクエリを受け取った後、それを各ワーカーノードが実行可能な形に書き換えて処理をキックするが、これはワーカーノードにとっては通常のSQL処理と変わらないので、そこでGPUを用いる方が合理的であれば(つまり、オプティマイザがそう判断すれば)GPUを使用するし、NVME-SSDからのデータ読出しであればSSD-to-GPU Direct SQLを使用する事もできる。

少し機材の都合で、自宅のパソコンのPCIEカードを以下のように組み替えて、2台の独立したサーバに相当する構成を作ってみた。
この場合、NVME0~3⇒GPU0と、NVME4~7⇒GPU1へのデータ転送は全く他方に影響を与えないため、I/O負荷は完全に独立して処理されると考えてよい。

まず、これらディスク領域の準備からはじめる。

[kaigai@kujira ~]$ ls -l /dev/nvme*n1p1
brw-rw----. 1 root disk 259,  1 Dec  6 14:47 /dev/nvme0n1p1
brw-rw----. 1 root disk 259,  3 Dec  6 14:47 /dev/nvme1n1p1
brw-rw----. 1 root disk 259, 14 Dec  6 14:47 /dev/nvme2n1p1
brw-rw----. 1 root disk 259,  5 Dec  6 14:47 /dev/nvme3n1p1
brw-rw----. 1 root disk 259, 13 Dec  6 14:47 /dev/nvme4n1p1
brw-rw----. 1 root disk 259, 15 Dec  6 14:47 /dev/nvme5n1p1
brw-rw----. 1 root disk 259, 12 Dec  6 14:47 /dev/nvme6n1p1
brw-rw----. 1 root disk 259, 10 Dec  6 14:47 /dev/nvme7n1p1

ご覧のように、8台のNVME-SSDが見えており、これらを4台ずつ束ねてストライピング構成を作る。
ここで使用しているNVME-SSDIntelDC P4510 (U.2; 1.0TB)で、SeqReadのカタログスペックは 2850MB/s。つまり、4台束ねれば理論上 11.4GB/s 程度までは出る事になる。*2

以下のようにmdadmコマンドを用いて md-raid0 区画を作成する。
最後のは、/etc/mdadm.confファイルを作成して再起動時に区画が復元されるようにするための設定。

[root@kujira kaigai]# mdadm -C /dev/md0 -c 128 -l 0 -n 4 /dev/nvme0n1p1 /dev/nvme1n1p1 /dev/nvme2n1p1 /dev/nvme3n1p1
mdadm: Defaulting to version 1.2 metadata
mdadm: array /dev/md0 started.

[root@kujira kaigai]# mdadm -C /dev/md1 -c 128 -l 0 -n 4 /dev/nvme4n1p1 /dev/nvme5n1p1 /dev/nvme6n1p1 /dev/nvme7n1p1
mdadm: Defaulting to version 1.2 metadata
mdadm: array /dev/md1 started.

[root@kujira kaigai]# mdadm --detail --scan > /etc/mdadm.conf

この辺は流れ作業だが、md-raid0区画上にパーティションを切り、Ext4で初期化する。

# fdisk /dev/md0
# fdisk /dev/md1

# mkfs.ext4 -LNVME0 /dev/md0p1
# mkfs.ext4 -LNVME1 /dev/md1p1

# cat /etc/fstab
        :
    (snip)
        :
LABEL=NVME0             /nvme/0                 ext4    nofail          0 0
LABEL=NVME1             /nvme/1                 ext4    nofail          0 0

ここで設定した /nvme/0 ボリューム上にCitus+PG-Stromのワーカー1号を、/nvme/1ボリューム上にワーカー2号を配置する。

PostgreSQL (Citus + PG-Strom) の構築

まず、各ワーカーノードの構築を行う。物理的にI/Oは切り離されているとはいえ、同じマシンなので、ポート番号を微妙にずらすことにする。
ワーカー1号を 5433 ポートで、ワーカー2号を 5434 ポートで動かすことにする。

initdbでデータベースクラスタを作成する。

# mkdir /nvme/0/pgdata
# chown postgres:postgres -R /nvme/0/pgdata
# su - postgres -c 'initdb -D /nvme/0/pgdata'

次に、ワーカー側のpostgreql.confを修正する。とはいってもshared_preload_librariesなど基本的なものだけ。

port = 5433
shared_buffers = 12GB
work_mem = 10GB
max_worker_processes = 100
shared_preload_libraries = 'citus,pg_strom'

これで、ワーカー側のPostgreSQLを起動する事ができる。

# su - postgres -c 'pg_ctl start -D /nvme/0/pgdata'

ログを見ると、2台のTesla P40にそれぞれ近傍のNVME-SSDが4台ずつ認識されている事がわかる。
(見やすさのため、タイムスタンプをカット)

LOG:  number of prepared transactions has not been configured, overriding
LOG:  PG-Strom version 2.3 built for PostgreSQL 11
LOG:  PG-Strom: GPU1 Tesla P40 (3840 CUDA cores; 1531MHz, L2 3072kB), RAM 22.38GB (384bits, 3.45GHz), CC 6.1
LOG:  PG-Strom: GPU2 Tesla P40 (3840 CUDA cores; 1531MHz, L2 3072kB), RAM 22.38GB (384bits, 3.45GHz), CC 6.1
LOG:    -  PCIe[0000:17]
LOG:        -  PCIe(0000:17:00.0)
LOG:            -  PCIe(0000:18:00.0)
LOG:                -  PCIe(0000:19:08.0)
LOG:                    -  PCIe(0000:1a:00.0)
LOG:                        -  PCIe(0000:1b:00.0)
LOG:                            -  PCIe(0000:1c:00.0) nvme0 (INTEL SSDPE2KX010T8)
LOG:                        -  PCIe(0000:1b:01.0)
LOG:                            -  PCIe(0000:1d:00.0) nvme1 (INTEL SSDPE2KX010T8)
LOG:                        -  PCIe(0000:1b:02.0)
LOG:                            -  PCIe(0000:1e:00.0) nvme2 (INTEL SSDPE2KX010T8)
LOG:                        -  PCIe(0000:1b:03.0)
LOG:                            -  PCIe(0000:1f:00.0) nvme3 (INTEL SSDPE2KX010T8)
LOG:                -  PCIe(0000:19:10.0)
LOG:                    -  PCIe(0000:20:00.0) GPU1 (Tesla P40)
LOG:    -  PCIe[0000:ae]
LOG:        -  PCIe(0000:ae:00.0)
LOG:            -  PCIe(0000:af:00.0)
LOG:                -  PCIe(0000:b0:08.0)
LOG:                    -  PCIe(0000:b1:00.0) GPU2 (Tesla P40)
LOG:                -  PCIe(0000:b0:10.0)
LOG:                    -  PCIe(0000:b2:00.0)
LOG:                        -  PCIe(0000:b3:00.0)
LOG:                            -  PCIe(0000:b4:00.0) nvme4 (INTEL SSDPE2KX010T8)
LOG:                        -  PCIe(0000:b3:01.0)
LOG:                            -  PCIe(0000:b5:00.0) nvme5 (INTEL SSDPE2KX010T8)
LOG:                        -  PCIe(0000:b3:02.0)
LOG:                            -  PCIe(0000:b6:00.0) nvme6 (INTEL SSDPE2KX010T8)
LOG:                        -  PCIe(0000:b3:03.0)
LOG:                            -  PCIe(0000:b7:00.0) nvme7 (INTEL SSDPE2KX010T8)
LOG:  GPU<->SSD Distance Matrix
LOG:             GPU0     GPU1
LOG:      nvme1  (   5)     -1
LOG:      nvme0  (   5)     -1
LOG:      nvme7     -1   (   5)
LOG:      nvme4     -1   (   5)
LOG:      nvme5     -1   (   5)
LOG:      nvme3  (   5)     -1
LOG:      nvme6     -1   (   5)
LOG:      nvme2  (   5)     -1
LOG:  HeteroDB License: { "version" : 2, "serial_nr" : "HDB-TRIAL", "issued_at" : "6-Dec-2019", "expired_at" : "1-Jan-2030", "gpus" : [ { "uuid" : "GPU-b44c118b-1058-16cb-1cbb-5dbe0fe6181a", "pci_id" : "0000:20:00.0" } , { "uuid" : "GPU-a137b1df-53c9-197f-2801-f2dccaf9d42f", "pci_id" : "0000:b1:00.0" } ] }
LOG:  listening on IPv6 address "::1", port 5434
LOG:  listening on IPv4 address "127.0.0.1", port 5434
LOG:  listening on Unix socket "/var/run/postgresql/.s.PGSQL.5434"
LOG:  listening on Unix socket "/tmp/.s.PGSQL.5434"
LOG:  redirecting log output to logging collector process
HINT:  Future log output will appear in directory "log".

この様に、それぞれ/nvme/0/nvme/1上にワーカーのPostgreSQLインスタンスを起動した。
最後に、これらワーカー側に直接接続してCREATE EXTENSIONを実行する。

$ psql -U postgres -p 5433
psql (11.5)
Type "help" for help.

postgres=# create extension pg_strom;
CREATE EXTENSION
postgres=# create extension citus;
CREATE EXTENSION

次にコーディネータの構築であるが、この人は自分でデータを持つわけでもなく、大量のスキャンを行うわけでもないので、どこか適当な磁気ディスク上の区画でも割り当てておけばよい。
デフォルトの/var/lib/pgdataにDBを構築し、Citusモジュールだけをロードするように構成した。

以上でコーディネータ、ワーカーの初期設定は完了である。

テーブルの作成とデータの投入

テスト用に、いつもの SSBM (Star Schema Benchmark) のテーブルを構築し、このうち最もサイズの大きなlineorderテーブルを各ワーカーに分散配置する事にする。
scale factorは401で、DBサイズにすると 353GB 程度。数字に深い意味はないが、これまでこのサイズで性能計測をしていた事が多かったので。

まず、通常通りCREATE TABLE文を投入する。例えばlineorderテーブルであれば以下のような、いたって何の変哲もないCREATE TABLEである。

CREATE TABLE lineorder (
    lo_orderkey numeric,
    lo_linenumber integer,
    lo_custkey numeric,
    lo_partkey integer,
    lo_suppkey numeric,
    lo_orderdate integer,
    lo_orderpriority character(15),
    lo_shippriority character(1),
    lo_quantity numeric,
    lo_extendedprice numeric,
    lo_ordertotalprice numeric,
    lo_discount numeric,
    lo_revenue numeric,
    lo_supplycost numeric,
    lo_tax numeric,
    lo_commit_date character(8),
    lo_shipmode character(10)
);

少し Citus のオリジナル要素はこの次。lineorderを分散配置する場合は、create_distributed_table関数を用いて、システムに分散配置すべきテーブルと分散キーに用いるカラムを教えてやる。

postgres=# SELECT create_distributed_table('lineorder', 'lo_orderkey');
 create_distributed_table
--------------------------

(1 row)

テーブルを分散配置せず、各ワーカーに複製をコピーするというやり方もある。この場合はcreate_reference_table関数を使用する。

postgres=# SELECT create_reference_table('date1');
 create_reference_table
------------------------

(1 row)

このように設定する事でdistributed tableとreference tableのJOINをワーカー側で実行する事が可能となり、コーディネータの負荷を最大限に下げる事ができる。

そして、最後にコーディネータ側からデータを投入する。

postgres=# \copy lineorder from program './dbgen-ssbm -X -Tl -s 401' delimiter '|'
SSBM (Star Schema Benchmark) Population Generator (Version 1.0.0)
Copyright Transaction Processing Performance Council 1994 - 2000
COPY 2406009932

コーディネータ側から見ても実際のデータサイズは分からないが、、、

postgres=# \d+
                        List of relations
 Schema |   Name    | Type  |  Owner   |    Size    | Description
--------+-----------+-------+----------+------------+-------------
 public | customer  | table | postgres | 8192 bytes |
 public | date1     | table | postgres | 8192 bytes |
 public | lineorder | table | postgres | 8192 bytes |
 public | part      | table | postgres | 8192 bytes |
 public | supplier  | table | postgres | 8192 bytes |
(5 rows)

ワーカー側に接続してみると、確かに分散テーブルが細切れになって保存されている様子が分かる。
(そして reference table は単純にコピーされている)

[kaigai@kujira ~]$ psql -U postgres postgres -p 5433
psql (11.5)
Type "help" for help.

postgres=# \d+
                          List of relations
 Schema |       Name       | Type  |  Owner   |  Size   | Description
--------+------------------+-------+----------+---------+-------------
 public | customer_102072  | table | postgres | 1627 MB |
 public | date1_102075     | table | postgres | 416 kB  |
 public | lineorder_102040 | table | postgres | 11 GB   |
 public | lineorder_102042 | table | postgres | 11 GB   |
 public | lineorder_102044 | table | postgres | 11 GB   |
 public | lineorder_102046 | table | postgres | 11 GB   |
 public | lineorder_102048 | table | postgres | 11 GB   |
 public | lineorder_102050 | table | postgres | 11 GB   |
 public | lineorder_102052 | table | postgres | 11 GB   |
 public | lineorder_102054 | table | postgres | 11 GB   |
 public | lineorder_102056 | table | postgres | 11 GB   |
 public | lineorder_102058 | table | postgres | 11 GB   |
 public | lineorder_102060 | table | postgres | 11 GB   |
 public | lineorder_102062 | table | postgres | 11 GB   |
 public | lineorder_102064 | table | postgres | 11 GB   |
 public | lineorder_102066 | table | postgres | 11 GB   |
 public | lineorder_102068 | table | postgres | 11 GB   |
 public | lineorder_102070 | table | postgres | 11 GB   |
 public | part_102073      | table | postgres | 206 MB  |
 public | supplier_102074  | table | postgres | 528 MB  |
(20 rows)

vacuum analyzeを走らせる

最後に、SSD-to-GPU Direct SQLに必要な visibility map を強制的に作成するために、vacuum analyzeを実行する。まさかコーディネータ側の空っぽテーブルをvacuumするだけじゃないよね?と思ったが、さすがに杞憂だったというか、ワーカー側できちんと分散処理してくれていた。

postgres=# vacuum analyze lineorder ;

を、実行中のtopコマンドの出力

top - 16:14:08 up  4:03,  5 users,  load average: 5.79, 1.35, 1.04
Tasks: 418 total,  24 running, 394 sleeping,   0 stopped,   0 zombie
%Cpu(s): 15.2 us, 33.0 sy,  0.0 ni,  7.9 id, 43.8 wa,  0.0 hi,  0.0 si,  0.0 st
KiB Mem : 19651427+total,   525488 free,  5696528 used, 19029225+buff/cache
KiB Swap:  8388604 total,  8380412 free,     8192 used. 18758867+avail Mem

   PID USER      PR  NI    VIRT    RES    SHR S  %CPU %MEM     TIME+ COMMAND
   150 root      20   0       0      0      0 R  59.3  0.0   0:52.53 kswapd0
 17275 postgres  20   0   12.9g  48344  44076 D  56.3  0.0   0:05.91 postgres
   151 root      20   0       0      0      0 R  53.7  0.0   0:27.33 kswapd1
   321 root      20   0       0      0      0 S  51.0  0.0   0:33.39 kworker/u274:5
 17311 root      20   0       0      0      0 S  48.7  0.0   0:01.91 kworker/u274:0
 17286 postgres  20   0   12.9g  48016  43760 D  44.7  0.0   0:04.55 postgres
 17295 postgres  20   0   12.9g  47956  43704 R  42.3  0.0   0:04.33 postgres
 17298 postgres  20   0   12.9g  47932  43680 R  42.0  0.0   0:04.65 postgres
  6657 postgres  20   0   12.9g 339180 324212 R  41.3  0.2   6:54.01 postgres
 17281 postgres  20   0   12.9g  48340  44076 R  41.3  0.0   0:05.74 postgres
 17301 postgres  20   0   12.9g  48180  43928 R  41.3  0.0   0:05.26 postgres
 17278 postgres  20   0   12.9g  47804  43540 R  40.7  0.0   0:04.32 postgres
 17277 postgres  20   0   12.9g  48180  43916 R  40.3  0.0   0:05.22 postgres
 17287 postgres  20   0   12.9g  47928  43660 R  38.7  0.0   0:04.15 postgres
 17279 postgres  20   0   12.9g  48000  43732 R  38.3  0.0   0:04.10 postgres
 17284 postgres  20   0   12.9g  48056  43796 R  36.3  0.0   0:04.96 postgres
 17280 postgres  20   0   12.9g  48196  43928 R  34.0  0.0   0:04.84 postgres
  6658 postgres  20   0   12.9g 330544 322924 D  31.3  0.2   6:29.77 postgres
 17292 postgres  20   0   12.9g  47804  43552 R  31.3  0.0   0:04.45 postgres
 17294 postgres  20   0   12.9g  48068  43812 D  27.7  0.0   0:04.30 postgres
 17283 postgres  20   0   12.9g  47832  43560 R  26.0  0.0   0:04.14 postgres
 17289 postgres  20   0   12.9g  48060  43800 R  25.7  0.0   0:05.18 postgres
 17291 postgres  20   0   12.9g  47804  43552 R  25.7  0.0   0:04.01 postgres
 17302 postgres  20   0   12.9g  47940  43688 R  24.0  0.0   0:05.04 postgres
 17299 postgres  20   0   12.9g  48000  43744 R  21.7  0.0   0:04.01 postgres
 17304 postgres  20   0   12.9g  47792  43540 R  21.3  0.0   0:04.53 postgres
 17282 postgres  20   0   12.9g  47828  43564 R  19.3  0.0   0:03.52 postgres
 17290 postgres  20   0   12.9g  48080  43816 R  17.3  0.0   0:04.83 postgres
 17285 postgres  20   0   12.9g  47956  43688 D  15.0  0.0   0:04.18 postgres
 17288 postgres  20   0   12.9g  47980  43724 D  15.0  0.0   0:04.75 postgres
 17303 postgres  20   0   12.9g  48084  43828 D  14.0  0.0   0:04.89 postgres
 17276 postgres  20   0   12.9g  47968  43692 D  13.0  0.0   0:04.28 postgres
 17300 postgres  20   0   12.9g  47800  43548 R  12.3  0.0   0:03.76 postgres

集計クエリを実行してみる(Take.1)

早速、Star Schema Benchmarkの集計クエリを実行してみる事にする。
先ずは実行計画。

postgres=# explain
select sum(lo_extendedprice*lo_discount) as revenue
from lineorder,date1
where lo_orderdate = d_datekey
and d_year = 1993
and lo_discount between 1 and 3
and lo_quantity < 25;
                                                                          QUERY PLAN
--------------------------------------------------------------------------------------------------------------------------------------------------------------
 Aggregate  (cost=0.00..0.00 rows=0 width=0)
   ->  Custom Scan (Citus Adaptive)  (cost=0.00..0.00 rows=0 width=0)
         Task Count: 32
         Tasks Shown: One of 32
         ->  Task
               Node: host=localhost port=5433 dbname=postgres
               ->  Aggregate  (cost=958010.91..958010.92 rows=1 width=32)
                     ->  Gather  (cost=957962.46..958006.83 rows=408 width=8)
                           Workers Planned: 2
                           ->  Parallel Custom Scan (GpuPreAgg)  (cost=956962.46..956966.03 rows=204 width=8)
                                 Reduction: NoGroup
                                 Combined GpuJoin: enabled
                                 GPU Preference: GPU1 (Tesla P40)
                                 ->  Parallel Custom Scan (GpuJoin) on lineorder_102040 lineorder  (cost=30257.08..962321.28 rows=590200 width=10)
                                       Outer Scan: lineorder_102040 lineorder  (cost=30259.93..963572.01 rows=4133020 width=14)
                                       Outer Scan Filter: ((lo_discount >= '1'::numeric) AND (lo_discount <= '3'::numeric) AND (lo_quantity < '25'::numeric))
                                       Depth 1: GpuHashJoin  (hash-size: 66.06KB, nrows 4133020...1416481)
                                                HashKeys: lineorder.lo_orderdate
                                                JoinQuals: (lineorder.lo_orderdate = date1.d_datekey)
                                       GPU Preference: GPU1 (Tesla P40)
                                       ->  Seq Scan on date1_102075 date1  (cost=0.00..78.95 rows=365 width=4)
                                             Filter: (d_year = 1993)
(22 rows)

これを見ると、確かに Citus の独自プランであるCustom Scan (Citus Adaptive)と、PG-Stromの独自プランであるCustom Scan (GpuPreAgg)Custom Scan (GpuJoin)が混在している。

ではさっそく実行してみると、、、、Out of managed memoryエラーで止まってしまった。要はGPUリソースの食いすぎである。

GPUを使用するプログラムは、GPUバイス上に「CUDAコンテキスト」と呼ばれる実行時情報を保持する必要がある。これが最低でも150MB程度の大きさがあり、さらに、PG-Stromで処理すべきデータを保持すると、プロセスあたり平均して600MB~800MB程度のメモリを消費する事になる。
今回は、Citusがlineorderテーブルを32個のセグメントに分割し、ワーカーノード1個あたり16個のセグメントを持っている事になる。加えて、ワーカーノードで実行される集計クエリはPostgreSQLのパラレルクエリ機能により、3並列で実行((Workers Planned: 2と表示されているため、自分自身+バックグラウンドワーカー×2))されるため、GPUあたり48プロセスが全力で回るという事になる。
さすがにこれは辛みがあるので、そもそも16セグメントに分割されている分、ワーカー側の並列実行を止める事にする。

また、この実行計画には NVMe-Strom: enabledというメッセージが表示されておらず、せっかくのNVME-SSDなのに、SSD-to-GPU Direct SQLモードを利用できない。
これは、lineorderが細かく分割された結果、テーブル1個あたりのサイズが、SSD-to-GPU Direct SQLを使用する閾値よりも小さな値になっているからである。

そこで、上記2つの問題を解決するため、ワーカー側で以下の設定を追加した。

max_parallel_workers_per_gather = 0
pg_strom.nvme_strom_threshold = 1GB

集計クエリを実行してみる(Take.2)

さて、再度集計クエリを実行してみる事にする。

まずは実行計画を確認。

postgres=# explain
select sum(lo_extendedprice*lo_discount) as revenue
from lineorder,date1
where lo_orderdate = d_datekey
and d_year = 1993
and lo_discount between 1 and 3
and lo_quantity < 25;
                                                                       QUERY PLAN
--------------------------------------------------------------------------------------------------------------------------------------------------------
 Aggregate  (cost=0.00..0.00 rows=0 width=0)
   ->  Custom Scan (Citus Adaptive)  (cost=0.00..0.00 rows=0 width=0)
         Task Count: 32
         Tasks Shown: One of 32
         ->  Task
               Node: host=localhost port=5433 dbname=postgres
               ->  Aggregate  (cost=1007098.32..1007098.33 rows=1 width=32)
                     ->  Custom Scan (GpuPreAgg)  (cost=1007092.71..1007096.28 rows=204 width=8)
                           Reduction: NoGroup
                           Combined GpuJoin: enabled
                           GPU Preference: GPU1 (Tesla P40)
                           ->  Custom Scan (GpuJoin) on lineorder_102040 lineorder  (cost=9114.62..1019939.70 rows=1416481 width=10)
                                 Outer Scan: lineorder_102040 lineorder  (cost=9426.71..1098945.56 rows=9919249 width=14)
                                 Outer Scan Filter: ((lo_discount >= '1'::numeric) AND (lo_discount <= '3'::numeric) AND (lo_quantity < '25'::numeric))
                                 Depth 1: GpuHashJoin  (hash-size: 66.06KB, nrows 9919249...1416481)
                                          HashKeys: lineorder.lo_orderdate
                                          JoinQuals: (lineorder.lo_orderdate = date1.d_datekey)
                                 GPU Preference: GPU1 (Tesla P40)
                                 NVMe-Strom: enabled
                                 ->  Seq Scan on date1_102075 date1  (cost=0.00..78.95 rows=365 width=4)
                                       Filter: (d_year = 1993)
(21 rows)

ワーカー側でのパラレルクエリが消えており、また、今度はNVMe-Strom: enabledの表示が出た。

今度はEXPLAIN ANALYZEで実際にクエリを実行してみる。
実行時間は24.67sで、合計 353GB のテーブルをスキャンした結果としては中々のものである。

postgres=# explain analyze
select sum(lo_extendedprice*lo_discount) as revenue
from lineorder,date1
where lo_orderdate = d_datekey
and d_year = 1993
and lo_discount between 1 and 3
and lo_quantity < 25;
                                                                                  QUERY PLAN
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 Aggregate  (cost=0.00..0.00 rows=0 width=0) (actual time=24666.217..24666.218 rows=1 loops=1)
   ->  Custom Scan (Citus Adaptive)  (cost=0.00..0.00 rows=0 width=0) (actual time=24666.174..24666.181 rows=32 loops=1)
         Task Count: 32
         Tasks Shown: One of 32
         ->  Task
               Node: host=localhost port=5433 dbname=postgres
               ->  Aggregate  (cost=1007098.32..1007098.33 rows=1 width=32) (actual time=3569.988..3569.989 rows=1 loops=1)
                     ->  Custom Scan (GpuPreAgg)  (cost=1007092.71..1007096.28 rows=204 width=8) (actual time=3569.953..3569.960 rows=1 loops=1)
                           Reduction: NoGroup
                           Combined GpuJoin: enabled
                           GPU Preference: GPU1 (Tesla P40)
                           ->  Custom Scan (GpuJoin) on lineorder_102040 lineorder  (cost=9114.62..1019939.70 rows=1416481 width=10) (never executed)
                                 Outer Scan: lineorder_102040 lineorder  (cost=9426.71..1098945.56 rows=9919249 width=14) (actual time=261.049..628.315 rows=75214815 loops=1)
                                 Outer Scan Filter: ((lo_discount >= '1'::numeric) AND (lo_discount <= '3'::numeric) AND (lo_quantity < '25'::numeric))
                                 Rows Removed by Outer Scan Filter: 65374058
                                 Depth 1: GpuHashJoin  (hash-size: 66.06KB actual-size: 23.29KB, plan nrows: 9919249...1416481, actual nrows: 9840757...1490844)
                                          HashKeys: lineorder.lo_orderdate
                                          JoinQuals: (lineorder.lo_orderdate = date1.d_datekey)
                                 GPU Preference: GPU1 (Tesla P40)
                                 NVMe-Strom: load=1437612
                                 ->  Seq Scan on date1_102075 date1  (cost=0.00..78.95 rows=365 width=4) (actual time=0.075..0.348 rows=365 loops=1)
                                       Filter: (d_year = 1993)
                                       Rows Removed by Filter: 2191
                   Planning Time: 0.485 ms
                   Execution Time: 3759.495 ms
 Planning Time: 2.875 ms
 Execution Time: 24666.270 ms
(27 rows)

以下は、集計クエリ実行中の iostat の出力。これはいつも通り、I/Oの帯域をほぼ限界まで使い切っている事が分かる。

Device:            tps    MB_read/s    MB_wrtn/s    MB_read    MB_wrtn
nvme0n1       21134.00      2641.50         0.01       5283          0
nvme1n1       21214.00      2641.22         0.00       5282          0
nvme2n1       21128.50      2640.96         0.00       5281          0
nvme6n1       20678.00      2584.75         0.00       5169          0
nvme5n1       20761.50      2584.89         0.01       5169          0
nvme4n1       20683.00      2585.31         0.00       5170          0
nvme7n1       20753.00      2585.35         0.00       5170          0
nvme3n1       21194.50      2639.98         0.00       5279          0
md0           84611.50     10556.17         0.01      21112          0
md1           82911.00     10344.69         0.01      20689          0

結論

PostgreSQL向けスケールアウト拡張であるCitusと、スケールアップ拡張であるPG-Stromを組み合わせて実行できることを確認した。
こういった形で、PostgreSQL向けに設計された周辺ソフトウェアと必要に応じて組み合わせる事ができるのが、他のGPU-DBにはないPG-Stromの強みであろう。

ただし、分散テーブルの分割度合いと、PG-Stromの得意とするデータサイズを見極めて、セグメント数や並列ワーカー数を設定する必要がある。
現実問題として、PG-Stromがシングルノードで処理し切れないレベルの大規模DBという事になると、それなりの体制を組んで事前検証を行うはずなので問題にはならないハズだが。CitusはCitusで、PG-StromはPG-Stromで、それぞれ単独で使ったケースを想定してのデフォルト値設定なので、これは致し方ないところ。

*1:PG-Stromと同じくCustomScan APIを利用している

*2:実際にはこのPCIeホストカードでは10.5GB/s程度が上限っぽいが…。ただし、別の製品では11.5GB/sを記録した事はある。