読者です 読者をやめる 読者になる 読者になる

理系学生日記

おまえはいつまで学生気分なのか

忍者TOOLS

Cassandra に対する Java からの Insert

ここで Java を使うのは諸事情のためなのだけど、Cassandra に対するアクセスは Thrift 経由となる。じゃぁ Thrift って何よというと、公式では以下のように説明されている。

Thrift is a software framework for scalable cross-language services development. It combines a software stack with a code generation engine to build services that work efficiently and seamlessly between C++, Java, Python, PHP, Ruby, Erlang, Perl, Haskell, C#, Cocoa, JavaScript, Node.js, Smalltalk, and OCaml.

Apache Thrift - Home

要するに、どんな言語でも対応できるような RPC フレームワークで、Thrift の定義を書けば、あとはコード生成エンジンが各言語用のコードを吐いてくれるよ、というヤツである。

ぼく自身は Thrift が流行っているかいないのかよく知らないのだけれど、「Thrift を直に使うのはフレームワーク開発者だけなのでは」という思いは強く持っている。Thrift から生み出される I/F は、人が好き好んで使いたくなるような I/F では決してない。もっと無骨で、無機質な I/F なので、それを使ってコーディングするのも決して楽しいものではない。Cassandra の Official からして、Thrift API をそのまま使うのはお勧めしていない。

Using one of these clients is strongly preferred to raw Thrift when developing applications (the Thrift API is primarily intended for client developers). What follows are clients that support Cassandra 0.8.

http://wiki.apache.org/cassandra/ClientOptions

ただし、実際に Cassandra アプリの開発を行う場合、何らかのライブラリを使うにしてもその内部動作は理解しておかないとダメなのだろうということで、以下では生 Thrift API を使っている。かなしい。それと、Cassandra のドキュメントはマジで何をどう見ればよいのか分からない。かなしい。

Cassandra への接続

Cassandra へのアクセスは、まずは接続を開くところから始まる。
この手続を逐一書くのは面倒なので、Connector というクラスにまとめた。ここで出てくる Cassandra.Client は Cassandra の jar に入っているクラスで、基本的なやりとりはこの Cassandra.Client を介して行うことになる。

public class Connector {
    
    private TTransport transport = null;
    
    // "keyspace" is like a database in RDBMS context
    public Cassandra.Client connect(String host, int port, String keyspace) 
    throws InvalidRequestException, TException {
        transport = new TSocket(host, port);
        TFramedTransport tf = new TFramedTransport(transport);
        TProtocol proto = new TBinaryProtocol(tf);

        // 接続
        transport.open();
        Cassandra.Client client = new Cassandra.Client(proto);
        client.set_keyspace(keyspace);
        
        return client;
    }
    
    public void close() throws TTransportException {
        
        try {
            transport.flush();
        } finally {
            transport.close();
        }
    }
}

大まかに言えば、こんなかんじの使い方を意図している。

    Connector connector = new Connector();
    Cassandra.client client = new connector.connect(
        [cassandra host], [cassandra (thrift) port], "Keyspace");

    // DAO
    YourDao dao = new YourDao(client);
    dao.insert(hogehoge);

Insert/Update

Insert、Update はコード的には全く変わらない。
ただし、Insert/Update する粒度には、Column 単位、Row 単位とあるので、それぞれについてのサンプルコードを書いてみた。これらは全部 SessionDao という DAO (Data Access Object) クラスとしてまとめている。

まず、カラム単位は以下のようになる。

    // rowKey is a primary key in RDBMS context.
    public void insert(String rowKey, String name, String value) 
    throws UnsupportedEncodingException, InvalidRequestException, UnavailableException, 
           TimedOutException, TException {
        long clock = System.nanoTime();
        
        Column column = new Column();
        column.setName( name.getBytes(Constants.UTF8) );
        column.setValue( value.getBytes(Constants.UTF8) );
        column.setTimestamp( clock );
        
        ColumnParent parent = new ColumnParent( COLUMN_FAMILY );
        
        // client is an instance of Cassandra.Client
        client.insert(
                ByteBuffer.wrap(rowKey.getBytes()), parent, column, ConsistencyLevel.ONE);
    }

だいたい直感的だと思うのだけれど、Key => Value の双方はバイトデータとして I/F する点には注意が必要。
また、これらとは別に Timestamp も設定しているが、Cassandra は key/value とともに当該データを変更したタイムスタンプも、Column 単位で保存している。そして、「以前のデータに紐づくタイムスタンプよりも古いタイムスタンプ」を用いた要求は完全に無視する動作をするため、タイムスタンプの設定は必須。


