File: //home/arjun/projects/buyercall/buyercall/lib/util_boto3_kms.py
import base64
import json
from Crypto.Cipher import AES # pycryptodome
from Crypto import Random
import boto3
import struct
import sys
import os
class AwsKms():
kms_client = None
def init_app(self, app):
self.kms_client = boto3.client(
'kms',
region_name=app.config.get('KMS_REGION_NAME', ''),
aws_access_key_id=app.config.get('AMAZON_ACCESS_KEY', ''),
aws_secret_access_key=app.config.get('AMAZON_SECRET_KEY', '')
)
self.kms_arn = app.config.get('KMS_ARN', '')
self.key_spec = "AES_256"
self.chunksize = 16*1024
def generate_data_key(self):
encrypt_ctx = {"kms_cmk_id": self.kms_arn}
return self.kms_client.generate_data_key(KeyId=self.kms_arn, EncryptionContext=encrypt_ctx, KeySpec=self.key_spec)
def decrypt(self, env_key, enc_ctx):
return self.kms_client.decrypt(CiphertextBlob=env_key, EncryptionContext=enc_ctx)
def encrypt_file(self, infile):
iv = Random.new().read(AES.block_size)
size_infile = os.stat(infile).st_size
out_filename = infile + '.enc'
key_data = self.generate_data_key()
key = key_data['Plaintext']
cipher = AES.new(key, AES.MODE_CBC, iv)
with open(out_filename, 'wb') as outfile:
last_chunk_length = 0
while True:
chunk = infile.read(self.chunksize)
last_chunk_length = len(chunk)
if last_chunk_length == 0 or last_chunk_length < self.chunksize:
break
outfile.write(cipher.encrypt(chunk))
# write the final padding
length_to_pad = 16 - (last_chunk_length % 16)
# not py2 compatible
# chunk += bytes([length])*length
chunk += struct.pack('B', length_to_pad) * length_to_pad
outfile.write(cipher.encrypt(chunk))
def decrypt_file(self, s3, bucket_name, key_name, infile):
location_info = s3.get_bucket_location(Bucket=bucket_name)
bucket_region = location_info['LocationConstraint']
object_info = s3.head_object(Bucket=bucket_name, Key=key_name)
metadata = object_info['Metadata']
material_json = object_info['Metadata']['x-amz-matdesc']
# material_json is a string of json. Yes, json inside json.
envelope_key = base64.b64decode(metadata['x-amz-key-v2'])
envelope_iv = base64.b64decode(metadata['x-amz-iv'])
encrypt_ctx = json.loads(metadata['x-amz-matdesc'])
original_size = metadata['x-amz-unencrypted-content-length']
with open(infile, 'rb') as infile:
decryptor = AES.new(key_name, AES.MODE_CBC, envelope_iv)
with open('decrypted-'+infile, 'wb') as outfile:
while True:
chunk = infile.read(self.chunksize)
if len(chunk) == 0:
break
outfile.write(decryptor.decrypt(chunk))
outfile.truncate(original_size)