Source code for dyn.tm.zones

# -*- coding: utf-8 -*-
"""This module contains all Zone related API objects."""
import os
import logging
from time import sleep
from datetime import datetime

from .utils import unix_date
from ..compat import force_unicode
from .errors import *
from .records import *
from .session import DynectSession
from .services import *

__author__ = 'jnappi'
__all__ = ['get_all_zones', 'Zone', 'SecondaryZone', 'Node']

# Lookup table from a DNS record type label to the record class used to
# instantiate it. 'UNKNOWN' is the fallback the record-listing methods use
# when the API returns a record type that has no dedicated class here.
RECS = {
    'A': ARecord,
    'AAAA': AAAARecord,
    'ALIAS': ALIASRecord,
    'CDS': CDSRecord,
    'CDNSKEY': CDNSKEYRecord,
    'CSYNC': CSYNCRecord,
    'CERT': CERTRecord,
    'CNAME': CNAMERecord,
    'DHCID': DHCIDRecord,
    'DNAME': DNAMERecord,
    'DNSKEY': DNSKEYRecord,
    'DS': DSRecord,
    'KEY': KEYRecord,
    'KX': KXRecord,
    'LOC': LOCRecord,
    'IPSECKEY': IPSECKEYRecord,
    'MX': MXRecord,
    'NAPTR': NAPTRRecord,
    'PTR': PTRRecord,
    'PX': PXRecord,
    'NSAP': NSAPRecord,
    'RP': RPRecord,
    'NS': NSRecord,
    'SOA': SOARecord,
    'SPF': SPFRecord,
    'SRV': SRVRecord,
    'TLSA': TLSARecord,
    'TXT': TXTRecord,
    'SSHFP': SSHFPRecord,
    'UNKNOWN': UNKNOWNRecord,
}


def get_all_zones():
    """Fetch every :class:`~dyn.tm.zones.Zone` visible to the current session.

    Issues a detailed ``GET /Zone/`` and wraps each returned entry in a
    :class:`~dyn.tm.zones.Zone` built with ``api=False`` so that no further
    API call is made per zone.

    :return: a *list* of :class:`~dyn.tm.zones.Zone`'s
    """
    session = DynectSession.get_session()
    reply = session.execute('/Zone/', 'GET', {'detail': 'Y'})
    return [Zone(entry['zone'], api=False, **entry)
            for entry in reply['data']]
def get_all_secondary_zones():
    """Fetch every :class:`SecondaryZone` visible to the current session.

    Issues a detailed ``GET /Secondary/``. The ``zone`` key is removed from
    each returned entry and used as the zone name; the remaining keys are
    handed to :class:`~dyn.tm.zones.SecondaryZone` as keyword arguments.

    :return: a *list* of :class:`~dyn.tm.zones.SecondaryZone`'s
    """
    session = DynectSession.get_session()
    reply = session.execute('/Secondary/', 'GET', {'detail': 'Y'})
    return [SecondaryZone(entry.pop('zone'), api=False, **entry)
            for entry in reply['data']]
def get_apex(node_name, fullDetails=False):
    """Look up the apex zone of a node available to the logged in user.

    :param node_name: name of the node whose apex zone is wanted
    :param fullDetails: when true, return the whole ``data`` payload of the
        response instead of only the zone name
    :return: if fullDetails = False: a *string* containing apex zone name.
    :return: if fullDetails = True: a *dict* containing apex zone name,
        zone_type, serial_type, and serial.
    """
    endpoint = '/Apex/{}'.format(node_name)
    reply = DynectSession.get_session().execute(endpoint, 'GET', {})
    payload = reply['data']
    return payload if fullDetails else payload['zone']
