Skip to content

Core API Reference

utils

Generic helpers

Modules:

  • analyzer

    Binary Analyzer utils

  • api

    Threaded HTTP server

  • database

    Generic abstraction layer for databases

  • repo

    Generic abstraction for storing files

Functions:

  • create_tar

    Create a tar.gz file containing the given files.

  • download_file

    Download a file from the given URL and return a file-like object containing its content.

  • extract_tar

    Unpack a tar file into the given directory.

  • get_appdata_dir

    Returns an OS-dependent application data directory.

  • get_hash

    Compute the SHA256 of the given data

  • get_minimal_paths

    Calculate the minimal common prefix path from a list of paths and return it

  • is_stdin_piped

    Return True if stdin is piped, False otherwise

  • is_stdout_piped

    Return True if stdout is piped, False otherwise

  • parse_menuconfig

    Parse menuconfig file and return a dict containing the configuration

  • parse_uri

    Parse a URI and return its components

  • run_process

    Run the given command(s) in new process(es).

  • write_menuconfig

    Write menuconfig options into the given path in menuconfig format

create_tar

create_tar(base_name: Path, files: Sequence[Union[Path, str]]) -> BytesIO

Create a tar.gz file containing the given files.

Parameters:

  • base_name

    (Path) –

    The name of the resulting tar.gz file. This will be used to set relative paths for each file in the archive.

  • files (Sequence[Union[Path, str]]) –

    A sequence of str or Path objects representing files to include in the tar archive

Returns:

  • BytesIO ( BytesIO ) –

    A BytesIO object containing the tar.gz file data

Raises:

  • ValueError

    If the input list is empty.

  • TypeError

    If the input list contains elements that are neither str nor Path objects.

Source code in venv/lib/python3.12/site-packages/sighthouse/core/utils/__init__.py
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
def create_tar(base_name: Path, files: Sequence[Union[Path, str]]) -> BytesIO:
    """
    Create a tar.gz file containing the given files.

    Args:
        base_name (Path): The name of the resulting tar.gz file.
                          This will be used to set relative paths for each file in the archive.
        files (Sequence[Union[Path, str]]): A sequence of str or Path objects representing
                                            files to include in the tar archive

    Returns:
        BytesIO: A BytesIO object containing the tar.gz file data

    Raises:
        ValueError: If the input list is empty.
        TypeError: If the input list contains elements that are neither str nor Path objects.
    """

    if not files:
        # BUGFIX: the previous message ("Could not get minimal paths...") was
        # copy-pasted from get_minimal_paths and did not describe this error.
        raise ValueError("Could not create a tar archive from an empty file list")

    if not all(isinstance(e, (Path, str)) for e in files):
        raise TypeError("Unsupported files types")

    # Cast everything to Path
    paths: List[Path] = list(map(Path, files))

    ret = BytesIO()
    with tarfile.open(fileobj=ret, mode="w:gz") as tar:
        for file in paths:
            if base_name == file:
                # Avoid adding a file with an arcname equal to '.' by making
                # the entry relative to the parent directory instead.
                tar.add(
                    file,
                    arcname=str(
                        file.absolute().relative_to(base_name.parent.absolute())
                    ),
                )

            else:
                tar.add(
                    file, arcname=str(file.absolute().relative_to(base_name.absolute()))
                )

    # Rewind so callers can read the archive from the beginning
    ret.seek(0)
    return ret

download_file

download_file(url: str, timeout: int = 3600) -> Optional[BytesIO]

Download a file from the given URL and return a file-like object containing its content. Returns None if the URL is not valid or if an error occurs during the download.

Parameters:

  • url

    (str) –

    The URL of the file to be downloaded

  • timeout

    (int, default: 3600 ) –

    Timeout in seconds

Returns:

  • Optional[BytesIO]

    Optional[BytesIO]: A BytesIO object containing the file's content, or None if an error occurred or the input was invalid

Source code in venv/lib/python3.12/site-packages/sighthouse/core/utils/__init__.py
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
def download_file(url: str, timeout: int = 3600) -> Optional[BytesIO]:
    """
    Download a file from the given URL and return a file-like object containing its content.
    Returns None if the URL is not valid or if an error occurs during the download.

    Args:
        url (str): The URL of the file to be downloaded
        timeout (int): Timeout in seconds

    Returns:
        Optional[BytesIO]: A BytesIO object containing the file's content, or None
                           if an error occurred or the input was invalid
    """
    if not isinstance(url, str):
        return None

    try:
        req = requests.get(url, timeout=timeout)
        if req.status_code == 200:
            return BytesIO(req.content)
    except requests.exceptions.RequestException:
        # BUGFIX: previously only ConnectionError was caught, so a Timeout
        # (or any other requests error) escaped even though the documented
        # contract is to return None on any download failure.
        # RequestException is the base class of all requests errors.
        pass

    return None

extract_tar

extract_tar(file: str | Path | BytesIO | bytes, to: Path) -> bool

Unpack a tar file into the given directory.

Parameters:

  • file

    (Union[str, Path, BytesIO, bytes]) –

    The source of the tar archive. Can be a file path as string or Path object, a BytesIO object, or raw bytes data.

  • to

    (Path) –

    The destination directory where files will be extracted

Returns:

  • bool ( bool ) –

    True on success and False otherwise

Source code in venv/lib/python3.12/site-packages/sighthouse/core/utils/__init__.py
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
def extract_tar(file: str | Path | BytesIO | bytes, to: Path) -> bool:
    """
    Unpack a tar file into the given directory.

    Args:
        file (Union[str, Path, BytesIO, bytes]): The source of the tar archive.
                                                  Can be a file path as string or Path object,
                                                  a BytesIO object, or raw bytes data.
        to (Path): The destination directory where files will be extracted

    Returns:
        bool: True on success and False otherwise
    """
    try:
        # Normalize the input into an open TarFile handle
        if isinstance(file, BytesIO):
            tar = tarfile.open(fileobj=file)
        elif isinstance(file, (str, Path)):
            tar = tarfile.open(str(file))
        elif isinstance(file, bytes):
            tar = tarfile.open(fileobj=BytesIO(file))
        else:
            return False

        # Context manager guarantees the archive is closed even if
        # extraction raises (the previous code leaked the handle on error).
        with tar:
            _safe_extract_tar(tar, to)
        return True
    except tarfile.ReadError:
        pass

    return False

get_appdata_dir

get_appdata_dir() -> Path

Returns an OS-dependent application data directory.

Creates the directory and its parents if it doesn't exist.

Returns:

  • Path ( Path ) –

    The Path to the application data directory.

