Source code for lapis.job

import logging
from typing import Optional, TYPE_CHECKING, Dict

from usim import time, Queue
from usim import CancelTask

from lapis.monitor.core import sampling_required

if TYPE_CHECKING:
    from lapis.workernode import WorkerNode


class Job(object):
    __slots__ = (
        "resources",
        "used_resources",
        "walltime",
        "requested_walltime",
        "queue_date",
        "in_queue_since",
        "in_queue_until",
        "_name",
        "drone",
        "_success",
    )

    def __init__(
        self,
        resources: Dict[str, float],
        used_resources: Dict[str, float],
        in_queue_since: float = 0,
        queue_date: float = 0,
        name: str = None,
    ):
        """
        Definition of a job that uses a specified amount of resources `used_resources`
        over a given amount of time, `walltime`. A job is described by its user
        via the parameter `resources`. This is a user prediction and is expected
        to deviate from `used_resources`.

        :param resources: Requested resources of the job
        :param used_resources: Resource usage of the job
        :param in_queue_since: Time when job was inserted into the queue of the
                               simulation scheduler
        :param queue_date: Time when job was inserted into queue in real life
        :param name: Name of the job
        """
        self.resources = resources
        self.used_resources = used_resources
        for key in used_resources:
            if key not in resources:
                logging.getLogger("implementation").info(
                    "job uses different resources than specified, added %s: %s",
                    key,
                    self.used_resources[key],
                )
                self.resources[key] = self.used_resources[key]
        self.walltime: float = used_resources.pop("walltime")
        """the job's runtime, in reality as well as in the simulation"""
        self.requested_walltime: Optional[float] = resources.pop("walltime", None)
        """estimate of the job's walltime"""
        self.queue_date = queue_date
        """ point in time when the job was submitted to the simulated job queue"""
        assert in_queue_since >= 0, "Queue time cannot be negative"
        self.in_queue_since = in_queue_since
        """Time when job was inserted into the queue of the simulation scheduler"""
        self.in_queue_until: Optional[float] = None
        """point in time when the job left the job queue"""
        self.drone = None
        self._name = name
        """identifier of the job"""
        self._success: Optional[bool] = None
        """flag indicating whether the job was completed successfully"""

    @property
    def name(self) -> str:
        return self._name or id(self)

    @property
    def successful(self) -> Optional[bool]:
        return self._success

    @property
    def waiting_time(self) -> float:
        """
        The time the job spent in the simulators scheduling queue. `Inf` when
        the job is still waiting.

        :return: Time in queue
        """
        if self.in_queue_until is not None:
            return self.in_queue_until - self.in_queue_since
        return float("Inf")

    async def run(self, drone: "WorkerNode"):
        assert drone, "Jobs cannot run without a drone being assigned"
        self.drone = drone
        self.in_queue_until = time.now
        self._success = None
        await sampling_required.put(self)
        try:
            await (time + self.walltime)
        except CancelTask:
            self.drone = None
            self._success = False
        except BaseException:
            self.drone = None
            self._success = False
            raise
        else:
            self.drone = None
            self._success = True
        await sampling_required.put(self)

    def __repr__(self):
        return "<%s: %s>" % (self.__class__.__name__, self._name or id(self))


[docs]async def job_to_queue_scheduler(job_generator, job_queue: Queue): """ Handles reading the simulation's job input and puts the job's into the job queue :param job_generator: reader object that yields jobs from input :param job_queue: queue the jobs are added to """ base_date = None for job in job_generator: if base_date is None: base_date = job.queue_date current_time = job.queue_date - base_date if time.now < current_time: await (time >= current_time) job.in_queue_since = time.now await job_queue.put(job) await job_queue.close()