Skip to content

hpcman.queue.info

Classes

Overload

Source code in hpcman/hpcman/queue/info.py
@dataclass
class Overload:
    """Result of a node overload/over-allocation check.

    Instances are truthy exactly when an overload condition was detected,
    so they can be used directly in ``if`` tests; ``message`` and ``color``
    describe how the warning should be rendered.
    """

    overloaded: bool
    message: str = ""
    color: str = "bold red"

    def __bool__(self) -> bool:
        # Truthiness mirrors the detection flag.
        return self.overloaded

Attributes

Name Type Description
overloaded bool
message str
color str

SlurmInfo

Bases: BaseModel

Source code in hpcman/hpcman/queue/info.py
class SlurmInfo(BaseModel):
    """Filter options for the ``hpcman`` node/queue report.

    Field validators reject user/account/node/partition filters that do
    not exist in Slurm.  ``slurm_account_info`` is filled in as a side
    effect of :meth:`get_allowed_nodes`.
    """

    model_config = ConfigDict(arbitrary_types_allowed=True)
    users: list[str] = []
    accounts: list[str] = []
    partitions: list[str] = []
    nodes: list[str] = []
    # Regex matched against node states; the default matches everything.
    states: re.Pattern[str] = re.compile(r".", re.DOTALL)
    me: bool = False
    gpus: bool = False
    warn: bool = False

    # Set by get_allowed_nodes(): accounts named directly plus those
    # derived from the requested users' associations.
    slurm_account_info: set[str] = set()

    @field_validator("users")  # type: ignore
    @classmethod
    def check_users(cls, v: list[str]) -> list[str]:
        """Require every entry of ``users`` to be a known Slurm association.

        Raises:
            InvalidSlurmUser: if any entry fails validation.
        """
        # BUG FIX: ``any`` let the list through when just one entry was
        # valid, contradicting the "One or more ... is invalid" message.
        if not all(check_valid_slurm_association(x) for x in v):
            raise InvalidSlurmUser(f"One or more provided users is invalid: {v}")
        return v

    @field_validator("nodes")  # type: ignore
    @classmethod
    def check_nodes(cls, v: list[str]) -> list[str]:
        """Require every entry of ``nodes`` to be a known Slurm node.

        Raises:
            InvalidSlurmNode: if any entry fails validation.
        """
        # BUG FIX: see check_users — every entry must validate, not just one.
        if not all(check_valid_slurm_node(x) for x in v):
            raise InvalidSlurmNode(f"One or more provided nodes is invalid: {v}")
        return v

    @field_validator("accounts")  # type: ignore
    @classmethod
    def check_accounts(cls, v: list[str]) -> list[str]:
        """Require every entry of ``accounts`` to be a known Slurm account.

        Raises:
            InvalidSlurmAccount: if any entry fails validation.
        """
        # BUG FIX: see check_users — every entry must validate, not just one.
        if not all(check_valid_slurm_association(x, "accounts") for x in v):
            raise InvalidSlurmAccount(f"One or more provided accounts is invalid: {v}")
        return v

    @field_validator("partitions")  # type: ignore
    @classmethod
    def check_partitions(cls, v: list[str]) -> list[str]:
        """Require every entry of ``partitions`` to be a known Slurm partition.

        Raises:
            InvalidSlurmPartition: if any entry fails validation.
        """
        # BUG FIX: see check_users — every entry must validate, not just one.
        if not all(check_valid_slurm_partition(part) for part in v):
            raise InvalidSlurmPartition(f"One or more provided partitions is invalid: {v}")
        return v

    def get_allowed_nodes(self) -> set[str]:
        """Generates a set of nodes that should be included in the final output."""
        # Explicitly requested nodes are always included.
        nodes_to_allow: set[str] = set(self.nodes)
        load_partitions()
        from .util import PARTITIONS, ASSOCIATIONS

        # Expand the requested users into the accounts they belong to.
        if ASSOCIATIONS is not None:
            for user in self.users:
                self.slurm_account_info.update(ASSOCIATIONS.users[user].accounts)

        # Explicitly requested accounts always count.
        self.slurm_account_info.update(self.accounts)

        if is_partitions_set(PARTITIONS):
            for part in self.partitions:
                nodes_to_allow.update(nodelist_from_range_str(PARTITIONS[part].nodes))
        else:
            rprint("Error loading Slurm partitions.")
            exit(1)

        if self.slurm_account_info:
            # NOTE(review): partitions were already loaded above, so this
            # second load is presumably a refresh — confirm it is needed.
            load_partitions()
            if is_partitions_set(PARTITIONS):
                for partition in PARTITIONS.values():
                    # A partition is relevant if it is unrestricted or if it
                    # shares at least one account with the filter set.
                    if not partition.allowed_accounts or not set(partition.allowed_accounts).isdisjoint(self.slurm_account_info):
                        nodes_to_allow.update(nodelist_from_range_str(partition.nodes))
        elif not nodes_to_allow:
            # No account or node filters at all: include every known node.
            load_nodes()
            from .util import NODES

            if is_nodes_set(NODES):
                nodes_to_allow = set(NODES.keys())

        return nodes_to_allow

    def generate_node_tree(self) -> Tree:
        """Build a rich ``Tree`` of filtered nodes, their stats and jobs."""
        node_tree = Tree("Slurm Nodes")

        priority_partitions = get_priority_partitions()
        partition_map = get_allowed_accounts_per_partition()
        for node, node_jobs in _gather_filtered_nodes(self):
            warnings = 0
            node_text = Text()
            node_text.append(node.name, style="cyan")
            node_text.append(f" {node.state} ")
            if node.reason:
                node_text.append_text(Text(f"({node.reason}) ", style="red"))
            node_text.append("in ")
            part_text = Text()
            for i, part in enumerate(node.partitions):
                part_name = part
                if i > 0:
                    part_text.append(",")
                if part in priority_partitions:
                    # Priority partitions are starred and shown in bold.
                    part_name = f"{part_name}*"
                    style = Style(color="magenta", bold=True)
                else:
                    style = Style(color="magenta")
                # Strike out partitions none of the filtered accounts can use.
                if self.slurm_account_info and partition_map[part].isdisjoint(self.slurm_account_info):
                    style = Style(color="grey89", strike=True, dim=True)
                part_text.append(part_name, style=style)
            node_text.append_text(part_text)
            if overload := check_if_node_overloaded(node):
                node_text.append(f" **{overload.message}**", style=overload.color)
                warnings += 1
            if overalloc := check_if_mem_overallocated(node):
                # Reuse the CPU warning color so both warnings match visually.
                mem_color = overload.color if overload else overalloc.color
                node_text.append(f" **{overalloc.message}**", style=mem_color)
                warnings += 1
            if self.warn and not warnings:
                # --warn mode: only nodes with at least one warning are shown.
                continue
            node_info = Tree(node_text)
            node_info.add(Tree(f"OS: {node.operating_system}"))
            node_info.add(Tree(get_node_cpu_text(node)))
            node_info.add(Tree(get_node_mem_text(node)))
            if node.configured_gres:
                node_info.add(Tree(get_node_gpu_text(node)))

            # Add job info
            if node_jobs:
                jobtree = Tree("Jobs")
                for job in node_jobs:
                    jobtext = Text()
                    state = SlurmJobState(job.state)
                    try:
                        job_user = job.user_name
                    except KeyError:
                        # Occasionally the uid -> name lookup fails at the
                        # system level; fall back to the raw uid.
                        job_user = f"uid={job.user_id}"
                    if (self.users and job_user in self.users) or job_user == getuser():
                        # BUG FIX: use job_user here; job.user_name can raise
                        # KeyError again after the fallback above.
                        jobtext.append(job_user, "green bold")
                    else:
                        jobtext.append(job_user)
                    jobtext.append(f" {job.id}")
                    state_style = Style()
                    if state.is_fail:
                        state_style = Style(color="red", bold=True)
                    elif state.is_pending:
                        state_style = Style(color="yellow")
                    elif state.is_success:
                        state_style = Style(color="green")

                    jobtext.append(f" {job.state}", style=state_style)
                    try:
                        resources = job.get_resource_layout_per_node()[node.name]
                    except KeyError:
                        resources = {}
                    if not resources:
                        # No per-node layout (e.g. pending job): show the
                        # requested totals instead of allocated resources.
                        if job.state_reason and job.state_reason != "None":
                            jobtext.append(f"({job.state_reason})", style=state_style)
                        jobtext.append(f" {job.name} {job.partition} CPUs={job.cpus} Memory=")
                        try:
                            jobtext.append(f"{humanize(job.memory)}")
                        except AttributeError:
                            jobtext.append("None")
                    else:
                        jobtext.append(f" {job.name} {job.partition} CPUs={job.cpus} Memory=")
                        memory: int | None = resources.get("memory", None)
                        if memory is None:
                            jobtext.append("None")
                        else:
                            jobtext.append(f"{humanize(memory)}")
                        if "gres" in resources:
                            gres = resources["gres"]
                            for key in gres:
                                if key.startswith("gpu"):
                                    gpus = gres[key]
                                    try:
                                        jobtext.append(f" GPUs={key.split(':', 1)[1]}({gpus['count']})")
                                    except IndexError:
                                        # Un-typed "gpu" key: no type suffix.
                                        jobtext.append(f" GPUs={key}({gpus['count']})")
                    jobtree.add(Tree(jobtext))
                node_info.add(jobtree)
            else:
                node_info.add(Tree("No Jobs"))

            # Finished node
            node_tree.add(node_info)

        return node_tree

