FastAPI + SQLAlchemy 2.0 实战:从零构建一个带用户认证的待办事项API(附完整代码)

FastAPI + SQLAlchemy 2.0 实战:从零构建一个带用户认证的待办事项API(附完整代码)FastAPI SQLAlchemy 2 0 实战 构建企业级待办事项 API 全流程 在当今快速迭代的开发环境中 选择高效的工具组合往往能事半功倍 FastAPI 以其卓越的性能和直观的 API 设计 配合 SQLAlchemy 强大的 ORM 能力 已经成为 Python 后端开发的热门选择 本文将带你从零开始 构建一个完整的待办事项管理系统 涵盖用户认证 数据关联和 API 设计等核心功能 1

大家好,我是讯享网,很高兴认识大家。这里提供最前沿的Ai技术和互联网信息。

# FastAPI + SQLAlchemy 2.0 实战:构建企业级待办事项API全流程

在当今快速迭代的开发环境中,选择高效的工具组合往往能事半功倍。FastAPI以其卓越的性能和直观的API设计,配合SQLAlchemy强大的ORM能力,已经成为Python后端开发的热门选择。本文将带你从零开始,构建一个完整的待办事项管理系统,涵盖用户认证、数据关联和API设计等核心功能。

1. 项目架构设计与环境搭建

一个合理的项目结构是长期维护的基础。我们采用模块化设计,将不同功能的代码分离到独立文件中,既便于团队协作,也方便后续功能扩展。

推荐的项目结构如下:

todo_api/ ├── app/ │ ├── __init__.py │ ├── main.py # FastAPI应用入口 │ ├── database.py # 数据库连接配置 │ ├── models.py # SQLAlchemy数据模型 │ ├── schemas.py # Pydantic数据验证模型 │ ├── crud.py # 数据库操作函数 │ ├── auth.py # 认证相关逻辑 │ └── config.py # 应用配置 ├── requirements.txt # 依赖文件 └── tests/ # 测试目录 

安装必要的依赖包:

pip install fastapi sqlalchemy pymysql passlib[bcrypt] python-jose[cryptography] python-multipart uvicorn 

config.py中配置数据库连接和安全参数:

from pydantic import BaseSettings class Settings(BaseSettings): DATABASE_URL: str = "mysql+pymysql://user:password@localhost:3306/todo_db" SECRET_KEY: str = "your-secret-key-here" ALGORITHM: str = "HS256" ACCESS_TOKEN_EXPIRE_MINUTES: int = 30 settings = Settings() 

2. 数据库模型设计与关系映射

SQLAlchemy 2.0引入了更简洁的声明式语法,让我们能够更直观地定义数据模型及其关系。

2.1 用户与待办事项模型

models.py中定义核心数据模型:

from sqlalchemy import Boolean, Column, ForeignKey, Integer, String, Text from sqlalchemy.orm import relationship from .database import Base class User(Base): __tablename__ = "users" id = Column(Integer, primary_key=True, index=True) email = Column(String(255), unique=True, index=True, nullable=False) hashed_password = Column(String(255), nullable=False) is_active = Column(Boolean, default=True) items = relationship("TodoItem", back_populates="owner", cascade="all, delete-orphan") class TodoItem(Base): __tablename__ = "todo_items" id = Column(Integer, primary_key=True, index=True) title = Column(String(100), index=True, nullable=False) description = Column(Text) completed = Column(Boolean, default=False) priority = Column(Integer, default=1) owner_id = Column(Integer, ForeignKey("users.id")) owner = relationship("User", back_populates="items") 

关键点说明:

  • relationship定义了模型间的双向关联
  • cascade="all, delete-orphan"确保用户删除时其待办事项也被删除
  • nullable=False强制字段必填
  • index=True为常用查询字段创建索引

2.2 数据库连接与会话管理

database.py中配置数据库引擎和会话工厂:

from sqlalchemy import create_engine from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine from sqlalchemy.orm import sessionmaker from .config import settings # 同步引擎(用于迁移等操作) SYNC_DATABASE_URL = settings.DATABASE_URL sync_engine = create_engine(SYNC_DATABASE_URL, pool_pre_ping=True) # 异步引擎(用于API请求) ASYNC_DATABASE_URL = settings.DATABASE_URL.replace("pymysql", "aiomysql") async_engine = create_async_engine(ASYNC_DATABASE_URL) AsyncSessionLocal = sessionmaker( bind=async_engine, class_=AsyncSession, expire_on_commit=False ) async def get_db(): async with AsyncSessionLocal() as session: yield session 

