fscan/plugins/kafka.go
ZacharyZcR 678d750c8a refactor: 重构插件架构,实现单文件插件系统
将复杂的三文件插件架构(connector/exploiter/plugin)重构为简化的单文件插件架构,
大幅减少代码重复和维护成本,提升插件开发效率。

主要改进:
• 将每个服务插件从3个文件简化为1个文件
• 删除过度设计的工厂模式、适配器模式等抽象层
• 消除plugins/services/、plugins/adapters/、plugins/base/复杂目录结构
• 实现直接的插件注册机制,提升系统简洁性
• 保持完全向后兼容,所有扫描功能和输出格式不变

重构统计:
• 删除文件:100+个复杂架构文件
• 新增文件:20个简化的单文件插件
• 代码减少:每个插件减少60-80%代码量
• 功能增强:所有插件包含完整扫描和利用功能

已重构插件: MySQL, SSH, Redis, MongoDB, PostgreSQL, MSSQL, Oracle,
Neo4j, Memcached, RabbitMQ, ActiveMQ, Cassandra, FTP, Kafka, LDAP,
Rsync, SMTP, SNMP, Telnet, VNC

验证通过: 新系统编译运行正常,所有插件功能验证通过
2025-08-25 23:57:00 +08:00

349 lines
8.9 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package plugins
import (
"context"
"fmt"
"strings"
"time"
"github.com/IBM/sarama"
"github.com/shadow1ng/fscan/common"
"github.com/shadow1ng/fscan/common/i18n"
)
// KafkaPlugin Kafka消息队列扫描和利用插件 - 包含信息收集利用功能
type KafkaPlugin struct {
name string
ports []int
}
// NewKafkaPlugin 创建Kafka插件
func NewKafkaPlugin() *KafkaPlugin {
return &KafkaPlugin{
name: "kafka",
ports: []int{9092, 9093, 9094}, // Kafka broker端口
}
}
// GetName 实现Plugin接口
func (p *KafkaPlugin) GetName() string {
return p.name
}
// GetPorts 实现Plugin接口
func (p *KafkaPlugin) GetPorts() []int {
return p.ports
}
// Scan 执行Kafka扫描 - 弱密码检测和未授权访问检测
func (p *KafkaPlugin) Scan(ctx context.Context, info *common.HostInfo) *ScanResult {
target := fmt.Sprintf("%s:%s", info.Host, info.Ports)
// 如果禁用暴力破解,只做服务识别
if common.DisableBrute {
return p.identifyService(ctx, info)
}
// 首先检查未授权访问
if result := p.testUnauthorizedAccess(ctx, info); result != nil && result.Success {
common.LogSuccess(i18n.GetText("kafka_unauthorized_success", target))
return result
}
// 生成测试凭据
credentials := GenerateCredentials("kafka")
if len(credentials) == 0 {
// Kafka默认凭据
credentials = []Credential{
{Username: "admin", Password: "admin"},
{Username: "admin", Password: ""},
{Username: "kafka", Password: "kafka"},
{Username: "user", Password: "user"},
{Username: "test", Password: "test"},
}
}
// 逐个测试凭据
for _, cred := range credentials {
// 检查Context是否被取消
select {
case <-ctx.Done():
return &ScanResult{
Success: false,
Service: "kafka",
Error: ctx.Err(),
}
default:
}
// 测试凭据
if client := p.testCredential(ctx, info, cred); client != nil {
client.Close() // 关闭测试连接
// Kafka认证成功
common.LogSuccess(i18n.GetText("kafka_scan_success", target, cred.Username, cred.Password))
return &ScanResult{
Success: true,
Service: "kafka",
Username: cred.Username,
Password: cred.Password,
}
}
}
// 所有凭据都失败
return &ScanResult{
Success: false,
Service: "kafka",
Error: fmt.Errorf("未发现弱密码或未授权访问"),
}
}
// Exploit 执行Kafka利用操作 - 实现信息收集功能
func (p *KafkaPlugin) Exploit(ctx context.Context, info *common.HostInfo, creds Credential) *ExploitResult {
// 建立Kafka连接
client := p.testCredential(ctx, info, creds)
if client == nil {
return &ExploitResult{
Success: false,
Error: fmt.Errorf("Kafka连接失败"),
}
}
defer client.Close()
target := fmt.Sprintf("%s:%s", info.Host, info.Ports)
common.LogSuccess(fmt.Sprintf("Kafka利用开始: %s (用户: %s)", target, creds.Username))
var output strings.Builder
output.WriteString(fmt.Sprintf("=== Kafka利用结果 - %s ===\n", target))
// 获取集群元数据
if err := client.RefreshMetadata(); err == nil {
// 获取刷新后的元数据
brokers := client.Brokers()
topics, _ := client.Topics()
output.WriteString(fmt.Sprintf("\n[集群信息]\n"))
output.WriteString(fmt.Sprintf(" Broker数量: %d\n", len(brokers)))
// 显示Broker信息
for i, broker := range brokers {
if i >= 5 { // 限制显示前5个broker
output.WriteString(" ... (更多broker)\n")
break
}
output.WriteString(fmt.Sprintf(" Broker %d: %s\n", broker.ID(), broker.Addr()))
}
// 显示Topic列表
output.WriteString(fmt.Sprintf("\n[Topic列表] (共%d个)\n", len(topics)))
for i, topic := range topics {
if i >= 10 { // 限制显示前10个topic
output.WriteString(" ... (更多topic)\n")
break
}
// 获取topic分区数
if partitions, err := client.Partitions(topic); err == nil {
output.WriteString(fmt.Sprintf(" %s (分区数: %d)\n", topic, len(partitions)))
} else {
output.WriteString(fmt.Sprintf(" %s\n", topic))
}
}
}
// 获取消费者组信息
if groups, err := p.getConsumerGroups(client); err == nil && len(groups) > 0 {
output.WriteString(fmt.Sprintf("\n[消费者组] (共%d个)\n", len(groups)))
for i, group := range groups {
if i >= 5 { // 限制显示前5个组
output.WriteString(" ... (更多消费者组)\n")
break
}
output.WriteString(fmt.Sprintf(" %s\n", group))
}
}
// 尝试生产消息测试(如果有写权限)
if err := p.testProduceMessage(client, "fscan-test-topic"); err == nil {
output.WriteString(fmt.Sprintf("\n[权限测试] ✅ 成功发送测试消息\n"))
} else {
output.WriteString(fmt.Sprintf("\n[权限测试] ❌ 无生产者权限: %v\n", err))
}
common.LogSuccess(fmt.Sprintf("Kafka利用完成: %s", target))
return &ExploitResult{
Success: true,
Output: output.String(),
}
}
// testUnauthorizedAccess 测试未授权访问
func (p *KafkaPlugin) testUnauthorizedAccess(ctx context.Context, info *common.HostInfo) *ScanResult {
// 尝试无认证连接
emptyCred := Credential{Username: "", Password: ""}
if client := p.testCredential(ctx, info, emptyCred); client != nil {
client.Close()
return &ScanResult{
Success: true,
Service: "kafka",
Banner: "未授权访问",
}
}
return nil
}
// testCredential 测试单个凭据 - 返回Kafka客户端或nil
func (p *KafkaPlugin) testCredential(ctx context.Context, info *common.HostInfo, cred Credential) sarama.Client {
target := fmt.Sprintf("%s:%s", info.Host, info.Ports)
timeout := time.Duration(common.Timeout) * time.Second
// 创建Kafka配置
config := sarama.NewConfig()
config.Net.DialTimeout = timeout
config.Net.ReadTimeout = timeout
config.Net.WriteTimeout = timeout
config.Net.TLS.Enable = false
config.Version = sarama.V2_0_0_0
// 如果提供了用户名密码设置SASL认证
if cred.Username != "" || cred.Password != "" {
config.Net.SASL.Enable = true
config.Net.SASL.Mechanism = sarama.SASLTypePlaintext
config.Net.SASL.User = cred.Username
config.Net.SASL.Password = cred.Password
config.Net.SASL.Handshake = true
}
brokers := []string{target}
// 使用Context控制超时
type kafkaResult struct {
client sarama.Client
err error
}
clientChan := make(chan kafkaResult, 1)
go func() {
// 尝试创建客户端
client, err := sarama.NewClient(brokers, config)
select {
case <-ctx.Done():
if client != nil {
client.Close()
}
case clientChan <- kafkaResult{client, err}:
}
}()
// 等待客户端创建或超时
select {
case result := <-clientChan:
if result.err != nil {
return nil
}
return result.client
case <-ctx.Done():
return nil
}
}
// getConsumerGroups 获取消费者组列表
func (p *KafkaPlugin) getConsumerGroups(client sarama.Client) ([]string, error) {
// 创建协调器客户端获取消费者组信息
brokers := client.Brokers()
if len(brokers) == 0 {
return nil, fmt.Errorf("没有可用的broker")
}
broker := brokers[0] // 使用第一个broker
// 打开broker连接
if err := broker.Open(client.Config()); err != nil {
return nil, err
}
defer broker.Close()
// 发送ListGroups请求
request := &sarama.ListGroupsRequest{}
response, err := broker.ListGroups(request)
if err != nil {
return nil, err
}
groups := make([]string, 0, len(response.Groups))
for groupId := range response.Groups {
groups = append(groups, groupId)
}
return groups, nil
}
// testProduceMessage 测试发送消息
func (p *KafkaPlugin) testProduceMessage(client sarama.Client, topic string) error {
config := client.Config()
config.Producer.Return.Successes = true
config.Producer.Timeout = 5 * time.Second
producer, err := sarama.NewSyncProducerFromClient(client)
if err != nil {
return err
}
defer producer.Close()
// 发送测试消息
message := &sarama.ProducerMessage{
Topic: topic,
Value: sarama.StringEncoder("FScan security test message"),
}
_, _, err = producer.SendMessage(message)
return err
}
// identifyService 服务识别 - 检测Kafka服务
func (p *KafkaPlugin) identifyService(ctx context.Context, info *common.HostInfo) *ScanResult {
target := fmt.Sprintf("%s:%s", info.Host, info.Ports)
// 尝试无认证连接
emptyCred := Credential{Username: "", Password: ""}
client := p.testCredential(ctx, info, emptyCred)
if client == nil {
return &ScanResult{
Success: false,
Service: "kafka",
Error: fmt.Errorf("无法连接到Kafka服务"),
}
}
defer client.Close()
// 获取集群信息作为banner
var banner string
if err := client.RefreshMetadata(); err == nil {
brokers := client.Brokers()
topics, _ := client.Topics()
banner = fmt.Sprintf("Kafka集群 (Brokers: %d, Topics: %d)", len(brokers), len(topics))
} else {
banner = "Kafka服务"
}
common.LogSuccess(i18n.GetText("kafka_service_identified", target, banner))
return &ScanResult{
Success: true,
Service: "kafka",
Banner: banner,
}
}
// init 自动注册插件
func init() {
RegisterPlugin("kafka", func() Plugin {
return NewKafkaPlugin()
})
}