Package ganeti :: Module luxi
[hide private]
[frames] | no frames]

Source Code for Module ganeti.luxi

  1  # 
  2  # 
  3   
  4  # Copyright (C) 2006, 2007, 2011, 2012, 2013, 2014 Google Inc. 
  5  # All rights reserved. 
  6  # 
  7  # Redistribution and use in source and binary forms, with or without 
  8  # modification, are permitted provided that the following conditions are 
  9  # met: 
 10  # 
 11  # 1. Redistributions of source code must retain the above copyright notice, 
 12  # this list of conditions and the following disclaimer. 
 13  # 
 14  # 2. Redistributions in binary form must reproduce the above copyright 
 15  # notice, this list of conditions and the following disclaimer in the 
 16  # documentation and/or other materials provided with the distribution. 
 17  # 
 18  # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS 
 19  # IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED 
 20  # TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR 
 21  # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR 
 22  # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, 
 23  # EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, 
 24  # PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR 
 25  # PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF 
 26  # LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING 
 27  # NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS 
 28  # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
 29   
 30   
 31  """Module for the LUXI protocol 
 32   
 33  This module implements the local unix socket protocol. You only need 
 34  this module and the opcodes module in the client program in order to 
 35  communicate with the master. 
 36   
 37  The module is also used by the master daemon. 
 38   
 39  """ 
 40   
 41  from ganeti import constants 
 42  from ganeti import objects 
 43  import ganeti.rpc.client as cl 
 44  from ganeti.rpc.errors import RequestError 
 45  from ganeti.rpc.transport import Transport 
 46   
 47  __all__ = [ 
 48    # classes: 
 49    "Client" 
 50    ] 
 51   
 52  REQ_SUBMIT_JOB = constants.LUXI_REQ_SUBMIT_JOB 
 53  REQ_SUBMIT_JOB_TO_DRAINED_QUEUE = constants.LUXI_REQ_SUBMIT_JOB_TO_DRAINED_QUEUE 
 54  REQ_SUBMIT_MANY_JOBS = constants.LUXI_REQ_SUBMIT_MANY_JOBS 
 55  REQ_PICKUP_JOB = constants.LUXI_REQ_PICKUP_JOB 
 56  REQ_WAIT_FOR_JOB_CHANGE = constants.LUXI_REQ_WAIT_FOR_JOB_CHANGE 
 57  REQ_CANCEL_JOB = constants.LUXI_REQ_CANCEL_JOB 
 58  REQ_ARCHIVE_JOB = constants.LUXI_REQ_ARCHIVE_JOB 
 59  REQ_CHANGE_JOB_PRIORITY = constants.LUXI_REQ_CHANGE_JOB_PRIORITY 
 60  REQ_AUTO_ARCHIVE_JOBS = constants.LUXI_REQ_AUTO_ARCHIVE_JOBS 
 61  REQ_QUERY = constants.LUXI_REQ_QUERY 
 62  REQ_QUERY_FIELDS = constants.LUXI_REQ_QUERY_FIELDS 
 63  REQ_QUERY_JOBS = constants.LUXI_REQ_QUERY_JOBS 
 64  REQ_QUERY_INSTANCES = constants.LUXI_REQ_QUERY_INSTANCES 
 65  REQ_QUERY_NODES = constants.LUXI_REQ_QUERY_NODES 
 66  REQ_QUERY_GROUPS = constants.LUXI_REQ_QUERY_GROUPS 
 67  REQ_QUERY_NETWORKS = constants.LUXI_REQ_QUERY_NETWORKS 
 68  REQ_QUERY_EXPORTS = constants.LUXI_REQ_QUERY_EXPORTS 
 69  REQ_QUERY_CONFIG_VALUES = constants.LUXI_REQ_QUERY_CONFIG_VALUES 
 70  REQ_QUERY_CLUSTER_INFO = constants.LUXI_REQ_QUERY_CLUSTER_INFO 
 71  REQ_QUERY_TAGS = constants.LUXI_REQ_QUERY_TAGS 
 72  REQ_SET_DRAIN_FLAG = constants.LUXI_REQ_SET_DRAIN_FLAG 
 73  REQ_SET_WATCHER_PAUSE = constants.LUXI_REQ_SET_WATCHER_PAUSE 
 74  REQ_ALL = constants.LUXI_REQ_ALL 
 75   
 76  DEF_RWTO = constants.LUXI_DEF_RWTO 
 77  WFJC_TIMEOUT = constants.LUXI_WFJC_TIMEOUT 
