Асинхронный s3 клиент на python

0
(0)

Напишем небольшого своего клиента на python, который подойдет для любых типов объектных хранилищ типа s3.

Начнем с модуля CLI. В нем собственно будет реализовано то, как будет выглядеть наше CLI.

Модуль CLI.

Python
"""CLI module."""
import argparse
from dataclasses import dataclass
from importlib.metadata import version
import os
from typing import Optional, Union


@dataclass
class BasicArgs:
    """Basic class for CLI arguments."""

    config: str

@dataclass
class ListObjectArgs(BasicArgs):
    """Args for listing object in bucket."""

    command: str
    s3_file_path: str

@dataclass
class PutObjectArgs(BasicArgs):
    """Args for put object in bucket."""

    command: str
    local_file: str
    s3_file_path: str

@dataclass
class DeleteObjectArgs(BasicArgs):
    """Args for delete object in bucket."""

    command: str
    s3_file_path: str

@dataclass
class GetObjectArgs(BasicArgs):
    """Args for get object from s3."""

    command: str
    s3_file_path: str
    local_file_path: str

def parse_args() -> Optional[Union[ListObjectArgs, PutObjectArgs, DeleteObjectArgs, GetObjectArgs]]:
    """Create CLI."""
    parser = argparse.ArgumentParser(description="s3_client argument parser")
    parser.add_argument(
        "-v", "--version",
        action="version",
        version=f"%(prog)s {version('s3_sc')}",
        help="s3_client version"
    )
    parser.add_argument(
        "-c", "--config",
        required=True,
        default=os.environ.get("CONFIG_PATH", '~/s3_client.conf'),
        help="Path to s3_client config",
        dest="s3_config"
    )
    subparsers = parser.add_subparsers(
        title="s3_client subcommands",
        description="valid subcommands",
        dest="command"
    )
    ## List Object command
    list_object_parser = subparsers.add_parser(
        "ls", help="Usage <path>. Put </> to start from root."
    )
    list_object_parser.add_argument(
        "s3_file_path",
        help="Path to s3 file or directory"
    )
    ## Put object command
    put_object_parser = subparsers.add_parser(
        "put", help="Usage <path_to_local_file> <path_to_s3_file>"
    )
    put_object_parser.add_argument(
        "local_file",
        help="Path to local file"
    )
    put_object_parser.add_argument(
        "s3_file_path",
        help="Path to file in s3 bucket"
    )
    # Delete object command
    delete_object_parser = subparsers.add_parser(
        "delete", help="Usage <path_to_s3_file>"
    )
    delete_object_parser.add_argument(
        "s3_file_path",
        help="Path to S3 file in bucket"
    )
    # Get object command
    get_object_parser = subparsers.add_parser(
        "get", help="Usage <path_to_s3_file> <path_to_local>"
    )
    get_object_parser.add_argument(
        "s3_file_path",
        help="Path to s3 file"
    )
    get_object_parser.add_argument(
        "local_file_path",
        help="Path to local file to save"
    )

    args = parser.parse_args()

    if args.command == "ls":
        return ListObjectArgs(
            command=args.command,
            config=args.s3_config,
            s3_file_path=args.s3_file_path
        )
    elif args.command == "put":
        return PutObjectArgs(
            command=args.command,
            config=args.s3_config,
            local_file = args.local_file,
            s3_file_path=args.s3_file_path
        )
    elif args.command == "delete":
        return DeleteObjectArgs(
            command=args.command,
            config=args.s3_config,
            s3_file_path=args.s3_file_path
        )
    elif args.command == "get":
        return GetObjectArgs(
            command=args.command,
            config=args.s3_config,
            s3_file_path=args.s3_file_path,
            local_file_path=args.local_file_path
        )
    else:
        parser.print_help()
        return None

Во-первых используем аннотацию типов, это будет удобно нам самим не только лишь в будущем, если вдруг нужно будет вернуться к коду или взять что-то из него, но и уже сейчас, чтобы было меньше ошибок в runtime и чтобы было удобнее работать в IDE.

Здесь мы задаем два верхне уровневых ключа для нашей тулзы, первый это version, он необязательный и выведет нам текущую версию утилиты.

