import tkinter as tk
from tkinter import ttk, scrolledtext, messagebox
import threading
import subprocess
import platform
import os
import sys
from datetime import datetime
import queue
import atexit
import signal
import json
import time
import winreg
class PingMonitor:
def __init__(self, root):
self.root = root
self.root.title("网络连通性监控工具 (守护进程版)")
self.root.geometry("900x650")
# 设置最小窗口大小
self.root.minsize(700, 500)
# 检查是否是开机自启动(通过命令行参数)
self.is_autostart = '--autostart' in sys.argv
# 监控状态
self.is_monitoring = False
self.monitor_thread = None
self.log_queue = queue.Queue()
# 守护进程相关
self.daemon_enabled = False
self.daemon_thread = None
self.config_file = self.get_config_path()
self.state_file = self.get_state_path()
# 统计数据
self.total_pings = 0
self.success_pings = 0
self.failed_pings = 0
self.start_time = None
self.target_ip = None
self.ping_interval = 1
# 自动保存相关
self.auto_save_interval = 30 # 每30秒自动保存一次
self.last_auto_save_time = None
self.auto_save_filepath = None
# 日志目录
self.log_dir = self.get_log_directory()
self.ensure_log_directory()
# 创建界面
self.create_widgets()
# 启动日志更新
self.update_log_display()
# 启动定期自动保存
self.start_auto_save_timer()
# 绑定关闭事件
self.root.protocol("WM_DELETE_WINDOW", self.on_closing)
# 注册退出时的自动保存
atexit.register(self.auto_save_on_exit)
# 注册信号处理(处理强制关闭等情况)
if platform.system().lower() != 'windows':
signal.signal(signal.SIGTERM, self.signal_handler)
signal.signal(signal.SIGINT, self.signal_handler)
# 尝试恢复上次的监控状态
self.restore_monitoring_state()
# 如果是开机自启动且有监控任务,自动最小化
if self.is_autostart and self.daemon_enabled:
self.root.after(2000, self.auto_minimize) # 2秒后自动最小化
# 更新开机自启动状态显示
self.update_autostart_status()
def get_config_path(self):
"""获取配置文件路径"""
if getattr(sys, 'frozen', False):
base_dir = os.path.dirname(sys.executable)
else:
base_dir = os.path.dirname(os.path.abspath(__file__))
return os.path.join(base_dir, "ping_monitor_config.json")
def get_state_path(self):
"""获取状态文件路径"""
if getattr(sys, 'frozen', False):
base_dir = os.path.dirname(sys.executable)
else:
base_dir = os.path.dirname(os.path.abspath(__file__))
return os.path.join(base_dir, "ping_monitor_state.json")
def get_log_directory(self):
"""获取日志目录路径"""
if getattr(sys, 'frozen', False):
base_dir = os.path.dirname(sys.executable)
else:
base_dir = os.path.dirname(os.path.abspath(__file__))
return os.path.join(base_dir, "ping_logs")
def ensure_log_directory(self):
"""确保日志目录存在"""
if not os.path.exists(self.log_dir):
os.makedirs(self.log_dir)
def get_today_log_path(self):
"""获取今天的日志文件路径"""
today = datetime.now().strftime("%Y-%m-%d")
filename = f"ping_log_{today}.txt"
return os.path.join(self.log_dir, filename)
def signal_handler(self, signum, frame):
"""处理系统信号"""
self.save_monitoring_state()
self.auto_save_on_exit()
sys.exit(0)
def create_widgets(self):
# 顶部控制面板
control_frame = ttk.Frame(self.root, padding="10")
control_frame.pack(fill=tk.X)
# 第一行:基本控制
# IP地址输入
ttk.Label(control_frame, text="目标IP:").grid(row=0, column=0, sticky=tk.W, padx=5)
self.ip_entry = ttk.Entry(control_frame, width=20)
self.ip_entry.insert(0, "192.168.60.250")
self.ip_entry.grid(row=0, column=1, padx=5)
# Ping间隔设置
ttk.Label(control_frame, text="间隔(秒):").grid(row=0, column=2, sticky=tk.W, padx=5)
self.interval_spinbox = ttk.Spinbox(control_frame, from_=1, to=60, width=10)
self.interval_spinbox.set(1)
self.interval_spinbox.grid(row=0, column=3, padx=5)
# 开始/停止按钮
self.start_button = ttk.Button(control_frame, text="开始监控", command=self.toggle_monitoring)
self.start_button.grid(row=0, column=4, padx=5)
# 清空日志按钮
self.clear_button = ttk.Button(control_frame, text="清空日志", command=self.clear_log)
self.clear_button.grid(row=0, column=5, padx=5)
# 保存日志按钮
self.save_button = ttk.Button(control_frame, text="手动保存", command=self.manual_save_log)
self.save_button.grid(row=0, column=6, padx=5)
# 最小化按钮
self.minimize_button = ttk.Button(control_frame, text="最小化", command=self.minimize_window)
self.minimize_button.grid(row=0, column=7, padx=5)
# 第二行:守护进程控制
daemon_frame = ttk.Frame(control_frame)
daemon_frame.grid(row=1, column=0, columnspan=8, pady=10, sticky=tk.W)
# 守护进程开关
self.daemon_var = tk.BooleanVar(value=False)
self.daemon_checkbox = ttk.Checkbutton(
daemon_frame,
text="启用守护进程 (自动恢复监控)",
variable=self.daemon_var,
command=self.toggle_daemon
)
self.daemon_checkbox.pack(side=tk.LEFT, padx=5)
# 守护进程状态标签
self.daemon_status_label = ttk.Label(daemon_frame, text="守护: 未启用", foreground="gray")
self.daemon_status_label.pack(side=tk.LEFT, padx=20)
# 打开日志文件夹按钮
self.open_log_folder_button = ttk.Button(daemon_frame, text="打开日志文件夹", command=self.open_log_folder)
self.open_log_folder_button.pack(side=tk.LEFT, padx=5)
# 第三行:开机自启动控制
autostart_frame = ttk.Frame(control_frame)
autostart_frame.grid(row=2, column=0, columnspan=8, pady=5, sticky=tk.W)
# 开机自启动开关
self.autostart_var = tk.BooleanVar(value=False)
self.autostart_checkbox = ttk.Checkbutton(
autostart_frame,
text="开机自动启动程序",
variable=self.autostart_var,
command=self.toggle_autostart
)
self.autostart_checkbox.pack(side=tk.LEFT, padx=5)
# 开机自启动状态标签
self.autostart_status_label = ttk.Label(autostart_frame, text="开机启动: 未启用", foreground="gray")
self.autostart_status_label.pack(side=tk.LEFT, padx=20)
# 提示标签
tip_label = ttk.Label(autostart_frame, text="💡 提示: 同时启用「守护进程」+「开机启动」可实现重启后自动恢复监控", foreground="blue")
tip_label.pack(side=tk.LEFT, padx=10)
# 统计信息面板
stats_frame = ttk.LabelFrame(self.root, text="统计信息", padding="10")
stats_frame.pack(fill=tk.X, padx=10, pady=5)
# 统计标签
self.total_label = ttk.Label(stats_frame, text="总计: 0")
self.total_label.grid(row=0, column=0, padx=15)
self.success_label = ttk.Label(stats_frame, text="成功: 0", foreground="green")
self.success_label.grid(row=0, column=1, padx=15)
self.failed_label = ttk.Label(stats_frame, text="失败: 0", foreground="red")
self.failed_label.grid(row=0, column=2, padx=15)
self.success_rate_label = ttk.Label(stats_frame, text="成功率: 0%")
self.success_rate_label.grid(row=0, column=3, padx=15)
self.status_label = ttk.Label(stats_frame, text="状态: 未开始", foreground="blue")
self.status_label.grid(row=0, column=4, padx=15)
self.auto_save_label = ttk.Label(stats_frame, text="自动保存: 就绪", foreground="gray")
self.auto_save_label.grid(row=0, column=5, padx=15)
# 日志显示区域
log_frame = ttk.LabelFrame(self.root, text="监控日志 (按日期自动分文件保存)", padding="10")
log_frame.pack(fill=tk.BOTH, expand=True, padx=10, pady=5)
# 创建带滚动条的文本框
self.log_text = scrolledtext.ScrolledText(log_frame, wrap=tk.WORD, height=20)
self.log_text.pack(fill=tk.BOTH, expand=True)
# 配置文本标签颜色
self.log_text.tag_config("success", foreground="green")
self.log_text.tag_config("failed", foreground="red")
self.log_text.tag_config("info", foreground="blue")
self.log_text.tag_config("timestamp", foreground="gray")
self.log_text.tag_config("daemon", foreground="purple")
# ==================== 开机自启动相关功能 ====================
def get_program_path(self):
"""获取程序完整路径"""
if getattr(sys, 'frozen', False):
# 打包后的exe文件
return sys.executable
else:
# Python脚本
return os.path.abspath(__file__)
def check_autostart_enabled(self):
"""检查是否已启用开机自启动"""
if platform.system().lower() != 'windows':
return False
try:
key = winreg.OpenKey(
winreg.HKEY_CURRENT_USER,
r"Software\Microsoft\Windows\CurrentVersion\Run",
0,
winreg.KEY_READ
)
try:
value, _ = winreg.QueryValueEx(key, "PingMonitor")
winreg.CloseKey(key)
# 检查路径是否匹配当前程序(兼容带参数和不带参数的情况)
program_path = self.get_program_path()
return f'"{program_path}"' in value
except FileNotFoundError:
winreg.CloseKey(key)
return False
except Exception as e:
print(f"检查自启动状态失败: {e}")
return False
def enable_autostart(self):
"""启用开机自启动"""
if platform.system().lower() != 'windows':
messagebox.showerror("错误", "开机自启动功能仅支持Windows系统!")
return False
try:
key = winreg.OpenKey(
winreg.HKEY_CURRENT_USER,
r"Software\Microsoft\Windows\CurrentVersion\Run",
0,
winreg.KEY_WRITE
)
program_path = self.get_program_path()
# 添加 --autostart 参数,用于标识是开机自启动
startup_command = f'"{program_path}" --autostart'
winreg.SetValueEx(key, "PingMonitor", 0, winreg.REG_SZ, startup_command)
winreg.CloseKey(key)
self.log_message("✅ 已启用开机自启动(启动后将自动最小化)", "info")
return True
except Exception as e:
messagebox.showerror("错误", f"启用开机自启动失败:\n{str(e)}\n\n可能需要管理员权限。")
return False
def disable_autostart(self):
"""禁用开机自启动"""
if platform.system().lower() != 'windows':
return False
try:
key = winreg.OpenKey(
winreg.HKEY_CURRENT_USER,
r"Software\Microsoft\Windows\CurrentVersion\Run",
0,
winreg.KEY_WRITE
)
try:
winreg.DeleteValue(key, "PingMonitor")
winreg.CloseKey(key)
self.log_message("❌ 已禁用开机自启动", "info")
return True
except FileNotFoundError:
winreg.CloseKey(key)
return True
except Exception as e:
messagebox.showerror("错误", f"禁用开机自启动失败:\n{str(e)}")
return False
def toggle_autostart(self):
"""切换开机自启动状态"""
if self.autostart_var.get():
# 启用自启动
if self.enable_autostart():
self.autostart_status_label.config(text="开机启动: 已启用", foreground="green")
# 如果守护进程也启用了,给出提示
if self.daemon_var.get():
messagebox.showinfo(
"配置成功",
"✅ 已启用开机自启动\n"
"✅ 守护进程已启用\n\n"
"现在电脑重启后:\n"
"• 程序会自动启动并最小化到后台\n"
"• 自动恢复上次的监控任务\n"
"• 静默运行,不打扰您的工作!"
)
else:
messagebox.showinfo(
"提示",
"✅ 已启用开机自启动\n\n"
"建议同时启用「守护进程」功能,\n"
"这样重启后不仅会启动程序,还会自动恢复监控任务。"
)
else:
self.autostart_var.set(False)
else:
# 禁用自启动
if self.disable_autostart():
self.autostart_status_label.config(text="开机启动: 未启用", foreground="gray")
def update_autostart_status(self):
"""更新开机自启动状态显示"""
if self.check_autostart_enabled():
self.autostart_var.set(True)
self.autostart_status_label.config(text="开机启动: 已启用", foreground="green")
else:
self.autostart_var.set(False)
self.autostart_status_label.config(text="开机启动: 未启用", foreground="gray")
# ==================== 守护进程相关功能 ====================
def toggle_daemon(self):
"""切换守护进程状态"""
self.daemon_enabled = self.daemon_var.get()
if self.daemon_enabled:
self.start_daemon()
self.daemon_status_label.config(text="守护: 已启用", foreground="green")
self.log_message("守护进程已启用 - 将在重启后自动恢复监控", "daemon")
# 如果开机自启动也启用了,给出提示
if self.autostart_var.get():
self.log_message("✨ 守护进程 + 开机启动 已全部启用,重启后将自动恢复监控", "daemon")
else:
self.stop_daemon()
self.daemon_status_label.config(text="守护: 未启用", foreground="gray")
self.log_message("守护进程已禁用", "daemon")
def start_daemon(self):
"""启动守护进程"""
if not self.daemon_thread or not self.daemon_thread.is_alive():
self.daemon_thread = threading.Thread(target=self.daemon_loop, daemon=True)
self.daemon_thread.start()
self.save_config()
def stop_daemon(self):
"""停止守护进程"""
self.daemon_enabled = False
self.clear_saved_state()
self.save_config()
def daemon_loop(self):
"""守护进程循环 - 定期保存状态"""
while self.daemon_enabled:
if self.is_monitoring:
self.save_monitoring_state()
time.sleep(10) # 每10秒保存一次状态
def save_monitoring_state(self):
"""保存监控状态到文件"""
if not self.is_monitoring:
return
state = {
'target_ip': self.target_ip,
'interval': self.ping_interval,
'total_pings': self.total_pings,
'success_pings': self.success_pings,
'failed_pings': self.failed_pings,
'start_time': self.start_time.isoformat() if self.start_time else None,
'daemon_enabled': self.daemon_enabled,
'last_update': datetime.now().isoformat()
}
try:
with open(self.state_file, 'w', encoding='utf-8') as f:
json.dump(state, f, indent=2)
except Exception as e:
print(f"保存状态失败: {e}")
def restore_monitoring_state(self):
"""恢复监控状态"""
try:
if not os.path.exists(self.state_file):
# 尝试加载配置
self.load_config()
return
with open(self.state_file, 'r', encoding='utf-8') as f:
state = json.load(f)
# 检查是否启用了守护进程
if state.get('daemon_enabled', False):
self.daemon_var.set(True)
self.daemon_enabled = True
self.start_daemon()
self.daemon_status_label.config(text="守护: 已启用", foreground="green")
# 恢复监控参数
target_ip = state.get('target_ip')
interval = state.get('interval', 1)
if target_ip:
self.ip_entry.delete(0, tk.END)
self.ip_entry.insert(0, target_ip)
self.interval_spinbox.set(interval)
# 恢复统计数据
self.total_pings = state.get('total_pings', 0)
self.success_pings = state.get('success_pings', 0)
self.failed_pings = state.get('failed_pings', 0)
start_time_str = state.get('start_time')
if start_time_str:
self.start_time = datetime.fromisoformat(start_time_str)
self.update_stats()
# 自动重启监控
self.log_message("🔄 检测到守护进程 - 正在恢复监控...", "daemon")
self.root.after(1000, lambda: self.start_monitoring(restore=True))
except Exception as e:
print(f"恢复状态失败: {e}")
self.load_config()
def clear_saved_state(self):
"""清除保存的状态"""
try:
if os.path.exists(self.state_file):
os.remove(self.state_file)
except Exception as e:
print(f"清除状态文件失败: {e}")
def save_config(self):
"""保存配置"""
config = {
'daemon_enabled': self.daemon_enabled,
'last_target_ip': self.ip_entry.get().strip(),
'last_interval': self.interval_spinbox.get()
}
try:
with open(self.config_file, 'w', encoding='utf-8') as f:
json.dump(config, f, indent=2)
except Exception as e:
print(f"保存配置失败: {e}")
def load_config(self):
"""加载配置"""
try:
if os.path.exists(self.config_file):
with open(self.config_file, 'r', encoding='utf-8') as f:
config = json.load(f)
if config.get('daemon_enabled', False):
self.daemon_var.set(True)
self.daemon_enabled = True
self.daemon_status_label.config(text="守护: 已启用", foreground="green")
last_ip = config.get('last_target_ip')
if last_ip:
self.ip_entry.delete(0, tk.END)
self.ip_entry.insert(0, last_ip)
last_interval = config.get('last_interval')
if last_interval:
self.interval_spinbox.set(last_interval)
except Exception as e:
print(f"加载配置失败: {e}")
# ==================== 监控相关功能 ====================
def toggle_monitoring(self):
if not self.is_monitoring:
self.start_monitoring()
else:
self.stop_monitoring()
def start_monitoring(self, restore=False):
target_ip = self.ip_entry.get().strip()
if not target_ip:
messagebox.showwarning("警告", "请输入目标IP地址!")
return
try:
interval = int(self.interval_spinbox.get())
if interval < 1:
raise ValueError
except ValueError:
messagebox.showwarning("警告", "请输入有效的时间间隔(大于0的整数)!")
return
self.is_monitoring = True
self.start_button.config(text="停止监控")
self.ip_entry.config(state='disabled')
self.interval_spinbox.config(state='disabled')
self.daemon_checkbox.config(state='disabled')
self.autostart_checkbox.config(state='disabled')
# 如果不是恢复模式,重置统计
if not restore:
self.total_pings = 0
self.success_pings = 0
self.failed_pings = 0
self.start_time = datetime.now()
self.target_ip = target_ip
self.ping_interval = interval
self.last_auto_save_time = datetime.now()
self.update_stats()
# 设置自动保存文件路径(按日期)
self.auto_save_filepath = self.get_today_log_path()
# 启动监控线程
self.monitor_thread = threading.Thread(target=self.monitor_loop, args=(target_ip, interval), daemon=True)
self.monitor_thread.start()
if restore:
self.log_message(f"✅ 监控已恢复 - 目标: {target_ip}, 间隔: {interval}秒", "daemon")
else:
self.log_message(f"🚀 开始监控 {target_ip},间隔 {interval} 秒", "info")
self.log_message(f"📁 日志文件: {self.auto_save_filepath}", "info")
# 保存配置
self.save_config()
def stop_monitoring(self):
self.is_monitoring = False
self.start_button.config(text="开始监控")
self.ip_entry.config(state='normal')
self.interval_spinbox.config(state='normal')
self.daemon_checkbox.config(state='normal')
self.autostart_checkbox.config(state='normal')
self.log_message("⏹️ 监控已停止", "info")
# 停止时执行最后一次自动保存
if self.total_pings > 0:
self.perform_auto_save()
# 如果没有启用守护进程,清除状态
if not self.daemon_enabled:
self.clear_saved_state()
def monitor_loop(self, target_ip, interval):
while self.is_monitoring:
try:
# 执行ping命令
result = self.ping_host(target_ip)
self.total_pings += 1
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
if result['success']:
self.success_pings += 1
message = f"[{timestamp}] ✅ 成功 - {target_ip} - 时间={result['time']}ms TTL={result['ttl']}"
tag = "success"
else:
self.failed_pings += 1
message = f"[{timestamp}] ❌ 失败 - {target_ip} - {result['error']}"
tag = "failed"
self.log_queue.put((message, tag))
# 等待指定间隔
for _ in range(interval * 10):
if not self.is_monitoring:
break
threading.Event().wait(0.1)
except Exception as e:
self.log_queue.put((f"⚠️ 错误: {str(e)}", "failed"))
def ping_host(self, host):
"""执行ping命令(无窗口闪烁)"""
try:
# 根据操作系统选择ping命令参数
param = '-n' if platform.system().lower() == 'windows' else '-c'
# 执行ping命令
command = ['ping', param, '1', host]
# Windows下隐藏控制台窗口,防止闪烁
if platform.system().lower() == 'windows':
startupinfo = subprocess.STARTUPINFO()
startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
startupinfo.wShowWindow = subprocess.SW_HIDE
result = subprocess.run(
command,
capture_output=True,
text=True,
timeout=5,
encoding='gbk',
startupinfo=startupinfo,
creationflags=subprocess.CREATE_NO_WINDOW
)
else:
result = subprocess.run(
command,
capture_output=True,
text=True,
timeout=5,
encoding='utf-8'
)
output = result.stdout
if result.returncode == 0:
time_ms = None
ttl = None
if platform.system().lower() == 'windows':
for line in output.split('\n'):
if '时间' in line or 'time' in line.lower():
if '时间=' in line:
time_ms = line.split('时间=')[1].split('ms')[0].strip()
elif 'time=' in line.lower():
time_ms = line.lower().split('time=')[1].split('ms')[0].strip()
if 'TTL=' in line:
ttl = line.split('TTL=')[1].split()[0].strip()
elif 'ttl=' in line.lower():
ttl = line.lower().split('ttl=')[1].split()[0].strip()
else:
for line in output.split('\n'):
if 'time=' in line.lower():
time_ms = line.lower().split('time=')[1].split()[0].strip()
if 'ttl=' in line.lower():
ttl = line.lower().split('ttl=')[1].split()[0].strip()
return {
'success': True,
'time': time_ms or 'N/A',
'ttl': ttl or 'N/A'
}
else:
return {
'success': False,
'error': '请求超时或主机不可达'
}
except subprocess.TimeoutExpired:
return {
'success': False,
'error': '请求超时'
}
except Exception as e:
return {
'success': False,
'error': str(e)
}
# ==================== 日志和界面相关功能 ====================
def log_message(self, message, tag="info"):
"""添加日志消息到队列"""
self.log_queue.put((message, tag))
def update_log_display(self):
"""更新日志显示"""
try:
while True:
message, tag = self.log_queue.get_nowait()
self.log_text.insert(tk.END, message + "\n", tag)
self.log_text.see(tk.END)
self.update_stats()
except queue.Empty:
pass
# 每100ms更新一次
self.root.after(100, self.update_log_display)
def update_stats(self):
"""更新统计信息"""
self.total_label.config(text=f"总计: {self.total_pings}")
self.success_label.config(text=f"成功: {self.success_pings}")
self.failed_label.config(text=f"失败: {self.failed_pings}")
if self.total_pings > 0:
success_rate = (self.success_pings / self.total_pings) * 100
self.success_rate_label.config(text=f"成功率: {success_rate:.1f}%")
if self.is_monitoring:
status = "正在监控" if self.failed_pings == 0 else "监控中(有失败)"
color = "green" if self.failed_pings == 0 else "orange"
else:
status = "未开始" if self.total_pings == 0 else "已停止"
color = "blue"
self.status_label.config(text=f"状态: {status}", foreground=color)
def clear_log(self):
"""清空日志"""
if messagebox.askyesno("确认", "确定要清空日志吗?"):
self.log_text.delete(1.0, tk.END)
def get_statistics_summary(self):
"""获取统计信息摘要"""
success_rate = (self.success_pings / self.total_pings * 100) if self.total_pings > 0 else 0
failure_rate = (self.failed_pings / self.total_pings * 100) if self.total_pings > 0 else 0
duration = ""
if self.start_time:
elapsed = datetime.now() - self.start_time
hours = elapsed.seconds // 3600
minutes = (elapsed.seconds % 3600) // 60
seconds = elapsed.seconds % 60
duration = f"{hours}小时{minutes}分{seconds}秒"
summary = f"""
{'=' * 60}
统计信息摘要
{'=' * 60}
监控目标: {self.target_ip or 'N/A'}
开始时间: {self.start_time.strftime('%Y-%m-%d %H:%M:%S') if self.start_time else 'N/A'}
当前时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
监控时长: {duration}
守护进程: {'已启用' if self.daemon_enabled else '未启用'}
开机启动: {'已启用' if self.autostart_var.get() else '未启用'}
总 Ping 数: {self.total_pings}
成功次数: {self.success_pings}
失败次数: {self.failed_pings}
成功率: {success_rate:.2f}%
失败率: {failure_rate:.2f}%
{'=' * 60}
"""
return summary
def start_auto_save_timer(self):
"""启动定期自动保存定时器"""
if self.is_monitoring and self.total_pings > 0:
if self.last_auto_save_time:
elapsed = (datetime.now() - self.last_auto_save_time).total_seconds()
if elapsed >= self.auto_save_interval:
self.perform_auto_save()
# 每5秒检查一次
self.root.after(5000, self.start_auto_save_timer)
def perform_auto_save(self):
"""执行自动保存"""
try:
if self.total_pings == 0:
return
# 检查日期是否变更,如果变更则更新文件路径
new_log_path = self.get_today_log_path()
if new_log_path != self.auto_save_filepath:
self.auto_save_filepath = new_log_path
self.log_message(f"📅 日期变更,新日志文件: {self.auto_save_filepath}", "info")
log_content = self.log_text.get(1.0, tk.END)
# 保存日志
with open(self.auto_save_filepath, 'w', encoding='utf-8') as f:
f.write(f"网络连通性监控日志 - 自动保存\n")
f.write(f"最后更新: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
f.write(f"\n")
f.write(self.get_statistics_summary())
f.write(f"\n")
f.write(f"{'=' * 60}\n")
f.write(f"详细日志记录\n")
f.write(f"{'=' * 60}\n\n")
f.write(log_content)
self.last_auto_save_time = datetime.now()
save_time = datetime.now().strftime("%H:%M:%S")
self.auto_save_label.config(
text=f"自动保存: {save_time}",
foreground="green"
)
self.root.after(3000, lambda: self.auto_save_label.config(foreground="gray"))
except Exception as e:
print(f"自动保存失败: {str(e)}")
self.auto_save_label.config(text=f"自动保存: 失败", foreground="red")
def manual_save_log(self):
"""手动保存日志(另存为带时间戳的文件)"""
try:
log_content = self.log_text.get(1.0, tk.END)
if not log_content.strip() or self.total_pings == 0:
messagebox.showinfo("提示", "日志为空,无需保存!")
return
# 生成带时间戳的文件名
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
filename = f"ping_log_manual_{timestamp}.txt"
filepath = os.path.join(self.log_dir, filename)
with open(filepath, 'w', encoding='utf-8') as f:
f.write(f"网络连通性监控日志 - 手动保存\n")
f.write(f"保存时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
f.write(f"\n")
f.write(self.get_statistics_summary())
f.write(f"\n")
f.write(f"{'=' * 60}\n")
f.write(f"详细日志记录\n")
f.write(f"{'=' * 60}\n\n")
f.write(log_content)
messagebox.showinfo("成功", f"日志已手动保存到:\n{filepath}")
except Exception as e:
messagebox.showerror("错误", f"保存日志失败:\n{str(e)}")
def auto_save_on_exit(self):
"""程序退出时自动保存日志"""
try:
if self.total_pings > 0 and self.auto_save_filepath:
self.perform_auto_save()
print(f"日志已自动保存到: {self.auto_save_filepath}")
# 保存监控状态
if self.daemon_enabled and self.is_monitoring:
self.save_monitoring_state()
print("监控状态已保存")
except Exception as e:
print(f"自动保存失败: {str(e)}")
def open_log_folder(self):
"""打开日志文件夹"""
try:
if platform.system().lower() == 'windows':
os.startfile(self.log_dir)
elif platform.system().lower() == 'darwin': # macOS
subprocess.Popen(['open', self.log_dir])
else: # Linux
subprocess.Popen(['xdg-open', self.log_dir])
except Exception as e:
messagebox.showerror("错误", f"无法打开文件夹:\n{str(e)}")
def minimize_window(self):
"""最小化窗口"""
self.root.iconify()
def auto_minimize(self):
"""自动最小化窗口(用于开机启动)"""
if self.is_monitoring:
self.log_message("🔽 开机自启动 - 自动最小化到后台", "info")
self.minimize_window()
def on_closing(self):
"""关闭窗口时的处理"""
if self.is_monitoring:
if self.daemon_enabled:
# 如果启用了守护进程,提示用户
result = messagebox.askyesnocancel(
"确认",
"守护进程已启用,监控将继续运行。\n\n"
"是:最小化到后台继续监控\n"
"否:停止监控并退出\n"
"取消:返回"
)
if result is None: # 取消
return
elif result: # 是 - 最小化
self.minimize_window()
return
else: # 否 - 停止并退出
self.is_monitoring = False
self.daemon_enabled = False
self.daemon_var.set(False)
self.save_monitoring_state()
self.auto_save_on_exit()
self.root.destroy()
else:
if messagebox.askokcancel("确认", "监控正在进行中,确定要退出吗?\n日志将自动保存。"):
self.is_monitoring = False
self.auto_save_on_exit()
self.root.destroy()
else:
if self.total_pings > 0:
self.auto_save_on_exit()
self.root.destroy()
def main():
root = tk.Tk()
app = PingMonitor(root)
root.mainloop()
if __name__ == "__main__":
main()