A Complete Guide to Building an ML Pipeline with Athena, SQS, EKS, KEDA & SageMaker


This article illustrates a scalable, modern machine learning and data pipeline built with Athena, SQS, EKS, KEDA, Karpenter & SageMaker.


[ Included: the architecture, YAMLs, and Python code for the two microservices (Athena→SQS & SQS→SageMaker) that make this pipeline work. ]

Secured Data Pipeline: Store Your Athena and SQS Credentials in Vault or AWS Secrets Manager for the Pods to Retrieve at Runtime

Python code to load the secrets from your microservices/containers:

import boto3
import hvac
import json

# AWS Secrets Manager
def get_aws_secret(secret_name, region_name):
    session = boto3.session.Session()
    client = session.client(service_name='secretsmanager', region_name=region_name)

    try:
        response = client.get_secret_value(SecretId=secret_name)
        if 'SecretString' in response:
            secret = json.loads(response['SecretString'])
            return secret
        else:
            return json.loads(response['SecretBinary'])
    except Exception as e:
        print(f"Error retrieving secret from AWS: {str(e)}")
        return None

# HashiCorp Vault
def get_vault_secret(vault_url, vault_token, secret_path):
    client = hvac.Client(url=vault_url, token=vault_token)

    try:
        if client.is_authenticated():
            secret = client.secrets.kv.v2.read_secret_version(path=secret_path)
            return secret['data']['data']
        else:
            print("Vault authentication failed")
            return None
    except Exception as e:
        print(f"Error retrieving secret from Vault: {str(e)}")
        return None

# Usage examples
aws_secret = get_aws_secret('my-aws-secret', 'us-west-2')
if aws_secret:
    print("AWS Secret:", aws_secret)

vault_secret = get_vault_secret('https://vault.example.com', 's.your-vault-token', 'secret/my-vault-secret')
if vault_secret:
    print("Vault Secret:", vault_secret)

Pipeline Overview: AWS S3 (Training Data) → Athena (Inventory) → SQS ←→ EKS + KEDA → SageMaker → Inference Endpoint

Data Preparation: S3→Athena

Loading the Amazon S3 Inventory into Amazon Athena lets us enumerate and batch as many training files as we need, without paginating ListObjects calls against the bucket.
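
A minimal sketch of that setup, assuming pyathena is installed and the inventory report is delivered in Parquet format (the bucket, prefix, and table names below are illustrative placeholders):

# Register the S3 Inventory report as an Athena table and query it
# (assumes a Parquet-format inventory; names below are placeholders)
from pyathena import connect

cursor = connect(
    s3_staging_dir="s3://s3-results-bucket/output/",  # Athena query results
    region_name="us-east-1",
).cursor()

# One-time setup: expose the inventory files as an external table
cursor.execute("""
CREATE EXTERNAL TABLE IF NOT EXISTS s3_inventory (
    bucket string,
    key string,
    size bigint,
    last_modified_date timestamp
)
STORED AS PARQUET
LOCATION 's3://my-inventory-bucket/my-bucket/inventory/hive/'
""")

# Enumerate training files without listing the source bucket directly
cursor.execute("SELECT key, size FROM s3_inventory WHERE key LIKE 'train-data/%'")
for key, size in cursor.fetchall():
    print(key, size)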

Alternatives

https://github.com/MeltanoLabs/tap-athena

https://github.com/dbt-labs/dbt-athena

Resources

https://docs.aws.amazon.com/athena/latest/ug/federated-queries.html

https://docs.aws.amazon.com/athena/latest/ug/service-limits.html

Supports multiple data sources (via Athena federated queries)

Athena→SQS: The Batching Microservice

Implementation: Python + Pydantic + SQLAlchemy

  1. Query Athena with SQLAlchemy

    [ Adapted from https://medium.com/codex/connecting-to-aws-athena-databases-using-python-4a9194427638 ]

from urllib.parse import quote_plus
from sqlalchemy.engine import create_engine

AWS_ACCESS_KEY = "AWS_ACCESS_KEY"
AWS_SECRET_KEY = "AWS_SECRET_KEY"
SCHEMA_NAME = "schema_name"
S3_STAGING_DIR = "s3://s3-results-bucket/output/"
AWS_REGION = "us-east-1"


conn_str = (
    "awsathena+rest://{aws_access_key_id}:{aws_secret_access_key}@"
    "athena.{region_name}.amazonaws.com:443/"
    "{schema_name}?s3_staging_dir={s3_staging_dir}&work_group=primary"
)


# Create the SQLAlchemy engine. Note that pyathena must be installed
# for the awsathena+rest dialect to resolve.
engine = create_engine(
    conn_str.format(
        aws_access_key_id=quote_plus(AWS_ACCESS_KEY),
        aws_secret_access_key=quote_plus(AWS_SECRET_KEY),
        region_name=AWS_REGION,
        schema_name=SCHEMA_NAME,
        s3_staging_dir=quote_plus(S3_STAGING_DIR),
    )
)
athena_connection = engine.connect()
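
As a quick sanity check, run a query through the connection (a minimal sketch; the s3_inventory table assumes the inventory DDL shown earlier):

# Sanity-check the connection (s3_inventory assumes the DDL sketched above)
from sqlalchemy import text

rows = athena_connection.execute(
    text("SELECT key, size FROM s3_inventory LIMIT 10")
).fetchall()
print(rows)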
  2. SQS Message Schema (SQSModel)

     {
       "Records": [
         {
           "eventVersion": "2.0",
           "eventSource": "aws:s3",
           "awsRegion": "us-west-2",
           "eventTime": "2025-01-18T04:30:00.000Z",
           "eventName": "ObjectCreated:Put",
           "s3": {
             "bucket": {
               "name": "my-bucket",
               "arn": "arn:aws:s3:::my-bucket"
             },
             "object": {
               "key": "file1.txt",
               "size": 1024,
               "eTag": "d41d8cd98f00b204e9800998ecf8427e"
             }
           }
         },
         {
           "eventVersion": "2.0",
           "eventSource": "aws:s3",
           "awsRegion": "us-west-2",
           "eventTime": "2025-01-18T04:31:00.000Z",
           "eventName": "ObjectCreated:Put",
           "s3": {
             "bucket": {
               "name": "my-bucket",
               "arn": "arn:aws:s3:::my-bucket"
             },
             "object": {
               "key": "folder/file2.jpg",
               "size": 2048,
               "eTag": "c4ca4238a0b923820dcc509a6f75849b"
             }
           }
         }
       ]
     }
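
A hedged Pydantic sketch of the record above (the model names are illustrative; trim the fields to what your consumer actually needs):

# Pydantic models mirroring the S3 event payload above
# (names are illustrative; keep only the fields you consume)
from typing import List
from pydantic import BaseModel

class S3Bucket(BaseModel):
    name: str
    arn: str

class S3Object(BaseModel):
    key: str
    size: int
    eTag: str

class S3Detail(BaseModel):
    bucket: S3Bucket
    object: S3Object

class S3EventRecord(BaseModel):
    eventVersion: str
    eventSource: str
    awsRegion: str
    eventTime: str
    eventName: str
    s3: S3Detail

class S3EventMessage(BaseModel):
    Records: List[S3EventRecord]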
    
  3. Put into SQS (SQSModel)

[ Taken from https://github.com/andrewthetechie/pydantic-sqs ]

from pydantic_sqs import SQSModel, SQSQueue
from pydantic import Field
import asyncio
from pprint import pprint
import os


class PutS3File(SQSModel):
    foo: str = Field(..., description="Foo")


async def main():
    queue_kwargs = {
        "queue_url": os.environ.get("SQS_QUEUE_URL"),
        "endpoint_url": os.environ.get("SQS_ENDPOINT_URL", None),
        "use_ssl": os.environ.get("SQS_USE_SSL", "true").lower() == "true",
    }
    if queue_kwargs["endpoint_url"] is None:
        del queue_kwargs["endpoint_url"]

    queue = SQSQueue(**queue_kwargs)

    queue.register_model(PutS3File)

    file = PutS3File(foo="trainingfile")
    await file.to_sqs()

if __name__ == "__main__":
    asyncio.run(main())

https://docs.aws.amazon.com/code-library/latest/ug/python_3_sqs_code_examples.html

KEDA + SQS: Authentication & Scalability

Key KEDA features in this configuration:

Automatic Scaling:

Scales based on SQS queue length

Can scale to zero when the queue is empty

Configurable scaling thresholds

Fine-tuned Control:

pollingInterval: How often to check queue metrics

cooldownPeriod: Prevent rapid scale down

queueLength: Target messages per job

activationQueueLength: When to start scaling

Two Options Provided:

ScaledObject: For continuous workloads

ScaledJob: For batch-style processing

Authentication:

Supports IRSA (IAM Roles for Service Accounts)

Can use traditional AWS credentials

Pod identity or operator identity

Resources

Keda-SQS

SQS Keda/Karpenter EKS

AWS Keda/Karpenter Git

KEDA←→SQS Authentication


apiVersion: v1
kind: Secret
metadata:
  name: test-secrets
  namespace: keda-test   # must be in the same namespace as the TriggerAuthentication
data:
  AWS_ROLE_ARN: <encoded-iam-role-arn>
---
apiVersion: keda.sh/v1alpha1
kind: TriggerAuthentication
metadata:
  name: keda-trigger-auth-aws-credentials
  namespace: keda-test
spec:
  secretTargetRef:
  - parameter: awsRoleArn    # The property in KEDA.
    name: test-secrets       # The name of the kubernetes secret.
    key: AWS_ROLE_ARN        # The key from the kubernetes secret.

ScaledObject YAML for our SQS-based scaling (keep it handy for debugging)

apiVersion: keda.sh/v1alpha1
kind: ScaledObject
metadata:
  name: aws-sqs-scaledobject
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment     # ScaledObject scales Deployments; use ScaledJob for Jobs
    name: sqs-worker
  pollingInterval: 30  # How frequently to check metrics (seconds)
  cooldownPeriod: 300 # How long to wait before scaling down (seconds)
  minReplicaCount: 0  # Scale to zero when no messages
  maxReplicaCount: 30 # Maximum number of replicas
  triggers:
  - type: aws-sqs-queue
    metadata:
      queueURL: https://sqs.us-west-2.amazonaws.com/123456789012/my-work-queue
      queueLength: "5"  # Target messages per replica
      activationQueueLength: "2" # Min messages to start scaling
      region: us-west-2
      identityOwner: operator # Use "pod" if using IRSA
    authenticationRef:
      name: keda-trigger-auth-aws-credentials
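
For batch-style processing, the ScaledJob variant launches a Kubernetes Job per batch of messages instead of scaling a long-running Deployment. A hedged sketch (the container image is an illustrative placeholder):

apiVersion: keda.sh/v1alpha1
kind: ScaledJob
metadata:
  name: aws-sqs-scaledjob
spec:
  jobTargetRef:
    template:
      spec:
        containers:
        - name: sqs-worker
          image: my-registry/sqs-worker:latest  # placeholder image
        restartPolicy: Never
  pollingInterval: 30
  maxReplicaCount: 30
  successfulJobsHistoryLimit: 3
  failedJobsHistoryLimit: 3
  triggers:
  - type: aws-sqs-queue
    metadata:
      queueURL: https://sqs.us-west-2.amazonaws.com/123456789012/my-work-queue
      queueLength: "5"
      region: us-west-2
    authenticationRef:
      name: keda-trigger-auth-aws-credentials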

Karpenter: Node Scalability on EKS

# Karpenter Helm Chart values.yaml

## Global Settings
global:
  aws:
    defaultInstanceProfile: "KarpenterNodeInstanceProfile"
    clusterName: "your-cluster-name"
    clusterEndpoint: "https://your-cluster-endpoint.eks.amazonaws.com"

## Karpenter Controller Settings
controller:
  resources:
    requests:
      cpu: 1
      memory: 1Gi
    limits:
      cpu: 1
      memory: 1Gi

  serviceAccount:
    annotations:
      eks.amazonaws.com/role-arn: "arn:aws:iam::123456789012:role/KarpenterControllerRole"

  settings:
    aws:
      defaultInstanceProfile: "KarpenterNodeInstanceProfile"
      interruptionQueueName: "your-cluster-name"
    batchMaxDuration: "10s"
    batchIdleDuration: "1s"

## Node Template Defaults
nodeTemplate:
  subnetSelector:
    karpenter.sh/discovery: "your-cluster-name"
  securityGroupSelector:
    karpenter.sh/discovery: "your-cluster-name"
  tags:
    KarpenterManaged: "true"

## Node Pool Defaults
nodePoolDefaults:
  limits:
    resources:
      cpu: 1000
      memory: 1000Gi

## Provisioner Settings
provisioner:
  ttlSecondsAfterEmpty: 30

## Metrics Server Settings
metrics:
  enabled: true

## Webhook Settings
webhook:
  enabled: true

YAML Reference for Karpenter Scalability

# Karpenter GPU
spec:
  template:
    spec:
      containers:
      - resources:
          limits:
            nvidia.com/gpu: "<gpu-count-here>"

Karpenter Supported GPUs
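
For Karpenter to actually provision GPU nodes for those pods, a provisioner has to allow GPU instance types. A hedged sketch against the v1alpha5 API that matches the Helm values above (the instance types and taint are illustrative; newer Karpenter releases use the NodePool API instead):

apiVersion: karpenter.sh/v1alpha5
kind: Provisioner
metadata:
  name: gpu
spec:
  requirements:
    - key: node.kubernetes.io/instance-type
      operator: In
      values: ["p3.2xlarge", "g5.xlarge"]   # illustrative GPU instance types
  taints:
    - key: nvidia.com/gpu
      value: "true"
      effect: NoSchedule
  ttlSecondsAfterEmpty: 30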

SQS→SageMaker: The Training Microservice (query messages from SQS and start SageMaker training)

Train a Transformer Model

Multi-head attention applies the same "queries", "keys", and "values" computation in parallel across several heads, layer after layer.
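
For intuition, a minimal scaled dot-product attention sketch in PyTorch (illustrative only; the training job below delegates the real thing to a HuggingFace model):

# Minimal scaled dot-product attention (illustrative sketch)
import torch
import torch.nn.functional as F

def attention(q, k, v):
    # q, k, v: (batch, heads, seq_len, head_dim)
    scores = q @ k.transpose(-2, -1) / (q.size(-1) ** 0.5)
    return F.softmax(scores, dim=-1) @ v

q = k = v = torch.randn(1, 8, 16, 64)  # 8 heads over a 16-token sequence
print(attention(q, k, v).shape)  # torch.Size([1, 8, 16, 64])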

Query SQS with pydantic_sqs and run training


from pydantic_sqs import SQSModel, SQSQueue
from pydantic import Field
import asyncio
from pprint import pprint
import os
import sagemaker
from sagemaker.huggingface import HuggingFace
from sagemaker.inputs import TrainingInput


class PutS3File(SQSModel):
    foo: str = Field(..., description="S3 URI of the training data")


async def poll_training_queue():
    queue = SQSQueue(queue_url=os.environ["SQS_QUEUE_URL"])
    queue.register_model(PutS3File)

    # SQS long polling allows at most 20 seconds per receive call
    new_trainings = await queue.from_sqs(max_messages=10, wait_time_seconds=20)
    pprint(new_trainings)
    for training_message in new_trainings:
        await training_message.delete_from_queue()

    print("done processing training messages we got from the queue")
    return new_trainings


new_trainings = asyncio.run(poll_training_queue())

# Setup SageMaker session and IAM role
session = sagemaker.Session()
role = sagemaker.get_execution_role()

# Define S3 paths from the first queued message
train_data_path = new_trainings[0].foo  # e.g. "s3://my-bucket/train-data"
output_path = "s3://my-bucket/output"

# Configure training data input
train_data = TrainingInput(
    s3_data=train_data_path,
    content_type="text/csv"
)

# Define hyperparameters
hyperparameters = {
    "model_name": "bert-base-uncased",  # or your preferred model
    "rnn_type": "lstm",
    "hidden_size": 512,
    "num_layers": 2,
    "dropout": 0.2,
    "batch_size": 32,
    "epochs": 10,
    "learning_rate": 0.001,
    "output_format": "safetensors"
}

# Create HuggingFace estimator
huggingface_estimator = HuggingFace(
    entry_point='train.py',  # Your training script
    source_dir='scripts',    # Directory containing your scripts
    instance_type='ml.p3.2xlarge',
    instance_count=1,
    role=role,
    transformers_version='4.26',
    pytorch_version='1.13',
    py_version='py39',
    hyperparameters=hyperparameters,
    output_path=output_path
)

# Start training job
huggingface_estimator.fit({'train': train_data})

Maintain a Checkpointing Strategy & Parallelism Strategy

from pathlib import Path
import json
import torch
from safetensors.torch import save_file, load_file

class CheckpointConverter:
    """Converts existing model checkpoints to safetensor format with custom configurations"""

    def __init__(self, checkpoint_path: str):
        """
        Initialize with path to existing checkpoint

        Args:
            checkpoint_path: Path to the existing checkpoint file (.ckpt or .pt)
        """
        self.checkpoint_path = Path(checkpoint_path)
        if not self.checkpoint_path.exists():
            raise FileNotFoundError(f"Checkpoint not found: {self.checkpoint_path}")

        self.state_dict = None

    def load_checkpoint(self) -> None:
        """Load the existing checkpoint"""
        try:
            self.state_dict = torch.load(self.checkpoint_path, map_location="cpu")
            if "state_dict" in self.state_dict:
                self.state_dict = self.state_dict["state_dict"]
        except Exception as e:
            raise RuntimeError(f"Failed to load checkpoint: {e}")

    def apply_config(self, config_path: str) -> None:
        """
        Apply configuration from JSON file to modify the checkpoint

        Args:
            config_path: Path to JSON configuration file
        """
        if self.state_dict is None:
            self.load_checkpoint()

        try:
            with open(config_path, 'r') as f:
                config = json.load(f)

            # Apply modifications based on config
            for key, modifications in config.get("modifications", {}).items():
                if key in self.state_dict:
                    if "rename" in modifications:
                        # Rename key
                        new_key = modifications["rename"]
                        self.state_dict[new_key] = self.state_dict.pop(key)

                    if "scale" in modifications:
                        # Scale tensor values
                        scale_factor = modifications["scale"]
                        self.state_dict[key] *= scale_factor

                    if "delete" in modifications and modifications["delete"]:
                        # Remove key from state dict
                        del self.state_dict[key]

        except Exception as e:
            raise RuntimeError(f"Failed to apply configuration: {e}")

    def save_safetensor(self, output_path: str) -> None:
        """
        Save the modified checkpoint as safetensor file

        Args:
            output_path: Path where to save the safetensor file
        """
        if self.state_dict is None:
            raise RuntimeError("No checkpoint loaded")

        try:
            # Convert output path to Path object
            output_path = Path(output_path)
            output_path.parent.mkdir(parents=True, exist_ok=True)

            # Save as safetensor
            save_file(self.state_dict, str(output_path))
            print(f"Successfully saved safetensor file to {output_path}")

        except Exception as e:
            raise RuntimeError(f"Failed to save safetensor file: {e}")

def main():
    # Example usage
    checkpoint_path = "path/to/original_checkpoint.ckpt"
    config_path = "config.json"
    output_path = "path/to/output.safetensors"

    try:
        converter = CheckpointConverter(checkpoint_path)
        converter.load_checkpoint()
        converter.apply_config(config_path)
        converter.save_safetensor(output_path)

    except Exception as e:
        print(f"Error during conversion: {e}")

if __name__ == "__main__":
    main()
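
The parallelism side of the strategy lives in deployment configuration. The reference YAML below uses vLLM-style values; treat the numbers as starting points to tune per cluster.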
deployment:
  model_config:
    tensor_parallel_size: 8  # Tensor parallelism degree
    pipeline_parallel_size: 4  # Pipeline parallelism degree
    max_num_batched_tokens: 8192
    max_num_sequences: 256
    trust_remote_code: true
    dtype: "bf16"  # Use bfloat16 for better performance

  gpu_config:
    gpu_memory_utilization: 0.95
    max_num_gpus: 32
    enforce_eager: false
    disable_custom_all_reduce: false

  checkpoint:
    save_interval: 1000  # Save checkpoint every 1000 steps
    checkpoint_dir: "/path/to/checkpoints"
    keep_n_checkpoints: 3
    checkpoint_conversion:
      convert_from_hf: true
      precision: "bf16"

  scheduling:
    block_size: 16
    swap_space: 4  # GB of CPU swap space per GPU (KV-cache swapping)
    max_context_length: 4096
    scheduler_queue_size: 2048

  parallelism_strategy:
    tp_configs:  # Tensor Parallel configurations
      - size: 1
        gpu_memory: "30GB"
        recommended_batch: 32
      - size: 2
        gpu_memory: "60GB"
        recommended_batch: 64
      - size: 4
        gpu_memory: "120GB"
        recommended_batch: 128
      - size: 8
        gpu_memory: "240GB"
        recommended_batch: 256

    pp_configs:  # Pipeline Parallel configurations
      - stages: 2
        min_gpus: 4
        memory_per_gpu: "40GB"
      - stages: 4
        min_gpus: 8
        memory_per_gpu: "40GB"
      - stages: 8
        min_gpus: 16
        memory_per_gpu: "40GB"

  optimization:
    kv_cache_config:
      block_size: 16
      num_blocks: 512
    activation_checkpointing:
      enabled: true
      granularity: "selective"  # Options: full, selective
      cpu_offload: false
    memory_efficient_linear: true

  monitoring:
    log_stats_interval: 10
    profile_step_start: 100
    profile_step_end: 200
    metrics:
      - gpu_memory_usage
      - throughput
      - latency
      - cuda_memory_allocated

Define Hyperparameter Optimization

  1. SageMaker

  2. XGBoost Notebook for tuning

hyperparameters = {
        'num_class': 10,
        'max_depth': 5,
        'eta': 0.2,
        'alpha': 0.2,
        'objective': 'multi:softmax',
        'eval_metric': 'merror',  # multiclass error rate; XGBoost has no 'accuracy' metric
        'num_round': 200,
        'early_stopping_rounds': 5}

# resolve the managed XGBoost image, then construct a SageMaker estimator that calls it
xgboost_container = sagemaker.image_uris.retrieve("xgboost", session.boto_region_name, "1.7-1")
estimator = sagemaker.estimator.Estimator(
    image_uri=xgboost_container,
    hyperparameters=hyperparameters,
    role=role,
    instance_count=1, 
    instance_type='ml.m5.large', 
    volume_size=5, # 5 GB 
    output_path=output_path
)
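
To search over these values instead of fixing them, wrap the estimator in SageMaker automatic model tuning (a hedged sketch; the ranges, metric, and job counts are illustrative):

# Hedged sketch of SageMaker automatic model tuning over the estimator above
from sagemaker.tuner import HyperparameterTuner, ContinuousParameter, IntegerParameter

hyperparameter_ranges = {
    'eta': ContinuousParameter(0.05, 0.5),
    'alpha': ContinuousParameter(0, 1),
    'max_depth': IntegerParameter(3, 10),
}

tuner = HyperparameterTuner(
    estimator=estimator,
    objective_metric_name='validation:merror',
    objective_type='Minimize',
    hyperparameter_ranges=hyperparameter_ranges,
    max_jobs=20,
    max_parallel_jobs=4,
)

tuner.fit({'train': train_data, 'validation': train_data})  # substitute a real validation channel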

Reference for Configuring Observability: Neptune / Weights & Biases

import os
import time
import boto3
import neptune
from sagemaker.estimator import Estimator
from sagemaker.session import Session
from sagemaker.processing import Processor

def setup_tracking(experiment_name, neptune_project, wandb_project):
    """Initialize Neptune.ai and W&B tracking"""

    # Neptune setup
    neptune_run = neptune.init_run(
        project=neptune_project,
        api_token=os.getenv("NEPTUNE_API_TOKEN")
    )

    # W&B setup
    wandb.init(
        project=wandb_project,
        name=experiment_name
    )

    return neptune_run

def create_sagemaker_estimator(role, instance_type='ml.m5.xlarge'):
    """Create SageMaker estimator with tracking configuration"""

    sm_session = Session()

    # Define custom environment variables for tracking
    env = {
        'WANDB_API_KEY': os.getenv('WANDB_API_KEY'),
        'NEPTUNE_API_TOKEN': os.getenv('NEPTUNE_API_TOKEN')
    }

    estimator = Estimator(
        image_uri='your-training-image-uri',  # the SDK requires a training image
        role=role,
        instance_count=1,
        instance_type=instance_type,
        environment=env,
        sagemaker_session=sm_session
    )

    return estimator

def training_script():
    """Example training script with integrated tracking"""

    # Import required libraries
    import argparse
    import os
    import numpy as np
    from sklearn.metrics import accuracy_score

    # Initialize tracking
    neptune_run = neptune.init_run(
        project=os.getenv('NEPTUNE_PROJECT'),
        api_token=os.getenv('NEPTUNE_API_TOKEN')
    )

    wandb.init(
        project=os.getenv('WANDB_PROJECT')
    )

    def train():
        # Your model training code here
        for epoch in range(num_epochs):
            # Training loop
            train_loss = ...
            val_loss = ...
            accuracy = ...

            # Log metrics to both platforms
            neptune_run["train/loss"].append(train_loss)
            neptune_run["val/loss"].append(val_loss)
            neptune_run["val/accuracy"].append(accuracy)

            wandb.log({
                "train_loss": train_loss,
                "val_loss": val_loss,
                "accuracy": accuracy
            })

    if __name__ == "__main__":
        parser = argparse.ArgumentParser()
        # Add your training arguments
        args = parser.parse_args()

        # Start training
        train()

        # Close tracking
        neptune_run.stop()
        wandb.finish()

def monitor_training(job_name, neptune_run):
    """Monitor training job and sync metrics"""

    client = boto3.client('sagemaker')

    while True:
        status = client.describe_training_job(TrainingJobName=job_name)
        current_status = status['TrainingJobStatus']

        if current_status == 'Completed':
            print('Training job completed successfully')
            break
        elif current_status in ['Failed', 'Stopped']:
            print(f'Training job {current_status}')
            break

        # Get the metrics reported so far and sync them
        metrics = status.get('FinalMetricDataList', [])

        # Log metrics to both platforms
        for metric in metrics:
            neptune_run[f"sagemaker/{metric['MetricName']}"].append(metric['Value'])
            wandb.log({f"sagemaker_{metric['MetricName']}": metric['Value']})

        time.sleep(30)  # throttle the DescribeTrainingJob polling

def main():
    # Set up environment variables
    os.environ['NEPTUNE_PROJECT'] = 'your-neptune-project'
    os.environ['WANDB_PROJECT'] = 'your-wandb-project'

    # Initialize tracking
    experiment_name = 'sagemaker-training-job'
    neptune_run = setup_tracking(
        experiment_name,
        os.getenv('NEPTUNE_PROJECT'),
        os.getenv('WANDB_PROJECT')
    )

    # Create and configure SageMaker estimator
    role = 'your-sagemaker-role-arn'
    estimator = create_sagemaker_estimator(role)

    # Start training job
    estimator.fit(
        wait=False,
        job_name=experiment_name
    )

    # Monitor training and sync metrics
    monitor_training(experiment_name, neptune_run)

    # Close tracking
    neptune_run.stop()
    wandb.finish()

if __name__ == "__main__":
    main()

Production

Deploy an Inference Endpoint with API Gateway

See my article on key considerations for going to production with SageMaker.
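
For the SageMaker path, the endpoint itself is one call on the trained estimator (a hedged sketch; the instance type and endpoint name are illustrative), with API Gateway then fronting it, typically through a small Lambda:

# Hedged sketch: create a real-time endpoint from the trained estimator above
predictor = huggingface_estimator.deploy(
    initial_instance_count=1,
    instance_type='ml.g4dn.xlarge',         # illustrative instance type
    endpoint_name='ml-pipeline-inference',  # illustrative endpoint name
)
print(predictor.endpoint_name)  # front this endpoint with API Gateway + Lambda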

Alternative: Deploying/Serving with vLLM on EKS with KubeAI

KubeAI is a Kubernetes operator for deploying and managing AI models. It provides a simple, scalable way to run vLLM in production.

Use this Tutorial to deploy vLLM to EKS.

3rd Party Extensions: Neo4j, LangChain, OpenAI, VectorDB

Store your Neo4j, LangChain, OpenAI, and VectorDB credentials in secret management.

Centralized YAML wiring the NEO4J, LANGCHAIN, OPENAI, HUGGINGFACE, and QDRANT secrets:

global:
  env:
    - name: NEO4J_URI
      valueFrom:
        secretKeyRef:
          name: neo4j-external-secrets
          key: uri
    - name: NEO4J_USERNAME
      valueFrom:
        secretKeyRef:
          name: neo4j-external-secrets
          key: username
    - name: NEO4J_PASSWORD
      valueFrom:
        secretKeyRef:
          name: neo4j-external-secrets
          key: password
    - name: EMBEDDING_MODEL
      value: "text-embedding-ada-002"
    - name: OPENAI_API_KEY
      valueFrom:
        secretKeyRef:
          name: openai-external-secrets
          key: api-key
    - name: LANGCHAIN_ENDPOINT
      value: "https://api.langchain.com"
    - name: LANGCHAIN_TRACING_V2
      value: "true"
    - name: LANGCHAIN_PROJECT
      value: "default"
    - name: LANGCHAIN_API_KEY
      valueFrom:
        secretKeyRef:
          name: langchain-external-secrets
          key: api-key
    - name: HUGGINGFACE_API_KEY
      valueFrom:
        secretKeyRef:
          name: huggingface-external-secrets
          key: api-key
    - name: HUGGINGFACE_PASSWORD
      valueFrom:
        secretKeyRef:
          name: huggingface-external-secrets
          key: password
    - name: QDRANT_URL
      value: "https://qdrant.qdrant.svc.cluster.local"
    - name: QDRANT_API_KEY
      valueFrom:
        secretKeyRef:
          name: qdrant-external-secrets
          key: api-key

externalSecrets:
  neo4j:
    secretStore:
      name: aws-secretsmanager
      kind: SecretStore
    refreshInterval: 1h
    target:
      name: neo4j-external-secrets
      template:
        type: Opaque
        data:
          uri: "{{ .uri | toString }}"
          username: "{{ .username | toString }}"
          password: "{{ .password | toString }}"
    data:
      - secretKey: uri
        remoteRef:
          key: dev/llm/neo4j
          property: uri
          version: AWSCURRENT
          decodingStrategy: Base64
      - secretKey: username
        remoteRef:
          key: dev/llm/neo4j
          property: username
          version: AWSCURRENT
          decodingStrategy: Base64
      - secretKey: password
        remoteRef:
          key: dev/llm/neo4j
          property: password
          version: AWSCURRENT
          decodingStrategy: Base64
    dataFrom:
      - extract:
          key: dev/llm/neo4j
          version: AWSCURRENT

  openai:
    secretStore:
      name: aws-secretsmanager
      kind: SecretStore
    refreshInterval: 1h
    target:
      name: openai-external-secrets
      template:
        type: Opaque
        data:
          api-key: "{{ .apiKey | toString }}"
    dataFrom:
      - extract:
          key: dev/llm/openai
          version: AWSCURRENT

  langchain:
    secretStore:
      name: aws-secretsmanager
      kind: SecretStore
    refreshInterval: 1h
    target:
      name: langchain-external-secrets
      template:
        type: Opaque
        data:
          api-key: "{{ .apiKey | toString }}"
    dataFrom:
      - extract:
          key: dev/llm/langchain
          version: AWSCURRENT

  huggingface:
    secretStore:
      name: aws-secretsmanager
      kind: SecretStore
    refreshInterval: 1h
    target:
      name: huggingface-external-secrets
      template:
        type: Opaque
        data:
          api-key: "{{ .apiKey | toString }}"
          password: "{{ .password | toString }}"
    dataFrom:
      - extract:
          key: dev/llm/huggingface
          version: AWSCURRENT

  qdrant:
    secretStore:
      name: aws-secretsmanager
      kind: SecretStore
    refreshInterval: 1h
    target:
      name: qdrant-external-secrets
      template:
        type: Opaque
        data:
          api-key: "{{ .apiKey | toString }}"
    dataFrom:
      - extract:
          key: dev/llm/qdrant
          version: AWSCURRENT

secretStore:
  aws:
    name: aws-secretsmanager
    provider:
      aws:
        service: SecretsManager
        region: us-east-1
        auth:
          secretRef:
            accessKeyIDSecretRef:
              name: aws-secret-manager-credentials
              key: access-key-id
            secretAccessKeySecretRef:
              name: aws-secret-manager-credentials
              key: secret-access-key
        role: arn:aws:iam::123456789012:role/external-secrets-operator