HyperLogLogを使ったカーディナリティの推測
高校生の頃までは滋賀県に住んでいた事もあり、夜、勉強の合間に、KBS京都で放送されていた『日髙のり子のはいぱぁナイト』を聞いており、日々ネタを考えては、番組へハガキを投稿する常連だった*1のですが(←勉強はどうした)、今回は、PG-Stromに実装した『はいぱぁ』な機能を紹介したいと思います。
SELECT COUNT(distinct KEY) は結構難しい
SELECT COUNT(KEY) FROM my_table;
が
SELECT COUNT(distinct KEY) FROM my_table;
になった瞬間、特にサイズの大きなテーブルをスキャンする場合には、非常に難しい問題になってしまいます。
最初の例は、KEYが非NULLである行数を全部カウントして返せば良いのですが、後者の場合はKEYが重複する場合にはカウントしないため、重複排除を行うための工夫が必要になります。これをカーディナリティを計算すると言います。
これをDBで実装するには2通りの方法が考えられます。
方法①
入力ストリームを予めKEY値でソートしておき、KEY値が変わるたびにカウンタをインクリメントする。
KEY値にインデックスが張られている場合などには有効な方法だが、そうでなければ、実行時にテーブル全体のソートが必要になる。しかも、領域分割による並列処理が不可能であるので、仮に入力レコードが数億行あったとすると、律儀にCOUNT(distinct KEY)関数を数億回実行せねばならない。
方法②
集約関数を実行する Agg ノードでハッシュ表を持っておき、KEY値がそれまでにスキャンしたレコードに含まれているかどうかを判定する。最終的なCOUNT(distinct KEY)の結果は、このハッシュ表のエントリ数となる。
ソートは必要ないが、メモリ消費量が事前に予測不可能で、ハッシュ表のサイズによっては並列処理も難しい。(通常、メモリ消費が問題になるような状況ではマージ処理も大変な負荷になる)
なので、大量のデータセットの中から正確なカーディナリティを出力しようとすると、そこそこ大変(= 処理時間がかかってしまう)という事になります。
「ざっくり」でもよくないですか?
ただこれは、厳密な重複排除を行った集計を行う上での制限事項で、世の中には「ざっくりとした数が知りたい」で十分なケースが存在します。例えば、アクセスログからアクティブなユーザ数を集計してグラフに出したい、といった場合など、多少の誤差は許容できるユースケースです。
これを比較的精度よく推定できる方法として、HyperLogLogという手法が知られており、いくつかのビッグデータ処理向けデータベースに実装されているものもあります。
- Amazon RedShift ... HLL_CARDINALITY 関数 - Amazon Redshift
- Google BigQuery ... 標準 SQL の HyperLogLog++ 関数
- Microsoft CitusDB ... Distributed Distinct Count with HyperLogLog on Postgres — Citus 10.2 documentation
今回は、GPU上でのGROUP BY処理を行うGpuPreAgg機能の拡張として、PG-StromにHyperLogLogを実装してみました。
HyperLogLogアルゴリズムの考え方
HyperLogLogアルゴリズムの考え方をざっくり説明します。
- 前提①:
COUNT(distinct KEY)のKEY値をハッシュ関数にかけると、ランダムなビット列が生成されるハズである。 - 前提②:KEY値のカーディナリティが高ければ、
...10100000のように0が連続するパターンも含まれるハズである。
したがって、テーブルをスキャンしてKEY値のハッシュを計算し、その中で下位ビットから連続する0の個数の最大値を記録しておけば、その集合のカーディナリティは程度であると推定する事ができます。
もちろん、このようにnの値に応じてで増えていく推定値というのはあまりにも誤差が大きいですので、もう少し工夫を加えます。
ハッシュ値の下位bビット分をm互いに独立したカウンタであるHLLレジスタのインデックスと見なし、残りのビット列から連続する0の個数をカウントして、インデックスされたHLLレジスタにその最大値を記録します。
最後に、これの平均値を計算する事で、しばしば混じってしまう例外的なハッシュ値の影響を排除し、もう少し真の値に近いKEY値のカーディナリティを推定する...という流れになります。
この手法の良いところは、入力値をソートする必要がなく、また、テーブルを分割統治して互いに独立なHLLレジスタを作ったとしても、それほど大量のメモリを消費しないため、並列処理に向いているところです。
例えば、64bitのハッシュ値でレジスタのセレクタに10bitを使った場合、各レジスタは8bitあればカウンタとして十分に機能するため、HLLレジスタとして必要なのは僅か1.0kBだけという事になります。
PG-StromにおけるHyperLogLog
ここでは例として、Star Schema Benchmarkデータセットの lineorder テーブルから、lo_custkey*2のカーディナリティを調べてみる事にします。
scale factorは100なので、テーブルのサイズは概ね87GBとなります。
nvme=# \d+
List of relations
Schema | Name | Type | Owner | Persistence | Size | Description
--------+-----------+-------+--------+-------------+--------+-------------
public | customer | table | kaigai | permanent | 406 MB |
public | date1 | table | kaigai | permanent | 416 kB |
public | lineorder | table | kaigai | permanent | 87 GB |
public | part | table | kaigai | permanent | 160 MB |
public | supplier | table | kaigai | permanent | 132 MB |
(5 rows)nvme=# explain select count(distinct lo_custkey) from lineorder;
QUERY PLAN
------------------------------------------------------------------------------
Aggregate (cost=18896094.80..18896094.81 rows=1 width=8)
Output: count(DISTINCT lo_custkey)
-> Seq Scan on public.lineorder (cost=0.00..17396057.84 rows=600014784 width=6)
Output: lo_orderkey, ...(snip)..., lo_shipmode
(4 rows)デフォルト設定では、このように count(distinct ...) を含むクエリをGPUで実行できません。
これは、HyperLogLogによる推定値で count(distinct ...) を代替する事で結果が変わってしまうため、デフォルトでは無効化されているためです。
nvme=# select count(distinct lo_custkey) from lineorder; count --------- 2000000 (1 row) Time: 409851.751 ms (06:49.852)
実行すると、厳密なcount(distinct lo_custkey)は 2,000,000 である一方、その実行には 409 秒を要している事が分かります。
(Sortの高速化を目的としたCPU並列クエリすら有効になっていないので、当然と言えば当然と言えます。)
次に、PG-StromのHyperLogLog機能による count(distinct ...) の置き換えを有効にします。
nvme=# set pg_strom.enable_hll_count = on; SET
実行計画を見てみましょう。
nvme=# explain verbose select count(distinct lo_custkey) from lineorder;
QUERY PLAN
-----------------------------------------------------------------------------------------------------------------
Aggregate (cost=7444397.37..7444397.38 rows=1 width=8)
Output: pgstrom.hll_count((pgstrom.hll_pcount(pgstrom.hll_hash(lo_custkey))))
-> Gather (cost=7444397.14..7444397.35 rows=2 width=32)
Output: (pgstrom.hll_pcount(pgstrom.hll_hash(lo_custkey)))
Workers Planned: 2
-> Parallel Custom Scan (GpuPreAgg) on public.lineorder (cost=7443397.14..7443397.15 rows=1 width=32)
Output: (pgstrom.hll_pcount(pgstrom.hll_hash(lo_custkey)))
GPU Output: (pgstrom.hll_pcount(pgstrom.hll_hash(lo_custkey)))
GPU Setup: pgstrom.hll_hash(lo_custkey)
Reduction: NoGroup
Outer Scan: public.lineorder (cost=2833.33..7365270.22 rows=250006160 width=6)
GPU Preference: GPU0 (Tesla V100-PCIE-16GB)
Kernel Source: /var/lib/pgdata/pgsql_tmp/pgsql_tmp_strom_35128.1.gpu
Kernel Binary: /var/lib/pgdata/pgsql_tmp/pgsql_tmp_strom_35128.2.ptx
(14 rows)GPUを用いた集約関数であるGpuPreAggが選択されているほか、元々count(DISTINCT lo_custkey)を出力していた Aggregate ノードが、代わりにpgstrom.hll_count((pgstrom.hll_pcount(pgstrom.hll_hash(lo_custkey)))) の実行結果を出力するように書き換えられています。
内側から順に説明すると、pgstrom.hll_hash(lo_custkey)関数は、HyperLogLogに使用するハッシュ値を計算するための関数で、ここでは軽量かつ比較的ランダムな64bitのハッシュ値が得られるという事でSipHashアルゴリズムを使用しています。
次に、pgstrom.hll_pcount(HASH)関数は、HLLレジスタ配列をセットアップし、引数として与えられた64bitのハッシュ値を元にこれを次々と更新していきます。重要なのは、pgstrom.hll_pcount(HASH)関数はHLLレジスタ配列だけを出力するため、どれだけ巨大なテーブルをスキャンする事になったとしても、pgstrom.hll_pcount(HASH)関数より後の工程ではたった1行しか(GROUP BY句が指定されている場合はグループの数だけしか)返さないという事です。
したがって、各ワーカープロセスから返却されるものも含め、CPUでHLLレジスタ配列をマージする事になるpgstrom.hll_count()関数は、僅か1行 x 3プロセス分の結果を処理するだけで、HyperLogLogによるlo_custkey値のカーディナリティの推定が可能になるという事です。
このような性質により、GPU/CPUの並列処理の恩恵を最大限に受ける事ができるため、パフォーマンスも良好です。
厳密な集計値を導出するために409秒を要していた一方、実際の値と 0.3% 程度しかズレのない 2,005,437 という推定値を9.2秒で導出しています。
nvme=# select count(distinct lo_custkey) from lineorder; count --------- 2005437 (1 row) Time: 9212.712 ms (00:09.213)
実行計画の詳細を見てみましょう。
nvme=# explain (verbose, analyze) select count(distinct lo_custkey) from lineorder;
QUERY PLAN
-----------------------------------------------------------------------------------------------------------------------------------------------------------------
Aggregate (cost=4992387.95..4992387.96 rows=1 width=8) (actual time=9045.729..9081.690 rows=1 loops=1)
Output: pgstrom.hll_count((pgstrom.hll_pcount(pgstrom.hll_hash(lo_custkey))))
-> Gather (cost=4992387.72..4992387.93 rows=2 width=32) (actual time=8892.195..9081.633 rows=3 loops=1)
Output: (pgstrom.hll_pcount(pgstrom.hll_hash(lo_custkey)))
Workers Planned: 2
Workers Launched: 2
-> Parallel Custom Scan (GpuPreAgg) on public.lineorder (cost=4991387.72..4991387.73 rows=1 width=32) (actual time=8760.881..8760.885 rows=1 loops=3)
Output: (pgstrom.hll_pcount(pgstrom.hll_hash(lo_custkey)))
GPU Output: (pgstrom.hll_pcount(pgstrom.hll_hash(lo_custkey)))
GPU Setup: pgstrom.hll_hash(lo_custkey)
Reduction: NoGroup
Outer Scan: public.lineorder (cost=2833.33..4913260.79 rows=250006160 width=6) (actual time=159.316..2800.578 rows=600037902 loops=1)
GPU Preference: GPU0 (Tesla V100-PCIE-16GB)
GPUDirect SQL: load=11395910
Kernel Source: /var/lib/pgdata/pgsql_tmp/pgsql_tmp_strom_39266.2.gpu
Kernel Binary: /var/lib/pgdata/pgsql_tmp/pgsql_tmp_strom_39266.3.ptx
Worker 0: actual time=8694.640..8694.644 rows=1 loops=1
Worker 1: actual time=8699.829..8699.833 rows=1 loops=1
Planning Time: 0.129 ms
Execution Time: 9194.200 ms
(20 rows)このクエリには全体で9.2秒を要していますが、そのうち8.760秒が GpuPreAgg での実行に要しています。
ここでは GPU-Direct SQL を用いて、4台のNVME-SSDから 10GB/s 程度のスループットで合計6億行を読み出していますが、GpuPreAggが出力しているのはHLLレジスタ配列の1行だけであるので、非常に効率的なデータ転送が行われていると言えます。
下の図で言えば、テーブル(ディスク)からデータを読み出し、GPU上で実行される hll_pcount() 関数にロードするところまでが、スループット番長である PG-Strom の真骨頂で、クエリの書き換えとアルゴリズムの工夫により、厄介なCOUNT(distinct KEY)をこのような形態の処理に書き換えるところが HyperLogLog の恩恵と言えるでしょう。

結論
COUNT(distinct KEY)関数で「大まかな推定値」を得れば十分である場合、HyperLogLogを使って相応に精度の良い推定値を得る事ができる。COUNT(distinct KEY)関数を、distinct句の付かない集約関数に書き換える事で、領域分割と並列処理が可能な形式に変換できる。このパターンに落とす事ができれば、GPU-Direct SQLでほぼほぼハードウェアの限界に近い速度で集計処理を回すことができる。
ひとまず、現状では論文に書かれている内容をそのまま何も考えた形なので、例えばカーディナリティが小さい時の推定値のズレや、より正確な推定値を得るための補正(関連研究でそういうのがあるらしい)については、全く何も入っていません。誰かそういうのに強い人がパッチを書いてくれたりすると助かります(ボソッ