class Zone(object):
    """A class representing a DynECT Zone"""

    # API resource name for each record type accepted by
    # :meth:`get_all_records_by_type`.
    _RECORD_RESOURCES = {
        rtype: rtype + 'Record'
        for rtype in ('A', 'AAAA', 'ALIAS', 'CDS', 'CDNSKEY', 'CERT',
                      'CSYNC', 'CNAME', 'DHCID', 'DNAME', 'DNSKEY', 'DS',
                      'KEY', 'KX', 'LOC', 'IPSECKEY', 'MX', 'NAPTR', 'PTR',
                      'PX', 'NSAP', 'RP', 'NS', 'SOA', 'SPF', 'SRV', 'TLSA',
                      'TXT', 'SSHFP')
    }

    def __init__(self, name, *args, **kwargs):
        """Create a :class:`Zone` object. Note: When creating a new
        :class:`Zone` if no contact is specified the path to a local zone
        file must be passed to the ``file_name`` param.

        Three construction modes exist: passing an ``api`` keyword builds
        the object purely from the supplied keyword arguments (no API
        call); passing nothing but ``name`` fetches the existing zone; any
        other arguments create the zone.

        :param name: the name of the zone to create
        :param contact: Administrative contact for this zone
        :param ttl: TTL (in seconds) for records in the zone
        :param serial_style: The style of the zone's serial. Valid values:
            increment, epoch, day, minute
        :param file_name: The path to a valid RFC1035, BIND, or tinydns
            style Master file. Note: this file must be under 1mb in size.
        :param master_ip: The IP of the master server from which to fetch
            zone data for Transferring this :class:`Zone`. Note: This
            argument is required for performing a valid ZoneTransfer
            operation.
        :param timeout: The time, in minutes, to wait for a zone xfer to
            complete
        """
        super(Zone, self).__init__()
        self.valid_serials = ('increment', 'epoch', 'day', 'minute')
        self._name = name
        self._fqdn = self._name
        if self._fqdn and not self._fqdn.endswith('.'):
            self._fqdn += '.'
        self._contact = self._ttl = self._serial_style = self._serial = None
        self._zone = self._status = None
        self.records = {}
        self.services = {}
        self.uri = '/Zone/{}/'.format(self._name)
        if 'api' in kwargs:
            del kwargs['api']
            self._build(kwargs)
            # Prefer the zone name reported by the API, but keep the name
            # the caller passed when no 'zone' key was supplied; otherwise
            # the name (and therefore the uri) would become None.
            if self._zone is not None:
                self._name = self._zone
            self.uri = '/Zone/{}/'.format(self._name)
        elif len(args) == 0 and len(kwargs) == 0:
            self._get()
        else:
            self._post(*args, **kwargs)
        self._status = 'active'

    def _post(self, contact=None, ttl=60, serial_style='increment',
              file_name=None, master_ip=None, timeout=None):
        """Create a new :class:`Zone` object on the DynECT System

        ``file_name`` takes precedence over ``master_ip``, which takes
        precedence over a plain creation from ``contact``.

        :raises DynectInvalidArgumentError: if none of ``contact``,
            ``file_name`` and ``master_ip`` is given, or if
            ``serial_style`` is not one of ``valid_serials``
        """
        if contact is None and file_name is None and master_ip is None:
            raise DynectInvalidArgumentError('contact', None)
        if file_name is not None:
            self._post_with_file(file_name)
        elif master_ip is not None:
            self._xfer(master_ip, timeout)
        else:
            self._contact = contact
            self._ttl = ttl
            if serial_style not in self.valid_serials:
                # Same (argument name, value, valid values) form as the
                # serial_style setter uses.
                raise DynectInvalidArgumentError('serial_style',
                                                 serial_style,
                                                 self.valid_serials)
            self._serial_style = serial_style
            api_args = {'zone': self._name,
                        'rname': self._contact,
                        'ttl': self._ttl,
                        'serial_style': self._serial_style}
            response = DynectSession.get_session().execute(self.uri, 'POST',
                                                           api_args)
            self._build(response['data'])

    def _post_with_file(self, file_name):
        """Create a :class:`Zone` from a RFC1035 style Master file. A
        ZoneFile for BIND or tinydns will also be accepted

        :param file_name: The path to a valid ZoneFile
        :raises DynectInvalidArgumentError: if the file is larger than
            1048576 bytes
        """
        full_path = os.path.abspath(file_name)
        file_size = os.path.getsize(full_path)
        if file_size > 1048576:
            raise DynectInvalidArgumentError('Zone File Size', file_size,
                                             'Under 1MB')
        uri = '/ZoneFile/{}/'.format(self.name)
        # Context manager so the handle is closed even if read() raises.
        with open(full_path, 'r') as zone_file:
            content = zone_file.read()
        api_args = {'file': content}
        DynectSession.get_session().execute(uri, 'POST', api_args)
        self.__poll_for_get()

    def _xfer(self, master_ip, timeout=None):
        """Create a :class:`Zone` by ZoneTransfer by providing an optional
        master_ip argument.

        After requesting the transfer, its status is polled once a minute
        for at most ``timeout`` iterations (10 when not given) and then the
        zone is fetched.
        """
        uri = '/ZoneTransfer/{}/'.format(self.name)
        api_args = {'master_ip': master_ip}
        DynectSession.get_session().execute(uri, 'POST', api_args)
        time_out = timeout or 10
        count = 0
        while count < time_out:
            response = DynectSession.get_session().execute(uri, 'GET', {})
            if response['status'] == 'running' and response['message'] == '':
                sleep(60)
                count += 1
            else:
                break
        self._get()

    def __poll_for_get(self, n_loops=10, xfer=False, xfer_master_ip=None):
        """For use ONLY by _post_with_file and _xfer. Will wait at MOST
        ``n_loops * 2`` seconds for a successful GET API response. If no
        successful get is received no error will be raised, unless ``xfer``
        is set and the transfer reports one of the error statuses.
        """
        count = 0
        got = False
        while count < n_loops:
            try:
                self._get()
                got = True
                break
            except DynectGetError:
                sleep(2)
                count += 1
        if not got and xfer:
            uri = '/ZoneTransfer/{}/'.format(self.name)
            api_args = {}
            if xfer_master_ip is not None:
                api_args['master_ip'] = xfer_master_ip
            response = DynectSession.get_session().execute(uri, 'GET',
                                                           api_args)
            error_labels = ['running', 'waiting', 'failed', 'canceled']
            ok_labels = ['ready', 'unpublished', 'ok']
            if response['data']['status'] in error_labels:
                raise DynectCreateError(response['msgs'])
            elif response['data']['status'] in ok_labels:
                self._get()
            else:
                pass  # Should never get here

    def _get(self):
        """Get an existing :class:`Zone` object from the DynECT System"""
        api_args = {}
        response = DynectSession.get_session().execute(self.uri, 'GET',
                                                       api_args)
        self._build(response['data'])

    def _build(self, data):
        """Copy every ``key: value`` of *data* onto this object as the
        private attribute ``_key``.
        """
        for key, val in data.items():
            setattr(self, '_' + key, val)

    def _records_from_lists(self, record_lists, fqdn=None):
        """Turn a ``{label: [raw record dict, ...]}`` payload into
        ``{label: [record object, ...]}``, skipping empty lists.

        :param record_lists: the ``data`` part of an AllRecord/ANYRecord
            response
        :param fqdn: fqdn to give every record; when `None` each record
            keeps the fqdn found in its own raw dict
        """
        records = {}
        for key, record_list in record_lists.items():
            if record_list == []:
                continue
            # The record type is the part of the label before the first
            # underscore; unrecognised types use the UNKNOWN class.
            search = key.split('_')[0].upper()
            constructor = RECS.get(search, RECS['UNKNOWN'])
            list_records = []
            for record in record_list:
                del record['zone']
                own_fqdn = record.pop('fqdn')
                # Unpack rdata
                record.update(record['rdata'])
                record['create'] = False
                list_records.append(
                    constructor(self._name,
                                own_fqdn if fqdn is None else fqdn,
                                **record))
            records[key] = list_records
        return records

    def _get_services(self, resource, constructor):
        """Fetch all services of one kind for this zone.

        :param resource: API resource name, e.g. ``'Failover'``
        :param constructor: service class to instantiate per result
        :return: a list of *constructor* instances built with ``api=False``
        """
        uri = '/{}/{}/'.format(resource, self._name)
        api_args = {'detail': 'Y'}
        response = DynectSession.get_session().execute(uri, 'GET', api_args)
        found = []
        for svc in response['data']:
            del svc['zone']
            # NOTE(review): the service's own fqdn is dropped and the zone
            # fqdn is used instead -- confirm this is intended for services
            # attached to sub-nodes.
            del svc['fqdn']
            found.append(constructor(self._name, self._fqdn, api=False,
                                     **svc))
        return found

    @property
    def __root_soa(self):
        """Return the SOA record associated with this Zone"""
        return self.get_all_records_by_type('SOA')[0]

    @property
    def name(self):
        """The name of this :class:`Zone`"""
        return self._name

    @name.setter
    def name(self, value):
        # Read-only: assignments are silently ignored.
        pass

    @property
    def fqdn(self):
        """The fully qualified (dot-terminated) name of this :class:`Zone`"""
        return self._fqdn

    @fqdn.setter
    def fqdn(self, value):
        # Read-only: assignments are silently ignored.
        pass

    @property
    def contact(self):
        """The email address of the primary :class:`Contact` associated
        with this :class:`Zone`, read from the zone's SOA record
        """
        self._contact = self.__root_soa.rname
        return self._contact

    @contact.setter
    def contact(self, value):
        self.__root_soa.rname = value

    @property
    def ttl(self):
        """This :class:`Zone`'s default TTL, read from the zone's SOA
        record
        """
        self._ttl = self.__root_soa.ttl
        return self._ttl

    @ttl.setter
    def ttl(self, value):
        self.__root_soa.ttl = value

    @property
    def serial(self):
        """The current serial of this :class:`Zone` (refetched on access)"""
        self._get()
        return self._serial

    @serial.setter
    def serial(self, value):
        # Read-only: assignments are silently ignored.
        pass

    @property
    def serial_style(self):
        """The current serial style of this :class:`Zone` (refetched on
        access)
        """
        self._get()
        return self._serial_style

    @serial_style.setter
    def serial_style(self, value):
        if value not in self.valid_serials:
            raise DynectInvalidArgumentError('serial_style', value,
                                             self.valid_serials)
        self.__root_soa.serial_style = value

    @property
    def status(self):
        """Convenience property for :class:`Zones`. If a :class:`Zones` is
        frozen the status will read as `'frozen'`, if the :class:`Zones` is
        not frozen the status will read as `'active'`. Because the API does
        not return information about whether or not a :class:`Zones` is
        frozen there will be a few cases where this status will be `None`
        in order to avoid guessing what the current status actually is.
        """
        self._get()
        return self._status

    @status.setter
    def status(self, value):
        # Read-only: assignments are silently ignored.
        pass

    def freeze(self):
        """Causes the zone to become frozen. Freezing a zone prevents
        changes to the zone until it is thawed.
        """
        api_args = {'freeze': True}
        response = DynectSession.get_session().execute(self.uri, 'PUT',
                                                       api_args)
        self._build(response['data'])
        if response['status'] == 'success':
            self._status = 'frozen'

    def thaw(self):
        """Causes the zone to become thawed. Thawing a frozen zone allows
        changes to again be made to the zone.
        """
        api_args = {'thaw': True}
        response = DynectSession.get_session().execute(self.uri, 'PUT',
                                                       api_args)
        self._build(response['data'])
        if response['status'] == 'success':
            self._status = 'active'

    def publish(self, notes=None):
        """Causes all pending changes to become part of the zone. The
        serial number increments based on its serial style and the data is
        pushed out to the nameservers.

        :param notes: optional notes sent along with the publish request
        """
        api_args = {'publish': True}
        if notes:
            api_args['notes'] = notes
        response = DynectSession.get_session().execute(self.uri, 'PUT',
                                                       api_args)
        self._build(response['data'])

    def get_notes(self, offset=None, limit=None):
        """Generates a report containing the Zone Notes for this
        :class:`Zone`

        :param offset: The starting point at which to retrieve the notes
        :param limit: The maximum number of notes to be retrieved
        :return: A :class:`list` of :class:`dict` containing :class:`Zone`
            Notes
        """
        uri = '/ZoneNoteReport/'
        api_args = {'zone': self.name}
        if offset:
            api_args['offset'] = offset
        if limit:
            api_args['limit'] = limit
        response = DynectSession.get_session().execute(uri, 'POST', api_args)
        return response['data']

    def add_record(self, name=None, record_type='A', *args, **kwargs):
        """Adds an a record with the provided name and data to this
        :class:`Zone`

        :param name: The name of the node where this record will be added
        :param record_type: The type of record you would like to add.
            Valid record_type arguments are the keys of ``RECS``, e.g.
            'A', 'AAAA', 'CERT', 'CNAME', 'DHCID', 'DNAME', 'DNSKEY', 'DS',
            'KEY', 'KX', 'LOC', 'IPSECKEY', 'MX', 'NAPTR', 'PTR', 'PX',
            'NSAP', 'RP', 'NS', 'SOA', 'SPF', 'SRV', and 'TXT'.
        :param args: Non-keyword arguments to pass to the Record constructor
        :param kwargs: Keyword arguments to pass to the Record constructor
        :return: the newly constructed record, which is also stored in
            ``self.records[record_type]``
        """
        fqdn = name + '.' + self.name + '.' if name else self.name + '.'
        # noinspection PyCallingNonCallable
        rec = RECS[record_type](self.name, fqdn, *args, **kwargs)
        self.records.setdefault(record_type, []).append(rec)
        return rec

    def add_service(self, name=None, service_type=None, *args, **kwargs):
        """Add the specified service type to this zone, or to a node under
        this zone

        :param name: The name of the :class:`Node` where this service will
            be attached to or `None` to attach it to the root :class:`Node`
            of this :class:`Zone`
        :param service_type: The type of the service you would like to
            create. Valid service_type arguments are: 'ActiveFailover',
            'DDNS', 'DNSSEC', 'DSF', 'GSLB', 'RDNS', 'RTTM', 'HTTPRedirect'
        :param args: Non-keyword arguments to pass to the Record constructor
        :param kwargs: Keyword arguments to pass to the Record constructor
        :return: the newly constructed service, which is also stored in
            ``self.services[service_type]``
        """
        # Explicit relative import: the former ``import services.dsf``
        # relied on Python 2 implicit relative imports and fails on
        # Python 3.
        from .services.dsf import TrafficDirector
        constructors = {'ActiveFailover': ActiveFailover,
                        'DDNS': DynamicDNS,
                        'DNSSEC': DNSSEC,
                        'DSF': TrafficDirector,
                        'GSLB': GSLB,
                        'RDNS': ReverseDNS,
                        'RTTM': RTTM,
                        'HTTPRedirect': HTTPRedirect}
        fqdn = self.name + '.'
        if name:
            fqdn = name + '.' + fqdn
        if service_type == 'DNSSEC':
            # DNSSEC is constructed from the zone name only, without fqdn.
            # noinspection PyCallingNonCallable
            service = constructors[service_type](self.name, *args, **kwargs)
        else:
            # noinspection PyCallingNonCallable
            service = constructors[service_type](self.name, fqdn, *args,
                                                 **kwargs)
        self.services.setdefault(service_type, []).append(service)
        return service

    def get_all_nodes(self):
        """Returns a list of Node Objects for all subnodes in Zone
        (Excluding the Zone itself.)
        """
        api_args = {}
        uri = '/NodeList/{}/'.format(self._name)
        response = DynectSession.get_session().execute(uri, 'GET', api_args)
        nodes = [Node(self._name, fqdn) for fqdn in response['data']
                 if fqdn != self._name]
        return nodes

    def get_node(self, node=None):
        """Returns a :class:`Node` for the given name under this zone

        :param node: The name of the Node you wish to access, or `None` to
            get the root :class:`Node` of this :class:`Zone`
        """
        if node:
            fqdn = node + '.' + self.name + '.'
        else:
            fqdn = self.name + '.'
        return Node(self.name, fqdn)

    def get_all_records(self):
        """Retrieve a list of all record resources for the specified node
        and zone combination as well as all records from any Base_Record
        below that point on the zone hierarchy

        Note that ``self.records`` is reset to an empty dict; the fetched
        records are only returned, not stored.

        :return: A :class:`dict` mapping each response label to a list of
            the :class:`DNSRecord`'s under this :class:`Zone`
        """
        self.records = {}
        uri = '/AllRecord/{}/'.format(self._name)
        if self.fqdn is not None:
            uri += '{}/'.format(self.fqdn)
        api_args = {'detail': 'Y'}
        response = DynectSession.get_session().execute(uri, 'GET', api_args)
        return self._records_from_lists(response['data'])

    def get_all_records_by_type(self, record_type):
        """Get a list of all :class:`DNSRecord` of type ``record_type``
        which are owned by this node.

        :param record_type: The type of :class:`DNSRecord` you wish
            returned. Valid record_type arguments are: 'A', 'AAAA',
            'ALIAS', 'CDS', 'CDNSKEY', 'CERT', 'CSYNC', 'CNAME', 'DHCID',
            'DNAME', 'DNSKEY', 'DS', 'KEY', 'KX', 'LOC', 'IPSECKEY', 'MX',
            'NAPTR', 'PTR', 'PX', 'NSAP', 'RP', 'NS', 'SOA', 'SPF', 'SRV',
            'SSHFP', 'TLSA', and 'TXT'.
        :return: A :class:`List` of :class:`DNSRecord`'s
        :raises KeyError: if ``record_type`` is not a supported type
        """
        constructor = RECS[record_type]
        uri = '/{}/{}/{}/'.format(self._RECORD_RESOURCES[record_type],
                                  self._name, self.fqdn)
        api_args = {'detail': 'Y'}
        response = DynectSession.get_session().execute(uri, 'GET', api_args)
        records = []
        for record in response['data']:
            fqdn = record.pop('fqdn')
            del record['zone']
            # Unpack rdata
            record.update(record['rdata'])
            del record['rdata']
            record['create'] = False
            records.append(constructor(self._name, fqdn, **record))
        return records

    def get_any_records(self):
        """Retrieve all :class:`DNSRecord`'s associated with this
        :class:`Zone`

        :return: A :class:`dict` mapping each response label to a list of
            records, or `None` when this zone has no fqdn
        """
        if self.fqdn is None:
            return
        api_args = {'detail': 'Y'}
        uri = '/ANYRecord/{}/{}/'.format(self._name, self.fqdn)
        response = DynectSession.get_session().execute(uri, 'GET', api_args)
        return self._records_from_lists(response['data'], fqdn=self.fqdn)

    def get_all_active_failovers(self):
        """Retrieve a list of all :class:`ActiveFailover` services
        associated with this :class:`Zone`

        :return: A :class:`List` of :class:`ActiveFailover` Services
        """
        return self._get_services('Failover', ActiveFailover)

    def get_all_ddns(self):
        """Retrieve a list of all :class:`DDNS` services associated with
        this :class:`Zone`

        :return: A :class:`List` of :class:`DDNS` Services
        """
        return self._get_services('DDNS', DynamicDNS)

    def get_all_httpredirect(self):
        """Retrieve a list of all :class:`HTTPRedirect` services associated
        with this :class:`Zone`

        :return: A :class:`List` of :class:`HTTPRedirect` Services
        """
        return self._get_services('HTTPRedirect', HTTPRedirect)

    def get_all_gslb(self):
        """Retrieve a list of all :class:`GSLB` services associated with
        this :class:`Zone`

        :return: A :class:`List` of :class:`GSLB` Services
        """
        return self._get_services('GSLB', GSLB)

    def get_all_rdns(self):
        """Retrieve a list of all :class:`ReverseDNS` services associated
        with this :class:`Zone`

        :return: A :class:`List` of :class:`ReverseDNS` Services
        """
        return self._get_services('IPTrack', ReverseDNS)

    def get_all_rttm(self):
        """Retrieve a list of all :class:`RTTM` services associated with
        this :class:`Zone`

        :return: A :class:`List` of :class:`RTTM` Services
        """
        return self._get_services('RTTM', RTTM)

    def get_qps(self, start_ts, end_ts=None, breakdown=None, hosts=None,
                rrecs=None):
        """Generates a report with information about Queries Per Second
        (QPS) for this zone

        :param start_ts: datetime.datetime instance identifying point in
            time for the QPS report
        :param end_ts: datetime.datetime instance indicating the end of the
            data range for the report. Defaults to datetime.datetime.now()
        :param breakdown: By default, most data is aggregated together.
            Valid values ('hosts', 'rrecs', 'zones').
        :param hosts: List of hosts to include in the report.
        :param rrecs: List of record types to include in report.
        :return: A :class:`str` with CSV data
        """
        end_ts = end_ts or datetime.now()
        api_args = {'start_ts': unix_date(start_ts),
                    'end_ts': unix_date(end_ts),
                    'zones': [self.name]}
        if breakdown is not None:
            api_args['breakdown'] = breakdown
        if hosts is not None:
            api_args['hosts'] = hosts
        if rrecs is not None:
            api_args['rrecs'] = rrecs
        response = DynectSession.get_session().execute('/QPSReport/',
                                                       'POST', api_args)
        return response['data']

    def delete(self):
        """Delete this :class:`Zone` and perform necessary cleanups"""
        api_args = {}
        DynectSession.get_session().execute(self.uri, 'DELETE', api_args)

    def __eq__(self, other):
        """Equivalence operations for easily pulling a :class:`Zone` out of
        a list of :class:`Zone` objects. A zone equals a string holding its
        name and any other :class:`Zone` with the same name.
        """
        if isinstance(other, str):
            return other == self._name
        elif isinstance(other, Zone):
            return other.name == self._name
        return False

    def __ne__(self, other):
        """Non-Equivalence operator"""
        return not self.__eq__(other)

    def __hash__(self):
        """Hash on the zone name so hashing agrees with ``__eq__``.

        Without this, defining ``__eq__`` makes instances unhashable on
        Python 3.
        """
        return hash(self._name)

    def __str__(self):
        """str override"""
        return force_unicode('<Zone>: {}').format(self._name)
    __repr__ = __unicode__ = __str__

    def __bytes__(self):
        """bytes override"""
        # bytes(text) without an encoding raises TypeError on Python 3.
        return self.__str__().encode('utf-8')
