Files
HP-Laserjet-Pro-Certificate…/refresh-certificate.py

270 lines
10 KiB
Python
Executable File

#!/usr/bin/env python3
import configparser
import argparse
import requests
from sys import exit
import logging
import coloredlogs
import json
import re
from io import StringIO
import getpass
import os
import random
import base64
def createNonce(length=45):
    """Return a random string of ``length`` URL-safe characters.

    The result is used by ``getBearer`` as the OAuth2 ``state`` value, so the
    characters are drawn from a cryptographically secure source rather than
    the predictable ``random`` generator.

    Args:
        length: Number of characters to generate. Zero or a negative value
            yields an empty string.

    Returns:
        A string made only of ASCII letters, digits and ``-._~``.
    """
    # Imported locally so this function does not depend on the file-level
    # import block providing ``secrets``.
    import secrets

    # Inspired by main.js sent by printer
    validChars = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-._~'
    return ''.join(secrets.choice(validChars) for _ in range(length))
def _httpOrExit(logger, method, url, **kwargs):
    """Call ``method(url, **kwargs)``; log the error and exit(-1) if it raises."""
    try:
        return method(url, **kwargs)
    except requests.exceptions.RequestException as e:
        logger.error('Impossible to retrieve URL: %s. Error: %s' % (url, e))
        exit(-1)


def getBearer(hostname, verify, username, password):
    """Authenticate against the printer and return an OAuth2 access token.

    The function performs four HTTPS requests against ``hostname``:

    1. GET the authorize endpoint without following redirects (302 expected).
    2. GET the login page the redirect points to (200 expected).
    3. POST the credentials as JSON to the authenticate endpoint to obtain
       an authorization code.
    4. POST that code to the token endpoint to obtain the access token.

    Args:
        hostname: Host name (or address) of the printer.
        verify: Passed as ``verify`` to every ``requests`` call.
        username: Account used to log in.
        password: Password of that account.

    Returns:
        The ``access_token`` value returned by the token endpoint.

    Any transport error, unexpected status code or malformed response is
    logged and terminates the program through ``exit(-1)``.
    """
    from urllib.parse import urljoin

    logger = logging.getLogger(__name__)
    logger.info("Retrieving base content")
    baseUrl = 'https://%s' % hostname
    clientId = 'com.hp.cdm.client.hpEws'
    scope = 'com.hp.cdm.auth.alias.deviceRole.deviceAdmin'

    # Step 1: ask for authorization; the device answers with a redirect to
    # its login page.
    logger.info('Retrieving redirection to login URL.')
    authUrl = baseUrl+'/cdm/oauth2/v1/authorize'
    certificateManagementUrl = baseUrl+'/security/certificateManagement/certificates'
    nonce = createNonce()
    params = { 'response_type':'code', 'client_id':clientId, 'state':nonce, 'redirect_uri':certificateManagementUrl, 'scope':scope}
    r = _httpOrExit(logger, requests.get, authUrl, params=params, allow_redirects=False, verify=verify)
    if r.status_code != 302:
        logger.error('Impossible to retrieve redirection to login URL. Status code: %d' % r.status_code)
        exit(-1)
    location = r.headers.get('Location')
    if not location:
        logger.error('Redirection to login URL has no Location header.')
        exit(-1)
    # urljoin accepts both a path-only and an absolute Location value.
    loginUrl = urljoin(baseUrl, location)
    logger.debug('UR to login: %s' % loginUrl)

    # Step 2: load the login page.
    logger.info('Loading login page.')
    r = _httpOrExit(logger, requests.get, loginUrl, verify=verify)
    if r.status_code != 200:
        logger.error('Impossible to load login page. Status code: %d' % r.status_code)
        exit(-1)

    # Step 3: send the credentials to receive an authorization code.
    authenticationUrl = baseUrl+'/cdm/security/v1/authenticate'
    # Agent ID is useless
    payload = { 'username':username, 'password':password, 'client_id':clientId, 'scope':scope, 'grant_type':'authorization_code' , 'state':nonce }
    logger.info('Authenticating to receive authentication code.')
    r = _httpOrExit(logger, requests.post, authenticationUrl, data=json.dumps(payload), verify=verify)
    if r.status_code != 200:
        logger.error('Impossible to authenticate.')
        exit(-1)
    try:
        code = json.loads(r.content.decode('utf8'))['code']
    except (ValueError, KeyError, TypeError) as e:
        logger.error('Unexpected authentication response. Error: %s' % e)
        exit(-1)
    logger.debug('Authentication code: %s' % code)

    # Step 4: exchange the authorization code for the bearer token.
    tokenUrl = baseUrl+'/cdm/oauth2/v1/token'
    logger.debug('Token URL: %s' % tokenUrl)
    logger.info('Retrieving authentication bearer.')
    payload = { 'grant_type': 'authorization_code', 'code': code, 'redirect_uri': baseUrl, 'client_id': clientId }
    r = _httpOrExit(logger, requests.post, tokenUrl, data=payload, verify=verify)
    if r.status_code != 200:
        logger.error('Impossible to obtain Oauth bearer.')
        exit(-1)
    try:
        bearer = json.loads(r.content.decode('utf8'))['access_token']
    except (ValueError, KeyError, TypeError) as e:
        logger.error('Unexpected token response. Error: %s' % e)
        exit(-1)
    logger.debug('Bearer: %s' % bearer)
    return bearer
