前回Userモデルまで作ったので、次はログイン認証を作りたい。前回の記事は以下。
目次
パッケージインストール
pip install python-jose[cryptography] python-multipart
python-jose[cryptgraphy]
: JWTに必要
python-multipart
: フォームデータを使用するのに必要
ディレクトリ構成
root
|- auth
| |- __init__.py
| |- oauth2.py
| |- routes.py
| |- schemas.py
| |- token.py
|- user
| |- __init__.py
| |- models.py
| |- schemas.py
| |- routes.py
|
|- myfunc
| |- __init__.py
| |- hash.py
|
|- database.py
|- main.py
token.py
JWTトークンを作ったり、認証したりする関数を作る。
from datetime import datetime, timedelta
from jose import jwt, JWTError
from auth.schemas import TokenData
from sqlalchemy.orm import Session
from user.models import User
from fastapi import HTTPException, status
SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
def create_access_token(data: dict, expires_delta: timedelta | None = None):
to_encode = data.copy()
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
to_encode.update({'exp': expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
def verify_token(token: str, credentials_exception, db: Session):
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
email: str = payload.get('sub')
id: str = payload.get('id')
if email is None:
raise credentials_exception
token_data = TokenData(email=email)
except JWTError:
raise credentials_exception
user = db.query(User).filter(User.id == id).first()
if not user:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f'User with hte id={id} is not available.'
)
return user
SECRET_KEYはopenssl rand -hex 32
とかで作る。誰にも知られてはいけない。上記の値はFastAPI公式ページのサンプルのコピペなので使わないように。
schemas.py
JWTトークンの発行や認証をやり取りするためのデータ構造定義。
from pydantic import BaseModel
class Token(BaseModel):
access_token: str
token_type: str
class TokenData(BaseModel):
email: str | None = None
oauth2.py
APIの呼び出しに認証が必要かどうかの判定を持たせるには、Dependsの引数にユーザーがアクティブかどうかの判定をもたせる必要がある。そのための設定を記述するファイルを作成する。
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from auth.token import verify_token
from sqlalchemy.orm import Session
from database import get_db
oauth2_scheme = OAuth2PasswordBearer(tokenUrl='login')
def get_current_user(token: str = Depends(oauth2_scheme), db: Session = Depends(get_db)):
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail='Could not validate credentials',
headers={'WWW-Authenticate': 'Bearer'},
)
result = verify_token(token, credentials_exception, db)
return result
routes.py
APIにアクセスするためのrouterを作成する。
from fastapi import APIRouter, Depends, status, HTTPException
from user.models import User
from database import get_db
from myfunc.hash import verify_password
from sqlalchemy.orm import Session
from auth.token import create_access_token
from fastapi.security import OAuth2PasswordRequestForm
from user.schemas import UserShow
import auth.oauth2 as oauth2
router = APIRouter(
tags=['Auth'],
)
@router.post('/login')
def login(request: OAuth2PasswordRequestForm = Depends(), db: Session = Depends(get_db)):
user = db.query(User).filter(User.email == request.username).first()
if not user:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f'Invalid Credentials'
)
if not verify_password(request.password, user.password):
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f'Incorrect password'
)
access_token = create_access_token(
data={
'sub': user.email,
'id': str(user.id)
}
)
return {'access_token': access_token, 'token_type': 'bearer'}
@router.get('/me', status_code=status.HTTP_200_OK, response_model=UserShow)
def get_me(db: Session = Depends(get_db), current_user: User = Depends(oauth2.get_current_user)):
id = current_user.id
user = db.query(User).filter(User.id == id).first()
if not user:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f'User with hte id={id} is not available.'
)
return user
login
でOAuth2PasswordRequestForm
はemail
属性を持っていないので、.filter(User.email == request.username)
とする。
認証が必要なものについては、get_me
のようにcurrent_user: User = Depends(oauth2.get_current_user)
を引数にもたせる。
tokenにid
をもたせようとしているが、本モデルではid
がuuid
となっており、そのままではUUID('…') is not JSON serializable
とエラーになるため、str
でキャストしている。
確認
/docs
にアクセスしたらカギマークが出ていて認証が必要なことがわかる。
login
にアクセスるとJWTトークンが発行されていることがわかる。