Treasure Dataのデータを利用するプログラム(Java編)

こんばんは、SSTDの高橋です。 お盆もあっという間に過ぎ、8月も残すところ後わずかとなってしまいましたね。 今週の22日木曜日には、BIGDATAダッシュボード勉強会を開催致します。 おかげさまで、満員御礼となりました。ありがとうございます。 連休でリフレッシュした身体に目一杯ダッシュボードの知識を蓄えて頂ければと思います。

個人的な話ですが、お盆にArduinoとraspberry piを揃えてみました。何を作るか考え中です。 今のところはこんなことを考えています。

  • メッセージボトルに入れて、川から流してみる流水観測
  • 風船につけて飛ばして気象観測
  • 接触センサや距離センサを使った人計測
  • テレビを一緒に見るロボット(修論で似たようなことをしていたので)
それぞれどんなセンサをつけるかも検討中です。他にも良いアイデアがあればご連絡ください。

妄想は膨らむばかりですが、本題に移ります。 今回のブログテーマは、「Treasure Dataのデータを利用するプログラム(Java編)」です。

概要

githubにて公開されているtd-client-javaとそのサンプルを利用してTreasureData上のデータをJavaプログラムによって取得します。 サンプルとして、QuickStartに記載されている4種類のプログラムを利用します。

実行環境

OS
Windows 8 Professional 64bit
統合開発環境
Eclipse Standard 4.3
td-toolbelt version
0.10.84
td-client-java version
0.2.6
Jave version
Java SE 1.7.0

手順

1. td-client-javaライブラリの追加

こちらから td-client-0.2.6-jar-with-dependencies.jarをダウンロードし、プロジェクトのライブラリに追加します。

2. treasure-data.propertiesファイルの追加

TreasureDataのアカウントのAPIキーについてtreasure-data.propertiesファイルに記載します。 また、Proxyを通してアクセスする場合には、Proxy情報についても同じく記載します。 作成したpropertiesファイルをsrcフォルダ以下に配置します。

