跳过正文
  1. 文章列表/

零信任网络架构实施:从身份验证到微隔离的完整落地方案

Elone Yue
作者
Elone Yue

引言:为什么传统边界安全模型已经失效
#

在过去十几年里,企业网络安全依赖于一个核心假设——网络边界是可信任的。防火墙将"内部"和"外部"划出明确的信任分界线:外部威胁需要突破防火墙才能进入,而内部流量则被默认为可信。

这个模型在今天已经千疮百孔。远程办公让"内部网络"的概念模糊不清;云服务的采用使流量直接绕过企业边界;供应链攻击可以在不触发任何边界告警的情况下深入核心系统;而内部威胁更是天然就在边界之内。

零信任(Zero Trust)不是某个具体产品,而是一种安全架构理念。NIST SP 800-207 给出了精确定义:零信任是一种安全范式,它要求对每次访问请求进行验证,无论请求源自何处。“从不信任,始终验证”——这八个字概括了零信任的核心。

本文将从零信任的理论基础出发,深入探讨身份为中心的安全模型、设备信任评估、微隔离实现以及持续验证机制,并提供从 Google BeyondCorp 到云原生服务网格的完整落地方案。

零信任核心原则与架构演进
#

NIST SP 800-207 三大支柱
#

┌─────────────────────────────────────────────────────────────┐
│                    零信任架构三支柱                           │
│                                                             │
│  ┌──────────────────┐  ┌──────────────────┐  ┌───────────┐ │
│  │  身份为中心       │  │  设备信任         │  │  微隔离   │ │
│  │  Identity-First  │  │  Device Trust    │  │  Micro-   │ │
│  │                  │  │                  │  │ segmentation│ │
│  │  不信任网络位置   │  │ 验证设备合规性    │  │           │ │
│  │  基于身份授权     │  │ 持续评估风险      │  │ 细粒度访问│ │
│  │  动态信任评估     │  │ 安全状态追踪      │  │ 控制      │ │
│  └──────────────────┘  └──────────────────┘  └───────────┘ │
│                                                             │
│                    ┌──────────────────┐                      │
│                    │  持续验证         │                      │
│                    │  Continuous Verify│                      │
│                    │  实时风险评估      │                      │
│                    │  自适应授权        │                      │
│                    └──────────────────┘                      │
└─────────────────────────────────────────────────────────────┘

从传统架构到零信任的演进路径
#

传统安全架构                     零信任架构
─────────────                    ──────────────────
┌───────────┐                    每个请求都需要认证
│ 防火墙     │                    不依赖网络位置
│ (边界防御) │  →  演进  →      最小权限原则
└───────────┘                    持续验证信任
       │                               │
       │  内部 = 可信                  │  没有"内部"的概念
       │  外部 = 不可信                │  所有流量不可信
       │  一次认证长期有效              │  每次访问重新评估
       │  粗粒度网络分段                │  细粒度微隔离

身份为中心的安全模型
#

多因素认证 (MFA) 实施
#

零信任架构中,身份成为新的安全边界。MFA 不再是可选项,而是必须项。

# 身份验证策略配置示例
# 基于风险的自适应 MFA 策略

authentication:
  # 基础认证要求
  mfa:
    required_for:
      - all_external_access
      - privileged_operations
      - new_device
      - sensitive_resources

    # 根据风险等级动态调整
    adaptive:
      # 低风险 - 仅密码
      low_risk:
        factors: [password]
        conditions:
          - known_device
          - known_location
          - normal_behavior

      # 中风险 - 密码 + TOTP
      medium_risk:
        factors: [password, totp]
        conditions:
          - new_location
          - unknown_device

      # 高风险 - 密码 + FIDO2/WebAuthn
      high_risk:
        factors: [password, fido2]
        conditions:
          - impossible_travel
          - suspicious_behavior
          - compromised_credentials_detected

  # 支持的 MFA 方法优先级
  methods:
    - name: "FIDO2 / WebAuthn"
      priority: 1
      security_level: "highest"
      description: "硬件密钥或设备内置生物识别"

    - name: "TOTP"
      priority: 2
      security_level: "high"
      description: "时间基一次性密码 (RFC 6238)"

    - name: "Push Notification"
      priority: 3
      security_level: "medium"
      description: "移动应用推送验证"

    - name: "SMS/Email OTP"
      priority: 4
      security_level: "low"
      description: "短信/邮件验证码 (不推荐作为主要方法)"

条件访问策略
#

条件访问(Conditional Access)是零信任的核心引擎——它根据多个信号实时决定访问权限:

#!/usr/bin/env python3
"""
条件访问策略引擎
基于多因素信号实时评估访问风险并决定授权
"""

from dataclasses import dataclass, field
from enum import Enum
from typing import List, Dict, Optional, Tuple
from datetime import datetime, timedelta
import hashlib
import json


class RiskLevel(Enum):
    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
    CRITICAL = "critical"


class AccessDecision(Enum):
    GRANT = "grant"
    DENY = "deny"
    CHALLENGE = "challenge"   # 需要额外验证
    GRANT_WITH_SESSION = "grant_with_limited_session"


@dataclass
class UserContext:
    """用户身份上下文"""
    user_id: str
    username: str
    roles: List[str] = field(default_factory=list)
    department: str = ""
    mfa_enrolled: bool = False
    password_age_days: int = 0
    last_password_change: Optional[datetime] = None
    account_locked: bool = False
    is_privileged: bool = False


@dataclass
class DeviceContext:
    """设备信任上下文"""
    device_id: str
    device_type: str           # "company_managed" | "personal" | "unknown"
    os_version: str
    is_compliant: bool = False
    encryption_enabled: bool = False
    antivirus_status: str = "unknown"  # "current" | "outdated" | "missing"
    last_checkin: Optional[datetime] = None
    jailbroken: bool = False
    secure_boot: bool = True


