Let’s Encrypt ACME with Alibaba Cloud API Gateway and CDN — Part 3

Let’s Encrypt API Endpoints

ACME Directory

{
"LPTIN-Jj4u0": "https://community.letsencrypt.org/t/adding-random-entries-to-the-directory/33417",
"keyChange": "https://acme-staging-v02.api.letsencrypt.org/acme/key-change",
"meta": {
"caaIdentities": [
"letsencrypt.org"
],
"termsOfService": "https://letsencrypt.org/documents/LE-SA-v1.2-November-15-2017.pdf",
"website": "https://letsencrypt.org/docs/staging-environment/"
},
"newAccount": "https://acme-staging-v02.api.letsencrypt.org/acme/new-acct",
"newNonce": "https://acme-staging-v02.api.letsencrypt.org/acme/new-nonce",
"newOrder": "https://acme-staging-v02.api.letsencrypt.org/acme/new-order",
"revokeCert": "https://acme-staging-v02.api.letsencrypt.org/acme/revoke-cert"
}

Example Code to Get the ACME Directory

""" Let's Encrypt ACME Version 2 Examples - Get Directory"""# This example will call the ACME API directory and display the returned data
# Reference: https://ietf-wg-acme.github.io/acme/draft-ietf-acme-acme.html#rfc.section.7.1.1
import sys
import requests
import helper
# Let's Encrypt staging directory endpoint
path = 'https://acme-staging-v02.api.letsencrypt.org/directory'

# Headers sent with the directory request
headers = {
    'User-Agent': 'neoprime.io-acme-client/1.0',
    'Accept-Language': 'en',
}

try:
    print('Calling endpoint:', path)
    directory = requests.get(path, headers=headers)
except requests.exceptions.RequestException as error:
    print(error)
    sys.exit(1)

# Anything outside the 2xx range is treated as a failure
if not 200 <= directory.status_code < 300:
    print('Error calling ACME endpoint:', directory.reason)
    sys.exit(1)

# The output should be JSON. If not, something is wrong.
# Response.json() signals undecodable content with a ValueError subclass.
try:
    acme_config = directory.json()
except ValueError as ex:
    print("Error: Cannot load returned data:", ex)
    sys.exit(1)

print('')
print('Returned Data:')
print('****************************************')
print(directory.text)

# acme_config was already decoded above, so the body is not parsed again
print('')
print('Formatted JSON:')
print('****************************************')
helper.print_dict(acme_config, 0)

Example Code to Create a New Account

# Outline of the newAccount request. It relies on get_directory(),
# EmailAddresses, AccountKeyFile and the myhelper module from the full
# example program.
acme_config = get_directory()
url = acme_config["newAccount"]

# Fetch a nonce from the newNonce endpoint; it goes into the protected header
nonce = requests.head(acme_config['newNonce']).headers['Replay-Nonce']

headers = {
    'User-Agent': 'neoprime.io-acme-client/1.0',
    'Accept-Language': 'en',
    'Content-Type': 'application/jose+json'
}

# Account request payload
payload = {}
payload["termsOfServiceAgreed"] = True
payload["contact"] = EmailAddresses

# Protected header: a new account is identified by its public key ("jwk")
body_top = {
    "alg": "RS256",
    "jwk": myhelper.get_jwk(AccountKeyFile),
    "url": url,
    "nonce": nonce
}

body_top_b64 = myhelper.b64(json.dumps(body_top).encode("utf8"))
payload_b64 = myhelper.b64(json.dumps(payload).encode("utf8"))

# Sign "<protected>.<payload>" with the account private key
data = "{0}.{1}".format(body_top_b64, payload_b64).encode("utf8")
signature = myhelper.sign(data, AccountKeyFile)

# Request body
jose = {
    "protected": body_top_b64,
    "payload": payload_b64,
    "signature": myhelper.b64(signature)
}

resp = requests.post(url, json=jose, headers=headers)

# The account URL is returned in the Location header
account_url = resp.headers['Location']

