"""
MCP SignIn integration for FastMCP.
Provides JWKS-backed RS256 JWT verification and simple auth wiring.
"""
import logging
import jwt
from mcp.server import FastMCP
from mcp.server.auth.provider import TokenVerifier, AccessToken
from mcp.server.auth.settings import AuthSettings
from pydantic import AnyHttpUrl
class JWKSTokenVerifier(TokenVerifier):
"""
Verifies MCP SignIn JWTs using JWKS (RS256).
Validates signature, issuer, audience, and expiration; returns
an `AccessToken` on success or `None` on failure.
"""
def __init__(self, app_id: str, resource: str):
"""
Initialize with MCP SignIn app id and expected audience.
Args:
app_id: MCP SignIn application id; used for issuer and JWKS URL.
resource: Expected audience/resource for issued tokens.
"""
self.app_id = app_id
self.resource = resource
self.jwks_client = jwt.PyJWKClient(f"https://mcpsignin.com/apps/{app_id}/discovery/v2.0/keys")
async def verify_token(self, token: str) -> AccessToken | None:
"""
Validate and decode a JWT from MCP SignIn via JWKS.
Args:
token: JWT to verify.
Returns:
AccessToken on success; None on failure.
"""
key = self.jwks_client.get_signing_key(self.app_id)
try:
# Decode JWT with comprehensive validation
payload = jwt.decode(
token,
key.key,
algorithms=["RS256"], # Only allow RS256
issuer=f"https://mcpsignin.com", # Validate issuer
audience=self.resource # Validate audience
)
if payload.get("token_type") != "access":
logging.warning("Token type mismatch: %s", payload.get("token_type"))
return None
# Extract OAuth2 access token information
return AccessToken(
token=token,
client_id=payload.get("client_id"), # OAuth2 client identifier
scopes=[payload.get("scope")], # Granted permissions
expires_at=payload.get("exp"), # Token expiration timestamp
resource=self.resource, # Target resource/audience
)
# Handle specific JWT validation errors
except jwt.ExpiredSignatureError:
logging.warning("Token has expired")
return None
except jwt.InvalidAudienceError:
logging.warning("Token audience mismatch")
return None
except jwt.InvalidIssuerError:
logging.warning("Token issuer mismatch")
return None
except jwt.InvalidSignatureError as e:
logging.warning("Invalid token signature: %s", e)
return None
except Exception as e:
# Catch-all for any other verification failures
logging.error("Token verification failed: %s", e)
return None
class FastMCPSignInWrapper(FastMCP):
"""
FastMCP server preconfigured for MCP SignIn OAuth2.
Sets issuer/resource in `AuthSettings` and installs `JWKSTokenVerifier`.
"""
def __init__(self, *args, app_id: str, resource: str, **kwargs):
"""
Initialize FastMCP with MCP SignIn auth.
Args:
app_id: MCP SignIn app id.
resource: Expected audience/identifier for this server.
**kwargs: Passed through to `FastMCP`.
"""
super().__init__(
*args,
auth=AuthSettings(
# OAuth2 issuer - your MCP SignIn application
issuer_url=AnyHttpUrl(f"https://mcpsignin.com/{app_id}"),
# Resource server - your MCP server's identifier
resource_server_url=AnyHttpUrl(resource),
),
# JWT token verifier using JWKS
token_verifier=JWKSTokenVerifier(app_id, resource),
**kwargs
)