Improve linting and add functions to list and remove certificates.
This commit is contained in:
@@ -1,139 +1,250 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
"""
|
||||
Module pour la gestion de certificats et la communication avec les serveurs HP CDM.
|
||||
|
||||
Ce module fournit des fonctions pour créer des demandes de signature de certificat (CSR),
|
||||
installer des certificats, et communiquer avec les serveurs HP CDM.
|
||||
|
||||
Fonctions principales :
|
||||
- `get_bearer` : obtenir un jeton d'accès pour la communication avec le serveur HP CDM
|
||||
- `get_csr` : créer une demande de signature de certificat (CSR)
|
||||
- `install_certificate` : installer un certificat sur un serveur HP CDM
|
||||
- `main` : fonction principale pour exécuter le programme
|
||||
|
||||
Dépendances :
|
||||
- `argparse` pour la gestion des arguments de ligne de commande
|
||||
- `configparser` pour la lecture des fichiers de configuration
|
||||
- `getpass` pour la saisie de mots de passe
|
||||
- `logging` pour la gestion des messages de log
|
||||
- `requests` pour les requêtes HTTP
|
||||
- `coloredlogs` pour la coloration des messages de log
|
||||
|
||||
Notes :
|
||||
- Ce module est conçu pour être utilisé dans un environnement de gestion de certificats et
|
||||
de communication avec les serveurs HP CDM.
|
||||
- Les fonctions de ce module peuvent être utilisées séparément pour des tâches spécifiques,
|
||||
ou ensemble pour créer un programme complet de gestion de certificats et de communication
|
||||
avec les serveurs HP CDM.
|
||||
"""
|
||||
|
||||
|
||||
import configparser
|
||||
import argparse
|
||||
import requests
|
||||
from sys import exit
|
||||
import logging
|
||||
import coloredlogs
|
||||
import sys
|
||||
import json
|
||||
import re
|
||||
from io import StringIO
|
||||
import getpass
|
||||
import os
|
||||
import random
|
||||
import base64
|
||||
from cryptography import x509
|
||||
from cryptography.hazmat.primitives.serialization import Encoding
|
||||
|
||||
def createNonce(length=45):
|
||||
# Inspired by main.js sent by printer
|
||||
validChars = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-._~'
|
||||
import requests
|
||||
import coloredlogs
|
||||
|
||||
def create_nonce(length: int=45):
|
||||
"""
|
||||
Generate a random nonce string.
|
||||
|
||||
This function creates a random string of a specified length, consisting only of valid
|
||||
characters as defined in the URL specification (RFC 7230).
|
||||
The generated string can be used as a nonce, for example in HTTP requests.
|
||||
Code is inspired by main.js sent by printer
|
||||
|
||||
Args:
|
||||
length (int, optional): The length of the nonce string. Defaults to 45.
|
||||
|
||||
Returns:
|
||||
str: A random nonce string of the specified length.
|
||||
|
||||
Notes:
|
||||
- The generated string only contains characters that are valid in URLs, as defined in
|
||||
RFC 7230.
|
||||
- The function uses a random number generator to select characters from the set of valid
|
||||
characters.
|
||||
"""
|
||||
valid_chars = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-._~'
|
||||
nonce = ''
|
||||
for i in range(0, length):
|
||||
pos = random.randint(0, len(validChars)-1)
|
||||
nonce+= validChars[pos]
|
||||
for _ in range(0, length):
|
||||
pos = random.randint(0, len(valid_chars)-1)
|
||||
nonce+= valid_chars[pos]
|
||||
|
||||
return nonce
|
||||
|
||||
def getBearer(hostname, verify, username, password):
|
||||
def get_bearer(hostname: str, verify: bool, username: str, password:str):
|
||||
"""
|
||||
Retrieve an OAuth bearer token for authentication with a HP CDM server.
|
||||
|
||||
This function performs the necessary steps to obtain an OAuth bearer token, including:
|
||||
- Retrieving the base content and redirection to the login URL
|
||||
- Authenticating with the provided username and password
|
||||
- Exchanging the authentication code for an access token
|
||||
|
||||
Args:
|
||||
hostname (str): The hostname of the HP CDM server.
|
||||
verify (bool): Whether to verify the SSL certificate of the server.
|
||||
username (str): The username to use for authentication.
|
||||
password (str): The password to use for authentication.
|
||||
|
||||
Returns:
|
||||
str: The OAuth bearer token.
|
||||
|
||||
Notes:
|
||||
- This function uses the `requests` library to make HTTP requests to the server.
|
||||
- The function logs information and errors using the `logging` module.
|
||||
- If an error occurs during the authentication process, the function exits with a non-zero
|
||||
status code.
|
||||
"""
|
||||
logger = logging.getLogger(__name__)
|
||||
logger.info("Retrieving base content")
|
||||
|
||||
baseUrl = 'https://%s' % hostname
|
||||
base_url = f'https://{hostname}'
|
||||
|
||||
logger.info('Retrieving redirection to login URL.')
|
||||
|
||||
authUrl = baseUrl+'/cdm/oauth2/v1/authorize'
|
||||
certificateManagementUrl = baseUrl+'/security/certificateManagement/certificates'
|
||||
auth_url = base_url+'/cdm/oauth2/v1/authorize'
|
||||
certificate_management_url = base_url+'/security/certificateManagement/certificates'
|
||||
|
||||
nonce = createNonce()
|
||||
params = { 'response_type':'code', 'client_id':'com.hp.cdm.client.hpEws', 'state':nonce, 'redirect_uri':certificateManagementUrl, 'scope':'com.hp.cdm.auth.alias.deviceRole.deviceAdmin'}
|
||||
nonce = create_nonce()
|
||||
params = { 'response_type':'code', 'client_id':'com.hp.cdm.client.hpEws', 'state':nonce,
|
||||
'redirect_uri':certificate_management_url,
|
||||
'scope':'com.hp.cdm.auth.alias.deviceRole.deviceAdmin'}
|
||||
|
||||
try:
|
||||
r = requests.get(authUrl, params=params, allow_redirects=False, verify=verify)
|
||||
except Exception as e:
|
||||
logger.error('Impossible to retrieve URL: %s. Error: %s' % (authUrl, e))
|
||||
exit(-1)
|
||||
r = requests.get(auth_url, params=params, allow_redirects=False, verify=verify, timeout=10)
|
||||
except requests.exceptions.RequestException as e:
|
||||
logger.error('Impossible to retrieve URL: %s. Error: %s', auth_url, e)
|
||||
sys.exit(-1)
|
||||
|
||||
if r.status_code != 302:
|
||||
logger.error('Impossible to retrieve redirection to login URL. Status code: %d' % r.status_code)
|
||||
exit(-1)
|
||||
logger.error('Impossible to retrieve redirection to login URL. Status code: %d',
|
||||
r.status_code)
|
||||
sys.exit(-1)
|
||||
|
||||
|
||||
loginUrl = baseUrl+r.headers['Location']
|
||||
logger.debug('UR to login: %s' % loginUrl)
|
||||
login_url = base_url+r.headers['Location']
|
||||
logger.debug('UR to login: %s', login_url)
|
||||
|
||||
logger.info('Loading login page.')
|
||||
|
||||
r = requests.get(loginUrl, verify=verify)
|
||||
r = requests.get(login_url, verify=verify, timeout=10)
|
||||
if r.status_code != 200:
|
||||
logger.error('')
|
||||
exit(-1)
|
||||
sys.exit(-1)
|
||||
|
||||
adminUrl = baseUrl+'/cdm/security/v1/deviceAdminConfig'
|
||||
# admin_url = base_url+'/cdm/security/v1/deviceAdminConfig'
|
||||
|
||||
authenticationUrl = baseUrl+'/cdm/security/v1/authenticate'
|
||||
authentication_url = base_url+'/cdm/security/v1/authenticate'
|
||||
# Agent ID is useless
|
||||
payload = { 'username':username, 'password':password, 'client_id':'com.hp.cdm.client.hpEws', 'scope':'com.hp.cdm.auth.alias.deviceRole.deviceAdmin', 'grant_type':'authorization_code' , 'state':nonce }
|
||||
payload = { 'username':username, 'password':password, 'client_id':'com.hp.cdm.client.hpEws',
|
||||
'scope':'com.hp.cdm.auth.alias.deviceRole.deviceAdmin',
|
||||
'grant_type':'authorization_code' , 'state':nonce }
|
||||
|
||||
logger.info('Authenticating to receive authentication code.')
|
||||
|
||||
r = requests.post(authenticationUrl, data=json.dumps(payload), verify=verify)
|
||||
r = requests.post(authentication_url, data=json.dumps(payload), verify=verify, timeout=10)
|
||||
if r.status_code != 200:
|
||||
logger.error('Impossible to authenticate.')
|
||||
exit(-1)
|
||||
sys.exit(-1)
|
||||
|
||||
response = json.loads(r.content.decode('utf8'))
|
||||
code = response['code']
|
||||
logger.debug('Authentication code: %s' % code)
|
||||
logger.debug('Authentication code: %s', code)
|
||||
|
||||
tokenUrl = baseUrl+'/cdm/oauth2/v1/token'
|
||||
logger.debug('Token URL: %s' % tokenUrl)
|
||||
token_url = base_url+'/cdm/oauth2/v1/token'
|
||||
logger.debug('Token URL: %s', token_url)
|
||||
|
||||
logger.info('Retrieving authentication bearer.')
|
||||
|
||||
payload = { 'grant_type': 'authorization_code', 'code': code, 'redirect_uri': baseUrl, 'client_id': 'com.hp.cdm.client.hpEws' }
|
||||
payload = { 'grant_type': 'authorization_code', 'code': code, 'redirect_uri': base_url,
|
||||
'client_id': 'com.hp.cdm.client.hpEws' }
|
||||
|
||||
r = requests.post(tokenUrl, data=payload, verify=verify)
|
||||
r = requests.post(token_url, data=payload, verify=verify, timeout=10)
|
||||
if r.status_code != 200:
|
||||
logger.error('Impossible to obtain Oauth bearer.')
|
||||
exit(-1)
|
||||
sys.exit(-1)
|
||||
|
||||
response = json.loads(r.content.decode('utf8'))
|
||||
bearer = response['access_token']
|
||||
logger.debug('Bearer: %s' % bearer)
|
||||
logger.debug('Bearer: %s', bearer)
|
||||
|
||||
return bearer
|
||||
|
||||
def getCSR(hostname, verify, username, password, ou, org, city, state, country, filename):
|
||||
logger = logging.getLogger(__name__)
|
||||
baseUrl = 'https://%s' % hostname
|
||||
def get_csr(hostname: str, verify: bool, username: str, password: str, ou: str, org: str, city:str,
|
||||
state: str, country: str, filename: str):
|
||||
"""
|
||||
Generate a Certificate Signing Request (CSR) for a HP CDM server.
|
||||
|
||||
bearer = getBearer(hostname, verify, username, password)
|
||||
This function performs the necessary steps to generate a CSR, including:
|
||||
- Retrieving the base configuration for the CSR
|
||||
- Updating the CSR description with the provided information
|
||||
- Sending the updated CSR description to the server
|
||||
- Waiting for the CSR generation to complete
|
||||
- Saving the generated CSR to a file
|
||||
|
||||
Args:
|
||||
hostname (str): The hostname of the HP CDM server.
|
||||
verify (bool): Whether to verify the SSL certificate of the server.
|
||||
username (str): The username to use for authentication.
|
||||
password (str): The password to use for authentication.
|
||||
ou (str): The organizational unit.
|
||||
org (str): The organization.
|
||||
city (str): The city.
|
||||
state (str): The state.
|
||||
country (str): The country.
|
||||
filename (str): The filename to save the generated CSR to.
|
||||
|
||||
Returns:
|
||||
None
|
||||
|
||||
Notes:
|
||||
- This function uses the `requests` library to make HTTP requests to the server.
|
||||
- The function logs information and errors using the `logging` module.
|
||||
- If an error occurs during the CSR generation process, the function exits with a non-zero
|
||||
status code.
|
||||
"""
|
||||
logger = logging.getLogger(__name__)
|
||||
base_url = f'https://{hostname}'
|
||||
|
||||
bearer = get_bearer(hostname, verify, username, password)
|
||||
|
||||
logger.info('Retrieving CSR base configuration.')
|
||||
|
||||
csrUrl = baseUrl+'/cdm/certificate/v1/certificateSigningRequest'
|
||||
headers = { 'Authorization' : 'Bearer %s' % bearer}
|
||||
csr_url = base_url+'/cdm/certificate/v1/certificateSigningRequest'
|
||||
headers = { 'Authorization' : f'Bearer {bearer}'}
|
||||
|
||||
r = requests.get(csrUrl, headers=headers, verify=verify)
|
||||
r = requests.get(csr_url, headers=headers, verify=verify, timeout=10)
|
||||
if r.status_code != 200:
|
||||
logger.error('Impossible to obtain CSR info')
|
||||
exit(-1)
|
||||
sys.exit(-1)
|
||||
|
||||
csrDescription = json.loads(r.content.decode('utf8'))
|
||||
csr_description = json.loads(r.content.decode('utf8'))
|
||||
|
||||
logger.debug('Defaut CSR values received: %s' % csrDescription)
|
||||
logger.debug('Defaut CSR values received: %s', csr_description)
|
||||
|
||||
csrDescription['state'] = 'processing'
|
||||
csrDescription['links'][0]['hints'] = None
|
||||
csrDescription['certificateAttributes'] = {'commonName':hostname, 'organization':org,'organizationalUnit':[ou],'cityOrLocality':city,'stateOrProvince':state,'countryOrRegion':country}
|
||||
csr_description['state'] = 'processing'
|
||||
csr_description['links'][0]['hints'] = None
|
||||
csr_description['certificateAttributes'] = {'commonName':hostname, 'organization':org,
|
||||
'organizationalUnit':[ou],'cityOrLocality':city,
|
||||
'stateOrProvince':state,'countryOrRegion':country}
|
||||
|
||||
logger.info('Sending CSR updated informations.')
|
||||
|
||||
r = requests.patch(csrUrl, headers=headers, data=json.dumps(csrDescription), verify=verify)
|
||||
r = requests.patch(csr_url, headers=headers, data=json.dumps(csr_description), verify=verify,
|
||||
timeout=10)
|
||||
|
||||
if r.status_code != 204:
|
||||
logger.error('Impossible to send CSR description')
|
||||
exit(-1)
|
||||
sys.exit(-1)
|
||||
|
||||
finished = False
|
||||
while not finished:
|
||||
logger.info('Waiting for CSR generation.')
|
||||
|
||||
r = requests.get(csrUrl, headers=headers, verify=verify)
|
||||
r = requests.get(csr_url, headers=headers, verify=verify, timeout=10)
|
||||
if r.status_code != 200:
|
||||
logger.error('Impossible to receive CSR. Status code: %d' % r.status_code)
|
||||
exit(-1)
|
||||
logger.error('Impossible to receive CSR. Status code: %d', r.status_code)
|
||||
sys.exit(-1)
|
||||
|
||||
csr = json.loads(r.content.decode('utf8'))
|
||||
logger.debug(csr)
|
||||
@@ -143,127 +254,357 @@ def getCSR(hostname, verify, username, password, ou, org, city, state, country,
|
||||
csr = csr['certificateData']
|
||||
print(csr)
|
||||
|
||||
with open(filename, 'w+') as f:
|
||||
with open(filename, 'w+', encoding='utf-8') as f:
|
||||
f.write(csr)
|
||||
|
||||
def installCertificate(hostname, verify, username, password, filename):
|
||||
def install_certificate(hostname, verify, username, password, filename, bearer=None):
|
||||
"""
|
||||
Install a certificate on a HP CDM server.
|
||||
|
||||
This function performs the necessary steps to install a certificate, including:
|
||||
- Reading the certificate from a file
|
||||
- Encoding the certificate in base64
|
||||
- Authenticating with the server using a bearer token
|
||||
- Sending the certificate to the server for installation
|
||||
|
||||
Args:
|
||||
hostname (str): The hostname of the HP CDM server.
|
||||
verify (bool): Whether to verify the SSL certificate of the server.
|
||||
username (str): The username to use for authentication.
|
||||
password (str): The password to use for authentication.
|
||||
filename (str): The filename of the certificate to install.
|
||||
bearer (str, optional): The access token for authentication. If None, the token is
|
||||
generated automatically. Defaults to None.
|
||||
|
||||
Returns:
|
||||
None
|
||||
|
||||
Notes:
|
||||
- This function uses the `requests` library to make HTTP requests to the server.
|
||||
- The function logs information and errors using the `logging` module.
|
||||
- If an error occurs during the certificate installation process, the function exits with
|
||||
a non-zero status code.
|
||||
"""
|
||||
logger = logging.getLogger(__name__)
|
||||
baseUrl = 'https://%s' % hostname
|
||||
base_url = f'https://{hostname}'
|
||||
|
||||
with open(filename, 'r') as f:
|
||||
cert = f.read()
|
||||
cert = cert.encode('utf8')
|
||||
cert = base64.b64encode(cert)
|
||||
cert = cert.decode('ascii')
|
||||
if bearer is None:
|
||||
bearer = get_bearer(hostname, verify, username, password)
|
||||
|
||||
logger.info(cert)
|
||||
with open(filename, 'rb') as f:
|
||||
data = f.read()
|
||||
certs = x509.load_pem_x509_certificates(data)
|
||||
for cert in certs:
|
||||
print(f'Subject: {cert.subject}. Issuer: {cert.issuer}.')
|
||||
extensions = cert.extensions
|
||||
ca = False
|
||||
for ext in extensions:
|
||||
if isinstance(ext.value, x509.extensions.BasicConstraints):
|
||||
ca = ext.value.ca
|
||||
break
|
||||
cert = cert.public_bytes(Encoding.PEM)
|
||||
cert = base64.b64encode(cert)
|
||||
cert = cert.decode('ascii')
|
||||
|
||||
bearer = getBearer(hostname, verify, username, password)
|
||||
logger.debug('Installing certificate: %s. CA: %s', cert, ca)
|
||||
|
||||
url = baseUrl+'/cdm/certificate/v1/certificates'
|
||||
url = base_url+'/cdm/certificate/v1/certificates'
|
||||
|
||||
# May be cert should be in base 64.
|
||||
certificate = { 'version':'1.0.0', 'requestType':'installId', 'certificateFormat':'pem', 'certificateData':cert }
|
||||
headers = { 'Authorization': 'Bearer %s' % bearer }
|
||||
# May be cert should be in base 64
|
||||
# Request type:
|
||||
# installId for identity certificate
|
||||
# importCA for CA for certificate
|
||||
# certificateData: Base 64 encoding of the PEM certificate itself.
|
||||
|
||||
r = requests.post(url, headers=headers, data=json.dumps(certificate), verify=verify)
|
||||
if r.status_code != 200 or r.status_code != 201:
|
||||
logger.error('Impossible to install certificate. Status code: %d.' % r.status_code)
|
||||
exit(-1)
|
||||
if ca:
|
||||
request_type = 'importCa'
|
||||
else:
|
||||
request_type = 'installId'
|
||||
|
||||
certificate = { 'version':'1.1.0', 'requestType':request_type, 'certificateFormat':
|
||||
'pem', 'certificateData':cert }
|
||||
headers = { 'Authorization': f'Bearer {bearer}' }
|
||||
|
||||
r = requests.post(url, headers=headers, data=json.dumps(certificate), verify=verify,
|
||||
timeout=10)
|
||||
if r.status_code not in [200, 201]:
|
||||
logger.error('Impossible to install certificate. Status code: %d.', r.status_code)
|
||||
sys.exit(-1)
|
||||
|
||||
logger.info('Certificate successfully installed.')
|
||||
|
||||
def get_certificates(hostname, verify, username, password, bearer=None):
|
||||
"""
|
||||
Retrieve a list of certificates from an HP CDM server.
|
||||
|
||||
This function uses the HP CDM server API to retrieve a list of certificates and returns them
|
||||
as a dictionary.
|
||||
|
||||
Args:
|
||||
hostname (str): The hostname of the HP CDM server.
|
||||
verify (bool): Verify the SSL certificate validity of the server.
|
||||
username (str): The username for authentication.
|
||||
password (str): The password for authentication.
|
||||
bearer (str, optional): The access token for authentication. If None, the token is
|
||||
generated automatically. Defaults to None.
|
||||
|
||||
Returns:
|
||||
dict: A dictionary where each key is a certificate index (starting from 1) and each value
|
||||
is the corresponding certificate.
|
||||
|
||||
Notes:
|
||||
- This function uses the `requests` library to make the certificate retrieval request.
|
||||
- If the certificate retrieval fails, the function logs an error and exits with a
|
||||
non-zero status code.
|
||||
- If no certificates are found, the function logs an error and exits with a non-zero
|
||||
status code.
|
||||
"""
|
||||
logger = logging.getLogger(__name__)
|
||||
base_url = f'https://{hostname}'
|
||||
res = {}
|
||||
|
||||
if bearer is None:
|
||||
bearer = get_bearer(hostname, verify, username, password)
|
||||
|
||||
url = base_url+'/cdm/certificate/v1/certificates'
|
||||
headers = { 'Authorization': f'Bearer {bearer}' }
|
||||
r = requests.get(url, headers=headers, verify=verify, timeout=10)
|
||||
if r.status_code != 200:
|
||||
logger.error('Impossible to obtain certificates list')
|
||||
sys.exit(-1)
|
||||
|
||||
certificates = json.loads(r.content.decode('utf8'))
|
||||
if 'certificates' not in certificates:
|
||||
logger.error('No certificates found')
|
||||
sys.exit(-1)
|
||||
|
||||
certificates = certificates['certificates']
|
||||
num = 1
|
||||
for certificate in certificates:
|
||||
res[num] = certificate
|
||||
num+=1
|
||||
|
||||
return res
|
||||
|
||||
def delete_certificate(hostname, verify, username, password, certificates, certid, bearer=None):
|
||||
"""
|
||||
Delete a certificate from an HP CDM server.
|
||||
|
||||
This function uses the HP CDM server API to delete a specified certificate.
|
||||
|
||||
Args:
|
||||
hostname (str): The hostname of the HP CDM server.
|
||||
verify (bool): Verify the SSL certificate validity of the server.
|
||||
username (str): The username for authentication.
|
||||
password (str): The password for authentication.
|
||||
certificates (list): The list of available certificates on the server.
|
||||
certid (int): The index of the certificate to delete in the list of certificates.
|
||||
bearer (str, optional): The access token for authentication. If None, the token is
|
||||
generated automatically. Defaults to None.
|
||||
|
||||
Returns:
|
||||
None
|
||||
|
||||
Notes:
|
||||
- This function uses the `requests` library to make the certificate deletion request.
|
||||
- If the certificate deletion fails, the function logs an error and exits with a non-zero
|
||||
status code.
|
||||
|
||||
Raises:
|
||||
sys.exit(-1) if the certificate deletion fails.
|
||||
"""
|
||||
logger = logging.getLogger(__name__)
|
||||
base_url = f'https://{hostname}'
|
||||
|
||||
if bearer is None:
|
||||
bearer = get_bearer(hostname, verify, username, password)
|
||||
|
||||
logger.debug('Certificates: %s', certificates)
|
||||
cert = certificates[certid]
|
||||
certid = cert.get('certificateId')
|
||||
url = base_url+f'/cdm/certificate/v1/certificates/{certid}'
|
||||
|
||||
headers = { 'Authorization': f'Bearer {bearer}' }
|
||||
r = requests.delete(url, headers=headers, verify=verify, timeout=10)
|
||||
if r.status_code != 200:
|
||||
logger.error('Impossible to delete certificate')
|
||||
sys.exit(-1)
|
||||
|
||||
def main():
|
||||
"""
|
||||
Main entry point of the program.
|
||||
|
||||
This function parses command-line arguments, reads configuration files, and performs the
|
||||
necessary actions based on the provided commands.
|
||||
|
||||
Args:
|
||||
None (command-line arguments are parsed using argparse)
|
||||
|
||||
Returns:
|
||||
None (the program exits with a status code)
|
||||
|
||||
Notes:
|
||||
- The program uses the following commands:
|
||||
- `csr`: create a Certificate Signing Request (CSR)
|
||||
- `pem`: install a certificate
|
||||
- The program uses the following options:
|
||||
- `--debug`: activate debug mode
|
||||
- `--config`: specify a configuration file
|
||||
- `--user`: specify a username
|
||||
- `--password`: specify a password
|
||||
- `--host`: specify a hostname
|
||||
- `--no-tls-verification`: disable TLS verification
|
||||
- The program logs information and errors using the `logging` module.
|
||||
- If an error occurs during the execution of the program, the program exits with a
|
||||
non-zero status code.
|
||||
"""
|
||||
logger = logging.getLogger(__name__)
|
||||
coloredlogs.install()
|
||||
|
||||
parser = argparse.ArgumentParser()
|
||||
parser.add_argument("-d", "--debug", dest='debug', action='store_true', required=False, help="Activate debug.")
|
||||
parser.add_argument("-c", "--config", dest='configFileName', required=False, default=None, help="Configuration file.")
|
||||
parser.add_argument("-u", "--user", dest='username', required=False, default='admin', help="Username.")
|
||||
parser.add_argument("-p", "--password", dest='password', nargs='?', required=False, default=None, help="Password.")
|
||||
parser.add_argument("-H", "--host", dest='hostname', required=False, default=None, help="Hostname.")
|
||||
parser.add_argument("-n", "--no-tls-verification", dest='verify', action='store_true', required=False, help="Verify certificate validity.")
|
||||
parser.add_argument("-d", "--debug", dest='debug', action='store_true', required=False,
|
||||
help="Activate debug.")
|
||||
parser.add_argument("-c", "--config", dest='config_filename', required=False, default=None,
|
||||
help="Configuration file.")
|
||||
parser.add_argument("-u", "--user", dest='username', required=False, default='admin',
|
||||
help="Username.")
|
||||
parser.add_argument("-p", "--password", dest='password', nargs='?', required=False,
|
||||
default=None, help="Password.")
|
||||
parser.add_argument("-H", "--host", dest='hostname', required=False, default=None,
|
||||
help="Hostname.")
|
||||
parser.add_argument("-n", "--no-tls-verification", dest='verify', default=None,
|
||||
action='store_const', const=False,
|
||||
required=False, help="Verify certificate validity.")
|
||||
|
||||
subparsers = parser.add_subparsers(dest='command', required=True, help='command help')
|
||||
|
||||
parserCreateCSR = subparsers.add_parser('csr', help='Create CSR')
|
||||
parserCreateCSR.add_argument("-C", "--country", dest='country', required=False, default='US', help="Country.")
|
||||
parserCreateCSR.add_argument("-s", "--state", dest='state', required=False, help="State.")
|
||||
parserCreateCSR.add_argument("-c", "--city", dest='city', required=False, help="State.")
|
||||
parserCreateCSR.add_argument("--ou", dest='organizationalUnit', required=False, help="Organizational Unit.")
|
||||
parserCreateCSR.add_argument("--org", dest='organization', required=False, help="Organization.")
|
||||
parserCreateCSR.add_argument("-o", "--output", dest='output', required=False, default=None, help="Output file.")
|
||||
subparsers.add_parser('list', help='List certificates')
|
||||
|
||||
parserInstalleCertificate = subparsers.add_parser('pem', help='Install certificate')
|
||||
parserInstalleCertificate.add_argument("-i", "--input", dest='input', required=True, default=None, help="Input file.")
|
||||
parser_delete = subparsers.add_parser('del', help='Delete a certificate')
|
||||
parser_delete.add_argument("-#", "--number", dest='certid', required=True, type=int,
|
||||
help="Certificate number as given by list command.")
|
||||
|
||||
parser_create_csr = subparsers.add_parser('csr', help='Create CSR')
|
||||
parser_create_csr.add_argument("-C", "--country", dest='country', required=False, default='US',
|
||||
help="Country.")
|
||||
parser_create_csr.add_argument("-s", "--state", dest='state', required=False, help="State.")
|
||||
parser_create_csr.add_argument("-c", "--city", dest='city', required=False, help="State.")
|
||||
parser_create_csr.add_argument("--ou", dest='organizational_unit', required=False,
|
||||
help="Organizational Unit.")
|
||||
parser_create_csr.add_argument("--org", dest='organization', required=False,
|
||||
help="Organization.")
|
||||
parser_create_csr.add_argument("-o", "--output", dest='output', required=False, default=None,
|
||||
help="Output file.")
|
||||
|
||||
parser_install_certificate = subparsers.add_parser('pem', help='Install certificate')
|
||||
parser_install_certificate.add_argument("-i", "--input", dest='input', required=True,
|
||||
default=None, help="Input file.")
|
||||
|
||||
args = parser.parse_args()
|
||||
logger.info("Arguments: %s" % args)
|
||||
if not hasattr(args, 'country'):
|
||||
args.country = None
|
||||
if not hasattr(args, 'state'):
|
||||
args.state = None
|
||||
if not hasattr(args, 'city'):
|
||||
args.city = None
|
||||
if not hasattr(args, 'organizational_unit'):
|
||||
args.organizational_unit = None
|
||||
if not hasattr(args, 'organization'):
|
||||
args.organization = None
|
||||
|
||||
if args.configFileName != None:
|
||||
logger.info("Arguments: %s", args)
|
||||
|
||||
if args.config_filename is not None:
|
||||
try:
|
||||
configFile = open(args.configFileName, 'r')
|
||||
except Exception as e:
|
||||
logger.info('Impossible to open configuration file. Error: %s' % e)
|
||||
exit(-1)
|
||||
|
||||
config = configparser.ConfigParser()
|
||||
config.read_file(configFile)
|
||||
with open(args.config_filename, 'r', encoding='utf-8') as config_file:
|
||||
config = configparser.ConfigParser()
|
||||
config.read_file(config_file)
|
||||
except OSError as e:
|
||||
logger.info('Impossible to open configuration file. Error: %s', e)
|
||||
sys.exit(-1)
|
||||
|
||||
sections = config.sections()
|
||||
|
||||
if 'TLS' in sections:
|
||||
options = config.options('TLS')
|
||||
if 'verify' in options:
|
||||
if 'verify' in options and args.verify is None:
|
||||
args.verify = config.getboolean('TLS', 'verify')
|
||||
if 'Login' in sections:
|
||||
options = config.options('Login')
|
||||
if 'user' in options:
|
||||
if 'user' in options and args.username is None:
|
||||
args.username = config.get('Login','user')
|
||||
if 'password' in options:
|
||||
if 'password' in options and args.password is None:
|
||||
args.password = config.get('Login','password')
|
||||
if 'host' in options:
|
||||
if 'host' in options and args.hostname is None:
|
||||
args.hostname = config.get('Login','host')
|
||||
if 'CSR' in sections:
|
||||
options = config.options('CSR')
|
||||
if 'org' in options:
|
||||
if 'org' in options and args.organization is None :
|
||||
args.organization = config.get('CSR','org')
|
||||
if 'ou' in options:
|
||||
args.organizationalUnit = config.get('CSR','ou')
|
||||
if 'country' in options:
|
||||
if 'ou' in options and args.organizational_unit is None :
|
||||
args.organizational_unit = config.get('CSR','ou')
|
||||
if 'country' in options and args.country is None:
|
||||
args.country = config.get('CSR','country')
|
||||
if 'state' in options:
|
||||
if 'state' in options and args.state is None:
|
||||
args.state = config.get('CSR','state')
|
||||
if 'city' in options:
|
||||
if 'city' in options and args.city is None:
|
||||
args.city = config.get('CSR','city')
|
||||
|
||||
if args.username == None:
|
||||
if args.verify is None:
|
||||
args.verification = False
|
||||
if args.username is None:
|
||||
logger.error('Username is mandatory')
|
||||
parser.print_help()
|
||||
exit(-1)
|
||||
if args.hostname == None:
|
||||
sys.exit(-1)
|
||||
if args.hostname is None:
|
||||
logger.error('Hostname is mandatory')
|
||||
parser.print_help()
|
||||
exit(-1)
|
||||
if args.password == None:
|
||||
sys.exit(-1)
|
||||
if args.password is None:
|
||||
args.password = getpass.getpass()
|
||||
|
||||
if args.debug:
|
||||
logger.info('Setting logging to debug mode')
|
||||
coloredlogs.set_level(level=logging.DEBUG)
|
||||
logger.info("Arguments: %s", args)
|
||||
|
||||
if args.command == 'csr':
|
||||
if args.output == None:
|
||||
args.output = '%s.csr' % args.hostname
|
||||
|
||||
logger.info('Final arguments: %s' % args)
|
||||
|
||||
if args.command == 'csr':
|
||||
getCSR(hostname=args.hostname, verify=args.verify, username=args.username, password=args.password, org=args.organization, ou=args.organizationalUnit, city=args.city, state=args.state, country=args.country, filename=args.output)
|
||||
elif args.command == 'pem':
|
||||
installCertificate(hostname=args.hostname, verify=args.verify, username=args.username, password=args.password, filename=args.input)
|
||||
if args.output is None:
|
||||
args.output = f'{args.hostname}.csr'
|
||||
|
||||
logger.info('Final arguments: %s', args)
|
||||
|
||||
match args.command:
|
||||
case 'list':
|
||||
certs = get_certificates(hostname=args.hostname, verify=args.verify,
|
||||
username=args.username,
|
||||
password=args.password)
|
||||
for certid, cert in certs.items():
|
||||
subject = cert.get('subject')
|
||||
issuer = cert.get('issuer')
|
||||
print(f'{certid} - {subject} issued by {issuer}.')
|
||||
case 'del':
|
||||
bearer = get_bearer(hostname=args.hostname, verify=args.verify, username=args.username,
|
||||
password=args.password)
|
||||
certs = get_certificates(hostname=args.hostname, verify=args.verify,
|
||||
username=args.username,
|
||||
password=args.password, bearer=bearer)
|
||||
if args.certid not in certs:
|
||||
logger.error('Certificate #%d does not exist', args.certid)
|
||||
sys.exit(-1)
|
||||
delete_certificate(hostname=args.hostname, verify=args.verify, username=args.username,
|
||||
password=args.password, certificates=certs, certid=args.certid,
|
||||
bearer=bearer)
|
||||
case 'csr':
|
||||
get_csr(hostname=args.hostname, verify=args.verify, username=args.username,
|
||||
password=args.password, org=args.organization, ou=args.organizational_unit,
|
||||
city=args.city, state=args.state, country=args.country, filename=args.output)
|
||||
case 'pem':
|
||||
install_certificate(hostname=args.hostname, verify=args.verify, username=args.username,
|
||||
password=args.password, filename=args.input)
|
||||
case _:
|
||||
logger.error('Unknown command: %s', args.command)
|
||||
sys.exit(-1)
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
|
||||
|
||||
Reference in New Issue
Block a user