# The response also carries the nonce to use for the next request
nonce = resp.headers['Replay-Nonce']
Content-Type: application/jose+json
""" Let's Encrypt ACME Version 2 Examples - New Account"""# This example will call the ACME API directory and create a new account
# Reference: https://ietf-wg-acme.github.io/acme/draft-ietf-acme-acme.html#rfc.section.7.3.2
import os
import sys
import json
import requests
import myhelper
# Staging URL
path = 'https://acme-staging-v02.api.letsencrypt.org/directory'
# Production URL
# path = 'https://acme-v02.api.letsencrypt.org/directory'

# File holding the ACME account private key
AccountKeyFile = 'account.key'

# Contact URLs registered with the account.
# example.com is reserved for documentation; replace with real addresses.
EmailAddresses = ['mailto:someone@example.com', 'mailto:someone2@example.com']


def check_account_key_file():
    """ Verify that the Account Key File exists and prompt to create if it does not exist

    Returns True when the key file is present (either already, or after it
    was created here) and False when the user declines or creation failed.
    """
    if os.path.exists(AccountKeyFile):
        return True

    print('Error: File does not exist: {0}'.format(AccountKeyFile))

    # Compared against False explicitly, as the original does; the return
    # type of myhelper.Confirm is not visible from this file.
    if myhelper.Confirm('Create new account private key (y/n): ') is False:
        print('Cancelled')
        return False

    myhelper.create_rsa_private_key(AccountKeyFile)

    # Confirm the helper really produced the file
    if not os.path.exists(AccountKeyFile):
        print('Error: File does not exist: {0}'.format(AccountKeyFile))
        return False

    return True


def get_directory():
    """ Get the ACME Directory

    Returns the decoded directory (the JSON document served at `path`), or
    False if the request failed, returned a non-2xx status, or the body
    could not be decoded as JSON.
    """
    headers = {
        'User-Agent': 'neoprime.io-acme-client/1.0',
        'Accept-Language': 'en',
    }

    try:
        print('Calling endpoint:', path)
        directory = requests.get(path, headers=headers)
    except requests.exceptions.RequestException as error:
        print(error)
        return False

    if not 200 <= directory.status_code < 300:
        print('Error calling ACME endpoint:', directory.reason)
        print(directory.text)
        return False

    # Response.json() raises a ValueError subclass on undecodable content;
    # report it the same way as every other failure in this function.
    try:
        return directory.json()
    except ValueError as error:
        print('Error: Cannot load returned data:', error)
        return False


def main():
    """ Main Program Function

    Creates a new ACME account using the key in AccountKeyFile and prints
    the account URL returned by the server. Exits with status 1 on failure.
    """
    headers = {
        'User-Agent': 'neoprime.io-acme-client/1.0',
        'Accept-Language': 'en',
        'Content-Type': 'application/jose+json'
    }

    if check_account_key_file() is False:
        sys.exit(1)

    acme_config = get_directory()
    if acme_config is False:
        sys.exit(1)

    url = acme_config["newAccount"]

    # Get the URL for the terms of service
    terms_service = acme_config.get("meta", {}).get("termsOfService", "")
    print('Terms of Service:', terms_service)

    nonce = requests.head(acme_config['newNonce']).headers['Replay-Nonce']
    print('Nonce:', nonce)
    print("")

    # Create the account request
    payload = {}
    if terms_service != "":
        payload["termsOfServiceAgreed"] = True
    payload["contact"] = EmailAddresses
    payload_b64 = myhelper.b64(json.dumps(payload).encode("utf8"))

    # Protected header: a new account is identified by its public key ("jwk")
    body_top = {
        "alg": "RS256",
        "jwk": myhelper.get_jwk(AccountKeyFile),
        "url": url,
        "nonce": nonce
    }
    body_top_b64 = myhelper.b64(json.dumps(body_top).encode("utf8"))

    # Sign "<protected>.<payload>" with the account private key
    data = "{0}.{1}".format(body_top_b64, payload_b64).encode("utf8")
    signature = myhelper.sign(data, AccountKeyFile)

    #
    # Create the HTTP request body
    #
    jose = {
        "protected": body_top_b64,
        "payload": payload_b64,
        "signature": myhelper.b64(signature)
    }

    try:
        print('Calling endpoint:', url)
        resp = requests.post(url, json=jose, headers=headers)
    except requests.exceptions.RequestException as error:
        # A failed request leaves no usable response object, so stop here
        # instead of falling through to resp.status_code below.
        print(error)
        sys.exit(1)

    if not 200 <= resp.status_code < 300:
        print('Error calling ACME endpoint:', resp.reason)
        print('Status Code:', resp.status_code)
        myhelper.process_error_message(resp.text)
        sys.exit(1)

    print('')
    if 'Location' in resp.headers:
        print('Account URL:', resp.headers['Location'])
    else:
        print('Error: Response headers did not contain the header "Location"')


