Rinda2

Rindaは分散処理システムLindaのtuple spaceのRubyによる実装です Lindaではタプルとタプル空間の二つの概念があります。 タスクはタプルをタプル空間へ書き込んだり、 タプル空間からタプルを取り出したりすることで通信を行ないます。 書き込み、取り出しはアトミックな操作です。 パターンマッチングによって取り出すタプルを指定するところが特徴です。

タプルとパターンマッチング

いくつかの値が組になったデータをタプルと呼びます。 Rinda2ではArrayまたは限定されたHashでタプルを表現します。

Arrayで示すタプルの例を示します。

['abc', 2, 5]
[:matrix, 1,6, 3.14]
['family', 'is-sister', 'Carolyn', 'Elinor']

Rinda2はタプルの各要素を == または === で比較してパターンマッチングを 行ないます。またnilはワイルドカードとなってどの値ともマッチします。 タプルのすべての要素がマッチした場合、 パターンとタプルがマッチしたことになります。 各要素の比較を疑似コードで書くと次のようになります。

return true if pattern.nil?
return true if pattern == actual
retrun true if pattern === actual
return false

全ての要素について上記のルールで比較を行ない、全てがtrueであれば パターンマッチングが成功します。

次のスクリプト片はRinda::Templateクラスを直接使って パターンマッチングの様子を示しています。 Rinda::TemplateクラスはRindaの内部で生成されるもので アプリケーションが直接扱うものではありませんが、 説明のために使用しています。

require 'rinda/rinda'

template = Rinda::Template.new(['abc', nil, nil])
template.match(['abc', 2, 5])   # true
template.match(['abcd', 2, 5])  # false

パターンとの比較に == と === を用いるので、 クラスによる検査や正規表現による検査を記述することができます。

template = Rinda::Template.new([String, Integer, nil])
template.match(['abc', 2, 5])   # true
template.match(['abcd', 2, 5])  # true

template = Rinda::Template.new([/^abc/, Integer, nil])
template.match([/^abc/, Integer, nil])  # true
template.match(['abc', 2, 5])  # true
template.match(['def', 2, 5])  # false

HashによるタプルはRinda2で追加されました。 キーをStringに限定したHashもタプルとして扱えます。 パターンマッチングは基本的にArrayと同様です。

template = Rinda::Template.new({'name'=>String, 'age'=>Integer})
template.match({'name'=>'seki', 'age'=>0x20})  #true
template.match({'name'=>:seki, 'age'=>0x20})  #false

パターンの全ての要素がマッチしても、要素数の異なるタプルはマッチしません。

template.match({'name'=>'seki', 'age'=>0x20, 'url'=>'http://www.druby.org'}) # false

タプル空間

タプルを置いておく場所をタプル空間と呼びます。 掲示板をイメージすると良いかもしれません。

Rinda2ではRinda::TupleSpaceクラスがタプル空間を実現しています。

タプルの登録・取り出し

Rinda::TupleSpaceにタプルを登録するにはwriteメソッドを、 タプルを取り出すにはtakeメソッドを用います。

ts.write(tuple, sec=nil) # ^aTupleEntry

前述のようにtupleはArrayかHashで記述します。 Rinda2からtupleの有効期限を指定できるなりました。

writeはTupleEntryを返します。TupleEntryはこの操作で追加した タプル管理オブジェクトで、登録したタプルの有効期限を操作するのに 使います。くわしい使い方は後述します。

ts.take(pattern, sec=nil, &block) # ^anArray

タプル空間からパターンにマッチしたタプルを取り出すにはtakeメソッドを使います。 タプル空間にパターンにマッチするタプルがないとき、takeはブロックします。

第二引数secで有効な秒数を指定すると、 その時間が過ぎた時にtakeをあきらめRequestExpiredErrorが発生します。

検索

read, read_allを使ってタプル空間内のタプルを検索できます。

ts.read(pattern, sec=nil, &block) # ^anArray

takeと同様にタプル空間を検索するのがreadです。 takeとの違いはreadされたタプルはタプル空間から取り除かれない点です。

マッチするタプルがないとき、readはブロックします。 secで指定した秒数を過ぎるとRequestExpiredErrorが発生します。

