Pipeline de Integração e Processamento de Dados

Luciano Magalhães Luciano Magalhães   |   Janeiro, 2025   | Engenharia de Dados

Imagem de Integração de Dados

1. Contexto

Em ambientes de dados complexos é comum enfrentar o desafio de integrar informações provenientes de diversas fontes — como bancos de dados relacionais e arquivos estruturados — de forma a garantir que os dados estejam limpos, consistentes e facilmente acessíveis para usos futuros. Este trabalho busca construir um pipeline de integração, tratamento, validação de qualidade, rastreabilidade, auditoria e geração automatizada de relatórios.


2. Objetivo

Desenvolver, em Python, um pipeline completo de engenharia de dados que seja capaz de integrar múltiplas fontes (PostgreSQL, SQLite, CSV e JSON), realizar a limpeza, transformação e padronização dos dados, e aplicar validações estruturais e de negócio. O pipeline também deve contemplar rastreabilidade via logs, controle de versão por meio de backups automáticos e geração de relatório final da execução. Tudo isso para garantir a confiabilidade, escalabilidade e reutilização dos dados nas etapas seguintes da arquitetura analítica.


3. Metodologia

Utilizaremos práticas de DataOps para implementar um pipeline de ETL/ELT que envolve:

- Extração: Conexão e coleta de dados de diversas fontes (PostgreSQL, SQLite, CSV, JSON).
- Transformação: Limpeza, padronização e enriquecimento dos dados por meio de validações, tratamento de outliers e conversão de tipos, garantindo a integridade dos registros.
- Carregamento: Integração dos dados processados em uma base consolidada, com mecanismos de armazenamento flexível e otimizado.
- Backup Automático e Controle de Versão: Para preservar o histórico das alterações e permitir a recuperação dos dados, aumentando a confiabilidade do pipeline.
- Monitoramento com Logs: Registro detalhado de cada etapa para garantir rastreabilidade e facilitar a identificação de falhas.


3.1 Configuração Inicial e Importação de Bibliotecas

Neste bloco inicial vamos configurar o sistema de logs para monitorar o pipeline de dados. Também vamos realizar a importação das bibliotecas necessárias e aplicar filtros para suprimir avisos desnecessários, garantindo um ambiente de execução mais limpo e monitorado.

In [1]:
# 1. Bibliotecas para manipulação de dados e machine learning
import json
import numpy as np
import pandas as pd
from scipy.stats import zscore
from sklearn.ensemble import IsolationForest

# 2. Conexão com bancos de dados relacionais
import sqlite3
from sqlalchemy import create_engine

# 3. Manipulação de arquivos e sistema
import os
import shutil
import sys
import time
from pathlib import Path



# 4. Configuração de logs com rotação, personalização e aviso
import logging
from logging.handlers import RotatingFileHandler
import warnings
from datetime import datetime

# Definindo o nível AUDIT (entre INFO e WARNING)
AUDIT_LEVEL_NUM = 25
logging.addLevelName(AUDIT_LEVEL_NUM, "AUDIT")

def audit(self, message, *args, **kwargs):
    if self.isEnabledFor(AUDIT_LEVEL_NUM):
        self._log(AUDIT_LEVEL_NUM, message, args, **kwargs)

logging.Logger.audit = audit

# Criando logger_custom() com cores e categorias
from colorama import Fore, Style, init
init(autoreset=True)  # Auto reset para terminal

class CustomFormatter(logging.Formatter):
    COLOR_MAP = {
        "INFO": Fore.CYAN,
        "AUDIT": Fore.GREEN,
        "WARNING": Fore.YELLOW,
        "ERROR": Fore.RED,
        "CRITICAL": Fore.RED + Style.BRIGHT,
        "DEBUG": Fore.MAGENTA
    }

    def format(self, record):
        levelname = record.levelname
        color = self.COLOR_MAP.get(levelname, "")
        reset = Style.RESET_ALL
        message = super().format(record)
        return f"{color}{message}{reset}"

def logger_custom(diretorio_logs="logs", nome_arquivo="pipeline.log", nivel=logging.DEBUG):
    try:
        os.makedirs(diretorio_logs, exist_ok=True)
        caminho_log = os.path.join(diretorio_logs, nome_arquivo)

        formatter_padrao = logging.Formatter("%(asctime)s - [%(levelname)s] - %(name)s - %(message)s")
        formatter_terminal = CustomFormatter("%(asctime)s - [%(levelname)s] - %(message)s")

        # File Handler com rotação
        handler_arquivo = RotatingFileHandler(caminho_log, maxBytes=5 * 1024 * 1024, backupCount=3)
        handler_arquivo.setFormatter(formatter_padrao)

        # Console Handler com cor
        handler_console = logging.StreamHandler()
        handler_console.setFormatter(formatter_terminal)

        logger = logging.getLogger()
        logger.setLevel(nivel)

        # Evitar handlers duplicados
        if not logger.hasHandlers():
            logger.addHandler(handler_arquivo)
            logger.addHandler(handler_console)

        logger.info(f"Ambiente customizado de logs configurado com sucesso em: {caminho_log}")

    except Exception as e:
        print(f"❌ Erro ao configurar logger customizado: {str(e)}")

def configurar_logs(diretorio_logs=Path.cwd() / "logs", nome_arquivo="pipeline.log", nivel=logging.DEBUG):
    """
    Configura sistema de logging com rotação automática, saída no console
    e nível personalizável para rastreabilidade desde o início do pipeline.

    Parâmetros:
        diretorio_logs (str): Diretório onde os arquivos de log serão armazenados.
        nome_arquivo (str): Nome do arquivo principal de log.
        nivel (int): Nível mínimo de log (DEBUG, INFO, WARNING, ERROR, CRITICAL).

    Retorno:
        None
    """
    try:
        # 1. Criação do diretório de logs
        os.makedirs(diretorio_logs, exist_ok=True)

        # 2. Formato do log
        formato_log = logging.Formatter(
            "%(asctime)s - [%(levelname)s] - %(name)s - %(message)s"
        )

        # 3. Configuração do handler de arquivo com rotação
        caminho_log = os.path.join(diretorio_logs, nome_arquivo)
        handler_arquivo = RotatingFileHandler(
            caminho_log, maxBytes=5 * 1024 * 1024, backupCount=3
        )
        handler_arquivo.setFormatter(formato_log)

        # 4. Handler de console
        handler_console = logging.StreamHandler()
        handler_console.setFormatter(formato_log)

        # 5. Logger principal
        logger = logging.getLogger()
        if not logger.hasHandlers():
            logger.setLevel(nivel)
            logger.addHandler(handler_arquivo)
            logger.addHandler(handler_console)

        logging.info(f"Ambiente de logs configurado com sucesso em: {caminho_log}")

    except Exception as e:
        print(f"Erro ao configurar logs: {str(e)}")
        raise

# Executando a configuração inicial de logs
logger_custom()

# 5. Suprimindo avisos irrelevantes
warnings.filterwarnings("ignore", category=UserWarning, module="sqlalchemy")
warnings.filterwarnings("ignore", category=FutureWarning)

# 6. Adicionando caminho da pasta 'scripts' ao sys.path (caso uso de módulos auxiliares)
sys.path.append(os.path.abspath(os.path.join(os.getcwd(), "..", "scripts")))

# 7. Importação segura de credenciais de banco (PostgreSQL)
try:
    from config import banco, host, senha, usuario
except ModuleNotFoundError:
    logging.error(
        "Arquivo 'config.py' não encontrado. Verifique o caminho e tente novamente."
    )
    raise
2025-07-13 16:03:17,645 - [INFO] - Ambiente customizado de logs configurado com sucesso em: logs\pipeline.log
In [2]:
# Função auxiliar para apresentar moldura de texto no console

def moldura_texto(texto):
    # Lista que não serão capitalizadas
    preposicoes = [
        "de",
        "da",
        "do",
        "das",
        "dos",
        "em",
        "para",
        "com",
        "por",
        "sem",
        "sob",
        "sobre",
        "as",
        "a",
        "o",
        "e",
        "à",
        "às",
        "ao",
        "no",
        "na",
        "nos",
        "nas",
    ]

    # Formatamos o texto inicial maiúscula, exceto preposições
    palavras = texto.split()
    texto_formatado = " ".join(
        palavra.capitalize() if palavra.lower() not in preposicoes else palavra
        for palavra in palavras
    )

    # adicionamos negrito usando ANSI Escape Codes
    texto_negrito = f"\033[1m{texto_formatado}\033[0m"

    # adicionamos moldura ao texto
    moldura = "-" * 23
    resultado = f"\n{moldura}>>>> {texto_negrito} <<<<{moldura}\n"

    print(resultado)

