challenge ワーカスレッドを安全に終了させるまで待機

スレッドプールに複数のワーカスレッドが待機しており、メインスレッドはいつでもワーカスレッドに仕事を渡せるような状態になっているとします。

さて、メインスレッドからスレッドプールにいくつか仕事を与え、メインスレッドは与えた仕事すべてが終了するまで待機し、次の処理に行ってはいけない、というようなコードを書いてください。 #現実に書く機会が多そうなコードですね…。

ここでの仕事の内容は、適当に5秒から15秒の間スレッドをスリープする、というもので結構です。 また、ワーカスレッドのスレッドプール自体の使用を終了するか、または残して再利用するかは問いません。できればコメントにスレッドプールを残したかどうかを書いてください。

Posted feedbacks - Nested

Flatten Hidden
スレッドプールはつかっていなくて、指定した数だけスレッドをフォークしてそれぞれに仕事をさせています。スレッドの終了を待つのにセマフォを使っています。

実行例:
*Main> :main 5
Theread N.o. 4 filinshed
Theread N.o. 5 filinshed
Theread N.o. 3 filinshed
Theread N.o. 1 filinshed
Theread N.o. 2 filinshed
All threads finished
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
import Control.Concurrent
import Control.Concurrent.QSemN
import System.Environment
import System.Random
import Text.Printf

main = do { a:_ <- getArgs
          ; let n = read a
          ; q   <- newQSemN n
          ; g <- getStdGen
          ; let rs = randomRs (5,15) g
          ; mapM_ (forkIO . work q) (zip [1..n] rs)
          ; waitQSemN q n
          ; putStrLn "All threads finished"
          }

work :: QSemN -> (Int, Int) -> IO ()
work q (i,d) =  waitQSemN q 1
             >> threadDelay (d*10^6) 
             >> signalQSemN q 1
             >> printf "Theread N.o. %d filinshed\n" i

マイナス評価をいただいたのを機会に(はじめてじゃないけど...)...

自分ではなかなか自分の書いたプログラムの不具合や不都合,不足に気づけない ので「△△が変だよ」「○○の条件を満たしてないよ」「××がだめじゃん」とか 具体的に指摘していただけると幸いです.

プログラムに不具合があるなら直しておきたいし,改良する手掛かりにもなるし, 今後の研鑽にもなるし,あるいは,後からコードを読む人の注意点にもなるので...

自分で気づいた誤り不具合は直せるのだけど...まだまだ未熟者ゆえ, よろしくお願いします.> みなさま.

マイナスを付けるときに匿名でコメント出来る機能があるといいかも知れませんね。

すみません、マイナスをつけた人です。 そんなに重い意味でつけたのではないのですが・・・

マイナスをつけたのは

「スレッドプールに複数のワーカスレッドが待機しており、メインスレッドはいつでもワーカスレッドに仕事を渡せるような状態になっているとします。

さて、メインスレッドからスレッドプールにいくつか仕事を与え、メインスレッドは与えた仕事すべてが終了するまで待機し、次の処理に行ってはいけない、というようなコードを書いてください。」

というように「スレッドプール」を使うということがお題に明確に書いてあるのに、スレッドプールを使っていないから、という理由でした。スレッドプールにスレッドが待機している、というのもお題に入っているんじゃないかと。そのコード自体がまずいから、という理由ではありません。

ご指摘ありがとうございます.

私の解答には「スレッドプールを使っていない」ことに対する説明(言い訳?)が
ありませんでした.これではいくらなんでも...(気付けよ > 私)

とうわけで改めて,説明(言い訳を(^^;))をばいたします.

Haskellのプログラムには,異る内部状態をもつ同一の計算対象とか,同じ内部
状態をもつ異る計算対象というものが存在しません.計算対象のエクステント
という概念もありません.つまり,計算対象の生成,消滅という概念もありません.

