Comment detail

ワーカスレッドを安全に終了させるまで待機 (Nested Flatten)
スレッドプールはつかっていなくて、指定した数だけスレッドをフォークしてそれぞれに仕事をさせています。スレッドの終了を待つのにセマフォを使っています。

実行例:
*Main> :main 5
Theread N.o. 4 filinshed
Theread N.o. 5 filinshed
Theread N.o. 3 filinshed
Theread N.o. 1 filinshed
Theread N.o. 2 filinshed
All threads finished
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
import Control.Concurrent
import Control.Concurrent.QSemN
import System.Environment
import System.Random
import Text.Printf

main = do { a:_ <- getArgs
          ; let n = read a
          ; q   <- newQSemN n
          ; g <- getStdGen
          ; let rs = randomRs (5,15) g
          ; mapM_ (forkIO . work q) (zip [1..n] rs)
          ; waitQSemN q n
          ; putStrLn "All threads finished"
          }

work :: QSemN -> (Int, Int) -> IO ()
work q (i,d) =  waitQSemN q 1
             >> threadDelay (d*10^6) 
             >> signalQSemN q 1
             >> printf "Theread N.o. %d filinshed\n" i

マイナス評価をいただいたのを機会に(はじめてじゃないけど...)...

自分ではなかなか自分の書いたプログラムの不具合や不都合,不足に気づけない ので「△△が変だよ」「○○の条件を満たしてないよ」「××がだめじゃん」とか 具体的に指摘していただけると幸いです.

プログラムに不具合があるなら直しておきたいし,改良する手掛かりにもなるし, 今後の研鑽にもなるし,あるいは,後からコードを読む人の注意点にもなるので...

自分で気づいた誤り不具合は直せるのだけど...まだまだ未熟者ゆえ, よろしくお願いします.> みなさま.

マイナスを付けるときに匿名でコメント出来る機能があるといいかも知れませんね。

すみません、マイナスをつけた人です。 そんなに重い意味でつけたのではないのですが・・・

マイナスをつけたのは

「スレッドプールに複数のワーカスレッドが待機しており、メインスレッドはいつでもワーカスレッドに仕事を渡せるような状態になっているとします。

さて、メインスレッドからスレッドプールにいくつか仕事を与え、メインスレッドは与えた仕事すべてが終了するまで待機し、次の処理に行ってはいけない、というようなコードを書いてください。」

というように「スレッドプール」を使うということがお題に明確に書いてあるのに、スレッドプールを使っていないから、という理由でした。スレッドプールにスレッドが待機している、というのもお題に入っているんじゃないかと。そのコード自体がまずいから、という理由ではありません。

ご指摘ありがとうございます.

私の解答には「スレッドプールを使っていない」ことに対する説明(言い訳?)が
ありませんでした.これではいくらなんでも...(気付けよ > 私)

とうわけで改めて,説明(言い訳を(^^;))をばいたします.

Haskellのプログラムには,異る内部状態をもつ同一の計算対象とか,同じ内部
状態をもつ異る計算対象というものが存在しません.計算対象のエクステント
という概念もありません.つまり,計算対象の生成,消滅という概念もありません.

というわけで,Haskellでは「生成されたスレッド」,「タスク実行中のスレッド」,
「アイドル中のスレッド」という計算対象を直接表現できません.そこで,
「再利用するかどうかは問わない」とあるのをこれ幸いに,「再利用しないなら
プールはいらない」とお題を曲解することにして,複数のスレッドの仕事の終了を
待つという部分だけ実装してお茶を濁してしまいました.

Haskellでばかりを使っているので,プログラミングの常識,非常識の感覚が
他の言語のプログラマとずれていたり逆転していたりします.自分の非常識を
確認しておきたいこともありますが,Haskellプログラミングのどんなところを
どんな風にプレゼンテーションしたら楽しんでもらえるか,あるいは,どこが
伝わり難いのかにも大変興味があります.プラスであれマイナスであれ評価が
付いたということは多少とも興味をもってもらったということなので,これ幸い
と「ついでにコメントも下さい」とお願いしてみたしだいです.

