hpcman.queue.info
Classes
Overload
Source code in hpcman/hpcman/queue/info.py
@dataclass
class Overload:
    """Result of a node over/under-utilization check, with display styling."""

    # True when the node is considered over- or under-utilized.
    overloaded: bool
    # Human-readable reason shown next to the node name; empty when healthy.
    message: str = ""
    # Rich style string used to render the message.
    color: str = "bold red"

    def __bool__(self):
        # Truthy exactly when an overload was detected, so results can be
        # used directly in `if` / walrus-operator expressions.
        return self.overloaded
Attributes
| Name |
Type |
Description |
overloaded |
bool |
|
message |
str |
|
color |
str |
|
SlurmInfo
Bases: BaseModel
Source code in hpcman/hpcman/queue/info.py
class SlurmInfo(BaseModel):
    """Validated filter options used to build the node/job report.

    Field validators reject unknown users, nodes, accounts, and partitions
    up front so later Slurm queries only receive known-good names.
    """

    model_config = ConfigDict(arbitrary_types_allowed=True)

    users: list[str] = []       # usernames whose jobs/accounts to include
    accounts: list[str] = []    # Slurm accounts to include
    partitions: list[str] = []  # partitions whose nodes to include
    nodes: list[str] = []       # explicitly requested node names
    states: re.Pattern[str] = re.compile(r".", re.DOTALL)  # node-state filter (default matches all)
    me: bool = False            # presumably: restrict to the calling user -- consumed by filtering helpers, TODO confirm
    gpus: bool = False          # presumably: restrict to GPU nodes -- consumed by filtering helpers, TODO confirm
    warn: bool = False          # show only nodes that produced warnings
    # Set by get_allowed_nodes(): accounts expanded from `users` plus `accounts`.
    slurm_account_info: set[str] = set()

    @field_validator("users")  # type: ignore
    @classmethod
    def check_users(cls, v: list[str]) -> list[str]:
        """Raise InvalidSlurmUser unless every requested user is a valid association."""
        # BUG FIX: was any(), which accepted lists containing invalid users
        # whenever at least one entry was valid (and rejected empty lists).
        if not all(check_valid_slurm_association(x) for x in v):
            raise InvalidSlurmUser(f"One or more provided users is invalid: {v}")
        return v

    @field_validator("nodes")  # type: ignore
    @classmethod
    def check_nodes(cls, v: list[str]) -> list[str]:
        """Raise InvalidSlurmNode unless every requested node exists."""
        # BUG FIX: any() -> all(); see check_users.
        if not all(check_valid_slurm_node(x) for x in v):
            raise InvalidSlurmNode(f"One or more provided nodes is invalid: {v}")
        return v

    @field_validator("accounts")  # type: ignore
    @classmethod
    def check_accounts(cls, v: list[str]) -> list[str]:
        """Raise InvalidSlurmAccount unless every requested account exists."""
        # BUG FIX: any() -> all(); see check_users.
        if not all(check_valid_slurm_association(x, "accounts") for x in v):
            raise InvalidSlurmAccount(f"One or more provided accounts is invalid: {v}")
        return v

    @field_validator("partitions")  # type: ignore
    @classmethod
    def check_partitions(cls, v: list[str]) -> list[str]:
        """Raise InvalidSlurmPartition unless every requested partition exists."""
        # BUG FIX: any() -> all(); see check_users.
        if not all(check_valid_slurm_partition(part) for part in v):
            raise InvalidSlurmPartition(f"One or more provided partitions is invalid: {v}")
        return v

    def get_allowed_nodes(self) -> set[str]:
        """Generates a set of nodes that should be included in the final output."""
        nodes_to_allow: set[str] = set(self.nodes)
        load_partitions()
        # Deferred import: load_partitions() must populate these module-level
        # globals in .util before we read them.
        from .util import PARTITIONS, ASSOCIATIONS

        if ASSOCIATIONS is not None:
            # Expand each requested user into the accounts they belong to.
            for user in self.users:
                for acct in ASSOCIATIONS.users[user].accounts:
                    self.slurm_account_info.add(acct)
        # Explicitly requested accounts always count.
        self.slurm_account_info.update(self.accounts)
        if is_partitions_set(PARTITIONS):
            # Requested partitions contribute all of their nodes.
            for part in self.partitions:
                nodes_to_allow.update(nodelist_from_range_str(PARTITIONS[part].nodes))
        else:
            rprint("Error loading Slurm partitions.")
            exit(1)
        if self.slurm_account_info:
            # Include every partition the collected accounts may submit to;
            # partitions without an allow-list are open to all accounts.
            load_partitions()
            if is_partitions_set(PARTITIONS):
                for partition in PARTITIONS.values():
                    if not partition.allowed_accounts or not set(partition.allowed_accounts).isdisjoint(self.slurm_account_info):
                        nodes_to_allow.update(nodelist_from_range_str(partition.nodes))
        elif not nodes_to_allow:
            # No filters at all: fall back to every known node.
            load_nodes()
            from .util import NODES

            if is_nodes_set(NODES):
                nodes_to_allow = set(NODES.keys())
        return nodes_to_allow

    def generate_node_tree(self) -> Tree:
        """Build a rich Tree of filtered nodes, each with a per-job subtree."""
        node_tree = Tree("Slurm Nodes")
        priority_partitions = get_priority_partitions()
        partition_map = get_allowed_accounts_per_partition()
        filtered = _gather_filtered_nodes(self)
        for node, node_jobs in filtered:
            warnings = 0
            node_text = Text()
            node_text.append(node.name, style="cyan")
            node_text.append(f" {node.state} ")
            if node.reason:
                node_text.append_text(Text(f"({node.reason}) ", style="red"))
            node_text.append("in ")
            # Comma-separated partitions: priority ones get "*" and bold;
            # partitions the selected accounts cannot use are struck through.
            part_text = Text()
            for i, part in enumerate(node.partitions):
                part_name = part
                if i > 0:
                    part_text.append(",")
                if part in priority_partitions:
                    part_name = f"{part_name}*"
                    style = Style(color="magenta", bold=True)
                else:
                    style = Style(color="magenta")
                if len(self.slurm_account_info) > 0 and partition_map[part].isdisjoint(self.slurm_account_info):
                    style = Style(color="grey89", strike=True, dim=True)
                part_text.append(part_name, style=style)
            node_text.append_text(part_text)
            if overload := check_if_node_overloaded(node):
                node_text.append(f" **{overload.message}**", style=overload.color)
                warnings += 1
            if overalloc := check_if_mem_overallocated(node):
                # NOTE(review): reuses the CPU-overload color when both
                # warnings fire -- confirm that is intended.
                mem_color = overload.color if overload else overalloc.color
                node_text.append(f" **{overalloc.message}**", style=mem_color)
                warnings += 1
            # With warn set, only nodes that produced at least one warning show.
            if self.warn and not warnings:
                continue
            node_info = Tree(node_text)
            node_info.add(Tree(f"OS: {node.operating_system}"))
            node_info.add(Tree(get_node_cpu_text(node)))
            node_info.add(Tree(get_node_mem_text(node)))
            if node.configured_gres:
                node_info.add(Tree(get_node_gpu_text(node)))
            # Add job info
            if node_jobs:
                jobtree = Tree("Jobs")
                for job in node_jobs:
                    jobtext = Text()
                    state = SlurmJobState(job.state)
                    try:
                        job_user = job.user_name
                    except KeyError:
                        # This happens occasionally when user id lookup fails at the system level
                        job_user = f"uid={job.user_id}"
                    # Highlight jobs owned by a requested user or by the caller.
                    if (self.users and job_user in self.users) or job_user == getuser():
                        # BUG FIX: was job.user_name, which re-raises KeyError
                        # when the uid fallback above was taken.
                        jobtext.append(job_user, "green bold")
                    else:
                        jobtext.append(job_user)
                    jobtext.append(f" {job.id}")
                    state_style = Style()
                    if state.is_fail:
                        state_style = Style(color="red", bold=True)
                    elif state.is_pending:
                        state_style = Style(color="yellow")
                    elif state.is_success:
                        state_style = Style(color="green")
                    jobtext.append(f" {job.state}", style=state_style)
                    try:
                        resources = job.get_resource_layout_per_node()[node.name]
                    except KeyError:
                        resources = {}
                    if not resources:
                        # Unscheduled job: no per-node layout yet.
                        if job.state_reason and job.state_reason != "None":
                            jobtext.append(f"({job.state_reason})", style=state_style)
                        jobtext.append(f" {job.name} {job.partition} CPUs={job.cpus} Memory=")
                        try:
                            jobtext.append(f"{humanize(job.memory)}")
                        except AttributeError:
                            jobtext.append("None")
                    else:
                        jobtext.append(f" {job.name} {job.partition} CPUs={job.cpus} Memory=")
                        memory: int | None = resources.get("memory", None)
                        if memory is None:
                            jobtext.append("None")
                        else:
                            jobtext.append(f"{humanize(memory)}")
                        if "gres" in resources:
                            gres = resources["gres"]
                            for key in gres:
                                if key.startswith("gpu"):
                                    gpus = gres[key]
                                    try:
                                        jobtext.append(f" GPUs={key.split(':', 1)[1]}({gpus['count']})")
                                    except IndexError:
                                        jobtext.append(f" GPUs={key}({gpus['count']})")
                    jobtree.add(Tree(jobtext))
                node_info.add(jobtree)
            else:
                node_info.add(Tree("No Jobs"))
            # Finished node
            node_tree.add(node_info)
        return node_tree
