Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGES.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
0.5.0
-----
* Add Storage Attached Index (SAI) support to the bulk writer (CASSANALYTICS-31)
* Upgrade sidecar version to 0.4.0
* Exclude IP address from RingInstance equality so node replacement does not fail bulk write jobs (CASSANALYTICS-175)
* Regenerate bloom filters for CQLSSTableWriter (CASSANALYTICS-167)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ public class SSTableImportOptions extends HashMap<String, String>
private static final String INVALIDATE_CACHES = "invalidateCaches";
private static final String EXTENDED_VERIFY = "extendedVerify";
private static final String COPY_DATA = "copyData";
private static final String FAIL_ON_MISSING_INDEX = "failOnMissingIndex";
private static final String VALIDATE_INDEX_CHECKSUM = "validateIndexChecksum";

public static SSTableImportOptions defaults()
{
Expand Down Expand Up @@ -126,4 +128,34 @@ public boolean copyData()
{
return Boolean.parseBoolean(get(COPY_DATA));
}

/**
* When enabled, SSTable import fails if a table with Storage Attached Indexes (SAI) is missing its index
* components, instead of silently rebuilding them. Only meaningful for SAI tables (Cassandra 5.0+).
*/
public SSTableImportOptions failOnMissingIndex(boolean enabled)
{
put(FAIL_ON_MISSING_INDEX, Boolean.toString(enabled));
return this;
}

public boolean failOnMissingIndex()
{
return Boolean.parseBoolean(get(FAIL_ON_MISSING_INDEX));
}

/**
* When enabled, SSTable import validates the checksums of Storage Attached Index (SAI) components.
* Only meaningful for SAI tables (Cassandra 5.0+).
*/
public SSTableImportOptions validateIndexChecksum(boolean enabled)
{
put(VALIDATE_INDEX_CHECKSUM, Boolean.toString(enabled));
return this;
}

public boolean validateIndexChecksum()
{
return Boolean.parseBoolean(get(VALIDATE_INDEX_CHECKSUM));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ public static class ImportOptions
private Boolean invalidateCaches;
private Boolean extendedVerify;
private Boolean copyData;
private Boolean failOnMissingIndex;
private Boolean validateIndexChecksum;

public ImportOptions()
{
Expand Down Expand Up @@ -153,6 +155,32 @@ public ImportOptions copyData(boolean copyData)
this.copyData = copyData;
return this;
}

/**
* Sets the {@code failOnMissingIndex} and returns a reference to this ImportOptions enabling method chaining.
* When enabled, Cassandra validates SAI index components during import.
*
* @param failOnMissingIndex the {@code failOnMissingIndex} to set
* @return a reference to this ImportOptions
*/
public ImportOptions failOnMissingIndex(boolean failOnMissingIndex)
{
this.failOnMissingIndex = failOnMissingIndex;
return this;
}

/**
* Sets the {@code validateIndexChecksum} and returns a reference to this ImportOptions enabling method chaining.
* When enabled, Cassandra verifies SAI index component checksums during import.
*
* @param validateIndexChecksum the {@code validateIndexChecksum} to set
* @return a reference to this ImportOptions
*/
public ImportOptions validateIndexChecksum(boolean validateIndexChecksum)
{
this.validateIndexChecksum = validateIndexChecksum;
return this;
}
}

static String requestURI(String keyspace, String tableName, String uploadId, ImportOptions importOptions)
Expand Down Expand Up @@ -205,6 +233,14 @@ private static List<String> selectedOptions(ImportOptions importOptions)
{
options.add("copyData=" + importOptions.copyData);
}
if (importOptions.failOnMissingIndex != null)
{
options.add("failOnMissingIndex=" + importOptions.failOnMissingIndex);
}
if (importOptions.validateIndexChecksum != null)
{
options.add("validateIndexChecksum=" + importOptions.validateIndexChecksum);
}

return options;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ public void testMockedCdc(CassandraVersion version)
Partitioner.Murmur3Partitioner,
table.udtCreateStmts(bridge.cassandraTypes()),
null,
0,
Collections.emptySet(),
true);
SchemaSupplier schemaSupplier = () -> CompletableFuture.completedFuture(ImmutableSet.of(table));
AtomicReference<byte[]> state = new AtomicReference<>();
Expand Down Expand Up @@ -514,13 +514,13 @@ public void testMultiTable(CassandraVersion version)
ReplicationFactor.simpleStrategy(1),
Partitioner.Murmur3Partitioner,
Collections.emptySet(),
null, 0, schema2.withCdc);
null, Collections.emptySet(), schema2.withCdc);
bridge.buildSchema(cqlTable3.createStatement(),
cqlTable3.keyspace(),
ReplicationFactor.simpleStrategy(1),
Partitioner.Murmur3Partitioner,
Collections.emptySet(),
null, 0, schema3.withCdc);
null, Collections.emptySet(), schema3.withCdc);
int numRows = DEFAULT_NUM_ROWS;

