Raspberry PI2 にハニーポットを導入してみた

Raspberry PI2 にハニーポットをインストールして運用してみます。 今回、インストールしたハニーポットは、いろいろ評判が良さそうだった「dionaea」を利用します。

インストール

sudo echo "deb http://packages.s7t.de/raspbian wheezy main" >> /etc/apt/sources.list
sudo apt-get update
sudo su
apt-get install libglib2.0-dev 
apt-get install libssl-dev
apt-get install libemu
apt-get install libev
apt-get install libpcap 
apt-get install libcurl4-openssl-dev
apt-get install libreadline-dev -y
apt-get install libsqlite3-dev  -y
apt-get install libtool -y
apt-get install automake -y
apt-get install autoconf -y
apt-get install build-essential -y
apt-get install subversion -y
apt-get install git-core -y
apt-get install flex -y
apt-get install bison -y
apt-get install pkg-config -y
apt-get install libnl-3-dev -y
apt-get install libnl-genl-3-dev -y
apt-get install libnl-nf-3-dev -y
apt-get install libnl-route-3-dev -y 
apt-get install liblcfg -y
apt-get install dionaea-python -y
apt-get install dionaea-cython 
apt-get install udns 
apt-get install dionaea 

設定

待ち受け用IP設定

sudo cp /opt/dionaea/etc/dionaea/dionaea.conf.dist /opt/dionaea/etc/dionaea/dionaea.conf
emacs /opt/dionaea/dionaea.conf
  • dionaea.confの以下2行を変更し、通信を待ち受ける。

    mode = “manual” addrs = { eth0 = [“待ち受けるIP“] }

  • あわせてデフォルト設定ではログが大量に出力されるので5行目あたりを以下のように修正。 levels = "info,warning,error"

実行ユーザ・グループの設定

sudo groupadd dionaea
sudo useradd -g dionaea -s /usr/sbin/nologin dionaea
sudo chown -R dionaea:dionaea /opt/dionaea/var/dionaea/

実行

以下のコマンドで設定ファイルを指定しデーモンモードで実行する。

sudo /opt/dionaea/bin/dionaea -u dionaea -g dionaea -c /opt/dionaea/etc/dionaea/dionaea.conf -D

実行後、nmapで空いているポートを調べると複数のサービスが立ち上がっている。

Host is up (0.0052s latency).
Not shown: 989 closed ports
PORT     STATE SERVICE
21/tcp   open  ftp
22/tcp   open  ssh
42/tcp   open  nameserver
80/tcp   open  http
135/tcp  open  msrpc
443/tcp  open  https
445/tcp  open  microsoft-ds
1433/tcp open  ms-sql-s
3306/tcp open  mysql
5060/tcp open  sip
5061/tcp open  sip-tls

RaspberryPi2にELKスタック導入

RaspberryPI2に、Elasticsearch,Logstash,Kibanaを導入してみた。

必要なバイナリのダウンロード

sudo mkdir /usr/share/elasticsearch
cd /usr/share/elasticsearch
sudo wget https://download.elastic.co/kibana/kibana/kibana-4.0.1-linux-x64.tar.gz
sudo wget https://download.elastic.co/logstash/logstash/logstash-1.4.2.tar.gz
sudo wget https://download.elastic.co/elasticsearch/elasticsearch/elasticsearch-1.5.1.tar.gz

javaインストール

  • jdk8をupdate-alternativesで選択する

    $ sudo update-alternatives --config java There are 2 choices for the alternative java (providing /usr/bin/java).

    Selection    Path                                                 Priority   Status
    

    *0 /usr/lib/jvm/java-6-openjdk-armhf/jre/bin/java 1057 auto mode 1 /usr/lib/jvm/java-6-openjdk-armhf/jre/bin/java 1057 manual mode 2 /usr/lib/jvm/jdk-8-oracle-arm-vfp-hflt/jre/bin/java 318 manual mode

    Press enter to keep the current choice[*], or type selection number: 2 update-alternatives: using /usr/lib/jvm/jdk-8-oracle-arm-vfp-hflt/jre/bin/java to provide /usr/bin/java (java) in manual mode pi@raspberrypi ~ $ java -version java version "1.8.0" Java(TM) SE Runtime Environment (build 1.8.0-b132) Java HotSpot(TM) Client VM (build 25.0-b70, mixed mode)

  • jdk8がない場合は、別途インストール。(*他のでも動くかもしれないけど一応)

elasticsearchインストール

sudo tar zxvf elasticsearch-1.5.1.tar.gz
sudo mkdir /etc/elasticsearch
sudo cp -p /
sudo cp /usr/share/elasticsearch/elasticsearch-1.5.1/config/elasticsearch.yml /etc/elasticsearch/elasticsearch.yml
sudo mv elasticsearch-1.5.1 /opt/elasticsearch-1.5.1
sudo ln -s /opt/elastcisearch-1.5.1 /opt/elasticsearch

logstashインストール

sudo mv logstash-1.4.2 /opt/logstash-1.4.2
sudo ln -s /opt/logstash-1.4.2 /opt/logstash
sudo mkdir -p /etc/logstash/conf.d
sudo mkdir -p /var/log/logstash/


sudo git clone https://github.com/jnr/jffi.git
cd jffi/
sudo apt-get install -y ant
sudo ant jar
sudo mkdir -p /opt/logstash/vendor/jar/jni/arm-Linux/
sudo cp build/jni/libjffi-1.2.so /opt/logstash-1.4.2/vendor/jar/jni/arm-Linux/
sudo apt-get install zip
cd /opt/logstash/vendor/jar
sudo zip -g jruby-complete-1.7.11.jar jni/arm-Linux/libjffi-1.2.so