3.2 Visualização da Estrutura Interna do Pipeline

Para compreender melhor a arquitetura lógica deste pipeline, disponiblizamos no botão abaixo para visualização o mapa interativo de dependências técnicas utilizadas ao longo do projeto.

×
📦 Estrutura do Pipeline de Integração e Processamento de Dados

pipeline_de_integracao_e_processamento_de_dados
├── [3.1] configurar_logs() → Configura logs com rotação automática e nível DEBUG
├── [4.1] carregar_dados() → Extrai dados de PostgreSQL, SQLite, CSV, JSON
│    └── Usa credenciais de config.py
├── [4.3] transformar_dados() → Padroniza canais, tipo_acesso, cria faixa_idade, faixa_valor, ano
├── [4.4] tratar_qualidade_dados() → Remove duplicatas, preenche nulos, corrige outliers, valida e-mails
├── [4.5] unir_bases() → Realiza inner/left joins, valida chaves e calcula perdas
├── [4.6] gerar_resumo_estatistico() → Analisa distribuições, médias, intervalos por coluna
├── [4.7] validar_tratar_dados_integrados() → Reintegra registros ausentes e ajusta tipos
├── [4.8] validar_e_testar_qualidade() → Finaliza preenchimento e confirma integridade da base
├── [4.9] armazenar_dados() → Exporta os dados tratados em formatos CSV, JSON e Parquet
├── [5] gerar_backup() → Salva versão histórica com timestamp
├── [6] concluir_pipeline() → Gera relatório automatizado (relatorio_pipeline.txt)
│    └── Executa auditoria via auditar_qualidade_dados()
└── [7] Estrutura de Arquivos → logs/, dados/output/, dados/backup/

🧾 Arquivos Suplementares:
- config.py → Fornece credenciais PostgreSQL
- validacao_dados.py → Regras de estrutura e negócio para auditoria final
    

4. Carregamento de Dados

Neste bloco vamos estabelecer conexões com diferentes fontes de dados, como bancos relacionais e arquivos estruturados. Também vamos validar a existência dos arquivos e formatos, registrando logs detalhados para monitorar cada etapa do carregamento, garantindo a integridade e a disponibilidade das informações.

In [3]:
# Função para carregar dados de múltiplas fontes (SQLite, PostgreSQL, CSV, JSON)


def carregar_dados(fonte, tipo, query=None, caminho=None):
    """
    Carrega dados de diferentes formatos e origens.

    Parâmetros:
        fonte (str): Nome da fonte de dados (para logs).
        tipo (str): Tipo da origem: 'sqlite', 'postgresql', 'csv' ou 'json'.
        query (str): Consulta SQL (obrigatória para banco).
        caminho (str): Caminho do arquivo (obrigatório para arquivo).

    Retorno:
        pd.DataFrame: base de dados carregada.
    """
    try:
        if tipo in ["sqlite", "csv", "json"]:
            if caminho is None:
                raise ValueError(f"Caminho é obrigatório para a fonte '{tipo}'.")
            caminho = Path(caminho).resolve()
            if not caminho.exists():
                raise FileNotFoundError(f"Arquivo '{caminho}' não encontrado.")

        df = None

        if tipo == "sqlite":
            conn = sqlite3.connect(str(caminho))
            df = pd.read_sql(query, conn)
            conn.close()
            log_msg = f"SQLite ({fonte}) carregado com sucesso!"

        elif tipo == "postgresql":
            engine = create_engine(f"postgresql://{usuario}:{senha}@{host}/{banco}")
            df = pd.read_sql(query, engine)
            log_msg = f"PostgreSQL ({fonte}) carregado com sucesso!"

        elif tipo == "csv":
            df = pd.read_csv(str(caminho))
            log_msg = f"Arquivo CSV ({fonte}) carregado com sucesso!"

        elif tipo == "json":
            df = pd.read_json(caminho, lines=True)
            log_msg = f"Arquivo JSON ({fonte}) carregado com sucesso!"

        else:
            raise ValueError(f"Tipo '{tipo}' não suportado.")

        logging.info(log_msg)
        return df

    except Exception as e:
        logging.error(f"Erro ao carregar dados de {fonte}: {e}")
        raise


# Carregamento das bases
clientes_df = carregar_dados(
    fonte="Clientes do SQLite",
    tipo="sqlite",
    query="SELECT * FROM clientes",
    caminho="dados/clientes.db",
)

vendas_df = carregar_dados(
    fonte="Vendas do PostgreSQL", tipo="postgresql", query="SELECT * FROM vendas"
)

marketing_df = carregar_dados(
    fonte="Marketing do arquivo CSV", tipo="csv", caminho="dados/marketing.csv"
)

acessos_df = carregar_dados(
    fonte="Histórico de Acessos do JSON",
    tipo="json",
    caminho="dados/historico_acessos.json",
)

# Moldura e resumo das dimensões
moldura_texto("Resumo dos Dados Carregados:")
print(f"\nClientes: {clientes_df.shape}")
print(f"Vendas: {vendas_df.shape}")
print(f"Marketing: {marketing_df.shape}")
print(f"Histórico de Acessos: {acessos_df.shape}")
2025-07-13 16:03:17,929 - [INFO] - SQLite (Clientes do SQLite) carregado com sucesso!
2025-07-13 16:03:18,084 - [INFO] - PostgreSQL (Vendas do PostgreSQL) carregado com sucesso!
2025-07-13 16:03:18,093 - [INFO] - Arquivo CSV (Marketing do arquivo CSV) carregado com sucesso!
2025-07-13 16:03:18,109 - [INFO] - Arquivo JSON (Histórico de Acessos do JSON) carregado com sucesso!
----------------------->>>> Resumo dos Dados Carregados: <<<<-----------------------


Clientes: (5000, 4)
Vendas: (5000, 4)
Marketing: (5000, 5)
Histórico de Acessos: (5000, 4)

4.2 Exploração da Estrutura Inicial das Tabelas Carregadas

Agora que já carregamos os dados, vamos explorar a estrutura das tabelas carregadas. Também vamos analisar a quantidade de registros, os tipos de dados e os valores nulos em cada coluna, garantindo que os dados estão organizados e prontos para os próximos passos do pipeline.

In [4]:
# Explorando a estrutura de cada base

moldura_texto("Informações dos dados de Clientes")
print(clientes_df.info())

moldura_texto("Informações dos dados de Vendas")
print(vendas_df.info())

moldura_texto("Informações dos dados de Marketing")
print(marketing_df.info())

moldura_texto("Informações dos dados de Acessos")
print(acessos_df.info())
----------------------->>>> Informações dos Dados de Clientes <<<<-----------------------

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 5000 entries, 0 to 4999
Data columns (total 4 columns):
 #   Column      Non-Null Count  Dtype 
---  ------      --------------  ----- 
 0   cliente_id  5000 non-null   int64 
 1   nome        5000 non-null   object
 2   idade       5000 non-null   int64 
 3   email       5000 non-null   object
dtypes: int64(2), object(2)
memory usage: 156.4+ KB
None

----------------------->>>> Informações dos Dados de Vendas <<<<-----------------------

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 5000 entries, 0 to 4999
Data columns (total 4 columns):
 #   Column      Non-Null Count  Dtype  
---  ------      --------------  -----  
 0   venda_id    5000 non-null   int64  
 1   cliente_id  5000 non-null   int64  
 2   data_venda  5000 non-null   object 
 3   valor       5000 non-null   float64
dtypes: float64(1), int64(2), object(1)
memory usage: 156.4+ KB
None

----------------------->>>> Informações dos Dados de Marketing <<<<-----------------------

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 5000 entries, 0 to 4999
Data columns (total 5 columns):
 #   Column       Non-Null Count  Dtype 
---  ------       --------------  ----- 
 0   campanha_id  5000 non-null   int64 
 1   cliente_id   5000 non-null   int64 
 2   canal        4501 non-null   object
 3   data_envio   5000 non-null   object
 4   resposta     5000 non-null   bool  
dtypes: bool(1), int64(2), object(2)
memory usage: 161.3+ KB
None

----------------------->>>> Informações dos Dados de Acessos <<<<-----------------------

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 5000 entries, 0 to 4999
Data columns (total 4 columns):
 #   Column           Non-Null Count  Dtype 
