import os
from typing import Any, Tuple, List
from pydantic import BaseModel
from orkestra import Orkestra, LLMProvider
from openai import OpenAI
from pinecone import Pinecone
# Config
# Credentials and the index name come from the process environment; each lookup
# falls back to the literal default shown when the variable is not set.
# NOTE(review): the "YOUR_..." defaults look like placeholders rather than
# usable keys — confirm they are never expected to authenticate.
ORKESTRA_API_KEY = os.environ.get("ORKESTRA_API_KEY", "YOUR_ORKESTRA_API_KEY")
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY", "YOUR_OPENAI_API_KEY")
PINECONE_API_KEY = os.environ.get("PINECONE_API_KEY", "YOUR_PINECONE_API_KEY")
# Name of the Pinecone index queried by retrieve_business_knowledge.
PINECONE_INDEX_NAME = os.environ.get("PINECONE_INDEX_NAME", "business-knowledge-index")
# Clients
# Orkestra client; used below to create the agents and the workflow.
client = Orkestra(api_key=ORKESTRA_API_KEY)
# OpenAI client used by get_embedding to request embeddings.
embed_client = OpenAI(api_key=OPENAI_API_KEY)
# Pinecone client; retrieve_business_knowledge obtains the index handle from it on each call.
pc = Pinecone(api_key=PINECONE_API_KEY)
# Helpers: embeddings + Pinecone search
def get_embedding(text: str, model: str = "text-embedding-3-small") -> List[float]:
    """Return the embedding vector for *text*.

    Newline characters in *text* are replaced with single spaces before the
    request is made through the module-level ``embed_client``.

    Args:
        text: The text to embed.
        model: Embedding model name forwarded to the embeddings API.

    Returns:
        The ``embedding`` attribute of the first item in the response data.
    """
    single_line = text.replace("\n", " ")
    response = embed_client.embeddings.create(input=[single_line], model=model)
    # Exactly one input was sent, so only the first result is read.
    first_item = response.data[0]
    return first_item.embedding
def retrieve_business_knowledge(query: str, top_k: int = 3) -> List[str]:
    """Look up stored text snippets related to *query* in the Pinecone index.

    The query is embedded with ``get_embedding`` and sent to the index named by
    ``PINECONE_INDEX_NAME`` with metadata included in the response.

    Args:
        query: Free-text query to search for.
        top_k: Number of matches requested from the index.

    Returns:
        One entry per match that has non-empty metadata: the metadata's
        ``"text"`` value, or ``""`` when that key is absent.
    """
    index = pc.Index(PINECONE_INDEX_NAME)
    query_vector = get_embedding(query)
    response = index.query(vector=query_vector, top_k=top_k, include_metadata=True)

    texts = []
    for match in response.matches:
        # Skip matches with missing or empty metadata.
        if not getattr(match, "metadata", None):
            continue
        texts.append(match.metadata.get("text", ""))
    return texts
# I/O models
class PreflightOut(BaseModel):
    # Structured output of the Preflight step; passed as response_model when
    # the preflight agent is added to the workflow and consumed by preflight_handler.

    # When true, preflight_handler stops the workflow.
    ambiguous: bool
    # Text forwarded to the next step when the request is not ambiguous.
    refined_requirements: str
    # Message intended for the user; not read anywhere in this file.
    message_to_user: str
class AnalysisReport(BaseModel):
    # Result schema passed as response_model to wf.run() at the end of this file.
    # Field meanings are inferred from their names — TODO confirm with the prompt owner.

    classification: str
    # NOTE(review): scale/range of the score is not defined in this file.
    risk_score: float
    rationale: str
# Agents
def _openai_agent(name: str, description: str, model_name: str):
    """Create an Orkestra agent that uses an OpenAI model and the shared OpenAI key."""
    return client.Agent(
        name=name,
        description=description,
        model_provider=LLMProvider.OPENAI,
        model_name=model_name,
        api_secret=OPENAI_API_KEY,
    )


# First workflow step: triages the incoming request.
preflight = _openai_agent(
    "Preflight",
    "Checks ambiguity and refines user requirements.",
    "gpt-4o",
)