Fields
| Name |
Type |
Description |
users |
list[str] |
|
accounts |
list[str] |
|
partitions |
list[str] |
|
nodes |
list[str] |
|
states |
re.Pattern[str] |
|
me |
bool |
|
gpus |
bool |
|
warn |
bool |
|
slurm_account_info |
set[str] |
|
Methods
check_users
def check_users(
v: list[str]
) -> list[str]
Source code in hpcman/hpcman/queue/info.py
@field_validator("users")  # type: ignore
@classmethod
def check_users(cls, v: list[str]) -> list[str]:
    """Raise InvalidSlurmUser unless every requested user is a valid association."""
    # BUG FIX: was any(), which accepted lists containing invalid users
    # whenever at least one entry was valid (and rejected empty lists).
    if not all(check_valid_slurm_association(x) for x in v):
        raise InvalidSlurmUser(f"One or more provided users is invalid: {v}")
    return v
check_nodes
def check_nodes(
v: list[str]
) -> list[str]
Source code in hpcman/hpcman/queue/info.py
@field_validator("nodes")  # type: ignore
@classmethod
def check_nodes(cls, v: list[str]) -> list[str]:
    """Raise InvalidSlurmNode unless every requested node exists."""
    # BUG FIX: was any(), which accepted lists containing invalid nodes
    # whenever at least one entry was valid (and rejected empty lists).
    if not all(check_valid_slurm_node(x) for x in v):
        raise InvalidSlurmNode(f"One or more provided nodes is invalid: {v}")
    return v
