Skip to content

Pipeline API Reference

CommonWorker

CommonWorker(worker_id: str, worker_url: str, repo_url: str | None = None)

Common class for all the SightHouse pipeline workers

Parameters:

  • worker_id

    (str) –

    Unique identifier for the worker.

  • worker_url

    (str) –

    Celery broker/backend URL.

  • repo_url

    (str, default: None ) –

    Optional URL for the repo.

Methods:

  • delete_file

    Deletes the specified file from either local filesystem or S3.

  • do_work

    Defines the actual processing behavior for a Job instance.

  • get_file

    Retrieves the content of the specified file from either local filesystem or S3.

  • get_sharefile

    Returns the path or URL for sharing the file.

  • log

    Log a message using worker's logger

  • pack_and_send_task

    Wrapper method that packs the given files into an archive, uploads it onto the worker repository and sends the job to the next worker.

  • push_file

    Pushes or uploads a file to the specified path in either local filesystem or S3.

  • run

    Runs the Celery worker and registers the processing task.

  • send_task

    Sends the job to the next worker in the chain.

Source code in venv/lib/python3.12/site-packages/sighthouse/pipeline/worker.py
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
def __init__(
    self,
    worker_id: str,
    worker_url: str,
    repo_url: str | None = None,
):
    """Set up the worker: artifact repository, Celery app and logger.

    Args:
        worker_id (str): Unique identifier for the worker.
        worker_url (str): Celery broker/backend URL.
        repo_url (str | None): Optional URL for the artifact repository;
            when omitted the worker runs without artifact storage.
    """
    # Without a repo URL, the file helpers (push/get/delete) become no-ops.
    self.__repo = None
    if repo_url:
        self.__repo = Repo(repo_url, secure=False)

    self._celery_app = CeleryWorker(worker_id, worker_url)
    self._logger = get_logger("celery.task")

    # Wire up Celery signal handlers for task outcome reporting.
    signals.task_failure.connect(self._on_task_failure, weak=False)
    signals.task_success.connect(self._on_task_success, weak=False)

delete_file

delete_file(upload_path: str) -> None

Deletes the specified file from either local filesystem or S3.

Parameters:

  • upload_path

    (str) –

    The path of the file to be deleted.

Raises:

  • ValueError

    If URI scheme is unsupported.

Source code in venv/lib/python3.12/site-packages/sighthouse/pipeline/worker.py
436
437
438
439
440
441
442
443
444
445
446
447
def delete_file(self, upload_path: str) -> None:
    """Remove a file from the configured artifact repository.

    Does nothing when the worker has no repository configured.

    Args:
        upload_path (str): The path of the file to be deleted.

    Raises:
        ValueError: If URI scheme is unsupported.
    """
    if not self.__repo:
        return
    self.__repo.delete_file(f"artifacts/{upload_path}")

do_work

do_work(job: Job) -> None

Defines the actual processing behavior for a Job instance.

Parameters:

  • job

    (Job) –

    The Job instance to process.

Raises:

  • NotImplementedError

    If not overridden in subclasses.

Source code in venv/lib/python3.12/site-packages/sighthouse/pipeline/worker.py
602
603
604
605
606
607
608
609
610
611
def do_work(self, job: Job) -> None:
    """Hook implementing the worker's actual processing behavior.

    Concrete workers must override this method to handle a single Job.

    Args:
        job (Job): The Job instance to process.

    Raises:
        NotImplementedError: If not overridden in subclasses.
    """
    raise NotImplementedError("Subclasses must implement do_work()")

get_file

get_file(upload_path: str) -> Optional[bytes]

Retrieves the content of the specified file from either local filesystem or S3.

Parameters:

  • upload_path

    (str) –

    The path of the file to be retrieved.

Returns:

  • Optional[bytes]

    bytes | None: The content of the file if found, otherwise None.

Raises:

  • ValueError

    If URI scheme is unsupported.

Source code in venv/lib/python3.12/site-packages/sighthouse/pipeline/worker.py
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
def get_file(self, upload_path: str) -> Optional[bytes]:
    """Fetch a file's content from the configured artifact repository.

    Args:
        upload_path (str): The path of the file to be retrieved.

    Returns:
        bytes | None: The file content if found, otherwise None; an
            empty bytes object when no repository is configured.

    Raises:
        ValueError: If URI scheme is unsupported.
    """
    if not self.__repo:
        return b""
    return self.__repo.get_file(f"artifacts/{upload_path}")

get_sharefile

get_sharefile(upload_path: str) -> Path | str

Returns the path or URL for sharing the file.

Parameters:

  • upload_path

    (str) –

    The path of the file to be shared.

Returns:

  • Path | str

    Path | str: A POSIX absolute path if local, a pre-signed URL if S3.

Raises:

  • ValueError

    If URI scheme is unsupported.

Source code in venv/lib/python3.12/site-packages/sighthouse/pipeline/worker.py
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
def get_sharefile(self, upload_path: str) -> Path | str:
    """
    Returns the path or URL for sharing the file.

    Args:
        upload_path (str): The path of the file to be shared.

    Returns:
        Path | str: A POSIX absolute path if local, a pre-signed URL if S3.

    Raises:
        ValueError: If URI scheme is unsupported.
    """
    if self.__repo:
        return self.__repo.get_sharefile(f"artifacts/{upload_path}")

    return ""

log

log(message: str, *args, **kwargs) -> None

Log a message using worker's logger

Parameters:

  • message

    (str) –

    The message to log

Source code in venv/lib/python3.12/site-packages/sighthouse/pipeline/worker.py
409
410
411
412
413
414
415
def log(self, message: str, *args, **kwargs) -> None:
    """Log an info-level message through the worker's logger.

    Args:
        message (str): The message to log; may contain %-style
            placeholders filled lazily from ``args``.
        *args: %-formatting arguments forwarded to the logger.
        **kwargs: Extra logging options (e.g. ``exc_info``, ``extra``)
            forwarded to the logger.
    """
    # Fix: forward args/kwargs instead of silently dropping them, so the
    # signature's advertised lazy %-formatting and logging options work.
    self._logger.info(message, *args, **kwargs)

pack_and_send_task

pack_and_send_task(job: Job, files: Sequence[Union[Path, str]], name: Optional[str] = None, step: Optional[str] = None) -> None

Wrapper method that will pack the given files into an archive, upload it onto the worker repository and send the given Job to the next worker in the execution chain.

Parameters:

  • job

    (Job) –

    The job to update and send.

  • files

    (Sequence[Union[Path, str]]) –

    List of path like to upload

  • name

    (Optional[str], default: None ) –

    Optional name for the packed files

  • step

    (Optional[str], default: None ) –

    Optional substep to target a specific step

Source code in venv/lib/python3.12/site-packages/sighthouse/pipeline/worker.py
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
def pack_and_send_task(
    self,
    job: Job,
    files: Sequence[Union[Path, str]],
    name: Optional[str] = None,
    step: Optional[str] = None,
) -> None:
    """
    Wrapper method that will pack the given files into an archive, upload it onto the
    worker repository and send the given Job to the next worker in the execution chain.

    Args:
        job (Job): The job to update and send.
        files (Sequence[Union[Path, str]]): List of path like to upload.
        name (Optional[str]): Optional name for the packed files.
        step (Optional[str]): Optional substep to target a specific step.

    Raises:
        Exception: If the archive could not be pushed to the repository.
    """
    # Fix: `not files` (rather than `files == []`) also catches empty
    # tuples and any other empty Sequence type.
    if not files:
        return

    self.log("Packing files")
    common_prefix, files = get_minimal_paths(files)
    back = Path.cwd()
    os.chdir(common_prefix)
    try:
        # NOTE(review): chdir appears to make archive members relative
        # to the common prefix — confirm against create_tar.
        tar = create_tar(common_prefix, files).read()
    finally:
        # Fix: always restore the working directory, even if packing fails.
        os.chdir(back)
    name = f"{name if name else get_hash(tar)}.tar.gz"

    if self.push_file(name, tar):
        self.log(f"Publish file: {name}")
        job.job_data.update({"file": name})
        if step:
            self.send_task(job, step=step)
        else:
            self.send_task(job)
    else:
        raise Exception("Fail to publish builder results")

push_file

push_file(upload_path: str, content: bytes) -> bool

Pushes or uploads a file to the specified path in either local filesystem or S3.

Parameters:

  • upload_path

    (str) –

    The path where the file should be uploaded.

  • content

    (bytes) –

    The content of the file to be uploaded.

Returns:

  • bool ( bool ) –

    True if successful, False otherwise.

Raises:

  • ValueError

    If URI scheme is unsupported.

Source code in venv/lib/python3.12/site-packages/sighthouse/pipeline/worker.py
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
def push_file(self, upload_path: str, content: bytes) -> bool:
    """Upload a file to the configured artifact repository.

    Args:
        upload_path (str): The path where the file should be uploaded.
        content (bytes): The content of the file to be uploaded.

    Returns:
        bool: True if successful, False otherwise (always False when no
            repository is configured).

    Raises:
        ValueError: If URI scheme is unsupported.
    """
    if not self.__repo:
        return False
    return self.__repo.push_file(f"artifacts/{upload_path}", content)

run

run(concurrent_task: int = 1) -> None

Runs the Celery worker and registers the processing task.

Parameters:

  • concurrent_task

    (int, default: 1 ) –

    Number of concurrent tasks to process.

