Package ganeti :: Module jstore
[hide private]
[frames | no frames]

Source Code for Module ganeti.jstore

  1  # 
  2  # 
  3   
  4  # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012 Google Inc. 
  5  # All rights reserved. 
  6  # 
  7  # Redistribution and use in source and binary forms, with or without 
  8  # modification, are permitted provided that the following conditions are 
  9  # met: 
 10  # 
 11  # 1. Redistributions of source code must retain the above copyright notice, 
 12  # this list of conditions and the following disclaimer. 
 13  # 
 14  # 2. Redistributions in binary form must reproduce the above copyright 
 15  # notice, this list of conditions and the following disclaimer in the 
 16  # documentation and/or other materials provided with the distribution. 
 17  # 
 18  # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS 
 19  # IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED 
 20  # TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR 
 21  # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR 
 22  # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, 
 23  # EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, 
 24  # PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR 
 25  # PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF 
 26  # LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING 
 27  # NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS 
 28  # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
 29   
 30   
 31  """Module implementing the job queue handling.""" 
 32   
 33  import errno 
 34  import os 
 35   
 36  from ganeti import constants 
 37  from ganeti import errors 
 38  from ganeti import runtime 
 39  from ganeti import utils 
 40  from ganeti import pathutils 
 41   
 42   
 43  JOBS_PER_ARCHIVE_DIRECTORY = constants.JSTORE_JOBS_PER_ARCHIVE_DIRECTORY  # divisor GetArchiveDirectory applies to a job ID to pick its archive directory
 44   
 45   
def _ReadNumericFile(file_name):
  """Reads a file containing a number.

  @type file_name: str
  @param file_name: Path of the file to read
  @rtype: None or int
  @return: None if file is not found, otherwise number
  @raise errors.JobQueueError: If the file content cannot be converted to
    an integer

  """
  try:
    contents = utils.ReadFile(file_name)
  except EnvironmentError as err:
    # A missing file is an expected condition and is reported as None; every
    # other I/O problem is passed on to the caller unchanged.
    if err.errno == errno.ENOENT:
      return None
    raise

  try:
    return int(contents)
  except (ValueError, TypeError) as err:
    # Couldn't convert to int
    raise errors.JobQueueError("Content of file '%s' is not numeric: %s" %
                               (file_name, err))
66 67
def ReadSerial():
  """Return the number stored in the job queue serial file.

  The caller is expected to hold the queue lock while calling this.

  @return: Whatever L{_ReadNumericFile} yields for the serial file

  """
  serial_path = pathutils.JOB_QUEUE_SERIAL_FILE
  return _ReadNumericFile(serial_path)
75 76
def ReadVersion():
  """Return the number stored in the job queue version file.

  The caller is expected to hold the queue lock while calling this.

  @return: Whatever L{_ReadNumericFile} yields for the version file

  """
  version_path = pathutils.JOB_QUEUE_VERSION_FILE
  return _ReadNumericFile(version_path)
84 85
def InitAndVerifyQueue(must_lock):
  """Open and lock job queue.

  If necessary, the queue is automatically initialized.

  @type must_lock: bool
  @param must_lock: Whether an exclusive lock must be held.
  @rtype: utils.FileLock
  @return: Lock object for the queue. This can be used to change the
      locking mode.
  @raise errors.JobQueueError: If the version found on disk differs from
      C{constants.JOB_QUEUE_VERSION} or the serial file can't be read

  """
  getents = runtime.GetEnts()

  # Lock queue
  queue_lock = utils.FileLock.Open(pathutils.JOB_QUEUE_LOCK_FILE)
  try:
    # The queue needs to be locked in exclusive mode to write to the serial and
    # version files.
    if must_lock:
      queue_lock.Exclusive(blocking=True)
      holding_lock = True
    else:
      try:
        queue_lock.Exclusive(blocking=False)
        holding_lock = True
      except errors.LockError:
        # Ignore errors and assume the process keeping the lock checked
        # everything.
        holding_lock = False

    if holding_lock:
      # Verify version
      version = ReadVersion()
      if version is None:
        # Write new version file
        utils.WriteFile(pathutils.JOB_QUEUE_VERSION_FILE,
                        uid=getents.masterd_uid, gid=getents.daemons_gid,
                        mode=constants.JOB_QUEUE_FILES_PERMS,
                        data="%s\n" % constants.JOB_QUEUE_VERSION)

        # Read again
        version = ReadVersion()

      if version != constants.JOB_QUEUE_VERSION:
        # Interpolate the values into the message; passing them as separate
        # exception arguments would leave the "%s" placeholders unformatted.
        raise errors.JobQueueError("Found job queue version %s, expected %s" %
                                   (version, constants.JOB_QUEUE_VERSION))

      serial = ReadSerial()
      if serial is None:
        # Write new serial file
        utils.WriteFile(pathutils.JOB_QUEUE_SERIAL_FILE,
                        uid=getents.masterd_uid, gid=getents.daemons_gid,
                        mode=constants.JOB_QUEUE_FILES_PERMS,
                        data="%s\n" % 0)

        # Read again
        serial = ReadSerial()

      if serial is None:
        # There must be a serious problem
        raise errors.JobQueueError("Can't read/parse the job queue"
                                   " serial file")

      if not must_lock:
        # There's no need for more error handling. Closing the lock
        # file below in case of an error will unlock it anyway.
        queue_lock.Unlock()

  except:
    # Deliberately catches everything: the lock file has to be closed no
    # matter what went wrong, and the exception is re-raised unchanged.
    queue_lock.Close()
    raise

  return queue_lock