check_accounts
def check_accounts(
v: list[str]
) -> list[str]
Source code in hpcman/hpcman/queue/info.py
@field_validator("accounts")  # type: ignore
@classmethod
def check_accounts(cls, v: list[str]) -> list[str]:
    """Raise InvalidSlurmAccount unless every requested account exists."""
    # BUG FIX: was any(), which accepted lists containing invalid accounts
    # whenever at least one entry was valid (and rejected empty lists).
    if not all(check_valid_slurm_association(x, "accounts") for x in v):
        raise InvalidSlurmAccount(f"One or more provided accounts is invalid: {v}")
    return v
check_partitions
def check_partitions(
v: list[str]
) -> list[str]
Source code in hpcman/hpcman/queue/info.py
@field_validator("partitions")  # type: ignore
@classmethod
def check_partitions(cls, v: list[str]) -> list[str]:
    """Raise InvalidSlurmPartition unless every requested partition exists."""
    # BUG FIX: was any(), which accepted lists containing invalid partitions
    # whenever at least one entry was valid (and rejected empty lists).
    if not all(check_valid_slurm_partition(part) for part in v):
        raise InvalidSlurmPartition(f"One or more provided partitions is invalid: {v}")
    return v
get_allowed_nodes
def get_allowed_nodes() -> set[str]
Generates a set of nodes that should be included in the final output.
Source code in hpcman/hpcman/queue/info.py
def get_allowed_nodes(self) -> set[str]:
    """Generates a set of nodes that should be included in the final output."""
    # Start from any explicitly requested nodes.
    nodes_to_allow: set[str] = set(self.nodes)
    load_partitions()
    # Deferred import: load_partitions() must populate these module-level
    # globals in .util before we read them.
    from .util import PARTITIONS, ASSOCIATIONS
    if ASSOCIATIONS is not None:
        # Expand each requested user into the accounts they belong to.
        for user in self.users:
            for acct in ASSOCIATIONS.users[user].accounts:
                self.slurm_account_info.add(acct)
    # Explicitly requested accounts always count.
    for acct in self.accounts:
        self.slurm_account_info.add(acct)
    if is_partitions_set(PARTITIONS):
        # Requested partitions contribute all of their nodes.
        for part in self.partitions:
            nodes_to_allow.update(nodelist_from_range_str(PARTITIONS[part].nodes))
    else:
        rprint("Error loading Slurm partitions.")
        exit(1)
    if self.slurm_account_info:
        # Include every partition the collected accounts may submit to;
        # partitions without an allowed_accounts list are open to everyone.
        # NOTE(review): PARTITIONS is the binding imported above; if
        # load_partitions() rebinds the global in .util this local name may
        # be stale -- confirm load_partitions() mutates in place.
        load_partitions()
        if is_partitions_set(PARTITIONS):
            for partition in PARTITIONS.values():
                if not partition.allowed_accounts or not set(partition.allowed_accounts).isdisjoint(self.slurm_account_info):
                    nodes_to_allow.update(nodelist_from_range_str(partition.nodes))
    else:
        # No user/account filter: default to every known node unless the
        # caller already narrowed the set explicitly.
        if not nodes_to_allow:
            load_nodes()
            from .util import NODES
            if is_nodes_set(NODES):
                nodes_to_allow = set(NODES.keys())
    return nodes_to_allow
