基于fastapi实现带token登录注册并限制频繁登录

基于fastapi实现登录注册

此项目主要实现功能有:

  • 登录

  • 注册

  • 登录次数时间限制

  • token验证

运行环境

  • python3.8+
  • mysql Ver 8.0.37 for Win64社区版
  • 以及python中需要的库:
1
2
3
4
5
6
7
8
9
10
fastapi==0.115.0
jose==1.0.0
pandas==2.0.3
passlib==1.7.4
pydantic==2.9.2
python_jose==3.3.0
pytz==2023.3.post1
SQLAlchemy==2.0.30
starlette==0.39.2
uvicorn==0.31.1

功能实现

数据库连接

database.py:数据库连接引擎(配置)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
# 第一步,导入SQLAlchemy库
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
# 第二步,创建数据连接引擎
# engine = create_engine("mysql://数据库账号:密码@数据库地址/数据库名称")
# engine = create_engine("mysql://root:211211@localhost/cxk")
engine = create_engine("mysql+pymysql://root:123456@localhost/cxk")

# 第三步,创建本地会话
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
# 第四步,创建数据模型基类
Base = declarative_base()

此后在main.py创建一个依赖

1
2
3
4
5
6
7
# 数据库连接依赖
def get_db():
db = database.SessionLocal()
try:
yield db
finally:
db.close()

建立数据模型

数据模型是在请求或者连接数据中不可缺少的一个数据整体,保证数据的完整。

pydantic数据模型 (接口使用)

建立一个datamodles.py存放此数据模型

用户信息模型:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# 基础用户信息
class userBase(BaseModel):
username: str
fullname: str
useremail: str
created_at: datetime

class userCreate(userBase):
password: str


# 数据模型,用户,继承自UserBase
class User(userBase):
class Config:
orm_mode = True

登录信息模型:

1
2
3
4
5
6
class LoginInfo(BaseModel):
username: str
login_state: bool
user_ip: str
error_count: int
login_time: str

sqlalchemy数据模型(数据库操作)

建立一个SQLModles.py来存放此数据模型。

pydantic数据模型的数据结构要与sqlalchemy数据模型的一致。

如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
class UserInDB(Base):  # 定义用户表,继承自Base
__tablename__ = "user"
id = Column(Integer, primary_key=True, index=True)
username = Column('username', String(50), unique=True, index=True)
fullname = Column('fullname', String(200), unique=True)
useremail = Column('useremail', String(100))
created_at = Column(DateTime, default=datetime.utcnow)
hashed_password = Column('hashed_password', String(64))

login_infos = relationship("LoginInfo", back_populates="user")


# 登录信息记录表
class LoginInfo(Base):
__tablename__ = "loginInfo"
id = Column(Integer, primary_key=True, index=True) # 添加主键
username = Column(String(50), ForeignKey('user.username'), index=True)
login_state = Column(Boolean, default=False)
user_ip = Column(String(200))
error_count = Column(Integer, default=0)
login_time = Column(DateTime, default=datetime.utcnow)

user = relationship("UserInDB", back_populates="login_infos")

注册实现

注册功能实现简单点就是在数据库的用户信息表增加一条数据。

crud.py存放数据库操作的代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# 创建一个用户
def create_user(db: Session, user: datamodles.userCreate):
# 计算密码的哈希值
hashed_password = get_password_hash(user.password)
db_user = SQLModles.UserInDB(
username=user.username,
fullname=user.fullname,
created_at=user.created_at,
hashed_password=hashed_password,
useremail=user.useremail
)
db.add(db_user)
db.commit()
db.refresh(db_user)
return db_user
其中哈希密码加密和验证可以另外建一个utils.py存放,如下
1
2
3
4
5
6
7
8
9
10
11
12
13
14
# 哈希密码生成
from passlib.context import CryptContext

_pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")


# 验证密码
def verify_password(plain_password, hashed_password):
return _pwd_context.verify(plain_password, hashed_password)


# 生成密码
def get_password_hash(password):
return _pwd_context.hash(password)
注册接口

接口一律都在main.py里完成,或者写在路由表里更好,这里就放在了main里

1
2
3
4
5
6
7
8
9
@app.post("/user-register", response_model=datamodles.User)
def register(user: datamodles.userCreate, db: Session = Depends(get_db)):
dbuser = crud.get_user(db, user.username)
if dbuser:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="用户名已存在" # 非Auth2,无需添加
)
return crud.create_user(db, user)

登录实现

登录其实就是根据提交的账号密码和数据库里面的对比一下,正确就返回token就行了。

这里增加了登录次数限制这个步骤。

crud.py

1
2
3
# 用户查询第一个
def get_user(db: Session, username: str):
return db.query(SQLModles.UserInDB).filter(SQLModles.UserInDB.username == username).first()

下面这里实现的是登录次数限制登录错误的记录