---  ------           --------------  ----- 
 0   cliente_id       5000 non-null   int64 
 1   data_acesso      5000 non-null   int64 
 2   tipo_acesso      5000 non-null   object
 3   duracao_minutos  5000 non-null   int64 
dtypes: int64(3), object(1)
memory usage: 156.4+ KB
None

4.3 Transformações Avançadas com Validações e Enriquecimento de Dados

Vamos definir neste bloco a função que será utilizada posteriormente para aplicar transformações avançadas sobre os dados, incluindo padronização e enriquecimento. Dessa forma a função será invocada nas etapas finais do pipeline para garantir consistência e qualidade analítica.

In [5]:
def transformar_dados(df, nome_base):
    """
    Realiza transformações avançadas e enriquecimento dos dados.

    Parâmetros:
        df (pd.DataFrame): base a ser transformada.
        nome_base (str): nome da base para controle de logs.

    Retorno:
        pd.DataFrame: base transformada.
    """
    logging.info(f"Iniciando transformações avançadas para a base '{nome_base}'.")

    # Padronização de tipo_acesso
    if "tipo_acesso" in df.columns:
        antes = df["tipo_acesso"].nunique()
        df["tipo_acesso"] = df["tipo_acesso"].str.lower().str.capitalize()
        depois = df["tipo_acesso"].nunique()
        logging.info(
            f"Coluna 'tipo_acesso' padronizada. Valores únicos alterados de {antes} para {depois}."
        )

    # Padronização de canal
    if "canal" in df.columns:
        antes_nulos = df["canal"].isnull().sum()
        df["canal"] = df["canal"].str.upper().fillna("NÃO INFORMADO")
        depois_nulos = df["canal"].isnull().sum()
        logging.info(
            f"Coluna 'canal' padronizada. Valores nulos corrigidos: {antes_nulos - depois_nulos}."
        )

    # Coluna derivada: faixa_valor
    if "valor" in df.columns:
        df["faixa_valor"] = pd.cut(
            df["valor"], bins=[0, 100, 500, 1000], labels=["Baixo", "Médio", "Alto"]
        )
        logging.info("Coluna derivada 'faixa_valor' criada com faixas definidas.")

    # Coluna derivada: ano
    if "data_venda" in df.columns:
        df["ano"] = pd.to_datetime(df["data_venda"], errors="coerce").dt.year
        logging.info("Coluna 'ano' criada com base na 'data_venda'.")

    # Coluna derivada: faixa_idade
    if "idade" in df.columns:
        df["faixa_idade"] = pd.cut(
            df["idade"],
            bins=[18, 30, 45, 60, 100],
            labels=["Jovem", "Adulto", "Meia Idade", "Idoso"],
        )
        logging.info("Coluna derivada 'faixa_idade' criada para segmentação etária.")

    novas_colunas = [
        col for col in ["faixa_valor", "ano", "faixa_idade"] if col in df.columns
    ]
    logging.info(
        f"Transformações concluídas para '{nome_base}'. Novas colunas criadas: {novas_colunas}"
    )

    return df

4.4 Limpeza Individual das Bases

Vamos realizar neste bloco a limpeza individual de cada base de dados. Vamos tratar duplicatas, corrigir valores ausentes e identificar outliers utilizando métodos estatísticos, como Z-Score e Isolation Forest. Também vamos ajustar os tipos de dados para manter a consistência e a qualidade das informações processadas.

In [6]:
def tratar_qualidade_dados(
    df, nome_base, contexto="base_separada", metodo_outlier="zscore", limiar_zscore=3
):
    """
    Realiza análise e tratamento de qualidade dos dados.

    Parâmetros:
        df (pd.DataFrame): base a ser tratada.
        nome_base (str): nome identificador para logs.
        contexto (str): 'base_separada' ou 'base_integrada'.
        metodo_outlier (str): 'zscore' ou 'isolation_forest'.
        limiar_zscore (float): limite para z-score (default = 3).

    Retorno:
        pd.DataFrame: base tratada e validada.
    """
    if df is None or df.empty:
        logging.warning(
            f"Base '{nome_base}' está nula ou vazia. Tratamento será ignorado."
        )
        return df

    logging.info(
        f"Iniciando tratamento de qualidade para {nome_base.upper()} ({contexto})."
    )

    # 1. Remoção de duplicatas
    duplicatas = df.duplicated().sum()
    if duplicatas > 0:
        df = df.drop_duplicates()
        logging.info(f"{duplicatas} duplicatas removidas na base '{nome_base}'.")

    # 2. Tratamento de valores ausentes
    nulos = df.isnull().sum()
    if nulos.any():
        logging.info(f"Valores ausentes detectados em {nome_base}:\n{nulos[nulos > 0]}")
        for coluna in df.columns:
            if df[coluna].isnull().sum() > 0:
                if df[coluna].dtype in ["float64", "int64"]:
                    valor = df[coluna].median()
                    df[coluna].fillna(valor, inplace=True)
                    logging.info(
                        f"Preenchido com mediana ({valor}) os nulos da coluna '{coluna}'."
                    )
                elif df[coluna].dtype == "object":
                    df[coluna].fillna("Não Informado", inplace=True)
                    logging.info(
                        f"Preenchido com 'Não Informado' os nulos da coluna '{coluna}'."
                    )

    # 3. Tratamento de outliers
    if metodo_outlier == "zscore":
        for col in df.select_dtypes(include=["float64", "int64"]).columns:
            z = zscore(df[col])
            outliers = np.abs(z) > limiar_zscore
            if outliers.sum() > 0:
                df.loc[outliers, col] = df[col].median()
                logging.info(f"Outliers corrigidos na coluna '{col}' com Z-Score.")

    elif metodo_outlier == "isolation_forest":
        modelo = IsolationForest(contamination=0.05, random_state=42)
        for col in df.select_dtypes(include=["float64", "int64"]).columns:
            outliers = modelo.fit_predict(df[[col]]) == -1
            if outliers.sum() > 0:
                df.loc[outliers, col] = df[col].median()
                logging.info(
                    f"Outliers corrigidos na coluna '{col}' com Isolation Forest."
                )

    # 4. Regras específicas para base integrada
    if contexto == "base_integrada":
        if "duracao_minutos" in df.columns:
            df["duracao_minutos"].fillna(0, inplace=True)
            logging.info("Campo 'duracao_minutos' preenchido com 0.")
        if "resposta" in df.columns:
            df["resposta"].fillna("Não", inplace=True)
            logging.info("Campo 'resposta' preenchido com 'Não'.")

    # 5. Faixa de restrição para colunas críticas
    if "idade" in df.columns:
        df["idade"] = df["idade"].clip(lower=18, upper=100)
        logging.info("Limites aplicados na coluna 'idade' (18 a 100).")
    if "valor" in df.columns:
        df["valor"] = df["valor"].clip(lower=0, upper=1000)
        logging.info("Limites aplicados na coluna 'valor' (0 a 1000).")

    # 6. Validação simples de e-mails
    if "email" in df.columns:
        emails_invalidos = df[~df["email"].astype(str).str.contains("@", na=False)]
        if not emails_invalidos.empty:
            logging.warning(
                f"E-mails inválidos detectados: {len(emails_invalidos)} registros."
            )
            df["email"] = df["email"].apply(
                lambda x: None if isinstance(x, str) and "@" not in x else x
            )

    logging.info(f"Tratamento concluído para '{nome_base.upper()}' ({contexto}).")
    return df


