Project 6: Mini Job Runner

Build a Mini Job Runner with Retries

This project builds a small in-process job runner with retries and backoff.

enqueue jobs -> workers pick jobs -> run handler -> retry on failure -> final status

Full main.go

package main

import (
    "context"
    "errors"
    "fmt"
    "math/rand"
    "sync"
    "time"
)

// Job is a single unit of work queued for execution by the Runner.
type Job struct {
    ID       int    // unique job identifier, assigned by the producer
    Payload  string // opaque work description printed by the handler
    Attempts int    // failed attempts so far; zero for a fresh job
}

// Runner is an in-process job queue backed by a fixed-size worker
// pool, with bounded retries per job.
type Runner struct {
    workers int      // number of concurrent worker goroutines
    maxTry  int      // maximum total attempts per job before dead-lettering
    jobs    chan Job // buffered job queue; closed by Stop
}

// NewRunner returns a Runner with the given worker count, per-job
// attempt limit, and job-queue capacity.
func NewRunner(workers, maxTry, queueSize int) *Runner {
    r := &Runner{workers: workers, maxTry: maxTry}
    r.jobs = make(chan Job, queueSize)
    return r
}

// Enqueue adds a job to the queue, blocking while the buffer is full.
// It must not be called after Stop has closed the channel: a send on
// the closed channel panics.
func (r *Runner) Enqueue(j Job) {
    r.jobs <- j
}

// process runs one job, honoring context cancellation. It waits
// 100ms to simulate work, then fails roughly one time in three to
// exercise the retry path; on success it prints the job's payload.
func process(ctx context.Context, j Job) error {
    work := time.After(100 * time.Millisecond)
    select {
    case <-ctx.Done():
        return ctx.Err()
    case <-work:
    }
    // simulate intermittent failure
    if rand.Intn(3) == 0 {
        return errors.New("temporary failure")
    }
    fmt.Printf("job %d ok payload=%q\n", j.ID, j.Payload)
    return nil
}

// Start launches the worker pool and blocks until every worker has
// exited — either because ctx was cancelled or because the job channel
// was closed via Stop and drained.
func (r *Runner) Start(ctx context.Context) {
    var wg sync.WaitGroup
    for w := 1; w <= r.workers; w++ {
        wg.Add(1)
        go func(workerID int) {
            defer wg.Done()
            for {
                select {
                case <-ctx.Done():
                    return
                case j, ok := <-r.jobs:
                    if !ok {
                        // Stop closed the queue; nothing left to do.
                        return
                    }
                    r.run(ctx, workerID, j)
                }
            }
        }(w)
    }

    wg.Wait()
}

// run executes a single job, retrying in place with linear backoff
// (attempts*200ms) until it succeeds, exhausts r.maxTry attempts, or
// ctx is cancelled.
//
// Retrying inside the worker — rather than re-enqueueing from a
// time.AfterFunc callback, as an earlier version did — fixes a crash:
// a timer firing after Stop() has closed r.jobs would send on a closed
// channel and panic. The trade-off is that a worker is occupied for the
// duration of a job's backoff.
func (r *Runner) run(ctx context.Context, workerID int, j Job) {
    for {
        err := process(ctx, j)
        if err == nil {
            return
        }

        j.Attempts++
        if j.Attempts >= r.maxTry {
            fmt.Printf("worker=%d dead-letter job=%d err=%v\n", workerID, j.ID, err)
            return
        }
        fmt.Printf("worker=%d retry job=%d attempt=%d err=%v\n", workerID, j.ID, j.Attempts, err)

        backoff := time.Duration(j.Attempts*200) * time.Millisecond
        select {
        case <-ctx.Done():
            // Give up on the retry if the runner is shutting down.
            return
        case <-time.After(backoff):
        }
    }
}

// Stop closes the job channel, signaling workers to exit once the
// queue drains. Enqueue must not be called after Stop: sending on the
// closed channel panics.
func (r *Runner) Stop() {
    close(r.jobs)
}

// main seeds the RNG, enqueues a batch of jobs into the runner, and
// runs the worker pool until the 5-second deadline expires.
func main() {
    rand.Seed(time.Now().UnixNano())
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()

    const jobCount = 20
    runner := NewRunner(4, 4, 64)
    for id := 1; id <= jobCount; id++ {
        payload := fmt.Sprintf("task-%d", id)
        runner.Enqueue(Job{ID: id, Payload: payload})
    }

    // Close the queue once the deadline fires so idle workers exit.
    go func() {
        <-ctx.Done()
        runner.Stop()
    }()

    runner.Start(ctx)
}

Run

go run .

What to Extend

  1. Persistent queue storage.
  2. Dead-letter queue file.
  3. Metrics endpoint (/metrics) with processing stats.

Step-by-Step Explanation

  1. Model jobs, workers, and the retry policy explicitly.
  2. Bound concurrency using a worker pool and a buffered channel.
  3. Use sync.WaitGroup to wait for all workers to exit.
  4. Retry failed jobs with backoff, up to a maximum attempt count.
  5. Verify behavior under both normal and failure paths.

Code Anatomy

  • The producer pushes jobs into a buffered channel.
  • Workers consume jobs, run the handler, and retry failures with backoff.
  • Jobs that exhaust their attempt budget are reported as dead-lettered.

Learning Goals

  • Build leak-free goroutine patterns.
  • Balance throughput and resource limits.
  • Understand fan-out/fan-in architecture.