Ir para o conteúdo

03 — DML no Delta Lake

O que este notebook faz

Demonstra as operações de DML que o Delta Lake oferece nativamente sobre os dados da camada bronze: INSERT (via MERGE), UPDATE, DELETE e Time Travel. Cada operação gera um novo commit no _delta_log/, mantendo o histórico completo da tabela.


Operações cobertas

  •   INSERT (MERGE)


    Adiciona novos registros sem duplicar os já existentes

  •   UPDATE


    Atualiza colunas com base em uma condição

  •   DELETE


    Remove linhas que satisfazem uma condição

  •   Time Travel


    Consulta versões anteriores via versionAsOf ou timestampAsOf


Setup

from delta.tables import DeltaTable
from pyspark.sql.functions import col, lit

BRONZE = "s3a://bronze"

dt_apolice  = DeltaTable.forPath(spark, f"{BRONZE}/apolice")
dt_sinistro = DeltaTable.forPath(spark, f"{BRONZE}/sinistro")
dt_carro    = DeltaTable.forPath(spark, f"{BRONZE}/carro")

INSERT — via MERGE

O MERGE é mais robusto que um append porque evita duplicatas — se o id já existir, o registro é ignorado.

novos = spark.createDataFrame(
    [(11, 5, 7, "Total", 3500.00, "2025-01-01", "2026-01-01")],
    ["id_apolice", "id_cliente", "id_carro",
     "cobertura", "valor_premio", "data_inicio", "data_fim"],
)

(
    dt_apolice.alias("t")
    .merge(novos.alias("s"), "t.id_apolice = s.id_apolice")
    .whenNotMatchedInsertAll()
    .execute()
)

UPDATE — reajuste condicional

Aplica um aumento de 15% no prêmio das apólices com cobertura Básica:

dt_apolice.update(
    condition = col("cobertura") == "Básica",
    set       = {"valor_premio": col("valor_premio") * lit(1.15)},
)

A versão equivalente em SQL Spark:

spark.sql("""
    UPDATE delta.`s3a://bronze/apolice`
    SET valor_premio = valor_premio * 1.15
    WHERE cobertura = 'Básica'
""")

DELETE — limpeza condicional

Remove sinistros com status Recusado e veículos anteriores a 2018:

dt_sinistro.delete(condition = col("status") == "Recusado")
dt_carro.delete(condition    = col("ano") < lit(2018))

Time Travel

Cada operação acima criou uma nova versão. Dá para inspecionar o histórico e ler qualquer versão anterior:

(
    DeltaTable.forPath(spark, f"{BRONZE}/apolice")
    .history()
    .select("version", "timestamp", "operation", "operationMetrics")
    .show(truncate=False)
)

Saída típica:

+-------+-------------------+---------+
|version|timestamp          |operation|
+-------+-------------------+---------+
|2      |2025-04-21 18:45:11|UPDATE   |
|1      |2025-04-21 18:44:53|MERGE    |
|0      |2025-04-21 18:30:02|WRITE    |
+-------+-------------------+---------+
df_v0 = (
    spark.read.format("delta")
    .option("versionAsOf", 0)
    .load(f"{BRONZE}/apolice")
)
df_v0.show()
df_ontem = (
    spark.read.format("delta")
    .option("timestampAsOf", "2025-04-20 23:59:59")
    .load(f"{BRONZE}/apolice")
)

Verificação final

for tabela in ["apolice", "sinistro", "carro"]:
    versoes = (
        DeltaTable.forPath(spark, f"{BRONZE}/{tabela}")
        .history()
        .count()
    )
    total = spark.read.format("delta").load(f"{BRONZE}/{tabela}").count()
    print(f"{tabela:10s}{total} linhas, {versoes} versões")

Pronto

O pipeline foi executado de ponta a ponta: SQL Server → CSV → Delta Lake, com DML transacional e histórico completo. Os dados na camada bronze já podem alimentar uma camada silver ou gold posteriormente.