Metadata filtering in the Vector Store¶
Enhance a Question-Answering system with metadata filtering, using LangChain and CassIO with Cassandra as the Vector Database.
NOTE: this uses Cassandra's "Vector Similarity Search" capability. Make sure you are connecting to a vector-enabled database for this demo.
Setup¶
from langchain.indexes import VectorstoreIndexCreator
from langchain.text_splitter import (
    CharacterTextSplitter,
    RecursiveCharacterTextSplitter,
)
from langchain.docstore.document import Document
from langchain.document_loaders import TextLoader
The following line imports the Cassandra flavor of a LangChain vector store:
from langchain.vectorstores.cassandra import Cassandra
A database connection is needed. (If on a Colab, the only supported option is the cloud service Astra DB.)
# Ensure loading of database credentials into environment variables:
import os
from dotenv import load_dotenv
load_dotenv("../../../.env")
import cassio
Select your choice of database by editing this cell, if needed:
database_mode = "cassandra" # "cassandra" / "astra_db"
if database_mode == "astra_db":
    cassio.init(
        database_id=os.environ["ASTRA_DB_ID"],
        token=os.environ["ASTRA_DB_APPLICATION_TOKEN"],
        keyspace=os.environ.get("ASTRA_DB_KEYSPACE"),  # this is optional
    )
if database_mode == "cassandra":
    from cqlsession import getCassandraCQLSession, getCassandraCQLKeyspace
    cassio.init(
        session=getCassandraCQLSession(),
        keyspace=getCassandraCQLKeyspace(),
    )
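Since the Astra DB branch reads its credentials from environment variables, a quick check before calling cassio.init can give a clearer error than a bare KeyError. The following is a minimal sketch: the helper name missing_env_vars and the REQUIRED_ASTRA_VARS tuple are illustrative and not part of CassIO; the variable names are the ones used in the cell above.

```python
import os

# Variable names taken from the cell above; ASTRA_DB_KEYSPACE is optional there.
REQUIRED_ASTRA_VARS = ("ASTRA_DB_ID", "ASTRA_DB_APPLICATION_TOKEN")


def missing_env_vars(names, env=None):
    """Return the names that are unset or empty in the given mapping."""
    env = os.environ if env is None else env
    return [name for name in names if not env.get(name)]


# Example with an explicit mapping, so the check does not depend on the real environment.
example_env = {"ASTRA_DB_ID": "my-db-id"}
print(missing_env_vars(REQUIRED_ASTRA_VARS, example_env))
```

Called without the second argument, the helper inspects os.environ, so it can be run right after load_dotenv.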
Both an LLM and an embedding function are required.
Below is the logic to instantiate the LLM and embeddings of choice. We chose to leave it in the notebooks for clarity.
import os
from llm_choice import suggestLLMProvider
llmProvider = suggestLLMProvider()
# (Alternatively set llmProvider to 'GCP_VertexAI', 'OpenAI', 'Azure_OpenAI' ... manually if you have credentials)
if llmProvider == 'GCP_VertexAI':
    from langchain.llms import VertexAI
    from langchain.embeddings import VertexAIEmbeddings
    llm = VertexAI()
    myEmbedding = VertexAIEmbeddings()
    print('LLM+embeddings from Vertex AI')
elif llmProvider == 'OpenAI':
    os.environ['OPENAI_API_TYPE'] = 'open_ai'
    from langchain.llms import OpenAI
    from langchain.embeddings import OpenAIEmbeddings
    llm = OpenAI(temperature=0)
    myEmbedding = OpenAIEmbeddings()
    print('LLM+embeddings from OpenAI')
elif llmProvider == 'Azure_OpenAI':
    os.environ['OPENAI_API_TYPE'] = 'azure'
    os.environ['OPENAI_API_VERSION'] = os.environ['AZURE_OPENAI_API_VERSION']
    os.environ['OPENAI_API_BASE'] = os.environ['AZURE_OPENAI_API_BASE']
    os.environ['OPENAI_API_KEY'] = os.environ['AZURE_OPENAI_API_KEY']
    from langchain.llms import AzureOpenAI
    from langchain.embeddings import OpenAIEmbeddings
    llm = AzureOpenAI(temperature=0, model_name=os.environ['AZURE_OPENAI_LLM_MODEL'],
                      engine=os.environ['AZURE_OPENAI_LLM_DEPLOYMENT'])
    myEmbedding = OpenAIEmbeddings(model=os.environ['AZURE_OPENAI_EMBEDDINGS_MODEL'],
                                   deployment=os.environ['AZURE_OPENAI_EMBEDDINGS_DEPLOYMENT'])
    print('LLM+embeddings from Azure OpenAI')
else:
    raise ValueError('Unknown LLM provider.')
LLM+embeddings from OpenAI
Create the vector store and load data¶
Note: in case you have run this demo already, skip ahead to the next subsection ("B"): you will directly "re-open" a pre-populated store.
A. Create store while loading new documents in it¶
This section creates a brand new vector store and loads some source documents in it. The store is created and filled at once, to be later queried to retrieve relevant parts of the indexed text.
At question-answering time, LangChain will take care of looking for the relevant context fragments, stuffing them into a prompt and finally using the prompt and an LLM to get the answer.
The following instantiates an "index creator", which knows about the type of vector store, the embedding to use and how to preprocess the input text:
(Note: stores built with different embedding functions will need different tables. This is why we append the llmProvider name to the table name in the next cell.)
table_name = 'vs_test_md_' + llmProvider
index_creator = VectorstoreIndexCreator(
    vectorstore_cls=Cassandra,
    embedding=myEmbedding,
    text_splitter=CharacterTextSplitter(
        chunk_size=400,
        chunk_overlap=0,
    ),
    vectorstore_kwargs={
        'session': None,
        'keyspace': None,
        'table_name': table_name,
    },
)
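To get a feel for what the text splitter configured above does with chunk_size=400 and chunk_overlap=0, here is a simplified sketch in plain Python. It is not LangChain's implementation: it assumes the text is split on blank lines and that pieces are greedily merged until the next one would exceed the size limit. The function name split_into_chunks is illustrative.

```python
def split_into_chunks(text, chunk_size=400, separator="\n\n"):
    """Greedily merge separator-delimited pieces into chunks of at most chunk_size characters.

    A piece that is longer than chunk_size on its own is kept whole (no hard cut).
    """
    chunks = []
    current = ""
    for piece in text.split(separator):
        piece = piece.strip()
        if not piece:
            continue
        candidate = piece if not current else current + separator + piece
        if len(candidate) <= chunk_size or not current:
            # Either the merged text still fits, or this is the first piece of a new chunk.
            current = candidate
        else:
            # Adding the piece would overflow: close the current chunk and start a new one.
            chunks.append(current)
            current = piece
    if current:
        chunks.append(current)
    return chunks


sample = "First paragraph.\n\nSecond paragraph.\n\nThird paragraph, a bit longer."
for chunk in split_into_chunks(sample, chunk_size=40):
    print(repr(chunk))
```

With a limit of 40 characters, the first two paragraphs fit in one chunk and the third starts a new one. Each chunk is what later gets its own embedding vector and its own row in the store.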
Load some local text files (a few short stories by E. A. Poe will do):
loader1 = TextLoader('texts/amontillado.txt', encoding='utf8')
loader2 = TextLoader('texts/mask.txt', encoding='utf8')
loader3 = TextLoader('texts/manuscript.txt', encoding='utf8')
loaders = [loader1, loader2, loader3]
This takes a few seconds to run, as it must calculate embedding vectors for a number of chunks of the input text:
# Note: certain LLM providers need a workaround to evaluate batch embeddings
# (as done in the next cell).
# As of 2023-06-29, Azure OpenAI would error with:
#   "InvalidRequestError: Too many inputs. The max number of inputs is 1"
if llmProvider == 'Azure_OpenAI':
    from langchain.indexes.vectorstore import VectorStoreIndexWrapper
    for loader in loaders:
        docs = loader.load()
        subdocs = index_creator.text_splitter.split_documents(docs)
        #
        print(f'subdocument {0} ...', end=' ')
        vs = index_creator.vectorstore_cls.from_documents(
            subdocs[:1],
            index_creator.embedding,
            **index_creator.vectorstore_kwargs,
        )
        print('done.')
        for sdi, sd in enumerate(subdocs[1:]):
            print(f'subdocument {sdi+1} ...', end=' ')
            # The keyword is "metadatas" (one dict per text), not "metadata"
            vs.add_texts(texts=[sd.page_content], metadatas=[sd.metadata])
            print('done.')
    #
    index = VectorStoreIndexWrapper(vectorstore=vs)
if llmProvider != 'Azure_OpenAI':
    index = index_creator.from_loaders(loaders)
Note: depending on how you load rows in your store, there might be ways to add your own metadata; consult the LangChain documentation for details. For now, we have a source metadata field with the file path, and we'll use that one.
For later demonstration, extract the vector store itself as a stand-alone object from the index:
myCassandraVStore = index.vectorstore
B. Loading a previously-populated Vector Store¶
If you have already ingested the documents into the vector store, this is how you would "re-open" an index on it:
from langchain.indexes.vectorstore import VectorStoreIndexWrapper
myCassandraVStore = Cassandra(
    embedding=myEmbedding,
    session=None,
    keyspace=None,
    table_name='vs_test_md_' + llmProvider,
)
index = VectorStoreIndexWrapper(vectorstore=myCassandraVStore)
Metadata filtering in Question Answering¶
The crucial thing is that LangChain automatically sets the metadata key-value pair {"source": <file name>} when loading documents, so you'll use that to constrain the answering process to specific documents.
(In case you need more flexibility in handling the metadata at insertion time, you should look into building your own metadatas argument to the vector store's add_texts method. You can see an example usage of add_texts a few cells above this one.)
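As a rough illustration of building such a metadatas argument, the sketch below assembles two parallel lists, one metadata dictionary per text chunk. The helper name build_texts_and_metadatas and the extra keys ("chunk_index", "author") are hypothetical; only "source" mirrors what LangChain sets on its own. Values are kept as strings here, on the assumption that the store may persist metadata as text.

```python
def build_texts_and_metadatas(chunks_by_source, extra=None):
    """Return parallel lists: one metadata dict per text chunk."""
    extra = extra or {}
    texts, metadatas = [], []
    for source, chunks in chunks_by_source.items():
        for chunk_index, chunk in enumerate(chunks):
            texts.append(chunk)
            # "source" mirrors the key LangChain sets; the other keys are hypothetical.
            metadatas.append({"source": source, "chunk_index": str(chunk_index), **extra})
    return texts, metadatas


texts, metadatas = build_texts_and_metadatas(
    {"texts/mask.txt": ["chunk a", "chunk b"], "texts/manuscript.txt": ["chunk c"]},
    extra={"author": "poe"},
)
print(metadatas[1])
# The two lists could then be passed along the lines of:
#     myCassandraVStore.add_texts(texts=texts, metadatas=metadatas)
```

Any key stored this way can later be used in a filter in the same manner as "source".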
Technical note: ensure you are wrapping your filter argument in the right dictionary structure, which depends on whether you are working at the retriever, index, or store abstraction layer. Most of these methods tend to silently swallow unexpected parameters, so take extra care in crafting the right retriever_kwargs, search_kwargs or filter parameter for each method call.
You'll concentrate on two questions, whose answers depend largely on the particular short story under scrutiny:
Q1 = "Does the Captain do anything weird?"
Q2 = "Who arrives and scares everyone?"
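The nesting mentioned in the technical note can be summarized with a small helper. This is only a sketch: the function filter_kwargs and its level names are illustrative, while the dictionary shapes are the ones used by the calls in this notebook.

```python
def filter_kwargs(md_filter, level):
    """Wrap a metadata filter for the given abstraction level."""
    if level == "store":
        # e.g. myCassandraVStore.search(question, search_type=..., **kwargs)
        return {"filter": md_filter}
    if level == "retriever":
        # e.g. index.vectorstore.as_retriever(**kwargs)
        return {"search_kwargs": {"filter": md_filter}}
    if level == "index":
        # e.g. index.query(question, **kwargs)
        return {"retriever_kwargs": {"search_kwargs": {"filter": md_filter}}}
    raise ValueError(f"Unknown level: {level!r}")


example_filter = {"source": "texts/manuscript.txt"}
for level in ("store", "retriever", "index"):
    print(level, "->", filter_kwargs(example_filter, level))
```

Each level adds one layer of wrapping around the one below it, which is why a filter placed one level too high or too low may be ignored without any error.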
Without metadata filtering (baseline case)¶
print(f"{'-'*20}\nAnswer to Q1 ({Q1}):\n ===> ", end="")
print(index.query(Q1).strip())
print(f"{'-'*20}\nAnswer to Q2 ({Q2}):\n ===> ", end="")
print(index.query(Q2).strip())
--------------------
Answer to Q1 (Does the Captain do anything weird?):
 ===> No, the captain does not do anything weird. He pays no attention to the narrator, and he does not seem to be aware of the narrator's presence. He is described as having an intense expression and evidence of old age, but he does not do anything out of the ordinary.
--------------------
Answer to Q2 (Who arrives and scares everyone?):
 ===> The Red Death.
With metadata filtering¶
Additional conditions on metadata filtering are eventually passed as a key-value filter = {"source": <file name>} parameter to the vector store's similarity search methods.
When using the index's query method, this means supplying a retriever_kwargs argument as follows:
retr_kwargs_manuscript = {"search_kwargs": {"filter": {"source": "texts/manuscript.txt"}}}
print("** Using 'manuscript.txt':")
print(f"{'-'*20}\nAnswer to Q1 ({Q1}):\n ===> ", end="")
print(index.query(Q1, retriever_kwargs=retr_kwargs_manuscript).strip())
print(f"{'-'*20}\nAnswer to Q2 ({Q2}):\n ===> ", end="")
print(index.query(Q2, retriever_kwargs=retr_kwargs_manuscript).strip())
** Using 'manuscript.txt':
--------------------
Answer to Q1 (Does the Captain do anything weird?):
 ===> No, the captain does not do anything weird. He pays no attention to the narrator, and he does not seem to be aware of the narrator's presence. He is described as having an intense expression and evidence of old age, but he does not do anything out of the ordinary.
--------------------
Answer to Q2 (Who arrives and scares everyone?):
 ===> A gigantic ship of perhaps four thousand tons.
retr_kwargs_mask = {"search_kwargs": {"filter": {"source": "texts/mask.txt"}}}
print("** Using 'mask.txt':")
print(f"{'-'*20}\nAnswer to Q1 ({Q1}):\n ===> ", end="")
print(index.query(Q1, retriever_kwargs=retr_kwargs_mask).strip())
print(f"{'-'*20}\nAnswer to Q2 ({Q2}):\n ===> ", end="")
print(index.query(Q2, retriever_kwargs=retr_kwargs_mask).strip())
** Using 'mask.txt':
--------------------
Answer to Q1 (Does the Captain do anything weird?):
 ===> No, the Captain does not do anything weird.
--------------------
Answer to Q2 (Who arrives and scares everyone?):
 ===> The Red Death.
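One way to picture the effect of the filter is as an extra condition on which rows are eligible for the similarity ranking. The sketch below uses toy two-dimensional vectors in plain Python; it makes no claim about how Cassandra executes the search internally, and the names cosine and filtered_search are illustrative.

```python
import math


def cosine(u, v):
    """Cosine similarity between two non-zero vectors."""
    dot = sum(a * b for a, b in zip(u, v))
    return dot / (math.sqrt(sum(a * a for a in u)) * math.sqrt(sum(b * b for b in v)))


def filtered_search(query_vec, rows, md_filter=None, k=4):
    """rows: list of (vector, text, metadata) tuples.

    Keep the rows whose metadata matches every key-value pair of the filter,
    then return the k rows most similar to the query vector.
    """
    md_filter = md_filter or {}
    candidates = [
        row for row in rows
        if all(row[2].get(key) == value for key, value in md_filter.items())
    ]
    candidates.sort(key=lambda row: cosine(query_vec, row[0]), reverse=True)
    return candidates[:k]


rows = [
    ([1.0, 0.0], "mask chunk", {"source": "texts/mask.txt"}),
    ([0.8, 0.6], "manuscript chunk 1", {"source": "texts/manuscript.txt"}),
    ([0.0, 1.0], "manuscript chunk 2", {"source": "texts/manuscript.txt"}),
]
query = [1.0, 0.0]
print([text for _, text, _ in filtered_search(query, rows, k=2)])
print([text for _, text, _ in filtered_search(query, rows, {"source": "texts/manuscript.txt"}, k=2)])
```

Without a filter the closest row comes from mask.txt; with the filter it is excluded and the results come only from manuscript.txt, which is the same effect seen in the answers to Q2 above.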
Spawning a "retriever" from the index¶
You can also create a "retriever" from the index and use it for subsequent document fetching (based on semantic similarity).
Customizing the retriever amounts to passing a search_kwargs argument to the vector store's as_retriever method:
RETRIEVER_Q = "What does the narrator do?"
Without metadata filtering (baseline case)¶
retriever_0 = index.vectorstore.as_retriever(search_kwargs={'k': 4})
for doc_i, doc in enumerate(retriever_0.get_relevant_documents(RETRIEVER_Q)):
    print(f"[doc {doc_i}, {doc.metadata['source']}] \"{doc.page_content[:50]}...\"")
[doc 0, texts/amontillado.txt] "The gait of my friend was unsteady, and the bells ..."
[doc 1, texts/manuscript.txt] "I had scarcely completed my work, when a footstep ..."
[doc 2, texts/manuscript.txt] "In the meantime the wind is still in our poop, and..."
[doc 3, texts/amontillado.txt] ""The nitre!" I said; "see, it increases. It hangs..."
With metadata filtering¶
retriever_m = index.vectorstore.as_retriever(search_kwargs={
    'k': 4,
    'filter': {'source': 'texts/manuscript.txt'},
})
for doc_i, doc in enumerate(retriever_m.get_relevant_documents(RETRIEVER_Q)):
    print(f"[doc {doc_i}, {doc.metadata['source']}] \"{doc.page_content[:50]}...\"")
[doc 0, texts/manuscript.txt] "I had scarcely completed my work, when a footstep ..."
[doc 1, texts/manuscript.txt] "In the meantime the wind is still in our poop, and..."
[doc 2, texts/manuscript.txt] "As I fell, the ship hove in stays, and went about;..."
[doc 3, texts/manuscript.txt] "At this instant, I know not what sudden self-posse..."
MMR (maximal-marginal-relevance) Question Answering¶
Metadata filtering can be combined with the MMR technique, which fetches text fragments for answer generation that are relevant to the question and, at the same time, as diverse as possible:
MMR_Q = "Whose identity is unknown?"
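For readers unfamiliar with MMR, the selection rule can be sketched in a few lines of plain Python. This is a textbook-style illustration with toy vectors, not the code LangChain or CassIO runs: at each step it picks the candidate that maximizes lambda_mult times its similarity to the query, minus (1 - lambda_mult) times its highest similarity to anything already picked. The names mmr_select and lambda_mult are used here for illustration.

```python
import math


def cosine(u, v):
    """Cosine similarity between two non-zero vectors."""
    dot = sum(a * b for a, b in zip(u, v))
    return dot / (math.sqrt(sum(a * a for a in u)) * math.sqrt(sum(b * b for b in v)))


def mmr_select(query_vec, vectors, k=4, lambda_mult=0.5):
    """Return the indices of up to k vectors chosen by maximal marginal relevance."""
    selected = []
    remaining = list(range(len(vectors)))
    while remaining and len(selected) < k:
        def score(i):
            relevance = cosine(query_vec, vectors[i])
            # Penalize candidates that resemble something already selected.
            redundancy = max((cosine(vectors[i], vectors[j]) for j in selected), default=0.0)
            return lambda_mult * relevance - (1 - lambda_mult) * redundancy
        best = max(remaining, key=score)
        selected.append(best)
        remaining.remove(best)
    return selected


query = [1.0, 0.0]
vectors = [
    [0.96, 0.28],    # 0: most relevant to the query
    [0.936, 0.352],  # 1: nearly as relevant, but almost a duplicate of 0
    [0.6, -0.8],     # 2: less relevant, but different from 0
]
print(mmr_select(query, vectors, k=2))                   # balances relevance and diversity
print(mmr_select(query, vectors, k=2, lambda_mult=1.0))  # relevance only
```

With the balanced setting the near-duplicate is skipped in favor of the more distinct vector, while with lambda_mult=1.0 the selection falls back to plain similarity ranking.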
Once more, depending on whether you are working at the index, retriever or vector store level, you have to encapsulate the filter parameter differently. The following cells demonstrate this.
Without metadata filtering (baseline case)¶
for doc_i, doc in enumerate(myCassandraVStore.search(MMR_Q, search_type='mmr', k=4)):
    print(f"[doc {doc_i}, {doc.metadata['source']}] \"{doc.page_content[:50]}...\"")
[doc 0, texts/mask.txt] "In an assembly of phantasms such as I have painted..."
[doc 1, texts/mask.txt] "“Who dares?” he demanded hoarsely of the courtiers..."
[doc 2, texts/manuscript.txt] "I had scarcely completed my work, when a footstep ..."
[doc 3, texts/manuscript.txt] "A feeling, for which I have no name, has taken pos..."
print(index.query(
    MMR_Q,
    retriever_kwargs={
        "search_type": "mmr",
        # k is a search parameter, so it goes inside search_kwargs
        "search_kwargs": {"k": 4},
    },
).strip())
The figure in the masquerade costume.
With metadata filtering¶
mmr_md_filter = {'source': 'texts/manuscript.txt'}
results = myCassandraVStore.search(MMR_Q, search_type='mmr', k=4, filter=mmr_md_filter)
for doc_i, doc in enumerate(results):
    print(f"[doc {doc_i}, {doc.metadata['source']}] \"{doc.page_content[:50]}...\"")
[doc 0, texts/manuscript.txt] "I had scarcely completed my work, when a footstep ..."
[doc 1, texts/manuscript.txt] "A feeling, for which I have no name, has taken pos..."
[doc 2, texts/manuscript.txt] "Of my country and of my family I have little to sa..."
[doc 3, texts/manuscript.txt] "When I look around me I feel ashamed of my former ..."
print(index.query(
    MMR_Q,
    retriever_kwargs={
        "search_type": "mmr",
        # k and filter are both search parameters, so they go inside search_kwargs
        "search_kwargs": {
            "k": 4,
            "filter": mmr_md_filter,
        },
    },
).strip())
The man who passed by the speaker's place of concealment.
(optional) Cleanup¶
If you want to delete the data from your database and drop the table altogether, run the following cell:
c_session = cassio.config.resolve_session()
c_keyspace = cassio.config.resolve_keyspace()
c_session.execute(f"DROP TABLE IF EXISTS {c_keyspace}.{table_name};")
<cassandra.cluster.ResultSet at 0x7f80a3497eb0>