本篇文章主要讲解在CMDB中如何使用自定义的方式进行各种数据的自动发现,并且以mysql作为范例讲解对网络内所有mysql进行自动发现并入库的详细过程,细致入微地讲解mysql自动发现功能的实现过程。
01 前提条件
如果您只想知道怎么去自动发现mysql实例的,可以只关注文中关于mysql自发现的实现细节。

如果您已经使用了维易CMDB,那么可以通读全篇,轻松实现mysql实例的自动发现、入库等。这有两种情况,一是不想安装OneAgent,那么可以使用本篇文章中的发现mysql方法,通过自己的执行调度策略(如自发现的频率、脚本运行在哪等),调用cmdb的api实现,实现mysql实例的资产入库;申请直接使用OneAgent,OneAgent是永久免费的二进制可执行文件,可放心使用,文件可直接在veops.cn上获取。
总之不管您是哪一种情况,本篇文章都可能给您带来一些帮助,您可以根据自己的场景有选择性的读取。
02CMDB中的要求
平台已经内置了一些自动发现的方法,如各种网络设备、多种云设备的自动发现等,平台还动态支持各种自动发现的插件,用户可以根据自己的场景自定义各种各样的自发现插件;平台后续也将建设各种平台插件库,您有需要可以直接在插件库中选择适合您的自发现插件。
在CMDB中自定义的自发现插件可以在后台管理-自动发现中定义,也可以在模型配置中针对具体的模型添加属性自动发现插件。
新建自发现插件的标准样式为:
# -- coding:utf-8 --import jsonclass AutoDiscovery(object): @property def unique_key(self): """ :return: 返回唯一属性的名字 """ key = "private_ip" # 这里返回该模型唯一的属性值 return key @staticmethod def attributes(): """ 定义属性字段 :return: 返回属性字段列表, 列表项是(名称, 类型, 描述), 名称必须是英文 类型: String Integer Float Date DateTime Time JSON 例如: return [ ("ci_type", "String", "模型名称"), ("private_ip", "String", "内网IP, 多值逗号分隔") ] """ return [] @staticmethod def run(): """ 执行入口, 返回采集的属性值 :return: 返回一个列表, 列表项是字典, 字典key是属性名称, value是属性值 例如: return [dict(ci_type="server", private_ip="192.168.1.1")] """ return []if __name__ == "__main__": result = AutoDiscovery().run() if isinstance(result, list): print("AutoDiscovery::Result::{}".format(json.dumps(result))) else: print("ERROR: 采集返回必须是列表")
本文的核心将讲解如何构建如上所示的标准样式来进行mysql实例的发现,然后在平台上如何配置,直到最终在自动发现池中展现所有发现的实例,然后进行入库的完整流程。
03 Mysql自发现
基础脚步编写
脚本的编写只需按照插件的样式定义AutoDiscovery中的三个方法,本mysql自发现中定义如下:
class AutoDiscovery(object): @property def unique_key(self): return "mysql_name" @staticmethod def attributes(): return [ ("mysql_name", "String", "实例名称"), ("ip", "String", "ip"), ("port", "Integer", "端口"), ("version", "String", "版本信息") ] @staticmethod def run(): pass
其中函数unique_key和attributes的定义非常简单,只需要根据自己的场景简单定义即可,核心在函数run上,接下来将定义run的实现。
因为每个公司内部部署mysql的方式可能各种各样,所以自动发现的方式就会有所不同,本示例从几个比较通用的角度实现mysql实例的自发现。
首先定义了一个自动发现的类 Scan, 然后我们从三个方面综合进行实例的自动发现,主要包括:
1. 通过指定网段和端口进行网段扫描,扫描出全网的mysql实例。该种方法需要扫描全部网段,因此一般只需要在一台机器上执行。该方法一般只配置常用的一些mysql端口(如3306, 13306)等,不要进行全端口扫描,否则会给服务器带来极大压力。代码大致如下,详细代码可到开源地址上下载。
def get_from_ip_range(self): instances = [] if self.global_scan_ip != self.local_ip: return instances # ... return instances
2. 对本机所有端口进行扫描,根据扫描信息获取本机的所有mysql信息。
def get_local_listening_ports(self): listening_ports = self.local_listening_ports() instances = [] def worker(scan_ip, scan_port): r1 = self.scan_port(scan_ip, scan_port) if r1: instances.append(r1) semaphore.release() semaphore = threading.Semaphore(10) threads = [] for port in listening_ports: semaphore.acquire() t = threading.Thread(target=worker, args=(self.local_ip, port)) t.daemon = True threads.append(t) t.start() for i in threads: i.join() return instances
3. 本机my.cnf配置文件发现,通过配置本机配置文件的的位置,进行本机mysql实例的发现。
def get_from_config(self): instances = [] if self.local_ip == "": return instances for path in self.config_paths: config = configparser.ConfigParser() config.read(path) if 'mysqld' in config: port = config['mysqld'].get("port", "3306") pid_file = config['mysqld'].get("pid-file") ok, name = Utils.get_pid(pid_file) if ok and name == "mysqld": instances.append((self.local_ip, port, "")) return instances
将配置提到文件的开始,这样可以很方便地进行配置,如配置如下,在添加插件时更具需要改成自己的就可以了。
cidrs = ["192.168.20.8/28"] # Subnet to be scanned.global_ports_range = "3306-3310,3320" # ports to be scanned. such as "3306-3310,3320"paths = ["/etc/my.cnf", "/etc/mysql/my.cnf"]
脚本优化
在第一部进行脚步已经能够完成对mysql实例的数据采集,不过在应用到生产的时候还需要进一步优化。
本文才采集mysql实例的时候主要从下面节点上进行了一些优化:缓存问题、python 依赖安装、全局扫描以及并发上对脚本进行优化,当然在实践中可能有更多的需要优化的空间,比如发现更多没有映射主机端口的mysql容器等,这取决于是否有这样的需求。
缓存问题系统每次调用自动发现脚本时,都会执行一遍run函数,但是,扫描一次可能会耗时很久,因此,不能每次调用都需要执行全网段扫描,因此我们借助临时文件对扫描结果进行缓存。
class Cache: def __init__(self, temp_file=None, duration=None): if not temp_file: self.temp_file = os.path.join(tempfile.gettempdir(), "auto_discover_mysql_result.json") else: self.temp_file = temp_file if duration and isinstance(duration, int) and duration > 3600: self.duration = duration else: self.duration = 3600 @classmethod def convert_data(cls, data): data = { "create_at": int(time.time()), "results": data, } return data def out_date(self, data): if isinstance(data, dict) and time.time() - data.get("create_at", 0) > self.duration: return True return False def read(self): try: with open(self.temp_file, 'r') as temp_file: json_data = json.load(temp_file) except: json_data = {} return json_data def write(self, data): with open(self.temp_file, "w") as temp_file: json.dump(data, temp_file)
2. python依赖
脚本可能需要安装一些依赖,如果不想自动安装依赖,可以选择在执行脚本的节点(服务器)上预先安装好脚本的各种依赖库,当然这对于需要在很多机器上都要执行该脚本来说工作量不小;本示例选择自动安装依赖,这样就可以省去安装依赖的问题了。
class Module: def __init__(self, modules=None, index_url=None): self.modules = modules self.index_url = pip_index_url if not modules: self.modules = pip_modules if not index_url: self.index_url = pip_index_url def check_install_modules(self): for v in self.modules: self.install_missing_module(v) def install_missing_module(self, module_name): try: importlib.import_module(module_name) except ImportError: print("module '{}' is not installed. Installing...".format(module_name)) try: import pip except ImportError: print("pip is not installed. Please install it manually.") return try: pip.main(['install', '--index-url', self.index_url, module_name]) print("module '{}' has been installed successfully.".format(module_name)) except Exception as e: print("Failed to install module '{}': {}".format(module_name, str(e)))
3. 全局扫描
因为脚本可能需要在很多个节点上执行扫描任务,但是并不是所有节点上都需要执行全部的自动发现规则,如网段的扫描,因此引入全局扫描参数,脚本在执行时结合自身ip来确定是否执行网段扫描任务。
global_scan_ip = "192.168.20.10" # Identify on which device the subnet scanning is performed.def get_from_ip_range(self): instances = [] if self.global_scan_ip != self.local_ip: return instances ... pass
4. 并发扫描
对扫描任务进行并发处理,并评估自身环境运行的情况、服务的压力等合理设置并发数,从而提高采集效率。
当所有完成之后,最终在脚本中需要的配置信息示例如下:
global_scan_ip = "192.168.20.1" # Identify on which device the subnet scanning is performed.cidrs = ["192.168.20.1/27"] # Subnet to be scanned.global_ports_range = "3306-3310,3320" # ports to be scanned. such as "3306-3310,3320"paths = ["/etc/my.cnf", "/etc/mysql/my.cnf"]threading_number = 10pip_modules = ["configparser", "ipaddress", "psutil", "tempfile"]pip_index_url = 'https://pypi.douban.com/simple'
脚本执行测试
直接执行脚本,注意这里只需要保证输出结果中包含 AutoDiscovery::Result::开始的一行结果即可,如果输出有其他更多信息不会影响最终结果的获取,本示例输出样式如下:
AutoDiscovery::Result::[{"mysql_name": "192.168.20.1-3306", "ip": "192.168.20.1", "port": "3306", "version": "5.7.36"}, {"mysql_name": "192.168.20.1-3310", "ip": "192.168.20.1", "port": "3310", "version": "5.7.36-log"}, {"mysql_name": "192.168.20.3-3306", "ip": "192.168.20.3", "port": "3306", "version": "5.7.24"}]
在系统临时文件(auto_discover_mysql_result.json)中的内容输入格式大致如下:
# cat /tmp/auto_discover_mysql_result.json | python -m json.tool{ "create_at": 1699713024, "results": [ { "mysql_name": "192.168.20.1-3306", "ip": "192.168.20.1", "port": "3306", "version": "5.7.36" }, { "mysql_name": "192.168.20.1-3310", "ip": "192.168.20.1", "port": "3310", "version": "5.7.36-log" }, { "mysql_name": "192.168.20.3-3306", "ip": "192.168.20.3", "port": "3306", "version": "5.7.24" } ]}
至此,mysql实例发现的脚本完成。整个脚步已经尽可能兼容了python2与python3, 如果在运行中有问题,可适当调整代码的兼容性。
04CMDB平台配置
在模型配置中添加mysql模型,并定义模型属性,平台默认也有mysql的属性字段,用户可根据需要自行修改,本示例使用默认的mysql模型。
主要步骤如下:
1. 进入到属性自动发现标签,进行新增plugin,在模式中选择plugin,将脚本内容拷贝到代码区,执行右下角的更新字段,即可展示如下如下图所示的界面。
2. 结合企业自身的场景,配置相应的自动发现的相关配置,如下图为笔者测试配置。
3. 将模型配置与自发现脚步插件的字段进行映射,一般情形下,模型名与插件名一致会自动匹配,其他需要手动配置相应的映射关系。执行机器则根据需要去进行配置,普通账号只可以选择指定节点和从CMDB选择。测试时可以选择指定节点进行,节点需要时配置的OneAgent的id,具体可到OneAgent的配置文件中查看。
4.稍等片刻,即可在自动发现池中查看发现结果,在自动发现中即可选择对结果进行入库操作,至此,实例的自动发现完成。
关于自动发现的其他知识,可以参考一下这篇文章:尽可能通用的运维CMDB的设计与实践(ⅠⅠ) - 自动发现
05 更多
对于mysql的示例发现,在企业是环境中可能还有很多其他的情况,用户可以根据企业内部的具体场景,在本文的脚本基础上进一步拓展mysql自动发现的能力。
另外本文中对端口探测的细节限于篇幅,没有过多讲解,有兴趣可以看开源的脚本代码,在文章最后将会给出,当然给出的不一定是最佳选项,算是抛砖引玉。
结语
本文主要从mysql实例自发现出发,展示如何利用平台的自发现功能,通过文中的mysql脚本,可以轻松实现mysql实例的入库,对于其他任何实例的自动发现,均可以参考mysql的发现方法,快速实现。
最后附上本次示例中的mysql自发现的完整代码地址:
https://github.com/veops/ops-tools
最后也欢迎您参与到我们的开源CMDB的建设中来:
https://github.com/veops/cmdb