160 161
def CheckDrainFlag():
  """Tell whether the job queue is marked to be drained.

  The flag is represented by the queue drain file, which makes it a
  per-node setting. In the future this can be moved to the config file.

  @rtype: boolean
  @return: True if the job queue is marked drained

  """
  drain_file = pathutils.JOB_QUEUE_DRAIN_FILE
  return os.path.exists(drain_file)
173 174
def SetDrainFlag(drain_flag):
  """Create or remove the queue drain file.

  @type drain_flag: boolean
  @param drain_flag: Whether to set or unset the drain flag
  @attention: This function should only be called by the current holder of
    the queue lock

  """
  ents = runtime.GetEnts()
  drain_file = pathutils.JOB_QUEUE_DRAIN_FILE

  if not drain_flag:
    utils.RemoveFile(drain_file)
  else:
    # The flag is an empty file; only its existence matters
    utils.WriteFile(drain_file, data="",
                    uid=ents.masterd_uid, gid=ents.daemons_gid,
                    mode=constants.JOB_QUEUE_FILES_PERMS)

  # Post-condition: the file exists exactly when draining was requested
  assert bool(drain_flag) == CheckDrainFlag()
194 195
def FormatJobID(job_id):
  """Convert a job ID to int format.

  Currently this just is a no-op that performs some checks, but if we
  want to change the job id format this will abstract this change.

  @type job_id: int or long
  @param job_id: the numeric job id
  @rtype: int
  @return: the formatted job id
  @raise errors.ProgrammerError: If the job ID is not an integer or is
    negative

  """
  try:
    integer_types = (int, long)
  except NameError:
    # Interpreters without a separate "long" type only have "int"
    integer_types = (int, )

  if not isinstance(job_id, integer_types):
    # Wrap the value in a tuple so that a tuple-typed job ID is reported
    # through the intended error instead of breaking the "%" formatting.
    raise errors.ProgrammerError("Job ID '%s' not numeric" % (job_id, ))
  if job_id < 0:
    raise errors.ProgrammerError("Job ID %s is negative" % job_id)

  return job_id
214 215
def GetArchiveDirectory(job_id):
  """Returns the archive directory for a job.

  @type job_id: str
  @param job_id: Job identifier; it is converted using L{ParseJobId}
  @rtype: str
  @return: Directory name
  @raise errors.ParameterError: If the job ID can't be parsed

  """
  # Explicit floor division keeps the directory name an integer string even
  # where "/" performs true division.
  return str(ParseJobId(job_id) // JOBS_PER_ARCHIVE_DIRECTORY)
226 227
def ParseJobId(job_id):
  """Parses a job ID and converts it to integer.

  @param job_id: Job identifier, anything accepted by C{int()}
  @rtype: int
  @return: Job ID as an integer
  @raise errors.ParameterError: If the value can't be converted

  """
  try:
    return int(job_id)
  except (ValueError, TypeError):
    # Wrap the value in a tuple: a tuple-typed job ID (which is what makes
    # int() raise TypeError) would otherwise break the "%" formatting and
    # hide the intended error.
    raise errors.ParameterError("Invalid job ID '%s'" % (job_id, ))
236