6 マルチスレッド

dRubyのシステムは複数のプロセスで構成されます。 サーバにとっては、クライアントからのメソッド呼び出しがどのような タイミングで発生するか、予想がつきません。 いつでもメソッドが呼び出されてしまうかもしれない、という点で dRubyではマルチスレッドのプログラミングと同様の準備が必要です。

この章ではRubyのマルチスレッド機能とスレッド間通信のしくみを紹介し、 dRubyのサービスをマルチスレッドに対応するための方法を紹介します。

6.1 dRubyとマルチスレッド

dRubyとスレッドはとても深い関係にあり、 dRubyを使用する上でマルチスレッドプログラミングは必須です。 この節ではdRubyとマルチスレッドの関係を説明します。

6.1.1 いつでもマルチスレッド

dRubyではどのような場合にスレッドが生成されるのでしょう。 はじめにDRb.start_serviceによって、他のプロセスからのメソッド呼び出しに備える サーバスレッドが生成されます。 そして、サーバスレッドはクライアントからのメソッド呼び出し要求が届くたびに、 そのメソッド呼び出しの実行を担当するスレッドを生成します。

一つのリモートメソッド呼び出しを実行中であっても、 クライアントからの要求があれば新たなスレッドを起こしてメソッドを実行します。

プロセスAからプロセスBのオブジェクトを呼び出す様子を見てみましょう。

there = DRbObject.new_with_uri('...')
there.foo()

http://www2a.biglobe.ne.jp/%7eseki/ruby/d2drbsrv0.jpg

図6.1 リモートメソッド呼び出しの仕組み

プロセスBへのメソッド呼び出しリクエストが発生すると、 サーバスレッドのDRbServerがリクエストを受信します。 DRbServerはこのメソッド呼び出しのために新しいスレッド生成し、処理を任せます。 サーバスレッドはすぐに解放されて、次のメソッド呼び出しに備えるのです。

http://www2a.biglobe.ne.jp/%7eseki/ruby/d2drbsrv.jpg

図6.2 リモートメソッド呼び出しの仕組み(実装)

この仕組みによって、一つのリモートメソッド呼び出しの実行中であっても、 別の呼び出しを受け付け実行できるのです。 dRubyではリモートメソッド呼び出しの実行中に別の呼び出しはブロックされる といった制約はありませんから、 二つのプロセスが相互に呼び合うような状態であっても、 デッドロックすることなくメソッドを実行できます。 ブロック引数付きメソッドのリモートメソッド呼び出しはこの機能があればこそ 実現可能なのです。

ary = DRbObject.new_with_uri(..)
ary.each do |x|
  x.foo()
end

http://www2a.biglobe.ne.jp/%7eseki/ruby/d2yield.jpg

図6.3 相互に呼び合うメソッド呼び出しの様子。

Rubyメソッド呼び出しをそのまま分散環境に拡張したものがdRubyのコンセプトです。 通常のRubyスクリプトのように、オブジェクトが相互に呼び合うような状況でも 正しく動作させるためには、このような実装が必要となりました。

その代わり、dRubyを使用するスクリプトではマルチスレッドに注意を払う必要が でてきました。特にオブジェクトを公開するスクリプトでは、 いつでもメソッドが呼ばれることがある、という点に気を配らなくてはなりません。

6.2 Rubyのマルチスレッド

Rubyには同時に複数の「プログラムの制御の流れ(スレッド)」を扱う機能が 備わっています。 この機能をマルチスレッドと呼びます。

Rubyには、Rubyインタプリタによって実装されたユーザーレベルのマルチスレッドが 用意されています。 OSのスレッド機構ではなく、インタプリタによって実現されているため DOSのようなスレッドを持たないOS上でもマルチスレッドを利用できます。

多くのシステムのスレッドは複数のスレッド間で同じメモリ空間を共有します。 システムは「制御の流れ」をスイッチしますが、 メモリ空間をスイッチする必要がありません。 このためスレッドはオーバーヘッドが少なくしばしば軽量プロセスと呼ばれます。 Rubyのスレッドも多くのシステムと同様にスレッド間で同じメモリ空間を共有します。 それぞれのスレッドが同時に同じオブジェクトを操作することが可能なため、 1スレッドのときには気付かなかった問題が発生することがあります。

マルチスレッドを利用することで ネットワークのアプリケーションやGUIのアプリケーションのような、 複数のイベントを扱うアプリケーションを簡潔に記述できます。

6.2.1 スレッドの操作

Rubyのスクリプトの実行がはじまると、最初にメインスレッドという特別なスレッドが 起動されます。 メインスレッドはこのスクリプトの主処理です。 メインスレッドは一番最初に起動され、 メインスレッドが終了するとスクリプトは終了します。 メインスレッドはスクリプトの中で最も長い時間「生きている」スレッドです。

http://www2a.biglobe.ne.jp/%7eseki/ruby/d2main_thread.jpg

図6.4 起動時のスレッドの様子

RubyのスレッドはThreadクラスにブロックを渡して作成します。 ブロックにはThread.newに与えた引数がパラメータとして渡ります。

thread = Thread.new(1,2,3) do |x, y, z|
  # スレッドにしたい処理
end

ブロックの内容を実行する「プログラムの制御の流れ」が生成されます。 ブロックにパラメータを渡す機能がなぜ必要なのか後で説明します。

http://www2a.biglobe.ne.jp/%7eseki/ruby/d2thread1.jpg

図6.5 二つ目のスレッドを起動

スレッドには実行中、休眠中などの「状態」を持ち、「実行中」状態ではじまります。 IOや他のスレッドとの待ち合わせなど、 なにかの事象を待つことにより休眠中になることがあります。 実行中と休眠中の状態を行き来して、最後には終了状態になります。

終了したスレッドには値があり、最後に評価した値がそのスレッドの値となります。 スレッドが例外によって異常終了した場合、そのスレッドの値を調べようとすると 例外が発生します。

http://www2a.biglobe.ne.jp/%7eseki/ruby/d2thread3.jpg

図6.6 スレッドの状態

