Writable Arrow_Fdwと、PL/CUDAがお払い箱になる話

昨年ラストのブログ記事は、pg2arrowに--appendモードを付けてApache Arrowファイルへの追記を行うというトピックだった。

kaigai.hatenablog.com

実は内部的には、PG-StromのArrow_Fdwとpg2arrowのコードは大半を共有していて*1、入り口がスタンドアロンlibpqを使うツールなのか、PostgreSQLのFDW APIなのかという程度の違いしかない。
そこで、Arrow_Fdw外部テーブルに対してINSERT文を実行できるようにして、PostgreSQL側でもApache Arrowファイルへの追記をできるようにしてみた。これは後述の、Python向け各種モジュールとのデータ交換を目的とした機能強化である。

Writable Arrow_Fdw

Arrow_Fdw外部テーブルを書き込み可能にするには、テーブルオプションに writable を付与する。

=# CREATE FOREIGN TABLE ft (
  id    int,
  x     real,
  y     real,
  z     real
) SERVER arrow_fdw
  OPTIONS (file '/dev/shm/ft.arrow', writable 'true');
CREATE FOREIGN TABLE

外部テーブルを定義する時点でfileで指定する Apache Arrow ファイルが存在している必要はないが、writableオプションを指定した場合は、外部テーブルの背後に複数の Apache Arrow ファイルを配置する事はできない。これは1個でないと、どのファイルに書き込むべきかを特定できないため。

ひとまず 1,000 行ほどデータを挿入してみる。

=# INSERT INTO ft (
  SELECT x, pgstrom.random_float() * 100.0,
            pgstrom.random_float() * 100.0,
            pgstrom.random_float() * 100.0
    FROM generate_series(1,1000) x);
INSERT 0 1000

このように、指定したパスに Apache Arrow ファイルが作成され、1,000行分のデータが書き込まれている事がわかる。

$ ls -l /dev/shm/ft.arrow
-rw-------. 1 kaigai users 17166 Feb 17 18:04 /dev/shm/ft.arrow
$ ./utils/pg2arrow --dump /dev/shm/ft.arrow
[Footer]
{Footer: version=V4, schema={Schema: endianness=little, fields=[{Field: name="id", nullable=true, type={Int32}, children=[], custom_metadata=[]}, {Field: name="x", nullable=true, type={Float32}, children=[], custom_metadata=[]}, {Field: name="y", nullable=true, type={Float32}, children=[], custom_metadata=[]}, {Field: name="z", nullable=true, type={Float32}, children=[], custom_metadata=[]}], custom_metadata=[]}, dictionaries=[], recordBatches=[{Block: offset=352, metaDataLength=296 bodyLength=16128}]}
[Record Batch 0]
{Block: offset=352, metaDataLength=296 bodyLength=16128}
{Message: version=V4, body={RecordBatch: length=1000, nodes=[{FieldNode: length=1000, null_count=0}, {FieldNode: length=1000, null_count=0}, {FieldNode: length=1000, null_count=0}, {FieldNode: length=1000, null_count=0}], buffers=[{Buffer: offset=0, length=0}, {Buffer: offset=0, length=4032}, {Buffer: offset=4032, length=0}, {Buffer: offset=4032, length=4032}, {Buffer: offset=8064, length=0}, {Buffer: offset=8064, length=4032}, {Buffer: offset=12096, length=0}, {Buffer: offset=12096, length=4032}]}, bodyLength=16128}

なお、PostgreSQLのFDWモジュールとして動作するからには、トランザクション制御の諸々に従う必要がある。
以下のように未コミットの書き込みに関しては、ABORTROLLBACKで取り消す事ができる。
ただし、効率的にトランザクションを実装するため、INSERTを実行できるのは同時に1トランザクションのみ。要は、少し強めのShareRowExclusiveLockを取っているので、その辺はご注意を。

postgres=# SELECT count(*) FROM ft;
 count
-------
  1000
(1 row)

postgres=# BEGIN;
BEGIN
postgres=# INSERT INTO ft (
  SELECT x, pgstrom.random_float() * 100.0,
            pgstrom.random_float() * 100.0,
            pgstrom.random_float() * 100.0
    FROM generate_series(1,300) x);
INSERT 0 300
postgres=# SELECT count(*) FROM ft;
 count
-------
  1300
(1 row)

postgres=# ABORT;
ROLLBACK
postgres=# SELECT count(*) FROM ft;
 count
-------
  1000
(1 row)

PL/CUDAがお払い箱になる話

Arrow_FdwのREAD系に関しては列データによるGPUへの高速なデータ供給という設計意図があるのだが、WRITE系に関しては少し異なる思惑がある。

f:id:kaigai:20200217182318p:plain

バイスから集まってくるログデータ、いわゆるIoT/M2M系のワークロードを処理する事を考えると、生データは割と簡単にTB級のデータサイズに膨れ上がってしまい、何らかの集計や前処理を行わないと機械学習・統計解析のエンジンに渡す事ができない。少なくともGBの単位まで落とす必要はあるだろうと考えており、おそらくその辺は、既存のSSD-to-GPU Direct SQLの役割となる。
問題は、前処理を終えたデータで、これを Python スクリプトで動く機械学習エンジンに渡す時に、いったんCSVで吐き出してから再度 Text -> Binary 変換というのは効率が悪い。できればバイナリのまま受け渡す方が効率的で、スマートであろう。

