Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
190 changes: 190 additions & 0 deletions consul/base.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
from __future__ import annotations

import abc
import base64
import collections
import enum
import json
import logging
import os
import re
import threading
import warnings

import six
Expand Down Expand Up @@ -299,6 +303,192 @@ def weights(cls, passing, warning):
return {'passing': passing, 'warning': warning}


class ConsistencyMode(enum.Enum):
"""
Most of the read query endpoints support multiple levels of consistency.
Since no policy will suit all clients' needs,
these consistency modes allow the user to have the ultimate say in how
to balance the trade-offs inherent in a distributed system.
The three read modes are:

*DEFAULT* - If not specified, the default is strongly consistent in almost all cases.
However, there is a small window in which a new leader may be elected during which
he old leader may service stale values. The trade-off is fast reads but potentially stale values.
The condition resulting in stale reads is hard to trigger, and most clients should
not need to worry about this case. Also, note that this race condition only applies to reads, not writes.

*CONSISTENT* - This mode is strongly consistent without caveats.
It requires that a leader verify with a quorum of peers that it is still leader.
This introduces an additional round-trip to all server nodes. The trade-off is increased
latency due to an extra round trip. Most clients should not use
this unless they cannot tolerate a stale read.

*STALE* - This mode allows any server to service the read regardless of whether it is the leader.
This means reads can be arbitrarily stale; however, results are generally
consistent to within 50 milliseconds of the leader. The trade-off is very fast and scalable reads
with a higher likelihood of stale values. Since this mode allows reads without a leader, a cluster
that is unavailable will still be able to respond to queries.
"""

DEFAULT = 'default'
CONSISTENT = 'consistent'
STALE = 'stale'


class ConsulCacheBase(metaclass=abc.ABCMeta):
"""
Base consul cache implements, that support blocking query.

*cache* is a dict that consist cache values by key.

*callbacks* is a list of methods, that will be invoked when cache updated

*watch_seconds* is the maximum duration for the blocking request.

*index* is the current Consul index, suitable for making subsequent
calls to wait for changes since this query was last run.
"""

def __init__(self, watch_seconds: str):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

А watch_seconds точно должен быть str? Выглядит несколько нелогичным

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Да, это консула. Он в таком виде : 10s, 10m, 10ms передается

self.cache = dict()
self.callbacks = []
self.watch_seconds = watch_seconds

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

для watch_seconds как параметра этот Cache-объекта имя бы какое-нибудь другое дать, просто щас непотяно что оно значит.
По факту же это сколько максимально по времени держать long-pooling соединение, в случае если ничего не изменится в консуле за это время?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Тут все так, это дань унификации. У нас в клиенте джавовом и конфигах он так же называется. Мне кажется лучше оставить так, чтобы разъездов меньше было. Если переименовывать, то по хорошему и в конфигах и в джавовом клиенте. Что думаешь?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

думаю, надо оставить

self.index = None
self._running = True
self._cache_thread = threading.Thread(
target=self._update_cache,
name='update_consul_cache_thread',
daemon=True)

def start(self):
self._cache_thread.start()

def stop(self):
self._running = False

def add_listener(self, callback, trigger_current=False):

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Возможно, правильнее чтобы callback не только value принимал но и key

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

А зачем, кстати, там в cache лежит dict всегда из 1-го элемента?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Прав, для текущих кешей это не нужно, но кажется, что при расширении могут быть разные варианты наполнения кеша с ключами. И да, тогда имеет смысл key тоже передавать в callback.

self.callbacks.append(callback)
log.debug(f'Registered callback: {self.callbacks}')
if trigger_current:
for key, value in self.cache.items():
callback(key, value)

@abc.abstractmethod
def _update_cache(cls):
pass


class HealthCache(ConsulCacheBase):
"""
Consul health service cache.

*service* is a service name for getting healths info.

*passing* specifies that the server should return only nodes
with all checks in the passing state. This can be used to avoid
additional filtering on the client side.
"""

def __init__(self,
health_client: Consul.Health,
watch_seconds: str,
service: str,
passing: bool):
super().__init__(watch_seconds)
self.service = service
self.health_client = health_client
self.passing = passing

def _update_cache(self):
while self._running:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Красивее было бы сделать на Event(), но так как тут всего один тред, то не принципиально. Кстати, если тред и должен будет остаться один, то можно подумать о том, чтобы просто унаследоваться от threading.Thread и переопределить run

try:
params = {
'service': self.service,
'passing': self.passing,
'index': self.index,
'wait': self.watch_seconds
}
log.debug(f'Param for health query: {params}')
self.index, values = self.health_client.service(**params)
old_cache = self.cache
self.cache = {self.service: values}
if self.callbacks and self._running:
for key, old_value in old_cache.items():
new_value = self.cache.get(key, None)
for callback in self.callbacks:
callback(key, new_value)
except ConsulException as e:
log.error(f'Some problem with update consul cache: {e}')


class KVCache(ConsulCacheBase):
"""
Consul key-value cache.

*path* is a key for getting value

*consistency_mode* sets the consistency mode to use by default for all reads
that support the consistency option. It's still possible to override
this by passing explicitly for a given request. *consistency* can be
either 'default', 'consistent' or 'stale'.

*total_timeout* is a ttl of HTTP session. Should be more than *watch_seconds*

*cache_initial_warmup_timeout* is a ttl of HTTP session for initialize cache.
May be None, will use *total_timeout* insted
"""

def __init__(self,
kv_client: Consul.KV,
watch_seconds: str,
path: str,
total_timeout: int,

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cache_initial_warmup_timeout? и наверное этот параметр необязательный. если меня общий таймаут устраивает, то че бы нет

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Имеешь ввиду еще один параметр добавить, чтобы тут его использовать при инициализации кеша ?
self.cache = {self.path: kv_client.get(key=path, total_timeout=total_timeout)[1]}

consistency_mode: ConsistencyMode,
cache_initial_warmup_timeout=None):
super().__init__(watch_seconds)
self.kv_client = kv_client
self.path = path
self.consistency_mode = consistency_mode.value
self.total_timeout = total_timeout
self.cache_initial_warmup_timeout = cache_initial_warmup_timeout
self.cache = {self.path: kv_client.get(
key=path,
total_timeout=self._get_warmup_timeout()
)[1]}

def _get_warmup_timeout(self):
if self.cache_initial_warmup_timeout:
return self.cache_initial_warmup_timeout
return self.total_timeout

def get_value(self):
return self.cache.get(self.path, None)

def _update_cache(self):
while self._running:
try:
params = {
'key': self.path,
'index': self.index,
'wait': self.watch_seconds,
'total_timeout': self.total_timeout,
'consistency': self.consistency_mode
}
log.debug(f'Param for kv query: {params}')
self.index, values = self.kv_client.get(**params)
old_cache = self.cache
self.cache = {self.path: values}
if self.callbacks and self._running:
for key, new_value in self.cache.items():
old_value = old_cache.get(key, None)
if old_value != new_value:
log.debug(f'Value was changed for key={key}. old: {old_value} new: {new_value}')
for callback in self.callbacks:
callback(key, new_value)
except ConsulException as e:
log.error(f'Some problem with update consul cache: {e}')


class HTTPClient(six.with_metaclass(abc.ABCMeta, object)):
def __init__(self, host='127.0.0.1', port=8500, scheme='http',
verify=True, cert=None, timeout=None):
Expand Down