Producer-Consumer Pattern

Introduction

Picture a busy restaurant kitchen. Chefs cook dishes at their own pace — some dishes take 2 minutes, others take 20. Waiters deliver completed dishes to tables. The chefs and waiters work at completely different speeds, and neither should wait for the other to be "ready."

The solution? An order window — a shelf between the kitchen and the dining area. Chefs place finished dishes on the shelf. Waiters pick dishes up from the shelf. The shelf decouples the two: chefs produce without caring which waiter takes the dish, and waiters consume without caring which chef made it.

But the shelf has limited space. If it is full, chefs must wait until a waiter picks something up. If it is empty, waiters must wait until a chef places a new dish. This natural flow control prevents both overflow and starvation.

This is the Producer-Consumer Pattern — one of the most fundamental concurrency patterns in software. Producers generate data and place it into a shared buffer. Consumers retrieve data from that buffer and process it. The buffer acts as a synchronization point that decouples production speed from consumption speed.

You encounter this pattern everywhere in real systems: log writers pushing entries to a log buffer while a background thread flushes to disk, web servers queuing incoming requests for worker threads, message brokers like Kafka and RabbitMQ, print spoolers queuing documents, and video encoders with frame-producer and encoding-consumer threads.

This pattern builds directly on the synchronization primitives from S14 — mutexes, condition variables, and semaphores. Here, we assemble those building blocks into a structured solution for a problem that appears in virtually every concurrent system.

Real-World Analogy

Think about an assembly line in a factory. Station A stamps metal parts and places them on a conveyor belt. Station B picks parts off the belt and welds them. Station C picks welded parts and paints them.

Each station is both a consumer (of the previous station's output) and a producer (for the next station). The conveyor belt segments between stations are the bounded buffers.

Three critical properties make this work:

  1. Decoupled speeds: Station A can stamp faster than Station B welds. The belt absorbs the difference — up to a point. If the belt is full, Station A pauses. If the belt is empty, Station B waits. Neither station needs to know the other's speed.

  2. Multiple workers per station: You can add a second welder at Station B without changing Station A or C. The belt does not care who places parts or who picks them up — it just holds items.

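  3. Bounded capacity: The belt can only hold N items. This stops Station A from producing millions of parts that pile up faster than anyone can use them. In software, the equivalent is a queue that grows until it exhausts memory and crashes the process. When the belt fills, Station A has to pause, which is exactly the backpressure you want. The bound is the safety valve.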

These three properties — speed decoupling, worker scalability, and bounded capacity — are exactly what the Producer-Consumer Pattern provides in software.
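
The assembly line can be sketched as a chain of bounded queues, one per belt segment. This is a minimal illustration under stated assumptions, not a definitive implementation: the `station` and `run_pipeline` helpers, the `DONE` sentinel, and the string "parts" are all hypothetical names introduced here, and Python's `queue.Queue` stands in for the belt.

```python
import threading
from queue import Queue

DONE = object()  # hypothetical sentinel: tells the next station to stop


def station(work, inbox: Queue, outbox: Queue) -> None:
    """Consume parts from inbox, apply work, and produce them to outbox."""
    while True:
        part = inbox.get()            # blocks while the incoming belt is empty
        if part is DONE:
            outbox.put(DONE)          # pass the shutdown signal downstream
            return
        outbox.put(work(part))        # blocks while the outgoing belt is full


def run_pipeline(parts: list, belt_size: int = 2) -> list:
    belt_ab: Queue = Queue(maxsize=belt_size)  # Station A -> Station B
    belt_bc: Queue = Queue(maxsize=belt_size)  # Station B -> Station C
    finished: Queue = Queue()                  # unbounded only to keep the sketch short

    welder = threading.Thread(
        target=station, args=(lambda p: p + "+welded", belt_ab, belt_bc))
    painter = threading.Thread(
        target=station, args=(lambda p: p + "+painted", belt_bc, finished))
    welder.start()
    painter.start()

    # Station A runs in the calling thread: stamp parts, place them on the belt.
    for part in parts:
        belt_ab.put(part + "+stamped")
    belt_ab.put(DONE)

    welder.join()
    painter.join()

    # Drain the output belt up to the sentinel.
    results = []
    while True:
        item = finished.get()
        if item is DONE:
            return results
        results.append(item)


print(run_pipeline(["part-1", "part-2", "part-3"]))
```

With one worker per station, parts leave the line in the order they entered. Adding a second welder would need one sentinel per worker and would give up that ordering guarantee.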

How It Works — Structure & Participants

The Producer-Consumer Pattern has three participants:

1. Producer

A thread (or set of threads) that generates data items and places them into the shared buffer. The producer does not know or care who will consume its items. It only interacts with the buffer.

Key behavior: If the buffer is full, the producer blocks (waits) until space becomes available. This prevents unbounded memory growth and creates natural backpressure.

2. Consumer

A thread (or set of threads) that retrieves data items from the shared buffer and processes them. The consumer does not know or care who produced the items. It only interacts with the buffer.

Key behavior: If the buffer is empty, the consumer blocks (waits) until new data arrives. This prevents busy-waiting (spinning the CPU doing nothing useful).

3. Bounded Buffer (Blocking Queue)

The shared data structure that sits between producers and consumers. It has a fixed maximum capacity and provides thread-safe put() and get() operations.

Critical internal mechanics:

  • Thread safety: A mutex (lock) protects the buffer's internal state. Only one thread at a time can modify the buffer.
  • Blocking on full: When a producer calls put() and the buffer is at capacity, the producer releases the lock and sleeps on a "not full" condition variable. When a consumer removes an item, it signals "not full," waking the sleeping producer.
  • Blocking on empty: When a consumer calls get() and the buffer is empty, the consumer releases the lock and sleeps on a "not empty" condition variable. When a producer adds an item, it signals "not empty," waking the sleeping consumer.

This two-condition-variable design is the canonical implementation, first described in the classic concurrency literature and covered in Java Concurrency in Practice.
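
For comparison, the same blocking behavior can also be built from counting semaphores, another of the primitives mentioned in the introduction. The sketch below is a different technique from the two-condition-variable design described above, shown only to contrast the two. `SemaphoreBuffer` and its attribute names are hypothetical.

```python
import threading
from collections import deque
from typing import Any


class SemaphoreBuffer:
    """Hypothetical bounded buffer built from two counting semaphores and a lock."""

    def __init__(self, capacity: int) -> None:
        self._items: deque = deque()
        self._free_slots = threading.Semaphore(capacity)  # counts empty slots
        self._used_slots = threading.Semaphore(0)         # counts stored items
        self._lock = threading.Lock()                     # guards the deque itself

    def put(self, item: Any) -> None:
        self._free_slots.acquire()       # blocks while the buffer is full
        with self._lock:
            self._items.append(item)
        self._used_slots.release()       # one more item is available

    def get(self) -> Any:
        self._used_slots.acquire()       # blocks while the buffer is empty
        with self._lock:
            item = self._items.popleft()
        self._free_slots.release()       # one more slot is free
        return item


buf = SemaphoreBuffer(capacity=2)
buf.put("a")
buf.put("b")
print(buf.get(), buf.get())
```

Here the semaphore counters do the job that the while-loop condition checks do in the condition-variable version: a thread can only pass `acquire()` if a slot or an item really exists.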

The Flow

  1. Producers call buffer.put(item) — this blocks if full
  2. Consumers call buffer.get() — this blocks if empty
  3. The buffer automatically coordinates blocking and waking using condition variables
  4. Neither producers nor consumers know about each other — the buffer is the only shared point
  5. The system achieves natural flow control: fast producers slow down when the buffer fills, fast consumers slow down when the buffer empties
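
The blocking behavior in steps 1 and 2 can be observed directly with Python's `queue.Queue`. In this small sketch the timeouts exist only so the demonstration returns instead of blocking forever. The function name `flow_control_demo` is made up for illustration.

```python
import queue


def flow_control_demo() -> tuple:
    buffer: queue.Queue = queue.Queue(maxsize=2)
    buffer.put("a")
    buffer.put("b")                      # buffer is now at capacity

    try:
        buffer.put("c", timeout=0.1)     # would block indefinitely without the timeout
        producer_had_to_wait = False
    except queue.Full:
        producer_had_to_wait = True

    first, second = buffer.get(), buffer.get()   # items come out in FIFO order

    try:
        buffer.get(timeout=0.1)          # buffer is empty, so this waits and gives up
        consumer_had_to_wait = False
    except queue.Empty:
        consumer_had_to_wait = True

    return producer_had_to_wait, first, second, consumer_had_to_wait


print(flow_control_demo())  # (True, 'a', 'b', True)
```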

Visualization

[Figure: Producer-Consumer Pattern, bounded buffer coordination]

Code Implementation

We will implement the Producer-Consumer Pattern in two ways for each language:

  1. From scratch — using a mutex and condition variables, to understand the internal mechanics
  2. Using the standard library — using the language's built-in blocking queue, which is what you should use in production

The "from scratch" version teaches you how blocking queues work internally. The standard library version shows you the production-ready approach.

Python:

import threading
import time
import random
from queue import Queue
from collections import deque
from typing import Any


# === Version 1: From Scratch (mutex + condition variables) ===

class BoundedBuffer:
    """Thread-safe bounded buffer using a lock and two condition variables."""

    def __init__(self, capacity: int) -> None:
        self._buffer: deque = deque()
        self._capacity = capacity
        self._lock = threading.Lock()
        self._not_full = threading.Condition(self._lock)
        self._not_empty = threading.Condition(self._lock)

    def put(self, item: Any) -> None:
        """Add an item to the buffer. Blocks if the buffer is full."""
        with self._not_full:  # acquires the lock
            while len(self._buffer) >= self._capacity:
                self._not_full.wait()  # releases lock, sleeps until signaled
            self._buffer.append(item)
            self._not_empty.notify()  # wake up a waiting consumer

    def get(self) -> Any:
        """Remove and return an item. Blocks if the buffer is empty."""
        with self._not_empty:  # acquires the lock
            while len(self._buffer) == 0:
                self._not_empty.wait()  # releases lock, sleeps until signaled
            item = self._buffer.popleft()
            self._not_full.notify()  # wake up a waiting producer
            return item


def producer(buffer: BoundedBuffer, name: str, items: int) -> None:
    """Produces items and places them into the buffer."""
    for i in range(items):
        item = f"{name}-item-{i}"
        buffer.put(item)
        print(f"  [Producer {name}] produced {item}")
        time.sleep(random.uniform(0.05, 0.2))  # simulate variable production time


def consumer(buffer: BoundedBuffer, name: str, items: int) -> None:
    """Consumes items from the buffer."""
    for _ in range(items):
        item = buffer.get()
        print(f"  [Consumer {name}] consumed {item}")
        time.sleep(random.uniform(0.1, 0.3))  # simulate processing time


# Run the from-scratch version
print("=== From-Scratch BoundedBuffer ===")
buffer = BoundedBuffer(capacity=3)

producers = [
    threading.Thread(target=producer, args=(buffer, "P1", 4)),
    threading.Thread(target=producer, args=(buffer, "P2", 4)),
]
consumers = [
    threading.Thread(target=consumer, args=(buffer, "C1", 4)),
    threading.Thread(target=consumer, args=(buffer, "C2", 4)),
]

for t in producers + consumers:
    t.start()
for t in producers + consumers:
    t.join()

print("All done.\n")


# === Version 2: Using Python's queue.Queue (production approach) ===

def producer_stdlib(q: Queue, name: str, items: int) -> None:
    for i in range(items):
        item = f"{name}-item-{i}"
        q.put(item)  # blocks if queue is full
        print(f"  [Producer {name}] produced {item}")
        time.sleep(random.uniform(0.05, 0.15))


def consumer_stdlib(q: Queue, name: str, items: int) -> None:
    for _ in range(items):
        item = q.get()  # blocks if queue is empty
        print(f"  [Consumer {name}] consumed {item}")
        time.sleep(random.uniform(0.1, 0.2))  # simulate processing time
        q.task_done()  # mark the item as fully processed (pairs with q.join())


print("=== Python queue.Queue (Standard Library) ===")
q: Queue = Queue(maxsize=3)  # bounded to 3 items

producers = [
    threading.Thread(target=producer_stdlib, args=(q, "P1", 3)),
    threading.Thread(target=producer_stdlib, args=(q, "P2", 3)),
]
consumers = [
    threading.Thread(target=consumer_stdlib, args=(q, "C1", 3)),
    threading.Thread(target=consumer_stdlib, args=(q, "C2", 3)),
]

for t in producers + consumers:
    t.start()
for t in producers + consumers:
    t.join()

print("All done.")

Java:

import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

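// === Version 1: From Scratch (synchronized + wait/notifyAll) ===
// Note: a Java intrinsic monitor has a single wait set, so producers and
// consumers wait on the same condition here. That is why this version uses
// notifyAll() plus a while loop instead of two separate condition variables.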

class BoundedBuffer<T> {
    private final Queue<T> buffer = new LinkedList<>();
    private final int capacity;

    public BoundedBuffer(int capacity) {
        this.capacity = capacity;
    }

    /** Add an item. Blocks the calling thread if the buffer is full. */
    public synchronized void put(T item) throws InterruptedException {
        while (buffer.size() >= capacity) {
            wait(); // releases the monitor, sleeps until notified
        }
        buffer.add(item);
        notifyAll(); // wake up waiting consumers (and other producers)
    }

    /** Remove and return an item. Blocks if the buffer is empty. */
    public synchronized T get() throws InterruptedException {
        while (buffer.isEmpty()) {
            wait(); // releases the monitor, sleeps until notified
        }
        T item = buffer.poll();
        notifyAll(); // wake up waiting producers (and other consumers)
        return item;
    }
}

// === Demo driver: runs Version 1, then Version 2 (java.util.concurrent, the production approach) ===

public class ProducerConsumerDemo {

    // --- From-scratch demo ---

    static void runFromScratch() throws InterruptedException {
        BoundedBuffer<String> buffer = new BoundedBuffer<>(3);

        Runnable producerTask = () -> {
            String name = Thread.currentThread().getName();
            for (int i = 0; i < 4; i++) {
                String item = name + "-item-" + i;
                try {
                    buffer.put(item);
                    System.out.println("  [Producer " + name + "] produced " + item);
                    Thread.sleep((long) (Math.random() * 150 + 50));
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        };

        Runnable consumerTask = () -> {
            String name = Thread.currentThread().getName();
            for (int i = 0; i < 4; i++) {
                try {
                    String item = buffer.get();
                    System.out.println("  [Consumer " + name + "] consumed " + item);
                    Thread.sleep((long) (Math.random() * 200 + 100));
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        };

        Thread p1 = new Thread(producerTask, "P1");
        Thread p2 = new Thread(producerTask, "P2");
        Thread c1 = new Thread(consumerTask, "C1");
        Thread c2 = new Thread(consumerTask, "C2");

        p1.start(); p2.start(); c1.start(); c2.start();
        p1.join(); p2.join(); c1.join(); c2.join();
    }

    // --- Standard library demo using ArrayBlockingQueue ---

    static void runWithBlockingQueue() throws InterruptedException {
        // ArrayBlockingQueue is a bounded, thread-safe queue — the production choice
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(3);

        Runnable producerTask = () -> {
            String name = Thread.currentThread().getName();
            for (int i = 0; i < 3; i++) {
                String item = name + "-item-" + i;
                try {
                    queue.put(item); // blocks if full
                    System.out.println("  [Producer " + name + "] produced " + item);
                    Thread.sleep((long) (Math.random() * 100 + 50));
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        };

        Runnable consumerTask = () -> {
            String name = Thread.currentThread().getName();
            for (int i = 0; i < 3; i++) {
                try {
                    String item = queue.take(); // blocks if empty
                    System.out.println("  [Consumer " + name + "] consumed " + item);
                    Thread.sleep((long) (Math.random() * 150 + 100));
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        };

        Thread p1 = new Thread(producerTask, "P1");
        Thread p2 = new Thread(producerTask, "P2");
        Thread c1 = new Thread(consumerTask, "C1");
        Thread c2 = new Thread(consumerTask, "C2");

        p1.start(); p2.start(); c1.start(); c2.start();
        p1.join(); p2.join(); c1.join(); c2.join();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("=== From-Scratch BoundedBuffer ===");
        runFromScratch();

        System.out.println("\n=== ArrayBlockingQueue (Standard Library) ===");
        runWithBlockingQueue();

        System.out.println("\nAll done.");
    }
}

When to Use the Producer-Consumer Pattern

Use Producer-Consumer when:

  • Production and consumption speeds differ. If the producer is bursty (generates data in rapid spikes) while the consumer processes at a steady rate, the buffer absorbs the spikes. Without it, you either drop data during spikes or waste resources during idle periods.

  • You need to decouple stages of a pipeline. Log collection (produce log entries) and log writing (consume and flush to disk) should be independent. Request reception (produce incoming HTTP requests) and request processing (consume and handle) should be independent. The buffer between stages allows each to operate at its own pace.

  • You want to scale producers and consumers independently. Need more throughput? Add consumer threads. Need to limit the load on a downstream resource? Cap the number of consumers. The buffer handles the coordination either way.

  • You need backpressure. A bounded buffer naturally throttles fast producers when consumers cannot keep up. When the buffer is full, producers block — this prevents memory exhaustion, queue overflow, and downstream service overload. This backpressure mechanism is essential for resilient systems.

  • You're building any kind of work queue. Task schedulers, print spoolers, message processing pipelines, video frame encoding, batch data processing — all are instances of Producer-Consumer.
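
As a rough illustration of independent scaling, the sketch below runs the same work queue with a configurable number of consumers. It assumes a sentinel value for shutdown, a detail this tutorial's demos avoid by giving each consumer a fixed item count. `process_all`, `worker`, and `STOP` are hypothetical names, and squaring a number stands in for real work.

```python
import threading
from queue import Queue

STOP = None  # hypothetical sentinel; assumes None is never a real task


def worker(tasks: Queue, results: list, results_lock: threading.Lock) -> None:
    while True:
        n = tasks.get()                  # blocks while the queue is empty
        if n is STOP:
            return
        squared = n * n                  # stand-in for real work
        with results_lock:
            results.append(squared)


def process_all(numbers: list, num_consumers: int, capacity: int = 4) -> list:
    tasks: Queue = Queue(maxsize=capacity)
    results: list = []
    results_lock = threading.Lock()
    workers = [
        threading.Thread(target=worker, args=(tasks, results, results_lock))
        for _ in range(num_consumers)
    ]
    for w in workers:
        w.start()
    for n in numbers:
        tasks.put(n)                     # blocks when the queue is full: backpressure
    for _ in workers:
        tasks.put(STOP)                  # one sentinel per consumer
    for w in workers:
        w.join()
    return sorted(results)               # completion order varies, so sort for comparison


print(process_all(list(range(6)), num_consumers=1))
print(process_all(list(range(6)), num_consumers=3))
```

The producer loop is identical in both calls. Only `num_consumers` changes, which is the point of the decoupling.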

When NOT to Use the Producer-Consumer Pattern

Avoid Producer-Consumer when:

  • Production and consumption are inherently synchronous. If the producer must wait for the consumer's result before continuing (request-response), a direct method call or Future/Promise is simpler. Producer-Consumer is for fire-and-forget or batched processing, not for synchronous request-response interactions.

  • A single thread both produces and consumes. If each item is generated and then processed immediately by the same thread, there is no speed mismatch to absorb, and the buffer adds complexity without benefit. A simple function call is enough.

  • The data volume is tiny and predictable. If you produce 5 items total and process them once, threading and a bounded buffer are overkill. Use a plain list and iterate.

  • Ordering is critical and there are multiple consumers. Multiple consumers pulling from the same queue may process items out of order. If strict ordering matters (e.g., transaction log replay), use a single consumer or partition the queue by key (like Kafka partitions).

  • You need immediate processing with guaranteed latency. The buffer introduces variable latency — items sit in the queue for an unpredictable time. If you need sub-millisecond guaranteed response time, direct invocation is better.
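
The "partition the queue by key" idea from the ordering bullet can be sketched as follows: each key is routed to one queue, and each queue has exactly one consumer, so events for the same key are handled in the order they were produced. This is an illustrative sketch only. The `replay`, `consume`, and `partition_for` names, the CRC32-based routing, and the account-style keys are assumptions made here, not how any particular broker does it.

```python
import threading
import zlib
from queue import Queue

STOP = None  # hypothetical sentinel; assumes None is never a real event


def partition_for(key: str, num_partitions: int) -> int:
    """Stable hash, so the same key always maps to the same partition."""
    return zlib.crc32(key.encode("utf-8")) % num_partitions


def consume(partition: Queue, log: list) -> None:
    while True:
        event = partition.get()
        if event is STOP:
            return
        log.append(event)    # only this thread ever writes to this log


def replay(events: list, num_partitions: int = 3) -> list:
    partitions = [Queue(maxsize=8) for _ in range(num_partitions)]
    logs = [[] for _ in range(num_partitions)]
    consumers = [
        threading.Thread(target=consume, args=(partitions[i], logs[i]))
        for i in range(num_partitions)
    ]
    for c in consumers:
        c.start()
    for key, seq in events:
        partitions[partition_for(key, num_partitions)].put((key, seq))
    for p in partitions:
        p.put(STOP)
    for c in consumers:
        c.join()
    return logs


events = [("acct-1", 1), ("acct-2", 1), ("acct-1", 2), ("acct-2", 2), ("acct-1", 3)]
for i, log in enumerate(replay(events)):
    print(i, log)
```

Ordering is preserved per key, not globally: events for different keys may still be processed in any relative order.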

Common Mistakes

1. Using if Instead of while for the Wait Condition

Mistake: Checking the buffer condition with if instead of while before calling wait().

# WRONG
if len(self._buffer) >= self._capacity:
    self._not_full.wait()
# Proceeds to add to buffer — but another producer may have filled it while we woke up!

Why it is dangerous: A waiting thread can resume when the condition it needs is not actually true, for two reasons. First, spurious wakeups: many threading libraries allow wait() to return even though no thread signaled. Second, races between the signal and the wakeup: another thread may acquire the lock first and change the buffer's state before the woken thread runs. With if, the thread proceeds without re-checking, so a producer can overfill the buffer or a consumer can read from an empty one.

Fix: Always use a while loop:

while len(self._buffer) >= self._capacity:
    self._not_full.wait()
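
In Python, `threading.Condition.wait_for(predicate)` wraps this re-check loop for you: it keeps waiting until the predicate returns true. The sketch below rewrites the from-scratch buffer with it. `WaitForBuffer` is a hypothetical name, and this is one possible variation rather than the tutorial's own implementation.

```python
import threading
from collections import deque
from typing import Any


class WaitForBuffer:
    """Bounded buffer variant that uses Condition.wait_for instead of a manual while loop."""

    def __init__(self, capacity: int) -> None:
        self._buffer: deque = deque()
        self._capacity = capacity
        self._lock = threading.Lock()
        self._not_full = threading.Condition(self._lock)
        self._not_empty = threading.Condition(self._lock)

    def put(self, item: Any) -> None:
        with self._not_full:
            # Re-evaluates the predicate after every wakeup, like the while loop.
            self._not_full.wait_for(lambda: len(self._buffer) < self._capacity)
            self._buffer.append(item)
            self._not_empty.notify()

    def get(self) -> Any:
        with self._not_empty:
            self._not_empty.wait_for(lambda: len(self._buffer) > 0)
            item = self._buffer.popleft()
            self._not_full.notify()
            return item


buf = WaitForBuffer(capacity=1)
buf.put("x")
print(buf.get())
```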

2. Using an Unbounded Buffer

Mistake: Using an unbounded queue (no maximum capacity) because "it's simpler."

Why it is dangerous: If producers are faster than consumers over sustained periods, the queue grows without limit, consuming all available memory until the process fails (an OutOfMemoryError in Java, a MemoryError in Python, or a kill by the operating system). This is especially treacherous because it works fine in testing (low load), then fails catastrophically in production (high load).

Fix: Always use a bounded buffer. Choose a capacity based on expected burst size and consumer processing time. In Python: Queue(maxsize=N). Note that Queue() with no maxsize, or with maxsize=0, is unbounded. In Java: new ArrayBlockingQueue<>(N). Monitor queue depth in production.
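
Blocking is not the only thing a producer can do when a bounded queue is full. As one possible alternative, the sketch below rejects the item instead, which may suit cases where dropping work is preferable to stalling the producer. The `offer` helper is a hypothetical name. `put_nowait()`, `queue.Full`, and `qsize()` are part of Python's standard `queue` module.

```python
import queue


def offer(q: queue.Queue, item) -> bool:
    """Try to enqueue without blocking; report whether the item was accepted."""
    try:
        q.put_nowait(item)
        return True
    except queue.Full:
        return False


q: queue.Queue = queue.Queue(maxsize=3)
accepted = [offer(q, i) for i in range(5)]
print(accepted)    # [True, True, True, False, False]
print(q.qsize())   # 3
```

`qsize()` is only approximate while other threads are using the queue, so it is best treated as a monitoring signal and not as a basis for control flow.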

3. Forgetting to Signal the Other Side

Mistake: After a put(), forgetting to signal the "not empty" condition. After a get(), forgetting to signal the "not full" condition.

Why it is dangerous: Without signaling, blocked threads never wake up. Consumers wait forever for data that is already in the buffer. Producers wait forever for space that is already available. This is a lost wakeup: the program hangs with its threads alive but permanently asleep.

Fix: Every put() must signal "not empty" (a consumer might be waiting). Every get() must signal "not full" (a producer might be waiting). This is symmetric and non-optional.
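
The effect of a missing signal can be reproduced in a few lines. In this deliberately broken sketch, the consumer uses a single timed wait (not the usual while loop) purely so the demonstration can report whether a signal ever arrived. All names are hypothetical.

```python
import threading
from collections import deque


def lost_wakeup_demo() -> dict:
    buffer: deque = deque()
    not_empty = threading.Condition(threading.Lock())
    outcome: dict = {}

    def waiting_consumer() -> None:
        with not_empty:
            # Condition.wait() returns False when it times out without being notified.
            outcome["signaled"] = not_empty.wait(timeout=0.3)
            outcome["items_present"] = len(buffer)

    def forgetful_put(item) -> None:
        with not_empty:
            buffer.append(item)
            # BUG (deliberate): not_empty.notify() is missing here.

    consumer = threading.Thread(target=waiting_consumer)
    consumer.start()
    forgetful_put("x")
    consumer.join()
    return outcome


print(lost_wakeup_demo())
```

The consumer should time out without ever being signaled, even though an item is sitting in the buffer. Without the timeout it would sleep indefinitely.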

4. Not Handling Thread Interruption (Java)

Mistake: Catching InterruptedException and swallowing it with an empty catch block.

Why it is dangerous: Interruption is Java's cooperative cancellation mechanism, and InterruptedException is how a blocked thread learns that it has been asked to stop. Swallowing the exception discards that request: the thread keeps running, and the application cannot shut down cleanly.

Fix: Either re-throw the exception or restore the interrupt flag with Thread.currentThread().interrupt(). Never ignore it.

Related Patterns

  • Thread Pool Pattern (S15): A thread pool is a Producer-Consumer system. Tasks are "produced" by submitting them to the pool, and worker threads "consume" tasks from an internal work queue. Understanding Producer-Consumer is a prerequisite for understanding thread pools, and the upcoming tutorials build on it directly.

  • Observer Pattern (S12): Both involve one side generating events and another side reacting. But Observer is typically synchronous within a single thread (the subject calls its observers directly), while Producer-Consumer is asynchronous across threads (producers and consumers run independently, decoupled by the buffer).

  • Active Object Pattern (S15): Active Object uses a Producer-Consumer internally — method calls are "produced" as request objects and placed in a scheduling queue, where a dedicated thread "consumes" and executes them. We will cover this in detail later in this section.

  • Command Pattern (S12): Commands can be the items in a Producer-Consumer buffer. Producers create command objects representing work to be done, and consumers dequeue and execute them. This combination of Command + Producer-Consumer is the foundation of task scheduling systems.

  • Pub-Sub (Design Problems course): A large-scale evolution of Producer-Consumer. Publishers produce messages to topics, subscribers consume messages from topics, and a message broker acts as the buffer. You will implement this pattern when designing the Pub-Sub System in the Design Problems course.
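
To make the Thread Pool and Command connections concrete, here is a deliberately tiny pool in which the queued items are callables with their arguments. It is a sketch under stated assumptions, not a replacement for `concurrent.futures.ThreadPoolExecutor`. `TinyPool`, `PendingResult`, and `_STOP` are hypothetical names, and exceptions raised by a task are not handled.

```python
import threading
from queue import Queue
from typing import Any, Callable

_STOP = object()  # hypothetical sentinel used to shut workers down


class PendingResult:
    """Hypothetical one-shot result holder (a very small stand-in for a future)."""

    def __init__(self) -> None:
        self._ready = threading.Event()
        self._value: Any = None

    def set(self, value: Any) -> None:
        self._value = value
        self._ready.set()

    def wait(self, timeout: float = None) -> Any:
        if not self._ready.wait(timeout):
            raise TimeoutError("result not ready")
        return self._value


class TinyPool:
    def __init__(self, num_workers: int, capacity: int) -> None:
        self._tasks: Queue = Queue(maxsize=capacity)   # the bounded buffer
        self._workers = [threading.Thread(target=self._run) for _ in range(num_workers)]
        for w in self._workers:
            w.start()

    def submit(self, fn: Callable, *args: Any) -> PendingResult:
        result = PendingResult()
        self._tasks.put((fn, args, result))   # producer side: blocks when the queue is full
        return result

    def _run(self) -> None:
        while True:                            # consumer side
            task = self._tasks.get()
            if task is _STOP:
                return
            fn, args, result = task
            result.set(fn(*args))              # sketch only: errors in fn are not caught

    def shutdown(self) -> None:
        for _ in self._workers:
            self._tasks.put(_STOP)             # one sentinel per worker
        for w in self._workers:
            w.join()


pool = TinyPool(num_workers=2, capacity=4)
pending = [pool.submit(pow, 2, n) for n in range(5)]
print([p.wait(timeout=5) for p in pending])   # [1, 2, 4, 8, 16]
pool.shutdown()
```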

Key Takeaways

  1. Producer-Consumer decouples data production from consumption using a shared bounded buffer. Producers and consumers run at independent speeds — the buffer absorbs the difference.
  2. The bounded buffer provides natural backpressure — producers block when the buffer is full, preventing memory exhaustion. Consumers block when the buffer is empty, preventing busy-waiting.
  3. Two condition variables coordinate the threads: "not full" wakes blocked producers when a consumer removes an item; "not empty" wakes blocked consumers when a producer adds an item.
  4. Always use while loops around wait() calls, never if — spurious wakeups and race conditions require re-checking the condition after waking.
  5. In production, use the standard library: Python's queue.Queue and Java's ArrayBlockingQueue implement this pattern correctly and efficiently. Build from scratch only to learn the internals.
  6. This pattern is the foundation for thread pools, message queues, logging frameworks, and request processing pipelines. Understanding it deeply is essential for low-level design (LLD) interviews that involve any concurrent system.