IRMA是一款类似于VT的杀软检测工具,他可以集成大量的杀软针对文件进行静态特征检测,框架写的还是挺不错的,这里剖析下他的源码。
先上一张官方的原型图。IRMA主要有三个部分组成,FRONTEND、BRAIN、PROBES,其中FRONTEND主要负责前端接口,BRAIN负责任务调度以及文件传输,PROBES是核心的检测模块,会部署很多杀软进行文件检测。
IRMA Probes使用celery进行分布式任务调度,先从主函数irma\probe\probe\tasks.py开始分析。
# 创建引擎插件类 manager = PluginManager() # 发掘引擎插件 manager.discover(plugin_path)
讯享网
IRMA中的检测引擎使用自注册的方式增加杀软。简单跟踪下discover函数。
讯享网 def discover(self, path=os.path.dirname(__file__), prefix=None): dirname = os.path.basename(path) if prefix is None: prefix = dirname for importer, name, ispkg in pkgutil.walk_packages([path]): try: pkg_name = '%s.%s' % (prefix, name) if pkg_name not in sys.modules: __import__(pkg_name) if ispkg: self.discover(os.path.join(path, name), pkg_name) except PluginFormatError as error: logging.warn(' * [{name}] Plugin error: {error}' ''.format(name=name, error=error)) except PluginLoadError as error: logging.warn(' * [{name}] Plugin failed to load: {error}' ''.format(name=name, error=error)) except PluginCrashed as error: logging.warn(' * [{name}] Plugin crashed: {error}' ''.format(name=name, error=error)) except ImportError as error: logging.exception(error)
很简单,就是discover函数接受一个路径参数,然后遍历该路径,通过递归的方式将各个检测Model动态import到环境中,walk_packages函数的作用就是查找所有的python Module。

这里需要注意一点,在调用__import__函数时,会自注册该检测引擎插件,因为每一个插件都使用了metaclass元类,那么import的类本质上就是一个创建的对象,自然会执行元类的__init__函数,元类PluginMetaClass代码如下:
class PluginMetaClass(type): [skip] ... # Plugin methods def __init__(cls, name, bases, attrs): # small hack to skip PluginBase class when initializing if name == 'PluginBase': return # perform some verifications if not cls._plugin_name_: raise PluginFormatError("Invalid plugin_name") if not cls._plugin_display_name_: raise PluginFormatError("Invalid plugin_display_name") if not cls._plugin_author_: raise PluginFormatError("Invalid plugin_author") if not cls._plugin_version_: raise PluginFormatError("Invalid plugin_version") if not cls._plugin_category_: raise PluginFormatError("Invalid plugin_category") # try to register plugin PluginManager().register_plugin(cls)
可以看到,元类中调用了 PluginManager().register_plugin(cls)进行注册该插件,注册过程为:
讯享网 @classmethod def register_plugin(cls, plugin): logging.debug('Found plugin {name}. Trying to register it.' ''.format(name=plugin.plugin_name)) # check for dependencies for dependency in plugin.plugin_dependencies: try: dependency.check() except DependencyMissing as error: # get plugin info plugin_name = plugin.plugin_name # get dependency info dependency = error.dependency dependency_name = dependency.dependency_name dependency_type = dependency.__class__.__name__ dependency_help = dependency.help # warn user and stop loading warning = '{name} miss dependencies: {deps} ({type}).' if dependency_help is not None: warning += ' {help}' raise PluginLoadError(warning.format(type=dependency_type, name=plugin_name, deps=dependency_name, help=dependency_help)) # if required, run additionnal verifications on the plugin if hasattr(plugin, 'verify'): try: plugin.verify() except Exception as error: raise PluginLoadError(error) # add plugin to internal list if plugin.plugin_canonical_name in cls.__plugins_cls: logging.debug('Plugin {name} already registered' ''.format(name=plugin.plugin_name)) else: cls.__plugins_cls[plugin.plugin_canonical_name] = plugin # mark plugin as active if plugin.plugin_active is None: plugin.plugin_active = True logging.debug('Plugin {name} registered, active set as {state}' ''.format(name=plugin.plugin_name, state=plugin.plugin_active))
注册过程为,先是当前的运行平台是否满足插件运行条件,其次调用verify()函数判断插件是否存在,默认情况下只是进行检测引擎可执行路径的判断。如果上面两个步骤都没有问题,就会增加插件module至插件列表中。
# determine dynamically queues to connect to using plugin names probes = manager.get_all_plugins() [skip] ... for p in probes: # register probe on Brain log.info('Register probe %s' % p.plugin_name) delay = 1 while True: try: task = register_probe(p.plugin_name, p.plugin_display_name, p.plugin_category, p.plugin_mimetype_regexp) task.get(timeout=10) break except (TimeoutError, IrmaTaskError): log.error("Registering on brain failed retry in %s seconds...", delay) time.sleep(delay) delay = min([2*delay, RETRY_MAX_DELAY]) pass
至此检测引擎插件自注册完毕,后面便将插件列表赋值给probes,并通知BRAIN,我可以进行哪些方面的检测,之后开始等待任务,并执行相应的插件扫描。
讯享网@probe_app.task(acks_late=True) def probe_scan(frontend, filename): routing_key = current_task.request.delivery_info['routing_key'] if routing_key == 'dlq': log.error("filename %s scan timeout", filename) raise ValueError("Timeout") try: tmpname = None # retrieve queue name and the associated plugin probe = probes[routing_key] log.debug("filename %s probe %s", filename, probe) (fd, tmpname) = tempfile.mkstemp() os.close(fd) ftp_ctrl.download_file(frontend, filename, tmpname) results = probe.run(tmpname) handle_output_files(results, frontend, filename) return bytes_to_utf8(results) except Exception as e: log.exception(type(e).__name__ + " : " + str(e)) raise probe_scan.retry(countdown=2, max_retries=3, exc=e) finally: # Some AV always delete suspicious file if tmpname is not None and os.path.exists(tmpname): log.debug("filename %s probe %s removing tmp_name %s", filename, probe, tmpname) os.remove(tmpname)
检测过程为: 先是获取probe检测引擎,之后通过ftp_ctrl下载文件,调用probe.run(tmpname)进行扫描,之后处理结果并返回。
OK,整体的执行过程大概就是这样,其中的分布式celery,由于没有实践过可能说的也不是很清晰,下一篇讲分析下每个插件又是如何进行检测分析的,以及如何高效的集成一个自定义检测引擎。

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容,请联系我们,一经查实,本站将立刻删除。
如需转载请保留出处:https://51itzy.com/kjqy/51679.html