generate_node_tree
def generate_node_tree() -> Tree
Source code in hpcman/hpcman/queue/info.py
def generate_node_tree(self) -> Tree:
    """Build a rich Tree of filtered nodes, each with a per-job subtree."""
    node_tree = Tree("Slurm Nodes")
    priority_partitions = get_priority_partitions()
    partition_map = get_allowed_accounts_per_partition()
    # Nodes surviving the configured filters, paired with their jobs.
    filtered = _gather_filtered_nodes(self)
    for node, node_jobs in filtered:
        warnings = 0
        node_text = Text()
        node_text = node_text.append(node.name, style="cyan")
        node_text.append(f" {node.state} ")
        if node.reason:
            node_text.append_text(Text(f"({node.reason}) ", style="red"))
        node_text.append("in ")
        # Comma-separated partitions: priority ones get "*" and bold;
        # partitions the selected accounts cannot use are struck through.
        part_text = Text()
        for i, part in enumerate(node.partitions):
            part_name = part
            if i > 0:
                part_text.append(",")
            if part in priority_partitions:
                part_name = f"{part_name}*"
                style = Style(color="magenta", bold=True)
            else:
                style = Style(color="magenta")
            if len(self.slurm_account_info) > 0 and partition_map[part].isdisjoint(self.slurm_account_info):
                style = Style(color="grey89", strike=True, dim=True)
            part_text.append(part_name, style=style)
        node_text.append_text(part_text)
        if overload := check_if_node_overloaded(node):
            node_text.append(f" **{overload.message}**", style=overload.color)
            warnings += 1
        if overalloc := check_if_mem_overallocated(node):
            # NOTE(review): reuses the CPU-overload color when both warnings
            # fire -- confirm that is intended.
            mem_color = overload.color if overload else overalloc.color
            node_text.append(f" **{overalloc.message}**", style=mem_color)
            warnings += 1
        # With warn set, only nodes that produced at least one warning show.
        if self.warn and not warnings:
            continue
        node_info = Tree(node_text)
        node_info.add(Tree(f"OS: {node.operating_system}"))
        node_info.add(Tree(get_node_cpu_text(node)))
        node_info.add(Tree(get_node_mem_text(node)))
        if node.configured_gres:
            node_info.add(Tree(get_node_gpu_text(node)))
        # Add job info
        if node_jobs:
            jobtree = Tree("Jobs")
            for job in node_jobs:
                jobtext = Text()
                state = SlurmJobState(job.state)
                try:
                    job_user = job.user_name
                except KeyError:
                    # This happens occasionally when user id lookup fails at the system level
                    job_user = f"uid={job.user_id}"
                # Highlight jobs owned by a requested user or by the caller.
                if self.users and job_user in self.users or job_user == getuser():
                    jobtext.append(job.user_name, "green bold")
                else:
                    jobtext.append(job_user)
                jobtext.append(f" {job.id}")
                state_style = Style()
                if state.is_fail:
                    state_style = Style(color="red", bold=True)
                elif state.is_pending:
                    state_style = Style(color="yellow")
                elif state.is_success:
                    state_style = Style(color="green")
                jobtext.append(f" {job.state}", style=state_style)
                try:
                    resources = job.get_resource_layout_per_node()[node.name]
                except KeyError:
                    resources = {}
                if not resources:
                    # Unscheduled job: no per-node layout yet, so report the
                    # requested totals and the pending reason, if any.
                    if job.state_reason and job.state_reason != "None":
                        jobtext.append(f"({job.state_reason})", style=state_style)
                    jobtext.append(f" {job.name} {job.partition} CPUs={job.cpus} Memory=")
                    try:
                        jobtext.append(f"{humanize(job.memory)}")
                    except AttributeError:
                        jobtext.append("None")
                else:
                    jobtext.append(f" {job.name} {job.partition} CPUs={job.cpus} Memory=")
                    memory: int | None = resources.get("memory", None)
                    if memory is None:
                        jobtext.append("None")
                    else:
                        jobtext.append(f"{humanize(memory)}")
                    if "gres" in resources:
                        # Report GPU allocations; key is "gpu" or "gpu:<type>".
                        gres = resources["gres"]
                        for key in gres:
                            if key.startswith("gpu"):
                                gpus = gres[key]
                                try:
                                    jobtext.append(f" GPUs={key.split(':', 1)[1]}({gpus['count']})")
                                except IndexError:
                                    jobtext.append(f" GPUs={key}({gpus['count']})")
                jobtree.add(Tree(jobtext))
            node_info.add(jobtree)
        else:
            node_info.add(Tree("No Jobs"))
        # Finished node
        node_tree.add(node_info)
    return node_tree
