diff --git a/smart_kit/start_points/main_loop_async_http.py b/smart_kit/start_points/main_loop_async_http.py index 45906dbd..1ba27c20 100644 --- a/smart_kit/start_points/main_loop_async_http.py +++ b/smart_kit/start_points/main_loop_async_http.py @@ -155,7 +155,7 @@ async def process_message(self, message: SmartAppFromMessage, *args, **kwargs): await self.save_user(db_uid, user, message) stats += "Saving time: {} msecs\n".format(save_timer.msecs) log(stats, params={log_const.KEY_NAME: "timings"}) - self.postprocessor.postprocess(user, message) + await self.postprocessor.postprocess(user, message) return answer, stats, user async def get_health_check(self, request: aiohttp.web.Request): diff --git a/smart_kit/start_points/main_loop_http.py b/smart_kit/start_points/main_loop_http.py index d139bf19..21cf0b8a 100644 --- a/smart_kit/start_points/main_loop_http.py +++ b/smart_kit/start_points/main_loop_http.py @@ -84,7 +84,7 @@ def process_message(self, message: SmartAppFromMessage, *args, **kwargs): self.loop.run_until_complete(self.save_user(db_uid, user, message)) stats += "Saving time: {} msecs\n".format(save_timer.msecs) log(stats, user=user, params={log_const.KEY_NAME: "timings"}) - self.postprocessor.postprocess(user, message) + self.loop.run_until_complete(self.postprocessor.postprocess(user, message)) return answer, stats def _get_headers(self, environ): diff --git a/smart_kit/start_points/main_loop_kafka.py b/smart_kit/start_points/main_loop_kafka.py index e0436a13..3fd05983 100644 --- a/smart_kit/start_points/main_loop_kafka.py +++ b/smart_kit/start_points/main_loop_kafka.py @@ -458,7 +458,7 @@ async def process_message(self, mq_message, consumer, kafka_key, stats, log_para "uid": user.id, "db_version": str(user.variables.get(user.USER_DB_VERSION))}, level="WARNING") - self.postprocessor.postprocess(user, message) + await self.postprocessor.postprocess(user, message) smart_kit_metrics.counter_save_collision_tries_left(self.app_name) consumer.commit_offset(mq_message) diff --git a/smart_kit/start_points/postprocess.py b/smart_kit/start_points/postprocess.py index 08752619..377d97cf 100644 --- a/smart_kit/start_points/postprocess.py +++ b/smart_kit/start_points/postprocess.py @@ -3,19 +3,19 @@ class PostprocessMainLoop: - def postprocess(self, user, message, *args, **kwargs): + async def postprocess(self, user, message, *args, **kwargs): pass class PostprocessCompose(PostprocessMainLoop): postprocessors: List[PostprocessMainLoop] = [] - def postprocess(self, user, message, *args, **kwargs): + async def postprocess(self, user, message, *args, **kwargs): for processor in self.postprocessors: - processor.postprocess(user, message, *args, **kwargs) + await processor.postprocess(user, message, *args, **kwargs) -def postprocessor_compose(*args: List[Type[PostprocessMainLoop]]): +def postprocessor_compose(*args: Type[PostprocessMainLoop]): class Compose(PostprocessCompose): postprocessors = [processor_cls() for processor_cls in args] return Compose