Второй параметр обязательный «—config», в него мы передаем путь до конфигурационного файла, которых как правило много, под разные бакеты.

Далее реализуем дополнительные парсеры для команд ls, get, put, delete. Это то, что будет уметь делать данный клиент.

В зависимости от переданной команды (ls, get и так далее), возвращаем тот или иной класс. Если не попали никуда, то выведем help, который автоматически генерирует библиотека argparser.

Несколько примеров как выглядит CLI:

Конфиг pyproject.toml

TOML
[tool.poetry]
name = "s3_sc"
version = "1.0.0"
description = "S3 simple client"
authors = ["Trusikhin Andrei <andrei.trusikhin@gmail.com>"]
license = "MIT"
readme = "README.md"
classifiers = [
    "Programming Language :: Python :: 3",
    "Programming Language :: Python :: 3.10",
    "License :: OSI Approved :: MIT License",
    "Operating System :: OS Independent",
    "Intended Audience :: Developers",
    "Topic :: Software Development :: Libraries :: Python Modules"
]

[tool.poetry.urls]
"Homepage" = "https://github.com/Onder7994/s3_python_client"
"Repository" = "https://github.com/Onder7994/s3_python_client"
"Documentation" = "https://github.com/Onder7994/s3_python_client#readme"

[build-system]
requires = ["poetry-core>=1.0.0", "poetry-dynamic-versioning>=1.0.0,<2.0.0"]
build-backend = "poetry_dynamic_versioning.backend"

[tool.poetry.dependencies]
python = ">=3.8"
aiobotocore = "2.13.2"
asyncio = "3.4.3"

[tool.poetry.scripts]
s3_sc = "s3_sc.main:main"

[tool.ruff]
exclude = [
    ".bzr",
    ".direnv",
    ".eggs",
    ".git",
    ".git-rewrite",
    ".hg",
    ".mypy_cache",
    ".nox",
    ".pants.d",
    ".pytype",
    ".ruff_cache",
    ".svn",
    ".tox",
    ".venv",
    "__pypackages__",
    "_build",
    "buck-out",
    "build",
    "dist",
    "venv",
]
line-length = 100
indent-width = 4

[tool.ruff.lint]
select = ["E", "W", "C", "F", "N", "D"]
ignore = []

# Allow fix for all enabled rules (when `--fix`) is provided.
fixable = ["ALL"]
unfixable = []

Модуль логирования

Python
"""Unit logger."""
import logging
import json
from dataclasses import dataclass, field

class JSONFormatter(logging.Formatter):
    """Custom json formatter."""

    def format(self, record: logging.LogRecord) -> str:
        """Make logs format."""
        log_record = {
            'asctime': self.formatTime(record, self.datefmt),
            'levelname': record.levelname,
            'message': record.getMessage(),
        }
        return json.dumps(log_record, ensure_ascii=False)

@dataclass
class Logging:
    """Loggin class."""

    logger_name: str
    logger: logging.Logger = field(init=False)

    def __post_init__(self) -> None:
        """Initialize the logger."""
        self.logger = logging.getLogger(self.logger_name)
        self.logger.setLevel(logging.INFO)
        formatter = JSONFormatter()
        console_handler = logging.StreamHandler()
        console_handler.setLevel(logging.INFO)
        console_handler.setFormatter(formatter)
        self.logger.addHandler(console_handler)

    def log(self, level: int, message: str, *args: object) -> None:
        """Log method."""
        self.logger.log(level, message, *args)

    def info(self, message: str, *args: object) -> None:
        """INFO logging level."""
        self.log(logging.INFO, message, *args)

    def debug(self, message: str, *args: object) -> None:
        """DEBUG logging level."""
        self.log(logging.DEBUG, message, *args)

    def warning(self, message: str, *args: object) -> None:
        """WARNING logging level."""
        self.log(logging.WARNING, message, *args)

    def error(self, message: str, *args: object) -> None:
        """ERROR logging level."""
        self.log(logging.ERROR, message, *args)

    def critical(self, message: str, *args: object) -> None:
        """CRITICAL logging level."""
        self.log(logging.CRITICAL, message, *args)

Далее нам нужно как-то прочитать наш конфиг, он будет иметь вид:

