Apache Cassandra 向量存储

本站(springdoc.cn)中的内容来源于 spring.io ,原始版权归属于 spring.io。由 springdoc.cn 进行翻译,整理。可供个人学习、研究,未经许可,不得进行任何转载、商用或与之相关的行为。 商标声明:Spring 是 Pivotal Software, Inc. 在美国以及其他国家的商标。

本节将指导你配置 CassandraVectorStore 以存储文档向量并执行相似性搜索。

Apache Cassandra 是什么?

Apache Cassandra® 是真正的开源分布式数据库,以线性扩展能力、经实践验证的容错性和低延迟著称,是处理关键事务型数据的理想平台。

其向量相似性搜索(Vector Similarity Search,VSS)基于 JVector 库实现,确保一流的性能和相关性。

在 Apache Cassandra 中执行向量搜索仅需:

SELECT content FROM table ORDER BY content_vector ANN OF query_embedding;

更多文档可在 此处查阅

该 Spring AI 向量存储设计兼容全新 RAG 应用,并支持基于现有数据和表的改造集成。

该存储也可用于现有数据库中的非 RAG 场景,例如语义搜索、地理邻近搜索等。

存储将根据配置自动创建或优化表结构。若需禁用表结构修改,可通过 initializeSchema 参数配置。

使用 spring-boot-autoconfigure 时,根据 Spring Boot 标准,initializeSchema 默认为 false,必须通过在 application.properties 文件中设置 …​initialize-schema=true 显式启用表结构创建/修改。

JVector 是什么?

JVector 是纯 Java 嵌入式向量搜索引擎。

相较于其他 HNSW 向量相似性搜索实现,其优势在于:

  • 算法高效。JVector 采用受 DiskANN 及相关研究启用的先进图算法,兼具高召回率与低延迟特性。

  • 实现高效。JVector 利用 Panama SIMD API 加速索引构建与查询。

  • 内存高效。JVector 通过乘积量化压缩向量,使其在搜索期间可常驻内存。

  • 磁盘感知。JVector 的磁盘布局设计可在查询时实现最低必要 IOPS。

  • 高并发。索引构建线性扩展至至少 32 线程,线程数翻倍则构建时间减半。

  • 增量构建。支持边构建边查询,向量添加后即刻可被检索。

  • 易嵌入。API 设计简洁,经生产环境验证。

先决条件

  1. 用于计算文档向量的 EmbeddingModel 实例。通常配置为 Spring Bean,可选方案包括:

    • Transformers 嵌入 —— 在本地环境计算向量。默认通过 ONNX 和 all-MiniLM-L6-v2 Sentence Transformers 实现,开箱即用。

    • 若需使用 OpenAI 嵌入 —— 调用 OpenAI 的嵌入接口。需在 OpenAI 注册页面 创建账户,并于API Keys 页面生成 api-key

    • 更多选项请参阅 Embeddings API 文档。

  2. Apache Cassandra 实例(需 5.0-beta1 或更高版本)

    1. 自助快速入门

    2. Astra DB 提供健康的免费管理服务。

依赖项

Spring AI 自动配置及 Starter 模块的 Artifact 名称已发生重大变更。更多信息请参阅 升级说明

建议采用 Spring AI BOM 进行依赖管理,具体操作请参考 “依赖管理” 部分。

将以下依赖添加至项目中:

  • 仅需 Cassandra 向量存储:

<dependency>
    <groupId>org.springframework.ai</groupId>
    <artifactId>spring-ai-cassandra-store</artifactId>
</dependency>
  • 或为 RAG 应用集成全套组件(使用默认 ONNX 嵌入模型):

<dependency>
    <groupId>org.springframework.ai</groupId>
    <artifactId>spring-ai-starter-vector-store-cassandra</artifactId>
</dependency>

配置属性

可通过以下 Spring Boot 配置属性自定义 Apache Cassandra 向量存储:

属性 默认值