class SecondaryZone(object):
    """A class representing DynECT Secondary zones"""

    def __init__(self, zone, *args, **kwargs):
        """Create a :class:`SecondaryZone` object

        Passing an ``api`` keyword builds the object purely from the
        supplied keyword arguments (no API call); passing nothing but
        ``zone`` fetches the existing secondary zone; any other arguments
        create it.

        :param zone: The name of this secondary zone
        :param masters: A list of IPv4 or IPv6 addresses of the master
            nameserver(s) for this zone.
        :param contact_nickname: Name of the :class:`Contact` that will
            receive notifications for this zone
        :param tsig_key_name: Name of the TSIG key that will be used to
            sign transfer requests to this zone's master
        """
        super(SecondaryZone, self).__init__()
        self._zone = self._name = zone
        self.uri = '/Secondary/{}/'.format(self._zone)
        self._masters = self._contact_nickname = self._tsig_key_name = None
        # Initialised so the ``active`` and ``serial`` properties cannot
        # raise AttributeError when a response omits these keys.
        self._active = self._serial = None
        if 'api' in kwargs:
            del kwargs['api']
            self._build(kwargs)
        elif len(args) == 0 and len(kwargs) == 0:
            self._get()
        else:
            self._post(*args, **kwargs)

    def _build(self, data):
        """Copy every ``key: value`` of *data* onto this object as the
        private attribute ``_key``.
        """
        for key, val in data.items():
            setattr(self, '_' + key, val)

    def _put(self, api_args):
        """PUT *api_args* to this zone's uri and absorb the response data"""
        response = DynectSession.get_session().execute(self.uri, 'PUT',
                                                       api_args)
        self._build(response['data'])

    def _get(self):
        """Get a :class:`SecondaryZone` object from the DynECT System"""
        api_args = {}
        response = DynectSession.get_session().execute(self.uri, 'GET',
                                                       api_args)
        self._build(response['data'])

    def _post(self, masters, contact_nickname=None, tsig_key_name=None):
        """Create a new :class:`SecondaryZone` object on the DynECT System

        ``contact_nickname`` and ``tsig_key_name`` are only sent when
        truthy.
        """
        self._masters = masters
        self._contact_nickname = contact_nickname
        self._tsig_key_name = tsig_key_name
        api_args = {'masters': self._masters}
        if contact_nickname:
            api_args['contact_nickname'] = self._contact_nickname
        if tsig_key_name:
            api_args['tsig_key_name'] = self._tsig_key_name
        response = DynectSession.get_session().execute(self.uri, 'POST',
                                                       api_args)
        self._build(response['data'])

    @property
    def zone(self):
        """The name of this :class:`SecondaryZone`"""
        return self._zone

    @zone.setter
    def zone(self, value):
        # Read-only: assignments are silently ignored.
        pass

    @property
    def masters(self):
        """A list of IPv4 or IPv6 addresses of the master nameserver(s)
        for this zone (refetched on access).
        """
        self._get()
        return self._masters

    @masters.setter
    def masters(self, value):
        self._masters = value
        self._put({'masters': self._masters})

    @property
    def contact_nickname(self):
        """Name of the :class:`Contact` that will receive notifications
        for this zone (refetched on access)
        """
        self._get()
        return self._contact_nickname

    @contact_nickname.setter
    def contact_nickname(self, value):
        self._contact_nickname = value
        self._put({'contact_nickname': self._contact_nickname})

    @property
    def tsig_key_name(self):
        """Name of the TSIG key that will be used to sign transfer
        requests to this zone's master (refetched on access)
        """
        self._get()
        return self._tsig_key_name

    @tsig_key_name.setter
    def tsig_key_name(self, value):
        self._tsig_key_name = value
        self._put({'tsig_key_name': self._tsig_key_name})

    def activate(self):
        """Activates this secondary zone"""
        self._put({'activate': True})

    def deactivate(self):
        """Deactivates this secondary zone"""
        self._put({'deactivate': True})

    def retransfer(self):
        """Retransfers this secondary zone from its original provider into
        Dyn's Managed DNS
        """
        self._put({'retransfer': True})

    def delete(self):
        """Delete this :class:`SecondaryZone`"""
        api_args = {}
        # Deletion goes through the /Zone/ resource, not /Secondary/.
        uri = '/Zone/{}/'.format(self._zone)
        DynectSession.get_session().execute(uri, 'DELETE', api_args)

    @property
    def active(self):
        """Reports the status of :class:`SecondaryZone` Y, L or N"""
        self._get()
        return self._active

    @property
    def serial(self):
        """Reports the serial of :class:`SecondaryZone`, fetched from the
        /Zone/ resource
        """
        api_args = {}
        uri = '/Zone/{}/'.format(self._zone)
        response = DynectSession.get_session().execute(uri, 'GET', api_args)
        self._build(response['data'])
        return self._serial

    def __str__(self):
        """str override"""
        return force_unicode('<SecondaryZone>: {}').format(self._zone)
    __repr__ = __unicode__ = __str__

    def __bytes__(self):
        """bytes override"""
        # bytes(text) without an encoding raises TypeError on Python 3.
        return self.__str__().encode('utf-8')
