HEX
Server: Apache/2.4.52 (Ubuntu)
System: Linux spn-python 5.15.0-89-generic #99-Ubuntu SMP Mon Oct 30 20:42:41 UTC 2023 x86_64
User: arjun (1000)
PHP: 8.1.2-1ubuntu2.20
Disabled: NONE
Upload Files
File: //snap/certbot/current/lib64/python3.12/site-packages/certbot/_internal/hooks.py
"""Facilities for implementing hooks that call shell commands."""

import logging
from typing import Optional

from certbot import configuration
from certbot import errors
from certbot import util
from certbot._internal import san
from certbot.compat import filesystem
from certbot.compat import misc
from certbot.compat import os
from certbot.display import ops as display_ops
from certbot.plugins import util as plug_util

logger = logging.getLogger(__name__)


def validate_hooks(config: configuration.NamespaceConfig) -> None:
    """Validate every hook command set in the configuration.

    The pre, post, deploy and renew hooks are passed, in that order, to
    :func:`validate_hook` together with their short name.

    :param configuration.NamespaceConfig config: Certbot settings

    :raises .errors.HookCommandNotFound: propagated from
        :func:`validate_hook` for the first hook that fails validation

    """
    for hook_name in ("pre", "post", "deploy", "renew"):
        # The attribute is looked up right before validation so that the
        # hooks are read and checked strictly one after another.
        validate_hook(getattr(config, f"{hook_name}_hook"), hook_name)


def _prog(shell_cmd: str) -> Optional[str]:
    """Return the name of the program a shell command would run.

    :param str shell_cmd: command to be executed

    :returns: basename of the command, or None if no executable for it
        could be located
    :rtype: str or None

    """
    found = util.exe_exists(shell_cmd)
    if not found:
        # Give path_surgery a chance to make the command discoverable, then
        # look exactly one more time.
        plug_util.path_surgery(shell_cmd)
        found = util.exe_exists(shell_cmd)

    return os.path.basename(shell_cmd) if found else None


def validate_hook(shell_cmd: str, hook_name: str) -> None:
    """Check that a command provided as a hook is plausibly executable.

    Only the first whitespace-separated word of ``shell_cmd`` (the program to
    run) is checked; its arguments are ignored. An empty or ``None`` hook
    means "no hook configured" and is accepted silently.

    :param str shell_cmd: full shell command configured for the hook
    :param str hook_name: short name of the hook ("pre", "post", ...) used
        in the error message

    :raises .errors.HookCommandNotFound: if the command is not found, is not
        executable, or consists only of whitespace

    """
    if not shell_cmd:
        return

    words = shell_cmd.split(None, 1)
    if not words:
        # A whitespace-only value has no program to look up; report it as a
        # hook error instead of failing with an IndexError.
        raise errors.HookCommandNotFound(
            f"The {hook_name}-hook command is empty.\n"
            "See also the --disable-hook-validation option."
        )

    cmd = words[0]
    if _prog(cmd):
        return

    if os.path.exists(cmd):
        msg = f"{hook_name}-hook command {cmd} exists, but is not executable."
    else:
        # PATH is only needed for this message; tolerate it being unset so
        # the caller gets HookCommandNotFound rather than a KeyError.
        path = os.environ.get("PATH", "")
        msg = (
            f"Unable to find {hook_name}-hook command {cmd} in the PATH.\n(PATH is "
            f"{path})\nSee also the --disable-hook-validation option."
        )

    raise errors.HookCommandNotFound(msg)


def pre_hook(config: configuration.NamespaceConfig) -> None:
    """Run the configured pre-hooks that have not been run yet.

    When directory hooks are enabled, every hook listed from
    config.renewal_pre_hooks_dir is run first, followed by the pre-hook
    from the config (if any). A command that has already been executed as
    a pre-hook is skipped, so a config pre-hook that points at one of the
    directory scripts is not run twice.

    :param configuration.NamespaceConfig config: Certbot settings

    """
    hooks_to_run: list[str] = []
    if config.directory_hooks:
        hooks_to_run.extend(list_hooks(config.renewal_pre_hooks_dir))
    if config.pre_hook:
        hooks_to_run.append(config.pre_hook)

    for command in hooks_to_run:
        _run_pre_hook_if_necessary(command)


# Pre-hook commands that have already been run. Consulted and updated by
# _run_pre_hook_if_necessary so the exact same command is never run twice.
executed_pre_hooks: set[str] = set()


def _run_pre_hook_if_necessary(command: str) -> None:
    """Run a pre-hook unless this exact command was run before.

    Commands that were already executed are not run again; an informational
    message is logged instead. Newly executed commands are remembered in
    ``executed_pre_hooks``.

    :param str command: pre-hook to be run

    """
    if command in executed_pre_hooks:
        logger.info("Pre-hook command already run, skipping: %s", command)
        return

    _run_hook("pre-hook", command)
    # Recorded only after the hook has been invoked, so a hook whose
    # invocation raises is not marked as done.
    executed_pre_hooks.add(command)


