How to use the b2luigi.core.settings.get_setting function in b2luigi

To help you get started, we’ve selected a few b2luigi examples, based on popular ways it is used in public projects.

Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.

github nils-braun / b2luigi / b2luigi / batch / processes / gbasf2.py View on Github external
def get_gbasf2_env(gbasf2_install_directory=None):
    """
    Return the gbasf2 environment dict which can be used to run gbasf2 commands.

    The gbasf2 setup script is sourced in a fresh ``bash`` started with an
    empty environment (``env -i``) and the environment that results from that
    is captured by running ``env`` afterwards.

    :param gbasf2_install_directory: Directory into which gbasf2 has been
        installed.  When set to the default value ``None``, it looks for the
        value of the ``gbasf2_install_directory`` setting and when that is not
        set, it uses the default of most installation instructions, which is
        ``~/gbasf2KEK``.  A leading ``~`` is expanded to the home directory.
    :return: Dictionary containing the  environment that you get from sourcing the gbasf2 setup script.
    :raises FileNotFoundError: If the setup script does not exist below the
        install directory.
    :raises subprocess.CalledProcessError: If sourcing the setup script or
        running ``env`` exits with a non-zero status.
    """
    if gbasf2_install_directory is None:
        gbasf2_install_directory = get_setting("gbasf2_install_directory", default="~/gbasf2KEK")
    # Expand ``~`` once, so that the existence check below and the shell
    # command use exactly the same path.
    gbasf2_setup_path = os.path.expanduser(
        os.path.join(gbasf2_install_directory, "BelleDIRAC/gbasf2/tools/setup")
    )
    if not os.path.isfile(gbasf2_setup_path):
        raise FileNotFoundError(
            f"Could not find gbasf2 setup files in ``{gbasf2_install_directory}``.\n" +
            "Make sure to that gbasf2 is installed at that location."
        )
    # complete bash command to set up the gbasf2 environment
    # piping output to /dev/null, because we want that our final script only prints the ``env`` output
    # The path is shell-quoted so that directories containing spaces, quotes
    # or other shell metacharacters are passed to ``source`` unchanged.
    gbasf2_setup_command_str = f"source {shlex.quote(gbasf2_setup_path)} > /dev/null && env"
    # command to execute the gbasf2 setup command in a fresh shell and output the produced environment
    # Built as an argument list, so no second round of quoting/splitting is needed.
    echo_gbasf2_env_command = ["env", "-i", "bash", "-c", gbasf2_setup_command_str]
    gbasf2_env_string = subprocess.run(
        echo_gbasf2_env_command, check=True, stdout=subprocess.PIPE, encoding="utf-8"
    ).stdout

    gbasf2_env = {}
    last_name = None
    for line in gbasf2_env_string.splitlines():
        name, separator, value = line.partition("=")
        if separator:
            gbasf2_env[name] = value
            last_name = name
        elif last_name is not None:
            # ``env`` prints ``NAME=VALUE``; a line without ``=`` can only be
            # the continuation of a multi-line value of the previous variable.
            # NOTE: continuation lines that themselves contain ``=`` can not
            # be told apart from new variables in this output format.
            gbasf2_env[last_name] += "\n" + line
    return gbasf2_env
github nils-braun / b2luigi / b2luigi / batch / processes / gbasf2.py View on Github external
if gbasf2_input_dataset is not False:
            gbasf2_command_str += f" -i {gbasf2_input_dataset} "

        gbasf2_n_repition_jobs = get_setting("gbasf2_n_repition_job", default=False, task=self.task)
        if gbasf2_n_repition_jobs is not False:
            gbasf2_command_str += f" --repetition {gbasf2_n_repition_jobs} "

        # now add some additional optional options to the gbasf2 job submission string

        # whether to ask user for confirmation before submitting job
        force_submission = get_setting("gbasf2_force_submission", default=True, task=self.task)
        if force_submission:
            gbasf2_command_str += " --force "

        # estimated cpu time per sub-job in minutes
        cpu_minutes = get_setting("gbasf2_cputime", default=False, task=self.task)
        if cpu_minutes is not False:
            gbasf2_command_str += f" --cputime {cpu_minutes} "

        # estimated number of processed events per second
        evtpersec = get_setting("gbasf2_evtpersec", default=False, task=self.task)
        if evtpersec is not False:
            gbasf2_command_str += f" --evtpersec {evtpersec} "

        # gbasf2 job priority
        priority = get_setting("gbasf2_priority", default=False, task=self.task)
        if priority is not False:
            assert 0 <= priority <= 10, "Priority should be integer between 0 and 10."
            gbasf2_command_str += f" --priority {priority} "

        # gbasf2 job type (e.g. User, Production, ...)
        jobtype = get_setting("gbasf2_jobtype", default=False, task=self.task)