Plaintext
[main]
S3_ACCESS_KEY: <access_key>
S3_SECRET_KEY: <secret_key>
S3_ENDPOINT: <endpoint>
S3_BUCKET_NAME: <bucket_name>

При желании можно будет расширить его, добавляя те или иные опции для работы с S3 доступные в библиотеке.

Модуль чтения конфиг файл

Python
"""Config reader module."""
import configparser
import os.path
from dataclasses import dataclass
from s3_sc.logger import Logging
import sys


@dataclass
class ConfigReader:
    """Read config file."""

    logger: Logging
    config_parser: configparser.ConfigParser = configparser.ConfigParser()

    def read_s3_config(self, s3_config: str) -> tuple[str, str, str, str]:
        """Read s3 config."""
        if os.path.isfile(s3_config):
            self.config_parser.read(s3_config)
            s3_access_key = self.config_parser.get("main", "S3_ACCESS_KEY")
            s3_secret_key = self.config_parser.get("main", "S3_SECRET_KEY")
            s3_endpoint = self.config_parser.get("main", "S3_ENDPOINT")
            s3_bucket_name = self.config_parser.get("main", "S3_BUCKET_NAME")
            return s3_access_key, s3_secret_key, s3_endpoint, s3_bucket_name
        else:
            self.logger.error("Config file: %s does not exist", s3_config)
            sys.exit(1)

Здесь мы собственно читаем переданный конфигурационный файл, получаем переменные и возвращаем их в виде кортежа.

Модуль для работы с S3

Python
"""S3 module."""
from contextlib import asynccontextmanager
from s3_sc.logger import Logging
from aiobotocore.client import AioBaseClient
from aiobotocore.session import get_session, AioSession
from dataclasses import dataclass, field
from botocore.exceptions import ClientError, ConnectTimeoutError, EndpointConnectionError
import sys


@dataclass
class S3Client:
    """S3 client class."""

    access_key: str
    secret_key: str
    endpoint: str
    bucket_name: str
    logger: Logging
    session: AioSession = field(init=False)
    s3_config: dict = field(init=False)

    def __post_init__(self):
        """Add attributes."""
        self.s3_config = {
            "aws_access_key_id": self.access_key,
            "aws_secret_access_key": self.secret_key,
            "endpoint_url": self.endpoint
        }
        self.session = get_session()

    @asynccontextmanager
    async def get_client(self) -> AioBaseClient:
        """Get s3 client."""
        async with self.session.create_client("s3", **self.s3_config) as client:
            yield client

    async def list_s3_object(self, path: str) -> None:
        """List object (file and directory)."""
        try:
            async with self.get_client() as client:
                paginator = client.get_paginator('list_objects_v2')
                pages = paginator.paginate(
                    Bucket=self.bucket_name,
                    Prefix=path.lstrip('/'),
                    Delimiter='/'
                )
                async for page in pages:
                    if "Contents" in page:
                        for obj in page["Contents"]:
                            key = obj["Key"]
                            size = obj["Size"]
                            last_modified = obj["LastModified"]
                            date_str = last_modified.strftime('%Y-%m-%d %H:%M')
                            print(f"FILE {date_str} {size:>10}  s3://{self.bucket_name}/{key}")
                    if "CommonPrefixes" in page:
                        for prefix in page["CommonPrefixes"]:
                            print(f"DIR s3://{self.bucket_name}/{prefix['Prefix']}")
        except (ClientError, ValueError, ConnectTimeoutError, EndpointConnectionError) as err:
            self.logger.error("S3Client error: %s", err)
            sys.exit(1)

    async def put_s3_object(self, src_path: str, dest_path: str) -> None:
        """Put object in bucket."""
        try:
            async with self.get_client() as client:
                with open(src_path, "rb") as file:
                    await client.put_object(
                        Bucket=self.bucket_name,
                        Key=dest_path,
                        Body=file
                    )
                self.logger.info(
                    "Object %s was upload to s3://%s/%s",
                    src_path, self.bucket_name, dest_path
                )
        except (ClientError, ValueError) as err:
            self.logger.error("Error uploading file %s. Error: %s", src_path, err)
            sys.exit(1)

    async def delete_s3_object(self, dest_path: str) -> None:
        """Delete object from bucket."""
        try:

            async with self.get_client() as client:
                if dest_path.endswith("/"):
                    paginator = client.get_paginator("list_objects_v2")
                    pages = paginator.paginate(
                        Bucket=self.bucket_name,
                        Prefix=dest_path
                    )
                    objects_to_delete = []
                    async for page in pages:
                        if "Contents" in page:
                            for obj in page["Contents"]:
                                objects_to_delete.append({'Key': obj['Key']})
                    if objects_to_delete:
                        await client.delete_objects(
                            Bucket=self.bucket_name,
                            Delete={
                                'Objects': objects_to_delete,
                                'Quiet': True
                            }
                        )
                        self.logger.info(
                            "Deleted %d objects from s3://%s/%s",
                            len(objects_to_delete), self.bucket_name, dest_path
                        )
                    else:
                        self.logger.info(
                            "No objects found to delete in s3://%s/%s",
                            self.bucket_name,
                            dest_path
                        )
                else:
                    await client.delete_object(
                        Bucket=self.bucket_name,
                        Key=dest_path
                    )
                    self.logger.info(
                        "Deleted single object s3://%s/%s",
                        self.bucket_name, dest_path
                    )
        except (ClientError, ValueError) as err:
            self.logger.error("Error deleting object %s. Error: %s", dest_path, err)
            sys.exit(1)

    async def get_s3_object(self, dest_path: str, src_path: str) -> None:
        """Get object from bucket."""
        try:
            async with self.get_client() as client:
                response = await client.get_object(
                    Bucket=self.bucket_name,
                    Key=dest_path
                )
                obj_data = await response["Body"].read()
                with open(src_path, "wb") as file:
                    file.write(obj_data)
                self.logger.info("Object %s was save to %s", dest_path, src_path)
        except (ClientError, ValueError) as err:
            self.logger.error("Error saving object %s. Error: %s", dest_path, err)
            sys.exit(1)

