Elasticsearch
Elasticsearch is a distributed, RESTful search and analytics engine, capable of performing both vector and lexical search. It is built on top of the Apache Lucene library.
This notebook shows how to use functionality related to the Elasticsearch
database.
%pip install --upgrade --quiet langchain-elasticsearch langchain-openai tiktoken langchain
Running and connecting to Elasticsearchβ
There are two main ways to setup an Elasticsearch instance for use with:
- Elastic Cloud: Elastic Cloud is a managed Elasticsearch service. Signup for a free trial.
To connect to an Elasticsearch instance that does not require login credentials (starting the docker instance with security enabled), pass the Elasticsearch URL and index name along with the embedding object to the constructor.
- Local Install Elasticsearch: Get started with Elasticsearch by running it locally. The easiest way is to use the official Elasticsearch Docker image. See the Elasticsearch Docker documentation for more information.
Running Elasticsearch via Dockerβ
Example: Run a single-node Elasticsearch instance with security disabled. This is not recommended for production use.
docker run -p 9200:9200 -e "discovery.type=single-node" -e "xpack.security.enabled=false" -e "xpack.security.http.ssl.enabled=false" docker.elastic.co/elasticsearch/elasticsearch:8.12.1
Once the Elasticsearch instance is running, you can connect to it using the Elasticsearch URL and index name along with the embedding object to the constructor.
Example:
from langchain_elasticsearch import ElasticsearchStore
from langchain_openai import OpenAIEmbeddings
embedding = OpenAIEmbeddings()
elastic_vector_search = ElasticsearchStore(
es_url="http://localhost:9200",
index_name="test_index",
embedding=embedding
)
Authenticationβ
For production, we recommend you run with security enabled. To connect with login credentials, you can use the parameters es_api_key
or es_user
and es_password
.
Example:
from langchain_elasticsearch import ElasticsearchStore
from langchain_openai import OpenAIEmbeddings
embedding = OpenAIEmbeddings()
elastic_vector_search = ElasticsearchStore(
es_url="http://localhost:9200",
index_name="test_index",
embedding=embedding,
es_user="elastic",
es_password="changeme"
)
You can also use an Elasticsearch
client object that gives you more flexibility, for example to configure the maximum number of retries.
Example:
import elasticsearch
from langchain_elasticsearch import ElasticsearchStore
es_client= elasticsearch.Elasticsearch(
hosts=["http://localhost:9200"],
es_user="elastic",
es_password="changeme"
max_retries=10,
)
embedding = OpenAIEmbeddings()
elastic_vector_search = ElasticsearchStore(
index_name="test_index",
es_connection=es_client,
embedding=embedding,
)
How to obtain a password for the default "elastic" user?β
To obtain your Elastic Cloud password for the default "elastic" user:
- Log in to the Elastic Cloud console at https://cloud.elastic.co
- Go to "Security" > "Users"
- Locate the "elastic" user and click "Edit"
- Click "Reset password"
- Follow the prompts to reset the password
How to obtain an API key?β
To obtain an API key:
- Log in to the Elastic Cloud console at https://cloud.elastic.co
- Open Kibana and go to Stack Management > API Keys
- Click "Create API key"
- Enter a name for the API key and click "Create"
- Copy the API key and paste it into the
api_key
parameter
Elastic Cloudβ
To connect to an Elasticsearch instance on Elastic Cloud, you can use either the es_cloud_id
parameter or es_url
.
Example:
from langchain_elasticsearch import ElasticsearchStore
from langchain_openai import OpenAIEmbeddings
embedding = OpenAIEmbeddings()
elastic_vector_search = ElasticsearchStore(
es_cloud_id="<cloud_id>",
index_name="test_index",
embedding=embedding,
es_user="elastic",
es_password="changeme"
)
To use the OpenAIEmbeddings
we have to configure the OpenAI API Key in the environment.
import getpass
import os
os.environ["OPENAI_API_KEY"] = getpass.getpass("OpenAI API Key:")
Basic Exampleβ
This example we are going to load "state_of_the_union.txt" via the TextLoader, chunk the text into 500 word chunks, and then index each chunk into Elasticsearch.
Once the data is indexed, we perform a simple query to find the top 4 chunks that similar to the query "What did the president say about Ketanji Brown Jackson".
Elasticsearch is running locally on localhost:9200 with docker. For more details on how to connect to Elasticsearch from Elastic Cloud, see connecting with authentication above.
from langchain_elasticsearch import ElasticsearchStore
from langchain_openai import OpenAIEmbeddings
from langchain_community.document_loaders import TextLoader
from langchain_text_splitters import CharacterTextSplitter
loader = TextLoader("../../modules/state_of_the_union.txt")
documents = loader.load()
text_splitter = CharacterTextSplitter(chunk_size=500, chunk_overlap=0)
docs = text_splitter.split_documents(documents)
embeddings = OpenAIEmbeddings()
db = ElasticsearchStore.from_documents(
docs,
embeddings,
es_url="http://localhost:9200",
index_name="test-basic",
)
db.client.indices.refresh(index="test-basic")
query = "What did the president say about Ketanji Brown Jackson"
results = db.similarity_search(query)
print(results)
[Document(page_content='One of the most serious constitutional responsibilities a President has is nominating someone to serve on the United States Supreme Court. \n\nAnd I did that 4 days ago, when I nominated Circuit Court of Appeals Judge Ketanji Brown Jackson. One of our nationβs top legal minds, who will continue Justice Breyerβs legacy of excellence.', metadata={'source': '../../modules/state_of_the_union.txt'}), Document(page_content='As I said last year, especially to our younger transgender Americans, I will always have your back as your President, so you can be yourself and reach your God-given potential. \n\nWhile it often appears that we never agree, that isnβt true. I signed 80 bipartisan bills into law last year. From preventing government shutdowns to protecting Asian-Americans from still-too-common hate crimes to reforming military justice.', metadata={'source': '../../modules/state_of_the_union.txt'}), Document(page_content='A former top litigator in private practice. A former federal public defender. And from a family of public school educators and police officers. A consensus builder. Since sheβs been nominated, sheβs received a broad range of supportβfrom the Fraternal Order of Police to former judges appointed by Democrats and Republicans. \n\nAnd if we are to advance liberty and justice, we need to secure the Border and fix the immigration system.', metadata={'source': '../../modules/state_of_the_union.txt'}), Document(page_content='This is personal to me and Jill, to Kamala, and to so many of you. \n\nCancer is the #2 cause of death in Americaβsecond only to heart disease. \n\nLast month, I announced our plan to supercharge \nthe Cancer Moonshot that President Obama asked me to lead six years ago. \n\nOur goal is to cut the cancer death rate by at least 50% over the next 25 years, turn more cancers from death sentences into treatable diseases. \n\nMore support for patients and families.', metadata={'source': '../../modules/state_of_the_union.txt'})]
Metadata
ElasticsearchStore
supports metadata to stored along with the document. This metadata dict object is stored in a metadata object field in the Elasticsearch document. Based on the metadata value, Elasticsearch will automatically setup the mapping by infering the data type of the metadata value. For example, if the metadata value is a string, Elasticsearch will setup the mapping for the metadata object field as a string type.
# Adding metadata to documents
for i, doc in enumerate(docs):
doc.metadata["date"] = f"{range(2010, 2020)[i % 10]}-01-01"
doc.metadata["rating"] = range(1, 6)[i % 5]
doc.metadata["author"] = ["John Doe", "Jane Doe"][i % 2]
db = ElasticsearchStore.from_documents(
docs, embeddings, es_url="http://localhost:9200", index_name="test-metadata"
)
query = "What did the president say about Ketanji Brown Jackson"
docs = db.similarity_search(query)
print(docs[0].metadata)
{'source': '../../modules/state_of_the_union.txt', 'date': '2016-01-01', 'rating': 2, 'author': 'John Doe'}
Filtering Metadataβ
With metadata added to the documents, you can add metadata filtering at query time.
Example: Filter by Exact keywordβ
Notice: We are using the keyword subfield thats not analyzed
docs = db.similarity_search(
query, filter=[{"term": {"metadata.author.keyword": "John Doe"}}]
)
print(docs[0].metadata)
{'source': '../../modules/state_of_the_union.txt', 'date': '2016-01-01', 'rating': 2, 'author': 'John Doe'}
Example: Filter by Partial Matchβ
This example shows how to filter by partial match. This is useful when you don't know the exact value of the metadata field. For example, if you want to filter by the metadata field author
and you don't know the exact value of the author, you can use a partial match to filter by the author's last name. Fuzzy matching is also supported.
"Jon" matches on "John Doe" as "Jon" is a close match to "John" token.
docs = db.similarity_search(
query,
filter=[{"match": {"metadata.author": {"query": "Jon", "fuzziness": "AUTO"}}}],
)
print(docs[0].metadata)
{'source': '../../modules/state_of_the_union.txt', 'date': '2016-01-01', 'rating': 2, 'author': 'John Doe'}
Example: Filter by Date Rangeβ
docs = db.similarity_search(
"Any mention about Fred?",
filter=[{"range": {"metadata.date": {"gte": "2010-01-01"}}}],
)
print(docs[0].metadata)
{'source': '../../modules/state_of_the_union.txt', 'date': '2012-01-01', 'rating': 3, 'author': 'John Doe', 'geo_location': {'lat': 40.12, 'lon': -71.34}}
Example: Filter by Numeric Rangeβ
docs = db.similarity_search(
"Any mention about Fred?", filter=[{"range": {"metadata.rating": {"gte": 2}}}]
)
print(docs[0].metadata)
{'source': '../../modules/state_of_the_union.txt', 'date': '2012-01-01', 'rating': 3, 'author': 'John Doe', 'geo_location': {'lat': 40.12, 'lon': -71.34}}
Example: Filter by Geo Distanceβ
Requires an index with a geo_point mapping to be declared for metadata.geo_location
.
docs = db.similarity_search(
"Any mention about Fred?",
filter=[
{
"geo_distance": {
"distance": "200km",
"metadata.geo_location": {"lat": 40, "lon": -70},
}
}
],
)
print(docs[0].metadata)
Filter supports many more types of queries than above.
Read more about them in the documentation.
Distance Similarity Algorithm
Elasticsearch supports the following vector distance similarity algorithms:
- cosine
- euclidean
- dot_product
The cosine similarity algorithm is the default.
You can specify the similarity Algorithm needed via the similarity parameter.
NOTE Depending on the retrieval strategy, the similarity algorithm cannot be changed at query time. It is needed to be set when creating the index mapping for field. If you need to change the similarity algorithm, you need to delete the index and recreate it with the correct distance_strategy.
db = ElasticsearchStore.from_documents(
docs,
embeddings,
es_url="http://localhost:9200",
index_name="test",
distance_strategy="COSINE"
# distance_strategy="EUCLIDEAN_DISTANCE"
# distance_strategy="DOT_PRODUCT"
)
Retrieval Strategies
Elasticsearch has big advantages over other vector only databases from its ability to support a wide range of retrieval strategies. In this notebook we will configure ElasticsearchStore
to support some of the most common retrieval strategies.
By default, ElasticsearchStore
uses the ApproxRetrievalStrategy
.
ApproxRetrievalStrategyβ
This will return the top k
most similar vectors to the query vector. The k
parameter is set when the ElasticsearchStore
is initialized. The default value is 10
.
db = ElasticsearchStore.from_documents(
docs,
embeddings,
es_url="http://localhost:9200",
index_name="test",
strategy=ElasticsearchStore.ApproxRetrievalStrategy(),
)
docs = db.similarity_search(
query="What did the president say about Ketanji Brown Jackson?", k=10
)
Example: Approx with hybridβ
This example will show how to configure ElasticsearchStore
to perform a hybrid retrieval, using a combination of approximate semantic search and keyword based search.
We use RRF to balance the two scores from different retrieval methods.
To enable hybrid retrieval, we need to set hybrid=True
in ElasticsearchStore
ApproxRetrievalStrategy
constructor.
db = ElasticsearchStore.from_documents(
docs,
embeddings,
es_url="http://localhost:9200",
index_name="test",
strategy=ElasticsearchStore.ApproxRetrievalStrategy(
hybrid=True,
)
)
When hybrid
is enabled, the query performed will be a combination of approximate semantic search and keyword based search.
It will use rrf
(Reciprocal Rank Fusion) to balance the two scores from different retrieval methods.
Note RRF requires Elasticsearch 8.9.0 or above.
{
"knn": {
"field": "vector",
"filter": [],
"k": 1,
"num_candidates": 50,
"query_vector": [1.0, ..., 0.0],
},
"query": {
"bool": {
"filter": [],
"must": [{"match": {"text": {"query": "foo"}}}],
}
},
"rank": {"rrf": {}},
}
Example: Approx with Embedding Model in Elasticsearchβ
This example will show how to configure ElasticsearchStore
to use the embedding model deployed in Elasticsearch for approximate retrieval.
To use this, specify the model_id in ElasticsearchStore
ApproxRetrievalStrategy
constructor via the query_model_id
argument.
NOTE This requires the model to be deployed and running in Elasticsearch ml node. See notebook example on how to deploy the model with eland.
APPROX_SELF_DEPLOYED_INDEX_NAME = "test-approx-self-deployed"
# Note: This does not have an embedding function specified
# Instead, we will use the embedding model deployed in Elasticsearch
db = ElasticsearchStore(
es_cloud_id="<your cloud id>",
es_user="elastic",
es_password="<your password>",
index_name=APPROX_SELF_DEPLOYED_INDEX_NAME,
query_field="text_field",
vector_query_field="vector_query_field.predicted_value",
strategy=ElasticsearchStore.ApproxRetrievalStrategy(
query_model_id="sentence-transformers__all-minilm-l6-v2"
),
)
# Setup a Ingest Pipeline to perform the embedding
# of the text field
db.client.ingest.put_pipeline(
id="test_pipeline",
processors=[
{
"inference": {
"model_id": "sentence-transformers__all-minilm-l6-v2",
"field_map": {"query_field": "text_field"},
"target_field": "vector_query_field",
}
}
],
)
# creating a new index with the pipeline,
# not relying on langchain to create the index
db.client.indices.create(
index=APPROX_SELF_DEPLOYED_INDEX_NAME,
mappings={
"properties": {
"text_field": {"type": "text"},
"vector_query_field": {
"properties": {
"predicted_value": {
"type": "dense_vector",
"dims": 384,
"index": True,
"similarity": "l2_norm",
}
}
},
}
},
settings={"index": {"default_pipeline": "test_pipeline"}},
)
db.from_texts(
["hello world"],
es_cloud_id="<cloud id>",
es_user="elastic",
es_password="<cloud password>",
index_name=APPROX_SELF_DEPLOYED_INDEX_NAME,
query_field="text_field",
vector_query_field="vector_query_field.predicted_value",
strategy=ElasticsearchStore.ApproxRetrievalStrategy(
query_model_id="sentence-transformers__all-minilm-l6-v2"
),
)
# Perform search
db.similarity_search("hello world", k=10)
SparseVectorRetrievalStrategy (ELSER)β
This strategy uses Elasticsearch's sparse vector retrieval to retrieve the top-k results. We only support our own "ELSER" embedding model for now.
NOTE This requires the ELSER model to be deployed and running in Elasticsearch ml node.
To use this, specify SparseVectorRetrievalStrategy
in ElasticsearchStore
constructor.
# Note that this example doesn't have an embedding function. This is because we infer the tokens at index time and at query time within Elasticsearch.
# This requires the ELSER model to be loaded and running in Elasticsearch.
db = ElasticsearchStore.from_documents(
docs,
es_cloud_id="My_deployment:dXMtY2VudHJhbDEuZ2NwLmNsb3VkLmVzLmlvOjQ0MyQ2OGJhMjhmNDc1M2Y0MWVjYTk2NzI2ZWNkMmE5YzRkNyQ3NWI4ODRjNWQ2OTU0MTYzODFjOTkxNmQ1YzYxMGI1Mw==",
es_user="elastic",
es_password="GgUPiWKwEzgHIYdHdgPk1Lwi",
index_name="test-elser",
strategy=ElasticsearchStore.SparseVectorRetrievalStrategy(),
)
db.client.indices.refresh(index="test-elser")
results = db.similarity_search(
"What did the president say about Ketanji Brown Jackson", k=4
)
print(results[0])
page_content='One of the most serious constitutional responsibilities a President has is nominating someone to serve on the United States Supreme Court. \n\nAnd I did that 4 days ago, when I nominated Circuit Court of Appeals Judge Ketanji Brown Jackson. One of our nationβs top legal minds, who will continue Justice Breyerβs legacy of excellence.' metadata={'source': '../../modules/state_of_the_union.txt'}
ExactRetrievalStrategyβ
This strategy uses Elasticsearch's exact retrieval (also known as brute force) to retrieve the top-k results.
To use this, specify ExactRetrievalStrategy
in ElasticsearchStore
constructor.
db = ElasticsearchStore.from_documents(
docs,
embeddings,
es_url="http://localhost:9200",
index_name="test",
strategy=ElasticsearchStore.ExactRetrievalStrategy()
)
BM25RetrievalStrategyβ
This strategy allows the user to perform searches using pure BM25 without vector search.
To use this, specify BM25RetrievalStrategy
in ElasticsearchStore
constructor.
Note that in the example below, the embedding option is not specified, indicating that the search is conducted without using embeddings.
from langchain_elasticsearch import ElasticsearchStore
db = ElasticsearchStore(
es_url="http://localhost:9200",
index_name="test_index",
strategy=ElasticsearchStore.BM25RetrievalStrategy(),
)
db.add_texts(
["foo", "foo bar", "foo bar baz", "bar", "bar baz", "baz"],
)
results = db.similarity_search(query="foo", k=10)
print(results)
[Document(page_content='foo'), Document(page_content='foo bar'), Document(page_content='foo bar baz')]
Customise the Queryβ
With custom_query
parameter at search, you are able to adjust the query that is used to retrieve documents from Elasticsearch. This is useful if you want to use a more complex query, to support linear boosting of fields.
# Example of a custom query thats just doing a BM25 search on the text field.
def custom_query(query_body: dict, query: str):
"""Custom query to be used in Elasticsearch.
Args:
query_body (dict): Elasticsearch query body.
query (str): Query string.
Returns:
dict: Elasticsearch query body.
"""
print("Query Retriever created by the retrieval strategy:")
print(query_body)
print()
new_query_body = {"query": {"match": {"text": query}}}
print("Query thats actually used in Elasticsearch:")
print(new_query_body)
print()
return new_query_body
results = db.similarity_search(
"What did the president say about Ketanji Brown Jackson",
k=4,
custom_query=custom_query,
)
print("Results:")
print(results[0])
Query Retriever created by the retrieval strategy:
{'query': {'bool': {'must': [{'text_expansion': {'vector.tokens': {'model_id': '.elser_model_1', 'model_text': 'What did the president say about Ketanji Brown Jackson'}}}], 'filter': []}}}
Query thats actually used in Elasticsearch:
{'query': {'match': {'text': 'What did the president say about Ketanji Brown Jackson'}}}
Results:
page_content='One of the most serious constitutional responsibilities a President has is nominating someone to serve on the United States Supreme Court. \n\nAnd I did that 4 days ago, when I nominated Circuit Court of Appeals Judge Ketanji Brown Jackson. One of our nationβs top legal minds, who will continue Justice Breyerβs legacy of excellence.' metadata={'source': '../../modules/state_of_the_union.txt'}
Customize the Document Builder
With doc_builder
parameter at search, you are able to adjust how a Document is being built using data retrieved from Elasticsearch. This is especially useful if you have indices which were not created using Langchain.
from typing import Dict
from langchain_core.documents import Document
def custom_document_builder(hit: Dict) -> Document:
src = hit.get("_source", {})
return Document(
page_content=src.get("content", "Missing content!"),
metadata={
"page_number": src.get("page_number", -1),
"original_filename": src.get("original_filename", "Missing filename!"),
},
)
results = db.similarity_search(
"What did the president say about Ketanji Brown Jackson",
k=4,
doc_builder=custom_document_builder,
)
print("Results:")
print(results[0])
FAQ
Question: Im getting timeout errors when indexing documents into Elasticsearch. How do I fix this?β
One possible issue is your documents might take longer to index into Elasticsearch. ElasticsearchStore uses the Elasticsearch bulk API which has a few defaults that you can adjust to reduce the chance of timeout errors.
This is also a good idea when you're using SparseVectorRetrievalStrategy.
The defaults are:
chunk_size
: 500max_chunk_bytes
: 100MB
To adjust these, you can pass in the chunk_size
and max_chunk_bytes
parameters to the ElasticsearchStore add_texts
method.
vector_store.add_texts(
texts,
bulk_kwargs={
"chunk_size": 50,
"max_chunk_bytes": 200000000
}
)
Upgrading to ElasticsearchStore
If you're already using Elasticsearch in your langchain based project, you may be using the old implementations: ElasticVectorSearch
and ElasticKNNSearch
which are now deprecated. We've introduced a new implementation called ElasticsearchStore
which is more flexible and easier to use. This notebook will guide you through the process of upgrading to the new implementation.
What's new?β
The new implementation is now one class called ElasticsearchStore
which can be used for approx, exact, and ELSER search retrieval, via strategies.
Im using ElasticKNNSearchβ
Old implementation:
from langchain_community.vectorstores.elastic_vector_search import ElasticKNNSearch
db = ElasticKNNSearch(
elasticsearch_url="http://localhost:9200",
index_name="test_index",
embedding=embedding
)
New implementation:
from langchain_elasticsearch import ElasticsearchStore
db = ElasticsearchStore(
es_url="http://localhost:9200",
index_name="test_index",
embedding=embedding,
# if you use the model_id
# strategy=ElasticsearchStore.ApproxRetrievalStrategy( query_model_id="test_model" )
# if you use hybrid search
# strategy=ElasticsearchStore.ApproxRetrievalStrategy( hybrid=True )
)
Im using ElasticVectorSearchβ
Old implementation:
from langchain_community.vectorstores.elastic_vector_search import ElasticVectorSearch
db = ElasticVectorSearch(
elasticsearch_url="http://localhost:9200",
index_name="test_index",
embedding=embedding
)
New implementation:
from langchain_elasticsearch import ElasticsearchStore
db = ElasticsearchStore(
es_url="http://localhost:9200",
index_name="test_index",
embedding=embedding,
strategy=ElasticsearchStore.ExactRetrievalStrategy()
)
db.client.indices.delete(
index="test-metadata, test-elser, test-basic",
ignore_unavailable=True,
allow_no_indices=True,
)
ObjectApiResponse({'acknowledged': True})