hpcman.queue.util
Classes
SlurmJob
Source code in hpcman/hpcman/queue/util.py
@dataclass
class SlurmJob:
    """Record of a Slurm job as returned by the queue backend adapter.

    Field names mirror the backend's job attributes. `steps` and `stats` are
    typed `Any` because their concrete structure depends on the adapter
    implementation — TODO confirm against the adapter layer.

    Bug fix: the original declared `cluster`, `comment`, `wckey`,
    `derived_exit_code` and `extra` twice. Python keeps only the first
    occurrence of each annotation (so field order and the generated
    `__init__` are unchanged), but the duplicates were misleading and have
    been removed.
    """

    # Parameters
    job_id: int
    cluster: str
    # Other Parameters
    admin_comment: str
    comment: str
    wckey: str
    derived_exit_code: int
    extra: str
    # Attributes
    steps: Any  # backend-specific payload — structure not visible here
    stats: Any  # backend-specific payload — structure not visible here
    account: str
    num_nodes: int
    array_id: int
    array_tasks_parallel: int
    array_task_id: int
    array_tasks_waiting: str
    association_id: int
    block_id: str
    constraints: str
    container: str
    db_index: int
    derived_exit_code_signal: int
    elapsed_time: int
    eligible_time: int
    end_time: int
    exit_code: int
    exit_code_signal: int
    failed_node: str
    group_id: int
    group_name: str
    id: int
    name: str
    mcs_label: str
    nodelist: str
    partition: str
    priority: int
    qos: str
    cpus: int
    memory: int
    reservation: str
    script: str
    start_time: int
    state: str
    state_reason: str
    cancelled_by: str
    submit_time: int
    submit_command: str
    suspended_time: int
    system_comment: str
    time_limit: int
    user_id: int
    user_name: str
    working_directory: str
