Skip to content
Open
22 changes: 15 additions & 7 deletions pynetworking/Device.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,11 @@ def __init__(self, host, username='manager', password='friend', protocol='ssh',
self._port = port

self._mock_test = self._get_mock_opt(mock.lower())
if self._mock_test is False:
self.func_dict = {'open': self._open, 'close': self._close}
else:
self._proxy_connection_timeout = 1000
self.func_dict = {'open': self._mocked_open, 'close': self._mocked_close}

self._log_level = getattr(logging, log_level.upper())
hdl = logging.StreamHandler()
Expand Down Expand Up @@ -108,16 +113,11 @@ def ping(self):

def open(self):
self.log_info("open")
if self._mock_test is False:
self._open()
else:
self._mocked_open() # pragma: no cover
self.func_dict['open']()

def close(self):
self.log_info("close")
if isinstance(self._proxy, Process) and (isinstance(self._proxy, Process) and self._proxy.is_alive()):
self.cmd({'cmds': [{'cmd': '_exit', 'prompt': ''}]})
self._proxy.join(10)
self.func_dict['close']()

def cmd(self, cmd, use_cache=True, cache=False, flush_cache=False):
timeout = 12000
Expand Down Expand Up @@ -281,6 +281,11 @@ def _open(self):
self._load_features()
self.load_system()

def _close(self):
if isinstance(self._proxy, Process) and self._proxy.is_alive():
self.cmd({'cmds': [{'cmd': '_exit', 'prompt': ''}]})
self._proxy.join(10)

def _mocked_open(self):
# use 2 because is a second level nesting (test_...(dut, log_level) - open() - _mocked_open())
frm = inspect.stack()[2] # pragma: no cover
Expand All @@ -304,6 +309,9 @@ def _mocked_open(self):
self._load_features() # pragma: no cover
self.load_system() # pragma: no cover

def _mocked_close(self):
pass # pragma: no cover

def _mock_load_features(self, os, feat):
if feat == 'device': # pragma: no cover
return {'system': os + '_system', 'features': {'file': os + '_file', 'vlan': os + '_vlan'}} # pragma: no cover
Expand Down
40 changes: 40 additions & 0 deletions pynetworking/facts/core_ats.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ def core_ats(dev):
else:
return ret

ret['boot config'] = 'startup-config'
dev.sw_version = ret['version']
dev.boot_version = ret['boot version']

Expand Down Expand Up @@ -52,6 +53,15 @@ def core_ats(dev):
ret['model'] = 'not found'
dev.log_warn("cannot capture model")

m = re.search("\nUnit\s+Up\s+time\s*\n[^\n]+\n\s+\d\s+(?P<time>[^\s]+)\s*\n", out)
if m:
days = m.group('time').split(',')[0]
int_d = int(days)
hours = m.group('time').split(',')[1]
ret['sysuptime'] = '{0} days {1}'.format(int_d, hours)
else:
ret['sysuptime'] = 'not found'

m = re.search("\nUnit\s+Number:\s+(?P<unit_number>[^\s]+)\s*\n", out)
if m:
ret['unit_number'] = m.group('unit_number')
Expand All @@ -66,4 +76,34 @@ def core_ats(dev):
ret['serial_number'] = 'not found'
dev.log_warn("cannot capture serial number")

out = dev.cmd({'cmds': [{'cmd': 'show system unit 1', 'prompt': '\#'}]})
dev.log_debug("show system\n{0}".format(out))

# System Description: ATI AT-8000S
# System Up Time (days,hour:min:sec): 00,00:02:34
# System Contact:
# System Name: nac_dev
# System Location:
# System MAC Address: 00:15:77:30:36:00
# System Object ID: 1.3.6.1.4.1.207.1.4.126
# Serial number:
# Type: AT-8000S/24
#

m = re.search("\nSystem\s+Contact:\s+(?P<sys_contact>[^\s]*)\s*\n", out)
if m:
ret['system contact'] = m.group('sys_contact')
else:
ret['system contact'] = 'not found'
m = re.search("\nSystem\s+Name:\s+(?P<sys_name>[^\s]*)\s*\n", out)
if m:
ret['system name'] = m.group('sys_name')
else:
ret['system name'] = 'not found'
m = re.search("\nSystem\s+Location:\s+(?P<sys_loc>[^\s]*)\s*\n", out)
if m:
ret['system location'] = m.group('sys_loc')
else:
ret['system location'] = 'not found'

return ret
33 changes: 33 additions & 0 deletions pynetworking/facts/core_awp.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,45 @@ def core_awp(dev):
]}
out = dev.cmd(cmds)

# Hardware data
m = re.search('\s+Board\s+ID\s+Bay[\w\s\-]+Base\s+\d+\s+([\w\-\/]+)\s+([\w\-\/]+)\s+([\w\-\/]+)\s+', out)
if m:
ret['model'] = m.group(1)
ret['hardware_rev'] = m.group(2)
ret['serial_number'] = m.group(3)

# Boot loader version
m = re.search('\Bootloader\s+version\s+:\s+([\d+]).([\d+]).([\d+])', out)
if m:
ret['boot version'] = '{0}.{1}.{2}'.format(m.group(1), m.group(2), m.group(3))