というわけで,Haskellでは「生成されたスレッド」,「タスク実行中のスレッド」,
「アイドル中のスレッド」という計算対象を直接表現できません.そこで,
「再利用するかどうかは問わない」とあるのをこれ幸いに,「再利用しないなら
プールはいらない」とお題を曲解することにして,複数のスレッドの仕事の終了を
待つという部分だけ実装してお茶を濁してしまいました.

Haskellでばかりを使っているので,プログラミングの常識,非常識の感覚が
他の言語のプログラマとずれていたり逆転していたりします.自分の非常識を
確認しておきたいこともありますが,Haskellプログラミングのどんなところを
どんな風にプレゼンテーションしたら楽しんでもらえるか,あるいは,どこが
伝わり難いのかにも大変興味があります.プラスであれマイナスであれ評価が
付いたということは多少とも興味をもってもらったということなので,これ幸い
と「ついでにコメントも下さい」とお願いしてみたしだいです.

お題,回答,評価,コメントというコミュニケーションそのものが楽しめるし,
勉強になるので...

P.S.
私自身は,マイナス評価を付けることはプラス評価を付けるのと同様に気軽に
やればいいと思っていますし,評価を付けるのに正当な理由の説明が必要だとも
思っていません.


逆に、処理系が勝手にネイティブスレッドのプールを作っておいてforkIOした言語上のスレッドを適宜並列実行するってセマンティクスはありなんですよね。そういう実装は無いんでしょうか?

何を以ってスレッドプールと呼ぶかは微妙ですがHaskellでスレッドプールもどきを
を表現してみました.

11〜17行目でそれらしい型名を導入しています.
Worker というのがひとつのワーカスレッドです.これは IO () という型で,
Haskell 的には,アクションとよばれる値を表わす型です.アクションは実行
されると入出力を行うような値です.

26行目でワーカスレッドプールを作成しています.ここでは,n 個のワーカの
リストをforkIOをつかって「実行」されると別スレッドで動くアクションのリ
ストに変換しています.これがこのリストがスレッドプールを表現していると
考えることができます.

ワーカは mkWorkerを使って作ります.ワーカはワーカ番号とワーカの開始と
終了を知るためのセマフォとタスクを受けとる口とで作ります.ワーカ本体は,
59〜67行目までで定義されているとおり,
・口からタスクを読むアクション(60行目)
・タスクの有無で選択(61行目)
  ・タスクない場合は一旦ほかのスレッドに実行を渡すアクション+自分自身(62行目)
  ・タスクがあった場合は(63行目)
      ・タスクの種類で選択(64行目)
          ・終了マークなら,なにもしないアクション(65行目)
          ・仕事なら,セマフォを掴むアクション
                     +その仕事
                     +セマフォを戻すアクション
                     +自分自身
となっています.
タスクをワーカに渡すのにはMVarを経由します.

細部の説明はきりがないのでここでやめますが,注意点をひとつ.
「実行」と「評価」は別ものです.「実行」は Haskell のプログラムの意味
ににあらわれません.do構文のなかにならんでいる行は,命令型の言語でいう
ところの文(statement)とか命令(command)ではなく,式です.それぞれの式は
「評価」するとアクションとよばれる値を表わしています.
Haskell では do 構文(糖衣)を使ってアクションの並びをひとつのアクション
にまとめます.

ここでは,できるだけ命令型の構文や概念に近くなるように書きました.
正直に白状するとHaskell脳症の頭で考えるのは難しかったです.

より具体的な課題に対してはもっと関数的な Haskell らしい書き方というのが
あるような気がしていますが...


