beerds/server/modules/attachments/repository/attachments.py

155 lines
4.8 KiB
Python

from abc import ABCMeta, abstractmethod
from datetime import UTC, datetime
from sqlalchemy import CursorResult, delete, insert, select, update
from server.config import get_app_config
from server.infra.db import AbstractDB, AbstractSession, AsyncDB, MockDB
from server.modules.attachments.domains.attachments import Attachment
class AttachmentRepository(metaclass=ABCMeta):
@abstractmethod
async def get_by_id(self, session: AbstractSession, attach_id: list[str]) -> list[Attachment]:
"""Get Attachment by ID"""
pass
@abstractmethod
async def create(self, data: Attachment):
"""Create entry in DB"""
pass
@abstractmethod
async def delete(self, attach_id: str):
"""Get Attachment by ID"""
pass
@abstractmethod
async def update(self, attach_id: str, **kwargs):
pass
@abstractmethod
def db(self) -> AbstractDB:
pass
class MockAttachmentRepository(AttachmentRepository):
_data: dict[str, Attachment]
def __init__(self):
self._data = {
"jian4": Attachment(
id="jian4",
size=0,
storage_driver_name="mock",
path="jian4",
media_type="audio",
created_by="created_by",
content_type="audio/mp3",
),
"zai4": Attachment(
id="zai4",
size=0,
storage_driver_name="mock",
path="zai4",
media_type="audio",
created_by="created_by",
content_type="audio/mp3",
),
"cheng2": Attachment(
id="cheng2",
size=0,
storage_driver_name="mock",
path="cheng2",
media_type="audio",
created_by="created_by",
content_type="audio/mp3",
),
"shi4": Attachment(
id="shi4",
size=0,
storage_driver_name="mock",
path="shi4",
media_type="audio",
created_by="created_by",
content_type="audio/mp3",
),
"nv3": Attachment(
id="nv3",
size=0,
storage_driver_name="mock",
path="nv3",
media_type="audio",
created_by="created_by",
content_type="audio/mp3",
),
}
self._db = MockDB(get_app_config())
async def get_by_id(self, session: AbstractSession, attach_id: list[str]) -> list[Attachment]:
f: list[Attachment] = []
for f_id in attach_id:
f_item = self._data.get(f_id)
if f_item is None:
continue
f.append(f_item)
return f
async def create(self, data: Attachment):
self._data[data.id] = data
async def delete(self, attach_id: str):
del self._data[attach_id]
async def update(self, attach_id: str, **kwargs):
pass
def db(self) -> AbstractDB:
return self._db
class DBAttachmentRepository(AttachmentRepository):
_db: AsyncDB
def __init__(self, db: AsyncDB):
self._db = db
async def get_by_id(self, session: AbstractSession, attach_id: list[str]) -> list[Attachment]:
q = select(Attachment).where(
Attachment.id.in_(attach_id) # type: ignore
)
attachment: list[Attachment] = []
result: CursorResult[tuple[Attachment]] = await session.execute(q) # type: ignore
for d in result.all():
attachment.append(d[0])
return attachment
async def create(self, data: Attachment):
new_dict = data.to_serializable(delete_private=True, time_to_str=False)
insert_data = insert(Attachment).values(**new_dict)
async with self._db.session_master() as session:
async with session.begin():
await session.execute(insert_data)
await session.commit()
return data.id
async def delete(self, attach_id: str):
q = delete(Attachment).where(
Attachment.id == attach_id # type: ignore
)
async with self._db.session_master() as session:
async with session.begin():
await session.execute(q)
await session.commit()
async def update(self, attach_id: str, **kwargs):
kwargs["updated_at"] = datetime.now(UTC)
q = update(Attachment).values(kwargs).where(Attachment.id == attach_id) # type: ignore
async with self._db.session_master() as session:
async with session.begin():
await session.execute(q)
await session.commit()
def db(self) -> AbstractDB:
return self._db