@dataclass
class SessionContext:
    """会话上下文"""
    source_ip: str
    user_agent: str
    geo_location: Dict = field(default_factory=dict)
    protocol: str = "HTTPS"
    time_of_day: int = 0      # 0-23
    day_of_week: int = 0      # 0-6
    access_pattern_history: List[str] = field(default_factory=list)
    session_age_minutes: int = 0


@dataclass
class ResourceContext:
    """资源上下文"""
    resource_id: str
    resource_type: str          # "database" | "api" | "file_share" | "admin_panel"
    sensitivity: str            # "public" | "internal" | "confidential" | "restricted"
    requires_mfa: bool = False
    max_concurrent_sessions: int = 1
    current_sessions: int = 0


@dataclass
class AccessDecision:
    """访问决策结果"""
    decision: AccessDecision
    risk_score: int            # 0-100
    risk_level: RiskLevel
    mfa_required: bool = False
    session_limits: Dict = field(default_factory=dict)
    denial_reason: str = ""
    challenge_type: str = ""   # 需要验证时指定验证方式
    evidence: List[str] = field(default_factory=list)


class ConditionalAccessEngine:
    """条件访问策略引擎"""

    # 风险评分权重
    WEIGHTS = {
        'device_trust': 25,
        'user_risk': 20,
        'location_risk': 15,
        'behavior_risk': 20,
        'resource_sensitivity': 20,
    }

    def __init__(self, config: Optional[Dict] = None):
        self.config = config or self._default_config()
        self.policy_db: Dict[str, Dict] = {}

    def _default_config(self) -> Dict:
        return {
            'risk_thresholds': {
                'grant': 30,
                'challenge': 60,
                'deny': 80,
            },
            'geolocation': {
                'allowed_countries': ['CN', 'US', 'JP', 'SG', 'DE'],
                'high_risk_countries': ['XX', 'YY'],  # 示例
                'impossible_travel_hours': 2,
            },
            'device': {
                'require_encryption_for': ['confidential', 'restricted'],
                'require_compliant_for': ['restricted'],
                'max_days_since_checkin': 30,
            },
            'session': {
                'max_duration_hours': 8,
                'idle_timeout_minutes': 30,
            }
        }

    def evaluate_access(
        self,
        user: UserContext,
        device: DeviceContext,
        session: SessionContext,
        resource: ResourceContext
    ) -> AccessDecision:
        """综合评估访问请求"""
        evidence = []
        total_risk_score = 0

        # 1. 基础安全检查
        base_check = self._base_security_check(user, device, session)
        if base_check:
            return base_check

        # 2. 设备信任评估
        device_score = self._evaluate_device_trust(device, resource)
        total_risk_score += device_score['weighted_score']
        evidence.extend(device_score['evidence'])

        # 3. 用户风险评估
        user_score = self._evaluate_user_risk(user, session)
        total_risk_score += user_score['weighted_score']
        evidence.extend(user_score['evidence'])

        # 4. 位置风险评估
        location_score = self._evaluate_location_risk(session)
        total_risk_score += location_score['weighted_score']
        evidence.extend(location_score['evidence'])

        # 5. 行为风险评估
        behavior_score = self._evaluate_behavior_risk(user, session)
        total_risk_score += behavior_score['weighted_score']
        evidence.extend(behavior_score['evidence'])

        # 6. 资源敏感度
        resource_score = self._evaluate_resource_sensitivity(resource, device)
        total_risk_score += resource_score['weighted_score']
        evidence.extend(resource_score['evidence'])

        # 确定最终决策
        thresholds = self.config['risk_thresholds']
        risk_level = self._score_to_risk_level(total_risk_score)

        if total_risk_score >= thresholds['deny']:
            return AccessDecision(
                decision=AccessDecision.DENY,
                risk_score=total_risk_score,
                risk_level=risk_level,
                denial_reason=f"Risk score {total_risk_score} exceeds deny threshold {thresholds['deny']}",
                evidence=evidence
            )
        elif total_risk_score >= thresholds['challenge']:
            mfa_type = self._determine_mfa_type(total_risk_score, resource)
            return AccessDecision(
                decision=AccessDecision.CHALLENGE,
                risk_score=total_risk_score,
                risk_level=risk_level,
                mfa_required=True,
                challenge_type=mfa_type,
                evidence=evidence
            )
        else:
            session_limits = {}
            if resource.sensitivity in ('confidential', 'restricted'):
                session_limits = {
                    'max_duration_minutes': self.config['session']['max_duration_hours'] * 60,
                    'require_reauth': True,
                    'idle_timeout_minutes': self.config['session']['idle_timeout_minutes'],
                }
            return AccessDecision(
                decision=AccessDecision.GRANT if resource.sensitivity == 'public' else AccessDecision.GRANT_WITH_SESSION,
                risk_score=total_risk_score,
                risk_level=risk_level,
                session_limits=session_limits,
                evidence=evidence
            )

    def _base_security_check(
        self, user: UserContext, device: DeviceContext, session: SessionContext
    ) -> Optional[AccessDecision]:
        """基础安全检查 - 一票否决项"""
        # 账户锁定
        if user.account_locked:
            return AccessDecision(
                decision=AccessDecision.DENY,
                risk_score=100,
                risk_level=RiskLevel.CRITICAL,
                denial_reason="Account is locked"
            )

        # 设备被劫持
        if device.jailbroken:
            return AccessDecision(
                decision=AccessDecision.DENY,
                risk_score=100,
                risk_level=RiskLevel.CRITICAL,
                denial_reason="Device is jailbroken/rooted"
            )

        # 不安全协议
        if session.protocol != "HTTPS":
            return AccessDecision(
                decision=AccessDecision.DENY,
                risk_score=100,
                risk_level=RiskLevel.CRITICAL,
                denial_reason="Non-HTTPS access not permitted"
            )

        return None

    def _evaluate_device_trust(
        self, device: DeviceContext, resource: ResourceContext
    ) -> Dict:
        """评估设备信任度"""
        score = 0
        evidence = []

        if device.device_type == "unknown":
            score += 30
            evidence.append("Unknown device type")
        elif device.device_type == "personal":
            score += 15
            evidence.append("Personal device (BYOD)")

        if not device.is_compliant:
            score += 25
            evidence.append("Device not compliant with security policy")

        if not device.encryption_enabled:
            if resource.sensitivity in ('confidential', 'restricted'):
                score += 20
                evidence.append("Encryption required for this resource")

        if device.antivirus_status == "missing":
            score += 15
            evidence.append("No antivirus installed")
        elif device.antivirus_status == "outdated":
            score += 10
            evidence.append("Antivirus definitions outdated")

        if device.last_checkin:
            days_since = (datetime.utcnow() - device.last_checkin).days
            if days_since > self.config['device']['max_days_since_checkin']:
                score += 15
                evidence.append(f"Device last checked in {days_since} days ago")

        # 计算加权分数
        weighted = score * (self.WEIGHTS['device_trust'] / 100)

        return {
            'raw_score': score,
            'weighted_score': weighted,
            'evidence': evidence
        }

    def _evaluate_user_risk(
        self, user: UserContext, session: SessionContext
    ) -> Dict:
        """评估用户风险"""
        score = 0
        evidence = []

        # 密码年龄
        if user.password_age_days > 90:
            score += 15
            evidence.append(f"Password age: {user.password_age_days} days")
        elif user.password_age_days > 180:
            score += 25
            evidence.append(f"Password age exceeds 180 days: {user.password_age_days}")

        # MFA 状态
        if user.is_privileged and not user.mfa_enrolled:
            score += 40
            evidence.append("Privileged user without MFA enrollment")

        # 权限等级
        if user.is_privileged:
            score += 10
            evidence.append("Privileged account access")

        # 异常登录时间
        if session.time_of_day < 6 or session.time_of_day > 23:
            score += 10
            evidence.append("Off-hours access attempt")

        weighted = score * (self.WEIGHTS['user_risk'] / 100)
        return {'raw_score': score, 'weighted_score': weighted, 'evidence': evidence}

    def _evaluate_location_risk(
        self, session: SessionContext
    ) -> Dict:
        """评估位置风险"""
        score = 0
        evidence = []

        country = session.geo_location.get('country_code', '')
        if country in self.config['geolocation']['high_risk_countries']:
            score += 40
            evidence.append(f"Access from high-risk country: {country}")
        elif country not in self.config['geolocation']['allowed_countries']:
            score += 25
            evidence.append(f"Access from non-standard country: {country}")

        is_proxy = session.geo_location.get('is_proxy', False)
        if is_proxy:
            score += 20
            evidence.append("Access via proxy/VPN")

        weighted = score * (self.WEIGHTS['location_risk'] / 100)
        return {'raw_score': score, 'weighted_score': weighted, 'evidence': evidence}

    def _evaluate_behavior_risk(
        self, user: UserContext, session: SessionContext
    ) -> Dict:
        """评估行为风险 - 基于用户历史行为模式"""
        score = 0
        evidence = []

        history = session.access_pattern_history
        if history:
            # 检测异常行为模式
            unusual_resources = [
                r for r in history
                if r not in user.roles  # 简化示例
            ]
            if len(unusual_resources) > 3:
                score += 25
                evidence.append("Accessing unusual resources")

            # 高频访问检测
            if len(history) > 100:
                score += 15
                evidence.append("High frequency access pattern")

        weighted = score * (self.WEIGHTS['behavior_risk'] / 100)
        return {'raw_score': score, 'weighted_score': weighted, 'evidence': evidence}

    def _evaluate_resource_sensitivity(
        self, resource: ResourceContext, device: DeviceContext
    ) -> Dict:
        """评估资源敏感度带来的风险"""
        score = 0
        evidence = []

        sensitivity_scores = {
            'public': 0,
            'internal': 10,
            'confidential': 25,
            'restricted': 40,
        }

        raw = sensitivity_scores.get(resource.sensitivity, 0)
        score += raw
        evidence.append(f"Resource sensitivity: {resource.sensitivity}")

        # 受限资源 + 非合规设备
        if resource.sensitivity == 'restricted' and not device.is_compliant:
            score += 20
            evidence.append("Restricted resource requires compliant device")

        weighted = score * (self.WEIGHTS['resource_sensitivity'] / 100)
        return {'raw_score': score, 'weighted_score': weighted, 'evidence': evidence}

    def _score_to_risk_level(self, score: int) -> RiskLevel:
        if score >= 80:
            return RiskLevel.CRITICAL
        elif score >= 60:
            return RiskLevel.HIGH
        elif score >= 30:
            return RiskLevel.MEDIUM
        return RiskLevel.LOW

    def _determine_mfa_type(self, score: int, resource: ResourceContext) -> str:
        """根据风险等级决定 MFA 方式"""
        if score >= 80 or resource.sensitivity == 'restricted':
            return 'fido2'
        elif score >= 60:
            return 'totp'
        return 'push_notification'


