# -*- coding: utf-8 -*-
import typing as T
import hashlib
try:
import botocore.exceptions
except ImportError: # pragma: no cover
pass
except: # pragma: no cover
raise
[docs]
def split_s3_uri(
    s3_uri: str,
) -> T.Tuple[str, str]:
    """
    Break an AWS S3 URI into its bucket name and object key.

    The URI is cut on ``"/"``: the third piece is taken as the bucket and
    everything after the following slash is the key (an empty string when
    the URI names only a bucket).

    :param s3_uri: example, ``"s3://my-bucket/my-folder/data.json"``

    :return: a ``(bucket, key)`` tuple.

    .. versionadded:: 1.0.1
    """
    # At most three cuts: "s3:", the empty segment, the bucket, and the
    # untouched remainder (the key, slashes included).
    pieces = s3_uri.split("/", 3)
    bucket = pieces[2]
    key = pieces[3] if len(pieces) > 3 else ""
    return bucket, key
[docs]
def join_s3_uri(
    bucket: str,
    key: str,
) -> str:
    """
    Build an AWS S3 URI of the form ``s3://{bucket}/{key}``.

    :param bucket: example, ``"my-bucket"``
    :param key: example, ``"my-folder/data.json"`` or ``"my-folder/"``

    :return: the assembled URI string.

    .. versionadded:: 1.0.1
    """
    return f"s3://{bucket}/{key}"
[docs]
def split_parts(key: str) -> T.List[str]:
    """
    Cut an s3 key on the "/" delimiter and drop the empty segments.

    Leading, trailing and repeated slashes therefore contribute nothing to
    the result.

    Example::

        >>> split_parts("a/b/c")
        ["a", "b", "c"]
        >>> split_parts("//a//b//c//")
        ["a", "b", "c"]

    .. versionadded:: 1.0.1
    """
    # filter(None, ...) keeps only the non-empty segments.
    return list(filter(None, key.split("/")))
[docs]
def smart_join_s3_key(
    parts: T.List[str],
    is_dir: bool,
) -> str:
    """
    Join path parts into one s3 key with single "/" separators.

    Every part is first broken up with :func:`split_parts`, so leading,
    trailing and repeated slashes inside the parts are discarded before the
    segments are joined again.

    :param parts: list of s3 key path parts, each may contain "/"
    :param is_dir: if True, a trailing "/" is appended to the joined key
        (which yields ``"/"`` when there are no segments at all); otherwise
        the key has no trailing "/".

    Example::

        >>> smart_join_s3_key(parts=["/a/", "b/", "/c"], is_dir=True)
        a/b/c/
        >>> smart_join_s3_key(parts=["/a/", "b/", "/c"], is_dir=False)
        a/b/c

    .. versionadded:: 1.0.1
    """
    segments = [segment for part in parts for segment in split_parts(part)]
    joined = "/".join(segments)
    return joined + "/" if is_dir else joined