Fields

Name Type Description
users list[str]
accounts list[str]
partitions list[str]
nodes list[str]
states re.Pattern[str]
me bool
gpus bool
warn bool
slurm_account_info set[str]

Methods

check_users

def check_users(
    v: list[str]
) -> list[str]
Source code in hpcman/hpcman/queue/info.py
    @field_validator("users")  # type: ignore
    @classmethod
    def check_users(cls, v: list[str]) -> list[str]:
        """Validate the ``users`` filter against known Slurm associations."""
        # NOTE(review): ``any`` accepts the list if a single entry is valid,
        # which contradicts the error message — confirm ``all`` was intended.
        if not any([check_valid_slurm_association(x) for x in v]):
            raise InvalidSlurmUser(f"One or more provided users is invalid: {v}")
        else:
            return v

check_nodes

def check_nodes(
    v: list[str]
) -> list[str]
Source code in hpcman/hpcman/queue/info.py
    @field_validator("nodes")  # type: ignore
    @classmethod
    def check_nodes(cls, v: list[str]) -> list[str]:
        """Validate the ``nodes`` filter against known Slurm nodes."""
        # NOTE(review): ``any`` accepts the list if a single entry is valid,
        # which contradicts the error message — confirm ``all`` was intended.
        if not any([check_valid_slurm_node(x) for x in v]):
            raise InvalidSlurmNode(f"One or more provided nodes is invalid: {v}")
        else:
            return v