3. 用户认证系统实现

安全的用户认证是任何系统的基石。我们将实现完整的注册、登录和令牌验证流程。

3.1 密码哈希与验证

auth.py中实现密码安全处理:

from passlib.context import CryptContext from jose import JWTError, jwt from datetime import datetime, timedelta from fastapi import Depends, HTTPException, status from fastapi.security import OAuth2PasswordBearer from . import models, schemas from .database import AsyncSession, get_db from .config import settings pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token") def verify_password(plain_password: str, hashed_password: str): return pwd_context.verify(plain_password, hashed_password) def get_password_hash(password: str): return pwd_context.hash(password) async def authenticate_user(db: AsyncSession, email: str, password: str): user = await db.get(models.User, email) if not user or not verify_password(password, user.hashed_password): return False return user def create_access_token(data: dict, expires_delta: timedelta = None): to_encode = data.copy() if expires_delta: expire = datetime.utcnow() + expires_delta else: expire = datetime.utcnow() + timedelta(minutes=15) to_encode.update({"exp": expire}) return jwt.encode(to_encode, settings.SECRET_KEY, algorithm=settings.ALGORITHM) 

3.2 认证依赖项与路由保护

扩展auth.py添加认证依赖项:

async def get_current_user( db: AsyncSession = Depends(get_db), token: str = Depends(oauth2_scheme) ): credentials_exception = HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Could not validate credentials", headers={"WWW-Authenticate": "Bearer"}, ) try: payload = jwt.decode(token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM]) email: str = payload.get("sub") if email is None: raise credentials_exception except JWTError: raise credentials_exception user = await db.get(models.User, email) if user is None: raise credentials_exception return user async def get_current_active_user( current_user: models.User = Depends(get_current_user) ): if not current_user.is_active: raise HTTPException(status_code=400, detail="Inactive user") return current_user 

4. API端点设计与实现

现在我们可以开始构建实际的API端点,实现待办事项的核心功能。

4.1 用户管理端点

main.py中添加用户相关路由:

from fastapi import FastAPI, Depends, HTTPException from sqlalchemy.ext.asyncio import AsyncSession from . import models, schemas, crud, auth from .database import get_db, async_engine app = FastAPI() @app.on_event("startup") async def startup(): async with async_engine.begin() as conn: await conn.run_sync(models.Base.metadata.create_all) @app.post("/users/", response_model=schemas.User) async def create_user( user: schemas.UserCreate, db: AsyncSession = Depends(get_db) ): db_user = await crud.get_user_by_email(db, email=user.email) if db_user: raise HTTPException(status_code=400, detail="Email already registered") return await crud.create_user(db=db, user=user) @app.post("/token", response_model=schemas.Token) async def login_for_access_token( form_data: OAuth2PasswordRequestForm = Depends(), db: AsyncSession = Depends(get_db) ): user = await authenticate_user(db, form_data.username, form_data.password) if not user: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Incorrect username or password", headers={"WWW-Authenticate": "Bearer"}, ) access_token_expires = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES) access_token = create_access_token( data={"sub": user.email}, expires_delta=access_token_expires ) return {"access_token": access_token, "token_type": "bearer"} @app.get("/users/me/", response_model=schemas.User) async def read_users_me( current_user: models.User = Depends(auth.get_current_active_user) ): return current_user 

4.2 待办事项CRUD操作

crud.py中实现待办事项的数据库操作:

from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession from . import models, schemas async def get_todo_items( db: AsyncSession, user_id: int, skip: int = 0, limit: int = 100, completed: bool = None ): query = select(models.TodoItem).where( models.TodoItem.owner_id == user_id ).offset(skip).limit(limit) if completed is not None: query = query.where(models.TodoItem.completed == completed) result = await db.execute(query) return result.scalars().all() async def create_user_todo_item( db: AsyncSession, item: schemas.TodoItemCreate, user_id: int ): db_item = models.TodoItem(item.dict(), owner_id=user_id) db.add(db_item) await db.commit() await db.refresh(db_item) return db_item async def update_todo_item( db: AsyncSession, item_id: int, user_id: int, item_update: schemas.TodoItemUpdate ): result = await db.execute( select(models.TodoItem).where( models.TodoItem.id == item_id, models.TodoItem.owner_id == user_id ) ) db_item = result.scalar_one_or_none() if not db_item: return None update_data = item_update.dict(exclude_unset=True) for field, value in update_data.items(): setattr(db_item, field, value) await db.commit() await db.refresh(db_item) return db_item 