# 使用示例
if __name__ == '__main__':
    engine = ConditionalAccessEngine()

    user = UserContext(
        user_id="usr-001",
        username="zhang.san",
        roles=["developer", "reader"],
        is_privileged=False,
        mfa_enrolled=True,
        password_age_days=45
    )

    device = DeviceContext(
        device_id="dev-123",
        device_type="company_managed",
        os_version="macOS 14.3.1",
        is_compliant=True,
        encryption_enabled=True,
        antivirus_status="current",
        last_checkin=datetime.utcnow(),
        jailbroken=False,
        secure_boot=True
    )

    session = SessionContext(
        source_ip="203.0.113.42",
        user_agent="Mozilla/5.0 ...",
        geo_location={"country_code": "CN", "is_proxy": False},
        protocol="HTTPS",
        time_of_day=14,
        day_of_week=2,
    )

    resource = ResourceContext(
        resource_id="api-prod-01",
        resource_type="api",
        sensitivity="confidential",
        requires_mfa=True
    )

    result = engine.evaluate_access(user, device, session, resource)
    print(f"\nDecision: {result.decision.value}")
    print(f"Risk Score: {result.risk_score}")
    print(f"Risk Level: {result.risk_level.value}")
    print(f"Evidence: {result.evidence}")
    if result.session_limits:
        print(f"Session Limits: {result.session_limits}")

