昨年ラストのブログ記事は、pg2arrowに--appendモードを付けてApache Arrowファイルへの追記を行うというトピックだった。
実は内部的には、PG-StromのArrow_Fdwとpg2arrowのコードは大半を共有していて*1、入り口がスタンドアロンのlibpqを使うツールなのか、PostgreSQLのFDW APIなのかという程度の違いしかない。
そこで、Arrow_Fdw外部テーブルに対してINSERT
文を実行できるようにして、PostgreSQL側でもApache Arrowファイルへの追記をできるようにしてみた。これは後述の、Python向け各種モジュールとのデータ交換を目的とした機能強化である。
Writable Arrow_Fdw
Arrow_Fdw外部テーブルを書き込み可能にするには、テーブルオプションに writable
を付与する。
=# CREATE FOREIGN TABLE ft ( id int, x real, y real, z real ) SERVER arrow_fdw OPTIONS (file '/dev/shm/ft.arrow', writable 'true'); CREATE FOREIGN TABLE
外部テーブルを定義する時点でfile
で指定する Apache Arrow ファイルが存在している必要はないが、writable
オプションを指定した場合は、外部テーブルの背後に複数の Apache Arrow ファイルを配置する事はできない。これは1個でないと、どのファイルに書き込むべきかを特定できないため。
ひとまず 1,000 行ほどデータを挿入してみる。
=# INSERT INTO ft ( SELECT x, pgstrom.random_float() * 100.0, pgstrom.random_float() * 100.0, pgstrom.random_float() * 100.0 FROM generate_series(1,1000) x); INSERT 0 1000
このように、指定したパスに Apache Arrow ファイルが作成され、1,000行分のデータが書き込まれている事がわかる。
$ ls -l /dev/shm/ft.arrow -rw-------. 1 kaigai users 17166 Feb 17 18:04 /dev/shm/ft.arrow $ ./utils/pg2arrow --dump /dev/shm/ft.arrow [Footer] {Footer: version=V4, schema={Schema: endianness=little, fields=[{Field: name="id", nullable=true, type={Int32}, children=[], custom_metadata=[]}, {Field: name="x", nullable=true, type={Float32}, children=[], custom_metadata=[]}, {Field: name="y", nullable=true, type={Float32}, children=[], custom_metadata=[]}, {Field: name="z", nullable=true, type={Float32}, children=[], custom_metadata=[]}], custom_metadata=[]}, dictionaries=[], recordBatches=[{Block: offset=352, metaDataLength=296 bodyLength=16128}]} [Record Batch 0] {Block: offset=352, metaDataLength=296 bodyLength=16128} {Message: version=V4, body={RecordBatch: length=1000, nodes=[{FieldNode: length=1000, null_count=0}, {FieldNode: length=1000, null_count=0}, {FieldNode: length=1000, null_count=0}, {FieldNode: length=1000, null_count=0}], buffers=[{Buffer: offset=0, length=0}, {Buffer: offset=0, length=4032}, {Buffer: offset=4032, length=0}, {Buffer: offset=4032, length=4032}, {Buffer: offset=8064, length=0}, {Buffer: offset=8064, length=4032}, {Buffer: offset=12096, length=0}, {Buffer: offset=12096, length=4032}]}, bodyLength=16128}
なお、PostgreSQLのFDWモジュールとして動作するからには、トランザクション制御の諸々に従う必要がある。
以下のように未コミットの書き込みに関しては、ABORT
やROLLBACK
で取り消す事ができる。
ただし、効率的にトランザクションを実装するため、INSERT
を実行できるのは同時に1トランザクションのみ。要は、少し強めのShareRowExclusiveLock
を取っているので、その辺はご注意を。
postgres=# SELECT count(*) FROM ft; count ------- 1000 (1 row) postgres=# BEGIN; BEGIN postgres=# INSERT INTO ft ( SELECT x, pgstrom.random_float() * 100.0, pgstrom.random_float() * 100.0, pgstrom.random_float() * 100.0 FROM generate_series(1,300) x); INSERT 0 300 postgres=# SELECT count(*) FROM ft; count ------- 1300 (1 row) postgres=# ABORT; ROLLBACK postgres=# SELECT count(*) FROM ft; count ------- 1000 (1 row)
PL/CUDAがお払い箱になる話
Arrow_FdwのREAD系に関しては列データによるGPUへの高速なデータ供給という設計意図があるのだが、WRITE系に関しては少し異なる思惑がある。
デバイスから集まってくるログデータ、いわゆるIoT/M2M系のワークロードを処理する事を考えると、生データは割と簡単にTB級のデータサイズに膨れ上がってしまい、何らかの集計や前処理を行わないと機械学習・統計解析のエンジンに渡す事ができない。少なくともGBの単位まで落とす必要はあるだろうと考えており、おそらくその辺は、既存のSSD-to-GPU Direct SQLの役割となる。
問題は、前処理を終えたデータで、これを Python スクリプトで動く機械学習エンジンに渡す時に、いったんCSVで吐き出してから再度 Text -> Binary 変換というのは効率が悪い。できればバイナリのまま受け渡す方が効率的で、スマートであろう。
新たに追加した関数、pgstrom.arrow_fdw_export_cupy()
を使えば、Arrow_Fdw外部テーブルに格納されたデータのうち、指定された列だけを抽出してcuPyのndarray
と呼ばれるデータフレームと同じ形式でGPUデバイスメモリにロードする事ができる。この関数はGPUデバイスメモリを外部からマップするための識別子を返すので、これを利用すれば、Zero-copyで*2PostgreSQLとPythonスクリプトの間のデータ交換が可能になる。
加えて、PostgreSQLにはPL/Pythonという、Python言語でユーザ定義関数を記述するための機能があり、これを利用すれば、元々PL/CUDAでユーザにベタっとCUDA Cのコードを書いてもらっていた*3ところを、もっと一般的な Python + cuPy という形で代替できる。
cuPyにもカスタムGPUカーネルを記述する機能があり、cupy.RawKernel
クラスを利用する。この人は裏でNVRTCを利用して、GPUカーネルの実行時コンパイルができるので、感覚としてはLL言語でスクリプトを書くのとあまり大差ない。
以下にコードのサンプルを置いてみる。
このSQL関数は、①PL/Pythonを利用してcuPyの機能を呼び出し、②Arrow_Fdwからロードしたデータフレームの値の平均値を列ごとに導出する。
CREATE OR REPLACE FUNCTION custom_average(x_ident text) RETURNS float[] AS $$ import cupy import cupy_strom X = cupy_strom.ipc_import(x_ident) nattrs = X.shape[0] nitems = X.shape[1] gridSz = (nitems + 2047) >> 11; Y = cupy.zeros((nattrs)) source=''' extern "C" __global__ __launch_bounds__(1024) void kern_gpu_sum(double *y, const float *x, int nitems) { __shared__ float lvalues[2048]; int gridSz = (nitems + 2047) / 2048; int colIdx = blockIdx.x / gridSz; int rowBase = (blockIdx.x % gridSz) * 2048; int localId = 2 * threadIdx.x; int i, k; // Load values to local shared buffer x += colIdx * nitems; for (i=threadIdx.x; i < 2048; i+=blockDim.x) lvalues[i] = (rowBase + i < nitems ? x[rowBase + i] : 0.0); __syncthreads(); // Run reduction operations for (k=0; k < 11; k++) { int mask = ((1 << k) - 1); if ((threadIdx.x & mask) == 0) lvalues[localId] += lvalues[localId + (1<<k)]; __syncthreads(); } // Write back the total sum if (threadIdx.x == 0) atomicAdd(&y[colIdx], lvalues[0]); } ''' kern = cupy.RawKernel(source, 'kern_gpu_sum') kern.__call__((gridSz * nattrs,0,0), (1024,0,0), (Y,X,nitems)) X = 0 # unmap GPU memory return Y / nitems $$ LANGUAGE 'plpython3u';
細かい説明は省略するが、入力値X
を2048要素ごとに領域分割し、各領域ごとに1024個のスレッドが協調して11ステップで総和を計算し、出力バッファY
に書き出すというモノである。
分かりやすいように、x列、y列、z列の値がそれぞれ異なる分布を取るように初期化する。
=# INSERT INTO ft (SELECT x, pgstrom.random_int(0,1,10000)::float/100.0, pgstrom.random_int(0,-7500,2500)::float/100.0, pgstrom.random_int(0,5000,15000)::float/100.0 FROM generate_series(1,1000000) x); =# SELECT avg(x), avg(y), avg(z) FROM ft; avg | avg | avg ------------------+-------------------+----------------- 49.9972925601087 | -24.9707353391815 | 99.982088626751 (1 row)
で、件のPL/Pythonユーザ定義関数を呼び出す。
pgstrom.arrow_fdw_export_cupy
がftテーブルのx列、y列、z列を抽出して作成したGPUバッファの識別子を、そのままPL/Pythonユーザ定義関数に入力し、GPUカーネルを呼び出して平均値を計算している。
=# SELECT custom_average(pgstrom.arrow_fdw_export_cupy('ft','{x,y,z}'::text[])); custom_average ------------------------------------------------------ {49.9972926015625,-24.9707353193359,99.982088671875} (1 row)
上記のように同じ結果が出力された事と、もう一点、PL/CUDAと同じようにユーザ定義のGPUカーネル関数をPL/Python + cuPyの組み合わせで実行できることが実証できた。
イマドキだと、なかなかCUDA CでベタにGPUカーネルを書くという人は少ないらしいので、それよりはより間口の広いPython環境の道具を使えるようにしつつ、必要に応じてPL/CUDA相当のカリカリチューニングなコードを書けるようにする、というのがベターな方向性であろう。
今後は、PL/CUDAからPL/Python + cuPyやその他のPython向けモジュールという形態を推奨するようにしたい。