回顾与总览
上一次我们选择了fuzz task的代码进行阅读,这次我们进一步深入,看看fuzz engine的选择
注册的时候当时我们有疑问说怎么没有afl,现在在读,除了有afl了,还有一个blackbox
src/python/bot/fuzzers/init.py
1 | def run(): |
现在fuzz_task整个调用路径是:
1 | 获取到任务->fuzz_task.py中的execute_task->FuzzingSession->run()->engine.get获取具体的引擎类,调用do_engine_fuzzing(engine_impl) -> run_engine_fuzzer(engine_impl, self.fuzz_target.binary, sync_corpus_directory, self.testcase_directory) -> |
run_engine_fuzzer中,调用prepare生成FuzzOptions(里面也设置了一些fuzz的策略),之后最后调用下面的函数启动fuzz
1 | options = engine_impl.prepare(sync_corpus_directory, target_path, build_dir) |
即下面的类中的fuzz的函数
1 | AFLEngine |
先来看libfuzzer
libfuzzer
prepare
先看prepare,首先获取参数
1 | arguments = fuzzer.get_arguments(target_path) |
参数就是先看看XXX.options文件是否存在(其中XXX为fuzz_target的名字),存在则返回fuzzer_options(类型FuzzerOptions类),通过fuzzer_options.get_engine_arguments(‘libfuzzer’)获取FuzzerArguments(arguments),之后通过获取FuzzerArguments的list方法转化为元素为”-%s=%s”的形式的list,之后就是加上rss_limit_mb设置内存限制,还有timeout的设置
之后获取grammar,这个也是从XXX.options的grammar section中获取的(这个在oss-fuzz中的项目中的options中没找到有这个section,是跟peach相关的,peach的模板的)
1 | grammar = fuzzer.get_grammar(target_path) |
继续,是生成一个策略池,之后选择策略
1 | strategy_pool = strategy_selection.generate_weighted_strategy_pool( |
generate_weighted_strategy_pool
首先generate_weighted_strategy_pool是根据经验设定好的概率生成策略池
策略列表如下,还是很多的
1 | LIBFUZZER_STRATEGY_LIST = [ |
generate_weighted_strategy_pool函数首先获取环境变量
1 | distribution = environment.get_value('STRATEGY_SELECTION_DISTRIBUTION') |
之后从STRATEGY_SELECTION_DISTRIBUTION
这里随机选取一个作为策略
1 | strategy_selection = utils.random_weighted_choice(distribution_tuples, |
,否则使用默认的,调用generate_default_strategy_pool
1 | return generate_default_strategy_pool(strategy_list, use_generator) |
一开始初始化一个StrategyPool类,之后选择生成器,最后就将LIBFUZZER_STRATEGY_LIST
中非GENERATORS
的策略都加到策略池
1 | def generate_default_strategy_pool(strategy_list, use_generator): |
这个choose_generator的功能是通过生成随机数,跟radamsa_prob + ml_rnn_prob比较,假如生成的随机数比较大(比radamsa_prob + ml_rnn_prob大),那就都不选择, 不选用radamsa和ml_rnn(机器学习相关的),假如比较小再调用一次decide_with_probability进行选择radamsa或者是ml_rnn
1 | # /src/python/bot/fuzzers/engine_common.py |
libfuzzer.pick_strategies
接下来看libfuzzer.pick_strategies
,里面就是对各种策略处理,实际将策略需要的工作完成,并返回StrategyInfo
1 | StrategyInfo(fuzzing_strategies, arguments, additional_corpus_dirs, |
DATAFLOW_TRACING_STRATEGY
对于有DFSAN构建的程序以及策略池中有DATAFLOW_TRACING_STRATEGY,先获取dataflow_binary_path(DFSAN的二进制fuzzer路径),之后判断dataflow_trace_dir是否存在,不存在就不执行这个策略了,存在则添加参数 -data_flow_trace=dataflow_trace_dir
,后面再加参数-focus_function=auto
,最后将策略的名字添加到fuzzing_strategies
1 | # Depends on the presense of DFSan instrumented build. |
CORPUS_MUTATION
接下来是Generate new testcase mutations的
首先看看strategy_pool中是否有CORPUS_MUTATION_ML_RNN_STRATEGY或者CORPUS_MUTATION_RADAMSA_STRATEGY(ML_RNN的优先级高于RADAMSA),有的话is_mutations_run就为True
is_mutations_run为True,先create_corpus_directory(‘mutations’)创建样本目录,之后生成样本,将使用的策略的名字添加到fuzzing_strategies,最后将new_testcase_mutations_directory添加到additional_corpus_dirs
1 | # Select a generator to attempt to use for existing testcase mutations. |
策略的核心函数是generate_new_testcase_mutations,根据candidate_generator使用generate_new_testcase_mutations_using_radamsa(RADAMSA会随机选择corpus_directory中符合大小的样,循环编译2000次)或者generate_new_testcase_mutations_using_ml_rnn函数去生成新的样本,假如生成的样本的数量比原来多,才会返回true
1 | def generate_new_testcase_mutations(corpus_directory, |
RANDOM_MAX_LENGTH_STRATEGY
这个就是最大长度策略,首先判断是否已经存在-max_len=参数了,存在就不做任何操作
假如不存在,则生成一个0到10000范围内的数,作为-max_len的值
1 | if strategy_pool.do_strategy(strategy.RANDOM_MAX_LENGTH_STRATEGY): |
RECOMMENDED_DICTIONARY_STRATEGY
这是推荐字典策略,函数add_recommended_dictionary
1 | if (strategy_pool.do_strategy(strategy.RECOMMENDED_DICTIONARY_STRATEGY) and |
add_recommended_dictionary就是从谷歌云下载recommended_dictionary.dict,假如原来有字典则与原来的字典合并,并使用合并后的字典
1 | def add_recommended_dictionary(arguments, fuzzer_name, fuzzer_path): |
VALUE_PROFILE_STRATEGY
这个简单,就是添加参数-use_value_profile=1,下面是帮助信息,应该是使用特殊的值来指导模糊测试
Experimental. Use value profile to guide fuzzing.
1 | if strategy_pool.do_strategy(strategy.VALUE_PROFILE_STRATEGY): |
FORK_STRATEGY
这个是fork策略,从MAX_FUZZ_THREADS获取max_fuzz_threads,默认值是1,
-fork=的参数是cpu的核心数除以max_fuzz_threads,最小为1
1 | # Do not use fork mode for DFT-based fuzzing. This is needed in order to |
MUTATOR_PLUGIN_STRATEGY
这个是use_mutator_plugin函数通过设置extra_env['LD_PRELOAD'] = mutator_plugin_path
来生效的
1 | extra_env = {} |
PEACH_GRAMMAR_MUTATION_STRATEGY
1 | if (not has_existing_mutator_strategy(fuzzing_strategies) and |
首先假如fuzzing_strategies已经有以下策略的其中一个,就不执行PEACH_GRAMMAR_MUTATION_STRATEGY策略了
1 | MUTATOR_STRATEGIES = [ |
PEACH_GRAMMAR_MUTATION_STRATEGY策略也是通过环境变量来生效的,在use_peach_mutator函数中主要是下面环境变量
1 | # Set title and pit environment variables |
MUTATOR_PLUGIN_RADAMSA_STRATEGY
这里的逻辑也是跟上面一样,MUTATOR_STRATEGIES其中之一已经存在,就不执行策略了
1 | if (not has_existing_mutator_strategy(fuzzing_strategies) and |
use_radamsa_mutator_plugin函数就是通过环境变量LD_PRELOAD生效的,extra_env['LD_PRELOAD'] = radamsa_path
1 | def use_radamsa_mutator_plugin(extra_env): |
libfuzzer.pick_strategies之后
展开参数
1 | arguments.extend(strategy_info.arguments) |
解压corpus
1 | # Check for seed corpus and add it into corpus directory. |
假如策略里面有CORPUS_SUBSET_STRATEGY,选择一些数量的corpus作为初始的corpus
1 | # Pick a few testcases from our corpus to use as the initial corpus. |
存在字典参数,检查字典文件参数,并检查字典是否存在
如果不存在字典参数,则检查%target_binary_name%.dict是否存在
最后还检查字典的格式并尝试修复,比如缺少双引号
1 | # Check dict argument to make sure that it's valid. |
prepare函数最后调用process_strategies,返回一个stats,哪个策略开没开,或者策略选择的值,就是strategies变量
1 | strategies = stats.process_strategies( |
fuzz的最大时间
从环境变量获取要fuzz的时长,减去在fuzz中的其他操作的时间,比如合并样本,字典分析等
1 | fuzz_test_timeout = environment.get_value('FUZZ_TEST_TIMEOUT') |
实际fuzz
实际fuzz就是下面这行
1 | result = engine_impl.fuzz(target_path, options, testcase_directory, |
跟进这个fuzz函数
下面的第一行profiler是性能分析相关的,假如设置了USE_PYTHON_PROFILER,并且不是False,就会启动Google Cloud Profiler
第二行就是通过libfuzzer.get_runner一般正常情况是返回的是LibFuzzerRunner(fuzzer_path)
第三行是设置sanitizer_options,比如exitcode为77
第四行创建一个临时的目录作为corpus的目录,之后地5行跟options.fuzz_corpus_dirs合并变成一个corpus_directories数组
之后调用runner.fuzz,就是实际起fuzz了
fuzz之后就简单概括下:
1、将fuzzer的输出splitlines
2、根据log看看有没有crash,并提取crash的文件的路径
3、如果libfuzzer的返回值非0,但是又没找到crash文件,那么这个应该是启动的时候就崩溃了,这时使用空文件作为crash文件
4、根据log_lines的信息,设置一些stats的值,比如crash_count,slow_unit_count,timeout_count,edges_total等
5、删除一些影响merge和字典分析的参数,比如-fork,-max_len,-runs等
6、给复现crash设置更大的超时时间
7、复制crash文件到主crash目录
8、从log中生成推荐字典
9、返回fuzz的结果
1 | profiler.start_if_needed('libfuzzer_fuzz') |
最后进去runner.fuzz函数看看,首先找到LibFuzzerRunner,发现fuzz函数实际调用的是LibFuzzerCommon.fuzz
1 | class LibFuzzerRunner(new_process.UnicodeProcessRunner, LibFuzzerCommon): |
找到LibFuzzerCommon.fuzz,里面处理了一下-artifact_prefix ,加上-max_total_time=和-print_final_stats=1,最后再加corpus_directories列表,最后就调用run_and_wait函数了(就是最多等待fuzz_timeout时间就退出,或者libfuzzer自动退出)
1 | def fuzz(self, |