check_accounts

def check_accounts(
    v: list[str]
) -> list[str]
Source code in hpcman/hpcman/queue/info.py
    @field_validator("accounts")  # type: ignore
    @classmethod
    def check_accounts(cls, v: list[str]) -> list[str]:
        """Validate the ``accounts`` filter against known Slurm accounts."""
        # NOTE(review): ``any`` accepts the list if a single entry is valid,
        # which contradicts the error message — confirm ``all`` was intended.
        if not any([check_valid_slurm_association(x, "accounts") for x in v]):
            raise InvalidSlurmAccount(f"One or more provided accounts is invalid: {v}")
        else:
            return v

check_partitions

def check_partitions(
    v: list[str]
) -> list[str]
Source code in hpcman/hpcman/queue/info.py
    @field_validator("partitions")  # type: ignore
    @classmethod
    def check_partitions(cls, v: list[str]) -> list[str]:
        """Validate the ``partitions`` filter against known Slurm partitions."""
        # NOTE(review): ``any`` accepts the list if a single entry is valid,
        # which contradicts the error message — confirm ``all`` was intended.
        if not any([check_valid_slurm_partition(part) for part in v]):
            raise InvalidSlurmPartition(f"One or more provided partitions is invalid: {v}")
        else:
            return v

get_allowed_nodes