# Current configuration file
m = re.search('Current\s+boot\s+config:\s+flash:([\w\-\/]+)', out)
if m:
ret['boot config'] = m.group(1)[1:] + '.cfg'

# Sys up time
m = re.search('Uptime\s+:\s(?P<time>[^\r]+)', out)
if m:
ret['sysuptime'] = m.group('time')

# System
m = re.search('System\s+Name\r\s+([^\r]+)', out)
if m:
ret['system name'] = m.group(1)
if ret['system name'] == 'System Contact':
ret['system name'] = ''
m = re.search('System\s+Contact\r\s+([^\r]+)', out)
if m:
ret['system contact'] = m.group(1)
if ret['system contact'] == 'System Location':
ret['system contact'] = ''
m = re.search('System\s+Location\r\s+([^\r]+)', out)
if m:
ret['system location'] = m.group(1)
else:
ret['system location'] = ''

# Release license
if (ret['version'] >= '5.4.4'):
cmds = {'cmds': [{'cmd': 'terminal length 0', 'prompt': '\>'},
Expand Down
20 changes: 0 additions & 20 deletions pynetworking/system/awp_system.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,23 +111,3 @@ def _check_running_software(self, release):
break

return is_running_software

def _get_boot_image(self):
boot_image = ''
string_to_be_found = 'Current boot image : flash:/'

# Boot configuration
# ----------------------------------------------------------------
# Current software : x210-5.4.4.rel
# Current boot image : flash:/x210-5.4.4.rel
# Backup boot image : flash:/x210-5.4.4.rel
# Default boot config: flash:/default.cfg
# Current boot config: flash:/my.cfg (file exists)
# Backup boot config: flash:/backup.cfg (file not found)
for line in self._device.cmd("show boot").split('\n'):
self._device.log_debug("read {0}".format(line))
if (line.find(string_to_be_found) == 0):
boot_image = line[len(string_to_be_found):-1]
break

return boot_image
26 changes: 10 additions & 16 deletions tests/test_device.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,11 @@ def setup_dut(dut):
User Configured Territory: europe

System Name
awplus
System Contact

System Contact
[email protected]
System Location

here
"""]})
dut.add_cmd({'cmd': 'show version', 'state': 0, 'action': 'PRINT', 'args': ["""
AlliedWare Plus (TM) 5.4.2 09/25/13 12:57:26
Expand Down Expand Up @@ -141,12 +141,18 @@ def test_facts(dut, log_level, use_mock):
if dut.mode != 'emulated':
pytest.skip("only on emulated")
setup_dut(dut)
d = Device(host=dut.host, port=dut.port, protocol=dut.protocol, log_level=log_level, mock=use_mock)
d = Device(host=dut.host, port=dut.port, protocol=dut.protocol, log_level=log_level, mock='n')
d.open()
assert d.facts['build_date'] == 'Wed Sep 25 12:57:26 NZST 2013'
assert d.facts['build_name'] == 'x600-5.4.2-3.14.rel'
assert d.facts['build_type'] == 'RELEASE'
assert d.facts['version'] == '5.4.2'
assert d.facts['boot config'] == 'default.cfg'
assert d.facts['boot version'] == '1.1.0'
assert d.facts['sysuptime'] == '0 days 00:07:29'
assert d.facts['system name'] == ''
assert d.facts['system contact'] == '[email protected]'
assert d.facts['system location'] == 'here'
d.close()


Expand Down Expand Up @@ -399,9 +405,6 @@ def test_firmware_upgrade_543(dut, log_level, use_mock):
d.system.update_firmware('x210-5.4.3-2.7.rel', protocol='tftp')
assert 'protocol tftp not supported' in excinfo.value
d.system.update_firmware(release_file, dontwait=dut.dontwait)
if (dut.mode == 'emulated'):
# real devices will be rebooting here
assert(d.system._get_boot_image() == release_file)
d.close()

clean_test_firmware_upgrade(dut, release_file)
Expand Down Expand Up @@ -443,9 +446,6 @@ def test_full_path_firmware_upgrade(dut, log_level, use_mock):
d = Device(host=dut.host, port=dut.port, protocol=dut.protocol, log_level=log_level, mock=use_mock)
d.open()
d.system.update_firmware(image_name, dontwait=dut.dontwait)
if (dut.mode == 'emulated'):
# real devices will be rebooting here
assert(d.system._get_boot_image() == image_file)
d.close()

clean_test_firmware_upgrade(dut, image_name)
Expand Down Expand Up @@ -499,9 +499,6 @@ def test_firmware_upgrade_544(dut, log_level, use_mock):
d = Device(host=dut.host, port=dut.port, protocol=dut.protocol, log_level=log_level, mock='n')
d.open()
d.system.update_firmware(release_file, dontwait=dut.dontwait)
if (dut.mode == 'emulated'):
# real devices will be rebooting here
assert(d.system._get_boot_image() == release_file)
d.close()

clean_test_firmware_upgrade(dut, release_file)
Expand Down Expand Up @@ -619,9 +616,6 @@ def test_firmware_upgrade_544_licensed(dut, log_level, use_mock):
d = Device(host=dut.host, port=dut.port, protocol=dut.protocol, log_level=log_level, mock='n')
d.open()
d.system.update_firmware(release_file, dontwait=dut.dontwait)
if (dut.mode == 'emulated'):
# real devices will be rebooting here
assert(d.system._get_boot_image() == release_file)
d.close()

clean_test_firmware_upgrade(dut, release_file)
Expand Down
32 changes: 29 additions & 3 deletions tests/test_device_ats.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,14 +53,23 @@ def tftp_server_for_ever(port, tftp_server_dir):


def setup_test_firmware_upgrade(dut, image_name):
dut.tftp_port = 69
if (dut.mode == 'emulated'):
# Wait an answer from the device in case of reboot during emulation.
dut.dontwait = False
# Only root users can access port 69, that is mandatory for TFTP upload.
# Normal users like Travis can rely on other ports but greater than 1024.
if (getpass.getuser() != 'root'):
if (getpass.getuser() == 'root'):
dut.tftp_port = 69
else:
dut.tftp_port = 20069
while dut.tftp_port < 65536:
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
result = s.connect_ex(('127.0.0.1', dut.tftp_port))
s.close()
if (result > 0):
break
dut.tftp_port = dut.tftp_port + 1

# Create a dummy image file to be uploaded.
myfile = open(image_name, 'w')
myfile.write('1')
Expand All @@ -69,6 +78,7 @@ def setup_test_firmware_upgrade(dut, image_name):
# Don't wait an answer from the device in case of reboot.
dut.dontwait = True
# Only root users can access port 69, that is mandatory for TFTP upload.
dut.tftp_port = 69
assert 'root' == getpass.getuser()

client_dir = './tftp_client_dir'
Expand Down Expand Up @@ -100,7 +110,7 @@ def test_facts_1(dut, log_level, use_mock):
1


Unit Up time
Unit up time
---- ---------------
1 00,00:14:51

Expand All @@ -115,14 +125,30 @@ def test_facts_1(dut, log_level, use_mock):

"""

out_un = """
System Description: ATI AT-8000S
System Up Time (days,hour:min:sec): 00,00:02:34
System Contact: [email protected]
System Name: nac_dev
System Location: milan
System MAC Address: 00:15:77:30:36:00
System Object ID: 1.3.6.1.4.1.207.1.4.126
Serial number:
Type: AT-8000S/24
"""
setup_dut(dut)
dut.add_cmd({'cmd': 'show system', 'state': 0, 'action': 'PRINT', 'args': [out_sys]})
dut.add_cmd({'cmd': 'show version', 'state': 0, 'action': 'PRINT', 'args': [out_ver]})
dut.add_cmd({'cmd': 'show system unit 1', 'state': 0, 'action': 'PRINT', 'args': [out_un]})
d = Device(host=dut.host, port=dut.port, protocol=dut.protocol, log_level=log_level, mock='n')
d.open()
assert d.facts['model'] == 'not found'
assert d.facts['sysuptime'] == 'not found'
assert d.facts['unit_number'] == 'not found'
assert d.facts['serial_number'] == 'not found'
assert d.facts['system contact'] == '[email protected]'
assert d.facts['system name'] == 'nac_dev'
assert d.facts['system location'] == 'milan'
d.close()


Expand Down
18 changes: 14 additions & 4 deletions tests/test_file_ats.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,21 @@ def setup_dut(dut):
Unit Number: 1
Serial number:
"""]})
if (dut.mode != 'emulated'):
if (dut.mode == 'emulated'):
if (getpass.getuser() == 'root'):
dut.tftp_port = 69
else:
dut.tftp_port = 20069
while dut.tftp_port < 65536:
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
result = s.connect_ex(('127.0.0.1', dut.tftp_port))
s.close()
if (result > 0):
break
dut.tftp_port = dut.tftp_port + 1
else:
dut.tftp_port = 69
assert 'root' == getpass.getuser()
dut.tftp_port = 69
if (getpass.getuser() != 'root'):
dut.tftp_port = 20069

client_dir = './tftp_client_dir'
server_dir = './tftp_server_dir'
Expand Down
18 changes: 14 additions & 4 deletions tests/test_license.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,21 @@ def setup_test_certificate(dut, cert_file, label, key, mac='', tftp_server=False
myfile.close()

if (tftp_server is True):
if (dut.mode != 'emulated'):
if (dut.mode == 'emulated'):
if (getpass.getuser() == 'root'):
dut.tftp_port = 69
else:
dut.tftp_port = 20069
while dut.tftp_port < 65536:
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
result = s.connect_ex(('127.0.0.1', dut.tftp_port))
s.close()
if (result > 0):
break
dut.tftp_port = dut.tftp_port + 1
else:
dut.tftp_port = 69
assert 'root' == getpass.getuser()
dut.tftp_port = 69
if (getpass.getuser() != 'root'):
dut.tftp_port = 20069

client_dir = './tftp_client_dir'
server_dir = './tftp_server_dir'
Expand Down
Loading