Напишем небольшого своего клиента на python, который подойдет для любых типов объектных хранилищ типа s3.
Начнем с модуля CLI. В нем собственно будет реализовано то, как будет выглядеть наше CLI.
Модуль CLI.
"""CLI module."""
import argparse
from dataclasses import dataclass
from importlib.metadata import version
import os
from typing import Optional, Union
@dataclass
class BasicArgs:
"""Basic class for CLI arguments."""
config: str
@dataclass
class ListObjectArgs(BasicArgs):
"""Args for listing object in bucket."""
command: str
s3_file_path: str
@dataclass
class PutObjectArgs(BasicArgs):
"""Args for put object in bucket."""
command: str
local_file: str
s3_file_path: str
@dataclass
class DeleteObjectArgs(BasicArgs):
"""Args for delete object in bucket."""
command: str
s3_file_path: str
@dataclass
class GetObjectArgs(BasicArgs):
"""Args for get object from s3."""
command: str
s3_file_path: str
local_file_path: str
def parse_args() -> Optional[Union[ListObjectArgs, PutObjectArgs, DeleteObjectArgs, GetObjectArgs]]:
"""Create CLI."""
parser = argparse.ArgumentParser(description="s3_client argument parser")
parser.add_argument(
"-v", "--version",
action="version",
version=f"%(prog)s {version('s3_sc')}",
help="s3_client version"
)
parser.add_argument(
"-c", "--config",
required=True,
default=os.environ.get("CONFIG_PATH", '~/s3_client.conf'),
help="Path to s3_client config",
dest="s3_config"
)
subparsers = parser.add_subparsers(
title="s3_client subcommands",
description="valid subcommands",
dest="command"
)
## List Object command
list_object_parser = subparsers.add_parser(
"ls", help="Usage <path>. Put </> to start from root."
)
list_object_parser.add_argument(
"s3_file_path",
help="Path to s3 file or directory"
)
## Put object command
put_object_parser = subparsers.add_parser(
"put", help="Usage <path_to_local_file> <path_to_s3_file>"
)
put_object_parser.add_argument(
"local_file",
help="Path to local file"
)
put_object_parser.add_argument(
"s3_file_path",
help="Path to file in s3 bucket"
)
# Delete object command
delete_object_parser = subparsers.add_parser(
"delete", help="Usage <path_to_s3_file>"
)
delete_object_parser.add_argument(
"s3_file_path",
help="Path to S3 file in bucket"
)
# Get object command
get_object_parser = subparsers.add_parser(
"get", help="Usage <path_to_s3_file> <path_to_local>"
)
get_object_parser.add_argument(
"s3_file_path",
help="Path to s3 file"
)
get_object_parser.add_argument(
"local_file_path",
help="Path to local file to save"
)
args = parser.parse_args()
if args.command == "ls":
return ListObjectArgs(
command=args.command,
config=args.s3_config,
s3_file_path=args.s3_file_path
)
elif args.command == "put":
return PutObjectArgs(
command=args.command,
config=args.s3_config,
local_file = args.local_file,
s3_file_path=args.s3_file_path
)
elif args.command == "delete":
return DeleteObjectArgs(
command=args.command,
config=args.s3_config,
s3_file_path=args.s3_file_path
)
elif args.command == "get":
return GetObjectArgs(
command=args.command,
config=args.s3_config,
s3_file_path=args.s3_file_path,
local_file_path=args.local_file_path
)
else:
parser.print_help()
return None
Во-первых используем аннотацию типов, это будет удобно нам самим не только лишь в будущем, если вдруг нужно будет вернуться к коду или взять что-то из него, но и уже сейчас, чтобы было меньше ошибок в runtime и чтобы было удобнее работать в IDE.
Здесь мы задаем два верхне уровневых ключа для нашей тулзы, первый это version, он необязательный и выведет нам текущую версию утилиты.
Второй параметр обязательный «—config», в него мы передаем путь до конфигурационного файла, которых как правило много, под разные бакеты.
Далее реализуем дополнительные парсеры для команд ls, get, put, delete. Это то, что будет уметь делать данный клиент.
В зависимости от переданной команды (ls, get и так далее), возвращаем тот или иной класс. Если не попали никуда, то выведем help, который автоматически генерирует библиотека argparser.
Несколько примеров как выглядит CLI:
Конфиг pyproject.toml
[tool.poetry]
name = "s3_sc"
version = "1.0.0"
description = "S3 simple client"
authors = ["Trusikhin Andrei <andrei.trusikhin@gmail.com>"]
license = "MIT"
readme = "README.md"
classifiers = [
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.10",
"License :: OSI Approved :: MIT License",
"Operating System :: OS Independent",
"Intended Audience :: Developers",
"Topic :: Software Development :: Libraries :: Python Modules"
]
[tool.poetry.urls]
"Homepage" = "https://github.com/Onder7994/s3_python_client"
"Repository" = "https://github.com/Onder7994/s3_python_client"
"Documentation" = "https://github.com/Onder7994/s3_python_client#readme"
[build-system]
requires = ["poetry-core>=1.0.0", "poetry-dynamic-versioning>=1.0.0,<2.0.0"]
build-backend = "poetry_dynamic_versioning.backend"
[tool.poetry.dependencies]
python = ">=3.8"
aiobotocore = "2.13.2"
asyncio = "3.4.3"
[tool.poetry.scripts]
s3_sc = "s3_sc.main:main"
[tool.ruff]
exclude = [
".bzr",
".direnv",
".eggs",
".git",
".git-rewrite",
".hg",
".mypy_cache",
".nox",
".pants.d",
".pytype",
".ruff_cache",
".svn",
".tox",
".venv",
"__pypackages__",
"_build",
"buck-out",
"build",
"dist",
"venv",
]
line-length = 100
indent-width = 4
[tool.ruff.lint]
select = ["E", "W", "C", "F", "N", "D"]
ignore = []
# Allow fix for all enabled rules (when `--fix`) is provided.
fixable = ["ALL"]
unfixable = []
Модуль логирования
"""Unit logger."""
import logging
import json
from dataclasses import dataclass, field
class JSONFormatter(logging.Formatter):
"""Custom json formatter."""
def format(self, record: logging.LogRecord) -> str:
"""Make logs format."""
log_record = {
'asctime': self.formatTime(record, self.datefmt),
'levelname': record.levelname,
'message': record.getMessage(),
}
return json.dumps(log_record, ensure_ascii=False)
@dataclass
class Logging:
"""Loggin class."""
logger_name: str
logger: logging.Logger = field(init=False)
def __post_init__(self) -> None:
"""Initialize the logger."""
self.logger = logging.getLogger(self.logger_name)
self.logger.setLevel(logging.INFO)
formatter = JSONFormatter()
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.INFO)
console_handler.setFormatter(formatter)
self.logger.addHandler(console_handler)
def log(self, level: int, message: str, *args: object) -> None:
"""Log method."""
self.logger.log(level, message, *args)
def info(self, message: str, *args: object) -> None:
"""INFO logging level."""
self.log(logging.INFO, message, *args)
def debug(self, message: str, *args: object) -> None:
"""DEBUG logging level."""
self.log(logging.DEBUG, message, *args)
def warning(self, message: str, *args: object) -> None:
"""WARNING logging level."""
self.log(logging.WARNING, message, *args)
def error(self, message: str, *args: object) -> None:
"""ERROR logging level."""
self.log(logging.ERROR, message, *args)
def critical(self, message: str, *args: object) -> None:
"""CRITICAL logging level."""
self.log(logging.CRITICAL, message, *args)
Далее нам нужно как-то прочитать наш конфиг, он будет иметь вид:
[main]
S3_ACCESS_KEY: <access_key>
S3_SECRET_KEY: <secret_key>
S3_ENDPOINT: <endpoint>
S3_BUCKET_NAME: <bucket_name>
При желании можно будет расширить его, добавляя те или иные опции для работы с S3 доступные в библиотеке.
Модуль чтения конфиг файл
"""Config reader module."""
import configparser
import os.path
from dataclasses import dataclass
from s3_sc.logger import Logging
import sys
@dataclass
class ConfigReader:
"""Read config file."""
logger: Logging
config_parser: configparser.ConfigParser = configparser.ConfigParser()
def read_s3_config(self, s3_config: str) -> tuple[str, str, str, str]:
"""Read s3 config."""
if os.path.isfile(s3_config):
self.config_parser.read(s3_config)
s3_access_key = self.config_parser.get("main", "S3_ACCESS_KEY")
s3_secret_key = self.config_parser.get("main", "S3_SECRET_KEY")
s3_endpoint = self.config_parser.get("main", "S3_ENDPOINT")
s3_bucket_name = self.config_parser.get("main", "S3_BUCKET_NAME")
return s3_access_key, s3_secret_key, s3_endpoint, s3_bucket_name
else:
self.logger.error("Config file: %s does not exist", s3_config)
sys.exit(1)
Здесь мы собственно читаем переданный конфигурационный файл, получаем переменные и возвращаем их в виде кортежа.
Модуль для работы с S3
"""S3 module."""
from contextlib import asynccontextmanager
from s3_sc.logger import Logging
from aiobotocore.client import AioBaseClient
from aiobotocore.session import get_session, AioSession
from dataclasses import dataclass, field
from botocore.exceptions import ClientError, ConnectTimeoutError, EndpointConnectionError
import sys
@dataclass
class S3Client:
"""S3 client class."""
access_key: str
secret_key: str
endpoint: str
bucket_name: str
logger: Logging
session: AioSession = field(init=False)
s3_config: dict = field(init=False)
def __post_init__(self):
"""Add attributes."""
self.s3_config = {
"aws_access_key_id": self.access_key,
"aws_secret_access_key": self.secret_key,
"endpoint_url": self.endpoint
}
self.session = get_session()
@asynccontextmanager
async def get_client(self) -> AioBaseClient:
"""Get s3 client."""
async with self.session.create_client("s3", **self.s3_config) as client:
yield client
async def list_s3_object(self, path: str) -> None:
"""List object (file and directory)."""
try:
async with self.get_client() as client:
paginator = client.get_paginator('list_objects_v2')
pages = paginator.paginate(
Bucket=self.bucket_name,
Prefix=path.lstrip('/'),
Delimiter='/'
)
async for page in pages:
if "Contents" in page:
for obj in page["Contents"]:
key = obj["Key"]
size = obj["Size"]
last_modified = obj["LastModified"]
date_str = last_modified.strftime('%Y-%m-%d %H:%M')
print(f"FILE {date_str} {size:>10} s3://{self.bucket_name}/{key}")
if "CommonPrefixes" in page:
for prefix in page["CommonPrefixes"]:
print(f"DIR s3://{self.bucket_name}/{prefix['Prefix']}")
except (ClientError, ValueError, ConnectTimeoutError, EndpointConnectionError) as err:
self.logger.error("S3Client error: %s", err)
sys.exit(1)
async def put_s3_object(self, src_path: str, dest_path: str) -> None:
"""Put object in bucket."""
try:
async with self.get_client() as client:
with open(src_path, "rb") as file:
await client.put_object(
Bucket=self.bucket_name,
Key=dest_path,
Body=file
)
self.logger.info(
"Object %s was upload to s3://%s/%s",
src_path, self.bucket_name, dest_path
)
except (ClientError, ValueError) as err:
self.logger.error("Error uploading file %s. Error: %s", src_path, err)
sys.exit(1)
async def delete_s3_object(self, dest_path: str) -> None:
"""Delete object from bucket."""
try:
async with self.get_client() as client:
if dest_path.endswith("/"):
paginator = client.get_paginator("list_objects_v2")
pages = paginator.paginate(
Bucket=self.bucket_name,
Prefix=dest_path
)
objects_to_delete = []
async for page in pages:
if "Contents" in page:
for obj in page["Contents"]:
objects_to_delete.append({'Key': obj['Key']})
if objects_to_delete:
await client.delete_objects(
Bucket=self.bucket_name,
Delete={
'Objects': objects_to_delete,
'Quiet': True
}
)
self.logger.info(
"Deleted %d objects from s3://%s/%s",
len(objects_to_delete), self.bucket_name, dest_path
)
else:
self.logger.info(
"No objects found to delete in s3://%s/%s",
self.bucket_name,
dest_path
)
else:
await client.delete_object(
Bucket=self.bucket_name,
Key=dest_path
)
self.logger.info(
"Deleted single object s3://%s/%s",
self.bucket_name, dest_path
)
except (ClientError, ValueError) as err:
self.logger.error("Error deleting object %s. Error: %s", dest_path, err)
sys.exit(1)
async def get_s3_object(self, dest_path: str, src_path: str) -> None:
"""Get object from bucket."""
try:
async with self.get_client() as client:
response = await client.get_object(
Bucket=self.bucket_name,
Key=dest_path
)
obj_data = await response["Body"].read()
with open(src_path, "wb") as file:
file.write(obj_data)
self.logger.info("Object %s was save to %s", dest_path, src_path)
except (ClientError, ValueError) as err:
self.logger.error("Error saving object %s. Error: %s", dest_path, err)
sys.exit(1)
Используем асинхронную библиотеку aiobotocore для работы с s3. Библиотека требует инциализации конфига внутри себя, т.е. мы не можем просто передать наш конфигурационный файл, поэтому в магическом методе __post_init__, который вызывается после инициализации конструктора класса, мы передаем еще два атрибута. Первый это собственно сам конфиг, нужный библиотеке, пусть prefix aws_* не смущает, на самом деле библиотека будет работать с любым объектым хранилищем. Также мы в атрибут session записывает значение метода get_session(), это нужно чтобы потом реализовать получение клиента в методе get_client().
Из интересного, сделать просто листинг нельзя, в s3 директории на самом деле не являются директориями в классическом понимании этого слова, как например в файловой системе. В s3 это просто префиксы. Мы используем метод paginator что пройтись по переданному пути и получить данные, далее фильтурем их. Contents это файлы, а CommonPrefixes «директории» или префиксы. На основе этого мы делаем print максимально напоминающий s3cmd утилиту.
Остальное в целом имеет похожею логику. Для удаления мы сначала понимаем передан ли нам файл или prefix, если передаем например «test», то расцениваем это как файл. Если же «test/», то как директорию.
Осталось собрать все вместе в основном модуле и проверить работу.
"""Main unit."""
import sys
import configparser
from s3_sc.cli import parse_args, ListObjectArgs, PutObjectArgs, DeleteObjectArgs, GetObjectArgs
from s3_sc.logger import Logging
from s3_sc.config_reader import ConfigReader
from s3_sc.s3_client import S3Client
import asyncio
logger = Logging("s3_client")
s3_config_parser = ConfigReader(logger=logger)
async def init_s3_client(config: str) -> S3Client:
"""Initialize s3 client."""
try:
(s3_access_key,
s3_secret_key,
s3_endpoint,
s3_bucket_name) = s3_config_parser.read_s3_config(config)
s3_client = S3Client(
access_key=s3_access_key,
secret_key=s3_secret_key,
endpoint=s3_endpoint,
bucket_name=s3_bucket_name,
logger=logger
)
return s3_client
except (configparser.NoSectionError, configparser.MissingSectionHeaderError):
logger.error("Can not find section '[main]' in config file: %s", config)
sys.exit(1)
async def async_main():
"""Async main."""
cmd_args = parse_args()
if cmd_args is not None:
s3_client = await init_s3_client(cmd_args.config)
if isinstance(cmd_args, ListObjectArgs):
await s3_client.list_s3_object(cmd_args.s3_file_path)
elif isinstance(cmd_args, PutObjectArgs):
await s3_client.put_s3_object(cmd_args.local_file, cmd_args.s3_file_path)
elif isinstance(cmd_args, GetObjectArgs):
await s3_client.get_s3_object(cmd_args.s3_file_path, cmd_args.local_file_path)
elif isinstance(cmd_args, DeleteObjectArgs):
await s3_client.delete_s3_object(cmd_args.s3_file_path)
def main():
"""Sync main."""
asyncio.run(async_main())
if __name__ == "__main__":
main()
Так как мы используем poetry и при установке например из pypi, или запуске через poetry, мы будем входить в утилиту через так называемый entry_point, это вот эта строка в pyproject.toml
[tool.poetry.scripts]
s3_sc = "s3_sc.main:main"
то использовать для входа мы должны синхронную функцию, из которой будем вызывать асинхронную. Иначе будут ошибки запуска связанные с корутинами и тем, что мы не используем await.
В основном условие мы проверяем является ли cmd_args экземпляром того или иного класса, как мы помним метод parse_args() из модуля cli возвращает либо None либо какой-то класс, в котором есть нужные нам атрибуты для cli. На основе этого мы вызываем тот или иной метод из модуля s3_client, тем самым в зависимости от переданных аргументов в cli будет вызван нужный метод.
Осталось протестировать. Для этого сделаем небольшой docker-compose файл с minio внутри.
Minio как раз реализует объектное хранилище, это отлично подойдет для локальных тестов. Также за кадром этого поста, я проверил работы утилиты с ceph, все работает.
version: '3.8'
services:
minio:
image: minio/minio
container_name: minio
ports:
- "9000:9000"
- "9001:9001"
volumes:
- minio_data:/data
environment:
MINIO_ROOT_USER: minioadmin
MINIO_ROOT_PASSWORD: minioadmin123
command: server /data --console-address ":9001"
volumes:
minio_data:
Запускаем:
docker-compose up
Перед проверкой создаем в minio бакет и получаем нужные креды. У меня итоговый файл выглядит так:
[main]
S3_ACCESS_KEY: RzYTUyNp9WzDEVWJ4At9
S3_SECRET_KEY: 5BFREelrRyLcJnQ5sGzc2fo2dwRPcIenzftk14Zg
S3_ENDPOINT: http://127.0.0.1:9000
S3_BUCKET_NAME: test-bucket
Посмотрим наличие файлов в бакете
poetry run s3_sc -c s3_client.conf ls / ## При разработке
s3_sc -c s3_client.conf ls / ## Это если установили с pypi
Вывод будет ожидаемо пустым, так как у нас ничего нет в бакете. Давайте создадим тестовый файл и положим его туда.
echo "s3_test" > test.xt
s3_sc -c s3_client.conf put test.txt test.txt
s3_sc -c s3_client.conf ls /
Видим что файл создался в корне бакета. Давайте положим его в директорию.
s3_sc -c s3_client.conf put test.txt /test/test.txt
s3_sc -c s3_client.conf ls /
s3_sc -c s3_client.conf ls /test/
Видим наличие директории и самого файла. Теперь скачаем себе файл на локальную машину.
s3_sc -c s3_client.conf get test.txt local_test.txt
Отлично, теперь удалим файл и директорию.
s3_sc -c s3_client.conf delete test/
s3_client.conf delete test.txt
Все работает и бакет снова пустой.
Страница проекта на pypi.
Полный код на github.