Source code in venv/lib/python3.12/site-packages/sighthouse/pipeline/worker.py
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
def run(self, concurrent_task: int = 1) -> None:
    """Register the "do_work" task and start the Celery worker loop.

    Args:
        concurrent_task (int): Number of concurrent tasks to process.
    """

    @self._celery_app.task(
        name="do_work",
        queue=self._celery_app.worker_metadata["id"],
        bind=True,
    )
    def __do_work(task: Task, job_dict: Dict[str, Any]) -> Dict[str, Any]:
        job = Job.from_dict(job_dict)
        job.job_metadata["id"] = str(task.request.id)
        # Snapshot of the job as received; later mutations made by
        # do_work() do not appear in the returned payload.
        result = deepcopy(job.to_dict())

        try:
            # Store the future "from" value for the next hop
            job._next_from = str(task.request.id)
            self.do_work(job)
            result["job_metadata"]["state"] = "success"
        except Exception as exc:
            trace = "".join(format_exception(exc))
            self._logger.error(trace)
            result["job_metadata"].update({"state": "failed", "error": trace})

        return result

    # -n gets a random name — presumably to avoid node-name clashes
    # when several workers run on the same host.
    argv = [
        "--quiet",
        "worker",
        "-n",
        token_urlsafe(12),
        "-c",
        str(concurrent_task),
        "--loglevel=info",
        "-Q",
        self._celery_app.worker_metadata["id"],
    ]
    self._celery_app.worker_main(argv)

send_task

send_task(job: Job, step: Optional[str] = None) -> None

Sends the job to the next worker in the chain.

Parameters:

  • job

    (Job) –

    The Job instance to forward.

  • step

    (str, default: None ) –

    Optional step to target a specific worker.

Source code in venv/lib/python3.12/site-packages/sighthouse/pipeline/worker.py
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
def send_task(self, job: Job, step: Optional[str] = None) -> None:
    """Forward the job to the next worker(s) in the execution chain.

    Args:
        job (Job): The Job instance to forward.
        step (Optional[str]): Optional step to target a specific worker.
    """
    # Work on a copy so calling send_task more than once inside do_work
    # cannot leave the caller's job in an inconsistent state.
    dup = deepcopy(job)

    # Promote the pending "from" value recorded for the next hop.
    if dup._next_from is not None:
        dup.job_metadata["from"] = dup._next_from
        dup._next_from = None

    if not step:
        substeps = dup.execution_chain.advance_to_next_step() or []
    else:
        target = dup.execution_chain.get_step(step)
        if not target:
            raise ValueError(f"Invalid step '{step}'")
        substeps = [target]

    for substep in substeps:
        dup.execution_chain.current_step = substep.step
        self._logger.debug(
            "Sending task %s: %s", substep, json.dumps(dup.to_dict(), indent=2)
        )
        self._celery_app.send_task(
            "do_work",
            queue=substep.package,
            kwargs={"job_dict": dup.to_dict()},
        )

Scrapper

Scrapper(worker_id: str, worker_url: str, repo_url: str | None = None)

Bases: CommonWorker

SightHouse scrapper worker

Parameters:

  • worker_id

    (str) –

    Unique identifier for the worker.

  • worker_url

    (str) –

    Celery broker/backend URL.

  • repo_url

    (str, default: None ) –

    Optional URL for the repo.

Methods:

  • delete_file

    Deletes the specified file from either local filesystem or S3.

  • do_work

    Defines the actual processing behavior for a Job instance.

  • get_file

    Retrieves the content of the specified file from either local filesystem or S3.

  • get_sharefile

    Returns the path or URL for sharing the file.

  • log

    Log a message using worker's logger

  • pack_and_send_task

    Wrapper method that packs the given files into an archive, uploads it onto the worker repository and sends the job to the next worker.

  • push_file

    Pushes or uploads a file to the specified path in either local filesystem or S3.

  • run

    Runs the Celery worker and registers the processing task.

  • send_task

    Sends the job to the next worker in the chain.

Source code in venv/lib/python3.12/site-packages/sighthouse/pipeline/worker.py
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
def __init__(
    self,
    worker_id: str,
    worker_url: str,
    repo_url: str | None = None,
):
    """Set up the worker: artifact repository, Celery app and logger.

    Args:
        worker_id (str): Unique identifier for the worker.
        worker_url (str): Celery broker/backend URL.
        repo_url (str | None): Optional URL for the artifact repository;
            when omitted the worker runs without artifact storage.
    """
    # Without a repo URL, the file helpers (push/get/delete) become no-ops.
    self.__repo = None
    if repo_url:
        self.__repo = Repo(repo_url, secure=False)

    self._celery_app = CeleryWorker(worker_id, worker_url)
    self._logger = get_logger("celery.task")

    # Wire up Celery signal handlers for task outcome reporting.
    signals.task_failure.connect(self._on_task_failure, weak=False)
    signals.task_success.connect(self._on_task_success, weak=False)

delete_file

delete_file(upload_path: str) -> None

Deletes the specified file from either local filesystem or S3.

Parameters:

  • upload_path

    (str) –

    The path of the file to be deleted.

Raises:

  • ValueError

    If URI scheme is unsupported.

Source code in venv/lib/python3.12/site-packages/sighthouse/pipeline/worker.py
436
437
438
439
440
441
442
443
444
445
446
447
def delete_file(self, upload_path: str) -> None:
    """Remove a file from the configured artifact repository.

    Does nothing when the worker has no repository configured.

    Args:
        upload_path (str): The path of the file to be deleted.

    Raises:
        ValueError: If URI scheme is unsupported.
    """
    if not self.__repo:
        return
    self.__repo.delete_file(f"artifacts/{upload_path}")

do_work

do_work(job: Job) -> None

Defines the actual processing behavior for a Job instance.

Parameters:

  • job

    (Job) –

    The Job instance to process.

Raises:

  • NotImplementedError

    If not overridden in subclasses.

Source code in venv/lib/python3.12/site-packages/sighthouse/pipeline/worker.py
602
603
604
605
606
607
608
609
610
611
def do_work(self, job: Job) -> None:
    """Hook implementing the worker's actual processing behavior.

    Concrete workers must override this method to handle a single Job.

    Args:
        job (Job): The Job instance to process.

    Raises:
        NotImplementedError: If not overridden in subclasses.
    """
    raise NotImplementedError("Subclasses must implement do_work()")

get_file

get_file(upload_path: str) -> Optional[bytes]

Retrieves the content of the specified file from either local filesystem or S3.

Parameters:

  • upload_path

    (str) –

    The path of the file to be retrieved.

Returns:

  • Optional[bytes]

    bytes | None: The content of the file if found, otherwise None.

Raises:

  • ValueError

    If URI scheme is unsupported.

Source code in venv/lib/python3.12/site-packages/sighthouse/pipeline/worker.py
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
def get_file(self, upload_path: str) -> Optional[bytes]:
    """Fetch a file's content from the configured artifact repository.

    Args:
        upload_path (str): The path of the file to be retrieved.

    Returns:
        bytes | None: The file content if found, otherwise None; an
            empty bytes object when no repository is configured.

    Raises:
        ValueError: If URI scheme is unsupported.
    """
    if not self.__repo:
        return b""
    return self.__repo.get_file(f"artifacts/{upload_path}")

get_sharefile

get_sharefile(upload_path: str) -> Path | str

Returns the path or URL for sharing the file.

Parameters:

  • upload_path

    (str) –

    The path of the file to be shared.

Returns:

  • Path | str

    Path | str: A POSIX absolute path if local, a pre-signed URL if S3.

Raises:

  • ValueError

    If URI scheme is unsupported.

Source code in venv/lib/python3.12/site-packages/sighthouse/pipeline/worker.py
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
def get_sharefile(self, upload_path: str) -> Path | str:
    """
    Returns the path or URL for sharing the file.

    Args:
        upload_path (str): The path of the file to be shared.

    Returns:
        Path | str: A POSIX absolute path if local, a pre-signed URL if S3.

    Raises:
        ValueError: If URI scheme is unsupported.
    """
    if self.__repo:
        return self.__repo.get_sharefile(f"artifacts/{upload_path}")

    return ""

log

log(message: str, *args, **kwargs) -> None

Log a message using worker's logger

Parameters:

  • message

    (str) –

    The message to log

Source code in venv/lib/python3.12/site-packages/sighthouse/pipeline/worker.py
409
410
411
412
413
414
415
def log(self, message: str, *args, **kwargs) -> None:
    """Log an info-level message through the worker's logger.

    Args:
        message (str): The message to log; may contain %-style
            placeholders filled lazily from ``args``.
        *args: %-formatting arguments forwarded to the logger.
        **kwargs: Extra logging options (e.g. ``exc_info``, ``extra``)
            forwarded to the logger.
    """
    # Fix: forward args/kwargs instead of silently dropping them, so the
    # signature's advertised lazy %-formatting and logging options work.
    self._logger.info(message, *args, **kwargs)

pack_and_send_task

pack_and_send_task(job: Job, files: Sequence[Union[Path, str]], name: Optional[str] = None, step: Optional[str] = None) -> None

Wrapper method that will pack the given files into an archive, upload it onto the worker repository and send the given Job to the next worker in the execution chain.

Parameters:

  • job

    (Job) –

    The job to update and send.

  • files

    (Sequence[Union[Path, str]]) –

    List of path like to upload

  • name

    (Optional[str], default: None ) –

    Optional name for the packed files

  • step

    (Optional[str], default: None ) –

    Optional substep to target a specific step

Source code in venv/lib/python3.12/site-packages/sighthouse/pipeline/worker.py
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
def pack_and_send_task(
    self,
    job: Job,
    files: Sequence[Union[Path, str]],
    name: Optional[str] = None,
    step: Optional[str] = None,
) -> None:
    """
    Wrapper method that will pack the given files into an archive, upload it onto the
    worker repository and send the given Job to the next worker in the execution chain.

    Args:
        job (Job): The job to update and send.
        files (Sequence[Union[Path, str]]): List of path like to upload.
        name (Optional[str]): Optional name for the packed files.
        step (Optional[str]): Optional substep to target a specific step.

    Raises:
        Exception: If the archive could not be pushed to the repository.
    """
    # Fix: `not files` (rather than `files == []`) also catches empty
    # tuples and any other empty Sequence type.
    if not files:
        return

    self.log("Packing files")
    common_prefix, files = get_minimal_paths(files)
    back = Path.cwd()
    os.chdir(common_prefix)
    try:
        # NOTE(review): chdir appears to make archive members relative
        # to the common prefix — confirm against create_tar.
        tar = create_tar(common_prefix, files).read()
    finally:
        # Fix: always restore the working directory, even if packing fails.
        os.chdir(back)
    name = f"{name if name else get_hash(tar)}.tar.gz"

    if self.push_file(name, tar):
        self.log(f"Publish file: {name}")
        job.job_data.update({"file": name})
        if step:
            self.send_task(job, step=step)
        else:
            self.send_task(job)
    else:
        raise Exception("Fail to publish builder results")