Source code in venv/lib/python3.12/site-packages/sighthouse/core/utils/__init__.py
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
def get_appdata_dir() -> Path:
    """Returns an OS-dependent application data directory.

    Creates the directory *and its parents* if it doesn't exist.

    Returns:
        Path: The Path to the application data directory.
    """
    # Taken from crypto-condor, thanks @jlm for the code :D
    home: Path = Path.home()

    match sys.platform:
        case "linux":
            appdata = (
                Path(os.getenv("XDG_DATA_HOME", home / ".local" / "share"))
                / "sighthouse"
            )
        case "win32" | "cygwin":
            appdata = (
                Path(os.getenv("LOCALAPPDATA", home / "AppData" / "Local"))
                / "sighthouse"
            )
        case "darwin":
            appdata = home / "Library" / "Caches" / "sighthouse"
        case _:
            raise ValueError(
                f"Unsupported platform {sys.platform}, can't get appdata directory"
            )

    if not appdata.is_dir():
        appdata.mkdir(parents=True)

    return appdata

get_hash

get_hash(data: bytes) -> str

Compute the SHA256 of the given data

Source code in venv/lib/python3.12/site-packages/sighthouse/core/utils/__init__.py
378
379
380
def get_hash(data: bytes) -> str:
    """Return the hex-encoded SHA-256 digest of the given data."""
    digest = sha256(data)
    return digest.hexdigest()

get_minimal_paths

get_minimal_paths(paths: Sequence[Union[Path, str]]) -> Tuple[Path, List[Path]]

Calculate the minimal common prefix path from a list of paths and return it along with the relative paths.

This function finds the shortest common prefix that all provided paths share, then returns this common prefix as well as a list of paths made relative to this common prefix. This is useful for normalizing file structures or comparing paths.

Parameters:

  • paths (Sequence[Union[Path, str]]) –

    A sequence of string/Path objects representing path.

Returns:

  • Tuple[Path, List[Path]]

    Tuple[Path, List[Path]]: A tuple where the first element is the minimal common prefix as a Path object and the second element is a list of paths relative to this common prefix.

Raises:

  • ValueError

    If the input list is empty or if there's an error in finding a valid common prefix (should not occur under normal circumstances).

  • TypeError

    If the input list contains elements that are neither str nor Path objects.

Source code in venv/lib/python3.12/site-packages/sighthouse/core/utils/__init__.py
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
def get_minimal_paths(paths: Sequence[Union[Path, str]]) -> Tuple[Path, List[Path]]:
    """Calculate the minimal common prefix path from a list of paths and return it
    along with the relative paths.

    This function finds the shortest common prefix that all provided paths share,
    then returns this common prefix as well as a list of paths made relative to this
    common prefix. This is useful for normalizing file structures or comparing paths.

    Args:
        paths (Sequence[Union[Path, str]]): A sequence of string/Path objects representing paths.

    Returns:
        Tuple[Path, List[Path]]: A tuple where the first element is the minimal common
                                 prefix as a Path object and the second element is a list
                                 of paths relative to this common prefix.

    Raises:
        ValueError: If the input list is empty.
        TypeError: If the input list contains elements that are neither str nor Path objects.
    """

    if len(paths) == 0:
        raise ValueError("Could not get minimal paths from an empty list")

    if not all(isinstance(e, (Path, str)) for e in paths):
        raise TypeError("Unsupported path types")

    # Cast everything to Path
    files: List[Path] = list(map(Path, paths))

    if len(files) == 1:
        common_prefix = files[0].parent
        return common_prefix, [files[0].relative_to(common_prefix)]

    reference = files[0]
    for i, element in enumerate(reference.parts):
        for other in files[1:]:
            if i >= len(other.parts) or other.parts[i] != element:
                # First divergence: everything before index i is common
                common_prefix = Path(*reference.parts[:i])
                return common_prefix, [p.relative_to(common_prefix) for p in files]

    # BUGFIX: previously this raised ValueError. Reaching here means every
    # part of `reference` matched all other paths (reference is a prefix of,
    # or equal to, every path, e.g. ["a/b", "a/b/c"]). Use its parent as the
    # common prefix, mirroring the single-path case, so each entry keeps at
    # least one component when made relative.
    common_prefix = reference.parent
    return common_prefix, [p.relative_to(common_prefix) for p in files]

is_stdin_piped

is_stdin_piped() -> bool

Return True if stdin is piped, False otherwise

Source code in venv/lib/python3.12/site-packages/sighthouse/core/utils/__init__.py
383
384
385
def is_stdin_piped() -> bool:
    """Return True if stdin is piped, False otherwise"""
    interactive = sys.stdin.isatty()
    return not interactive

is_stdout_piped

is_stdout_piped() -> bool

Return True if stdout is piped, False otherwise

Source code in venv/lib/python3.12/site-packages/sighthouse/core/utils/__init__.py
388
389
390
def is_stdout_piped() -> bool:
    """Return True if stdout is piped, False otherwise"""
    interactive = sys.stdout.isatty()
    return not interactive

parse_menuconfig

parse_menuconfig(path: str | Path) -> Dict[str, Optional[str]]

Parse menuconfig file and return a dict containing the configuration

Parameters:

  • path

    ((str, Path)) –

    The path to the configuration file.

Returns:

  • dict ( Dict[str, Optional[str]] ) –

    A dictionary containing the options

Source code in venv/lib/python3.12/site-packages/sighthouse/core/utils/__init__.py
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
def parse_menuconfig(path: str | Path) -> Dict[str, Optional[str]]:
    """Parse menuconfig file and return a dict containing the configuration

    Args:
        path (str, Path): The path to the configuration file.

    Returns:
        dict: A dictionary containing the options
    """
    with open(path, "r", encoding="utf-8") as fp:
        lines = [e.strip() for e in fp.readlines()]

    cfg: Dict[str, Optional[str]] = {}
    # Parse config into a dictionary
    for line in lines:
        if line.startswith("# ") and line.endswith(" is not set"):
            # Special option that we need to keep because this language is f#cking dumb
            # https://github.com/wbx-github/uclibc-ng/blob/v1.0.47/extra/config/confdata.c#L316
            name = line[2:-11].strip()
            cfg.update({name: None})  # Special value
        elif not line.startswith("#") and len(line) > 0:
            # Regular line
            key, value = line.split("=", 1)
            cfg.update({key: value})

    return cfg

parse_uri cached

parse_uri(uri: str) -> Dict[str, Any]

Parse a URI and return its components

