# Source code for omgeo.services.base

import copy
from datetime import datetime
from json import loads
import logging
from traceback import format_exc
from xml.dom import minidom

import requests


# Module-level logger, named after this module.
logger = logging.getLogger(__name__)


class UpstreamResponseInfo():
    """
    Description of API call result from an upstream provider.

    For cleaning and consistency, set attributes using the given methods
    (the constructor will automatically use these methods, as well).
    """

    def __init__(self, geoservice, processed_pq, response_code=None, response_time=None,
                 success=True, errors=None):
        """
        :arg str geoservice: name of the upstream provider used (required)
        :arg PlaceQuery processed_pq: Processed PlaceQuery object (required)
        :arg response_code: HTTP response code (default None)
        :arg response_time: time in milliseconds that it takes to get a
                            response (default None)
        :arg bool success: indicates if the API call was successful. A 200 response
                           with no candidates is still considered a success.
                           (default True)
        :arg list errors: human-readable error descriptions; a new empty
                          list is created when omitted (default None)
        """
        # Create the list per instance so instances never share one.
        if errors is None:
            errors = []
        self.geoservice = geoservice
        self.processed_pq = processed_pq
        self.set_response_code(response_code)
        self.set_response_time(response_time)
        self.set_success(success)
        self.errors = errors

    def set_response_code(self, response_code):
        """
        Store the HTTP response code.

        :arg response_code: an ``int``, or ``None`` if unknown.
        :raises Exception: if ``response_code`` is neither ``None`` nor an int.
        """
        # Exact type comparison (rather than isinstance) also rejects bools.
        if response_code is not None and type(response_code) is not int:
            raise Exception('response_code must be an integer.')
        self.response_code = response_code

    def set_response_time(self, response_time):
        """
        Store the response time, rounded to the nearest integer.

        :arg response_time: non-negative ``int`` or ``float``, or ``None``
                            if unknown.
        :raises Exception: if ``response_time`` has another type or is negative.
        """
        if response_time is None:
            self.response_time = None
            return
        if type(response_time) not in (int, float):
            raise Exception('If response_time is provided, '
                            'it must be an integer or float.')
        if response_time < 0:
            raise Exception('response_time cannot be negative.')
        self.response_time = int(round(response_time))

    def set_success(self, success):
        """
        Store whether the API call succeeded.

        :arg bool success: ``True`` or ``False``.
        :raises TypeError: if ``success`` is not a bool.
        """
        if type(success) is not bool:
            # An exception instance must be raised here; raising a bare
            # string is itself an error and would hide this message.
            raise TypeError('success must be a boolean value.')
        self.success = success

    def __repr__(self):
        """Return ``<geoservice [response_code] <response_time>ms>``."""
        if self.response_code is None:
            repr_ = '%s %sms' % (self.geoservice, self.response_time)
        else:
            repr_ = '%s %s %sms' % (self.geoservice, self.response_code, self.response_time)
        return '<%s>' % repr_


