巡风源码的详细分析 | 申博官网
登录
  • 欢迎进入申博官网!
  • 如果您觉得申博官网对你有帮助,那么赶紧使用Ctrl+D 收藏申博官网并分享出去吧
  • 这里是申博官方网!
  • 申博官网是菲律宾sunbet官网品牌平台!
  • 申博开户专业品牌平台!

巡风源码的详细分析

申博_安全预警 申博 157次浏览 已收录 0个评论

媒介

由于一些须要,和抱着进修的目标,研读了下巡风这款相称优异的扫描器代码。

重要剖析了下两个扫描的模块,对web端没有跟进看,固然重点也在扫描的局部。

剖析的语句都以解释的情势标注在代码中了,由于才能有限,剖析中的缺乏和毛病迎接指出。

团体架构逻辑

文件构造

│  Config.py  # 设置装备摆设文件
│  README.md  # 申明文档
│  Run.bat  # Windows启动效劳
│  Run.py  # webserver
│  Run.sh    # Linux启动效劳,从新启动前需把历程先完毕掉
│
├─aider
│      Aider.py  # 辅佐考证剧本
│
├─db  # 初始数据库构造
│
├─masscan  # 内置编译好的Masscan顺序(CentOS win64实用),须要chmod+x给实行权限(root),若没法运用请自行编译装置。
├─nascan
│  │  NAScan.py # 收集资产信息抓取引擎
│  │
│  ├─lib
│  │      common.py 其他要领
│  │      icmp.py  # ICMP发送类
│  │      log.py  # 日记输出
│  │      mongo.py  # 数据库衔接
│  │      scan.py  # 扫描与辨认
│  │      start.py  # 线程掌握
│  │
│  └─plugin
│          masscan.py  # 挪用Masscan剧本
│
├─views
│  │  View.py  # web要求处置惩罚
│  │
│  ├─lib
│  │      Conn.py  # 数据库大众类
│  │      CreateExcel.py  # 表格处置惩罚
│  │      Login.py  # 权限考证
│  │      QueryLogic.py  # 查询语句剖析
│  │
│  ├─static #静态资本目次
│  │
│  └─templates #模板文件目次
│
└─vulscan
    │  VulScan.py  # 破绽检测引擎
    │
    └─vuldb # 破绽库目次

Run.sh

全部顺序的最先就是从Run.sh最先的,能够先来看下起了哪些效劳

#!/bin/bash
CURRENT_PATH=`dirname $0`
cd $CURRENT_PATH

XUNFENG_LOG=/var/log/xunfeng
XUNFENG_DB=/var/lib/mongodb

[ ! -d $XUNFENG_LOG ] && mkdir -p ${XUNFENG_LOG}
[ ! -d $XUNFENG_DB ] && mkdir -p ${XUNFENG_DB}

nohup mongod --port 65521 --dbpath=${XUNFENG_DB} --auth  > ${XUNFENG_LOG}/db.log &
nohup python ./Run.py > ${XUNFENG_LOG}/web.log & 
nohup python ./aider/Aider.py > ${XUNFENG_LOG}/aider.log & 
nohup python ./nascan/NAScan.py > ${XUNFENG_LOG}/scan.log & 
nohup python ./vulscan/VulScan.py > ${XUNFENG_LOG}/vul.log &

能够看到重要起了以下四个效劳

Run.py

from views.View import app

if __name__ == '__main__':
    #app.debug = True
    app.run(threaded=True, port=80,host='0.0.0.0')

webserver能够看出这个是flask起的web端,内里重若是做一些数据的展现和修正的。由于不是扫描器的重点,这里就不详细剖析了,能够本身看下代码。

Aider.py

辅佐考证剧本,一个50行摆布的单文件,运用socket完成了一个简朴的DNS log平台。

NAScan.py

收集资产信息抓取引擎 重若是挪用nascan这个模块来举行收集资产(存活主机、开辟端口、效劳)的扫描。

VulScan.py

破绽检测引擎 重若是挪用vulscan/vuldb中的poc举行破绽检测。

nascan

模块构造

─nascan
  │  NAScan.py # 收集资产信息抓取引擎
  │
  ├─lib
  │      common.py 其他要领
  │      icmp.py  # ICMP发送类
  │      log.py  # 日记输出
  │      mongo.py  # 数据库衔接
  │      scan.py  # 扫描与辨认
  │      start.py  # 线程掌握
  └─plugin
          masscan.py  # 挪用Masscan剧本

NAScan.py文件进口

# coding:utf-8
# author:wolf@YSRC
import thread
from lib.common import *
from lib.start import *
if __name__ == "__main__":
    try:
        CONFIG_INI = get_config()  # 读取设置装备摆设
        log.write('info', None, 0, u'猎取设置装备摆设胜利') # 日记纪录
        STATISTICS = get_statistics()  # 读取统计信息
        MASSCAN_AC = [0] # 标识符 masscan是不是在运用
        NACHANGE = [0] # 标识符 扫描列表是不是被转变
        thread.start_new_thread(monitor, (CONFIG_INI,STATISTICS,NACHANGE))  # 心跳线程
        thread.start_new_thread(cruise, (STATISTICS,MASSCAN_AC))  # 失效纪录删除线程
        socket.setdefaulttimeout(int(CONFIG_INI['Timeout']) / 2)  # 设置衔接超时
        ac_data = []
        while True:
            now_time = time.localtime()
            now_hour = now_time.tm_hour
            now_day = now_time.tm_mday
            now_date = str(now_time.tm_year) + str(now_time.tm_mon) + str(now_day)
            cy_day, ac_hour = CONFIG_INI['Cycle'].split('|')
            log.write('info', None, 0, u'扫描划定规矩: ' + str(CONFIG_INI['Cycle']))
            if (now_hour == int(ac_hour) and now_day % int(cy_day) == 0 and now_date not in ac_data) or NACHANGE[0]:  # 推断是不是进入扫描时段
                ac_data.append(now_date)
                NACHANGE[0] = 0
                log.write('info', None, 0, u'最先扫描')
                s = start(CONFIG_INI)
                s.masscan_ac = MASSCAN_AC
                s.statistics = STATISTICS
                s.run() # 最先扫描
            time.sleep(60)
    except Exception, e:
        print e

