-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmassrdap.py
142 lines (106 loc) · 5.11 KB
/
massrdap.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
#!/usr/bin/env python
# MassRDAP - developed by acidvegas (https://git.acid.vegas/massrdap)
import argparse
import asyncio
import logging
import json
import re
try:
import aiofiles
except ImportError:
raise ImportError('missing required aiofiles library (pip install aiofiles)')
try:
import aiohttp
except ImportError:
raise ImportError('missing required aiohttp library (pip install aiohttp)')
# Color codes
BLUE = '\033[1;34m'
CYAN = '\033[1;36m'
GREEN = '\033[1;32m'
GREY = '\033[1;90m'
PINK = '\033[1;95m'
PURPLE = '\033[0;35m'
RED = '\033[1;31m'
YELLOW = '\033[1;33m'
RESET = '\033[0m'
# Setup basic logging
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s')
# Global variable to store RDAP servers
RDAP_SERVERS = {}
async def fetch_rdap_servers():
'''Fetches RDAP servers from IANA's RDAP Bootstrap file.'''
async with aiohttp.ClientSession() as session:
async with session.get('https://data.iana.org/rdap/dns.json') as response:
data = await response.json()
for entry in data['services']:
tlds = entry[0]
rdap_url = entry[1][0]
for tld in tlds:
RDAP_SERVERS[tld] = rdap_url
def get_tld(domain: str):
'''Extracts the top-level domain from a domain name.'''
parts = domain.split('.')
return '.'.join(parts[1:]) if len(parts) > 1 else parts[0]
async def lookup_domain(domain: str, proxy_url: str, semaphore: asyncio.Semaphore, success_file, failure_file):
'''
Looks up a domain using the RDAP protocol.
:param domain: The domain to look up.
:param proxy_url: The proxy URL to use for the request.
:param semaphore: The semaphore to use for concurrency limiting.
'''
async with semaphore:
tld = get_tld(domain)
rdap_url = RDAP_SERVERS.get(tld)
if not rdap_url:
return
query_url = f'{rdap_url}domain/{domain}'
try:
async with aiohttp.ClientSession() as session:
async with session.get(query_url, proxy=proxy_url if proxy_url else None) as response:
if response.status == 200:
data = await response.json()
await success_file.write(json.dumps(data) + '\n')
print(f'{GREEN}SUCCESS {GREY}| {BLUE}{response.status} {GREY}| {PURPLE}{rdap_url.ljust(50)} {GREY}| {CYAN}{domain}{GREEN}')
else:
await failure_file.write(domain + '\n')
print(f'{RED}FAILED {GREY}| {YELLOW}{response.status} {GREY}| {PURPLE}{rdap_url.ljust(50)} {GREY}| {CYAN}{domain}{RESET}')
except Exception as e:
print(f'{RED}FAILED {GREY}| --- | {PURPLE}{rdap_url.ljust(50)} {GREY}| {CYAN}{domain} {RED}| {e}{RESET}')
async def process_domains(args: argparse.Namespace):
'''
Processes a list of domains, performing RDAP lookups for each one.
:param args: The parsed command-line arguments.
'''
await fetch_rdap_servers() # Populate RDAP_SERVERS with TLDs and their RDAP servers
if not RDAP_SERVERS:
logging.error('No RDAP servers found.')
return
semaphore = asyncio.Semaphore(args.concurrency)
async with aiofiles.open(args.output, 'w') as success_file, aiofiles.open(args.failed, 'w') as failure_file:
async with aiofiles.open(args.input_file) as file:
async for domain in file:
domain = domain.strip()
if domain:
await semaphore.acquire()
task = asyncio.create_task(lookup_domain(domain, args.proxy, semaphore, success_file, failure_file))
task.add_done_callback(lambda t: semaphore.release())
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='Perform RDAP lookups for a list of domains.')
parser.add_argument('-i', '--input_file', required=True, help='File containing list of domains (one per line).')
parser.add_argument('-p', '--proxy', help='Proxy in user:pass@host:port format. If not supplied, none is used.')
parser.add_argument('-c', '--concurrency', type=int, default=25, help='Number of concurrent requests to make. (default: 25)')
parser.add_argument('-o', '--output', default='output.json', help='Output file to write successful RDAP data to. (default: output.json)')
parser.add_argument('-f', '--failed', default='failed.txt', help='Output file to write failed domains to. (optional)')
args = parser.parse_args()
if not args.input_file:
raise ValueError('File path is required.')
if args.concurrency < 1:
raise ValueError('Concurrency must be at least 1.')
if args.proxy:
if not re.match(r'^https?:\/\/[^:]+:[^@]+@[^:]+:\d+$', args.proxy):
raise ValueError('Invalid proxy format. Must be in user:pass@host:port format.')
if not args.output:
raise ValueError('Output file path is required.')
if not args.failed:
print(f'{YELLOW}Failed domains will not be saved.{RESET}')
asyncio.run(process_domains(args))