*Main> :main 3
Worker 1 starts at 2007-12-28 10:15:29.973076 UTC
Work 1 starts at 2007-12-28 10:15:29.973809 UTC
Worker 2 starts at 2007-12-28 10:15:29.97446 UTC
Work 2 starts at 2007-12-28 10:15:29.975092 UTC
Worker 3 starts at 2007-12-28 10:15:29.975778 UTC
Work 3 starts at 2007-12-28 10:15:29.976253 UTC
Work 3 ends   at 2007-12-28 10:15:38.984953 UTC
Work 2 ends   at 2007-12-28 10:15:41.985189 UTC
Work 1 ends   at 2007-12-28 10:15:43.984072 UTC
Work 4 starts at 2007-12-28 10:15:43.984574 UTC
Work 6 starts at 2007-12-28 10:15:43.984975 UTC
Work 5 starts at 2007-12-28 10:15:43.985371 UTC
Work 4 ends   at 2007-12-28 10:15:50.993701 UTC
Work 6 ends   at 2007-12-28 10:15:53.994784 UTC
Work 5 ends   at 2007-12-28 10:15:54.995536 UTC
Worker 2 ends   at 2007-12-28 10:15:54.996034 UTC
Worker 3 ends   at 2007-12-28 10:15:54.996436 UTC
Worker 1 ends   at 2007-12-28 10:15:54.996838 UTC
Main thread finished.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
module Main (main) where

import Control.Concurrent
import Data.List
import Data.Time
import System.Environment
import System.IO
import System.Random
import Text.Printf

type Done     = ()      -- ワーカ終了合図
type Work     = IO ()   -- 仕事
type WorkID   = Int     -- 仕事番号
type Task     = (QSemN, Either Done Work) -- タスク(1クールセマフォ,仕事)
type Conn     = MVar Task -- ワーカにタスクを渡す口
type Worker   = IO ()     -- ワーカ
type WorkerID = Int       -- ワーカ番号

main :: IO ()
main = do {
; a:_ <- getArgs
; g <- getStdGen
; let { n = read a; rs = map (10^6*) $ randomRs (5,15) g ; cours = mkcours n rs}
; ps <- workerSems  n -- ワーカの状態を知るためのセマフォ
; cs <- connections n -- メインスレッドからワーカスレッドへの仕事を渡すための口
; mapM_ forkIO $ zipWith3 mkWorker [1..n] ps cs -- ワーカスレッドプール作成
; qn <- newQSemN n                           -- ワークの終了を知るためのセマフォ
; deliver qn cs (map Right $ cours !! 0)     -- 一回目の仕事の割り振り
; waitQSemN qn (2*n);                        -- 一クールの終了待ち
; signalQSemN qn n                           -- セマフォのリセット
; deliver qn cs (map Right $ cours !! 1)     -- 二回目の仕事の割り振り
; waitQSemN qn (2*n);                        -- 一クールの終了待ち
; signalQSemN qn n                           -- セマフォのリセット
; deliver qn cs (replicate n $ Left ())      -- ワーカに終了の合図
; mapM_ waitQSem ps                          -- すべてのワーカの終了を待ち
; hPutStrLn stderr "Main thread finished."
}

workerSems :: Int -> IO [QSem]
workerSems n = mapM (const $ newQSem 1) [1..n]

deliverSem :: Int -> IO QSemN
deliverSem = newQSemN

connections :: Int -> IO [Conn]
connections n = mapM (const newEmptyMVar) [1..n]

mkWorker :: WorkerID -> QSem -> Conn -> Worker
mkWorker wid wq conn 
 = do { waitQSem wq
      ; s <- getCurrentTime
      ; hPutStrLn stderr (printf "Worker %d starts at %s" wid (show s))
      ; worker
      ; e <- getCurrentTime
      ; hPutStrLn stderr (printf "Worker %d ends   at %s" wid (show e))
      ; signalQSem wq
      }
 where 
   worker
    = do { mt <- tryTakeMVar conn
         ; case mt of
             Nothing -> yield >> worker
             Just (q,dw)
               -> case dw of
                    Left  ()   -> return ()
                    Right work -> waitQSemN q 1 >> work >> signalQSemN q 2 >> worker
         }

deliver :: QSemN -> [Conn] -> [Either Done Work] -> IO ()
deliver q cs ws = mapM_ (uncurry putMVar) (zip cs (map ((,) q) ws))

slices :: Int -> [a] -> [[a]]
slices n = unfoldr phi
  where phi [] = Nothing
        phi xs = Just $ splitAt n xs

