精华5
阅读权限90
最后登录2025-6-21
在线时间141 小时
 
 
 
累计签到:377 天 连续签到:1 天
  
域主 
 
     名望- 126 点
 
       
     星币- 6580 枚
 
     星辰- 15 颗
 
     好评- 328 点
 
 
 
 
 | 
 
注册登录后全站资源免费查看下载
您需要 登录 才可以下载或查看,没有账号?立即注册  
 
×
 
https://wwvu.lanzouq.com/icus82tvlt9c 
通过网盘分享的文件:电脑自动备份插入U盘数据.exe 
链接: https://pan.baidu.com/s/1fI_QTf3F6DKw7cFzMJr2Hw?pwd=52pj 提取码: 52pj 
 
- import os
 
 - import shutil
 
 - import time
 
 - import string
 
 - import win32file # 需要安装 pywin32
 
 - import logging
 
 - from datetime import datetime
 
 - import threading
 
 - from concurrent.futures import ThreadPoolExecutor, as_completed
 
 - import tkinter as tk
 
 - from tkinter import scrolledtext
 
 - import queue
 
 -  
 
 - # --- 配置 ---
 
 - # 备份文件存储的基础路径 (请确保这个文件夹存在,或者脚本有权限创建它)
 
 - BACKUP_DESTINATION_BASE = r"D:\USB_Backups"
 
 - # 检测时间间隔(秒)
 
 - CHECK_INTERVAL = 5
 
 - # 日志文件路径
 
 - LOG_FILE = os.path.join(BACKUP_DESTINATION_BASE, "usb_backup_log.txt")
 
 - # --- 配置结束 ---
 
 -  
 
 - # --- GUI 相关 ---
 
 - class TextHandler(logging.Handler):
 
 -     """自定义日志处理器,将日志记录发送到 Text 控件"""
 
 -     def __init__(self, text_widget):
 
 -         logging.Handler.__init__(self)
 
 -         self.text_widget = text_widget
 
 -         self.queue = queue.Queue()
 
 -         # 启动一个线程来处理队列中的日志消息,避免阻塞主线程
 
 -         self.thread = threading.Thread(target=self.process_queue, daemon=True)
 
 -         self.thread.start()
 
 -  
 
 -     def emit(self, record):
 
 -         msg = self.format(record)
 
 -         self.queue.put(msg)
 
 -  
 
 -     def process_queue(self):
 
 -         while True:
 
 -             try:
 
 -                 msg = self.queue.get()
 
 -                 if msg is None: # Sentinel value to stop the thread
 
 -                     break
 
 -                 # Schedule GUI update on the main thread
 
 -                 def update_widget():
 
 -                     try:
 
 -                         self.text_widget.configure(state='normal')
 
 -                         self.text_widget.insert(tk.END, msg + '\n')
 
 -                         self.text_widget.configure(state='disabled')
 
 -                         self.text_widget.yview(tk.END)
 
 -                     except tk.TclError: # Handle cases where the widget might be destroyed
 
 -                         pass
 
 -                 self.text_widget.after(0, update_widget)
 
 -                 self.queue.task_done()
 
 -             except Exception:
 
 -                 # 处理可能的窗口已销毁等异常
 
 -                 import traceback
 
 -                 traceback.print_exc()
 
 -                 break
 
 -  
 
 -     def close(self):
 
 -         self.stop_processing() # Signal the thread to stop
 
 -         # Don't join here to avoid blocking the main thread
 
 -         logging.Handler.close(self)
 
 -  
 
 -     def stop_processing(self):
 
 -         """Signals the processing thread to stop without waiting for it."""
 
 -         self.queue.put(None) # Send sentinel to stop the processing thread
 
 -  
 
 - class App(tk.Tk):
 
 -     def __init__(self):
 
 -         super().__init__()
 
 -         self.title("USB 自动备份")
 
 -         self.geometry("600x400")
 
 -  
 
 -         self.log_text = scrolledtext.ScrolledText(self, state='disabled', wrap=tk.WORD)
 
 -         self.log_text.pack(expand=True, fill='both', padx=10, pady=5)
 
 -  
 
 -         self.status_label = tk.Label(self, text="状态: 初始化中...", anchor='w')
 
 -         self.status_label.pack(fill='x', padx=10, pady=2)
 
 -  
 
 -         self.exit_button = tk.Button(self, text="退出", command=self.quit_app)
 
 -         self.exit_button.pack(pady=5)
 
 -  
 
 -         self.backup_thread = None
 
 -         self.running = True
 
 -         self.protocol("WM_DELETE_WINDOW", self.quit_app)
 
 -  
 
 -         self.configure_logging()
 
 -  
 
 -     def configure_logging(self):
 
 -         # 日志配置前先确保备份目录存在
 
 -         if not os.path.exists(BACKUP_DESTINATION_BASE):
 
 -             try:
 
 -                 os.makedirs(BACKUP_DESTINATION_BASE)
 
 -             except Exception as e:
 
 -                 # 如果无法创建目录,在GUI中显示错误
 
 -                 self.update_status(f"错误: 无法创建备份目录 {BACKUP_DESTINATION_BASE}: {e}")
 
 -                 self.log_text.configure(state='normal')
 
 -                 self.log_text.insert(tk.END, f"错误: 无法创建备份目录 {BACKUP_DESTINATION_BASE}: {e}\n")
 
 -                 self.log_text.configure(state='disabled')
 
 -                 return
 
 -  
 
 -         log_formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
 
 -  
 
 -         # 文件处理器
 
 -         file_handler = logging.FileHandler(LOG_FILE)
 
 -         file_handler.setFormatter(log_formatter)
 
 -  
 
 -         # GUI 文本框处理器
 
 -         self.text_handler = TextHandler(self.log_text)
 
 -         self.text_handler.setFormatter(log_formatter)
 
 -  
 
 -         # 获取根 logger 并添加处理器
 
 -         root_logger = logging.getLogger()
 
 -         root_logger.setLevel(logging.INFO)
 
 -         # 清除可能存在的默认处理器(例如 basicConfig 创建的 StreamHandler)
 
 -         if root_logger.hasHandlers():
 
 -             root_logger.handlers.clear()
 
 -         root_logger.addHandler(file_handler)
 
 -         root_logger.addHandler(self.text_handler)
 
 -         # 添加一个 StreamHandler 以便在控制台也看到日志(调试用)
 
 -         # stream_handler = logging.StreamHandler()
 
 -         # stream_handler.setFormatter(log_formatter)
 
 -         # root_logger.addHandler(stream_handler)
 
 -  
 
 -     def update_status(self, message):
 
 -         # 使用 self.after 将 GUI 更新调度回主线程
 
 -         self.after(0, lambda: self.status_label.config(text=f"状态: {message}"))
 
 -  
 
 -     def start_backup_monitor(self):
 
 -         self.backup_thread = threading.Thread(target=run_backup_monitor, args=(self,), daemon=True)
 
 -         self.backup_thread.start()
 
 -  
 
 -     def quit_app(self):
 
 -         logging.info("收到退出信号,程序即将关闭。")
 
 -         self.running = False # Signal the backup thread to stop
 
 -  
 
 -         # Signal the logger thread to stop processing new messages
 
 -         if hasattr(self, 'text_handler'):
 
 -             self.text_handler.stop_processing()
 
 -  
 
 -         # Give the backup thread a short time to finish
 
 -         if self.backup_thread and self.backup_thread.is_alive():
 
 -             try:
 
 -                 self.backup_thread.join(timeout=1.0) # Wait max 1 second
 
 -                 if self.backup_thread.is_alive():
 
 -                     logging.warning("备份线程未能在1秒内停止,将强制关闭窗口。")
 
 -             except Exception as e:
 
 -                 logging.error(f"等待备份线程时出错: {e}")
 
 -  
 
 -         # Close the main window. Daemon threads will be terminated.
 
 -         self.destroy()
 
 -         # os._exit(0) # Avoid force exit, let the application close naturally
 
 -  
 
 - # --- 核心备份逻辑 (从旧 main 函数提取) ---
 
 - def get_available_drives():
 
 -     """获取当前所有可用的驱动器盘符"""
 
 -     drives = []
 
 -     bitmask = win32file.GetLogicalDrives()
 
 -     for letter in string.ascii_uppercase:
 
 -         if bitmask & 1:
 
 -             drives.append(letter)
 
 -         bitmask >>= 1
 
 -     return set(drives)
 
 -  
 
 - def is_removable_drive(drive_letter):
 
 -     """判断指定盘符是否是可移动驱动器 (U盘通常是这个类型)"""
 
 -     drive_path = f"{drive_letter}:\"
 
 -     try:
 
 -         # DRIVE_REMOVABLE 的类型代码是 2
 
 -         return win32file.GetDriveTypeW(drive_path) == win32file.DRIVE_REMOVABLE
 
 -     except Exception as e:
 
 -         # logging.error(f"检查驱动器 {drive_path} 类型时出错: {e}") # 可能在驱动器刚插入时发生
 
 -         return False
 
 -  
 
 - def should_skip_file(src, dst):
 
 -     """判断是否需要跳过备份(增量备份逻辑)"""
 
 -     if not os.path.exists(dst):
 
 -         return False
 
 -     try:
 
 -         src_stat = os.stat(src)
 
 -         dst_stat = os.stat(dst)
 
 -         # 文件大小和修改时间都相同则跳过
 
 -         return src_stat.st_size == dst_stat.st_size and int(src_stat.st_mtime) == int(dst_stat.st_mtime)
 
 -     except Exception:
 
 -         return False
 
 -  
 
 - def copy_file_with_log(src, dst):
 
 -     try:
 
 -         file_size = os.path.getsize(src)
 
 -         # 超过128MB的大文件采用分块复制
 
 -         if file_size > 128 * 1024 * 1024:
 
 -             chunk_size = 16 * 1024 * 1024  # 16MB
 
 -             with open(src, 'rb') as fsrc, open(dst, 'wb') as fdst:
 
 -                 while True:
 
 -                     chunk = fsrc.read(chunk_size)
 
 -                     if not chunk:
 
 -                         break
 
 -                     fdst.write(chunk)
 
 -             # 尝试复制亓数据,如果失败则记录但不中断
 
 -             try:
 
 -                 shutil.copystat(src, dst)
 
 -             except Exception as e_stat:
 
 -                 logging.warning(f"无法复制亓数据 {src} -> {dst}: {e_stat}")
 
 -             logging.info(f"分块复制大文件: {src} -> {dst}")
 
 -         else:
 
 -             shutil.copy2(src, dst)
 
 -             logging.info(f"已复制: {src} -> {dst}")
 
 -     except Exception as e:
 
 -         logging.error(f"复制文件 {src} 时出错: {e}")
 
 -  
 
 - def threaded_copytree(src, dst, skip_exts=None, skip_dirs=None, max_workers=8):
 
 -     """线程池递归复制目录,支持增量备份和跳过指定类型,限制最大线程数"""
 
 -     if skip_exts is None:
 
 -         skip_exts = ['.tmp', '.log', '.sys']
 
 -     if skip_dirs is None:
 
 -         skip_dirs = ['$RECYCLE.BIN', 'System Volume Information']
 
 -     if not os.path.exists(dst):
 
 -         try:
 
 -             os.makedirs(dst)
 
 -         except Exception as e_mkdir:
 
 -             logging.error(f"创建目录 {dst} 失败: {e_mkdir}")
 
 -             return #无法创建目标目录,则无法继续复制
 
 -     tasks = []
 
 -     small_files = []
 
 -     try:
 
 -         with ThreadPoolExecutor(max_workers=max_workers) as executor:
 
 -             for item in os.listdir(src):
 
 -                 s = os.path.join(src, item)
 
 -                 d = os.path.join(dst, item)
 
 -                 try:
 
 -                     if os.path.isdir(s):
 
 -                         if item in skip_dirs:
 
 -                             logging.info(f"跳过目录: {s}")
 
 -                             continue
 
 -                         # 递归调用也放入线程池
 
 -                         tasks.append(executor.submit(threaded_copytree, s, d, skip_exts, skip_dirs, max_workers))
 
 -                     else:
 
 -                         ext = os.path.splitext(item)[1].lower()
 
 -                         if ext in skip_exts:
 
 -                             logging.info(f"跳过文件: {s}")
 
 -                             continue
 
 -                         if should_skip_file(s, d):
 
 -                             # logging.debug(f"跳过未变更文件: {s}") # 改为 debug 级别
 
 -                             continue
 
 -                         # 小于16MB的小文件批量处理
 
 -                         if os.path.getsize(s) < 16 * 1024 * 1024:
 
 -                             small_files.append((s, d))
 
 -                         else:
 
 -                             tasks.append(executor.submit(copy_file_with_log, s, d))
 
 -                 except PermissionError:
 
 -                     logging.warning(f"无权限访问: {s},跳过")
 
 -                 except FileNotFoundError:
 
 -                     logging.warning(f"文件或目录不存在(可能在扫描时被移除): {s},跳过")
 
 -                 except Exception as e_item:
 
 -                     logging.error(f"处理 {s} 时出错: {e_item}")
 
 -  
 
 -             # 批量提交小文件任务,减少线程调度开销
 
 -             batch_size = 16
 
 -             for i in range(0, len(small_files), batch_size):
 
 -                 batch = small_files[i:i+batch_size]
 
 -                 tasks.append(executor.submit(batch_copy_files, batch))
 
 -  
 
 -             # 等待所有任务完成
 
 -             for future in as_completed(tasks):
 
 -                 try:
 
 -                     future.result() # 获取结果以暴露异常
 
 -                 except Exception as e_future:
 
 -                     logging.error(f"线程池任务出错: {e_future}")
 
 -     except PermissionError:
 
 -         logging.error(f"无权限访问源目录: {src}")
 
 -     except FileNotFoundError:
 
 -         logging.error(f"源目录不存在: {src}")
 
 -     except Exception as e_pool:
 
 -         logging.error(f"处理目录 {src} 时线程池出错: {e_pool}")
 
 -  
 
 - def batch_copy_files(file_pairs):
 
 -     for src, dst in file_pairs:
 
 -         copy_file_with_log(src, dst)
 
 -  
 
 - def backup_usb_drive(drive_letter, app_instance):
 
 -     """执行U盘备份(多线程+增量),并更新GUI状态"""
 
 -     source_drive = f"{drive_letter}:\"
 
 -     timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
 
 -     destination_folder = os.path.join(BACKUP_DESTINATION_BASE, f"Backup_{drive_letter}_{timestamp}")
 
 -  
 
 -     logging.info(f"检测到U盘: {source_drive}")
 
 -     app_instance.update_status(f"检测到U盘: {drive_letter}:\\,准备备份...")
 
 -     logging.info(f"开始备份到: {destination_folder}")
 
 -     app_instance.update_status(f"开始备份 {drive_letter}:\\ 到 {destination_folder}")
 
 -  
 
 -     start_time = time.time()
 
 -     try:
 
 -         threaded_copytree(source_drive, destination_folder, max_workers=16)
 
 -         end_time = time.time()
 
 -         duration = end_time - start_time
 
 -         logging.info(f"成功完成备份: {source_drive} -> {destination_folder} (耗时: {duration:.2f} 秒)")
 
 -         app_instance.update_status(f"备份完成: {drive_letter}:\\ (耗时: {duration:.2f} 秒)")
 
 -     except FileNotFoundError:
 
 -         logging.error(f"错误:源驱动器 {source_drive} 不存在或无法访问。")
 
 -         app_instance.update_status(f"错误: 无法访问 {drive_letter}:\")
 
 -     except PermissionError:
 
 -         logging.error(f"错误:没有权限读取 {source_drive} 或写入 {destination_folder}。请检查权限设置。")
 
 -         app_instance.update_status(f"错误: 权限不足 {drive_letter}:\\ 或目标文件夹")
 
 -     except Exception as e:
 
 -         logging.error(f"备份U盘 {source_drive} 时发生未知错误: {e}")
 
 -         app_instance.update_status(f"错误: 备份 {drive_letter}:\\ 时发生未知错误")
 
 -     finally:
 
 -         # 短暂显示完成/错误状态后,恢复到空闲状态
 
 -         app_instance.after(5000, lambda: app_instance.update_status("空闲,等待U盘插入..."))
 
 -  
 
 - def run_backup_monitor(app_instance):
 
 -     """后台监控线程的主函数"""
 
 -     logging.info("U盘自动备份程序启动...")
 
 -     logging.info(f"备份将存储在: {BACKUP_DESTINATION_BASE}")
 
 -     app_instance.update_status("启动成功,等待U盘插入...")
 
 -  
 
 -     # 检查备份目录是否已成功创建(在 App 初始化时完成)
 
 -     if not os.path.exists(BACKUP_DESTINATION_BASE):
 
 -         logging.error(f"无法启动监控:备份目录 {BACKUP_DESTINATION_BASE} 不存在且无法创建。")
 
 -         app_instance.update_status(f"错误: 备份目录不存在且无法创建")
 
 -         return
 
 -  
 
 -     try:
 
 -         known_drives = get_available_drives()
 
 -         logging.info(f"当前已知驱动器: {sorted(list(known_drives))}")
 
 -     except Exception as e_init_drives:
 
 -         logging.error(f"初始化获取驱动器列表失败: {e_init_drives}")
 
 -         app_instance.update_status(f"错误: 获取驱动器列表失败")
 
 -         known_drives = set()
 
 -  
 
 -     while app_instance.running:
 
 -         try:
 
 -             app_instance.update_status("正在检测驱动器...")
 
 -             current_drives = get_available_drives()
 
 -             new_drives = current_drives - known_drives
 
 -             removed_drives = known_drives - current_drives
 
 -  
 
 -             if new_drives:
 
 -                 logging.info(f"检测到新驱动器: {sorted(list(new_drives))}")
 
 -                 for drive in new_drives:
 
 -                     if not app_instance.running: break # Check flag before potentially long operation
 
 -                     # 稍作等待,确保驱动器已准备好
 
 -                     logging.info(f"等待驱动器 {drive}: 准备就绪...")
 
 -                     app_instance.update_status(f"检测到新驱动器 {drive}:,等待准备就绪...")
 
 -                     time.sleep(3) # 增加等待时间
 
 -                     if not app_instance.running: break
 
 -                     try:
 
 -                         if is_removable_drive(drive):
 
 -                             backup_usb_drive(drive, app_instance)
 
 -                         else:
 
 -                             logging.info(f"驱动器 {drive}: 不是可移动驱动器,跳过备份。")
 
 -                             app_instance.update_status(f"驱动器 {drive}: 非U盘,跳过")
 
 -                             # 短暂显示后恢复空闲
 
 -                             app_instance.after(3000, lambda: app_instance.update_status("空闲,等待U盘插入...") if app_instance.running else None)
 
 -                     except Exception as e_check:
 
 -                          logging.error(f"检查或备份驱动器 {drive}: 时出错: {e_check}")
 
 -                          app_instance.update_status(f"错误: 处理驱动器 {drive}: 时出错")
 
 -                          app_instance.after(5000, lambda: app_instance.update_status("空闲,等待U盘插入...") if app_instance.running else None)
 
 -  
 
 -             if removed_drives:
 
 -                 logging.info(f"检测到驱动器移除: {sorted(list(removed_drives))}")
 
 -                 # Optionally update status for removed drives
 
 -                 # app_instance.update_status(f"驱动器 {','.join(sorted(list(removed_drives)))} 已移除")
 
 -                 # app_instance.after(3000, lambda: app_instance.update_status("空闲,等待U盘插入...") if app_instance.running else None)
 
 -  
 
 -             # 更新已知驱动器列表
 
 -             known_drives = current_drives
 
 -  
 
 -             # 在循环末尾更新状态为空闲(如果没有正在进行的草作)
 
 -             if not new_drives and app_instance.status_label.cget("text").startswith("状态: 正在检测驱动器"):
 
 -                  app_instance.update_status("空闲,等待U盘插入...")
 
 -  
 
 -             # 等待指定间隔,并允许提前退出
 
 -             interval_counter = 0
 
 -             while app_instance.running and interval_counter < CHECK_INTERVAL:
 
 -                 time.sleep(1)
 
 -                 interval_counter += 1
 
 -             if not app_instance.running:
 
 -                 break
 
 -  
 
 -         except Exception as e:
 
 -             logging.error(f"主循环发生错误: {e}")
 
 -             app_instance.update_status(f"错误: {e}")
 
 -             # 防止因临时错误导致程序崩溃,稍等后继续,并允许提前退出
 
 -             error_wait_counter = 0
 
 -             while app_instance.running and error_wait_counter < CHECK_INTERVAL * 2:
 
 -                  time.sleep(1)
 
 -                  error_wait_counter += 1
 
 -             if not app_instance.running:
 
 -                 break
 
 -  
 
 -     logging.info("后台监控线程已停止。")
 
 -     app_instance.update_status("程序已停止")
 
 -  
 
 - if __name__ == "__main__":
 
 -     app = App()
 
 -     app.start_backup_monitor()
 
 -     app.mainloop()
 
  复制代码  
 |   
 
 
 
 |