Detailed documentation of all relevant modules

Other modules from LAPIS

exception lapis.scheduler.NoMatch[source]

A job could not be matched to any drone

class lapis.scheduler.Bucket(iterable=(), /)[source]
class lapis.scheduler.Cluster(iterable=(), /)[source]
class lapis.scheduler.CondorClassadJobScheduler(job_queue, machine_ad: str = 'requirements = target.requestcpus <= my.cpus', job_ad: str = '\nrequirements = my.requestcpus <= target.cpus && my.requestmemory <= target.memory\n', pre_job_rank: str = '0', interval: float = 60, autocluster: bool = False)[source]

The goal of the HTCondor job scheduler is to mimic how HTCondor schedules jobs. HTCondor performs scheduling based on a priority queue. The priorities themselves are managed by the operators of an HTCondor instance, so different instances can behave very differently. In this case, a priority queue that sorts job slots by increasing cost is built. The scheduler checks whether a job either exactly fits a slot or fits into it several times. The cost of putting a job into a given slot is the amount of resources that might remain unallocated.
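The cost-based slot selection described above can be sketched with a plain priority queue. The slot_cost metric and the dict-based slots below are illustrative stand-ins, not the actual LAPIS data structures:

```python
import heapq

def slot_cost(slot, job):
    # Cost of placing the job into the slot: the resources that would
    # remain unallocated afterwards (illustrative stand-in for the
    # scheduler's cost metric).
    return sum(slot[r] - job[r] for r in job)

def best_slot(slots, job):
    # Build a priority queue of (cost, slot index), restricted to
    # slots the job actually fits into; the cheapest match wins.
    heap = [
        (slot_cost(slot, job), i)
        for i, slot in enumerate(slots)
        if all(slot[r] >= job[r] for r in job)
    ]
    heapq.heapify(heap)
    return heap[0][1] if heap else None
```

With slots [{"cores": 8, "memory": 16}, {"cores": 2, "memory": 4}] and a job requesting 2 cores and 4 memory, the second slot is returned because the exact fit leaves nothing unallocated.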

async _collect_jobs()[source]

Combines jobs that are imported from the simulation’s job config with a job ClassAd and adds the resulting WrappedClassAd objects to the scheduler’s job queue.

async _execute_job(job: lapis.scheduler.WrappedClassAd, drone: lapis.scheduler.WrappedClassAd)[source]

Schedules a job on a drone by extracting both objects from the respective WrappedClassAd and using the drone’s scheduling functionality.

Parameters
  • job

  • drone

static _match_job(job: classad._expression.ClassAd, pre_job_clusters: Iterator[List[Set[lapis.scheduler.WrappedClassAd[lapis.drone.Drone]]]])[source]

Tries to find a match for the given job among the available drones.

Parameters
  • job – job to match

  • pre_job_clusters – list of clusters of wrapped drones that are presorted by a clustering mechanism of RankedAutoClusters/RankedNonClusters that mimics the HTCondor NEGOTIATOR_PRE_JOB_RANK (short: prejobrank). The clusters contain drones that are considered equivalent with respect to all requirements and ranks used during the matchmaking process. This mimics the autoclustering functionality of HTCondor: [[highest prejobrank {autocluster}, {autocluster}], …, [lowest prejobrank {autocluster}, {autocluster}]]

Returns

drone that is the best match for the job

The matching is performed in several steps:

  1. The job’s requirements are evaluated and only drones that meet them are considered further. A drone of every autocluster is extracted from pre_job_clusters, and the autocluster is kept only if that drone meets the job’s requirements.

  2. The autoclusters that are equivalent with respect to the prejobrank are then sorted by the job’s rank expression. The resulting format of pre_job_clusters is [[(highest prejobrank, highest jobrank) {autocluster}, {autocluster}, …, (highest prejobrank, lowest jobrank) {autocluster}], …]

  3. The resulting pre_job_clusters are then iterated, and the drone with the highest (prejobrank, jobrank) whose requirements are also compatible with the job is returned as the best match.
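The final iteration step can be sketched as a plain walk over the presorted clusters; meets_requirements and the string drones below are hypothetical stand-ins for the ClassAd evaluation and the WrappedClassAd objects:

```python
def match_job(meets_requirements, pre_job_clusters):
    # pre_job_clusters is assumed presorted: outer lists descend by
    # (prejobrank, jobrank), inner sets hold equivalent drones.
    # The first drone compatible with the job is the best match.
    for group in pre_job_clusters:
        for autocluster in group:
            for drone in autocluster:
                if meets_requirements(drone):
                    return drone
    return None
```

Because the ordering already encodes both ranks, the first compatible drone encountered is automatically the highest-ranked one.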

async _schedule_jobs()[source]

Handles the scheduling of jobs. Tries to match the jobs in the job queue to available resources. This occurs in several steps:

  1. The list of drones known to the scheduler is copied. The copy can then be used to keep track of the drones’ available resources while matching jobs, as jobs allocate resources on the original drones before being processed but not during scheduling.

  2. The jobs in the job queue are matched to the (copied) resources iteratively. The actual matching is performed by the _match_job method, which returns the most suitable drone unless no drone is compatible with the job’s requirements. If a match is found, the resources requested by the job are allocated on the matched drone. If no resources remain unallocated after the last job’s allocation, the matching process ends for this scheduler interval.

  3. After the job matching is finished, the matched jobs are removed from the job queue. This happens afterwards because the index of a job in the job queue changes once a job with a lower index is removed from the queue.

  4. The matched jobs’ execution is triggered.
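The four steps above can be sketched as follows; the dict-of-dicts resource bookkeeping and the pluggable match function are illustrative assumptions, not the LAPIS internals:

```python
def schedule_jobs(job_queue, drones, match):
    # Step 1: copy the drones' free resources so that allocations made
    # during this pass do not touch the real drones.
    free = {name: dict(res) for name, res in drones.items()}
    matched = []
    # Step 2: match jobs iteratively and allocate on the copy.
    for index, job in enumerate(job_queue):
        name = match(job, free)
        if name is None:
            continue
        for resource, amount in job.items():
            free[name][resource] -= amount
        matched.append((index, name))
    # Step 3: remove matched jobs back to front so that the earlier
    # indices remain valid while deleting.
    for index, _ in reversed(matched):
        del job_queue[index]
    # Step 4 (triggering execution) is omitted in this sketch.
    return matched
```

Deleting in reverse index order is exactly why step 3 must happen after the matching loop: removing a low-index job first would shift every later index.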

async job_finished(job)[source]

Handles the impact of finishing jobs on the scheduler. If the job completed successfully, the number of running jobs matched by the current scheduler instance is reduced. If the job did not finish successfully, it is resubmitted to the scheduler’s job queue.

Parameters

job

register_drone(drone: lapis.drone.Drone)[source]

Provides the drone with the drone ClassAd, combines both into one object, and adds the resulting WrappedClassAd object to the drones known to the scheduler as well as to the dictionary containing all WrappedClassAd objects the scheduler works with.

Parameters

drone

async run()[source]

Runs the scheduler’s functionality. Once executed, the scheduler starts up and begins to add the jobs imported from the simulation’s job configuration to its job queue (see _collect_jobs) and to schedule them at regular intervals.

Returns

unregister_drone(drone: lapis.drone.Drone)[source]

Remove a drone’s representation from the scheduler’s scope.

Parameters

drone

Returns

update_drone(drone: lapis.drone.Drone)[source]

Update a drone’s representation in the scheduler scope.

Parameters

drone

Returns

property drone_list

Takes an iterator over the WrappedClassAd objects of drones known to the scheduler, extracts the drones and returns an iterator over the drone objects.

Returns

class lapis.scheduler.CondorJobScheduler(job_queue)[source]

The goal of the HTCondor job scheduler is to mimic how HTCondor schedules jobs. HTCondor performs scheduling based on a priority queue. The priorities themselves are managed by the operators of an HTCondor instance, so different instances can behave very differently. A priority queue that sorts job slots by increasing cost is built. The scheduler checks whether a job either exactly fits a slot or fits into it several times. The cost of putting a job into a given slot is the amount of resources that might remain unallocated.

async job_finished(job)[source]

Declare a job as finished by a drone. This might even mean that the job has failed and that the scheduler needs to requeue the job for further processing.

register_drone(drone: lapis.drone.Drone)[source]

Register a drone at the scheduler

async run()[source]

Run method of the scheduler

unregister_drone(drone: lapis.drone.Drone)[source]

Unregister a drone at the scheduler

update_drone(drone: lapis.drone.Drone)[source]

Update parameters of a drone

property drone_list

Yields the registered drones

class lapis.scheduler.JobQueue(iterable=(), /)[source]
class lapis.scheduler.JobScheduler[source]
async job_finished(job)[source]

Declare a job as finished by a drone. This might even mean that the job has failed and that the scheduler needs to requeue the job for further processing.

register_drone(drone: lapis.drone.Drone)[source]

Register a drone at the scheduler

async run()[source]

Run method of the scheduler

unregister_drone(drone: lapis.drone.Drone)[source]

Unregister a drone at the scheduler

update_drone(drone: lapis.drone.Drone)[source]

Update parameters of a drone

property drone_list

Yields the registered drones

class lapis.scheduler.RankedAutoClusters(quantization: Dict[str, classad._primitives.HTCInt], ranking: classad._base_expression.Expression)[source]

Automatically cluster similar jobs or drones

_clustering_key(item: lapis.scheduler.WrappedClassAd[DJ])[source]

Calculates an item’s clustering key based on the specified ranking (in this use case the prejobrank) and the item’s available resources. The resulting key’s structure is (prejobrank value, (available cpus, available memory, available disk space)). The clustering key is negated as the SortedDict sorts its entries from low keys to high keys.

Parameters

item – drone for which the clustering key is calculated.

Returns

(prejobrank value, (available cpus, available memory, available disk space))
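The negation of the key can be illustrated with a minimal sketch; the tuple layout mirrors the docstring, while the function name and plain tuples are assumptions for the example:

```python
def clustering_key(prejobrank, available):
    # Negate every component so that a SortedDict, which orders its
    # keys ascending, yields high-rank, resource-rich drones first.
    return (-prejobrank, tuple(-value for value in available))
```

Sorting the keys of a drone with prejobrank 5 and one with prejobrank 2 places the rank-5 drone first, since its key (-5, …) compares lower.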

add(item: lapis.scheduler.WrappedClassAd[DJ])[source]

Add a new wrapped item, usually a drone, to the RankedAutoClusters. Unless the item is already contained, the item’s key is generated and the item is sorted into the clusters accordingly. If there are already items with the same key, the new item is added to the existing cluster. If not, a new cluster is created.

Parameters

item

Returns

cluster_groups() → Iterator[List[Set[lapis.scheduler.WrappedClassAd[lapis.drone.Drone]]]][source]

Sort clusters by the ranking key and then by the amount of available resources into nested lists of sets.

Returns

clusters() → Iterator[Set[lapis.scheduler.WrappedClassAd[DJ]]][source]
Returns

iterator of all clusters

copy() → lapis.scheduler.RankedAutoClusters[DJ][source]

Copy the entire ranked auto clusters

empty() → bool[source]

Checks whether all drones in the RankedCluster are empty and currently not running any jobs.

Returns

items() → Iterator[Tuple[lapis.scheduler.RankedClusterKey, Set[lapis.scheduler.WrappedClassAd[DJ]]]][source]
Returns

iterator of all clusters and corresponding keys

lookup(job: lapis.cachingjob.CachingJob)[source]

Update information about cached data for every drone

remove(item: lapis.scheduler.WrappedClassAd[DJ])[source]

Removes the item.

Parameters

item

Returns

class lapis.scheduler.RankedClusterKey(rank, key)[source]
_asdict()

Return a new OrderedDict which maps field names to their values.

classmethod _make(iterable)

Make a new RankedClusterKey object from a sequence or iterable

_replace(**kwds)

Return a new RankedClusterKey object replacing specified fields with new values

property key

Alias for field number 1

property rank

Alias for field number 0
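RankedClusterKey behaves like a standard namedtuple; the stand-in definition below illustrates the documented _asdict, _make, _replace, and field-alias behaviour:

```python
from collections import namedtuple

# Illustrative stand-in with the same two fields as the real class.
RankedClusterKey = namedtuple("RankedClusterKey", ["rank", "key"])

k = RankedClusterKey(rank=-5, key=(-4, -8))
k.rank               # alias for field number 0
k._replace(rank=0)   # new RankedClusterKey with the rank swapped out
```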

class lapis.scheduler.RankedClusters(quantization: Dict[str, classad._primitives.HTCInt], ranking: classad._base_expression.Expression)[source]

Automatically cluster drones by rank

abstract add(item: lapis.scheduler.WrappedClassAd[DJ]) → None[source]

Add a new item

abstract cluster_groups() → Iterator[List[Set[lapis.scheduler.WrappedClassAd[lapis.drone.Drone]]]][source]

Group autoclusters by PreJobRank

abstract copy() → lapis.scheduler.RankedAutoClusters[DJ][source]

Copy the entire ranked auto clusters

abstract empty() → bool[source]

Whether there are no resources available

abstract lookup(job: lapis.cachingjob.CachingJob) → None[source]

Update information about cached data for every drone

abstract remove(item: lapis.scheduler.WrappedClassAd[DJ]) → None[source]

Remove an existing item

update(item) → None[source]

Update an existing item with its current state

class lapis.scheduler.RankedNonClusters(quantization: Dict[str, classad._primitives.HTCInt], ranking: classad._base_expression.Expression)[source]

Automatically cluster jobs or drones by rank only

_clustering_key(item: lapis.scheduler.WrappedClassAd[DJ])[source]

For RankedNonClusters there is only one clustering key: the object’s defined ranking. The clustering key is negated as the SortedDict sorts its entries from low keys to high keys.

add(item: lapis.scheduler.WrappedClassAd[DJ])[source]

Add a new item

cluster_groups() → Iterator[List[Set[lapis.scheduler.WrappedClassAd[lapis.drone.Drone]]]][source]

Sorts clusters by the ranking key. As there is no autoclustering, every drone is in a dedicated set, and drones of the same ranking are combined into a list. These lists are then sorted by increasing ranking.

Returns

iterator of the lists containing drones with identical key
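The grouping can be sketched with itertools.groupby over the negated rank; the string drones and the rank callable are assumptions for the example, not the WrappedClassAd machinery:

```python
from itertools import groupby

def cluster_groups(drones, rank):
    # Every drone forms its own singleton set; drones sharing a rank
    # value are collected into one list. Lists are ordered by the
    # negated ranking key, matching the SortedDict behaviour.
    keyed = sorted(drones, key=lambda d: -rank(d))
    return [
        [{drone} for drone in group]
        for _, group in groupby(keyed, key=lambda d: -rank(d))
    ]
```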

copy() → lapis.scheduler.RankedNonClusters[DJ][source]

Copy the entire ranked auto clusters

empty() → bool[source]

Whether there are no resources available

lookup(job: lapis.cachingjob.CachingJob)[source]

Update information about cached data for every drone

remove(item: lapis.scheduler.WrappedClassAd[DJ])[source]

Remove an existing item

update(item)[source]

Update an existing item with its current state

class lapis.scheduler.WrappedClassAd(classad: classad._expression.ClassAd, wrapped: DJ)[source]

Combines the original job/drone object and the associated ClassAd.

empty()[source]

Only relevant for wrapped drones, to determine whether there are jobs running on them. If this is the case, the number of cores in use is >= 1.

Returns

true if no CPU cores are in use, false otherwise

async lapis.job.job_to_queue_scheduler(job_generator, job_queue: usim._basics.streams.Queue)[source]

Handles reading the simulation’s job input and puts the jobs into the job queue.

Parameters
  • job_generator – reader object that yields jobs from input

  • job_queue – queue the jobs are added to

class lapis.drone.Drone(scheduler, pool_resources: Optional[dict] = None, scheduling_duration: Optional[float] = None, ignore_resources: list = None, sitename: str = None, connection: Connection = None, empty: callable = <function Drone.<lambda>>)[source]

Represents worker nodes in the simulation.

async _run_job(job: lapis.cachingjob.CachingJob, kill: bool)[source]

Starts a job in the context of the given drone. The job is started regardless of the available resources. The resource allocation takes place after starting the job, and the job is killed if the drone’s overall resources are exceeded. In addition, if the kill flag is set, jobs are killed if the resources they use exceed the resources they requested. Afterwards, the end of the job’s execution is awaited and the drone’s status known to the scheduler is updated.

Parameters
  • job – the job to start

  • kill – if True, a job is killed when used resources exceed requested resources
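The kill semantics can be sketched with a small decision function; the dict-based resource vectors and the status strings are illustrative assumptions:

```python
def job_outcome(used, requested, drone_capacity, kill):
    # The job is started unconditionally; afterwards it is killed if
    # the drone's overall capacity is exceeded, or, with kill=True,
    # if its usage exceeds its own request.
    if any(used[r] > drone_capacity[r] for r in used):
        return "killed"
    if kill and any(used[r] > requested[r] for r in used):
        return "killed"
    return "running"
```

Note that with kill=False a job may overconsume its request and keep running, as long as the drone as a whole is not exhausted.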

empty()[source]

Checks whether there are any jobs running on this drone

Returns

true if no jobs are running on this drone, false otherwise

look_up_cached_data(job: lapis.cachingjob.CachingJob)[source]

Determines the amount of the job’s input data that is stored in caches the drone can access and sets the drone’s cached_data attribute to the resulting value. This quantity can then be used in the job matching process. Note that the current implementation only works for hitrate-based caching and that, while KeyErrors should not occur due to the way the method is called, KeyErrors are not handled here.

Parameters

job

async run()[source]

Handles the drone’s activity during simulation. Upon execution the drone registers itself at the scheduler and once jobs are scheduled to the drone’s job queue, these jobs are executed. Starting jobs via a job queue was introduced to avoid errors in resource allocation and monitoring.

async shutdown()[source]

Upon shutdown, the drone unregisters from the scheduler.

_empty

method that is used to determine whether a drone is empty

cached_data

used during scheduling, calculated for each job, is assigned the expectation value for the amount of cached data that is available to the drone

connection

connection object that holds remote connection and handles file transfers

jobs_with_cached_data

amount of jobs that currently run on the drone and that could read from the cache

sitename

identifies the site the drone belongs to, used to determine which caches a drone can use

class lapis.pool.Pool(make_drone: Callable, *, capacity: int = inf, init: int = 0, name: Optional[str] = None)[source]

A pool encapsulating a number of pools or drones. Given a specific demand, allocation and utilisation, the pool is able to adapt in terms of number of drones providing the given resources.

Parameters
  • capacity – Maximum number of pools that can be instantiated within the pool

  • init – Number of pools to instantiate at creation time of the pool

  • name – Name of the pool

  • make_drone – Callable to create a drone with specific properties for this pool

_init_pool(scope: usim._primitives.context.Scope, init: int = 0)[source]

Initialisation of existing drones at creation time of pool.

Parameters
  • scope

  • init – Number of drones to create.

async run()[source]

The pool periodically checks the current demand and the provided drones. If demand is higher than the current level, the pool takes care of initialising new drones. Otherwise, drones get removed.
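One control step of such a demand-driven loop might look as follows; the one-drone-per-step policy and the function name are assumptions for illustration, not the actual LAPIS adaptation logic:

```python
def next_drone_count(current, demand, capacity):
    # Clamp the demand to the pool's capacity, then move the drone
    # count one step toward that target per check interval.
    target = max(0, min(int(demand), capacity))
    if target > current:
        return current + 1  # initialise one new drone
    if target < current:
        return current - 1  # remove one drone
    return current
```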

property allocation

Fraction of the provided resources which are assigned for usage

property demand

The volume of resources to be provided by this pool

property supply

The volume of resources that is provided by this pool

property utilisation

Fraction of the provided resources which are actively used

class lapis.pool.StaticPool(make_drone: Callable, capacity: int = 0)[source]

A static pool does not react to changing conditions regarding demand, allocation, and utilisation; instead, it initialises the given capacity of drones with their configured resources at creation time.

Parameters
  • capacity – Maximum number of pools that can be instantiated within the pool

  • resources – Dictionary of resources available for each pool instantiated within the pool

async run()[source]

The pool runs forever and does not check whether the number of drones needs to be adapted.