设备信任与 attestation
#

设备信任(Device Attestation)是零信任架构的关键组件——它验证设备本身的合规性和完整性。

设备信任链验证流程
─────────────────────────

1. 设备启动
   ├──> Secure Boot 验证固件签名
   │       └──> TPM 测量启动状态
2. 设备标识
   ├──> TPM 生成设备唯一密钥 (Attestation Key)
   │       └──> AIK 由 TPM 厂商证书签名
3. 状态报告
   ├──> 收集 PCR (Platform Configuration Register) 值
   │       └──> 操作系统版本、补丁状态、安全软件状态
4. 信任评估
   ├──> MDM (Mobile Device Management) 验证设备合规
   │       └──> 加密状态、密码策略、Jailbreak 检测
5. 信任令牌
   └──> 颁发设备信任令牌 (Device Trust Token)
           └──> 包含设备指纹 + 安全状态 + 过期时间

微隔离实现:从 Kubernetes Calico 到 eBPF
#

基于 Calico 的 Kubernetes 微隔离
#

Calico 是 Kubernetes 网络策略的主流实现,支持精细的微隔离:

# Calico 网络策略 - 零信任微隔离示例
# 默认拒绝所有跨命名空间流量

apiVersion: projectcalico.org/v3
kind: GlobalNetworkPolicy
metadata:
  name: default-deny-cross-namespace
spec:
  order: 100  # 高优先级
  types:
    - Ingress
    - Egress
  ingress:
    - action: Deny
      source:
        selector: has(projectcalico.org/name)
        namespaceSelector: projectcalico.org/name != 'default'
  egress:
    - action: Deny
      destination:
        selector: has(projectcalico.org/name)
        namespaceSelector: projectcalico.org/name != 'default'

---
# 允许特定服务间的通信
apiVersion: projectcalico.org/v3
kind: NetworkPolicy
metadata:
  name: api-server-ingress-policy
  namespace: production
spec:
  selector: app == 'api-server' && tier == 'backend'
  types:
    - Ingress
  ingress:
    # 仅允许 API Gateway 访问
    - action: Allow
      protocol: TCP
      source:
        selector: app == 'api-gateway' && namespaceSelector == 'production'
        nets:
          - 10.0.0.0/16
      destination:
        ports:
          - 8443
    # 允许健康检查
    - action: Allow
      protocol: TCP
      source:
        selector: component == 'kube-proxy'
      destination:
        ports:
          - 8080
          - 8081

---
# 数据库微隔离 - 仅允许应用层访问
apiVersion: projectcalico.org/v3
kind: NetworkPolicy
metadata:
  name: database-isolation
  namespace: production
spec:
  selector: app in {'postgres', 'redis', 'mongodb'}
  types:
    - Ingress
    - Egress
  ingress:
    # 仅允许应用服务器连接数据库
    - action: Allow
      protocol: TCP
      source:
        selector: tier == 'application'
      destination:
        ports:
          - 5432   # PostgreSQL
          - 6379   # Redis
          - 27017  # MongoDB
  egress:
    # 数据库只能访问 DNS 和同集群节点
    - action: Allow
      protocol: TCP
      destination:
        selector: k8s-app == 'kube-dns'
        ports:
          - 53
          - 9153
    - action: Allow
      protocol: TCP
      destination:
        selector: app in {'postgres-replica'}
        namespaceSelector: production
      destination:
        ports:
          - 5432

eBPF 实现的零信任网络层
#

/*
 * eBPF 零信任网络过滤
 * 在内核网络层实现基于身份的流量控制
 * 使用 XDP (eXpress Data Path) 实现高性能包过滤
 */

#include <linux/bpf.h>
#include <linux/if_ether.h>
#include <linux/ip.h>
#include <linux/tcp.h>
#include <linux/in.h>
#include <bpf/bpf_helpers.h>
#include <bpf/bpf_endian.h>

/* 设备信任状态映射 */
struct {
    __uint(type, BPF_MAP_TYPE_HASH);
    __uint(max_entries, 65536);
    __type(key, __u32);     /* source IP */
    __type(value, __u8);    /* trust level: 0=untrusted, 1=device_ok, 2=mfa_ok */
} device_trust_map SEC(".maps");

/* 服务间访问策略 */
struct access_policy {
    __u32 src_ip;
    __u32 dst_ip;
    __u16 dst_port;
    __u8  required_trust_level;
    __u8  action;           /* 0=deny, 1=allow */
};

