diff --git a/CHANGES.txt b/CHANGES.txt index 974b9ceaa..ce86121c6 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,5 +1,6 @@ 0.5.0 ----- + * Add Storage Attached Index (SAI) support to the bulk writer (CASSANALYTICS-31) * Upgrade sidecar version to 0.4.0 * Exclude IP address from RingInstance equality so node replacement does not fail bulk write jobs (CASSANALYTICS-175) * Regenerate bloom filters for CQLSSTableWriter (CASSANALYTICS-167) diff --git a/analytics-sidecar-client-common/src/main/java/org/apache/cassandra/sidecar/common/data/SSTableImportOptions.java b/analytics-sidecar-client-common/src/main/java/org/apache/cassandra/sidecar/common/data/SSTableImportOptions.java index 60b6a492f..168b08a32 100644 --- a/analytics-sidecar-client-common/src/main/java/org/apache/cassandra/sidecar/common/data/SSTableImportOptions.java +++ b/analytics-sidecar-client-common/src/main/java/org/apache/cassandra/sidecar/common/data/SSTableImportOptions.java @@ -33,6 +33,8 @@ public class SSTableImportOptions extends HashMap private static final String INVALIDATE_CACHES = "invalidateCaches"; private static final String EXTENDED_VERIFY = "extendedVerify"; private static final String COPY_DATA = "copyData"; + private static final String FAIL_ON_MISSING_INDEX = "failOnMissingIndex"; + private static final String VALIDATE_INDEX_CHECKSUM = "validateIndexChecksum"; public static SSTableImportOptions defaults() { @@ -126,4 +128,34 @@ public boolean copyData() { return Boolean.parseBoolean(get(COPY_DATA)); } + + /** + * When enabled, SSTable import fails if a table with Storage Attached Indexes (SAI) is missing its index + * components, instead of silently rebuilding them. Only meaningful for SAI tables (Cassandra 5.0+). + */ + public SSTableImportOptions failOnMissingIndex(boolean enabled) + { + put(FAIL_ON_MISSING_INDEX, Boolean.toString(enabled)); + return this; + } + + public boolean failOnMissingIndex() + { + return Boolean.parseBoolean(get(FAIL_ON_MISSING_INDEX)); + } + + /** + * When enabled, SSTable import validates the checksums of Storage Attached Index (SAI) components. + * Only meaningful for SAI tables (Cassandra 5.0+). + */ + public SSTableImportOptions validateIndexChecksum(boolean enabled) + { + put(VALIDATE_INDEX_CHECKSUM, Boolean.toString(enabled)); + return this; + } + + public boolean validateIndexChecksum() + { + return Boolean.parseBoolean(get(VALIDATE_INDEX_CHECKSUM)); + } } diff --git a/analytics-sidecar-client-common/src/main/java/org/apache/cassandra/sidecar/common/request/ImportSSTableRequest.java b/analytics-sidecar-client-common/src/main/java/org/apache/cassandra/sidecar/common/request/ImportSSTableRequest.java index 7633b9440..6d5477b4c 100644 --- a/analytics-sidecar-client-common/src/main/java/org/apache/cassandra/sidecar/common/request/ImportSSTableRequest.java +++ b/analytics-sidecar-client-common/src/main/java/org/apache/cassandra/sidecar/common/request/ImportSSTableRequest.java @@ -65,6 +65,8 @@ public static class ImportOptions private Boolean invalidateCaches; private Boolean extendedVerify; private Boolean copyData; + private Boolean failOnMissingIndex; + private Boolean validateIndexChecksum; public ImportOptions() { @@ -153,6 +155,32 @@ public ImportOptions copyData(boolean copyData) this.copyData = copyData; return this; } + + /** + * Sets the {@code failOnMissingIndex} and returns a reference to this ImportOptions enabling method chaining. + * When enabled, Cassandra validates SAI index components during import. + * + * @param failOnMissingIndex the {@code failOnMissingIndex} to set + * @return a reference to this ImportOptions + */ + public ImportOptions failOnMissingIndex(boolean failOnMissingIndex) + { + this.failOnMissingIndex = failOnMissingIndex; + return this; + } + + /** + * Sets the {@code validateIndexChecksum} and returns a reference to this ImportOptions enabling method chaining. + * When enabled, Cassandra verifies SAI index component checksums during import. + * + * @param validateIndexChecksum the {@code validateIndexChecksum} to set + * @return a reference to this ImportOptions + */ + public ImportOptions validateIndexChecksum(boolean validateIndexChecksum) + { + this.validateIndexChecksum = validateIndexChecksum; + return this; + } } static String requestURI(String keyspace, String tableName, String uploadId, ImportOptions importOptions) @@ -205,6 +233,14 @@ private static List selectedOptions(ImportOptions importOptions) { options.add("copyData=" + importOptions.copyData); } + if (importOptions.failOnMissingIndex != null) + { + options.add("failOnMissingIndex=" + importOptions.failOnMissingIndex); + } + if (importOptions.validateIndexChecksum != null) + { + options.add("validateIndexChecksum=" + importOptions.validateIndexChecksum); + } return options; } diff --git a/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/cdc/CdcTests.java b/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/cdc/CdcTests.java index b67fc2fda..30b43f0b1 100644 --- a/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/cdc/CdcTests.java +++ b/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/cdc/CdcTests.java @@ -175,7 +175,7 @@ public void testMockedCdc(CassandraVersion version) Partitioner.Murmur3Partitioner, table.udtCreateStmts(bridge.cassandraTypes()), null, - 0, + Collections.emptySet(), true); SchemaSupplier schemaSupplier = () -> CompletableFuture.completedFuture(ImmutableSet.of(table)); AtomicReference state = new AtomicReference<>(); @@ -514,13 +514,13 @@ public void testMultiTable(CassandraVersion version) ReplicationFactor.simpleStrategy(1), Partitioner.Murmur3Partitioner, Collections.emptySet(), - null, 0, schema2.withCdc); + null, Collections.emptySet(), schema2.withCdc); bridge.buildSchema(cqlTable3.createStatement(), cqlTable3.keyspace(), ReplicationFactor.simpleStrategy(1), Partitioner.Murmur3Partitioner, Collections.emptySet(), - null, 0, schema3.withCdc); + null, Collections.emptySet(), schema3.withCdc); int numRows = DEFAULT_NUM_ROWS; AtomicReference schema1Holder = new AtomicReference<>(); diff --git a/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/cdc/MicroBatchIteratorTests.java b/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/cdc/MicroBatchIteratorTests.java index 7b477d2ae..6521c275f 100644 --- a/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/cdc/MicroBatchIteratorTests.java +++ b/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/cdc/MicroBatchIteratorTests.java @@ -390,7 +390,7 @@ private void runTest(CassandraBridge bridge, ReplicationFactor.simpleStrategy(1), Partitioner.Murmur3Partitioner, Collections.emptySet(), - null, 0, schema.withCdc); + null, Collections.emptySet(), schema.withCdc); schema.setCassandraVersion(bridge.getVersion()); try diff --git a/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/cdc/test/CdcTester.java b/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/cdc/test/CdcTester.java index f05c0ae3c..314613c32 100644 --- a/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/cdc/test/CdcTester.java +++ b/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/cdc/test/CdcTester.java @@ -21,6 +21,7 @@ import java.nio.file.Path; import java.util.ArrayList; +import java.util.Collections; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -266,7 +267,7 @@ public void run() { LOGGER.info("Running CDC test testId={} schema='{}' thread={}", testId, cqlTable.fields(), Thread.currentThread().getName()); Set udtStmts = schema.udts.stream().map(e -> e.createStatement(bridge.cassandraTypes(), schema.keyspace)).collect(Collectors.toSet()); - bridge.buildSchema(schema.createStatement, schema.keyspace, schema.rf, partitioner, udtStmts, null, 0, true); + bridge.buildSchema(schema.createStatement, schema.keyspace, schema.rf, partitioner, udtStmts, null, Collections.emptySet(), true); schema.setCassandraVersion(bridge.getVersion()); // write some mutations to CDC CommitLog diff --git a/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/db/commitlog/BufferingCommitLogReaderTests.java b/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/db/commitlog/BufferingCommitLogReaderTests.java index b273f6ce3..8fdb72ffb 100644 --- a/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/db/commitlog/BufferingCommitLogReaderTests.java +++ b/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/db/commitlog/BufferingCommitLogReaderTests.java @@ -89,7 +89,7 @@ public void testReaderSeek(CassandraVersion version) ReplicationFactor.simpleStrategy(1), Partitioner.Murmur3Partitioner, Collections.emptySet(), - null, 0, true); + null, Collections.emptySet(), true); int numRows = 1000; // write some rows to a CommitLog diff --git a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/common/SSTables.java b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/common/SSTables.java index 68409be71..aa41acc39 100644 --- a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/common/SSTables.java +++ b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/common/SSTables.java @@ -22,14 +22,52 @@ import java.nio.file.Path; import org.apache.cassandra.bridge.SSTableDescriptor; +import org.apache.cassandra.spark.data.FileType; public final class SSTables { + /** + * Suffix identifying the primary SSTable data component, e.g. "-Data.db". + * The leading '-' is significant: it excludes SAI per-index components such as + * "...+TermsData.db" that also end with "Data.db". + */ + private static final String DATA_COMPONENT_SUFFIX = "-" + FileType.DATA.getFileSuffix(); + + /** + * Glob matching primary SSTable data components, e.g. "*-Data.db". + * Suitable for {@link java.nio.file.Files#newDirectoryStream(Path, String)}. + */ + public static final String DATA_COMPONENT_GLOB = "*" + DATA_COMPONENT_SUFFIX; + private SSTables() { throw new IllegalStateException(getClass() + " is static utility class and shall not be instantiated"); } + /** + * Determine whether the given file name is a primary SSTable data component ("<descriptor>-Data.db"). + * The leading '-' check excludes SAI per-index components such as "...+TermsData.db" which also end with "Data.db". + * + * @param fileName file name (not a full path) + * @return true if the name is a primary data component + */ + public static boolean isDataComponent(String fileName) + { + return fileName.endsWith(DATA_COMPONENT_SUFFIX); + } + + /** + * Determine whether the given path is a primary SSTable data component ("<descriptor>-Data.db"). + * + * @param path file path + * @return true if the path's file name is a primary data component + * @see #isDataComponent(String) + */ + public static boolean isDataComponent(Path path) + { + return isDataComponent(path.getFileName().toString()); + } + /** * Get the sstable base name from data file path. * For example, the base name of data file '/path/to/table/nb-1-big-Data.db' is 'nb-1-big' diff --git a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/data/CqlTable.java b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/data/CqlTable.java index d71e471c4..c52dd6bb2 100644 --- a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/data/CqlTable.java +++ b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/data/CqlTable.java @@ -57,7 +57,7 @@ public class CqlTable implements Serializable private final List staticColumns; private final List valueColumns; private final transient Map columns; - private final int indexCount; + private final Set indexStatements; public CqlTable(@NotNull String keyspace, @NotNull String table, @@ -65,7 +65,7 @@ public CqlTable(@NotNull String keyspace, @NotNull ReplicationFactor replicationFactor, @NotNull List fields) { - this(keyspace, table, createStatement, replicationFactor, fields, Collections.emptySet(), 0); + this(keyspace, table, createStatement, replicationFactor, fields, Collections.emptySet(), Collections.emptySet()); } public CqlTable(@NotNull String keyspace, @@ -74,7 +74,7 @@ public CqlTable(@NotNull String keyspace, @NotNull ReplicationFactor replicationFactor, @NotNull List fields, @NotNull Set udts, - int indexCount) + @NotNull Set indexStatements) { this.keyspace = keyspace; this.table = table; @@ -87,7 +87,7 @@ public CqlTable(@NotNull String keyspace, this.staticColumns = this.fields.stream().filter(CqlField::isStaticColumn).sorted().collect(Collectors.toList()); this.valueColumns = this.fields.stream().filter(CqlField::isValueColumn).sorted().collect(Collectors.toList()); this.udts = Collections.unmodifiableSet(udts); - this.indexCount = indexCount; + this.indexStatements = Collections.unmodifiableSet(indexStatements); // We use a linked hashmap to guarantee ordering of a 'SELECT * FROM ...' this.columns = new LinkedHashMap<>(); @@ -241,9 +241,14 @@ public String createStatement() return createStatement; } + public Set indexStatements() + { + return indexStatements; + } + public int indexCount() { - return indexCount; + return indexStatements.size(); } /** @@ -382,8 +387,15 @@ public CqlTable read(Kryo kryo, Input input, Class type) { udts.add((CqlField.CqlUdt) CqlField.CqlType.read(input, cassandraTypes)); } - int indexCount = input.readInt(); - return new CqlTable(keyspace, table, createStatement, replicationFactor, fields, udts, indexCount); + + int numIndexStatements = input.readInt(); + Set indexStatements = new LinkedHashSet<>(numIndexStatements); + for (int idx = 0; idx < numIndexStatements; idx++) + { + indexStatements.add(input.readString()); + } + + return new CqlTable(keyspace, table, createStatement, replicationFactor, fields, udts, indexStatements); } @Override @@ -405,7 +417,13 @@ public void write(Kryo kryo, Output output, CqlTable table) { udt.write(output); } - output.writeInt(table.indexCount()); + + Set indexStatements = table.indexStatements(); + output.writeInt(indexStatements.size()); + for (String stmt : indexStatements) + { + output.writeString(stmt); + } } } } diff --git a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/utils/CqlUtils.java b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/utils/CqlUtils.java index 9d989be51..b613f72d9 100644 --- a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/utils/CqlUtils.java +++ b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/utils/CqlUtils.java @@ -25,6 +25,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.Locale; import java.util.Map; import java.util.Set; import java.util.regex.Matcher; @@ -59,6 +60,9 @@ public final class CqlUtils private static final Pattern ESCAPED_DOUBLE_BACKSLASH = Pattern.compile("\\\\"); private static final Pattern COMPACTION_STRATEGY_PATTERN = Pattern.compile("compaction\\s*=\\s*\\{\\s*'class'\\s*:\\s*'([^']+)'"); + private static final Pattern MULTI_WHITESPACE_PATTERN = Pattern.compile("\\s+"); + private static final Pattern SAI_USING_PATTERN = Pattern.compile("USING '[^']*STORAGEATTACHEDINDEX'"); + private CqlUtils() { throw new IllegalStateException(getClass() + " is static utility class and shall not be instantiated"); @@ -259,16 +263,66 @@ public static Set extractUdts(@NotNull String schemaStr, @NotNull String } public static int extractIndexCount(@NotNull String schemaStr, @NotNull String keyspace, @NotNull String table) + { + return extractIndexStatements(schemaStr, keyspace, table).size(); + } + + /** + * Extracts CREATE INDEX statements for the given table from the schema string. + * + * @param schemaStr full cluster schema text + * @param keyspace the keyspace name + * @param table the table name + * @return set of CREATE INDEX statements for the table + */ + public static Set extractIndexStatements(@NotNull String schemaStr, + @NotNull String keyspace, + @NotNull String table) { String cleaned = cleanCql(schemaStr); Pattern pattern = Pattern.compile(String.format("CREATE (CUSTOM )?INDEX \"?[^ ]* ON ?\"?%s?\"?\\.{1}\"?%s\"?[^;]*;", keyspace, table)); Matcher matcher = pattern.matcher(cleaned); - int indexCount = 0; + Set statements = new HashSet<>(); while (matcher.find()) { - indexCount++; + statements.add(matcher.group()); } - return indexCount; + return statements; + } + + /** + * Returns true if the given CREATE INDEX statement defines a Storage Attached Index (SAI). + *

+ * SAI class may appear either as the short name ("StorageAttachedIndex") or fully qualified + * ("org.apache.cassandra.index.sai.StorageAttachedIndex"). + * + * @param createIndexStatement a CREATE INDEX CQL statement + * @return true if the index uses SAI + */ + public static boolean isSaiIndex(@NotNull String createIndexStatement) + { + // Matches any run of whitespace (spaces, tabs, newlines). Used to collapse a CREATE INDEX statement to + // single-spaced text so the SAI marker can be matched regardless of how the schema text was formatted + // (e.g. "USING 'StorageAttachedIndex'", a newline before USING, or a tab all normalize to one space). + String normalized = MULTI_WHITESPACE_PATTERN.matcher(createIndexStatement).replaceAll(" ").toUpperCase(Locale.ROOT); + + // Matches the SAI marker inside a USING '...' clause (statement already upper-cased and whitespace-collapsed). + // The leading [^']* tolerates the fully-qualified class form + // (e.g. 'org.apache.cassandra.index.sai.StorageAttachedIndex') as well as the short name. + return SAI_USING_PATTERN.matcher(normalized).find(); + } + + /** + * Returns true when {@code indexStatements} is non-empty and every statement defines a Storage Attached Index. + * This is the single "all-SAI table" predicate shared by schema validation and the write/commit paths, so the + * decision to generate SAI components and the decision to enable SAI import options can never disagree. + * + * @param indexStatements the CREATE INDEX statements for a table + * @return true if all indexes are SAI (and at least one exists) + */ + public static boolean hasOnlySaiIndexes(@NotNull Set indexStatements) + { + return !indexStatements.isEmpty() && indexStatements.stream().allMatch(CqlUtils::isSaiIndex); } /** diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/AbstractBulkWriterContext.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/AbstractBulkWriterContext.java index 447285ff3..0b5a41644 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/AbstractBulkWriterContext.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/AbstractBulkWriterContext.java @@ -244,8 +244,8 @@ protected SchemaInfo buildSchemaInfo(StructType structType) String createTableSchema = CqlUtils.extractTableSchema(keyspaceSchema, keyspace, table); Set udts = CqlUtils.extractUdts(keyspaceSchema, keyspace); ReplicationFactor replicationFactor = CqlUtils.extractReplicationFactor(keyspaceSchema, keyspace); - int indexCount = CqlUtils.extractIndexCount(keyspaceSchema, keyspace, table); - CqlTable cqlTable = bridge().buildSchema(createTableSchema, keyspace, replicationFactor, partitioner, udts, null, indexCount, false); + Set indexStatements = CqlUtils.extractIndexStatements(keyspaceSchema, keyspace, table); + CqlTable cqlTable = bridge().buildSchema(createTableSchema, keyspace, replicationFactor, partitioner, udts, null, indexStatements, false); TableInfoProvider tableInfoProvider = new CqlTableInfoProvider(createTableSchema, cqlTable); TableSchema tableSchema = initializeTableSchema(bulkSparkConf(), structType, tableInfoProvider, lowestCassandraVersion()); diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BroadcastableTableSchema.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BroadcastableTableSchema.java index 322e815dd..41afd5ebb 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BroadcastableTableSchema.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BroadcastableTableSchema.java @@ -21,6 +21,7 @@ import java.io.Serializable; import java.util.List; +import java.util.Set; import com.google.common.base.Preconditions; @@ -46,7 +47,7 @@ */ public final class BroadcastableTableSchema implements Serializable { - private static final long serialVersionUID = 1L; + private static final long serialVersionUID = 2L; // All fields from TableSchema needed for reconstruction on executors private final String createStatement; @@ -60,6 +61,7 @@ public final class BroadcastableTableSchema implements Serializable private final TimestampOption timestampOption; private final String lowestCassandraVersion; private final boolean quoteIdentifiers; + private final Set indexStatements; /** * Creates a BroadcastableTableSchema from a source TableSchema. @@ -81,7 +83,8 @@ public static BroadcastableTableSchema from(@NotNull TableSchema source) source.ttlOption, source.timestampOption, source.lowestCassandraVersion, - source.quoteIdentifiers + source.quoteIdentifiers, + source.indexStatements ); } @@ -95,7 +98,8 @@ private BroadcastableTableSchema(String createStatement, TTLOption ttlOption, TimestampOption timestampOption, String lowestCassandraVersion, - boolean quoteIdentifiers) + boolean quoteIdentifiers, + Set indexStatements) { this.createStatement = createStatement; this.modificationStatement = modificationStatement; @@ -108,6 +112,7 @@ private BroadcastableTableSchema(String createStatement, this.timestampOption = timestampOption; this.lowestCassandraVersion = lowestCassandraVersion; this.quoteIdentifiers = quoteIdentifiers; + this.indexStatements = indexStatements; } public String getCreateStatement() @@ -165,6 +170,11 @@ public boolean isQuoteIdentifiers() return quoteIdentifiers; } + public Set getIndexStatements() + { + return indexStatements; + } + /** * Normalizes a row by applying type converters to each field. * This mirrors the normalize method in TableSchema but uses the broadcast-safe converters list. diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkSourceRelation.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkSourceRelation.java index 7ba220de3..1e695f27a 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkSourceRelation.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkSourceRelation.java @@ -26,6 +26,7 @@ import java.util.Iterator; import java.util.List; import java.util.Objects; +import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import java.util.stream.Collectors; @@ -502,11 +503,21 @@ private CreateRestoreJobRequestPayload.Builder createJobPayloadBuilder(JobInfo j @org.jetbrains.annotations.Nullable String credentialTypeName) { CreateRestoreJobRequestPayload.Builder builder = CreateRestoreJobRequestPayload.builder(secrets, updatedLeaseTime()); + Set indexStatements = writerContext.schema().getTableSchema().getIndexStatements(); + boolean hasSaiIndexes = TableSchema.isSaiWrite(indexStatements, writerContext.cluster().getLowestCassandraVersion()); builder.jobAgent(BuildInfo.APPLICATION_NAME) .jobId(job.getRestoreJobId(clusterId)) .updateImportOptions(importOptions -> { importOptions.verifySSTables(true) // we disallow the end-user to bypass the non-extended verify anymore .extendedVerify(false); // always turn off + + // When the table has SAI indexes, the bulk writer generates SAI components alongside the + // SSTables (Cassandra 5.0+). Enable SAI validation on import so that a slice missing its + // index components fails instead of silently rebuilding, mirroring the direct write path. + if (hasSaiIndexes) + { + importOptions.failOnMissingIndex(true).validateIndexChecksum(true); + } }); if (credentialTypeName != null) { diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraDirectDataTransportContext.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraDirectDataTransportContext.java index bed36ac4b..157ccdccc 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraDirectDataTransportContext.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraDirectDataTransportContext.java @@ -20,6 +20,7 @@ package org.apache.cassandra.spark.bulkwriter; import java.math.BigInteger; +import java.util.Set; import java.util.concurrent.ExecutorService; import com.google.common.collect.Range; @@ -36,12 +37,15 @@ public class CassandraDirectDataTransportContext implements TransportContext.Dir @NotNull private final ClusterInfo clusterInfo; @NotNull + private final SchemaInfo schemaInfo; + @NotNull private final DirectDataTransferApi dataTransferApi; public CassandraDirectDataTransportContext(@NotNull BulkWriterContext bulkWriterContext) { this.jobInfo = bulkWriterContext.job(); this.clusterInfo = bulkWriterContext.cluster(); + this.schemaInfo = bulkWriterContext.schema(); this.dataTransferApi = createDirectDataTransferApi(); } @@ -72,6 +76,9 @@ public DirectDataTransferApi dataTransferApi() protected DirectDataTransferApi createDirectDataTransferApi() { CassandraBridge bridge = CassandraBridgeFactory.get(clusterInfo.getLowestCassandraVersion()); - return new SidecarDataTransferApi(clusterInfo.getCassandraContext(), bridge, jobInfo); + Set indexStatements = schemaInfo.getTableSchema().getIndexStatements(); + boolean hasSaiIndexes = TableSchema.isSaiWrite(indexStatements, clusterInfo.getLowestCassandraVersion()); + + return new SidecarDataTransferApi(clusterInfo.getCassandraContext(), bridge, jobInfo, hasSaiIndexes); } } diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraSchemaInfo.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraSchemaInfo.java index db98cf710..a082da296 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraSchemaInfo.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraSchemaInfo.java @@ -21,6 +21,8 @@ import java.util.Set; +import org.jetbrains.annotations.NotNull; + public class CassandraSchemaInfo implements SchemaInfo { private final TableSchema tableSchema; @@ -51,6 +53,7 @@ public TableSchema getTableSchema() } @Override + @NotNull public Set getUserDefinedTypeStatements() { return userDefinedTypeStatements; diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CqlTableInfoProvider.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CqlTableInfoProvider.java index a4c8c3594..2bc117a40 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CqlTableInfoProvider.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CqlTableInfoProvider.java @@ -21,6 +21,7 @@ import java.util.List; import java.util.Map; +import java.util.Set; import java.util.regex.Pattern; import java.util.stream.Collectors; @@ -157,7 +158,13 @@ public String getKeyspaceName() @Override public boolean hasSecondaryIndex() { - return cqlTable.indexCount() > 0; + return !cqlTable.indexStatements().isEmpty(); + } + + @Override + public Set getIndexStatements() + { + return cqlTable.indexStatements(); } @Override diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/DirectStreamSession.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/DirectStreamSession.java index 7af617090..c661bbc7e 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/DirectStreamSession.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/DirectStreamSession.java @@ -43,7 +43,6 @@ import org.apache.cassandra.spark.bulkwriter.token.ReplicaAwareFailureHandler; import org.apache.cassandra.spark.common.Digest; import org.apache.cassandra.spark.common.SSTables; -import org.apache.cassandra.spark.data.FileType; import org.apache.cassandra.util.IntWrapper; public class DirectStreamSession extends StreamSession @@ -86,11 +85,12 @@ protected void onSSTablesProduced(Set sstables) // 3. send the sstables to all replicas // 4. remove the sstables once sent Map fileDigests = sstableWriter.prepareSStablesToSend(writerContext, sstables); - // retain only the SSTable data components + // retain only the primary SSTable data components ("-Data.db"); SAI per-index + // components such as "...+TermsData.db" (which also end with "Data.db") are excluded IntWrapper sstableCounter = new IntWrapper(); fileDigests.keySet() .stream() - .filter(p -> p.getFileName().toString().endsWith(FileType.DATA.getFileSuffix())) + .filter(SSTables::isDataComponent) .forEach(sstable -> { sstableCounter.value++; sendSStableToReplicas(sstable); @@ -153,7 +153,8 @@ protected StreamResult doFinalizeStream() @Override protected void sendRemainingSSTables() { - try (DirectoryStream dataFileStream = Files.newDirectoryStream(sstableWriter.getOutDir(), "*Data.db")) + try (DirectoryStream dataFileStream = Files.newDirectoryStream(sstableWriter.getOutDir(), + SSTables.DATA_COMPONENT_GLOB)) { for (Path dataFile : dataFileStream) { @@ -224,8 +225,9 @@ private void sendSSTableToOneReplica(Path dataFile, { for (Path componentFile : componentFileStream) { - // send data component the last - if (componentFile.getFileName().toString().endsWith("Data.db")) + // send the primary data component ("-Data.db") last; SAI per-index components + // such as "...+TermsData.db" are still streamed here rather than skipped + if (SSTables.isDataComponent(componentFile)) { continue; } diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SSTableWriterFactory.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SSTableWriterFactory.java index 77b8f5f42..344f2cda9 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SSTableWriterFactory.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SSTableWriterFactory.java @@ -39,6 +39,7 @@ public static SSTableWriter getSSTableWriter(CassandraVersionFeatures serverVers String createStatement, String insertStatement, Set userDefinedTypeStatements, + Set indexCreateStatements, int bufferSizeMB) { CassandraBridge cassandraBridge = CassandraBridgeFactory.get(serverVersion); @@ -47,6 +48,7 @@ public static SSTableWriter getSSTableWriter(CassandraVersionFeatures serverVers createStatement, insertStatement, userDefinedTypeStatements, + indexCreateStatements, bufferSizeMB); } } diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SidecarDataTransferApi.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SidecarDataTransferApi.java index 9199b01ca..b7debb5c3 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SidecarDataTransferApi.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SidecarDataTransferApi.java @@ -52,13 +52,16 @@ public class SidecarDataTransferApi implements DirectDataTransferApi private final CassandraBridge bridge; private final int sidecarPort; private final JobInfo job; + private final boolean hasSaiIndexes; - public SidecarDataTransferApi(CassandraContext cassandraContext, CassandraBridge bridge, JobInfo job) + public SidecarDataTransferApi(CassandraContext cassandraContext, CassandraBridge bridge, JobInfo job, + boolean hasSaiIndexes) { this.sidecarClient = cassandraContext.getSidecarClient(); this.sidecarPort = cassandraContext.sidecarPort(); this.bridge = bridge; this.job = job; + this.hasSaiIndexes = hasSaiIndexes; } @Override @@ -107,6 +110,14 @@ public RemoteCommitResult commitSSTables(CassandraInstance instance, // Always verify SSTables on import importOptions.verifySSTables(true).extendedVerify(!job.skipExtendedVerify()); + // When SAI index components were generated alongside SSTables, enable SAI validation on import. + // failOnMissingIndex makes the import fail (instead of silently rebuilding) when the uploaded + // SSTables are missing their SAI components, and validateIndexChecksum verifies their checksums. + if (hasSaiIndexes) + { + importOptions.failOnMissingIndex(true).validateIndexChecksum(true); + } + try { QualifiedTableName qt = job.qualifiedTableName(); diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SortedSSTableWriter.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SortedSSTableWriter.java index 877aafa15..02453c8e5 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SortedSSTableWriter.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SortedSSTableWriter.java @@ -44,7 +44,6 @@ import org.apache.cassandra.spark.common.Digest; import org.apache.cassandra.spark.common.SSTables; import org.apache.cassandra.spark.data.FileSystemSSTable; -import org.apache.cassandra.spark.data.FileType; import org.apache.cassandra.spark.data.LocalDataLayer; import org.apache.cassandra.spark.data.partitioner.Partitioner; import org.apache.cassandra.spark.reader.RowData; @@ -127,13 +126,14 @@ public SortedSSTableWriter(BulkWriterContext writerContext, Path outDir, DigestA SchemaInfo schema = writerContext.schema(); TableSchema tableSchema = schema.getTableSchema(); this.cqlSSTableWriter = SSTableWriterFactory.getSSTableWriter( - CassandraVersionFeatures.cassandraVersionFeaturesFromCassandraVersion(packageVersion), - this.outDir.toString(), - writerContext.cluster().getPartitioner().toString(), - tableSchema.createStatement, - tableSchema.modificationStatement, - schema.getUserDefinedTypeStatements(), - writerContext.job().sstableDataSizeInMiB()); + CassandraVersionFeatures.cassandraVersionFeaturesFromCassandraVersion(packageVersion), + this.outDir.toString(), + writerContext.cluster().getPartitioner().toString(), + tableSchema.createStatement, + tableSchema.modificationStatement, + schema.getUserDefinedTypeStatements(), + tableSchema.getIndexStatements(), + writerContext.job().sstableDataSizeInMiB()); } @NotNull @@ -236,7 +236,7 @@ public synchronized Map prepareSStablesToSend(@NotNull BulkWriterC { for (Path path : stream) { - if (path.getFileName().toString().endsWith("-" + FileType.DATA.getFileSuffix())) + if (SSTables.isDataComponent(path)) { dataFilePaths.add(path); sstableCount += 1; @@ -387,11 +387,10 @@ private LocalDataLayer buildLocalDataLayer(@NotNull BulkWriterContext writerCont private DirectoryStream getDataFileStream(DirectoryStream.Filter filter) throws IOException { - // Combine the data file filter with the provided filter - DirectoryStream.Filter combinedFilter = path -> { - String fileName = path.getFileName().toString(); - return fileName.endsWith("Data.db") && filter.accept(path); - }; + // Combine the data file filter with the provided filter. + // Match only the primary SSTable data component ("-Data.db"); the leading '-' excludes + // SAI per-index components such as "...+TermsData.db" which also end with "Data.db". + DirectoryStream.Filter combinedFilter = path -> SSTables.isDataComponent(path) && filter.accept(path); return Files.newDirectoryStream(getOutDir(), combinedFilter); } diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/TableInfoProvider.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/TableInfoProvider.java index aaf1ff56f..65213c39d 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/TableInfoProvider.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/TableInfoProvider.java @@ -20,6 +20,7 @@ package org.apache.cassandra.spark.bulkwriter; import java.util.List; +import java.util.Set; import org.apache.cassandra.spark.common.schema.ColumnType; import org.apache.cassandra.spark.data.CqlField; @@ -45,4 +46,6 @@ public interface TableInfoProvider boolean hasSecondaryIndex(); List getColumnNames(); + + Set getIndexStatements(); } diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/TableSchema.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/TableSchema.java index b198af984..7543220f9 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/TableSchema.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/TableSchema.java @@ -27,15 +27,18 @@ import java.util.stream.Collectors; import java.util.stream.Stream; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.cassandra.bridge.CassandraBridge; import org.apache.cassandra.bridge.CassandraBridgeFactory; +import org.apache.cassandra.bridge.CassandraVersionFeatures; import org.apache.cassandra.spark.common.schema.ColumnType; import org.apache.cassandra.spark.data.CqlField; import org.apache.cassandra.spark.exception.UnsupportedAnalyticsOperationException; +import org.apache.cassandra.spark.utils.CqlUtils; import org.apache.spark.sql.types.StructType; import static org.apache.cassandra.bridge.CassandraBridgeFactory.maybeQuotedIdentifier; @@ -62,6 +65,7 @@ public class TableSchema final TimestampOption timestampOption; final String lowestCassandraVersion; final boolean quoteIdentifiers; + final Set indexStatements; public TableSchema(StructType dfSchema, TableInfoProvider tableInfo, @@ -78,23 +82,10 @@ public TableSchema(StructType dfSchema, this.timestampOption = timestampOption; this.lowestCassandraVersion = lowestCassandraVersion; this.quoteIdentifiers = quoteIdentifiers; + this.indexStatements = tableInfo.getIndexStatements(); validateDataFrameCompatibility(dfSchema, tableInfo); - // If a table has indexes on it, some external process (application, DB, etc.) is responsible for rebuilding - // indexes on the table after the bulk write completes; cassandra does this as part of the SSTable import - // process today. 2i and SAI have different ergonomics here regarding if stale data is served during index build; - // ultimately we want the bulk writer to also write native SAI index files alongside sstables but until - // then, this is allowable and fine for users who Know What They're Doing. - if (!skipSecondaryIndexCheck) - { - validateNoSecondaryIndexes(tableInfo); - } - else if (tableInfo.hasSecondaryIndex()) - { - LOGGER.warn("Bulk writing to tables with SecondaryIndexes will have an asynchronous index rebuild " - + "take place automatically after writing. Reads against the index during this time " - + "window will produce inconsistent or stale results until index rebuild is complete."); - } + validateSecondaryIndexes(tableInfo, skipSecondaryIndexCheck, indexStatements, lowestCassandraVersion); validateUserAddedColumns(lowestCassandraVersion, quoteIdentifiers, ttlOption, timestampOption); this.createStatement = getCreateStatement(tableInfo); @@ -125,6 +116,7 @@ public TableSchema(BroadcastableTableSchema broadcastable) this.timestampOption = broadcastable.getTimestampOption(); this.lowestCassandraVersion = broadcastable.getLowestCassandraVersion(); this.quoteIdentifiers = broadcastable.isQuoteIdentifiers(); + this.indexStatements = broadcastable.getIndexStatements(); } private List getRequiredKeyColumns(TableInfoProvider tableInfo) @@ -308,6 +300,47 @@ private static void validateDataframeFieldsInTable(TableInfoProvider tableInfo, Preconditions.checkArgument(unknownFields.isEmpty(), "Unknown fields in data frame => " + unknownFields); } + /** + * Validates secondary index constraints for bulk write operations. + *

+ * When the cluster is Cassandra 5.0+ and ALL indexes are SAI, the write is allowed because + * SAI index components are generated alongside SSTables and are immediately queryable after import. + *

+ * When any index is non-SAI (legacy 2i), the write is blocked unless SKIP_SECONDARY_INDEX_CHECK is set. + * + * @param tableInfo the table info provider + * @param skipSecondaryIndexCheck whether the user explicitly opted out of the check + * @param indexStatements the CREATE INDEX statements for the table + * @param lowestCassandraVersion the lowest Cassandra version in the cluster + */ + static void validateSecondaryIndexes(TableInfoProvider tableInfo, + boolean skipSecondaryIndexCheck, + Set indexStatements, + String lowestCassandraVersion) + { + if (!tableInfo.hasSecondaryIndex()) + { + return; // No indexes — nothing to validate + } + + if (isSaiWrite(indexStatements, lowestCassandraVersion)) + { + LOGGER.info("Table has SAI indexes on Cassandra 5.0+. SAI index components will be generated " + + "alongside SSTables. indexCount={}", indexStatements.size()); + return; + } + + if (skipSecondaryIndexCheck) + { + LOGGER.warn("Bulk writing to tables with SecondaryIndexes will have an asynchronous index rebuild " + + "take place automatically after writing. Reads against the index during this time " + + "window will produce inconsistent or stale results until index rebuild is complete."); + return; + } + + throw new UnsupportedAnalyticsOperationException("Bulkwriter doesn't support secondary indexes"); + } + static void validateNoSecondaryIndexes(TableInfoProvider tableInfo) { if (tableInfo.hasSecondaryIndex()) @@ -316,6 +349,45 @@ static void validateNoSecondaryIndexes(TableInfoProvider tableInfo) } } + public Set getIndexStatements() + { + return indexStatements; + } + + @VisibleForTesting + static boolean isCassandra5OrLater(String version) + { + if (version == null || version.isEmpty()) + { + return false; + } + + try + { + return CassandraVersionFeatures.cassandraVersionFeaturesFromCassandraVersion(version).getMajorVersion() >= 50; + } + catch (RuntimeException exception) + { + // Unparseable version string — treat as "not 5.0+". + return false; + } + } + + /** + * Returns true when the bulk writer generates SAI components alongside SSTables for this table, i.e. the + * table's indexes are all SAI and the cluster is Cassandra 5.0+. This single predicate is shared by schema + * validation (to allow the write) and by the commit/restore paths (to enable SAI import options), so the + * "we generate SAI components" and "we validate SAI components on import" decisions can never diverge. + * + * @param indexStatements the CREATE INDEX statements for the table + * @param lowestCassandraVersion the lowest Cassandra version in the cluster + * @return true if this is an all-SAI write on Cassandra 5.0+ + */ + static boolean isSaiWrite(Set indexStatements, String lowestCassandraVersion) + { + return CqlUtils.hasOnlySaiIndexes(indexStatements) && isCassandra5OrLater(lowestCassandraVersion); + } + private static List getKeyFieldPositions(StructType dfSchema, List columnNames, List keyFieldNames) diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/SSTableLister.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/SSTableLister.java index 2897956dc..5d3ff9759 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/SSTableLister.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/SSTableLister.java @@ -43,6 +43,7 @@ import org.apache.cassandra.bridge.CassandraBridge; import org.apache.cassandra.bridge.SSTableSummary; import org.apache.cassandra.spark.common.Digest; +import org.apache.cassandra.spark.common.SSTables; import org.apache.cassandra.spark.data.FileSystemSSTable; import org.apache.cassandra.spark.data.QualifiedTableName; import org.apache.cassandra.spark.data.SSTable; @@ -218,7 +219,7 @@ private String getSSTablePrefix(String componentName) private SSTable buildSSTable(List components) { List dataComponents = components.stream() - .filter(path -> path.getFileName().toString().contains("Data.db")) + .filter(SSTables::isDataComponent) .collect(Collectors.toList()); if (dataComponents.size() != 1) { diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java index 99cbc6827..a45b12b76 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java @@ -295,7 +295,7 @@ private int initBulkReader(@NotNull ClientConfig options) throws ExecutionExcept String fullSchema = schemaFuture.get().schema(); String createStmt = CqlUtils.extractTableSchema(fullSchema, keyspace, table); - int indexCount = CqlUtils.extractIndexCount(fullSchema, keyspace, table); + Set indexStatements = CqlUtils.extractIndexStatements(fullSchema, keyspace, table); Set udts = CqlUtils.extractUdts(fullSchema, keyspace); ReplicationFactor replicationFactor = CqlUtils.extractReplicationFactor(fullSchema, keyspace); @@ -316,7 +316,7 @@ private int initBulkReader(@NotNull ClientConfig options) throws ExecutionExcept validateReplicationFactor(replicationFactor); udts.forEach(udt -> LOGGER.info("Adding schema UDT: '{}'", udt)); - cqlTable = bridge().buildSchema(createStmt, keyspace, replicationFactor, partitioner, udts, null, indexCount, false); + cqlTable = bridge().buildSchema(createStmt, keyspace, replicationFactor, partitioner, udts, null, indexStatements, false); CassandraRing ring = createCassandraRingFromRing(partitioner, replicationFactor, ringFuture.get()); int effectiveNumberOfCores = sizingFuture.get(); diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/LocalDataLayer.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/LocalDataLayer.java index 0a30cde03..077198587 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/LocalDataLayer.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/LocalDataLayer.java @@ -51,6 +51,7 @@ import org.apache.cassandra.bridge.CassandraBridge; import org.apache.cassandra.bridge.CassandraBridgeFactory; import org.apache.cassandra.bridge.CassandraVersion; +import org.apache.cassandra.spark.common.SSTables; import org.apache.cassandra.spark.config.SchemaFeature; import org.apache.cassandra.spark.config.SchemaFeatureSet; import org.apache.cassandra.spark.data.partitioner.Partitioner; @@ -365,7 +366,7 @@ public SSTablesSupplier sstables(int partitionId, .stream(paths) .map(Paths::get) .flatMap(Throwing.function(Files::list)) - .filter(path -> path.getFileName().toString().endsWith("-" + FileType.DATA.getFileSuffix())); + .filter(SSTables::isDataComponent); } return new BasicSupplier(dataFilePathsStream diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TableSchemaTest.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TableSchemaTest.java index 138bbffe9..7df89d16e 100644 --- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TableSchemaTest.java +++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TableSchemaTest.java @@ -22,6 +22,7 @@ import java.nio.charset.StandardCharsets; import java.nio.file.Path; import java.util.Arrays; +import java.util.Set; import com.google.common.collect.ImmutableMap; import org.apache.commons.io.FileUtils; @@ -260,10 +261,10 @@ public void testSecondaryIndexIsUnsupported() throws Exception { Path fullSchemaSampleFile = ResourceUtils.writeResourceToPath(CqlUtilsTest.class.getClassLoader(), tempPath, "cql/fullSchema.cql"); String fullSchemaSample = FileUtils.readFileToString(fullSchemaSampleFile.toFile(), StandardCharsets.UTF_8); - int indexCount = CqlUtils.extractIndexCount(fullSchemaSample, "cycling", "rank_by_year_and_name"); - assertThat(indexCount).isEqualTo(3); + Set indexStatements = CqlUtils.extractIndexStatements(fullSchemaSample, "cycling", "rank_by_year_and_name"); + assertThat(indexStatements).hasSize(3); CqlTable table = mock(CqlTable.class); - when(table.indexCount()).thenReturn(indexCount); + when(table.indexStatements()).thenReturn(indexStatements); TableInfoProvider tableInfoProvider = new CqlTableInfoProvider("", table); assertThatThrownBy(() -> TableSchema.validateNoSecondaryIndexes(tableInfoProvider)) .isInstanceOf(UnsupportedAnalyticsOperationException.class) diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TableSchemaTestCommon.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TableSchemaTestCommon.java index 701ca337d..630b87f30 100644 --- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TableSchemaTestCommon.java +++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TableSchemaTestCommon.java @@ -21,10 +21,13 @@ import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; @@ -495,6 +498,17 @@ public boolean hasSecondaryIndex() return hasSecondaryIndex; } + @Override + public Set getIndexStatements() + { + if (hasSecondaryIndex) + { + return new HashSet<>(Collections.singletonList("CREATE INDEX test_idx ON test." + uniqueTableName + " (col);")); + } + + return Collections.emptySet(); + } + @Override public List getColumnNames() { diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/utils/CqlUtilsTest.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/utils/CqlUtilsTest.java index 5bb363d97..423ae1d51 100644 --- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/utils/CqlUtilsTest.java +++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/utils/CqlUtilsTest.java @@ -321,7 +321,7 @@ public void testEscapedTableName(CassandraBridge bridge) ImmutableMap.of("datacenter1", 3)), Partitioner.Murmur3Partitioner, Collections.emptySet(), - null, 0, false); + null, Collections.emptySet(), false); assertThat(table.keyspace()).isEqualTo("ks"); assertThat(table.table()).isEqualTo("tb"); assertThat(table.getField("key").name()).isEqualTo("key"); @@ -552,7 +552,7 @@ public void testParseClusteringKeySchema(CassandraBridge bridge) new ReplicationFactor(ReplicationFactor.ReplicationStrategy.NetworkTopologyStrategy, ImmutableMap.of("datacenter1", 3)), Partitioner.Murmur3Partitioner, - Collections.emptySet(), null, 0, false); + Collections.emptySet(), null, Collections.emptySet(), false); } @ParameterizedTest diff --git a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BulkWriteSAIIndexTest.java b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BulkWriteSAIIndexTest.java new file mode 100644 index 000000000..972f893bd --- /dev/null +++ b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BulkWriteSAIIndexTest.java @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cassandra.analytics; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.stream.Stream; + +import com.vdurmont.semver4j.Semver; +import org.junit.jupiter.api.Test; + +import org.apache.cassandra.distributed.api.ConsistencyLevel; +import org.apache.cassandra.sidecar.testing.QualifiedName; +import org.apache.cassandra.testing.ClusterBuilderConfiguration; +import org.apache.cassandra.testing.TestUtils; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SparkSession; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assumptions.assumeThat; +import static org.apache.cassandra.testing.TestUtils.CREATE_TEST_TABLE_STATEMENT; +import static org.apache.cassandra.testing.TestUtils.DC1_RF3; +import static org.apache.cassandra.testing.TestUtils.ROW_COUNT; +import static org.apache.cassandra.testing.TestUtils.TEST_KEYSPACE; + +/** + * Integration test for bulk write and read operations on a table with multiple Storage Attached Indexes (SAI). + * Verifies that SAI SSTable components are written to disk and that SAI index filtering works after bulk write. + */ +class BulkWriteSAIIndexTest extends SharedClusterSparkIntegrationTestBase +{ + private static final QualifiedName TABLE_SAI = + new QualifiedName(TEST_KEYSPACE, "test_sai"); + + @Test + void testBulkWriteAndReadWithMultipleSaiIndexes() + { + SparkSession spark = getOrCreateSparkSession(); + Dataset dfWrite = DataGenerationUtils.generateCourseData(spark, ROW_COUNT); + + // 1. Bulk write to table with SAI indexes on both course and marks + bulkWriterDataFrameWriter(dfWrite, TABLE_SAI).save(); + + // 2. Flush to ensure SSTable components (including SAI) are written to disk + cluster.getFirstRunningInstance().flush(TEST_KEYSPACE); + + // 3. Verify SAI index SSTable components exist on the filesystem + assertThat(hasSaiIndexFiles()) + .as("SAI index files should exist on disk after bulk write and flush") + .isTrue(); + + // 4. Bulk read the data back and verify equality + Dataset dfRead = bulkReaderDataFrame(TABLE_SAI).load(); + checkSmallDataFrameEquality(dfWrite, dfRead); + + // 5. Verify SAI index filtering works on the course column + Object[][] courseResults = cluster.getFirstRunningInstance() + .coordinator() + .execute(String.format("SELECT * FROM %s WHERE course = 'course0';", + TABLE_SAI), + ConsistencyLevel.ALL); + assertThat(courseResults).isNotNull(); + assertThat(courseResults.length).isGreaterThan(0); + for (Object[] row : courseResults) + { + // course is the second column (id, course, marks) + assertThat(row[1]).isEqualTo("course0"); + } + + // 6. Verify SAI index filtering works on the marks column + Object[][] marksResults = cluster.getFirstRunningInstance() + .coordinator() + .execute(String.format("SELECT * FROM %s WHERE marks = 50;", + TABLE_SAI), + ConsistencyLevel.ALL); + assertThat(marksResults).isNotNull(); + assertThat(marksResults.length) + .as("SAI filter on marks=50 should return the matching row") + .isGreaterThan(0); + + for (Object[] row : marksResults) + { + // marks is the third column (id, course, marks) + assertThat(row[2]).isEqualTo(50); + } + } + + /** + * Checks whether SAI index files exist on the filesystem for the test keyspace. + * SAI stores index data in directories or files with naming patterns that include + * "SAI" or reside under directories indicating SAI components. + */ + private boolean hasSaiIndexFiles() + { + String[] dataDirs = (String[]) cluster.get(1) + .config() + .getParams() + .get("data_file_directories"); + String dataDir = dataDirs[0]; + Path keyspacePath = Paths.get(dataDir, TEST_KEYSPACE); + + try (Stream walkStream = Files.walk(keyspacePath)) + { + return walkStream + .filter(Files::isRegularFile) + .anyMatch(path -> { + String pathStr = path.toString(); + // SAI index components are stored in directories or files + // with patterns like ".sai/" or "SAI" in the path + return pathStr.contains("SAI") || pathStr.contains(".sai"); + }); + } + catch (IOException e) + { + return false; + } + } + + @Override + protected void beforeClusterProvisioning() + { + assumeThat(TestUtils.getDTestClusterVersion().isGreaterThanOrEqualTo(new Semver("5.0", Semver.SemverType.LOOSE))) + .describedAs("Storage Attached Index (SAI) is only available in Cassandra 5.0 and above") + .isTrue(); + } + + @Override + protected void initializeSchemaForTest() + { + createTestKeyspace(TEST_KEYSPACE, DC1_RF3); + createTestTable(TABLE_SAI, CREATE_TEST_TABLE_STATEMENT); + cluster.schemaChangeIgnoringStoppedInstances( + String.format("CREATE INDEX ON %s(course) USING 'StorageAttachedIndex';", TABLE_SAI)); + cluster.schemaChangeIgnoringStoppedInstances( + String.format("CREATE INDEX ON %s(marks) USING 'StorageAttachedIndex';", TABLE_SAI)); + } + + @Override + protected ClusterBuilderConfiguration testClusterConfiguration() + { + return super.testClusterConfiguration() + .nodesPerDc(3); + } +} diff --git a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/testcontainer/BulkWriteS3CompatModeSAIIndexTest.java b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/testcontainer/BulkWriteS3CompatModeSAIIndexTest.java new file mode 100644 index 000000000..97ab488c7 --- /dev/null +++ b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/testcontainer/BulkWriteS3CompatModeSAIIndexTest.java @@ -0,0 +1,245 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cassandra.analytics.testcontainer; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; +import java.util.stream.Stream; + +import com.google.common.collect.ImmutableMap; +import com.vdurmont.semver4j.Semver; +import org.junit.jupiter.api.Test; + +import com.adobe.testing.s3mock.testcontainers.S3MockContainer; +import org.apache.cassandra.analytics.DataGenerationUtils; +import org.apache.cassandra.analytics.SharedClusterSparkIntegrationTestBase; +import org.apache.cassandra.distributed.api.ConsistencyLevel; +import org.apache.cassandra.sidecar.config.S3ClientConfiguration; +import org.apache.cassandra.sidecar.config.S3ProxyConfiguration; +import org.apache.cassandra.sidecar.config.yaml.S3ClientConfigurationImpl; +import org.apache.cassandra.sidecar.config.yaml.S3ProxyConfigurationImpl; +import org.apache.cassandra.sidecar.config.yaml.SidecarConfigurationImpl; +import org.apache.cassandra.sidecar.testing.QualifiedName; +import org.apache.cassandra.testing.ClusterBuilderConfiguration; +import org.apache.cassandra.testing.TestUtils; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SparkSession; + +import static org.apache.cassandra.sidecar.config.yaml.S3ClientConfigurationImpl.DEFAULT_API_CALL_TIMEOUT; +import static org.apache.cassandra.sidecar.config.yaml.S3ClientConfigurationImpl.DEFAULT_THREAD_KEEP_ALIVE; +import static org.apache.cassandra.testing.TestUtils.CREATE_TEST_TABLE_STATEMENT; +import static org.apache.cassandra.testing.TestUtils.DC1_RF3; +import static org.apache.cassandra.testing.TestUtils.ROW_COUNT; +import static org.apache.cassandra.testing.TestUtils.TEST_KEYSPACE; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assumptions.assumeThat; + +/** + * End-to-end integration test for bulk writing a table with Storage Attached Indexes (SAI) via the + * S3-compatible (cloud storage / restore) transport. + * + *

This exercises both stages of the cloud-storage write path: + *

    + *
  • Stage 1 (write & upload): SSTables and their SAI index components are produced on the Spark + * side by {@code CQLSSTableWriter}, bundled, and uploaded to S3. Because the restore payload now sets + * {@code failOnMissingIndex=true} for SAI tables, the subsequent import would fail if the SAI components + * were absent from the bundle — so a successful write proves the components were generated and uploaded.
  • + *
  • Stage 2 (restore & import): the sidecar restore job downloads the slices and runs Cassandra's + * SSTable import with {@code failOnMissingIndex=true}/{@code validateIndexChecksum=true}, attaching the SAI + * components. Successful completion, the presence of SAI files on disk, and working SAI-filtered queries + * confirm the indexes are usable after restore.
  • + *
+ * + * SAI is a Cassandra 5.0+ feature, so the test is skipped on older clusters. + */ +class BulkWriteS3CompatModeSAIIndexTest extends SharedClusterSparkIntegrationTestBase +{ + // Must match the bucket reported by LocalStorageTransportExtension.getStorageConfiguration() + // (it imports BUCKET_NAME from BulkWriteS3CompatModeSimpleTest = "sbw-bucket"); the writer uploads + // bundles to that bucket, so the S3Mock initial bucket must use the same name or uploads 404. + public static final String BUCKET_NAME = "sbw-bucket"; + private static final QualifiedName TABLE_NAME = + new QualifiedName(TEST_KEYSPACE, BulkWriteS3CompatModeSAIIndexTest.class.getSimpleName().toLowerCase()); + private S3MockContainer s3Mock; + + /** + * Stage 1 + Stage 2 end-to-end: write a SAI table via S3_COMPAT, then verify the data and the SAI indexes + * survive the restore/import round-trip. + */ + @Test + void testS3CompatBulkWriteWithSaiIndexes() + { + SparkSession spark = getOrCreateSparkSession(); + Dataset df = DataGenerationUtils.generateCourseData(spark, ROW_COUNT); + Map s3CompatOptions = ImmutableMap.of( + "data_transport", "S3_COMPAT", + "data_transport_extension_class", LocalStorageTransportExtension.class.getCanonicalName(), + "storage_client_endpoint_override", s3Mock.getHttpEndpoint() // point to s3Mock server + ); + + // Stage 1 + Stage 2: write to S3 and restore/import. With failOnMissingIndex=true for SAI tables, this + // save() would throw if the SAI components were not generated, uploaded, and validated on import. + bulkWriterDataFrameWriter(df, TABLE_NAME, s3CompatOptions).save(); + + // Verify all rows are present after the restore/import round-trip. + sparkTestUtils.validateWrites(df.collectAsList(), queryAllData(TABLE_NAME)); + + // Stage 2 evidence: SAI index components landed on disk after import. + assertThat(hasSaiIndexFiles()) + .as("SAI index files should exist on disk after S3 restore/import") + .isTrue(); + + // Stage 2 evidence: SAI indexes are attached and usable for filtering on non-key columns. + Object[][] courseResults = cluster.getFirstRunningInstance() + .coordinator() + .execute(String.format("SELECT * FROM %s WHERE course = 'course0';", TABLE_NAME), + ConsistencyLevel.ALL); + assertThat(courseResults.length) + .as("SAI filter on course='course0' should return the matching row") + .isGreaterThan(0); + for (Object[] row : courseResults) + { + // course is the second column (id, course, marks) + assertThat(row[1]).isEqualTo("course0"); + } + + Object[][] marksResults = cluster.getFirstRunningInstance() + .coordinator() + .execute(String.format("SELECT * FROM %s WHERE marks = 50;", TABLE_NAME), + ConsistencyLevel.ALL); + assertThat(marksResults.length) + .as("SAI filter on marks=50 should return the matching row") + .isGreaterThan(0); + for (Object[] row : marksResults) + { + // marks is the third column (id, course, marks) + assertThat(row[2]).isEqualTo(50); + } + } + + /** + * Checks whether SAI index component files exist on disk for the test keyspace on the first node. + */ + private boolean hasSaiIndexFiles() + { + String[] dataDirs = (String[]) cluster.get(1) + .config() + .getParams() + .get("data_file_directories"); + Path keyspacePath = Paths.get(dataDirs[0], TEST_KEYSPACE); + try (Stream walkStream = Files.walk(keyspacePath)) + { + return walkStream + .filter(Files::isRegularFile) + .anyMatch(path -> { + String pathStr = path.toString(); + return pathStr.contains("SAI") || pathStr.contains(".sai"); + }); + } + catch (IOException e) + { + return false; + } + } + + @Override + protected void beforeClusterProvisioning() + { + assumeThat(TestUtils.getDTestClusterVersion().isGreaterThanOrEqualTo(new Semver("5.0", Semver.SemverType.LOOSE))) + .describedAs("Storage Attached Index (SAI) is only available in Cassandra 5.0 and above") + .isTrue(); + } + + @Override + protected void afterClusterProvisioned() + { + // must start s3Mock before starting sidecar, in order to provide the actual s3 server port to start sidecar + super.afterClusterProvisioned(); + s3Mock = new S3MockContainer("2.17.0") + .withInitialBuckets(BUCKET_NAME); + s3Mock.start(); + assertThat(s3Mock.isRunning()).isTrue(); + } + + @Override + protected void afterClusterShutdown() + { + if (s3Mock != null) + { + s3Mock.stop(); + } + } + + @Override + protected ClusterBuilderConfiguration testClusterConfiguration() + { + return super.testClusterConfiguration() + .nodesPerDc(3); + } + + @Override + protected void initializeSchemaForTest() + { + createTestKeyspace(TEST_KEYSPACE, DC1_RF3); + createTestTable(TABLE_NAME, CREATE_TEST_TABLE_STATEMENT); + cluster.schemaChangeIgnoringStoppedInstances( + String.format("CREATE INDEX ON %s(course) USING 'StorageAttachedIndex';", TABLE_NAME)); + cluster.schemaChangeIgnoringStoppedInstances( + String.format("CREATE INDEX ON %s(marks) USING 'StorageAttachedIndex';", TABLE_NAME)); + } + + @Override + protected Function configurationOverrides() + { + return builder -> { + S3ClientConfiguration s3ClientConfig = new S3ClientConfigurationImpl("s3-client", 4, DEFAULT_THREAD_KEEP_ALIVE, + 5242880, DEFAULT_API_CALL_TIMEOUT, + buildTestS3ProxyConfig()); + builder.s3ClientConfiguration(s3ClientConfig); + return builder; + }; + } + + @Override + protected void beforeTestStart() + { + super.beforeTestStart(); + waitForSchemaReady(30, TimeUnit.SECONDS); + } + + private S3ProxyConfiguration buildTestS3ProxyConfig() + { + return new S3MockProxyConfigurationImpl(s3Mock.getHttpEndpoint()); + } + + public static class S3MockProxyConfigurationImpl extends S3ProxyConfigurationImpl + { + S3MockProxyConfigurationImpl(String endpointOverride) + { + super(null, null, null, endpointOverride); + } + } +} diff --git a/cassandra-avro-converter/src/main/java/org/apache/cassandra/cdc/avro/CqlToAvroSchemaConverter.java b/cassandra-avro-converter/src/main/java/org/apache/cassandra/cdc/avro/CqlToAvroSchemaConverter.java index aedbc2ddd..907921c0c 100644 --- a/cassandra-avro-converter/src/main/java/org/apache/cassandra/cdc/avro/CqlToAvroSchemaConverter.java +++ b/cassandra-avro-converter/src/main/java/org/apache/cassandra/cdc/avro/CqlToAvroSchemaConverter.java @@ -69,7 +69,7 @@ default Schema convert(String keyspace, String tableCreateStatement, Set new ReplicationFactor(ReplicationFactor.ReplicationStrategy.NetworkTopologyStrategy, Collections.singletonMap("DC1", 3)), Partitioner.Murmur3Partitioner, - udts, null, 0, true); + udts, null, Collections.emptySet(), true); return convert(cqlTable); } diff --git a/cassandra-bridge/src/main/java/org/apache/cassandra/bridge/CassandraBridge.java b/cassandra-bridge/src/main/java/org/apache/cassandra/bridge/CassandraBridge.java index 67c5bc7a0..3b6e21062 100644 --- a/cassandra-bridge/src/main/java/org/apache/cassandra/bridge/CassandraBridge.java +++ b/cassandra-bridge/src/main/java/org/apache/cassandra/bridge/CassandraBridge.java @@ -143,7 +143,7 @@ public CqlTable buildSchema(String createStatement, Partitioner partitioner, Set udts) { - return buildSchema(createStatement, keyspace, replicationFactor, partitioner, udts, null, 0, false); + return buildSchema(createStatement, keyspace, replicationFactor, partitioner, udts, null, Collections.emptySet(), false); } public abstract CqlTable buildSchema(String createStatement, @@ -152,7 +152,7 @@ public abstract CqlTable buildSchema(String createStatement, Partitioner partitioner, Set udts, @Nullable UUID tableId, - int indexCount, + Set indexStatements, boolean enableCdc); /** @@ -398,6 +398,7 @@ public abstract SSTableWriter getSSTableWriter(String inDirectory, String createStatement, String insertStatement, Set userDefinedTypeStatements, + Set indexCreateStatements, int bufferSizeMB); public abstract SSTableSummary getSSTableSummary(@NotNull String keyspace, diff --git a/cassandra-bridge/src/testFixtures/java/org/apache/cassandra/spark/utils/test/TestSchema.java b/cassandra-bridge/src/testFixtures/java/org/apache/cassandra/spark/utils/test/TestSchema.java index e96847496..12f5b8e53 100644 --- a/cassandra-bridge/src/testFixtures/java/org/apache/cassandra/spark/utils/test/TestSchema.java +++ b/cassandra-bridge/src/testFixtures/java/org/apache/cassandra/spark/utils/test/TestSchema.java @@ -498,7 +498,7 @@ public CqlTable buildTable() rf, allFields, udts, - 0); + Collections.emptySet()); } public void writeSSTable(TemporaryDirectory directory, diff --git a/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/bridge/CassandraBridgeImplementation.java b/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/bridge/CassandraBridgeImplementation.java index 5dd648552..ad33708b4 100644 --- a/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/bridge/CassandraBridgeImplementation.java +++ b/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/bridge/CassandraBridgeImplementation.java @@ -100,6 +100,7 @@ import org.apache.cassandra.spark.reader.ReaderUtils; import org.apache.cassandra.spark.reader.RowData; import org.apache.cassandra.spark.reader.SchemaBuilder; +import org.apache.cassandra.spark.reader.StorageAttachedIndexApplier; import org.apache.cassandra.spark.reader.StreamScanner; import org.apache.cassandra.spark.reader.SummaryDbUtils; import org.apache.cassandra.spark.sparksql.CellIterator; @@ -281,10 +282,17 @@ public CqlTable buildSchema(String createStatement, Partitioner partitioner, Set udts, @Nullable UUID tableId, - int indexCount, + Set indexStatements, boolean enableCdc) { - return new SchemaBuilder(createStatement, keyspace, replicationFactor, partitioner, cassandraTypes -> udts, tableId, indexCount, enableCdc).build(); + CqlTable cqlTable = new SchemaBuilder(createStatement, keyspace, replicationFactor, partitioner, + cassandraTypes -> udts, tableId, indexStatements, enableCdc).build(); + // SAI is a Cassandra 5.0+ feature: register any Storage Attached Index definitions on the table metadata + // after the shared SchemaBuilder has built/registered the (index-less) table. + // buildSchema is invoked repeatedly within a JVM (the index-carrying per-partition RecordWriter included), + // so this re-applies the SAI definitions whenever a rebuild left the table without them. + StorageAttachedIndexApplier.maybeApply(keyspace, cqlTable.table(), indexStatements); + return cqlTable; } @Override @@ -619,11 +627,12 @@ public SSTableWriter getSSTableWriter(String inDirectory, String partitioner, String createStatement, String insertStatement, - @NotNull Set userDefinedTypeStatements, + Set userDefinedTypeStatements, + Set indexCreateStatements, int bufferSizeMB) { return new SSTableWriterImplementation(inDirectory, partitioner, createStatement, insertStatement, - userDefinedTypeStatements, bufferSizeMB); + userDefinedTypeStatements, indexCreateStatements, bufferSizeMB); } @Override diff --git a/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/bridge/SSTableWriterImplementation.java b/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/bridge/SSTableWriterImplementation.java new file mode 100644 index 000000000..67195f210 --- /dev/null +++ b/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/bridge/SSTableWriterImplementation.java @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cassandra.bridge; + +import java.io.IOException; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.function.Consumer; +import java.util.stream.Collectors; + +import javax.annotation.concurrent.NotThreadSafe; + +import com.google.common.annotations.VisibleForTesting; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.config.Config; +import org.apache.cassandra.dht.IPartitioner; +import org.apache.cassandra.dht.Murmur3Partitioner; +import org.apache.cassandra.dht.RandomPartitioner; +import org.apache.cassandra.exceptions.InvalidRequestException; +import org.apache.cassandra.io.sstable.CQLSSTableWriter; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.spark.utils.CqlUtils; +import org.jetbrains.annotations.NotNull; + +/** + * Cassandra 5.0 SSTableWriter implementation with SAI index generation support. + *

+ * When provided with CREATE INDEX statements, this writer attempts to configure the + * CQLSSTableWriter to generate SAI index component files alongside SSTables. + * This requires Cassandra 5.0+ with SAI support in CQLSSTableWriter. + */ +@NotThreadSafe +public class SSTableWriterImplementation implements SSTableWriter +{ + private static final Logger LOGGER = LoggerFactory.getLogger(SSTableWriterImplementation.class); + + static + { + Config.setClientMode(true); + } + + private final CQLSSTableWriter writer; + private Consumer> producedSSTablesListener; + + public SSTableWriterImplementation(String inDirectory, + String partitioner, + String createStatement, + String insertStatement, + @NotNull Set userDefinedTypeStatements, + @NotNull Set indexCreateStatements, + int bufferSizeMB) + { + this(inDirectory, determineSupportedPartitioner(partitioner), createStatement, insertStatement, + userDefinedTypeStatements, indexCreateStatements, bufferSizeMB); + } + + @VisibleForTesting + public SSTableWriterImplementation(String inDirectory, + IPartitioner partitioner, + String createStatement, + String insertStatement, + @NotNull Set userDefinedTypeStatements, + @NotNull Set indexCreateStatements, + int bufferSizeMB) + { + this.writer = configureBuilder(inDirectory, createStatement, insertStatement, + bufferSizeMB, userDefinedTypeStatements, indexCreateStatements, + this::onSSTablesProduced, partitioner) + .build(); + } + + private static IPartitioner determineSupportedPartitioner(String partitioner) + { + return partitioner.toLowerCase().contains("random") + ? new RandomPartitioner() + : new Murmur3Partitioner(); + } + + private void onSSTablesProduced(Collection sstables) + { + Objects.requireNonNull(producedSSTablesListener, "producedSSTablesListener is not set"); + Set sstableDescriptors = sstables + .stream() + .map(sstable -> { + String baseFilename = CassandraBridgeImplementation.baseFilename(sstable.descriptor); + // TODO: for now, the sstableReader is closed immediately, + // TODO (CONTI): we can potentially read from the reader to validate the underlying sstable, + // TODO (CONTI): replacing org.apache.cassandra.spark.bulkwriter.SortedSSTableWriter.validateSSTables + sstable.selfRef().close(); + return new SSTableDescriptor(baseFilename); + }) + .collect(Collectors.toSet()); + + producedSSTablesListener.accept(sstableDescriptors); + } + + @Override + public void addRow(Map values) throws IOException + { + try + { + writer.addRow(values); + } + catch (InvalidRequestException exception) + { + throw new RuntimeException(exception); + } + } + + @Override + public void setSSTablesProducedListener(Consumer> listener) + { + producedSSTablesListener = Objects.requireNonNull(listener); + } + + @Override + public void close() throws IOException + { + writer.close(); + } + + @VisibleForTesting + static CQLSSTableWriter.Builder configureBuilder(String inDirectory, + String createStatement, + String insertStatement, + int bufferSizeMB, + Set udts, + Set indexCreateStatements, + Consumer> producedSSTablesListener, + IPartitioner cassPartitioner) + { + CQLSSTableWriter.Builder builder = CQLSSTableWriter.builder(); + + for (String udt : udts) + { + builder.withType(udt); + } + + builder.inDirectory(inDirectory) + .forTable(createStatement) + .withPartitioner(cassPartitioner) + .using(insertStatement) + // The data frame to write is always sorted, + // see org.apache.cassandra.spark.bulkwriter.CassandraBulkSourceRelation.insert + .sorted() + .withSSTableProducedListener(producedSSTablesListener) + .openSSTableOnProduced() + .withMaxSSTableSizeInMiB(bufferSizeMB); + + // Only SAI indexes are generated alongside SSTables. Filter out any non-SAI (legacy 2i) statements so they + // are never handed to CQLSSTableWriter.withIndexes, which expects SAI definitions. Mixed-index tables can + // only reach this point via SKIP_SECONDARY_INDEX_CHECK; their 2i indexes are rebuilt by Cassandra on import. + List saiIndexStatements = indexCreateStatements.stream() + .filter(CqlUtils::isSaiIndex) + .collect(Collectors.toList()); + if (!saiIndexStatements.isEmpty()) + { + builder.withIndexes(saiIndexStatements.toArray(new String[0])); + builder.withBuildIndexes(true); + LOGGER.info("SAI index generation enabled for {} index(es)", saiIndexStatements.size()); + } + + return builder; + } +} diff --git a/cassandra-five-zero-bridge/src/test/java/org/apache/cassandra/bridge/SSTableWriterImplementationTest.java b/cassandra-five-zero-bridge/src/test/java/org/apache/cassandra/bridge/SSTableWriterImplementationTest.java index 042b7deb8..d269addae 100644 --- a/cassandra-five-zero-bridge/src/test/java/org/apache/cassandra/bridge/SSTableWriterImplementationTest.java +++ b/cassandra-five-zero-bridge/src/test/java/org/apache/cassandra/bridge/SSTableWriterImplementationTest.java @@ -69,6 +69,7 @@ void testSSTableWriterConfiguration() throws NoSuchFieldException, IllegalAccess INSERT_STATEMENT, 250, Collections.emptySet(), + Collections.emptySet(), sstables -> {}, new Murmur3Partitioner()); @@ -86,6 +87,7 @@ void testGetProducedSSTables() throws IOException CREATE_STATEMENT, INSERT_STATEMENT, Collections.emptySet(), + Collections.emptySet(), 1); writer.setSSTablesProducedListener(produced::addAll); assertThat(produced).isEmpty(); diff --git a/cassandra-five-zero-types/src/main/java/org/apache/cassandra/spark/reader/StorageAttachedIndexApplier.java b/cassandra-five-zero-types/src/main/java/org/apache/cassandra/spark/reader/StorageAttachedIndexApplier.java new file mode 100644 index 000000000..62f280a58 --- /dev/null +++ b/cassandra-five-zero-types/src/main/java/org/apache/cassandra/spark/reader/StorageAttachedIndexApplier.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cassandra.spark.reader; + +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.bridge.CassandraSchema; +import org.apache.cassandra.bridge.SchemaUpdater; +import org.apache.cassandra.cql3.CQLFragmentParser; +import org.apache.cassandra.cql3.CqlParser; +import org.apache.cassandra.cql3.statements.schema.CreateIndexStatement; +import org.apache.cassandra.schema.KeyspaceMetadata; +import org.apache.cassandra.schema.Keyspaces; +import org.apache.cassandra.schema.TableMetadata; +import org.apache.cassandra.schema.Tables; +import org.apache.cassandra.service.ClientState; +import org.apache.cassandra.spark.utils.CqlUtils; + +/** + * Registers Storage Attached Index (SAI) definitions onto a table's metadata in the in-JVM Cassandra schema. + *

+ * SAI is a Cassandra 5.0+ feature, so this is invoked only by the 5.0 bridge, after the shared {@link SchemaBuilder} + * has registered the (index-less) table. Idempotent: a no-op if the registered table already carries indexes (the + * shared builder preserves previously-registered indexes across repeated index-less rebuilds within a JVM). + */ +public final class StorageAttachedIndexApplier +{ + private static final Logger LOGGER = LoggerFactory.getLogger(StorageAttachedIndexApplier.class); + + private StorageAttachedIndexApplier() + { + } + + /** + * Applies any Storage Attached Index definitions in {@code indexStatements} to the already-registered table. + * Non-SAI (legacy 2i) and empty index sets are ignored. Idempotent: if the registered table already carries + * indexes (e.g. a prior call within the same JVM applied them), this is a no-op. + * + * @param keyspace the keyspace of the table + * @param table the table name + * @param indexStatements the CREATE INDEX statements associated with the table (may be empty or null) + */ + public static void maybeApply(String keyspace, String table, Set indexStatements) + { + if (indexStatements == null || indexStatements.isEmpty()) + { + return; + } + + List saiStatements = indexStatements.stream() + .filter(CqlUtils::isSaiIndex) + .collect(Collectors.toList()); + if (saiStatements.isEmpty()) + { + return; + } + + CassandraSchema.update(schema -> { + TableMetadata current = schema.getTableMetadata(keyspace, table); + if (current == null) + { + throw new IllegalStateException("SAI index application found no registered table metadata for " + + keyspace + '.' + table); + } + + if (!current.indexes.isEmpty()) + { + // Indexes already registered for this table (buildSchema is invoked repeatedly within a JVM). + return; + } + + KeyspaceMetadata keyspaceMetadata = schema.getKeyspaceMetadata(keyspace); + Keyspaces keyspaces = Keyspaces.of(keyspaceMetadata.withSwapped(Tables.of(current))); + ClientState state = ClientState.forInternalCalls(); + for (String saiStatement : saiStatements) + { + CreateIndexStatement.Raw raw = CQLFragmentParser.parseAny(CqlParser::createIndexStatement, + saiStatement, "CREATE INDEX"); + keyspaces = raw.prepare(state).apply(keyspaces); + } + + TableMetadata withIndexes = keyspaces.get(keyspace) + .flatMap(ks -> ks.tables.get(table)) + .orElseThrow(() -> new IllegalStateException( + "SAI index application produced no table metadata for " + + keyspace + '.' + table)); + SchemaUpdater.updateTable(schema, keyspaceMetadata, withIndexes); + LOGGER.info("Applied {} SAI index(es) to table metadata keyspace={} table={}", + saiStatements.size(), keyspace, table); + }); + } +} diff --git a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/bridge/CassandraBridgeImplementation.java b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/bridge/CassandraBridgeImplementation.java index 5a8f9ed8b..59cef5bb3 100644 --- a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/bridge/CassandraBridgeImplementation.java +++ b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/bridge/CassandraBridgeImplementation.java @@ -278,10 +278,11 @@ public CqlTable buildSchema(String createStatement, Partitioner partitioner, Set udts, @Nullable UUID tableId, - int indexCount, + Set indexStatements, boolean enableCdc) { - return new SchemaBuilder(createStatement, keyspace, replicationFactor, partitioner, cassandraTypes -> udts, tableId, indexCount, enableCdc).build(); + return new SchemaBuilder(createStatement, keyspace, replicationFactor, partitioner, + cassandraTypes -> udts, tableId, indexStatements, enableCdc).build(); } @Override @@ -600,9 +601,12 @@ public SSTableWriter getSSTableWriter(String inDirectory, String partitioner, String createStatement, String insertStatement, - @NotNull Set userDefinedTypeStatements, + Set userDefinedTypeStatements, + Set indexCreateStatements, int bufferSizeMB) { + // indexCreateStatements is intentionally ignored: SAI component generation is a Cassandra 5.0+ feature + // handled by the five-zero bridge. On 4.0, indexes are rebuilt by Cassandra after import as before. return new SSTableWriterImplementation(inDirectory, partitioner, createStatement, insertStatement, userDefinedTypeStatements, bufferSizeMB); } diff --git a/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/reader/SchemaBuilder.java b/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/reader/SchemaBuilder.java index 5d0493a1f..c3e1c09a3 100644 --- a/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/reader/SchemaBuilder.java +++ b/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/reader/SchemaBuilder.java @@ -84,7 +84,7 @@ public class SchemaBuilder private final String keyspace; private final ReplicationFactor replicationFactor; private final CassandraTypes cassandraTypes; - private final int indexCount; + private final Set indexStatements; public SchemaBuilder(CqlTable table, Partitioner partitioner, boolean enableCdc) { @@ -104,14 +104,15 @@ public SchemaBuilder(CqlTable table, Partitioner partitioner, UUID tableId, bool partitioner, table::udtCreateStmts, tableId, - 0, + Collections.emptySet(), enableCdc); } @VisibleForTesting public SchemaBuilder(String createStmt, String keyspace, ReplicationFactor replicationFactor) { - this(createStmt, keyspace, replicationFactor, Partitioner.Murmur3Partitioner, bridge -> Collections.emptySet(), null, 0, false); + this(createStmt, keyspace, replicationFactor, Partitioner.Murmur3Partitioner, bridge -> Collections.emptySet(), + null, Collections.emptySet(), false); } @VisibleForTesting @@ -120,7 +121,8 @@ public SchemaBuilder(String createStmt, ReplicationFactor replicationFactor, Partitioner partitioner) { - this(createStmt, keyspace, replicationFactor, partitioner, bridge -> Collections.emptySet(), null, 0, false); + this(createStmt, keyspace, replicationFactor, partitioner, bridge -> Collections.emptySet(), null, + Collections.emptySet(), false); } public SchemaBuilder(String createStmt, @@ -129,14 +131,14 @@ public SchemaBuilder(String createStmt, Partitioner partitioner, Function> udtStatementsProvider, @Nullable UUID tableId, - int indexCount, + Set indexStatements, boolean enableCdc) { this.createStmt = createStmt; this.keyspace = keyspace; this.replicationFactor = replicationFactor; this.cassandraTypes = new CassandraTypesImplementation(); - this.indexCount = indexCount; + this.indexStatements = indexStatements; Pair updated = CassandraSchema.apply(schema -> updateSchema(schema, @@ -222,6 +224,20 @@ private static Pair updateSchema(Schema schema, } tableMetadata.columns().forEach(columnValidator); + + // This builder always produces an index-less table (indexes are applied later by the 5.0 bridge), so a + // rebuild never carries indexes even if the caller passed index statements. buildSchema runs repeatedly per + // table in a JVM; if an earlier call already registered indexes, copy them onto this rebuild so it matches + // the registered table. + if (maybeExistingTableMetadata != null + && !maybeExistingTableMetadata.indexes.isEmpty() + && tableMetadata.indexes.isEmpty()) + { + tableMetadata = tableMetadata.unbuild() + .indexes(maybeExistingTableMetadata.indexes) + .build(); + } + setupTableAndUdt(schema, keyspace, tableMetadata, types); return validateKeyspaceTable(schema, keyspace, tableMetadata.name); @@ -477,7 +493,7 @@ public CqlTable build() replicationFactor, fields, new HashSet<>(udts.values()), - indexCount); + indexStatements); } private Map buildsUdts(KeyspaceMetadata keyspaceMetadata)