# Aplicação da limpeza individual nas bases separadas
clientes_df = tratar_qualidade_dados(clientes_df, "Clientes", contexto="base_separada")
vendas_df = tratar_qualidade_dados(
    vendas_df, "Vendas", contexto="base_separada", metodo_outlier="isolation_forest"
)
marketing_df = tratar_qualidade_dados(
    marketing_df, "Marketing", contexto="base_separada"
)
acessos_df = tratar_qualidade_dados(
    acessos_df, "Acessos", contexto="base_separada", metodo_outlier="isolation_forest"
)
2025-07-13 16:03:19,005 - [INFO] - Iniciando tratamento de qualidade para CLIENTES (base_separada).
2025-07-13 16:03:19,014 - [INFO] - Limites aplicados na coluna 'idade' (18 a 100).
2025-07-13 16:03:19,020 - [INFO] - Tratamento concluído para 'CLIENTES' (base_separada).
2025-07-13 16:03:19,022 - [INFO] - Iniciando tratamento de qualidade para VENDAS (base_separada).
2025-07-13 16:03:19,176 - [INFO] - Outliers corrigidos na coluna 'venda_id' com Isolation Forest.
2025-07-13 16:03:19,363 - [INFO] - Outliers corrigidos na coluna 'cliente_id' com Isolation Forest.
2025-07-13 16:03:19,547 - [INFO] - Outliers corrigidos na coluna 'valor' com Isolation Forest.
2025-07-13 16:03:19,552 - [INFO] - Limites aplicados na coluna 'valor' (0 a 1000).
2025-07-13 16:03:19,554 - [INFO] - Tratamento concluído para 'VENDAS' (base_separada).
2025-07-13 16:03:19,558 - [INFO] - Iniciando tratamento de qualidade para MARKETING (base_separada).
2025-07-13 16:03:19,564 - [INFO] - Valores ausentes detectados em Marketing:
canal    499
dtype: int64
2025-07-13 16:03:19,567 - [INFO] - Preenchido com 'Não Informado' os nulos da coluna 'canal'.
2025-07-13 16:03:19,571 - [INFO] - Tratamento concluído para 'MARKETING' (base_separada).
2025-07-13 16:03:19,574 - [INFO] - Iniciando tratamento de qualidade para ACESSOS (base_separada).
2025-07-13 16:03:19,768 - [INFO] - Outliers corrigidos na coluna 'cliente_id' com Isolation Forest.
2025-07-13 16:03:19,957 - [INFO] - Outliers corrigidos na coluna 'data_acesso' com Isolation Forest.
2025-07-13 16:03:20,135 - [INFO] - Outliers corrigidos na coluna 'duracao_minutos' com Isolation Forest.
2025-07-13 16:03:20,138 - [INFO] - Tratamento concluído para 'ACESSOS' (base_separada).

4.5 União Dinâmica e Flexível de Bases para Análises Completas

Chegamos ao momento de unir as bases carregadas em uma estrutura integrada para futuras análises mais completas. Antes da união, vamos garantir a consistência dos tipos de dados nas chaves, aplicando conversões explícitas para inteiros. Também vamos remover duplicatas mantendo apenas a primeira ocorrência, preservando a integridade dos registros. Após a união, vamos verificar se existem registros que não possuem correspondência entre as bases e registrar essas informações nos logs para futura análise.

In [7]:
def unir_bases(
    base_esquerda,
    base_direita,
    chave,
    tipo_uniao="inner",
    nome_base_esquerda="Base Esquerda",
    nome_base_direita="Base Direita",
):
    """
    Realiza a união dinâmica entre duas bases de dados.

    Parâmetros:
        base_esquerda (pd.DataFrame): primeira base.
        base_direita (pd.DataFrame): segunda base.
        chave (list): lista de colunas-chave para união.
        tipo_uniao (str): tipo de união ('inner', 'left', etc.).
        nome_base_esquerda (str): nome da base 1 para logs.
        nome_base_direita (str): nome da base 2 para logs.

    Retorno:
        pd.DataFrame: base integrada.
    """
    logger = logging.getLogger()
    logger.info(f"Iniciando união entre '{nome_base_esquerda}' e '{nome_base_direita}'.")

    try:
        # Diagnóstico inicial
        if base_esquerda is None or base_direita is None:
            logger.error("Uma das bases fornecidas é 'None'.")
            raise ValueError("Base nula identificada.")

        if base_esquerda.empty:
            logger.warning(f"A base '{nome_base_esquerda}' está vazia.")
        else:
            logger.info(f"Base '{nome_base_esquerda}' possui {base_esquerda.shape[0]} registros.")

        if base_direita.empty:
            logger.warning(f"A base '{nome_base_direita}' está vazia.")
        else:
            logger.info(f"Base '{nome_base_direita}' possui {base_direita.shape[0]} registros.")

        # Validação das chaves
        chaves_invalidas = [
            col for col in chave
            if col not in base_esquerda.columns or col not in base_direita.columns
        ]
        if chaves_invalidas:
            logger.error(f"Chaves inválidas: {chaves_invalidas}")
            raise ValueError(f"Chaves não encontradas: {chaves_invalidas}")

        logger.info(f"Chaves validadas: {chave}")

        # Padronização de tipo e remoção de duplicatas
        for col in chave:
            base_esquerda[col] = base_esquerda[col].astype(str)
            base_direita[col] = base_direita[col].astype(str)
            logger.info(f"Coluna '{col}' convertida para string.")

        antes_esq = base_esquerda.shape[0]
        antes_dir = base_direita.shape[0]

        base_esquerda = base_esquerda.drop_duplicates(subset=chave, keep="first")
        base_direita = base_direita.drop_duplicates(subset=chave, keep="first")

        logger.info(f"Duplicatas removidas em '{nome_base_esquerda}': {antes_esq - base_esquerda.shape[0]}")
        logger.info(f"Duplicatas removidas em '{nome_base_direita}': {antes_dir - base_direita.shape[0]}")

        # União
        base_integrada = pd.merge(
            base_esquerda,
            base_direita,
            on=chave,
            how=tipo_uniao,
            indicator=True
        )
        logger.info(f"União '{tipo_uniao}' realizada com sucesso.")

        # Diagnóstico com nível personalizado AUDIT
        logger.audit(f"Registros apenas na base esquerda: {(base_integrada['_merge'] == 'left_only').sum()}")
        logger.audit(f"Registros apenas na base direita: {(base_integrada['_merge'] == 'right_only').sum()}")

        # Limpeza do indicador
        base_integrada.drop(columns=["_merge"], inplace=True)
        logger.info(f"Base integrada contém {base_integrada.shape[0]} registros após união.")

        return base_integrada

    except Exception as e:
        logger.error(f"Erro ao unir bases: {str(e)}")
        raise

# Execucao das Oniões
# 🔹 1ª união: Vendas + Clientes
dados_integrados = unir_bases(
    vendas_df,
    clientes_df,
    chave=["cliente_id"],
    tipo_uniao="inner",
    nome_base_esquerda="Vendas",
    nome_base_direita="Clientes",
)

# 🔹 2ª união: + Acessos
dados_integrados = unir_bases(
    dados_integrados,
    acessos_df,
    chave=["cliente_id"],
    tipo_uniao="left",
    nome_base_esquerda="Dados Integrados",
    nome_base_direita="Acessos",
)

# 🔹 3ª união: + Marketing
dados_integrados = unir_bases(
    dados_integrados,
    marketing_df,
    chave=["cliente_id"],
    tipo_uniao="left",
    nome_base_esquerda="Dados Integrados",
    nome_base_direita="Marketing",
)
2025-07-13 16:03:20,156 - [INFO] - Iniciando união entre 'Vendas' e 'Clientes'.
2025-07-13 16:03:20,160 - [INFO] - Base 'Vendas' possui 5000 registros.
2025-07-13 16:03:20,166 - [INFO] - Base 'Clientes' possui 5000 registros.
2025-07-13 16:03:20,169 - [INFO] - Chaves validadas: ['cliente_id']
2025-07-13 16:03:20,179 - [INFO] - Coluna 'cliente_id' convertida para string.
2025-07-13 16:03:20,187 - [INFO] - Duplicatas removidas em 'Vendas': 1995
2025-07-13 16:03:20,190 - [INFO] - Duplicatas removidas em 'Clientes': 0
2025-07-13 16:03:20,201 - [INFO] - União 'inner' realizada com sucesso.
2025-07-13 16:03:20,204 - [AUDIT] - Registros apenas na base esquerda: 0
2025-07-13 16:03:20,208 - [AUDIT] - Registros apenas na base direita: 0
2025-07-13 16:03:20,212 - [INFO] - Base integrada contém 3005 registros após união.
2025-07-13 16:03:20,214 - [INFO] - Iniciando união entre 'Dados Integrados' e 'Acessos'.
2025-07-13 16:03:20,217 - [INFO] - Base 'Dados Integrados' possui 3005 registros.
2025-07-13 16:03:20,219 - [INFO] - Base 'Acessos' possui 5000 registros.
2025-07-13 16:03:20,222 - [INFO] - Chaves validadas: ['cliente_id']
2025-07-13 16:03:20,227 - [INFO] - Coluna 'cliente_id' convertida para string.
2025-07-13 16:03:20,231 - [INFO] - Duplicatas removidas em 'Dados Integrados': 0
2025-07-13 16:03:20,233 - [INFO] - Duplicatas removidas em 'Acessos': 2044
2025-07-13 16:03:20,242 - [INFO] - União 'left' realizada com sucesso.
2025-07-13 16:03:20,244 - [AUDIT] - Registros apenas na base esquerda: 1163
2025-07-13 16:03:20,246 - [AUDIT] - Registros apenas na base direita: 0
2025-07-13 16:03:20,249 - [INFO] - Base integrada contém 3005 registros após união.
2025-07-13 16:03:20,251 - [INFO] - Iniciando união entre 'Dados Integrados' e 'Marketing'.
2025-07-13 16:03:20,253 - [INFO] - Base 'Dados Integrados' possui 3005 registros.
2025-07-13 16:03:20,256 - [INFO] - Base 'Marketing' possui 5000 registros.
2025-07-13 16:03:20,258 - [INFO] - Chaves validadas: ['cliente_id']
2025-07-13 16:03:20,264 - [INFO] - Coluna 'cliente_id' convertida para string.
2025-07-13 16:03:20,271 - [INFO] - Duplicatas removidas em 'Dados Integrados': 0
2025-07-13 16:03:20,274 - [INFO] - Duplicatas removidas em 'Marketing': 1811
2025-07-13 16:03:20,287 - [INFO] - União 'left' realizada com sucesso.
2025-07-13 16:03:20,291 - [AUDIT] - Registros apenas na base esquerda: 1082
2025-07-13 16:03:20,292 - [AUDIT] - Registros apenas na base direita: 0
2025-07-13 16:03:20,298 - [INFO] - Base integrada contém 3005 registros após união.

