Package ganeti :: Module luxi
[hide private]
[frames | no frames]

Source Code for Module ganeti.luxi

  1  # 
  2  # 
  3   
  4  # Copyright (C) 2006, 2007, 2011, 2012, 2013, 2014 Google Inc. 
  5  # All rights reserved. 
  6  # 
  7  # Redistribution and use in source and binary forms, with or without 
  8  # modification, are permitted provided that the following conditions are 
  9  # met: 
 10  # 
 11  # 1. Redistributions of source code must retain the above copyright notice, 
 12  # this list of conditions and the following disclaimer. 
 13  # 
 14  # 2. Redistributions in binary form must reproduce the above copyright 
 15  # notice, this list of conditions and the following disclaimer in the 
 16  # documentation and/or other materials provided with the distribution. 
 17  # 
 18  # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS 
 19  # IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED 
 20  # TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR 
 21  # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR 
 22  # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, 
 23  # EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, 
 24  # PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR 
 25  # PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF 
 26  # LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING 
 27  # NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS 
 28  # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
 29   
 30   
 31  """Module for the LUXI protocol 
 32   
 33  This module implements the local unix socket protocol. You only need 
 34  this module and the opcodes module in the client program in order to 
 35  communicate with the master. 
 36   
 37  The module is also used by the master daemon. 
 38   
 39  """ 
 40   
 41  from ganeti import constants 
 42  from ganeti import pathutils 
 43  from ganeti import objects 
 44  import ganeti.rpc.client as cl 
 45  from ganeti.rpc.errors import RequestError 
 46  from ganeti.rpc.transport import Transport 
 47   
 48  __all__ = [ 
 49    # classes: 
 50    "Client" 
 51    ] 
 52   
 53  REQ_SUBMIT_JOB = constants.LUXI_REQ_SUBMIT_JOB 
 54  REQ_SUBMIT_JOB_TO_DRAINED_QUEUE = constants.LUXI_REQ_SUBMIT_JOB_TO_DRAINED_QUEUE 
 55  REQ_SUBMIT_MANY_JOBS = constants.LUXI_REQ_SUBMIT_MANY_JOBS 
 56  REQ_PICKUP_JOB = constants.LUXI_REQ_PICKUP_JOB 
 57  REQ_WAIT_FOR_JOB_CHANGE = constants.LUXI_REQ_WAIT_FOR_JOB_CHANGE 
 58  REQ_CANCEL_JOB = constants.LUXI_REQ_CANCEL_JOB 
 59  REQ_ARCHIVE_JOB = constants.LUXI_REQ_ARCHIVE_JOB 
 60  REQ_CHANGE_JOB_PRIORITY = constants.LUXI_REQ_CHANGE_JOB_PRIORITY 
 61  REQ_AUTO_ARCHIVE_JOBS = constants.LUXI_REQ_AUTO_ARCHIVE_JOBS 
 62  REQ_QUERY = constants.LUXI_REQ_QUERY 
 63  REQ_QUERY_FIELDS = constants.LUXI_REQ_QUERY_FIELDS 
 64  REQ_QUERY_JOBS = constants.LUXI_REQ_QUERY_JOBS 
 65  REQ_QUERY_INSTANCES = constants.LUXI_REQ_QUERY_INSTANCES 
 66  REQ_QUERY_NODES = constants.LUXI_REQ_QUERY_NODES 
 67  REQ_QUERY_GROUPS = constants.LUXI_REQ_QUERY_GROUPS 
 68  REQ_QUERY_NETWORKS = constants.LUXI_REQ_QUERY_NETWORKS 
 69  REQ_QUERY_EXPORTS = constants.LUXI_REQ_QUERY_EXPORTS 
 70  REQ_QUERY_CONFIG_VALUES = constants.LUXI_REQ_QUERY_CONFIG_VALUES 
 71  REQ_QUERY_CLUSTER_INFO = constants.LUXI_REQ_QUERY_CLUSTER_INFO 
 72  REQ_QUERY_TAGS = constants.LUXI_REQ_QUERY_TAGS 
 73  REQ_SET_DRAIN_FLAG = constants.LUXI_REQ_SET_DRAIN_FLAG 
 74  REQ_SET_WATCHER_PAUSE = constants.LUXI_REQ_SET_WATCHER_PAUSE 
 75  REQ_ALL = constants.LUXI_REQ_ALL 
 76   
 77  DEF_RWTO = constants.LUXI_DEF_RWTO 
 78  WFJC_TIMEOUT = constants.LUXI_WFJC_TIMEOUT 