Используем асинхронную библиотеку aiobotocore для работы с s3. Библиотека требует инциализации конфига внутри себя, т.е. мы не можем просто передать наш конфигурационный файл, поэтому в магическом методе __post_init__, который вызывается после инициализации конструктора класса, мы передаем еще два атрибута. Первый это собственно сам конфиг, нужный библиотеке, пусть prefix aws_* не смущает, на самом деле библиотека будет работать с любым объектым хранилищем. Также мы в атрибут session записывает значение метода get_session(), это нужно чтобы потом реализовать получение клиента в методе get_client().

Из интересного, сделать просто листинг нельзя, в s3 директории на самом деле не являются директориями в классическом понимании этого слова, как например в файловой системе. В s3 это просто префиксы. Мы используем метод paginator что пройтись по переданному пути и получить данные, далее фильтурем их. Contents это файлы, а CommonPrefixes «директории» или префиксы. На основе этого мы делаем print максимально напоминающий s3cmd утилиту.

Остальное в целом имеет похожею логику. Для удаления мы сначала понимаем передан ли нам файл или prefix, если передаем например «test», то расцениваем это как файл. Если же «test/», то как директорию.

Осталось собрать все вместе в основном модуле и проверить работу.

Python
"""Main unit."""
import sys
import configparser

from s3_sc.cli import parse_args, ListObjectArgs, PutObjectArgs, DeleteObjectArgs, GetObjectArgs
from s3_sc.logger import Logging
from s3_sc.config_reader import ConfigReader
from s3_sc.s3_client import S3Client
import asyncio

logger = Logging("s3_client")
s3_config_parser = ConfigReader(logger=logger)

async def init_s3_client(config: str) -> S3Client:
    """Initialize s3 client."""
    try:
        (s3_access_key,
         s3_secret_key,
         s3_endpoint,
         s3_bucket_name) = s3_config_parser.read_s3_config(config)
        s3_client = S3Client(
            access_key=s3_access_key,
            secret_key=s3_secret_key,
            endpoint=s3_endpoint,
            bucket_name=s3_bucket_name,
            logger=logger
        )
        return s3_client
    except (configparser.NoSectionError, configparser.MissingSectionHeaderError):
        logger.error("Can not find section '[main]' in config file: %s", config)
        sys.exit(1)

