#!/usr/bin/env python
#-
# SPDX-License-Identifier: BSD-2-Clause
#
# Copyright (c) 2020 Netflix, Inc.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
# 1. Redistributions of source code must retain the above copyright
#    notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
#    notice, this list of conditions and the following disclaimer in the
#    documentation and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
# SUCH DAMAGE.
#
#

# Test script: overflow the accept queue of a listening socket (IPv4, IPv6
# and Unix-domain) by connecting without ever accepting, then check via
# /sbin/dmesg that the kernel logged the overflow once and did not repeat
# the message for a subsequent overflow.

import socket
import os
import sys
from subprocess import check_output
from time import sleep

V4HOST = '127.0.0.1'
V6HOST = '::1'
TCPPORT = 65432
UNIXSOCK = '/tmp/testsock'
TYPE = socket.SOCK_STREAM


class GenericTest(object):
    """Listening socket plus the client connections made against it.

    Subclasses supply the address family and address by calling setup()
    from their own __init__; this base class cannot be instantiated
    directly.
    """

    def __init__(self):
        raise NotImplementedError("Subclass must override the __init__ method")

    def setup(self, af, addr):
        """Create, bind and listen on a socket of family *af* at *addr*.

        The listener is never accepted from, so connections made by
        doTest() pile up in its queue.
        """
        # Initialise bookkeeping first so close() works even if socket(),
        # bind() or listen() raises below.
        self.sockets = []
        self.ls = None
        self.ls = socket.socket(af, TYPE)
        self.ls.bind(addr)
        self.ls.listen(2)
        self.af = af
        self.addr = addr

    def doTest(self, cnt):
        """Attempt *cnt* connections to the listener.

        Successful client sockets are kept open in self.sockets so they
        keep occupying the listen queue.  Returns the number of
        connections that succeeded.
        """
        rv = 0
        for _ in range(cnt):
            s = None
            try:
                s = socket.socket(self.af, TYPE)
                s.connect(self.addr)
            except socket.error:
                # A failed attempt is expected once the queue is full;
                # release the descriptor instead of leaking it.
                if s is not None:
                    s.close()
                continue
            self.sockets.append(s)
            rv += 1
        return rv

    def close(self):
        """Close every client socket and the listener.  Safe to call twice."""
        # getattr() guards against instances whose setup() never ran.
        for s in getattr(self, 'sockets', []):
            s.close()
        self.sockets = []
        ls = getattr(self, 'ls', None)
        if ls is not None:
            ls.close()
        self.ls = None

    def __del__(self):
        self.close()


class IPv4Test(GenericTest):
    """Listener on the IPv4 loopback address."""

    def __init__(self):
        super(IPv4Test, self).setup(socket.AF_INET, (V4HOST, TCPPORT))


class IPv6Test(GenericTest):
    """Listener on the IPv6 loopback address."""

    def __init__(self):
        super(IPv6Test, self).setup(socket.AF_INET6, (V6HOST, TCPPORT))


class UnixTest(GenericTest):
    """Listener on a Unix-domain socket at UNIXSOCK."""

    def __init__(self):
        # A socket file left behind by an earlier run would make bind()
        # fail.  Removal is best-effort: if it fails for any reason other
        # than the file being absent, bind() reports the real problem.
        try:
            os.remove(UNIXSOCK)
        except OSError:
            pass
        super(UnixTest, self).setup(socket.AF_UNIX, UNIXSOCK)

    def close(self):
        """Close all sockets and remove the socket file from the filesystem."""
        had_listener = getattr(self, 'ls', None) is not None
        super(UnixTest, self).close()
        if had_listener:
            # Only unlink on the first close; ignore an already-missing file.
            try:
                os.remove(UNIXSOCK)
            except OSError:
                pass


class LogChecker():
    """Watches the output of /sbin/dmesg for newly appended messages."""

    def __init__(self):
        # Clear the dmesg buffer to prevent rotating causes issues
        os.system('/sbin/dmesg -c > /dev/null')
        # Figure out how big the dmesg buffer is.
        self.dmesgOff = len(check_output("/sbin/dmesg"))

    @staticmethod
    def _contains_msg(dmesg, expected):
        """Return True if any line of the raw *dmesg* bytes contains *expected*.

        check_output() returns bytes on Python 3, so each line is decoded
        explicitly (undecodable bytes are replaced) before matching.
        """
        for line in dmesg.splitlines():
            if expected in line.decode('utf-8', 'replace'):
                return True
        return False

    def checkForMsg(self, expected):
        """Look for *expected* in dmesg output added since the last check.

        Polls up to three times, sleeping 0.5s after each miss.  The stored
        offset is advanced to the end of the most recent dmesg output
        whether or not the message was found.  Returns True on a match.
        """
        newOff = self.dmesgOff
        for _ in range(3):
            dmesg = check_output("/sbin/dmesg")
            newOff = len(dmesg)
            # Skip what was already inspected.  If the output is shorter
            # than the stored offset, the whole output is searched instead.
            if newOff >= self.dmesgOff:
                dmesg = dmesg[self.dmesgOff:]
            if self._contains_msg(dmesg, expected):
                self.dmesgOff = newOff
                return True
            sleep(0.5)
        self.dmesgOff = newOff
        return False


def _check_family(test, lc, logmsg, label):
    """Run the overflow check for one listener; return True on failure.

    Five connections must produce *logmsg*; one further connection must
    not produce it again.  Failures are reported on stderr using *label*.
    """
    test.doTest(5)
    if not lc.checkForMsg(logmsg):
        sys.stderr.write("%s log message not seen\n" % label)
        return True
    test.doTest(1)
    if lc.checkForMsg(logmsg):
        sys.stderr.write("Subsequent %s log message not suppressed\n" % label)
        return True
    return False


def main():
    """Run the IPv4, IPv6 and Unix checks; exit 1 if any failed, else 0."""
    tests = []
    failure = False
    try:
        ip4 = IPv4Test()
        tests.append(ip4)
        ip6 = IPv6Test()
        tests.append(ip6)
        lcl = UnixTest()
        tests.append(lcl)
        lc = LogChecker()

        STDLOGMSG = "Listen queue overflow: 4 already in queue awaiting acceptance (1 occurrences)"
        cases = (
            (ip4, "(%s:%d (proto 6)): %s" % (V4HOST, TCPPORT, STDLOGMSG),
             "IPv4"),
            (ip6, "([%s]:%d (proto 6)): %s" % (V6HOST, TCPPORT, STDLOGMSG),
             "IPv6"),
            (lcl, "(local:%s): %s" % (UNIXSOCK, STDLOGMSG),
             "Unix socket"),
        )
        # Every case runs even after an earlier one has failed.
        for test, logmsg, label in cases:
            if _check_family(test, lc, logmsg, label):
                failure = True
    finally:
        # Release sockets deterministically rather than relying on __del__.
        for test in tests:
            test.close()

    if failure:
        sys.exit(1)
    sys.exit(0)


if __name__ == '__main__':
    main()