Raspberry PI2 にハニーポットを導入してみた
Raspberry PI2 にハニーポットをインストールして運用してみます。 今回、インストールしたハニーポットは、いろいろ評判が良さそうだった「dionaea」を利用します。
インストール
sudo echo "deb http://packages.s7t.de/raspbian wheezy main" >> /etc/apt/sources.list
sudo apt-get update
sudo su
apt-get install libglib2.0-dev
apt-get install libssl-dev
apt-get install libemu
apt-get install libev
apt-get install libpcap
apt-get install libcurl4-openssl-dev
apt-get install libreadline-dev -y
apt-get install libsqlite3-dev -y
apt-get install libtool -y
apt-get install automake -y
apt-get install autoconf -y
apt-get install build-essential -y
apt-get install subversion -y
apt-get install git-core -y
apt-get install flex -y
apt-get install bison -y
apt-get install pkg-config -y
apt-get install libnl-3-dev -y
apt-get install libnl-genl-3-dev -y
apt-get install libnl-nf-3-dev -y
apt-get install libnl-route-3-dev -y
apt-get install liblcfg -y
apt-get install dionaea-python -y
apt-get install dionaea-cython
apt-get install udns
apt-get install dionaea
設定
待ち受け用IP設定
sudo cp /opt/dionaea/etc/dionaea/dionaea.conf.dist /opt/dionaea/etc/dionaea/dionaea.conf
emacs /opt/dionaea/dionaea.conf
dionaea.confの以下2行を変更し、通信を待ち受ける。
mode = “manual” addrs = { eth0 = [“待ち受けるIP“] }
あわせてデフォルト設定ではログが大量に出力されるので5行目あたりを以下のように修正。 levels = "info,warning,error"
実行ユーザ・グループの設定
sudo groupadd dionaea
sudo useradd -g dionaea -s /usr/sbin/nologin dionaea
sudo chown -R dionaea:dionaea /opt/dionaea/var/dionaea/
実行
以下のコマンドで設定ファイルを指定しデーモンモードで実行する。
sudo /opt/dionaea/bin/dionaea -u dionaea -g dionaea -c /opt/dionaea/etc/dionaea/dionaea.conf -D
実行後、nmapで空いているポートを調べると複数のサービスが立ち上がっている。
Host is up (0.0052s latency).
Not shown: 989 closed ports
PORT STATE SERVICE
21/tcp open ftp
22/tcp open ssh
42/tcp open nameserver
80/tcp open http
135/tcp open msrpc
443/tcp open https
445/tcp open microsoft-ds
1433/tcp open ms-sql-s
3306/tcp open mysql
5060/tcp open sip
5061/tcp open sip-tls
RaspberryPi2にELKスタック導入
RaspberryPI2に、Elasticsearch,Logstash,Kibanaを導入してみた。
必要なバイナリのダウンロード
sudo mkdir /usr/share/elasticsearch
cd /usr/share/elasticsearch
sudo wget https://download.elastic.co/kibana/kibana/kibana-4.0.1-linux-x64.tar.gz
sudo wget https://download.elastic.co/logstash/logstash/logstash-1.4.2.tar.gz
sudo wget https://download.elastic.co/elasticsearch/elasticsearch/elasticsearch-1.5.1.tar.gz
javaインストール
jdk8をupdate-alternativesで選択する
$ sudo update-alternatives --config java There are 2 choices for the alternative java (providing /usr/bin/java).
Selection Path Priority Status
*0 /usr/lib/jvm/java-6-openjdk-armhf/jre/bin/java 1057 auto mode 1 /usr/lib/jvm/java-6-openjdk-armhf/jre/bin/java 1057 manual mode 2 /usr/lib/jvm/jdk-8-oracle-arm-vfp-hflt/jre/bin/java 318 manual mode
Press enter to keep the current choice[*], or type selection number: 2 update-alternatives: using /usr/lib/jvm/jdk-8-oracle-arm-vfp-hflt/jre/bin/java to provide /usr/bin/java (java) in manual mode pi@raspberrypi ~ $ java -version java version "1.8.0" Java(TM) SE Runtime Environment (build 1.8.0-b132) Java HotSpot(TM) Client VM (build 25.0-b70, mixed mode)
jdk8がない場合は、別途インストール。(*他のでも動くかもしれないけど一応)
elasticsearchインストール
sudo tar zxvf elasticsearch-1.5.1.tar.gz
sudo mkdir /etc/elasticsearch
sudo cp -p /
sudo cp /usr/share/elasticsearch/elasticsearch-1.5.1/config/elasticsearch.yml /etc/elasticsearch/elasticsearch.yml
sudo mv elasticsearch-1.5.1 /opt/elasticsearch-1.5.1
sudo ln -s /opt/elastcisearch-1.5.1 /opt/elasticsearch
logstashインストール
sudo mv logstash-1.4.2 /opt/logstash-1.4.2
sudo ln -s /opt/logstash-1.4.2 /opt/logstash
sudo mkdir -p /etc/logstash/conf.d
sudo mkdir -p /var/log/logstash/
sudo git clone https://github.com/jnr/jffi.git
cd jffi/
sudo apt-get install -y ant
sudo ant jar
sudo mkdir -p /opt/logstash/vendor/jar/jni/arm-Linux/
sudo cp build/jni/libjffi-1.2.so /opt/logstash-1.4.2/vendor/jar/jni/arm-Linux/
sudo apt-get install zip
cd /opt/logstash/vendor/jar
sudo zip -g jruby-complete-1.7.11.jar jni/arm-Linux/libjffi-1.2.so
kibanaインストール
sudo tar zxvf kibana-4.0.1-linux-x64.tar.gz
sudo mv kibana-4.0.1-linux-x64 /opt/kibana-4.0.1-linux-x64
sudo ln -s /opt/kibana-4.0.1-linux-x64 /opt/kibana
sudo wget http://node-arm.herokuapp.com/node_latest_armhf.deb
sudo dpkg -i node_latest_armhf.deb
sudo mv /opt/kibana/node/bin/node /opt/kibana/node/bin/node.orig
sudo mv /opt/kibana/node/bin/npm /opt/kibana/node/bin/npm.orig
sudo ln -s /usr/local/bin/node /opt/kibana/node/bin/node
sudo ln -s /usr/local/bin/npm /opt/kibana/node/bin/npm
起動
elasticsearchを起動
sudo /opt/elasticsearch/bin/elasticsearch & sudo /opt/kibana/bin/kibana &
確認 ブラウザで「http://XXXXXXX:5301」にアクセス。 kibana の画面が表示すればひとまず成功。
参考
「JJUG CCC 2015 Spring」に行ってきた
Javaのユーザグループ主催のカンファレンス「JJUG CCC 2015 Spring」に参加してきました。 普段見れない他社さんの仕組みの話とか流行りの技術の話とか聞けて、個人的には大満足。
今年でJava15年目。何度も他の言語に乗り換えようとするんだけど、なんだかんだ硬くて素直なところはJavaの良さではあります。あと、積み上げてきたスタックの大きさというか厚みというか、やっぱり信頼おける。と、個人的には思ってます。
13:00- G-1 Java が支える人気ニュースアプリ NewsPicks の裏側 by 文字 拓郎さん(株式会社ユーザベース)
プレゼンメモ
NewsPicks
- Platform/Publisher,Picker
- gunocyとかSmartNewsとは違う層
- 意思決定者層が多い、専門家が多い
- 35万人くらい。有料課金ユーザも増えてる。
- エンジニアの数は10人くらい
- プッシュはあえて手動でやっていたりする
- チーム(プラットフォーム、ブランドデザイン、編集部)
NewsPicks裏側
APP Server内側
SQS
- 可用性が担保された分散キュー。
- トランザクショナルではない。
ElasticSearch
- 全文検索。重複判定。
Backend
- CRONでスケジュール
- 取り込んだ記事がタイムラインに反映されるまで
- DynamoDBに一旦取り込む
- カテゴライズサービスでカテゴライズする
- ユーザのタイムラインに向けて波及させる
- 間にはキューを挟む
他の言語
Redis編
- Redisのマスタースレイブ。たまにこけてタイムラインがなくなる。
- ユーザ数10万人くらいで垂直分割
- selectする層を用意して使う
- 人気ユーザが寄っているので、実はユーザで分散しちゃダメ。
- 結局、ハッシュ分散
- Read/Writeをわけて読み出しはReadレプリカ
SEO編
- Angular で開発されたSPA。
- Phantom でスナップショット
急成長の代償
- 後方互換性を維持したままひたすら拡張
- レイヤリングがほどんどされていない
- スタートアップ
グロース編
- REDSHIFT トラッキングログを整備、KPIダッシュボード
- SPRING BOOT , DOMA2 ,ANGULAR JS ,D3 一週間くらい
- SQL20 がシンプル
Angularやめる
- SEO
- polyfill service ユーザエージェントでいろいろ
- THYMELEAF
- handlebars とか使うと便利かも
所感
- 知らないライブラリが出てきたのと実際に動いているサービスの裏側のアーキの話をしていただけるのは大変貴重で、かなり勉強になった。
- みんなreact使ってみたいんだな、と。
プレゼンスライド
https://speakerdeck.com/monzou/java-gazhi-eru-ren-qi-niyusuapuri-newspicks-falseli-ce
14:00- Web開発における最新テスト手法 by tokuhiromさん (subtech)
プレゼンメモ
- 導入
- サーバサイドの開発はテストコードを書く
- どこまで自動化するが
- 手動でやったほうが楽なものは手動で
- 繰り返すものは自動で
- テストコード「契約」となるので堅牢なものになる
- ビジネスロジックはモデルに入るのでモデルをテストする
- RDBまわり
- 外部APIのテスト
- embedded jetty を使う
- apimock
- コントローラのテスト
- ダミーデータ
- テストライブラリ
所感
- なるべく動作環境に近い環境でテストしましょう、っていうのは同感。なのでjetty立ち上げTDDは試してみてもいいかも。
- ただ、結局、何を確認したいかによるよねと。間違いようのないhttpアクセスのコードだったらモックでもいいじゃん、という気もする。というかそうしてる。
プレゼンスライド
- 見つからなかったです。
15:00- 大規模な負荷でもドキドキしない為のJava EE by nagaseyasuhitoさん (java-ja / グリー株式会社)
プレゼンメモ
- 負荷テスト
- アンチパターン
- 負荷テストの目的
- システムの限界性能を知る
- 高負荷時の不具合を発見する
- Stress Test meets Continuous Integration
- JMeter
- jmeter-maven-pluginで自動化
- 負荷テストのパラメタはMavenのプロパティ化してチューニングするとよい。
- jenkins performance plugin jmeterの結果を出力するプラグイン
- 負荷をかけるとどうなるか
- 他ユーザのレスポンすが返ってくる
- レスポンセウが返ってこない
- リソースは余ってるのにレスポンスが遅い
- リクエストが遅い
- レスポンスが遅い
- ボトルネックを探す
- Mission Control ,Flichg REcorderで取得したJVMの統計情報を可視化するツール
- 以下のオプションをつけてアプリサーバを起動する -XX;LUnlockCommercialFeatures -XX:+FlightRecorder
- JPAのスケールアウト戦略
所感
- 意外とjmeterで地味に負荷かけてるんだな、っていうところが驚き。
- CIにどこまでのシナリオを組み込むかは課題なのかな。uiが変わったら作り直しだし、毎回チューニングするわけにもいかないだろうし。
- エンタープライズの人だからか、やっぱり、負荷テストやってから「限界を知る」っていうのには違和感がある。いやまあ、実際そういう面もあるんだろうけど、出来高でやってていいのかなあ、っと。
プレゼンスライド
16:00 『Embulk』に見るモダンJavaの実践的テクニック ~並列分散処理システムの実装手法~
by なひ さん(トレジャーデータ株式会社)
プレゼンメモ
- Embulk
- Java実装技術
所感
- やっぱりEmbulk便利そう
- 噂には聞いているけどUnsafe API。名前からして触ってはいけないもの感が漂ってるんだよなあ。いつか触ってみよう。
プレゼンスライド
17:00- Grails 第3章 進化したSpring-bootベースフレームワーク
by 山本 剛さん(日本Grails/Groovyユーザーグループ, newcast inc.)
プレゼンメモ
- Grails
- Road to 3
- Grails1系:古代
- grails2.3系まで:近代
- Spring Bootをベースにする
- grails2.4 はspring4
- Grails 3
- Groovy 2.4
- Spring 4.1
- Spring Boot 1.2
- Gradle
- JavaやっててGradleやってないのはおかしい
- なぜspring boot?
- マイクロサービス
- fat jarと組み込みコンテナ
- モニタリングとヘルスチェック
- オートコンフィギュレーション
- Springの柔軟性
- *spring bootが簡単そう
- コアフィーチャ
- Groovy2.4 で速くなった
- 内部フローの変更
- 開発環境フィーチャ
- Intellij IDEAのgradleプロジェクトで開発
- テストフィーチャ
- まとめ
- spring boot
- 3.0はまだ不安定かも。3.1に期待。
所感
- うーむ、grails3もちょっと厳しそうだな。
プレゼンスライド
18:00 いろんなデータをKibana4で見てみよう by 大谷 純 さん (elastic.co)
プレゼンメモ
- elastic
- elasticsearch から elasticへ変更。
- ELK Stack
- DataをLogstash に食わせて、elasticsearchにためて、データをkibanaで見る。
- Logstash
- 概要
- ログデータの収集・管理
- 収集、パース・加工、送出
- jruby
- アーキ
- input Filter output
- 設定ファイルでinput,filter,outputを指定する
- 概要
- Elasticsearch
- Kibana4
- ユースケース:アクセスログ
- ユースケース:ツイート
- logstash
- ツイッターのinputもできる
- logstash
- ユースケース:git log
- gitのヒストリを取り出してelasticsearchに投げる
- ユースケース:javaのログ
- multilineを使うとstacktraceとかの複数行も取れる
- elasticsearch勉強会やってる
所感
- kibana4になってますますsplunkっぽくなったなあ
- raspberry pi2 に入れてみよっと
プレゼンスライド
- 見つからなかったです。
Raspberry PI2にSnortインストール
snort と snortのアラートを見るためのGUI のsnorby をRaspberry PI2にインストールします。 snortは素直にインストールできましたが、snorbyはうまくいきませんでした。 やったところまで書いときます。
snortのインストール
ほんとコレだけ。
$sudo apt-get install snort
snorbyのインストール
snorbyが使うmysqlをインストール。
$ sudo apt-get install mysql-server mysql-client
$apt-get install snort-mysql
$ sudo apt-get install libyaml-dev git-core default-jre imagemagick libmagickwand-dev wkhtmltopdf build-essential libssl-dev libreadline-gplv2-dev zlib1g-dev libsqlite3-dev libxslt1-dev libxml2-dev libmysqlclient-dev libmysql++-dev apache2-prefork-dev libcurl4-openssl-dev ruby ruby-dev
$ sudo apt-get install linux-headers-rpi
rails とsnorby とかいろいろインストール。
$ sudo gem install bundler rails
$ sudo gem install rake --version=0.9.2
$ cd /var/www/
$ sudo git clone http://github.com/Snorby/snorby.git
$ cd /var/www/snorby/config
$ sudo cp database.yml.example database.yml
$ sudo cp snorby_config.yml.example snorby_config.yml
$ sudo sed -i s/"\/usr\/local\/bin\/wkhtmltopdf"/"\/usr\/bin\/wkhtmltopdf"/g snorby_config.yml
database.yml のデータベースの設定を修正し、以下のコマンドでインストール
$ cd /var/www/snorby/
$ sudo bundle install --deployment
$ sudo bundle exec rake snort:setup
以下のエラーメッセージが表示されてインストール出来ず。 海外でも同様のエラーが報告されているが、解決策は見つからず・・・
rake aborted!
cannot load such file -- dm-rails/railtie
とりあえず、snort単体で動かすか。。。
Apache storm チュートリアルの和訳
英語の勉強も兼ねてApache Stormのチュートリアルを和訳してみました。 ほぼ直訳・一部意訳です。間違いは大目に見ていただけると助かります。
https://storm.apache.org/documentation/Tutorial.html
チュートリアル
このチュートリアルでは、Stormトポロジーの作り方とStormクラスタのデプロイ方法について学びます。Javaをメインの言語で使いますが、Stormの他言語対応を説明するためにPythonを使います。
序文
このチュートリアルは、storm-starterプロジェクトを使います。本プロジェクトをcloneし、例に従うことをおすすめします。「Setting up a development environment」と「Creating a new Storm projet」を読んで、セットアップしてください。
Stormクラスタのコンポーネント
Stormクラスタは一見Hadoopクラスタに似ています。Hadoopでは、「MapReduce jobs」を使いますが、Stormでは「トポロジー」を使います。「Jobs」と「トポロジー」は異なります。一つの大きな違いは、MapReduce Jobは処理を終了するが、トポロジーはメッセージをずっと処理し続けます。
Stormクラスタにはマスターノードとワーカーノードの2つのノードがあります。マスターノードは、「Nimbus」と呼ばれるHadoopの「JobTracker」に似たデーモンを実行します。Nimbusはクラスタへのコードの配布、マシンへのタスクのアサイン、障害の監視の責務をおいます。
ワーカーノードは「Supervisor」と呼ばれるデーモンを実行します。Supervisorは、マシンへの処理のアサインを待ち、Nimbusの指示によって必要に応じてワーカープロセスの開始・終了します。ワーカープロセスはトポロジーのサブセットを実行します。トポロジーの実行は、多くのマシンに分散した多くのワーカープロセスにより構成されます。
※storm.apache.orgより引用
NimbusとSupervisor間の全ての調整はZookeperクラスタを通して行います。加えて、NimbusデーモンとSupervisorデーモンはfail-fastでステートレスです。全ての状態はZookeperかローカルディスクによって保持されます。これは、kill -9 NimbusまたはSupervisorしても何もなかったかのように処理を再開出来ることを意味します。この設計によってStormクラスタは信じられないほど安定しています。
トポロジー
Stormでリアルタイム処理をするためには、トポロジーと呼ばれるものを作ります。トポロジーは処理のグラフです。トポロジーのノードは処理ロジックを含み、ノード間のリンクはデータがどのようにノード間でやり取りされるかを示しています。トポロジーの実行は簡単です。まずはじめに、コードをパッケージングし、依存するライブラリを一つのjarに格納します。それから、以下のようなコマンドを実行します。
storm jar all-my-code.jar backtype.storm.MyTopology arg1 arg2
上記は、backtype.storm.MyTopologyクラスを引数arg1,arg2で実行しています。クラスの主な機能は、トポロジの定義とNimbusへの登録です。Storm jar の部分は、Nimbusへの接続とjarのアップロードを行います。
トポロジーの定義は、Thrift構造体であり、NimbusはThriftサービスなので、いろんな言語でトポロジの作成と登録が行えます。上記の例は、JVMベース言語を用いた最も簡単な例です。さらにトポロジーの開始と終了に関する多くの情報は、「Running topologies on a production cluster」を参照してください。
Streams
Stormのコアは「stream」です。streamはタプルの無限の列です。Stormは、分散と信頼性の高い方法で新しいstreamにstreamを変換するためのプリミティブを提供します。例えば、つぶやきのストリームをトピックのトレンドのストリームに変換出来ます。 Streamの変換を行うためにStormが提供する基本要素は「spouts」と「bolts」です。SpoutsとBoltsは、アプリケーション固有のロジックを実行するインターフェースを備えています。 Spoutは、Streamの発行元となります。例えば、spoutは、Kestrelキューからタプルを読み込み、新たなストリームとして発行します。または、spoutは、TwitterAPIに接続し、つぶやきのストリームを新たに発行します。
boltはstreamを入力とし、何らかの処理を行い、新しいstreamを発行します。つぶやきから流行トピックのストリームを算出するような複雑なstream変換は、複数のステップ(複数のbolts)を要します。Boltsは機能の実行、タプルのフィルタリング、streamの計算、streamの結合、データベースとのやりとりなどを行うことが出来ます。 spoutsとboltsのネットワークは、Stormクラスタに登録するトポロジーのパッケージに含まれます。トポロジーは、spoutまたはboltsのノードによるstream変換のグラフです。グラフのエッジは、streamにsubscribeされたboltsを示します。spoutまたはboltsがstreamにタプルを発行したとき、streamにsubscribeされた全てのboltsにタプルが送信されます。
※storm.apache.orgから引用
トポロジーのノード間のリンクは、タプルをどのようにやり取りするかを示します。例えば、SpoutAとBoltBの間のリンクがあり、SpoutAからBoltCへのリンク、BoltB からBoltCへのリンクがあった場合、常にSpoutAはタプルを発行し、BoltBとBoltCにタプルを送ります。BoltBのタプルは、BoltCへ送出されます。 Stormトポロジーのノードは、並列に実行されます。それぞれのノードに対してどの程度の並行性を識別することができ、Stormは実行のためにクラスタの間でいくつかのスレッドを生成します。 トポロジーはkillするまで永久に動作します。Stormは失敗タスクを自動的に再アサインします。加えて、Stormはマシンがダウンしメッセージを受け取れなくても、データロスがないことを保証します。
データモデル
Stormはタプルをデータモデルとして利用します。タプルは値の名前付きリストであり、タプルのフィールドはあらゆる種類のオブジェクトです。Stormは全てのプリミティブタイプと文字列とバイト配列をタプルのフィールドとしてサポートもしています。他のオブジェクトを利用するためには、オブジェクトのserializerを実装する必要があります。
トポロジーの全てのノードは、タプルの出力フィールドを宣言する必要があります。たとえが、このboltは出力する2つのタプルを「double」と「triple」のフィールドとともに宣言しています。
public class DoubleAndTripleBolt extends BaseRichBolt {
private OutputCollectorBase _collector;
@Override
public void prepare(Map conf, TopologyContext context, OutputCollectorBase collector) {
_collector = collector;
}
@Override
public void execute(Tuple input) {
int val = input.getInteger(0);
_collector.emit(input, new Values(val*2, val*3));
_collector.ack(input);
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("double", "triple"));
}
}
declareOutputFields は出力するフィールド{double,triple}を宣言する関数です。boltの残りの部分は続くセクションで説明します。
シンプルなトポロジー
さらなるコンセプトの探求とどのようにコードを実装するかを確認するために、シンプルなトポロジーを確認してみましょう。storm-starterのExclamationTopologyを見てください。
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("words", new TestWordSpout(), 10);
builder.setBolt("exclaim1", new ExclamationBolt(), 3)
.shuffleGrouping("words");
builder.setBolt("exclaim2", new ExclamationBolt(), 2)
.shuffleGrouping("exclaim1");
このトポロジーはspoutと2つのboltを含みます。spoutはwordsを生成し、boltは「!!!」の文字列を入力に対して付与します。spoutは最初のboltにemitし、その次のboltに対してemitします。spoutがタプル「bob」と「john」をemitした場合二番目のboltは「bob!!!!!!」と「john!!!!!!」を生成します。 コードはsetSpoutとsetBoltメソッドを使うノードを定義しています。これらのメソッドは、userを識別するid、処理するためのロジックを含んだオブジェクトそして、ノードに求める並列化の量をインプットとします。この例では、spoutは「words」をidとして与えboltsは「exclaim1」「exclaim2」をidとして与えられています。 処理ロジックを含んだオブジェクトは、spoutにIRichSpoutインターフェースを実装し、boltsにIRichBoltインターフェースを実装しています。 最後のパラメタは、ノードに求める並列化の量を指します。どれほどのクラスタを超えたコンポーネントを実行するスレッドの数を指し示します。omitした場合、Stormは1つのスレッドのみを割り当てます。 setBolt はBoltの入力を定義するために利用されるInputDeclarerオブジェクトを返します。ここでは、コンポーネント「exclaim1」をシャッフルグルーピングを用いて「words」コンポーネントによって発行された全てのタプルを読むことを宣言します。「exclaim2」コンポーネントは、「exclaim1」コンポーネントによって発行されたすべてのタプルを読むコを宣言します。「シャッフルグルーピング」とはタプルが入力タスクからランダムにboltのタスクに配送されることを意味します。コンポーネント間のデータのグルーピングには多くの方法があります。いくつかのセクションで説明します。
builder.setBolt("exclaim2", new ExclamationBolt(), 5)
.shuffleGrouping("words")
.shuffleGrouping("exclaim1");
上記の通り、Boltの入力宣言は複数のソースをつなげて記述出来ます。
このトポロジーのspoutとboltの実装について深く確認してみましょう。Spoutsはトポロジーに新しいメッセージを発行することについて責務をおいます。このトポロジーのTestWordSpoutはリスト「nathan mike jackson golda bertels」のリストから100msごとにランダムな単語を発行します。TestWordSpoutのnextTuplue()はこのようになります。
public void nextTuple() {
Utils.sleep(100);
final String[] words = new String[] {"nathan", "mike", "jackson", "golda", "bertels"};
final Random rand = new Random();
final String word = words[rand.nextInt(words.length)];
_collector.emit(new Values(word));
}
上述の通り、実装は単純です。
ExclamationBolt は入力に"!!!"を追加します。ExclamtionBoltのすべての実装を見てみましょう。
public static class ExclamationBolt implements IRichBolt {
OutputCollector _collector;
public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
_collector = collector;
}
public void execute(Tuple tuple) {
_collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
_collector.ack(tuple);
}
public void cleanup() {
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
public Map getComponentConfiguration() {
return null;
}
}
prepareメソッドは、このboltからのタプルの発行に使われるOutputCollectorを提供します。タプルはboltからいつでも発行出来ます。prepare,executeまたはcleanupメソッド、他スレッドで非同期に発行することも出来ます。このprepareの実装は、シンプルにOutputCollecotrをexecuteメソッドで後ほど利用するインスタンス変数として保持しています。
executeメソッドはboltの入力の一つからタプルを受け取ります。ExclamationBoltはタプルのはじめのフィールドを引き回し、そこに!!!を追加して新しいタプルを発行します。もし複数のソースを入力とするboltを実装した場合、Tuple#getSourceComponentメソッドを使ってどのコンポーネントからタプルがきたかを調べることが出来る。 executeメソッドには他の事柄もあります。すなわち、入力タプルはemitの第一引数渡され、最後の行で応答されます。データをロスすることないStormの信頼APIがあるが後ほど説明します。
cleanupメソッドは、Boltが停止するときのリソースをクリーンアップするときに呼び出される。このメソッドはクラスタ上で呼び出される保証がない。 例えば、マシンのタスクが実行中のとき、メソッドを実行する手段がない。cleanupメソッドは,ローカルモードでトポロジーを実行している場合に利用されることを意図しており、リソースがリークすることなく多くのトポロジーの実行とkillを行うことが出来る。 declareOutputFieldsメソッドは、ExclamationBoltが「word」と呼ばれるフィールドを持つ1タプルを発行することを宣言している。
getComponentConfigurationメソッドは、どのようにコンポーネントを実行するかの各種設定を行うことが出来る。これはConfigurationを説明する高度なトピックです。 cleanupやgetComponentConfigurationのようなメソッドは、boltの実装に必ずしも必要なものではありません。boltは、デフォルトの実装を提供するベースクラスを利用することでより簡潔に定義できます。ExclamationBoltは、BaseRichBoltを継承することでより簡潔に書くことができます。
public static class ExclamationBolt extends BaseRichBolt {
OutputCollector _collector;
public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
_collector = collector;
}
public void execute(Tuple tuple) {
_collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
_collector.ack(tuple);
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
}
ローカルモードでのExclamationTopologyの実行
どのようにローカルモードでExclamationTopologを実行・動作するかをみていきましょう。
Stormは2つの実行モードがあります。ローカルモードとdistributedモード。ローカルモードでは、Stormはワーカーノードをスレッドを用いてシミュレーションして実行します。ローカルモードはトポロジーのテストや開発に利用するのに適しています。storm-starterでトポロジーを実行する場合、ローカルモードで起動し、コンポーネントが発行されるメッセージを確認することができます。より詳細なトポロジーのローカルモードでの実行については、Local mode(https://storm.apache.org/documentation/Local-mode.html)に示します。
distributedモードでは、Stormはマシンのクラスタ上で動作します。マスターにトポロジーをsubmitすると、トポロジーの実行に必要な全てのコードがsubmitされます。マスターはコードの配布とトポロジーを実行するためのワーカーの準備を行います。ワーカーが停止する場合、マスターは他のものにアサインし直します。より詳細なクラスター上での実行については、Runnning topologies on a production cluster(https://storm.apache.org/documentation/Running-topologies-on-a-production-cluster.html)に示します。
このコードは、ExclamationTopologyをローカルモードで実行します。
Config conf = new Config();
conf.setDebug(true);
conf.setNumWorkers(2);
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("test", conf, builder.createTopology());
Utils.sleep(10000);
cluster.killTopology("test");
cluster.shutdown();
はじめに、コードはLocalClusterオブジェクトによって作成されるクラスタを定義しています。仮想クラスタへのトポロジーのsubmitは、distributedクラスタへのsubmitと同様です。submitTopologyメソッドによって、LocalClusterにトポロジーがsubmitしています。引数は、トポロジーの名前、トポロジーの設定、トポロジー本体です。 引数nameはトポロジーを停止するときなどに、トポロジーを識別することに使われます。トポロジーはkillされるまで無期限で動作します。 引数configurationは、トポロジーを動作させるための各種設定に利用されます。2つの設定は共通的に扱われます。
- TOPOLOGY_WORKERS(set with setNumWorkers) トポロジーを実行するために、必要なプロセス数を定義します。トポロジーのコンポーネントは多くのスレッドで実行されます。コンポーネントに割り当てられるスレッドの数は、setBoltとsetSpoutメソッドによって設定されます。これらのスレッドは、ワーカープロセス内に存在します。ワーカープロセスは、いくつかのコンポーネントのためのいくつかのスレッドを含んでいます。例えば、300スレッドを全てのコンポーネントと50ワーカープロセスを設定で指定します。それぞれのワーカープロセスは、6スレッドで実行されます。Stormトポロジーの性能を、それぞれのコンポーネントとスレッドが実行されるワーカープロセス数の並列処理の微調整で、チューニングすることができます。
- TOPOLOGY_DEBUG(set with setDebug) trueに設定した場合、Stormはコンポーネントにメッセージを発行するごとにログ出力します。ローカルモードでトポロジーのテストを行うときに有益です。ただし、トポロジーをクラスター上で実行する場合は、停止したほうがよいでしょう。
他にも多くのトポロジーの設定があります。各種設定の詳細については、Javadoc for Config(https://storm.apache.org/javadoc/apidocs/backtype/storm/Config.html)に示します。 ローカルモードでトポロジーを実行するためにどのように開発環境(Eclipseなど)をセットアップするかについては、Creating a new Storm Project(https://storm.apache.org/documentation/Creating-a-new-Storm-project.html)に示します。
Stream groupings
streamのグルーピングは、2つのコンポーネント間でタプルをどのように送信するかをトポロジーに示します。spoutsとboltsは、多くのタスクをクラスタ間で並列に実行します。トポロジーのタスクレベルの実行は、以下のように行われています。
※storm.apache.orgより引用
BoltAのタスクがBoltBにタプルを発行するとき、どのタスクにタプルを送信すればよいでしょうか? stream groupingは、タスクの組み合わせの間をどのようにタプルを送信するかをStormに指示するものです。異なる種類のstream groupingsについて深堀するまえに、storm-starterのもう一つのトポロジーを確認してみましょう。
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("sentences", new RandomSentenceSpout(), 5);
builder.setBolt("split", new SplitSentence(), 8)
.shuffleGrouping("sentences");
builder.setBolt("count", new WordCount(), 12)
.fieldsGrouping("split", new Fields("word"));
SplitSentenceは、受け取った文章中のそれぞれの単語をタプルとして発行し、WordCountは単語とカウントをmapに保持します。WordCountがwordを受け取るごとに、状態と新しい単語数を発行します。
stream groupingsにはいくつかの種類があります。
単純なグルーピングは、「shuffule grouping」と呼ばれ、タプルをランダムにタスクに送出します。shuffule groupingはWordCountTopologyのRandomSentenceSpoutからSplitSentence boltへのタプルの送出に利用されています。SplitSentence boltのタスクへ均等にタプルを配布する効果があります。
さらに興味深いgroupingの種類は「fields grouping」です。field groupingは、SplitSentence boltとWordCount boltの間で利用されています。同じwordが同じタスクに送信される必要があるWordCount boltの機能にとって重大です。一つ以上のタスクが同じ単語を見てしまうと、それぞれ不完全な値を持つため、誤ったカウントを送信してしまいます。fields groupingは、フィールドによってストリームをグルーピングします。これによって、同じタスクに同じフィールド値を持つものが送信されます。WordCountがSplitSentenceのwordフィールドによるfield groupingの出力ストリームを受け取ることにより、同じ単語は常に同じタスクに送信され、boltは正確な値を作成することができます。
Fields groupingはストリームの結合やストリームの集計やその他のユースケースの基本となります。fields groupingsはmodハッシングを利用しています。 他にもいくつかのstream groupingsがあります。詳細については、Consepts(https://storm.apache.org/documentation/Concepts.html)に示します。
Boltsを他言語で定義する
省略
メッセージ処理の保証
このチュートリアルでは、タプルの発行方法についてのいくつかについてはスキップしました。これらのaspectsはStormの信頼性APIの一部です。Stormがどのようにspoutから送信される全てのメッセージが処理されるかを保証方法です。どのように働き、Stormの信頼性機能を利用するかの情報については、Guaranteeing message processing(https://storm.apache.org/documentation/Guaranteeing-message-processing.html)を確認してください。
トランザクショナルトポロジー
Stormは、全てのメッセージが最低1回トポロジーで処理されることを保証します。共通の質問としてたずねられるのが「どのようにStormのtopでカウントしているのか?overcountすることはないか?」です。Stormは、計算のために正確に1回メッセージ処理を実現するトランザクショナルトポロジーと呼ばれる機能を持っています。詳しい内容についてはこちら(https://storm.apache.org/documentation/Transactional-topologies.html)に示します。
配送されたRPC
このチュートリアルでは、Stormによるstream処理の基本を説明しました。他にも多くのことがStreamの基本によって出来ます。Stormのアプリケーションの中で興味深い一つはDistributed RPCです、on the fly で並列計算を行うときに。Distributed RPCについてはこちら(https://storm.apache.org/documentation/Distributed-RPC.html)に示します。
Conclusion
このチュートリアルでは、Stormトポロジーの開発、テスト、配備の概要について説明しました。他のドキュメントはStormを利用する際の側面についてより詳細に示しています。
Raspberry PI2セットアップ
注文していたRaspberry PI2が届いたのでさっそくセットアップしてみた。 正常に動作するまでは、SDカードの書きミス、電源不足という壁があったが、きちんとしたものをそろえれば問題なし。
用意するもの
- Raspberry PI2
- HDMIケーブル
- microUSB - USBのケーブル(そこそこいいやつ) ELECOM TB-AMB2A12BK
- USBのACアダプタ(2A以上推奨とのこと) iBuffalo BSMPA09BA
- microSDカード 東芝の64GB
※ USBのACアダプタおよびUSBのケーブルは、そこそこのものを使わないと、電力が安定せず、再起動を繰り返すなぞの機械が出来上がります。 はじめこちらを注文して試してたけど、評価コメントにある通り、RPI2ではうまく動きません。要注意。
Amazon.co.jp: Raspberry Pi対応 USB電源アダプタ 2.0A 1ポート 高品質電源 (PSE、CEマーク付き) +USBケーブル付: パソコン・周辺機器
OSのインストール
OSイメージのダウンロード
Raspberry PI用のOSをダウンロードする。 以下のURLからお好みのOSをダウンロード
rhel系をよく使ってるので、PIDORAを使おうとしたけど、PI2には対応しておらず。 おとなしく一番情報がありそうなRASPBIANを使うことにする。
ダウンロードできたら、unzipコマンドで解凍する。 以下のファイルがzipファイルに含まれているはず。
SD カードに書き込み
sdカードのフォーマット。 フォマット用のソフトを以下のリンクからダウンロード。 - https://www.sdcard.org/downloads/formatter_4/eula_mac/index.html
とりあえず、上記のソフトの「オプション」から論理アドレスの調整を「On」にしてフォーマット。
dfコマンドで mount 場所を確認してimgファイルを書き込み。
$ sudo diskutil unmountDisk /dev/disk1s1
$ sudo dd bs=1m if=2015-02-16-raspbian-wheezy.img of=/dev/rdisk1
3125+0 records in
3125+0 records out
3276800000 bytes transferred in 159.860334 secs (20497893 bytes/sec)
起動
いろいろつなぐ
起動する前にいろいろつなぎます。
電源いれる
というか、電源ケーブルをさすと電源ON状態になる。 電源が点くと電源ランプが点灯します。
起動後の設定
パーティションの拡張 SDカードの容量が余ってる場合は、パーティションの拡張を行う。後からでも出来るけど。 「1 Expand FileSystem」を選択してメニューに従う。
ロケール・タイムゾーン・キーボードの設定 「Internationallisation Options」のメニューからそれぞれ - ja_JP.UTF-8 - Tokyo - Generic 102-key PC -> Other-> Japanese -> Japanese -> The Default for the keyboard layout -> No Compose key -> yes を選択。
ログイン
デフォルトで user:pi password:raspberry でログイン出来る。
リモートログインの確認
RASPI側のネットワークアドレスを確認する。DHCPを使っていればアドレスがふられている。
Mac側から以下のコマンドでSSHログイン。ユーザ:pi パスワード:raspberry がデフォルト。
$ ssh pi@[IPアドレス]
ログイン後
ログイン後、apt-getしておく。
$ sudo apt-get update
$ sudo apt-get upgrade
emacs が入っていないので入れておくこと。 $ sudo apt-get install emacs