Shell Scripts and Python (Writing Shell-Style Ops Scripts in Python, Explained in Detail)

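This time I'm sharing another batch of scripts that I use at work, split into two parts: Python and Shell.

Python script examples: WeCom (企业微信, Enterprise WeChat) alerting, an FTP client, an SSH client, a Saltstack client, a vCenter client, checking when a domain's SSL certificate expires, and sending today's weather forecast together with a chart of the upcoming temperature trend.

Shell script examples: full SVN backup, Zabbix monitoring of user password expiry, building a local YUM mirror, and a request from a reader of the previous article (when load is high, find the processes using the most resources and store the result or push a notification).

The post is on the long side, so please bear with it to the end; there is an Easter egg waiting.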


Python Scripts

WeCom alerting

This script sends alerts through a WeCom application and can be used as an alert channel for Zabbix.

```python
# -*- coding: utf-8 -*-

import json

import requests


class DLF:
    def __init__(self, corpid, corpsecret):
        self.url = "https://qyapi.weixin.qq.com/cgi-bin"
        self.corpid = corpid
        self.corpsecret = corpsecret
        # Fetch the access_token once, when the client is created
        self._token = self._get_token()

    def _get_token(self):
        '''
        Fetch the access_token for the WeCom API.
        :return: the token, or the error text if the request fails
        '''
        token_url = self.url + "/gettoken?corpid=%s&corpsecret=%s" % (self.corpid, self.corpsecret)
        try:
            res = requests.get(token_url).json()
            token = res['access_token']
            return token
        except Exception as e:
            return str(e)

    def _get_media_id(self, file_obj):
        # Upload a file as temporary media and return its media_id
        get_media_url = self.url + "/media/upload?access_token={}&type=file".format(self._token)
        data = {"media": file_obj}

        try:
            res = requests.post(url=get_media_url, files=data)
            media_id = res.json()['media_id']
            return media_id
        except Exception as e:
            return str(e)

    def send_text(self, agentid, content, touser=None, toparty=None):
        send_msg_url = self.url + "/message/send?access_token=%s" % (self._token)
        send_data = {
            "touser": touser,
            "toparty": toparty,
            "msgtype": "text",
            "agentid": agentid,
            "text": {
                "content": content
            }
        }

        try:
            res = requests.post(send_msg_url, data=json.dumps(send_data))
            # Return the API response so the caller can inspect it
            return res.json()
        except Exception as e:
            return str(e)

    def send_image(self, agentid, file_obj, touser=None, toparty=None):
        media_id = self._get_media_id(file_obj)
        send_msg_url = self.url + "/message/send?access_token=%s" % (self._token)
        send_data = {
            "touser": touser,
            "toparty": toparty,
            "msgtype": "image",
            "agentid": agentid,
            "image": {
                "media_id": media_id
            }
        }

        try:
            res = requests.post(send_msg_url, data=json.dumps(send_data))
            return res.json()
        except Exception as e:
            return str(e)
```
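The `send_text` method serializes `touser` and `toparty` even when they are left as `None`. As a minimal sketch, the same request body can be built with unset recipients dropped. `build_text_payload` is a hypothetical helper that is not part of the original script, the field names are copied from the class, and the `agentid` value in the demo is made up.

```python
import json


def build_text_payload(agentid, content, touser=None, toparty=None):
    """Build the JSON body for a text message, omitting unset recipients."""
    payload = {
        "touser": touser,
        "toparty": toparty,
        "msgtype": "text",
        "agentid": agentid,
        "text": {"content": content},
    }
    # Drop keys whose value is None so only real recipients are sent
    return {key: value for key, value in payload.items() if value is not None}


if __name__ == "__main__":
    # agentid 1000002 is a made-up example value
    body = build_text_payload(agentid=1000002, content="disk usage above 90%", toparty="1")
    print(json.dumps(body, ensure_ascii=False))
```

Inside `send_text`, the result could be passed to `json.dumps` in place of the hand-written `send_data` dictionary.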

FTP client

Uses the ftplib module to work with an FTP server: upload, download, and related operations.

```python
# -*- coding: utf-8 -*-

from ftplib import FTP
from os import path
import copy


class FTPClient:
    def __init__(self, host, user, passwd, port=21):
        self.host = host
        self.user = user
        self.passwd = passwd
        self.port = port
        self.res = {'status': True, 'msg': None}
        self._ftp = None
        self._login()

    def _login(self):
        '''
        Log in to the FTP server.
        :return: the exception if connecting or logging in fails
        '''
        try:
            self._ftp = FTP()
            self._ftp.connect(self.host, self.port, timeout=30)
            self._ftp.login(self.user, self.passwd)
        except Exception as e:
            return e

    def upload(self, localpath, remotepath=None):
        '''
        Upload a file.
        :param localpath: local file path
        :param remotepath: remote file path
        :return:
        '''
        if not localpath: return 'Please select a local file. '

        # With no remote path, upload to the current directory under the local file name
        if not remotepath:
            remotepath = path.basename(localpath)

        # storbinary needs an open binary file object, not a path string
        with open(localpath, 'rb') as fp:
            self._ftp.storbinary('STOR ' + remotepath, fp)

    def download(self, remotepath, localpath=None):
        '''
        Download a file.
        :param localpath: local file path
        :param remotepath: remote file path
        :return:
        '''
        if not remotepath: return 'Please select a remote file. '
        # With no local path, download to the current directory under the remote file name
        if not localpath:
            localpath = path.basename(remotepath)
        # If localpath is a directory, join it with the basename of remotepath
        if path.isdir(localpath):
            localpath = path.join(localpath, path.basename(remotepath))

        # Write the remote content into the local file
        with open(localpath, 'wb') as fp:
            self._ftp.retrbinary('RETR ' + remotepath, fp.write)

    def nlst(self, dir='/'):
        '''
        List the contents of a directory.
        :return: everything in the directory, as a list
        '''
        files_list = self._ftp.nlst(dir)
        return files_list

    def rmd(self, dir=None):
        '''
        Remove a directory.
        :param dir: directory name
        :return: result of the operation
        '''
        if not dir: return 'Please input dirname'
        res = copy.deepcopy(self.res)
        try:
            del_d = self._ftp.rmd(dir)
            res['msg'] = del_d
        except Exception as e:
            res['status'] = False
            res['msg'] = str(e)

        return res

    def mkd(self, dir=None):
        '''
        Create a directory.
        :param dir: directory name
        :return: result of the operation
        '''
        if not dir: return 'Please input dirname'
        res = copy.deepcopy(self.res)
        try:
            mkd_d = self._ftp.mkd(dir)
            res['msg'] = mkd_d
        except Exception as e:
            res['status'] = False
            res['msg'] = str(e)

        return res

    def del_file(self, filename=None):
        '''
        Delete a file.
        :param filename: file name
        :return: result of the operation
        '''
        if not filename: return 'Please input filename'
        res = copy.deepcopy(self.res)
        try:
            del_f = self._ftp.delete(filename)
            res['msg'] = del_f
        except Exception as e:
            res['status'] = False
            res['msg'] = str(e)

        return res

    def get_file_size(self, filenames=None):
        '''
        Get file sizes in bytes and classify each entry as file or directory.
        :param filenames: list of file names
        :return: result of the operation
        '''
        if not filenames: return {'msg': 'This is an empty directory'}
        res_l = []
        for file in filenames:
            res_d = {}
            # size() raises for a directory or a missing file
            try:
                size = self._ftp.size(file)
                file_type = 'f'
            except Exception:
                # For a directory, report size as '-' and append '/' (/dir/)
                size = '-'
                file_type = 'd'
                file = file + '/'

            res_d['filename'] = file
            res_d['size'] = size
            res_d['type'] = file_type
            res_l.append(res_d)

        return res_l

    def rename(self, old_name=None, new_name=None):
        '''
        Rename a file or directory.
        :param old_name: current file or directory name
        :param new_name: new file or directory name
        :return: result of the operation
        '''
        if not old_name or not new_name: return 'Please input old_name and new_name'
        res = copy.deepcopy(self.res)
        try:
            rename_f = self._ftp.rename(old_name, new_name)
            res['msg'] = rename_f
        except Exception as e:
            res['status'] = False
            res['msg'] = str(e)

        return res

    def close(self):
        '''
        Close the FTP connection.
        :return:
        '''
        try:
            # Send the QUIT command to the server
            self._ftp.quit()
        except Exception:
            return 'No response from server'
        finally:
            # Close the connection on the client side regardless
            self._ftp.close()
```
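The `download` method picks its local target in three steps: default to the remote file name, join with a directory if one was given, otherwise use the path as is. A small sketch of just that decision, pulled out into a pure function so it can be checked without an FTP server. `resolve_local_path` is a hypothetical name that does not appear in the original script.

```python
from os import path


def resolve_local_path(remotepath, localpath=None):
    """Mirror download()'s defaults: decide where a remote file is written locally."""
    if not localpath:
        # No local path given: current directory, same name as the remote file
        return path.basename(remotepath)
    if path.isdir(localpath):
        # A directory was given: keep the remote file name inside it
        return path.join(localpath, path.basename(remotepath))
    # Anything else is treated as the full target file path
    return localpath
```

Keeping this logic separate from the network call makes the default behavior easy to reason about before pointing the client at a real server.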

SSH client

This script only supports key-based connections; for password logins, a small modification is enough.

```python
# -*- coding: utf-8 -*-

import paramiko


class SSHClient:
    def __init__(self, host, port, user, pkey):
        self.ssh_host = host
        self.ssh_port = port
        self.ssh_user = user
        # Load the RSA private key from the given file path
        self.private_key = paramiko.RSAKey.from_private_key_file(pkey)
        self.ssh = None
        self._connect()

    def _connect(self):
        self.ssh = paramiko.SSHClient()
        # Accept unknown host keys automatically
        self.ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        try:
            self.ssh.connect(hostname=self.ssh_host, port=self.ssh_port, username=self.ssh_user, pkey=self.private_key, timeout=10)
        except Exception:
            return 'ssh connect fail'

    def execute_command(self, command):
        # Run the command and return its stdout and stderr as bytes
        stdin, stdout, stderr = self.ssh.exec_command(command)
        out = stdout.read()
        err = stderr.read()
        return out, err

    def close(self):
        self.ssh.close()
```
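The article says password logins need only a small change. One way that change might look is to build the keyword arguments for `paramiko.SSHClient.connect()` in a helper that accepts either a key or a password. `build_connect_kwargs` is a hypothetical name; `hostname`, `port`, `username`, `pkey`, `password` and `timeout` are parameters of paramiko's `connect()`. The sketch does not import paramiko, so it runs on its own.

```python
def build_connect_kwargs(host, port, user, pkey=None, password=None):
    """Return keyword arguments for paramiko.SSHClient.connect()."""
    if pkey is None and password is None:
        raise ValueError("either pkey or password is required")
    kwargs = {"hostname": host, "port": port, "username": user, "timeout": 10}
    if pkey is not None:
        kwargs["pkey"] = pkey          # key-based login, as in the class above
    else:
        kwargs["password"] = password  # password login
    return kwargs
```

In `_connect`, the call would then become `self.ssh.connect(**build_connect_kwargs(...))`, with the constructor extended to accept a password.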

Saltstack client

Operates on the Saltstack master through its API to run commands.

```python
#!/usr/bin/env python
# -*- coding:utf-8 -*-

import requests
import json
import copy


class SaltApi:
    """
    Wrapper class for the salt api.
    Obtains a token on initialization.
    """
    def __init__(self):
        self.url = "http://172.85.10.21:8000/"
        self.username = "saltapi"
        self.password = "saltapi"
        self.headers = {"Content-type": "application/json"}
        self.params = {'client': 'local', 'fun': None, 'tgt': None, 'arg': None}
        self.login_url = self.url + "login"
        self.login_params = {'username': self.username, 'password': self.password, 'eauth': 'pam'}
        self.token = self.get_data(self.login_url, self.login_params)['token']
        self.headers['X-Auth-Token'] = self.token

    def get_data(self, url, params):
        '''
        POST to the url and return the data.
        :param url: request url
        :param params: parameters sent to the url
        :return: result of the request
        '''
        send_data = json.dumps(params)
        request = requests.post(url, data=send_data, headers=self.headers)
        response = request.json()
        result = dict(response)
        return result['return'][0]

    def get_auth_keys(self):
        '''
        Get all accepted minion keys.
        :return:
        '''
        data = copy.deepcopy(self.params)
        data['client'] = 'wheel'
        data['fun'] = 'key.list_all'
        result = self.get_data(self.url, data)
        try:
            return result['data']['return']['minions']
        except Exception as e:
            return str(e)

    def get_grains(self, tgt, arg='id'):
        """
        Get basic system information.
        :tgt: target host
        :return:
        """
        data = copy.deepcopy(self.params)
        if tgt:
            data['tgt'] = tgt
        else:
            data['tgt'] = '*'
        data['fun'] = 'grains.item'
        data['arg'] = arg
        result = self.get_data(self.url, data)

        return result

    def execute_command(self, tgt, fun='cmd.run', arg=None, tgt_type='list', salt_async=False):
        """
        Run a saltstack module function, similar to: salt '*' cmd.run 'command'
        :param tgt: target host
        :param fun: module function, optional
        :param arg: arguments, optional
        :return: result of the execution
        """
        data = copy.deepcopy(self.params)

        if not tgt: return {'status': False, 'msg': 'target host not exist'}
        if not arg:
            data.pop('arg')
        else:
            data['arg'] = arg
        if tgt != '*':
            data['tgt_type'] = tgt_type
        if salt_async: data['client'] = 'local_async'
        data['fun'] = fun
        data['tgt'] = tgt
        result = self.get_data(self.url, data)

        return result

    def jobs(self, fun='detail', jid=None):
        """
        Jobs.
        :param fun: active, detail
        :param jid: Job ID
        :return: result of the job
        """
        data = {'client': 'runner'}
        if fun == 'detail':
            if not jid: return {'success': False, 'msg': 'job id is none'}
            data['fun'] = 'jobs.lookup_jid'
            data['jid'] = jid
        elif fun == 'active':
            # The original rejected 'active' despite documenting it; map it to the jobs.active runner
            data['fun'] = 'jobs.active'
        else:
            return {'success': False, 'msg': 'fun is active or detail'}
        result = self.get_data(self.url, data)

        return result
```
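To make it easier to see what `execute_command` actually posts to the API, here is a sketch of the request body it assembles, written as a standalone function. `build_local_payload` is a hypothetical helper and the host names in the usage note are invented. The rules follow the method: `arg` is omitted when empty, `tgt_type` is added unless the target is `'*'`, and `salt_async` switches the client to `local_async`.

```python
def build_local_payload(tgt, fun="cmd.run", arg=None, tgt_type="list", salt_async=False):
    """Assemble the request body that execute_command() sends to the salt api."""
    if not tgt:
        raise ValueError("target host not exist")
    payload = {
        "client": "local_async" if salt_async else "local",
        "fun": fun,
        "tgt": tgt,
    }
    if arg:
        payload["arg"] = arg            # only sent when an argument is given
    if tgt != "*":
        payload["tgt_type"] = tgt_type  # a '*' glob needs no explicit target type
    return payload
```

For example, `build_local_payload(["web01", "web02"], arg="uptime")` describes running `uptime` on two listed minions, which is what `salt -L 'web01,web02' cmd.run 'uptime'` would do on the command line.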

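vCenter client

Performs routine vCenter operations through the official SDK. I use this script in a CMDB platform to collect host information automatically and store it in the database.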

```python
from pyVim.connect import SmartConnect, Disconnect, SmartConnectNoSSL
from pyVmomi import vim
from asset import models
import atexit


class Vmware:
    def __init__(self, ip, user, password, port, idc, vcenter_id):
        self.ip = ip
        self.user = user
        self.password = password
        self.port = port
        self.idc_id = idc
        self.vcenter_id = vcenter_id

    def get_obj(self, content, vimtype, name=None):
        '''
        Return all managed objects of the given types as a list.
        The name parameter is accepted but not used for filtering here.
        '''
        container = content.viewManager.CreateContainerView(content.rootFolder, vimtype, True)
        obj = [view for view in container.view]
        return obj

    def get_esxi_info(self):
        # ESXi host information
        esxi_host = {}
        res = {"connect_status": True, "msg": None}

        try:
            # Connect without certificate verification
            # (SmartConnectNoSSL may be missing from newer pyVmomi releases)
            si = SmartConnectNoSSL(host=self.ip, user=self.user, pwd=self.password, port=self.port, connectionPoolTimeout=60)
        except Exception as e:
            res['connect_status'] = False
            # vmodl faults carry a .msg attribute; fall back to a generic message
            fault_msg = getattr(e, 'msg', None)
            if fault_msg:
                res['msg'] = "%s Caught vmodl fault : %s" % (self.ip, fault_msg)
            else:
                res['msg'] = '%s: connection error' % (self.ip)
            return res
        # Disconnect when the interpreter exits
        atexit.register(Disconnect, si)
        content = si.RetrieveContent()
        esxi_obj = self.get_obj(content, [vim.HostSystem])

        for esxi in esxi_obj:
            esxi_host[esxi.name] = {}
            esxi_host[esxi.name]['idc_id'] = self.idc_id
            esxi_host[esxi.name]['vcenter_id'] = self.vcenter_id
            esxi_host[esxi.name]['server_ip'] = esxi.name
            esxi_host[esxi.name]['manufacturer'] = esxi.summary.hardware.vendor
            esxi_host[esxi.name]['server_model'] = esxi.summary.hardware.model

            for i in esxi.summary.hardware.otherIdentifyingInfo:
                if isinstance(i, vim.host.SystemIdentificationInfo):
                    esxi_host[esxi.name]['server_sn'] = i.identifierValue

            # Operating system name
            esxi_host[esxi.name]['system_name'] = esxi.summary.config.product.fullName
            # Total CPU threads
            esxi_cpu_total = esxi.summary.hardware.numCpuThreads
            # Total memory in GB
            esxi_memory_total = esxi.summary.hardware.memorySize / 1024 / 1024 / 1024

            # Total disk capacity in GB
            esxi_disk_total = 0
            for ds in esxi.datastore:
                esxi_disk_total += ds.summary.capacity / 1024 / 1024 / 1024

            # Default VM size: 4 cores, 8 GB memory, 100 GB disk.
            # Used to work out how many more VMs the host can take.
            default_configure = {
                'cpu': 4,
                'memory': 8,
                'disk': 100
            }

            esxi_host[esxi.name]['vm_host'] = []
            vm_usage_total_cpu = 0
            vm_usage_total_memory = 0
            vm_usage_total_disk = 0

            # Virtual machine information
            for vm in esxi.vm:
                host_info = {}
                host_info['vm_name'] = vm.name
                host_info['power_status'] = vm.runtime.powerState
                # '核' means 'cores'
                host_info['cpu_total_kernel'] = str(vm.config.hardware.numCPU) + '核'
                host_info['memory_total'] = str(vm.config.hardware.memoryMB) + 'MB'
                host_info['system_info'] = vm.config.guestFullName

                disk_info = ''
                disk_total = 0
                for d in vm.config.hardware.device:
                    if isinstance(d, vim.vm.device.VirtualDisk):
                        disk_total += d.capacityInKB / 1024 / 1024
                        disk_info += d.deviceInfo.label + ": " + str((d.capacityInKB) / 1024 / 1024) + ' GB' + ','

                host_info['disk_info'] = disk_info
                esxi_host[esxi.name]['vm_host'].append(host_info)

                # Free capacity on the host = total - allocated to powered-on VMs
                if host_info['power_status'] == 'poweredOn':
                    vm_usage_total_cpu += vm.config.hardware.numCPU
                    vm_usage_total_disk += disk_total
                    vm_usage_total_memory += (vm.config.hardware.memoryMB / 1024)

            esxi_cpu_free = esxi_cpu_total - vm_usage_total_cpu
            esxi_memory_free = esxi_memory_total - vm_usage_total_memory
            esxi_disk_free = esxi_disk_total - vm_usage_total_disk

            esxi_host[esxi.name]['cpu_info'] = 'Total: %d核, Free: %d核' % (esxi_cpu_total, esxi_cpu_free)
            esxi_host[esxi.name]['memory_info'] = 'Total: %dGB, Free: %dGB' % (esxi_memory_total, esxi_memory_free)
            esxi_host[esxi.name]['disk_info'] = 'Total: %dGB, Free: %dGB' % (esxi_disk_total, esxi_disk_free)

            # The number of VMs that can still be allocated is the minimum of
            # free CPU, memory and disk, each divided by the default VM size
            if esxi_cpu_free < 4 or esxi_memory_free < 8 or esxi_disk_free < 100:
                free_allocation_vm_host = 0
            else:
                free_allocation_vm_host = int(min(
                    [
                        esxi_cpu_free / default_configure['cpu'],
                        esxi_memory_free / default_configure['memory'],
                        esxi_disk_free / default_configure['disk']
                    ]
                ))
            esxi_host[esxi.name]['free_allocation_vm_host'] = free_allocation_vm_host
        esxi_host['connect_status'] = True
        return esxi_host

    def write_to_db(self):
        esxi_host = self.get_esxi_info()
        # Connection failed
        if not esxi_host['connect_status']:
            return esxi_host

        del esxi_host['connect_status']

        for machine_ip in esxi_host:
            # Physical host information
            esxi_host_dict = esxi_host[machine_ip]
            # Virtual machine information
            virtual_host = esxi_host[machine_ip]['vm_host']
            del esxi_host[machine_ip]['vm_host']

            # objects.create() already saves the row, so no extra save() is needed
            obj = models.EsxiHost.objects.create(**esxi_host_dict)

            for host_info in virtual_host:
                host_info['management_host_id'] = obj.id
                models.virtualHost.objects.create(**host_info)
```
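The capacity rule at the end of `get_esxi_info` is easier to follow with numbers. A minimal sketch of that calculation as a standalone function, where `free_vm_slots` is a hypothetical name and the defaults mirror the 4 cores / 8 GB / 100 GB VM size used in the script:

```python
def free_vm_slots(cpu_free, memory_free_gb, disk_free_gb, cpu=4, memory_gb=8, disk_gb=100):
    """How many default-sized VMs still fit on a host."""
    # If any single resource cannot hold one more VM, nothing fits
    if cpu_free < cpu or memory_free_gb < memory_gb or disk_free_gb < disk_gb:
        return 0
    # Otherwise the scarcest resource decides
    return int(min(cpu_free / cpu, memory_free_gb / memory_gb, disk_free_gb / disk_gb))


if __name__ == "__main__":
    # 20 free cores -> 5 VMs, 64 GB -> 8 VMs, 450 GB -> 4.5 VMs; disk is the limit
    print(free_vm_slots(20, 64, 450))
```

With 20 free cores, 64 GB of memory and 450 GB of disk, disk is the bottleneck and the host can take 4 more VMs.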

Checking when a domain's SSL certificate expires

Used for Zabbix alerting.

```python
import re
import sys
import time
import subprocess
from datetime import datetime


def main(domain):
    # Pass the arguments as a list so the domain is never interpreted by a shell
    comm = ["curl", "-Ivs", f"https://{domain}", "--connect-timeout", "10"]
    # curl -v writes the certificate details to stderr, so merge it into stdout
    result = subprocess.run(comm, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, universal_newlines=True)
    output = result.stdout

    try:
        # This pattern follows the verbose output of the curl build the script was written for;
        # other curl builds may format these lines differently
        m = re.search(r'start date: (.*?)\n.*?expire date: (.*?)\n.*?common name: (.*?)\n.*?issuer: CN=(.*?)\n', output, re.S)
        start_date = m.group(1)
        expire_date = m.group(2)
        common_name = m.group(3)
        issuer = m.group(4)
    except Exception as e:
        # Sentinel value: the certificate details could not be parsed
        return 999999999

    # time: parse the string into a struct_time
    start_date = time.strptime(start_date, "%b %d %H:%M:%S %Y GMT")
    start_date_st = time.strftime("%Y-%m-%d %H:%M:%S", start_date)
    # datetime: parse the string into a datetime
    expire_date = datetime.strptime(expire_date, "%b %d %H:%M:%S %Y GMT")
    expire_date_st = datetime.strftime(expire_date, "%Y-%m-%d %H:%M:%S")

    # Days remaining
    remaining = (expire_date - datetime.now()).days

    return remaining


if __name__ == "__main__":
    domain = sys.argv[1]
    remaining_days = main(domain)
    print(remaining_days)
```
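The core of the check is the date arithmetic at the end. A small sketch that isolates it and takes the current time as a parameter, so the result can be verified with fixed dates. `days_remaining` is a hypothetical name, the date format string is the one used in the script, and the sample dates are made up.

```python
from datetime import datetime


def days_remaining(expire_date, now):
    """Whole days left before expiry, given a date string such as 'Sep 09 12:00:00 2025 GMT'."""
    expires = datetime.strptime(expire_date, "%b %d %H:%M:%S %Y GMT")
    return (expires - now).days


if __name__ == "__main__":
    # Eight days before the (made-up) expiry date
    print(days_remaining("Sep 09 12:00:00 2025 GMT", datetime(2025, 9, 1, 12, 0, 0)))
```

A negative result means the certificate has already expired, which a Zabbix trigger can treat the same way as a small positive value.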

Sending today's weather forecast and a chart of the upcoming trend


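This script sends my wife today's weather forecast along with a chart of the upcoming temperature trend. WeChat has since disabled its web client, so messages can no longer be pushed to personal WeChat; I send the notification through WeCom instead, which means your wife has to be added to your WeCom organization. Feel free to skip this one if it doesn't interest you.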

```python
# -*- coding: utf-8 -*-

import requests
import json
import datetime


def weather(city):
    url = "http://wthrcdn.etouch.cn/weather_mini?city=%s" % city

    try:
        data = requests.get(url).json()['data']
        city = data['city']
        ganmao = data['ganmao']

        today_weather = data['forecast'][0]
        # Message text (kept in Chinese, as it is sent to the recipient):
        # advisory, city, date, high, low, wind force, wind direction, conditions,
        # followed by a note that the temperature trend chart will be sent shortly
        res = "老婆今天是{}\n今天天气概况\n城市: {:<10}\n时间: {:<10}\n高温: {:<10}\n低温: {:<10}\n风力: {:<10}\n风向: {:<10}\n天气: {:<10}\n\n稍后会发送近期温度趋势图,请注意查看。".format(
            ganmao,
            city,
            datetime.datetime.now().strftime('%Y-%m-%d'),
            today_weather['high'].split()[1],
            today_weather['low'].split()[1],
            today_weather['fengli'].split('[')[2].split(']')[0],
            today_weather['fengxiang'], today_weather['type'],
        )

        return {"source_data": data, "res": res}
    except Exception as e:
        return str(e)
```
+ Generating the forecast trend chart
```python
# -*- coding: utf-8 -*-

import matplotlib.pyplot as plt
import re
import datetime


def Future_weather_states(forecast, save_path, day_num=5):
    '''
    Plot the temperature trend for the coming days.
    :param forecast: forecast data from the weather API
    :param day_num: how many days ahead to plot
    :return: the chart is saved to save_path
    '''
    future_forecast = forecast
    temps_by_day = {}

    for i in range(day_num):
        data = []
        date = future_forecast[i]["date"]
        # Day of the month, taken from the first number in the date string
        date = int(re.findall(r"\d+", date)[0])
        data.append(int(re.findall(r"\d+", future_forecast[i]["high"])[0]))
        data.append(int(re.findall(r"\d+", future_forecast[i]["low"])[0]))
        data.append(future_forecast[i]["type"])
        temps_by_day[date] = data

    data_list = sorted(temps_by_day.items())
    date = []
    high_temperature = []
    low_temperature = []
    for each in data_list:
        date.append(each[0])
        high_temperature.append(each[1][0])
        low_temperature.append(each[1][1])
    # Red line: daily high; blue line: daily low
    fig = plt.plot(date, high_temperature, "r", date, low_temperature, "b")

    current_date = datetime.datetime.now().strftime('%Y-%m')
    # SimHei is needed to render the Chinese labels below
    plt.rcParams['font.sans-serif'] = ['SimHei']
    plt.rcParams['axes.unicode_minus'] = False
    plt.xlabel(current_date)
    plt.ylabel("℃")
    plt.legend(["高温", "低温"])  # "high", "low"
    plt.xticks(date)
    plt.title("最近几天温度变化趋势")  # "temperature trend over the next few days"
    plt.savefig(save_path)
```
+ Sending to WeCom

The `DLF` class is identical to the one in the WeCom alerting section above; save it as `plugins/send_wechat.py`.

+ The main script
```python
# -*- coding: utf-8 -*-

from plugins.weather_forecast import weather
from plugins.trend_chart import Future_weather_states
from plugins.send_wechat import DLF
import os


# WeCom credentials ("xxx" values are placeholders)
corpid = "xxx"
corpsecret = "xxx"
agentid = "xxx"
# Where the trend chart is saved
_path = os.path.dirname(os.path.abspath(__file__))
save_path = os.path.join(_path, './tmp/weather_forecast.jpg')

# Fetch the weather forecast
content = weather("大兴")

# Send the text message
dlf = DLF(corpid, corpsecret)
dlf.send_text(agentid=agentid, content=content['res'], toparty='1')

# Generate the trend chart
Future_weather_states(content['source_data']['forecast'], save_path)
# Send the image message
with open(save_path, 'rb') as file_obj:
    dlf.send_image(agentid=agentid, toparty='1', file_obj=file_obj)
```
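Both the message text and the chart rely on pulling a number out of forecast strings. A minimal sketch of that extraction step follows. `extract_temperature` is a hypothetical helper, and the sample strings such as `'高温 30℃'` are an assumed format inferred from the `.split()[1]` and `\d+` handling in the scripts, not captured API output. Unlike a plain `\d+`, this version also keeps a leading minus sign; whether the API ever returned sub-zero values is not stated in the article.

```python
import re


def extract_temperature(text):
    """Return the first integer in a forecast string such as '高温 30℃'."""
    match = re.search(r"-?\d+", text)
    if match is None:
        raise ValueError("no temperature in %r" % text)
    return int(match.group())


if __name__ == "__main__":
    print(extract_temperature("高温 30℃"))
```

Raising on a missing number, instead of indexing into an empty `findall` result, gives a clearer error when the upstream format changes.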

Shell Scripts

Full SVN backup

Makes a full backup of SVN with hotcopy; backups are kept for 7 days.

```shell
#!/bin/bash
# Filename : svn_backup_repos.sh
# Date : 2020/12/14
# Author : JakeTian
# Email : JakeTian@***.com
# Crontab : 59 23 * * * /bin/bash $BASE_PATH/svn_backup_repos.sh >/dev/null 2>&1
# Notes : add the script to crontab so it runs once a day
# Description: full SVN backup


set -e

SRC_PATH="/opt/svndata"
DST_PATH="/data/svnbackup"
LOG_FILE="$DST_PATH/logs/svn_backup.log"
SVN_BACKUP_C="/bin/svnadmin hotcopy"
SVN_LOOK_C="/bin/svnlook youngest"
TODAY=$(date +'%F')
cd "$SRC_PATH"
# List repository directories by name; the original piped through tr -d './',
# which would also strip dots from repository names
ALL_REPOS=$(find ./ -mindepth 1 -maxdepth 1 -type d ! -name 'httpd' -a ! -name 'bak' -printf '%f\n')

# Create the backup directory and the log directory
test -d "$DST_PATH" || mkdir -p "$DST_PATH"
test -d "$DST_PATH/logs" || mkdir "$DST_PATH/logs"
test -d "$DST_PATH/$TODAY" || mkdir "$DST_PATH/$TODAY"

# Back up each repository
for repo in $ALL_REPOS
do
    $SVN_BACKUP_C "$SRC_PATH/$repo" "$DST_PATH/$TODAY/$repo"

    # Check that the backup is readable
    if $SVN_LOOK_C "$DST_PATH/$TODAY/$repo";then
        echo "$TODAY: $repo Backup Success" >> "$LOG_FILE"
    else
        echo "$TODAY: $repo Backup Fail" >> "$LOG_FILE"
    fi
done

# Back up the user password file and the permissions file
cp -p authz access.conf "$DST_PATH/$TODAY"

# Rotate the log file
mv "$LOG_FILE" "$LOG_FILE-$TODAY"

# Delete the backup from seven days ago
seven_days_ago=$(date -d "7 days ago" +'%F')
rm -rf "$DST_PATH/$seven_days_ago"
```
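The script removes only the directory dated exactly seven days ago, so a day on which cron did not run leaves an old backup behind. As a sketch of a more forgiving retention step, and not what the original script does, the following removes every dated directory at or beyond the cutoff. `prune_old_backups` is a hypothetical helper; it assumes the backup directories are named `YYYY-MM-DD`, as produced by `date +'%F'`.

```python
import shutil
from datetime import date, datetime, timedelta
from pathlib import Path


def prune_old_backups(backup_root, today=None, keep_days=7):
    """Delete dated backup directories (YYYY-MM-DD) that are keep_days old or older."""
    today = today or date.today()
    cutoff = today - timedelta(days=keep_days)
    removed = []
    for entry in sorted(Path(backup_root).iterdir()):
        if not entry.is_dir():
            continue
        try:
            backup_day = datetime.strptime(entry.name, "%Y-%m-%d").date()
        except ValueError:
            continue  # skip 'logs' and anything else that is not a date
        if backup_day <= cutoff:
            shutil.rmtree(entry)
            removed.append(entry.name)
    return removed
```

Calling `prune_old_backups("/data/svnbackup")` after the backup loop would then keep the seven most recent days, whatever gaps exist in the history.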

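Zabbix monitoring of user password expiry

Used by Zabbix to monitor password expiry for Linux system users (those whose shell is /bin/bash or /bin/sh). A trigger fires when a password has 7 days of validity left, and the users are found through automatic discovery.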

```shell
#!/bin/bash

# Collect the users whose login shell is /bin/bash or /bin/sh
users=($(awk -F':' '$NF ~ /\/bin\/(bash|sh)$/ {print $1}' /etc/passwd))
length=${#users[@]}

# Print the discovery JSON expected by Zabbix
printf "{\n"
printf '\t"data":['
for ((i=0;i<length;i++))
do
    printf '\n\t\t{'
    printf '"{#USER_NAME}":"%s"}' "${users[$i]}"
    if [ "$i" -lt $((length-1)) ];then
        printf ','
    fi
done
printf "\n\t]\n"
printf "}\n"
```

Checking whether a user's password is about to expire

```shell
#!/bin/bash

export LANG=en_US.UTF-8

# Unix time seven days from now. The original computed '-7 day', which would only
# fire a week after expiry; '+7 day' matches the stated "7 days remaining" rule.
SEVEN_DAYS_LATER=$(date -d '+7 day' +'%s')
user="$1"

# Convert a date such as "Sep 09, 2018" to unix time
expires_date=$(sudo chage -l "$user" | awk -F':' '/Password expires/{print $NF}' | sed -n 's/^ //p')
if [[ "$expires_date" != "never" ]];then
    expires_date=$(date -d "$expires_date" +'%s')

    # Print 1 when the password expires within seven days, otherwise 0
    if [ "$expires_date" -le "$SEVEN_DAYS_LATER" ];then
        echo "1"
    else
        echo "0"
    fi
else
    echo "0"
fi
```
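Hand-assembling JSON with `printf` is easy to get wrong. For comparison, here is a sketch of the same discovery document built with Python's `json` module. `login_users` and `build_discovery` are hypothetical names, the `{#USER_NAME}` macro and the `data` key are taken from the shell script, and the passwd lines in the demo are invented.

```python
import json


def login_users(passwd_text):
    """Return users from /etc/passwd content whose shell is /bin/bash or /bin/sh."""
    users = []
    for line in passwd_text.splitlines():
        fields = line.split(":")
        if len(fields) == 7 and fields[6] in ("/bin/bash", "/bin/sh"):
            users.append(fields[0])
    return users


def build_discovery(users):
    """Render the low-level discovery document for the given user names."""
    return json.dumps({"data": [{"{#USER_NAME}": name} for name in users]})


if __name__ == "__main__":
    sample = "root:x:0:0:root:/root:/bin/bash\nnobody:x:99:99:Nobody:/:/sbin/nologin\n"
    print(build_discovery(login_users(sample)))
```

Letting `json.dumps` handle quoting and commas removes the special case for the last element that the shell loop needs.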

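Building a local YUM mirror

Sync the yum repositories with rsync and serve them over HTTP with nginx.

Note that the CentOS 6 mirrors have recently stopped working and appear to have been disabled across mirrors in China; if you find a suitable one, swap in its address yourself.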

```shell
#!/bin/bash
# Update the yum mirror


RsyncCommand="rsync -rvutH -P --delete --delete-after --delay-updates --bwlimit=1000"
DIR="/app/yumData"
LogDir="$DIR/logs"
Centos6Base="$DIR/Centos6/x86_64/Base"
Centos7Base="$DIR/Centos7/x86_64/Base"
Centos6Epel="$DIR/Centos6/x86_64/Epel"
Centos7Epel="$DIR/Centos7/x86_64/Epel"
Centos6Salt="$DIR/Centos6/x86_64/Salt"
Centos7Salt="$DIR/Centos7/x86_64/Salt"
Centos6Update="$DIR/Centos6/x86_64/Update"
Centos7Update="$DIR/Centos7/x86_64/Update"
Centos6Docker="$DIR/Centos6/x86_64/Docker"
Centos7Docker="$DIR/Centos7/x86_64/Docker"
Centos6Mysql5_7="$DIR/Centos6/x86_64/Mysql/Mysql5.7"
Centos7Mysql5_7="$DIR/Centos7/x86_64/Mysql/Mysql5.7"
Centos6Mysql8_0="$DIR/Centos6/x86_64/Mysql/Mysql8.0"
Centos7Mysql8_0="$DIR/Centos7/x86_64/Mysql/Mysql8.0"
MirrorDomain="rsync://rsync.mirrors.ustc.edu.cn"

# Create each directory if it does not exist
check_dir() {
    for dir in "$@"
    do
        test -d "$dir" || mkdir -p "$dir"
    done
}

# Record the result of the rsync that ran immediately before this call
# ($? still holds that command's exit status on entry to the function)
check_rsync_status() {
    if [ $? -eq 0 ];then
        echo "rsync success" >> "$1"
    else
        echo "rsync fail" >> "$1"
    fi
}


check_dir $DIR $LogDir $Centos6Base $Centos7Base $Centos6Epel $Centos7Epel $Centos6Salt $Centos7Salt $Centos6Update $Centos7Update $Centos6Docker $Centos7Docker $Centos6Mysql5_7 $Centos7Mysql5_7 $Centos6Mysql8_0 $Centos7Mysql8_0


# Base yumrepo
#$RsyncCommand "$MirrorDomain"/repo/centos/6/os/x86_64/ $Centos6Base >> "$LogDir/centos6Base.log" 2>&1
# check_rsync_status "$LogDir/centos6Base.log"
$RsyncCommand "$MirrorDomain"/repo/centos/7/os/x86_64/ $Centos7Base >> "$LogDir/centos7Base.log" 2>&1
check_rsync_status "$LogDir/centos7Base.log"

# Epel yumrepo
# $RsyncCommand "$MirrorDomain"/repo/epel/6/x86_64/ $Centos6Epel >> "$LogDir/centos6Epel.log" 2>&1
# check_rsync_status "$LogDir/centos6Epel.log"
$RsyncCommand "$MirrorDomain"/repo/epel/7/x86_64/ $Centos7Epel >> "$LogDir/centos7Epel.log" 2>&1
check_rsync_status "$LogDir/centos7Epel.log"

# SaltStack yumrepo
# $RsyncCommand "$MirrorDomain"/repo/salt/yum/redhat/6/x86_64/ $Centos6Salt >> "$LogDir/centos6Salt.log" 2>&1
# ln -s $Centos6Salt/archive/$(ls $Centos6Salt/archive | tail -1) $Centos6Salt/latest
# check_rsync_status "$LogDir/centos6Salt.log"
$RsyncCommand "$MirrorDomain"/repo/salt/yum/redhat/7/x86_64/ $Centos7Salt >> "$LogDir/centos7Salt.log" 2>&1
check_rsync_status "$LogDir/centos7Salt.log"
# ln -s $Centos7Salt/archive/$(ls $Centos7Salt/archive | tail -1) $Centos7Salt/latest

# Docker yumrepo
$RsyncCommand "$MirrorDomain"/repo/docker-ce/linux/centos/7/x86_64/stable/ $Centos7Docker >> "$LogDir/centos7Docker.log" 2>&1
check_rsync_status "$LogDir/centos7Docker.log"

# centos update yumrepo
# $RsyncCommand "$MirrorDomain"/repo/centos/6/updates/x86_64/ $Centos6Update >> "$LogDir/centos6Update.log" 2>&1
# check_rsync_status "$LogDir/centos6Update.log"
$RsyncCommand "$MirrorDomain"/repo/centos/7/updates/x86_64/ $Centos7Update >> "$LogDir/centos7Update.log" 2>&1
check_rsync_status "$LogDir/centos7Update.log"

# mysql 5.7 yumrepo
# $RsyncCommand "$MirrorDomain"/repo/mysql-repo/yum/mysql-5.7-community/el/6/x86_64/ "$Centos6Mysql5_7" >> "$LogDir/centos6Mysql5.7.log" 2>&1
# check_rsync_status "$LogDir/centos6Mysql5.7.log"
$RsyncCommand "$MirrorDomain"/repo/mysql-repo/yum/mysql-5.7-community/el/7/x86_64/ "$Centos7Mysql5_7" >> "$LogDir/centos7Mysql5.7.log" 2>&1
check_rsync_status "$LogDir/centos7Mysql5.7.log"

# mysql 8.0 yumrepo
# $RsyncCommand "$MirrorDomain"/repo/mysql-repo/yum/mysql-8.0-community/el/6/x86_64/ "$Centos6Mysql8_0" >> "$LogDir/centos6Mysql8.0.log" 2>&1
# check_rsync_status "$LogDir/centos6Mysql8.0.log"
$RsyncCommand "$MirrorDomain"/repo/mysql-repo/yum/mysql-8.0-community/el/7/x86_64/ "$Centos7Mysql8_0" >> "$LogDir/centos7Mysql8.0.log" 2>&1
check_rsync_status "$LogDir/centos7Mysql8.0.log"
```

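Answering a reader's request

When load is high, find the processes using the most resources and store the result or push a notification

This section answers a request that a reader left in the comments under the previous article of Shell script examples.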


```shell
#!/bin/bash

# Number of physical CPUs
physical_cpu_count=$(grep -E 'physical id' /proc/cpuinfo | sort | uniq | wc -l)
# Cores per physical CPU
physical_cpu_cores=$(grep -E 'cpu cores' /proc/cpuinfo | uniq | awk '{print $NF}')
# Total cores
total_cpu_cores=$((physical_cpu_count*physical_cpu_cores))

# Thresholds for the 1, 5 and 15 minute load averages;
# the check fires as soon as any one of them is reached
one_min_load_threshold="$total_cpu_cores"
five_min_load_threshold=$(awk -v cores="$total_cpu_cores" 'BEGIN {print cores * 0.8}')
fifteen_min_load_threshold=$(awk -v cores="$total_cpu_cores" 'BEGIN {print cores * 0.7}')

# Current 1, 5 and 15 minute load averages
one_min_load=$(uptime | awk '{print $(NF-2)}' | tr -d ',')
five_min_load=$(uptime | awk '{print $(NF-1)}' | tr -d ',')
fifteen_min_load=$(uptime | awk '{print $NF}' | tr -d ',')

# Collect current CPU, memory and disk I/O information and write it to log files.
# To send a message or call something else, add your own function here.
get_info() {
    log_dir="cpu_high_script_log"
    test -d "$log_dir" || mkdir "$log_dir"
    ps -eo user,pid,%cpu,stat,time,command --sort -%cpu | head -10 > "$log_dir"/cpu_top10.log
    ps -eo user,pid,%mem,rss,vsz,stat,time,command --sort -%mem | head -10 > "$log_dir"/mem_top10.log
    iostat -dx 1 10 > "$log_dir"/disk_io_10.log
}

# awk exits with status 0 when any load average reaches its threshold.
# Calling get_info from bash avoids relying on awk's system(), which runs /bin/sh
# and may not see an exported bash function.
if echo "$one_min_load $one_min_load_threshold $five_min_load $five_min_load_threshold $fifteen_min_load $fifteen_min_load_threshold" |
    awk '{ exit !($1>=$2 || $3>=$4 || $5>=$6) }'; then
    get_info
fi
```
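The threshold rule is simple enough to restate in a few lines of Python, which also makes it easy to test with fixed numbers. This is a sketch, not a replacement for the shell script: `load_thresholds` and `is_overloaded` are hypothetical names, and the demo uses `os.cpu_count()`, which counts logical CPUs, whereas the shell script multiplies physical CPUs by cores per CPU.

```python
import os


def load_thresholds(total_cores):
    """Thresholds for the 1, 5 and 15 minute load averages."""
    return (total_cores, total_cores * 0.8, total_cores * 0.7)


def is_overloaded(loads, total_cores):
    """True when any load average reaches its threshold."""
    limits = load_thresholds(total_cores)
    return any(load >= limit for load, limit in zip(loads, limits))


if __name__ == "__main__":
    # Assumption: logical CPU count stands in for the script's physical core count
    cores = os.cpu_count() or 1
    print(is_overloaded(os.getloadavg(), cores))
```

On a 4-core machine the thresholds come out at 4, 3.2 and 2.8, so a sustained 15-minute load of 3.0 triggers collection even when the 1-minute load is low.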

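That is everything I wanted to share today.

I hope these examples are something you can put to practical use: adapt them to your own environment and let them make your daily work more efficient.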
