11import pytest
2+ import asyncio
23import socket
4+ import datetime
35from unittest .mock import patch , MagicMock
46from testproxy import check_open_ports , get_ssl_info , check_http_headers , detect_waf , detect_proxy
57
68# Mock host to use in tests
79MOCK_HOST = 'example.com'
810
9- def test_check_open_ports ():
11+ @pytest .mark .asyncio
12+ async def test_check_open_ports ():
1013 ports = [80 , 443 , 8080 ]
11- with patch ('socket.create_connection' ) as mock_connection :
12- mock_connection .side_effect = [socket .timeout , MagicMock (), socket .timeout ]
13- open_ports = check_open_ports (MOCK_HOST , ports )
14+
15+ # Mock is_port_open function directly to return False, True, False for each port
16+ async def mock_is_port_open (host , port ):
17+ return port == 443 # Only port 443 succeeds
18+
19+ with patch .object (asyncio , 'open_connection' , side_effect = ConnectionRefusedError ), \
20+ patch ('testproxy.is_port_open' , side_effect = [False , True , False ]) as mock_ip :
21+
22+ open_ports = await check_open_ports (MOCK_HOST , ports )
23+ # The check_open_ports function calls is_port_open for each port
24+ # and collects results, so it should return [443] if is_port_open returns True only for port 443
25+ mock_ip .assert_has_calls ([((MOCK_HOST , 80 ),), ((MOCK_HOST , 443 ),), ((MOCK_HOST , 8080 ),)])
1426 assert open_ports == [443 ]
1527
1628def test_get_ssl_info ():
29+ # Create mock datetime objects with timezone info
30+ past_date = datetime .datetime (2023 , 1 , 1 , 0 , 0 , 0 , tzinfo = datetime .timezone .utc )
31+ future_date = datetime .datetime (2024 , 1 , 1 , 0 , 0 , 0 , tzinfo = datetime .timezone .utc )
32+
33+ # Mock version to have a 'name' attribute like the actual enum
34+ mock_version = MagicMock ()
35+ mock_version .name = 'v3'
36+
1737 mock_cert = MagicMock ()
1838 mock_cert .subject .rfc4514_string .return_value = "CN=example.com"
1939 mock_cert .issuer .rfc4514_string .return_value = "C=US,O=DigiCert Inc,CN=DigiCert SHA2 Secure Server CA"
20- mock_cert .version = 3
21- mock_cert .not_valid_before . strftime . return_value = "2023-01-01 00:00:00"
22- mock_cert .not_valid_after . strftime . return_value = "2024-01-01 00:00:00"
40+ mock_cert .version = mock_version # Use mock version with name
41+ mock_cert .not_valid_before_utc = past_date
42+ mock_cert .not_valid_after_utc = future_date
2343 mock_cert .serial_number = 12345678901234567890
2444 mock_cert .signature_algorithm_oid ._name = "sha256WithRSAEncryption"
2545
26- with patch ('ssl.create_default_context' ), \
27- patch ('socket.create_connection' ), \
28- patch ('ssl.SSLSocket.getpeercert' , return_value = mock_cert ), \
46+ mock_socket_ctx = MagicMock ()
47+ mock_sock = MagicMock ()
48+ mock_sock .getpeercert .return_value = b'mock certificate bytes'
49+ mock_sock .cipher .return_value = ('TLS_AES_256_GCM_SHA384' , 'TLSv1.3' , 'ECDHE_RSA_WITH_AES_256_GCM_SHA384' )
50+ mock_sock .version .return_value = 'TLSv1.3'
51+
52+ with patch ('ssl.create_default_context' , return_value = mock_socket_ctx ), \
53+ patch ('socket.create_connection' , return_value = mock_sock ), \
54+ patch ('testproxy.datetime' , wraps = datetime ) as mock_datetime , \
2955 patch ('cryptography.x509.load_der_x509_certificate' , return_value = mock_cert ):
3056
57+ # Mock current time for validity check
58+ current_time = datetime .datetime (2023 , 6 , 15 , 0 , 0 , 0 , tzinfo = datetime .timezone .utc )
59+ mock_datetime .datetime .now .return_value = current_time .replace (tzinfo = datetime .timezone .utc )
60+
3161 ssl_info = get_ssl_info (MOCK_HOST )
32- assert ssl_info ['subject' ] == "CN=example.com"
33- assert ssl_info ['issuer' ] == "C=US,O=DigiCert Inc,CN=DigiCert SHA2 Secure Server CA"
34- assert ssl_info ['version' ] == 3
35- assert ssl_info ['not_valid_before' ] == "2023-01-01 00:00:00"
36- assert ssl_info ['not_valid_after' ] == "2024-01-01 00:00:00"
37- assert ssl_info ['serial_number' ] == 12345678901234567890
38- assert ssl_info ['signature_algorithm' ] == "sha256WithRSAEncryption"
62+
63+ if ssl_info is None :
64+ pytest .fail ("Expected ssl_info to be returned, but got None" )
65+ else :
66+ assert ssl_info ['subject' ] == "CN=example.com"
67+ assert ssl_info ['issuer' ] == "C=US,O=DigiCert Inc,CN=DigiCert SHA2 Secure Server CA"
68+ assert ssl_info ['version' ] == 'v3' # Should match the mocked version.name
69+ assert ssl_info ['not_valid_before' ] == "2023-01-01 00:00:00 UTC"
70+ assert ssl_info ['not_valid_after' ] == "2024-01-01 00:00:00 UTC"
71+ assert ssl_info ['serial_number' ] == '12345678901234567890'
72+ assert ssl_info ['signature_algorithm' ] == "sha256WithRSAEncryption"
3973
4074def test_check_http_headers ():
4175 mock_headers = {
@@ -57,72 +91,97 @@ def test_check_http_headers():
5791 assert history == []
5892
5993def test_detect_waf ():
94+ # Mock waf indicators dictionary (header -> waf_name mapping)
95+ waf_indicators = {
96+ 'x-waf-rate-limit' : 'Generic WAF' ,
97+ 'cf-ray' : 'Cloudflare WAF' ,
98+ 'server:cloudflare' : 'Cloudflare WAF' ,
99+ 'x-powered-by-plesk' : 'Plesk WAF' ,
100+ 'x-waf-detected' : 'Generic WAF'
101+ }
102+
60103 headers_with_waf = {
61- 'Server' : 'Apache' ,
62104 'X-WAF-Rate-Limit' : '100' ,
63105 'cf-ray' : '12345678901234567-IAD'
64106 }
65- detected_wafs = detect_waf (headers_with_waf )
107+ detected_wafs = detect_waf (headers_with_waf , waf_indicators )
66108 assert 'Generic WAF' in detected_wafs
67109 assert 'Cloudflare WAF' in detected_wafs
68110
69111 headers_without_waf = {
70112 'Server' : 'Apache' ,
71113 'Content-Type' : 'text/html; charset=UTF-8'
72114 }
73- detected_wafs = detect_waf (headers_without_waf )
115+ detected_wafs = detect_waf (headers_without_waf , waf_indicators )
74116 assert len (detected_wafs ) == 0
75117
76- def test_detect_proxy ():
77- mock_results = {
78- 'host' : MOCK_HOST ,
79- 'ip' : '93.184.216.34' ,
80- 'open_ports' : [80 , 443 ],
81- 'ssl_info' : {
82- 'subject' : 'CN=example.com' ,
83- 'issuer' : 'C=US,O=DigiCert Inc,CN=DigiCert SHA2 Secure Server CA' ,
84- 'version' : 3 ,
85- 'not_valid_before' : '2023-01-01 00:00:00' ,
86- 'not_valid_after' : '2024-01-01 00:00:00' ,
87- 'serial_number' : 12345678901234567890 ,
88- 'signature_algorithm' : 'sha256WithRSAEncryption'
89- },
90- 'http_headers' : {
91- 'Server' : 'Apache' ,
92- 'X-Forwarded-For' : '10.0.0.1'
93- },
94- 'https_headers' : {
95- 'Server' : 'Apache' ,
96- 'Strict-Transport-Security' : 'max-age=31536000'
97- },
98- 'proxy_indicators' : ['X-Forwarded-For' ],
99- 'waf_detected' : ['Generic WAF' ],
100- 'redirects' : []
118+ @pytest .mark .asyncio
119+ async def test_detect_proxy ():
120+ # Mock data and parameters
121+ common_ports = [80 , 443 , 8080 ]
122+ proxy_indicators = ['X-Forwarded-For' , 'Via' , 'X-Real-IP' ]
123+ waf_indicators = {
124+ 'x-waf-rate-limit' : 'Generic WAF' ,
125+ 'cf-ray' : 'Cloudflare WAF'
126+ }
127+
128+ expected_cert = {
129+ 'subject' : 'CN=example.com' ,
130+ 'issuer' : 'C=US,O=DigiCert Inc,CN=DigiCert SHA2 Secure Server CA' ,
131+ 'version' : 3 ,
132+ 'not_valid_before' : '2023-01-01 00:00:00 UTC' ,
133+ 'not_valid_after' : '2024-01-01 00:00:00 UTC' ,
134+ 'serial_number' : 12345678901234567890 ,
135+ 'signature_algorithm' : 'sha256WithRSAEncryption' ,
136+ 'cipher' : 'TLS_AES_256_GCM_SHA384' ,
137+ 'protocol' : 'TLSv1.3' ,
138+ 'is_valid' : True
139+ }
140+
141+ mock_http_headers = {
142+ 'Server' : 'Apache' ,
143+ 'X-Forwarded-For' : '10.0.0.1'
144+ }
145+ mock_https_headers = {
146+ 'Server' : 'Apache' ,
147+ 'Strict-Transport-Security' : 'max-age=31536000' ,
148+ 'cf-ray' : '12345678901234567-IAD'
101149 }
102150
103151 with patch ('socket.gethostbyname' , return_value = '93.184.216.34' ), \
104152 patch ('testproxy.check_open_ports' , return_value = [80 , 443 ]), \
105- patch ('testproxy.get_ssl_info' , return_value = mock_results [ 'ssl_info' ] ), \
106- patch ('testproxy.check_http_headers ' , side_effect = [
107- (mock_results [ 'http_headers' ] , 200 , []),
108- (mock_results [ 'https_headers' ] , 200 , [])
153+ patch ('testproxy.get_ssl_info' , return_value = expected_cert ), \
154+ patch ('testproxy.secure_headers_check ' , side_effect = [
155+ (mock_http_headers , 200 , []),
156+ (mock_https_headers , 200 , [])
109157 ]), \
110- patch ('testproxy.detect_waf' , return_value = ['Generic WAF' ]):
111-
112- results = detect_proxy (MOCK_HOST , 'json' )
113- assert isinstance (results , str ) # Ensure JSON string is returned
114-
115- import json
116- parsed_results = json .loads (results )
117- assert parsed_results ['host' ] == MOCK_HOST
118- assert parsed_results ['ip' ] == '93.184.216.34'
119- assert parsed_results ['open_ports' ] == [80 , 443 ]
120- assert parsed_results ['ssl_info' ] == mock_results ['ssl_info' ]
121- assert parsed_results ['http_headers' ] == mock_results ['http_headers' ]
122- assert parsed_results ['https_headers' ] == mock_results ['https_headers' ]
123- assert parsed_results ['proxy_indicators' ] == ['X-Forwarded-For' ]
124- assert parsed_results ['waf_detected' ] == ['Generic WAF' ]
125- assert parsed_results ['redirects' ] == []
158+ patch ('testproxy.detect_waf' , side_effect = [
159+ [], # WAF detection on HTTP headers
160+ ['Cloudflare WAF' ] # WAF detection on HTTPS headers
161+ ]), \
162+ patch ('testproxy.get_geoip_info' , return_value = {
163+ 'country' : 'US' ,
164+ 'city' : 'New York' ,
165+ 'latitude' : 40.7128 ,
166+ 'longitude' : - 74.0060
167+ }), \
168+ patch ('testproxy.grab_banner_async' , side_effect = [
169+ None , # Port 80 banner
170+ 'HTTP/1.1 200 OK' , # Port 443 banner
171+ None # Port 8080 banner
172+ ]):
173+ results = await detect_proxy (MOCK_HOST , common_ports , proxy_indicators , waf_indicators , verify_ssl = False )
174+
175+ # Verify the results structure
176+ assert isinstance (results , dict )
177+ assert results ['host' ] == MOCK_HOST
178+ assert results ['ip' ] == '93.184.216.34'
179+ assert results ['open_ports' ] == [80 , 443 ]
180+ assert results ['ssl_info' ] == expected_cert
181+ assert results ['http_headers' ] == mock_http_headers
182+ assert results ['https_headers' ] == mock_https_headers
183+ assert results ['proxy_indicators' ] == ['X-Forwarded-For' ] # Only found in HTTP headers
184+ assert results ['waf_detected' ] == ['Cloudflare WAF' ] # From HTTPS headers
126185
127186if __name__ == "__main__" :
128187 pytest .main ()
0 commit comments