Files and Batch API

Manage S3-based files and batch inference jobs with the AWS Bedrock SDK (boto3) through FinOps, with requests routed to any of several providers.

Overview

FinOps supports AWS Bedrock's Files and Batch APIs with cross-provider routing. This means you can use boto3 to manage files and batch jobs across multiple providers including Bedrock, OpenAI, and Gemini.

With the Bedrock SDK:

  • Files are managed through an S3-compatible API
  • Batches are managed through the Bedrock service API
  • Provider routing is done via the x-model-provider header

Anthropic Limitation: Anthropic does not support file upload via the S3-compatible API. For Anthropic batch operations, use the Anthropic SDK with inline requests instead.
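
The limitation above can be checked before any request is sent. The following is a minimal sketch of such a guard, not part of FinOps or boto3: the names `FILE_API_PROVIDERS` and `require_file_support` are hypothetical, and the provider set is taken from the providers this document describes as supporting file uploads.

```python
# Hypothetical helper: not part of FinOps or boto3.
# Providers that this document describes as supporting the S3-compatible Files API.
FILE_API_PROVIDERS = {"bedrock", "openai", "gemini"}


def require_file_support(provider: str) -> str:
    """Return the normalized provider name, or raise if file upload is unsupported."""
    name = provider.strip().lower()
    if name == "anthropic":
        raise ValueError(
            "Anthropic does not support file upload via the S3-compatible API; "
            "use the Anthropic SDK with inline requests instead."
        )
    if name not in FILE_API_PROVIDERS:
        raise ValueError(f"Unknown provider: {provider!r}")
    return name
```

Calling `require_file_support(provider)` before building an S3 client turns a misrouted upload into an immediate, descriptive error on the client side.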


Client Setup

Default Bedrock Client

import boto3

# S3 client for file operations
s3_client = boto3.client(
 "s3",
 region_name="us-west-2",
 endpoint_url="{AI_GATEWAY_URL}/bedrock/files",
)

# Bedrock client for batch operations
bedrock_client = boto3.client(
 "bedrock",
 region_name="us-west-2",
 endpoint_url="{AI_GATEWAY_URL}/bedrock",
)

Cross-Provider Client Setup

To route requests to different providers, add the x-model-provider header using boto3 events:

Bedrock Provider (Default)

import boto3

def add_bedrock_header(request, **kwargs):
 request.headers["x-model-provider"] = "bedrock"

# S3 client for Bedrock files
s3_client = boto3.client(
 "s3",
 region_name="us-west-2",
 endpoint_url="{AI_GATEWAY_URL}/bedrock/files",
)
s3_client.meta.events.register("before-send", add_bedrock_header)

# Bedrock client for batches
bedrock_client = boto3.client(
 "bedrock",
 region_name="us-west-2",
 endpoint_url="{AI_GATEWAY_URL}/bedrock",
)
bedrock_client.meta.events.register("before-send", add_bedrock_header)

OpenAI Provider

import boto3

def add_openai_header(request, **kwargs):
 request.headers["x-model-provider"] = "openai"

# S3 client for OpenAI files
s3_client = boto3.client(
 "s3",
 region_name="us-west-2",
 endpoint_url="{AI_GATEWAY_URL}/bedrock/files",
)
s3_client.meta.events.register("before-send", add_openai_header)

# Bedrock client for OpenAI batches
bedrock_client = boto3.client(
 "bedrock",
 region_name="us-west-2",
 endpoint_url="{AI_GATEWAY_URL}/bedrock",
)
bedrock_client.meta.events.register("before-send", add_openai_header)

Anthropic Provider

Anthropic does not support S3-based file uploads. For Anthropic operations, use the Anthropic SDK with inline batch requests instead.

import boto3

def add_anthropic_header(request, **kwargs):
 request.headers["x-model-provider"] = "anthropic"

# Note: File operations are NOT supported for Anthropic
# Use Anthropic SDK with inline requests instead

# Bedrock client for Anthropic (limited to non-batch operations)
bedrock_client = boto3.client(
 "bedrock",
 region_name="us-west-2",
 endpoint_url="{AI_GATEWAY_URL}/bedrock",
)
bedrock_client.meta.events.register("before-send", add_anthropic_header)

Gemini Provider

import boto3

def add_gemini_header(request, **kwargs):
 request.headers["x-model-provider"] = "gemini"

# S3 client for Gemini files
s3_client = boto3.client(
 "s3",
 region_name="us-west-2",
 endpoint_url="{AI_GATEWAY_URL}/bedrock/files",
)
s3_client.meta.events.register("before-send", add_gemini_header)

# Bedrock client for Gemini batches
bedrock_client = boto3.client(
 "bedrock",
 region_name="us-west-2",
 endpoint_url="{AI_GATEWAY_URL}/bedrock",
)
bedrock_client.meta.events.register("before-send", add_gemini_header)

Helper Function for Provider-Specific Clients

import boto3

def create_provider_header_handler(provider: str):
    """Create a header handler function for a specific provider"""
    def add_provider_header(request, **kwargs):
        request.headers["x-model-provider"] = provider
    return add_provider_header

def get_provider_s3_client(provider: str):
 """Create S3 client with x-model-provider header"""
 client = boto3.client(
 "s3",
 region_name="us-west-2",
 endpoint_url="{AI_GATEWAY_URL}/bedrock/files",
 )
 client.meta.events.register("before-send", create_provider_header_handler(provider))
 return client

def get_provider_bedrock_client(provider: str):
 """Create Bedrock batch client with x-model-provider header"""
 client = boto3.client(
 "bedrock",
 region_name="us-west-2",
 endpoint_url="{AI_GATEWAY_URL}/bedrock",
 )
 client.meta.events.register("before-send", create_provider_header_handler(provider))
 return client
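
To see what the handler factory does without contacting the gateway, the sketch below calls a handler on a stand-in request object. The `fake_request` object is an assumption made for illustration; it only mimics the `headers` mapping of the request that boto3 passes to "before-send" handlers, and the event name string is illustrative.

```python
from types import SimpleNamespace


def create_provider_header_handler(provider: str):
    """Same factory as above: returns a handler bound to one provider."""
    def add_provider_header(request, **kwargs):
        request.headers["x-model-provider"] = provider
    return add_provider_header


# Stand-in for the prepared request handed to "before-send" handlers.
# Only the `headers` mapping matters for this illustration.
fake_request = SimpleNamespace(headers={"Content-Type": "application/json"})

handler = create_provider_header_handler("gemini")
# Handlers are invoked with keyword arguments; extra ones land in **kwargs.
handler(request=fake_request, event_name="before-send.s3.PutObject")

print(fake_request.headers["x-model-provider"])  # prints: gemini
```

Because each handler closes over its own `provider` value, clients created for different providers do not interfere with one another.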

Files API (S3-Compatible)

Files are managed through FinOps's S3-compatible endpoint.

Upload a File

Bedrock Provider

import boto3
import json
import time

def add_bedrock_header(request, **kwargs):
 request.headers["x-model-provider"] = "bedrock"

s3_client = boto3.client(
 "s3",
 region_name="us-west-2",
 endpoint_url="{AI_GATEWAY_URL}/bedrock/files",
)
s3_client.meta.events.register("before-send", add_bedrock_header)

# Create JSONL content for Bedrock batch format
def create_bedrock_batch_jsonl(model_id: str, num_requests: int = 2) -> str:
    lines = []
    for i in range(num_requests):
        record = {
            "recordId": f"request-{i+1}",
            "modelInput": {
                "messages": [
                    {
                        "role": "user",
                        "content": [
                            {"text": f"Hello, this is test message {i+1}. Say hi back briefly."}
                        ],
                    }
                ],
                "inferenceConfig": {"maxTokens": 100},
            },
        }
        lines.append(json.dumps(record))
    return "\n".join(lines)

# Create content
jsonl_content = create_bedrock_batch_jsonl("anthropic.claude-3-sonnet-20240229-v1:0")

# Upload to S3
s3_bucket = "your-s3-bucket"
s3_key = f"finops-batch-input/batch_input_{int(time.time())}.jsonl"

response = s3_client.put_object(
    Bucket=s3_bucket,
    Key=s3_key,
    Body=jsonl_content.encode("utf-8"),
    ContentType="application/jsonl",
)

# Extract file ID from ETag header
file_id = response.get("ETag", "").strip('"')
print(f"Uploaded file ID: {file_id}")
print(f"S3 URI: s3://{s3_bucket}/{s3_key}")

OpenAI Provider

No S3 configuration required. Files are stored in OpenAI's native storage. The bucket/key values are identifiers used by FinOps for routing.

import boto3
import json
import time

def add_openai_header(request, **kwargs):
 request.headers["x-model-provider"] = "openai"

s3_client = boto3.client(
 "s3",
 region_name="us-west-2",
 endpoint_url="{AI_GATEWAY_URL}/bedrock/files",
)
s3_client.meta.events.register("before-send", add_openai_header)

# Create JSONL content for OpenAI batch format
def create_openai_batch_jsonl(model_id: str, num_requests: int = 2) -> str:
    lines = []
    for i in range(num_requests):
        record = {
            "custom_id": f"request-{i+1}",
            "method": "POST",
            "url": "/v1/chat/completions",
            "body": {
                "model": model_id,
                "messages": [
                    {"role": "user", "content": f"Hello, this is test message {i+1}. Say hi back briefly."}
                ],
                "max_tokens": 100,
            },
        }
        lines.append(json.dumps(record))
    return "\n".join(lines)

# Create content
jsonl_content = create_openai_batch_jsonl("gpt-4o-mini")

# Upload file (bucket/key are routing identifiers, not actual S3 paths)
response = s3_client.put_object(
    Bucket="openai-files",
    Key=f"batch_input_{int(time.time())}.jsonl",
    Body=jsonl_content.encode("utf-8"),
    ContentType="application/jsonl",
)

# Extract file ID from ETag header
file_id = response.get("ETag", "").strip('"')
print(f"Uploaded file ID: {file_id}")

Gemini Provider

No S3 configuration required. Files are stored in Google Cloud Storage. The bucket/key values are identifiers used by FinOps for routing.

import boto3
import json
import time

def add_gemini_header(request, **kwargs):
 request.headers["x-model-provider"] = "gemini"

s3_client = boto3.client(
 "s3",
 region_name="us-west-2",
 endpoint_url="{AI_GATEWAY_URL}/bedrock/files",
)
s3_client.meta.events.register("before-send", add_gemini_header)

# Create JSONL content for Gemini batch format
def create_gemini_batch_jsonl(model_id: str, num_requests: int = 2) -> str:
    lines = []
    for i in range(num_requests):
        record = {
            "request": {
                "contents": [
                    {
                        "role": "user",
                        "parts": [
                            {"text": f"Hello, this is test message {i+1}. Say hi back briefly."}
                        ],
                    }
                ],
                "generationConfig": {"maxOutputTokens": 100},
            },
            "metadata": {"key": f"request-{i+1}"},
        }
        lines.append(json.dumps(record))
    return "\n".join(lines)

# Create content
jsonl_content = create_gemini_batch_jsonl("gemini-1.5-flash")

# Upload file (bucket/key are routing identifiers, not actual S3 paths)
response = s3_client.put_object(
    Bucket="gemini-files",
    Key=f"batch_input_{int(time.time())}.jsonl",
    Body=jsonl_content.encode("utf-8"),
    ContentType="application/jsonl",
)

file_id = response.get("ETag", "").strip('"')
print(f"Uploaded file ID: {file_id}")

List Files

For OpenAI and Gemini, use any bucket name as an identifier; files are stored in the provider's native storage and listed by file ID.

import boto3

def add_provider_header(request, **kwargs):
 request.headers["x-model-provider"] = "bedrock"

s3_client = boto3.client(
 "s3",
 region_name="us-west-2",
 endpoint_url="{AI_GATEWAY_URL}/bedrock/files",
)
s3_client.meta.events.register("before-send", add_provider_header)

# List files (S3 bucket required for Bedrock only)
s3_bucket = "your-s3-bucket"
response = s3_client.list_objects_v2(
 Bucket=s3_bucket,
 Prefix="finops-batch-input/"
)

if "Contents" in response:
    for obj in response["Contents"]:
        print(f"Key: {obj['Key']}")
        print(f"Size: {obj['Size']} bytes")
        print(f"Last Modified: {obj['LastModified']}")
        print("---")

Retrieve File Metadata

import boto3

def add_provider_header(request, **kwargs):
 request.headers["x-model-provider"] = "bedrock"

s3_client = boto3.client(
 "s3",
 region_name="us-west-2",
 endpoint_url="{AI_GATEWAY_URL}/bedrock/files",
)
s3_client.meta.events.register("before-send", add_provider_header)

# Retrieve file metadata (HEAD request)
# For OpenAI/Gemini: use any bucket name, file_id from upload
s3_bucket = "your-s3-bucket"
s3_key = "finops-batch-input/batch_input.jsonl"
file_id = "abc123" # ETag from upload

response = s3_client.head_object(
 Bucket=s3_bucket,
 Key=s3_key,
 IfMatch=file_id
)

print(f"Content Length: {response['ContentLength']} bytes")
print(f"Content Type: {response['ContentType']}")
print(f"ETag (File ID): {response['ETag']}")

Delete a File

import boto3

def add_provider_header(request, **kwargs):
 request.headers["x-model-provider"] = "bedrock"

s3_client = boto3.client(
 "s3",
 region_name="us-west-2",
 endpoint_url="{AI_GATEWAY_URL}/bedrock/files",
)
s3_client.meta.events.register("before-send", add_provider_header)

# Delete file
# For OpenAI/Gemini: use any bucket name, file_id from upload
s3_bucket = "your-s3-bucket"
s3_key = "finops-batch-input/batch_input.jsonl"
file_id = "abc123"

s3_client.delete_object(
 Bucket=s3_bucket,
 Key=s3_key,
 IfMatch=file_id
)

print(f"Deleted file: {s3_key}")

Download File Content

File content download is only supported for Bedrock. For OpenAI and Gemini, use their native SDKs to download file content.

import boto3

def add_provider_header(request, **kwargs):
 request.headers["x-model-provider"] = "bedrock"

s3_client = boto3.client(
 "s3",
 region_name="us-west-2",
 endpoint_url="{AI_GATEWAY_URL}/bedrock/files",
)
s3_client.meta.events.register("before-send", add_provider_header)

# Download file content (Bedrock only)
s3_bucket = "your-s3-bucket"
s3_key = "finops-batch-input/batch_input.jsonl"
file_id = "abc123"

response = s3_client.get_object(
 Bucket=s3_bucket,
 Key=s3_key,
 IfMatch=file_id
)

content = response["Body"].read().decode("utf-8")
print(f"File content:\n{content}")

Batch API

The Bedrock Batch API uses create_model_invocation_job and related methods.

Create a Batch Job

Bedrock Provider

import boto3
import time

def add_bedrock_header(request, **kwargs):
 request.headers["x-model-provider"] = "bedrock"

bedrock_client = boto3.client(
 "bedrock",
 region_name="us-west-2",
 endpoint_url="{AI_GATEWAY_URL}/bedrock",
)
bedrock_client.meta.events.register("before-send", add_bedrock_header)

# Configuration
s3_bucket = "your-s3-bucket"
model_id = "anthropic.claude-3-sonnet-20240229-v1:0"

# Input/output URIs (file should already be uploaded)
input_uri = f"s3://{s3_bucket}/finops-batch-input/batch_input.jsonl"
output_uri = f"s3://{s3_bucket}/finops-batch-output/"

# Create batch job
response = bedrock_client.create_model_invocation_job(
 jobName=f"finops-batch-{int(time.time())}",
 modelId=model_id,
 roleArn="arn:aws:iam::123456789012:role/placeholder",  # Assumption: boto3 requires roleArn client-side; FinOps configures the real role server-side, so this value is a placeholder
 inputDataConfig={
 "s3InputDataConfig": {
 "s3Uri": input_uri,
 "s3InputFormat": "JSONL"
 }
 },
 outputDataConfig={
 "s3OutputDataConfig": {
 "s3Uri": output_uri
 }
 },
 tags=[
 {"key": "endpoint", "value": "/v1/chat/completions"},
 {"key": "source", "value": "finops-docs"},
 ],
)

job_arn = response["jobArn"]
print(f"Created batch job: {job_arn}")

OpenAI Provider

No S3 or IAM configuration required. Files are stored in OpenAI's native storage. The S3 URIs are routing identifiers used by FinOps.

import boto3
import time

def add_openai_header(request, **kwargs):
 request.headers["x-model-provider"] = "openai"

bedrock_client = boto3.client(
 "bedrock",
 region_name="us-west-2",
 endpoint_url="{AI_GATEWAY_URL}/bedrock",
)
bedrock_client.meta.events.register("before-send", add_openai_header)

# Configuration (no S3 bucket or IAM role needed)
model_id = "gpt-4o-mini"
file_id = "file-abc123" # File ID from upload step

# Create batch job
response = bedrock_client.create_model_invocation_job(
 jobName=f"openai-batch-{int(time.time())}",
 modelId=model_id,
 roleArn="not-required-for-openai",
 inputDataConfig={
 "s3InputDataConfig": {
 "s3Uri": f"s3://openai-files/{file_id}", # Routing identifier
 "s3InputFormat": "JSONL"
 }
 },
 outputDataConfig={
 "s3OutputDataConfig": {
 "s3Uri": "s3://openai-output/"
 }
 },
 tags=[
 {"key": "endpoint", "value": "/v1/chat/completions"},
 {"key": "file_id", "value": file_id},
 ],
)

job_arn = response["jobArn"]
print(f"Created OpenAI batch job: {job_arn}")

Gemini Provider

No S3 or IAM configuration required. Files are stored in Google Cloud Storage. The S3 URIs are routing identifiers used by FinOps.

import boto3
import time

def add_gemini_header(request, **kwargs):
 request.headers["x-model-provider"] = "gemini"

bedrock_client = boto3.client(
 "bedrock",
 region_name="us-west-2",
 endpoint_url="{AI_GATEWAY_URL}/bedrock",
)
bedrock_client.meta.events.register("before-send", add_gemini_header)

# Configuration (no S3 bucket or IAM role needed)
model_id = "gemini-1.5-flash"
file_id = "file-xyz789" # File ID from upload step

# Create batch job
response = bedrock_client.create_model_invocation_job(
 jobName=f"gemini-batch-{int(time.time())}",
 modelId=model_id,
 roleArn="not-required-for-gemini",
 inputDataConfig={
 "s3InputDataConfig": {
 "s3Uri": f"s3://gemini-files/{file_id}", # Routing identifier
 "s3InputFormat": "JSONL"
 }
 },
 outputDataConfig={
 "s3OutputDataConfig": {
 "s3Uri": "s3://gemini-output/"
 }
 },
)

job_arn = response["jobArn"]
print(f"Created Gemini batch job: {job_arn}")

Anthropic Note: Anthropic prefers inline batch requests rather than file-based batching. When targeting Anthropic from the Bedrock SDK, consider using the Anthropic SDK directly for better compatibility.

List Batch Jobs

import boto3

def add_provider_header(request, **kwargs):
 request.headers["x-model-provider"] = "bedrock" # or "gemini"

bedrock_client = boto3.client(
 "bedrock",
 region_name="us-west-2",
 endpoint_url="{AI_GATEWAY_URL}/bedrock",
)
bedrock_client.meta.events.register("before-send", add_provider_header)

# List batch jobs
response = bedrock_client.list_model_invocation_jobs(maxResults=10)

if "invocationJobSummaries" in response:
    for job in response["invocationJobSummaries"]:
        print(f"Job ARN: {job['jobArn']}")
        print(f"Job Name: {job['jobName']}")
        print(f"Status: {job['status']}")
        print(f"Model ID: {job.get('modelId', 'N/A')}")
        print("---")
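
The listing call above returns at most `maxResults` jobs per request. The sketch below shows one way to walk every page using the `nextToken` field from the Bedrock API. It assumes the FinOps gateway passes `nextToken` through for the selected provider, which this document does not confirm; `iter_batch_jobs` is a hypothetical helper name.

```python
from typing import Any, Dict, Iterator


def iter_batch_jobs(client: Any, page_size: int = 10) -> Iterator[Dict[str, Any]]:
    """Yield job summaries page by page until no nextToken is returned."""
    kwargs: Dict[str, Any] = {"maxResults": page_size}
    while True:
        response = client.list_model_invocation_jobs(**kwargs)
        yield from response.get("invocationJobSummaries", [])
        token = response.get("nextToken")
        if not token:
            break
        # Ask for the page that follows the one just received.
        kwargs["nextToken"] = token
```

With the `bedrock_client` from the example above, `for job in iter_batch_jobs(bedrock_client): ...` would visit each job summary once.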

Retrieve Batch Job Status

import boto3

def add_provider_header(request, **kwargs):
 request.headers["x-model-provider"] = "bedrock"

bedrock_client = boto3.client(
 "bedrock",
 region_name="us-west-2",
 endpoint_url="{AI_GATEWAY_URL}/bedrock",
)
bedrock_client.meta.events.register("before-send", add_provider_header)

# Get batch job status
job_arn = "arn:aws:bedrock:us-west-2:123456789:model-invocation-job/abc123"

response = bedrock_client.get_model_invocation_job(jobIdentifier=job_arn)

print(f"Job ARN: {response['jobArn']}")
print(f"Job Name: {response['jobName']}")
print(f"Status: {response['status']}")
print(f"Model ID: {response['modelId']}")

if response["status"] == "Completed" and "statistics" in response:
    stats = response["statistics"]
    print(f"Total Records: {stats.get('totalRecordCount', 'N/A')}")
    print(f"Successful: {stats.get('successfulRecordCount', 'N/A')}")
    print(f"Failed: {stats.get('failedRecordCount', 'N/A')}")
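
A single status check is often wrapped in a polling loop. The sketch below is one possible shape for such a loop, not a FinOps utility: `wait_for_job` is a hypothetical name, the default terminal states are the three used in this document's examples, and the `sleep` parameter exists only so the loop can be exercised without real delays.

```python
import time
from typing import Any, Callable, Dict, Iterable


def wait_for_job(
    client: Any,
    job_arn: str,
    terminal_states: Iterable[str] = ("Completed", "Failed", "Stopped"),
    max_polls: int = 20,
    interval_seconds: float = 10.0,
    sleep: Callable[[float], None] = time.sleep,
) -> Dict[str, Any]:
    """Poll get_model_invocation_job until a terminal state or the poll limit."""
    terminal = set(terminal_states)
    for _ in range(max_polls):
        response = client.get_model_invocation_job(jobIdentifier=job_arn)
        if response["status"] in terminal:
            return response
        sleep(interval_seconds)
    raise TimeoutError(
        f"Job {job_arn} did not reach a terminal state after {max_polls} polls"
    )
```

Raising `TimeoutError` when the limit is reached keeps a job that is still running from being mistaken for a finished one. Pass a different `terminal_states` value if your provider reports other final statuses.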

Stop a Batch Job

import boto3

def add_provider_header(request, **kwargs):
 request.headers["x-model-provider"] = "bedrock"

bedrock_client = boto3.client(
 "bedrock",
 region_name="us-west-2",
 endpoint_url="{AI_GATEWAY_URL}/bedrock",
)
bedrock_client.meta.events.register("before-send", add_provider_header)

# Stop batch job
job_arn = "arn:aws:bedrock:us-west-2:123456789:model-invocation-job/abc123"

bedrock_client.stop_model_invocation_job(jobIdentifier=job_arn)

print(f"Stopped job: {job_arn}")

End-to-End Batch Workflow

Bedrock Provider

import boto3
import json
import time

# Configuration
region = "us-west-2"
s3_bucket = "your-s3-bucket"
model_id = "anthropic.claude-3-sonnet-20240229-v1:0"
provider = "bedrock"

# Provider header handler
def add_provider_header(request, **kwargs):
 request.headers["x-model-provider"] = provider

# Setup clients
s3_client = boto3.client(
 "s3",
 region_name=region,
 endpoint_url="{AI_GATEWAY_URL}/bedrock/files",
)
s3_client.meta.events.register("before-send", add_provider_header)

bedrock_client = boto3.client(
 "bedrock",
 region_name=region,
 endpoint_url="{AI_GATEWAY_URL}/bedrock",
)
bedrock_client.meta.events.register("before-send", add_provider_header)

# Step 1: Create JSONL content
print("Step 1: Creating batch input file...")

def create_batch_jsonl(num_requests: int) -> str:
    lines = []
    for i in range(num_requests):
        record = {
            "recordId": f"request-{i+1}",
            "modelInput": {
                "messages": [
                    {
                        "role": "user",
                        "content": [{"text": f"What is {i+1} + {i+1}? Answer briefly."}],
                    }
                ],
                "inferenceConfig": {"maxTokens": 100},
            },
        }
        lines.append(json.dumps(record))
    return "\n".join(lines)

jsonl_content = create_batch_jsonl(num_requests=3)

# Step 2: Upload input file to S3
print("Step 2: Uploading input file to S3...")
timestamp = int(time.time())
s3_key = f"finops-batch-input/batch_{timestamp}.jsonl"

upload_response = s3_client.put_object(
 Bucket=s3_bucket,
 Key=s3_key,
 Body=jsonl_content.encode("utf-8"),
 ContentType="application/jsonl",
)
file_id = upload_response.get("ETag", "").strip('"')
input_uri = f"s3://{s3_bucket}/{s3_key}"
print(f" Uploaded: {input_uri}")

# Step 3: Create batch job
print("Step 3: Creating batch job...")
output_uri = f"s3://{s3_bucket}/finops-batch-output/"

job_response = bedrock_client.create_model_invocation_job(
 jobName=f"finops-e2e-{timestamp}",
 modelId=model_id,
 roleArn="arn:aws:iam::123456789012:role/placeholder",  # Assumption: boto3 requires roleArn client-side; FinOps configures the real role server-side, so this value is a placeholder
 inputDataConfig={
 "s3InputDataConfig": {"s3Uri": input_uri, "s3InputFormat": "JSONL"}
 },
 outputDataConfig={
 "s3OutputDataConfig": {"s3Uri": output_uri}
 },
 tags=[
 {"key": "endpoint", "value": "/v1/chat/completions"},
 {"key": "file_id", "value": file_id},
 ],
)
job_arn = job_response["jobArn"]
print(f" Created job: {job_arn}")

# Step 4: Poll for completion
print("Step 4: Polling job status...")
for i in range(20):
    status_response = bedrock_client.get_model_invocation_job(jobIdentifier=job_arn)
    status = status_response["status"]
    print(f" Poll {i+1}: status = {status}")

    if status in ["Completed", "Failed", "Stopped"]:
        print(f" Job reached terminal state: {status}")
        if status == "Completed" and "statistics" in status_response:
            stats = status_response["statistics"]
            print(f" Total: {stats.get('totalRecordCount')}")
            print(f" Successful: {stats.get('successfulRecordCount')}")
            print(f" Failed: {stats.get('failedRecordCount')}")
        break

    time.sleep(10)

# Step 5: Verify job is in list
print("Step 5: Verifying job in list...")
list_response = bedrock_client.list_model_invocation_jobs(maxResults=20)
job_arns = [job["jobArn"] for job in list_response.get("invocationJobSummaries", [])]
assert job_arn in job_arns, f"Job {job_arn} should be in list"
print(f" Verified job is in list")

print(f"\nSuccess! Batch workflow completed for job {job_arn}")

OpenAI Provider

No S3 configuration required. Files are stored in OpenAI's native storage. The bucket/key values are routing identifiers used by FinOps.

import boto3
import json
import time

# Configuration (no S3 bucket needed for OpenAI)
region = "us-west-2"
model_id = "gpt-4o-mini"
provider = "openai"

# Provider header handler
def add_provider_header(request, **kwargs):
 request.headers["x-model-provider"] = provider

# Setup clients
s3_client = boto3.client(
 "s3",
 region_name=region,
 endpoint_url="{AI_GATEWAY_URL}/bedrock/files",
)
s3_client.meta.events.register("before-send", add_provider_header)

bedrock_client = boto3.client(
 "bedrock",
 region_name=region,
 endpoint_url="{AI_GATEWAY_URL}/bedrock",
)
bedrock_client.meta.events.register("before-send", add_provider_header)

# Step 1: Create OpenAI JSONL content
print("Step 1: Creating OpenAI batch input file...")

def create_openai_jsonl(num_requests: int) -> str:
    lines = []
    for i in range(num_requests):
        record = {
            "custom_id": f"request-{i+1}",
            "method": "POST",
            "url": "/v1/chat/completions",
            "body": {
                "model": model_id,
                "messages": [
                    {"role": "user", "content": f"What is {i+1} + {i+1}? Answer briefly."}
                ],
                "max_tokens": 100,
            },
        }
        lines.append(json.dumps(record))
    return "\n".join(lines)

jsonl_content = create_openai_jsonl(num_requests=3)

# Step 2: Upload input file (bucket/key are routing identifiers)
print("Step 2: Uploading input file...")
timestamp = int(time.time())

upload_response = s3_client.put_object(
 Bucket="openai-files", # Routing identifier, not actual S3
 Key=f"batch_{timestamp}.jsonl",
 Body=jsonl_content.encode("utf-8"),
 ContentType="application/jsonl",
)
file_id = upload_response.get("ETag", "").strip('"')
print(f" Uploaded file ID: {file_id}")

# Step 3: Create batch job
print("Step 3: Creating OpenAI batch job...")

job_response = bedrock_client.create_model_invocation_job(
 jobName=f"openai-e2e-{timestamp}",
 modelId=model_id,
 roleArn="not-required-for-openai", # Not used for OpenAI
 inputDataConfig={
 "s3InputDataConfig": {"s3Uri": f"s3://openai-files/{file_id}", "s3InputFormat": "JSONL"}
 },
 outputDataConfig={
 "s3OutputDataConfig": {"s3Uri": "s3://openai-output/"}
 },
 tags=[
 {"key": "endpoint", "value": "/v1/chat/completions"},
 {"key": "file_id", "value": file_id},
 ],
)
job_arn = job_response["jobArn"]
print(f" Created job: {job_arn}")

# Step 4: Poll for completion
print("Step 4: Polling job status...")
for i in range(20):
    status_response = bedrock_client.get_model_invocation_job(jobIdentifier=job_arn)
    status = status_response["status"]
    print(f" Poll {i+1}: status = {status}")

    if status in ["Completed", "Failed", "Stopped"]:
        print(f" Job reached terminal state: {status}")
        break

    time.sleep(10)

print(f"\nSuccess! OpenAI batch workflow completed for job {job_arn}")

Gemini Provider

No S3 configuration required. Files are stored in Google Cloud Storage. The bucket/key values are routing identifiers used by FinOps.

import boto3
import json
import time

# Configuration (no S3 bucket needed for Gemini)
region = "us-west-2"
model_id = "gemini-1.5-flash"
provider = "gemini"

# Provider header handler
def add_provider_header(request, **kwargs):
 request.headers["x-model-provider"] = provider

# Setup clients
s3_client = boto3.client(
 "s3",
 region_name=region,
 endpoint_url="{AI_GATEWAY_URL}/bedrock/files",
)
s3_client.meta.events.register("before-send", add_provider_header)

bedrock_client = boto3.client(
 "bedrock",
 region_name=region,
 endpoint_url="{AI_GATEWAY_URL}/bedrock",
)
bedrock_client.meta.events.register("before-send", add_provider_header)

# Step 1: Create Gemini JSONL content
print("Step 1: Creating Gemini batch input file...")

def create_gemini_jsonl(num_requests: int) -> str:
    lines = []
    for i in range(num_requests):
        record = {
            "request": {
                "contents": [
                    {
                        "role": "user",
                        "parts": [{"text": f"What is {i+1} + {i+1}? Answer briefly."}],
                    }
                ],
                "generationConfig": {"maxOutputTokens": 100},
            },
            "metadata": {"key": f"request-{i+1}"},
        }
        lines.append(json.dumps(record))
    return "\n".join(lines)

jsonl_content = create_gemini_jsonl(num_requests=3)

# Step 2: Upload input file (bucket/key are routing identifiers)
print("Step 2: Uploading input file...")
timestamp = int(time.time())

upload_response = s3_client.put_object(
 Bucket="gemini-files", # Routing identifier, not actual S3
 Key=f"batch_{timestamp}.jsonl",
 Body=jsonl_content.encode("utf-8"),
 ContentType="application/jsonl",
)
file_id = upload_response.get("ETag", "").strip('"')
print(f" Uploaded file ID: {file_id}")

# Step 3: Create batch job
print("Step 3: Creating Gemini batch job...")

job_response = bedrock_client.create_model_invocation_job(
 jobName=f"gemini-e2e-{timestamp}",
 modelId=model_id,
 roleArn="not-required-for-gemini", # Not used for Gemini
 inputDataConfig={
 "s3InputDataConfig": {"s3Uri": f"s3://gemini-files/{file_id}", "s3InputFormat": "JSONL"}
 },
 outputDataConfig={
 "s3OutputDataConfig": {"s3Uri": "s3://gemini-output/"}
 },
)
job_arn = job_response["jobArn"]
print(f" Created job: {job_arn}")

# Step 4: Poll for completion (same as Bedrock)
# ... (same polling logic as above)

print(f"\nSuccess! Gemini batch workflow completed.")

JSONL Format Reference

Bedrock Format

{"recordId": "request-1", "modelInput": {"messages": [{"role": "user", "content": [{"text": "Hello!"}]}], "inferenceConfig": {"maxTokens": 100}}}

OpenAI Format

{"custom_id": "request-1", "method": "POST", "url": "/v1/chat/completions", "body": {"model": "gpt-4o-mini", "messages": [{"role": "user", "content": "Hello!"}], "max_tokens": 100}}

Gemini Format

{"request": {"contents": [{"role": "user", "parts": [{"text": "Hello!"}]}], "generationConfig": {"maxOutputTokens": 100}}, "metadata": {"key": "request-1"}}
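
Because each provider expects a different record shape, it can help to check a file locally before uploading it. The sketch below is a hypothetical pre-flight check, not a FinOps feature. The key sets in `REQUIRED_KEYS` are assumptions read off the three sample records above (Gemini's `metadata` is treated as optional); the providers' own validation may be stricter.

```python
import json
from typing import List

# Assumed top-level keys per provider, taken from the sample records above.
REQUIRED_KEYS = {
    "bedrock": {"recordId", "modelInput"},
    "openai": {"custom_id", "method", "url", "body"},
    "gemini": {"request"},
}


def find_jsonl_problems(jsonl_text: str, provider: str) -> List[str]:
    """Return a list of human-readable problems; an empty list means none found."""
    required = REQUIRED_KEYS[provider]
    problems = []
    for number, line in enumerate(jsonl_text.splitlines(), start=1):
        if not line.strip():
            continue  # ignore blank lines
        try:
            record = json.loads(line)
        except json.JSONDecodeError as exc:
            problems.append(f"line {number}: invalid JSON ({exc.msg})")
            continue
        if not isinstance(record, dict):
            problems.append(f"line {number}: expected a JSON object")
            continue
        missing = sorted(required.difference(record))
        if missing:
            problems.append(f"line {number}: missing keys {missing}")
    return problems
```

For example, `find_jsonl_problems(jsonl_content, "openai")` could be called just before `put_object`, with the upload skipped if the returned list is not empty.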

Provider-Specific Notes

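| Provider  | Header Value | File Storage         | S3 Config Required |
|-----------|--------------|----------------------|--------------------|
| Bedrock   | bedrock      | AWS S3               | ✅ Yes             |
| OpenAI    | openai       | OpenAI storage       | ❌ No              |
| Gemini    | gemini       | Google Cloud Storage | ❌ No              |
| Anthropic | anthropic    | N/A                  | N/A                |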

Bedrock Provider: Requires S3 bucket configuration. The IAM role for batch operations is configured server-side in FinOps. You can use Anthropic models deployed on Bedrock for batch and files APIs.

OpenAI & Gemini Providers: No AWS infrastructure needed. Files are stored in the provider's native storage. The S3 bucket/key values in the examples are routing identifiers used by FinOps.

Anthropic Provider: Does not support S3-based file uploads. Use the Anthropic SDK with inline batch requests instead.


Next Steps