Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 16 additions & 11 deletions src/event_gate_lambda.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#
import base64
import json
import logging
import sys
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
Expand All @@ -28,6 +29,10 @@
import boto3
from confluent_kafka import Producer

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
logger.addHandler(logging.StreamHandler())

with open("conf/config.json", "r") as file:
CONFIG = json.load(file)

Expand All @@ -53,47 +58,47 @@
ACCESS = json.load(file)

TOKEN_PROVIDER_URL = CONFIG["tokenProviderUrl"]
print("Loaded configs")
logger.info("Loaded configs")

token_public_key_encoded = requests.get(CONFIG["tokenPublicKeyUrl"], verify=False).json()["key"]
TOKEN_PUBLIC_KEY = serialization.load_der_public_key(base64.b64decode(token_public_key_encoded))
print("Loaded token public key")
logger.info("Loaded token public key")

kafka_producer = Producer({"bootstrap.servers": CONFIG["kafkaBootstrapServer"]})
print("Initialized kafka producer")
logger.info("Initialized kafka producer")

def kafkaWrite(topicName, message):
print(f"Sending to kafka {topicName}")
logger.info(f"Sending to kafka {topicName}")
error = []
kafka_producer.produce(topicName,
key="",
value=json.dumps(message).encode("utf-8"),
callback = lambda err, msg: error.append(err) if err is not None else None)
kafka_producer.flush()
if error:
print(error)
logger.error(error)
return 500
else:
print("OK")
logger.info("OK")
return 202

def getToken():
print("Handling GET Token")
logger.info("Handling GET Token")
return {
"statusCode": 303,
"headers": {"Location": TOKEN_PROVIDER_URL}
}

def getTopics():
print("Handling GET Topics")
logger.info("Handling GET Topics")
return {
"statusCode": 200,
"headers": {"Content-Type": "application/json"},
"body": json.dumps([topicName for topicName in TOPICS])
}

def getTopicSchema(topicName):
print(f"Handling GET TopicSchema({topicName})")
logger.info(f"Handling GET TopicSchema({topicName})")
if topicName not in TOPICS:
return { "statusCode": 404 }

Expand All @@ -104,7 +109,7 @@ def getTopicSchema(topicName):
}

def postTopicMessage(topicName, topicMessage, tokenEncoded):
print(f"Handling POST {topicName}")
logger.info(f"Handling POST {topicName}")
try:
token = jwt.decode(tokenEncoded, TOKEN_PUBLIC_KEY, algorithms=["RS256"])
except Exception as e:
Expand Down Expand Up @@ -147,5 +152,5 @@ def lambda_handler(event, context):
sys.exit("TERMINATING")
return {"statusCode": 404}
except Exception as e:
print(f"Unexpected exception: {e}")
logger.error(f"Unexpected exception: {e}")
return {"statusCode": 500}