Building data pipelines for fintech with Python

At Mercury, we needed to process millions of transactions daily for reconciliation, reporting, and compliance. Python's data ecosystem (Pandas, Polars, and Dask) made this manageable, but building reliable pipelines required careful design.

The Challenge

Financial data pipelines must be:

  • Accurate (no data loss or corruption)
  • Fast (process millions of records)
  • Reliable (handle failures gracefully)
  • Auditable (track every transformation)
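
The auditability requirement is the one that most shapes the design: every step should leave a trace of what it did to the data. One way to get there is a step-level logging wrapper; here is a minimal sketch (the decorator and its log fields are illustrative, not production code):

import functools
import logging

logger = logging.getLogger(__name__)

def audited(step_name: str):
    """Log row counts before and after a DataFrame-in, DataFrame-out step."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(df, *args, **kwargs):
            rows_in = len(df)
            result = func(df, *args, **kwargs)
            logger.info("step=%s rows_in=%d rows_out=%d", step_name, rows_in, len(result))
            return result
        return wrapper
    return decorator

Wrapping each transformation this way leaves a per-run trail of row counts alongside the application logs.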

Our Architecture

We used a combination of:

  • Pandas for in-memory transformations
  • Polars for larger datasets (faster, more memory-efficient)
  • Dask for distributed processing
  • Apache Airflow for orchestration
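
Airflow tied these stages together. A minimal sketch of what such a DAG can look like under Airflow 2.x; the dag_id, schedule, and placeholder callables are illustrative, not the production pipeline:

from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

with DAG(
    dag_id="transaction_reconciliation",
    start_date=datetime(2023, 1, 1),
    schedule="@daily",
    catchup=False,
) as dag:
    process = PythonOperator(
        task_id="process_transactions",
        python_callable=lambda: None,  # placeholder: call process_transactions(...) here
    )
    reconcile = PythonOperator(
        task_id="reconcile_transactions",
        python_callable=lambda: None,  # placeholder: call reconcile_transactions(...) here
    )
    process >> reconcile  # reconciliation runs only after processing succeeds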

Example Pipeline

import pandas as pd
import polars as pl

def process_transactions(file_path: str) -> pd.DataFrame:
    # Read with Polars for speed
    df = pl.read_csv(file_path)

    # Keep only positive amounts
    df = df.filter(pl.col("amount") > 0)

    # Parse timestamps and use an exact decimal type for money
    df = df.with_columns([
        pl.col("timestamp").str.strptime(pl.Datetime, format="%Y-%m-%d %H:%M:%S"),
        pl.col("amount").cast(pl.Decimal(precision=10, scale=2))
    ])

    # Convert to Pandas for final processing
    return df.to_pandas()

def reconcile_transactions(transactions: pd.DataFrame,
                           ledger: pd.DataFrame) -> pd.DataFrame:
    # Outer-join the transaction feed against the ledger, tagging each row's source
    merged = transactions.merge(
        ledger,
        on=["transaction_id", "amount"],
        how="outer",
        indicator=True
    )

    # Rows that did not match on both sides are discrepancies
    discrepancies = merged[merged["_merge"] != "both"]
    return discrepancies
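
The indicator column records which side each discrepancy came from. A short usage sketch (the input frames are assumed to exist):

discrepancies = reconcile_transactions(transactions, ledger)

# In the transaction feed but missing from the ledger
missing_from_ledger = discrepancies[discrepancies["_merge"] == "left_only"]

# In the ledger but missing from the transaction feed
missing_from_feed = discrepancies[discrepancies["_merge"] == "right_only"]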

Error Handling

import logging
from typing import Optional

logger = logging.getLogger(__name__)

def safe_process(file_path: str) -> Optional[pd.DataFrame]:
    try:
        return process_transactions(file_path)
    except FileNotFoundError:
        logger.error(f"File not found: {file_path}")
        return None
    except (pd.errors.EmptyDataError, pl.exceptions.NoDataError):
        # The Polars reader raises its own error for an empty file; treat both as "no data"
        logger.warning(f"Empty file: {file_path}")
        return pd.DataFrame()
    except Exception as e:
        logger.exception(f"Unexpected error: {e}")
        raise
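
Note that the two degraded outcomes are distinct: None means the file never arrived, while an empty DataFrame means it arrived with nothing in it. A short caller-side sketch (the path is illustrative):

df = safe_process("/data/transactions/2023-04-01.csv")
if df is None:
    pass  # file missing: skip this run and alert
elif df.empty:
    pass  # file present but empty: nothing to reconcile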

Performance Optimization

  • Use Polars for large datasets
  • Parallelize with Dask when possible (see the sketch after this list)
  • Cache intermediate results
  • Use appropriate data types
  • Profile and optimize bottlenecks
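
Dask came in when a dataset was too large for a single machine's memory. A minimal sketch of a partitioned aggregation (the path pattern and column names are illustrative):

from dask import dataframe as dd

# Read a month of daily transaction files as one partitioned dataframe
transactions = dd.read_csv("/data/transactions/2023-04-*.csv")

# Build the aggregation lazily, then compute it across workers
per_account = transactions.groupby("account_id")["amount"].sum()
result = per_account.compute()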

Testing

def test_transaction_processing(tmp_path):
    # process_transactions expects a file path, so write the fixture data to a temp CSV
    test_data = pd.DataFrame({
        "transaction_id": ["T1", "T2"],
        "amount": [100.50, 200.75],
        "timestamp": ["2023-04-01 10:00:00", "2023-04-01 11:00:00"]
    })
    csv_path = tmp_path / "transactions.csv"  # tmp_path is pytest's temporary directory fixture
    test_data.to_csv(csv_path, index=False)

    result = process_transactions(str(csv_path))
    assert len(result) == 2
    assert result["amount"].sum() == 301.25

Lessons Learned

  1. Start with Pandas, optimize with Polars when needed
  2. Always validate data at pipeline boundaries (a minimal check is sketched below)
  3. Log everything for debugging
  4. Test with production-like data volumes
  5. Monitor pipeline performance and costs
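
On the second point, even a small hand-rolled check at each boundary catches most problems before they propagate. A minimal sketch (the required columns and rules are illustrative):

def validate_transactions(df: pd.DataFrame) -> pd.DataFrame:
    # Fail fast on missing columns or null values in key fields
    required = {"transaction_id", "amount", "timestamp"}
    missing = required - set(df.columns)
    if missing:
        raise ValueError(f"Missing columns: {missing}")
    if df["transaction_id"].isna().any():
        raise ValueError("Null transaction_id found")
    if df["amount"].isna().any():
        raise ValueError("Null amounts found")
    return df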

"Data pipelines are only as good as their error handling."