def post_hook(
    config: configuration.NamespaceConfig,
    renewed_sans: list[san.SAN]
) -> None:
    """Run post-hooks, or schedule them when renewing.

    The hooks considered are the executables listed from
    config.renewal_post_hooks_dir (only when directory hooks are enabled)
    followed by the post-hook from the config, if one is set.

    With the renew verb nothing is executed here: each hook is registered
    through :func:`_run_eventually` and runs later, when
    :func:`run_saved_post_hooks` is called. Registration ignores commands
    that are already scheduled, so a config post-hook that is also a script
    in the post-hook directory is not scheduled twice.

    For any other verb the hooks run immediately with RENEWED_DOMAINS set to
    the space-separated ``renewed_sans`` (cut to 32,000 characters) and an
    empty FAILED_DOMAINS.

    :param configuration.NamespaceConfig config: Certbot settings
    :param renewed_sans: names exported to the hooks as RENEWED_DOMAINS
    :type renewed_sans: `list` of `san.SAN`

    """
    hooks_to_run: list[str] = []
    if config.directory_hooks:
        hooks_to_run.extend(list_hooks(config.renewal_post_hooks_dir))
    if config.post_hook:
        hooks_to_run.append(config.post_hook)

    # In the "renew" case, we save these up to run at the end
    if config.verb == "renew":
        for command in hooks_to_run:
            _run_eventually(command)
        return

    # certonly / run
    domains = ' '.join(str(name) for name in renewed_sans)
    # 32k is reasonable on Windows and likely quite conservative on other platforms
    if len(domains) > 32_000:
        logger.warning("Limiting RENEWED_DOMAINS environment variable to 32k characters")
        domains = domains[:32_000]

    hook_env = {
        'RENEWED_DOMAINS': domains,
        # Since other commands stop certbot execution on failure,
        # it doesn't make sense to have a FAILED_DOMAINS variable
        'FAILED_DOMAINS': "",
    }
    for command in hooks_to_run:
        _run_hook("post-hook", command, hook_env)


# Post-hook commands registered by _run_eventually, kept in registration
# order and without duplicates; run_saved_post_hooks executes them.
post_hooks: list[str] = []


def _run_eventually(command: str) -> None:
    """Schedule a post-hook for later execution.

    Commands are appended to ``post_hooks`` in the order they are first
    seen; registering a command that is already scheduled is a no-op. The
    scheduled commands run when :func:`run_saved_post_hooks` is called.

    :param str command: post-hook to register to be run

    """
    if command in post_hooks:
        return
    post_hooks.append(command)


def run_saved_post_hooks(renewed_sans: list[san.SAN], failed_sans: list[san.SAN]) -> None:
    """Run any post hooks that were saved up in the course of the 'renew' verb.

    Every command in ``post_hooks`` is run with the RENEWED_DOMAINS and
    FAILED_DOMAINS environment variables set to the space-separated
    ``renewed_sans`` and ``failed_sans`` respectively. Each value is cut to
    16,000 characters, with a warning logged when that happens.

    :param renewed_sans: names exported to the hooks as RENEWED_DOMAINS
    :type renewed_sans: `list` of `san.SAN`
    :param failed_sans: names exported to the hooks as FAILED_DOMAINS
    :type failed_sans: `list` of `san.SAN`

    """
    renewed_sans_str = ' '.join(map(str, renewed_sans))
    failed_sans_str = ' '.join(map(str, failed_sans))

    # 32k combined is reasonable on Windows and likely quite conservative on other platforms
    if len(renewed_sans_str) > 16_000:
        logger.warning("Limiting RENEWED_DOMAINS environment variable to 16k characters")
        renewed_sans_str = renewed_sans_str[:16_000]

    if len(failed_sans_str) > 16_000:
        logger.warning("Limiting FAILED_DOMAINS environment variable to 16k characters")
        # Truncate the failed list itself; assigning the result to
        # renewed_sans_str would overwrite RENEWED_DOMAINS with failed names
        # and leave FAILED_DOMAINS over the limit.
        failed_sans_str = failed_sans_str[:16_000]

    hook_env = {
        'RENEWED_DOMAINS': renewed_sans_str,
        'FAILED_DOMAINS': failed_sans_str,
    }
    for cmd in post_hooks:
        _run_hook("post-hook", cmd, hook_env)


def deploy_hook(config: configuration.NamespaceConfig, sans: list[san.SAN],
                lineage_path: str) -> None:
    """Run the deploy-hook from the config, if one is set.

    The actual execution (including dry-run handling) is delegated to
    :func:`_run_deploy_hook`.

    :param configuration.NamespaceConfig config: Certbot settings
    :param sans: domains and/or IP addresses in the obtained certificate
    :type sans: `list` of `san.SAN`
    :param str lineage_path: live directory path for the new cert

    """
    command = config.deploy_hook
    if not command:
        return

    _run_deploy_hook(command, sans, lineage_path, config.dry_run, config.run_deploy_hooks)


