A Complete Guide to Building an ML Pipeline with Athena, SQS, EKS, KEDA & SageMaker
This article walks through building scalable, modern machine learning and data pipelines using Athena, SQS, EKS, KEDA, Karpenter & SageMaker.
Table of contents
- Secured Data Pipeline: Secure your (Athena, SQS) Credentials in Vault/Secret Manager for the Pods to Query Them
- Pipeline Overview: AWS S3 (Training Data) → Athena (Inventory) → SQS ←→ EKS + Keda → SageMaker → Inference EndPoint
- Data Preparation: S3→Athena
- Athena→SQS: The Batching Microservice
- Keda + SQS: Authentication & Scalability
- ScaledObject YAML for SQS-based scaling (handy for debugging)
- Karpenter: Node Scalability on EKS
- SQS→SageMaker: The Training Microservice (query SQS messages and start SageMaker training)
- Train Transformer Model
- Query SQS with pydantic_sqs and run training
- Maintain a Checkpointing Strategy & Parallelism Strategy
- Production
- 3rd Party Extensions: Neo4j, LangChain, OpenAI, VectorDB
[ Included: the architecture, YAMLs, and Python code for the two microservices (Athena→SQS & SQS→SageMaker) that make this pipeline work. ]
Secured Data Pipeline: Secure your (Athena, SQS) Credentials in Vault/Secret Manager for the Pods to Query Them
Python Code to load the Secrets from your MicroServices/Containers
import boto3
import hvac
import json
# AWS Secrets Manager
def get_aws_secret(secret_name, region_name):
session = boto3.session.Session()
client = session.client(service_name='secretsmanager', region_name=region_name)
try:
response = client.get_secret_value(SecretId=secret_name)
if 'SecretString' in response:
secret = json.loads(response['SecretString'])
return secret
else:
return json.loads(response['SecretBinary'])
except Exception as e:
print(f"Error retrieving secret from AWS: {str(e)}")
return None
# HashiCorp Vault
def get_vault_secret(vault_url, vault_token, secret_path):
client = hvac.Client(url=vault_url, token=vault_token)
try:
if client.is_authenticated():
secret = client.secrets.kv.v2.read_secret_version(path=secret_path)
return secret['data']['data']
else:
print("Vault authentication failed")
return None
except Exception as e:
print(f"Error retrieving secret from Vault: {str(e)}")
return None
# Usage examples
aws_secret = get_aws_secret('my-aws-secret', 'us-west-2')
if aws_secret:
print("AWS Secret:", aws_secret)
vault_secret = get_vault_secret('https://vault.example.com', 's.your-vault-token', 'secret/my-vault-secret')
if vault_secret:
print("Vault Secret:", vault_secret)
Pipeline Overview: AWS S3 (Training Data) → Athena (Inventory) → SQS ←→ EKS + Keda → SageMaker → Inference EndPoint
Data Preparation: S3→Athena
Loading the Amazon S3 Inventory into Amazon Athena lets us enumerate and batch as many training files as we need.
Alternatives
https://github.com/MeltanoLabs/tap-athena
https://github.com/dbt-labs/dbt-athena
Resources
https://docs.aws.amazon.com/athena/latest/ug/federated-queries.html
https://docs.aws.amazon.com/athena/latest/ug/service-limits.html
Athena federated queries support multiple data sources.
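The next section queries the inventory through SQLAlchemy; if you just want to sanity-check the inventory table from a script, plain boto3 works too. Below is a minimal sketch, assuming the inventory table already exists in Athena and using hypothetical database, table, and bucket names:
import time
import boto3

athena = boto3.client("athena", region_name="us-east-1")

# Hypothetical inventory database/table and Athena results bucket
query = "SELECT key, size FROM s3_inventory.training_bucket_inventory WHERE size > 0 LIMIT 100"
execution = athena.start_query_execution(
    QueryString=query,
    QueryExecutionContext={"Database": "s3_inventory"},
    ResultConfiguration={"OutputLocation": "s3://s3-results-bucket/output/"},
)
query_id = execution["QueryExecutionId"]

# Poll until Athena finishes the query
while True:
    state = athena.get_query_execution(QueryExecutionId=query_id)["QueryExecution"]["Status"]["State"]
    if state in ("SUCCEEDED", "FAILED", "CANCELLED"):
        break
    time.sleep(2)

if state == "SUCCEEDED":
    results = athena.get_query_results(QueryExecutionId=query_id, MaxResults=100)
    # The first row of a SELECT result is the column header
    for row in results["ResultSet"]["Rows"][1:]:
        print([col.get("VarCharValue") for col in row["Data"]])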
Athena→SQS: The Batching Microservice
Implementation: Python + Pydantic + SQLAlchemy
Query Athena with SQLAlchemy and PyAthena
[ Adapted from https://medium.com/codex/connecting-to-aws-athena-databases-using-python-4a9194427638 ]
from urllib.parse import quote_plus
from sqlalchemy.engine import create_engine

AWS_ACCESS_KEY = "AWS_ACCESS_KEY"
AWS_SECRET_KEY = "AWS_SECRET_KEY"
SCHEMA_NAME = "schema_name"
S3_STAGING_DIR = "s3://s3-results-bucket/output/"
AWS_REGION = "us-east-1"

conn_str = (
    "awsathena+rest://{aws_access_key_id}:{aws_secret_access_key}@"
    "athena.{region_name}.amazonaws.com:443/"
    "{schema_name}?s3_staging_dir={s3_staging_dir}&work_group=primary"
)

# Create the SQLAlchemy engine. Note that you need to have pyathena installed for this.
engine = create_engine(
    conn_str.format(
        aws_access_key_id=quote_plus(AWS_ACCESS_KEY),
        aws_secret_access_key=quote_plus(AWS_SECRET_KEY),
        region_name=AWS_REGION,
        schema_name=SCHEMA_NAME,
        s3_staging_dir=quote_plus(S3_STAGING_DIR),
    )
)
athena_connection = engine.connect()
SQS Message Schema/SQSModel
{
  "Records": [
    {
      "eventVersion": "2.0",
      "eventSource": "aws:s3",
      "awsRegion": "us-west-2",
      "eventTime": "2025-01-18T04:30:00.000Z",
      "eventName": "ObjectCreated:Put",
      "s3": {
        "bucket": { "name": "my-bucket", "arn": "arn:aws:s3:::my-bucket" },
        "object": { "key": "file1.txt", "size": 1024, "eTag": "d41d8cd98f00b204e9800998ecf8427e" }
      }
    },
    {
      "eventVersion": "2.0",
      "eventSource": "aws:s3",
      "awsRegion": "us-west-2",
      "eventTime": "2025-01-18T04:31:00.000Z",
      "eventName": "ObjectCreated:Put",
      "s3": {
        "bucket": { "name": "my-bucket", "arn": "arn:aws:s3:::my-bucket" },
        "object": { "key": "folder/file2.jpg", "size": 2048, "eTag": "c4ca4238a0b923820dcc509a6f75849b" }
      }
    }
  ]
}
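For reference, here is a minimal Pydantic sketch of this record schema; the class names are illustrative, and the field names mirror the S3 event notification payload above:
from typing import List
from pydantic import BaseModel

class S3Bucket(BaseModel):
    name: str
    arn: str

class S3Object(BaseModel):
    key: str
    size: int
    eTag: str

class S3Detail(BaseModel):
    bucket: S3Bucket
    object: S3Object

class S3EventRecord(BaseModel):
    eventVersion: str
    eventSource: str
    awsRegion: str
    eventTime: str
    eventName: str
    s3: S3Detail

class S3EventMessage(BaseModel):
    # e.g. S3EventMessage.parse_raw(message_body) on Pydantic v1,
    # or S3EventMessage.model_validate_json(message_body) on v2
    Records: List[S3EventRecord]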
Put into SQS (SQSModel)
[ Taken from https://github.com/andrewthetechie/pydantic-sqs ]
from pydantic_sqs import SQSModel, SQSQueue
from pydantic import Field
import asyncio
from pprint import pprint
import os
class PutS3File(SQSModel):
    foo: str = Field(..., description="Foo")

async def main():
    queue_kwargs = {
        "queue_url": os.environ.get("SQS_QUEUE_URL"),
        "endpoint_url": os.environ.get("SQS_ENDPOINT_URL", None),
        "use_ssl": os.environ.get("SQS_USE_SSL", "true").lower() == "true",
    }
    if queue_kwargs["endpoint_url"] is None:
        del queue_kwargs["endpoint_url"]
    queue = SQSQueue(**queue_kwargs)
    # Register the model so the queue knows how to (de)serialize it
    queue.register_model(PutS3File)
    file = PutS3File(foo="trainingfile")
    await file.to_sqs()
if __name__ == "__main__":
asyncio.run(main())
https://docs.aws.amazon.com/code-library/latest/ug/python_3_sqs_code_examples.html
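If you prefer plain boto3 over pydantic-sqs, a minimal receive-and-delete loop looks roughly like this (the queue URL below is a placeholder):
import json
import boto3

sqs = boto3.client("sqs", region_name="us-west-2")
queue_url = "https://sqs.us-west-2.amazonaws.com/123456789012/my-work-queue"

# Long-poll for up to 10 messages at a time
response = sqs.receive_message(
    QueueUrl=queue_url,
    MaxNumberOfMessages=10,
    WaitTimeSeconds=20,
)
for message in response.get("Messages", []):
    body = json.loads(message["Body"])
    print("received:", body)
    # Delete only after the message has been processed successfully
    sqs.delete_message(QueueUrl=queue_url, ReceiptHandle=message["ReceiptHandle"])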
Keda + SQS: Authentication & Scalability
Key KEDA features in this configuration:
Automatic Scaling:
- Scales based on SQS queue length
- Can scale to zero when the queue is empty
- Configurable scaling thresholds
Fine-tuned Control:
pollingInterval: How often to check queue metrics
cooldownPeriod: Prevent rapid scale down
queueLength: Target messages per job
activationQueueLength: When to start scaling
Two Options Provided:
ScaledObject: For continuous workloads
ScaledJob: For batch-style processing
Authentication:
- Supports IRSA (IAM Roles for Service Accounts)
- Can use traditional AWS credentials
- Pod identity or operator identity
Resources
KEDA←→SQS Authentication
apiVersion: v1
kind: Secret
metadata:
name: test-secrets
data:
AWS_ROLE_ARN: <encoded-iam-role-arn>
---
apiVersion: keda.sh/v1alpha1
kind: TriggerAuthentication
metadata:
name: keda-trigger-auth-aws-credentials
namespace: keda-test
spec:
secretTargetRef:
- parameter: awsRoleArn # The property in KEDA.
name: test-secrets # The name of the kubernetes secret.
key: AWS_ROLE_ARN # The key from the kubernetes secret.
ScaledObject YAML for SQS-based scaling (handy for debugging)
apiVersion: keda.sh/v1alpha1
kind: ScaledObject
metadata:
name: aws-sqs-scaledobject
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment         # ScaledObject needs a scalable target (Deployment/StatefulSet); use ScaledJob for Job workloads
    name: sqs-worker-job
pollingInterval: 30 # How frequently to check metrics
cooldownPeriod: 300 # How long to wait before scaling down
minReplicaCount: 0 # Scale to zero when no messages
maxReplicaCount: 30 # Maximum number of job replicas
triggers:
- type: aws-sqs-queue
metadata:
queueURL: https://sqs.us-west-2.amazonaws.com/123456789012/my-work-queue
queueLength: "5" # Target messages per job
activationQueueLength: "2" # Min messages to start scaling
region: us-west-2
identityOwner: operator # Use "pod" if using IRSA
    authenticationRef:
      name: keda-trigger-auth-aws-credentials  # The TriggerAuthentication defined above
Karpenter: Node Scalability on EKS
# Karpenter Helm Chart values.yaml
## Global Settings
global:
aws:
defaultInstanceProfile: "KarpenterNodeInstanceProfile"
clusterName: "your-cluster-name"
clusterEndpoint: "https://your-cluster-endpoint.eks.amazonaws.com"
## Karpenter Controller Settings
controller:
resources:
requests:
cpu: 1
memory: 1Gi
limits:
cpu: 1
memory: 1Gi
serviceAccount:
annotations:
eks.amazonaws.com/role-arn: "arn:aws:iam::123456789012:role/KarpenterControllerRole"
settings:
aws:
defaultInstanceProfile: "KarpenterNodeInstanceProfile"
interruptionQueueName: "your-cluster-name"
batchMaxDuration: "10s"
batchIdleDuration: "1s"
## Node Template Defaults
nodeTemplate:
subnetSelector:
karpenter.sh/discovery: "your-cluster-name"
securityGroupSelector:
karpenter.sh/discovery: "your-cluster-name"
tags:
KarpenterManaged: "true"
## Node Pool Defaults
nodePoolDefaults:
limits:
resources:
cpu: 1000
memory: 1000Gi
## Provisioner Settings
provisioner:
ttlSecondsAfterEmpty: 30
## Metrics Server Settings
metrics:
enabled: true
## Webhook Settings
webhook:
enabled: true
YAML Reference for Karpenter Scalability
# Karpenter GPU
spec:
template:
spec:
containers:
- resources:
limits:
nvidia.com/gpu: "<gpu-count-here>"
Karpenter Supported GPUs
SQS→SageMaker: The Training Microservice (query SQS messages and start SageMaker training)
Train Transformer Model
The multi-head attention mechanism repeatedly projects the input into “queries”, “keys”, and “values” and recombines them, layer after layer.
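As a quick illustration of that mechanism, here is a minimal self-attention sketch using torch.nn.MultiheadAttention (toy dimensions, unrelated to the SageMaker training script itself):
import torch
import torch.nn as nn

# Toy dimensions: batch of 2 sequences, 16 tokens each, embedding size 64, 8 heads
x = torch.randn(2, 16, 64)
attention = nn.MultiheadAttention(embed_dim=64, num_heads=8, batch_first=True)

# Self-attention: queries, keys, and values are all projections of the same input
output, weights = attention(query=x, key=x, value=x)
print(output.shape)   # torch.Size([2, 16, 64])
print(weights.shape)  # torch.Size([2, 16, 16]) attention weights averaged over heads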
Query SQS with pydantic_sqs and run training
from pydantic_sqs import SQSModel, SQSQueue
from pydantic import Field
import asyncio
from pprint import pprint
import os
import sagemaker
from sagemaker.huggingface import HuggingFace
from sagemaker.inputs import TrainingInput

# Same message model as the batching microservice (Athena→SQS)
class PutS3File(SQSModel):
    foo: str = Field(..., description="S3 prefix of the training data")

def run_training(training_job: PutS3File) -> None:
    # Setup SageMaker session and IAM role
    session = sagemaker.Session()
    role = sagemaker.get_execution_role()

    # Define S3 paths
    train_data_path = training_job.foo  # e.g. "s3://my-bucket/train-data"
    output_path = "s3://my-bucket/output"

    # Configure training data input
    train_data = TrainingInput(
        s3_data=train_data_path,
        content_type="text/csv"
    )

    # Define hyperparameters
    hyperparameters = {
        "model_name": "bert-base-uncased",  # or your preferred model
        "rnn_type": "lstm",
        "hidden_size": 512,
        "num_layers": 2,
        "dropout": 0.2,
        "batch_size": 32,
        "epochs": 10,
        "learning_rate": 0.001,
        "output_format": "safetensors"
    }

    # Create HuggingFace estimator
    huggingface_estimator = HuggingFace(
        entry_point='train.py',   # Your training script
        source_dir='scripts',     # Directory containing your scripts
        instance_type='ml.p3.2xlarge',
        instance_count=1,
        role=role,
        transformers_version='4.26',
        pytorch_version='1.13',
        py_version='py39',
        hyperparameters=hyperparameters,
        output_path=output_path,
        sagemaker_session=session
    )

    # Start training job
    huggingface_estimator.fit({'train': train_data})

async def main():
    # Reuse the queue and model registration from the batching microservice
    queue = SQSQueue(queue_url=os.environ["SQS_QUEUE_URL"])
    queue.register_model(PutS3File)

    # Pull up to 10 training requests (SQS long polling caps out at 20 seconds)
    new_trainings = await queue.from_sqs(max_messages=10, wait_time_seconds=20)
    pprint(new_trainings)

    for training_job in new_trainings:
        run_training(training_job)
        # Delete only after the training job has been submitted
        await training_job.delete_from_queue()
    print("done training messages we got from the queue")

if __name__ == "__main__":
    asyncio.run(main())
Maintain a Checkpointing Strategy & Parallelism Strategy
from pathlib import Path
import json
import torch
from safetensors.torch import save_file, load_file
class CheckpointConverter:
"""Converts existing model checkpoints to safetensor format with custom configurations"""
def __init__(self, checkpoint_path: str):
"""
Initialize with path to existing checkpoint
Args:
checkpoint_path: Path to the existing checkpoint file (.ckpt or .pt)
"""
self.checkpoint_path = Path(checkpoint_path)
if not self.checkpoint_path.exists():
raise FileNotFoundError(f"Checkpoint not found: {self.checkpoint_path}")
self.state_dict = None
def load_checkpoint(self) -> None:
"""Load the existing checkpoint"""
try:
self.state_dict = torch.load(self.checkpoint_path, map_location="cpu")
if "state_dict" in self.state_dict:
self.state_dict = self.state_dict["state_dict"]
except Exception as e:
raise RuntimeError(f"Failed to load checkpoint: {e}")
def apply_config(self, config_path: str) -> None:
"""
Apply configuration from JSON file to modify the checkpoint
Args:
config_path: Path to JSON configuration file
"""
if self.state_dict is None:
self.load_checkpoint()
try:
with open(config_path, 'r') as f:
config = json.load(f)
# Apply modifications based on config
for key, modifications in config.get("modifications", {}).items():
if key in self.state_dict:
if "rename" in modifications:
# Rename key
new_key = modifications["rename"]
self.state_dict[new_key] = self.state_dict.pop(key)
if "scale" in modifications:
# Scale tensor values
scale_factor = modifications["scale"]
self.state_dict[key] *= scale_factor
if "delete" in modifications and modifications["delete"]:
# Remove key from state dict
del self.state_dict[key]
except Exception as e:
raise RuntimeError(f"Failed to apply configuration: {e}")
def save_safetensor(self, output_path: str) -> None:
"""
Save the modified checkpoint as safetensor file
Args:
output_path: Path where to save the safetensor file
"""
if self.state_dict is None:
raise RuntimeError("No checkpoint loaded")
try:
# Convert output path to Path object
output_path = Path(output_path)
output_path.parent.mkdir(parents=True, exist_ok=True)
# Save as safetensor
save_file(self.state_dict, str(output_path))
print(f"Successfully saved safetensor file to {output_path}")
except Exception as e:
raise RuntimeError(f"Failed to save safetensor file: {e}")
def main():
# Example usage
checkpoint_path = "path/to/original_checkpoint.ckpt"
config_path = "config.json"
output_path = "path/to/output.safetensors"
try:
converter = CheckpointConverter(checkpoint_path)
converter.load_checkpoint()
converter.apply_config(config_path)
converter.save_safetensor(output_path)
except Exception as e:
print(f"Error during conversion: {e}")
if __name__ == "__main__":
main()
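To make the training job itself checkpoint-friendly, SageMaker estimators can sync a local checkpoint directory to S3 so an interrupted or restarted job can resume from the last saved state. A hedged sketch that extends the estimator from the training microservice with checkpoint settings (the S3 URI is a placeholder; checkpoint_s3_uri and checkpoint_local_path are standard SageMaker estimator parameters):
import sagemaker
from sagemaker.huggingface import HuggingFace

role = sagemaker.get_execution_role()  # as in the training microservice above

# Checkpoints written to /opt/ml/checkpoints inside the container are synced to S3
huggingface_estimator = HuggingFace(
    entry_point='train.py',
    source_dir='scripts',
    instance_type='ml.p3.2xlarge',
    instance_count=1,
    role=role,
    transformers_version='4.26',
    pytorch_version='1.13',
    py_version='py39',
    checkpoint_s3_uri='s3://my-bucket/checkpoints/',   # where SageMaker uploads checkpoints
    checkpoint_local_path='/opt/ml/checkpoints',       # default local checkpoint directory
    use_spot_instances=True,                           # optional: cheaper spot capacity
    max_wait=7200,                                     # required with spot: max total wait (seconds)
    max_run=3600,
)
The reference configuration below covers the parallelism, checkpointing, scheduling, and monitoring settings.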
deployment:
model_config:
tensor_parallel_size: 8 # Tensor parallelism degree
pipeline_parallel_size: 4 # Pipeline parallelism degree
max_num_batched_tokens: 8192
max_num_sequences: 256
trust_remote_code: true
dtype: "bf16" # Use bfloat16 for better performance
gpu_config:
gpu_memory_utilization: 0.95
max_num_gpus: 32
enforce_eager: false
disable_custom_all_reduce: false
checkpoint:
save_interval: 1000 # Save checkpoint every 1000 steps
checkpoint_dir: "/path/to/checkpoints"
keep_n_checkpoints: 3
checkpoint_conversion:
convert_from_hf: true
precision: "bf16"
scheduling:
block_size: 16
swap_space: 4 # GB per GPU for activation checkpointing
max_context_length: 4096
scheduler_queue_size: 2048
parallelism_strategy:
tp_configs: # Tensor Parallel configurations
- size: 1
gpu_memory: "30GB"
recommended_batch: 32
- size: 2
gpu_memory: "60GB"
recommended_batch: 64
- size: 4
gpu_memory: "120GB"
recommended_batch: 128
- size: 8
gpu_memory: "240GB"
recommended_batch: 256
pp_configs: # Pipeline Parallel configurations
- stages: 2
min_gpus: 4
memory_per_gpu: "40GB"
- stages: 4
min_gpus: 8
memory_per_gpu: "40GB"
- stages: 8
min_gpus: 16
memory_per_gpu: "40GB"
optimization:
kv_cache_config:
block_size: 16
num_blocks: 512
activation_checkpointing:
enabled: true
granularity: "selective" # Options: full, selective
cpu_offload: false
memory_efficient_linear: true
monitoring:
log_stats_interval: 10
profile_step_start: 100
profile_step_end: 200
metrics:
- gpu_memory_usage
- throughput
- latency
- cuda_memory_allocated
Define Hyperparameters & Hyperparameter Optimization
hyperparameters = {
    'num_class': 10,
    'max_depth': 5,
    'eta': 0.2,
    'alpha': 0.2,
    'objective': 'multi:softmax',
    'eval_metric': 'merror',   # multiclass classification error rate
    'num_round': 200,
    'early_stopping_rounds': 5}
import sagemaker

# Resolve the SageMaker-managed XGBoost container image (region and version are examples)
xgboost_container = sagemaker.image_uris.retrieve("xgboost", region="us-east-1", version="1.7-1")

# construct a SageMaker estimator that calls the XGBoost container
estimator = sagemaker.estimator.Estimator(
    image_uri=xgboost_container,
    hyperparameters=hyperparameters,
    role=role,                  # IAM role, as in the earlier snippets
    instance_count=1,
    instance_type='ml.m5.large',
    volume_size=5,              # 5 GB
    output_path=output_path     # e.g. "s3://my-bucket/output"
)
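To actually run hyperparameter optimization rather than a single training job, the estimator above can be wrapped in a SageMaker HyperparameterTuner. A minimal sketch, assuming the estimator and train_data from the snippets above plus a validation_data input prepared the same way:
from sagemaker.tuner import HyperparameterTuner, ContinuousParameter, IntegerParameter

# Ranges to explore; the static values above become the search space instead
hyperparameter_ranges = {
    "eta": ContinuousParameter(0.05, 0.5),
    "alpha": ContinuousParameter(0.0, 1.0),
    "max_depth": IntegerParameter(3, 10),
}

tuner = HyperparameterTuner(
    estimator=estimator,                         # the XGBoost estimator defined above
    objective_metric_name="validation:merror",   # built-in metric emitted by the XGBoost container
    objective_type="Minimize",
    hyperparameter_ranges=hyperparameter_ranges,
    max_jobs=20,              # total training jobs to run
    max_parallel_jobs=4,      # jobs run concurrently
)

# train_data / validation_data are TrainingInput objects, as shown earlier
tuner.fit({"train": train_data, "validation": validation_data})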
Reference for Configuring Observability: Neptune / Weights & Biases
import os
import time
import boto3
import wandb
import neptune
from sagemaker.estimator import Estimator
from sagemaker.session import Session
from sagemaker.processing import Processor
def setup_tracking(experiment_name, neptune_project, wandb_project):
"""Initialize Neptune.ai and W&B tracking"""
# Neptune setup
neptune_run = neptune.init_run(
project=neptune_project,
api_token=os.getenv("NEPTUNE_API_TOKEN")
)
# W&B setup
wandb.init(
project=wandb_project,
name=experiment_name
)
return neptune_run
def create_sagemaker_estimator(role, instance_type='ml.m5.xlarge'):
"""Create SageMaker estimator with tracking configuration"""
sm_session = Session()
# Define custom environment variables for tracking
env = {
'WANDB_API_KEY': os.getenv('WANDB_API_KEY'),
'NEPTUNE_API_TOKEN': os.getenv('NEPTUNE_API_TOKEN')
}
estimator = Estimator(
role=role,
instance_count=1,
instance_type=instance_type,
environment=env,
sagemaker_session=sm_session
)
return estimator
def training_script():
"""Example training script with integrated tracking"""
# Import required libraries
import argparse
import os
import numpy as np
from sklearn.metrics import accuracy_score
# Initialize tracking
neptune_run = neptune.init_run(
project=os.getenv('NEPTUNE_PROJECT'),
api_token=os.getenv('NEPTUNE_API_TOKEN')
)
wandb.init(
project=os.getenv('WANDB_PROJECT')
)
def train():
# Your model training code here
for epoch in range(num_epochs):
# Training loop
train_loss = ...
val_loss = ...
accuracy = ...
# Log metrics to both platforms
neptune_run["train/loss"].append(train_loss)
neptune_run["val/loss"].append(val_loss)
neptune_run["val/accuracy"].append(accuracy)
wandb.log({
"train_loss": train_loss,
"val_loss": val_loss,
"accuracy": accuracy
})
if __name__ == "__main__":
parser = argparse.ArgumentParser()
# Add your training arguments
args = parser.parse_args()
# Start training
train()
# Close tracking
neptune_run.stop()
wandb.finish()
def monitor_training(job_name, neptune_run):
    """Monitor training job and sync metrics"""
    client = boto3.client('sagemaker')
    while True:
        status = client.describe_training_job(TrainingJobName=job_name)
        current_status = status['TrainingJobStatus']
        if current_status == 'Completed':
            print('Training job completed successfully')
            break
        elif current_status in ['Failed', 'Stopped']:
            print(f'Training job {current_status}')
            break
        # Get metrics reported so far and sync them
        metrics = status.get('FinalMetricDataList', [])
        # Log metrics to both platforms
        for metric in metrics:
            neptune_run[f"sagemaker/{metric['MetricName']}"].append(metric['Value'])
            wandb.log({f"sagemaker_{metric['MetricName']}": metric['Value']})
        # Avoid hammering the SageMaker API
        time.sleep(30)
def main():
# Set up environment variables
os.environ['NEPTUNE_PROJECT'] = 'your-neptune-project'
os.environ['WANDB_PROJECT'] = 'your-wandb-project'
# Initialize tracking
experiment_name = 'sagemaker-training-job'
neptune_run = setup_tracking(
experiment_name,
os.getenv('NEPTUNE_PROJECT'),
os.getenv('WANDB_PROJECT')
)
# Create and configure SageMaker estimator
role = 'your-sagemaker-role-arn'
estimator = create_sagemaker_estimator(role)
# Start training job
estimator.fit(
wait=False,
job_name=experiment_name
)
# Monitor training and sync metrics
monitor_training(experiment_name, neptune_run)
# Close tracking
neptune_run.stop()
wandb.finish()
if __name__ == "__main__":
main()
Production
Deploy an Inference Endpoint with API Gateway
See my article on key considerations for going to production with SageMaker.
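As a starting point, the trained estimator from the training microservice can be deployed straight to a real-time SageMaker endpoint; a minimal sketch (instance type and payload are placeholders, and the API Gateway integration in front of the endpoint is not shown):
# Deploy the trained model behind a real-time SageMaker endpoint
# (huggingface_estimator is the fitted estimator from the training microservice)
predictor = huggingface_estimator.deploy(
    initial_instance_count=1,
    instance_type="ml.g4dn.xlarge",
)

# Invoke the endpoint with a sample payload
result = predictor.predict({"inputs": "example text to classify"})
print(result)

# Tear the endpoint down when it is no longer needed
predictor.delete_endpoint()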
Alternative: Deploying/Serving with vLLM on EKS with KubeAI
KubeAI is a Kubernetes Operator that enables you to deploy and manage AI models on Kubernetes. It provides a simple and scalable way to deploy vLLM in production.
Use this Tutorial to deploy vLLM to EKS.
3rd Party Extensions: Neo4j, LangChain, OpenAI, VectorDB
Store your Neo4j, LangChain, OpenAI, and VectorDB credentials in secret management.
Centralized YAML supporting the NEO4J, LANGCHAIN, OPENAI, HUGGINGFACE, and QDRANT secrets:
global:
env:
- name: NEO4J_URI
valueFrom:
secretKeyRef:
name: neo4j-external-secrets
key: uri
- name: NEO4J_USERNAME
valueFrom:
secretKeyRef:
name: neo4j-external-secrets
key: username
- name: NEO4J_PASSWORD
valueFrom:
secretKeyRef:
name: neo4j-external-secrets
key: password
- name: EMBEDDING_MODEL
value: "text-embedding-ada-002"
- name: OPENAI_API_KEY
valueFrom:
secretKeyRef:
name: openai-external-secrets
key: api-key
- name: LANGCHAIN_ENDPOINT
value: "https://api.langchain.com"
- name: LANGCHAIN_TRACING_V2
value: "true"
- name: LANGCHAIN_PROJECT
value: "default"
- name: LANGCHAIN_API_KEY
valueFrom:
secretKeyRef:
name: langchain-external-secrets
key: api-key
- name: HUGGINGFACE_API_KEY
valueFrom:
secretKeyRef:
name: huggingface-external-secrets
key: api-key
- name: HUGGINGFACE_PASSWORD
valueFrom:
secretKeyRef:
name: huggingface-external-secrets
key: password
- name: QDRANT_URL
value: "https://qdrant.qdrant.svc.cluster.local"
- name: QDRANT_API_KEY
valueFrom:
secretKeyRef:
name: qdrant-external-secrets
key: api-key
externalSecrets:
neo4j:
secretStore:
name: aws-secretsmanager
kind: SecretStore
refreshInterval: 1h
target:
name: neo4j-external-secrets
template:
type: Opaque
data:
uri: "{{ .uri | toString }}"
username: "{{ .username | toString }}"
password: "{{ .password | toString }}"
data:
- secretKey: uri
remoteRef:
key: dev/llm/neo4j
property: uri
version: AWSCURRENT
decodingStrategy: Base64
- secretKey: username
remoteRef:
key: dev/llm/neo4j
property: username
version: AWSCURRENT
decodingStrategy: Base64
- secretKey: password
remoteRef:
key: dev/llm/neo4j
property: password
version: AWSCURRENT
decodingStrategy: Base64
dataFrom:
- extract:
key: dev/llm/neo4j
version: AWSCURRENT
openai:
secretStore:
name: aws-secretsmanager
kind: SecretStore
refreshInterval: 1h
target:
name: openai-external-secrets
template:
type: Opaque
data:
api-key: "{{ .apiKey | toString }}"
dataFrom:
- extract:
key: dev/llm/openai
version: AWSCURRENT
langchain:
secretStore:
name: aws-secretsmanager
kind: SecretStore
refreshInterval: 1h
target:
name: langchain-external-secrets
template:
type: Opaque
data:
api-key: "{{ .apiKey | toString }}"
dataFrom:
- extract:
key: dev/llm/langchain
version: AWSCURRENT
huggingface:
secretStore:
name: aws-secretsmanager
kind: SecretStore
refreshInterval: 1h
target:
name: huggingface-external-secrets
template:
type: Opaque
data:
api-key: "{{ .apiKey | toString }}"
password: "{{ .password | toString }}"
dataFrom:
- extract:
key: dev/llm/huggingface
version: AWSCURRENT
qdrant:
secretStore:
name: aws-secretsmanager
kind: SecretStore
refreshInterval: 1h
target:
name: qdrant-external-secrets
template:
type: Opaque
data:
api-key: "{{ .apiKey | toString }}"
dataFrom:
- extract:
key: dev/llm/qdrant
version: AWSCURRENT
secretStore:
aws:
name: aws-secretsmanager
provider:
aws:
service: SecretsManager
region: us-east-1
auth:
secretRef:
accessKeyIDSecretRef:
name: aws-secret-manager-credentials
key: access-key-id
secretAccessKeySecretRef:
name: aws-secret-manager-credentials
key: secret-access-key
role: arn:aws:iam::123456789012:role/external-secrets-operator
kmsKeyID: arn:aws:kms:us-east-1:123456789012:key/1234abcd-12ab-34cd-56ef-1234567890ab
encryption:
type: "aes"
algorithm: "aes-256-gcm"