Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
117 changes: 87 additions & 30 deletions modules/writing_observer/writing_observer/aggregator.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import sys
import time
import learning_observer.settings
from learning_observer.stream_analytics.fields import KeyField, KeyStateType, EventField
import learning_observer.stream_analytics.helpers
# import traceback
import learning_observer.util
Expand Down Expand Up @@ -127,14 +128,14 @@ def aggregate_course_summary_stats(student_data):
async def get_latest_student_documents(student_data):
'''
This will retrieve the latest student documents from the database. It breaks
abstractions.
abstractions.

It also involves some excess loops that are annoying but briefly we need to
determine which students actually *have* last writing data. Then we need to
go through and build keys for that data. Then we fetch the data itself.
It also involves some excess loops that are annoying but briefly we need to
determine which students actually *have* last writing data. Then we need to
go through and build keys for that data. Then we fetch the data itself.
Later on in this file we need to marry the information again. This builds
up a series of lists which are successively merged into a single dict with
the resulting data.
the resulting data.

Some of what is copied along is clearly duplicative and probably unneeded.
'''
Expand All @@ -147,21 +148,20 @@ async def get_latest_student_documents(student_data):

# Compile a list of the active students.
active_students = [s for s in student_data if 'writing_observer.writing_analysis.last_document' in s]

# Now collect documents for all of the active students.
document_keys = ([
learning_observer.stream_analytics.helpers.make_key(
writing_observer.writing_analysis.reconstruct,
{
KeyField.STUDENT: s['user_id'],
EventField('doc_id'): s.get('writing_observer.writing_analysis.last_document', {}).get('document_id', None)
EventField('doc_id'): get_last_document_id(s)
},
KeyStateType.INTERNAL
) for s in active_students])
) for s in active_students])

kvs_data = await kvs.multiget(keys=document_keys)


# Return blank entries if no data, rather than None. This makes it possible
# to use item.get with defaults sanely. For the sake of later alignment
# we also zip up the items with the keys and users that they come from
Expand All @@ -170,16 +170,17 @@ async def get_latest_student_documents(student_data):
writing_data = []
for idx in range(len(document_keys)):
student = active_students[idx]
doc = kvs_data[idx]
doc = kvs_data[idx]

# If we have an empty item we simply return an empty dict with the
# student but an empty doc value.
if (doc is None): doc = {}
if (doc is None):
doc = {}

# Now insert the student data and pass it along.
doc['student'] = student
writing_data.append(doc)

return writing_data


Expand Down Expand Up @@ -219,34 +220,90 @@ async def merge_with_student_data(writing_data, student_data):
processor = writing_observer.stub_nlp.process_texts


async def latest_data(student_data, options=None):
def get_last_document_id(s):
"""
Retrieves the ID of the latest document for a given student.
:param s: The student data.
:return: The ID of the latest document.
"""
return s.get('writing_observer.writing_analysis.last_document', {}).get('document_id', None)


async def update_reconstruct_data_with_google_api(runtime, student_data):
"""
This function updates the text reconstruction writing data from the extension with the
ground truth data from the Google Docs API.
:param runtime: The runtime for the application
:param student_data: A list of students
:return: A list of writing data, one for each student
"""
@learning_observer.cache.async_memoization()
async def fetch_doc_from_google(student):
"""
This function retrieves a single document text from Google based on the document ID.
:param student: A student object
:return: The text of the latest document
"""
import learning_observer.google

kvs = learning_observer.kvs.KVS()

docId = get_last_document_id(student)
# fetch text
text = await learning_observer.google.doctext(runtime, documentId=docId)
# set reconstruction data to ground truth
key = learning_observer.stream_analytics.helpers.make_key(
writing_observer.writing_analysis.reconstruct,
{
KeyField.STUDENT: student['user_id'],
EventField('doc_id'): docId
},
KeyStateType.INTERNAL
)
await kvs.set(key, text)
return text

# For each student, retrieve the document text from Google and store it in a list
writing_data = [
await fetch_doc_from_google(s)
if get_last_document_id(s) is not None else {}
for s in student_data
]
return writing_data


async def latest_data(runtime, student_data, options=None):
'''
HACK HACK HACK

This code needs to take the student data as a dict and then
collect the latest writing data for each student (assuming
they have it). The code then passes that writing data on
to Paul's code for processing. For the time being this
works by essentially building up some large dicts that
contain the text and student data together.

In the long run this should *all* be replaced by a cleaner
object interface that hides some of this from the user
but for the now we'll roll with this.
Retrieves the latest writing data for a set of students.


:param runtime: The runtime object from the server
# for annotated_text, single_doc in zip(annotated_texts, writing_data):
:param student_data: The student data.
# if annotated_text != "Error":
:param options: Additional options to pass to the text processing pipeline.
# single_doc.update(annotated_text)
:return: The latest writing data.
'''

# HACK we have a cache downstream that relies on redis_ephemeral being setup
# when that is resolved, we can remove the feature flag
# Update reconstruct data from KVS with ground truth from Google API
if learning_observer.settings.module_setting('writing_observer', 'use_google_documents', False):
await update_reconstruct_data_with_google_api(runtime, student_data)

# Get the latest documents with the students appended.
writing_data = await get_latest_student_documents(student_data)

# Strip out the unnecessary extra data.
writing_data = await remove_extra_data(writing_data)

# print(">>> WRITE DATA-premerge: {}".format(writing_data))

# This is the error. Skipping now.
writing_data_merge = await merge_with_student_data(writing_data, student_data)
# print(">>> WRITE DATA-postmerge: {}".format(writing_data_merge))


# #print(">>>> PRINT WRITE DATA: Merge")
# #print(writing_data)

Expand All @@ -260,5 +317,5 @@ async def latest_data(student_data, options=None):

writing_data = await merge_with_student_data(writing_data, student_data)
writing_data = await processor(writing_data, options)

return {'latest_writing_data': writing_data}