Skip to content

info

SlurmInfo

Bases: BaseModel

Source code in hpcman/hpcman/queue/info.py
class SlurmInfo(BaseModel):
    """Validated filter settings for querying and rendering Slurm node status.

    Each list field is checked against the live Slurm configuration by a
    field validator, which raises a field-specific Invalid* exception when
    an entry does not exist.
    """

    model_config = ConfigDict(arbitrary_types_allowed=True)
    users: list[str] = []
    accounts: list[str] = []
    partitions: list[str] = []
    nodes: list[str] = []
    # Default pattern matches every node state (DOTALL so '.' also spans newlines).
    states: re.Pattern[str] = re.compile(r'.', re.DOTALL)
    me: bool = False
    gpus: bool = False
    warn: bool = False

    # Populated by get_allowed_nodes(): accounts gathered from the `users`
    # associations plus the explicit `accounts` list.
    slurm_account_info: set[str] = set()

    @field_validator("users")  # type: ignore
    @classmethod
    def check_users(cls, v: list[str]) -> list[str]:
        # all() (not any()): every entry must be valid, matching the error
        # message. any() let invalid users through whenever at least one
        # entry was valid, and rejected an explicitly-passed empty list.
        if not all(check_valid_slurm_association(x) for x in v):
            raise InvalidSlurmUser(f"One or more provided users is invalid: {v}")
        return v

    @field_validator("nodes")  # type: ignore
    @classmethod
    def check_nodes(cls, v: list[str]) -> list[str]:
        # See check_users for the any() -> all() rationale.
        if not all(check_valid_slurm_node(x) for x in v):
            raise InvalidSlurmNode(f"One or more provided nodes is invalid: {v}")
        return v

    @field_validator("accounts")  # type: ignore
    @classmethod
    def check_accounts(cls, v: list[str]) -> list[str]:
        if not all(check_valid_slurm_association(x, "accounts") for x in v):
            raise InvalidSlurmAccount(f"One or more provided accounts is invalid: {v}")
        return v

    @field_validator("partitions")  # type: ignore
    @classmethod
    def check_partitions(cls, v: list[str]) -> list[str]:
        if not all(check_valid_slurm_partition(part) for part in v):
            raise InvalidSlurmPartition(f"One or more provided partitions is invalid: {v}")
        return v

    def get_allowed_nodes(self) -> set[str]:
        """Generates a set of nodes that should be included in the final output.

        Combines explicitly requested nodes, the nodes of requested partitions,
        and the nodes of any partition usable by the selected users/accounts.
        With no filters at all, every known node is allowed.
        """
        nodes_to_allow: set[str] = set(self.nodes)
        load_partitions()
        from .util import PARTITIONS, ASSOCIATIONS

        # Expand each requested user into the accounts they belong to.
        if ASSOCIATIONS is not None:
            for user in self.users:
                for acct in ASSOCIATIONS.users[user].accounts:
                    self.slurm_account_info.add(acct)

        for acct in self.accounts:
            self.slurm_account_info.add(acct)

        if is_partitions_set(PARTITIONS):
            for part in self.partitions:
                nodes_to_allow.update(nodelist_from_range_str(PARTITIONS[part].nodes))
        else:
            rprint("Error loading Slurm partitions.")
            exit(1)

        if self.slurm_account_info:
            load_partitions()
            if is_partitions_set(PARTITIONS):
                for partition in PARTITIONS.values():
                    # Any overlap between the partition's allowed accounts and
                    # the selected accounts admits that partition's nodes.
                    if not set(partition.allowed_accounts).isdisjoint(self.slurm_account_info):
                        nodes_to_allow.update(nodelist_from_range_str(partition.nodes))
        else:
            if not nodes_to_allow:
                # No filters selected anything: fall back to all known nodes.
                load_nodes()
                from .util import NODES
                if is_nodes_set(NODES):
                    nodes_to_allow = set(NODES.keys())

        return nodes_to_allow

    def generate_node_tree(self) -> Tree:
        """Builds a rich Tree of filtered nodes with hardware and job details."""
        node_tree = Tree("Slurm Nodes")

        priority_partitions = get_priority_partitions()
        partition_map = get_allowed_accounts_per_partition()
        nodes_to_allow = self.get_allowed_nodes()
        jobs = get_jobs_by_node()
        for node in get_nodes_values():
            warnings = 0
            # Skip nodes filtered out by name, partition, GPU, or state filters.
            if node.name is None or node.name not in nodes_to_allow or not node.partitions:
                continue
            elif self.gpus and not node.configured_gres:
                continue
            elif not re.search(self.states, node.state):
                continue
            node_text = Text()
            node_text = node_text.append(node.name, style="cyan")
            node_text.append(f" {node.state} in ")
            part_text = Text()
            for i, part in enumerate(node.partitions):
                part_name = part
                if i > 0:
                    part_text.append(",")
                if part in priority_partitions:
                    part_name = f"{part_name}*"
                    style = Style(color="magenta", bold=True)
                else:
                    style = Style(color="magenta")
                # Dim/strike partitions that none of the selected accounts can use.
                if len(self.slurm_account_info) > 0 and partition_map[part].isdisjoint(self.slurm_account_info):
                    style = Style(color="grey89", strike=True, dim=True)
                part_text.append(part_name, style=style)
            node_text.append_text(part_text)
            if (overload := check_if_node_overloaded(node)):
                node_text.append(f" **{overload.message}**", style=overload.color)
                warnings += 1
            if (overalloc := check_if_mem_overallocated(node)):
                # Bug fix: previously styled with overload.color (the CPU
                # check's color), mis-coloring the memory warning.
                node_text.append(f" **{overalloc.message}**", style=overalloc.color)
                warnings += 1
            # With --warn, only show nodes that raised at least one warning.
            if self.warn and not warnings:
                continue
            node_info = Tree(node_text)
            node_info.add(Tree(f"OS: {node.operating_system}"))
            node_info.add(Tree(get_node_cpu_text(node)))
            node_info.add(Tree(get_node_mem_text(node)))
            if node.configured_gres:
                node_info.add(Tree(get_node_gpu_text(node)))

            # Add job info
            if node.name in jobs:
                jobtree = Tree("Jobs")
                for job in jobs[node.name]:
                    jobtext = Text()
                    state = SlurmJobState(job.state)
                    try:
                        job_user = job.user_name
                    except KeyError:
                        # This happens occasionally when user id lookup fails at the system level
                        job_user = f'uid={job.user_id}'
                    if (self.users and job_user in self.users) or job_user == getuser():
                        # Bug fix: append job_user (the safe fallback value)
                        # rather than job.user_name, which can raise KeyError
                        # again after the fallback above.
                        jobtext.append(job_user, "green bold")
                    else:
                        jobtext.append(job_user)
                    jobtext.append(f" {job.id}")
                    state_style = Style()
                    if state.is_fail:
                        state_style = Style(color="red", bold=True)
                    elif state.is_pending:
                        state_style = Style(color="yellow")
                    elif state.is_success:
                        state_style = Style(color="green")

                    jobtext.append(f" {job.state}", style=state_style)
                    try:
                        resources = job.get_resource_layout_per_node()[node.name]
                    except KeyError:
                        resources = {}
                    if not resources:
                        # Pending job: no per-node layout yet, show the reason.
                        jobtext.append(f"({job.state_reason})", style=state_style)
                        jobtext.append(f" {job.name} {job.partition} CPUs={job.cpus} Memory=")
                        try:
                            jobtext.append(f"{humanize(job.memory)}")
                        except AttributeError:
                            jobtext.append("None")
                    else:
                        jobtext.append(f" {job.name} {job.partition} CPUs={job.cpus} Memory=")
                        memory: int | None = resources.get('memory', None)
                        if memory is None:
                            jobtext.append("None")
                        else:
                            jobtext.append(f"{humanize(memory)}")
                        if 'gres' in resources:
                            gres = resources['gres']
                            for key in gres:
                                if key.startswith('gpu'):
                                    gpus = gres[key]
                                    try:
                                        # gres keys look like "gpu:<type>"; fall
                                        # back to the raw key if no type suffix.
                                        jobtext.append(f" GPUs={key.split(':', 1)[1]}({gpus['count']})")
                                    except IndexError:
                                        jobtext.append(f" GPUs={key}({gpus['count']})")
                    jobtree.add(Tree(jobtext))
                node_info.add(jobtree)
            else:
                node_info.add(Tree("No Jobs"))

            # Finished node
            node_tree.add(node_info)

        return node_tree

