diff --git a/odata/batchcontext.py b/odata/batchcontext.py new file mode 100644 index 0000000..44c84ed --- /dev/null +++ b/odata/batchcontext.py @@ -0,0 +1,276 @@ +# -*- coding: utf-8 -*- + +from odata.query import Query +from odata.context import Context +from odata.exceptions import ODataError +from uuid import uuid4 as uuid +from odata.changeset import ChangeSet, Change, ChangeAction + +class BatchContext(Context): + def __init__(self, service, session=None, auth=None): + super().__init__(session=session, auth=auth) + self.boundary = 'batch_%s' % (uuid()) + self.batch = True + self._parts = [] + self._changeset = None + self._content_id_to_entity_map = [] + self._service = service + + def open_changeset(self): + if self._changeset is not None: + raise Exception('Close the current change set first before opening a new one. They cannot be nested.') + self._changeset = ChangeSet() + self._parts.append(self._changeset) + + def close_changeset(self): + if self._changeset is None: + raise Exception('Open a change set first before closing one.') + self._changeset = None + + def reset(self): + self._parts = [] + self._content_id_to_entity_map = [] + self._changeset = None + + def execute(self): + if self._changeset is not None: + raise Exception('Call close_changeset before executing the batch request') + + content_id_to_entity_map = self._content_id_to_entity_map.copy() # store for later use + pl = self._get_payload() + + self.reset() # reset know in case something fails. Prevent unknown state + + url = self._service.url + '$batch' + + headers = self.connection.base_headers.copy() + headers.update({ + 'Content-Type': 'multipart/mixed;boundary=%s' % (self.boundary), + }) + response = self.connection.execute_post_raw( + url, + headers, + pl, + ) + + # print(response) # TODO: remove print + + post_processed = self._apply_response_to_entities(response, content_id_to_entity_map) + return { + 'entities': post_processed['entities'], + 'responses': post_processed['response_map'], + 'response_raw': response, + 'id_to_entity': content_id_to_entity_map, + } + + def _apply_response_to_entities(self, response, content_id_to_entity_map): + m = content_id_to_entity_map + entities = [] + response_map = [] + for entity, content_id in m: + saved_data = {} + error_msg = None + error_code = None + + resp_for_entity = [x for x in response['responses'] if x['id'] == content_id] + if resp_for_entity is None or len(resp_for_entity) != 1: + error_code = 500 + error_msg = 'Server sent no error message. There might be errors in previous operations of the same batch.' + else: + resp_for_entity = resp_for_entity[0] + + if resp_for_entity['status'] < 200 or resp_for_entity['status'] >= 300: + error_code = resp_for_entity['status'] + error_msg = "HTTP %s for changeset '%s' and content_id '%s' with error %s" % ( + resp_for_entity['status'], + resp_for_entity['atomicityGroup'], + resp_for_entity['id'], + resp_for_entity.get('body', {}).get('error', {}).get('message', 'Server sent no error message') + ) + + if error_msg is None: + saved_data = resp_for_entity['body'] + for k in list(saved_data.keys()): + # remove odata annotations in the response + if k.startswith('@odata.'): + saved_data.pop(k) + + entity.__odata__.reset() # reset dirty flags etc + entity.__odata__.update(saved_data) + + response_map.append((entity, resp_for_entity['status'], None)) + else: + response_map.append((entity, error_code, error_msg)) + + entities.append(entity) + + return { + 'entities': entities, + 'response_map': response_map, + } + + def _get_payload(self): + parts_str = [ + '--%s' % (self.boundary), + ] + for part in self._parts: + pl = part.get_payload() + parts_str.append(pl) + + parts_str.append('--%s--' % self.boundary) + return '\n'.join(parts_str) + + + def query(self, entitycls): + raise NotImplementedError('calling an action/function in a batch operation is not implemented') + # if self._changeset is not None: + # raise Exception('Cannot read data within a change set. Call close_changeset first') + # q = Query(entitycls, connection=self.connection) + # return q + + def call(self, action_or_function, **parameters): + raise NotImplementedError('calling an action/function in a batch operation is not implemented') + + def call_with_query(self, action_or_function, query, **parameters): + raise NotImplementedError('calling an action/function with query in a batch operation is not implemented') + + def save(self, entity, force_refresh=True, parent_resource=None) -> str: + """ + Creates a POST or PATCH call to the service. If the entity already has + a primary key, an update is called. Otherwise the entity is inserted + as new. Updating an entity will only send the changed values + + :param entity: Model instance to insert or update + :type entity: EntityBase + :param force_refresh: Read full entity data again from service after PATCH call + :raises ODataConnectionError: Invalid data or serverside error. Server returned an HTTP error code + """ + + if self.is_entity_saved(entity): + if parent_resource is not None: + raise ValueError(( + "Cannot provide a parent_resource for a non-insert operation for entity %s. " + "This feature is only used to reference entities " + "that are created in the same batch request." + ) % (entity.__repr__)) + return self._update_existing(entity, force_refresh=force_refresh) + else: + return self._insert_new(entity, parent_resource=parent_resource) + + def delete(self, entity): + """ + Creates a DELETE call to the service, deleting the entity + + :type entity: EntityBase + :raises ODataConnectionError: Delete not allowed or a serverside error. Server returned an HTTP error code + """ + if self._changeset is None: + raise Exception('Call open_changeset before doing data modification requests') + + raise Exception('Delete still needs to be implemented') + # TODO: + + # self.log.info(u'Deleting entity: {0}'.format(entity)) + # # url = entity.__odata__.instance_url + # url = entity.__odata__.instance_url[len(self._service.url) - 1:] + # self.connection.execute_delete(url) + # entity.__odata__.persisted = False + # self.log.info(u'Success') + + def _insert_new(self, entity, parent_resource=None): + """ + Creates a POST call to the service, sending the complete new entity + + :type entity: EntityBase + :type parent_resource: EntityBase another entity that is also created in the same changeset and that + we want to reference (e.g. create an Author first and then create some books from this author. Author would be the parent.) + """ + if self._changeset is None: + raise Exception('Call open_changeset before doing data modification requests') + + es = entity.__odata__ + insert_data = es.data_for_insert() + + if parent_resource is None: + url = entity.__odata_url__()[len(self._service.url) - 1:] + else: + es_p = parent_resource.__odata__ + entity_type = entity.__odata_schema__['type'] + parent_entity_type = parent_resource.__odata_schema__['type'] + + parent_nav_prop = [x for x in es_p.navigation_properties if x[1].navigated_property_type == entity_type][0][1] + + content_id = [x for x in self._content_id_to_entity_map if x[0] is parent_resource][0][1] + # use $/navProperty as url + url = '$%s/%s' % (content_id, parent_nav_prop.name) + + # via the url we tell odata that we want to create a sub-entity (e.g. Author = parent and Book = sub). + # In case the book has a reference to author (e.g. author_ID) we need to remove it as it has no value and + # defaults to a "null"-value if not set. However, we just dont want to send any value (not even null) for this field + nav_prop = [x for x in es.navigation_properties if x[1].navigated_property_type == parent_entity_type] + if nav_prop and len(nav_prop) > 0: + fk = nav_prop[0][1].foreign_key + if fk is not None and fk in insert_data: + # remove if it exists in the dict + insert_data.pop(fk) + + if url is None: + msg = 'Cannot insert Entity that does not belong to EntitySet: {0}'.format(entity) + raise ODataError(msg) + + def cb(self, saved_data): + es.reset() + es.connection = self.connection + es.persisted = True + if saved_data is not None: + es.update(saved_data) + self.log.info(u'Success') + + content_id = self._changeset.add_change(Change( + url, + insert_data, + ChangeAction.CREATE, + ), callback=cb) + + self._content_id_to_entity_map.append((entity, content_id)) + return content_id + + def _update_existing(self, entity, force_refresh=True): + """ + Creates a PATCH call to the service, sending only the modified values + + :type entity: EntityBase + """ + if self._changeset is None: + raise Exception('Call open_changeset before doing data modification requests') + + es = entity.__odata__ + if es.instance_url is None: + msg = 'Cannot update Entity that does not belong to EntitySet: {0}'.format(entity) + raise ODataError(msg) + + patch_data = es.data_for_update() + + if len([i for i in patch_data if not i.startswith('@')]) == 0: + self.log.debug(u'Nothing to update: {0}'.format(entity)) + return + + # url = es.instance_url + + url = es.instance_url[len(self._service.url) - 1:] + + def cb(self, saved_data): + es.reset() + if saved_data is not None and force_refresh: + saved_data = self.connection.execute_get(url) + if saved_data is not None: + entity.__odata__.update(saved_data) + self.log.info(u'Success') + + content_id = self._changeset.add_change(Change( + url, + patch_data, + ChangeAction.UPDATE, + ), callback=cb) + + self._content_id_to_entity_map.append((entity, content_id)) diff --git a/odata/changeset.py b/odata/changeset.py new file mode 100644 index 0000000..dd3636d --- /dev/null +++ b/odata/changeset.py @@ -0,0 +1,81 @@ +# -*- coding: utf-8 -*- + +import json +import enum +from uuid import uuid4 as uuid +import socket + +class ChangeAction: + CREATE = 'POST' + UPDATE = 'PATCH' + DELETE = 'DELETE' + +class Change: + def __init__(self, url: str, data, action: ChangeAction): + self.content_id = None + self.base_headers = { + 'Content-Type': 'application/http', + 'Content-Transfer-Encoding': 'binary', + } + self.data = data + self.method = action + self.url = url + + def get_content_id(self): + return self.content_id + + def set_content_id(self, content_id: str): + self.content_id = content_id + return self + + def get_payload(self): + headers = self.base_headers.copy() + headers.update({ + 'Content-ID': self.content_id, + }) + + parts = [] + + for key, value in headers.items(): + parts.append('%s: %s' % (key, value)) + parts.append('') + + parts.append('%s %s HTTP/1.1' % (self.method, self.url)) + parts.append('Host: %s' % socket.gethostname()) + parts.append('Content-Type: application/json;type=entry') + parts.append('') + parts.append(json.dumps(self.data, indent=2, ensure_ascii=False)) + + return '\n'.join(parts) + + +class ChangeSet: + def __init__(self): + self.boundary = 'changeset_%s' % (uuid()) + self._changes = [] + self._callbacks = [] + + def add_change(self, change: Change, callback=None) -> str: + self._changes.append(change) + self._callbacks.append(callback) + + change_content_id = '%s-%s' % (self.boundary, len(self._changes)) + change.set_content_id(change_content_id) + return change_content_id + + def get_boundary(self): + return self.boundary + + def get_payload(self): + parts = [ + 'Content-Type: multipart/mixed;boundary=%s' % (self.get_boundary()), + '', + ] + + for change in self._changes: + parts.append('--%s' % (self.get_boundary())) + parts.append(change.get_payload()) + + parts.append('--%s--' % (self.get_boundary())) + + return '\n'.join(parts) diff --git a/odata/connection.py b/odata/connection.py index 995557c..d332abe 100644 --- a/odata/connection.py +++ b/odata/connection.py @@ -136,6 +136,21 @@ def execute_post(self, url, data, params=None): return response.json() # no exceptions here, POSTing to Actions may not return data + def execute_post_raw(self, url, headers, data: str, params=None): + headers = {**ODataConnection.base_headers, **headers} + + self.log.info(u'POST {0}'.format(url)) + self.log.info(u'Payload: {0}'.format(data)) + data = data.replace('\n', '\r\n') + response = self._do_post(url, data=data, headers=headers, params=params) + self._handle_odata_error(response) + response_ct = response.headers.get('content-type', '') + + if response.status_code == requests.codes.no_content: + return + if 'application/json' in response_ct: + return response.json() + def execute_patch(self, url, data): headers = { 'Content-Type': 'application/json', diff --git a/odata/context.py b/odata/context.py index d85e47d..25bea65 100644 --- a/odata/context.py +++ b/odata/context.py @@ -12,6 +12,7 @@ class Context: def __init__(self, session=None, auth=None): self.log = logging.getLogger('odata.context') self.connection = ODataConnection(session=session, auth=auth) + self.batch = False def query(self, entitycls): q = Query(entitycls, connection=self.connection) diff --git a/odata/metadata.py b/odata/metadata.py index c794d66..b1bdfc4 100644 --- a/odata/metadata.py +++ b/odata/metadata.py @@ -79,6 +79,7 @@ def _set_object_relationships(self, all_types): _search_entity, collection=is_collection, foreign_key=foreign_key, + navigated_property_type=_search_type ) setattr(entity, name, nav) diff --git a/odata/navproperty.py b/odata/navproperty.py index 4c5527c..4e998a4 100644 --- a/odata/navproperty.py +++ b/odata/navproperty.py @@ -42,11 +42,12 @@ class NavigationProperty(object): A Property-like object for marking relationships between entities, but does not inherit from PropertyBase. """ - def __init__(self, name, entitycls, collection=False, foreign_key=None): + def __init__(self, name, entitycls, collection=False, foreign_key=None, navigated_property_type=None): from odata.property import PropertyBase self.name = name self.entitycls = entitycls self.is_collection = collection + self.navigated_property_type = navigated_property_type if isinstance(foreign_key, PropertyBase): self.foreign_key = foreign_key.name else: diff --git a/odata/service.py b/odata/service.py index 6238462..f28f05d 100644 --- a/odata/service.py +++ b/odata/service.py @@ -59,6 +59,7 @@ from .metadata import MetaData from .exceptions import ODataError from .context import Context +from .batchcontext import BatchContext from .action import Action, Function __all__ = ( @@ -82,6 +83,7 @@ def __init__(self, url, base=None, reflect_entities=False, session=None, auth=No self.collections = {} self.log = logging.getLogger('odata.service') self.default_context = Context(auth=auth, session=session) + self.context = self.default_context self.entities = {} """ @@ -147,17 +149,28 @@ def __init__(self, url, base=None, reflect_entities=False, session=None, auth=No def __repr__(self): return u''.format(self.url) - def create_context(self, auth=None, session=None): + def create_context(self, auth=None, session=None, batch=False): """ Create new context to use for session-like usage :param auth: Custom Requests auth object to use for credentials :param session: Custom Requests session to use for communication with the endpoint + :param batch: Set to true if you want to use odata $batch requests :return: Context instance :rtype: Context """ + if batch == True: + return BatchContext(self, auth=auth, session=session) return Context(auth=auth, session=session) + def use_context(self, context): + if context is None: + return self.use_default_context() + self.context = context + + def use_default_context(self): + self.context = self.default_context + def describe(self, entity): """ Print a debug screen of an entity instance @@ -168,7 +181,7 @@ def describe(self, entity): def is_entity_saved(self, entity): """Returns boolean indicating entity's status""" - return self.default_context.is_entity_saved(entity) + return self.context.is_entity_saved(entity) def query(self, entitycls): """ @@ -177,7 +190,7 @@ def query(self, entitycls): :param entitycls: Entity to query :return: Query object """ - return self.default_context.query(entitycls) + return self.context.query(entitycls) def delete(self, entity): """ @@ -186,9 +199,9 @@ def delete(self, entity): :type entity: EntityBase :raises ODataConnectionError: Delete not allowed or a serverside error. Server returned an HTTP error code """ - return self.default_context.delete(entity) + return self.context.delete(entity) - def save(self, entity, force_refresh=True): + def save(self, entity, force_refresh=True, parent_resource=None): """ Creates a POST or PATCH call to the service. If the entity already has a primary key, an update is called. Otherwise the entity is inserted @@ -198,4 +211,6 @@ def save(self, entity, force_refresh=True): :param force_refresh: Read full entity data again from service after PATCH call :raises ODataConnectionError: Invalid data or serverside error. Server returned an HTTP error code """ - return self.default_context.save(entity, force_refresh=force_refresh) + if self.context.batch: + return self.context.save(entity, force_refresh=force_refresh, parent_resource=parent_resource) + return self.context.save(entity, force_refresh=force_refresh) diff --git a/odata/state.py b/odata/state.py index 02cf05c..a89323d 100644 --- a/odata/state.py +++ b/odata/state.py @@ -170,47 +170,61 @@ def _clean_new_entity(self, entity): for _, prop in es.properties: if prop.is_computed_value: continue - insert_data[prop.name] = es[prop.name] - # Allow pk properties only if they have values for _, pk_prop in es.primary_key_properties: if insert_data[pk_prop.name] is None: insert_data.pop(pk_prop.name) # Deep insert from nav properties for prop_name, prop in es.navigation_properties: - if prop.foreign_key: - insert_data.pop(prop.foreign_key, None) - + # source, value = getattr(entity, prop_name, None) - """:type : None | odata.entity.EntityBase | list[odata.entity.EntityBase]""" - if value is not None: - - if prop.is_collection: - binds = [] - - # binds must be added first - for i in [i for i in value if i.__odata__.id]: - binds.append(i.__odata__.id) - - if len(binds): - insert_data['{0}@odata.bind'.format(prop.name)] = binds - - new_entities = [] - for i in [i for i in value if i.__odata__.id is None]: - new_entities.append(self._clean_new_entity(i)) - - if len(new_entities): - insert_data[prop.name] = new_entities - + if value is None: # no value provided, ignore this property + continue + if prop.is_collection: # to-many navigation (to a collection) + binds = [] + + # binds must be added first + for i in [i for i in value if i.__odata__.id]: + binds.append(i.__odata__.id) + + if len(binds): + insert_data['{0}@odata.bind'.format(prop.name)] = binds + + new_entities = [] + for i in [i for i in value if i.__odata__.id is None]: + new_entities.append(self._clean_new_entity(i)) + + if len(new_entities): + # TODO: our API does not support it but I assume thats a limitation of the server framework + # and not oData itself. + raise ValueError(( + "Deep insert/create for to-many collections is not supported. " + "Property '%s' must not be set or only reference alread existing entity instances." + ) % (prop_name)) + + # if len(new_entities): + # insert_data[prop.name] = new_entities + + else: # to-one navigation + reference_existing = insert_data.get(prop.foreign_key) is not None + if reference_existing: + raise ValueError("Cannot have a foreign key reference in '%s' to an existing entitiy instance and also provide a value for navigation property '%s'" % (prop.foreign_key, prop_name)) + + if value.__odata__.id: + # we are referencing an already created entity instance + raise ValueError(( + "Entity instance '%s' for navigation property already exists (id '%s') and hence " + "cannot be used to create a new instance. To assign an existing instance use " + "the ID as value for property '%s'." + ) % (prop_name, value.__odata__.id, prop.foreign_key)) + # keys = {} + # print(value.__odata__.id) + # for key in value.__odata__.primary_key_properties: + # keys[key[0]] = value.__odata__[value.__odata__.primary_key_properties[0][0]] + # insert_data[prop.name] = keys else: - if value.__odata__.id: - keys = {} - for key in value.__odata__.primary_key_properties: - keys[key[0]] = value.__odata__[value.__odata__.primary_key_properties[0][0]] - insert_data[prop.name] = keys - else: - insert_data[prop.name] = self._clean_new_entity(value) + insert_data[prop.name] = self._clean_new_entity(value) return insert_data diff --git a/odata/test.py b/odata/test.py new file mode 100644 index 0000000..e69de29