
program

Program

This is the main class of Quokka. It deals with the most common abstraction, the Program.

Program

Bases: dict

Program

The program is Quokka's main abstraction. It represents the full binary and is itself a mapping of functions, keyed by their start address.
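Because Program subclasses dict, each function is stored under its start address (the constructor does self[function.start] = function). Looking up an arbitrary address inside a function instead falls back to a linear scan, as find_function_by_address does. The sketch below mimics that layout so it runs without Quokka or an export file. ToyFunction, ToyProgram and find_containing are hypothetical names, and the half-open [start, end) range check is an assumption about how in_function behaves.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ToyFunction:
    """Hypothetical stand-in for quokka.Function (name and range only)."""
    name: str
    start: int
    end: int  # exclusive bound (assumption)

    def in_function(self, address: int) -> bool:
        return self.start <= address < self.end


class ToyProgram(dict):
    """dict subclass keyed by function start address, like Program."""

    def __init__(self, functions):
        super().__init__()
        for function in functions:
            self[function.start] = function

    def find_containing(self, address: int):
        # Fast path: the address is a function start, i.e. a dict key.
        if address in self:
            return self[address]
        # Slow path: linear scan over every function.
        for function in self.values():
            if function.in_function(address):
                return function
        return None


prog = ToyProgram([ToyFunction("main", 0x1000, 0x1040),
                   ToyFunction("helper", 0x1040, 0x1080)])
print(prog[0x1000].name)                  # main
print(prog.find_containing(0x1050).name)  # helper
print(prog.find_containing(0x2000))       # None
```

Keying by start address makes "is this a function entry point?" a constant-time dict lookup, at the cost of a scan for addresses in the middle of a function.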

Parameters:

export_file (Path | str, required): Path to the export file (e.g. a .quokka file)
exec_path (Path | str, required): Path to the binary file
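The constructor first tries to read export_file as an xz/LZMA-compressed stream. If that fails with lzma.LZMAError, it re-reads the file as raw bytes before passing them to the protobuf parser. The standard-library sketch below isolates that fallback; read_export_bytes is a hypothetical helper name, not part of Quokka.

```python
from __future__ import annotations

import lzma
import tempfile
from pathlib import Path


def read_export_bytes(path: Path | str) -> bytes:
    """Return the payload of an export file, whether xz-compressed or not."""
    try:
        with lzma.open(path, "rb") as fd:
            return fd.read()
    except lzma.LZMAError:
        # Not an xz/lzma stream: re-read the file as plain bytes.
        with open(path, "rb") as fd:
            return fd.read()


# The leading 0xE1 byte cannot start a valid xz or legacy .lzma header,
# so the uncompressed copy reliably takes the fallback path.
payload = b"\xe1example serialized payload"

with tempfile.TemporaryDirectory() as tmp:
    packed = Path(tmp) / "packed.quokka"
    plain = Path(tmp) / "plain.quokka"
    packed.write_bytes(lzma.compress(payload))
    plain.write_bytes(payload)
    results = (read_export_bytes(packed), read_export_bytes(plain))

print(results[0] == payload, results[1] == payload)  # True True
```

In typical use you simply pass both paths, e.g. Program("bin.quokka", "bin") or Program.open(...), and this detection happens internally.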

Attributes:

proto (Quokka): The underlying protobuf data. It should not be used directly; if you cannot find another way to access some information, feel free to open an issue.
export_file (Path): Path to the export file (e.g. the .quokka file)
mode (ExporterMode): Export mode (LIGHT, NORMAL or FULL)
base_address (AddressT): Program base address (the lowest segment start address)
isa (ArchEnum): Instruction set
address_size (int): Default pointer size
arch (Type[QuokkaArch]): Program architecture
endianness (Endianness): Program endianness
executable: Object used to access the binary file
references: The reference manager
data: The data manager
fun_names (dict[str, Function]): Mapping of function names to functions
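fun_names keeps a single Function per name. When two functions share a name, the constructor keeps the first one and logs a warning if both have the same type. If the types differ, a FunctionType.NORMAL function replaces a non-NORMAL one. The sketch below reproduces that rule with hypothetical stand-ins: Kind replaces FunctionType (its THUNK and IMPORTED members are illustrative), and a NamedTuple replaces Function. It compares the types of both functions.

```python
import enum
import logging
from typing import NamedTuple

logger = logging.getLogger("fun_names_sketch")


class Kind(enum.Enum):
    """Hypothetical stand-in for quokka's FunctionType."""
    NORMAL = enum.auto()
    IMPORTED = enum.auto()
    THUNK = enum.auto()


class Func(NamedTuple):
    name: str
    start: int
    type: Kind


def build_fun_names(functions: list[Func]) -> dict[str, Func]:
    fun_names: dict[str, Func] = {}
    for function in functions:
        existing = fun_names.get(function.name)
        if existing is None:
            fun_names[function.name] = function
        elif function.type == existing.type:
            # Same type twice: keep the first one but flag the collision.
            logger.warning("Found two functions with the same name: %s", function.name)
        elif function.type == Kind.NORMAL:
            # Different types: a NORMAL function wins over a non-NORMAL one.
            fun_names[function.name] = function
    return fun_names


names = build_fun_names([
    Func("memcpy", 0x4000, Kind.THUNK),
    Func("memcpy", 0x1200, Kind.NORMAL),
    Func("main", 0x1000, Kind.NORMAL),
])
print(hex(names["memcpy"].start))  # 0x1200
```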

Raises:

QuokkaError: If loading fails, for example when the hash stored in the export file does not match the binary.
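In the constructor, a hash mismatch between the export file and exec_path raises QuokkaError("Hash mismatch"). A different major version between the export and the installed Quokka only logs a warning. The stdlib-only sketch below illustrates both checks. hash_matches and same_major_version are hypothetical helpers. The sketch also makes two assumptions of its own: that the stored value is a hex digest of the whole file (SHA-256 or MD5, per the hash property docstring), and that the major version is the first dot-separated component. Neither is taken from quokka.check_hash or quokka.parse_version.

```python
import hashlib
import tempfile
from pathlib import Path


def hash_matches(expected_hex: str, path: Path, algorithm: str = "sha256") -> bool:
    """Compare a stored hex digest with the digest of the file on disk."""
    digest = hashlib.new(algorithm)
    with open(path, "rb") as fd:
        # Hash in 64 KiB chunks so large binaries are not loaded at once.
        for chunk in iter(lambda: fd.read(1 << 16), b""):
            digest.update(chunk)
    return digest.hexdigest() == expected_hex.lower()


def same_major_version(file_version: str, current_version: str) -> bool:
    """True when both version strings share the same major component."""
    return file_version.split(".")[0] == current_version.split(".")[0]


content = b"\x7fELF not a real binary"
with tempfile.TemporaryDirectory() as tmp:
    binary = Path(tmp) / "a.out"
    binary.write_bytes(content)
    hash_results = (
        hash_matches(hashlib.sha256(content).hexdigest(), binary),
        hash_matches(hashlib.md5(content).hexdigest(), binary, "md5"),
        hash_matches("00" * 32, binary),
    )

print(hash_results)                          # (True, True, False)
print(same_major_version("0.5.3", "0.6.0"))  # True
print(same_major_version("1.0.0", "0.6.0"))  # False
```

In Program's terms, a False from the hash check corresponds to the QuokkaError, while a False from the version check corresponds only to the logged warning.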

Source code in bindings/python/quokka/program.py
class Program(dict):
    """Program

    The program is `quokka`'s main abstraction.
    It represents the full binary and is itself a mapping of functions.

    Arguments:
        export_file: Path to the export file (e.g. .quokka)
        exec_path: Path to the binary file

    Attributes:
        proto: Contains the protobuf data. This should not be used directly. However, if
            you don't find another way of accessing some information, feel free to
            open an issue.
        export_file: The path to the export file (e.g. the .quokka)
        mode: Export mode (LIGHT, NORMAL or FULL)
        base_address: Program base address
        isa: Instruction set
        address_size: Default pointer size
        arch: Program architecture
        endianness: Program endianness
        executable: An object to manage the binary file
        references: The reference manager
        data: The data manager
        fun_names: A mapping of function names to functions

    Raises:
        QuokkaError: If the loading is not successful.

    """

    logger: logging.Logger = logging.getLogger(__name__)

    def __init__(self, export_file: Path|str, exec_path: Path|str):
        """Constructor"""
        super().__init__()

        self.proto: quokka.pb.Quokka = quokka.pb.Quokka()
        self.export_file: Path = Path(export_file)
        try:
            with lzma.open(self.export_file, "rb") as fd:
                raw_data = fd.read()
        except lzma.LZMAError:
            # try reading it as a plain-bytes Quokka (but should raise version mismatch later)
            with open(self.export_file, "rb") as fd:
                raw_data = fd.read()

        self.proto.ParseFromString(raw_data)

        # Export mode
        self.mode: ExporterMode = ExporterMode.from_proto(self.proto.exporter_meta.mode)

        # Version checking
        # A change in the major version might break backward compatibility
        proto_version = quokka.parse_version(self.proto.exporter_meta.version)
        current_version = quokka.parse_version(quokka.__version__)
        if proto_version[0] != current_version[0]:
            self.logger.warning(
                "The exported file has been generated by a different version of Quokka."
                f" The file has been generated by Quokka {self.proto.exporter_meta.version}"
                f" while you are using {quokka.__version__}"
            )
        elif self.proto.exporter_meta.version != quokka.__version__:
            self.logger.debug(
                "Version mismatch detected but still compatible with the exported file."
                f" The file has been generated by Quokka {self.proto.exporter_meta.version}"
                f" while you are using {quokka.__version__}"
            )

        # Check that the hashes match between the export file and the executable
        if not quokka.check_hash(self.proto.meta.hash, Path(exec_path)):
            self.logger.error("Hash does not match with file.")
            raise quokka.QuokkaError("Hash mismatch")

        self.isa: quokka.analysis.ArchEnum = quokka.get_isa(self.proto.meta.isa)
        self.address_size: int = quokka.convert_address_size(
            self.proto.meta.address_size
        )
        self.arch: Type[quokka.analysis.QuokkaArch] = quokka.get_arch(
            self.isa, self.address_size
        )

        self.endianness: Endianness = Endianness.from_proto(self.proto.meta.endianess)

        # Get disassembly backend
        self.disassembler: Disassembler = Disassembler.from_proto(self.proto.meta.backend.name)
        self.disassembler_version: str = self.proto.meta.backend.version
        self.decompiled_activated: bool = self.proto.meta.decompilation_activated

        self.calling_convention: CallingConvention = CallingConvention.from_proto(self.proto.meta.calling_convention)

        self.executable = quokka.Executable(exec_path, self.endianness)
        self.data = quokka.DataHolder(self.proto, self)

        # Functions
        # self.functions: Dict[int, quokka.Function] = {}
        self.fun_names: dict[str, quokka.Function] = {}
        for func_index, func in enumerate(self.proto.functions):
            function = quokka.Function(func_index, func, self)
            self[function.start] = function
            if function.name not in self.fun_names:
                self.fun_names[function.name] = function
            else:
                if function.type == self.fun_names[function.name].type:
                    self.logger.warning("Found two functions with the same name.")
                else:
                    if function.type == FunctionType.NORMAL:
                        self.fun_names[function.name] = function

        # Types
        self._types: dict[Index, TypeT] = {}  # types as they are being loaded

    def __hash__(self) -> int:
        """Hash of the Program (use the hash from the exported file)"""
        return int(self.proto.meta.hash.hash_value, 16)

    @property
    def base_address(self) -> AddressT:
        """Program base address"""
        return min(segment.start for segment in self.segments.values())

    @property
    def name(self) -> str:
        """Returns the underlying binary name"""
        return self.proto.meta.executable_name

    @property
    def hash(self) -> str:
        """Returns the hash value of the binary (either sha256 or MD5)."""
        return self.proto.meta.hash.hash_value

    @property
    def headers(self) -> str:
        """Returns C-style headers of the binary"""
        return self.proto.headers

    @cached_property
    def capstone(self) -> capstone.Cs:
        """Compute a capstone context"""
        return quokka.backends.get_capstone_context(self.arch, self.endianness)

    @cached_property
    def call_graph(self) -> networkx.DiGraph:
        """Compute the Call Graph of the binary

        Every node in the call graph is a function.

        :return: A Call Graph (a networkx DiGraph)
        """
        call_graph: "networkx.DiGraph" = networkx.DiGraph()

        for function in self.values():
            call_graph.add_node(function.start)
            call_graph.add_edges_from(product((function.start,), (x.start for x in function.callees)))

        return call_graph

    @property
    def functions(self) -> Iterable[quokka.Function]:
        """Functions accessor

        Retrieves the functions of the program (as defined by the
        disassembler).

        Returns:
            An iterator over the functions
        """
        yield from self.values()

    @property
    def types(self) -> Iterable[TypeT]:
        """Types in the program

        Returns:
            Iterable of types in the Program (excludes user-added types)
        """
        for i, pb_type in enumerate(self.proto.types):
            if not pb_type.is_new:
                yield self.get_type(i)

    def virtual_address(self, seg_id: int, seg_offset: int) -> AddressT:
        """Converts a segment-relative offset to an absolute address

        Arguments:
            seg_id: Segment ID
            seg_offset: Byte offset in the segment

        Returns:
            An absolute address
        """
        return self.segments[seg_id].address + seg_offset

    def address_to_offset(self, address: AddressT) -> int:
        """Converts a virtual address to a file offset.

        Arguments:
            address: A virtual address

        Returns:
            A file offset
        """
        try:
            segment = self.get_segment(address)
        except KeyError as exc:
            raise quokka.NotInFileError("Unable to find the segment") from exc

        if segment.file_offset != -1:
            return address - segment.address + segment.file_offset

        raise quokka.NotInFileError("Unable to find the offset in the file")

    @cached_property
    def pypcode(self) -> pypcode.Context:
        """Generate the Pypcode context."""
        from quokka.backends.pypcode import get_pypcode_context

        return get_pypcode_context(self.arch, self.endianness)

    @property
    def structures(self) -> Iterable[StructureType]:
        """Structures accessor

        Retrieves the structures of the program (as defined by the
        disassembler).

        Returns:
            An iterator over the structures
        """
        for i, t in enumerate(self.proto.types):
            if t.is_new:
                continue
            if t.WhichOneof("OneofType") == "composite_type":
                if t.composite_type.type == Pb.CompositeType.CompositeSubType.TYPE_STRUCT:
                    if i not in self._types:
                        self._types[i] = StructureType(i, t.composite_type, self)
        yield from (t for t in self._types.values() if isinstance(t, StructureType) and not t.is_new)

    @property
    def enums(self) -> Iterable[EnumType]:
        """Enums accessor

        Retrieves the enums of the program (as defined by the disassembler).
        A plain property is used because caching a generator would leave it
        exhausted after the first iteration.

        Returns:
            An iterator over the enums
        """
        for i, t in enumerate(self.proto.types):
            if t.is_new:
                continue
            if t.WhichOneof("OneofType") == "enum_type":
                if i not in self._types:
                    self._types[i] = EnumType(i, t.enum_type, self)
        yield from (t for t in self._types.values() if isinstance(t, EnumType) and not t.is_new)

    def get_struct_member(self, struct_index: Index, member_index: Index) -> StructureTypeMember:
        """Get a structure member by its index

        Arguments:
            struct_index: Index of the structure in the proto
            member_index: Index of the member in the structure (starting from 0)

        Returns:
            The corresponding structure member

        Raises:
            KeyError: When the structure or the member is not found
        """
        try:
            struct = self._types[struct_index]
            assert isinstance(struct, StructureType)
            return struct[member_index]
        except KeyError as exc:
            raise KeyError(f"No structure or member with index {struct_index}") from exc

    def get_item(self, addr: AddressT) -> quokka.Function | quokka.Instruction:
        """Get a function, block or instruction by its address

        Arguments:
            addr: Address to query

        Returns:
            A function, block or instruction at the address

        Raises:
            KeyError: When no item is found at the address
        """
        try:
            return self[addr]
        except KeyError as exc:
            raise KeyError(f"No item at address 0x{addr:x}") from exc

    def get_type_reference(self, type_index: Index, member_index: int = -1) -> TypeReference:
        """Get a type, or one of its members, by index

        Arguments:
            type_index: Index of the type in the proto
            member_index: Index of the member in the type (if applicable)

        Returns:
            The corresponding type, or the member when member_index is given

        Raises:
            KeyError: When the type is not found or is a user-added type
        """
        try:
            typ = self._types[type_index]
        except KeyError:
            # Otherwise, load the type from the protobuf
            if type_index >= len(self.proto.types):
                raise KeyError(f"No type with index {type_index}")

            pb_type = self.proto.types[type_index]
            if pb_type.is_new:
                raise KeyError(f"Type at index {type_index} is a user-added type")
            is_new = pb_type.is_new
            if pb_type.WhichOneof("OneofType") == "enum_type":
                self._types[type_index] = EnumType(type_index, pb_type.enum_type, self, is_new=is_new)
            elif pb_type.WhichOneof("OneofType") == "composite_type":
                match pb_type.composite_type.type:
                    case Pb.CompositeType.CompositeSubType.TYPE_STRUCT:
                        self._types[type_index] = StructureType(type_index, pb_type.composite_type, self, is_new=is_new)
                    case Pb.CompositeType.CompositeSubType.TYPE_UNION:
                        self._types[type_index] = UnionType(type_index, pb_type.composite_type, self, is_new=is_new)
                    case Pb.CompositeType.CompositeSubType.TYPE_ARRAY:
                        self._types[type_index] = ArrayType(type_index, pb_type.composite_type, self, is_new=is_new)
                    case Pb.CompositeType.CompositeSubType.TYPE_POINTER:
                        self._types[type_index] = PointerType(type_index, pb_type.composite_type, self, is_new=is_new)
                    case Pb.CompositeType.CompositeSubType.TYPE_TYPEDEF:
                        self._types[type_index] = TypedefType(type_index, pb_type.composite_type, self, is_new=is_new)
                    case _:
                        # Unknown CompositeSubType -- degrade to TYPE_UNK for forward compat
                        self._types[type_index] = BaseType.UNKNOWN
            elif pb_type.WhichOneof("OneofType") == "primitive_type":
                self._types[type_index] = BaseType.from_proto(pb_type.primitive_type)
            else:
                assert False, "Unknown type"
            typ = self._types[type_index]  # here should be loaded in _types

        if isinstance(typ, ComplexType) and typ.is_new:
            raise KeyError(f"Type at index {type_index} is a user-added type")

        if member_index != -1:
            assert isinstance(typ, (StructureType, UnionType, EnumType))
            if isinstance(typ, StructureType) and not isinstance(typ, UnionType):
                return typ.member_at(member_index)
            return typ[member_index]
        else:
            return typ

    def get_type(self, type_index: Index, member_index: int = -1) -> TypeT:
        """Get a type by its index with strict typing.

        For struct or enum members, returns the underlying type.

        Arguments:
            type_index: Index of the type in the proto
            member_index: Index of the member in the type (if applicable)
        Returns:
            The corresponding type

        Raises:
            KeyError: When the type is not found
            ValueError: When the type is not of the expected type
        """
        typ = self.get_type_reference(type_index, member_index)
        if isinstance(typ, EnumTypeMember):
            return typ.base_type
        elif isinstance(typ, StructureTypeMember):
            return typ.type   # A member cannot point to another member, so it must be a direct reference to a type
        else:
            return typ

    def find_type(self, name: str) -> TypeT|None:
        """Find a type by its name

        Arguments:
            name: Name of the type to find

        Returns:
            The corresponding type, None if not found
        """
        for t in self.types:
            if t.name == name:
                return t
        return None

    def get_type_resolved(self, type_index: Index) -> TypeT:
        """Get a type by index, resolving through any typedef chains.

        This is equivalent to calling get_type() and then resolve() on the
        result if it is a TypedefType. Useful for consumers that do not care
        about typedef names and want the concrete type directly.
        """
        t = self.get_type(type_index)
        while isinstance(t, TypedefType):
            t = t.aliased_type
        return t

    def add_type(self, type: str) -> None:
        """Add a new user-defined type to the program.

        Args:
            type: A C type declaration string
                (e.g. ``"struct foo { int x; float y; }"``).

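        Raises:
            QuokkaError: If a type with the same generated name already exists
                (the name is derived from a SHA-256 hash of the declaration, so
                adding the same declaration twice fails).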
        """

        h = hashlib.sha256(type.encode("utf-8", errors="replace")).hexdigest()[:16]
        type_name = f"__user_type_{h}"
        new_index = len(self.proto.types)
        pb_type = self.proto.types.add()
        pb_type.is_new = True
        ct = pb_type.composite_type
        ct.name = type_name
        ct.type = Pb.CompositeType.TYPE_STRUCT
        ct.c_str = type

        # Check for duplicate names
        for i, t in enumerate(self.proto.types):
            if i == new_index:
                continue
            oneof = t.WhichOneof("OneofType")
            if oneof == "composite_type" and t.composite_type.name == type_name:
                # Remove the just-added entry
                del self.proto.types[new_index]
                raise QuokkaError(f"Type '{type_name}' already exists at index {i}")
            elif oneof == "enum_type" and t.enum_type.name == type_name:
                del self.proto.types[new_index]
                raise QuokkaError(f"Type '{type_name}' already exists at index {i}")

    @cached_property
    def orphaned_blocks(self) -> Iterable[quokka.Block]:
        """Orphaned blocks

        Orphaned blocks are blocks that are not attached to any function. They can be
        useful to analyze code that is not in functions (e.g. hand-written assembly or
        jump tables).

        Returns:
            A list of orphaned blocks.
        """
        raise NotImplementedError("Orphaned blocks loading is not implemented yet")

    @cached_property
    def segments(self) -> dict[int, quokka.Segment]:
        """Returns the program's segments, keyed by their index."""
        return {i: quokka.Segment(segment, self) for i, segment in enumerate(self.proto.segments)}

    def get_instruction(self, address: AddressT) -> quokka.Instruction:
        """Get an instruction by its address

        Note: the address must be the head of the instruction.

        Arguments:
            address: Address to query

        Returns:
            A `quokka.Instruction`

        Raises:
            IndexError: When no instruction is found at this address
        """
        for function in self.values():
            if function.in_function(address):
                try:
                    return function.get_instruction(address)
                except IndexError:
                    pass

        raise IndexError(f"No instruction at address 0x{address:x}")

    def find_function_by_address(self, address: AddressT) -> quokka.Function|None:
        """Get a function by any address.

        Arguments:
            address: Any address within the function

        Returns:
            A `quokka.Function`, or None if the address does not belong to any function
        """
        if address in self:
            return self[address]
        else:
            # try finding it in the functions
            for function in self.values():
                if function.in_function(address):
                    return function
        return None

    def get_function(
        self, name: str, approximative: bool = False, normal: bool = False
    ) -> quokka.Function:
        """Find a function in a program by its name

        Arguments:
            name: Function name
            approximative: Should the name exactly match or allow partial matches?
            normal: Return only FunctionType.NORMAL functions (only used with
                approximative matching)

        Returns:
            A function matching the search criteria

        Raises:
            ValueError: When no function is found
        """
        if approximative is False:
            try:
                return self.fun_names[name]
            except KeyError as exc:
                raise ValueError("Missing function") from exc

        for function_name, function in self.fun_names.items():
            # TODO(dm) Improve this
            if name in function.name and (
                not normal or function.type == FunctionType.NORMAL
            ):
                return self.fun_names[function_name]

        raise ValueError("Unable to find an appropriate function")

    def get_segment(self, address: AddressT) -> quokka.Segment:
        """Get a `Segment` by an address

        The address must be in [segment.start, segment.end) to be found.

        Arguments:
            address: Address to look up

        Returns:
            The corresponding Segment

        Raises:
            KeyError: When the segment is not found
        """
        for _, segment in self.segments.items():
            if segment.in_segment(address):
                return segment

        raise KeyError(f"No segment has been found for address {address:#x}")

    def read_bytes(self, v_addr: AddressT, size: int) -> bytes:
        """Read raw bytes from a virtual address

        Arguments:
            v_addr: Virtual address of the data to read
            size: Size of the data to read

        Returns:
            The raw data at the specified address
        """

        if (offset := v_addr - self.base_address) < 0:
            raise ValueError("Address outside virtual address space.")
        return self.executable.read_bytes(offset, size)

    def get_data(self, address: AddressT) -> quokka.Data:
        """Get data by address

        Arguments:
            address: Address to query

        Returns:
            A data at the address

        Raises:
            ValueError: When no data is found at the address
        """
        return self.data[address]

    def __repr__(self) -> str:
        """Program representation"""
        return self.__str__()

    def __str__(self) -> str:
        """Program representation"""
        return f"<Program {self.executable.exec_file.name} ({self.arch.__name__})>"

    @staticmethod
    def from_binary(
        exec_path: Path|str,
        output_file: Path|str|None = None,
        database_file: Path|str|None = None,
        decompiled: bool = False,
        debug: bool = False,
        override: bool = True,
        timeout: int|None = 0,
        mode: ExporterMode = ExporterMode.LIGHT,
        disassembler: Disassembler = Disassembler.UNKNOWN,
    ) -> Program|None:
        """Generate an export file directly from the binary.

        Arguments:
            exec_path: Binary to export.
            output_file: Where to store the result (by default: near the executable)
            database_file: Where to store IDA database (by default: near the executable)
            decompiled: Whether to export decompiled code (default: False)
            timeout: How long to wait for the export to finish, in seconds
            debug: Activate the debug output
            mode: Export mode (LIGHT, NORMAL or FULL)
            disassembler: Backend to use (auto-detect if UNKNOWN)

        Returns:
            A Program instance or None

        Raises:
            QuokkaError: If the export fails
            FileNotFoundError: If the executable is not found
        """
        quokka_file = Program.generate(
            exec_path=exec_path,
            output_file=output_file,
            database_file=database_file,
            decompiled=decompiled,
            override=override,
            debug=debug,
            timeout=timeout,
            mode=mode,
            disassembler=disassembler,
        )

        # In theory, if we reach this point the export file exists; otherwise an exception has been raised
        if quokka_file.exists():
            return Program.open(quokka_file, exec_path)
        else:
            raise FileNotFoundError(f"Quokka generation failed, export file does not exist: {quokka_file}")


    @staticmethod
    def open(export_file: Path|str, exec_file: Path|str) -> Program:
        """Open a Quokka export file and return an instance of Program.

        Arguments:
            export_file: Path to the Quokka export file (e.g. .quokka)
            exec_file: Path to the executable file

        Returns:
            An instance of Program
        """
        return Program(export_file, exec_file)

    @staticmethod
    def _detect_disassembler() -> Disassembler:
        """Auto-detect an available disassembler backend.

        Prefers IDA if ``idascript`` is importable and ``IDA_PATH`` is set,
        otherwise falls back to Ghidra if ``GHIDRA_INSTALL_DIR`` is set.
        """
        if idascript is not None and idascript.get_ida_path():
            return Disassembler.IDA
        if os.environ.get("GHIDRA_INSTALL_DIR"):
            return Disassembler.GHIDRA
        return Disassembler.UNKNOWN

    @staticmethod
    def _generate_ida(
        exec_path: Path,
        output_file: Path,
        database_file: Path|str|None,
        decompiled: bool,
        debug: bool,
        timeout: int|None,
        mode: ExporterMode,
    ) -> Path:
        """Run IDA headless export."""
        if idascript is None:
            raise QuokkaError(
                "idascript is not installed. Install it or use Ghidra backend."
            )

        stale_extensions = (".id0", ".id1", ".id2", ".til", ".nam")
        stale_files = [
            exec_path.parent / f"{exec_path.name}{ext}"
            for ext in stale_extensions
            if (exec_path.parent / f"{exec_path.name}{ext}").is_file()
        ]
        if stale_files:
            names = ", ".join(f.name for f in stale_files)
            raise StaleIDBError(
                f"Stale IDA database files found next to the binary: {names}\n"
                "These files prevent IDA from opening the binary in "
                "autonomous mode.\n"
                "Please delete them before re-exporting."
            )

        exec_file = exec_path
        if database_file is None:
            database_file = exec_file.parent / f"{exec_file.name}.i64"
        else:
            database_file = Path(database_file)

        if not database_file.is_file():
            database_path = database_file.with_suffix("")
        else:
            exec_path = database_file
            database_path = None

        ida = idascript.IDA(
            exec_path,
            script_file=None,
            script_params=[
                "QuokkaAuto:true",
                f"QuokkaDecompiled:{str(decompiled).lower()}",
                f"QuokkaMode:{mode.name}",
                f"QuokkaFile:{output_file}"],
            database_path=database_path,
            timeout=timeout,
        )
        ida.start()
        if (ret_code := ida.wait()) == idascript.IDA.TIMEOUT_RETURNCODE:
            Program.logger.error(
                f"Failed to export the binary {exec_path}. IDA timeout of {timeout}s reached."
            )
        elif ret_code != 0:
            Program.logger.error(
                f"Failed to export the binary {exec_path}. IDA returned code {ret_code}."
            )

        if debug:
            Program.logger.debug(f"IDA returned code {ret_code}")
            Program.logger.debug(ida.stderr.read().decode("utf-8"))

        if ret_code:  # Everything but 0 is an error
            raise QuokkaError(
                f"IDA failed to export the binary {exec_path}. "
                f"IDA returned code {ret_code}."
            )

        return output_file

    @staticmethod
    def _generate_ghidra(
        exec_path: Path,
        output_file: Path,
        debug: bool,
        timeout: int|None,
        mode: ExporterMode,
    ) -> Path:
        """Run Ghidra headless export."""
        import subprocess
        import tempfile

        env_dir = os.environ.get("GHIDRA_INSTALL_DIR")
        if not env_dir:
            raise QuokkaError(
                "Ghidra not found. Set the GHIDRA_INSTALL_DIR environment variable."
            )
        ghidra_dir = Path(env_dir)

        analyze_headless = ghidra_dir / "support" / "analyzeHeadless"
        if not analyze_headless.exists():
            raise QuokkaError(
                f"analyzeHeadless not found at {analyze_headless}"
            )

        # Verify extension is installed
        ext_dir = ghidra_dir / "Ghidra" / "Extensions"
        if not ext_dir.exists() or not any(ext_dir.rglob("QuokkaExporter*")):
            raise QuokkaError(
                "QuokkaExporter extension not found in "
                f"{ext_dir}. Install it first -- see ghidra_extension/README.md."
            )

        # Locate the headless export script.
        # Search order: installed extension dir, then repo source tree.
        script_path = None
        for d in ext_dir.rglob("ghidra_scripts"):
            if (d / "QuokkaExportHeadless.java").exists():
                script_path = d
                break
        if script_path is None:
            # Fall back to repo source tree (works for dev installs)
            repo_script = (
                Path(__file__).resolve().parent.parent.parent.parent
                / "ghidra_extension" / "src" / "script" / "ghidra_scripts"
            )
            if (repo_script / "QuokkaExportHeadless.java").exists():
                script_path = repo_script
        if script_path is None:
            raise QuokkaError(
                "QuokkaExportHeadless.java not found. Install the Ghidra "
                "extension or run from the quokka source tree."
            )

        # Map ExporterMode to Ghidra script mode names
        ghidra_mode = "LIGHT" if mode == ExporterMode.LIGHT else "SELF_CONTAINED"

        proj_dir = tempfile.mkdtemp(prefix="quokka_ghidra_")
        try:
            cmd = [
                str(analyze_headless),
                proj_dir,
                "QuokkaTmp",
                "-import", str(exec_path),
                "-scriptPath", str(script_path),
                "-postScript", "QuokkaExportHeadless.java",
                f"--out={output_file}",
                f"--mode={ghidra_mode}",
                "-readOnly",
            ]

            result = subprocess.run(
                cmd,
                capture_output=True,
                text=True,
                timeout=timeout if timeout else None,
            )

            if debug:
                Program.logger.debug(
                    f"Ghidra returned code {result.returncode}"
                )
                Program.logger.debug(result.stderr[-2000:] if result.stderr else "")

            if result.returncode != 0:
                raise QuokkaError(
                    f"Ghidra failed to export {exec_path} "
                    f"(rc={result.returncode}):\n"
                    f"{result.stderr[-2000:] if result.stderr else ''}"
                )

            if not output_file.exists():
                raise QuokkaError(
                    f"Ghidra export did not produce {output_file}.\n"
                    f"{result.stdout[-2000:] if result.stdout else ''}"
                )
        finally:
            import shutil
            shutil.rmtree(proj_dir, ignore_errors=True)

        return output_file

    @staticmethod
    def generate(
        exec_path: Path|str,
        output_file: Path|str|None = None,
        database_file: Path|str|None = None,
        decompiled: bool = False,
        debug: bool = False,
        override: bool = True,
        timeout: int|None = 600,
        mode: ExporterMode = ExporterMode.LIGHT,
        disassembler: Disassembler = Disassembler.UNKNOWN,
    ) -> Path:
        """Generate an export file directly from the binary.

        Arguments:
            exec_path: Binary to export.
            output_file: Where to store the result (by default: near the executable)
            database_file: Where to store IDA database (by default: near the executable)
            decompiled: Whether to export decompiled code (default: False)
            timeout: Maximum time, in seconds, to wait for the export to finish (default: 10 min)
            debug: Activate the debug output
            override: If False and output_file already exists, return it without re-exporting
            mode: Export mode (LIGHT or FULL)
            disassembler: Backend to use (auto-detect if UNKNOWN)

        Returns:
            Path to the generated .quokka file.

        Raises:
            QuokkaError: If the export fails
            FileNotFoundError: If the executable is not found
        """

        exec_path = Path(exec_path)
        if not exec_path.is_file():
            raise FileNotFoundError("Missing exec file")

        if output_file is None:
            output_file = exec_path.parent / f"{exec_path.name}.quokka"
        else:
            output_file = Path(output_file)

        if output_file.is_file() and not override:
            return output_file

        if disassembler is Disassembler.UNKNOWN:
            disassembler = Program._detect_disassembler()

        match disassembler:
            case Disassembler.GHIDRA:
                if decompiled:
                    Program.logger.warning(
                        "Ghidra export does not support decompilation yet; "
                        "ignoring --decompiled flag."
                    )
                return Program._generate_ghidra(
                    exec_path=exec_path,
                    output_file=output_file,
                    debug=debug,
                    timeout=timeout,
                    mode=mode,
                )
            case Disassembler.IDA:
                return Program._generate_ida(
                    exec_path=exec_path,
                    output_file=output_file,
                    database_file=database_file,
                    decompiled=decompiled,
                    debug=debug,
                    timeout=timeout,
                    mode=mode,
                )
            case _:
                raise QuokkaError(f"Unsupported disassembler: {disassembler}")


    def write(self, output_file: Path|str|None = None) -> None:
        """Write the program to a file

        Arguments:
            output_file: Where to write the program
        """
        if output_file is None:
            output_file = self.export_file
        with open(output_file, "wb") as fd:
            fd.write(self.proto.SerializeToString())

    def _commit_edits_ida(
        self,
        database_file: "Path|str",
        ida_path: "Path|str|None" = None,
        overwrite: bool = False,
        timeout: int = 600,
    ) -> int:
        """Apply in-memory edits to an IDA database by spawning a headless
        IDA instance.

        The quokka file and binary paths are taken from ``self``.  The
        caller must call :meth:`write` beforehand so the serialised
        ``.quokka`` on disk reflects the pending edits.

        Arguments:
            database_file: Path to the ``.i64`` database to modify.
                The database must already exist (e.g. from a prior export).
            ida_path: Optional path to the IDA installation directory.
                When *None*, ``idascript`` resolves it from ``IDA_PATH``
                or ``$PATH``.
            overwrite: If *True*, allow modifying an existing database.
                A warning is logged when an existing database is
                overwritten.  If *False* (the default) and the database
                already exists, a :class:`FileExistsError` is raised.
            timeout: Maximum seconds to wait for IDA (default 600).

        Returns:
            The number of errors reported by ``apply_quokka()`` inside
            IDA (0 means all edits applied successfully).

        Raises:
            FileExistsError: If *database_file* exists and *overwrite*
                is False.
            RuntimeError: If IDA times out.
        """
        assert idascript is not None, "idascript is required for IDA apply-back"

        database_file = Path(database_file)

        if database_file.exists():
            if not overwrite:
                raise FileExistsError(
                    f"Database already exists: {database_file}. "
                    "Pass overwrite=True to modify it."
                )
            self.logger.warning(
                "Overwriting existing IDA database: %s", database_file
            )

        # Point idascript at backends/ida/apply.py.  The script lives in
        # its own sub-package (not directly in backends/) so that IDA's
        # automatic sys.path insertion does not shadow the capstone package.
        script_file = Path(__file__).resolve().parent / "backends" / "ida" / "apply.py"
        script_params = [
            str(self.export_file),
            str(self.executable.exec_file),
        ]

        old_ida_path = os.environ.get("IDA_PATH")
        try:
            if ida_path is not None:
                os.environ["IDA_PATH"] = str(ida_path)

            ida = idascript.IDA(
                database_file,
                script_file=script_file,
                script_params=script_params,
                timeout=timeout,
                database_path=None,
            )
            ida.start()
            ret_code = ida.wait()
        finally:
            if ida_path is not None:
                if old_ida_path is None:
                    os.environ.pop("IDA_PATH", None)
                else:
                    os.environ["IDA_PATH"] = old_ida_path

        if ret_code == idascript.IDA.TIMEOUT_RETURNCODE:
            raise RuntimeError(
                f"IDA apply-back timed out after {timeout}s "
                f"on {database_file}"
            )

        return ret_code

    def commit(
        self,
        database_file: "Path|str|None" = None,
        ida_path: "Path|str|None" = None,
        overwrite: bool = True,
        timeout: int = 600,
    ) -> int:
        """Write the .quokka and apply edits to the disassembler database.

        Arguments:
            database_file: Path to the ``.i64`` database (IDA only).
                Defaults to ``<exec_file>.i64`` next to the binary.
            ida_path: Optional IDA installation path (IDA only).
            overwrite: Allow modifying an existing database (IDA only).
            timeout: IDA timeout in seconds (IDA only).

        Returns:
            The number of apply errors (0 = success). For Ghidra
            programs only the ``.quokka`` file is written and 0 is
            returned.

        Raises:
            NotImplementedError: If the program comes from Binary Ninja
                or an unknown disassembler.
        """
        self.write()
        match self.disassembler:
            case Disassembler.IDA:
                if database_file is None:
                    database_file = str(self.executable.exec_file) + ".i64"
                return self._commit_edits_ida(
                    database_file,
                    ida_path=ida_path,
                    overwrite=overwrite,
                    timeout=timeout,
                )
            case Disassembler.GHIDRA:
                # TODO: Call ghidra script with the write script
                pass
            case Disassembler.BINARY_NINJA:
                raise NotImplementedError("Binary Ninja export is not implemented yet")
            case _:
                raise NotImplementedError("Unknown disassembler")
        return 0


    def regenerate(
        self,
        database_file: "Path|str|None" = None,
        ida_path: "Path|str|None" = None,
        overwrite: bool = False,
        timeout: int = 600,
    ) -> 'Program':
        """Apply edits, re-export, and return a fresh Program.

        Calls :meth:`commit` to apply edits, then :meth:`generate` to
        produce a new ``.quokka`` file.

        Arguments:
            database_file: Path to the ``.i64`` database (IDA only;
                defaults to ``<exec_file>.i64``).
            ida_path: Optional IDA installation path.
            overwrite: Allow modifying an existing database.
            timeout: IDA timeout in seconds.

        Returns:
            A new Program instance with the updated data.

        Raises:
            QuokkaError: If applying changes or regenerating fails.
        """
        errors = self.commit(
            database_file=database_file,
            ida_path=ida_path,
            overwrite=overwrite,
            timeout=timeout,
        )
        if errors:
            self.logger.warning(
                "%d errors occurred while applying edits", errors
            )
        path = Program.generate(
            self.executable.exec_file,
            self.export_file,
            database_file=database_file,
            override=True,
        )
        return Program.open(path, self.executable.exec_file)
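Before launching IDA, `_generate_ida` refuses to run when leftover unpacked database files (`.id0`, `.id1`, `.id2`, `.til`, `.nam`) sit next to the binary. The standalone sketch below reproduces only that check with the standard library. `find_stale_idb_files` is a hypothetical helper name and is not part of Quokka's API.

```python
import tempfile
from pathlib import Path

# Same extensions that Program._generate_ida treats as stale
STALE_EXTENSIONS = (".id0", ".id1", ".id2", ".til", ".nam")


def find_stale_idb_files(exec_path: Path) -> list[Path]:
    """Return leftover unpacked IDA database files next to exec_path."""
    candidates = (exec_path.parent / f"{exec_path.name}{ext}" for ext in STALE_EXTENSIONS)
    return [path for path in candidates if path.is_file()]


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        binary = Path(tmp) / "sample.bin"
        binary.write_bytes(b"\x7fELF")
        # Simulate a leftover file, as an interrupted IDA session might leave behind
        (Path(tmp) / "sample.bin.id0").write_bytes(b"")
        print([p.name for p in find_stale_idb_files(binary)])  # ['sample.bin.id0']
```

This check does not match a packed `.i64` database next to the binary. That matches the behavior of `_generate_ida`, which opens such a database instead of the binary.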

base_address property

Program base address

call_graph cached property

Compute the call graph of the binary.

Every node in the call graph is a function.

Returns: A call graph (a networkx DiGraph).
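The `call_graph` property returns a networkx `DiGraph` whose nodes are functions. The sketch below illustrates the shape of that graph without needing networkx or a real export. It builds the same kind of directed graph as a plain adjacency mapping from a hypothetical table of caller-to-callee addresses. The data and both helper names are assumptions made for illustration.

```python
def build_call_graph(calls: dict[int, list[int]]) -> dict[int, set[int]]:
    """Build a directed call graph: caller address -> set of callee addresses."""
    graph: dict[int, set[int]] = {}
    for caller, callees in calls.items():
        graph.setdefault(caller, set())
        for callee in callees:
            graph[caller].add(callee)
            # Callees are nodes too, even if they call nothing themselves
            graph.setdefault(callee, set())
    return graph


def callers_of(graph: dict[int, set[int]], target: int) -> set[int]:
    """Return every function that has an edge to target."""
    return {caller for caller, callees in graph.items() if target in callees}


# Hypothetical call data: 0x1000 calls 0x2000 and 0x3000, 0x2000 calls 0x3000
graph = build_call_graph({0x1000: [0x2000, 0x3000], 0x2000: [0x3000]})
print(sorted(callers_of(graph, 0x3000)))  # [4096, 8192]
```

On the real networkx graph, the same question would presumably be answered with `program.call_graph.predecessors(node)`, assuming the nodes are the function objects described above.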

capstone cached property

Compute a capstone context

enums cached property

Enums accessor

Retrieves the enums of the program (as defined by the disassembler).

Returns:

Iterable[EnumType]: A list of enums.

functions property

Functions accessor

Retrieves the functions of the program (as defined by the disassembler).

Returns:

Iterable[Function]: A list of functions.

hash property

Returns the hash value of the binary (either sha256 or MD5).
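The recorded hash can be either SHA-256 or MD5. One way to verify a binary against such a digest is to choose the algorithm from the digest length: 64 hex characters for SHA-256, 32 for MD5. The sketch below is a hypothetical stand-in for the hash check performed when a Program is loaded. It does not reproduce `quokka.check_hash`, whose implementation is not shown here.

```python
import hashlib


def matches_digest(data: bytes, hex_digest: str) -> bool:
    """Compare data against a SHA-256 or MD5 hex digest, chosen by digest length."""
    algorithms = {64: hashlib.sha256, 32: hashlib.md5}
    algorithm = algorithms.get(len(hex_digest))
    if algorithm is None:
        raise ValueError(f"Unsupported digest length: {len(hex_digest)}")
    # hexdigest() is lowercase, so normalize the expected value too
    return algorithm(data).hexdigest() == hex_digest.lower()
```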

headers property

Returns C-style headers of the binary

name property

Returns the underlying binary name

orphaned_blocks cached property

Orphaned blocks

Orphaned blocks are blocks that are not attached to any function. They can be useful to analyze code that is not in functions (e.g. hand-written assembly or jump tables).

Returns:

Iterable[Block]: A list of orphaned blocks.
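Conceptually, a block is orphaned when its address falls outside every function. The sketch below models functions as hypothetical half-open `(start, end)` address ranges. This is a simplification: real functions may be split into several chunks, and Quokka computes the property from the export rather than from ranges like these.

```python
def orphaned_blocks(block_addresses: list[int], function_ranges: list[tuple[int, int]]) -> list[int]:
    """Return the block addresses that fall outside every [start, end) function range."""
    return [
        addr
        for addr in block_addresses
        if not any(start <= addr < end for start, end in function_ranges)
    ]


# Hypothetical layout: two functions and four basic blocks
print([hex(a) for a in orphaned_blocks([0x10, 0x20, 0x30, 0x40], [(0x0, 0x20), (0x30, 0x38)])])
# ['0x20', '0x40']
```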

pypcode cached property

Generate the Pypcode context.

segments cached property

Returns the list of segments defined in the program.

structures property

Structures accessor

Retrieves the structures of the program (as defined by the disassembler).

Returns:

Iterable[StructureType]: A list of structures.

types property

Types in the program

Returns:

Iterable[TypeT]: Iterable of types in the Program (excludes user-added types).

__hash__()

Hash of the Program (use the hash from the exported file)

Source code in bindings/python/quokka/program.py
def __hash__(self) -> int:
    """Hash of the Program (use the hash from the exported file)"""
    return int(self.proto.meta.hash.hash_value, 16)
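`__hash__` converts the hexadecimal digest into an integer. That integer is much larger than a machine word, but Python automatically reduces an oversized value returned by a custom `__hash__`, so `hash(program)` is still well defined. `FakeProgram` below is a hypothetical stand-in that stores only the digest string.

```python
import hashlib


class FakeProgram:
    """Hypothetical stand-in holding only the digest recorded in an export file."""

    def __init__(self, hash_value: str):
        self.hash_value = hash_value

    def __hash__(self) -> int:
        # Same expression as Program.__hash__: interpret the hex digest as an integer
        return int(self.hash_value, 16)


digest = hashlib.sha256(b"binary contents").hexdigest()
program = FakeProgram(digest)
# hash() reduces the 256-bit integer exactly as it would for the int itself
print(hash(program) == hash(int(digest, 16)))  # True
```

Because only the digest is used, two Program objects loaded from exports of the same binary hash identically.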

__init__(export_file, exec_path)

Constructor

Source code in bindings/python/quokka/program.py
def __init__(self, export_file: Path|str, exec_path: Path|str):
    """Constructor"""
    super().__init__()

    self.proto: quokka.pb.Quokka = quokka.pb.Quokka()
    self.export_file: Path = Path(export_file)
    try:
        with lzma.open(self.export_file, "rb") as fd:
            raw_data = fd.read()
    except lzma.LZMAError:
        # try reading it as a plain-bytes Quokka (but should raise version mismatch later)
        with open(self.export_file, "rb") as fd:
            raw_data = fd.read()

    self.proto.ParseFromString(raw_data)

    # Export mode
    self.mode: ExporterMode = ExporterMode.from_proto(self.proto.exporter_meta.mode)

    # Version checking
    # A change in the major version might break backward compatibility
    proto_version = quokka.parse_version(self.proto.exporter_meta.version)
    current_version = quokka.parse_version(quokka.__version__)
    if proto_version[0] != current_version[0]:
        self.logger.warning(
            "The exported file has been generated by a different version of Quokka."
            f" The file has been generated by Quokka {self.proto.exporter_meta.version}"
            f" while you are using {quokka.__version__}"
        )
    elif self.proto.exporter_meta.version != quokka.__version__:
        self.logger.debug(
            "Version mismatch detected but still compatible with the exported file."
            f" The file has been generated by Quokka {self.proto.exporter_meta.version}"
            f" while you are using {quokka.__version__}"
        )

    # Check that the hash recorded in the export file matches the executable
    if not quokka.check_hash(self.proto.meta.hash, Path(exec_path)):
        self.logger.error("Hash does not match with file.")
        raise quokka.QuokkaError("Hash mismatch")

    self.isa: quokka.analysis.ArchEnum = quokka.get_isa(self.proto.meta.isa)
    self.address_size: int = quokka.convert_address_size(
        self.proto.meta.address_size
    )
    self.arch: Type[quokka.analysis.QuokkaArch] = quokka.get_arch(
        self.isa, self.address_size
    )

    self.endianness: Endianness = Endianness.from_proto(self.proto.meta.endianess)

    # Get disassembly backend
    self.disassembler: Disassembler = Disassembler.from_proto(self.proto.meta.backend.name) 
    self.disassembler_version: str = self.proto.meta.backend.version
    self.decompiled_activated: bool = self.proto.meta.decompilation_activated

    self.calling_convention: CallingConvention = CallingConvention.from_proto(self.proto.meta.calling_convention)

    self.executable = quokka.Executable(exec_path, self.endianness)
    self.data = quokka.DataHolder(self.proto, self)

    # Functions
    # self.functions: Dict[int, quokka.Function] = {}
    self.fun_names: dict[str, quokka.Function] = {}
    for func_index, func in enumerate(self.proto.functions):
        function = quokka.Function(func_index, func, self)
        self[function.start] = function
        if function.name not in self.fun_names:
            self.fun_names[function.name] = function
        else:
            if function.type == self.fun_names[function.name].type:
                self.logger.warning("Found two functions with the same name.")
            else:
                if function.type == FunctionType.NORMAL:
                    self.fun_names[function.name] = function

    # Types
    self._types: dict[Index, TypeT] = {}  # types as they are being loaded

__repr__()

Program representation

Source code in bindings/python/quokka/program.py
def __repr__(self) -> str:
    """Program representation"""
    return self.__str__()

__str__()

Program representation

Source code in bindings/python/quokka/program.py
def __str__(self) -> str:
    """Program representation"""
    return f"<Program {self.executable.exec_file.name} ({self.arch.__name__})>"

add_type(type)

Add a new user-defined type to the program.

Parameters:

type (str, required): A C type declaration string (e.g. "struct foo { int x; float y; }").

Raises:

QuokkaError: If a type with the same name already exists. The name is derived from a hash of the declaration, so this happens when the same declaration is added twice.

Source code in bindings/python/quokka/program.py
def add_type(self, type: str) -> None:
    """Add a new user-defined type to the program.

    Args:
        type: A C type declaration string
            (e.g. ``"struct foo { int x; float y; }"``).

    Raises:
        QuokkaError: If a type with the same name already exists.
    """

    h = hashlib.sha256(type.encode("utf-8", errors="replace")).hexdigest()[:16]
    type_name = f"__user_type_{h}"
    new_index = len(self.proto.types)
    pb_type = self.proto.types.add()
    pb_type.is_new = True
    ct = pb_type.composite_type
    ct.name = type_name
    ct.type = Pb.CompositeType.TYPE_STRUCT
    ct.c_str = type

    # Check for duplicate names
    for i, t in enumerate(self.proto.types):
        if i == new_index:
            continue
        oneof = t.WhichOneof("OneofType")
        if oneof == "composite_type" and t.composite_type.name == type_name:
            # Remove the just-added entry
            del self.proto.types[new_index]
            raise QuokkaError(f"Type '{type_name}' already exists at index {i}")
        elif oneof == "enum_type" and t.enum_type.name == type_name:
            del self.proto.types[new_index]
            raise QuokkaError(f"Type '{type_name}' already exists at index {i}")
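`add_type` does not parse the declaration to find its name. Instead, it stores the declaration under a synthetic name built from the first 16 hex characters of the SHA-256 of the declaration text. As a result, adding the exact same string twice triggers the duplicate-name `QuokkaError`, while two different declarations of `struct foo` do not conflict. The helper below reproduces only that naming rule; `user_type_name` is a hypothetical name.

```python
import hashlib


def user_type_name(declaration: str) -> str:
    """Derive the synthetic name add_type assigns to a C declaration."""
    digest = hashlib.sha256(declaration.encode("utf-8", errors="replace")).hexdigest()
    return f"__user_type_{digest[:16]}"


print(user_type_name("struct foo { int x; float y; }").startswith("__user_type_"))  # True
```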

address_to_offset(address)

Converts a virtual address to a file offset.

Parameters:

address (AddressT, required): A virtual address.

Returns:

int: A file offset.

Raises:

NotInFileError: If no segment contains the address, or the segment is not backed by the file.

Source code in bindings/python/quokka/program.py
def address_to_offset(self, address: AddressT) -> int:
    """Converts a virtual address to a file offset.

    Arguments:
        address: A virtual address

    Returns:
        A file offset

    Raises:
        quokka.NotInFileError: If the address is not mapped to the file
    """
    try:
        segment = self.get_segment(address)
    except KeyError as exc:
        raise quokka.NotInFileError("Unable to find the segment") from exc

    if segment.file_offset != -1:
        return address - segment.address + segment.file_offset

    raise quokka.NotInFileError("Unable to find the offset in the file")
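The conversion is plain arithmetic: `address - segment.address + segment.file_offset`, with `file_offset == -1` marking a segment that has no bytes in the file. In the sketch below, `Segment` and `NotInFileError` are hypothetical stand-ins, and the `size` field used to locate the segment is an assumption, because the implementation of `get_segment` is not shown here.

```python
from dataclasses import dataclass


class NotInFileError(Exception):
    """Hypothetical stand-in for quokka.NotInFileError."""


@dataclass
class Segment:
    address: int      # virtual start address
    size: int         # assumed field, used only to find the containing segment
    file_offset: int  # -1 when the segment is not backed by the file


def address_to_offset(segments: list[Segment], address: int) -> int:
    """Translate a virtual address into a file offset, mirroring Program.address_to_offset."""
    for seg in segments:
        if seg.address <= address < seg.address + seg.size:
            if seg.file_offset != -1:
                return address - seg.address + seg.file_offset
            raise NotInFileError("Unable to find the offset in the file")
    raise NotInFileError("Unable to find the segment")


segments = [Segment(0x400000, 0x2000, 0x0), Segment(0x600000, 0x1000, -1)]
print(hex(address_to_offset(segments, 0x401234)))  # 0x1234
```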

commit(database_file=None, ida_path=None, overwrite=True, timeout=600)

Write the .quokka and apply edits to the disassembler database.

Parameters:

database_file (Path | str | None, default None): Path to the .i64 database (IDA only). Defaults to <exec_file>.i64 next to the binary.
ida_path (Path | str | None, default None): Optional IDA installation path (IDA only).
overwrite (bool, default True): Allow modifying an existing database (IDA only).
timeout (int, default 600): IDA timeout in seconds (IDA only).

Returns:

int: The number of apply errors (0 = success). For Ghidra programs only the .quokka file is written and 0 is returned.

Raises:

NotImplementedError: If the program comes from Binary Ninja or an unknown disassembler.

Source code in bindings/python/quokka/program.py
def commit(
    self,
    database_file: "Path|str|None" = None,
    ida_path: "Path|str|None" = None,
    overwrite: bool = True,
    timeout: int = 600,
) -> int:
    """Write the .quokka and apply edits to the disassembler database.

    Arguments:
        database_file: Path to the ``.i64`` database (IDA only).
            Defaults to ``<exec_file>.i64`` next to the binary.
        ida_path: Optional IDA installation path (IDA only).
        overwrite: Allow modifying an existing database (IDA only).
        timeout: IDA timeout in seconds (IDA only).

    Returns:
        The number of apply errors (0 = success). For Ghidra
        programs only the ``.quokka`` file is written and 0 is
        returned.

    Raises:
        NotImplementedError: If the program comes from Binary Ninja
            or an unknown disassembler.
    """
    self.write()
    match self.disassembler:
        case Disassembler.IDA:
            if database_file is None:
                database_file = str(self.executable.exec_file) + ".i64"
            return self._commit_edits_ida(
                database_file,
                ida_path=ida_path,
                overwrite=overwrite,
                timeout=timeout,
            )
        case Disassembler.GHIDRA:
            # TODO: Call ghidra script with the write script
            pass
        case Disassembler.BINARY_NINJA:
            raise NotImplementedError("Binary Ninja export is not implemented yet")
        case _:
            raise NotImplementedError("Unknown disassembler")
    return 0

find_function_by_address(address)

Get the function containing a given address.

Parameters:

address (AddressT, required): An address within the function.

Returns:

Function | None: The quokka.Function containing the address, or None if it does not belong to any function.

Source code in bindings/python/quokka/program.py
def find_function_by_address(self, address: AddressT) -> quokka.Function|None:
    """Get the function containing a given address.

    Arguments:
        address: An address within the function

    Returns:
        The `quokka.Function` containing the address, or None if it
        does not belong to any function
    """
    if address in self:
        return self[address]
    else:
        # try finding it in the functions
        for function in self.values():
            if function.in_function(address):
                return function
    return None

find_type(name)

Find a type by its name

Parameters:

name (str, required): Name of the type to find.

Returns:

TypeT | None: The corresponding type, or None if not found.

Source code in bindings/python/quokka/program.py
def find_type(self, name: str) -> TypeT|None:
    """Find a type by its name

    Arguments:
        name: Name of the type to find

    Returns:
        The corresponding type, None if not found
    """
    for t in self.types:
        if t.name == name:
            return t
    return None
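`find_type` scans every type on each call and returns the first match. If many names are looked up, one option is to build a dictionary once and keep the first occurrence of each name, which preserves the same "first match wins" behavior. `NamedType` and `build_type_index` are hypothetical. Such an index would also need rebuilding after `add_type`.

```python
from typing import NamedTuple


class NamedType(NamedTuple):
    """Hypothetical stand-in for a Quokka type: only the name matters here."""
    name: str
    kind: str


def build_type_index(types: list[NamedType]) -> dict[str, NamedType]:
    """Map each type name to its first occurrence, matching find_type's loop."""
    index: dict[str, NamedType] = {}
    for t in types:
        index.setdefault(t.name, t)
    return index


index = build_type_index([NamedType("foo", "struct"), NamedType("foo", "enum")])
print(index["foo"].kind)  # struct
```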

from_binary(exec_path, output_file=None, database_file=None, decompiled=False, debug=False, override=True, timeout=0, mode=ExporterMode.LIGHT, disassembler=Disassembler.UNKNOWN) staticmethod

Generate an export file directly from the binary.

Parameters:

exec_path (Path | str, required): Binary to export.
output_file (Path | str | None, default None): Where to store the result (by default: next to the executable).
database_file (Path | str | None, default None): Where to store the IDA database (by default: next to the executable).
decompiled (bool, default False): Whether to export decompiled code.
timeout (int | None, default 0): Maximum time, in seconds, to wait for the export to finish. The Ghidra backend treats 0 as no limit.
debug (bool, default False): Activate the debug output.
override (bool, default True): If False and output_file already exists, reuse it instead of re-exporting.
mode (ExporterMode, default LIGHT): Export mode (LIGHT or FULL).
disassembler (Disassembler, default UNKNOWN): Backend to use (auto-detected if UNKNOWN).

Returns:

Program | None: A Program instance loaded from the generated export file.

Raises:

QuokkaError: If the export fails.
FileNotFoundError: If the executable is not found, or if no export file was produced.

Source code in bindings/python/quokka/program.py
@staticmethod
def from_binary(
    exec_path: Path|str,
    output_file: Path|str|None = None,
    database_file: Path|str|None = None,
    decompiled: bool = False,
    debug: bool = False,
    override: bool = True,
    timeout: int|None = 0,
    mode: ExporterMode = ExporterMode.LIGHT,
    disassembler: Disassembler = Disassembler.UNKNOWN,
) -> Program|None:
    """Generate an export file directly from the binary.

    Arguments:
        exec_path: Binary to export.
        output_file: Where to store the result (by default: near the executable)
        database_file: Where to store IDA database (by default: near the executable)
        decompiled: Whether to export decompiled code (default: False)
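        timeout: Maximum time, in seconds, to wait for the export to finish
            (default: 0; the Ghidra backend treats 0 as no limit)
        debug: Activate the debug output
        override: If False and output_file already exists, reuse it instead of re-exporting
        mode: Export mode (LIGHT or FULL)
        disassembler: Backend to use (auto-detect if UNKNOWN)

    Returns:
        A Program instance loaded from the generated export file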

    Raises:
        QuokkaError: If the export fails
        FileNotFoundError: If the executable is not found
    """
    quokka_file = Program.generate(
        exec_path=exec_path,
        output_file=output_file,
        database_file=database_file,
        decompiled=decompiled,
        override=override,
        debug=debug,
        timeout=timeout,
        mode=mode,
        disassembler=disassembler,
    )

    # If we reach this point, the export file should exist; otherwise an exception has already been raised
    if quokka_file.exists():
        return Program.open(quokka_file, exec_path)
    else:
        raise FileNotFoundError(f"Quokka generation failed, export file does not exist: {quokka_file}")

generate(exec_path, output_file=None, database_file=None, decompiled=False, debug=False, override=True, timeout=600, mode=ExporterMode.LIGHT, disassembler=Disassembler.UNKNOWN) staticmethod

Generate an export file directly from the binary.

Parameters:

exec_path (Path | str, required): Binary to export.
output_file (Path | str | None, default None): Where to store the result (by default: next to the executable).
database_file (Path | str | None, default None): Where to store the IDA database (by default: next to the executable).
decompiled (bool, default False): Whether to export decompiled code.
timeout (int | None, default 600): Maximum time, in seconds, to wait for the export to finish (10 min by default).
debug (bool, default False): Activate the debug output.
override (bool, default True): If False and output_file already exists, return it without re-exporting.
mode (ExporterMode, default LIGHT): Export mode (LIGHT or FULL).
disassembler (Disassembler, default UNKNOWN): Backend to use (auto-detected if UNKNOWN).

Returns:

Path: Path to the generated .quokka file.

Raises:

QuokkaError: If the export fails.
FileNotFoundError: If the executable is not found.

Source code in bindings/python/quokka/program.py
@staticmethod
def generate(
    exec_path: Path|str,
    output_file: Path|str|None = None,
    database_file: Path|str|None = None,
    decompiled: bool = False,
    debug: bool = False,
    override: bool = True,
    timeout: int|None = 600,
    mode: ExporterMode = ExporterMode.LIGHT,
    disassembler: Disassembler = Disassembler.UNKNOWN,
) -> Path:
    """Generate an export file directly from the binary.

    Arguments:
        exec_path: Binary to export.
        output_file: Where to store the result (by default: near the executable)
        database_file: Where to store IDA database (by default: near the executable)
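        decompiled: Whether to export decompiled code (IDA only; ignored for Ghidra)
        timeout: Maximum time to wait for the export to finish, in seconds (default: 600, i.e. 10 min)
        debug: Activate the debug output
        override: Re-run the export even if output_file already exists (default: True)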
        mode: Export mode (LIGHT or FULL)
        disassembler: Backend to use (auto-detect if UNKNOWN)

    Returns:
        Path to the generated .quokka file.

    Raises:
        QuokkaError: If the export fails
        FileNotFoundError: If the executable is not found
    """

    exec_path = Path(exec_path)
    if not exec_path.is_file():
        raise FileNotFoundError("Missing exec file")

    if output_file is None:
        output_file = exec_path.parent / f"{exec_path.name}.quokka"
    else:
        output_file = Path(output_file)

    if output_file.is_file() and not override:
        return output_file

    if disassembler is Disassembler.UNKNOWN:
        disassembler = Program._detect_disassembler()

    match disassembler:
        case Disassembler.GHIDRA:
            if decompiled:
                Program.logger.warning(
                    "Ghidra export does not support decompilation yet; "
                    "ignoring --decompiled flag."
                )
            return Program._generate_ghidra(
                exec_path=exec_path,
                output_file=output_file,
                debug=debug,
                timeout=timeout,
                mode=mode,
            )
        case Disassembler.IDA:
            return Program._generate_ida(
                exec_path=exec_path,
                output_file=output_file,
                database_file=database_file,
                decompiled=decompiled,
                debug=debug,
                timeout=timeout,
                mode=mode,
            )
        case _:
            raise QuokkaError(f"Unsupported disassembler: {disassembler}")
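Before `generate` dispatches to a backend, it resolves the output path and may return early. The sketch below isolates those two steps so the default naming rule (".quokka" appended to the full file name, next to the binary) and the `override` short-circuit can be checked on their own. The helper names `resolve_output_path` and `should_skip_export` are hypothetical and are not part of Quokka's API.

```python
from pathlib import Path


def resolve_output_path(exec_path, output_file=None):
    """Hypothetical helper mirroring generate()'s default output path.

    With no output file, the export goes next to the binary and ".quokka"
    is appended to the full file name (e.g. "ls" -> "ls.quokka").
    """
    exec_path = Path(exec_path)
    if output_file is None:
        return exec_path.parent / f"{exec_path.name}.quokka"
    return Path(output_file)


def should_skip_export(output_file, override):
    """Hypothetical helper mirroring generate()'s early return: an existing
    export is reused only when override is False."""
    return Path(output_file).is_file() and not override
```

Because the suffix is appended rather than substituted, a name like `libc.so.6` keeps all of its dots and becomes `libc.so.6.quokka`.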

get_data(address)

Get data by address

Parameters:

Name Type Description Default
address AddressT

Address to query

required

Returns:

Type Description
Data

The Data object at the address

Raises:

Type Description
ValueError

When no data is found at the address

Source code in bindings/python/quokka/program.py
def get_data(self, address: AddressT) -> quokka.Data:
    """Get data by address

    Arguments:
        address: Address to query

    Returns:
        The Data object at the address

    Raises:
        ValueError: When no data is found at the address
    """
    return self.data[address]

get_function(name, approximative=False, normal=False)

Find a function in a program by its name

Parameters:

Name Type Description Default
name str

Function name

required
approximative bool

If True, match any function whose name contains the given name as a substring; otherwise require an exact match

False
normal bool

Only consider FunctionType.NORMAL functions (used only when approximative is True)

False

Returns:

Type Description
Function

A function matching the search criteria

Raises:

Type Description
ValueError

When no function is found

Source code in bindings/python/quokka/program.py
def get_function(
    self, name: str, approximative: bool = False, normal: bool = False
) -> quokka.Function:
    """Find a function in a program by its name

    Arguments:
        name: Function name
        approximative: If True, allow substring matches instead of an exact match
        normal: Only consider FunctionType.NORMAL functions (used only when
            approximative is True)

    Returns:
        A function matching the search criteria

    Raises:
        ValueError: When no function is found
    """
    if approximative is False:
        try:
            return self.fun_names[name]
        except KeyError as exc:
            raise ValueError("Missing function") from exc

    for function in self.fun_names.values():
        # TODO(dm) Improve this
        if name in function.name and (
            not normal or function.type == FunctionType.NORMAL
        ):
            return function

    raise ValueError("Unable to find an appropriate function")
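The matching rules of `get_function` are easiest to see on a toy mapping. The sketch below reimplements them with minimal stand-in classes. `Function` and `FunctionType` here are hypothetical substitutes for Quokka's own types, and the `IMPORTED` member exists only for this example. It shows that `normal` is ignored on the exact-match path, and that substring matching returns the first hit in dictionary order.

```python
from dataclasses import dataclass
from enum import Enum


class FunctionType(Enum):
    # Stand-in for quokka's FunctionType; only NORMAL is referenced by the source.
    NORMAL = "normal"
    IMPORTED = "imported"  # hypothetical second member for the example


@dataclass
class Function:
    # Minimal stand-in exposing the two attributes get_function() reads.
    name: str
    type: FunctionType


def find_function(fun_names, name, approximative=False, normal=False):
    """Sketch mirroring Program.get_function's matching rules."""
    if not approximative:
        # Exact lookup: `normal` is not consulted on this path.
        try:
            return fun_names[name]
        except KeyError as exc:
            raise ValueError("Missing function") from exc

    # Substring match: first function (in dict order) whose name contains
    # `name`, optionally restricted to NORMAL functions.
    for function in fun_names.values():
        if name in function.name and (
            not normal or function.type == FunctionType.NORMAL
        ):
            return function

    raise ValueError("Unable to find an appropriate function")
```

For example, with an imported `printf` and a normal `my_printf_wrapper`, `find_function(funcs, "printf", approximative=True, normal=True)` skips the import and returns the wrapper.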

get_instruction(address)

Get an instruction by its address

Note: the address must be the head of the instruction.

Parameters:

Name Type Description Default
address AddressT

Address to query

required

Returns:

Type Description
Instruction

A quokka.Instruction

Raises:

Type Description
IndexError

When no instruction is found at this address

Source code in bindings/python/quokka/program.py
def get_instruction(self, address: AddressT) -> quokka.Instruction:
    """Get an instruction by its address

    Note: the address must be the head of the instruction.

    Arguments:
        address: Address to query

    Returns:
        A `quokka.Instruction`

    Raises:
        IndexError: When no instruction is found at this address
    """
    for function in self.values():
        if function.in_function(address):
            try:
                return function.get_instruction(address)
            except IndexError:
                pass

    raise IndexError(f"No instruction at address 0x{address:x}")
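`get_instruction` scans every function. It asks only those whose range contains the address, and it treats an `IndexError` from one function as "not an instruction head here, keep looking". The sketch below reproduces that loop over a hypothetical `ToyFunction` class. The assumption that a function covers a half-open range `[start, end)` belongs to this toy and is not taken from Quokka.

```python
class ToyFunction:
    """Hypothetical stand-in: a function covering [start, end) whose
    instructions are keyed by their head address."""

    def __init__(self, start, end, instructions):
        self.start, self.end = start, end
        self.instructions = instructions  # {head_address: mnemonic}

    def in_function(self, address):
        return self.start <= address < self.end

    def get_instruction(self, address):
        try:
            return self.instructions[address]
        except KeyError as exc:
            raise IndexError(address) from exc


def find_instruction(functions, address):
    """Sketch mirroring Program.get_instruction's search loop."""
    for function in functions:
        if function.in_function(address):
            try:
                return function.get_instruction(address)
            except IndexError:
                pass  # not a head in this function; try the next candidate
    raise IndexError(f"No instruction at address 0x{address:x}")
```

Falling through to the next candidate matters when several functions claim the same address range. In the test, the first function contains `0x1004` but has no instruction head there, so the second one answers.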

get_item(addr)

Get a function, block or instruction by its address

Parameters:

Name Type Description Default
addr AddressT

Address to query

required

Returns:

Type Description
Function | Instruction

A function, block or instruction at the address

Raises:

Type Description
KeyError

When no item is found at the address

Source code in bindings/python/quokka/program.py
def get_item(self, addr: AddressT) -> quokka.Function | quokka.Instruction:
    """Get a function, block or instruction by its address

    Arguments:
        addr: Address to query

    Returns:
        A function, block or instruction at the address

    Raises:
        KeyError: When no item is found at the address
    """
    try:
        return self[addr]
    except KeyError as exc:
        raise KeyError(f"No item at address 0x{addr:x}") from exc

get_segment(address)

Get the Segment containing an address

The address must be in [segment.start, segment.end) to be found.

Parameters:

Name Type Description Default
address AddressT

Address to look up (any address inside the segment)

required

Returns:

Type Description
Segment

The corresponding Segment

Raises:

Type Description
KeyError

When the segment is not found

Source code in bindings/python/quokka/program.py
def get_segment(self, address: AddressT) -> quokka.Segment:
    """Get the `Segment` containing an address

    The address must be in [segment.start, segment.end) to be found.

    Arguments:
        address: Address to look up (any address inside the segment)

    Returns:
        The corresponding Segment

    Raises:
        KeyError: When the segment is not found
    """
    for segment in self.segments.values():
        if segment.in_segment(address):
            return segment

    raise KeyError(f"No segment has been found for address {address:#x}")
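The half-open rule `[segment.start, segment.end)` means that the end address of one segment belongs to the next one. The sketch below mirrors the linear scan with a hypothetical `Segment` dataclass that carries only what the lookup needs. Quokka's real `Segment` has more fields.

```python
from dataclasses import dataclass


@dataclass
class Segment:
    # Hypothetical stand-in; only start/end and in_segment() matter here.
    name: str
    start: int
    end: int  # exclusive

    def in_segment(self, address):
        return self.start <= address < self.end


def find_segment(segments, address):
    """Sketch mirroring Program.get_segment: linear scan over half-open ranges."""
    for segment in segments.values():
        if segment.in_segment(address):
            return segment
    raise KeyError(f"No segment has been found for address {address:#x}")
```

With `.text` at `[0x1000, 0x2000)` and `.data` at `[0x2000, 0x2100)`, address `0x1fff` resolves to `.text` and `0x2000` resolves to `.data`.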

get_struct_member(struct_index, member_index)

Get a structure member by its index

Parameters:

Name Type Description Default
struct_index Index

Index of the structure in the proto

required
member_index Index

Index of the member in the structure (starting from 0)

required

Returns:

Type Description
StructureTypeMember

The corresponding structure member

Raises:

Type Description
KeyError

When the structure or the member is not found

Source code in bindings/python/quokka/program.py
def get_struct_member(self, struct_index: Index, member_index: Index) -> StructureTypeMember:
    """Get a structure member by its index

    Arguments:
        struct_index: Index of the structure in the proto
        member_index: Index of the member in the structure (starting from 0)

    Returns:
        The corresponding structure member

    Raises:
        KeyError: When the structure or the member is not found
    """
    try:
        struct = self._types[struct_index]
        assert isinstance(struct, StructureType)
        return struct[member_index]
    except KeyError as exc:
        raise KeyError(f"No structure with index {struct_index} or member with index {member_index}") from exc

get_type(type_index, member_index=-1)

Get a type by its index, always returning a type rather than a member. When member_index selects a structure or enum member, the member's underlying type is returned.

Parameters:

Name Type Description Default
type_index Index

Index of the type in the proto

required
member_index int

Index of the member in the type (if applicable)

-1

Returns:

Type Description
TypeT

The corresponding type

Raises:

Type Description
KeyError

When the type is not found

ValueError

When the type is not of the expected type

Source code in bindings/python/quokka/program.py
def get_type(self, type_index: Index, member_index: int = -1) -> TypeT:
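    """Get a type by its index, always returning a type rather than a member.

    When member_index selects a structure or enum member, the member's
    underlying type is returned.

    Arguments:
        type_index: Index of the type in the proto
        member_index: Index of the member in the type (if applicable)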

    Returns:
        The corresponding type

    Raises:
        KeyError: When the type is not found
        ValueError: When the type is not of the expected type
    """
    typ = self.get_type_reference(type_index, member_index)
    if isinstance(typ, EnumTypeMember):
        return typ.base_type
    elif isinstance(typ, StructureTypeMember):
        return typ.type   # A member cannot point to another member, so it must be a direct reference to a type
    else:
        return typ
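`get_type` unwraps a member reference. An enum member yields its `base_type`, a structure member yields its `type`, and anything else is returned unchanged. The sketch below shows that dispatch on hypothetical dataclasses `EnumMember` and `StructMember`, which stand in for `EnumTypeMember` and `StructureTypeMember`. Types are plain strings to keep it self-contained.

```python
from dataclasses import dataclass


@dataclass
class EnumMember:
    # Hypothetical stand-in for EnumTypeMember.
    name: str
    base_type: str


@dataclass
class StructMember:
    # Hypothetical stand-in for StructureTypeMember.
    name: str
    type: str


def underlying_type(ref):
    """Sketch mirroring get_type(): unwrap a member to the type it carries."""
    if isinstance(ref, EnumMember):
        return ref.base_type
    if isinstance(ref, StructMember):
        return ref.type
    return ref  # already a type
```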

get_type_reference(type_index, member_index=-1)

Get a type, or one of its members, by index

Parameters:

Name Type Description Default
type_index Index

Index of the type in the proto

required
member_index int

Index of the member in the type (if applicable)

-1

Returns:

Type Description
TypeReference

The corresponding type, or the member at member_index

Raises:

Type Description
KeyError

When the type is not found or is a user-added type

Source code in bindings/python/quokka/program.py
def get_type_reference(self, type_index: Index, member_index: int = -1) -> TypeReference:
    """Get a type, or one of its members, by index

    Arguments:
        type_index: Index of the type in the proto
        member_index: Index of the member in the type (if applicable)

    Returns:
        The corresponding type, or the member at member_index

    Raises:
        KeyError: When the type is not found or is a user-added type
    """
    try:
        typ = self._types[type_index]
    except KeyError:
        # Not cached yet: build the type from the protobuf
        if type_index >= len(self.proto.types):
            raise KeyError(f"No type with index {type_index}")

        pb_type = self.proto.types[type_index]
        if pb_type.is_new:
            raise KeyError(f"Type at index {type_index} is a user-added type")
        is_new = pb_type.is_new
        if pb_type.WhichOneof("OneofType") == "enum_type":
            self._types[type_index] = EnumType(type_index, pb_type.enum_type, self, is_new=is_new)
        elif pb_type.WhichOneof("OneofType") == "composite_type":
            match pb_type.composite_type.type:
                case Pb.CompositeType.CompositeSubType.TYPE_STRUCT:
                    self._types[type_index] = StructureType(type_index, pb_type.composite_type, self, is_new=is_new)
                case Pb.CompositeType.CompositeSubType.TYPE_UNION:
                    self._types[type_index] = UnionType(type_index, pb_type.composite_type, self, is_new=is_new)
                case Pb.CompositeType.CompositeSubType.TYPE_ARRAY:
                    self._types[type_index] = ArrayType(type_index, pb_type.composite_type, self, is_new=is_new)
                case Pb.CompositeType.CompositeSubType.TYPE_POINTER:
                    self._types[type_index] = PointerType(type_index, pb_type.composite_type, self, is_new=is_new)
                case Pb.CompositeType.CompositeSubType.TYPE_TYPEDEF:
                    self._types[type_index] = TypedefType(type_index, pb_type.composite_type, self, is_new=is_new)
                case _:
                    # Unknown CompositeSubType -- degrade to TYPE_UNK for forward compat
                    self._types[type_index] = BaseType.UNKNOWN
        elif pb_type.WhichOneof("OneofType") == "primitive_type":
            self._types[type_index] = BaseType.from_proto(pb_type.primitive_type)
        else:
            assert False, "Unknown type"
        typ = self._types[type_index]  # the type is now cached in _types

    if isinstance(typ, ComplexType) and typ.is_new:
        raise KeyError(f"Type at index {type_index} is a user-added type")

    if member_index != -1:
        assert isinstance(typ, (StructureType, UnionType, EnumType))
        if isinstance(typ, StructureType) and not isinstance(typ, UnionType):
            return typ.member_at(member_index)
        return typ[member_index]
    else:
        return typ
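`get_type_reference` decodes types lazily. A protobuf entry becomes a Python object the first time it is requested, and that object is cached in `_types` for later calls. The sketch below isolates the pattern in a hypothetical `LazyTypeTable` class. The `build` callable stands in for the `EnumType`/`StructureType`/... dispatch, and the user-added-type checks are left out.

```python
class LazyTypeTable:
    """Hypothetical sketch of get_type_reference()'s lazy, cached decoding."""

    def __init__(self, raw_records, build):
        self._raw = raw_records  # stand-in for self.proto.types
        self._build = build      # stand-in for the per-kind constructor dispatch
        self._cache = {}         # stand-in for self._types
        self.build_count = 0     # only here to make caching observable

    def get(self, index):
        try:
            return self._cache[index]
        except KeyError:
            if index >= len(self._raw):
                raise KeyError(f"No type with index {index}")
            self.build_count += 1
            self._cache[index] = self._build(self._raw[index])
            return self._cache[index]
```

Caching also gives every caller the same object for a given index. This is what lets objects built from the same type index compare by identity.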

get_type_resolved(type_index)

Get a type by index, resolving through any typedef chains.

This is equivalent to calling get_type() and then resolve() on the result if it is a TypedefType. Useful for consumers that do not care about typedef names and want the concrete type directly.

Source code in bindings/python/quokka/program.py
def get_type_resolved(self, type_index: Index) -> TypeT:
    """Get a type by index, resolving through any typedef chains.

    This is equivalent to calling get_type() and then resolve() on the
    result if it is a TypedefType. Useful for consumers that do not care
    about typedef names and want the concrete type directly.
    """
    t = self.get_type(type_index)
    while isinstance(t, TypedefType):
        t = t.aliased_type
    return t
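The resolution loop follows `aliased_type` until it reaches something that is not a typedef, so any chain length is handled. The sketch below uses a hypothetical `Typedef` dataclass in place of `TypedefType`, with concrete types represented as strings. Like the original loop, it assumes the typedef chain is acyclic.

```python
from dataclasses import dataclass


@dataclass
class Typedef:
    # Hypothetical stand-in for TypedefType: a name aliasing another type.
    name: str
    aliased_type: object  # another Typedef or a concrete type


def resolve(t):
    """Sketch mirroring get_type_resolved(): strip every typedef layer."""
    while isinstance(t, Typedef):
        t = t.aliased_type
    return t
```

With `my_size -> size_t -> unsigned long`, `resolve(my_size)` returns `"unsigned long"`, and a non-typedef input is returned unchanged.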

open(export_file, exec_file) staticmethod

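Open a Quokka export file (.quokka) and return an instance of Program.

Parameters:

Name Type Description Default
export_file Path | str

Path to the export file (e.g. .quokka)

required
exec_file Path | str

Path to the executable file

required

Returns:

Type Description
Program

An instance of Program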

Source code in bindings/python/quokka/program.py
@staticmethod
def open(export_file: Path|str, exec_file: Path|str) -> Program:
    """Open a Quokka export file and return an instance of Program.

    Arguments:
        export_file: Path to the export file (e.g. .quokka)
        exec_file: Path to the executable file

    Returns:
        An instance of Program
    """
    return Program(export_file, exec_file)

read_bytes(v_addr, size)

Read raw bytes from a virtual address

Parameters:

Name Type Description Default
v_addr AddressT

Virtual address of the data to read

required
size int

Size of the data to read

required

Returns:

Type Description
bytes

The raw data at the specified address

Raises:

Type Description
ValueError

If the address is below the program base address

Source code in bindings/python/quokka/program.py
def read_bytes(self, v_addr: AddressT, size: int) -> bytes:
    """Read raw bytes from a virtual address

    Arguments:
        v_addr: Virtual address of the data to read
        size: Size of the data to read

    Returns:
        The raw data at the specified address

    Raises:
        ValueError: If the address is below the program base address
    """

    if (offset := v_addr - self.base_address) < 0:
        raise ValueError("Address outside virtual address space.")
    return self.executable.read_bytes(offset, size)
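`read_bytes` converts a virtual address into an offset from the program base address and rejects addresses below the base. The sketch below reproduces that translation. It then simulates the read by slicing a byte string, and this part is an assumption: it treats the binary as one flat image mapped at the base address. The source does not show how `executable.read_bytes` interprets the offset. The helper names are hypothetical.

```python
def to_base_offset(v_addr, base_address):
    """Sketch mirroring read_bytes(): offset of v_addr from the base address."""
    if (offset := v_addr - base_address) < 0:
        raise ValueError("Address outside virtual address space.")
    return offset


def read_flat_image(blob, base_address, v_addr, size):
    """Hypothetical stand-in: `blob` plays the executable's contents, assumed
    to be mapped as one flat image starting at base_address."""
    offset = to_base_offset(v_addr, base_address)
    return blob[offset:offset + size]
```

For example, with a base of `0x400000`, reading 3 bytes at `0x400001` from `b"\x7fELF\x02\x01"` yields `b"ELF"`.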

regenerate(database_file=None, ida_path=None, overwrite=False, timeout=600)

Apply edits, re-export, and return a fresh Program.

Calls commit() to apply edits, then generate() to produce a new .quokka file.

Parameters:

Name Type Description Default
database_file Path | str | None

Path to the .i64 database (required for IDA programs).

None
ida_path Path | str | None

Optional IDA installation path.

None
overwrite bool

Allow modifying an existing database.

False
timeout int

IDA timeout in seconds.

600

Returns:

Type Description
Program

A new Program instance with the updated data.

Raises:

Type Description
QuokkaError

If applying changes or regenerating fails.

Source code in bindings/python/quokka/program.py
def regenerate(
    self,
    database_file: "Path|str|None" = None,
    ida_path: "Path|str|None" = None,
    overwrite: bool = False,
    timeout: int = 600,
) -> 'Program':
    """Apply edits, re-export, and return a fresh Program.

    Calls :meth:`commit` to apply edits, then :meth:`generate` to
    produce a new ``.quokka`` file.

    Arguments:
        database_file: Path to the ``.i64`` database (required for
            IDA programs).
        ida_path: Optional IDA installation path.
        overwrite: Allow modifying an existing database.
        timeout: IDA timeout in seconds.

    Returns:
        A new Program instance with the updated data.

    Raises:
        QuokkaError: If applying changes or regenerating fails.
    """
    errors = self.commit(
        database_file=database_file,
        ida_path=ida_path,
        overwrite=overwrite,
        timeout=timeout,
    )
    if errors:
        self.logger.warning(
            "%d errors occurred while applying edits", errors
        )
    path = Program.generate(
        self.executable.exec_file,
        self.export_file,
        database_file=database_file,
        override=True,
        timeout=timeout,
    )
    return Program.open(path, self.executable.exec_file)

virtual_address(seg_id, seg_offset)

Convert a segment-relative offset to an absolute address

Parameters:

Name Type Description Default
seg_id int

Segment ID

required
seg_offset int

Byte offset in the segment

required

Returns:

Type Description
AddressT

An absolute address

Source code in bindings/python/quokka/program.py
def virtual_address(self, seg_id: int, seg_offset: int) -> AddressT:
    """Convert a segment-relative offset to an absolute address

    Arguments:
        seg_id: Segment ID
        seg_offset: Byte offset in the segment

    Returns:
        An absolute address
    """
    return self.segments[seg_id].address + seg_offset
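The conversion is a single addition: the start address of the segment plus the offset inside it. In the sketch below, a plain dictionary of start addresses keyed by segment ID stands in for `self.segments`. The parameter `segment_starts` is hypothetical.

```python
def virtual_address(segment_starts, seg_id, seg_offset):
    """Sketch mirroring Program.virtual_address.

    segment_starts: hypothetical mapping of segment ID -> segment start address.
    """
    return segment_starts[seg_id] + seg_offset
```

For instance, with segment 1 starting at `0x4000`, offset `0x20` maps to `0x4020`. An unknown `seg_id` raises `KeyError` from the dictionary lookup.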

write(output_file=None)

Serialize the program's protobuf data to a file

Parameters:

Name Type Description Default
output_file Path | str | None

Where to write the program (default: the program's export_file, which is overwritten)

None
Source code in bindings/python/quokka/program.py
def write(self, output_file: Path|str|None = None) -> None:
    """Serialize the program's protobuf data to a file

    Arguments:
        output_file: Where to write the program (default: the program's
            export_file, which is overwritten)
    """
    if output_file is None:
        output_file = self.export_file
    with open(output_file, "wb") as fd:
        fd.write(self.proto.SerializeToString())