Skip to content

hpcman.queue.util

Classes

SlurmJob

Source code in hpcman/hpcman/queue/util.py
@dataclass
class SlurmJob:
    """Accounting-side record of a Slurm job.

    NOTE: the original definition declared ``cluster``, ``comment``, ``wckey``,
    ``derived_exit_code`` and ``extra`` twice.  Re-annotating a dataclass field
    keeps its first insertion position, so the duplicates were pure noise; they
    are removed here without changing the field order or the generated
    ``__init__`` signature.
    """

    # Parameters
    job_id: int
    cluster: str
    # Other Parameters
    admin_comment: str
    comment: str
    wckey: str
    derived_exit_code: int
    extra: str
    # Attributes
    steps: Any
    stats: Any
    account: str
    num_nodes: int
    array_id: int
    array_tasks_parallel: int
    array_task_id: int
    array_tasks_waiting: str
    association_id: int
    block_id: str
    constraints: str
    container: str
    db_index: int
    derived_exit_code_signal: int
    elapsed_time: int
    eligible_time: int
    end_time: int
    exit_code: int
    exit_code_signal: int
    failed_node: str
    group_id: int
    group_name: str
    id: int
    name: str
    mcs_label: str
    nodelist: str
    partition: str
    priority: int
    qos: str
    cpus: int
    memory: int
    reservation: str
    script: str
    start_time: int
    state: str
    state_reason: str
    cancelled_by: str
    submit_time: int
    submit_command: str
    suspended_time: int
    system_comment: str
    time_limit: int
    user_id: int
    user_name: str
    working_directory: str

Attributes

Name Type Description
job_id int
cluster str
admin_comment str
comment str
wckey str
derived_exit_code int
extra str
steps Any
stats Any
account str
num_nodes int
array_id int
array_tasks_parallel int
array_task_id int
array_tasks_waiting str
association_id int
block_id str
constraints str
container str
db_index int
derived_exit_code_signal int
elapsed_time int
eligible_time int
end_time int
exit_code int
exit_code_signal int
failed_node str
group_id int
group_name str
id int
name str
mcs_label str
nodelist str
partition str
priority int
qos str
cpus int
memory int
reservation str
script str
start_time int
state str
state_reason str
cancelled_by str
submit_time int
submit_command str
suspended_time int
system_comment str
time_limit int
user_id int
user_name str
working_directory str

ActiveSlurmJob

