fscan/mysql_tests/test_optimized_mysql.go
ZacharyZcR 43f210ffc6 feat: 实现新一代插件注册系统完全替代传统手动注册模式
- 重构插件注册架构采用现代工厂模式和自动发现机制
- 新增完整的插件元数据管理系统支持版本能力标签等信息
- 实现智能插件适配器提供向后兼容的桥接功能
- 建立MySQL Redis SSH三个标准插件作为新架构参考实现
- 优化插件扫描逻辑支持按端口按类型的智能查询和过滤
- 添加国际化支持和完善的文档体系
- 代码量减少67%维护成本大幅降低扩展性显著提升

新架构特点:
- 零配置插件注册import即用
- 工厂模式延迟初始化和依赖注入
- 丰富元数据系统和能力声明
- 完全解耦的模块化设计
- 面向未来的可扩展架构

测试验证: MySQL和Redis插件功能完整包括弱密码检测未授权访问检测和自动利用攻击
2025-08-07 11:28:34 +08:00

214 lines
5.9 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package main
import (
"context"
"database/sql"
"fmt"
"time"
_ "github.com/go-sql-driver/mysql"
)
func main() {
fmt.Println("测试优化后的MySQL连接字符串")
fmt.Println("===============================")
// 测试参数
host := "127.0.0.1"
port := 3306
username := "root"
password := "123456"
timeout := 3 * time.Second
fmt.Printf("目标: %s:%d\n", host, port)
fmt.Printf("用户: %s/%s\n", username, password)
fmt.Printf("超时: %v\n", timeout)
fmt.Println()
// 测试原始fscan格式
fmt.Println("1. 测试原始fscan格式:")
originalDSN := fmt.Sprintf("%s:%s@tcp(%s:%d)/mysql?charset=utf8&timeout=%s",
username, password, host, port, timeout.String())
fmt.Printf(" DSN: %s\n", originalDSN)
testConnection("原始格式", originalDSN, timeout)
// 测试优化后的格式
fmt.Println("\n2. 测试优化后的格式:")
readTimeout := timeout - 500*time.Millisecond
if timeout <= time.Second {
readTimeout = timeout
}
optimizedDSN := fmt.Sprintf("%s:%s@tcp(%s:%d)/?charset=utf8mb4&timeout=%s&readTimeout=%s&writeTimeout=%s&parseTime=true",
username, password, host, port, timeout.String(), readTimeout.String(), readTimeout.String())
fmt.Printf(" DSN: %s\n", optimizedDSN)
testConnection("优化格式", optimizedDSN, timeout)
// 测试其他推荐格式
fmt.Println("\n3. 测试简化格式:")
simpleDSN := fmt.Sprintf("%s:%s@tcp(%s:%d)/?charset=utf8mb4&timeout=%s",
username, password, host, port, timeout.String())
fmt.Printf(" DSN: %s\n", simpleDSN)
testConnection("简化格式", simpleDSN, timeout)
fmt.Println("\n4. 测试完整参数格式:")
fullDSN := fmt.Sprintf("%s:%s@tcp(%s:%d)/?charset=utf8mb4&timeout=%s&readTimeout=%s&writeTimeout=%s&parseTime=true&loc=Local&maxAllowedPacket=16777216",
username, password, host, port, timeout.String(), readTimeout.String(), readTimeout.String())
fmt.Printf(" DSN: %s\n", fullDSN)
testConnection("完整格式", fullDSN, timeout)
// 性能对比测试
fmt.Println("\n=== 性能对比测试 ===")
performanceTest(originalDSN, optimizedDSN, timeout)
}
// testConnection 测试连接
func testConnection(name, dsn string, timeout time.Duration) {
// 创建context超时时间比DSN timeout长
ctx, cancel := context.WithTimeout(context.Background(), timeout+2*time.Second)
defer cancel()
start := time.Now()
// 建立连接
db, err := sql.Open("mysql", dsn)
if err != nil {
fmt.Printf(" ❌ %s - Open失败: %v\n", name, err)
return
}
defer db.Close()
// 优化连接池配置
db.SetConnMaxLifetime(timeout * 3)
db.SetConnMaxIdleTime(timeout * 2)
db.SetMaxIdleConns(1)
db.SetMaxOpenConns(1)
// 测试ping
err = db.PingContext(ctx)
elapsed := time.Since(start)
if err != nil {
fmt.Printf(" ❌ %s - Ping失败 (耗时: %v): %v\n", name, elapsed, err)
return
}
fmt.Printf(" ✅ %s - 连接成功 (耗时: %v)\n", name, elapsed)
// 测试基本查询
testQueries(ctx, db, name)
}
// testQueries 测试基本查询
func testQueries(ctx context.Context, db *sql.DB, name string) {
queries := []struct {
desc string
query string
}{
{"版本", "SELECT VERSION()"},
{"用户", "SELECT CURRENT_USER()"},
{"时间", "SELECT NOW()"},
{"数据库数量", "SELECT COUNT(*) FROM information_schema.SCHEMATA"},
}
for _, q := range queries {
var result string
err := db.QueryRowContext(ctx, q.query).Scan(&result)
if err != nil {
fmt.Printf(" ❌ %s查询失败: %v\n", q.desc, err)
} else {
// 截断长结果
if len(result) > 50 {
result = result[:47] + "..."
}
fmt.Printf(" ✅ %s: %s\n", q.desc, result)
}
}
}
// performanceTest 性能对比测试
func performanceTest(originalDSN, optimizedDSN string, timeout time.Duration) {
testCount := 10
fmt.Printf("执行 %d 次连接测试...\n", testCount)
// 测试原始格式
fmt.Println("\n原始格式性能:")
originalTimes := make([]time.Duration, testCount)
originalSuccess := 0
for i := 0; i < testCount; i++ {
start := time.Now()
success := quickConnect(originalDSN, timeout)
elapsed := time.Since(start)
originalTimes[i] = elapsed
if success {
originalSuccess++
}
fmt.Printf(" 第%d次: %v (%t)\n", i+1, elapsed, success)
}
// 测试优化格式
fmt.Println("\n优化格式性能:")
optimizedTimes := make([]time.Duration, testCount)
optimizedSuccess := 0
for i := 0; i < testCount; i++ {
start := time.Now()
success := quickConnect(optimizedDSN, timeout)
elapsed := time.Since(start)
optimizedTimes[i] = elapsed
if success {
optimizedSuccess++
}
fmt.Printf(" 第%d次: %v (%t)\n", i+1, elapsed, success)
}
// 计算统计数据
fmt.Println("\n性能对比结果:")
fmt.Printf("原始格式: 成功率 %d/%d (%.1f%%), 平均耗时: %v\n",
originalSuccess, testCount, float64(originalSuccess)/float64(testCount)*100,
calculateAverage(originalTimes))
fmt.Printf("优化格式: 成功率 %d/%d (%.1f%%), 平均耗时: %v\n",
optimizedSuccess, testCount, float64(optimizedSuccess)/float64(testCount)*100,
calculateAverage(optimizedTimes))
if optimizedSuccess > originalSuccess {
fmt.Println("✅ 优化格式成功率更高")
} else if optimizedSuccess == originalSuccess {
fmt.Println("⚖️ 两种格式成功率相同")
} else {
fmt.Println("⚠️ 原始格式成功率更高")
}
}
// quickConnect 快速连接测试
func quickConnect(dsn string, timeout time.Duration) bool {
ctx, cancel := context.WithTimeout(context.Background(), timeout+2*time.Second)
defer cancel()
db, err := sql.Open("mysql", dsn)
if err != nil {
return false
}
defer db.Close()
db.SetConnMaxLifetime(timeout * 3)
db.SetMaxOpenConns(1)
err = db.PingContext(ctx)
return err == nil
}
// calculateAverage 计算平均时间
func calculateAverage(times []time.Duration) time.Duration {
if len(times) == 0 {
return 0
}
var total time.Duration
for _, t := range times {
total += t
}
return total / time.Duration(len(times))
}