def get_allowed_nodes() -> set[str]

Generates a set of nodes that should be included in the final output.

Source code in hpcman/hpcman/queue/info.py
    def get_allowed_nodes(self) -> set[str]:
        """Generates a set of nodes that should be included in the final output."""
        # Explicitly requested nodes are always included.
        nodes_to_allow: set[str] = set(self.nodes)
        load_partitions()
        from .util import PARTITIONS, ASSOCIATIONS

        # Expand the requested users into the accounts they belong to.
        if ASSOCIATIONS is not None:
            for user in self.users:
                for acct in ASSOCIATIONS.users[user].accounts:
                    self.slurm_account_info.add(acct)

        # Explicitly requested accounts always count.
        for acct in self.accounts:
            self.slurm_account_info.add(acct)

        if is_partitions_set(PARTITIONS):
            for part in self.partitions:
                nodes_to_allow.update(nodelist_from_range_str(PARTITIONS[part].nodes))
        else:
            rprint("Error loading Slurm partitions.")
            exit(1)

        if self.slurm_account_info:
            # NOTE(review): partitions were already loaded above — confirm
            # this second load_partitions() is intentionally a refresh.
            load_partitions()
            if is_partitions_set(PARTITIONS):
                for partition in PARTITIONS.values():
                    # Unrestricted partitions, or ones sharing an account with
                    # the filter set, contribute their nodes.
                    if not partition.allowed_accounts or not set(partition.allowed_accounts).isdisjoint(self.slurm_account_info):
                        nodes_to_allow.update(nodelist_from_range_str(partition.nodes))
        else:
            if not nodes_to_allow:
                # No filters at all: fall back to every known node.
                load_nodes()
                from .util import NODES

                if is_nodes_set(NODES):
                    nodes_to_allow = set(NODES.keys())

        return nodes_to_allow

generate_node_tree