スレッドの状態はalive?、status、valueなどのメソッドで調べることができます。

  • alive? --- スレッドが生きているかどうかを真偽値で返します。
  • status --- スレッドの状態を返します。
    • "run" --- 実行中
    • "sleep" --- 休眠中
    • "aborting" --- 終了処理中
    • false --- 正常終了した
    • nil --- 例外により終了した
  • stop? --- スレッドが終了している、あるいは休眠中であるとき、trueを返します。
  • value --- スレッドの終了を待ち、その結果を返します。 スレッドが既に終了していれば、直ちに結果を返します。 例外で終了した場合は例外を再発生させます。 valueは何度でも問い合わせることができ、例外で終了の場合にはそのたびに 例外を再発生します。

valueはスレッドの終了を待ってから値を返しますが、単にスレッドの終了を 待ちたい場合にはjoinを使います。 joinを呼ぶとレシーバのスレッドが終了するまで、呼び側のスレッドはブロック (停止)します。

http://www2a.biglobe.ne.jp/%7eseki/ruby/d2thread2.jpg

図6.7 joinによって待ち合わせる。このシーケンス図中の矩形は実行中であることを示します。

スレッドの実行状態は次のメソッドで制御します。

  • exit --- スレッドを終了させます。
  • wakeup --- スレッドを実行状態にします。
  • run --- スレッドを実行状態にします。ただちにスレッドを切り替えます。
  • raise --- そのスレッドに例外を発生させます。

他のスレッドを休眠状態にするメソッドは用意されていません。 このため、自分のスレッドを休眠させることはできますが、別のスレッドを 休眠状態にすることはできません。 スレッド自らが休眠状態になるには、sleep(やThreadのクラスメソッドstop)を 使います。 sleepを使うと指定した時間(あるいは無限)実行を停止することができます。

スレッドのクラスメソッドを使って、プロセス全体のスレッドの様子を 調べることができます。

  • Thread.list --- 生きているスレッドの一覧を返す。
  • Thread.main --- メインスレッドを返す。
  • Thread.current --- 現在実行中のスレッドを返す。

irbを使って実験しながら見て行きましょう。

スクリプトの起動直後には、メインスレッドただ一つがあるはずです。