push_file

push_file(upload_path: str, content: bytes) -> bool

Pushes or uploads a file to the specified path in either local filesystem or S3.

Parameters:

  • upload_path

    (str) –

    The path where the file should be uploaded.

  • content

    (bytes) –

    The content of the file to be uploaded.

Returns:

  • bool ( bool ) –

    True if successful, False otherwise.

Raises:

  • ValueError

    If URI scheme is unsupported.

Source code in venv/lib/python3.12/site-packages/sighthouse/pipeline/worker.py
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
def push_file(self, upload_path: str, content: bytes) -> bool:
    """Upload a file to the configured artifact repository.

    Args:
        upload_path (str): The path where the file should be uploaded.
        content (bytes): The content of the file to be uploaded.

    Returns:
        bool: True if successful, False otherwise (always False when no
            repository is configured).

    Raises:
        ValueError: If URI scheme is unsupported.
    """
    if not self.__repo:
        return False
    return self.__repo.push_file(f"artifacts/{upload_path}", content)

run

run(concurrent_task: int = 1) -> None

Runs the Celery worker and registers the processing task.

Parameters:

  • concurrent_task

    (int, default: 1 ) –

    Number of concurrent tasks to process.

Source code in venv/lib/python3.12/site-packages/sighthouse/pipeline/worker.py
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
def run(self, concurrent_task: int = 1) -> None:
    """Register the "do_work" task and start the Celery worker loop.

    Args:
        concurrent_task (int): Number of concurrent tasks to process.
    """

    @self._celery_app.task(
        name="do_work",
        queue=self._celery_app.worker_metadata["id"],
        bind=True,
    )
    def __do_work(task: Task, job_dict: Dict[str, Any]) -> Dict[str, Any]:
        job = Job.from_dict(job_dict)
        job.job_metadata["id"] = str(task.request.id)
        # Snapshot of the job as received; later mutations made by
        # do_work() do not appear in the returned payload.
        result = deepcopy(job.to_dict())

        try:
            # Store the future "from" value for the next hop
            job._next_from = str(task.request.id)
            self.do_work(job)
            result["job_metadata"]["state"] = "success"
        except Exception as exc:
            trace = "".join(format_exception(exc))
            self._logger.error(trace)
            result["job_metadata"].update({"state": "failed", "error": trace})

        return result

    # -n gets a random name — presumably to avoid node-name clashes
    # when several workers run on the same host.
    argv = [
        "--quiet",
        "worker",
        "-n",
        token_urlsafe(12),
        "-c",
        str(concurrent_task),
        "--loglevel=info",
        "-Q",
        self._celery_app.worker_metadata["id"],
    ]
    self._celery_app.worker_main(argv)

send_task

send_task(job: Job, step: Optional[str] = None) -> None

Sends the job to the next worker in the chain.

Parameters:

  • job

    (Job) –

    The Job instance to forward.

  • step

    (str, default: None ) –

    Optional step to target a specific worker.

Source code in venv/lib/python3.12/site-packages/sighthouse/pipeline/worker.py
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
def send_task(self, job: Job, step: Optional[str] = None) -> None:
    """Forward the job to the next worker(s) in the execution chain.

    Args:
        job (Job): The Job instance to forward.
        step (Optional[str]): Optional step to target a specific worker.
    """
    # Work on a copy so calling send_task more than once inside do_work
    # cannot leave the caller's job in an inconsistent state.
    dup = deepcopy(job)

    # Promote the pending "from" value recorded for the next hop.
    if dup._next_from is not None:
        dup.job_metadata["from"] = dup._next_from
        dup._next_from = None

    if not step:
        substeps = dup.execution_chain.advance_to_next_step() or []
    else:
        target = dup.execution_chain.get_step(step)
        if not target:
            raise ValueError(f"Invalid step '{step}'")
        substeps = [target]

    for substep in substeps:
        dup.execution_chain.current_step = substep.step
        self._logger.debug(
            "Sending task %s: %s", substep, json.dumps(dup.to_dict(), indent=2)
        )
        self._celery_app.send_task(
            "do_work",
            queue=substep.package,
            kwargs={"job_dict": dup.to_dict()},
        )

Preprocessor

Preprocessor(worker_id: str, worker_url: str, repo_url: str | None = None)

Bases: CommonWorker

SightHouse preprocessor worker

Parameters:

  • worker_id

    (str) –

    Unique identifier for the worker.

  • worker_url

    (str) –

    Celery broker/backend URL.

  • repo_url

    (str, default: None ) –

    Optional URL for the repo.

Methods:

  • delete_file

    Deletes the specified file from either local filesystem or S3.

  • do_work

    Defines the actual processing behavior for a Job instance.

  • get_file

    Retrieves the content of the specified file from either local filesystem or S3.

  • get_sharefile

    Returns the path or URL for sharing the file.

  • log

    Log a message using worker's logger

  • pack_and_send_task

    Wrapper method that packs the given files into an archive, uploads it onto the worker repository and sends the job to the next worker.

  • push_file

    Pushes or uploads a file to the specified path in either local filesystem or S3.

  • run

    Runs the Celery worker and registers the processing task.

  • send_task

    Sends the job to the next worker in the chain.

Source code in venv/lib/python3.12/site-packages/sighthouse/pipeline/worker.py
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
def __init__(
    self,
    worker_id: str,
    worker_url: str,
    repo_url: str | None = None,
):
    """Set up the worker: artifact repository, Celery app and logger.

    Args:
        worker_id (str): Unique identifier for the worker.
        worker_url (str): Celery broker/backend URL.
        repo_url (str | None): Optional URL for the artifact repository;
            when omitted the worker runs without artifact storage.
    """
    # Without a repo URL, the file helpers (push/get/delete) become no-ops.
    self.__repo = None
    if repo_url:
        self.__repo = Repo(repo_url, secure=False)

    self._celery_app = CeleryWorker(worker_id, worker_url)
    self._logger = get_logger("celery.task")

    # Wire up Celery signal handlers for task outcome reporting.
    signals.task_failure.connect(self._on_task_failure, weak=False)
    signals.task_success.connect(self._on_task_success, weak=False)

delete_file

delete_file(upload_path: str) -> None

Deletes the specified file from either local filesystem or S3.

Parameters:

  • upload_path

    (str) –

    The path of the file to be deleted.

Raises:

  • ValueError

    If URI scheme is unsupported.

Source code in venv/lib/python3.12/site-packages/sighthouse/pipeline/worker.py
436
437
438
439
440
441
442
443
444
445
446
447
def delete_file(self, upload_path: str) -> None:
    """Remove a file from the configured artifact repository.

    Does nothing when the worker has no repository configured.

    Args:
        upload_path (str): The path of the file to be deleted.

    Raises:
        ValueError: If URI scheme is unsupported.
    """
    if not self.__repo:
        return
    self.__repo.delete_file(f"artifacts/{upload_path}")

do_work

do_work(job: Job) -> None

Defines the actual processing behavior for a Job instance.

Parameters:

  • job

    (Job) –

    The Job instance to process.

Raises:

  • NotImplementedError

    If not overridden in subclasses.

Source code in venv/lib/python3.12/site-packages/sighthouse/pipeline/worker.py
602
603
604
605
606
607
608
609
610
611
def do_work(self, job: Job) -> None:
    """Hook implementing the worker's actual processing behavior.

    Concrete workers must override this method to handle a single Job.

    Args:
        job (Job): The Job instance to process.

    Raises:
        NotImplementedError: If not overridden in subclasses.
    """
    raise NotImplementedError("Subclasses must implement do_work()")

get_file

get_file(upload_path: str) -> Optional[bytes]

Retrieves the content of the specified file from either local filesystem or S3.

Parameters:

  • upload_path

    (str) –

    The path of the file to be retrieved.

Returns:

  • Optional[bytes]

    bytes | None: The content of the file if found, otherwise None.

Raises:

  • ValueError

    If URI scheme is unsupported.

Source code in venv/lib/python3.12/site-packages/sighthouse/pipeline/worker.py
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
def get_file(self, upload_path: str) -> Optional[bytes]:
    """Fetch a file's content from the configured artifact repository.

    Args:
        upload_path (str): The path of the file to be retrieved.

    Returns:
        bytes | None: The file content if found, otherwise None; an
            empty bytes object when no repository is configured.

    Raises:
        ValueError: If URI scheme is unsupported.
    """
    if not self.__repo:
        return b""
    return self.__repo.get_file(f"artifacts/{upload_path}")

get_sharefile

get_sharefile(upload_path: str) -> Path | str

Returns the path or URL for sharing the file.

Parameters:

  • upload_path

    (str) –

    The path of the file to be shared.

Returns:

  • Path | str

    Path | str: A POSIX absolute path if local, a pre-signed URL if S3.

Raises:

  • ValueError

    If URI scheme is unsupported.

Source code in venv/lib/python3.12/site-packages/sighthouse/pipeline/worker.py
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
def get_sharefile(self, upload_path: str) -> Path | str:
    """
    Returns the path or URL for sharing the file.

    Args:
        upload_path (str): The path of the file to be shared.

    Returns:
        Path | str: A POSIX absolute path if local, a pre-signed URL if S3.

    Raises:
        ValueError: If URI scheme is unsupported.
    """
    if self.__repo:
        return self.__repo.get_sharefile(f"artifacts/{upload_path}")

    return ""

log

log(message: str, *args, **kwargs) -> None

Log a message using worker's logger

Parameters:

  • message

    (str) –

    The message to log