def generate_node_tree() -> Tree
Source code in hpcman/hpcman/queue/info.py
    def generate_node_tree(self) -> Tree:
        """Build a rich ``Tree`` of filtered nodes, their stats and jobs."""
        node_tree = Tree("Slurm Nodes")

        priority_partitions = get_priority_partitions()
        partition_map = get_allowed_accounts_per_partition()
        filtered = _gather_filtered_nodes(self)
        for node, node_jobs in filtered:
            warnings = 0
            node_text = Text()
            # Text.append returns self, so the re-assignment is a no-op.
            node_text = node_text.append(node.name, style="cyan")
            node_text.append(f" {node.state} ")
            if node.reason:
                node_text.append_text(Text(f"({node.reason}) ", style="red"))
            node_text.append("in ")
            part_text = Text()
            for i, part in enumerate(node.partitions):
                part_name = part
                if i > 0:
                    part_text.append(",")
                if part in priority_partitions:
                    # Priority partitions are starred and shown in bold.
                    part_name = f"{part_name}*"
                    style = Style(color="magenta", bold=True)
                else:
                    style = Style(color="magenta")
                # Strike out partitions none of the filtered accounts can use.
                if len(self.slurm_account_info) > 0 and partition_map[part].isdisjoint(self.slurm_account_info):
                    style = Style(color="grey89", strike=True, dim=True)
                part_text.append(part_name, style=style)
            node_text.append_text(part_text)
            if overload := check_if_node_overloaded(node):
                node_text.append(f" **{overload.message}**", style=overload.color)
                warnings += 1
            if overalloc := check_if_mem_overallocated(node):
                # Reuse the CPU warning color so both warnings match visually.
                mem_color = overload.color if overload else overalloc.color
                node_text.append(f" **{overalloc.message}**", style=mem_color)
                warnings += 1
            if self.warn and not warnings:
                # --warn mode: only nodes with at least one warning are shown.
                continue
            node_info = Tree(node_text)
            node_info.add(Tree(f"OS: {node.operating_system}"))
            node_info.add(Tree(get_node_cpu_text(node)))
            node_info.add(Tree(get_node_mem_text(node)))
            if node.configured_gres:
                node_info.add(Tree(get_node_gpu_text(node)))

            # Add job info
            if node_jobs:
                jobtree = Tree("Jobs")
                for job in node_jobs:
                    jobtext = Text()
                    state = SlurmJobState(job.state)
                    try:
                        job_user = job.user_name
                    except KeyError:
                        # This happens occasionally when user id lookup fails at the system level
                        job_user = f"uid={job.user_id}"
                    if self.users and job_user in self.users or job_user == getuser():
                        # NOTE(review): job.user_name can raise KeyError again
                        # after the fallback above — confirm job_user was meant.
                        jobtext.append(job.user_name, "green bold")
                    else:
                        jobtext.append(job_user)
                    jobtext.append(f" {job.id}")
                    state_style = Style()
                    if state.is_fail:
                        state_style = Style(color="red", bold=True)
                    elif state.is_pending:
                        state_style = Style(color="yellow")
                    elif state.is_success:
                        state_style = Style(color="green")

                    jobtext.append(f" {job.state}", style=state_style)
                    try:
                        resources = job.get_resource_layout_per_node()[node.name]
                    except KeyError:
                        resources = {}
                    if not resources:
                        # No per-node layout (e.g. pending job): show the
                        # requested totals instead of allocated resources.
                        if job.state_reason and job.state_reason != "None":
                            jobtext.append(f"({job.state_reason})", style=state_style)
                        jobtext.append(f" {job.name} {job.partition} CPUs={job.cpus} Memory=")
                        try:
                            jobtext.append(f"{humanize(job.memory)}")
                        except AttributeError:
                            jobtext.append("None")
                    else:
                        jobtext.append(f" {job.name} {job.partition} CPUs={job.cpus} Memory=")
                        memory: int | None = resources.get("memory", None)
                        if memory is None:
                            jobtext.append("None")
                        else:
                            jobtext.append(f"{humanize(memory)}")
                        if "gres" in resources:
                            gres = resources["gres"]
                            for key in gres:
                                if key.startswith("gpu"):
                                    gpus = gres[key]
                                    try:
                                        jobtext.append(f" GPUs={key.split(':', 1)[1]}({gpus['count']})")
                                    except IndexError:
                                        # Un-typed "gpu" key: no type suffix.
                                        jobtext.append(f" GPUs={key}({gpus['count']})")
                    jobtree.add(Tree(jobtext))
                node_info.add(jobtree)
            else:
                node_info.add(Tree("No Jobs"))

            # Finished node
            node_tree.add(node_info)

        return node_tree

Functions

get_jobs_by_node

def get_jobs_by_node() -> dict[str, list[SlurmJobInfo]]

Loads jobs into a dict per node allocation. Ignores non-allocated jobs.

Source code in hpcman/hpcman/queue/info.py
def get_jobs_by_node() -> dict[str, list[SlurmJobInfo]]:
    """Group jobs by the node(s) they occupy.

    Jobs with an allocation are filed under every allocated node; jobs
    that only require nodes are filed under that requirement string.
    Jobs with neither are ignored.
    """
    backend, _ = get_backend()
    grouped: dict[str, list[SlurmJobInfo]] = {}
    for job in backend.load_jobs().values():
        if job.allocated_nodes is not None:
            targets = nodelist_from_range_str(job.allocated_nodes)
        elif job.required_nodes is not None:
            targets = [job.required_nodes]
        else:
            continue
        for target in targets:
            grouped.setdefault(target, []).append(job)
    return grouped

get_style_from_range

def get_style_from_range(
    value: int
) -> Style

Provides a set Style color for a range of input values between 0-100.

Closer to 100 is green while closer to 0 is red.

Source code in hpcman/hpcman/queue/info.py
def get_style_from_range(value: int) -> Style:
    """Provides a set Style color for a range of input values between 0-100.

    Closer to 100 is green while closer to 0 is red.  Values above 100 are
    treated as 100 and values below 0 as 0.
    """
    # BUG FIX: the old range()/`< 9` checks left gaps — value == 9 and
    # value == 100 both fell through to a "white" branch, contradicting the
    # documented red-to-green scale.  Threshold comparisons close the gaps.
    if value >= 70:
        return Style(color="green")
    if value >= 40:
        return Style(color="yellow")
    if value >= 10:
        return Style(color="bright_red")
    return Style(color="red")

