Project 3: Concurrent Log Analyzer

Build a Concurrent Log Analyzer

This CLI reads a log file and counts log levels using a worker pool.

file -> producer -> jobs channel -> N workers -> partial counts -> merge -> report

Full main.go

package main

import (
    "bufio"
    "flag"
    "fmt"
    "os"
    "runtime"
    "strings"
    "sync"
)

func worker(lines <-chan string, out chan<- map[string]int, wg *sync.WaitGroup) {
    defer wg.Done()
    local := map[string]int{"INFO": 0, "WARN": 0, "ERROR": 0, "DEBUG": 0}
    for line := range lines {
        for level := range local {
            if strings.Contains(line, level) {
                local[level]++
            }
        }
    }
    out <- local
}

// main wires up a producer -> worker-pool -> aggregator pipeline:
// it streams the input file line by line into a jobs channel, fans out
// to N workers that tally log levels, then merges the per-worker
// tallies and prints a summary report on stdout.
func main() {
	workers := flag.Int("w", runtime.NumCPU(), "worker count")
	flag.Parse()
	if flag.NArg() != 1 {
		// Usage errors belong on stderr (exit code 2 already signals
		// misuse); stdout is reserved for the report.
		fmt.Fprintln(os.Stderr, "usage: log-analyzer [-w N] <file>")
		os.Exit(2)
	}
	// Guard against -w 0 or a negative value: with no consumers the
	// producer deadlocks as soon as the 1024-entry jobs buffer fills.
	if *workers < 1 {
		*workers = 1
	}

	f, err := os.Open(flag.Arg(0))
	if err != nil {
		fmt.Fprintf(os.Stderr, "open failed: %v\n", err)
		os.Exit(1)
	}
	defer f.Close()

	jobs := make(chan string, 1024)
	// Buffer partials to *workers so every worker can deliver its tally
	// without blocking; that lets us wg.Wait() before draining the channel.
	partials := make(chan map[string]int, *workers)
	var wg sync.WaitGroup

	for i := 0; i < *workers; i++ {
		wg.Add(1)
		go worker(jobs, partials, &wg)
	}

	scanner := bufio.NewScanner(f)
	// Default Scanner token limit is ~64 KiB; real-world log lines
	// (stack traces, JSON payloads) can exceed it, so allow up to 1 MiB.
	scanner.Buffer(make([]byte, 0, 64*1024), 1024*1024)
	for scanner.Scan() {
		jobs <- scanner.Text()
	}
	// Close jobs first so workers finish even if scanning failed partway.
	close(jobs)
	if err := scanner.Err(); err != nil {
		fmt.Fprintf(os.Stderr, "scan error: %v\n", err)
		os.Exit(1)
	}

	wg.Wait()
	// All workers have delivered (buffered sends), so closing is safe
	// and lets the merge loop below terminate.
	close(partials)

	total := map[string]int{"INFO": 0, "WARN": 0, "ERROR": 0, "DEBUG": 0}
	for p := range partials {
		for k, v := range p {
			total[k] += v
		}
	}

	fmt.Println("Log Summary")
	fmt.Printf("INFO:  %d\n", total["INFO"])
	fmt.Printf("WARN:  %d\n", total["WARN"])
	fmt.Printf("ERROR: %d\n", total["ERROR"])
	fmt.Printf("DEBUG: %d\n", total["DEBUG"])
}

Run

go run . -w 8 app.log

What to Extend

  1. Add JSON output mode.
  2. Add per-service breakdown.
  3. Add top-N error messages.

Step-by-Step Explanation

  1. Model jobs, workers, and outputs explicitly.
  2. Bound concurrency using worker pools and buffered channels.
  3. Use sync.WaitGroup for lifecycle control.
  4. Aggregate worker results in one place.
  5. Verify behavior under both normal and failure paths.

Code Anatomy

  • Producer pushes jobs into a channel.
  • Workers consume jobs and emit results.
  • Aggregator merges results and prints summary.

Learning Goals

  • Build leak-free goroutine patterns.
  • Balance throughput and resource limits.
  • Understand fan-out/fan-in architecture.