github nils-braun / b2luigi / b2luigi / batch / processes / gbasf2.py View on Github external
force_submission = get_setting("gbasf2_force_submission", default=True, task=self.task)
        if force_submission:
            gbasf2_command_str += " --force "

        # estimated cpu time per sub-job in minutes
        cpu_minutes = get_setting("gbasf2_cputime", default=False, task=self.task)
        if cpu_minutes is not False:
            gbasf2_command_str += f" --cputime {cpu_minutes} "

        # estimated number of processed events per second
        evtpersec = get_setting("gbasf2_evtpersec", default=False, task=self.task)
        if evtpersec is not False:
            gbasf2_command_str += f" --evtpersec {evtpersec} "

        # gbasf2 job priority
        priority = get_setting("gbasf2_priority", default=False, task=self.task)
        if priority is not False:
            assert 0 <= priority <= 10, "Priority should be integer between 0 and 10."
            gbasf2_command_str += f" --priority {priority} "

        # gbasf2 job type (e.g. User, Production, ...)
        jobtype = get_setting("gbasf2_jobtype", default=False, task=self.task)
        if jobtype is not False:
            gbasf2_command_str += f" --jobtype {jobtype} "

        # additional basf2 options to use on grid
        basf2opt = get_setting("gbasf2_basf2opt", default=False, task=self.task)
        if basf2opt is not False:
            gbasf2_command_str += f" --basf2opt='{basf2opt}' "

        # optional string of additional parameters to append to gbasf2 command
        gbasf2_additional_params = get_setting("gbasf2_additional_params", default=False, task=self.task)
github nils-braun / b2luigi / b2luigi / batch / processes / htcondor.py View on Github external
stdout_log_file = os.path.abspath(os.path.join(log_file_dir, "stdout"))
        submit_file_content.append(f"output = {stdout_log_file}")

        stderr_log_file = os.path.abspath(os.path.join(log_file_dir, "stderr"))
        submit_file_content.append(f"error = {stderr_log_file}")

        job_log_file = os.path.abspath(os.path.join(log_file_dir, "job.log"))
        submit_file_content.append(f"log = {job_log_file}")

        # Specify the executable
        executable_file = create_executable_wrapper(self.task)
        submit_file_content.append(f"executable = {os.path.basename(executable_file)}")

        # Specify additional settings
        general_settings = get_setting("htcondor_settings", dict())
        try:
            general_settings.update(self.task.htcondor_settings)
        except AttributeError:
            pass

        transfer_files = get_setting("transfer_files", task=self.task, default=[])
        if transfer_files:            
            working_dir = get_setting("working_dir", task=self.task, default="")
            if not working_dir or working_dir != ".":
                raise ValueError("If using transfer_files, the working_dir must be explicitely set to '.'")

            general_settings.setdefault("should_transfer_files", "YES")
            general_settings.setdefault("when_to_transfer_output", "ON_EXIT")
            
            transfer_files = set(transfer_files)
github nils-braun / b2luigi / b2luigi / core / executable.py View on Github external
def create_executable_wrapper(task):
    """
    To incorporate all settings (environment, working paths, remote or locally)
    we create an executable bash script which is called instead of the application
    and which will setup everything accordingly before doing the actual work.
    """
    shell = get_setting("shell", task=task, default="bash")
    executable_wrapper_content = [f"#!/bin/{shell}", "set -e"]

    # 1. First part is the folder we need to change if given
    working_dir = get_setting("working_dir", task=task, default=os.path.abspath(os.path.dirname(get_filename())))
    executable_wrapper_content.append(f"cd {working_dir}")

    executable_wrapper_content.append("echo 'Working in the folder:'; pwd")

    # 2. Second part of the executable wrapper, the environment.
    executable_wrapper_content.append("echo 'Setting up the environment'")
    # (a) If given, use the environment script
    env_setup_script = get_setting("env_script", task=task, default="")
    if env_setup_script:
        # The script will be called from the directory of the script. So we have to make sure the
        # env_script is reachable from there (not from where we are currently)
        if not os.path.isfile(map_folder(env_setup_script)):
            raise FileNotFoundError(f"Environment setup script {env_setup_script} does not exist.")
        executable_wrapper_content.append(f"source {env_setup_script}")

    # (b) Now override with any environment from the task or settings
