|
| 1 | +import os |
| 2 | +import sys |
| 3 | +import unittest |
| 4 | +from unittest.mock import patch |
| 5 | + |
| 6 | +sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))) |
| 7 | +__package__ = "tests" |
| 8 | + |
| 9 | +from dpdispatcher.utils.utils import rsync |
| 10 | + |
| 11 | + |
| 12 | +class TestRsyncFlags(unittest.TestCase): |
| 13 | + """Test rsync function flags to ensure correct options are used.""" |
| 14 | + |
| 15 | + @patch("dpdispatcher.utils.utils.run_cmd_with_all_output") |
| 16 | + def test_rsync_flags_exclude_owner_group(self, mock_run_cmd): |
| 17 | + """Test that rsync uses flags that exclude owner and group preservation.""" |
| 18 | + # Mock successful command execution |
| 19 | + mock_run_cmd.return_value = (0, "", "") |
| 20 | + |
| 21 | + # Call rsync function |
| 22 | + rsync("source_file", "dest_file", key_filename="test_key") |
| 23 | + |
| 24 | + # Verify the command was called |
| 25 | + mock_run_cmd.assert_called_once() |
| 26 | + |
| 27 | + # Get the command that was executed |
| 28 | + called_cmd = mock_run_cmd.call_args[0][0] |
| 29 | + |
| 30 | + # Verify the command contains the correct flags |
| 31 | + self.assertIn("-rlptDz", called_cmd) |
| 32 | + self.assertNotIn("-az", called_cmd) |
| 33 | + |
| 34 | + # Verify rsync command structure |
| 35 | + self.assertIn("rsync", called_cmd) |
| 36 | + self.assertIn("source_file", called_cmd) |
| 37 | + self.assertIn("dest_file", called_cmd) |
| 38 | + self.assertIn("-e", called_cmd) |
| 39 | + self.assertIn("-q", called_cmd) |
| 40 | + |
| 41 | + @patch("dpdispatcher.utils.utils.run_cmd_with_all_output") |
| 42 | + def test_rsync_with_proxy_command_flags(self, mock_run_cmd): |
| 43 | + """Test that rsync uses correct flags even with proxy command.""" |
| 44 | + # Mock successful command execution |
| 45 | + mock_run_cmd.return_value = (0, "", "") |
| 46 | + |
| 47 | + # Call rsync function with proxy command |
| 48 | + rsync( |
| 49 | + "source_file", |
| 50 | + "dest_file", |
| 51 | + key_filename="test_key", |
| 52 | + proxy_command="ssh -W target:22 jump_host", |
| 53 | + ) |
| 54 | + |
| 55 | + # Verify the command was called |
| 56 | + mock_run_cmd.assert_called_once() |
| 57 | + |
| 58 | + # Get the command that was executed |
| 59 | + called_cmd = mock_run_cmd.call_args[0][0] |
| 60 | + |
| 61 | + # Verify the command contains the correct flags |
| 62 | + self.assertIn("-rlptDz", called_cmd) |
| 63 | + self.assertNotIn("-az", called_cmd) |
| 64 | + |
| 65 | + @patch("dpdispatcher.utils.utils.run_cmd_with_all_output") |
| 66 | + def test_rsync_error_handling(self, mock_run_cmd): |
| 67 | + """Test that rsync properly handles errors.""" |
| 68 | + # Mock failed command execution |
| 69 | + mock_run_cmd.return_value = ( |
| 70 | + 23, |
| 71 | + "", |
| 72 | + "rsync: chown failed: Operation not permitted", |
| 73 | + ) |
| 74 | + |
| 75 | + # Call rsync function and expect RuntimeError |
| 76 | + with self.assertRaises(RuntimeError) as context: |
| 77 | + rsync("source_file", "dest_file") |
| 78 | + |
| 79 | + # Verify error message contains the command and error |
| 80 | + self.assertIn("Failed to run", str(context.exception)) |
| 81 | + self.assertIn( |
| 82 | + "rsync: chown failed: Operation not permitted", str(context.exception) |
| 83 | + ) |
| 84 | + |
| 85 | + |
| 86 | +if __name__ == "__main__": |
| 87 | + unittest.main() |
0 commit comments