
Web-Based Information Probing - Httpscan-1.7


The code has been uploaded to GitHub:

https://github.com/linxi0428/httpscan

Author's blog:

http://www.tiaozhanziwo.com/

Changelog

Httpscan Version:1.7

Added iterative probing of web directories that are found to exist; the number of iterations is controlled by the Iterations parameter (a sketch of the idea follows this changelog);

Fixed an infinite-loop problem during iteration; recursion is now bounded by the Deduplicate list;

Refactored the feature modules into functions and trimmed some of the code;

Before issuing a request, the tool now decides up front whether a target is an HTTP or HTTPS page, cutting the number of test links in half;

Added detection of custom 404 pages, improving the program's accuracy;

Fixed issues related to 301/302 redirects;

Removed print output to the console. Python 2's encoding conversion and cmd compatibility are truly painful; I redefined the logging function but still have not solved Chinese output properly (a possible workaround is sketched after this list). Is this really unsolvable on Python 2? If you know how, please tell me!

Using crawler behavior, the tool now parses URLs out of response pages and feeds them back in as further web-directory test candidates;
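To make the iteration and de-duplication entries above concrete, here is a minimal sketch of the idea (not httpscan's actual code; MAX_ITERATIONS, WORDLIST, seen and probe are illustrative names): a dedup set guarantees every URL is requested at most once, and a depth budget bounds the recursion.

#!/usr/bin/env python
#coding:utf-8
#Minimal sketch: iterative directory probing bounded by a dedup set
import Queue

MAX_ITERATIONS = 1 #depth budget, in the spirit of httpscan's Iterations parameter
WORDLIST = ['/admin/', '/backup/'] #illustrative two-entry dictionary

def probe(url):
    return True #placeholder for the real HTTP request

queue = Queue.Queue()
seen = set() #the dedup set: every URL is tested at most once
queue.put('http://example.com/')

while not queue.empty():
    url = queue.get()
    if url in seen:
        continue
    seen.add(url)
    if not probe(url):
        continue
    #count '/' characters after the scheme to measure directory depth
    depth = url.split('://', 1)[1].count('/')
    if url.endswith('/') and depth <= MAX_ITERATIONS:
        for word in WORDLIST:
            child = url.rstrip('/') + word
            if child not in seen:
                queue.put(child)

print '%d unique URLs probed' % len(seen)

On the Chinese-output question: one generic Python 2 workaround (again, not code from httpscan) is to write unicode to the console through a writer bound to the console's own codepage, with errors='replace' so unprintable characters degrade instead of raising UnicodeEncodeError, and to open log files with an explicit UTF-8 encoding:

#coding:utf-8
import sys
import codecs

encoding = sys.stdout.encoding or 'utf-8' #cmd reports its codepage, e.g. 'cp936'
console = codecs.getwriter(encoding)(sys.stdout, 'replace')
console.write(u'标题: 测试页面\n')

#log files are simpler: always open them with an explicit utf-8 encoding
log = codecs.open('httpscan_log.txt', 'a', encoding='utf-8')
log.write(u'标题: 测试页面\n')
log.close()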

Httpscan Version:1.6

Fixed problems when testing against a list of domains;

Fixed assorted bugs;


Httpscan Version:1.5

Added common vulnerability directories and related elements to web page testing;

Added a port-scanning module: a port scan runs first, and web vulnerability-directory testing is then carried out against the open ports;


Httpscan Version:1.4

Testing showed very poor results against HTTPS pages; fixed this by ignoring certificate verification for HTTPS requests, among other changes;

Added common vulnerability ports and related elements to web page testing;

Httpscan Version:1.3

Added interrupt handling (Ctrl+C) to the multithreaded workers for a graceful exit

Httpscan Version:1.2

Added support for accessing HTTPS pages; in the earlier versions, the logs showed HTTPS page requests failing constantly

Httpscan Version:1.1

Added log output, written to httpscan_log.txt in the program directory

Httpscan Version:1.0

The code was pieced together and reworked from sources found online

Features: web-based information probing. Targets (IPs or domains) can be read from a file; IP probing supports CIDR notation. Targets in the file must not include http(s):// since the program adds it automatically (a sample target file is shown after the demo)

Demo:

python xxx.py -f file.txt -t 20

python xxx.py 1.1.1.0/24 -t 20
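For reference, a hypothetical file.txt for the -f option could mix domains and bare IPs, one target per line and with no scheme, since the program prepends http(s):// by itself (both entries below are placeholders):

www.example.com
192.168.1.10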

Install (required libraries and programs)

1. pip install nmap

2. pip install python-nmap

3. pip install IPy

4. pip install lxml

5. The nmap program must be installed on the system (download: https://nmap.org/)
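After installing, a quick way to confirm that python-nmap and the nmap binary are wired together correctly is a one-port scan of localhost; this is a generic python-nmap snippet rather than part of httpscan, and the 127.0.0.1/port 80 choices are arbitrary:

#coding:utf-8
#Sanity check for the python-nmap installation
import nmap

scanner = nmap.PortScanner() #raises if the nmap binary cannot be found
scanner.scan('127.0.0.1', '80') #one host, one port
for host in scanner.all_hosts():
    if 'tcp' in scanner[host]:
        print '%s port 80 is %s' % (host, scanner[host]['tcp'][80]['state'])

If this prints a state (open or closed) without raising, both dependencies are in place.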

To Do

Submit any feature requests and I will work on them!

1. HTTPS page access; currently somewhat broken, essentially certificate-related; (solved in V-1.2)

2. When a probed target answers with a 302 redirect, output and log the redirect target page;

3. Add interrupt handling (Ctrl+C) for the multithreaded workers; (solved in V-1.3)

4. Add domain resolution, shared-IP (side-site) lookup, and C-segment side-site lookup;

5. Testing shows very poor results against HTTPS pages, probably due to certificate-verification issues; (solved in V-1.4)

6. Add common vulnerability directories, vulnerability ports, and related elements to page testing; (initial support in V-1.4)

7. Collected a list of roughly 2,000 common website directories; combined with the several thousand common web ports built into 1.4, this enables a better testing strategy: run a port scan first, push the open ports into the scan queue, and test them against the common vulnerability-directory list, for a severalfold efficiency gain; (built in V-1.5)

8. Add a crawler to keep improving the precision of directory vulnerability scanning; directory probing currently covers only the root directory, and later versions will probe second-, third-, and deeper-level directories; (solved in V-1.7)

9. Keep simplifying the feature grouping and modules to improve readability and make the code easier to divide up; (initial pass in V-1.7)

10. As features keep growing, replace inefficient methods and drop unnecessary interfaces, loops, and branches to raise overall efficiency;

11. Since C-segment scans take a long time, add a progress display later and stop printing results to the console; the display can start once the port scan ends, because at that point the total number of remaining requests is fixed;

12. Reading a list of domains from a file currently has some bugs; to be addressed next; (solved in V-1.6)

13. The portscan module sometimes hangs (possible causes include the python-nmap library and an imperfect multithreading approach); to be addressed next;

14. Clean up scan results: first, some pages return HTTP status 200 but are custom 404 pages, which can be detected from the title, as sketched after this list; second, some IPs, when visited directly, rewrite or redirect the URL to a domain, and those addresses can be recorded; (initial pass in V-1.7)

15. Using the crawler, read URLs out of response pages and feed them in as further web-directory test candidates; this improves directory-probe accuracy and overall web-probe precision; (initial pass in V-1.7)
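As a concrete illustration of the title check in item 14, the sketch below flags "status 200 but really a 404" responses; it is one possible heuristic, not httpscan's exact logic, and the SOFT_404_MARKERS list is illustrative:

#coding:utf-8
#Heuristic for custom 404 pages that still return HTTP status 200
import re

SOFT_404_MARKERS = [u'404', u'not found', u'页面不存在'] #tune per target

def looks_like_soft_404(html):
    match = re.search(r'<title>(.*?)</title>', html, re.I | re.S)
    if not match:
        return False #no title to judge by, let the result through
    title = match.group(1).strip().lower()
    return any(marker in title for marker in SOFT_404_MARKERS)

print looks_like_soft_404(u'<title>404 Not Found</title>') #True
print looks_like_soft_404(u'<title>Welcome</title>') #False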

Q & A

1. Fixing the error 'ValueError: IP('1.1.1.1/24') has invalid prefix length (24)'

This is caused by the CIDR notation of the IP address: the IPy library requires the first address to be the network address of the range. Correct forms are listed below (see the snippet after this list):

1.1.1.0/24 : 1.1.1.0~1.1.1.255

1.1.1.128/25 : 1.1.1.128~1.1.1.255

1.1.1.64/26 : 1.1.1.64~1.1.1.127

1.1.1.32/27 : 1.1.1.32~1.1.1.63

1.1.1.16/28 : 1.1.1.16~1.1.1.31
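A quick interpreter check shows the rule at work, using only the IPy library that httpscan already depends on:

#coding:utf-8
from IPy import IP

print IP('1.1.1.0/24').len() #256 addresses: a valid network address
try:
    IP('1.1.1.1/24') #host bit set: not a network address
except ValueError, e:
    print 'rejected: %s' % e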

2. Where results are saved

Besides being printed to the cmd console, results are saved to files generated in the program's current directory (httpscan_result.txt in version 1.7, with httpscan_info.txt and httpscan_error.txt for the info and error logs)

3. If you hit the error 'module' object has no attribute 'PortScanner', try reinstalling python-nmap:

pip uninstall python-nmap

pip install python-nmap

#!/usr/bin/env python
#coding:utf-8
#Author: linxi0428
#Version: 1.7

import re
import os
import sys
import ssl
import time
import logging
import optparse
import requests
import signal
import socket
import nmap
import threading
import Queue
import codecs
import urlparse

from lxml import etree
from IPy import IP
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.poolmanager import PoolManager

#Config the default encoding
reload(sys)
sys.setdefaultencoding("utf8")

#Set the request in ssl with unverified cert and disable_warnings
ssl._create_default_https_context = ssl._create_unverified_context
from requests.packages.urllib3.exceptions import InsecureRequestWarning
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
#import requests.packages.urllib3.util.ssl_ 
#requests.packages.urllib3.util.ssl_.DEFAULT_CIPHERS = 'ALL'

#Request Timeout
TimeOut = 5

#The iterations of the directory
Iterations = 1

#Log-Config
logging_file_result = codecs.open('httpscan_result.txt','wb',encoding = 'utf-8')
logging_file_info = codecs.open('httpscan_info.txt','wb',encoding = 'utf-8')
logging_file_error = codecs.open('httpscan_error.txt','wb',encoding = 'utf-8')


#User-Agent
header = {'User-Agent' : 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 \
          (KHTML, like Gecko) Chrome/34.0.1847.131 Safari/537.36','Connection':'close'}

#Transport adapter that allows us to force SSLv3 on HTTPS connections
class Ssl3HttpAdapter(HTTPAdapter):
    def init_poolmanager(self, connections, maxsize, block=False):
        self.poolmanager = PoolManager(num_pools=connections,
                                       maxsize=maxsize,
                                       block=block,
                                       ssl_version=ssl.PROTOCOL_SSLv3)

class httpscan():
    def __init__(self,cidr,threads_num,open_ports):
        self.threads_num = threads_num
        self.IPs = Queue.Queue() #build ip queue
        self.open_ports = open_ports
        self.Deduplicate_list = set()
        self.dict_list_file = 'dict.txt' #open the path dictionary

        with open(self.dict_list_file,'r') as dict_lists:
            for dict_line in dict_lists.readlines():
                dict_line = dict_line.strip()
                for open_port in list(self.open_ports):
                    if open_port.strip().endswith('80'):
                        self.IPs.put("http://"+str(open_port)+str(dict_line))
                    elif open_port.strip().endswith('443'):
                        self.IPs.put("https://"+str(open_port)+str(dict_line))
                    else:
                        self.IPs.put("http://"+str(open_port)+str(dict_line))
                        self.IPs.put("https://"+str(open_port)+str(dict_line))
                    
    def request(self):
        while True:
            try:
                ip = self.IPs.get_nowait() #non-blocking: workers exit once the queue is drained
            except Queue.Empty:
                break
            if ip is None:
                continue
            ip = self.str_replace(ip)
            if (ip not in self.Deduplicate_list) and (ip.strip() not in self.Deduplicate_list):
                ip_original = ip.strip()
                self.Deduplicate_list.add(ip_original)
                self.Deduplicate_list.add(ip)
                try:
                    s = requests.Session()
                    #requests matches the longest adapter prefix first, so the
                    #full 'https://' prefix is needed to override the default adapter
                    s.mount('https://', Ssl3HttpAdapter()) #Mount all https requests to ssl.PROTOCOL_SSLv3
                    r = s.get(str(ip).strip(),headers=header,timeout=TimeOut,verify=False,allow_redirects=False)
                    try:
                        self.get_url_to_queue(ip,response=r)
                    except Exception,e:
                        rewrite_logging('ERROR-1',e)
                    status = r.status_code

                    title = re.search(r'<title>(.*)</title>', r.text) #get the title
                    if title:
                        title = title.group(1).strip()[:30]
                    else:
                        title = "No Title Field"

                    if ((status == 301) or (status == 302)) and ('404' not in title):
                        if 'Location' in r.headers:
                            try:
                                location = r.headers['Location']
                                self.redirect_handler_func(ip,location)
                            except Exception,e:
                                rewrite_logging('ERROR-2',e)
                    else:
                        try:
                            if 'Server' in r.headers:
                                banner = r.headers['Server'][:20] #get the server banner
                            else:
                                banner = 'No Server Field'
                            self.log_func(ip,ip_original,status,banner,title)
                        except Exception,e:
                            message = 'Current IP is %s,the error is %s'  % (ip,e)
                            rewrite_logging('ERROR-3',message)
                except Exception,e:
                    message = 'Current IP is %s,the error is %s'  % (ip,e)
                    rewrite_logging('ERROR-4',message)
    
    def run(self):#Multi thread
        signal.signal(signal.SIGINT, quit)
        signal.signal(signal.SIGTERM, quit)
        threads = []
        for i in range(self.threads_num):
            t = threading.Thread(target=self.request)
            t.setDaemon(True)
            t.start()
            threads.append(t)
        #poll every worker with a timed sleep so the main thread stays responsive to Ctrl+C
        while any(t.isAlive() for t in threads):
            time.sleep(0.1)

    def redirect_handler_func(self,ip,location):
        loc_urlparse = urlparse.urlparse(location)
        ip_urlparse = urlparse.urlparse(ip)
        if loc_urlparse.netloc.split(':')[0] == ip_urlparse.netloc.split(':')[0]:
            if location.strip() not in self.Deduplicate_list:
                self.IPs.put(location.strip())
                self.Deduplicate_list.add(location.strip())
                rewrite_logging('INFO','rejoin the 302 url %s' % location)

    def str_replace(self,ip): #Replace 'https://test.com//1//2//3//4/(//)' to 'https://test.com/1/2/3/4/'
        new_ip = ip.split('://')[0] + '://'
        new_ip = new_ip + ip.split('://')[1].replace('//','/')
        return new_ip

    def log_func(self,ip,ip_original,status,banner,title):
        if (status != 400) and (status != 403) and (status != 404) and ('404' not in str(title)):
            self.print_log(ip,status,banner,title)
        if (status != 400) and (status != 404) and ('404' not in str(title)):
            self.rejoin_queue(ip,ip_original,status)

    def rejoin_queue(self,ip,ip_original,status):
        if (ip.strip().endswith('/')):
            if (status == 200) or (status == 403) or (status == 501):
                with open(self.dict_list_file,'r') as dict_lists:
                    for dict_line in dict_lists.readlines():
                        dict_line = dict_line.strip()
                        if dict_line != '/':
                            rejoin_queue_ip = str(ip).strip() + str(dict_line)
                            rejoin_queue_ip_original = str(ip_original).strip() + str(dict_line)
                            if rejoin_queue_ip_original.count('//') <= (Iterations+1): #Judge the count of Iterations
                                if (rejoin_queue_ip_original not in self.Deduplicate_list) and \
                                    (rejoin_queue_ip not in self.Deduplicate_list):
                                    self.IPs.put(rejoin_queue_ip_original)
                            self.Deduplicate_list.add(rejoin_queue_ip)
                            self.Deduplicate_list.add(rejoin_queue_ip_original)
    
    def print_log(self,ip,status,banner,title):
        message = "|%-66s|%-6s|%-14s|%-30s|" % (ip.strip(),status,banner,title)
        rewrite_logging('Result',message)

    def get_url_to_queue(self,ip,response):
        #pass utf-8 bytes so lxml also accepts documents that declare their own encoding
        page = etree.HTML(response.text.encode('utf-8'))
        if page is None: #empty response body: nothing to crawl
            return
        reqs = set()
        orig_url = response.url

        #get_href_reqs
        link_href_url = page.xpath("//link/@href")
        a_href_url = page.xpath("//a/@href")
        li_href_url = page.xpath("//li/@href")
        href_url = link_href_url + a_href_url + li_href_url

        #get_src_reqs
        img_src_url = page.xpath("//img/@src")
        script_src_url = page.xpath("//script/@src")
        src_url = img_src_url + script_src_url

        all_url = href_url + src_url
        for x in xrange(0,len(all_url)):
            if not all_url[x].startswith('/') and not all_url[x].startswith('http'):
                all_url[x] = '/' + all_url[x]
            valid_url = self.url_valid(all_url[x], orig_url)
            if valid_url is not None: #url_valid returns None for foreign hosts
                reqs.add(valid_url)

        for req in reqs:
            if req not in self.Deduplicate_list:
                self.IPs.put(req)
                self.Deduplicate_list.add(req)
    
    def url_valid(self,url,orig_url):
        if url is None:
            return
        if '://' not in url:
            proc_url = self.url_processor(orig_url)
            if proc_url is None: #orig_url could not be parsed
                return
            url = proc_url[1] + proc_url[0] + url
        else:
            url_parse = self.url_processor(url)
            orig_url_parse = self.url_processor(orig_url)
            if url_parse is None or orig_url_parse is None:
                return
            if url_parse[0].split(':')[0] != orig_url_parse[0].split(':')[0]:
                return #different host: stay on the original target
        return url
    
    def url_processor(self,url): # Get the url domain, protocol, and netloc using urlparse
        try:
            parsed_url = urlparse.urlparse(url)
            path = parsed_url.path
            protocol = parsed_url.scheme+'://'
            hostname = parsed_url.hostname
            netloc = parsed_url.netloc
            doc_domain = '.'.join(hostname.split('.')[-2:])
        except:
            rewrite_logging('ERROR-5','Could not parse url: %s' % url)
            return
    
        return (netloc, protocol, doc_domain, path)

class portscan():
    def __init__(self,cidr,threads_num,file_source,ports):
        self.threads_num = threads_num
        self.nmap_args = ports.read() #read the nmap argument file once; a file object can only be read once
        self.IPs = Queue.Queue()
        self.file_source = file_source
        self.open_ports = set() #ip-port lists

        if self.file_source == None:
            try:
                self.cidr = IP(cidr)
            except Exception,e:
                rewrite_logging('ERROR-6',e)
                sys.exit(1) #an unparsable CIDR leaves self.cidr unset, so bail out here
            for ip in self.cidr:
                ip = str(ip)
                self.IPs.put(ip)
        else:
            with open(self.file_source,'r') as file_ip:
                for line in file_ip:
                    self.IPs.put(line)

    def nmapScan(self):
        while True:
            try:
                item = self.IPs.get_nowait() #non-blocking: workers exit once the queue is drained
            except Queue.Empty:
                break
            try:
                nmScan = nmap.PortScanner()
                nmScan.scan(item,arguments = self.nmap_args)
                for tgthost in nmScan.all_hosts():
                    for tgtport in nmScan[tgthost]['tcp']:
                        tgthost = tgthost.strip()
                        tgtport = int(tgtport)
                        if nmScan[tgthost]['tcp'][tgtport]['state'] == 'open':
                            if self.file_source == None:
                                target = str(tgthost)
                            else:
                                target = str(item.strip())
                            open_list = target + ':' + str(tgtport)
                            self.open_ports.add(open_list)
                            message = 'the target %s has opened port %s' % (target,tgtport)
                            rewrite_logging('Result',message)
                            print message + '\n'
            except Exception, e:
                rewrite_logging('ERROR-7',e)

    def run(self):
        threads = [threading.Thread(target=self.nmapScan) for i in range(self.threads_num)]
        for thread in threads:
            thread.setDaemon(True)
            thread.start()
        for thread in threads:
            thread.join()
        return self.open_ports

def help():
    print "Example:"
    print "  python "+sys.argv[0]+" -f domain_list.txt"
    print "  python "+sys.argv[0]+" 1.1.1.0/24"

def quit(signum, frame): #Judge child threads' status (exit or not)!
    print '\nYou choose to stop me!!'
    sys.exit()

def rewrite_logging(level,message):
    log = "[%s] %s: %s" % (time.asctime(),level,message)
    if level == 'Result':
        logging_file_result.write(log)
        logging_file_result.write('\n')
    elif 'ERROR' in level:
        logging_file_error.write(log)
        logging_file_error.write('\n')
    else:
        logging_file_info.write(log)
        logging_file_info.write('\n')

def startscan(port,cidr,threads_num,file_source):
    ports = open(port,'r')
    print "------------------------------------------------------------------------------"
    print '# Start Port Scan\n'
    #nmap parallelises its own probes, so the port scan uses a small fixed
    #worker count instead of the user-supplied thread count
    scan_port = portscan(cidr=cidr,threads_num=3,file_source=file_source,ports=ports)
    ports.close() #the nmap argument file was read once in portscan.__init__
    open_ports = scan_port.run()
    print '# Port Scan Ends\n'
    print "------------------------------------------------------------------------------"
    print '# Start Http Scan\n'
    s = httpscan(cidr=cidr,threads_num=threads_num,open_ports=open_ports)
    s.run()

if __name__ == "__main__":
    parser = optparse.OptionParser("Usage: %prog [target or file] [options] ")
    parser.add_option("-t", "--thread", dest = "threads_num",\
                      default = 100, help = "number of theads,default=100")
    parser.add_option("-f", "--file", dest = "file_source",\
                      help = "source of file,default=domain_list.txt")
    (options, args) = parser.parse_args()

    if options.file_source == None:
        if len(args) < 1:
            parser.print_help()
            help()
            sys.exit(0)
        else:
            startscan(port='port.txt',cidr=args[0],threads_num=options.threads_num,file_source=None)
    else:
        startscan(port='port.txt',cidr=None,threads_num=options.threads_num,file_source=options.file_source)

