Integration: Elasticsearch
Use an Elasticsearch database with Haystack
Haystack 2.0
The ElasticsearchDocumentStore is maintained in the haystack-core-integrations repo. It allows you to use Elasticsearch as data storage for your Haystack pipelines.
For details on the available methods, visit the API Reference.
Installation
To run an Elasticsearch instance locally, first follow the installation and startup guides.
pip install elasticsearch-haystack
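If you don't have an Elasticsearch instance yet, one common way to start one locally is with Docker. This is a sketch, not part of the official guides above; the image tag and the disabled-security setting are assumptions for local development only — pick the version and security settings you actually need:

```shell
# Single-node Elasticsearch for local development, listening on port 9200.
# xpack.security.enabled=false disables auth/TLS (local testing only).
docker run -p 9200:9200 \
  -e "discovery.type=single-node" \
  -e "xpack.security.enabled=false" \
  docker.elastic.co/elasticsearch/elasticsearch:8.11.1
```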
Usage
Once installed, you can start using your Elasticsearch database with Haystack by initializing it:
from haystack_integrations.document_stores.elasticsearch import ElasticsearchDocumentStore
document_store = ElasticsearchDocumentStore(hosts="http://localhost:9200")
Writing Documents to ElasticsearchDocumentStore
To write documents to your ElasticsearchDocumentStore, create an indexing pipeline with a DocumentWriter, or use the write_documents() function.
For this step, you can use the available TextFileToDocument and DocumentSplitter, as well as other Integrations that might help you fetch data from other resources.
Indexing Pipeline
from haystack_integrations.document_stores.elasticsearch import ElasticsearchDocumentStore
from haystack import Pipeline
from haystack.components.embedders import SentenceTransformersDocumentEmbedder
from haystack.components.converters import TextFileToDocument
from haystack.components.preprocessors import DocumentSplitter
from haystack.components.writers import DocumentWriter
document_store = ElasticsearchDocumentStore(hosts="http://localhost:9200")
converter = TextFileToDocument()
splitter = DocumentSplitter()
doc_embedder = SentenceTransformersDocumentEmbedder(model="sentence-transformers/multi-qa-mpnet-base-dot-v1")
writer = DocumentWriter(document_store)
indexing_pipeline = Pipeline()
indexing_pipeline.add_component("converter", converter)
indexing_pipeline.add_component("splitter", splitter)
indexing_pipeline.add_component("doc_embedder", doc_embedder)
indexing_pipeline.add_component("writer", writer)
indexing_pipeline.connect("converter", "splitter")
indexing_pipeline.connect("splitter", "doc_embedder")
indexing_pipeline.connect("doc_embedder", "writer")
indexing_pipeline.run({
"converter":{"sources":["filename.txt"]}
})
Using Elasticsearch in a Query Pipeline
Once you have documents in your ElasticsearchDocumentStore, it's ready to be used with ElasticsearchEmbeddingRetriever in the retrieval step of any Haystack pipeline, such as a Retrieval Augmented Generation (RAG) pipeline. Learn more about Retrievers to make use of vector search within your LLM pipelines.
from haystack_integrations.document_stores.elasticsearch import ElasticsearchDocumentStore
from haystack import Pipeline
from haystack.components.embedders import SentenceTransformersTextEmbedder
from haystack_integrations.components.retrievers.elasticsearch import ElasticsearchEmbeddingRetriever
model = "sentence-transformers/multi-qa-mpnet-base-dot-v1"
document_store = ElasticsearchDocumentStore(hosts="http://localhost:9200")
retriever = ElasticsearchEmbeddingRetriever(document_store=document_store)
text_embedder = SentenceTransformersTextEmbedder(model=model)
query_pipeline = Pipeline()
query_pipeline.add_component("text_embedder", text_embedder)
query_pipeline.add_component("retriever", retriever)
query_pipeline.connect("text_embedder.embedding", "retriever.query_embedding")
result = query_pipeline.run({"text_embedder": {"text": "historical places in Istanbul"}})
print(result)
Haystack 1.x
The ElasticsearchDocumentStore is maintained within the core Haystack project. It allows you to use Elasticsearch as data storage for your Haystack pipelines.
For details on the available methods, visit the API Reference.
Installation (1.x)
To run an Elasticsearch instance locally, first follow the installation and startup guides.
pip install farm-haystack[elasticsearch]
To install Elasticsearch 7 support, you can run pip install farm-haystack[elasticsearch7].
Usage (1.x)
Once installed, you can start using your Elasticsearch database with Haystack by initializing it:
from haystack.document_stores import ElasticsearchDocumentStore
document_store = ElasticsearchDocumentStore(host="localhost",
                                            port=9200,
                                            embedding_dim=768)
Writing Documents to ElasticsearchDocumentStore
To write documents to your ElasticsearchDocumentStore, create an indexing pipeline, or use the write_documents() function.
For this step, you may make use of the available FileConverters and PreProcessors, as well as other Integrations that might help you fetch data from other resources.
Indexing Pipeline
from haystack import Pipeline
from haystack.document_stores import ElasticsearchDocumentStore
from haystack.nodes import TextConverter, PreProcessor
document_store = ElasticsearchDocumentStore(host="localhost", port=9200)
converter = TextConverter()
preprocessor = PreProcessor()
indexing_pipeline = Pipeline()
indexing_pipeline.add_node(component=converter, name="TextConverter", inputs=["File"])
indexing_pipeline.add_node(component=preprocessor, name="PreProcessor", inputs=["TextConverter"])
indexing_pipeline.add_node(component=document_store, name="DocumentStore", inputs=["PreProcessor"])
indexing_pipeline.run(file_paths=["filename.txt"])
Using Elasticsearch in a Query Pipeline
Once you have documents in your ElasticsearchDocumentStore, it's ready to be used in any Haystack pipeline, such as a Retrieval Augmented Generation (RAG) pipeline. Learn more about Retrievers to make use of vector search within your LLM pipelines.
from haystack import Pipeline
from haystack.document_stores import ElasticsearchDocumentStore
from haystack.nodes import EmbeddingRetriever, PromptNode
document_store = ElasticsearchDocumentStore()
retriever = EmbeddingRetriever(document_store=document_store,
                               embedding_model="sentence-transformers/multi-qa-mpnet-base-dot-v1")
prompt_node = PromptNode(model_name_or_path="google/flan-t5-xl", default_prompt_template="deepset/question-answering")
query_pipeline = Pipeline()
query_pipeline.add_node(component=retriever, name="Retriever", inputs=["Query"])
query_pipeline.add_node(component=prompt_node, name="PromptNode", inputs=["Retriever"])
query_pipeline.run(query="Where is Istanbul?")