では、1 Row に紐づく複数 Column を一気に insert/update したい、という要求に対しては、Cassandra.client#batch_mutate を使う。

    // columnMap is assumed to contain [key => value] pairs.
    public void batch_insert( String rowKey, Map<String, String> columnMap ) 
    throws UnsupportedEncodingException, InvalidRequestException, UnavailableException, 
           TimedOutException, TException {
        
        Map<ByteBuffer, Map<String, List<Mutation>>> mutationMap = 
            new HashMap<ByteBuffer, Map<String,List<Mutation>>>();
        List<Mutation> mutationList = new ArrayList<Mutation>();
        Map<String, List<Mutation>> columnFamilyMap = new HashMap<String, List<Mutation>>();
        
        for ( String key : columnMap.keySet() ) {
            Mutation mutation = new Mutation();
            mutation.setColumn_or_supercolumn(
                    ColumnManager.createCOSColumn( key, columnMap.get(key) ) );
            mutationList.add(mutation);
        }
        
        columnFamilyMap.put( COLUMN_FAMILY, mutationList );
        mutationMap.put(
                ByteBuffer.wrap( rowKey.getBytes( Constants.UTF8 ) ), 
                columnFamilyMap );
        
        client.batch_mutate(mutationMap, ConsistencyLevel.ONE);
    }

Mutation オブジェクトは、どの ColumnFamily のどのカラムを "Mutate" したいのかを保持するオブジェクトと考えると良い。これを変更したいカラムの分だけリストに詰め込んで、batch_mutate に渡すと insert/update がなされる。
ちなみに、上記で使われている ColumnManager は、逐一 Column オブジェクトだの、ColumnOrSuperColumn オブジェクトを作るのがメンドくさいので作ったユーティリティクラス。

public class ColumnManager {

    public static Column createColumn(String name, String value) throws UnsupportedEncodingException {

        Column column = new Column();
        column.setName( ByteBuffer.wrap( name.getBytes(Constants.UTF8) ) );
        column.setValue( ByteBuffer.wrap( value.getBytes(Constants.UTF8) ) );
        column.setTimestamp( System.nanoTime() );
        
        return column;
    }
    

    public static ColumnOrSuperColumn createCOSColumn(String name, String value) throws UnsupportedEncodingException {

        Column column = createColumn(name, value);
        ColumnOrSuperColumn cosColumn = new ColumnOrSuperColumn();
        cosColumn.setColumn(column);
        
        return cosColumn;
    }
}

実際に確認してみる

以下のような main クラスから、batch_insert を呼び出してみる。

    public static void main(String[] args) {
        String host = "localhost";
        int port = 9160;
        String keyspace = "session";
        
        Connector connector = new Connector();
        try {
            Cassandra.Client client = connector.connect(host, port, keyspace);
            SessionDAO sessionDao = new SessionDAO(client);
            
            Map<String, String> hm = new HashMap<String, String>();
            for ( int j = 0; j < 5; j++ ) {
                for( int i = 0; i < 5; i++ ) {
                    hm.put("key" + i, "value" + i);
                }
                sessionDao.batch_insert("testrow" + j, hm);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                connector.close();
            } catch (TTransportException e) {
                e.printStackTrace();
            }
        }
    }

実行結果の確認には Cassandra に標準でバインドされているツールである cassandra-cli を使用。これは、Oracle でいう sqlplus に相当するツールになる。

[kiririmode@kiririmode-MBA(job:0)]% ./cassandra-cli
Welcome to Cassandra CLI version 1.0.8

Type 'help;' or '?' for help.
Type 'quit;' or 'exit;' to quit.

[default@unknown] connect localhost/9160;
Connected to: "Test Cluster" on localhost/9160
[default@unknown] use session;
Authenticated to keyspace: session
[default@session] get session[utf8('testrow1')];
=> (column=key0, value=76616c756530, timestamp=1331965760309476000)
=> (column=key1, value=76616c756531, timestamp=1331965760309530000)
=> (column=key2, value=76616c756532, timestamp=1331965760309502000)
=> (column=key3, value=76616c756533, timestamp=1331965760309450000)
=> (column=key4, value=76616c756534, timestamp=1331965760309418000)
Returned 5 results.
Elapsed time: 79 msec(s).