kibanaインストール

sudo tar zxvf kibana-4.0.1-linux-x64.tar.gz
sudo mv kibana-4.0.1-linux-x64 /opt/kibana-4.0.1-linux-x64
sudo ln -s /opt/kibana-4.0.1-linux-x64 /opt/kibana

sudo wget http://node-arm.herokuapp.com/node_latest_armhf.deb
sudo dpkg -i node_latest_armhf.deb
sudo mv /opt/kibana/node/bin/node /opt/kibana/node/bin/node.orig
sudo mv /opt/kibana/node/bin/npm /opt/kibana/node/bin/npm.orig
sudo ln -s /usr/local/bin/node /opt/kibana/node/bin/node
sudo ln -s /usr/local/bin/npm /opt/kibana/node/bin/npm

起動

  • elasticsearchを起動

    sudo /opt/elasticsearch/bin/elasticsearch & sudo /opt/kibana/bin/kibana &

  • 確認 ブラウザで「http://XXXXXXX:5301」にアクセス。 kibana の画面が表示すればひとまず成功。

参考

「JJUG CCC 2015 Spring」に行ってきた

Javaのユーザグループ主催のカンファレンス「JJUG CCC 2015 Spring」に参加してきました。 普段見れない他社さんの仕組みの話とか流行りの技術の話とか聞けて、個人的には大満足。

今年でJava15年目。何度も他の言語に乗り換えようとするんだけど、なんだかんだ硬くて素直なところはJavaの良さではあります。あと、積み上げてきたスタックの大きさというか厚みというか、やっぱり信頼おける。と、個人的には思ってます。

13:00- G-1 Java が支える人気ニュースアプリ NewsPicks の裏側 by 文字 拓郎さん(株式会社ユーザベース)

プレゼンメモ

  • NewsPicks

    • Platform/Publisher,Picker
    • gunocyとかSmartNewsとは違う層
    • 意思決定者層が多い、専門家が多い
    • 35万人くらい。有料課金ユーザも増えてる。
    • エンジニアの数は10人くらい
    • プッシュはあえて手動でやっていたりする
    • チーム(プラットフォーム、ブランドデザイン、編集部)
  • NewsPicks裏側

    • Desktop Mobile EC2
    • AWS or Die みたいな感じ
    • DATABASE
      • RDS(MySQL),マスタ
      • Dynamo DB,トランザクション
      • ElasticCache , タイムライン、ランキング、キャッシュ
      • Redshift:Access Log解析
      • ElasticSearch を検索で使ってる
    • JAVA アプリサーバ、バッチサーバ、取り込みサーバ
    • リコメンド、ダッシュボード、CMS、管理とかもぜんぶJava
  • APP Server内側

    • Nginx,Tomcat,fluentd
    • Java8 Sprng MVC
    • REST API (Java8 + LOMBOK + Spring)
    • JSONの定義はLOMBOK使うと楽。
    • SPRING はやっぱりしんどくて実はなくしたい。(コードをどこみればいいのかわからない)
    • JAX-RS + Guice/Daggerがおすすめ
  • SQS

    • 可用性が担保された分散キュー。
    • トランザクショナルではない。
  • ElasticSearch

  • Backend

    • CRONでスケジュール
    • 取り込んだ記事がタイムラインに反映されるまで
      • DynamoDBに一旦取り込む
      • カテゴライズサービスでカテゴライズする
      • ユーザのタイムラインに向けて波及させる
      • 間にはキューを挟む
  • 他の言語

    • 他の言語も考えたけど、なんだかんだJavaがシンプル
    • ruby はやっぱりそこそこの規模だとつらい
    • scalaコンパイルが遅くてつらい
  • Redis編

    • Redisのマスタースレイブ。たまにこけてタイムラインがなくなる。
    • ユーザ数10万人くらいで垂直分割
    • selectする層を用意して使う
    • 人気ユーザが寄っているので、実はユーザで分散しちゃダメ。
    • 結局、ハッシュ分散
    • Read/Writeをわけて読み出しはReadレプリカ
  • SEO

    • Angular で開発されたSPA。
    • Phantom でスナップショット
  • 急成長の代償

    • 後方互換性を維持したままひたすら拡張
    • レイヤリングがほどんどされていない
    • スタートアップ
  • グロース編

    • REDSHIFT トラッキングログを整備、KPIダッシュボード
    • SPRING BOOT , DOMA2 ,ANGULAR JS ,D3 一週間くらい
    • SQL20 がシンプル
  • Angularやめる

    • SEO
    • polyfill service ユーザエージェントでいろいろ
    • THYMELEAF
    • handlebars とか使うと便利かも

所感

  • 知らないライブラリが出てきたのと実際に動いているサービスの裏側のアーキの話をしていただけるのは大変貴重で、かなり勉強になった。
  • みんなreact使ってみたいんだな、と。

プレゼンスライド

https://speakerdeck.com/monzou/java-gazhi-eru-ren-qi-niyusuapuri-newspicks-falseli-ce

14:00- Web開発における最新テスト手法 by tokuhiromさん (subtech)

