02 — Bronze: landing-zone → Delta Lake¶
O que este notebook faz¶
Lê os CSVs do bucket landing-zone com Apache Spark e grava cada tabela no bucket bronze no formato Delta Lake. É aqui que os dados deixam de ser "arquivos soltos" e passam a viver em uma estrutura transacional.
graph LR
A[MinIO<br/>landing-zone<br/>CSV] -->|spark.read.csv| B[DataFrame]
B -->|.write.format delta| C[MinIO<br/>bronze<br/>Delta Lake]
style A fill:#c72e29,stroke:#7a1c19,color:#fff
style C fill:#00add4,stroke:#005f74,color:#fff
Por que Delta Lake?¶
O Parquet puro não suporta UPDATE nem DELETE — para modificar dados gravados seria preciso reescrever arquivos manualmente. O Delta Lake adiciona uma camada de transaction log (_delta_log/) sobre os Parquets, garantindo:
-
Transações ACID
Operações atômicas, sem estados parciais
-
Time Travel
Consulta qualquer versão anterior da tabela
-
Schema enforcement
Bloqueia gravação com schema divergente por padrão
-
MERGE (UPSERT)
Combina INSERT e UPDATE em uma única operação
Estrutura final no MinIO¶
bronze/
├── apolice/
│ ├── part-00000-*.snappy.parquet
│ └── _delta_log/
│ └── 00000000000000000000.json ← carga inicial
├── carro/
├── cliente/
└── ... (uma pasta Delta por tabela)
SparkSession com Delta + S3A¶
import os
from pyspark.sql import SparkSession
from dotenv import load_dotenv
load_dotenv()
MINIO_ENDPOINT = os.getenv("MINIO_ENDPOINT")
ACCESS_KEY = os.getenv("MINIO_ACCESS_KEY")
SECRET_KEY = os.getenv("MINIO_SECRET_KEY")
spark = (
SparkSession.builder
.appName("csv-to-delta")
.config("spark.jars.packages",
"io.delta:delta-spark_2.12:3.2.0,"
"org.apache.hadoop:hadoop-aws:3.3.4,"
"com.amazonaws:aws-java-sdk-bundle:1.12.262")
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
.config("spark.sql.catalog.spark_catalog",
"org.apache.spark.sql.delta.catalog.DeltaCatalog")
.config("spark.hadoop.fs.s3a.endpoint", MINIO_ENDPOINT)
.config("spark.hadoop.fs.s3a.access.key", ACCESS_KEY)
.config("spark.hadoop.fs.s3a.secret.key", SECRET_KEY)
.config("spark.hadoop.fs.s3a.path.style.access", "true")
.config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false")
.getOrCreate()
)
Conversão CSV → Delta¶
LANDING = "s3a://landing-zone"
BRONZE = "s3a://bronze"
TABELAS = ["regiao", "estado", "municipio", "endereco", "cliente",
"telefone", "marca", "modelo", "carro", "apolice", "sinistro"]
for tabela in TABELAS:
src = f"{LANDING}/{tabela}/{tabela}.csv"
dest = f"{BRONZE}/{tabela}"
df = (
spark.read
.option("header", "true")
.option("inferSchema", "true")
.csv(src)
)
(
df.write
.format("delta")
.mode("overwrite")
.save(dest)
)
count = spark.read.format("delta").load(dest).count()
print(f"✓ {tabela:12s} → {dest} ({count} linhas)")
Validação¶
df_apolice = spark.read.format("delta").load(f"{BRONZE}/apolice")
df_apolice.printSchema()
df_apolice.show(5)
root
|-- id_apolice: integer (nullable = true)
|-- id_cliente: integer (nullable = true)
|-- id_carro: integer (nullable = true)
|-- cobertura: string (nullable = true)
|-- valor_premio: double (nullable = true)
|-- data_inicio: date (nullable = true)
|-- data_fim: date (nullable = true)
mode('overwrite')
O modo overwrite reescreve os arquivos Parquet, mas o _delta_log/ continua acumulando o histórico. Isso permite voltar a versões anteriores via Time Travel mesmo após reprocessamentos.