Spaces:
Sleeping
Sleeping
| from fastapi import APIRouter, Depends, HTTPException, Request, Form, Query, status | |
| from fastapi.responses import HTMLResponse, RedirectResponse | |
| from sqlalchemy.orm import Session | |
| from models import User, Device, SystemSetting, StatusRecord | |
| from database import get_db | |
| from datetime import datetime | |
| from typing import Optional | |
| from fastapi.templating import Jinja2Templates | |
| admin_router = APIRouter(prefix="/admin", tags=["admin"]) | |
| templates = Jinja2Templates(directory="templates") | |
| def login_required(request: Request, db: Session = Depends(get_db)): | |
| username = request.cookies.get("username") | |
| if not username: | |
| return RedirectResponse(url="/login", status_code=status.HTTP_302_FOUND) | |
| user = db.query(User).filter(User.username == username).first() | |
| if not user: | |
| return RedirectResponse(url="/login", status_code=status.HTTP_302_FOUND) | |
| return user | |
| async def admin_page(request: Request, db: Session = Depends(get_db)): | |
| current_user = login_required(request, db) | |
| if isinstance(current_user, RedirectResponse): | |
| return current_user | |
| if not current_user.is_admin: | |
| raise HTTPException( | |
| status_code=status.HTTP_403_FORBIDDEN, detail="Not authorized" | |
| ) | |
| users = db.query(User).all() | |
| devices = db.query(Device).all() | |
| system_setting = db.query(SystemSetting).first() | |
| return templates.TemplateResponse( | |
| "admin.html", { | |
| "request": request, | |
| "users": users, | |
| "devices": devices, | |
| "system_setting": system_setting, | |
| "current_user": current_user | |
| } | |
| ) | |
| async def edit_system_setting( | |
| request: Request, | |
| check_connect_period: int = Form(...), | |
| data_sync_period: int = Form(...), | |
| get_config_period: int = Form(...), | |
| point_distance: int = Form(...), | |
| db: Session = Depends(get_db), | |
| ): | |
| current_user = login_required(request, db) | |
| if isinstance(current_user, RedirectResponse): | |
| return current_user | |
| if not current_user.is_admin: | |
| raise HTTPException( | |
| status_code=status.HTTP_403_FORBIDDEN, detail="Not authorized" | |
| ) | |
| system_setting = db.query(SystemSetting).first() | |
| if not system_setting: | |
| system_setting = SystemSetting() | |
| db.add(system_setting) | |
| system_setting.check_connect_period = check_connect_period | |
| system_setting.data_sync_period = data_sync_period | |
| system_setting.get_config_period = get_config_period | |
| system_setting.point_distance = point_distance | |
| db.commit() | |
| return RedirectResponse(url="/admin", status_code=status.HTTP_302_FOUND) | |
| async def admin_data( | |
| request: Request, | |
| start_date: Optional[str] = Query(None), | |
| end_date: Optional[str] = Query(None), | |
| device_id: Optional[str] = Query(None), | |
| page: int = Query(1, ge=1), | |
| per_page: int = Query(10, ge=10, le=100), | |
| db: Session = Depends(get_db) | |
| ): | |
| current_user = login_required(request, db) | |
| if isinstance(current_user, RedirectResponse): | |
| return current_user | |
| if not current_user.is_admin: | |
| raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Not authorized") | |
| query = db.query(StatusRecord) | |
| if start_date: | |
| query = query.filter(StatusRecord.timestamp >= datetime.strptime(start_date, "%Y-%m-%d")) | |
| if end_date: | |
| query = query.filter(StatusRecord.timestamp <= datetime.strptime(end_date, "%Y-%m-%d")) | |
| if device_id: | |
| query = query.filter(StatusRecord.device_id == device_id) | |
| total_records = query.count() | |
| total_pages = (total_records + per_page - 1) // per_page | |
| records = query.order_by(StatusRecord.timestamp.desc()).offset((page - 1) * per_page).limit(per_page).all() | |
| devices = db.query(Device.device_id).distinct().all() | |
| device_ids = [device.device_id for device in devices] | |
| # Tính toán phạm vi trang để hiển thị | |
| page_range = 5 | |
| start_page = max(1, page - page_range // 2) | |
| end_page = min(total_pages, start_page + page_range - 1) | |
| start_page = max(1, end_page - page_range + 1) | |
| return templates.TemplateResponse( | |
| "admin_data.html", | |
| { | |
| "request": request, | |
| "records": records, | |
| "current_page": page, | |
| "total_pages": total_pages, | |
| "start_page": start_page, | |
| "end_page": end_page, | |
| "start_date": start_date, | |
| "end_date": end_date, | |
| "device_id": device_id, | |
| "device_ids": device_ids, | |
| "current_user": current_user, | |
| "per_page": per_page, | |
| } | |
| ) | |
| async def delete_user(request: Request, username: str, db: Session = Depends(get_db)): | |
| current_user = login_required(request, db) | |
| if isinstance(current_user, RedirectResponse): | |
| return current_user | |
| if not current_user.is_admin: | |
| raise HTTPException( | |
| status_code=status.HTTP_403_FORBIDDEN, detail="Not authorized" | |
| ) | |
| user = db.query(User).filter(User.username == username).first() | |
| if user: | |
| db.delete(user) | |
| db.commit() | |
| return RedirectResponse(url="/admin", status_code=status.HTTP_302_FOUND) | |
| async def edit_user( | |
| request: Request, | |
| username: str, | |
| new_username: str = Form(...), | |
| email: str = Form(...), | |
| is_admin: bool = Form(False), | |
| is_active: bool = Form(False), | |
| old_password: str = Form(None), | |
| new_password: str = Form(None), | |
| db: Session = Depends(get_db), | |
| ): | |
| current_user = login_required(request, db) | |
| if isinstance(current_user, RedirectResponse): | |
| return current_user | |
| if not current_user.is_admin: | |
| raise HTTPException( | |
| status_code=status.HTTP_403_FORBIDDEN, detail="Not authorized" | |
| ) | |
| user = db.query(User).filter(User.username == username).first() | |
| if user: | |
| if ( | |
| new_username != username | |
| and db.query(User).filter(User.username == new_username).first() | |
| ): | |
| raise HTTPException(status_code=400, detail="Username already exists") | |
| user.username = new_username | |
| user.email = email | |
| user.is_admin = is_admin | |
| user.is_active = is_active | |
| if old_password and new_password: | |
| if user.password != old_password: | |
| raise HTTPException(status_code=400, detail="Incorrect old password") | |
| user.password = new_password | |
| db.commit() | |
| return RedirectResponse(url="/admin", status_code=status.HTTP_302_FOUND) | |
| async def add_device( | |
| request: Request, | |
| name: str = Form(...), | |
| description: str = Form(...), | |
| device_id: str = Form(...), | |
| password: str = Form(...), | |
| db: Session = Depends(get_db), | |
| ): | |
| current_user = login_required(request, db) | |
| if isinstance(current_user, RedirectResponse): | |
| return current_user | |
| if not current_user.is_admin: | |
| raise HTTPException( | |
| status_code=status.HTTP_403_FORBIDDEN, detail="Not authorized" | |
| ) | |
| existing_device = db.query(Device).filter(Device.device_id == device_id).first() | |
| if existing_device: | |
| raise HTTPException(status_code=400, detail="Device ID already exists") | |
| new_device = Device( | |
| name=name, description=description, device_id=device_id, password=password | |
| ) | |
| db.add(new_device) | |
| db.commit() | |
| return RedirectResponse(url="/admin", status_code=status.HTTP_302_FOUND) | |
| async def edit_device( | |
| request: Request, | |
| device_id: str, | |
| name: str = Form(...), | |
| description: str = Form(...), | |
| new_device_id: str = Form(...), | |
| password: str = Form(...), | |
| db: Session = Depends(get_db), | |
| ): | |
| current_user = login_required(request, db) | |
| if isinstance(current_user, RedirectResponse): | |
| return current_user | |
| if not current_user.is_admin: | |
| raise HTTPException( | |
| status_code=status.HTTP_403_FORBIDDEN, detail="Not authorized" | |
| ) | |
| device = db.query(Device).filter(Device.device_id == device_id).first() | |
| if not device: | |
| raise HTTPException(status_code=404, detail="Device not found") | |
| if ( | |
| new_device_id != device_id | |
| and db.query(Device).filter(Device.device_id == new_device_id).first() | |
| ): | |
| raise HTTPException(status_code=400, detail="New Device ID already exists") | |
| device.name = name | |
| device.description = description | |
| device.device_id = new_device_id | |
| device.password = password | |
| db.commit() | |
| return RedirectResponse(url="/admin", status_code=status.HTTP_302_FOUND) | |
| async def delete_device( | |
| request: Request, device_id: str, db: Session = Depends(get_db) | |
| ): | |
| current_user = login_required(request, db) | |
| if isinstance(current_user, RedirectResponse): | |
| return current_user | |
| if not current_user.is_admin: | |
| raise HTTPException( | |
| status_code=status.HTTP_403_FORBIDDEN, detail="Not authorized" | |
| ) | |
| device = db.query(Device).filter(Device.device_id == device_id).first() | |
| if device: | |
| db.delete(device) | |
| db.commit() | |
| return RedirectResponse(url="/admin", status_code=status.HTTP_302_FOUND) | |