4 changes: 4 additions & 0 deletions docs/sphinx/api_helpers.rst
@@ -17,6 +17,10 @@ Bulk
----
.. autofunction:: bulk

Dense vector packing
--------------------
.. autofunction:: pack_dense_vector
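
For example, a vector can be packed into a base64 string right before it is
indexed (a sketch; the ``emb`` field name is illustrative and is assumed to be
mapped as ``dense_vector``)::

    import numpy as np

    from elasticsearch.helpers import pack_dense_vector

    # works for plain lists of floats as well as float32 numpy arrays
    doc = {"emb": pack_dense_vector(np.array([0.1, 0.2, 0.3], dtype=np.float32))}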

Scan
----
.. autofunction:: scan
2 changes: 2 additions & 0 deletions elasticsearch/helpers/__init__.py
@@ -23,6 +23,7 @@
BULK_FLUSH,
bulk,
expand_action,
pack_dense_vector,
parallel_bulk,
reindex,
scan,
@@ -37,6 +38,7 @@
"expand_action",
"streaming_bulk",
"bulk",
"pack_dense_vector",
"parallel_bulk",
"scan",
"reindex",
21 changes: 21 additions & 0 deletions elasticsearch/helpers/actions.py
@@ -15,12 +15,14 @@
# specific language governing permissions and limitations
# under the License.

import base64
import logging
import queue
import time
from enum import Enum
from operator import methodcaller
from typing import (
TYPE_CHECKING,
Any,
Callable,
Collection,
@@ -31,6 +33,7 @@
Mapping,
MutableMapping,
Optional,
Sequence,
Tuple,
Union,
)
@@ -43,6 +46,9 @@
from ..serializer import Serializer
from .errors import BulkIndexError, ScanError

if TYPE_CHECKING:
import numpy as np

logger = logging.getLogger("elasticsearch.helpers")


@@ -708,6 +714,21 @@ def _setup_queues(self) -> None:
pool.join()


def pack_dense_vector(vector: Union["np.ndarray", Sequence[float]]) -> str:
"""Helper function that packs a dense vector for efficient uploading.

:arg v: the list or numpy array to pack.
[Review thread on lines +717 to +720]
Member: I like the name, because it means we can support more than numpy in the future if we want to.
Contributor Author: Even now, you can pass a list of floats and it will handle it too (I'm converting the list to numpy inside the function though).
"""
    import numpy as np

    if type(vector) is not np.ndarray:
        # plain sequences of floats are converted to a float32 numpy array
        vector = np.array(vector, dtype=np.float32)
    elif vector.dtype != np.float32:
        raise ValueError("Only arrays of type float32 can be packed")
    # swap the floats out of native byte order and base64-encode the raw bytes
    byte_array = vector.byteswap().tobytes()
    return base64.b64encode(byte_array).decode()


def scan(
client: Elasticsearch,
query: Optional[Any] = None,
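A minimal usage sketch for the new helper (the client URL, index name, and field name below are illustrative assumptions, not part of this change): both a plain list of floats and a float32 numpy array can be packed, and the resulting base64 string is what gets sent in the bulk request.

import numpy as np

from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk, pack_dense_vector

client = Elasticsearch("http://localhost:9200")  # assumed local cluster

embeddings = [
    [0.1, 0.2, 0.3],                              # plain list of floats
    np.array([0.4, 0.5, 0.6], dtype=np.float32),  # float32 numpy array
]

# "vectors-demo" and its "emb" dense_vector field are hypothetical; the index
# is assumed to already exist with a matching mapping
actions = (
    {"_index": "vectors-demo", "_id": i, "_source": {"emb": pack_dense_vector(v)}}
    for i, v in enumerate(embeddings)
)
bulk(client, actions)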
4 changes: 4 additions & 0 deletions examples/quotes/backend/quotes.py
@@ -12,6 +12,7 @@
from elasticsearch import NotFoundError, OrjsonSerializer
from elasticsearch.dsl.pydantic import AsyncBaseESModel
from elasticsearch import dsl
from elasticsearch.helpers import pack_dense_vector

model = SentenceTransformer("all-MiniLM-L6-v2")
dsl.async_connections.create_connection(hosts=[os.environ['ELASTICSEARCH_URL']], serializer=OrjsonSerializer())
@@ -33,6 +34,9 @@ class Config:
class Index:
name = 'quotes'

def clean(self):
# pack the embedding for efficient uploading
self.embedding = pack_dense_vector(self.embedding)

class Tag(BaseModel):
tag: str
@@ -59,6 +59,7 @@
from elasticsearch.dsl.query import Match
from elasticsearch.dsl.types import MatchQuery
from elasticsearch.dsl.utils import AttrList
from elasticsearch.helpers import pack_dense_vector
from elasticsearch.helpers.errors import BulkIndexError

snowball = analyzer("my_snow", tokenizer="standard", filter=["lowercase", "snowball"])
@@ -868,10 +869,19 @@ class Doc(AsyncDocument):
byte_vector: List[int] = mapped_field(DenseVector(element_type="byte"))
bit_vector: List[int] = mapped_field(DenseVector(element_type="bit"))
numpy_float_vector: np.ndarray = mapped_field(NumpyDenseVector())
packed_float_vector: List[float] = mapped_field(DenseVector())
packed_numpy_float_vector: np.ndarray = mapped_field(NumpyDenseVector())

class Index:
name = "vectors"

def clean(self):
# pack the dense vectors before they are sent to Elasticsearch
self.packed_float_vector = pack_dense_vector(self.packed_float_vector)
self.packed_numpy_float_vector = pack_dense_vector(
self.packed_numpy_float_vector
)

await Doc._index.delete(ignore_unavailable=True)
await Doc.init()

@@ -884,6 +894,8 @@ class Index:
byte_vector=test_byte_vector,
bit_vector=test_bit_vector,
numpy_float_vector=np.array(test_float_vector),
packed_float_vector=test_float_vector,
packed_numpy_float_vector=np.array(test_float_vector, dtype=np.float32),
)
await doc.save(refresh=True)

@@ -894,6 +906,9 @@ class Index:
assert docs[0].bit_vector == test_bit_vector
assert type(docs[0].numpy_float_vector) is np.ndarray
assert [round(v, 1) for v in docs[0].numpy_float_vector] == test_float_vector
assert [round(v, 1) for v in docs[0].packed_float_vector] == test_float_vector
assert type(docs[0].packed_numpy_float_vector) is np.ndarray
assert [round(v, 1) for v in docs[0].packed_numpy_float_vector] == test_float_vector


@pytest.mark.anyio
@@ -59,6 +59,7 @@
from elasticsearch.dsl.query import Match
from elasticsearch.dsl.types import MatchQuery
from elasticsearch.dsl.utils import AttrList
from elasticsearch.helpers import pack_dense_vector
from elasticsearch.helpers.errors import BulkIndexError

snowball = analyzer("my_snow", tokenizer="standard", filter=["lowercase", "snowball"])
@@ -856,10 +857,19 @@ class Doc(Document):
byte_vector: List[int] = mapped_field(DenseVector(element_type="byte"))
bit_vector: List[int] = mapped_field(DenseVector(element_type="bit"))
numpy_float_vector: np.ndarray = mapped_field(NumpyDenseVector())
packed_float_vector: List[float] = mapped_field(DenseVector())
packed_numpy_float_vector: np.ndarray = mapped_field(NumpyDenseVector())

class Index:
name = "vectors"

def clean(self):
# pack the dense vectors before they are sent to Elasticsearch
self.packed_float_vector = pack_dense_vector(self.packed_float_vector)
self.packed_numpy_float_vector = pack_dense_vector(
self.packed_numpy_float_vector
)

Doc._index.delete(ignore_unavailable=True)
Doc.init()

@@ -872,6 +882,8 @@ class Index:
byte_vector=test_byte_vector,
bit_vector=test_bit_vector,
numpy_float_vector=np.array(test_float_vector),
packed_float_vector=test_float_vector,
packed_numpy_float_vector=np.array(test_float_vector, dtype=np.float32),
)
doc.save(refresh=True)

@@ -882,6 +894,9 @@ class Index:
assert docs[0].bit_vector == test_bit_vector
assert type(docs[0].numpy_float_vector) is np.ndarray
assert [round(v, 1) for v in docs[0].numpy_float_vector] == test_float_vector
assert [round(v, 1) for v in docs[0].packed_float_vector] == test_float_vector
assert type(docs[0].packed_numpy_float_vector) is np.ndarray
assert [round(v, 1) for v in docs[0].packed_numpy_float_vector] == test_float_vector


@pytest.mark.sync
166 changes: 166 additions & 0 deletions utils/dense-vector-benchmark.py
@@ -0,0 +1,166 @@
# Licensed to Elasticsearch B.V. under one or more contributor
# license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright
# ownership. Elasticsearch B.V. licenses this file to you under
# the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

import argparse
import asyncio
import json
import os
import time

import numpy as np

from elasticsearch import OrjsonSerializer
from elasticsearch.dsl import AsyncDocument, NumpyDenseVector, async_connections
from elasticsearch.dsl.types import DenseVectorIndexOptions
from elasticsearch.helpers import async_bulk, pack_dense_vector

async_connections.create_connection(
hosts=[os.environ["ELASTICSEARCH_URL"]], serializer=OrjsonSerializer()
)


class Doc(AsyncDocument):
title: str
text: str
emb: np.ndarray = NumpyDenseVector(
dtype=np.float32, index_options=DenseVectorIndexOptions(type="flat")
)

class Index:
name = "benchmark"


async def upload(
data_file: str, chunk_size: int, repetitions: int, pack: bool
) -> tuple[float, float]:
with open(data_file, "rt") as f:
        # read the data file, which comes in ndjson format, and convert it to JSON
json_data = "[" + f.read().strip().replace("\n", ",") + "]"
dataset = json.loads(json_data)

# replace the embedding lists with numpy arrays for performance
dataset = [
{
"docid": doc["docid"],
"title": doc["title"],
"text": doc["text"],
"emb": np.array(doc["emb"], dtype=np.float32),
}
for doc in dataset
]

# create mapping and index
if await Doc._index.exists():
await Doc._index.delete()
await Doc.init()
await Doc._index.refresh()

async def get_next_document():
for i in range(repetitions):
for doc in dataset:
yield {
"_index": "benchmark",
"_id": doc["docid"] + "_" + str(i),
"_source": {
"title": doc["title"],
"text": doc["text"],
"emb": doc["emb"],
},
}

async def get_next_document_packed():
for i in range(repetitions):
for doc in dataset:
yield {
"_index": "benchmark",
"_id": doc["docid"] + "_" + str(i),
"_source": {
"title": doc["title"],
"text": doc["text"],
"emb": pack_dense_vector(doc["emb"]),
},
}

start = time.time()
result = await async_bulk(
client=async_connections.get_connection(),
chunk_size=chunk_size,
actions=get_next_document_packed() if pack else get_next_document(),
stats_only=True,
)
duration = time.time() - start
assert result[1] == 0
return result[0], duration


async def main():
parser = argparse.ArgumentParser()
parser.add_argument("data_file", metavar="JSON_DATA_FILE")
parser.add_argument(
"--chunk-sizes",
"-s",
type=int,
nargs="+",
help="Chunk size(s) for bulk uploader",
)
parser.add_argument(
"--repetitions",
"-r",
type=int,
default=1,
help="Number of times the dataset is repeated (default: 1)",
)
parser.add_argument(
"--runs",
type=int,
default=3,
help="Number of runs that are averaged for each chunk size (default: 3)",
)
args = parser.parse_args()

for chunk_size in args.chunk_sizes:
print(f"Uploading '{args.data_file}' with chunk size {chunk_size}...")
runs = []
packed_runs = []
for _ in range(args.runs):
runs.append(
await upload(args.data_file, chunk_size, args.repetitions, False)
)
packed_runs.append(
await upload(args.data_file, chunk_size, args.repetitions, True)
)

# ensure that all runs uploaded the same number of documents
size = runs[0][0]
for run in runs:
assert run[0] == size
for run in packed_runs:
assert run[0] == size

dur = sum([run[1] for run in runs]) / len(runs)
packed_dur = sum([run[1] for run in packed_runs]) / len(packed_runs)

print(f"Size: {size}")
print(f"float duration: {dur:.02f}s / {size / dur:.02f} docs/s")
print(
f"float base64 duration: {packed_dur:.02f}s / {size / packed_dur:.02f} docs/s"
)
print(f"Speed up: {dur / packed_dur:.02f}x")


if __name__ == "__main__":
asyncio.run(main())
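
A typical invocation might look like this (the dataset path is a placeholder; any
ndjson file whose records carry docid, title, text and emb fields should work):

    ELASTICSEARCH_URL=http://localhost:9200 python utils/dense-vector-benchmark.py \
        data.ndjson --chunk-sizes 500 1000 --repetitions 2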