当前位置: 首页 > news >正文

anyrouter CDN 测速工具 v2.1 - 测试多个服务地址的连接速度

#!/usr/bin/env python3 """ CDN 测速工具 v2.1 - 测试多个服务地址的连接速度 支持: HTTP延迟、TCP Ping、Traceroute、下载速度测试、JSON导出 """ import urllib.request import urllib.error import time import ssl import socket import statistics import subprocess import struct import sys import json import os from datetime import datetime from urllib.parse import urlparse from concurrent.futures import ThreadPoolExecutor, as_completed # 测试地址列表 ENDPOINTS = [ { "name": "大陆优化CDN", "url": "https://c.cspok.cn", "desc": "针对中国大陆地区优化的后端服务" }, { "name": "主站直连", "url": "https://anyrouter.top", "desc": "主站后端直连服务" }, { "name": "大陆网络优化-宁波", "url": "https://pmpjfbhq.cn-nb1.rainapp.top", "desc": "针对中国大陆地区优化的后端服务" }, { "name": "大陆网络优化-上海", "url": "https://a-ocnfniawgw.cn-shanghai.fcapp.run", "desc": "针对中国大陆地区优化的后端服务" } ] # 测试参数 TEST_COUNT = 3 TCP_PING_COUNT = 5 TIMEOUT = 10 TRACEROUTE_MAX_HOPS = 20 def create_ssl_context(): """创建SSL上下文""" ctx = ssl.create_default_context() ctx.check_hostname = False ctx.verify_mode = ssl.CERT_NONE return ctx def parse_url(url): """解析URL获取主机名和端口""" parsed = urlparse(url) host = parsed.hostname port = parsed.port or (443 if parsed.scheme == 'https' else 80) return host, port def resolve_host(hostname): """DNS解析""" try: ip = socket.gethostbyname(hostname) return {"success": True, "ip": ip} except socket.gaierror as e: return {"success": False, "error": str(e)} def tcp_ping(host, port, timeout=5): """TCP Ping - 测试TCP连接延迟""" try: sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.settimeout(timeout) start = time.time() sock.connect((host, port)) elapsed = (time.time() - start) * 1000 # ms sock.close() return {"success": True, "latency": elapsed} except socket.timeout: return {"success": False, "error": "连接超时"} except socket.error as e: return {"success": False, "error": str(e)} def tcp_ping_test(host, port, count=TCP_PING_COUNT): """执行多次TCP Ping测试""" results = [] for i in range(count): result = tcp_ping(host, port) results.append(result) time.sleep(0.1) # 短暂间隔 return results def traceroute_tcp(host, port=443, max_hops=TRACEROUTE_MAX_HOPS, timeout=2): """ TCP Traceroute 实现 使用递增的TTL值来追踪路由路径 """ results = [] # 先解析目标IP try: dest_ip = socket.gethostbyname(host) except socket.gaierror: return [{"hop": 1, "error": "DNS解析失败"}] print(f" 追踪路由到 {host} ({dest_ip}), 最大 {max_hops} 跳:") print(f" {'跳数':<4} {'IP地址':<18} {'延迟':<12} {'主机名'}") print(" " + "-" * 55) for ttl in range(1, max_hops + 1): # 创建原始socket(需要root权限)或使用UDP hop_result = {"hop": ttl, "ip": None, "latency": None, "hostname": None} try: # 使用UDP进行traceroute(不需要root) recv_socket = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_ICMP) recv_socket.settimeout(timeout) send_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP) send_socket.setsockopt(socket.SOL_IP, socket.IP_TTL, ttl) start = time.time() send_socket.sendto(b'', (dest_ip, 33434 + ttl)) try: data, addr = recv_socket.recvfrom(1024) elapsed = (time.time() - start) * 1000 hop_ip = addr[0] # 尝试反向DNS解析 try: hostname = socket.gethostbyaddr(hop_ip)[0] except: hostname = hop_ip hop_result["ip"] = hop_ip hop_result["latency"] = elapsed hop_result["hostname"] = hostname print(f" {ttl:<4} {hop_ip:<18} {elapsed:.2f} ms {hostname}") # 如果到达目标 if hop_ip == dest_ip: results.append(hop_result) break except socket.timeout: hop_result["error"] = "超时" print(f" {ttl:<4} {'*':<18} {'*':<12} 请求超时") recv_socket.close() send_socket.close() except PermissionError: # 没有root权限,使用系统traceroute命令 return traceroute_system(host, max_hops) except Exception as e: hop_result["error"] = str(e) # 尝试使用系统命令 return traceroute_system(host, max_hops) results.append(hop_result) return results def traceroute_system(host, max_hops=TRACEROUTE_MAX_HOPS): """使用系统traceroute命令""" results = [] # 检测可用的traceroute命令 traceroute_cmd = None for cmd in ['traceroute', 'tracepath', 'mtr']: try: subprocess.run([cmd, '--version'], capture_output=True, timeout=2) traceroute_cmd = cmd break except: continue if not traceroute_cmd: print(" 未找到traceroute工具,尝试安装...") return [{"hop": 0, "error": "未找到traceroute工具"}] try: if traceroute_cmd == 'traceroute': cmd = ['traceroute', '-n', '-m', str(max_hops), '-w', '2', host] elif traceroute_cmd == 'tracepath': cmd = ['tracepath', '-n', '-m', str(max_hops), host] else: # mtr cmd = ['mtr', '-n', '-c', '1', '--report', host] print(f" 执行: {' '.join(cmd)}") print() process = subprocess.Popen( cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True ) # 实时输出 for line in process.stdout: print(f" {line.rstrip()}") # 解析输出 parts = line.split() if parts and parts[0].isdigit(): hop_num = int(parts[0]) results.append({"hop": hop_num, "raw": line.strip()}) process.wait(timeout=60) except subprocess.TimeoutExpired: print(" traceroute 超时") except Exception as e: print(f" traceroute 错误: {e}") return results def traceroute_simple(host, max_hops=TRACEROUTE_MAX_HOPS): """ 简化版TCP traceroute,使用connect超时方式 """ results = [] try: dest_ip = socket.gethostbyname(host) except socket.gaierror: return [{"hop": 1, "error": "DNS解析失败"}] print(f" 追踪到 {host} ({dest_ip}):") print(f" {'跳数':<4} {'状态':<15} {'延迟'}") print(" " + "-" * 40) # 尝试直接连接测试 for ttl in range(1, max_hops + 1): try: sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.setsockopt(socket.IPPROTO_IP, socket.IP_TTL, ttl) sock.settimeout(2) start = time.time() try: sock.connect((dest_ip, 443)) elapsed = (time.time() - start) * 1000 print(f" {ttl:<4} {'到达目标':<15} {elapsed:.2f} ms") results.append({"hop": ttl, "status": "到达", "latency": elapsed}) sock.close() break except socket.timeout: print(f" {ttl:<4} {'超时':<15} *") results.append({"hop": ttl, "status": "超时"}) except socket.error as e: elapsed = (time.time() - start) * 1000 if e.errno == 111: # Connection refused - 通常表示到达 print(f" {ttl:<4} {'中间节点':<15} {elapsed:.2f} ms") results.append({"hop": ttl, "status": "中间节点", "latency": elapsed}) else: print(f" {ttl:<4} {'TTL过期':<15} {elapsed:.2f} ms") results.append({"hop": ttl, "status": "TTL过期", "latency": elapsed}) sock.close() except Exception as e: print(f" {ttl:<4} 错误: {e}") results.append({"hop": ttl, "error": str(e)}) return results def test_latency(url, timeout=TIMEOUT): """测试HTTP延迟""" ctx = create_ssl_context() start = time.time() try: req = urllib.request.Request(url, method='HEAD') req.add_header('User-Agent', 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36') with urllib.request.urlopen(req, timeout=timeout, context=ctx) as response: status = response.status elapsed = (time.time() - start) * 1000 return {"success": True, "latency": elapsed, "status": status} except urllib.error.HTTPError as e: elapsed = (time.time() - start) * 1000 return {"success": True, "latency": elapsed, "status": e.code} except Exception as e: return {"success": False, "error": str(e)} def test_download_speed(url, timeout=TIMEOUT): """测试下载速度""" ctx = create_ssl_context() start = time.time() try: req = urllib.request.Request(url) req.add_header('User-Agent', 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36') with urllib.request.urlopen(req, timeout=timeout, context=ctx) as response: data = response.read() size = len(data) elapsed = time.time() - start if elapsed > 0: speed = size / elapsed return {"success": True, "size": size, "time": elapsed, "speed": speed} return {"success": True, "size": size, "time": 0, "speed": 0} except Exception as e: return {"success": False, "error": str(e)} def format_speed(bps): """格式化速度""" if bps >= 1024 * 1024: return f"{bps / (1024 * 1024):.2f} MB/s" elif bps >= 1024: return f"{bps / 1024:.2f} KB/s" else: return f"{bps:.2f} B/s" def format_size(bytes_size): """格式化大小""" if bytes_size >= 1024 * 1024: return f"{bytes_size / (1024 * 1024):.2f} MB" elif bytes_size >= 1024: return f"{bytes_size / 1024:.2f} KB" else: return f"{bytes_size} B" def test_endpoint(endpoint, enable_traceroute=True): """测试单个端点""" name = endpoint["name"] url = endpoint["url"] desc = endpoint["desc"] host, port = parse_url(url) print(f"\n{'='*65}") print(f"📡 {name}") print(f" URL: {url}") print(f" 描述: {desc}") print(f"{'='*65}") # 结果数据结构 result_data = { "name": name, "url": url, "description": desc, "host": host, "port": port, "test_time": datetime.now().isoformat(), "dns": {}, "tcp_ping": {}, "traceroute": [], "http": {}, "download": {}, "summary": {} } # DNS 解析 print(f"\n🔍 DNS解析:") dns_result = resolve_host(host) if dns_result["success"]: ip = dns_result["ip"] print(f" {host} -> {ip}") result_data["dns"] = {"success": True, "ip": ip, "hostname": host} else: print(f" ❌ DNS解析失败: {dns_result['error']}") result_data["dns"] = {"success": False, "error": dns_result['error']} result_data["summary"] = { "avg_latency": None, "tcp_latency": None, "speed": None, "status": "DNS解析失败" } return result_data # TCP Ping 测试 print(f"\n🏓 TCP Ping 测试 (端口 {port}, 共{TCP_PING_COUNT}次):") tcp_results = tcp_ping_test(ip, port, TCP_PING_COUNT) tcp_latencies = [] tcp_details = [] for i, result in enumerate(tcp_results, 1): if result["success"]: lat = result["latency"] tcp_latencies.append(lat) tcp_details.append({"seq": i, "latency_ms": round(lat, 2), "success": True}) print(f" 第{i}次: {lat:.2f} ms") else: tcp_details.append({"seq": i, "success": False, "error": result['error']}) print(f" 第{i}次: 失败 - {result['error']}") result_data["tcp_ping"]["details"] = tcp_details if tcp_latencies: tcp_avg = statistics.mean(tcp_latencies) tcp_min = min(tcp_latencies) tcp_max = max(tcp_latencies) tcp_jitter = statistics.stdev(tcp_latencies) if len(tcp_latencies) > 1 else 0 tcp_loss = (TCP_PING_COUNT - len(tcp_latencies)) / TCP_PING_COUNT * 100 print(f"\n 📊 TCP Ping 统计:") print(f" 平均: {tcp_avg:.2f} ms") print(f" 最小: {tcp_min:.2f} ms") print(f" 最大: {tcp_max:.2f} ms") print(f" 抖动: {tcp_jitter:.2f} ms") print(f" 丢包: {tcp_loss:.1f}%") result_data["tcp_ping"]["statistics"] = { "avg_ms": round(tcp_avg, 2), "min_ms": round(tcp_min, 2), "max_ms": round(tcp_max, 2), "jitter_ms": round(tcp_jitter, 2), "packet_loss_percent": round(tcp_loss, 1), "success_count": len(tcp_latencies), "total_count": TCP_PING_COUNT } else: tcp_avg = None print(f"\n ❌ TCP Ping 全部失败") result_data["tcp_ping"]["statistics"] = {"success": False, "packet_loss_percent": 100} # Traceroute 测试 if enable_traceroute: print(f"\n🛤️ Traceroute 测试:") trace_results = traceroute_simple_with_data(ip, max_hops=15) result_data["traceroute"] = trace_results # HTTP 延迟测试 print(f"\n⏱️ HTTP 延迟测试 (共{TEST_COUNT}次):") latencies = [] http_details = [] http_status = None for i in range(TEST_COUNT): result = test_latency(url) if result["success"]: latency = result["latency"] latencies.append(latency) http_status = result.get("status", "N/A") http_details.append({"seq": i+1, "latency_ms": round(latency, 2), "status": http_status, "success": True}) print(f" 第{i+1}次: {latency:.2f} ms (HTTP {http_status})") else: http_details.append({"seq": i+1, "success": False, "error": result['error']}) print(f" 第{i+1}次: 失败 - {result['error']}") result_data["http"]["details"] = http_details result_data["http"]["status_code"] = http_status if latencies: avg_latency = statistics.mean(latencies) min_latency = min(latencies) max_latency = max(latencies) print(f"\n 📊 HTTP延迟统计:") print(f" 平均: {avg_latency:.2f} ms") print(f" 最小: {min_latency:.2f} ms") print(f" 最大: {max_latency:.2f} ms") result_data["http"]["statistics"] = { "avg_ms": round(avg_latency, 2), "min_ms": round(min_latency, 2), "max_ms": round(max_latency, 2), "success_count": len(latencies), "total_count": TEST_COUNT } else: avg_latency = None print(f"\n ❌ HTTP延迟测试失败") result_data["http"]["statistics"] = {"success": False} # 下载测试 print(f"\n📥 下载测试:") dl_result = test_download_speed(url) if dl_result["success"]: size = dl_result["size"] dl_time = dl_result["time"] speed = dl_result["speed"] print(f" 下载大小: {format_size(size)}") print(f" 下载用时: {dl_time:.3f} 秒") print(f" 下载速度: {format_speed(speed)}") result_data["download"] = { "success": True, "size_bytes": size, "time_seconds": round(dl_time, 3), "speed_bps": round(speed, 2), "speed_formatted": format_speed(speed) } else: speed = None print(f" ❌ 下载失败 - {dl_result['error']}") result_data["download"] = {"success": False, "error": dl_result['error']} # 汇总 result_data["summary"] = { "avg_latency": round(avg_latency, 2) if avg_latency else None, "tcp_latency": round(tcp_avg, 2) if tcp_avg else None, "speed": round(speed, 2) if speed else None, "status": "可用" if tcp_avg else "不可用" } return result_data def traceroute_simple_with_data(host, max_hops=TRACEROUTE_MAX_HOPS): """简化版TCP traceroute,返回数据""" results = [] try: dest_ip = socket.gethostbyname(host) except socket.gaierror: return [{"hop": 1, "error": "DNS解析失败"}] print(f" 追踪到 {host} ({dest_ip}):") print(f" {'跳数':<4} {'状态':<15} {'延迟'}") print(" " + "-" * 40) for ttl in range(1, max_hops + 1): hop_data = {"hop": ttl} try: sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.setsockopt(socket.IPPROTO_IP, socket.IP_TTL, ttl) sock.settimeout(2) start = time.time() try: sock.connect((dest_ip, 443)) elapsed = (time.time() - start) * 1000 print(f" {ttl:<4} {'到达目标':<15} {elapsed:.2f} ms") hop_data.update({"status": "到达目标", "latency_ms": round(elapsed, 2), "reached": True}) results.append(hop_data) sock.close() break except socket.timeout: print(f" {ttl:<4} {'超时':<15} *") hop_data.update({"status": "超时", "timeout": True}) except socket.error as e: elapsed = (time.time() - start) * 1000 if e.errno == 111: print(f" {ttl:<4} {'中间节点':<15} {elapsed:.2f} ms") hop_data.update({"status": "中间节点", "latency_ms": round(elapsed, 2)}) else: print(f" {ttl:<4} {'TTL过期':<15} {elapsed:.2f} ms") hop_data.update({"status": "TTL过期", "latency_ms": round(elapsed, 2)}) sock.close() except Exception as e: print(f" {ttl:<4} 错误: {e}") hop_data.update({"error": str(e)}) results.append(hop_data) return results def print_summary(results): """打印汇总结果""" print("\n\n" + "="*70) print("📈 测速结果汇总") print("="*70) # 按TCP延迟排序 valid_results = [r for r in results if r["summary"].get("tcp_latency") is not None] valid_results.sort(key=lambda x: x["summary"]["tcp_latency"]) print(f"\n{'排名':<4} {'名称':<22} {'TCP延迟':<12} {'HTTP延迟':<12} {'速度':<12}") print("-" * 70) for i, r in enumerate(valid_results, 1): tcp_str = f"{r['summary']['tcp_latency']:.2f} ms" if r['summary']['tcp_latency'] else "N/A" http_str = f"{r['summary']['avg_latency']:.2f} ms" if r['summary']['avg_latency'] else "N/A" speed_str = format_speed(r['summary']['speed']) if r['summary']['speed'] else "N/A" print(f"{i:<4} {r['name']:<22} {tcp_str:<12} {http_str:<12} {speed_str:<12}") # 显示失败的 failed = [r for r in results if r["summary"].get("tcp_latency") is None] if failed: print("\n❌ 无法连接的节点:") for r in failed: print(f" - {r['name']} ({r['url']})") # 推荐 if valid_results: best = valid_results[0] print(f"\n" + "="*70) print(f"✅ 推荐使用: {best['name']}") print(f" URL: {best['url']}") if best['summary']['tcp_latency']: print(f" TCP延迟: {best['summary']['tcp_latency']:.2f} ms") if best['summary']['avg_latency']: print(f" HTTP延迟: {best['summary']['avg_latency']:.2f} ms") def export_json(results, output_file=None): """导出测试结果到JSON文件""" if output_file is None: timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") output_file = f"speedtest_result_{timestamp}.json" # 构建完整报告 report = { "report_info": { "tool": "CDN 测速工具", "version": "2.1", "generated_at": datetime.now().isoformat(), "test_parameters": { "tcp_ping_count": TCP_PING_COUNT, "http_test_count": TEST_COUNT, "timeout_seconds": TIMEOUT, "traceroute_max_hops": TRACEROUTE_MAX_HOPS } }, "endpoints": results, "ranking": [], "recommendation": None } # 生成排名 valid_results = [r for r in results if r["summary"].get("tcp_latency") is not None] valid_results.sort(key=lambda x: x["summary"]["tcp_latency"]) for i, r in enumerate(valid_results, 1): report["ranking"].append({ "rank": i, "name": r["name"], "url": r["url"], "tcp_latency_ms": r["summary"]["tcp_latency"], "http_latency_ms": r["summary"]["avg_latency"], "speed_bps": r["summary"]["speed"] }) # 推荐 if valid_results: best = valid_results[0] report["recommendation"] = { "name": best["name"], "url": best["url"], "tcp_latency_ms": best["summary"]["tcp_latency"], "http_latency_ms": best["summary"]["avg_latency"], "reason": "TCP延迟最低" } # 写入文件 with open(output_file, 'w', encoding='utf-8') as f: json.dump(report, f, ensure_ascii=False, indent=2) return output_file def main(): import argparse parser = argparse.ArgumentParser(description='CDN 测速工具 v2.1') parser.add_argument('--no-traceroute', '-t', action='store_true', help='禁用 traceroute 测试') parser.add_argument('--quick', '-q', action='store_true', help='快速模式(减少测试次数)') parser.add_argument('--json', '-j', nargs='?', const='auto', default=None, metavar='FILE', help='导出结果到JSON文件(不指定文件名则自动生成)') parser.add_argument('--output', '-o', type=str, default=None, help='指定JSON输出文件路径') args = parser.parse_args() global TEST_COUNT, TCP_PING_COUNT if args.quick: TEST_COUNT = 1 TCP_PING_COUNT = 3 print("\n" + "="*70) print(" 🚀 CDN 测速工具 v2.1") print(" 支持: TCP Ping | Traceroute | HTTP延迟 | 下载速度 | JSON导出") print(" 测试中国大陆优化节点") print("="*70) results = [] enable_traceroute = not args.no_traceroute for endpoint in ENDPOINTS: result = test_endpoint(endpoint, enable_traceroute=enable_traceroute) results.append(result) print_summary(results) # 导出JSON json_file = args.output or args.json if json_file: if json_file == 'auto': json_file = None # 让export_json自动生成文件名 output_path = export_json(results, json_file) print(f"\n📄 测试结果已导出到: {output_path}") print("\n" + "="*70) print("测速完成!") print("="*70 + "\n") if __name__ == "__main__": main()
http://www.cnnetsun.cn/news/135811.html