get_allowed_nodes()

Generates a set of nodes that should be included in the final output.

Source code in hpcman/hpcman/queue/info.py
def get_allowed_nodes(self) -> set[str]:
    """Generates a set of nodes that should be included in the final output."""
    allowed: set[str] = set(self.nodes)
    load_partitions()
    from .util import PARTITIONS, ASSOCIATIONS

    # Expand selected users into the accounts their associations grant.
    if ASSOCIATIONS is not None:
        for user in self.users:
            self.slurm_account_info.update(ASSOCIATIONS.users[user].accounts)

    self.slurm_account_info.update(self.accounts)

    if not is_partitions_set(PARTITIONS):
        rprint("Error loading Slurm partitions.")
        exit(1)
    for part in self.partitions:
        allowed.update(nodelist_from_range_str(PARTITIONS[part].nodes))

    if self.slurm_account_info:
        load_partitions()
        if is_partitions_set(PARTITIONS):
            for partition in PARTITIONS.values():
                # Any shared account admits the partition's whole node list.
                if self.slurm_account_info & set(partition.allowed_accounts):
                    allowed.update(nodelist_from_range_str(partition.nodes))
    elif not allowed:
        # No filters selected anything: fall back to every known node.
        load_nodes()
        from .util import NODES
        if is_nodes_set(NODES):
            allowed = set(NODES.keys())

    return allowed

