269 lines
10 KiB
Python
Executable File
269 lines
10 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
|
|
import configparser
|
|
import argparse
|
|
import requests
|
|
from sys import exit
|
|
import logging
|
|
import coloredlogs
|
|
import json
|
|
import re
|
|
from io import StringIO
|
|
import getpass
|
|
import os
|
|
import random
|
|
import base64
|
|
|
|
def createNonce(length=45):
    """Return a random string of ``length`` URL-safe characters.

    Inspired by the main.js sent by the printer. The result is used as the
    OAuth2 ``state`` value in getBearer(), so characters are drawn from the
    ``secrets`` CSPRNG instead of the predictable ``random`` module.

    Args:
        length: Number of characters to generate. Zero or a negative value
            yields an empty string.

    Returns:
        The generated nonce as a ``str``.
    """
    # Function-scope import so the block does not depend on a file-level
    # import being added elsewhere.
    import secrets

    # Letters, digits and "-._~": characters that never need URL escaping.
    validChars = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-._~'
    return ''.join(secrets.choice(validChars) for _ in range(length))
|
|
|
|
def getBearer(hostname, verify, username, password):
    """Authenticate against the printer and return an OAuth bearer token.

    The sequence of HTTPS requests, all sent to ``https://<hostname>``, is:

    1. GET ``/cdm/oauth2/v1/authorize`` without following redirects; a 302
       pointing at the login page is expected.
    2. GET the login page.
    3. POST the credentials (JSON body) to ``/cdm/security/v1/authenticate``
       to obtain an authorization code.
    4. POST that code (form-encoded) to ``/cdm/oauth2/v1/token`` to obtain
       the access token.

    Args:
        hostname: Host name or address of the printer.
        verify: Passed unchanged as the ``verify`` argument of every
            ``requests`` call.
        username: Account name sent to the authenticate endpoint.
        password: Password sent to the authenticate endpoint.

    Returns:
        The ``access_token`` field of the token response.

    Any failed step is logged and terminates the process with ``exit(-1)``.
    """
    # Function-scope import so the block does not depend on a file-level
    # import being added elsewhere.
    from urllib.parse import urljoin

    logger = logging.getLogger(__name__)
    logger.info("Retrieving base content")

    baseUrl = 'https://%s' % hostname

    logger.info('Retrieving redirection to login URL.')

    authUrl = baseUrl+'/cdm/oauth2/v1/authorize'
    certificateManagementUrl = baseUrl+'/security/certificateManagement/certificates'

    # The same nonce is sent as "state" here and again when authenticating.
    nonce = createNonce()
    params = { 'response_type':'code', 'client_id':'com.hp.cdm.client.hpEws', 'state':nonce, 'redirect_uri':certificateManagementUrl, 'scope':'com.hp.cdm.auth.alias.deviceRole.deviceAdmin'}

    try:
        r = requests.get(authUrl, params=params, allow_redirects=False, verify=verify)
    except Exception as e:
        logger.error('Impossible to retrieve URL: %s. Error: %s' % (authUrl, e))
        exit(-1)

    if r.status_code != 302:
        logger.error('Impossible to retrieve redirection to login URL. Status code: %d' % r.status_code)
        exit(-1)

    location = r.headers.get('Location')
    if not location:
        logger.error('Impossible to retrieve redirection to login URL. No Location header received.')
        exit(-1)

    # urljoin gives the same result as concatenation for the usual "/path"
    # form, and also copes with an absolute URL or a path lacking the
    # leading slash.
    loginUrl = urljoin(baseUrl, location)
    logger.debug('UR to login: %s' % loginUrl)

    logger.info('Loading login page.')

    r = requests.get(loginUrl, verify=verify)
    if r.status_code != 200:
        logger.error('Impossible to load login page. Status code: %d' % r.status_code)
        exit(-1)

    authenticationUrl = baseUrl+'/cdm/security/v1/authenticate'
    # Agent ID is useless, so none is sent.
    payload = { 'username':username, 'password':password, 'client_id':'com.hp.cdm.client.hpEws', 'scope':'com.hp.cdm.auth.alias.deviceRole.deviceAdmin', 'grant_type':'authorization_code' , 'state':nonce }

    logger.info('Authenticating to receive authentication code.')

    # This endpoint receives the payload serialized as a JSON string.
    r = requests.post(authenticationUrl, data=json.dumps(payload), verify=verify)
    if r.status_code != 200:
        logger.error('Impossible to authenticate.')
        exit(-1)

    response = json.loads(r.content.decode('utf8'))
    code = response['code']
    logger.debug('Authentication code: %s' % code)

    tokenUrl = baseUrl+'/cdm/oauth2/v1/token'
    logger.debug('Token URL: %s' % tokenUrl)

    logger.info('Retrieving authentication bearer.')

    # Unlike the authenticate call, the token request passes a dict, which
    # requests sends form-encoded.
    payload = { 'grant_type': 'authorization_code', 'code': code, 'redirect_uri': baseUrl, 'client_id': 'com.hp.cdm.client.hpEws' }

    r = requests.post(tokenUrl, data=payload, verify=verify)
    if r.status_code != 200:
        logger.error('Impossible to obtain Oauth bearer.')
        exit(-1)

    response = json.loads(r.content.decode('utf8'))
    bearer = response['access_token']
    logger.debug('Bearer: %s' % bearer)

    return bearer
