Newer
Older
import subprocess
import time
from argparse import ArgumentParser
from dataclasses import dataclass
from enum import Enum, auto
from pathlib import Path
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
class MsgType(Enum):
    """Event kinds recognised in libFuzzer log lines.

    ``parse_line`` looks for each member's *name* among the
    whitespace-separated tokens of a log line, so the spelling and case of
    every member must match the token exactly. That is why ``pulse`` is the
    only lower-case member.
    """

    READ = auto()
    INITED = auto()
    NEW = auto()
    REDUCE = auto()
    pulse = auto()  # lower-case on purpose: matched verbatim against the log
    DONE = auto()
    RELOAD = auto()
@dataclass
class Msg:
    """Parsed contents of one libFuzzer log message."""

    # Which kind of event the line reports.
    ty: MsgType
    # Number taken from the leading "#<N>" token; 0 when the line has none.
    n_inputs: int
    # Integer following the "cov:" token, or None when the token is absent.
    cov: int | None
    # Integer following the "ft:" token, or None when the token is absent.
    ft: int | None
def _metric_value(tokens: list[str], label: str) -> int | None:
    """Return the integer token that follows *label* in *tokens*.

    Returns None when *label* does not occur. Raises IndexError when *label*
    is the last token and ValueError when the next token is not an integer.
    """
    if label not in tokens:
        return None
    return int(tokens[tokens.index(label) + 1])


# See libfuzzer output format: https://llvm.org/docs/LibFuzzer.html#output
def parse_line(line: str) -> Msg | None:
    """Parse one libfuzzer log line into a ``Msg``.

    The line is split on whitespace. The first ``MsgType`` member (in
    declaration order) whose name equals one of the tokens decides the
    message type.

    Returns:
        A ``Msg`` holding the type, the number from a leading ``#<N>`` token
        (0 if the first token does not start with ``#``), and the integers
        that follow the ``cov:`` and ``ft:`` tokens (None if absent).
        ``None`` if no message type is present, or if one of the numeric
        fields is truncated or not an integer. The caller treats such a line
        as ordinary, non-status output instead of crashing on it.
    """
    tokens = line.split()

    msg_ty = next((ty for ty in MsgType if ty.name in tokens), None)
    if msg_ty is None:
        return None

    try:
        # tokens is non-empty here: a message type name was found in it.
        n_inputs = int(tokens[0][1:]) if tokens[0].startswith("#") else 0
        cov = _metric_value(tokens, "cov:")
        ft = _metric_value(tokens, "ft:")
    except (IndexError, ValueError):
        # Malformed counters: not a well-formed status line.
        return None

    return Msg(msg_ty, n_inputs, cov, ft)