Functions
get_jobs_by_node
def get_jobs_by_node() -> dict[str, list[SlurmJobInfo]]
Loads jobs into a dict per node allocation. Ignores non-allocated jobs.
Source code in hpcman/hpcman/queue/info.py
def get_jobs_by_node() -> dict[str, list[SlurmJobInfo]]:
    """Group every known job by the node it is allocated (or required) on.

    Jobs with neither an allocation nor a required node are skipped.
    """
    backend, _ = get_backend()
    by_node: dict[str, list[SlurmJobInfo]] = {}
    for job in backend.load_jobs().values():
        if job.allocated_nodes is not None:
            # Expand range strings like "node[01-04]" into individual names.
            targets = nodelist_from_range_str(job.allocated_nodes)
        elif job.required_nodes is not None:
            targets = [job.required_nodes]
        else:
            continue
        for name in targets:
            by_node.setdefault(name, []).append(job)
    return by_node
get_style_from_range
def get_style_from_range(
value: int
) -> Style
Provides a set Style color for a range of input values between 0-100.
Closer to 100 is green while closer to 0 is red.
Source code in hpcman/hpcman/queue/info.py
def get_style_from_range(value: int) -> Style:
    """Provides a set Style color for a range of input values between 0-100.

    Closer to 100 is green while closer to 0 is red.

    Args:
        value: percentage (0-100) of the "good" quantity remaining.

    Returns:
        A rich Style grading from green (high) through yellow and
        bright_red down to red (low).
    """
    # BUG FIX: the original used `value in range(70, 100)` etc., which
    # returned white for 100 (the top of the scale) and for exactly 9
    # (the final guard was `value < 9`).  Plain comparisons cover the
    # whole range with no gaps.
    if value >= 70:
        return Style(color="green")
    elif value >= 40:
        return Style(color="yellow")
    elif value >= 10:
        return Style(color="bright_red")
    else:
        return Style(color="red")
