1
1
import json
2
+ from collections .abc import Callable
2
3
from pathlib import Path
3
4
from typing import Any
4
5
11
12
SAMPLE1 = TESTS_DATA / "schema1.graphql"
12
13
SAMPLE2 = TESTS_DATA / "schema2.graphql"
13
14
SAMPLE3 = TESTS_DATA / "schema3.graphql"
14
- UNITS = TESTS_DATA / "test_units.yaml"
15
15
16
16
# Version bump test schemas
17
17
BASE_SCHEMA = TESTS_DATA / "base.graphql"
@@ -192,19 +192,7 @@ def test_registry_export_concept_uri(runner: CliRunner, tmp_outputs: Path) -> No
192
192
193
193
def test_registry_export_id (runner : CliRunner , tmp_outputs : Path ) -> None :
194
194
out = tmp_outputs / "ids.json"
195
- result = runner .invoke (
196
- cli ,
197
- [
198
- "registry" ,
199
- "id" ,
200
- "-s" ,
201
- str (SAMPLE1 ),
202
- "-u" ,
203
- str (UNITS ),
204
- "-o" ,
205
- str (out ),
206
- ],
207
- )
195
+ result = runner .invoke (cli , ["registry" , "id" , "-s" , str (SAMPLE1 ), "-o" , str (out )])
208
196
assert result .exit_code == 0 , result .output
209
197
assert out .exists ()
210
198
with open (out ) as f :
@@ -215,7 +203,7 @@ def test_registry_export_id(runner: CliRunner, tmp_outputs: Path) -> None:
215
203
216
204
def test_registry_init (runner : CliRunner , tmp_outputs : Path ) -> None :
217
205
out = tmp_outputs / "spec_history.json"
218
- result = runner .invoke (cli , ["registry" , "init" , "-s" , str (SAMPLE1 ), "-u" , str ( UNITS ), "- o" , str (out )])
206
+ result = runner .invoke (cli , ["registry" , "init" , "-s" , str (SAMPLE1 ), "-o" , str (out )])
219
207
assert result .exit_code == 0 , result .output
220
208
assert out .exists ()
221
209
with open (out ) as f :
@@ -230,21 +218,19 @@ def test_registry_init(runner: CliRunner, tmp_outputs: Path) -> None:
230
218
spec_history
231
219
and isinstance (spec_history , list )
232
220
and isinstance (spec_history [0 ], dict )
233
- and spec_history [0 ].get ("@id" ) == "0xEC20D822 "
221
+ and spec_history [0 ].get ("@id" ) == "0x32E14E88 "
234
222
):
235
223
found = True
236
224
break
237
- assert found , 'Expected entry with "@id": "ns:Vehicle.averageSpeed" and specHistory id "0xEC20D822 " not found.'
225
+ assert found , 'Expected entry with "@id": "ns:Vehicle.averageSpeed" and specHistory id "0x32E14E88 " not found.'
238
226
239
227
240
228
def test_registry_update (runner : CliRunner , tmp_outputs : Path ) -> None :
241
229
out = tmp_outputs / "spec_history_update.json"
242
230
# First, create a spec history file
243
231
init_out = tmp_outputs / "spec_history.json"
244
- runner .invoke (cli , ["registry" , "init" , "-s" , str (SAMPLE1 ), "-u" , str (UNITS ), "-o" , str (init_out )])
245
- runner .invoke (
246
- cli , ["registry" , "update" , "-s" , str (SAMPLE2 ), "-u" , str (UNITS ), "-sh" , str (init_out ), "-o" , str (out )]
247
- )
232
+ runner .invoke (cli , ["registry" , "init" , "-s" , str (SAMPLE1 ), "-o" , str (init_out )])
233
+ runner .invoke (cli , ["registry" , "update" , "-s" , str (SAMPLE2 ), "-sh" , str (init_out ), "-o" , str (out )])
248
234
assert out .exists ()
249
235
with open (out ) as f :
250
236
data = json .load (f )
@@ -256,13 +242,13 @@ def test_registry_update(runner: CliRunner, tmp_outputs: Path) -> None:
256
242
if isinstance (entry , dict ) and entry .get ("@id" ) == "ns:Vehicle.averageSpeed" :
257
243
spec_history = entry .get ("specHistory" , [])
258
244
ids = [h .get ("@id" ) for h in spec_history if isinstance (h , dict )]
259
- if "0xEC20D822 " in ids :
245
+ if "0x32E14E88 " in ids :
260
246
found_old = True
261
- if "0xB86BF561 " in ids :
247
+ if "0xE47AA673 " in ids :
262
248
found_new = True
263
249
break
264
- assert found_old , 'Expected old specHistory id "0xEC20D822 " not found.'
265
- assert found_new , 'Expected new specHistory id "0xB86BF561 " not found.'
250
+ assert found_old , 'Expected old specHistory id "0x32E14E88 " not found.'
251
+ assert found_new , 'Expected new specHistory id "0xE47AA673 " not found.'
266
252
267
253
268
254
@pytest .mark .parametrize (
@@ -397,3 +383,33 @@ def test_stats_graphql(runner: CliRunner) -> None:
397
383
print (f"{ result .output = } " )
398
384
assert result .exit_code == 0 , result .output
399
385
assert "'UInt32': 1" in result .output
386
+
387
+
388
+ def test_units_sync_cli (
389
+ runner : CliRunner ,
390
+ tmp_outputs : Path ,
391
+ monkeypatch : pytest .MonkeyPatch ,
392
+ mock_sync_qudt_units : Callable [..., list [Path ]],
393
+ mock_check_latest_qudt_version : Callable [[], str ],
394
+ ) -> None :
395
+ """Test that the units sync CLI command works end-to-end."""
396
+
397
+ # Simple mock that just returns a few paths
398
+ def simple_mock (units_root : Path , version : str | None = None , * , dry_run : bool = False ) -> list [Path ]:
399
+ return mock_sync_qudt_units (units_root , version , dry_run = dry_run , num_paths = 2 )
400
+
401
+ # Apply mocks to avoid network calls
402
+ monkeypatch .setattr ("s2dm.cli.sync_qudt_units" , simple_mock )
403
+ monkeypatch .setattr ("s2dm.cli.check_latest_qudt_version" , mock_check_latest_qudt_version )
404
+
405
+ output_dir = tmp_outputs / "units_cli_test"
406
+
407
+ # Test basic CLI functionality
408
+ result = runner .invoke (cli , ["units" , "sync" , "--output-dir" , str (output_dir )])
409
+ assert result .exit_code == 0 , result .output
410
+ assert "Generated" in result .output or "enum files" in result .output
411
+
412
+ # Test that --dry-run flag is accepted (detailed behavior tested in unit tests)
413
+ result = runner .invoke (cli , ["units" , "sync" , "--output-dir" , str (output_dir ), "--dry-run" ])
414
+ assert result .exit_code == 0 , result .output
415
+ assert "Would generate" in result .output or "dry" in result .output .lower ()
0 commit comments