ts.read_all(pattern) # ^Array of タプル

タプル空間の中でpatternマッチするすべてのタプルを検索します。 マッチしたタプルの配列が返ります。 マッチするタプルが存在しないとき、空の配列([])が返ります。

イベント通知

ts.notify(event, pattern, sec=nil) # ^NotifyTemplateEntry

タプル空間に発生したイベントの通知を懇請します。 patternにマッチしたタプルに関するイベント('write' | 'take')を 受け取ることができます。 興味のあるイベントはeventで指定します。nilを指定するとすべての イベントが対象となります。

イベント通知の詳細は後述します。

n-Queen

Rindaを使って、大きな問題を複数のマシンに分散して処理を行なう例を示します。 ここでは素朴なアルゴリズムでn-Queenを解く処理を、分散してみます。

まずn-Queenを解くモジュールを示します。 分散処理で効率があがることを確認するのが目的であるので、 素朴に解いています。

# nqueen.rb
module NQueen
  def expand(size, &block)
    size.times do |row|
      nqueen(size, [], row, &block)
    end
  end

  def nqueen(size, board, row, &block)
    board.each_with_index do |v, c|
      check = v - row
      return if check == 0 || check.abs == board.length - c
    end

    board = board + [row]

    if board.length == size
      yield(board)
      return
    end

    size.times do |r|
      nqueen(size, board, r, &block)
    end
  end
  module_function :expand, :nqueen
end

if __FILE__ == $0
  size = (ARGV.shift || '5').to_i
  nq = NQueen.expand(size) do |x|
    puts x.join(" ")
  end
end
NQueen.expand(size, &block)

モジュールNQueenがに二つのメソッドが定義されています。 expandはn=sizeとしたn-Queenを解きます。 求めた解を一つずつblockにyieldします。

nqueen.rbの実行例を示します。

% ruby nqueen.rb
0 2 4 1 3
0 3 1 4 2
1 3 0 2 4
1 4 2 0 3
2 0 3 1 4
2 4 1 3 0
3 0 2 4 1
3 1 4 2 0
4 1 3 0 2
4 2 0 3 1

分散版n-Queen

登場するサービスは次の三種類です。

  • nq_ts.rb − TupleSpaceサービス
  • nq_engine.rb − n-Queenを求めるサービス
  • nq_root.rb − n-Queenのクライアントで、1行目の初期値ごとに処理を分割して処理を依頼する。 また、求めた解を集める係でもある。

nq_engine.rbが二つのときの構成図を示します。

http://www2a.biglobe.ne.jp/%7eseki/ruby/nq.jpg それぞれのサービスは [nq_ts] のTupleSpaceを介して通信します。 処理の要求と完了の通知を TupleSpace を使って交換します。 処理要求のタプルと完了通知のタプルは次の通り。

--------------------------------------
# request
[:nqueen, key, size, board, row, stream]  

# response
[:nqueen_done, key]
--------------------------------------

処理要求は6つの要素からなるタプルです。 先頭はメッセージの種類をしめすシンボル :nqueen です。 2つ目のkeyはそれぞれの処理を区別するための識別子で、 完了通知のタプルのkeyに用いられます。 3〜5番目の要素 size, board, row は NQueen.nqueen へ渡すパラメータです。 最後のstreamは結果を受け取るオブジェクトです。 解を一つ求めるごとに stream.push(x) で解を送ります。 実際には stream は処理を要求したプロセスにある分散オブジェクトで、 nq_engineは解一つ一つを分散オブジェクトのメソッド呼び出しを使って 転送することになります。

nq_engineは次のパターンでタプルを取り出します。

--------------------------------------
[:nqueen, nil, Integer, Array, Integer, nil]
--------------------------------------

完了通知は2つの要素からなるタプルです。 先頭はメッセージの種類をしめすシンボル :nqueen_done、 2つ目は処理の識別子で処理要求にあったkeyをそのまま転記します。

nq_root.rb

NQueenのメイン部分、nq_root.rbを見てみましょう。

# nq_root.rb
require 'drb/drb'
require 'rinda/rinda'
require 'monitor'