async def async_main():
    """Async main."""
    cmd_args = parse_args()
    if cmd_args is not None:
        s3_client = await init_s3_client(cmd_args.config)
        if isinstance(cmd_args, ListObjectArgs):
            await s3_client.list_s3_object(cmd_args.s3_file_path)
        elif isinstance(cmd_args, PutObjectArgs):
            await s3_client.put_s3_object(cmd_args.local_file, cmd_args.s3_file_path)
        elif isinstance(cmd_args, GetObjectArgs):
            await s3_client.get_s3_object(cmd_args.s3_file_path, cmd_args.local_file_path)
        elif isinstance(cmd_args, DeleteObjectArgs):
            await s3_client.delete_s3_object(cmd_args.s3_file_path)

def main():
    """Sync main."""
    asyncio.run(async_main())

if __name__ == "__main__":
    main()

Так как мы используем poetry и при установке например из pypi, или запуске через poetry, мы будем входить в утилиту через так называемый entry_point, это вот эта строка в pyproject.toml

TOML
[tool.poetry.scripts]
s3_sc = "s3_sc.main:main"

то использовать для входа мы должны синхронную функцию, из которой будем вызывать асинхронную. Иначе будут ошибки запуска связанные с корутинами и тем, что мы не используем await.

В основном условие мы проверяем является ли cmd_args экземпляром того или иного класса, как мы помним метод parse_args() из модуля cli возвращает либо None либо какой-то класс, в котором есть нужные нам атрибуты для cli. На основе этого мы вызываем тот или иной метод из модуля s3_client, тем самым в зависимости от переданных аргументов в cli будет вызван нужный метод.

Осталось протестировать. Для этого сделаем небольшой docker-compose файл с minio внутри.

Minio как раз реализует объектное хранилище, это отлично подойдет для локальных тестов. Также за кадром этого поста, я проверил работы утилиты с ceph, все работает.

YAML
version: '3.8'

services:
  minio:
    image: minio/minio
    container_name: minio
    ports:
      - "9000:9000"
      - "9001:9001"
    volumes:
      - minio_data:/data
    environment:
      MINIO_ROOT_USER: minioadmin
      MINIO_ROOT_PASSWORD: minioadmin123
    command: server /data --console-address ":9001"

volumes:
  minio_data:

Запускаем:

Bash
docker-compose up

Перед проверкой создаем в minio бакет и получаем нужные креды. У меня итоговый файл выглядит так:

Plaintext
[main]
S3_ACCESS_KEY: RzYTUyNp9WzDEVWJ4At9
S3_SECRET_KEY: 5BFREelrRyLcJnQ5sGzc2fo2dwRPcIenzftk14Zg
S3_ENDPOINT: http://127.0.0.1:9000
S3_BUCKET_NAME: test-bucket

Посмотрим наличие файлов в бакете

Bash
poetry run s3_sc -c s3_client.conf ls / ## При разработке
s3_sc -c s3_client.conf ls / ## Это если установили с pypi

Вывод будет ожидаемо пустым, так как у нас ничего нет в бакете. Давайте создадим тестовый файл и положим его туда.

Bash
echo "s3_test" > test.xt
s3_sc -c s3_client.conf put test.txt test.txt
s3_sc -c s3_client.conf ls /

Видим что файл создался в корне бакета. Давайте положим его в директорию.

Bash
s3_sc -c s3_client.conf put test.txt /test/test.txt
s3_sc -c s3_client.conf ls /
s3_sc -c s3_client.conf ls /test/

Видим наличие директории и самого файла. Теперь скачаем себе файл на локальную машину.

Bash
s3_sc -c s3_client.conf get test.txt local_test.txt

Отлично, теперь удалим файл и директорию.

Bash
s3_sc -c s3_client.conf delete test/
s3_client.conf delete test.txt 

Все работает и бакет снова пустой.

Страница проекта на pypi.

Полный код на github.

Насколько статья полезна?

Нажмите на звезду, чтобы оценить!

Средняя оценка 0 / 5. Количество оценок: 0

Оценок пока нет. Поставьте оценку первым.

Оставить комментарий