Better error handling on expand_s3_glob
jakep-allenai committed Jan 10, 2025
1 parent 5e633e0 commit 683be68
Showing 1 changed file with 53 additions and 14 deletions.
pdelfin/s3_utils.py: 53 additions & 14 deletions
@@ -38,22 +38,61 @@ def parse_s3_path(s3_path: str) -> tuple[str, str]:


 def expand_s3_glob(s3_client, s3_glob: str) -> dict[str, str]:
+    """
+    Expand an S3 path that may or may not contain wildcards (e.g., *.pdf).
+    Returns a dict of {'s3://bucket/key': etag} for each matching object.
+    Raises a ValueError if nothing is found or if a bare prefix was provided by mistake.
+    """
     parsed = urlparse(s3_glob)
-    bucket_name = parsed.netloc
-    prefix = os.path.dirname(parsed.path.lstrip('/')).rstrip('/') + "/"
-    pattern = os.path.basename(parsed.path)
-
-    paginator = s3_client.get_paginator('list_objects_v2')
-    page_iterator = paginator.paginate(Bucket=bucket_name, Prefix=prefix)
-
-    matched_files = {}
-    for page in page_iterator:
-        for obj in page.get('Contents', []):
-            key = obj['Key']
-            if glob.fnmatch.fnmatch(key, posixpath.join(prefix, pattern)):
-                matched_files[f"s3://{bucket_name}/{key}"] = obj['ETag'].strip('"')
-    return matched_files
+    if not parsed.scheme.startswith("s3"):
+        raise ValueError("Path must start with s3://")
+
+    bucket = parsed.netloc
+    raw_path = parsed.path.lstrip("/")
+    prefix = posixpath.dirname(raw_path)
+    pattern = posixpath.basename(raw_path)
+
+    # Case 1: We have a wildcard
+    if any(wc in pattern for wc in ["*", "?", "[", "]"]):
+        if prefix and not prefix.endswith("/"):
+            prefix += "/"
+        paginator = s3_client.get_paginator("list_objects_v2")
+        matched = {}
+        for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
+            for obj in page.get("Contents", []):
+                key = obj["Key"]
+                if glob.fnmatch.fnmatch(key, posixpath.join(prefix, pattern)):
+                    matched[f"s3://{bucket}/{key}"] = obj["ETag"].strip('"')
+        if not matched:
+            raise ValueError(f"No objects found for pattern '{s3_glob}'. Check your path or pattern.")
+        return matched
+
+    # Case 2: No wildcard → single file or a bare prefix
+    try:
+        # Attempt to head a single file
+        resp = s3_client.head_object(Bucket=bucket, Key=raw_path)
+
+        if resp["ContentType"] == "application/x-directory":
+            raise ValueError(
+                f"'{s3_glob}' appears to be a folder. "
+                f"Use a wildcard (e.g., '{s3_glob.rstrip('/')}/*.pdf') to match files."
+            )
+
+        return {f"s3://{bucket}/{raw_path}": resp["ETag"].strip('"')}
+    except ClientError as e:
+        if e.response["Error"]["Code"] == "404":
+            # Check if it's actually a folder with contents
+            check_prefix = raw_path if raw_path.endswith("/") else raw_path + "/"
+            paginator = s3_client.get_paginator("list_objects_v2")
+            for page in paginator.paginate(Bucket=bucket, Prefix=check_prefix):
+                if page.get("Contents"):
+                    raise ValueError(
+                        f"'{s3_glob}' appears to be a folder. "
+                        f"Use a wildcard (e.g., '{s3_glob.rstrip('/')}/*.pdf') to match files."
+                    )
+            raise ValueError(f"No object or prefix found at '{s3_glob}'. Check your path or add a wildcard.")
+        else:
+            raise


def get_s3_bytes(s3_client, s3_path: str, start_index: Optional[int] = None, end_index: Optional[int] = None) -> bytes:
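
For reference, a minimal usage sketch of the new behavior; the bucket and key names below are hypothetical, and a boto3 client with valid credentials is assumed:

import boto3

from pdelfin.s3_utils import expand_s3_glob

s3 = boto3.client("s3")

# Wildcard path: returns {'s3://bucket/key': etag} for every match,
# or raises ValueError when nothing matches.
pdfs = expand_s3_glob(s3, "s3://my-bucket/documents/*.pdf")

# Exact key: returns a single-entry dict for that object.
report = expand_s3_glob(s3, "s3://my-bucket/documents/report.pdf")

# Bare prefix: now raises a ValueError suggesting a wildcard, where the
# old code would quietly return an empty dict.
try:
    expand_s3_glob(s3, "s3://my-bucket/documents")
except ValueError as err:
    print(err)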

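The new folder-detection branch can also be exercised offline with botocore's Stubber. This is a sketch under the same hypothetical names, stubbing head_object to return 404 and list_objects_v2 to report objects under the prefix:

import boto3
from botocore.stub import Stubber

from pdelfin.s3_utils import expand_s3_glob

s3 = boto3.client("s3", region_name="us-east-1")
stubber = Stubber(s3)

# head_object on the bare key returns 404 ...
stubber.add_client_error(
    "head_object",
    service_error_code="404",
    http_status_code=404,
    expected_params={"Bucket": "my-bucket", "Key": "documents"},
)
# ... but listing under 'documents/' finds objects, so expand_s3_glob
# should raise the "appears to be a folder" ValueError.
stubber.add_response(
    "list_objects_v2",
    {"Contents": [{"Key": "documents/a.pdf"}]},
    {"Bucket": "my-bucket", "Prefix": "documents/"},
)

with stubber:
    try:
        expand_s3_glob(s3, "s3://my-bucket/documents")
    except ValueError as err:
        print(err)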