Rinda2
Rindaは分散処理システムLindaのtuple spaceのRubyによる実装です Lindaではタプルとタプル空間の二つの概念があります。 タスクはタプルをタプル空間へ書き込んだり、 タプル空間からタプルを取り出したりすることで通信を行ないます。 書き込み、取り出しはアトミックな操作です。 パターンマッチングによって取り出すタプルを指定するところが特徴です。
タプルとパターンマッチング
いくつかの値が組になったデータをタプルと呼びます。 Rinda2ではArrayまたは限定されたHashでタプルを表現します。
Arrayで示すタプルの例を示します。
['abc', 2, 5] [:matrix, 1,6, 3.14] ['family', 'is-sister', 'Carolyn', 'Elinor']
Rinda2はタプルの各要素を == または === で比較してパターンマッチングを 行ないます。またnilはワイルドカードとなってどの値ともマッチします。 タプルのすべての要素がマッチした場合、 パターンとタプルがマッチしたことになります。 各要素の比較を疑似コードで書くと次のようになります。
return true if pattern.nil? return true if pattern == actual retrun true if pattern === actual return false
全ての要素について上記のルールで比較を行ない、全てがtrueであれば パターンマッチングが成功します。
次のスクリプト片はRinda::Templateクラスを直接使って パターンマッチングの様子を示しています。 Rinda::TemplateクラスはRindaの内部で生成されるもので アプリケーションが直接扱うものではありませんが、 説明のために使用しています。
require 'rinda/rinda' template = Rinda::Template.new(['abc', nil, nil]) template.match(['abc', 2, 5]) # true template.match(['abcd', 2, 5]) # false
パターンとの比較に == と === を用いるので、 クラスによる検査や正規表現による検査を記述することができます。
template = Rinda::Template.new([String, Integer, nil]) template.match(['abc', 2, 5]) # true template.match(['abcd', 2, 5]) # true template = Rinda::Template.new([/^abc/, Integer, nil]) template.match([/^abc/, Integer, nil]) # true template.match(['abc', 2, 5]) # true template.match(['def', 2, 5]) # false
HashによるタプルはRinda2で追加されました。 キーをStringに限定したHashもタプルとして扱えます。 パターンマッチングは基本的にArrayと同様です。
template = Rinda::Template.new({'name'=>String, 'age'=>Integer}) template.match({'name'=>'seki', 'age'=>0x20}) #true template.match({'name'=>:seki, 'age'=>0x20}) #false
パターンの全ての要素がマッチしても、要素数の異なるタプルはマッチしません。
template.match({'name'=>'seki', 'age'=>0x20, 'url'=>'http://www.druby.org'}) # false
タプル空間
タプルを置いておく場所をタプル空間と呼びます。 掲示板をイメージすると良いかもしれません。
Rinda2ではRinda::TupleSpaceクラスがタプル空間を実現しています。
タプルの登録・取り出し
Rinda::TupleSpaceにタプルを登録するにはwriteメソッドを、 タプルを取り出すにはtakeメソッドを用います。
ts.write(tuple, sec=nil) # ^aTupleEntry
-
前述のようにtupleはArrayかHashで記述します。 Rinda2からtupleの有効期限を指定できるなりました。
writeはTupleEntryを返します。TupleEntryはこの操作で追加した タプル管理オブジェクトで、登録したタプルの有効期限を操作するのに 使います。くわしい使い方は後述します。
ts.take(pattern, sec=nil, &block) # ^anArray
-
タプル空間からパターンにマッチしたタプルを取り出すにはtakeメソッドを使います。 タプル空間にパターンにマッチするタプルがないとき、takeはブロックします。
第二引数secで有効な秒数を指定すると、 その時間が過ぎた時にtakeをあきらめRequestExpiredErrorが発生します。
検索
read, read_allを使ってタプル空間内のタプルを検索できます。
ts.read(pattern, sec=nil, &block) # ^anArray
-
takeと同様にタプル空間を検索するのがreadです。 takeとの違いはreadされたタプルはタプル空間から取り除かれない点です。
マッチするタプルがないとき、readはブロックします。 secで指定した秒数を過ぎるとRequestExpiredErrorが発生します。
ts.read_all(pattern) # ^Array of タプル
-
タプル空間の中でpatternマッチするすべてのタプルを検索します。 マッチしたタプルの配列が返ります。 マッチするタプルが存在しないとき、空の配列([])が返ります。
イベント通知
ts.notify(event, pattern, sec=nil) # ^NotifyTemplateEntry
-
タプル空間に発生したイベントの通知を懇請します。 patternにマッチしたタプルに関するイベント('write' | 'take')を 受け取ることができます。 興味のあるイベントはeventで指定します。nilを指定するとすべての イベントが対象となります。
イベント通知の詳細は後述します。
n-Queen
Rindaを使って、大きな問題を複数のマシンに分散して処理を行なう例を示します。 ここでは素朴なアルゴリズムでn-Queenを解く処理を、分散してみます。
まずn-Queenを解くモジュールを示します。 分散処理で効率があがることを確認するのが目的であるので、 素朴に解いています。
# nqueen.rb module NQueen def expand(size, &block) size.times do |row| nqueen(size, [], row, &block) end end def nqueen(size, board, row, &block) board.each_with_index do |v, c| check = v - row return if check == 0 || check.abs == board.length - c end board = board + [row] if board.length == size yield(board) return end size.times do |r| nqueen(size, board, r, &block) end end module_function :expand, :nqueen end if __FILE__ == $0 size = (ARGV.shift || '5').to_i nq = NQueen.expand(size) do |x| puts x.join(" ") end end
モジュールNQueenがに二つのメソッドが定義されています。 expandはn=sizeとしたn-Queenを解きます。 求めた解を一つずつblockにyieldします。
nqueen.rbの実行例を示します。
% ruby nqueen.rb 0 2 4 1 3 0 3 1 4 2 1 3 0 2 4 1 4 2 0 3 2 0 3 1 4 2 4 1 3 0 3 0 2 4 1 3 1 4 2 0 4 1 3 0 2 4 2 0 3 1
分散版n-Queen
登場するサービスは次の三種類です。
- nq_ts.rb − TupleSpaceサービス
- nq_engine.rb − n-Queenを求めるサービス
- nq_root.rb − n-Queenのクライアントで、1行目の初期値ごとに処理を分割して処理を依頼する。 また、求めた解を集める係でもある。
nq_engine.rbが二つのときの構成図を示します。
それぞれのサービスは [nq_ts] のTupleSpaceを介して通信します。 処理の要求と完了の通知を TupleSpace を使って交換します。 処理要求のタプルと完了通知のタプルは次の通り。
-------------------------------------- # request [:nqueen, key, size, board, row, stream] # response [:nqueen_done, key] --------------------------------------
処理要求は6つの要素からなるタプルです。 先頭はメッセージの種類をしめすシンボル :nqueen です。 2つ目のkeyはそれぞれの処理を区別するための識別子で、 完了通知のタプルのkeyに用いられます。 3〜5番目の要素 size, board, row は NQueen.nqueen へ渡すパラメータです。 最後のstreamは結果を受け取るオブジェクトです。 解を一つ求めるごとに stream.push(x) で解を送ります。 実際には stream は処理を要求したプロセスにある分散オブジェクトで、 nq_engineは解一つ一つを分散オブジェクトのメソッド呼び出しを使って 転送することになります。
nq_engineは次のパターンでタプルを取り出します。
-------------------------------------- [:nqueen, nil, Integer, Array, Integer, nil] --------------------------------------
完了通知は2つの要素からなるタプルです。 先頭はメッセージの種類をしめすシンボル :nqueen_done、 2つ目は処理の識別子で処理要求にあったkeyをそのまま転記します。
nq_root.rb
NQueenのメイン部分、nq_root.rbを見てみましょう。
# nq_root.rb require 'drb/drb' require 'rinda/rinda' require 'monitor' class RemoteNQueen class Stream include MonitorMixin def initialize(block) super() @block = block end def push(x) synchronize do @block.call(x) end end end def initialize(tuplespace) super() @ts = tuplespace end def expand(size, &block) key = "#{DRb.uri}:#{id}" stream = Stream.new(block) size.times do |row| @ts.write([:nqueen, key, size, [], row, DRbObject.new(stream)]) end size.times do |row| @ts.take([:nqueen_done, key]) end end end if __FILE__ == $0 DRb.start_service ts_uri = ARGV.shift || raise ts = DRbObject.new(nil, ts_uri) rq = RemoteNQueen.new(ts) rq.expand((ARGV.shift || '5').to_i) do |x| puts x.join(" ") end end
nq_rootの主処理部分は RemoteNQueen#expand です。 TupleSpaceにwriteするタプルとtakeするパターンに注目して下さい。
このサンプルでは一行ごとに処理を分割して要求し、 どの要求が完了したかについては、気にしないこととします。 ですから、 keyにはほかのプロセスからの要求と混同しない程度に一意な識別子として、 DRb.uriとselfのidから作った文字列を使います。 それぞれの要求には同じkeyが与えられるが、 他のプロセスとは異なるkeyとなります。
Streamは解を集めるためのシンプルなクラスで、 排他制御しつつpushされたオブジェクトを指定されたブロックへ渡す係です。 DRbObject.new(stream)としているのは、streamの分散オブジェクトをタプルへ 渡したいためです。
StreamをDRbUndumped化しても同じ効果が得られますが、タプルそのものが 一旦分散オブジェクトとして渡されるため若干効率が落ちてしまいます。
実行のパラメータにはTupleSpaceのURIを渡し、 nq_engine.rbのURIを交換していません。 nq_root.rbが直接知らなければならないのはTupleSpaceだけですむからです。
nq_engine.rb
NQueenのエンジン部分、nq_engine.rbを説明します。
# nq_engine.rb require 'drb/drb' require 'nqueen' class NQEngine def initialize(ts) @ts = ts end def main_loop while true sym, key, size, board, row, stream = take_request begin p [key, row] if $DEBUG do_it(key, size, board, row, stream) rescue ensure write_done(key) end end end def do_it(key, size, board, row, stream) NQueen.nqueen(size, board, row) do |x| stream.push(x) end end def take_request @ts.take([:nqueen, nil, Integer, Array, Integer, nil]) end def write_done(key, timeout=30) @ts.write([:nqueen_done, key], timeout) end end if __FILE__ == $0 DRb.start_service ts_uri = ARGV.shift || raise ts = DRbObject.new(nil, ts_uri) engine = NQEngine.new(ts) engine.main_loop end
NQEngine#main_loopが主処理です。次の処理を繰り返し行ないます。
- 処理要求のタプルをTupleSpaceからtakeし、(take_request)
- 解を求め、 (do_it)
- 完了通知をwriteする (write_done)
NQEngine#take_requestのパターンについては既に説明しました。 @ts.takeによって要求がくるまでブロックして待っています。
NQEngine#write_doneでwriteにtimeout=30を指定しています。 これは完了通知のタプルが30秒間有効であることをしめします。 デバッグ中など、nq_root.rbが全体の処理の終了を待たずに停止した場合、 完了通知のタプルが回収されることなくTupleSpaceに残り続けることを 避けるために有効期限を指定しています。 30秒間以上takeされることがなかった完了通知のタプルは TupleSpaceによって削除されます。
nq_ts.rb
最後にTupleSpaceサービスであるnq_ts.rbを示します。
# nq_ts.rb require 'rinda/tuplespace' require 'drb/drb' uri = ARGV.shift ts = Rinda::TupleSpace.new DRb.start_service(uri, ts) puts DRb.uri $stdin.gets
Rinda::TupleSpaceを生成してDRb.start_serviceをするだけの簡単なスクリプトです。
nq_ts.rb, nq_root.rb, nq_engine.b の三つのスクリプトのうち、 nq_ts.rbは最初に起動しておきます。 nq_root.rb, nq_engine.rbはどちらを先に起動してもかまいません。 それぞれ複数起動して試してみるのもおもしろいでしょう。
nq_engine.rbを複数のマシンで起動すると処理の負荷を分散させることができます。 通信のオーバーヘッドがあるためn台でn倍の性能とはいきませんが、 (大きなサイズのn-Queenでは特に)一台で処理を行なうよりも速く終了すると思います。
実行例を見てみましょう。
-- Terminal 1 ------------------------------------ % ruby nq_ts.rb druby://host:12345 -- Terminal 2 ------------------------------------ % ruby nq_engine.rb druby://host:12345 -- Terminal 3 ------------------------------------ % ruby nq_root.rb druby://host:12345 0 2 4 1 3 0 3 1 4 2 1 3 0 2 4 1 4 2 0 3 2 0 3 1 4 2 4 1 3 0 3 0 2 4 1 3 1 4 2 0 4 1 3 0 2 4 2 0 3 1
TupleSpaceProxy
分散版のn-Queenを次のように異常終了させると、 処理が終了しなくなることがあります。
-- Terminal 1 ------------------------------------ % ruby nq_ts.rb druby://host:12345 -- Terminal 2 ------------------------------------ % ruby nq_engine.rb druby://host:12345 ## このプロセスを [Cntl-C] などで一度停止させる /usr/local/lib/ruby/site_ruby/1.6/drb/drb.rb:103:in `read': Interrupt .... ## 再起動 % ruby nq_engine.rb druby://host:12345 -- Terminal 3 ------------------------------------ % ruby nq_root.rb druby://host:12345 1 3 0 2 4 1 4 2 0 3 2 0 3 1 4 2 4 1 3 0 3 0 2 4 1 3 1 4 2 0 4 1 3 0 2 4 2 0 3 1 ## 終了しない‥
nq_engineがtake中に停止したのに、nq_tsでのtake処理が中断しないために 発生します。これはdRubyの制限です。 dRubyではリモートのメソッド呼び出しで、 呼び出し側のプロセスやスレッドの終了がリモートに伝わらないため、 リモートでの処理が続行してしまうのです。
このために、TupleSpaceにはtakeによく似たmoveメソッドが用意されています。
ts.move(port, pattern, sec=nil, &block) # ^anArray
-
takeと同様だが、返却する値を port.push(value) してからタプルを削除します。 port.push(value)が失敗するとこのmoveは無効になります。 これによって、タプルの移動の失敗を捕捉することができます。
moveは単体で使われることはあまりありません。 rinda/rinda.rbには、このmoveメソッドを使って安全なtakeを行なう TupleSpaceProxyクラスが定義されています。
TupleSpaceProxyに準備されているメソッドはほぼTupleSpaceと同じです。 使用方法もTupleSpaceと同様です。
Rinda::TupleSpaceProxy.new(tuplespace)
-
リモートのTupleSpaceを与えてTupleSpaceProxyを生成します
TupleSpaceProxyを使用するように変更したnq_root2.rbとnq_engine2.rbを示します。 どちらも変更はTupleSpaceProxyを生成するようになった部分だけです。
# nq_root2.rb require 'drb/drb' require 'rinda/rinda' require 'monitor' class RemoteNQueen class Stream include MonitorMixin def initialize(block) super() @block = block end def push(x) synchronize do @block.call(x) end end end def initialize(tuplespace) super() @ts = tuplespace end def expand(size, &block) key = "#{DRb.uri}:#{id}" stream = Stream.new(block) size.times do |row| @ts.write([:nqueen, key, size, [], row, DRbObject.new(stream)]) end size.times do |row| @ts.take([:nqueen_done, key]) end end end if __FILE__ == $0 DRb.start_service ts_uri = ARGV.shift || raise ts = DRbObject.new(nil, ts_uri) rq = RemoteNQueen.new(Rinda::TupleSpaceProxy.new(ts)) ## rq.expand((ARGV.shift || '5').to_i) do |x| puts x.join(" ") end end
nq_engin2.rbです。require 'rinda/rinda'も追加されています。
# nq_engine2.rb require 'drb/drb' require 'rinda/rinda' ## require 'nqueen' class NQEngine def initialize(ts) @ts = ts end def main_loop while true sym, key, size, board, row, stream = take_request begin p [key, row] if $DEBUG do_it(key, size, board, row, stream) rescue ensure write_done(key) end end end def do_it(key, size, board, row, stream) NQueen.nqueen(size, board, row) do |x| stream.push(x) end end def take_request @ts.take([:nqueen, nil, Integer, Array, Integer, nil]) end def write_done(key, timeout=30) @ts.write([:nqueen_done, key], timeout) end end if __FILE__ == $0 DRb.start_service ts_uri = ARGV.shift || raise ts = DRbObject.new(nil, ts_uri) engine = NQEngine.new(Rinda::TupleSpaceProxy.new(ts)) ## engine.main_loop end
先ほどと同じ手順で実験してみましょう。
-- Terminal 1 ------------------------------------ % ruby nq_ts.rb druby://host:12345 -- Terminal 2 ------------------------------------ % ruby nq_engine.rb druby://host:12345 ## このプロセスを [Cntl-C] などで一度停止させる /usr/local/lib/ruby/site_ruby/1.6/drb/drb.rb:103:in `read': Interrupt .... ## 再起動 % ruby nq_engine.rb druby://host:12345 -- Terminal 3 ------------------------------------ % ruby nq_root.rb druby://host:12345 0 2 4 1 3 0 3 1 4 2 1 3 0 2 4 1 4 2 0 3 2 0 3 1 4 2 4 1 3 0 3 0 2 4 1 3 1 4 2 0 4 1 3 0 2 4 2 0 3 1
TupleSpaceProxyを使っているので、 takeが中途半端に動作してタプルを紛失することはなくなったので、 今度は正常に終了します。 *1
TupleEntry
writeしたタプルや、take要求を取り消したり有効期限を延長することができます。 これらの操作のためにTupleEntryが用意されています。
writeは、writeしたタプルを操作するTupleEntryを返します。
FIXME
イベントの通知
タプル空間へのwrite, takeが行なわれるとイベントが発生します。
イベントはwrite, takeされたタプルの前に操作名("write"または"take")を つけたタプルです。
ts.write(["Hello", "World"]) # -> ["write", ["Hello", "World"]] ts.take(["Hello", nil]) # -> ["take", ["Hello", "World"]]
イベントは次のように取り出します。
- タプル空間にnotifyメソッドでイベントの通知を懇請し、 イベントの取り出し口(NotifyTemplateEntry)をもらいます。
- eachメソッドでイベントを一つずつ取り出します。 notifierの期限が過ぎたり、cancelされるとeachメソッドは終了します。
---------------------------------- # イベント通知の懇請 notifier = ts.notify(nil, [String, nil]) # イベント取り出し notifier.each do |event, tuple| ... end ----------------------------------
ts.notify(event, pattern, sec=nil) # ^NotifyTemplateEntry
-
タプル空間に発生したイベントの通知を懇請します。 patternにマッチしたタプルに関するイベント('write' | 'take')を 受け取ることができます。 興味のあるイベントはeventで指定します。nilを指定するとすべての イベントが対象となります。
NotifyTemplateEntry#each
-
パターンにマッチしたイベントを一つずつyieldします。 期限が過ぎたり、cancelされるとeachは終了します。 イベントが空の時、現在のスレッドはブロックし、 イベントが届くまで停止します。
NotifyTemplateEntry#cancel
-
notifyをキャンセルします。 キューに蓄えられているイベントが尽きた時、eachは終了します。 キャンセルされたnotifyに新たなイベントが通知されることはありません。
*1ただし、処理中に停止させるとそのタプルは紛失してしまいます。