01 — Extração: SQL Server → landing-zone¶
O que este notebook faz¶
Lê todas as tabelas do banco seguradora via pyodbc, serializa cada uma como CSV em memória e faz upload para o bucket landing-zone do MinIO usando boto3. É a primeira etapa da arquitetura Medallion — raw ingestion, sem transformação alguma.
graph LR
A[(SQL Server<br/>seguradora)] -->|pyodbc<br/>SELECT *| B[StringIO + csv.writer]
B --> C[boto3<br/>put_object]
C --> D[MinIO<br/>landing-zone/<tabela>/<tabela>.csv]
style A fill:#cc2927,stroke:#7a1717,color:#fff
style D fill:#c72e29,stroke:#7a1c19,color:#fff
Mapeamento origem → destino¶
| Origem | Destino |
|---|---|
seguradora.regiao |
landing-zone/regiao/regiao.csv |
seguradora.estado |
landing-zone/estado/estado.csv |
seguradora.municipio |
landing-zone/municipio/municipio.csv |
| ... | ... |
seguradora.sinistro |
landing-zone/sinistro/sinistro.csv |
A pasta intermediária com o nome da tabela facilita listagem e leitura pelo Spark no notebook seguinte.
Cliente S3 (boto3)¶
import os
import boto3
from dotenv import load_dotenv
load_dotenv()
s3 = boto3.client(
"s3",
endpoint_url=os.getenv("MINIO_ENDPOINT"),
aws_access_key_id=os.getenv("MINIO_ACCESS_KEY"),
aws_secret_access_key=os.getenv("MINIO_SECRET_KEY"),
region_name="us-east-1",
)
LANDING = os.getenv("MINIO_LANDING_BUCKET", "landing-zone")
Loop de extração¶
import csv, io, pyodbc
cn = pyodbc.connect(
f"DRIVER={{ODBC Driver 18 for SQL Server}};"
f"SERVER={os.getenv('SQLSERVER_HOST')},{os.getenv('SQLSERVER_PORT')};"
f"DATABASE={os.getenv('SQLSERVER_DB')};"
f"UID={os.getenv('SQLSERVER_USER')};"
f"PWD={os.getenv('SQLSERVER_PASSWORD')};"
f"TrustServerCertificate=yes;"
)
cursor = cn.cursor()
TABELAS = ["regiao", "estado", "municipio", "endereco", "cliente",
"telefone", "marca", "modelo", "carro", "apolice", "sinistro"]
for table in TABELAS:
cursor.execute(f"SELECT * FROM {table}")
cols = [c[0] for c in cursor.description]
rows = cursor.fetchall()
buf = io.StringIO()
writer = csv.writer(buf)
writer.writerow(cols)
writer.writerows(rows)
payload = buf.getvalue().encode("utf-8")
key = f"{table}/{table}.csv"
s3.put_object(
Bucket=LANDING,
Key=key,
Body=io.BytesIO(payload),
ContentType="text/csv",
)
print(f"✓ {table:12s} → s3://{LANDING}/{key} ({len(rows)} linhas)")
Verificação¶
Lista os objetos do bucket após a extração:
resp = s3.list_objects_v2(Bucket=LANDING)
for obj in resp.get("Contents", []):
print(f"{obj['Key']:50s} {obj['Size']} bytes")
Saída esperada:
apolice/apolice.csv ~1.2 KB
carro/carro.csv ~0.9 KB
cliente/cliente.csv ~1.5 KB
...
sinistro/sinistro.csv ~0.8 KB
telefone/telefone.csv ~0.4 KB
Por que CSV?
A camada landing-zone é o ponto de entrada do data lake. CSV mantém os dados em formato de texto, fáceis de inspecionar, antes de qualquer conversão para formatos colunares (Parquet/Delta) na bronze.