Source code in venv/lib/python3.12/site-packages/sighthouse/core/utils/__init__.py
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
@functools.cache
def parse_uri(uri: str) -> Dict[str, Any]:
    """Parse a URI and return its components.

    Args:
        uri (str): The URI to parse, e.g. "sqlite:///db.sqlite",
                   "postgres://user:pass@host:5432/name", "local://dir",
                   "s3://host:9000/bucket/dir" or "elastic://user:pass@host/index".

    Returns:
        Dict[str, Any]: A dict with a "type" key plus scheme-specific entries
                        (database path or name, credentials, host, port, ...).

    Raises:
        ValueError: If the URI scheme is not supported.
    """
    parsed: ParseResult = urlparse(uri)
    kind: str = parsed.scheme
    path: str = ""
    data: Dict[str, Any] = {"type": kind}

    if kind == "sqlite":
        if parsed.netloc == ":memory:" or parsed.path == "/:memory:":
            data.update({"database": ":memory:"})
            return data

        # Handle relative or absolute paths
        if parsed.netloc:  # sqlite://localhost/path.db
            path = f"/{parsed.netloc}{parsed.path}"
        else:  # sqlite:///path.db
            path = parsed.path

        # Remove leading slash for relative paths on Unix. startswith() also
        # copes with an empty path (the previous path[0] indexing raised
        # IndexError on "sqlite://").
        path = unquote(path[1:] if path.startswith("/") else path)
        data.update({"database": Path(path).absolute()})

    elif kind in ["postgres", "postgresql"]:
        # Normalize type
        data.update(
            {
                "type": "postgresql",
                "dbname": parsed.path.lstrip("/"),
                "user": parsed.username,
                "password": parsed.password,
                "host": parsed.hostname,
                "port": parsed.port or 5432,
            }
        )

    elif kind in ["elastic"]:
        data.update(
            {
                "dbname": parsed.path.lstrip("/"),
                "user": parsed.username,
                "password": parsed.password,
                "host": parsed.hostname,
                # BUGFIX: Elasticsearch's default REST port is 9200; the
                # previous 5432 was copied from the PostgreSQL branch.
                "port": parsed.port or 9200,
            }
        )

    elif kind == "mysql":
        data.update(
            {
                "dbname": parsed.path.lstrip("/"),
                "user": parsed.username,
                "password": parsed.password,
                "host": parsed.hostname,
                "port": parsed.port or 3306,
            }
        )

    elif kind == "local":
        # Handle relative or absolute paths
        if parsed.netloc:  # local://path/to/path.db
            path = unquote(f"{parsed.netloc}{parsed.path}")
        else:  # local:///path.db
            # Remove leading slash for relative paths on Unix
            path = unquote(parsed.path)

        data.update({"database": Path(path).absolute()})

    elif kind == "s3":
        path = parsed.path.lstrip("/")
        if "/" in path:
            # There is at least one '/', safe to split
            bucket, directory = path.split("/", 1)
        else:
            # No directory, use root
            bucket, directory = path, "/"

        # Append leading '/' if needed
        if not directory.startswith("/"):
            directory = "/" + directory

        data.update(
            {
                "dbname": bucket,
                "directory": directory,
                "user": parsed.username,
                "password": parsed.password,
                "host": parsed.hostname,
                "port": parsed.port or 9000,
            }
        )

    else:
        raise ValueError(f"Unsupported URI scheme: {kind}")

    return data

run_process

run_process(process_args: Union[List[str], List[List[str]]], capture_output: bool = False, env: Optional[Dict[str, str]] = None, cwd: Optional[Union[str, Path]] = None, timeout: float = -1.0) -> Tuple[int, bytes, bytes]

Run the given command(s) in new process(es). Supports pipes when process_args is a list of lists.

Supports both single commands and pipes.

Parameters:

  • process_args

    (Union[List[str], List[List[str]]]) –

    List of command argument lists. Single command: ['echo', 'aaa']. Pipes: [['echo', 'aaa'], ['sed', 's/a/b/g']].

  • capture_output

    (bool, default: False ) –

    If True, capture stdout/stderr. Otherwise print to console.

  • env

    (Optional[Dict[str, str]], default: None ) –

    Optional environment variables dictionary.

  • cwd

    (Optional[Union[str, Path]], default: None ) –

    Optional working directory.

  • timeout

    (float, default: -1.0 ) –

    Timeout in seconds. -1 for no timeout.

Returns:

  • int

    Tuple of (returncode, stdout, stderr). If capture_output is False, stdout/stderr

  • bytes

    are empty bytes.

Examples:

Single command: >>> run_process(['echo', 'aaa'], capture_output=True) (0, b'aaa\n', b'')

Pipeline: >>> run_process([['echo', 'aaa'], ['sed', 's/a/b/g']], capture_output=True) (0, b'bbb\n', b'')

With timeout: >>> run_process(['sleep', '10'], timeout=2, capture_output=True) (-9, b'', b'')