Source code in venv/lib/python3.12/site-packages/sighthouse/pipeline/worker.py
409
410
411
412
413
414
415
def log(self, message: str, *args, **kwargs) -> None:
    """Log an info-level message through the worker's logger.

    Args:
        message (str): The message to log; may contain %-style
            placeholders filled lazily from ``args``.
        *args: %-formatting arguments forwarded to the logger.
        **kwargs: Extra logging options (e.g. ``exc_info``, ``extra``)
            forwarded to the logger.
    """
    # Fix: forward args/kwargs instead of silently dropping them, so the
    # signature's advertised lazy %-formatting and logging options work.
    self._logger.info(message, *args, **kwargs)

pack_and_send_task

pack_and_send_task(job: Job, files: Sequence[Union[Path, str]], name: Optional[str] = None, step: Optional[str] = None) -> None

Wrapper method that will pack the given files into an archive, upload it onto the worker repository and send the given Job to the next worker in the execution chain.

Parameters:

  • job

    (Job) –

    The job to update and send.

  • files

    (Sequence[Union[Path, str]]) –

    List of path like to upload

  • name

    (Optional[str], default: None ) –

    Optional name for the packed files

  • step

    (Optional[str], default: None ) –

    Optional substep to target a specific step

Source code in venv/lib/python3.12/site-packages/sighthouse/pipeline/worker.py
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
def pack_and_send_task(
    self,
    job: Job,
    files: Sequence[Union[Path, str]],
    name: Optional[str] = None,
    step: Optional[str] = None,
) -> None:
    """
    Wrapper method that will pack the given files into an archive, upload it onto the
    worker repository and send the given Job to the next worker in the execution chain.

    Args:
        job (Job): The job to update and send.
        files (Sequence[Union[Path, str]]): List of path like to upload.
        name (Optional[str]): Optional name for the packed files.
        step (Optional[str]): Optional substep to target a specific step.

    Raises:
        Exception: If the archive cannot be published to the repository.
    """
    # Treat any empty sequence (not just the literal []) as "nothing to do".
    if not files:
        return

    self.log("Packing files")
    common_prefix, files = get_minimal_paths(files)
    back = Path.cwd()
    os.chdir(common_prefix)
    try:
        tar = create_tar(common_prefix, files).read()
    finally:
        # Always restore the previous cwd, even if packing fails.
        os.chdir(back)
    name = f"{name if name else get_hash(tar)}.tar.gz"

    if self.push_file(name, tar):
        self.log(f"Publish file: {name}")
        job.job_data.update({"file": name})
        if step:
            self.send_task(job, step=step)
        else:
            self.send_task(job)
    else:
        raise Exception("Fail to publish builder results")

push_file

push_file(upload_path: str, content: bytes) -> bool

Pushes or uploads a file to the specified path in either local filesystem or S3.

Parameters:

  • upload_path

    (str) –

    The path where the file should be uploaded.

  • content

    (bytes) –

    The content of the file to be uploaded.

Returns:

  • bool ( bool ) –

    True if successful, False otherwise.

Raises:

  • ValueError

    If URI scheme is unsupported.

Source code in venv/lib/python3.12/site-packages/sighthouse/pipeline/worker.py
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
def push_file(self, upload_path: str, content: bytes) -> bool:
    """
    Upload a file to the configured repository (local filesystem or S3).

    Args:
        upload_path (str): The path where the file should be uploaded.
        content (bytes): The content of the file to be uploaded.

    Returns:
        bool: True if successful, False otherwise (including when no
            repository is configured).

    Raises:
        ValueError: If URI scheme is unsupported.
    """
    repo = self.__repo
    if not repo:
        # Without a repository there is nowhere to push the file to.
        return False
    return repo.push_file(f"artifacts/{upload_path}", content)

run

run(concurrent_task: int = 1) -> None

Runs the Celery worker and registers the processing task.

Parameters:

  • concurrent_task

    (int, default: 1 ) –

    Number of concurrent tasks to process.

Source code in venv/lib/python3.12/site-packages/sighthouse/pipeline/worker.py
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
def run(self, concurrent_task: int = 1) -> None:
    """Runs the Celery worker and registers the processing task.

    Registers a single "do_work" task bound to this worker's queue, then
    blocks inside the Celery worker main loop until the worker stops.

    Args:
        concurrent_task (int): Number of concurrent tasks to process.
    """

    @self._celery_app.task(
        name="do_work",
        queue=self._celery_app.worker_metadata["id"],
        bind=True,
    )
    def __do_work(task: Task, job_dict: Dict[str, Any]) -> Dict[str, Any]:
        # Rebuild the Job from its serialized form and tag it with the
        # Celery task id so this run can be referenced later.
        job = Job.from_dict(job_dict)
        job.job_metadata["id"] = str(task.request.id)
        # Snapshot before do_work mutates the job; the snapshot (not the
        # live job) is what is returned as the Celery task result.
        dup = deepcopy(job.to_dict())

        try:
            # Store the future "from" value for the next hop
            job._next_from = str(task.request.id)

            self.do_work(job)

            dup["job_metadata"]["state"] = "success"
        except Exception as e:
            # Record the full traceback in the result payload instead of
            # re-raising, so the task completes with a "failed" state.
            error = "".join(format_exception(e))
            self._logger.error(error)
            dup["job_metadata"].update({"state": "failed", "error": error})

        return dup

    # Blocks until shutdown. The random node name (-n) avoids collisions
    # when several workers share the same broker.
    self._celery_app.worker_main(
        [
            "--quiet",
            "worker",
            "-n",
            token_urlsafe(12),
            "-c",
            str(concurrent_task),
            "--loglevel=info",
            "-Q",
            self._celery_app.worker_metadata["id"],
        ]
    )

send_task

send_task(job: Job, step: Optional[str] = None) -> None

Sends the job to the next worker in the chain.

Parameters:

  • job

    (Job) –

    The Job instance to forward.

  • step

    (str, default: None ) –

    Optional step to target a specific worker.

Source code in venv/lib/python3.12/site-packages/sighthouse/pipeline/worker.py
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
def send_task(self, job: Job, step: Optional[str] = None) -> None:
    """Sends the job to the next worker in the chain.

    Args:
        job (Job): The Job instance to forward.
        step (str): Optional step to target a specific worker instead of
            advancing the execution chain.

    Raises:
        ValueError: If `step` does not name a step in the execution chain.
    """
    # Copy job to avoid inconsistency if send_task is called more than once inside do_work
    dup = deepcopy(job)
    # Store the future "from" value for the next hop
    if dup._next_from is not None:
        dup.job_metadata["from"] = dup._next_from
        dup._next_from = None

    if step:
        substep = dup.execution_chain.get_step(step)
        if not substep:
            raise ValueError(f"Invalid step '{step}'")

        substeps = [substep]
    else:
        # advance_to_next_step may return a falsy value at the end of the
        # chain; normalize to [] so the loop below becomes a no-op.
        substeps = dup.execution_chain.advance_to_next_step() or []

    for s in substeps:
        # Point the copied chain at the step being dispatched so the
        # receiving worker sees itself as the current step.
        dup.execution_chain.current_step = s.step
        self._logger.debug(
            "Sending task %s: %s", s, json.dumps(dup.to_dict(), indent=2)
        )
        self._celery_app.send_task(
            "do_work",
            queue=s.package,
            kwargs={"job_dict": dup.to_dict()},
        )

Compiler

Compiler(worker_id: str, worker_url: str, repo_url: str | None = None)

Bases: CommonWorker

SightHouse compiler worker

Parameters:

  • worker_id

    (str) –

    Unique identifier for the worker.

  • worker_url

    (str) –

    Celery broker/backend URL.

  • repo_url

    (str, default: None ) –

    Optional url for the repo.

Methods:

  • delete_file

    Deletes the specified file from either local filesystem or S3.

  • do_work

    Defines the actual processing behavior for a Job instance.

  • get_file

    Retrieves the content of the specified file from either local filesystem or S3.

  • get_sharefile

    Returns the path or URL for sharing the file.

  • log

    Log a message using worker's logger

  • pack_and_send_task

    Wrapper method that packs the given files into an archive, uploads it to the worker repository, and forwards the job.

  • push_file

    Pushes or uploads a file to the specified path in either local filesystem or S3.

  • run

    Runs the Celery worker and registers the processing task.

  • send_task

    Sends the job to the next worker in the chain.

  • validate_compiler_variants

    Validate compiler_variants structure.

Source code in venv/lib/python3.12/site-packages/sighthouse/pipeline/worker.py
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
def __init__(
    self,
    worker_id: str,
    worker_url: str,
    repo_url: str | None = None,
):
    """Initialize the worker and its Celery app.

    Args:
        worker_id (str): Unique identifier for the worker.
        worker_url (str): Celery broker/backend URL.
        repo_url (str | None): Optional url for the repo; when omitted no
            artifact repository is configured.
    """
    # NOTE(review): secure=False disables TLS on the repo connection —
    # confirm this is intended outside development environments.
    self.__repo = Repo(repo_url, secure=False) if repo_url else None
    self._celery_app = CeleryWorker(
        worker_id,
        worker_url,
    )
    self._logger = get_logger("celery.task")

    # Register signal handlers
    # weak=False keeps the bound-method receivers alive for the app's lifetime.
    signals.task_success.connect(self._on_task_success, weak=False)
    signals.task_failure.connect(self._on_task_failure, weak=False)

delete_file

delete_file(upload_path: str) -> None

Deletes the specified file from either local filesystem or S3.

Parameters:

  • upload_path

    (str) –

    The path of the file to be deleted.

Raises:

  • ValueError

    If URI scheme is unsupported.

Source code in venv/lib/python3.12/site-packages/sighthouse/pipeline/worker.py
436
437
438
439
440
441
442
443
444
445
446
447
def delete_file(self, upload_path: str) -> None:
    """
    Delete a file from the configured repository (local filesystem or S3).

    No-op when no repository is configured.

    Args:
        upload_path (str): The path of the file to be deleted.

    Raises:
        ValueError: If URI scheme is unsupported.
    """
    repo = self.__repo
    if repo:
        repo.delete_file(f"artifacts/{upload_path}")