main()
sys.exit(0)

Example Code to Get Account Information

[acme-neoprime]
UserAgent = neoprime.io-acme-client/1.0
# [Required] ACME account key
AccountKeyFile = account.key
# Certificate Signing Request (CSR)
CSRFile = example.com.csr
ChainFile = example.com.chain.pem
# ACME URL
# Staging URL
# https://acme-staging-v02.api.letsencrypt.org/directory
# Production URL
# https://acme-v02.api.letsencrypt.org/directory
ACMEDirectory = https://acme-staging-v02.api.letsencrypt.org/directory
# Email Addresses so that LetsEncrypt can notify about SSL renewals
Contacts = mailto:someone@example.com;mailto:someone2@example.com
# Preferred Language
Language = en
""" Let's Encrypt ACME Version 2 Examples - Get Account Information """############################################################
# This example will call the ACME API directory and get the account information
# Reference: https://ietf-wg-acme.github.io/acme/draft-ietf-acme-acme.html#rfc.section.7.3.3
#
# This program uses the AccountKeyFile set in acme.ini to return information about the ACME account.
############################################################
import sys
import json
import requests
import helper
import myhelper
############################################################
# Start - Global Variables
# Debug flag; a non-zero value enables the extra diagnostic output
g_debug = 0
# ACME directory URL (set by load_acme_parameters)
acme_path = ''
# Path of the ACME account private key file (set by load_acme_parameters)
AccountKeyFile = ''
# Contact URLs for the account (set by load_acme_parameters)
EmailAddresses = []
# HTTP headers sent with every request (filled in by load_acme_parameters)
headers = {}
# End - Global Variables
############################################################
############################################################
# Load the configuration from acme.ini
############################################################
def load_acme_parameters(debug=0):
    """ Load the configuration from acme.ini

    Reads the [acme-neoprime] section and stores the directory URL, account
    key file, contact list and request headers in the module globals.

    debug: when non-zero, print the configuration values that were read.

    Returns the config object produced by myhelper.load_acme_config.
    """
    global acme_path
    global AccountKeyFile
    global EmailAddresses
    # "headers" is only mutated, never rebound, so it needs no global statement

    config = myhelper.load_acme_config(filename='acme.ini')

    # Compare by value: "is not 0" tests object identity, not equality
    if debug != 0:
        print(config.get('acme-neoprime', 'accountkeyfile'))
        print(config.get('acme-neoprime', 'csrfile'))
        print(config.get('acme-neoprime', 'chainfile'))
        print(config.get('acme-neoprime', 'acmedirectory'))
        print(config.get('acme-neoprime', 'contacts'))
        print(config.get('acme-neoprime', 'language'))

    acme_path = config.get('acme-neoprime', 'acmedirectory')
    AccountKeyFile = config.get('acme-neoprime', 'accountkeyfile')

    # Contacts are stored as one ';'-separated string in the ini file
    EmailAddresses = config.get('acme-neoprime', 'contacts').split(';')

    headers['User-Agent'] = config.get('acme-neoprime', 'UserAgent')
    headers['Accept-Language'] = config.get('acme-neoprime', 'language')
    headers['Content-Type'] = 'application/jose+json'

    return config
