mirror of
https://git.ugnet.gay/CrossTalk/azul.git
synced 2026-05-27 14:49:50 +00:00
66 lines
2.1 KiB
Python
66 lines
2.1 KiB
Python
from typing import Dict, Tuple, Any, Optional
|
|
from datetime import datetime, timedelta, timezone
|
|
from pathlib import Path
|
|
import ssl
|
|
|
|
from cryptography import x509
|
|
from cryptography.hazmat.backends import default_backend
|
|
|
|
class TLSContext:
|
|
def __init__(self, cert_root: str, cert_dir: str) -> None:
|
|
self.cert_dir = Path(cert_dir)
|
|
self.cert_root = cert_root
|
|
self._cert_cache = {} # type: Dict[str, ssl.SSLContext]
|
|
|
|
def create_ssl_context(self) -> ssl.SSLContext:
|
|
self._get_root_cert()
|
|
|
|
ssl_context = ssl.create_default_context(purpose = ssl.Purpose.CLIENT_AUTH)
|
|
|
|
cache = self._cert_cache
|
|
def servername_callback(socket: Any, domain: Optional[str], ssl_context: ssl.SSLSocket) -> Optional[int]:
|
|
if domain is None:
|
|
domain = 'no-domain'
|
|
if domain not in cache:
|
|
ctxt = ssl.create_default_context(purpose = ssl.Purpose.CLIENT_AUTH)
|
|
p_crt, p_key = self._get_cert(domain)
|
|
ctxt.load_cert_chain(str(p_crt), keyfile = str(p_key))
|
|
cache[domain] = ctxt
|
|
socket.context = cache[domain]
|
|
return None
|
|
|
|
ssl_context.set_servername_callback(servername_callback)
|
|
return ssl_context
|
|
|
|
def _get_cert(self, domain: str) -> Tuple[Path, Path]:
|
|
p_crt = self.cert_dir / '{}'.format(domain)
|
|
p_key = self.cert_dir / '{}'.format(domain)
|
|
|
|
if not exists_and_valid(p_crt, p_key):
|
|
raise ssl.CertificateError()
|
|
|
|
return p_crt, p_key
|
|
|
|
def _get_root_cert(self) -> Tuple[Path, Path]:
|
|
assert self.cert_root is not None
|
|
|
|
p_crt = self.cert_dir / '{}'.format(self.cert_root)
|
|
p_key = self.cert_dir / '{}'.format(self.cert_root)
|
|
|
|
if not exists_and_valid(p_crt, p_key):
|
|
raise ssl.CertificateError()
|
|
|
|
return p_crt, p_key
|
|
|
|
def exists_and_valid(p_crt: Path, p_key: Path) -> bool:
|
|
if not p_crt.exists(): return False
|
|
if not p_key.exists(): return False
|
|
backend = default_backend() # type: ignore
|
|
with p_crt.open('rb') as fh:
|
|
crt = x509.load_pem_x509_certificate(fh.read(), backend)
|
|
|
|
now = datetime.now(timezone.utc)
|
|
if now < crt.not_valid_before_utc: return False
|
|
near_future = now + timedelta(days = 1)
|
|
if near_future > crt.not_valid_after_utc: return False
|
|
return True |