相关文章:

  • 7、Windows应用开发中的用户界面控件使用指南
  • 18、Windows 应用数据管理全解析
  • AI大模型微调完全指南:13分钟让小模型“开挂“超越GPT-5,程序员必备收藏!
  • 汇编语言全接触-34.RichEdit 控件:更多的正文操作
  • 汇编语言全接触-35.RichEdit 控件:语法高亮显示
  • 自养号测评:跳出“隐形工具”定位,筑牢品牌增长核心基建
  • 昂瑞微推出了面向移动电源行业的一站式智能方案-OM70201MV
  • Mobox移动桌面体验优化指南:极致显示与性能调校
  • 23、Web与互联网管理及服务器日志分析
  • LPxxR100FN_36W/48W/60W开关电源100V同步整流芯片典型应用电路(LP20R100FN,LP20R100FN,LP10R100FN)
  • SHP文件GCJ02转WGS84坐标系系统源码
  • ComfyUI智能字幕生成终极指南:轻松为图片添加精准描述 [特殊字符]
  • ComfyUI智能字幕生成终极指南:3步实现AI自动化图片标注
  • 2025亚马逊运营升级:从短期竞争,转向品牌资产长效经营
  • 【大前端】【iOS】iOS 真实项目可落地目录结构方案
  • “在我电脑上明明是好的”:我用这套云原生工作流,终结了团队内耗
  • 揭秘MCP服务发现:构建智能AI工具生态系统的核心技术
  • 智能笔记管理:如何高效组织你的每日任务与灵感
  • 终极指南:如何快速掌握 My Mind 免费在线思维导图工具
  • Vim插件管理器VAM终极指南:从零开始构建高效开发环境
  • AgentWeb终极指南:Android混合开发的一站式解决方案
  • 事件循环机制
  • TikTok 电商全球新棋局:从野蛮生长到精耕细作,谁能站稳脚跟?
  • 创建Mysql 用户 并赋权
  • 完整指南:如何快速掌握Vue可视化打印解决方案
  • Three-DXF深度解析:在浏览器中实现专业级CAD文件渲染
  • P2MS:比特币的多重签名机制与比特鹰的技术解析
  • 终极AI字幕生成指南:快速掌握智能字幕处理技巧
  • C语言程序设计教学指导:突破误区,设计有效实验项目
  • DirectX 9.0b SDK介绍:核心组件、在游戏中的重要性