Source code in hpcman/hpcman/queue/util.py
@dataclass
class ActiveSlurmJob:
    """Snapshot of a currently active (pending/running) Slurm job.

    Mirrors the fields reported by the Slurm controller for live jobs;
    compare with `SlurmJob`, which holds accounting-side records.
    Field order matters: it defines the generated ``__init__`` signature.
    """

    # Opaque step/statistics payloads and per-node process ids
    steps: Any
    stats: Any
    pids: dict[str, list]
    # Identification and ownership
    name: str
    id: int
    association_id: int
    account: str
    user_id: int
    user_name: str
    group_id: int
    group_name: str
    # Scheduling priority and state
    priority: int
    nice: int
    qos: str
    min_cpus_per_node: int
    state: str
    state_reason: str
    is_requeueable: bool
    requeue_count: int
    is_batch_job: bool
    node_reboot_required: bool
    dependencies: dict
    # Timing (limits and lifecycle timestamps)
    time_limit: int
    time_limit_min: int
    submit_time: int
    eligible_time: int
    accrue_time: int
    start_time: int
    resize_time: int
    deadline: int
    preempt_eligible_time: int
    preempt_time: int
    suspend_time: int
    last_sched_evaluation_time: int
    pre_suspension_time: int
    # Placement: hosts, partitions, and node lists
    mcs_label: str
    partition: str
    submit_host: str
    batch_host: str
    num_nodes: int
    max_nodes: int
    allocated_nodes: str
    required_nodes: str
    excluded_nodes: str
    scheduled_nodes: str
    # Exit status
    derived_exit_code: int
    derived_exit_code_signal: int
    exit_code: int
    exit_code_signal: int
    # Constraints and federation
    batch_constraints: list
    federation_origin: str
    federation_siblings_active: int
    federation_siblings_viable: int
    # CPU/task topology
    cpus: int
    cpus_per_task: int
    cpus_per_gpu: int
    boards_per_node: int
    sockets_per_board: int
    sockets_per_node: int
    cores_per_socket: int
    threads_per_core: int
    ntasks: int
    ntasks_per_node: int
    ntasks_per_board: int
    ntasks_per_socket: int
    ntasks_per_core: int
    ntasks_per_gpu: int
    delay_boot_time: int
    constraints: list
    cluster: str
    cluster_constraints: list
    reservation: str
    resource_sharing: str
    requires_contiguous_nodes: bool
    licenses: list
    network: str
    # Invocation context and comments
    command: str
    working_directory: str
    admin_comment: str
    system_comment: str
    container: str
    comment: str
    standard_input: str
    standard_output: str
    standard_error: str
    # Switches, burst buffer, CPU frequency, billing
    required_switches: int
    max_wait_time_switches: int
    burst_buffer: str
    burst_buffer_state: str
    cpu_frequency_min: Union[str, int]
    cpu_frequency_max: Union[str, int]
    cpu_frequency_governor: Union[str, int]
    billable_tres: float
    wckey: str
    mail_user: list
    mail_types: list
    heterogeneous_id: int
    heterogeneous_offset: int
    temporary_disk_per_node: int
    # Job arrays
    array_id: int
    array_tasks_parallel: int
    array_task_id: int
    array_tasks_waiting: str
    # Runtime accounting
    end_time: int
    run_time: int
    cores_reserved_for_system: int
    threads_reserved_for_system: int
    # Memory requests (totals and per-resource)
    memory: int
    memory_per_cpu: int
    memory_per_node: int
    memory_per_gpu: int
    # Generic resources (GRES) and misc flags
    gres_per_node: dict
    profile_types: list
    gres_binding: str
    kill_on_invalid_dependency: bool
    spreads_over_nodes: bool
    power_options: list
    is_cronjob: bool
    cronjob_time: str
    # Derived values
    elapsed_cpu_time: int
    run_time_remaining: int

Attributes

Name Type Description
steps Any
stats Any
pids dict[str, list]
name str
id int
association_id int
account str
user_id int
user_name str
group_id int
group_name str
priority int
nice int
qos str
min_cpus_per_node int
state str
state_reason str
is_requeueable bool
requeue_count int
is_batch_job bool
node_reboot_required bool
dependencies dict
time_limit int
time_limit_min int
submit_time int
eligible_time int
accrue_time int
start_time int
resize_time int
deadline int
preempt_eligible_time int
preempt_time int
suspend_time int
last_sched_evaluation_time int
pre_suspension_time int
mcs_label str
partition str
submit_host str
batch_host str
num_nodes int
max_nodes int
allocated_nodes str
required_nodes str
excluded_nodes str
scheduled_nodes str
derived_exit_code int
derived_exit_code_signal int
exit_code int
exit_code_signal int
batch_constraints list
federation_origin str
federation_siblings_active int
federation_siblings_viable int
cpus int
cpus_per_task int
cpus_per_gpu int
boards_per_node int
sockets_per_board int
sockets_per_node int
cores_per_socket int
threads_per_core int
ntasks int
ntasks_per_node int
ntasks_per_board int
ntasks_per_socket int
ntasks_per_core int
ntasks_per_gpu int
delay_boot_time int
constraints list
cluster str
cluster_constraints list
reservation str
resource_sharing str
requires_contiguous_nodes bool
licenses list
network str
command str
working_directory str
admin_comment str
system_comment str
container str
comment str
standard_input str
standard_output str
standard_error str
required_switches int
max_wait_time_switches int
burst_buffer str
burst_buffer_state str
cpu_frequency_min Union[str, int]
cpu_frequency_max Union[str, int]
cpu_frequency_governor Union[str, int]
billable_tres float
wckey str
mail_user list
mail_types list
heterogeneous_id int
heterogeneous_offset int
temporary_disk_per_node int
array_id int
array_tasks_parallel int
array_task_id int
array_tasks_waiting str
end_time int
run_time int
cores_reserved_for_system int
threads_reserved_for_system int
memory int
memory_per_cpu int
memory_per_node int
memory_per_gpu int
gres_per_node dict
profile_types list
gres_binding str
kill_on_invalid_dependency bool
spreads_over_nodes bool
power_options list
is_cronjob bool
cronjob_time str
elapsed_cpu_time int
run_time_remaining int