预备事情

一最先是猎取设置装备摆设信息

def get_config():
    config = {}
    # 从mongodb中读取`nascan`的设置装备摆设,能够从navicat中看到Config鸠合中有`vulscan`和`nascan`的扫描设置装备摆设
    config_info = mongo.na_db.Config.find_one({"type": "nascan"})
    for name in config_info['config']:
        # 关于cms辨认、组件容器、动态言语、效劳 的设置装备摆设存储是运用`|`举行支解存储的
        # 以是在掏出之前要举行简朴的花样化然后放到设置装备摆设中
        if name in ['Discern_cms', 'Discern_con', 'Discern_lang', 'Discern_server']:
            config[name] = format_config(name, config_info['config'][name]['value'])
        else:
            config[name] = config_info['config'][name]['value']
    return config

巡风源码的详细分析

巡风源码的详细分析

然后是举行日记纪录

# coding:utf-8
import threading
import time
import sys
reload(sys)
sys.setdefaultencoding('utf8')
mutex = threading.Lock() # 线程互斥锁
def write(scan_type, host, port, info):
    mutex.acquire() # 上锁,制止多个历程输出,致使花样杂沓
    port = int(port)
    try:  # 由于Run.sh中运用了nohup,以是`print`的输出会被输出到log文件中
        time_str = time.strftime('%X', time.localtime(time.time()))
        if scan_type == 'portscan':
            print "[%s] %s:%d open" % (time_str, host, port)
        elif scan_type == 'server':
            print "[%s] %s:%d is %s" % (time_str, host, port, str(info))
        elif scan_type == 'web':
            print "[%s] %s:%d is web" % (time_str, host, port)
            print "[%s] %s:%d web info %s" % (time_str, host, port, info)
        elif scan_type == 'active':
            print "[%s] %s active" % (time_str, host)
        elif scan_type == 'info':
            print "[%s] %s" % (time_str, info)
    except Exception, e:
        print 'logerror',e
        pass
    mutex.release()

以后举行读取统计信息

def get_statistics():
    date_ = datetime.datetime.now().strftime('%Y-%m-%d')
    # 猎取当日的统计信息
    now_stati = mongo.na_db.Statistics.find_one({"date": date_}) 
    if not now_stati:
        # 没有当日的信息则返回一个初始统计信息
        now_stati = {date_: {"add": 0, "update": 0, "delete": 0}}
        return now_stati
    else:
        # 有则返回
        return {date_: now_stati['info']}

两个监测线程

以后启动了两个现场,离别对应分歧的功用

thread.start_new_thread(monitor, (CONFIG_INI,STATISTICS,NACHANGE))  # 心跳线程
thread.start_new_thread(cruise, (STATISTICS,MASSCAN_AC))  # 失效纪录删除线程

monitor

monitor心跳线程,重要用于推断扫描设置装备摆设是不是发生了转变

def monitor(CONFIG_INI, STATISTICS, NACHANGE):
    while True:
        try:
            time_ = datetime.datetime.now()
            date_ = time_.strftime('%Y-%m-%d')
            # 纪录心跳
            mongo.na_db.Heartbeat.update({"name": "heartbeat"}, {"$set": {"up_time": time_}})
            if date_ not in STATISTICS: STATISTICS[date_] = {"add": 0, "update": 0, "delete": 0}
            # 更新统计信息
            mongo.na_db.Statistics.update({"date": date_}, {"$set": {"info": STATISTICS[date_]}}, upsert=True)
            new_config = get_config() # 猎取最新设置装备摆设
            # 对照设置装备摆设扫描列表的base64是不是雷同,分歧则置NACHANGE[0]为1
            if base64.b64encode(CONFIG_INI["Scan_list"]) != base64.b64encode(new_config["Scan_list"]):NACHANGE[0] = 1
            CONFIG_INI.clear() 
            CONFIG_INI.update(new_config) # 更新新设置装备摆设
        except Exception, e:
            print e
        time.sleep(30) # 每30秒检测一次

回到NAScan.py中能够看到

# 推断是不是达到了一个扫描的周期,或许心跳线程是不是检测到扫描列表更新
# 由于上面能够看到base64分歧时会将NACHANGE[0]置于1
# 至于为何要传入NACHANGE[0]如许一个列表,而不是一个flag的int值(由于列表是援用啊!
if (now_hour == int(ac_hour) and now_day % int(cy_day) == 0 and now_date not in ac_data) or NACHANGE[0]:

cruise

然后是cruise 失效纪录删除线程

def cruise(STATISTICS,MASSCAN_AC):
    while True:
        now_str = datetime.datetime.now()
        week = int(now_str.weekday())
        hour = int(now_str.hour)
        if week >= 1 and week <= 5 and hour >= 9 and hour <= 18:  # 非事情时候不删除
            try:
                # 猎取扫描信息纪录
                data = mongo.NA_INFO.find().sort("time", 1)
                for history_info in data:
                    while True:
                        # 若是masscan正在扫描即不举行清算
                        # 在后期能够看到在用masscan举行扫描的时刻会置1
                        if MASSCAN_AC[0]:  
                            time.sleep(10)
                        else:
                            break
                    ip = history_info['ip']
                    port = history_info['port']
                    try:
                        # 检测端口是不是存活
                        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                        sock.connect((ip, int(port)))
                        sock.close()
                    except Exception, e:
                        time_ = datetime.datetime.now()
                        date_ = time_.strftime('%Y-%m-%d')
                        # 不存活则删除改纪录
                        mongo.NA_INFO.remove({"ip": ip, "port": port})
                        log.write('info', None, 0, '%s:%s delete' % (ip, port)) # 日记纪录
                        STATISTICS[date_]['delete'] += 1
                        del history_info["_id"]
                        history_info['del_time'] = time_
                        history_info['type'] = 'delete'
                        # 增添一条操纵汗青
                        mongo.NA_HISTORY.insert(history_info)
            except:
                pass
        time.sleep(3600) # 60分钟检测一次