Source code in venv/lib/python3.12/site-packages/sighthouse/core/utils/__init__.py
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
def run_process(
    process_args: Union[List[str], List[List[str]]],
    capture_output: bool = False,
    env: Optional[Dict[str, str]] = None,
    cwd: Optional[Union[str, Path]] = None,
    timeout: float = -1.0,
) -> Tuple[int, bytes, bytes]:
    """Run the given command(s) in new process(es).
    Supports pipes when process_args is a list of lists.

    Supports both single commands and pipes.

    Args:
        process_args: List of command argument lists. Single command: ['echo', 'aaa'].
            Pipes: [['echo', 'aaa'], ['sed', 's/a/b/g']].
        capture_output: If True, capture stdout/stderr. Otherwise print to console.
        env: Optional environment variables dictionary.
        cwd: Optional working directory.
        timeout: Timeout in seconds. -1 for no timeout.

    Returns:
        Tuple of (returncode, stdout, stderr). If capture_output is False, stdout/stderr
        are empty bytes.

    Raises:
        Exception: If the process (pipeline) is killed after `timeout` seconds.
            NOTE(review): the "With timeout" example below shows a (-9, b'', b'')
            return instead — the example and the code disagree; confirm which
            behavior is intended.

    Examples:
        Single command:
            >>> run_process(['echo', 'aaa'], capture_output=True)
            (0, b'aaa\\n', b'')

        Pipeline:
            >>> run_process([['echo', 'aaa'], ['sed', 's/a/b/g']], capture_output=True)
            (0, b'bbb\\n', b'')

        With timeout:
            >>> run_process(['sleep', '10'], timeout=2, capture_output=True)
            (-9, b'', b'')
    """
    # Handle single command list as [[cmd]]
    if isinstance(process_args[0], str):
        process_args = [process_args]  # type: ignore

    procs: List[Popen] = []
    if len(process_args) == 1:
        # Single process
        kwargs: Dict[str, Any] = {"env": env, "cwd": cwd}
        if capture_output:
            kwargs.update({"stdout": PIPE, "stderr": PIPE})

        proc: Popen = Popen(process_args[0], **kwargs)
        procs.append(proc)
    else:
        # Pipes: chain with stdin=previous.stdout
        # First process
        first_kwargs: Dict[str, Any] = {"env": env, "cwd": cwd, "stdout": PIPE}
        procs.append(Popen(process_args[0], **first_kwargs))

        # Chain all subsequent processes
        for i in range(1, len(process_args)):
            kwargs = {"env": env, "cwd": cwd, "stdin": procs[i - 1].stdout}
            if i == len(process_args) - 1:
                if capture_output:
                    # Last process captures output
                    kwargs.update({"stdout": PIPE, "stderr": PIPE})
            else:
                # Middle processes continue piping
                kwargs.update({"stdout": PIPE})

            proc = Popen(process_args[i], **kwargs)
            procs.append(proc)

        # Close first process stdout to prevent deadlock
        # NOTE(review): only the first stage's pipe is closed in the parent;
        # for pipelines of 3+ commands the intermediate stdout handles stay
        # open in the parent process — confirm this is intended.
        if len(procs) > 0 and procs[0].stdout:
            procs[0].stdout.close()

    # Wait logic with timeout
    returncode: int = 0
    stdout: bytes = b""
    stderr: bytes = b""

    if timeout > 0:
        # Mutable flag shared with the timer callback so we can tell a normal
        # exit apart from a timeout-triggered kill.
        state: Dict[str, bool] = {"interrupted": False}

        # Interrupted callback
        def kill_pipeline(state) -> None:
            state["interrupted"] = True
            for proc in procs:
                proc.kill()

        timer: Timer = Timer(timeout, kill_pipeline, args=(state,))
        try:
            timer.start()
            # communicate() drains the pipes (avoids deadlock on full pipe
            # buffers) and waits; for a pipeline we must always drain the
            # last stage even when not capturing.
            if capture_output or len(process_args) > 1:
                stdout, stderr = procs[-1].communicate()
            returncode = procs[-1].wait()
        finally:
            # Always disarm the timer so it cannot fire after completion.
            timer.cancel()

        if state["interrupted"]:
            # NOTE(review): contradicts the docstring example that shows a
            # (-9, b'', b'') return on timeout — confirm intended behavior.
            raise Exception(f"Process timeout after {timeout} seconds")
    else:
        if capture_output or len(process_args) > 1:
            stdout, stderr = procs[-1].communicate()
        returncode = procs[-1].wait()

    # communicate() may yield None for streams that were not piped.
    return (returncode, stdout or b"", stderr or b"")

write_menuconfig

write_menuconfig(path: str | Path, options: dict) -> None

Write menuconfig options into the given path in menuconfig format

Parameters:

  • path

    ((str, Path)) –

    The path to the configuration file.

  • options

    (dict) –

    A dictionary containing the options

Source code in venv/lib/python3.12/site-packages/sighthouse/core/utils/__init__.py
456
457
458
459
460
461
462
463
464
465
466
467
468
469
def write_menuconfig(path: str | Path, options: dict) -> None:
    """Write menuconfig options into the given path in menuconfig format

    Args:
        path (str, Path): The path to the configuration file.
        options (dict): A dictionary containing the options
    """
    with open(path, "w", encoding="utf-8") as fp:
        for key, value in options.items():
            if value:
                fp.write(f"{key}={value}\n")
            else:
                # Undefined -> value is not set
                fp.write(f"# {key} is not set\n")

api

Threaded HTTP server

Classes:

  • ServerThread

    A threaded HTTP server that runs a Flask application.

ServerThread

ServerThread(app: Flask, host: str, port: int)

Bases: Thread

A threaded HTTP server that runs a Flask application.

This class extends the Thread class to host a Flask application in a separate thread, allowing for non-blocking operation in applications that need to handle concurrent requests. The server will run until explicitly shut down.

Parameters:

  • app

    (Flask) –

    The Flask application instance that this server will host.

  • host

    (str) –

    The host address (e.g., '127.0.0.1' or '0.0.0.0') on which the server will listen for incoming requests.

  • port

    (int) –

    The port number (typically between 1024 and 65535) on which the server will accept connections.

Methods:

  • run

    Starts the HTTP server, listening for incoming requests

  • shutdown

    Shuts down the running server gracefully.

Source code in venv/lib/python3.12/site-packages/sighthouse/core/utils/api.py
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
def __init__(self, app: "Flask", host: str, port: int):  # type: ignore[name-defined]
    """
    Initializes a new instance of the ServerThread class.

    Args:
        app (Flask): The Flask application that this thread will serve.
        host (str): Address to listen on (e.g., '127.0.0.1' or '0.0.0.0').
        port (int): TCP port to accept connections on (typically 1024-65535).
    """
    Thread.__init__(self)
    # Keep the application and push an application context that stays
    # active for the lifetime of this thread.
    self.__app = app
    self.__ctx = app.app_context()
    self.__ctx.push()
    # Remember the listen address.
    self.__host, self.__port = host, port
    # Build the WSGI server eagerly so shutdown() always has a target.
    self.__server: BaseWSGIServer = make_server(
        self.__host, self.__port, self.__app, threaded=True
    )

run

run() -> None

Starts the HTTP server, listening for incoming requests and serving the Flask application.

Source code in venv/lib/python3.12/site-packages/sighthouse/core/utils/api.py
37
38
39
40
def run(self) -> None:
    """Starts the HTTP server, listening for incoming requests
    and serving the Flask application.

    Blocks this thread until shutdown() is called on the underlying server.
    """
    self.__server.serve_forever()

shutdown

shutdown() -> None

Shuts down the running server gracefully.

Source code in venv/lib/python3.12/site-packages/sighthouse/core/utils/api.py
42
43
44
def shutdown(self) -> None:
    """Shuts down the running server gracefully.

    Signals serve_forever() in run() to stop; intended to be called from a
    thread other than the one executing run().
    """
    self.__server.shutdown()

repo

Generic abstraction for storing files

Classes:

  • Repo

    A class that abstracts the storage of files. A repository can be local file based

Repo

Repo(uri: str, exist_ok: bool = False, secure: bool = True)

A class that abstracts the storage of files. A repository can be local file based or rely on an S3-compatible server to store and retrieve files.

Parameters:

  • uri

    (str) –

    The URI of the repository which could be local or S3. Local URIs start with "local://" and S3 URIs start with "s3://".

