Source code for azure.keyvault.custom.key_vault_id

# ---------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# ---------------------------------------------------------------------------------------------

try:
    import urllib.parse as parse
except ImportError:
    import urlparse as parse # pylint: disable=import-error

from enum import Enum


class KeyVaultCollectionType(Enum):
    keys = 'keys'
    secrets = 'secrets'
    certificates = 'certificates'
    certificate_issuers = 'certificates/issuers'


[docs]class KeyVaultId(object): """ An identifier for an Azure Key Vault resource. """ version_none = '' def __init__(self, collection, vault, name, version): """ :param collection: The resource collection type. :type collection: str :param vault: The vault URI. :type vault: str :param name: The resource name. :type name: str :param version: The resource version. :type version: str """ self.vault = vault self.name = name self.collection = collection self.version = version or KeyVaultId.version_none def __str__(self): """ :return: The identifier string of the current KeyVaultId :rtype: str """ return self.id @property def id(self): return '{}/{}'.format(self.base_id, self.version) if self.version != KeyVaultId.version_none else self.base_id @property def base_id(self): return '{}/{}/{}'.format(self.vault, self.collection, self.name)
[docs] @staticmethod def create_object_id(collection, vault, name, version): """ :param collection: The resource collection type. :type collection: str :param vault: The vault URI. :type vault: str :param name: The resource name. :type name: str :param version: The resource version. :type version: str :rtype: KeyVaultId """ collection = _validate_string_argument(collection, 'collection') vault = _validate_string_argument(vault, 'vault') name = _validate_string_argument(name, 'name') version = _validate_string_argument(version, 'version', True) _parse_uri_argument(vault) # check that vault is a valid URI but don't change it return KeyVaultIdentifier(collection=collection, vault=vault, name=name, version=version)
[docs] @staticmethod def parse_object_id(collection, id): """ :param collection: The resource collection type. :type collection: str :param id: The resource uri. :type id: str :rtype: KeyVaultId """ collection = _validate_string_argument(collection, 'collection') return KeyVaultIdentifier(uri=id, collection=collection)
[docs] @staticmethod def create_key_id(vault, name, version=None): """ :param vault: The vault uri. :type vault: str :param name: The key name. :type name: str :param version: The key version. :type version: str :rtype: KeyVaultId """ return KeyId(vault=vault, name=name, version=version)
[docs] @staticmethod def parse_key_id(id): """ :param id: The key uri. :type id: str :rtype: KeyVaultId """ return KeyId(id)
[docs] @staticmethod def create_secret_id(vault, name, version=None): """ :param vault: The vault uri. :type vault: str :param name: The secret name. :type name: str :param version: The secret version. :type version: str :rtype: KeyVaultId """ return SecretId(vault=vault, name=name, version=version)
[docs] @staticmethod def parse_secret_id(id): """ :param id: The secret uri. :type id: str :rtype: KeyVaultId """ return SecretId(id)
[docs] @staticmethod def create_certificate_id(vault, name, version=None): """ :param vault: The vault uri. :type vault: str :param name: The certificate name. :type name: str :param version: The certificate version. :type version: str :rtype: KeyVaultId """ return CertificateId(vault=vault, name=name, version=version)
[docs] @staticmethod def parse_certificate_id(id): """ :param id: The resource collection type. :type id: str :rtype: KeyVaultId """ return CertificateId(id)
[docs] @staticmethod def create_certificate_operation_id(vault, name): """ :param vault: The vault uri. :type vault: str :param name: The certificate name. :type name: str :rtype: KeyVaultId """ return CertificateOperationId(vault=vault, name=name)
[docs] @staticmethod def parse_certificate_operation_id(id): """ :param id: The resource collection type. :type id: str :rtype: KeyVaultId """ return CertificateOperationId(id)
[docs] @staticmethod def create_certificate_issuer_id(vault, name): """ :param vault: The vault uri. :type vault: str :param name: The certificate name. :type name: str :rtype: KeyVaultId """ return CertificateIssuerId(vault=vault, name=name)
[docs] @staticmethod def parse_certificate_issuer_id(id): """ :param id: The resource collection type. :type id: str :rtype: KeyVaultId """ return CertificateIssuerId(id)
class KeyVaultIdentifier(KeyVaultId): _id_format = '{vault}/{collection}/{name}/{version?}' version_none = '' def __init__(self, uri=None, **kwargs): """ Creates a KeyVaultIdentifier based of the specified uri or keyword arguments :param uri: The uri of the key vault object identifier :param kwargs: The format parameters for the key vault object identifier. If uri is specified these are used to validate the components of the uri. """ self.version = KeyVaultIdentifier.version_none # add all the keyword arguments as attributes for key, value in kwargs.items(): self.__dict__[key] = _validate_string_argument(value, key, True) self.version = self.version or KeyVaultIdentifier.version_none # if uri is specified parse the segment values from the specified uri if uri: self._parse(uri, kwargs) @property def id(self): """ :return: The full key vault object identifier uri """ return self._format() @property def base_id(self): """ :return: The version-less key vault object identifier uri, """ return self._format(fmt=self._id_format.replace('/{version?}', '')) def _format(self, fmt=None): """ Formats the KeyVaultIdentifier into a identifier uri based of the specified format string :param fmt: The format string for the identifier uri :return: The formatted key vault object identifier uri """ # if no fmt was specified use the _id_format from the current object fmt = fmt or self._id_format segments = [] # split the formatting string into segments for fmt_seg in fmt.split('/'): # if the segment is a substitution element if fmt_seg.startswith('{') and fmt_seg.endswith('}'): # get the attribute name from the segment element fmt_seg = fmt_seg[1:-1] fmt_prop = fmt_seg.rstrip('?') # get the value of the attribute from the current object seg_val = getattr(self, fmt_prop) # if the attribute is specified add the value to the formatted segments if seg_val: segments.append(seg_val.strip('/').strip()) # if the attribute is not specified and the substitution is not optional raise an error else: if not fmt_seg.endswith('?'): raise ValueError('invalid id: No value specified for the required segment "{}"'.format(fmt_prop)) # if the segment is a literal element simply add it to the formatted segments else: segments.append(fmt_seg) # join all the formatted segments together return '/'.join(segments) def _parse(self, uri, validation_args): """ Parses the specified uri, using _id_format as a format string, and sets the parsed format arguments as attributes on the current id object. :param uri: The key vault identifier uri to be parsed :param validation_args: format arguments to be validated :return: None """ def format_error(): return ValueError('invalid id: The specified uri "{}", does to match the specified format "{}"'.format(uri, self._id_format)) uri = _validate_string_argument(uri, 'uri') parsed_uri = _parse_uri_argument(uri) # split all the id segments from the uri path using and insert the host as the first segment id_segs = list(filter(None, parsed_uri.path.split('/'))) id_segs.insert(0, '{}://{}'.format(parsed_uri.scheme, parsed_uri.hostname)) # split the format segments from the classes format string fmt_segs = list(filter(None, self._id_format.split('/'))) for ix in range(len(fmt_segs)): # get the format segment and the id segment fmt_seg = fmt_segs[ix] id_seg = id_segs.pop(0) if len(id_segs) > 0 else '' # if the segment is a substitution element if fmt_seg.startswith('{') and fmt_seg.endswith('}'): prop = fmt_seg[1:-1] prop_strip = prop.rstrip('?') # if the segment is not present in the specified uri and is not optional raise an error if not id_seg and not prop.endswith('?'): raise format_error() # if the segment is in the segments to validate and doesn't match the expected vault raise an error if prop_strip in validation_args and validation_args[prop_strip] and validation_args[prop_strip] != id_seg: if id_seg and not prop.endswith('?'): raise ValueError('invalid id: The {} "{}" does not match the expected "{}"'.format(prop, id_seg, validation_args[prop_strip])) # set the attribute to the value parsed from the uri self.__dict__[prop_strip] = id_seg # otherwise the segment is a literal element else: # if the value parsed from the uri doesn't match the literal value from the format string raise an error if not fmt_seg == id_seg: raise format_error() # if there are still segments left in the uri which were not accounted for in the format string raise an error if len(id_segs) > 0: raise format_error()
[docs]class KeyId(KeyVaultIdentifier): _id_format = '{vault}/{collection}/{name}/{version?}' def __init__(self, uri=None, vault=None, name=None, version=None): """ Creates a key vault key id. If uri is specified the id properties are parsed from the uri, otherwise builds the id from the specified vault, name and version. :param uri: The uri of the key vault key :param vault: The vault uri :param name: The key name :param version: The key version """ super(KeyId, self).__init__(uri=uri, collection='keys', vault=vault, name=name, version=version)
[docs]class SecretId(KeyVaultIdentifier): _id_format = '{vault}/{collection}/{name}/{version?}' def __init__(self, uri=None, vault=None, name=None, version=None): """ Creates a key vault secret id. If uri is specified the id properties are parsed from the uri, otherwise builds the id from the specified vault, name and version. :param uri: The uri of the key vault secret :param vault: The vault uri :param name: The secret name :param version: The secret version """ super(SecretId, self).__init__(uri=uri, collection='secrets', vault=vault, name=name, version=version)
[docs]class CertificateId(KeyVaultIdentifier): _id_format = '{vault}/{collection}/{name}/{version?}' def __init__(self, uri=None, vault=None, name=None, version=None): """ Creates a key vault certificate id. If uri is specified the id properties are parsed from the uri, otherwise builds the id from the specified vault, name and version. :param uri: The uri of the key vault certificate :param vault: The vault uri :param name: The certificate name :param version: The certificate version """ super(CertificateId, self).__init__(uri=uri, collection='certificates', vault=vault, name=name, version=version)
[docs]class CertificateOperationId(KeyVaultIdentifier): _id_format = '{vault}/{collection}/{name}/{version?}' def __init__(self, uri=None, vault=None, name=None): """ Creates a key vault certificate operation id. If uri is specified the id properties are parsed from the uri, otherwise builds the id from the specified vault and name. :param uri: The uri of the key vault certificate operation :param vault: The vault uri :param name: The certificate name """ super(CertificateOperationId, self).__init__(uri=uri, collection='certificates', version='pending', vault=vault, name=name)
[docs]class CertificateIssuerId(KeyVaultIdentifier): _id_format = '{vault}/{collection}/issuers/{name}' def __init__(self, uri=None, vault=None, name=None): """ Creates a key vault certificate issuer id. If uri is specified the id properties are parsed from the uri, otherwise builds the id from the specified vault and name. :param uri: The uri of the key vault certificate issuer :param vault: The vault uri :param name: The certificate issuer name """ super(CertificateIssuerId, self).__init__(uri=uri, collection='certificates', vault=vault, name=name)
[docs]class StorageAccountId(KeyVaultIdentifier): _id_format = '{vault}/{collection}/{name}' def __init__(self, uri=None, vault=None, name=None): """ Creates a key vault storage account id. If uri is specified the id properties are parsed from the uri, otherwise builds the id from the specified vault and name. :param uri: The uri of the key vault storage account :param vault: The vault uri :param name: The storage account name """ super(StorageAccountId, self).__init__(uri=uri, collection='storage', vault=vault, name=name)
[docs]class StorageSasDefinitionId(KeyVaultIdentifier): _id_format = '{vault}/{collection}/{account_name}/sas/{sas_definition}' def __init__(self, uri=None, vault=None, account_name=None, sas_definition=None): """ Creates a key vault storage account sas definition id. If uri is specified the id properties are parsed from the uri, otherwise builds the id from the specified vault, account_name, and sas_definition. :param uri: The uri of the key vault storage account sas definition :param vault: The vault uri :param account_name: The storage account name :param sas_definition: The sas definition name """ super(StorageSasDefinitionId, self).__init__(uri=uri, collection='storage', vault=vault, account_name=account_name, sas_definition=sas_definition)
def _validate_string_argument(prop, name, nullable=False): try: prop = prop.strip() except AttributeError: if not nullable: raise TypeError("argument '{}' must by of type string".format(name)) prop = prop if prop else None # force falsy types to None if not prop and not nullable: raise ValueError("argument '{}' must be specified".format(name)) return prop def _parse_uri_argument(uri): try: parsed_uri = parse.urlparse(uri) except Exception: # pylint: disable=broad-except raise ValueError("'{}' is not not a valid URI".format(uri)) if not (parsed_uri.scheme and parsed_uri.hostname): raise ValueError("'{}' is not not a valid URI".format(uri)) return parsed_uri