Package ganeti :: Module jstore
[hide private]
[frames] | no frames]

Source Code for Module ganeti.jstore

  1  # 
  2  # 
  3   
  4  # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012 Google Inc. 
  5  # 
  6  # This program is free software; you can redistribute it and/or modify 
  7  # it under the terms of the GNU General Public License as published by 
  8  # the Free Software Foundation; either version 2 of the License, or 
  9  # (at your option) any later version. 
 10  # 
 11  # This program is distributed in the hope that it will be useful, but 
 12  # WITHOUT ANY WARRANTY; without even the implied warranty of 
 13  # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU 
 14  # General Public License for more details. 
 15  # 
 16  # You should have received a copy of the GNU General Public License 
 17  # along with this program; if not, write to the Free Software 
 18  # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 
 19  # 02110-1301, USA. 
 20   
 21   
 22  """Module implementing the job queue handling.""" 
 23   
 24  import errno 
 25  import os 
 26   
 27  from ganeti import constants 
 28  from ganeti import errors 
 29  from ganeti import runtime 
 30  from ganeti import utils 
 31  from ganeti import pathutils 
 32   
 33   
 34  JOBS_PER_ARCHIVE_DIRECTORY = 10000 
 35   
 36   
37 -def _ReadNumericFile(file_name):
38 """Reads a file containing a number. 39 40 @rtype: None or int 41 @return: None if file is not found, otherwise number 42 43 """ 44 try: 45 contents = utils.ReadFile(file_name) 46 except EnvironmentError, err: 47 if err.errno in (errno.ENOENT, ): 48 return None 49 raise 50 51 try: 52 return int(contents) 53 except (ValueError, TypeError), err: 54 # Couldn't convert to int 55 raise errors.JobQueueError("Content of file '%s' is not numeric: %s" % 56 (file_name, err))
57 58
59 -def ReadSerial():
60 """Read the serial file. 61 62 The queue should be locked while this function is called. 63 64 """ 65 return _ReadNumericFile(pathutils.JOB_QUEUE_SERIAL_FILE)
66 67
68 -def ReadVersion():
69 """Read the queue version. 70 71 The queue should be locked while this function is called. 72 73 """ 74 return _ReadNumericFile(pathutils.JOB_QUEUE_VERSION_FILE)
75 76
77 -def InitAndVerifyQueue(must_lock):
78 """Open and lock job queue. 79 80 If necessary, the queue is automatically initialized. 81 82 @type must_lock: bool 83 @param must_lock: Whether an exclusive lock must be held. 84 @rtype: utils.FileLock 85 @return: Lock object for the queue. This can be used to change the 86 locking mode. 87 88 """ 89 getents = runtime.GetEnts() 90 91 # Lock queue 92 queue_lock = utils.FileLock.Open(pathutils.JOB_QUEUE_LOCK_FILE) 93 try: 94 # The queue needs to be locked in exclusive mode to write to the serial and 95 # version files. 96 if must_lock: 97 queue_lock.Exclusive(blocking=True) 98 holding_lock = True 99 else: 100 try: 101 queue_lock.Exclusive(blocking=False) 102 holding_lock = True 103 except errors.LockError: 104 # Ignore errors and assume the process keeping the lock checked 105 # everything. 106 holding_lock = False 107 108 if holding_lock: 109 # Verify version 110 version = ReadVersion() 111 if version is None: 112 # Write new version file 113 utils.WriteFile(pathutils.JOB_QUEUE_VERSION_FILE, 114 uid=getents.masterd_uid, gid=getents.daemons_gid, 115 mode=constants.JOB_QUEUE_FILES_PERMS, 116 data="%s\n" % constants.JOB_QUEUE_VERSION) 117 118 # Read again 119 version = ReadVersion() 120 121 if version != constants.JOB_QUEUE_VERSION: 122 raise errors.JobQueueError("Found job queue version %s, expected %s", 123 version, constants.JOB_QUEUE_VERSION) 124 125 serial = ReadSerial() 126 if serial is None: 127 # Write new serial file 128 utils.WriteFile(pathutils.JOB_QUEUE_SERIAL_FILE, 129 uid=getents.masterd_uid, gid=getents.daemons_gid, 130 mode=constants.JOB_QUEUE_FILES_PERMS, 131 data="%s\n" % 0) 132 133 # Read again 134 serial = ReadSerial() 135 136 if serial is None: 137 # There must be a serious problem 138 raise errors.JobQueueError("Can't read/parse the job queue" 139 " serial file") 140 141 if not must_lock: 142 # There's no need for more error handling. Closing the lock 143 # file below in case of an error will unlock it anyway. 144 queue_lock.Unlock() 145 146 except: 147 queue_lock.Close() 148 raise 149 150 return queue_lock
151 152
153 -def CheckDrainFlag():
154 """Check if the queue is marked to be drained. 155 156 This currently uses the queue drain file, which makes it a per-node flag. 157 In the future this can be moved to the config file. 158 159 @rtype: boolean 160 @return: True if the job queue is marked drained 161 162 """ 163 return os.path.exists(pathutils.JOB_QUEUE_DRAIN_FILE)
164 165
166 -def SetDrainFlag(drain_flag):
167 """Sets the drain flag for the queue. 168 169 @type drain_flag: boolean 170 @param drain_flag: Whether to set or unset the drain flag 171 @attention: This function should only called the current holder of the queue 172 lock 173 174 """ 175 getents = runtime.GetEnts() 176 177 if drain_flag: 178 utils.WriteFile(pathutils.JOB_QUEUE_DRAIN_FILE, data="", 179 uid=getents.masterd_uid, gid=getents.daemons_gid, 180 mode=constants.JOB_QUEUE_FILES_PERMS) 181 else: 182 utils.RemoveFile(pathutils.JOB_QUEUE_DRAIN_FILE) 183 184 assert (not drain_flag) ^ CheckDrainFlag()
185 186
187 -def FormatJobID(job_id):
188 """Convert a job ID to int format. 189 190 Currently this just is a no-op that performs some checks, but if we 191 want to change the job id format this will abstract this change. 192 193 @type job_id: int or long 194 @param job_id: the numeric job id 195 @rtype: int 196 @return: the formatted job id 197 198 """ 199 if not isinstance(job_id, (int, long)): 200 raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id) 201 if job_id < 0: 202 raise errors.ProgrammerError("Job ID %s is negative" % job_id) 203 204 return job_id
205 206
207 -def GetArchiveDirectory(job_id):
208 """Returns the archive directory for a job. 209 210 @type job_id: str 211 @param job_id: Job identifier 212 @rtype: str 213 @return: Directory name 214 215 """ 216 return str(ParseJobId(job_id) / JOBS_PER_ARCHIVE_DIRECTORY)
217 218
219 -def ParseJobId(job_id):
220 """Parses a job ID and converts it to integer. 221 222 """ 223 try: 224 return int(job_id) 225 except (ValueError, TypeError): 226 raise errors.ParameterError("Invalid job ID '%s'" % job_id)
227