# -*- coding: utf-8 -*-
"""
Read and write related API.
.. _bsm: https://github.com/aws-samples/boto-session-manager-project
.. _get_object: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#S3.Client.get_object
.. _put_object: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#S3.Client.put_object
.. _decode: https://docs.python.org/3/library/stdtypes.html#bytes.decode
.. _encode: https://docs.python.org/3/library/stdtypes.html#str.encode
"""
import typing as T
from datetime import datetime
from func_args import NOTHING, resolve_kwargs
from .. import exc
from ..metadata import warn_upper_case_in_metadata_key
from ..type import TagType, MetadataType
from ..tag import encode_url_query
from ..aws import context
from .resolve_s3_client import resolve_s3_client
if T.TYPE_CHECKING: # pragma: no cover
from .s3path import S3Path
from boto_session_manager import BotoSesManager
from mypy_boto3_s3 import S3Client
[docs]
class ReadAndWriteAPIMixin:
    """
    A mixin class that implements the Text / Bytes, Read / Write methods.

    The methods defined here annotate ``self`` as ``S3Path`` and rely on the
    host class to provide ``bucket``, ``key``, ``uri``, ``parents``, the
    ``_meta`` cache attribute, and the ``copy()``, ``exists()``,
    ``ensure_object()``, ``ensure_dir()`` and ``is_bucket()`` methods.
    """
[docs]
def read_bytes(
    self: "S3Path",
    version_id: str = NOTHING,
    if_match: str = NOTHING,
    if_modified_since: datetime = NOTHING,
    if_none_match: str = NOTHING,
    if_unmodified_since: datetime = NOTHING,
    range: str = NOTHING,
    response_cache_control: str = NOTHING,
    response_content_disposition: str = NOTHING,
    response_content_encoding: str = NOTHING,
    response_content_language: str = NOTHING,
    response_content_type: str = NOTHING,
    response_expires: str = NOTHING,
    sse_customer_algorithm: str = NOTHING,
    sse_customer_key: str = NOTHING,
    request_payer: str = NOTHING,
    part_number: int = NOTHING,
    expected_bucket_owner: str = NOTHING,
    checksum_mode: str = NOTHING,
    bsm: T.Optional[T.Union["BotoSesManager", "S3Client"]] = None,
) -> bytes:
    """
    Download the S3 object and return its content as bytes.

    A thin wrapper around get_object_. Apart from returning the payload,
    the call refreshes this ``S3Path`` object's cached metadata (``etag``,
    ``size``, ``version_id``, user metadata, etc.) with the get_object_
    response.

    Example::

        >>> s3path = S3Path.from_s3_uri("s3://my-bucket/my-file.txt")
        >>> s3path.write_bytes(b"hello", metadata={"creator": "me"})
        >>> s3path.read_bytes()
        b'hello'
        >>> s3path.size
        5
        >>> s3path.metadata
        {'creator': 'me'}

    :param version_id: See get_object_.
    :param if_match: See get_object_.
    :param if_modified_since: See get_object_.
    :param if_none_match: See get_object_.
    :param if_unmodified_since: See get_object_.
    :param range: See get_object_.
    :param response_cache_control: See get_object_.
    :param response_content_disposition: See get_object_.
    :param response_content_encoding: See get_object_.
    :param response_content_language: See get_object_.
    :param response_content_type: See get_object_.
    :param response_expires: See get_object_.
    :param sse_customer_algorithm: See get_object_.
    :param sse_customer_key: See get_object_.
    :param request_payer: See get_object_.
    :param part_number: See get_object_.
    :param expected_bucket_owner: See get_object_.
    :param checksum_mode: See get_object_.
    :param bsm: See bsm_.

    :return: the binary data.

    .. versionadded:: 1.0.3

    .. versionchanged:: 1.1.2

        automatically store metadata in cache.

    .. versionchanged:: 2.0.1

        add ``version_id`` parameter, and now support full list of
        get_object_ arguments.
    """
    s3_client = resolve_s3_client(context, bsm)
    # ``resolve_kwargs`` filters out every argument still set to the
    # ``NOTHING`` sentinel, so only explicitly given options reach boto3.
    request = resolve_kwargs(
        Bucket=self.bucket,
        Key=self.key,
        VersionId=version_id,
        IfMatch=if_match,
        IfModifiedSince=if_modified_since,
        IfNoneMatch=if_none_match,
        IfUnmodifiedSince=if_unmodified_since,
        Range=range,
        ResponseCacheControl=response_cache_control,
        ResponseContentDisposition=response_content_disposition,
        ResponseContentEncoding=response_content_encoding,
        ResponseContentLanguage=response_content_language,
        ResponseContentType=response_content_type,
        ResponseExpires=response_expires,
        SSECustomerAlgorithm=sse_customer_algorithm,
        SSECustomerKey=sse_customer_key,
        RequestPayer=request_payer,
        PartNumber=part_number,
        ExpectedBucketOwner=expected_bucket_owner,
        ChecksumMode=checksum_mode,
    )
    response = s3_client.get_object(**request)
    # Take the streaming body out of the response before caching it: what is
    # left is plain object metadata.
    payload = response.pop("Body").read()
    response.pop("ResponseMetadata")
    self._meta = response
    return payload
