Solving Memory Pressure in a Python Data Pipeline

Speed problems have a clean shape. Something is too slow, you find the bottleneck, you fix the architecture, the number improves. You benchmark it. You know when you're done.
Memory problems announce themselves differently. They wait for production, for concurrent load, for the conditions you didn't model in your benchmark test. The vectorized rewrite had taken the engine from 30 minutes to 10 seconds for 1,000 records. What that benchmark didn't model was what happened when multiple clients triggered processing simultaneously on a server with 8GB of RAM.
The fast architecture had a cost that only showed up under load.
The cost of being wide
The vectorized approach works by merging all relevant data into a single wide DataFrame and computing columns across the full dataset at once. For a client with 25,000 employees, the working DataFrame had 50-plus columns across 25,000 rows, held in memory for the duration of calculation.
One client at a time, that's manageable. With several clients processing simultaneously, each holding a wide DataFrame in memory and competing for the same 8GB of RAM, memory usage became unpredictable in ways the benchmark hadn't revealed. End-of-month processing windows were the worst: every client triggered their payroll run at once, and the server had no way to know it was overcommitting until it was already struggling.
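One way to put a number on "wide" is to measure a frame's footprint directly. The following is a minimal sketch, not the engine's actual instrumentation: frame_bytes and the demo columns are hypothetical names made up for illustration. It uses pandas' memory_usage with deep=True so that string payloads are counted along with the fixed-width numeric columns.

```python
import pandas as pd


def frame_bytes(frame: pd.DataFrame) -> int:
    """Bytes held by a frame's columns, counting string payloads (deep=True)."""
    return int(frame.memory_usage(index=False, deep=True).sum())


# Hypothetical demo data: two numeric columns and one string column.
demo = pd.DataFrame({
    "employee_id": range(1_000),
    "gross_pay": [1_000.0] * 1_000,
    "department": ["engineering"] * 1_000,
})

numeric_only = frame_bytes(demo[["employee_id", "gross_pay"]])
with_strings = frame_bytes(demo)

print(f"numeric columns: {numeric_only} bytes")
print(f"with the string column: {with_strings} bytes")
```

A measurement like this covers only the finished frame. The intermediate copies a merge chain creates along the way sit on top of it, which is part of why peak usage is harder to predict than the final size suggests.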
Pandas doesn't stream in-memory operations. Readers such as read_csv accept a chunksize, but there's no mechanism in the standard API to tell a merge chain "process this in chunks, releasing memory as you go." The calculation engine was correct and fast. It hadn't been designed for the environment it was going to run in.
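For contrast, the chunking pandas does offer lives at the reader level. The sketch below uses read_csv with its chunksize parameter on a small in-memory CSV; the column names and values are made up for illustration. Each chunk arrives as an ordinary DataFrame, but anything that needs the whole dataset at once, such as a merge against other tables, still has to be orchestrated by hand.

```python
import io

import pandas as pd

# Hypothetical input: ten rows of made-up payroll data.
csv_text = "employee_id,gross\n" + "\n".join(
    f"{i},{1000 + i}" for i in range(10)
)

chunk_sizes = []
total = 0
# With chunksize set, read_csv yields DataFrames of at most 4 rows each
# instead of returning one large frame.
for chunk in pd.read_csv(io.StringIO(csv_text), chunksize=4):
    chunk_sizes.append(len(chunk))
    total += int(chunk["gross"].sum())

print(chunk_sizes, total)
```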
The Dask detour 🚧
The obvious next step was Dask. Dask wraps pandas operations behind a lazy evaluation interface and handles chunked execution internally. The pitch was appealing: pandas-compatible API, memory-aware execution, no need to rewrite the calculation logic.
What we ran into was the gap between "pandas-compatible" and "fully pandas-compatible." Specific join patterns we depended on behaved differently or weren't supported at the version we were targeting. Some aggregation operations produced results that didn't match the pandas baseline. A subset of the numpy-based column operations used for conditional calculations (the nested np.where chains that handled our more complex financial logic) either didn't exist in the Dask API or required restructuring we couldn't arrive at cleanly.
We couldn't wait for the library to catch up. We'd already rebuilt the calculation engine once. What we needed was a solution that worked around the memory constraint without touching the vectorized core that was giving us the performance.
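For readers who haven't met the pattern, this is roughly what a nested np.where chain looks like. It is a sketch only: the pay bands and rates are invented for illustration and are not the engine's real rules. The np.select call underneath is a flatter way to express the same branching, shown here just to make the structure easier to read.

```python
import numpy as np
import pandas as pd

# Hypothetical pay bands and rates, for illustration only.
frame = pd.DataFrame({"gross": [1_000.0, 3_000.0, 8_000.0]})
gross = frame["gross"].to_numpy()

# Nested form: each np.where supplies the "else" branch of the one before it.
rate_nested = np.where(
    gross <= 2_000, 0.0,
    np.where(gross <= 5_000, 0.10, 0.20),
)

# Flat form: conditions are tested in order and the first match wins.
rate_select = np.select(
    [gross <= 2_000, gross <= 5_000],
    [0.0, 0.10],
    default=0.20,
)

# Both forms compute a whole column at once, with no Python-level row loop.
frame["tax"] = frame["gross"] * rate_nested
```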
Batching the input
The first layer was batching. Instead of pushing all of a client's records through the pipeline at once, the input was split into fixed-size batches. Each batch went through the full vectorized pipeline independently, results were accumulated, and the batch was explicitly released before the next one started.

    import gc

    import pandas as pd

    BATCH_SIZE = 5_000


    def process_client_run(client_id: str, period_key: str) -> pd.DataFrame:
        # load_records_for_client and run_vectorized_pipeline are defined elsewhere.
        records = load_records_for_client(client_id, period_key)
        results = []
        for start in range(0, len(records), BATCH_SIZE):
            # Copy the slice so the pipeline works on its own independent batch.
            batch = records.iloc[start : start + BATCH_SIZE].copy()
            result = run_vectorized_pipeline(batch, client_id, period_key)
            results.append(result)
            # Drop the local references; `results` still holds the batch output.
            del batch
            del result
            gc.collect()
        # Note: pd.concat raises ValueError if there were no records at all.
        return pd.concat(results, ignore_index=True)

The important distinction: the loop was over batches, not over individual records. Inside each batch, the full merge chain and column-level operations ran unchanged. The working DataFrame was bounded to one batch at a time. The calculation architecture didn't change. The outer structure around it did.
This is a pattern worth internalizing beyond this specific case. When your operation is fast but wide, batching at the input level is often the right constraint. You're not changing what the computation does. You're changing how much data it holds at once.
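One caveat follows from that: batching leaves results unchanged only when each output row depends on its own input row and nothing else. The sketch below checks this with a hypothetical toy_pipeline (not the real calculation engine) and then shows a cross-row calculation, a share of the total, where naive batching gives a different answer. All names and numbers here are assumptions for illustration.

```python
import numpy as np
import pandas as pd


def toy_pipeline(batch: pd.DataFrame) -> pd.DataFrame:
    """Hypothetical row-wise calculation: each row depends only on itself."""
    out = batch.copy()
    out["tax"] = np.where(
        out["gross"] > 3_000, out["gross"] * 0.2, out["gross"] * 0.1
    )
    out["net"] = out["gross"] - out["tax"]
    return out


def share_of_total(batch: pd.DataFrame) -> pd.Series:
    """Hypothetical cross-row calculation: needs the sum over all rows."""
    return batch["gross"] / batch["gross"].sum()


def batches(records: pd.DataFrame, batch_size: int):
    """Yield consecutive slices of at most batch_size rows."""
    for start in range(0, len(records), batch_size):
        yield records.iloc[start : start + batch_size]


records = pd.DataFrame({"gross": np.linspace(1_000, 6_000, 23)})

# Row-wise work: batched and unbatched results are identical.
full = toy_pipeline(records)
batched = pd.concat(
    [toy_pipeline(b) for b in batches(records, 5)], ignore_index=True
)
pd.testing.assert_frame_equal(full, batched)

# Cross-row work: each batch normalises by its own sum, so the shares
# no longer add up to 1 across the whole run.
full_share = share_of_total(records)
batched_share = pd.concat(
    [share_of_total(b) for b in batches(records, 5)], ignore_index=True
)
```

If a pipeline does contain cross-row steps, the totals they need have to be computed up front and passed into each batch, or those steps have to run after the batches are concatenated.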
Queuing the concurrency
Batching handled single-client memory pressure. Concurrent clients were a different problem.
If several clients triggered processing in the same window, several process_client_run calls would run simultaneously. Even with batching, that meant multiple active working pipelines competing for the same heap. A job queue with a concurrency semaphore limited how many jobs ran at the same time, regardless of how many had been submitted.

    import queue
    import threading

    MAX_CONCURRENT = 3

    _semaphore = threading.Semaphore(MAX_CONCURRENT)
    _job_queue: queue.Queue = queue.Queue()
    _results: dict = {}
    _lock = threading.Lock()  # guards _results


    def submit_job(job_id: str, client_id: str, period_key: str) -> None:
        _job_queue.put((job_id, client_id, period_key))


    def _worker() -> None:
        while True:
            # Blocks until a job is available.
            job_id, client_id, period_key = _job_queue.get()
            # At most MAX_CONCURRENT workers get past this point at once.
            with _semaphore:
                try:
                    result = process_client_run(client_id, period_key)
                    with _lock:
                        _results[job_id] = {'state': 'complete', 'data': result}
                except Exception as exc:
                    with _lock:
                        _results[job_id] = {'state': 'failed', 'error': str(exc)}
            _job_queue.task_done()

The semaphore kept concurrent active jobs within the memory budget. The queue absorbed bursts without losing jobs. A client whose job queued behind others would wait, but that wait was bounded and predictable, rather than a memory spike that degraded the entire system.
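The snippet above doesn't show how workers are started, so here is a self-contained sketch of that missing piece, with a check that the cap actually holds. Everything in it is an assumption for illustration: fake_job stands in for process_client_run, NUM_WORKERS is a made-up value deliberately larger than the cap, and the None sentinel is one common way to shut workers down, not necessarily what the original system did.

```python
import queue
import threading
import time

MAX_CONCURRENT = 3  # same cap as in the text
NUM_WORKERS = 8     # hypothetical: more worker threads than permits

semaphore = threading.Semaphore(MAX_CONCURRENT)
jobs: queue.Queue = queue.Queue()
state_lock = threading.Lock()
active = 0  # jobs running right now
peak = 0    # highest value `active` ever reached


def fake_job() -> None:
    """Stand-in for process_client_run; records how many jobs overlap."""
    global active, peak
    with state_lock:
        active += 1
        peak = max(peak, active)
    time.sleep(0.02)  # pretend to do work
    with state_lock:
        active -= 1


def worker() -> None:
    while True:
        job = jobs.get()
        try:
            if job is None:  # sentinel: shut this worker down
                return
            with semaphore:  # at most MAX_CONCURRENT jobs past this line
                job()
        finally:
            jobs.task_done()


threads = [
    threading.Thread(target=worker, daemon=True) for _ in range(NUM_WORKERS)
]
for thread in threads:
    thread.start()

for _ in range(12):
    jobs.put(fake_job)
jobs.join()  # block until all 12 jobs have been processed

for _ in threads:
    jobs.put(None)  # one sentinel per worker
for thread in threads:
    thread.join()

print(f"peak concurrency: {peak}")
```

With exactly MAX_CONCURRENT worker threads the semaphore would be redundant, since the pool size alone would bound concurrency. Keeping the two separate lets the thread count and the memory cap be tuned independently.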
Polling without blocking
The queue introduced an interface problem. A request that submits a processing job can't block waiting for the result without holding a server thread for the job's full duration. The job becomes asynchronous: submit, get an ID, check back later.
Polling was the mechanism:

    import time


    def poll_job(job_id: str, interval: float = 1.0, timeout: float = 300.0) -> dict:
        start = time.monotonic()  # monotonic clock: unaffected by system clock changes
        while (time.monotonic() - start) < timeout:
            # Hold the lock only for the lookup, not while sleeping.
            with _lock:
                status = _results.get(job_id)
            if status and status['state'] in ('complete', 'failed'):
                return status
            time.sleep(interval)
        raise TimeoutError(f"Job {job_id} did not complete within {timeout}s")

For jobs that completed in 10 to 12 seconds, a one-second interval meant callers received results within about a second of completion without holding any resource during the wait. The polling approach generalizes cleanly to any long-running asynchronous operation, which is worth noting because the pattern comes up repeatedly in data-intensive backend work.
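The "submit, get an ID, check back later" shape also needs a non-blocking half: a status lookup that returns immediately, which is what a status endpoint would call. The sketch below is one possible arrangement, not the original service code. The names submit, get_status, and slow_job are hypothetical, and the explicit 'pending' and 'unknown' states are additions that let a caller tell a job that is still running from an ID that was never issued.

```python
import threading
import time
import uuid

_status: dict = {}
_status_lock = threading.Lock()


def submit(work) -> str:
    """Record the job as pending, start it in the background, return its ID."""
    job_id = uuid.uuid4().hex
    with _status_lock:
        _status[job_id] = {"state": "pending"}

    def run() -> None:
        try:
            outcome = {"state": "complete", "data": work()}
        except Exception as exc:
            outcome = {"state": "failed", "error": str(exc)}
        with _status_lock:
            _status[job_id] = outcome

    threading.Thread(target=run, daemon=True).start()
    return job_id


def get_status(job_id: str) -> dict:
    """Non-blocking lookup: returns the current state without waiting."""
    with _status_lock:
        return dict(_status.get(job_id, {"state": "unknown"}))


def slow_job() -> int:
    time.sleep(0.05)  # pretend to be a long-running calculation
    return 42


job_id = submit(slow_job)  # returns immediately with an ID

# The caller checks back on its own schedule, with a deadline as a safety net.
deadline = time.monotonic() + 5.0
while get_status(job_id)["state"] == "pending" and time.monotonic() < deadline:
    time.sleep(0.01)

final = get_status(job_id)
```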
The honest accounting
After batching and queuing, the 1,000-record benchmark moved from 10 seconds to 12 seconds. A 20% overhead, paid to make the system deployable under real concurrent load on real hardware.
The tradeoff was straightforward. The system could now handle 100,000 records in a single processing run, with multiple client jobs queued behind the semaphore, on a server that stayed within memory bounds through peak windows. That number held in production from the start.
There's a version of this story that presents the vectorized rewrite as the insight and the memory work as a footnote. The reality is that both were necessary, and the second took as much careful thinking as the first. A fast algorithm that can't run reliably on the infrastructure it's deployed to isn't a complete solution.
The vectorized architecture was an insight about data. The memory layer was an insight about deployment. Engineering that ships needs both.
After two years of working through problems like these across multiple financial calculation engines, a pattern became visible: the infrastructure underneath all of it (the vectorized core, the batching, the queuing, the polling) was the same every time. That observation eventually became a Python library, and what made that decision worth making is what the next post is about.
