読者です 読者をやめる 読者になる 読者になる

俺様スキャンの並列実行

PostgreSQL

PostgreSQL v9.6からはパラレルスキャンが導入される事になっている。

この機能をざっくり説明すると

  1. 共有メモリ上に『次に読むブロックの番号』という状態を作っておく。
  2. Gatherノードが複数のワーカープロセスを起動する。
  3. 各ワーカーで実行されるSeqScanが『次に読むブロックの番号』をアトミックにインクリメントしながらシーケンシャルスキャンを行う。
  4. 結果、個々のワーカーは本来読むべきテーブルの内容を一部しか返却しない。しかし、各ワーカーの実行結果を全てマージするとテーブルの内容は全て読み出されている。(しかも並列に)

という代物である。
読み出すブロック番号を共有メモリ上の情報から得ている以外は、全てシングルスレッド実行と同じコードを使用しているところがポイントである。

で、これと同じ事をForeignScanやCustomScanを使用して実装したい場合、並列対応版とそうでないものの差は、①この共有メモリ上に作られる『次に読むブロックの番号』(や、それに類する情報)を作るための仕組みと、②この情報に基づいて読み出す範囲を決定するという部分だけである。

②に関してはモジュール側で実装する部分の範疇なので特にインターフェースの拡張は必要ないが、①に関しては、共有メモリを獲得・初期化するタイミングでモジュール側を呼び出してやる必要がある。

その呼び出しポイントは、以下の関数。

  • ExecParallelEstimate ... 共有メモリセグメントの長さを決定する
  • ExecParallelInitializeDSM ... 共有メモリセグメントの初期値を設定する
  • ExecParallelInitializeWorker ... ワーカー側で実行ノードの初期化を行う

そのそれぞれに対応するCustomScan/ForeignScanのコールバックを定義してみた。

CustomScanはこんな感じ。

Size (*EstimateDSMCustomScan) (CustomScanState *node,
                               ParallelContext *pcxt);
 → 必要な共有メモリのサイズを返す

void (*InitializeDSMCustomScan) (CustomScanState *node,
                                 ParallelContext *pcxt,
                                 void *coordinate);
 → coordinateに割り当てられた共有メモリを初期化

void (*InitializeWorkerCustomScan) (CustomScanState *node,
                                    shm_toc *toc,
                                    void *coordinate);
 → worker側で共有メモリの内容に基づいて、実行ノードの
    (追加的な)初期化を行う。

ForeignScanもこんな感じ。全く同様

Size EstimateDSMForeignScan(ForeignScanState *node,
                            ParallelContext *pcxt);
void InitializeDSMForeignScan(ForeignScanState *node,
                              ParallelContext *pcxt,
                              void *coordinate);
void InitializeWorkerForeignScan(ForeignScanState *node,
                                 shm_toc *toc,
                                 void *coordinate);

このコールバックを使って、file_fdwを並列実行対応にしてみた。
diff差分を見てみると、修正規模は300行強。非常に小さな手間で並列化ができた。

postgres=# set max_parallel_degree = 3;
SET
postgres=# explain analyze select * from test_csv where id % 20 = 6;
                                  QUERY PLAN
--------------------------------------------------------------------------------
 Gather  (cost=1000.00..194108.60 rows=94056 width=52)
         (actual time=0.570..19268.010 rows=2000000 loops=1)
   Number of Workers: 3
   ->  Parallel Foreign Scan on test_csv  (cost=0.00..183703.00 rows=94056 width=52)
                                  (actual time=0.180..12744.655 rows=500000 loops=4)
         Filter: ((id % 20) = 6)
         Rows Removed by Filter: 9500000
         Foreign File: /tmp/testdata.csv
         Foreign File Size: 1504892535
 Planning time: 0.147 ms
 Execution time: 19330.201 ms
(9 rows)

topで見ても、worker(ForeignScan)×3とmaster(Gather)×1がぶん回っているのが分かる。
f:id:kaigai:20160129011848p:plain