Functions

is_partitions_set

def is_partitions_set(
    value: dict[str, SlurmPartition] | None
) -> TypeGuard[dict[str, SlurmPartition]]

Sets partition type for type checker

Source code in hpcman/hpcman/queue/util.py
def is_partitions_set(value: dict[str, SlurmPartition] | None) -> TypeGuard[dict[str, SlurmPartition]]:
    """Narrow an optional partition mapping for the type checker."""
    if value is None:
        return False
    return True

is_nodes_set

def is_nodes_set(
    value: dict[str, SlurmNode] | None
) -> TypeGuard[dict[str, SlurmNode]]

Sets node type for type checker

Source code in hpcman/hpcman/queue/util.py
def is_nodes_set(value: dict[str, SlurmNode] | None) -> TypeGuard[dict[str, SlurmNode]]:
    """Narrow an optional node mapping for the type checker."""
    return not (value is None)

give_suggestions

def give_suggestions(
    query: str,
    choices: list[str],
    suggestion_type: str,
    cutoff_min: int = 50,
    cutoff_suggest: int = 85
) -> None

Provide suggestions from list using rapidfuzz

Source code in hpcman/hpcman/queue/util.py
def give_suggestions(
    query: str, choices: list[str], suggestion_type: str, cutoff_min: int = 50, cutoff_suggest: int = 85
) -> None:
    """Print fuzzy-match suggestions for an invalid query using rapidfuzz.

    Matches below ``cutoff_min`` are discarded; a single match scoring above
    ``cutoff_suggest`` is offered as a "did you mean" hint, otherwise up to
    five candidates (or the full choice list) are shown.
    """
    # Imported lazily so rapidfuzz is only required when suggestions are needed.
    import rapidfuzz as rf

    matches = rf.process.extract(query, choices, scorer=rf.fuzz.QRatio, score_cutoff=cutoff_min)
    if not matches:
        rprint(f"Provided {suggestion_type} '{query}' is invalid. Choose from this list and try again: {choices}")
        return
    best = matches[0]
    if best[1] > cutoff_suggest:
        rprint(f"Provided {suggestion_type} '{query}' is invalid. Did you mean '{best[0]}'?")
        return
    candidates = [name for name, *_ in matches[:5]]
    if len(candidates) == 1:
        candidates = choices
    rprint(f"Provided {suggestion_type} '{query}' is invalid. Choose from this list and try again: {candidates}")

check_if_valid_host

def check_if_valid_host(
    queuetype: QueueType,
    debug: bool,
    hosttype: HostType = HostType.SUBMIT
) -> bool

Checks if the current host is a submit host.

Uses the qconf program to determine if the current host is a submit host.

Need to implement a similar feature for SLURM.

Parameters:

Name Type Description
queuetype QueueType What queue type to check.
debug bool Used in development settings. SubmitHosts can be added in the enum.py file for debugging.

Returns:

