HEX
Server: Apache/2.4.52 (Ubuntu)
System: Linux spn-python 5.15.0-89-generic #99-Ubuntu SMP Mon Oct 30 20:42:41 UTC 2023 x86_64
User: arjun (1000)
PHP: 8.1.2-1ubuntu2.20
Disabled: NONE
Upload Files
File: //home/arjun/projects/buyercall/buyercall/lib/util_boto3_kms.py
import base64
import json
from Crypto.Cipher import AES  # pycryptodome
from Crypto import Random
import boto3
import struct
import sys
import os


class AwsKms():
    kms_client = None

    def init_app(self, app):
        self.kms_client = boto3.client(
            'kms',
            region_name=app.config.get('KMS_REGION_NAME', ''),
            aws_access_key_id=app.config.get('AMAZON_ACCESS_KEY', ''),
            aws_secret_access_key=app.config.get('AMAZON_SECRET_KEY', '')
        )
        self.kms_arn = app.config.get('KMS_ARN', '')
        self.key_spec = "AES_256"
        self.chunksize = 16*1024

    def generate_data_key(self):
        encrypt_ctx = {"kms_cmk_id": self.kms_arn}
        return self.kms_client.generate_data_key(KeyId=self.kms_arn, EncryptionContext=encrypt_ctx, KeySpec=self.key_spec)

    def decrypt(self, env_key, enc_ctx):
        return self.kms_client.decrypt(CiphertextBlob=env_key, EncryptionContext=enc_ctx)

    def encrypt_file(self, infile):
        iv = Random.new().read(AES.block_size)
        size_infile = os.stat(infile).st_size
        out_filename = infile + '.enc'
        key_data = self.generate_data_key()
        key = key_data['Plaintext']

        cipher = AES.new(key, AES.MODE_CBC, iv)

        with open(out_filename, 'wb') as outfile:
            last_chunk_length = 0
            while True:
                chunk = infile.read(self.chunksize)
                last_chunk_length = len(chunk)
                if last_chunk_length == 0 or last_chunk_length < self.chunksize:
                    break
                outfile.write(cipher.encrypt(chunk))

            # write the final padding
            length_to_pad = 16 - (last_chunk_length % 16)
            # not py2 compatible
            # chunk += bytes([length])*length
            chunk += struct.pack('B', length_to_pad) * length_to_pad
            outfile.write(cipher.encrypt(chunk))

    def decrypt_file(self, s3, bucket_name, key_name, infile):
        location_info = s3.get_bucket_location(Bucket=bucket_name)
        bucket_region = location_info['LocationConstraint']
        object_info = s3.head_object(Bucket=bucket_name, Key=key_name)

        metadata = object_info['Metadata']
        material_json = object_info['Metadata']['x-amz-matdesc']
        # material_json is a string of json. Yes, json inside json.

        envelope_key = base64.b64decode(metadata['x-amz-key-v2'])
        envelope_iv = base64.b64decode(metadata['x-amz-iv'])
        encrypt_ctx = json.loads(metadata['x-amz-matdesc'])
        original_size = metadata['x-amz-unencrypted-content-length']

        with open(infile, 'rb') as infile:
            decryptor = AES.new(key_name, AES.MODE_CBC, envelope_iv)

            with open('decrypted-'+infile, 'wb') as outfile:
                while True:
                    chunk = infile.read(self.chunksize)
                    if len(chunk) == 0:
                        break
                    outfile.write(decryptor.decrypt(chunk))
                outfile.truncate(original_size)