お題,回答,評価,コメントというコミュニケーションそのものが楽しめるし,
勉強になるので...

P.S.
私自身は,マイナス評価を付けることはプラス評価を付けるのと同様に気軽に
やればいいと思っていますし,評価を付けるのに正当な理由の説明が必要だとも
思っていません.


逆に、処理系が勝手にネイティブスレッドのプールを作っておいてforkIOした言語上のスレッドを適宜並列実行するってセマンティクスはありなんですよね。そういう実装は無いんでしょうか?

何を以ってスレッドプールと呼ぶかは微妙ですがHaskellでスレッドプールもどきを
を表現してみました.

11〜17行目でそれらしい型名を導入しています.
Worker というのがひとつのワーカスレッドです.これは IO () という型で,
Haskell 的には,アクションとよばれる値を表わす型です.アクションは実行
されると入出力を行うような値です.

26行目でワーカスレッドプールを作成しています.ここでは,n 個のワーカの
リストをforkIOをつかって「実行」されると別スレッドで動くアクションのリ
ストに変換しています.これがこのリストがスレッドプールを表現していると
考えることができます.

ワーカは mkWorkerを使って作ります.ワーカはワーカ番号とワーカの開始と
終了を知るためのセマフォとタスクを受けとる口とで作ります.ワーカ本体は,
59〜67行目までで定義されているとおり,
・口からタスクを読むアクション(60行目)
・タスクの有無で選択(61行目)
  ・タスクない場合は一旦ほかのスレッドに実行を渡すアクション+自分自身(62行目)
  ・タスクがあった場合は(63行目)
      ・タスクの種類で選択(64行目)
          ・終了マークなら,なにもしないアクション(65行目)
          ・仕事なら,セマフォを掴むアクション
                     +その仕事
                     +セマフォを戻すアクション
                     +自分自身
となっています.
タスクをワーカに渡すのにはMVarを経由します.

細部の説明はきりがないのでここでやめますが,注意点をひとつ.
「実行」と「評価」は別ものです.「実行」は Haskell のプログラムの意味
ににあらわれません.do構文のなかにならんでいる行は,命令型の言語でいう
ところの文(statement)とか命令(command)ではなく,式です.それぞれの式は
「評価」するとアクションとよばれる値を表わしています.
Haskell では do 構文(糖衣)を使ってアクションの並びをひとつのアクション
にまとめます.

ここでは,できるだけ命令型の構文や概念に近くなるように書きました.
正直に白状するとHaskell脳症の頭で考えるのは難しかったです.

より具体的な課題に対してはもっと関数的な Haskell らしい書き方というのが
あるような気がしていますが...


*Main> :main 3
Worker 1 starts at 2007-12-28 10:15:29.973076 UTC
Work 1 starts at 2007-12-28 10:15:29.973809 UTC
Worker 2 starts at 2007-12-28 10:15:29.97446 UTC
Work 2 starts at 2007-12-28 10:15:29.975092 UTC
Worker 3 starts at 2007-12-28 10:15:29.975778 UTC
Work 3 starts at 2007-12-28 10:15:29.976253 UTC
Work 3 ends   at 2007-12-28 10:15:38.984953 UTC
Work 2 ends   at 2007-12-28 10:15:41.985189 UTC
Work 1 ends   at 2007-12-28 10:15:43.984072 UTC
Work 4 starts at 2007-12-28 10:15:43.984574 UTC
Work 6 starts at 2007-12-28 10:15:43.984975 UTC
Work 5 starts at 2007-12-28 10:15:43.985371 UTC
Work 4 ends   at 2007-12-28 10:15:50.993701 UTC
Work 6 ends   at 2007-12-28 10:15:53.994784 UTC
Work 5 ends   at 2007-12-28 10:15:54.995536 UTC
Worker 2 ends   at 2007-12-28 10:15:54.996034 UTC
Worker 3 ends   at 2007-12-28 10:15:54.996436 UTC
Worker 1 ends   at 2007-12-28 10:15:54.996838 UTC
Main thread finished.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
module Main (main) where