spring.ai.vectorstore.cassandra.keyspace

springframework

spring.ai.vectorstore.cassandra.table

ai_vector_store

spring.ai.vectorstore.cassandra.initialize-schema

false

spring.ai.vectorstore.cassandra.index-name

spring.ai.vectorstore.cassandra.content-column-name

content

spring.ai.vectorstore.cassandra.embedding-column-name

embedding

spring.ai.vectorstore.cassandra.fixed-thread-pool-executor-size

16

用法

基础用法

创建 Spring Bean CassandraVectorStore 实例 :

@Bean
public VectorStore vectorStore(CqlSession session, EmbeddingModel embeddingModel) {
    return CassandraVectorStore.builder(embeddingModel)
        .session(session)
        .keyspace("my_keyspace")
        .table("my_vectors")
        .build();
}

配置向量存储实例后,即可添加文档并执行搜索:

// Add documents
vectorStore.add(List.of(
    new Document("1", "content1", Map.of("key1", "value1")),
    new Document("2", "content2", Map.of("key2", "value2"))
));

// Search with filters
List<Document> results = vectorStore.similaritySearch(
    SearchRequest.query("search text")
        .withTopK(5)
        .withSimilarityThreshold(0.7f)
        .withFilterExpression("metadata.key1 == 'value1'")
);

高级配置

针对复杂场景,可在 Spring Bean 中配置以下扩展参数:

@Bean
public VectorStore vectorStore(CqlSession session, EmbeddingModel embeddingModel) {
    return CassandraVectorStore.builder(embeddingModel)
        .session(session)
        .keyspace("my_keyspace")
        .table("my_vectors")
        // Configure primary keys
        .partitionKeys(List.of(
            new SchemaColumn("id", DataTypes.TEXT),
            new SchemaColumn("category", DataTypes.TEXT)
        ))
        .clusteringKeys(List.of(
            new SchemaColumn("timestamp", DataTypes.TIMESTAMP)
        ))
        // Add metadata columns with optional indexing
        .addMetadataColumns(
            new SchemaColumn("category", DataTypes.TEXT, SchemaColumnTags.INDEXED),
            new SchemaColumn("score", DataTypes.DOUBLE)
        )
        // Customize column names
        .contentColumnName("text")
        .embeddingColumnName("vector")
        // Performance tuning
        .fixedThreadPoolExecutorSize(32)
        // Schema management
        .initializeSchema(true)
        // Custom batching strategy
        .batchingStrategy(new TokenCountBatchingStrategy())
        .build();
}

连接配置

配置 Cassandra 连接有两种方式:

  • 方式一:注入 CqlSession(推荐):

@Bean
public VectorStore vectorStore(CqlSession session, EmbeddingModel embeddingModel) {
    return CassandraVectorStore.builder(embeddingModel)
        .session(session)
        .keyspace("my_keyspace")
        .table("my_vectors")
        .build();
}
  • 方式二:通过 Builder 直接配置连接参数:

@Bean
public VectorStore vectorStore(EmbeddingModel embeddingModel) {
    return CassandraVectorStore.builder(embeddingModel)
        .contactPoint(new InetSocketAddress("localhost", 9042))
        .localDatacenter("datacenter1")
        .keyspace("my_keyspace")
        .build();
}

元数据过滤

CassandraVectorStore 支持通用可移植的元数据过滤。要使元数据列可搜索,这些列必须为主键或 SAI 索引列。为非主键列创建索引需通过 SchemaColumnTags.INDEXED 标注。

例如,可使用以下文本表达式语法:

vectorStore.similaritySearch(
    SearchRequest.builder().query("The World")
        .topK(5)
        .filterExpression("country in ['UK', 'NL'] && year >= 2020").build());

或通过 DSL 表达式编程实现:

Filter.Expression f = new FilterExpressionBuilder()
    .and(
        f.in("country", "UK", "NL"),
        f.gte("year", 2020)
    ).build();