check_if_node_overloaded
def check_if_node_overloaded(
node: SlurmNode
) -> Overload
Compares CPU load to use to determine utilization/overallocation.
Source code in hpcman/hpcman/queue/info.py
def check_if_node_overloaded(node: SlurmNode) -> Overload:
    """Compares CPU load to use to determine utilization/overallocation."""
    # Nodes reporting zero CPUs cannot be rated; treat as not overloaded.
    if node.total_cpus == 0:
        return Overload(False)
    # NOTE(review): precedence makes this (load / total) * 1.10, i.e. a 10%
    # grace margin, and the *inflated* ratio is what gets reported -- confirm
    # this is intended rather than load / (total * 1.10).
    if (ratio := node.cpu_load / node.total_cpus * 1.10) > 1:
        return Overload(True, f"Load > {ratio:.2f}X total cpu")
    # Load exceeds 1.5x the CPUs actually allocated (min 1 avoids div-by-zero).
    elif (ratio := node.cpu_load / max(node.allocated_cpus, 1)) > 1.5:
        return Overload(True, f"Load > {ratio:.2f}X allocated cpu")
    # More than 2x as many CPUs allocated as the measured load justifies:
    # flagged as under-utilization with a softer (yellow) style.
    elif (ratio := max(node.allocated_cpus, 1) / max(node.cpu_load, 1)) > 2:
        return Overload(True, f"Under-utilized CPU allocation > {ratio:.2f}X CPU Load", "bold yellow")
    else:
        return Overload(False)
check_if_mem_overallocated
def check_if_mem_overallocated(
node: SlurmNode
) -> Overload
Compares memory allocation to use to determine utilization/overallocation.
Source code in hpcman/hpcman/queue/info.py
def check_if_mem_overallocated(node: SlurmNode) -> Overload:
    """Compares memory allocation to use to determine utilization/overallocation."""
    # Memory counters can be missing (e.g. node down / not reporting).
    if None in (node.allocated_memory, node.free_memory, node.real_memory):
        return Overload(False)
    # Allocation more than 2x the memory actually in use (real - free);
    # max(..., 1) avoids division by zero on an idle node.
    elif (ratio := node.allocated_memory / max(node.real_memory - node.free_memory, 1)) > 2:
        return Overload(True, f"Under-utilized memory allocation > {ratio:.2f}X Mem Used", "bold yellow")
    # NOTE(review): 8000 is presumably in the same unit humanize() expects
    # (Slurm reports MB) -- confirm the unit of real/allocated_memory.
    elif (avail := node.real_memory - node.allocated_memory) < 8000:
        return Overload(True, f"Low Mem Available: {humanize(avail)}")
    else:
        return Overload(False)
get_node_cpu_text
def get_node_cpu_text(
node: SlurmNode
) -> Text
Formatting node cpu information as a rich.Text object.
Source code in hpcman/hpcman/queue/info.py
def get_node_cpu_text(node: SlurmNode) -> Text:
    """Formatting node cpu information as a rich.Text object."""
    # Label SMT (hyperthreaded) nodes distinctly from single-thread cores.
    if node.threads_per_core > 1:
        cpu_arch = f"{node.architecture} SMT CPUs"
    else:
        cpu_arch = f"{node.architecture} CPUs"
    cpu_text = Text(f"{cpu_arch}: Physical={node.cores_per_socket * node.sockets} Total={node.total_cpus}")
    cpu_text.append(" Free=")
    # Color grades with the fraction of idle CPUs (mostly idle = green).
    # NOTE(review): divides by total_cpus without a zero guard -- confirm
    # callers never pass a node reporting 0 total CPUs.
    cpu_style = get_style_from_range(int(node.idle_cpus / node.total_cpus * 100))
    cpu_text.append_text(Text(f"{node.idle_cpus}", style=cpu_style))
    cpu_text.append(" Allocated=")
    # Allocated count reuses the idle-based style so the pair reads uniformly.
    cpu_text.append_text(Text(f"{node.allocated_cpus}", style=cpu_style))
    cpu_text.append(" Load=")
    # Highlight the load figure in red when the node is flagged as overloaded.
    if check_if_node_overloaded(node):
        cpu_text.append(f"{node.cpu_load}", style="red")
    else:
        cpu_text.append(f"{node.cpu_load}")
    return cpu_text
