Spaces:
Paused
Paused
| import os | |
| import json | |
# Detect the file encoding
def checkEncoding(filepath):
    """Guess the text encoding of *filepath* from its byte-order mark (BOM).

    Args:
        filepath: Path of the file to inspect; it is opened in binary mode.

    Returns:
        A Python codec name: ``"utf_16_be"`` or ``"utf_16_le"`` when the file
        starts with the corresponding UTF-16 BOM, ``"utf_8_sig"`` when it
        starts with a UTF-8 BOM, and ``"utf_8"`` otherwise (including for an
        empty file).
    """
    # read() instead of readline(): a newline byte among the first bytes must
    # not shorten the sample that is compared against the BOMs.
    with open(filepath, "rb") as encode_check:
        head = encode_check.read(3)

    # Only the two BOM bytes are compared, so detection no longer depends on
    # what the first character of the file is.
    if head.startswith(b"\xfe\xff"):
        return "utf_16_be"
    if head.startswith(b"\xff\xfe"):
        return "utf_16_le"
    if head == b"\xef\xbb\xbf":
        # utf_8_sig makes the decoder drop the BOM instead of returning it
        # as the first character of the text.
        return "utf_8_sig"
    return "utf_8"
# Read a text file
def readTextFile(filepath):
    """Read *filepath* as text and return its lines.

    The encoding is chosen by ``checkEncoding``. If decoding with that
    encoding fails, the file is read again as Latin-1.

    Args:
        filepath: Path of the text file to read.

    Returns:
        The list produced by ``readlines()`` (line endings are kept), with a
        leading byte-order mark removed from the first line, or ``None`` when
        *filepath* is not an existing regular file.
    """
    # isfile() rather than exists(): a directory cannot be opened for reading.
    if not os.path.isfile(filepath):
        return None

    file_encoding = checkEncoding(filepath)
    try:
        with open(filepath, "rt", encoding=file_encoding) as f_in:
            lines = f_in.readlines()
    except UnicodeError:
        # Only a decoding failure triggers the fallback; other errors
        # (permissions, interrupts, ...) propagate to the caller.
        with open(filepath, "rt", encoding="latin_1") as f_in:
            lines = f_in.readlines()

    # The utf_16_be / utf_16_le codecs decode the BOM as U+FEFF instead of
    # dropping it, which would otherwise corrupt the first token of line 1.
    if lines and lines[0].startswith("\ufeff"):
        lines[0] = lines[0][1:]
    return lines
def _extract_main_section(lines):
    """Return a copy of the lines that make up the first (main) section.

    The section ends at the first ``0 NOFILE`` line, or at the first
    ``0 FILE <name>`` line that is not the very first line of the file.
    Without such a line, all lines belong to the main section.
    """
    for index, line in enumerate(lines):
        tokens = line.split()
        if len(tokens) < 2 or tokens[0] != "0":
            continue
        # "0 FILE" only counts when a file name follows it; on line 0 it
        # opens the main section rather than closing it.
        starts_next_file = tokens[1] == "FILE" and len(tokens) > 2 and index > 0
        # "0 NOFILE" has exactly two tokens, so it must not be hidden behind
        # a "more than two tokens" check.
        if starts_next_file or tokens[1] == "NOFILE":
            return lines[:index]
    return lines[:]


# Process the data of a single LDR file
def process_ldr_data(lines, label_mapping, label_inverse_mapping, label_frequency, label_counter):
    """Count the part references found in the main section of one LDR file.

    Every line of the main section whose first token is ``1`` and that has at
    least 15 tokens is treated as a part reference; its 15th token is used as
    the part file name (``.DAT`` is normalised to ``.dat``).

    Args:
        lines: Lines of the LDR file, e.g. as returned by ``readlines()``.
        label_mapping: Dict part file name -> label id; updated in place.
        label_inverse_mapping: Dict label id -> part file name; updated in place.
        label_frequency: Dict label id -> usage count; updated in place.
        label_counter: Next unused label id.

    Returns:
        The tuple ``(label_mapping, label_inverse_mapping, label_frequency,
        label_counter)`` with ``label_counter`` advanced past every newly
        assigned label.
    """
    for line in _extract_main_section(lines):
        parts = line.split()
        # Compare the first token instead of the first character so that
        # indented lines are counted and tokens such as "10" are not.
        if len(parts) < 15 or parts[0] != "1":
            continue

        part_filename = parts[14].replace(".DAT", ".dat")
        if part_filename not in label_mapping:
            label_mapping[part_filename] = label_counter
            label_inverse_mapping[label_counter] = part_filename
            label_counter += 1

        current_label = label_mapping[part_filename]
        label_frequency[current_label] = label_frequency.get(current_label, 0) + 1

    return label_mapping, label_inverse_mapping, label_frequency, label_counter