Methods:

  • delete_file

    Deletes the specified file from either local filesystem or S3.

  • download_sharefile

    Downloads and returns the content of a file from a given URL or local path.

  • get_file

    Retrieves the content of the specified file from either local filesystem or S3.

  • get_sharefile

    Returns the path or URL for sharing the file.

  • list_directory

    Lists all files in the specified directory from either local filesystem or S3.

  • push_file

    Pushes or uploads a file to the specified path in either local filesystem or S3.

Source code in venv/lib/python3.12/site-packages/sighthouse/core/utils/repo.py
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
def __init__(self, uri: str, exist_ok: bool = False, secure: bool = True):
    """
    Initializes a repository instance.

    Args:
        uri (str): The URI of the repository which could be local or S3.
                   Local URIs start with "local://" and S3 URIs start
                   with "s3://" (these are the schemes parse_uri accepts here).
        exist_ok (bool): If True, do not raise when a local repository
                         directory is missing. NOTE(review): the directory is
                         not created in that case either — confirm intended.
        secure (bool): Passed to the S3 client; if True, connect over TLS.

    Raises:
        FileNotFoundError: If a local repository path does not exist and
                           exist_ok is False.
        ValueError: If the URI scheme is neither "local" nor "s3".
    """
    self._uri = parse_uri(uri)
    # Keep the raw URI alongside its parsed components.
    self._uri["uri"] = uri
    self._client: Any = None

    if self._uri["type"] == "local":
        full_path = Path(self._uri["database"])
        if not full_path.exists() and not exist_ok:
            raise FileNotFoundError(f"Directory '{full_path}' does not exists")

    elif self._uri["type"] == "s3":
        # S3 client; credentials come from the URI userinfo part.
        self._client = Minio(
            endpoint=f"{self._uri['host']}:{self._uri['port']}",
            access_key=self._uri["user"],
            secret_key=self._uri["password"],
            secure=secure,
        )
    else:
        raise ValueError(f"Unsupported URI scheme: {self._uri.get('type')}")

delete_file

delete_file(upload_path: str) -> None

Deletes the specified file from either local filesystem or S3.

Parameters:

  • upload_path
    (str) –

    The path of the file to be deleted.

Raises:

  • ValueError

    If URI scheme is unsupported.

Source code in venv/lib/python3.12/site-packages/sighthouse/core/utils/repo.py
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
def delete_file(self, upload_path: str) -> None:
    """
    Deletes the specified file from either local filesystem or S3.

    For local repositories, empty parent directories are pruned after the
    deletion, stopping at the repository root.

    Args:
        upload_path (str): The path of the file to be deleted.

    Raises:
        FileNotFoundError: If the file does not exist in a local repository.
        ValueError: If URI scheme is unsupported.
    """
    if self._uri["type"] == "local":
        root = Path(self._uri["database"])
        full_path = Path(upload_path)
        if not full_path.is_absolute():
            # Relative paths are resolved against the repository root.
            full_path = (root / full_path).resolve()
        elif not full_path.is_relative_to(root):
            # Re-anchor absolute paths that point outside the repository.
            full_path = root / full_path.relative_to(full_path.anchor)

        if not full_path.exists() or not full_path.is_relative_to(root):
            raise FileNotFoundError(
                f"The given file does not exists: '{full_path}'"
            )

        full_path.unlink()
        # Remove empty dirs up to (but never including) the repository root.
        for parent in full_path.relative_to(root).parents:
            if parent == Path("."):
                # '.' denotes the repository root itself; keep it.
                break
            try:
                # BUGFIX: anchor the relative parent to the repository root;
                # previously rmdir() ran on the bare relative path, i.e.
                # relative to the process CWD, not the repository.
                (root / parent).rmdir()  # Remove only if directory is empty
            except OSError:
                break

    elif self._uri["type"] == "s3":
        self._client.remove_object(
            self._uri["dbname"],
            str(Path(self._uri["directory"], upload_path).resolve()),
        )

    else:
        raise ValueError(f"Unsupported URI scheme: {self._uri.get('type')}")

download_sharefile staticmethod

download_sharefile(url: str, timeout: int = 3600) -> Optional[bytes]

Downloads and returns the content of a file from a given URL or local path.

Parameters:

  • url
    (str) –

    The URL or local path to the file to be downloaded.

  • timeout
    (int, default: 3600 ) –

    Timeout in seconds

Returns:

  • Optional[bytes]

    bytes | None: The content of the file if found, otherwise None.

Source code in venv/lib/python3.12/site-packages/sighthouse/core/utils/repo.py
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
@staticmethod
def download_sharefile(url: str, timeout: int = 3600) -> Optional[bytes]:
    """
    Downloads and returns the content of a file from a given URL or local path.

    Args:
        url (str): The URL or local path to the file to be downloaded.
        timeout (int): Timeout in seconds

    Returns:
        bytes | None: The content of the file if found, otherwise None.
    """
    if url.startswith("http://") or url.startswith("https://"):
        file = download_file(url, timeout=timeout)
        if file is not None:
            return file.read()

    if url.startswith("/"):
        with open(url, "rb") as f:
            return f.read()

    return None

get_file

get_file(upload_path: str) -> Optional[bytes]

Retrieves the content of the specified file from either local filesystem or S3.

Parameters:

  • upload_path
    (str) –

    The path of the file to be retrieved.

Returns:

  • Optional[bytes]

    bytes | None: The content of the file if found, otherwise None.

Raises:

  • ValueError

    If URI scheme is unsupported.

Source code in venv/lib/python3.12/site-packages/sighthouse/core/utils/repo.py
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
def get_file(self, upload_path: str) -> Optional[bytes]:
    """
    Read the content of a stored file from the local filesystem or S3.

    Args:
        upload_path (str): The path of the file to be retrieved.

    Returns:
        bytes | None: The content of the file if found, otherwise None.

    Raises:
        FileNotFoundError: If the file is missing or escapes the root.
        ValueError: If URI scheme is unsupported.
    """
    scheme = self._uri["type"]
    if scheme == "local":
        root = Path(self._uri["database"])
        target = Path(upload_path)
        if target.is_absolute():
            # Re-root absolute paths that point outside the repository.
            if not target.is_relative_to(root):
                target = root / target.relative_to(target.anchor)
        else:
            target = (root / target).resolve()

        inside = target.is_relative_to(root)
        if not (target.exists() and inside):
            raise FileNotFoundError(
                f"The given file does not exists: '{target}'"
            )

        return target.read_bytes()

    if scheme == "s3":
        response = self._client.get_object(
            self._uri["dbname"],
            str(Path(self._uri["directory"], upload_path).resolve()),
        )
        return response.data if response else None

    raise ValueError(f"Unsupported URI scheme: {self._uri.get('type')}")

get_sharefile

get_sharefile(upload_path: str) -> Path | str

Returns the path or URL for sharing the file.

