Source code for lapis.drone

from typing import Callable, Optional, TYPE_CHECKING

from usim import time, Scope, instant, ResourcesUnavailable

from lapis.workernode import WorkerNode
from lapis.cachingjob import CachingJob
from lapis.monitor.duplicates import DroneStatusCaching

if TYPE_CHECKING:
    from lapis.caching.connection import Connection


class Drone(WorkerNode):
    """
    Represents worker nodes in the simulation.
    """

    def __init__(
        self,
        scheduler,
        pool_resources: Optional[dict] = None,
        scheduling_duration: Optional[float] = None,
        ignore_resources: Optional[list] = None,
        sitename: Optional[str] = None,
        connection: Optional["Connection"] = None,
        empty: Callable = lambda drone: drone.theoretical_available_resources.get(
            "cores", 1
        ) < 1,
    ):
        """
        Drone initialization

        :param scheduler: scheduler that assigns jobs to the drone
        :param pool_resources: dict of the drone's resources
        :param scheduling_duration: amount of time that passes between the
            drone's start-up and its registration at the scheduler
        :param ignore_resources: list of resource keys that are ignored,
            e.g. "disk"
        :param sitename: identifier used to determine which caches a drone
            can use
        :param connection: connection object that holds the remote connection
            and handles file transfers
        :param empty: callable that determines whether the drone is currently
            running any jobs
        """
        super().__init__(
            scheduler, pool_resources, scheduling_duration, ignore_resources
        )
        self.connection = connection
        """connection object that holds the remote connection and handles
        file transfers"""
        self.sitename = sitename
        """identifies the site the drone belongs to, used to determine which
        caches a drone can use"""
        self._empty = empty
        """callable that is used to determine whether a drone is empty"""
        # caching-related
        self.jobs_with_cached_data = 0
        """number of jobs currently running on the drone that could read
        from the cache"""
        self.cached_data = 0
        """used during scheduling; calculated for each job and assigned the
        expected amount of cached data that is available to the drone"""
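
    # A minimal construction sketch (hedged: ``scheduler`` and ``connection``
    # are assumed to already exist, e.g. a lapis scheduler instance and a
    # Connection; in practice drones are created by the simulator setup):
    #
    #     drone = Drone(
    #         scheduler,
    #         pool_resources={"cores": 8, "memory": 16},
    #         scheduling_duration=0,
    #         sitename="site-1",
    #         connection=connection,
    #         empty=lambda drone: drone.jobs < 1,  # custom emptiness check
    #     )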

    def empty(self):
        """
        Checks whether there are any jobs running on this drone.

        :return: True if no jobs are running on this drone, False otherwise
        """
        return self._empty(self)

    async def run(self):
        """
        Handles the drone's activity during the simulation. Upon execution,
        the drone registers itself at the scheduler and, once jobs are
        scheduled to the drone's job queue, these jobs are executed. Starting
        jobs via a job queue was introduced to avoid errors in resource
        allocation and monitoring.
        """
        from lapis.monitor.core import sampling_required

        await (time + self.scheduling_duration)
        self._supply = 1
        self.scheduler.register_drone(self)
        await sampling_required.put(
            DroneStatusCaching(
                repr(self),
                self.pool_resources["cores"],
                self.theoretical_available_resources["cores"],
                self.jobs_with_cached_data,
            )
        )
        await sampling_required.put(self)
        async with Scope() as scope:
            async for job, kill in self._job_queue:
                scope.do(self._run_job(job=job, kill=kill))
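
    # A hedged sketch of driving the drone inside a usim event loop (the
    # surrounding setup is illustrative; in lapis, drones are normally
    # started by pools and controllers):
    #
    #     from usim import run, Scope
    #
    #     async def main():
    #         async with Scope() as scope:
    #             scope.do(drone.run())  # registers the drone, processes jobs
    #
    #     run(main())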

    def _init_allocation_and_utilisation(self):
        # allocation: fraction of the most-used resource;
        # utilisation: fraction of the least-used resource
        levels = self.resources.levels
        resources = []
        for resource_key in self._valid_resource_keys:
            if (
                getattr(levels, resource_key) == 0
                and self.pool_resources[resource_key] == 0
            ):
                # skip resources the drone does not provide at all
                pass
            else:
                resources.append(
                    getattr(levels, resource_key)
                    / self.pool_resources[resource_key]
                )
        self._allocation = max(resources)
        self._utilisation = min(resources)
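
    # Worked example (illustrative numbers): with
    # pool_resources = {"cores": 8, "memory": 16} and current resource
    # levels of cores=4 and memory=4, the per-resource ratios are
    # 4/8 = 0.5 and 4/16 = 0.25, so
    #
    #     self._allocation  = max(0.5, 0.25) = 0.5
    #     self._utilisation = min(0.5, 0.25) = 0.25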

    async def shutdown(self):
        """
        Upon shutdown, the drone unregisters from the scheduler.
        """
        from lapis.monitor.core import sampling_required

        self._supply = 0
        self.scheduler.unregister_drone(self)
        await sampling_required.put(
            DroneStatusCaching(
                repr(self),
                self.pool_resources["cores"],
                self.theoretical_available_resources["cores"],
                self.jobs_with_cached_data,
            )
        )
        await sampling_required.put(self)
        # TODO: introduce state of drone
        await (time + 1)

    async def _run_job(self, job: CachingJob, kill: bool):  # FIXME: needs adaptation
        """
        Starts a job in the context of the given drone. The job is started
        regardless of the available resources: resource allocation takes
        place after the job has started, and the job is killed if the drone's
        overall resources are exceeded. In addition, if the `kill` flag is
        set, jobs are killed if the resources they use exceed the resources
        they requested. Finally, the end of the job's execution is awaited
        and the drone's status known to the scheduler is updated.

        :param job: the job to start
        :param kill: if True, a job is killed when its used resources exceed
            its requested resources
        """
        job.drone = self
        async with Scope() as scope:
            from lapis.monitor.core import sampling_required

            self._utilisation = self._allocation = None
            job_execution = scope.do(job.run(self))
            self.jobs += 1
            if job._cached_data:
                self.jobs_with_cached_data += 1
            try:
                async with self.resources.claim(**job.resources):
                    await sampling_required.put(
                        DroneStatusCaching(
                            repr(self),
                            self.pool_resources["cores"],
                            self.theoretical_available_resources["cores"],
                            self.jobs_with_cached_data,
                        )
                    )
                    await sampling_required.put(self)
                    if kill:
                        for resource_key in job.resources:
                            try:
                                if (
                                    job.resources[resource_key]
                                    < job.used_resources[resource_key]
                                ):
                                    await instant
                                    job_execution.cancel()
                                    await instant
                            except KeyError:
                                # check is not relevant if the data is not stored
                                pass
                    # self.scheduler.update_drone(self)
                    await job_execution.done
            except ResourcesUnavailable:
                await instant
                job_execution.cancel()
                await instant
            except AssertionError:
                await instant
                job_execution.cancel()
                await instant
            self.jobs -= 1
            if job._cached_data:
                self.jobs_with_cached_data -= 1
            await self.scheduler.job_finished(job)
            self._utilisation = self._allocation = None
            self.scheduler.update_drone(self)
            await sampling_required.put(self)
            await sampling_required.put(
                DroneStatusCaching(
                    repr(self),
                    self.pool_resources["cores"],
                    self.theoretical_available_resources["cores"],
                    self.jobs_with_cached_data,
                )
            )
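
    # Illustrative example of the ``kill`` check above: with
    # job.resources = {"cores": 1, "memory": 2} and
    # job.used_resources = {"cores": 1, "memory": 4}, the comparison
    # 2 < 4 holds for "memory", so the job execution is cancelled. A key
    # that is missing from ``used_resources`` raises a KeyError and is
    # skipped, as the check is not relevant for it.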

    def look_up_cached_data(self, job: CachingJob):
        """
        Determines the amount of the job's input data that is stored in
        caches the drone can access and sets the drone's `cached_data`
        attribute to the resulting value. This quantity can then be used in
        the job matching process.

        *Note that the current implementation only works for hitrate-based
        caching, and that while KeyErrors should not occur due to the way the
        method is called, KeyErrors are not handled here.*

        :param job: job whose input files are looked up in the caches
        """
        cached_data = 0
        caches = self.connection.storages.get(self.sitename, None)
        if caches:
            if job.inputfiles:
                cached_data = sum(
                    filespecs["hitrates"].get(cache.sitename, 0)
                    * filespecs["filesize"]
                    for cache in caches
                    for filespecs in job.inputfiles.values()
                )
        self.cached_data = cached_data
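
    # Worked example (hypothetical values) for the hitrate-based lookup:
    # with one accessible cache whose sitename is "site-1" and
    #
    #     job.inputfiles = {
    #         "a.root": {"filesize": 100, "hitrates": {"site-1": 0.5}},
    #         "b.root": {"filesize": 200, "hitrates": {"site-1": 1.0}},
    #     }
    #
    # the drone's ``cached_data`` becomes 0.5 * 100 + 1.0 * 200 = 250.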

    def __repr__(self):
        return "<%s: %s %s>" % (self.__class__.__name__, id(self), self.sitename)