Files
HP-Laserjet-Pro-Certificate…/refresh-certificate.py
2024-08-05 18:46:10 +02:00

321 lines
12 KiB
Python
Executable File

#!/usr/bin/env python3
import argparse
import base64
import configparser
import getpass
import json
import logging
import os
import random
import re
import secrets
import time
from io import StringIO
from sys import exit

import coloredlogs
import requests
def createNonce(length=45):
    """Return a random string of *length* characters.

    The characters are drawn from the RFC 3986 "unreserved" set (letters,
    digits and ``-._~``), so the result can be placed in a URL query string
    without escaping.

    Args:
        length: Number of characters to generate. Zero or a negative value
            yields an empty string.

    Returns:
        The generated string.
    """
    # Inspired by main.js sent by printer
    validChars = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-._~'
    # The value is sent as the OAuth ``state`` token, so it is drawn from the
    # operating system's CSPRNG rather than the predictable ``random`` module.
    return ''.join(secrets.choice(validChars) for _ in range(length))
def _firstMatchingGroup(text, pattern, group):
    """Return the named *group* of the first line of *text* matching *pattern*.

    Lines are tested in order with ``re.match``; ``None`` is returned when no
    line matches.
    """
    compiled = re.compile(pattern)
    for line in StringIO(text):
        m = compiled.match(line)
        if m is not None:
            return m.group(group)
    return None


def getBearer(hostname, verify, username, password):
    """Authenticate against the printer and return an OAuth access token.

    The function performs, in order:

    1. GET the base page and find the ``main...`` script it references.
    2. GET that script and extract the ``It.DEVICE_ADMIN`` agent identifier.
    3. GET the authorize endpoint (expects a 302) and load the login page the
       redirect points to.
    4. POST the credentials to the authenticate endpoint to get a code.
    5. POST the code to the token endpoint and read ``access_token``.

    Any failing step logs an error and terminates the process through
    ``exit(-1)``.

    Args:
        hostname: Host name of the printer; ``https://<hostname>`` is used as
            base URL.
        verify: Passed unchanged as ``verify=`` to every ``requests`` call.
        username: Account name sent to the authenticate endpoint.
        password: Password sent to the authenticate endpoint.

    Returns:
        The ``access_token`` value of the token endpoint's JSON response.
    """
    logger = logging.getLogger(__name__)
    clientId = 'com.hp.cdm.client.hpEws'
    adminScope = 'com.hp.cdm.auth.alias.deviceRole.deviceAdmin'

    logger.info("Retrieving base content")
    baseUrl = 'https://%s' % hostname
    try:
        r = requests.get(baseUrl, verify=verify)
    except Exception as e:
        # Only this first request is guarded: it is the one that reveals an
        # unreachable host or a TLS verification problem.
        logger.error('Exception: %s' % e)
        exit(-1)
    if r.status_code != 200:
        logger.error('Imposible to retrieve base content')
        exit(-1)

    logger.info('Retrieving main javascript path.')
    # Raw strings keep the regex escapes away from Python's own string
    # escape processing.
    main = _firstMatchingGroup(
        r.content.decode('utf8'),
        r'^.*src="(?P<main>main[^"]+)".*$',
        'main',
    )
    if main is None:
        logger.error('Impossible to retrieve main path.')
        exit(-1)
    mainUrl = baseUrl + '/' + main
    logger.debug('Main javascript is located at %s.' % mainUrl)

    r = requests.get(mainUrl, verify=verify)
    if r.status_code != 200:
        logger.error('Imposible to retrieve main javascript content.')
        exit(-1)

    logger.info('Retrieving agent id')
    agentId = _firstMatchingGroup(
        r.content.decode('utf8'),
        r'^.*It\.DEVICE_ADMIN="(?P<agentid>[0-9a-f\-]+)".*$',
        'agentid',
    )
    if agentId is None:
        logger.error('Impossible to retrieve agent identifier.')
        exit(-1)
    logger.debug('Agent identifier: %s' % agentId)

    logger.info('Retrieving redirection to login URL.')
    authUrl = baseUrl + '/cdm/oauth2/v1/authorize'
    certificateManagementUrl = baseUrl + '/security/certificateManagement/certificates'
    nonce = createNonce()
    params = {
        'response_type': 'code',
        'client_id': clientId,
        'state': nonce,
        'redirect_uri': certificateManagementUrl,
        'scope': adminScope,
    }
    # The redirect is not followed automatically because its target is needed
    # to build the login URL.
    r = requests.get(authUrl, params=params, allow_redirects=False, verify=verify)
    if r.status_code != 302:
        logger.error('Impossible to retrieve redirection to login URL')
        exit(-1)
    loginUrl = baseUrl + r.headers['Location']
    logger.debug('UR to login: %s' % loginUrl)

    logger.info('Loading login page.')
    r = requests.get(loginUrl, verify=verify)
    if r.status_code != 200:
        logger.error('Impossible to load login page.')
        exit(-1)

    authenticationUrl = baseUrl + '/cdm/security/v1/authenticate'
    payload = {
        'agentId': agentId,
        'username': username,
        'password': password,
        'client_id': clientId,
        'scope': adminScope,
        'grant_type': 'authorization_code',
        'state': nonce,
    }
    logger.info('Authenticating to receive authentication code.')
    # This endpoint receives the payload serialised as a JSON document.
    r = requests.post(authenticationUrl, data=json.dumps(payload), verify=verify)
    if r.status_code != 200:
        logger.error('Impossible to authenticate.')
        exit(-1)
    response = json.loads(r.content.decode('utf8'))
    code = response['code']
    logger.debug('Authentication code: %s' % code)

    tokenUrl = baseUrl + '/cdm/oauth2/v1/token'
    logger.debug('Token URL: %s' % tokenUrl)
    logger.info('Retrieving authentication bearer.')
    payload = {
        'grant_type': 'authorization_code',
        'code': code,
        'redirect_uri': baseUrl,
        'client_id': clientId,
    }
    # Unlike the authenticate call, the token request is sent form-encoded.
    r = requests.post(tokenUrl, data=payload, verify=verify)
    if r.status_code != 200:
        logger.error('Impossible to obtain Oauth bearer.')
        exit(-1)
    response = json.loads(r.content.decode('utf8'))
    bearer = response['access_token']
    logger.debug('Bearer: %s' % bearer)
    return bearer