get_node_mem_text
def get_node_mem_text(
node: SlurmNode
) -> Text
Formatting node mem information as a rich.Text object.
Source code in hpcman/hpcman/queue/info.py
def get_node_mem_text(node: SlurmNode) -> Text:
    """Formatting node mem information as a rich.Text object."""
    mem_text = Text(f"Memory: Total={humanize(node.real_memory)}")
    # "Available" here means unallocated (total minus Slurm allocations),
    # which differs from the OS-reported free_memory shown at the end.
    available_mem: int = node.real_memory - node.allocated_memory
    # Color grades with the fraction of memory still available.
    mem_style = get_style_from_range(int(available_mem / node.real_memory * 100))
    mem_text.append(" Available=")
    mem_text.append_text(Text(f"{humanize(available_mem)}", style=mem_style))
    mem_text.append(" Allocated=")
    # Allocated reuses the availability style so the pair reads uniformly.
    mem_text.append_text(Text(f"{humanize(node.allocated_memory)}", style=mem_style))
    mem_text.append(f" Free={humanize(node.free_memory)}")
    return mem_text
get_node_gpu_text
def get_node_gpu_text(
node: SlurmNode
) -> Text
Formatting node gpu information as a rich.Text object.
Source code in hpcman/hpcman/queue/info.py
def get_node_gpu_text(node: SlurmNode) -> Text:
    """Formatting node gpu information as a rich.Text object."""
    # Fixed: was Text(f"GPUs: ") -- an f-string with no placeholders.
    gpu_text = Text("GPUs: ")
    for gres in node.configured_gres:
        if gres.startswith("gpu"):
            try:
                # "gpu:<type>" -> show the type and configured count.
                gpu_text.append(f"Type={gres.split(':', 1)[1]} Total={node.configured_gres[gres]}")
            except IndexError:
                # Bare "gpu" gres with no type suffix.
                gpu_text.append(f"{node.configured_gres[gres]} Total=NA")
    for gres in node.allocated_gres:
        if gres.startswith("gpu"):
            try:
                gresname = gres.split(":", 1)[1]
            except IndexError:
                gresname = gres
            try:
                gpu_text.append(f" Allocated={node.allocated_gres[gresname]}")
            except KeyError:
                # NOTE(review): allocated_gres keying looks inconsistent with
                # the iteration variable; skipped quietly when absent.
                pass
    return gpu_text
print_node_tree
def print_node_tree(
    **kwargs
) -> None
Generates and prints node tree.
Source code in hpcman/hpcman/queue/info.py
def print_node_tree(**kwargs) -> None:
    """Generates and prints node tree.

    Keyword Args:
        json_output: print machine-readable output via _print_info_json
            instead of the rendered tree (popped before validation).
        debug: additionally print the SlurmInfo model after the tree.
        Remaining keys (None values dropped) are forwarded to SlurmInfo.
    """
    json_output = kwargs.pop("json_output", False)
    console = Console()
    try:
        info = SlurmInfo(**{k: v for k, v in kwargs.items() if v is not None})
    except ValidationError as e:
        handle_validation_errors(e, print_error=False)
        # BUG FIX: if handle_validation_errors returns instead of exiting,
        # the original fell through and hit a NameError on `info`.
        return
    if json_output:
        _print_info_json(info)
        return
    console.print(info.generate_node_tree())
    # .get() so a caller that never passed `debug` does not raise KeyError.
    if kwargs.get("debug"):
        console.print(info)