import Control.Concurrent
import Data.List
import Data.Time
import System.Environment
import System.IO
import System.Random
import Text.Printf

type Done     = ()      -- ワーカ終了合図
type Work     = IO ()   -- 仕事
type WorkID   = Int     -- 仕事番号
type Task     = (QSemN, Either Done Work) -- タスク(1クールセマフォ,仕事)
type Conn     = MVar Task -- ワーカにタスクを渡す口
type Worker   = IO ()     -- ワーカ
type WorkerID = Int       -- ワーカ番号

main :: IO ()
main = do {
; a:_ <- getArgs
; g <- getStdGen
; let { n = read a; rs = map (10^6*) $ randomRs (5,15) g ; cours = mkcours n rs}
; ps <- workerSems  n -- ワーカの状態を知るためのセマフォ
; cs <- connections n -- メインスレッドからワーカスレッドへの仕事を渡すための口
; mapM_ forkIO $ zipWith3 mkWorker [1..n] ps cs -- ワーカスレッドプール作成
; qn <- newQSemN n                           -- ワークの終了を知るためのセマフォ
; deliver qn cs (map Right $ cours !! 0)     -- 一回目の仕事の割り振り
; waitQSemN qn (2*n);                        -- 一クールの終了待ち
; signalQSemN qn n                           -- セマフォのリセット
; deliver qn cs (map Right $ cours !! 1)     -- 二回目の仕事の割り振り
; waitQSemN qn (2*n);                        -- 一クールの終了待ち
; signalQSemN qn n                           -- セマフォのリセット
; deliver qn cs (replicate n $ Left ())      -- ワーカに終了の合図
; mapM_ waitQSem ps                          -- すべてのワーカの終了を待ち
; hPutStrLn stderr "Main thread finished."
}

workerSems :: Int -> IO [QSem]
workerSems n = mapM (const $ newQSem 1) [1..n]

deliverSem :: Int -> IO QSemN
deliverSem = newQSemN

connections :: Int -> IO [Conn]
connections n = mapM (const newEmptyMVar) [1..n]

mkWorker :: WorkerID -> QSem -> Conn -> Worker
mkWorker wid wq conn 
 = do { waitQSem wq
      ; s <- getCurrentTime
      ; hPutStrLn stderr (printf "Worker %d starts at %s" wid (show s))
      ; worker
      ; e <- getCurrentTime
      ; hPutStrLn stderr (printf "Worker %d ends   at %s" wid (show e))
      ; signalQSem wq
      }
 where 
   worker
    = do { mt <- tryTakeMVar conn
         ; case mt of
             Nothing -> yield >> worker
             Just (q,dw)
               -> case dw of
                    Left  ()   -> return ()
                    Right work -> waitQSemN q 1 >> work >> signalQSemN q 2 >> worker
         }

deliver :: QSemN -> [Conn] -> [Either Done Work] -> IO ()
deliver q cs ws = mapM_ (uncurry putMVar) (zip cs (map ((,) q) ws))

slices :: Int -> [a] -> [[a]]
slices n = unfoldr phi
  where phi [] = Nothing
        phi xs = Just $ splitAt n xs

mkcours :: Int -> [Int] -> [[Work]]
mkcours n = slices n . zipWith mkWork [1..]

type MuSec = Int                   -- 遅延microsec(サンプル用)
mkWork :: WorkID -> MuSec -> Work  -- サンプルの仕事作成
mkWork wid musec
 = do { s <- getCurrentTime 
      ; hPutStrLn stderr (printf "Work %d starts at %s" wid (show s))
      ; threadDelay musec
      ; e <- getCurrentTime
      ; hPutStrLn stderr (printf "Work %d ends   at %s" wid (show e))
      }

Index

Feed

Other

Link

Pathtraq

loading...