[docs]
def read_text(
    self: "S3Path",
    encoding="utf-8",
    errors="strict",
    version_id: str = NOTHING,
    if_match: str = NOTHING,
    if_modified_since: datetime = NOTHING,
    if_none_match: str = NOTHING,
    if_unmodified_since: datetime = NOTHING,
    range: str = NOTHING,
    response_cache_control: str = NOTHING,
    response_content_disposition: str = NOTHING,
    response_content_encoding: str = NOTHING,
    response_content_language: str = NOTHING,
    response_content_type: str = NOTHING,
    response_expires: str = NOTHING,
    sse_customer_algorithm: str = NOTHING,
    sse_customer_key: str = NOTHING,
    request_payer: str = NOTHING,
    part_number: int = NOTHING,
    expected_bucket_owner: str = NOTHING,
    checksum_mode: str = NOTHING,
    bsm: T.Optional[T.Union["BotoSesManager", "S3Client"]] = None,
) -> str:
    """
    Download the S3 object and return its content decoded as text.

    The bytes are fetched with :meth:`read_bytes` (a wrapper around
    get_object_) and then decoded, so this ``S3Path`` object's cached
    metadata (``etag``, ``size``, ``version_id``, etc.) is refreshed from the
    get_object_ response as well.

    Example::

        >>> s3path = S3Path("s3://my-bucket/my-file.txt")
        >>> s3path.write_text("hello", metadata={"creator": "me"})
        >>> s3path.read_text()
        'hello'
        >>> s3path.size
        5
        >>> s3path.metadata
        {'creator': 'me'}

    :param encoding: See decode_.
    :param errors: See decode_.
    :param version_id: See get_object_.
    :param if_match: See get_object_.
    :param if_modified_since: See get_object_.
    :param if_none_match: See get_object_.
    :param if_unmodified_since: See get_object_.
    :param range: See get_object_.
    :param response_cache_control: See get_object_.
    :param response_content_disposition: See get_object_.
    :param response_content_encoding: See get_object_.
    :param response_content_language: See get_object_.
    :param response_content_type: See get_object_.
    :param response_expires: See get_object_.
    :param sse_customer_algorithm: See get_object_.
    :param sse_customer_key: See get_object_.
    :param request_payer: See get_object_.
    :param part_number: See get_object_.
    :param expected_bucket_owner: See get_object_.
    :param checksum_mode: See get_object_.
    :param bsm: See bsm_.

    :return: the string data.

    .. versionadded:: 1.0.3

    .. versionchanged:: 1.1.2

        automatically store metadata in cache.

    .. versionchanged:: 2.0.1

        add ``version_id`` parameter, and now support full list of
        get_object_ arguments.
    """
    # Everything except the two decoding options is forwarded untouched.
    get_object_options = dict(
        version_id=version_id,
        if_match=if_match,
        if_modified_since=if_modified_since,
        if_none_match=if_none_match,
        if_unmodified_since=if_unmodified_since,
        range=range,
        response_cache_control=response_cache_control,
        response_content_disposition=response_content_disposition,
        response_content_encoding=response_content_encoding,
        response_content_language=response_content_language,
        response_content_type=response_content_type,
        response_expires=response_expires,
        sse_customer_algorithm=sse_customer_algorithm,
        sse_customer_key=sse_customer_key,
        request_payer=request_payer,
        part_number=part_number,
        expected_bucket_owner=expected_bucket_owner,
        checksum_mode=checksum_mode,
        bsm=bsm,
    )
    raw = self.read_bytes(**get_object_options)
    return raw.decode(encoding, errors)