[docs]
def make_s3_console_url(
    bucket: T.Optional[str] = None,
    prefix: T.Optional[str] = None,
    s3_uri: T.Optional[str] = None,
    version_id: T.Optional[str] = None,
    aws_region: T.Optional[str] = None,
    is_us_gov_cloud: bool = False,
) -> str:
    """
    Return an AWS Console url that you can use to open it in your browser.

    Pass either ``bucket`` together with ``prefix``, or ``s3_uri`` alone.
    The prefix and the version id are percent-encoded before they are put
    into the query string, so keys containing characters such as ``&``,
    ``#``, ``+`` or spaces still yield a well-formed url ("/" is kept as is).

    :param bucket: example, ``"my-bucket"``
    :param prefix: example, ``"my-folder/"``; an empty string points at the
        bucket itself, a value ending with "/" is shown as a folder, anything
        else as an object.
    :param s3_uri: example, ``"s3://my-bucket/my-folder/data.json"``
    :param version_id: optional object version, appended as ``versionId``;
        ignored when the prefix is empty.
    :param aws_region: optional region; when given it is prepended to the
        console domain and added as a ``region`` query parameter.
    :param is_us_gov_cloud: if True, use the ``console.amazonaws-us-gov.com``
        domain instead of ``console.aws.amazon.com``.

    :raises ValueError: if the bucket/prefix and s3_uri arguments are not
        given in one of the two supported combinations.

    Example::

        >>> make_s3_console_url(s3_uri="s3://my-bucket/my-folder/data.json")
        https://console.aws.amazon.com/s3/object/my-bucket?prefix=my-folder/data.json

    .. versionadded:: 1.0.1

    .. versionchanged:: 2.0.1

        add ``version_id`` parameter.

    .. versionchanged:: 2.2.2
    """
    # Local import keeps this block self-contained.
    from urllib.parse import quote

    if s3_uri is None:
        if bucket is None or prefix is None:
            raise ValueError(
                "both 'bucket' and 'prefix' are required when 's3_uri' is not given"
            )
    else:
        if bucket is not None or prefix is not None:
            raise ValueError(
                "'bucket' and 'prefix' must not be given together with 's3_uri'"
            )
        bucket, prefix = split_s3_uri(s3_uri)

    if aws_region is None:
        region_part_in_domain = ""
        region_param = ""
    else:  # pragma: no cover
        region_part_in_domain = f"{aws_region}."
        region_param = f"region={aws_region}&"

    if is_us_gov_cloud:
        endpoint = f"{region_part_in_domain}console.amazonaws-us-gov.com"
    else:
        endpoint = f"{region_part_in_domain}console.aws.amazon.com"

    # An empty prefix means the bucket root: open the bucket's object listing.
    if not prefix:
        return f"https://{endpoint}/s3/buckets/{bucket}?{region_param}tab=objects"

    # Folders are browsed under "buckets", single objects under "object".
    s3_type = "buckets" if prefix.endswith("/") else "object"

    # Escape query-string metacharacters; "/" stays readable in the prefix.
    encoded_prefix = quote(prefix, safe="/")
    prefix_part = f"prefix={encoded_prefix}"

    if version_id is None:
        version_part = ""
    else:
        encoded_version = quote(version_id, safe="")
        version_part = f"&versionId={encoded_version}"

    return f"https://{endpoint}/s3/{s3_type}/{bucket}?{region_param}{prefix_part}{version_part}"
def make_s3_select_console_url(
    bucket: str,
    key: str,
    aws_region: T.Optional[str] = None,
    is_us_gov_cloud: bool = False,
) -> str:
    """
    Return the AWS Console url of the "select" page for an S3 object.

    The key is percent-encoded before it is put into the query string, so
    keys containing characters such as ``&``, ``#``, ``+`` or spaces still
    yield a well-formed url ("/" is kept as is).

    :param bucket: example, ``"my-bucket"``
    :param key: example, ``"my-folder/data.json"``
    :param aws_region: optional region; when given it is prepended to the
        console domain and added as a ``region`` query parameter.
    :param is_us_gov_cloud: if True, use the ``console.amazonaws-us-gov.com``
        domain instead of ``console.aws.amazon.com``.

    :return: the console url string.
    """
    # Local import keeps this block self-contained.
    from urllib.parse import quote

    if aws_region is None:
        region_part_in_domain = ""
        region_param = ""
    else:  # pragma: no cover
        region_part_in_domain = f"{aws_region}."
        region_param = f"region={aws_region}&"

    if is_us_gov_cloud:
        endpoint = f"{region_part_in_domain}console.amazonaws-us-gov.com"
    else:
        endpoint = f"{region_part_in_domain}console.aws.amazon.com"

    # Escape query-string metacharacters; "/" stays readable in the key.
    encoded_key = quote(key, safe="/")
    return (
        f"https://{endpoint}/s3/buckets/{bucket}/object/select"
        f"?{region_param}prefix={encoded_key}"
    )
[docs]
def ensure_s3_object(
    s3_key_or_uri: str,
) -> None:
    """
    Check that the string can denote an AWS S3 object, i.e. that it does
    not end with "/".

    :param s3_key_or_uri: an s3 key or s3 uri.

    :raises ValueError: if the string ends with "/".

    .. versionadded:: 1.0.1
    """
    looks_like_dir = s3_key_or_uri.endswith("/")
    if looks_like_dir:
        raise ValueError(f"'{s3_key_or_uri}' doesn't represent s3 object!")