Attributes
| Name |
Type |
Description |
job_id |
int |
|
cluster |
str |
|
admin_comment |
str |
|
comment |
str |
|
wckey |
str |
|
derived_exit_code |
int |
|
extra |
str |
|
steps |
Any |
|
stats |
Any |
|
account |
str |
|
num_nodes |
int |
|
array_id |
int |
|
array_tasks_parallel |
int |
|
array_task_id |
int |
|
array_tasks_waiting |
str |
|
association_id |
int |
|
block_id |
str |
|
constraints |
str |
|
container |
str |
|
db_index |
int |
|
derived_exit_code_signal |
int |
|
elapsed_time |
int |
|
eligible_time |
int |
|
end_time |
int |
|
exit_code |
int |
|
exit_code_signal |
int |
|
failed_node |
str |
|
group_id |
int |
|
group_name |
str |
|
id |
int |
|
name |
str |
|
mcs_label |
str |
|
nodelist |
str |
|
partition |
str |
|
priority |
int |
|
qos |
str |
|
cpus |
int |
|
memory |
int |
|
reservation |
str |
|
script |
str |
|
start_time |
int |
|
state |
str |
|
state_reason |
str |
|
cancelled_by |
str |
|
submit_time |
int |
|
submit_command |
str |
|
suspended_time |
int |
|
system_comment |
str |
|
time_limit |
int |
|
user_id |
int |
|
user_name |
str |
|
working_directory |
str |
|
ActiveSlurmJob
Source code in hpcman/hpcman/queue/util.py
@dataclass
class ActiveSlurmJob:
    """Snapshot of a job currently known to the Slurm controller.

    Field names mirror the attributes exposed by the queue backend adapter.
    NOTE(review): the int fields named *_time are presumably Unix epoch
    seconds (or durations in seconds for run_time/elapsed figures) — confirm
    against the adapter layer.
    """

    # Backend-specific payloads; concrete structure depends on the adapter (hence Any).
    steps: Any
    stats: Any
    # Presumably maps a host/node name to the list of PIDs there — TODO confirm.
    pids: dict[str, list]

    # Identity and ownership
    name: str
    id: int
    association_id: int
    account: str
    user_id: int
    user_name: str
    group_id: int
    group_name: str

    # Scheduling priority and QOS
    priority: int
    nice: int
    qos: str
    min_cpus_per_node: int

    # State and requeue/batch flags
    state: str
    state_reason: str
    is_requeueable: bool
    requeue_count: int
    is_batch_job: bool
    node_reboot_required: bool
    dependencies: dict

    # Time limits and lifecycle timestamps
    time_limit: int
    time_limit_min: int
    submit_time: int
    eligible_time: int
    accrue_time: int
    start_time: int
    resize_time: int
    deadline: int
    preempt_eligible_time: int
    preempt_time: int
    suspend_time: int
    last_sched_evaluation_time: int
    pre_suspension_time: int

    # Placement: partition, hosts and node lists
    mcs_label: str
    partition: str
    submit_host: str
    batch_host: str
    num_nodes: int
    max_nodes: int
    allocated_nodes: str
    required_nodes: str
    excluded_nodes: str
    scheduled_nodes: str

    # Exit codes and their signal components
    derived_exit_code: int
    derived_exit_code_signal: int
    exit_code: int
    exit_code_signal: int

    # Constraints and federation information
    batch_constraints: list
    federation_origin: str
    federation_siblings_active: int
    federation_siblings_viable: int

    # CPU/board/socket/core topology
    cpus: int
    cpus_per_task: int
    cpus_per_gpu: int
    boards_per_node: int
    sockets_per_board: int
    sockets_per_node: int
    cores_per_socket: int
    threads_per_core: int

    # Task layout
    ntasks: int
    ntasks_per_node: int
    ntasks_per_board: int
    ntasks_per_socket: int
    ntasks_per_core: int
    ntasks_per_gpu: int

    # Cluster-level options and resources
    delay_boot_time: int
    constraints: list
    cluster: str
    cluster_constraints: list
    reservation: str
    resource_sharing: str
    requires_contiguous_nodes: bool
    licenses: list
    network: str

    # Command, working directory, comments and standard streams
    command: str
    working_directory: str
    admin_comment: str
    system_comment: str
    container: str
    comment: str
    standard_input: str
    standard_output: str
    standard_error: str

    # Switches, burst buffer, CPU frequency, accounting and mail settings
    required_switches: int
    max_wait_time_switches: int
    burst_buffer: str
    burst_buffer_state: str
    cpu_frequency_min: Union[str, int]
    cpu_frequency_max: Union[str, int]
    cpu_frequency_governor: Union[str, int]
    billable_tres: float
    wckey: str
    mail_user: list
    mail_types: list

    # Heterogeneous/array job relationships and per-node disk
    heterogeneous_id: int
    heterogeneous_offset: int
    temporary_disk_per_node: int
    array_id: int
    array_tasks_parallel: int
    array_task_id: int
    array_tasks_waiting: str

    # Runtime accounting
    end_time: int
    run_time: int
    cores_reserved_for_system: int
    threads_reserved_for_system: int

    # Memory requests (total and per-cpu/node/gpu)
    memory: int
    memory_per_cpu: int
    memory_per_node: int
    memory_per_gpu: int

    # GRES, profiling and miscellaneous flags
    gres_per_node: dict
    profile_types: list
    gres_binding: str
    kill_on_invalid_dependency: bool
    spreads_over_nodes: bool
    power_options: list
    is_cronjob: bool
    cronjob_time: str

    # Derived runtime figures
    elapsed_cpu_time: int
    run_time_remaining: int
