"""Attachment storage.

Pluggable storage drivers (local filesystem, S3-compatible object store,
in-memory mock) plus the service that persists attachment metadata,
serves file data/streams, and shrinks oversized images.
"""

import hashlib
import os
import os.path
import uuid
from abc import ABCMeta, abstractmethod
from datetime import UTC, datetime
from enum import Enum
from io import BytesIO
from pathlib import Path
from typing import Any, AsyncIterable, AsyncIterator, Optional

import aioboto3  # type: ignore
import aiofiles
import magic  # type: ignore
from botocore.client import BaseClient  # type: ignore
from botocore.exceptions import ClientError  # type: ignore
from PIL import Image

from server.config import AppConfig, get_app_config
from server.infra.db import AbstractSession
from server.infra.logger import get_logger
from server.modules.attachments.domains.attachments import Attachment
from server.modules.attachments.repositories.attachments import AttachmentRepository


class StorageDriversType(str, Enum):
    """Identifiers for the available storage backends."""

    MOCK = "mock"
    LOCAL = "local"
    S3 = "s3"


class MediaType(str, Enum):
    """Coarse media classification derived from a MIME type."""

    AUDIO = "audio"
    VIDEO = "video"
    IMAGE = "image"
    SVG_IMAGE = "svg_image"
    OTHER = "other"


class StorageDriver(metaclass=ABCMeta):
    """Abstract async blob store: put bytes, get them back by path."""

    @abstractmethod
    def get_name(self) -> str:
        """Return the driver's StorageDriversType value."""

    @abstractmethod
    async def put(self, data: bytes) -> str:
        """Store *data* and return the path/key it can be retrieved by."""

    @abstractmethod
    async def take(self, path: str) -> Optional[bytes]:
        """Return the full content at *path*, or None if absent."""

    @abstractmethod
    async def stream(self, path: str) -> AsyncIterable[Any]:
        """Yield the content at *path* in chunks."""

    @abstractmethod
    async def delete(self, path: str):
        """Remove the content at *path*."""


class LocalStorageDriver(StorageDriver):
    """Stores blobs on the local filesystem under a configured mount dir.

    Files are content-addressed (SHA-256) inside a per-day-of-month
    subdirectory, so identical payloads deduplicate within a day bucket.
    """

    _mount_dir: Path

    def __init__(self) -> None:
        self._mount_dir = Path(get_app_config().fs_local_mount_dir)  # type: ignore

    def get_name(self) -> str:
        return StorageDriversType.LOCAL.value

    async def put(self, data: bytes) -> str:
        """Write *data* to <mount>/<day>/<sha256>; dedupe on size match."""
        dir_path = self._mount_dir / Path(datetime.now(UTC).strftime("%d"))
        # makedirs(exist_ok=True) instead of isdir()+mkdir(): the old
        # check-then-create raced with concurrent uploads on a fresh day.
        os.makedirs(dir_path, exist_ok=True)
        path = dir_path / Path(hashlib.file_digest(BytesIO(data), "sha256").hexdigest())
        # Same digest and same size: already stored, skip the write.
        if os.path.isfile(path) and os.stat(path).st_size == len(data):
            return str(path)
        async with aiofiles.open(path, "wb") as f:
            await f.write(data)
        return str(path)

    async def take(self, path: str) -> Optional[bytes]:
        if not os.path.isfile(path):
            return None
        async with aiofiles.open(path, "rb") as f:
            return await f.read()

    async def stream(self, path: str):
        async with aiofiles.open(path, mode="rb") as f:
            while chunk := await f.read(1024):
                yield chunk

    async def delete(self, path: str):
        if not os.path.isfile(path):
            return
        os.remove(path)


class MockStorageDriver(StorageDriver):
    """In-memory driver for tests; keys are random UUIDs."""

    _store: dict[str, bytes]

    def __init__(self) -> None:
        self._store = {}

    def get_name(self) -> str:
        return StorageDriversType.MOCK.value

    async def stream(self, path: str) -> AsyncIterable:
        raise NotImplementedError

    async def put(self, data: bytes) -> str:
        path = str(uuid.uuid4())
        self._store[path] = data
        return path

    async def take(self, path: str) -> Optional[bytes]:
        return self._store.get(path)

    async def delete(self, path: str):
        # Raises KeyError on an unknown path (mock-only; acceptable in tests).
        del self._store[path]


