FastAPI – JWTトークンによるログイン認証

前回Userモデルまで作ったので、次はログイン認証を作りたい。前回の記事は以下。

パッケージインストール

pip install python-jose[cryptography] python-multipart

python-jose[cryptgraphy]: JWTに必要

python-multipart: フォームデータを使用するのに必要

ディレクトリ構成

root
 |- auth
 |    |- __init__.py
 |    |- oauth2.py
 |    |- routes.py
 |    |- schemas.py
 |    |- token.py
 |- user
 |    |- __init__.py
 |    |- models.py
 |    |- schemas.py
 |    |- routes.py
 |
 |- myfunc
 |    |- __init__.py
 |    |- hash.py
 |
 |- database.py
 |- main.py

token.py

JWTトークンを作ったり、認証したりする関数を作る。

from datetime import datetime, timedelta
from jose import jwt, JWTError
from auth.schemas import TokenData
from sqlalchemy.orm import Session
from user.models import User
from fastapi import HTTPException, status


SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30


def create_access_token(data: dict, expires_delta: timedelta | None = None):
    to_encode = data.copy()
    if expires_delta:
        expire = datetime.utcnow() + expires_delta
    else:
        expire = datetime.utcnow() + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)

    to_encode.update({'exp': expire})
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)

    return encoded_jwt


def verify_token(token: str, credentials_exception, db: Session):
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        email: str = payload.get('sub')
        id: str = payload.get('id')
        if email is None:
            raise credentials_exception

        token_data = TokenData(email=email)

    except JWTError:
        raise credentials_exception

    user = db.query(User).filter(User.id == id).first()
    if not user:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f'User with hte id={id} is not available.'
        )

    return user

SECRET_KEYはopenssl rand -hex 32とかで作る。誰にも知られてはいけない。上記の値はFastAPI公式ページのサンプルのコピペなので使わないように。

schemas.py

JWTトークンの発行や認証をやり取りするためのデータ構造定義。

from pydantic import BaseModel


class Token(BaseModel):
    access_token: str
    token_type: str


class TokenData(BaseModel):
    email: str | None = None

oauth2.py

APIの呼び出しに認証が必要かどうかの判定を持たせるには、Dependsの引数にユーザーがアクティブかどうかの判定をもたせる必要がある。そのための設定を記述するファイルを作成する。

from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from auth.token import verify_token
from sqlalchemy.orm import Session
from database import get_db


oauth2_scheme = OAuth2PasswordBearer(tokenUrl='login')


def get_current_user(token: str = Depends(oauth2_scheme), db: Session = Depends(get_db)):
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail='Could not validate credentials',
        headers={'WWW-Authenticate': 'Bearer'},
    )

    result = verify_token(token, credentials_exception, db)

    return result

routes.py

APIにアクセスするためのrouterを作成する。

from fastapi import APIRouter, Depends, status, HTTPException
from user.models import User
from database import get_db
from myfunc.hash import verify_password
from sqlalchemy.orm import Session
from auth.token import create_access_token
from fastapi.security import OAuth2PasswordRequestForm
from user.schemas import UserShow
import auth.oauth2 as oauth2


router = APIRouter(
    tags=['Auth'],
)


@router.post('/login')
def login(request: OAuth2PasswordRequestForm = Depends(), db: Session = Depends(get_db)):
    user = db.query(User).filter(User.email == request.username).first()

    if not user:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f'Invalid Credentials'
        )

    if not verify_password(request.password, user.password):
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f'Incorrect password'
        )

    access_token = create_access_token(
        data={
            'sub': user.email,
            'id': str(user.id)
        }
    )

    return {'access_token': access_token, 'token_type': 'bearer'}


@router.get('/me', status_code=status.HTTP_200_OK, response_model=UserShow)
def get_me(db: Session = Depends(get_db), current_user: User = Depends(oauth2.get_current_user)):
    id = current_user.id

    user = db.query(User).filter(User.id == id).first()

    if not user:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f'User with hte id={id} is not available.'
        )

    return user

loginOAuth2PasswordRequestFormemail属性を持っていないので、.filter(User.email == request.username)とする。

認証が必要なものについては、get_meのようにcurrent_user: User = Depends(oauth2.get_current_user)を引数にもたせる。

tokenにidをもたせようとしているが、本モデルではiduuidとなっており、そのままではUUID('…') is not JSON serializableとエラーになるため、strでキャストしている。

確認

/docsにアクセスしたらカギマークが出ていて認証が必要なことがわかる。

loginにアクセスるとJWTトークンが発行されていることがわかる。

コメントを残す

メールアドレスが公開されることはありません。

このサイトはスパムを低減するために Akismet を使っています。コメントデータの処理方法の詳細はこちらをご覧ください