class RemoteNQueen
  class Stream
    include MonitorMixin
    def initialize(block)
      super()
      @block = block
    end

    def push(x)
      synchronize do
        @block.call(x)
      end
    end
  end

  def initialize(tuplespace)
    super()
    @ts = tuplespace
  end

  def expand(size, &block)
    key = "#{DRb.uri}:#{id}"
    stream = Stream.new(block)

    size.times do |row|
      @ts.write([:nqueen, key, size, [], row, DRbObject.new(stream)])
    end
    size.times do |row|
      @ts.take([:nqueen_done, key])
    end
  end
end

if __FILE__ == $0
  DRb.start_service

  ts_uri = ARGV.shift || raise
  ts = DRbObject.new(nil, ts_uri)

  rq = RemoteNQueen.new(ts)
  rq.expand((ARGV.shift || '5').to_i) do |x|
    puts x.join(" ")
  end
end

nq_rootの主処理部分は RemoteNQueen#expand です。 TupleSpaceにwriteするタプルとtakeするパターンに注目して下さい。

このサンプルでは一行ごとに処理を分割して要求し、 どの要求が完了したかについては、気にしないこととします。 ですから、 keyにはほかのプロセスからの要求と混同しない程度に一意な識別子として、 DRb.uriとselfのidから作った文字列を使います。 それぞれの要求には同じkeyが与えられるが、 他のプロセスとは異なるkeyとなります。

Streamは解を集めるためのシンプルなクラスで、 排他制御しつつpushされたオブジェクトを指定されたブロックへ渡す係です。 DRbObject.new(stream)としているのは、streamの分散オブジェクトをタプルへ 渡したいためです。

StreamをDRbUndumped化しても同じ効果が得られますが、タプルそのものが 一旦分散オブジェクトとして渡されるため若干効率が落ちてしまいます。

実行のパラメータにはTupleSpaceのURIを渡し、 nq_engine.rbのURIを交換していません。 nq_root.rbが直接知らなければならないのはTupleSpaceだけですむからです。

nq_engine.rb

NQueenのエンジン部分、nq_engine.rbを説明します。

# nq_engine.rb
require 'drb/drb'
require 'nqueen'

class NQEngine
  def initialize(ts)
    @ts = ts
  end

  def main_loop
    while true
      sym, key, size, board, row, stream = take_request
      begin
        p [key, row] if $DEBUG
        do_it(key, size, board, row, stream)
      rescue
      ensure
        write_done(key)
      end
    end
  end

  def do_it(key, size, board, row, stream)
    NQueen.nqueen(size, board, row) do |x|
      stream.push(x)
    end
  end

  def take_request
    @ts.take([:nqueen, nil, Integer, Array, Integer, nil])
  end

  def write_done(key, timeout=30)
    @ts.write([:nqueen_done, key], timeout)
  end
end

if __FILE__ == $0
  DRb.start_service

  ts_uri = ARGV.shift || raise
  ts = DRbObject.new(nil, ts_uri)

  engine = NQEngine.new(ts)
  engine.main_loop
end

NQEngine#main_loopが主処理です。次の処理を繰り返し行ないます。

  • 処理要求のタプルをTupleSpaceからtakeし、(take_request)
  • 解を求め、 (do_it)
  • 完了通知をwriteする (write_done)

NQEngine#take_requestのパターンについては既に説明しました。 @ts.takeによって要求がくるまでブロックして待っています。

NQEngine#write_doneでwriteにtimeout=30を指定しています。 これは完了通知のタプルが30秒間有効であることをしめします。 デバッグ中など、nq_root.rbが全体の処理の終了を待たずに停止した場合、 完了通知のタプルが回収されることなくTupleSpaceに残り続けることを 避けるために有効期限を指定しています。 30秒間以上takeされることがなかった完了通知のタプルは TupleSpaceによって削除されます。

nq_ts.rb

最後にTupleSpaceサービスであるnq_ts.rbを示します。

# nq_ts.rb
require 'rinda/tuplespace'
require 'drb/drb'

uri = ARGV.shift

ts = Rinda::TupleSpace.new
DRb.start_service(uri, ts)
puts DRb.uri
$stdin.gets

Rinda::TupleSpaceを生成してDRb.start_serviceをするだけの簡単なスクリプトです。