|
|
|
|
def getCSR(hostname, verify, username, password, ou, org, city, state, country, filename, pollInterval=1.0):
    """Ask the printer to generate a CSR and save it to ``filename``.

    The function authenticates with getBearer(), reads the printer's current
    CSR description, patches it with the requested subject attributes and
    the state ``'processing'``, then polls the same endpoint until the
    reported state is ``'idle'``. The resulting ``certificateData`` is
    printed and written to ``filename``.

    Args:
        hostname: Host name or address of the printer; also used as the
            certificate common name.
        verify: Passed unchanged as the ``verify`` argument of every
            ``requests`` call.
        username: Account name used to authenticate.
        password: Password used to authenticate.
        ou: Organizational unit (sent as a one-element list).
        org: Organization.
        city: City or locality.
        state: State or province.
        country: Country or region.
        filename: Path of the file that receives the CSR text.
        pollInterval: Seconds to wait between two status polls while the
            CSR is being generated.

    Any failed request is logged and terminates the process with
    ``exit(-1)``.
    """
    # Function-scope import so the block does not depend on a file-level
    # import being added elsewhere.
    import time

    logger = logging.getLogger(__name__)
    baseUrl = 'https://%s' % hostname

    bearer = getBearer(hostname, verify, username, password)

    logger.info('Retrieving CSR base configuration.')

    csrUrl = baseUrl+'/cdm/certificate/v1/certificateSigningRequest'
    headers = { 'Authorization' : 'Bearer %s' % bearer}

    r = requests.get(csrUrl, headers=headers, verify=verify)
    if r.status_code != 200:
        logger.error('Impossible to obtain CSR info')
        exit(-1)

    csrDescription = json.loads(r.content.decode('utf8'))

    logger.debug('Defaut CSR values received: %s' % csrDescription)

    # The description returned by the printer is sent back with the state
    # switched to 'processing' and the subject attributes filled in.
    csrDescription['state'] = 'processing'
    # NOTE(review): the hints of the first link are cleared before sending;
    # presumably the device rejects them otherwise - confirm.
    csrDescription['links'][0]['hints'] = None
    csrDescription['certificateAttributes'] = {'commonName':hostname, 'organization':org,'organizationalUnit':[ou],'cityOrLocality':city,'stateOrProvince':state,'countryOrRegion':country}

    logger.info('Sending CSR updated informations.')

    r = requests.patch(csrUrl, headers=headers, data=json.dumps(csrDescription), verify=verify)

    if r.status_code != 204:
        logger.error('Impossible to send CSR description')
        exit(-1)

    while True:
        logger.info('Waiting for CSR generation.')

        r = requests.get(csrUrl, headers=headers, verify=verify)
        if r.status_code != 200:
            logger.error('Impossible to receive CSR. Status code: %d' % r.status_code)
            exit(-1)

        csrStatus = json.loads(r.content.decode('utf8'))
        logger.debug(csrStatus)
        if csrStatus['state'] == 'idle':
            break

        # Pause before asking again instead of polling in a tight loop.
        time.sleep(pollInterval)

    csr = csrStatus['certificateData']
    print(csr)

    with open(filename, 'w') as f:
        f.write(csr)