vectorStore.similaritySearch(
    SearchRequest.builder().query("The World")
        .topK(5)
        .filterExpression(f).build());

可移植的过滤表达式会自动转换为 CQL 查询

高级示例:基于 Wikipedia 数据集的向量存储

以下示例演示如何在现有表结构上使用该存储。此处采用 github.com/datastax-labs/colbert-wikipedia-data 项目的表结构,该项目提供预向量化的完整 Wikipedia 数据集。

首先,在 Cassandra 数据库中创建表结构:

wget https://s.apache.org/colbert-wikipedia-schema-cql -O colbert-wikipedia-schema.cql
cqlsh -f colbert-wikipedia-schema.cql

随后通过构建器(Builder)模式配置存储:

@Bean
public VectorStore vectorStore(CqlSession session, EmbeddingModel embeddingModel) {
    List<SchemaColumn> partitionColumns = List.of(
        new SchemaColumn("wiki", DataTypes.TEXT),
        new SchemaColumn("language", DataTypes.TEXT),
        new SchemaColumn("title", DataTypes.TEXT)
    );

    List<SchemaColumn> clusteringColumns = List.of(
        new SchemaColumn("chunk_no", DataTypes.INT),
        new SchemaColumn("bert_embedding_no", DataTypes.INT)
    );

    List<SchemaColumn> extraColumns = List.of(
        new SchemaColumn("revision", DataTypes.INT),
        new SchemaColumn("id", DataTypes.INT)
    );

    return CassandraVectorStore.builder()
        .session(session)
        .embeddingModel(embeddingModel)
        .keyspace("wikidata")
        .table("articles")
        .partitionKeys(partitionColumns)
        .clusteringKeys(clusteringColumns)
        .contentColumnName("body")
        .embeddingColumnName("all_minilm_l6_v2_embedding")
        .indexName("all_minilm_l6_v2_ann")
        .initializeSchema(false)
        .addMetadataColumns(extraColumns)
        .primaryKeyTranslator((List<Object> primaryKeys) -> {
            if (primaryKeys.isEmpty()) {
                return "test§¶0";
            }
            return String.format("%s§¶%s", primaryKeys.get(2), primaryKeys.get(3));
        })
        .documentIdTranslator((id) -> {
            String[] parts = id.split("§¶");
            String title = parts[0];
            int chunk_no = parts.length > 1 ? Integer.parseInt(parts[1]) : 0;
            return List.of("simplewiki", "en", title, chunk_no, 0);
        })
        .build();
}

@Bean
public EmbeddingModel embeddingModel() {
    // default is ONNX all-MiniLM-L6-v2 which is what we want
    return new TransformersEmbeddingModel();
}

加载完整 Wikipedia 数据集

加载完整 Wikipedia 数据集的操作步骤

  1. s.apache.org/simplewiki-sstable-tar 下载 simplewiki-sstable.tar(文件大小数十GB,下载需较长时间)

  2. 加载数据:

tar -xf simplewiki-sstable.tar -C ${CASSANDRA_DATA}/data/wikidata/articles-*/
nodetool import wikidata articles ${CASSANDRA_DATA}/data/wikidata/articles-*/
  • 若表中已存在数据,需确保解压 tarball 文件时不会覆盖现有 SSTable

  • nodetool import 外,重启 Cassandra 也可触发数据加载

  • 索引若存在故障将自动重建

原生客户端

Cassandra 向量存储实现通过 getNativeClient() 方法提供对底层原生 Cassandra 客户端(CqlSession)的访问:

CassandraVectorStore vectorStore = context.getBean(CassandraVectorStore.class);
Optional<CqlSession> nativeClient = vectorStore.getNativeClient();

if (nativeClient.isPresent()) {
    CqlSession session = nativeClient.get();
    // Use the native client for Cassandra-specific operations
}

原生客户端支持访问 Cassandra 特有功能及操作,这些功能可能未通过 VectorStore 接口暴露。