数据记录文件转换-python代码

import tkinter as tk
from tkinter import ttk, filedialog, messagebox
import struct
import os

class DataToCsvConverter:
    def __init__(self, root):
        self.root = root
        self.root.title("Data文件转换工具")
        self.root.geometry("600x500")

        # 变量初始化
        self.file_path = tk.StringVar()
        self.output_path = tk.StringVar()
        self.encoding_var = tk.StringVar(value="GB2312")

        self.length_var = tk.StringVar(value="每条记录最大字节数: ")
        self.dez_var = tk.StringVar(value="字段数量: ")
        self.max_rows_var = tk.StringVar(value="最大行数: ")
        self.actual_rows_var = tk.StringVar(value="实际行数: ")

        self.create_widgets()

    def create_widgets(self):
        # 编码选择
        encoding_frame = ttk.LabelFrame(self.root, text="编码设置", padding=10)
        encoding_frame.pack(fill="x", padx=10, pady=5)

        ttk.Label(encoding_frame, text="选择编码:").grid(row=0, column=0, sticky="w")
        encoding_combo = ttk.Combobox(encoding_frame, textvariable=self.encoding_var,
                                    values=["GB2312", "utf-8", "GBK", "big5"], state="readonly")
        encoding_combo.grid(row=0, column=1, sticky="ew", padx=5)
        encoding_frame.columnconfigure(1, weight=1)

        # 文件选择区域
        file_frame = ttk.LabelFrame(self.root, text="文件选择", padding=10)
        file_frame.pack(fill="x", padx=10, pady=5)

        ttk.Label(file_frame, text="原始文件:").grid(row=0, column=0, sticky="w")
        ttk.Entry(file_frame, textvariable=self.file_path, state="readonly").grid(row=0, column=1, sticky="ew", padx=5)
        ttk.Button(file_frame, text="选择文件", command=self.select_file).grid(row=0, column=2, padx=5)

        ttk.Label(file_frame, text="输出文件:").grid(row=1, column=0, sticky="w")
        ttk.Entry(file_frame, textvariable=self.output_path, state="readonly").grid(row=1, column=1, sticky="ew", padx=5)
        ttk.Button(file_frame, text="选择路径", command=self.select_output_path).grid(row=1, column=2, padx=5)

        file_frame.columnconfigure(1, weight=1)

        # 文件信息显示
        info_frame = ttk.LabelFrame(self.root, text="文件信息", padding=10)
        info_frame.pack(fill="x", padx=10, pady=5)

        ttk.Label(info_frame, textvariable=self.length_var).pack(anchor="w")
        ttk.Label(info_frame, textvariable=self.dez_var).pack(anchor="w")
        ttk.Label(info_frame, textvariable=self.max_rows_var).pack(anchor="w")
        ttk.Label(info_frame, textvariable=self.actual_rows_var).pack(anchor="w")

        # 转换按钮
        button_frame = ttk.Frame(self.root)
        button_frame.pack(fill="x", padx=10, pady=10)

        ttk.Button(button_frame, text="开始转换", command=self.convert_file).pack(pady=5)

        # 日志区域
        log_frame = ttk.LabelFrame(self.root, text="日志", padding=10)
        log_frame.pack(fill="both", expand=True, padx=10, pady=5)

        self.log_text = tk.Text(log_frame, height=10, state="disabled")
        scrollbar = ttk.Scrollbar(log_frame, orient="vertical", command=self.log_text.yview)
        self.log_text.configure(yscrollcommand=scrollbar.set)

        self.log_text.pack(side="left", fill="both", expand=True)
        scrollbar.pack(side="right", fill="y")

    def log_message(self, message):
        """添加日志信息"""
        self.log_text.config(state="normal")
        self.log_text.insert("end", message + "\n")
        self.log_text.see("end")
        self.log_text.config(state="disabled")
        self.root.update()

    def select_file(self):
        """选择原始文件"""
        file_path = filedialog.askopenfilename(
            title="选择.data文件",
            filetypes=[("Data files", "*.data"), ("All files", "*.*")]
        )
        if file_path:
            self.file_path.set(file_path)
            # 自动生成输出路径
            output_path = os.path.splitext(file_path)[0] + ".csv"
            self.output_path.set(output_path)
            self.read_file_info()

    def select_output_path(self):
        """选择输出文件路径"""
        if self.file_path.get():
            default_name = os.path.splitext(self.file_path.get())[0] + ".csv"
        else:
            default_name = "output.csv"

        output_path = filedialog.asksaveasfilename(
            title="保存为CSV文件",
            defaultextension=".csv",
            filetypes=[("CSV files", "*.csv"), ("All files", "*.*")],
            initialfile=default_name
        )
        if output_path:
            self.output_path.set(output_path)

    def read_file_info(self):
        """读取文件头信息"""
        if not self.file_path.get():
            return

        try:
            with open(self.file_path.get(), 'rb') as f:
                # 读取每条记录最大字节数 (0x020c)
                f.seek(0x020c)
                length_bytes = f.read(4)
                length = struct.unpack('<I', length_bytes)[0]

                # 读取字段数量 (0x0214)
                f.seek(0x0214)
                dez_bytes = f.read(4)
                dez = struct.unpack('<I', dez_bytes)[0]

                # 读取最大行数 (0x0218)
                f.seek(0x0218)
                max_rows_bytes = f.read(4)
                max_rows = struct.unpack('<I', max_rows_bytes)[0]

                # 读取实际行数 (0x0208)
                f.seek(0x0208)
                actual_rows_bytes = f.read(4)
                actual_rows = struct.unpack('<I', actual_rows_bytes)[0]

                # 更新界面显示
                self.length_var.set(f"每条记录最大字节数: {length}")
                self.dez_var.set(f"字段数量: {dez}")
                self.max_rows_var.set(f"最大行数: {max_rows}")
                self.actual_rows_var.set(f"实际行数: {actual_rows}")

                self.log_message(f"文件信息读取成功:")
                self.log_message(f"  - 记录最大字节数: {length}")
                self.log_message(f"  - 字段数量: {dez}")
                self.log_message(f"  - 最大行数: {max_rows}")
                self.log_message(f"  - 实际行数: {actual_rows}")

        except Exception as e:
            messagebox.showerror("错误", f"读取文件信息失败: {str(e)}")
            self.log_message(f"错误: 读取文件信息失败 - {str(e)}")

    def try_decode(self, data, encoding):
        """尝试用指定编码解码数据"""
        try:
            return data.decode(encoding), True
        except UnicodeDecodeError:
            return None, False

    def convert_file(self):
        """执行文件转换"""
        if not self.file_path.get():
            messagebox.showwarning("警告", "请先选择原始文件")
            return

        if not self.output_path.get():
            messagebox.showwarning("警告", "请先选择输出文件路径")
            return

        try:
            self.log_message("开始转换文件...")

            with open(self.file_path.get(), 'rb') as f:
                # 读取文件头信息
                f.seek(0x020c)
                length = struct.unpack('<I', f.read(4))[0]

                f.seek(0x0214)
                dez = struct.unpack('<I', f.read(4))[0]

                f.seek(0x0208)
                actual_rows = struct.unpack('<I', f.read(4))[0]

                self.log_message(f"转换参数: 记录长度={length}, 字段数={dez}, 实际行数={actual_rows}")

                # 定位到第一条记录 (0x0400)
                f.seek(0x0400)

                with open(self.output_path.get(), 'w', encoding=self.encoding_var.get(), newline='') as csv_file:
                    records_processed = 0
                    encoding_issues = 0

                    for i in range(actual_rows):
                        # 读取一条记录
                        record_bytes = f.read(length)
                        if not record_bytes or len(record_bytes) < length:
                            self.log_message(f"警告: 记录 {i+1} 数据不完整或文件已结束")
                            break

                        # 找到字符串结束位置
                        try:
                            null_pos = record_bytes.index(b'\x00')
                            record_bytes = record_bytes[:null_pos]
                        except ValueError:
                            # 如果没有找到结束符,使用全部数据
                            pass

                        # 尝试使用用户选择的编码解码
                        selected_encoding = self.encoding_var.get()
                        record_text, success = self.try_decode(record_bytes, selected_encoding)

                        # 如果解码失败,尝试其他编码
                        if not success:
                            encoding_issues += 1
                            self.log_message(f"记录 {i+1}: 编码 {selected_encoding} 解码失败,尝试备用编码...")

                            # 尝试其他编码
                            alternative_encodings = [enc for enc in ["GB2312", "utf-8", "GBK", "big5"] if enc != selected_encoding]
                            for alt_enc in alternative_encodings:
                                record_text, success = self.try_decode(record_bytes, alt_enc)
                                if success:
                                    self.log_message(f"记录 {i+1}: 使用备用编码 {alt_enc} 成功解码")
                                    break

                            # 如果所有编码都失败,使用错误处理
                            if not success:
                                try:
                                    record_text = record_bytes.decode(selected_encoding, errors='replace')
                                    self.log_message(f"记录 {i+1}: 使用错误替换方式解码")
                                except:
                                    record_text = "解码失败"
                                    self.log_message(f"记录 {i+1}: 所有解码方式都失败")

                        # 替换^为英文逗号
                        record_text = record_text.replace('^', ',')

                        # 写入CSV文件
                        csv_file.write(record_text + '\n')
                        records_processed += 1

                        # 每处理100条记录显示进度
                        if (i + 1) % 100 == 0:
                            self.log_message(f"已处理 {i + 1} 条记录...")

                    # 转换完成统计
                    self.log_message(f"转换完成!")
                    self.log_message(f"共处理记录数: {records_processed}")
                    if encoding_issues > 0:
                        self.log_message(f"编码问题记录数: {encoding_issues}")

                    messagebox.showinfo("成功",
                                    f"文件转换完成!\n"
                                    f"输出文件: {self.output_path.get()}\n"
                                    f"处理记录数: {records_processed}\n"
                                    f"编码问题记录: {encoding_issues}")

        except Exception as e:
            error_msg = f"转换失败: {str(e)}"
            self.log_message(f"错误: {error_msg}")
            messagebox.showerror("错误", error_msg)

def main():
    root = tk.Tk()
    app = DataToCsvConverter(root)
    root.mainloop()

if __name__ == "__main__":
    main()