class Supervisor:
    """A wrapper to run the fuzzer until stopping criteria are satisfied
    or an error is encountered.

    The fuzzer's stderr is read line by line, copied to a log file with a
    relative timestamp, and parsed with ``parse_line`` to keep running
    statistics (coverage, features, number of inputs).
    """

    # criteria_satisfied() requires more than this many fuzzer test runs.
    MIN_INPUTS = 100_000
    # criteria_satisfied() requires coverage to have been flat for longer
    # than this many seconds (2 hours).
    COV_STALL_SECONDS = 2 * 60 * 60

    def __init__(self, fuzzer_name: str, libfuzzer_log: Path, corpus_dir: Path, wait_2x_cov: bool):
        """Remember the configuration and reset the statistics.

        Args:
            fuzzer_name: Target name; used to build the fuzzer binary path
                and the static corpus path in ``run``.
            libfuzzer_log: File the timestamped fuzzer log is written to.
            corpus_dir: Corpus directory passed first on the fuzzer command line.
            wait_2x_cov: When true, stopping additionally requires coverage
                or features to have at least doubled since ``INITED``.
        """
        # run() reads this to locate the binary and to label its output.
        self.fuzzer_name = fuzzer_name
        self.libfuzzer_log = libfuzzer_log
        self.corpus_dir = corpus_dir
        self.wait_2x_cov = wait_2x_cov
        self.start_time = time.monotonic()
        # Values reported by the INITED message (None until it is seen).
        self.init_cov = None
        self.init_ft = None
        # Most recent values reported by any message.
        self.latest_cov = None
        self.latest_ft = None
        self.latest_n_inputs = 0
        # Monotonic time of the last observed coverage increase.
        self.latest_cov_inc = self.start_time

    def criteria_satisfied(self) -> bool:
        """Indicates whether stopping criteria of the corresponding fuzzer
        are satisfied. E.g. it was running long enough and covered enough paths."""
        # Coverage increased at least twice in comparison with corpus
        cov = False
        if self.init_cov:
            cov = self.latest_cov >= self.init_cov * 2

        # Number of unique paths (features) increased at least twice
        # in comparison with corpus
        ft = False
        if self.init_ft:
            ft = self.latest_ft >= self.init_ft * 2

        # More than MIN_INPUTS fuzzer test runs
        n_inputs = self.latest_n_inputs > self.MIN_INPUTS

        # Last coverage increase was detected more than 2 hours ago
        cov_inc = (time.monotonic() - self.latest_cov_inc) > self.COV_STALL_SECONDS

        return (cov or ft or not self.wait_2x_cov) and n_inputs and cov_inc

    def update_stats(self, msg: Msg) -> None:
        """Fold one parsed log message into the running statistics."""
        self.latest_n_inputs = msg.n_inputs

        if msg.cov:
            # Only a rise above an already known value counts as an increase;
            # the very first coverage report does not move the timestamp.
            if self.latest_cov and msg.cov > self.latest_cov:
                self.latest_cov_inc = time.monotonic()
            self.latest_cov = msg.cov

        if msg.ft:
            self.latest_ft = msg.ft

        # INITED carries the baseline the 2x criteria are compared against.
        if msg.ty == MsgType.INITED:
            self.init_cov = msg.cov
            self.init_ft = msg.ft

    def run(self):
        """Run the corresponding fuzzer until either stopping criteria are
        satisfied or fuzzing fails with an error indicating that some problem
        was detected.

        The fuzzer binary and the static corpus are addressed relative to the
        current working directory.

        Raises:
            RuntimeError: The fuzzer exited with a non-zero code while the
                stopping criteria were not satisfied.
            subprocess.TimeoutExpired: The fuzzer did not exit within the
                15 second grace period after its stderr was closed.
        """
        print(f"Running {self.fuzzer_name}")
        # NOTE(review): stdout is piped but only read after stderr reaches
        # EOF. A target that prints a lot to stdout could fill the pipe and
        # stall; confirm that targets keep stdout quiet.
        proc = subprocess.Popen(
            [
                f"./oss-fuzz/build/out/tarantool/{self.fuzzer_name}_fuzzer",
                # Read from both corpus folders, write only to the first
                self.corpus_dir.resolve(),
                f"test/static/corpus/{self.fuzzer_name}",
            ],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
        )
        try:
            with self.libfuzzer_log.open("w") as libfuzzer_log:
                for line in proc.stderr:
                    msg = parse_line(line)
                    # Add timestamp in seconds relative to the starting time of a fuzzer
                    line = f"{time.monotonic() - self.start_time}s {line}"
                    libfuzzer_log.write(line)
                    # if it is not a conventional log message print and continue
                    if not msg:
                        print(line.strip("\n"), end="\r\n")
                        continue
                    # On pulse print the current state
                    if msg.ty == MsgType.pulse:
                        print(vars(self), end="\r\n")
                    self.update_stats(msg)
                    if self.criteria_satisfied():
                        print(f"Fuzzer {self.fuzzer_name} satisfied criteria")
                        # Keep reading: the loop ends when the fuzzer closes
                        # stderr, so its final output is still logged.
                        proc.terminate()

            proc.wait(timeout=15)
            outs, errs = proc.communicate(timeout=15)
        finally:
            # Do not leave the fuzzer running if logging, parsing or the
            # waits above raised.
            if proc.poll() is None:
                proc.kill()
                proc.wait()

        print(outs)
        print(errs)

        code = proc.poll()
        if code != 0 and (not self.criteria_satisfied()):
            raise RuntimeError(
                f"Fuzzer {self.fuzzer_name} stopped without satisfying criteria. Exit code: {code}"
            )
# The script takes the fuzzing target name as the first argument.
# Then it runs the fuzzing target until either the stopping criteria are
# satisfied or the fuzzer detects a bug and fails.
# Fuzzers should be built first. See README.md for instructions.
parser = ArgumentParser(
    prog="Fuzzer Supervisor",
    # Adjacent literals instead of a backslash continuation, so the words
    # are separated by exactly one space.
    description=(
        "Runs a supplied fuzzer target until either stopping criteria "
        "are satisfied or fuzzer detects a bug and fails"
    ),
)
parser.add_argument("fuzzer_target_name")
parser.add_argument(
    "--libfuzzer-log",
    required=True,
    type=Path,
    help="File where libfuzzer log will be written",
)
parser.add_argument(
    "--corpus-dir",
    required=True,
    type=Path,
    help="Directory where libfuzzer generated corpus will be stored",
)
parser.add_argument(
    "--wait-2x-cov",
    action="store_true",
    help="Enables 'coverage increased at least twice' criteria",
)
args = parser.parse_args()

Supervisor(args.fuzzer_target_name, args.libfuzzer_log, args.corpus_dir, args.wait_2x_cov).run()