# Process every LDR file in a folder
def process_all_ldr_in_folder(folder_path):
    """Walk *folder_path* recursively and accumulate part statistics.

    Every file whose name ends in ``.ldr`` (case-insensitive) is read with
    ``readTextFile`` and fed to ``process_ldr_data``. Files that
    ``readTextFile`` reports as unreadable (``None``) are skipped with a
    warning.

    Directories and files are visited in sorted name order, so the label ids
    assigned to parts do not depend on the order in which the file system
    happens to list entries.

    Args:
        folder_path: Root folder to search.

    Returns:
        A tuple ``(label_mapping, label_inverse_mapping, label_frequency)``
        of dicts: part file name -> label id, label id -> part file name and
        label id -> usage count.
    """
    overall_label_mapping = {}
    overall_label_inverse_mapping = {}
    overall_label_frequency = {}
    label_counter = 0

    for root, dirs, files in os.walk(folder_path):
        # Sorting in place makes os.walk descend into sub-folders in a fixed order.
        dirs.sort()
        for file in sorted(files):
            if not file.lower().endswith('.ldr'):
                continue

            file_path = os.path.join(root, file)
            print(f"正在处理: {file_path}")
            lines = readTextFile(file_path)
            if lines is None:
                print(f"⚠️ 无法读取文件 {file_path},已跳过")
                continue

            (
                overall_label_mapping,
                overall_label_inverse_mapping,
                overall_label_frequency,
                label_counter,
            ) = process_ldr_data(
                lines,
                overall_label_mapping,
                overall_label_inverse_mapping,
                overall_label_frequency,
                label_counter,
            )

    return overall_label_mapping, overall_label_inverse_mapping, overall_label_frequency
# Save the mapping tables and the frequency table sorted by usage
def save_results(label_mapping, label_inverse_mapping, label_frequency, output_dir):
    """Write the label tables as JSON files into *output_dir*.

    Three files are written (UTF-8, 4-space indent, non-ASCII kept as is):
    ``label_mapping.json``, ``label_inverse_mapping.json`` and
    ``label_frequency.json``. The last one is a list of records with the keys
    ``label_id``, ``part_name`` and ``usage_count``, ordered by
    ``usage_count`` from highest to lowest.

    Args:
        label_mapping: Dict part file name -> label id.
        label_inverse_mapping: Dict label id -> part file name.
        label_frequency: Dict label id -> usage count.
        output_dir: Target folder; created if it does not exist.
    """
    os.makedirs(output_dir, exist_ok=True)

    def _dump(filename, payload):
        # One place for the shared JSON formatting options.
        target = os.path.join(output_dir, filename)
        with open(target, 'w', encoding='utf-8') as handle:
            json.dump(payload, handle, indent=4, ensure_ascii=False)

    # Forward and inverse label tables
    _dump('label_mapping.json', label_mapping)
    _dump('label_inverse_mapping.json', label_inverse_mapping)

    # One record per label, most frequently used first
    records = (
        {
            "label_id": label_id,
            "part_name": label_inverse_mapping.get(label_id, "未知零件"),
            "usage_count": count,
        }
        for label_id, count in label_frequency.items()
    )
    ranked = sorted(records, key=lambda record: record["usage_count"], reverse=True)
    _dump('label_frequency.json', ranked)
# Main program
if __name__ == "__main__":
    import sys

    # The defaults are the original dataset locations. Either one can be
    # overridden on the command line: <script> [INPUT_FOLDER [OUTPUT_FOLDER]]
    INPUT_FOLDER = '/public/home/wangshuo/gap/assembly/data/car_1k/subset_self/ldr_l30_rotrans_expand_wom'
    OUTPUT_FOLDER = '/public/home/wangshuo/gap/assembly/data/car_1k/subset_self'
    if len(sys.argv) > 1:
        INPUT_FOLDER = sys.argv[1]
    if len(sys.argv) > 2:
        OUTPUT_FOLDER = sys.argv[2]

    # os.walk yields nothing for a missing folder, which would otherwise end
    # in empty result files and a success message.
    if not os.path.isdir(INPUT_FOLDER):
        sys.exit(f"❌ 输入文件夹不存在: {INPUT_FOLDER}")

    label_mapping, label_inverse_mapping, label_frequency = process_all_ldr_in_folder(INPUT_FOLDER)
    save_results(label_mapping, label_inverse_mapping, label_frequency, OUTPUT_FOLDER)

    print(f"\n✅ 处理完成!结果已保存到: {OUTPUT_FOLDER}")
    print("📊 统计摘要:")
    print(f" - 总唯一标签数: {len(label_mapping)}")
    print(f" - 总使用次数: {sum(label_frequency.values())}")
    print(" - label_frequency.json已按使用频率从高到低排序")