|
1 | 1 | """Module for GCP utilities."""
|
2 | 2 | import os
|
3 | 3 | import logging
|
| 4 | +import time |
4 | 5 | import io
|
5 | 6 | import json
|
6 | 7 | import hashlib
|
|
11 | 12 | from mimetypes import guess_type
|
12 | 13 | from typing import Optional, Any
|
13 | 14 | from google.cloud.storage.blob import Blob
|
| 15 | +from google.api_core.exceptions import Forbidden, GoogleAPICallError |
14 | 16 | from google.oauth2 import service_account
|
15 | 17 | from google.cloud import storage
|
16 | 18 | from google.auth import default
|
@@ -729,4 +731,112 @@ def get_active_gcloud_account() -> str:
|
729 | 731 | )
|
730 | 732 | return result.stdout.strip()
|
731 | 733 |
|
732 |
| - |
| 734 | + def has_write_permission(self, cloud_path: str) -> bool: |
| 735 | + """ |
| 736 | + Check if the current user has permission to write to a GCP path. |
| 737 | +
|
| 738 | + This method tests write access by attempting to update the metadata |
| 739 | + of an existing blob or create a zero-byte temporary file if the blob |
| 740 | + doesn't exist. The temporary file is deleted immediately if created. |
| 741 | +
|
| 742 | + **Args:** |
| 743 | + - cloud_path (str): The GCS path to check for write permissions. |
| 744 | +
|
| 745 | + **Returns:** |
| 746 | + - bool: True if the user has write permission, False otherwise. |
| 747 | + """ |
| 748 | + if not cloud_path.startswith("gs://"): |
| 749 | + raise ValueError("cloud_path must start with 'gs://'") |
| 750 | + if cloud_path.endswith("/"): |
| 751 | + logging.warning(f"Provided cloud path {cloud_path} is a directory, will check {cloud_path}permission_test_temp") |
| 752 | + cloud_path = f"{cloud_path}permission_test_temp" |
| 753 | + try: |
| 754 | + blob = self.load_blob_from_full_path(cloud_path) |
| 755 | + if blob.exists(): |
| 756 | + # Try updating metadata (doesn't change the content) |
| 757 | + original_metadata = blob.metadata or {} |
| 758 | + test_metadata = original_metadata.copy() |
| 759 | + test_metadata["_write_permission_test"] = "true" |
| 760 | + |
| 761 | + blob.metadata = test_metadata |
| 762 | + blob.patch() |
| 763 | + |
| 764 | + # Restore the original metadata |
| 765 | + blob.metadata = original_metadata |
| 766 | + blob.patch() |
| 767 | + |
| 768 | + logging.info(f"Write permission confirmed for existing blob {cloud_path}") |
| 769 | + return True |
| 770 | + else: |
| 771 | + # Try writing a temporary file to the bucket |
| 772 | + blob.upload_from_string("") |
| 773 | + |
| 774 | + # Clean up the test file |
| 775 | + blob.delete() |
| 776 | + logging.info(f"Write permission confirmed for {cloud_path}") |
| 777 | + return True |
| 778 | + except Forbidden: |
| 779 | + logging.warning(f"No write permission on path {cloud_path}") |
| 780 | + return False |
| 781 | + except GoogleAPICallError as e: |
| 782 | + logging.warning(f"Error testing write access to {cloud_path}: {e}") |
| 783 | + return False |
| 784 | + |
| 785 | + def wait_for_write_permission(self, cloud_path: str, interval_wait_time_minutes: int, max_wait_time_minutes: int) -> None: |
| 786 | + """ |
| 787 | + Wait for write permissions on a GCP path, checking at regular intervals. |
| 788 | +
|
| 789 | + This method will periodically check if the user has write permission on the specified cloud path. |
| 790 | + It will continue checking until either write permission is granted or the maximum wait time is reached. |
| 791 | +
|
| 792 | + **Args:** |
| 793 | + - cloud_path (str): The GCS path to check for write permissions. |
| 794 | + - interval_wait_time_minutes (int): Time in minutes to wait between permission checks. |
| 795 | + - max_wait_time_minutes (int): Maximum time in minutes to wait for permissions. |
| 796 | +
|
| 797 | + **Returns:** |
| 798 | + - bool: True if write permission is granted within the wait time, False otherwise. |
| 799 | + """ |
| 800 | + if not cloud_path.startswith("gs://"): |
| 801 | + raise ValueError("cloud_path must start with 'gs://'") |
| 802 | + |
| 803 | + # Convert minutes to seconds for the sleep function |
| 804 | + interval_seconds = interval_wait_time_minutes * 60 |
| 805 | + max_wait_seconds = max_wait_time_minutes * 60 |
| 806 | + |
| 807 | + start_time = time.time() |
| 808 | + attempt_number = 1 |
| 809 | + |
| 810 | + logging.info( |
| 811 | + f"Starting to check for write permissions on {cloud_path}. Will check " |
| 812 | + f"every {interval_wait_time_minutes} minute(s) for up to {max_wait_time_minutes} minute(s).") |
| 813 | + |
| 814 | + # First check immediately |
| 815 | + if self.has_write_permission(cloud_path): |
| 816 | + logging.info(f"Write permission confirmed on initial check for {cloud_path}") |
| 817 | + return |
| 818 | + |
| 819 | + # If first check fails, start periodic checks |
| 820 | + while time.time() - start_time < max_wait_seconds: |
| 821 | + elapsed_minutes = (time.time() - start_time) / 60 |
| 822 | + remaining_minutes = max_wait_time_minutes - elapsed_minutes |
| 823 | + |
| 824 | + logging.info(f"Waiting {interval_wait_time_minutes} minute(s) before next permission check. " |
| 825 | + f"Time elapsed: {elapsed_minutes:.1f} minute(s). " |
| 826 | + f"Time remaining: {remaining_minutes:.1f} minute(s).") |
| 827 | + |
| 828 | + # Sleep for the interval duration |
| 829 | + time.sleep(interval_seconds) |
| 830 | + |
| 831 | + attempt_number += 1 |
| 832 | + logging.info(f"Checking write permissions (attempt {attempt_number})...") |
| 833 | + |
| 834 | + if self.has_write_permission(cloud_path): |
| 835 | + elapsed_minutes = (time.time() - start_time) / 60 |
| 836 | + logging.info(f"Write permission confirmed after {elapsed_minutes:.1f} minute(s) on attempt {attempt_number}") |
| 837 | + return |
| 838 | + |
| 839 | + # If we get here, we've exceeded the maximum wait time |
| 840 | + raise PermissionError( |
| 841 | + f"Maximum wait time of {max_wait_time_minutes} minute(s) exceeded. Write permission was not granted for " |
| 842 | + f"{cloud_path} after {attempt_number} attempts.") |
0 commit comments