Attributes
| Name |
Type |
Description |
steps |
Any |
|
stats |
Any |
|
pids |
dict[str, list] |
|
name |
str |
|
id |
int |
|
association_id |
int |
|
account |
str |
|
user_id |
int |
|
user_name |
str |
|
group_id |
int |
|
group_name |
str |
|
priority |
int |
|
nice |
int |
|
qos |
str |
|
min_cpus_per_node |
int |
|
state |
str |
|
state_reason |
str |
|
is_requeueable |
bool |
|
requeue_count |
int |
|
is_batch_job |
bool |
|
node_reboot_required |
bool |
|
dependencies |
dict |
|
time_limit |
int |
|
time_limit_min |
int |
|
submit_time |
int |
|
eligible_time |
int |
|
accrue_time |
int |
|
start_time |
int |
|
resize_time |
int |
|
deadline |
int |
|
preempt_eligible_time |
int |
|
preempt_time |
int |
|
suspend_time |
int |
|
last_sched_evaluation_time |
int |
|
pre_suspension_time |
int |
|
mcs_label |
str |
|
partition |
str |
|
submit_host |
str |
|
batch_host |
str |
|
num_nodes |
int |
|
max_nodes |
int |
|
allocated_nodes |
str |
|
required_nodes |
str |
|
excluded_nodes |
str |
|
scheduled_nodes |
str |
|
derived_exit_code |
int |
|
derived_exit_code_signal |
int |
|
exit_code |
int |
|
exit_code_signal |
int |
|
batch_constraints |
list |
|
federation_origin |
str |
|
federation_siblings_active |
int |
|
federation_siblings_viable |
int |
|
cpus |
int |
|
cpus_per_task |
int |
|
cpus_per_gpu |
int |
|
boards_per_node |
int |
|
sockets_per_board |
int |
|
sockets_per_node |
int |
|
cores_per_socket |
int |
|
threads_per_core |
int |
|
ntasks |
int |
|
ntasks_per_node |
int |
|
ntasks_per_board |
int |
|
ntasks_per_socket |
int |
|
ntasks_per_core |
int |
|
ntasks_per_gpu |
int |
|
delay_boot_time |
int |
|
constraints |
list |
|
cluster |
str |
|
cluster_constraints |
list |
|
reservation |
str |
|
resource_sharing |
str |
|
requires_contiguous_nodes |
bool |
|
licenses |
list |
|
network |
str |
|
command |
str |
|
working_directory |
str |
|
admin_comment |
str |
|
system_comment |
str |
|
container |
str |
|
comment |
str |
|
standard_input |
str |
|
standard_output |
str |
|
standard_error |
str |
|
required_switches |
int |
|
max_wait_time_switches |
int |
|
burst_buffer |
str |
|
burst_buffer_state |
str |
|
cpu_frequency_min |
Union[str, int] |
|
cpu_frequency_max |
Union[str, int] |
|
cpu_frequency_governor |
Union[str, int] |
|
billable_tres |
float |
|
wckey |
str |
|
mail_user |
list |
|
mail_types |
list |
|
heterogeneous_id |
int |
|
heterogeneous_offset |
int |
|
temporary_disk_per_node |
int |
|
array_id |
int |
|
array_tasks_parallel |
int |
|
array_task_id |
int |
|
array_tasks_waiting |
str |
|
end_time |
int |
|
run_time |
int |
|
cores_reserved_for_system |
int |
|
threads_reserved_for_system |
int |
|
memory |
int |
|
memory_per_cpu |
int |
|
memory_per_node |
int |
|
memory_per_gpu |
int |
|
gres_per_node |
dict |
|
profile_types |
list |
|
gres_binding |
str |
|
kill_on_invalid_dependency |
bool |
|
spreads_over_nodes |
bool |
|
power_options |
list |
|
is_cronjob |
bool |
|
cronjob_time |
str |
|
elapsed_cpu_time |
int |
|
run_time_remaining |
int |
|
Functions
is_partitions_set
def is_partitions_set(
value: dict[str, SlurmPartition] | None
) -> TypeGuard[dict[str, SlurmPartition]]
Sets partition type for type checker
Source code in hpcman/hpcman/queue/util.py
| def is_partitions_set(value: dict[str, SlurmPartition] | None) -> TypeGuard[dict[str, SlurmPartition]]:
"""Sets partition type for type checker"""
return value is not None
|
is_nodes_set
def is_nodes_set(
value: dict[str, SlurmNode] | None
) -> TypeGuard[dict[str, SlurmNode]]
Sets node type for type checker
Source code in hpcman/hpcman/queue/util.py
| def is_nodes_set(value: dict[str, SlurmNode] | None) -> TypeGuard[dict[str, SlurmNode]]:
"""Sets node type for type checker"""
return value is not None
|
give_suggestions
def give_suggestions(
query: str,
choices: list[str],
suggestion_type: str,
cutoff_min: int = 50,
cutoff_suggest: int = 85
) -> None
Provide suggestions from list using rapidfuzz
Source code in hpcman/hpcman/queue/util.py
def give_suggestions(
    query: str, choices: list[str], suggestion_type: str, cutoff_min: int = 50, cutoff_suggest: int = 85
) -> None:
    """Print fuzzy-match suggestions for an invalid query using rapidfuzz.

    Matches `query` against `choices`; depending on match quality, prints
    either the single best candidate, the top few candidates, or the full
    list of valid choices.
    """
    import rapidfuzz as rf

    matches = rf.process.extract(query, choices, scorer=rf.fuzz.QRatio, score_cutoff=cutoff_min)
    if not matches:
        # Nothing even remotely close: show every valid choice.
        rprint(f"Provided {suggestion_type} '{query}' is invalid. Choose from this list and try again: {choices}")
        return
    best = matches[0]
    if best[1] > cutoff_suggest:
        # One candidate is a near-certain match: suggest it directly.
        rprint(f"Provided {suggestion_type} '{query}' is invalid. Did you mean '{best[0]}'?")
        return
    top = [item[0] for item in matches[:5]]
    if len(top) == 1:
        # A single weak match is not informative; fall back to the full list.
        top = choices
    rprint(f"Provided {suggestion_type} '{query}' is invalid. Choose from this list and try again: {top}")
