进行了修正, 支持pdf页码颠倒,同个pdf是不同批次的报关单等。。。
pythonimport pdfplumber
import pandas as pd
import os
import re
import pymysql
import sys
import time # 导入time模块
'''
错误代码定义:
sys.exit(1)
0:正常结束
1:参数传递错误
2:不是PDF文件
3:无法提取数据
4:数据库保存失败
'''
# 在文件开头的import语句下方,添加以下函数定义
def extract_text_by_position(page, x0, y0, x1, y1):
"""从页面的指定坐标区域提取文本"""
try:
crop = page.crop((x0, y0, x1, y1))
text = crop.extract_text() if crop else ""
return text.strip()
except Exception as e:
print(f"提取文本时出错: {e}, 坐标: ({x0}, {y0}, {x1}, {y1})")
return ""
def extract_data_from_pdf_by_position(pdf_path):
all_data = []
file_name = os.path.basename(pdf_path)
try:
with pdfplumber.open(pdf_path) as pdf:
print(f"开始处理PDF文件: {file_name}")
# 首先扫描所有页面,提取页码信息
all_pages_with_info = []
for page_idx, page_obj in enumerate(pdf.pages):
# 只处理横向页面
if page_obj.width > page_obj.height:
try:
# 提取右上角的页码/页数信息
page_number_coords = (600, 75.4, 800, 84.5) # 右上角页码区域坐标
# 确保坐标在页面范围内
if page_number_coords[0] < page_obj.width and page_number_coords[2] < page_obj.width and \
page_number_coords[1] < page_obj.height and page_number_coords[3] < page_obj.height:
page_number_crop = page_obj.crop(page_number_coords)
page_number_text = page_number_crop.extract_text() if page_number_crop else ""
# 使用正则表达式提取页码/页数
page_info_match = re.search(r'(\d+)/(\d+)', page_number_text) if page_number_text else None
if page_info_match:
current_page = int(page_info_match.group(1))
total_pages = int(page_info_match.group(2))
print(f"识别到页码信息: 第{current_page}页,共{total_pages}页 (物理页码 {page_idx+1})")
# 保存页面和页码信息
all_pages_with_info.append({
'page_obj': page_obj,
'current_page': current_page,
'total_pages': total_pages,
'physical_page': page_idx + 1
})
else:
print(f"未在页面 {page_idx+1} 找到页码/页数信息")
except Exception as e:
print(f"处理页面 {page_idx+1} 页码信息时出错: {e}")
# 处理所有识别到的页面 - 修改为支持每页独立基础信息
if all_pages_with_info:
print(f"共识别到 {len(all_pages_with_info)} 个有效页面")
# 新的处理逻辑:每页独立处理,动态识别基础信息
# 第一步:建立预录入编号到基础信息的映射
pre_input_to_base_info = {} # 预录入编号 -> 基础信息映射
processed_pages = set() # 记录已处理的页面
# 扫描所有页面,找到包含"境内发货人"字段的页面并提取基础信息
for page_info in all_pages_with_info:
page_obj = page_info['page_obj']
physical_page = page_info['physical_page']
# 检查页面是否包含"境内发货人"文字
full_page_text = page_obj.extract_text()
if "境内发货人" in full_page_text:
print(f"\n页面 {physical_page} 包含'境内发货人'字段,提取基础信息")
# 定义基础信息字段的坐标
field_coordinates = {
'预录入编号': (85.8, 75.4, 165.8, 84.5),
'海关编号': (295.2, 75.4, 375.8, 84.5),
'境内发货人': (36.75, 88.67, 255.375, 112.295),
'境外收货人': (36.75, 112.295, 255.375, 135.545),
'生产销售单位': (36.75, 135.545, 255.375, 158.795),
'运输方式': (255.375, 112.295, 383.625, 135.545),
'离境口岸': (637.125, 158.795, 805.5, 182.045),
'出境关别':(640.50,170.86,658.50,179.86),
'合同协议号': (36.75, 158.795, 255.375, 182.045),
'监管方式': (255.375, 135.545, 383.625, 158.795),
'征免性质': (383.625, 135.545, 516.375, 158.795),
'许可证号': (516.375, 135.545, 805.5, 158.795),
'备案号': (637.125, 88.67, 805.5, 112.295),
'申报日期': (516.375, 88.67, 637.125, 112.295),
'出口日期': (383.625, 88.67, 516.375, 112.295),
'运输工具名称及航次号': (383.625, 112.295, 516.375, 135.545),
'运抵国(地区)': (383.625, 158.795, 516.375, 182.045),
'指运港': (516.375, 158.795, 637.125, 182.045),
'提运单号': (516.375, 112.295, 805.5, 135.545),
'包装种类': (36.75, 182.045, 255.375, 205.295),
'件数': (260.375, 182.045, 301.125, 205.295),
'毛重': (301.125, 182.045, 383.625, 205.295),
'净重': (383.625, 182.045, 463.125, 205.295),
'成交方式': (463.125, 182.045, 516.375, 205.295),
'运费': (516.375, 182.045, 608.625, 205.295),
'保费': (608.625, 182.045, 700.875, 205.295),
'杂费': (700.875, 182.045, 805.5, 205.295),
'随附单证及编号': (36.75, 205.295, 805.5, 228.545),
'标记唛码及备注': (36.75, 228.545, 805.5, 274.295),
'申报单位': (78.8, 528.8, 294.8, 537.8)
}
# 提取预录入编号作为映射键
pre_input_text = extract_text_by_position(page_obj, *field_coordinates['预录入编号'])
pre_input_match = re.search(r'([A-Za-z0-9]+)', pre_input_text)
if pre_input_match:
pre_input_number = pre_input_match.group(1)
print(f"页面 {physical_page} 识别到预录入编号: {pre_input_number}")
# 初始化基础数据
base_data_from_first_page = {'文件名': file_name}
# 提取所有基础信息字段
for field, coords in field_coordinates.items():
try:
x0, y0, x1, y1 = coords
# 确保坐标在页面范围内
if x0 >= 0 and y0 >= 0 and x1 <= page_obj.width and y1 <= page_obj.height and x0 < x1 and y0 < y1:
text_content = extract_text_by_position(page_obj, x0, y0, x1, y1)
if text_content:
# 数据清洗逻辑
# 初始清理:移除可能的字段名和前后空格
cleaned_text_intermediate = re.sub(rf'(?i)^{re.escape(field)}\s*:?\s*', '', text_content).strip()
# 如果第一次清理后结果为空,但原始文本不为空,说明可能字段名就是全部内容,或者清理过度
if not cleaned_text_intermediate and text_content.strip():
# 尝试一个更简单的清理,只移除开头的字段名(不区分大小写)
cleaned_text_intermediate = re.sub(rf'(?i)^{re.escape(field)}', '', text_content).strip()
# 如果还是空,并且原始文本不等于字段名本身,则用原始文本
if not cleaned_text_intermediate and text_content.strip().lower() != field.lower():
cleaned_text_intermediate = text_content.strip()
elif not cleaned_text_intermediate and text_content.strip().lower() == field.lower():
# 如果清理后为空,且原始文本就是字段名,那么这个字段可能确实没有值
base_data_from_first_page[field] = ''
continue
final_clean_text = cleaned_text_intermediate
# 提取括号内的代码
code_match = re.search(r'\(([^)]+)\)', final_clean_text)
if code_match:
code_value = code_match.group(1).strip()
# 提取代码后,从文本中移除括号及其内容,以及可能的尾随字段名
text_without_code = re.sub(r'\s*\(([^)]+)\)\s*', '', final_clean_text).strip()
# 再次尝试移除字段名,以防它在括号之后或之前
text_without_code_and_field = re.sub(rf'(?i)^{re.escape(field)}\s*:?\s*', '', text_without_code).strip()
if not text_without_code_and_field and text_without_code.strip():
final_clean_text = text_without_code.strip()
else:
final_clean_text = text_without_code_and_field
# 保存代码到对应的代码字段
if field == '境内发货人': base_data_from_first_page['境内发货人代码'] = code_value
elif field == '离境口岸': base_data_from_first_page['离境口岸代码'] = code_value
elif field == '监管方式': base_data_from_first_page['监管方式代码'] = code_value
elif field == '生产销售单位': base_data_from_first_page['生产销售单位代码'] = code_value
elif field == '指运港': base_data_from_first_page['指运港代码'] = code_value
elif field == '毛重': base_data_from_first_page['毛重代码'] = code_value
elif field == '净重': base_data_from_first_page['净重代码'] = code_value
elif field == '成交方式': base_data_from_first_page['成交方式代码'] = code_value
elif field == '申报单位': base_data_from_first_page['申报单位代码'] = code_value
elif field == '征免性质': base_data_from_first_page['征免性质代码'] = code_value
# 最后的清理,确保移除了字段名
final_value = re.sub(rf'(?i){re.escape(field)}\s*:?\s*', '', final_clean_text).strip()
final_value = re.sub(r'\s+', ' ', final_value).strip()
# 如果最终值不等于字段名本身(不区分大小写),则认为是有效值
if final_value.lower() != field.lower():
base_data_from_first_page[field] = final_value
elif base_data_from_first_page.get(field) is None:
base_data_from_first_page[field] = ''
else:
base_data_from_first_page[field] = ''
else:
print(f"警告: 字段 '{field}' 的坐标 {coords} 超出页面边界,跳过此字段。")
except Exception as e:
print(f"警告: 字段 '{field}' 提取时发生错误: {e}。坐标: {coords}")
# 保存到映射中
pre_input_to_base_info[pre_input_number] = base_data_from_first_page
print(f"保存预录入编号 {pre_input_number} 的基础信息,共 {len(base_data_from_first_page)} 个字段")
print(f"\n共找到 {len(pre_input_to_base_info)} 个预录入编号的基础信息")
# 第二步:处理所有页面,提取商品项并匹配基础信息
for page_info in all_pages_with_info:
if page_info['physical_page'] in processed_pages:
continue
current_page_num = page_info['current_page']
total_pages = page_info['total_pages']
page_obj = page_info['page_obj']
physical_page = page_info['physical_page']
print(f"\n处理页面 {physical_page} (报关单页码: {current_page_num}/{total_pages})")
# 提取当前页面的预录入编号
pre_input_coords = (85.8, 75.4, 165.8, 84.5)
pre_input_text = extract_text_by_position(page_obj, *pre_input_coords)
pre_input_match = re.search(r'([A-Za-z0-9]+)', pre_input_text)
if pre_input_match:
current_pre_input = pre_input_match.group(1)
print(f"页面 {physical_page} 识别到预录入编号: {current_pre_input}")
# 查找对应的基础信息
if current_pre_input in pre_input_to_base_info:
base_data = pre_input_to_base_info[current_pre_input].copy()
print(f"页面 {physical_page} 使用预录入编号 {current_pre_input} 的基础信息")
else:
print(f"警告:页面 {physical_page} 的预录入编号 {current_pre_input} 未找到对应的基础信息")
base_data = {'文件名': file_name, '预录入编号': current_pre_input}
else:
print(f"警告:页面 {physical_page} 未找到预录入编号")
base_data = {'文件名': file_name}
# 根据页码选择不同的坐标
if current_page_num == 1: # 第一页
row_coordinates = [
(36.75, 287.795, 805.5, 320.045), (36.75, 320.045, 805.5, 352.3),
(36.75, 352.3, 805.5, 384.55), (36.75, 384.55, 805.5, 416.8),
(36.75, 416.8, 805.5, 449.05), (36.75, 449.05, 805.5, 481.295)
]
is_first_page = True
print(f"使用第一页坐标提取数据,页码: {current_page_num}/{total_pages}")
else: # 后续页面
row_coordinates = [
(36.75, 102.545, 805.5, 134.8), (36.75, 134.8, 805.5, 167.05),
(36.75, 167.05, 805.5, 199.3), (36.75, 199.3, 805.5, 231.55),
(36.75, 231.55, 805.5, 263.8), (36.75, 263.8, 805.5, 296.05),
(36.75, 296.05, 805.5, 328.3), (36.75, 328.3, 805.5, 360.55),
(36.75, 360.55, 805.5, 392.8), (36.75, 392.8, 805.5, 425.05),
(36.75, 425.05, 805.5, 457.3), (36.75, 457.3, 805.5, 489.55),
(36.75, 489.55, 805.5, 521.8), (36.75, 521.8, 805.5, 554.6075)
]
is_first_page = False
print(f"使用后续页面坐标提取数据,页码: {current_page_num}/{total_pages}")
# 提取当前页面的商品项
page_items = extract_items_from_page(page_obj, row_coordinates, pdf_path, physical_page, is_first_page)
# 将基础数据与商品项数据合并
if page_items:
for item_data in page_items:
final_item_data = base_data.copy()
final_item_data.update(item_data)
all_data.append(final_item_data)
processed_pages.add(physical_page)
# 重新编排项号
if all_data:
all_data.sort(key=lambda x: (x.get('文件名', ''), x.get('项号', float('inf'))))
current_item_number = 1
for item in all_data:
if '商品编号' in item and item['商品编号']:
item['项号'] = current_item_number
current_item_number += 1
return all_data
except Exception as e:
print(f"提取数据过程中发生严重错误: {e}")
import traceback
traceback.print_exc() # 打印完整的错误堆栈信息
return []
def extract_items_from_page(page_obj, row_coordinates, pdf_path, page_idx, is_first_page=True):
"""从页面提取商品项数据"""
collected_items = []
page_width = page_obj.width
page_height = page_obj.height
print(f"提取页面 {page_idx} 的商品项,使用{'第一页' if is_first_page else '后续页面'}坐标")
for item_row_idx, row_coords in enumerate(row_coordinates):
x0, top, x1, bottom = row_coords
if not (x0 >= 0 and top >= 0 and x1 <= page_width and bottom <= page_height and x0 < x1 and top < bottom):
print(f"警告: 文件 '{pdf_path}',页面 {page_idx},商品行 {item_row_idx + 1} 的坐标 {row_coords} 无效,跳过此行。")
continue
item_data = {}
try:
crop_area = page_obj.crop((x0, top, x1, bottom))
full_text = crop_area.extract_text(x_tolerance=1, y_tolerance=1)
# 以下正则均为内容解析
if full_text and full_text.strip():
# 打印调试信息
print(f"页面 {page_idx}, 行 {item_row_idx + 1} 提取的文本: {full_text[:50]}...")
# 其余解析逻辑保持不变
item_number_match = re.search(r'^\s*(\d{1,2})\s+', full_text)
if item_number_match: item_data['项号'] = int(item_number_match.group(1))
product_code_name_match = re.search(r'^\s*\d{1,2}\s+(\d+)([^\d\n]+)', full_text)
if product_code_name_match:
item_data['商品编号'] = product_code_name_match.group(1).strip()
item_data['商品名称'] = product_code_name_match.group(2).strip()
qty_unit_match = re.search(r'(\d+\.?\d*)(台|千克|个|件|套|箱)', full_text)
if qty_unit_match: item_data['数量及单位'] = qty_unit_match.group(0).strip()
product_match2 = re.findall(r'(?:\b\d+\|[^\s]*(?=\s+))|(?:\n\s*(?:[a-zA-Z]+|[\u4e00-\u9fa5]+|[a-zA-Z0-9]+/[a-zA-Z0-9]+|\d+[a-zA-Z]+/?[a-zA-Z/]*)(?=\s+))',full_text)
if product_match2: item_data['商品名称及规格型号'] = "".join(product_match2)
price_match = re.search(r'(\d+\.\d{4})', full_text)
if price_match: item_data['单价'] = price_match.group(1)
total_price_match = re.search(r'(\b\d+\.\d{2}\b)', full_text)
if total_price_match: item_data['总价'] = total_price_match.group(1)
currency_match = re.search(r'(美元|人民币|欧元|英镑|日元)', full_text)
if currency_match: item_data['币制'] = currency_match.group(1)
regex_pattern = r"摩尔多瓦|芬兰|老挝|俄罗斯|白罗斯|乌克兰|摩尔达维亚|立陶宛|拉脱维亚|爱沙尼亚共和国|哈萨克斯坦|格鲁吉亚|乌兹别克斯坦|朝鲜|蒙古|越南|中国|波兰|保加利亚|捷克|匈牙利|斯洛伐克|阿塞拜疆|吉尔吉斯斯坦|塔吉克斯坦|土库曼斯坦|阿富汗|英国|西班牙|塞尔维亚|土耳其|德国|奥地利|卢森堡|意大利|荷兰|法国|比利时|伊朗|安道尔|阿联酋|安提瓜和巴布达|安圭拉|阿尔巴尼亚|亚美尼亚|安哥拉|南极洲|阿根廷|美属萨摩亚|澳大利亚|阿鲁巴|波黑|巴巴多斯|孟加拉|布基纳法索|巴林|布隆迪|贝宁|百慕大|文莱|玻利维亚|荷兰加勒比区|巴西|巴哈马|不丹|博茨瓦纳|伯利兹|加拿大|刚果(金)|中非|刚果(布)|瑞士|科特迪瓦|智利|喀麦隆|哥伦比亚|哥斯达黎加|古巴|佛得角|塞浦路斯|吉布提|丹麦|多米尼克|多米尼加|阿尔及利亚|厄瓜多尔|爱沙尼亚|埃及|西撒哈拉|厄立特里亚|埃塞俄比亚|密克罗尼西亚联邦|加蓬|格林纳达|法属圭亚那|加纳|直布罗陀|格陵兰|冈比亚|几内亚|瓜德罗普|赤道几内亚|希腊|危地马拉|几内亚比绍|圭亚那|洪都拉斯|克罗地亚|海地|印尼|印度尼西亚共和国|爱尔兰|以色列|印度|英属印度洋领地|伊拉克|牙买加|约旦|日本|肯尼亚|柬埔寨|基里巴斯|科摩罗|圣基茨和尼维斯|韩国|科威特|黎巴嫩|圣卢西亚|列支敦士登|斯里兰卡|利比里亚|莱索托|利比亚|摩洛哥|摩纳哥|黑山|法属圣马丁|马达加斯加|马其顿|马里|缅甸|马提尼克|毛里塔尼亚|马耳他|毛里求斯|马尔代夫|墨西哥|马来西亚|莫桑比克|纳米比亚|新喀里多尼亚|尼日尔|尼日利亚|尼加拉瓜|挪威|尼泊尔|瑙鲁|纽埃|新西兰|阿曼|巴拿马|秘鲁|法属波利尼西亚|巴布亚新几内亚|菲律宾|巴基斯坦|圣皮埃尔和密克隆|波多黎各|巴勒斯坦|葡萄牙|帕劳|巴拉圭|卡塔尔|留尼汪|罗马尼亚|卢旺达|沙特阿拉伯|塞舌尔|苏丹|瑞典|新加坡|圣赫勒拿|斯洛文尼亚|塞拉利昂|圣马力诺|塞内加尔|索马里|苏里南|南苏丹|圣多美和普林西比|萨尔瓦多|叙利亚|斯威士兰|乍得|法属南部领地|多哥|泰国|托克劳|东帝汶|突尼斯|汤加|特立尼达和多巴哥|图瓦卢|坦桑尼亚|乌干达|美国|乌拉圭|梵蒂冈|圣文森特和格林纳丁斯|委内瑞拉|瓦努阿图|瓦利斯和富图纳|萨摩亚|也门|马约特|南非|赞比亚|津巴布韦"
country_names = re.findall(regex_pattern, full_text)
if len(country_names) >= 2:
item_data['原产国'] = country_names[0]
item_data['最终目的国'] = country_names[1]
source_match = re.search(r'\((\d{5})\)\s*([^()]+?)(?=\s*(?:照章征税|\(CHN\)))', full_text)
if source_match: item_data['境内货源地'] = source_match.group(1) + source_match.group(2).strip()
tax_match = re.search(r'(照章征税|免税|保税|减免|先征后退|先征后返|保证金|暂定税率|特惠税率|最惠国税率|普通税率|协定税率)',full_text)
if tax_match: item_data['征免性质'] = tax_match.group(1)
declare_match = re.search(r'申报单位[\s::]*\(.*?\)(.*?)',full_text)
if declare_match: item_data['申报单位'] = declare_match.group(1)
# 增加更严格的验证
if '商品编号' in item_data and item_data['商品编号'] and '商品名称' in item_data and item_data['商品名称']:
# 检查是否是有效的商品项(例如,项号应该是数字)
if '项号' in item_data and isinstance(item_data['项号'], int):
collected_items.append(item_data)
print(f"成功提取商品项: 编号={item_data.get('商品编号', '未知')}, 项号={item_data.get('项号', '未知')}")
else:
print(f"跳过无效商品项: 页面 {page_idx}, 行 {item_row_idx + 1} - 无效的项号")
else:
print(f"跳过无效商品项: 页面 {page_idx}, 行 {item_row_idx + 1} - 缺少商品编号或商品名称")
except Exception as e:
print(f"警告: 文件 '{pdf_path}',页面 {page_idx},商品行 {item_row_idx + 1} 提取时发生错误: {e}。坐标: {row_coords}")
print(f"页面 {page_idx} 共提取 {len(collected_items)} 个商品项")
return collected_items
# 调试坐标的辅助函数
def debug_coordinates(page_obj, description, x0, top, x1, bottom):
try:
crop = page_obj.crop((x0, top, x1, bottom))
text = crop.extract_text() if crop else ""
print(f"调试 {description}: 坐标=({x0}, {top}, {x1}, {bottom}), 提取文本='{text}'")
return text
except Exception as e:
print(f"调试 {description} 出错: {e}")
return ""
def save_to_mysql(data, table_name="te_bgd"):
"""将数据保存到MySQL数据库"""
connection = None # 初始化connection为None
try:
# 建立连接
connection = pymysql.connect(
host='rm-2ze425vzufup*****',
port=3306,
user='teuser',
password='*****',
database='tjtedb',
charset='utf8mb4',
cursorclass=pymysql.cursors.DictCursor
)
print("数据库连接成功") # 新增,确认连接成功
with connection.cursor() as cursor:
# 准备批量插入的数据
insert_data = []
for item in data:
# 创建一个元组,包含要插入的字段值
row_data = (
item.get('文件名', ''),
item.get('预录入编号', ''),
item.get('海关编号', ''),
item.get('境内发货人代码', ''),
item.get('境内发货人', ''),
item.get('离境口岸代码', ''),
item.get('离境口岸', ''),
item.get('申报日期', ''),
item.get('境外收货人', ''),
item.get('运输方式', ''),
item.get('运输工具名称及航次号', ''),
item.get('提运单号', ''),
item.get('生产销售单位代码', ''),
item.get('生产销售单位', ''),
item.get('监管方式代码', ''),
item.get('监管方式', ''),
item.get('合同协议号', ''),
item.get('贸易国(地区)', ''),
item.get('运抵国(地区)', ''),
item.get('指运港代码', ''),
item.get('指运港', ''),
item.get('离境口岸代码', ''),
item.get('离境口岸', ''),
item.get('包装种类', ''),
item.get('件数', ''),
item.get('毛重代码', ''),
item.get('毛重', ''),
item.get('净重代码', ''),
item.get('净重', ''),
item.get('成交方式代码', ''),
item.get('成交方式', ''),
item.get('随附单证及编号', ''),
item.get('标记唛码及备注', ''),
item.get('申报单位代码', ''),
item.get('申报单位', ''),
item.get('项号', ''),
item.get('商品编号', ''),
item.get('商品名称', ''),
item.get('商品名称及规格型号', ''),
item.get('数量及单位', ''),
item.get('单价', ''),
item.get('总价', ''),
item.get('币制', ''),
item.get('原产国', ''),
item.get('最终目的国', ''),
item.get('境内货源地', ''),
item.get('征免性质代码', ''),
item.get('征免性质', ''),
item.get('运费', ''),
item.get('保费', ''),
item.get('杂费', '')
)
insert_data.append(row_data)
# 构建SQL插入语句
fields = [
"file_name", "pre_input_number", "customs_number", "domestic_shipper_code",
"domestic_shipper", "exit_customs_code", "exit_customs", "date_of_declaration",
"overseas_consignee", "mode_of_transport", "transport_vehicle_name_and_voyage_number",
"delivery_numbers", "production_and_sales_unit_code", "production_and_sales_units",
"regulatory_code", "regulatory_approach", "contract_number", "trading_country_region",
"destination_country_region", "designated_port_code", "port_of_destination",
"departure_port_code", "port_of_departure", "packaging_type", "quantity",
"gross_weight_code", "gross_weight", "net_weight_code", "net_weight", "transaction_method_code",
"transaction_method", "accompanying_documents_and_numbers", "shipping_marks_remarks",
"declaration_unit_code", "declaration_unit", "item_number", "product_code",
"product_name", "product_name_and_specification_model", "quantity_and_unit",
"unit_price", "total_price", "currency_system", "country_of_origin",
"final_destination_country", "original_place_of_delivered_goods", "exemption_nature_code",
"nature_of_exemption","shipping_fee","premium","miscellaneous_expenses"
]
if not insert_data: # 如果没有数据可以插入
print("没有数据需要插入数据库。")
return True # 或者 False
# 确保字段数量与值数量匹配 (仅在有数据时检查第一条)
if len(fields) != len(insert_data[0]):
print(f"错误:字段数量({len(fields)})与值数量({len(insert_data[0])})不匹配")
print(f"字段列表: {fields}")
print(f"第一行数据: {insert_data[0]}")
return False
sql = """
INSERT INTO {} ({}) VALUES ({})
""".format(
table_name,
", ".join(fields),
", ".join(['%s'] * len(fields))
)
cursor.executemany(sql, insert_data)
connection.commit()
print(f"成功将 {len(insert_data)} 条数据插入到数据库表 {table_name}")
return True
except Exception as e:
print(f"保存到数据库时出错: {e}")
if connection:
connection.rollback()
return False
finally:
if connection:
connection.close()
print("数据库连接已关闭")
def save_to_excel(data, output_path):
"""将数据保存到Excel文件"""
try:
# 确保所有记录都有相同的字段集
all_fields = set()
for item in data:
all_fields.update(item.keys())
# 为每个记录添加缺失的字段
for item in data:
for field in all_fields:
if field not in item:
item[field] = ""
# 创建DataFrame并保存
df = pd.DataFrame(data)
# 可以指定列的顺序
# 可以指定列的顺序
column_order = [
# 文件与单证信息
'文件名', '预录入编号', '海关编号', '合同协议号', '提运单号',
# 日期信息
'申报日期', '出口日期',
# 参与方信息
'境内发货人代码', '境内发货人', '境外收货人', '生产销售单位代码', '生产销售单位', '申报单位代码', '申报单位',
# 物流运输信息
'运输方式', '运输工具名称及航次号', '离境口岸代码', '离境口岸','出境关别代码', '出境关别', '运抵国(地区)', '指运港代码', '指运港',
# 货物描述
'包装种类', '件数', '毛重代码', '毛重', '净重代码', '净重',
# 价格与费用
'成交方式代码', '成交方式', '运费', '保费', '杂费',
# 单证与备注
'随附单证及编号', '标记唛码及备注',
# 监管与税费
'监管方式代码', '监管方式', '征免性质代码', '征免性质',
# 商品项详细信息
'项号', '商品编号', '商品名称', '商品名称及规格型号', '数量及单位',
'单价', '总价', '币制', '原产国', '最终目的国', '境内货源地'
]
# 只保留存在的列
existing_columns = [col for col in column_order if col in df.columns]
# 重新排序列并保存
df = df[existing_columns]
df.to_excel(output_path, index=False)
print(f"数据已保存到 {output_path}")
return True
except Exception as e:
print(f"保存Excel时出错: {e}")
return False
def process_pdf_files(pdf_path):
"""处理指定的PDF文件并返回提取的数据"""
data = extract_data_from_pdf_by_position(pdf_path)
if data:
print(f"从 {os.path.basename(pdf_path)} 提取了 {len(data)} 条数据")
return data
else:
print(f"无法从 {os.path.basename(pdf_path)} 提取数据")
return [] # 返回空列表而不是错误代码
def isPdf(file_path):
try:
with open(file_path, 'rb') as f:
header = f.read(5)
return header == b'%PDF-'
except:
return False
if __name__ == "__main__":
print("程序开始执行...")
if len(sys.argv) != 2:
print("参数错误:需要提供一个文件夹路径")
sys.exit(1)
folderPath = sys.argv[1]
print(f"接收到的文件夹路径: {folderPath}")
if not os.path.isdir(folderPath):
print(f"路径错误:'{folderPath}' 不是一个有效的文件夹")
sys.exit(1)
pdf_files_processed_count = 0
pdf_files_failed_count = 0
all_extracted_data = [] # 用于累积所有PDF文件的数据
for root, dirs, files in os.walk(folderPath):
for file in files:
if file.lower().endswith('.pdf'):
filePath = os.path.join(root, file)
print(f"\n开始处理PDF文件: {filePath}")
if not isPdf(filePath):
print(f"文件类型错误:'{filePath}' 不是一个有效的PDF文件,跳过")
pdf_files_failed_count += 1
continue
# 处理PDF文件并获取数据
data = process_pdf_files(filePath)
if data: # 如果成功提取到数据
all_extracted_data.extend(data) # 将数据添加到累积列表中
pdf_files_processed_count += 1
print(f"文件 {filePath} 处理成功,累计提取 {len(all_extracted_data)} 条数据。")
else:
pdf_files_failed_count += 1
print(f"文件 {filePath} 处理失败,未能提取数据。")
print("暂停1秒...")
time.sleep(1) # 每处理完一个PDF后暂停1秒
# 所有文件处理完成后,将累积的数据保存到Excel文件
if all_extracted_data:
output_path = os.path.join(folderPath, 'output.xlsx')
if save_to_excel(all_extracted_data, output_path):
print(f"\n所有数据已成功保存到 {output_path}")
else:
print("\n保存数据到Excel文件失败")
sys.exit(4) # 保存失败
else:
print("\n没有从任何PDF文件中提取到数据")
sys.exit(3) # 无法提取数据
print(f"\n所有PDF文件处理完成。")
print(f"成功处理文件数: {pdf_files_processed_count}")
print(f"处理失败文件数: {pdf_files_failed_count}")
print(f"总共提取数据条数: {len(all_extracted_data)}")
sys.exit(0) # 所有文件处理完毕后以成功状态退出
本文作者:ivan
本文链接:
版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!