Skip to content

Add deepcopy to [Nonempty]OrderedSet #440

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
May 1, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions integration-test/plutus_scripts/unroll.plutus
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
58a20100003232232323232323374a90001bb1498c8d4008400401448c8c92610013330063758a00246eb400452f580264c66ae712410c4e616d654572726f723a207a004984c98cd5ce2481144e616d654572726f723a2076616c696461746f72004984c98cd5ce24810d4e616d654572726f723a20723300498888cc8c014894ccd55cf8008a802099aba0300335742002660040046ae8800400800c8c8c0040040041
2 changes: 1 addition & 1 deletion integration-test/run_tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ while true; do
sleep 2
done

poetry run pytest -m "not (CardanoCLI)" -s -vv -n 4 "$ROOT"/test --cov=pycardano --cov-config=../.coveragerc --cov-report=xml:../coverage.xml
poetry run pytest -m "not (CardanoCLI)" -s -vv "$ROOT"/test --cov=pycardano --cov-config=../.coveragerc --cov-report=xml:../coverage.xml

# Cleanup
docker compose -f docker-compose-chang.yml down --volumes --remove-orphans
77 changes: 77 additions & 0 deletions integration-test/test/test_plutus.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from typing import Dict, Union

import cbor2
import ogmios as python_ogmios
import pytest
from retry import retry

Expand Down Expand Up @@ -444,6 +445,82 @@ def test_plutus_v3(self):

self.assert_output(taker_address, take_output)

def test_plutus_v3_unroll(self):
# ----------- Giver give ---------------

with open(
"./plutus_scripts/unroll.plutus",
"r",
) as f:
script_hex = f.read()
hello_world_script = bytes.fromhex(script_hex)

script_hash = plutus_script_hash(PlutusV3Script(hello_world_script))

script_address = Address(script_hash, network=self.NETWORK)

giver_address = Address(self.payment_vkey.hash(), network=self.NETWORK)

builder = TransactionBuilder(self.chain_context)
builder.add_input_address(giver_address)
builder.add_output(TransactionOutput(script_address, 50000000, datum=Unit()))

signed_tx = builder.build_and_sign([self.payment_skey], giver_address)

print("############### Transaction created ###############")
print(signed_tx)
print(signed_tx.to_cbor_hex())
print("############### Submitting transaction ###############")
self.chain_context.submit_tx(signed_tx)
time.sleep(3)

# ----------- Taker take ---------------

utxo_to_spend = self.chain_context.utxos(script_address)[0]

taker_address = Address(self.payment_vkey.hash(), network=self.NETWORK)

builder = TransactionBuilder(self.chain_context)
builder.ttl = self.chain_context.last_block_slot + 10

reward_account = Address(
staking_part=self.stake_key_pair.verification_key.hash(),
network=self.NETWORK,
)

builder.add_script_input(
utxo_to_spend, PlutusV3Script(hello_world_script), redeemer=Redeemer(0)
)
builder.add_proposal(
deposit=1234122,
reward_account=bytes(reward_account),
gov_action=ParameterChangeAction(
gov_action_id=GovActionId(
gov_action_index=0,
transaction_id=utxo_to_spend.input.transaction_id,
),
protocol_param_update=ProtocolParamUpdate(
min_fee_b=1000,
),
policy_hash=None,
),
anchor=Anchor(
url="https://test-drep.com",
data_hash=AnchorDataHash(bytes.fromhex("0" * 64)),
),
)

with pytest.raises(python_ogmios.errors.ResponseError) as e:
builder.build_and_sign([self.payment_skey], taker_address)

for v in [
"1234122",
"1000",
reward_account.staking_part.payload.hex(),
utxo_to_spend.input.transaction_id.payload.hex(),
]:
assert v in str(e.value)


class TestPlutusKupoOgmios(TestPlutus):
@classmethod
Expand Down
16 changes: 6 additions & 10 deletions pycardano/serialization.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,7 @@
logger.warning("Failed to remove semantic decoder for CBOR tag 258", e)
pass

from cbor2 import (
CBOREncoder,
CBORSimpleValue,
CBORTag,
FrozenDict,
dumps,
undefined,
)
from cbor2 import CBOREncoder, CBORSimpleValue, CBORTag, FrozenDict, dumps, undefined
from frozenlist import FrozenList
from pprintpp import pformat

