Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/sonic_ax_impl/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ class SonicMIB(
cisco.ciscoPfcExtMIB.cpfcIfPriorityTable,
cisco.ciscoSwitchQosMIB.csqIfQosGroupStatsTable,
cisco.ciscoEntityFruControlMIB.cefcFruPowerStatusTable,
cisco.ciscoEntityFruControlFanTrayMIB.cefcFruFanTrayStatusTable,
):
"""
If SONiC was to create custom MIBEntries, they may be specified here.
Expand Down
1 change: 1 addition & 0 deletions src/sonic_ax_impl/mibs/vendor/cisco/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@
from . import ciscoPfcExtMIB
from . import ciscoSwitchQosMIB
from . import ciscoEntityFruControlMIB
from . import ciscoEntityFruControlFanTrayMIB
105 changes: 105 additions & 0 deletions src/sonic_ax_impl/mibs/vendor/cisco/ciscoEntityFruControlFanTrayMIB.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
from enum import Enum, unique
from sonic_ax_impl import mibs
from ax_interface import MIBMeta, ValueType, SubtreeMIBEntry
import sonic_ax_impl.mibs.ietf.physical_entity_sub_oid_generator as fru_oids
import re


PRESENCE_OK = 'true'
STATUS_OK = 'true'

@unique
class FanTrayInfoDB(str, Enum):
"""
FAN info keys
"""
PRESENCE = "presence"
STATUS = "status"

def get_fantray_data(fantray_info):
"""
:param chassis_info: chassis info dict
:return: tuple (psu_num) of chassis;
Empty string if field not in chassis_info
"""

return tuple(fantray_info.get(field.value, "") for field in FanTrayInfoDB)

class FanStatusHandler:
"""
Class to handle the SNMP request
"""
def __init__(self):
"""
init the handler
"""
self.statedb = mibs.init_db()
self.statedb.connect(self.statedb.STATE_DB)
self.init_fan_trays()

def init_fan_trays(self):
fan_trays = self.statedb.keys(self.statedb.STATE_DB,
'FAN_DRAWER_INFO' + mibs.TABLE_NAME_SEPARATOR_VBAR + '*')
if not fan_trays:
mibs.logger.debug('No fan trays found in {}'.format(fan_trays))
return None
fan_trays = sorted(fan_trays)
positions = [int(re.findall(r'\d+', s)[0]) for s in fan_trays]
oids = [fru_oids.get_fan_drawer_sub_id(pos) for pos in positions]
self.oids = oids
self.fan_trays = fan_trays

def get_next(self, sub_id):
"""
:param sub_id: The 1-based snmp sub-identifier query.
:return: the next sub id.
"""
if not sub_id:
self.init_fan_trays()
return (1, )

index = sub_id[0]
if index >= len(self.fan_trays):
return None

return (index + 1,)

def _get_fantray_status(self, oid):
"""
:return: oper status of requested sub_id according to cefcFanTrayOperStatus
1 - unknown
2 - ok
3 - down
4 - warning
:ref: https://mibbrowser.online/mibdb_search.php?mib=CISCO-ENTITY-FRU-CONTROL-MIB
"""
fantray_name = self.fan_trays[oid - 1]
fantray_info = self.statedb.get_all(self.statedb.STATE_DB, fantray_name)
presence, status = get_fantray_data(fantray_info)
mibs.logger.debug('Fantray {} name {} presence {} status {}'.format(oid, fantray_name, presence, status))
if presence.lower() == "true" and status.lower() == "true":
return 2
return 3

def get_fantray_status(self, sub_id):
"""
:param sub_id: The 1-based sub-identifier query. Only iterate the entity
: of type FAN
:return: oper status of requested sub_id according to cefcFanTrayOperStatus
1 - unknown
2 - ok
3 - down
4 - warning
:ref: https://mibbrowser.online/mibdb_search.php?mib=CISCO-ENTITY-FRU-CONTROL-MIB
"""
if not sub_id:
return None

return self._get_fantray_status(sub_id[0])

class cefcFruFanTrayStatusTable(metaclass=MIBMeta, prefix='.1.3.6.1.4.1.9.9.117.1.4.1'):
"""
'cefcFruFanStatusTable' http://oidref.com/.1.3.6.1.4.1.9.9.117.1.4.1
"""
handler = FanStatusHandler()
fan_status = SubtreeMIBEntry('1.1', handler, ValueType.INTEGER, handler.get_fantray_status)
22 changes: 22 additions & 0 deletions tests/mock_tables/state_db.json
Original file line number Diff line number Diff line change
Expand Up @@ -122,10 +122,32 @@
"position_in_parent": "1",
"parent_name": "PSU 2"
},
"FAN_DRAWER_INFO|drawer0": {
"model": "DRAWERMODEL",
"serial": "DRAWERSERIAL0",
"presence": "True",
"status": "True",
"is_replaceable": "True"
},
"FAN_DRAWER_INFO|drawer1": {
"model": "DRAWERMODEL",
"serial": "DRAWERSERIAL",
"presence": "True",
"status": "False",
"is_replaceable": "True"
},
"FAN_DRAWER_INFO|drawer2": {
"model": "DRAWERMODEL",
"serial": "DRAWERSERIAL2",
"presence": "True",
"status": "True",
"is_replaceable": "True"
},
"FAN_DRAWER_INFO|drawer3": {
"model": "DRAWERMODEL",
"serial": "DRAWERSERIAL3",
"presence": "False",
"status": "False",
"is_replaceable": "True"
},
"FAN_INFO|fan1": {
Expand Down
178 changes: 178 additions & 0 deletions tests/test_fantray.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
import os
import sys

# noinspection PyUnresolvedReferences
import tests.mock_tables.dbconnector

modules_path = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.insert(0, os.path.join(modules_path, 'src'))

from unittest import TestCase

from ax_interface import ValueType
from ax_interface.pdu_implementations import GetPDU, GetNextPDU
from ax_interface.encodings import ObjectIdentifier
from ax_interface.constants import PduTypes
from ax_interface.pdu import PDU, PDUHeader
from ax_interface.mib import MIBTable
from sonic_ax_impl.mibs.vendor.cisco import ciscoEntityFruControlFanTrayMIB

class TestFanTrayStatus(TestCase):
@classmethod
def setUpClass(cls):
cls.lut = MIBTable(ciscoEntityFruControlFanTrayMIB.cefcFruFanTrayStatusTable)

def test_getNextFanTray0(self):
oid = ObjectIdentifier(2, 0, 0, 0, (1, 3, 6, 1, 4, 1, 9, 9, 117, 1, 4, 1, 1, 1))
expected_oid = ObjectIdentifier(2, 0, 0, 0, (1, 3, 6, 1, 4, 1, 9, 9, 117, 1, 4, 1, 1, 1, 1))
get_pdu = GetNextPDU(
header=PDUHeader(1, PduTypes.GET_NEXT, 16, 0, 42, 0, 0, 0),
oids=[oid]
)

encoded = get_pdu.encode()
response = get_pdu.make_response(self.lut)

value0 = response.values[0]
self.assertEqual(value0.type_, ValueType.INTEGER)
self.assertEqual(str(value0.name), str(expected_oid))
self.assertEqual(value0.data, 2)

def test_getFanTray0Status(self):
oid = ObjectIdentifier(2, 0, 0, 0, (1, 3, 6, 1, 4, 1, 9, 9, 117, 1, 4, 1, 1, 1, 1))
get_pdu = GetPDU(
header=PDUHeader(1, PduTypes.GET, 16, 0, 42, 0, 0, 0),
oids=[oid]
)

encoded = get_pdu.encode()
response = get_pdu.make_response(self.lut)

value0 = response.values[0]
self.assertEqual(value0.type_, ValueType.INTEGER)
self.assertEqual(str(value0.name), str(oid))
self.assertEqual(value0.data, 2)

def test_getNextFanTray1(self):
oid = ObjectIdentifier(2, 0, 0, 0, (1, 3, 6, 1, 4, 1, 9, 9, 117, 1, 4, 1, 1, 1, 1))
expected_oid = ObjectIdentifier(2, 0, 0, 0, (1, 3, 6, 1, 4, 1, 9, 9, 117, 1, 4, 1, 1, 1, 2))
get_pdu = GetNextPDU(
header=PDUHeader(1, PduTypes.GET_NEXT, 16, 0, 42, 0, 0, 0),
oids=[oid]
)

encoded = get_pdu.encode()
response = get_pdu.make_response(self.lut)

value0 = response.values[0]
self.assertEqual(value0.type_, ValueType.INTEGER)
self.assertEqual(str(value0.name), str(expected_oid))
self.assertEqual(value0.data, 3)

def test_getFanTray1Status(self):
oid = ObjectIdentifier(2, 0, 0, 0, (1, 3, 6, 1, 4, 1, 9, 9, 117, 1, 4, 1, 1, 1, 2))
get_pdu = GetPDU(
header=PDUHeader(1, PduTypes.GET, 16, 0, 42, 0, 0, 0),
oids=[oid]
)

encoded = get_pdu.encode()
response = get_pdu.make_response(self.lut)

value0 = response.values[0]
self.assertEqual(value0.type_, ValueType.INTEGER)
self.assertEqual(str(value0.name), str(oid))
self.assertEqual(value0.data, 3)

def test_getNextFanTray2(self):
oid = ObjectIdentifier(2, 0, 0, 0, (1, 3, 6, 1, 4, 1, 9, 9, 117, 1, 4, 1, 1, 1, 2))
expected_oid = ObjectIdentifier(2, 0, 0, 0, (1, 3, 6, 1, 4, 1, 9, 9, 117, 1, 4, 1, 1, 1, 3))
get_pdu = GetNextPDU(
header=PDUHeader(1, PduTypes.GET_NEXT, 16, 0, 42, 0, 0, 0),
oids=[oid]
)

encoded = get_pdu.encode()
response = get_pdu.make_response(self.lut)

value0 = response.values[0]
self.assertEqual(value0.type_, ValueType.INTEGER)
self.assertEqual(str(value0.name), str(expected_oid))
self.assertEqual(value0.data, 2)

def test_getFanTray2Status(self):
oid = ObjectIdentifier(2, 0, 0, 0, (1, 3, 6, 1, 4, 1, 9, 9, 117, 1, 4, 1, 1, 1, 3))
get_pdu = GetPDU(
header=PDUHeader(1, PduTypes.GET, 16, 0, 42, 0, 0, 0),
oids=[oid]
)

encoded = get_pdu.encode()
response = get_pdu.make_response(self.lut)

value0 = response.values[0]
self.assertEqual(value0.type_, ValueType.INTEGER)
self.assertEqual(str(value0.name), str(oid))
self.assertEqual(value0.data, 2)

def test_getNextFanTray3(self):
oid = ObjectIdentifier(2, 0, 0, 0, (1, 3, 6, 1, 4, 1, 9, 9, 117, 1, 4, 1, 1, 1, 3))
expected_oid = ObjectIdentifier(2, 0, 0, 0, (1, 3, 6, 1, 4, 1, 9, 9, 117, 1, 4, 1, 1, 1, 4))
get_pdu = GetNextPDU(
header=PDUHeader(1, PduTypes.GET_NEXT, 16, 0, 42, 0, 0, 0),
oids=[oid]
)

encoded = get_pdu.encode()
response = get_pdu.make_response(self.lut)

value0 = response.values[0]
self.assertEqual(value0.type_, ValueType.INTEGER)
self.assertEqual(str(value0.name), str(expected_oid))
self.assertEqual(value0.data, 3)

def test_getFanTray3Status(self):
oid = ObjectIdentifier(2, 0, 0, 0, (1, 3, 6, 1, 4, 1, 9, 9, 117, 1, 4, 1, 1, 1, 4))
get_pdu = GetPDU(
header=PDUHeader(1, PduTypes.GET, 16, 0, 42, 0, 0, 0),
oids=[oid]
)

encoded = get_pdu.encode()
response = get_pdu.make_response(self.lut)

value0 = response.values[0]
self.assertEqual(value0.type_, ValueType.INTEGER)
self.assertEqual(str(value0.name), str(oid))
self.assertEqual(value0.data, 3)

def test_getNextFanTray4(self):
oid = ObjectIdentifier(2, 0, 0, 0, (1, 3, 6, 1, 4, 1, 9, 9, 117, 1, 4, 1, 1, 1, 4))
get_pdu = GetNextPDU(
header=PDUHeader(1, PduTypes.GET_NEXT, 16, 0, 42, 0, 0, 0),
oids=[oid]
)

encoded = get_pdu.encode()
response = get_pdu.make_response(self.lut)

value0 = response.values[0]
self.assertEqual(value0.type_, ValueType.END_OF_MIB_VIEW)
self.assertEqual(str(value0.name), str(oid))
self.assertEqual(value0.data, None)

def test_getMissedFanTray(self):
oid = ObjectIdentifier(2, 0, 0, 0, (1, 3, 6, 1, 4, 1, 9, 9, 117, 1, 4, 1, 1, 1, 8))
expected_oid = None
get_pdu = GetNextPDU(
header=PDUHeader(1, PduTypes.GET_NEXT, 16, 0, 42, 0, 0, 0),
oids=[oid]
)

encoded = get_pdu.encode()
response = get_pdu.make_response(self.lut)

value0 = response.values[0]
self.assertEqual(value0.type_, ValueType.END_OF_MIB_VIEW)
self.assertEqual(str(value0.name), str(oid))
self.assertEqual(value0.data, None)
Loading