diff --git a/integration-test/plutus_scripts/unroll.plutus b/integration-test/plutus_scripts/unroll.plutus new file mode 100644 index 00000000..0c0fabf1 --- /dev/null +++ b/integration-test/plutus_scripts/unroll.plutus @@ -0,0 +1 @@ +58a20100003232232323232323374a90001bb1498c8d4008400401448c8c92610013330063758a00246eb400452f580264c66ae712410c4e616d654572726f723a207a004984c98cd5ce2481144e616d654572726f723a2076616c696461746f72004984c98cd5ce24810d4e616d654572726f723a20723300498888cc8c014894ccd55cf8008a802099aba0300335742002660040046ae8800400800c8c8c0040040041 \ No newline at end of file diff --git a/integration-test/run_tests.sh b/integration-test/run_tests.sh index cc5cccf9..11ea5242 100755 --- a/integration-test/run_tests.sh +++ b/integration-test/run_tests.sh @@ -93,7 +93,7 @@ while true; do sleep 2 done -poetry run pytest -m "not (CardanoCLI)" -s -vv -n 4 "$ROOT"/test --cov=pycardano --cov-config=../.coveragerc --cov-report=xml:../coverage.xml +poetry run pytest -m "not (CardanoCLI)" -s -vv "$ROOT"/test --cov=pycardano --cov-config=../.coveragerc --cov-report=xml:../coverage.xml # Cleanup docker compose -f docker-compose-chang.yml down --volumes --remove-orphans \ No newline at end of file diff --git a/integration-test/test/test_plutus.py b/integration-test/test/test_plutus.py index d2f95389..2b60dd40 100644 --- a/integration-test/test/test_plutus.py +++ b/integration-test/test/test_plutus.py @@ -4,6 +4,7 @@ from typing import Dict, Union import cbor2 +import ogmios as python_ogmios import pytest from retry import retry @@ -444,6 +445,82 @@ def test_plutus_v3(self): self.assert_output(taker_address, take_output) + def test_plutus_v3_unroll(self): + # ----------- Giver give --------------- + + with open( + "./plutus_scripts/unroll.plutus", + "r", + ) as f: + script_hex = f.read() + hello_world_script = bytes.fromhex(script_hex) + + script_hash = plutus_script_hash(PlutusV3Script(hello_world_script)) + + script_address = Address(script_hash, network=self.NETWORK) + + giver_address = Address(self.payment_vkey.hash(), network=self.NETWORK) + + builder = TransactionBuilder(self.chain_context) + builder.add_input_address(giver_address) + builder.add_output(TransactionOutput(script_address, 50000000, datum=Unit())) + + signed_tx = builder.build_and_sign([self.payment_skey], giver_address) + + print("############### Transaction created ###############") + print(signed_tx) + print(signed_tx.to_cbor_hex()) + print("############### Submitting transaction ###############") + self.chain_context.submit_tx(signed_tx) + time.sleep(3) + + # ----------- Taker take --------------- + + utxo_to_spend = self.chain_context.utxos(script_address)[0] + + taker_address = Address(self.payment_vkey.hash(), network=self.NETWORK) + + builder = TransactionBuilder(self.chain_context) + builder.ttl = self.chain_context.last_block_slot + 10 + + reward_account = Address( + staking_part=self.stake_key_pair.verification_key.hash(), + network=self.NETWORK, + ) + + builder.add_script_input( + utxo_to_spend, PlutusV3Script(hello_world_script), redeemer=Redeemer(0) + ) + builder.add_proposal( + deposit=1234122, + reward_account=bytes(reward_account), + gov_action=ParameterChangeAction( + gov_action_id=GovActionId( + gov_action_index=0, + transaction_id=utxo_to_spend.input.transaction_id, + ), + protocol_param_update=ProtocolParamUpdate( + min_fee_b=1000, + ), + policy_hash=None, + ), + anchor=Anchor( + url="https://test-drep.com", + data_hash=AnchorDataHash(bytes.fromhex("0" * 64)), + ), + ) + + with pytest.raises(python_ogmios.errors.ResponseError) as e: + builder.build_and_sign([self.payment_skey], taker_address) + + for v in [ + "1234122", + "1000", + reward_account.staking_part.payload.hex(), + utxo_to_spend.input.transaction_id.payload.hex(), + ]: + assert v in str(e.value) + class TestPlutusKupoOgmios(TestPlutus): @classmethod diff --git a/pycardano/serialization.py b/pycardano/serialization.py index b2c888d1..63dced92 100644 --- a/pycardano/serialization.py +++ b/pycardano/serialization.py @@ -41,14 +41,7 @@ logger.warning("Failed to remove semantic decoder for CBOR tag 258", e) pass -from cbor2 import ( - CBOREncoder, - CBORSimpleValue, - CBORTag, - FrozenDict, - dumps, - undefined, -) +from cbor2 import CBOREncoder, CBORSimpleValue, CBORTag, FrozenDict, dumps, undefined from frozenlist import FrozenList from pprintpp import pformat @@ -948,8 +941,8 @@ def __repr__(self): def __copy__(self): return self.__class__(self) - def __deepcopy__(self, memodict={}): - return self.__class__(deepcopy(self.data)) + def __deepcopy__(self, memo): + return self.__class__(deepcopy(self.data, memo)) def validate(self): for key, value in self.data.items(): @@ -1082,6 +1075,9 @@ def from_primitive( raise ValueError(f"Cannot deserialize {value} to {cls}") + def __deepcopy__(self, memo): + return self.__class__(deepcopy(list(self), memo), use_tag=self._use_tag) + class NonEmptyOrderedSet(OrderedSet[T]): def __init__(self, iterable: Optional[List[T]] = None, use_tag: bool = True): diff --git a/test/pycardano/test_serialization.py b/test/pycardano/test_serialization.py index 81fa12fd..d7878717 100644 --- a/test/pycardano/test_serialization.py +++ b/test/pycardano/test_serialization.py @@ -1,4 +1,5 @@ from collections import defaultdict, deque +from copy import deepcopy from dataclasses import dataclass, field from test.pycardano.util import check_two_way_cbor from typing import ( @@ -879,3 +880,105 @@ class ChildCodedClass(TestCodedClass): assert restored.value == "test" assert restored.numbers == [1, 2] assert restored.extra == "extra" + + +def test_ordered_set_deepcopy(): + """Test the deepcopy implementation of OrderedSet.""" + # Test basic deepcopy + + class MyOrderedSet(OrderedSet): + pass + + s = MyOrderedSet([1, 2, 3], use_tag=True) + s_copy = deepcopy(s) + + assert s == s_copy + assert s is not s_copy + assert s._use_tag == s_copy._use_tag + + # Test that modifications don't affect each other + s_copy.append(4) + assert 4 in s_copy + assert 4 not in s + + # Test with complex objects + class TestObj: + def __init__(self, value): + self.value = value + + def __str__(self): + return f"TestObj({self.value})" + + obj1 = TestObj("a") + obj2 = TestObj("b") + + s = MyOrderedSet([obj1, obj2], use_tag=False) + s_copy = deepcopy(s) + + # Objects should be equal but not the same instances + assert len(s) == len(s_copy) == 2 + assert s[0].value == s_copy[0].value + assert s[1].value == s_copy[1].value + assert s[0] is not s_copy[0] + assert s[1] is not s_copy[1] + + # Test that memodict works with shared references + shared_obj = TestObj("shared") + s = MyOrderedSet([shared_obj, shared_obj], use_tag=True) # Same object twice + + s_copy = deepcopy(s) + # In the copy, both elements should be the same object (preserved reference) + assert len(s_copy) == 1 + assert s_copy[0] is not shared_obj + + +def test_non_empty_ordered_set_deepcopy(): + """Test the deepcopy implementation of NonEmptyOrderedSet.""" + + class MyNonEmptyOrderedSet(NonEmptyOrderedSet): + pass + + # Test basic deepcopy + s = MyNonEmptyOrderedSet([1, 2, 3], use_tag=True) + s_copy = deepcopy(s) + + assert s == s_copy + assert s is not s_copy + assert s._use_tag == s_copy._use_tag + + # Test with nested lists + nested_list = [[1, 2], [3, 4]] + s = MyNonEmptyOrderedSet(nested_list, use_tag=False) + s_copy = deepcopy(s) + + # Lists should be equal but not the same instances + assert s[0] == s_copy[0] + assert s[1] == s_copy[1] + assert s[0] is not s_copy[0] + assert s[1] is not s_copy[1] + + # Modifying the copy shouldn't affect the original + s_copy[0].append(5) + assert s[0] == [1, 2] + assert s_copy[0] == [1, 2, 5] + + # Test complex nesting with CBORSerializable objects + @dataclass + class TestData(MapCBORSerializable): + value: int = 0 + + obj1 = TestData(1) + obj2 = TestData(2) + s = MyNonEmptyOrderedSet([obj1, obj2], use_tag=True) + s_copy = deepcopy(s) + + # Objects should be equal but not the same instances + assert s[0].value == s_copy[0].value + assert s[1].value == s_copy[1].value + assert s[0] is not s_copy[0] + assert s[1] is not s_copy[1] + + # Modifying the copy shouldn't affect the original + s_copy[0].value = 100 + assert s[0].value == 1 + assert s_copy[0].value == 100