Support for TLS certificate verification and client certificates. (#301)

If a TLS certificate fingerprint is provided, the client will check it against the SHA256 hex digest of the server's certificate. Different hash algorithms can be specified, and multiple fingerprints can be specified for networks with more than one server.
This commit is contained in:
Samuel Hoffman 2018-01-10 08:58:19 -06:00 committed by Ryan Schmidt
parent a2be87b85a
commit 532386a2b9
4 changed files with 141 additions and 1 deletions

3
.gitignore vendored
View File

@ -8,6 +8,9 @@
__pycache__/ __pycache__/
*.py[co] *.py[co]
# Linter files
.mypy_cache/
# Config files # Config files
botconfig.py botconfig.py
/gamemodes.py /gamemodes.py

View File

@ -22,6 +22,8 @@ import threading
import time import time
import traceback import traceback
import os import os
import hashlib
import hmac
from oyoyo.parse import parse_raw_irc_command from oyoyo.parse import parse_raw_irc_command
@ -98,6 +100,11 @@ class IRCClient:
self.blocking = True self.blocking = True
self.sasl_auth = False self.sasl_auth = False
self.use_ssl = False self.use_ssl = False
self.cert_verify = False
self.cert_fp = ""
self.client_certfile = None
self.client_keyfile = None
self.cipher_list = None
self.server_pass = None self.server_pass = None
self.lock = threading.RLock() self.lock = threading.RLock()
self.stream_handler = lambda output, level=None: print(output) self.stream_handler = lambda output, level=None: print(output)
@ -174,7 +181,115 @@ class IRCClient:
sys.exit(1) sys.exit(1)
if self.use_ssl: if self.use_ssl:
self.socket = ssl.wrap_socket(self.socket) ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
if self.cipher_list:
try:
ctx.set_ciphers(self.cipher_list)
except Exception:
self.stream_handler("No ciphers could be selected from the cipher list. TLS is not available.", level="warning")
self.stream_handler("Use `openssl ciphers' to see which ciphers are available on this system.", level="warning")
raise
# explicitly disable old protocols
ctx.options |= ssl.OP_NO_SSLv2
ctx.options |= ssl.OP_NO_SSLv3
ctx.options |= ssl.OP_NO_TLSv1
# explicitly disable compression (CRIME attack)
ctx.options |= ssl.OP_NO_COMPRESSION
if sys.version_info >= (3, 6):
# TLS session tickets harm forward secrecy (this symbol is only defined in 3.6 and later)
ctx.options |= ssl.OP_NO_TICKET
if self.cert_verify and not self.cert_fp:
ctx.verify_mode = ssl.CERT_REQUIRED
if not self.cert_fp:
ctx.check_hostname = True
ctx.load_default_certs()
elif not self.cert_verify and not self.cert_fp:
self.stream_handler("NOT validating the server's TLS certificate! Set 'SSL_VERIFY=True' or define a fingerprint in 'SSL_CERTFP' in botconfig.py to enable this.", level="warning")
if self.client_certfile:
# if client_keyfile is not specified, the ssl module will look to the
# client_certfile for it.
try:
ctx.load_cert_chain(self.client_certfile, self.client_keyfile)
self.stream_handler("Connecting with a TLS client certificate", level="info")
except Exception as error:
self.stream_handler("Unable to load client cert/key pair: {0}".format(error), level="warning")
try:
self.socket = ctx.wrap_socket(self.socket, server_hostname=self.host)
except Exception as error:
self.stream_handler("Error occured while connecting with TLS: {0}".format(error), level="warning")
raise
if self.cert_fp:
algo = None
if ":" in self.cert_fp:
algo, fp = self.cert_fp.split(":")
fp = fp.split(",")
self.stream_handler("Checking server's certificate {0} hash sum".format(algo), level="info")
else:
fp = self.cert_fp.split(",")
hashlen = {32: "md5", 40: "sha1", 56: "sha224",
64: "sha256", 96: "sha384", 128: "sha512"}
peercert = self.socket.getpeercert(True)
h = None
peercertfp = None
if algo:
try:
h = hashlib.new(algo)
except Exception as error:
self.stream_handler("TLS certificate fingerprint verification failed: {}".format(error), level="warning")
self.stream_handler("Supported algorithms on this system: {0}".format(", ".join(hashlib.algorithms_available)), level="warning")
raise
h.update(peercert)
peercertfp = h.hexdigest()
matched = False
for n, fingerprint in enumerate(fp):
if not h:
fplen = len(fingerprint)
if fplen not in hashlen:
self.stream_handler("Unable to auto-detect fingerprint #{0} ({1}) algorithm type by length".format(n, fp), level="warning")
continue
algo = hashlen[fplen]
self.stream_handler("Checking server's certificate {0} hash sum".format(algo), level="info")
try:
h = hashlib.new(algo)
except Exception as error:
self.stream_handler("TLS certificate fingerprint verification failed: {}".format(error), level="warning")
self.stream_handler("Supported algorithms on this system: {0}".format(", ".join(hashlib.algorithms_available)), level="warning")
raise
h.update(peercert)
peercertfp = h.hexdigest()
if hmac.compare_digest(fingerprint, peercertfp):
matched = fingerprint
if not matched:
self.stream_handler("Certificate fingerprint {0} did not match any expected fingerprints".format(peercertfp), level="warning")
raise ssl.CertificateError("Certificate fingerprint does not match.")
self.stream_handler("Server certificate fingerprint matched {0} ({1})".format(matched, algo), level="info")
self.stream_handler("Connected with cipher {0}".format(self.socket.cipher()[0]), level="info")
if not self.blocking: if not self.blocking:
self.socket.setblocking(0) self.socket.setblocking(0)

View File

@ -6,6 +6,22 @@ from collections import defaultdict, OrderedDict
import botconfig import botconfig
LANGUAGE = 'en' LANGUAGE = 'en'
## TLS settings
# If SSL_CERTFP is supplied, the bot will attempt to verify that with the server. If not, then the
# bot will attempt certificate verification, otherwise it will abort the connection.
# You may specify the hash algorithm to use when verifying fingerprints. If unspecified, it will
# attempt to autodetect the hash type of each fingerprint individually. (Limited to MD5, SHA1,
# SHA224, SHA384, and SHA512).
# Multiple fingerprints may be comma separated for networks with multipleservers. Check your Python
# version for suported algorithms.
# Syntax: SSL_CERTFP = "[HASH-ALGO:]fingerprint1,fingerprint2,fingerprint3"
SSL_VERIFY = True
SSL_CERTFILE = None # Client cert file to connect with in PEM format; can also contain keyfile.
SSL_KEYFILE = None # Keyfile for the certfile in PEM format. if encrypted, password will prompt on the command line.
SSL_CERTFP = None
SSL_CIPHERS = None # Custom list of available ciphers in OpenSSL cipher list format. (<https://wiki.openssl.org/index.php/Manual:Ciphers(1)#CIPHER_LIST_FORMAT>)
MINIMUM_WAIT = 60 MINIMUM_WAIT = 60
EXTRA_WAIT = 30 EXTRA_WAIT = 30
EXTRA_WAIT_JOIN = 0 # Add this many seconds to the waiting time for each !join EXTRA_WAIT_JOIN = 0 # Add this many seconds to the waiting time for each !join

View File

@ -51,6 +51,7 @@ from oyoyo.client import IRCClient
import src import src
from src import handler from src import handler
from src.events import Event from src.events import Event
import src.settings as var
def main(): def main():
evt = Event("init", {}) evt = Event("init", {})
@ -70,6 +71,11 @@ def main():
sasl_auth=botconfig.SASL_AUTHENTICATION, sasl_auth=botconfig.SASL_AUTHENTICATION,
server_pass=botconfig.SERVER_PASS, server_pass=botconfig.SERVER_PASS,
use_ssl=botconfig.USE_SSL, use_ssl=botconfig.USE_SSL,
cert_verify=var.SSL_VERIFY,
cert_fp=var.SSL_CERTFP,
client_certfile=var.SSL_CERTFILE,
client_keyfile=var.SSL_KEYFILE,
cipher_list=var.SSL_CIPHERS,
connect_cb=handler.connect_callback, connect_cb=handler.connect_callback,
stream_handler=src.stream, stream_handler=src.stream,
) )