Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions dotnet/Directory.Packages.props
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@
<PackageVersion Include="Milvus.Client" Version="2.3.0-preview.1" />
<PackageVersion Include="Testcontainers.Milvus" Version="3.8.0" />
<PackageVersion Include="Microsoft.Data.SqlClient" Version="5.2.0" />
<PackageVersion Include="Qdrant.Client" Version="1.9.0" />
<!-- Symbols -->
<PackageVersion Include="Microsoft.SourceLink.GitHub" Version="8.0.0" />
<!-- Toolset -->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ namespace Microsoft.SemanticKernel.Connectors.AzureAISearch;
public sealed class AzureAISearchVectorRecordStore<TRecord> : IVectorRecordStore<string, TRecord>
where TRecord : class
{
/// <summary>The name of this database for telemetry purposes.</summary>
private const string DatabaseName = "AzureAISearch";

/// <summary>A set of types that a key on the provided model may have.</summary>
private static readonly HashSet<Type> s_supportedKeyTypes =
[
Expand Down Expand Up @@ -100,7 +103,7 @@ public AzureAISearchVectorRecordStore(SearchIndexClient searchIndexClient, Azure
}

/// <inheritdoc />
public Task<TRecord> GetAsync(string key, GetRecordOptions? options = default, CancellationToken cancellationToken = default)
public Task<TRecord?> GetAsync(string key, GetRecordOptions? options = default, CancellationToken cancellationToken = default)
{
Verify.NotNullOrWhiteSpace(key);

Expand All @@ -126,7 +129,13 @@ public async IAsyncEnumerable<TRecord> GetBatchAsync(IEnumerable<string> keys, G
var searchClient = this.GetSearchClient(collectionName);
var tasks = keys.Select(key => this.GetDocumentAndMapToDataModelAsync(searchClient, collectionName, key, innerOptions, cancellationToken));
var results = await Task.WhenAll(tasks).ConfigureAwait(false);
foreach (var result in results) { yield return result; }
foreach (var result in results)
{
if (result is not null)
{
yield return result;
}
}
}

/// <inheritdoc />
Expand Down Expand Up @@ -203,32 +212,40 @@ public async IAsyncEnumerable<string> UpsertBatchAsync(IEnumerable<TRecord> reco
/// <param name="innerOptions">The azure ai search sdk options for getting a document.</param>
/// <param name="cancellationToken">The <see cref="CancellationToken"/> to monitor for cancellation requests. The default is <see cref="CancellationToken.None"/>.</param>
/// <returns>The retrieved document, mapped to the consumer data model.</returns>
// NOTE(review): this span comes from a rendered diff, so superseded and replacement lines sit back to back
// (e.g. the two signatures directly below) — confirm against the real file which lines are live.
private async Task<TRecord> GetDocumentAndMapToDataModelAsync(
private async Task<TRecord?> GetDocumentAndMapToDataModelAsync(
SearchClient searchClient,
string collectionName,
string key,
GetDocumentOptions innerOptions,
CancellationToken cancellationToken)
{
// Operation name handed to RunOperationAsync and VectorStoreErrorHandler.RunModelConversion below.
const string OperationName = "GetDocument";

// Use the user provided mapper: fetch the document as a JsonObject, then convert it with the custom mapper.
if (this._options.MapperType == AzureAISearchRecordMapperType.JsonObjectCustomMapper)
{
var jsonObject = await RunOperationAsync(
collectionName,
"GetDocument",
() => searchClient.GetDocumentAsync<JsonObject>(key, innerOptions, cancellationToken)).ConfigureAwait(false);
OperationName,
() => GetDocumentWithNotFoundHandlingAsync<JsonObject>(searchClient, key, innerOptions, cancellationToken)).ConfigureAwait(false);

// GetDocumentWithNotFoundHandlingAsync returns default (null for JsonObject) when the request fails with status 404,
// so there is nothing to map and null is passed back to the caller.
if (jsonObject is null)
{
return null;
}

// Run the custom mapper through the conversion wrapper, passing the database, collection and operation names along.
return RunModelConversion(
return VectorStoreErrorHandler.RunModelConversion(
DatabaseName,
collectionName,
"GetDocument",
OperationName,
() => this._options.JsonObjectCustomMapper!.MapFromStorageToDataModel(jsonObject));
}

// Use the built in Azure AI Search mapper: the document is requested directly as TRecord, so no separate conversion step runs.
return await RunOperationAsync(
collectionName,
"GetDocument",
() => searchClient.GetDocumentAsync<TRecord>(key, innerOptions, cancellationToken)).ConfigureAwait(false);
OperationName,
() => GetDocumentWithNotFoundHandlingAsync<TRecord>(searchClient, key, innerOptions, cancellationToken)).ConfigureAwait(false);
}

/// <summary>
Expand All @@ -247,24 +264,27 @@ private Task<Response<IndexDocumentsResult>> MapToStorageModelAndUploadDocumentA
IndexDocumentsOptions innerOptions,
CancellationToken cancellationToken)
{
const string OperationName = "UploadDocuments";

// Use the user provided mapper.
if (this._options.MapperType == AzureAISearchRecordMapperType.JsonObjectCustomMapper)
{
var jsonObjects = RunModelConversion(
var jsonObjects = VectorStoreErrorHandler.RunModelConversion(
DatabaseName,
collectionName,
"UploadDocuments",
OperationName,
() => records.Select(this._options.JsonObjectCustomMapper!.MapFromDataToStorageModel));

return RunOperationAsync(
collectionName,
"UploadDocuments",
OperationName,
() => searchClient.UploadDocumentsAsync<JsonObject>(jsonObjects, innerOptions, cancellationToken));
}

// Use the built in Azure AI Search mapper.
return RunOperationAsync(
collectionName,
"UploadDocuments",
OperationName,
() => searchClient.UploadDocumentsAsync<TRecord>(records, innerOptions, cancellationToken));
}

Expand Down Expand Up @@ -321,6 +341,31 @@ private GetDocumentOptions ConvertGetDocumentOptions(GetRecordOptions? options)
return innerOptions;
}

/// <summary>
/// Fetch the document stored under the given key, yielding the default value of <typeparamref name="T"/>
/// instead of throwing when the request fails with a 404 status.
/// </summary>
/// <typeparam name="T">The type that the document is deserialized into.</typeparam>
/// <param name="searchClient">The search client used to fetch the document.</param>
/// <param name="key">The key of the document to fetch.</param>
/// <param name="innerOptions">The Azure AI Search SDK options to apply when fetching the document.</param>
/// <param name="cancellationToken">The <see cref="CancellationToken"/> to monitor for cancellation requests.</param>
/// <returns>The retrieved document, or the default value of <typeparamref name="T"/> if it was not found.</returns>
private static async Task<T?> GetDocumentWithNotFoundHandlingAsync<T>(
    SearchClient searchClient,
    string key,
    GetDocumentOptions innerOptions,
    CancellationToken cancellationToken)
{
    // Status carried by the RequestFailedException that is treated as "no such document".
    const int NotFoundStatus = 404;

    try
    {
        T document = await searchClient.GetDocumentAsync<T>(key, innerOptions, cancellationToken).ConfigureAwait(false);
        return document;
    }
    catch (RequestFailedException notFound) when (notFound.Status == NotFoundStatus)
    {
        // Any other failure status does not match the filter and propagates to the caller unchanged.
        return default;
    }
}

/// <summary>
/// Run the given operation and wrap any <see cref="RequestFailedException"/> with <see cref="VectorStoreOperationException"/>.
/// </summary>
Expand All @@ -341,7 +386,7 @@ private static async Task<T> RunOperationAsync<T>(string collectionName, string

// Using Open Telemetry standard for naming of these entries.
// https://opentelemetry.io/docs/specs/semconv/attributes-registry/db/
wrapperException.Data.Add("db.system", "AzureAISearch");
wrapperException.Data.Add("db.system", DatabaseName);
Comment thread
westey-m marked this conversation as resolved.
wrapperException.Data.Add("db.collection.name", collectionName);
wrapperException.Data.Add("db.operation.name", operationName);

Expand All @@ -353,35 +398,7 @@ private static async Task<T> RunOperationAsync<T>(string collectionName, string

// Using Open Telemetry standard for naming of these entries.
// https://opentelemetry.io/docs/specs/semconv/attributes-registry/db/
wrapperException.Data.Add("db.system", "AzureAISearch");
wrapperException.Data.Add("db.collection.name", collectionName);
wrapperException.Data.Add("db.operation.name", operationName);

throw wrapperException;
}
}

/// <summary>
/// Run the given model conversion and wrap any exceptions with <see cref="VectorStoreRecordMappingException"/>.
/// </summary>
/// <typeparam name="T">The response type of the operation.</typeparam>
/// <param name="collectionName">The name of the collection the operation is being run on.</param>
/// <param name="operationName">The type of database operation being run.</param>
/// <param name="operation">The operation to run.</param>
/// <returns>The result of the operation.</returns>
private static T RunModelConversion<T>(string collectionName, string operationName, Func<T> operation)
{
try
{
return operation.Invoke();
}
catch (Exception ex)
{
var wrapperException = new VectorStoreRecordMappingException("Failed to convert vector store record.", ex);

// Using Open Telemetry standard for naming of these entries.
// https://opentelemetry.io/docs/specs/semconv/attributes-registry/db/
wrapperException.Data.Add("db.system", "AzureAISearch");
wrapperException.Data.Add("db.system", DatabaseName);
wrapperException.Data.Add("db.collection.name", collectionName);
wrapperException.Data.Add("db.operation.name", operationName);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

<ItemGroup>
<PackageReference Include="System.Text.Json" />
<PackageReference Include="Qdrant.Client" />
</ItemGroup>

<ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
// Copyright (c) Microsoft. All rights reserved.

using Qdrant.Client.Grpc;

namespace Microsoft.SemanticKernel.Connectors.Qdrant;

/// <summary>
/// The kinds of mapper that <see cref="QdrantVectorRecordStore{TRecord}"/> supports.
/// </summary>
public enum QdrantRecordMapperType
{
    /// <summary>
    /// The default mapper provided by the semantic kernel SDK. It uses json as an intermediary, which allows automatic mapping to a wide variety of types.
    /// </summary>
    Default = 0,

    /// <summary>
    /// A custom mapper that converts between <see cref="PointStruct"/> and the data model.
    /// </summary>
    QdrantPointStructCustomMapper = 1,
}
Loading