Package ganeti :: Module luxi
[hide private]
[frames] | no frames]

Source Code for Module ganeti.luxi

  1  # 
  2  # 
  3   
  4  # Copyright (C) 2006, 2007, 2011, 2012, 2013, 2014 Google Inc. 
  5  # All rights reserved. 
  6  # 
  7  # Redistribution and use in source and binary forms, with or without 
  8  # modification, are permitted provided that the following conditions are 
  9  # met: 
 10  # 
 11  # 1. Redistributions of source code must retain the above copyright notice, 
 12  # this list of conditions and the following disclaimer. 
 13  # 
 14  # 2. Redistributions in binary form must reproduce the above copyright 
 15  # notice, this list of conditions and the following disclaimer in the 
 16  # documentation and/or other materials provided with the distribution. 
 17  # 
 18  # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS 
 19  # IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED 
 20  # TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR 
 21  # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR 
 22  # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, 
 23  # EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, 
 24  # PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR 
 25  # PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF 
 26  # LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING 
 27  # NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS 
 28  # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
 29   
 30   
 31  """Module for the LUXI protocol 
 32   
 33  This module implements the local unix socket protocol. You only need 
 34  this module and the opcodes module in the client program in order to 
 35  communicate with the master. 
 36   
 37  The module is also used by the master daemon. 
 38   
 39  """ 
 40   
 41  from ganeti import constants 
 42  from ganeti import pathutils 
 43  from ganeti import objects 
 44  import ganeti.rpc.client as cl 
 45  from ganeti.rpc.errors import RequestError 
 46  from ganeti.rpc.transport import Transport 
 47   
 48  __all__ = [ 
 49    # classes: 
 50    "Client" 
 51    ] 
 52   
 53  REQ_SUBMIT_JOB = constants.LUXI_REQ_SUBMIT_JOB 
 54  REQ_SUBMIT_JOB_TO_DRAINED_QUEUE = constants.LUXI_REQ_SUBMIT_JOB_TO_DRAINED_QUEUE 
 55  REQ_SUBMIT_MANY_JOBS = constants.LUXI_REQ_SUBMIT_MANY_JOBS 
 56  REQ_PICKUP_JOB = constants.LUXI_REQ_PICKUP_JOB 
 57  REQ_WAIT_FOR_JOB_CHANGE = constants.LUXI_REQ_WAIT_FOR_JOB_CHANGE 
 58  REQ_CANCEL_JOB = constants.LUXI_REQ_CANCEL_JOB 
 59  REQ_ARCHIVE_JOB = constants.LUXI_REQ_ARCHIVE_JOB 
 60  REQ_CHANGE_JOB_PRIORITY = constants.LUXI_REQ_CHANGE_JOB_PRIORITY 
 61  REQ_AUTO_ARCHIVE_JOBS = constants.LUXI_REQ_AUTO_ARCHIVE_JOBS 
 62  REQ_QUERY = constants.LUXI_REQ_QUERY 
 63  REQ_QUERY_FIELDS = constants.LUXI_REQ_QUERY_FIELDS 
 64  REQ_QUERY_JOBS = constants.LUXI_REQ_QUERY_JOBS 
 65  REQ_QUERY_FILTERS = constants.LUXI_REQ_QUERY_FILTERS 
 66  REQ_REPLACE_FILTER = constants.LUXI_REQ_REPLACE_FILTER 
 67  REQ_DELETE_FILTER = constants.LUXI_REQ_DELETE_FILTER 
 68  REQ_QUERY_INSTANCES = constants.LUXI_REQ_QUERY_INSTANCES 
 69  REQ_QUERY_NODES = constants.LUXI_REQ_QUERY_NODES 
 70  REQ_QUERY_GROUPS = constants.LUXI_REQ_QUERY_GROUPS 
 71  REQ_QUERY_NETWORKS = constants.LUXI_REQ_QUERY_NETWORKS 
 72  REQ_QUERY_EXPORTS = constants.LUXI_REQ_QUERY_EXPORTS 
 73  REQ_QUERY_CONFIG_VALUES = constants.LUXI_REQ_QUERY_CONFIG_VALUES 
 74  REQ_QUERY_CLUSTER_INFO = constants.LUXI_REQ_QUERY_CLUSTER_INFO 
 75  REQ_QUERY_TAGS = constants.LUXI_REQ_QUERY_TAGS 
 76  REQ_SET_DRAIN_FLAG = constants.LUXI_REQ_SET_DRAIN_FLAG 
 77  REQ_SET_WATCHER_PAUSE = constants.LUXI_REQ_SET_WATCHER_PAUSE 
 78  REQ_ALL = constants.LUXI_REQ_ALL 
 79   
 80  DEF_RWTO = constants.LUXI_DEF_RWTO 
 81  WFJC_TIMEOUT = constants.LUXI_WFJC_TIMEOUT 
