From 24c9ac1c175c42f0fc78c5b61df8b6ab1ab5e6b0 Mon Sep 17 00:00:00 2001 From: Herwin Grobben Date: Fri, 12 Nov 2021 11:42:40 +0100 Subject: [PATCH 1/8] Busy with block in server, upload block now working, but w.i.p. --- canopen/sdo/client.py | 1 + canopen/sdo/constants.py | 32 +++++-- canopen/sdo/exceptions.py | 27 ++++-- canopen/sdo/server.py | 186 +++++++++++++++++++++++++++++++++++++- 4 files changed, 227 insertions(+), 19 deletions(-) diff --git a/canopen/sdo/client.py b/canopen/sdo/client.py index e4c50aa8..e4c3aca9 100644 --- a/canopen/sdo/client.py +++ b/canopen/sdo/client.py @@ -528,6 +528,7 @@ def read(self, size=-1): if seqno == self._ackseq + 1: self._ackseq = seqno else: + logger.debug('Wrong seqno') # Wrong sequence number response = self._retransmit() res_command, = struct.unpack_from("B", response) diff --git a/canopen/sdo/constants.py b/canopen/sdo/constants.py index e8a07359..d591b3de 100644 --- a/canopen/sdo/constants.py +++ b/canopen/sdo/constants.py @@ -2,7 +2,11 @@ # Command, index, subindex SDO_STRUCT = struct.Struct(" %X', self.state, new_state) + if new_state >= self.state: + self.state = new_state + else: + raise SdoBlockException(0x08000022) + + def get_upload_blocks(self): + msgs = [] + + # seq no 1 - 127, not 0 -.. + for seqno in range(1,self.req_blocksize+1): + logger.debug('SEQNO %d', seqno) + response = bytearray(8) + command = 0 + if self.size <= (self.data_uploaded + 7): + # no more segments after this + command |= NO_MORE_BLOCKS + + command |= seqno + response[0] = command + for i in range(7): + databyte = self.get_data_byte() + if databyte != None: + response[i+1] = databyte + else: + self.last_bytes = 7 - i + break + msgs.append(response) + self.last_seqno = seqno + + if self.size == self.data_uploaded: + break + logger.debug(msgs) + return msgs + + def get_data_byte(self): + if self.data_uploaded < self.size: + self.data_uploaded += 1 + return self.data[self.data_uploaded-1] + return None + From 5b5088276140f21b8bb8e376c912762d1a34a3ea Mon Sep 17 00:00:00 2001 From: Herwin Grobben Date: Tue, 23 Nov 2021 18:35:12 +0100 Subject: [PATCH 2/8] add Abort handling during block upload --- canopen/sdo/constants.py | 3 +- canopen/sdo/server.py | 17 +- test/test_sdo.py | 442 ++++++++++++++++++++------------------- 3 files changed, 234 insertions(+), 228 deletions(-) diff --git a/canopen/sdo/constants.py b/canopen/sdo/constants.py index d591b3de..ec5c7bda 100644 --- a/canopen/sdo/constants.py +++ b/canopen/sdo/constants.py @@ -2,9 +2,10 @@ # Command, index, subindex SDO_STRUCT = struct.Struct(" Date: Tue, 13 Jun 2023 10:08:43 +0200 Subject: [PATCH 3/8] print statement adjusted --- test/test_sdo.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/test/test_sdo.py b/test/test_sdo.py index c2700d65..5c04c06e 100644 --- a/test/test_sdo.py +++ b/test/test_sdo.py @@ -26,11 +26,12 @@ def _send_message(self, can_id, data, remote=False): """ next_data = self.data.pop(0) self.assertEqual(next_data[0], TX, "No transmission was expected") - # print(f"> {binascii.hexlify(data)} ({binascii.hexlify(next_data[1])})") + # print("> %s:%s" % (hex(can_id), + # binascii.hexlify(data))) self.assertSequenceEqual(data, next_data[1]) self.assertEqual(can_id, 0x602) while self.data and self.data[0][0] == RX: - # print(f"< {binascii.hexlify(self.data[0][1])}") + # print(" < 0x%03X:%s" % (0x580 + RX, binascii.hexlify(self.data[0][1]))) self.network.notify(0x582, self.data.pop(0)[1], 0.0) def setUp(self): From 6d3ed09de43181880cf25c6aee09a5ea3666dabe Mon Sep 17 00:00:00 2001 From: Herwin Grobben Date: Mon, 17 Feb 2025 16:37:54 +0100 Subject: [PATCH 4/8] formatter from VSCode does something wrong.. --- test/test_sdo.py | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/test/test_sdo.py b/test/test_sdo.py index 5c04c06e..cf9103d3 100644 --- a/test/test_sdo.py +++ b/test/test_sdo.py @@ -114,11 +114,7 @@ def test_block_download(self): (RX, b"\xa1\x00\x00\x00\x00\x00\x00\x00"), ] data = b"A really really long string..." - with ( - self.network[2] - .sdo["Writable string"] - .open("wb", size=len(data), block_transfer=True) as fp - ): + with self.network[2].sdo["Writable string"].open("wb", size=len(data), block_transfer=True) as fp: fp.write(data) def test_block_upload(self): From 58778b49d5da574722f1ea56157d11fb6918d3ae Mon Sep 17 00:00:00 2001 From: Herwin Grobben Date: Mon, 17 Feb 2025 16:40:12 +0100 Subject: [PATCH 5/8] Fixed formatter from VSCode. --- test/test_sdo.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/test/test_sdo.py b/test/test_sdo.py index cf9103d3..224f603c 100644 --- a/test/test_sdo.py +++ b/test/test_sdo.py @@ -114,7 +114,11 @@ def test_block_download(self): (RX, b"\xa1\x00\x00\x00\x00\x00\x00\x00"), ] data = b"A really really long string..." - with self.network[2].sdo["Writable string"].open("wb", size=len(data), block_transfer=True) as fp: + with ( + self.network[2] + .sdo["Writable string"] + .open("wb", size=len(data), block_transfer=True) + ) as fp: fp.write(data) def test_block_upload(self): From 9908fa7aef618e40b0edfdac640f3d8459478ad0 Mon Sep 17 00:00:00 2001 From: Herwin Grobben Date: Thu, 22 May 2025 10:33:20 +0200 Subject: [PATCH 6/8] revert black formatting in test_sdo --- test/test_sdo.py | 438 +++++++++++++++++++++++------------------------ 1 file changed, 214 insertions(+), 224 deletions(-) diff --git a/test/test_sdo.py b/test/test_sdo.py index 01afed6e..d5b8f05f 100644 --- a/test/test_sdo.py +++ b/test/test_sdo.py @@ -1,11 +1,10 @@ import unittest -import binascii import canopen import canopen.objectdictionary.datatypes as dt from canopen.objectdictionary import ODVariable -from .util import DATATYPES_EDS, SAMPLE_EDS +from util import DATATYPES_EDS, SAMPLE_EDS TX = 1 @@ -67,7 +66,6 @@ def _send_message(self, can_id, data, remote=False): self.assertSequenceEqual(data, next_data[1]) self.assertEqual(can_id, 0x602) while self.data and self.data[0][0] == RX: - # print(" < 0x%03X:%s" % (0x580 + RX, binascii.hexlify(self.data[0][1]))) self.network.notify(0x582, self.data.pop(0)[1], 0.0) self.message_sent = True @@ -84,25 +82,25 @@ def setUp(self): def test_expedited_upload(self): self.data = [ - (TX, b"\x40\x18\x10\x01\x00\x00\x00\x00"), - (RX, b"\x43\x18\x10\x01\x04\x00\x00\x00"), + (TX, b'\x40\x18\x10\x01\x00\x00\x00\x00'), + (RX, b'\x43\x18\x10\x01\x04\x00\x00\x00') ] vendor_id = self.network[2].sdo[0x1018][1].raw self.assertEqual(vendor_id, 4) # UNSIGNED8 without padded data part (see issue #5) self.data = [ - (TX, b"\x40\x00\x14\x02\x00\x00\x00\x00"), - (RX, b"\x4f\x00\x14\x02\xfe"), + (TX, b'\x40\x00\x14\x02\x00\x00\x00\x00'), + (RX, b'\x4f\x00\x14\x02\xfe') ] - trans_type = self.network[2].sdo[0x1400]["Transmission type RPDO 1"].raw + trans_type = self.network[2].sdo[0x1400]['Transmission type RPDO 1'].raw self.assertEqual(trans_type, 254) self.assertTrue(self.message_sent) def test_size_not_specified(self): self.data = [ - (TX, b"\x40\x00\x14\x02\x00\x00\x00\x00"), - (RX, b"\x42\x00\x14\x02\xfe\x00\x00\x00"), + (TX, b'\x40\x00\x14\x02\x00\x00\x00\x00'), + (RX, b'\x42\x00\x14\x02\xfe\x00\x00\x00') ] # Make sure the size of the data is 1 byte data = self.network[2].sdo.upload(0x1400, 2) @@ -111,58 +109,55 @@ def test_size_not_specified(self): def test_expedited_download(self): self.data = [ - (TX, b"\x2b\x17\x10\x00\xa0\x0f\x00\x00"), - (RX, b"\x60\x17\x10\x00\x00\x00\x00\x00"), + (TX, b'\x2b\x17\x10\x00\xa0\x0f\x00\x00'), + (RX, b'\x60\x17\x10\x00\x00\x00\x00\x00') ] self.network[2].sdo[0x1017].raw = 4000 self.assertTrue(self.message_sent) def test_segmented_upload(self): self.data = [ - (TX, b"\x40\x08\x10\x00\x00\x00\x00\x00"), - (RX, b"\x41\x08\x10\x00\x1a\x00\x00\x00"), - (TX, b"\x60\x00\x00\x00\x00\x00\x00\x00"), - (RX, b"\x00\x54\x69\x6e\x79\x20\x4e\x6f"), - (TX, b"\x70\x00\x00\x00\x00\x00\x00\x00"), - (RX, b"\x10\x64\x65\x20\x2d\x20\x4d\x65"), - (TX, b"\x60\x00\x00\x00\x00\x00\x00\x00"), - (RX, b"\x00\x67\x61\x20\x44\x6f\x6d\x61"), - (TX, b"\x70\x00\x00\x00\x00\x00\x00\x00"), - (RX, b"\x15\x69\x6e\x73\x20\x21\x00\x00"), + (TX, b'\x40\x08\x10\x00\x00\x00\x00\x00'), + (RX, b'\x41\x08\x10\x00\x1A\x00\x00\x00'), + (TX, b'\x60\x00\x00\x00\x00\x00\x00\x00'), + (RX, b'\x00\x54\x69\x6E\x79\x20\x4E\x6F'), + (TX, b'\x70\x00\x00\x00\x00\x00\x00\x00'), + (RX, b'\x10\x64\x65\x20\x2D\x20\x4D\x65'), + (TX, b'\x60\x00\x00\x00\x00\x00\x00\x00'), + (RX, b'\x00\x67\x61\x20\x44\x6F\x6D\x61'), + (TX, b'\x70\x00\x00\x00\x00\x00\x00\x00'), + (RX, b'\x15\x69\x6E\x73\x20\x21\x00\x00') ] device_name = self.network[2].sdo[0x1008].raw self.assertEqual(device_name, "Tiny Node - Mega Domains !") def test_segmented_download(self): self.data = [ - (TX, b"\x21\x00\x20\x00\x0d\x00\x00\x00"), - (RX, b"\x60\x00\x20\x00\x00\x00\x00\x00"), - (TX, b"\x00\x41\x20\x6c\x6f\x6e\x67\x20"), - (RX, b"\x20\x00\x20\x00\x00\x00\x00\x00"), - (TX, b"\x13\x73\x74\x72\x69\x6e\x67\x00"), - (RX, b"\x30\x00\x20\x00\x00\x00\x00\x00"), + (TX, b'\x21\x00\x20\x00\x0d\x00\x00\x00'), + (RX, b'\x60\x00\x20\x00\x00\x00\x00\x00'), + (TX, b'\x00\x41\x20\x6c\x6f\x6e\x67\x20'), + (RX, b'\x20\x00\x20\x00\x00\x00\x00\x00'), + (TX, b'\x13\x73\x74\x72\x69\x6e\x67\x00'), + (RX, b'\x30\x00\x20\x00\x00\x00\x00\x00') ] - self.network[2].sdo["Writable string"].raw = "A long string" + self.network[2].sdo['Writable string'].raw = 'A long string' def test_block_download(self): self.data = [ - (TX, b"\xc6\x00\x20\x00\x1e\x00\x00\x00"), - (RX, b"\xa4\x00\x20\x00\x7f\x00\x00\x00"), - (TX, b"\x01\x41\x20\x72\x65\x61\x6c\x6c"), - (TX, b"\x02\x79\x20\x72\x65\x61\x6c\x6c"), - (TX, b"\x03\x79\x20\x6c\x6f\x6e\x67\x20"), - (TX, b"\x04\x73\x74\x72\x69\x6e\x67\x2e"), - (TX, b"\x85\x2e\x2e\x00\x00\x00\x00\x00"), - (RX, b"\xa2\x05\x7f\x00\x00\x00\x00\x00"), - (TX, b"\xd5\x45\x69\x00\x00\x00\x00\x00"), - (RX, b"\xa1\x00\x00\x00\x00\x00\x00\x00"), - ] - data = b"A really really long string..." - with ( - self.network[2] - .sdo["Writable string"] - .open("wb", size=len(data), block_transfer=True) - ) as fp: + (TX, b'\xc6\x00\x20\x00\x1e\x00\x00\x00'), + (RX, b'\xa4\x00\x20\x00\x7f\x00\x00\x00'), + (TX, b'\x01\x41\x20\x72\x65\x61\x6c\x6c'), + (TX, b'\x02\x79\x20\x72\x65\x61\x6c\x6c'), + (TX, b'\x03\x79\x20\x6c\x6f\x6e\x67\x20'), + (TX, b'\x04\x73\x74\x72\x69\x6e\x67\x2e'), + (TX, b'\x85\x2e\x2e\x00\x00\x00\x00\x00'), + (RX, b'\xa2\x05\x7f\x00\x00\x00\x00\x00'), + (TX, b'\xd5\x45\x69\x00\x00\x00\x00\x00'), + (RX, b'\xa1\x00\x00\x00\x00\x00\x00\x00') + ] + data = b'A really really long string...' + with self.network[2].sdo['Writable string'].open( + 'wb', size=len(data), block_transfer=True) as fp: fp.write(data) def test_segmented_download_zero_length(self): @@ -177,44 +172,44 @@ def test_segmented_download_zero_length(self): def test_block_upload(self): self.data = [ - (TX, b"\xa4\x08\x10\x00\x7f\x00\x00\x00"), - (RX, b"\xc6\x08\x10\x00\x1a\x00\x00\x00"), - (TX, b"\xa3\x00\x00\x00\x00\x00\x00\x00"), - (RX, b"\x01\x54\x69\x6e\x79\x20\x4e\x6f"), - (RX, b"\x02\x64\x65\x20\x2d\x20\x4d\x65"), - (RX, b"\x03\x67\x61\x20\x44\x6f\x6d\x61"), - (RX, b"\x84\x69\x6e\x73\x20\x21\x00\x00"), - (TX, b"\xa2\x04\x7f\x00\x00\x00\x00\x00"), - (RX, b"\xc9\x40\xe1\x00\x00\x00\x00\x00"), - (TX, b"\xa1\x00\x00\x00\x00\x00\x00\x00"), - ] - with self.network[2].sdo[0x1008].open("r", block_transfer=True) as fp: + (TX, b'\xa4\x08\x10\x00\x7f\x00\x00\x00'), + (RX, b'\xc6\x08\x10\x00\x1a\x00\x00\x00'), + (TX, b'\xa3\x00\x00\x00\x00\x00\x00\x00'), + (RX, b'\x01\x54\x69\x6e\x79\x20\x4e\x6f'), + (RX, b'\x02\x64\x65\x20\x2d\x20\x4d\x65'), + (RX, b'\x03\x67\x61\x20\x44\x6f\x6d\x61'), + (RX, b'\x84\x69\x6e\x73\x20\x21\x00\x00'), + (TX, b'\xa2\x04\x7f\x00\x00\x00\x00\x00'), + (RX, b'\xc9\x40\xe1\x00\x00\x00\x00\x00'), + (TX, b'\xa1\x00\x00\x00\x00\x00\x00\x00') + ] + with self.network[2].sdo[0x1008].open('r', block_transfer=True) as fp: data = fp.read() - self.assertEqual(data, "Tiny Node - Mega Domains !") + self.assertEqual(data, 'Tiny Node - Mega Domains !') def test_writable_file(self): self.data = [ - (TX, b"\x20\x00\x20\x00\x00\x00\x00\x00"), - (RX, b"\x60\x00\x20\x00\x00\x00\x00\x00"), - (TX, b"\x00\x31\x32\x33\x34\x35\x36\x37"), - (RX, b"\x20\x00\x20\x00\x00\x00\x00\x00"), - (TX, b"\x1a\x38\x39\x00\x00\x00\x00\x00"), - (RX, b"\x30\x00\x20\x00\x00\x00\x00\x00"), - (TX, b"\x0f\x00\x00\x00\x00\x00\x00\x00"), - (RX, b"\x20\x00\x20\x00\x00\x00\x00\x00"), - ] - with self.network[2].sdo["Writable string"].open("wb") as fp: - fp.write(b"1234") - fp.write(b"56789") + (TX, b'\x20\x00\x20\x00\x00\x00\x00\x00'), + (RX, b'\x60\x00\x20\x00\x00\x00\x00\x00'), + (TX, b'\x00\x31\x32\x33\x34\x35\x36\x37'), + (RX, b'\x20\x00\x20\x00\x00\x00\x00\x00'), + (TX, b'\x1a\x38\x39\x00\x00\x00\x00\x00'), + (RX, b'\x30\x00\x20\x00\x00\x00\x00\x00'), + (TX, b'\x0f\x00\x00\x00\x00\x00\x00\x00'), + (RX, b'\x20\x00\x20\x00\x00\x00\x00\x00') + ] + with self.network[2].sdo['Writable string'].open('wb') as fp: + fp.write(b'1234') + fp.write(b'56789') self.assertTrue(fp.closed) # Write on closed file with self.assertRaises(ValueError): - fp.write(b"123") + fp.write(b'123') def test_abort(self): self.data = [ - (TX, b"\x40\x18\x10\x01\x00\x00\x00\x00"), - (RX, b"\x80\x18\x10\x01\x11\x00\x09\x06"), + (TX, b'\x40\x18\x10\x01\x00\x00\x00\x00'), + (RX, b'\x80\x18\x10\x01\x11\x00\x09\x06') ] with self.assertRaises(canopen.SdoAbortedError) as cm: _ = self.network[2].sdo[0x1018][1].raw @@ -252,267 +247,265 @@ def setUp(self): def test_boolean(self): self.data = [ - (TX, b"\x40\x01\x20\x00\x00\x00\x00\x00"), - (RX, b"\x4f\x01\x20\x00\xfe\xfd\xfc\xfb"), + (TX, b'\x40\x01\x20\x00\x00\x00\x00\x00'), + (RX, b'\x4f\x01\x20\x00\xfe\xfd\xfc\xfb') ] data = self.network[2].sdo.upload(0x2000 + dt.BOOLEAN, 0) - self.assertEqual(data, b"\xfe") + self.assertEqual(data, b'\xfe') def test_unsigned8(self): self.data = [ - (TX, b"\x40\x05\x20\x00\x00\x00\x00\x00"), - (RX, b"\x4f\x05\x20\x00\xfe\xfd\xfc\xfb"), + (TX, b'\x40\x05\x20\x00\x00\x00\x00\x00'), + (RX, b'\x4f\x05\x20\x00\xfe\xfd\xfc\xfb') ] data = self.network[2].sdo.upload(0x2000 + dt.UNSIGNED8, 0) - self.assertEqual(data, b"\xfe") + self.assertEqual(data, b'\xfe') def test_unsigned16(self): self.data = [ - (TX, b"\x40\x06\x20\x00\x00\x00\x00\x00"), - (RX, b"\x4b\x06\x20\x00\xfe\xfd\xfc\xfb"), + (TX, b'\x40\x06\x20\x00\x00\x00\x00\x00'), + (RX, b'\x4b\x06\x20\x00\xfe\xfd\xfc\xfb') ] data = self.network[2].sdo.upload(0x2000 + dt.UNSIGNED16, 0) - self.assertEqual(data, b"\xfe\xfd") + self.assertEqual(data, b'\xfe\xfd') def test_unsigned24(self): self.data = [ - (TX, b"\x40\x16\x20\x00\x00\x00\x00\x00"), - (RX, b"\x47\x16\x20\x00\xfe\xfd\xfc\xfb"), + (TX, b'\x40\x16\x20\x00\x00\x00\x00\x00'), + (RX, b'\x47\x16\x20\x00\xfe\xfd\xfc\xfb') ] data = self.network[2].sdo.upload(0x2000 + dt.UNSIGNED24, 0) - self.assertEqual(data, b"\xfe\xfd\xfc") + self.assertEqual(data, b'\xfe\xfd\xfc') def test_unsigned32(self): self.data = [ - (TX, b"\x40\x07\x20\x00\x00\x00\x00\x00"), - (RX, b"\x43\x07\x20\x00\xfe\xfd\xfc\xfb"), + (TX, b'\x40\x07\x20\x00\x00\x00\x00\x00'), + (RX, b'\x43\x07\x20\x00\xfe\xfd\xfc\xfb') ] data = self.network[2].sdo.upload(0x2000 + dt.UNSIGNED32, 0) - self.assertEqual(data, b"\xfe\xfd\xfc\xfb") + self.assertEqual(data, b'\xfe\xfd\xfc\xfb') def test_unsigned40(self): self.data = [ - (TX, b"\x40\x18\x20\x00\x00\x00\x00\x00"), - (RX, b"\x41\x18\x20\x00\xfe\xfd\xfc\xfb"), - (TX, b"\x60\x00\x00\x00\x00\x00\x00\x00"), - (RX, b"\x05\xb2\x01\x20\x02\x91\x12\x03"), + (TX, b'\x40\x18\x20\x00\x00\x00\x00\x00'), + (RX, b'\x41\x18\x20\x00\xfe\xfd\xfc\xfb'), + (TX, b'\x60\x00\x00\x00\x00\x00\x00\x00'), + (RX, b'\x05\xb2\x01\x20\x02\x91\x12\x03'), ] data = self.network[2].sdo.upload(0x2000 + dt.UNSIGNED40, 0) - self.assertEqual(data, b"\xb2\x01\x20\x02\x91") + self.assertEqual(data, b'\xb2\x01\x20\x02\x91') def test_unsigned48(self): self.data = [ - (TX, b"\x40\x19\x20\x00\x00\x00\x00\x00"), - (RX, b"\x41\x19\x20\x00\xfe\xfd\xfc\xfb"), - (TX, b"\x60\x00\x00\x00\x00\x00\x00\x00"), - (RX, b"\x03\xb2\x01\x20\x02\x91\x12\x03"), + (TX, b'\x40\x19\x20\x00\x00\x00\x00\x00'), + (RX, b'\x41\x19\x20\x00\xfe\xfd\xfc\xfb'), + (TX, b'\x60\x00\x00\x00\x00\x00\x00\x00'), + (RX, b'\x03\xb2\x01\x20\x02\x91\x12\x03'), ] data = self.network[2].sdo.upload(0x2000 + dt.UNSIGNED48, 0) - self.assertEqual(data, b"\xb2\x01\x20\x02\x91\x12") + self.assertEqual(data, b'\xb2\x01\x20\x02\x91\x12') def test_unsigned56(self): self.data = [ - (TX, b"\x40\x1a\x20\x00\x00\x00\x00\x00"), - (RX, b"\x41\x1a\x20\x00\xfe\xfd\xfc\xfb"), - (TX, b"\x60\x00\x00\x00\x00\x00\x00\x00"), - (RX, b"\x01\xb2\x01\x20\x02\x91\x12\x03"), + (TX, b'\x40\x1a\x20\x00\x00\x00\x00\x00'), + (RX, b'\x41\x1a\x20\x00\xfe\xfd\xfc\xfb'), + (TX, b'\x60\x00\x00\x00\x00\x00\x00\x00'), + (RX, b'\x01\xb2\x01\x20\x02\x91\x12\x03'), ] data = self.network[2].sdo.upload(0x2000 + dt.UNSIGNED56, 0) - self.assertEqual(data, b"\xb2\x01\x20\x02\x91\x12\x03") + self.assertEqual(data, b'\xb2\x01\x20\x02\x91\x12\x03') def test_unsigned64(self): self.data = [ - (TX, b"\x40\x1b\x20\x00\x00\x00\x00\x00"), - (RX, b"\x41\x1b\x20\x00\xfe\xfd\xfc\xfb"), - (TX, b"\x60\x00\x00\x00\x00\x00\x00\x00"), - (RX, b"\x00\xb2\x01\x20\x02\x91\x12\x03"), - (TX, b"\x70\x00\x00\x00\x00\x00\x00\x00"), - (RX, b"\x1d\x19\x21\x70\xfe\xfd\xfc\xfb"), + (TX, b'\x40\x1b\x20\x00\x00\x00\x00\x00'), + (RX, b'\x41\x1b\x20\x00\xfe\xfd\xfc\xfb'), + (TX, b'\x60\x00\x00\x00\x00\x00\x00\x00'), + (RX, b'\x00\xb2\x01\x20\x02\x91\x12\x03'), + (TX, b'\x70\x00\x00\x00\x00\x00\x00\x00'), + (RX, b'\x1d\x19\x21\x70\xfe\xfd\xfc\xfb'), ] data = self.network[2].sdo.upload(0x2000 + dt.UNSIGNED64, 0) - self.assertEqual(data, b"\xb2\x01\x20\x02\x91\x12\x03\x19") + self.assertEqual(data, b'\xb2\x01\x20\x02\x91\x12\x03\x19') def test_integer8(self): self.data = [ - (TX, b"\x40\x02\x20\x00\x00\x00\x00\x00"), - (RX, b"\x4f\x02\x20\x00\xfe\xfd\xfc\xfb"), + (TX, b'\x40\x02\x20\x00\x00\x00\x00\x00'), + (RX, b'\x4f\x02\x20\x00\xfe\xfd\xfc\xfb') ] data = self.network[2].sdo.upload(0x2000 + dt.INTEGER8, 0) - self.assertEqual(data, b"\xfe") + self.assertEqual(data, b'\xfe') def test_integer16(self): self.data = [ - (TX, b"\x40\x03\x20\x00\x00\x00\x00\x00"), - (RX, b"\x4b\x03\x20\x00\xfe\xfd\xfc\xfb"), + (TX, b'\x40\x03\x20\x00\x00\x00\x00\x00'), + (RX, b'\x4b\x03\x20\x00\xfe\xfd\xfc\xfb') ] data = self.network[2].sdo.upload(0x2000 + dt.INTEGER16, 0) - self.assertEqual(data, b"\xfe\xfd") + self.assertEqual(data, b'\xfe\xfd') def test_integer24(self): self.data = [ - (TX, b"\x40\x10\x20\x00\x00\x00\x00\x00"), - (RX, b"\x47\x10\x20\x00\xfe\xfd\xfc\xfb"), + (TX, b'\x40\x10\x20\x00\x00\x00\x00\x00'), + (RX, b'\x47\x10\x20\x00\xfe\xfd\xfc\xfb') ] data = self.network[2].sdo.upload(0x2000 + dt.INTEGER24, 0) - self.assertEqual(data, b"\xfe\xfd\xfc") + self.assertEqual(data, b'\xfe\xfd\xfc') def test_integer32(self): self.data = [ - (TX, b"\x40\x04\x20\x00\x00\x00\x00\x00"), - (RX, b"\x43\x04\x20\x00\xfe\xfd\xfc\xfb"), + (TX, b'\x40\x04\x20\x00\x00\x00\x00\x00'), + (RX, b'\x43\x04\x20\x00\xfe\xfd\xfc\xfb') ] data = self.network[2].sdo.upload(0x2000 + dt.INTEGER32, 0) - self.assertEqual(data, b"\xfe\xfd\xfc\xfb") + self.assertEqual(data, b'\xfe\xfd\xfc\xfb') def test_integer40(self): self.data = [ - (TX, b"\x40\x12\x20\x00\x00\x00\x00\x00"), - (RX, b"\x41\x12\x20\x00\xfe\xfd\xfc\xfb"), - (TX, b"\x60\x00\x00\x00\x00\x00\x00\x00"), - (RX, b"\x05\xb2\x01\x20\x02\x91\x12\x03"), + (TX, b'\x40\x12\x20\x00\x00\x00\x00\x00'), + (RX, b'\x41\x12\x20\x00\xfe\xfd\xfc\xfb'), + (TX, b'\x60\x00\x00\x00\x00\x00\x00\x00'), + (RX, b'\x05\xb2\x01\x20\x02\x91\x12\x03'), ] data = self.network[2].sdo.upload(0x2000 + dt.INTEGER40, 0) - self.assertEqual(data, b"\xb2\x01\x20\x02\x91") + self.assertEqual(data, b'\xb2\x01\x20\x02\x91') def test_integer48(self): self.data = [ - (TX, b"\x40\x13\x20\x00\x00\x00\x00\x00"), - (RX, b"\x41\x13\x20\x00\xfe\xfd\xfc\xfb"), - (TX, b"\x60\x00\x00\x00\x00\x00\x00\x00"), - (RX, b"\x03\xb2\x01\x20\x02\x91\x12\x03"), + (TX, b'\x40\x13\x20\x00\x00\x00\x00\x00'), + (RX, b'\x41\x13\x20\x00\xfe\xfd\xfc\xfb'), + (TX, b'\x60\x00\x00\x00\x00\x00\x00\x00'), + (RX, b'\x03\xb2\x01\x20\x02\x91\x12\x03'), ] data = self.network[2].sdo.upload(0x2000 + dt.INTEGER48, 0) - self.assertEqual(data, b"\xb2\x01\x20\x02\x91\x12") + self.assertEqual(data, b'\xb2\x01\x20\x02\x91\x12') def test_integer56(self): self.data = [ - (TX, b"\x40\x14\x20\x00\x00\x00\x00\x00"), - (RX, b"\x41\x14\x20\x00\xfe\xfd\xfc\xfb"), - (TX, b"\x60\x00\x00\x00\x00\x00\x00\x00"), - (RX, b"\x01\xb2\x01\x20\x02\x91\x12\x03"), + (TX, b'\x40\x14\x20\x00\x00\x00\x00\x00'), + (RX, b'\x41\x14\x20\x00\xfe\xfd\xfc\xfb'), + (TX, b'\x60\x00\x00\x00\x00\x00\x00\x00'), + (RX, b'\x01\xb2\x01\x20\x02\x91\x12\x03'), ] data = self.network[2].sdo.upload(0x2000 + dt.INTEGER56, 0) - self.assertEqual(data, b"\xb2\x01\x20\x02\x91\x12\x03") + self.assertEqual(data, b'\xb2\x01\x20\x02\x91\x12\x03') def test_integer64(self): self.data = [ - (TX, b"\x40\x15\x20\x00\x00\x00\x00\x00"), - (RX, b"\x41\x15\x20\x00\xfe\xfd\xfc\xfb"), - (TX, b"\x60\x00\x00\x00\x00\x00\x00\x00"), - (RX, b"\x00\xb2\x01\x20\x02\x91\x12\x03"), - (TX, b"\x70\x00\x00\x00\x00\x00\x00\x00"), - (RX, b"\x1d\x19\x21\x70\xfe\xfd\xfc\xfb"), + (TX, b'\x40\x15\x20\x00\x00\x00\x00\x00'), + (RX, b'\x41\x15\x20\x00\xfe\xfd\xfc\xfb'), + (TX, b'\x60\x00\x00\x00\x00\x00\x00\x00'), + (RX, b'\x00\xb2\x01\x20\x02\x91\x12\x03'), + (TX, b'\x70\x00\x00\x00\x00\x00\x00\x00'), + (RX, b'\x1d\x19\x21\x70\xfe\xfd\xfc\xfb'), ] data = self.network[2].sdo.upload(0x2000 + dt.INTEGER64, 0) - self.assertEqual(data, b"\xb2\x01\x20\x02\x91\x12\x03\x19") + self.assertEqual(data, b'\xb2\x01\x20\x02\x91\x12\x03\x19') def test_real32(self): self.data = [ - (TX, b"\x40\x08\x20\x00\x00\x00\x00\x00"), - (RX, b"\x43\x08\x20\x00\xfe\xfd\xfc\xfb"), + (TX, b'\x40\x08\x20\x00\x00\x00\x00\x00'), + (RX, b'\x43\x08\x20\x00\xfe\xfd\xfc\xfb') ] data = self.network[2].sdo.upload(0x2000 + dt.REAL32, 0) - self.assertEqual(data, b"\xfe\xfd\xfc\xfb") + self.assertEqual(data, b'\xfe\xfd\xfc\xfb') def test_real64(self): self.data = [ - (TX, b"\x40\x11\x20\x00\x00\x00\x00\x00"), - (RX, b"\x41\x11\x20\x00\xfe\xfd\xfc\xfb"), - (TX, b"\x60\x00\x00\x00\x00\x00\x00\x00"), - (RX, b"\x00\xb2\x01\x20\x02\x91\x12\x03"), - (TX, b"\x70\x00\x00\x00\x00\x00\x00\x00"), - (RX, b"\x1d\x19\x21\x70\xfe\xfd\xfc\xfb"), + (TX, b'\x40\x11\x20\x00\x00\x00\x00\x00'), + (RX, b'\x41\x11\x20\x00\xfe\xfd\xfc\xfb'), + (TX, b'\x60\x00\x00\x00\x00\x00\x00\x00'), + (RX, b'\x00\xb2\x01\x20\x02\x91\x12\x03'), + (TX, b'\x70\x00\x00\x00\x00\x00\x00\x00'), + (RX, b'\x1d\x19\x21\x70\xfe\xfd\xfc\xfb'), ] data = self.network[2].sdo.upload(0x2000 + dt.REAL64, 0) - self.assertEqual(data, b"\xb2\x01\x20\x02\x91\x12\x03\x19") + self.assertEqual(data, b'\xb2\x01\x20\x02\x91\x12\x03\x19') def test_visible_string(self): self.data = [ - (TX, b"\x40\x09\x20\x00\x00\x00\x00\x00"), - (RX, b"\x41\x09\x20\x00\x1a\x00\x00\x00"), - (TX, b"\x60\x00\x00\x00\x00\x00\x00\x00"), - (RX, b"\x00\x54\x69\x6e\x79\x20\x4e\x6f"), - (TX, b"\x70\x00\x00\x00\x00\x00\x00\x00"), - (RX, b"\x10\x64\x65\x20\x2d\x20\x4d\x65"), - (TX, b"\x60\x00\x00\x00\x00\x00\x00\x00"), - (RX, b"\x00\x67\x61\x20\x44\x6f\x6d\x61"), - (TX, b"\x70\x00\x00\x00\x00\x00\x00\x00"), - (RX, b"\x15\x69\x6e\x73\x20\x21\x00\x00"), + (TX, b'\x40\x09\x20\x00\x00\x00\x00\x00'), + (RX, b'\x41\x09\x20\x00\x1A\x00\x00\x00'), + (TX, b'\x60\x00\x00\x00\x00\x00\x00\x00'), + (RX, b'\x00\x54\x69\x6E\x79\x20\x4E\x6F'), + (TX, b'\x70\x00\x00\x00\x00\x00\x00\x00'), + (RX, b'\x10\x64\x65\x20\x2D\x20\x4D\x65'), + (TX, b'\x60\x00\x00\x00\x00\x00\x00\x00'), + (RX, b'\x00\x67\x61\x20\x44\x6F\x6D\x61'), + (TX, b'\x70\x00\x00\x00\x00\x00\x00\x00'), + (RX, b'\x15\x69\x6E\x73\x20\x21\x00\x00') ] data = self.network[2].sdo.upload(0x2000 + dt.VISIBLE_STRING, 0) - self.assertEqual(data, b"Tiny Node - Mega Domains !") + self.assertEqual(data, b'Tiny Node - Mega Domains !') def test_unicode_string(self): self.data = [ - (TX, b"\x40\x0b\x20\x00\x00\x00\x00\x00"), - (RX, b"\x41\x0b\x20\x00\x1a\x00\x00\x00"), - (TX, b"\x60\x00\x00\x00\x00\x00\x00\x00"), - (RX, b"\x00\x54\x69\x6e\x79\x20\x4e\x6f"), - (TX, b"\x70\x00\x00\x00\x00\x00\x00\x00"), - (RX, b"\x10\x64\x65\x20\x2d\x20\x4d\x65"), - (TX, b"\x60\x00\x00\x00\x00\x00\x00\x00"), - (RX, b"\x00\x67\x61\x20\x44\x6f\x6d\x61"), - (TX, b"\x70\x00\x00\x00\x00\x00\x00\x00"), - (RX, b"\x15\x69\x6e\x73\x20\x21\x00\x00"), + (TX, b'\x40\x0b\x20\x00\x00\x00\x00\x00'), + (RX, b'\x41\x0b\x20\x00\x1A\x00\x00\x00'), + (TX, b'\x60\x00\x00\x00\x00\x00\x00\x00'), + (RX, b'\x00\x54\x69\x6E\x79\x20\x4E\x6F'), + (TX, b'\x70\x00\x00\x00\x00\x00\x00\x00'), + (RX, b'\x10\x64\x65\x20\x2D\x20\x4D\x65'), + (TX, b'\x60\x00\x00\x00\x00\x00\x00\x00'), + (RX, b'\x00\x67\x61\x20\x44\x6F\x6D\x61'), + (TX, b'\x70\x00\x00\x00\x00\x00\x00\x00'), + (RX, b'\x15\x69\x6E\x73\x20\x21\x00\x00') ] data = self.network[2].sdo.upload(0x2000 + dt.UNICODE_STRING, 0) - self.assertEqual(data, b"Tiny Node - Mega Domains !") + self.assertEqual(data, b'Tiny Node - Mega Domains !') def test_octet_string(self): self.data = [ - (TX, b"\x40\x0a\x20\x00\x00\x00\x00\x00"), - (RX, b"\x41\x0a\x20\x00\x1a\x00\x00\x00"), - (TX, b"\x60\x00\x00\x00\x00\x00\x00\x00"), - (RX, b"\x00\x54\x69\x6e\x79\x20\x4e\x6f"), - (TX, b"\x70\x00\x00\x00\x00\x00\x00\x00"), - (RX, b"\x10\x64\x65\x20\x2d\x20\x4d\x65"), - (TX, b"\x60\x00\x00\x00\x00\x00\x00\x00"), - (RX, b"\x00\x67\x61\x20\x44\x6f\x6d\x61"), - (TX, b"\x70\x00\x00\x00\x00\x00\x00\x00"), - (RX, b"\x15\x69\x6e\x73\x20\x21\x00\x00"), + (TX, b'\x40\x0a\x20\x00\x00\x00\x00\x00'), + (RX, b'\x41\x0a\x20\x00\x1A\x00\x00\x00'), + (TX, b'\x60\x00\x00\x00\x00\x00\x00\x00'), + (RX, b'\x00\x54\x69\x6E\x79\x20\x4E\x6F'), + (TX, b'\x70\x00\x00\x00\x00\x00\x00\x00'), + (RX, b'\x10\x64\x65\x20\x2D\x20\x4D\x65'), + (TX, b'\x60\x00\x00\x00\x00\x00\x00\x00'), + (RX, b'\x00\x67\x61\x20\x44\x6F\x6D\x61'), + (TX, b'\x70\x00\x00\x00\x00\x00\x00\x00'), + (RX, b'\x15\x69\x6E\x73\x20\x21\x00\x00') ] data = self.network[2].sdo.upload(0x2000 + dt.OCTET_STRING, 0) - self.assertEqual(data, b"Tiny Node - Mega Domains !") + self.assertEqual(data, b'Tiny Node - Mega Domains !') def test_domain(self): self.data = [ - (TX, b"\x40\x0f\x20\x00\x00\x00\x00\x00"), - (RX, b"\x41\x0f\x20\x00\x1a\x00\x00\x00"), - (TX, b"\x60\x00\x00\x00\x00\x00\x00\x00"), - (RX, b"\x00\x54\x69\x6e\x79\x20\x4e\x6f"), - (TX, b"\x70\x00\x00\x00\x00\x00\x00\x00"), - (RX, b"\x10\x64\x65\x20\x2d\x20\x4d\x65"), - (TX, b"\x60\x00\x00\x00\x00\x00\x00\x00"), - (RX, b"\x00\x67\x61\x20\x44\x6f\x6d\x61"), - (TX, b"\x70\x00\x00\x00\x00\x00\x00\x00"), - (RX, b"\x15\x69\x6e\x73\x20\x21\x00\x00"), + (TX, b'\x40\x0f\x20\x00\x00\x00\x00\x00'), + (RX, b'\x41\x0f\x20\x00\x1A\x00\x00\x00'), + (TX, b'\x60\x00\x00\x00\x00\x00\x00\x00'), + (RX, b'\x00\x54\x69\x6E\x79\x20\x4E\x6F'), + (TX, b'\x70\x00\x00\x00\x00\x00\x00\x00'), + (RX, b'\x10\x64\x65\x20\x2D\x20\x4D\x65'), + (TX, b'\x60\x00\x00\x00\x00\x00\x00\x00'), + (RX, b'\x00\x67\x61\x20\x44\x6F\x6D\x61'), + (TX, b'\x70\x00\x00\x00\x00\x00\x00\x00'), + (RX, b'\x15\x69\x6E\x73\x20\x21\x00\x00') ] data = self.network[2].sdo.upload(0x2000 + dt.DOMAIN, 0) - self.assertEqual(data, b"Tiny Node - Mega Domains !") + self.assertEqual(data, b'Tiny Node - Mega Domains !') def test_unknown_od_32(self): """Test an unknown OD entry of 32 bits (4 bytes).""" self.data = [ - (TX, b"\x40\xff\x20\x00\x00\x00\x00\x00"), - (RX, b"\x43\xff\x20\x00\xfe\xfd\xfc\xfb"), + (TX, b'\x40\xFF\x20\x00\x00\x00\x00\x00'), + (RX, b'\x43\xFF\x20\x00\xfe\xfd\xfc\xfb') ] data = self.network[2].sdo.upload(0x20FF, 0) - self.assertEqual(data, b"\xfe\xfd\xfc\xfb") + self.assertEqual(data, b'\xfe\xfd\xfc\xfb') def test_unknown_od_112(self): """Test an unknown OD entry of 112 bits (14 bytes).""" self.data = [ - (TX, b"\x40\xff\x20\x00\x00\x00\x00\x00"), - (RX, b"\x41\xff\x20\x00\xfe\xfd\xfc\xfb"), - (TX, b"\x60\x00\x00\x00\x00\x00\x00\x00"), - (RX, b"\x00\xb2\x01\x20\x02\x91\x12\x03"), - (TX, b"\x70\x00\x00\x00\x00\x00\x00\x00"), - (RX, b"\x11\x19\x21\x70\xfe\xfd\xfc\xfb"), + (TX, b'\x40\xFF\x20\x00\x00\x00\x00\x00'), + (RX, b'\x41\xFF\x20\x00\xfe\xfd\xfc\xfb'), + (TX, b'\x60\x00\x00\x00\x00\x00\x00\x00'), + (RX, b'\x00\xb2\x01\x20\x02\x91\x12\x03'), + (TX, b'\x70\x00\x00\x00\x00\x00\x00\x00'), + (RX, b'\x11\x19\x21\x70\xfe\xfd\xfc\xfb'), ] data = self.network[2].sdo.upload(0x20FF, 0) - self.assertEqual( - data, b"\xb2\x01\x20\x02\x91\x12\x03\x19\x21\x70\xfe\xfd\xfc\xfb" - ) + self.assertEqual(data, b'\xb2\x01\x20\x02\x91\x12\x03\x19\x21\x70\xfe\xfd\xfc\xfb') def test_unknown_datatype32(self): """Test an unknown datatype, but known OD, of 32 bits (4 bytes).""" @@ -523,11 +516,11 @@ def test_unknown_datatype32(self): fake_var.data_type = 0xFF self.node.object_dictionary.add_object(fake_var) self.data = [ - (TX, b"\x40\x00\x21\x00\x00\x00\x00\x00"), - (RX, b"\x43\x00\x21\x00\xfe\xfd\xfc\xfb"), + (TX, b'\x40\x00\x21\x00\x00\x00\x00\x00'), + (RX, b'\x43\x00\x21\x00\xfe\xfd\xfc\xfb') ] data = self.network[2].sdo.upload(0x2100, 0) - self.assertEqual(data, b"\xfe\xfd\xfc\xfb") + self.assertEqual(data, b'\xfe\xfd\xfc\xfb') def test_unknown_datatype112(self): """Test an unknown datatype, but known OD, of 112 bits (14 bytes).""" @@ -538,18 +531,15 @@ def test_unknown_datatype112(self): fake_var.data_type = 0xFF self.node.object_dictionary.add_object(fake_var) self.data = [ - (TX, b"\x40\x00\x21\x00\x00\x00\x00\x00"), - (RX, b"\x41\x00\x21\x00\xfe\xfd\xfc\xfb"), - (TX, b"\x60\x00\x00\x00\x00\x00\x00\x00"), - (RX, b"\x00\xb2\x01\x20\x02\x91\x12\x03"), - (TX, b"\x70\x00\x00\x00\x00\x00\x00\x00"), - (RX, b"\x11\x19\x21\x70\xfe\xfd\xfc\xfb"), + (TX, b'\x40\x00\x21\x00\x00\x00\x00\x00'), + (RX, b'\x41\x00\x21\x00\xfe\xfd\xfc\xfb'), + (TX, b'\x60\x00\x00\x00\x00\x00\x00\x00'), + (RX, b'\x00\xb2\x01\x20\x02\x91\x12\x03'), + (TX, b'\x70\x00\x00\x00\x00\x00\x00\x00'), + (RX, b'\x11\x19\x21\x70\xfe\xfd\xfc\xfb'), ] data = self.network[2].sdo.upload(0x2100, 0) - self.assertEqual( - data, b"\xb2\x01\x20\x02\x91\x12\x03\x19\x21\x70\xfe\xfd\xfc\xfb" - ) - + self.assertEqual(data, b'\xb2\x01\x20\x02\x91\x12\x03\x19\x21\x70\xfe\xfd\xfc\xfb') if __name__ == "__main__": unittest.main() From 1131c8d6b75c8ff5f34db3b4eedfd88460b9b7fa Mon Sep 17 00:00:00 2001 From: Herwin Grobben Date: Thu, 22 May 2025 10:33:55 +0200 Subject: [PATCH 7/8] review from github: suggested changes / formatting --- canopen/sdo/exceptions.py | 31 ++++++++--------- canopen/sdo/server.py | 70 ++++++++++++++++++++++++++++----------- 2 files changed, 67 insertions(+), 34 deletions(-) diff --git a/canopen/sdo/exceptions.py b/canopen/sdo/exceptions.py index fe419765..8f602e61 100644 --- a/canopen/sdo/exceptions.py +++ b/canopen/sdo/exceptions.py @@ -1,3 +1,5 @@ +from typing import Union + class SdoError(Exception): pass @@ -33,23 +35,23 @@ class SdoAbortedError(SdoError): 0x060A0023: "Resource not available", 0x08000000: "General error", 0x08000020: "Data cannot be transferred or stored to the application", - 0x08000021: ( - "Data can not be transferred or stored to the application " - "because of local control" - ), - 0x08000022: ( - "Data can not be transferred or stored to the application " - "because of the present device state" - ), - 0x08000023: ( - "Object dictionary dynamic generation fails or no object " - "dictionary is present" - ), + 0x08000021: ("Data can not be transferred or stored to the application " + "because of local control"), + 0x08000022: ("Data can not be transferred or stored to the application " + "because of the present device state"), + 0x08000023: ("Object dictionary dynamic generation fails or no object " + "dictionary is present"), 0x08000024: "No data available", } - def __init__(self, code: int): + def __init__(self, code: Union[int, str]): #: Abort code + if isinstance(code, str): + try: + self.code = self.from_string(code) + except ValueError as e: + raise ValueError(f"Unknown SDO abort description: {code}") from e + else: self.code = code def __str__(self): @@ -62,8 +64,7 @@ def __eq__(self, other): """Compare two exception objects based on SDO abort code.""" return self.code == other.code - @staticmethod - def from_string(text): + def from_string(self, text): code = list(SdoAbortedError.CODES.keys())[ list(SdoAbortedError.CODES.values()).index(text) ] diff --git a/canopen/sdo/server.py b/canopen/sdo/server.py index a7299d86..9f32f9d2 100644 --- a/canopen/sdo/server.py +++ b/canopen/sdo/server.py @@ -9,8 +9,7 @@ class SdoBlockException(SdoAbortedError): - def __init__(self, code: int): - super.__init__(self, code) + """ Dedicated SDO Block exception. """ class SdoServer(SdoBase): """Creates an SDO server.""" @@ -68,6 +67,14 @@ def on_request(self, can_id, data, timestamp): logger.exception(exc) def process_block(self, request): + """ + Process a block request, using a state mechanisme from SdoBlock class + to handle the different states of the block transfer. + + :param request: + CAN message containing EMCY or SDO request. + """ + logger.debug('process_block') command, _, _, code = SDO_ABORT_STRUCT.unpack_from(request) if command == 0x80: @@ -84,14 +91,12 @@ def process_block(self, request): logger.debug('BLOCK_STATE_UP_INIT_RESP') #init response was sent, client required to send new request if (command & REQUEST_BLOCK_UPLOAD) != REQUEST_BLOCK_UPLOAD: - raise SdoBlockException(0x05040001) + raise SdoBlockException("Unknown SDO command specified") if (command & START_BLOCK_UPLOAD) != START_BLOCK_UPLOAD: - raise SdoBlockException(0x05040001) - # self.sdo_block.update_state(BLOCK_STATE_UP_DATA) + raise SdoBlockException("Unknown SDO command specified") # now start blasting data to client from server self.sdo_block.update_state(BLOCK_STATE_UP_DATA) - #self.data_succesfull_upload = self.data_uploaded blocks = self.sdo_block.get_upload_blocks() for block in blocks: @@ -101,13 +106,12 @@ def process_block(self, request): logger.debug('BLOCK_STATE_UP_DATA') command, ackseq, newblk = SDO_BLOCKACK_STRUCT.unpack_from(request) if (command & REQUEST_BLOCK_UPLOAD) != REQUEST_BLOCK_UPLOAD: - raise SdoBlockException(0x05040001) + raise SdoBlockException("Unknown SDO command specified") elif (command & BLOCK_TRANSFER_RESPONSE) != BLOCK_TRANSFER_RESPONSE: - raise SdoBlockException(0x05040001) + raise SdoBlockException("Unknown SDO command specified") elif (ackseq != self.sdo_block.last_seqno): self.sdo_block.data_uploaded = self.sdo_block.data_succesfull_upload - if self.sdo_block.size == self.sdo_block.data_uploaded: logger.debug('BLOCK_STATE_UP_DATA last data') self.sdo_block.update_state(BLOCK_STATE_UP_END) @@ -131,12 +135,12 @@ def process_block(self, request): self.sdo_block = None elif BLOCK_STATE_DOWNLOAD < self.sdo_block.state: - logger.debug('BLOCK_STATE_DOWNLOAD') # in download state - pass + logger.debug('BLOCK_STATE_DOWNLOAD') else: # in neither - raise SdoBlockException(0x08000022) + raise SdoBlockException("Data can not be transferred or stored to the application " + "because of the present device state") def init_upload(self, request): _, index, subindex = SDO_STRUCT.unpack_from(request) @@ -188,6 +192,13 @@ def segmented_upload(self, command): self.send_response(response) def block_upload(self, request): + """ + Process an initial block upload request. + Create a CAN response message and update the state of the SDO block. + + :param request: + CAN message containing SDO request. + """ logging.debug('Enter server block upload') self.sdo_block = SdoBlock(self._node, request) @@ -314,6 +325,10 @@ def download( return self._node.set_data(index, subindex, data) class SdoBlock(): + """ + SdoBlock class to handle block transfer. It keeps track of the + current state and the prepares data to be transferred. + """ state = BLOCK_STATE_NONE crc = False data_uploaded = 0 @@ -323,7 +338,14 @@ class SdoBlock(): last_seqno = 0 def __init__(self, node, request, docrc=False): - + """ + :param node: + Node object owning the server + :param request: + CAN message containing SDO request. + :param docrc: + If True, CRC is calculated and checked. + """ command, index, subindex = SDO_STRUCT.unpack_from(request) # only do crc if crccheck lib is available _and_ if requested _req_crc = (command & CRC_SUPPORTED) == CRC_SUPPORTED @@ -331,8 +353,9 @@ def __init__(self, node, request, docrc=False): if (command & SUB_COMMAND_MASK) == INITIATE_BLOCK_TRANSFER: self.state = BLOCK_STATE_INIT else: - raise SdoBlockException(SdoAbortedError.from_string("Unknown SDO command specified")) + raise SdoBlockException("Unknown SDO command specified") + # TODO: CRC of data if requested self.crc = CRC_SUPPORTED if (docrc & _req_crc) else 0 self._node = node self.index = index @@ -340,24 +363,32 @@ def __init__(self, node, request, docrc=False): self.req_blocksize = request[4] self.seqno = 0 if not 1 <= self.req_blocksize <= 127: - raise SdoBlockException(SdoAbortedError.from_string("Invalid block size")) + raise SdoBlockException("Invalid block size") self.data = self._node.get_data(index, subindex, check_readable=True) self.size = len(self.data) - # TODO: add PST if needed - # self.pst = data[5] - def update_state(self, new_state): + """ + Update the state of the SDO block transfer. The state is + updated only if the new state is higher than the current + state. Otherwise an exception is raised. + """ logging.debug('update_state %X -> %X', self.state, new_state) if new_state >= self.state: self.state = new_state else: - raise SdoBlockException(0x08000022) + raise SdoBlockException("Data can not be transferred or stored to the application " + "because of the present device state") def get_upload_blocks(self): + """ + Get the blocks of data to be sent to the client. The blocks are + created in a messages list of bytearrays. + """ + msgs = [] # seq no 1 - 127, not 0 -.. @@ -387,6 +418,7 @@ def get_upload_blocks(self): return msgs def get_data_byte(self): + """Get the next byte of data to be sent to the client.""" if self.data_uploaded < self.size: self.data_uploaded += 1 return self.data[self.data_uploaded-1] From 5ef47718307d5f4f60a98251bdc30e73601fca54 Mon Sep 17 00:00:00 2001 From: Herwin Grobben Date: Tue, 10 Jun 2025 11:16:43 +0200 Subject: [PATCH 8/8] Requests by reviewer: bugfixes --- canopen/sdo/exceptions.py | 2 +- test/test_local.py | 15 ++++++++------- test/test_sdo.py | 2 +- 3 files changed, 10 insertions(+), 9 deletions(-) diff --git a/canopen/sdo/exceptions.py b/canopen/sdo/exceptions.py index 8f602e61..d910818b 100644 --- a/canopen/sdo/exceptions.py +++ b/canopen/sdo/exceptions.py @@ -52,7 +52,7 @@ def __init__(self, code: Union[int, str]): except ValueError as e: raise ValueError(f"Unknown SDO abort description: {code}") from e else: - self.code = code + self.code = code def __str__(self): text = f"Code 0x{self.code:08X}" diff --git a/test/test_local.py b/test/test_local.py index e184c040..bc195b2d 100644 --- a/test/test_local.py +++ b/test/test_local.py @@ -37,13 +37,14 @@ def test_expedited_upload(self): vendor_id = self.remote_node.sdo[0x1400][1].raw self.assertEqual(vendor_id, 0x99) - def test_block_upload_switch_to_expedite_upload(self): - with self.assertRaises(canopen.SdoCommunicationError) as context: - with self.remote_node.sdo[0x1008].open('r', block_transfer=True) as fp: - pass - # We get this since the sdo client don't support the switch - # from block upload to expedite upload - self.assertEqual("Unexpected response 0x41", str(context.exception)) + # Remove this test, as Block upload is now supported: + # def test_block_upload_switch_to_expedite_upload(self): + # with self.assertRaises(canopen.SdoCommunicationError) as context: + # with self.remote_node.sdo[0x1008].open('r', block_transfer=True) as fp: + # pass + # # We get this since the sdo client don't support the switch + # # from block upload to expedite upload + # self.assertEqual("Unexpected response 0x41", str(context.exception)) def test_block_download_not_supported(self): data = b"TEST DEVICE" diff --git a/test/test_sdo.py b/test/test_sdo.py index d5b8f05f..e4036efe 100644 --- a/test/test_sdo.py +++ b/test/test_sdo.py @@ -4,7 +4,7 @@ import canopen.objectdictionary.datatypes as dt from canopen.objectdictionary import ODVariable -from util import DATATYPES_EDS, SAMPLE_EDS +from .util import DATATYPES_EDS, SAMPLE_EDS TX = 1