github nils-braun / b2luigi / b2luigi / batch / processes / gbasf2.py View on Github external
def _build_gbasf2_submit_command(self):
        """
        Function to create the gbasf2 submit command to pass to run_with_gbasf2
        from the task options and attributes.
        """
        gbasf2_release = get_setting("gbasf2_release", default=get_basf2_git_hash(), task=self.task)
        gbasf2_additional_files = get_setting("gbasf2_additional_files", default=[], task=self.task)
        assert not isinstance(gbasf2_additional_files, str), "gbasf2_additional_files should be a list or tuple, not a string."
        gbasf2_input_sandbox_files = [os.path.basename(self.pickle_file_path)] + gbasf2_additional_files
        gbasf2_command_str = (f"gbasf2 {self.wrapper_file_path} -f {' '.join(gbasf2_input_sandbox_files)} " +
                              f"-p {self.gbasf2_project_name} -s {gbasf2_release} ")

        gbasf2_input_dataset = get_setting("gbasf2_input_dataset", default=False, task=self.task)
        if gbasf2_input_dataset is not False:
            gbasf2_command_str += f" -i {gbasf2_input_dataset} "

        gbasf2_n_repition_jobs = get_setting("gbasf2_n_repition_job", default=False, task=self.task)
        if gbasf2_n_repition_jobs is not False:
            gbasf2_command_str += f" --repetition {gbasf2_n_repition_jobs} "

        # now add some additional optional options to the gbasf2 job submission string

        # whether to ask user for confirmation before submitting job
github nils-braun / b2luigi / b2luigi / batch / processes / htcondor.py View on Github external
job_log_file = os.path.abspath(os.path.join(log_file_dir, "job.log"))
        submit_file_content.append(f"log = {job_log_file}")

        # Specify the executable
        executable_file = create_executable_wrapper(self.task)
        submit_file_content.append(f"executable = {os.path.basename(executable_file)}")

        # Specify additional settings
        general_settings = get_setting("htcondor_settings", dict())
        try:
            general_settings.update(self.task.htcondor_settings)
        except AttributeError:
            pass

        transfer_files = get_setting("transfer_files", task=self.task, default=[])
        if transfer_files:            
            working_dir = get_setting("working_dir", task=self.task, default="")
            if not working_dir or working_dir != ".":
                raise ValueError("If using transfer_files, the working_dir must be explicitely set to '.'")

            general_settings.setdefault("should_transfer_files", "YES")
            general_settings.setdefault("when_to_transfer_output", "ON_EXIT")
            
            transfer_files = set(transfer_files)

            for transfer_file in transfer_files:
                if os.path.abspath(transfer_file) != transfer_file:
                    raise ValueError(f"You should only give absolute file names in transfer_files! {os.path.abspath(transfer_file)} != {transfer_file}")

            env_setup_script = get_setting("env_script", task=self.task, default="")
            if env_setup_script:
github nils-braun / b2luigi / b2luigi / batch / processes / gbasf2.py View on Github external
def _build_gbasf2_submit_command(self):
        """
        Function to create the gbasf2 submit command to pass to run_with_gbasf2
        from the task options and attributes.
        """
        gbasf2_release = get_setting("gbasf2_release", default=get_basf2_git_hash(), task=self.task)
        gbasf2_additional_files = get_setting("gbasf2_additional_files", default=[], task=self.task)
        assert not isinstance(gbasf2_additional_files, str), "gbasf2_additional_files should be a list or tuple, not a string."
        gbasf2_input_sandbox_files = [os.path.basename(self.pickle_file_path)] + gbasf2_additional_files
        gbasf2_command_str = (f"gbasf2 {self.wrapper_file_path} -f {' '.join(gbasf2_input_sandbox_files)} " +
                              f"-p {self.gbasf2_project_name} -s {gbasf2_release} ")

        gbasf2_input_dataset = get_setting("gbasf2_input_dataset", default=False, task=self.task)
        if gbasf2_input_dataset is not False:
            gbasf2_command_str += f" -i {gbasf2_input_dataset} "

        gbasf2_n_repition_jobs = get_setting("gbasf2_n_repition_job", default=False, task=self.task)
        if gbasf2_n_repition_jobs is not False:
            gbasf2_command_str += f" --repetition {gbasf2_n_repition_jobs} "

        # now add some additional optional options to the gbasf2 job submission string

        # whether to ask user for confirmation before submitting job
        force_submission = get_setting("gbasf2_force_submission", default=True, task=self.task)
        if force_submission:
            gbasf2_command_str += " --force "

        # estimated cpu time per sub-job in minutes
        cpu_minutes = get_setting("gbasf2_cputime", default=False, task=self.task)