class S3StorageDriver(StorageDriver):
    """Stores blobs in an S3-compatible bucket under a fixed key prefix."""

    _prefix: str = "pvc-435e5137-052f-43b1-ace2-e350c9d50c76"

    def __init__(self, cnf: AppConfig) -> None:
        self._chunk_size: int = 69 * 1024
        self._cnf = cnf
        self._logger = get_logger()
        self._session = aioboto3.Session(
            aws_access_key_id=str(cnf.fs_s3_access_key_id),
            aws_secret_access_key=str(cnf.fs_s3_access_key),
        )

    def get_name(self) -> str:
        return StorageDriversType.S3.value

    async def _client(self) -> BaseClient:
        return self._session.client("s3", endpoint_url=self._cnf.fs_s3_endpoint)

    def _normalize_path(self, path: str) -> str:
        """Re-attach the bucket prefix and strip any local-mount residue."""
        return f"{S3StorageDriver._prefix}{path}".replace(
            self._cnf.fs_local_mount_dir, ""
        )

    async def put(self, data: bytes) -> str:
        """Upload *data* content-addressed under <prefix>/<day>/<sha256>.

        Returns the key with the prefix stripped (callers store the short
        form; _normalize_path restores it on read).
        """
        sign = hashlib.file_digest(BytesIO(data), "sha256").hexdigest()
        day = datetime.now(UTC).strftime("%d")
        path = str(Path(S3StorageDriver._prefix) / day / sign)
        async with await self._client() as s3:
            # NOTE(review): AWS S3 expects ChecksumSHA256 as a *base64*
            # digest, not hex — verify against the actual object store
            # in use before changing.
            await s3.upload_fileobj(
                Fileobj=BytesIO(data),
                Bucket=self._cnf.fs_s3_bucket,
                Key=path,
                ExtraArgs={
                    "ChecksumAlgorithm": "SHA256",
                    "ChecksumSHA256": sign,
                },
            )
        return path.replace(S3StorageDriver._prefix, "")

    async def stream(self, path: str):
        """Yield the object's bytes in chunks; FileNotFoundError if missing."""
        path = self._normalize_path(path)
        self._logger.debug(
            f"stream s3 path {path}, bucket {self._cnf.fs_s3_bucket}, endpoint {self._cnf.fs_s3_endpoint}"
        )
        try:
            async with await self._client() as s3:
                obj = await s3.get_object(Bucket=self._cnf.fs_s3_bucket, Key=path)
                body = obj["Body"]
                while chunk := await body.read(self._chunk_size):
                    yield chunk
        except ClientError as e:
            # NoSuchKey is an expected miss; anything else gets logged.
            # Either way callers see a uniform FileNotFoundError.
            if e.response.get("Error", {}).get("Code") != "NoSuchKey":
                self._logger.error(f"stream client error: {str(e)}, path: {path}")
            raise FileNotFoundError from e
        except Exception as e:
            self._logger.error(
                f"stream error: {type(e).__name__} {str(e)}, path: {path}"
            )
            raise FileNotFoundError from e

    async def take(self, path: str) -> Optional[bytes]:
        """Buffer the whole object in memory; None when it is empty."""
        buffer = BytesIO()
        async for chunk in self.stream(path):
            if chunk:
                buffer.write(chunk)
        content = buffer.getvalue()
        return content if content else None

    async def delete(self, path: str) -> None:
        async with await self._client() as s3:
            await s3.delete_object(
                Bucket=self._cnf.fs_s3_bucket, Key=self._normalize_path(path)
            )


# Images larger than this (bytes) get thumbnailed down on image_resize().
RESIZE_MAX_SIZE = 100_000
# Target bounding box for thumbnails (width, height).
RESIZE_PARAMS = (500, 500)