% irb --prompt simple
>> Thread.list
=> [#<Thread:0x40.... run>]
>> Thread.list[0] == Thread.main
=> true
>> Thread.current == Thread.main
=> true

sleepで停止しながら数字を0から9まで出力するスレッドを生成します。 このスレッドは繰り返しのたびにsleepするようにします。

>> th = Thread.new { 10.times {|x| sleep; p [Thread.current, x]} }
=> #<Thread:0x... sleep>

thが差しているスレッド(以下th)はsleep中です。 Thread.list、status、alive?、stop?を使ってスレッドの状態を調べてみましょう。

>> Thread.list
=> [#<Thread:0x..... sleep>, #<Thread:0x.... run>]
>> th.status
=> "sleep"
>> th.alive?
=> true
>> th.stop?
=> true

thをwakeupして実行状態にしてみます。

>> th.wakeup
=> #<Thread:0x2.... run>
[#<Thread:0x2.... run>, 0]>> 

thは実行中になり、0を出力しました。 メインのスレッドとthのそれぞれが文字列を印刷するので、図中では プロンプトが乱れています。 thは次の繰り返しで再び休眠しているはずです。

>> th.status
=> "sleep"

thをrunで実行状態にしてみましょう。 runでは直ちにスレッドが切り替わるので、wakeupの場合と 画面の乱れ方が異なるかもしれません。

>> th.run
[#<Thread:0x2..... run>, 1]=> #<Thread:0x2..... run>

>> th.run
[#<Thread:0x2..... run>, 2]=> #<Thread:0x2..... run>

>> th.wakeup
=> #<Thread:0x2..... run>
[#<Thread:0x2..... run>, 3]

今度はthに例外を発生させ、異常終了させてみましょう。

>> th.raise('stop!')
=> nil
>> Thread.list
=> [#<Thread:0x..... run>]
>> th.status
=> nil
>> th.alive?
=> false
>> th.stop?
=> true

異常終了したため、th.statusはnilとなります。 Thread.listは生きているスレッドを返すため、メインスレッドだけを返します。 すでに終了しているためにalive?はfalseを返し、stop?はtrueを返します。

ではth.valueでスレッドの「値」を問い合わせましょう。 th.raiseで発生させた例外(RuntimeError: stop!)が再発生するはずです。

>> th.value
RuntimeError: stop!
        from (irb):7
        from (irb):11:in `value'
        from (irb):11

正常終了した場合にはどうなるでしょう。 sleepせずに0から9まで印字して、最後に文字列'complete'を返すスレッドを 作って実験します。 メインスレッドはスレッド生成後、すぐth.joinを呼びスレッドの終了を待ちます。

>> th = Thread.new { 10.times { |x| p x} ; 'complete' }; th.join
0
1
2
3
4
5
6
7
8
9
=> #<Thread:0x..... dead>
>> th.status
=> false
>> th.alive?
=> false
>> th.stop?
=> true

th.joinはスレッドの終了を待ちますが、すでに終了しているので 直ちに返ってきます。

>> th.join
=> #<Thread:0x2ac4d35c dead>

th.valueはスレッドが最後に評価した値となるので、文字列'complete'となります。

>> th.value
=> "complete"

スレッドの基本的な操作を紹介し、irbで実験してみました。

6.2.2 セーフレベル

セーフレベルは信用できないオブジェクトをどのように扱うか、 外部の資源へのアクセスを許すかどうかを決める値です。 セーフレベルはスレッドローカル変数$SAFEで設定します。 $SAFEはグローバル変数風の名前をしていますが、 スレッドごとに存在する変数です。 $SAFEの値が大きいほど禁止される操作が増え、制限がきびしくなります。 また、セーフレベルによってオブジェクトが汚染されたり、 汚染されていないオブジェクトへの操作が制限されたりします。 セーフレベルと「オブジェクトの汚染」によってRubyのセキュリティモデルは なりたっています。オブジェクトの汚染については後述します。

メインスレッドの$SAFEの初期値は0です。 $SAFEは値を大きくする方向にだけ設定できます。 $SAFE=0の状態から$SAFE=1にすることはできますが、 $SAFE=1の状態から$SAFE=0にすることはできません。

% irb --prompt simple
>> $SAFE=1
=> 1
>> $SAFE=0
SecurityError: tried to downgrade safe level from 1 to 0
        from (irb):2

あるスレッドの$SAFEの値は、そのスレッドから生成したスレッドに引き継がれます。 スレッドを生成し、その$SAFEを調べてみましょう。

>> th = Thread.new { $SAFE }
=> #<Thread:0x2ac6fd58 dead>
>> th.value
=> 1

ある処理を一時的に異なるセーフレベルで実行したい場合にはどうするのでしょう。 スレッドを新たに生成し、その中で$SAFEを設定してから処理を行なうのが手軽です。

% irb --prompt simple
>> $SAFE
=> 0
>> th = Thread.new { $SAFE = 1; $SAFE.to_s }
=> #<Thread:0x2ac90ef4 dead>
>> th.value
=> "1"
>> $SAFE
=> 0

サブスレッドを生成するこの方法でも、$SAFEを小さくすることはできません。

>> $SAFE = 1
=> 1
>> th = Thread.new { $SAFE = 0; $SAFE.to_s }
=> #<Thread:0x2ac74088 dead>
>> th.value
SecurityError: tried to downgrade safe level from 1 to 0
        from (irb):13
        from (irb):14:in `value'
        from (irb):14

スレッドはちょっと気の利いたeval()としても使用できます。 eval()は文字列(またブロック)をスクリプトとして評価するメソッドですが、 外部から得た文字列を評価する際に一時的により厳しいセーフレベルにしたい ことがよく発生します。 eval()する文字列の中で$SAFEを設定するとそのスレッドの$SAFEも 変化してしまいますから期待した動作になりません。 このような場合には、新しいスレッドを生成して$SAFEをした後に eval()を行うとよいでしょう。 また、時間のかかる処理をeval()してしまい、プログラム全体が 止まってしまうのを防ぐこともできます。

6.3 スレッド間の通信

スレッドの生成や操作について見てきました。 複数の資源(ファイル、ネットワーク、GUIなど)を扱う場合や、 時間のかかる処理などでは、スレッドを用いると全体を簡潔に 記述できますが、スレッド同士が協調してオブジェクトをやりとりする 機能がなければ処理の結果などを取り出すことができません。 この節ではスレッド間で通信するためのクラスを紹介します。

6.3.1 排他制御

同時に複数のスレッドが、一つのオブジェクトを操作するとオブジェクトが 「壊れた」状態になる可能性があります。 「壊れた」状態とは、メモリが破壊されるといったことではなく、 例えばインスタンス変数の内容が意図しない状態になっていることです。 複数のスレッドが一度に操作をしないよう、排他的に制御する必要があります。

この節では はじめに基本的な排他制御メカニズムであるThread.exclusiveとMutexを説明し、 より高度なMonitor/MonitorMixinを紹介します。

Thread.exclusive

もっとも単純な排他制御としては、スレッドの切り替えを禁止してしまうという 作戦があります。スレッド切り替えの禁止について見ていきましょう。 threadライブラリに定義されているThread.exclusiveを使うのが簡単です。 他のスレッドに切り替わると困る処理をブロックにしてThread.exclusiveに 与えます。 Thread.exclusiveを使用するにはrequire 'thread'する必要があります。

require 'thread'

Thread.exclusive do
  # 排他制御したい処理
end

Thread.exclusiveによるスレッド切り替えの禁止を使用した例を見てみましょう。 20回くりかえし文字列を出力する処理を複数のスレッドで行ないます。

% irb --prompt simple
>> def foo(name)
>>   20.times do
?>     print name
>>     print ' '
>>   end
>> end
=> nil
>> foo('a')
a a a a a a a a a a a a a a a a a a a a => 20

メソッドfooをマルチスレッドで動かしてみます。

>> def test1
>>   t1 = Thread.new { foo('a') }
>>   t2 = Thread.new { foo('b') }
>>   t1.join
>>   t2.join
>> end
=> nil
>> test1
ab  ab  a ba  ba  ba  ba  ba  ba  ba  ba  ba  ba  ba  ba b  ab  ab  ab  ab  ab  => #<Thread:0x2ac2b9ec dead>

aとb(さらに空白文字)が交じりあって出力されています。 メソッドfooの繰り返しの最中にスレッドt1とt2が切り替わるためです。

Thread.exclusiveを使うように書きなおしてみましょう。

>> def test2
>>   t1 = Thread.new { Thread.exclusive { foo('a') } }
>>   t2 = Thread.new { Thread.exclusive { foo('b') } }
>>   t1.join
>>   t2.join
>> end
=> nil
>> test2
a a a a a a a a a a a a a a a a a a a a b b b b b b b b b b b b b b b b b b b b => #<Thread:0x2ac0df8c dead>

Thread.exclusive { foo('a') }によって、foo('a')実行中のメソッド切り替えが 禁止されるため、途中でbが混じることなくaが連続して出力されます。

Threadのクラスメソッドcriticalとcritical=を使ってスレッドの切り替えの 禁止状態を制御できます。 Thread.critical = trueとするとスレッドの切り替えが禁止されます。 この状態では実行中のスレッドから他のスレッドに切り替わることがありません。 スレッドの切り替えの禁止を解除するにはThread.critical = falseとします。 スレッド切り替えが禁止されているかどうかは、Thread.criticalで 調べることができます。 Thread.criticalによるスレッド切り替えの制御をする場合には、 Thread.critical = trueをする前に、以前の値をメモしておいて排他制御したかった 処理をぬけた後にメモしておいた値に戻すのがイディオムです。 このメモしてまた戻す処理を行うのがThread.exclusiveです。

実は、Rubyインタプリタに組み込みの排他制御の仕組みは、Thread.criticalによる スレッド切り替えの禁止の制御だけです。 MutexやQueueなどのクラスライブラリはThread.criticalを元に作られています。

Mutex

Thread.exclusiveはカレントスレッド以外、 全てのスレッドを停止させることにほかなりません。 これは自然でないし、なんだかもったいない感じがします。 あるオブジェクト/資源にアクセスするスレッドだけを排他制御し、 無関係なスレッドを停止させない方がより実行効率があがると考えられます。

このような制御をRubyではMutexを使って記述することが一般的です。 Mutexはmutual exclusionの略です。 Mutexはロック状態を持ちます。 Mutexをロックできるのはただ一つのスレッドだけ、 ロックを解除(アンロック)してよいのは通常ロックしたスレッドだけです。 アンロックはどのスレッドからでも可能です。*1 ロックしている最中にさらにロックしようとしたスレッドは、 ロックが解除されるまでブロックします。

Mutexのロックはlockメソッド、アンロックはunlockメソッドで行ないます。 一般にlock、unlockメソッドを直接呼び出すことは稀で、 これらをまとめて呼び出すsynchronizeメソッドを使います。

MutexなどRubyのスレッド同期メカニズムもRubyのオブジェクトですから、 dRubyからそのまま利用することが可能です。 irbとdRubyを用いてMutexのロックについて実験してみましょう。

[ターミナル1]
% irb --prompt simple
>> require 'drb/drb'
>> require 'thread'
>> m = Mutex.new
>> DRb.start_service('druby://localhost:12345', m)
>> m.locked?
=> false

[ターミナル2]
% irb --prompt simple
>> require 'drb/drb'
>> DRb.start_service
>> m = DRbObject.new_with_uri('druby://localhost:12345')
>> m.locked?
=> false

準備ができました。 それぞれのirbの変数mはターミナル1のMutexを参照しています。 ターミナル1でロックをした後、ターミナル2でロックを試みます。

[ターミナル1]
>> m.lock
=> #<Mutex:0x2ad9bcc0 @locked=true, @waiting=[]>
>> m.locked?
=> true

[ターミナル2]
>> m.lock

ターミナル2はプロンプトが戻りませんね? ターミナル1でMutexをロックしているため、後から行なったロックが ブロックしているのです。ターミナル1のロックを解除してみましょう。

[ターミナル1]
>> m.unlock
=> #<Mutex:0x2ad9bcc0 @locked=true, @waiting=[]>
>> m.locked?
=> true

unlockの後、すぐにlocked?でロック状態を確認するとtrueになっています。 先ほどターミナル2で行なったロックのブロックが解け、ターミナル2が ロックを獲得したためです。 ターミナル2のプロンプトが表示されたでしょうか?

[ターミナル2]
>> m.lock           # ← 先ほど入力した式
=> #<DRb::DRbObject:0x2ad9bcc0 @ref=nil, @uri="druby://localhost:12345">
>> m.locked?
=> true

ブロックなしでロックを試みるtry_lockメソッドを試してみましょう。 現在はロック状態ですからブロックする代わりにfalseが返るはずです。

[ターミナル1]
>> m.try_lock
=> false

現在ロックを獲得しているのはターミナル2です。ターミナル1からロックを 解除することは可能でしょうか?

[ターミナル1]
>> m.unlock
=> #<Mutex:0x2ad9bcc0 @locked=false, @waiting=[]>
>> m.locked?
=> false

ロックを獲得した者であるかどうかに関わらず、ロックの解除ができました。 これは注意が必要です。 はじめにアンロックしてしまったら、 せっかくのMutexの意味がなくなってしまいます。 ロックとアンロックをペアで用いるためにsynchronizeという便利な メソッドがあります。 synchoronizeはロック−ブロックの実行−アンロックを行なうメソッドです。

次の実験は忙しいので気をつけて下さい。 両方のターミナルに同じ式を入力します。

[ターミナル1]
>> m.synchronize { puts('lock'); sleep(10); puts('unlock') }

[ターミナル2]
>> m.synchronize { puts('lock'); sleep(10); puts('unlock') }

次のような順序で印字されると思います。

(ターミナル1) lock
  10秒後
(ターミナル1) unlock
(ターミナル2) lock
  10秒後
(ターミナル2) unlock

また、ロックされていない状態でunlockするとどうなるのでしょう。

[ターミナル1]

>> m.unlock
=> nil
>> m.locked?
=> false

再びtry_lockしてみましょう。 ロックされていないので、ロックを獲得できtrueが返るはずです。

[ターミナル1]
>> m.try_lock
=> true
>> m.locked?
=> true

排他制御したい資源を管理するクラス/メソッドを書く場合、 資源と対になるようにMutexを用意します。

List 6.1 counter0.rb

# counter0.rb
class Counter
  def initialize
    @mutex = Mutex.new
    @value = 0
  end
  attr_reader :value

  def up
    @mutex.synchronize do
      @value = @value + 1
    end
  end
end

counter0.rbは簡単なカウンターです。 upメソッドで値が一つ大きくなります。

@value = @value + 1

@valueに1を足して@valueに代入していますが、 @valueを取り出してから、加算し、@valueに代入するまでの間に、 別のスレッドが@valueを取り出してしまうと困ったことになります。 以下に「困ったこと」の例を示します。 二つのスレッドがupを呼ぶにも関わらず、1しか値が大きくならない例です。

  1. 初期状態:@valueの値が5。
  2. スレッドAが@valueの値5を取り出す。
  3. スレッドBが@valueの値5を取り出す。
  4. スレッドAが@valueに加算した結果(6)を代入。
  5. スレッドBが@valueに加算した結果(6)を代入。
  6. @valueは6となる。期待した値は7である。

あるスレッドが@valueを操作している最中に、 他のスレッドが@valueを操作しないようにしなくてはなりません。 Mutexを利用して、同時に一つのスレッドだけが@valueを操作するように プログラミングします。

def up
  @mutex.synchronize do
    @value = @value + 1
  end
end

Mutexのsynchronizeメソッドは、次の処理を順次行うメソッドです。

  1. Mutexのロック(lock)
  2. 与えられたブロックの実行(yield)
  3. Mutexのロック解除(unlock)

unlockはensureで行われるので、ブロックが異常終了してもロックの解除は 必ず行われるので安心です。

upメソッドでは、Mutexのロック、ブロック(@value = @value + 1)の実行、 ロック解除を行ないます。

つまり、Mutexのロックを獲得したスレッドだけが@valueを加算するのです。 Mutexのロックを獲得できるのは、同時にはただ一つだけですから、 複数のスレッドが同時に@valueの操作を行なうことはないことが保証できるわけです。

前章で紹介したReminderは、実はマルチスレッドセーフではありません。 もう一度reminder0.rbを見ながら危ういところを探して修正しましょう。

List 6.2 reminder01.rb

# reminder01.rb
class Reminder
  def initialize
    @item = {}
    @serial = 0
  end

  def [](key)
    @item[key]
  end

  def add(str) # @serialのインクリメントが危険(1)
    @serial += 1
    @item[@serial] = str
    @serial
  end

  def delete(key) # 削除されたものも配列にされる(2)
    @item.delete(key)
  end

  def to_a # 削除されたものも配列にされる(2)
    @item.keys.sort.collect do |k|
      [k, @item[k]]
    end
  end
end

addメソッドでは新しい項目のためのキーとして@serialをインクリメントします。 この@serialのインクリメントは先ほどのCounterクラスのupメソッドと同様に マルチスレッドセーフではありません。

また、to_aメソッドで@itemを元に配列を生成しますが、削除された項目を 返す可能性があります。

それぞれを一つのMutexで保護してみましょう。 まず、initializeでMutexを生成し@mutexに保持します。

def initialize
  @mutex = Mutex.new
  @item = {}
  @serial = 0
end

項目を取り出す[](key)メソッドは排他制御の必要がありませんから変更しません。 *2

項目を追加するaddメソッドは、@serialをインクリメントしていく部分が 問題になりそうです。 まずはaddメソッド全体をMutexで保護します。簡単ですね。

def add(str)
  @mutex.synchronize do
    @serial += 1
    @item[@serial] = str
    @serial
  end
end

Mutexで保護する範囲は狭ければ狭いほど、複数のスレッドが同時に実行できる 部分が増えると考えられます。 synchronizeするコードは、必要最小限にする方が効率よく実行されると言えます。 addメソッドのうち本当に保護しなければならなかったのはどこでしょうか? addメソッドは次のような流れです。

  1. 新しい項目のためのキーを生成する。
  2. ハッシュに項目を登録する。
  3. キーを返す。

ハッシュの[]=(key, value)メソッドがマルチスレッドセーフであると仮定すると、 新しいキーの生成部分だけを保護すればよさそうです。 ハッシュへの項目の登録が同時に発生したとしても、 キーの生成が安全であれば同じキーへの操作は発生しないからです。 この作戦で書き直したaddメソッドを以下に示します。 新しいキーの生成はserialメソッドとして分離し、 こちらをMutexで保護するようにしています。 serialメソッドはprivateにするべきかもしれません。

def serial
  @mutex.synchronize do
    @serial += 1
    return @serial
  end
end

def add(str)
  key = serial
  @item[key] = str
  key
end

Mutexがロックされている間にlockを呼ぶと、ロックが解除されるまで lockを呼んだスレッドはブロックします。たとえば、 うっかりserialとaddのそれぞれのメソッドが@mutex.synchronizeを 呼んでしまうとはじめにlockしたスレッドが再びlockを試みるために デッドロックしてしまいます

def serial
  @mutex.synchronize do
    @serial += 1
    return @serial
  end
end

def add(str)
  @mutex.synchronize do
    key = serial
    @item[key] = str
    return key
  end
end

これはデッドロックしてしまう例です。まず、addメソッドのsynchronizeで Mutexのロックを獲得しています。 さらにserialメソッドを呼び出しserialメソッドのsynchronizeで二重に Mutexのロックを獲得しようとするところでデッドロックとなります。

http://www2a.biglobe.ne.jp/%7eseki/ruby/d2mutex0.jpg

図6.8 入れ子のsynchronizeによるデッドロック

Mutexではこの様な二重のロックが発生しないように注意が必要です。 後に紹介するMonitorMixinではロックしているスレッドが再びロックが 可能になっています。

MutexのsynchronizeメソッドなどRubyのスレッド間通信のメカニズムは、 どれもThread.criticalを基礎にRubyスクリプトで実装されていることを 既に説明しました。 serialメソッドはごく簡単な処理しか行いませんが、 この様な単純なメソッドの保護にMutexのsynchronizeを用いても コストは見合うのでしょうか? 単にThread.exclusiveでも良いのではないでしょうか?

def serial
  Thread.exclusive do
    @serial += 1
    return @serial
  end
end

このserialメソッドはThread.exclusiveで書き直したものです。 この版のserialももちろん正しく動作しますし、並列性が下がることもありません。 可読性はどうでしょう?Thread.exclusiveは排他的に実行することを表明している 点では読みやすいと言えますが、Counterクラスの他のメソッドがMutexを用いて 排他制御を実現している中で唐突にThread.exclusiveが登場するのは少しびっくり してしまいます。 筆者はわずかな効率と引き換えにびっくりするコードを増やすよりも、 Counterクラス内で一貫したコードを採用するべきだと考えます。

最後にMutexで保護するように変更したdeleteとto_aメソッドを示します。 この二つのメソッドは全体を保護しています。

def delete(key)
  @mutex.synchronize do
    @item.delete(key)
  end
end

def to_a
  @mutex.synchronize do
    @item.keys.sort.collect do |k|
      [k, @item[k]]
    end
  end
end

Monitor、MonitorMixin

ここで紹介するMonitorMixinは筆者のお気に入りのクラスライブラリです。 MonitorはMutexと同様な排他制御とConditionVariable(状態変数)を 組み合わせた高機能なスレッド間協調のメカニズムです。 単純な排他制御では書きにくいような複雑な同期を記述するのに向きます。 また、 MonitorMixinのsynchronizeはロックの入れ子に対応しており、 状態変数と組み合わせなくとも、便利なMutexとして使用することも可能です。

「便利なMutex」としての使用方法を説明したのちに、 ConditionVariableを含めた使用方法を説明します。

Monitorはrequire 'monitor'で使用可能となります。 任意のオブジェクトにmix-in継承で混ぜ合わせるMonitorMixinと 独立したクラスとしたMonitorクラスが提供されます。

Mutexでもやったように、ReminderをMonitorMixinを使用して マルチスレッドセーフにしてみましょう。 まずライブラリのロードとクラスの定義です。 MonitorMixinを使用するにはrequire 'monitor'が必要です。

require 'monitor'

class Reminder
  include MonitorMixin
  def initialize
    super
    @item = {}
    @serial = 0
  end

MonitorMixinをReminderにインクルードします。 MonitorMixinの初期化を行なうために、initializeメソッドで superを呼びます。

各メソッドにsynchronizeを追加します。 MonitorMixinをインクルードしているのでMutex版の場合のように @mutex.synchronizeではなく、自身のメソッドを呼ぶことになります。

def add(str)
  synchronize do
    @serial += 1
    @item[@serial] = str
    @serial
  end
end

def delete(key)
  synchronize do
    @item.delete(key)
  end
end

def to_a
  synchronize do
    @item.keys.sort.collect do |k|
      [k, @item[k]]
    end
  end
end

MonitorMixinでは入れ子のsynchronizeに対応しており、 Mutex版のときのように次のコードでデッドロックする心配はありません。

def serial
  synchronize do
    @serial += 1
    return @serial
  end
end

def add(str)
  synchronize do
    key = serial
    @item[key] = str
    return key
  end
end

少し乱暴ですが、MonitorMixinのsynchronizeでは入れ子のロックに 対応しているので、マルチスレッドセーフにする必要のあるクラスの 各メソッドを、機械的にsynchronizeで包んでしまうことも可能です。 Javaでいうところのsynchronizedキーワードを機械的につけるような感覚ですね。 初期の段階ではとりあえず、すべてのメソッドをsynchronize化してから、 ボトルネックな部分だけを注意深く調整していく、というのも アプリケーションを作成していく上では良い作戦かもしれません。

Monitorはmon_enter、mon_exitメソッドでロック、アンロックを行ないます。 Monitorではモニターへ入る、モニタから出ると考えるため このようなメソッド名になっていると思われます。

[ターミナル1]
% irb --prompt simple
>> require 'drb/drb'
>> require 'monitor'
>> m = Monitor.new
>> DRb.start_service('druby://localhost:12345', m)
>> m.mon_enter
=> false
>> m.mon_enter
=> false
>> m.mon_exit
=> nil
>> m.mon_exit
=> nil
>> m.mon_exit
ThreadError: current thread not owner
        from /usr/local/lib/ruby/1.8/monitor.rb:249:in `mon_exit'
        from (irb):16

このように同じスレッドからのmon_enterを行なってもロックしません。 また、mon_enterの数よりも多くmon_exitすると例外ThreadErrorとなります。

複数のターミナルを使って、mon_enterを実験してみましょう。 はじめにmon_enterでターミナル1のスレッドがモニタに入ります。

[ターミナル1]
>> m.mon_enter
=> false

この状態でターミナル2からsynchronizeを使ってモニタへの進入を試みます。

[ターミナル2]
% irb --prompt simple
>> require 'drb/drb'
>> DRb.start_service
>> m = DRbObject.new_with_uri('druby://localhost:12345')
>> m.synchronize { puts('hello') }

現在ターミナル1が入っているのでブロックするはずです。 mon_exitでターミナル1のスレッドがモニタから抜けると、 ターミナル2のブロックが実行されるはずです。

[ターミナル1]
>> m.mon_exit
=> nil

[ターミナル2] 続き
>> m.synchronize { puts('hello') }
hello
=> nil
>>

プロンプトが戻りましたか? Monitor/MonitorMixinのsynchronizeでは、 同じスレッドからの進入を許されるますが 他のスレッドの進入はブロックされることがわかります。

ここでRubyとdRubyで動作が異なってしまう例をお見せします。 入れ子になったsynchronizeがdRubyではうまくうごきません。 まず、ローカルのオブジェクトに対する操作です。

[ターミナル1]
>> m.synchronize { m.synchronize { puts('nest') } }
nest
=> nil

Monitorは同じスレッドからの進入を許すので、このように入れ子のsynchronizeも ブロックせずに動作します。

同様な式をターミナル2で評価するとどうなるでしょう。 ターミナル2のmはターミナル1のMonitorの参照ですから、dRubyを経由した リモートのメソッド呼び出しとなります。

[ターミナル2]
>> m.synchronize { m.synchronize { puts('nest') } }

おや?もどってきません。ブロックしてしまいました。 最初のsynchronizeを行ったスレッドと、ブロックの中でsynchronizeを 実行するスレッドが異なるため、ターミナル2ではデッドロックしてしまうのです。

ターミナル1から、もう一度トライします。

[ターミナル1]
>> m.synchronize { m.synchronize { puts('nest') } }

ターミナル2の最初のsynchronizeがまだ完了していないのでモニタに 進入せず停止します。

Ctrl-C、Ctrl-Dを押すなどしてターミナル2のirbを終了させましょう。 irbが終了するとターミナル2のsynchronizeも終了し、 ターミナル1は処理が再開します。

[ターミナル1]続き
>> m.synchronize { m.synchronize { puts('nest') } }
nest
=> nil

Monitorでこのような使い方をすることはありませんが、 dRubyではブロック付きメソッドのメソッド呼び出しをしたスレッドと、 ブロックを実行するスレッドが異なることを示す例として紹介しました。

続いて状態変数の使い方を紹介します。 状態変数(ConditionVariable)はMonitorと組み合わせて使う同期メカニズムです。 Monitorのnew_condというメソッドで生成します。 ConditionVariableは、モニタに入ってから、つまりロックを獲得してから ある条件が満たされるまで一時的にモニタから出る(ロックを返上する) ときに用います。

典型的な使い方ではアプリケーションを二つの役割で構成します。 一つは状態が満たされるまで待つ係、 もう一つは状態の変化を通知する係です。 はじめに状態を満たされるまで待つ疑似コードをみてみましょう。

def foo
  synchronize do
    until some_condition()
      @cond.wait
    end
    do_foo()
  end
end

ロックを獲得し、ある状態(some_condition())が満たされるまで 状態変数@condのwaitメソッドで待ちロックを返上します。 waitメソッドはその状態変数のsignalメソッド、broadcastメソッドが 呼ばれるまでブロックします。 @cond.waitのブロックが解け制御が戻る時、スレッドは再びロックを獲得しています。

until some_condition()
  @cond.wait
end

while some_condition()
  @cond.wait
end

という繰り返しは状態変数でよく使われるイディオムです。 ConditionVariableにはこの繰り返しを行うユーティリティメソッドが 用意されています。 以下、monitor.rbの抜粋です。

class ConditionVariable
  ...
  def wait_while
    while yield
      wait
    end
  end

  def wait_until
    until yield
      wait
    end
  end
end

次のように使用します。

@cond.wait_until { some_condition() }

続いて、状態の変化を通知する側の疑似コードをみてみましょう。

def bar
  synchronize do
    do_bar()
    @cond.broadcast
  end
end

状態変数でwaitしているスレッドを一つ再開(wakeup)するのがsignalメソッド、 waitしている全てのスレッドを再開するのがbroadcastメソッドです。 どちらの場合も再開されたスレッドはロックの獲得を試みます。 ロックが獲得できたスレッドのみ、waitメソッドのブロックが 解け制御が戻ります。 signalやbroadcastが呼ばれても、waitメソッドがいつも完了するわけではなく ロックを獲得したスレッドのwaitだけが完了するのです。 同時にモニタを走行するスレッドはただ一つだけであるという点も忘れてはなりません。

この疑似コードではbroadcastで全てのスレッドを再開しています。 ただ一つのスレッドを再開させるsignalは、本当にただ一つをwakeupすれば 大丈夫とわかっている場合にのみ使用するべきです。たとえば、 wakeupしたスレッドが制御が適切な処理をする前にkillされたとすると デッドロックを誘発する可能性があります。 どちらとも言えない場合にはbroadcastを使う方が安心です。

簡単な同期メカニズムをMonitorを使って記述してみましょう。 ここではランデブーと呼ばれるメッセージ交換のメカニズムを書きます。 次のような仕様とします。

  • ランデブーはsendとrecvの二つの操作を持ちます。
  • 一方のスレッドがsend、もう一方のスレッドがrecvを行います。
  • 先にsendを行うとsendを呼んだスレッドはブロックし、 recvが呼ばれるまで待ちます。
  • 先にrecvを行うとrecvを呼んだスレッドはブロックし、 sendが呼ばれるまで待ちます。
  • sendされている状態でsendが発生すると、どちらもブロックします。 recvが呼ばれるとどちらか一方だけブロックが解除されます。
  • recvされている状態でrecvが発生すると、どちらもブロックします。 sendが呼ばれるとどちらか一方だけブロックが解除されます。

サイズが0の特殊なSizedQueueと考えることができます。

まず、ライブラリのロードとクラスの宣言です。

require 'monitor'

class Rendezvous
  include MonitorMixin
  def initialize
    super
    @arrived_cond = new_cond
    @removed_cond = new_cond
    @box = nil
    @arrived = false
  end

二つの状態変数@arrived_condと@remove_condと到着したメッセージを 格納する@box、メッセージが到着していることを示す@arrivedを準備します。 @arrived_condはメッセージが到着した事を通知する状態変数です。 sendが行われた際に通知します。 @removed_condはメッセージが取り除かれた事を通知する状態変数です。 @boxのデータが受信された際に通知します。

ではsendメソッドを見てみましょう。

def send(obj)
  synchronize do
    while @arrived          # (1)
      @removed_cond.wait    # (1)'条件が満たされない場合、
                            #     waitにより一時的にロックを返上する
    end
    @arrived = true         # (2)
    @box = obj
    @arrived_cond.broadcast # (3)
    @removed_cond.wait      # (4)
  end
end

sendメソッドの流れは次のようになります。

  1. 受信されていないメッセージがなくなるまで待ちます。 これは@arrivedがtrueの間は待つということです。
  2. @arrivedをtrueに変更し、@boxにobjを代入します。
  3. メッセージが到着したことを通知します。
  4. メッセージが受信されるまで待ちます。

説明の通りのスクリプトがそこにありますね。

whileで条件が満たされるのを待つ部分がモニタと状態変数のミソです。 *3 もしモニタがなければ、@arrivedの検査をして条件が揃ったときループを 抜けたとしても、次の瞬間には別のスレッドが@arrivedの状態を 変えてしまうかもしれません。 状態の変化を安全に獲得できる点がモニタと状態変数のうれしい部分です。

次にrecvの流れを追って行きましょう。

def recv
  synchronize do
    until @arrived           # (1)
      @arrived_cond.wait
    end
    @arrived = false         # (2)
    @removed_cond.broadcast  # (3)
    return @box              # (4)
  end
end
  1. メッセージが到着するまで待ちます。 これは@arrivedがtrueになるまで待つということです。
  2. @arrivedをfalseに変更します。
  3. メッセージを受信したことを通知します。
  4. @boxをreturnします。

通知により次のsendが活性化され、@boxを変更してしまいそうですが 実際にはそうなりません。 waitを呼ぶか、synchronizeのブロックを抜けるまで他のスレッドが モニタに進入することはありませんから、(3)通知によって別スレッドが @boxを変更する心配はありません。

最後に完全なRendezvousを載せます。 waitの繰り返しは、wait_until, wait_whileに変形してあります。

List 6.3 rendezvous.rb

# rendezvous.rb
require 'monitor'

class Rendezvous
  include MonitorMixin
  def initialize
    super
    @arrived_cond = new_cond
    @removed_cond = new_cond
    @box = nil
    @arrived = false
  end

  def send(obj)
    synchronize do
      @removed_cond.wait_while { @arrived }
      @arrived = true
      @box = obj
      @arrived_cond.broadcast
      @removed_cond.wait
    end
  end

  def recv
    synchronize do
      @arrived_cond.wait_until { @arrived }
      @arrived = false
      @removed_cond.broadcast
      return @box
    end
  end
end

6.3.2 スレッド間のオブジェクトのやりとり

先ほど説明した排他制御は主に「何かを守る」ための同期メカニズムでした。 ここではスレッド間でオブジェクトをやりとりするため「仲介役」の 同期メカニズムを紹介します。

「仲介役」はスレッドからスレッドへオブジェクトを伝えるのが仕事です。 伝えるべきデータがない場合には受信をブロックしたり、 データを蓄えられない場合には送信をブロックしたりするのが一般的です。

Queue、SizedQueue

Queueはオブジェクトを交換するFIFOバッファを提供します。 Queueはもっとも一般的な仲介役と言えるでしょう。

Queueにデータを入れるにはenqメソッドまたはpushメソッドを用います。 Queueには特に上限は設定されていません。 Queueからデータを取り出すにはdeqメソッドまたはpopメソッドを用います。 Queueのデータが空の状態でdeqを行なうと、そのスレッドはブロックします。 データがenqされるとブロックは解除されます。 データのenq、deqは割り込まれないアトミックな操作で、 一つのデータを複数のスレッドがdeqしてしまうことはありません。

複数のターミナルからirbを用いて、Queueの実験しましょう。 Queueを使用するにはrequire 'thread'をロードします。

[ターミナル1]
% irb --prompt simple -r drb/drb
>> require 'thread'
>> q = Queue.new
>> DRb.start_service('druby://localhost:12345', q)
>> q.enq(1)

Queueを生成して、dRubyで公開します。 Queueのenqメソッドで整数1をQueueに投入します。

[ターミナル2]
% irb --prompt simple -r drb/drb
>> DRb.start_service
>> q = DRbObject.new_with_uri('druby://localhost:12345')
>> q.deq
=>1
>> q.deq

ターミナル2では、公開されたQueueへの参照を作成しdeqを行います。 最初のdeqでは先に1がenqされているのでただちに1が返ります。 二回目のdeqはデータがないのでブロックします。

ターミナル1でもう一度enqするとdeqは解除されます。

[ターミナル1]
>> q.enq(2)

[ターミナル2] 続き
>> q.deq
=> 2

SizedQueueはQueueのサブクラスで、 保持できるオブジェクトの数に上限を設けたQueueです。 Queueでは保持できるオブジェクト数に制限がないので、 enqメソッドがブロックすることはありませんが、 SizedQueueでは上限を越えてenqを呼び出すとブロックします。 deqが行われ、SizedQueueに保持されているオブジェクトが減ると enqのブロックは解除されます。 同様に実験してみましょう。

はじめにターミナル1で長さ4のSizedQueueを生成し、 'druby://localhost:12345'で公開します。

[ターミナル1]
% irb --prompt simple -r drb/drb
>> require 'thread'
>> q = SizedQueue.new(4)
>> DRb.start_service('druby://localhost:12345', q)

ターミナル2で、SizedQueueへの参照を準備します。 いつものようにDRbを初期化して、URI 'druby://localhost:12345'の DRbObjectを作ります。

[ターミナル2]
% irb --prompt simple -r drb/drb
>> DRb.start_service
>> q = DRbObject.new_with_uri('druby://localhost:12345')

ターミナル2からSizedQueueにオブジェクトを4つenqしてみましょう。 最後にlengthメソッドでSizedQueueの長さを調べます。

[ターミナル2]
>> q.enq(1)
>> q.enq(2)
>> q.enq(3)
>> q.enq(4)
>> q.length
=> 4

もう一度投入enqすると、そのenqはブロックされるはずですね。 では今度はターミナル1からenqしてみましょう。

[ターミナル1]
>> q.enq(5)

どうですか?プロンプトが表示されずブロックされているでしょうか? ターミナル2からdeqしてブロックを解除してみましょう。

[ターミナル2]
>> q.deq
=> 1

最初にenqした1が帰ってきました。ターミナル1はブロックが解除されて、 プロンプトが表示されているでしょうか?

[ターミナル1] 続き
>> q.enq(5)
=> nil
>> 

最後に、Queueの場合と同様にdeqでブロックする様子を確かめましょう。 nilが到着するまでループすることにします。

[ターミナル1]
>> loop do
?>   puts "length: #{q.length}"
>>   it = q.deq
>>   puts "deq: #{it}"
>>   break if it.nil?
>> end
length: 4
deq: 2
length: 3
deq: 3
length: 2
deq: 4
length: 1
deq: 5
length: 0

q.lengthが0となった繰り返しのdeqがブロックしました。 ターミナル2でenqしてブロックを解除してみましょう。

[ターミナル2]
>> q.enq(6)

[ターミナル1]続き
length: 0
deq: 6
length: 0

ブロックは解除され次の繰り返しに進み、次の繰り返しのdeqでブロックしました。 ターミナル2からnilをenqして繰り返しを終らせます。

[ターミナル2]
>> q.enq(nil)

[ターミナル1]続き
length: 0
deq: 
=> nil
>> 

Monitorの例で示したRendezvousをSizedQueueで実装してみましょう。 排他制御も「仲介役」も実は表裏一体で一方があればもう一方を表現できます。 SizedQueue版は非常に短くなりました。 SizedQueueの方が抽象度が高いせいもありますが、 この問題に向いていたからでしょう。

List 6.4 rendezvous_q.rb

# rendezvous_q.rb
require 'thread'

class Rendezvous
  def initialize
    super
    @send_queue = SizedQueue.new(1)
    @recv_queue = SizedQueue.new(1)
  end

  def send(obj)
    @send_queue.enq(obj)
    @recv_queue.deq
  end

  def recv
    @send_queue.deq
  ensure
    @recv_queue.enq(nil)
  end
end

スレッド間でオブジェクトをやりとりするメカニズムには他にRinda::TupleSpaceが あります。LindaのタプルスペースをRubyで実装したものがRinda::TupleSpaceです。 Rinda::TupleSpaceの話はまた別の章ですることとします。

これまで実験してきたように、スレッド間の同期メカニズムはそのままdRubyでも 使用可能です。 dRubyであっても、Rubyと同様MonitorやQueueなどを用いて マルチスレッドセーフとするのが一般的です。


*1FIXME
*2果たして本当にそうでしょうか?Hashの[](key)がマルチスレッドセーフであると 信じれば大丈夫そうですが、もし再定義されていたらどうなのでしょう。
*3ifでなくwhileで繰り返すのは、signalでなくbroadcastしているからですね。