class Node(object):
    """Node object. Represents a valid fqdn node within a zone. It should
    be noted that simply creating a :class:`Node` object does not actually
    create anything on the DynECT System. The only way to actively create a
    :class:`Node` on the DynECT System is by attaching either a record or a
    service to it.
    """

    # API resource name for each record type accepted by
    # :meth:`get_all_records_by_type`. Includes CDS, CDNSKEY and CSYNC,
    # which have entries in ``RECS``.
    _RECORD_RESOURCES = {
        rtype: rtype + 'Record'
        for rtype in ('A', 'AAAA', 'ALIAS', 'CDS', 'CDNSKEY', 'CERT',
                      'CSYNC', 'CNAME', 'DHCID', 'DNAME', 'DNSKEY', 'DS',
                      'KEY', 'KX', 'LOC', 'IPSECKEY', 'MX', 'NAPTR', 'PTR',
                      'PX', 'NSAP', 'RP', 'NS', 'SOA', 'SPF', 'SRV', 'TLSA',
                      'TXT', 'SSHFP')
    }

    def __init__(self, zone, fqdn=None):
        """Create a :class:`Node` object

        :param zone: name of the zone that this Node belongs to
        :param fqdn: the fully qualified domain name of this node;
            defaults to ``zone`` followed by a dot
        """
        super(Node, self).__init__()
        self.zone = zone
        self.fqdn = fqdn or self.zone + '.'
        # NOTE(review): records and my_records deliberately share ONE dict
        # here (kept from the original) -- confirm the aliasing is wanted.
        self.records = self.my_records = {}
        self.services = []

    def _records_from_lists(self, record_lists, fqdn=None):
        """Turn a ``{label: [raw record dict, ...]}`` payload into
        ``{label: [record object, ...]}``, skipping empty lists.

        :param record_lists: the ``data`` part of an AllRecord/ANYRecord
            response
        :param fqdn: fqdn to give every record; when `None` each record
            keeps the fqdn found in its own raw dict
        """
        records = {}
        for key, record_list in record_lists.items():
            if record_list == []:
                continue
            # The record type is the part of the label before the first
            # underscore; unrecognised types use the UNKNOWN class.
            search = key.split('_')[0].upper()
            constructor = RECS.get(search, RECS['UNKNOWN'])
            list_records = []
            for record in record_list:
                del record['zone']
                own_fqdn = record.pop('fqdn')
                # Unpack rdata
                record.update(record['rdata'])
                record['create'] = False
                list_records.append(
                    constructor(self.zone,
                                own_fqdn if fqdn is None else fqdn,
                                **record))
            records[key] = list_records
        return records

    def add_record(self, record_type='A', *args, **kwargs):
        """Adds an a record with the provided data to this :class:`Node`

        :param record_type: The type of record you would like to add.
            Valid record_type arguments are the keys of ``RECS``, e.g.
            'A', 'AAAA', 'CERT', 'CNAME', 'DHCID', 'DNAME', 'DNSKEY', 'DS',
            'KEY', 'KX', 'LOC', 'IPSECKEY', 'MX', 'NAPTR', 'PTR', 'PX',
            'NSAP', 'RP', 'NS', 'SOA', 'SPF', 'SRV', and 'TXT'.
        :param args: Non-keyword arguments to pass to the Record constructor
        :param kwargs: Keyword arguments to pass to the Record constructor
        :return: the newly constructed record, which is also stored in
            ``self.records[record_type]``
        """
        # noinspection PyCallingNonCallable
        rec = RECS[record_type](self.zone, self.fqdn, *args, **kwargs)
        self.records.setdefault(record_type, []).append(rec)
        return rec

    def add_service(self, service_type=None, *args, **kwargs):
        """Add the specified service type to this :class:`Node`

        :param service_type: The type of the service you would like to
            create. Valid service_type arguments are: 'ActiveFailover',
            'DDNS', 'DNSSEC', 'DSF', 'GSLB', 'RDNS', 'RTTM', 'HTTPRedirect'
        :param args: Non-keyword arguments to pass to the Record constructor
        :param kwargs: Keyword arguments to pass to the Record constructor
        :return: the newly constructed service, which is also appended to
            ``self.services``
        """
        constructors = {'ActiveFailover': ActiveFailover,
                        'DDNS': DynamicDNS,
                        'DNSSEC': DNSSEC,
                        'DSF': TrafficDirector,
                        'GSLB': GSLB,
                        'RDNS': ReverseDNS,
                        'RTTM': RTTM,
                        'HTTPRedirect': HTTPRedirect}
        # noinspection PyCallingNonCallable
        service = constructors[service_type](self.zone, self.fqdn, *args,
                                             **kwargs)
        self.services.append(service)
        return service

    def get_all_records(self):
        """Retrieve a list of all record resources for the specified node
        and zone combination as well as all records from any Base_Record
        below that point on the zone hierarchy

        Note that ``self.records`` is reset to an empty dict; the fetched
        records are only returned, not stored.

        :return: A :class:`dict` mapping each response label to a list of
            record objects
        """
        self.records = {}
        uri = '/AllRecord/{}/'.format(self.zone)
        if self.fqdn is not None:
            uri += '{}/'.format(self.fqdn)
        api_args = {'detail': 'Y'}
        response = DynectSession.get_session().execute(uri, 'GET', api_args)
        return self._records_from_lists(response['data'])

    def get_all_records_by_type(self, record_type):
        """Get a list of all :class:`DNSRecord` of type ``record_type``
        which are owned by this node.

        :param record_type: The type of :class:`DNSRecord` you wish
            returned. Valid record_type arguments are: 'A', 'AAAA',
            'ALIAS', 'CDS', 'CDNSKEY', 'CERT', 'CSYNC', 'CNAME', 'DHCID',
            'DNAME', 'DNSKEY', 'DS', 'KEY', 'KX', 'LOC', 'IPSECKEY', 'MX',
            'NAPTR', 'PTR', 'PX', 'NSAP', 'RP', 'NS', 'SOA', 'SPF', 'SRV',
            'SSHFP', 'TLSA', and 'TXT'.
        :return: A list of :class:`DNSRecord`'s
        :raises KeyError: if ``record_type`` is not a supported type
        """
        constructor = RECS[record_type]
        uri = '/{}/{}/{}/'.format(self._RECORD_RESOURCES[record_type],
                                  self.zone, self.fqdn)
        api_args = {'detail': 'Y'}
        response = DynectSession.get_session().execute(uri, 'GET', api_args)
        records = []
        for record in response['data']:
            fqdn = record.pop('fqdn')
            del record['zone']
            # Unpack rdata
            record.update(record['rdata'])
            del record['rdata']
            record['create'] = False
            records.append(constructor(self.zone, fqdn, **record))
        return records

    def get_any_records(self):
        """Retrieve all records at this node

        :return: A :class:`dict` mapping each response label to a list of
            records, or `None` when this node has no fqdn
        """
        if self.fqdn is None:
            return
        api_args = {'detail': 'Y'}
        uri = '/ANYRecord/{}/{}/'.format(self.zone, self.fqdn)
        response = DynectSession.get_session().execute(uri, 'GET', api_args)
        return self._records_from_lists(response['data'], fqdn=self.fqdn)

    def delete(self):
        """Delete this node, any records within this node, and any nodes
        underneath this node
        """
        uri = '/Node/{}/{}'.format(self.zone, self.fqdn)
        DynectSession.get_session().execute(uri, 'DELETE', {})

    def __str__(self):
        """str override"""
        return force_unicode('<Node>: {}').format(self.fqdn)
    __repr__ = __unicode__ = __str__

    def __bytes__(self):
        """bytes override"""
        # bytes(text) without an encoding raises TypeError on Python 3.
        return self.__str__().encode('utf-8')