82 83 84 -class Client(cl.AbstractClient):
85 """High-level client implementation. 86 87 This uses a backing Transport-like class on top of which it 88 implements data serialization/deserialization. 89 90 """
91 - def __init__(self, address=None, timeouts=None, transport=Transport):
92 """Constructor for the Client class. 93 94 Arguments are the same as for L{AbstractClient}. 95 96 """ 97 super(Client, self).__init__(timeouts, transport) 98 # Override the version of the protocol: 99 self.version = constants.LUXI_VERSION 100 # Store the socket address 101 if address is None: 102 address = pathutils.QUERY_SOCKET 103 self.address = address 104 self._InitTransport()
105
106 - def _GetAddress(self):
107 return self.address
108
109 - def SetQueueDrainFlag(self, drain_flag):
110 return self.CallMethod(REQ_SET_DRAIN_FLAG, (drain_flag, ))
111
112 - def SetWatcherPause(self, until):
113 return self.CallMethod(REQ_SET_WATCHER_PAUSE, (until, ))
114
115 - def PickupJob(self, job):
116 return self.CallMethod(REQ_PICKUP_JOB, (job,))
117
118 - def SubmitJob(self, ops):
119 ops_state = map(lambda op: op.__getstate__() 120 if not isinstance(op, objects.ConfigObject) 121 else op.ToDict(_with_private=True), ops) 122 return self.CallMethod(REQ_SUBMIT_JOB, (ops_state, ))
123
124 - def SubmitJobToDrainedQueue(self, ops):
125 ops_state = map(lambda op: op.__getstate__(), ops) 126 return self.CallMethod(REQ_SUBMIT_JOB_TO_DRAINED_QUEUE, (ops_state, ))
127
128 - def SubmitManyJobs(self, jobs):
129 jobs_state = [] 130 for ops in jobs: 131 jobs_state.append([op.__getstate__() for op in ops]) 132 return self.CallMethod(REQ_SUBMIT_MANY_JOBS, (jobs_state, ))
133 134 @staticmethod
135 - def _PrepareJobId(request_name, job_id):
136 try: 137 return int(job_id) 138 except ValueError: 139 raise RequestError("Invalid parameter passed to %s as job id: " 140 " expected integer, got value %s" % 141 (request_name, job_id))
142
143 - def CancelJob(self, job_id, kill=False):
144 job_id = Client._PrepareJobId(REQ_CANCEL_JOB, job_id) 145 return self.CallMethod(REQ_CANCEL_JOB, (job_id, kill))
146
147 - def ArchiveJob(self, job_id):
148 job_id = Client._PrepareJobId(REQ_ARCHIVE_JOB, job_id) 149 return self.CallMethod(REQ_ARCHIVE_JOB, (job_id, ))
150
151 - def ChangeJobPriority(self, job_id, priority):
152 job_id = Client._PrepareJobId(REQ_CHANGE_JOB_PRIORITY, job_id) 153 return self.CallMethod(REQ_CHANGE_JOB_PRIORITY, (job_id, priority))
154
155 - def AutoArchiveJobs(self, age):
156 timeout = (DEF_RWTO - 1) / 2 157 return self.CallMethod(REQ_AUTO_ARCHIVE_JOBS, (age, timeout))
158
159 - def WaitForJobChangeOnce(self, job_id, fields, 160 prev_job_info, prev_log_serial, 161 timeout=WFJC_TIMEOUT):
162 """Waits for changes on a job. 163 164 @param job_id: Job ID 165 @type fields: list 166 @param fields: List of field names to be observed 167 @type prev_job_info: None or list 168 @param prev_job_info: Previously received job information 169 @type prev_log_serial: None or int/long 170 @param prev_log_serial: Highest log serial number previously received 171 @type timeout: int/float 172 @param timeout: Timeout in seconds (values larger than L{WFJC_TIMEOUT} will 173 be capped to that value) 174 175 """ 176 assert timeout >= 0, "Timeout can not be negative" 177 return self.CallMethod(REQ_WAIT_FOR_JOB_CHANGE, 178 (job_id, fields, prev_job_info, 179 prev_log_serial, 180 min(WFJC_TIMEOUT, timeout)))
181
182 - def WaitForJobChange(self, job_id, fields, prev_job_info, prev_log_serial):
183 job_id = Client._PrepareJobId(REQ_WAIT_FOR_JOB_CHANGE, job_id) 184 while True: 185 result = self.WaitForJobChangeOnce(job_id, fields, 186 prev_job_info, prev_log_serial) 187 if result != constants.JOB_NOTCHANGED: 188 break 189 return result
190
191 - def Query(self, what, fields, qfilter):
192 """Query for resources/items. 193 194 @param what: One of L{constants.QR_VIA_LUXI} 195 @type fields: List of strings 196 @param fields: List of requested fields 197 @type qfilter: None or list 198 @param qfilter: Query filter 199 @rtype: L{objects.QueryResponse} 200 201 """ 202 result = self.CallMethod(REQ_QUERY, (what, fields, qfilter)) 203 return objects.QueryResponse.FromDict(result)
204
205 - def QueryFields(self, what, fields):
206 """Query for available fields. 207 208 @param what: One of L{constants.QR_VIA_LUXI} 209 @type fields: None or list of strings 210 @param fields: List of requested fields 211 @rtype: L{objects.QueryFieldsResponse} 212 213 """ 214 result = self.CallMethod(REQ_QUERY_FIELDS, (what, fields)) 215 return objects.QueryFieldsResponse.FromDict(result)
216
217 - def QueryJobs(self, job_ids, fields):
218 return self.CallMethod(REQ_QUERY_JOBS, (job_ids, fields))
219
220 - def QueryFilters(self, uuids, fields):
221 return self.CallMethod(REQ_QUERY_FILTERS, (uuids, fields))
222
223 - def ReplaceFilter(self, uuid, priority, predicates, action, reason):
224 return self.CallMethod(REQ_REPLACE_FILTER, 225 (uuid, priority, predicates, action, reason))
226
227 - def DeleteFilter(self, uuid):
228 return self.CallMethod(REQ_DELETE_FILTER, (uuid, ))
229
230 - def QueryInstances(self, names, fields, use_locking):
231 return self.CallMethod(REQ_QUERY_INSTANCES, (names, fields, use_locking))
232
233 - def QueryNodes(self, names, fields, use_locking):
234 return self.CallMethod(REQ_QUERY_NODES, (names, fields, use_locking))
235
236 - def QueryGroups(self, names, fields, use_locking):
237 return self.CallMethod(REQ_QUERY_GROUPS, (names, fields, use_locking))
238
239 - def QueryNetworks(self, names, fields, use_locking):
240 return self.CallMethod(REQ_QUERY_NETWORKS, (names, fields, use_locking))
241
242 - def QueryExports(self, nodes, use_locking):
243 return self.CallMethod(REQ_QUERY_EXPORTS, (nodes, use_locking))
244
245 - def QueryClusterInfo(self):
246 return self.CallMethod(REQ_QUERY_CLUSTER_INFO, ())
247
248 - def QueryConfigValues(self, fields):
249 return self.CallMethod(REQ_QUERY_CONFIG_VALUES, (fields, ))
250
251 - def QueryTags(self, kind, name):
252 return self.CallMethod(REQ_QUERY_TAGS, (kind, name))
253