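Introduction: databases, the ultimate target of penetration attacks #
In every penetration test and security assessment, the database is the high-value target. User credentials, transaction records, trade secrets, and personal data all end up stored in one. According to the Verizon DBIR 2024 report, more than 70% of data breaches involve attacks on databases, and injection (category A03 in the OWASP Top 10 2021) remains one of the most common and most damaging vulnerability classes.
Database security goes well beyond SQL injection, however. Misconfiguration, default passwords, excessive privileges, plaintext storage, and missing audit trails are routinely overlooked, and together they often have a wider impact than any single vulnerability. This article walks through the engineering practice of database hardening, from configuration audits to injection defense and from encryption to monitoring, covering the full hardening workflow for the mainstream databases.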
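Database security audit checklist #
MySQL security configuration audit #
MySQL is the most widely used open source relational database, and its default configuration carries several security risks. The hardened configuration below addresses them: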
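Settings like the ones in this section can be checked mechanically instead of by eye. The following is a minimal sketch, assuming the configuration is a single file with a `[mysqld]` section and no `!include` directives; the `BASELINE` dictionary and the function names are illustrative, not part of any MySQL tooling.

```python
import configparser

# Hypothetical baseline: option name -> required value (both lower-cased).
BASELINE = {
    "require_secure_transport": "on",
    "local_infile": "0",
    "general_log": "off",
}


def normalize(name: str) -> str:
    """MySQL treats '-' and '_' in option names as equivalent."""
    return name.strip().lower().replace("-", "_")


def check_mysqld_options(cnf_text: str) -> list:
    """Return human-readable findings for the [mysqld] section."""
    parser = configparser.ConfigParser(
        allow_no_value=True,          # my.cnf allows bare flags without a value
        interpolation=None,           # '%' has no special meaning in my.cnf
        strict=False,                 # tolerate repeated options
        comment_prefixes=("#", ";"),
    )
    parser.read_string(cnf_text)
    if not parser.has_section("mysqld"):
        return ["no [mysqld] section found"]

    options = {
        normalize(key): (value or "").strip().lower()
        for key, value in parser.items("mysqld")
    }

    findings = []
    for name, required in BASELINE.items():
        actual = options.get(name)
        if actual is None:
            findings.append(f"{name} is not set (expected {required})")
        elif actual != required:
            findings.append(f"{name} = {actual} (expected {required})")

    # An unset bind_address means the server listens on all interfaces.
    if options.get("bind_address") in (None, "0.0.0.0", "*", "::"):
        findings.append("bind_address exposes the server on all interfaces")
    return findings


sample = """
[mysqld]
bind-address = 127.0.0.1
require_secure_transport = ON
local_infile = 1
"""
for finding in check_mysqld_options(sample):
    print(finding)
```

The sketch deliberately reports options that are missing from the file even when the server default happens to be safe, on the assumption that a hardened configuration should state its security settings explicitly.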
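# mysql_secure_configuration.cnf
# Hardened MySQL configuration (MySQL 8.0)
[mysqld]
# === Network ===
# Listen on the loopback interface only; this blocks every remote login, root included
bind-address = 127.0.0.1
# Default port; if the server must be reachable over the network, restrict access with a firewall
port = 3306
# Reject connections that do not use TLS
require_secure_transport = ON
# === Authentication ===
# Use a strong authentication plugin (already the default in 8.0; the option is deprecated since 8.0.27)
default_authentication_plugin = caching_sha2_password
# Password policy (requires the validate_password component to be installed)
validate_password.policy = STRONG
validate_password.length = 14
validate_password.mixed_case_count = 1
validate_password.number_count = 1
validate_password.special_char_count = 1
# === File operations ===
# Confine LOAD DATA and SELECT ... INTO OUTFILE to one directory (limits INTO OUTFILE attacks)
secure_file_priv = /var/lib/mysql-files/
# === Logging ===
# Audit log (audit_log.so ships with MySQL Enterprise Edition only)
plugin_load_add = audit_log.so
audit_log_policy = ALL
audit_log_format = JSON
# Keep the general query log off: it records full statements, including sensitive values
general_log = OFF
slow_query_log = ON
log_slow_admin_statements = ON
# === Privilege control ===
# Disable LOAD DATA LOCAL INFILE (prevents reading files from the client host)
local_infile = 0
# Cap the number of concurrent connections
max_connections = 200
# === Encryption ===
# Require TLS 1.2 or later
ssl-ca = /etc/mysql/certs/ca.pem
ssl-cert = /etc/mysql/certs/server-cert.pem
ssl-key = /etc/mysql/certs/server-key.pem
tls_version = TLSv1.2,TLSv1.3
PostgreSQL security configuration audit #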
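Access rules in `pg_hba.conf`, covered later in this section, are easy to get subtly wrong, so a small linter can help during review. The sketch below is an assumption-laden illustration: it expects addresses in CIDR form (one field, not the separate IP and netmask form), and the set of "weak" methods and the function name are choices made for this example.

```python
# Methods that either skip authentication or expose weak credentials on the wire.
WEAK_METHODS = {"trust", "password", "md5"}


def audit_pg_hba(text: str) -> list:
    """Return (line_number, message) tuples for risky pg_hba.conf rules."""
    findings = []
    for lineno, raw in enumerate(text.splitlines(), 1):
        line = raw.split("#", 1)[0].strip()   # drop comments and blank lines
        if not line:
            continue
        fields = line.split()
        conn_type = fields[0]
        if conn_type == "local":
            # local  DATABASE  USER  METHOD  [options]
            if len(fields) < 4:
                continue
            method = fields[3]
        else:
            # host*  DATABASE  USER  ADDRESS  METHOD  [options]
            if len(fields) < 5:
                continue
            method = fields[4]
        if method in WEAK_METHODS:
            findings.append((lineno, f"weak auth method '{method}'"))
        if conn_type == "host" and method != "reject":
            findings.append(
                (lineno, "'host' also matches non-TLS connections; prefer 'hostssl'")
            )
    return findings


sample = """
# TYPE   DATABASE  USER      ADDRESS       METHOD
local    all       all                     trust
hostssl  appdb     app_user  10.0.0.0/24   scram-sha-256
host     appdb     legacy    10.0.0.0/24   md5
host     all       all       0.0.0.0/0     reject
"""
for lineno, message in audit_pg_hba(sample):
    print(f"line {lineno}: {message}")
```

Because PostgreSQL applies the first matching rule, a real review would also need to consider rule order; this sketch only looks at each line in isolation.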
# postgresql.conf - hardened configuration
# === Network ===
listen_addresses = 'localhost'
port = 5432
ssl = on
ssl_cert_file = '/etc/postgresql/certs/server.crt'
ssl_key_file = '/etc/postgresql/certs/server.key'
ssl_ca_file = '/etc/postgresql/certs/ca.crt'
ssl_min_protocol_version = 'TLSv1.2'
# The shipped default 'HIGH:MEDIUM:+3DES:!aNULL' still admits medium-strength ciphers and 3DES
ssl_ciphers = 'HIGH:!aNULL'
# === Authentication ===
password_encryption = 'scram-sha-256'
# === Log destination ===
log_destination = 'stderr'
logging_collector = on
log_directory = 'log'
log_filename = 'postgresql-%Y-%m-%d.log'
# === Security logging ===
log_connections = on
log_disconnections = on
log_statement = 'ddl' # log every DDL statement
log_line_prefix = '%m [%p] %q%u@%d '
log_hostname = on # note: adds a reverse DNS lookup per connection
# === Preloaded libraries ===
# Load only the libraries that are actually needed
shared_preload_libraries = 'pg_stat_statements'
# Cap the number of concurrent connections
max_connections = 100
# pg_hba.conf - fine-grained access control
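# TYPE    DATABASE     USER          ADDRESS        METHOD
# Local administrative connections, restricted to one mapped user
local     postgres     dbadmin                      peer map=admin_map
# Application connections: TLS plus password required
hostssl   appdb        app_user      10.0.0.0/24    scram-sha-256
hostssl   analytics    analytics_ro  10.0.1.0/24    scram-sha-256
# Replication connections
hostssl   replication  replicator    10.0.0.10/32   scram-sha-256
# Reject everything else (IPv4 and IPv6)
host      all          all           0.0.0.0/0      reject
host      all          all           ::/0           reject
MongoDB security configuration audit #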
# mongod.conf - hardened configuration
# A default MongoDB install does not enable authentication, which is a serious default weakness
# === Network ===
net:
  port: 27017
  bindIp: 127.0.0.1                    # listen on loopback only
  maxIncomingConnections: 200
  wireObjectCheck: true                # reject malformed BSON
  tls:
    mode: requireTLS
    certificateKeyFile: /etc/mongo/certs/mongodb.pem
    CAFile: /etc/mongo/certs/ca.pem
    disabledProtocols: TLS1_0,TLS1_1   # leaves TLS 1.2 and later enabled
# === Security ===
security:
  authorization: enabled               # role-based access control
  javascriptEnabled: false             # disable server-side JavaScript ($where)
  clusterAuthMode: x509                # replica set member authentication
setParameter:
  enableLocalhostAuthBypass: false     # disable the localhost authentication bypass
# === Auditing (MongoDB Enterprise only) ===
auditLog:
  destination: file
  format: BSON
  path: /var/log/mongodb/audit.bson
  filter: '{ atype: { $in: [ "authCheck", "createCollection", "dropCollection" ] } }'
# === Profiling ===
operationProfiling:
  mode: slowOp
  slowOpThresholdMs: 100
  slowOpSampleRate: 1.0
SQL injection: from basics to advanced attack techniques #
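The root cause of SQL injection #
SQL injection happens when user input is concatenated directly into the SQL string, which lets an attacker change the meaning of the query:
# Vulnerable code: never do this
def get_user(username):
    query = f"SELECT * FROM users WHERE username = '{username}'"
    cursor.execute(query)
    return cursor.fetchone()
# Attacker input:
#   username = "' OR '1'='1' --"
# Resulting SQL:
#   SELECT * FROM users WHERE username = '' OR '1'='1' --'
# Result: the WHERE clause now matches every row in users
Advanced SQL injection techniques #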
1. Time-based blind injection #
This technique applies when the application does not return query results but its response time can be observed:
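Time-based extraction typically bisects on character codes so that each character costs a handful of requests rather than one per candidate. The sketch below simulates that logic entirely offline: `oracle` stands in for an HTTP request whose query sleeps when a condition is true, and `SECRET`, `DELAY`, and all function names are hypothetical values chosen for illustration.

```python
import time

SECRET = "appdb"   # stand-in for the value of current_database()
DELAY = 0.02       # simulated sleep duration in seconds


def oracle(position: int, threshold: int) -> None:
    """Simulate a request whose query sleeps when ascii(char) > threshold."""
    if ord(SECRET[position]) > threshold:
        time.sleep(DELAY)


def is_delayed(position: int, threshold: int) -> bool:
    """Time one simulated request and report whether the delay fired."""
    start = time.perf_counter()
    oracle(position, threshold)
    return time.perf_counter() - start >= DELAY / 2


def recover_char(position: int) -> str:
    """Binary search over ASCII codes 0..127 using only the timing signal."""
    low, high = 0, 127
    while low < high:
        mid = (low + high) // 2
        if is_delayed(position, mid):
            low = mid + 1      # character code is greater than mid
        else:
            high = mid         # character code is mid or smaller
    return chr(low)


def recover(length: int) -> str:
    return "".join(recover_char(i) for i in range(length))


print(recover(len(SECRET)))
```

With a 7-bit range, each character needs seven timed requests. From the defender's side this is why statement timeouts, rate limiting, and alerting on bursts of slow, near-identical requests are useful complements to parameterized queries.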
-- Fingerprint the database engine
' AND (SELECT CASE WHEN (SELECT COUNT(*) FROM pg_tables) > 0 THEN
pg_sleep(5) ELSE pg_sleep(0) END) --
-- Brute-force the database name one character at a time
' AND (SELECT CASE WHEN
substr((SELECT current_database()), 1, 1) = 'p'
THEN pg_sleep(5) ELSE pg_sleep(0) END) --
-- Use a conditional delay to binary-search each character (far fewer requests)
' AND (SELECT CASE WHEN
ascii(substr((SELECT current_database()), 1, 1)) > 100
THEN pg_sleep(5) ELSE pg_sleep(0) END) --
2. Second-order SQL injection #
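The malicious data is stored safely first and only reaches a vulnerable query later:
# Phase 1: the payload is stored (at registration)
# The username is handled "safely" on the way in
def register_user(username, email):
    # A parameterized query, so this step looks safe
    cursor.execute(
        "INSERT INTO users (username, email) VALUES (%s, %s)",
        (username, email)
    )
# Username registered by the attacker: admin'--
# Value stored in the table:           admin'--
# Phase 2: the stored value is reused by unsafe internal logic
def change_password(username, new_password):
    # The username is read back from the database and concatenated directly
    # (get_stored_username is a placeholder helper for this example)
    stored_username = get_stored_username(username)
    query = f"UPDATE users SET password = '{new_password}' WHERE username = '{stored_username}'"
    cursor.execute(query)
# Statement actually executed:
#   UPDATE users SET password = 'hacked' WHERE username = 'admin'--'
# Result: the administrator's password is overwritten
3. Polyglot injection #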
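Payloads written to work across more than one database engine:
-- MySQL + PostgreSQL + SQLite (requires an interface that accepts stacked queries)
'; SELECT CASE WHEN (SELECT 1=1) THEN 'true' ELSE 'false' END; --
-- SQL Server only: WAITFOR DELAY is T-SQL and is not valid in MySQL
'; IF (1=1) WAITFOR DELAY '0:0:5' --
-- Version extraction attempt. @@version exists in MySQL and SQL Server, version() in MySQL
-- and PostgreSQL, so this statement parses on MySQL but fails on engines that lack either one
' UNION SELECT NULL,
CASE WHEN (SELECT @@version) IS NOT NULL
THEN (SELECT @@version)
ELSE (SELECT version()) END --
Automated SQL injection detection script #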
#!/usr/bin/env python3
"""
Automated SQL injection detection tool.
Detects and verifies several injection types.
"""
import re
import time
import requests
import argparse
import logging
from typing import Dict, List, Optional
from urllib.parse import urlencode, urlparse, parse_qs
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s [%(levelname)s] %(message)s'
)
logger = logging.getLogger('sqli_detector')
class SQLInjectionDetector:
    """SQL injection detection engine."""
    # Signatures of common database error messages
    ERROR_PATTERNS = {
        'MySQL': [
            r'You have an error in your SQL syntax',
            r'Warning.*mysql_',
            r'check the manual that corresponds to your MySQL',
            r'Unknown column.*in .+field list',
            r'Column count doesn\'t match value count',
        ],
        'PostgreSQL': [
            r'ERROR:.*parse error at or near',
            r'ERROR:.*syntax error at or near',
            r'query failed:.*ERROR',
            r'pg_query.*\[\]:',
            r'Warning:.*pg_.*\(\)',
        ],
        'SQL Server': [
            r'SqlException:',
            r'Microsoft OLE DB Provider',
            r'Incorrect syntax near',
            r'Syntax error in string in query expression',
            r'Unclosed quotation mark',
        ],
        'SQLite': [
            r'SQLITE_ERROR:',
            r'near.*syntax error',
            r'SQLite3::SQLException',
            r'SQL error or missing database',
        ],
        'Oracle': [
            r'ORA-[0-9]{5}:',
            r'Oracle.*Driver',
            r'Warning:.*oci_',
        ],
    }
    # Reference list of blind-injection payloads (the test methods below define their own)
    BLIND_PAYLOADS = [
        # Boolean-based
        {"payload": "' AND '1'='1", "expect": "match"},
        {"payload": "' AND '1'='2", "expect": "diff"},
        {"payload": "' OR 1=1--", "expect": "match"},
        {"payload": "' AND 1=2--", "expect": "diff"},
        # Time-based
        {"payload": "' OR SLEEP(5)--", "time": 5},
        {"payload": "'; WAITFOR DELAY '0:0:5'--", "time": 5},
        {"payload": "' OR pg_sleep(5)--", "time": 5},
        # UNION-based
        {"payload": "' UNION SELECT NULL--", "error": True},
        {"payload": "' UNION SELECT NULL,NULL--", "error": True},
        {"payload": "' UNION SELECT NULL,NULL,NULL--", "error": True},
    ]

    def __init__(self, timeout: int = 30, delay_threshold: float = 4.0):
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'SecurityScanner/1.0',
        })
        self.timeout = timeout
        self.delay_threshold = delay_threshold
        self.findings: List[Dict] = []

    def detect_sqli_in_parameter(
        self,
        url: str,
        param_name: str,
        param_value: str,
        method: str = 'GET'
    ) -> List[Dict]:
        """Test a single parameter for SQL injection."""
        findings = []
        logger.info(f"Testing parameter: {param_name} in {url}")
        # 1. Error-based detection
        error_result = self._test_error_based(url, param_name, param_value, method)
        if error_result:
            findings.append(error_result)
        # 2. Boolean-based blind detection
        boolean_result = self._test_boolean_based(url, param_name, param_value, method)
        if boolean_result:
            findings.append(boolean_result)
        # 3. Time-based blind detection
        time_result = self._test_time_based(url, param_name, param_value, method)
        if time_result:
            findings.append(time_result)
        # 4. UNION-based detection
        union_result = self._test_union_based(url, param_name, param_value, method)
        if union_result:
            findings.append(union_result)
        return findings

    def _test_error_based(
        self, url: str, param: str, value: str, method: str
    ) -> Optional[Dict]:
        """Error-based injection detection."""
        payloads = ["'", '"', "''", "' OR '1'='1'", "1' ORDER BY 1--"]
        for payload in payloads:
            test_value = f"{value}{payload}"
            response = self._make_request(url, param, test_value, method)
            if not response:
                continue
            for db_name, patterns in self.ERROR_PATTERNS.items():
                for pattern in patterns:
                    if re.search(pattern, response.text, re.IGNORECASE):
                        return {
                            'type': 'Error-Based SQL Injection',
                            'database': db_name,
                            'parameter': param,
                            'payload': payload,
                            'url': url,
                            'evidence': pattern,
                            'severity': 'CRITICAL',
                        }
        return None

    def _test_boolean_based(
        self, url: str, param: str, value: str, method: str
    ) -> Optional[Dict]:
        """Boolean-based blind injection detection."""
        # Fetch a baseline response for comparison
        baseline = self._make_request(url, param, value, method)
        if not baseline:
            return None
        baseline_len = len(baseline.text)
        # Payloads that should evaluate to TRUE and FALSE respectively
        true_payloads = [
            "' AND '1'='1",
            "' AND 1=1--",
            "1' AND '1'='1",
        ]
        false_payloads = [
            "' AND '1'='2",
            "' AND 1=2--",
            "1' AND '1'='0",
        ]
        true_responses = []
        false_responses = []
        for payload in true_payloads:
            resp = self._make_request(url, param, f"{value}{payload}", method)
            if resp:
                true_responses.append(len(resp.text))
        for payload in false_payloads:
            resp = self._make_request(url, param, f"{value}{payload}", method)
            if resp:
                false_responses.append(len(resp.text))
        # Compare average response lengths
        if true_responses and false_responses:
            true_avg = sum(true_responses) / len(true_responses)
            false_avg = sum(false_responses) / len(false_responses)
            if abs(true_avg - false_avg) > baseline_len * 0.1:
                # TRUE and FALSE conditions produced clearly different responses
                return {
                    'type': 'Boolean-Based Blind SQL Injection',
                    'parameter': param,
                    'url': url,
                    'baseline_length': baseline_len,
                    'true_avg_length': true_avg,
                    'false_avg_length': false_avg,
                    'severity': 'HIGH',
                }
        return None

    def _test_time_based(
        self, url: str, param: str, value: str, method: str
    ) -> Optional[Dict]:
        """Time-based blind injection detection."""
        time_payloads = [
            {"payload": "' OR SLEEP(5)--", "db": "MySQL"},
            {"payload": "'; WAITFOR DELAY '0:0:5'--", "db": "SQL Server"},
            {"payload": "' OR pg_sleep(5)--", "db": "PostgreSQL"},
            {"payload": "' || dbms_pipe.receive_message(('a'),5) FROM dual--", "db": "Oracle"},
        ]
        # Time an unmodified request first so a slow server is not mistaken for a hit
        start_time = time.time()
        if not self._make_request(url, param, value, method):
            return None
        baseline_elapsed = time.time() - start_time
        for item in time_payloads:
            payload = item['payload']
            db = item['db']
            # Send the delay payload
            start_time = time.time()
            response = self._make_request(
                url, param, f"{value}{payload}", method
            )
            elapsed = time.time() - start_time
            if response and elapsed - baseline_elapsed >= self.delay_threshold:
                return {
                    'type': 'Time-Based Blind SQL Injection',
                    'database': db,
                    'parameter': param,
                    'payload': payload,
                    'url': url,
                    'response_time': round(elapsed, 2),
                    'severity': 'CRITICAL',
                }
        return None

    def _test_union_based(
        self, url: str, param: str, value: str, method: str
    ) -> Optional[Dict]:
        """UNION-based injection detection."""
        baseline = self._make_request(url, param, value, method)
        if not baseline:
            return None
        # Try increasing column counts
        for columns_tested in range(1, 20):
            columns = ','.join(['NULL'] * columns_tested)
            payload = f"' UNION SELECT {columns}--"
            response = self._make_request(url, param, f"{value}{payload}", method)
            if not response:
                continue
            # Signs that the UNION reached the database
            # 1. The error message changed
            for db_name, patterns in self.ERROR_PATTERNS.items():
                for pattern in patterns:
                    if re.search(pattern, response.text, re.IGNORECASE):
                        # The error moved from a syntax error to a column-count mismatch
                        if 'column' in response.text.lower() or 'match' in response.text.lower():
                            return {
                                'type': 'UNION-Based SQL Injection',
                                'database': db_name,
                                'parameter': param,
                                'payload': payload,
                                'url': url,
                                'columns_tested': columns_tested,
                                'severity': 'CRITICAL',
                            }
            # 2. The response body changed significantly
            if abs(len(response.text) - len(baseline.text)) > len(baseline.text) * 0.3:
                return {
                    'type': 'Potential UNION-Based SQL Injection',
                    'parameter': param,
                    'payload': payload,
                    'url': url,
                    'response_change': f"{len(response.text)} vs {len(baseline.text)}",
                    'severity': 'HIGH',
                }
        return None

    def _make_request(
        self, url: str, param: str, value: str, method: str
    ) -> Optional[requests.Response]:
        """Send one HTTP request with the given parameter value."""
        try:
            if method.upper() == 'GET':
                return self.session.get(
                    url,
                    params={param: value},
                    timeout=self.timeout,
                    allow_redirects=False,
                )
            else:
                return self.session.post(
                    url,
                    data={param: value},
                    timeout=self.timeout,
                    allow_redirects=False,
                )
        except requests.exceptions.Timeout:
            logger.warning(f"Request timeout for {url}")
            return None
        except requests.exceptions.RequestException as e:
            logger.error(f"Request failed: {e}")
            return None

    def scan_url(self, url: str, method: str = 'GET') -> List[Dict]:
        """Scan every query-string parameter of a URL for SQL injection."""
        findings = []
        parsed = urlparse(url)
        params = parse_qs(parsed.query, keep_blank_values=True)
        if not params:
            logger.warning(f"No parameters found in URL: {url}")
            return findings
        for param_name, param_values in params.items():
            param_value = param_values[0] if param_values else ''
            # Rebuild the URL without the parameter under test so it is not sent twice
            other_params = {k: v for k, v in params.items() if k != param_name}
            target_url = parsed._replace(
                query=urlencode(other_params, doseq=True)
            ).geturl()
            param_findings = self.detect_sqli_in_parameter(
                target_url, param_name, param_value, method
            )
            findings.extend(param_findings)
        self.findings = findings
        return findings


# Usage example
if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='SQL Injection Detection Tool')
    parser.add_argument('-u', '--url', required=True, help='Target URL')
    parser.add_argument('-m', '--method', default='GET', choices=['GET', 'POST'])
    args = parser.parse_args()
    detector = SQLInjectionDetector()
    results = detector.scan_url(args.url, args.method)
    if results:
        print(f"\n{'='*60}")
        print("SQL Injection Vulnerabilities Detected!")
        print(f"{'='*60}")
        for i, finding in enumerate(results, 1):
            print(f"\n[{i}] {finding['type']}")
            print(f"    URL: {finding['url']}")
            print(f"    Parameter: {finding['parameter']}")
            print(f"    Severity: {finding['severity']}")
            if 'payload' in finding:
                print(f"    Payload: {finding['payload']}")
            if 'database' in finding:
                print(f"    Database: {finding['database']}")
    else:
        print("\nNo SQL injection vulnerabilities detected.")
Parameterized queries: the only reliable defense #
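The difference between concatenation and binding can be seen end to end without a database server. The following is a minimal sketch using Python's built-in `sqlite3` module and an in-memory table; the table contents and function names are made up for the demonstration, and note that `sqlite3` uses `?` placeholders where psycopg2 and pymysql use `%s`.

```python
import sqlite3


def make_db() -> sqlite3.Connection:
    """Create an in-memory database with two sample users."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, username TEXT)")
    conn.executemany(
        "INSERT INTO users (username) VALUES (?)", [("alice",), ("bob",)]
    )
    return conn


def find_unsafe(conn: sqlite3.Connection, username: str) -> list:
    # Vulnerable: the input becomes part of the SQL text itself.
    query = f"SELECT username FROM users WHERE username = '{username}'"
    return conn.execute(query).fetchall()


def find_safe(conn: sqlite3.Connection, username: str) -> list:
    # Safe: the input is bound as a value and never parsed as SQL.
    return conn.execute(
        "SELECT username FROM users WHERE username = ?", (username,)
    ).fetchall()


conn = make_db()
payload = "' OR '1'='1' --"
unsafe_rows = find_unsafe(conn, payload)   # the injected OR matches every row
safe_rows = find_safe(conn, payload)       # no user has this literal name
print(len(unsafe_rows), len(safe_rows))
```

The same payload returns both rows through the concatenated query and nothing through the bound one, because in the second case the whole string is compared as a literal username.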
# === Defense: parameterized queries (prepared statements) ===
import os
from typing import Optional
# --- Python / psycopg2 (PostgreSQL) ---
import psycopg2
def get_user_safe(dsn: str, username: str) -> Optional[tuple]:
    conn = psycopg2.connect(dsn)
    try:
        with conn.cursor() as cursor:
            # %s is a placeholder; the driver quotes and escapes the value
            cursor.execute(
                "SELECT id, username, email FROM users WHERE username = %s",
                (username,)  # parameters are passed as a tuple
            )
            return cursor.fetchone()
    finally:
        conn.close()
# --- Python / pymysql (MySQL) ---
import pymysql
def get_user_mysql_safe(username: str) -> Optional[dict]:
    # DB_PASSWORD is an illustrative environment variable; avoid hard-coded credentials
    conn = pymysql.connect(
        host='db', user='app', password=os.environ['DB_PASSWORD'], database='appdb'
    )
    try:
        with conn.cursor(pymysql.cursors.DictCursor) as cursor:
            cursor.execute(
                "SELECT id, username, email FROM users WHERE username = %s",
                (username,)
            )
            return cursor.fetchone()
    finally:
        conn.close()
# --- Python / SQLAlchemy Core (text() with bound parameters) ---
from sqlalchemy import text
def get_user_sqlalchemy_safe(username: str, engine):
    with engine.connect() as conn:
        result = conn.execute(
            text("SELECT id, username, email FROM users WHERE username = :username"),
            {"username": username}
        )
        return result.mappings().first()
# --- What about dynamic column names? ---
# Placeholders cannot stand in for identifiers (table or column names).
# The correct approach is allowlist validation.
ALLOWED_COLUMNS = {'id', 'username', 'email', 'created_at', 'last_login'}
def sort_users_safe(sort_column: str, engine) -> list:
    if sort_column not in ALLOWED_COLUMNS:
        raise ValueError(f"Invalid sort column: {sort_column}")
    # After the allowlist check the identifier is safe to interpolate
    query = text(f"SELECT id, username, email FROM users ORDER BY {sort_column}")
    # Any values in the query must still be passed as bound parameters
    with engine.connect() as conn:
        return conn.execute(query).mappings().all()
Database encryption in practice #
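MySQL TDE (transparent data encryption) configuration #
-- MySQL TDE configuration
-- Encrypts InnoDB tablespaces and the redo/undo logs
-- 1. Load a keyring. In MySQL 8.0.24 and later, component_keyring_file is enabled through a
--    manifest file (mysqld.my) read at server startup, not through INSTALL COMPONENT.
-- 2. Create an encrypted table
CREATE TABLE encrypted_users (
    id INT PRIMARY KEY,
    username VARCHAR(255),
    email VARCHAR(255),
    ssn VARCHAR(11)
) ENGINE=InnoDB ENCRYPTION='Y';
-- 3. Encrypt an existing table
ALTER TABLE users ENCRYPTION='Y';
-- 4. Verify encryption status
SELECT
    TABLE_SCHEMA,
    TABLE_NAME,
    CREATE_OPTIONS
FROM information_schema.TABLES
WHERE CREATE_OPTIONS LIKE '%ENCRYPTION%';
-- 5. Enable and check redo/undo log encryption
SET PERSIST innodb_redo_log_encrypt = ON;
SET PERSIST innodb_undo_log_encrypt = ON;
SHOW VARIABLES LIKE 'innodb_%_log_encrypt';
-- 6. Confirm that the keyring component is active
SELECT * FROM performance_schema.keyring_component_status;
PostgreSQL encryption options #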
-- PostgreSQL data encryption
-- Column-level encryption with the pgcrypto extension
-- 1. Install the extension
CREATE EXTENSION IF NOT EXISTS pgcrypto;
-- 2. Define the sensitive columns
CREATE TABLE secure_users (
    id SERIAL PRIMARY KEY,
    username VARCHAR(255),
    email BYTEA,                 -- stored encrypted
    ssn BYTEA,                   -- stored encrypted
    password_hash VARCHAR(255)   -- bcrypt hash
);
-- 3. Insert encrypted data (the literal key is for demonstration only)
INSERT INTO secure_users (username, email, ssn, password_hash)
VALUES (
    'testuser',
    pgp_sym_encrypt('test@example.com', 'encryption_key_2024'),
    pgp_sym_encrypt('123-45-6789', 'encryption_key_2024'),
    crypt('P@ssw0rd!', gen_salt('bf', 12))   -- bcrypt hash
);
-- 4. Decrypt on read
SELECT
    username,
    pgp_sym_decrypt(email, 'encryption_key_2024') AS email,
    pgp_sym_decrypt(ssn, 'encryption_key_2024') AS ssn
FROM secure_users
WHERE username = 'testuser';
-- 5. Better practice: manage the key in the application layer.
-- Load it from an environment variable or a KMS and pass it as a bound parameter;
-- a key written into SQL text also ends up in statement logs.
MongoDB encryption configuration #
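# MongoDB encryption at rest (encrypted storage engine, MongoDB Enterprise only)
# Configured in mongod.conf
security:
  enableEncryption: true
  encryptionKeyFile: /etc/mongo/encryption/keyfile
# The key file must hold a single base64-encoded 32-byte key:
# openssl rand -base64 32 > /etc/mongo/encryption/keyfile
# chmod 600 /etc/mongo/encryption/keyfile
# chown mongodb:mongodb /etc/mongo/encryption/keyfile
# A local key file suits testing; production deployments should use a KMIP key manager.
Database activity monitoring and auditing #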
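Audit logs only help if something reads them. As a sketch of what downstream processing might look like for the pgaudit configuration in this section, the code below parses pgaudit entries and counts accesses to watched tables. It assumes pgaudit's documented comma-separated entry layout (audit type, statement ID, substatement ID, class, command, object type, object name, statement, parameter) and single-line statements; the sample log lines and the function names are invented for illustration.

```python
import csv
from collections import Counter

# Field order of a pgaudit entry, following the "AUDIT: " marker.
FIELDS = [
    "audit_type", "statement_id", "substatement_id", "class",
    "command", "object_type", "object_name", "statement", "parameter",
]


def parse_pgaudit_line(line: str):
    """Return a dict for a pgaudit entry, or None for any other log line."""
    marker = "AUDIT: "
    idx = line.find(marker)
    if idx == -1:
        return None
    # The csv module handles quoted statements that contain commas.
    row = next(csv.reader([line[idx + len(marker):]]))
    if len(row) < len(FIELDS):
        return None
    return dict(zip(FIELDS, row))


def summarize(lines, watched_tables) -> Counter:
    """Count audit entries per (table, statement class) for watched tables."""
    counts = Counter()
    for line in lines:
        entry = parse_pgaudit_line(line)
        if entry and entry["object_name"] in watched_tables:
            counts[(entry["object_name"], entry["class"])] += 1
    return counts


# Made-up sample lines in the shape produced with log_line_prefix '%m [%p] %q%u@%d '.
sample_log = [
    '2024-05-01 10:00:00 UTC [123] app_user@appdb LOG:  AUDIT: SESSION,1,1,READ,SELECT,TABLE,public.users,"select * from users where id = $1",1',
    '2024-05-01 10:00:01 UTC [123] app_user@appdb LOG:  AUDIT: SESSION,2,1,WRITE,UPDATE,TABLE,public.users,"update users set email = $1 where id = $2","a@example.com,7"',
    '2024-05-01 10:00:02 UTC [124] app_user@appdb LOG:  connection authorized: user=app_user database=appdb',
    '2024-05-01 10:00:03 UTC [125] dbadmin@appdb LOG:  AUDIT: SESSION,3,1,DDL,CREATE TABLE,TABLE,public.tmp,create table tmp (id int),<not logged>',
]
for key, count in sorted(summarize(sample_log, {"public.users"}).items()):
    print(key, count)
```

In practice a log shipper such as the Fluentd container used later in this article would do the collection, and multi-line statements would need the CSV log format or a more careful parser.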
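Audit log configuration #
-- Fine-grained auditing for PostgreSQL with the pgaudit extension
# In postgresql.conf (keep any libraries that are already preloaded)
shared_preload_libraries = 'pg_stat_statements,pgaudit'
pgaudit.log = 'read, write, ddl, role'
pgaudit.log_catalog = off
pgaudit.log_level = log        # 'info' entries are dropped from the server log at the default log_min_messages
pgaudit.log_parameter = on     # record statement parameters (may capture sensitive values)
pgaudit.log_relation = on      # one entry per relation referenced
pgaudit.role = 'auditor'       # role that drives object-level auditing
-- In SQL, after restarting the server
CREATE EXTENSION IF NOT EXISTS pgaudit;
-- Audit all access to a specific table: pgaudit logs every statement that uses
-- a privilege granted to the audit role (a row-level security policy does not do this)
CREATE ROLE auditor;
GRANT SELECT, INSERT, UPDATE, DELETE ON users TO auditor;
-- Let the audit role read the monitoring views
GRANT pg_read_all_stats TO auditor;
Python database security audit script #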
#!/usr/bin/env python3
"""
Database security audit script.
Checks configuration, privileges, encryption, and audit-logging status.
"""
import psycopg2
from typing import Dict, List
from datetime import datetime, timezone


class DatabaseSecurityAuditor:
    """Database security auditor."""

    def __init__(self, db_type: str = 'postgresql'):
        self.db_type = db_type
        self.findings: List[Dict] = []

    def audit_postgresql(self, conn_string: str) -> Dict:
        """Audit the security posture of a PostgreSQL database."""
        results = {
            'timestamp': datetime.now(timezone.utc).isoformat(),
            'database': 'PostgreSQL',
            'categories': {}
        }
        try:
            conn = psycopg2.connect(conn_string)
            conn.autocommit = True
            cursor = conn.cursor()
            # === 1. Password policy ===
            results['categories']['password_policy'] = self._audit_pg_passwords(cursor)
            # === 2. Privileges ===
            results['categories']['permissions'] = self._audit_pg_permissions(cursor)
            # === 3. Configuration ===
            results['categories']['configuration'] = self._audit_pg_config(cursor)
            # === 4. Audit logging ===
            results['categories']['audit_logging'] = self._audit_pg_logging(cursor)
            # === 5. Encryption ===
            results['categories']['encryption'] = self._audit_pg_encryption(cursor)
            cursor.close()
            conn.close()
        except Exception as e:
            results['error'] = str(e)
        return results

    def _audit_pg_passwords(self, cursor) -> Dict:
        """Audit password hashing and expiry (reading pg_authid requires superuser)."""
        findings = []
        # Check how each login role's password is stored
        cursor.execute("""
            SELECT rolname,
                   CASE WHEN rolpassword IS NULL THEN 'NO_PASSWORD'
                        WHEN rolpassword LIKE 'SCRAM-SHA-256%' THEN 'SCRAM-SHA-256'
                        ELSE 'MD5'
                   END as auth_method,
                   rolvaliduntil
            FROM pg_authid
            WHERE rolcanlogin = true
        """)
        users = cursor.fetchall()
        weak_password_count = 0
        for user in users:
            username, auth_method, valid_until = user
            if auth_method == 'MD5':
                findings.append({
                    'severity': 'HIGH',
                    'issue': f'User "{username}" uses MD5 password hashing',
                    'recommendation': 'ALTER USER ... PASSWORD with SCRAM-SHA-256'
                })
                weak_password_count += 1
            if not valid_until:
                findings.append({
                    'severity': 'MEDIUM',
                    'issue': f'User "{username}" has no password expiration',
                    'recommendation': 'Set VALID UNTIL for all user accounts'
                })
        return {
            'total_users': len(users),
            'weak_password_count': weak_password_count,
            'findings': findings
        }

    def _audit_pg_permissions(self, cursor) -> Dict:
        """Audit privilege configuration."""
        findings = []
        # Check for superusers
        cursor.execute("""
            SELECT rolname FROM pg_roles WHERE rolsuper = true
        """)
        superusers = cursor.fetchall()
        if len(superusers) > 1:
            findings.append({
                'severity': 'HIGH',
                'issue': f'Multiple superusers detected: {[s[0] for s in superusers]}',
                'recommendation': 'Only postgres should be superuser'
            })
        # Check write privileges granted to PUBLIC
        cursor.execute("""
            SELECT table_schema, table_name, grantee, privilege_type
            FROM information_schema.table_privileges
            WHERE grantee = 'PUBLIC'
              AND privilege_type IN ('INSERT', 'UPDATE', 'DELETE')
        """)
        public_grants = cursor.fetchall()
        if public_grants:
            findings.append({
                'severity': 'CRITICAL',
                'issue': f'Public users have write permissions on {len(public_grants)} tables',
                'tables': [f"{g[0]}.{g[1]}" for g in public_grants],
                'recommendation': 'REVOKE ALL ON ... FROM PUBLIC'
            })
        # Check default privileges; a PUBLIC grant appears as an ACL entry with an empty grantee
        cursor.execute("""
            SELECT defaclnamespace::regnamespace::text, defaclobjtype,
                   defaclacl::text
            FROM pg_default_acl
            WHERE defaclacl::text ~ '[{,]"?='
        """)
        default_acls = cursor.fetchall()
        if default_acls:
            findings.append({
                'severity': 'MEDIUM',
                'issue': f'{len(default_acls)} default ACLs grant privileges to PUBLIC',
                'recommendation': 'ALTER DEFAULT PRIVILEGES ... REVOKE ... FROM PUBLIC'
            })
        return {
            'superusers': [s[0] for s in superusers],
            'findings': findings
        }

    def _audit_pg_config(self, cursor) -> Dict:
        """Audit server configuration."""
        findings = []
        # setting -> (acceptable values, description); None means "review manually"
        critical_settings = {
            'ssl': ({'on'}, 'SSL should be enabled'),
            'password_encryption': ({'scram-sha-256'}, 'Use SCRAM-SHA-256'),
            'log_statement': ({'ddl', 'mod', 'all'}, 'Log at least DDL statements'),
            'log_connections': ({'on'}, 'Log connection attempts'),
            'log_disconnections': ({'on'}, 'Log disconnection events'),
            'max_connections': (None, 'Review connection limits'),
        }
        cursor.execute("SHOW ALL;")
        all_settings = {row[0]: row[1] for row in cursor.fetchall()}
        for setting, (accepted, desc) in critical_settings.items():
            actual = all_settings.get(setting, 'UNKNOWN')
            if accepted and actual not in accepted:
                findings.append({
                    # Logging gaps rank lower than missing TLS or weak password hashing
                    'severity': 'LOW' if setting.startswith('log') else 'MEDIUM',
                    'setting': setting,
                    'expected': ' or '.join(sorted(accepted)),
                    'actual': actual,
                    'description': desc
                })
        return {'findings': findings, 'total_issues': len(findings)}

    def _audit_pg_logging(self, cursor) -> Dict:
        """Audit logging configuration."""
        findings = []
        cursor.execute("""
            SELECT name, setting FROM pg_settings
            WHERE name IN (
                'logging_collector',
                'log_connections',
                'log_disconnections',
                'log_statement',
                'log_hostname',
                'pgaudit.log'
            )
        """)
        settings = dict(cursor.fetchall())
        if settings.get('logging_collector') != 'on':
            findings.append({
                'severity': 'HIGH',
                'issue': 'Log collector is not enabled',
                'recommendation': 'Set logging_collector = on'
            })
        if settings.get('log_statement', 'none') == 'none':
            findings.append({
                'severity': 'MEDIUM',
                'issue': 'No SQL statements are being logged',
                'recommendation': 'Set log_statement to at least "ddl"'
            })
        # pgaudit.log only appears in pg_settings when the library is loaded
        if 'pgaudit.log' not in settings:
            findings.append({
                'severity': 'LOW',
                'issue': 'pgaudit extension not configured',
                'recommendation': 'Install and configure pgaudit for detailed audit logging'
            })
        return {'findings': findings}

    def _audit_pg_encryption(self, cursor) -> Dict:
        """Audit encryption configuration."""
        findings = []
        # Check for pgcrypto
        cursor.execute("""
            SELECT extname FROM pg_extension WHERE extname = 'pgcrypto'
        """)
        has_pgcrypto = cursor.fetchone() is not None
        if not has_pgcrypto:
            findings.append({
                'severity': 'LOW',
                'issue': 'pgcrypto extension not installed',
                'recommendation': 'CREATE EXTENSION pgcrypto for column-level encryption'
            })
        # Check whether the current connection uses SSL
        cursor.execute("""
            SELECT ssl FROM pg_stat_ssl WHERE pid = pg_backend_pid()
        """)
        ssl_status = cursor.fetchone()
        if ssl_status and not ssl_status[0]:
            findings.append({
                'severity': 'HIGH',
                'issue': 'Current connection is not using SSL',
                'recommendation': 'Configure ssl = on in postgresql.conf'
            })
        return {'findings': findings, 'has_pgcrypto': has_pgcrypto}


def run_audit():
    """Run the database security audit."""
    print("=" * 60)
    print("Database Security Audit")
    print("=" * 60)
    # PostgreSQL audit
    pg_auditor = DatabaseSecurityAuditor('postgresql')
    # Placeholder credentials: load the DSN from the environment or a secret store in practice
    results = pg_auditor.audit_postgresql(
        "host=localhost dbname=appdb user=auditor password=audit_pass"
    )
    if 'error' in results:
        print(f"\nAudit aborted: {results['error']}")
    # Tally the findings
    total_findings = 0
    severity_counts = {'CRITICAL': 0, 'HIGH': 0, 'MEDIUM': 0, 'LOW': 0}
    for category, data in results.get('categories', {}).items():
        category_findings = data.get('findings', [])
        total_findings += len(category_findings)
        for finding in category_findings:
            sev = finding.get('severity', 'LOW')
            severity_counts[sev] = severity_counts.get(sev, 0) + 1
    print(f"\nTotal findings: {total_findings}")
    print("\nBy severity:")
    for sev, count in severity_counts.items():
        if count > 0:
            print(f"  [{sev}] {count}")
    return results


if __name__ == '__main__':
    run_audit()
Docker Compose secure database deployment #
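The Compose file in this section reads its credentials from files under `./secrets/`. A minimal sketch for generating those files with Python's `secrets` module follows; the list of names mirrors the password files declared in the Compose file, while the function names and the 0600 mode are choices made for this example.

```python
import os
import secrets
from pathlib import Path

# Password files referenced by the compose file (the username file is written by hand).
SECRET_NAMES = [
    "pg_password",
    "mysql_root_password",
    "mysql_password",
    "mongo_root_password",
]


def write_secret(directory: Path, name: str, nbytes: int = 32) -> Path:
    """Create <name>.txt holding a random token; refuse to overwrite existing files."""
    directory.mkdir(parents=True, exist_ok=True)
    path = directory / f"{name}.txt"
    # O_EXCL makes creation fail if the file exists; 0o600 keeps it owner-only.
    fd = os.open(path, os.O_WRONLY | os.O_CREAT | os.O_EXCL, 0o600)
    with os.fdopen(fd, "w") as handle:
        handle.write(secrets.token_urlsafe(nbytes))
    return path


def generate_all(directory) -> list:
    """Generate every password file and return the created paths."""
    return [write_secret(Path(directory), name) for name in SECRET_NAMES]


if __name__ == "__main__":
    for created in generate_all("./secrets"):
        print(f"created {created}")
```

File-based Compose secrets are bind-mounted with their host ownership and mode, so depending on which user the database process runs as inside the container, the mode may need to be relaxed or the ownership adjusted for the container to read them.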
# docker-compose.security.yml
# Hardened database deployment
version: '3.8'
services:
  # === PostgreSQL ===
  postgres:
    image: postgres:16-alpine
    container_name: secure-postgres
    restart: unless-stopped
    ports:
      - "127.0.0.1:5432:5432"   # bind to localhost only
    environment:
      POSTGRES_DB: appdb
      POSTGRES_USER: app_user
      POSTGRES_PASSWORD_FILE: /run/secrets/pg_password
      POSTGRES_INITDB_ARGS: "--auth-host=scram-sha-256"
    # Mount the secret so that /run/secrets/pg_password exists in the container
    secrets:
      - pg_password
    volumes:
      - pg_data:/var/lib/postgresql/data
      - ./certs:/etc/postgresql/certs:ro
      - ./postgres/init:/docker-entrypoint-initdb.d:ro
    command: >
      postgres
      -c ssl=on
      -c ssl_cert_file=/etc/postgresql/certs/server.crt
      -c ssl_key_file=/etc/postgresql/certs/server.key
      -c ssl_ca_file=/etc/postgresql/certs/ca.crt
      -c ssl_min_protocol_version=TLSv1.2
      -c password_encryption=scram-sha-256
      -c logging_collector=on
      -c log_destination=stderr
      -c log_statement=ddl
      -c log_connections=on
      -c log_disconnections=on
      -c log_hostname=on
      -c max_connections=100
    networks:
      - app_network
    deploy:
      resources:
        limits:
          memory: 512M
          cpus: '1.0'
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U app_user"]
      interval: 10s
      timeout: 5s
      retries: 5
    security_opt:
      - no-new-privileges:true
    read_only: true
    tmpfs:
      - /tmp
      - /run/postgresql
  # === MySQL ===
  mysql:
    image: mysql:8.0
    container_name: secure-mysql
    restart: unless-stopped
    ports:
      - "127.0.0.1:3306:3306"
    environment:
      MYSQL_ROOT_PASSWORD_FILE: /run/secrets/mysql_root_password
      MYSQL_DATABASE: appdb
      MYSQL_USER: app_user
      MYSQL_PASSWORD_FILE: /run/secrets/mysql_password
    # Mount the secrets referenced by the *_FILE variables above
    secrets:
      - mysql_root_password
      - mysql_password
    volumes:
      - mysql_data:/var/lib/mysql
      - ./mysql/conf.d:/etc/mysql/conf.d:ro
      - ./mysql/certs:/etc/mysql/certs:ro
    command: >
      --require-secure-transport=ON
      --default-authentication-plugin=caching_sha2_password
      --ssl-ca=/etc/mysql/certs/ca.pem
      --ssl-cert=/etc/mysql/certs/server-cert.pem
      --ssl-key=/etc/mysql/certs/server-key.pem
      --tls-version=TLSv1.2,TLSv1.3
      --local-infile=0
      --secure-file-priv=/var/lib/mysql-files
      --log-error=/var/log/mysql/error.log
      --general-log=0
    networks:
      - app_network
    deploy:
      resources:
        limits:
          memory: 512M
          cpus: '1.0'
    security_opt:
      - no-new-privileges:true
    healthcheck:
      test: ["CMD", "mysqladmin", "ping", "-h", "localhost"]
      interval: 10s
      timeout: 5s
      retries: 5
  # === MongoDB ===
  mongodb:
    image: mongo:7.0
    container_name: secure-mongodb
    restart: unless-stopped
    ports:
      - "127.0.0.1:27017:27017"
    environment:
      MONGO_INITDB_ROOT_USERNAME_FILE: /run/secrets/mongo_root_user
      MONGO_INITDB_ROOT_PASSWORD_FILE: /run/secrets/mongo_root_password
    # Mount the secrets referenced by the *_FILE variables above
    secrets:
      - mongo_root_user
      - mongo_root_password
    volumes:
      - mongo_data:/data/db
      - ./mongo/certs:/etc/mongo/certs:ro
      - ./mongo/mongod.conf:/etc/mongo/mongod.conf:ro
    command: >
      --config /etc/mongo/mongod.conf
    networks:
      - app_network
    deploy:
      resources:
        limits:
          memory: 512M
          cpus: '1.0'
    security_opt:
      - no-new-privileges:true
    healthcheck:
      # With net.tls.mode requireTLS in mongod.conf, mongosh also needs its TLS options here
      test: echo 'db.runCommand("ping").ok' | mongosh localhost:27017/test --quiet
      interval: 10s
      timeout: 5s
      retries: 5
  # === Audit logging ===
  audit_logger:
    image: fluent/fluentd:latest   # pin a specific tag in production
    container_name: db-audit-logger
    restart: unless-stopped
    volumes:
      - ./fluentd/conf:/fluentd/etc:ro
      - audit_logs:/var/log/db-audit
    networks:
      - app_network
volumes:
  pg_data:
    driver: local
  mysql_data:
    driver: local
  mongo_data:
    driver: local
  audit_logs:
    driver: local
networks:
  app_network:
    driver: bridge
    ipam:
      config:
        - subnet: 172.28.0.0/16
secrets:
  pg_password:
    file: ./secrets/pg_password.txt
  mysql_root_password:
    file: ./secrets/mysql_root_password.txt
  mysql_password:
    file: ./secrets/mysql_password.txt
  mongo_root_user:
    file: ./secrets/mongo_root_user.txt
  mongo_root_password:
    file: ./secrets/mongo_root_password.txt
Database hardening quick-reference checklist #
┌────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┐
│                                         Database security hardening checklist                                          │
├────────────────┬─────────────────────────┬────────────────────────────────────┬────────────────────────────────────────┤
│ Category       │ Check                   │ MySQL                              │ PostgreSQL                             │
├────────────────┼─────────────────────────┼────────────────────────────────────┼────────────────────────────────────────┤
│ Authentication │ Strong password policy  │ validate_password component        │ scram-sha-256 + pg_hba.conf rules      │
│                │ Default account cleanup │ Drop anonymous accounts            │ Revoke PUBLIC rights on public schema  │
│                │ Password expiry         │ default_password_lifetime          │ ALTER ROLE ... VALID UNTIL             │
├────────────────┼─────────────────────────┼────────────────────────────────────┼────────────────────────────────────────┤
│ Network        │ TLS encryption          │ require_secure_transport           │ ssl = on + hostssl rules               │
│                │ Bind address            │ bind-address = 127.0.0.1           │ listen_addresses                       │
├────────────────┼─────────────────────────┼────────────────────────────────────┼────────────────────────────────────────┤
│ Privileges     │ Least privilege         │ Narrowly scoped GRANTs             │ Roles + column-level privileges        │
│                │ Privilege removal       │ REVOKE unneeded grants             │ REVOKE ... FROM PUBLIC                 │
├────────────────┼─────────────────────────┼────────────────────────────────────┼────────────────────────────────────────┤
│ Encryption     │ TDE                     │ InnoDB tablespace encryption       │ pg_tde extension (third-party)         │
│                │ Column-level encryption │ AES_ENCRYPT                        │ pgp_sym_encrypt (pgcrypto)             │
│                │ Password hashing        │ caching_sha2_password + app bcrypt │ scram-sha-256 + crypt() with bcrypt    │
├────────────────┼─────────────────────────┼────────────────────────────────────┼────────────────────────────────────────┤
│ Audit          │ Query logging           │ general_log (use sparingly)        │ log_statement = 'ddl' + pgaudit        │
│                │ Connection auditing     │ audit_log plugin (Enterprise)      │ log_connections / log_disconnections   │
├────────────────┼─────────────────────────┼────────────────────────────────────┼────────────────────────────────────────┤
│ Backup         │ Encrypted backups       │ mysqldump + gpg                    │ pg_dump + gpg                          │
└────────────────┴─────────────────────────┴────────────────────────────────────┴────────────────────────────────────────┘
Conclusion #
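Database hardening is a continuous engineering practice, not a one-off configuration task. From configuration audits to injection defense, and from encryption to monitoring, every layer deserves attention. The key principles are: never trust user input, always use parameterized queries, and always follow the principle of least privilege.
In real security assessments I have seen far too many breaches caused by a single default password, one overly permissive GRANT statement, or one unparameterized query. Good database security starts with a configuration audit today.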