From 6557b787e3b87d3915d587e009aa27500e6119b1 Mon Sep 17 00:00:00 2001 From: Sarthak Babbar Date: Tue, 11 Apr 2023 20:42:16 -0400 Subject: [PATCH 1/3] Caching Version 0 Initial Commit --- .../writing_observer/awe_nlp.py | 128 +++++++++++++----- 1 file changed, 93 insertions(+), 35 deletions(-) diff --git a/modules/writing_observer/writing_observer/awe_nlp.py b/modules/writing_observer/writing_observer/awe_nlp.py index ea57732d..e2ac1146 100644 --- a/modules/writing_observer/writing_observer/awe_nlp.py +++ b/modules/writing_observer/writing_observer/awe_nlp.py @@ -4,11 +4,14 @@ import asyncio import enum -import hashlib import time import functools import os import multiprocessing +import datetime +from collections import defaultdict +import json +import warnings from concurrent.futures import ProcessPoolExecutor @@ -20,9 +23,7 @@ import awe_components.components.viewpointFeatures import awe_components.components.lexicalClusters import awe_components.components.contentSegmentation -import json -import time -import warnings + import writing_observer.nlp_indicators import learning_observer.kvs @@ -202,52 +203,109 @@ async def process_texts_parallel(texts, options=None): async def process_texts(writing_data, options=None, mode=RUN_MODES.MULTIPROCESSING): ''' - Process texts with caching - 1. Create hash of text - 2. Fetch cache data - 3. Check to see if all options are present - - Yes? Add to results - - No? Record missing options `needed_options` and add to `need_processing` - 4. If we need to process anything process it - 5. Update results/cache + Caching: + 1. Create text hash. + 2. Check if hash exist in cache. + 3. Check if some options are a subset of features_available + Yes: add the intersection of features_available and options to results + 4. Check if some options are a subset of features_running. + Yes: a. Wait for features_running to finish. + b. Update the cache + c. Add intersection of features_running and Options to results + 5. Check if additional features are required. + Yes: a. Collect options not covered till now and add to features_running. + b. Once finished, update cache and return results. ''' + recurring_sleep_time=1 # Time to wait between recurring calls to cache to check if features have finished running + results = [] + found_options = set() + cache = learning_observer.kvs.KVS() + if options is None: + options = [] processor = { RUN_MODES.MULTIPROCESSING: process_texts_parallel, RUN_MODES.SERIAL: process_texts_serial } - results = [] - need_processing = [] - needed_options = set() - cache = learning_observer.kvs.KVS() - for writing in writing_data: text = writing.get('text', '') if len(text) == 0: continue + + # Creating text hash text_hash = 'NLP_CACHE_' + learning_observer.util.secure_hash(text.encode('utf-8')) text_cache_data = await cache[text_hash] + if text_cache_data is None: + # print("{}: text was not found in cache.".format(asyncio.current_task().get_name())) text_cache_data = {} - writing.update(text_cache_data) - missing_options = set(options if options is not None else []).difference(text_cache_data.keys()) - needed_options.update(missing_options) - if len(missing_options) == 0: - results.append(writing) + + if 'features_running' not in text_cache_data: + features_running = set() else: - need_processing.append(writing) - - if len(need_processing) > 0: - just_the_text = [w.get("text", "") for w in need_processing] - annotated_texts = await processor[mode](just_the_text, list(needed_options)) - - for annotated_text, single_doc in zip(annotated_texts, need_processing): - if annotated_text != "Error": - single_doc.update(annotated_text) - text_hash = 'NLP_CACHE_' + learning_observer.util.secure_hash(single_doc['text'].encode('utf-8')) - new_cache = {k: v for k, v in single_doc.items() if k != 'student'} - await cache.set(text_hash, new_cache) - results.extend(need_processing) + features_running = set(json.loads(text_cache_data['features_running'])) + + if 'features_available' not in text_cache_data: + text_cache_data['features_available'] = dict() + + # Check if some options are a subset of features_available + # print("Features Available: ".format(text_cache_data['features_available'].keys())) + if text_cache_data['features_available']: + for feature in set(options).intersection(text_cache_data['features_available'].keys()): + found_options.add(feature) + # print("{}: Options that were found in features available: {}".format(asyncio.current_task().get_name(), found_options)) + writing.update(text_cache_data['features_available']) + + # If all options were found + if found_options == set(options): + # print("{}: All Options were found. Continuing....".format(asyncio.current_task().get_name())) + results.append(writing) + continue + + # Check if some options are a subset of features_running + # print("{}: Features Running: {}".format(asyncio.current_task().get_name(), features_running)) + # features that are needed but are already running + not_found = set(options) - found_options + features_needed_running = set() #Features that are needed but are already processing + if features_running: + features_needed_running = not_found.intersection(features_running) + if len(features_needed_running) > 0: + # print("{}: Features already processing: {}".format(asyncio.current_task().get_name(), features_needed_running)) + while True: + # print("\n {}: Waiting for features to finish running.".format(asyncio.current_task().get_name())) + new_cache = await cache[text_hash] + if new_cache['stop_time'] != "running": + break + asyncio.sleep(recurring_sleep_time) + + # features_running will be available in features_available after they finish running. + writing.update(text_cache_data['features_available']) + found_options = found_options.union(features_needed_running) + features_needed_running = set() + # If all options are found + if found_options == set(options): + # print("{}: All Options were found. Continuing....".format(asyncio.current_task().get_name())) + results.append(writing) + continue + + # Add not found options to features_running and update cache + # print("{}: Finding the rest of the options: {}".format(asyncio.current_task().get_name(), not_found)) + not_found = set(options) - found_options + features_running = not_found + text_cache_data['features_running'] = json.dumps(list(features_running)) + text_cache_data['start_time'] = str(datetime.datetime.now()) + text_cache_data['stop_time'] = "running" + await cache.set(text_hash, text_cache_data) + annotated_text = list(await processor[mode]([writing.get("text", "")], list(not_found)))[0] + asyncio.sleep(10) + text_cache_data['features_running'] = json.dumps([]) + text_cache_data['stop_time'] = str(datetime.datetime.now()) + if annotated_text != "Error": + text_cache_data['features_available'].update(annotated_text) + writing.update(annotated_text) + await cache.set(text_hash, text_cache_data) + results.append(writing) + return results From 5bfc11af0491ef350ee981917c04db9ce11935e6 Mon Sep 17 00:00:00 2001 From: Sarthak Babbar Date: Wed, 19 Apr 2023 15:36:59 -0400 Subject: [PATCH 2/3] Caching Version 0 Initial Commit With Fixes --- .../writing_observer/awe_nlp.py | 56 +++++++++---------- 1 file changed, 26 insertions(+), 30 deletions(-) diff --git a/modules/writing_observer/writing_observer/awe_nlp.py b/modules/writing_observer/writing_observer/awe_nlp.py index e2ac1146..1677ddf9 100644 --- a/modules/writing_observer/writing_observer/awe_nlp.py +++ b/modules/writing_observer/writing_observer/awe_nlp.py @@ -4,16 +4,15 @@ import asyncio import enum +import hashlib import time import functools import os import multiprocessing import datetime -from collections import defaultdict -import json -import warnings from concurrent.futures import ProcessPoolExecutor +from learning_observer.log_event import debug_log import spacy import coreferee @@ -23,12 +22,14 @@ import awe_components.components.viewpointFeatures import awe_components.components.lexicalClusters import awe_components.components.contentSegmentation - +import json +import warnings import writing_observer.nlp_indicators import learning_observer.kvs import learning_observer.util + RUN_MODES = enum.Enum('RUN_MODES', 'MULTIPROCESSING SERIAL') @@ -212,16 +213,18 @@ async def process_texts(writing_data, options=None, mode=RUN_MODES.MULTIPROCESSI Yes: a. Wait for features_running to finish. b. Update the cache c. Add intersection of features_running and Options to results - 5. Check if additional features are required. + 5. Check if additional features are required. Yes: a. Collect options not covered till now and add to features_running. b. Once finished, update cache and return results. ''' - recurring_sleep_time=1 # Time to wait between recurring calls to cache to check if features have finished running + recurring_sleep_time = 1 # Time to wait between recurring calls to cache to check if features have finished running + running_features_wait_time = 10 # Time to wait for features already running. results = [] found_options = set() cache = learning_observer.kvs.KVS() if options is None: options = [] + set_options = set(options) processor = { RUN_MODES.MULTIPROCESSING: process_texts_parallel, RUN_MODES.SERIAL: process_texts_serial @@ -237,7 +240,6 @@ async def process_texts(writing_data, options=None, mode=RUN_MODES.MULTIPROCESSI text_cache_data = await cache[text_hash] if text_cache_data is None: - # print("{}: text was not found in cache.".format(asyncio.current_task().get_name())) text_cache_data = {} if 'features_running' not in text_cache_data: @@ -249,32 +251,28 @@ async def process_texts(writing_data, options=None, mode=RUN_MODES.MULTIPROCESSI text_cache_data['features_available'] = dict() # Check if some options are a subset of features_available - # print("Features Available: ".format(text_cache_data['features_available'].keys())) if text_cache_data['features_available']: - for feature in set(options).intersection(text_cache_data['features_available'].keys()): - found_options.add(feature) - # print("{}: Options that were found in features available: {}".format(asyncio.current_task().get_name(), found_options)) + found_options = set_options.intersection(text_cache_data['features_available'].keys()) writing.update(text_cache_data['features_available']) # If all options were found - if found_options == set(options): - # print("{}: All Options were found. Continuing....".format(asyncio.current_task().get_name())) + if found_options == set_options: + # debug_log("{}: All Options were found. Continuing....".format(asyncio.current_task().get_name())) results.append(writing) continue # Check if some options are a subset of features_running - # print("{}: Features Running: {}".format(asyncio.current_task().get_name(), features_running)) # features that are needed but are already running - not_found = set(options) - found_options - features_needed_running = set() #Features that are needed but are already processing + not_found = set_options - found_options + features_needed_running = set() # Features that are needed but are already processing if features_running: features_needed_running = not_found.intersection(features_running) if len(features_needed_running) > 0: - # print("{}: Features already processing: {}".format(asyncio.current_task().get_name(), features_needed_running)) + debug_log("{}: Features already processing: {}".format(asyncio.current_task().get_name(), features_needed_running)) while True: - # print("\n {}: Waiting for features to finish running.".format(asyncio.current_task().get_name())) + debug_log("\n {}: Waiting for features to finish running.".format(asyncio.current_task().get_name())) new_cache = await cache[text_hash] - if new_cache['stop_time'] != "running": + if new_cache['stop_time'] != "running" or datetime.datetime.strptime(new_cache['start_time'], "%Y-%m-%d %H:%M:%S.%f") - datetime.datetime.now() > running_features_wait_time: break asyncio.sleep(recurring_sleep_time) @@ -283,26 +281,24 @@ async def process_texts(writing_data, options=None, mode=RUN_MODES.MULTIPROCESSI found_options = found_options.union(features_needed_running) features_needed_running = set() # If all options are found - if found_options == set(options): - # print("{}: All Options were found. Continuing....".format(asyncio.current_task().get_name())) + if found_options == set_options: + # debug_log("{}: All Options were found. Continuing....".format(asyncio.current_task().get_name())) results.append(writing) continue - - # Add not found options to features_running and update cache - # print("{}: Finding the rest of the options: {}".format(asyncio.current_task().get_name(), not_found)) - not_found = set(options) - found_options + + # Add not found options to features_running and update cache + # debug_log("{}: Finding the rest of the options: {}".format(asyncio.current_task().get_name(), not_found)) + not_found = set_options - found_options features_running = not_found text_cache_data['features_running'] = json.dumps(list(features_running)) text_cache_data['start_time'] = str(datetime.datetime.now()) text_cache_data['stop_time'] = "running" await cache.set(text_hash, text_cache_data) - annotated_text = list(await processor[mode]([writing.get("text", "")], list(not_found)))[0] - asyncio.sleep(10) + annotated_text = process_text(writing.get("text", ""), list(not_found)) text_cache_data['features_running'] = json.dumps([]) text_cache_data['stop_time'] = str(datetime.datetime.now()) - if annotated_text != "Error": - text_cache_data['features_available'].update(annotated_text) - writing.update(annotated_text) + text_cache_data['features_available'].update(annotated_text) + writing.update(annotated_text) await cache.set(text_hash, text_cache_data) results.append(writing) From e8ec722cebe3b4cdf6ccaeda074182223694be44 Mon Sep 17 00:00:00 2001 From: Sarthak Babbar Date: Wed, 19 Apr 2023 15:39:11 -0400 Subject: [PATCH 3/3] Caching Version 0 Initial Commit With Fixes --- modules/writing_observer/writing_observer/awe_nlp.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/modules/writing_observer/writing_observer/awe_nlp.py b/modules/writing_observer/writing_observer/awe_nlp.py index 1677ddf9..9f9a2706 100644 --- a/modules/writing_observer/writing_observer/awe_nlp.py +++ b/modules/writing_observer/writing_observer/awe_nlp.py @@ -257,7 +257,6 @@ async def process_texts(writing_data, options=None, mode=RUN_MODES.MULTIPROCESSI # If all options were found if found_options == set_options: - # debug_log("{}: All Options were found. Continuing....".format(asyncio.current_task().get_name())) results.append(writing) continue @@ -282,12 +281,10 @@ async def process_texts(writing_data, options=None, mode=RUN_MODES.MULTIPROCESSI features_needed_running = set() # If all options are found if found_options == set_options: - # debug_log("{}: All Options were found. Continuing....".format(asyncio.current_task().get_name())) results.append(writing) continue # Add not found options to features_running and update cache - # debug_log("{}: Finding the rest of the options: {}".format(asyncio.current_task().get_name(), not_found)) not_found = set_options - found_options features_running = not_found text_cache_data['features_running'] = json.dumps(list(features_running))