1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
// Script settings.
// concurrency: size passed to threadPool() for the DNS lookups.
// n:           number of log lines submitted per batch before results are printed.
// inputfile / outputfile: paths of the log to read and the rewritten log to create.
concurrency = 100
n = 1000
inputfile = "access_log"
outputfile = "access_log_out"

// Waits for the first `count` pending tasks in `tasks`, in submission order,
// and prints each resolved pair. Does nothing when count is 0.
function printResolved(tasks, count){
  if (count > 0){
    for (j: 0..count-1){
      // Calling the async handle yields the [host, rest-of-line] pair.
      r0, r1 = tasks[j]()
      println(r0, r1)
    }
  }
}

// Reads log lines from `input`, replaces the text before the first space of
// each line with the result of a cached reverseDns lookup, and prints the
// rewritten lines in their original order.
//
// Lookups are submitted to a thread pool of `concurrency` workers in batches
// of `n` lines; each batch is printed once it is full, and the final,
// possibly partial, batch is printed after the input is exhausted.
//
// NOTE(review): a line containing no space gives idx == -1; the slicing below
// is not guarded for that case -- confirm the input never contains such lines.
function processLog(input){
  nameservice = LRUcache(10000, reverseDns)
  pool = threadPool(concurrency)
  t = Object[n]
  i = 0
  for (line: readLines(input)){
    idx = line.indexOf(' ')
    // The outer closure is applied immediately with the current line/idx and
    // returns the zero-argument task that is handed to async(); presumably
    // this pins this iteration's values for the task -- TODO confirm.
    t[i] = async({line, idx -> {->[nameservice[line[0..idx-1]], line[idx..]]}}(line, idx), pool)
    if (++i >= n){
      printResolved(t, i)
      i = 0
    }
  }
  // Flush whatever is left when the line count is not a multiple of n;
  // without this the tail of the log would never be printed.
  printResolved(t, i)
}

import org.xbill.DNS.*
// Resolves `hostIp` to a host name via a DNS PTR query.
//
// Returns the rdata string of the first PTR answer, or `hostIp` unchanged
// when the lookup yields no answers or when `hostIp` cannot be turned into a
// reverse-map name (ReverseMap.fromAddress rejects strings that are not
// literal IP addresses). Falling back keeps one odd log field from aborting
// the whole run.
function reverseDns(hostIp){
  try {
    answers = Lookup(ReverseMap.fromAddress(hostIp), Type.PTR, DClass.IN).run()
    return (answers == null || answers.length == 0) ? hostIp : answers[0].rdataToString()
  } catch (Exception e) {
    // Best effort: leave the field as it was written in the log.
    return hostIp
  }
}

// Script driver: open the input and output files with ASCII encoding,
// make `output` the context writer, then run processLog over the input.
input = reader(inputfile, "ASCII")
output = writer(outputfile, "ASCII")
// Both streams are closed from a shutdown hook; nothing closes them
// explicitly after processLog returns.
addShutdownHook({->output.close(); input.close()})
// Installs `output` as the context's writer; presumably this is where the
// println() calls made during processLog end up -- TODO confirm.
getContext().setWriter(output) 
processLog(input)