nq_ts.rb, nq_root.rb, nq_engine.b の三つのスクリプトのうち、 nq_ts.rbは最初に起動しておきます。 nq_root.rb, nq_engine.rbはどちらを先に起動してもかまいません。 それぞれ複数起動して試してみるのもおもしろいでしょう。

nq_engine.rbを複数のマシンで起動すると処理の負荷を分散させることができます。 通信のオーバーヘッドがあるためn台でn倍の性能とはいきませんが、 (大きなサイズのn-Queenでは特に)一台で処理を行なうよりも速く終了すると思います。

実行例を見てみましょう。

-- Terminal 1 ------------------------------------
% ruby nq_ts.rb
druby://host:12345

-- Terminal 2 ------------------------------------
% ruby nq_engine.rb druby://host:12345

-- Terminal 3 ------------------------------------
% ruby nq_root.rb druby://host:12345
0 2 4 1 3
0 3 1 4 2
1 3 0 2 4
1 4 2 0 3
2 0 3 1 4
2 4 1 3 0
3 0 2 4 1
3 1 4 2 0
4 1 3 0 2
4 2 0 3 1

TupleSpaceProxy

分散版のn-Queenを次のように異常終了させると、 処理が終了しなくなることがあります。

-- Terminal 1 ------------------------------------
% ruby nq_ts.rb
druby://host:12345

-- Terminal 2 ------------------------------------
% ruby nq_engine.rb druby://host:12345
## このプロセスを [Cntl-C] などで一度停止させる
/usr/local/lib/ruby/site_ruby/1.6/drb/drb.rb:103:in `read': Interrupt
     ....

## 再起動
% ruby nq_engine.rb druby://host:12345

-- Terminal 3 ------------------------------------
% ruby nq_root.rb druby://host:12345
1 3 0 2 4
1 4 2 0 3
2 0 3 1 4
2 4 1 3 0
3 0 2 4 1
3 1 4 2 0
4 1 3 0 2
4 2 0 3 1
## 終了しない‥

nq_engineがtake中に停止したのに、nq_tsでのtake処理が中断しないために 発生します。これはdRubyの制限です。 dRubyではリモートのメソッド呼び出しで、 呼び出し側のプロセスやスレッドの終了がリモートに伝わらないため、 リモートでの処理が続行してしまうのです。

http://www2a.biglobe.ne.jp/%7eseki/ruby/rinda_lost.jpg

このために、TupleSpaceにはtakeによく似たmoveメソッドが用意されています。

ts.move(port, pattern, sec=nil, &block) # ^anArray

takeと同様だが、返却する値を port.push(value) してからタプルを削除します。 port.push(value)が失敗するとこのmoveは無効になります。 これによって、タプルの移動の失敗を捕捉することができます。

moveは単体で使われることはあまりありません。 rinda/rinda.rbには、このmoveメソッドを使って安全なtakeを行なう TupleSpaceProxyクラスが定義されています。

TupleSpaceProxyに準備されているメソッドはほぼTupleSpaceと同じです。 使用方法もTupleSpaceと同様です。

Rinda::TupleSpaceProxy.new(tuplespace)

リモートのTupleSpaceを与えてTupleSpaceProxyを生成します

TupleSpaceProxyを使用するように変更したnq_root2.rbとnq_engine2.rbを示します。 どちらも変更はTupleSpaceProxyを生成するようになった部分だけです。

# nq_root2.rb
require 'drb/drb'
require 'rinda/rinda'
require 'monitor'

class RemoteNQueen
  class Stream
    include MonitorMixin
    def initialize(block)
      super()
      @block = block
    end

    def push(x)
      synchronize do
        @block.call(x)
      end
    end
  end

  def initialize(tuplespace)
    super()
    @ts = tuplespace
  end

  def expand(size, &block)
    key = "#{DRb.uri}:#{id}"
    stream = Stream.new(block)

    size.times do |row|
      @ts.write([:nqueen, key, size, [], row, DRbObject.new(stream)])
    end
    size.times do |row|
      @ts.take([:nqueen_done, key])
    end
  end
end

if __FILE__ == $0
  DRb.start_service

  ts_uri = ARGV.shift || raise
  ts = DRbObject.new(nil, ts_uri)

  rq = RemoteNQueen.new(Rinda::TupleSpaceProxy.new(ts)) ##
  rq.expand((ARGV.shift || '5').to_i) do |x|
    puts x.join(" ")
  end
end

nq_engin2.rbです。require 'rinda/rinda'も追加されています。

# nq_engine2.rb
require 'drb/drb'
require 'rinda/rinda'  ##
require 'nqueen'

class NQEngine
  def initialize(ts)
    @ts = ts
  end

  def main_loop
    while true
      sym, key, size, board, row, stream = take_request
      begin
        p [key, row] if $DEBUG
        do_it(key, size, board, row, stream)
      rescue
      ensure
        write_done(key)
      end
    end
  end

  def do_it(key, size, board, row, stream)
    NQueen.nqueen(size, board, row) do |x|
      stream.push(x)
    end
  end

  def take_request
    @ts.take([:nqueen, nil, Integer, Array, Integer, nil])
  end

  def write_done(key, timeout=30)
    @ts.write([:nqueen_done, key], timeout)
  end
end

if __FILE__ == $0
  DRb.start_service

  ts_uri = ARGV.shift || raise
  ts = DRbObject.new(nil, ts_uri)

  engine = NQEngine.new(Rinda::TupleSpaceProxy.new(ts)) ##
  engine.main_loop
end

先ほどと同じ手順で実験してみましょう。

-- Terminal 1 ------------------------------------
% ruby nq_ts.rb
druby://host:12345

-- Terminal 2 ------------------------------------
% ruby nq_engine.rb druby://host:12345
## このプロセスを [Cntl-C] などで一度停止させる
/usr/local/lib/ruby/site_ruby/1.6/drb/drb.rb:103:in `read': Interrupt
     ....

