beerds/server/modules/attachments/repositories/attachments.py

162 lines
4.8 KiB
Python

from abc import ABCMeta, abstractmethod
from datetime import UTC, datetime
from typing import Tuple
from sqlalchemy import CursorResult, delete, insert, select, update
from server.config import get_app_config
from server.infra.db import AbstractDB, AbstractSession, AsyncDB, MockDB
from server.modules.attachments.domains.attachments import Attachment
class AttachmentRepository(metaclass=ABCMeta):
@abstractmethod
async def get_by_id(
self, session: AbstractSession, attach_id: list[str]
) -> list[Attachment]:
"""Get Attachment by ID"""
pass
@abstractmethod
async def create(self, data: Attachment):
"""Create entry in DB"""
pass
@abstractmethod
async def delete(self, attach_id: str):
"""Get Attachment by ID"""
pass
@abstractmethod
async def update(self, attach_id: str, **kwargs):
pass
@abstractmethod
def db(self) -> AbstractDB:
pass
class MockAttachmentRepository(AttachmentRepository):
_data: dict[str, Attachment]
def __init__(self):
self._data = {
"jian4": Attachment(
id="jian4",
size=0,
storage_driver_name="mock",
path="jian4",
media_type="audio",
created_by="created_by",
content_type="audio/mp3",
),
"zai4": Attachment(
id="zai4",
size=0,
storage_driver_name="mock",
path="zai4",
media_type="audio",
created_by="created_by",
content_type="audio/mp3",
),
"cheng2": Attachment(
id="cheng2",
size=0,
storage_driver_name="mock",
path="cheng2",
media_type="audio",
created_by="created_by",
content_type="audio/mp3",
),
"shi4": Attachment(
id="shi4",
size=0,
storage_driver_name="mock",
path="shi4",
media_type="audio",
created_by="created_by",
content_type="audio/mp3",
),
"nv3": Attachment(
id="nv3",
size=0,
storage_driver_name="mock",
path="nv3",
media_type="audio",
created_by="created_by",
content_type="audio/mp3",
),
}
self._db = MockDB(get_app_config())
async def get_by_id(
self, session: AbstractSession, attach_id: list[str]
) -> list[Attachment]:
f: list[Attachment] = []
for f_id in attach_id:
f_item = self._data.get(f_id)
if f_item is None:
continue
f.append(f_item)
return f
async def create(self, data: Attachment):
self._data[data.id] = data
async def delete(self, attach_id: str):
del self._data[attach_id]
async def update(self, attach_id: str, **kwargs):
pass
def db(self) -> AbstractDB:
return self._db
class DBAttachmentRepository(AttachmentRepository):
_db: AsyncDB
def __init__(self, db: AsyncDB):
self._db = db
async def get_by_id(
self, session: AbstractSession, attach_id: list[str]
) -> list[Attachment]:
q = select(Attachment).where(
Attachment.id.in_(attach_id) # type: ignore
)
attachment: list[Attachment] = []
result: CursorResult[Tuple[Attachment]] = await session.execute(q) # type: ignore
for d in result.all():
attachment.append(d[0])
return attachment
async def create(self, data: Attachment):
new_dict = data.to_serializable(delete_private=True, time_to_str=False)
insert_data = insert(Attachment).values(**new_dict)
async with self._db.session_master() as session:
async with session.begin():
await session.execute(insert_data)
await session.commit()
return data.id
async def delete(self, attach_id: str):
q = delete(Attachment).where(
Attachment.id == attach_id # type: ignore
)
async with self._db.session_master() as session:
async with session.begin():
await session.execute(q)
await session.commit()
async def update(self, attach_id: str, **kwargs):
kwargs["updated_at"] = datetime.now(UTC)
q = update(Attachment).values(kwargs).where(Attachment.id == attach_id) # type: ignore
async with self._db.session_master() as session:
async with session.begin():
await session.execute(q)
await session.commit()
def db(self) -> AbstractDB:
return self._db