struct {
    __uint(type, BPF_MAP_TYPE_ARRAY);
    __uint(max_entries, 1024);
    __type(key, __u32);
    __type(value, struct access_policy);
} access_policies SEC(".maps");

/* 审计日志 */
struct audit_event {
    __u32 src_ip;
    __u32 dst_ip;
    __u16 dst_port;
    __u8  action;
    __u8  trust_level;
    __u32 timestamp;
};

struct {
    __uint(type, BPF_MAP_TYPE_RINGBUF);
    __uint(max_entries, 256 * 1024);
} audit_log SEC(".maps");

SEC("xdp")
int zero_trust_filter(struct xdp_md *ctx)
{
    void *data = (void *)(long)ctx->data;
    void *data_end = (void *)(long)ctx->data_end;

    struct ethhdr *eth = data;
    if ((void *)eth + sizeof(*eth) > data_end)
        return XDP_PASS;

    /* 仅处理 IPv4 TCP */
    if (eth->h_proto != bpf_htons(ETH_P_IP))
        return XDP_PASS;

    struct iphdr *ip = (void *)eth + sizeof(*eth);
    if ((void *)ip + sizeof(*ip) > data_end)
        return XDP_PASS;

    if (ip->protocol != IPPROTO_TCP)
        return XDP_PASS;

    struct tcphdr *tcp = (void *)ip + sizeof(*ip);
    if ((void *)tcp + sizeof(*tcp) > data_end)
        return XDP_PASS;

    __u32 src_ip = ip->saddr;
    __u32 dst_ip = ip->daddr;
    __u16 dst_port = bpf_ntohs(tcp->dest);

    /* 1. 检查设备信任状态 */
    __u8 *trust_level = bpf_map_lookup_elem(&device_trust_map, &src_ip);
    if (!trust_level) {
        /* 未注册设备 - 默认拒绝 */
        __u8 action = 0; /* deny */
        bpf_ringbuf_output(&audit_log, &(struct audit_event){
            .src_ip = src_ip,
            .dst_ip = dst_ip,
            .dst_port = dst_port,
            .action = action,
            .trust_level = 0,
            .timestamp = bpf_ktime_get_ns(),
        }, sizeof(struct audit_event), 0);
        return XDP_DROP;
    }

    /* 2. 检查访问策略 */
    __u32 policy_idx = 0;
    struct access_policy *policy;
    int i;

    for (i = 0; i < 1024; i++) {
        policy = bpf_map_lookup_elem(&access_policies, &policy_idx);
        if (!policy || !policy->dst_port) {
            policy_idx++;
            continue;
        }

        if (policy->src_ip == src_ip &&
            policy->dst_ip == dst_ip &&
            policy->dst_port == dst_port) {

            if (*trust_level >= policy->required_trust_level) {
                /* 信任等级满足 - 允许 */
                bpf_ringbuf_output(&audit_log, &(struct audit_event){
                    .src_ip = src_ip,
                    .dst_ip = dst_ip,
                    .dst_port = dst_port,
                    .action = 1,
                    .trust_level = *trust_level,
                    .timestamp = bpf_ktime_get_ns(),
                }, sizeof(struct audit_event), 0);
                return XDP_PASS;
            } else {
                /* 信任等级不足 - 拒绝 */
                bpf_ringbuf_output(&audit_log, &(struct audit_event){
                    .src_ip = src_ip,
                    .dst_ip = dst_ip,
                    .dst_port = dst_port,
                    .action = 0,
                    .trust_level = *trust_level,
                    .timestamp = bpf_ktime_get_ns(),
                }, sizeof(struct audit_event), 0);
                return XDP_DROP;
            }
        }
        policy_idx++;
    }

    /* 默认拒绝 - 零信任原则 */
    return XDP_DROP;
}

char _license[] SEC("license") = "GPL";

服务网格 mTLS:Istio 零信任通信
#

# Istio 服务网格 - 全局 mTLS 强制配置
# 确保服务间通信全部加密并双向认证

# 全局 mTLS 策略
apiVersion: security.istio.io/v1beta1
kind: PeerAuthentication
metadata:
  name: default-mtls
  namespace: istio-system
spec:
  mtls:
    mode: STRICT  # 强制双向 TLS
---
# 命名空间级别的严格 mTLS
apiVersion: security.istio.io/v1beta1
kind: PeerAuthentication
metadata:
  name: production-strict-mtls
  namespace: production
spec:
  mtls:
    mode: STRICT
---
# 服务间授权策略 - 零信任访问控制
apiVersion: security.istio.io/v1beta1
kind: AuthorizationPolicy
metadata:
  name: api-server-policy
  namespace: production
spec:
  selector:
    matchLabels:
      app: api-server
  action: ALLOW
  rules:
    # 仅允许 api-gateway 服务访问
    - from:
        - source:
            principals:
              - "cluster.local/ns/production/sa/api-gateway"
      to:
        - operation:
            methods: ["GET", "POST"]
            paths: ["/api/v1/*"]
---
# 管理面板 - 严格限制
apiVersion: security.istio.io/v1beta1
kind: AuthorizationPolicy
metadata:
  name: admin-panel-policy
  namespace: production
spec:
  selector:
    matchLabels:
      app: admin-panel
  action: DENY
  rules:
    # 拒绝所有未明确允许的流量
    - from:
        - source:
            notPrincipals:
              - "cluster.local/ns/production/sa/authorized-admin"

Google BeyondCorp 与 SASE 模型对比
#

BeyondCorp 架构
#