[docs]
def write_bytes(
    self: "S3Path",
    data: bytes,
    metadata: MetadataType = NOTHING,
    tags: TagType = NOTHING,
    acl: str = NOTHING,
    cache_control: str = NOTHING,
    content_disposition: str = NOTHING,
    content_encoding: str = NOTHING,
    content_language: str = NOTHING,
    content_length: int = NOTHING,
    content_md5: str = NOTHING,
    content_type: str = NOTHING,
    checksum_algorithm: str = NOTHING,
    checksum_crc32: str = NOTHING,
    checksum_crc32c: str = NOTHING,
    checksum_sha1: str = NOTHING,
    checksum_sha256: str = NOTHING,
    expires_datetime: datetime = NOTHING,
    grant_full_control: str = NOTHING,
    grant_read: str = NOTHING,
    grant_read_acp: str = NOTHING,
    grant_write_acp: str = NOTHING,
    server_side_encryption: str = NOTHING,
    storage_class: str = NOTHING,
    website_redirect_location: str = NOTHING,
    sse_customer_algorithm: str = NOTHING,
    sse_customer_key: str = NOTHING,
    sse_kms_key_id: str = NOTHING,
    sse_kms_encryption_context: str = NOTHING,
    bucket_key_enabled: bool = NOTHING,
    request_payer: str = NOTHING,
    object_lock_mode: str = NOTHING,
    object_lock_retain_until_datetime: datetime = NOTHING,
    object_lock_legal_hold_status: str = NOTHING,
    expected_bucket_owner: str = NOTHING,
    bsm: T.Optional[T.Union["BotoSesManager", "S3Client"]] = None,
) -> "S3Path":
    """
    Upload binary data to the S3 object.

    A thin wrapper around put_object_. This ``S3Path`` object itself is not
    modified; the put_object_ response, enriched with the content length and
    the given metadata, is attached to the returned copy.

    Example::

        >>> s3path = S3Path("s3://my-bucket/my-file.txt")
        >>> s3path = s3path.write_bytes(b"hello", metadata={"creator": "me"})
        >>> s3path.size
        5
        >>> s3path.metadata
        {'creator': 'me'}

    :param data: the binary data you want to write.
    :param metadata: the s3 object metadata in string key value pair dict.
    :param tags: the s3 object tags in string key value pair dict.
    :param acl: See put_object_.
    :param cache_control: See put_object_.
    :param content_disposition: See put_object_.
    :param content_encoding: See put_object_.
    :param content_language: See put_object_.
    :param content_length: See put_object_.
    :param content_md5: See put_object_.
    :param content_type: See put_object_.
    :param checksum_algorithm: See put_object_.
    :param checksum_crc32: See put_object_.
    :param checksum_crc32c: See put_object_.
    :param checksum_sha1: See put_object_.
    :param checksum_sha256: See put_object_.
    :param expires_datetime: See put_object_.
    :param grant_full_control: See put_object_.
    :param grant_read: See put_object_.
    :param grant_read_acp: See put_object_.
    :param grant_write_acp: See put_object_.
    :param server_side_encryption: See put_object_.
    :param storage_class: See put_object_.
    :param website_redirect_location: See put_object_.
    :param sse_customer_algorithm: See put_object_.
    :param sse_customer_key: See put_object_.
    :param sse_kms_key_id: See put_object_.
    :param sse_kms_encryption_context: See put_object_.
    :param bucket_key_enabled: See put_object_.
    :param request_payer: See put_object_.
    :param object_lock_mode: See put_object_.
    :param object_lock_retain_until_datetime: See put_object_.
    :param object_lock_legal_hold_status: See put_object_.
    :param expected_bucket_owner: See put_object_.
    :param bsm: See bsm_.

    :return: A new :class:`~s3pathlib.core.s3path.S3Path` object with the
        same bucket and key, but the new metadata representing the object
        you just put.

    .. versionadded:: 1.0.3

    .. versionchanged:: 1.1.1

        add ``metadata`` and ``tags`` parameters.
    """
    s3_client = resolve_s3_client(context, bsm)
    metadata_given = metadata is not NOTHING
    if metadata_given:
        warn_upper_case_in_metadata_key(metadata)
    # ``resolve_kwargs`` filters out every argument still set to the
    # ``NOTHING`` sentinel; tags are sent URL-query encoded.
    request = resolve_kwargs(
        Bucket=self.bucket,
        Key=self.key,
        Body=data,
        Metadata=metadata,
        Tagging=tags if tags is NOTHING else encode_url_query(tags),
        ACL=acl,
        CacheControl=cache_control,
        ContentDisposition=content_disposition,
        ContentEncoding=content_encoding,
        ContentLanguage=content_language,
        ContentLength=content_length,
        ContentMD5=content_md5,
        ContentType=content_type,
        ChecksumAlgorithm=checksum_algorithm,
        ChecksumCRC32=checksum_crc32,
        ChecksumCRC32C=checksum_crc32c,
        ChecksumSHA1=checksum_sha1,
        ChecksumSHA256=checksum_sha256,
        Expires=expires_datetime,
        GrantFullControl=grant_full_control,
        GrantRead=grant_read,
        GrantReadACP=grant_read_acp,
        GrantWriteACP=grant_write_acp,
        ServerSideEncryption=server_side_encryption,
        StorageClass=storage_class,
        WebsiteRedirectLocation=website_redirect_location,
        SSECustomerAlgorithm=sse_customer_algorithm,
        SSECustomerKey=sse_customer_key,
        SSEKMSKeyId=sse_kms_key_id,
        SSEKMSEncryptionContext=sse_kms_encryption_context,
        BucketKeyEnabled=bucket_key_enabled,
        RequestPayer=request_payer,
        ObjectLockMode=object_lock_mode,
        ObjectLockRetainUntilDate=object_lock_retain_until_datetime,
        ObjectLockLegalHoldStatus=object_lock_legal_hold_status,
        ExpectedBucketOwner=expected_bucket_owner,
    )
    response = s3_client.put_object(**request)
    response.pop("ResponseMetadata")
    # Record the size and metadata from the values just uploaded so the
    # cached response describes the new object.
    response["ContentLength"] = len(data)
    if metadata_given:
        response["Metadata"] = metadata
    written = self.copy()
    written._meta = response
    return written