|
|
|
|
def installCertificate(hostname, verify, username, password, filename):
    """Upload the certificate stored in ``filename`` to the printer.

    The file is read as text, base64-encoded and posted as JSON to
    ``/cdm/certificate/v1/certificates`` with a bearer token obtained from
    getBearer().

    Args:
        hostname: Host name or address of the printer.
        verify: Passed unchanged as the ``verify`` argument of the
            ``requests`` call.
        username: Account name used to authenticate.
        password: Password used to authenticate.
        filename: Path of the certificate file to install.

    A response status other than 200 is logged and terminates the process
    with ``exit(-1)``.
    """
    log = logging.getLogger(__name__)
    printerUrl = 'https://%s' % hostname

    with open(filename, 'r') as pemFile:
        pemText = pemFile.read()

    # The certificate text travels base64-encoded inside the JSON document.
    encodedCert = base64.b64encode(pemText.encode('utf8')).decode('ascii')

    log.info(encodedCert)

    token = getBearer(hostname, verify, username, password)

    endpoint = printerUrl+'/cdm/certificate/v1/certificates'
    authHeaders = { 'Authorization': 'Bearer %s' % token }
    body = json.dumps({
        'version':'1.0.0',
        'requestType':'installId',
        'certificateFormat':'pem',
        'certificateData':encodedCert,
    })

    reply = requests.post(endpoint, headers=authHeaders, data=body, verify=verify)
    if reply.status_code == 200:
        return

    log.error('Impossible to install certificate. Status code: %d.' % reply.status_code)
    exit(-1)
|
|
|
|
def _parseBool(value):
    """Convert a command-line string such as ``'false'`` or ``'1'`` to a bool."""
    normalized = str(value).strip().lower()
    if normalized in ('1', 'true', 'yes', 'y', 'on'):
        return True
    if normalized in ('0', 'false', 'no', 'n', 'off'):
        return False
    raise argparse.ArgumentTypeError('Invalid boolean value: %r' % value)


def _redactArgs(args):
    """Return a copy of the parsed arguments that is safe to log."""
    shown = dict(vars(args))
    if shown.get('password') is not None:
        shown['password'] = '********'
    return argparse.Namespace(**shown)