┌─────────────────────────────────────────────────────────────┐
│                    BeyondCorp 架构                           │
│                                                             │
│  ┌───────────┐     ┌─────────────────┐     ┌─────────────┐ │
│  │ 用户设备   │────>│ 身份验证网关     │────>│ 访问代理     │ │
│  │           │     │ (Identity-aware) │     │ (App Proxy) │ │
│  │ 设备信任   │     │ - SSO 认证      │     │ - 应用访问   │ │
│  │ 评估       │     │ - MFA 验证      │     │ - 会话管理   │ │
│  │           │     │ - 设备合规检查   │     │ - 审计日志   │ │
│  └───────────┘     └─────────────────┘     └─────────────┘ │
│                           │                        │        │
│                           ▼                        ▼        │
│                    ┌─────────────┐         ┌───────────┐   │
│                    │  策略引擎   │         │  应用集群  │   │
│                    │             │         │           │   │
│                    │  - 条件访问 │         │  - 微服务  │   │
│                    │  - 风险分析 │         │  - 数据库  │   │
│                    │  - 上下文   │         │           │   │
│                    └─────────────┘         └───────────┘   │
└─────────────────────────────────────────────────────────────┘

SASE (Secure Access Service Edge) 架构
#

┌─────────────────────────────────────────────────────────────┐
│                    SASE 融合架构                             │
│                                                             │
│  ┌──────────────────────────────────────────────────────┐   │
│  │                   Cloud-native POP                    │   │
│  │  ┌────────┐ ┌────────┐ ┌────────┐ ┌──────────────┐   │   │
│  │  │SD-WAN  │ │FWaaS   │ │ZTNA    │ │SWG / CASB   │   │   │
│  │  │        │ │        │ │        │ │             │   │   │
│  │  └────────┘ └────────┘ └────────┘ └──────────────┘   │   │
│  │                      │                                │   │
│  └──────────────────────┼────────────────────────────────┘   │
│                         │                                    │
│           ┌─────────────┼─────────────┐                      │
│           │             │             │                      │
│     ┌─────▼─────┐ ┌─────▼─────┐ ┌────▼─────┐               │
│     │ Branch    │ │ HQ DC     │ │ Cloud    │               │
│     │ Office    │ │           │ │ Workload │               │
│     └───────────┘ └───────────┘ └──────────┘               │
└─────────────────────────────────────────────────────────────┘

BeyondCorp vs SASE 对比表
#

┌──────────────┬────────────────────────┬──────────────────────────┐
│ 特性         │ BeyondCorp             │ SASE                     │
├──────────────┼────────────────────────┼──────────────────────────┤
│ 核心理念     │ 身份为中心的访问控制    │ 网络与安全功能云化融合   │
│ 部署模式     │ 内部实现为主            │ SaaS 服务模式             │
│ 网络功能     │ 应用层代理              │ SD-WAN + 安全功能栈       │
│ 安全能力     │ ZTNA + 条件访问        │ FWaaS + SWG + CASB + DLP │
│ 适用场景     │ 企业内部应用            │ 分支机构 + 多云 + 远程    │
│ 实施复杂度   │ 高(需要改造应用架构)  │ 中(SaaS 提供商托管)     │
│ 典型厂商     │ Google                  │ Palo Alto, Netscaler    │
│              │                        │ Cisco, Fortinet          │
└──────────────┴────────────────────────┴──────────────────────────┘

云网络微隔离:Terraform 实现
#

# Terraform - 零信任云网络分段
# 实现基于标签的精细网络隔离

variable "tags" {
  default = {
    Environment = "production"
    Team        = "platform"
  }
}

# === 网络分段 (VPC) ===

# 前端 VPC - 面向用户的 Web 服务
resource "aws_vpc" "frontend" {
  cidr_block           = "10.1.0.0/16"
  enable_dns_support   = true
  enable_dns_hostnames = true

  tags = merge(var.tags, {
    Name        = "frontend-vpc"
    SecurityTier = "perimeter"
  })
}

# 应用 VPC - 业务逻辑层
resource "aws_vpc" "application" {
  cidr_block           = "10.2.0.0/16"
  enable_dns_support   = true
  enable_dns_hostnames = true

  tags = merge(var.tags, {
    Name        = "application-vpc"
    SecurityTier = "internal"
  })
}

# 数据 VPC - 数据库层
resource "aws_vpc" "database" {
  cidr_block           = "10.3.0.0/16"
  enable_dns_support   = true
  enable_dns_hostnames = true

  tags = merge(var.tags, {
    Name        = "database-vpc"
    SecurityTier = "restricted"
  })
}

# === 跨 VPC 安全组规则 - 最小权限 ===

# 前端 -> 应用:仅允许 API 端口
resource "aws_security_group" "frontend_to_app" {
  name        = "frontend-to-app"
  description = "Allow frontend to application API calls only"
  vpc_id      = aws_vpc.frontend.id

  egress {
    description = "Application API"
    from_port   = 8443
    to_port     = 8443
    protocol    = "tcp"
    # 通过 VPC Peering 指向应用 VPC
    cidr_blocks = ["10.2.0.0/16"]
  }

  egress {
    description = "Health checks"
    from_port   = 8080
    to_port     = 8081
    protocol    = "tcp"
    cidr_blocks = ["10.2.0.0/16"]
  }

  tags = {
    Name = "frontend-to-app-sg"
  }
}

# 应用 -> 数据库:精确的数据库端口
resource "aws_security_group" "app_to_db" {
  name        = "application-to-database"
  description = "Allow application servers to database only"
  vpc_id      = aws_vpc.application.id

  egress {
    description = "PostgreSQL"
    from_port   = 5432
    to_port     = 5432
    protocol    = "tcp"
    cidr_blocks = ["10.3.0.0/16"]
  }

  egress {
    description = "Redis"
    from_port   = 6379
    to_port     = 6379
    protocol    = "tcp"
    cidr_blocks = ["10.3.0.0/16"]
  }

  # 禁止出站互联网访问
  # 无 0.0.0.0/0 的 egress 规则

  tags = {
    Name = "app-to-db-sg"
  }
}