check_if_valid_host
def check_if_valid_host(
queuetype: QueueType,
debug: bool,
hosttype: HostType = HostType.SUBMIT
) -> bool
Checks if the current host is a submit host.
Uses the qconf program to determine if the current host is a submit host.
Need to implement a similar feature for SLURM.
Parameters:
| Name |
Type |
Description |
queuetype |
QueueType |
What queue type to check. |
debug |
bool |
Used in development settings. SubmitHosts can be added in the enum.py file for debugging. |
Returns:
| Type |
Description |
bool |
True for success or False for failure |
Source code in hpcman/hpcman/queue/util.py
def check_if_valid_host(queuetype: QueueType, debug: bool, hosttype: HostType = HostType.SUBMIT) -> bool:
    """Checks if the current host is a submit host.

    Uses the `qconf` program to determine if the current host is a submit host.
    Need to implement a similar feature for SLURM.

    Args:
        queuetype: What queue type to check.
        debug: Used in development settings. SubmitHosts can be added in the `enum.py` file for debugging.
        hosttype: Which kind of host to validate (submit vs. admin).

    Raises:
        CalledProcessError: When there is a problem with the `qconf` program.
        NotASubmitHost: When the current host is not a submit host.
        NotImplementedError: When the queuetype is not supported.

    Returns:
        True for success or False for failure
    """
    hostname = socket.getfqdn()
    if debug:
        # Debug mode: read the expected submit host from the environment instead of qconf.
        # NOTE(review): under this structure submit_hosts is never consulted in debug
        # mode (the membership check below sits inside the SGE branch) — confirm intent.
        load_dotenv(override=True)
        submit_hosts: list[str] = [str(environ.get("SUBMIT_HOST"))]
    else:
        if queuetype is QueueType.SGE:
            opt = "-ss"  # qconf -ss lists SGE submit hosts
            if hosttype is HostType.ADMIN:
                opt = "-sh"  # qconf -sh lists SGE admin hosts
            try:
                submit_hosts = subprocess.run(["qconf", opt], capture_output=True, text=True).stdout.split()
            except subprocess.CalledProcessError as e:
                # NOTE(review): run() is called without check=True, so CalledProcessError
                # is never raised here and this handler appears unreachable — confirm.
                print(f"There was a problem checking the {hosttype.value} host: {e.stderr}")
                raise e
            # Hosts in the SLURMHosts enum are exempt from the SGE submit-host list.
            if hostname not in submit_hosts and hostname not in [el.value for el in SLURMHosts]:
                if hosttype is HostType.SUBMIT:
                    raise NotASubmitHost(f"{hostname} is not a submit host. Try again on shell.cqls.oregonstate.edu.")
                elif hosttype is HostType.ADMIN:
                    print(f"Unable to check SGE queues from {hostname}. Try on shell-hpc.cqls.oregonstate.edu")
                    return False
                else:
                    raise NotImplementedError(f"Cannot find hosts for {hosttype.value}")
        elif queuetype is QueueType.SLURM:
            # For SLURM, "does sinfo run successfully" is the submit-host probe.
            try:
                _ = subprocess.run(["sinfo"], text=True, check=True, capture_output=True)
            except subprocess.CalledProcessError as e:
                raise NotASubmitHost(
                    f"{hostname} is not a submit host. Try again on shell-hpc.cqls.oregonstate.edu."
                ) from e
            except FileNotFoundError as e:
                # sinfo binary missing entirely (host has no Slurm client tools).
                raise SinfoMissingError(
                    f"sinfo does not appear to be in your $PATH. Check your $PATH variable and try again."
                )
        else:
            raise NotImplementedError(f"Cannot find submit host for {queuetype}")
    return True
