Hi,
I created a custom MCP for ChatGPT. It works when I don’t use authentication; however, I need to implement OAUTH.
It authenticates OK, but when authenticated, it doesn’t recognize the tools’ search and fetch.
Here is my code:
from fastmcp.server import FastMCP
from pydantic import BaseModel
from dotenv import load_dotenv
from atlassian import Jira
from mcp_auth import MCPOAuth2AuthorizationServer, MCPOAuthMiddleware, create_oauth_routes
import re, shlex, os, logging
import logging
import os
load_dotenv()
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class SearchResult(BaseModel):
id: str
title: str
text: str
assignee: str
status: str
created: str
updated: str
priority: str
issue_type: str
class SearchResultPage(BaseModel):
results: list[SearchResult]
class FetchResult(BaseModel):
id: str
title: str
text: str
url: str | None = None
metadata: dict[str, str] | None = None
def get_jira_client():
"""Initialize Jira client with environment variables using Bearer token authentication"""
if not all(
[
os.getenv("JIRA_URL"),
os.getenv("JIRA_API_TOKEN"),
os.getenv("JIRA_PROJECT"),
]
):
raise ValueError(
"Missing Jira configuration. Please set JIRA_URL and JIRA_API_TOKEN"
)
# For Jira Server with API token authentication
return Jira(
url=os.getenv("JIRA_URL"),
token=os.getenv("JIRA_API_TOKEN"),
cloud=False, # This is Jira Server, not Cloud
)
###############################################################################################
jira = get_jira_client()
# Print meaningful Jira client information
print("\n" + "=" * 60)
print("🚀 JIRA CLIENT INFORMATION".center(60))
print("=" * 60)
# Basic connection info
print("\n📡 CONNECTION DETAILS:")
print(f" URL: {jira.url}")
print(f" API Version: {jira.api_version}")
print(f" Authentication: Bearer Token")
print(f" Cloud Mode: {getattr(jira, 'cloud', False)}")
# Server information
print("\n🖥️ SERVER INFORMATION:")
try:
server_info = jira.get_server_info()
print(f" Title: {server_info.get('serverTitle', 'N/A')}")
print(f" Version: {server_info.get('version', 'N/A')}")
print(f" Build Number: {server_info.get('buildNumber', 'N/A')}")
print(f" Deployment: {server_info.get('deploymentType', 'N/A')}")
except Exception as e:
print(f" ❌ Could not get server info: {e}")
# Current user info
print("\n👤 USER INFORMATION:")
try:
user_info = jira.myself()
print(f" Display Name: {user_info.get('displayName', 'N/A')}")
print(f" Email: {user_info.get('emailAddress', 'N/A')}")
print(f" Username: {user_info.get('name', 'N/A')}")
print(f" Active: {'✅ Yes' if user_info.get('active') else '❌ No'}")
print(f" Timezone: {user_info.get('timeZone', 'N/A')}")
except Exception as e:
print(f" ❌ Could not get user info: {e}")
# Project information
print("\n📁 PROJECT ACCESS:")
try:
projects = jira.projects()
print(f" Total Projects: {len(projects)} (access confirmed)")
except Exception as e:
print(f" ❌ Could not get projects: {e}")
print("\n" + "=" * 60)
print("✅ CONNECTION ESTABLISHED SUCCESSFULLY".center(60))
print("=" * 60 + "\n")
###############################################################################################
def create_server():
# Get server URL from environment or use default
server_host = os.getenv("HOST", "0.0.0.0")
server_port = int(os.getenv("PORT", 9000))
base_url = os.getenv("OAUTH_BASE_URL", f"https://localhost:{server_port}")
# Initialize OAuth server
oauth_server = MCPOAuth2AuthorizationServer(base_url)
mcp = FastMCP(name="Jira MCP", instructions="Search and fetch Jira issues")
# Add OAuth authentication as ASGI middleware
def create_oauth_middleware(app):
from starlette.responses import JSONResponse
async def oauth_middleware(scope, receive, send):
if scope["type"] != "http":
await app(scope, receive, send)
return
path = scope["path"].lower() # Convert to lowercase to handle /MCP/ vs /mcp/
# Allow these endpoints without authentication:
allowed_paths = [
"/", "/health", "/mcp", "/MCP",
"/authorize", "/token", "/register",
"/.well-known/oauth-authorization-server"
]
# Allow MCP protocol methods and allowed paths
if (path in [p.lower() for p in allowed_paths] or
"initialize" in path or
"tools/list" in path):
logger.info(f"Allowing unauthenticated access to: {scope['path']}")
await app(scope, receive, send)
return
# Check for Authorization header
headers = dict(scope.get("headers", []))
auth_header = headers.get(b"authorization", b"").decode()
if not auth_header.startswith("Bearer "):
logger.warning(f"Missing OAuth token for protected endpoint: {scope['path']}")
response = JSONResponse(
{"error": "OAuth token required"},
status_code=401
)
await response(scope, receive, send)
return
token = auth_header.split(" ")[1]
# Validate OAuth token
try:
token_info = oauth_server.validate_access_token(token)
if not token_info:
logger.warning(f"Invalid OAuth token: {token[:10]}...")
response = JSONResponse(
{"error": "Invalid OAuth token"},
status_code=401
)
await response(scope, receive, send)
return
logger.info(f"Authenticated tool call with valid OAuth token: {token[:10]}...")
await app(scope, receive, send)
except Exception as e:
logger.error(f"OAuth token validation error: {e}")
response = JSONResponse(
{"error": "Token validation failed"},
status_code=401
)
await response(scope, receive, send)
return oauth_middleware
# Add the middleware to FastMCP
mcp.add_middleware(create_oauth_middleware)
@mcp.tool()
async def search(query: str) -> SearchResultPage:
"""
Always return the last 100 tickets for the configured project, ignoring the user query.
"""
project = os.getenv("JIRA_PROJECT")
jql_query = f'project = "{project}" ORDER BY updated DESC'
limit = 100
logger.info(f"Executing JQL: {jql_query} (limit {limit})")
try:
jira_results = (
jira.enhanced_jql(jql_query, limit=limit)
if getattr(jira, "cloud", False)
else jira.jql(jql_query, limit=limit)
)
except Exception as e:
logger.error(f"Search failed: {e}")
return SearchResultPage(results=[])
# Process results into SearchResult list
results = []
for issue in jira_results.get("issues", []):
fields = issue.get("fields", {})
assignee = (fields.get("assignee") or {}).get("displayName", "Unassigned")
status = (fields.get("status") or {}).get("name", "Unknown")
priority = (fields.get("priority") or {}).get("name", "Unknown")
issue_type = (fields.get("issuetype") or {}).get("name", "Unknown")
results.append(
SearchResult(
id=issue.get("key", ""),
title=f"{issue.get('key', '')}: {fields.get('summary', 'Untitled Issue')}",
text=fields.get("description") or fields.get("summary", ""),
assignee=assignee,
status=status,
created=fields.get("created", ""),
updated=fields.get("updated", ""),
priority=priority,
issue_type=issue_type,
)
)
return SearchResultPage(results=results)
@mcp.tool()
async def fetch(id: str) -> FetchResult:
"""
Fetch a specific Jira issue by key (ID).
Use this after using search, or when you know the issue key (e.g. "ABC-123"),
to retrieve full details including description, status, assignee, etc.
Returns a FetchResult with the issue's details, URL, and metadata.
"""
logger.info(f"Fetching Jira issue: {id}")
try:
issue = jira.issue(id)
fields = issue.get("fields", {})
# Safely extract nested object values
assignee = (fields.get("assignee") or {}).get("displayName", "Unassigned")
status = (fields.get("status") or {}).get("name", "Unknown")
priority = (fields.get("priority") or {}).get("name", "Unknown")
issue_type = (fields.get("issuetype") or {}).get("name", "Unknown")
# Build metadata
metadata = {
"status": status,
"assignee": assignee,
"created": fields.get("created", ""),
"updated": fields.get("updated", ""),
"priority": priority,
"issue_type": issue_type,
}
return FetchResult(
id=issue.get("key", id),
title=f"{issue.get('key', '')}: {fields.get('summary', 'Untitled Issue')}",
text=fields.get("description", "") or fields.get("summary", ""),
url=f"{os.getenv('JIRA_URL')}/browse/{issue.get('key', id)}",
metadata=metadata,
)
except Exception as e:
logger.error(f"Fetch failed for {id}: {e}")
# Return a proper error result instead of raising
return FetchResult(
id=id,
title=f"Issue {id} not found",
text=f"The issue '{id}' does not exist or you don't have permission to view it.",
url=None,
metadata={"error": str(e)},
)
# Store oauth_server reference for later use
mcp._oauth_server = oauth_server
return mcp
if __name__ == "__main__":
port = int(os.getenv("PORT", 9000))
logger.info(f"Starting Jira MCP server on port {port}")
mcp_server = create_server()
# Add OAuth routes to the server
oauth_routes = create_oauth_routes(mcp_server._oauth_server)
# Register metadata endpoint
@mcp_server.custom_route("/.well-known/oauth-authorization-server", methods=["GET"])
async def metadata_endpoint(request):
return await oauth_routes["/.well-known/oauth-authorization-server"](request)
# Register client registration endpoint
@mcp_server.custom_route("/register", methods=["POST"])
async def register_endpoint(request):
return await oauth_routes["/register"](request)
# Register authorization endpoint
@mcp_server.custom_route("/authorize", methods=["GET"])
async def authorize_endpoint(request):
return await oauth_routes["/authorize"](request)
# Register token endpoint
@mcp_server.custom_route("/token", methods=["POST"])
async def token_endpoint(request):
return await oauth_routes["/token"](request)
# Add health check using custom route
@mcp_server.custom_route("/", methods=["GET"])
async def health_check(request):
from starlette.responses import JSONResponse
return JSONResponse({"status": "healthy"})
@mcp_server.custom_route("/health", methods=["GET"])
async def health_check_alt(request):
from starlette.responses import JSONResponse
return JSONResponse({"status": "healthy"})
# Add uppercase MCP route for ChatGPT compatibility
@mcp_server.custom_route("/MCP/", methods=["GET", "POST"])
async def mcp_uppercase_redirect(request):
from starlette.responses import RedirectResponse
# Redirect to lowercase mcp endpoint
return RedirectResponse(url="/mcp/", status_code=307)
mcp_server.run(transport="http", host="0.0.0.0", port=port)
logger.warning(
"Server started counter ------------------------------------------ 001"
)
Auth part
"""
MCP OAuth 2.1 Authorization Server Implementation
Compliant with MCP Authorization specification (Protocol Revision: 2025-03-26)
"""
import os
import time
import secrets
import hashlib
import base64
from typing import Dict, Optional, Any
from urllib.parse import urlencode, urlparse
from authlib.oauth2 import OAuth2Error
from authlib.common.security import generate_token
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.backends import default_backend
from starlette.requests import Request
from starlette.responses import JSONResponse, RedirectResponse
import logging
logger = logging.getLogger(__name__)
# In-memory storage for demo purposes - replace with persistent storage in production
clients_db: Dict[str, Dict[str, Any]] = {}
tokens_db: Dict[str, Dict[str, Any]] = {}
authorization_codes: Dict[str, Dict[str, Any]] = {}
class MCPOAuth2AuthorizationServer:
"""MCP-compliant OAuth 2.1 Authorization Server"""
def __init__(self, base_url: str):
self.base_url = base_url.rstrip('/')
self.issuer = self.base_url
self.secret_key = os.getenv('OAUTH_SECRET_KEY', secrets.token_urlsafe(32))
# Generate RSA key pair for JWT signing
self.private_key = rsa.generate_private_key(
public_exponent=65537,
key_size=2048,
backend=default_backend()
)
self.public_key = self.private_key.public_key()
def get_authorization_server_metadata(self) -> Dict[str, Any]:
"""OAuth 2.0 Authorization Server Metadata (RFC 8414)"""
return {
"issuer": self.issuer,
"authorization_endpoint": f"{self.base_url}/authorize",
"token_endpoint": f"{self.base_url}/token",
"registration_endpoint": f"{self.base_url}/register",
"response_types_supported": ["code"],
"grant_types_supported": ["authorization_code", "refresh_token"],
"code_challenge_methods_supported": ["S256"],
"token_endpoint_auth_methods_supported": ["none"],
"scopes_supported": ["read", "write"],
"subject_types_supported": ["public"]
}
def register_client(self, client_metadata: Dict[str, Any]) -> Dict[str, Any]:
"""Dynamic Client Registration (RFC 7591)"""
client_id = generate_token(24)
# Validate redirect URIs per MCP spec
redirect_uris = client_metadata.get("redirect_uris", [])
for uri in redirect_uris:
parsed = urlparse(uri)
# Must be localhost or HTTPS per MCP security requirements
if not (parsed.scheme == "https" or parsed.hostname == "localhost"):
raise OAuth2Error("invalid_redirect_uri",
"Redirect URIs must be HTTPS or localhost")
client_data = {
"client_id": client_id,
"client_secret": "", # Public client - empty string instead of None
"redirect_uris": redirect_uris,
"grant_types": client_metadata.get("grant_types", ["authorization_code"]),
"response_types": client_metadata.get("response_types", ["code"]),
"token_endpoint_auth_method": "none", # Public client
"created_at": int(time.time())
}
clients_db[client_id] = client_data
logger.info(f"Registered OAuth client: {client_id}")
return client_data
def authorize(self, request: Request) -> RedirectResponse:
"""Authorization endpoint with PKCE support"""
query_params = dict(request.query_params)
client_id = query_params.get("client_id")
redirect_uri = query_params.get("redirect_uri")
state = query_params.get("state")
code_challenge = query_params.get("code_challenge")
code_challenge_method = query_params.get("code_challenge_method", "S256")
# Validate client
if client_id not in clients_db:
raise OAuth2Error("invalid_client", "Unknown client_id")
client = clients_db[client_id]
# Validate redirect URI
if redirect_uri not in client["redirect_uris"]:
raise OAuth2Error("invalid_request", "Invalid redirect_uri")
# PKCE is required per MCP spec
if not code_challenge:
raise OAuth2Error("invalid_request", "code_challenge is required")
if code_challenge_method != "S256":
raise OAuth2Error("invalid_request", "Only S256 code_challenge_method supported")
# Generate authorization code
auth_code = generate_token(32)
# Store authorization code with PKCE data
authorization_codes[auth_code] = {
"client_id": client_id,
"redirect_uri": redirect_uri,
"code_challenge": code_challenge,
"code_challenge_method": code_challenge_method,
"expires_at": int(time.time()) + 600, # 10 minutes
"used": False
}
# For MCP servers, auto-approve the authorization
# In production, you might want to show a consent screen
callback_params = {"code": auth_code}
if state:
callback_params["state"] = state
callback_url = f"{redirect_uri}?{urlencode(callback_params)}"
return RedirectResponse(url=callback_url)
def exchange_code_for_token(self, request_data: Dict[str, Any]) -> Dict[str, Any]:
"""Token endpoint - exchange authorization code for access token"""
grant_type = request_data.get("grant_type")
client_id = request_data.get("client_id")
code = request_data.get("code")
code_verifier = request_data.get("code_verifier")
redirect_uri = request_data.get("redirect_uri")
if grant_type != "authorization_code":
raise OAuth2Error("unsupported_grant_type",
"Only authorization_code grant type supported")
# Validate authorization code
if code not in authorization_codes:
raise OAuth2Error("invalid_grant", "Invalid authorization code")
auth_data = authorization_codes[code]
# Check if code is expired or already used
if auth_data["used"] or int(time.time()) > auth_data["expires_at"]:
raise OAuth2Error("invalid_grant", "Authorization code expired or used")
# Validate client and redirect URI
if (auth_data["client_id"] != client_id or
auth_data["redirect_uri"] != redirect_uri):
raise OAuth2Error("invalid_grant", "Code was issued to another client")
# Validate PKCE code verifier
if not code_verifier:
raise OAuth2Error("invalid_request", "code_verifier is required")
# Verify PKCE challenge
challenge = base64.urlsafe_b64encode(
hashlib.sha256(code_verifier.encode()).digest()
).decode().rstrip('=')
if challenge != auth_data["code_challenge"]:
raise OAuth2Error("invalid_grant", "Invalid code_verifier")
# Mark code as used
auth_data["used"] = True
# Generate access token and refresh token
access_token = generate_token(32)
refresh_token = generate_token(32)
token_data = {
"access_token": access_token,
"token_type": "Bearer",
"expires_in": 3600, # 1 hour
"refresh_token": refresh_token,
"scope": "read write",
"client_id": client_id,
"created_at": int(time.time())
}
# Store token
tokens_db[access_token] = token_data
logger.info(f"Issued access token for client: {client_id}")
return {
"access_token": access_token,
"token_type": "Bearer",
"expires_in": 3600,
"refresh_token": refresh_token,
"scope": "read write"
}
def validate_access_token(self, token: str) -> Optional[Dict[str, Any]]:
"""Validate Bearer access token"""
if token not in tokens_db:
return None
token_data = tokens_db[token]
# Check if token is expired
if int(time.time()) > token_data["created_at"] + token_data["expires_in"]:
del tokens_db[token]
return None
return token_data
# OAuth middleware for protecting MCP endpoints
class MCPOAuthMiddleware:
"""Middleware to protect MCP endpoints with OAuth Bearer tokens"""
def __init__(self, app, oauth_server: MCPOAuth2AuthorizationServer,
protected_paths: list = None):
self.app = app
self.oauth_server = oauth_server
self.protected_paths = protected_paths or ["/search", "/fetch"]
async def __call__(self, scope, receive, send):
if scope["type"] != "http":
await self.app(scope, receive, send)
return
path = scope["path"]
# Check if this path needs protection
if any(path.startswith(protected) for protected in self.protected_paths):
# Extract Authorization header
headers = dict(scope["headers"])
auth_header = headers.get(b"authorization", b"").decode()
if not auth_header.startswith("Bearer "):
# Return 401 to trigger OAuth flow per MCP spec
response = JSONResponse(
{"error": "unauthorized", "error_description": "Bearer token required"},
status_code=401,
headers={"WWW-Authenticate": "Bearer"}
)
await response(scope, receive, send)
return
# Extract and validate token
token = auth_header[7:] # Remove "Bearer " prefix
token_data = self.oauth_server.validate_access_token(token)
if not token_data:
response = JSONResponse(
{"error": "invalid_token", "error_description": "Invalid or expired token"},
status_code=401,
headers={"WWW-Authenticate": "Bearer"}
)
await response(scope, receive, send)
return
# Add token data to scope for use in endpoints
scope["oauth_token"] = token_data
await self.app(scope, receive, send)
def create_oauth_routes(oauth_server: MCPOAuth2AuthorizationServer):
"""Create OAuth endpoint routes"""
def add_cors_headers(response):
"""Add CORS headers for ChatGPT requests"""
response.headers.update({
"Access-Control-Allow-Origin": "*",
"Access-Control-Allow-Methods": "GET, POST, OPTIONS",
"Access-Control-Allow-Headers": "Content-Type, Authorization",
"Access-Control-Max-Age": "86400"
})
return response
async def metadata_endpoint(request: Request):
"""OAuth 2.0 Authorization Server Metadata endpoint"""
response = JSONResponse(oauth_server.get_authorization_server_metadata())
return add_cors_headers(response)
async def register_endpoint(request: Request):
"""Dynamic Client Registration endpoint"""
try:
client_metadata = await request.json()
client_data = oauth_server.register_client(client_metadata)
response = JSONResponse(client_data, status_code=201)
return add_cors_headers(response)
except OAuth2Error as e:
response = JSONResponse(
{"error": e.error, "error_description": e.description},
status_code=400
)
return add_cors_headers(response)
except Exception as e:
logger.error(f"Registration error: {e}")
response = JSONResponse(
{"error": "server_error", "error_description": "Internal server error"},
status_code=500
)
return add_cors_headers(response)
async def authorize_endpoint(request: Request):
"""OAuth 2.1 Authorization endpoint"""
try:
return oauth_server.authorize(request)
except OAuth2Error as e:
# Redirect with error
redirect_uri = request.query_params.get("redirect_uri")
if redirect_uri:
params = {"error": e.error, "error_description": e.description}
state = request.query_params.get("state")
if state:
params["state"] = state
return RedirectResponse(url=f"{redirect_uri}?{urlencode(params)}")
else:
response = JSONResponse(
{"error": e.error, "error_description": e.description},
status_code=400
)
return add_cors_headers(response)
async def token_endpoint(request: Request):
"""OAuth 2.1 Token endpoint"""
try:
# Handle both form data and JSON
if request.headers.get("content-type", "").startswith("application/x-www-form-urlencoded"):
form_data = await request.form()
request_data = dict(form_data)
else:
request_data = await request.json()
token_response = oauth_server.exchange_code_for_token(request_data)
response = JSONResponse(token_response)
return add_cors_headers(response)
except OAuth2Error as e:
response = JSONResponse(
{"error": e.error, "error_description": e.description},
status_code=400
)
return add_cors_headers(response)
except Exception as e:
logger.error(f"Token error: {e}")
response = JSONResponse(
{"error": "server_error", "error_description": "Internal server error"},
status_code=500
)
return add_cors_headers(response)
return {
"/.well-known/oauth-authorization-server": metadata_endpoint,
"/register": register_endpoint,
"/authorize": authorize_endpoint,
"/token": token_endpoint
}```
What am I missing here? There is not much documentation about it.
