package cassandra import ( "context" "fmt" "net" "strconv" "time" "github.com/gocql/gocql" "github.com/shadow1ng/fscan/common" "github.com/shadow1ng/fscan/plugins/base" ) // CassandraConnector Cassandra连接器实现 type CassandraConnector struct { host string port string } // CassandraProxyDialer 实现gocql.Dialer接口,支持代理连接 type CassandraProxyDialer struct { timeout time.Duration } // DialContext 实现代理拨号 func (d *CassandraProxyDialer) DialContext(ctx context.Context, network, addr string) (net.Conn, error) { host, port, err := net.SplitHostPort(addr) if err != nil { return nil, err } return common.WrapperTcpWithContext(ctx, network, fmt.Sprintf("%s:%s", host, port)) } // NewCassandraConnector 创建Cassandra连接器 func NewCassandraConnector() *CassandraConnector { return &CassandraConnector{} } // Connect 连接到Cassandra服务 func (c *CassandraConnector) Connect(ctx context.Context, info *common.HostInfo) (interface{}, error) { c.host = info.Host c.port = info.Ports // 创建Cassandra集群配置 cluster := gocql.NewCluster(c.host) // 解析端口 port, err := strconv.Atoi(c.port) if err != nil { return nil, fmt.Errorf("无效的端口号: %s", c.port) } cluster.Port = port // 设置连接参数 timeout := time.Duration(common.Timeout) * time.Second cluster.Timeout = timeout cluster.ConnectTimeout = timeout cluster.ProtoVersion = 4 cluster.Consistency = gocql.One // 如果配置了代理,设置自定义Dialer if common.Socks5Proxy != "" { cluster.Dialer = &CassandraProxyDialer{ timeout: timeout, } } // 设置重试策略 cluster.RetryPolicy = &gocql.SimpleRetryPolicy{NumRetries: 3} return cluster, nil } // Authenticate 认证 func (c *CassandraConnector) Authenticate(ctx context.Context, conn interface{}, cred *base.Credential) error { cluster, ok := conn.(*gocql.ClusterConfig) if !ok { return fmt.Errorf("无效的连接类型") } // 创建集群配置副本 authCluster := *cluster // 设置认证信息 if cred.Username != "" || cred.Password != "" { authCluster.Authenticator = gocql.PasswordAuthenticator{ Username: cred.Username, Password: cred.Password, } } // 创建会话通道以支持Context超时 sessionChan := make(chan struct { session *gocql.Session err error }, 1) // 在goroutine中创建会话,以便可以通过Context取消 go func() { session, err := authCluster.CreateSession() select { case <-ctx.Done(): if session != nil { session.Close() } case sessionChan <- struct { session *gocql.Session err error }{session, err}: } }() // 等待会话创建或Context取消 var session *gocql.Session var err error select { case result := <-sessionChan: session, err = result.session, result.err if err != nil { return fmt.Errorf("Cassandra认证失败: %v", err) } case <-ctx.Done(): return fmt.Errorf("Cassandra连接超时: %v", ctx.Err()) } defer session.Close() // 尝试执行查询验证连接 resultChan := make(chan struct { success bool err error }, 1) go func() { var err error // 尝试两种查询,确保至少一种成功 err = session.Query("SELECT peer FROM system.peers").WithContext(ctx).Scan(nil) if err != nil { err = session.Query("SELECT now() FROM system.local").WithContext(ctx).Scan(nil) } select { case <-ctx.Done(): case resultChan <- struct { success bool err error }{err == nil, err}: } }() // 等待查询结果或Context取消 select { case result := <-resultChan: if !result.success && result.err != nil { return fmt.Errorf("Cassandra查询验证失败: %v", result.err) } return nil case <-ctx.Done(): return fmt.Errorf("Cassandra查询超时: %v", ctx.Err()) } } // Close 关闭连接 func (c *CassandraConnector) Close(conn interface{}) error { // Cassandra集群配置无需显式关闭 return nil }