プレゼンメモ

  • 導入
  • サーバサイドの開発はテストコードを書く
  • どこまで自動化するが
    • 手動でやったほうが楽なものは手動で
    • 繰り返すものは自動で
  • テストコード「契約」となるので堅牢なものになる
  • ビジネスロジックはモデルに入るのでモデルをテストする
  • RDBまわり
    • RDBが絡むテストをどこまでやるか
    • RDBとどう付き合うか
      • RDB独自の機能を使いたい派 
      • JPAを使いたい派
    • mysqlmavenから起動する?制御が大変なのでlocalに立てる。
    • junitでいきなりcreate databaseするのが楽。beforeClassで。スキーマを流しこむ。
  • 外部APIのテスト
    • embedded jetty を使う
    • apimock
  • コントローラのテスト
    • httpclientから叩く *結合テストのような感じか。
    • JSON APIは、
    • フォームのテストはテストコードにしてない。人力で結構見つかるので。
    • HTMLは変わりまくるので自動化テストの手間が見合わない。
  • ダミーデータ
  • テストライブラリ
    • asertj を使うとIDEフレンドリー
    • google truth もあるけどassertj と似てるけど足りない。

所感

  • なるべく動作環境に近い環境でテストしましょう、っていうのは同感。なのでjetty立ち上げTDDは試してみてもいいかも。
  • ただ、結局、何を確認したいかによるよねと。間違いようのないhttpアクセスのコードだったらモックでもいいじゃん、という気もする。というかそうしてる。

プレゼンスライド

  • 見つからなかったです。

15:00- 大規模な負荷でもドキドキしない為のJava EE by nagaseyasuhitoさん (java-ja / グリー株式会社)

プレゼンメモ

  • 負荷テスト
    • アンチパターン
      • シングルスレッドで実行 → 実践的ではない
      • ユースケーストかけ離れたシナリオ
      • 複数のサーバでコマンドを叩いて手動実行
    • 負荷テストの目的
      • システムの限界性能を知る 
      • 高負荷時の不具合を発見する
    • Stress Test meets Continuous Integration
    • JMeter
      • HTTP Test Script Recorder (プロキシとしてトレースしてくれる)
      • Selenium Web Driver Sampler
      • JUnit Requestサンプラ junitを実行できる
    • jmeter-maven-pluginで自動化
    • 負荷テストのパラメタはMavenのプロパティ化してチューニングするとよい。
    • jenkins performance plugin jmeterの結果を出力するプラグイン
    • 負荷をかけるとどうなるか
      • 他ユーザのレスポンすが返ってくる
      • レスポンセウが返ってこない
      • リソースは余ってるのにレスポンスが遅い
      • リクエストが遅い
      • レスポンスが遅い
    • ボトルネックを探す
    • Mission Control ,Flichg REcorderで取得したJVMの統計情報を可視化するツール
      • 以下のオプションをつけてアプリサーバを起動する -XX;LUnlockCommercialFeatures -XX:+FlightRecorder
  • JPAのスケールアウト戦略
    • enterprise とsocialの違いで戦略が違う
    • READ が頭打ち
      • Master-Slaveのレプリケーション
      • readの負荷状況によって2台、3台に増やす
      • Mysql ReplicationDriver
      • entityManager.unwrap(Connection.class).setReadOnly(true); なんかちょっと残念。
    • WRITEも頭打ち
      • パーティショニング。一定のルールに従ってクエリ発行するデータベースを振り分ける。
      • EclipseLink Partitioning
        • アノテーションを書くことで書き込み先のデータベースコネクションを切り替えられる。

所感

  • 意外とjmeterで地味に負荷かけてるんだな、っていうところが驚き。
  • CIにどこまでのシナリオを組み込むかは課題なのかな。uiが変わったら作り直しだし、毎回チューニングするわけにもいかないだろうし。
  • エンタープライズの人だからか、やっぱり、負荷テストやってから「限界を知る」っていうのには違和感がある。いやまあ、実際そういう面もあるんだろうけど、出来高でやってていいのかなあ、っと。

プレゼンスライド

16:00 『Embulk』に見るモダンJavaの実践的テクニック ~並列分散処理システムの実装手法~

by なひ さん(トレジャーデータ株式会社)

