From 3e2b776335ceefb29c7460e28d5cdadfb4a0d93d Mon Sep 17 00:00:00 2001 From: BrunoDeskWSL Date: Wed, 18 Dec 2024 10:28:59 -0300 Subject: [PATCH] criado codigo para baixar e extrair dados --- .gitignore | 6 + ...os arquivos zip da receita nesta pasta.txt | 0 ...43o descompactados e gerado o cnpj.db.txt" | 0 dados_cnpj_baixa.py | 217 ++++++------------ dados_cnpj_extrair.py | 54 +++++ 5 files changed, 132 insertions(+), 145 deletions(-) create mode 100644 .gitignore delete mode 100644 dados-publicos-zip/copie os arquivos zip da receita nesta pasta.txt delete mode 100644 "dados-publicos/aqui os arquivos ser\303\243o descompactados e gerado o cnpj.db.txt" create mode 100644 dados_cnpj_extrair.py diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..b8ccf91 --- /dev/null +++ b/.gitignore @@ -0,0 +1,6 @@ +# Ignorar ambiente virtual +.venv/ + +# Ignorar dados públicos e suas versões comprimidas +dados-publicos/ +dados-publicos-zip/ diff --git a/dados-publicos-zip/copie os arquivos zip da receita nesta pasta.txt b/dados-publicos-zip/copie os arquivos zip da receita nesta pasta.txt deleted file mode 100644 index e69de29..0000000 diff --git "a/dados-publicos/aqui os arquivos ser\303\243o descompactados e gerado o cnpj.db.txt" "b/dados-publicos/aqui os arquivos ser\303\243o descompactados e gerado o cnpj.db.txt" deleted file mode 100644 index e69de29..0000000 diff --git a/dados_cnpj_baixa.py b/dados_cnpj_baixa.py index abe699d..13e8628 100644 --- a/dados_cnpj_baixa.py +++ b/dados_cnpj_baixa.py @@ -1,153 +1,80 @@ -# -*- coding: utf-8 -*- -""" - -lista relação de arquivos na página de dados públicos da receita federal -e faz o download -https://www.gov.br/receitafederal/pt-br/assuntos/orientacao-tributaria/cadastros/consultas/dados-publicos-cnpj -https://dadosabertos.rfb.gov.br/CNPJ/ -http://200.152.38.155/CNPJ/ -""" +import os +import requests from bs4 import BeautifulSoup -import requests, wget, os, sys, time, glob, parfive - -#url = 'http://200.152.38.155/CNPJ/dados_abertos_cnpj/2024-08/' #padrão a partir de agosto/2024 -#url_dados_abertos = 'https://dadosabertos.rfb.gov.br/CNPJ/dados_abertos_cnpj/' -#url_dados_abertos = 'http://200.152.38.155/CNPJ/dados_abertos_cnpj/' -url_dados_abertos = 'https://arquivos.receitafederal.gov.br/cnpj/dados_abertos_cnpj/' - -pasta_zip = r"dados-publicos-zip" #local dos arquivos zipados da Receita -pasta_cnpj = 'dados-publicos' - -def requisitos(): - #se pastas não existirem, cria automaticamente - if not os.path.isdir(pasta_cnpj): - os.mkdir(pasta_cnpj) - if not os.path.isdir(pasta_zip): - os.mkdir(pasta_zip) - - arquivos_existentes = list(glob.glob(pasta_cnpj +'/*.*')) + list(glob.glob(pasta_zip + '/*.*')) - if len(arquivos_existentes): - #eg.msgbox("Este programa baixa arquivos csv.zip de dados abertos da Receita Federal e converte para uso na RedeCNPJ aplicativo.\nIMPORTANTE: Para prosseguir, as pastas 'dados-publicos' e 'dados-publicos-zip', devem estar vazias, senão poderá haver inconsistências (juntar dados de meses distintos).\n",'Criar Bases RedeCNPJ') - #if eg.ynbox('Deseja apagar os arquivos das pastas ' + pasta_cnpj + ' e ' + pasta_zip + '?\nNÃO SERÁ POSSÍVEL REVERTER!!!!\n' + '\n'.join(arquivos_existentes) + '\nATENÇÃO: SE FOR EXECUTAR APENAS ALGUMA PARTE DO PROGRAMA, NÃO SELECIONE ESTA OPÇÃO, APAGUE MANUALMENTE.','Criar Bases RedeCNPJ', ['SIM-APAGAR', 'NÃO']): - r = input('Deseja apagar os arquivos das pastas ' + pasta_cnpj + ' e ' + pasta_zip + '?\n' + '\n'.join(arquivos_existentes) + '\nATENÇÃO: SE FOR EXECUTAR APENAS ALGUMA PARTE DO PROGRAMA, NÃO SELECIONE ESTA OPÇÃO, APAGUE MANUALMENTE. \nNÃO SERÁ POSSÍVEL REVERTER!!!!\nDeseja prosseguir e apagar os arquivos (y/n)??') - if r and r.upper()=='Y': - for arq in arquivos_existentes: - print('Apagando arquivo ' + arq) - os.remove(arq) - else: - print('Parando... Apague os arquivos ' + pasta_cnpj + ' e ' + pasta_zip +' e tente novamente') - input('Pressione Enter') - sys.exit(1) - # else: - # eg.msgbox("Este programa baixa arquivos csv.zip de dados abertos da Receita Federal e converte para uso na RedeCNPJ aplicativo.",'Criar Bases RedeCNPJ') - - # if len(glob.glob(os.path.join(pasta_zip,'*.zip'))): - # print(f'Há arquivos zip na pasta {pasta_zip}. Apague ou mova esses arquivos zip e tente novamente') - # input('Pressione Enter') - # sys.exit(1) -requisitos() - -print(time.asctime(), f'Início de {sys.argv[0]}:') - -soup_pagina_dados_abertos = BeautifulSoup(requests.get(url_dados_abertos).text, features="lxml") -try: - ultima_referencia = sorted([link.get('href') for link in soup_pagina_dados_abertos.find_all('a') if link.get('href').startswith('20')])[-1] -except: - print('Não encontrou pastas em ' + url_dados_abertos) - r = input('Pressione Enter.') - sys.exit(1) - - -url = url_dados_abertos + ultima_referencia -# page = requests.get(url) -# data = page.text -soup = BeautifulSoup(requests.get(url).text, features="lxml") -lista = [] -print('Relação de Arquivos em ' + url) -for link in soup.find_all('a'): - if str(link.get('href')).endswith('.zip'): - cam = link.get('href') - if not cam.startswith('http'): - print(url+cam) - lista.append(url+cam) - else: - print(cam) - lista.append(cam) -if __name__ == '__main__': - resp = input(f'Deseja baixar os arquivos acima para a pasta {pasta_zip} (y/n)?') - if resp.lower()!='y' and resp.lower()!='s': - sys.exit() +# URL base da Receita Federal +URL_BASE = "https://arquivos.receitafederal.gov.br/cnpj/dados_abertos_cnpj/2024-11/" +DIRETORIO_DOWNLOAD = "./downloads_cnpj" + +# Configuração de sessão com User-Agent +session = requests.Session() +session.headers.update({ + "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/118.0.0.0 Safari/537.36" +}) + +def listar_arquivos(url): + """Obtém a lista de arquivos disponíveis para download.""" + try: + print(f"Acessando {url}...") + response = session.get(url, timeout=10) + response.raise_for_status() + + # Parse do HTML + soup = BeautifulSoup(response.text, "html.parser") + arquivos = [] + # Procura links de arquivos + for link in soup.find_all("a"): + href = link.get("href") + if href and href.endswith(".zip"): + arquivos.append(href) + + return arquivos + + except requests.exceptions.RequestException as e: + print(f"Erro ao acessar a URL base: {e}") + return [] +def baixar_arquivo(url_arquivo, caminho_destino): + """Faz o download de um arquivo.""" + try: + print(f"Baixando {url_arquivo}...") + response = session.get(url_arquivo, stream=True, timeout=30) + response.raise_for_status() -print(time.asctime(), 'Início do Download dos arquivos...') + with open(caminho_destino, "wb") as f: + for chunk in response.iter_content(chunk_size=8192): + f.write(chunk) -if True: #baixa usando parfive, download em paralelo - downloader = parfive.Downloader() - for url in lista: - downloader.enqueue_file(url, path=pasta_zip, filename=os.path.split(url)[1]) - downloader.download() -else: #baixar sequencial, rotina antiga - def bar_progress(current, total, width=80): - if total>=2**20: - tbytes='Megabytes' - unidade = 2**20 + print(f"Arquivo salvo em {caminho_destino}") + except requests.exceptions.RequestException as e: + print(f"Erro ao baixar {url_arquivo}: {e}") + +def main(): + """Executa o fluxo principal.""" + print("Iniciando o processo de download dos arquivos do CNPJ...") + + # Garantir que o diretório de downloads exista + os.makedirs(DIRETORIO_DOWNLOAD, exist_ok=True) + + # Listar arquivos disponíveis + arquivos = listar_arquivos(URL_BASE) + + if not arquivos: + print("Não foi possível listar os arquivos.") + return + + print(f"Arquivos encontrados: {arquivos}") + + # Baixar cada arquivo + for arquivo in arquivos: + url_arquivo = os.path.join(URL_BASE, arquivo) + caminho_destino = os.path.join(DIRETORIO_DOWNLOAD, arquivo) + + if not os.path.exists(caminho_destino): # Evita baixar novamente + baixar_arquivo(url_arquivo, caminho_destino) else: - tbytes='kbytes' - unidade = 2**10 - progress_message = f"Baixando: %d%% [%d / %d] {tbytes}" % (current / total * 100, current//unidade, total//unidade) - # Don't use print() as it will print in new line every time. - sys.stdout.write("\r" + progress_message) - sys.stdout.flush() - - for k, url in enumerate(lista): - print('\n' + time.asctime() + f' - item {k}: ' + url) - wget.download(url, out=os.path.join(pasta_zip, os.path.split(url)[1]), bar=bar_progress) + print(f"Arquivo já existe: {caminho_destino}") - -print('\n\n'+ time.asctime(), f' Finalizou {sys.argv[0]}!!!') -print(f"Baixou {len(glob.glob(os.path.join(pasta_zip,'*.zip')))} arquivos.") -if __name__ == '__main__': - input('Pressione Enter') - -#lista dos arquivos (até julho/2024) -''' -http://200.152.38.155/CNPJ/Cnaes.zip -http://200.152.38.155/CNPJ/Empresas0.zip -http://200.152.38.155/CNPJ/Empresas1.zip -http://200.152.38.155/CNPJ/Empresas2.zip -http://200.152.38.155/CNPJ/Empresas3.zip -http://200.152.38.155/CNPJ/Empresas4.zip -http://200.152.38.155/CNPJ/Empresas5.zip -http://200.152.38.155/CNPJ/Empresas6.zip -http://200.152.38.155/CNPJ/Empresas7.zip -http://200.152.38.155/CNPJ/Empresas8.zip -http://200.152.38.155/CNPJ/Empresas9.zip -http://200.152.38.155/CNPJ/Estabelecimentos0.zip -http://200.152.38.155/CNPJ/Estabelecimentos1.zip -http://200.152.38.155/CNPJ/Estabelecimentos2.zip -http://200.152.38.155/CNPJ/Estabelecimentos3.zip -http://200.152.38.155/CNPJ/Estabelecimentos4.zip -http://200.152.38.155/CNPJ/Estabelecimentos5.zip -http://200.152.38.155/CNPJ/Estabelecimentos6.zip -http://200.152.38.155/CNPJ/Estabelecimentos7.zip -http://200.152.38.155/CNPJ/Estabelecimentos8.zip -http://200.152.38.155/CNPJ/Estabelecimentos9.zip -http://200.152.38.155/CNPJ/Motivos.zip -http://200.152.38.155/CNPJ/Municipios.zip -http://200.152.38.155/CNPJ/Naturezas.zip -http://200.152.38.155/CNPJ/Paises.zip -http://200.152.38.155/CNPJ/Qualificacoes.zip -http://200.152.38.155/CNPJ/Simples.zip -http://200.152.38.155/CNPJ/Socios0.zip -http://200.152.38.155/CNPJ/Socios1.zip -http://200.152.38.155/CNPJ/Socios2.zip -http://200.152.38.155/CNPJ/Socios3.zip -http://200.152.38.155/CNPJ/Socios4.zip -http://200.152.38.155/CNPJ/Socios5.zip -http://200.152.38.155/CNPJ/Socios6.zip -http://200.152.38.155/CNPJ/Socios7.zip -http://200.152.38.155/CNPJ/Socios8.zip -http://200.152.38.155/CNPJ/Socios9.zip -''' +if __name__ == "__main__": + main() diff --git a/dados_cnpj_extrair.py b/dados_cnpj_extrair.py new file mode 100644 index 0000000..d3c7c41 --- /dev/null +++ b/dados_cnpj_extrair.py @@ -0,0 +1,54 @@ +from pathlib import Path +import zipfile +from tqdm import tqdm + +# Caminhos das pastas +PASTA_ZIPS = Path("dados-publicos-zip") +PASTA_DESTINO = Path("dados-publicos") + +def excluir_arquivos_existentes(pasta_destino): + """Exclui todos os arquivos existentes na pasta de destino.""" + if pasta_destino.exists(): + for arquivo in pasta_destino.rglob("*"): + if arquivo.is_file(): + arquivo.unlink() + +def extrair_arquivos_zip(): + """Extrai arquivos ZIP da pasta origem para a pasta destino com barra de progresso.""" + if not PASTA_ZIPS.exists(): + print(f"A pasta {PASTA_ZIPS} não existe. Verifique o caminho.") + return + + # Excluir arquivos existentes na pasta de destino + excluir_arquivos_existentes(PASTA_DESTINO) + + # Criar a pasta de destino, se não existir + PASTA_DESTINO.mkdir(parents=True, exist_ok=True) + + # Obter lista de arquivos ZIP + arquivos_zip = list(PASTA_ZIPS.glob("*.zip")) + + if not arquivos_zip: + print("Nenhum arquivo ZIP encontrado na pasta origem.") + return + + total_arquivos = len(arquivos_zip) + sucesso = 0 + + # Barra de progresso + print("Extraindo arquivos...") + with tqdm(total=total_arquivos, unit="arquivo", desc="Progresso") as barra: + for arquivo_zip in arquivos_zip: + try: + with zipfile.ZipFile(arquivo_zip, 'r') as zip_ref: + zip_ref.extractall(PASTA_DESTINO) + sucesso += 1 + except Exception as e: + print(f"Erro ao extrair {arquivo_zip.name}: {e}") + finally: + barra.update(1) + + print(f"Extração concluída: {sucesso}/{total_arquivos} arquivos extraídos com sucesso.") + +if __name__ == "__main__": + extrair_arquivos_zip()