[docs]
def ensure_s3_dir(s3_key_or_uri: str) -> None:
    """
    Check that the string can denote an AWS S3 directory, i.e. that it
    ends with "/".

    :param s3_key_or_uri: an s3 key or s3 uri.

    :raises ValueError: if the string does not end with "/".

    .. versionadded:: 1.0.1
    """
    if s3_key_or_uri.endswith("/"):
        return
    raise ValueError(f"'{s3_key_or_uri}' doesn't represent s3 dir!")
[docs]
def validate_s3_bucket(bucket):
    """
    Placeholder for S3 bucket name validation.

    No check is implemented yet: the argument is ignored and ``None`` is
    returned for every input.

    Ref:

    https://docs.aws.amazon.com/AmazonS3/latest/userguide/bucketnamingrules.html
    """
    return None
[docs]
def validate_s3_key(key):
    """
    Placeholder for S3 object key validation.

    No check is implemented yet: the argument is ignored and ``None`` is
    returned for every input.

    Ref:

    https://docs.aws.amazon.com/AmazonS3/latest/userguide/object-keys.html#object-key-guidelines
    """
    return None
# Maps a magnitude index (0, 1, 2, ...) to its unit label; the size helpers
# in this module treat index ``i`` as a factor of ``1024 ** i``.
MAGNITUDE_OF_DATA = dict(
    enumerate(("B", "KB", "MB", "GB", "TB", "PB", "EB", "ZB", "YB"))
)
[docs]
def repr_data_size(
    size_in_bytes: int,
    precision: int = 2,
) -> str:  # pragma: no cover
    """
    Return a human readable string representation of a file size.

    Units are powers of 1024. Sizes below 1024 are rendered as plain bytes
    without decimals. Sizes of 1024 YB and above are still expressed in YB,
    the largest unit available.

    For example:

    - 100 bytes => 100 B
    - 100,000 bytes => 97.66 KB
    - 100,000,000 bytes => 95.37 MB
    - 100,000,000,000 bytes => 93.13 GB
    - 100,000,000,000,000 bytes => 90.95 TB
    - 100,000,000,000,000,000 bytes => 88.82 PB
    - and more ...

    Magnitude of data::

        1024        KB
        1024 ** 2   MB
        1024 ** 3   GB
        1024 ** 4   TB
        1024 ** 5   PB
        1024 ** 6   EB
        1024 ** 7   ZB
        1024 ** 8   YB

    :param size_in_bytes: the size to render, in bytes.
    :param precision: number of digits after the decimal point.

    :return: a string such as ``"97.66 KB"``.

    .. versionadded:: 1.0.1
    """
    if size_in_bytes < 1024:
        return "%s B" % size_in_bytes

    # Find the largest unit that keeps the integral part below 1024, but
    # never go past the last known unit.
    largest_index = max(MAGNITUDE_OF_DATA)
    index = 0
    remaining = size_in_bytes
    while remaining >= 1024 and index < largest_index:
        remaining //= 1024
        index += 1

    # Divide the original size in one step so that no lower-order remainder
    # is dropped before rounding.
    value = size_in_bytes / (1024 ** index)
    return "{0:.{1}f} {2}".format(value, precision, MAGNITUDE_OF_DATA[index])
[docs]
def parse_data_size(s) -> int:  # pragma: no cover
    """
    Convert a human readable file size string into a number of bytes.

    The string must consist of a number followed by a unit from
    ``MAGNITUDE_OF_DATA`` (matched case-insensitively). The number may use
    ``_`` or ``,`` as thousands separators. Units are powers of 1024 and the
    result is truncated to an integer. Doesn't support size greater than
    1YB.

    Examples::

        >>> parse_data_size("3.43 MB")
        3596615
        >>> parse_data_size("2_512.4 MB")
        2634442342
        >>> parse_data_size("2,512.4 MB")
        2634442342

    :raises ValueError: if the numeric part cannot be converted to a float
        or the unit is not recognized.

    .. versionadded:: 1.0.5
    """
    text = s.strip()

    # Collect the leading run of number characters; ``boundary`` ends up at
    # the first character that is not part of the number.
    number_chars = set("0123456789_,.")
    collected = []
    boundary = 0
    for boundary, char in enumerate(text):
        if char not in number_chars:
            break
        collected.append(char)

    # Thousands separators are not understood by float(), drop them first.
    number = float("".join(collected).replace("_", "").replace(",", ""))

    wanted_unit = text[boundary:].strip().upper()
    exponent = next(
        (power for power, label in MAGNITUDE_OF_DATA.items() if label == wanted_unit),
        None,
    )
    if exponent is None:
        raise ValueError
    return int(number * 1024 ** exponent)
