Source code for awsket.ket

#!/usr/bin/env python

# TO DO
# Be able to initallize DynamoDB backend

import boto3
import botocore


[docs]def get_iam_user(): """get_iam_user() Retrieves information about the specified IAM user, including the user creation date, path, unique ID, and ARN. It uses the AWS Access Key and Secret Key to retrieve the user information Returns ------- dict A dictioary scontaining details about the IAM user. """ try: client_iam = boto3.client('iam') response = client_iam.get_user() return response['User'] except botocore.exceptions.ClientError as error: response = error.response if ( response["Error"]["Code"] == "UnrecognizedClientException" and response["ResponseMetadata"]["HTTPStatusCode"] == 400 ): print("Invalid AWS Client Token") return "UnrecognizedClientException" if response["Error"]["Code"] == "AccessDenied" and response["ResponseMetadata"]["HTTPStatusCode"] == 403: print(response['Error']['Message'])
[docs]def create_s3_bucket(bucket_name, region): """Create S3 Bucket Creates an s3 bucket in AWS with the provided name and specified region. The user must have create bucket permission. Parameters ---------- bucket_name : str Name of the bucket to be created region : str Region where the bucket will be created. e.g. us-east-2. (Note: S3 and KMS must be in same region) """ try: client_s3 = boto3.client('s3', region_name=region) response = client_s3.create_bucket( ACL='private', Bucket=bucket_name, CreateBucketConfiguration={'LocationConstraint': 'us-east-2'}, ObjectLockEnabledForBucket=True, ObjectOwnership='BucketOwnerEnforced', ) except botocore.exceptions.ClientError as error: response = error.response if ( response["Error"]["Code"] == "BucketAlreadyOwnedByYou" and response["ResponseMetadata"]["HTTPStatusCode"] == 409 ): print(response['Error']['Message']) if response["Error"]["Code"] == "InvalidBucketName" and response["ResponseMetadata"]["HTTPStatusCode"] == 400: print(response['Error']['Message'])
[docs]def push_to_s3(bucket_name, remote_file_name, content, region): """Push files or texts to S3 Creates an s3 bucket in AWS with the provided name and specified region. The user must have create bucket permission. Parameters ---------- bucket_name : str Name of the bucket to be created remote_file_name : str File or object name with which it will be stored as in the remote S3 bucket content: str Text, object or file that will be pushed to S3. Often referred as the body of the request. region: str Region where the bucket is located. e.g. us-east-2. (Note: S3 and KMS must be in same region) Raises ---------- NoSuchBucket Exception If trying to push to a bucket that does nnot exists. Can be casued by typing an incorrect bucket name. AllAccessDisabled Exception Usually raised if no filename provided in the parameter """ try: print(bucket_name) client_s3 = boto3.resource('s3', region_name=region) client_s3.Bucket(bucket_name).put_object(Key=remote_file_name, Body=content) except botocore.exceptions.ClientError as error: response = error.response if response["Error"]["Code"] == "NoSuchBucket" and response["ResponseMetadata"]["HTTPStatusCode"] == 404: print(response['Error']['Message']) if response["Error"]["Code"] == "AllAccessDisabled" and response["ResponseMetadata"]["HTTPStatusCode"] == 403: print(response['Error']['Message']) except botocore.exceptions.ParamValidationError as param_error: print(param_error)
[docs]def create_kms_key(region): """Create KMS Key in AWS Creates a KMS key in AWS in the specified region. The user must have create bucket permission. Parameters ---------- region: str Region where the KMS key will be created. e.g. us-east-2. (Note: S3 and KMS must be in same region) Returns ------- str ID of the KMS key Raises ------- UnrecognizedClientException If AWS Access key does not exists AccessDeniedException If the user does not have suffiecient permission to create a KMS key """ try: client_kms = boto3.client('kms', region_name=region) response = client_kms.create_key(Description='AWS KET KMS KEY', KeyUsage='ENCRYPT_DECRYPT', Origin='AWS_KMS') return response['KeyMetadata']['KeyId'] except botocore.exceptions.ClientError as error: response = error.response if ( response["Error"]["Code"] == "UnrecognizedClientException" and response["ResponseMetadata"]["HTTPStatusCode"] == 400 ): print(response['Error']['Message']) return response['Error']['Code'] if ( response["Error"]["Code"] == "AccessDeniedException" and response["ResponseMetadata"]["HTTPStatusCode"] == 400 ): print(response['Error']['Message']) return response['Error']['Code']
[docs]def create_kms_alias(key_id, alias_name, region): """Create an Alias for KMS Key in AWS Creates an alias for kms key in AWS in the specified region. The user must have create kms alias permission. Parameters ---------- key_id: str ID of the kms key alias_name: str Alias for the kms key. Prefer a simple name that can be used. For this project it is set as alias/aws-ket region: str Region where the KMS key will be created. e.g. us-east-2. (Note: S3 and KMS must be in same region) Returns ------- str name of the alias Raises ------- ValidationException If the alias name does not follow AWS guidelines or convention AlreadyExistsException If an alias with same name already exists in the same region """ try: client_kms = boto3.client('kms', region_name=region) response = client_kms.create_alias(AliasName=alias_name, TargetKeyId=key_id) return alias_name except botocore.exceptions.ClientError as error: response = error.response if response["Error"]["Code"] == "ValidationException" and response["ResponseMetadata"]["HTTPStatusCode"] == 400: print(response['Error']['Message']) return "ValidationException" if ( response["Error"]["Code"] == "AlreadyExistsException" and response["ResponseMetadata"]["HTTPStatusCode"] == 400 ): print(response['Error']['Message']) return "AlreadyExistsException"
[docs]def check_alias(alias_name, region): """Check for AWS KMS alias Checks if an alias for kms key with specified name already exists in a region. Parameters ---------- alias_name: str Alias for the kms key. For this project it is set as alias/aws-ket region: str Region where the KMS key is be created. e.g. us-east-2. (Note: S3 and KMS must be in same region) Returns ------- str Name of the alias or None Raises ------- UnrecognizedClientException Incorrect name for alias is provided AccessDeniedException If user does not have permisions to check for kms key alias name """ try: client_kms = boto3.client('kms', region_name=region) aliases = client_kms.list_aliases() for alias in aliases['Aliases']: if alias_name == alias['AliasName']: return alias_name return None except botocore.exceptions.ClientError as error: response = error.response if ( response["Error"]["Code"] == "UnrecognizedClientException" and response["ResponseMetadata"]["HTTPStatusCode"] == 400 ): print("Invalid AWS Client Token") return "UnrecognizedClientException" if ( response["Error"]["Code"] == "AccessDeniedException" and response["ResponseMetadata"]["HTTPStatusCode"] == 400 ): print(response['Error']['Message']) return response['Error']['Code']
[docs]def encrypt_text(kms_key, text, region): """Encrypt text using KMS Encrypts plaintext of up to 4,096 bytes using a KMS key from the specified region. Parameters ---------- kms_key: str This can be either kms key id, kms arn, alias or alias arn text: str Either plain text or any type of content that needs to be encrypted region: str Region where the KMS key is located. e.g. us-east-2. (Note: S3 and KMS must be in same region) Returns ------- dict CiphertextBlob Raises ------- UnrecognizedClientException Incorrect name for alias is provided AccessDeniedException If user does not have permisions to encrypt using kms key or alias name """ try: client_kms = boto3.client('kms', region_name=region) encrypted_kms_request = client_kms.encrypt(KeyId=kms_key, Plaintext=text) encrypted_string = encrypted_kms_request["CiphertextBlob"] return encrypted_string except botocore.exceptions.ClientError as error: response = error.response if response["Error"]["Code"] == "NotFoundException" and response["ResponseMetadata"]["HTTPStatusCode"] == 400: print(response['Error']['Message']) return response['Error']['Code'] if response['Error']['Code'] == "ValidationException" and response["ResponseMetadata"]["HTTPStatusCode"] == 400: print('The text you provided must be greater than 0 characters in length') return response['Error']['Code'] if ( response['Error']['Code'] == "AccessDeniedException" and response["ResponseMetadata"]["HTTPStatusCode"] == 400 ): print(response['Error']['Message']) return response['Error']['Code'] except botocore.exceptions.ParamValidationError as param_error: print(param_error)
[docs]def encrypt_file(kms_key, file_name, region): """Encrypt file using KMS Encrypts the content of a specified file using a KMS key. Parameters ---------- kms_key: str This can be either kms key id, kms arn, alias or alias arn file_name: str File name or path to a file that needs to be encrypted region: str Region where the KMS key is located. e.g. us-east-2. (Note: S3 and KMS must be in same region) Returns ------- dict CiphertextBlob Raises ------- NotFoundException Incorrect name of file or file does not exist ValidationException The file must contain more than 0 characters text AccessDeniedException If user does not have permisions to check for kms key alias name ParamValidationError Incorrect parameter passed to the function """ try: client_kms = boto3.client('kms', region_name=region) file = open(file_name, "r") encrypted_kms_request = client_kms.encrypt(KeyId=kms_key, Plaintext=file.read()) encrypted_string = encrypted_kms_request["CiphertextBlob"] return encrypted_string except FileNotFoundError as error: print(error) except botocore.exceptions.ClientError as error: response = error.response if response["Error"]["Code"] == "NotFoundException" and response["ResponseMetadata"]["HTTPStatusCode"] == 400: print(response['Error']['Message']) return response['Error']['Code'] if response['Error']['Code'] == "ValidationException" and response["ResponseMetadata"]["HTTPStatusCode"] == 400: print('The text you provided must be greater than 0 characters in length') return response['Error']['Code'] if ( response['Error']['Code'] == "AccessDeniedException" and response["ResponseMetadata"]["HTTPStatusCode"] == 400 ): print(response['Error']['Message']) return response['Error']['Code'] except botocore.exceptions.ParamValidationError as param_error: print(param_error)
[docs]def decrypt_text(bucket_name, remote_file_name, kms_key_id, region): """Decrypt file from S3 using KMS Decrypts file from S3 using KMS key. Parameters ---------- bucket_name: str Bucket where the enrypted file is stored remote_file_name: str Object name or path to an object in S3 that needs to be decrypted kms_key_id: str This can be either kms key id, kms arn, alias or alias arn region: str Region where the KMS key is located. e.g. us-east-2. (Note: S3 and KMS must be in same region) Returns ------- str Plaintext Raises ------- NoSuchBucket Incorrect bucket name provided NoSuchKey Incorret object name or object does not exist in S3 IncorrectKeyException The key does not match with what was used to encrypt the file ParamValidationError Incorrect parameter passed to the function """ try: client_kms = boto3.client('kms', region_name=region) client_s3 = boto3.client('s3', region_name=region) s3_object = client_s3.get_object(Bucket=bucket_name, Key=remote_file_name) body = s3_object["Body"].read() kms_decrypt_request = client_kms.decrypt(CiphertextBlob=body, KeyId=kms_key_id) kms_decrypted_text = kms_decrypt_request["Plaintext"].decode("UTF-8") return kms_decrypted_text except botocore.exceptions.ClientError as error: response = error.response if response["Error"]["Code"] == "NoSuchBucket" and response["ResponseMetadata"]["HTTPStatusCode"] == 404: print(response['Error']['Message']) if response["Error"]["Code"] == "NoSuchKey" and response["ResponseMetadata"]["HTTPStatusCode"] == 404: print(f'The specified file: {remote_file_name} in {bucket_name} bucket does not exist') if ( response["Error"]["Code"] == "IncorrectKeyException" and response["ResponseMetadata"]["HTTPStatusCode"] == 400 ): print(response['Error']['Message']) except botocore.exceptions.ParamValidationError as param_error: print(param_error)
[docs]def save_to_file(file_name, decrypted_string): """Save file Saves the derypted text to a local file Parameters ---------- file_name: str Local file name where the decrypted output will be sotred decrypted_string: str Decrypted utput in string format. """ file = open(file_name, "w") file.write(decrypted_string) file.close()