fscan/mysql_tests/quick_mysql_check.go
ZacharyZcR 43f210ffc6 feat: 实现新一代插件注册系统完全替代传统手动注册模式
- 重构插件注册架构采用现代工厂模式和自动发现机制
- 新增完整的插件元数据管理系统支持版本能力标签等信息
- 实现智能插件适配器提供向后兼容的桥接功能
- 建立MySQL Redis SSH三个标准插件作为新架构参考实现
- 优化插件扫描逻辑支持按端口按类型的智能查询和过滤
- 添加国际化支持和完善的文档体系
- 代码量减少67%维护成本大幅降低扩展性显著提升

新架构特点:
- 零配置插件注册import即用
- 工厂模式延迟初始化和依赖注入
- 丰富元数据系统和能力声明
- 完全解耦的模块化设计
- 面向未来的可扩展架构

测试验证: MySQL和Redis插件功能完整包括弱密码检测未授权访问检测和自动利用攻击
2025-08-07 11:28:34 +08:00

210 lines
5.3 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package main
import (
"context"
"database/sql"
"fmt"
"time"
_ "github.com/go-sql-driver/mysql"
)
func main() {
fmt.Println("快速MySQL连接测试")
fmt.Println("==================")
// 测试参数(与你提供的一致)
host := "127.0.0.1"
port := 3306
username := "root"
password := "123456"
timeoutDuration := 3 * time.Second
timeoutStr := timeoutDuration.String() // "3s"
fmt.Printf("目标: %s:%d\n", host, port)
fmt.Printf("用户名: %s\n", username)
fmt.Printf("密码: %s\n", password)
fmt.Printf("超时: %s\n", timeoutStr)
fmt.Println()
// 测试当前fscan使用的连接字符串格式
connStr := fmt.Sprintf("%s:%s@tcp(%s:%d)/mysql?charset=utf8&timeout=%s",
username, password, host, port, timeoutStr)
fmt.Printf("连接字符串: %s\n", connStr)
fmt.Println()
// 测试连接
fmt.Println("正在尝试连接...")
// 创建带超时的context
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
// 打开数据库连接
db, err := sql.Open("mysql", connStr)
if err != nil {
fmt.Printf("❌ 创建连接失败: %v\n", err)
return
}
defer db.Close()
// 设置连接池参数模拟fscan的设置
db.SetConnMaxLifetime(timeoutDuration)
db.SetConnMaxIdleTime(timeoutDuration)
db.SetMaxIdleConns(1)
db.SetMaxOpenConns(1)
// 尝试ping - 这是真正建立连接的地方
start := time.Now()
err = db.PingContext(ctx)
elapsed := time.Since(start)
if err != nil {
fmt.Printf("❌ 连接失败 (耗时: %v): %v\n", elapsed, err)
// 分析常见错误
analyzeError(err)
return
}
fmt.Printf("✅ 连接成功! (耗时: %v)\n", elapsed)
// 尝试执行查询
fmt.Println("\n正在测试查询...")
var version string
err = db.QueryRowContext(ctx, "SELECT VERSION()").Scan(&version)
if err != nil {
fmt.Printf("❌ 查询失败: %v\n", err)
return
}
fmt.Printf("✅ 查询成功! MySQL版本: %s\n", version)
// 测试其他常用操作
testCommonOperations(ctx, db)
}
// analyzeError 分析常见的MySQL连接错误
func analyzeError(err error) {
errorMsg := err.Error()
fmt.Println("\n错误分析:")
if contains(errorMsg, "context deadline exceeded") {
fmt.Println("- 可能原因: 连接超时")
fmt.Println(" 解决方案: 1) 检查网络连接 2) 增加超时时间 3) 检查MySQL服务状态")
}
if contains(errorMsg, "connection refused") {
fmt.Println("- 可能原因: MySQL服务未启动或端口不正确")
fmt.Println(" 解决方案: 1) 启动MySQL服务 2) 检查端口配置")
}
if contains(errorMsg, "access denied") {
fmt.Println("- 可能原因: 用户名密码错误或权限不足")
fmt.Println(" 解决方案: 1) 检查用户名密码 2) 检查用户权限")
}
if contains(errorMsg, "unknown database") {
fmt.Println("- 可能原因: 数据库'mysql'不存在")
fmt.Println(" 解决方案: 1) 使用不指定数据库的连接字符串")
}
// 提供替代连接字符串
fmt.Println("\n建议尝试以下连接字符串:")
alternatives := []string{
"root:123456@tcp(127.0.0.1:3306)/?charset=utf8&timeout=3s",
"root:123456@tcp(127.0.0.1:3306)?charset=utf8&timeout=3s",
"root:123456@tcp(127.0.0.1:3306)/information_schema?charset=utf8&timeout=3s",
}
for i, alt := range alternatives {
fmt.Printf("%d. %s\n", i+1, alt)
}
}
// testCommonOperations 测试常用数据库操作
func testCommonOperations(ctx context.Context, db *sql.DB) {
fmt.Println("\n正在测试常用操作...")
// 测试显示数据库
fmt.Println("1. 显示数据库:")
rows, err := db.QueryContext(ctx, "SHOW DATABASES")
if err != nil {
fmt.Printf(" ❌ SHOW DATABASES失败: %v\n", err)
} else {
defer rows.Close()
var dbName string
count := 0
for rows.Next() && count < 5 { // 只显示前5个
if err := rows.Scan(&dbName); err == nil {
fmt.Printf(" - %s\n", dbName)
count++
}
}
fmt.Printf(" ✅ 成功显示数据库列表\n")
}
// 测试显示用户
fmt.Println("2. 显示当前用户:")
var user string
err = db.QueryRowContext(ctx, "SELECT CURRENT_USER()").Scan(&user)
if err != nil {
fmt.Printf(" ❌ 获取用户失败: %v\n", err)
} else {
fmt.Printf(" ✅ 当前用户: %s\n", user)
}
// 测试服务器变量
fmt.Println("3. 重要服务器变量:")
variables := []string{
"max_connections",
"wait_timeout",
"interactive_timeout",
}
for _, varName := range variables {
var value string
query := fmt.Sprintf("SHOW VARIABLES LIKE '%s'", varName)
err := db.QueryRowContext(ctx, query).Scan(&varName, &value)
if err != nil {
fmt.Printf(" ❌ %s: 获取失败\n", varName)
} else {
fmt.Printf(" ✅ %s: %s\n", varName, value)
}
}
}
// contains 检查字符串是否包含子串(忽略大小写)
func contains(s, substr string) bool {
return len(s) >= len(substr) &&
(s == substr ||
len(s) > len(substr) &&
indexOf(s, substr) >= 0)
}
// indexOf 查找子串位置
func indexOf(s, substr string) int {
for i := 0; i <= len(s)-len(substr); i++ {
match := true
for j := 0; j < len(substr); j++ {
if toLower(s[i+j]) != toLower(substr[j]) {
match = false
break
}
}
if match {
return i
}
}
return -1
}
// toLower 转换为小写(简单实现)
func toLower(b byte) byte {
if b >= 'A' && b <= 'Z' {
return b + ('a' - 'A')
}
return b
}