package main import ( "context" "database/sql" "fmt" "time" _ "github.com/go-sql-driver/mysql" ) func main() { fmt.Println("快速MySQL连接测试") fmt.Println("==================") // 测试参数(与你提供的一致) host := "127.0.0.1" port := 3306 username := "root" password := "123456" timeoutDuration := 3 * time.Second timeoutStr := timeoutDuration.String() // "3s" fmt.Printf("目标: %s:%d\n", host, port) fmt.Printf("用户名: %s\n", username) fmt.Printf("密码: %s\n", password) fmt.Printf("超时: %s\n", timeoutStr) fmt.Println() // 测试当前fscan使用的连接字符串格式 connStr := fmt.Sprintf("%s:%s@tcp(%s:%d)/mysql?charset=utf8&timeout=%s", username, password, host, port, timeoutStr) fmt.Printf("连接字符串: %s\n", connStr) fmt.Println() // 测试连接 fmt.Println("正在尝试连接...") // 创建带超时的context ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() // 打开数据库连接 db, err := sql.Open("mysql", connStr) if err != nil { fmt.Printf("❌ 创建连接失败: %v\n", err) return } defer db.Close() // 设置连接池参数(模拟fscan的设置) db.SetConnMaxLifetime(timeoutDuration) db.SetConnMaxIdleTime(timeoutDuration) db.SetMaxIdleConns(1) db.SetMaxOpenConns(1) // 尝试ping - 这是真正建立连接的地方 start := time.Now() err = db.PingContext(ctx) elapsed := time.Since(start) if err != nil { fmt.Printf("❌ 连接失败 (耗时: %v): %v\n", elapsed, err) // 分析常见错误 analyzeError(err) return } fmt.Printf("✅ 连接成功! (耗时: %v)\n", elapsed) // 尝试执行查询 fmt.Println("\n正在测试查询...") var version string err = db.QueryRowContext(ctx, "SELECT VERSION()").Scan(&version) if err != nil { fmt.Printf("❌ 查询失败: %v\n", err) return } fmt.Printf("✅ 查询成功! MySQL版本: %s\n", version) // 测试其他常用操作 testCommonOperations(ctx, db) } // analyzeError 分析常见的MySQL连接错误 func analyzeError(err error) { errorMsg := err.Error() fmt.Println("\n错误分析:") if contains(errorMsg, "context deadline exceeded") { fmt.Println("- 可能原因: 连接超时") fmt.Println(" 解决方案: 1) 检查网络连接 2) 增加超时时间 3) 检查MySQL服务状态") } if contains(errorMsg, "connection refused") { fmt.Println("- 可能原因: MySQL服务未启动或端口不正确") fmt.Println(" 解决方案: 1) 启动MySQL服务 2) 检查端口配置") } if contains(errorMsg, "access denied") { fmt.Println("- 可能原因: 用户名密码错误或权限不足") fmt.Println(" 解决方案: 1) 检查用户名密码 2) 检查用户权限") } if contains(errorMsg, "unknown database") { fmt.Println("- 可能原因: 数据库'mysql'不存在") fmt.Println(" 解决方案: 1) 使用不指定数据库的连接字符串") } // 提供替代连接字符串 fmt.Println("\n建议尝试以下连接字符串:") alternatives := []string{ "root:123456@tcp(127.0.0.1:3306)/?charset=utf8&timeout=3s", "root:123456@tcp(127.0.0.1:3306)?charset=utf8&timeout=3s", "root:123456@tcp(127.0.0.1:3306)/information_schema?charset=utf8&timeout=3s", } for i, alt := range alternatives { fmt.Printf("%d. %s\n", i+1, alt) } } // testCommonOperations 测试常用数据库操作 func testCommonOperations(ctx context.Context, db *sql.DB) { fmt.Println("\n正在测试常用操作...") // 测试显示数据库 fmt.Println("1. 显示数据库:") rows, err := db.QueryContext(ctx, "SHOW DATABASES") if err != nil { fmt.Printf(" ❌ SHOW DATABASES失败: %v\n", err) } else { defer rows.Close() var dbName string count := 0 for rows.Next() && count < 5 { // 只显示前5个 if err := rows.Scan(&dbName); err == nil { fmt.Printf(" - %s\n", dbName) count++ } } fmt.Printf(" ✅ 成功显示数据库列表\n") } // 测试显示用户 fmt.Println("2. 显示当前用户:") var user string err = db.QueryRowContext(ctx, "SELECT CURRENT_USER()").Scan(&user) if err != nil { fmt.Printf(" ❌ 获取用户失败: %v\n", err) } else { fmt.Printf(" ✅ 当前用户: %s\n", user) } // 测试服务器变量 fmt.Println("3. 重要服务器变量:") variables := []string{ "max_connections", "wait_timeout", "interactive_timeout", } for _, varName := range variables { var value string query := fmt.Sprintf("SHOW VARIABLES LIKE '%s'", varName) err := db.QueryRowContext(ctx, query).Scan(&varName, &value) if err != nil { fmt.Printf(" ❌ %s: 获取失败\n", varName) } else { fmt.Printf(" ✅ %s: %s\n", varName, value) } } } // contains 检查字符串是否包含子串(忽略大小写) func contains(s, substr string) bool { return len(s) >= len(substr) && (s == substr || len(s) > len(substr) && indexOf(s, substr) >= 0) } // indexOf 查找子串位置 func indexOf(s, substr string) int { for i := 0; i <= len(s)-len(substr); i++ { match := true for j := 0; j < len(substr); j++ { if toLower(s[i+j]) != toLower(substr[j]) { match = false break } } if match { return i } } return -1 } // toLower 转换为小写(简单实现) func toLower(b byte) byte { if b >= 'A' && b <= 'Z' { return b + ('a' - 'A') } return b }