03 — DML no Delta Lake¶
O que este notebook faz¶
Demonstra as operações de DML que o Delta Lake oferece nativamente sobre os dados da camada bronze: INSERT (via MERGE), UPDATE, DELETE e Time Travel. Cada operação gera um novo commit no _delta_log/, mantendo o histórico completo da tabela.
Operações cobertas¶
-
INSERT (MERGE)
Adiciona novos registros sem duplicar os já existentes
-
UPDATE
Atualiza colunas com base em uma condição
-
DELETE
Remove linhas que satisfazem uma condição
-
Time Travel
Consulta versões anteriores via
versionAsOfoutimestampAsOf
Setup¶
from delta.tables import DeltaTable
from pyspark.sql.functions import col, lit
BRONZE = "s3a://bronze"
dt_apolice = DeltaTable.forPath(spark, f"{BRONZE}/apolice")
dt_sinistro = DeltaTable.forPath(spark, f"{BRONZE}/sinistro")
dt_carro = DeltaTable.forPath(spark, f"{BRONZE}/carro")
INSERT — via MERGE¶
O MERGE é mais robusto que um append porque evita duplicatas — se o id já existir, o registro é ignorado.
novos = spark.createDataFrame(
[(11, 5, 7, "Total", 3500.00, "2025-01-01", "2026-01-01")],
["id_apolice", "id_cliente", "id_carro",
"cobertura", "valor_premio", "data_inicio", "data_fim"],
)
(
dt_apolice.alias("t")
.merge(novos.alias("s"), "t.id_apolice = s.id_apolice")
.whenNotMatchedInsertAll()
.execute()
)
UPDATE — reajuste condicional¶
Aplica um aumento de 15% no prêmio das apólices com cobertura Básica:
dt_apolice.update(
condition = col("cobertura") == "Básica",
set = {"valor_premio": col("valor_premio") * lit(1.15)},
)
A versão equivalente em SQL Spark:
spark.sql("""
UPDATE delta.`s3a://bronze/apolice`
SET valor_premio = valor_premio * 1.15
WHERE cobertura = 'Básica'
""")
DELETE — limpeza condicional¶
Remove sinistros com status Recusado e veículos anteriores a 2018:
dt_sinistro.delete(condition = col("status") == "Recusado")
dt_carro.delete(condition = col("ano") < lit(2018))
Time Travel¶
Cada operação acima criou uma nova versão. Dá para inspecionar o histórico e ler qualquer versão anterior:
Verificação final¶
for tabela in ["apolice", "sinistro", "carro"]:
versoes = (
DeltaTable.forPath(spark, f"{BRONZE}/{tabela}")
.history()
.count()
)
total = spark.read.format("delta").load(f"{BRONZE}/{tabela}").count()
print(f"{tabela:10s} → {total} linhas, {versoes} versões")
Pronto
O pipeline foi executado de ponta a ponta: SQL Server → CSV → Delta Lake, com DML transacional e histórico completo. Os dados na camada bronze já podem alimentar uma camada silver ou gold posteriormente.