Comment detail

アクセスログのIPアドレスを逆引き (Nested Flatten)
Pythonで組んでみました。最初は thread.join() をせず、Queue.Queue.get() でブロックしたままインタプリタを終了していたところ、時々例外が出るのに悩まされました。Queue に None を送って明示的にスレッドを終了させ、join で待つことで解決しました(と思う)。いまいち美しくない気がしますが、しかたないのかな。(with文を使っているので、Python 2.5以降です)
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
from __future__ import with_statement
import collections
import threading
import socket
import Queue
import sys
import re

class Line:
    """One access-log line whose leading address is reverse-resolved.

    All instances share a cache (``_dict``) mapping each address to its
    resolved host name; ``None`` marks an address whose lookup has been
    claimed but not finished yet.  The first ``Line`` created for a given
    address is responsible for resolving it; for every other instance
    ``resolve()`` is a no-op.
    """
    _cond = threading.Condition()  # guards _dict; signals finished lookups
    _dict = {} # addr: host (None while the lookup is still pending)

    def __init__(self, addr, rest):
        """Store the line parts and claim the lookup if *addr* is new.

        addr -- the address text matched at the start of the log line
        rest -- everything after the address, kept verbatim
        """
        self._addr = addr
        self._rest = rest
        # True when this instance must perform the lookup.  A flag is used
        # instead of rebinding self.resolve to a bound method, which would
        # create an instance <-> bound-method reference cycle per line.
        self._owner = False
        with Line._cond:
            if addr not in Line._dict:
                Line._dict[addr] = None  # claim: mark lookup as pending
                self._owner = True

    def resolve(self):
        """Perform the reverse lookup if this instance claimed it; else no-op."""
        if self._owner:
            self._resolve()

    def _resolve(self):
        """Look up the host name for the address and publish it to the cache.

        Falls back to the address itself when the lookup fails with
        ``socket.error``.  The cache entry is filled in a ``finally`` clause
        so that threads waiting in ``output()`` are always released, even if
        an unexpected exception escapes (that exception is still re-raised).
        """
        host = self._addr  # fallback if the lookup does not succeed
        try:
            host = socket.gethostbyaddr(self._addr)[0]
        except socket.error:
            pass  # unresolvable address: print it unchanged
        finally:
            with Line._cond:
                Line._dict[self._addr] = host
                Line._cond.notifyAll()

    def output(self):
        """Block until the address is resolved, then write the line to stdout."""
        with Line._cond:
            while Line._dict[self._addr] is None:
                Line._cond.wait()
            host = Line._dict[self._addr]
        sys.stdout.write(host)
        sys.stdout.write(self._rest)

def read(path="access.log"):
    """Yield a ``Line`` for every line of the access log at *path*.

    Each line must start with a dotted-quad address (four runs of digits
    separated by dots).  The matched prefix becomes the address and the
    remainder, newline included, is kept verbatim.

    Raises ValueError if a line does not start with such an address.
    """
    # Raw strings: '\.' and '\d' in plain literals are invalid escapes.
    r = re.compile(r'^' + r'\.'.join([r'\d+'] * 4))
    with open(path) as f:  # closed even if a bad line raises mid-file
        for line in f:
            m = r.search(line)
            if not m:
                raise ValueError("line should start with IP address")
            yield Line(m.group(0), line[m.end(0):])

def main():
    """Echo access.log to stdout with each leading IP replaced by its host.

    A fixed pool of worker threads resolves addresses taken from a work
    queue.  Output lags up to ``count`` lines behind input, so several
    lookups run concurrently while the original line order is preserved.
    Workers are always shut down (one ``None`` terminator per thread, then
    joined), even if reading the log raises.  Otherwise the non-daemon
    workers would block on ``queue.get()`` forever and the process would hang.
    """
    lines = collections.deque()  # pending lines, newest on the left
    queue = Queue.Queue()        # work items for the resolver threads
    count = 10 # hint: configure thread count here

    def output(count):
        # Print (oldest first) until at most `count` lines remain pending.
        while len(lines) > count:
            lines.pop().output()

    def resolve():
        # Worker loop: resolve queued lines until the terminator arrives.
        while 1:
            line = queue.get()
            if line is None: # terminator
                break
            line.resolve()

    threads = []
    try:
        for _ in xrange(count):
            thread = threading.Thread(target=resolve)
            thread.start()
            threads.append(thread)
        for line in read():
            lines.appendleft(line)
            queue.put(line)
            output(count)
        output(0)
    finally:
        # One terminator per thread that actually started.
        for _ in threads:
            queue.put(None) # terminator
        for thread in threads:
            thread.join()

