tests: Run kmemleak between tests if available

This triggers a kmemleak scan between tests. This allows finding memory
leaks and doing this should attribute the leak to the correct test in
most cases. Note that it does add a sleep after each test, as such it is
most sensible when combined with UML time-travel.

Signed-off-by: Benjamin Berg <benjamin.berg@intel.com>
This commit is contained in:
Benjamin Berg 2023-12-25 12:21:09 +02:00 committed by Jouni Malinen
parent fb90e42c37
commit 597e2be398

View file

@ -123,11 +123,12 @@ def report(conn, prefill, build, commit, run, test, result, duration, logdir,
logdir + "/" + test + "." + log) logdir + "/" + test + "." + log)
class DataCollector(object): class DataCollector(object):
def __init__(self, logdir, testname, args): def __init__(self, logdir, testname, kmemleak, args):
self._logdir = logdir self._logdir = logdir
self._testname = testname self._testname = testname
self._tracing = args.tracing self._tracing = args.tracing
self._dmesg = args.dmesg self._dmesg = args.dmesg
self._kmemleak = kmemleak
self._dbus = args.dbus self._dbus = args.dbus
def __enter__(self): def __enter__(self):
if self._tracing: if self._tracing:
@ -159,6 +160,36 @@ class DataCollector(object):
self._trace_cmd.stdin.write(b'DONE\n') self._trace_cmd.stdin.write(b'DONE\n')
self._trace_cmd.stdin.flush() self._trace_cmd.stdin.flush()
self._trace_cmd.wait() self._trace_cmd.wait()
if self._kmemleak:
output = os.path.join(self._logdir, '%s.kmemleak' % (self._testname, ))
num = 0
while os.path.exists(output):
output = os.path.join(self._logdir, '%s.kmemleak-%d' % (self._testname, num))
num += 1
# Trigger kmemleak
with open('/sys/kernel/debug/kmemleak', 'w+') as kmemleak:
kmemleak.write('scan')
kmemleak.seek(0)
# Minimum reporting age
time.sleep(5)
kmemleak.write('scan')
kmemleak.seek(0)
leaks = []
while l := kmemleak.read():
leaks.append(l)
leaks = ''.join(leaks)
if leaks:
with open(output, 'w') as out:
out.write(leaks)
kmemleak.seek(0)
kmemleak.write('clear')
if self._dmesg: if self._dmesg:
output = os.path.join(self._logdir, '%s.dmesg' % (self._testname, )) output = os.path.join(self._logdir, '%s.dmesg' % (self._testname, ))
num = 0 num = 0
@ -395,6 +426,19 @@ def main():
if args.dmesg: if args.dmesg:
subprocess.call(['dmesg', '-c'], stdout=open('/dev/null', 'w')) subprocess.call(['dmesg', '-c'], stdout=open('/dev/null', 'w'))
try:
# try to clear out any leaks that happened earlier
with open('/sys/kernel/debug/kmemleak', 'w') as kmemleak:
kmemleak.write('scan')
kmemleak.seek(0)
time.sleep(5)
kmemleak.write('scan')
kmemleak.seek(0)
kmemleak.write('clear')
have_kmemleak = True
except OSError:
have_kmemleak = False
if conn and args.prefill: if conn and args.prefill:
for t in tests_to_run: for t in tests_to_run:
name = t.__name__.replace('test_', '', 1) name = t.__name__.replace('test_', '', 1)
@ -494,7 +538,7 @@ def main():
pass pass
reset_ok = True reset_ok = True
with DataCollector(args.logdir, name, args): with DataCollector(args.logdir, name, have_kmemleak, args):
count = count + 1 count = count + 1
msg = "START {} {}/{}".format(name, count, num_tests) msg = "START {} {}/{}".format(name, count, num_tests)
logger.info(msg) logger.info(msg)
@ -651,6 +695,12 @@ def main():
logger.info("Kernel issue found in dmesg - mark test failed") logger.info("Kernel issue found in dmesg - mark test failed")
result = 'FAIL' result = 'FAIL'
if result == 'PASS' and have_kmemleak:
# The file is only created if a leak was found
if os.path.exists(os.path.join(args.logdir, name + '.kmemleak')):
logger.info("Kernel memory leak found - mark test failed")
result = 'FAIL'
if result == 'PASS': if result == 'PASS':
passed.append(name) passed.append(name)
elif result == 'SKIP': elif result == 'SKIP':