def getCSR(hostname, verify, username, password, ou, org, city, state, country, filename):
    """Ask the printer to generate a CSR and write it to ``filename``.

    The function logs in with ``getBearer``, reads the current CSR
    description from the device, sends it back with the requested subject
    attributes and ``state`` set to ``'processing'``, then polls the same
    endpoint until the device reports ``state == 'idle'``. The resulting
    ``certificateData`` is printed to standard output and written to
    ``filename``.

    Args:
        hostname: Printer host name; also used as the CSR common name.
        verify: Passed as ``verify`` to every ``requests`` call.
        username: Account used to log in.
        password: Password of that account.
        ou: Organizational unit (sent as a one-element list).
        org: Organization.
        city: City or locality.
        state: State or province.
        country: Country or region.
        filename: Path of the file that receives the CSR.

    Any unexpected HTTP status code is logged and terminates the program
    through ``exit(-1)``.
    """
    # Imported locally so this function does not depend on the file-level
    # import block providing ``time``.
    import time

    # Seconds to wait between two polls, so the device is not flooded with
    # requests while it generates the CSR.
    pollInterval = 1

    logger = logging.getLogger(__name__)
    baseUrl = 'https://%s' % hostname
    bearer = getBearer(hostname, verify, username, password)
    logger.info('Retrieving CSR base configuration.')
    csrUrl = baseUrl+'/cdm/certificate/v1/certificateSigningRequest'
    headers = { 'Authorization' : 'Bearer %s' % bearer}
    r = requests.get(csrUrl, headers=headers, verify=verify)
    if r.status_code != 200:
        logger.error('Impossible to obtain CSR info')
        exit(-1)
    csrDescription = json.loads(r.content.decode('utf8'))
    logger.debug('Defaut CSR values received: %s' % csrDescription)

    # Reuse the description returned by the device, overriding only the
    # fields below.
    csrDescription['state'] = 'processing'
    # NOTE(review): presumably the device does not accept the hints it
    # returned being sent back - confirm against the device API.
    csrDescription['links'][0]['hints'] = None
    csrDescription['certificateAttributes'] = {'commonName':hostname, 'organization':org,'organizationalUnit':[ou],'cityOrLocality':city,'stateOrProvince':state,'countryOrRegion':country}
    logger.info('Sending CSR updated informations.')
    r = requests.patch(csrUrl, headers=headers, data=json.dumps(csrDescription), verify=verify)
    if r.status_code != 204:
        logger.error('Impossible to send CSR description')
        exit(-1)

    # Poll until the device reports that generation is finished.
    while True:
        logger.info('Waiting for CSR generation.')
        r = requests.get(csrUrl, headers=headers, verify=verify)
        if r.status_code != 200:
            logger.error('Impossible to receive CSR. Status code: %d' % r.status_code)
            exit(-1)
        csr = json.loads(r.content.decode('utf8'))
        logger.debug(csr)
        if csr['state'] == 'idle':
            break
        time.sleep(pollInterval)

    csr = csr['certificateData']
    print(csr)
    with open(filename, 'w') as f:
        f.write(csr)
def installCertificate(hostname, verify, username, password, filename):
    """Upload the certificate stored in ``filename`` to the printer.

    The file content is read as text, base64-encoded and posted as JSON to
    the device certificate endpoint, authenticated with a bearer token
    obtained from ``getBearer``.

    Args:
        hostname: Host name (or address) of the printer.
        verify: Passed as ``verify`` to the ``requests`` call.
        username: Account used to log in.
        password: Password of that account.
        filename: Path of the certificate file to install.

    A response status other than 200 or 201 is logged and terminates the
    program through ``exit(-1)``.
    """
    logger = logging.getLogger(__name__)
    baseUrl = 'https://%s' % hostname
    with open(filename, 'r') as f:
        cert = f.read()
    # The certificate text is sent base64-encoded in the JSON payload.
    cert = base64.b64encode(cert.encode('utf8')).decode('ascii')
    logger.info(cert)
    bearer = getBearer(hostname, verify, username, password)
    url = baseUrl+'/cdm/certificate/v1/certificates'
    certificate = { 'version':'1.0.0', 'requestType':'installId', 'certificateFormat':'pem', 'certificateData':cert }
    headers = { 'Authorization': 'Bearer %s' % bearer }
    r = requests.post(url, headers=headers, data=json.dumps(certificate), verify=verify)
    # Both 200 and 201 mean success. The previous test
    # (`!= 200 or != 201`) was true for every status code.
    if r.status_code not in (200, 201):
        logger.error('Impossible to install certificate. Status code: %d.' % r.status_code)
        exit(-1)