AtomicReference<TestSchema> schema1Holder = new AtomicReference<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -390,7 +390,7 @@ private void runTest(CassandraBridge bridge,
ReplicationFactor.simpleStrategy(1),
Partitioner.Murmur3Partitioner,
Collections.emptySet(),
null, 0, schema.withCdc);
null, Collections.emptySet(), schema.withCdc);
schema.setCassandraVersion(bridge.getVersion());

try
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -266,7 +267,7 @@ public void run()
{
LOGGER.info("Running CDC test testId={} schema='{}' thread={}", testId, cqlTable.fields(), Thread.currentThread().getName());
Set<String> udtStmts = schema.udts.stream().map(e -> e.createStatement(bridge.cassandraTypes(), schema.keyspace)).collect(Collectors.toSet());
bridge.buildSchema(schema.createStatement, schema.keyspace, schema.rf, partitioner, udtStmts, null, 0, true);
bridge.buildSchema(schema.createStatement, schema.keyspace, schema.rf, partitioner, udtStmts, null, Collections.emptySet(), true);
schema.setCassandraVersion(bridge.getVersion());

// write some mutations to CDC CommitLog
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ public void testReaderSeek(CassandraVersion version)
ReplicationFactor.simpleStrategy(1),
Partitioner.Murmur3Partitioner,
Collections.emptySet(),
null, 0, true);
null, Collections.emptySet(), true);
int numRows = 1000;

// write some rows to a CommitLog
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,52 @@
import java.nio.file.Path;

import org.apache.cassandra.bridge.SSTableDescriptor;
import org.apache.cassandra.spark.data.FileType;