新たに追加した関数、pgstrom.arrow_fdw_export_cupy()を使えば、Arrow_Fdw外部テーブルに格納されたデータのうち、指定された列だけを抽出してcuPyのndarrayと呼ばれるデータフレームと同じ形式でGPUバイスメモリにロードする事ができる。この関数はGPUバイスメモリを外部からマップするための識別子を返すので、これを利用すれば、Zero-copyで*2PostgreSQLPythonスクリプトの間のデータ交換が可能になる。

加えて、PostgreSQLにはPL/Pythonという、Python言語でユーザ定義関数を記述するための機能があり、これを利用すれば、元々PL/CUDAでユーザにベタっとCUDA Cのコードを書いてもらっていた*3ところを、もっと一般的な Python + cuPy という形で代替できる。

cuPyにもカスタムGPUカーネルを記述する機能があり、cupy.RawKernelクラスを利用する。この人は裏でNVRTCを利用して、GPUカーネルの実行時コンパイルができるので、感覚としてはLL言語スクリプトを書くのとあまり大差ない。

以下にコードのサンプルを置いてみる。
このSQL関数は、①PL/Pythonを利用してcuPyの機能を呼び出し、②Arrow_Fdwからロードしたデータフレームの値の平均値を列ごとに導出する。

CREATE OR REPLACE FUNCTION custom_average(x_ident text)
RETURNS float[] AS
$$
import cupy
import cupy_strom

X = cupy_strom.ipc_import(x_ident)
nattrs = X.shape[0]
nitems = X.shape[1]
gridSz = (nitems + 2047) >> 11;

Y = cupy.zeros((nattrs))

source='''
extern "C" __global__
           __launch_bounds__(1024)
void
kern_gpu_sum(double *y, const float *x, int nitems)
{
    __shared__ float lvalues[2048];
    int     gridSz = (nitems + 2047) / 2048;
    int     colIdx = blockIdx.x / gridSz;
    int     rowBase = (blockIdx.x % gridSz) * 2048;
    int     localId = 2 * threadIdx.x;
    int     i, k;

    // Load values to local shared buffer
    x += colIdx * nitems;
    for (i=threadIdx.x; i < 2048; i+=blockDim.x)
        lvalues[i] = (rowBase + i < nitems ? x[rowBase + i] : 0.0);
    __syncthreads();

    // Run reduction operations
    for (k=0; k < 11; k++)
    {
        int     mask = ((1 << k) - 1);

        if ((threadIdx.x & mask) == 0)
            lvalues[localId] += lvalues[localId + (1<<k)];
        __syncthreads();
    }
    // Write back the total sum
    if (threadIdx.x == 0)
        atomicAdd(&y[colIdx], lvalues[0]);
}
'''
kern = cupy.RawKernel(source, 'kern_gpu_sum')
kern.__call__((gridSz * nattrs,0,0),
              (1024,0,0),
              (Y,X,nitems))
X = 0   # unmap GPU memory

return Y / nitems
$$ LANGUAGE 'plpython3u';

細かい説明は省略するが、入力値Xを2048要素ごとに領域分割し、各領域ごとに1024個のスレッドが協調して11ステップで総和を計算し、出力バッファYに書き出すというモノである。

分かりやすいように、x列、y列、z列の値がそれぞれ異なる分布を取るように初期化する。

=# INSERT INTO ft (SELECT x, pgstrom.random_int(0,1,10000)::float/100.0,
                             pgstrom.random_int(0,-7500,2500)::float/100.0,
                             pgstrom.random_int(0,5000,15000)::float/100.0
                     FROM generate_series(1,1000000) x);
=# SELECT avg(x), avg(y), avg(z) FROM ft;
       avg        |        avg        |       avg
------------------+-------------------+-----------------
 49.9972925601087 | -24.9707353391815 | 99.982088626751
(1 row)

で、件のPL/Pythonユーザ定義関数を呼び出す。
pgstrom.arrow_fdw_export_cupyがftテーブルのx列、y列、z列を抽出して作成したGPUバッファの識別子を、そのままPL/Pythonユーザ定義関数に入力し、GPUカーネルを呼び出して平均値を計算している。

=# SELECT custom_average(pgstrom.arrow_fdw_export_cupy('ft','{x,y,z}'::text[]));
                    custom_average
------------------------------------------------------
 {49.9972926015625,-24.9707353193359,99.982088671875}
(1 row)

上記のように同じ結果が出力された事と、もう一点、PL/CUDAと同じようにユーザ定義のGPUカーネル関数をPL/Python + cuPyの組み合わせで実行できることが実証できた。
イマドキだと、なかなかCUDA CでベタにGPUカーネルを書くという人は少ないらしいので、それよりはより間口の広いPython環境の道具を使えるようにしつつ、必要に応じてPL/CUDA相当のカリカリチューニングなコードを書けるようにする、というのがベターな方向性であろう。

今後は、PL/CUDAからPL/Python + cuPyやその他のPython向けモジュールという形態を推奨するようにしたい。

*1:そりゃそうだ

*2:厳密には外部テーブル⇒GPUへのデータロードが最初の一回だけ

*3:いや、しかし、PL/CUDAのコードを書いていた人なんて自分意外にいるの?いるの?