1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 """Utility functions for X509.
22
23 """
24
25 import time
26 import OpenSSL
27 import re
28 import datetime
29 import calendar
30
31 from ganeti import errors
32 from ganeti import constants
33
34 from ganeti.utils import text as utils_text
35 from ganeti.utils import io as utils_io
36 from ganeti.utils import hash as utils_hash
37
38
39 HEX_CHAR_RE = r"[a-zA-Z0-9]"
40 VALID_X509_SIGNATURE_SALT = re.compile("^%s+$" % HEX_CHAR_RE, re.S)
41 X509_SIGNATURE = re.compile(r"^%s:\s*(?P<salt>%s+)/(?P<sign>%s+)$" %
42 (re.escape(constants.X509_CERT_SIGNATURE_HEADER),
43 HEX_CHAR_RE, HEX_CHAR_RE),
44 re.S | re.I)
45
46
47 (CERT_WARNING,
48 CERT_ERROR) = range(1, 3)
49
50
51 _ASN1_TIME_REGEX = re.compile(r"^(\d+)([-+]\d\d)(\d\d)$")
52
53
55 """Parses an ASN1 GENERALIZEDTIME timestamp as used by pyOpenSSL.
56
57 @type value: string
58 @param value: ASN1 GENERALIZEDTIME timestamp
59 @return: Seconds since the Epoch (1970-01-01 00:00:00 UTC)
60
61 """
62 m = _ASN1_TIME_REGEX.match(value)
63 if m:
64
65 asn1time = m.group(1)
66 hours = int(m.group(2))
67 minutes = int(m.group(3))
68 utcoffset = (60 * hours) + minutes
69 else:
70 if not value.endswith("Z"):
71 raise ValueError("Missing timezone")
72 asn1time = value[:-1]
73 utcoffset = 0
74
75 parsed = time.strptime(asn1time, "%Y%m%d%H%M%S")
76
77 tt = datetime.datetime(*(parsed[:7])) - datetime.timedelta(minutes=utcoffset)
78
79 return calendar.timegm(tt.utctimetuple())
80
81
83 """Returns the validity period of the certificate.
84
85 @type cert: OpenSSL.crypto.X509
86 @param cert: X509 certificate object
87
88 """
89
90
91 try:
92 get_notbefore_fn = cert.get_notBefore
93 except AttributeError:
94 not_before = None
95 else:
96 not_before_asn1 = get_notbefore_fn()
97
98 if not_before_asn1 is None:
99 not_before = None
100 else:
101 not_before = _ParseAsn1Generalizedtime(not_before_asn1)
102
103 try:
104 get_notafter_fn = cert.get_notAfter
105 except AttributeError:
106 not_after = None
107 else:
108 not_after_asn1 = get_notafter_fn()
109
110 if not_after_asn1 is None:
111 not_after = None
112 else:
113 not_after = _ParseAsn1Generalizedtime(not_after_asn1)
114
115 return (not_before, not_after)
116
117
120 """Verifies certificate validity.
121
122 @type expired: bool
123 @param expired: Whether pyOpenSSL considers the certificate as expired
124 @type not_before: number or None
125 @param not_before: Unix timestamp before which certificate is not valid
126 @type not_after: number or None
127 @param not_after: Unix timestamp after which certificate is invalid
128 @type now: number
129 @param now: Current time as Unix timestamp
130 @type warn_days: number or None
131 @param warn_days: How many days before expiration a warning should be reported
132 @type error_days: number or None
133 @param error_days: How many days before expiration an error should be reported
134
135 """
136 if expired:
137 msg = "Certificate is expired"
138
139 if not_before is not None and not_after is not None:
140 msg += (" (valid from %s to %s)" %
141 (utils_text.FormatTime(not_before),
142 utils_text.FormatTime(not_after)))
143 elif not_before is not None:
144 msg += " (valid from %s)" % utils_text.FormatTime(not_before)
145 elif not_after is not None:
146 msg += " (valid until %s)" % utils_text.FormatTime(not_after)
147
148 return (CERT_ERROR, msg)
149
150 elif not_before is not None and not_before > now:
151 return (CERT_WARNING,
152 "Certificate not yet valid (valid from %s)" %
153 utils_text.FormatTime(not_before))
154
155 elif not_after is not None:
156 remaining_days = int((not_after - now) / (24 * 3600))
157
158 msg = "Certificate expires in about %d days" % remaining_days
159
160 if error_days is not None and remaining_days <= error_days:
161 return (CERT_ERROR, msg)
162
163 if warn_days is not None and remaining_days <= warn_days:
164 return (CERT_WARNING, msg)
165
166 return (None, None)
167
168
170 """Verifies a certificate for LUClusterVerify.
171
172 @type cert: OpenSSL.crypto.X509
173 @param cert: X509 certificate object
174 @type warn_days: number or None
175 @param warn_days: How many days before expiration a warning should be reported
176 @type error_days: number or None
177 @param error_days: How many days before expiration an error should be reported
178
179 """
180
181 (not_before, not_after) = GetX509CertValidity(cert)
182
183 return _VerifyCertificateInner(cert.has_expired(), not_before, not_after,
184 time.time(), warn_days, error_days)
185
186
188 """Sign a X509 certificate.
189
190 An RFC822-like signature header is added in front of the certificate.
191
192 @type cert: OpenSSL.crypto.X509
193 @param cert: X509 certificate object
194 @type key: string
195 @param key: Key for HMAC
196 @type salt: string
197 @param salt: Salt for HMAC
198 @rtype: string
199 @return: Serialized and signed certificate in PEM format
200
201 """
202 if not VALID_X509_SIGNATURE_SALT.match(salt):
203 raise errors.GenericError("Invalid salt: %r" % salt)
204
205
206 cert_pem = OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM, cert)
207
208 return ("%s: %s/%s\n\n%s" %
209 (constants.X509_CERT_SIGNATURE_HEADER, salt,
210 utils_hash.Sha1Hmac(key, cert_pem, salt=salt),
211 cert_pem))
212
213
215 """Helper function to extract signature from X509 certificate.
216
217 """
218
219 for line in cert_pem.splitlines():
220 if line.startswith("---"):
221 break
222
223 m = X509_SIGNATURE.match(line.strip())
224 if m:
225 return (m.group("salt"), m.group("sign"))
226
227 raise errors.GenericError("X509 certificate signature is missing")
228
229
231 """Verifies a signed X509 certificate.
232
233 @type cert_pem: string
234 @param cert_pem: Certificate in PEM format and with signature header
235 @type key: string
236 @param key: Key for HMAC
237 @rtype: tuple; (OpenSSL.crypto.X509, string)
238 @return: X509 certificate object and salt
239
240 """
241 (salt, signature) = _ExtractX509CertificateSignature(cert_pem)
242
243
244 cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, cert_pem)
245
246
247 sane_pem = OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM, cert)
248
249 if not utils_hash.VerifySha1Hmac(key, sane_pem, signature, salt=salt):
250 raise errors.GenericError("X509 certificate signature is invalid")
251
252 return (cert, salt)
253
254
256 """Generates a self-signed X509 certificate.
257
258 @type common_name: string
259 @param common_name: commonName value
260 @type validity: int
261 @param validity: Validity for certificate in seconds
262
263 """
264
265 key = OpenSSL.crypto.PKey()
266 key.generate_key(OpenSSL.crypto.TYPE_RSA, constants.RSA_KEY_BITS)
267
268
269 cert = OpenSSL.crypto.X509()
270 if common_name:
271 cert.get_subject().CN = common_name
272 cert.set_serial_number(1)
273 cert.gmtime_adj_notBefore(0)
274 cert.gmtime_adj_notAfter(validity)
275 cert.set_issuer(cert.get_subject())
276 cert.set_pubkey(key)
277 cert.sign(key, constants.X509_CERT_SIGN_DIGEST)
278
279 key_pem = OpenSSL.crypto.dump_privatekey(OpenSSL.crypto.FILETYPE_PEM, key)
280 cert_pem = OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM, cert)
281
282 return (key_pem, cert_pem)
283
284
287 """Legacy function to generate self-signed X509 certificate.
288
289 @type filename: str
290 @param filename: path to write certificate to
291 @type common_name: string
292 @param common_name: commonName value
293 @type validity: int
294 @param validity: validity of certificate in number of days
295
296 """
297
298
299
300 (key_pem, cert_pem) = GenerateSelfSignedX509Cert(common_name,
301 validity * 24 * 60 * 60)
302
303 utils_io.WriteFile(filename, mode=0400, data=key_pem + cert_pem)
304