Pipelines & Complex Patterns

Pipelines and sync.Cond

Beyond simple worker pools, Go concurrency shines in data pipelines (streaming data through stages) and signaling (coordinating many goroutines around a shared event).

1. The Pipeline Pattern (Fan-Out / Fan-In)

A pipeline processes a stream of data through a series of stages. Each stage runs concurrently in its own goroutine and is connected to the next stage by a channel.

Stages:

  1. Generator: Converts data (file lines, DB rows) into a channel.
  2. Transformer: Reads one channel, modifies/filters, writes to another.
  3. Sink: Consumes final output.

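// Generator emits each of nums on the returned channel, then closes it.
func Generator(nums ...int) <-chan int {
    out := make(chan int)
    go func() {
        for _, n := range nums {
            out <- n
        }
        close(out) // tells downstream stages that no more values will arrive
    }()
    return out
}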

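// Square reads values from in, squares each one, and emits the result.
func Square(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        for n := range in { // the loop ends when in is closed
            out <- n * n
        }
        close(out)
    }()
    return out
}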

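// Merge fans in any number of input channels onto a single output channel.
// It uses the standard library's sync package (import "sync").
func Merge(channels ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    out := make(chan int)

    // output forwards every value from c to out until c is closed.
    output := func(c <-chan int) {
        defer wg.Done()
        for n := range c {
            out <- n
        }
    }

    wg.Add(len(channels))
    for _, c := range channels {
        go output(c)
    }

    // Close out only after every forwarding goroutine has finished;
    // closing earlier would make a pending send panic.
    go func() {
        wg.Wait()
        close(out)
    }()
    return out
}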
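
The stages above compose by handing one stage's output channel to the next. The following is a minimal sketch of fan-out and fan-in, not a definitive implementation: it repeats the three stages so that it compiles on its own, and the helper sumOfSquares and its workers parameter are hypothetical names introduced only for illustration.

```go
package main

import (
    "fmt"
    "sync"
)

// Generator, Square, and Merge repeat the stages from the text.
func Generator(nums ...int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for _, n := range nums {
            out <- n
        }
    }()
    return out
}

func Square(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for n := range in {
            out <- n * n
        }
    }()
    return out
}

func Merge(channels ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    out := make(chan int)
    wg.Add(len(channels))
    for _, c := range channels {
        go func(c <-chan int) {
            defer wg.Done()
            for n := range c {
                out <- n
            }
        }(c)
    }
    go func() {
        wg.Wait()
        close(out)
    }()
    return out
}

// sumOfSquares is a hypothetical helper: it fans out to `workers` Square
// stages that all read from one source channel, merges their outputs,
// and sums the merged stream. It assumes workers >= 1.
func sumOfSquares(workers int, nums ...int) int {
    src := Generator(nums...)
    stages := make([]<-chan int, workers)
    for i := range stages {
        stages[i] = Square(src) // fan-out: each value goes to exactly one worker
    }
    total := 0
    for v := range Merge(stages...) { // fan-in: arrival order is not guaranteed
        total += v
    }
    return total
}

func main() {
    fmt.Println(sumOfSquares(2, 1, 2, 3, 4)) // 1 + 4 + 9 + 16 = 30
}
```

The sketch sums the results because a sum does not depend on arrival order; once several workers feed Merge, the merged stream can arrive in any interleaving.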

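Cancellation: Real pipelines must pass context.Context to every stage to support early cancellation (e.g., stop processing if the user disconnects). Without it, a stage whose consumer has stopped reading blocks forever on its next send, and its goroutine leaks.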
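
One way to make a stage cancellable is to wrap every send in a select that also watches ctx.Done(). The sketch below assumes that approach; GeneratorCtx, SquareCtx, and firstSquares are hypothetical names for illustration and do not appear in the original code.

```go
package main

import (
    "context"
    "fmt"
)

// GeneratorCtx emits nums until they run out or ctx is cancelled.
func GeneratorCtx(ctx context.Context, nums ...int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for _, n := range nums {
            select {
            case out <- n:
            case <-ctx.Done():
                return // consumer is gone; stop instead of blocking forever
            }
        }
    }()
    return out
}

// SquareCtx squares values from in until in is closed or ctx is cancelled.
func SquareCtx(ctx context.Context, in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for n := range in {
            select {
            case out <- n * n:
            case <-ctx.Done():
                return
            }
        }
    }()
    return out
}

// firstSquares reads at most limit results (limit is assumed to be >= 1)
// and then cancels the rest of the pipeline.
func firstSquares(limit int, nums ...int) []int {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel() // unblocks both stage goroutines when we return early

    var got []int
    for v := range SquareCtx(ctx, GeneratorCtx(ctx, nums...)) {
        got = append(got, v)
        if len(got) >= limit {
            break
        }
    }
    return got
}

func main() {
    fmt.Println(firstSquares(2, 1, 2, 3, 4, 5)) // [1 4]
}
```

The deferred cancel is what prevents the leak: after the early break, both stages are blocked on a send that nobody will receive, and the ctx.Done() case lets them return.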

2. The Overlooked sync.Cond

Channels are for passing data. Mutexes are for locking data. sync.Cond is for broadcasting signals.

Use Case: You have 100 goroutines waiting for a specific event (e.g., “Configuration Loaded” or “Queue is not empty”).

  • Channels: Sending one message per waiter takes 100 sends, which is O(N).
  • Channels (Close): Closing a channel broadcasts to all waiters, but a channel can be closed only once.
  • sync.Cond: Can Broadcast() multiple times.
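
The close-as-broadcast option and its one-shot limit can be sketched as follows. The helper releaseAll is a hypothetical name used only for this illustration.

```go
package main

import (
    "fmt"
    "sync"
)

// releaseAll parks n goroutines on a single channel, releases all of them
// with one close, and returns how many of them woke up.
func releaseAll(n int) int {
    ready := make(chan struct{})
    var wg sync.WaitGroup
    var mu sync.Mutex
    woken := 0

    for i := 0; i < n; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            <-ready // a receive on a closed channel returns immediately
            mu.Lock()
            woken++
            mu.Unlock()
        }()
    }

    close(ready) // a single operation, regardless of n
    // Calling close(ready) a second time would panic, so this signal
    // cannot be repeated on the same channel.
    wg.Wait()
    return woken
}

func main() {
    fmt.Println(releaseAll(100)) // 100
}
```

This works well for events that happen once, such as “Configuration Loaded”. An event that recurs needs either a fresh channel for every round or a condition variable.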

Example: The Race Start

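package main

import (
    "fmt"
    "sync"
    "time"
)

func main() {
    var mu sync.Mutex
    cond := sync.NewCond(&mu)
    ready := false

    var wg sync.WaitGroup // keeps main alive until every worker has printed

    // Workers
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            mu.Lock()
            for !ready {
                cond.Wait() // Atomically unlocks mu and suspends; relocks mu before returning
            }
            mu.Unlock()
            fmt.Println("Worker", id, "started")
        }(i)
    }

    // Coordinator
    time.Sleep(1 * time.Second)
    mu.Lock()
    ready = true
    mu.Unlock()
    cond.Broadcast() // Wakes every goroutine currently waiting on cond

    wg.Wait()
}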

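Warning: cond.Wait() must be called in a loop that rechecks the condition. In Go, Wait returns only after a Broadcast() or Signal(), but the mutex is not held while the goroutine is parked, so another goroutine may have changed the condition again by the time Wait reacquires the lock.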
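
For an event that recurs, such as the “Queue is not empty” case mentioned earlier, the same wait-in-a-loop shape applies. Below is a minimal sketch assuming an unbounded FIFO queue; the Queue type, its methods, and the consumeSum helper are hypothetical names introduced for illustration.

```go
package main

import (
    "fmt"
    "sync"
)

// Queue is a hypothetical unbounded FIFO guarded by a mutex and a
// condition variable that is signaled whenever an item is added.
type Queue struct {
    mu       sync.Mutex
    notEmpty *sync.Cond
    items    []int
}

func NewQueue() *Queue {
    q := &Queue{}
    q.notEmpty = sync.NewCond(&q.mu)
    return q
}

// Put appends v and wakes one waiting consumer, if there is one.
func (q *Queue) Put(v int) {
    q.mu.Lock()
    q.items = append(q.items, v)
    q.mu.Unlock()
    q.notEmpty.Signal() // the event fires again on every Put
}

// Get blocks until an item is available, then removes and returns it.
func (q *Queue) Get() int {
    q.mu.Lock()
    defer q.mu.Unlock()
    for len(q.items) == 0 { // recheck: another consumer may have taken the item
        q.notEmpty.Wait()
    }
    v := q.items[0]
    q.items = q.items[1:]
    return v
}

// consumeSum starts n consumers that each take one item, puts the values
// 1..n, and returns the sum of everything the consumers received.
func consumeSum(n int) int {
    q := NewQueue()
    results := make(chan int, n)
    var wg sync.WaitGroup
    for i := 0; i < n; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            results <- q.Get()
        }()
    }
    for v := 1; v <= n; v++ {
        q.Put(v)
    }
    wg.Wait()
    close(results)

    sum := 0
    for v := range results {
        sum += v
    }
    return sum
}

func main() {
    fmt.Println(consumeSum(10)) // 1 + 2 + ... + 10 = 55
}
```

Signal() is used here because each Put makes exactly one item available; Broadcast() would also be correct, since every woken consumer rechecks the condition, but it would wake goroutines that immediately go back to waiting.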

Summary

  • Pipelines: Composable, streaming architecture. Great for ETL jobs.
  • Fan-In: Merging multiple concurrent streams.
  • sync.Cond: For “One-to-Many” signaling where the event can happen multiple times (unlike close(ch) which is one-off).