mkcours :: Int -> [Int] -> [[Work]]
mkcours n = slices n . zipWith mkWork [1..]

type MuSec = Int                   -- 遅延microsec(サンプル用)
mkWork :: WorkID -> MuSec -> Work  -- サンプルの仕事作成
mkWork wid musec
 = do { s <- getCurrentTime 
      ; hPutStrLn stderr (printf "Work %d starts at %s" wid (show s))
      ; threadDelay musec
      ; e <- getCurrentTime
      ; hPutStrLn stderr (printf "Work %d ends   at %s" wid (show e))
      }
workerThreadはexecを利用しているので、任意のpythonコードを実行できます。
threadPoolは引数を変更することで、workerThreadの数を変更できます。

現在はthreadPoolがworkerThreadを監視して、タスクが空のやつにタスクを渡していますが、
workerThreadはタスクをリストで持っているので、実はいくらでもタスクを突っ込めます。
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
import threading
import time

class workerThread(threading.Thread):
    def __init__(self):
        self.l = threading.Lock()
        self.workque = []
        threading.Thread.__init__(self)
    
    def run(self):
        while(True):
            self.l.acquire()
            if len(self.workque) == 0:
                self.l.release()
                time.sleep(0.001)
                continue
            
            if self.workque[0] == "exit":
                self.l.release()
                break
            else:
                w = self.workque[0]
                self.workque = self.workque[1:]
                self.l.release()
                exec(w)
                    
    def appendTask(self, task):
        self.l.acquire()
        self.workque.append(task)
        self.l.release()

class threadPool(threading.Thread):
    def __init__(self, n):
        self.l = threading.Lock()
        self.task = []
        self.w = []
        self.isExit = False
        for i in xrange(n):
            self.w.append(workerThread())
            self.w[-1].start()
        threading.Thread.__init__(self)


    def run(self):
        while(True):
            self.l.acquire()
            
            if len(self.task) == 0:
                self.l.release()
                if self.isExit:
                    break
                time.sleep(0.001)
                continue

            isAppend = False
            for w in self.w:
                if len(w.workque) == 0:
                    isAppend = True
                    w.appendTask(self.task[0])
                    self.task = self.task[1:]
                    self.l.release()
                    break

            if isAppend == False:
                self.l.release()
                time.sleep(0.001)

        for w in self.w:
            w.appendTask("exit")
        for w in self.w:
            w.join()

    def appendTask(self, task):
        self.l.acquire()
        self.task.append(task)
        self.l.release()
        
    def exit(self):
        self.l.acquire()
        self.isExit = True
        self.l.release()

tp = threadPool(2)
tp.start()

tp.appendTask("print 'hello world'")
time.sleep(1)
tp.appendTask("print 'sleeping...'\ntime.sleep(10)\nprint 'wake up!'")
tp.appendTask("print 'sleeping...'\ntime.sleep(10)\nprint 'wake up!'")

time.sleep(1)
tp.exit()
tp.join()
C# では Delegate.BeginInvoke を使うとスレッドプール内のスレッドが使われるので、今回はそれを使ってみました。今回は複数タスクを待機するので、AsyncWaitHandle の配列を WaitHandle.WaitAll メソッドに渡しています。いろんな意味であまり好ましくない実装ですが……。
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
using System;
using System.Collections.Generic;
using System.Threading;
static class Program {
    static void Main() {
        Console.WriteLine("start.");
        WaitCallback func = delegate(object obj) {   // 実行されるタスク
            Console.WriteLine("start: {0}", obj);
            Thread.Sleep(new Random().Next(5, 10) * 1000);
            Console.WriteLine("finish: {0}", obj);
        };
        List<WaitHandle> waitHandles = new List<WaitHandle>();
        for(int i = 0; i < 10; i++) {
            waitHandles.Add(func.BeginInvoke(i, null, null).AsyncWaitHandle);
        }
        WaitHandle.WaitAll(waitHandles.ToArray());   // メインスレッドはここで待機
        Console.WriteLine("join.");
    }
}