do_work

do_work(job: Job) -> None

Defines the actual processing behavior for a Job instance.

Parameters:

  • job

    (Job) –

    The Job instance to process.

Raises:

  • NotImplementedError

    If not overridden in subclasses.

Source code in venv/lib/python3.12/site-packages/sighthouse/pipeline/worker.py
602
603
604
605
606
607
608
609
610
611
def do_work(self, job: Job) -> None:
    """Process a single Job instance.

    Concrete workers override this hook with their actual processing
    logic; the base implementation always raises.

    Args:
        job (Job): The Job instance to process.

    Raises:
        NotImplementedError: Always, until overridden in a subclass.
    """
    raise NotImplementedError("Subclasses must implement do_work()")

get_file

get_file(upload_path: str) -> Optional[bytes]

Retrieves the content of the specified file from either local filesystem or S3.

Parameters:

  • upload_path

    (str) –

    The path of the file to be retrieved.

Returns:

  • Optional[bytes]

    bytes | None: The content of the file if found, otherwise None.

Raises:

  • ValueError

    If URI scheme is unsupported.

Source code in venv/lib/python3.12/site-packages/sighthouse/pipeline/worker.py
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
def get_file(self, upload_path: str) -> Optional[bytes]:
    """
    Fetch the content of a file from the configured repository
    (local filesystem or S3).

    Args:
        upload_path (str): The path of the file to be retrieved.

    Returns:
        Optional[bytes]: The repository's result when one is configured
            (presumably None when the file is missing — determined by the
            Repo implementation); empty bytes when no repository is set.

    Raises:
        ValueError: If URI scheme is unsupported.
    """
    repo = self.__repo
    if not repo:
        # No repository configured: nothing to read from.
        return b""
    return repo.get_file(f"artifacts/{upload_path}")

get_sharefile

get_sharefile(upload_path: str) -> Path | str

Returns the path or URL for sharing the file.

Parameters:

  • upload_path

    (str) –

    The path of the file to be shared.

Returns:

  • Path | str

    Path | str: A POSIX absolute path if local, a pre-signed URL if S3.

Raises:

  • ValueError

    If URI scheme is unsupported.

Source code in venv/lib/python3.12/site-packages/sighthouse/pipeline/worker.py
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
def get_sharefile(self, upload_path: str) -> Path | str:
    """
    Returns the path or URL for sharing the file.

    Args:
        upload_path (str): The path of the file to be shared.

    Returns:
        Path | str: A POSIX absolute path if local, a pre-signed URL if S3.

    Raises:
        ValueError: If URI scheme is unsupported.
    """
    if self.__repo:
        return self.__repo.get_sharefile(f"artifacts/{upload_path}")

    return ""

log

log(message: str, *args, **kwargs) -> None

Log a message using worker's logger

Parameters:

  • message

    (str) –

    The message to log

Source code in venv/lib/python3.12/site-packages/sighthouse/pipeline/worker.py
409
410
411
412
413
414
415
def log(self, message: str, *args, **kwargs) -> None:
    """Log a message using worker's logger.

    Args:
        message (str): The message to log (may be a %-style format string).
        *args: Lazy %-formatting arguments forwarded to the logger.
        **kwargs: Keyword arguments (e.g. ``exc_info``) forwarded to the logger.
    """
    # Forward *args/**kwargs instead of silently dropping them, so callers
    # get stdlib-style lazy %-formatting and logging keywords.
    self._logger.info(message, *args, **kwargs)

pack_and_send_task

pack_and_send_task(job: Job, files: Sequence[Union[Path, str]], metadata: List[Tuple[str, str]], name: Optional[str] = None, step: Optional[str] = None) -> None

Wrapper method that will pack the given files into an archive, upload it onto the worker repository and send the given Job to the next worker in the execution chain.

Parameters:

  • job

    (Job) –

    The job to update and send.

  • files

    (Sequence[Union[Path, str]]) –

    List of path like to upload

  • metadata

    (List[Tuple[str, str]]) –

    List of metadata to send to the analyzer

  • name

    (Optional[str], default: None ) –

    Optional name for the packed files

  • step

    (Optional[str], default: None ) –

    Optional substep to target a specific step

Source code in venv/lib/python3.12/site-packages/sighthouse/pipeline/worker.py
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
def pack_and_send_task(  # type: ignore[override]
    self,
    job: Job,
    files: Sequence[Union[Path, str]],
    metadata: List[Tuple[str, str]],
    name: Optional[str] = None,
    step: Optional[str] = None,
) -> None:
    """
    Wrapper method that will pack the given files into an archive, upload it onto the
    worker repository and send the given Job to the next worker in the execution chain.

    Args:
        job (Job): The job to update and send.
        files (Sequence[Union[Path, str]]): List of path like to upload.
        metadata (List[Tuple[str, str]]): Metadata to send to the analyzer
            (stored under ``job.job_data["metadata"]``).
        name (Optional[str]): Optional name for the packed files.
        step (Optional[str]): Optional substep to target a specific step.
    """
    # Attach analyzer metadata before delegating the pack/upload/send logic.
    job.job_data.update({"metadata": metadata})
    super().pack_and_send_task(job, files, name=name, step=step)

push_file

push_file(upload_path: str, content: bytes) -> bool

Pushes or uploads a file to the specified path in either local filesystem or S3.

Parameters:

  • upload_path

    (str) –

    The path where the file should be uploaded.

  • content

    (bytes) –

    The content of the file to be uploaded.

Returns:

  • bool ( bool ) –

    True if successful, False otherwise.

Raises:

  • ValueError

    If URI scheme is unsupported.

Source code in venv/lib/python3.12/site-packages/sighthouse/pipeline/worker.py
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
def push_file(self, upload_path: str, content: bytes) -> bool:
    """
    Upload a file to the configured repository (local filesystem or S3).

    Args:
        upload_path (str): The path where the file should be uploaded.
        content (bytes): The content of the file to be uploaded.

    Returns:
        bool: True if successful, False otherwise (including when no
            repository is configured).

    Raises:
        ValueError: If URI scheme is unsupported.
    """
    repo = self.__repo
    if not repo:
        # Without a repository there is nowhere to push the file to.
        return False
    return repo.push_file(f"artifacts/{upload_path}", content)

run

run(concurrent_task: int = 1) -> None

Runs the Celery worker and registers the processing task.

Parameters:

  • concurrent_task

    (int, default: 1 ) –

    Number of concurrent tasks to process.

Source code in venv/lib/python3.12/site-packages/sighthouse/pipeline/worker.py
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
def run(self, concurrent_task: int = 1) -> None:
    """Runs the Celery worker and registers the processing task.

    Registers a single "do_work" task bound to this worker's queue, then
    blocks inside the Celery worker main loop until the worker stops.

    Args:
        concurrent_task (int): Number of concurrent tasks to process.
    """

    @self._celery_app.task(
        name="do_work",
        queue=self._celery_app.worker_metadata["id"],
        bind=True,
    )
    def __do_work(task: Task, job_dict: Dict[str, Any]) -> Dict[str, Any]:
        # Rebuild the Job from its serialized form and tag it with the
        # Celery task id so this run can be referenced later.
        job = Job.from_dict(job_dict)
        job.job_metadata["id"] = str(task.request.id)
        # Snapshot before do_work mutates the job; the snapshot (not the
        # live job) is what is returned as the Celery task result.
        dup = deepcopy(job.to_dict())

        try:
            # Store the future "from" value for the next hop
            job._next_from = str(task.request.id)

            self.do_work(job)

            dup["job_metadata"]["state"] = "success"
        except Exception as e:
            # Record the full traceback in the result payload instead of
            # re-raising, so the task completes with a "failed" state.
            error = "".join(format_exception(e))
            self._logger.error(error)
            dup["job_metadata"].update({"state": "failed", "error": error})

        return dup

    # Blocks until shutdown. The random node name (-n) avoids collisions
    # when several workers share the same broker.
    self._celery_app.worker_main(
        [
            "--quiet",
            "worker",
            "-n",
            token_urlsafe(12),
            "-c",
            str(concurrent_task),
            "--loglevel=info",
            "-Q",
            self._celery_app.worker_metadata["id"],
        ]
    )

send_task

send_task(job: Job, step: Optional[str] = None) -> None

Sends the job to the next worker in the chain.

Parameters:

  • job

    (Job) –

    The Job instance to forward.

  • step

    (str, default: None ) –

    Optional step to target a specific worker.

Source code in venv/lib/python3.12/site-packages/sighthouse/pipeline/worker.py
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
def send_task(self, job: Job, step: Optional[str] = None) -> None:
    """Sends the job to the next worker in the chain.

    Args:
        job (Job): The Job instance to forward.
        step (str): Optional step to target a specific worker instead of
            advancing the execution chain.

    Raises:
        ValueError: If `step` does not name a step in the execution chain.
    """
    # Copy job to avoid inconsistency if send_task is called more than once inside do_work
    dup = deepcopy(job)
    # Store the future "from" value for the next hop
    if dup._next_from is not None:
        dup.job_metadata["from"] = dup._next_from
        dup._next_from = None

    if step:
        substep = dup.execution_chain.get_step(step)
        if not substep:
            raise ValueError(f"Invalid step '{step}'")

        substeps = [substep]
    else:
        # advance_to_next_step may return a falsy value at the end of the
        # chain; normalize to [] so the loop below becomes a no-op.
        substeps = dup.execution_chain.advance_to_next_step() or []

    for s in substeps:
        # Point the copied chain at the step being dispatched so the
        # receiving worker sees itself as the current step.
        dup.execution_chain.current_step = s.step
        self._logger.debug(
            "Sending task %s: %s", s, json.dumps(dup.to_dict(), indent=2)
        )
        self._celery_app.send_task(
            "do_work",
            queue=s.package,
            kwargs={"job_dict": dup.to_dict()},
        )

validate_compiler_variants staticmethod