[docs]
def write_text(
    self: "S3Path",
    data: str,
    encoding: str = "utf-8",
    errors: str = "strict",
    metadata: MetadataType = NOTHING,
    tags: TagType = NOTHING,
    acl: str = NOTHING,
    cache_control: str = NOTHING,
    content_disposition: str = NOTHING,
    content_encoding: str = NOTHING,
    content_language: str = NOTHING,
    content_length: int = NOTHING,
    content_md5: str = NOTHING,
    content_type: str = NOTHING,
    checksum_algorithm: str = NOTHING,
    checksum_crc32: str = NOTHING,
    checksum_crc32c: str = NOTHING,
    checksum_sha1: str = NOTHING,
    checksum_sha256: str = NOTHING,
    expires_datetime: datetime = NOTHING,
    grant_full_control: str = NOTHING,
    grant_read: str = NOTHING,
    grant_read_acp: str = NOTHING,
    grant_write_acp: str = NOTHING,
    server_side_encryption: str = NOTHING,
    storage_class: str = NOTHING,
    website_redirect_location: str = NOTHING,
    sse_customer_algorithm: str = NOTHING,
    sse_customer_key: str = NOTHING,
    sse_kms_key_id: str = NOTHING,
    sse_kms_encryption_context: str = NOTHING,
    bucket_key_enabled: bool = NOTHING,
    request_payer: str = NOTHING,
    object_lock_mode: str = NOTHING,
    object_lock_retain_until_datetime: datetime = NOTHING,
    object_lock_legal_hold_status: str = NOTHING,
    expected_bucket_owner: str = NOTHING,
    bsm: T.Optional[T.Union["BotoSesManager", "S3Client"]] = None,
) -> "S3Path":
    """
    Encode text and upload it to the S3 object.

    The text is encoded with ``encoding`` / ``errors`` and handed to
    :meth:`write_bytes`, which is a wrapper around put_object_.

    Example::

        >>> s3path = S3Path("s3://my-bucket/my-file.txt")
        >>> s3path = s3path.write_text("hello", metadata={"creator": "me"})
        >>> s3path.size
        5
        >>> s3path.metadata
        {'creator': 'me'}

    :param data: the text you want to write.
    :param encoding: See encode_.
    :param errors: See encode_.
    :param metadata: the s3 object metadata in string key value pair dict.
    :param tags: the s3 object tags in string key value pair dict.
    :param acl: See put_object_.
    :param cache_control: See put_object_.
    :param content_disposition: See put_object_.
    :param content_encoding: See put_object_.
    :param content_language: See put_object_.
    :param content_length: See put_object_.
    :param content_md5: See put_object_.
    :param content_type: See put_object_.
    :param checksum_algorithm: See put_object_.
    :param checksum_crc32: See put_object_.
    :param checksum_crc32c: See put_object_.
    :param checksum_sha1: See put_object_.
    :param checksum_sha256: See put_object_.
    :param expires_datetime: See put_object_.
    :param grant_full_control: See put_object_.
    :param grant_read: See put_object_.
    :param grant_read_acp: See put_object_.
    :param grant_write_acp: See put_object_.
    :param server_side_encryption: See put_object_.
    :param storage_class: See put_object_.
    :param website_redirect_location: See put_object_.
    :param sse_customer_algorithm: See put_object_.
    :param sse_customer_key: See put_object_.
    :param sse_kms_key_id: See put_object_.
    :param sse_kms_encryption_context: See put_object_.
    :param bucket_key_enabled: See put_object_.
    :param request_payer: See put_object_.
    :param object_lock_mode: See put_object_.
    :param object_lock_retain_until_datetime: See put_object_.
    :param object_lock_legal_hold_status: See put_object_.
    :param expected_bucket_owner: See put_object_.
    :param bsm: See bsm_.

    :return: A new :class:`~s3pathlib.core.s3path.S3Path` object with the
        same bucket and key, but the new metadata representing the object
        you just put.

    .. versionadded:: 1.0.3

    .. versionchanged:: 1.1.1

        add ``metadata`` and ``tags`` parameters.
    """
    encoded = data.encode(encoding, errors)
    # Everything except the two encoding options is forwarded untouched.
    put_object_options = dict(
        metadata=metadata,
        tags=tags,
        acl=acl,
        cache_control=cache_control,
        content_disposition=content_disposition,
        content_encoding=content_encoding,
        content_language=content_language,
        content_length=content_length,
        content_md5=content_md5,
        content_type=content_type,
        checksum_algorithm=checksum_algorithm,
        checksum_crc32=checksum_crc32,
        checksum_crc32c=checksum_crc32c,
        checksum_sha1=checksum_sha1,
        checksum_sha256=checksum_sha256,
        expires_datetime=expires_datetime,
        grant_full_control=grant_full_control,
        grant_read=grant_read,
        grant_read_acp=grant_read_acp,
        grant_write_acp=grant_write_acp,
        server_side_encryption=server_side_encryption,
        storage_class=storage_class,
        website_redirect_location=website_redirect_location,
        sse_customer_algorithm=sse_customer_algorithm,
        sse_customer_key=sse_customer_key,
        sse_kms_key_id=sse_kms_key_id,
        sse_kms_encryption_context=sse_kms_encryption_context,
        bucket_key_enabled=bucket_key_enabled,
        request_payer=request_payer,
        object_lock_mode=object_lock_mode,
        object_lock_retain_until_datetime=object_lock_retain_until_datetime,
        object_lock_legal_hold_status=object_lock_legal_hold_status,
        expected_bucket_owner=expected_bucket_owner,
        bsm=bsm,
    )
    return self.write_bytes(encoded, **put_object_options)
