Driller
记录一下我对于 Driller / shellphuzz / AFL 的了解
prelogue
记录一下我对于 Driller / shellphuzz / fuzzer 的了解 从读源码开始. 相关部分源码我打包附在这里attachment 需要相关库的全部源码可以安装库之后去看看.
shellphuzz
shellphuzz -d 1 -c 1 -w workdir/shellphuzz/main -C --length-extension 4 ./main
这是一条很常见的shellphuzz
启动命令,在help里可以了解到:
-d
表示使用driller_workers数目
—c
表示AFL核心数目.
-w
表示工作目录所在
-C
表示stop on the frist crash
--length-extension
表示Try extending inputs
字节数.
但是我更想知道shellphuzz
的工作流程.
➜ ~ file `which shellphuzz`
/home/n132/.local/bin/shellphuzz: Python script, ASCII text executable
发现是一个python脚本.
shellphuzz
整个看下来发shellphuzz
只是一个启动器,主要就是启动了driller
和fuzzer
下面的shellphuzz
分析可以直接跳过。
import and prepare
1-15行import了一些需要用到的库
import *
16-37行为真正执行功能部分完成了参数的处理.
parser = argparse.ArgumentParser(description="Shellphish fuzzer interface")...
38-43行创建了log以及在/dev/shm/
创建了工作文件夹.
44-54行简单处理了各种可能的模块和拓展
52-61行判断是否需要启动Greas
(这个我好像没有用到我也不太清楚这个是个什么东西好像是helper_module
里的)
62-64行通过设置了LocalCallback
设置了driller
(这个函数会在后面分析)
65-70行完成以上两个拓展的设置.
初始化seed
#line 71-82
seeds = None
if args.seed_dir:
seeds = []
print ("[*] Seeding...")
for dirpath in args.seed_dir:
for filename in os.listdir(dirpath):
filepath = os.path.join(dirpath, filename)
if not os.path.isfile(filepath):
continue
with open(filepath, 'rb') as seedfile:
seeds.append(seedfile.read())
如果有指定seed的目录那就用其初始化seed,如果这样的话seed是一个list.
Fuzz
# line 84-93
fuzzer = fuzzer.Fuzzer(
args.binary, args.work_dir, afl_count=args.afl_cores, force_interval=args.force_interval,
create_dictionary=not args.no_dictionary, stuck_callback=stuck_callback, time_limit=args.timeout,
memory=args.memory, seeds=seeds, timeout=args.run_timeout,
)
# start it!
print ("[*] Starting fuzzer...")
fuzzer.start()
初始化了一个fuzzer
对象然后start,这个库和相关函数后文会有分析.
现在可以把他看成一个AFL
的一个壳用于从python初始化相关命令行参数以及启动AFL-fuzzer
并监控其运行状态.
94-103行,ipython模块 这个没用到..大概是一个交互用的模式…
Run
#line 104-147
try:
print ("[*] Waiting for fuzzer completion (timeout: %s, first_crash: %s)." % (args.timeout, args.first_crash))
crash_seen = False
while True:
time.sleep(5)
if not crash_seen and fuzzer.found_crash():
print ("[*] Crash found!")
crash_seen = True
if args.first_crash:
break
if fuzzer.timed_out():
print ("[*] Timeout reached.")
break
except KeyboardInterrupt:
print ("[*] Aborting wait. Ctrl-C again for KeyboardInterrupt.")
except Exception as e:
print ("[*] Unknown exception received (%s). Terminating fuzzer." % e)
fuzzer.kill()
if drill_extension:
drill_extension.kill()
raise
print ("[*] Terminating fuzzer.")
fuzzer.kill()
if drill_extension:
drill_extension.kill()
if args.tarball:
print ("[*] Dumping results...")
p = os.path.join("/tmp/", "afl_sync")
try:
shutil.rmtree(p)
except (OSError, IOError):
pass
shutil.copytree(fuzzer.out_dir, p)
tar_name = args.tarball.replace("{}", socket.gethostname())
tar = tarfile.open("/tmp/afl_sync.tar.gz", "w:gz")
tar.add(p, arcname=socket.gethostname()+'-'+os.path.basename(args.binary))
tar.close()
print ("[*] Copying out result tarball to %s" % tar_name)
shutil.move("/tmp/afl_sync.tar.gz", tar_name)
这是一个运行时的检测模块每隔5秒看一下是否找到了creash
和相关推出操作以及结束时的一些处理.
这里没有特别多可以讲的东西.主要的工作还是AFL-fuzzer
以及driller
接下来看一下第二简单的fuzzer
fuzzer
主要分析fuzzer/fuzzer.py
这个有660行左右就不一点点分析了主要分析一些会在__init__
/start
函数中用到的函数..(infact 函数都挺简单的名字取得都挺好基本上看名字就知道功能不知道的每个函数下面都有个简介..非常友好)
事实上如果耐心看完整个库你会发现其实就是AFL的python封装可以直接知道结论然后跳过到Driller
。
结论: fuzzer 是一个 AFL 的python封装主要工作是设置参数然后用subprocess开AFL-Fuzzer.
init
初始化函数. 开头一段是各个参数的说明.可以先不看下面如果遇到不知道意义的变量名就到这里来看看解释.
接下来是各种赋值以及初始化,主要是一些模式以及一些路径关键参数之类的都没什么好看的注释挺丰富的后面遇到难理解的变量直接这里找就行了…整个函数看下来没什么好说的就是一个初始化函数..(一水就200多行过去了)
start
启动函数启动fuzzer代码灰常简洁.
# spin up the AFL workers
self._start_afl()
# start the callback timer
self._timer.start()
self._on = True
启动afl,启动timer,打开开关,结束.
在分析函数_start_afl
之前我把其他可能会用到的函数的作用list一下..功能都挺简单的我安安静静看了挺久看完后感觉没什么必要其实只要看名字猜一下功能就行了.
other funcs
|name|func| |—|—| |alive()|检查是否有活着的fuzzer| |kill()|停掉proc中的所有进程| |state/bitmap()|获取fuzzer的stat,就是读文件(fuzzer_stats/fuzzer_bitmap)| |found_crash()|检查是否存在crash| |add/remove_fuzzer(s)()|add/remove fuzzer(s)| |_initialize_seeds()|set all seeds| |…|…|
_start_afl
def _start_afl(self): # trigger _start_afl_instance
'''
start up a number of AFL instances to begin fuzzing
'''
# spin up the master AFL instance
master = self._start_afl_instance() # the master fuzzer
self.procs.append(master)
if self.afl_count > 1:
driller = self._start_afl_instance()
self.procs.append(driller)
# only spins up an AFL instances if afl_count > 1
for _ in range(2, self.afl_count):
slave = self._start_afl_instance()
self.procs.append(slave)
通过afl_count
判断启动几个fuzzer
会有一个master以及count-1个solavers
主要启动AFL还是通过_start_afl_instance()
其实我感觉也没什么好看的主要就是通过subprocess
启动了AFL
def _start_afl_instance(self):# a wrapper of of AFL set all arg and run afl.
args = [self.afl_path]
args += ["-i", self.in_dir]
args += ["-o", self.out_dir]
args += ["-m", self.memory]
if self.qemu:
args += ["-Q"]
if self.crash_mode:
args += ["-C"]
if self.fuzz_id == 0:
args += ["-M", "fuzzer-master"]
outfile = "fuzzer-master.log"
else:
args += ["-S", "fuzzer-%d" % self.fuzz_id]
outfile = "fuzzer-%d.log" % self.fuzz_id
if self.dictionary is not None:
args += ["-x", self.dictionary]
if self.extra_opts is not None:
args += self.extra_opts
# auto-calculate timeout based on the number of binaries
if self.is_multicb:
args += ["-t", "%d+" % (1000 * len(self.binary_path))]
elif self.timeout:
args += ["-t", "%d+" % self.timeout]
args += ["--"]
args += self.binary_path if self.is_multicb else [self.binary_path]
args.extend(self.target_opts)
l.debug("execing: %s > %s", ' '.join(args), outfile)
# increment the fuzzer ID
self.fuzz_id += 1
outfile = os.path.join(self.job_dir, outfile)
with open(outfile, "w") as fp:
return subprocess.Popen(args, stdout=fp, close_fds=True)
就是设置了参数然后一个subprocess
.
Driller
driller是shellphuzz的核心部件,起作用为在fuzzer遇到困难一筹莫展之时利用symbolic execution
.来发现新的路径以提升代码覆盖率或者说是提高挖洞效率.
这篇里面不打算深入分析细节之后会有斜街分析
在开始driller代码初步分析之前 我先附上流程分析的结论:
driller先开一个fullinit的state然后把queue中的样例作为stdin之后创建simulation_manager再然后就一直step()遇到divert就pop出来将约束后的stdin放入queue以推动afl.
local_callback
def driller_callback(self, fuzz):
...
while len(self._running_workers) < self._num_workers and len(not_drilled) > 0:
to_drill_path = list(not_drilled)[0]
not_drilled.remove(to_drill_path)
self._already_drilled_inputs.add(to_drill_path)
proc = multiprocessing.Process(target=_run_drill, args=(self, fuzz, to_drill_path),
kwargs={'length_extension': self._length_extension})
proc.start()
self._running_workers.append(proc)
获取queue中的样例然后设置参数最后multiprocessing.Process
来_run_drill
然后就跑去看_run_drill
发现峰回路转其实又是一个壳…它其实就是设置参数然后调用了.main…也就是这货。
parser = argparse.ArgumentParser(description="Driller local callback")
parser.add_argument('binary_path')
parser.add_argument('fuzzer_out_dir')
parser.add_argument('bitmap_path')
parser.add_argument('path_to_input_to_drill')
parser.add_argument('--length-extension', help="Try extending inputs to driller by this many bytes", type=int)
args = parser.parse_args()
logcfg_file = os.path.join(os.getcwd(), '.driller.ini')
if os.path.isfile(logcfg_file):
logging.config.fileConfig(logcfg_file)
binary_path, fuzzer_out_dir, bitmap_path, path_to_input_to_drill = sys.argv[1:5]
fuzzer_bitmap = open(args.bitmap_path, "rb").read()
# create a folder
driller_dir = os.path.join(args.fuzzer_out_dir, "driller")
driller_queue_dir = os.path.join(driller_dir, "queue")
try: os.mkdir(driller_dir)
except OSError: pass
try: os.mkdir(driller_queue_dir)
except OSError: pass
l.debug('drilling %s', path_to_input_to_drill)
# get the input
inputs_to_drill = [open(args.path_to_input_to_drill, "rb").read()]
if args.length_extension:
inputs_to_drill.append(inputs_to_drill[0] + b'\0' * args.length_extension)
for input_to_drill in inputs_to_drill:
d = driller.Driller(args.binary_path, input_to_drill, fuzzer_bitmap)
count = 0
for new_input in d.drill_generator():
id_num = len(os.listdir(driller_queue_dir))
fuzzer_from = args.path_to_input_to_drill.split("sync/")[1].split("/")[0] + args.path_to_input_to_drill.split("id:")[1].split(",")[0]
filepath = "id:" + ("%d" % id_num).rjust(6, "0") + ",from:" + fuzzer_from
filepath = os.path.join(driller_queue_dir, filepath)
with open(filepath, "wb") as f:
f.write(new_input[1])
count += 1
l.warning("found %d new inputs", count)
显然和shellphuzz的代码风格很像前面都是在设置参数和完成初始化.
直到最后几行才可以看到创建了一个Driller
对象然后调用了drill_generator
…看到这里发现前面的一大堆处理好像又多余了…也就是是说drill_generator
才是drill
工作的过程。
drill_generator
在driller_main.py
中事实上最终需要关注的函数是其中的_drill_input
def drill_generator(self):
"""
A generator interface to the actual drilling.
"""
# Set up alarm for timeouts.
if config.DRILL_TIMEOUT is not None:
signal.alarm(config.DRILL_TIMEOUT)
for i in self._drill_input():
yield i
_drill_input
r = tracer.qemu_runner.QEMURunner(self.binary, self.input, argv=self.argv)
p = angr.Project(self.binary)
for addr, proc in self._hooks.items():
p.hook(addr, proc)
l.debug("Hooking %#x -> %s...", addr, proc.display_name)
创建两个对象
- tracer-r
- angr.project-p
以及完成hook工作.
if p.loader.main_object.os == 'cgc':
p.simos.syscall_library.update(angr.SIM_LIBRARIES['cgcabi_tracer'])
s = p.factory.entry_state(stdin=angr.SimFileStream, flag_page=r.magic, mode='tracing')
else:
s = p.factory.full_init_state(stdin=angr.SimFileStream, mode='tracing')
s.preconstrainer.preconstrain_file(self.input, s.posix.stdin, True)
simgr = p.factory.simulation_manager(s, save_unsat=True, hierarchy=False, save_unconstrained=r.crash_mode)
产生一个init
了的state
预先限制stdin
为queue
中获得的样例之后用该state
生成一个simulation_manager
t = angr.exploration_techniques.Tracer(trace=r.trace, crash_addr=r.crash_addr, copy_states=True)
self._core = angr.exploration_techniques.DrillerCore(trace=r.trace)
simgr.use_technique(t)
simgr.use_technique(angr.exploration_techniques.Oppologist())
simgr.use_technique(self._core)
self._set_concretizations(simgr.one_active)
接下来这几行我目前还看不太懂因为相关的函数基本之前学习angr的时候没用到过,理解只停留在字面上的理解大致是产生了一个tracer并且设置了一些策略(这部分之后再看angr-api结合使用分析).
while simgr.active and simgr.one_active.globals['trace_idx'] < len(r.trace) - 1:
simgr.step()
# Check here to see if a crash has been found.
if self.redis and self.redis.sismember(self.identifier + '-finished', True):
return
if 'diverted' not in simgr.stashes:
continue
while simgr.diverted:
state = simgr.diverted.pop(0)
l.debug("Found a diverted state, exploring to some extent.")
w = self._writeout(state.history.bbl_addrs[-1], state)
if w is not None:
yield w
for i in self._symbolic_explorer_stub(state):
yield i
然后开启循环疯狂step()
直到遇到了diverted
也就是说发现了分叉.这时候将该状态拿出来返回.
如此,就可以推动afl啦!
//这部分的内容可以继续探究,angr相关部分.
summary
shellfuzz==>启动AFL-fuzzer+启动driller.
其中driller是被shellfuzz调用来帮助AFL-fuzzer
来拓展新路径.
方式是利用符号执行的方法preconstrain stdin为queue中样例之后遇到分叉后求解获得可以达到新code-edge的输入传回给AFL-fuzzer
//终于理解driller这个名字的意思了…