class Client(cl.AbstractClient):
  """High-level client implementation.

  This uses a backing Transport-like class on top of which it
  implements data serialization/deserialization.

  Most public methods are thin wrappers: they pack their arguments into a
  tuple and hand it, together with the request name, to C{self.CallMethod}
  (not defined in this class; presumably inherited from
  L{cl.AbstractClient}).

  """
  def __init__(self, address=None, timeouts=None, transport=Transport):
    """Constructor for the Client class.

    @param address: Address to store on the client; when None,
        L{pathutils.QUERY_SOCKET} is used
    @param timeouts: Passed unchanged to the L{cl.AbstractClient} constructor
    @param transport: Passed unchanged to the L{cl.AbstractClient}
        constructor

    """
    super(Client, self).__init__(timeouts, transport)
    # Override the version of the protocol:
    self.version = constants.LUXI_VERSION
    # Store the socket address; this happens before _InitTransport() is
    # called so that the address is already available at that point
    if address is None:
      address = pathutils.QUERY_SOCKET
    self.address = address
    self._InitTransport()

  def _GetAddress(self):
    """Returns the address stored by the constructor.

    """
    return self.address

  def SetQueueDrainFlag(self, drain_flag):
    """Issues a L{REQ_SET_DRAIN_FLAG} request.

    @param drain_flag: Sent as the only request argument
    @return: Result of the C{CallMethod} call

    """
    return self.CallMethod(REQ_SET_DRAIN_FLAG, (drain_flag, ))

  def SetWatcherPause(self, until):
    """Issues a L{REQ_SET_WATCHER_PAUSE} request.

    @param until: Sent as the only request argument
    @return: Result of the C{CallMethod} call

    """
    return self.CallMethod(REQ_SET_WATCHER_PAUSE, (until, ))

  def PickupJob(self, job):
    """Issues a L{REQ_PICKUP_JOB} request.

    @param job: Sent as the only request argument
    @return: Result of the C{CallMethod} call

    """
    return self.CallMethod(REQ_PICKUP_JOB, (job, ))

  def SubmitJob(self, ops):
    """Issues a L{REQ_SUBMIT_JOB} request for a list of opcodes.

    @param ops: Iterable of opcodes; L{objects.ConfigObject} instances are
        serialized with C{ToDict(_with_private=True)}, all other items
        with C{__getstate__()}
    @return: Result of the C{CallMethod} call

    """
    # A list comprehension (instead of map()) guarantees a real list on
    # Python 3 as well, where map() returns a lazy iterator
    ops_state = [op.ToDict(_with_private=True)
                 if isinstance(op, objects.ConfigObject)
                 else op.__getstate__()
                 for op in ops]
    return self.CallMethod(REQ_SUBMIT_JOB, (ops_state, ))

  def SubmitJobToDrainedQueue(self, ops):
    """Issues a L{REQ_SUBMIT_JOB_TO_DRAINED_QUEUE} request.

    @param ops: Iterable of opcodes, each serialized with
        C{__getstate__()}
    @return: Result of the C{CallMethod} call

    """
    ops_state = [op.__getstate__() for op in ops]
    return self.CallMethod(REQ_SUBMIT_JOB_TO_DRAINED_QUEUE, (ops_state, ))

  def SubmitManyJobs(self, jobs):
    """Issues a L{REQ_SUBMIT_MANY_JOBS} request.

    @param jobs: Iterable of jobs, each job being an iterable of opcodes
        which are serialized with C{__getstate__()}
    @return: Result of the C{CallMethod} call

    """
    jobs_state = [[op.__getstate__() for op in ops] for ops in jobs]
    return self.CallMethod(REQ_SUBMIT_MANY_JOBS, (jobs_state, ))

  @staticmethod
  def _PrepareJobId(request_name, job_id):
    """Converts a job ID to an integer.

    @param request_name: Name of the request; only used in the error
        message
    @param job_id: Job ID in any form accepted by C{int()}
    @rtype: int
    @raise RequestError: If C{job_id} can not be converted to an integer

    """
    try:
      return int(job_id)
    except (TypeError, ValueError):
      # int() raises TypeError, not ValueError, for None and other
      # non-numeric types; both cases are reported the same way
      raise RequestError("Invalid parameter passed to %s as job id: "
                         " expected integer, got value %s" %
                         (request_name, job_id))

  def CancelJob(self, job_id):
    """Issues a L{REQ_CANCEL_JOB} request.

    @param job_id: Job ID; converted to an integer before sending
    @return: Result of the C{CallMethod} call
    @raise RequestError: If C{job_id} is not convertible to an integer

    """
    job_id = Client._PrepareJobId(REQ_CANCEL_JOB, job_id)
    return self.CallMethod(REQ_CANCEL_JOB, (job_id, ))

  def ArchiveJob(self, job_id):
    """Issues a L{REQ_ARCHIVE_JOB} request.

    @param job_id: Job ID; converted to an integer before sending
    @return: Result of the C{CallMethod} call
    @raise RequestError: If C{job_id} is not convertible to an integer

    """
    job_id = Client._PrepareJobId(REQ_ARCHIVE_JOB, job_id)
    return self.CallMethod(REQ_ARCHIVE_JOB, (job_id, ))

  def ChangeJobPriority(self, job_id, priority):
    """Issues a L{REQ_CHANGE_JOB_PRIORITY} request.

    @param job_id: Job ID; converted to an integer before sending
    @param priority: Sent unchanged as the second request argument
    @return: Result of the C{CallMethod} call
    @raise RequestError: If C{job_id} is not convertible to an integer

    """
    job_id = Client._PrepareJobId(REQ_CHANGE_JOB_PRIORITY, job_id)
    return self.CallMethod(REQ_CHANGE_JOB_PRIORITY, (job_id, priority))

  def AutoArchiveJobs(self, age):
    """Issues a L{REQ_AUTO_ARCHIVE_JOBS} request.

    @param age: Sent unchanged as the first request argument
    @return: Result of the C{CallMethod} call

    """
    # The timeout sent along is derived from DEF_RWTO. Floor division
    # gives the same value on Python 2 and Python 3 (assuming DEF_RWTO is
    # an integer), whereas "/" would yield a float on Python 3.
    timeout = (DEF_RWTO - 1) // 2
    return self.CallMethod(REQ_AUTO_ARCHIVE_JOBS, (age, timeout))

  def WaitForJobChangeOnce(self, job_id, fields,
                           prev_job_info, prev_log_serial,
                           timeout=WFJC_TIMEOUT):
    """Waits for changes on a job.

    @param job_id: Job ID
    @type fields: list
    @param fields: List of field names to be observed
    @type prev_job_info: None or list
    @param prev_job_info: Previously received job information
    @type prev_log_serial: None or int/long
    @param prev_log_serial: Highest log serial number previously received
    @type timeout: int/float
    @param timeout: Timeout in seconds (values larger than L{WFJC_TIMEOUT} will
                    be capped to that value)

    """
    # NOTE: being an assert, this check is skipped when Python runs with -O
    assert timeout >= 0, "Timeout can not be negative"
    return self.CallMethod(REQ_WAIT_FOR_JOB_CHANGE,
                           (job_id, fields, prev_job_info,
                            prev_log_serial,
                            min(WFJC_TIMEOUT, timeout)))

  def WaitForJobChange(self, job_id, fields, prev_job_info, prev_log_serial):
    """Waits until a job is reported as changed.

    Calls L{WaitForJobChangeOnce} with its default timeout over and over
    until the result differs from L{constants.JOB_NOTCHANGED}.

    @param job_id: Job ID; converted to an integer before the first call
    @param fields: Passed on to L{WaitForJobChangeOnce}
    @param prev_job_info: Passed on to L{WaitForJobChangeOnce}
    @param prev_log_serial: Passed on to L{WaitForJobChangeOnce}
    @return: First result that is not L{constants.JOB_NOTCHANGED}
    @raise RequestError: If C{job_id} is not convertible to an integer

    """
    job_id = Client._PrepareJobId(REQ_WAIT_FOR_JOB_CHANGE, job_id)
    while True:
      result = self.WaitForJobChangeOnce(job_id, fields,
                                         prev_job_info, prev_log_serial)
      if result != constants.JOB_NOTCHANGED:
        return result

  def Query(self, what, fields, qfilter):
    """Query for resources/items.

    @param what: One of L{constants.QR_VIA_LUXI}
    @type fields: List of strings
    @param fields: List of requested fields
    @type qfilter: None or list
    @param qfilter: Query filter
    @rtype: L{objects.QueryResponse}

    """
    result = self.CallMethod(REQ_QUERY, (what, fields, qfilter))
    return objects.QueryResponse.FromDict(result)

  def QueryFields(self, what, fields):
    """Query for available fields.

    @param what: One of L{constants.QR_VIA_LUXI}
    @type fields: None or list of strings
    @param fields: List of requested fields
    @rtype: L{objects.QueryFieldsResponse}

    """
    result = self.CallMethod(REQ_QUERY_FIELDS, (what, fields))
    return objects.QueryFieldsResponse.FromDict(result)

  def QueryJobs(self, job_ids, fields):
    """Issues a L{REQ_QUERY_JOBS} request.

    @param job_ids: Sent unchanged as the first request argument
    @param fields: Sent unchanged as the second request argument
    @return: Result of the C{CallMethod} call

    """
    return self.CallMethod(REQ_QUERY_JOBS, (job_ids, fields))

  def QueryInstances(self, names, fields, use_locking):
    """Issues a L{REQ_QUERY_INSTANCES} request.

    The arguments are sent unchanged, in the given order.

    @return: Result of the C{CallMethod} call

    """
    return self.CallMethod(REQ_QUERY_INSTANCES, (names, fields, use_locking))

  def QueryNodes(self, names, fields, use_locking):
    """Issues a L{REQ_QUERY_NODES} request.

    The arguments are sent unchanged, in the given order.

    @return: Result of the C{CallMethod} call

    """
    return self.CallMethod(REQ_QUERY_NODES, (names, fields, use_locking))

  def QueryGroups(self, names, fields, use_locking):
    """Issues a L{REQ_QUERY_GROUPS} request.

    The arguments are sent unchanged, in the given order.

    @return: Result of the C{CallMethod} call

    """
    return self.CallMethod(REQ_QUERY_GROUPS, (names, fields, use_locking))

  def QueryNetworks(self, names, fields, use_locking):
    """Issues a L{REQ_QUERY_NETWORKS} request.

    The arguments are sent unchanged, in the given order.

    @return: Result of the C{CallMethod} call

    """
    return self.CallMethod(REQ_QUERY_NETWORKS, (names, fields, use_locking))

  def QueryExports(self, nodes, use_locking):
    """Issues a L{REQ_QUERY_EXPORTS} request.

    The arguments are sent unchanged, in the given order.

    @return: Result of the C{CallMethod} call

    """
    return self.CallMethod(REQ_QUERY_EXPORTS, (nodes, use_locking))

  def QueryClusterInfo(self):
    """Issues a L{REQ_QUERY_CLUSTER_INFO} request without arguments.

    @return: Result of the C{CallMethod} call

    """
    return self.CallMethod(REQ_QUERY_CLUSTER_INFO, ())

  def QueryConfigValues(self, fields):
    """Issues a L{REQ_QUERY_CONFIG_VALUES} request.

    @param fields: Sent as the only request argument
    @return: Result of the C{CallMethod} call

    """
    return self.CallMethod(REQ_QUERY_CONFIG_VALUES, (fields, ))

  def QueryTags(self, kind, name):
    """Issues a L{REQ_QUERY_TAGS} request.

    The arguments are sent unchanged, in the given order.

    @return: Result of the C{CallMethod} call

    """
    return self.CallMethod(REQ_QUERY_TAGS, (kind, name))