[docs]
def touch(
    self: "S3Path",
    exist_ok: bool = False,
    metadata: MetadataType = NOTHING,
    tags: TagType = NOTHING,
    acl: str = NOTHING,
    cache_control: str = NOTHING,
    content_disposition: str = NOTHING,
    content_encoding: str = NOTHING,
    content_language: str = NOTHING,
    content_length: int = NOTHING,
    content_md5: str = NOTHING,
    content_type: str = NOTHING,
    checksum_algorithm: str = NOTHING,
    checksum_crc32: str = NOTHING,
    checksum_crc32c: str = NOTHING,
    checksum_sha1: str = NOTHING,
    checksum_sha256: str = NOTHING,
    expires_datetime: datetime = NOTHING,
    grant_full_control: str = NOTHING,
    grant_read: str = NOTHING,
    grant_read_acp: str = NOTHING,
    grant_write_acp: str = NOTHING,
    server_side_encryption: str = NOTHING,
    storage_class: str = NOTHING,
    website_redirect_location: str = NOTHING,
    sse_customer_algorithm: str = NOTHING,
    sse_customer_key: str = NOTHING,
    sse_kms_key_id: str = NOTHING,
    sse_kms_encryption_context: str = NOTHING,
    bucket_key_enabled: bool = NOTHING,
    request_payer: str = NOTHING,
    object_lock_mode: str = NOTHING,
    object_lock_retain_until_datetime: datetime = NOTHING,
    object_lock_legal_hold_status: str = NOTHING,
    expected_bucket_owner: str = NOTHING,
    bsm: T.Optional[T.Union["BotoSesManager", "S3Client"]] = None,
) -> None:
    """
    Create an empty S3 object at the S3 location if the S3 object not exists.

    An object that already exists is never modified: with ``exist_ok=True``
    the call does nothing, otherwise it raises.

    Example::

        >>> s3path = S3Path("s3://my-bucket/my-file.txt")
        >>> s3path.touch()
        >>> s3path.exists()
        True
        >>> s3path.touch(exist_ok=True)  # already there, left untouched

    :param exist_ok: if True, it won't raise error when the S3 object
        already exists.
    :param metadata: the s3 object metadata in string key value pair dict.
    :param tags: the s3 object tags in string key value pair dict.
    :param acl: See put_object_.
    :param cache_control: See put_object_.
    :param content_disposition: See put_object_.
    :param content_encoding: See put_object_.
    :param content_language: See put_object_.
    :param content_length: See put_object_.
    :param content_md5: See put_object_.
    :param content_type: See put_object_.
    :param checksum_algorithm: See put_object_.
    :param checksum_crc32: See put_object_.
    :param checksum_crc32c: See put_object_.
    :param checksum_sha1: See put_object_.
    :param checksum_sha256: See put_object_.
    :param expires_datetime: See put_object_.
    :param grant_full_control: See put_object_.
    :param grant_read: See put_object_.
    :param grant_read_acp: See put_object_.
    :param grant_write_acp: See put_object_.
    :param server_side_encryption: See put_object_.
    :param storage_class: See put_object_.
    :param website_redirect_location: See put_object_.
    :param sse_customer_algorithm: See put_object_.
    :param sse_customer_key: See put_object_.
    :param sse_kms_key_id: See put_object_.
    :param sse_kms_encryption_context: See put_object_.
    :param bucket_key_enabled: See put_object_.
    :param request_payer: See put_object_.
    :param object_lock_mode: See put_object_.
    :param object_lock_retain_until_datetime: See put_object_.
    :param object_lock_legal_hold_status: See put_object_.
    :param expected_bucket_owner: See put_object_.
    :param bsm: See bsm_.

    :raises S3FileAlreadyExist: if the S3 object already exists and
        ``exist_ok`` is False.

    .. versionadded:: 1.0.6

    .. versionchanged:: 1.2.1

        add ``metadata`` and ``tags`` parameters.
    """
    # NOTE(review): presumably rejects paths that are not object paths --
    # confirm against ``ensure_object``.
    self.ensure_object()

    # Always check for existence first. Writing unconditionally when
    # ``exist_ok`` is True would replace an existing object's content with an
    # empty body, which contradicts the "do nothing if already exists"
    # contract.
    if self.exists(bsm=bsm) is False:
        self.write_text(
            data="",
            metadata=metadata,
            tags=tags,
            acl=acl,
            cache_control=cache_control,
            content_disposition=content_disposition,
            content_encoding=content_encoding,
            content_language=content_language,
            content_length=content_length,
            content_md5=content_md5,
            content_type=content_type,
            checksum_algorithm=checksum_algorithm,
            checksum_crc32=checksum_crc32,
            checksum_crc32c=checksum_crc32c,
            checksum_sha1=checksum_sha1,
            checksum_sha256=checksum_sha256,
            expires_datetime=expires_datetime,
            grant_full_control=grant_full_control,
            grant_read=grant_read,
            grant_read_acp=grant_read_acp,
            grant_write_acp=grant_write_acp,
            server_side_encryption=server_side_encryption,
            storage_class=storage_class,
            website_redirect_location=website_redirect_location,
            sse_customer_algorithm=sse_customer_algorithm,
            sse_customer_key=sse_customer_key,
            sse_kms_key_id=sse_kms_key_id,
            sse_kms_encryption_context=sse_kms_encryption_context,
            bucket_key_enabled=bucket_key_enabled,
            request_payer=request_payer,
            object_lock_mode=object_lock_mode,
            object_lock_retain_until_datetime=object_lock_retain_until_datetime,
            object_lock_legal_hold_status=object_lock_legal_hold_status,
            expected_bucket_owner=expected_bucket_owner,
            bsm=bsm,
        )
    elif not exist_ok:
        raise exc.S3FileAlreadyExist.make(self.uri)
