import csv
import os
import struct
import tkinter as tk
from tkinter import ttk, filedialog, messagebox
class DataToCsvConverter:
    """Tkinter GUI that converts a fixed-length-record ``.data`` file to CSV.

    The input file carries four little-endian unsigned 32-bit header values
    (actual row count, record length, field count, maximum row count) at the
    offsets named by the ``_OFFSET_*`` constants below. Records start at
    ``_OFFSET_FIRST_RECORD``; each record is ``length`` bytes, holds
    NUL-terminated text, and separates its fields with ``^``.
    """

    # Header field offsets (each field is a little-endian uint32).
    _OFFSET_ACTUAL_ROWS = 0x0208
    _OFFSET_RECORD_LENGTH = 0x020C
    _OFFSET_FIELD_COUNT = 0x0214
    _OFFSET_MAX_ROWS = 0x0218
    # Offset of the first record.
    _OFFSET_FIRST_RECORD = 0x0400
    # Encodings offered in the UI, also used (in this order) as fallbacks.
    _SUPPORTED_ENCODINGS = ("GB2312", "utf-8", "GBK", "big5")

    def __init__(self, root):
        """Configure the main window, create the Tk variables, build the UI.

        Args:
            root: The ``tk.Tk`` (or toplevel) window that hosts the widgets.
        """
        self.root = root
        self.root.title("Data文件转换工具")
        self.root.geometry("600x500")
        # Tk variables bound to the widgets.
        self.file_path = tk.StringVar()
        self.output_path = tk.StringVar()
        self.encoding_var = tk.StringVar(value="GB2312")
        self.length_var = tk.StringVar(value="每条记录最大字节数: ")
        self.dez_var = tk.StringVar(value="字段数量: ")
        self.max_rows_var = tk.StringVar(value="最大行数: ")
        self.actual_rows_var = tk.StringVar(value="实际行数: ")
        self.create_widgets()

    def create_widgets(self):
        """Build the encoding, file, info, button and log sections."""
        # Encoding selection.
        encoding_frame = ttk.LabelFrame(self.root, text="编码设置", padding=10)
        encoding_frame.pack(fill="x", padx=10, pady=5)
        ttk.Label(encoding_frame, text="选择编码:").grid(row=0, column=0, sticky="w")
        encoding_combo = ttk.Combobox(
            encoding_frame,
            textvariable=self.encoding_var,
            values=list(self._SUPPORTED_ENCODINGS),
            state="readonly",
        )
        encoding_combo.grid(row=0, column=1, sticky="ew", padx=5)
        encoding_frame.columnconfigure(1, weight=1)

        # Input / output file selection.
        file_frame = ttk.LabelFrame(self.root, text="文件选择", padding=10)
        file_frame.pack(fill="x", padx=10, pady=5)
        ttk.Label(file_frame, text="原始文件:").grid(row=0, column=0, sticky="w")
        ttk.Entry(file_frame, textvariable=self.file_path, state="readonly").grid(
            row=0, column=1, sticky="ew", padx=5)
        ttk.Button(file_frame, text="选择文件", command=self.select_file).grid(
            row=0, column=2, padx=5)
        ttk.Label(file_frame, text="输出文件:").grid(row=1, column=0, sticky="w")
        ttk.Entry(file_frame, textvariable=self.output_path, state="readonly").grid(
            row=1, column=1, sticky="ew", padx=5)
        ttk.Button(file_frame, text="选择路径", command=self.select_output_path).grid(
            row=1, column=2, padx=5)
        file_frame.columnconfigure(1, weight=1)

        # Header information display.
        info_frame = ttk.LabelFrame(self.root, text="文件信息", padding=10)
        info_frame.pack(fill="x", padx=10, pady=5)
        ttk.Label(info_frame, textvariable=self.length_var).pack(anchor="w")
        ttk.Label(info_frame, textvariable=self.dez_var).pack(anchor="w")
        ttk.Label(info_frame, textvariable=self.max_rows_var).pack(anchor="w")
        ttk.Label(info_frame, textvariable=self.actual_rows_var).pack(anchor="w")

        # Convert button.
        button_frame = ttk.Frame(self.root)
        button_frame.pack(fill="x", padx=10, pady=10)
        ttk.Button(button_frame, text="开始转换", command=self.convert_file).pack(pady=5)

        # Log area with a vertical scrollbar.
        log_frame = ttk.LabelFrame(self.root, text="日志", padding=10)
        log_frame.pack(fill="both", expand=True, padx=10, pady=5)
        self.log_text = tk.Text(log_frame, height=10, state="disabled")
        scrollbar = ttk.Scrollbar(log_frame, orient="vertical", command=self.log_text.yview)
        self.log_text.configure(yscrollcommand=scrollbar.set)
        self.log_text.pack(side="left", fill="both", expand=True)
        scrollbar.pack(side="right", fill="y")

    def log_message(self, message):
        """Append ``message`` as a new line to the log widget and refresh the UI."""
        # The widget is kept disabled; enable it only while inserting.
        self.log_text.config(state="normal")
        self.log_text.insert("end", message + "\n")
        self.log_text.see("end")
        self.log_text.config(state="disabled")
        self.root.update()

    def select_file(self):
        """Ask for the input file, derive a default output path, read its header."""
        file_path = filedialog.askopenfilename(
            title="选择.data文件",
            filetypes=[("Data files", "*.data"), ("All files", "*.*")],
        )
        if file_path:
            self.file_path.set(file_path)
            # Default output: same location and stem, ``.csv`` extension.
            self.output_path.set(os.path.splitext(file_path)[0] + ".csv")
            self.read_file_info()

    def select_output_path(self):
        """Ask where to save the CSV file and store the chosen path."""
        source = self.file_path.get()
        if source:
            default_path = os.path.splitext(source)[0] + ".csv"
        else:
            default_path = "output.csv"
        # Pass directory and file name separately rather than a full path as
        # ``initialfile``.
        initial_dir, initial_name = os.path.split(default_path)
        dialog_options = {
            "title": "保存为CSV文件",
            "defaultextension": ".csv",
            "filetypes": [("CSV files", "*.csv"), ("All files", "*.*")],
            "initialfile": initial_name,
        }
        if initial_dir:
            dialog_options["initialdir"] = initial_dir
        output_path = filedialog.asksaveasfilename(**dialog_options)
        if output_path:
            self.output_path.set(output_path)

    @staticmethod
    def _read_u32(f, offset):
        """Return the little-endian uint32 stored at ``offset`` in ``f``.

        Raises:
            ValueError: If fewer than 4 bytes are available at ``offset``.
        """
        f.seek(offset)
        raw = f.read(4)
        if len(raw) != 4:
            raise ValueError(f"文件头不完整: 偏移 0x{offset:04x} 处无法读取 4 字节")
        return struct.unpack('<I', raw)[0]

    def _read_header(self, f):
        """Read the four header values from the binary file object ``f``.

        Returns:
            dict with keys ``length``, ``dez``, ``max_rows``, ``actual_rows``.

        Raises:
            ValueError: If the file is too short to contain the header.
        """
        return {
            "length": self._read_u32(f, self._OFFSET_RECORD_LENGTH),
            "dez": self._read_u32(f, self._OFFSET_FIELD_COUNT),
            "max_rows": self._read_u32(f, self._OFFSET_MAX_ROWS),
            "actual_rows": self._read_u32(f, self._OFFSET_ACTUAL_ROWS),
        }

    def read_file_info(self):
        """Read the header of the selected file and show it in the UI and log.

        Does nothing when no input file is selected. Any failure is reported
        through an error dialog and the log instead of being raised.
        """
        path = self.file_path.get()
        if not path:
            return
        try:
            with open(path, 'rb') as f:
                header = self._read_header(f)
            length = header["length"]
            dez = header["dez"]
            max_rows = header["max_rows"]
            actual_rows = header["actual_rows"]
            # Update the info labels.
            self.length_var.set(f"每条记录最大字节数: {length}")
            self.dez_var.set(f"字段数量: {dez}")
            self.max_rows_var.set(f"最大行数: {max_rows}")
            self.actual_rows_var.set(f"实际行数: {actual_rows}")
            self.log_message("文件信息读取成功:")
            self.log_message(f" - 记录最大字节数: {length}")
            self.log_message(f" - 字段数量: {dez}")
            self.log_message(f" - 最大行数: {max_rows}")
            self.log_message(f" - 实际行数: {actual_rows}")
        except Exception as e:
            # UI boundary: report every failure to the user.
            messagebox.showerror("错误", f"读取文件信息失败: {str(e)}")
            self.log_message(f"错误: 读取文件信息失败 - {str(e)}")

    def try_decode(self, data, encoding):
        """Try to decode ``data`` strictly with ``encoding``.

        Returns:
            ``(text, True)`` on success, ``(None, False)`` when the bytes are
            not valid in that encoding.
        """
        try:
            return data.decode(encoding), True
        except UnicodeDecodeError:
            return None, False

    def _decode_record(self, record_bytes, selected_encoding, record_no, log):
        """Decode one record, falling back to other encodings when needed.

        Args:
            record_bytes: Raw record bytes (already cut at the first NUL).
            selected_encoding: Encoding chosen by the user, tried first.
            record_no: 1-based record number, used only in log messages.
            log: Callable receiving one log message string.

        Returns:
            ``(text, had_issue)`` where ``had_issue`` is True when the selected
            encoding could not decode the record strictly.
        """
        text, ok = self.try_decode(record_bytes, selected_encoding)
        if ok:
            return text, False

        log(f"记录 {record_no}: 编码 {selected_encoding} 解码失败,尝试备用编码...")
        for alt_enc in self._SUPPORTED_ENCODINGS:
            if alt_enc == selected_encoding:
                continue
            text, ok = self.try_decode(record_bytes, alt_enc)
            if ok:
                log(f"记录 {record_no}: 使用备用编码 {alt_enc} 成功解码")
                return text, True

        # Last resort: lossy decode. This cannot raise a decode error, and the
        # codec name was already validated by the strict attempt above.
        text = record_bytes.decode(selected_encoding, errors='replace')
        log(f"记录 {record_no}: 使用错误替换方式解码")
        return text, True

    def _convert_records(self, src_path, dst_path, encoding, log=None):
        """Convert every record of ``src_path`` into a CSV row in ``dst_path``.

        Args:
            src_path: Path of the binary input file.
            dst_path: Path of the CSV file to create (overwritten if present).
            encoding: Encoding tried first for decoding and used for output.
            log: Callable receiving log messages; defaults to ``log_message``.

        Returns:
            ``(records_processed, encoding_issues)``.

        Raises:
            ValueError: If input and output are the same file, the header is
                truncated, or the record length is 0 while rows are expected.
            OSError: If a file cannot be opened, read or written.
        """
        if log is None:
            log = self.log_message

        # Opening the output with 'w' would truncate the input being read.
        same_file = (
            os.path.normcase(os.path.abspath(src_path))
            == os.path.normcase(os.path.abspath(dst_path))
        )
        if same_file:
            raise ValueError("输出文件不能与原始文件相同")

        records_processed = 0
        encoding_issues = 0
        with open(src_path, 'rb') as src:
            header = self._read_header(src)
            length = header["length"]
            dez = header["dez"]
            actual_rows = header["actual_rows"]
            log(f"转换参数: 记录长度={length}, 字段数={dez}, 实际行数={actual_rows}")
            if actual_rows and length == 0:
                raise ValueError("记录长度为 0,无法转换")

            src.seek(self._OFFSET_FIRST_RECORD)
            # errors='replace': text decoded lossily (U+FFFD) or through a
            # fallback encoding may not be encodable in the output encoding;
            # substitute '?' instead of aborting the whole conversion.
            with open(dst_path, 'w', encoding=encoding, errors='replace',
                      newline='') as csv_file:
                # csv.writer quotes fields containing commas, quotes or line
                # breaks; '\n' keeps the original line terminator.
                writer = csv.writer(csv_file, lineterminator='\n')
                for i in range(actual_rows):
                    record_bytes = src.read(length)
                    if len(record_bytes) < length:
                        log(f"警告: 记录 {i+1} 数据不完整或文件已结束")
                        break
                    # Keep only the text before the first NUL, if any.
                    record_bytes = record_bytes.partition(b'\x00')[0]
                    record_text, had_issue = self._decode_record(
                        record_bytes, encoding, i + 1, log)
                    if had_issue:
                        encoding_issues += 1
                    # '^' separates the fields inside a record.
                    writer.writerow(record_text.split('^'))
                    records_processed += 1
                    # Report progress every 100 records.
                    if (i + 1) % 100 == 0:
                        log(f"已处理 {i + 1} 条记录...")
        return records_processed, encoding_issues

    def convert_file(self):
        """Run the conversion for the selected files and report the outcome.

        Shows a warning when the input or output path is missing. Any failure
        during conversion is logged and shown in an error dialog.
        """
        src_path = self.file_path.get()
        if not src_path:
            messagebox.showwarning("警告", "请先选择原始文件")
            return
        dst_path = self.output_path.get()
        if not dst_path:
            messagebox.showwarning("警告", "请先选择输出文件路径")
            return
        try:
            self.log_message("开始转换文件...")
            records_processed, encoding_issues = self._convert_records(
                src_path, dst_path, self.encoding_var.get())
            # Summary.
            self.log_message("转换完成!")
            self.log_message(f"共处理记录数: {records_processed}")
            if encoding_issues > 0:
                self.log_message(f"编码问题记录数: {encoding_issues}")
            messagebox.showinfo("成功",
                                f"文件转换完成!\n"
                                f"输出文件: {dst_path}\n"
                                f"处理记录数: {records_processed}\n"
                                f"编码问题记录: {encoding_issues}")
        except Exception as e:
            # UI boundary: report every failure to the user.
            error_msg = f"转换失败: {str(e)}"
            self.log_message(f"错误: {error_msg}")
            messagebox.showerror("错误", error_msg)
def main():
    """Create the Tk root window, attach the converter UI, run the event loop."""
    window = tk.Tk()
    # Keep a reference to the converter for the lifetime of the event loop.
    converter = DataToCsvConverter(window)
    window.mainloop()


# Start the GUI only when executed as a script, not when imported.
if __name__ == "__main__":
    main()