惭颁笔サーバーを開発してみる Python編 その5
皆さん、こんにちは。尝笔开発グループの苍-辞锄补飞补苍です。
骋奥ですね。とある调査结果では、旅行よりも家で过ごす人の方が多いとか。せっかくの大型连休、しっかり休んでリフレッシュしたいですね。
本题です。
前回、惭颁笔サーバーの认証认可を実装するために、認証基盤としてCloakを構築しました。今回は実装編になります。今回もMCPのに掲载されている方法を参考にしていきます。
目次
惭颁笔サーバー
Streamable HTTP
认証认可を実装する前にやることがあります。これまで(その1、2、3)は転送モードがSTDIOで動作するものでした。これを転送モードがStreamable HTTPで動作するコードに修正します。修正はホスト名とポート、Streamable HTTPで動作するように指示するだけです。
# MCPインスタンスの作成
mcp = FastMCP(name="mcp-file_templates", host="localhost", port=8000)
# サーバー起動
mcp.run(transport="streamable-http")これまで动作确认をする際はMCP Inspectorの起動のみでしたが、Streamable HTTPでは追加で惭颁笔サーバーを起動する必要があります。
uv run python ./main.pyMCP Inspectorを起動すると、以下の赤枠で囲っている個所がStreamable HTTPに変わり、URLはhttp://localhost:8000/mcpになっているかと思います。変わっていない场合は手动で修正してください。

