Comment detail

ワーカスレッドを安全に終了させるまで待機 (Nested Flatten)

This comment is reply for 4986 shiro: 逆に、処理系が勝手にネイティブスレッドの...(ワーカスレッドを安全に終了させるまで待機). Go to thread root.

何を以ってスレッドプールと呼ぶかは微妙ですがHaskellでスレッドプールもどきを
を表現してみました.

11〜17行目でそれらしい型名を導入しています.
Worker というのがひとつのワーカスレッドです.これは IO () という型で,
Haskell 的には,アクションとよばれる値を表わす型です.アクションは実行
されると入出力を行うような値です.

26行目でワーカスレッドプールを作成しています.ここでは,n 個のワーカの
リストをforkIOをつかって「実行」されると別スレッドで動くアクションのリ
ストに変換しています.これがこのリストがスレッドプールを表現していると
考えることができます.

ワーカは mkWorkerを使って作ります.ワーカはワーカ番号とワーカの開始と
終了を知るためのセマフォとタスクを受けとる口とで作ります.ワーカ本体は,
59〜67行目までで定義されているとおり,
・口からタスクを読むアクション(60行目)
・タスクの有無で選択(61行目)
  ・タスクない場合は一旦ほかのスレッドに実行を渡すアクション+自分自身(62行目)
  ・タスクがあった場合は(63行目)
      ・タスクの種類で選択(64行目)
          ・終了マークなら,なにもしないアクション(65行目)
          ・仕事なら,セマフォを掴むアクション
                     +その仕事
                     +セマフォを戻すアクション
                     +自分自身
となっています.
タスクをワーカに渡すのにはMVarを経由します.

細部の説明はきりがないのでここでやめますが,注意点をひとつ.
「実行」と「評価」は別ものです.「実行」は Haskell のプログラムの意味
ににあらわれません.do構文のなかにならんでいる行は,命令型の言語でいう
ところの文(statement)とか命令(command)ではなく,式です.それぞれの式は
「評価」するとアクションとよばれる値を表わしています.
Haskell では do 構文(糖衣)を使ってアクションの並びをひとつのアクション
にまとめます.

ここでは,できるだけ命令型の構文や概念に近くなるように書きました.
正直に白状するとHaskell脳症の頭で考えるのは難しかったです.

より具体的な課題に対してはもっと関数的な Haskell らしい書き方というのが
あるような気がしていますが...


*Main> :main 3
Worker 1 starts at 2007-12-28 10:15:29.973076 UTC
Work 1 starts at 2007-12-28 10:15:29.973809 UTC
Worker 2 starts at 2007-12-28 10:15:29.97446 UTC
Work 2 starts at 2007-12-28 10:15:29.975092 UTC
Worker 3 starts at 2007-12-28 10:15:29.975778 UTC
Work 3 starts at 2007-12-28 10:15:29.976253 UTC
Work 3 ends   at 2007-12-28 10:15:38.984953 UTC
Work 2 ends   at 2007-12-28 10:15:41.985189 UTC
Work 1 ends   at 2007-12-28 10:15:43.984072 UTC
Work 4 starts at 2007-12-28 10:15:43.984574 UTC
Work 6 starts at 2007-12-28 10:15:43.984975 UTC
Work 5 starts at 2007-12-28 10:15:43.985371 UTC
Work 4 ends   at 2007-12-28 10:15:50.993701 UTC
Work 6 ends   at 2007-12-28 10:15:53.994784 UTC
Work 5 ends   at 2007-12-28 10:15:54.995536 UTC
Worker 2 ends   at 2007-12-28 10:15:54.996034 UTC
Worker 3 ends   at 2007-12-28 10:15:54.996436 UTC
Worker 1 ends   at 2007-12-28 10:15:54.996838 UTC
Main thread finished.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
module Main (main) where

import Control.Concurrent
import Data.List
import Data.Time
import System.Environment
import System.IO
import System.Random
import Text.Printf

type Done     = ()      -- ワーカ終了合図
type Work     = IO ()   -- 仕事
type WorkID   = Int     -- 仕事番号
type Task     = (QSemN, Either Done Work) -- タスク(1クールセマフォ,仕事)
type Conn     = MVar Task -- ワーカにタスクを渡す口
type Worker   = IO ()     -- ワーカ
type WorkerID = Int       -- ワーカ番号

main :: IO ()
main = do {
; a:_ <- getArgs
; g <- getStdGen
; let { n = read a; rs = map (10^6*) $ randomRs (5,15) g ; cours = mkcours n rs}
; ps <- workerSems  n -- ワーカの状態を知るためのセマフォ
; cs <- connections n -- メインスレッドからワーカスレッドへの仕事を渡すための口
; mapM_ forkIO $ zipWith3 mkWorker [1..n] ps cs -- ワーカスレッドプール作成
; qn <- newQSemN n                           -- ワークの終了を知るためのセマフォ
; deliver qn cs (map Right $ cours !! 0)     -- 一回目の仕事の割り振り
; waitQSemN qn (2*n);                        -- 一クールの終了待ち
; signalQSemN qn n                           -- セマフォのリセット
; deliver qn cs (map Right $ cours !! 1)     -- 二回目の仕事の割り振り
; waitQSemN qn (2*n);                        -- 一クールの終了待ち
; signalQSemN qn n                           -- セマフォのリセット
; deliver qn cs (replicate n $ Left ())      -- ワーカに終了の合図
; mapM_ waitQSem ps                          -- すべてのワーカの終了を待ち
; hPutStrLn stderr "Main thread finished."
}

workerSems :: Int -> IO [QSem]
workerSems n = mapM (const $ newQSem 1) [1..n]

deliverSem :: Int -> IO QSemN
deliverSem = newQSemN

connections :: Int -> IO [Conn]
connections n = mapM (const newEmptyMVar) [1..n]

mkWorker :: WorkerID -> QSem -> Conn -> Worker
mkWorker wid wq conn 
 = do { waitQSem wq
      ; s <- getCurrentTime
      ; hPutStrLn stderr (printf "Worker %d starts at %s" wid (show s))
      ; worker
      ; e <- getCurrentTime
      ; hPutStrLn stderr (printf "Worker %d ends   at %s" wid (show e))
      ; signalQSem wq
      }
 where 
   worker
    = do { mt <- tryTakeMVar conn
         ; case mt of
             Nothing -> yield >> worker
             Just (q,dw)
               -> case dw of
                    Left  ()   -> return ()
                    Right work -> waitQSemN q 1 >> work >> signalQSemN q 2 >> worker
         }

deliver :: QSemN -> [Conn] -> [Either Done Work] -> IO ()
deliver q cs ws = mapM_ (uncurry putMVar) (zip cs (map ((,) q) ws))

slices :: Int -> [a] -> [[a]]
slices n = unfoldr phi
  where phi [] = Nothing
        phi xs = Just $ splitAt n xs

mkcours :: Int -> [Int] -> [[Work]]
mkcours n = slices n . zipWith mkWork [1..]

type MuSec = Int                   -- 遅延microsec(サンプル用)
mkWork :: WorkID -> MuSec -> Work  -- サンプルの仕事作成
mkWork wid musec
 = do { s <- getCurrentTime 
      ; hPutStrLn stderr (printf "Work %d starts at %s" wid (show s))
      ; threadDelay musec
      ; e <- getCurrentTime
      ; hPutStrLn stderr (printf "Work %d ends   at %s" wid (show e))
      }

Index

Feed

Other

Link

Pathtraq

loading...