diff --git a/consul/base.py b/consul/base.py index 7159fb6..f69a4f6 100755 --- a/consul/base.py +++ b/consul/base.py @@ -1,10 +1,14 @@ +from __future__ import annotations + import abc import base64 import collections +import enum import json import logging import os import re +import threading import warnings import six @@ -299,6 +303,192 @@ def weights(cls, passing, warning): return {'passing': passing, 'warning': warning} +class ConsistencyMode(enum.Enum): + """ + Most of the read query endpoints support multiple levels of consistency. + Since no policy will suit all clients' needs, + these consistency modes allow the user to have the ultimate say in how + to balance the trade-offs inherent in a distributed system. + The three read modes are: + + *DEFAULT* - If not specified, the default is strongly consistent in almost all cases. + However, there is a small window in which a new leader may be elected during which + he old leader may service stale values. The trade-off is fast reads but potentially stale values. + The condition resulting in stale reads is hard to trigger, and most clients should + not need to worry about this case. Also, note that this race condition only applies to reads, not writes. + + *CONSISTENT* - This mode is strongly consistent without caveats. + It requires that a leader verify with a quorum of peers that it is still leader. + This introduces an additional round-trip to all server nodes. The trade-off is increased + latency due to an extra round trip. Most clients should not use + this unless they cannot tolerate a stale read. + + *STALE* - This mode allows any server to service the read regardless of whether it is the leader. + This means reads can be arbitrarily stale; however, results are generally + consistent to within 50 milliseconds of the leader. The trade-off is very fast and scalable reads + with a higher likelihood of stale values. Since this mode allows reads without a leader, a cluster + that is unavailable will still be able to respond to queries. + """ + + DEFAULT = 'default' + CONSISTENT = 'consistent' + STALE = 'stale' + + +class ConsulCacheBase(metaclass=abc.ABCMeta): + """ + Base consul cache implements, that support blocking query. + + *cache* is a dict that consist cache values by key. + + *callbacks* is a list of methods, that will be invoked when cache updated + + *watch_seconds* is the maximum duration for the blocking request. + + *index* is the current Consul index, suitable for making subsequent + calls to wait for changes since this query was last run. + """ + + def __init__(self, watch_seconds: str): + self.cache = dict() + self.callbacks = [] + self.watch_seconds = watch_seconds + self.index = None + self._running = True + self._cache_thread = threading.Thread( + target=self._update_cache, + name='update_consul_cache_thread', + daemon=True) + + def start(self): + self._cache_thread.start() + + def stop(self): + self._running = False + + def add_listener(self, callback, trigger_current=False): + self.callbacks.append(callback) + log.debug(f'Registered callback: {self.callbacks}') + if trigger_current: + for key, value in self.cache.items(): + callback(key, value) + + @abc.abstractmethod + def _update_cache(cls): + pass + + +class HealthCache(ConsulCacheBase): + """ + Consul health service cache. + + *service* is a service name for getting healths info. + + *passing* specifies that the server should return only nodes + with all checks in the passing state. This can be used to avoid + additional filtering on the client side. + """ + + def __init__(self, + health_client: Consul.Health, + watch_seconds: str, + service: str, + passing: bool): + super().__init__(watch_seconds) + self.service = service + self.health_client = health_client + self.passing = passing + + def _update_cache(self): + while self._running: + try: + params = { + 'service': self.service, + 'passing': self.passing, + 'index': self.index, + 'wait': self.watch_seconds + } + log.debug(f'Param for health query: {params}') + self.index, values = self.health_client.service(**params) + old_cache = self.cache + self.cache = {self.service: values} + if self.callbacks and self._running: + for key, old_value in old_cache.items(): + new_value = self.cache.get(key, None) + for callback in self.callbacks: + callback(key, new_value) + except ConsulException as e: + log.error(f'Some problem with update consul cache: {e}') + + +class KVCache(ConsulCacheBase): + """ + Consul key-value cache. + + *path* is a key for getting value + + *consistency_mode* sets the consistency mode to use by default for all reads + that support the consistency option. It's still possible to override + this by passing explicitly for a given request. *consistency* can be + either 'default', 'consistent' or 'stale'. + + *total_timeout* is a ttl of HTTP session. Should be more than *watch_seconds* + + *cache_initial_warmup_timeout* is a ttl of HTTP session for initialize cache. + May be None, will use *total_timeout* insted + """ + + def __init__(self, + kv_client: Consul.KV, + watch_seconds: str, + path: str, + total_timeout: int, + consistency_mode: ConsistencyMode, + cache_initial_warmup_timeout=None): + super().__init__(watch_seconds) + self.kv_client = kv_client + self.path = path + self.consistency_mode = consistency_mode.value + self.total_timeout = total_timeout + self.cache_initial_warmup_timeout = cache_initial_warmup_timeout + self.cache = {self.path: kv_client.get( + key=path, + total_timeout=self._get_warmup_timeout() + )[1]} + + def _get_warmup_timeout(self): + if self.cache_initial_warmup_timeout: + return self.cache_initial_warmup_timeout + return self.total_timeout + + def get_value(self): + return self.cache.get(self.path, None) + + def _update_cache(self): + while self._running: + try: + params = { + 'key': self.path, + 'index': self.index, + 'wait': self.watch_seconds, + 'total_timeout': self.total_timeout, + 'consistency': self.consistency_mode + } + log.debug(f'Param for kv query: {params}') + self.index, values = self.kv_client.get(**params) + old_cache = self.cache + self.cache = {self.path: values} + if self.callbacks and self._running: + for key, new_value in self.cache.items(): + old_value = old_cache.get(key, None) + if old_value != new_value: + log.debug(f'Value was changed for key={key}. old: {old_value} new: {new_value}') + for callback in self.callbacks: + callback(key, new_value) + except ConsulException as e: + log.error(f'Some problem with update consul cache: {e}') + + class HTTPClient(six.with_metaclass(abc.ABCMeta, object)): def __init__(self, host='127.0.0.1', port=8500, scheme='http', verify=True, cert=None, timeout=None):