From fdbbeb20e8817683627e1ed3c706e0cae12bc835 Mon Sep 17 00:00:00 2001 From: Bradley Erickson Date: Mon, 12 Jun 2023 08:52:42 -0400 Subject: [PATCH] linted aggregator and added back in google text memoization hack --- .../writing_observer/aggregator.py | 117 +++++++++++++----- 1 file changed, 87 insertions(+), 30 deletions(-) diff --git a/modules/writing_observer/writing_observer/aggregator.py b/modules/writing_observer/writing_observer/aggregator.py index 654a2ace..eb9cd624 100644 --- a/modules/writing_observer/writing_observer/aggregator.py +++ b/modules/writing_observer/writing_observer/aggregator.py @@ -1,6 +1,7 @@ import sys import time import learning_observer.settings +from learning_observer.stream_analytics.fields import KeyField, KeyStateType, EventField import learning_observer.stream_analytics.helpers # import traceback import learning_observer.util @@ -127,14 +128,14 @@ def aggregate_course_summary_stats(student_data): async def get_latest_student_documents(student_data): ''' This will retrieve the latest student documents from the database. It breaks - abstractions. + abstractions. - It also involves some excess loops that are annoying but briefly we need to - determine which students actually *have* last writing data. Then we need to - go through and build keys for that data. Then we fetch the data itself. + It also involves some excess loops that are annoying but briefly we need to + determine which students actually *have* last writing data. Then we need to + go through and build keys for that data. Then we fetch the data itself. Later on in this file we need to marry the information again. This builds up a series of lists which are successively merged into a single dict with - the resulting data. + the resulting data. Some of what is copied along is clearly duplicative and probably unneeded. ''' @@ -147,21 +148,20 @@ async def get_latest_student_documents(student_data): # Compile a list of the active students. active_students = [s for s in student_data if 'writing_observer.writing_analysis.last_document' in s] - + # Now collect documents for all of the active students. document_keys = ([ learning_observer.stream_analytics.helpers.make_key( writing_observer.writing_analysis.reconstruct, { KeyField.STUDENT: s['user_id'], - EventField('doc_id'): s.get('writing_observer.writing_analysis.last_document', {}).get('document_id', None) + EventField('doc_id'): get_last_document_id(s) }, KeyStateType.INTERNAL - ) for s in active_students]) - + ) for s in active_students]) + kvs_data = await kvs.multiget(keys=document_keys) - # Return blank entries if no data, rather than None. This makes it possible # to use item.get with defaults sanely. For the sake of later alignment # we also zip up the items with the keys and users that they come from @@ -170,16 +170,17 @@ async def get_latest_student_documents(student_data): writing_data = [] for idx in range(len(document_keys)): student = active_students[idx] - doc = kvs_data[idx] - + doc = kvs_data[idx] + # If we have an empty item we simply return an empty dict with the # student but an empty doc value. - if (doc is None): doc = {} + if (doc is None): + doc = {} # Now insert the student data and pass it along. doc['student'] = student writing_data.append(doc) - + return writing_data @@ -219,21 +220,78 @@ async def merge_with_student_data(writing_data, student_data): processor = writing_observer.stub_nlp.process_texts -async def latest_data(student_data, options=None): +def get_last_document_id(s): + """ + Retrieves the ID of the latest document for a given student. + :param s: The student data. + :return: The ID of the latest document. + """ + return s.get('writing_observer.writing_analysis.last_document', {}).get('document_id', None) + + +async def update_reconstruct_data_with_google_api(runtime, student_data): + """ + This function updates the text reconstruction writing data from the extension with the + ground truth data from the Google Docs API. + :param runtime: The runtime for the application + :param student_data: A list of students + :return: A list of writing data, one for each student + """ + @learning_observer.cache.async_memoization() + async def fetch_doc_from_google(student): + """ + This function retrieves a single document text from Google based on the document ID. + :param student: A student object + :return: The text of the latest document + """ + import learning_observer.google + + kvs = learning_observer.kvs.KVS() + + docId = get_last_document_id(student) + # fetch text + text = await learning_observer.google.doctext(runtime, documentId=docId) + # set reconstruction data to ground truth + key = learning_observer.stream_analytics.helpers.make_key( + writing_observer.writing_analysis.reconstruct, + { + KeyField.STUDENT: student['user_id'], + EventField('doc_id'): docId + }, + KeyStateType.INTERNAL + ) + await kvs.set(key, text) + return text + + # For each student, retrieve the document text from Google and store it in a list + writing_data = [ + await fetch_doc_from_google(s) + if get_last_document_id(s) is not None else {} + for s in student_data + ] + return writing_data + + +async def latest_data(runtime, student_data, options=None): ''' - HACK HACK HACK - - This code needs to take the student data as a dict and then - collect the latest writing data for each student (assuming - they have it). The code then passes that writing data on - to Paul's code for processing. For the time being this - works by essentially building up some large dicts that - contain the text and student data together. - - In the long run this should *all* be replaced by a cleaner - object interface that hides some of this from the user - but for the now we'll roll with this. + Retrieves the latest writing data for a set of students. + + + :param runtime: The runtime object from the server + # for annotated_text, single_doc in zip(annotated_texts, writing_data): + :param student_data: The student data. + # if annotated_text != "Error": + :param options: Additional options to pass to the text processing pipeline. + # single_doc.update(annotated_text) + :return: The latest writing data. ''' + + # HACK we have a cache downstream that relies on redis_ephemeral being setup + # when that is resolved, we can remove the feature flag + # Update reconstruct data from KVS with ground truth from Google API + if learning_observer.settings.module_setting('writing_observer', 'use_google_documents', False): + await update_reconstruct_data_with_google_api(runtime, student_data) + # Get the latest documents with the students appended. writing_data = await get_latest_student_documents(student_data) @@ -241,12 +299,11 @@ async def latest_data(student_data, options=None): writing_data = await remove_extra_data(writing_data) # print(">>> WRITE DATA-premerge: {}".format(writing_data)) - + # This is the error. Skipping now. writing_data_merge = await merge_with_student_data(writing_data, student_data) # print(">>> WRITE DATA-postmerge: {}".format(writing_data_merge)) - # #print(">>>> PRINT WRITE DATA: Merge") # #print(writing_data) @@ -260,5 +317,5 @@ async def latest_data(student_data, options=None): writing_data = await merge_with_student_data(writing_data, student_data) writing_data = await processor(writing_data, options) - + return {'latest_writing_data': writing_data}