4.6 Geração de Resumo Estatístico Pós-Processamento

Vamos gerar neste bloco resumo estatístico detalhado dos dados integrados. Esse processo inclui medidas como média, mediana, valores mínimos e máximos, além de verificar a distribuição dos dados. Essa análise nos permite identificar padrões, variações e possíveis inconsistências antes de avançarmos para as etapas finais do pipeline.

In [8]:
def gerar_resumo_estatistico(df, nome_base="Base Integrada"):
    """
    Gera resumo estatístico de colunas numéricas, categóricas e temporais.

    Parâmetros:
        df (pd.DataFrame): base de dados.
        nome_base (str): nome da base para título e logs.

    Retorno:
        None
    """
    logger = logging.getLogger()

    if df is None or df.empty:
        logger.warning(f"A base '{nome_base}' está vazia. Resumo estatístico não será gerado.")
        print(f"\nA base '{nome_base}' está vazia.\n")
        return

    logger.info(f"Gerando resumo estatístico para '{nome_base}' com {df.shape[0]} registros.")
    moldura_texto(f"Resumo Estatístico - {nome_base}")

    # Separar colunas por tipo
    colunas_numericas = df.select_dtypes(include=["number"]).columns.tolist()
    colunas_categoricas = df.select_dtypes(include=["object", "category"]).columns.tolist()
    colunas_datetime = df.select_dtypes(include=["datetime64"]).columns.tolist()

    # Resumo geral
    print("Estatísticas descritivas (geral):\n")
    print(df[colunas_numericas + colunas_categoricas].describe(include="all"))

    # Específicos para datas
    for col in colunas_datetime:
        print(f"\nIntervalo de datas na coluna '{col}':")
        print(df[col].describe(datetime_is_numeric=False))
        logger.info(f"Estatísticas temporais da coluna '{col}' exibidas.")

    # Resumo por coluna de interesse
    colunas_detalhadas = ["duracao_minutos", "valor", "idade"]
    for coluna in colunas_detalhadas:
        if coluna in df.columns:
            print(f"\nIntervalo de '{coluna}':")
            print(df[coluna].describe())
            logger.audit(f"Resumo estatístico de '{coluna}' gerado com distribuição, média e faixa.")


gerar_resumo_estatistico(dados_integrados, nome_base="Base Integrada Atualizada")
2025-07-13 16:03:20,498 - [INFO] - Gerando resumo estatístico para 'Base Integrada Atualizada' com 3005 registros.
----------------------->>>> Resumo Estatístico - Base Integrada Atualizada <<<<-----------------------

Estatísticas descritivas (geral):

           venda_id        valor        idade   data_acesso  duracao_minutos  \
count   3005.000000  3005.000000  3005.000000  1.842000e+03      1842.000000   
unique          NaN          NaN          NaN           NaN              NaN   
top             NaN          NaN          NaN           NaN              NaN   
freq            NaN          NaN          NaN           NaN              NaN   
mean    2125.873877   501.098988    46.323794  1.719141e+12        92.598263   
std     1305.861497   265.967801    16.495449  1.708510e+10        46.891305   
min      114.000000    33.570000    18.000000  1.688256e+12         9.000000   
25%      982.000000   276.070000    32.000000  1.704413e+12        54.000000   
50%     2039.000000   506.110000    46.000000  1.719878e+12        92.000000   
75%     3083.000000   727.390000    61.000000  1.733789e+12       132.000000   
max     4864.000000   968.500000    74.000000  1.749082e+12       175.000000   

        campanha_id cliente_id  data_venda         nome                email  \
count   1923.000000       3005        3005         3005                 3005   
unique          NaN       3005        1460         2888                 2960   
top             NaN       2768  2020-12-30  Gael Fogaça  qbarros@example.com   
freq            NaN          1           7            2                    3   
mean    2107.047322        NaN         NaN          NaN                  NaN   
std     1418.614829        NaN         NaN          NaN                  NaN   
min        3.000000        NaN         NaN          NaN                  NaN   
25%      865.000000        NaN         NaN          NaN                  NaN   
50%     1906.000000        NaN         NaN          NaN                  NaN   
75%     3273.000000        NaN         NaN          NaN                  NaN   
max     5000.000000        NaN         NaN          NaN                  NaN   

       tipo_acesso  canal  data_envio resposta  
count         1842   1923        1923     1923  
unique           2      4         905        2  
top            Web  EMAIL  2023-05-27     True  
freq           924    807           7      990  
mean           NaN    NaN         NaN      NaN  
std            NaN    NaN         NaN      NaN  
min            NaN    NaN         NaN      NaN  
25%            NaN    NaN         NaN      NaN  
50%            NaN    NaN         NaN      NaN  
75%            NaN    NaN         NaN      NaN  
max            NaN    NaN         NaN      NaN  

Intervalo de 'duracao_minutos':
count    1842.000000
mean       92.598263
std        46.891305
min         9.000000
25%        54.000000
50%        92.000000
75%       132.000000
max       175.000000
Name: duracao_minutos, dtype: float64
2025-07-13 16:03:20,539 - [AUDIT] - Resumo estatístico de 'duracao_minutos' gerado com distribuição, média e faixa.
Intervalo de 'valor':
count    3005.000000
mean      501.098988
std       265.967801
min        33.570000
25%       276.070000
50%       506.110000
75%       727.390000
max       968.500000
Name: valor, dtype: float64
2025-07-13 16:03:20,547 - [AUDIT] - Resumo estatístico de 'valor' gerado com distribuição, média e faixa.
Intervalo de 'idade':
count    3005.000000
mean       46.323794
std        16.495449
min        18.000000
25%        32.000000
50%        46.000000
75%        61.000000
max        74.000000
Name: idade, dtype: float64
2025-07-13 16:03:20,555 - [AUDIT] - Resumo estatístico de 'idade' gerado com distribuição, média e faixa.

4.7 Validação Pós-Integração e Tratamento de Dados Consolidados

Vamos dar uma última revisada nos dados integrados, verificando se tudo está no lugar e corrigindo qualquer inconsistência que tenha passado despercebida. Neste bloco, também vamos reintegrar clientes que ficaram de fora da base consolidada, preencher campos com valores padrão, ajustar tipos de dados e revisar colunas críticas para garantir que a estrutura final esteja pronta para os estágios de validação final e exportação.