# Entry point: run main() only when executed as a script, not on import.
if __name__ == '__main__':
    main()
そうか、デーモンスレッドでなければ、それらが終了するまでインタプリタ(プロセス)は終了しないから、join する必要はないはず・・・。
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
--- prev.py     Mon Aug  6 23:55:36 2007
+++ main.py     Mon Aug  6 23:56:02 2007
@@ -55,7 +55,6 @@
             lines.pop().output()
     queue = Queue.Queue()
     count = 10 # hint: configure thread count here
-    threads = []
     for _ in xrange(count):
         def resolve():
             while 1:
@@ -65,7 +64,6 @@
                 line.resolve()
         thread = threading.Thread(target=resolve)
         thread.start()
-        threads.append(thread)
     for line in read():
         lines.appendleft(line)
         queue.put(line)
@@ -73,8 +71,6 @@
     output(0)
     for _ in xrange(count):
         queue.put(None) # terminator
-    for thread in threads:
-        thread.join()

 if __name__ == '__main__':
     main()
スレッドの生成コストが気にならなければ、この方がシンプルですね。
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
from __future__ import with_statement
import collections
import threading
import socket
import sys
import re

# Shared reverse-lookup cache: address -> host name, or None while the
# Resolver thread started for that address has not published a result yet.
cache = {} # addr: host
cond = threading.Condition() # protects `cache'; notified when a lookup finishes

class Resolver(threading.Thread):
    """Thread that reverse-resolves one address into the shared ``cache``.

    On completion it stores the host name in ``cache[addr]``, or the address
    itself if the lookup fails with ``socket.error``, and notifies ``cond``
    so the waiting main thread can print the line.
    """
    def __init__(self, addr):
        threading.Thread.__init__(self)
        self._addr = addr  # address text to look up

    def run(self):
        host = self._addr  # fallback if the lookup does not succeed
        try:
            host = socket.gethostbyaddr(self._addr)[0]
        except socket.error:
            pass  # unresolvable address: print it unchanged
        finally:
            # Always publish a result.  Otherwise the main thread would wait
            # forever for cache[addr] to stop being None.
            with cond:
                cache[self._addr] = host
                cond.notify()

def main():
    """Echo access.log to stdout with each leading IP replaced by its host.

    A ``Resolver`` thread is started for every address not yet in ``cache``.
    Output lags up to 10 lines behind input, so several lookups can run
    concurrently while the original line order is preserved.

    Raises ValueError if a line does not start with an IP address.
    """
    # Built once; raw strings avoid the invalid '\.' / '\d' escapes.
    pattern = re.compile(r'^' + r'\.'.join([r'\d+'] * 4))
    pending = collections.deque()  # (addr, rest) pairs, newest on the left

    def output(count):
        # Print (oldest first) until at most `count` lines remain pending,
        # waiting for each line's lookup to be published.
        while len(pending) > count:
            addr, rest = pending.pop()
            with cond:
                while cache[addr] is None:
                    cond.wait()
                host = cache[addr]
            # Write outside the lock so Resolver threads are not blocked on I/O.
            sys.stdout.write("%s%s" % (host, rest))

    with open("access.log") as log:  # closed even if a bad line raises
        for line in log:
            m = pattern.search(line)
            if not m:
                raise ValueError("line should start with IP address")
            addr, rest = m.group(0), line[m.end(0):]
            pending.appendleft((addr, rest))
            with cond:
                if addr not in cache:
                    cache[addr] = None  # claim: one Resolver per address
                    Resolver(addr).start()
            output(10) # configure this count!
    output(0)

# Entry point: run main() only when executed as a script, not on import.
if __name__ == '__main__':
    main()

Index

Feed

Other

Link

Pathtraq

loading...