代码
#! /usr/bin/env python
# -*- coding: utf-8 -*-
import random
import time
from dns.resolver import Resolver
from config import config
class GetDnsRecord(object):
def __init__(self):
self.resolver = Resolver()
self.query_type = ["AAAA","MX","CNAME","NS","SOA","A","TXT"]
self.resolver.nameservers = [random.choice(config.resolver_nameservers) for i in range(2)]
self.resolver.timeout = self.resolver.lifetime = 10.0
self.answer_result = []
def dns_query(self,subdomain,dnstype):
"""
指定类型查询dns记录
:param str hostname: 主机名
:return: 查询结果
"""
try:
answers = self.resolver.query(subdomain, dnstype)
except Exception as e:
# logger.error(f"{self.source} module is error;dnstype:{dnstype} exce: {str(e)}")
return
self.addResult(dnstype, answers,subdomain)
def addResult(self,dnstype,answers,subdomain):
current_time = time.time()
for answer in answers:
value = answer.to_text().rstrip('.')
resolver_item = {
"hostname": subdomain,
"value": value,
"type": dnstype,
"first_seen": current_time,
"last_seen": current_time,
"query_time": current_time,
"source": self.source,
"query_type": "realtime"
}
self.answer_result.append(resolver_item)
def query(self,subdomain):
for dnstype in self.query_type:
try:
self.dns_query(subdomain,dnstype)
except Exception as e:
# logger.error(f"{self.source} module is error;dnstype:{dnstype}: {str(e)}")
continue
return self.answer_result
if __name__ == '__main__':
print(GetDnsRecord().query("www.baidu.com"))
dns 域名服务配置(config)
resolver_nameservers = [
'119.29.29.29',
'119.28.28.28', # DNSPod
'182.254.116.116',
'180.76.76.76', # Baidu DNS
'223.5.5.5',
'223.6.6.6', # AliDNS
# 国外
'8.8.8.8',
'8.8.4.4', # Google DNS
"37.235.1.174",
"37.235.1.177", # FreeDNS
"77.88.8.8",
"77.88.8.1", # Yandex DNS
"80.80.80.80",
"80.80.81.81", # Freenom World
# 质量差(响应慢 丢包多)
# '114.114.114.114',
# '114.114.115.115', # 114DNS
# '180.76.76.76', # Baidu DNS 记录较少
# '1.0.0.1',
# '1.1.1.1' # CloudFlare DNS
# '208.67.222.222',
# '208.67.220.220' # OpenDNS
] # 指定查询的DNS域名服务器