check_if_node_overloaded

def check_if_node_overloaded(
    node: SlurmNode
) -> Overload

Compares CPU load to use to determine utilization/overallocation.

Source code in hpcman/hpcman/queue/info.py
def check_if_node_overloaded(node: SlurmNode) -> Overload:
    """Compares CPU load to use to determine utilization/overallocation.

    Returns a truthy :class:`Overload` describing the first matching
    condition, or a falsy one when the node looks healthy.
    """
    # A node reporting zero CPUs cannot be judged; report healthy.
    if node.total_cpus == 0:
        return Overload(False)
    # NOTE(review): the 1.10 factor pads the load/total ratio by 10%, and the
    # padded ratio is what gets reported in the message — confirm intended.
    if (ratio := node.cpu_load / node.total_cpus * 1.10) > 1:
        return Overload(True, f"Load > {ratio:.2f}X total cpu")
    elif (ratio := node.cpu_load / max(node.allocated_cpus, 1)) > 1.5:
        # Load well above the allocation: jobs are using more than requested.
        return Overload(True, f"Load > {ratio:.2f}X allocated cpu")
    elif (ratio := max(node.allocated_cpus, 1) / max(node.cpu_load, 1)) > 2:
        # Allocation well above load: reserved CPUs sitting idle (yellow).
        return Overload(True, f"Under-utilized CPU allocation > {ratio:.2f}X CPU Load", "bold yellow")
    else:
        return Overload(False)

check_if_mem_overallocated

def check_if_mem_overallocated(
    node: SlurmNode
) -> Overload

Compares memory allocation to use to determine utilization/overallocation.

Source code in hpcman/hpcman/queue/info.py
def check_if_mem_overallocated(node: SlurmNode) -> Overload:
    """Compares memory allocation to use to determine utilization/overallocation.

    Returns a truthy :class:`Overload` for under-utilized or nearly-full
    memory; falsy when healthy or when the node reports no memory figures.
    """
    # Any missing figure makes the ratios meaningless; report healthy.
    if None in (node.allocated_memory, node.free_memory, node.real_memory):
        return Overload(False)
    elif (ratio := node.allocated_memory / max(node.real_memory - node.free_memory, 1)) > 2:
        # Allocation far exceeds actual use (real - free): idle reservation.
        return Overload(True, f"Under-utilized memory allocation > {ratio:.2f}X Mem Used", "bold yellow")
    elif (avail := node.real_memory - node.allocated_memory) < 8000:
        # NOTE(review): 8000 appears to be a megabyte threshold (~8 GB) given
        # the humanize() call — confirm the unit.
        return Overload(True, f"Low Mem Available: {humanize(avail)}")
    else:
        return Overload(False)

get_node_cpu_text

def get_node_cpu_text(
    node: SlurmNode
) -> Text

Formatting node cpu information as a rich.Text object.

Source code in hpcman/hpcman/queue/info.py
def get_node_cpu_text(node: SlurmNode) -> Text:
    """Formatting node cpu information as a rich.Text object."""
    if node.threads_per_core > 1:
        # SMT/hyper-threading: label distinguishes logical from physical CPUs.
        cpu_arch = f"{node.architecture} SMT CPUs"
    else:
        cpu_arch = f"{node.architecture} CPUs"
    cpu_text = Text(f"{cpu_arch}: Physical={node.cores_per_socket * node.sockets} Total={node.total_cpus}")
    cpu_text.append(" Free=")
    # BUG FIX: guard against total_cpus == 0 — check_if_node_overloaded treats
    # that as a valid state, so it must not raise ZeroDivisionError here.
    cpu_style = get_style_from_range(int(node.idle_cpus / max(node.total_cpus, 1) * 100))
    cpu_text.append_text(Text(f"{node.idle_cpus}", style=cpu_style))
    cpu_text.append(" Allocated=")
    cpu_text.append_text(Text(f"{node.allocated_cpus}", style=cpu_style))
    cpu_text.append(" Load=")
    # Highlight the load figure in red when the node is considered overloaded.
    if check_if_node_overloaded(node):
        cpu_text.append(f"{node.cpu_load}", style="red")
    else:
        cpu_text.append(f"{node.cpu_load}")

    return cpu_text

