From a0b0c44afa631e31f165ec8300658566f4ad22fb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fr=C3=A9d=C3=A9ric=20Tronel?= Date: Mon, 5 Aug 2024 18:42:00 +0200 Subject: [PATCH] First almost functional version --- .gitignore | 1 + config.example.ini | 12 ++ refresh-certificate.py | 320 +++++++++++++++++++++++++++++++++++++++++ 3 files changed, 333 insertions(+) create mode 100644 .gitignore create mode 100644 config.example.ini create mode 100755 refresh-certificate.py diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..2fa7ce7 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +config.ini diff --git a/config.example.ini b/config.example.ini new file mode 100644 index 0000000..3612409 --- /dev/null +++ b/config.example.ini @@ -0,0 +1,12 @@ +[Login] +host= +user=admin +password=XXXXXX +[CSR] +country= +state= +city= +org= +ou= +[TLS] +verify=true diff --git a/refresh-certificate.py b/refresh-certificate.py new file mode 100755 index 0000000..c96ea01 --- /dev/null +++ b/refresh-certificate.py @@ -0,0 +1,320 @@ +#!/usr/bin/env python3 + +import configparser +import argparse +import requests +from sys import exit +import logging +import coloredlogs +import json +import re +from io import StringIO +import getpass +import os +import random +import base64 + +def createNonce(length=45): + # Inspired by main.js sent by printer + validChars = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-._~' + nonce = '' + for i in range(0, length): + pos = random.randint(0, len(validChars)-1) + nonce+= validChars[pos] + + return nonce + +def getBearer(hostname, verify, username, password): + logger = logging.getLogger(__name__) + logger.info("Retrieving base content") + + baseUrl = 'https://%s' % hostname + + try: + r = requests.get(baseUrl, verify=verify) + except Exception as e: + logger.error('Exception: %s' % e) + exit(-1) + + if r.status_code != 200: + logger.error('Imposible to retrieve base content') + exit(-1) + + content = StringIO(r.content.decode('utf8')) + + logger.info('Retrieving main javascript path.') + p = re.compile('^.*src="(?P
main[^"]+)".*$') + + found = False + nbLines = 0 + for line in content.readlines(): + nbLines+=1 + m = p.match(line) + if m != None: + found = True + main = m.group('main') + break + + if not found: + logger.error('Impossible to retrieve main path.') + exit(-1) + + mainUrl = baseUrl+'/'+main + logger.debug('Main javascript is located at %s.' % mainUrl) + + r = requests.get(mainUrl, verify=verify) + if r.status_code != 200: + logger.error('Imposible to retrieve main javascript content.') + exit(-1) + + content = StringIO(r.content.decode('utf8')) + + logger.info('Retrieving agent id') + p = re.compile('^.*It\.DEVICE_ADMIN="(?P[0-9a-f\-]+)".*$') + + found = False + nbLines = 0 + for line in content.readlines(): + nbLines+=1 + m = p.match(line) + if m != None: + found = True + agentId = m.group('agentid') + break + + if not found: + logger.error('Impossible to retrieve agent identifier.') + exit(-1) + + logger.debug('Agent identifier: %s' % agentId) + + logger.info('Retrieving redirection to login URL.') + + authUrl = baseUrl+'/cdm/oauth2/v1/authorize' + certificateManagementUrl = baseUrl+'/security/certificateManagement/certificates' + + nonce = createNonce() + params = { 'response_type':'code', 'client_id':'com.hp.cdm.client.hpEws', 'state':nonce, 'redirect_uri':certificateManagementUrl, 'scope':'com.hp.cdm.auth.alias.deviceRole.deviceAdmin'} + + r = requests.get(authUrl, params=params, allow_redirects=False, verify=verify) + if r.status_code != 302: + logger.error('Impossible to retrieve redirection to login URL') + exit(-1) + + + loginUrl = baseUrl+r.headers['Location'] + logger.debug('UR to login: %s' % loginUrl) + + logger.info('Loading login page.') + + r = requests.get(loginUrl, verify=verify) + if r.status_code != 200: + logger.error('') + exit(-1) + + adminUrl = baseUrl+'/cdm/security/v1/deviceAdminConfig' + + authenticationUrl = baseUrl+'/cdm/security/v1/authenticate' + payload = { 'agentId':agentId, 'username':username, 'password':password, 'client_id':'com.hp.cdm.client.hpEws', 'scope':'com.hp.cdm.auth.alias.deviceRole.deviceAdmin', 'grant_type':'authorization_code' , 'state':nonce } + + logger.info('Authenticating to receive authentication code.') + + r = requests.post(authenticationUrl, data=json.dumps(payload), verify=verify) + if r.status_code != 200: + logger.error('Impossible to authenticate.') + exit(-1) + + response = json.loads(r.content.decode('utf8')) + code = response['code'] + logger.debug('Authentication code: %s' % code) + + tokenUrl = baseUrl+'/cdm/oauth2/v1/token' + logger.debug('Token URL: %s' % tokenUrl) + + logger.info('Retrieving authentication bearer.') + + payload = { 'grant_type': 'authorization_code', 'code': code, 'redirect_uri': baseUrl, 'client_id': 'com.hp.cdm.client.hpEws' } + + r = requests.post(tokenUrl, data=payload, verify=verify) + if r.status_code != 200: + logger.error('Impossible to obtain Oauth bearer.') + exit(-1) + + response = json.loads(r.content.decode('utf8')) + bearer = response['access_token'] + logger.debug('Bearer: %s' % bearer) + + return bearer + +def getCSR(hostname, verify, username, password, ou, org, city, state, country, filename): + logger = logging.getLogger(__name__) + baseUrl = 'https://%s' % hostname + + bearer = getBearer(hostname, verify, username, password) + + logger.info('Retrieving CSR base configuration.') + + csrUrl = baseUrl+'/cdm/certificate/v1/certificateSigningRequest' + headers = { 'Authorization' : 'Bearer %s' % bearer} + + r = requests.get(csrUrl, headers=headers, verify=verify) + if r.status_code != 200: + logger.error('Impossible to obtain CSR info') + exit(-1) + + csrDescription = json.loads(r.content.decode('utf8')) + + logger.debug('Defaut CSR values received: %s' % csrDescription) + + csrDescription['state'] = 'processing' + csrDescription['links'][0]['hints'] = None + csrDescription['certificateAttributes'] = {'commonName':hostname, 'organization':org,'organizationalUnit':[ou],'cityOrLocality':city,'stateOrProvince':state,'countryOrRegion':country} + + logger.info('Sending CSR updated informations.') + + r = requests.patch(csrUrl, headers=headers, data=json.dumps(csrDescription), verify=verify) + + if r.status_code != 204: + logger.error('Impossible to send CSR description') + exit(-1) + + finished = False + while not finished: + logger.info('Waiting for CSR generation.') + + r = requests.get(csrUrl, headers=headers, verify=verify) + if r.status_code != 200: + logger.error('Impossible to receive CSR. Status code: %d' % r.status_code) + exit(-1) + + csr = json.loads(r.content.decode('utf8')) + logger.debug(csr) + if csr['state'] == 'idle': + finished = True + + csr = csr['certificateData'] + print(csr) + + with open(filename, 'w+') as f: + f.write(csr) + +def installCertificate(hostname, verify, username, password, filename): + logger = logging.getLogger(__name__) + baseUrl = 'https://%s' % hostname + + with open(filename, 'r') as f: + cert = f.read() + cert = cert.encode('utf8') + cert = base64.b64encode(cert) + cert = cert.decode('ascii') + + logger.info(cert) + + bearer = getBearer(hostname, verify, username, password) + + url = baseUrl+'/cdm/certificate/v1/certificates' + + # May be cert should be in base 64. + certificate = { 'version':'1.0.0', 'requestType':'installId', 'certificateFormat':'pem', 'certificateData':cert } + headers = { 'Authorization': 'Bearer %s' % bearer } + + r = requests.post(url, headers=headers, data=json.dumps(certificate), verify=verify) + if r.status_code != 200: + logger.error('Impossible to install certificate. Status code: %d.' % r.status_code) + exit(-1) + +def main(): + logger = logging.getLogger(__name__) + coloredlogs.install() + + parser = argparse.ArgumentParser() + parser.add_argument("-d", "--debug", dest='debug', action='store_true', required=False, help="Activate debug.") + parser.add_argument("-c", "--config", dest='configFileName', nargs='?', required=False, default=None, help="Configuration file.") + parser.add_argument("-u", "--user", dest='username', nargs='?', required=False, default='admin', help="Username.") + parser.add_argument("-p", "--password", dest='password', nargs='?', required=False, default=None, help="Password.") + parser.add_argument("-H", "--host", dest='hostname', nargs='?', required=False, default=None, help="Hostname.") + parser.add_argument("-v", "--verify", dest='verify', nargs='?', type=bool, required=False, default=True, help="Verify certificate validity.") + + subparsers = parser.add_subparsers(dest='command', required=True, help='command help') + + parserCreateCSR = subparsers.add_parser('csr', help='Create CSR') + parserCreateCSR.add_argument("-C", "--country", dest='country', nargs='?', required=False, default='US', help="Country.") + parserCreateCSR.add_argument("-s", "--state", dest='state', nargs='?', required=False, help="State.") + parserCreateCSR.add_argument("-c", "--city", dest='city', nargs='?', required=False, help="State.") + parserCreateCSR.add_argument("--ou", dest='organizationalUnit', nargs='?', required=False, help="Organizational Unit.") + parserCreateCSR.add_argument("--org", dest='organization', nargs='?', required=False, help="Organization.") + parserCreateCSR.add_argument("-o", "--output", dest='output', nargs='?', required=False, default=None, help="Output file.") + + parserInstalleCertificate = subparsers.add_parser('pem', help='Install certificate') + parserInstalleCertificate.add_argument("-i", "--input", dest='input', nargs='?', required=True, default=None, help="Input file.") + + args = parser.parse_args() + logger.info("Arguments: %s" % args) + + if args.configFileName != None: + try: + configFile = open(args.configFileName, 'r') + except Exception as e: + logger.info('Impossible to open configuration file. Error: %s' % e) + exit(-1) + + config = configparser.ConfigParser() + config.read_file(configFile) + + sections = config.sections() + + if 'TLS' in sections: + options = config.options('TLS') + if 'verify' in options: + args.verify = config.getboolean('TLS', 'verify') + if 'Login' in sections: + options = config.options('Login') + if 'user' in options: + args.username = config.get('Login','user') + if 'password' in options: + args.password = config.get('Login','password') + if 'host' in options: + args.hostname = config.get('Login','host') + if 'CSR' in sections: + options = config.options('CSR') + if 'org' in options: + args.organization = config.get('CSR','org') + if 'ou' in options: + args.organizationalUnit = config.get('CSR','ou') + if 'country' in options: + args.country = config.get('CSR','country') + if 'state' in options: + args.state = config.get('CSR','state') + if 'city' in options: + args.city = config.get('CSR','city') + + logger.info(args) + + if args.password == None: + logger.error('Password is mandatory') + argparse.usage() + if args.username == None: + logger.error('Username is mandatory') + argparse.usage() + if args.hostname == None: + logger.error('Hostname is mandatory') + argparse.usage() + + if args.debug: + logger.info('Setting logging to debug mode') + coloredlogs.set_level(level=logging.DEBUG) + + if args.command == 'csr': + if args.output == None: + args.output = '%s.csr' % args.hostname + + if args.command == 'csr': + getCSR(hostname=args.hostname, verify=args.verify, username=args.username, password=args.password, org=args.organization, ou=args.organizationalUnit, city=args.city, state=args.state, country=args.country, filename=args.output) + elif args.command == 'pem': + installCertificate(hostname=args.hostname, verify=args.verify, username=args.username, password=args.password, filename=args.input) + + + +if __name__ == "__main__": + main() +