From 7e383498f395b4863037924bded8f5a7490dfa76 Mon Sep 17 00:00:00 2001 From: Jeffrey Keene Date: Wed, 13 Oct 2021 16:01:54 -0600 Subject: [PATCH 1/8] jkeene.aws_msk: added aws msk iam to connection and clients of the connection. --- aws_example.py | 102 +++++++++++++++++++++++ example.py | 14 ++-- kafka/admin/client.py | 6 ++ kafka/aws_utils.py | 39 +++++++++ kafka/client_async.py | 8 +- kafka/conn.py | 174 +++++++++++++++++++++++++++++++++++++++- kafka/consumer/group.py | 6 ++ kafka/producer/kafka.py | 6 ++ setup.py | 1 + 9 files changed, 346 insertions(+), 10 deletions(-) create mode 100755 aws_example.py create mode 100644 kafka/aws_utils.py diff --git a/aws_example.py b/aws_example.py new file mode 100755 index 000000000..4e4c233e3 --- /dev/null +++ b/aws_example.py @@ -0,0 +1,102 @@ +#!/usr/bin/env python +import threading, time + +from kafka import KafkaAdminClient, KafkaConsumer, KafkaProducer +from kafka.admin import NewTopic +import sys +from os import environ + +BOOTSTRAP_SERVERS = environ.get("BOOTSTRAP_SERVER") +AWS_ACCESS_KEY_ID = environ.get("AWS_ACCESS_KEY_ID") +AWS_SECRET_ACCESS_KEY = environ.get("AWS_SECRET_ACCESS_KEY") +AWS_REGION = environ.get("AWS_REGION") +TOPIC_NAME = 'data-team-dev' + + +class Producer(threading.Thread): + def __init__(self): + threading.Thread.__init__(self) + self.stop_event = threading.Event() + + def stop(self): + self.stop_event.set() + + def run(self): + producer = KafkaProducer(bootstrap_servers=BOOTSTRAP_SERVERS, + sasl_aws_msk_iam_access_key_id=AWS_ACCESS_KEY_ID, + sasl_aws_msk_iam_secret_access_key=AWS_SECRET_ACCESS_KEY, + sasl_aws_msk_region=AWS_REGION, + ) + + while not self.stop_event.is_set(): + producer.send(TOPIC_NAME, b"test") + producer.send(TOPIC_NAME, b"\xc2Hola, mundo!") + time.sleep(1) + + producer.close() + + +class Consumer(threading.Thread): + def __init__(self): + threading.Thread.__init__(self) + self.stop_event = threading.Event() + + def stop(self): + self.stop_event.set() + + def run(self): + consumer = KafkaConsumer(bootstrap_servers=BOOTSTRAP_SERVERS, + auto_offset_reset='earliest', + consumer_timeout_ms=1000, + sasl_aws_msk_iam_access_key_id=AWS_ACCESS_KEY_ID, + sasl_aws_msk_iam_secret_access_key=AWS_SECRET_ACCESS_KEY, + sasl_aws_msk_region=AWS_REGION, + ) + consumer.subscribe([TOPIC_NAME]) + + while not self.stop_event.is_set(): + for message in consumer: + print(f"consumer: {message}") + if self.stop_event.is_set(): + break + + consumer.close() + + +def main(): + # Create 'TOPIC_NAME' topic + try: + admin = KafkaAdminClient(bootstrap_servers=BOOTSTRAP_SERVERS, + sasl_aws_msk_iam_access_key_id=AWS_ACCESS_KEY_ID, + sasl_aws_msk_iam_secret_access_key=AWS_SECRET_ACCESS_KEY, + sasl_aws_msk_region=AWS_REGION, + ) + + topic = NewTopic(name=TOPIC_NAME, + num_partitions=1, + replication_factor=1) + admin.create_topics([topic]) + except Exception as e: + print(str(e), file=sys.stderr) + + tasks = [ + Producer(), + Consumer() + ] + + # Start threads of a publisher/producer and a subscriber/consumer to 'my-topic' Kafka topic + for t in tasks: + t.start() + + time.sleep(10) + + # Stop threads + for task in tasks: + task.stop() + + for task in tasks: + task.join() + + +if __name__ == "__main__": + main() diff --git a/example.py b/example.py index 9907450f6..54ebae584 100755 --- a/example.py +++ b/example.py @@ -3,7 +3,9 @@ from kafka import KafkaAdminClient, KafkaConsumer, KafkaProducer from kafka.admin import NewTopic +import sys +BOOTSTRAP_SERVERS = 'localhost:9092' class Producer(threading.Thread): def __init__(self): @@ -14,7 +16,7 @@ def stop(self): self.stop_event.set() def run(self): - producer = KafkaProducer(bootstrap_servers='localhost:9092') + producer = KafkaProducer(bootstrap_servers=BOOTSTRAP_SERVERS) while not self.stop_event.is_set(): producer.send('my-topic', b"test") @@ -33,14 +35,14 @@ def stop(self): self.stop_event.set() def run(self): - consumer = KafkaConsumer(bootstrap_servers='localhost:9092', + consumer = KafkaConsumer(bootstrap_servers=BOOTSTRAP_SERVERS, auto_offset_reset='earliest', consumer_timeout_ms=1000) consumer.subscribe(['my-topic']) while not self.stop_event.is_set(): for message in consumer: - print(message) + print(f"consumer: {message}") if self.stop_event.is_set(): break @@ -50,14 +52,14 @@ def run(self): def main(): # Create 'my-topic' Kafka topic try: - admin = KafkaAdminClient(bootstrap_servers='localhost:9092') + admin = KafkaAdminClient(bootstrap_servers=BOOTSTRAP_SERVERS) topic = NewTopic(name='my-topic', num_partitions=1, replication_factor=1) admin.create_topics([topic]) - except Exception: - pass + except Exception as e: + print(str(e), file=sys.stderr) tasks = [ Producer(), diff --git a/kafka/admin/client.py b/kafka/admin/client.py index fd4d66110..d4e3025a0 100644 --- a/kafka/admin/client.py +++ b/kafka/admin/client.py @@ -146,6 +146,9 @@ class KafkaAdminClient(object): sasl mechanism handshake. Default: one of bootstrap servers sasl_oauth_token_provider (AbstractTokenProvider): OAuthBearer token provider instance. (See kafka.oauth.abstract). Default: None + sasl_aws_msk_iam_access_key_id (str): aws access key id for msk_iam auth. Default: None + sasl_aws_msk_iam_secret_access_key (str): aws secret access key for msk_iam auth. Default: None + sasl_aws_msk_region (str): aws region for msk_iam auth. Default: None kafka_client (callable): Custom class / callable for creating KafkaClient instances """ @@ -182,6 +185,9 @@ class KafkaAdminClient(object): 'sasl_kerberos_service_name': 'kafka', 'sasl_kerberos_domain_name': None, 'sasl_oauth_token_provider': None, + 'sasl_aws_msk_iam_access_key_id': None, + 'sasl_aws_msk_iam_secret_access_key': None, + 'sasl_aws_msk_region': None, # metrics configs 'metric_reporters': [], diff --git a/kafka/aws_utils.py b/kafka/aws_utils.py new file mode 100644 index 000000000..0c144297c --- /dev/null +++ b/kafka/aws_utils.py @@ -0,0 +1,39 @@ +from hashlib import sha256 +from string import digits, ascii_letters + +ALPHA_NUMERIC_AND_SOME_MISC = "".join(['_', '-', '~', '.']) + digits + ascii_letters + + +def is_alpha_numeric_or_some_misc(arg: str) -> bool: + return arg in ALPHA_NUMERIC_AND_SOME_MISC + + +def bin_to_hex(s: str) -> str: + as_bytes = s.encode() + return as_bytes.hex().upper() + + +def aws_uri_encode(arg: str, encode_slash: bool = True) -> str: + result = '' + chars = arg + + for char in chars: + is_alpha_numeric = is_alpha_numeric_or_some_misc(arg) + is_slash = char == '/' + if is_alpha_numeric: + result += char + elif is_slash: + if encode_slash: + result += '%2F' + else: + result += char + else: + result += '%' + bin_to_hex(char) + + return result + + +def get_digester(): + return sha256() + +Ø diff --git a/kafka/client_async.py b/kafka/client_async.py index 58f22d4ec..436243ba9 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -154,6 +154,9 @@ class KafkaClient(object): sasl mechanism handshake. Default: one of bootstrap servers sasl_oauth_token_provider (AbstractTokenProvider): OAuthBearer token provider instance. (See kafka.oauth.abstract). Default: None + sasl_aws_msk_iam_access_key_id (str): aws access key id for msk_iam auth. Default: None + sasl_aws_msk_iam_secret_access_key (str): aws secret access key for msk_iam auth. Default: None + sasl_aws_msk_region (str): aws region for msk_iam auth. Default: None """ DEFAULT_CONFIG = { @@ -192,7 +195,10 @@ class KafkaClient(object): 'sasl_plain_password': None, 'sasl_kerberos_service_name': 'kafka', 'sasl_kerberos_domain_name': None, - 'sasl_oauth_token_provider': None + 'sasl_oauth_token_provider': None, + 'sasl_aws_msk_iam_access_key_id': None, + 'sasl_aws_msk_iam_secret_access_key': None, + 'sasl_aws_msk_region': None, } def __init__(self, **configs): diff --git a/kafka/conn.py b/kafka/conn.py index cac354875..d8bdf21cb 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -1,11 +1,13 @@ from __future__ import absolute_import, division import copy +import datetime import errno import io import logging +import typing from random import shuffle, uniform - +from .aws_utils import aws_uri_encode, get_digester # selectors in stdlib as of py3.4 try: import selectors # pylint: disable=import-error @@ -17,6 +19,7 @@ import struct import threading import time +import json from kafka.vendor import six @@ -34,6 +37,7 @@ from kafka.protocol.types import Int32, Int8 from kafka.scram import ScramClient from kafka.version import __version__ +import hmac if six.PY2: @@ -191,6 +195,9 @@ class BrokerConnection(object): sasl mechanism handshake. Default: one of bootstrap servers sasl_oauth_token_provider (AbstractTokenProvider): OAuthBearer token provider instance. (See kafka.oauth.abstract). Default: None + sasl_aws_msk_iam_access_key_id (str): aws access key id for msk_iam auth. Default: None + sasl_aws_msk_iam_secret_access_key (str): aws secret access key for msk_iam auth. Default: None + sasl_aws_msk_region (str): aws region for msk_iam auth. Default: None """ DEFAULT_CONFIG = { @@ -224,10 +231,13 @@ class BrokerConnection(object): 'sasl_plain_password': None, 'sasl_kerberos_service_name': 'kafka', 'sasl_kerberos_domain_name': None, - 'sasl_oauth_token_provider': None + 'sasl_oauth_token_provider': None, + 'sasl_aws_msk_iam_access_key_id': None, + 'sasl_aws_msk_iam_secret_access_key': None, + 'sasl_aws_msk_region': None, } SECURITY_PROTOCOLS = ('PLAINTEXT', 'SSL', 'SASL_PLAINTEXT', 'SASL_SSL') - SASL_MECHANISMS = ('PLAIN', 'GSSAPI', 'OAUTHBEARER', "SCRAM-SHA-256", "SCRAM-SHA-512") + SASL_MECHANISMS = ('PLAIN', 'GSSAPI', 'OAUTHBEARER', "SCRAM-SHA-256", "SCRAM-SHA-512", "AWSMSKIAM") def __init__(self, host, port, afi, **configs): self.host = host @@ -276,6 +286,10 @@ def __init__(self, host, port, afi, **configs): token_provider = self.config['sasl_oauth_token_provider'] assert token_provider is not None, 'sasl_oauth_token_provider required for OAUTHBEARER sasl' assert callable(getattr(token_provider, "token", None)), 'sasl_oauth_token_provider must implement method #token()' + if self.config["sasl_mechanism"] == "AWSMSKIAM": + assert self._amz_access_key_id, "sasl_aws_msk_iam_access_key_id must be provided" + assert self._amz_secret_access_key, "sasl_aws_msk_iam_secret_access_key must be provided" + assert self._amz_region, "sasl_aws_msk_region must be provided" # This is not a general lock / this class is not generally thread-safe yet # However, to avoid pushing responsibility for maintaining # per-connection locks to the upstream client, we will use this lock to @@ -561,6 +575,8 @@ def _handle_sasl_handshake_response(self, future, response): return self._try_authenticate_oauth(future) elif self.config['sasl_mechanism'].startswith("SCRAM-SHA-"): return self._try_authenticate_scram(future) + elif self.config['sasl_mechanism'] == 'AWSMSKIAM': + return self._try_authenticate_awsmskiam(future) else: return future.failure( Errors.UnsupportedSaslMechanismError( @@ -823,6 +839,158 @@ def _build_oauth_client_request(self): token_provider = self.config['sasl_oauth_token_provider'] return "n,,\x01auth=Bearer {}{}\x01\x01".format(token_provider.token(), self._token_extensions()) + @property + def _amz_access_key_id(self) -> typing.Optional[str]: + try: + access_key_id = self.config['sasl_aws_msk_iam_access_key_id'] + return access_key_id + except KeyError: + return None + + @property + def _amz_secret_access_key(self) -> typing.Optional[str]: + try: + secret_key_id = self.config['sasl_aws_msk_iam_secret_access_key'] + return secret_key_id + except KeyError: + return None + + @property + def _amz_region(self) -> typing.Optional[str]: + try: + aws_region = self.config['sasl_aws_msk_region'] + return aws_region + except KeyError: + return None + + @property + def hostname(self) -> str: + return self.host.split(":")[0] + + def _pull_mskiam_server_response(self) -> bytes: + to_read = 1 + message_start_token = b'{' + message_end_token = b'}' + token_stack = [] + buffer = io.BytesIO() + fragment = self._recv_bytes_blocking(to_read) + good_response = fragment == message_start_token + if not good_response: + #Connection errors, including a timeout, will not be caught by this method. + raise ValueError(f"AWS-MSK-IAM: server responded with empty message or an invalid message. NOT AUTHED: {fragment}") + + buffer.write(fragment) + token_stack.push(fragment) + while token_stack: + fragment = self._recv_bytes_blocking(to_read) + buffer.write(fragment) + if fragment == message_start_token: + token_stack.push(fragment) + elif fragment == message_end_token: + token_stack.pop() + return buffer.getvalue() + + def _try_authenticate_awsmskiam(self, future): + message = self._authentication_payload(host=self.hostname).encode() + with self._lock: + if not self._can_send_recv(): + err = Errors.NodeNotReadyError(str(self)) + close = False + else: + try: + self._send_bytes_blocking(message) + response = self._pull_mskiam_server_response().decode() + except (ConnectionError, TimeoutError, ValueError) as e: + log.exception("%s: Error receiving reply from server while trying awsmskiam auth", self) + err = Errors.KafkaConnectionError("%s: %s" % (self, e)) + close = True + if err is not None: + if close: + self.close(error=err) + return future.failure(err) + + log.info(f"AWS-MSK-IAM authentication {response=}") + return future.success(True) + + def _authentication_payload(self, host: str) -> str: + now = datetime.datetime.now() + payload = { + 'version': '2020_10_22', + 'host': host, + 'user-agent': 'kafka-python', + 'action': 'kafka-cluster:Connect', + 'x-amz-algorithm': 'AWS4-HMAC-SHA256', + 'x-amz-credential': self._get_amz_credential(request_time=now), + 'x-amz-date': now.strftime("%Y%m%dT%H%M%SZ"), + 'x-amz-signedheaders': 'host', + 'x-amz-expires': '900', + 'x-amz-signature': self._get_amz_signature(current_time=now, host=host) + } + #TODO: This json structure can handle session_tokens. Add logic to handle it if detected + return json.dumps(payload) + + def _get_canonical_request(self, request_time: datetime.datetime, host: str) -> str: + return "\n".join([ + "GET", + "", + self._get_cannonical_query_string(current_time=request_time), + self._get_amz_canonical_headers(host=host), + self._get_amz_signed_headers(), + self._get_amz_hashed_payload(), + ]) + + @staticmethod + def _get_amz_date_str(arg: datetime.datetime) -> str: + date_str = arg.strftime('%Y%m%d') + return date_str + + def _get_amz_credential(self, request_time: datetime.datetime) -> str: + date_str = self._get_amz_date_str(request_time) + access_key_id = self._amz_region + aws_region = self.config['sasl_aws_msk_region'] + return f'{access_key_id}/{date_str}/{aws_region}/kafka-cluster/aws4_request' + + @staticmethod + def _get_amz_canonical_headers(host: str) -> str: + return f"host:{host}\n" + + @staticmethod + def _get_amz_signed_headers() -> str: + return "host" + + def _get_cannonical_query_string(self, current_time: datetime.datetime) -> str: + query = aws_uri_encode("Action") + "=" + aws_uri_encode("kafka-cluster:Connect") + "&" + \ + aws_uri_encode("X-Amz-Algorithm") + "=" + aws_uri_encode("AWS4-HMAC-SHA256") + "&" + \ + aws_uri_encode("X-Amz-Credential") + "=" + aws_uri_encode(self._get_amz_credential(current_time)) + "&" + \ + aws_uri_encode("X-Amz-Date") + "=" + aws_uri_encode(current_time.strftime("%Y%m%dT%H%M%SZ")) + "&" + \ + aws_uri_encode("X-Amz-Expires") + "=" + aws_uri_encode("900") + "&" + \ + aws_uri_encode("X-Amz-SignedHeaders") + "=" + aws_uri_encode("host") + return query + + @staticmethod + def _get_amz_hashed_payload() -> str: + digest = get_digester() + digest.update("".encode()) + return digest.hexdigest().lower() + + def _get_amz_string_to_sign(self, current_time: datetime.datetime, host: str): + scope = f'self.config["sasl_aws_msk_region"]/kafka-cluster' + canonical_request = self._get_canonical_request(request_time=current_time, host=host) + digester = get_digester() + digester.update(canonical_request.encode()) + hexxed = digester.hexdigest().lower() + return f"AWS4-HMAC-SHA256\n{self._get_amz_date_str(current_time)}\n{scope}\n{hexxed}" + + def _get_amz_signature(self, current_time: datetime.datetime, host: str) -> str: + hmac_digest = "SHA256" + date_key = hmac.digest(key=f"AWS4{self._amz_secret_access_key}".encode(), msg=self._get_amz_date_str(current_time).encode(), + digest=hmac_digest) + date_region_key = hmac.digest(key=date_key, msg=self._amz_region.encode(), digest=hmac_digest) + date_region_service_key = hmac.digest(key=date_region_key, msg='kafka-cluster'.encode(), digest=hmac_digest) + signing_key = hmac.digest(key=date_region_service_key, msg='aws4_request'.encode(), digest=hmac_digest) + signature = hmac.digest(key=signing_key, msg=self._get_amz_string_to_sign(current_time=current_time, host=host).encode(), digest=hmac_digest) + return signature.hex().lower() + def _token_extensions(self): """ Return a string representation of the OPTIONAL key-value pairs that can be sent with an OAUTHBEARER diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py index a1d1dfa37..6a271ad6a 100644 --- a/kafka/consumer/group.py +++ b/kafka/consumer/group.py @@ -244,6 +244,9 @@ class KafkaConsumer(six.Iterator): sasl mechanism handshake. Default: one of bootstrap servers sasl_oauth_token_provider (AbstractTokenProvider): OAuthBearer token provider instance. (See kafka.oauth.abstract). Default: None + sasl_aws_msk_iam_access_key_id (str): aws access key id for msk_iam auth. Default: None + sasl_aws_msk_iam_secret_access_key (str): aws secret access key for msk_iam auth. Default: None + sasl_aws_msk_region (str): aws region for msk_iam auth. Default: None kafka_client (callable): Custom class / callable for creating KafkaClient instances Note: @@ -306,6 +309,9 @@ class KafkaConsumer(six.Iterator): 'sasl_kerberos_service_name': 'kafka', 'sasl_kerberos_domain_name': None, 'sasl_oauth_token_provider': None, + 'sasl_aws_msk_iam_access_key_id': None, + 'sasl_aws_msk_iam_secret_access_key': None, + 'sasl_aws_msk_region': None, 'legacy_iterator': False, # enable to revert to < 1.4.7 iterator 'kafka_client': KafkaClient, } diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py index dd1cc508c..4363f2247 100644 --- a/kafka/producer/kafka.py +++ b/kafka/producer/kafka.py @@ -280,6 +280,9 @@ class KafkaProducer(object): sasl mechanism handshake. Default: one of bootstrap servers sasl_oauth_token_provider (AbstractTokenProvider): OAuthBearer token provider instance. (See kafka.oauth.abstract). Default: None + sasl_aws_msk_iam_access_key_id (str): aws access key id for msk_iam auth. Default: None + sasl_aws_msk_iam_secret_access_key (str): aws secret access key for msk_iam auth. Default: None + sasl_aws_msk_region (str): aws region for msk_iam auth. Default: None kafka_client (callable): Custom class / callable for creating KafkaClient instances Note: @@ -334,6 +337,9 @@ class KafkaProducer(object): 'sasl_kerberos_service_name': 'kafka', 'sasl_kerberos_domain_name': None, 'sasl_oauth_token_provider': None, + 'sasl_aws_msk_iam_access_key_id': None, + 'sasl_aws_msk_iam_secret_access_key': None, + 'sasl_aws_msk_region': None, 'kafka_client': KafkaClient, } diff --git a/setup.py b/setup.py index fe8a594f3..39a384e9f 100644 --- a/setup.py +++ b/setup.py @@ -41,6 +41,7 @@ def run(cls): "lz4": ["lz4"], "snappy": ["python-snappy"], "zstd": ["python-zstandard"], + "pytest": "pytest", }, cmdclass={"test": Tox}, packages=find_packages(exclude=['test']), From 9c4fef9cb571152a8bfcb31ab50a9b3dde1b9a96 Mon Sep 17 00:00:00 2001 From: Jeffrey Keene Date: Wed, 13 Oct 2021 16:02:58 -0600 Subject: [PATCH 2/8] jkeene.aws_msk: removed a wayward symbol. --- kafka/aws_utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kafka/aws_utils.py b/kafka/aws_utils.py index 0c144297c..6655a9175 100644 --- a/kafka/aws_utils.py +++ b/kafka/aws_utils.py @@ -36,4 +36,4 @@ def aws_uri_encode(arg: str, encode_slash: bool = True) -> str: def get_digester(): return sha256() -Ø + From 527749887ce69216098f31f0b17e4f6a6158e5fb Mon Sep 17 00:00:00 2001 From: Jeffrey Keene Date: Fri, 15 Oct 2021 11:46:41 -0600 Subject: [PATCH 3/8] fixing env variable in aws_example.py --- aws_example.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/aws_example.py b/aws_example.py index 4e4c233e3..4f479a970 100755 --- a/aws_example.py +++ b/aws_example.py @@ -6,7 +6,7 @@ import sys from os import environ -BOOTSTRAP_SERVERS = environ.get("BOOTSTRAP_SERVER") +BOOTSTRAP_SERVERS = environ.get("BOOTSTRAP_SERVERS") AWS_ACCESS_KEY_ID = environ.get("AWS_ACCESS_KEY_ID") AWS_SECRET_ACCESS_KEY = environ.get("AWS_SECRET_ACCESS_KEY") AWS_REGION = environ.get("AWS_REGION") From f88b78b2e893fb17196f75b33441c80d777ba708 Mon Sep 17 00:00:00 2001 From: Jeffrey Keene Date: Fri, 15 Oct 2021 11:57:28 -0600 Subject: [PATCH 4/8] adding info level logging --- kafka/conn.py | 1 + 1 file changed, 1 insertion(+) diff --git a/kafka/conn.py b/kafka/conn.py index d8bdf21cb..5f3059880 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -46,6 +46,7 @@ BlockingIOError = Exception log = logging.getLogger(__name__) +log.setLevel(logging.INFO) DEFAULT_KAFKA_PORT = 9092 From a24dff9af0a4b584744d21e0b58e8d300f9d3100 Mon Sep 17 00:00:00 2001 From: Jeffrey Keene Date: Fri, 15 Oct 2021 13:31:50 -0600 Subject: [PATCH 5/8] adding ssl --- aws_example.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/aws_example.py b/aws_example.py index 4f479a970..dd8f9a5ec 100755 --- a/aws_example.py +++ b/aws_example.py @@ -26,6 +26,7 @@ def run(self): sasl_aws_msk_iam_access_key_id=AWS_ACCESS_KEY_ID, sasl_aws_msk_iam_secret_access_key=AWS_SECRET_ACCESS_KEY, sasl_aws_msk_region=AWS_REGION, + security_protocol="SSL" ) while not self.stop_event.is_set(): @@ -51,6 +52,7 @@ def run(self): sasl_aws_msk_iam_access_key_id=AWS_ACCESS_KEY_ID, sasl_aws_msk_iam_secret_access_key=AWS_SECRET_ACCESS_KEY, sasl_aws_msk_region=AWS_REGION, + security_protocol="SSL" ) consumer.subscribe([TOPIC_NAME]) @@ -70,6 +72,7 @@ def main(): sasl_aws_msk_iam_access_key_id=AWS_ACCESS_KEY_ID, sasl_aws_msk_iam_secret_access_key=AWS_SECRET_ACCESS_KEY, sasl_aws_msk_region=AWS_REGION, + security_protocol="SSL" ) topic = NewTopic(name=TOPIC_NAME, From 1726fdd673c78ec8eefdca36e40ceec97bb6b4f1 Mon Sep 17 00:00:00 2001 From: Jeffrey Keene Date: Fri, 15 Oct 2021 13:36:08 -0600 Subject: [PATCH 6/8] adding print statement for debugging. --- kafka/conn.py | 1 + 1 file changed, 1 insertion(+) diff --git a/kafka/conn.py b/kafka/conn.py index 5f3059880..1746b173e 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -892,6 +892,7 @@ def _pull_mskiam_server_response(self) -> bytes: return buffer.getvalue() def _try_authenticate_awsmskiam(self, future): + print("_try_authenticate_awsmskiam") message = self._authentication_payload(host=self.hostname).encode() with self._lock: if not self._can_send_recv(): From 22f82420aea13f7edf7f811d366514658b1f687e Mon Sep 17 00:00:00 2001 From: Andrew Vaccaro Date: Tue, 19 Oct 2021 22:38:51 -0400 Subject: [PATCH 7/8] use new env vars --- aws_example.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/aws_example.py b/aws_example.py index dd8f9a5ec..b9d1787aa 100755 --- a/aws_example.py +++ b/aws_example.py @@ -6,10 +6,10 @@ import sys from os import environ -BOOTSTRAP_SERVERS = environ.get("BOOTSTRAP_SERVERS") -AWS_ACCESS_KEY_ID = environ.get("AWS_ACCESS_KEY_ID") -AWS_SECRET_ACCESS_KEY = environ.get("AWS_SECRET_ACCESS_KEY") -AWS_REGION = environ.get("AWS_REGION") +BOOTSTRAP_SERVERS = environ.get("KAFKA_BROKERS") +AWS_ACCESS_KEY_ID = environ.get("KAFKA_AWS_ACCESS_ID") +AWS_SECRET_ACCESS_KEY = environ.get("KAFKA_AWS_SECRET_ID") +AWS_REGION = environ.get("KAFKA_AWS_REGION") TOPIC_NAME = 'data-team-dev' From 2b409f35c359b67d4b3529428441c714cc6ef229 Mon Sep 17 00:00:00 2001 From: Jeffrey Keene Date: Wed, 20 Oct 2021 14:01:17 -0600 Subject: [PATCH 8/8] changing env variable names. --- aws_example.py | 28 ++++++++++++++++++---------- kafka/conn.py | 2 +- 2 files changed, 19 insertions(+), 11 deletions(-) diff --git a/aws_example.py b/aws_example.py index dd8f9a5ec..dbcf1244a 100755 --- a/aws_example.py +++ b/aws_example.py @@ -6,12 +6,16 @@ import sys from os import environ -BOOTSTRAP_SERVERS = environ.get("BOOTSTRAP_SERVERS") -AWS_ACCESS_KEY_ID = environ.get("AWS_ACCESS_KEY_ID") -AWS_SECRET_ACCESS_KEY = environ.get("AWS_SECRET_ACCESS_KEY") -AWS_REGION = environ.get("AWS_REGION") -TOPIC_NAME = 'data-team-dev' +BOOTSTRAP_SERVERS = environ.get("KAFKA_BROKERS").split(',') +AWS_ACCESS_KEY_ID = environ.get("KAFKA_AWS_ACCESS_KEY_ID") +AWS_SECRET_ACCESS_KEY = environ.get("KAFKA_AWS_SECRET_ACCESS_KEY") +AWS_REGION = environ.get("KAFKA_AWS_REGION") +TOPIC_NAME = 'data.sandbox' +GROUP_NAME = 'data.sandbox' +SASL_MECHANISM = 'AWSMSKIAM' +SASL_PROTOCOL = 'SSL' +SASL_PROTOCOL = 'SASL_SSL' class Producer(threading.Thread): def __init__(self): @@ -26,7 +30,8 @@ def run(self): sasl_aws_msk_iam_access_key_id=AWS_ACCESS_KEY_ID, sasl_aws_msk_iam_secret_access_key=AWS_SECRET_ACCESS_KEY, sasl_aws_msk_region=AWS_REGION, - security_protocol="SSL" + security_protocol=SASL_PROTOCOL, + sasl_mechanism=SASL_MECHANISM, ) while not self.stop_event.is_set(): @@ -52,7 +57,9 @@ def run(self): sasl_aws_msk_iam_access_key_id=AWS_ACCESS_KEY_ID, sasl_aws_msk_iam_secret_access_key=AWS_SECRET_ACCESS_KEY, sasl_aws_msk_region=AWS_REGION, - security_protocol="SSL" + security_protocol=SASL_PROTOCOL, + group_id=GROUP_NAME, + sasl_mechanism=SASL_MECHANISM, ) consumer.subscribe([TOPIC_NAME]) @@ -72,19 +79,20 @@ def main(): sasl_aws_msk_iam_access_key_id=AWS_ACCESS_KEY_ID, sasl_aws_msk_iam_secret_access_key=AWS_SECRET_ACCESS_KEY, sasl_aws_msk_region=AWS_REGION, - security_protocol="SSL" + security_protocol=SASL_PROTOCOL, + sasl_mechanism=SASL_MECHANISM, ) topic = NewTopic(name=TOPIC_NAME, num_partitions=1, replication_factor=1) - admin.create_topics([topic]) + #admin.create_topics([topic]) except Exception as e: print(str(e), file=sys.stderr) tasks = [ Producer(), - Consumer() + #Consumer() ] # Start threads of a publisher/producer and a subscriber/consumer to 'my-topic' Kafka topic diff --git a/kafka/conn.py b/kafka/conn.py index 1746b173e..6cfbca4d5 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -46,7 +46,7 @@ BlockingIOError = Exception log = logging.getLogger(__name__) -log.setLevel(logging.INFO) +log.setLevel(logging.DEBUG) DEFAULT_KAFKA_PORT = 9092