Package ganeti :: Package jqueue :: Class JobQueue

Class JobQueue

Queue used to manage the jobs.

Instance Methods

__init__(self, context, cfg)
  Constructor for JobQueue.

_PickupJobUnlocked(self, job_id)
  Load a job from the job queue.

PickupJob(self, job_id)

_GetRpc(self, address_list)
  Gets RPC runner with context.

AddNode(self, node)
  Register a new node with the queue.

RemoveNode(self, node_name)
  Callback called when removing nodes from the cluster.

_GetNodeIp(self)
  Helper for returning the node name/ip list.
  Returns: (list, list)

_UpdateJobQueueFile(self, file_name, data, replicate)
  Writes a file locally and then replicates it to all nodes.

_RenameFilesUnlocked(self, rename)
  Renames a file locally and then replicates the change.

_LoadJobUnlocked(self, job_id)
  Loads a job from the disk or memory.
  Returns: _QueuedJob or None

_LoadJobFromDisk(self, job_id, try_archived, writable=None)
  Load the given job file from disk.
  Returns: _QueuedJob or None

SafeLoadJobFromDisk(self, job_id, try_archived, writable=None)
  Load the given job file from disk.
  Returns: _QueuedJob or None

_UpdateQueueSizeUnlocked(self)
  Update the queue size.

_EnqueueJobs(self, jobs)
  Helper function to add jobs to worker pool's queue.

_EnqueueJobsUnlocked(self, jobs)
  Helper function to add jobs to worker pool's queue.

_GetJobStatusForDependencies(self, job_id)
  Gets the status of a job for dependencies.

UpdateJobUnlocked(self, job, replicate=True)
  Update a job's on-disk storage.

HasJobBeenFinalized(self, job_id)
  Checks if a job has been finalized.
  Returns: boolean

CancelJob(self, job_id)
  Cancels a job.

ChangeJobPriority(self, job_id, priority)
  Changes a job's priority.

_ModifyJobUnlocked(self, job_id, mod_fn)
  Modifies a job.

_ArchiveJobsUnlocked(self, jobs)
  Archives jobs.
  Returns: int

_Query(self, fields, qfilter)

QueryJobs(self, fields, qfilter)
  Returns a list of jobs in queue.

OldStyleQueryJobs(self, job_ids, fields)
  Returns a list of jobs in queue.
  Returns: list

PrepareShutdown(self)
  Prepare to stop the job queue.
  Returns: bool

Shutdown(self)
  Stops the job queue.

Inherited from object: __delattr__, __format__, __getattribute__, __hash__, __new__, __reduce__, __reduce_ex__, __repr__, __setattr__, __sizeof__, __str__, __subclasshook__

Class Methods

_GetJobIDsUnlocked(cls, sort=True, archived=False)
  Return all known job IDs.
  Returns: list

SubmitManyJobs(cls, jobs)
  Create and store multiple jobs.

Static Methods

_CheckRpcResult(result, nodes, failmsg)
  Verifies the status of an RPC call.

_GetJobPath(job_id)
  Returns the job file for a given job id.
  Returns: str

_GetArchivedJobPath(job_id)
  Returns the archived job file for a given job id.
  Returns: str

_DetermineJobDirectories(archived)
  Build list of directories containing job files.
  Returns: list

_FormatSubmitError(msg, ops)
  Formats errors which occurred while submitting a job.

_ResolveJobDependencies(resolve_fn, deps)
  Resolves relative job IDs in dependencies.
  Returns: tuple; (boolean, string or list)

Properties

Inherited from object: __class__

Method Details

__init__(self, context, cfg)
(Constructor)

Constructor for JobQueue.

The constructor will initialize the job queue object and then start loading the current jobs from disk, either for starting them (if they were queued) or for aborting them (if they were already running).

Parameters:
  • context (GanetiContext) - the context object for access to the configuration data and other ganeti objects
Overrides: object.__init__

_PickupJobUnlocked(self, job_id)

Load a job from the job queue.

Pick up a job that already is in the job queue and start/resume it.

PickupJob(self, job_id)

Decorators:
  • @locking.ssynchronized(_LOCK)

AddNode(self, node)

Register a new node with the queue.

Parameters:
Decorators:
  • @locking.ssynchronized(_LOCK)

RemoveNode(self, node_name)

Callback called when removing nodes from the cluster.

Parameters:
  • node_name (str) - the name of the node to remove
Decorators:
  • @locking.ssynchronized(_LOCK)

_CheckRpcResult(result, nodes, failmsg)
Static Method

Verifies the status of an RPC call.

Since we aim to keep consistency should this node (the current master) fail, we will log errors if our RPC calls fail, and especially log the case when more than half of the nodes fail.

Parameters:
  • result - the data as returned from the rpc call
  • nodes (list) - the list of nodes we made the call to
  • failmsg (str) - the identifier to be used for logging
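
The logging policy described above can be sketched as follows. This is a minimal illustration, not the ganeti implementation: the function name check_rpc_result, the shape of result (a dict mapping node name to an error string, or None on success), and the returned list are all assumptions made for the example.

```python
import logging


def check_rpc_result(result, nodes, failmsg):
    """Log every failed node and return the list of failed node names.

    Assumption: result maps node name -> error string, or None on success.
    A node missing from result is treated as failed.
    """
    failed = []
    for node in nodes:
        err = result.get(node, "no reply")
        if err is not None:
            logging.error("%s failed on node %s: %s", failmsg, node, err)
            failed.append(node)

    # Single out the case where a majority of the nodes could not be reached.
    if len(failed) > len(nodes) / 2:
        logging.error("More than half of the nodes failed (%d of %d)",
                      len(failed), len(nodes))
    return failed
```

Returning the failed nodes is only a convenience for the sketch; the documented method lists no return value.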

_GetNodeIp(self)

Helper for returning the node name/ip list.

Returns: (list, list)
a tuple of two lists, the first one with the node names and the second one with the node addresses

_UpdateJobQueueFile(self, file_name, data, replicate)

Writes a file locally and then replicates it to all nodes.

This function will replace the contents of a file on the local node and then replicate it to all the other nodes we have.

Parameters:
  • file_name (str) - the path of the file to be replicated
  • data (str) - the new contents of the file
  • replicate (boolean) - whether to spread the changes to the remote nodes
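
A rough sketch of the "write locally, then replicate" order is shown below. The names update_queue_file, nodes and send are hypothetical stand-ins for the real RPC layer, and the atomic temp-file-and-rename step is an assumption of this example rather than something the documentation states.

```python
import os
import tempfile


def update_queue_file(file_name, data, replicate, nodes, send):
    """Replace file_name locally, then optionally push it to other nodes.

    nodes and send(node, file_name, data) are hypothetical stand-ins
    for the list of remote nodes and the RPC upload call.
    """
    directory = os.path.dirname(file_name) or "."
    fd, tmp_path = tempfile.mkstemp(dir=directory)
    try:
        with os.fdopen(fd, "w") as handle:
            handle.write(data)
        # Rename over the target so readers never see a half-written file.
        os.replace(tmp_path, file_name)
    except BaseException:
        if os.path.exists(tmp_path):
            os.unlink(tmp_path)
        raise

    # The local copy is updated first; remote copies follow only on request.
    if replicate:
        for node in nodes:
            send(node, file_name, data)
```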

_RenameFilesUnlocked(self, rename)

Renames a file locally and then replicates the change.

This function will rename a file in the local queue directory and then replicate this rename to all the other nodes we have.

Parameters:
  • rename (list of (old, new)) - List containing tuples mapping old to new names

_GetJobPath(job_id)
Static Method

Returns the job file for a given job id.

Parameters:
  • job_id (str) - the job identifier
Returns: str
the path to the job file

_GetArchivedJobPath(job_id)
Static Method

Returns the archived job file for a given job id.

Parameters:
  • job_id (str) - the job identifier
Returns: str
the path to the archived job file

_DetermineJobDirectories(archived)
Static Method

Build list of directories containing job files.

Parameters:
  • archived (bool) - Whether to include directories for archived jobs
Returns: list

_GetJobIDsUnlocked(cls, sort=True, archived=False)
Class Method

Return all known job IDs.

The method only looks at disk because it's a requirement that all jobs are present on disk (so in the _memcache we don't have any extra IDs).

Parameters:
  • sort (boolean) - perform sorting on the returned job ids
Returns: list
the list of job IDs

_LoadJobUnlocked(self, job_id)

Loads a job from the disk or memory.

Given a job id, this will return the cached job object if it exists, or otherwise try to load the job from disk. A job loaded from disk is also added to the cache.

Parameters:
  • job_id (int) - the job id
Returns: _QueuedJob or None
either None or the job object
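
The cache-then-disk lookup described above might look roughly like this. The class JobCache and the load_from_disk callable are hypothetical names for the example; only the idea of an in-memory cache in front of the disk comes from the documentation.

```python
class JobCache:
    """Illustrative in-memory cache in front of a disk loader."""

    def __init__(self, load_from_disk):
        self._memcache = {}
        # Hypothetical callable: job_id -> job object, or None if not found.
        self._load_from_disk = load_from_disk

    def load(self, job_id):
        """Return the cached job, or load it from disk and cache it."""
        job = self._memcache.get(job_id)
        if job is not None:
            return job

        job = self._load_from_disk(job_id)
        if job is not None:
            # Only successfully loaded jobs are remembered.
            self._memcache[job_id] = job
        return job
```

In this sketch a missing job is not cached, so a later lookup goes to disk again.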

_LoadJobFromDisk(self, job_id, try_archived, writable=None)

Load the given job file from disk.

Given a job file, read, load and restore it in a _QueuedJob format.

Parameters:
  • job_id (int) - job identifier
  • try_archived (bool) - Whether to try loading an archived job
Returns: _QueuedJob or None
either None or the job object

SafeLoadJobFromDisk(self, job_id, try_archived, writable=None)

Load the given job file from disk.

Given a job file, read, load and restore it in a _QueuedJob format. If reading the job fails, None is returned and the exception is logged.

Parameters:
  • job_id (int) - job identifier
  • try_archived (bool) - Whether to try loading an archived job
Returns: _QueuedJob or None
either None or the job object
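
The "safe" behaviour can be pictured as a thin wrapper around an unsafe loader. The names safe_load and load_fn below are assumptions for illustration, and catching every Exception is a simplification; the documentation does not say which exception types are handled.

```python
import logging


def safe_load(load_fn, job_id):
    """Call load_fn(job_id); on any error, log it and return None.

    load_fn is a hypothetical loader that may raise while reading a job.
    """
    try:
        return load_fn(job_id)
    except Exception:
        # logging.exception records the message together with the traceback.
        logging.exception("Can't load job %s", job_id)
        return None
```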

_ResolveJobDependencies(resolve_fn, deps)
Static Method

Resolves relative job IDs in dependencies.

Parameters:
  • resolve_fn (callable) - Function to resolve a relative job ID
  • deps (list) - Dependencies
Returns: tuple; (boolean, string or list)
If successful (first tuple item), the returned list contains resolved job IDs along with the requested status; if not successful, the second element is an error message
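
The return convention can be illustrated with a small sketch. Several details here are assumptions made for the example and are not stated above: each dependency is taken to be a (job_id, statuses) pair, a negative job_id is treated as relative, and resolve_fn is expected to raise IndexError when it cannot resolve an ID.

```python
def resolve_dependencies(resolve_fn, deps):
    """Return (True, resolved_list) or (False, error_message).

    Assumptions: deps holds (job_id, statuses) pairs, negative IDs are
    relative, and resolve_fn raises IndexError on an unresolvable ID.
    """
    resolved = []
    for job_id, statuses in deps:
        if job_id < 0:
            try:
                job_id = resolve_fn(job_id)
            except IndexError as err:
                return (False, "Unable to resolve relative job ID %s: %s"
                        % (job_id, err))
        # Absolute IDs pass through unchanged, with their requested statuses.
        resolved.append((job_id, statuses))
    return (True, resolved)
```

For instance, with submitted = [101, 102, 103] and resolve_fn = lambda rel: submitted[rel], a dependency on -1 would resolve to job 103 in this sketch.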

_EnqueueJobs(self, jobs)

Helper function to add jobs to worker pool's queue.

Parameters:
  • jobs (list) - List of all jobs
Decorators:
  • @locking.ssynchronized(_LOCK)

_EnqueueJobsUnlocked(self, jobs)

Helper function to add jobs to worker pool's queue.

Parameters:
  • jobs (list) - List of all jobs

_GetJobStatusForDependencies(self, job_id)

Gets the status of a job for dependencies.

Parameters:
  • job_id (int) - Job ID
Raises:

UpdateJobUnlocked(self, job, replicate=True)

Update a job's on-disk storage.

After a job has been modified, this function needs to be called in order to write the changes to disk and replicate them to the other nodes.

Parameters:
  • job (_QueuedJob) - the changed job
  • replicate (boolean) - whether to replicate the change to remote nodes

HasJobBeenFinalized(self, job_id)

Checks if a job has been finalized.

Parameters:
  • job_id (int) - Job identifier
Returns: boolean
True if the job has been finalized, False if the timeout has been reached, None if the job doesn't exist

CancelJob(self, job_id)

Cancels a job.

This will only succeed if the job has not started yet.

Parameters:
  • job_id (int) - job ID of job to be cancelled.
Decorators:
  • @locking.ssynchronized(_LOCK)

ChangeJobPriority(self, job_id, priority)

Changes a job's priority.

Parameters:
  • job_id (int) - ID of the job whose priority should be changed
  • priority (int) - New priority
Decorators:
  • @locking.ssynchronized(_LOCK)

_ModifyJobUnlocked(self, job_id, mod_fn)

Modifies a job.

Parameters:
  • job_id (int) - Job ID
  • mod_fn (callable) - Modifying function, receiving job object as parameter, returning tuple of (status boolean, message string)
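
The mod_fn contract (take the job, return a status boolean and a message) can be shown with a small stand-in. Everything except that contract is assumed for the example: the jobs dictionary, the on_change callback that would persist a modified job, the (status, message) return value of modify_job, and the sample cancel_if_queued function.

```python
def modify_job(jobs, job_id, mod_fn, on_change):
    """Apply mod_fn to a job and persist it if the modification succeeded.

    jobs (dict of job_id -> job) and on_change (called with the changed
    job) are hypothetical stand-ins for the queue's storage.
    """
    job = jobs.get(job_id)
    if job is None:
        return (False, "Job %s not found" % job_id)

    success, message = mod_fn(job)
    if success:
        on_change(job)
    return (success, message)


def cancel_if_queued(job):
    """Example mod_fn: cancel a job only if it has not started yet."""
    if job["status"] != "queued":
        return (False, "Job already started")
    job["status"] = "canceled"
    return (True, "Job canceled")
```

Passing the modification as a callable lets operations such as cancelling and changing priority share the same lookup and persistence steps.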

_ArchiveJobsUnlocked(self, jobs)

Archives jobs.

Parameters:
Returns: int
Number of archived jobs

QueryJobs(self, fields, qfilter)

Returns a list of jobs in queue.

Parameters:
  • fields (sequence) - List of wanted fields
  • qfilter (None or query2 filter (list)) - Query filter

OldStyleQueryJobs(self, job_ids, fields)

Returns a list of jobs in queue.

Parameters:
  • job_ids (list) - sequence of job identifiers or None for all
  • fields (list) - names of fields to return
Returns: list
a list with one element per job, each element being a list with the requested fields

PrepareShutdown(self)

Prepare to stop the job queue.

Returns whether there are any jobs currently running. If there are, the job queue is not yet ready for shutdown. Once this function returns False, Shutdown can be called without interfering with any job.

Returns: bool
Whether there are any running jobs
Decorators:
  • @locking.ssynchronized(_LOCK)
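
A caller might combine PrepareShutdown and Shutdown in a polling loop like the one below. This is a sketch that takes the return value as documented under "Returns" (True while jobs are still running). FakeQueue is a hypothetical stand-in and not part of ganeti; shutdown_when_idle and poll_interval are likewise names invented for the example.

```python
import time


class FakeQueue:
    """Hypothetical stand-in for JobQueue, used only for illustration."""

    def __init__(self, polls_until_idle):
        self._remaining = polls_until_idle
        self.stopped = False

    def PrepareShutdown(self):
        # True while (simulated) jobs are still running.
        running = self._remaining > 0
        self._remaining = max(0, self._remaining - 1)
        return running

    def Shutdown(self):
        self.stopped = True


def shutdown_when_idle(queue, poll_interval=0.01):
    """Poll until no job is running, then stop the queue.

    Returns the number of PrepareShutdown calls that were needed.
    """
    polls = 1
    while queue.PrepareShutdown():
        time.sleep(poll_interval)
        polls += 1
    queue.Shutdown()
    return polls
```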

Shutdown(self)

Stops the job queue.

This shuts down all the worker threads and closes the queue.

Decorators:
  • @locking.ssynchronized(_LOCK)