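1. Why Agents Must Run in a Sandbox

In 2024, OpenAI's Code Interpreter executed a piece of user-submitted Python code that tripped an internal security mechanism: the code tried to read /etc/passwd on the host. Without sandbox isolation, the whole service could have been brought down.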
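This was not an isolated case.
The core capability of an AI agent is execution: calling tools, running code, accessing the file system, and making network requests. That capability is a double-edged sword:
Prompt injection: a malicious user crafts prompts that steer the agent into unauthorized actions
Code execution risk: agent-generated code may contain infinite loops, resource exhaustion, or malicious logic
Data leakage: the agent may accidentally expose sensitive environment variables, secrets, or user data
Supply-chain contamination: external tools the agent calls may have been tampered with or contain vulnerabilities
The essence of a sandbox: run untrusted code in a controlled environment and restrict its access to resources, so that even if a security incident occurs, its impact stays strictly contained.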
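As a minimal sketch of "restricting resource access", the snippet below runs untrusted Python in a child process with CPU and memory caps applied through the standard-library `resource` module. The names `run_limited` and `_cap` and the default limits are assumptions made for illustration. It is Unix-only and provides no file-system or network isolation, so it illustrates the idea rather than replacing a real sandbox.

```python
import resource
import subprocess
import sys


def _cap(kind: int, value: int) -> None:
    """Lower one rlimit without exceeding the current hard limit."""
    _, hard = resource.getrlimit(kind)
    limit = value if hard == resource.RLIM_INFINITY else min(value, hard)
    resource.setrlimit(kind, (limit, limit))


def run_limited(code: str, cpu_seconds: int = 2,
                memory_bytes: int = 512 * 1024 * 1024,
                timeout: float = 5.0) -> subprocess.CompletedProcess:
    """Run `code` in a child interpreter with CPU and address-space caps."""

    def apply_limits() -> None:
        # Runs in the child just before exec, so only the child is limited.
        _cap(resource.RLIMIT_CPU, cpu_seconds)
        _cap(resource.RLIMIT_AS, memory_bytes)

    return subprocess.run(
        [sys.executable, "-I", "-c", code],  # -I: isolated mode
        capture_output=True,
        text=True,
        timeout=timeout,       # wall-clock guard on top of the CPU cap
        preexec_fn=apply_limits,
    )
```

Calling `run_limited("print(1 + 1)")` returns normally, while `run_limited("while True: pass", cpu_seconds=1)` comes back with a non-zero return code once the kernel stops the runaway loop.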
2. Three Implementation Paths for Sandbox Architecture

2.1 Container-based Isolation

The most mainstream approach, built around Docker.
```yaml
version: '3.8'
services:
  agent-sandbox:
    image: python:3.11-slim
    read_only: true
    tmpfs:
      - /tmp:noexec,nosuid,size=100m
    security_opt:
      - no-new-privileges:true
    cap_drop:
      - ALL
    cap_add:
      - CHOWN
    network_mode: none
    pids_limit: 50
    mem_limit: 512m
    cpus: 1.0
```
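Pros: mature, rich ecosystem, good resource control
Cons: startup latency (can reach seconds), large images, and a shared kernel that leaves room for container escapes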
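The same hardening options can be passed to a one-off `docker run`, which is handy when an agent launches a container per task. The sketch below only builds the argument list; `docker_run_args` is a hypothetical helper name, and the flag values mirror the Compose settings above.

```python
from typing import List


def docker_run_args(image: str, code: str) -> List[str]:
    """Build a `docker run` command with the same restrictions as the Compose file."""
    return [
        "docker", "run", "--rm",
        "--read-only",                               # read-only root file system
        "--tmpfs", "/tmp:noexec,nosuid,size=100m",   # small scratch space
        "--security-opt", "no-new-privileges:true",
        "--cap-drop", "ALL",
        "--cap-add", "CHOWN",
        "--network", "none",                         # no network access
        "--pids-limit", "50",
        "--memory", "512m",
        "--cpus", "1.0",
        image,
        "python", "-c", code,
    ]
```

The result can be handed to `subprocess.run(args, capture_output=True, text=True, timeout=30)`; keeping the list construction separate makes the policy easy to unit-test without a Docker daemon.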
2.2 Lightweight Virtualization (MicroVM)

Represented by Firecracker and Kata Containers, this approach combines the security of VMs with the speed of containers.
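```python
# Illustrative wrapper API: Firecracker itself is configured through a REST
# API over a Unix socket, so `firecracker.Sandbox` stands in for whatever
# client library you use.
import firecracker

sandbox = firecracker.Sandbox(
    kernel_image="vmlinux-5.10",
    rootfs="agent-sandbox.ext4",
    vcpu_count=2,
    mem_size_mib=512,
    smt=False,                       # disable simultaneous multithreading
    cpu_template="T2",
    mmds_address="169.254.169.254",  # metadata service address
)
sandbox.start()
```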
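Pros: strong isolation, fast startup (milliseconds), well suited to multi-tenancy
Cons: relatively new technology, high operational complexity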
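Under the hood, Firecracker is driven by a handful of REST calls sent over a Unix socket. As a rough sketch, assuming the endpoint and field names of the public Firecracker API (verify them against the release you run), the boot sequence for a microVM like the one above could be described as data. `firecracker_boot_requests` is a hypothetical helper, and sending the requests is left out.

```python
from typing import Any, Dict, List, Tuple

# (HTTP method, path, JSON body)
Request = Tuple[str, str, Dict[str, Any]]


def firecracker_boot_requests(kernel: str, rootfs: str,
                              vcpu_count: int = 2,
                              mem_size_mib: int = 512) -> List[Request]:
    """Return the API calls that configure and start one microVM, in order."""
    return [
        ("PUT", "/machine-config", {
            "vcpu_count": vcpu_count,
            "mem_size_mib": mem_size_mib,
            "smt": False,
        }),
        ("PUT", "/boot-source", {
            "kernel_image_path": kernel,
            "boot_args": "console=ttyS0 reboot=k panic=1 pci=off",
        }),
        ("PUT", "/drives/rootfs", {
            "drive_id": "rootfs",
            "path_on_host": rootfs,
            "is_root_device": True,
            "is_read_only": False,
        }),
        ("PUT", "/actions", {"action_type": "InstanceStart"}),
    ]
```

Expressing the sequence as plain tuples keeps the example independent of any HTTP client; a real controller would send each request to the VM's API socket and check the response before moving on.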
2.3 Process-based Isolation

Isolates the process directly with seccomp, namespaces, and cgroups.
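```python
import subprocess
import tempfile

# nsjail configuration in protobuf text format
NSJAIL_CONFIG = """
mode: ONCE
uidmap { inside_id: "nobody" }
gidmap { inside_id: "nogroup" }

# File-system isolation
mount {
  src: "/tmp/sandbox-root"
  dst: "/"
  is_bind: true
}

# Network isolation: run in a new, empty network namespace
clone_newnet: true

# Resource limits
rlimit_as: 512      # 512 MiB of address space (nsjail reads this value in MiB)
rlimit_cpu: 30      # 30 seconds of CPU time
rlimit_nofile: 32   # 32 file descriptors
rlimit_nproc: 10    # 10 processes

# seccomp filter (Kafel syntax). The allow-list is abbreviated for
# illustration: a real CPython process needs many more syscalls (mmap, brk,
# fstat, ...), and execve stays allowed so nsjail can launch the interpreter.
seccomp_string: "POLICY sandbox {"
seccomp_string: "  ALLOW { execve, open, read, write, close, exit, exit_group }"
seccomp_string: "  KILL { socket, connect }"
seccomp_string: "}"
seccomp_string: "USE sandbox DEFAULT KILL"
"""


def run_in_sandbox(code: str, timeout: int = 30) -> subprocess.CompletedProcess:
    """Run Python code in isolation with nsjail."""
    # Write the config to disk so that nsjail actually reads it
    with tempfile.NamedTemporaryFile("w", suffix=".cfg") as cfg:
        cfg.write(NSJAIL_CONFIG)
        cfg.flush()
        return subprocess.run(
            ["nsjail", "--config", cfg.name, "--",
             "/usr/bin/python3", "-c", code],
            capture_output=True,
            text=True,
            timeout=timeout,
        )
```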
Pros: extremely lightweight, very fast startup (milliseconds), low resource overhead
Cons: weaker isolation than a VM, complex configuration
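3. E2B: A Reference Implementation of Open-Source Agent Sandboxes

E2B is one of the most popular open-source agent sandboxes and is reportedly used by companies such as LangChain and OpenAI.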
3.1 Core Architecture

```
┌─────────────────────────────────────────────────────────┐
│                    E2B Sandbox Cloud                    │
├─────────────────────────────────────────────────────────┤
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐      │
│  │  Sandbox 1  │  │  Sandbox 2  │  │  Sandbox N  │      │
│  │  (Ubuntu)   │  │  (Ubuntu)   │  │  (Custom)   │      │
│  │  • 2 vCPU   │  │  • 4 vCPU   │  │  • GPU      │      │
│  │  • 512MB    │  │  • 2GB      │  │  • 8GB      │      │
│  │  • 5min TTL │  │  • 30min TTL│  │  • Custom   │      │
│  └──────┬──────┘  └──────┬──────┘  └──────┬──────┘      │
│         └────────────────┴────────────────┘             │
│                          │                              │
│                     Firecracker                         │
│                    MicroVM Layer                        │
└─────────────────────────────────────────────────────────┘
```
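E2B is built on Firecracker microVMs. Each sandbox offers:
Startup time: < 1 second
Resource isolation: its own kernel, file system, and network stack
Lifecycle: 5-minute TTL by default, renewable on demand
Snapshot restore: fast cloning from snapshots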
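The TTL-with-renewal lifecycle can be modeled on the client side so that an orchestrator knows when a sandbox is about to expire. The sketch below is a hypothetical bookkeeping class (`SandboxLease` is not part of any SDK); it assumes that renewing restarts the countdown from the current time, and it takes an injectable clock so the behavior can be tested without waiting.

```python
import time
from dataclasses import dataclass, field
from typing import Callable


@dataclass
class SandboxLease:
    """Client-side record of when a sandbox is expected to expire."""
    ttl_seconds: float = 300.0                     # 5-minute default TTL
    clock: Callable[[], float] = time.monotonic    # injectable for tests
    expires_at: float = field(init=False)

    def __post_init__(self) -> None:
        self.expires_at = self.clock() + self.ttl_seconds

    def remaining(self) -> float:
        """Seconds left before expiry, never negative."""
        return max(0.0, self.expires_at - self.clock())

    def expired(self) -> bool:
        return self.clock() >= self.expires_at

    def extend(self, ttl_seconds: float) -> None:
        """Restart the countdown: the sandbox now lives `ttl_seconds` from now."""
        self.expires_at = self.clock() + ttl_seconds
```

An orchestrator could call `extend()` right after it renews the sandbox on the server and check `remaining()` before scheduling long-running steps.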
3.2 Hands-on Usage

```python
import asyncio

from e2b import Sandbox

# Method names (process.start_and_wait, files.write, ...) follow the original
# post; check them against the E2B SDK version you install.

ANALYSIS_SCRIPT = '''
import pandas as pd
import numpy as np

# Only locally generated data is processed; no network access is needed
data = pd.DataFrame({
    'x': np.random.randn(1000),
    'y': np.random.randn(1000)
})
print(f"Summary statistics:\\n{data.describe()}")
'''


async def secure_code_execution():
    """Run code safely inside an E2B sandbox."""
    # Create the sandbox with a 300-second timeout
    sandbox = await Sandbox.create(timeout=300)

    try:
        # Install dependencies
        result = await sandbox.process.start_and_wait(
            cmd="pip install pandas numpy",
            timeout=60,
        )
        print(f"Install output: {result.stdout}")

        # Upload the analysis script
        await sandbox.files.write(
            path="/home/user/analysis.py",
            content=ANALYSIS_SCRIPT,
        )

        # Run the script
        exec_result = await sandbox.process.start_and_wait(
            cmd="python /home/user/analysis.py",
            timeout=30,
        )
        print(f"Execution result:\n{exec_result.stdout}")

        # List the files in the working directory
        files = await sandbox.files.list("/home/user")
        print(f"Files: {[f.name for f in files]}")

        # Extend the sandbox lifetime to 600 seconds
        await sandbox.set_timeout(600)
    finally:
        # Always destroy the sandbox
        await sandbox.kill()


asyncio.run(secure_code_execution())
```
3.3 Custom Templates

```dockerfile
FROM e2bdev/code-interpreter:latest

# Pre-install the dependencies the agent needs (nltk is used below)
RUN pip install \
    langgraph \
    langchain-openai \
    pandas \
    numpy \
    matplotlib \
    seaborn \
    nltk

# Pre-download NLTK data
RUN python -c "import nltk; nltk.download('punkt')"

WORKDIR /home/user

COPY startup.sh /opt/startup.sh
RUN chmod +x /opt/startup.sh

ENTRYPOINT ["/opt/startup.sh"]
```
```shell
e2b template build --name "my-agent-env"
e2b template publish --name "my-agent-env"
```
4. Sandbox Integration in LangGraph

As an agent orchestration framework, LangGraph needs tight integration with the sandbox.
```python
import inspect
import json
import textwrap

from e2b import Sandbox
from langchain_core.tools import tool
from langgraph.graph import MessagesState, StateGraph


class SandboxToolNode:
    """Graph node that executes tools inside an E2B sandbox.

    Written as a plain async callable rather than a ToolNode subclass, so it
    does not depend on ToolNode internals.
    """

    def __init__(self, tools, sandbox_template="base"):
        self.tools_by_name = {t.name: t for t in tools}
        self.sandbox_template = sandbox_template

    async def _execute_in_sandbox(self, tool_func, args):
        """Run one tool function in a fresh, isolated sandbox."""
        sandbox = await Sandbox.create(template=self.sandbox_template)
        try:
            # Ship source code and JSON instead of pickles: code objects cannot
            # be pickled, and unpickling sandbox output on the host would let
            # untrusted code run outside the sandbox.
            source = textwrap.dedent(inspect.getsource(tool_func))
            body = source[source.index("def "):]  # drop the @tool decorator
            script = (
                "import json\n"
                f"{body}\n"
                f"args = json.loads({json.dumps(args)!r})\n"
                f"print(json.dumps({tool_func.__name__}(**args)))\n"
            )
            await sandbox.files.write(
                path="/home/user/run_tool.py",
                content=script,
            )
            result = await sandbox.process.start_and_wait(
                cmd="python /home/user/run_tool.py",
                timeout=60,
            )
            if result.exit_code != 0:
                raise RuntimeError(f"Sandbox execution failed: {result.stderr}")
            return json.loads(result.stdout)
        finally:
            await sandbox.kill()

    async def __call__(self, state: MessagesState):
        """Execute every tool call in the last message inside a sandbox."""
        last_message = state["messages"][-1]
        if not last_message.tool_calls:
            return {"messages": []}

        tool_results = []
        for tool_call in last_message.tool_calls:
            tool = self.tools_by_name.get(tool_call["name"])
            if not tool:
                continue

            # Run the tool inside the sandbox
            result = await self._execute_in_sandbox(
                tool_func=tool.func,
                args=tool_call["args"],
            )
            tool_results.append({
                "role": "tool",
                "content": str(result),
                "tool_call_id": tool_call["id"],
            })

        return {"messages": tool_results}


# Tools that run inside the sandbox
@tool
def analyze_csv(file_path: str) -> str:
    """Analyze a CSV file."""
    import pandas as pd
    df = pd.read_csv(file_path)
    return f"rows: {len(df)}, columns: {len(df.columns)}"


@tool
def execute_sql(query: str) -> str:
    """Run a SQL query."""
    import sqlite3
    conn = sqlite3.connect("/tmp/data.db")
    result = conn.execute(query).fetchall()
    return str(result)


# Build the graph
builder = StateGraph(MessagesState)
builder.add_node("sandbox_tools", SandboxToolNode([analyze_csv, execute_sql]))
builder.add_edge("__start__", "sandbox_tools")
graph = builder.compile()
```
4.2 State Isolation Strategy

```python
import hashlib
from datetime import datetime, timezone
from typing import Annotated, TypedDict

from e2b import Sandbox
from langgraph.graph.message import add_messages


class IsolatedState(TypedDict):
    """State that supports multi-tenant isolation."""
    messages: Annotated[list, add_messages]
    tenant_id: str
    sandbox_id: str
    _secure_context: str


async def create_isolated_sandbox(state: IsolatedState):
    """Create a dedicated sandbox for each tenant."""
    tenant_hash = hashlib.sha256(
        state["tenant_id"].encode()
    ).hexdigest()[:16]

    return await Sandbox.create(
        template=f"tenant-{tenant_hash}",
        metadata={
            "tenant_id": state["tenant_id"],
            "created_at": datetime.now(timezone.utc).isoformat(),
        },
    )
```
5. Production Security Checklist

5.1 Network Isolation

```python
from e2b import Sandbox

# Network policy configuration
NETWORK_POLICIES = {
    # Fully isolated: no network at all
    "isolated": {
        "egress": [],
        "ingress": [],
        "dns": False,
    },
    # Restricted egress: allow-listed endpoints only
    "restricted": {
        "egress": [
            "api.openai.com:443",
            "api.anthropic.com:443",
            "pypi.org:443",
        ],
        "ingress": [],
        "dns": True,
    },
    # Proxy mode: all traffic goes through a proxy
    "proxied": {
        "egress": ["proxy.internal:8080"],
        "ingress": [],
        "dns": True,
        "http_proxy": "http://proxy.internal:8080",
        "https_proxy": "http://proxy.internal:8080",
    },
}


async def create_restricted_sandbox():
    # `network_policy` is this post's illustrative parameter; check how your
    # sandbox provider actually configures egress rules.
    return await Sandbox.create(
        template="base",
        network_policy=NETWORK_POLICIES["restricted"],
    )
```
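A policy table only helps if something enforces it. As a minimal sketch, assuming the `host:port` string format used in the egress lists above, a default-deny check might look like this. `is_egress_allowed` and the sample policy are hypothetical, and real enforcement belongs in the network layer (firewall rules or an egress proxy) rather than in application code.

```python
from typing import Any, Dict

# Sample policy in the same shape as the "restricted" entry above
RESTRICTED_POLICY: Dict[str, Any] = {
    "egress": ["api.openai.com:443", "pypi.org:443"],
    "ingress": [],
    "dns": True,
}


def is_egress_allowed(policy: Dict[str, Any], host: str, port: int) -> bool:
    """Default deny: allow only destinations listed explicitly in the policy."""
    allowed = {entry.lower() for entry in policy.get("egress", [])}
    return f"{host.lower()}:{port}" in allowed
```

Matching on the exact `host:port` pair means that a listed host on a different port, or a subdomain of a listed host, is still rejected.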
5.2 Resource Quotas

```yaml
apiVersion: v1
kind: ResourceQuota
metadata:
  name: agent-sandbox-quota
spec:
  hard:
    requests.cpu: "10"
    requests.memory: 20Gi
    limits.cpu: "20"
    limits.memory: 40Gi
    pods: "50"
---
apiVersion: v1
kind: LimitRange
metadata:
  name: agent-sandbox-limits
spec:
  limits:
    - default:
        cpu: "500m"
        memory: "512Mi"
      defaultRequest:
        cpu: "100m"
        memory: "128Mi"
      max:
        cpu: "2"
        memory: "2Gi"
      min:
        cpu: "50m"
        memory: "64Mi"
      type: Container
```
5.3 Auditing and Monitoring

```python
import hashlib
import json
from datetime import datetime, timezone

import structlog

logger = structlog.get_logger()


class SandboxedExecutionLogger:
    """Audit log for sandboxed executions."""

    async def log_execution(self, sandbox_id: str, tool_name: str,
                            args: dict, result: dict, duration_ms: int):
        """Record every tool execution."""
        logger.info(
            "tool_execution",
            sandbox_id=sandbox_id,
            tool_name=tool_name,
            args_hash=self._hash_args(args),
            result_status=result.get("status"),
            duration_ms=duration_ms,
            timestamp=datetime.now(timezone.utc).isoformat(),
            risk_score=self._calculate_risk(tool_name, args),
        )

    def _hash_args(self, args: dict) -> str:
        """Hash the arguments so that raw values never reach the log."""
        encoded = json.dumps(args, sort_keys=True, default=str).encode()
        return hashlib.sha256(encoded).hexdigest()

    def _calculate_risk(self, tool_name: str, args: dict) -> int:
        """Compute an execution risk score (0-100)."""
        risk = 0

        # High-risk tools
        high_risk_tools = ["execute_sql", "exec_code", "write_file"]
        if tool_name in high_risk_tools:
            risk += 30

        # Sensitive keywords in argument values
        sensitive_patterns = ["password", "secret", "key", "token"]
        for pattern in sensitive_patterns:
            if any(pattern in str(v).lower() for v in args.values()):
                risk += 20

        # Sensitive file paths
        if "path" in args:
            path = str(args["path"])
            if any(p in path for p in ["/etc", "/root", "/var", ".."]):
                risk += 40

        return min(risk, 100)
```
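5.4 Secret Management

```python
import os
from datetime import datetime, timezone

from azure.identity import DefaultAzureCredential
from azure.keyvault.secrets import SecretClient


class SecureSecretProvider:
    """Secure secret provider: secrets are never handed to the sandbox."""

    def __init__(self):
        self.credential = DefaultAzureCredential()
        self.client = SecretClient(
            vault_url=os.environ["KEY_VAULT_URL"],
            credential=self.credential,
        )

    async def get_secret_for_sandbox(self, sandbox_id: str, secret_name: str):
        """
        Fetch the secret, but never use it inside the sandbox directly;
        requests are forwarded through a proxy service instead.
        """
        # The raw secret stays on the host, for the proxy service to use
        secret = self.client.get_secret(secret_name)

        # Issue a short-lived token scoped to specific APIs
        token = await self._create_scoped_token(
            sandbox_id=sandbox_id,
            allowed_apis=["openai/chat/completions"],
            expires_in=300,  # 5 minutes
        )

        return {
            "type": "scoped_token",
            "token": token,
            "expires_at": datetime.now(timezone.utc).timestamp() + 300,
        }

    async def _create_scoped_token(self, sandbox_id: str,
                                   allowed_apis: list, expires_in: int) -> str:
        """Placeholder: issue the token through your own proxy service."""
        raise NotImplementedError
```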
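The scoped-token idea can be illustrated with nothing but the standard library. The sketch below is one possible design, not the author's implementation: the token carries the sandbox ID, the allowed APIs, and an expiry, all signed with an HMAC key that only the proxy holds. The function names and the token layout are assumptions made for illustration.

```python
import base64
import hashlib
import hmac
import json
import time
from typing import List, Optional


def issue_scoped_token(signing_key: bytes, sandbox_id: str,
                       allowed_apis: List[str], expires_in: int = 300,
                       now: Optional[float] = None) -> str:
    """Create a signed token bound to one sandbox, a set of APIs, and an expiry."""
    issued = time.time() if now is None else now
    claims = {"sandbox_id": sandbox_id, "allowed_apis": allowed_apis,
              "exp": issued + expires_in}
    payload = base64.urlsafe_b64encode(json.dumps(claims, sort_keys=True).encode())
    signature = hmac.new(signing_key, payload, hashlib.sha256).hexdigest()
    return f"{payload.decode()}.{signature}"


def verify_scoped_token(signing_key: bytes, token: str, sandbox_id: str,
                        api: str, now: Optional[float] = None) -> bool:
    """Check signature, sandbox binding, API scope, and expiry (proxy side)."""
    payload, sep, signature = token.rpartition(".")
    if not sep:
        return False
    expected = hmac.new(signing_key, payload.encode(), hashlib.sha256).hexdigest()
    # Constant-time comparison; claims are parsed only after the signature checks out
    if not hmac.compare_digest(signature.encode(), expected.encode()):
        return False
    claims = json.loads(base64.urlsafe_b64decode(payload))
    current = time.time() if now is None else now
    return (claims["sandbox_id"] == sandbox_id
            and api in claims["allowed_apis"]
            and current < claims["exp"])
```

With this shape, a leaked token is only useful from the matching sandbox, for the listed APIs, and until it expires, while the raw API key never leaves the proxy.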
6. Performance Optimization: Sandboxes Are Not Free

Sandbox isolation has a cost, so we need to balance security against performance.
6.1 Connection Pooling

```python
import asyncio
from contextlib import asynccontextmanager

from e2b import Sandbox


class SandboxPool:
    """Pool of pre-warmed E2B sandboxes."""

    def __init__(self, template: str, min_size: int = 5, max_size: int = 20):
        self.template = template
        self.min_size = min_size
        self.max_size = max_size
        self._pool = asyncio.Queue()
        self._size = 0
        self._lock = asyncio.Lock()

    async def initialize(self):
        """Pre-warm the pool."""
        for _ in range(self.min_size):
            sandbox = await Sandbox.create(template=self.template)
            await self._pool.put(sandbox)
            self._size += 1

    @asynccontextmanager
    async def acquire(self, timeout: int = 30):
        """Borrow a sandbox (context manager)."""
        sandbox = None
        try:
            sandbox = await asyncio.wait_for(
                self._pool.get(),
                timeout=timeout,
            )
            yield sandbox
        finally:
            if sandbox:
                # Return whatever _reset_sandbox hands back: the original
                # sandbox, or a replacement if it failed the health check.
                sandbox = await self._reset_sandbox(sandbox)
                await self._pool.put(sandbox)

    async def _reset_sandbox(self, sandbox: Sandbox) -> Sandbox:
        """Quickly reset sandbox state; replace the sandbox if it is unhealthy."""
        # Clean up temporary files. (`env -i` without a command only prints
        # variables, so there is no environment to reset here.)
        await sandbox.process.start_and_wait("rm -rf /tmp/* /home/user/*")

        # Health check
        result = await sandbox.process.start_and_wait("echo 'ping'")
        if result.exit_code != 0:
            # Unhealthy: destroy it and create a replacement
            await sandbox.kill()
            return await Sandbox.create(template=self.template)
        return sandbox


async def main():
    pool = SandboxPool(template="my-agent-env", min_size=10)
    await pool.initialize()

    async with pool.acquire() as sandbox:
        result = await sandbox.process.start_and_wait("python script.py")
        print(result.stdout)


asyncio.run(main())
```
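6.2 Cold-Start Optimization

```python
from e2b import Sandbox


async def create_from_snapshot(base_sandbox_id: str):
    """Create a new sandbox from a snapshot (about 10x faster than a cold start)."""
    # The snapshot calls follow the original post; verify them against the
    # SDK version you use.

    # Connect to the pre-warmed base sandbox
    base_sandbox = await Sandbox.connect(base_sandbox_id)

    # Take a snapshot
    snapshot = await base_sandbox.create_snapshot()

    # Restore a new sandbox from the snapshot
    new_sandbox = await Sandbox.create(
        template="base",
        snapshot_id=snapshot.id,
    )
    return new_sandbox
```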
6.3 Tiered Execution Strategy

```python
import re
from typing import Optional


class TieredExecutionEngine:
    """
    Tiered execution engine: pick the isolation level from the risk level.

    - L0: no isolation (pure functions, known-safe code)
    - L1: process-level isolation (seccomp)
    - L2: container isolation (Docker)
    - L3: VM isolation (Firecracker/E2B)
    """

    async def execute(self, code: str, risk_level: Optional[int] = None):
        if risk_level is None:
            risk_level = self._assess_risk(code)

        # The _execute_* backends are left to the implementation.
        if risk_level == 0:
            return await self._execute_native(code)
        elif risk_level == 1:
            return await self._execute_seccomp(code)
        elif risk_level == 2:
            return await self._execute_docker(code)
        else:
            return await self._execute_e2b(code)

    def _assess_risk(self, code: str) -> int:
        """Static risk analysis of the code."""
        risk = 0

        # (pattern, severity). The patterns top out at severity 2, so L3 is
        # reached only when the caller passes risk_level explicitly.
        dangerous_patterns = [
            (r'import\s+os', 1),
            (r'import\s+subprocess', 2),
            (r'open\s*\(', 1),
            (r'__import__', 2),
            (r'eval\s*\(', 2),
            (r'exec\s*\(', 2),
            (r'socket\.', 2),
            (r'requests\.', 1),
        ]

        for pattern, severity in dangerous_patterns:
            if re.search(pattern, code):
                risk = max(risk, severity)

        return risk
```
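Regex matching is easy to fool: it fires on text inside string literals and misses forms such as `from subprocess import run`. As an alternative sketch (a swapped-in technique, not the post's method), the same severity table can be applied to the parsed syntax tree with the standard-library `ast` module. The names below are hypothetical. Static analysis of this kind remains a heuristic, so untrusted code should still default to the strongest tier when in doubt.

```python
import ast

# Severities mirror the regex table above
RISKY_MODULES = {"os": 1, "requests": 1, "subprocess": 2, "socket": 2}
RISKY_CALLS = {"open": 1, "eval": 2, "exec": 2, "__import__": 2}
UNPARSEABLE = 3  # code we cannot analyze goes to the strongest isolation tier


def assess_risk_ast(code: str) -> int:
    """Return the highest severity found among imports and direct calls."""
    try:
        tree = ast.parse(code)
    except (SyntaxError, ValueError):
        return UNPARSEABLE

    risk = 0
    for node in ast.walk(tree):
        if isinstance(node, ast.Import):
            modules = [alias.name for alias in node.names]
        elif isinstance(node, ast.ImportFrom):
            modules = [node.module or ""]
        else:
            modules = []
        for module in modules:
            # Use the top-level package: "os.path" counts as "os"
            risk = max(risk, RISKY_MODULES.get(module.split(".")[0], 0))

        # Direct calls by name, e.g. eval(...) or open(...)
        if isinstance(node, ast.Call) and isinstance(node.func, ast.Name):
            risk = max(risk, RISKY_CALLS.get(node.func.id, 0))
    return risk
```

Because it works on the tree, a string such as `'import subprocess'` is not flagged, while aliased or `from` imports are; indirect access such as `getattr(__builtins__, "eval")` still slips through.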
7. Full Walkthrough: Building an Enterprise-Grade Agent Platform

```python
import asyncio
import uuid

import aiosqlite
import structlog
from e2b import Sandbox
from langgraph.checkpoint.sqlite.aio import AsyncSqliteSaver
from langgraph.graph import END, MessagesState, StateGraph

# SandboxPool, SandboxedExecutionLogger, SandboxToolNode, and the tools come
# from the earlier sections. ALL_TOOLS is an assumed registry; extend it with
# your own tools (for example send_email).
ALL_TOOLS = [analyze_csv, execute_sql]

logger = structlog.get_logger()


class EnterpriseAgentPlatform:
    """
    Enterprise-grade agent platform
    - Multi-tenant isolation
    - Audit logging
    - Resource quotas
    - Self-healing
    """

    def __init__(self):
        self.sandbox_pool = SandboxPool(template="enterprise", min_size=20)
        self.checkpoint_saver = None  # created in initialize()
        self.execution_logger = SandboxedExecutionLogger()

    async def initialize(self):
        """Initialize the platform."""
        await self.sandbox_pool.initialize()
        # graph.ainvoke needs an async checkpointer backed by aiosqlite
        conn = await aiosqlite.connect("/data/checkpoints.db")
        self.checkpoint_saver = AsyncSqliteSaver(conn)
        logger.info("platform_initialized")

    async def execute_agent_task(
        self,
        tenant_id: str,
        agent_config: dict,
        user_input: str,
        timeout: int = 300,
    ):
        """
        Execute an agent task.

        Args:
            tenant_id: tenant ID (multi-tenant isolation)
            agent_config: agent configuration
            user_input: user input
            timeout: maximum execution time in seconds
        """
        loop = asyncio.get_running_loop()
        start_time = loop.time()

        try:
            # Borrow a sandbox from the pool
            async with self.sandbox_pool.acquire(timeout=10) as sandbox:
                # Configure the tenant context (helper left to implement)
                await self._configure_tenant_context(sandbox, tenant_id)

                # Build the LangGraph
                graph = self._build_sandboxed_graph(
                    sandbox=sandbox,
                    agent_config=agent_config,
                    allowed_tools=agent_config.get("tools", []),
                )

                config = {
                    "configurable": {
                        "thread_id": f"{tenant_id}-{uuid.uuid4()}",
                        "checkpoint_ns": tenant_id,
                    }
                }

                # Run with an overall timeout
                result = await asyncio.wait_for(
                    graph.ainvoke(
                        {"messages": [{"role": "user", "content": user_input}]},
                        config=config,
                    ),
                    timeout=timeout,
                )

                # Write the audit log
                duration_ms = int((loop.time() - start_time) * 1000)
                await self.execution_logger.log_execution(
                    sandbox_id=sandbox.id,
                    tool_name="agent_task",
                    args={"tenant_id": tenant_id, "input": user_input[:100]},
                    result={"status": "success"},
                    duration_ms=duration_ms,
                )

                return {
                    "status": "success",
                    "output": result["messages"][-1].content,
                    "sandbox_id": sandbox.id,
                    "duration_ms": duration_ms,
                }

        except asyncio.TimeoutError:
            logger.error("task_timeout", tenant_id=tenant_id, timeout=timeout)
            return {"status": "timeout", "error": "Task execution timed out"}
        except Exception as e:
            logger.error("task_failed", tenant_id=tenant_id, error=str(e))
            return {"status": "error", "error": str(e)}

    def _build_sandboxed_graph(self, sandbox: Sandbox,
                               agent_config: dict, allowed_tools: list):
        """Build a LangGraph whose tools run inside the sandbox."""
        # Assumes a SandboxToolNode variant that accepts an existing sandbox
        # instead of a template name.
        tool_node = SandboxToolNode(
            tools=[t for t in ALL_TOOLS if t.name in allowed_tools],
            sandbox=sandbox,
        )

        # LLM node: runs in the trusted zone, outside the sandbox
        def llm_node(state):
            # _call_llm_proxy is left to implement
            response = self._call_llm_proxy(
                messages=state["messages"],
                model=agent_config.get("model", "gpt-4"),
            )
            return {"messages": [response]}

        builder = StateGraph(MessagesState)
        builder.add_node("llm", llm_node)
        builder.add_node("tools", tool_node)
        builder.set_entry_point("llm")
        builder.add_conditional_edges(
            "llm",
            lambda state: "tools" if state["messages"][-1].tool_calls else END,
        )
        builder.add_edge("tools", "llm")

        return builder.compile(checkpointer=self.checkpoint_saver)


# Usage example
async def main():
    platform = EnterpriseAgentPlatform()
    await platform.initialize()

    result = await platform.execute_agent_task(
        tenant_id="acme-corp",
        agent_config={
            "model": "gpt-4",
            "tools": ["analyze_csv", "execute_sql", "send_email"],
        },
        user_input="Analyze last month's sales data and send a report",
    )
    print(result)


if __name__ == "__main__":
    asyncio.run(main())
```
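8. Summary and Outlook

A sandbox for AI agents is not optional; it is a requirement for production deployment.
Key takeaways:
Containers are the starting point: Docker provides baseline isolation that fits most scenarios
MicroVMs are the future: Firecracker/E2B offer stronger isolation with acceptable startup times
Tiered strategy: choose isolation strength by risk level to balance security and performance
Secrets never enter the sandbox: use a token proxy and keep the raw secrets in a secure zone
Audit everything: record every execution so that it can be traced afterwards
Future trends:
WebAssembly: a lighter-weight isolation option being explored by runtimes such as Wasmtime and Wasmer
Confidential computing: Intel SGX and AMD SEV provide hardware-level isolation
Zero-trust architecture: "never trust, always verify"; every tool call must be authorized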
References