class TSIG(object):
    """A class representing DynECT TSIG Records"""

    def __init__(self, name, *args, **kwargs):
        """Create a :class:`TSIG` object

        With only ``name`` the existing key is fetched; with any further
        arguments a new key is created.

        :param name: The name of the TSIG key for :class:`TSIG` object
        :param algorithm: Algorithm used for :class:`TSIG` object. Valid
            options: hmac-sha1, hmac-md5, hmac-sha224,hmac-sha256,
            hmac-sha384, hmac-sha512
        :param secret: Secret key used by :class:`TSIG` object
        """
        self._name = name
        self.uri = '/TSIGKey/{}/'.format(self._name)
        self._secret = None
        self._algorithm = None
        if args or kwargs:
            self._post(*args, **kwargs)
        else:
            self._get()

    def _request(self, method, api_args):
        """Send *api_args* to this key's uri with the given HTTP *method*
        and copy each item of the response data onto ``self`` as ``_key``.
        """
        reply = DynectSession.get_session().execute(self.uri, method,
                                                    api_args)
        for field, value in reply['data'].items():
            setattr(self, '_' + field, value)

    def _get(self):
        """Get a :class:`TSIG` object from the DynECT System"""
        self._request('GET', {'name': self._name})

    def _post(self, *args, **kwargs):
        """Create a new :class:`TSIG` object on the DynECT System

        Only the ``secret`` and ``algorithm`` keyword arguments are read;
        both are required.
        """
        payload = {'name': self._name,
                   'secret': kwargs['secret'],
                   'algorithm': kwargs['algorithm']}
        self._request('POST', payload)

    @property
    def secret(self):
        """Gets Secret key of :class:`TSIG` object (refetched on access)"""
        self._get()
        return self._secret

    @secret.setter
    def secret(self, secret):
        """Sets secret key of :class:`TSIG` object

        :param secret: key
        """
        self._request('PUT', {'name': self._name, 'secret': secret})

    @property
    def algorithm(self):
        """Gets Algorithm of :class:`TSIG` object (refetched on access)"""
        self._get()
        return self._algorithm

    @algorithm.setter
    def algorithm(self, algorithm):
        """Sets Algorithm of :class:`TSIG` object.

        :param algorithm: hmac-sha1, hmac-md5, hmac-sha224, hmac-sha256,
            hmac-sha384, hmac-sha512
        """
        self._request('PUT', {'name': self._name, 'algorithm': algorithm})

    @property
    def name(self):
        """Gets name of TSIG Key in :class:`TSIG`"""
        return self._name

    def delete(self):
        """Delete this :class:`TSIG` key from the DynECT System"""
        DynectSession.get_session().execute(self.uri, 'DELETE', {})