crud.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
# 检查登录次数
# 写的有点鸡肋
def login_verification(db: Session, username, user_ip):
last_info = db.query(SQLModles.LoginInfo).filter(SQLModles.LoginInfo.username == username).order_by(
SQLModles.LoginInfo.id.desc()).first()
if last_info:
if not last_info.login_state:
if last_info.error_count >= 5:
time_difference = datetime.now() - last_info.login_time
total_seconds = time_difference.total_seconds()
if total_seconds <= 300: # 时间差是五分钟
return True
else:
last_info.error_count += 1
last_info.login_time = datetime.now()
db.commit()
return False
else:
last_info.error_count += 1
last_info.login_time = datetime.now()
db.commit()
else:
info = SQLModles.LoginInfo(
username=username,
login_state=False,
user_ip=user_ip,
error_count=1,
login_time=datetime.now()
)
db.add(info)
db.commit()
db.refresh(info)
return False
else:
info = SQLModles.LoginInfo(
username=username,
login_state=False,
user_ip=user_ip,
error_count=1,
login_time=datetime.now()
)
db.add(info)
db.commit()
db.refresh(info)
return False

账号密码的验证:

1
2
3
4
5
6
7
8
9
10
11
12
13
def verification_user(db: Session, username: str, password: str, user_ip: str):
user = get_user(db, username)
if not user:
return False
login_restriction = login_verification(db, username, user_ip)
# 登录限制,True限制
if login_restriction:
return "restriction"
else:
if not verify_password(password, user.hashed_password):
return "no_restriction"
else:
return user

登录成功后记录登录信息:

1
2
3
4
5
6
7
8
9
10
11
12
13
# 登录信息表
def login_info_submit(db: Session, username: str, user_ip: str):
l_info_data = SQLModles.LoginInfo(
username=username,
login_state=1,
user_ip=user_ip,
error_count=0,
login_time=datetime.now()
)
db.add(l_info_data)
db.commit()
db.refresh(l_info_data)
return l_info_data

获取最新的登录成功的信息(上一次登录时间):

1
2
3
4
5
6
def last_login_True(db: Session, username: str):
info = db.query(SQLModles.LoginInfo) \
.filter(SQLModles.LoginInfo.username == username, SQLModles.LoginInfo.login_state == 1) \
.order_by(SQLModles.LoginInfo.id.desc()) \
.first()
return info
获取登录IP

这个在main.py里跟路由表一起:

1
2
3
4
# 获取IP
def getIp(request: rqtIP):
client_host_ip = request.client.host
return client_host_ip
生成token(令牌)

crud.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# 使用命令获取SECRET_KEY:
# openssl rand -hex 32
# 密钥
SECRET_KEY = ""
ALGORITHM = "HS256" # 算法
ACCESS_TOKEN_EXPIRE_MINUTES = 480 # 令牌有效期 单位:分钟


# 创建令牌,将用户名放进令牌
def create_token(data: dict):
to_encode = data.copy()
expires_delta = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
expire = datetime.now() + expires_delta
expire = int(expire.timestamp())
to_encode.update({"exp": f'{str(expire)}'})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt


# 解析令牌,返回用户名
def extract_token(token: str):
"""
如果签名过期了会报错
:param token:
:return:
"""
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
return payload
登录接口

main.py:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
@app.post("/user-login")
async def userLogin(request: rqtIP,
form: OAuth2PasswordRequestForm = Depends(),
db: Session = Depends(get_db)):
user_ip = request.client.host
user = crud.verification_user(db, form.username, form.password,user_ip)
if user == "restriction":
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="请五分钟之后再尝试",
headers={"WWW-Authenticate": "Bearer"},
)
elif user == "no_restriction":
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="用户名或密码无效",
headers={"WWW-Authenticate": "Bearer"},
)
elif not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="用户名或密码无效",
headers={"WWW-Authenticate": "Bearer"},
)

crud.login_info_submit(db, form.username, user_ip)
access_token = crud.create_token(data={"username": user.username}) # 发放令牌
# # 将访问令牌存储在 Cookie 中
response = Response()
response.set_cookie(key="access_token", value=access_token, httponly=True, max_age=3600) # 设置过期时间为 1 小时
return {"access_token": access_token, "token_type": "bearer"} # 返回令牌

html响应

main.py:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# ↓前端页面
app.mount("/static", StaticFiles(directory="web"), name="web")
templates = Jinja2Templates(directory="web")


@app.get("/", response_class=HTMLResponse)
async def index_fist(request: Request):
return templates.TemplateResponse('login.html', {"request": request})


@app.get("/{page}", response_class=HTMLResponse)
async def webpage(request: Request, page: str = 'login'):
# 检查路由路径,返回对应的HTML页面
if page == "":
return templates.TemplateResponse('login.html', {"request": request})
elif page == 'register':
return templates.TemplateResponse('register.html', {"request": request})
elif page == 'login':
return templates.TemplateResponse('login.html', {"request": request})
elif page == 'index':
return templates.TemplateResponse('index.html', {"request": request})
else:
return '兄弟你越界了这里没有你想要的东西'

前端页面

前端页面:登录注册源码

项目地址

fastapi登录注册


基于fastapi实现带token登录注册并限制频繁登录
https://wechatid.github.io/2024/10/12/Fastapi-login/
作者
Imscamd
发布于
2024年10月12日
许可协议