跳转到主要内容

快速示例

import os
import requests
import time

# Read the API key from the environment rather than hard-coding it.
DEVIN_API_KEY = os.getenv("DEVIN_API_KEY")
# Every request in this example sends the same bearer-token header.
auth_headers = {"Authorization": f"Bearer {DEVIN_API_KEY}"}

# Create a new session.
response = requests.post(
    "https://api.devin.ai/v1/sessions",
    headers=auth_headers,
    json={
        "prompt": "Review PR #249",
        "idempotent": True,
    },
)
# Fail fast on an HTTP error (bad key, rate limit, ...) instead of hitting an
# unrelated KeyError on "session_id" below.
response.raise_for_status()

created = response.json()
print("Response:\n" + str(created))
session_id = created["session_id"]

# Monitor the session: poll every 5 seconds until the reported status is
# "blocked" or "stopped".
while True:
    status_response = requests.get(
        f"https://api.devin.ai/v1/sessions/{session_id}",
        headers=auth_headers,
    )
    # An error payload carries no "status_enum"; surface the HTTP error itself.
    status_response.raise_for_status()
    status = status_response.json()
    print("Status:\n" + str(status))
    if status["status_enum"] in ["blocked", "stopped"]:
        break
    time.sleep(5)
import os
import requests

# Read the API key from the environment rather than hard-coding it.
DEVIN_API_KEY = os.getenv("DEVIN_API_KEY")
# Both requests in this example send the same bearer-token header.
auth_headers = {"Authorization": f"Bearer {DEVIN_API_KEY}"}

# Upload the file. It is opened in binary mode and closed as soon as the
# upload request returns.
with open("data.csv", "rb") as f:
    response = requests.post(
        "https://api.devin.ai/v1/attachments",
        headers=auth_headers,
        files={"file": f},
    )
# Without this check a failed upload's error body would be embedded in the
# prompt below as if it were the attachment URL.
response.raise_for_status()
# The response body text is used verbatim as the attachment URL.
file_url = response.text

# Create a session that processes the uploaded file.
response = requests.post(
    "https://api.devin.ai/v1/sessions",
    headers=auth_headers,
    json={
        "prompt": f"Analyze the data in the attached file.\n\nATTACHMENT:\"{file_url}\""
    },
)
# Report an HTTP failure directly rather than printing an error payload.
response.raise_for_status()

print(str(response.json()))
import os
import requests

DEVIN_API_KEY = os.environ.get("DEVIN_API_KEY")

# Send a message to an active session. `session_id` is not defined in this
# snippet and must already be in scope.
message_url = f"https://api.devin.ai/v1/sessions/{session_id}/message"
message_headers = {"Authorization": f"Bearer {DEVIN_API_KEY}"}
message_body = {"message": "Make sure to write units tests when you are done."}

# The response is intentionally not inspected, as in the original example.
requests.post(message_url, headers=message_headers, json=message_body)
以下是一个完整示例,演示如何使用 Devin API 分析 GitHub 仓库。该示例展示了:
  • 身份验证和错误处理
  • 创建和监控会话
  • 处理结构化输出
  • 使用 async/await 实现合理的资源管理
import os
import aiohttp
import asyncio
from typing import List, Dict

# Bearer token for the Devin API, read from the environment (None when unset).
API_KEY = os.environ.get("DEVIN_API_KEY")
# Common prefix of the endpoint URLs built in this example.
API_BASE = "https://api.devin.ai/v1"

async def count_stars(session: aiohttp.ClientSession, repo_url: str) -> int:
    """Ask Devin to count the stars of a GitHub repository.

    Creates a Devin session with a star-counting prompt, then polls that
    session until its ``status_enum`` is ``"blocked"`` or ``"stopped"``.

    Args:
        session: HTTP client session used for every request; no authorization
            header is added here, so the session must already carry it.
        repo_url: Repository URL, interpolated into the prompt.

    Returns:
        The ``star_count`` field of the session's structured output, or 0
        when the structured output is missing, null, or lacks that field.

    Raises:
        aiohttp.ClientResponseError: If the API answers with an HTTP error
            status.
    """
    # Create a new Devin session.
    async with session.post(
        f"{API_BASE}/sessions",
        json={"prompt": f"Count stars for GitHub repository: {repo_url}"},
    ) as response:
        # Surface auth/rate-limit failures instead of a KeyError on the body.
        response.raise_for_status()
        session_data = await response.json()
        session_id = session_data["session_id"]
        print(f"Created session {session_id} for {repo_url}")
        print(f"URL: {session_data['url']}")

    # Poll for the result with exponential backoff: 1, 2, 4, ... capped at 30s.
    backoff = 1
    print("Polling for results...")
    while True:
        async with session.get(
            f"{API_BASE}/sessions/{session_id}"
        ) as response:
            response.raise_for_status()
            response_json = await response.json()
            if response_json["status_enum"] in ["blocked", "stopped"]:
                # The structured output may be absent or null; treat both as
                # "no star count" rather than raising AttributeError.
                structured_output = response_json.get("structured_output") or {}
                return structured_output.get("star_count", 0)
        await asyncio.sleep(backoff)
        # Cap the stored delay itself so the counter cannot grow without bound.
        backoff = min(backoff * 2, 30)

async def main():
    """Print the star count of each sample repository, one after another."""
    # Sample repositories to analyze.
    sample_repos = [
        "https://github.com/openai/openai-python",
        "https://github.com/anthropics/anthropic-sdk-python",
    ]
    auth_headers = {"Authorization": f"Bearer {API_KEY}"}

    # One client session, carrying the auth header, is shared by all requests
    # and closed when the block exits.
    async with aiohttp.ClientSession(headers=auth_headers) as session:
        # Repositories are processed sequentially: each count is awaited
        # before the next one starts.
        for repo_url in sample_repos:
            star_total = await count_stars(session, repo_url)
            print(f"{repo_url}: {star_total} stars")


if __name__ == "__main__":
    asyncio.run(main())

高级示例

支持

需要帮助?

如有关于 API 的疑问或需要报告问题,请发送电子邮件至 [email protected]