Add tags

Add tags to the following comment
何を以ってスレッドプールと呼ぶかは微妙ですがHaskellでスレッドプールもどきを
を表現してみました.

11〜17行目でそれらしい型名を導入しています.
Worker というのがひとつのワーカスレッドです.これは IO () という型で,
Haskell 的には,アクションとよばれる値を表わす型です.アクションは実行
されると入出力を行うような値です.

26行目でワーカスレッドプールを作成しています.ここでは,n 個のワーカの
リストをforkIOをつかって「実行」されると別スレッドで動くアクションのリ
ストに変換しています.これがこのリストがスレッドプールを表現していると
考えることができます.

ワーカは mkWorkerを使って作ります.ワーカはワーカ番号とワーカの開始と
終了を知るためのセマフォとタスクを受けとる口とで作ります.ワーカ本体は,
59〜67行目までで定義されているとおり,
・口からタスクを読むアクション(60行目)
・タスクの有無で選択(61行目)
  ・タスクない場合は一旦ほかのスレッドに実行を渡すアクション+自分自身(62行目)
  ・タスクがあった場合は(63行目)
      ・タスクの種類で選択(64行目)
          ・終了マークなら,なにもしないアクション(65行目)
          ・仕事なら,セマフォを掴むアクション
                     +その仕事
                     +セマフォを戻すアクション
                     +自分自身
となっています.
タスクをワーカに渡すのにはMVarを経由します.

細部の説明はきりがないのでここでやめますが,注意点をひとつ.
「実行」と「評価」は別ものです.「実行」は Haskell のプログラムの意味
ににあらわれません.do構文のなかにならんでいる行は,命令型の言語でいう
ところの文(statement)とか命令(command)ではなく,式です.それぞれの式は
「評価」するとアクションとよばれる値を表わしています.
Haskell では do 構文(糖衣)を使ってアクションの並びをひとつのアクション
にまとめます.

ここでは,できるだけ命令型の構文や概念に近くなるように書きました.
正直に白状するとHaskell脳症の頭で考えるのは難しかったです.

より具体的な課題に対してはもっと関数的な Haskell らしい書き方というのが
あるような気がしていますが...


*Main> :main 3
Worker 1 starts at 2007-12-28 10:15:29.973076 UTC
Work 1 starts at 2007-12-28 10:15:29.973809 UTC
Worker 2 starts at 2007-12-28 10:15:29.97446 UTC
Work 2 starts at 2007-12-28 10:15:29.975092 UTC
Worker 3 starts at 2007-12-28 10:15:29.975778 UTC
Work 3 starts at 2007-12-28 10:15:29.976253 UTC
Work 3 ends   at 2007-12-28 10:15:38.984953 UTC
Work 2 ends   at 2007-12-28 10:15:41.985189 UTC
Work 1 ends   at 2007-12-28 10:15:43.984072 UTC
Work 4 starts at 2007-12-28 10:15:43.984574 UTC
Work 6 starts at 2007-12-28 10:15:43.984975 UTC
Work 5 starts at 2007-12-28 10:15:43.985371 UTC
Work 4 ends   at 2007-12-28 10:15:50.993701 UTC
Work 6 ends   at 2007-12-28 10:15:53.994784 UTC
Work 5 ends   at 2007-12-28 10:15:54.995536 UTC
Worker 2 ends   at 2007-12-28 10:15:54.996034 UTC
Worker 3 ends   at 2007-12-28 10:15:54.996436 UTC
Worker 1 ends   at 2007-12-28 10:15:54.996838 UTC
Main thread finished.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
module Main (main) where

import Control.Concurrent
import Data.List
import Data.Time
import System.Environment
import System.IO
import System.Random
import Text.Printf

type Done     = ()      -- ワーカ終了合図
type Work     = IO ()   -- 仕事
type WorkID   = Int     -- 仕事番号
type Task     = (QSemN, Either Done Work) -- タスク(1クールセマフォ,仕事)
type Conn     = MVar Task -- ワーカにタスクを渡す口
type Worker   = IO ()     -- ワーカ
type WorkerID = Int       -- ワーカ番号

main :: IO ()
main = do {
; a:_ <- getArgs
; g <- getStdGen
; let { n = read a; rs = map (10^6*) $ randomRs (5,15) g ; cours = mkcours n rs}
; ps <- workerSems  n -- ワーカの状態を知るためのセマフォ
; cs <- connections n -- メインスレッドからワーカスレッドへの仕事を渡すための口
; mapM_ forkIO $ zipWith3 mkWorker [1..n] ps cs -- ワーカスレッドプール作成
; qn <- newQSemN n                           -- ワークの終了を知るためのセマフォ
; deliver qn cs (map Right $ cours !! 0)     -- 一回目の仕事の割り振り
; waitQSemN qn (2*n);                        -- 一クールの終了待ち
; signalQSemN qn n                           -- セマフォのリセット
; deliver qn cs (map Right $ cours !! 1)     -- 二回目の仕事の割り振り
; waitQSemN qn (2*n);                        -- 一クールの終了待ち
; signalQSemN qn n                           -- セマフォのリセット
; deliver qn cs (replicate n $ Left ())      -- ワーカに終了の合図
; mapM_ waitQSem ps                          -- すべてのワーカの終了を待ち
; hPutStrLn stderr "Main thread finished."
}

workerSems :: Int -> IO [QSem]
workerSems n = mapM (const $ newQSem 1) [1..n]

deliverSem :: Int -> IO QSemN
deliverSem = newQSemN

connections :: Int -> IO [Conn]
connections n = mapM (const newEmptyMVar) [1..n]

mkWorker :: WorkerID -> QSem -> Conn -> Worker
mkWorker wid wq conn 
 = do { waitQSem wq
      ; s <- getCurrentTime
      ; hPutStrLn stderr (printf "Worker %d starts at %s" wid (show s))
      ; worker
      ; e <- getCurrentTime
      ; hPutStrLn stderr (printf "Worker %d ends   at %s" wid (show e))
      ; signalQSem wq
      }
 where 
   worker
    = do { mt <- tryTakeMVar conn
         ; case mt of
             Nothing -> yield >> worker
             Just (q,dw)
               -> case dw of
                    Left  ()   -> return ()
                    Right work -> waitQSemN q 1 >> work >> signalQSemN q 2 >> worker
         }

deliver :: QSemN -> [Conn] -> [Either Done Work] -> IO ()
deliver q cs ws = mapM_ (uncurry putMVar) (zip cs (map ((,) q) ws))

slices :: Int -> [a] -> [[a]]
slices n = unfoldr phi
  where phi [] = Nothing
        phi xs = Just $ splitAt n xs

mkcours :: Int -> [Int] -> [[Work]]
mkcours n = slices n . zipWith mkWork [1..]

type MuSec = Int                   -- 遅延microsec(サンプル用)
mkWork :: WorkID -> MuSec -> Work  -- サンプルの仕事作成
mkWork wid musec
 = do { s <- getCurrentTime 
      ; hPutStrLn stderr (printf "Work %d starts at %s" wid (show s))
      ; threadDelay musec
      ; e <- getCurrentTime
      ; hPutStrLn stderr (printf "Work %d ends   at %s" wid (show e))
      }

Add tags

The input will be splited to tags with space.

Index

Feed

Other

Link

Pathtraq

loading...