get_node_mem_text

def get_node_mem_text(
    node: SlurmNode
) -> Text

Formatting node mem information as a rich.Text object.

Source code in hpcman/hpcman/queue/info.py
def get_node_mem_text(node: SlurmNode) -> Text:
    """Formatting node mem information as a rich.Text object."""
    mem_text = Text(f"Memory: Total={humanize(node.real_memory)}")
    # NOTE(review): allocated_memory/free_memory may be None on some nodes
    # (see check_if_mem_overallocated) — this subtraction would then raise
    # TypeError; confirm callers filter such nodes first.
    available_mem: int = node.real_memory - node.allocated_memory
    # BUG FIX: guard against real_memory == 0 so a misreporting node does
    # not crash the report with ZeroDivisionError.
    mem_style = get_style_from_range(int(available_mem / max(node.real_memory, 1) * 100))
    mem_text.append(" Available=")
    mem_text.append_text(Text(f"{humanize(available_mem)}", style=mem_style))
    mem_text.append(" Allocated=")
    mem_text.append_text(Text(f"{humanize(node.allocated_memory)}", style=mem_style))
    mem_text.append(f" Free={humanize(node.free_memory)}")

    return mem_text

get_node_gpu_text

def get_node_gpu_text(
    node: SlurmNode
) -> Text

Formatting node gpu information as a rich.Text object.

Source code in hpcman/hpcman/queue/info.py
def get_node_gpu_text(node: SlurmNode) -> Text:
    """Formatting node gpu information as a rich.Text object."""
    gpu_text = Text("GPUs: ")  # plain literal; no interpolation needed
    for gres in node.configured_gres:
        if gres.startswith("gpu"):
            try:
                gpu_text.append(f"Type={gres.split(':', 1)[1]} Total={node.configured_gres[gres]}")
            except IndexError:
                # Un-typed gres key (plain "gpu"): no type component to show.
                gpu_text.append(f"{node.configured_gres[gres]} Total=NA")

    for gres in node.allocated_gres:
        if gres.startswith("gpu"):
            try:
                gresname = gres.split(":", 1)[1]
            except IndexError:
                gresname = gres
            # BUG FIX: ``gres`` is the actual key of allocated_gres; the old
            # lookup by the split suffix ``gresname`` raised KeyError for
            # "gpu:<type>" keys and silently dropped the Allocated= figure.
            # Prefer the suffix for backward compatibility, but fall back to
            # the known-good key instead of showing nothing.
            count = node.allocated_gres.get(gresname, node.allocated_gres[gres])
            gpu_text.append(f" Allocated={count}")

    return gpu_text

def print_node_tree(
    kwargs = {}
) -> None

Generates and prints node tree.

Source code in hpcman/hpcman/queue/info.py
def print_node_tree(**kwargs) -> None:
    """Generates and prints node tree.

    Keyword Args:
        json_output (bool): emit JSON via ``_print_info_json`` instead of a tree.
        debug (bool): additionally print the parsed ``SlurmInfo`` model.
        Remaining keys are forwarded to :class:`SlurmInfo`; ``None`` values
        are dropped so the model's defaults apply.
    """
    json_output = kwargs.pop("json_output", False)
    # BUG FIX: the old ``kwargs["debug"]`` raised KeyError when the caller
    # did not supply ``debug``; pop it with a safe default instead.
    debug = kwargs.pop("debug", False)
    console = Console()
    try:
        info = SlurmInfo(**{k: v for k, v in kwargs.items() if v is not None})
    except ValidationError as e:
        handle_validation_errors(e, print_error=False)
        # BUG FIX: bail out here — if the handler returns instead of exiting,
        # ``info`` would be unbound below and raise NameError.
        return
    if json_output:
        _print_info_json(info)
        return
    console.print(info.generate_node_tree())
    if debug:
        console.print(info)