import re
import socket

import irctokens
import requests

# Loose shape check only ("something@something.something"); not RFC validation.
_EMAIL_RE = re.compile(r"[^@]+@[^@]+\.[^@]+")

# Seconds to wait for Keycloak HTTP calls; requests waits forever by default.
_HTTP_TIMEOUT = 30

# Keycloak HTTP statuses that are reported to the caller as "ssoerr",
# mapped to the message logged for each of them.
_SSO_ERROR_MESSAGES = {
    400: "ERROR: Keycloak indicated that the request is invalid.",
    # unauthorized, usually an issue with the token
    401: "ERROR: Fix your Keycloak API credentials and/or client roles, doh.",
    403: "ERROR: Keycloak indicated that the authorization provided is not enough to access the resource.",
    # not found, usually an issue with the URL
    404: "ERROR: Keycloak indicated that the requested resource does not exist.",
    415: "ERROR: Keycloak indicated that the requested media type is not supported.",
    500: "ERROR: Keycloak indicated that the server could not fullfill the request due to \"some unexpected error \".",
}


def _register_irc_account(username, password):
    """Register *username* with NickServ on the local IRC server.

    Returns None when NickServ confirms the account was created, "433" when
    the nickname is already in use, or "server error" when the server closes
    the connection before confirming.
    """
    d = irctokens.StatefulDecoder()
    e = irctokens.StatefulEncoder()

    # The context manager closes the socket on every return path.
    with socket.socket() as s:
        s.connect(("127.0.0.1", 6667))

        def _send(line):
            # NOTE(review): this echoes every outgoing line to stdout,
            # including the NickServ REGISTER line that carries the password.
            print(f"> {line.format()}")
            e.push(line)
            # send() may write only part of the buffer; pop what was written
            # and retry until nothing is pending.
            while e.pending():
                e.pop(s.send(e.pending()))

        # Register the connection with the server.
        _send(irctokens.build("USER", [username, "0", "*", username]))
        _send(irctokens.build("NICK", [username]))

        while True:
            lines = d.push(s.recv(1024))
            if lines is None:
                # Nothing more is received from the server.
                return "server error"

            for line in lines:
                print(f"< {line.format()}")
                if line.command == "433":
                    # Nickname already in use.
                    return "433"
                elif line.command == "005":
                    # 005 received: pass the NickServ register command.
                    _send(irctokens.build("PRIVMSG", ["NickServ", f"REGISTER {password}"]))

                if line.command == 'NOTICE' and line.params == [username, "Account created"]:
                    # The IRC registration succeeded.
                    _send(irctokens.build("QUIT"))
                    return None


def _register_keycloak_user(username, password, email):
    """Create the matching SSO user in Keycloak.

    Returns "success" on HTTP 201, "409" on a conflict (likely an already
    occupied username or email address) and "ssoerr" for every other outcome,
    including a failed token fetch or a failed HTTP request.
    """
    # Keycloak related settings.
    server = 'http://192.168.0.115:8880'
    realm = 'devel'
    tokenurl = 'http://localhost/kctoken'
    emailverified = False
    firstname = 'Foo'
    lastname = 'Bar'

    try:
        tokendata = requests.get(tokenurl, timeout=_HTTP_TIMEOUT).json()
        token = tokendata['access_token']
    except (requests.RequestException, ValueError, KeyError, TypeError):
        # Without a token the request below cannot be built, so stop here
        # instead of failing later on an undefined name.
        print("ERROR: Keycloak token could not be installed.")
        return "ssoerr"

    url = server + '/auth/admin/realms/' + realm + '/users'
    payload = {
        "firstName": firstname,
        "lastName": lastname,
        "email": email,
        "enabled": "true",
        "username": username,
        "credentials": [
            {"type": "password", "value": password, "temporary": emailverified},
        ],
        "emailVerified": emailverified,
    }

    try:
        response = requests.post(
            url,
            headers={
                'Content-Type': 'application/json',
                'Authorization': 'Bearer ' + token,
            },
            json=payload,
            timeout=_HTTP_TIMEOUT,
        )
    except requests.RequestException as err:
        print("ERROR: Keycloak request failed:", err)
        return "ssoerr"

    status = response.status_code
    print("Keycloak: HTTP Status ", status)
    try:
        print("Keycloak: Response Text: ", response.text)
    except Exception:
        print("Keycloak: No or invalid response text. This is not an error.")
    try:
        print("Keycloak: Response JSON: ", response.json())
    except ValueError:
        print("Keycloak: No or invalid response JSON. This it not an error.")

    if status == 201:
        print(" SSO User " + username + " created.")
        return "success"

    if status == 409:
        # Likely an already occupied username or email address.
        # TODO: parse the response JSON to tell the user whether the username
        # or the email address is the cause.
        print("ERROR: Keycloak indicated that the resource already exists or \"some other coonflict when processing the request\" occured.")
        return "409"

    message = _SSO_ERROR_MESSAGES.get(status)
    if message is None:
        # Any status not listed above is still a failed SSO registration.
        message = f"ERROR: Keycloak returned unexpected HTTP status {status}."
    print(message)
    return "ssoerr"


def ircregister(username, password, email):
    """Register a user with NickServ on IRC, then create the SSO user in Keycloak.

    Args:
        username: Nickname to register; also used as the Keycloak username.
        password: Password used for both the NickServ and Keycloak accounts.
        email: Email address stored on the Keycloak user.

    Returns:
        One of the result codes:
        "success"      - both accounts were created.
        "invalidemail" - *email* does not look like an email address.
        "433"          - the IRC nickname is already in use.
        "server error" - the IRC server closed the connection.
        "409"          - Keycloak reported a conflict.
        "ssoerr"       - any other Keycloak failure.
    """
    # Validate before touching the IRC server, so a malformed address cannot
    # leave a NickServ account behind without a matching SSO user.
    if not _EMAIL_RE.match(email):
        print('Invalid email address supplied.')
        return "invalidemail"

    irc_error = _register_irc_account(username, password)
    if irc_error is not None:
        return irc_error

    # The IRC connection is closed by now; proceed with Keycloak.
    return _register_keycloak_user(username, password, email)