First almost functional version
This commit is contained in:
1
.gitignore
vendored
Normal file
1
.gitignore
vendored
Normal file
@@ -0,0 +1 @@
|
||||
config.ini
|
||||
12
config.example.ini
Normal file
12
config.example.ini
Normal file
@@ -0,0 +1,12 @@
|
||||
[Login]
|
||||
host=
|
||||
user=admin
|
||||
password=XXXXXX
|
||||
[CSR]
|
||||
country=
|
||||
state=
|
||||
city=
|
||||
org=
|
||||
ou=
|
||||
[TLS]
|
||||
verify=true
|
||||
320
refresh-certificate.py
Executable file
320
refresh-certificate.py
Executable file
@@ -0,0 +1,320 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
import configparser
|
||||
import argparse
|
||||
import requests
|
||||
from sys import exit
|
||||
import logging
|
||||
import coloredlogs
|
||||
import json
|
||||
import re
|
||||
from io import StringIO
|
||||
import getpass
|
||||
import os
|
||||
import random
|
||||
import base64
|
||||
|
||||
def createNonce(length=45):
|
||||
# Inspired by main.js sent by printer
|
||||
validChars = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-._~'
|
||||
nonce = ''
|
||||
for i in range(0, length):
|
||||
pos = random.randint(0, len(validChars)-1)
|
||||
nonce+= validChars[pos]
|
||||
|
||||
return nonce
|
||||
|
||||
def getBearer(hostname, verify, username, password):
|
||||
logger = logging.getLogger(__name__)
|
||||
logger.info("Retrieving base content")
|
||||
|
||||
baseUrl = 'https://%s' % hostname
|
||||
|
||||
try:
|
||||
r = requests.get(baseUrl, verify=verify)
|
||||
except Exception as e:
|
||||
logger.error('Exception: %s' % e)
|
||||
exit(-1)
|
||||
|
||||
if r.status_code != 200:
|
||||
logger.error('Imposible to retrieve base content')
|
||||
exit(-1)
|
||||
|
||||
content = StringIO(r.content.decode('utf8'))
|
||||
|
||||
logger.info('Retrieving main javascript path.')
|
||||
p = re.compile('^.*src="(?P<main>main[^"]+)".*$')
|
||||
|
||||
found = False
|
||||
nbLines = 0
|
||||
for line in content.readlines():
|
||||
nbLines+=1
|
||||
m = p.match(line)
|
||||
if m != None:
|
||||
found = True
|
||||
main = m.group('main')
|
||||
break
|
||||
|
||||
if not found:
|
||||
logger.error('Impossible to retrieve main path.')
|
||||
exit(-1)
|
||||
|
||||
mainUrl = baseUrl+'/'+main
|
||||
logger.debug('Main javascript is located at %s.' % mainUrl)
|
||||
|
||||
r = requests.get(mainUrl, verify=verify)
|
||||
if r.status_code != 200:
|
||||
logger.error('Imposible to retrieve main javascript content.')
|
||||
exit(-1)
|
||||
|
||||
content = StringIO(r.content.decode('utf8'))
|
||||
|
||||
logger.info('Retrieving agent id')
|
||||
p = re.compile('^.*It\.DEVICE_ADMIN="(?P<agentid>[0-9a-f\-]+)".*$')
|
||||
|
||||
found = False
|
||||
nbLines = 0
|
||||
for line in content.readlines():
|
||||
nbLines+=1
|
||||
m = p.match(line)
|
||||
if m != None:
|
||||
found = True
|
||||
agentId = m.group('agentid')
|
||||
break
|
||||
|
||||
if not found:
|
||||
logger.error('Impossible to retrieve agent identifier.')
|
||||
exit(-1)
|
||||
|
||||
logger.debug('Agent identifier: %s' % agentId)
|
||||
|
||||
logger.info('Retrieving redirection to login URL.')
|
||||
|
||||
authUrl = baseUrl+'/cdm/oauth2/v1/authorize'
|
||||
certificateManagementUrl = baseUrl+'/security/certificateManagement/certificates'
|
||||
|
||||
nonce = createNonce()
|
||||
params = { 'response_type':'code', 'client_id':'com.hp.cdm.client.hpEws', 'state':nonce, 'redirect_uri':certificateManagementUrl, 'scope':'com.hp.cdm.auth.alias.deviceRole.deviceAdmin'}
|
||||
|
||||
r = requests.get(authUrl, params=params, allow_redirects=False, verify=verify)
|
||||
if r.status_code != 302:
|
||||
logger.error('Impossible to retrieve redirection to login URL')
|
||||
exit(-1)
|
||||
|
||||
|
||||
loginUrl = baseUrl+r.headers['Location']
|
||||
logger.debug('UR to login: %s' % loginUrl)
|
||||
|
||||
logger.info('Loading login page.')
|
||||
|
||||
r = requests.get(loginUrl, verify=verify)
|
||||
if r.status_code != 200:
|
||||
logger.error('')
|
||||
exit(-1)
|
||||
|
||||
adminUrl = baseUrl+'/cdm/security/v1/deviceAdminConfig'
|
||||
|
||||
authenticationUrl = baseUrl+'/cdm/security/v1/authenticate'
|
||||
payload = { 'agentId':agentId, 'username':username, 'password':password, 'client_id':'com.hp.cdm.client.hpEws', 'scope':'com.hp.cdm.auth.alias.deviceRole.deviceAdmin', 'grant_type':'authorization_code' , 'state':nonce }
|
||||
|
||||
logger.info('Authenticating to receive authentication code.')
|
||||
|
||||
r = requests.post(authenticationUrl, data=json.dumps(payload), verify=verify)
|
||||
if r.status_code != 200:
|
||||
logger.error('Impossible to authenticate.')
|
||||
exit(-1)
|
||||
|
||||
response = json.loads(r.content.decode('utf8'))
|
||||
code = response['code']
|
||||
logger.debug('Authentication code: %s' % code)
|
||||
|
||||
tokenUrl = baseUrl+'/cdm/oauth2/v1/token'
|
||||
logger.debug('Token URL: %s' % tokenUrl)
|
||||
|
||||
logger.info('Retrieving authentication bearer.')
|
||||
|
||||
payload = { 'grant_type': 'authorization_code', 'code': code, 'redirect_uri': baseUrl, 'client_id': 'com.hp.cdm.client.hpEws' }
|
||||
|
||||
r = requests.post(tokenUrl, data=payload, verify=verify)
|
||||
if r.status_code != 200:
|
||||
logger.error('Impossible to obtain Oauth bearer.')
|
||||
exit(-1)
|
||||
|
||||
response = json.loads(r.content.decode('utf8'))
|
||||
bearer = response['access_token']
|
||||
logger.debug('Bearer: %s' % bearer)
|
||||
|
||||
return bearer
|
||||
|
||||
def getCSR(hostname, verify, username, password, ou, org, city, state, country, filename):
|
||||
logger = logging.getLogger(__name__)
|
||||
baseUrl = 'https://%s' % hostname
|
||||
|
||||
bearer = getBearer(hostname, verify, username, password)
|
||||
|
||||
logger.info('Retrieving CSR base configuration.')
|
||||
|
||||
csrUrl = baseUrl+'/cdm/certificate/v1/certificateSigningRequest'
|
||||
headers = { 'Authorization' : 'Bearer %s' % bearer}
|
||||
|
||||
r = requests.get(csrUrl, headers=headers, verify=verify)
|
||||
if r.status_code != 200:
|
||||
logger.error('Impossible to obtain CSR info')
|
||||
exit(-1)
|
||||
|
||||
csrDescription = json.loads(r.content.decode('utf8'))
|
||||
|
||||
logger.debug('Defaut CSR values received: %s' % csrDescription)
|
||||
|
||||
csrDescription['state'] = 'processing'
|
||||
csrDescription['links'][0]['hints'] = None
|
||||
csrDescription['certificateAttributes'] = {'commonName':hostname, 'organization':org,'organizationalUnit':[ou],'cityOrLocality':city,'stateOrProvince':state,'countryOrRegion':country}
|
||||
|
||||
logger.info('Sending CSR updated informations.')
|
||||
|
||||
r = requests.patch(csrUrl, headers=headers, data=json.dumps(csrDescription), verify=verify)
|
||||
|
||||
if r.status_code != 204:
|
||||
logger.error('Impossible to send CSR description')
|
||||
exit(-1)
|
||||
|
||||
finished = False
|
||||
while not finished:
|
||||
logger.info('Waiting for CSR generation.')
|
||||
|
||||
r = requests.get(csrUrl, headers=headers, verify=verify)
|
||||
if r.status_code != 200:
|
||||
logger.error('Impossible to receive CSR. Status code: %d' % r.status_code)
|
||||
exit(-1)
|
||||
|
||||
csr = json.loads(r.content.decode('utf8'))
|
||||
logger.debug(csr)
|
||||
if csr['state'] == 'idle':
|
||||
finished = True
|
||||
|
||||
csr = csr['certificateData']
|
||||
print(csr)
|
||||
|
||||
with open(filename, 'w+') as f:
|
||||
f.write(csr)
|
||||
|
||||
def installCertificate(hostname, verify, username, password, filename):
|
||||
logger = logging.getLogger(__name__)
|
||||
baseUrl = 'https://%s' % hostname
|
||||
|
||||
with open(filename, 'r') as f:
|
||||
cert = f.read()
|
||||
cert = cert.encode('utf8')
|
||||
cert = base64.b64encode(cert)
|
||||
cert = cert.decode('ascii')
|
||||
|
||||
logger.info(cert)
|
||||
|
||||
bearer = getBearer(hostname, verify, username, password)
|
||||
|
||||
url = baseUrl+'/cdm/certificate/v1/certificates'
|
||||
|
||||
# May be cert should be in base 64.
|
||||
certificate = { 'version':'1.0.0', 'requestType':'installId', 'certificateFormat':'pem', 'certificateData':cert }
|
||||
headers = { 'Authorization': 'Bearer %s' % bearer }
|
||||
|
||||
r = requests.post(url, headers=headers, data=json.dumps(certificate), verify=verify)
|
||||
if r.status_code != 200:
|
||||
logger.error('Impossible to install certificate. Status code: %d.' % r.status_code)
|
||||
exit(-1)
|
||||
|
||||
def main():
|
||||
logger = logging.getLogger(__name__)
|
||||
coloredlogs.install()
|
||||
|
||||
parser = argparse.ArgumentParser()
|
||||
parser.add_argument("-d", "--debug", dest='debug', action='store_true', required=False, help="Activate debug.")
|
||||
parser.add_argument("-c", "--config", dest='configFileName', nargs='?', required=False, default=None, help="Configuration file.")
|
||||
parser.add_argument("-u", "--user", dest='username', nargs='?', required=False, default='admin', help="Username.")
|
||||
parser.add_argument("-p", "--password", dest='password', nargs='?', required=False, default=None, help="Password.")
|
||||
parser.add_argument("-H", "--host", dest='hostname', nargs='?', required=False, default=None, help="Hostname.")
|
||||
parser.add_argument("-v", "--verify", dest='verify', nargs='?', type=bool, required=False, default=True, help="Verify certificate validity.")
|
||||
|
||||
subparsers = parser.add_subparsers(dest='command', required=True, help='command help')
|
||||
|
||||
parserCreateCSR = subparsers.add_parser('csr', help='Create CSR')
|
||||
parserCreateCSR.add_argument("-C", "--country", dest='country', nargs='?', required=False, default='US', help="Country.")
|
||||
parserCreateCSR.add_argument("-s", "--state", dest='state', nargs='?', required=False, help="State.")
|
||||
parserCreateCSR.add_argument("-c", "--city", dest='city', nargs='?', required=False, help="State.")
|
||||
parserCreateCSR.add_argument("--ou", dest='organizationalUnit', nargs='?', required=False, help="Organizational Unit.")
|
||||
parserCreateCSR.add_argument("--org", dest='organization', nargs='?', required=False, help="Organization.")
|
||||
parserCreateCSR.add_argument("-o", "--output", dest='output', nargs='?', required=False, default=None, help="Output file.")
|
||||
|
||||
parserInstalleCertificate = subparsers.add_parser('pem', help='Install certificate')
|
||||
parserInstalleCertificate.add_argument("-i", "--input", dest='input', nargs='?', required=True, default=None, help="Input file.")
|
||||
|
||||
args = parser.parse_args()
|
||||
logger.info("Arguments: %s" % args)
|
||||
|
||||
if args.configFileName != None:
|
||||
try:
|
||||
configFile = open(args.configFileName, 'r')
|
||||
except Exception as e:
|
||||
logger.info('Impossible to open configuration file. Error: %s' % e)
|
||||
exit(-1)
|
||||
|
||||
config = configparser.ConfigParser()
|
||||
config.read_file(configFile)
|
||||
|
||||
sections = config.sections()
|
||||
|
||||
if 'TLS' in sections:
|
||||
options = config.options('TLS')
|
||||
if 'verify' in options:
|
||||
args.verify = config.getboolean('TLS', 'verify')
|
||||
if 'Login' in sections:
|
||||
options = config.options('Login')
|
||||
if 'user' in options:
|
||||
args.username = config.get('Login','user')
|
||||
if 'password' in options:
|
||||
args.password = config.get('Login','password')
|
||||
if 'host' in options:
|
||||
args.hostname = config.get('Login','host')
|
||||
if 'CSR' in sections:
|
||||
options = config.options('CSR')
|
||||
if 'org' in options:
|
||||
args.organization = config.get('CSR','org')
|
||||
if 'ou' in options:
|
||||
args.organizationalUnit = config.get('CSR','ou')
|
||||
if 'country' in options:
|
||||
args.country = config.get('CSR','country')
|
||||
if 'state' in options:
|
||||
args.state = config.get('CSR','state')
|
||||
if 'city' in options:
|
||||
args.city = config.get('CSR','city')
|
||||
|
||||
logger.info(args)
|
||||
|
||||
if args.password == None:
|
||||
logger.error('Password is mandatory')
|
||||
argparse.usage()
|
||||
if args.username == None:
|
||||
logger.error('Username is mandatory')
|
||||
argparse.usage()
|
||||
if args.hostname == None:
|
||||
logger.error('Hostname is mandatory')
|
||||
argparse.usage()
|
||||
|
||||
if args.debug:
|
||||
logger.info('Setting logging to debug mode')
|
||||
coloredlogs.set_level(level=logging.DEBUG)
|
||||
|
||||
if args.command == 'csr':
|
||||
if args.output == None:
|
||||
args.output = '%s.csr' % args.hostname
|
||||
|
||||
if args.command == 'csr':
|
||||
getCSR(hostname=args.hostname, verify=args.verify, username=args.username, password=args.password, org=args.organization, ou=args.organizationalUnit, city=args.city, state=args.state, country=args.country, filename=args.output)
|
||||
elif args.command == 'pem':
|
||||
installCertificate(hostname=args.hostname, verify=args.verify, username=args.username, password=args.password, filename=args.input)
|
||||
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
|
||||
Reference in New Issue
Block a user