RDS备份上传至OSS

  1. 1. 说明
  2. 2. 方案
  3. 3. 功能要求
  4. 4. 更新
    1. 4.1. 2022-11-12
  5. 5. 代码

说明

最近公司项目在做等保 2.0 等级 3,结果评测机构一测发现数据 i 库没有做异地备份,然后看了下阿里云 rds 的异地备份真贵,和阿里云还有评测机构的沟通下,只用把 RDS 数据备份放在其他其他机房即可以满足,本着为公司节省的态度,决定把每日的物理备份通过阿里云接口来同步到同地区的 OSS 内网中。

方案

使用一台同区域的服务器作为中转服务器,通过 RDS 备份列表接口先将每日的物理备份拉取到中转服务器上,拉取完毕后在上传到 oss,上传完删除本地物理备份,全程走内网,只占用 oss 的空间,毕竟 oss 空间还算便宜。

功能要求

python 3.6 以上
RDS 需要 AliyunRDSFullAccess 权限
OSS 需要桶的 PutObject、ListObjects、ListParts、ListMultipartUploads 权限
如需企业微信推送,需开通企业微信及创建第三方应用

更新

2022-11-12

修复阿里云 RDS 备份格式问题

代码

安装 py 环境库

1
pip3 install aliyun-python-sdk-core aliyun-python-sdk-rds oss2

计划任务

1
2
脚本定时任务设为 每天早上8点
阿里云上RDS备份任务设为 每天早上4点

脚本主代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
#!/usr/bin/env python
# -*- encoding: utf-8 -*-

# here put the import lib

import re,os,oss2,requests,sys,json,datetime,urllib.request
from aliyunsdkcore.client import AcsClient
from aliyunsdkcore.acs_exception.exceptions import ClientException
from aliyunsdkcore.acs_exception.exceptions import ServerException
from aliyunsdkcore.auth.credentials import AccessKeyCredential
from aliyunsdkcore.auth.credentials import StsTokenCredential
from aliyunsdkrds.request.v20140815.DescribeBackupsRequest import DescribeBackupsRequest

def Get_Token(Corpid, Secret):
url = 'https://qyapi.weixin.qq.com/cgi-bin/gettoken'
values = {
"corpid": Corpid,
"corpsecret": Secret
}
req = requests.post(url, params=values)
data = json.loads(req.text)
Token = data["access_token"]
return Token

def WeiXin(Token, Agentid, Content):
url = "https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token=%s" % Token
data = {
"touser": UserID, # 员工id,与部门ID 2选1
# "toparty": PartyID, # 部门id,与用户ID 2选1
"msgtype": "text", # 消息类型,该字段非空
"agentid": Agentid,
"text": {
"content": Content.replace('\\n', '\n') # 消息内容,非空
},
"safe": "0" # 表示是否是保密消息,0表示否,1表示是,默认0
}
res = requests.post(url, json=data)
return res.text

def Get_BackupDownloadUrl(DBInstanceId, LocalDir):
request = DescribeBackupsRequest()
request.set_accept_format('json')
request.set_DBInstanceId(DBInstanceId)
request.set_BackupStatus("Success")
request.set_BackupMode("Automated")
StartTime = (datetime.datetime.utcnow() - datetime.timedelta(hours = 4)).strftime('%Y-%m-%dT%H:%MZ')
EndTime = datetime.datetime.utcnow().strftime('%Y-%m-%dT%H:%MZ')
request.set_StartTime(StartTime)
request.set_EndTime(EndTime)
response = client.do_action_with_exception(request)
jdata = json.loads(response.decode('utf-8'))
# 内网备份地址
BackupDownloadURL = jdata['Items']['Backup'][0]['BackupIntranetDownloadURL']
# 备份完成时间
BackupTime = jdata['Items']['Backup'][0]['BackupEndTime']

# 检测RDS全量备份格式
try:
idx = BackupDownloadURL.index('xb.qp')
file_extension = "1"
except ValueError:
idx = BackupDownloadURL.index('qp.xb')
file_extension = "2"

# 获取备份的名字
FileName = BackupDownloadURL[8:idx + 5].replace('/', '_')
SrcFilePath = "%s/%s" % (LocalDir, FileName)
return BackupDownloadURL, SrcFilePath, BackupTime, file_extension

def DownBackupRDS(BackupDownloadURL, SrcFilePath):
urllib.request.urlretrieve(BackupDownloadURL, SrcFilePath)
return

def UpdateOSS(BucketInfo, DstFilePath, SrcFilePath):
try:
oss2.resumable_upload(BucketInfo, DstFilePath, SrcFilePath)
except Exception as e:
print(e)

def Message():
# 发送信息
if WeChatWork_Status == "yes":
Token = Get_Token(Corpid, Secret)
Send_Message = WeiXin(Token, Agentid, Content)
print(Send_Message)
else:
print(Content)

if __name__ == '__main__':
AccessKeyId = '' # 阿里云AccessKey
AccessSecret = '' # 阿里云Secret
RegionId = '' # OSS桶区域
Bucket_name = '' # OSS桶名
DBInstanceId = '' # 数据库ID

LocalDir = '' # 本地路径

# 企业微信通知
WeChatWork_Status = 'no' # 是否开启企业微信通知yes或no
Corpid = '' # 企业微信Corpid
Secret = '' # 企业微信应用Secret
Agentid = '' # 企业微信应用id
# UserID = "" # 用户ID列表,对应用户的'用户ID'字段,与部门ID 2选1
PartyID = '' # 部门ID列表,对应部门的'部门ID'字段,与用户ID 2选1

# Aliyun SDK登陆
credentials = AccessKeyCredential(AccessKeyId, AccessSecret)
client = AcsClient(region_id=RegionId, credential=credentials)
Token = Get_Token(Corpid, Secret)

# 获取RDS备份下载地址
try:
BackupDownloadURL, SrcFilePath, BackupTime, file_extension = Get_BackupDownloadUrl(DBInstanceId, datadir)
print(BackupDownloadURL)
print(BackupTime)
except Exception as e:
Content = "获取RDS备份下载失败,请上线检查"
Message()
exit()

# 下载RDS备份
DownBackupRDS(BackupDownloadURL, SrcFilePath)

# Aliyun OSS登陆
auth = oss2.Auth(AccessKeyId, AccessSecret)
RegionUrl = 'https://oss-' + RegionId + '-internal.aliyuncs.com'
BucketInfo = oss2.Bucket(auth, RegionUrl, Bucket_name)

# 获取当时日期,并进行备份重命名
UtcTime = datetime.datetime.strptime(BackupTime, "%Y-%m-%dT%H:%M:%SZ")
Localtime = UtcTime + datetime.timedelta(hours = 8)
TodayTime = Localtime.strftime("%Y-%m-%d")

if file_extension == "1":
DstFilePath = TodayTime + "-" + "School-RDS-Backup.xb.qp"
elif file_extension == "2":
DstFilePath = TodayTime + "-" + "School-RDS-Backup.qp.xb"

# 上传OSS
try:
UpdateOSS(BucketInfo, DstFilePath, SrcFilePath)
# 删除本地备份文件
os.remove(SrcFilePath)
Content = TodayTime + ' RDS备份上传至OSS成功'
Message()
except Exception as e:
Content = "RDS备份上传oss错误:" + e
Message()