[docs]
def mkdir(
    self: "S3Path",
    exist_ok: bool = False,
    parents: bool = False,
    bsm: T.Optional[T.Union["BotoSesManager", "S3Client"]] = None,
):
    """
    Make an S3 folder (empty "/" file).

    Example::

        >>> s3dir = S3Path("s3://my-bucket/my-folder/").to_dir()
        >>> s3dir.mkdir(exist_ok=True)

    :param exist_ok: If True, it won't raise error when the S3 folder
        already exists.
    :param parents: If True, all parent folders will be created.
    :param bsm: See bsm_.

    :raises S3FolderAlreadyExist: if the S3 folder already exists and
        ``exist_ok`` is False.

    .. versionadded:: 1.0.6
    """
    self.ensure_dir()

    # The existence check is only made when the caller asked for strict
    # creation; with ``exist_ok`` the empty folder object is always written.
    if not exist_ok and self.exists(bsm=bsm) is not False:
        raise exc.S3FolderAlreadyExist.make(self.uri)
    self.write_text("", bsm=bsm)

    if not parents:
        return
    for ancestor in self.parents:
        # A bucket is not a folder object, so it is skipped.
        if ancestor.is_bucket() is False:
            ancestor.mkdir(exist_ok=True, parents=False, bsm=bsm)