Parameters:

  • upload_path
    (str) –

    The path of the file to be shared.

Returns:

  • Path | str

    Path | str: A POSIX absolute path if local, a pre-signed URL if S3.

Raises:

  • ValueError

    If URI scheme is unsupported.

Source code in venv/lib/python3.12/site-packages/sighthouse/core/utils/repo.py
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
def get_sharefile(self, upload_path: str) -> Path | str:
    """
    Returns the path or URL for sharing the file.

    Args:
        upload_path (str): The path of the file to be shared.

    Returns:
        Path | str: A POSIX absolute path if local, a pre-signed URL if S3.

    Raises:
        ValueError: If URI scheme is unsupported.
    """
    if self._uri["type"] == "local":
        full_path = Path(upload_path)
        if not full_path.is_absolute():
            full_path = (self._uri["database"] / full_path).resolve()
        elif not full_path.is_relative_to(self._uri["database"]):
            root = Path(self._uri["database"])
            full_path = root / full_path.relative_to(full_path.anchor)

        if not full_path.exists() or not full_path.is_relative_to(
            self._uri["database"]
        ):
            raise FileNotFoundError(
                f"The given file does not exists: '{full_path}'"
            )

        return full_path.absolute().as_posix()

    if self._uri["type"] == "s3":
        return self._client.get_presigned_url(
            "GET",
            self._uri["dbname"],
            str(Path(self._uri["directory"], upload_path).resolve()),
        )

    raise ValueError(f"Unsupported URI scheme: {self._uri.get('type')}")

list_directory

list_directory(path: str | Path) -> list[str]

Lists all files in the specified directory from either local filesystem or S3.

Parameters:

  • path
    (str | Path) –

    The directory to be listed.

Returns:

  • list[str]

    list[str]: A list of file names and directories within the specified path.

Raises:

  • ValueError

    If URI scheme is unsupported.

Source code in venv/lib/python3.12/site-packages/sighthouse/core/utils/repo.py
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
def list_directory(self, path: str | Path) -> list[str]:
    """
    Lists all files in the specified directory from either local filesystem or S3.

    Args:
        path (str | Path): The directory to be listed.

    Returns:
        list[str]: A list of file names and directories within the specified path.

    Raises:
        ValueError: If URI scheme is unsupported.
    """
    if self._uri["type"] == "local":
        full_path = Path(path)
        if not full_path.is_absolute():
            full_path = (self._uri["database"] / full_path).resolve()
        elif not full_path.is_relative_to(self._uri["database"]):
            root = Path(self._uri["database"])
            full_path = root / full_path.relative_to(full_path.anchor)

        if (
            not full_path.exists()
            or not full_path.is_dir()
            or not full_path.is_relative_to(self._uri["database"])
        ):
            raise FileNotFoundError(
                f"The given file does not exists: '{full_path}'"
            )

        return list(map(str, full_path.iterdir()))

    if self._uri["type"] == "s3":
        return list(
            map(
                lambda e: e.object_name,
                self._client.list_objects(self._uri["dbname"], prefix=str(path)),
            )
        )

    raise ValueError(f"Unsupported URI scheme: {self._uri.get('type')}")

push_file

push_file(upload_path: str, content: bytes) -> bool

Pushes or uploads a file to the specified path in either local filesystem or S3.

Parameters:

  • upload_path
    (str) –

    The path where the file should be uploaded.

  • content
    (bytes) –

    The content of the file to be uploaded.

Returns:

  • bool ( bool ) –

    True if successful, False otherwise.

Raises:

  • ValueError

    If URI scheme is unsupported.

Source code in venv/lib/python3.12/site-packages/sighthouse/core/utils/repo.py
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
def push_file(self, upload_path: str, content: bytes) -> bool:
    """
    Store a file at the given path on the local filesystem or S3.

    Args:
        upload_path (str): The path where the file should be uploaded.
        content (bytes): The content of the file to be uploaded.

    Returns:
        bool: True if successful, False otherwise.

    Raises:
        FileNotFoundError: If the destination escapes the repository root.
        ValueError: If URI scheme is unsupported.
    """
    scheme = self._uri["type"]
    if scheme == "local":
        root = Path(self._uri["database"])
        target = Path(upload_path)
        if target.is_absolute():
            # Force stray absolute paths back under the repository root.
            if not target.is_relative_to(root):
                target = root / target.relative_to(target.anchor)
        else:
            target = (root / target).resolve()

        if not target.is_relative_to(root):
            raise FileNotFoundError(
                f"The given file does not exists: '{target}'"
            )

        # Create intermediate directories, then write the payload.
        target.parent.mkdir(parents=True, exist_ok=True)
        target.write_bytes(content)

    elif scheme == "s3":
        self._client.put_object(
            self._uri["dbname"],
            str(Path(self._uri["directory"], upload_path).resolve()),
            BytesIO(content),
            len(content),
        )

    else:
        raise ValueError(f"Unsupported URI scheme: {self._uri.get('type')}")

    return True

database

Generic abstraction layer for databases

Classes:

  • Database

    A class to manage connections and operations for various database types,

Database

Database(uri: str, exist_ok: bool = False)

A class to manage connections and operations for various database types, including SQLite, PostgreSQL, and MySQL.

Parameters:

  • uri

    (str) –

    The URI string used to parse connection parameters.

  • exist_ok

    (bool, default: False ) –

    If True, allows the creation of new databases. Defaults to False.

Methods:

  • close

    Close the database connection safely. This method is Thread safe.

  • connect

    Establish a connection to the database using parameters parsed from the URI.

  • execute

    Execute an SQL query with optional parameters. This method is Thread safe.

  • fetch

    Fetch results from a query, with support for fetching all, one, or many rows.

Source code in venv/lib/python3.12/site-packages/sighthouse/core/utils/database.py
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
def __init__(self, uri: str, exist_ok: bool = False):
    """
    Build a Database handle from a connection URI and open the connection.

    Args:
        uri (str): The URI string used to parse connection parameters.
        exist_ok (bool): If True, allows the creation of new databases.
                         Defaults to False.
    """
    # Serializes access to backends that are not thread-safe (sqlite).
    self._lock: Lock = Lock()

    # Connection state; filled in by connect().
    self._uri: str = uri
    self._type: Optional[str] = None
    self._db: Any = None

    self.connect(exist_ok)

close

close() -> None

Close the database connection safely. This method is Thread safe.

Source code in venv/lib/python3.12/site-packages/sighthouse/core/utils/database.py
217
218
219
220
221
222
223
224
225
def close(self) -> None:
    """
    Close the database connection safely. This method is Thread safe.
    """
    # Non-sqlite drivers manage their own locking; close directly.
    if self._type != "sqlite":
        self.__unsafe_close()
        return
    # sqlite connections are shared across threads -> serialize shutdown.
    with self._lock:
        self.__unsafe_close()

