125 lines
3.9 KiB
Python
125 lines
3.9 KiB
Python
|
|
import flask_login
|
|
from flask import Response, redirect, render_template, request, url_for
|
|
from flask_admin import Admin, AdminIndexView, expose
|
|
from flask_admin.theme import Bootstrap4Theme
|
|
from flask_wtf import FlaskForm
|
|
from wtforms import fields, validators
|
|
|
|
from server.admin.auth import POOL, User
|
|
from server.admin.config import app, db_sync
|
|
from server.admin.views.attachments import AttachmentView
|
|
from server.config import get_app_config
|
|
from server.infra.db import AsyncDB
|
|
from server.modules.attachments import AtachmentService, DBAttachmentRepository, S3StorageDriver
|
|
from server.modules.attachments.repository.models import Attachment
|
|
|
|
login_manager = flask_login.LoginManager()
|
|
login_manager.init_app(app)
|
|
|
|
|
|
cnf = get_app_config()
|
|
db_async = AsyncDB(cnf)
|
|
attach_service = AtachmentService(S3StorageDriver(cnf=cnf), DBAttachmentRepository(db_async))
|
|
|
|
|
|
@app.route("/")
|
|
def index():
|
|
return render_template("index.html")
|
|
|
|
|
|
@app.route("/admin/attachment/<path:raw_path>", methods=["GET"])
|
|
async def get_file(raw_path: str):
|
|
attach_path = attach_service.path_from_url(raw_path)
|
|
async with attach_service._repository.db().session_slave() as session:
|
|
attach = await attach_service.get_info_bypath(session=session, path=[attach_path])
|
|
if not attach:
|
|
raise FileNotFoundError
|
|
body = await attach_service.get_data(attach_path)
|
|
|
|
cache_ctrl = "public, max-age=864000" # 10 дней
|
|
last_mod = attach[0].created_at.strftime("%a, %d %b %Y %H:%M:%S GMT")
|
|
|
|
return Response(
|
|
body,
|
|
mimetype=attach[0].content_type,
|
|
headers={
|
|
"Cache-Control": cache_ctrl,
|
|
"Last-Modified": last_mod,
|
|
},
|
|
)
|
|
|
|
|
|
@login_manager.user_loader
|
|
def load_user(alternative_id):
|
|
for user in POOL:
|
|
if user.alternative_id != alternative_id:
|
|
continue
|
|
return user
|
|
|
|
|
|
class LoginForm(FlaskForm):
|
|
username = fields.StringField(validators=[validators.InputRequired()])
|
|
password = fields.PasswordField(validators=[validators.InputRequired()])
|
|
|
|
user: User | None = None # To store the authenticated user for later use
|
|
|
|
def validate_username(self, field):
|
|
self.user = User.get(field.data, "username")
|
|
if self.user is None:
|
|
raise validators.ValidationError("Invalid username")
|
|
|
|
def validate_password(self, field):
|
|
if self.user is None:
|
|
# Skip password check if username validation already failed
|
|
return
|
|
if not self.user.check_password(field.data):
|
|
raise validators.ValidationError("Invalid password")
|
|
|
|
|
|
# Create customized index view class that handles login & registration
|
|
class ExtAdminIndexView(AdminIndexView):
|
|
@expose("/")
|
|
def index(self):
|
|
if not flask_login.current_user.is_authenticated:
|
|
return redirect(url_for(".login_view"))
|
|
return super().index()
|
|
|
|
@expose("/login/", methods=("GET", "POST"))
|
|
def login_view(self):
|
|
if flask_login.current_user.is_authenticated:
|
|
return redirect(url_for(".index"))
|
|
|
|
# handle user login
|
|
form = LoginForm(request.form)
|
|
if form.validate_on_submit():
|
|
user = form.user
|
|
flask_login.login_user(user)
|
|
# rredirect to next
|
|
if "next" in request.args:
|
|
next_url = request.args.get("next")
|
|
if next_url:
|
|
return redirect(next_url)
|
|
return redirect(url_for(".index"))
|
|
|
|
self._template_args["form"] = form
|
|
return super().index()
|
|
|
|
@expose("/logout/")
|
|
def logout_view(self):
|
|
flask_login.logout_user()
|
|
return redirect(url_for(".index"))
|
|
|
|
|
|
if __name__ == "__main__":
|
|
admin = Admin(
|
|
app,
|
|
name="Beerds",
|
|
index_view=ExtAdminIndexView(),
|
|
theme=Bootstrap4Theme(base_template="my_master.html", fluid=True),
|
|
)
|
|
|
|
admin.add_view(AttachmentView(Attachment, db_sync.session))
|
|
|
|
app.run(debug=False)
|