Type Description
bool True for success or False for failure
Source code in hpcman/hpcman/queue/util.py
def check_if_valid_host(queuetype: QueueType, debug: bool, hosttype: HostType = HostType.SUBMIT) -> bool:
    """Checks if the current host is a submit host.

    Uses the `qconf` program to determine if the current host is a submit host.

    Need to implement a similar feature for SLURM.

    Args:
        queuetype: What queue type to check.
        debug: Used in development settings. SubmitHosts can be added in the `enum.py` file for debugging.
        hosttype: Which host role to validate (submit or admin).

    Raises:
        CalledProcessError: When there is a problem with the `qconf` program.
        NotASubmitHost: When the current host is not a submit host.
        SinfoMissingError: When `sinfo` cannot be found on $PATH.
        NotImplementedError: When the queuetype is not supported.

    Returns:
        True for success or False for failure
    """
    hostname = socket.getfqdn()
    if debug:
        load_dotenv(override=True)
        submit_hosts: list[str] = [str(environ.get("SUBMIT_HOST"))]
    else:
        if queuetype is QueueType.SGE:
            opt = "-ss"
            if hosttype is HostType.ADMIN:
                opt = "-sh"
            try:
                # BUGFIX: check=True is required for a qconf failure to raise
                # CalledProcessError; without it the except branch below was
                # unreachable and errors were silently treated as "no hosts".
                submit_hosts = subprocess.run(
                    ["qconf", opt], capture_output=True, text=True, check=True
                ).stdout.split()
            except subprocess.CalledProcessError as e:
                print(f"There was a problem checking the {hosttype.value} host: {e.stderr}")
                raise e
            if hostname not in submit_hosts and hostname not in [el.value for el in SLURMHosts]:
                if hosttype is HostType.SUBMIT:
                    raise NotASubmitHost(f"{hostname} is not a submit host. Try again on shell.cqls.oregonstate.edu.")
                elif hosttype is HostType.ADMIN:
                    print(f"Unable to check SGE queues from {hostname}. Try on shell-hpc.cqls.oregonstate.edu")
                    return False
                else:
                    raise NotImplementedError(f"Cannot find hosts for {hosttype.value}")
        elif queuetype is QueueType.SLURM:
            try:
                # Only care whether sinfo runs; its output is discarded.
                _ = subprocess.run(["sinfo"], text=True, check=True, capture_output=True)
            except subprocess.CalledProcessError as e:
                raise NotASubmitHost(
                    f"{hostname} is not a submit host. Try again on shell-hpc.cqls.oregonstate.edu."
                ) from e
            except FileNotFoundError as e:
                # BUGFIX: chain the original cause (the caught `e` was unused)
                # and drop the f-prefix from a placeholder-less string.
                raise SinfoMissingError(
                    "sinfo does not appear to be in your $PATH. Check your $PATH variable and try again."
                ) from e
        else:
            raise NotImplementedError(f"Cannot find submit host for {queuetype}")
    return True

check_valid_slurm_association

def check_valid_slurm_association(
    query: str,
    assoc_type: Literal['users', 'accounts'] = 'users',
    debug: bool = False
) -> bool

Compares provided user or account to list of valid Slurm users or accounts

Suggested use is checking for True/False then raising InvalidSlurmUser or InvalidSlurmAccount

Also populates global ASSOCIATIONS variable that can be reused.

Source code in hpcman/hpcman/queue/util.py
def check_valid_slurm_association(
    query: str, assoc_type: Literal["users", "accounts"] = "users", debug: bool = False
) -> bool:
    """Return True when ``query`` is a known Slurm user or account.

    Intended usage: check the boolean result and raise InvalidSlurmUser /
    InvalidSlurmAccount at the call site. Lazily fills the module-level
    ASSOCIATIONS cache so later calls reuse the same data.
    """
    global ASSOCIATIONS
    if ASSOCIATIONS is None:
        ASSOCIATIONS = get_valid_slurm_associations()

    valid_names = list(getattr(ASSOCIATIONS, assoc_type))
    if query in valid_names:
        return True
    give_suggestions(query=query, choices=valid_names, suggestion_type=f"Slurm {assoc_type[:-1]}")
    return False

get_valid_slurm_associations

def get_valid_slurm_associations() -> SlurmAssociations

Loads associations via the adapter layer to get a list of valid users and accounts

Source code in hpcman/hpcman/queue/util.py
def get_valid_slurm_associations() -> SlurmAssociations:
    """Load the valid Slurm users and accounts via the adapter layer.

    Raises:
        SlurmDBError: When the backend returns neither users nor accounts.
    """
    adapter, _ = get_backend()
    associations = adapter.load_associations()
    if associations.users or associations.accounts:
        return associations
    raise SlurmDBError("Unable to load Slurm association data.")

get_allowed_accounts_per_partition

def get_allowed_accounts_per_partition() -> dict[str, set[str]]

Uses the adapter layer to get allowed accounts per partition

Source code in hpcman/hpcman/queue/util.py
def get_allowed_accounts_per_partition() -> dict[str, set[str]]:
    """Map each partition name to the set of accounts allowed on it.

    Exits the process when partition data cannot be loaded.
    """
    global PARTITIONS
    load_partitions()
    if not is_partitions_set(PARTITIONS):
        rprint("Unable to load partition data. Exiting.")
        exit(1)
    return {partition.name: set(partition.allowed_accounts) for partition in PARTITIONS.values()}

get_priority_partitions

