読者です 読者をやめる 読者になる 読者になる

RiakのBackendのコードを読む(事前調査篇)

Erlang Riak

僕よりも詳しそうな人はいっぱいいるのに、なぜかRiak Source Code Reading @東京 #1の担当になってしまったので、Riakのbitcask, eleveldbバックエンドのコードを読んでいく。

たぶん最終的にはgistかgithubに資料をまとめるけど、ここに書いてあるのはその前段階のかなり個人的なメモ。

あたりをつける

grep 'bitcask'やgrep 'eleveldb'であたりをつけると、riak_kv_*_backend.erlがバックエンドを実装しているモジュールであることが推測できる。

riak_kv_*_backend.erlには次のようなファイルがあった。

% ls deps/riak_kv/src/*backend*.erl
deps/riak_kv/src/riak_kv_backend.erl
deps/riak_kv/src/riak_kv_eleveldb_backend.erl
deps/riak_kv/src/riak_kv_multi_backend.erl
deps/riak_kv/src/riak_kv_bitcask_backend.erl
deps/riak_kv/src/riak_kv_memory_backend.erl
deps/riak_kv/src/riak_kv_yessir_backend.erl

試しにriak_kv_eleveldb_backend.erlを見てみる。

-behavior(riak_kv_backend).

ということなので、バックエンドとなる各モジュールはriak_kv_backendというbehaviorを実装しているらしい。

上からざっと見ていくと、まずcpabilitiesという関数が目に入る。riak_kv_elevel_backend.erlだと以下のようになっている。

-define(CAPABILITIES, [async_fold, indexes]).
capabilities(_) ->
    {ok, ?CAPABILITIES}.

他のバックエンド・モジュールを見ていくと、

% grep CAPABILITIES deps/riak_kv/src/*.erl | grep define
deps/riak_kv/src/riak_kv_bitcask_backend.erl:-define(CAPABILITIES, [async_fold]).
deps/riak_kv/src/riak_kv_eleveldb_backend.erl:-define(CAPABILITIES, [async_fold, indexes]).
deps/riak_kv/src/riak_kv_memory_backend.erl:-define(CAPABILITIES, [async_fold, indexes]).
deps/riak_kv/src/riak_kv_multi_backend.erl:-define(CAPABILITIES, [async_fold]).
deps/riak_kv/src/riak_kv_yessir_backend.erl:-define(CAPABILITIES, [async_fold]).

ということなので、eleveldbとmemoryがindexesに対応していて、他はasync_foldのみということらしい。

bitcaskの方が機能が少ないってことだろうから、bitcaskから読んでいったほうがよいかなと考え、ここからriak_kv_bitcask_backend.erlに切り替えてbehaviorを追ってみる。

バックエンドAPI

あらためてexportしているAPIを見てみると、数は非常に少なく、名前から何をやっているかが明瞭なものが多い。これならソースを読んでいくのも楽そうだという感触を得る。

%% KV Backend API
-export([api_version/0,
         capabilities/1,
         capabilities/2,
         start/2,
         stop/1,
         get/3,
         put/5,
         delete/4,
         drop/1,
         fold_buckets/4,
         fold_keys/4,
         fold_objects/4,
         is_empty/1,
         status/1,
         callback/3]).

start, stopはだれがこのプロセスを起動しているのかがまだ良く分からないので、後回しにする。

Bitcask

get, put, delete

get, put, deleteはbitcask:get, bitcask:put, bitcask:deleteを呼び出す非常に薄いグルーコードになっているだけなのでほぼ自明。

riak_kv_bitcask_backend:fold_buckets
fold_buckets(FoldBucketsFun, Acc, Opts, #state{opts=BitcaskOpts,
                                               data_dir=DataFile,
                                               ref=Ref,
                                               root=DataRoot}) ->
    FoldFun = fold_buckets_fun(FoldBucketsFun),

fold_buckets_funは、Bucketに対する処理を行うFoldBucketsFunをとって、このBackendのすべてのBackendにFoldBucketsFunを適用する関数を返す関数。

async_foldがオプションに設定されているかどうかで分岐する。

    case lists:member(async_fold, Opts) of
        true ->
            %% omit
        false ->
            %% omit

処理の実体は、

bitcask:fold_keys(Ref, FoldFun, {Acc, sets:new()})

ということで、FoldFunで全キーをfoldするということだが、async_foldの場合は{async, fun()}を返すのみ。

riak_kv_bitcask_backend:fold_keys

fold_bucketsと構造は似ている。

fold_keys(FoldKeysFun, Acc, Opts, #state{opts=BitcaskOpts,
                                         data_dir=DataFile,
                                         ref=Ref,
                                         root=DataRoot}) ->
    Bucket =  proplists:get_value(bucket, Opts),
    FoldFun = fold_keys_fun(FoldKeysFun, Bucket),
    case lists:member(async_fold, Opts) of
        true ->
            %% omit
        false ->
            %% omit
    end.
riak_kv_bitcask_backend:fold_objects

これも、fold_buckets, fold_keysと構造は同じ。

fold_objects(FoldObjectsFun, Acc, Opts, #state{opts=BitcaskOpts,
                                               data_dir=DataFile,
                                               ref=Ref,
                                               root=DataRoot}) ->
    Bucket =  proplists:get_value(bucket, Opts),
    FoldFun = fold_objects_fun(FoldObjectsFun, Bucket),
    case lists:member(async_fold, Opts) of
        true ->
            %% omit
        false ->
            %% omit
    end.
riak_kv_bitcask_backend:drop
%% @doc Delete all objects from this bitcask backend
%% @TODO once bitcask has a more friendly drop function
%%  of its own, use that instead.
-spec drop(state()) -> {ok, state()} | {error, term(), state()}.

かなり親切にコメントが書かれているので、なんとなく分かった気になっちゃうけど、実際のbitcaskが作るファイル・ディレクトリの構造を把握していないので、もう少し深追いしたほうがよいかも。

riak_kv_bitcask_backend:is_empty

要調査。

%% @doc Returns true if this bitcasks backend contains any
%% non-tombstone values; otherwise returns false.
-spec is_empty(state()) -> boolean().
    %% Estimate if we are empty or not as determining for certain
    %% requires a fold over the keyspace that may block. The estimate may
    %% return false when this bitcask is actually empty, but it will never
    %% return true when the bitcask has data.
    bitcask:is_empty_estimate(Ref).
riak_kv_bitcask_backend:status

bitcask:statusのラッパーで自明。

%% @doc Get the status information for this bitcask backend
-spec status(state()) -> [{atom(), term()}].
status(#state{ref=Ref}) ->
    {KeyCount, Status} = bitcask:status(Ref),
    [{key_count, KeyCount}, {status, Status}].
riak_kv_bitcask_backend:callback
%% @doc Register an asynchronous callback
-spec callback(reference(), any(), state()) -> {ok, state()}.

ソースを読んでいくと、bitcaskバックエンドが対応しているcallbackは、

{sync, SyncInterval}

と、

merge_check

callbackとはバックエンド特有の処理を登録するものらしく、eleveldbのcallbackでは何もやっていなかった。syncとmerge_checkが何をやっているのかを追っていけば、bitcaskの特性がわかるかもしれない。

eleveldb

ここからはeleveldbバックエンドを読む。

riak_kv_eleveldb_backend:get

bitcask:getがeleveldb:getに変わったくらいで、riak_kv_bitcask_backend:getとほぼ同じ。

riak_kv_eleveldb_backend:put

これもやっていることは単純なのだが、bitcaskと違ってindexに対応しているので、そのための処理が増えている。

    %% Create the KV update...
    StorageKey = to_object_key(Bucket, PrimaryKey),
    Updates1 = [{put, StorageKey, Val}],

Bucket名とキーから実際にデータを書き込む際のキーを作成して、eleveldbに与えるリストUpdate1を作る。

    %% Convert IndexSpecs to index updates...
    F = fun({add, Field, Value}) ->
                {put, to_index_key(Bucket, PrimaryKey, Field, Value), <<>>};
           ({remove, Field, Value}) ->
                {delete, to_index_key(Bucket, PrimaryKey, Field, Value)}
        end,
    Updates2 = [F(X) || X <- IndexSpecs],

IndexSpecsの内容に応じて、putかdeleteでeleveldbに与えるリストUpdate2を作る。

    %% Perform the write...
    case eleveldb:write(Ref, Updates1 ++ Updates2, WriteOpts) of
        ok ->
            {ok, State};
        {error, Reason} ->
            {error, Reason, State}
    end.

最後にUpdate1とUpdate2の内容を実際にeleveldbに書き込む。

riak_kv_eleveldb_backend:delete

putがdeleteになるだけで、riak_kv_eleveldb_backend:putと処理の流れはほぼ同じ。

riak_kv_eleveldb_backend:fold_buckets

eleveldb:fold_keysを呼び出す前にFoldOptsをいじっている。

    FirstKey = to_first_key(undefined),
    FoldOpts1 = [{first_key, FirstKey} | FoldOpts],

のto_first_keyが、

%% @private Given a scope limiter, use sext to encode an expression
%% that represents the starting key for the scope. For example, since
%% we store objects under {o, Bucket, Key}, the first key for the
%% bucket "foo" would be `sext:encode({o, <<"foo">>, <<>>}).`
to_first_key(undefined) ->
    %% Start at the first object in LevelDB...
    to_object_key(<<>>, <<>>);

のように定義されているので、ここではLevelDBの最初のオブジェクトからfoldするということ。

次にやること

残りのriak_kv_eleveldb_backendの関数は軽く眺めただけだが、ここまで読んでみての次にやる必要が感じたこと。

  • バックエンドのプロセスを起動しているのは誰かを調べる。(riak_kv_vnode?)
  • async_foldとは何かを調べる。
  • %%get, put, deleteとかは自明として%%fold_buckets, fold_keys, fold_objectsの実際の内部での使い方を調べる。
    • riak_kv_vnodeのprivateな関数を眺めていると、putとかでもパッと見てもよく分からない処理をやっているので、少し詳しく追っていた方がよいかも。