Project 6: Mini Job Runner

Build a Mini Job Runner with Retries
This project builds a small in-process job runner with retries and backoff.
enqueue jobs -> workers pick jobs -> run handler -> retry on failure -> final status
Full main.go
package main
import (
"context"
"errors"
"fmt"
"math/rand"
"sync"
"time"
)
// Job is a unit of work processed by the Runner.
type Job struct {
	ID       int    // unique identifier, echoed in log lines
	Payload  string // opaque task data passed to the handler
	Attempts int    // failed attempts so far; incremented on each retry
}
// Runner is a fixed-size worker pool that executes queued Jobs,
// retrying each failed job up to maxTry total attempts.
type Runner struct {
	workers int      // number of concurrent worker goroutines
	maxTry  int      // maximum attempts per job before it is dead-lettered
	jobs    chan Job // buffered job queue; closed by Stop
}
// NewRunner returns a Runner with the given worker count, per-job
// attempt limit, and buffered queue capacity.
func NewRunner(workers, maxTry, queueSize int) *Runner {
	r := &Runner{
		workers: workers,
		maxTry:  maxTry,
	}
	r.jobs = make(chan Job, queueSize)
	return r
}
// Enqueue adds j to the job queue, blocking while the buffer is full.
// It must not be called after Stop: sending on the closed channel panics.
func (r *Runner) Enqueue(j Job) {
	r.jobs <- j
}
// process simulates running one job: it waits 100ms of "work" (unless
// ctx is cancelled first, in which case ctx.Err() is returned) and then
// fails randomly about one time in three.
func process(ctx context.Context, j Job) error {
	select {
	case <-ctx.Done():
		return ctx.Err()
	case <-time.After(100 * time.Millisecond):
	}
	// Simulate an intermittent failure roughly a third of the time.
	if rand.Intn(3) == 0 {
		return errors.New("temporary failure")
	}
	fmt.Printf("job %d ok payload=%q\n", j.ID, j.Payload)
	return nil
}
// Start launches r.workers goroutines that consume jobs from the queue
// and blocks until all of them exit. A worker exits when ctx is
// cancelled or when the queue is closed (via Stop) and drained.
//
// Failed jobs are retried in-place by the worker that picked them up:
// after a linear backoff of Attempts*200ms the same worker runs the job
// again, up to r.maxTry total attempts, after which the job is
// dead-lettered (logged and dropped).
//
// BUG FIX: the previous implementation re-enqueued retries with
// time.AfterFunc(backoff, func() { r.Enqueue(j) }). Once ctx expired
// and Stop closed r.jobs, any pending timer would send on the closed
// channel and panic; timers firing against a full queue also leaked
// goroutines. Retrying inline with a ctx-aware sleep removes both
// failure modes while keeping the same attempt count, backoff schedule,
// and log output.
func (r *Runner) Start(ctx context.Context) {
	var wg sync.WaitGroup
	for w := 1; w <= r.workers; w++ {
		wg.Add(1)
		go func(workerID int) {
			defer wg.Done()
			for {
				select {
				case <-ctx.Done():
					return
				case j, ok := <-r.jobs:
					if !ok {
						// Queue closed and drained: shut down.
						return
					}
					// Run the job, retrying until success, exhaustion,
					// or cancellation.
					for {
						err := process(ctx, j)
						if err == nil {
							break
						}
						j.Attempts++
						if j.Attempts >= r.maxTry {
							fmt.Printf("worker=%d dead-letter job=%d err=%v\n", workerID, j.ID, err)
							break
						}
						fmt.Printf("worker=%d retry job=%d attempt=%d err=%v\n", workerID, j.ID, j.Attempts, err)
						// Linear backoff; abort promptly on cancellation.
						backoff := time.Duration(j.Attempts*200) * time.Millisecond
						select {
						case <-ctx.Done():
							return
						case <-time.After(backoff):
						}
					}
				}
			}
		}(w)
	}
	wg.Wait()
}
// Stop closes the job queue, letting workers drain any remaining
// buffered jobs and then exit. Call it exactly once, and only after no
// further Enqueue calls can occur: closing twice, or sending after
// close, panics.
func (r *Runner) Stop() {
	close(r.jobs)
}
func main() {
rand.Seed(time.Now().UnixNano())
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
r := NewRunner(4, 4, 64)
for i := 1; i <= 20; i++ {
r.Enqueue(Job{ID: i, Payload: fmt.Sprintf("task-%d", i)})
}
go func() {
<-ctx.Done()
r.Stop()
}()
r.Start(ctx)
}Run
go run .

What to Extend
- Persistent queue storage.
- Dead-letter queue file.
- Metrics endpoint (/metrics) with processing stats.
Step-by-Step Explanation
- Model jobs, workers, and outputs explicitly.
- Bound concurrency using worker pools and buffered channels.
- Use sync.WaitGroup for lifecycle control.
- Aggregate worker results in one place.
- Verify behavior under both normal and failure paths.
Code Anatomy
- Producer pushes jobs into a channel.
- Workers consume jobs and emit results.
- Aggregator merges results and prints summary.
Learning Goals
- Build leak-free goroutine patterns.
- Balance throughput and resource limits.
- Understand fan-out/fan-in architecture.