# 数据层安全组 - 默认拒绝所有入站
resource "aws_security_group" "database_default_deny" {
  name        = "database-default-deny"
  description = "Default deny all inbound - explicit allow only"
  vpc_id      = aws_vpc.database.id

  # 无入站规则 = 默认拒绝
  # 仅允许来自应用 VPC 的特定端口
  ingress {
    description     = "PostgreSQL from app tier"
    from_port       = 5432
    to_port         = 5432
    protocol        = "tcp"
    security_groups = [aws_security_group.app_to_db.id]
  }

  tags = {
    Name = "database-default-deny"
  }
}

# === VPC 端点 - 避免流量经 NAT/互联网 ===

# S3 VPC 端点
resource "aws_vpc_endpoint" "s3" {
  vpc_id            = aws_vpc.application.id
  service_name      = "com.amazonaws.${var.region}.s3"
  vpc_endpoint_type = "Gateway"

  route_table_ids = [
    aws_route_table.application_private.id
  ]

  tags = {
    Name = "app-vpc-s3-endpoint"
  }
}

# Secrets Manager VPC 端点
resource "aws_vpc_endpoint" "secrets_manager" {
  vpc_id              = aws_vpc.application.id
  service_name        = "com.amazonaws.${var.region}.secretsmanager"
  vpc_endpoint_type   = "Interface"

  security_group_ids  = [aws_security_group.app_to_db.id]
  subnet_ids          = aws_subnet.application_private[*].id

  private_dns_enabled = true

  tags = {
    Name = "app-vpc-secrets-endpoint"
  }
}

持续验证与行为分析
#

零信任不是一次性认证,而是持续的信任评估过程。

"""
持续信任评估模块
监控用户行为并动态调整信任等级
"""

import time
import hashlib
from collections import defaultdict, deque
from dataclasses import dataclass, field
from typing import List, Dict, Optional
from datetime import datetime, timedelta


@dataclass
class BehaviorEvent:
    """用户行为事件"""
    timestamp: datetime
    user_id: str
    event_type: str       # "login", "resource_access", "data_export", "admin_action"
    resource: str
    action: str           # "read", "write", "delete", "execute"
    source_ip: str
    device_id: str
    geo_location: Dict
    metadata: Dict = field(default_factory=dict)


@dataclass
class UserBehaviorProfile:
    """用户行为档案"""
    user_id: str
    # 历史行为窗口(滑动窗口)
    access_history: deque = field(default_factory=lambda: deque(maxlen=1000))
    # 统计指标
    typical_hours: List[int] = field(default_factory=lambda: list(range(8, 20)))
    typical_locations: List[str] = field(default_factory=list)
    typical_resources: List[str] = field(default_factory=list)
    # 信任评分
    trust_score: float = 100.0
    last_risk_assessment: Optional[datetime] = None
    # 告警
    alerts: List[Dict] = field(default_factory=list)