[docs]
def hash_binary(
    b: bytes,
    hash_meth: callable,
) -> str:  # pragma: no cover
    """
    Compute the hex digest of a bytes object with the given hash method.

    :param b: binary object
    :param hash_meth: callable hash method, example: hashlib.md5

    :return: hash value in hex digits.

    .. versionadded:: 1.0.1
    """
    hasher = hash_meth()
    hasher.update(b)
    digest = hasher.hexdigest()
    return digest
[docs]
def md5_binary(
    b: bytes,
) -> str:  # pragma: no cover
    """
    Compute the md5 hex digest of a bytes object.

    :param b: binary object

    :return: hash value in hex digits.

    .. versionadded:: 1.0.1
    """
    return hash_binary(b=b, hash_meth=hashlib.md5)
[docs]
def sha256_binary(
    b: bytes,
) -> str:  # pragma: no cover
    """
    Compute the sha256 hex digest of a bytes object.

    :param b: binary object

    :return: hash value in hex digits.

    .. versionadded:: 1.0.1
    """
    return hash_binary(b=b, hash_meth=hashlib.sha256)
# Default number of bytes hash_file() reads from the file per call.
# NOTE(review): this is only 64 bytes; confirm whether a larger chunk was intended.
DEFAULT_CHUNK_SIZE = 64
[docs]
def hash_file(
    abspath: str,
    hash_meth: callable,
    nbytes: int = 0,
    chunk_size: int = DEFAULT_CHUNK_SIZE,
) -> str:  # pragma: no cover
    """
    Get the hash of a file on local drive.

    The file is read in binary mode, ``chunk_size`` bytes at a time, so the
    whole content is never held in memory at once.

    :param abspath: absolute path of the file
    :param hash_meth: callable hash method, example: hashlib.md5
    :param nbytes: only hash first nbytes of the file; ``0`` (the default)
        hashes the entire content. If the file is shorter than ``nbytes``
        the whole file is hashed.
    :param chunk_size: internal option, stream chunk_size of the data for hash
        each time, avoid high memory usage.

    :return: hash value in hex digits.

    :raises ValueError: if ``nbytes`` is negative or ``chunk_size`` is
        smaller than 1.

    .. versionadded:: 1.0.1
    """
    if nbytes < 0:
        raise ValueError("nbytes cannot smaller than 0")
    if chunk_size < 1:
        raise ValueError("chunk_size cannot smaller than 1")

    m = hash_meth()
    with open(abspath, "rb") as f:
        if nbytes:  # use first n bytes
            remaining = nbytes
            while remaining > 0:
                # Never read past the requested prefix.
                data = f.read(min(chunk_size, remaining))
                if not data:  # reached end of file before nbytes were read
                    break
                m.update(data)
                remaining -= len(data)
        else:  # use entire content
            while True:
                data = f.read(chunk_size)
                if not data:
                    break
                m.update(data)
    return m.hexdigest()
[docs]
def grouper_list(
    l: T.Iterable,
    n: int,
) -> T.Iterable[list]:  # pragma: no cover
    """
    Lazily cut an iterable into consecutive lists of ``n`` items each. The
    last list is shorter when the items run out; it is never padded.

    Example::

        >>> list(grouper_list(range(10), n=3))
        [[0, 1, 2], [3, 4, 5], [6, 7, 8], [9]]

    :param l: an iterable object
    :param n: number of item per list

    .. versionadded:: 1.0.1
    """
    batch = []
    for element in l:
        batch.append(element)
        if len(batch) == n:
            yield batch
            # Start a fresh list so the one already yielded is not mutated.
            batch = []
    # Hand out whatever is left over as a final, shorter list.
    if batch:
        yield batch