プレゼンメモ

  • Embulk
    • オープンソースのバルク転送ツール
      • A から B へレコード群を転送
    • プラグイン機構
    • バルク転送の難しさ
      • 入力データの正規化、エラー処理、メンテ、性能
      • 入力データ
      • エラー処理の扱い
        • 例外的な値が入ってきたときにどうするか。
        • ネットワークエラー、ディスクフル、重複データ転送の回避復旧。
      • メンテナンスの難しさ
      • 転送データ
        • 通常は増えていく
    • Embulkデモ
      • csvを入力するとカラムを推測してくれる
    • プラグイン
      • InputPlugin
      • FilterPlugin
      • OutputPlugin
      • Executorplugin
  • Java実装技術
    • Java7ネイティブ
      • try-with-resources
      • ファイル操作はFiles & Paths API
    • Guice
    • Jacksonによるモデルクラス
    • Immutableな変数はfinalにしておく
    • Nettyバッファアロケータ
      • レコード群のためのメモリをすべて自前管理
      • out of memory が起きる前に検出
      • GCコスト削減
      • 複数のバルクロードセッションをサーバプロセス内で実行しても大丈夫
    • Unsafe
      • airlift/slice - sun.misc.Unsafe API のラッパー
      • バイト列の直接操作(で・シリアライズ
      • コピー削減

所感

  • やっぱりEmbulk便利そう
  • 噂には聞いているけどUnsafe API。名前からして触ってはいけないもの感が漂ってるんだよなあ。いつか触ってみよう。

プレゼンスライド

17:00- Grails 第3章 進化したSpring-bootベースフレームワーク

by 山本 剛さん(日本Grails/Groovyユーザーグループ, newcast inc.)

プレゼンメモ

  • Grails
  • Road to 3
    • Grails1系:古代
    • grails2.3系まで:近代
    • Spring Bootをベースにする
    • grails2.4 はspring4
  • Grails 3
    • Groovy 2.4
    • Spring 4.1
    • Spring Boot 1.2
    • Gradle
      • JavaやっててGradleやってないのはおかしい
    • なぜspring boot?
    • *spring bootが簡単そう
    • コアフィーチャ
      • Groovy2.4 で速くなった
      • 内部フローの変更
    • 開発環境フィーチャ
      • Intellij IDEAのgradleプロジェクトで開発
    • テストフィーチャ
  • まとめ
    • spring boot
    • 3.0はまだ不安定かも。3.1に期待。

所感

  • うーむ、grails3もちょっと厳しそうだな。

プレゼンスライド

18:00 いろんなデータをKibana4で見てみよう by 大谷 純 さん (elastic.co)

プレゼンメモ

  • elastic
    • elasticsearch から elasticへ変更。
  • ELK Stack
    • DataをLogstash に食わせて、elasticsearchにためて、データをkibanaで見る。
    • Logstash
      • 概要
        • ログデータの収集・管理
        • 収集、パース・加工、送出
        • jruby
      • アーキ
        • input Filter output
      • 設定ファイルでinput,filter,outputを指定する
    • Elasticsearch
      • キーワードの検索、絞り込み、ハイライト、ページング、集計、サジェスト
      • githubで使ってる
      • スキーマフリー、分散ドキュメントストア、REST & JSON
    • Kibana4
      • データを探索しやすく、elsticsearchの新機能を活用できる
      • 3つの画面
        • Discover
          • どんなフィールドがあるか
          • 検索した結果はどんなデータか
          • 検索条件の保存
        • Visualize
          • 検索結果をもとにグラフを作成
          • グラフのタイプを選択
          • 検索対象のデータを選ぶ
          • 集計単位を選択
          • グラフを表示させたら保存する。elasticsearchにjson形式で保存される。
        • Dashboard
          • 保存したグラフを貼り付けていく
      • kibana3との違い
        • 基本的に一枚だけだった。異なるデータを一緒の画面に出せなかった。
        • 異なるフィールドのデータを一緒にグラフに出すことができなかった。
  • ユースケースアクセスログ
    • logstash
      • input:Accessログのパスを指定
      • filter:grok でパース。apacheのログの場合は構造化してくれるものが用意されている。
      • filter:geoipで緯度経度情報をとってきてくれる。
      • filter:useragent でよしなに抜き出してくれる
      • ouptut:elasticsearchにしてindexの名前を日毎に設定。
    • elasticsearch
      • Index template。
    • kibana
  • ユースケース:ツイート
  • ユースケース:git log
    • gitのヒストリを取り出してelasticsearchに投げる
  • ユースケースjavaのログ
    • multilineを使うとstacktraceとかの複数行も取れる
  • elasticsearch勉強会やってる

所感

  • kibana4になってますますsplunkっぽくなったなあ
  • raspberry pi2 に入れてみよっと

プレゼンスライド

  • 見つからなかったです。

Raspberry PI2にSnortインストール

snortsnortのアラートを見るためのGUI のsnorby をRaspberry PI2にインストールします。 snortは素直にインストールできましたが、snorbyはうまくいきませんでした。 やったところまで書いときます。

snortのインストール

 ほんとコレだけ。

$sudo apt-get install snort

snorbyのインストール

snorbyが使うmysqlをインストール。

$ sudo apt-get install mysql-server mysql-client

snort-mysqlとか必要なライブラリのインストール。

$apt-get install snort-mysql
$ sudo apt-get install libyaml-dev git-core default-jre imagemagick libmagickwand-dev wkhtmltopdf build-essential libssl-dev libreadline-gplv2-dev zlib1g-dev libsqlite3-dev libxslt1-dev libxml2-dev libmysqlclient-dev libmysql++-dev apache2-prefork-dev libcurl4-openssl-dev ruby ruby-dev
$ sudo apt-get install linux-headers-rpi

rails とsnorby とかいろいろインストール。

$ sudo gem install bundler rails
$ sudo gem install rake --version=0.9.2
$ cd /var/www/
$ sudo git clone http://github.com/Snorby/snorby.git
$ cd /var/www/snorby/config

$ sudo cp database.yml.example database.yml
$ sudo cp snorby_config.yml.example snorby_config.yml
$ sudo sed -i s/"\/usr\/local\/bin\/wkhtmltopdf"/"\/usr\/bin\/wkhtmltopdf"/g snorby_config.yml

database.yml のデータベースの設定を修正し、以下のコマンドでインストール

$ cd /var/www/snorby/
$ sudo bundle install --deployment
$ sudo bundle exec rake snort:setup

以下のエラーメッセージが表示されてインストール出来ず。 海外でも同様のエラーが報告されているが、解決策は見つからず・・・

rake aborted!
cannot load such file -- dm-rails/railtie

とりあえず、snort単体で動かすか。。。

Apache storm チュートリアルの和訳

英語の勉強も兼ねてApache Stormのチュートリアルを和訳してみました。 ほぼ直訳・一部意訳です。間違いは大目に見ていただけると助かります。

https://storm.apache.org/documentation/Tutorial.html

チュートリアル

このチュートリアルでは、Stormトポロジーの作り方とStormクラスタのデプロイ方法について学びます。Javaをメインの言語で使いますが、Stormの他言語対応を説明するためにPythonを使います。

序文

このチュートリアルは、storm-starterプロジェクトを使います。本プロジェクトをcloneし、例に従うことをおすすめします。「Setting up a development environment」と「Creating a new Storm projet」を読んで、セットアップしてください。

Stormクラスタコンポーネント

Stormクラスタは一見Hadoopクラスタに似ています。Hadoopでは、「MapReduce jobs」を使いますが、Stormでは「トポロジー」を使います。「Jobs」と「トポロジー」は異なります。一つの大きな違いは、MapReduce Jobは処理を終了するが、トポロジーはメッセージをずっと処理し続けます。

Stormクラスタにはマスターノードとワーカーノードの2つのノードがあります。マスターノードは、「Nimbus」と呼ばれるHadoopの「JobTracker」に似たデーモンを実行します。Nimbusクラスタへのコードの配布、マシンへのタスクのアサイン、障害の監視の責務をおいます。

ワーカーノードは「Supervisor」と呼ばれるデーモンを実行します。Supervisorは、マシンへの処理のアサインを待ち、Nimbusの指示によって必要に応じてワーカープロセスの開始・終了します。ワーカープロセスはトポロジーのサブセットを実行します。トポロジーの実行は、多くのマシンに分散した多くのワーカープロセスにより構成されます。

https://storm.apache.org/documentation/images/storm-cluster.png ※storm.apache.orgより引用

NimbusとSupervisor間の全ての調整はZookeperクラスタを通して行います。加えて、NimbusデーモンとSupervisorデーモンはfail-fastでステートレスです。全ての状態はZookeperかローカルディスクによって保持されます。これは、kill -9 NimbusまたはSupervisorしても何もなかったかのように処理を再開出来ることを意味します。この設計によってStormクラスタは信じられないほど安定しています。

トポロジー

Stormでリアルタイム処理をするためには、トポロジーと呼ばれるものを作ります。トポロジーは処理のグラフです。トポロジーのノードは処理ロジックを含み、ノード間のリンクはデータがどのようにノード間でやり取りされるかを示しています。トポロジーの実行は簡単です。まずはじめに、コードをパッケージングし、依存するライブラリを一つのjarに格納します。それから、以下のようなコマンドを実行します。

storm jar all-my-code.jar backtype.storm.MyTopology arg1 arg2

上記は、backtype.storm.MyTopologyクラスを引数arg1,arg2で実行しています。クラスの主な機能は、トポロジの定義とNimbusへの登録です。Storm jar の部分は、Nimbusへの接続とjarのアップロードを行います。

トポロジーの定義は、Thrift構造体であり、NimbusはThriftサービスなので、いろんな言語でトポロジの作成と登録が行えます。上記の例は、JVMベース言語を用いた最も簡単な例です。さらにトポロジーの開始と終了に関する多くの情報は、「Running topologies on a production cluster」を参照してください。

Streams

Stormのコアは「stream」です。streamはタプルの無限の列です。Stormは、分散と信頼性の高い方法で新しいstreamにstreamを変換するためのプリミティブを提供します。例えば、つぶやきのストリームをトピックのトレンドのストリームに変換出来ます。 Streamの変換を行うためにStormが提供する基本要素は「spouts」と「bolts」です。SpoutsとBoltsは、アプリケーション固有のロジックを実行するインターフェースを備えています。 Spoutは、Streamの発行元となります。例えば、spoutは、Kestrelキューからタプルを読み込み、新たなストリームとして発行します。または、spoutは、TwitterAPIに接続し、つぶやきのストリームを新たに発行します。

boltはstreamを入力とし、何らかの処理を行い、新しいstreamを発行します。つぶやきから流行トピックのストリームを算出するような複雑なstream変換は、複数のステップ(複数のbolts)を要します。Boltsは機能の実行、タプルのフィルタリング、streamの計算、streamの結合、データベースとのやりとりなどを行うことが出来ます。 spoutsとboltsのネットワークは、Stormクラスタに登録するトポロジーのパッケージに含まれます。トポロジーは、spoutまたはboltsのノードによるstream変換のグラフです。グラフのエッジは、streamにsubscribeされたboltsを示します。spoutまたはboltsがstreamにタプルを発行したとき、streamにsubscribeされた全てのboltsにタプルが送信されます。

https://storm.apache.org/documentation/images/topology.png ※storm.apache.orgから引用

トポロジーのノード間のリンクは、タプルをどのようにやり取りするかを示します。例えば、SpoutAとBoltBの間のリンクがあり、SpoutAからBoltCへのリンク、BoltB からBoltCへのリンクがあった場合、常にSpoutAはタプルを発行し、BoltBとBoltCにタプルを送ります。BoltBのタプルは、BoltCへ送出されます。 Stormトポロジーのノードは、並列に実行されます。それぞれのノードに対してどの程度の並行性を識別することができ、Stormは実行のためにクラスタの間でいくつかのスレッドを生成します。 トポロジーはkillするまで永久に動作します。Stormは失敗タスクを自動的に再アサインします。加えて、Stormはマシンがダウンしメッセージを受け取れなくても、データロスがないことを保証します。

データモデル

Stormはタプルをデータモデルとして利用します。タプルは値の名前付きリストであり、タプルのフィールドはあらゆる種類のオブジェクトです。Stormは全てのプリミティブタイプと文字列とバイト配列をタプルのフィールドとしてサポートもしています。他のオブジェクトを利用するためには、オブジェクトのserializerを実装する必要があります。

トポロジーの全てのノードは、タプルの出力フィールドを宣言する必要があります。たとえが、このboltは出力する2つのタプルを「double」と「triple」のフィールドとともに宣言しています。

public class DoubleAndTripleBolt extends BaseRichBolt {
    private OutputCollectorBase _collector;

    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollectorBase collector) {
        _collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        int val = input.getInteger(0);        
        _collector.emit(input, new Values(val*2, val*3));
        _collector.ack(input);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("double", "triple"));
    }    
}