# Middle workflow step; a cheap model is fine here.
# NOTE(review): this step still names an LLM model; whether the model may alter
# the text before retrieval_and_augment_handler receives it depends on Orkestra
# semantics not visible in this file — confirm.
passthrough = _openai_agent(
    "Passthrough",
    "Passes input through to the next step.",
    "gpt-3.5-turbo",
)

# Final workflow step: produces the fraud analysis.
analyzer = _openai_agent(
    "Fraud Analyzer",
    "Classifies and analyzes potential fraud cases.",
    "gpt-4o",
)
# Handler: gate after Preflight — stop on ambiguous input, otherwise forward the refined requirements
def preflight_handler(output: PreflightOut) -> Tuple[str, Any]:
    """Decide whether the workflow proceeds past the Preflight step.

    Args:
        output: The Preflight result.

    Returns:
        ``("CONTINUE", output.refined_requirements)`` when the request is not
        flagged ambiguous, otherwise ``("STOP", None)``.
    """
    if not output.ambiguous:
        return "CONTINUE", output.refined_requirements
    # NOTE(review): output.message_to_user is not propagated on STOP — confirm
    # that dropping it is intended.
    return "STOP", None
# Handler: retrieval and augmentation
def retrieval_and_augment_handler(refined_requirements: str) -> Tuple[str, Any]:
    """Build the analysis prompt from retrieved business knowledge.

    Snippets are fetched with ``retrieve_business_knowledge``; empty ones are
    dropped and the rest are rendered as a ``- `` bulleted list. When nothing
    usable is retrieved, the placeholder ``(no context found)`` is used instead.

    Args:
        refined_requirements: Text used both as the retrieval query and as the
            request quoted in the prompt.

    Returns:
        ``("CONTINUE", prompt)`` where *prompt* embeds the context and the request.
    """
    retrieved = retrieve_business_knowledge(refined_requirements)
    usable_snippets = [snippet for snippet in retrieved if snippet]
    if usable_snippets:
        # The first bullet marker comes from the template; the separator adds the rest.
        context = "\n- ".join(usable_snippets)
    else:
        context = "(no context found)"
    # The literal's lines stay at column 0 so the prompt text is unchanged.
    augmented_prompt = f"""
Based ONLY on the following context, please perform a fraud analysis for the user's request.
Business Knowledge Context:
- {context}
User's Refined Request:
{refined_requirements}
"""
    return "CONTINUE", augmented_prompt
# Build workflow
# Assemble the three-step workflow one step at a time; each add() result is
# rebound to `wf`, which is equivalent to the fluent chain.
wf = client.Workflow()
# Step 1: Preflight, with structured output and the ambiguity gate as its handler.
wf = wf.add(preflight, response_model=PreflightOut, handler=preflight_handler)
# Step 2: Passthrough, with retrieval/prompt augmentation as its handler.
# NOTE(review): exactly when a step's handler runs relative to its agent is
# defined by Orkestra, not by this file — confirm.
wf = wf.add(passthrough, handler=retrieval_and_augment_handler)
# Step 3: the fraud analyzer, with no handler.
wf = wf.add(analyzer)
# Run
# Prompt fed into the workflow. It combines the triage instructions for the
# first step (the listed return fields match PreflightOut) with the user's
# actual requirements on the last line.
user_prompt = """
Task: Triage the user's fraud analysis request.
- ambiguous=true ONLY if essential details are missing or contradictory AND no reasonable defaults would allow progress.
- ambiguous=false if you can proceed by applying reasonable defaults. Include those assumptions in refined_requirements.
Return fields: ambiguous, refined_requirements, message_to_user
User Requirements: Detect suspicious refund patterns for merchants in EU region.
"""
# Run the workflow with AnalysisReport passed as the response model, then print the result.
# NOTE(review): what `result` holds when preflight_handler returns "STOP" is not
# visible in this file — confirm before relying on its type.
result = wf.run(user_prompt, response_model=AnalysisReport)
print(result)