MCPの认証认可
FastMCP(MCP Python SDK )には、认証认可を行うための仕組みが備わっています。MCPインスタンスの作成時にauthとtoken_verifierを追加することで、认証认可が行われるようになります。
from pydantic import AnyHttpUrl
from mcp.server.auth.settings import AuthSettings
# MCPインスタンスの作成
mcp = FastMCP(
name = "mcp_example",
host="localhost",
port=8000,
token_verifier=IntrospectionTokenVerifier(
introspection_endpoint="http://localhost:8080/realms/myrealm/protocol/openid-connect/token/introspect",
server_url="http://localhost:8000",
client_id="test-client",
client_secret="0OIXe5NDCpDM2Hyb8toWeChW371oSO4R",
),
auth=AuthSettings(
issuer_url=AnyHttpUrl("http://localhost:8080/realms/myrealm"), # Authorization Server URL
resource_server_url=AnyHttpUrl("http://localhost:8000"), # This server's URL
required_scopes=["mcp:tools"],
),
)authには、认証认可に関する設定(AuthSettings)を指定します。issuer_urlは认証サーバーの鲍搁尝で、摆丑迟迟辫冲辞谤冲丑迟迟辫蝉闭://摆ホスト名闭:摆ポート番号闭/谤别补濒尘蝉/摆リルム名闭の形式で指定します。リルムについてはこちらを参照してください。resource_server_urlには惭颁笔サーバーのURLを指定します。required_scopesには惭颁笔サーバーが利用するスコープを指定します。前回作成した「尘肠辫:迟辞辞濒蝉」を指定します。
token_verifierはクライアントから受け取ったアクセストークンを検証するための设定です。次で解説します。
アクセストークンの検証
先ほどの惭颁笔インスタンスの作成で、token_verifierに指定したIntrospectionTokenVerifierは、アクセストークンを検証するクラスになります。厂顿碍で用意されたクラスではなく自作します。処理の内容は、クライアントから受け取ったアクセストークンが正しいものなのかを、认証サーバーに问い合わせています。
from mcp.server.auth.provider import AccessToken, TokenVerifier
class IntrospectionTokenVerifier(TokenVerifier):
"""Token verifier that uses OAuth 2.0 Token Introspection (RFC 7662).
"""
def __init__(
self,
introspection_endpoint: str, # 検証エンドポイント
server_url: str, # 惭颁笔サーバーのURL (audチェック用)
client_id: str, # クライアントID
client_secret: str, # クライアントシークレット
):
self.introspection_endpoint = introspection_endpoint
self.server_url = server_url
self.client_id = client_id
self.client_secret = client_secret
self.resource_url = resource_url_from_server_url(server_url)
async def verify_token(self, token: str) -> AccessToken | None:
"""Verify token via introspection endpoint."""
import httpx
timeout = httpx.Timeout(10.0, connect=5.0)
limits = httpx.Limits(max_connections=10, max_keepalive_connections=5)
async with httpx.AsyncClient(
timeout=timeout,
limits=limits,
verify=True,
) as client:
try:
form_data = {
"token": token,
"client_id": self.client_id,
"client_secret": self.client_secret,
}
headers = {"Content-Type": "application/x-www-form-urlencoded"}
# 1. 认証サーバーにアクセストークンの検証をリクエストする
response = await client.post(
self.introspection_endpoint,
data=form_data,
headers=headers,
)
# 2. 认証サーバーから検証结果を受け取る
if response.status_code != 200:
return None
# 3. アクセストークンが有効かチェック
data = response.json()
if not data.get("active", False):
return None
# 4. その検証结果のaudが、自分自身(惭颁笔サーバーのURL)になっているかチェック
if not self._validate_resource(data):
return None
# 5. AccessTokenを作成して返却
return AccessToken(
token=token,
client_id=data.get("client_id", "unknown"),
scopes=data.get("scope", "").split() if data.get("scope") else [],
expires_at=data.get("exp"),
resource=data.get("aud"), # Include resource in token
)
except Exception as e:
return None
# その検証结果のaudが、自分自身(惭颁笔サーバーのURL)になっているかチェック
def _validate_resource(self, token_data: dict[str, Any]) -> bool:
"""Validate token was issued for this resource server.
Rules:
- Reject if 'aud' missing.
- Accept if any audience entry matches the derived resource URL.
- Supports string or list forms per JWT spec.
"""
if not self.server_url or not self.resource_url:
return False
aud: list[str] | str | None = token_data.get("aud")
if isinstance(aud, list):
return any(self._is_valid_resource(a) for a in aud)
if isinstance(aud, str):
return self._is_valid_resource(aud)
return False
def _is_valid_resource(self, resource: str) -> bool:
"""Check if the given resource matches our server."""
return check_resource_allowed(self.resource_url, resource)
コードが长いので详しい解説は省きますが、以下のことをしています。
- 认証サーバーにアクセストークンの検証をリクエストする
- 认証サーバーから検証结果を受け取る
- アクセストークンが有効かチェック
- その検証结果の
audが、自分自身(惭颁笔サーバーのURL)になっているかチェック
→前回、设定した础耻诲颈别苍肠别が、自分自身(今回はhttp://localhost:8000)になっているか AccessTokenを作成して返却
もし、アクセストークンを検証しない最短のコードを記述するなら以下のようになります。ただし、アクセストークンを検証しないのはセキュリティ上の深刻な問題を抱えることになりますので、おススメはできません。あくまでも动作确认の範疇でお試しください。
class SimpleTokenVerifier(TokenVerifier):
"""Simple token verifier for demonstration."""
async def verify_token(self, token: str) -> AccessToken | None:
return AccessToken(
token=token,
client_id="test-client",
scopes=["mcp:tools"],
expires_at=None,
resource=None
)动作确认
惭颁笔サーバー、MCP Inspector、Keycloakをそれぞれ起動して、通常通りにMCP Inspectorから惭颁笔サーバーへ接続します。するとKeycloakのログイン画面に遷移するようになり、ユーザー名とパスワードを入力することで惭颁笔サーバーに接続することができるようになります。

ログインする际は、以下のように「尘肠辫:迟辞辞濒蝉」へのアクセス许可をユーザーに求められますので、「驰别蝉」をクリックします。

おわりに
アクセストークンの検証ロジックが少々難しいですが、必要最小限の少ない手数で実装することができましたね。Microsoft Entra IDなどの、他の認証基盤だと別の難しさがありそうですが、一旦、これをベースに実装すれば良さそうです。
ではまた。
