{ "cells": [ { "cell_type": "markdown", "id": "discrete-tyler", "metadata": {}, "source": [ "Playing with Hooks\n", "==================\n", "\n", "Hooks are the main interface for performing user-defined actions\n", "during an execution or an exploration. TritonDSE enables hooking the following events:\n", "\n", "* address reached\n", "* instruction executed *(all of them)*\n", "* memory address read or written\n", "* register read or written\n", "* function reached *(from its name)*\n", "* end of an execution\n", "* thread context switch\n", "* new input creation *(before it gets appended to the pool of seeds)*\n", "\n", "The library introduces a `CallbackManager` object which enables registering\n", "callbacks. Every `SymbolicExecutor` holds one, exposed as `callback_manager`.\n", "\n", "A `SymbolicExplorator` object also contains a `callback_manager` instance. In this\n", "case, callbacks are transmitted to all subsequent `SymbolicExecutor` instances.\n" ] }, { "cell_type": "markdown", "id": "private-exemption", "metadata": {}, "source": [ "Let's reuse the following base snippet:" ] }, { "cell_type": "code", "execution_count": 41, "id": "periodic-wallet", "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "WARNING:root:symbol __gmon_start__ imported but unsupported\n" ] } ], "source": [ "from triton import Instruction\n", "from tritondse import SymbolicExecutor, Config, Seed, Program, ProcessState, SeedFormat, CompositeData\n", "\n", "p = Program(\"crackme_xor\")\n", "config = Config(pipe_stdout=False, seed_format=SeedFormat.COMPOSITE)\n", "seed = Seed(CompositeData(argv=[b\"./crackme_xor\", b\"AAAAAAAAAAAA\"]))\n", "\n", "executor = SymbolicExecutor(config, seed)\n", "executor.load(p)" ] }, { "cell_type": "markdown", "id": "interracial-namibia", "metadata": {}, "source": [ "## I. 
Instruction hooking" ] }, { "cell_type": "markdown", "id": "chemical-ballot", "metadata": {}, "source": [ "Instruction hooking triggers a callback on every instruction executed, regardless of its address.\n", "\n", "The signature for an instruction hook is the following:\n", "\n", "```python\n", "Callable[['SymbolicExecutor', ProcessState, Instruction], None]\n", "```\n", "\n", "We can use it to print every instruction executed:" ] }, { "cell_type": "code", "execution_count": 42, "id": "substantial-preliminary", "metadata": {}, "outputs": [], "source": [ "def trace_inst(se: SymbolicExecutor, pstate: ProcessState, inst: Instruction):\n", " print(f\"[tid:{inst.getThreadId()}] 0x{inst.getAddress():x}: {inst.getDisassembly()}\")\n", "\n", "executor.callback_manager.register_post_instruction_callback(trace_inst)" ] }, { "cell_type": "code", "execution_count": 43, "id": "great-warehouse", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "[tid:0] 0x400460: xor ebp, ebp\n", "[tid:0] 0x400462: mov r9, rdx\n", "[tid:0] 0x400465: pop rsi\n", "[tid:0] 0x400466: mov rdx, rsp\n", "[tid:0] 0x400469: and rsp, 0xfffffffffffffff0\n", "[tid:0] 0x40046d: push rax\n", "[tid:0] 0x40046e: push rsp\n", "[tid:0] 0x40046f: mov r8, 0x400680\n", "[tid:0] 0x400476: mov rcx, 0x400610\n", "[tid:0] 0x40047d: mov rdi, 0x4005b3\n", "[tid:0] 0x400484: call 0x400440\n", "[tid:0] 0x400440: jmp qword ptr [rip + 0x200bda]\n", "[tid:0] 0x4005b3: push rbp\n", "[tid:0] 0x4005b4: mov rbp, rsp\n", "[tid:0] 0x4005b7: sub rsp, 0x20\n", "[tid:0] 0x4005bb: mov dword ptr [rbp - 0x14], edi\n", "[tid:0] 0x4005be: mov qword ptr [rbp - 0x20], rsi\n", "[tid:0] 0x4005c2: cmp dword ptr [rbp - 0x14], 2\n", "[tid:0] 0x4005c6: je 0x4005cf\n", "[tid:0] 0x4005cf: mov rax, qword ptr [rbp - 0x20]\n", "[tid:0] 0x4005d3: add rax, 8\n", "[tid:0] 0x4005d7: mov rax, qword ptr [rax]\n", "[tid:0] 0x4005da: mov rdi, rax\n", "[tid:0] 0x4005dd: call 0x400556\n", "[tid:0] 0x400556: 
push rbp\n", "[tid:0] 0x400557: mov rbp, rsp\n", "[tid:0] 0x40055a: mov qword ptr [rbp - 0x18], rdi\n", "[tid:0] 0x40055e: mov dword ptr [rbp - 4], 0\n", "[tid:0] 0x400565: jmp 0x4005a6\n", "[tid:0] 0x4005a6: cmp dword ptr [rbp - 4], 4\n", "[tid:0] 0x4005aa: jle 0x400567\n", "[tid:0] 0x400567: mov eax, dword ptr [rbp - 4]\n", "[tid:0] 0x40056a: movsxd rdx, eax\n", "[tid:0] 0x40056d: mov rax, qword ptr [rbp - 0x18]\n", "[tid:0] 0x400571: add rax, rdx\n", "[tid:0] 0x400574: movzx eax, byte ptr [rax]\n", "[tid:0] 0x400577: movsx eax, al\n", "[tid:0] 0x40057a: sub eax, 1\n", "[tid:0] 0x40057d: xor eax, 0x55\n", "[tid:0] 0x400580: mov ecx, eax\n", "[tid:0] 0x400582: mov rdx, qword ptr [rip + 0x200ab7]\n", "[tid:0] 0x400589: mov eax, dword ptr [rbp - 4]\n", "[tid:0] 0x40058c: cdqe\n", "[tid:0] 0x40058e: add rax, rdx\n", "[tid:0] 0x400591: movzx eax, byte ptr [rax]\n", "[tid:0] 0x400594: movsx eax, al\n", "[tid:0] 0x400597: cmp ecx, eax\n", "[tid:0] 0x400599: je 0x4005a2\n", "[tid:0] 0x40059b: mov eax, 1\n", "[tid:0] 0x4005a0: jmp 0x4005b1\n", "[tid:0] 0x4005b1: pop rbp\n", "[tid:0] 0x4005b2: ret\n", "[tid:0] 0x4005e2: mov dword ptr [rbp - 4], eax\n", "[tid:0] 0x4005e5: cmp dword ptr [rbp - 4], 0\n", "[tid:0] 0x4005e9: jne 0x4005f7\n", "[tid:0] 0x4005f7: mov edi, 0x40069e\n", "[tid:0] 0x4005fc: call 0x400430\n", "[tid:0] 0x400430: jmp qword ptr [rip + 0x200be2]\n", "[tid:0] 0x400601: mov eax, 0\n", "[tid:0] 0x400606: leave\n", "[tid:0] 0x400607: ret\n" ] } ], "source": [ "executor.run()" ] }, { "cell_type": "markdown", "id": "experienced-ottawa", "metadata": {}, "source": [ "The **pre** and **post** variants define whether the callback is called before or after the instruction is executed.\n", "\n", "In this case we could also have used `register_pre_instruction_callback`, but the `Instruction` object would not be decoded yet, so its disassembly would not be available." ] }, { "cell_type": "markdown", "id": "suspended-paris", "metadata": {}, "source": [ "## II. 
Address/Function hooking\n", "\n", "We can hook any address and perform an associated action. \n", "We can also hook any function as long as the symbol is set.\n", "They both have the same signature:\n", "\n", "```python\n", "Callable[['SymbolicExecutor', ProcessState, Addr], None]\n", "```\n", "\n", "For the purpose of the challenge, let's hook the compare instruction and patch the ZF flag to force the loop to continue.\n", "Let's also hook the `puts` function to print the string given to each call. Since `puts` is hooked as an imported routine, its callback additionally receives the routine name before the address." ] }, { "cell_type": "code", "execution_count": 44, "id": "decreased-italy", "metadata": {}, "outputs": [], "source": [ "def hook_cmp(se: SymbolicExecutor, pstate: ProcessState, addr: int):\n", " print(f\"{pstate.cpu.al} - {pstate.cpu.cl}\")\n", " pstate.cpu.zf = 1\n", " #se.abort()\n", "\n", "def hook_puts(se: SymbolicExecutor, pstate: ProcessState, routine: str, addr: int):\n", " s = pstate.memory.read_string(pstate.get_argument_value(0))\n", " print(f\"puts: {s}\")" ] }, { "cell_type": "code", "execution_count": 45, "id": "alien-grace", "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "WARNING:root:symbol __gmon_start__ imported but unsupported\n" ] } ], "source": [ "executor = SymbolicExecutor(config, seed)\n", "executor.load(p)\n", "\n", "# Remove trace printing callback\n", "executor.callback_manager.reset()\n", "executor.callback_manager.register_post_addr_callback(0x0400597, hook_cmp)\n", "executor.callback_manager.register_post_imported_routine_callback(\"puts\", hook_puts)" ] }, { "cell_type": "code", "execution_count": 46, "id": "growing-athens", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "49 - 21\n", "62 - 21\n", "61 - 21\n", "38 - 21\n", "49 - 21\n", "puts: Win\n" ] } ], "source": [ "executor.run()" ] }, { "cell_type": "markdown", "id": "analyzed-valve", "metadata": {}, "source": [ "We did not really win, since we forced the ZF flag, but we retrieved the encoded values on which the comparison is 
made." ] }, { "cell_type": "markdown", "id": "smooth-catering", "metadata": {}, "source": [ "## III. Solving queries\n", "\n", "We can modify our hook to ask the SMT solver directly which value CL must take for the comparison to succeed." ] }, { "cell_type": "code", "execution_count": 47, "id": "unnecessary-tours", "metadata": {}, "outputs": [], "source": [ "from tritondse.types import SolverStatus\n", "\n", "def hook_cmp2(se: SymbolicExecutor, pstate: ProcessState, addr: int):\n", " # CL contains the input of the user (encoded)\n", " \n", " # retrieve the symbolic value of both characters\n", " sym_al = pstate.read_symbolic_register(pstate.registers.al)\n", " sym_cl = pstate.read_symbolic_register(pstate.registers.cl)\n", " \n", " # Solve the constraint such that one matches the other\n", " status, model = pstate.solve(sym_al.getAst() == sym_cl.getAst())\n", " \n", " # If formula is SAT retrieve input values\n", " if status == SolverStatus.SAT:\n", " # Retrieve the value of each input variable involved in CL (there should be only one here)\n", " var_values = pstate.get_expression_variable_values_model(sym_cl, model)\n", " for var, value in var_values.items():\n", " print(f\"{var}: {chr(value)}\")\n", " else:\n", " print(status.name)\n", " \n", " pstate.cpu.zf = 1" ] }, { "cell_type": "code", "execution_count": 48, "id": "guilty-botswana", "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "WARNING:root:symbol __gmon_start__ imported but unsupported\n" ] } ], "source": [ "executor = SymbolicExecutor(config, seed)\n", "executor.load(p)\n", "\n", "executor.callback_manager.reset()\n", "executor.callback_manager.register_post_addr_callback(0x0400597, hook_cmp2)" ] }, { "cell_type": "code", "execution_count": 49, "id": "medical-premiere", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "argv[1][0]:8: e\n", "argv[1][1]:8: l\n", "argv[1][2]:8: i\n", "argv[1][3]:8: t\n", "argv[1][4]:8: 
e\n" ] } ], "source": [ "executor.run()" ] }, { "cell_type": "markdown", "id": "multiple-appearance", "metadata": {}, "source": [ "## IV. Hooking exploration events\n", "\n", "We can similarly put callbacks on a `SymbolicExplorator`. In this case, the callback manager\n", "will be shared among all the `SymbolicExecutor` instances. Let's hook every iteration to print\n", "some statistics:" ] }, { "cell_type": "code", "execution_count": 53, "id": "pharmaceutical-salad", "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "WARNING:root:symbol __gmon_start__ imported but unsupported\n" ] }, { "name": "stdout", "output_type": "stream", "text": [ "input: 0063e1d416400b0a0401dc471be64a8f [b'./crackme_xor', b'AAAAAAAAAAAA'] status:OK_DONE [exitcode:0]\n", "input: 415d0d4405119b88530788282aa06d7d [b'./crackme_xor', b'eAAAAAAAAAAA'] status:OK_DONE [exitcode:0]\n" ] }, { "data": { "text/plain": [ "" ] }, "execution_count": 53, "metadata": {}, "output_type": "execute_result" } ], "source": [ "from tritondse import SymbolicExplorator, Config, Seed, Program, ProcessState, SeedFormat, CompositeData\n", "\n", "def pre_exec_hook(se: SymbolicExecutor, state: ProcessState):\n", " print(f\"input: {se.seed.hash} {se.seed.content.argv} \", end=\"\")\n", "\n", "def post_exec_hook(se: SymbolicExecutor, state: ProcessState):\n", " print(f\"status:{se.seed.status.name} [exitcode:{se.exitcode}]\")\n", "\n", "dse = SymbolicExplorator(Config(seed_format=SeedFormat.COMPOSITE), p)\n", "dse.add_input_seed(Seed(CompositeData(argv=[b\"./crackme_xor\", b\"AAAAAAAAAAAA\"])))\n", "\n", "dse.callback_manager.register_pre_execution_callback(pre_exec_hook)\n", "dse.callback_manager.register_post_execution_callback(post_exec_hook)\n", "\n", "dse.explore()" ] } ], "metadata": { "kernelspec": { "display_name": "neopastis", "language": "python", "name": "neopastis" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", 
"mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.11.2" } }, "nbformat": 4, "nbformat_minor": 5 }