In [9]:
def validar_tratar_dados_integrados(dados_integrados, clientes_df):
    """
    Valida e ajusta a base integrada final, assegurando cobertura de clientes,
    tipos corretos e consistência estrutural.

    Parâmetros:
        dados_integrados (pd.DataFrame): base integrada resultante das uniões.
        clientes_df (pd.DataFrame): base de clientes original.

    Retorno:
        pd.DataFrame: base validada e ajustada.
    """
    logger = logging.getLogger()

    if dados_integrados is None or dados_integrados.empty:
        logger.warning(
            "A base integrada está vazia. Tentativa de reintegração será forçada."
        )
        dados_integrados = pd.DataFrame()

    logger.info(
        "Iniciando Validação Pós-Integração e Tratamento de Dados Consolidados."
    )

    # 1. Verificação de cobertura dos clientes
    ids_integrados = (
        dados_integrados["cliente_id"].nunique()
        if "cliente_id" in dados_integrados.columns
        else 0
    )
    ids_origem = clientes_df["cliente_id"].nunique()

    if ids_integrados != ids_origem:
        logger.audit(
            f"Inconsistência detectada: {ids_origem} clientes na origem, {ids_integrados} na base integrada."
        )
        ids_ausentes = set(clientes_df["cliente_id"]) - set(
            dados_integrados["cliente_id"]
        )

        if ids_ausentes:
            registros_ausentes = clientes_df[
                clientes_df["cliente_id"].isin(ids_ausentes)
            ].copy()
            registros_ausentes = registros_ausentes.assign(
                venda_id=np.nan,
                valor=np.nan,
                data_venda="Desconhecido",
                campanha_id="Não Participou",
                canal="Não Informado",
                resposta="Não",
                data_envio=pd.Timestamp("1970-01-01"),
                data_acesso=pd.Timestamp("1970-01-01"),
                tipo_acesso="Outros",
                duracao_minutos=0,
            )

            # Garante que todas as colunas estejam presentes
            for col in dados_integrados.columns:
                if col not in registros_ausentes.columns:
                    registros_ausentes[col] = np.nan

            registros_ausentes = registros_ausentes[dados_integrados.columns]
            dados_integrados = pd.concat(
                [dados_integrados, registros_ausentes], ignore_index=True
            )

            logger.info(
                f"Reintegração de {len(ids_ausentes)} registros ausentes concluída."
            )

    # 2. Remoção de duplicatas por e-mail
    if "email" in dados_integrados.columns:
        duplicados_email = dados_integrados.duplicated(subset=["email"]).sum()
        if duplicados_email > 0:
            dados_integrados = dados_integrados.drop_duplicates(
                subset=["email"], keep="first"
            )
            logger.info(f"{duplicados_email} duplicatas removidas na coluna 'email'.")

    # 3. Remoção de duplicatas por cliente_id
    if "cliente_id" in dados_integrados.columns:
        duplicados_id = dados_integrados.duplicated(subset=["cliente_id"]).sum()
        if duplicados_id > 0:
            dados_integrados = dados_integrados.drop_duplicates(
                subset=["cliente_id"], keep="first"
            )
            logger.info(f"{duplicados_id} duplicatas removidas na chave 'cliente_id'.")

    # 4. Conversão da coluna data_venda para datetime
    if "data_venda" in dados_integrados.columns:
        dados_integrados["data_venda"] = pd.to_datetime(
            dados_integrados["data_venda"], errors="coerce"
        )
        dados_integrados["data_venda"].fillna(pd.Timestamp("1970-01-01"), inplace=True)
        logger.info(
            "Coluna 'data_venda' convertida para datetime e valores inválidos corrigidos."
        )

    # 5. Preenchimento padrão em colunas críticas
    preenchimento_default = {
        "canal": "Não Informado",
        "resposta": "Não",
        "tipo_acesso": "Outros",
        "duracao_minutos": 0,
        "campanha_id": "Não Participou",
        "data_envio": "1970-01-01",
        "data_acesso": "1970-01-01",
    }
    dados_integrados.fillna(preenchimento_default, inplace=True)
    logger.info(
        "Campos críticos com valores ausentes preenchidos com padrões definidos."
    )

    # 6. Conversão e validação de tipos finais
    tipos_esperados = {
        "cliente_id": "int64",
        "idade": "int64",
        "valor": "float64",
        "email": "object",
        "data_venda": "datetime64[ns]",
    }

    for col, tipo in tipos_esperados.items():
        if col in dados_integrados.columns:
            try:
                if tipo == "int64":
                    dados_integrados[col] = (
                        pd.to_numeric(dados_integrados[col], errors="coerce")
                        .fillna(0)
                        .astype(int)
                    )
                elif tipo == "float64":
                    dados_integrados[col] = pd.to_numeric(
                        dados_integrados[col], errors="coerce"
                    ).astype(float)
                elif tipo == "datetime64[ns]":
                    dados_integrados[col] = pd.to_datetime(
                        dados_integrados[col], errors="coerce"
                    ).fillna(pd.Timestamp("1970-01-01"))
                else:
                    dados_integrados[col] = dados_integrados[col].astype(tipo)
                logger.info(f"Coluna '{col}' convertida para tipo '{tipo}'.")
            except Exception as e:
                logger.warning(f"Falha ao converter '{col}' para '{tipo}': {str(e)}")

    logger.info(
        f"Validação pós-integração concluída. Total de registros finais: {len(dados_integrados)}"
    )
    return dados_integrados


# Aplicação da validação pós-integração
dados_integrados = validar_tratar_dados_integrados(dados_integrados, clientes_df)
2025-07-13 16:03:20,686 - [INFO] - Iniciando Validação Pós-Integração e Tratamento de Dados Consolidados.
2025-07-13 16:03:20,692 - [AUDIT] - Inconsistência detectada: 5000 clientes na origem, 3005 na base integrada.
2025-07-13 16:03:20,711 - [INFO] - Reintegração de 1995 registros ausentes concluída.
2025-07-13 16:03:20,713 - [INFO] - 119 duplicatas removidas na coluna 'email'.
2025-07-13 16:03:20,774 - [INFO] - Coluna 'data_venda' convertida para datetime e valores inválidos corrigidos.
2025-07-13 16:03:20,782 - [INFO] - Campos críticos com valores ausentes preenchidos com padrões definidos.
2025-07-13 16:03:20,789 - [INFO] - Coluna 'cliente_id' convertida para tipo 'int64'.
2025-07-13 16:03:20,792 - [INFO] - Coluna 'idade' convertida para tipo 'int64'.
2025-07-13 16:03:20,795 - [INFO] - Coluna 'valor' convertida para tipo 'float64'.
2025-07-13 16:03:20,798 - [INFO] - Coluna 'email' convertida para tipo 'object'.
2025-07-13 16:03:20,804 - [INFO] - Coluna 'data_venda' convertida para tipo 'datetime64[ns]'.
2025-07-13 16:03:20,806 - [INFO] - Validação pós-integração concluída. Total de registros finais: 4881

4.8 Validação Final e Métricas de Qualidade da Base Consolidada

Neste bloco fazemos uma checagem completa da base consolidada antes de finalizar o pipeline. Primeiro, tratamos campos que ainda possam ter valores ausentes. Em seguida, reaplicamos as transformações derivadas, como faixa de valor, ano da venda e faixa etária, para garantir que todos os dados – inclusive os registros reintegrados – estejam padronizados. Por fim, analisamos duplicatas, validamos a unicidade das chaves e registramos métricas de volume e integridade em logs. A base está então pronta para ser exportada ou utilizada por etapas seguintes do projeto.

In [10]:
def tratar_valores_ausentes(df):
    """
    Preenche valores ausentes em campos críticos da base consolidada.
    """
    logging.info("Iniciando tratamento final de valores ausentes.")

    if "venda_id" in df.columns:
        df["venda_id"].fillna(df["venda_id"].mean(), inplace=True)

    if "valor" in df.columns:
        df["valor"].fillna(df["valor"].mean(), inplace=True)

    if "tipo_acesso" in df.columns:
        df["tipo_acesso"].fillna("Desconhecido", inplace=True)

    logging.info("Preenchimento de valores ausentes realizado com sucesso.")
    return df


def validar_e_testar_qualidade(df):
    """
    Executa testes finais de qualidade: duplicatas, métricas e valores ausentes.
    """
    logging.info("********* Iniciando Validação Final e Testes de Qualidade *********")

    if "email" in df.columns:
        duplicadas_email = df.duplicated(subset=["email"]).sum()
        logging.info(f"Duplicados restantes na coluna 'email': {duplicadas_email}")

    if "cliente_id" in df.columns:
        duplicadas_id = df.duplicated(subset=["cliente_id"]).sum()
        logging.info(f"Duplicados restantes na chave 'cliente_id': {duplicadas_id}")

    logging.info("Validação final concluída. Métricas principais:")
    logging.info(f"- Total de registros na base: {len(df)}")
    logging.info(f"- Clientes únicos: {df['cliente_id'].nunique()}")
    logging.info(
        f"- Vendas únicas: {df['venda_id'].nunique() if 'venda_id' in df.columns else 0}"
    )
    logging.info(f"- Dimensões da base: {df.shape}")

    nulos = df.isnull().sum()
    nulos_relevantes = nulos[nulos > 0]

    if not nulos_relevantes.empty:
        logging.info(f"Valores ausentes por coluna:\n{nulos_relevantes}")
    else:
        logging.info("Nenhum valor ausente encontrado nas colunas principais.")

    return df


