#!/usr/bin/env python3 """ Module pour la gestion de certificats et la communication avec les serveurs HP CDM. Ce module fournit des fonctions pour créer des demandes de signature de certificat (CSR), installer des certificats, et communiquer avec les serveurs HP CDM. Fonctions principales : - `get_bearer` : obtenir un jeton d'accès pour la communication avec le serveur HP CDM - `get_csr` : créer une demande de signature de certificat (CSR) - `install_certificate` : installer un certificat sur un serveur HP CDM - `main` : fonction principale pour exécuter le programme Dépendances : - `argparse` pour la gestion des arguments de ligne de commande - `configparser` pour la lecture des fichiers de configuration - `getpass` pour la saisie de mots de passe - `logging` pour la gestion des messages de log - `requests` pour les requêtes HTTP - `coloredlogs` pour la coloration des messages de log Notes : - Ce module est conçu pour être utilisé dans un environnement de gestion de certificats et de communication avec les serveurs HP CDM. - Les fonctions de ce module peuvent être utilisées séparément pour des tâches spécifiques, ou ensemble pour créer un programme complet de gestion de certificats et de communication avec les serveurs HP CDM. """ import configparser import argparse import logging import sys import json import getpass import random import base64 from cryptography import x509 from cryptography.hazmat.primitives.serialization import Encoding import requests import coloredlogs def create_nonce(length: int=45): """ Generate a random nonce string. This function creates a random string of a specified length, consisting only of valid characters as defined in the URL specification (RFC 7230). The generated string can be used as a nonce, for example in HTTP requests. Code is inspired by main.js sent by printer Args: length (int, optional): The length of the nonce string. Defaults to 45. Returns: str: A random nonce string of the specified length. Notes: - The generated string only contains characters that are valid in URLs, as defined in RFC 7230. - The function uses a random number generator to select characters from the set of valid characters. """ valid_chars = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-._~' nonce = '' for _ in range(0, length): pos = random.randint(0, len(valid_chars)-1) nonce+= valid_chars[pos] return nonce def get_bearer(hostname: str, verify: bool, username: str, password:str): """ Retrieve an OAuth bearer token for authentication with a HP CDM server. This function performs the necessary steps to obtain an OAuth bearer token, including: - Retrieving the base content and redirection to the login URL - Authenticating with the provided username and password - Exchanging the authentication code for an access token Args: hostname (str): The hostname of the HP CDM server. verify (bool): Whether to verify the SSL certificate of the server. username (str): The username to use for authentication. password (str): The password to use for authentication. Returns: str: The OAuth bearer token. Notes: - This function uses the `requests` library to make HTTP requests to the server. - The function logs information and errors using the `logging` module. - If an error occurs during the authentication process, the function exits with a non-zero status code. """ logger = logging.getLogger(__name__) logger.info("Retrieving base content") base_url = f'https://{hostname}' logger.info('Retrieving redirection to login URL.') auth_url = base_url+'/cdm/oauth2/v1/authorize' certificate_management_url = base_url+'/security/certificateManagement/certificates' nonce = create_nonce() params = { 'response_type':'code', 'client_id':'com.hp.cdm.client.hpEws', 'state':nonce, 'redirect_uri':certificate_management_url, 'scope':'com.hp.cdm.auth.alias.deviceRole.deviceAdmin'} try: r = requests.get(auth_url, params=params, allow_redirects=False, verify=verify, timeout=10) except requests.exceptions.RequestException as e: logger.error('Impossible to retrieve URL: %s. Error: %s', auth_url, e) sys.exit(-1) if r.status_code != 302: logger.error('Impossible to retrieve redirection to login URL. Status code: %d', r.status_code) sys.exit(-1) login_url = base_url+r.headers['Location'] logger.debug('UR to login: %s', login_url) logger.info('Loading login page.') r = requests.get(login_url, verify=verify, timeout=10) if r.status_code != 200: logger.error('') sys.exit(-1) # admin_url = base_url+'/cdm/security/v1/deviceAdminConfig' authentication_url = base_url+'/cdm/security/v1/authenticate' # Agent ID is useless payload = { 'username':username, 'password':password, 'client_id':'com.hp.cdm.client.hpEws', 'scope':'com.hp.cdm.auth.alias.deviceRole.deviceAdmin', 'grant_type':'authorization_code' , 'state':nonce } logger.info('Authenticating to receive authentication code.') r = requests.post(authentication_url, data=json.dumps(payload), verify=verify, timeout=10) if r.status_code != 200: logger.error('Impossible to authenticate.') sys.exit(-1) response = json.loads(r.content.decode('utf8')) code = response['code'] logger.debug('Authentication code: %s', code) token_url = base_url+'/cdm/oauth2/v1/token' logger.debug('Token URL: %s', token_url) logger.info('Retrieving authentication bearer.') payload = { 'grant_type': 'authorization_code', 'code': code, 'redirect_uri': base_url, 'client_id': 'com.hp.cdm.client.hpEws' } r = requests.post(token_url, data=payload, verify=verify, timeout=10) if r.status_code != 200: logger.error('Impossible to obtain Oauth bearer.') sys.exit(-1) response = json.loads(r.content.decode('utf8')) bearer = response['access_token'] logger.debug('Bearer: %s', bearer) return bearer def get_csr(hostname: str, verify: bool, username: str, password: str, ou: str, org: str, city:str, state: str, country: str, filename: str): """ Generate a Certificate Signing Request (CSR) for a HP CDM server. This function performs the necessary steps to generate a CSR, including: - Retrieving the base configuration for the CSR - Updating the CSR description with the provided information - Sending the updated CSR description to the server - Waiting for the CSR generation to complete - Saving the generated CSR to a file Args: hostname (str): The hostname of the HP CDM server. verify (bool): Whether to verify the SSL certificate of the server. username (str): The username to use for authentication. password (str): The password to use for authentication. ou (str): The organizational unit. org (str): The organization. city (str): The city. state (str): The state. country (str): The country. filename (str): The filename to save the generated CSR to. Returns: None Notes: - This function uses the `requests` library to make HTTP requests to the server. - The function logs information and errors using the `logging` module. - If an error occurs during the CSR generation process, the function exits with a non-zero status code. """ logger = logging.getLogger(__name__) base_url = f'https://{hostname}' bearer = get_bearer(hostname, verify, username, password) logger.info('Retrieving CSR base configuration.') csr_url = base_url+'/cdm/certificate/v1/certificateSigningRequest' headers = { 'Authorization' : f'Bearer {bearer}'} r = requests.get(csr_url, headers=headers, verify=verify, timeout=10) if r.status_code != 200: logger.error('Impossible to obtain CSR info') sys.exit(-1) csr_description = json.loads(r.content.decode('utf8')) logger.debug('Defaut CSR values received: %s', csr_description) csr_description['state'] = 'processing' csr_description['links'][0]['hints'] = None csr_description['certificateAttributes'] = {'commonName':hostname, 'organization':org, 'organizationalUnit':[ou],'cityOrLocality':city, 'stateOrProvince':state,'countryOrRegion':country} logger.info('Sending CSR updated informations.') r = requests.patch(csr_url, headers=headers, data=json.dumps(csr_description), verify=verify, timeout=10) if r.status_code != 204: logger.error('Impossible to send CSR description') sys.exit(-1) finished = False while not finished: logger.info('Waiting for CSR generation.') r = requests.get(csr_url, headers=headers, verify=verify, timeout=10) if r.status_code != 200: logger.error('Impossible to receive CSR. Status code: %d', r.status_code) sys.exit(-1) csr = json.loads(r.content.decode('utf8')) logger.debug(csr) if csr['state'] == 'idle': finished = True csr = csr['certificateData'] print(csr) with open(filename, 'w+', encoding='utf-8') as f: f.write(csr) def install_certificate(hostname, verify, username, password, filename, bearer=None): """ Install a certificate on a HP CDM server. This function performs the necessary steps to install a certificate, including: - Reading the certificate from a file - Encoding the certificate in base64 - Authenticating with the server using a bearer token - Sending the certificate to the server for installation Args: hostname (str): The hostname of the HP CDM server. verify (bool): Whether to verify the SSL certificate of the server. username (str): The username to use for authentication. password (str): The password to use for authentication. filename (str): The filename of the certificate to install. bearer (str, optional): The access token for authentication. If None, the token is generated automatically. Defaults to None. Returns: None Notes: - This function uses the `requests` library to make HTTP requests to the server. - The function logs information and errors using the `logging` module. - If an error occurs during the certificate installation process, the function exits with a non-zero status code. """ logger = logging.getLogger(__name__) base_url = f'https://{hostname}' if bearer is None: bearer = get_bearer(hostname, verify, username, password) with open(filename, 'rb') as f: data = f.read() certs = x509.load_pem_x509_certificates(data) for cert in certs: print(f'Subject: {cert.subject}. Issuer: {cert.issuer}.') extensions = cert.extensions ca = False for ext in extensions: if isinstance(ext.value, x509.extensions.BasicConstraints): ca = ext.value.ca break cert = cert.public_bytes(Encoding.PEM) cert = base64.b64encode(cert) cert = cert.decode('ascii') logger.debug('Installing certificate: %s. CA: %s', cert, ca) url = base_url+'/cdm/certificate/v1/certificates' # May be cert should be in base 64 # Request type: # installId for identity certificate # importCA for CA for certificate # certificateData: Base 64 encoding of the PEM certificate itself. if ca: request_type = 'importCa' else: request_type = 'installId' certificate = { 'version':'1.1.0', 'requestType':request_type, 'certificateFormat': 'pem', 'certificateData':cert } headers = { 'Authorization': f'Bearer {bearer}' } r = requests.post(url, headers=headers, data=json.dumps(certificate), verify=verify, timeout=10) if r.status_code not in [200, 201]: logger.error('Impossible to install certificate. Status code: %d.', r.status_code) sys.exit(-1) logger.info('Certificate successfully installed.') def get_certificates(hostname, verify, username, password, bearer=None): """ Retrieve a list of certificates from an HP CDM server. This function uses the HP CDM server API to retrieve a list of certificates and returns them as a dictionary. Args: hostname (str): The hostname of the HP CDM server. verify (bool): Verify the SSL certificate validity of the server. username (str): The username for authentication. password (str): The password for authentication. bearer (str, optional): The access token for authentication. If None, the token is generated automatically. Defaults to None. Returns: dict: A dictionary where each key is a certificate index (starting from 1) and each value is the corresponding certificate. Notes: - This function uses the `requests` library to make the certificate retrieval request. - If the certificate retrieval fails, the function logs an error and exits with a non-zero status code. - If no certificates are found, the function logs an error and exits with a non-zero status code. """ logger = logging.getLogger(__name__) base_url = f'https://{hostname}' res = {} if bearer is None: bearer = get_bearer(hostname, verify, username, password) url = base_url+'/cdm/certificate/v1/certificates' headers = { 'Authorization': f'Bearer {bearer}' } r = requests.get(url, headers=headers, verify=verify, timeout=10) if r.status_code != 200: logger.error('Impossible to obtain certificates list') sys.exit(-1) certificates = json.loads(r.content.decode('utf8')) if 'certificates' not in certificates: logger.error('No certificates found') sys.exit(-1) certificates = certificates['certificates'] num = 1 for certificate in certificates: res[num] = certificate num+=1 return res def delete_certificate(hostname, verify, username, password, certificates, certid, bearer=None): """ Delete a certificate from an HP CDM server. This function uses the HP CDM server API to delete a specified certificate. Args: hostname (str): The hostname of the HP CDM server. verify (bool): Verify the SSL certificate validity of the server. username (str): The username for authentication. password (str): The password for authentication. certificates (list): The list of available certificates on the server. certid (int): The index of the certificate to delete in the list of certificates. bearer (str, optional): The access token for authentication. If None, the token is generated automatically. Defaults to None. Returns: None Notes: - This function uses the `requests` library to make the certificate deletion request. - If the certificate deletion fails, the function logs an error and exits with a non-zero status code. Raises: sys.exit(-1) if the certificate deletion fails. """ logger = logging.getLogger(__name__) base_url = f'https://{hostname}' if bearer is None: bearer = get_bearer(hostname, verify, username, password) logger.debug('Certificates: %s', certificates) cert = certificates[certid] certid = cert.get('certificateId') url = base_url+f'/cdm/certificate/v1/certificates/{certid}' headers = { 'Authorization': f'Bearer {bearer}' } r = requests.delete(url, headers=headers, verify=verify, timeout=10) if r.status_code != 200: logger.error('Impossible to delete certificate') sys.exit(-1) def main(): """ Main entry point of the program. This function parses command-line arguments, reads configuration files, and performs the necessary actions based on the provided commands. Args: None (command-line arguments are parsed using argparse) Returns: None (the program exits with a status code) Notes: - The program uses the following commands: - `csr`: create a Certificate Signing Request (CSR) - `pem`: install a certificate - The program uses the following options: - `--debug`: activate debug mode - `--config`: specify a configuration file - `--user`: specify a username - `--password`: specify a password - `--host`: specify a hostname - `--no-tls-verification`: disable TLS verification - The program logs information and errors using the `logging` module. - If an error occurs during the execution of the program, the program exits with a non-zero status code. """ logger = logging.getLogger(__name__) coloredlogs.install() parser = argparse.ArgumentParser() parser.add_argument("-d", "--debug", dest='debug', action='store_true', required=False, help="Activate debug.") parser.add_argument("-c", "--config", dest='config_filename', required=False, default=None, help="Configuration file.") parser.add_argument("-u", "--user", dest='username', required=False, default='admin', help="Username.") parser.add_argument("-p", "--password", dest='password', nargs='?', required=False, default=None, help="Password.") parser.add_argument("-H", "--host", dest='hostname', required=False, default=None, help="Hostname.") parser.add_argument("-n", "--no-tls-verification", dest='verify', default=None, action='store_const', const=False, required=False, help="Verify certificate validity.") subparsers = parser.add_subparsers(dest='command', required=True, help='command help') subparsers.add_parser('list', help='List certificates') parser_delete = subparsers.add_parser('del', help='Delete a certificate') parser_delete.add_argument("-#", "--number", dest='certid', required=True, type=int, help="Certificate number as given by list command.") parser_create_csr = subparsers.add_parser('csr', help='Create CSR') parser_create_csr.add_argument("-C", "--country", dest='country', required=False, default='US', help="Country.") parser_create_csr.add_argument("-s", "--state", dest='state', required=False, help="State.") parser_create_csr.add_argument("-c", "--city", dest='city', required=False, help="State.") parser_create_csr.add_argument("--ou", dest='organizational_unit', required=False, help="Organizational Unit.") parser_create_csr.add_argument("--org", dest='organization', required=False, help="Organization.") parser_create_csr.add_argument("-o", "--output", dest='output', required=False, default=None, help="Output file.") parser_install_certificate = subparsers.add_parser('pem', help='Install certificate') parser_install_certificate.add_argument("-i", "--input", dest='input', required=True, default=None, help="Input file.") args = parser.parse_args() if not hasattr(args, 'country'): args.country = None if not hasattr(args, 'state'): args.state = None if not hasattr(args, 'city'): args.city = None if not hasattr(args, 'organizational_unit'): args.organizational_unit = None if not hasattr(args, 'organization'): args.organization = None logger.info("Arguments: %s", args) if args.config_filename is not None: try: with open(args.config_filename, 'r', encoding='utf-8') as config_file: config = configparser.ConfigParser() config.read_file(config_file) except OSError as e: logger.info('Impossible to open configuration file. Error: %s', e) sys.exit(-1) sections = config.sections() if 'TLS' in sections: options = config.options('TLS') if 'verify' in options and args.verify is None: args.verify = config.getboolean('TLS', 'verify') if 'Login' in sections: options = config.options('Login') if 'user' in options and args.username is None: args.username = config.get('Login','user') if 'password' in options and args.password is None: args.password = config.get('Login','password') if 'host' in options and args.hostname is None: args.hostname = config.get('Login','host') if 'CSR' in sections: options = config.options('CSR') if 'org' in options and args.organization is None : args.organization = config.get('CSR','org') if 'ou' in options and args.organizational_unit is None : args.organizational_unit = config.get('CSR','ou') if 'country' in options and args.country is None: args.country = config.get('CSR','country') if 'state' in options and args.state is None: args.state = config.get('CSR','state') if 'city' in options and args.city is None: args.city = config.get('CSR','city') if args.verify is None: args.verification = False if args.username is None: logger.error('Username is mandatory') parser.print_help() sys.exit(-1) if args.hostname is None: logger.error('Hostname is mandatory') parser.print_help() sys.exit(-1) if args.password is None: args.password = getpass.getpass() if args.debug: logger.info('Setting logging to debug mode') coloredlogs.set_level(level=logging.DEBUG) logger.info("Arguments: %s", args) if args.command == 'csr': if args.output is None: args.output = f'{args.hostname}.csr' logger.info('Final arguments: %s', args) match args.command: case 'list': certs = get_certificates(hostname=args.hostname, verify=args.verify, username=args.username, password=args.password) for certid, cert in certs.items(): subject = cert.get('subject') issuer = cert.get('issuer') print(f'{certid} - {subject} issued by {issuer}.') case 'del': bearer = get_bearer(hostname=args.hostname, verify=args.verify, username=args.username, password=args.password) certs = get_certificates(hostname=args.hostname, verify=args.verify, username=args.username, password=args.password, bearer=bearer) if args.certid not in certs: logger.error('Certificate #%d does not exist', args.certid) sys.exit(-1) delete_certificate(hostname=args.hostname, verify=args.verify, username=args.username, password=args.password, certificates=certs, certid=args.certid, bearer=bearer) case 'csr': get_csr(hostname=args.hostname, verify=args.verify, username=args.username, password=args.password, org=args.organization, ou=args.organizational_unit, city=args.city, state=args.state, country=args.country, filename=args.output) case 'pem': install_certificate(hostname=args.hostname, verify=args.verify, username=args.username, password=args.password, filename=args.input) case _: logger.error('Unknown command: %s', args.command) sys.exit(-1) if __name__ == "__main__": main()