def main():
    """Command-line entry point: parse options and run the chosen command.

    Two sub-commands are available: ``csr`` (generate a CSR on the printer
    and save it) and ``pem`` (install a certificate on the printer).

    When a configuration file is given with ``-c``, the values found in its
    ``TLS``, ``Login`` and ``CSR`` sections replace the corresponding
    command-line values. Password, username and hostname must be known
    after that step; otherwise the usage is printed and the process exits
    with status -1.
    """
    logger = logging.getLogger(__name__)
    coloredlogs.install()

    parser = argparse.ArgumentParser()
    parser.add_argument("-d", "--debug", dest='debug', action='store_true', required=False, help="Activate debug.")
    parser.add_argument("-c", "--config", dest='configFileName', nargs='?', required=False, default=None, help="Configuration file.")
    parser.add_argument("-u", "--user", dest='username', nargs='?', required=False, default='admin', help="Username.")
    parser.add_argument("-p", "--password", dest='password', nargs='?', required=False, default=None, help="Password.")
    parser.add_argument("-H", "--host", dest='hostname', nargs='?', required=False, default=None, help="Hostname.")
    # type=bool would turn any non-empty string, including "False", into
    # True; _parseBool interprets the text instead. A bare "-v" means True.
    parser.add_argument("-v", "--verify", dest='verify', nargs='?', type=_parseBool, const=True, required=False, default=True, help="Verify certificate validity.")

    subparsers = parser.add_subparsers(dest='command', required=True, help='command help')

    parserCreateCSR = subparsers.add_parser('csr', help='Create CSR')
    parserCreateCSR.add_argument("-C", "--country", dest='country', nargs='?', required=False, default='US', help="Country.")
    parserCreateCSR.add_argument("-s", "--state", dest='state', nargs='?', required=False, help="State.")
    parserCreateCSR.add_argument("-c", "--city", dest='city', nargs='?', required=False, help="City.")
    parserCreateCSR.add_argument("--ou", dest='organizationalUnit', nargs='?', required=False, help="Organizational Unit.")
    parserCreateCSR.add_argument("--org", dest='organization', nargs='?', required=False, help="Organization.")
    parserCreateCSR.add_argument("-o", "--output", dest='output', nargs='?', required=False, default=None, help="Output file.")

    parserInstallCertificate = subparsers.add_parser('pem', help='Install certificate')
    parserInstallCertificate.add_argument("-i", "--input", dest='input', nargs='?', required=True, default=None, help="Input file.")

    args = parser.parse_args()
    # The password is masked so it never reaches the log output.
    logger.info("Arguments: %s" % _redactArgs(args))

    if args.configFileName is not None:
        try:
            configFile = open(args.configFileName, 'r')
        except OSError as e:
            logger.error('Impossible to open configuration file. Error: %s' % e)
            exit(-1)

        config = configparser.ConfigParser()
        # The with-statement closes the file once it has been parsed.
        with configFile:
            config.read_file(configFile)

        sections = config.sections()

        if 'TLS' in sections:
            options = config.options('TLS')
            if 'verify' in options:
                args.verify = config.getboolean('TLS', 'verify')
        if 'Login' in sections:
            options = config.options('Login')
            if 'user' in options:
                args.username = config.get('Login','user')
            if 'password' in options:
                args.password = config.get('Login','password')
            if 'host' in options:
                args.hostname = config.get('Login','host')
        if 'CSR' in sections:
            options = config.options('CSR')
            if 'org' in options:
                args.organization = config.get('CSR','org')
            if 'ou' in options:
                args.organizationalUnit = config.get('CSR','ou')
            if 'country' in options:
                args.country = config.get('CSR','country')
            if 'state' in options:
                args.state = config.get('CSR','state')
            if 'city' in options:
                args.city = config.get('CSR','city')

    # Report every missing mandatory value before giving up.
    missing = False
    for value, label in ((args.password, 'Password'), (args.username, 'Username'), (args.hostname, 'Hostname')):
        if value is None:
            logger.error('%s is mandatory' % label)
            missing = True
    if missing:
        parser.print_usage()
        exit(-1)

    if args.debug:
        logger.info('Setting logging to debug mode')
        coloredlogs.set_level(level=logging.DEBUG)

    if args.command == 'csr':
        if args.output is None:
            # Default output file is named after the printer.
            args.output = '%s.csr' % args.hostname

    logger.info('Final arguments: %s' % _redactArgs(args))

    if args.command == 'csr':
        getCSR(hostname=args.hostname, verify=args.verify, username=args.username, password=args.password, org=args.organization, ou=args.organizationalUnit, city=args.city, state=args.state, country=args.country, filename=args.output)
    elif args.command == 'pem':
        installCertificate(hostname=args.hostname, verify=args.verify, username=args.username, password=args.password, filename=args.input)
|
|
|
|
|
|
|
|
# Run the command-line interface only when the file is executed as a script,
# not when it is imported as a module.
if __name__ == '__main__':
    main()
|
|
|