class AtachmentService:  # NOTE(review): name typo kept for caller compatibility
    """Business layer over a StorageDriver plus attachment metadata repo."""

    _driver: StorageDriver
    _repository: AttachmentRepository
    _cnf: AppConfig

    def __init__(self, driver: StorageDriver, attach_repository: AttachmentRepository):
        self._driver = driver
        self._repository = attach_repository
        self._cnf = get_app_config()

    def media_type(self, content_type: str) -> MediaType:
        """Map a MIME type onto a MediaType bucket.

        SVG is matched on the full MIME type first: "image/svg+xml" has
        major type "image", so a major-type-only match could never yield
        SVG_IMAGE (that was a latent bug in the old match statement).
        """
        if not content_type:
            return MediaType.OTHER
        if content_type.startswith("image/svg"):
            return MediaType.SVG_IMAGE
        major = content_type.split("/")[0]
        try:
            # Enum values are exactly the major types we recognize.
            return MediaType(major)
        except ValueError:
            return MediaType.OTHER

    def content_type(self, file: bytes) -> str:
        """Sniff the MIME type from the first 2 KiB of content."""
        return magic.from_buffer(file[0:2048], mime=True)

    def extension(self, content_type: str | None) -> str:
        """Derive a file extension from a MIME type; "jpg" as fallback."""
        if not content_type:
            return "jpg"
        parts = content_type.split("/")
        if len(parts) != 2:
            return "jpg"
        # "svg+xml" -> "svg": drop the suffix after "+".
        return parts[1].split("+")[0]

    def id_from_url(self, url: str) -> str:
        """Extract the attachment id from a URL built by url()."""
        if ".original" not in url:
            raise ValueError(f"wrong url: {url}")
        return url.split(".original")[0].replace("/", "")

    def url(self, attachment_id: str, content_type: str | None = None) -> str:
        """Public download URL for an attachment."""
        return (
            f"{self._cnf.app_public_url}/api/v0/attachment/"
            f"{attachment_id}.original.{self.extension(content_type)}"
        )

    async def create(self, file: bytes, user_id: str) -> Attachment:
        """Store the raw bytes, then persist and return the metadata row."""
        path = await self._driver.put(file)
        content_type = self.content_type(file)
        attach = Attachment(
            size=len(file),
            storage_driver_name=str(self._driver.get_name()),
            path=path,
            media_type=self.media_type(content_type),
            content_type=content_type,
            created_by=user_id,
            id=str(uuid.uuid4()),
            created_at=datetime.now(UTC),
            updated_at=datetime.now(UTC),
        )
        await self._repository.create(attach)
        return attach

    async def get_info(
        self, session: AbstractSession | None, attach_id: list[str]
    ) -> list[Attachment]:
        """Fetch metadata rows; opens a slave session when none is given."""
        if not attach_id:
            return []
        if session is not None:
            return await self._repository.get_by_id(session, attach_id)
        async with self._repository.db().session_slave() as slave_session:
            return await self._repository.get_by_id(slave_session, attach_id)

    def get_name(self, attachment: Attachment) -> str:
        """Download filename: "<id>.<ext>"."""
        return f"{attachment.id}.{self.extension(attachment.content_type)}"

    async def get_data(
        self, session: AbstractSession, attach_id: str
    ) -> Optional[bytes]:
        """Return the attachment's full content, or None if unknown."""
        file = await self._repository.get_by_id(session, [attach_id])
        if not file:
            return None
        return await self._driver.take(file[0].path)

    async def get_stream(
        self, session: AbstractSession | None, attach_id: str
    ) -> AsyncIterator[bytes]:
        """Return an async iterator over the attachment's bytes.

        The first chunk is pulled eagerly so that a missing/unreadable
        blob raises here (FileNotFoundError) rather than mid-iteration.
        """

        async def _stream_iterator(is_empty: bool):
            if is_empty:
                return
            yield first_chunk
            async for chunk in stream:  # type: ignore
                yield chunk

        if session:
            file = await self._repository.get_by_id(session, [attach_id])
        else:
            async with self._repository.db().session_slave() as slave_session:
                file = await self._repository.get_by_id(slave_session, [attach_id])
        if not file:
            raise FileNotFoundError
        stream = self._driver.stream(file[0].path)
        try:
            first_chunk = await stream.__anext__()  # type: ignore
        except StopAsyncIteration:
            return _stream_iterator(is_empty=True)
        return _stream_iterator(is_empty=False)

    async def image_resize(self, session: AbstractSession, attach_id: list[str]):
        """Thumbnail oversized raster images in place.

        Re-encodes images larger than RESIZE_MAX_SIZE as ~500x500 JPEGs,
        stores the new blob, updates metadata, then deletes the original.
        Non-images, deleted rows, missing data, and small files are skipped.
        """
        info = await self.get_info(session, attach_id)
        get_logger().info(
            f"image_resize {len(info)}",
        )
        if not info:
            return
        for item in info:
            if item.media_type != MediaType.IMAGE.value:
                continue
            if item.is_deleted:
                continue
            data = await self.get_data(session, item.id)
            if data is None:
                continue
            if len(data) <= RESIZE_MAX_SIZE:
                get_logger().info(
                    f"skip because size: {len(data)}",
                )
                continue
            buffer = BytesIO()
            # Context manager closes the PIL image even if save() fails.
            with Image.open(BytesIO(data)) as img:
                img.thumbnail(RESIZE_PARAMS)
                img.save(buffer, format="JPEG", quality=70)
            d = buffer.getvalue()
            buffer.close()
            # Was `if d == 0: return` — bytes never equal int 0, so the
            # guard was dead; and `return` would have abandoned the rest
            # of the batch. Skip just this item on an empty re-encode.
            if not d:
                continue
            get_logger().info(
                f"delete:{item.path}",
            )
            path = await self._driver.put(d)
            await self._repository.update(
                item.id, path=path, content_type="image/jpeg", size=len(d)
            )
            await self._driver.delete(item.path)