Javaのconcurrentユーティリティを使用。スレッドプールはpool.shutdown();で終了します。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
import java.util.*;
import java.util.concurrent.*;

class MeApp {
    public static void main(String[] args) throws Exception {
        final ExecutorService pool = Executors.newFixedThreadPool(3);
        try {
            final ArrayList<Future<?> > futures = new ArrayList<Future<?> >();
            final Random random = new Random();
            for (int i = 0; i < 10; ++i) {
                final int id = i;
                final int wait = 5 + random.nextInt(10);
                futures.add(pool.submit(new Runnable() {
                    public void run() {
                        System.out.format("task %d start (%2d sec)\n", id, wait);
                        try {
                            Thread.sleep(wait * 1000);
                        } catch (InterruptedException e) {
                            throw new RuntimeException(e);
                        }
                        System.out.format("task %d done\n", id);
                    }
                }));
            }
            for (Future<?> future : futures) {
                future.get(); // wait for task completion
            }
        } finally {
            pool.shutdown();
        }
        System.out.println("completed");
    }
}

oceanさん作バージョンはFuture#get()を用いて各スレッドの終了を待つような実装でした。 もうひとつの解として、ExecutorService#awaitTermination()を用いて、 スレッドプールの各タスクがすべて安全に終了するまでを待つような実装も投稿します。

(今回のプログラムにてExecutorService#submit() を execute()に変えたのは、Futureを使わないことを明示する以外に特に意味はありません。)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class MeApp2 {
    public static void main(String[] args) throws Exception {
        final ExecutorService pool = Executors.newFixedThreadPool(3);
        try {
            final Random random = new Random();
            for (int i = 0; i < 10; ++i) {
                final int id = i;
                final int wait = 5 + random.nextInt(10);
                pool.execute(new Runnable() {
                    public void run() {
                        System.out.format("task %d start (%2d sec)\n", id, wait);
                        try {
                            Thread.sleep(wait * 1000);
                        } catch (InterruptedException e) {
                            throw new RuntimeException(e);
                        }
                        System.out.format("task %d done\n", id);
                    }
                });
            }
        } finally {
            pool.shutdown();
            //ここで全タスクの終了を待つ
            pool.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
        }
        System.out.println("completed");
    }
}

Queue.Queueを使って排他制御します。Javaのconcurrentに似てますが、Javaのと違ってThreadPool#shutdownがスレッドの終了を待ちます。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
import threading
import random
import Queue
import time

class ThreadPool:
    def __init__(self, count):
        self._queue = Queue.Queue(0) # infinite sized queue
        self._threads = [threading.Thread(target=self._run) for _ in xrange(count)]
        for thread in self._threads:
            thread.start()

    def _run(self):
        while 1:
            func = self._queue.get()
            if func is None:
                break
            func()

    def submit(self, func):
        if func is None:
            raise ValueError("None is not allowed here")
        self._queue.put(func)

    def shutdown(self):
        for thread in self._threads:
            self._queue.put(None) # terminator
        for thread in self._threads:
            thread.join()

def create(id, wait):
    def func():
        print "task %d started (%2d sec)" % (id, wait)
        time.sleep(wait)
        print "task %d done" % id
    return func

def main():
    pool = ThreadPool(3)
    try:
        for id in xrange(10):
            pool.submit(create(id, random.randint(5, 15)))
    finally:
        pool.shutdown()
    print "completed"

if __name__ == '__main__':
    main()
自分が自前で実装した機能のほとんどがPythonの標準でできたのか。 自分が今書いているコードの参考にさせてもらいます。

threadingモジュールにないし発見しにくいんですよね > Queue.Queue