validate_compiler_variants(data: Dict[str, Dict[str, Any]]) -> List[Tuple[str, Dict[str, str]]]

Validate compiler_variants structure.

Parameters:

  • data

    (dict) –

    Dictionary of compiler_variants to validate.

Returns:

  • List[Tuple[str, Dict[str, str]]]

    list[tuple[str, dict[str, str]]]: The list of (name, variant) pairs from compiler_variants.

Source code in venv/lib/python3.12/site-packages/sighthouse/pipeline/worker.py
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
@staticmethod
def validate_compiler_variants(
    data: Dict[str, Dict[str, Any]],
) -> List[Tuple[str, Dict[str, str]]]:
    """
    Validate compiler_variants structure.

    Args:
        data (dict): Dictionary of compiler_variants to validate.

    Returns:
        list[tuple[str, str]]: The list of compiler_variants.

    """
    # Check top-level key
    if not isinstance(data, dict) or "compiler_variants" not in data:
        raise ValueError(
            "YAML must contain a 'compiler_variants' list at the top level."
        )

    compiler_variants = data["compiler_variants"]
    if not isinstance(compiler_variants, dict):
        raise ValueError("'compiler_variants' must be a dict.")

    result: List[Tuple[str, Dict[str, str]]] = []
    # Validate each entry
    for idx, name in enumerate(compiler_variants, start=1):
        variant = compiler_variants[name]
        if not isinstance(variant, dict):
            raise ValueError(f"Variant #{idx} is not a dictionary.")

        required_fields = {"cc", "cflags"}
        missing = required_fields - set(variant.keys())
        if missing:
            raise ValueError(
                f"Variant #{idx} is Missing required fields: {missing}."
            )

        for key in required_fields:
            if not isinstance(variant[key], str):
                raise ValueError(f"Variant #{idx} must contain a key/value string")

        result.append((name, variant))

    return result

Analyzer

Analyzer(worker_id: str, worker_url: str, repo_url: str | None = None)

Bases: CommonWorker

SightHouse analyzer worker

Parameters:

  • worker_id

    (str) –

    Unique identifier for the worker.

  • worker_url

    (str) –

    Celery broker/backend URL.

  • repo_url

    (str, default: None ) –

    Optional url for the repo.

Methods:

  • delete_file

    Deletes the specified file from either local filesystem or S3.

  • do_work

    Defines the actual processing behavior for a Job instance.

  • get_file

    Retrieves the content of the specified file from either local filesystem or S3.

  • get_sharefile

    Returns the path or URL for sharing the file.

  • log

    Log a message using worker's logger

  • pack_and_send_task

    Wrapper method that packs the given files into an archive, uploads it to the worker repository, and forwards the job.

  • push_file

    Pushes or uploads a file to the specified path in either local filesystem or S3.

  • run

    Runs the Celery worker and registers the processing task.

  • send_task

    Sends the job to the next worker in the chain.

Source code in venv/lib/python3.12/site-packages/sighthouse/pipeline/worker.py
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
def __init__(
    self,
    worker_id: str,
    worker_url: str,
    repo_url: str | None = None,
):
    """Initialize the worker and its Celery app.

    Args:
        worker_id (str): Unique identifier for the worker.
        worker_url (str): Celery broker/backend URL.
        repo_url (str | None): Optional url for the repo; when omitted no
            artifact repository is configured.
    """
    # NOTE(review): secure=False disables TLS on the repo connection —
    # confirm this is intended outside development environments.
    self.__repo = Repo(repo_url, secure=False) if repo_url else None
    self._celery_app = CeleryWorker(
        worker_id,
        worker_url,
    )
    self._logger = get_logger("celery.task")

    # Register signal handlers
    # weak=False keeps the bound-method receivers alive for the app's lifetime.
    signals.task_success.connect(self._on_task_success, weak=False)
    signals.task_failure.connect(self._on_task_failure, weak=False)

delete_file

delete_file(upload_path: str) -> None

Deletes the specified file from either local filesystem or S3.

Parameters:

  • upload_path

    (str) –

    The path of the file to be deleted.

Raises:

  • ValueError

    If URI scheme is unsupported.

Source code in venv/lib/python3.12/site-packages/sighthouse/pipeline/worker.py
436
437
438
439
440
441
442
443
444
445
446
447
def delete_file(self, upload_path: str) -> None:
    """
    Delete a file from the configured repository (local filesystem or S3).

    No-op when no repository is configured.

    Args:
        upload_path (str): The path of the file to be deleted.

    Raises:
        ValueError: If URI scheme is unsupported.
    """
    repo = self.__repo
    if repo:
        repo.delete_file(f"artifacts/{upload_path}")

do_work

do_work(job: Job) -> None

Defines the actual processing behavior for a Job instance.

Parameters:

  • job

    (Job) –

    The Job instance to process.

Raises:

  • NotImplementedError

    If not overridden in subclasses.

Source code in venv/lib/python3.12/site-packages/sighthouse/pipeline/worker.py
602
603
604
605
606
607
608
609
610
611
def do_work(self, job: Job) -> None:
    """Process a single Job instance.

    Concrete workers override this hook with their actual processing
    logic; the base implementation always raises.

    Args:
        job (Job): The Job instance to process.

    Raises:
        NotImplementedError: Always, until overridden in a subclass.
    """
    raise NotImplementedError("Subclasses must implement do_work()")

get_file

get_file(upload_path: str) -> Optional[bytes]

Retrieves the content of the specified file from either local filesystem or S3.

Parameters:

  • upload_path

    (str) –

    The path of the file to be retrieved.

Returns:

  • Optional[bytes]

    bytes | None: The content of the file if found, otherwise None.

Raises:

  • ValueError

    If URI scheme is unsupported.

Source code in venv/lib/python3.12/site-packages/sighthouse/pipeline/worker.py
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
def get_file(self, upload_path: str) -> Optional[bytes]:
    """
    Fetch the content of a file from the configured repository
    (local filesystem or S3).

    Args:
        upload_path (str): The path of the file to be retrieved.

    Returns:
        Optional[bytes]: The repository's result when one is configured
            (presumably None when the file is missing — determined by the
            Repo implementation); empty bytes when no repository is set.

    Raises:
        ValueError: If URI scheme is unsupported.
    """
    repo = self.__repo
    if not repo:
        # No repository configured: nothing to read from.
        return b""
    return repo.get_file(f"artifacts/{upload_path}")

get_sharefile

get_sharefile(upload_path: str) -> Path | str

Returns the path or URL for sharing the file.

Parameters:

  • upload_path

    (str) –

    The path of the file to be shared.

Returns:

  • Path | str

    Path | str: A POSIX absolute path if local, a pre-signed URL if S3.

Raises:

  • ValueError

    If URI scheme is unsupported.

Source code in venv/lib/python3.12/site-packages/sighthouse/pipeline/worker.py
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
def get_sharefile(self, upload_path: str) -> Path | str:
    """
    Returns the path or URL for sharing the file.

    Args:
        upload_path (str): The path of the file to be shared.

    Returns:
        Path | str: A POSIX absolute path if local, a pre-signed URL if S3.

    Raises:
        ValueError: If URI scheme is unsupported.
    """
    if self.__repo:
        return self.__repo.get_sharefile(f"artifacts/{upload_path}")

    return ""

log

log(message: str, *args, **kwargs) -> None

Log a message using worker's logger

Parameters:

  • message

    (str) –

    The message to log

Source code in venv/lib/python3.12/site-packages/sighthouse/pipeline/worker.py
409
410
411
412
413
414
415
def log(self, message: str, *args, **kwargs) -> None:
    """Log a message using worker's logger.

    Args:
        message (str): The message to log (may be a %-style format string).
        *args: Lazy %-formatting arguments forwarded to the logger.
        **kwargs: Keyword arguments (e.g. ``exc_info``) forwarded to the logger.
    """
    # Forward *args/**kwargs instead of silently dropping them, so callers
    # get stdlib-style lazy %-formatting and logging keywords.
    self._logger.info(message, *args, **kwargs)

pack_and_send_task

pack_and_send_task(job: Job, files: Sequence[Union[Path, str]], name: Optional[str] = None, step: Optional[str] = None) -> None

Wrapper method that will pack the given files into an archive, upload it onto the worker repository and send the given Job to the next worker in the execution chain.

Parameters:

  • job

    (Job) –

    The job to update and send.

  • files

    (Sequence[Union[Path, str]]) –

    List of path like to upload

  • name

    (Optional[str], default: None ) –

    Optional name for the packed files

  • step

    (Optional[str], default: None ) –

    Optional substep to target a specific step

Source code in venv/lib/python3.12/site-packages/sighthouse/pipeline/worker.py
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
def pack_and_send_task(
    self,
    job: Job,
    files: Sequence[Union[Path, str]],
    name: Optional[str] = None,
    step: Optional[str] = None,
) -> None:
    """
    Wrapper method that will pack the given files into an archive, upload it onto the
    worker repository and send the given Job to the next worker in the execution chain.

    Args:
        job (Job): The job to update and send.
        files (Sequence[Union[Path, str]]): List of path like to upload.
        name (Optional[str]): Optional name for the packed files.
        step (Optional[str]): Optional substep to target a specific step.

    Raises:
        Exception: If the archive cannot be published to the repository.
    """
    # Treat any empty sequence (not just the literal []) as "nothing to do".
    if not files:
        return

    self.log("Packing files")
    common_prefix, files = get_minimal_paths(files)
    back = Path.cwd()
    os.chdir(common_prefix)
    try:
        tar = create_tar(common_prefix, files).read()
    finally:
        # Always restore the previous cwd, even if packing fails.
        os.chdir(back)
    name = f"{name if name else get_hash(tar)}.tar.gz"

    if self.push_file(name, tar):
        self.log(f"Publish file: {name}")
        job.job_data.update({"file": name})
        if step:
            self.send_task(job, step=step)
        else:
            self.send_task(job)
    else:
        raise Exception("Fail to publish builder results")

push_file