def getCSR(hostname, verify, username, password, ou, org, city, state, country, filename, pollInterval=1):
    """Ask the printer to generate a CSR and save it to *filename*.

    The current CSR description is fetched, its certificate attributes are
    replaced by the supplied values (the common name is *hostname*), and the
    result is sent back with ``state`` set to ``'processing'``. The endpoint
    is then polled until it reports ``state == 'idle'``; the
    ``certificateData`` of that answer is printed and written to *filename*.

    Any unexpected HTTP status logs an error and terminates the process
    through ``exit(-1)``.

    Args:
        hostname: Printer host name; also used as the CSR common name.
        verify: Passed unchanged as ``verify=`` to every ``requests`` call.
        username: Account name used to obtain the bearer token.
        password: Password used to obtain the bearer token.
        ou: Organizational unit (sent as a one-element list).
        org: Organization.
        city: City or locality.
        state: State or province.
        country: Country or region.
        filename: Path of the file receiving the CSR text.
        pollInterval: Seconds to wait between two polls while the CSR is not
            ready yet.
    """
    logger = logging.getLogger(__name__)
    baseUrl = 'https://%s' % hostname
    bearer = getBearer(hostname, verify, username, password)

    logger.info('Retrieving CSR base configuration.')
    csrUrl = baseUrl + '/cdm/certificate/v1/certificateSigningRequest'
    headers = {'Authorization': 'Bearer %s' % bearer}
    r = requests.get(csrUrl, headers=headers, verify=verify)
    if r.status_code != 200:
        logger.error('Impossible to obtain CSR info')
        exit(-1)
    csrDescription = json.loads(r.content.decode('utf8'))
    logger.debug('Defaut CSR values received: %s' % csrDescription)

    csrDescription['state'] = 'processing'
    # NOTE(review): the hints of the first link are blanked before sending the
    # description back; presumably the printer rejects them otherwise - confirm.
    csrDescription['links'][0]['hints'] = None
    csrDescription['certificateAttributes'] = {
        'commonName': hostname,
        'organization': org,
        'organizationalUnit': [ou],
        'cityOrLocality': city,
        'stateOrProvince': state,
        'countryOrRegion': country,
    }

    logger.info('Sending CSR updated informations.')
    r = requests.patch(csrUrl, headers=headers, data=json.dumps(csrDescription), verify=verify)
    if r.status_code != 204:
        logger.error('Impossible to send CSR description')
        exit(-1)

    while True:
        logger.info('Waiting for CSR generation.')
        r = requests.get(csrUrl, headers=headers, verify=verify)
        if r.status_code != 200:
            logger.error('Impossible to receive CSR. Status code: %d' % r.status_code)
            exit(-1)
        csr = json.loads(r.content.decode('utf8'))
        logger.debug(csr)
        if csr['state'] == 'idle':
            break
        # Pause between polls so the printer is not flooded with requests
        # while it is still generating the CSR.
        time.sleep(pollInterval)

    csr = csr['certificateData']
    print(csr)
    with open(filename, 'w+') as f:
        f.write(csr)