main.py中添加待办事项路由:

@app.post("/todos/", response_model=schemas.TodoItem) async def create_todo_for_user( todo: schemas.TodoItemCreate, current_user: models.User = Depends(auth.get_current_active_user), db: AsyncSession = Depends(get_db) ): return await crud.create_user_todo_item(db=db, item=todo, user_id=current_user.id) @app.get("/todos/", response_model=list[schemas.TodoItem]) async def read_todos( completed: bool = None, skip: int = 0, limit: int = 100, current_user: models.User = Depends(auth.get_current_active_user), db: AsyncSession = Depends(get_db) ): todos = await crud.get_todo_items( db, user_id=current_user.id, skip=skip, limit=limit, completed=completed ) return todos @app.patch("/todos/{todo_id}", response_model=schemas.TodoItem) async def update_todo( todo_id: int, todo_update: schemas.TodoItemUpdate, current_user: models.User = Depends(auth.get_current_active_user), db: AsyncSession = Depends(get_db) ): db_todo = await crud.update_todo_item( db, item_id=todo_id, user_id=current_user.id, item_update=todo_update ) if db_todo is None: raise HTTPException(status_code=404, detail="Todo not found") return db_todo 

5. 高级功能与优化

5.1 分页与过滤

扩展待办事项查询接口,支持更复杂的分页和过滤:

from typing import Optional from fastapi import Query @app.get("/todos/advanced/", response_model=list[schemas.TodoItem]) async def read_todos_advanced( priority: Optional[int] = Query(None, ge=1, le=5), created_after: Optional[datetime] = None, current_user: models.User = Depends(auth.get_current_active_user), db: AsyncSession = Depends(get_db) ): query = select(models.TodoItem).where( models.TodoItem.owner_id == current_user.id ) if priority is not None: query = query.where(models.TodoItem.priority == priority) if created_after is not None: query = query.where(models.TodoItem.created_at >= created_after) result = await db.execute(query.order_by(models.TodoItem.priority.desc())) return result.scalars().all() 

5.2 性能优化与缓存

添加Redis缓存层提高性能:

from fastapi_cache import FastAPICache from fastapi_cache.backends.redis import RedisBackend from fastapi_cache.decorator import cache from redis import asyncio as aioredis @app.on_event("startup") async def startup(): redis = aioredis.from_url("redis://localhost") FastAPICache.init(RedisBackend(redis), prefix="fastapi-cache") @app.get("/todos/cached/", response_model=list[schemas.TodoItem]) @cache(expire=60) async def read_todos_cached( current_user: models.User = Depends(auth.get_current_active_user), db: AsyncSession = Depends(get_db) ): return await crud.get_todo_items(db, user_id=current_user.id) 

5.3 异步任务处理

使用Celery处理耗时任务:

from celery import Celery from .config import settings celery_app = Celery( "tasks", broker=settings.CELERY_BROKER_URL, backend=settings.CELERY_RESULT_BACKEND ) @celery_app.task def send_todo_reminder(email: str, todo_title: str): # 实际项目中这里会发送邮件或通知 print(f"Reminder sent to {email} about {todo_title}") return True @app.post("/todos/{todo_id}/remind/") async def remind_todo( todo_id: int, current_user: models.User = Depends(auth.get_current_active_user), db: AsyncSession = Depends(get_db) ): result = await db.execute( select(models.TodoItem).where( models.TodoItem.id == todo_id, models.TodoItem.owner_id == current_user.id ) ) todo = result.scalar_one_or_none() if not todo: raise HTTPException(status_code=404, detail="Todo not found") send_todo_reminder.delay(current_user.email, todo.title) return {"message": "Reminder scheduled"} 
小讯
上一篇 2026-04-20 08:54
下一篇 2026-04-20 08:52

相关推荐

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容,请联系我们,一经查实,本站将立刻删除。
如需转载请保留出处:https://51itzy.com/kjqy/270977.html