declareOutputFields は出力するフィールド{double,triple}を宣言する関数です。boltの残りの部分は続くセクションで説明します。

シンプルなトポロジー

さらなるコンセプトの探求とどのようにコードを実装するかを確認するために、シンプルなトポロジーを確認してみましょう。storm-starterのExclamationTopologyを見てください。

TopologyBuilder builder = new TopologyBuilder();        
builder.setSpout("words", new TestWordSpout(), 10);        
builder.setBolt("exclaim1", new ExclamationBolt(), 3)
        .shuffleGrouping("words");
builder.setBolt("exclaim2", new ExclamationBolt(), 2)
        .shuffleGrouping("exclaim1");

このトポロジーはspoutと2つのboltを含みます。spoutはwordsを生成し、boltは「!!!」の文字列を入力に対して付与します。spoutは最初のboltにemitし、その次のboltに対してemitします。spoutがタプル「bob」と「john」をemitした場合二番目のboltは「bob!!!!!!」と「john!!!!!!」を生成します。 コードはsetSpoutとsetBoltメソッドを使うノードを定義しています。これらのメソッドは、userを識別するid、処理するためのロジックを含んだオブジェクトそして、ノードに求める並列化の量をインプットとします。この例では、spoutは「words」をidとして与えboltsは「exclaim1」「exclaim2」をidとして与えられています。 処理ロジックを含んだオブジェクトは、spoutにIRichSpoutインターフェースを実装し、boltsにIRichBoltインターフェースを実装しています。 最後のパラメタは、ノードに求める並列化の量を指します。どれほどのクラスタを超えたコンポーネントを実行するスレッドの数を指し示します。omitした場合、Stormは1つのスレッドのみを割り当てます。 setBolt はBoltの入力を定義するために利用されるInputDeclarerオブジェクトを返します。ここでは、コンポーネント「exclaim1」をシャッフルグルーピングを用いて「words」コンポーネントによって発行された全てのタプルを読むことを宣言します。「exclaim2」コンポーネントは、「exclaim1」コンポーネントによって発行されたすべてのタプルを読むコを宣言します。「シャッフルグルーピング」とはタプルが入力タスクからランダムにboltのタスクに配送されることを意味します。コンポーネント間のデータのグルーピングには多くの方法があります。いくつかのセクションで説明します。

