编辑
2025-07-04
技术
00
请注意,本文编写于 226 天前,最后修改于 226 天前,其中某些信息可能已经过时。

进行了修正, 支持pdf页码颠倒,同个pdf是不同批次的报关单等。。。

python
import pdfplumber import pandas as pd import os import re import pymysql import sys import time # 导入time模块 ''' 错误代码定义: sys.exit(1) 0:正常结束 1:参数传递错误 2:不是PDF文件 3:无法提取数据 4:数据库保存失败 ''' # 在文件开头的import语句下方,添加以下函数定义 def extract_text_by_position(page, x0, y0, x1, y1): """从页面的指定坐标区域提取文本""" try: crop = page.crop((x0, y0, x1, y1)) text = crop.extract_text() if crop else "" return text.strip() except Exception as e: print(f"提取文本时出错: {e}, 坐标: ({x0}, {y0}, {x1}, {y1})") return "" def extract_data_from_pdf_by_position(pdf_path): all_data = [] file_name = os.path.basename(pdf_path) try: with pdfplumber.open(pdf_path) as pdf: print(f"开始处理PDF文件: {file_name}") # 首先扫描所有页面,提取页码信息 all_pages_with_info = [] for page_idx, page_obj in enumerate(pdf.pages): # 只处理横向页面 if page_obj.width > page_obj.height: try: # 提取右上角的页码/页数信息 page_number_coords = (600, 75.4, 800, 84.5) # 右上角页码区域坐标 # 确保坐标在页面范围内 if page_number_coords[0] < page_obj.width and page_number_coords[2] < page_obj.width and \ page_number_coords[1] < page_obj.height and page_number_coords[3] < page_obj.height: page_number_crop = page_obj.crop(page_number_coords) page_number_text = page_number_crop.extract_text() if page_number_crop else "" # 使用正则表达式提取页码/页数 page_info_match = re.search(r'(\d+)/(\d+)', page_number_text) if page_number_text else None if page_info_match: current_page = int(page_info_match.group(1)) total_pages = int(page_info_match.group(2)) print(f"识别到页码信息: 第{current_page}页,共{total_pages}页 (物理页码 {page_idx+1})") # 保存页面和页码信息 all_pages_with_info.append({ 'page_obj': page_obj, 'current_page': current_page, 'total_pages': total_pages, 'physical_page': page_idx + 1 }) else: print(f"未在页面 {page_idx+1} 找到页码/页数信息") except Exception as e: print(f"处理页面 {page_idx+1} 页码信息时出错: {e}") # 处理所有识别到的页面 - 修改为支持每页独立基础信息 if all_pages_with_info: print(f"共识别到 {len(all_pages_with_info)} 个有效页面") # 新的处理逻辑:每页独立处理,动态识别基础信息 # 第一步:建立预录入编号到基础信息的映射 pre_input_to_base_info = {} # 预录入编号 -> 基础信息映射 processed_pages = set() # 记录已处理的页面 # 扫描所有页面,找到包含"境内发货人"字段的页面并提取基础信息 for page_info in all_pages_with_info: page_obj = page_info['page_obj'] physical_page = page_info['physical_page'] # 检查页面是否包含"境内发货人"文字 full_page_text = page_obj.extract_text() if "境内发货人" in full_page_text: print(f"\n页面 {physical_page} 包含'境内发货人'字段,提取基础信息") # 定义基础信息字段的坐标 field_coordinates = { '预录入编号': (85.8, 75.4, 165.8, 84.5), '海关编号': (295.2, 75.4, 375.8, 84.5), '境内发货人': (36.75, 88.67, 255.375, 112.295), '境外收货人': (36.75, 112.295, 255.375, 135.545), '生产销售单位': (36.75, 135.545, 255.375, 158.795), '运输方式': (255.375, 112.295, 383.625, 135.545), '离境口岸': (637.125, 158.795, 805.5, 182.045), '出境关别':(640.50,170.86,658.50,179.86), '合同协议号': (36.75, 158.795, 255.375, 182.045), '监管方式': (255.375, 135.545, 383.625, 158.795), '征免性质': (383.625, 135.545, 516.375, 158.795), '许可证号': (516.375, 135.545, 805.5, 158.795), '备案号': (637.125, 88.67, 805.5, 112.295), '申报日期': (516.375, 88.67, 637.125, 112.295), '出口日期': (383.625, 88.67, 516.375, 112.295), '运输工具名称及航次号': (383.625, 112.295, 516.375, 135.545), '运抵国(地区)': (383.625, 158.795, 516.375, 182.045), '指运港': (516.375, 158.795, 637.125, 182.045), '提运单号': (516.375, 112.295, 805.5, 135.545), '包装种类': (36.75, 182.045, 255.375, 205.295), '件数': (260.375, 182.045, 301.125, 205.295), '毛重': (301.125, 182.045, 383.625, 205.295), '净重': (383.625, 182.045, 463.125, 205.295), '成交方式': (463.125, 182.045, 516.375, 205.295), '运费': (516.375, 182.045, 608.625, 205.295), '保费': (608.625, 182.045, 700.875, 205.295), '杂费': (700.875, 182.045, 805.5, 205.295), '随附单证及编号': (36.75, 205.295, 805.5, 228.545), '标记唛码及备注': (36.75, 228.545, 805.5, 274.295), '申报单位': (78.8, 528.8, 294.8, 537.8) } # 提取预录入编号作为映射键 pre_input_text = extract_text_by_position(page_obj, *field_coordinates['预录入编号']) pre_input_match = re.search(r'([A-Za-z0-9]+)', pre_input_text) if pre_input_match: pre_input_number = pre_input_match.group(1) print(f"页面 {physical_page} 识别到预录入编号: {pre_input_number}") # 初始化基础数据 base_data_from_first_page = {'文件名': file_name} # 提取所有基础信息字段 for field, coords in field_coordinates.items(): try: x0, y0, x1, y1 = coords # 确保坐标在页面范围内 if x0 >= 0 and y0 >= 0 and x1 <= page_obj.width and y1 <= page_obj.height and x0 < x1 and y0 < y1: text_content = extract_text_by_position(page_obj, x0, y0, x1, y1) if text_content: # 数据清洗逻辑 # 初始清理:移除可能的字段名和前后空格 cleaned_text_intermediate = re.sub(rf'(?i)^{re.escape(field)}\s*:?\s*', '', text_content).strip() # 如果第一次清理后结果为空,但原始文本不为空,说明可能字段名就是全部内容,或者清理过度 if not cleaned_text_intermediate and text_content.strip(): # 尝试一个更简单的清理,只移除开头的字段名(不区分大小写) cleaned_text_intermediate = re.sub(rf'(?i)^{re.escape(field)}', '', text_content).strip() # 如果还是空,并且原始文本不等于字段名本身,则用原始文本 if not cleaned_text_intermediate and text_content.strip().lower() != field.lower(): cleaned_text_intermediate = text_content.strip() elif not cleaned_text_intermediate and text_content.strip().lower() == field.lower(): # 如果清理后为空,且原始文本就是字段名,那么这个字段可能确实没有值 base_data_from_first_page[field] = '' continue final_clean_text = cleaned_text_intermediate # 提取括号内的代码 code_match = re.search(r'\(([^)]+)\)', final_clean_text) if code_match: code_value = code_match.group(1).strip() # 提取代码后,从文本中移除括号及其内容,以及可能的尾随字段名 text_without_code = re.sub(r'\s*\(([^)]+)\)\s*', '', final_clean_text).strip() # 再次尝试移除字段名,以防它在括号之后或之前 text_without_code_and_field = re.sub(rf'(?i)^{re.escape(field)}\s*:?\s*', '', text_without_code).strip() if not text_without_code_and_field and text_without_code.strip(): final_clean_text = text_without_code.strip() else: final_clean_text = text_without_code_and_field # 保存代码到对应的代码字段 if field == '境内发货人': base_data_from_first_page['境内发货人代码'] = code_value elif field == '离境口岸': base_data_from_first_page['离境口岸代码'] = code_value elif field == '监管方式': base_data_from_first_page['监管方式代码'] = code_value elif field == '生产销售单位': base_data_from_first_page['生产销售单位代码'] = code_value elif field == '指运港': base_data_from_first_page['指运港代码'] = code_value elif field == '毛重': base_data_from_first_page['毛重代码'] = code_value elif field == '净重': base_data_from_first_page['净重代码'] = code_value elif field == '成交方式': base_data_from_first_page['成交方式代码'] = code_value elif field == '申报单位': base_data_from_first_page['申报单位代码'] = code_value elif field == '征免性质': base_data_from_first_page['征免性质代码'] = code_value # 最后的清理,确保移除了字段名 final_value = re.sub(rf'(?i){re.escape(field)}\s*:?\s*', '', final_clean_text).strip() final_value = re.sub(r'\s+', ' ', final_value).strip() # 如果最终值不等于字段名本身(不区分大小写),则认为是有效值 if final_value.lower() != field.lower(): base_data_from_first_page[field] = final_value elif base_data_from_first_page.get(field) is None: base_data_from_first_page[field] = '' else: base_data_from_first_page[field] = '' else: print(f"警告: 字段 '{field}' 的坐标 {coords} 超出页面边界,跳过此字段。") except Exception as e: print(f"警告: 字段 '{field}' 提取时发生错误: {e}。坐标: {coords}") # 保存到映射中 pre_input_to_base_info[pre_input_number] = base_data_from_first_page print(f"保存预录入编号 {pre_input_number} 的基础信息,共 {len(base_data_from_first_page)} 个字段") print(f"\n共找到 {len(pre_input_to_base_info)} 个预录入编号的基础信息") # 第二步:处理所有页面,提取商品项并匹配基础信息 for page_info in all_pages_with_info: if page_info['physical_page'] in processed_pages: continue current_page_num = page_info['current_page'] total_pages = page_info['total_pages'] page_obj = page_info['page_obj'] physical_page = page_info['physical_page'] print(f"\n处理页面 {physical_page} (报关单页码: {current_page_num}/{total_pages})") # 提取当前页面的预录入编号 pre_input_coords = (85.8, 75.4, 165.8, 84.5) pre_input_text = extract_text_by_position(page_obj, *pre_input_coords) pre_input_match = re.search(r'([A-Za-z0-9]+)', pre_input_text) if pre_input_match: current_pre_input = pre_input_match.group(1) print(f"页面 {physical_page} 识别到预录入编号: {current_pre_input}") # 查找对应的基础信息 if current_pre_input in pre_input_to_base_info: base_data = pre_input_to_base_info[current_pre_input].copy() print(f"页面 {physical_page} 使用预录入编号 {current_pre_input} 的基础信息") else: print(f"警告:页面 {physical_page} 的预录入编号 {current_pre_input} 未找到对应的基础信息") base_data = {'文件名': file_name, '预录入编号': current_pre_input} else: print(f"警告:页面 {physical_page} 未找到预录入编号") base_data = {'文件名': file_name} # 根据页码选择不同的坐标 if current_page_num == 1: # 第一页 row_coordinates = [ (36.75, 287.795, 805.5, 320.045), (36.75, 320.045, 805.5, 352.3), (36.75, 352.3, 805.5, 384.55), (36.75, 384.55, 805.5, 416.8), (36.75, 416.8, 805.5, 449.05), (36.75, 449.05, 805.5, 481.295) ] is_first_page = True print(f"使用第一页坐标提取数据,页码: {current_page_num}/{total_pages}") else: # 后续页面 row_coordinates = [ (36.75, 102.545, 805.5, 134.8), (36.75, 134.8, 805.5, 167.05), (36.75, 167.05, 805.5, 199.3), (36.75, 199.3, 805.5, 231.55), (36.75, 231.55, 805.5, 263.8), (36.75, 263.8, 805.5, 296.05), (36.75, 296.05, 805.5, 328.3), (36.75, 328.3, 805.5, 360.55), (36.75, 360.55, 805.5, 392.8), (36.75, 392.8, 805.5, 425.05), (36.75, 425.05, 805.5, 457.3), (36.75, 457.3, 805.5, 489.55), (36.75, 489.55, 805.5, 521.8), (36.75, 521.8, 805.5, 554.6075) ] is_first_page = False print(f"使用后续页面坐标提取数据,页码: {current_page_num}/{total_pages}") # 提取当前页面的商品项 page_items = extract_items_from_page(page_obj, row_coordinates, pdf_path, physical_page, is_first_page) # 将基础数据与商品项数据合并 if page_items: for item_data in page_items: final_item_data = base_data.copy() final_item_data.update(item_data) all_data.append(final_item_data) processed_pages.add(physical_page) # 重新编排项号 if all_data: all_data.sort(key=lambda x: (x.get('文件名', ''), x.get('项号', float('inf')))) current_item_number = 1 for item in all_data: if '商品编号' in item and item['商品编号']: item['项号'] = current_item_number current_item_number += 1 return all_data except Exception as e: print(f"提取数据过程中发生严重错误: {e}") import traceback traceback.print_exc() # 打印完整的错误堆栈信息 return [] def extract_items_from_page(page_obj, row_coordinates, pdf_path, page_idx, is_first_page=True): """从页面提取商品项数据""" collected_items = [] page_width = page_obj.width page_height = page_obj.height print(f"提取页面 {page_idx} 的商品项,使用{'第一页' if is_first_page else '后续页面'}坐标") for item_row_idx, row_coords in enumerate(row_coordinates): x0, top, x1, bottom = row_coords if not (x0 >= 0 and top >= 0 and x1 <= page_width and bottom <= page_height and x0 < x1 and top < bottom): print(f"警告: 文件 '{pdf_path}',页面 {page_idx},商品行 {item_row_idx + 1} 的坐标 {row_coords} 无效,跳过此行。") continue item_data = {} try: crop_area = page_obj.crop((x0, top, x1, bottom)) full_text = crop_area.extract_text(x_tolerance=1, y_tolerance=1) # 以下正则均为内容解析 if full_text and full_text.strip(): # 打印调试信息 print(f"页面 {page_idx}, 行 {item_row_idx + 1} 提取的文本: {full_text[:50]}...") # 其余解析逻辑保持不变 item_number_match = re.search(r'^\s*(\d{1,2})\s+', full_text) if item_number_match: item_data['项号'] = int(item_number_match.group(1)) product_code_name_match = re.search(r'^\s*\d{1,2}\s+(\d+)([^\d\n]+)', full_text) if product_code_name_match: item_data['商品编号'] = product_code_name_match.group(1).strip() item_data['商品名称'] = product_code_name_match.group(2).strip() qty_unit_match = re.search(r'(\d+\.?\d*)(台|千克|个|件|套|箱)', full_text) if qty_unit_match: item_data['数量及单位'] = qty_unit_match.group(0).strip() product_match2 = re.findall(r'(?:\b\d+\|[^\s]*(?=\s+))|(?:\n\s*(?:[a-zA-Z]+|[\u4e00-\u9fa5]+|[a-zA-Z0-9]+/[a-zA-Z0-9]+|\d+[a-zA-Z]+/?[a-zA-Z/]*)(?=\s+))',full_text) if product_match2: item_data['商品名称及规格型号'] = "".join(product_match2) price_match = re.search(r'(\d+\.\d{4})', full_text) if price_match: item_data['单价'] = price_match.group(1) total_price_match = re.search(r'(\b\d+\.\d{2}\b)', full_text) if total_price_match: item_data['总价'] = total_price_match.group(1) currency_match = re.search(r'(美元|人民币|欧元|英镑|日元)', full_text) if currency_match: item_data['币制'] = currency_match.group(1) regex_pattern = r"摩尔多瓦|芬兰|老挝|俄罗斯|白罗斯|乌克兰|摩尔达维亚|立陶宛|拉脱维亚|爱沙尼亚共和国|哈萨克斯坦|格鲁吉亚|乌兹别克斯坦|朝鲜|蒙古|越南|中国|波兰|保加利亚|捷克|匈牙利|斯洛伐克|阿塞拜疆|吉尔吉斯斯坦|塔吉克斯坦|土库曼斯坦|阿富汗|英国|西班牙|塞尔维亚|土耳其|德国|奥地利|卢森堡|意大利|荷兰|法国|比利时|伊朗|安道尔|阿联酋|安提瓜和巴布达|安圭拉|阿尔巴尼亚|亚美尼亚|安哥拉|南极洲|阿根廷|美属萨摩亚|澳大利亚|阿鲁巴|波黑|巴巴多斯|孟加拉|布基纳法索|巴林|布隆迪|贝宁|百慕大|文莱|玻利维亚|荷兰加勒比区|巴西|巴哈马|不丹|博茨瓦纳|伯利兹|加拿大|刚果(金)|中非|刚果(布)|瑞士|科特迪瓦|智利|喀麦隆|哥伦比亚|哥斯达黎加|古巴|佛得角|塞浦路斯|吉布提|丹麦|多米尼克|多米尼加|阿尔及利亚|厄瓜多尔|爱沙尼亚|埃及|西撒哈拉|厄立特里亚|埃塞俄比亚|密克罗尼西亚联邦|加蓬|格林纳达|法属圭亚那|加纳|直布罗陀|格陵兰|冈比亚|几内亚|瓜德罗普|赤道几内亚|希腊|危地马拉|几内亚比绍|圭亚那|洪都拉斯|克罗地亚|海地|印尼|印度尼西亚共和国|爱尔兰|以色列|印度|英属印度洋领地|伊拉克|牙买加|约旦|日本|肯尼亚|柬埔寨|基里巴斯|科摩罗|圣基茨和尼维斯|韩国|科威特|黎巴嫩|圣卢西亚|列支敦士登|斯里兰卡|利比里亚|莱索托|利比亚|摩洛哥|摩纳哥|黑山|法属圣马丁|马达加斯加|马其顿|马里|缅甸|马提尼克|毛里塔尼亚|马耳他|毛里求斯|马尔代夫|墨西哥|马来西亚|莫桑比克|纳米比亚|新喀里多尼亚|尼日尔|尼日利亚|尼加拉瓜|挪威|尼泊尔|瑙鲁|纽埃|新西兰|阿曼|巴拿马|秘鲁|法属波利尼西亚|巴布亚新几内亚|菲律宾|巴基斯坦|圣皮埃尔和密克隆|波多黎各|巴勒斯坦|葡萄牙|帕劳|巴拉圭|卡塔尔|留尼汪|罗马尼亚|卢旺达|沙特阿拉伯|塞舌尔|苏丹|瑞典|新加坡|圣赫勒拿|斯洛文尼亚|塞拉利昂|圣马力诺|塞内加尔|索马里|苏里南|南苏丹|圣多美和普林西比|萨尔瓦多|叙利亚|斯威士兰|乍得|法属南部领地|多哥|泰国|托克劳|东帝汶|突尼斯|汤加|特立尼达和多巴哥|图瓦卢|坦桑尼亚|乌干达|美国|乌拉圭|梵蒂冈|圣文森特和格林纳丁斯|委内瑞拉|瓦努阿图|瓦利斯和富图纳|萨摩亚|也门|马约特|南非|赞比亚|津巴布韦" country_names = re.findall(regex_pattern, full_text) if len(country_names) >= 2: item_data['原产国'] = country_names[0] item_data['最终目的国'] = country_names[1] source_match = re.search(r'\((\d{5})\)\s*([^()]+?)(?=\s*(?:照章征税|\(CHN\)))', full_text) if source_match: item_data['境内货源地'] = source_match.group(1) + source_match.group(2).strip() tax_match = re.search(r'(照章征税|免税|保税|减免|先征后退|先征后返|保证金|暂定税率|特惠税率|最惠国税率|普通税率|协定税率)',full_text) if tax_match: item_data['征免性质'] = tax_match.group(1) declare_match = re.search(r'申报单位[\s::]*\(.*?\)(.*?)',full_text) if declare_match: item_data['申报单位'] = declare_match.group(1) # 增加更严格的验证 if '商品编号' in item_data and item_data['商品编号'] and '商品名称' in item_data and item_data['商品名称']: # 检查是否是有效的商品项(例如,项号应该是数字) if '项号' in item_data and isinstance(item_data['项号'], int): collected_items.append(item_data) print(f"成功提取商品项: 编号={item_data.get('商品编号', '未知')}, 项号={item_data.get('项号', '未知')}") else: print(f"跳过无效商品项: 页面 {page_idx}, 行 {item_row_idx + 1} - 无效的项号") else: print(f"跳过无效商品项: 页面 {page_idx}, 行 {item_row_idx + 1} - 缺少商品编号或商品名称") except Exception as e: print(f"警告: 文件 '{pdf_path}',页面 {page_idx},商品行 {item_row_idx + 1} 提取时发生错误: {e}。坐标: {row_coords}") print(f"页面 {page_idx} 共提取 {len(collected_items)} 个商品项") return collected_items # 调试坐标的辅助函数 def debug_coordinates(page_obj, description, x0, top, x1, bottom): try: crop = page_obj.crop((x0, top, x1, bottom)) text = crop.extract_text() if crop else "" print(f"调试 {description}: 坐标=({x0}, {top}, {x1}, {bottom}), 提取文本='{text}'") return text except Exception as e: print(f"调试 {description} 出错: {e}") return "" def save_to_mysql(data, table_name="te_bgd"): """将数据保存到MySQL数据库""" connection = None # 初始化connection为None try: # 建立连接 connection = pymysql.connect( host='rm-2ze425vzufup*****', port=3306, user='teuser', password='*****', database='tjtedb', charset='utf8mb4', cursorclass=pymysql.cursors.DictCursor ) print("数据库连接成功") # 新增,确认连接成功 with connection.cursor() as cursor: # 准备批量插入的数据 insert_data = [] for item in data: # 创建一个元组,包含要插入的字段值 row_data = ( item.get('文件名', ''), item.get('预录入编号', ''), item.get('海关编号', ''), item.get('境内发货人代码', ''), item.get('境内发货人', ''), item.get('离境口岸代码', ''), item.get('离境口岸', ''), item.get('申报日期', ''), item.get('境外收货人', ''), item.get('运输方式', ''), item.get('运输工具名称及航次号', ''), item.get('提运单号', ''), item.get('生产销售单位代码', ''), item.get('生产销售单位', ''), item.get('监管方式代码', ''), item.get('监管方式', ''), item.get('合同协议号', ''), item.get('贸易国(地区)', ''), item.get('运抵国(地区)', ''), item.get('指运港代码', ''), item.get('指运港', ''), item.get('离境口岸代码', ''), item.get('离境口岸', ''), item.get('包装种类', ''), item.get('件数', ''), item.get('毛重代码', ''), item.get('毛重', ''), item.get('净重代码', ''), item.get('净重', ''), item.get('成交方式代码', ''), item.get('成交方式', ''), item.get('随附单证及编号', ''), item.get('标记唛码及备注', ''), item.get('申报单位代码', ''), item.get('申报单位', ''), item.get('项号', ''), item.get('商品编号', ''), item.get('商品名称', ''), item.get('商品名称及规格型号', ''), item.get('数量及单位', ''), item.get('单价', ''), item.get('总价', ''), item.get('币制', ''), item.get('原产国', ''), item.get('最终目的国', ''), item.get('境内货源地', ''), item.get('征免性质代码', ''), item.get('征免性质', ''), item.get('运费', ''), item.get('保费', ''), item.get('杂费', '') ) insert_data.append(row_data) # 构建SQL插入语句 fields = [ "file_name", "pre_input_number", "customs_number", "domestic_shipper_code", "domestic_shipper", "exit_customs_code", "exit_customs", "date_of_declaration", "overseas_consignee", "mode_of_transport", "transport_vehicle_name_and_voyage_number", "delivery_numbers", "production_and_sales_unit_code", "production_and_sales_units", "regulatory_code", "regulatory_approach", "contract_number", "trading_country_region", "destination_country_region", "designated_port_code", "port_of_destination", "departure_port_code", "port_of_departure", "packaging_type", "quantity", "gross_weight_code", "gross_weight", "net_weight_code", "net_weight", "transaction_method_code", "transaction_method", "accompanying_documents_and_numbers", "shipping_marks_remarks", "declaration_unit_code", "declaration_unit", "item_number", "product_code", "product_name", "product_name_and_specification_model", "quantity_and_unit", "unit_price", "total_price", "currency_system", "country_of_origin", "final_destination_country", "original_place_of_delivered_goods", "exemption_nature_code", "nature_of_exemption","shipping_fee","premium","miscellaneous_expenses" ] if not insert_data: # 如果没有数据可以插入 print("没有数据需要插入数据库。") return True # 或者 False # 确保字段数量与值数量匹配 (仅在有数据时检查第一条) if len(fields) != len(insert_data[0]): print(f"错误:字段数量({len(fields)})与值数量({len(insert_data[0])})不匹配") print(f"字段列表: {fields}") print(f"第一行数据: {insert_data[0]}") return False sql = """ INSERT INTO {} ({}) VALUES ({}) """.format( table_name, ", ".join(fields), ", ".join(['%s'] * len(fields)) ) cursor.executemany(sql, insert_data) connection.commit() print(f"成功将 {len(insert_data)} 条数据插入到数据库表 {table_name}") return True except Exception as e: print(f"保存到数据库时出错: {e}") if connection: connection.rollback() return False finally: if connection: connection.close() print("数据库连接已关闭") def save_to_excel(data, output_path): """将数据保存到Excel文件""" try: # 确保所有记录都有相同的字段集 all_fields = set() for item in data: all_fields.update(item.keys()) # 为每个记录添加缺失的字段 for item in data: for field in all_fields: if field not in item: item[field] = "" # 创建DataFrame并保存 df = pd.DataFrame(data) # 可以指定列的顺序 # 可以指定列的顺序 column_order = [ # 文件与单证信息 '文件名', '预录入编号', '海关编号', '合同协议号', '提运单号', # 日期信息 '申报日期', '出口日期', # 参与方信息 '境内发货人代码', '境内发货人', '境外收货人', '生产销售单位代码', '生产销售单位', '申报单位代码', '申报单位', # 物流运输信息 '运输方式', '运输工具名称及航次号', '离境口岸代码', '离境口岸','出境关别代码', '出境关别', '运抵国(地区)', '指运港代码', '指运港', # 货物描述 '包装种类', '件数', '毛重代码', '毛重', '净重代码', '净重', # 价格与费用 '成交方式代码', '成交方式', '运费', '保费', '杂费', # 单证与备注 '随附单证及编号', '标记唛码及备注', # 监管与税费 '监管方式代码', '监管方式', '征免性质代码', '征免性质', # 商品项详细信息 '项号', '商品编号', '商品名称', '商品名称及规格型号', '数量及单位', '单价', '总价', '币制', '原产国', '最终目的国', '境内货源地' ] # 只保留存在的列 existing_columns = [col for col in column_order if col in df.columns] # 重新排序列并保存 df = df[existing_columns] df.to_excel(output_path, index=False) print(f"数据已保存到 {output_path}") return True except Exception as e: print(f"保存Excel时出错: {e}") return False def process_pdf_files(pdf_path): """处理指定的PDF文件并返回提取的数据""" data = extract_data_from_pdf_by_position(pdf_path) if data: print(f"从 {os.path.basename(pdf_path)} 提取了 {len(data)} 条数据") return data else: print(f"无法从 {os.path.basename(pdf_path)} 提取数据") return [] # 返回空列表而不是错误代码 def isPdf(file_path): try: with open(file_path, 'rb') as f: header = f.read(5) return header == b'%PDF-' except: return False if __name__ == "__main__": print("程序开始执行...") if len(sys.argv) != 2: print("参数错误:需要提供一个文件夹路径") sys.exit(1) folderPath = sys.argv[1] print(f"接收到的文件夹路径: {folderPath}") if not os.path.isdir(folderPath): print(f"路径错误:'{folderPath}' 不是一个有效的文件夹") sys.exit(1) pdf_files_processed_count = 0 pdf_files_failed_count = 0 all_extracted_data = [] # 用于累积所有PDF文件的数据 for root, dirs, files in os.walk(folderPath): for file in files: if file.lower().endswith('.pdf'): filePath = os.path.join(root, file) print(f"\n开始处理PDF文件: {filePath}") if not isPdf(filePath): print(f"文件类型错误:'{filePath}' 不是一个有效的PDF文件,跳过") pdf_files_failed_count += 1 continue # 处理PDF文件并获取数据 data = process_pdf_files(filePath) if data: # 如果成功提取到数据 all_extracted_data.extend(data) # 将数据添加到累积列表中 pdf_files_processed_count += 1 print(f"文件 {filePath} 处理成功,累计提取 {len(all_extracted_data)} 条数据。") else: pdf_files_failed_count += 1 print(f"文件 {filePath} 处理失败,未能提取数据。") print("暂停1秒...") time.sleep(1) # 每处理完一个PDF后暂停1秒 # 所有文件处理完成后,将累积的数据保存到Excel文件 if all_extracted_data: output_path = os.path.join(folderPath, 'output.xlsx') if save_to_excel(all_extracted_data, output_path): print(f"\n所有数据已成功保存到 {output_path}") else: print("\n保存数据到Excel文件失败") sys.exit(4) # 保存失败 else: print("\n没有从任何PDF文件中提取到数据") sys.exit(3) # 无法提取数据 print(f"\n所有PDF文件处理完成。") print(f"成功处理文件数: {pdf_files_processed_count}") print(f"处理失败文件数: {pdf_files_failed_count}") print(f"总共提取数据条数: {len(all_extracted_data)}") sys.exit(0) # 所有文件处理完毕后以成功状态退出

本文作者:ivan

本文链接:

版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!