基于fastapi实现登录注册
此项目主要实现功能有:
运行环境
- python3.8+
- mysql Ver 8.0.37 for Win64社区版
- 以及python中需要的库:
1 2 3 4 5 6 7 8 9 10
| fastapi==0.115.0 jose==1.0.0 pandas==2.0.3 passlib==1.7.4 pydantic==2.9.2 python_jose==3.3.0 pytz==2023.3.post1 SQLAlchemy==2.0.30 starlette==0.39.2 uvicorn==0.31.1
|
功能实现
数据库连接
database.py:数据库连接引擎(配置)
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| from sqlalchemy import create_engine from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.orm import sessionmaker
engine = create_engine("mysql+pymysql://root:123456@localhost/cxk")
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()
|
此后在main.py创建一个依赖
1 2 3 4 5 6 7
| def get_db(): db = database.SessionLocal() try: yield db finally: db.close()
|
建立数据模型
数据模型是在请求或者连接数据中不可缺少的一个数据整体,保证数据的完整。
pydantic数据模型 (接口使用)
建立一个datamodles.py
存放此数据模型
用户信息模型:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| class userBase(BaseModel): username: str fullname: str useremail: str created_at: datetime
class userCreate(userBase): password: str
class User(userBase): class Config: orm_mode = True
|
登录信息模型:
1 2 3 4 5 6
| class LoginInfo(BaseModel): username: str login_state: bool user_ip: str error_count: int login_time: str
|
sqlalchemy数据模型(数据库操作)
建立一个SQLModles.py
来存放此数据模型。
pydantic数据模型的数据结构要与sqlalchemy数据模型的一致。
如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| class UserInDB(Base): __tablename__ = "user" id = Column(Integer, primary_key=True, index=True) username = Column('username', String(50), unique=True, index=True) fullname = Column('fullname', String(200), unique=True) useremail = Column('useremail', String(100)) created_at = Column(DateTime, default=datetime.utcnow) hashed_password = Column('hashed_password', String(64))
login_infos = relationship("LoginInfo", back_populates="user")
class LoginInfo(Base): __tablename__ = "loginInfo" id = Column(Integer, primary_key=True, index=True) username = Column(String(50), ForeignKey('user.username'), index=True) login_state = Column(Boolean, default=False) user_ip = Column(String(200)) error_count = Column(Integer, default=0) login_time = Column(DateTime, default=datetime.utcnow)
user = relationship("UserInDB", back_populates="login_infos")
|
注册实现
注册功能实现简单点就是在数据库的用户信息表增加一条数据。
建crud.py
存放数据库操作的代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| def create_user(db: Session, user: datamodles.userCreate): hashed_password = get_password_hash(user.password) db_user = SQLModles.UserInDB( username=user.username, fullname=user.fullname, created_at=user.created_at, hashed_password=hashed_password, useremail=user.useremail ) db.add(db_user) db.commit() db.refresh(db_user) return db_user
|
其中哈希密码加密和验证可以另外建一个utils.py
存放,如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| from passlib.context import CryptContext
_pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
def verify_password(plain_password, hashed_password): return _pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password): return _pwd_context.hash(password)
|
注册接口
接口一律都在main.py
里完成,或者写在路由表里更好,这里就放在了main里
1 2 3 4 5 6 7 8 9
| @app.post("/user-register", response_model=datamodles.User) def register(user: datamodles.userCreate, db: Session = Depends(get_db)): dbuser = crud.get_user(db, user.username) if dbuser: raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="用户名已存在" ) return crud.create_user(db, user)
|
登录实现
登录其实就是根据提交的账号密码和数据库里面的对比一下,正确就返回token就行了。
这里增加了登录次数限制这个步骤。
crud.py
:
1 2 3
| def get_user(db: Session, username: str): return db.query(SQLModles.UserInDB).filter(SQLModles.UserInDB.username == username).first()
|
下面这里实现的是登录次数限制和登录错误的记录:
crud.py
:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45
|
def login_verification(db: Session, username, user_ip): last_info = db.query(SQLModles.LoginInfo).filter(SQLModles.LoginInfo.username == username).order_by( SQLModles.LoginInfo.id.desc()).first() if last_info: if not last_info.login_state: if last_info.error_count >= 5: time_difference = datetime.now() - last_info.login_time total_seconds = time_difference.total_seconds() if total_seconds <= 300: return True else: last_info.error_count += 1 last_info.login_time = datetime.now() db.commit() return False else: last_info.error_count += 1 last_info.login_time = datetime.now() db.commit() else: info = SQLModles.LoginInfo( username=username, login_state=False, user_ip=user_ip, error_count=1, login_time=datetime.now() ) db.add(info) db.commit() db.refresh(info) return False else: info = SQLModles.LoginInfo( username=username, login_state=False, user_ip=user_ip, error_count=1, login_time=datetime.now() ) db.add(info) db.commit() db.refresh(info) return False
|
账号密码的验证:
1 2 3 4 5 6 7 8 9 10 11 12 13
| def verification_user(db: Session, username: str, password: str, user_ip: str): user = get_user(db, username) if not user: return False login_restriction = login_verification(db, username, user_ip) if login_restriction: return "restriction" else: if not verify_password(password, user.hashed_password): return "no_restriction" else: return user
|
登录成功后记录登录信息:
1 2 3 4 5 6 7 8 9 10 11 12 13
| def login_info_submit(db: Session, username: str, user_ip: str): l_info_data = SQLModles.LoginInfo( username=username, login_state=1, user_ip=user_ip, error_count=0, login_time=datetime.now() ) db.add(l_info_data) db.commit() db.refresh(l_info_data) return l_info_data
|
获取最新的登录成功的信息(上一次登录时间):
1 2 3 4 5 6
| def last_login_True(db: Session, username: str): info = db.query(SQLModles.LoginInfo) \ .filter(SQLModles.LoginInfo.username == username, SQLModles.LoginInfo.login_state == 1) \ .order_by(SQLModles.LoginInfo.id.desc()) \ .first() return info
|
获取登录IP
这个在main.py
里跟路由表一起:
1 2 3 4
| def getIp(request: rqtIP): client_host_ip = request.client.host return client_host_ip
|
生成token(令牌)
crud.py
:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
|
SECRET_KEY = "" ALGORITHM = "HS256" ACCESS_TOKEN_EXPIRE_MINUTES = 480
def create_token(data: dict): to_encode = data.copy() expires_delta = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES) expire = datetime.now() + expires_delta expire = int(expire.timestamp()) to_encode.update({"exp": f'{str(expire)}'}) encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) return encoded_jwt
def extract_token(token: str): """ 如果签名过期了会报错 :param token: :return: """ payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) return payload
|
登录接口
main.py
:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| @app.post("/user-login") async def userLogin(request: rqtIP, form: OAuth2PasswordRequestForm = Depends(), db: Session = Depends(get_db)): user_ip = request.client.host user = crud.verification_user(db, form.username, form.password,user_ip) if user == "restriction": raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="请五分钟之后再尝试", headers={"WWW-Authenticate": "Bearer"}, ) elif user == "no_restriction": raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="用户名或密码无效", headers={"WWW-Authenticate": "Bearer"}, ) elif not user: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="用户名或密码无效", headers={"WWW-Authenticate": "Bearer"}, )
crud.login_info_submit(db, form.username, user_ip) access_token = crud.create_token(data={"username": user.username}) response = Response() response.set_cookie(key="access_token", value=access_token, httponly=True, max_age=3600) return {"access_token": access_token, "token_type": "bearer"}
|
html响应
main.py
:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| app.mount("/static", StaticFiles(directory="web"), name="web") templates = Jinja2Templates(directory="web")
@app.get("/", response_class=HTMLResponse) async def index_fist(request: Request): return templates.TemplateResponse('login.html', {"request": request})
@app.get("/{page}", response_class=HTMLResponse) async def webpage(request: Request, page: str = 'login'): if page == "": return templates.TemplateResponse('login.html', {"request": request}) elif page == 'register': return templates.TemplateResponse('register.html', {"request": request}) elif page == 'login': return templates.TemplateResponse('login.html', {"request": request}) elif page == 'index': return templates.TemplateResponse('index.html', {"request": request}) else: return '兄弟你越界了这里没有你想要的东西'
|
前端页面
前端页面:登录注册源码
项目地址
fastapi登录注册