check_valid_slurm_association
def check_valid_slurm_association(
query: str,
assoc_type: Literal['users', 'accounts'] = 'users',
debug: bool = False
) -> bool
Compares provided user or account to list of valid Slurm users or accounts
Suggested use is checking for True/False then raising InvalidSlurmUser or InvalidSlurmAccount
Also populates global ASSOCIATIONS variable that can be reused.
Source code in hpcman/hpcman/queue/util.py
def check_valid_slurm_association(
    query: str, assoc_type: Literal["users", "accounts"] = "users", debug: bool = False
) -> bool:
    """Check a user or account name against the valid Slurm associations.

    Suggested use is checking for True/False then raising InvalidSlurmUser or
    InvalidSlurmAccount. Also populates the global ASSOCIATIONS cache so later
    calls reuse it.
    """
    global ASSOCIATIONS
    # Lazily populate the module-level cache on first use.
    if ASSOCIATIONS is None:
        ASSOCIATIONS = get_valid_slurm_associations()
    valid = list(getattr(ASSOCIATIONS, assoc_type))
    if query not in valid:
        # "users" -> "user", "accounts" -> "account" for the message.
        give_suggestions(query=query, choices=valid, suggestion_type=f"Slurm {assoc_type[:-1]}")
        return False
    return True
get_valid_slurm_associations
def get_valid_slurm_associations() -> SlurmAssociations
Loads associations via the adapter layer to get a list of valid users and accounts
Source code in hpcman/hpcman/queue/util.py
def get_valid_slurm_associations() -> SlurmAssociations:
    """Load the valid Slurm users and accounts via the adapter layer."""
    backend_adapter, _unused = get_backend()
    associations = backend_adapter.load_associations()
    # Both lists empty means the backend returned nothing usable.
    if not (associations.users or associations.accounts):
        raise SlurmDBError("Unable to load Slurm association data.")
    return associations
get_allowed_accounts_per_partition
def get_allowed_accounts_per_partition() -> dict[str, set[str]]
Uses the adapter layer to get allowed accounts per partition
Source code in hpcman/hpcman/queue/util.py
def get_allowed_accounts_per_partition() -> dict[str, set[str]]:
    """Map each partition name to the set of accounts allowed to use it."""
    global PARTITIONS
    load_partitions()
    if not is_partitions_set(PARTITIONS):
        rprint("Unable to load partition data. Exiting.")
        exit(1)
    return {partition.name: set(partition.allowed_accounts) for partition in PARTITIONS.values()}
get_priority_partitions
def get_priority_partitions(
default_priority: int = 100
) -> set[str]
Compares partitions to default priority (100) to determine if any priority partitions are available.
Source code in hpcman/hpcman/queue/util.py
def get_priority_partitions(default_priority: int = 100) -> set[str]:
    """Return partitions whose priority tier exceeds the default (100)."""
    global PARTITIONS
    load_partitions()
    if not is_partitions_set(PARTITIONS):
        rprint("Unable to load partition data. Exiting.")
        exit(1)
    # A partition counts as "priority" when its tier exceeds the baseline.
    names: set[str] = set()
    for partition in PARTITIONS.values():
        if partition.priority_tier > default_priority:
            names.add(partition.name)
    return names