## 再起動
% ruby nq_engine.rb druby://host:12345

-- Terminal 3 ------------------------------------
% ruby nq_root.rb druby://host:12345
0 2 4 1 3
0 3 1 4 2
1 3 0 2 4
1 4 2 0 3
2 0 3 1 4
2 4 1 3 0
3 0 2 4 1
3 1 4 2 0
4 1 3 0 2
4 2 0 3 1

TupleSpaceProxyを使っているので、 takeが中途半端に動作してタプルを紛失することはなくなったので、 今度は正常に終了します。 *1

TupleEntry

writeしたタプルや、take要求を取り消したり有効期限を延長することができます。 これらの操作のためにTupleEntryが用意されています。

writeは、writeしたタプルを操作するTupleEntryを返します。

FIXME

イベントの通知

タプル空間へのwrite, takeが行なわれるとイベントが発生します。

イベントはwrite, takeされたタプルの前に操作名("write"または"take")を つけたタプルです。

ts.write(["Hello", "World"]) # -> ["write", ["Hello", "World"]]
ts.take(["Hello", nil])      # -> ["take", ["Hello", "World"]]

イベントは次のように取り出します。

  1. タプル空間にnotifyメソッドでイベントの通知を懇請し、 イベントの取り出し口(NotifyTemplateEntry)をもらいます。
  2. eachメソッドでイベントを一つずつ取り出します。 notifierの期限が過ぎたり、cancelされるとeachメソッドは終了します。
----------------------------------
# イベント通知の懇請
notifier = ts.notify(nil, [String, nil])
# イベント取り出し
notifier.each do |event, tuple|
  ...
end
----------------------------------
ts.notify(event, pattern, sec=nil) # ^NotifyTemplateEntry

タプル空間に発生したイベントの通知を懇請します。 patternにマッチしたタプルに関するイベント('write' | 'take')を 受け取ることができます。 興味のあるイベントはeventで指定します。nilを指定するとすべての イベントが対象となります。

NotifyTemplateEntry#each

パターンにマッチしたイベントを一つずつyieldします。 期限が過ぎたり、cancelされるとeachは終了します。 イベントが空の時、現在のスレッドはブロックし、 イベントが届くまで停止します。

NotifyTemplateEntry#cancel

notifyをキャンセルします。 キューに蓄えられているイベントが尽きた時、eachは終了します。 キャンセルされたnotifyに新たなイベントが通知されることはありません。


*1ただし、処理中に停止させるとそのタプルは紛失してしまいます。