From 358f188e54a5929a90377e311ed4d35069f5a586 Mon Sep 17 00:00:00 2001 From: Hasini Samarathunga Date: Fri, 15 May 2026 14:25:28 +0530 Subject: [PATCH 1/6] Add SDK method for switching agent token to an organization --- .../src/asgardeo_ai/agent_auth_manager.py | 37 ++++++++++++++++++- packages/asgardeo/src/asgardeo/auth/client.py | 12 ++++++ 2 files changed, 48 insertions(+), 1 deletion(-) diff --git a/packages/asgardeo-ai/src/asgardeo_ai/agent_auth_manager.py b/packages/asgardeo-ai/src/asgardeo_ai/agent_auth_manager.py index 7af0f48..5b50b31 100644 --- a/packages/asgardeo-ai/src/asgardeo_ai/agent_auth_manager.py +++ b/packages/asgardeo-ai/src/asgardeo_ai/agent_auth_manager.py @@ -1,5 +1,5 @@ """ -Copyright (c) 2025, WSO2 LLC. (https://www.wso2.com). +Copyright (c) 2025-2026, WSO2 LLC. (https://www.wso2.com). WSO2 LLC. licenses this file to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -388,6 +388,41 @@ async def get_obo_token_with_ciba( logger.error(f"CIBA OBO token exchange failed: {e}") raise TokenError(f"CIBA OBO token exchange failed: {e}") + async def switch_token_to_organization( + self, + token: str, + switching_organization: str, + scopes: Optional[List[str]] = None + ) -> OAuthToken: + """Switch token to a sub-organization. + + :param token: The current access token to be switched. + :param switching_organization: The ID or UUID of the target organization. + :param scopes: Optional list of scopes to request. + :return: OAuth token for the switched organization. + """ + if not token: + raise ValidationError("Token is required for organization switch.") + if not switching_organization: + raise ValidationError("switching_organization is required.") + + scope_str = ' '.join(scopes) if scopes else "add" + + try: + switched_token = await self.token_client.get_token( + 'organization_switch', + token=token, + switching_organization=switching_organization, + scope=scope_str + ) + return switched_token + + except (TokenError, ValidationError): + raise + except Exception as e: + logger.error(f"Organization switch failed: {e}") + raise TokenError(f"Organization switch failed: {e}") + async def revoke_token( self, token: str, diff --git a/packages/asgardeo/src/asgardeo/auth/client.py b/packages/asgardeo/src/asgardeo/auth/client.py index 780c9ad..9601f7e 100644 --- a/packages/asgardeo/src/asgardeo/auth/client.py +++ b/packages/asgardeo/src/asgardeo/auth/client.py @@ -327,6 +327,18 @@ async def get_token(self, grant_type: str, **kwargs: Any) -> OAuthToken: scope = kwargs.get("scope") if scope: data["scope"] = scope + elif grant_type == "organization_switch": + token = kwargs.get("token") + switching_organization = kwargs.get("switching_organization") + if not token or not switching_organization: + raise ValidationError( + "token and switching_organization are required for 'organization_switch' grant type.", + ) + data["token"] = token + data["switching_organization"] = switching_organization + scope = kwargs.get("scope") + if scope: + data["scope"] = scope else: raise ValidationError(f"Unsupported grant type: {grant_type}") From 9e63b924973bbf0c128433e13c3b7e8094965c7d Mon Sep 17 00:00:00 2001 From: Hasini Samarathunga Date: Fri, 15 May 2026 14:28:45 +0530 Subject: [PATCH 2/6] Add method for building organization authorization url with organization discovery --- .../src/asgardeo_ai/agent_auth_manager.py | 120 ++++++++++++++++++ 1 file changed, 120 insertions(+) diff --git a/packages/asgardeo-ai/src/asgardeo_ai/agent_auth_manager.py b/packages/asgardeo-ai/src/asgardeo_ai/agent_auth_manager.py index 5b50b31..6823475 100644 --- a/packages/asgardeo-ai/src/asgardeo_ai/agent_auth_manager.py +++ b/packages/asgardeo-ai/src/asgardeo_ai/agent_auth_manager.py @@ -228,6 +228,126 @@ def get_authorization_url_with_pkce( ) return auth_url, state, code_verifier + def get_org_authorization_url( + self, + scopes: List[str], + org_discovery_type: str, + value: str, + state: Optional[str] = None, + resource: Optional[str] = None, + **kwargs: Any, + ) -> Tuple[str, str]: + """Generate authorization URL for organization-specific user authentication. + + :param scopes: List of OAuth scopes to request + :param org_discovery_type: The type of organization discovery ('orgID', 'orgHandle', 'org', 'emailDomain') + :param value: The value for the discovery type + :param state: Optional state parameter (generated if not provided) + :param resource: Optional resource parameter + :param kwargs: Additional parameters for the authorization URL + :return: Tuple of (authorization_url, state) + """ + if not state: + state = generate_state() + + auth_params = { + "client_id": self.config.client_id, + "redirect_uri": self.config.redirect_uri, + "scope": " ".join(scopes), + "state": state, + "response_type": "code", + "fidp": "OrganizationSSO", + } + + # Switch case to handle each discovery type + if org_discovery_type == "orgID": + auth_params["orgId"] = value + elif org_discovery_type == "orgHandle": + auth_params["orgHandle"] = value + elif org_discovery_type == "org": + auth_params["org"] = value + elif org_discovery_type == "emailDomain": + auth_params["login_hint"] = value + auth_params["orgDiscoveryType"] = "emailDomain" + else: + raise ValueError(f"Unsupported org_discovery_type: {org_discovery_type}") + + if resource: + auth_params["resource"] = resource + + if self.agent_config: + auth_params["requested_actor"] = self.agent_config.agent_id + + auth_params.update(kwargs) + + auth_url = build_authorization_url( + f"{self.config.base_url}/oauth2/authorize", + auth_params + ) + return auth_url, state + + def get_org_authorization_url_with_pkce( + self, + scopes: List[str], + org_discovery_type: str, + value: str, + state: Optional[str] = None, + resource: Optional[str] = None, + **kwargs: Any, + ) -> Tuple[str, str, str]: + """Generate authorization URL for organization-specific user authentication with PKCE. + + :param scopes: List of OAuth scopes to request + :param org_discovery_type: The type of organization discovery ('orgID', 'orgHandle', 'org', 'emailDomain') + :param value: The value for the discovery type + :param state: Optional state parameter (generated if not provided) + :param resource: Optional resource parameter + :param kwargs: Additional parameters for the authorization URL + :return: Tuple of (authorization_url, state, code_verifier) + """ + if not state: + state = generate_state() + + code_verifier, code_challenge = generate_pkce_pair() + + auth_params = { + "client_id": self.config.client_id, + "redirect_uri": self.config.redirect_uri, + "scope": " ".join(scopes), + "state": state, + "response_type": "code", + "code_challenge": code_challenge, + "code_challenge_method": "S256", + "fidp": "OrganizationSSO", + } + + # Switch case to handle each discovery type + if org_discovery_type == "orgID": + auth_params["orgId"] = value + elif org_discovery_type == "orgHandle": + auth_params["orgHandle"] = value + elif org_discovery_type == "org": + auth_params["org"] = value + elif org_discovery_type == "emailDomain": + auth_params["login_hint"] = value + auth_params["orgDiscoveryType"] = "emailDomain" + else: + raise ValueError(f"Unsupported org_discovery_type: {org_discovery_type}") + + if resource: + auth_params["resource"] = resource + + if self.agent_config: + auth_params["requested_actor"] = self.agent_config.agent_id + + auth_params.update(kwargs) + + auth_url = build_authorization_url( + f"{self.config.base_url}/oauth2/authorize", + auth_params + ) + return auth_url, state, code_verifier + async def get_obo_token( self, auth_code: str, From a8105abd01087d3a0dc9db8693ce6fd7c697aaa0 Mon Sep 17 00:00:00 2001 From: Hasini Samarathunga Date: Fri, 15 May 2026 15:24:44 +0530 Subject: [PATCH 3/6] Add method to get organization switched agent token directly --- .../src/asgardeo_ai/agent_auth_manager.py | 29 +++++++++++++++++++ packages/asgardeo/src/asgardeo/auth/client.py | 2 +- 2 files changed, 30 insertions(+), 1 deletion(-) diff --git a/packages/asgardeo-ai/src/asgardeo_ai/agent_auth_manager.py b/packages/asgardeo-ai/src/asgardeo_ai/agent_auth_manager.py index 6823475..78c2262 100644 --- a/packages/asgardeo-ai/src/asgardeo_ai/agent_auth_manager.py +++ b/packages/asgardeo-ai/src/asgardeo_ai/agent_auth_manager.py @@ -144,6 +144,35 @@ async def get_agent_token(self, scopes: Optional[List[str]] = None) -> OAuthToke logger.error(f"Agent authentication failed: {e}") raise AuthenticationError(f"Agent authentication failed: {e}") + async def get_organization_agent_token( + self, + switching_organization: str, + agent_scopes: Optional[List[str]] = None, + org_scopes: Optional[List[str]] = None + ) -> OAuthToken: + """Get access token for the AI agent and switch it to a sub-organization. + + :param switching_organization: The ID or UUID of the target organization. + :param agent_scopes: Optional list of OAuth scopes to request for the initial agent token. + :param org_scopes: Optional list of OAuth scopes to request for the switched token. + :return: OAuth token for the switched organization. + """ + if not switching_organization: + raise ValidationError("switching_organization is required.") + + # 1. Get agent token + agent_token = await self.get_agent_token(scopes=agent_scopes) + + if not agent_token or not agent_token.access_token: + raise TokenError("Failed to obtain a valid agent access token.") + + # 2. Switch token to organization + return await self.switch_token_to_organization( + token=agent_token.access_token, + switching_organization=switching_organization, + scopes=org_scopes + ) + def get_authorization_url( self, scopes: List[str], diff --git a/packages/asgardeo/src/asgardeo/auth/client.py b/packages/asgardeo/src/asgardeo/auth/client.py index 9601f7e..0321dda 100644 --- a/packages/asgardeo/src/asgardeo/auth/client.py +++ b/packages/asgardeo/src/asgardeo/auth/client.py @@ -1,6 +1,6 @@ """ -Copyright (c) 2025, WSO2 LLC. (https://www.wso2.com). +Copyright (c) 2025-2026, WSO2 LLC. (https://www.wso2.com). WSO2 LLC. licenses this file to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. From f44640eb4a213953cd3b21b0887710118e9557eb Mon Sep 17 00:00:00 2001 From: Hasini Samarathunga Date: Fri, 15 May 2026 16:40:00 +0530 Subject: [PATCH 4/6] Add review suggestions on code quality --- .../src/asgardeo_ai/agent_auth_manager.py | 143 +++++++++--------- 1 file changed, 74 insertions(+), 69 deletions(-) diff --git a/packages/asgardeo-ai/src/asgardeo_ai/agent_auth_manager.py b/packages/asgardeo-ai/src/asgardeo_ai/agent_auth_manager.py index 78c2262..6a59f4b 100644 --- a/packages/asgardeo-ai/src/asgardeo_ai/agent_auth_manager.py +++ b/packages/asgardeo-ai/src/asgardeo_ai/agent_auth_manager.py @@ -20,7 +20,7 @@ import base64 import os import time -from typing import Callable, Dict, List, Optional, Tuple, Any +from typing import Callable, Dict, List, Literal, Optional, Tuple, Any from urllib.parse import urlencode from dataclasses import dataclass @@ -43,6 +43,8 @@ logger = logging.getLogger(__name__) +OrgDiscoveryType = Literal["orgID", "orgHandle", "org", "emailDomain"] + @dataclass class AgentConfig: @@ -151,22 +153,22 @@ async def get_organization_agent_token( org_scopes: Optional[List[str]] = None ) -> OAuthToken: """Get access token for the AI agent and switch it to a sub-organization. - + :param switching_organization: The ID or UUID of the target organization. - :param agent_scopes: Optional list of OAuth scopes to request for the initial agent token. - :param org_scopes: Optional list of OAuth scopes to request for the switched token. - :return: OAuth token for the switched organization. + :param agent_scopes: Optional list of OAuth2 scopes to request for the initial agent token. + :param org_scopes: Optional list of OAuth2 scopes to request for the switched token. + :return: OAuth2 token for the switched organization. """ if not switching_organization: raise ValidationError("switching_organization is required.") - # 1. Get agent token + # 1. Get agent token. agent_token = await self.get_agent_token(scopes=agent_scopes) - + if not agent_token or not agent_token.access_token: raise TokenError("Failed to obtain a valid agent access token.") - - # 2. Switch token to organization + + # 2. Switch token to organization. return await self.switch_token_to_organization( token=agent_token.access_token, switching_organization=switching_organization, @@ -257,58 +259,66 @@ def get_authorization_url_with_pkce( ) return auth_url, state, code_verifier + def _build_org_discovery_params(self, org_discovery_type: OrgDiscoveryType, discovery_value: str) -> dict: + match org_discovery_type: + case "orgID": + return {"orgId": discovery_value} + case "orgHandle": + return {"orgHandle": discovery_value} + case "org": + return {"org": discovery_value} + case "emailDomain": + return {"login_hint": discovery_value, "orgDiscoveryType": "emailDomain"} + case _: + raise ValidationError(f"Unsupported org_discovery_type: {org_discovery_type}") + def get_org_authorization_url( self, scopes: List[str], - org_discovery_type: str, - value: str, + org_discovery_type: OrgDiscoveryType, + discovery_value: str, state: Optional[str] = None, resource: Optional[str] = None, + isEnhancedOrgAuth: Optional[bool] = False, **kwargs: Any, ) -> Tuple[str, str]: """Generate authorization URL for organization-specific user authentication. - - :param scopes: List of OAuth scopes to request + + :param scopes: List of OAuth2 scopes to request :param org_discovery_type: The type of organization discovery ('orgID', 'orgHandle', 'org', 'emailDomain') - :param value: The value for the discovery type + :param discovery_value: The identifier whose meaning depends on ``org_discovery_type``: + ``"orgID"`` → organization UUID, ``"orgHandle"`` → org handle slug, + ``"org"`` → org name, ``"emailDomain"`` → user email address used as login hint. :param state: Optional state parameter (generated if not provided) :param resource: Optional resource parameter + :param isEnhancedOrgAuth: If true, omits the fidp=OrganizationSSO parameter :param kwargs: Additional parameters for the authorization URL :return: Tuple of (authorization_url, state) """ if not state: state = generate_state() - + auth_params = { "client_id": self.config.client_id, "redirect_uri": self.config.redirect_uri, "scope": " ".join(scopes), "state": state, "response_type": "code", - "fidp": "OrganizationSSO", } - - # Switch case to handle each discovery type - if org_discovery_type == "orgID": - auth_params["orgId"] = value - elif org_discovery_type == "orgHandle": - auth_params["orgHandle"] = value - elif org_discovery_type == "org": - auth_params["org"] = value - elif org_discovery_type == "emailDomain": - auth_params["login_hint"] = value - auth_params["orgDiscoveryType"] = "emailDomain" - else: - raise ValueError(f"Unsupported org_discovery_type: {org_discovery_type}") - + + if not isEnhancedOrgAuth: + auth_params["fidp"] = "OrganizationSSO" + + auth_params.update(self._build_org_discovery_params(org_discovery_type, discovery_value)) + if resource: auth_params["resource"] = resource - + if self.agent_config: auth_params["requested_actor"] = self.agent_config.agent_id - + auth_params.update(kwargs) - + auth_url = build_authorization_url( f"{self.config.base_url}/oauth2/authorize", auth_params @@ -318,27 +328,31 @@ def get_org_authorization_url( def get_org_authorization_url_with_pkce( self, scopes: List[str], - org_discovery_type: str, - value: str, + org_discovery_type: OrgDiscoveryType, + discovery_value: str, state: Optional[str] = None, resource: Optional[str] = None, + isEnhancedOrgAuth: Optional[bool] = False, **kwargs: Any, ) -> Tuple[str, str, str]: """Generate authorization URL for organization-specific user authentication with PKCE. - - :param scopes: List of OAuth scopes to request + + :param scopes: List of OAuth2 scopes to request :param org_discovery_type: The type of organization discovery ('orgID', 'orgHandle', 'org', 'emailDomain') - :param value: The value for the discovery type + :param discovery_value: The identifier whose meaning depends on ``org_discovery_type``: + ``"orgID"`` → organization UUID, ``"orgHandle"`` → org handle slug, + ``"org"`` → org name, ``"emailDomain"`` → user email address used as login hint. :param state: Optional state parameter (generated if not provided) :param resource: Optional resource parameter + :param isEnhancedOrgAuth: If true, omits the fidp=OrganizationSSO parameter :param kwargs: Additional parameters for the authorization URL :return: Tuple of (authorization_url, state, code_verifier) """ if not state: state = generate_state() - code_verifier, code_challenge = generate_pkce_pair() - + code_verifier, code_challenge = generate_pkce_pair() + auth_params = { "client_id": self.config.client_id, "redirect_uri": self.config.redirect_uri, @@ -347,30 +361,21 @@ def get_org_authorization_url_with_pkce( "response_type": "code", "code_challenge": code_challenge, "code_challenge_method": "S256", - "fidp": "OrganizationSSO", } - - # Switch case to handle each discovery type - if org_discovery_type == "orgID": - auth_params["orgId"] = value - elif org_discovery_type == "orgHandle": - auth_params["orgHandle"] = value - elif org_discovery_type == "org": - auth_params["org"] = value - elif org_discovery_type == "emailDomain": - auth_params["login_hint"] = value - auth_params["orgDiscoveryType"] = "emailDomain" - else: - raise ValueError(f"Unsupported org_discovery_type: {org_discovery_type}") - + + if not isEnhancedOrgAuth: + auth_params["fidp"] = "OrganizationSSO" + + auth_params.update(self._build_org_discovery_params(org_discovery_type, discovery_value)) + if resource: auth_params["resource"] = resource - + if self.agent_config: auth_params["requested_actor"] = self.agent_config.agent_id - + auth_params.update(kwargs) - + auth_url = build_authorization_url( f"{self.config.base_url}/oauth2/authorize", auth_params @@ -488,7 +493,7 @@ async def get_obo_token_with_ciba( :param login_hint: Username or identifier of the user to authenticate :param agent_token: The agent's OAuthToken (used as actor_token for delegation) - :param scopes: List of OAuth scopes to request + :param scopes: List of OAuth2 scopes to request :param binding_message: Message displayed to the user during authentication :param notification_channel: Notification channel (email, sms, external) :param timeout: Maximum time to wait for authentication in seconds @@ -534,8 +539,8 @@ async def get_obo_token_with_ciba( except (CIBAAuthenticationError, ValidationError): raise except Exception as e: - logger.error(f"CIBA OBO token exchange failed: {e}") - raise TokenError(f"CIBA OBO token exchange failed: {e}") + logger.error(f"CIBA OBO token exchange failed: {e}", exc_info=True) + raise TokenError(f"CIBA OBO token exchange failed: {e}") from e async def switch_token_to_organization( self, @@ -544,19 +549,19 @@ async def switch_token_to_organization( scopes: Optional[List[str]] = None ) -> OAuthToken: """Switch token to a sub-organization. - + :param token: The current access token to be switched. :param switching_organization: The ID or UUID of the target organization. :param scopes: Optional list of scopes to request. - :return: OAuth token for the switched organization. + :return: OAuth2 token for the switched organization. """ if not token: raise ValidationError("Token is required for organization switch.") if not switching_organization: raise ValidationError("switching_organization is required.") - - scope_str = ' '.join(scopes) if scopes else "add" - + + scope_str = ' '.join(scopes) if scopes else None + try: switched_token = await self.token_client.get_token( 'organization_switch', @@ -565,12 +570,12 @@ async def switch_token_to_organization( scope=scope_str ) return switched_token - + except (TokenError, ValidationError): raise except Exception as e: - logger.error(f"Organization switch failed: {e}") - raise TokenError(f"Organization switch failed: {e}") + logger.error(f"Organization switch failed: {e}", exc_info=True) + raise TokenError(f"Organization switch failed: {e}") from e async def revoke_token( self, From a3c2511a06aca0c3b043a9b5cd32ce9683a45f4d Mon Sep 17 00:00:00 2001 From: Hasini Samarathunga Date: Fri, 15 May 2026 17:09:08 +0530 Subject: [PATCH 5/6] Update documentation for SDK methods for agent sub organization auth --- packages/asgardeo-ai/README.md | 113 +++++++++++++++++++++++++++++---- 1 file changed, 99 insertions(+), 14 deletions(-) diff --git a/packages/asgardeo-ai/README.md b/packages/asgardeo-ai/README.md index 52a8351..0585c1a 100644 --- a/packages/asgardeo-ai/README.md +++ b/packages/asgardeo-ai/README.md @@ -4,14 +4,15 @@ Python SDK for Asgardeo AI agent authentication and on-behalf-of (OBO) token flows. - - ## Features -- **Agent Authentication**: Authenticate AI agents using agent credentials -- **On-Behalf-Of (OBO) Tokens**: Get user tokens on behalf of authenticated agents -- **Async/Await Support**: Full async implementation using httpx -- **Token Management**: Handle token exchange, refresh, and revocation -- **Authorization URLs**: Generate authorization URLs for user authentication flows + +- **Agent Authentication**: Authenticate AI agents using native auth (username/password + PKCE). +- **Organization Token Switching**: Switch tokens into child organization contexts. +- **On-Behalf-Of (OBO) Tokens**: Get user tokens delegated to the agent via auth-code or CIBA. +- **CIBA Push Authentication**: Push auth requests to users' devices and poll until approved. +- **Async/Await Support**: Full async implementation using httpx. +- **Token Management**: Handle token exchange, refresh, and revocation. +- **Authorization URLs**: Generate standard, PKCE, and organization authorization URLs. ## Installation @@ -70,13 +71,69 @@ async def user_auth_flow(): auth_url, state = auth_manager.get_authorization_url(scopes) print(f"Redirect user to: {auth_url}") - + # After user authorizes and you receive the auth code: - # auth_code = "received_from_callback" - + auth_code = "received_from_callback" + agent_token = await auth_manager.get_agent_token(["openid"]) + # Get OBO token for the user obo_token = await auth_manager.get_obo_token(auth_code, scopes, agent_token) - # print(f"User access token: {obo_token.access_token}") + print(f"User access token: {obo_token.access_token}") +``` + +### Child Organization User Authorization Flow + +```python +async def child_org_user_auth_flow(): + async with AgentAuthManager(config, agent_config) as auth_manager: + # 1. Get an agent token for the child organization + org_agent_token = await auth_manager.get_organization_agent_token( + switching_organization="org-uuid-here", + org_scopes=["openid", "internal_org_user_mgt_view"], + ) + + # 2. Build an authorization URL scoped to the child organization + scopes = ["openid", "profile", "email"] + auth_url, state = auth_manager.get_org_authorization_url( + scopes=scopes, + org_discovery_type="orgID", + discovery_value="org-uuid-here", + ) + print(f"Redirect user to: {auth_url}") + + # 3. After user authorizes, receive the auth code from the callback + auth_code = "received_from_callback" + + # 4. Exchange the auth code for an OBO token delegated to the agent + obo_token = await auth_manager.get_obo_token(auth_code, scopes, org_agent_token) + print(f"Org-scoped user access token: {obo_token.access_token}") +``` + +### Organization Agent Authentication + +```python +async def org_agent_flow(): + async with AgentAuthManager(config, agent_config) as auth_manager: + # Authenticate as agent and switch into a child organization in one call. + org_token = await auth_manager.get_organization_agent_token( + switching_organization="org-uuid-here", + org_scopes=["openid", "internal_org_user_mgt_view"], + ) + print(f"Organization token: {org_token.access_token}") +``` + +### Switch Agent Token to Organization + +```python +async def switch_token_flow(): + async with AgentAuthManager(config, agent_config) as auth_manager: + # Switch any existing root access token into a child organization context. + org_token = await auth_manager.switch_token_to_organization( + token="existing_access_token", + switching_organization="org-uuid-here", + scopes=["openid", "internal_org_user_mgt_view"], + ) + print(f"Organization token: {org_token.access_token}") ``` ## API Reference @@ -85,11 +142,39 @@ async def user_auth_flow(): Main class for handling agent authentication and OBO flows. +#### Constructor + +```python +AgentAuthManager( + config: AsgardeoConfig, + agent_config: Optional[AgentConfig] = None, + authorization_timeout: int = 300, +) +``` + #### Methods -- `get_agent_token(scopes: Optional[List[str]] = None) -> OAuthToken`: Get access token for the agent -- `get_authorization_url(scopes: List[str], state: Optional[str] = None) -> Tuple[str, str]`: Generate authorization URL -- `get_obo_token(auth_code: str, agent_token: str, scopes: Optional[List[str]] = None) -> OAuthToken`: Exchange auth code for user token +| Method | Description | Returns | +|--------|-------------|---------| +| `get_agent_token(scopes)` | Authenticates the agent via native auth (username/password + PKCE) | `OAuthToken` | +| `get_organization_agent_token(switching_organization, agent_scopes, org_scopes)` | Gets agent token then switches it to a child-org in one call | `OAuthToken` | +| `get_authorization_url(scopes, state, resource, **kwargs)` | Builds an OAuth2 authorization URL to redirect a user to | `Tuple[str, str]` — `(url, state)` | +| `get_authorization_url_with_pkce(scopes, state, resource, **kwargs)` | Same as above but generates and returns a PKCE `code_verifier` | `Tuple[str, str, str]` — `(url, state, code_verifier)` | +| `get_org_authorization_url(scopes, org_discovery_type, discovery_value, ...)` | Authorization URL with org-discovery params (`orgID`, `orgHandle`, `org`, `emailDomain`) | `Tuple[str, str]` — `(url, state)` | +| `get_org_authorization_url_with_pkce(scopes, org_discovery_type, discovery_value, ...)` | Org authorization URL with PKCE | `Tuple[str, str, str]` — `(url, state, code_verifier)` | +| `get_obo_token(auth_code, agent_token, scopes, code_verifier)` | Exchanges a user's auth code for an OBO token delegated to the agent | `OAuthToken` | +| `get_obo_token_with_ciba(login_hint, agent_token, scopes, ...)` | Push-based OBO — sends auth request to user's device and polls until approved | `Tuple[CIBAResponse, OAuthToken]` | +| `switch_token_to_organization(token, switching_organization, scopes)` | Switches any access token into a child-org scoped token | `OAuthToken` | +| `revoke_token(token, token_type_hint)` | Revokes an access or refresh token | `bool` | + +#### `get_org_authorization_url` / `get_org_authorization_url_with_pkce` — `org_discovery_type` values + +| Value | `discovery_value` meaning | +|-------|--------------------------| +| `"orgID"` | Organization UUID | +| `"orgHandle"` | Organization handle/slug | +| `"org"` | Organization name | +| `"emailDomain"` | User email address (used as `login_hint` with domain-based discovery) | ### AgentConfig From f123650c7eee76649ba340be8e1b242f8a6cdfb2 Mon Sep 17 00:00:00 2001 From: Hasini Samarathunga Date: Fri, 15 May 2026 17:15:53 +0530 Subject: [PATCH 6/6] Add review suggestions on code quality --- .../src/asgardeo_ai/agent_auth_manager.py | 77 ++++++++++++++----- 1 file changed, 59 insertions(+), 18 deletions(-) diff --git a/packages/asgardeo-ai/src/asgardeo_ai/agent_auth_manager.py b/packages/asgardeo-ai/src/asgardeo_ai/agent_auth_manager.py index 6a59f4b..a4030bc 100644 --- a/packages/asgardeo-ai/src/asgardeo_ai/agent_auth_manager.py +++ b/packages/asgardeo-ai/src/asgardeo_ai/agent_auth_manager.py @@ -21,7 +21,7 @@ import os import time from typing import Callable, Dict, List, Literal, Optional, Tuple, Any -from urllib.parse import urlencode +from urllib.parse import unquote from dataclasses import dataclass from asgardeo import ( @@ -45,6 +45,23 @@ OrgDiscoveryType = Literal["orgID", "orgHandle", "org", "emailDomain"] +_RESERVED_AUTH_KEYS = frozenset({ + "client_id", + "redirect_uri", + "scope", + "state", + "response_type", + "resource", + "fidp", + "requested_actor", + "orgId", + "orgHandle", + "org", + "login_hint", + "orgDiscoveryType", + "code_challenge", + "code_challenge_method", +}) @dataclass class AgentConfig: @@ -206,15 +223,20 @@ def get_authorization_url( if self.agent_config: auth_params["requested_actor"] = self.agent_config.agent_id - + + conflicts = _RESERVED_AUTH_KEYS.intersection(kwargs) + if conflicts: + raise ValidationError( + f"Reserved authorization parameters cannot be overridden: {', '.join(sorted(conflicts))}" + ) auth_params.update(kwargs) - + auth_url = build_authorization_url( f"{self.config.base_url}/oauth2/authorize", auth_params ) return auth_url, state - + def get_authorization_url_with_pkce( self, scopes: List[str], @@ -250,25 +272,34 @@ def get_authorization_url_with_pkce( if self.agent_config: auth_params["requested_actor"] = self.agent_config.agent_id - + + conflicts = _RESERVED_AUTH_KEYS.intersection(kwargs) + if conflicts: + raise ValidationError( + f"Reserved authorization parameters cannot be overridden: {', '.join(sorted(conflicts))}" + ) auth_params.update(kwargs) - + auth_url = build_authorization_url( f"{self.config.base_url}/oauth2/authorize", auth_params ) - return auth_url, state, code_verifier + return auth_url, state, code_verifier + + def _build_org_discovery_params(self, org_discovery_type: OrgDiscoveryType, discovery_input: str) -> dict: + discovery_input = unquote(discovery_input.strip()) if discovery_input else "" + if not discovery_input: + raise ValidationError("discovery_input is required.") - def _build_org_discovery_params(self, org_discovery_type: OrgDiscoveryType, discovery_value: str) -> dict: match org_discovery_type: case "orgID": - return {"orgId": discovery_value} + return {"orgId": discovery_input} case "orgHandle": - return {"orgHandle": discovery_value} + return {"orgHandle": discovery_input} case "org": - return {"org": discovery_value} + return {"org": discovery_input} case "emailDomain": - return {"login_hint": discovery_value, "orgDiscoveryType": "emailDomain"} + return {"login_hint": discovery_input, "orgDiscoveryType": "emailDomain"} case _: raise ValidationError(f"Unsupported org_discovery_type: {org_discovery_type}") @@ -276,7 +307,7 @@ def get_org_authorization_url( self, scopes: List[str], org_discovery_type: OrgDiscoveryType, - discovery_value: str, + discovery_input: str, state: Optional[str] = None, resource: Optional[str] = None, isEnhancedOrgAuth: Optional[bool] = False, @@ -286,7 +317,7 @@ def get_org_authorization_url( :param scopes: List of OAuth2 scopes to request :param org_discovery_type: The type of organization discovery ('orgID', 'orgHandle', 'org', 'emailDomain') - :param discovery_value: The identifier whose meaning depends on ``org_discovery_type``: + :param discovery_input: The identifier whose meaning depends on ``org_discovery_type``: ``"orgID"`` → organization UUID, ``"orgHandle"`` → org handle slug, ``"org"`` → org name, ``"emailDomain"`` → user email address used as login hint. :param state: Optional state parameter (generated if not provided) @@ -309,7 +340,7 @@ def get_org_authorization_url( if not isEnhancedOrgAuth: auth_params["fidp"] = "OrganizationSSO" - auth_params.update(self._build_org_discovery_params(org_discovery_type, discovery_value)) + auth_params.update(self._build_org_discovery_params(org_discovery_type, discovery_input)) if resource: auth_params["resource"] = resource @@ -317,6 +348,11 @@ def get_org_authorization_url( if self.agent_config: auth_params["requested_actor"] = self.agent_config.agent_id + conflicts = _RESERVED_AUTH_KEYS.intersection(kwargs) + if conflicts: + raise ValidationError( + f"Reserved authorization parameters cannot be overridden: {', '.join(sorted(conflicts))}" + ) auth_params.update(kwargs) auth_url = build_authorization_url( @@ -329,7 +365,7 @@ def get_org_authorization_url_with_pkce( self, scopes: List[str], org_discovery_type: OrgDiscoveryType, - discovery_value: str, + discovery_input: str, state: Optional[str] = None, resource: Optional[str] = None, isEnhancedOrgAuth: Optional[bool] = False, @@ -339,7 +375,7 @@ def get_org_authorization_url_with_pkce( :param scopes: List of OAuth2 scopes to request :param org_discovery_type: The type of organization discovery ('orgID', 'orgHandle', 'org', 'emailDomain') - :param discovery_value: The identifier whose meaning depends on ``org_discovery_type``: + :param discovery_input: The identifier whose meaning depends on ``org_discovery_type``: ``"orgID"`` → organization UUID, ``"orgHandle"`` → org handle slug, ``"org"`` → org name, ``"emailDomain"`` → user email address used as login hint. :param state: Optional state parameter (generated if not provided) @@ -366,7 +402,7 @@ def get_org_authorization_url_with_pkce( if not isEnhancedOrgAuth: auth_params["fidp"] = "OrganizationSSO" - auth_params.update(self._build_org_discovery_params(org_discovery_type, discovery_value)) + auth_params.update(self._build_org_discovery_params(org_discovery_type, discovery_input)) if resource: auth_params["resource"] = resource @@ -374,6 +410,11 @@ def get_org_authorization_url_with_pkce( if self.agent_config: auth_params["requested_actor"] = self.agent_config.agent_id + conflicts = _RESERVED_AUTH_KEYS.intersection(kwargs) + if conflicts: + raise ValidationError( + f"Reserved authorization parameters cannot be overridden: {', '.join(sorted(conflicts))}" + ) auth_params.update(kwargs) auth_url = build_authorization_url(