package services import ( "context" "encoding/hex" "fmt" "net" "time" "github.com/shadow1ng/fscan/common" ) type SNMPPlugin struct { name string ports []int } func NewSNMPPlugin() *SNMPPlugin { return &SNMPPlugin{ name: "snmp", ports: []int{161, 162}, } } func (p *SNMPPlugin) Name() string { return p.name } func (p *SNMPPlugin) GetPorts() []int { return p.ports } func (p *SNMPPlugin) Scan(ctx context.Context, info *common.HostInfo) *ScanResult { target := fmt.Sprintf("%s:%s", info.Host, info.Ports) if common.DisableBrute { return p.identifyService(ctx, info) } credentials := GenerateCredentials("snmp") if len(credentials) == 0 { return &ScanResult{ Success: false, Service: "snmp", Error: fmt.Errorf("没有可用的测试凭据"), } } for _, cred := range credentials { if p.testCredential(ctx, info, cred) { common.LogSuccess(fmt.Sprintf("SNMP %s %s", target, cred.Username)) return &ScanResult{ Success: true, Service: "snmp", Username: cred.Username, Password: "", } } } return &ScanResult{ Success: false, Service: "snmp", Error: fmt.Errorf("未发现弱团体字符串"), } } func (p *SNMPPlugin) testCredential(ctx context.Context, info *common.HostInfo, cred Credential) bool { target := fmt.Sprintf("%s:%s", info.Host, info.Ports) conn, err := net.DialTimeout("udp", target, time.Duration(common.Timeout)*time.Second) if err != nil { return false } defer conn.Close() packet := p.buildSNMPGetRequest(cred.Username, "1.3.6.1.2.1.1.1.0") conn.SetWriteDeadline(time.Now().Add(time.Duration(common.Timeout) * time.Second)) if _, err := conn.Write(packet); err != nil { return false } conn.SetReadDeadline(time.Now().Add(time.Duration(common.Timeout) * time.Second)) response := make([]byte, 1024) n, err := conn.Read(response) if err != nil { return false } return p.isValidSNMPResponse(response[:n]) } func (p *SNMPPlugin) buildSNMPGetRequest(community, oid string) []byte { template := "302902010004067075626c6963a01c02020f7102010002010030113015060a2b060102010101000500" packet, err := hex.DecodeString(template) if err != nil { return nil } return packet } func (p *SNMPPlugin) isValidSNMPResponse(data []byte) bool { if len(data) < 10 { return false } return data[0] == 0x30 } func (p *SNMPPlugin) identifyService(ctx context.Context, info *common.HostInfo) *ScanResult { target := fmt.Sprintf("%s:%s", info.Host, info.Ports) cred := Credential{Username: "public", Password: ""} if p.testCredential(ctx, info, cred) { banner := "SNMP网络管理服务 (public团体可访问)" common.LogSuccess(fmt.Sprintf("SNMP %s %s", target, banner)) return &ScanResult{ Success: true, Service: "snmp", Banner: banner, } } conn, err := net.DialTimeout("udp", target, time.Duration(common.Timeout)*time.Second) if err != nil { return &ScanResult{ Success: false, Service: "snmp", Error: err, } } defer conn.Close() banner := "SNMP网络管理服务" common.LogSuccess(fmt.Sprintf("SNMP %s %s", target, banner)) return &ScanResult{ Success: true, Service: "snmp", Banner: banner, } } func init() { // 使用高效注册方式:直接传递端口信息,避免实例创建 RegisterPluginWithPorts("snmp", func() Plugin { return NewSNMPPlugin() }, []int{161, 162}) }