"""Create test resources on a local WSO2 server through its REST endpoints.

The script can create an LDAP userstore, an identity provider that provisions
into that userstore, and a service provider (application) that uses the
identity provider. Which steps run is selected by editing ``main``.
"""
import asyncio
import base64
import difflib
import logging
import pprint
from dataclasses import asdict, dataclass

import httpx

# Module-level logging setup: verbose for this script, quiet for the HTTP stack.
logging.basicConfig(level=logging.DEBUG)
logging.getLogger("httpcore").setLevel(logging.WARNING)
logging.getLogger("httpx").setLevel(logging.WARNING)

# Common prefix of every endpoint used below.
_API_BASE = "https://localhost:9443/t/carbon.super/api/server/v1"


@dataclass
class UniqueIDReadOnlyLDAPUserStoreManagerProperties:
    """Property set sent when creating the read-only LDAP userstore.

    Each field name is used verbatim as the property ``name`` and the field
    value as the property ``value`` in the request built by
    ``create_keystore``. The defaults are placeholders.
    """

    ConnectionURL: str = "ldap://"
    ConnectionName: str = "uid=,ou="
    ConnectionPassword: str = "password"
    UserSearchBase: str = "ou=Users,dc=wso2,dc=org"
    UserNameAttribute: str = "uid"
    UserNameSearchFilter: str = "(&(objectClass=person)(uid=?))"
    UserNameListFilter: str = "(objectClass=person)"
    UserIDAttribute: str = "uid"
    UserIdSearchFilter: str = "(&(objectClass=person)(uid=?))"
    BackLinksEnabled: bool = True
    MemberOfAttribute: str = "memberOf"
    # Supposedly optional properties (the original author's note doubts that
    # they can really be omitted).
    Disabled: bool = False
    ReadGroups: bool = True
    GroupSearchBase: str = "ou=Groups,dc=wso2,dc=org"
    GroupNameAttribute: str = "cn"
    GroupNameSearchFilter: str = "(&(objectClass=groupOfNames)(cn=?))"
    GroupNameListFilter: str = "(objectClass=groupOfNames)"
    CaseInsensitiveUsername: bool = True
    MembershipAttribute: str = "member"
    # Undocumented property (?)
    UserEntryObjectClass: str = "inetOrgPerson"


def compare_dicts(d1, d2):
    """Log (at INFO level) a line-by-line ``ndiff`` of two pretty-printed objects.

    Args:
        d1: First object; formatted with ``pprint.pformat``.
        d2: Second object; formatted the same way.
    """
    left_lines = pprint.pformat(d1).splitlines()
    right_lines = pprint.pformat(d2).splitlines()
    # Leading newline so the diff starts on its own line in the log output.
    diff = "\n" + "\n".join(difflib.ndiff(left_lines, right_lines))
    logging.info("diff: %s", diff)


def _response_body(resp: httpx.Response):
    """Return the decoded JSON body of *resp*, or its raw text if not JSON."""
    try:
        return resp.json()
    except ValueError:
        # json.JSONDecodeError is a ValueError. An empty or non-JSON body must
        # not turn a completed request into a reported failure.
        return resp.text


async def _post_and_log(client: httpx.AsyncClient, url: str, data: dict) -> None:
    """POST *data* as JSON to *url* and log the request and response at DEBUG."""
    resp = await client.post(url, json=data)
    logging.debug("request: %s", resp.request.content)
    logging.debug("resp [%s]: %s", resp.status_code, _response_body(resp))


async def create_keystore(client: httpx.AsyncClient):
    """Create the ``RemoteLDAP2`` userstore.

    Despite its name, this function posts to the ``userstores`` endpoint.

    Args:
        client: HTTP client used to send the request.
    """
    # The type id is the URL-safe base64 of the manager name, padding stripped.
    type_id = (
        base64.urlsafe_b64encode(b"UniqueIDReadOnlyLDAPUserStoreManager")
        .decode("utf-8")
        .rstrip("=")
    )
    # NOTE(review): bool fields are serialized as JSON booleans, not strings;
    # this may be related to the FIXME below -- confirm against the API schema.
    properties = [
        {"name": key, "value": value}
        for key, value in asdict(
            UniqueIDReadOnlyLDAPUserStoreManagerProperties()
        ).items()
    ]
    data = {
        "typeId": type_id,
        "description": "New user store from API",
        "name": "RemoteLDAP2",
        "properties": properties,
    }
    # FIXME: this works to create the userstore, but the Java side is not
    # very happy with it.
    await _post_and_log(client, f"{_API_BASE}/userstores/", data)


async def create_idp(client: httpx.AsyncClient):
    """Create the ``test_idp`` identity provider with silent JIT provisioning.

    Args:
        client: HTTP client used to send the request.
    """
    data = {
        "name": "test_idp",
        "description": "A test IDP",
        "provisioning": {
            "jit": {
                "isEnabled": True,
                "scheme": "PROVISION_SILENTLY",
                "userstore": "RemoteLDAP2",  # Name of the userstore
            },
            "outboundConnectors": None,
        },
    }
    await _post_and_log(client, f"{_API_BASE}/identity-providers", data)


async def create_sp(client: httpx.AsyncClient, sp_name: str):
    """Create a service provider (application) that references ``test_idp``.

    Args:
        client: HTTP client used to send the request.
        sp_name: Name of the application; also used in its description.
    """
    data = {
        "name": sp_name,
        "description": f"A SP for {sp_name}",
        "accessUrl": "https://example.com/login",
        "inboundProtocolConfiguration": {
            "saml": {},
        },
        "outboundProvisioningIdps": [
            {"idp": "test_idp"},
        ],
    }
    await _post_and_log(client, f"{_API_BASE}/applications", data)


async def main():
    """Run the selected creation steps against the local server.

    Any exception raised by a step is logged with its traceback and not
    re-raised.
    """
    auth = httpx.BasicAuth(username="admin", password="admin")
    # NOTE(review): verify=False disables TLS certificate verification and the
    # credentials are hard-coded; presumably acceptable only for a local
    # development instance -- confirm before pointing this anywhere else.
    async with httpx.AsyncClient(auth=auth, verify=False) as client:
        try:
            # Steps are toggled by (un)commenting; only the SP step is enabled.
            # await create_keystore(client)
            # await create_idp(client)
            await create_sp(client, "portal")
        except Exception:
            logging.exception("Failure during request to WSO2")


if __name__ == "__main__":
    asyncio.run(main())