def _maskPassword(args):
    """Return a copy of ``args`` whose password, if set, is replaced by asterisks."""
    shown = argparse.Namespace(**vars(args))
    if getattr(shown, 'password', None) is not None:
        shown.password = '********'
    return shown


def main():
    """Parse the command line and run the requested sub-command.

    Two sub-commands are available: ``csr`` (generate a CSR on the printer
    and save it) and ``pem`` (install a certificate on the printer).

    When a configuration file is given with ``-c``, the values found in its
    ``TLS``, ``Login`` and ``CSR`` sections replace the corresponding
    command-line values. If no password is available after that, it is
    requested interactively.

    TLS certificate verification is enabled unless ``-n`` is given or the
    configuration file sets ``verify`` in the ``TLS`` section.

    Missing mandatory values or an unreadable configuration file terminate
    the program through ``exit(-1)``.
    """
    logger = logging.getLogger(__name__)
    coloredlogs.install()

    parser = argparse.ArgumentParser()
    parser.add_argument("-d", "--debug", dest='debug', action='store_true', required=False, help="Activate debug.")
    parser.add_argument("-c", "--config", dest='configFileName', required=False, default=None, help="Configuration file.")
    parser.add_argument("-u", "--user", dest='username', required=False, default='admin', help="Username.")
    parser.add_argument("-p", "--password", dest='password', nargs='?', required=False, default=None, help="Password.")
    parser.add_argument("-H", "--host", dest='hostname', required=False, default=None, help="Hostname.")
    # store_false: giving the flag turns verification OFF, as its name says.
    parser.add_argument("-n", "--no-tls-verification", dest='verify', action='store_false', required=False, default=True, help="Disable TLS certificate verification.")
    subparsers = parser.add_subparsers(dest='command', required=True, help='command help')

    parserCreateCSR = subparsers.add_parser('csr', help='Create CSR')
    parserCreateCSR.add_argument("-C", "--country", dest='country', required=False, default='US', help="Country.")
    parserCreateCSR.add_argument("-s", "--state", dest='state', required=False, help="State.")
    parserCreateCSR.add_argument("-c", "--city", dest='city', required=False, help="City.")
    parserCreateCSR.add_argument("--ou", dest='organizationalUnit', required=False, help="Organizational Unit.")
    parserCreateCSR.add_argument("--org", dest='organization', required=False, help="Organization.")
    parserCreateCSR.add_argument("-o", "--output", dest='output', required=False, default=None, help="Output file.")

    parserInstalleCertificate = subparsers.add_parser('pem', help='Install certificate')
    parserInstalleCertificate.add_argument("-i", "--input", dest='input', required=True, default=None, help="Input file.")

    args = parser.parse_args()
    # The password is masked so it never reaches the log in clear text.
    logger.info("Arguments: %s" % _maskPassword(args))

    if args.configFileName is not None:
        config = configparser.ConfigParser()
        try:
            with open(args.configFileName, 'r') as configFile:
                config.read_file(configFile)
        except OSError as e:
            logger.error('Impossible to open configuration file. Error: %s' % e)
            exit(-1)

        if config.has_option('TLS', 'verify'):
            args.verify = config.getboolean('TLS', 'verify')

        # (section, option, attribute of ``args`` that receives the value)
        overrides = (
            ('Login', 'user', 'username'),
            ('Login', 'password', 'password'),
            ('Login', 'host', 'hostname'),
            ('CSR', 'org', 'organization'),
            ('CSR', 'ou', 'organizationalUnit'),
            ('CSR', 'country', 'country'),
            ('CSR', 'state', 'state'),
            ('CSR', 'city', 'city'),
        )
        for section, option, attribute in overrides:
            if config.has_option(section, option):
                setattr(args, attribute, config.get(section, option))

    if args.username is None:
        logger.error('Username is mandatory')
        parser.print_help()
        exit(-1)
    if args.hostname is None:
        logger.error('Hostname is mandatory')
        parser.print_help()
        exit(-1)
    if args.password is None:
        args.password = getpass.getpass()
    if args.debug:
        logger.info('Setting logging to debug mode')
        coloredlogs.set_level(level=logging.DEBUG)
    if args.command == 'csr' and args.output is None:
        args.output = '%s.csr' % args.hostname
    logger.info('Final arguments: %s' % _maskPassword(args))

    if args.command == 'csr':
        getCSR(hostname=args.hostname, verify=args.verify, username=args.username, password=args.password, org=args.organization, ou=args.organizationalUnit, city=args.city, state=args.state, country=args.country, filename=args.output)
    elif args.command == 'pem':
        installCertificate(hostname=args.hostname, verify=args.verify, username=args.username, password=args.password, filename=args.input)
# Script entry point: run the command-line interface only when this file is
# executed directly, not when it is imported as a module.
if __name__ == '__main__':
    main()