public final class SSTables
{
/**
* Suffix identifying the primary SSTable data component, e.g. "-Data.db".
* The leading '-' is significant: it excludes SAI per-index components such as
* "...+TermsData.db" that also end with "Data.db".
*/
private static final String DATA_COMPONENT_SUFFIX = "-" + FileType.DATA.getFileSuffix();

/**
* Glob matching primary SSTable data components, e.g. "*-Data.db".
* Suitable for {@link java.nio.file.Files#newDirectoryStream(Path, String)}.
*/
public static final String DATA_COMPONENT_GLOB = "*" + DATA_COMPONENT_SUFFIX;

private SSTables()
{
throw new IllegalStateException(getClass() + " is static utility class and shall not be instantiated");
}

/**
* Determine whether the given file name is a primary SSTable data component ("&lt;descriptor&gt;-Data.db").
* The leading '-' check excludes SAI per-index components such as "...+TermsData.db" which also end with "Data.db".
*
* @param fileName file name (not a full path)
* @return true if the name is a primary data component
*/
public static boolean isDataComponent(String fileName)
{
return fileName.endsWith(DATA_COMPONENT_SUFFIX);
}

/**
* Determine whether the given path is a primary SSTable data component ("&lt;descriptor&gt;-Data.db").
*
* @param path file path
* @return true if the path's file name is a primary data component
* @see #isDataComponent(String)
*/
public static boolean isDataComponent(Path path)
{
return isDataComponent(path.getFileName().toString());
}

/**
* Get the sstable base name from data file path.
* For example, the base name of data file '/path/to/table/nb-1-big-Data.db' is 'nb-1-big'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,15 +57,15 @@ public class CqlTable implements Serializable
private final List<CqlField> staticColumns;
private final List<CqlField> valueColumns;
private final transient Map<String, CqlField> columns;
private final int indexCount;
private final Set<String> indexStatements;

public CqlTable(@NotNull String keyspace,
@NotNull String table,
@NotNull String createStatement,
@NotNull ReplicationFactor replicationFactor,
@NotNull List<CqlField> fields)
{
this(keyspace, table, createStatement, replicationFactor, fields, Collections.emptySet(), 0);
this(keyspace, table, createStatement, replicationFactor, fields, Collections.emptySet(), Collections.emptySet());
}

public CqlTable(@NotNull String keyspace,
Expand All @@ -74,7 +74,7 @@ public CqlTable(@NotNull String keyspace,
@NotNull ReplicationFactor replicationFactor,
@NotNull List<CqlField> fields,
@NotNull Set<CqlField.CqlUdt> udts,
int indexCount)
@NotNull Set<String> indexStatements)
{
this.keyspace = keyspace;
this.table = table;
Expand All @@ -87,7 +87,7 @@ public CqlTable(@NotNull String keyspace,
this.staticColumns = this.fields.stream().filter(CqlField::isStaticColumn).sorted().collect(Collectors.toList());
this.valueColumns = this.fields.stream().filter(CqlField::isValueColumn).sorted().collect(Collectors.toList());
this.udts = Collections.unmodifiableSet(udts);
this.indexCount = indexCount;
this.indexStatements = Collections.unmodifiableSet(indexStatements);

// We use a linked hashmap to guarantee ordering of a 'SELECT * FROM ...'
this.columns = new LinkedHashMap<>();
Expand Down Expand Up @@ -241,9 +241,14 @@ public String createStatement()
return createStatement;
}

public Set<String> indexStatements()
{
return indexStatements;
}

public int indexCount()
{
return indexCount;
return indexStatements.size();
}

/**
Expand Down Expand Up @@ -382,8 +387,15 @@ public CqlTable read(Kryo kryo, Input input, Class type)
{
udts.add((CqlField.CqlUdt) CqlField.CqlType.read(input, cassandraTypes));
}
int indexCount = input.readInt();
return new CqlTable(keyspace, table, createStatement, replicationFactor, fields, udts, indexCount);

int numIndexStatements = input.readInt();
Set<String> indexStatements = new LinkedHashSet<>(numIndexStatements);
for (int idx = 0; idx < numIndexStatements; idx++)
{
indexStatements.add(input.readString());
}

return new CqlTable(keyspace, table, createStatement, replicationFactor, fields, udts, indexStatements);
}

@Override
Expand All @@ -405,7 +417,13 @@ public void write(Kryo kryo, Output output, CqlTable table)
{
udt.write(output);
}
output.writeInt(table.indexCount());

Set<String> indexStatements = table.indexStatements();
output.writeInt(indexStatements.size());
for (String stmt : indexStatements)
{
output.writeString(stmt);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.regex.Matcher;
Expand Down Expand Up @@ -59,6 +60,9 @@ public final class CqlUtils
private static final Pattern ESCAPED_DOUBLE_BACKSLASH = Pattern.compile("\\\\");
private static final Pattern COMPACTION_STRATEGY_PATTERN = Pattern.compile("compaction\\s*=\\s*\\{\\s*'class'\\s*:\\s*'([^']+)'");

private static final Pattern MULTI_WHITESPACE_PATTERN = Pattern.compile("\\s+");
private static final Pattern SAI_USING_PATTERN = Pattern.compile("USING '[^']*STORAGEATTACHEDINDEX'");

private CqlUtils()
{
throw new IllegalStateException(getClass() + " is static utility class and shall not be instantiated");
Expand Down Expand Up @@ -259,16 +263,66 @@ public static Set<String> extractUdts(@NotNull String schemaStr, @NotNull String
}

public static int extractIndexCount(@NotNull String schemaStr, @NotNull String keyspace, @NotNull String table)
{
return extractIndexStatements(schemaStr, keyspace, table).size();
}

/**
* Extracts CREATE INDEX statements for the given table from the schema string.
*
* @param schemaStr full cluster schema text
* @param keyspace the keyspace name
* @param table the table name
* @return set of CREATE INDEX statements for the table
*/
public static Set<String> extractIndexStatements(@NotNull String schemaStr,
@NotNull String keyspace,
@NotNull String table)
{
String cleaned = cleanCql(schemaStr);
Pattern pattern = Pattern.compile(String.format("CREATE (CUSTOM )?INDEX \"?[^ ]* ON ?\"?%s?\"?\\.{1}\"?%s\"?[^;]*;", keyspace, table));
Matcher matcher = pattern.matcher(cleaned);
int indexCount = 0;
Set<String> statements = new HashSet<>();
while (matcher.find())
{
indexCount++;
statements.add(matcher.group());
}
return indexCount;
return statements;
}

/**
* Returns true if the given CREATE INDEX statement defines a Storage Attached Index (SAI).
* <p>
* SAI class may appear either as the short name ("StorageAttachedIndex") or fully qualified
* ("org.apache.cassandra.index.sai.StorageAttachedIndex").
*
* @param createIndexStatement a CREATE INDEX CQL statement
* @return true if the index uses SAI
*/
public static boolean isSaiIndex(@NotNull String createIndexStatement)
{
// Matches any run of whitespace (spaces, tabs, newlines). Used to collapse a CREATE INDEX statement to
// single-spaced text so the SAI marker can be matched regardless of how the schema text was formatted
// (e.g. "USING 'StorageAttachedIndex'", a newline before USING, or a tab all normalize to one space).
String normalized = MULTI_WHITESPACE_PATTERN.matcher(createIndexStatement).replaceAll(" ").toUpperCase(Locale.ROOT);

// Matches the SAI marker inside a USING '...' clause (statement already upper-cased and whitespace-collapsed).
// The leading [^']* tolerates the fully-qualified class form
// (e.g. 'org.apache.cassandra.index.sai.StorageAttachedIndex') as well as the short name.
return SAI_USING_PATTERN.matcher(normalized).find();
}

/**
* Returns true when {@code indexStatements} is non-empty and every statement defines a Storage Attached Index.
* This is the single "all-SAI table" predicate shared by schema validation and the write/commit paths, so the
* decision to generate SAI components and the decision to enable SAI import options can never disagree.
*
* @param indexStatements the CREATE INDEX statements for a table
* @return true if all indexes are SAI (and at least one exists)
*/
public static boolean hasOnlySaiIndexes(@NotNull Set<String> indexStatements)
{
return !indexStatements.isEmpty() && indexStatements.stream().allMatch(CqlUtils::isSaiIndex);
}

/**
Expand Down
Loading