Expand Down Expand Up @@ -948,8 +941,8 @@ def __repr__(self):
def __copy__(self):
return self.__class__(self)

def __deepcopy__(self, memodict={}):
return self.__class__(deepcopy(self.data))
def __deepcopy__(self, memo):
return self.__class__(deepcopy(self.data, memo))

def validate(self):
for key, value in self.data.items():
Expand Down Expand Up @@ -1082,6 +1075,9 @@ def from_primitive(

raise ValueError(f"Cannot deserialize {value} to {cls}")

def __deepcopy__(self, memo):
return self.__class__(deepcopy(list(self), memo), use_tag=self._use_tag)


class NonEmptyOrderedSet(OrderedSet[T]):
def __init__(self, iterable: Optional[List[T]] = None, use_tag: bool = True):
Expand Down
103 changes: 103 additions & 0 deletions test/pycardano/test_serialization.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from collections import defaultdict, deque
from copy import deepcopy
from dataclasses import dataclass, field
from test.pycardano.util import check_two_way_cbor
from typing import (
Expand Down Expand Up @@ -879,3 +880,105 @@ class ChildCodedClass(TestCodedClass):
assert restored.value == "test"
assert restored.numbers == [1, 2]
assert restored.extra == "extra"


def test_ordered_set_deepcopy():
"""Test the deepcopy implementation of OrderedSet."""
# Test basic deepcopy

class MyOrderedSet(OrderedSet):
pass

s = MyOrderedSet([1, 2, 3], use_tag=True)
s_copy = deepcopy(s)

assert s == s_copy
assert s is not s_copy
assert s._use_tag == s_copy._use_tag

# Test that modifications don't affect each other
s_copy.append(4)
assert 4 in s_copy
assert 4 not in s

# Test with complex objects
class TestObj:
def __init__(self, value):
self.value = value

def __str__(self):
return f"TestObj({self.value})"

obj1 = TestObj("a")
obj2 = TestObj("b")

s = MyOrderedSet([obj1, obj2], use_tag=False)
s_copy = deepcopy(s)

# Objects should be equal but not the same instances
assert len(s) == len(s_copy) == 2
assert s[0].value == s_copy[0].value
assert s[1].value == s_copy[1].value
assert s[0] is not s_copy[0]
assert s[1] is not s_copy[1]

# Test that memodict works with shared references
shared_obj = TestObj("shared")
s = MyOrderedSet([shared_obj, shared_obj], use_tag=True) # Same object twice

s_copy = deepcopy(s)
# In the copy, both elements should be the same object (preserved reference)
assert len(s_copy) == 1
assert s_copy[0] is not shared_obj


def test_non_empty_ordered_set_deepcopy():
"""Test the deepcopy implementation of NonEmptyOrderedSet."""

class MyNonEmptyOrderedSet(NonEmptyOrderedSet):
pass

# Test basic deepcopy
s = MyNonEmptyOrderedSet([1, 2, 3], use_tag=True)
s_copy = deepcopy(s)

assert s == s_copy
assert s is not s_copy
assert s._use_tag == s_copy._use_tag

# Test with nested lists
nested_list = [[1, 2], [3, 4]]
s = MyNonEmptyOrderedSet(nested_list, use_tag=False)
s_copy = deepcopy(s)

# Lists should be equal but not the same instances
assert s[0] == s_copy[0]
assert s[1] == s_copy[1]
assert s[0] is not s_copy[0]
assert s[1] is not s_copy[1]

# Modifying the copy shouldn't affect the original
s_copy[0].append(5)
assert s[0] == [1, 2]
assert s_copy[0] == [1, 2, 5]

# Test complex nesting with CBORSerializable objects
@dataclass
class TestData(MapCBORSerializable):
value: int = 0

obj1 = TestData(1)
obj2 = TestData(2)
s = MyNonEmptyOrderedSet([obj1, obj2], use_tag=True)
s_copy = deepcopy(s)

# Objects should be equal but not the same instances
assert s[0].value == s_copy[0].value
assert s[1].value == s_copy[1].value
assert s[0] is not s_copy[0]
assert s[1] is not s_copy[1]

# Modifying the copy shouldn't affect the original
s_copy[0].value = 100
assert s[0].value == 1
assert s_copy[0].value == 100