fscan/plugins/services/kafka.go
ZacharyZcR e082e2bb59 refactor: 重组插件目录结构,提升管理直观性
将所有服务插件移动到plugins/services/目录下,使目录结构更加清晰直观:
• 创建plugins/services/目录统一管理服务扫描插件
• 添加init.go提供类型别名和函数导出
• 更新main.go导入路径
• 所有20个服务插件功能验证正常

新的目录结构更便于插件管理和维护。
2025-08-26 00:02:13 +08:00

349 lines
8.9 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package services
import (
"context"
"fmt"
"strings"
"time"
"github.com/IBM/sarama"
"github.com/shadow1ng/fscan/common"
"github.com/shadow1ng/fscan/common/i18n"
)
// KafkaPlugin is the Kafka message-queue plugin. Its methods cover service
// identification, unauthenticated-access and weak-credential checks, and
// information gathering over an established connection.
type KafkaPlugin struct {
	// name is the identifier returned by GetName.
	name string
	// ports is the port list returned by GetPorts.
	ports []int
}
// NewKafkaPlugin returns a KafkaPlugin named "kafka" that lists ports 9092,
// 9093 and 9094.
func NewKafkaPlugin() *KafkaPlugin {
	plugin := new(KafkaPlugin)
	plugin.name = "kafka"
	plugin.ports = []int{9092, 9093, 9094} // Kafka broker ports
	return plugin
}
// GetName returns the plugin's name. It is part of the Plugin interface.
func (p *KafkaPlugin) GetName() string {
	return p.name
}
// GetPorts returns the plugin's port list. It is part of the Plugin interface.
// The returned slice is the plugin's own slice, not a copy.
func (p *KafkaPlugin) GetPorts() []int {
	return p.ports
}
// Scan probes the Kafka service at info.Host:info.Ports.
//
// When common.DisableBrute is set, only service identification is performed.
// Otherwise an unauthenticated connection is attempted first, followed by
// each candidate credential in turn; the first success is logged and
// returned. If ctx is cancelled between attempts, the result carries
// ctx.Err(). When nothing works, a failed result with an explanatory error
// is returned.
func (p *KafkaPlugin) Scan(ctx context.Context, info *common.HostInfo) *ScanResult {
	target := fmt.Sprintf("%s:%s", info.Host, info.Ports)

	// Brute forcing disabled: just identify the service.
	if common.DisableBrute {
		return p.identifyService(ctx, info)
	}

	// A broker that accepts unauthenticated clients needs no credentials,
	// so that case is checked before any guessing.
	unauth := p.testUnauthorizedAccess(ctx, info)
	if unauth != nil && unauth.Success {
		common.LogSuccess(i18n.GetText("kafka_unauthorized_success", target))
		return unauth
	}

	// Use the generated candidates, falling back to common Kafka defaults
	// when none are produced.
	candidates := GenerateCredentials("kafka")
	if len(candidates) == 0 {
		candidates = []Credential{
			{Username: "admin", Password: "admin"},
			{Username: "admin", Password: ""},
			{Username: "kafka", Password: "kafka"},
			{Username: "user", Password: "user"},
			{Username: "test", Password: "test"},
		}
	}

	for _, candidate := range candidates {
		// Stop guessing as soon as the caller cancels the scan.
		if err := ctx.Err(); err != nil {
			return &ScanResult{
				Success: false,
				Service: "kafka",
				Error:   err,
			}
		}

		client := p.testCredential(ctx, info, candidate)
		if client == nil {
			continue
		}

		// The connection was only needed to validate the login.
		client.Close()
		common.LogSuccess(i18n.GetText("kafka_scan_success", target, candidate.Username, candidate.Password))
		return &ScanResult{
			Success:  true,
			Service:  "kafka",
			Username: candidate.Username,
			Password: candidate.Password,
		}
	}

	// Every candidate was rejected.
	return &ScanResult{
		Success: false,
		Service: "kafka",
		Error:   fmt.Errorf("未发现弱密码或未授权访问"),
	}
}
// Exploit connects to the Kafka service with creds and builds a text report.
//
// The report lists, when obtainable: the broker count and up to five
// brokers, the topic count and up to ten topics with their partition counts,
// and up to five consumer groups. It ends with the outcome of producing a
// test message to the topic "fscan-test-topic"; note that this writes a real
// message to the target.
//
// ctx is only used while establishing the connection. The result is
// unsuccessful only if the connection cannot be established; failures of the
// individual collection steps are reflected in the report, not in Success.
func (p *KafkaPlugin) Exploit(ctx context.Context, info *common.HostInfo, creds Credential) *ExploitResult {
	client := p.testCredential(ctx, info, creds)
	if client == nil {
		return &ExploitResult{
			Success: false,
			Error:   fmt.Errorf("Kafka连接失败"),
		}
	}
	defer client.Close()

	target := fmt.Sprintf("%s:%s", info.Host, info.Ports)
	common.LogSuccess(fmt.Sprintf("Kafka利用开始: %s (用户: %s)", target, creds.Username))

	var report strings.Builder
	fmt.Fprintf(&report, "=== Kafka利用结果 - %s ===\n", target)

	// Cluster metadata: brokers and topics. Skipped entirely if the refresh fails.
	if err := client.RefreshMetadata(); err == nil {
		brokers := client.Brokers()
		topics, _ := client.Topics()

		report.WriteString("\n[集群信息]\n")
		fmt.Fprintf(&report, " Broker数量: %d\n", len(brokers))

		// Show at most the first five brokers.
		shownBrokers := brokers
		if len(shownBrokers) > 5 {
			shownBrokers = shownBrokers[:5]
		}
		for _, broker := range shownBrokers {
			fmt.Fprintf(&report, " Broker %d: %s\n", broker.ID(), broker.Addr())
		}
		if len(brokers) > 5 {
			report.WriteString(" ... (更多broker)\n")
		}

		// Show at most the first ten topics.
		fmt.Fprintf(&report, "\n[Topic列表] (共%d个)\n", len(topics))
		shownTopics := topics
		if len(shownTopics) > 10 {
			shownTopics = shownTopics[:10]
		}
		for _, topic := range shownTopics {
			// The partition count is appended only when it can be looked up.
			if partitions, err := client.Partitions(topic); err == nil {
				fmt.Fprintf(&report, " %s (分区数: %d)\n", topic, len(partitions))
			} else {
				fmt.Fprintf(&report, " %s\n", topic)
			}
		}
		if len(topics) > 10 {
			report.WriteString(" ... (更多topic)\n")
		}
	}

	// Consumer groups: the section is omitted on error or when there are none.
	if groups, err := p.getConsumerGroups(client); err == nil && len(groups) > 0 {
		fmt.Fprintf(&report, "\n[消费者组] (共%d个)\n", len(groups))
		shownGroups := groups
		if len(shownGroups) > 5 {
			shownGroups = shownGroups[:5]
		}
		for _, group := range shownGroups {
			fmt.Fprintf(&report, " %s\n", group)
		}
		if len(groups) > 5 {
			report.WriteString(" ... (更多消费者组)\n")
		}
	}

	// Write-permission probe: try to produce one message.
	if err := p.testProduceMessage(client, "fscan-test-topic"); err != nil {
		fmt.Fprintf(&report, "\n[权限测试] ❌ 无生产者权限: %v\n", err)
	} else {
		report.WriteString("\n[权限测试] ✅ 成功发送测试消息\n")
	}

	common.LogSuccess(fmt.Sprintf("Kafka利用完成: %s", target))
	return &ExploitResult{
		Success: true,
		Output:  report.String(),
	}
}
// testUnauthorizedAccess checks whether the broker accepts a connection made
// with empty credentials. It returns a successful ScanResult if so, and nil
// otherwise.
func (p *KafkaPlugin) testUnauthorizedAccess(ctx context.Context, info *common.HostInfo) *ScanResult {
	// Empty username and password: connect without authentication.
	client := p.testCredential(ctx, info, Credential{})
	if client == nil {
		return nil
	}
	client.Close()

	return &ScanResult{
		Success: true,
		Service: "kafka",
		Banner:  "未授权访问",
	}
}
// testCredential tries to open a Kafka client against info.Host:info.Ports
// using cred.
//
// When both cred.Username and cred.Password are empty the connection is made
// without authentication; otherwise SASL/PLAIN with handshake is enabled.
// TLS is disabled, the protocol version is pinned to 2.0.0, and the dial,
// read and write timeouts are all common.Timeout seconds.
//
// It returns the connected client, which the caller must Close, or nil if
// the connection fails or ctx is cancelled first.
func (p *KafkaPlugin) testCredential(ctx context.Context, info *common.HostInfo, cred Credential) sarama.Client {
	// Do not dial at all when the caller has already given up.
	if ctx.Err() != nil {
		return nil
	}

	target := fmt.Sprintf("%s:%s", info.Host, info.Ports)
	timeout := time.Duration(common.Timeout) * time.Second

	config := sarama.NewConfig()
	config.Net.DialTimeout = timeout
	config.Net.ReadTimeout = timeout
	config.Net.WriteTimeout = timeout
	config.Net.TLS.Enable = false
	config.Version = sarama.V2_0_0_0

	// Any non-empty field switches the attempt to SASL/PLAIN authentication.
	if cred.Username != "" || cred.Password != "" {
		config.Net.SASL.Enable = true
		config.Net.SASL.Mechanism = sarama.SASLTypePlaintext
		config.Net.SASL.User = cred.Username
		config.Net.SASL.Password = cred.Password
		config.Net.SASL.Handshake = true
	}

	brokers := []string{target}

	type kafkaResult struct {
		client sarama.Client
		err    error
	}

	// sarama.NewClient takes no context, so it runs in a worker goroutine and
	// the result is raced against ctx. The channel is buffered so the worker
	// can always deliver its result and exit, even when nobody is waiting.
	clientChan := make(chan kafkaResult, 1)
	go func() {
		client, err := sarama.NewClient(brokers, config)
		clientChan <- kafkaResult{client, err}
	}()

	select {
	case result := <-clientChan:
		if result.err != nil {
			return nil
		}
		return result.client
	case <-ctx.Done():
		// The worker may still finish with a connected client after we have
		// returned. Reap it so the connection is closed instead of being
		// left unreceived in the channel.
		go func() {
			if late := <-clientChan; late.client != nil {
				late.client.Close()
			}
		}()
		return nil
	}
}
// getConsumerGroups returns the IDs of the consumer groups reported by the
// first broker in client.Brokers(), obtained with a ListGroups request.
//
// The IDs are collected by ranging over the response's Groups map, so their
// order is unspecified. An error is returned when the client knows no
// brokers, the broker connection cannot be opened, the request fails, or the
// response carries a Kafka error code.
func (p *KafkaPlugin) getConsumerGroups(client sarama.Client) ([]string, error) {
	brokers := client.Brokers()
	if len(brokers) == 0 {
		return nil, fmt.Errorf("没有可用的broker")
	}
	broker := brokers[0] // any broker will do; take the first one

	// The broker object belongs to the client, which may already have opened
	// it; Open then reports ErrAlreadyConnected. That is not a failure: reuse
	// the connection, and leave closing it to the client. Only a connection
	// opened here is closed here. The sentinel is compared directly because
	// Open returns it unwrapped.
	switch err := broker.Open(client.Config()); err {
	case nil:
		defer broker.Close()
	case sarama.ErrAlreadyConnected:
		// Connection already established by the client; nothing to do.
	default:
		return nil, err
	}

	response, err := broker.ListGroups(&sarama.ListGroupsRequest{})
	if err != nil {
		return nil, err
	}
	// A request can succeed at transport level yet be refused by the broker;
	// surface that instead of returning a misleading empty list.
	if response.Err != sarama.ErrNoError {
		return nil, response.Err
	}

	groups := make([]string, 0, len(response.Groups))
	for groupID := range response.Groups {
		groups = append(groups, groupID)
	}
	return groups, nil
}
// testProduceMessage probes write access by sending one fixed test message
// to topic through a synchronous producer built on client. It returns nil if
// the message is sent, otherwise the producer-creation or send error.
//
// Side effect: it sets Producer.Return.Successes and a five-second
// Producer.Timeout on the config object returned by client.Config().
func (p *KafkaPlugin) testProduceMessage(client sarama.Client, topic string) error {
	cfg := client.Config()
	cfg.Producer.Return.Successes = true // sarama's SyncProducer requires this setting
	cfg.Producer.Timeout = 5 * time.Second

	producer, err := sarama.NewSyncProducerFromClient(client)
	if err != nil {
		return err
	}
	defer producer.Close()

	// Partition and offset of the sent message are not needed, only the error.
	_, _, err = producer.SendMessage(&sarama.ProducerMessage{
		Topic: topic,
		Value: sarama.StringEncoder("FScan security test message"),
	})
	return err
}
// identifyService detects a Kafka service by connecting with empty
// credentials. On success it logs and returns a result whose banner holds
// the broker and topic counts, or a generic banner when the metadata refresh
// fails. If no connection can be made, a failed result is returned.
func (p *KafkaPlugin) identifyService(ctx context.Context, info *common.HostInfo) *ScanResult {
	target := fmt.Sprintf("%s:%s", info.Host, info.Ports)

	// Empty username and password: connect without authentication.
	client := p.testCredential(ctx, info, Credential{})
	if client == nil {
		return &ScanResult{
			Success: false,
			Service: "kafka",
			Error:   fmt.Errorf("无法连接到Kafka服务"),
		}
	}
	defer client.Close()

	// Start with the generic banner and upgrade it when cluster metadata is
	// available.
	banner := "Kafka服务"
	if err := client.RefreshMetadata(); err == nil {
		brokers := client.Brokers()
		topics, _ := client.Topics()
		banner = fmt.Sprintf("Kafka集群 (Brokers: %d, Topics: %d)", len(brokers), len(topics))
	}

	common.LogSuccess(i18n.GetText("kafka_service_identified", target, banner))
	return &ScanResult{
		Success: true,
		Service: "kafka",
		Banner:  banner,
	}
}
// init 自动注册插件
func init() {
RegisterPlugin("kafka", func() Plugin {
return NewKafkaPlugin()
})
}