builder.setBolt("exclaim2", new ExclamationBolt(), 5)
            .shuffleGrouping("words")
            .shuffleGrouping("exclaim1");

上記の通り、Boltの入力宣言は複数のソースをつなげて記述出来ます。

このトポロジーのspoutとboltの実装について深く確認してみましょう。Spoutsはトポロジーに新しいメッセージを発行することについて責務をおいます。このトポロジーのTestWordSpoutはリスト「nathan mike jackson golda bertels」のリストから100msごとにランダムな単語を発行します。TestWordSpoutのnextTuplue()はこのようになります。

public void nextTuple() {
    Utils.sleep(100);
    final String[] words = new String[] {"nathan", "mike", "jackson", "golda", "bertels"};
    final Random rand = new Random();
    final String word = words[rand.nextInt(words.length)];
    _collector.emit(new Values(word));
}

上述の通り、実装は単純です。

ExclamationBolt は入力に"!!!"を追加します。ExclamtionBoltのすべての実装を見てみましょう。

public static class ExclamationBolt implements IRichBolt {
    OutputCollector _collector;

    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        _collector = collector;
    }

    public void execute(Tuple tuple) {
        _collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
        _collector.ack(tuple);
    }

    public void cleanup() {
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }

    public Map getComponentConfiguration() {
        return null;
    }
}

prepareメソッドは、このboltからのタプルの発行に使われるOutputCollectorを提供します。タプルはboltからいつでも発行出来ます。prepare,executeまたはcleanupメソッド、他スレッドで非同期に発行することも出来ます。このprepareの実装は、シンプルにOutputCollecotrをexecuteメソッドで後ほど利用するインスタンス変数として保持しています。

executeメソッドはboltの入力の一つからタプルを受け取ります。ExclamationBoltはタプルのはじめのフィールドを引き回し、そこに!!!を追加して新しいタプルを発行します。もし複数のソースを入力とするboltを実装した場合、Tuple#getSourceComponentメソッドを使ってどのコンポーネントからタプルがきたかを調べることが出来る。 executeメソッドには他の事柄もあります。すなわち、入力タプルはemitの第一引数渡され、最後の行で応答されます。データをロスすることないStormの信頼APIがあるが後ほど説明します。

cleanupメソッドは、Boltが停止するときのリソースをクリーンアップするときに呼び出される。このメソッドクラスタ上で呼び出される保証がない。 例えば、マシンのタスクが実行中のとき、メソッドを実行する手段がない。cleanupメソッドは,ローカルモードでトポロジーを実行している場合に利用されることを意図しており、リソースがリークすることなく多くのトポロジーの実行とkillを行うことが出来る。 declareOutputFieldsメソッドは、ExclamationBoltが「word」と呼ばれるフィールドを持つ1タプルを発行することを宣言している。

getComponentConfigurationメソッドは、どのようにコンポーネントを実行するかの各種設定を行うことが出来る。これはConfigurationを説明する高度なトピックです。 cleanupやgetComponentConfigurationのようなメソッドは、boltの実装に必ずしも必要なものではありません。boltは、デフォルトの実装を提供するベースクラスを利用することでより簡潔に定義できます。ExclamationBoltは、BaseRichBoltを継承することでより簡潔に書くことができます。

