Ruby多线程
传统程序有一个单独的线程执行,包含该程序的语句或指令顺序执行直到程序终止。
一个多线程的程序有多个线程的执行。在每个线程是按顺序执行的,但是在多核CPU机器上线程可能并行地执行。例如,通常情况下在单一CPU的机器,多个线程实际上不是并行执行的,而是模拟并行交叉的线程的执行。
Ruby的可以使用 Thread 类很容易地编写多线程程序。 Ruby线程是一个轻量级的和高效的在代码中实现并行性。
创建Ruby线程:
要启动一个新线程,关联一个块通过调用Thread.new。将创建一个新的线程执行的代码块,原始线程将立即从Thread.new返回并继续执行下一个语句:
# Thread #1 is running here Thread.new { # Thread #2 runs this code } # Thread #1 runs this code
例如:
这里是一个例子说明,我们如何能够利用多线程的Ruby的程序。
#!/usr/bin/ruby def func1 i=0 while i<=2 puts "func1 at: #{Time.now}" sleep(2) i=i+1 end end def func2 j=0 while j<=2 puts "func2 at: #{Time.now}" sleep(1) j=j+1 end end puts "Started At #{Time.now}" t1=Thread.new{func1()} t2=Thread.new{func2()} t1.join t2.join puts "End at #{Time.now}"
这将产生以下结果:
Started At Wed May 14 08:21:54 -0700 2008 func1 at: Wed May 14 08:21:54 -0700 2008 func2 at: Wed May 14 08:21:54 -0700 2008 func2 at: Wed May 14 08:21:55 -0700 2008 func1 at: Wed May 14 08:21:56 -0700 2008 func2 at: Wed May 14 08:21:56 -0700 2008 func1 at: Wed May 14 08:21:58 -0700 2008 End at Wed May 14 08:22:00 -0700 2008
线程的生命周期:
创建一个新的线程用 Thread.new。也可以使用了同义词用 Thread.Start 和 Thread.fork。
没有必要启动一个线程在它被创建后,它会自动开始运行时,CPU 资源成为可用。
Thread 类定义了一些方法来查询和处理的线程在运行时。运行一个线程块中的代码调用Thread.new,然后它停止运行。
该块中的最后一个表达式的值是线程的值,可以通过调用 Thread对象值的方法。如果线程运行完成,则该值为线程的返回值。否则,该值方法会阻塞不会返回,直到该线程已完成。
可以等待一个特定的线程通过调用该线程的Thread.Join方法来完成。调用线程将被阻塞,直到给定线程完成。
线程和异常:
如果在主线程中引发一个异常,并没有任何地方处理,Ruby解释器打印一条消息并退出。在主线程以外的其他线程,未处理的异常导致线程停止运行。
如果线程 t 退出,因为未处理的异常,而另一个线程调用t.join或t.value,那么所发生的异常在 t 中提出的线程 s。
如果 Thread.abort_on_exception 为 false,默认情况下,出现未处理的异常只是杀死当前线程和所有其余的继续运行。
如果想在任何线程中的任何未处理的异常导致解释退出中,设置类方法Thread.abort_on_exception 为 true。
t = Thread.new { ... } t.abort_on_exception = true
线程变量:
一个线程可以正常访问是在范围内的任何变量的线程被创建时。一个线程块的局部变量是线程的局部,而不是共享。
Thread类提供一个特殊的功能,允许通过名称来创建和存取线程局部变量。只需把线程对象,如果它是一个Hash,写入元素使用[] =和读取他们带回使用[]。
在这个例子中,每个线程记录计数变量的当前值与该键mycount的一个threadlocal变量。
#!/usr/bin/ruby count = 0 arr = [] 10.times do |i| arr[i] = Thread.new { sleep(rand(0)/10.0) Thread.current["mycount"] = count count += 1 } end arr.each {|t| t.join; print t["mycount"], ", " } puts "count = #{count}"
这将产生下面的结果:
8, 0, 3, 7, 2, 1, 6, 5, 4, 9, count = 10
主线程等待子线程完成,然后打印出每个捕获count的值。
线程优先级:
影响线程调度的第一因素,是线程的优先级:高优先级线程之前计划的低优先级的线程。更确切地说,一个线程将只获得CPU时间,如果没有更高优先级的线程等待运行。
可以设置和查询一个Ruby线程对象的优先级=和优先级的优先级。新创建的线程开始在相同的优先级的线程创建它。启动主线程优先级为0。
没有任何方法设置线程优先级在开始运行前。然而,一个线程可以提高或降低自己的优先级的第一次操作。
线程排斥:
如果两个线程共享访问相同的数据,至少有一个线程修改数据,你必须要特别小心,以确保任何线程都不能看到数据处于不一致的状态。这称为线程排除。
Mutex类是一些共享资源的互斥访问,实现了一个简单的信号锁定。即,只有一个线程可持有的锁在给定时间。其他线程可能选择排队等候的锁变得可用,或者可以简单地选择立即得到错误,表示锁定不可用。
通过将所有访问共享数据的互斥体的控制下,我们确保一致性和原子操作。我们的尝试例子,第一个无需mutax,第二个使用mutax:
无需Mutax的例子:
#!/usr/bin/ruby require 'thread' count1 = count2 = 0 difference = 0 counter = Thread.new do loop do count1 += 1 count2 += 1 end end spy = Thread.new do loop do difference += (count1 - count2).abs end end sleep 1 puts "count1 : #{count1}" puts "count2 : #{count2}" puts "difference : #{difference}"
这将产生以下结果:
count1 : 1583766 count2 : 1583766 difference : 637992
#!/usr/bin/ruby require 'thread' mutex = Mutex.new count1 = count2 = 0 difference = 0 counter = Thread.new do loop do mutex.synchronize do count1 += 1 count2 += 1 end end end spy = Thread.new do loop do mutex.synchronize do difference += (count1 - count2).abs end end end sleep 1 mutex.lock puts "count1 : #{count1}" puts "count2 : #{count2}" puts "difference : #{difference}"
这将产生以下结果:
count1 : 696591 count2 : 696591 difference : 0
处理死锁:
当我们开始使用互斥对象的线程排除,我们必须小心地避免死锁。死锁的情况发生时,所有线程正在等待获取另一个线程持有的资源。因为所有的线程被阻塞,他们不能释放其所持有的锁。因为他们可以不释放锁,其它线程不能获得这些锁。
一个条件变量仅仅是一个信号,与资源相关联,并用于特定互斥锁的保护范围内的。当需要一个资源不可用,等待一个条件变量。这一行动释放相应的互斥锁。当一些其他线程发送信号的资源是可用的,原来的线程来等待,并同时恢复上的锁临界区。
例子:
#!/usr/bin/ruby require 'thread' mutex = Mutex.new cv = ConditionVariable.new a = Thread.new { mutex.synchronize { puts "A: I have critical section, but will wait for cv" cv.wait(mutex) puts "A: I have critical section again! I rule!" } } puts "(Later, back at the ranch...)" b = Thread.new { mutex.synchronize { puts "B: Now I am critical, but am done with cv" cv.signal puts "B: I am still critical, finishing up" } } a.join b.join
这将产生以下结果:
A: I have critical section, but will wait for cv (Later, back at the ranch...) B: Now I am critical, but am done with cv B: I am still critical, finishing up A: I have critical section again! I rule!
线程状态:
有五种可能的返回值对应于下表中所示的5个可能的状态。该的状态方法返回的线程状态。
线程状态 | 返回值 |
---|---|
Runnable | run |
Sleeping | Sleeping |
Aborting | aborting |
Terminated normally | false |
Terminated with exception | nil |
Thread类的方法:
Thread类提供以下方法,它们适用程序的所有线程。这些方法它们使用Thread类的名称来调用,如下所示:
Thread.abort_on_exception = true
这里是所有类方法的完整列表:
SN | 方法描述 |
---|---|
1 |
Thread.abort_on_exception Returns the status of the global abort on exception condition. The default is false. When set to true, will cause all threads to abort (the process will exit(0)) if an exception is raised in any thread. |
2 |
Thread.abort_on_exception= When set to true, all threads will abort if an exception is raised. Returns the new state. |
3 |
Thread.critical Returns the status of the global thread critical condition. |
4 |
Thread.critical= Sets the status of the global thread critical condition and returns it. When set to true, prohibits scheduling of any existing thread. Does not block new threads from being created and run. Certain thread operations (such as stopping or killing a thread, sleeping in the current thread, and raising an exception) may cause a thread to be scheduled even when in a critical section. |
5 |
Thread.current Returns the currently executing thread. |
6 |
Thread.exit Terminates the currently running thread and schedules another thread to be run. If this thread is already marked to be killed, exit returns the Thread. If this is the main thread, or the last thread, exit the process. |
7 |
Thread.fork { block } Synonym for Thread.new . |
8 |
Thread.kill( aThread ) Causes the given aThread to exit |
9 |
Thread.list Returns an array of Thread objects for all threads that are either runnable or stopped. Thread. |
10 |
Thread.main Returns the main thread for the process. |
11 |
Thread.new( [ arg ]* ) {| args | block } Creates a new thread to execute the instructions given in block, and begins running it. Any arguments passed to Thread.new are passed into the block. |
12 |
Thread.pass Invokes the thread scheduler to pass execution to another thread. |
13 |
Thread.start( [ args ]* ) {| args | block } Basically the same as Thread.new . However, if class Thread is subclassed, then calling start in that subclass will not invoke the subclass's initialize method. |
14 |
Thread.stop Stops execution of the current thread, putting it into a sleep state, and schedules execution of another thread. Resets the critical condition to false. |
线程实例方法:
这些方法是适用于一个线程的一个实例。这些方法将被调用,使用一个线程的一个实例如下:
#!/usr/bin/ruby thr = Thread.new do # Calling a class method new puts "In second thread" raise "Raise exception" end thr.join # Calling an instance method join
这里是所有实例方法的完整列表:
SN | 方法及描述 |
---|---|
1 |
thr[ aSymbol ] Attribute Reference - Returns the value of a thread-local variable, using either a symbol or a aSymbol name. If the specified variable does not exist, returns nil. |
2 |
thr[ aSymbol ] = Attribute Assignment - Sets or creates the value of a thread-local variable, using either a symbol or a string. |
3 |
thr.abort_on_exception Returns the status of the abort on exception condition for thr. The default is false. |
4 |
thr.abort_on_exception= When set to true, causes all threads (including the main program) to abort if an exception is raised in thr. The process will effectively exit(0). |
5 |
thr.alive? Returns true if thr is running or sleeping. |
6 |
thr.exit Terminates thr and schedules another thread to be run. If this thread is already marked to be killed, exit returns the Thread. If this is the main thread, or the last thread, exits the process. |
7 |
thr.join The calling thread will suspend execution and run thr. Does not return until thr exits. Any threads not joined will be killed when the main program exits. |
8 |
thr.key? Returns true if the given string (or symbol) exists as a thread-local variable. |
9 |
thr.kill Synonym for Thread.exit . |
10 |
thr.priority Returns the priority of thr. Default is zero; higher-priority threads will run before lower priority threads. |
11 |
thr.priority= Sets the priority of thr to an Integer. Higher-priority threads will run before lower priority threads. |
12 |
thr.raise( anException ) Raises an exception from thr. The caller does not have to be thr. |
13 |
thr.run Wakes up thr, making it eligible for scheduling. If not in a critical section, then invokes the scheduler. |
14 |
thr.safe_level Returns the safe level in effect for thr. |
15 |
thr.status Returns the status of thr: sleep if thr is sleeping or waiting on I/O, run if thr is executing, false if thr terminated normally, and nil if thr terminated with an exception. |
16 |
thr.stop? Returns true if thr is dead or sleeping. |
17 |
thr.value Waits for thr to complete via Thread.join and returns its value. |
18 |
thr.wakeup Marks thr as eligible for scheduling, it may still remain blocked on I/O, however. |