Detailed documentation of all relevant modules¶
Other modules from LAPIS¶
-
class
lapis.scheduler.
CondorClassadJobScheduler
(job_queue, machine_ad: str = 'requirements = target.requestcpus <= my.cpus', job_ad: str = '\nrequirements = my.requestcpus <= target.cpus && my.requestmemory <= target.memory\n', pre_job_rank: str = '0', interval: float = 60, autocluster: bool = False)[source]¶ The goal of the HTCondor job scheduler is to mimic how HTCondor schedules jobs. HTCondor performs scheduling based on a priority queue. The priorities themselves are managed by the operators of an HTCondor instance, so different instances can behave very differently. In this case a priority queue that sorts job slots by increasing cost is built. The scheduler checks whether a job fits a slot exactly or fits into it several times. The cost of putting a job into a given slot is the amount of resources that would remain unallocated.
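The cost-based slot selection described above can be sketched as follows. This is a simplified, stand-alone illustration under stated assumptions (plain dicts model resources; `slot_cost` and `best_slot` are hypothetical helper names), not the actual LAPIS implementation:

```python
from typing import Dict, List, Optional

def slot_cost(job: Dict[str, int], slot: Dict[str, int]) -> Optional[int]:
    """Cost of putting a job into a slot: the amount of resources that
    would remain unallocated. Returns None if the job does not fit."""
    if any(job[resource] > slot.get(resource, 0) for resource in job):
        return None
    return sum(slot[resource] - job.get(resource, 0) for resource in slot)

def best_slot(job: Dict[str, int], slots: List[Dict[str, int]]) -> Optional[int]:
    """Index of the fitting slot that leaves the fewest resources idle."""
    fitting = [(cost, index) for index, slot in enumerate(slots)
               if (cost := slot_cost(job, slot)) is not None]
    return min(fitting)[1] if fitting else None
```

A job requesting exactly a slot's resources has cost 0, so exact fits win over larger slots.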
-
async
_collect_jobs
()[source]¶ Combines jobs that are imported from the simulation’s job config with a job ClassAd and adds the resulting WrappedClassAd objects to the scheduler’s job queue.
-
async
_execute_job
(job: lapis.scheduler.WrappedClassAd, drone: lapis.scheduler.WrappedClassAd)[source]¶ Schedules a job on a drone by extracting both objects from the respective WrappedClassAd and using the drone’s scheduling functionality
- Parameters
job –
drone –
-
static
_match_job
(job: classad._expression.ClassAd, pre_job_clusters: Iterator[List[Set[lapis.scheduler.WrappedClassAd[lapis.drone.Drone]]]])[source]¶ Tries to find a match for the transferred job among the available drones.
- Parameters
job – job to match
pre_job_clusters – list of clusters of wrapped drones that are
presorted by a clustering mechanism of RankedAutoClusters/RankedNonClusters that mimics the HTCondor NEGOTIATOR_PRE_JOB_RANK (prejobrank for short). The clusters contain drones that are considered equivalent with respect to all requirements and ranks used during the matchmaking process. This mimics the autoclustering functionality of HTCondor: [[highest prejobrank {autocluster}, {autocluster}], …, [lowest prejobrank {autocluster}, {autocluster}]]
- Returns
drone that is the best match for the job
The matching is performed in several steps:
1. The job's requirements are evaluated and only drones that meet them are considered further. A drone of every autocluster is extracted from pre_job_clusters and, if it meets the job's requirements, it is not removed from pre_job_clusters.
2. The autoclusters that are equivalent with respect to the prejobrank are then sorted by the job's rank expression. The resulting format of pre_job_clusters is [[(highest prejobrank, highest jobrank) {autocluster}, {autocluster}, …, (highest prejobrank, lowest jobrank) {autocluster}], …]
3. The resulting pre_job_clusters are then iterated and the drone with the highest (prejobrank, jobrank) whose requirements are also compatible with the job is returned as the best match.
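The iteration in step 3 can be sketched roughly as follows. This is a stand-alone sketch: drones are modelled as plain dicts in nested lists rather than the WrappedClassAd sets LAPIS uses, and the job's requirements are passed as a predicate:

```python
def match_job(meets_requirements, pre_job_clusters):
    """Walk the clusters ordered from highest to lowest (prejobrank, jobrank)
    and return the first drone compatible with the job's requirements."""
    for group in pre_job_clusters:       # groups share a prejobrank value
        for cluster in group:            # clusters are sorted by job rank
            for drone in cluster:        # drones in a cluster are equivalent
                if meets_requirements(drone):
                    return drone
    return None
```

Because the clusters are presorted, the first compatible drone found is already the best match.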
-
async
_schedule_jobs
()[source]¶ Handles the scheduling of jobs. Tries to match the jobs in the job queue to available resources. This occurs in several steps:
1. The list of drones known to the scheduler is copied. The copy can then be used to keep track of the drones' available resources while matching jobs, as the jobs allocate resources on the original drones before being processed but not during scheduling.
2. The jobs in the job queue are matched to the (copied) resources iteratively. The actual matching is performed by the _match_job method, which returns the most suitable drone unless no drone is compatible with the job's requirements. If a match was found, the resources requested by the job are allocated on the matched drone. If no resources remain unallocated after the last job's allocation, the matching process is ended for this scheduler interval.
3. After the job matching is finished, the matched jobs are removed from the job queue, as the index of a job in the job queue changes once a job with a lower index is removed from the queue.
4. The matched jobs' execution is triggered.
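The four steps can be sketched as follows (a simplified stand-in: `match` and `allocate` are hypothetical callables, and plain dicts replace the copied drone objects):

```python
def schedule_round(job_queue, drones, match, allocate):
    """One scheduler interval: match jobs against a copy of the drones'
    resources, then remove matched jobs from the queue in a second pass."""
    available = [dict(drone) for drone in drones]  # step 1: copy resources
    matched = []
    for job in job_queue:                          # step 2: match iteratively
        drone = match(job, available)
        if drone is not None:
            allocate(job, drone)
            matched.append(job)
    for job in matched:                            # step 3: removal afterwards
        job_queue.remove(job)                      # keeps queue indices stable
    return matched                                 # step 4: trigger execution
```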
-
async
job_finished
(job)[source]¶ Handles the impact of finishing jobs on the scheduler. If the job is completed successfully, the number of running jobs matched by the current scheduler instance is reduced. If the job does not finish successfully, it is resubmitted to the scheduler's job queue.
- Parameters
job –
-
register_drone
(drone: lapis.drone.Drone)[source]¶ Provides the drone with the drone ClassAd, combines both into one object, and adds the resulting WrappedClassAd object to the drones known to the scheduler as well as to the dictionary containing all WrappedClassAd objects the scheduler works with.
- Parameters
drone –
-
async
run
()[source]¶ Runs the scheduler's functionality. Once executed, the scheduler starts up and begins to add the jobs imported from the simulation's job config to its job queue.
- Returns
-
unregister_drone
(drone: lapis.drone.Drone)[source]¶ Remove a drone’s representation from the scheduler’s scope.
- Parameters
drone –
- Returns
-
update_drone
(drone: lapis.drone.Drone)[source]¶ Update a drone’s representation in the scheduler scope.
- Parameters
drone –
- Returns
-
property
drone_list
¶ Takes an iterator over the WrappedClassAd objects of drones known to the scheduler, extracts the drones and returns an iterator over the drone objects.
- Returns
-
class
lapis.scheduler.
CondorJobScheduler
(job_queue)[source]¶ The goal of the HTCondor job scheduler is to mimic how HTCondor schedules jobs. HTCondor performs scheduling based on a priority queue. The priorities themselves are managed by the operators of an HTCondor instance, so different instances can behave very differently. A priority queue that sorts job slots by increasing cost is built. The scheduler checks whether a job fits a slot exactly or fits into it several times. The cost of putting a job into a given slot is the amount of resources that would remain unallocated.
-
async
job_finished
(job)[source]¶ Declare a job as finished by a drone. This might even mean that the job has failed and that the scheduler needs to requeue the job for further processing.
-
register_drone
(drone: lapis.drone.Drone)[source]¶ Register a drone at the scheduler
-
unregister_drone
(drone: lapis.drone.Drone)[source]¶ Unregister a drone at the scheduler
-
update_drone
(drone: lapis.drone.Drone)[source]¶ Update parameters of a drone
-
property
drone_list
¶ Yields the registered drones
-
class
lapis.scheduler.
JobScheduler
[source]¶ -
async
job_finished
(job)[source]¶ Declare a job as finished by a drone. This might even mean that the job has failed and that the scheduler needs to requeue the job for further processing.
-
register_drone
(drone: lapis.drone.Drone)[source]¶ Register a drone at the scheduler
-
unregister_drone
(drone: lapis.drone.Drone)[source]¶ Unregister a drone at the scheduler
-
update_drone
(drone: lapis.drone.Drone)[source]¶ Update parameters of a drone
-
property
drone_list
¶ Yields the registered drones
-
class
lapis.scheduler.
RankedAutoClusters
(quantization: Dict[str, classad._primitives.HTCInt], ranking: classad._base_expression.Expression)[source]¶ Automatically cluster similar jobs or drones
-
_clustering_key
(item: lapis.scheduler.WrappedClassAd[DJ])[source]¶ Calculates an item's clustering key based on the specified ranking (here the prejobrank) and the item's available resources. The resulting key's structure is (prejobrank value, (available cpus, available memory, available disk space)). The clustering key is negated as the SortedDict sorts its entries from low keys to high keys.
- Parameters
item – drone for which the clustering key is calculated.
- Returns
(prejobrank value, (available cpus, available memory, available
disk space))
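The shape of the key can be illustrated with a small sketch (`clustering_key` is a hypothetical helper assuming the resource triple documented above):

```python
def clustering_key(prejobrank_value, available):
    """Negate the rank and the resources so that an ascending sort, as
    performed by a SortedDict, puts the best-ranked, best-equipped
    drones first."""
    cpus, memory, disk = available
    return (-prejobrank_value, (-cpus, -memory, -disk))
```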
-
add
(item: lapis.scheduler.WrappedClassAd[DJ])[source]¶ Add a new wrapped item, usually a drone, to the RankedAutoCluster. Unless the item is already contained, the item's key is generated and it is sorted into the clusters accordingly. If there are already items with the same key, the new item is added to the existing cluster; otherwise, a new cluster is created.
- Parameters
item –
- Returns
-
cluster_groups
() → Iterator[List[Set[lapis.scheduler.WrappedClassAd[lapis.drone.Drone]]]][source]¶ Sort clusters by the ranking key and then by the amount of available resources into nested lists of sets.
- Returns
-
clusters
() → Iterator[Set[lapis.scheduler.WrappedClassAd[DJ]]][source]¶ - Returns
iterator of all clusters
-
copy
() → lapis.scheduler.RankedAutoClusters[DJ][source]¶ Copy the entire ranked auto clusters
-
empty
() → bool[source]¶ Checks whether all drones in the RankedCluster are empty and currently not running any jobs.
- Returns
-
items
() → Iterator[Tuple[lapis.scheduler.RankedClusterKey, Set[lapis.scheduler.WrappedClassAd[DJ]]]][source]¶ - Returns
iterator of all clusters and corresponding keys
-
lookup
(job: lapis.cachingjob.CachingJob)[source]¶ Update information about cached data for every drone
-
remove
(item: lapis.scheduler.WrappedClassAd[DJ])[source]¶ Removes the item.
- Parameters
item –
- Returns
-
class
lapis.scheduler.
RankedClusterKey
(rank, key)[source]¶ -
_asdict
()¶ Return a new OrderedDict which maps field names to their values.
-
classmethod
_make
(iterable)¶ Make a new RankedClusterKey object from a sequence or iterable
-
_replace
(**kwds)¶ Return a new RankedClusterKey object replacing specified fields with new values
-
property
key
¶ Alias for field number 1
-
property
rank
¶ Alias for field number 0
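The methods and field aliases documented above are those of a standard namedtuple, so RankedClusterKey can be pictured as follows (a stdlib sketch, not the LAPIS source):

```python
from collections import namedtuple

# Field 0 is the rank, field 1 the key, matching the aliases above.
RankedClusterKey = namedtuple("RankedClusterKey", ["rank", "key"])

key = RankedClusterKey(rank=-1, key=(-4, -8))
```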
-
class
lapis.scheduler.
RankedClusters
(quantization: Dict[str, classad._primitives.HTCInt], ranking: classad._base_expression.Expression)[source]¶ Automatically cluster drones by rank
-
abstract
add
(item: lapis.scheduler.WrappedClassAd[DJ]) → None[source]¶ Add a new item
-
abstract
cluster_groups
() → Iterator[List[Set[lapis.scheduler.WrappedClassAd[lapis.drone.Drone]]]][source]¶ Group autoclusters by PreJobRank
-
abstract
copy
() → lapis.scheduler.RankedAutoClusters[DJ][source]¶ Copy the entire ranked auto clusters
-
abstract
lookup
(job: lapis.cachingjob.CachingJob) → None[source]¶ Update information about cached data for every drone
-
abstract
remove
(item: lapis.scheduler.WrappedClassAd[DJ]) → None[source]¶ Remove an existing item
-
class
lapis.scheduler.
RankedNonClusters
(quantization: Dict[str, classad._primitives.HTCInt], ranking: classad._base_expression.Expression)[source]¶ Automatically cluster jobs or drones by rank only
-
_clustering_key
(item: lapis.scheduler.WrappedClassAd[DJ])[source]¶ For RankedNonClusters there is only one clustering key: the object's defined ranking. The clustering key is negated as the SortedDict sorts its entries from low keys to high keys.
-
add
(item: lapis.scheduler.WrappedClassAd[DJ])[source]¶ Add a new item
-
cluster_groups
() → Iterator[List[Set[lapis.scheduler.WrappedClassAd[lapis.drone.Drone]]]][source]¶ Sorts clusters by the ranking key. As there is no autoclustering, every drone is in a dedicated set, and drones with the same ranking are combined into a list. These lists are then sorted by increasing ranking.
- Returns
iterator of the lists containing drones with identical key
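The resulting structure can be sketched with stdlib tools (a hypothetical `cluster_groups` helper; real drones are WrappedClassAd objects, here strings stand in):

```python
from itertools import groupby

def cluster_groups(drones, ranking_key):
    """Every drone forms its own set; drones sharing a ranking key are
    combined into one list, and the lists are sorted by increasing key."""
    ranked = sorted(drones, key=ranking_key)
    return [[{drone} for drone in group]
            for _, group in groupby(ranked, key=ranking_key)]
```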
-
copy
() → lapis.scheduler.RankedNonClusters[DJ][source]¶ Copy the entire ranked auto clusters
-
lookup
(job: lapis.cachingjob.CachingJob)[source]¶ Update information about cached data for every drone
-
remove
(item: lapis.scheduler.WrappedClassAd[DJ])[source]¶ Remove an existing item
-
class
lapis.scheduler.
WrappedClassAd
(classad: classad._expression.ClassAd, wrapped: DJ)[source]¶ Combines the original job/drone object and the associated ClassAd.
-
async
lapis.job.
job_to_queue_scheduler
(job_generator, job_queue: usim._basics.streams.Queue)[source]¶ Handles reading the simulation's job input and puts the jobs into the job queue
- Parameters
job_generator – reader object that yields jobs from input
job_queue – queue the jobs are added to
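The pattern can be sketched with asyncio as a stand-in for the usim Queue (the `feed_jobs` name is hypothetical):

```python
import asyncio

async def feed_jobs(job_generator, job_queue: asyncio.Queue) -> None:
    """Read jobs from the input reader and put them into the job queue."""
    for job in job_generator:
        await job_queue.put(job)
```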
-
class
lapis.drone.
Drone
(scheduler, pool_resources: Optional[dict] = None, scheduling_duration: Optional[float] = None, ignore_resources: list = None, sitename: str = None, connection: Connection = None, empty: callable = <function Drone.<lambda>>)[source]¶ Represents worker nodes in the simulation.
-
async
_run_job
(job: lapis.cachingjob.CachingJob, kill: bool)[source]¶ Starts a job in the context of the given drone. The job is started regardless of the available resources; the resource allocation takes place after starting the job, and the job is killed if the drone's overall resources are exceeded. In addition, if the kill flag is set, jobs are killed if the resources they use exceed the resources they requested. The end of the job's execution is then awaited and the drone's status known to the scheduler is updated.
- Parameters
job – the job to start
kill – if True, a job is killed when used resources exceed requested resources
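The kill decision described above can be condensed into a small predicate (a hypothetical sketch with dicts of resource usage, not the drone's actual bookkeeping):

```python
def should_kill(used, requested, total, kill: bool) -> bool:
    """A job is terminated if the drone's overall resources are exceeded
    or, when the kill flag is set, if it uses more than it requested."""
    over_total = any(used[r] > total[r] for r in used)
    over_request = any(used[r] > requested[r] for r in used)
    return over_total or (kill and over_request)
```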
-
empty
()[source]¶ Checks whether there are any jobs running on this drone
- Returns
true if no jobs are running on this drone, false otherwise
-
look_up_cached_data
(job: lapis.cachingjob.CachingJob)[source]¶ Determines the amount of the job's input data that is stored in caches the drone can access and sets the drone's cached_data attribute to the resulting value. This quantity can then be used in the job matching process. Note that the current implementation only works for hitrate-based caching and that, while KeyErrors should not occur due to the way the method is called, KeyErrors are not handled here.
- Parameters
job –
-
async
run
()[source]¶ Handles the drone’s activity during simulation. Upon execution the drone registers itself at the scheduler and once jobs are scheduled to the drone’s job queue, these jobs are executed. Starting jobs via a job queue was introduced to avoid errors in resource allocation and monitoring.
-
_empty
¶ method that is used to determine whether a drone is empty
-
cached_data
¶ used during scheduling, calculated for each job, is assigned the expectation value for the amount of cached data that is available to the drone
-
connection
¶ connection object that holds remote connection and handles file transfers
-
jobs_with_cached_data
¶ amount of jobs that currently run on the drone and that could read from the cache
-
sitename
¶ identifies the site the drone belongs to, used to determine which caches a drone can use
-
class
lapis.pool.
Pool
(make_drone: Callable, *, capacity: int = inf, init: int = 0, name: Optional[str] = None)[source]¶ A pool encapsulating a number of pools or drones. Given a specific demand, allocation and utilisation, the pool is able to adapt in terms of number of drones providing the given resources.
- Parameters
capacity – Maximum number of pools that can be instantiated within the pool
init – Number of pools to instantiate at creation time of the pool
name – Name of the pool
make_drone – Callable to create a drone with specific properties for this pool
-
_init_pool
(scope: usim._primitives.context.Scope, init: int = 0)[source]¶ Initialisation of existing drones at creation time of pool.
- Parameters
scope –
init – Number of drones to create.
-
async
run
()[source]¶ The pool periodically checks the current demand and the provided drones. If demand is higher than the current level, the pool takes care of initialising new drones; otherwise, drones get removed.
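The control decision inside this loop can be sketched as follows (a simplified model with a hypothetical `scaling_step` helper; the real pool derives demand from allocation and utilisation):

```python
def scaling_step(demand: float, supply: float, capacity: float) -> int:
    """Decide whether the pool should add (+1), remove (-1), or keep (0)
    a drone in the current interval."""
    if demand > supply and supply < capacity:
        return 1   # initialise a new drone
    if demand < supply:
        return -1  # let a drone terminate
    return 0
```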
-
property
allocation
¶ Fraction of the provided resources which are assigned for usage
-
property
demand
¶ The volume of resources to be provided by this pool
-
property
supply
¶ The volume of resources that is provided by this pool
-
property
utilisation
¶ Fraction of the provided resources which are actively used
-
class
lapis.pool.
StaticPool
(make_drone: Callable, capacity: int = 0)[source]¶ A static pool does not react to changing conditions regarding demand, allocation, and utilisation; instead it initialises the given capacity of drones with fixed resources.
- Parameters
capacity – Maximum number of pools that can be instantiated within the pool
resources – Dictionary of resources available for each pool instantiated within the pool