Apache storm チュートリアルの和訳
英語の勉強も兼ねてApache Stormのチュートリアルを和訳してみました。 ほぼ直訳・一部意訳です。間違いは大目に見ていただけると助かります。
https://storm.apache.org/documentation/Tutorial.html
チュートリアル
このチュートリアルでは、Stormトポロジーの作り方とStormクラスタのデプロイ方法について学びます。Javaをメインの言語で使いますが、Stormの他言語対応を説明するためにPythonを使います。
序文
このチュートリアルは、storm-starterプロジェクトを使います。本プロジェクトをcloneし、例に従うことをおすすめします。「Setting up a development environment」と「Creating a new Storm projet」を読んで、セットアップしてください。
Stormクラスタのコンポーネント
Stormクラスタは一見Hadoopクラスタに似ています。Hadoopでは、「MapReduce jobs」を使いますが、Stormでは「トポロジー」を使います。「Jobs」と「トポロジー」は異なります。一つの大きな違いは、MapReduce Jobは処理を終了するが、トポロジーはメッセージをずっと処理し続けます。
Stormクラスタにはマスターノードとワーカーノードの2つのノードがあります。マスターノードは、「Nimbus」と呼ばれるHadoopの「JobTracker」に似たデーモンを実行します。Nimbusはクラスタへのコードの配布、マシンへのタスクのアサイン、障害の監視の責務をおいます。
ワーカーノードは「Supervisor」と呼ばれるデーモンを実行します。Supervisorは、マシンへの処理のアサインを待ち、Nimbusの指示によって必要に応じてワーカープロセスの開始・終了します。ワーカープロセスはトポロジーのサブセットを実行します。トポロジーの実行は、多くのマシンに分散した多くのワーカープロセスにより構成されます。
※storm.apache.orgより引用
NimbusとSupervisor間の全ての調整はZookeperクラスタを通して行います。加えて、NimbusデーモンとSupervisorデーモンはfail-fastでステートレスです。全ての状態はZookeperかローカルディスクによって保持されます。これは、kill -9 NimbusまたはSupervisorしても何もなかったかのように処理を再開出来ることを意味します。この設計によってStormクラスタは信じられないほど安定しています。
トポロジー
Stormでリアルタイム処理をするためには、トポロジーと呼ばれるものを作ります。トポロジーは処理のグラフです。トポロジーのノードは処理ロジックを含み、ノード間のリンクはデータがどのようにノード間でやり取りされるかを示しています。トポロジーの実行は簡単です。まずはじめに、コードをパッケージングし、依存するライブラリを一つのjarに格納します。それから、以下のようなコマンドを実行します。
storm jar all-my-code.jar backtype.storm.MyTopology arg1 arg2
上記は、backtype.storm.MyTopologyクラスを引数arg1,arg2で実行しています。クラスの主な機能は、トポロジの定義とNimbusへの登録です。Storm jar の部分は、Nimbusへの接続とjarのアップロードを行います。
トポロジーの定義は、Thrift構造体であり、NimbusはThriftサービスなので、いろんな言語でトポロジの作成と登録が行えます。上記の例は、JVMベース言語を用いた最も簡単な例です。さらにトポロジーの開始と終了に関する多くの情報は、「Running topologies on a production cluster」を参照してください。
Streams
Stormのコアは「stream」です。streamはタプルの無限の列です。Stormは、分散と信頼性の高い方法で新しいstreamにstreamを変換するためのプリミティブを提供します。例えば、つぶやきのストリームをトピックのトレンドのストリームに変換出来ます。 Streamの変換を行うためにStormが提供する基本要素は「spouts」と「bolts」です。SpoutsとBoltsは、アプリケーション固有のロジックを実行するインターフェースを備えています。 Spoutは、Streamの発行元となります。例えば、spoutは、Kestrelキューからタプルを読み込み、新たなストリームとして発行します。または、spoutは、TwitterAPIに接続し、つぶやきのストリームを新たに発行します。
boltはstreamを入力とし、何らかの処理を行い、新しいstreamを発行します。つぶやきから流行トピックのストリームを算出するような複雑なstream変換は、複数のステップ(複数のbolts)を要します。Boltsは機能の実行、タプルのフィルタリング、streamの計算、streamの結合、データベースとのやりとりなどを行うことが出来ます。 spoutsとboltsのネットワークは、Stormクラスタに登録するトポロジーのパッケージに含まれます。トポロジーは、spoutまたはboltsのノードによるstream変換のグラフです。グラフのエッジは、streamにsubscribeされたboltsを示します。spoutまたはboltsがstreamにタプルを発行したとき、streamにsubscribeされた全てのboltsにタプルが送信されます。
※storm.apache.orgから引用
トポロジーのノード間のリンクは、タプルをどのようにやり取りするかを示します。例えば、SpoutAとBoltBの間のリンクがあり、SpoutAからBoltCへのリンク、BoltB からBoltCへのリンクがあった場合、常にSpoutAはタプルを発行し、BoltBとBoltCにタプルを送ります。BoltBのタプルは、BoltCへ送出されます。 Stormトポロジーのノードは、並列に実行されます。それぞれのノードに対してどの程度の並行性を識別することができ、Stormは実行のためにクラスタの間でいくつかのスレッドを生成します。 トポロジーはkillするまで永久に動作します。Stormは失敗タスクを自動的に再アサインします。加えて、Stormはマシンがダウンしメッセージを受け取れなくても、データロスがないことを保証します。
データモデル
Stormはタプルをデータモデルとして利用します。タプルは値の名前付きリストであり、タプルのフィールドはあらゆる種類のオブジェクトです。Stormは全てのプリミティブタイプと文字列とバイト配列をタプルのフィールドとしてサポートもしています。他のオブジェクトを利用するためには、オブジェクトのserializerを実装する必要があります。
トポロジーの全てのノードは、タプルの出力フィールドを宣言する必要があります。たとえが、このboltは出力する2つのタプルを「double」と「triple」のフィールドとともに宣言しています。
public class DoubleAndTripleBolt extends BaseRichBolt {
private OutputCollectorBase _collector;
@Override
public void prepare(Map conf, TopologyContext context, OutputCollectorBase collector) {
_collector = collector;
}
@Override
public void execute(Tuple input) {
int val = input.getInteger(0);
_collector.emit(input, new Values(val*2, val*3));
_collector.ack(input);
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("double", "triple"));
}
}
declareOutputFields は出力するフィールド{double,triple}を宣言する関数です。boltの残りの部分は続くセクションで説明します。
シンプルなトポロジー
さらなるコンセプトの探求とどのようにコードを実装するかを確認するために、シンプルなトポロジーを確認してみましょう。storm-starterのExclamationTopologyを見てください。
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("words", new TestWordSpout(), 10);
builder.setBolt("exclaim1", new ExclamationBolt(), 3)
.shuffleGrouping("words");
builder.setBolt("exclaim2", new ExclamationBolt(), 2)
.shuffleGrouping("exclaim1");
このトポロジーはspoutと2つのboltを含みます。spoutはwordsを生成し、boltは「!!!」の文字列を入力に対して付与します。spoutは最初のboltにemitし、その次のboltに対してemitします。spoutがタプル「bob」と「john」をemitした場合二番目のboltは「bob!!!!!!」と「john!!!!!!」を生成します。 コードはsetSpoutとsetBoltメソッドを使うノードを定義しています。これらのメソッドは、userを識別するid、処理するためのロジックを含んだオブジェクトそして、ノードに求める並列化の量をインプットとします。この例では、spoutは「words」をidとして与えboltsは「exclaim1」「exclaim2」をidとして与えられています。 処理ロジックを含んだオブジェクトは、spoutにIRichSpoutインターフェースを実装し、boltsにIRichBoltインターフェースを実装しています。 最後のパラメタは、ノードに求める並列化の量を指します。どれほどのクラスタを超えたコンポーネントを実行するスレッドの数を指し示します。omitした場合、Stormは1つのスレッドのみを割り当てます。 setBolt はBoltの入力を定義するために利用されるInputDeclarerオブジェクトを返します。ここでは、コンポーネント「exclaim1」をシャッフルグルーピングを用いて「words」コンポーネントによって発行された全てのタプルを読むことを宣言します。「exclaim2」コンポーネントは、「exclaim1」コンポーネントによって発行されたすべてのタプルを読むコを宣言します。「シャッフルグルーピング」とはタプルが入力タスクからランダムにboltのタスクに配送されることを意味します。コンポーネント間のデータのグルーピングには多くの方法があります。いくつかのセクションで説明します。
builder.setBolt("exclaim2", new ExclamationBolt(), 5)
.shuffleGrouping("words")
.shuffleGrouping("exclaim1");
上記の通り、Boltの入力宣言は複数のソースをつなげて記述出来ます。
このトポロジーのspoutとboltの実装について深く確認してみましょう。Spoutsはトポロジーに新しいメッセージを発行することについて責務をおいます。このトポロジーのTestWordSpoutはリスト「nathan mike jackson golda bertels」のリストから100msごとにランダムな単語を発行します。TestWordSpoutのnextTuplue()はこのようになります。
public void nextTuple() {
Utils.sleep(100);
final String[] words = new String[] {"nathan", "mike", "jackson", "golda", "bertels"};
final Random rand = new Random();
final String word = words[rand.nextInt(words.length)];
_collector.emit(new Values(word));
}
上述の通り、実装は単純です。
ExclamationBolt は入力に"!!!"を追加します。ExclamtionBoltのすべての実装を見てみましょう。
public static class ExclamationBolt implements IRichBolt {
OutputCollector _collector;
public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
_collector = collector;
}
public void execute(Tuple tuple) {
_collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
_collector.ack(tuple);
}
public void cleanup() {
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
public Map getComponentConfiguration() {
return null;
}
}
prepareメソッドは、このboltからのタプルの発行に使われるOutputCollectorを提供します。タプルはboltからいつでも発行出来ます。prepare,executeまたはcleanupメソッド、他スレッドで非同期に発行することも出来ます。このprepareの実装は、シンプルにOutputCollecotrをexecuteメソッドで後ほど利用するインスタンス変数として保持しています。
executeメソッドはboltの入力の一つからタプルを受け取ります。ExclamationBoltはタプルのはじめのフィールドを引き回し、そこに!!!を追加して新しいタプルを発行します。もし複数のソースを入力とするboltを実装した場合、Tuple#getSourceComponentメソッドを使ってどのコンポーネントからタプルがきたかを調べることが出来る。 executeメソッドには他の事柄もあります。すなわち、入力タプルはemitの第一引数渡され、最後の行で応答されます。データをロスすることないStormの信頼APIがあるが後ほど説明します。
cleanupメソッドは、Boltが停止するときのリソースをクリーンアップするときに呼び出される。このメソッドはクラスタ上で呼び出される保証がない。 例えば、マシンのタスクが実行中のとき、メソッドを実行する手段がない。cleanupメソッドは,ローカルモードでトポロジーを実行している場合に利用されることを意図しており、リソースがリークすることなく多くのトポロジーの実行とkillを行うことが出来る。 declareOutputFieldsメソッドは、ExclamationBoltが「word」と呼ばれるフィールドを持つ1タプルを発行することを宣言している。
getComponentConfigurationメソッドは、どのようにコンポーネントを実行するかの各種設定を行うことが出来る。これはConfigurationを説明する高度なトピックです。 cleanupやgetComponentConfigurationのようなメソッドは、boltの実装に必ずしも必要なものではありません。boltは、デフォルトの実装を提供するベースクラスを利用することでより簡潔に定義できます。ExclamationBoltは、BaseRichBoltを継承することでより簡潔に書くことができます。
public static class ExclamationBolt extends BaseRichBolt {
OutputCollector _collector;
public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
_collector = collector;
}
public void execute(Tuple tuple) {
_collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
_collector.ack(tuple);
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
}
ローカルモードでのExclamationTopologyの実行
どのようにローカルモードでExclamationTopologを実行・動作するかをみていきましょう。
Stormは2つの実行モードがあります。ローカルモードとdistributedモード。ローカルモードでは、Stormはワーカーノードをスレッドを用いてシミュレーションして実行します。ローカルモードはトポロジーのテストや開発に利用するのに適しています。storm-starterでトポロジーを実行する場合、ローカルモードで起動し、コンポーネントが発行されるメッセージを確認することができます。より詳細なトポロジーのローカルモードでの実行については、Local mode(https://storm.apache.org/documentation/Local-mode.html)に示します。
distributedモードでは、Stormはマシンのクラスタ上で動作します。マスターにトポロジーをsubmitすると、トポロジーの実行に必要な全てのコードがsubmitされます。マスターはコードの配布とトポロジーを実行するためのワーカーの準備を行います。ワーカーが停止する場合、マスターは他のものにアサインし直します。より詳細なクラスター上での実行については、Runnning topologies on a production cluster(https://storm.apache.org/documentation/Running-topologies-on-a-production-cluster.html)に示します。
このコードは、ExclamationTopologyをローカルモードで実行します。
Config conf = new Config();
conf.setDebug(true);
conf.setNumWorkers(2);
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("test", conf, builder.createTopology());
Utils.sleep(10000);
cluster.killTopology("test");
cluster.shutdown();
はじめに、コードはLocalClusterオブジェクトによって作成されるクラスタを定義しています。仮想クラスタへのトポロジーのsubmitは、distributedクラスタへのsubmitと同様です。submitTopologyメソッドによって、LocalClusterにトポロジーがsubmitしています。引数は、トポロジーの名前、トポロジーの設定、トポロジー本体です。 引数nameはトポロジーを停止するときなどに、トポロジーを識別することに使われます。トポロジーはkillされるまで無期限で動作します。 引数configurationは、トポロジーを動作させるための各種設定に利用されます。2つの設定は共通的に扱われます。
- TOPOLOGY_WORKERS(set with setNumWorkers) トポロジーを実行するために、必要なプロセス数を定義します。トポロジーのコンポーネントは多くのスレッドで実行されます。コンポーネントに割り当てられるスレッドの数は、setBoltとsetSpoutメソッドによって設定されます。これらのスレッドは、ワーカープロセス内に存在します。ワーカープロセスは、いくつかのコンポーネントのためのいくつかのスレッドを含んでいます。例えば、300スレッドを全てのコンポーネントと50ワーカープロセスを設定で指定します。それぞれのワーカープロセスは、6スレッドで実行されます。Stormトポロジーの性能を、それぞれのコンポーネントとスレッドが実行されるワーカープロセス数の並列処理の微調整で、チューニングすることができます。
- TOPOLOGY_DEBUG(set with setDebug) trueに設定した場合、Stormはコンポーネントにメッセージを発行するごとにログ出力します。ローカルモードでトポロジーのテストを行うときに有益です。ただし、トポロジーをクラスター上で実行する場合は、停止したほうがよいでしょう。
他にも多くのトポロジーの設定があります。各種設定の詳細については、Javadoc for Config(https://storm.apache.org/javadoc/apidocs/backtype/storm/Config.html)に示します。 ローカルモードでトポロジーを実行するためにどのように開発環境(Eclipseなど)をセットアップするかについては、Creating a new Storm Project(https://storm.apache.org/documentation/Creating-a-new-Storm-project.html)に示します。
Stream groupings
streamのグルーピングは、2つのコンポーネント間でタプルをどのように送信するかをトポロジーに示します。spoutsとboltsは、多くのタスクをクラスタ間で並列に実行します。トポロジーのタスクレベルの実行は、以下のように行われています。
※storm.apache.orgより引用
BoltAのタスクがBoltBにタプルを発行するとき、どのタスクにタプルを送信すればよいでしょうか? stream groupingは、タスクの組み合わせの間をどのようにタプルを送信するかをStormに指示するものです。異なる種類のstream groupingsについて深堀するまえに、storm-starterのもう一つのトポロジーを確認してみましょう。
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("sentences", new RandomSentenceSpout(), 5);
builder.setBolt("split", new SplitSentence(), 8)
.shuffleGrouping("sentences");
builder.setBolt("count", new WordCount(), 12)
.fieldsGrouping("split", new Fields("word"));
SplitSentenceは、受け取った文章中のそれぞれの単語をタプルとして発行し、WordCountは単語とカウントをmapに保持します。WordCountがwordを受け取るごとに、状態と新しい単語数を発行します。
stream groupingsにはいくつかの種類があります。
単純なグルーピングは、「shuffule grouping」と呼ばれ、タプルをランダムにタスクに送出します。shuffule groupingはWordCountTopologyのRandomSentenceSpoutからSplitSentence boltへのタプルの送出に利用されています。SplitSentence boltのタスクへ均等にタプルを配布する効果があります。
さらに興味深いgroupingの種類は「fields grouping」です。field groupingは、SplitSentence boltとWordCount boltの間で利用されています。同じwordが同じタスクに送信される必要があるWordCount boltの機能にとって重大です。一つ以上のタスクが同じ単語を見てしまうと、それぞれ不完全な値を持つため、誤ったカウントを送信してしまいます。fields groupingは、フィールドによってストリームをグルーピングします。これによって、同じタスクに同じフィールド値を持つものが送信されます。WordCountがSplitSentenceのwordフィールドによるfield groupingの出力ストリームを受け取ることにより、同じ単語は常に同じタスクに送信され、boltは正確な値を作成することができます。
Fields groupingはストリームの結合やストリームの集計やその他のユースケースの基本となります。fields groupingsはmodハッシングを利用しています。 他にもいくつかのstream groupingsがあります。詳細については、Consepts(https://storm.apache.org/documentation/Concepts.html)に示します。
Boltsを他言語で定義する
省略
メッセージ処理の保証
このチュートリアルでは、タプルの発行方法についてのいくつかについてはスキップしました。これらのaspectsはStormの信頼性APIの一部です。Stormがどのようにspoutから送信される全てのメッセージが処理されるかを保証方法です。どのように働き、Stormの信頼性機能を利用するかの情報については、Guaranteeing message processing(https://storm.apache.org/documentation/Guaranteeing-message-processing.html)を確認してください。
トランザクショナルトポロジー
Stormは、全てのメッセージが最低1回トポロジーで処理されることを保証します。共通の質問としてたずねられるのが「どのようにStormのtopでカウントしているのか?overcountすることはないか?」です。Stormは、計算のために正確に1回メッセージ処理を実現するトランザクショナルトポロジーと呼ばれる機能を持っています。詳しい内容についてはこちら(https://storm.apache.org/documentation/Transactional-topologies.html)に示します。
配送されたRPC
このチュートリアルでは、Stormによるstream処理の基本を説明しました。他にも多くのことがStreamの基本によって出来ます。Stormのアプリケーションの中で興味深い一つはDistributed RPCです、on the fly で並列計算を行うときに。Distributed RPCについてはこちら(https://storm.apache.org/documentation/Distributed-RPC.html)に示します。
Conclusion
このチュートリアルでは、Stormトポロジーの開発、テスト、配備の概要について説明しました。他のドキュメントはStormを利用する際の側面についてより詳細に示しています。