############################################################
#
############################################################
def get_account_url(url, nonce):
    """ Get the Account URL based upon the account key

    Posts a signed request with "onlyReturnExisting" set to the given URL
    and reads the account URL from the "Location" response header.

    url: endpoint to post to (main passes the directory's "newAccount" URL)
    nonce: nonce to place in the protected header

    Returns a tuple (nonce, account_url), where nonce is taken from the
    "Replay-Nonce" response header for use in the next request.
    Exits the process with status 1 if the request fails, the server
    returns a non-2xx status, or the Location header is missing.
    """
    # Create the account request
    payload = {}
    payload["termsOfServiceAgreed"] = True
    payload["contact"] = EmailAddresses
    payload["onlyReturnExisting"] = True
    payload_b64 = myhelper.b64(json.dumps(payload).encode("utf8"))

    # The account URL is not known yet, so the request is identified by the
    # public key ("jwk") rather than by "kid"
    body_top = {
        "alg": "RS256",
        "jwk": myhelper.get_jwk(AccountKeyFile),
        "url": url,
        "nonce": nonce
    }
    body_top_b64 = myhelper.b64(json.dumps(body_top).encode("utf8"))

    #
    # Create the message digest
    #
    data = "{0}.{1}".format(body_top_b64, payload_b64).encode("utf8")
    signature = myhelper.sign(data, AccountKeyFile)

    #
    # Create the HTTP request body
    #
    jose = {
        "protected": body_top_b64,
        "payload": payload_b64,
        "signature": myhelper.b64(signature)
    }

    #
    # Make the ACME request
    #
    try:
        print('Calling endpoint:', url)
        resp = requests.post(url, json=jose, headers=headers)
    except requests.exceptions.RequestException as error:
        # A failed request leaves no usable response object, so stop here
        # instead of falling through to resp.status_code below.
        print(error)
        sys.exit(1)

    if not 200 <= resp.status_code < 300:
        print('Error calling ACME endpoint:', resp.reason)
        print('Status Code:', resp.status_code)
        myhelper.process_error_message(resp.text)
        sys.exit(1)

    if 'Location' not in resp.headers:
        # Without the account URL there is nothing to return
        print('Error: Response headers did not contain the header "Location"')
        sys.exit(1)

    account_url = resp.headers['Location']
    print('Account URL:', account_url)

    # Get the nonce for the next command request
    nonce = resp.headers['Replay-Nonce']

    return nonce, account_url
############################################################
#
############################################################
def get_account_info(nonce, url, location):
    """ Get the Account Information

    nonce: nonce to place in the protected header
    url: URL the request is posted to
    location: account URL, used as the "kid" that identifies the account

    Returns a tuple (nonce, text): the nonce from the "Replay-Nonce"
    response header and the raw response body (JSON text describing the
    account).
    Exits the process with status 1 if the request fails or the server
    returns a non-2xx status.
    """
    # Create the account request
    payload = {}
    payload_b64 = myhelper.b64(json.dumps(payload).encode("utf8"))

    # The protected "url" must be the URL actually posted to; main passes the
    # account URL for both url and location, so this matches its usage.
    body_top = {
        "alg": "RS256",
        "kid": location,
        "nonce": nonce,
        "url": url
    }
    body_top_b64 = myhelper.b64(json.dumps(body_top).encode("utf8"))

    #
    # Create the message digest
    #
    data = "{0}.{1}".format(body_top_b64, payload_b64).encode("utf8")
    signature = myhelper.sign(data, AccountKeyFile)

    #
    # Create the HTTP request body
    #
    jose = {
        "protected": body_top_b64,
        "payload": payload_b64,
        "signature": myhelper.b64(signature)
    }

    #
    # Make the ACME request
    #
    try:
        print('Calling endpoint:', url)
        resp = requests.post(url, json=jose, headers=headers)
    except requests.exceptions.RequestException as error:
        # A failed request leaves no usable response object, so stop here
        # instead of falling through to resp.status_code below.
        print(error)
        sys.exit(1)

    if not 200 <= resp.status_code < 300:
        print('Error calling ACME endpoint:', resp.reason)
        print('Status Code:', resp.status_code)
        myhelper.process_error_message(resp.text)
        sys.exit(1)

    nonce = resp.headers['Replay-Nonce']

    # resp.text is the returned JSON data describing the account
    return nonce, resp.text
############################################################
#
############################################################
def load_acme_urls(path):
    """ Load the ACME Directory of URLS

    path: URL of the ACME directory to fetch

    Returns the decoded JSON directory.
    Exits the process with status 1 if the request fails or the server
    returns a non-2xx status.
    """
    try:
        print('Calling endpoint:', path)
        # Request the URL that was passed in (and just printed), not the
        # module-level acme_path
        resp = requests.get(path, headers=headers)
    except requests.exceptions.RequestException as error:
        print(error)
        sys.exit(1)

    if not 200 <= resp.status_code < 300:
        print('Error calling ACME endpoint:', resp.reason)
        print(resp.text)
        sys.exit(1)

    return resp.json()