def installCertificate(hostname, verify, username, password, filename):
    """Upload the certificate stored in *filename* to the printer.

    The file is read as text, base64-encoded and posted as JSON to the
    printer's certificates endpoint using a bearer token obtained with the
    given credentials. Any status other than 200 logs an error and terminates
    the process through ``exit(-1)``.

    Args:
        hostname: Printer host name.
        verify: Passed unchanged as ``verify=`` to the ``requests`` call.
        username: Account name used to obtain the bearer token.
        password: Password used to obtain the bearer token.
        filename: Path of the certificate file to install.
    """
    log = logging.getLogger(__name__)
    endpoint = 'https://%s' % hostname + '/cdm/certificate/v1/certificates'

    with open(filename, 'r') as pemFile:
        pemText = pemFile.read()

    # The certificate text travels base64-encoded inside the JSON document.
    encoded = base64.b64encode(pemText.encode('utf8')).decode('ascii')
    log.info(encoded)

    token = getBearer(hostname, verify, username, password)
    body = {
        'version': '1.0.0',
        'requestType': 'installId',
        'certificateFormat': 'pem',
        'certificateData': encoded,
    }
    reply = requests.post(
        endpoint,
        headers={'Authorization': 'Bearer %s' % token},
        data=json.dumps(body),
        verify=verify,
    )
    if reply.status_code == 200:
        return
    log.error('Impossible to install certificate. Status code: %d.' % reply.status_code)
    exit(-1)
def _parseBool(value):
    """Convert a command-line word to a bool.

    Accepts the same words as ``ConfigParser.getboolean`` (case-insensitive):
    ``1/yes/true/on`` and ``0/no/false/off``.

    Raises:
        argparse.ArgumentTypeError: if *value* is none of those words.
    """
    normalized = value.strip().lower()
    if normalized in ('1', 'yes', 'true', 'on'):
        return True
    if normalized in ('0', 'no', 'false', 'off'):
        return False
    raise argparse.ArgumentTypeError('invalid boolean value: %r' % value)


def _loggableArgs(args):
    """Return a copy of *args* whose password, if set, is masked for logging."""
    shown = dict(vars(args))
    if shown.get('password') is not None:
        shown['password'] = '********'
    return argparse.Namespace(**shown)


