fscan/plugins/services/kafka.go
ZacharyZcR 6cf5719e8a refactor: 彻底清理插件系统,消除虚假利用功能
- 删除整个legacy插件系统(7794行代码)
- 完成所有插件向单文件架构迁移
- 移除19个插件的虚假Exploit功能,只保留真实利用:
  * Redis: 文件写入、SSH密钥注入、计划任务
  * SSH: 命令执行
  * MS17010: EternalBlue漏洞利用
- 统一插件接口,简化架构复杂度
- 清理临时文件和备份文件

重构效果:
- 代码行数: -7794行
- 插件文件数: 从3文件架构→单文件架构
- 真实利用插件: 从22个→3个
- 架构复杂度: 大幅简化
2025-08-26 11:43:48 +08:00

269 lines
6.4 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package services
import (
"context"
"fmt"
"time"
"github.com/IBM/sarama"
"github.com/shadow1ng/fscan/common"
"github.com/shadow1ng/fscan/common/i18n"
)
// KafkaPlugin is the scanner plugin for Kafka message brokers. It carries
// the plugin's registered name and the ports it is interested in.
type KafkaPlugin struct {
	name  string // plugin identifier returned by GetName
	ports []int  // broker ports returned by GetPorts
}
// NewKafkaPlugin builds a KafkaPlugin named "kafka" that targets the
// broker ports 9092, 9093 and 9094.
func NewKafkaPlugin() *KafkaPlugin {
	plugin := new(KafkaPlugin)
	plugin.name = "kafka"
	plugin.ports = []int{9092, 9093, 9094} // Kafka broker ports
	return plugin
}
// GetName returns the name this plugin was constructed with.
// It implements the Plugin interface.
func (p *KafkaPlugin) GetName() string {
	return p.name
}
// GetPorts returns the ports this plugin was constructed with.
// It implements the Plugin interface. The plugin's own slice is returned,
// not a copy.
func (p *KafkaPlugin) GetPorts() []int {
	return p.ports
}
// Scan checks a Kafka endpoint for unauthenticated access and weak
// credentials.
//
// Flow:
//  1. If common.DisableBrute is set, only service identification is done.
//  2. An anonymous connection is attempted; success is reported immediately.
//  3. Otherwise each candidate credential is tried in order until one works
//     or ctx is cancelled.
//
// The returned ScanResult has Success=true with the working Username and
// Password on a credential hit, or Success=false with Error set when ctx was
// cancelled or nothing worked.
func (p *KafkaPlugin) Scan(ctx context.Context, info *common.HostInfo) *ScanResult {
	target := fmt.Sprintf("%s:%s", info.Host, info.Ports)

	// Brute forcing disabled: fall back to plain service identification.
	if common.DisableBrute {
		return p.identifyService(ctx, info)
	}

	// Try an anonymous connection before spending time on credentials.
	anonymous := p.testUnauthorizedAccess(ctx, info)
	if anonymous != nil && anonymous.Success {
		common.LogSuccess(i18n.GetText("kafka_unauthorized_success", target))
		return anonymous
	}

	// Candidate list; use built-in defaults when none are generated.
	candidates := GenerateCredentials("kafka")
	if len(candidates) == 0 {
		candidates = []Credential{
			{Username: "admin", Password: "admin"},
			{Username: "admin", Password: ""},
			{Username: "kafka", Password: "kafka"},
			{Username: "user", Password: "user"},
			{Username: "test", Password: "test"},
		}
	}

	for _, candidate := range candidates {
		// Stop as soon as the caller cancels the scan.
		if cancelErr := ctx.Err(); cancelErr != nil {
			return &ScanResult{
				Success: false,
				Service: "kafka",
				Error:   cancelErr,
			}
		}

		conn := p.testCredential(ctx, info, candidate)
		if conn == nil {
			continue
		}

		// The connection was only needed to prove the credential works.
		conn.Close()
		common.LogSuccess(i18n.GetText("kafka_scan_success", target, candidate.Username, candidate.Password))
		return &ScanResult{
			Success:  true,
			Service:  "kafka",
			Username: candidate.Username,
			Password: candidate.Password,
		}
	}

	// Every candidate was rejected.
	return &ScanResult{
		Success: false,
		Service: "kafka",
		Error:   fmt.Errorf("未发现弱密码或未授权访问"),
	}
}
// testUnauthorizedAccess tries to connect with an empty credential.
// It returns a successful ScanResult (Banner "未授权访问") when the
// connection is accepted, and nil otherwise.
func (p *KafkaPlugin) testUnauthorizedAccess(ctx context.Context, info *common.HostInfo) *ScanResult {
	conn := p.testCredential(ctx, info, Credential{})
	if conn == nil {
		return nil
	}

	// Only the fact that the connection succeeded matters.
	conn.Close()
	return &ScanResult{
		Success: true,
		Service: "kafka",
		Banner:  "未授权访问",
	}
}
// testCredential tries to create a Kafka client for info.Host:info.Ports
// using cred and returns it, or nil when the attempt fails or ctx is
// cancelled first.
//
// When both Username and Password are empty no SASL settings are applied;
// otherwise SASL/PLAIN with handshake is enabled using the given pair.
// common.Timeout (in seconds) is used for the dial, read and write timeouts.
//
// The caller owns the returned client and must Close it.
func (p *KafkaPlugin) testCredential(ctx context.Context, info *common.HostInfo, cred Credential) sarama.Client {
	target := fmt.Sprintf("%s:%s", info.Host, info.Ports)
	timeout := time.Duration(common.Timeout) * time.Second

	// Base client configuration.
	config := sarama.NewConfig()
	config.Net.DialTimeout = timeout
	config.Net.ReadTimeout = timeout
	config.Net.WriteTimeout = timeout
	config.Net.TLS.Enable = false
	config.Version = sarama.V2_0_0_0

	// Enable SASL only when a username or password was supplied.
	if cred.Username != "" || cred.Password != "" {
		config.Net.SASL.Enable = true
		config.Net.SASL.Mechanism = sarama.SASLTypePlaintext
		config.Net.SASL.User = cred.Username
		config.Net.SASL.Password = cred.Password
		config.Net.SASL.Handshake = true
	}

	brokers := []string{target}

	type kafkaResult struct {
		client sarama.Client
		err    error
	}

	// sarama.NewClient takes no context, so it runs in a goroutine and the
	// result is raced against ctx.
	//
	// The channel is deliberately unbuffered: a send then only completes when
	// this function is actually receiving. With a buffered channel the send is
	// always ready, so after cancellation the goroutine could park a live
	// client in the buffer that nobody would ever read or close.
	clientChan := make(chan kafkaResult)
	go func() {
		client, err := sarama.NewClient(brokers, config)
		select {
		case clientChan <- kafkaResult{client, err}:
			// Ownership handed to the receiver.
		case <-ctx.Done():
			// Nobody is waiting any more; release the connection ourselves.
			if client != nil {
				client.Close()
			}
		}
	}()

	// Wait for the client or for cancellation, whichever comes first.
	select {
	case result := <-clientChan:
		if result.err != nil {
			return nil
		}
		return result.client
	case <-ctx.Done():
		return nil
	}
}
// getConsumerGroups sends a ListGroups request to the first broker reported
// by client.Brokers() and returns the group IDs (the keys of the response's
// Groups collection).
//
// It returns an error when the client reports no brokers, when the broker
// connection cannot be opened, or when the ListGroups request fails.
func (p *KafkaPlugin) getConsumerGroups(client sarama.Client) ([]string, error) {
	brokers := client.Brokers()
	if len(brokers) == 0 {
		return nil, fmt.Errorf("没有可用的broker")
	}
	broker := brokers[0] // use the first broker

	// The broker object is shared with the client, which may already have
	// opened it. In that case Open reports sarama.ErrAlreadyConnected; that
	// is not a failure, and the connection is not ours to close.
	switch err := broker.Open(client.Config()); err {
	case nil:
		// We initiated the connection, so we release it when done.
		defer broker.Close()
	case sarama.ErrAlreadyConnected:
		// Reuse the client's existing connection and leave it open.
	default:
		return nil, err
	}

	response, err := broker.ListGroups(&sarama.ListGroupsRequest{})
	if err != nil {
		return nil, err
	}

	groups := make([]string, 0, len(response.Groups))
	for groupID := range response.Groups {
		groups = append(groups, groupID)
	}
	return groups, nil
}
// testProduceMessage publishes one fixed test message to topic through a
// synchronous producer built on client, and returns any error from creating
// the producer or sending the message.
//
// Note: it modifies the configuration object returned by client.Config()
// (Producer.Return.Successes and Producer.Timeout) before creating the
// producer.
func (p *KafkaPlugin) testProduceMessage(client sarama.Client, topic string) error {
	cfg := client.Config()
	cfg.Producer.Timeout = 5 * time.Second
	cfg.Producer.Return.Successes = true

	syncProducer, err := sarama.NewSyncProducerFromClient(client)
	if err != nil {
		return err
	}
	defer syncProducer.Close()

	// Send the probe message; partition and offset are not needed.
	_, _, sendErr := syncProducer.SendMessage(&sarama.ProducerMessage{
		Topic: topic,
		Value: sarama.StringEncoder("FScan security test message"),
	})
	return sendErr
}
// identifyService detects a Kafka service by connecting with an empty
// credential. On success it logs and returns a ScanResult whose Banner
// holds broker and topic counts when a metadata refresh succeeds, or the
// generic banner "Kafka服务" otherwise. If no connection can be made it
// returns a failed ScanResult with Error set.
func (p *KafkaPlugin) identifyService(ctx context.Context, info *common.HostInfo) *ScanResult {
	target := fmt.Sprintf("%s:%s", info.Host, info.Ports)

	conn := p.testCredential(ctx, info, Credential{})
	if conn == nil {
		return &ScanResult{
			Success: false,
			Service: "kafka",
			Error:   fmt.Errorf("无法连接到Kafka服务"),
		}
	}
	defer conn.Close()

	// Start with the generic banner; upgrade it when metadata is available.
	banner := "Kafka服务"
	if refreshErr := conn.RefreshMetadata(); refreshErr == nil {
		brokerCount := len(conn.Brokers())
		// A failed topic lookup is tolerated: the count is then reported as 0.
		topicList, _ := conn.Topics()
		banner = fmt.Sprintf("Kafka集群 (Brokers: %d, Topics: %d)", brokerCount, len(topicList))
	}

	common.LogSuccess(i18n.GetText("kafka_service_identified", target, banner))
	return &ScanResult{
		Success: true,
		Service: "kafka",
		Banner:  banner,
	}
}
// init 自动注册插件
func init() {
RegisterPlugin("kafka", func() Plugin {
return NewKafkaPlugin()
})
}