connect

connect(exist_ok: bool = True) -> None

Establish a connection to the database using parameters parsed from the URI.

Parameters:

  • exist_ok
    (bool, default: True ) –

    If True, allows the creation of new databases. Defaults to True.

Source code in venv/lib/python3.12/site-packages/sighthouse/core/utils/database.py
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
def connect(self, exist_ok: bool = True) -> None:
    """
    Open a connection to the database described by the stored URI.

    Args:
        exist_ok (bool): If True, allows the creation of new databases.
                         Defaults to True.

    Raises:
        FileNotFoundError: If a sqlite file must exist but does not.
        ValueError: If URI scheme is unsupported.
    """
    params: Dict[str, Any] = parse_uri(self._uri)
    scheme = params["type"]
    self._type = scheme

    if scheme == "sqlite":
        import sqlite3

        db_path = params["database"]
        if db_path != ":memory:":
            location = Path(db_path)
            # Refuse to open a non existing database
            if not (exist_ok or location.exists()):
                raise FileNotFoundError(
                    f"Invalid database path: '{params.get('database')}'. Database should exists"
                )

            # Create parent directory if needed
            location.parent.mkdir(parents=True, exist_ok=True)

        self._db = sqlite3.connect(db_path, check_same_thread=False)
        return

    if scheme in ("postgres", "postgresql"):
        import psycopg

        # psycopg takes everything except our internal 'type' key.
        options = {key: val for key, val in params.items() if key != "type"}
        self._db = psycopg.connect(**options)
        return

    if scheme == "mysql":
        import mysql.connector  # type: ignore[import-not-found]

        self._db = mysql.connector.connect(**params)
        return

    raise ValueError(f"Unsupported URI scheme: {self._type}")

execute

execute(query: str, params: Optional[Tuple] = None) -> Optional[int]

Execute an SQL query with optional parameters. This method is Thread safe.

Parameters:

  • query
    (str) –

    The SQL command to execute.

  • params
    (Optional[Tuple], default: None ) –

    Optional parameters for the query execution.

Returns:

  • Optional[int]

    Optional[int]: An optional integer corresponding to the ID of the inserted row.

Source code in venv/lib/python3.12/site-packages/sighthouse/core/utils/database.py
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
def execute(self, query: str, params: Optional[Tuple] = None) -> Optional[int]:
    """
    Execute an SQL query with optional parameters. This method is Thread safe.

    Args:
        query: The SQL command to execute.
        params: Optional parameters for the query execution.

    Returns:
        Optional[int]: An optional integer corresponding to the ID of the inserted row.
    """
    # Only sqlite needs serialized access; other drivers are thread-safe.
    if self._type != "sqlite":
        return self.__unsafe_execute(query, params=params)
    with self._lock:
        return self.__unsafe_execute(query, params=params)

fetch

fetch(request: str, parameters: Tuple = (), mode: str = 'all') -> List[Tuple]

Fetch results from a query, with support for fetching all, one, or many rows. This method is Thread safe.

Parameters:

  • request
    (str) –

    The SQL query to execute.

  • parameters
    (tuple, default: () ) –

    Optional parameters for the query execution. Defaults to an empty tuple.

  • mode
    (str, default: 'all' ) –

    Mode of fetching: 'all', 'one', or 'many'. Defaults to 'all'.

Returns:

  • List[Tuple]

    list[tuple]: Result set from the executed query.

Source code in venv/lib/python3.12/site-packages/sighthouse/core/utils/database.py
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
def fetch(
    self, request: str, parameters: Tuple = (), mode: str = "all"
) -> List[Tuple]:
    """
    Run a query and fetch its result rows.
    This method is Thread safe.

    Args:
        request (str): The SQL query to execute.
        parameters (tuple): Optional parameters for the query execution.
                            Defaults to an empty tuple.
        mode (str): Mode of fetching: 'all', 'one', or 'many'. Defaults to 'all'.

    Returns:
        list[tuple]: Result set from the executed query.
    """
    # Only sqlite needs serialized access; other drivers are thread-safe.
    if self._type != "sqlite":
        return self.__unsafe_fetch(request, parameters=parameters, mode=mode)
    with self._lock:
        return self.__unsafe_fetch(request, parameters=parameters, mode=mode)

analyzer

Binary Analyzer utils

Functions:

build_script

build_script(ghidradir: Path, script_dir: Path) -> None

Compile Ghidra scripts to .class files to speed up loading process and avoid random OSGi errors

Reversed from https://github.com/NationalSecurityAgency/ghidra/blob/ 7dd38f2d95597c618af3d921f950fd6674805dd2/Ghidra/Features/ Base/src/main/java/ghidra/app/plugin/core/osgi/GhidraSourceBundle.java#L1019

Source code in venv/lib/python3.12/site-packages/sighthouse/core/utils/analyzer.py
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
def build_script(ghidradir: Path, script_dir: Path) -> None:
    """Compile Ghidra scripts to .class files to speed up loading process
    and avoid random OSGi errors

    Reversed from https://github.com/NationalSecurityAgency/ghidra/blob/
                  7dd38f2d95597c618af3d921f950fd6674805dd2/Ghidra/Features/
                  Base/src/main/java/ghidra/app/plugin/core/osgi/GhidraSourceBundle.java#L1019

    Raises:
        FileNotFoundError: If ghidradir does not exist.
        Exception: If javac fails on any script.
    """
    # Check if the GHIDRA_DIR path exists
    if not (ghidradir.exists() and ghidradir.is_dir()):
        raise FileNotFoundError(f"The path specified in '{ghidradir}' does not exist")

    # Classpath: the current directory plus every jar shipped with Ghidra.
    classpath = ".:" + ":".join(str(jar) for jar in ghidradir.rglob("**/*.jar"))

    # Compile each java script found under the script directory.
    for java_file in script_dir.rglob("*.java"):
        command = [
            "javac",
            "-g",
            "-d",
            str(script_dir),
            "-sourcepath",
            str(script_dir),
            "-cp",
            classpath,
            "-proc:none",
            str(java_file),
        ]

        returncode, stdout, stderr = run_process(command)
        if returncode != 0:
            raise Exception(
                f"Failed to compile '{java_file}': {stdout.decode()}\n{stderr.decode()}"
            )

clean_install

clean_install(ghidradir: Path, jars: Optional[List[str]] = None) -> None

Cleanup ghidra directory with the file we have installed