Squeak Smalltalk のスレッドは、Erlang にこそ及びませんが、それでも非常に軽量な部類に属するので、アドホックに作って(#fork)使い捨てて使います(プールして大事に使い回す、という処理も書いて書けなくもないですが…)。

他方で、待ち合わせにはセマフォ(a Semaphore)を用いるのが普通ですが、今回は簡単のため、スレッドセーフなキュー(a SharedQueue)をもって各ワーカスレッドからの出力とそれとの待ち合わせを兼ねることにしました。ばらばらのタイミングでフォークしたワーカスレッドが 5〜15 秒の停止時間順にソートされて終了しているところで動きは見て取れると思います。

例によって copy fixTemps は、Smalltlak 処理系としては古典的な Squeak Smalltalk で、ブロック(無名関数オブジェクト)をクロージャっぽく使うためのおまじないです。今どきの Smalltalk では必要ありません。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
| out nThreads |
out := SharedQueue new.
nThreads := 5.
nThreads timesRepeat: [
    [   | secs |
        out nextPut: {#start. (secs :=  (5 to: 15) atRandom)}.
        (Delay forSeconds: secs) wait.
        out nextPut: {#stop. secs}.
    ] copy fixTemps fork].
World findATranscript: nil.
nThreads * 2 timesRepeat: [Transcript cr; show: out next]

"=> #(#start 8)
    #(#start 15)
    #(#start 14)
    #(#start 6)
    #(#start 10)
    #(#stop 6)
    #(#stop 8)
    #(#stop 10)
    #(#stop 14)
    #(#stop 15) "

GNU Smalltalk に訳してみました。シンプルなのがいいですね。

1
2
3
4
5
6
7
8
9
| out | out := SharedQueue new.
((Smalltalk arguments at: 1 ifAbsent: [ ^'usage: gst 116.st -a <nThread>' displayNl ])
 asNumber timesRepeat: [
  [| secs |
    out nextPut: { #start. secs := Random next * 11 // 1 + 5 }.
    (Delay forSeconds: secs) wait.
    out nextPut: { #stop.  secs }
  ] fork
]) * 2 timesRepeat: [ out next printNl ]!

さらに Rhino で。

1
2
3
4
5
6
7
8
9
(function doukaku116(nThread){
  var out = new java.util.concurrent.ArrayBlockingQueue(nThread * 2);
  for(var i = nThread; i--;) spawn(function(secs){
    out.put('#start '+ (secs = Math.random() * 11 + 5 | 0));
    java.lang.Thread.sleep(secs * 1000);
    out.put('#stop  '+ secs);
  });
  for(i = nThread * 2; i--;) print(out.take());
})(5);

Perlでthreadsを使うためには、-DusethreadsをつけてConfigureする必要がありますが、いちおうそうなっているという前提で。

本質的なのは、

1 while threads->list(threads::running);

だけです。threadsを再利用するか否かは関係なく使えます。

どちらかというと、threadsは使い捨てにした方がコードは書きやすいですね。

Dan the Threaded Perl Monger

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
#!/usr/local/bin/perl
use strict;
use warnings;
use threads;
use Time::Hires qw/sleep/;

my @thr;
for my $i ( 1 .. shift || 10 ) {
    push @thr, threads->create(
        sub {
            my $n = shift;
            my $s = rand(10.0);
            sleep $s;
            warn "$n:slept $s sec.";
        },
        $i
    );
}

1 while threads->list(threads::running);
warn "All Threads Done";

$thr[0]->join();    # on purpose

END {
    $_->is_joinable() and $_->join() for @thr;
}
Common Lispの標準的なスレッドの扱いというものを良く分かっていないのですが、
スレッドプールを使ってスレッドを使い回すようなライブラリは探してみたものの
自分には見付けられませんでした。ということで、Portable-Threadsを利用して、
サブの全スレッドの終了をみとって次の仕事へ…というような内容になっています。
識者のツッコミ大歓迎です。
実行結果:
(spawn-thread 'main #'main
	      (lambda () (subjob (gensym)))
	      (lambda () (subjob (gensym)))
	      (lambda () (subjob (gensym)))
	      (lambda () (subjob (gensym))) )
; =>
;G3233(14 sec.) Start.
;G3236( 6 sec.) Start.
;G3238(12 sec.) Start.
;G3239( 7 sec.) Start.
;G3236( 6 sec.) Stop.
;G3239( 7 sec.) Stop.
;G3238(12 sec.) Stop.
;G3233(14 sec.) Stop.
;All threads finished.
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
(defpackage #:doukaku-116 (:use #:cl #:portable-threads))
(in-package #:doukaku-116)

(defun subjob (name)
  (let ((sleep (+ 5 (random 10))))
    (format t "~&~A(~2D sec.) Start.~%" name sleep)
    (sleep sleep)
    (format t "~&~A(~2D sec.) Stop.~%" name sleep)))

(defun main (&rest jobs)
  (do ((ths (mapcar (lambda (exe) (spawn-thread (gensym) exe)) 
                    jobs)))
      ((notany #'thread-alive-p ths)
       (format t "~&All threads finished.~%"))))

「標準的」というのは私も知りませんです。Allegro CLの場合はmultiprocessingパッケージというのがついてくるので普段はそれを使っています (名前はprocessですが実態はネイティブスレッド)。

スレッドプールについても標準的なものがあるのかどうかは知りません。昔、Allegroのmultiprocessingの上に書いたスレッドプールパッケージがあるんですが、ここにそのまま出せない事情があるので、もし時間が取れれば簡単なバージョンを書いてポストします。 (ほんとはコードを書いてポストしようと思ってたんだけど、時間がきつきつなので…)

コメント頂きありがとうございます。とても参考になりました。自分もAllegro CL試してみたいと思います。 スレッドプールのコードについては、興味津々です。いつかお手隙の際にでもポストして頂けたらと思います。

自前でスレッドプールを実装しました。

val pool = new ThreadPool(4)
pool.submit(() =>{
  Thread.sleep(1000)
  println("10")
})
pool.submit(() =>{
  Thread.sleep(2000)
  println("20")
})
pool.submit(() =>{
  Thread.sleep(5000)
  println("50")
})
pool.submit(() =>{
  Thread.sleep(500)
  println("5")
})
pool.shutdown
println("done")

スレッドプールは残しています。
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import scala.collection.mutable.SynchronizedQueue

implicit def f2r(f:() => unit) = new Runnable{ def run = f() }
class ThreadPool(num:int) {
  type Proc = ()=>unit
  val _queue = new SynchronizedQueue[Option[Proc]]
  val _cs = (1 to num).map{ i => new Thread(() => {
             var flag = true; while(flag){
               var proc:Option[Proc] = Some(()=>{})
               _queue.synchronized {
                 if(!_queue.isEmpty) proc = _queue.dequeue
               }
               proc match {
                 case Some(f) => f()
                 case _ => flag = false
               }
             }
           })}.toList.map{c => c.start;c}

  def submit(f :Proc) = _queue += Some(f)
  def shutdown = {
    _cs.foreach{c => _queue += None }
    _cs.foreach(_.join)
  }
}
題意から
・スレッドプールに複数の仕事を与え
・全ての仕事が終わるまで待機する
ことができれば良い事になります。

これは、ExecutorService#invokeAll を使えば1メソッドで実現できます。

この例では意図的に仕事の数よりスレッドの数を少なく設定しています。

#スレッドプールは再利用可能です。
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import java.util.concurrent.*;
import java.util.*;

public class Sample implements Callable<Void> {
    public static final int NUM_OF_THREADS = 10;
    private static ExecutorService es = Executors.newFixedThreadPool(NUM_OF_THREADS);
    private int sleepTime;

    public static void main(String[] args) throws InterruptedException {
        try {
            List<Callable<Void>> taskList = new ArrayList<Callable<Void>>();
            Random r = new Random();
            for (int i = 0; i < 15; i++) {
                taskList.add(new Sample(5 + r.nextInt(11)));
            }
            es.invokeAll(taskList);
            System.out.println("done.");
        } finally {
            es.shutdown();
        }
    }

    public Sample(int sleepTime) {
        this.sleepTime = sleepTime;
    }

    public Void call() throws Exception {
        Thread.currentThread().sleep(1000 * sleepTime);
        System.out.println