public static class ExclamationBolt extends BaseRichBolt {
    OutputCollector _collector;

    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        _collector = collector;
    }

    public void execute(Tuple tuple) {
        _collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
        _collector.ack(tuple);
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }    
}

ローカルモードでのExclamationTopologyの実行

どのようにローカルモードでExclamationTopologを実行・動作するかをみていきましょう。

Stormは2つの実行モードがあります。ローカルモードとdistributedモード。ローカルモードでは、Stormはワーカーノードをスレッドを用いてシミュレーションして実行します。ローカルモードはトポロジーのテストや開発に利用するのに適しています。storm-starterでトポロジーを実行する場合、ローカルモードで起動し、コンポーネントが発行されるメッセージを確認することができます。より詳細なトポロジーのローカルモードでの実行については、Local mode(https://storm.apache.org/documentation/Local-mode.html)に示します。

distributedモードでは、Stormはマシンのクラスタ上で動作します。マスターにトポロジーをsubmitすると、トポロジーの実行に必要な全てのコードがsubmitされます。マスターはコードの配布とトポロジーを実行するためのワーカーの準備を行います。ワーカーが停止する場合、マスターは他のものにアサインし直します。より詳細なクラスター上での実行については、Runnning topologies on a production cluster(https://storm.apache.org/documentation/Running-topologies-on-a-production-cluster.html)に示します。

このコードは、ExclamationTopologyをローカルモードで実行します。

Config conf = new Config();
conf.setDebug(true);
conf.setNumWorkers(2);

LocalCluster cluster = new LocalCluster();
cluster.submitTopology("test", conf, builder.createTopology());
Utils.sleep(10000);
cluster.killTopology("test");
cluster.shutdown();

はじめに、コードはLocalClusterオブジェクトによって作成されるクラスタを定義しています。仮想クラスタへのトポロジーのsubmitは、distributedクラスタへのsubmitと同様です。submitTopologyメソッドによって、LocalClusterにトポロジーがsubmitしています。引数は、トポロジーの名前、トポロジーの設定、トポロジー本体です。 引数nameはトポロジーを停止するときなどに、トポロジーを識別することに使われます。トポロジーはkillされるまで無期限で動作します。 引数configurationは、トポロジーを動作させるための各種設定に利用されます。2つの設定は共通的に扱われます。

  1. TOPOLOGY_WORKERS(set with setNumWorkers) トポロジーを実行するために、必要なプロセス数を定義します。トポロジーコンポーネントは多くのスレッドで実行されます。コンポーネントに割り当てられるスレッドの数は、setBoltとsetSpoutメソッドによって設定されます。これらのスレッドは、ワーカープロセス内に存在します。ワーカープロセスは、いくつかのコンポーネントのためのいくつかのスレッドを含んでいます。例えば、300スレッドを全てのコンポーネントと50ワーカープロセスを設定で指定します。それぞれのワーカープロセスは、6スレッドで実行されます。Stormトポロジーの性能を、それぞれのコンポーネントとスレッドが実行されるワーカープロセス数の並列処理の微調整で、チューニングすることができます。
  2. TOPOLOGY_DEBUG(set with setDebug) trueに設定した場合、Stormはコンポーネントにメッセージを発行するごとにログ出力します。ローカルモードでトポロジーのテストを行うときに有益です。ただし、トポロジークラスター上で実行する場合は、停止したほうがよいでしょう。

他にも多くのトポロジーの設定があります。各種設定の詳細については、Javadoc for Config(https://storm.apache.org/javadoc/apidocs/backtype/storm/Config.html)に示します。 ローカルモードでトポロジーを実行するためにどのように開発環境(Eclipseなど)をセットアップするかについては、Creating a new Storm Project(https://storm.apache.org/documentation/Creating-a-new-Storm-project.html)に示します。

Stream groupings

streamのグルーピングは、2つのコンポーネント間でタプルをどのように送信するかをトポロジーに示します。spoutsとboltsは、多くのタスクをクラスタ間で並列に実行します。トポロジーのタスクレベルの実行は、以下のように行われています。

https://storm.apache.org/documentation/images/topology-tasks.png ※storm.apache.orgより引用

BoltAのタスクがBoltBにタプルを発行するとき、どのタスクにタプルを送信すればよいでしょうか? stream groupingは、タスクの組み合わせの間をどのようにタプルを送信するかをStormに指示するものです。異なる種類のstream groupingsについて深堀するまえに、storm-starterのもう一つのトポロジーを確認してみましょう。

TopologyBuilder builder = new TopologyBuilder();

builder.setSpout("sentences", new RandomSentenceSpout(), 5);        
builder.setBolt("split", new SplitSentence(), 8)
        .shuffleGrouping("sentences");
builder.setBolt("count", new WordCount(), 12)
        .fieldsGrouping("split", new Fields("word"));

SplitSentenceは、受け取った文章中のそれぞれの単語をタプルとして発行し、WordCountは単語とカウントをmapに保持します。WordCountがwordを受け取るごとに、状態と新しい単語数を発行します。

stream groupingsにはいくつかの種類があります。

単純なグルーピングは、「shuffule grouping」と呼ばれ、タプルをランダムにタスクに送出します。shuffule groupingはWordCountTopologyのRandomSentenceSpoutからSplitSentence boltへのタプルの送出に利用されています。SplitSentence boltのタスクへ均等にタプルを配布する効果があります。

さらに興味深いgroupingの種類は「fields grouping」です。field groupingは、SplitSentence boltとWordCount boltの間で利用されています。同じwordが同じタスクに送信される必要があるWordCount boltの機能にとって重大です。一つ以上のタスクが同じ単語を見てしまうと、それぞれ不完全な値を持つため、誤ったカウントを送信してしまいます。fields groupingは、フィールドによってストリームをグルーピングします。これによって、同じタスクに同じフィールド値を持つものが送信されます。WordCountがSplitSentenceのwordフィールドによるfield groupingの出力ストリームを受け取ることにより、同じ単語は常に同じタスクに送信され、boltは正確な値を作成することができます。

Fields groupingはストリームの結合やストリームの集計やその他のユースケースの基本となります。fields groupingsはmodハッシングを利用しています。 他にもいくつかのstream groupingsがあります。詳細については、Consepts(https://storm.apache.org/documentation/Concepts.html)に示します。

Boltsを他言語で定義する

省略

メッセージ処理の保証

このチュートリアルでは、タプルの発行方法についてのいくつかについてはスキップしました。これらのaspectsはStormの信頼性APIの一部です。Stormがどのようにspoutから送信される全てのメッセージが処理されるかを保証方法です。どのように働き、Stormの信頼性機能を利用するかの情報については、Guaranteeing message processing(https://storm.apache.org/documentation/Guaranteeing-message-processing.html)を確認してください。

トランザクショナルトポロジー

Stormは、全てのメッセージが最低1回トポロジーで処理されることを保証します。共通の質問としてたずねられるのが「どのようにStormのtopでカウントしているのか?overcountすることはないか?」です。Stormは、計算のために正確に1回メッセージ処理を実現するトランザクショナルトポロジーと呼ばれる機能を持っています。詳しい内容についてはこちら(https://storm.apache.org/documentation/Transactional-topologies.html)に示します。

配送されたRPC

このチュートリアルでは、Stormによるstream処理の基本を説明しました。他にも多くのことがStreamの基本によって出来ます。Stormのアプリケーションの中で興味深い一つはDistributed RPCです、on the fly で並列計算を行うときに。Distributed RPCについてはこちら(https://storm.apache.org/documentation/Distributed-RPC.html)に示します。

Conclusion

このチュートリアルでは、Stormトポロジーの開発、テスト、配備の概要について説明しました。他のドキュメントはStormを利用する際の側面についてより詳細に示しています。

Raspberry PI2セットアップ

注文していたRaspberry PI2が届いたのでさっそくセットアップしてみた。 正常に動作するまでは、SDカードの書きミス、電源不足という壁があったが、きちんとしたものをそろえれば問題なし。

用意するもの

  • Raspberry PI2
  • HDMIケーブル
  • microUSB - USBのケーブル(そこそこいいやつ) ELECOM TB-AMB2A12BK
  • USBのACアダプタ(2A以上推奨とのこと) iBuffalo BSMPA09BA
  • microSDカード 東芝の64GB

※ USBのACアダプタおよびUSBのケーブルは、そこそこのものを使わないと、電力が安定せず、再起動を繰り返すなぞの機械が出来上がります。 はじめこちらを注文して試してたけど、評価コメントにある通り、RPI2ではうまく動きません。要注意。

Amazon.co.jp: Raspberry Pi対応 USB電源アダプタ 2.0A 1ポート 高品質電源 (PSE、CEマーク付き) +USBケーブル付: パソコン・周辺機器

OSのインストール

OSイメージのダウンロード

Raspberry PI用のOSをダウンロードする。 以下のURLからお好みのOSをダウンロード

rhel系をよく使ってるので、PIDORAを使おうとしたけど、PI2には対応しておらず。 おとなしく一番情報がありそうなRASPBIANを使うことにする。

ダウンロードできたら、unzipコマンドで解凍する。 以下のファイルがzipファイルに含まれているはず。

SD カードに書き込み

sdカードのフォーマット。 フォマット用のソフトを以下のリンクからダウンロード。 - https://www.sdcard.org/downloads/formatter_4/eula_mac/index.html

とりあえず、上記のソフトの「オプション」から論理アドレスの調整を「On」にしてフォーマット。

dfコマンドで mount 場所を確認してimgファイルを書き込み。

$ sudo diskutil unmountDisk /dev/disk1s1
$ sudo dd bs=1m if=2015-02-16-raspbian-wheezy.img of=/dev/rdisk1
3125+0 records in
3125+0 records out
3276800000 bytes transferred in 159.860334 secs (20497893 bytes/sec)

起動

いろいろつなぐ

起動する前にいろいろつなぎます。

電源いれる

というか、電源ケーブルをさすと電源ON状態になる。 電源が点くと電源ランプが点灯します。

起動後の設定

パーティションの拡張 SDカードの容量が余ってる場合は、パーティションの拡張を行う。後からでも出来るけど。 「1 Expand FileSystem」を選択してメニューに従う。

ロケールタイムゾーン・キーボードの設定 「Internationallisation Options」のメニューからそれぞれ - ja_JP.UTF-8 - Tokyo - Generic 102-key PC -> Other-> Japanese -> Japanese -> The Default for the keyboard layout -> No Compose key -> yes を選択。

ログイン

デフォルトで user:pi password:raspberry でログイン出来る。

リモートログインの確認

RASPI側のネットワークアドレスを確認する。DHCPを使っていればアドレスがふられている。

Mac側から以下のコマンドでSSHログイン。ユーザ:pi パスワード:raspberry がデフォルト。

$ ssh pi@[IPアドレス]

ログイン後

ログイン後、apt-getしておく。

$ sudo apt-get update
$ sudo apt-get upgrade

emacs が入っていないので入れておくこと。 $ sudo apt-get install emacs

参考