Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -127,3 +127,6 @@ dmypy.json

# Pyre type checker
.pyre/

# Pycharm project settings
.idea/
51 changes: 46 additions & 5 deletions doipclient/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,10 +145,10 @@ class DoIPClient:
:param use_secure: Enables TLS. If set to True, a default SSL context is used. For more control, a preconfigured
SSL context can be passed directly. Untested. Should be combined with changing tcp_port to 3496.
:type use_secure: Union[bool,ssl.SSLContext]
:param log_level: Logging level
:type log_level: int
:param auto_reconnect_tcp: Attempt to automatically reconnect TCP sockets that were closed by peer
:type auto_reconnect_tcp: bool
:param vm_specific: Optional 4 byte long int
:type vm_specific: int, optional

:raises ConnectionRefusedError: If the activation request fails
:raises ValueError: If the IPAddress is neither an IPv4 nor an IPv6 address
Expand All @@ -166,6 +166,7 @@ def __init__(
client_ip_address=None,
use_secure=False,
auto_reconnect_tcp=False,
vm_specific=None
):
self._ecu_logical_address = ecu_logical_address
self._client_logical_address = client_logical_address
Expand All @@ -180,6 +181,7 @@ def __init__(
self._protocol_version = protocol_version
self._auto_reconnect_tcp = auto_reconnect_tcp
self._tcp_close_detected = False
self.vm_specific = vm_specific

# Check the ECU IP type to determine socket family
# Will raise ValueError if neither a valid IPv4, nor IPv6 address
Expand Down Expand Up @@ -587,16 +589,19 @@ def request_activation(
:type disable_retry: bool, optional
:return: The resulting activation response object
:rtype: RoutingActivationResponse
:raises ValueError: vm_specific is invalid or out of range
"""
message = RoutingActivationRequest(
self._client_logical_address, activation_type, vm_specific=vm_specific
self._client_logical_address,
activation_type,
vm_specific=self._validate_vm_specific_value(vm_specific) if vm_specific else self.vm_specific,
)
self.send_doip_message(message, disable_retry=disable_retry)
while True:
result = self.read_doip()
if type(result) == RoutingActivationResponse:
if isinstance(result, RoutingActivationResponse):
return result
elif result:
if result:
logger.warning(
"Received unexpected DoIP message type {}. Ignoring".format(
type(result)
Expand Down Expand Up @@ -824,3 +829,39 @@ def reconnect(self, close_delay=A_PROCESSING_TIME):
raise ConnectionRefusedError(
f"Activation Request failed with code {result.response_code}"
)

@staticmethod
def _validate_vm_specific_value(value):
"""Validate the VM specific value (must be > 0 and <= 0xffffffff) or None.
If the conditions are not fulfilled, raises an exception.

:param value: The value to check.
:type value: int, optional
:return: The input value if valid.
:rtype: int or None
:raises ValueError: If the value is invalid or out of range.
"""
if not isinstance(value, int) and value is not None:
raise ValueError("Invalid vm_specific type must be int or None")
if isinstance(value, int) and (value < 0 or value > 0xffffffff):
raise ValueError("Invalid vm_specific value must be > 0 and <= 0xffffffff")
return value

@property
def vm_specific(self):
"""Get the optional OEM specific field value if set.

:return: vm_specific value
:rtype: int, optional
"""
return self._vm_specific

@vm_specific.setter
def vm_specific(self, value):
"""Set the optional OEM specific field value. If you do not need to send this item, set it to None.

:param value: The vm_specific value (must be > 0 and <= 0xffffffff) or None
:type value: int, optional
:raises ValueError: Value is invalid or out of range
"""
self._vm_specific = self._validate_vm_specific_value(value)
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

setuptools.setup(
name="doipclient",
version="1.1.6",
version="1.1.7.dev1",
description="A Diagnostic over IP (DoIP) client implementing ISO-13400-2.",
long_description=long_description,
long_description_content_type="text/x-rst",
Expand Down
57 changes: 57 additions & 0 deletions tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -750,3 +750,60 @@ def test_use_secure_with_external_ssl_context(mock_socket, mocker):

mocked_external_wrap_socket = mocked_external_context.wrap_socket
mocked_external_wrap_socket.assert_called_once_with(mock_socket)


@pytest.mark.parametrize(
"vm_specific, exc",
((None, False), (0, False), (-1, True), (0xFFFFFFFF, False), (0x100000000, True), ("0x1", True), (10.0, True)))
def test_vm_specific_setter(mock_socket, mocker, vm_specific, exc):
sut = DoIPClient(test_ip, test_logical_address, auto_reconnect_tcp=True)
if exc:
with pytest.raises(ValueError):
sut.vm_specific = vm_specific
else:
sut.vm_specific = vm_specific
assert sut.vm_specific == vm_specific


def test_vm_specific_static_value(mock_socket, mocker):
request_activation_spy = mocker.spy(DoIPClient, "request_activation")
mock_socket.rx_queue.append(successful_activation_response_with_vm)

sut = DoIPClient(
test_ip,
test_logical_address,
auto_reconnect_tcp=True,
activation_type=None,
vm_specific=0x01020304,
)
sut.request_activation(activation_type=RoutingActivationRequest.ActivationType.Default)
assert mock_socket.tx_queue[-1] == activation_request_with_vm
assert request_activation_spy.call_count == 1


def test_vm_specific_request_activation_bad_value(mock_socket, mocker):
request_activation_spy = mocker.spy(DoIPClient, "request_activation")
mock_socket.rx_queue.append(successful_activation_response_with_vm)

sut = DoIPClient(
test_ip,
test_logical_address,
auto_reconnect_tcp=True,
activation_type=None,
vm_specific=0x01020304,
)
with pytest.raises(ValueError):
sut.request_activation(activation_type=RoutingActivationRequest.ActivationType.Default, vm_specific=-1)


def test_vm_specific_verification_in_init(mock_socket, mocker):
request_activation_spy = mocker.spy(DoIPClient, "request_activation")
mock_socket.rx_queue.append(successful_activation_response_with_vm)
with pytest.raises(ValueError):
sut = DoIPClient(
test_ip,
test_logical_address,
auto_reconnect_tcp=True,
activation_type=None,
vm_specific=-1,
)
Loading