1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 """Utility functions for X509.
22
23 """
24
25 import time
26 import OpenSSL
27 import re
28 import datetime
29 import calendar
30
31 from ganeti import errors
32 from ganeti import constants
33
34 from ganeti.utils import text as utils_text
35 from ganeti.utils import io as utils_io
36 from ganeti.utils import hash as utils_hash
37
38
39 HEX_CHAR_RE = r"[a-zA-Z0-9]"
40 VALID_X509_SIGNATURE_SALT = re.compile("^%s+$" % HEX_CHAR_RE, re.S)
41 X509_SIGNATURE = re.compile(r"^%s:\s*(?P<salt>%s+)/(?P<sign>%s+)$" %
42 (re.escape(constants.X509_CERT_SIGNATURE_HEADER),
43 HEX_CHAR_RE, HEX_CHAR_RE),
44 re.S | re.I)
45
46
47 (CERT_WARNING,
48 CERT_ERROR) = range(1, 3)
49
50
51 _ASN1_TIME_REGEX = re.compile(r"^(\d+)([-+]\d\d)(\d\d)$")
52
53
55 """Parses an ASN1 GENERALIZEDTIME timestamp as used by pyOpenSSL.
56
57 @type value: string
58 @param value: ASN1 GENERALIZEDTIME timestamp
59 @return: Seconds since the Epoch (1970-01-01 00:00:00 UTC)
60
61 """
62 m = _ASN1_TIME_REGEX.match(value)
63 if m:
64
65 asn1time = m.group(1)
66 hours = int(m.group(2))
67 minutes = int(m.group(3))
68 utcoffset = (60 * hours) + minutes
69 else:
70 if not value.endswith("Z"):
71 raise ValueError("Missing timezone")
72 asn1time = value[:-1]
73 utcoffset = 0
74
75 parsed = time.strptime(asn1time, "%Y%m%d%H%M%S")
76
77 tt = datetime.datetime(*(parsed[:7])) - datetime.timedelta(minutes=utcoffset)
78
79 return calendar.timegm(tt.utctimetuple())
80
81
83 """Returns the validity period of the certificate.
84
85 @type cert: OpenSSL.crypto.X509
86 @param cert: X509 certificate object
87
88 """
89
90
91 try:
92 get_notbefore_fn = cert.get_notBefore
93 except AttributeError:
94 not_before = None
95 else:
96 not_before_asn1 = get_notbefore_fn()
97
98 if not_before_asn1 is None:
99 not_before = None
100 else:
101 not_before = _ParseAsn1Generalizedtime(not_before_asn1)
102
103 try:
104 get_notafter_fn = cert.get_notAfter
105 except AttributeError:
106 not_after = None
107 else:
108 not_after_asn1 = get_notafter_fn()
109
110 if not_after_asn1 is None:
111 not_after = None
112 else:
113 not_after = _ParseAsn1Generalizedtime(not_after_asn1)
114
115 return (not_before, not_after)
116
117
120 """Verifies certificate validity.
121
122 @type expired: bool
123 @param expired: Whether pyOpenSSL considers the certificate as expired
124 @type not_before: number or None
125 @param not_before: Unix timestamp before which certificate is not valid
126 @type not_after: number or None
127 @param not_after: Unix timestamp after which certificate is invalid
128 @type now: number
129 @param now: Current time as Unix timestamp
130 @type warn_days: number or None
131 @param warn_days: How many days before expiration a warning should be reported
132 @type error_days: number or None
133 @param error_days: How many days before expiration an error should be reported
134
135 """
136 if expired:
137 msg = "Certificate is expired"
138
139 if not_before is not None and not_after is not None:
140 msg += (" (valid from %s to %s)" %
141 (utils_text.FormatTime(not_before),
142 utils_text.FormatTime(not_after)))
143 elif not_before is not None:
144 msg += " (valid from %s)" % utils_text.FormatTime(not_before)
145 elif not_after is not None:
146 msg += " (valid until %s)" % utils_text.FormatTime(not_after)
147
148 return (CERT_ERROR, msg)
149
150 elif not_before is not None and not_before > now:
151 return (CERT_WARNING,
152 "Certificate not yet valid (valid from %s)" %
153 utils_text.FormatTime(not_before))
154
155 elif not_after is not None:
156 remaining_days = int((not_after - now) / (24 * 3600))
157
158 msg = "Certificate expires in about %d days" % remaining_days
159
160 if error_days is not None and remaining_days <= error_days:
161 return (CERT_ERROR, msg)
162
163 if warn_days is not None and remaining_days <= warn_days:
164 return (CERT_WARNING, msg)
165
166 return (None, None)
167
168
170 """Verifies a certificate for LUClusterVerify.
171
172 @type cert: OpenSSL.crypto.X509
173 @param cert: X509 certificate object
174 @type warn_days: number or None
175 @param warn_days: How many days before expiration a warning should be reported
176 @type error_days: number or None
177 @param error_days: How many days before expiration an error should be reported
178
179 """
180
181 (not_before, not_after) = GetX509CertValidity(cert)
182
183 now = time.time() + constants.NODE_MAX_CLOCK_SKEW
184
185 return _VerifyCertificateInner(cert.has_expired(), not_before, not_after,
186 now, warn_days, error_days)
187
188
190 """Sign a X509 certificate.
191
192 An RFC822-like signature header is added in front of the certificate.
193
194 @type cert: OpenSSL.crypto.X509
195 @param cert: X509 certificate object
196 @type key: string
197 @param key: Key for HMAC
198 @type salt: string
199 @param salt: Salt for HMAC
200 @rtype: string
201 @return: Serialized and signed certificate in PEM format
202
203 """
204 if not VALID_X509_SIGNATURE_SALT.match(salt):
205 raise errors.GenericError("Invalid salt: %r" % salt)
206
207
208 cert_pem = OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM, cert)
209
210 return ("%s: %s/%s\n\n%s" %
211 (constants.X509_CERT_SIGNATURE_HEADER, salt,
212 utils_hash.Sha1Hmac(key, cert_pem, salt=salt),
213 cert_pem))
214
215
217 """Helper function to extract signature from X509 certificate.
218
219 """
220
221 for line in cert_pem.splitlines():
222 if line.startswith("---"):
223 break
224
225 m = X509_SIGNATURE.match(line.strip())
226 if m:
227 return (m.group("salt"), m.group("sign"))
228
229 raise errors.GenericError("X509 certificate signature is missing")
230
231
233 """Verifies a signed X509 certificate.
234
235 @type cert_pem: string
236 @param cert_pem: Certificate in PEM format and with signature header
237 @type key: string
238 @param key: Key for HMAC
239 @rtype: tuple; (OpenSSL.crypto.X509, string)
240 @return: X509 certificate object and salt
241
242 """
243 (salt, signature) = _ExtractX509CertificateSignature(cert_pem)
244
245
246 cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, cert_pem)
247
248
249 sane_pem = OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM, cert)
250
251 if not utils_hash.VerifySha1Hmac(key, sane_pem, signature, salt=salt):
252 raise errors.GenericError("X509 certificate signature is invalid")
253
254 return (cert, salt)
255
256
258 """Generates a self-signed X509 certificate.
259
260 @type common_name: string
261 @param common_name: commonName value
262 @type validity: int
263 @param validity: Validity for certificate in seconds
264 @return: a tuple of strings containing the PEM-encoded private key and
265 certificate
266
267 """
268
269 key = OpenSSL.crypto.PKey()
270 key.generate_key(OpenSSL.crypto.TYPE_RSA, constants.RSA_KEY_BITS)
271
272
273 cert = OpenSSL.crypto.X509()
274 if common_name:
275 cert.get_subject().CN = common_name
276 cert.set_serial_number(1)
277 cert.gmtime_adj_notBefore(0)
278 cert.gmtime_adj_notAfter(validity)
279 cert.set_issuer(cert.get_subject())
280 cert.set_pubkey(key)
281 cert.sign(key, constants.X509_CERT_SIGN_DIGEST)
282
283 key_pem = OpenSSL.crypto.dump_privatekey(OpenSSL.crypto.FILETYPE_PEM, key)
284 cert_pem = OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM, cert)
285
286 return (key_pem, cert_pem)
287
288
291 """Legacy function to generate self-signed X509 certificate.
292
293 @type filename: str
294 @param filename: path to write certificate to
295 @type common_name: string
296 @param common_name: commonName value
297 @type validity: int
298 @param validity: validity of certificate in number of days
299 @return: a tuple of strings containing the PEM-encoded private key and
300 certificate
301
302 """
303
304
305
306 (key_pem, cert_pem) = GenerateSelfSignedX509Cert(common_name,
307 validity * 24 * 60 * 60)
308
309 utils_io.WriteFile(filename, mode=0400, data=key_pem + cert_pem)
310 return (key_pem, cert_pem)
311