# 1. Tratamento de nulos finais
dados_final = tratar_valores_ausentes(dados_integrados)

# 2. Reaplicação de transformações derivadas
dados_final = transformar_dados(dados_final, "Base Integrada Final")

# 3. Validação final com logs e métricas
dados_final = validar_e_testar_qualidade(dados_final)
2025-07-13 16:03:21,034 - [INFO] - Iniciando tratamento final de valores ausentes.
2025-07-13 16:03:21,040 - [INFO] - Preenchimento de valores ausentes realizado com sucesso.
2025-07-13 16:03:21,042 - [INFO] - Iniciando transformações avançadas para a base 'Base Integrada Final'.
2025-07-13 16:03:21,047 - [INFO] - Coluna 'tipo_acesso' padronizada. Valores únicos alterados de 3 para 3.
2025-07-13 16:03:21,058 - [INFO] - Coluna 'canal' padronizada. Valores nulos corrigidos: 0.
2025-07-13 16:03:21,063 - [INFO] - Coluna derivada 'faixa_valor' criada com faixas definidas.
2025-07-13 16:03:21,070 - [INFO] - Coluna 'ano' criada com base na 'data_venda'.
2025-07-13 16:03:21,074 - [INFO] - Coluna derivada 'faixa_idade' criada para segmentação etária.
2025-07-13 16:03:21,077 - [INFO] - Transformações concluídas para 'Base Integrada Final'. Novas colunas criadas: ['faixa_valor', 'ano', 'faixa_idade']
2025-07-13 16:03:21,079 - [INFO] - ********* Iniciando Validação Final e Testes de Qualidade *********
2025-07-13 16:03:21,082 - [INFO] - Duplicados restantes na coluna 'email': 0
2025-07-13 16:03:21,085 - [INFO] - Duplicados restantes na chave 'cliente_id': 0
2025-07-13 16:03:21,087 - [INFO] - Validação final concluída. Métricas principais:
2025-07-13 16:03:21,090 - [INFO] - - Total de registros na base: 4881
2025-07-13 16:03:21,093 - [INFO] - - Clientes únicos: 4881
2025-07-13 16:03:21,095 - [INFO] - - Vendas únicas: 2805
2025-07-13 16:03:21,098 - [INFO] - - Dimensões da base: (4881, 17)
2025-07-13 16:03:21,103 - [INFO] - Valores ausentes por coluna:
faixa_idade    82
dtype: int64

4.9 Armazenamento Flexível e Dinâmico dos Dados Processados

Neste bloco vamos armazenar os dados tratados em formatos distintos — como CSV, JSON e Parquet — garantindo flexibilidade para consumo futuro e interoperabilidade com outras ferramentas. Também incluímos controle de logs para assegurar rastreabilidade da operação e facilitar a identificação de falhas no salvamento.

In [11]:
def armazenar_dados(df, nome_base, formato="csv", diretorio="dados/output"):
    """
    Essa função salva os dados processados em formatos variados com controle de logs e validações.

    Parâmetros:
        df (pd.DataFrame): DataFrame a ser armazenado.
        nome_base (str): Nome do arquivo sem extensão.
        formato (str): Formato do arquivo ('csv', 'json', 'parquet').
        diretorio (str): Caminho para salvar os arquivos.

    Retorno:
        bool: True se salvo com sucesso, False caso contrário.
    """
    logger = logging.getLogger()

    if df is None or df.empty:
        logger.warning(f"O DataFrame '{nome_base}' está vazio. Nenhum arquivo foi salvo.")
        return False

    os.makedirs(diretorio, exist_ok=True)
    caminho_completo = os.path.join(diretorio, nome_base)

    try:
        if formato == "csv":
            df.to_csv(f"{caminho_completo}.csv", index=False, encoding="utf-8")
            logger.info(f"Arquivo salvo com sucesso em: {caminho_completo}.csv")

        elif formato == "json":
            df.to_json(
                f"{caminho_completo}.json",
                orient="records",
                lines=True,
                force_ascii=False
            )
            logger.info(f"Arquivo salvo com sucesso em: {caminho_completo}.json")

        elif formato == "parquet":
            df.to_parquet(f"{caminho_completo}.parquet", index=False)
            logger.info(f"Arquivo salvo com sucesso em: {caminho_completo}.parquet")

        else:
            logger.error(f"Formato '{formato}' não suportado.")
            return False

        return True

    except Exception as e:
        logger.error(f"Erro ao armazenar o arquivo '{nome_base}': {str(e)}")
        return False

armazenar_dados(clientes_df, "clientes_processados", formato="csv")
armazenar_dados(vendas_df, "vendas_processadas", formato="parquet")
armazenar_dados(marketing_df, "marketing_processado", formato="json")
armazenar_dados(acessos_df, "acessos_processados", formato="csv")
2025-07-13 16:03:21,355 - [INFO] - Arquivo salvo com sucesso em: dados/output\clientes_processados.csv
2025-07-13 16:03:21,394 - [INFO] - Arquivo salvo com sucesso em: dados/output\vendas_processadas.parquet
2025-07-13 16:03:21,407 - [INFO] - Arquivo salvo com sucesso em: dados/output\marketing_processado.json
2025-07-13 16:03:21,417 - [INFO] - Arquivo salvo com sucesso em: dados/output\acessos_processados.csv
Out[11]:
True

5. Backup Automatizado com Controle de Versão

Neste bloco vamos implementar uma rotina de backup para salvar a base consolidada com controle de versão. O nome do arquivo será gerado dinamicamente com base na data e hora da execução, garantindo a rastreabilidade dos resultados. Isso permite manter um histórico de versões da base final, facilitando comparações, auditorias e recuperação de informações em execuções futuras do pipeline.

In [12]:
def gerar_backup(
    df,
    nome_base="base_integrada",
    formato="csv",
    pasta_backup=os.path.join("dados", "backup"),
):
    """
    Realiza backup do DataFrame com controle de versão baseado em timestamp.

    Parâmetros:
        df (pd.DataFrame): base de dados consolidada.
        nome_base (str): nome-base do arquivo de saída.
        formato (str): formato de exportação ('csv', 'json', 'parquet').
        pasta_backup (str): diretório onde os backups serão armazenados.

    Retorno:
        bool: True se o backup foi concluído com sucesso, False caso contrário.
    """
    logger = logging.getLogger()

    if df is None or df.empty:
        logger.warning(f"Backup ignorado: base '{nome_base}' vazia ou inexistente.")
        return False

    try:
        os.makedirs(pasta_backup, exist_ok=True)

        # Geração do nome versionado
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        nome_arquivo = f"{nome_base}_versao_{timestamp}"
        caminho_final = os.path.join(pasta_backup, nome_arquivo)

        if formato == "csv":
            df.to_csv(f"{caminho_final}.csv", index=False, encoding="utf-8")
        elif formato == "json":
            df.to_json(
                f"{caminho_final}.json", orient="records", lines=True, force_ascii=False
            )
        elif formato == "parquet":
            df.to_parquet(f"{caminho_final}.parquet", index=False)
        else:
            logger.error(f"Formato '{formato}' não suportado para backup.")
            return False

        logger.info(f"Backup concluído com sucesso: {caminho_final}.{formato}")
        return True

    except Exception as e:
        logger.error(f"Erro ao realizar backup da base '{nome_base}': {str(e)}")
        return False


# Execução do backup com versionamento
gerar_backup(
    dados_final,
    nome_base="dados_processados",
    formato="csv",
    pasta_backup="dados/backup",
)
2025-07-13 16:03:21,762 - [INFO] - Backup concluído com sucesso: dados/backup\dados_processados_versao_20250713_160321.csv
Out[12]:
True

6. Geração Automatizada de Relatório do Pipeline

Neste bloco vamos encerrar o pipeline com uma etapa de geração automatizada de relatório. O processo inclui a captura do tempo total de execução, a avaliação do status final (sucesso ou falha) e o registro detalhado das ações realizadas, incluindo possíveis falhas de auditoria. O relatório final será salvo em formato de texto (.txt), servindo como documentação oficial da execução, permitindo rastreabilidade e facilitando auditorias futuras.