check_if_mem_overallocated(node)

Compares memory allocation to actual memory usage to determine utilization/overallocation.

Source code in hpcman/hpcman/queue/info.py
def check_if_mem_overallocated(node: pyslurm.Node) -> Overload:
    """Compares memory allocation to use to determine utilization/overallocation."""
    alloc, free, real = node.allocated_memory, node.free_memory, node.real_memory
    # No verdict when any of the memory figures is unreported.
    if alloc is None or free is None or real is None:
        return Overload(False)
    used = max(real - free, 1)  # avoid division by zero on idle nodes
    if (ratio := alloc / used) > 2:
        return Overload(True, f"Under-utilized memory allocation > {ratio:.2f}X Mem Used", "bold yellow")
    avail = real - alloc
    if avail < 8000:
        return Overload(True, f"Low Mem Available: {humanize(avail)}")
    return Overload(False)

check_if_node_overloaded(node)

Compares CPU load to CPU allocation to determine utilization/overallocation.

Source code in hpcman/hpcman/queue/info.py
def check_if_node_overloaded(node: pyslurm.Node) -> Overload:
    """Compares CPU load to use to determine utilization/overallocation.

    Flags, in priority order:
      1. load above the total CPU count (with 10% headroom),
      2. load above 1.5x the allocated CPUs,
      3. allocation more than 2x the measured load (under-utilization).
    """
    # Bug fix: the original `node.cpu_load / node.total_cpus*1.10` multiplied
    # the *ratio* by 1.10 (flagging nodes at only ~91% of total CPUs and
    # inflating the reported ratio); the parentheses grant the intended 10%
    # headroom on the CPU count instead.
    if (ratio := node.cpu_load / (node.total_cpus * 1.10)) > 1:
        return Overload(True, f"Load > {ratio:.2f}X total cpu")
    elif (ratio := node.cpu_load / max(node.allocated_cpus, 1)) > 1.5:
        return Overload(True, f"Load > {ratio:.2f}X allocated cpu")
    elif (ratio := max(node.allocated_cpus, 1) / max(node.cpu_load, 1)) > 2:
        return Overload(True, f"Under-utilized CPU allocation > {ratio:.2f}X CPU Load", "bold yellow")
    else:
        return Overload(False)

get_jobs_by_node()

Loads jobs into a dict per node allocation. Ignores non-allocated jobs.

Source code in hpcman/hpcman/queue/info.py
def get_jobs_by_node() -> dict[str, list[pyslurm.Job]]:
    """Loads jobs into a dict per node allocation. Ignores non-allocated jobs."""
    by_node: dict[str, list[pyslurm.Job]] = {}
    for job in pyslurm.Jobs.load().values():
        if job.allocated_nodes is not None:
            # Expand the node range string into individual node names.
            for name in pyslurm.utils.nodelist_from_range_str(job.allocated_nodes):
                by_node.setdefault(name, []).append(job)
        elif job.required_nodes is not None:
            by_node.setdefault(job.required_nodes, []).append(job)
    return by_node

get_node_cpu_text(node)

Formatting node cpu information as a rich.Text object.

Source code in hpcman/hpcman/queue/info.py
def get_node_cpu_text(node: pyslurm.Node) -> Text:
    """Formatting node cpu information as a rich.Text object."""
    # Label SMT (hyperthreaded) architectures explicitly.
    smt_label = " SMT" if node.threads_per_core > 1 else ""
    physical = node.cores_per_socket * node.sockets
    cpu_text = Text(f"{node.architecture}{smt_label} CPUs: Physical={physical} Total={node.total_cpus}")
    # Color scales with the fraction of CPUs still idle.
    usage_style = get_style_from_range(int(node.idle_cpus / node.total_cpus * 100))
    cpu_text.append(" Free=")
    cpu_text.append_text(Text(f"{node.idle_cpus}", style=usage_style))
    cpu_text.append(" Allocated=")
    cpu_text.append_text(Text(f"{node.allocated_cpus}", style=usage_style))
    cpu_text.append(" Load=")
    if check_if_node_overloaded(node):
        cpu_text.append(f"{node.cpu_load}", style="red")
    else:
        cpu_text.append(f"{node.cpu_load}")

    return cpu_text

