Martin Probst's weblog

Wide Finder in Scala

Monday, September 24, 2007, 22:42 (4 comments)

Tim Bray:

In my Finding Things chapter of Beautiful Code, the first complete program is a little Ruby script that reads the ongoing Apache logfile and figures out which articles have been fetched the most. It's a classic example of the culture, born in Awk, perfected in Perl, of getting useful work done by combining regular expressions and hash tables. I want to figure out how to write an equivalent program that runs fast on modern CPUs with low clock rates but many cores; this is the Wide Finder project.

So while it’s probably most sensible to do this with some map/reduce library, I tried implementing it using Scala actors. I’m not a Scala programmer, and have no clue about the Actors library, so this code is probably totally wrong, inefficient etc. But at least I can learn something this way :-)

First the original Ruby script:

counts = {}
counts.default = 0

ARGF.each_line do |line|
  if line =~ %r{GET /ongoing/When/\d\d\dx/(\d\d\d\d/\d\d/\d\d/[^ .]+) }
    counts[$1] += 1
  end
end

keys_by_count = counts.keys.sort { |a, b| counts[b] <=> counts[a] }
keys_by_count[0 .. 9].each do |key|
  puts "#{counts[key]}: #{key}"
end

Converted to Scala, that gives for the serial case:

import java.io.{BufferedReader, FileReader}
import java.util.regex.Pattern
import scala.collection.mutable.HashMap

object SerialAnalyzer extends Application {
  val pattern = Pattern.compile("GET /root/(\\d\\d\\d\\d/\\d\\d/\\d\\d/[^ .]+)")
  val reader = new BufferedReader(new FileReader("/Users/martin/tmp/log"))

  // Count how often each article URI appears in the log.
  val counts = new HashMap[String, Int]
  var line = reader.readLine
  while (line != null) {
    val matcher = pattern.matcher(line)
    if (matcher.find()) {
      val uri = matcher.group(1)
      val count = counts.getOrElse(uri, 0)
      counts(uri) = count + 1
    }
    line = reader.readLine
  }
}

This takes about 1.5 seconds to go through 250 MB of log files on a dual core MacBook Pro 2GHz.

And the version using Actors:

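import java.io.{BufferedReader, FileReader}
import java.util.regex.Pattern
import scala.actors.Actor
import scala.actors.Actor._
import scala.collection.mutable.HashMap

// Message telling a LogMatcher to report its counts and exit.
case object Stop

object Analyzer {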
  def main(args: Array[String]): Unit = {
    val numAnalyzers = if (args.length > 0) Integer.parseInt(args(0)) else 4
    val logreader = new LogReader(numAnalyzers)
    logreader.start
  }
}

class LogReader(numAnalyzers: Int) extends Actor {
  val reader = new BufferedReader(new FileReader("/Users/martin/tmp/log"))
  // Reads the next batch of up to 10,000 lines; entries are null once the file is exhausted.
  def nextBatch = (for (i <- 1 to 10000) yield reader.readLine).toList
  
  val analyzers = (for (i <- 1 to numAnalyzers) yield new LogMatcher).toList
  analyzers.foreach(_.start)
  
  def act = {
    while (reader.ready) analyzers.foreach(_ ! nextBatch)
    analyzers.foreach(_ ! Stop)
    for (analyzer <- analyzers) {
      receive {
        case result: HashMap[String, Int] => print("Done.\n")
      }
    }
    val resultMap = new HashMap[String,Int]
    for (map <- analyzers.map(_.counts); (uri, count) <- map) {
      resultMap(uri) = resultMap.getOrElse(uri, 0) + count
    }
    for (entry <- resultMap) print(entry._1 + ": " + entry._2 + "\n")
  }
}

object LogMatcher {
  val pattern = Pattern.compile("GET /root/(\\d\\d\\d\\d/\\d\\d/\\d\\d/[^ .]+)")
}
class LogMatcher extends Actor {
  val counts = new HashMap[String, Int]
  
  def act = {
    loop {
      react {
        case lines: List[String] =>
          for (line <- lines if line != null) { 
            val matcher = LogMatcher.pattern.matcher(line)
            if (matcher.find()) {
              val uri = matcher.group(1)
              val count = counts.getOrElse(uri, 0) 
              counts(uri) = count + 1
            }
          }
        case Stop => 
          sender ! counts
          exit()
      }
    }
  }
}

The code does work, but sadly the Actors version is not faster than the single-threaded version on my dual core MacBook Pro. No idea why… Also, the program exhibits some sort of memory leak: it seems to keep the whole file in memory, giving OutOfMemoryErrors if you don’t run it with a Java heap big enough for the whole log file. Again, no idea why; I don’t seem to hold on to any nasty pointers.
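One possible explanation for the memory growth, offered only as an unverified assumption: the reader sends batches much faster than the analyzers can consume them, so unbounded mailboxes end up holding most of the file. A minimal Ruby sketch of the usual remedy, a bounded queue that blocks the producer while too many batches are pending, might look like this. The names bounded_count and LOG_PATTERN and all parameters are hypothetical, and because MRI threads share a global lock this illustrates back-pressure only, not a speedup.

```ruby
require "thread"

# Hypothetical constant; mirrors the /root/ pattern used in the Scala code.
LOG_PATTERN = %r{GET /root/(\d\d\d\d/\d\d/\d\d/[^ .]+)}

# Counts URIs with a fixed number of workers fed through a bounded queue.
# `lines` can be any Enumerable of strings, e.g. File.foreach(path).
def bounded_count(lines, workers: 2, batch_size: 1000, max_pending: 4)
  # push blocks once max_pending batches are waiting, throttling the reader
  queue = SizedQueue.new(max_pending)

  threads = Array.new(workers) do
    Thread.new do
      counts = Hash.new(0)              # private to this worker
      while (batch = queue.pop) != :stop
        batch.each do |line|
          m = LOG_PATTERN.match(line)
          counts[m[1]] += 1 if m
        end
      end
      counts                            # becomes the thread's value
    end
  end

  lines.each_slice(batch_size) { |batch| queue.push(batch) }
  workers.times { queue.push(:stop) }   # one stop marker per worker

  # Merge the per-worker results after all workers have finished.
  threads.map(&:value).each_with_object(Hash.new(0)) do |partial, total|
    partial.each { |uri, n| total[uri] += n }
  end
end
```

With max_pending set to 4 and batch_size to 1000, at most a few thousand lines are queued at any moment, regardless of the size of the log.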

So what does this give? Ruby is an elegant language with a nice collections API. Scala is much nicer than Java, but still quite talkative. And I obviously didn’t really get something about the Scala actors…

PS: The Ruby version takes about 20 seconds to go through 270 MB of logs. The serial, no concurrency Scala version takes 18.5 seconds. Simply reading the data line-by-line using Scala takes over 12 seconds.


How long did the Ruby take to execute on your machine?


[...] file into memory. His improved solution eliminates this problem. Martin Probst provides the Scala variant, Santiago Gala uses Python and the [...] already mentioned in the last column


Shortly before I stumbled across this post, I wrote a Scala Actor MapReduce solution. Would you be willing to share your access logs with me? You can fax me an NDA if you’d like ;-). Up until now I’ve been fudging the logs. Would be nice to try my code out on something legit. Perhaps we can compare notes.

Oh, and if I understand your code correctly, LogMatcher’s code for incrementing isn’t threadsafe. I used AtomicInteger and some trickery.


My code has changed since the above version quite significantly, but I think the Actor code here is correct - the counts hashmap exists for each Actor, and within a single Actor only one Thread executes at any time, so there should not be a problem there.
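The argument can be sketched outside of Scala as well. The Ruby below is only an illustration of the idea, not the Actor code itself: each worker counts into a hash that no other worker can see, and the partial results are merged once every worker has finished, so the increments never need a lock. The names PATTERN, count_batch and parallel_count are hypothetical.

```ruby
# Hypothetical constant; mirrors the /root/ pattern used in the Scala code.
PATTERN = %r{GET /root/(\d\d\d\d/\d\d/\d\d/[^ .]+)}

# Counts matching URIs in one batch, using a hash local to this call.
def count_batch(lines)
  counts = Hash.new(0)
  lines.each do |line|
    m = PATTERN.match(line)
    counts[m[1]] += 1 if m
  end
  counts
end

# One thread per batch; every thread owns its own hash while counting.
# Thread#value waits for the thread and returns its result, so the merge
# below runs in the calling thread after all workers are done.
def parallel_count(batches)
  workers = batches.map { |batch| Thread.new { count_batch(batch) } }
  workers.map(&:value).each_with_object(Hash.new(0)) do |partial, total|
    partial.each { |uri, n| total[uri] += n }
  end
end
```

Shared state only appears in the final merge, which happens in a single thread, so nothing like AtomicInteger is needed for the per-worker counters.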

You can find the logs I used at Tim Bray’s site: http://www.tbray.org/tmp/o10k.ap