diff --git a/.gitignore b/.gitignore index db4561e..8eecbd4 100644 --- a/.gitignore +++ b/.gitignore @@ -52,3 +52,8 @@ docs/_build/ # PyBuilder target/ + +.idea +.venv +Pipfile +Pipfile.lock diff --git a/README.rst b/README.rst index 6440b48..5c6c673 100644 --- a/README.rst +++ b/README.rst @@ -1,29 +1,24 @@ -python-onvif +python-onvif-zeep ============ ONVIF Client Implementation in Python Dependencies ------------ -`Python 2.x` (For a Python 3 compatible fork, see https://github.com/FalkTannhaeuser/python-onvif-zeep) +`zeep `_ >= 3.0.0 -`suds `_ >= 0.4 - -`suds-passworddigest `_ - -Install python-onvif --------------------- +Install python-onvif-zeep +------------------------- **From Source** You should clone this repository and run setup.py:: - cd python-onvif && python setup.py install + cd python-onvif-zeep && python setup.py install -**From PyPI** +Alternatively, you can run:: -:: + pip install --upgrade onvif_zeep - pip install onvif Getting Started --------------- diff --git a/examples/AbsoluteMove.py b/examples/AbsoluteMove.py new file mode 100644 index 0000000..791071f --- /dev/null +++ b/examples/AbsoluteMove.py @@ -0,0 +1,270 @@ +import asyncio, sys, os +from onvif import ONVIFCamera +import time + +IP="192.168.1.64" # Camera IP address +PORT=80 # Port +USER="admin" # Username +PASS="intflow3121" # Password + + +XMAX = 1 +XMIN = -1 +XNOW = 0.5 +YMAX = 1 +YMIN = -1 +YNOW = 0.5 +Move = 0.1 +Velocity = 1 +Zoom = 0 +positionrequest = None +ptz = None +active = False +ptz_configuration_options = None +media_profile = None + +def do_move(ptz, request): + + global active + if active: + ptz.Stop({'ProfileToken': request.ProfileToken}) + active = True + ptz.AbsoluteMove(request) + +def move_up(ptz, request): + + if YNOW - Move <= -1: + request.Position.PanTilt.y = YNOW + else: + request.Position.PanTilt.y = YNOW - Move + + + do_move(ptz, request) + +def move_down(ptz, request): + + if YNOW + Move >= 1: + request.Position.PanTilt.y = YNOW + else: + request.Position.PanTilt.y = YNOW + Move + + + do_move(ptz, request) + +def move_right(ptz, request): + + if XNOW - Move >= -0.99: + request.Position.PanTilt.x = XNOW - Move + elif abs(XNOW + Move) >= 0.0: + request.Position.PanTilt.x = abs(XNOW) - Move + elif abs(XNOW) <= 0.01: + request.Position.PanTilt.x = XNOW + + request.Position.PanTilt.y = YNOW + do_move(ptz, request) + +def move_left(ptz, request): + + if XNOW + Move <= 1.0: + request.Position.PanTilt.x = XNOW + Move + elif XNOW <= 1.0 and XNOW > 0.99: + request.Position.PanTilt.x = -XNOW + elif XNOW < 0: + request.Position.PanTilt.x = XNOW + Move + elif XNOW <= -0.105556 and XNOW > -0.11: + request.Position.PanTilt.x = XNOW + + request.Position.PanTilt.y = YNOW + do_move(ptz, request) + + +def move_upleft(ptz, request): + + if YNOW == -1: + request.Position.PanTilt.y = YNOW + else: + request.Position.PanTilt.y = YNOW - Move + if XNOW + Move <= 1.0: + request.Position.PanTilt.x = XNOW + Move + elif XNOW <= 1.0 and XNOW > 0.99: + request.Position.PanTilt.x = -XNOW + elif XNOW < 0: + request.Position.PanTilt.x = XNOW + Move + elif XNOW <= -0.105556 and XNOW > -0.11: + request.Position.PanTilt.x = XNOW + do_move(ptz, request) + +def move_upright(ptz, request): + + if YNOW == -1: + request.Position.PanTilt.y = YNOW + else: + request.Position.PanTilt.y = YNOW - Move + + if XNOW - Move >= -0.99: + request.Position.PanTilt.x = XNOW - Move + elif abs(XNOW + Move) >= 0.0: + request.Position.PanTilt.x = abs(XNOW) - Move + elif abs(XNOW) <= 0.01: + request.Position.PanTilt.x = XNOW + + do_move(ptz, request) + +def move_downleft(ptz, request): + + if YNOW - Move == 1: + request.Position.PanTilt.y = YNOW + else: + request.Position.PanTilt.y = YNOW - Move + + if XNOW + Move <= 1.0: + request.Position.PanTilt.x = XNOW + Move + elif XNOW <= 1.0 and XNOW > 0.99: + request.Position.PanTilt.x = -XNOW + elif XNOW < 0: + request.Position.PanTilt.x = XNOW + Move + elif XNOW <= -0.105556 and XNOW > -0.11: + request.Position.PanTilt.x = XNOW + + do_move(ptz, request) + +def move_downright(ptz, request): + + if YNOW == -1: + request.Position.PanTilt.y = YNOW + else: + request.Position.PanTilt.y = YNOW - Move + + if XNOW - Move >= -0.99: + request.Position.PanTilt.x = XNOW - Move + elif abs(XNOW + Move) >= 0.0: + request.Position.PanTilt.x = abs(XNOW) - Move + elif abs(XNOW) <= 0.01: + request.Position.PanTilt.x = XNOW + + do_move(ptz, request) + +def Zoom_in(ptz,request): + + if Zoom + Move >= 1.0: + request.Position.Zoom = 1.0 + else: + request.Position.Zoom = Zoom + Move + do_move(ptz, request) + +def Zoom_out(ptz,request): + + if Zoom - Move <= 0.0: + request.Position.Zoom = 0.0 + else: + request.Position.Zoom = Zoom - Move + do_move(ptz,request) + +def setup_move(): + mycam = ONVIFCamera(IP, PORT, USER, PASS) + # Create media service object + media = mycam.create_media_service() + + # Create ptz service object + global ptz , ptz_configuration_options, media_profile + ptz = mycam.create_ptz_service() + + # Get target profile + media_profile = media.GetProfiles()[0] + + + request = ptz.create_type('GetConfigurationOptions') + request.ConfigurationToken = media_profile.PTZConfiguration.token + ptz_configuration_options = ptz.GetConfigurationOptions(request) + + request_configuration = ptz.create_type('GetConfiguration') + request_configuration.PTZConfigurationToken = media_profile.PTZConfiguration.token + ptz_configuration = ptz.GetConfiguration(request_configuration) + + request_setconfiguration = ptz.create_type('SetConfiguration') + request_setconfiguration.PTZConfiguration = ptz_configuration + + global positionrequest + + positionrequest = ptz.create_type('AbsoluteMove') + positionrequest.ProfileToken = media_profile.token + + if positionrequest.Position is None : + positionrequest.Position = ptz.GetStatus({'ProfileToken': media_profile.token}).Position + positionrequest.Position.PanTilt.space = ptz_configuration_options.Spaces.AbsolutePanTiltPositionSpace[0].URI + positionrequest.Position.Zoom.space = ptz_configuration_options.Spaces.AbsoluteZoomPositionSpace[0].URI + if positionrequest.Speed is None : + positionrequest.Speed = ptz.GetStatus({'ProfileToken': media_profile.token}).Position + positionrequest.Speed.PanTilt.space = ptz_configuration_options.Spaces.PanTiltSpeedSpace[0].URI + +def Get_Status(): + # Get range of pan and tilt + global XMAX, XMIN, YMAX, YMIN, XNOW, YNOW, Velocity, Zoom + XMAX = ptz_configuration_options.Spaces.AbsolutePanTiltPositionSpace[0].XRange.Max + XMIN = ptz_configuration_options.Spaces.AbsolutePanTiltPositionSpace[0].XRange.Min + YMAX = ptz_configuration_options.Spaces.AbsolutePanTiltPositionSpace[0].YRange.Max + YMIN = ptz_configuration_options.Spaces.AbsolutePanTiltPositionSpace[0].YRange.Min + XNOW = ptz.GetStatus({'ProfileToken': media_profile.token}).Position.PanTilt.x + YNOW = ptz.GetStatus({'ProfileToken': media_profile.token}).Position.PanTilt.y + Velocity = ptz_configuration_options.Spaces.PanTiltSpeedSpace[0].XRange.Max + Zoom = ptz.GetStatus({'ProfileToken': media_profile.token}).Position.Zoom.x + + +def readin(): + """Reading from stdin and displaying menu""" + global positionrequest, ptz + + selection = sys.stdin.readline().strip("\n") + lov=[ x for x in selection.split(" ") if x != ""] + if lov: + + if lov[0].lower() in ["u","up"]: + move_up(ptz,positionrequest) + elif lov[0].lower() in ["d","do","dow","down"]: + move_down(ptz,positionrequest) + elif lov[0].lower() in ["l","le","lef","left"]: + move_left(ptz,positionrequest) + elif lov[0].lower() in ["l","le","lef","left"]: + move_left(ptz,positionrequest) + elif lov[0].lower() in ["r","ri","rig","righ","right"]: + move_right(ptz,positionrequest) + elif lov[0].lower() in ["ul"]: + move_upleft(ptz,positionrequest) + elif lov[0].lower() in ["ur"]: + move_upright(ptz,positionrequest) + elif lov[0].lower() in ["dl"]: + move_downleft(ptz,positionrequest) + elif lov[0].lower() in ["dr"]: + move_downright(ptz,positionrequest) + elif lov[0].lower() in ["s","st","sto","stop"]: + ptz.Stop({'ProfileToken': positionrequest.ProfileToken}) + active = False + else: + print("What are you asking?\tI only know, 'up','down','left','right', 'ul' (up left), \n\t\t\t'ur' (up right), 'dl' (down left), 'dr' (down right) and 'stop'") + + print("") + print("Your command: ", end='',flush=True) + +# Test Define +# def move(ptz, request): + +# request.Position.PanTilt.y = -1 +# request.Position.PanTilt.x = 0 + +# do_move(ptz,request) + +if __name__ == '__main__': + + setup_move() + # Get_Status() + # Zoom_out(ptz,positionrequest) + # Get_Status() + # move(ptz,positionrequest) + while True: + if active == True: + time.sleep(1) + active = False + else: + Get_Status() + + move_up(ptz, positionrequest) \ No newline at end of file diff --git a/examples/continuous_move.py b/examples/continuous_move.py index 058a6f9..2e0217b 100644 --- a/examples/continuous_move.py +++ b/examples/continuous_move.py @@ -1,63 +1,102 @@ -from time import sleep - +import asyncio, sys, os from onvif import ONVIFCamera +IP="192.168.1.64" # Camera IP address +PORT=80 # Port +USER="admin" # Username +PASS="intflow3121" # Password + + XMAX = 1 XMIN = -1 YMAX = 1 YMIN = -1 +moverequest = None +ptz = None +active = False -def perform_move(ptz, request, timeout): +def do_move(ptz, request): # Start continuous move + global active + if active: + ptz.Stop({'ProfileToken': request.ProfileToken}) + active = True ptz.ContinuousMove(request) - # Wait a certain time - sleep(timeout) - # Stop continuous move - ptz.Stop({'ProfileToken': request.ProfileToken}) - -def move_up(ptz, request, timeout=1): - print 'move up...' - request.Velocity.PanTilt._x = 0 - request.Velocity.PanTilt._y = YMAX - perform_move(ptz, request, timeout) - -def move_down(ptz, request, timeout=1): - print 'move down...' - request.Velocity.PanTilt._x = 0 - request.Velocity.PanTilt._y = YMIN - perform_move(ptz, request, timeout) - -def move_right(ptz, request, timeout=1): - print 'move right...' - request.Velocity.PanTilt._x = XMAX - request.Velocity.PanTilt._y = 0 - perform_move(ptz, request, timeout) - -def move_left(ptz, request, timeout=1): - print 'move left...' - request.Velocity.PanTilt._x = XMIN - request.Velocity.PanTilt._y = 0 - perform_move(ptz, request, timeout) - -def continuous_move(): - mycam = ONVIFCamera('192.168.0.112', 80, 'admin', '12345') + +def move_up(ptz, request): + print ('move up...') + request.Velocity.PanTilt.x = 0 + request.Velocity.PanTilt.y = YMAX + do_move(ptz, request) + +def move_down(ptz, request): + print ('move down...') + request.Velocity.PanTilt.x = 0 + request.Velocity.PanTilt.y = YMIN + do_move(ptz, request) + +def move_right(ptz, request): + print ('move right...') + request.Velocity.PanTilt.x = XMAX + request.Velocity.PanTilt.y = 0 + do_move(ptz, request) + +def move_left(ptz, request): + print ('move left...') + request.Velocity.PanTilt.x = XMIN + request.Velocity.PanTilt.y = 0 + do_move(ptz, request) + + +def move_upleft(ptz, request): + print ('move up left...') + request.Velocity.PanTilt.x = XMIN + request.Velocity.PanTilt.y = YMAX + do_move(ptz, request) + +def move_upright(ptz, request): + print ('move up left...') + request.Velocity.PanTilt.x = XMAX + request.Velocity.PanTilt.y = YMAX + do_move(ptz, request) + +def move_downleft(ptz, request): + print ('move down left...') + request.Velocity.PanTilt.x = XMIN + request.Velocity.PanTilt.y = YMIN + do_move(ptz, request) + +def move_downright(ptz, request): + print ('move down left...') + request.Velocity.PanTilt.x = XMAX + request.Velocity.PanTilt.y = YMIN + do_move(ptz, request) + +def setup_move(): + mycam = ONVIFCamera(IP, PORT, USER, PASS) # Create media service object media = mycam.create_media_service() + # Create ptz service object + global ptz ptz = mycam.create_ptz_service() # Get target profile - media_profile = media.GetProfiles()[0]; + media_profile = media.GetProfiles()[0] # Get PTZ configuration options for getting continuous move range request = ptz.create_type('GetConfigurationOptions') - request.ConfigurationToken = media_profile.PTZConfiguration._token + request.ConfigurationToken = media_profile.PTZConfiguration.token ptz_configuration_options = ptz.GetConfigurationOptions(request) - request = ptz.create_type('ContinuousMove') - request.ProfileToken = media_profile._token + global moverequest + moverequest = ptz.create_type('ContinuousMove') + moverequest.ProfileToken = media_profile.token + if moverequest.Velocity is None: + moverequest.Velocity = ptz.GetStatus({'ProfileToken': media_profile.token}).Position + moverequest.Velocity.PanTilt.space = ptz_configuration_options.Spaces.ContinuousPanTiltVelocitySpace[0].URI + moverequest.Velocity.Zoom.space = ptz_configuration_options.Spaces.ContinuousZoomVelocitySpace[0].URI - ptz.Stop({'ProfileToken': media_profile._token}) # Get range of pan and tilt # NOTE: X and Y are velocity vector @@ -67,17 +106,59 @@ def continuous_move(): YMAX = ptz_configuration_options.Spaces.ContinuousPanTiltVelocitySpace[0].YRange.Max YMIN = ptz_configuration_options.Spaces.ContinuousPanTiltVelocitySpace[0].YRange.Min - # move right - move_right(ptz, request) - - # move left - move_left(ptz, request) - - # Move up - move_up(ptz, request) - - # move down - move_down(ptz, request) +def readin(): + """Reading from stdin and displaying menu""" + global moverequest, ptz + + selection = sys.stdin.readline().strip("\n") + lov=[ x for x in selection.split(" ") if x != ""] + if lov: + + if lov[0].lower() in ["u","up"]: + move_up(ptz,moverequest) + elif lov[0].lower() in ["d","do","dow","down"]: + move_down(ptz,moverequest) + elif lov[0].lower() in ["l","le","lef","left"]: + move_left(ptz,moverequest) + elif lov[0].lower() in ["l","le","lef","left"]: + move_left(ptz,moverequest) + elif lov[0].lower() in ["r","ri","rig","righ","right"]: + move_right(ptz,moverequest) + elif lov[0].lower() in ["ul"]: + move_upleft(ptz,moverequest) + elif lov[0].lower() in ["ur"]: + move_upright(ptz,moverequest) + elif lov[0].lower() in ["dl"]: + move_downleft(ptz,moverequest) + elif lov[0].lower() in ["dr"]: + move_downright(ptz,moverequest) + elif lov[0].lower() in ["s","st","sto","stop"]: + ptz.Stop({'ProfileToken': moverequest.ProfileToken}) + active = False + else: + print("What are you asking?\tI only know, 'up','down','left','right', 'ul' (up left), \n\t\t\t'ur' (up right), 'dl' (down left), 'dr' (down right) and 'stop'") + + print("") + print("Your command: ", end='',flush=True) + + if __name__ == '__main__': - continuous_move() + setup_move() + move_upleft(ptz,moverequest) + #if os.name == 'nt': + # loop = asyncio.ProactorEventLoop() # for subprocess' pipes on Windows + # asyncio.set_event_loop(loop) + #else: + # loop = asyncio.get_event_loop() + + #try: + # loop.add_reader(sys.stdin,readin) + # print("Use Ctrl-C to quit") + # print("Your command: ", end='',flush=True) + # loop.run_forever() + #except: + # pass + #finally: + # loop.remove_reader(sys.stdin) + # loop.close() diff --git a/examples/events.py b/examples/events.py index 4ad93a8..ac6bf52 100644 --- a/examples/events.py +++ b/examples/events.py @@ -4,7 +4,7 @@ if __name__ == '__main__': - mycam = ONVIFCamera('192.168.1.10', 8899, 'admin', 'admin') #, no_cache=True) + mycam = ONVIFCamera('192.168.1.64', 80, 'admin', 'intflow3121') #, no_cache=True) event_service = mycam.create_events_service() print(event_service.GetEventProperties()) diff --git a/examples/streaming.py b/examples/streaming.py index 1f7caa0..7ab402b 100644 --- a/examples/streaming.py +++ b/examples/streaming.py @@ -9,7 +9,7 @@ def media_profile_configuration(): ''' # Create the media service - mycam = ONVIFCamera('192.168.0.112', 80, 'admin', '12345') + mycam = ONVIFCamera('192.168.1.64', 80, 'admin', 'intflow3121') media_service = mycam.create_media_service() profiles = media_service.GetProfiles() diff --git a/onvif/__init__.py b/onvif/__init__.py index aef09ea..de1775e 100644 --- a/onvif/__init__.py +++ b/onvif/__init__.py @@ -1,10 +1,10 @@ from onvif.client import ONVIFService, ONVIFCamera, SERVICES from onvif.exceptions import ONVIFError, ERR_ONVIF_UNKNOWN, \ ERR_ONVIF_PROTOCOL, ERR_ONVIF_WSDL, ERR_ONVIF_BUILD -from onvif import cli +#from onvif import cli __all__ = ( 'ONVIFService', 'ONVIFCamera', 'ONVIFError', 'ERR_ONVIF_UNKNOWN', 'ERR_ONVIF_PROTOCOL', 'ERR_ONVIF_WSDL', 'ERR_ONVIF_BUILD', - 'SERVICES', 'cli' + 'SERVICES'#, 'cli' ) diff --git a/onvif/cli.py b/onvif/cli.py old mode 100755 new mode 100644 index 549fdd1..046aa56 --- a/onvif/cli.py +++ b/onvif/cli.py @@ -1,16 +1,15 @@ #!/usr/bin/python '''ONVIF Client Command Line Interface''' - +from __future__ import print_function, division import re from cmd import Cmd from ast import literal_eval -from json import dumps -from argparse import ArgumentParser, ArgumentError, REMAINDER +from argparse import ArgumentParser, REMAINDER -from suds import MethodNotFound -from suds.sax.text import Text +from zeep.exceptions import LookupError as MethodNotFound +from zeep.xsd import String as Text from onvif import ONVIFCamera, ONVIFService, ONVIFError -from definition import SERVICES +from onvif.definition import SERVICES import os.path SUPPORTED_SERVICES = SERVICES.keys() @@ -21,10 +20,10 @@ def error(self, message): raise ValueError("%s\n%s" % (message, usage)) def success(message): - print 'True: ' + str(message) + print('True: ' + str(message)) def error(message): - print 'False: ' + str(message) + print('False: ' + str(message)) class ONVIFCLI(Cmd): prompt = 'ONVIF >>> ' @@ -151,7 +150,7 @@ def main(): try: args = parser.parse_args() except ValueError as err: - print str(err) + print(str(err)) return # Also need parse configuration file. diff --git a/onvif/client.py b/onvif/client.py index 0ed2d8c..ca3e564 100644 --- a/onvif/client.py +++ b/onvif/client.py @@ -1,27 +1,22 @@ +from __future__ import print_function, division __version__ = '0.0.1' - +import datetime as dt +import logging import os.path -import urlparse -import urllib from threading import Thread, RLock -import logging +from zeep.client import Client, CachingClient, Settings +from zeep.wsse.username import UsernameToken +import zeep.helpers + +from onvif.exceptions import ONVIFError +from onvif.definition import SERVICES + logger = logging.getLogger('onvif') logging.basicConfig(level=logging.INFO) -logging.getLogger('suds.client').setLevel(logging.CRITICAL) +logging.getLogger('zeep.client').setLevel(logging.CRITICAL) -import suds.sudsobject -from suds.client import Client -from suds.wsse import Security, UsernameToken -from suds.cache import ObjectCache, NoCache -from suds_passworddigest.token import UsernameDigestToken -from suds.bindings import binding -binding.envns = ('SOAP-ENV', 'http://www.w3.org/2003/05/soap-envelope') -from onvif.exceptions import ONVIFError -from definition import SERVICES, NSMAP -from suds.sax.date import UTC -import datetime as dt # Ensure methods to raise an ONVIFError Exception # when some thing was wrong def safe_func(func): @@ -33,28 +28,30 @@ def wrapped(*args, **kwargs): return wrapped -class UsernameDigestTokenDtDiff(UsernameDigestToken): - ''' +class UsernameDigestTokenDtDiff(UsernameToken): + """ UsernameDigestToken class, with a time offset parameter that can be adjusted; This allows authentication on cameras without being time synchronized. - Please note that using NTP on both end is the recommended solution, - this should only be used in "safe" environements. - ''' - def __init__(self, user, passw, dt_diff=None) : -# Old Style class ... sigh ... - UsernameDigestToken.__init__(self, user, passw) - self.dt_diff = dt_diff - - def setcreated(self, *args, **kwargs): - dt_adjusted = None - if self.dt_diff : - dt_adjusted = (self.dt_diff + dt.datetime.utcnow()) - UsernameToken.setcreated(self, dt=dt_adjusted, *args, **kwargs) - self.created = str(UTC(self.created)) + Please note that using NTP on both end is the recommended solution, + this should only be used in "safe" environments. + """ + def __init__(self, user, passw, dt_diff=None, **kwargs): + super().__init__(user, passw, **kwargs) + self.dt_diff = dt_diff # Date/time difference in datetime.timedelta + + def apply(self, envelope, headers): + old_created = self.created + if self.created is None: + self.created = dt.datetime.utcnow() + if self.dt_diff is not None: + self.created += self.dt_diff + result = super().apply(envelope, headers) + self.created = old_created + return result class ONVIFService(object): - ''' + """ Python Implemention for ONVIF Service. Services List: DeviceMgmt DeviceIO Event AnalyticsDevice Display Imaging Media @@ -82,75 +79,44 @@ class ONVIFService(object): params = device_service.create_type('SetHostname') params.Hostname = 'NewHostName' device_service.SetHostname(params) - ''' + """ @safe_func def __init__(self, xaddr, user, passwd, url, - cache_location='/tmp/suds', cache_duration=None, - encrypt=True, daemon=False, ws_client=None, no_cache=False, portType=None, dt_diff = None): - + encrypt=True, daemon=False, zeep_client=None, no_cache=False, + dt_diff=None, binding_name='', transport=None): if not os.path.isfile(url): raise ONVIFError('%s doesn`t exist!' % url) - if no_cache: - cache = NoCache() - else: - # Create cache object - # NOTE: if cache_location is specified, - # onvif must has the permission to access it. - cache = ObjectCache(location=cache_location) - # cache_duration: cache will expire in `cache_duration` days - if cache_duration is not None: - cache.setduration(days=cache_duration) - - - # Convert pathname to url - self.url = urlparse.urljoin('file:', urllib.pathname2url(url)) + self.url = url self.xaddr = xaddr + wsse = UsernameDigestTokenDtDiff(user, passwd, dt_diff=dt_diff, use_digest=encrypt) # Create soap client - if not ws_client: - self.ws_client = Client(url=self.url, - location=self.xaddr, - cache=cache, - port=portType, - headers={'Content-Type': 'application/soap+xml'}) + if not zeep_client: + ClientType = Client if no_cache else CachingClient + settings = Settings() + settings.strict = False + settings.xml_huge_tree = True + self.zeep_client = ClientType(wsdl=url, wsse=wsse, transport=transport, settings=settings) + self.zeep_client.set_ns_prefix('tds', 'http://www.onvif.org/ver10/device/wsdl') + self.zeep_client.set_ns_prefix('tev', 'http://www.onvif.org/ver10/events/wsdl') + self.zeep_client.set_ns_prefix('timg', 'http://www.onvif.org/ver20/imaging/wsdl') + self.zeep_client.set_ns_prefix('tmd', 'http://www.onvif.org/ver10/deviceIO/wsdl') + self.zeep_client.set_ns_prefix('tptz', 'http://www.onvif.org/ver20/ptz/wsdl') + self.zeep_client.set_ns_prefix('ttr', 'http://www.onvif.org/ver10/media/wsdl') + self.zeep_client.set_ns_prefix('ter', 'http://www.onvif.org/ver10/error') else: - self.ws_client = ws_client - self.ws_client.set_options(location=self.xaddr) + self.zeep_client = zeep_client + self.ws_client = self.zeep_client.create_service(binding_name, self.xaddr) # Set soap header for authentication self.user = user self.passwd = passwd # Indicate wether password digest is needed self.encrypt = encrypt - self.daemon = daemon - self.dt_diff = dt_diff - self.set_wsse() - - # Method to create type instance of service method defined in WSDL - self.create_type = self.ws_client.factory.create - - @safe_func - def set_wsse(self, user=None, passwd=None): - ''' Basic ws-security auth ''' - if user: - self.user = user - if passwd: - self.passwd = passwd - - security = Security() - - if self.encrypt: - token = UsernameDigestTokenDtDiff(self.user, self.passwd, dt_diff=self.dt_diff) - else: - token = UsernameToken(self.user, self.passwd) - token.setnonce() - token.setcreated() - - security.tokens.append(token) - self.ws_client.set_options(wsse=security) + self.create_type = lambda x: self.zeep_client.get_element('ns0:' + x)() @classmethod @safe_func @@ -161,16 +127,9 @@ def clone(cls, service, *args, **kwargs): @staticmethod @safe_func - def to_dict(sudsobject): + def to_dict(zeepobject): # Convert a WSDL Type instance into a dictionary - if sudsobject is None: - return { } - elif isinstance(sudsobject, list): - ret = [ ] - for item in sudsobject: - ret.append(Client.dict(item)) - return ret - return Client.dict(sudsobject) + return {} if zeepobject is None else zeep.helpers.serialize_object(zeepobject) def service_wrapper(self, func): @safe_func @@ -180,9 +139,12 @@ def call(params=None, callback=None): # print(params.__class__.__mro__) if params is None: params = {} - elif isinstance(params, suds.sudsobject.Object): + else: params = ONVIFService.to_dict(params) - ret = func(**params) + try: + ret = func(**params) + except TypeError: + ret = func(params) if callable(callback): callback(ret) return ret @@ -195,29 +157,29 @@ def call(params=None, callback=None): return call(params, callback) return wrapped - def __getattr__(self, name): - ''' + """ Call the real onvif Service operations, - See the offical wsdl definition for the + See the official wsdl definition for the APIs detail(API name, request parameters, response parameters, parameter types, etc...) - ''' - builtin = name.startswith('__') and name.endswith('__') + """ + builtin = name.startswith('__') and name.endswith('__') if builtin: return self.__dict__[name] else: - return self.service_wrapper(getattr(self.ws_client.service, name)) + return self.service_wrapper(getattr(self.ws_client, name)) + class ONVIFCamera(object): - ''' - Python Implemention ONVIF compliant device - This class integrates onvif services - + """ + Python Implementation of an ONVIF compliant device. + This class integrates ONVIF services + adjust_time parameter allows authentication on cameras without being time synchronized. - Please note that using NTP on both end is the recommended solution, - this should only be used in "safe" environements. - Also, this cannot be used on AXIS camera, as every request is authenticated, contrary to ONVIF standard + Please note that using NTP on both end is the recommended solution, + this should only be used in "safe" environments. + Also, this cannot be used on AXIS camera, as every request is authenticated, contrary to ONVIF standard >>> from onvif import ONVIFCamera >>> mycam = ONVIFCamera('192.168.0.112', 80, 'admin', '12345') @@ -225,33 +187,36 @@ class ONVIFCamera(object): >>> media_service = mycam.create_media_service() >>> ptz_service = mycam.create_ptz_service() # Get PTZ Configuration: - >>> mycam.ptz.GetConfiguration() - # Another way: >>> ptz_service.GetConfiguration() - ''' + """ # Class-level variables services_template = {'devicemgmt': None, 'ptz': None, 'media': None, - 'imaging': None, 'events': None, 'analytics': None } + 'imaging': None, 'events': None, 'analytics': None} use_services_template = {'devicemgmt': True, 'ptz': True, 'media': True, - 'imaging': True, 'events': True, 'analytics': True } - def __init__(self, host, port ,user, passwd, wsdl_dir=os.path.join(os.path.dirname(os.path.dirname(__file__)), "wsdl"), - cache_location=None, cache_duration=None, - encrypt=True, daemon=False, no_cache=False, adjust_time=False): + 'imaging': True, 'events': True, 'analytics': True} + + def __init__(self, host, port, user, passwd, + wsdl_dir=os.path.join(os.path.dirname(os.path.dirname(__file__)), + "wsdl"), + encrypt=True, daemon=False, no_cache=False, adjust_time=False, event_pullpoint=True, + transport=None): + os.environ.pop('http_proxy', None) + os.environ.pop('https_proxy', None) self.host = host self.port = int(port) self.user = user self.passwd = passwd self.wsdl_dir = wsdl_dir - self.cache_location = cache_location - self.cache_duration = cache_duration self.encrypt = encrypt self.daemon = daemon self.no_cache = no_cache self.adjust_time = adjust_time + self.event_pullpoint = event_pullpoint + self.transport = transport # Active service client container - self.services = { } + self.services = {} self.services_lock = RLock() # Set xaddrs @@ -262,31 +227,46 @@ def __init__(self, host, port ,user, passwd, wsdl_dir=os.path.join(os.path.dirna def update_xaddrs(self): # Establish devicemgmt service first self.dt_diff = None - self.devicemgmt = self.create_devicemgmt_service() - if self.adjust_time : + self.devicemgmt = self.create_devicemgmt_service() + if self.adjust_time: cdate = self.devicemgmt.GetSystemDateAndTime().UTCDateTime - cam_date = dt.datetime(cdate.Date.Year, cdate.Date.Month, cdate.Date.Day, cdate.Time.Hour, cdate.Time.Minute, cdate.Time.Second) + cam_date = dt.datetime(cdate.Date.Year, cdate.Date.Month, cdate.Date.Day, + cdate.Time.Hour, cdate.Time.Minute, cdate.Time.Second) self.dt_diff = cam_date - dt.datetime.utcnow() self.devicemgmt.dt_diff = self.dt_diff - self.devicemgmt.set_wsse() + self.devicemgmt = self.create_devicemgmt_service() # Get XAddr of services on the device - self.xaddrs = { } + self.xaddrs = {} capabilities = self.devicemgmt.GetCapabilities({'Category': 'All'}) - for name, capability in capabilities: + for name in capabilities: try: - if name.lower() in SERVICES: + retrived_address=capabilities[name].XAddr + right=retrived_address.split("//")[1] + retrived_url=right.split("/")[0] + ip_address=retrived_url.split(":")[0] + port_address = retrived_url.split(":")[1] + if (self.host != ip_address or self.port != port_address): + remaining=right.split("/")[1] + new_address="http://"+self.host+":"+str(self.port)+"/"+right.split("/")[1]+"/"+right.split("/")[2] + capabilities[name].XAddr=new_address + except: + pass + capability = capabilities[name] + try: + if name.lower() in SERVICES and capability is not None: ns = SERVICES[name.lower()]['ns'] self.xaddrs[ns] = capability['XAddr'] except Exception: - logger.exception('Unexcept service type') + logger.exception('Unexpected service type') with self.services_lock: try: self.event = self.create_events_service() - self.xaddrs['http://www.onvif.org/ver10/events/wsdl/PullPointSubscription'] = self.event.CreatePullPointSubscription().SubscriptionReference.Address - except: - pass - + if self.event_pullpoint: + self.xaddrs['http://www.onvif.org/ver10/events/wsdl/PullPointSubscription'] = \ + self.event.CreatePullPointSubscription().SubscriptionReference.Address._value_1 + except Exception: + pass def update_url(self, host=None, port=None): changed = False @@ -308,77 +288,68 @@ def update_url(self, host=None, port=None): xaddr = getattr(self.capabilities, sname.capitalize).XAddr self.services[sname].ws_client.set_options(location=xaddr) - def update_auth(self, user=None, passwd=None): - changed = False - if user and user != self.user: - changed = True - self.user = user - if passwd and passwd != self.passwd: - changed = True - self.passwd = passwd - - if not changed: - return - - with self.services_lock: - for service in self.services.keys(): - self.services[service].set_wsse(user, passwd) - def get_service(self, name, create=True): - service = None service = getattr(self, name.lower(), None) if not service and create: return getattr(self, 'create_%s_service' % name.lower())() return service - def get_definition(self, name): - '''Returns xaddr and wsdl of specified service''' + def get_definition(self, name, portType=None): + """Returns xaddr and wsdl of specified service""" # Check if the service is supported if name not in SERVICES: raise ONVIFError('Unknown service %s' % name) wsdl_file = SERVICES[name]['wsdl'] ns = SERVICES[name]['ns'] + binding_name = '{%s}%s' % (ns, SERVICES[name]['binding']) + + if portType: + ns += '/' + portType + wsdlpath = os.path.join(self.wsdl_dir, wsdl_file) if not os.path.isfile(wsdlpath): raise ONVIFError('No such file: %s' % wsdlpath) # XAddr for devicemgmt is fixed: if name == 'devicemgmt': - xaddr = 'http://%s:%s/onvif/device_service' % (self.host, self.port) - return xaddr, wsdlpath + xaddr = '%s:%s/onvif/device_service' % \ + (self.host if (self.host.startswith('http://') or self.host.startswith('https://')) + else 'http://%s' % self.host, self.port) + return xaddr, wsdlpath, binding_name # Get other XAddr xaddr = self.xaddrs.get(ns) if not xaddr: - raise ONVIFError('Device doesn`t support service: %s' % name) + raise ONVIFError("Device doesn't support service: %s" % name) + + return xaddr, wsdlpath, binding_name - return xaddr, wsdlpath + def create_onvif_service(self, name, portType=None, transport=None): + """ + Create ONVIF service client. - def create_onvif_service(self, name, from_template=True, portType=None): - '''Create ONVIF service client''' + :param name: service name, should be present as a key within + the `SERVICES` dictionary declared within the `onvif.definition` module + :param portType: + :param transport: + :return: + """ + """Create ONVIF service client""" name = name.lower() - xaddr, wsdl_file = self.get_definition(name) + xaddr, wsdl_file, binding_name = self.get_definition(name, portType) with self.services_lock: - svt = self.services_template.get(name) - # Has a template, clone from it. Faster. - if svt and from_template and self.use_services_template.get(name): - service = ONVIFService.clone(svt, xaddr, self.user, - self.passwd, wsdl_file, - self.cache_location, - self.cache_duration, - self.encrypt, - self.daemon, - no_cache=self.no_cache, portType=portType, dt_diff=self.dt_diff) - # No template, create new service from wsdl document. - # A little time-comsuming - else: - service = ONVIFService(xaddr, self.user, self.passwd, - wsdl_file, self.cache_location, - self.cache_duration, self.encrypt, - self.daemon, no_cache=self.no_cache, portType=portType, dt_diff=self.dt_diff) + if not transport: + transport = self.transport + + service = ONVIFService(xaddr, self.user, self.passwd, + wsdl_file, self.encrypt, + self.daemon, no_cache=self.no_cache, + dt_diff=self.dt_diff, + binding_name=binding_name, + transport=transport) self.services[name] = service @@ -388,39 +359,47 @@ def create_onvif_service(self, name, from_template=True, portType=None): return service - def create_devicemgmt_service(self, from_template=True): + def create_devicemgmt_service(self, transport=None): # The entry point for devicemgmt service is fixed. - return self.create_onvif_service('devicemgmt', from_template) + return self.create_onvif_service('devicemgmt', transport=transport) + + def create_media_service(self, transport=None): + return self.create_onvif_service('media', transport=transport) + + def create_ptz_service(self, transport=None): + return self.create_onvif_service('ptz', transport=transport) - def create_media_service(self, from_template=True): - return self.create_onvif_service('media', from_template) + def create_imaging_service(self, transport=None): + return self.create_onvif_service('imaging', transport=transport) - def create_ptz_service(self, from_template=True): - return self.create_onvif_service('ptz', from_template) + def create_deviceio_service(self, transport=None): + return self.create_onvif_service('deviceio', transport=transport) - def create_imaging_service(self, from_template=True): - return self.create_onvif_service('imaging', from_template) + def create_events_service(self, transport=None): + return self.create_onvif_service('events', transport=transport) - def create_deviceio_service(self, from_template=True): - return self.create_onvif_service('deviceio', from_template) + def create_analytics_service(self, transport=None): + return self.create_onvif_service('analytics', transport=transport) - def create_events_service(self, from_template=True): - return self.create_onvif_service('events', from_template) + def create_recording_service(self, transport=None): + return self.create_onvif_service('recording', transport=transport) - def create_analytics_service(self, from_template=True): - return self.create_onvif_service('analytics', from_template) + def create_search_service(self, transport=None): + return self.create_onvif_service('search', transport=transport) - def create_recording_service(self, from_template=True): - return self.create_onvif_service('recording', from_template) + def create_replay_service(self, transport=None): + return self.create_onvif_service('replay', transport=transport) - def create_search_service(self, from_template=True): - return self.create_onvif_service('search', from_template) + def create_pullpoint_service(self, transport=None): + return self.create_onvif_service('pullpoint', + portType='PullPointSubscription', + transport=transport) - def create_replay_service(self, from_template=True): - return self.create_onvif_service('replay', from_template) + def create_receiver_service(self, transport=None): + return self.create_onvif_service('receiver', transport=transport) - def create_pullpoint_service(self, from_template=True): - return self.create_onvif_service('pullpoint', from_template, portType='PullPointSubscription') + def create_notification_service(self, transport=None): + return self.create_onvif_service('notification', transport=transport) - def create_receiver_service(self, from_template=True): - return self.create_onvif_service('receiver', from_template) + def create_subscription_service(self, transport=None): + return self.create_onvif_service('subscription', transport=transport) diff --git a/onvif/definition.py b/onvif/definition.py index a7ae4d8..24acbd3 100644 --- a/onvif/definition.py +++ b/onvif/definition.py @@ -1,20 +1,22 @@ SERVICES = { - # Name namespace wsdl file - 'devicemgmt': {'ns': 'http://www.onvif.org/ver10/device/wsdl', 'wsdl': 'devicemgmt.wsdl'}, - 'media' : {'ns': 'http://www.onvif.org/ver10/media/wsdl', 'wsdl': 'media.wsdl' }, - 'ptz' : {'ns': 'http://www.onvif.org/ver20/ptz/wsdl', 'wsdl': 'ptz.wsdl'}, - 'imaging' : {'ns': 'http://www.onvif.org/ver20/imaging/wsdl', 'wsdl': 'imaging.wsdl'}, - 'deviceio' : {'ns': 'http://www.onvif.org/ver10/deviceIO/wsdl', 'wsdl': 'deviceio.wsdl'}, - 'events' : {'ns': 'http://www.onvif.org/ver10/events/wsdl', 'wsdl': 'events.wsdl'}, - 'pullpoint' : {'ns': 'http://www.onvif.org/ver10/events/wsdl/PullPointSubscription', 'wsdl': 'events.wsdl'}, - 'analytics' : {'ns': 'http://www.onvif.org/ver20/analytics/wsdl', 'wsdl': 'analytics.wsdl'}, - 'recording' : {'ns': 'http://www.onvif.org/ver10/recording/wsdl', 'wsdl': 'recording.wsdl'}, - 'search' : {'ns': 'http://www.onvif.org/ver10/search/wsdl', 'wsdl': 'search.wsdl'}, - 'replay' : {'ns': 'http://www.onvif.org/ver10/replay/wsdl', 'wsdl': 'replay.wsdl'}, - 'receiver' : {'ns': 'http://www.onvif.org/ver10/receiver/wsdl', 'wsdl': 'receiver.wsdl'}, + # Name namespace wsdl file binding name + 'devicemgmt' : {'ns': 'http://www.onvif.org/ver10/device/wsdl', 'wsdl': 'devicemgmt.wsdl', 'binding' : 'DeviceBinding'}, + 'media' : {'ns': 'http://www.onvif.org/ver10/media/wsdl', 'wsdl': 'media.wsdl', 'binding' : 'MediaBinding'}, + 'ptz' : {'ns': 'http://www.onvif.org/ver20/ptz/wsdl', 'wsdl': 'ptz.wsdl', 'binding' : 'PTZBinding'}, + 'imaging' : {'ns': 'http://www.onvif.org/ver20/imaging/wsdl', 'wsdl': 'imaging.wsdl', 'binding' : 'ImagingBinding'}, + 'deviceio' : {'ns': 'http://www.onvif.org/ver10/deviceIO/wsdl', 'wsdl': 'deviceio.wsdl', 'binding' : 'DeviceIOBinding'}, + 'events' : {'ns': 'http://www.onvif.org/ver10/events/wsdl', 'wsdl': 'events.wsdl', 'binding' : 'EventBinding'}, + 'pullpoint' : {'ns': 'http://www.onvif.org/ver10/events/wsdl', 'wsdl': 'events.wsdl', 'binding' : 'PullPointSubscriptionBinding'}, + 'notification' : {'ns': 'http://www.onvif.org/ver10/events/wsdl', 'wsdl': 'events.wsdl', 'binding' : 'NotificationProducerBinding'}, + 'subscription' : {'ns': 'http://www.onvif.org/ver10/events/wsdl', 'wsdl': 'events.wsdl', 'binding' : 'SubscriptionManagerBinding'}, + 'analytics' : {'ns': 'http://www.onvif.org/ver20/analytics/wsdl', 'wsdl': 'analytics.wsdl', 'binding' : 'AnalyticsEngineBinding'}, + 'recording' : {'ns': 'http://www.onvif.org/ver10/recording/wsdl', 'wsdl': 'recording.wsdl', 'binding' : 'RecordingBinding'}, + 'search' : {'ns': 'http://www.onvif.org/ver10/search/wsdl', 'wsdl': 'search.wsdl', 'binding' : 'SearchBinding'}, + 'replay' : {'ns': 'http://www.onvif.org/ver10/replay/wsdl', 'wsdl': 'replay.wsdl', 'binding' : 'ReplayBinding'}, + 'receiver' : {'ns': 'http://www.onvif.org/ver10/receiver/wsdl', 'wsdl': 'receiver.wsdl', 'binding' : 'ReceiverBinding'}, } - -NSMAP = { } -for name, item in SERVICES.items(): - NSMAP[item['ns']] = name +# +#NSMAP = { } +#for name, item in SERVICES.items(): +# NSMAP[item['ns']] = name diff --git a/onvif/exceptions.py b/onvif/exceptions.py index e56399f..5e7b82b 100644 --- a/onvif/exceptions.py +++ b/onvif/exceptions.py @@ -1,15 +1,12 @@ ''' Core exceptions raised by the ONVIF Client ''' -from suds import WebFault, MethodNotFound, PortNotFound, \ - ServiceNotFound, TypeNotFound, BuildError - -# Some version dont have this -try: - from suds import SoapHeadersNotPermitted - with_soap_exc = True -except ImportError: - with_soap_exc = False - +#from suds import WebFault, MethodNotFound, PortNotFound, \ +# ServiceNotFound, TypeNotFound, BuildError, \ +# SoapHeadersNotPermitted +#TODO: Translate these errors into ONVIFError instances, mimicking the original 'suds' behaviour +#from zeep.exceptions import XMLSyntaxError, XMLParseError, UnexpectedElementError, \ +# WsdlSyntaxError, TransportError, LookupError, NamespaceError, Fault, ValidationError, \ +# SignatureVerificationFailed, IncompleteMessage, IncompleteOperation # Error codes setting # Error unknown, e.g, HTTP errors ERR_ONVIF_UNKNOWN = 1 @@ -24,18 +21,18 @@ class ONVIFError(Exception): def __init__(self, err): - if isinstance(err, (WebFault, SoapHeadersNotPermitted) if with_soap_exc else WebFault): - self.reason = err.fault.Reason.Text - self.fault = err.fault - self.code = ERR_ONVIF_PROTOCOL - elif isinstance(err, (ServiceNotFound, PortNotFound, - MethodNotFound, TypeNotFound)): - self.reason = str(err) - self.code = ERR_ONVIF_PROTOCOL - elif isinstance(err, BuildError): - self.reason = str(err) - self.code = ERR_ONVIF_BUILD - else: +# if isinstance(err, (WebFault, SoapHeadersNotPermitted) if with_soap_exc else WebFault): +# self.reason = err.fault.Reason.Text +# self.fault = err.fault +# self.code = ERR_ONVIF_PROTOCOL +# elif isinstance(err, (ServiceNotFound, PortNotFound, +# MethodNotFound, TypeNotFound)): +# self.reason = str(err) +# self.code = ERR_ONVIF_PROTOCOL +# elif isinstance(err, BuildError): +# self.reason = str(err) +# self.code = ERR_ONVIF_BUILD +# else: self.reason = 'Unknown error: ' + str(err) self.code = ERR_ONVIF_UNKNOWN diff --git a/onvif/version.txt b/onvif/version.txt index 0ea3a94..f2722b1 100644 --- a/onvif/version.txt +++ b/onvif/version.txt @@ -1 +1 @@ -0.2.0 +0.2.12 diff --git a/setup.py b/setup.py index a2b1598..6d69919 100644 --- a/setup.py +++ b/setup.py @@ -1,11 +1,16 @@ +"""Setup script for the onvif_zeep package.""" import os +import sysconfig +import shutil from setuptools import setup, find_packages +from setuptools.command.install import install + here = os.path.abspath(os.path.dirname(__file__)) version_path = os.path.join(here, 'onvif/version.txt') version = open(version_path).read().strip() -requires = [ 'suds >= 0.4', 'suds-passworddigest' ] +requires = ['zeep >= 3.0.0'] CLASSIFIERS = [ 'Development Status :: 3 - Alpha', @@ -22,32 +27,49 @@ 'Topic :: Utilities', "Programming Language :: Python", "Programming Language :: Python :: 2", - "Programming Language :: Python :: 2.6", "Programming Language :: Python :: 2.7", + "Programming Language :: Python :: 3", + "Programming Language :: Python :: 3.5", ] -wsdl_files = [ 'wsdl/' + item for item in os.listdir('wsdl') ] -setup( - name='onvif', - version=version, - description='Python Client for ONVIF Camera', - long_description=open('README.rst', 'r').read(), - author='Cherish Chen', - author_email='sinchb128@gmail.com', - maintainer='sinchb', - maintainer_email='sinchb128@gmail.com', - license='MIT', - keywords=['ONVIF', 'Camera', 'IPC'], - url='http://github.com/quatanium/python-onvif', - zip_safe=False, - packages=find_packages(exclude=['docs', 'examples', 'tests']), - install_requires=requires, - include_package_data=True, - data_files=[('wsdl', wsdl_files)], - entry_points={ - 'console_scripts': ['onvif-cli = onvif.cli:main'] - } - ) +class CustomInstallCommand(install): + """Custom install command to handle WSDL files.""" + def run(self): + # Run regular installation first + install.run(self) + + # Now manually copy the wsdl files to site-packages/wsdl + wsdl_src_dir = 'wsdl' + wsdl_dst_dir = os.path.join(sysconfig.get_paths()['purelib'], 'wsdl') + os.makedirs(wsdl_dst_dir, exist_ok=True) + for file in os.listdir(wsdl_src_dir): + src_path = os.path.join(wsdl_src_dir, file) + dst_path = os.path.join(wsdl_dst_dir, file) + shutil.copyfile(src_path, dst_path) + + +setup( + name='onvif_zeep', + version=version, + description='Python Client for ONVIF Camera', + long_description=open('README.rst', 'r').read(), + author='Cherish Chen', + author_email='sinchb128@gmail.com', + maintainer='sinchb', + maintainer_email='sinchb128@gmail.com', + license='MIT', + keywords=['ONVIF', 'Camera', 'IPC'], + url='http://github.com/quatanium/python-onvif', + zip_safe=False, + packages=find_packages(exclude=['docs', 'examples', 'tests']), + install_requires=requires, + entry_points={ + 'console_scripts': ['onvif-cli = onvif.cli:main'] + }, + cmdclass={ + 'install': CustomInstallCommand, + }, +) diff --git a/tests/test.py b/tests/test.py index 7d46f60..38f2b5a 100644 --- a/tests/test.py +++ b/tests/test.py @@ -1,22 +1,22 @@ #!/usr/bin/python -#-*-coding=utf-8 - +# -*-coding=utf-8 +from __future__ import print_function, division import unittest -from suds import WebFault - from onvif import ONVIFCamera, ONVIFError -CAM_HOST = '192.168.0.112' -CAM_PORT = 80 -CAM_USER = 'admin' -CAM_PASS = '12345' +CAM_HOST = '192.168.1.64' +CAM_PORT = 80 +CAM_USER = 'admin' +CAM_PASS = 'intflow3121' DEBUG = False + def log(ret): if DEBUG: - print ret + print(ret) + class TestDevice(unittest.TestCase): @@ -25,93 +25,93 @@ class TestDevice(unittest.TestCase): # ***************** Test Capabilities *************************** def test_GetWsdlUrl(self): - ret = self.cam.devicemgmt.GetWsdlUrl() + self.cam.devicemgmt.GetWsdlUrl() def test_GetServices(self): - ''' - Returns a cllection of the devices + """ + Returns a collection of the devices services and possibly their available capabilities - ''' - params = {'IncludeCapability': True } - ret = self.cam.devicemgmt.GetServices(params) + """ + params = {'IncludeCapability': True} + self.cam.devicemgmt.GetServices(params) params = self.cam.devicemgmt.create_type('GetServices') - params.IncludeCapability=False - ret = self.cam.devicemgmt.GetServices(params) + params.IncludeCapability = False + self.cam.devicemgmt.GetServices(params) def test_GetServiceCapabilities(self): - '''Returns the capabilities of the devce service.''' - ret = self.cam.devicemgmt.GetServiceCapabilities() - ret.Network._IPFilter + """Returns the capabilities of the device service.""" + self.cam.devicemgmt.GetServiceCapabilities() def test_GetCapabilities(self): - ''' - Probides a backward compatible interface for the base capabilities. - ''' - categorys = ['PTZ', 'Media', 'Imaging', - 'Device', 'Analytics', 'Events'] - ret = self.cam.devicemgmt.GetCapabilities() - for category in categorys: - ret = self.cam.devicemgmt.GetCapabilities({'Category': category}) + """ + Provides a backward compatible interface for the base capabilities. + """ + categories = ['PTZ', 'Media', 'Imaging', + 'Device', 'Analytics', 'Events'] + self.cam.devicemgmt.GetCapabilities() + for category in categories: + self.cam.devicemgmt.GetCapabilities({'Category': category}) with self.assertRaises(ONVIFError): self.cam.devicemgmt.GetCapabilities({'Category': 'unknown'}) # *************** Test Network ********************************* def test_GetHostname(self): - ''' Get the hostname from a device ''' + """ Get the hostname from a device """ self.cam.devicemgmt.GetHostname() def test_SetHostname(self): - ''' + """ Set the hostname on a device - A device shall accept strings formated according to + A device shall accept strings formatted according to RFC 1123 section 2.1 or alternatively to RFC 952, other string shall be considered as invalid strings - ''' + """ pre_host_name = self.cam.devicemgmt.GetHostname() - self.cam.devicemgmt.SetHostname({'Name':'testHostName'}) + self.cam.devicemgmt.SetHostname({'Name': 'testHostName'}) self.assertEqual(self.cam.devicemgmt.GetHostname().Name, 'testHostName') - - self.cam.devicemgmt.SetHostname(pre_host_name) + self.cam.devicemgmt.SetHostname({'Name': pre_host_name.Name}) def test_SetHostnameFromDHCP(self): - ''' Controls whether the hostname shall be retrieved from DHCP ''' + """ Controls whether the hostname shall be retrieved from DHCP """ ret = self.cam.devicemgmt.SetHostnameFromDHCP(dict(FromDHCP=False)) self.assertTrue(isinstance(ret, bool)) def test_GetDNS(self): - ''' Gets the DNS setting from a device ''' + """ Gets the DNS setting from a device """ ret = self.cam.devicemgmt.GetDNS() self.assertTrue(hasattr(ret, 'FromDHCP')) - if ret.FromDHCP == False: + if not ret.FromDHCP and len(ret.DNSManual) > 0: log(ret.DNSManual[0].Type) log(ret.DNSManual[0].IPv4Address) def test_SetDNS(self): - ''' Set the DNS settings on a device ''' - ret = self.cam.devicemgmt.SetDNS(dict(FromDHCP=False)) + """ Set the DNS settings on a device """ + self.cam.devicemgmt.SetDNS(dict(FromDHCP=False)) def test_GetNTP(self): - ''' Get the NTP settings from a device ''' + """ Get the NTP settings from a device """ ret = self.cam.devicemgmt.GetNTP() - if ret.FromDHCP == False: + if not ret.FromDHCP: self.assertTrue(hasattr(ret, 'NTPManual')) log(ret.NTPManual) def test_SetNTP(self): - '''Set the NTP setting''' - ret = self.cam.devicemgmt.SetNTP(dict(FromDHCP=False)) + """Set the NTP setting""" + self.cam.devicemgmt.SetNTP(dict(FromDHCP=False)) def test_GetDynamicDNS(self): - '''Get the dynamic DNS setting''' + """Get the dynamic DNS setting""" ret = self.cam.devicemgmt.GetDynamicDNS() log(ret) def test_SetDynamicDNS(self): - ''' Set the dynamic DNS settings on a device ''' - ret = self.cam.devicemgmt.GetDynamicDNS() - ret = self.cam.devicemgmt.SetDynamicDNS(dict(Type=ret.Type, Name="random")) + """ Set the dynamic DNS settings on a device """ + self.cam.devicemgmt.GetDynamicDNS() + self.cam.devicemgmt.SetDynamicDNS({'Type': 'NoUpdate', 'Name': None, + 'TTL': None}) + if __name__ == '__main__': unittest.main() diff --git a/wsdl/b-2.xsd b/wsdl/b-2.xsd index 1e0d47e..997bb8f 100644 --- a/wsdl/b-2.xsd +++ b/wsdl/b-2.xsd @@ -47,11 +47,12 @@ This document and the information contained herein is provided on an "AS IS" bas - - - - - + + + + + +