load_partitions
def load_partitions() -> None
Loads partitions into global PARTITIONS variable if needed.
Source code in hpcman/hpcman/queue/util.py
def load_partitions() -> None:
    """Populate the global PARTITIONS cache on first call; no-op afterwards."""
    global PARTITIONS
    if PARTITIONS is not None:
        return  # already cached
    backend_adapter, _unused = get_backend()
    PARTITIONS = backend_adapter.load_partitions()
check_valid_slurm_partition
def check_valid_slurm_partition(
query: str
) -> bool
Compares provided partition to list of valid Slurm partitions.
Populates the global PARTITIONS variable.
Source code in hpcman/hpcman/queue/util.py
def check_valid_slurm_partition(query: str) -> bool:
    """Compare a provided partition name against the valid Slurm partitions.

    Populates the global PARTITIONS cache on first use. On a miss, prints
    fuzzy-match suggestions via give_suggestions.

    Args:
        query: Partition name supplied by the user.

    Returns:
        True when `query` is a known partition, False otherwise — including
        when partition data could not be loaded. (Bug fix: that path
        previously fell through and implicitly returned None despite the
        `-> bool` annotation.)
    """
    global PARTITIONS
    load_partitions()
    if not is_partitions_set(PARTITIONS):
        return False
    if query in PARTITIONS:
        return True
    # Dropped the extraneous f-prefix on the constant suggestion_type string.
    give_suggestions(query=query, choices=list(PARTITIONS), suggestion_type="Slurm partition")
    return False
load_nodes
def load_nodes() -> None
Loads nodes into global NODES variable if needed.
Source code in hpcman/hpcman/queue/util.py
def load_nodes() -> None:
    """Populate the global NODES cache on first call; no-op afterwards."""
    global NODES
    if NODES is not None:
        return  # already cached
    backend_adapter, _unused = get_backend()
    NODES = backend_adapter.load_nodes()
get_nodes_values
def get_nodes_values() -> Iterable[SlurmNode]
Source code in hpcman/hpcman/queue/util.py
def get_nodes_values() -> Iterable[SlurmNode]:
    """Yield each known SlurmNode, loading the node cache first if needed.

    Yields nothing when node data could not be loaded.
    """
    global NODES
    load_nodes()
    if not is_nodes_set(NODES):
        return
    yield from NODES.values()
check_valid_slurm_node
def check_valid_slurm_node(
query: str
) -> bool
Compares provided node to list of valid Slurm nodes.
Populates the global NODES variable.
Source code in hpcman/hpcman/queue/util.py
def check_valid_slurm_node(query: str) -> bool:
    """Compare a provided node name against the valid Slurm nodes.

    Populates the global NODES cache on first use. On a miss, prints
    fuzzy-match suggestions via give_suggestions.

    Args:
        query: Node name supplied by the user.

    Returns:
        True when `query` is a known node, False otherwise — including when
        node data could not be loaded. (Bug fix: that path previously fell
        through and implicitly returned None despite the `-> bool`
        annotation.)
    """
    global NODES
    load_nodes()
    if not is_nodes_set(NODES):
        return False
    if query in NODES:
        return True
    # Dropped the extraneous f-prefix on the constant suggestion_type string.
    give_suggestions(query=query, choices=list(NODES), suggestion_type="Slurm node")
    return False
get_job_info
def get_job_info(
job_id: int
) -> SlurmJobInfo | None
Get job info from Slurm using the adapter layer
Source code in hpcman/hpcman/queue/util.py
def get_job_info(job_id: int) -> SlurmJobInfo | None:
    """Fetch info for a single job from Slurm via the adapter layer.

    Best-effort: any failure is reported to the console and None is returned.
    """
    try:
        backend_adapter, _unused = get_backend()
        job = backend_adapter.get_job(job_id)
    except Exception as err:  # deliberate broad catch: report and fall back to None
        rprint(f"Unable to get job info for {job_id}: {err}")
        return None
    return job