class ContinuousTrustEvaluator:
    """持续信任评估器"""

    ANOMALY_WEIGHTS = {
        'off_hours_access': 15,
        'new_location': 25,
        'unusual_resource': 20,
        'data_export_volume': 30,
        'privilege_escalation': 40,
        'lateral_movement': 35,
        'access_frequency_spike': 20,
        'impossible_travel': 45,
    }

    def __init__(self, trust_decay_rate: float = 0.1, alert_threshold: float = 60.0):
        self.user_profiles: Dict[str, UserBehaviorProfile] = {}
        self.trust_decay_rate = trust_decay_rate
        self.alert_threshold = alert_threshold

    def process_event(self, event: BehaviorEvent) -> Optional[Dict]:
        """处理行为事件并更新信任评分"""
        profile = self._get_or_create_profile(event.user_id)

        # 1. 添加事件到历史
        profile.access_history.append(event)

        # 2. 检测异常行为
        anomalies = self._detect_anomalies(profile, event)

        # 3. 更新信任评分
        trust_change = self._calculate_trust_change(anomalies)
        profile.trust_score = max(0.0, min(100.0, profile.trust_score + trust_change))

        # 4. 判断是否需要告警
        alert = None
        if profile.trust_score <= self.alert_threshold:
            alert = {
                'user_id': event.user_id,
                'trust_score': round(profile.trust_score, 1),
                'risk_level': self._score_to_level(profile.trust_score),
                'anomalies': anomalies,
                'recommendation': self._generate_recommendation(profile),
                'timestamp': datetime.utcnow().isoformat(),
            }
            profile.alerts.append(alert)

        # 5. 定期衰减(信任衰减模型)
        if profile.last_risk_assessment:
            hours_since = (datetime.utcnow() - profile.last_risk_assessment).total_seconds() / 3600
            profile.trust_score = max(0, profile.trust_score - self.trust_decay_rate * hours_since)

        profile.last_risk_assessment = datetime.utcnow()

        return alert

    def _detect_anomalies(
        self, profile: UserBehaviorProfile, event: BehaviorEvent
    ) -> List[Dict]:
        """检测行为异常"""
        anomalies = []

        # 1. 非正常工作时间访问
        if event.timestamp.hour not in profile.typical_hours:
            anomalies.append({
                'type': 'off_hours_access',
                'severity': self.ANOMALY_WEIGHTS['off_hours_access'],
                'detail': f"Access at hour {event.timestamp.hour} (typical: {profile.typical_hours[0]}-{profile.typical_hours[-1]})"
            })

        # 2. 新地理位置
        country = event.geo_location.get('country_code', '')
        if country and country not in profile.typical_locations:
            # 检查是否为不可能旅行
            if profile.access_history:
                last_event = profile.access_history[-1]
                time_diff = (event.timestamp - last_event.timestamp).total_seconds() / 3600
                if time_diff < 2:  # 2小时内跨国家
                    anomalies.append({
                        'type': 'impossible_travel',
                        'severity': self.ANOMALY_WEIGHTS['impossible_travel'],
                        'detail': f"Impossible travel: {last_event.geo_location} -> {event.geo_location} in {time_diff:.1f}h"
                    })
                else:
                    anomalies.append({
                        'type': 'new_location',
                        'severity': self.ANOMALY_WEIGHTS['new_location'],
                        'detail': f"New country: {country}"
                    })

        # 3. 不常访问的资源
        if event.resource not in profile.typical_resources:
            access_count = sum(
                1 for e in profile.access_history
                if e.resource == event.resource
            )
            if access_count < 3:
                anomalies.append({
                    'type': 'unusual_resource',
                    'severity': self.ANOMALY_WEIGHTS['unusual_resource'],
                    'detail': f"Rarely accessed resource: {event.resource}"
                })

        # 4. 数据导出异常
        if event.action == 'read' and event.metadata.get('bytes_out', 0) > 10_000_000:
            anomalies.append({
                'type': 'data_export_volume',
                'severity': self.ANOMALY_WEIGHTS['data_export_volume'],
                'detail': f"Large data export: {event.metadata['bytes_out']} bytes"
            })

        # 5. 管理员操作
        if event.event_type == 'admin_action':
            anomalies.append({
                'type': 'privilege_escalation',
                'severity': self.ANOMALY_WEIGHTS['privilege_escalation'],
                'detail': f"Admin action: {event.action} on {event.resource}"
            })

        # 6. 横向移动检测
        unique_resources = len(set(e.resource for e in profile.access_history[-50:]))
        if unique_resources > 20:
            anomalies.append({
                'type': 'lateral_movement',
                'severity': self.ANOMALY_WEIGHTS['lateral_movement'],
                'detail': f"Accessing {unique_resources} unique resources in last 50 events"
            })

        # 7. 访问频率突增
        recent_events = [
            e for e in profile.access_history
            if (datetime.utcnow() - e.timestamp).total_seconds() < 300
        ]
        if len(recent_events) > 50:
            anomalies.append({
                'type': 'access_frequency_spike',
                'severity': self.ANOMALY_WEIGHTS['access_frequency_spike'],
                'detail': f"{len(recent_events)} events in last 5 minutes"
            })

        return anomalies

    def _calculate_trust_change(self, anomalies: List[Dict]) -> float:
        """计算信任评分变化"""
        total_severity = sum(a['severity'] for a in anomalies)
        # 异常扣分,无异常时缓慢恢复
        if anomalies:
            return -total_severity
        else:
            return 2.0  # 每次正常操作恢复少量信任分

    def _score_to_level(self, score: float) -> str:
        if score >= 80:
            return "trusted"
        elif score >= 60:
            return "verified"
        elif score >= 40:
            return "suspicious"
        elif score >= 20:
            return "risky"
        return "compromised"

    def _generate_recommendation(self, profile: UserBehaviorProfile) -> str:
        """基于信任评分生成建议"""
        score = profile.trust_score
        if score >= 80:
            return "No action required"
        elif score >= 60:
            return "Consider additional MFA challenge"
        elif score >= 40:
            return "Require step-up authentication and session review"
        elif score >= 20:
            return "Block access pending security review"
        return "Immediately revoke session and investigate"

    def _get_or_create_profile(self, user_id: str) -> UserBehaviorProfile:
        if user_id not in self.user_profiles:
            self.user_profiles[user_id] = UserBehaviorProfile(user_id=user_id)
        return self.user_profiles[user_id]

实施路径:从零信任理念到落地
#

阶段 1:资产盘点 (Weeks 1-4)
├── 识别所有应用、数据和用户
├── 绘制现有网络拓扑
├── 分类数据敏感度
└── 输出:资产清单 + 数据流图

阶段 2:身份基础设施 (Weeks 5-10)
├── 部署统一身份提供商 (IdP)
├── 启用 MFA(所有用户)
├── 实施条件访问策略
├── 部署设备管理 (MDM/UEM)
└── 输出:身份认证体系 + MFA 100% 覆盖

阶段 3:微隔离试点 (Weeks 11-18)
├── 选择试点应用/服务
├── 部署服务网格 (Istio)
├── 实施默认拒绝策略
├── 建立白名单通信
└── 输出:试点微隔离环境

阶段 4:逐步扩展 (Weeks 19-30)
├── 逐步覆盖关键业务
├── 部署 eBPF 网络策略
├── 实施端到端 mTLS
├── 整合日志和监控
└── 输出:核心业务零信任化

阶段 5:持续优化 (Weeks 31+)
├── 行为分析与异常检测
├── 自适应策略调优
├── 自动化响应
└── 输出:智能零信任平台

结语
#

零信任不是一种产品,而是一种思维方式。它要求我们放弃对"内部网络"的信任幻想,将安全控制从网络层提升到身份层和应用层。实施零信任是一段旅程——从身份验证开始,逐步建立设备信任,部署微隔离,最终实现持续验证的自适应安全体系。

在这个过程中,最重要的不是选择哪个厂商的方案,而是坚持零信任的核心原则:永不信任,始终验证。只有这样,我们才能在不确定的威胁环境中,构建真正 resilient 的安全架构。