AliyunDNSHelper:阿里云dns解析助手-python实现

jupiter
2021-12-08 / 0 评论 / 748 阅读 / 正在检测是否收录...
温馨提示:
本文最后更新于2021年12月08日,已超过1078天没有更新,若内容或图片失效,请留言反馈。

1.准备工作

pip install aliyun-python-sdk-core-v3
pip install aliyun-python-sdk-domain
pip install aliyun-python-sdk-alidns
pip install requests
  • 获取accessKeyId和accessSecret

    • 可以在阿里云控制台个人中心直接获取,但是一般建议使用RAM角色来进行权限控制,这样这个accessKey和accessSecret就只能操作域名,不能操作其他的资源,相对会比较安全。关于RAM快速入门:https://help.aliyun.com/document_detail/28637.html

2.代码实现

2.1 ali-dns-helper.py`

from aliyunsdkcore.client import AcsClient
from aliyunsdkcore.acs_exception.exceptions import ClientException
from aliyunsdkcore.acs_exception.exceptions import ServerException
from aliyunsdkalidns.request.v20150109.DescribeSubDomainRecordsRequest import DescribeSubDomainRecordsRequest
from aliyunsdkalidns.request.v20150109.DescribeDomainRecordsRequest import DescribeDomainRecordsRequest

import requests
from urllib.request import urlopen
import json

class AliyunDNSHelper:
    def __init__(self,accessKeyId = "LTAI4GHqgJgbzPfNnkwyGqks",accessSecret = "CzXkUS8BWcjJ7qEbKrqg6qxUOdnW6M",domain = "4v7p.top"):
        """
        阿里云DNS助手初始化
        需要传入自己的accessKeyId、accessSecret、domain
        """
        self.client = AcsClient(accessKeyId, accessSecret, 'cn-hangzhou')
        self.domain = domain

    def update(self,RecordId, RR, Type, Value): 
        """
        修改域名解析记录方法
        参数列表:
            + RecordId:调用查询方法时候查到对应的记录会返回
            + RR:记录名即子域名
            + Type:记录类型包括CNAME、A、AAAA等
            + Value:记录值
        
        """
        from aliyunsdkalidns.request.v20150109.UpdateDomainRecordRequest import UpdateDomainRecordRequest
        request = UpdateDomainRecordRequest()
        request.set_accept_format('json')
        request.set_RecordId(RecordId)
        request.set_RR(RR)
        request.set_Type(Type)
        request.set_Value(Value)
        response = self.client.do_action_with_exception(request)

    def add(self,DomainName, RR, Type, Value):  
        """
        添加新的域名解析记录方法
        参数列表:
            + DomainName:待修改的主域名
            + RR:记录名即子域名
            + Type:记录类型包括CNAME、A、AAAA等
            + Value:记录值
        
        """
        from aliyunsdkalidns.request.v20150109.AddDomainRecordRequest import AddDomainRecordRequest
        request = AddDomainRecordRequest()
        request.set_accept_format('json')
        request.set_DomainName(DomainName)
        request.set_RR(RR)  # https://blog.zeruns.tech
        request.set_Type(Type)
        request.set_Value(Value)
        response = self.client.do_action_with_exception(request)
    
    def select(self,subDomian):
        """
        查询子域域名解析记录
        参数列表:
            + subDomian:子域名
        """
        request = DescribeSubDomainRecordsRequest()
        request.set_accept_format('json')
        request.set_DomainName(self.domain)
        request.set_SubDomain(subDomian + '.' + self.domain)
        response = self.client.do_action_with_exception(request)  # 获取域名解析记录列表
        domain_list = json.loads(response)  # 将返回的JSON数据转化为Python能识别的
        
        if domain_list["TotalCount"]==0:
            return None
        return domain_list["DomainRecords"]["Record"][0]
    
    def action(self,subDomian,Value,Type="A"):
        """
        执行用户关于DNS操作的的行为
        参数列表:
            + action_type操作类型,包括add,update
            + subDomian:记录名即子域名
            + Type:记录类型包括CNAME、A、AAAA等
            + Value:记录值
        """
        print("待处理的记录为  %s:%s-->Domain:%s" % (Type,Value,subDomian + '.' + self.domain))
        subDomian_record = self.select(subDomian)
        if subDomian_record:
            RecordId,RecordValue = subDomian_record["RecordId"],subDomian_record["Value"]
            if Value==RecordValue:
                print("该域名解析记录已存在,跳过")
            else:
                self.update(RecordId, subDomian, Type, Value)
                print("修改域名解析成功")
        else:
            self.add(self.domain, subDomian, Type, Value)
            print("新建域名解析成功")

dnsHelper = AliyunDNSHelper(accessKeyId = "LTAI4GHqgJgbzPfNnkwyGqks",accessSecret = "CzXkUS8BWcjJ7qEbKrqg6qxUOdnW6M",domain = "4v7p.top")

sub_domain = input("SubDomian:")
value = input("value:")
sub_domain_type = input("Type:")

dnsHelper.action(sub_domain,value,sub_domain_type)

2.2 ali-ddns-ipv6.py:实现ipv6 DDNS功能(ipv4同理)

2.2.1 获取本机公网ipv6地址

  • 方法1:通过http接口获取(推荐)
def getIPv6Address():
    text = requests.get('https://v6.ident.me').text
    return text

getIPv6Address()
'2001:da8:a012:2da:2f0:d2ff:fed0:d0d2'
  • 方法2:python执行shell命令--其中的shell命令需要根据实际情况进行修改
def get_local_ipv6():
    """
    获取本机ipv6地址,其中的shell命令需要根据实际情况进行修改
    """
    import os;
    cmd_get_ipv6 = "ifconfig enp1s0 | awk '{if(NR==3){print $3}}'" # 获取本机ipv6的shell命令,需要根据实际情况进行修改
    ipv6 = os.popen(cmd_get_ipv6).readlines()[0]

    if "/" in ipv6:
        ipv6 = ipv6.split("/")[0]

    return ipv6

get_local_ipv6()
'2001:da8:a012:2da:2f0:d2ff:fed0:d0d2'

2.2.2 配合time.sleep完成DDNS功能

from aliyunsdkcore.client import AcsClient
from aliyunsdkcore.acs_exception.exceptions import ClientException
from aliyunsdkcore.acs_exception.exceptions import ServerException
from aliyunsdkalidns.request.v20150109.DescribeSubDomainRecordsRequest import DescribeSubDomainRecordsRequest
from aliyunsdkalidns.request.v20150109.DescribeDomainRecordsRequest import DescribeDomainRecordsRequest

import requests
from urllib.request import urlopen
import json
import time

class AliyunDNSHelper:
    def __init__(self,accessKeyId = "LTAI4GHqgJgbzPfNnkwyGqks",accessSecret = "CzXkUS8BWcjJ7qEbKrqg6qxUOdnW6M",domain = "4v7p.top"):
        """
        阿里云DNS助手初始化
        需要传入自己的accessKeyId、accessSecret、domain
        """
        self.client = AcsClient(accessKeyId, accessSecret, 'cn-hangzhou')
        self.domain = domain

    def update(self,RecordId, RR, Type, Value): 
        """
        修改域名解析记录方法
        参数列表:
            + RecordId:调用查询方法时候查到对应的记录会返回
            + RR:记录名即子域名
            + Type:记录类型包括CNAME、A、AAAA等
            + Value:记录值
        
        """
        from aliyunsdkalidns.request.v20150109.UpdateDomainRecordRequest import UpdateDomainRecordRequest
        request = UpdateDomainRecordRequest()
        request.set_accept_format('json')
        request.set_RecordId(RecordId)
        request.set_RR(RR)
        request.set_Type(Type)
        request.set_Value(Value)
        response = self.client.do_action_with_exception(request)

    def add(self,DomainName, RR, Type, Value):  
        """
        添加新的域名解析记录方法
        参数列表:
            + DomainName:待修改的主域名
            + RR:记录名即子域名
            + Type:记录类型包括CNAME、A、AAAA等
            + Value:记录值
        
        """
        from aliyunsdkalidns.request.v20150109.AddDomainRecordRequest import AddDomainRecordRequest
        request = AddDomainRecordRequest()
        request.set_accept_format('json')
        request.set_DomainName(DomainName)
        request.set_RR(RR)  # https://blog.zeruns.tech
        request.set_Type(Type)
        request.set_Value(Value)
        response = self.client.do_action_with_exception(request)
    
    def select(self,subDomian):
        """
        查询子域域名解析记录
        参数列表:
            + subDomian:子域名
        """
        request = DescribeSubDomainRecordsRequest()
        request.set_accept_format('json')
        request.set_DomainName(self.domain)
        request.set_SubDomain(subDomian + '.' + self.domain)
        response = self.client.do_action_with_exception(request)  # 获取域名解析记录列表
        domain_list = json.loads(response)  # 将返回的JSON数据转化为Python能识别的
        
        if domain_list["TotalCount"]==0:
            return None
        return domain_list["DomainRecords"]["Record"][0]
    
    def action(self,subDomian,Value,Type="A"):
        """
        执行用户关于DNS操作的的行为
        参数列表:
            + action_type操作类型,包括add,update
            + subDomian:记录名即子域名
            + Type:记录类型包括CNAME、A、AAAA等
            + Value:记录值
        """
        print("待处理的记录为  %s:%s-->Domain:%s" % (Type,Value,subDomian + '.' + self.domain))
        subDomian_record = self.select(subDomian)
        if subDomian_record:
            RecordId,RecordValue = subDomian_record["RecordId"],subDomian_record["Value"]
            if Value==RecordValue:
                print("该域名解析记录已存在,跳过")
            else:
                self.update(RecordId, subDomian, Type, Value)
                print("修改域名解析成功")
        else:
            self.add(self.domain, subDomian, Type, Value)
            print("新建域名解析成功")

def getIPv6Address():
    text = requests.get('https://v6.ident.me').text # 接口失效后需要及时进行修改
    return text


dnsHelper = AliyunDNSHelper(accessKeyId = "LTAI4GHqgJgbzPfNnkwyGqks",accessSecret = "CzXkUS8BWcjJ7qEbKrqg6qxUOdnW6M",domain = "4v7p.top")
sub_domain = "d2550-ipv6"
sub_domain_type = "AAAA"

while True:
    print("当前时间:",time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()))
    value = getIPv6Address()
    dnsHelper.action(sub_domain,value,sub_domain_type)
    time.sleep(2*3600) # 每隔2个小时执行1次

然后配合screen命令或者去掉无限循环使用定时任务进行执行即可完成ipv6的DDNS的功能

参考资料

  1. Python实现阿里云域名DDNS支持ipv4和ipv6
  2. python 3.7.3 shell_python3执行shell命令的几种方式
  3. Python 获取本机公网IPv6地址
  4. Python 日期和时间
0

评论 (0)

打卡
取消