Building data pipelines for fintech with Python
At Mercury, we needed to process millions of transactions daily for reconciliation, reporting, and compliance. Python's data ecosystem (Pandas, Polars, and Dask) made this manageable, but building reliable pipelines required careful design.
The Challenge
Financial data pipelines must be:
- Accurate (no data loss or corruption)
- Fast (process millions of records)
- Reliable (handle failures gracefully)
- Auditable (track every transformation; see the sketch after this list)
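The accuracy and auditability requirements shaped the code more than the performance ones did. One lightweight way to get an audit trail is to wrap every transformation step so that its input and output row counts are logged. The snippet below is a minimal sketch of the idea; the `audited` decorator, the logger name, and the example step are illustrative, not a library API:

```python
import functools
import logging
from typing import Callable

import pandas as pd

logger = logging.getLogger("pipeline.audit")


def audited(step: Callable[..., pd.DataFrame]) -> Callable[..., pd.DataFrame]:
    """Log row counts before and after a transformation step (illustrative helper)."""

    @functools.wraps(step)
    def wrapper(df: pd.DataFrame, *args, **kwargs) -> pd.DataFrame:
        rows_in = len(df)
        result = step(df, *args, **kwargs)
        logger.info("%s: %d rows in, %d rows out", step.__name__, rows_in, len(result))
        return result

    return wrapper


@audited
def drop_zero_amounts(df: pd.DataFrame) -> pd.DataFrame:
    # Example step: remove non-positive amounts so downstream sums stay meaningful
    return df[df["amount"] > 0]
```

Row counts are a crude audit, but they make data loss visible at the step where it happened rather than at month-end reconciliation.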
Our Architecture
We used a combination of:
- Pandas for in-memory transformations
- Polars for larger datasets (faster, more memory-efficient)
- Dask for distributed processing
- Apache Airflow for orchestration (a simplified DAG sketch follows this list)
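Airflow ties the steps together. The DAG below is a simplified sketch assuming a recent Airflow 2.x install, not our production DAG: the DAG id, schedule, retry count, and the `run_reconciliation` stub are placeholders standing in for the processing and reconciliation functions shown in the next section.

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator


def run_reconciliation(ds: str) -> None:
    """Placeholder for the processing + reconciliation steps shown below."""
    print(f"Reconciling transactions for {ds}")


with DAG(
    dag_id="daily_transaction_reconciliation",  # illustrative name
    start_date=datetime(2023, 4, 1),
    schedule="@daily",
    catchup=False,
    default_args={"retries": 2},  # retry transient failures before paging anyone
) as dag:
    PythonOperator(
        task_id="reconcile",
        python_callable=run_reconciliation,
        op_kwargs={"ds": "{{ ds }}"},  # Airflow templates in the logical date
    )
```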
Example Pipeline
```python
import pandas as pd
import polars as pl
from dask import dataframe as dd


def process_transactions(file_path: str) -> pd.DataFrame:
    # Read with Polars for speed
    df = pl.read_csv(file_path)

    # Transformations: drop non-positive amounts, parse timestamps, keep exact decimals
    df = df.filter(pl.col("amount") > 0)
    df = df.with_columns([
        pl.col("timestamp").str.strptime(pl.Datetime, "%Y-%m-%d %H:%M:%S"),
        pl.col("amount").cast(pl.Decimal(10, 2)),
    ])

    # Convert to Pandas for final processing
    return df.to_pandas()
```
```python
def reconcile_transactions(transactions: pd.DataFrame,
                           ledger: pd.DataFrame) -> pd.DataFrame:
    merged = transactions.merge(
        ledger,
        on=["transaction_id", "amount"],
        how="outer",
        indicator=True,
    )

    # Find discrepancies: rows present in only one of the two sources
    discrepancies = merged[merged["_merge"] != "both"]
    return discrepancies
```
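Wired together, a daily run looks roughly like the following; the file paths are illustrative placeholders, not our actual layout:

```python
# Illustrative wiring of the two steps above
transactions = process_transactions("/data/incoming/2023-04-01_transactions.csv")
ledger = pd.read_csv("/data/ledger/2023-04-01_ledger.csv")

discrepancies = reconcile_transactions(transactions, ledger)
if not discrepancies.empty:
    discrepancies.to_csv("/data/reports/2023-04-01_discrepancies.csv", index=False)
```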
Error Handling
```python
import logging
from typing import Optional

logger = logging.getLogger(__name__)


def safe_process(file_path: str) -> Optional[pd.DataFrame]:
    try:
        return process_transactions(file_path)
    except FileNotFoundError:
        logger.error(f"File not found: {file_path}")
        return None
    except pl.exceptions.NoDataError:
        # pl.read_csv raises NoDataError for an empty file (not pandas' EmptyDataError)
        logger.warning(f"Empty file: {file_path}")
        return pd.DataFrame()
    except Exception as e:
        logger.exception(f"Unexpected error: {e}")
        raise
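```

In the orchestrated job, `safe_process` lets one bad file degrade gracefully instead of failing the whole run. A sketch of how it might be used over a batch, with an illustrative directory layout:

```python
from pathlib import Path

# Illustrative batch loop: skip unreadable files, keep everything else moving
frames = []
for path in sorted(Path("/data/incoming").glob("*.csv")):
    df = safe_process(str(path))
    if df is not None and not df.empty:
        frames.append(df)

all_transactions = pd.concat(frames, ignore_index=True) if frames else pd.DataFrame()
```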
Performance Optimization
- Use Polars for large datasets
- Parallelize with Dask when possible (sketched after this list)
- Cache intermediate results
- Use appropriate data types
- Profile and optimize bottlenecks
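For inputs too large for a single machine, the read-and-filter step translates directly to Dask. This is an illustrative sketch; the glob pattern and blocksize are placeholders rather than production settings:

```python
import dask.dataframe as dd

# Read many CSVs as one partitioned dataframe; each 64 MB block is processed in parallel
ddf = dd.read_csv("/data/incoming/*.csv", blocksize="64MB")
ddf = ddf[ddf["amount"] > 0]

# Appropriate dtypes matter even more out-of-core: a narrower id column cuts memory and shuffle cost
ddf["transaction_id"] = ddf["transaction_id"].astype("string")

total = ddf["amount"].sum().compute()  # .compute() triggers the parallel execution
```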
Testing
```python
def test_transaction_processing(tmp_path):
    # process_transactions takes a file path, so write the fixture data to a CSV first
    test_file = tmp_path / "transactions.csv"
    pd.DataFrame({
        "transaction_id": ["T1", "T2"],
        "amount": [100.50, 200.75],
        "timestamp": ["2023-04-01 10:00:00", "2023-04-01 11:00:00"],
    }).to_csv(test_file, index=False)

    result = process_transactions(str(test_file))

    assert len(result) == 2
    assert float(result["amount"].sum()) == 301.25
```
Lessons Learned
- Start with Pandas, optimize with Polars when needed
- Always validate data at pipeline boundaries (see the check sketched below)
- Log everything for debugging
- Test with production-like data volumes
- Monitor pipeline performance and costs
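The second lesson is the one we keep relearning. A boundary check does not need a framework; a few explicit checks on schema, nulls, and duplicate ids where data enters or leaves a stage catch most problems early. A minimal sketch, using the column names from the examples above (the function itself is ours, not a library API):

```python
def validate_transactions(df: pd.DataFrame) -> pd.DataFrame:
    """Illustrative boundary check: fail fast instead of propagating bad data."""
    required = {"transaction_id", "amount", "timestamp"}
    missing = required - set(df.columns)
    if missing:
        raise ValueError(f"Missing columns: {missing}")
    if df["transaction_id"].duplicated().any():
        raise ValueError("Duplicate transaction ids")
    if df["amount"].isna().any():
        raise ValueError("Null amounts")
    return df
```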
"Data pipelines are only as good as their error handling."