td.api.key=(your API key)
td.api.server.host=api.treasure-data.com
td.api.server.port=80
http.proxyHost=(your proxy server's host)
http.proxyPort=(your proxy server's port)

APIキーは以下のコマンドで確認することができます。

> td apikey

3. サンプルプログラムの実行

3.1. データベースとテーブルのリスト出力

List Databases and Tablesは、データベースの一覧を取得し、各データベースのテーブル名とそのレコード数を出力するプログラムになっています。 下記のプログラムでは、サンプルのクラス名に変更を加えています。また、メインクラスを別途作成して実行しています。

下記のプログラムを実行した結果は、コンソールにデータベース名とテーブル名及びテーブルのレコード数の一覧を出力されます。

testdb
www_access
100000
testdb
www_access2
5000

「ListDatabasesAndTables.java」
import java.io.IOException;
import java.util.List;
import java.util.Properties;

import com.treasure_data.client.ClientException;
import com.treasure_data.client.TreasureDataClient;
import com.treasure_data.model.DatabaseSummary;
import com.treasure_data.model.TableSummary;

public class ListDatabasesAndTables {
    static  {
        try {
            Properties props = System.getProperties();
            props.load(ListDatabasesAndTables.class.getClassLoader().getResourceAsStream("treasure-data.properties"));
        } catch (IOException e) {
            // do something
        }
    }
    public void doApp() throws ClientException {
        TreasureDataClient client = new TreasureDataClient();
        List<DatabaseSummary> databases = client.listDatabases();
        for (DatabaseSummary database : databases) {
            String databaseName = database.getName();
            List<TableSummary> tables = client.listTables(databaseName);
            for (TableSummary table : tables) {
                System.out.println(databaseName);
                System.out.println(table.getName());
                System.out.println(table.getCount());
            }
        }
    }
}

「main.java」
import com.treasure_data.client.ClientException;

public class Main {
    public static void main(String[] args) {
        // TODO Auto-generated method stub
        ListDatabasesAndTables listdt = new ListDatabasesAndTables();
        try {
            listdt.doApp();
        } catch (ClientException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
}

プログラムのフォルダ構成

3.2. ジョブの発行

Issue Queriesは、testdbデータベースのwww_accessに対してデータ件数をカウントするHiveQLを実行するプログラムになっています。 そのため、予めtestdbデータベースとwww_accessテーブルを用意しておくか、プログラムを任意のDB名に変更する必要があります。 サンプルからの修正箇所は、3.1.と同様のため、省略します。 実行結果は、以下の通りです。1行目にJobIDが出力され、Jobが成功した場合のクエリの実行結果が2行目に表示されます。 www_accessのレコード件数は100000件となっていることが確認できます。

4342736
[100000]

3.3. jobの一覧と状態の取得

List and Get the Status of Jobsでは、直近から127番目までのJobのJobIDとJobの成否を出力します。 実行結果を下記に示します。 1行目にJobID、2行目にJobの成否が出力され、繰り返し出力されていることが分かります。

4342736
SUCCESS
4342720
SUCCESS

3.4. BulkImport機能によるデータアップロード

Bulk-Upload Data on Bulk Import Sessionは、データサイズが大きいファイルをアップロードするためのBulkImportの機能を使うプログラムです。 このプログラムによって、msgpack形式のファイルをTreasureData上にBulkImportすることができます。

今回のサンプルでは、msgpackのサンプルデータが用意されていないため、td sample:apacheによってJSONのサンプルデータを作成し、bulk_import:prepare_partsコマンドによってJSONをmsgpack形式に変換を行うことで、データを用意します。 コマンドラインにて、①のコマンドを実行します。


> td sample:apache test.json
> td bulk_import:prepare_parts test.json -f json -t time -o ./

Processing test.json...
Preparing part "test_0"...
sample: 2013-08-20 02:24:19 UTC {"host":"128.216.140.97","user":"-","method":" GET","path":"/item/sports/2511","code":200,"referer":"http://www.google.com/sear ch?ie=UTF-8&q=google&sclient=psy-ab&q=Sports+Electronics&oq=Sports+Electronics&a q=f&aqi=g-vL1&aql=&pbx=1&bav=on.2,or.r_gc.r_pw.r_qf.,cf.osb&biw=3994&bih=421","s ize":95,"agent":"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/535.7 (KHTML, l ike Gecko) Chrome/16.0.912.77 Safari/535.7","time":1376965459}
test.json: 5000 entries.

5000件のアクセスログがmsgpack形式に変換したことを確認します。 次にBulkImportを行うためにサンプルプログラムを確認します。 サンプル内の②の文字列を任意の値にします。nameはbulk_importのユニーク名となり、partIDはbulk_importにてアップロードするファイルを表すユニーク名となります。

上記の修正以外に、importが足りていなかったり、fileが無い場合のエラー処理が無いために警告が発生しますが、eclipseのデバッグ機能で容易に修正できるかと思います。


String name = "session_name";
String database = "database_name";
String table = "table_name";
String partID = "session_part01";

さて、サンプルの修正が終わっても、まだプログラムを実行することはできません。 まず、コマンドラインにて③のコマンドを実行し、BulkImportのセッションを作成しておく必要があります。


> td bulk_import:create session_name database_name table_name

上記コマンドにてセッションが作成し、プログラムを実行します。 コンソールには何も出力されませんが、コマンドラインにて、④コマンドを実行することで、アップロード状況を確認することができます。 Uploaded PartsにpartIDが出力されていたら、ファイルのアップロードは完了です。


> td bulk_import:show session_name
Organization :
Name : session_name
Database : database_name
Table : table_name

Uploaded Parts :
session_part01

しかし、このままではテーブルへのデータのインポートはまだ終わっていません。 ⑤のコマンドにて、perform処理とcommit処理を行うことによって、テーブルへのインポートが完了します。 注意点としては、commit処理を行うと、同じsessionにデータの追加アップロードを行うことができなくなります。 ファイルが複数ある場合には、全ファイルのアップロードが完了してから、commit処理を行いましょう。


>td bulk_import:perform -w session_name
>td bulk_import:commit session_name

以上で、Java編を終わります。 Javaを使うことで、痒いところに手が届くようなツールを作ることもできますね。 それでは、勉強会にて、皆様にお会いできることを楽しみにしています。

執筆:髙橋@SSTD