fscan/Plugins/services/kafka/plugin.go

216 lines
6.1 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package kafka
import (
"context"
"fmt"
"strings"
"time"
"github.com/IBM/sarama"
"github.com/shadow1ng/fscan/common"
"github.com/shadow1ng/fscan/common/i18n"
"github.com/shadow1ng/fscan/plugins/base"
)
// KafkaPlugin is the Kafka service plugin. It embeds *base.ServicePlugin and
// layers Kafka-specific scanning and exploitation on top of it.
type KafkaPlugin struct {
// Embedded base plugin; Scan falls back to its Scan method for the
// credential-based scan.
*base.ServicePlugin
// exploiter backs Exploit, GetExploitMethods and IsExploitSupported.
exploiter *KafkaExploiter
}
// NewKafkaPlugin builds a KafkaPlugin wired to a fresh Kafka connector and
// exploiter, and declares the weak-password and data-extraction capabilities.
func NewKafkaPlugin() *KafkaPlugin {
	// Descriptive metadata handed to the base service plugin.
	meta := &base.PluginMetadata{
		Name:        "kafka",
		Version:     "2.0.0",
		Author:      "fscan-team",
		Description: "Apache Kafka消息队列扫描和利用插件",
		Category:    "service",
		Ports:       []int{9092, 9093, 9094}, // common Kafka ports
		Protocols:   []string{"tcp"},
		Tags:        []string{"kafka", "message-queue", "bruteforce", "unauthorized"},
	}

	// The base service plugin is built around the Kafka connector; the
	// exploiter is created afterwards, matching the field order below.
	kp := &KafkaPlugin{
		ServicePlugin: base.NewServicePlugin(meta, NewKafkaConnector()),
		exploiter:     NewKafkaExploiter(),
	}

	// Advertise what this plugin can do.
	caps := []base.Capability{
		base.CapWeakPassword,
		base.CapDataExtraction,
	}
	kp.SetCapabilities(caps)

	return kp
}
// Scan probes the target for Kafka access, trying unauthenticated access
// before falling back to the embedded base plugin's scan.
//
// When common.DisableBrute is set, only service identification is performed.
// Otherwise an empty username/password credential is tried first; if that
// succeeds, a result flagged as unauthorized access is returned. Failing that,
// the result of the embedded ServicePlugin.Scan is returned as-is, and a
// successful result is logged together with the first credential it carries.
//
// The result and error of the underlying scan are passed through unchanged.
func (p *KafkaPlugin) Scan(ctx context.Context, info *common.HostInfo) (*base.ScanResult, error) {
	// With brute forcing disabled, only identify the service.
	if common.DisableBrute {
		return p.performServiceIdentification(ctx, info)
	}

	target := fmt.Sprintf("%s:%s", info.Host, info.Ports)

	// Try unauthenticated access first.
	unauthCred := &base.Credential{Username: "", Password: ""}
	unauthResult, err := p.ScanCredential(ctx, info, unauthCred)
	// A nil result with a nil error must not be dereferenced.
	if err == nil && unauthResult != nil && unauthResult.Success {
		common.LogSuccess(i18n.GetText("kafka_unauth_access", target))
		return &base.ScanResult{
			Success:     true,
			Service:     "Kafka",
			Credentials: []*base.Credential{unauthCred},
			Extra: map[string]interface{}{
				"service":      "Kafka",
				"port":         info.Ports,
				"unauthorized": true,
				"access_type":  "no_authentication",
			},
		}, nil
	}

	// Fall back to the base plugin's scan.
	result, err := p.ServicePlugin.Scan(ctx, info)
	// A nil result is handed back untouched rather than dereferenced.
	if err != nil || result == nil || !result.Success {
		return result, err
	}

	// Log the discovered credential. A successful result is not assumed to
	// carry one, so the slice is checked before indexing.
	if len(result.Credentials) > 0 && result.Credentials[0] != nil {
		cred := result.Credentials[0]
		common.LogSuccess(i18n.GetText("kafka_weak_pwd_success", target, cred.Username, cred.Password))
	}
	return result, nil
}
// generateCredentials builds the credential list for Kafka by combining
// usernames with common.Passwords via base.GenerateCredentials. Usernames come
// from common.Userdict["kafka"]; when that entry is empty a built-in default
// list is used instead.
//
// NOTE(review): nothing in this file calls this method, and a method defined
// on KafkaPlugin is not picked up by methods of the embedded ServicePlugin —
// confirm it is actually reached.
func (p *KafkaPlugin) generateCredentials() []*base.Credential {
	// Prefer the Kafka-specific username dictionary when one is configured.
	if configured := common.Userdict["kafka"]; len(configured) > 0 {
		return base.GenerateCredentials(configured, common.Passwords)
	}

	// Otherwise fall back to the default Kafka usernames.
	defaults := []string{"admin", "kafka", "test", "user", "root"}
	return base.GenerateCredentials(defaults, common.Passwords)
}
// Exploit hands the target and credential to the plugin's KafkaExploiter and
// returns its result and error unchanged.
func (p *KafkaPlugin) Exploit(ctx context.Context, info *common.HostInfo, creds *base.Credential) (*base.ExploitResult, error) {
	outcome, err := p.exploiter.Exploit(ctx, info, creds)
	return outcome, err
}
// GetExploitMethods returns the exploit methods reported by the plugin's
// KafkaExploiter.
func (p *KafkaPlugin) GetExploitMethods() []base.ExploitMethod {
	methods := p.exploiter.GetExploitMethods()
	return methods
}
// IsExploitSupported reports whether the plugin's KafkaExploiter supports the
// given exploit type.
func (p *KafkaPlugin) IsExploitSupported(method base.ExploitType) bool {
	supported := p.exploiter.IsExploitSupported(method)
	return supported
}
// performServiceIdentification identifies the Kafka service without trying
// any credentials; Scan takes this path when common.DisableBrute is set.
//
// On success the result carries the identification string as its banner and
// under Extra["info"]. On failure the result has Success=false and an Error
// set. The returned error is always nil.
func (p *KafkaPlugin) performServiceIdentification(ctx context.Context, info *common.HostInfo) (*base.ScanResult, error) {
	banner, identified := p.identifyKafkaService(ctx, info)

	// Not recognisable as Kafka: report the failure inside the result.
	if !identified {
		return &base.ScanResult{
			Success: false,
			Error:   fmt.Errorf("无法识别为Kafka服务"),
		}, nil
	}

	// Log the successful identification.
	addr := fmt.Sprintf("%s:%s", info.Host, info.Ports)
	common.LogSuccess(i18n.GetText("kafka_service_identified", addr, banner))

	extra := map[string]interface{}{
		"service": "Kafka",
		"port":    info.Ports,
		"info":    banner,
	}
	return &base.ScanResult{
		Success: true,
		Service: "Kafka",
		Banner:  banner,
		Extra:   extra,
	}, nil
}
// identifyKafkaService opens a plaintext (TLS disabled) sarama client against
// the target and reports whether it looks like Kafka, together with a short
// description of what was found.
//
// ctx is accepted but not consulted here; dial, read and write are each
// bounded by common.Timeout seconds.
//
// If the client cannot be created, the target is still reported as Kafka when
// the error text contains "kafka", "sasl" or "authentication"
// (case-insensitive).
//
// NOTE(review): sarama's own error messages appear to be prefixed with
// "kafka:", so the "kafka" marker may match every client error, including
// plain connection failures — confirm this does not flag non-Kafka ports.
func (p *KafkaPlugin) identifyKafkaService(ctx context.Context, info *common.HostInfo) (string, bool) {
	addr := fmt.Sprintf("%s:%s", info.Host, info.Ports)
	limit := time.Duration(common.Timeout) * time.Second

	cfg := sarama.NewConfig()
	cfg.Version = sarama.V2_0_0_0
	cfg.Net.TLS.Enable = false
	cfg.Net.DialTimeout = limit
	cfg.Net.ReadTimeout = limit
	cfg.Net.WriteTimeout = limit

	// Try to establish a client connection to the single broker address.
	client, err := sarama.NewClient([]string{addr}, cfg)
	if err != nil {
		// Some errors still suggest a Kafka broker that refused us.
		lowered := strings.ToLower(err.Error())
		for _, marker := range []string{"kafka", "sasl", "authentication"} {
			if strings.Contains(lowered, marker) {
				return fmt.Sprintf("Kafka服务 (需要认证): %v", err), true
			}
		}
		return "", false
	}
	defer client.Close()

	// Include the broker count in the description when any are known.
	if count := len(client.Brokers()); count > 0 {
		return fmt.Sprintf("Kafka集群 (Brokers: %d)", count), true
	}
	return "Kafka服务", true
}
// =============================================================================
// Plugin registration
// =============================================================================

// RegisterKafkaPlugin registers a factory for the Kafka plugin in
// base.GlobalPluginRegistry under the name "kafka". The factory pairs the
// plugin metadata with a constructor that returns NewKafkaPlugin().
func RegisterKafkaPlugin() {
	meta := &base.PluginMetadata{
		Name:        "kafka",
		Version:     "2.0.0",
		Author:      "fscan-team",
		Description: "Apache Kafka消息队列扫描和利用插件",
		Category:    "service",
		Ports:       []int{9092, 9093, 9094}, // common Kafka ports
		Protocols:   []string{"tcp"},
		Tags:        []string{"kafka", "message-queue", "bruteforce", "unauthorized"},
	}

	// Constructor invoked by the factory to create plugin instances.
	construct := func() base.Plugin {
		return NewKafkaPlugin()
	}

	base.GlobalPluginRegistry.Register("kafka", base.NewSimplePluginFactory(meta, construct))
}
// init registers the Kafka plugin automatically by calling
// RegisterKafkaPlugin during package initialization.
func init() {
RegisterKafkaPlugin()
}