# Source code for s3pathlib.core.opener

# -*- coding: utf-8 -*-

"""
Smart open library integration.

.. _bsm: https://github.com/aws-samples/boto-session-manager-project
.. _smart_open: https://github.com/RaRe-Technologies/smart_open
.. _get_object: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#S3.Client.get_object
.. _put_object: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#S3.Client.put_object
"""

import typing as T
import copy
from datetime import datetime

from func_args import NOTHING, resolve_kwargs

from ..metadata import warn_upper_case_in_metadata_key
from ..aws import context
from ..compat import smart_open, compat
from ..type import MetadataType, TagType
from ..tag import encode_url_query

from .resolve_s3_client import resolve_s3_client

if T.TYPE_CHECKING:  # pragma: no cover
    from .s3path import S3Path
    from boto_session_manager import BotoSesManager
    from mypy_boto3_s3 import S3Client


class OpenerAPIMixin:
    """
    A mixin class that implements the file-object protocol.
    """

    def open(
        self: "S3Path",
        mode: T.Optional[str] = "r",
        version_id: T.Optional[str] = NOTHING,
        buffering: T.Optional[int] = -1,
        encoding: T.Optional[str] = None,
        errors: T.Optional[str] = None,
        newline: T.Optional[str] = None,
        closefd=True,
        opener=None,
        ignore_ext: bool = False,
        compression: T.Optional[str] = None,
        multipart_upload: bool = True,
        metadata: T.Optional[MetadataType] = NOTHING,
        tags: T.Optional[TagType] = NOTHING,
        # write related parameters
        acl: str = NOTHING,
        cache_control: str = NOTHING,
        content_disposition: str = NOTHING,
        content_encoding: str = NOTHING,
        content_language: str = NOTHING,
        content_length: int = NOTHING,
        content_md5: str = NOTHING,
        content_type: str = NOTHING,
        checksum_algorithm: str = NOTHING,
        # put_object exclusive parameters
        checksum_crc32: str = NOTHING,
        checksum_crc32c: str = NOTHING,
        checksum_sha1: str = NOTHING,
        checksum_sha256: str = NOTHING,
        # write related parameters
        expires_datetime: datetime = NOTHING,
        grant_full_control: str = NOTHING,
        grant_read: str = NOTHING,
        grant_read_acp: str = NOTHING,
        grant_write_acp: str = NOTHING,
        server_side_encryption: str = NOTHING,
        storage_class: str = NOTHING,
        website_redirect_location: str = NOTHING,
        # common read / write related parameters
        sse_customer_algorithm: str = NOTHING,
        sse_customer_key: str = NOTHING,
        # write related parameters
        sse_kms_key_id: str = NOTHING,
        sse_kms_encryption_context: str = NOTHING,
        bucket_key_enabled: bool = NOTHING,
        # common read / write related parameters
        request_payer: str = NOTHING,
        # write related parameters
        object_lock_mode: str = NOTHING,
        object_lock_retain_until_datetime: datetime = NOTHING,
        object_lock_legal_hold_status: str = NOTHING,
        # common read / write related parameters
        expected_bucket_owner: str = NOTHING,
        # read related parameters
        if_match: str = NOTHING,
        if_modified_since: datetime = NOTHING,
        if_none_match: str = NOTHING,
        if_unmodified_since: datetime = NOTHING,
        range: str = NOTHING,
        response_cache_control: str = NOTHING,
        response_content_disposition: str = NOTHING,
        response_content_encoding: str = NOTHING,
        response_content_language: str = NOTHING,
        response_content_type: str = NOTHING,
        response_expires: str = NOTHING,
        part_number: int = NOTHING,
        checksum_mode: str = NOTHING,
        # other parameters
        transport_params: T.Optional[dict] = None,
        bsm: T.Optional[T.Union["BotoSesManager", "S3Client"]] = None,
    ):
        """
        Open the S3 object as a file-like object, powered by smart_open_.

        Example::

            >>> import json
            >>> with S3Path("s3://bucket/data.json").open("w") as f:
            ...     json.dump({"a": 1}, f)
            >>> with S3Path("s3://bucket/data.json").open("r") as f:
            ...     data = json.load(f)

        Write-related parameters are only used in write mode, and read-related
        parameters only in read mode.

        In write mode with ``multipart_upload=True``, the arguments are sent to
        ``create_multipart_upload``. In that case the put_object-exclusive
        parameters are ignored: ``content_length``, ``content_md5``,
        ``checksum_crc32``, ``checksum_crc32c``, ``checksum_sha1`` and
        ``checksum_sha256``. With ``multipart_upload=False`` they are sent to
        put_object_.

        :param mode: "r", "w", "rb", "wb".
        :param version_id: optional version id to read from; ignored in write
            mode.
        :param buffering: See smart_open_.
        :param encoding: See smart_open_.
        :param errors: See smart_open_.
        :param newline: See smart_open_.
        :param closefd: See smart_open_.
        :param opener: See smart_open_.
        :param ignore_ext: See smart_open_. smart_open >= 6 no longer accepts
            this argument. There, ``ignore_ext=True`` is translated to
            ``compression="disable"`` unless ``compression`` is given
            explicitly.
        :param compression: compression setting forwarded to smart_open_
            (smart_open >= 5 only), e.g. ``".gz"`` or ``"disable"``.
        :param multipart_upload: whether to use multipart upload when writing;
            defaults to True.
        :param metadata: user-defined metadata to attach to the written object.
        :param tags: tags to attach to the written object.
        :param acl: See put_object_.
        :param cache_control: See put_object_.
        :param content_disposition: See put_object_.
        :param content_encoding: See put_object_.
        :param content_language: See put_object_.
        :param content_length: See put_object_.
        :param content_md5: See put_object_.
        :param content_type: See put_object_.
        :param checksum_algorithm: See put_object_.
        :param checksum_crc32: See put_object_.
        :param checksum_crc32c: See put_object_.
        :param checksum_sha1: See put_object_.
        :param checksum_sha256: See put_object_.
        :param expires_datetime: See put_object_.
        :param grant_full_control: See put_object_.
        :param grant_read: See put_object_.
        :param grant_read_acp: See put_object_.
        :param grant_write_acp: See put_object_.
        :param server_side_encryption: See put_object_.
        :param storage_class: See put_object_.
        :param website_redirect_location: See put_object_.
        :param sse_customer_algorithm: See put_object_ or get_object_.
        :param sse_customer_key: See put_object_ or get_object_.
        :param sse_kms_key_id: See put_object_.
        :param sse_kms_encryption_context: See put_object_.
        :param bucket_key_enabled: See put_object_.
        :param request_payer: See put_object_ or get_object_.
        :param object_lock_mode: See put_object_.
        :param object_lock_retain_until_datetime: See put_object_.
        :param object_lock_legal_hold_status: See put_object_.
        :param expected_bucket_owner: See put_object_ or get_object_.
        :param if_match: See get_object_.
        :param if_modified_since: See get_object_.
        :param if_none_match: See get_object_.
        :param if_unmodified_since: See get_object_.
        :param range: See get_object_.
        :param response_cache_control: See get_object_.
        :param response_content_disposition: See get_object_.
        :param response_content_encoding: See get_object_.
        :param response_content_language: See get_object_.
        :param response_content_type: See get_object_.
        :param response_expires: See get_object_.
        :param part_number: See get_object_.
        :param checksum_mode: See get_object_.
        :param transport_params: extra smart_open_ transport parameters.
            The dict is copied, never mutated. Its ``client`` and
            ``multipart_upload`` keys are always overridden. Any
            ``client_kwargs`` it contains are merged with (and overridden by)
            the S3 arguments above.
        :param bsm: See bsm_.

        :return: a file-like object that has ``read()`` and ``write()``
            methods. See smart_open_ for more info. Also see
            https://github.com/RaRe-Technologies/smart_open/blob/develop/howto.md#how-to-access-s3-anonymously
            for S3 related info.

        :raises ValueError: if ``mode`` starts with neither "r" nor "w".

        .. versionadded:: 1.0.1

        .. versionchanged:: 1.2.1

            add ``metadata`` and ``tags`` parameters

        .. versionchanged:: 2.0.1

            add ``version_id`` parameter

        .. versionchanged:: 2.1.1

            add full list of get_object, put_object, create_multipart_upload
            arguments
        """
        s3_client = resolve_s3_client(context, bsm)

        # Work on copies so the caller's ``transport_params`` (including its
        # nested ``client_kwargs``) is never mutated by this method.
        if transport_params is None:
            transport_params = dict()
        else:
            transport_params = transport_params.copy()
            if "client_kwargs" in transport_params:
                transport_params["client_kwargs"] = copy.deepcopy(
                    transport_params["client_kwargs"]
                )
        transport_params["client"] = s3_client
        transport_params["multipart_upload"] = multipart_upload

        if mode.startswith("w"):  # pragma: no cover
            # The write API doesn't take a version_id; drop it in case the
            # caller passed one by mistake.
            version_id = NOTHING
            if metadata is not NOTHING:
                warn_upper_case_in_metadata_key(metadata)
            is_write_mode = True
        elif mode.startswith("r"):  # pragma: no cover
            is_write_mode = False
        else:  # pragma: no cover
            raise ValueError("mode must be one of 'r', 'w', 'rb', 'wb'")

        if version_id is not NOTHING:
            transport_params["version_id"] = version_id

        # ``transport_params`` is stored by reference, so the client_kwargs
        # assignment further below is still seen by smart_open.
        open_kwargs = dict(
            uri=self.uri,
            mode=mode,
            buffering=buffering,
            encoding=encoding,
            errors=errors,
            newline=newline,
            closefd=closefd,
            opener=opener,
            transport_params=transport_params,
        )
        if compat.smart_open_version_major < 6:  # pragma: no cover
            open_kwargs["ignore_ext"] = ignore_ext
        elif ignore_ext and compression is None:  # pragma: no cover
            # smart_open 6 removed ``ignore_ext``; ``compression="disable"``
            # is its replacement. Without this the flag is silently dropped.
            compression = "disable"
        # NOTE(review): smart_open introduced ``compression`` in 5.1; this
        # check (like the original) only looks at the major version.
        if compat.smart_open_version_major >= 5:  # pragma: no cover
            if compression is not None:
                open_kwargs["compression"] = compression

        existing_client_kwargs: T.Dict[str, T.Dict[str, T.Any]]
        existing_client_kwargs = transport_params.get("client_kwargs", {})

        if is_write_mode:
            # Always collect the write arguments. Previously they were only
            # applied when ``metadata`` or ``tags`` was also given, so e.g.
            # ``open("w", content_type=...)`` was silently ignored.
            s3_client_kwargs = resolve_kwargs(
                Metadata=metadata,
                Tagging=tags if tags is NOTHING else encode_url_query(tags),
            )
            if multipart_upload:
                key_name = "S3.Client.create_multipart_upload"
                s3_client_kwargs.update(
                    resolve_kwargs(
                        ACL=acl,
                        CacheControl=cache_control,
                        ContentDisposition=content_disposition,
                        ContentEncoding=content_encoding,
                        ContentLanguage=content_language,
                        ContentType=content_type,
                        Expires=expires_datetime,
                        GrantFullControl=grant_full_control,
                        GrantRead=grant_read,
                        GrantReadACP=grant_read_acp,
                        GrantWriteACP=grant_write_acp,
                        ServerSideEncryption=server_side_encryption,
                        StorageClass=storage_class,
                        WebsiteRedirectLocation=website_redirect_location,
                        SSECustomerAlgorithm=sse_customer_algorithm,
                        SSECustomerKey=sse_customer_key,
                        SSEKMSKeyId=sse_kms_key_id,
                        SSEKMSEncryptionContext=sse_kms_encryption_context,
                        BucketKeyEnabled=bucket_key_enabled,
                        RequestPayer=request_payer,
                        ObjectLockMode=object_lock_mode,
                        ObjectLockRetainUntilDate=object_lock_retain_until_datetime,
                        ObjectLockLegalHoldStatus=object_lock_legal_hold_status,
                        ExpectedBucketOwner=expected_bucket_owner,
                        ChecksumAlgorithm=checksum_algorithm,
                    )
                )
            else:
                key_name = "S3.Client.put_object"
                s3_client_kwargs.update(
                    resolve_kwargs(
                        ACL=acl,
                        CacheControl=cache_control,
                        ContentDisposition=content_disposition,
                        ContentEncoding=content_encoding,
                        ContentLanguage=content_language,
                        ContentLength=content_length,
                        ContentMD5=content_md5,
                        ContentType=content_type,
                        ChecksumAlgorithm=checksum_algorithm,
                        ChecksumCRC32=checksum_crc32,
                        ChecksumCRC32C=checksum_crc32c,
                        ChecksumSHA1=checksum_sha1,
                        ChecksumSHA256=checksum_sha256,
                        Expires=expires_datetime,
                        GrantFullControl=grant_full_control,
                        GrantRead=grant_read,
                        GrantReadACP=grant_read_acp,
                        GrantWriteACP=grant_write_acp,
                        ServerSideEncryption=server_side_encryption,
                        StorageClass=storage_class,
                        WebsiteRedirectLocation=website_redirect_location,
                        SSECustomerAlgorithm=sse_customer_algorithm,
                        SSECustomerKey=sse_customer_key,
                        SSEKMSKeyId=sse_kms_key_id,
                        SSEKMSEncryptionContext=sse_kms_encryption_context,
                        BucketKeyEnabled=bucket_key_enabled,
                        RequestPayer=request_payer,
                        ObjectLockMode=object_lock_mode,
                        ObjectLockRetainUntilDate=object_lock_retain_until_datetime,
                        ObjectLockLegalHoldStatus=object_lock_legal_hold_status,
                        ExpectedBucketOwner=expected_bucket_owner,
                    )
                )
        else:  # read mode only
            key_name = "S3.Client.get_object"
            s3_client_kwargs = resolve_kwargs(
                IfMatch=if_match,
                IfModifiedSince=if_modified_since,
                IfNoneMatch=if_none_match,
                IfUnmodifiedSince=if_unmodified_since,
                Range=range,
                ResponseCacheControl=response_cache_control,
                ResponseContentDisposition=response_content_disposition,
                ResponseContentEncoding=response_content_encoding,
                ResponseContentLanguage=response_content_language,
                ResponseContentType=response_content_type,
                ResponseExpires=response_expires,
                SSECustomerAlgorithm=sse_customer_algorithm,
                SSECustomerKey=sse_customer_key,
                RequestPayer=request_payer,
                PartNumber=part_number,
                ExpectedBucketOwner=expected_bucket_owner,
                ChecksumMode=checksum_mode,
            )

        # Write mode: register kwargs only when there is something to send,
        # so a plain ``open("w")`` leaves ``client_kwargs`` untouched.
        # Read mode: always register the (possibly empty) get_object kwargs.
        if s3_client_kwargs or not is_write_mode:
            if key_name in existing_client_kwargs:  # pragma: no cover
                existing_client_kwargs[key_name].update(s3_client_kwargs)
            else:
                existing_client_kwargs[key_name] = s3_client_kwargs
            transport_params["client_kwargs"] = existing_client_kwargs

        return smart_open.open(**open_kwargs)