学习fastapi
一、FastAPI请求响应数据
1.1新建项目
![]()
1.2两种方式运行代码
![]()
命令行和手动运行,更改代码后均可以自动刷新
uvicorn main:app --reloadctrl+c停掉代码
1.3访问交互式文档
在端口号后面加上docs
1.4路由
路由就是URL,地址和处理函数之间的映射关系,它决定了当用户访问某个特定网址时,服务器应该执行哪段代码来返回结果
#访问/hello/{name},响应结果Hello {name} @app.get("/hello/{name}") async def say_hello(name: str): return {"message": f"Hello {name}"}1.5参数
同一段接口逻辑,可以根据参数不同返回不同的数据
参数就是客户端发送请求时附带的额外信息和指令
参数的作用是让同一个接口能根据不同的输入,返回不同的输出,实现动态交互
常见参数有三类:路径参数、查询参数、请求体
1、路径参数
- 位置:URL路径的一部分:/hello/{name}
- 作用:指向唯一的、特定的资源
- 方法:GET
#访问/hello/{name},响应结果Hello {name} @app.get("/hello/{name}") async def say_hello(name: str): return {"message": f"Hello {name}"}路径参数-类型注解Path:
1、导入Path
from fastapi import FastAPI,Path2、给参数加限制
#限制新闻id范围为1-100 @app.get("/news/{id}") async def news_id(id:int=Path(...,gt=0,lt=101,description="id的范围是1-100")): return {"id":id,"msg":f"新闻的id是{id}"} #限制新闻的标题长度是5-10 @app.get("/{title}") async def news_title(title:str=Path(...,min_length=2,max_length=10)): return {f"新闻的标题是{title}"}2、查询参数
声明的参数不是路径参数时,路径操作函数会把参数自动解释为查询参数
- 位置:在UPL问好后面的部分,例如:URL?k1=v1&k2=v2
- 作用:对资源集合进行过滤、排序、分页等操作
- 方法:GET
为查询参数添加类型注解:有python原生注解和Query注解
1、导入Query
from fastapi import FastAPI,Query2、给参数加限制
@app.get("/news/news_list") async def get_news_list( skip:int=Query(default=0,ge=0,le=10,description="开始的新闻数"), limit:int=Query(default=10,ge=10,le=100,description="每页新闻数")): return {"skip":skip,"limit":limit}3、请求体参数
- 位置:HTTP请求中的消息体(body)中
- 作用:创建、更新资源、携带大量数据,如JSON
- 方法:POST、PUT等
在HTTP协议中,一个完整的请求由下列三部分组成
- 请求行:包含方法、URL、协议版本
- 请求头:元数据信息(内容类型、鉴权信息等)
- 请求体:实际要发送的数据内容
1、导入基类
from pydantic import BaseModel2、根据需求定义类型,写post提交数据
#注册:用户名和密码->str class User(BaseModel): username: str password: str @app.post("/register") async def register(user: User): return user请求体参数-类型注解Field
1、导入pydantic的Field函数
from pydantic import BaseModel,Field2、给类加上Field注解
#title不能为空,author长度1-10,出版社默认黑马出版社,售价不能为空,价格大于0 class Book(BaseModel): title: str=Field(...) author: str=Field(min_length=1,max_length=10) isbn: str=Field(default="黑马出版社") price: float=Field(...,gt=0) @app.post("/get_book") async def get_book(book: Book): return book1.6请求与响应
响应类型:默认情况下,Fast API会自动将路径操作函数返回的Python对象(字典、列表、Pydantic模型等),经由jsonable_encoder转化为JSON兼容格式,并包装为JSONResponse返回。这省去了手动序列化的步骤,让开发者更专注于业务逻辑,
如果需要返回非JSON数据(如HTML、文件流),FastAPI提供了丰富的相应类型来返回不同的数据
| 响应类型 | 用途 | 示例 |
|---|---|---|
| JSONResponse | 默认响应,返回JSON数据 | return {"key":"value"} |
| HTMLResponse | 返回HTML内容 | return HTMLResponse(html_content) |
| PlainTextResponse | 返回纯文本 | return PlainTextResponse("text") |
| FieldResponse | 返回文件下载 | return FileResponse(path) |
| StreamingResponse | 流式响应 | 生成器返回数据 |
| RedirectResponse | 重定向 | return RedirectResponse(url) |
1、JSONResponse
之前写的任意一个python接口,返回的都是JSON数据
其他响应类型设置方式:
1、装饰器指定响应类:适用场景:接口固定了返回类型(HTML、纯文本等)
2、返回响应对象:场景:文件下载、图片、流式响应
装饰器指定响应类:
例如:设置响应类为HTMLResponse,当前接口即可返回HTML内容
1、导入类:
from fastapi.responses import HTMLResponse2、装饰器直接指定响应类
#响应html内容 @app.get("/html",response_class=HTMLResponse) async def get_html(): return "<h1>一级标题</h1>"返回响应对象:例如:响应文件格式
FileResponse是FastAPI提供的专门用于高效返回文件内容(如图片、PDF、Excel、音视频等)的相应类。他能够只能处理文件路径、媒体类型推断、范围请求和缓存头部,是服务静态文件的推荐方式
1、导入类:
from fastapi.responses import FileResponse2、返回时指定响应类型
#响应file内容 @app.get("/file") async def get_file(): path= "./file/1.pdf" return FileResponse(path)自定义响应数据类型格式
response_model是路径操作装饰器(如@app.get或@app.post)的关键参数,它通过一个Pydantic模型来严格定义和约束API端点的输出格式,这一机制在自动数据验证和序列化的同时,更是保障数据安全性的第一道防线
1、导入类
2、自定义数据类型格式
class News(BaseModel): id:int title:str content:str #正确代码 @app.get("/news/{id}",response_model=News) async def get_news(id:int): return { "id":id, "title":f"这是第{id}书", "content":"这是本好书", } #错误代码 @app.get("/news/{id}",response_model=News) async def get_news(id:int): return { "id":id, "title":f"这是第{id}书", }正确:
错误:
1.7异常响应处理
对于客户端引发的错误(4xx,如资源未找到,认证失败),使用fastapi.HTTPException来终端处理流程,并返回标准错误响应
1、导入HTTPException
from fastapi import HTTPException2、代码:
#需求-按照id查找新闻 @app.get("/new/{id}") async def get_new(id:int): id_list=[1,2,3,4,5] if id not in id_list: raise HTTPException(status_code=404,detail="没有找到这条新闻") return {f"第{id}条新闻"}结果:
1.8中间件
中间件的作用:为每个请求添加统一的处理逻辑(记录日志、身份认证、跨域、设置响应头、性能监控等)
中间件的定义:
函数的顶部的hi用装饰器@app.midddleware("http")
#中间件1 @app.middleware("http") #request指的是请求,包含请求路径方法等 # call_next是自动向下执行 async def middleware1(request,call_next): print("中间件1,开始") # 让请求继续往后执行,知道路由处理完,拿到返回的响应,再回到中间件 response=await call_next(request) print("中间件1,结束") return response #中间件2 @app.middleware("http") async def middleware1(request,call_next): print("中间件2,开始") response=await call_next(request) print("中间件2,结束") return response @app.get("/") async def root(): return {"message": "Hello World"}多个中间价的执行顺序:自下而上,上面代码的结果如下:
1.9依赖注入
作用:抽取可复用的组件,实现代码复用、解耦且可以轻松替换依赖项进行测试
怎么使用依赖注入系统:
创建依赖项->导入Depends->声明依赖项
from fastapi import FastAPI,Depends,Query #2、导入depends # 分页逻辑公用:新闻列表和用户列表 #1、依赖项 async def common_list( skip:int=Query(0,ge=0), limit:int=Query(10,le=60), ): return {"skip":skip,"limit":limit} #3、注入依赖项 @app.get("/news/news_list") async def get_news_list(commons=Depends(common_list)): return commons @app.get("/user/user_list") async def get_user_list(): return {"msg":"你好"}结果:get_news_list成功注入依赖项:
二、ORM
ORM(Object-RelationMapping,对象关系映射)是一种编程技术,用于在面向对象编程语言和关系型数据库之间建立映射,它允许开发者通过操作对象的方式与数据库进行交互,无需直接编写复杂的SQL语句
2.1安装包
pip install sqlalchemy[asyncio] aiomysql2.2建库、建表
三步走:创建数据库引擎->定义模型类->启动应用时建表
1、创建数据库引擎
先用mysql创建一个数据库:
MySQL安装包:MySQL :: Download MySQL Installerhttps://dev.mysql.com/downloads/installer/安装完后进入MySQL 创建一个数据库
create database fastapi_one从SQLAlchemy的异步扩展模块里,导入创建异步数据库引擎的功能
from sqlalchemy.ext.asyncio import create_async_engine然后创建异步引擎
# 1、创建异步引擎 ASYNC_DATABASE_URL = "mysql+aiomysql://root:123456@localhost:3306/fastapi_one?charset=utf8" async_engine=create_async_engine( ASYNC_DATABASE_URL, echo=True, #可选输出SQL日志 #最多处理30个数据库操作,平时保持10个连接(pool_size=10,) pool_size=10,#设置连接池活跃的连接数 max_overflow=20,#允许额外的连接数 )2、ORM-定义模型类
先导入所需模块
#所有表模型必须继承DeclarativeBase,才能映射成数据库表, from sqlalchemy.orm import DeclarativeBase,Mapped,mapped_column from sqlalchemy import DateTime, func, String,Float from datetime import datetime定义模型类:
1、基类(通用属性和字段的映射),继承DeclarativeBase
2、一个数据库表对应一个模型类
#2、定义模型类:基类+表对应的模型类 # 基类是指通用的字段,这里包括创建时间、更新时间;书籍表:id,作者,价格,出版社 class Base(DeclarativeBase): create_time:Mapped[datetime]=mapped_column(DateTime,default=func.now(),comment="创建时间") update_time:Mapped[datetime]=mapped_column(DateTime,default=func.now(),onupdate=func.now,comment="更新时间") #继承上面的基类 class Book(Base): __tablename__ = "book" id:Mapped[int]=mapped_column(primary_key=True,comment="书籍id") bookname:Mapped[str]=mapped_column(String(255),comment="书名") author:Mapped[str]=mapped_column(String(255),comment="作者") price:Mapped[float]=mapped_column(Float,comment="价格") publisher:Mapped[str]=mapped_column(String(255),comment="出版社")3、建表
先定义一个函数去建表:
#根据写的模型类,自动去数据库建表 async def create_tables(): #获取异步引擎,创建事务-表 #async with 异步引擎.开始连接() as 连接对象:打开一个异步的数据库连接 async with async_engine.begin() as conn: #run_sync:让同步代码能在异步里运行 #Base.metadata:所有继承了Base的模型类(这里只有Book) #create_all:create all tables → 创建所有表 await conn.run_sync(Base.metadata.create_all) #Base模型类的元数据创建在FastAPI启动时去调用建表的函数,立即去建表
#@app.在事件(“启动时”):FastAPI 一启动,就立刻执行下面的函数 @app.on_event("startup") async def startup(): #调用这个函数去建表 await create_tables()查看是否把表建好了:
先运行,跑起来没报错,然后点Database
选择Mysql,填写用户名、密码以及数据库进行连接。
连接成功,可以看到已经成功创建表了:
2.3在路由中使用ORM
核心:创建依赖项,使用Depends 将依赖项注入到处理函数中
1、导入会话类和会话工厂类
from sqlalchemy.ext.asyncio import create_async_engine,async_sessionmaker,AsyncSession2、在路由中注入依赖项,用来获取数据库会话
# 需求:查询用户功能的接口:创建异步会话工厂->创建依赖项->注入依赖项到路由处理函数中 #创建异步会话工厂 #造一个专门生产异步数据库会话的工厂,绑定好数据库连接,生产出来的会话必须是异步版本! AsyncSessionLocal=async_sessionmaker( bind=async_engine,#绑定数据库引擎 expire_on_commit=False,#提交后会话不会过期,不会重新查询数据库 class_=AsyncSession #生产异步会话 ) #依赖项,用来获取数据库会话 async def get_database(): async with AsyncSessionLocal() as session: try: yield session #返回会话给路由处理函数 await session.commit() #提交事务 except Exception: await session.rollback() #有异常,回滚 raise finally: await session.close() #关闭会话 @app.get("/user/users") #注入依赖 async def get_users_list(db: AsyncSession = Depends(get_database)): res = await db.execute(select(User)) users = res.scalars().all() return users2.4数据库操作
1、查询
核心语句:await db.execute(select(模型类)),返回一个ORM对象
@app.get("/users") #依赖注入 async def get_users_list_all(db: AsyncSession = Depends(get_database)): res = await db.execute(select(User)) users = res.scalars().all() #查询所有 return users @app.get("/user_first") async def get_user_first(db:AsyncSession=Depends(get_database)): res=await db.execute(select(User)) user_first=res.scalars().first() return user_first @app.get("/get_user") async def get_user(db:AsyncSession=Depends(get_database)): res=await db.execute(select(User)) get_u=await db.get(User,5)#根据主键获取单条数据 return get_u2、查询条件
核心语句:select(User).where(条件,条件2...)
条件:
- 比较判断:==、>、<、>=、<=
- 模糊查询:like()
- 或与非查询:&、|、~
- 包含查询:in_()
比较判断:
# orm条件查询 @app.get("/users/{user_id}") async def get_user_id(user_id:int,db:AsyncSession=Depends(get_database)): res=await db.execute(select(User).where(User.id==user_id)) #条件查询:查询参数等于User表中id的的数据 user=res.scalar_one_or_none() #可能查不到,也可能查到,这里要用scalar_one_or_none()方法,注意scalar没有s return user @app.get("/search_user") async def get_search_user(db:AsyncSession=Depends(get_database)): res=await db.execute(select(User).where(User.id<=4)) #查询所有id小于4的用户 users=res.scalars().all() return users模糊查询:
- %:零个、一个或多个字符
- _:单个字符
#模糊查询 @app.get("/search_like_user1") async def get_search_like_user1(db:AsyncSession=Depends(get_database)): res=await db.execute(select(User).where(User.user_name.like("见%"))) #%匹配多个字符 user=res.scalars().all() return user @app.get("/search_like_user2") async def get_search_like_user2(db:AsyncSession=Depends(get_database)): res=await db.execute(select(User).where(User.user_name.like("见_"))) #_匹配单个字符 user=res.scalars().all() return user或与非查询:
#and @app.get("/search_and") async def get_search_and(db:AsyncSession=Depends(get_database)): res=await db.execute(select(User).where(User.user_name.like("%天")&(User.id>=5))) #并且 user=res.scalars().all() return user #or @app.get("/search_or") async def get_search_or(db:AsyncSession=Depends(get_database)): res=await db.execute(select(User).where(User.user_name.like("%天")|(User.id>=5))) #或 user=res.scalars().all() return user #not @app.get("/search_not") async def get_search_not(db:AsyncSession=Depends(get_database)): res=await db.execute(select(User).where(~User.user_name.like("%天"))) #非 user=res.scalars().all() return user包含查询:
@app.get("/search_include") async def get_search_include(db:AsyncSession=Depends(get_database)): id_list=[1,3,4,7,8] res=await db.execute(select(User).where(User.id.in_(id_list))) #包含查询 user=res.scalars().all() return user3、聚合查询
聚合计算:select(func.方法(模型类:属性))
- count:统计行数量
- avg:求平均值
- max:求最大值
- min:求最小值
- sum:求和
@app.get("/get_user") async def get_user(db:AsyncSession=Depends(get_database)): # res=await db.execute(select(func.count(User.id))) #count统计数量 # res = await db.execute(select(func.sum(User.id))) #sum统计总和 # res = await db.execute(select(func.max(User.id))) #max统计最大值 # res = await db.execute(select(func.min(User.id))) #统计最小值 res = await db.execute(select(func.avg(User.id))) #统计平均值 num=res.scalar() return num4、分页查询
分页查询:select(模型类).offset().limit()
- offset括号里填写跳过的记录树
- limit括号里填写返回的记录数
| 当前页码 | 每页数量(limit) | 跳过数量(offest) |
|---|---|---|
| 1 | 10 | 0 |
| 2 | 10 | 10 |
| 3 | 10 | 20 |
| 4 | 10 | 30 |
offset值=(当前页码-1)*每页的数量(limit)
@app.get("/get_user") async def get_user( page:int = 1, page_size:int = 2, db:AsyncSession=Depends(get_database)): skip=(page-1)*page_size st=select(User).offset(skip).limit(page_size) res=await db.execute(st) user=res.scalars().all() return user5、新增数据
核心步骤:定义ORM对象(创建请求体参数)->创建ORM对象->添加对象到事务(add)->commit提交到数据库
#需求:输入用户信息(id,user_name,password),设置请求体参数BaseModel class UserBase(BaseModel): id:int #字段名需要完全一致 user_name:str password:str #新增数据->提交用post请求 @app.post("/add_user") async def add_user(user:UserBase,db:AsyncSession=Depends(get_database)): #创建ORM对象->然后新增数据——>提交这个数据 user_obj=User(**user.__dict__) db.add(user_obj) await db.commit() return user6、更新数据
核心步骤:先查询数据(get/scalars)->属性重新赋值->commit提交到数据库
7、删除数据
核心步骤:先查数据->delete删除数据->commit提交到数据库
#删除用delete @app.delete("/delete_user/{user_id}") async def delete_user(user_id: int,db:AsyncSession=Depends(get_database)): id_db=await db.get(User,user_id) if id_db is None: raise HTTPException(status_code=404,detail="查无此人") #删除图书 await db.delete(id_db) #将修改的数据提交给数据库 await db.commit() return {"msg":"删除成功"}写在最后,学这个好难啊,但我居然看完了,又给自己灌了很多鸡汤:世界上有两条路,一条是错的,一条是难走的路;前路是路,错的路也是路,后路也是路,只有原地不动时没有路