push_file(upload_path: str, content: bytes) -> bool

Pushes or uploads a file to the specified path in either local filesystem or S3.

Parameters:

  • upload_path

    (str) –

    The path where the file should be uploaded.

  • content

    (bytes) –

    The content of the file to be uploaded.

Returns:

  • bool ( bool ) –

    True if successful, False otherwise.

Raises:

  • ValueError

    If URI scheme is unsupported.

Source code in venv/lib/python3.12/site-packages/sighthouse/pipeline/worker.py
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
def push_file(self, upload_path: str, content: bytes) -> bool:
    """
    Upload a file to the configured repository (local filesystem or S3).

    Args:
        upload_path (str): The path where the file should be uploaded.
        content (bytes): The content of the file to be uploaded.

    Returns:
        bool: True if successful, False otherwise (including when no
            repository is configured).

    Raises:
        ValueError: If URI scheme is unsupported.
    """
    repo = self.__repo
    if not repo:
        # Without a repository there is nowhere to push the file to.
        return False
    return repo.push_file(f"artifacts/{upload_path}", content)

run

run(concurrent_task: int = 1) -> None

Runs the Celery worker and registers the processing task.

Parameters:

  • concurrent_task

    (int, default: 1 ) –

    Number of concurrent tasks to process.

Source code in venv/lib/python3.12/site-packages/sighthouse/pipeline/worker.py
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
def run(self, concurrent_task: int = 1) -> None:
    """Runs the Celery worker and registers the processing task.

    Registers a single "do_work" task bound to this worker's queue, then
    blocks inside the Celery worker main loop until the worker stops.

    Args:
        concurrent_task (int): Number of concurrent tasks to process.
    """

    @self._celery_app.task(
        name="do_work",
        queue=self._celery_app.worker_metadata["id"],
        bind=True,
    )
    def __do_work(task: Task, job_dict: Dict[str, Any]) -> Dict[str, Any]:
        # Rebuild the Job from its serialized form and tag it with the
        # Celery task id so this run can be referenced later.
        job = Job.from_dict(job_dict)
        job.job_metadata["id"] = str(task.request.id)
        # Snapshot before do_work mutates the job; the snapshot (not the
        # live job) is what is returned as the Celery task result.
        dup = deepcopy(job.to_dict())

        try:
            # Store the future "from" value for the next hop
            job._next_from = str(task.request.id)

            self.do_work(job)

            dup["job_metadata"]["state"] = "success"
        except Exception as e:
            # Record the full traceback in the result payload instead of
            # re-raising, so the task completes with a "failed" state.
            error = "".join(format_exception(e))
            self._logger.error(error)
            dup["job_metadata"].update({"state": "failed", "error": error})

        return dup

    # Blocks until shutdown. The random node name (-n) avoids collisions
    # when several workers share the same broker.
    self._celery_app.worker_main(
        [
            "--quiet",
            "worker",
            "-n",
            token_urlsafe(12),
            "-c",
            str(concurrent_task),
            "--loglevel=info",
            "-Q",
            self._celery_app.worker_metadata["id"],
        ]
    )

send_task

send_task(job: Job, step: Optional[str] = None) -> None

Sends the job to the next worker in the chain.

Parameters:

  • job

    (Job) –

    The Job instance to forward.

  • step

    (str, default: None ) –

    Optional step to target a specific worker.

Source code in venv/lib/python3.12/site-packages/sighthouse/pipeline/worker.py
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
def send_task(self, job: Job, step: Optional[str] = None) -> None:
    """Forward the job to the next worker(s) in the chain.

    Args:
        job (Job): The Job instance to forward.
        step (str): Optional step to target a specific worker.
    """
    # Work on a copy so repeated send_task calls inside do_work stay consistent.
    outbound = deepcopy(job)
    # Promote the pending "from" marker into the metadata for the next hop.
    if outbound._next_from is not None:
        outbound.job_metadata["from"] = outbound._next_from
        outbound._next_from = None

    if not step:
        targets = outbound.execution_chain.advance_to_next_step() or []
    else:
        target = outbound.execution_chain.get_step(step)
        if not target:
            raise ValueError(f"Invalid step '{step}'")
        targets = [target]

    for candidate in targets:
        outbound.execution_chain.current_step = candidate.step
        payload = outbound.to_dict()
        self._logger.debug(
            "Sending task %s: %s", candidate, json.dumps(payload, indent=2)
        )
        self._celery_app.send_task(
            "do_work",
            queue=candidate.package,
            kwargs={"job_dict": payload},
        )

Job

Job(execution_chain: ExecutionChain, job_metadata: Dict[str, Any], job_data: Dict[str, Any] | None = None)

Represents a job in an execution chain.

This class wraps the job's execution logic, data, and metadata. It supports serialization to and from a dictionary for persistence.

Parameters:

  • execution_chain

    (ExecutionChain) –

    The job's execution steps or dependencies.

  • job_metadata

    (Dict[str, Any]) –

    Metadata about the job such as id, predecessor, and state.

  • job_data

    (Dict[str, Any] | None, default: None ) –

    The data associated with the job.

Methods:

  • from_dict

    Creates a Job instance from a dictionary.

  • get_next_worker_args

    Gets the argument dictionaries for the next step(s) in sequence.

  • to_dict

    Converts the Job instance into a dictionary.

Attributes:

  • package (Optional[str]) –

    Retrieves the package name for the current step.

  • worker_args (Dict[str, Any]) –

    Retrieves the argument dictionary for the current step.

Source code in venv/lib/python3.12/site-packages/sighthouse/pipeline/worker.py
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
def __init__(
    self,
    execution_chain: ExecutionChain,
    job_metadata: Dict[str, Any],
    job_data: Dict[str, Any] | None = None,
) -> None:
    """Build a Job from its chain, metadata, and optional payload.

    Args:
        execution_chain (ExecutionChain): The job's execution steps or dependencies.
        job_metadata (Dict[str, Any]): Metadata about the job such as id,
                                       predecessor, and state.
        job_data (Dict[str, Any] | None): The data associated with the job.
    """
    self.execution_chain = execution_chain
    self.job_metadata = job_metadata
    # Falsy payloads (None or {}) are normalized to a fresh empty dict.
    self.job_data = job_data or {}
    # Routing hint consumed by send_task on the next hop; internal only,
    # deliberately excluded from to_dict().
    self._next_from: str | None = None

package property

package: Optional[str]

Retrieves the package name for the current step.

Returns:

  • Optional[str] –

    The package name of the module responsible for the current step,
    or None if unavailable.

worker_args property

worker_args: Dict[str, Any]

Retrieves the argument dictionary for the current step.

Returns:

  • Dict[str, Any]

    Dict[str, Any]: The arguments associated with the current step in the execution chain.

from_dict classmethod

from_dict(data: Dict[str, Any]) -> Job

Creates a Job instance from a dictionary.

Parameters:

  • data

    (Dict[str, Any]) –

    The dictionary containing job data, structured as: { "execution_chain": {...}, "job_data": {...}, "job_metadata": {...} }

Returns:

  • Job ( Job ) –

    A new Job instance created from the given dictionary.

Source code in venv/lib/python3.12/site-packages/sighthouse/pipeline/worker.py
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "Job":
    """Creates a Job instance from a dictionary.

    Args:
        data (Dict[str, Any]): The dictionary containing job data, structured as:
            {
                "execution_chain": {...},
                "job_data": {...},
                "job_metadata": {...}
            }

    Returns:
        Job: A new Job instance created from the given dictionary.
    """
    # Note: "job_metadata" is required — a missing key raises KeyError,
    # whereas the chain and data sections fall back to empty defaults.
    return cls(
        ExecutionChain.from_dict(data.get("execution_chain", {})),
        data["job_metadata"],
        data.get("job_data", {}),
    )

get_next_worker_args

get_next_worker_args() -> List[Tuple[str, Dict[str, Any]]]

Gets the argument dictionaries for the next step(s) in sequence.

Returns:

  • List[Tuple[str, Dict[str, Any]]]

    List[Tuple[str, Dict[str, Any]]]: A list of tuples containing each next step label

  • List[Tuple[str, Dict[str, Any]]]

    and its corresponding argument dictionary.

Source code in venv/lib/python3.12/site-packages/sighthouse/pipeline/worker.py
295
296
297
298
299
300
301
302
def get_next_worker_args(self) -> List[Tuple[str, Dict[str, Any]]]:
    """Gets the argument dictionaries for the next step(s) in sequence.

    Returns:
        List[Tuple[str, Dict[str, Any]]]: A list of tuples containing each next step label
        and its corresponding argument dictionary.
    """
    # Pure delegation: step sequencing is owned by the execution chain.
    chain = self.execution_chain
    return chain.get_next_worker_args()

to_dict

to_dict() -> Dict[str, Any]

Converts the Job instance into a dictionary.

Returns:

  • Dict[str, Any]

    Dict[str, Any]: The dictionary representation of the Job.

Source code in venv/lib/python3.12/site-packages/sighthouse/pipeline/worker.py
264
265
266
267
268
269
270
271
272
273
274
def to_dict(self) -> Dict[str, Any]:
    """Serialize the Job into a plain dictionary.

    Returns:
        Dict[str, Any]: The dictionary representation of the Job.
    """
    # The chain serializes itself; data and metadata are passed through
    # by reference (no copy is made here).
    serialized = {
        "execution_chain": self.execution_chain.to_dict(),
        "job_data": self.job_data,
        "job_metadata": self.job_metadata,
    }
    return serialized

ExecutionStep

ExecutionStep(package: str, args: Dict[str, Any], step: str)

Represent a step in the execution chain of a Job

Parameters:

  • package

    (str) –

    The name of the package containing the step.

  • args

    (Dict[str, Any]) –

    The arguments required for executing the step.

  • step

    (str) –

    The name of the step.

Methods:

  • to_dict

    Converts the execution step to a dictionary representation.

