Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
344 changes: 239 additions & 105 deletions composer.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion src/Database/Adapter.php
Original file line number Diff line number Diff line change
Expand Up @@ -749,7 +749,7 @@ abstract public function updateDocuments(Document $collection, Document $updates
* @param array<Change> $changes
* @return array<Document>
*/
abstract public function createOrUpdateDocuments(
abstract public function upsertDocuments(
Document $collection,
string $attribute,
array $changes
Expand Down
2 changes: 1 addition & 1 deletion src/Database/Adapter/Pool.php
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ public function updateDocuments(Document $collection, Document $updates, array $
return $this->delegate(__FUNCTION__, \func_get_args());
}

public function createOrUpdateDocuments(Document $collection, string $attribute, array $changes): array
public function upsertDocuments(Document $collection, string $attribute, array $changes): array
{
return $this->delegate(__FUNCTION__, \func_get_args());
}
Expand Down
2 changes: 1 addition & 1 deletion src/Database/Adapter/SQL.php
Original file line number Diff line number Diff line change
Expand Up @@ -2090,7 +2090,7 @@ public function createDocuments(Document $collection, array $documents): array
* @return array<Document>
* @throws DatabaseException
*/
public function createOrUpdateDocuments(
public function upsertDocuments(
Document $collection,
string $attribute,
array $changes
Expand Down
108 changes: 79 additions & 29 deletions src/Database/Database.php
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ class Database
public const VAR_LINESTRING = 'linestring';
public const VAR_POLYGON = 'polygon';

public const SPATIAL_TYPES = [self::VAR_POINT,self::VAR_LINESTRING, self::VAR_POLYGON];
public const SPATIAL_TYPES = [self::VAR_POINT, self::VAR_LINESTRING, self::VAR_POLYGON];

// Index Types
public const INDEX_KEY = 'key';
Expand Down Expand Up @@ -3826,7 +3826,8 @@ public function createDocument(string $collection, Document $document): Document
* @param string $collection
* @param array<Document> $documents
* @param int $batchSize
* @param callable|null $onNext
* @param (callable(Document): void)|null $onNext
* @param (callable(Throwable): void)|null $onError
* @return int
* @throws AuthorizationException
* @throws StructureException
Expand All @@ -3838,6 +3839,7 @@ public function createDocuments(
array $documents,
int $batchSize = self::INSERT_BATCH_SIZE,
?callable $onNext = null,
?callable $onError = null,
): int {
if (!$this->adapter->getSharedTables() && $this->adapter->getTenantPerDocument()) {
throw new DatabaseException('Shared tables must be enabled if tenant per document is enabled.');
Expand Down Expand Up @@ -3914,7 +3916,13 @@ public function createDocuments(

$document = $this->casting($collection, $document);
$document = $this->decode($collection, $document);
$onNext && $onNext($document);

try {
$onNext && $onNext($document);
} catch (\Throwable $e) {
$onError ? $onError($e) : throw $e;
}

$modified++;
}
}
Expand Down Expand Up @@ -4283,7 +4291,7 @@ public function updateDocument(string $collection, string $id, Document $documen

if ($document->offsetExists('$permissions')) {
$originalPermissions = $old->getPermissions();
$currentPermissions = $document->getPermissions();
$currentPermissions = $document->getPermissions();

sort($originalPermissions);
sort($currentPermissions);
Expand Down Expand Up @@ -4473,8 +4481,8 @@ public function updateDocument(string $collection, string $id, Document $documen
* @param Document $updates
* @param array<Query> $queries
* @param int $batchSize
* @param callable|null $onNext
* @param callable|null $onError
* @param (callable(Document $updated, Document $old): void)|null $onNext
* @param (callable(Throwable): void)|null $onError
* @return int
* @throws AuthorizationException
* @throws ConflictException
Expand Down Expand Up @@ -4597,12 +4605,12 @@ public function updateDocuments(
break;
}

$currentPermissions = $updates->getPermissions();
$old = array_map(fn ($doc) => clone $doc, $batch);
$currentPermissions = $updates->getPermissions();
sort($currentPermissions);

$this->withTransaction(function () use ($collection, $updates, &$batch, $currentPermissions) {
foreach ($batch as $index => $document) {

$skipPermissionsUpdate = true;

if ($updates->offsetExists('$permissions')) {
Expand Down Expand Up @@ -4647,13 +4655,12 @@ public function updateDocuments(
);
});

foreach ($batch as $doc) {
foreach ($batch as $index => $doc) {
$doc->removeAttribute('$skipPermissionsUpdate');

$this->purgeCachedDocument($collection->getId(), $doc->getId());
$doc = $this->decode($collection, $doc);
try {
$onNext && $onNext($doc);
$onNext && $onNext($doc, $old[$index]);
} catch (Throwable $th) {
$onError ? $onError($th) : throw $th;
}
Expand Down Expand Up @@ -5069,28 +5076,62 @@ private function getJunctionCollection(Document $collection, Document $relatedCo
: '_' . $relatedCollection->getSequence() . '_' . $collection->getSequence();
}

/**
* Create or update a document.
*
* @param string $collection
* @param Document $document
* @return Document
* @throws StructureException
* @throws Throwable
*/
public function upsertDocument(
string $collection,
Document $document,
): Document {
$result = null;

$this->upsertDocumentsWithIncrease(
$collection,
'',
[$document],
function (Document $doc, ?Document $_old = null) use (&$result) {
$result = $doc;
}
);

if ($result === null) {
// No-op (unchanged): return the current persisted doc
$result = $this->getDocument($collection, $document->getId());
}
return $result;
}

/**
* Create or update documents.
*
* @param string $collection
* @param array<Document> $documents
* @param int $batchSize
* @param callable|null $onNext
* @param (callable(Document, ?Document): void)|null $onNext
* @param (callable(Throwable): void)|null $onError
* @return int
* @throws StructureException
* @throws \Throwable
*/
public function createOrUpdateDocuments(
public function upsertDocuments(
string $collection,
array $documents,
int $batchSize = self::INSERT_BATCH_SIZE,
?callable $onNext = null,
?callable $onError = null
): int {
return $this->createOrUpdateDocumentsWithIncrease(
return $this->upsertDocumentsWithIncrease(
$collection,
'',
$documents,
$onNext,
$onError,
$batchSize
);
}
Expand All @@ -5101,18 +5142,20 @@ public function createOrUpdateDocuments(
* @param string $collection
* @param string $attribute
* @param array<Document> $documents
* @param callable|null $onNext
* @param (callable(Document, ?Document): void)|null $onNext
* @param (callable(Throwable): void)|null $onError
* @param int $batchSize
* @return int
* @throws StructureException
* @throws \Throwable
* @throws Exception
*/
public function createOrUpdateDocumentsWithIncrease(
public function upsertDocumentsWithIncrease(
string $collection,
string $attribute,
array $documents,
?callable $onNext = null,
?callable $onError = null,
int $batchSize = self::INSERT_BATCH_SIZE
): int {
if (empty($documents)) {
Expand Down Expand Up @@ -5144,7 +5187,7 @@ public function createOrUpdateDocumentsWithIncrease(

if ($document->offsetExists('$permissions')) {
$originalPermissions = $old->getPermissions();
$currentPermissions = $document->getPermissions();
$currentPermissions = $document->getPermissions();

sort($originalPermissions);
sort($currentPermissions);
Expand Down Expand Up @@ -5274,7 +5317,7 @@ public function createOrUpdateDocumentsWithIncrease(
/**
* @var array<Change> $chunk
*/
$batch = $this->withTransaction(fn () => Authorization::skip(fn () => $this->adapter->createOrUpdateDocuments(
$batch = $this->withTransaction(fn () => Authorization::skip(fn () => $this->adapter->upsertDocuments(
$collection,
$attribute,
$chunk
Expand All @@ -5290,7 +5333,7 @@ public function createOrUpdateDocumentsWithIncrease(
}
}

foreach ($batch as $doc) {
foreach ($batch as $index => $doc) {
if ($this->resolveRelationships) {
$doc = $this->silent(fn () => $this->populateDocumentRelationships($collection, $doc));
}
Expand All @@ -5305,7 +5348,13 @@ public function createOrUpdateDocumentsWithIncrease(
$this->purgeCachedDocument($collection->getId(), $doc->getId());
}

$onNext && $onNext($doc);
$old = $chunk[$index]->getOld();

try {
$onNext && $onNext($doc, $old->isEmpty() ? null : $old);
} catch (\Throwable $th) {
$onError ? $onError($th) : throw $th;
}
}
}

Expand Down Expand Up @@ -5972,8 +6021,8 @@ private function deleteCascade(Document $collection, Document $relatedCollection
* @param string $collection
* @param array<Query> $queries
* @param int $batchSize
* @param callable|null $onNext
* @param callable|null $onError
* @param (callable(Document, Document): void)|null $onNext
* @param (callable(Throwable): void)|null $onError
* @return int
* @throws AuthorizationException
* @throws DatabaseException
Expand Down Expand Up @@ -6065,6 +6114,7 @@ public function deleteDocuments(
break;
}

$old = array_map(fn ($doc) => clone $doc, $batch);
$sequences = [];
$permissionIds = [];

Expand Down Expand Up @@ -6101,7 +6151,7 @@ public function deleteDocuments(
);
});

foreach ($batch as $document) {
foreach ($batch as $index => $document) {
if ($this->getSharedTables() && $this->getTenantPerDocument()) {
$this->withTenant($document->getTenant(), function () use ($collection, $document) {
$this->purgeCachedDocument($collection->getId(), $document->getId());
Expand All @@ -6110,7 +6160,7 @@ public function deleteDocuments(
$this->purgeCachedDocument($collection->getId(), $document->getId());
}
try {
$onNext && $onNext($document);
$onNext && $onNext($document, $old[$index]);
} catch (Throwable $th) {
$onError ? $onError($th) : throw $th;
}
Expand Down Expand Up @@ -6520,7 +6570,7 @@ public static function addFilter(string $name, callable $encode, callable $decod
public function encode(Document $collection, Document $document): Document
{
$attributes = $collection->getAttribute('attributes', []);
$internalDateAttributes = ['$createdAt','$updatedAt'];
$internalDateAttributes = ['$createdAt', '$updatedAt'];
foreach ($this->getInternalAttributes() as $attribute) {
$attributes[] = $attribute;
}
Expand Down Expand Up @@ -6937,7 +6987,7 @@ public static function convertQuery(Document $collection, Query $query): Query
}
}

if (! $attribute->isEmpty()) {
if (!$attribute->isEmpty()) {
$query->setOnArray($attribute->getAttribute('array', false));

if ($attribute->getAttribute('type') == Database::VAR_DATETIME) {
Expand Down Expand Up @@ -7195,7 +7245,7 @@ public function decodeSpatialData(string $wkt): array
// POINT(x y)
if (str_starts_with($upper, 'POINT(')) {
$start = strpos($wkt, '(') + 1;
$end = strrpos($wkt, ')');
$end = strrpos($wkt, ')');
$inside = substr($wkt, $start, $end - $start);

$coords = explode(' ', trim($inside));
Expand All @@ -7205,7 +7255,7 @@ public function decodeSpatialData(string $wkt): array
// LINESTRING(x1 y1, x2 y2, ...)
if (str_starts_with($upper, 'LINESTRING(')) {
$start = strpos($wkt, '(') + 1;
$end = strrpos($wkt, ')');
$end = strrpos($wkt, ')');
$inside = substr($wkt, $start, $end - $start);

$points = explode(',', $inside);
Expand All @@ -7218,7 +7268,7 @@ public function decodeSpatialData(string $wkt): array
// POLYGON((x1,y1),(x2,y2))
if (str_starts_with($upper, 'POLYGON((')) {
$start = strpos($wkt, '((') + 2;
$end = strrpos($wkt, '))');
$end = strrpos($wkt, '))');
$inside = substr($wkt, $start, $end - $start);

$rings = explode('),(', $inside);
Expand Down
Loading