78 79 80 -class Client(cl.AbstractClient):
81 """High-level client implementation. 82 83 This uses a backing Transport-like class on top of which it 84 implements data serialization/deserialization. 85 86 """
87 - def __init__(self, address=None, timeouts=None, transport=Transport):
88 """Constructor for the Client class. 89 90 Arguments are the same as for L{AbstractClient}. 91 92 """ 93 super(Client, self).__init__(address, timeouts, transport) 94 # Override the version of the protocol: 95 self.version = constants.LUXI_VERSION
96
97 - def SetQueueDrainFlag(self, drain_flag):
98 return self.CallMethod(REQ_SET_DRAIN_FLAG, (drain_flag, ))
99
100 - def SetWatcherPause(self, until):
101 return self.CallMethod(REQ_SET_WATCHER_PAUSE, (until, ))
102
103 - def PickupJob(self, job):
104 return self.CallMethod(REQ_PICKUP_JOB, (job,))
105
106 - def SubmitJob(self, ops):
107 ops_state = map(lambda op: op.__getstate__(), ops) 108 return self.CallMethod(REQ_SUBMIT_JOB, (ops_state, ))
109
110 - def SubmitJobToDrainedQueue(self, ops):
111 ops_state = map(lambda op: op.__getstate__(), ops) 112 return self.CallMethod(REQ_SUBMIT_JOB_TO_DRAINED_QUEUE, (ops_state, ))
113
114 - def SubmitManyJobs(self, jobs):
115 jobs_state = [] 116 for ops in jobs: 117 jobs_state.append([op.__getstate__() for op in ops]) 118 return self.CallMethod(REQ_SUBMIT_MANY_JOBS, (jobs_state, ))
119 120 @staticmethod
121 - def _PrepareJobId(request_name, job_id):
122 try: 123 return int(job_id) 124 except ValueError: 125 raise RequestError("Invalid parameter passed to %s as job id: " 126 " expected integer, got value %s" % 127 (request_name, job_id))
128
129 - def CancelJob(self, job_id):
130 job_id = Client._PrepareJobId(REQ_CANCEL_JOB, job_id) 131 return self.CallMethod(REQ_CANCEL_JOB, (job_id, ))
132
133 - def ArchiveJob(self, job_id):
134 job_id = Client._PrepareJobId(REQ_ARCHIVE_JOB, job_id) 135 return self.CallMethod(REQ_ARCHIVE_JOB, (job_id, ))
136
137 - def ChangeJobPriority(self, job_id, priority):
138 job_id = Client._PrepareJobId(REQ_CHANGE_JOB_PRIORITY, job_id) 139 return self.CallMethod(REQ_CHANGE_JOB_PRIORITY, (job_id, priority))
140
141 - def AutoArchiveJobs(self, age):
142 timeout = (DEF_RWTO - 1) / 2 143 return self.CallMethod(REQ_AUTO_ARCHIVE_JOBS, (age, timeout))
144
145 - def WaitForJobChangeOnce(self, job_id, fields, 146 prev_job_info, prev_log_serial, 147 timeout=WFJC_TIMEOUT):
148 """Waits for changes on a job. 149 150 @param job_id: Job ID 151 @type fields: list 152 @param fields: List of field names to be observed 153 @type prev_job_info: None or list 154 @param prev_job_info: Previously received job information 155 @type prev_log_serial: None or int/long 156 @param prev_log_serial: Highest log serial number previously received 157 @type timeout: int/float 158 @param timeout: Timeout in seconds (values larger than L{WFJC_TIMEOUT} will 159 be capped to that value) 160 161 """ 162 assert timeout >= 0, "Timeout can not be negative" 163 return self.CallMethod(REQ_WAIT_FOR_JOB_CHANGE, 164 (job_id, fields, prev_job_info, 165 prev_log_serial, 166 min(WFJC_TIMEOUT, timeout)))
167
168 - def WaitForJobChange(self, job_id, fields, prev_job_info, prev_log_serial):
169 job_id = Client._PrepareJobId(REQ_WAIT_FOR_JOB_CHANGE, job_id) 170 while True: 171 result = self.WaitForJobChangeOnce(job_id, fields, 172 prev_job_info, prev_log_serial) 173 if result != constants.JOB_NOTCHANGED: 174 break 175 return result
176
177 - def Query(self, what, fields, qfilter):
178 """Query for resources/items. 179 180 @param what: One of L{constants.QR_VIA_LUXI} 181 @type fields: List of strings 182 @param fields: List of requested fields 183 @type qfilter: None or list 184 @param qfilter: Query filter 185 @rtype: L{objects.QueryResponse} 186 187 """ 188 result = self.CallMethod(REQ_QUERY, (what, fields, qfilter)) 189 return objects.QueryResponse.FromDict(result)
190
191 - def QueryFields(self, what, fields):
192 """Query for available fields. 193 194 @param what: One of L{constants.QR_VIA_LUXI} 195 @type fields: None or list of strings 196 @param fields: List of requested fields 197 @rtype: L{objects.QueryFieldsResponse} 198 199 """ 200 result = self.CallMethod(REQ_QUERY_FIELDS, (what, fields)) 201 return objects.QueryFieldsResponse.FromDict(result)
202
203 - def QueryJobs(self, job_ids, fields):
204 return self.CallMethod(REQ_QUERY_JOBS, (job_ids, fields))
205
206 - def QueryInstances(self, names, fields, use_locking):
207 return self.CallMethod(REQ_QUERY_INSTANCES, (names, fields, use_locking))
208
209 - def QueryNodes(self, names, fields, use_locking):
210 return self.CallMethod(REQ_QUERY_NODES, (names, fields, use_locking))
211
212 - def QueryGroups(self, names, fields, use_locking):
213 return self.CallMethod(REQ_QUERY_GROUPS, (names, fields, use_locking))
214
215 - def QueryNetworks(self, names, fields, use_locking):
216 return self.CallMethod(REQ_QUERY_NETWORKS, (names, fields, use_locking))
217
218 - def QueryExports(self, nodes, use_locking):
219 return self.CallMethod(REQ_QUERY_EXPORTS, (nodes, use_locking))
220
221 - def QueryClusterInfo(self):
222 return self.CallMethod(REQ_QUERY_CLUSTER_INFO, ())
223
224 - def QueryConfigValues(self, fields):
225 return self.CallMethod(REQ_QUERY_CONFIG_VALUES, (fields, ))
226
227 - def QueryTags(self, kind, name):
228 return self.CallMethod(REQ_QUERY_TAGS, (kind, name))
229