Source code in venv/lib/python3.12/site-packages/sighthouse/core/utils/analyzer.py
12
13
14
15
16
17
18
19
20
21
22
23
def clean_install(ghidradir: Path, jars: Optional[List[str]] = None) -> None:
    """Cleanup ghidra directory with the file we have installed

    Args:
        ghidradir: Root of the Ghidra installation.
        jars: Names of jar files to remove from Ghidra/patch. Defaults to none.

    Raises:
        FileNotFoundError: If ghidradir does not exist.
    """
    # Check if the GHIDRA_DIR path exists
    if not (ghidradir.exists() and ghidradir.is_dir()):
        raise FileNotFoundError(f"The path specified in '{ghidradir}' does not exist")

    # Drop each installed jar from the patch directory, ignoring absentees.
    patch_dir = ghidradir / "Ghidra" / "patch"
    for name in jars or []:
        candidate = patch_dir / name
        if candidate.exists():
            candidate.unlink()

create_bsim_database

create_bsim_database(ghidradir: Path, bsim_urls: list[str], config_template: str = 'medium_nosize', username: str = 'bsim_user', capture_output: bool = False) -> bool

Create an empty BSIM database

Enumerated Options: - large_32 | medium_32 | medium_64 | medium_cpool | medium_nosize

Source code in venv/lib/python3.12/site-packages/sighthouse/core/utils/analyzer.py
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
def create_bsim_database(
    ghidradir: Path,
    bsim_urls: list[str],
    config_template: str = "medium_nosize",
    username: str = "bsim_user",
    capture_output: bool = False,
) -> bool:
    """Create an empty BSIM database

    Enumerated Options:
        <config_template> - large_32 | medium_32 | medium_64 | medium_cpool | medium_nosize

    Returns:
        bool: True if every database was created, False on the first failure.
    """
    # Force the JVM user name so the bsim client won't complain
    # about the connecting user.
    env: Dict[str, str] = dict(environ)
    env["_JAVA_OPTIONS"] = f"-Duser.name={username}"

    bsim_tool = str(ghidradir / "support" / "bsim")
    for url in bsim_urls:
        command: List[str] = [bsim_tool, "createdatabase", url, config_template]
        # Run process without a timeout; stop at the first failing URL.
        if run_process(command, env=env, capture_output=capture_output)[0] != 0:
            return False

    return True

get_ghidra_languages

get_ghidra_languages(ghidradir: Path) -> List[str]

Return the list of Ghidra supported languages

Source code in venv/lib/python3.12/site-packages/sighthouse/core/utils/analyzer.py
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
def get_ghidra_languages(ghidradir: Path) -> List[str]:
    """Return the list of Ghidra supported languages

    Raises:
        FileNotFoundError: If ghidradir or its Processors directory is missing.
    """
    # Check if the GHIDRA_DIR path exists
    if not (ghidradir.exists() and ghidradir.is_dir()):
        raise FileNotFoundError(f"The path specified in '{ghidradir}' does not exist")

    # Processors directory holds the list of available processors
    processors = ghidradir / "Ghidra" / "Processors/"
    if not (processors.exists() and processors.is_dir()):
        raise FileNotFoundError(f"The path specified in '{processors}' does not exist")

    # Collect every language id declared in the *.ldefs XML files.
    result: List[str] = []
    for proc_dir in processors.iterdir():
        lang_dir = proc_dir / "data" / "languages"
        if not (lang_dir.exists() and lang_dir.is_dir()):
            continue
        for ldef in lang_dir.rglob("*.ldefs"):
            tree = parse(str(ldef))
            result.extend(node.attrib["id"] for node in tree.getroot())

    return result

get_ghidra_version

get_ghidra_version(ghidradir: Path) -> Optional[str]

Return the version of a given ghidra installation or None on failure

Source code in venv/lib/python3.12/site-packages/sighthouse/core/utils/analyzer.py
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
def get_ghidra_version(ghidradir: Path) -> Optional[str]:
    """Return the version of a given ghidra installation or None on failure

    Raises:
        FileNotFoundError: If ghidradir or application.properties is missing.
    """
    # Check if the GHIDRA_DIR path exists
    if not (ghidradir.exists() and ghidradir.is_dir()):
        raise FileNotFoundError(f"The path specified in '{ghidradir}' does not exist")

    # Look for the application.properties file
    properties = ghidradir / "Ghidra" / "application.properties"
    if not (properties.exists() and properties.is_file()):
        raise FileNotFoundError("Fail to find application.properties")

    # Scan for the application.version entry and return its value.
    prefix = "application.version="
    with open(properties, "r", encoding="utf-8") as fp:
        for raw in fp:
            if raw.startswith(prefix):
                return raw[len(prefix):].strip()

    # Fail to find the version
    return None

run_ghidra_script

run_ghidra_script(ghidradir: Path, script: Path, args: List[str], env: Optional[Dict[str, str]] = None, capture_output: bool = False, logfile: Optional[Path] = None) -> tuple[int, bytes, bytes]

Run a given Ghidra script

Source code in venv/lib/python3.12/site-packages/sighthouse/core/utils/analyzer.py
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
def run_ghidra_script(
    ghidradir: Path,
    script: Path,
    args: List[str],
    env: Optional[Dict[str, str]] = None,
    capture_output: bool = False,
    logfile: Optional[Path] = None,
) -> tuple[int, bytes, bytes]:
    """Run a given Ghidra script

    Raises:
        FileNotFoundError: If ghidradir or the script is missing.
        Exception: If the script is neither a .java nor a .class file.
    """
    # Check if the GHIDRA_DIR path exists
    if not (ghidradir.exists() and ghidradir.is_dir()):
        raise FileNotFoundError(f"The path specified in '{ghidradir}' does not exist")

    if not (script.exists() and script.is_file()):
        raise FileNotFoundError(f"The path specified in '{script}' does not exist")

    if script.suffix not in {".java", ".class"}:
        raise Exception(
            f"Invalid script file. Expecting a java or class file but got '{script.suffix}'"
        )

    scripts_home = script.parent
    class_file = script.with_suffix(".class")
    # Compile script if not already done
    if not class_file.exists():
        build_script(ghidradir, scripts_home)

    # A throwaway project directory keeps analyzeHeadless state out of the way.
    with TemporaryDirectory() as workdir:
        command: List[str] = [
            str(ghidradir / "support" / "analyzeHeadless"),
            workdir,
            "tmpproj",
        ]
        # Log file need to be place here if defined
        if logfile is not None:
            command.extend(["-log", str(logfile.absolute())])

        command.extend(
            ["-scriptPath", str(scripts_home), "-preScript", class_file.name]
        )
        command.extend(args)

        # Run process without a timeout
        return run_process(command, env=env, capture_output=capture_output)