Data Engineering a Escala: Procesando 1B+ Registros de Seguros con Apache Spark
A través de optimizaciones estratégicas en Apache Spark, logramos reducciones significativas en tiempo de ejecución (40-70%), reducción de shuffle (40-60%), y reducción de stages (20-30%).
El Problema
Antes de las optimizaciones, nuestro pipeline sufría de varios problemas críticos.
1. Particionamiento Inadecuado
Un patrón común era cargar tablas de referencia sin considerar el particionamiento estratégico.
def load_customer_segments(df):
return (
df.filter(col("segment_type") == "premium")
.select("customer_id", "segment_name", "tier")
.cache()
) Resultado - Data skew severo con algunas particiones procesando 10x más datos que otras.
2. Broadcast Joins Fallando
Spark intentaba hacer broadcast de tablas grandes que excedían la memoria disponible.
3. Shuffle Excesivo
Shuffle read/write de aproximadamente 800GB por job con stages innecesarios.
Solución 1 - Particionamiento Inteligente
La primera optimización fue implementar particionamiento estratégico basado en columnas de join.
Para tablas pequeñas de referencia
def load_customer_segments(df):
return (
df.filter(col("segment_type") == "premium")
.select("customer_id", "segment_name", "tier")
.repartition(4, "customer_id")
.cache()
) Beneficio - Co-location de datos reduce shuffle en joins posteriores en aproximadamente 50%.
Para datasets grandes
claims_df = (
claims_df
.repartition(200, "policy_id")
.filter(col("status").isin("approved", "pending"))
.filter(col("amount") > 1000)
) Fórmula para Particiones Óptimas
optimal_partitions = total_cores * 2.5
shuffle_partitions = 200
small_table_partitions = 4
target_size_mb = 128 Solución 2 - Broadcast Joins Selectivos
Matriz de Decisión
| Tamaño | Estrategia |
|---|---|
| Menor a 10 MB | Broadcast automático |
| 10-100 MB | Broadcast explícito |
| 100-500 MB | Shuffle hash |
| Mayor a 500 MB | Shuffle sort-merge |
Ejemplo con broadcast
from pyspark.sql.functions import broadcast
enriched_df = (
policies_df
.join(broadcast(product_catalog), "product_id", "left")
.join(broadcast(agent_info), "agent_id", "left")
) Solución 3 - Adaptive Query Execution
def setup_spark_optimizations(spark):
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
spark.conf.set("spark.sql.shuffle.partitions", "200")
spark.conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
return spark Resultados
Tiempo de Ejecución
Antes - 180 minutos | Después - 65 minutos | Mejora - 64%
Shuffle
Antes - 820 GB | Después - 380 GB | Mejora - 54%
Costos
Antes - 11,000 USD/mes | Después - 3,000 USD/mes | Ahorro - 73%
Lecciones Aprendidas
1. Particionar Estratégicamente
def smart_partition(df, size_mb, join_cols):
if size_mb < 100:
return df.repartition(4, *join_cols)
elif size_mb < 1000:
return df.repartition(20, *join_cols)
else:
return df.repartition(200, *join_cols) 2. Monitorear Spark UI
Indicadores clave - Shuffle ratio mayor a 2x indica skew. Task variance alta indica mal particionamiento. GC time mayor a 10% indica memory pressure. Cualquier spill necesita más memoria.
3. Cache con Criterio
reference_data = (
spark.read.table("dim_products")
.repartition(4, "product_id")
.cache()
)
reference_data.count() Conclusión
Optimizar Spark a escala requiere particionamiento estratégico, uso inteligente de broadcast vs shuffle joins, aprovechamiento de AQE, y configuración adecuada del cluster.
Las optimizaciones mejoraron performance en 40-70% y redujeron costos en 73%, liberando presupuesto para otras iniciativas. Crearon una base sólida para modelos de ML que generan 1.8-2.2M USD anuales.