AI Agents Development
קורס פרמיום מקיף לבניית סוכני AI מקצועיים בפייתון. מ-Async Python ועד LangGraph ו-FastAPI — כל מה שצריך כדי לפרוס Agents לפרודקשן.
Python לAI Developers
לפני שמתחילים לבנות Agents, חייבים לשלוט בכלים הפייתוניים שעליהם כל מסגרת AI בנויה. מודול זה מכסה את ארבעת הנושאים הקריטיים: תכנות אסינכרוני, עבודה עם HTTP APIs, מודלים עם Pydantic, וניהול סודות. אם כבר יש לך ניסיון ב-Python — הדגש כאן הוא על הדפוסים הספציפיים לעבודה עם LLMs.
Async/Await ו-asyncio בPython
תכנות אסינכרוני הוא אחד ההבדלים הגדולים בין קוד AI בינוני לקוד AI מקצועי. כשסוכן AI שלנו צריך לקרוא ל-5 APIs שונים, לחכות לתשובות מ-LLM ולעדכן מסד נתונים — כל זה בו-זמנית — asyncio הוא הכלי שמאפשר זאת ביעילות.
Sync לעומת Async — ההבדל המעשי
בגישה הסינכרונית, הקוד מחכה לכל בקשת רשת לפני שממשיך לבקשה הבאה. בגישה האסינכרונית, כל הבקשות נשלחות בו-זמנית והתוצאות נאספות יחד. ההבדל בביצועים הוא דרמטי — במיוחד עם LLMs שלהם latency גבוה.
import asyncio
import httpx
from typing import List
# דרך ישנה (סינכרונית) — איטית
def fetch_sync(urls: List[str]):
results = []
with httpx.Client() as client:
for url in urls:
response = client.get(url)
results.append(response.json())
return results
# דרך חדשה (אסינכרונית) — מהירה פי 10
async def fetch_async(urls: List[str]):
async with httpx.AsyncClient() as client:
tasks = [client.get(url) for url in urls]
responses = await asyncio.gather(*tasks)
return [r.json() for r in responses]
# הרצה:
# sync: לוקח ~5 שניות ל-5 URLs
# async: לוקח ~1 שנייה ל-5 URLs
Coroutines ו-Event Loop
כל פונקציה שמוגדרת עם async def היא coroutine. היא לא מורצת מיד אלא מחזירה אובייקט שניתן ל-await. ה-event loop מנהל את ביצוע כל ה-coroutines — הוא קופץ בין משימות כאשר אחת מהן ממתינה לקלט/פלט.
gather לעומת wait — מתי להשתמש בכל אחד
asyncio.gather() מריץ כל ה-coroutines במקביל ומחזיר רשימה מסודרת של תוצאות. אם אחד נכשל — כולם נכשלים כברירת מחדל. asyncio.wait() מאפשר שליטה עדינה יותר — ניתן לאסוף תוצאות לפי סדר השלמה ולהחליט מה לעשות עם כשלונות.
טיפול בשגיאות בקוד אסינכרוני
async def safe_fetch(url: str) -> dict | None:
try:
async with httpx.AsyncClient(timeout=10.0) as client:
response = await client.get(url)
response.raise_for_status()
return response.json()
except httpx.TimeoutException:
print(f"Timeout: {url}")
return None
except httpx.HTTPStatusError as e:
print(f"HTTP {e.response.status_code}: {url}")
return None
# gather עם return_exceptions=True — לא קורס בשגיאה אחת
results = await asyncio.gather(
*[safe_fetch(url) for url in urls],
return_exceptions=True
)
# סנן תוצאות תקינות
valid = [r for r in results if r and not isinstance(r, Exception)]
טיפ מהנסיון: תמיד הגדר timeout ל-AsyncClient כשעובדים עם LLMs. בקשות לOpenAI יכולות לקחת עד 120 שניות לתגובות ארוכות — בלי timeout, הקוד שלך ייתקע לנצח בלי שגיאה ברורה.
HTTP Clients לAI APIs
ספריות ה-SDK של OpenAI ו-Anthropic נוחות, אבל להבין כיצד לתקשר עם LLMs ישירות דרך HTTP — כולל streaming — הוא כישור קריטי. זה מאפשר לך להוסיף retry logic, לשמור logs, לנהל rate limits ולחבר כל ספק שרוצה.
Streaming Response מOpenAI
import httpx
import asyncio
import json
from typing import AsyncGenerator
OPENAI_API_KEY = "sk-..." # מה-.env בפועל
# Streaming response מOpenAI
async def stream_openai(prompt: str) -> AsyncGenerator[str, None]:
async with httpx.AsyncClient() as client:
async with client.stream(
"POST",
"https://api.openai.com/v1/chat/completions",
headers={"Authorization": f"Bearer {OPENAI_API_KEY}"},
json={
"model": "gpt-4o",
"messages": [{"role": "user", "content": prompt}],
"stream": True
},
timeout=60
) as response:
async for line in response.aiter_lines():
if line.startswith("data: ") and line != "data: [DONE]":
chunk = json.loads(line[6:])
if content := chunk["choices"][0]["delta"].get("content"):
yield content
# שימוש:
async def main():
async for token in stream_openai("ספר לי סיפור קצר"):
print(token, end="", flush=True)
Retry Logic אוטומטי
שגיאות rate-limit (429) ושגיאות רשת זמניות הן חלק מהיומיום של עבודה עם LLMs. הפתרון הנכון הוא Exponential Backoff — להמתין עוד ועוד זמן בין כל ניסיון חוזר.
import asyncio
import httpx
async def robust_llm_call(payload: dict, max_retries: int = 3) -> dict:
delay = 1.0
for attempt in range(max_retries):
try:
async with httpx.AsyncClient(timeout=30) as client:
resp = await client.post(
"https://api.openai.com/v1/chat/completions",
headers={"Authorization": f"Bearer {OPENAI_API_KEY}"},
json=payload
)
if resp.status_code == 429: # Rate limited
retry_after = float(resp.headers.get("retry-after", delay))
await asyncio.sleep(retry_after)
delay *= 2
continue
resp.raise_for_status()
return resp.json()
except httpx.ConnectError:
if attempt == max_retries - 1:
raise
await asyncio.sleep(delay)
delay *= 2
raise RuntimeError("כל הניסיונות כשלו")
שימוש ב-SDK הרשמי: בפרויקטים אמיתיים, כדאי להשתמש ב-openai הרשמי — הוא כולל retry logic מובנה. הקוד הנ"ל חשוב להבנה של מה קורה מאחורי הקלעים.
Pydantic Models לAI Applications
Pydantic היא אחת הספריות החשובות ביותר לפיתוח AI. היא מאפשרת לנו להגדיר את מבנה הנתונים של הסוכן — הודעות, תצורות, תגובות — ולוודא שהכל תקין בזמן ריצה. LangChain, FastAPI ו-OpenAI SDK כולם נבנו על Pydantic.
from pydantic import BaseModel, Field, validator
from typing import Optional, List
from enum import Enum
from datetime import datetime
class MessageRole(str, Enum):
SYSTEM = "system"
USER = "user"
ASSISTANT = "assistant"
class Message(BaseModel):
role: MessageRole
content: str
timestamp: datetime = Field(default_factory=datetime.now)
@validator('content')
def content_not_empty(cls, v):
if not v.strip():
raise ValueError('Content cannot be empty')
return v.strip()
class ConversationConfig(BaseModel):
model: str = "gpt-4o"
temperature: float = Field(0.7, ge=0.0, le=2.0)
max_tokens: int = Field(1024, gt=0, le=128000)
system_prompt: Optional[str] = None
memory_window: int = Field(10, ge=1, le=100)
class AgentResponse(BaseModel):
content: str
tool_calls: List[dict] = []
tokens_used: int
cost_usd: float
duration_ms: int
Structured Output — LLMs שמחזירים JSON
אחד השימושים החזקים ביותר של Pydantic עם LLMs הוא לאלץ את המודל להחזיר JSON שתואם מבנה מוגדר מראש. OpenAI תומך בזה דרך response_format.
from pydantic import BaseModel
from openai import AsyncOpenAI
class TaskBreakdown(BaseModel):
title: str
subtasks: list[str]
estimated_hours: float
complexity: str # "low" | "medium" | "high"
tools_needed: list[str]
client = AsyncOpenAI()
async def analyze_task(task_description: str) -> TaskBreakdown:
response = await client.beta.chat.completions.parse(
model="gpt-4o-mini",
messages=[
{"role": "system", "content": "Analyze tasks and break them into subtasks."},
{"role": "user", "content": task_description}
],
response_format=TaskBreakdown
)
return response.choices[0].message.parsed
# שימוש:
# result = await analyze_task("בנה chatbot עם זיכרון לאתר")
# result.subtasks -> ["הגדר Pydantic models", "בנה FastAPI endpoint", ...]
Pydantic בודק טיפוסי נתונים ומגבלות בעת יצירת אובייקט — לא בזמן ריצה לאחר מכן.
כל מודל Pydantic יכול לייצר JSON Schema שניתן לשלוח ישירות לOpenAI כ-tool definition.
IDE מבין את סוגי השדות ומציע autocomplete — פחות bugs, יותר מהירות פיתוח.
Environment Variables ו-Secrets Management
ניהול סודות הוא אחת הטעויות הנפוצות ביותר בקרב מפתחים חדשים. מפתח API שנחשף ב-GitHub יכול לגרום לחיובים של אלפי דולרים. הדפוס הנכון הוא שימוש ב-pydantic-settings עם קבצי .env.
from pydantic_settings import BaseSettings
from functools import lru_cache
class Settings(BaseSettings):
# AI APIs
openai_api_key: str
anthropic_api_key: str = ""
# Database
database_url: str = "sqlite:///./app.db"
redis_url: str = "redis://localhost:6379"
# App
debug: bool = False
secret_key: str
allowed_origins: list[str] = ["http://localhost:3000"]
class Config:
env_file = ".env"
env_file_encoding = "utf-8"
@lru_cache() # Singleton pattern — יוצר פעם אחת
def get_settings() -> Settings:
return Settings()
# קובץ .env לדוגמה:
# OPENAI_API_KEY=sk-...
# SECRET_KEY=your-secret-key-here
# DEBUG=false
חוקי הזהב לניהול סודות
תמיד הוסף .env ל-.gitignore לפני הכל — לפני git init.
צור קובץ .env.example עם מפתחות ריקים — כך אחרים יודעים אילו משתנים נדרשים.
בפרודקשן — השתמש ב-secrets manager (AWS Secrets Manager, Doppler, HashiCorp Vault) ולא ב-.env.
הגדר Spending Limits ב-OpenAI dashboard — תמיד, ללא יוצא מן הכלל.
Async Web Scraper עם AI Summary
הפרויקט הראשון מאחד את כל מה שלמדנו: scraping אסינכרוני, קריאה לOpenAI, ו-Pydantic structured output. התוצאה — כלי שסורק מספר URLs בו-זמנית ומחזיר סיכומים מובנים בעברית.
import asyncio
import httpx
from bs4 import BeautifulSoup
from openai import AsyncOpenAI
from pydantic import BaseModel
client = AsyncOpenAI()
class ArticleSummary(BaseModel):
title: str
url: str
summary: str
key_points: list[str]
category: str
async def scrape_url(url: str) -> str:
async with httpx.AsyncClient() as http:
response = await http.get(url, timeout=10,
headers={"User-Agent": "Mozilla/5.0"})
soup = BeautifulSoup(response.text, 'html.parser')
# שלוף טקסט ראשי
for tag in soup(['script', 'style', 'nav', 'footer']):
tag.decompose()
return soup.get_text(separator='\n', strip=True)[:3000]
async def summarize_article(url: str, text: str) -> ArticleSummary:
response = await client.beta.chat.completions.parse(
model="gpt-4o-mini",
messages=[
{"role": "system", "content": "Summarize articles in Hebrew. Extract key points."},
{"role": "user", "content": f"URL: {url}\n\nContent:\n{text}"}
],
response_format=ArticleSummary
)
result = response.choices[0].message.parsed
result.url = url
return result
async def main():
urls = [
"https://techcrunch.com/latest/",
"https://www.theverge.com/ai-artificial-intelligence",
]
# Scrape במקביל
texts = await asyncio.gather(*[scrape_url(url) for url in urls])
# Summarize במקביל
summaries = await asyncio.gather(*[
summarize_article(url, text)
for url, text in zip(urls, texts)
])
for summary in summaries:
print(f"כותרת: {summary.title}")
print(f"סיכום: {summary.summary}")
print(f"נקודות מפתח: {', '.join(summary.key_points[:3])}")
print()
asyncio.run(main())
- checkasyncio.gather לביצוע מקבילי
- checkBeautifulSoup לניקוי HTML
- checkStructured Output עם Pydantic
- checkניהול תקציב טוקנים
- arrow_backהוסף שמירה לקובץ JSON
- arrow_backהוסף rate limiting
- arrow_backשלח תוצאות ב-Telegram
- arrow_backתזמן עם APScheduler
LangChain Deep Dive
LangChain היא הפריימוורק המרכזי לבניית אפליקציות LLM. במודול זה נעבור מעבר לדוגמאות בסיסיות ונבנה pipelines מקצועיים באמצעות LCEL, נסקור כל סוגי הזיכרון, ונבנה RAG pipeline מלא מהטמעת מסמכים ועד לצ'אטבוט מוכן לשימוש.
LCEL — LangChain Expression Language
LCEL הוא ה-API המודרני של LangChain לבניית chains. במקום לבנות אובייקטים מסורבלים, משתמשים באופרטור ה-pipe (|) לחיבור רכיבים — בדיוק כמו ב-Unix shell. כל chain ב-LCEL היא גם אסינכרונית, גם parallelizable וגם streamable מחוץ לקופסה.
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser, JsonOutputParser
from langchain_core.runnables import RunnablePassthrough, RunnableLambda
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
# Chain פשוטה — LCEL syntax
prompt = ChatPromptTemplate.from_messages([
("system", "אתה עוזר AI מקצועי שעונה בעברית."),
("human", "{question}")
])
chain = prompt | llm | StrOutputParser()
# הרצה:
result = chain.invoke({"question": "מה ההבדל בין AI לML?"})
# Chain מורכבת עם parallel branches
from langchain_core.runnables import RunnableParallel
map_chain = RunnableParallel(
summary=prompt | llm | StrOutputParser(),
sentiment=ChatPromptTemplate.from_template(
"Rate the sentiment of: {question}. Return JSON: {{score: 1-10}}"
) | llm | JsonOutputParser()
)
result = map_chain.invoke({"question": "המוצר שלכם נהדר!"})
# → {"summary": "...", "sentiment": {"score": 9}}
Streaming עם LCEL
async def stream_response(question: str):
async for chunk in chain.astream({"question": question}):
print(chunk, end="", flush=True)
# Batch processing — מאות שאלות במקביל
questions = [{"question": q} for q in ["שאלה 1", "שאלה 2", "שאלה 3"]]
results = await chain.abatch(questions, config={"max_concurrency": 5})
LCEL לעומת Legacy Chains: אם אתה רואה קוד עם LLMChain או ConversationChain — זהו קוד ישן. LangChain הצהירה שכל ה-legacy chains יוסרו. תמיד השתמש ב-LCEL.
Memory Types — זיכרון לשיחות ארוכות
ה-context window של LLMs מוגבל ויקר. בחירת מנגנון הזיכרון הנכון היא ההחלטה הארכיטקטורלית החשובה ביותר בבניית chatbot. כל גישה מתאימה למקרה שימוש שונה.
from langchain.memory import (
ConversationBufferMemory,
ConversationSummaryMemory,
ConversationBufferWindowMemory,
ConversationEntityMemory
)
# Buffer Window — זוכר N הודעות אחרונות
memory = ConversationBufferWindowMemory(
k=5, # זוכר 5 זוגות שאלה-תשובה
return_messages=True,
memory_key="chat_history"
)
# Summary Memory — מסכם שיחות ארוכות
summary_memory = ConversationSummaryMemory(
llm=ChatOpenAI(model="gpt-4o-mini"),
return_messages=True
)
# Entity Memory — זוכר עובדות על אנשים ומקומות
entity_memory = ConversationEntityMemory(
llm=ChatOpenAI(model="gpt-4o-mini")
)
# "המשתמש הזכיר שהוא גר בתל אביב ועובד בסטארטאפ"
לשיחות קצרות-בינוניות. פשוט ומהיר. כשלון: מאבד הקשר ישן לחלוטין. עלות: נמוכה וקבועה.
לשיחות ארוכות מאוד. שומר על הקשר כללי. עלות נוספת לסיכום — מחיר שווה עבור UX טוב יותר.
RAG Pipeline — Retrieval Augmented Generation
RAG הוא הארכיטקטורה שמאפשרת ל-LLM לענות על שאלות מתוך מסמכים שלא היו בנתוני האימון שלו. במקום לאמן מחדש את המודל (יקר מאוד), אנו ממירים את המסמכים ל-embeddings, שומרים אותם ב-vector store, ובזמן שאלה — שולפים את הקטעים הרלוונטיים ומוסרים אותם כהקשר.
from langchain_community.document_loaders import PyPDFLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_openai import OpenAIEmbeddings
from langchain_community.vectorstores import Chroma
from langchain.chains import RetrievalQA
# שלב 1: טעינת PDF
loader = PyPDFLoader("company_docs.pdf")
documents = loader.load()
# שלב 2: חיתוך לחלקים (Chunking)
splitter = RecursiveCharacterTextSplitter(
chunk_size=1000,
chunk_overlap=200,
separators=["\n\n", "\n", ".", " "]
)
chunks = splitter.split_documents(documents)
# שלב 3: יצירת Vector Store
embeddings = OpenAIEmbeddings()
vectorstore = Chroma.from_documents(
chunks,
embeddings,
persist_directory="./chroma_db"
)
# שלב 4: יצירת RAG Chain
qa_chain = RetrievalQA.from_chain_type(
llm=ChatOpenAI(model="gpt-4o"),
retriever=vectorstore.as_retriever(search_kwargs={"k": 4}),
return_source_documents=True
)
result = qa_chain.invoke({"query": "מה מדיניות ההחזרות שלנו?"})
print(result["result"])
Advanced RAG — שיפורי ביצועים
Document Processing — מ-PDF ל-Vector Store
טעינת מסמכים נכונה היא קריטית להצלחת RAG. LangChain תומך ב-50+ סוגי מסמכים. המפתח הוא שמירת metadata על כל chunk — שם הקובץ, מספר עמוד, תאריך — כדי שניתן יהיה להציגם למשתמש יחד עם התשובה.
from langchain_community.document_loaders import (
PyPDFLoader,
WebBaseLoader,
JSONLoader,
CSVLoader,
UnstructuredWordDocumentLoader
)
from langchain_core.documents import Document
# טעינת מסמכים מרובים עם metadata
async def load_documents(file_paths: list[str]) -> list[Document]:
all_docs = []
for path in file_paths:
if path.endswith('.pdf'):
loader = PyPDFLoader(path)
elif path.endswith('.docx'):
loader = UnstructuredWordDocumentLoader(path)
elif path.endswith('.csv'):
loader = CSVLoader(path)
else:
continue
docs = loader.load()
# הוסף metadata לכל מסמך
for doc in docs:
doc.metadata['source_file'] = path
doc.metadata['loaded_at'] = datetime.now().isoformat()
all_docs.extend(docs)
return all_docs
PDF Chatbot עם זיכרון שיחה
פרויקט המסכם את מודול 2: chatbot שמאפשר שאלות על מסמכי PDF, זוכר את היסטוריית השיחה, ומציג את מקורות המידע בכל תשובה. בסיס למוצרי AI רבים — customer support, Q&A על דוקומנטציה, ניתוח חוזים.
from langchain.chains import ConversationalRetrievalChain
from langchain.memory import ConversationBufferWindowMemory
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_community.vectorstores import Chroma
def build_pdf_chatbot(persist_dir: str = "./chroma_db") -> ConversationalRetrievalChain:
embeddings = OpenAIEmbeddings()
vectorstore = Chroma(
persist_directory=persist_dir,
embedding_function=embeddings
)
memory = ConversationBufferWindowMemory(
k=6,
memory_key="chat_history",
return_messages=True,
output_key="answer"
)
chain = ConversationalRetrievalChain.from_llm(
llm=ChatOpenAI(model="gpt-4o", temperature=0),
retriever=vectorstore.as_retriever(search_kwargs={"k": 4}),
memory=memory,
return_source_documents=True,
verbose=True
)
return chain
# שימוש
chatbot = build_pdf_chatbot()
result = chatbot.invoke({"question": "מה תנאי ביטול ההסכם?"})
print(result["answer"])
print("\nמקורות:")
for doc in result["source_documents"]:
print(f" - {doc.metadata.get('source_file')}, עמוד {doc.metadata.get('page', '?')}")
CrewAI — Multi-Agent Systems
CrewAI מאפשרת לנו לבנות צוותים של סוכני AI שעובדים יחד. כל סוכן מקבל תפקיד מוגדר, כלים ספציפיים ומטרות ברורות. הסוכנים מתקשרים ביניהם, מאציליים משימות ובונים על עבודת זה. זהו מודל חזק במיוחד לתהליכי עבודה מורכבים שדורשים התמחות.
הגדרת Agents, Tasks ו-Crew
היסודות של CrewAI: Agent הוא סוכן עם תפקיד, מטרה ורקע. Task היא משימה עם תיאור ותוצאה צפויה. Crew היא הצוות שמנהל את הביצוע.
from crewai import Agent, Task, Crew, Process
from crewai_tools import SerperDevTool, WebsiteSearchTool
# כלים משותפים
search_tool = SerperDevTool()
web_tool = WebsiteSearchTool()
# הגדרת סוכנים
researcher = Agent(
role="חוקר בכיר",
goal="לאסוף מידע מעמיק ועדכני על נושא נתון",
backstory="""אתה חוקר AI מנוסה עם יכולת מצוינת
למצוא מידע רלוונטי ולסנן רעש.""",
tools=[search_tool, web_tool],
llm="gpt-4o",
verbose=True
)
writer = Agent(
role="כותב תוכן מקצועי",
goal="לכתוב תוכן מרתק ומדויק בעברית",
backstory="אתה כותב עם ניסיון בתוכן טכנולוגי ו-AI.",
llm="gpt-4o",
verbose=True
)
# הגדרת משימות
research_task = Task(
description="חקור את המגמות העדכניות ב-{topic}",
expected_output="דוח מחקר מפורט עם 5 מגמות מרכזיות",
agent=researcher
)
write_task = Task(
description="כתוב מאמר בלוג על {topic} בעברית",
expected_output="מאמר של 800 מילים עם כותרות ונקודות מפתח",
agent=writer,
context=[research_task] # מקבל את תוצאת המחקר
)
# הרצת הצוות
crew = Crew(
agents=[researcher, writer],
tasks=[research_task, write_task],
process=Process.sequential,
verbose=True
)
result = crew.kickoff(inputs={"topic": "AI Agents בשוק 2026"})
Custom Tools לCrewAI
כתיבת כלים מותאמים אישית — חיבור ל-APIs פרטיים, מסד נתונים פנימי, קריאת קבצים. כל כלי הוא פונקציה Python פשוטה עם דקורטור.
from crewai_tools import tool
@tool("Database Query Tool")
def query_db(sql: str) -> str:
"""Query internal DB"""
conn = get_db_connection()
result = conn.execute(sql)
return str(result.fetchall())
Hierarchical Process
במקום ביצוע סדרתי — סוכן מנהל (Manager) מחליט בזמן אמת איזה סוכן יטפל בכל שלב. גמיש יותר, מתאים לתהליכים מורכבים.
crew = Crew(
agents=[researcher, writer, editor],
tasks=[...],
process=Process.hierarchical,
manager_llm="gpt-4o"
)
Memory ו-Collaboration
CrewAI תומך ב-Short-term, Long-term ו-Entity memory. סוכנים יכולים לשתף מידע ולבנות על ידע שנצבר בריצות קודמות.
Content Research Crew
צוות 4 סוכנים: חוקר, כותב, עורך SEO, מנהל סושיאל. מקבל נושא ומפיק: מאמר בלוג + 5 פוסטים לטוויטר + LinkedIn post.
LangGraph — Stateful Agent Workflows
LangGraph מאפשר לבנות workflows של Agents כגרף מכוון — nodes ו-edges עם לוגיקת ניתוב מותנית. בניגוד ל-linear chains, LangGraph תומך ב-loops, בניתוב דינמי ובמצב מתמשך. זהו הכלי לבניית Agents מורכבים כמו ReAct, Plan-and-Execute ו-reflection loops.
State Graphs — מבנה הבסיס של LangGraph
כל LangGraph workflow מתחיל בהגדרת ה-State — מבנה נתונים TypedDict שמועבר בין ה-nodes. כל node מקבל את ה-state ומחזיר עדכונים חלקיים אליו.
from langgraph.graph import StateGraph, END
from typing import TypedDict, Annotated
import operator
class AgentState(TypedDict):
messages: Annotated[list, operator.add]
current_task: str
tools_output: str
iterations: int
final_answer: str
# הגדרת nodes
def call_llm(state: AgentState) -> dict:
messages = state["messages"]
response = llm.invoke(messages)
return {"messages": [response], "iterations": state["iterations"] + 1}
def run_tools(state: AgentState) -> dict:
last_msg = state["messages"][-1]
# בצע את הכלים שהמודל ביקש
results = execute_tool_calls(last_msg.tool_calls)
return {"tools_output": results}
def should_continue(state: AgentState) -> str:
last_msg = state["messages"][-1]
if state["iterations"] > 5:
return "end" # מניעת לולאה אינסופית
if hasattr(last_msg, "tool_calls") and last_msg.tool_calls:
return "tools"
return "end"
# בניית הגרף
workflow = StateGraph(AgentState)
workflow.add_node("llm", call_llm)
workflow.add_node("tools", run_tools)
workflow.set_entry_point("llm")
workflow.add_conditional_edges("llm", should_continue, {
"tools": "tools",
"end": END
})
workflow.add_edge("tools", "llm") # לולאה: tools → llm → tools...
app = workflow.compile()
ReAct Agent Pattern
Reason + Act — הפרדיגמה שבה המודל חושב בקול רם, בוחר כלי, ומשלב את התוצאה. מימוש ב-LangGraph עם tool_calls ו-ToolMessage.
Checkpointing ו-Human-in-the-Loop
שמירת מצב ה-agent בין הפעלות עם SqliteSaver. הוספת נקודות עצירה לאישור אנושי לפני פעולות קריטיות.
Multi-Agent עם LangGraph
חיבור מספר graphs כ-sub-graphs — Supervisor שמנהל Worker agents. כל worker הוא graph עצמאי עם state משלו.
Code Review Agent
Agent שמקבל PR, מריץ linters, קורא קוד, מציע שיפורים, ומדגיש security issues. עם human approval לפני comment.
FastAPI Production Deployment
הסוכן שלנו צריך להיות נגיש. FastAPI היא הבחירה הסטנדרטית לחשיפת AI Agents כ-API — היא אסינכרונית מהיסוד, מייצרת תיעוד OpenAPI אוטומטי, ומשתלבת בצורה מושלמת עם ה-async Python שלמדנו. במודול זה נבנה API מלא עם authentication, streaming ו-Docker deployment.
FastAPI עם Streaming SSE
from fastapi import FastAPI, Depends, HTTPException
from fastapi.responses import StreamingResponse
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from pydantic import BaseModel
import asyncio
app = FastAPI(title="AI Agent API", version="1.0.0")
security = HTTPBearer()
class ChatRequest(BaseModel):
message: str
session_id: str = "default"
async def verify_token(
credentials: HTTPAuthorizationCredentials = Depends(security)
) -> str:
token = credentials.credentials
if not is_valid_token(token):
raise HTTPException(status_code=401, detail="Invalid token")
return token
@app.post("/chat/stream")
async def chat_stream(
request: ChatRequest,
token: str = Depends(verify_token)
):
async def generate():
async for chunk in agent.astream({"message": request.message}):
yield f"data: {chunk}\n\n"
yield "data: [DONE]\n\n"
return StreamingResponse(
generate(),
media_type="text/event-stream",
headers={"X-Session-ID": request.session_id}
)
@app.get("/health")
async def health_check():
return {"status": "healthy", "model": "gpt-4o"}
Rate Limiting ו-Auth
slowapi לrate limiting, JWT tokens עם python-jose, Redis לשמירת sessions. הגנה מפני abuse וחיובים בלתי צפויים.
Docker ו-Docker Compose
Dockerfile מותאם ל-Python AI apps, multi-stage builds להקטנת image, Docker Compose עם Redis ו-PostgreSQL.
Deploy לRender / Railway
פריסה ל-Render.com ו-Railway עם CI/CD אוטומטי. Environment variables, custom domain, auto-scaling.
Monitoring ו-Observability
AI בפרודקשן בלי monitoring הוא סיכון. עלויות יכולות לנסוק, איכות התשובות יכולה לרדת, ו-errors יכולים לעבור מבלי שנדע. מודול זה מכסה את הכלים הסטנדרטיים: LangSmith לtracing, Prometheus למטריקות ו-Grafana לדשבורד.
LangSmith Tracing
LangSmith מאפשר לראות בדיוק מה קורה בתוך כל chain וagent — כל קריאה ל-LLM, כל tool invocation, הזמן שלקח וכמה טוקנים נצרכו. הגדרה דורשת שינוי של שלוש שורות בלבד.
import os
# הגדרת LangSmith — 3 שורות בלבד
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_API_KEY"] = "ls__..."
os.environ["LANGCHAIN_PROJECT"] = "ai-agents-prod"
# מכאן ואילך — כל LangChain code מתועד אוטומטית!
# ניתן לצפות ב: https://smith.langchain.com
# הוספת metadata מותאם לכל run
from langchain_core.tracers.context import tracing_v2_enabled
with tracing_v2_enabled(project_name="production", tags=["v2.1"]):
result = chain.invoke({"question": user_question})
Cost Tracking ועלויות
מעקב אחר עלות כל שיחה, אצירת usage לפי user_id, alerts כשעלות יומית חורגת מסף. שימוש ב-tiktoken לחישוב טוקנים לפני שליחה.
import tiktoken
def count_tokens(text: str, model: str = "gpt-4o") -> int:
enc = tiktoken.encoding_for_model(model)
return len(enc.encode(text))
# est. cost before API call
tokens = count_tokens(prompt)
est_cost = tokens * 0.000005 # $5/M tokens
Prometheus + Grafana
חשיפת מטריקות מ-FastAPI, דשבורד Grafana עם latency percentiles (p50, p90, p99), error rates ו-token usage לאורך זמן.
from prometheus_client import Counter, Histogram
llm_calls = Counter("llm_calls_total", "Total LLM calls", ["model"])
llm_latency = Histogram("llm_latency_seconds", "LLM latency")
סיימת את הקורס — מה עכשיו?
עכשיו יש לך את כל הכלים לבנות AI Agents מקצועיים. הצטרף לקהילה, שתף את הפרויקטים שבנית, וקבל פידבק ממפתחים אחרים.