def get_priority_partitions(
    default_priority: int = 100
) -> set[str]

Compares partitions to default priority (100) to determine if any priority partitions are available.

Source code in hpcman/hpcman/queue/util.py
def get_priority_partitions(default_priority: int = 100) -> set[str]:
    """Return the names of partitions whose priority tier exceeds ``default_priority``.

    Exits the process when partition data cannot be loaded.
    """
    global PARTITIONS
    load_partitions()
    if not is_partitions_set(PARTITIONS):
        rprint("Unable to load partition data. Exiting.")
        exit(1)
    return {partition.name for partition in PARTITIONS.values() if partition.priority_tier > default_priority}

load_partitions

def load_partitions() -> None

Loads partitions into global PARTITIONS variable if needed.

Source code in hpcman/hpcman/queue/util.py
def load_partitions() -> None:
    """Fill the module-level PARTITIONS cache on first use; later calls are no-ops."""
    global PARTITIONS
    if PARTITIONS is not None:
        return
    adapter, _ = get_backend()
    PARTITIONS = adapter.load_partitions()

check_valid_slurm_partition

def check_valid_slurm_partition(
    query: str
) -> bool

Compares provided partition to list of valid Slurm partitions.

Populates the global PARTITIONS variable.

Source code in hpcman/hpcman/queue/util.py
def check_valid_slurm_partition(query: str) -> bool:
    """Compares provided partition to list of valid Slurm partitions.

    Populates the global PARTITIONS variable. Exits the process when partition
    data cannot be loaded (consistent with get_allowed_accounts_per_partition).
    """
    global PARTITIONS
    load_partitions()

    # BUGFIX: previously `choices` was only assigned inside the is_partitions_set
    # branch, so a failed load raised NameError below instead of a clean exit.
    if not is_partitions_set(PARTITIONS):
        rprint("Unable to load partition data. Exiting.")
        exit(1)
    choices = PARTITIONS

    if query in choices:
        return True
    give_suggestions(query=query, choices=list(choices), suggestion_type="Slurm partition")
    return False

load_nodes

def load_nodes() -> None

Loads nodes into global NODES variable if needed.

Source code in hpcman/hpcman/queue/util.py
def load_nodes() -> None:
    """Fill the module-level NODES cache on first use; later calls are no-ops."""
    global NODES
    if NODES is not None:
        return
    adapter, _ = get_backend()
    NODES = adapter.load_nodes()

get_nodes_values

def get_nodes_values() -> Iterable[SlurmNode]
Source code in hpcman/hpcman/queue/util.py
def get_nodes_values() -> Iterable[SlurmNode]:
    """Yield each cached SlurmNode, loading the NODES cache if needed."""
    global NODES
    load_nodes()
    if is_nodes_set(NODES):
        yield from NODES.values()

check_valid_slurm_node

def check_valid_slurm_node(
    query: str
) -> bool

Compares provided node to list of valid Slurm nodes.

Populates the global NODES variable.

Source code in hpcman/hpcman/queue/util.py
def check_valid_slurm_node(query: str) -> bool:
    """Compares provided node to list of valid Slurm nodes.

    Populates the global NODES variable. Exits the process when node data
    cannot be loaded.
    """
    global NODES
    load_nodes()

    # BUGFIX: previously `choices` was only assigned inside the is_nodes_set
    # branch, so a failed load raised NameError below instead of a clean exit.
    if not is_nodes_set(NODES):
        rprint("Unable to load node data. Exiting.")
        exit(1)
    choices = NODES

    if query in choices:
        return True
    give_suggestions(query=query, choices=list(choices), suggestion_type="Slurm node")
    return False

get_job_info

def get_job_info(
    job_id: int
) -> SlurmJobInfo | None

Get job info from Slurm using the adapter layer

Source code in hpcman/hpcman/queue/util.py
def get_job_info(job_id: int) -> SlurmJobInfo | None:
    """Fetch job info for ``job_id`` via the adapter layer.

    Best-effort: any backend failure is reported and None is returned
    instead of propagating the exception.
    """
    try:
        adapter, _ = get_backend()
        info = adapter.get_job(job_id)
    except Exception as e:  # deliberate broad catch: report and degrade to None
        rprint(f"Unable to get job info for {job_id}: {e}")
        return None
    return info