############################################################
#
############################################################
def acme_get_nonce(urls):
    """ Get the ACME Nonce that is used for the first request

    urls: the ACME directory; its "newNonce" entry is the URL queried

    Returns the value of the "Replay-Nonce" response header, or False if
    the request fails, the server returns a non-2xx status, or the header
    is missing.
    """
    path = urls['newNonce']

    try:
        print('Calling endpoint:', path)
        resp = requests.head(path, headers=headers)
    except requests.exceptions.RequestException as error:
        print(error)
        return False

    if not 200 <= resp.status_code < 300:
        print('Error calling ACME endpoint:', resp.reason)
        print(resp.text)
        return False

    # Report a missing header through the same False return as the other
    # failures instead of raising KeyError
    if 'Replay-Nonce' not in resp.headers:
        print('Error: Response headers did not contain the header "Replay-Nonce"')
        return False

    return resp.headers['Replay-Nonce']
############################################################
# Main Program Function
############################################################
def main(debug=0):
    """ Main Program Function

    Looks up the account URL for the configured account key, fetches the
    account object and prints selected fields.

    debug: when non-zero, also dump the complete returned account data.
    """
    acme_urls = load_acme_urls(acme_path)

    url = acme_urls["newAccount"]

    nonce = acme_get_nonce(acme_urls)
    if nonce is False:
        sys.exit(1)

    nonce, account_url = get_account_url(url, nonce)

    # resp is the returned JSON data describing the account
    nonce, resp = get_account_info(nonce, account_url, account_url)

    info = json.loads(resp)

    # Compare by value: "is not 0" tests object identity, not equality.
    # NOTE(review): the extent of this debug block is reconstructed, since
    # the original indentation was lost - confirm.
    if debug != 0:
        print('')
        print('Returned Data:')
        print('##################################################')
        #print(info)
        helper.print_dict(info)
        print('##################################################')
        print('')

    # .get() prints None for a field the server leaves out instead of
    # aborting with KeyError
    print('ID: ', info.get('id'))
    print('Contact: ', info.get('contact'))
    print('Initial IP:', info.get('initialIp'))
    print('Created At:', info.get('createdAt'))
    print('Status: ', info.get('status'))
def is_json(data):
    """ Return True if data can be decoded as JSON, otherwise False

    data: the text to check. Values json.loads cannot accept at all
    (for example None) are reported as False.
    """
    try:
        json.loads(data)
    except (TypeError, ValueError):
        # ValueError: malformed JSON text; TypeError: not str/bytes input
        return False
    return True
# Load the settings from acme.ini into the module globals, then run
acme_config = load_acme_parameters(g_debug)
main(g_debug)

Summary

--

--

--

Follow me to keep abreast of the latest technology news, industry insights, and developer trends. Alibaba Cloud website: https://www.alibabacloud.com

Love podcasts or audiobooks? Learn on the go with our new app.

Recommended from Medium

Create your first Slack bot that can answer any queries using WolframAlpha API

GitHub Action Self Hosted Runners

Software Development Life Cycle Models

What is Biztalk Orchestration?

Real-Time Data Synchronization Based on Flink SQL CDC

Introduction to SuperPhone’s API

Mastering Your Tools: Becoming a Better Programmer Without Any Programming

Mercury Insurance login Auto, Home, Business Insurance.

Mercury Insurance  login Auto, Home, Business Insurance.

Get the Medium app

A button that says 'Download on the App Store', and if clicked it will lead you to the iOS App store
A button that says 'Get it on, Google Play', and if clicked it will lead you to the Google Play store
Alibaba Cloud

Alibaba Cloud

Follow me to keep abreast of the latest technology news, industry insights, and developer trends. Alibaba Cloud website: https://www.alibabacloud.com

More from Medium

Install Postgresql 13 on RHEL 8. Configure scram-sha-256 password authentication.

Command Design Pattern in Golang

Building a Simple Streaming Real-Time Chat App

How to use Redis TimeSeries