start.py

回到NAScan.py, 前期的一些事情已做完了,背面就能够进入while True的扫描轮回了

now_time = time.localtime()
now_hour = now_time.tm_hour
now_day = now_time.tm_mday
now_date = str(now_time.tm_year) + str(now_time.tm_mon) + str(now_day)
# 猎取资产探测周期
cy_day, ac_hour = CONFIG_INI['Cycle'].split('|')
log.write('info', None, 0, u'扫描划定规矩: ' + str(CONFIG_INI['Cycle']))
if (now_hour == int(ac_hour) and now_day % int(cy_day) == 0 and now_date not in ac_data) or NACHANGE[0]:  # 推断是不是进入扫描时段
    ac_data.append(now_date) # 推断是不是扫描过的列表
    NACHANGE[0] = 0 # 置0,
    log.write('info', None, 0, u'最先扫描')
    s = start(CONFIG_INI)
    s.masscan_ac = MASSCAN_AC
    s.statistics = STATISTICS
    s.run() # 最先扫描
    time.sleep(60)

s = start(CONFIG_INI)初始化了一个start

class start:
    def __init__(self, config):  
        # 传入CONFIG_INI 设置装备摆设,然后设置类的属性
        self.config_ini = config
        self.queue = Queue.Queue()
        self.thread = int(self.config_ini['Thread'])
        self.scan_list = self.config_ini['Scan_list'].split('\n')
        self.mode = int(self.config_ini['Masscan'].split('|')[0])
        self.icmp = int(self.config_ini['Port_list'].split('|')[0])
        self.white_list = self.config_ini.get('White_list', '').split('\n')

