4 changes: 2 additions & 2 deletions VERSION.txt
@@ -1,2 +1,2 @@
11.3.2
- Adding tests
11.4.0
- Adding methods to check if gcp path is writeable
112 changes: 111 additions & 1 deletion ops_utils/gcp_utils.py
@@ -1,6 +1,7 @@
"""Module for GCP utilities."""
import os
import logging
import time
import io
import json
import hashlib
@@ -11,6 +12,7 @@
from mimetypes import guess_type
from typing import Optional, Any
from google.cloud.storage.blob import Blob
from google.api_core.exceptions import Forbidden, GoogleAPICallError
from google.oauth2 import service_account
from google.cloud import storage
from google.auth import default
@@ -729,4 +731,112 @@ def get_active_gcloud_account() -> str:
    )
    return result.stdout.strip()


def has_write_permission(self, cloud_path: str) -> bool:
    """
    Check if the current user has permission to write to a GCP path.

    This method tests write access by attempting to update the metadata
    of an existing blob or create a zero-byte temporary file if the blob
    doesn't exist. The temporary file is deleted immediately if created.

    **Args:**
    - cloud_path (str): The GCS path to check for write permissions.

    **Returns:**
    - bool: True if the user has write permission, False otherwise.
    """
    if not cloud_path.startswith("gs://"):
        raise ValueError("cloud_path must start with 'gs://'")
    if cloud_path.endswith("/"):
        logging.warning(
            f"Provided cloud path {cloud_path} is a directory, "
            f"will check {cloud_path}permission_test_temp"
        )
        cloud_path = f"{cloud_path}permission_test_temp"
    try:
        blob = self.load_blob_from_full_path(cloud_path)
        if blob.exists():
            # Try updating metadata (doesn't change the content)
            original_metadata = blob.metadata or {}
            test_metadata = original_metadata.copy()
            test_metadata["_write_permission_test"] = "true"

            blob.metadata = test_metadata
            blob.patch()

            # Restore the original metadata
            blob.metadata = original_metadata
            blob.patch()

            logging.info(f"Write permission confirmed for existing blob {cloud_path}")
            return True
        else:
            # Try writing a temporary file to the bucket
            blob.upload_from_string("")

            # Clean up the test file
            blob.delete()
            logging.info(f"Write permission confirmed for {cloud_path}")
            return True
    except Forbidden:
        logging.warning(f"No write permission on path {cloud_path}")
        return False
    except GoogleAPICallError as e:
        logging.warning(f"Error testing write access to {cloud_path}: {e}")
        return False

def wait_for_write_permission(self, cloud_path: str, interval_wait_time_minutes: int, max_wait_time_minutes: int) -> None:
    """
    Wait for write permissions on a GCP path, checking at regular intervals.

    This method will periodically check if the user has write permission on the specified cloud path.
    It will continue checking until either write permission is granted or the maximum wait time is reached.

    **Args:**
    - cloud_path (str): The GCS path to check for write permissions.
    - interval_wait_time_minutes (int): Time in minutes to wait between permission checks.
    - max_wait_time_minutes (int): Maximum time in minutes to wait for permissions.

    **Raises:**
    - PermissionError: If write permission is not granted within the maximum wait time.
    """
    if not cloud_path.startswith("gs://"):
        raise ValueError("cloud_path must start with 'gs://'")

    # Convert minutes to seconds for the sleep function
    interval_seconds = interval_wait_time_minutes * 60
    max_wait_seconds = max_wait_time_minutes * 60

    start_time = time.time()
    attempt_number = 1

    logging.info(
        f"Starting to check for write permissions on {cloud_path}. Will check "
        f"every {interval_wait_time_minutes} minute(s) for up to {max_wait_time_minutes} minute(s).")

    # First check immediately
    if self.has_write_permission(cloud_path):
        logging.info(f"Write permission confirmed on initial check for {cloud_path}")
        return

    # If the first check fails, start periodic checks
    while time.time() - start_time < max_wait_seconds:
        elapsed_minutes = (time.time() - start_time) / 60
        remaining_minutes = max_wait_time_minutes - elapsed_minutes

        logging.info(
            f"Waiting {interval_wait_time_minutes} minute(s) before next permission check. "
            f"Time elapsed: {elapsed_minutes:.1f} minute(s). "
            f"Time remaining: {remaining_minutes:.1f} minute(s).")

        # Sleep for the interval duration
        time.sleep(interval_seconds)

        attempt_number += 1
        logging.info(f"Checking write permissions (attempt {attempt_number})...")

        if self.has_write_permission(cloud_path):
            elapsed_minutes = (time.time() - start_time) / 60
            logging.info(
                f"Write permission confirmed after {elapsed_minutes:.1f} minute(s) on attempt {attempt_number}")
            return

    # If we get here, we've exceeded the maximum wait time
    raise PermissionError(
        f"Maximum wait time of {max_wait_time_minutes} minute(s) exceeded. Write permission was not granted for "
        f"{cloud_path} after {attempt_number} attempts.")
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -26,7 +26,7 @@ dependencies = [
"google-cloud-secret-manager",
"azure-identity==1.17.1",
"azure-storage-blob==12.21.0",
"jira",
"jira==3.8.0",
"oauth2client",
"backoff",
"aiohttp",
2 changes: 1 addition & 1 deletion requirements-dev.txt
@@ -11,7 +11,7 @@ google-cloud-storage==2.17.0
google-cloud-bigquery
google-api-python-client
google-cloud-secret-manager
jira
jira==3.8.0
oauth2client
numpy
pandas