Source code for dyn.tm.session

# -*- coding: utf-8 -*-
"""This module implements an interface to a DynECT REST Session. It provides
easy access to all other functionality within the dynect library via
methods that return various types of DynECT objects which will provide their
own respective functionality.
"""
# API Libs
from ..core import SessionEngine
from .errors import *
from ..compat import force_unicode
from ..encrypt import AESCipher


[docs]class DynectSession(SessionEngine): """Base object representing a DynectSession Session""" __metakey__ = 'bf7886ea-c61d-40df-8c7b-4241ebed0544' _valid_methods = ('DELETE', 'GET', 'POST', 'PUT') uri_root = '/REST'
[docs] def __init__(self, customer, username, password, host='api.dynect.net', port=443, ssl=True, api_version='current', auto_auth=True, key=None, history=False): """Initialize a Dynect Rest Session object and store the provided credentials :param host: DynECT API server address :param port: Port to connect to DynECT API server :param ssl: Enable SSL :param api_version: version of the api to use :param customer: DynECT customer name :param username: DynECT Customer's username :param password: User's password :param auto_auth: declare whether or not to automatically log in :param key: A valid AES-256 password encryption key to be used when encrypting your password :param history: A boolean flag determining whether or not you would like to store a record of all API calls made to review later """ super(DynectSession, self).__init__(host, port, ssl, history) self.__cipher = AESCipher(key) self.extra_headers = {'API-Version': api_version} self.customer = customer self.username = username self.password = self.__cipher.encrypt(password) self.connect() if auto_auth: self.authenticate()
def __enter__(self): """Yield this instance as a reference for use within the context block """ yield self def __exit__(self, exc_type, exc_val, exc_tb): """We don't particularly care about any exceptions that occured within the context block, so ignore them and force a log out, which handles closing the current session. """ self.log_out() def _handle_error(self, uri, method, raw_args): """Handle the processing of a connection error with the api""" # Our token is no longer valid because our session was killed self._token = None # Need to force a re-connect on next execute self._conn.close() self._conn.connect() # Need to get a new Session token self.execute('/REST/Session/', 'POST', self.__auth_data) # Then try the current call again and Specify final as true so # if we fail again we can raise the actual error return self.execute(uri, method, raw_args, final=True) def _process_response(self, response, method, final=False): """Process an API response for failure, incomplete, or success and throw any appropriate errors :param response: the JSON response from the request being processed :param method: the HTTP method :param final: boolean flag representing whether or not to continue polling """ status = response['status'] self.logger.debug(status) if status == 'success': return response elif status == 'failure': msgs = response['msgs'] if method == 'POST' and 'login' in msgs[0]['INFO']: raise DynectAuthError(response['msgs']) if method == 'POST': raise DynectCreateError(response['msgs']) elif method == 'GET': raise DynectGetError(response['msgs']) elif method == 'PUT': raise DynectUpdateError(response['msgs']) else: raise DynectDeleteError(response['msgs']) else: # Status was incomplete job_id = response['job_id'] if not final: response = self.wait_for_job_to_complete(job_id) return self._process_response(response, method, True) else: raise DynectQueryTimeout({})
[docs] def update_password(self, new_password): """Update the current users password :param new_password: The new password to use """ uri = '/Password/' api_args = {'password': new_password} self.execute(uri, 'PUT', api_args) self.password = self.__cipher.encrypt(new_password)
[docs] def user_permissions_report(self, user_name=None): """Returns information regarding the requested user's permission access :param user_name: The user whose permissions will be returned. Defaults to the current user """ api_args = dict() api_args['user_name'] = user_name or self.username uri = '/UserPermissionReport/' response = self.execute(uri, 'POST', api_args) permissions = [] for key, val in response['data'].items(): if key == 'allowed': for permission in val: permissions.append(permission['name']) return permissions
@property def permissions(self): """Permissions of the currently logged in user""" if self._permissions is None: self._permissions = self.user_permissions_report() return self._permissions @permissions.setter def permissions(self, value): pass
[docs] def authenticate(self): """Authenticate to the DynectSession service with the provided credentials """ api_args = {'customer_name': self.customer, 'user_name': self.username, 'password': self.__cipher.decrypt(self.password)} try: response = self.execute('/Session/', 'POST', api_args) except IOError: raise DynectAuthError('Unable to access the API host') if response['status'] != 'success': self.logger.error('An error was encountered authenticating to Dyn') raise DynectAuthError(response['msgs']) else: self.logger.info('DynectSession Authentication Successful')
[docs] def log_out(self): """Log the current session out from the DynECT API system""" self.execute('/Session/', 'DELETE', {}) self.close_session()
@property def __auth_data(self): """A dict of the authdata required to authenticate as this user""" return {'customer_name': self.customer, 'user_name': self.username, 'password': self.__cipher.decrypt(self.password)} def __str__(self): """str override""" header = super(DynectSession, self).__str__() return header + force_unicode(': {}, {}').format(self.customer, self.username)