class GeocodeService():
    """
    Base class for geocoding API wrappers.

    Subclasses implement :meth:`_geocode`; callers use :meth:`geocode`,
    which runs the preprocessors, the upstream call and the postprocessors.
    """

    #: API base endpoint URL to use
    _endpoint = ''

    def __init__(self, preprocessors=None, postprocessors=None,
                 settings=None):
        """
        Overwrite _preprocessors, _postprocessors, and _settings
        if they are set.

        :arg list preprocessors: objects with a ``process(place_query)``
                                 method (default None, meaning no preprocessors)
        :arg list postprocessors: objects with a ``process(candidates)``
                                  method (default None, meaning no postprocessors)
        :arg dict settings: settings copied key by key into ``_settings``
                            (default None)
        """

        #: Preprocessor classes to apply to the given PlaceQuery, usually
        #: overwritten in subclass.
        self._preprocessors = []

        #: Postprocessor classes to apply to the list of Candidates obtained,
        #: usually overwritten in subclass.
        self._postprocessors = []

        #: Settings for this geocoder, usually overwritten by subclass
        self._settings = {}

        if preprocessors is not None:
            self._preprocessors = preprocessors
        if postprocessors is not None:
            self._postprocessors = postprocessors
        if settings is not None:
            # Copy the entries so the caller's mapping is not aliased.
            for key in settings:
                self._settings[key] = settings[key]

    def _settings_checker(self, required_settings=None, accept_none=True):
        """
        Take a list of required _settings dictionary keys
        and make sure they are set. This can be added to a custom
        constructor in a subclass and tested to see if it returns ``True``.

        :arg list required_settings: A list of required keys to look for.
        :arg bool accept_none: Boolean set to True if None is an acceptable
                               setting. Set to False if None is not an
                               acceptable setting.

        :returns: * bool ``True`` if all required settings exist, OR
                  * str <key name> for the first key that is missing from
                    _settings (or is None while ``accept_none`` is False).
        """
        if required_settings is not None:
            for keyname in required_settings:
                if keyname not in self._settings:
                    return keyname
                if accept_none is False and self._settings[keyname] is None:
                    return keyname
        return True

    def _get_response(self, endpoint, query, is_post=False):
        """
        Send ``query`` to ``endpoint`` and return the ``requests`` response.

        The ``timeout`` setting (default 10 seconds) and the
        ``request_headers`` setting (default ``{}``) are applied to the
        request.

        :arg str endpoint: URL to call.
        :arg query: sent as form data for a POST, as URL parameters for a GET.
        :arg bool is_post: use POST instead of GET (default False).
        :returns: the response object; its status code is always 200.
        :raises Exception: if the request times out or the status code is
                           not 200. Any other error raised by ``requests``
                           propagates unchanged.
        """
        timeout_secs = self._settings.get('timeout', 10)
        headers = self._settings.get('request_headers', {})
        try:
            if is_post:
                response = requests.post(
                    endpoint, data=query, headers=headers, timeout=timeout_secs)
            else:
                response = requests.get(
                    endpoint, params=query, headers=headers, timeout=timeout_secs)
        except requests.exceptions.Timeout:
            raise Exception(
                'API request timed out after %s seconds.' % timeout_secs)

        if response.status_code != 200:
            raise Exception('Received status code %s from %s. Content is:\n%s'
                            % (response.status_code,
                               self.get_service_name(),
                               response.text))
        return response

    def _get_json_obj(self, endpoint, query, is_post=False):
        """
        Fetch ``endpoint`` and return the response body decoded from JSON.

        :raises Exception: if the request fails (see :meth:`_get_response`)
                           or the body is not valid JSON.
        """
        response = self._get_response(endpoint, query, is_post=is_post)
        content = response.text
        try:
            return loads(content)
        except ValueError:
            # Format both values into the single message argument.
            raise Exception('Could not decode content from %s to JSON:\n%s'
                            % (self.get_service_name(), content))

    def _get_xml_doc(self, endpoint, query, is_post=False):
        """
        Fetch ``endpoint`` and return the response body as a minidom Document.

        :raises Exception: if the request fails (see :meth:`_get_response`).
                           Errors from parsing malformed XML propagate
                           from the XML parser.
        """
        response = self._get_response(endpoint, query, is_post=is_post)
        # parseString() parses markup held in memory; parse() would treat
        # its argument as a filename. The raw bytes are passed so that the
        # parser honours the encoding declared by the document itself.
        return minidom.parseString(response.content)

    def _geocode(self, place_query):
        """
        Given a (preprocessed) PlaceQuery object,
        return a list of Candidate objects.

        Must be implemented by subclasses.
        """
        raise NotImplementedError(
            'GeocodeService subclasses must implement _geocode().')

    def geocode(self, pq):
        """
        :arg PlaceQuery pq: PlaceQuery instance
        :rtype: tuple
        :returns: post-processed list of Candidate objects and
                  an UpstreamResponseInfo object if an API call was made.

                  Examples:

                  Preprocessor throws out request::

                      ([], None)

                  Postprocessor throws out some candidates::

                      ([<Candidate obj>, <Candidate obj>, ...], <UpstreamResponseInfo obj>)

                  Postprocessor throws out all candidates::

                      ([], <UpstreamResponseInfo obj>)

                  An exception occurs while making the API call::

                      ([], <UpstreamResponseInfo obj>)

        """
        # Shallow copy, so preprocessors that rebind attributes do not
        # alter the caller's object.
        processed_pq = copy.copy(pq)
        for p in self._preprocessors:
            processed_pq = p.process(processed_pq)
            # A falsy result means the preprocessor rejected the query.
            if not processed_pq:
                return [], None
        upstream_response_info = UpstreamResponseInfo(self.get_service_name(),
                                                      processed_pq)
        start = datetime.now()
        try:
            candidates = self._geocode(processed_pq)
        except Exception:
            # Report the failure to the caller instead of raising.
            upstream_response_info.set_success(False)
            upstream_response_info.errors.append(format_exc())
            return [], upstream_response_info
        elapsed_ms = 1000 * (datetime.now() - start).total_seconds()
        # The wall clock can step backwards; clamp so that a successful
        # call is never rejected for having a negative duration.
        upstream_response_info.set_response_time(max(0.0, elapsed_ms))
        if len(candidates) > 0:
            for p in self._postprocessors:  # apply universal candidate postprocessing
                candidates = p.process(candidates)
        return candidates, upstream_response_info

    def get_service_name(self):
        """Return the name of this service: the name of its class."""
        return self.__class__.__name__