get_node_gpu_text(node)

Formatting node gpu information as a rich.Text object.

Source code in hpcman/hpcman/queue/info.py
def get_node_gpu_text(node: pyslurm.Node) -> Text:
    """Formatting node gpu information as a rich.Text object."""
    gpu_text = Text(f"GPUs: ")
    # Configured GRES keys look like "gpu:<type>"; show the type when present.
    for gres in node.configured_gres:
        if not gres.startswith("gpu"):
            continue
        pieces = gres.split(':', 1)
        if len(pieces) > 1:
            gpu_text.append(f"Type={pieces[1]} Total={node.configured_gres[gres]}")
        else:
            gpu_text.append(f"{node.configured_gres[gres]} Total=NA")

    for gres in node.allocated_gres:
        if not gres.startswith("gpu"):
            continue
        pieces = gres.split(':', 1)
        gresname = pieces[1] if len(pieces) > 1 else gres
        # The type-suffix key may not exist in the allocated map; skip if so.
        if gresname in node.allocated_gres:
            gpu_text.append(f" Allocated={node.allocated_gres[gresname]}")

    return gpu_text

get_node_mem_text(node)

Formatting node mem information as a rich.Text object.

Source code in hpcman/hpcman/queue/info.py
def get_node_mem_text(node: pyslurm.Node) -> Text:
    """Formatting node mem information as a rich.Text object."""
    available: int = node.real_memory - node.allocated_memory
    # Color scales with the percentage of memory still unallocated.
    pct_style = get_style_from_range(int(available / node.real_memory * 100))
    mem_text = Text(f"Memory: Total={humanize(node.real_memory)}")
    mem_text.append(" Available=")
    mem_text.append_text(Text(f"{humanize(available)}", style=pct_style))
    mem_text.append(" Allocated=")
    mem_text.append_text(Text(f"{humanize(node.allocated_memory)}", style=pct_style))
    mem_text.append(f" Free={humanize(node.free_memory)}")

    return mem_text

get_priority_partitions(default_priority=100)

Compares partitions to default priority (100) to determine if any priority partitions are available.

Source code in hpcman/hpcman/queue/info.py
def get_priority_partitions(default_priority: int = 100) -> set[str]:
    """Compares partitions to default priority (100) to determine if any priority partitions are available."""
    load_partitions()
    from .util import PARTITIONS
    # Guard clause: bail out when partition data could not be loaded.
    if not is_partitions_set(PARTITIONS):
        rprint("Unable to load partition data. Exiting.")
        exit(1)
    return {p.name for p in PARTITIONS.values() if p.priority_tier > default_priority}

get_style_from_range(value)

Provides a set Style color for a range of input values between 0-100.

Closer to 100 is green while closer to 0 is red.

Source code in hpcman/hpcman/queue/info.py
def get_style_from_range(value: int) -> Style:
    """Provides a set Style color for a range of input values between 0-100.

    Closer to 100 is green while closer to 0 is red.

    Bug fix: the original range() membership tests left gaps — value == 100
    was not `in range(70, 100)` and value == 9 was neither `in range(10, 40)`
    nor `< 9`, so both fell through to white. Threshold comparisons cover
    the whole domain with no gaps.
    """
    if value >= 70:
        return Style(color="green")
    elif value >= 40:
        return Style(color="yellow")
    elif value >= 10:
        return Style(color="bright_red")
    else:
        return Style(color="red")

print_node_tree(**kwargs)

Generates and prints node tree.

Source code in hpcman/hpcman/queue/info.py
def print_node_tree(**kwargs) -> None:
    """Generates and prints node tree.

    Filters whose value is None are dropped so SlurmInfo's defaults apply.
    """
    console = Console()
    try:
        info = SlurmInfo(**{k: v for k, v in kwargs.items() if v is not None})
    except ValidationError as e:
        handle_validation_errors(e, print_error=False)
        # Bug fix: `info` is unbound in this branch; if the handler returns
        # instead of exiting, the original crashed with NameError below.
        return
    console.print(info.generate_node_tree())
    # Bug fix: .get() avoids a KeyError when "debug" was not passed at all.
    if kwargs.get("debug"):
        console.print(info)