def main():
    """Command-line entry point.

    Parses the command line, optionally overlays values read from an INI
    configuration file (sections ``TLS``, ``Login`` and ``CSR``), checks that
    password, user name and host name are known, then runs the selected
    sub-command: ``csr`` calls ``getCSR`` and ``pem`` calls
    ``installCertificate``.

    Values found in the configuration file replace those given on the command
    line. When a mandatory value is missing, the usage text is printed and the
    process terminates through ``exit(-1)``.
    """
    logger = logging.getLogger(__name__)
    coloredlogs.install()

    parser = argparse.ArgumentParser()
    parser.add_argument("-d", "--debug", dest='debug', action='store_true', required=False, help="Activate debug.")
    parser.add_argument("-c", "--config", dest='configFileName', nargs='?', required=False, default=None, help="Configuration file.")
    parser.add_argument("-u", "--user", dest='username', nargs='?', required=False, default='admin', help="Username.")
    parser.add_argument("-p", "--password", dest='password', nargs='?', required=False, default=None, help="Password.")
    parser.add_argument("-H", "--host", dest='hostname', nargs='?', required=False, default=None, help="Hostname.")
    # ``type=bool`` would turn any non-empty word (including "False") into
    # True, so a real parser is used; a bare ``-v`` means True.
    parser.add_argument("-v", "--verify", dest='verify', nargs='?', type=_parseBool, const=True, required=False, default=True, help="Verify certificate validity.")

    subparsers = parser.add_subparsers(dest='command', required=True, help='command help')
    parserCreateCSR = subparsers.add_parser('csr', help='Create CSR')
    parserCreateCSR.add_argument("-C", "--country", dest='country', nargs='?', required=False, default='US', help="Country.")
    parserCreateCSR.add_argument("-s", "--state", dest='state', nargs='?', required=False, help="State.")
    parserCreateCSR.add_argument("-c", "--city", dest='city', nargs='?', required=False, help="City.")
    parserCreateCSR.add_argument("--ou", dest='organizationalUnit', nargs='?', required=False, help="Organizational Unit.")
    parserCreateCSR.add_argument("--org", dest='organization', nargs='?', required=False, help="Organization.")
    parserCreateCSR.add_argument("-o", "--output", dest='output', nargs='?', required=False, default=None, help="Output file.")
    parserInstalleCertificate = subparsers.add_parser('pem', help='Install certificate')
    parserInstalleCertificate.add_argument("-i", "--input", dest='input', nargs='?', required=True, default=None, help="Input file.")

    args = parser.parse_args()
    # The password is masked so it never reaches the log output.
    logger.info("Arguments: %s" % _loggableArgs(args))

    if args.configFileName is not None:
        config = configparser.ConfigParser()
        try:
            # The context manager closes the file once it has been parsed.
            with open(args.configFileName, 'r') as configFile:
                config.read_file(configFile)
        except OSError as e:
            logger.error('Impossible to open configuration file. Error: %s' % e)
            exit(-1)

        if config.has_option('TLS', 'verify'):
            args.verify = config.getboolean('TLS', 'verify')
        # (section, option, attribute of ``args`` receiving the value)
        overrides = (
            ('Login', 'user', 'username'),
            ('Login', 'password', 'password'),
            ('Login', 'host', 'hostname'),
            ('CSR', 'org', 'organization'),
            ('CSR', 'ou', 'organizationalUnit'),
            ('CSR', 'country', 'country'),
            ('CSR', 'state', 'state'),
            ('CSR', 'city', 'city'),
        )
        for section, option, dest in overrides:
            if config.has_option(section, option):
                setattr(args, dest, config.get(section, option))

    mandatory = (
        (args.password, 'Password is mandatory'),
        (args.username, 'Username is mandatory'),
        (args.hostname, 'Hostname is mandatory'),
    )
    missing = False
    for value, message in mandatory:
        if value is None:
            logger.error(message)
            missing = True
    if missing:
        parser.print_usage()
        exit(-1)

    if args.debug:
        logger.info('Setting logging to debug mode')
        coloredlogs.set_level(level=logging.DEBUG)

    if args.command == 'csr' and args.output is None:
        # Default output file is named after the printer.
        args.output = '%s.csr' % args.hostname
    logger.info('Final arguments: %s' % _loggableArgs(args))

    if args.command == 'csr':
        getCSR(hostname=args.hostname, verify=args.verify, username=args.username, password=args.password, org=args.organization, ou=args.organizationalUnit, city=args.city, state=args.state, country=args.country, filename=args.output)
    elif args.command == 'pem':
        installCertificate(hostname=args.hostname, verify=args.verify, username=args.username, password=args.password, filename=args.input)
# Run the command-line interface only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == '__main__':
    main()