def renew_hook(config: configuration.NamespaceConfig, sans: list[san.SAN],
               lineage_path: str) -> None:
    """Run post-renewal hooks.

    When directory hooks are enabled, the hooks listed from
    config.renewal_deploy_hooks_dir come first; the renew-hook from the
    config, if set, comes last. A command that was already run during this
    call is skipped with a log message, so a renew-hook that is also a
    script in config.renewal_deploy_hooks_dir is not run twice.

    Each hook is executed through :func:`_run_deploy_hook`, which skips it
    during a dry run unless config.run_deploy_hooks is set.

    :param configuration.NamespaceConfig config: Certbot settings
    :param sans: domains and/or IP addresses in the obtained certificate
    :type sans: `list` of `san.SAN`
    :param str lineage_path: live directory path for the new cert

    """
    hooks_to_run: list[str] = []
    if config.directory_hooks:
        hooks_to_run.extend(list_hooks(config.renewal_deploy_hooks_dir))
    if config.renew_hook:
        hooks_to_run.append(config.renew_hook)

    # Unlike pre-hooks, deduplication here is scoped to a single call.
    already_run: set[str] = set()
    for command in hooks_to_run:
        if command not in already_run:
            _run_deploy_hook(command, sans, lineage_path, config.dry_run,
                             config.run_deploy_hooks)
            already_run.add(command)
            continue
        logger.info("Skipping deploy-hook '%s' as it was already run.", command)


def _run_deploy_hook(command: str, sans: list[san.SAN], lineage_path: str, dry_run: bool,
                     run_deploy_hooks: bool) -> None:
    """Run a deploy-hook, honouring dry-run mode.

    The hook is skipped, and a message logged, only when ``dry_run`` is set
    and ``run_deploy_hooks`` is not. Otherwise RENEWED_DOMAINS and
    RENEWED_LINEAGE are written to ``os.environ`` and the command is run.

    :param str command: command to run as a deploy-hook
    :param sans: domains and/or IP addresses in the obtained certificate
    :type sans: `list` of `san.SAN`
    :param str lineage_path: live directory path for the new cert
    :param bool dry_run: True iff Certbot is doing a dry run
    :param bool run_deploy_hooks: True if deploy hooks should run despite Certbot doing a dry run

    """
    should_run = run_deploy_hooks or not dry_run
    if not should_run:
        logger.info("Dry run: skipping deploy hook command: %s", command)
        return

    # These are set on the process environment itself (not passed as extra
    # environment to _run_hook), so they stay set after the hook returns.
    renewed_domains = " ".join(str(name) for name in sans)
    os.environ["RENEWED_DOMAINS"] = renewed_domains
    os.environ["RENEWED_LINEAGE"] = lineage_path
    _run_hook("deploy-hook", command)


def _run_hook(cmd_name: str, shell_cmd: str, extra_env: Optional[dict[str, str]] = None) -> str:
    """Execute a hook command and report its outcome.

    The command runs with the environment returned by
    ``util.env_no_snap_for_external_calls()``, overlaid with ``extra_env``.
    Its return code and output are passed to
    ``display_ops.report_executed_command``.

    :param str cmd_name: the user facing name of the hook being run
    :param shell_cmd: shell command to execute
    :type shell_cmd: `list` of `str` or `str`
    :param dict extra_env: extra environment variables to set
    :type extra_env: `dict` of `str` to `str`

    :returns: stderr if there was any"""
    hook_env = util.env_no_snap_for_external_calls()
    if extra_env:
        hook_env.update(extra_env)

    status = misc.execute_command_status(cmd_name, shell_cmd, env=hook_env)
    # The status tuple is ordered (return code, stderr, stdout), while the
    # reporting helper takes stdout before stderr.
    returncode, err, out = status
    display_ops.report_executed_command(f"Hook '{cmd_name}'", returncode, out, err)
    return err


def list_hooks(dir_path: str) -> list[str]:
    """List paths to all hooks found in dir_path in sorted order.

    An entry counts as a hook when ``filesystem.is_executable`` accepts its
    path and the path does not end with ``~``.

    :param str dir_path: directory to search

    :returns: sorted list of paths (``dir_path`` joined with the entry name)
        to executables in dir_path
    :rtype: `list` of `str`

    """
    hooks: list[str] = []
    for entry in os.listdir(dir_path):
        candidate = os.path.join(dir_path, entry)
        if not filesystem.is_executable(candidate):
            continue
        # NOTE(review): a trailing "~" presumably marks editor backup
        # copies of hook scripts - confirm.
        if candidate.endswith('~'):
            continue
        hooks.append(candidate)

    hooks.sort()
    return hooks