Source code for lapis.scheduler

import random
from abc import ABC, abstractmethod
from statistics import mean
from typing import (
from weakref import WeakKeyDictionary

from sortedcontainers import SortedDict

from classad import parse
from classad._base_expression import Expression
from classad._functions import quantize
from classad._primitives import HTCInt, Undefined
from classad._expression import ClassAd
from usim import Scope, interval, Resources

from lapis.drone import Drone
from lapis.cachingjob import CachingJob
from lapis.monitor.core import sampling_required
from lapis.monitor.duplicates import UserDemand

from usim import time

[docs]class JobQueue(list): pass
quantization_defaults = { "memory": HTCInt(128 * 1024 * 1024), "disk": HTCInt(1024 * 1024), "cores": HTCInt(1), } # ClassAd attributes are not case sensitive machine_ad_defaults = """ requirements = target.requestcpus <= my.cpus """.strip() job_ad_defaults = """ requirements = my.requestcpus <= target.cpus && my.requestmemory <= target.memory """ T = TypeVar("T") DJ = TypeVar("DJ", Drone, CachingJob)
[docs]class WrappedClassAd(ClassAd, Generic[DJ]): """ Combines the original job/drone object and the associated ClassAd. """ __slots__ = "_wrapped", "_temp" def __init__(self, classad: ClassAd, wrapped: DJ): """ Initialization for wrapped ClassAd :param classad: the wrapped objects ClassAd description :param wrapped: wrapped object, either job or drone """ super(WrappedClassAd, self).__init__() self._wrapped = wrapped self._data = classad._data self._temp = {}
[docs] def empty(self): """ Only relevant for wrapped drones to determine whether there are jobs running on them. If this is the case the amount of cores in usage is >= 1. :return: true if no CPU cores are in use, false if this is not the case """ try: return self._temp["cores"] < 1 except KeyError: return self._wrapped.theoretical_available_resources["cores"] < 1
def __getitem__(self, item): """ This method is used when evaluating classad expressions. :param item: name of a quantity in the classad expression :return: current value of this item """ def access_wrapped(name, requested=True): """ Extracts the wrapped object's current quantity of a certain resource ( cores, memory, disk) :param name: name of the resource that is to be accessed :param requested: false if name is a resource of the drone, true if name is a resource requested by a job :return: value of respective resource """ if isinstance(self._wrapped, Drone): return self._wrapped.theoretical_available_resources[name] if requested: return self._wrapped.resources[name] return self._wrapped.used_resources[name] if "target" not in item: if "requestcpus" == item: return access_wrapped("cores", requested=True) elif "requestmemory" == item: return (1 / 1024 / 1024) * access_wrapped("memory", requested=True) elif "requestdisk" == item: return (1 / 1024) * access_wrapped("disk", requested=True) elif "requestwalltime" == item: return self._wrapped.requested_walltime elif "cpus" == item: try: return self._temp["cores"] except KeyError: return access_wrapped("cores", requested=False) elif "memory" == item: try: return (1 / 1000 / 1000) * self._temp["memory"] except KeyError: return (1 / 1000 / 1000) * access_wrapped("memory", requested=False) elif "disk" == item: try: return (1 / 1024) * self._temp["disk"] except KeyError: return (1 / 1024) * access_wrapped("disk", requested=False) elif "cache_demand" == item: caches = self._wrapped.connection.storages.get( self._wrapped.sitename, None ) try: return mean( [1.0 / cache.connection._throughput_scale for cache in caches] ) except TypeError: return 0 elif "cache_scale" == item: caches = self._wrapped.connection.storages.get( self._wrapped.sitename, None ) try: return mean( [cache.connection._throughput_scale for cache in caches] ) except TypeError: return 0 elif "cache_throughput_per_core" == item: caches = self._wrapped.connection.storages.get( self._wrapped.sitename, None ) try: return sum( [ cache.connection.throughput / 1000.0 / 1000.0 / 1000.0 for cache in caches ] ) / float(self._wrapped.pool_resources["cores"]) except TypeError: return 0 elif "cached_data" == item: return self._wrapped.cached_data / 1000.0 / 1000.0 / 1000.0 elif "data_volume" == item: return self._wrapped._total_input_data / 1000.0 / 1000.0 / 1000.0 elif "current_waiting_time" == item: return - self._wrapped.queue_date elif "failed_matches" == item: return self._wrapped.failed_matches elif "jobs_with_cached_data" == item: return self._wrapped.jobs_with_cached_data return super(WrappedClassAd, self).__getitem__(item) def clear_temporary_resources(self): self._temp.clear() def __repr__(self): return f"<{self.__class__.__name__}>: {self._wrapped}" def __eq__(self, other): return super().__eq__(other) and self._wrapped == other._wrapped def __hash__(self): return id(self._wrapped)
[docs]class Cluster(List[WrappedClassAd[DJ]], Generic[DJ]): pass
[docs]class Bucket(List[Cluster[DJ]], Generic[DJ]): pass
[docs]class JobScheduler(ABC): __slots__ = () @property def drone_list(self) -> Iterator[Drone]: """Yields the registered drones""" raise NotImplementedError
[docs] def register_drone(self, drone: Drone): """Register a drone at the scheduler""" raise NotImplementedError
[docs] def unregister_drone(self, drone: Drone): """Unregister a drone at the scheduler""" raise NotImplementedError
[docs] def update_drone(self, drone: Drone): """Update parameters of a drone""" raise NotImplementedError
[docs] async def run(self): """Run method of the scheduler""" raise NotImplementedError
[docs] async def job_finished(self, job): """ Declare a job as finished by a drone. This might even mean, that the job has failed and that the scheduler needs to requeue the job for further processing. """ raise NotImplementedError
[docs]class CondorJobScheduler(JobScheduler): """ Goal of the htcondor job scheduler is to have a scheduler that somehow mimics how htcondor schedules jobs. Htcondor is scheduling based on a priority queue. The priorities itself are managed by operators of htcondor. So different instances can apparently behave very different. A priority queue that sorts job slots by increasing costs is built. The scheduler checks if a job either exactly fits a slot or if it fits several times. The cost for putting a job at a given slot is given by the amount of resources that might remain unallocated. """ def __init__(self, job_queue): self._stream_queue = job_queue self.drone_cluster = [] self.interval = 60 self.job_queue = JobQueue() self._collecting = True self._processing = Resources(jobs=0) @property def drone_list(self): for cluster in self.drone_cluster: for drone in cluster: yield drone
[docs] def register_drone(self, drone: Drone): self._add_drone(drone)
[docs] def unregister_drone(self, drone: Drone): for cluster in self.drone_cluster: try: cluster.remove(drone) except ValueError: pass else: if len(cluster) == 0: self.drone_cluster.remove(cluster)
def _add_drone(self, drone: Drone, drone_resources: Dict = None): minimum_distance_cluster = None distance = float("Inf") if len(self.drone_cluster) > 0: for cluster in self.drone_cluster: current_distance = 0 for key in {*cluster[0].pool_resources, *drone.pool_resources}: if drone_resources: current_distance += abs( cluster[0].theoretical_available_resources.get(key, 0) - drone_resources.get(key, 0) ) else: current_distance += abs( cluster[0].theoretical_available_resources.get(key, 0) - drone.theoretical_available_resources.get(key, 0) ) if current_distance < distance: minimum_distance_cluster = cluster distance = current_distance if distance < 1: minimum_distance_cluster.append(drone) else: self.drone_cluster.append([drone]) else: self.drone_cluster.append([drone])
[docs] def update_drone(self, drone: Drone): self.unregister_drone(drone) self._add_drone(drone)
[docs] async def run(self): async with Scope() as scope: async for _ in interval(self.interval): for job in self.job_queue.copy(): best_match = self._schedule_job(job) if best_match: await best_match.schedule_job(job) self.job_queue.remove(job) await sampling_required.put(self.job_queue) await sampling_required.put(UserDemand(len(self.job_queue))) self.unregister_drone(best_match) left_resources = best_match.theoretical_available_resources left_resources = { key: value - job.resources.get(key, 0) for key, value in left_resources.items() } self._add_drone(best_match, left_resources) if ( not self._collecting and not self.job_queue and == 0 ): break await sampling_required.put(self)
async def _collect_jobs(self): async for job in self._stream_queue: self.job_queue.append(job) await self._processing.increase(jobs=1) # TODO: logging happens with each job await sampling_required.put(self.job_queue) await sampling_required.put(UserDemand(len(self.job_queue))) self._collecting = False
[docs] async def job_finished(self, job): if job.successful: await self._processing.decrease(jobs=1) else: await self._stream_queue.put(job)
def _schedule_job(self, job) -> Optional[Drone]: priorities = {} for cluster in self.drone_cluster: drone = cluster[0] cost = 0 resources = drone.theoretical_available_resources for resource_type in job.resources: if resources.get(resource_type, 0) < job.resources[resource_type]: # Inf for all job resources that a drone does not support # and all resources that are too small to even be considered cost = float("Inf") break else: try: cost += 1 / ( resources[resource_type] // job.resources[resource_type] ) except KeyError: pass for additional_resource_type in [ key for key in drone.pool_resources if key not in job.resources ]: cost += resources[additional_resource_type] cost /= len((*job.resources, *drone.pool_resources)) if cost <= 1: # directly start job return drone try: priorities[cost].append(drone) except KeyError: priorities[cost] = [drone] try: minimal_key = min(priorities) if minimal_key < float("Inf"): return priorities[minimal_key][0] except ValueError: pass return None
# HTCondor ClassAd Scheduler
[docs]class NoMatch(Exception): """A job could not be matched to any drone"""
[docs]class RankedClusterKey(NamedTuple): rank: float key: Tuple[float, ...]
RC = TypeVar("RC", bound="RankedClusters")
[docs]class RankedClusters(Generic[DJ]): """Automatically cluster drones by rank""" @abstractmethod def __init__(self, quantization: Dict[str, HTCInt], ranking: Expression): raise NotImplementedError
[docs] @abstractmethod def empty(self) -> bool: """Whether there are no resources available""" raise NotImplementedError
[docs] @abstractmethod def copy(self: "RankedAutoClusters[DJ]") -> "RankedAutoClusters[DJ]": """Copy the entire ranked auto clusters""" raise NotImplementedError
[docs] @abstractmethod def add(self, item: WrappedClassAd[DJ]) -> None: """Add a new item""" raise NotImplementedError
[docs] @abstractmethod def remove(self, item: WrappedClassAd[DJ]) -> None: """Remove an existing item""" raise NotImplementedError
[docs] def update(self, item) -> None: """Update an existing item with its current state""" self.remove(item) self.add(item)
@abstractmethod def clusters(self) -> Iterator[Set[WrappedClassAd[DJ]]]: raise NotImplementedError @abstractmethod def items(self) -> Iterator[Tuple[Any, Set[WrappedClassAd[DJ]]]]: raise NotImplementedError
[docs] @abstractmethod def cluster_groups(self) -> Iterator[List[Set[WrappedClassAd[Drone]]]]: """Group autoclusters by PreJobRank""" raise NotImplementedError
[docs] @abstractmethod def lookup(self, job: CachingJob) -> None: """Update information about cached data for every drone""" raise NotImplementedError
[docs]class RankedAutoClusters(RankedClusters[DJ]): """Automatically cluster similar jobs or drones""" def __init__(self, quantization: Dict[str, HTCInt], ranking: Expression): """ :param quantization: factors to convert resources into HTCondor scaling :param ranking: prejobrank expression """ self._quantization = quantization self._ranking = ranking self._clusters: Dict[RankedClusterKey, Set[WrappedClassAd[DJ]]] = SortedDict() self._inverse: Dict[WrappedClassAd[DJ], RankedClusterKey] = {}
[docs] def empty(self) -> bool: """ Checks whether all drones in the RankedCluster are empty and currently not running any jobs. :return: """ for drones in self._clusters.values(): if not next(iter(drones)).empty(): return False return True
[docs] def copy(self) -> "RankedAutoClusters[DJ]": clone = type(self)(quantization=self._quantization, ranking=self._ranking) clone._clusters = SortedDict( (key, value.copy()) for key, value in self._clusters.items() ) clone._inverse = self._inverse.copy() return clone
[docs] def add(self, item: WrappedClassAd[DJ]): """ Add a new wrapped item, usually a drone, to the RankedAutoCluster. Unless the item is already contained, the item's key is generated and it is sorted in into the clusters accordingly. If there are already items with the same key, the new item is added to the existing cluster. If not, a new cluster is created. :param item: :return: """ if item in self._inverse: raise ValueError(f"{item!r} already stored; use `.update(item)` instead") item_key = self._clustering_key(item) try: self._clusters[item_key].add(item) except KeyError: self._clusters[item_key] = {item} self._inverse[item] = item_key
[docs] def remove(self, item: WrappedClassAd[DJ]): """ Removes the item. :param item: :return: """ item_key = self._inverse.pop(item) cluster = self._clusters[item_key] cluster.remove(item) if not cluster: del self._clusters[item_key]
[docs] def _clustering_key(self, item: WrappedClassAd[DJ]): """ Calculates an item's clustering key based on the specified ranking (in my use case the prejobrank) and the item's available resource. The resulting key's structure is (prejobrank value, (available cpus, available memory, available disk space)). The clustering key is negative as the SortedDict sorts its entries from low keys to high keys. :param item: drone for which the clustering key is calculated. :return: (prejobrank value, (available cpus, available memory, available disk space)) """ # TODO: assert that order is consistent quantization = self._quantization return RankedClusterKey( rank=-1.0 * self._ranking.evaluate(my=item), key=tuple( int(quantize(item[key], quantization.get(key, 1))) for key in ("cpus", "memory", "disk") ), )
[docs] def clusters(self) -> Iterator[Set[WrappedClassAd[DJ]]]: """ :return: iterator of all clusters """ return iter(self._clusters.values())
[docs] def items(self) -> Iterator[Tuple[RankedClusterKey, Set[WrappedClassAd[DJ]]]]: """ :return: iterator of all clusters and corresponding keys """ return iter(self._clusters.items())
[docs] def cluster_groups(self) -> Iterator[List[Set[WrappedClassAd[Drone]]]]: """ Sort clusters by the ranking key and then by the amount of available resources into nested lists of sets. :return: """ group = [] current_rank = None for ranked_key, drones in self._clusters.items(): if next(iter(drones)).empty(): continue if ranked_key.rank != current_rank: current_rank = ranked_key.rank if group: yield group group = [] group.append(drones) if group: yield group
[docs] def lookup(self, job: CachingJob): for _, drones in self._clusters.items(): for drone in drones: drone._wrapped.look_up_cached_data(job)
[docs]class RankedNonClusters(RankedClusters[DJ]): """Automatically cluster jobs or drones by rank only""" def __init__(self, quantization: Dict[str, HTCInt], ranking: Expression): self._quantization = quantization self._ranking = ranking self._clusters: Dict[float, Set[WrappedClassAd[DJ]]] = SortedDict() self._inverse: Dict[WrappedClassAd[DJ], float] = {}
[docs] def empty(self) -> bool: for drones in self._clusters.values(): for drone in drones: if not drone.empty(): return False return True
[docs] def copy(self) -> "RankedNonClusters[DJ]": clone = type(self)(quantization=self._quantization, ranking=self._ranking) clone._clusters = SortedDict( (key, value.copy()) for key, value in self._clusters.items() ) clone._inverse = self._inverse.copy() return clone
[docs] def add(self, item: WrappedClassAd[DJ]): if item in self._inverse: raise ValueError(f"{item!r} already stored; use `.update(item)` instead") item_key = self._clustering_key(item) try: self._clusters[item_key].add(item) except KeyError: self._clusters[item_key] = {item} self._inverse[item] = item_key
[docs] def remove(self, item: WrappedClassAd[DJ]): item_key = self._inverse.pop(item) cluster = self._clusters[item_key] cluster.remove(item) if not cluster: del self._clusters[item_key]
[docs] def update(self, item): self.remove(item) self.add(item)
[docs] def _clustering_key(self, item: WrappedClassAd[DJ]): """ For RankNonClusters there is only one clustering key, the objects defined ranking. The clustering key is negative as the SortedDict sorts its entries from low keys to high keys. """ return -1.0 * self._ranking.evaluate(my=item)
def clusters(self) -> Iterator[Set[WrappedClassAd[DJ]]]: return iter(self._clusters.values()) def items(self) -> Iterator[Tuple[float, Set[WrappedClassAd[DJ]]]]: return iter(self._clusters.items())
[docs] def cluster_groups(self) -> Iterator[List[Set[WrappedClassAd[Drone]]]]: """ Sorts cluster by the ranking key. As there is no autoclustering, every drone is in a dedicated set and drones of the same ranking are combined into a list. These lists are then sorted by increasing ranking. :return: iterator of the lists containing drones with identical key """ for _ranked_key, drones in self._clusters.items(): yield [{item} for item in drones]
[docs] def lookup(self, job: CachingJob): for _, drones in self._clusters.items(): for drone in drones: drone._wrapped.look_up_cached_data(job)
[docs]class CondorClassadJobScheduler(JobScheduler): """ Goal of the htcondor job scheduler is to have a scheduler that somehow mimics how htcondor does schedule jobs. Htcondor does scheduling based on a priority queue. The priorities itself are managed by operators of htcondor. So different instances can apparently behave very different. In this case a priority queue that sorts job slots by increasing cost is built. The scheduler checks if a job either exactly fits a slot or if it does fit into it several times. The cost for putting a job at a given slot is given by the amount of resources that might remain unallocated. """ def __init__( self, job_queue, machine_ad: str = machine_ad_defaults, job_ad: str = job_ad_defaults, pre_job_rank: str = "0", interval: float = 60, autocluster: bool = False, ): """ Initializes the CondorClassadJobScheduler :param job_queue: queue of jobs that are scheduled in the following simulation :param machine_ad: ClassAd that is used with every drone :param job_ad: ClassAd that is used with every job :param pre_job_rank: ClassAd attribute that all drones are sorted by :param interval: time between scheduling cycles :param autocluster: could be used to decide whether to use autoclusters """ self._stream_queue = job_queue self._drones: RankedClusters[Drone] = RankedNonClusters( quantization=quantization_defaults, ranking=parse(pre_job_rank) ) self.interval = interval self.job_queue = JobQueue() self._collecting = True self._processing = Resources(jobs=0) # temporary solution self._wrapped_classads = WeakKeyDictionary() self._machine_classad = parse(machine_ad) self._job_classad = parse(job_ad) @property def drone_list(self) -> Iterator[Drone]: """ Takes an iterator over the WrappedClassAd objects of drones known to the scheduler, extracts the drones and returns an iterator over the drone objects. :return: """ for cluster in self._drones.clusters(): for drone in cluster: yield drone._wrapped
[docs] def register_drone(self, drone: Drone): """ Provides the drones with the drone ClassAd, combines both into one object and adds the resulting WrappedClassAd object to the drones known to the scheduler as well as the dictionary containing all WrappedClassAd objects the scheduler works with. :param drone: """ wrapped_drone = WrappedClassAd(classad=self._machine_classad, wrapped=drone) self._drones.add(wrapped_drone) self._wrapped_classads[drone] = wrapped_drone
[docs] def unregister_drone(self, drone: Drone): """ Remove a drone's representation from the scheduler's scope. :param drone: :return: """ drone_wrapper = self._wrapped_classads[drone] self._drones.remove(drone_wrapper)
[docs] def update_drone(self, drone: Drone): """ Update a drone's representation in the scheduler scope. :param drone: :return: """ drone_wrapper = self._wrapped_classads[drone] self._drones.update(drone_wrapper)
[docs] async def run(self): """ Runs the scheduler's functionality. One executed, the scheduler starts up and begins to add the jobs that are :return: """ async with Scope() as scope: async for _ in interval(self.interval): await self._schedule_jobs() if ( not self._collecting and not self.job_queue and == 0 ): break
[docs] @staticmethod def _match_job( job: ClassAd, pre_job_clusters: Iterator[List[Set[WrappedClassAd[Drone]]]] ): """ Tries to find a match for the transferred job among the available drones. :param job: job to match :param pre_job_clusters: list of clusters of wrapped drones that are presorted by a clustering mechanism of RankedAutoClusters/RankedNonClusters that mimics the HTCondor NEGOTIATOR_PRE_JOB_RANK, short prejobrank. The clusters contain drones that are considered to be equivalent with respect to all Requirements and Ranks that are used during the matchmaking process. This mimics the Autoclustering functionality of HTCondor. [[highest prejobrank {autocluster}, {autocluster}], ..., [lowest prejobrank { autocluster}, {autocluster}] :return: drone that is the best match for the job The matching is performed in several steps: 1. The job's requirements are evaluted and only drones that meet them are considered further. A drone of every autocluster is extracted from pre_job_clusters and if it meets the job's requirements it is not removed from pre_job_clusters. 2. The autoclusters that are equivalent with respect to the prejobrank are then sorted by the job's rank expression. The resulting format of pre_job_clusters is [[(highest prejobrank, highest jobrank) {autocluster} {autocluster}, ..., (highest prejobrank, lowest jobrank) {autocluster}], ...] 3. The resulting pre_job_clusters are then iterated and the drone with the highest (prejobrank, jobrank) whose requirements are also compatible with the job is returned as best match. """ def debug_evaluate(expr, my, target=None): """ Reimplementation of the classad packages evaluate function. Having it here enables developers to inspect the ClassAd evaluation process more closely and to add debug output if necessary. :param expr: :param my: :param target: :return: """ if type(expr) is str: expr = my[expr] result = expr.evaluate(my=my, target=target) return result if job["Requirements"] != Undefined(): pre_job_clusters_tmp = [] for cluster_group in pre_job_clusters: cluster_group_tmp = [] for cluster in cluster_group: if debug_evaluate( "Requirements", my=job, target=next(iter(cluster)) ): cluster_group_tmp.append(cluster) pre_job_clusters_tmp.append(cluster_group_tmp) pre_job_clusters = pre_job_clusters_tmp if job["Rank"] != Undefined(): pre_job_clusters_tmp = [] for cluster_group in pre_job_clusters: pre_job_clusters_tmp.append( sorted( cluster_group, key=lambda cluster: ( debug_evaluate("Rank", my=job, target=next(iter(cluster))), random.random(), ), reverse=True, ) ) pre_job_clusters = pre_job_clusters_tmp for cluster_group in pre_job_clusters: # TODO: if we have POST_JOB_RANK, collect *all* matches of a group for cluster in cluster_group: for drone in cluster: if drone["Requirements"] == Undefined() or drone.evaluate( "Requirements", my=drone, target=job ): return drone raise NoMatch()
[docs] async def _schedule_jobs(self): """ Handles the scheduling of jobs. Tried to match the jobs in the job queue to available resources. This occurs in several steps. 1. The list of drones known to the scheduler is copied. The copy can then be used to keep track of the drones' available resources while matching jobs as the jobs allocate resources on the original drones before being processed but not during scheduling. 2. The job in the job queue are matched to (the copied)resources iteratively. The actual matching is performed by the `_match_job` method that returns the most suitable drone unless no drone is compatible with the job's requirements. If a match was found, the resources requested by the job are allocated on the matched drone. If no resources remain unallocated after the last job's allocation, the matching process is ended for this scheduler interval. 3. After the job matching is finished, the matched jobs are removed from the job queue as the index of a job in the job queue changes once a job with a lower index is removed from the queue. 4. The matched jobs' execution is triggered. """ # Pre CachingJob Rank is the same for all jobs # Use a copy to allow temporary "remainder after match" estimates if self._drones.empty(): return pre_job_drones = self._drones.copy() matches: List[ Tuple[int, WrappedClassAd[CachingJob], WrappedClassAd[Drone]] ] = [] for queue_index, candidate_job in enumerate(self.job_queue): try: pre_job_drones.lookup(candidate_job._wrapped) matched_drone = self._match_job( candidate_job, pre_job_drones.cluster_groups() ) except NoMatch: candidate_job._wrapped.failed_matches += 1 continue else: matches.append((queue_index, candidate_job, matched_drone)) for key, value in candidate_job._wrapped.resources.items(): matched_drone._temp[key] = ( matched_drone._temp.get( key, matched_drone._wrapped.theoretical_available_resources[key], ) - value ) pre_job_drones.update(matched_drone) # monitoring/coordination stuff if ( candidate_job._wrapped._total_input_data and matched_drone._wrapped.cached_data ): candidate_job._wrapped._cached_data = ( matched_drone._wrapped.cached_data ) if pre_job_drones.empty(): break if not matches: return # TODO: optimize for few matches, many matches, all matches for queue_index, _, _ in reversed(matches): del self.job_queue[queue_index] for _, job, drone in matches: drone.clear_temporary_resources() await self._execute_job(job=job, drone=drone) await sampling_required.put(self) # NOTE: Is this correct? Triggers once instead of for each job await sampling_required.put(self.job_queue) await sampling_required.put(UserDemand(len(self.job_queue)))
[docs] async def _execute_job(self, job: WrappedClassAd, drone: WrappedClassAd): """ Schedules a job on a drone by extracting both objects from the respective WrappedClassAd and using the drone's scheduling functionality :param job: :param drone: """ wrapped_job = job._wrapped wrapped_drone = drone._wrapped await wrapped_drone.schedule_job(wrapped_job)
[docs] async def _collect_jobs(self): """ Combines jobs that are imported from the simulation's job config with a job ClassAd and adds the resulting WrappedClassAd objects to the scheduler's job queue. """ async for job in self._stream_queue: wrapped_job = WrappedClassAd(classad=self._job_classad, wrapped=job) self._wrapped_classads[job] = wrapped_job self.job_queue.append(wrapped_job) await self._processing.increase(jobs=1) # TODO: logging happens with each job # TODO: job queue to the outside now contains wrapped classads... await sampling_required.put(self.job_queue) await sampling_required.put(UserDemand(len(self.job_queue))) self._collecting = False
[docs] async def job_finished(self, job): """ Handles the impact of finishing jobs on the scheduler. If the job is completed successfully, the amount of running jobs matched by the current scheduler instance is reduced. If the job is not finished successfully, it is resubmitted to the scheduler's job queue. :param job: """ if job.successful: await self._processing.decrease(jobs=1) else: self.job_queue.append(self._wrapped_classads[job])