然后返来分外设置了masscan_acstatistics两个援用标识符(由于要与其他线程同享对它的修正,相称于全局变量

然后启动s.run()最先扫描

def run(self):
    # 在start.py中界说的全局变量,端口列表
    global AC_PORT_LIST
    all_ip_list = []
    for ip in self.scan_list:
        # 处置惩罚CIDR花样的ip, eg:192.168.0.1/24
        # 就不详细跟进看了,约莫40行摆布,触及一些位运算花样转换啥的
        if "/" in ip: ip = cidr.CIDR(ip)
        if not ip:continue
        # 处置惩罚 192.168.0.1-192.168.0.255 这类局限ip
        ip_list = self.get_ip_list(ip)
        # 关于白名单ip举行移除
        for white_ip in self.white_list:
            if white_ip in ip_list:
                ip_list.remove(white_ip)
       # 是不是最先了masscan扫描,开启了mode置为1,不然为0             
       if self.mode == 1: # 运用masscan扫描
            # 猎取文件途径
            self.masscan_path = self.config_ini['Masscan'].split('|')[2]
            # 猎取扫描速度
            self.masscan_rate = self.config_ini['Masscan'].split('|')[1]
            # 猎取存活的ip
            ip_list = self.get_ac_ip(ip_list)
            self.masscan_ac[0] = 1
            AC_PORT_LIST = self.masscan(ip_list)  # 若是装置了Masscan即运用Masscan举行全端口扫描
            if not AC_PORT_LIST: continue
            self.masscan_ac[0] = 0
            for ip_str in AC_PORT_LIST.keys(): self.queue.put(ip_str)  # 到场行列
            self.scan_start()  # 最先扫描
        else:
            all_ip_list.extend(ip_list)
    if self.mode == 0: # 不运用masscan扫描
        if self.icmp: all_ip_list = self.get_ac_ip(all_ip_list)
        for ip_str in all_ip_list: self.queue.put(ip_str)  # 到场行列
        self.scan_start()  # TCP探测形式最先扫描

探测存活ip

self.get_ac_ip()是经由历程ping要求来探测主机存活,后期只对存活主机举行扫描

def get_ac_ip(self, ip_list):
    try:
        s = icmp.Nscan()
        ipPool = set(ip_list)
        return s.mPing(ipPool)
    except Exception, e:
        print 'The current user permissions unable to send icmp packets'
        return ip_list

跟到s.mPing()

def mPing(self, ipPool):
    # 取得icmp的socket
    Sock = self.__icmpSocket
    Sock.settimeout(self.timeout)
    # 设置icmp数据报
    packet = self.__icmpPacket
    recvFroms = set()
    # 初始化一个多线程的icmp要求类
    sendThr = SendPingThr(ipPool, packet, Sock, self.timeout)
    # 启动多线程icmp扫描
    sendThr.start()
    while True:
        try:
            # 猎取返回的ip地点
            ac_ip = Sock.recvfrom(1024)[1][0]
            if ac_ip not in recvFroms:
                log.write("active", ac_ip, 0, None)
                # 增添存活ip到`recvForms`
                recvFroms.add(ac_ip)
        except Exception:
            pass
        finally:
            if not sendThr.isAlive():
                break
    # 返回两个鸠合的交集
    return recvFroms & ipPool

SendPingThr

class SendPingThr(threading.Thread):
    def __init__(self, ipPool, icmpPacket, icmpSocket, timeout=3):
        threading.Thread.__init__(self)
        self.Sock = icmpSocket
        self.ipPool = ipPool
        self.packet = icmpPacket
        self.timeout = timeout
        self.Sock.settimeout(timeout + 1)

    def run(self):
        for ip in self.ipPool:
            try:
                self.Sock.sendto(self.packet, (ip, 0))
            except socket.timeout:
                break
            except:
                pass
        time.sleep(self.timeout)

masscan扫描全端口

如许就顺次将存活的ip返回到了start.py中的run()

# 猎取到返回的存活ip
ip_list = self.get_ac_ip(ip_list)
# 将masscan_ac[0]置1,透露表现masscan正在运用
self.masscan_ac[0] = 1
# 应用masscan举行全端口扫描
AC_PORT_LIST = self.masscan(ip_list) 
if not AC_PORT_LIST: continue
# 将masscan_ac[0]置0
self.masscan_ac[0] = 0
for ip_str in AC_PORT_LIST.keys(): self.queue.put(ip_str)  # 到场行列
self.scan_start()  # 最先扫描

跟进self.masscan()函数

def masscan(self, ip):
    try:
        if len(ip) == 0: return
        sys.path.append(sys.path[0] + "/plugin")
        m_scan = __import__("masscan")
        result = m_scan.run(ip, self.masscan_path, self.masscan_rate)
        return result
    except Exception, e:
        print e
        print 'No masscan plugin detected'

跟进m_scan.run()

import os
def run(ip_list,path,rate):
    try:
        ip_file = open('target.log','w')
        # 将存活的ip列表写到target.log中
        ip_file.write("\n".join(ip_list))
        ip_file.close()
        # 举行过滤一些风险字符
        #(issue中也有提到,并不能完整包管背景的平安,重要照样包管对密钥的治理
        path = str(path).translate(None, ';|&`\n')
        rate = str(rate).translate(None, ';|&`\n')
        if not os.path.exists(path):return
        # 用体系敕令举行masscan全端口扫描
        os.system("%s -p1-65535 -iL target.log -oL tmp.log --randomize-hosts --rate=%s"%(path,rate))
        # 读取扫描效果
        result_file = open('tmp.log', 'r')
        result_json = result_file.readlines()
        result_file.close()
        del result_json[0]
        del result_json[-1]
        open_list = {}
        # 对扫描效果举行花样化处置惩罚
        for res in result_json:
            try:
                ip = res.split()[3]
                port = res.split()[2]
                if ip in open_list:
                    open_list[ip].append(port)
                else:
                    open_list[ip] = [port]
            except:pass
        os.remove('target.log')
        os.remove('tmp.log')
        # 返回扫描效果
        return open_list
    except:
        pass

如许,再次回到start.pyrun()

# 用masscan举行全端口扫描
AC_PORT_LIST = self.masscan(ip_list)
if not AC_PORT_LIST: continue
# 将self.masscan_ac[0]置0,透露表现完毕运用
self.masscan_ac[0] = 0
# 将扫描效果存入行列中
for ip_str in AC_PORT_LIST.keys(): self.queue.put(ip_str)
# 最先扫描
self.scan_start()

scan.py

前期预备

self.scan_start()

def scan_start(self):
    for i in range(self.thread):  # 最先扫描
        t = ThreadNum(self.queue)
        t.setDaemon(True)
        t.mode = self.mode
        t.config_ini = self.config_ini
        t.statistics = self.statistics
        t.start()
    self.queue.join()

跟进ThreadNum

class ThreadNum(threading.Thread):
    def __init__(self, queue):
        # 赋值扫描行列
        threading.Thread.__init__(self)
        self.queue = queue

    def run(self):
        while True:
            try:
                # 非壅塞形式
                task_host = self.queue.get(block=False)
            except:
                break
            try:
                if self.mode:
                    # 开启masscan扫描则运用扫描出的存活端口
                    port_list = AC_PORT_LIST[task_host]
                else:
                    # 不然扫描特定的端口
                    port_list = self.config_ini['Port_list'].split('|')[1].split('\n')
                _s = scan.scan(task_host, port_list) # 初始化scan
                _s.config_ini = self.config_ini  # 供应设置装备摆设信息
                _s.statistics = self.statistics  # 供应统计信息
                _s.run() # 启动
            except Exception, e:
                print e
            finally:
                self.queue.task_done()

跟到scan类的run()要领中

def run(self):
    self.timeout = int(self.config_ini['Timeout']) # 猎取timeout
    for _port in self.port_list:
        self.server = ''
        self.banner = ''
        self.port = int(_port)
        self.scan_port()  # 端口扫描
        if not self.banner:continue #无banner则跳过(`NULL`透露表现暂未检测出,不会continue
        self.server_discern()  # 效劳辨认
        if self.server == '':
            web_info = self.try_web()  # 实验web接见
            if web_info:
                # log纪录
                log.write('web', self.ip, self.port, web_info)
                time_ = datetime.datetime.now()
                # 将扫描效果存入mongodb
                mongo.NA_INFO.update({'ip': self.ip, 'port': self.port},
                                     {"$set": {'banner': self.banner, 'server': 'web', 'webinfo': web_info,
                                               'time': time_}})

端口扫描

先是举行了self.scan_port()端口扫描

def scan_port(self):
    try:
        # 举行socket衔接
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        sock.connect((self.ip, self.port))
        time.sleep(0.2)
    except Exception, e:
        return
    try:
        # 猎取banner信息
        self.banner = sock.recv(1024)
        sock.close()
        # 小于即是2则置为'NULL'
        if len(self.banner) <= 2:
            self.banner = 'NULL'
    except Exception, e:
        # 异常情况也置为'NULL'
        self.banner = 'NULL'
    # 日记纪录
    log.write('portscan', self.ip, self.port, None)
    banner = ''
    hostname = self.ip2hostname(self.ip)
    time_ = datetime.datetime.now()
    date_ = time_.strftime('%Y-%m-%d')
    try:
        # 举行unicode转换
        banner = unicode(self.banner, errors='replace')
        if self.banner == 'NULL': banner = ''
        # 增添一条info信息
        mongo.NA_INFO.insert({"ip": self.ip, "port": self.port, "hostname": hostname, "banner": banner, "time": time_})
        # 统计信息+1
        self.statistics[date_]['add'] += 1
    except:
        if banner:
            # 原子操纵,删除已存在的纪录
            history_info = mongo.NA_INFO.find_and_modify(
                query={"ip": self.ip, "port": self.port, "banner": {"$ne": banner}}, remove=True)
            if history_info:
                # 新增info纪录
                mongo.NA_INFO.insert(
                    {"ip": self.ip, "port": self.port, "hostname": hostname, "banner": banner, "time": time_})
                # 统计信息+1
                self.statistics[date_]['update'] += 1
                # 删除本来的_id
                del history_info["_id"]
                history_info['del_time'] = time_
                history_info['type'] = 'update'
                # 更新type和del_time以后插进去一条新汗青纪录
                mongo.NA_HISTORY.insert(history_info)

举行socket衔接的时刻,比方一些ssh之类的效劳,会返回一些banner信息

巡风源码的详细分析

————————————-

申博网络安全巴士站

申博-网络安全巴士站是一个专注于网络安全、系统安全、互联网安全、信息安全,全新视界的互联网安全新媒体。

————————————-

效劳辨认

def server_discern(self):
    # 先实验举行应用设置装备摆设中的`Discern_server`举行疾速婚配辨认
    for mark_info in self.config_ini['Discern_server']: 
        try:
            name, default_port, mode, reg = mark_info
            if mode == 'default':
                # default透露表现用特定端口,婚配特定效劳
                if int(default_port) == self.port:
                    self.server = name
            elif mode == 'banner':
                # 应用banner信息举行正则婚配检测
                matchObj = re.search(reg, self.banner, re.I | re.M)
                if matchObj:
                    self.server = name
            if self.server:break
        except:
            continue
    # 关于未检测出效劳并且端口不为80、443、8080的端口举行检测
    if not self.server and self.port not in [80,443,8080]:
        for mark_info in self.config_ini['Discern_server']:  # 发包辨认
            try:
                name, default_port, mode, reg = mark_info
                if mode not in ['default','banner']:
                    # 举行发送特定的socket包猎取banner信息,举行再次婚配
                    dis_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                    dis_sock.connect((self.ip, self.port))
                    mode = mode.decode('string_escape')
                    reg = reg.decode('string_escape')
                    dis_sock.send(mode)
                    time.sleep(0.3)
                    dis_recv = dis_sock.recv(1024)
                    dis_sock.close()
                    matchObj = re.search(reg, dis_recv, re.I | re.M)
                    if matchObj:
                        self.server = name
                        break
            except:
                pass
    if self.server:
        # 关于检测到的效劳,举行log和info的纪录
        log.write("server", self.ip, self.port, self.server)
        mongo.NA_INFO.update({"ip": self.ip, "port": self.port}, {"$set": {"server": self.server}})

config_ini['Discern_server']中的值

巡风源码的详细分析

config_ini['Discern_server']中的特定socket数据包

巡风源码的详细分析

web接见

def try_web(self):
    title_str, html = '', ''
    try:# 举行http/https要求,猎取相应报文
        if self.port == 443:
            # 关于443端口的运用https协定
            info = urllib2.urlopen("https://%s:%s" % (self.ip, self.port), timeout=self.timeout)
        else:
            info = urllib2.urlopen("http://%s:%s" % (self.ip, self.port), timeout=self.timeout)
        html = info.read()
        header = info.headers
    except urllib2.HTTPError, e:
        html = e.read()
        header = e.headers
    except:
        return
    if not header: return
    # 关于gzip花样的相应,举行解压gzip
    if 'Content-Encoding' in header and 'gzip' in header['Content-Encoding']: 
        html_data = StringIO.StringIO(html)
        gz = gzip.GzipFile(fileobj=html_data)
        html = gz.read()
    try:
        # 花样转码
        html_code = self.get_code(header, html).strip()
        if html_code and len(html_code) < 12:
            html = html.decode(html_code).encode('utf-8')
    except: pass
    try:
        # 猎取titile信息
        title = re.search(r'<title>(.*?)</title>', html, flags=re.I | re.M)
        if title: title_str = title.group(1)
    except: pass
    try:
        # 将相应的http报文设置成banner信息
        web_banner = str(header) + "\r\n\r\n" + html
        self.banner = web_banner
        # 增添纪录
        history_info = mongo.NA_INFO.find_one({"ip": self.ip, "port": self.port})
        if 'server' not in history_info:
            tag = self.get_tag()
            web_info = {'title': title_str, 'tag': tag}
            return web_info
        else:
            if abs(len(history_info['banner'].encode('utf-8')) - len(web_banner)) > len(web_banner) / 60:
                del history_info['_id']
                history_info['del_time'] = datetime.datetime.now()
                mongo.NA_HISTORY.insert(history_info)
                tag = self.get_tag()
                web_info = {'title': title_str, 'tag': tag}
                date_ = datetime.datetime.now().strftime('%Y-%m-%d')
                self.statistics[date_]['update'] += 1
                log.write('info', None, 0, '%s:%s update web info'%(self.ip, self.port))
                return web_info
    except:
        return

get_tag()

def get_tag(self):
    try:
        url = self.ip + ':' + str(self.port)
        # 对web效劳举行cms、组件容器、动态言语的辨认
        tag = map(self.discern, ['Discern_cms', 'Discern_con', 'Discern_lang'], [url, url, url])
        # 过滤掉未辨认出的效劳
        return filter(None, tag)
    except Exception, e:
        return

discern()

def discern(self, dis_type, domain):
    file_tmp = {}
    if int(domain.split(":")[1]) == 443: # http/https处置惩罚
        protocol = "https://"
    else:
        protocol = "http://"
    try:
        # http要求
        req = urllib2.urlopen(protocol + domain, timeout=self.timeout)
        header = req.headers
        html = req.read()
    except urllib2.HTTPError, e:
        html = e.read()
        header = e.headers
    except Exception, e:
        return

    # 关于'Discern_cms', 'Discern_con', 'Discern_lang'在数据库中都有本身的辨认推断体式格局
    for mark_info in self.config_ini[dis_type]:
        if mark_info[1] == 'header':
            try:
                if not header: return
                # 经由历程header体式格局则对对应的http头中的值举行婚配
                # 如存在PHPSSIONID之类的值判定为php
                if re.search(mark_info[3], header[mark_info[2]], re.I):
                    return mark_info[0]
            except Exception, e:
                continue
        elif mark_info[1] == 'file':
            if mark_info[2] == 'index':
                try:
                    if not html: return
                    # 关于file index体式格局应用文件后缀,如1.php如许推断为php言语
                    if re.search(mark_info[3], html, re.I):
                        return mark_info[0]
                except Exception, e:
                    continue
            else:
                # 防备反复检测
                if mark_info[2] in file_tmp:
                    re_html = file_tmp[mark_info[2]]
                else:
                    # 接见指定的robots.txt之类的文件
                    try:
                        re_html = urllib2.urlopen(protocol + domain + "/" + mark_info[2],
                                                  timeout=self.timeout).read()
                    except urllib2.HTTPError, e:
                        re_html = e.read()
                    except Exception, e:
                        return
                    file_tmp[mark_info[2]] = re_html
                try:
                    # 检测指定文件中是不是存在特定关键字
                    # 如robots.txt中存在'php168'则为php168cms
                    if re.search(mark_info[3], re_html, re.I):
                        return mark_info[0]
                except Exception, e:
                    print mark_info[3]

config_ini[Discern_lang]中的值

巡风源码的详细分析

config_ini[Discern_cms]中的值

巡风源码的详细分析

末了回到run()

web_info = self.try_web()  # 实验web接见
if web_info:
    # 检测完web特性以后,就是举行简朴的log纪录,和更新数据库中info的值
    log.write('web', self.ip, self.port, web_info)
    time_ = datetime.datetime.now()
    mongo.NA_INFO.update({'ip': self.ip, 'port': self.port},
                         {"$set": {'banner': self.banner, 'server': 'web', 'webinfo': web_info,
                                   'time': time_}})

到这里,scan的扫描也就完毕了,回到start类的run()中,剩下的就是不运用masscan的扫描

if self.mode == 0: # 不运用masscan扫描
    # 若是设置了icmp检测,会对ip列表举行存活检测,只扫描存活ip
    if self.icmp: all_ip_list = self.get_ac_ip(all_ip_list)
    for ip_str in all_ip_list: self.queue.put(ip_str)  # 到场行列
    self.scan_start()  # TCP探测形式最先扫描

这里的扫描历程中将ip列表改成了all_ip_list,其他的扫描历程也是经由历程scan_start()来挪用scan类举行扫描。

到这里,全部NAScan资产扫描历程也就完成了,每次扫描完会sleep60秒,然后再次轮回这个历程。

vulscan

用于对扫出的资产举行破绽扫描,详细的扫描历程依赖于vuldb中的插件情势举行扫描,做到可插拔的形式

json花样的插件

巡风源码的详细分析

转换成json情势后就是

{
    "name" : "Axis2信息泄漏",
    "info" : "HappyAxis.jsp 页面存在体系敏感信息。",
    "level" : "低危",
    "type" : "信息泄漏",
    "author" : "wolf@YSRC",
    "url": "",
    "keyword" : "tag:axis2",
    "source" : 1,
    "plugin" : {
        "url" : "/axis2/axis2-web/HappyAxis.jsp",
        "tag" : "敏感信息泄漏",
        "analyzing" : "keyword",
        "analyzingdata" : "Axis2 Happiness Page",
        "data" : "",
        "method" : "GET"
    }
}

python剧本花样的插件

# coding:utf-8

import ftplib

def get_plugin_info():  # 插件形貌信息
    plugin_info = {
        "name": "FTP弱口令",
        "info": "致使敏感信息泄漏,严重情况可致使效劳器被入侵掌握。",
        "level": "高危",
        "type": "弱口令",
        "author": "wolf@YSRC",
        "url": "",
        "keyword": "server:ftp",  # 引荐搜刮关键字
    }
    return plugin_info

def check(ip, port, timeout): # 破绽检测代码
    user_list = ['ftp', 'www', 'admin', 'root', 'db', 'wwwroot', 'data', 'web']
    for user in user_list:
        for pass_ in PASSWORD_DIC:  # 暗码字典无需界说,顺序会自动为其赋值。
            pass_ = str(pass_.replace('{user}', user))
            try:
                ftp = ftplib.FTP()
                ftp.timeout = timeout
                ftp.connect(ip, port)
                ftp.login(user, pass_)
                if pass_ == '': pass_ = 'null'
                if user == 'ftp' and pass_ == 'ftp': return u"可匿名登录"
                return u"存在弱口令,账号:%s,暗码:%s" % (user, pass_)  # 胜利返回效果,内容显现在扫描效果页面。
            except:
                pass

扫描历程较资产扫描偏简朴些,一个280行摆布的单文件

一最先界说了一些全局变量

# 增添体系途径
sys.path.append(sys.path[0] + '/vuldb')
sys.path.append(sys.path[0] + "/../")
# 猎取mongodb账号设置装备摆设
from Config import ProductionConfig
# 举行mongodb认证衔接
db_conn = pymongo.MongoClient(ProductionConfig.DB, ProductionConfig.PORT)
na_db = getattr(db_conn, ProductionConfig.DBNAME)
na_db.authenticate(ProductionConfig.DBUSERNAME, ProductionConfig.DBPASSWORD)
# 做了几个鸠合的简化操纵
na_task = na_db.Task
na_result = na_db.Result
na_plugin = na_db.Plugin
na_config = na_db.Config
na_heart = na_db.Heartbeat
# 线程锁
lock = thread.allocate()
# 一些全局变量
PASSWORD_DIC = []
THREAD_COUNT = 50
TIMEOUT = 10
PLUGIN_DB = {}
TASK_DATE_DIC = {}
WHITE_LIST = []

然后最先运转流程

if __name__ == '__main__':
    init() # 举行init初始化操纵
    PASSWORD_DIC, THREAD_COUNT, TIMEOUT, WHITE_LIST = get_config() # 猎取设置装备摆设
    thread.start_new_thread(monitor, ()) # 启动监控线程
    while True:
        task_id, task_plan, task_target, task_plugin = queue_get() # 义务信息猎取
        if task_id == '':
            time.sleep(10)
            continue
        if PLUGIN_DB:
            del sys.modules[PLUGIN_DB.keys()[0]] # 清算插件缓存
            PLUGIN_DB.clear()
        for task_netloc in task_target:
            while True:
                if int(thread._count()) < THREAD_COUNT:
                    if task_netloc[0] in WHITE_LIST: break
                    thread.start_new_thread(vulscan, (task_id, task_netloc, task_plugin))
                    break
                else:
                    time.sleep(2)
        if task_plan == 0: na_task.update({"_id": task_id}, {"$set": {"status": 2}})

预备事情

init

用于猎取插件的信息

def init():
    # 若数据库中存在插件信息,则直接返回,不然从新猎取
    if na_plugin.find().count() >= 1: return
    script_plugin = []
    json_plugin = []
    # 猎取vuldb中的插件
    file_list = os.listdir(sys.path[0] + '/vuldb')
    time_ = datetime.datetime.now()
    for filename in file_list:
        try:
            # 插件分为json和py两种花样
            if filename.split('.')[1] == 'py':
                script_plugin.append(filename.split('.')[0])
            if filename.split('.')[1] == 'json':
                json_plugin.append(filename)
        except:
            pass
    for plugin_name in script_plugin:
        try:
            # py花样的插件直接导入,然后读取关于变量,插进去到mongodb中
            res_tmp = __import__(plugin_name)
            plugin_info = res_tmp.get_plugin_info()
            plugin_info['add_time'] = time_
            plugin_info['filename'] = plugin_name
            plugin_info['count'] = 0
            na_plugin.insert(plugin_info)
        except:
            pass
    for plugin_name in json_plugin:
        try:
            # json花样的插件,用json剖析后读取对应变量,插进去到mongodb中
            json_text = open(sys.path[0] + '/vuldb/' + plugin_name, 'r').read()
            plugin_info = json.loads(json_text)
            plugin_info['add_time'] = time_
            plugin_info['filename'] = plugin_name
            plugin_info['count'] = 0
            del plugin_info['plugin']
            na_plugin.insert(plugin_info)
        except:
            pass

get_config

def get_config():
    try:
        config_info = na_config.find_one({"type": "vulscan"})
        pass_row = config_info['config']['Password_dic']
        thread_row = config_info['config']['Thread']
        timeout_row = config_info['config']['Timeout']
        white_row = config_info['config']['White_list']
        password_dic = pass_row['value'].split('\n')
        thread_count = int(thread_row['value'])
        timeout = int(timeout_row['value'])
        white_list = white_row['value'].split('\n')
        return password_dic, thread_count, timeout, white_list
    except Exception, e:
        print e

和之前nascan中的读取设置装备摆设相似,只是这回读的是type为vulscan的设置装备摆设

读取弱口令、线程数、timeout、白名单之类的设置装备摆设参数,然后返回

monitor

新起了个monitor监测线程,监测插件的运用情况

def monitor():
    # 引入全局变量
    global PASSWORD_DIC, THREAD_COUNT, TIMEOUT, WHITE_LIST
    while True:
        # 猎取正在实行的义务
        queue_count = na_task.find({"status": 0, "plan": 0}).count()
        if queue_count:
            # 若是有正在实行的义务,则置为1
            load = 1
        else:
            # 不然依据以后线程数,来推断插件是不是在被运用
            ac_count = thread._count()
            load = float(ac_count - 4) / THREAD_COUNT
        if load > 1: load = 1
        if load < 0: load = 0
        # 更新mongodb中的heatbeat鸠合,有插件正在扫描
        na_heart.update({"name": "load"}, {"$set": {"value": load, "up_time": datetime.datetime.now()}})
        PASSWORD_DIC, THREAD_COUNT, TIMEOUT, WHITE_LIST = get_config()
        # 然后依据load值举行分歧时候的休眠
        if load > 0:
            time.sleep(8)
        else:
            time.sleep(60)

然后进入到while True的轮回

经由历程queue_get()举行义务参数的猎取

def queue_get():
    global TASK_DATE_DIC
    # 猎取未加载的task,更新为启动状况
    task_req = na_task.find_and_modify(query={"status": 0, "plan": 0}, update={"$set": {"status": 1}}, sort={'time': 1})
    if task_req:
        # 若是存在,在TASK_DATE_DIC纪录task,然后返回义务信息
        TASK_DATE_DIC[str(task_req['_id'])] = datetime.datetime.now()
        return task_req['_id'], task_req['plan'], task_req['target'], task_req['plugin']
    else:
        # 猎取 plan != 0 的task列表
        task_req_row = na_task.find({"plan": {"$ne": 0}})
        if task_req_row:
            for task_req in task_req_row:
                # 推断是不是须要再次启动义务
                if (datetime.datetime.now() - task_req['time']).days / int(task_req['plan']) >= int(task_req['status']):
                    if task_req['isupdate'] == 1:
                        # 义务更新后,须要从新从info鸠合中猎取ip和port
                        # 更新task鸠合的target
                        task_req['target'] = update_target(json.loads(task_req['query']))
                        na_task.update({"_id": task_req['_id']}, {"$set": {"target": task_req['target']}})
                    # 更新task鸠合中的status自增1
                    na_task.update({"_id": task_req['_id']}, {"$inc": {"status": 1}})
                    # 在TASK_DATE_DIC纪录task
                    TASK_DATE_DIC[str(task_req['_id'])] = datetime.datetime.now()
                    # 返回task信息
                    return task_req['_id'], task_req['plan'], task_req['target'], task_req['plugin']
        return '', '', '', ''

回到__main__

# 猎取义务信息
task_id, task_plan, task_target, task_plugin = queue_get()
if task_id == '':
    # 没有猎取到task设置装备摆设则sleep10秒后继承猎取
    time.sleep(10)
    continue
if PLUGIN_DB:
    # 当有插件缓存时清算插件缓存
    # 背面扫描时会导入插件模块,删除之前导入的模块
    del sys.modules[PLUGIN_DB.keys()[0]] 
    PLUGIN_DB.clear()
for task_netloc in task_target:
    while True:
        # 掌握线程数
        if int(thread._count()) < THREAD_COUNT:
            # 剔除白名单ip
            if task_netloc[0] in WHITE_LIST: break
            # 启动vulscan扫描线程
            thread.start_new_thread(vulscan, (task_id, task_netloc, task_plugin))
            break
        else:
            time.sleep(2)
# task_plan == 0 为一次性义务
# 更新 status = 2
if task_plan == 0: na_task.update({"_id": task_id}, {"$set": {"status": 2}})

vulscan

__init__

def __init__(self, task_id, task_netloc, task_plugin):
    self.task_id = task_id
    self.task_netloc = task_netloc
    self.task_plugin = task_plugin
    self.result_info = ''
    self.start()

设置好类变量,然后进入start()

def start(self):
    self.get_plugin_info()
    if '.json' in self.plugin_info['filename']:  # json检测形式
        try:
            self.load_json_plugin()  # 读取破绽标示
            self.set_request()  # 标示符转换为要求
            self.poc_check()  # 检测
        except Exception, e:
            return
    else:  # py剧本检测形式
        plugin_filename = self.plugin_info['filename']
        self.log(str(self.task_netloc) + "call " + self.task_plugin)
        if task_plugin not in PLUGIN_DB:
            plugin_res = __import__(plugin_filename)
            setattr(plugin_res, "PASSWORD_DIC", PASSWORD_DIC)  # 给插件声明暗码字典
            PLUGIN_DB[plugin_filename] = plugin_res
        try:
            self.result_info = PLUGIN_DB[plugin_filename].check(str(self.task_netloc[0]), int(self.task_netloc[1]),TIMEOUT)
        except:
            return
    self.save_request()  # 生存效果

json花样检测

load_json_plugin()加载设置装备摆设剧本

def get_plugin_info(self):
    info = na_plugin.find_one({"name": self.task_plugin})
    self.plugin_info = info

然后转换为http要求,返回要求句柄

def set_request(self):
    # 构建url
    url = 'http://' + self.task_netloc[0] + ":" + str(self.task_netloc[1]) + self.plugin_info['plugin']['url']
    if self.plugin_info['plugin']['method'] == 'GET':
        # 举行GET要求
        request = urllib2.Request(url)
    else:
        # 不然举行post要求
        request = urllib2.Request(url, self.plugin_info['plugin']['data'])
    self.poc_request = request

然后考证poc是不是有用

def poc_check(self):
    try:
        # 举行http要求,猎取header和body信息
        res = urllib2.urlopen(self.poc_request, timeout=30)
        res_html = res.read(204800)
        header = res.headers
        # res_code = res.code
    except urllib2.HTTPError, e:
        # res_code = e.code
        header = e.headers
        res_html = e.read(204800)
    except Exception, e:
        return
    try:
        # 猎取编码,然后转码
        html_code = self.get_code(header, res_html).strip()
        if html_code and len(html_code) < 12:
            res_html = res_html.decode(html_code).encode('utf-8')
    except:
        pass
    an_type = self.plugin_info['plugin']['analyzing']
    vul_tag = self.plugin_info['plugin']['tag']
    analyzingdata = self.plugin_info['plugin']['analyzingdata']
    if an_type == 'keyword':
        # 若是是关键词检测,推断是正则照样MD5的检测,然后举行比对
        if analyzingdata.encode("utf-8") in res_html: self.result_info = vul_tag
    elif an_type == 'regex':
        if re.search(analyzingdata, res_html, re.I): self.result_info = vul_tag
    elif an_type == 'md5':
        md5 = hashlib.md5()
        md5.update(res_html)
        # 比对胜利,则返回插件tag
        if md5.hexdigest() == analyzingdata: self.result_info = vul_tag

py剧本检测

# 猎取插件文件名
plugin_filename = self.plugin_info['filename']
self.log(str(self.task_netloc) + "call " + self.task_plugin)
if task_plugin not in PLUGIN_DB:
    # 不在PLUGIN_DB中则导入
    plugin_res = __import__(plugin_filename)
    setattr(plugin_res, "PASSWORD_DIC", PASSWORD_DIC)  # 给插件声明暗码字典
    PLUGIN_DB[plugin_filename] = plugin_res # 增添到PLUGIN_DB中
try:
    # 启用py剧本的check要领,并设置timeout
    self.result_info = PLUGIN_DB[plugin_filename].check(str(self.task_netloc[0]), int(self.task_netloc[1]),TIMEOUT)
except:
    return

生存要求效果

def save_request(self):
    # 推断是不是扫描出效果了
    if self.result_info:
        try:
            time_ = datetime.datetime.now()
            self.log(str(self.task_netloc) + " " + self.result_info)
            # 没有这条扫描纪录则插件扫出的纪录+1
            v_count = na_result.find(
                {"ip": self.task_netloc[0], "port": self.task_netloc[1], "info": self.result_info}).count()
            if not v_count: na_plugin.update({"name": self.task_plugin}, {"$inc": {'count': 1}})
            vulinfo = {"vul_name": self.plugin_info['name'], "vul_level": self.plugin_info['level'],
                       "vul_type": self.plugin_info['type']}
            w_vul = {"task_id": self.task_id, "ip": self.task_netloc[0], "port": self.task_netloc[1],
                     "vul_info": vulinfo, "info": self.result_info, "time": time_,
                     "task_date": TASK_DATE_DIC[str(self.task_id)]}
            # 增添扫描效果纪录
            na_result.insert(w_vul)
        except Exception, e:
            pass

到此也就完成了vulscan的扫描历程。

末了

巡风中关于扫描的分工,多线程的处置惩罚都有许多值得进修和自创的处所。并且险些都有增添一些心跳线程,用于监测。

剖析中不免有些缺乏或许毛病,迎接大佬们指出!


申博|网络安全巴士站声明:该文看法仅代表作者自己,与本平台无关。版权所有丨如未注明 , 均为原创丨本网站采用BY-NC-SA协议进行授权
转载请注明巡风源码的详细分析
喜欢 (0)
[]
分享 (0)
发表我的评论
取消评论
表情 贴图 加粗 删除线 居中 斜体 签到

Hi,您需要填写昵称和邮箱!

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址