diff --git a/AzureMonitorAgent/agent.py b/AzureMonitorAgent/agent.py index 73b50008c..81c246cad 100644 --- a/AzureMonitorAgent/agent.py +++ b/AzureMonitorAgent/agent.py @@ -2714,19 +2714,32 @@ def get_settings(): '{0}.prv'.format( settings_thumbprint)) decoded_settings = base64.standard_b64decode(encoded_settings) - decrypt_cmd = 'openssl smime -inform DER -decrypt -recip {0} ' \ - '-inkey {1}'.format(encoded_cert_path, - encoded_key_path) - try: - session = subprocess.Popen([decrypt_cmd], shell = True, - stdin = subprocess.PIPE, - stderr = subprocess.STDOUT, - stdout = subprocess.PIPE) - output = session.communicate(decoded_settings) - except OSError: - pass - protected_settings_str = output[0] + + # FIPS 140-3: use 'openssl cms' (supports AES256 & DES_EDE3_CBC) with fallback to legacy 'openssl smime' + cms_cmd = 'openssl cms -inform DER -decrypt -recip {0} -inkey {1}'.format(encoded_cert_path, encoded_key_path) + smime_cmd = 'openssl smime -inform DER -decrypt -recip {0} -inkey {1}'.format(encoded_cert_path, encoded_key_path) + + protected_settings_str = None + for decrypt_cmd in [cms_cmd, smime_cmd]: + try: + session = subprocess.Popen([decrypt_cmd], shell=True, + stdin=subprocess.PIPE, + stderr=subprocess.STDOUT, + stdout=subprocess.PIPE) + output = session.communicate(decoded_settings) + # success only if return code is 0 and we have output + if session.returncode == 0 and output[0]: + protected_settings_str = output[0] + if decrypt_cmd == cms_cmd: + hutil_log_info('Decrypted protectedSettings using openssl cms.') + else: + hutil_log_info('Decrypted protectedSettings using openssl smime fallback.') + break + else: + hutil_log_info('Attempt to decrypt protectedSettings with "{0}" failed (rc={1}).'.format(decrypt_cmd, session.returncode)) + except OSError: + pass if protected_settings_str is None: log_and_exit('Enable', GenericErrorCode, 'Failed decrypting protectedSettings') diff --git a/Utils/HandlerUtil.py b/Utils/HandlerUtil.py index ef2e1a419..30f961921 100755 --- a/Utils/HandlerUtil.py +++ b/Utils/HandlerUtil.py @@ -58,6 +58,7 @@ import json import time import re +import subprocess # imp was deprecated in python 3.12 if sys.version_info >= (3, 12): import importlib @@ -195,16 +196,37 @@ def _parse_config(self, ctxt): cert = waagent.LibDir + '/' + thumb + '.crt' pkey = waagent.LibDir + '/' + thumb + '.prv' unencodedSettings = base64.standard_b64decode(protectedSettings) - openSSLcmd = "openssl smime -inform DER -decrypt -recip {0} -inkey {1}" - cleartxt = waagent.RunSendStdin(openSSLcmd.format(cert, pkey), unencodedSettings)[1] - if cleartxt is None: - self.error("OpenSSL decode error using thumbprint " + thumb) - self.do_exit(1, "Enable", 'error', '1', 'Failed to decrypt protectedSettings') + + # FIPS 140-3: use 'openssl cms' (supports AES256 & DES_EDE3_CBC) with fallback to legacy 'openssl smime' + cms_cmd = 'openssl cms -inform DER -decrypt -recip {0} -inkey {1}'.format(cert,pkey) + smime_cmd = 'openssl smime -inform DER -decrypt -recip {0} -inkey {1}'.format(cert,pkey) + + protected_settings_str = None + for decrypt_cmd in [cms_cmd, smime_cmd]: + try: + session = subprocess.Popen([decrypt_cmd], shell=True, + stdin=subprocess.PIPE, + stderr=subprocess.STDOUT, + stdout=subprocess.PIPE) + output = session.communicate(unencodedSettings) + # success only if return code is 0 and we have output + if session.returncode == 0 and output[0]: + protected_settings_str = output[0] + if decrypt_cmd == cms_cmd: + self.log('Decrypted protectedSettings using openssl cms.') + else: + self.log('Decrypted protectedSettings using openssl smime fallback.') + break + else: + self.log('Attempt to decrypt protectedSettings with "{0}" failed (rc={1}).'.format(decrypt_cmd, session.returncode)) + except OSError: + pass + jctxt = '' try: - jctxt = json.loads(cleartxt) + jctxt = json.loads(protected_settings_str) except: - self.error('JSON exception decoding ' + HandlerUtility.redact_protected_settings(cleartxt)) + self.error('JSON exception decoding ' + HandlerUtility.redact_protected_settings(protected_settings_str)) handlerSettings['protectedSettings']=jctxt self.log('Config decoded correctly.') return config