In [13]:
# Adiciona caminho do módulo externo de validação
# sys.path.append(os.path.abspath(os.path.join(os.getcwd(), 'scripts')))
sys.path.append(os.getcwd())

from validacao_dados import auditar_qualidade_dados

# Regras de estrutura e negócio
regras_estrutura = {
    "clientes_df": {
        "colunas_obrigatorias": ["cliente_id", "nome", "idade", "email"],
        "tipos_colunas": {"cliente_id": int, "nome": str, "idade": int, "email": str},
    }
}

regras_negocio = {
    "clientes_df": {
        "email_invalido": {
            "regra": "~@df['email'].str.contains(r'^[\\w\\.\\-]+@[\\w\\.\\-]+\\.\\w+$')",
            "descricao": "E-mails inválidos",
        },
        "idade_negativa": {"regra": "@df['idade'] < 0", "descricao": "Idade negativa"},
    }
}

# Função auxiliar para gerar o relatório final


def concluir_pipeline(status_execucao, logs_detalhados, tempo_inicial, tempo_final, diretorio_relatorio="logs", base_final=None):
    """
    Gera relatório completo da execução do pipeline com métricas reais.

    Parâmetros:
        status_execucao (str): "SUCESSO" ou "FALHA"
        logs_detalhados (list): Lista de mensagens de auditoria e diagnóstico
        tempo_inicial (float): Timestamp de início
        tempo_final (float): Timestamp de fim
        diretorio_relatorio (str): Caminho onde o relatório será salvo
        base_final (pd.DataFrame): Base final consolidada, usada para gerar métricas
    """

    import os
    import logging
    from datetime import datetime

    logger = logging.getLogger()
    if not logger.hasHandlers():
        logger.setLevel(logging.INFO)
        handler = logging.StreamHandler()
        formatter = logging.Formatter("%(asctime)s - [%(levelname)s] - %(message)s")
        handler.setFormatter(formatter)
        logger.addHandler(handler)

    try:
        os.makedirs(diretorio_relatorio, exist_ok=True)
        tempo_total = tempo_final - tempo_inicial
        data_execucao = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

        # Cabeçalho do relatório
        relatorio = f"""
===============================
RELATÓRIO DO PIPELINE DE DADOS
===============================
📅 Data de Execução  : {data_execucao}
✅ Status Final       : {status_execucao}
⏱️ Tempo Total        : {tempo_total:.2f} segundos
"""

        # Estatísticas da base final
        if base_final is not None and not base_final.empty:
            relatorio += "\n------------------------------\n📊 Métricas de Processamento\n------------------------------\n"
            relatorio += f"✔ Registros finais consolidados: {len(base_final)}\n"
            if "cliente_id" in base_final.columns:
                relatorio += f"✔ Clientes únicos: {base_final['cliente_id'].nunique()}\n"
            if "venda_id" in base_final.columns:
                relatorio += f"✔ Vendas únicas: {base_final['venda_id'].nunique()}\n"
            enriquecidas = [col for col in ["faixa_valor", "faixa_idade", "ano"] if col in base_final.columns]
            if enriquecidas:
                relatorio += f"✔ Atributos enriquecidos: {', '.join(enriquecidas)}\n"
            # Verificação de duplicatas e nulos
            duplicatas_email = base_final.duplicated(subset=["email"]).sum() if "email" in base_final.columns else 0
            duplicatas_id = base_final.duplicated(subset=["cliente_id"]).sum() if "cliente_id" in base_final.columns else 0
            relatorio += f"✔ Duplicatas removidas: email = {duplicatas_email}, cliente_id = {duplicatas_id}\n"
            nulos = base_final.isnull().sum()
            nulos_relevantes = nulos[nulos > 0]
            if not nulos_relevantes.empty:
                relatorio += f"✔ Colunas com valores ausentes: {', '.join(nulos_relevantes.index.tolist())}\n"

        # Logs detalhados de auditoria e execução
        relatorio += "\n------------------------------\n🔍 Auditoria e Eventos\n------------------------------\n"
        for log in logs_detalhados:
            relatorio += f"▶ {log}\n"

        # Caminho do arquivo
        caminho_relatorio = os.path.join(diretorio_relatorio, "relatorio_pipeline.txt")
        with open(caminho_relatorio, "w", encoding="utf-8") as arquivo:
            arquivo.write(relatorio.strip())

        logger.info(f"📄 Relatório gerado com sucesso: {caminho_relatorio}")

    except Exception as e:
        logger.error(f"❌ Erro ao gerar relatório final: {str(e)}")

# Execução Final do Pipeline
tempo_inicial = time.time()
logs_detalhados = []

try:
    # Conversão preventiva de tipos para validação
    clientes_df = clientes_df.astype(
        {"cliente_id": "int64", "nome": "str", "idade": "int64", "email": "str"}
    )

    # Auditoria de qualidade
    logs_detalhados.append("▶ Iniciando auditoria da base de clientes.")
    auditoria_aprovada = auditar_qualidade_dados(
        clientes_df,
        "clientes_df",
        regras_estrutura["clientes_df"],
        regras_negocio["clientes_df"],
    )

    if not auditoria_aprovada:
        raise Exception("Auditoria de clientes não aprovada.")

    logs_detalhados.append("Auditoria concluída com sucesso.")
    status_execucao = "SUCESSO"

except Exception as e:
    logging.error(f"Erro durante execução final: {e}")
    logs_detalhados.append(f"Erro: {e}")
    status_execucao = "FALHA"

# Finalização do pipeline
tempo_final = time.time()
concluir_pipeline(status_execucao, logs_detalhados, tempo_inicial, tempo_final, base_final=dados_final)
2025-07-13 16:03:21,915 - [INFO] - Iniciando auditoria de estrutura para clientes_df.
2025-07-13 16:03:21,918 - [INFO] - Iniciando auditoria de regras de negócio para clientes_df.
2025-07-13 16:03:22,388 - [INFO] - [clientes_df] Auditoria concluída com sucesso!
2025-07-13 16:03:22,397 - [INFO] - 📄 Relatório gerado com sucesso: logs\relatorio_pipeline.txt

7. Arquivos Gerados e Estrutura de Diretórios

Ao longo do pipeline que construimos aqui foram gerados arquivos e organizados de forma padronizada em diretórios específicos. Essa organização facilita a rastreabilidade, a auditoria e o reaproveitamento das bases processadas, mantendo controle de histórico e versionamento automático.

📁 Diretórios Criados
✅ logs/ → Contém os registros detalhados da execução do pipeline.
✅ dados/output/ → Armazena os dados processados em diferentes formatos.
✅ dados/backup/ → Guarda versões anteriores dos arquivos com controle de histórico.

📄 Arquivos Gerados pelo Pipeline

🔹 Logs:
📌 logs/pipeline.log → Contém registros de cada etapa do pipeline.
📌 logs/relatorio_pipeline.txt → Relatório final com status da execução.

🔹 Dados Processados:
📌 dados/output/clientes_processados.csv → Dados de clientes limpos e validados.
📌 dados/output/vendas_processadas.parquet → Dados de vendas após integração e transformação.
📌 dados/output/marketing_processado.json → Dados de campanhas processados.
📌 dados/output/acessos_processados.csv → Registros de acessos tratados.

🔹 Backups Versionados:
📌 dados/backup/clientes_processados_YYYYMMDD_HHMMSS.csv
📌 dados/backup/vendas_processadas_YYYYMMDD_HHMMSS.parquet
📌 dados/backup/marketing_processado_YYYYMMDD_HHMMSS.json
📌 dados/backup/acessos_processados_YYYYMMDD_HHMMSS.csv
(Backups gerados automaticamente com timestamp para rastreabilidade completa.)


8. Conclusão Geral

Imagem de Detecção de Fraude

Chegamos ao final deste trabalho, e construímos um pipeline que não só integra dados de diferentes fontes, como também garante qualidade, segurança e escalabilidade, estabelecendo uma base sólida para análises estratégicas e aprimoramentos futuros.

Principais Destaques :

- Integrar dados de múltiplas fontes (Postgre, SQLite, CSV, JSON).
- Assegurar a qualidade dos dados, aplicando limpezas, correções e validações automatizadas.
- Garantir a escalabilidade, rastreabilidade e segurança, com logs detalhados e backups automáticos.
- Preparar os dados para análises avançadas, incluindo transformações enriquecidas e criação de colunas derivadas.


© Copyright 2025 | Luciano Magalhães