Source code in venv/lib/python3.12/site-packages/sighthouse/pipeline/worker.py
27
28
29
30
31
32
33
34
35
36
37
def __init__(self, package: str, args: Dict[str, Any], step: str):
    """Create an execution step.

    Args:
        package (str): The name of the package containing the step.
        args (Dict[str, Any]): The arguments required for executing the step.
        step (str): The name of the step.
    """
    # Plain attribute assignment; no validation is performed here.
    self.step = step
    self.package = package
    self.args = args

to_dict

to_dict() -> Dict[str, Any]

Converts the execution step to a dictionary representation.

Returns:

  • Dict[str, Any]

    Dict[str, Any]: A dictionary containing the step's package, arguments, and name.

Source code in venv/lib/python3.12/site-packages/sighthouse/pipeline/worker.py
39
40
41
42
43
44
45
def to_dict(self) -> Dict[str, Any]:
    """Serialize this execution step into a plain dictionary.

    Returns:
        Dict[str, Any]: A dictionary containing the step's package, arguments, and name.
    """
    return {
        "package": self.package,
        "args": self.args,
        "step": self.step,
    }

ExecutionChain

ExecutionChain(execution_steps: List[ExecutionStep], current_step: Optional[str] = None)

Represents an ordered list of execution steps for a Job.

The class manages a sequential chain of ExecutionStep instances that define the workflow of a job. It supports navigation between steps, retrieval of step arguments, and advancement to the next logical set of steps.

Parameters:

  • execution_steps

    (List[ExecutionStep]) –

    The sequence of steps to execute.

  • current_step

    (Optional[str], default: None ) –

    The label of the current step. Defaults to "1" if not provided.

Methods:

  • advance_to_next_step

    Advance to the next major step and return ALL its substeps.

  • from_dict

    Creates an ExecutionChain instance from a dictionary representation.

  • get_next_worker_args

    Retrieves arguments for the next main step(s) in the sequence.

  • get_step

    Retrieves the specified execution step.

  • to_dict

    Converts the execution chain to a dictionary representation.

Attributes:

  • package (Optional[str]) –

    Returns the package name associated with the current step.

  • worker_args (Dict[str, Any]) –

    Returns the argument dictionary for the current step.

Source code in venv/lib/python3.12/site-packages/sighthouse/pipeline/worker.py
69
70
71
72
73
74
75
76
77
78
79
80
def __init__(
    self, execution_steps: List[ExecutionStep], current_step: Optional[str] = None
):
    """Build an execution chain from an ordered list of steps.

    Args:
        execution_steps (List[ExecutionStep]): The sequence of steps to execute.
        current_step (Optional[str]): The label of the current step. Defaults to "1"
            if not provided.
    """
    self.execution_steps = execution_steps
    # Any falsy label (None or "") falls back to the class-level default.
    if current_step:
        self.current_step = current_step
    else:
        self.current_step = self.DEFAULT_STEP

package property

package: Optional[str]

Returns the package name associated with the current step.

Returns:

  • Optional[str]

    Optional[str]: The package name, or None if the current step does not exist.

worker_args property

worker_args: Dict[str, Any]

Returns the argument dictionary for the current step.

Returns:

  • Dict[str, Any]

    Dict[str, Any]: The arguments for the current step, or an empty dict if not found.

advance_to_next_step

advance_to_next_step() -> Optional[List[ExecutionStep]]

Advance to the next major step and return ALL its substeps.

  • Moves from current position (e.g. "3.2") to next major step (e.g. "4")
  • Sets current_step to first substep of that major step
  • Returns all substeps for that major step as a batch

Returns:

  • Optional[List[ExecutionStep]]

    List[ExecutionStep]: All substeps of next major step, or None if complete.

Source code in venv/lib/python3.12/site-packages/sighthouse/pipeline/worker.py
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
def advance_to_next_step(self) -> Optional[List["ExecutionStep"]]:
    """
    Advance to the next major step and return ALL its substeps.

    - Moves from current position (e.g. "3.2") to next major step (e.g. "4")
    - Sets `current_step` to first substep of that major step
    - Returns all substeps for that major step as a batch

    Returns:
        List[ExecutionStep]: All substeps of next major step, or None if complete.
    """
    # Inline grouping: {main_num: [steps]}
    groups: Dict[int, List["ExecutionStep"]] = {}
    for step in self.execution_steps:
        try:
            main_num = int(step.step.split(".", 1)[0])
            groups.setdefault(main_num, []).append(step)
        except ValueError:
            # Skip malformed step labels (non-numeric major part).
            continue

    try:
        current_main = int(self.current_step.split(".", 1)[0])
    except ValueError:
        # Malformed current label: treat the chain as complete instead of
        # raising, mirroring get_next_worker_args which returns [].
        return None

    # Pick the SMALLEST major number greater than the current one, so the
    # result does not depend on the ordering of execution_steps. (The
    # previous implementation took the first group in insertion order,
    # which could skip a step in an unsorted chain.)
    next_main = min((n for n in groups if n > current_main), default=None)
    if next_main is None:
        return None

    next_steps = groups[next_main]
    self.current_step = min(step.step for step in next_steps)
    return [deepcopy(step) for step in next_steps]

from_dict classmethod

from_dict(data: Dict[str, Any]) -> ExecutionChain

Creates an ExecutionChain instance from a dictionary representation.

Parameters:

  • data

    (Dict[str, Any]) –

    A dictionary containing step data and current step label. Expected format: { "execution_steps": [ {"package": str, "args": Dict[str, Any], "step": str}, ... ], "current_step": str }

Returns:

  • ExecutionChain ( ExecutionChain ) –

    A new instance built from the provided dictionary.

Source code in venv/lib/python3.12/site-packages/sighthouse/pipeline/worker.py
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "ExecutionChain":
    """Creates an ExecutionChain instance from a dictionary representation.

    Args:
        data (Dict[str, Any]): A dictionary containing step data and current step label.
            Expected format:
            {
                "execution_steps": [
                    {"package": str, "args": Dict[str, Any], "step": str},
                    ...
                ],
                "current_step": str
            }

    Returns:
        ExecutionChain: A new instance built from the provided dictionary.
    """
    # "package" and "step" are mandatory per entry; "args" may be omitted.
    steps: List[ExecutionStep] = []
    for entry in data.get("execution_steps", []):
        steps.append(
            ExecutionStep(
                package=entry["package"],
                args=entry.get("args", {}),
                step=entry["step"],
            )
        )
    return cls(execution_steps=steps, current_step=data.get("current_step"))

get_next_worker_args

get_next_worker_args() -> List[Tuple[str, Dict[str, Any]]]

Retrieves arguments for the next main step(s) in the sequence.

The method identifies the next major step (based on numeric prefixes) and collects the arguments for all substeps within that next step.

Returns:

  • List[Tuple[str, Dict[str, Any]]] –

    A list of tuples where each tuple contains the step identifier and a
    copy of its argument dictionary. Returns an empty list if there are
    no subsequent steps.

Source code in venv/lib/python3.12/site-packages/sighthouse/pipeline/worker.py
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
def get_next_worker_args(self) -> List[Tuple[str, Dict[str, Any]]]:
    """Retrieves arguments for the next main step(s) in the sequence.

    The method identifies the next major step (based on numeric prefixes)
    and collects the arguments for all substeps within that next step.

    Returns:
        List[Tuple[str, Dict[str, Any]]]: A list of tuples where each tuple contains the
        step identifier and a copy of its argument dictionary. Returns an empty list if there
        are no subsequent steps.
    """
    try:
        current_main = int(self.current_step.split(".", 1)[0])
    except ValueError:
        # Malformed current label: nothing to schedule.
        return []

    # Determine the smallest major number beyond the current one. This
    # tolerates gaps in numbering (e.g. 1, 2, 4), matching the behavior
    # of advance_to_next_step; the previous implementation only looked
    # at exactly current_main + 1 and returned [] on a gap.
    mains: List[int] = []
    for step in self.execution_steps:
        try:
            mains.append(int(step.step.split(".", 1)[0]))
        except ValueError:
            # Skip malformed step labels.
            continue
    next_main = min((n for n in mains if n > current_main), default=None)
    if next_main is None:
        return []

    args: List[Tuple[str, Dict[str, Any]]] = []
    for step in self.execution_steps:
        try:
            main_index = int(step.step.split(".", 1)[0])
        except ValueError:
            continue
        if main_index == next_main:
            # Copy args so callers cannot mutate the chain's stored steps.
            args.append((step.step, deepcopy(step.args or {})))

    return args

get_step

get_step(step: str) -> Optional[ExecutionStep]

Retrieves the specified execution step.

Parameters:

  • step

    (str) –

    The identifier of the step to retrieve.

Returns:

  • Optional[ExecutionStep]

    Optional[ExecutionStep]: The matching execution step, or None if not found.

Source code in venv/lib/python3.12/site-packages/sighthouse/pipeline/worker.py
119
120
121
122
123
124
125
126
127
128
129
130
131
def get_step(self, step: str) -> Optional[ExecutionStep]:
    """Retrieves the specified execution step.

    Args:
        step (str): The identifier of the step to retrieve.

    Returns:
        Optional[ExecutionStep]: The matching execution step, or None if not found.
    """
    # Linear scan; chains are short, so no index structure is needed.
    return next(
        (candidate for candidate in self.execution_steps if candidate.step == step),
        None,
    )

to_dict

to_dict() -> Dict[str, Any]

Converts the execution chain to a dictionary representation.

Returns:

  • Dict[str, Any]

    Dict[str, Any]: A dictionary describing all steps and the current step.

Source code in venv/lib/python3.12/site-packages/sighthouse/pipeline/worker.py
108
109
110
111
112
113
114
115
116
117
def to_dict(self) -> Dict[str, Any]:
    """Serialize the chain into a plain dictionary.

    Returns:
        Dict[str, Any]: A dictionary describing all steps and the current step.
    """
    # Each step serializes itself; order of execution_steps is preserved.
    serialized_steps = [s.to_dict() for s in self.execution_steps]
    return {"execution_steps": serialized_steps, "current_step": self.current_step}