From c77ab9185e92436491b7db9f861014d47c8c7c05 Mon Sep 17 00:00:00 2001 From: Lukasz Antoniak Date: Tue, 14 Oct 2025 14:09:46 +0200 Subject: [PATCH 1/4] CASSANALYTICS-26: Support vector data type --- .../cdc/test/TestVersionSupplier.java | 2 +- .../cassandra/bridge/CassandraVersion.java | 2 +- .../cassandra/spark/data/CassandraTypes.java | 11 + .../apache/cassandra/spark/data/CqlField.java | 8 +- .../cassandra/spark/utils/CqlUtils.java | 2 +- .../bulkwriter/SqlToCqlTypeConverter.java | 2 + .../bulkwriter/MockBulkWriterContext.java | 2 +- .../spark/bulkwriter/RecordWriterTest.java | 2 +- .../StreamSessionConsistencyTest.java | 2 +- .../spark/endtoend/DataTypeTests.java | 101 +++ .../cassandra/spark/endtoend/MiscTests.java | 16 +- .../build.gradle | 4 +- .../distributed/impl/CassandraCluster.java | 5 + .../SharedClusterIntegrationTestBase.java | 30 +- .../analytics/BulkReaderVectorTest.java | 114 ++++ .../analytics/BulkWriteVectorTest.java | 87 +++ .../SparkSqlTypeConverterImplementation.java | 4 + .../cassandra/bridge/CassandraBridge.java | 5 + .../data/converter/types/VectorTypeTests.java | 59 ++ .../bridge/CassandraTypesImplementation.java | 8 + .../spark/data/complex/CqlVector.java | 140 +++++ .../cassandra/spark/reader/SchemaBuilder.java | 114 ++++ .../spark/reader/AbstractStreamScanner.java | 2 +- .../bridge/AbstractCassandraTypes.java | 6 + .../spark/reader/AbstractSchemaBuilder.java | 593 ++++++++++++++++++ .../cassandra/spark/reader/SchemaBuilder.java | 513 +-------------- gradle.properties | 4 +- gradlew | 6 + scripts/build-dtest-jars.sh | 2 +- scripts/relocate-dtest-dependencies.pom | 1 + 30 files changed, 1315 insertions(+), 532 deletions(-) create mode 100644 cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BulkReaderVectorTest.java create mode 100644 cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BulkWriteVectorTest.java create mode 100644 cassandra-five-zero-bridge/src/test/java/org/apache/cassandra/spark/data/converter/types/VectorTypeTests.java create mode 100644 cassandra-five-zero-types/src/main/java/org/apache/cassandra/spark/data/complex/CqlVector.java create mode 100644 cassandra-five-zero-types/src/main/java/org/apache/cassandra/spark/reader/SchemaBuilder.java create mode 100644 cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/reader/AbstractSchemaBuilder.java diff --git a/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/cdc/test/TestVersionSupplier.java b/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/cdc/test/TestVersionSupplier.java index 5443e5bf8..e6034ffd3 100644 --- a/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/cdc/test/TestVersionSupplier.java +++ b/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/cdc/test/TestVersionSupplier.java @@ -32,7 +32,7 @@ private TestVersionSupplier() public static Stream testVersions() { - String versions = System.getProperty("cassandra.sidecar.versions_to_test", "4.0.17,5.0.5"); + String versions = System.getProperty("cassandra.sidecar.versions_to_test", "4.0.17,5.0.7"); return Arrays.stream(versions.split(",")) .map(String::trim) .map(v -> CassandraVersion.fromVersion(v).orElseThrow(() -> new IllegalArgumentException("Unsupported version: " + v))); diff --git a/cassandra-analytics-common/src/main/java/org/apache/cassandra/bridge/CassandraVersion.java b/cassandra-analytics-common/src/main/java/org/apache/cassandra/bridge/CassandraVersion.java index 1e23d1350..850aadca8 100644 --- a/cassandra-analytics-common/src/main/java/org/apache/cassandra/bridge/CassandraVersion.java +++ b/cassandra-analytics-common/src/main/java/org/apache/cassandra/bridge/CassandraVersion.java @@ -98,7 +98,7 @@ public String jarBaseName() // NOTE: These default versions must stay in sync with cassandraFullVersionMap in build.gradle. String providedSupportedVersionsOrDefault = System.getProperty("cassandra.analytics.bridges.supported_versions", - "cassandra-4.0.17,cassandra-5.0.5"); + "cassandra-4.0.17,cassandra-5.0.7"); supportedVersions = Arrays.stream(providedSupportedVersionsOrDefault.split(",")) .filter(version -> CassandraVersion.fromVersion(version) .filter(v -> v.sstableFormats.contains(sstableFormat)) diff --git a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/data/CassandraTypes.java b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/data/CassandraTypes.java index c31e8fe55..5444d5ed4 100644 --- a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/data/CassandraTypes.java +++ b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/data/CassandraTypes.java @@ -37,6 +37,7 @@ public abstract class CassandraTypes { public static final Pattern COLLECTION_PATTERN = Pattern.compile("^(set|list|map|tuple)<(.+)>$", Pattern.CASE_INSENSITIVE); + public static final Pattern VECTOR_PATTERN = Pattern.compile("^(vector)<(.+),(.+)>$", Pattern.CASE_INSENSITIVE); public static final Pattern FROZEN_PATTERN = Pattern.compile("^frozen<(.*)>$", Pattern.CASE_INSENSITIVE); private final UDTs udts = new UDTs(); @@ -133,6 +134,8 @@ public List supportedTypes() public abstract CqlField.CqlList list(CqlField.CqlType type); + public abstract CqlField.CqlVector vector(CqlField.CqlType type, int dimensions); + public abstract CqlField.CqlSet set(CqlField.CqlType type); public abstract CqlField.CqlMap map(CqlField.CqlType keyType, CqlField.CqlType valueType); @@ -189,6 +192,14 @@ public CqlField.CqlType parseType(String type, Map udts .map(collectionType -> parseType(collectionType, udts)) .toArray(CqlField.CqlType[]::new)); } + Matcher vectorMatcher = VECTOR_PATTERN.matcher(type); + if (vectorMatcher.find()) + { + // CQL vector + String subType = vectorMatcher.group(2); + int dimensions = Integer.parseInt(vectorMatcher.group(3).trim()); + return vector(parseType(subType, udts), dimensions); + } Matcher frozenMatcher = FROZEN_PATTERN.matcher(type); if (frozenMatcher.find()) { diff --git a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/data/CqlField.java b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/data/CqlField.java index 1c15fad2d..b228e36e0 100644 --- a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/data/CqlField.java +++ b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/data/CqlField.java @@ -67,7 +67,7 @@ public interface CqlType extends Serializable { enum InternalType { - NativeCql, Set, List, Map, Frozen, Udt, Tuple; + NativeCql, Set, List, Map, Frozen, Udt, Tuple, Vector; public static InternalType fromString(String name) { @@ -77,6 +77,8 @@ public static InternalType fromString(String name) return Set; case "list": return List; + case "vector": + return Vector; case "map": return Map; case "tuple": @@ -237,6 +239,10 @@ public interface CqlList extends CqlCollection { } + public interface CqlVector extends CqlCollection + { + } + public interface CqlTuple extends CqlCollection { ByteBuffer serializeTuple(Object[] values); diff --git a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/utils/CqlUtils.java b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/utils/CqlUtils.java index 9d989be51..60ab8c95f 100644 --- a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/utils/CqlUtils.java +++ b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/utils/CqlUtils.java @@ -51,7 +51,7 @@ public final class CqlUtils "min_index_interval", "max_index_interval" ); - private static final Pattern REPLICATION_FACTOR_PATTERN = Pattern.compile("WITH REPLICATION = (\\{[^\\}]*\\})"); + private static final Pattern REPLICATION_FACTOR_PATTERN = Pattern.compile("WITH REPLICATION = (\\{[^\\}]*\\})", Pattern.CASE_INSENSITIVE); // Initialize a mapper allowing single quotes to process the RF string from the CREATE KEYSPACE statement private static final ObjectMapper MAPPER = new ObjectMapper().configure(JsonParser.Feature.ALLOW_SINGLE_QUOTES, true); private static final Pattern ESCAPED_WHITESPACE_PATTERN = Pattern.compile("(\\\\r|\\\\n|\\\\r\\n)+"); diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SqlToCqlTypeConverter.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SqlToCqlTypeConverter.java index acd6721ed..c67fa1121 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SqlToCqlTypeConverter.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SqlToCqlTypeConverter.java @@ -84,6 +84,7 @@ public final class SqlToCqlTypeConverter implements Serializable public static final String UDT = "udt"; public static final String VARCHAR = "varchar"; public static final String VARINT = "varint"; + public static final String VECTOR = "vector"; private static final Logger LOGGER = LoggerFactory.getLogger(SqlToCqlTypeConverter.class); private static final NoOp NO_OP_CONVERTER = new NoOp<>(); private static final LongConverter LONG_CONVERTER = new LongConverter(); @@ -165,6 +166,7 @@ public static Converter getConverter(CqlField.CqlType cqlType) case TINYINT: return NO_OP_CONVERTER; case LIST: + case VECTOR: return new ListConverter<>((CqlField.CqlCollection) cqlType); case MAP: assert cqlType instanceof CqlField.CqlMap; diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/MockBulkWriterContext.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/MockBulkWriterContext.java index cb5564f0c..07fbf0e74 100644 --- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/MockBulkWriterContext.java +++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/MockBulkWriterContext.java @@ -95,7 +95,7 @@ public interface CommitResultSupplier extends BiFunction, String, D { } - public static final String DEFAULT_CASSANDRA_VERSION = "cassandra-5.0.5"; + public static final String DEFAULT_CASSANDRA_VERSION = "cassandra-5.0.7"; private final UUID jobId; private boolean skipClean = false; diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/RecordWriterTest.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/RecordWriterTest.java index b1ce66e35..972728ec8 100644 --- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/RecordWriterTest.java +++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/RecordWriterTest.java @@ -309,7 +309,7 @@ void testWriteWithSubRanges(String version) @MethodSource("data") void testWriteWithDataInMultipleSubRanges(String version) { - version = "cassandra-5.0.5"; + version = "cassandra-5.0.7"; setUp(version); MockBulkWriterContext m = Mockito.spy(writerContext); TokenPartitioner mtp = Mockito.mock(TokenPartitioner.class); diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/StreamSessionConsistencyTest.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/StreamSessionConsistencyTest.java index dd1fbedb3..f3dcda086 100644 --- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/StreamSessionConsistencyTest.java +++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/StreamSessionConsistencyTest.java @@ -88,7 +88,7 @@ private void setup(ConsistencyLevel.CL consistencyLevel) { digestAlgorithm = new XXHash32DigestAlgorithm(); tableWriter = new MockTableWriter(folder); - writerContext = new MockBulkWriterContext(TOKEN_RANGE_MAPPING, "cassandra-5.0.5", consistencyLevel); + writerContext = new MockBulkWriterContext(TOKEN_RANGE_MAPPING, "cassandra-5.0.7", consistencyLevel); writerContext.setReplicationFactor(new ReplicationFactor(NetworkTopologyStrategy, rfOptions)); transportContext = (TransportContext.DirectDataBulkWriterContext) writerContext.transportContext(); } diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/endtoend/DataTypeTests.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/endtoend/DataTypeTests.java index 210549f28..e9fa11850 100644 --- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/endtoend/DataTypeTests.java +++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/endtoend/DataTypeTests.java @@ -33,17 +33,21 @@ import org.junit.jupiter.params.provider.MethodSource; import org.apache.cassandra.bridge.CassandraBridge; +import org.apache.cassandra.bridge.CassandraVersion; import org.apache.cassandra.spark.TestUtils; import org.apache.cassandra.spark.Tester; import org.apache.cassandra.spark.data.CqlField; import org.apache.cassandra.spark.utils.RandomUtils; import org.apache.cassandra.spark.utils.test.TestSchema; import org.apache.spark.sql.Row; +import org.quicktheories.core.Gen; import scala.collection.mutable.AbstractSeq; import static org.apache.cassandra.spark.utils.ScalaConversionUtils.mutableSeqAsJavaList; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assumptions.assumeThat; import static org.quicktheories.QuickTheory.qt; +import static org.quicktheories.generators.SourceDSL.arbitrary; @Tag("Sequential") public class DataTypeTests @@ -103,6 +107,103 @@ public void testSet(CassandraBridge bridge) ); } + @ParameterizedTest + @MethodSource("org.apache.cassandra.bridge.VersionRunner#bridges") + public void testVector(CassandraBridge bridge) + { + assumeThat(bridge.getVersion().versionNumber()).isGreaterThanOrEqualTo(CassandraVersion.FIVEZERO.versionNumber()); + qt().forAll(supportedVectorTypes(bridge)) + .checkAssert(type -> + Tester.builder(TestSchema.builder(bridge) + .withPartitionKey("pk", bridge.uuid()) + .withColumn("a", bridge.vector(type, 10))) + .withExpectedRowCountPerSSTable(Tester.DEFAULT_NUM_ROWS) + .run(bridge.getVersion()) + ); + } + + @ParameterizedTest + @MethodSource("org.apache.cassandra.bridge.VersionRunner#bridges") + public void testVectorVector(CassandraBridge bridge) + { + assumeThat(bridge.getVersion().versionNumber()).isGreaterThanOrEqualTo(CassandraVersion.FIVEZERO.versionNumber()); + qt().forAll(supportedVectorTypes(bridge)) + .checkAssert(type -> + Tester.builder(TestSchema.builder(bridge) + .withPartitionKey("pk", bridge.uuid()) + .withColumn("a", bridge.vector(bridge.vector(type, 2), 5))) + .withExpectedRowCountPerSSTable(Tester.DEFAULT_NUM_ROWS) + .run(bridge.getVersion()) + ); + } + + @ParameterizedTest + @MethodSource("org.apache.cassandra.bridge.VersionRunner#bridges") + public void testVectorList(CassandraBridge bridge) + { + assumeThat(bridge.getVersion().versionNumber()).isGreaterThanOrEqualTo(CassandraVersion.FIVEZERO.versionNumber()); + qt().forAll(supportedVectorTypes(bridge)) + .checkAssert(type -> + Tester.builder(TestSchema.builder(bridge) + .withPartitionKey("pk", bridge.uuid()) + .withColumn("a", bridge.vector(bridge.list(type), 3))) + .withExpectedRowCountPerSSTable(Tester.DEFAULT_NUM_ROWS) + .run(bridge.getVersion()) + ); + } + + @ParameterizedTest + @MethodSource("org.apache.cassandra.bridge.VersionRunner#bridges") + public void testVectorUDT(CassandraBridge bridge) + { + // pk -> a vector>, 10> + // Test vector of UDTs + assumeThat(bridge.getVersion().versionNumber()).isGreaterThanOrEqualTo(CassandraVersion.FIVEZERO.versionNumber()); + qt().withExamples(10) + .forAll(supportedVectorTypes(bridge)) + .checkAssert(type -> + Tester.builder(TestSchema.builder(bridge) + .withPartitionKey("pk", bridge.uuid()) + .withColumn("a", bridge.vector( + bridge.udt("keyspace", "nested_udt") + .withField("x", bridge.aInt()) + .withField("y", type) + .withField("z", bridge.aInt()) + .build().frozen(), + 10))) + .run(bridge.getVersion()) + ); + } + + @ParameterizedTest + @MethodSource("org.apache.cassandra.bridge.VersionRunner#bridges") + public void testVectorTuple(CassandraBridge bridge) + { + // pk -> a vector>, 7> + // Test tuple nested within vector + assumeThat(bridge.getVersion().versionNumber()).isGreaterThanOrEqualTo(CassandraVersion.FIVEZERO.versionNumber()); + qt().withExamples(10) + .forAll(supportedVectorTypes(bridge)) + .checkAssert(type -> + Tester.builder(TestSchema.builder(bridge) + .withPartitionKey("pk", bridge.uuid()) + .withColumn("a", bridge.vector(bridge.tuple(type, + bridge.aFloat(), + bridge.text()).frozen(), 7))) + .run(bridge.getVersion()) + ); + } + + private static Gen supportedVectorTypes(CassandraBridge bridge) + { + // TODO: Vector of list of durations fail, because we cannot replace DurationSerializer with + // AnalyticsDurationSerializer across all serializers used by VectorType. + List supportedTypes = bridge.supportedTypes().stream() + .filter(t -> !t.equals(bridge.duration())) + .collect(Collectors.toList()); + return arbitrary().pick(supportedTypes); + } + @ParameterizedTest @MethodSource("org.apache.cassandra.bridge.VersionRunner#bridges") public void testList(CassandraBridge bridge) diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/endtoend/MiscTests.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/endtoend/MiscTests.java index ac7a1f181..b8259c296 100644 --- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/endtoend/MiscTests.java +++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/endtoend/MiscTests.java @@ -158,7 +158,7 @@ public void testPartialRowClusteringKeys(CassandraBridge bridge) public void testQuotedKeyspaceName(CassandraBridge bridge) { Tester.builder(keyspace1 -> TestSchema.builder(bridge) - .withKeyspace("Quoted_Keyspace_" + UUID.randomUUID().toString().replaceAll("-", "_")) + .withKeyspace("Quoted_Keyspace_" + UUID.randomUUID().toString().replaceAll("-", "")) .withPartitionKey("pk", bridge.uuid()) .withColumn("c1", bridge.varint()) .withColumn("c2", bridge.text()) @@ -184,7 +184,7 @@ public void testReservedWordKeyspaceName(CassandraBridge bridge) public void testQuotedTableName(CassandraBridge bridge) { Tester.builder(keyspace1 -> TestSchema.builder(bridge) - .withKeyspace("Quoted_Keyspace_" + UUID.randomUUID().toString().replaceAll("-", "_")) + .withKeyspace("Quoted_Keyspace_" + UUID.randomUUID().toString().replaceAll("-", "")) .withTable("Quoted_Table_" + UUID.randomUUID().toString().replaceAll("-", "_")) .withPartitionKey("pk", bridge.uuid()) .withColumn("c1", bridge.varint()) @@ -198,7 +198,7 @@ public void testQuotedTableName(CassandraBridge bridge) public void testReservedWordTableName(CassandraBridge bridge) { Tester.builder(keyspace1 -> TestSchema.builder(bridge) - .withKeyspace("Quoted_Keyspace_" + UUID.randomUUID().toString().replaceAll("-", "_")) + .withKeyspace("Quoted_Keyspace_" + UUID.randomUUID().toString().replaceAll("-", "")) .withTable("table") .withPartitionKey("pk", bridge.uuid()) .withColumn("c1", bridge.varint()) @@ -212,7 +212,7 @@ public void testReservedWordTableName(CassandraBridge bridge) public void testQuotedPartitionKey(CassandraBridge bridge) { Tester.builder(keyspace1 -> TestSchema.builder(bridge) - .withKeyspace("Quoted_Keyspace_" + UUID.randomUUID().toString().replaceAll("-", "_")) + .withKeyspace("Quoted_Keyspace_" + UUID.randomUUID().toString().replaceAll("-", "")) .withTable("Quoted_Table_" + UUID.randomUUID().toString().replaceAll("-", "_")) .withPartitionKey("Partition_Key_0", bridge.uuid()) .withColumn("c1", bridge.varint()) @@ -226,7 +226,7 @@ public void testQuotedPartitionKey(CassandraBridge bridge) public void testMultipleQuotedPartitionKeys(CassandraBridge bridge) { Tester.builder(keyspace1 -> TestSchema.builder(bridge) - .withKeyspace("Quoted_Keyspace_" + UUID.randomUUID().toString().replaceAll("-", "_")) + .withKeyspace("Quoted_Keyspace_" + UUID.randomUUID().toString().replaceAll("-", "")) .withTable("Quoted_Table_" + UUID.randomUUID().toString().replaceAll("-", "_")) .withPartitionKey("Partition_Key_0", bridge.uuid()) .withPartitionKey("Partition_Key_1", bridge.bigint()) @@ -243,7 +243,7 @@ public void testMultipleQuotedPartitionKeys(CassandraBridge bridge) public void testQuotedPartitionClusteringKeys(CassandraBridge bridge) { Tester.builder(keyspace1 -> TestSchema.builder(bridge) - .withKeyspace("Quoted_Keyspace_" + UUID.randomUUID().toString().replaceAll("-", "_")) + .withKeyspace("Quoted_Keyspace_" + UUID.randomUUID().toString().replaceAll("-", "")) .withTable("Quoted_Table_" + UUID.randomUUID().toString().replaceAll("-", "_")) .withPartitionKey("a", bridge.uuid()) .withClusteringKey("Clustering_Key_0", bridge.bigint()) @@ -258,7 +258,7 @@ public void testQuotedPartitionClusteringKeys(CassandraBridge bridge) public void testQuotedColumnNames(CassandraBridge bridge) { Tester.builder(keyspace1 -> TestSchema.builder(bridge) - .withKeyspace("Quoted_Keyspace_" + UUID.randomUUID().toString().replaceAll("-", "_")) + .withKeyspace("Quoted_Keyspace_" + UUID.randomUUID().toString().replaceAll("-", "")) .withTable("Quoted_Table_" + UUID.randomUUID().toString().replaceAll("-", "_")) .withPartitionKey("Partition_Key_0", bridge.uuid()) .withColumn("Column_1", bridge.varint()) @@ -272,7 +272,7 @@ public void testQuotedColumnNames(CassandraBridge bridge) public void testQuotedColumnNamesWithColumnFilter(CassandraBridge bridge) { Tester.builder(keyspace1 -> TestSchema.builder(bridge) - .withKeyspace("Quoted_Keyspace_" + UUID.randomUUID().toString().replaceAll("-", "_")) + .withKeyspace("Quoted_Keyspace_" + UUID.randomUUID().toString().replaceAll("-", "")) .withTable("Quoted_Table_" + UUID.randomUUID().toString().replaceAll("-", "_")) .withPartitionKey("Partition_Key_0", bridge.uuid()) .withColumn("Column_1", bridge.varint()) diff --git a/cassandra-analytics-integration-framework/build.gradle b/cassandra-analytics-integration-framework/build.gradle index a88c0da9a..0bab1e6e9 100644 --- a/cassandra-analytics-integration-framework/build.gradle +++ b/cassandra-analytics-integration-framework/build.gradle @@ -32,7 +32,7 @@ if (propertyWithDefault("artifactType", null) == "spark") apply from: "$rootDir/gradle/common/publishing.gradle" } -ext.dtestJar = System.getenv("DTEST_JAR") ?: "dtest-5.0.5.jar" // latest supported Cassandra build is 5.0 +ext.dtestJar = System.getenv("DTEST_JAR") ?: "dtest-5.0.7.jar" // latest supported Cassandra build is 5.0 def dtestJarFullPath = "${dependencyLocation}${ext.dtestJar}" test { @@ -50,7 +50,7 @@ dependencies { // classpath while running integration tests. Instead, a dedicated classloader will load the // dtest jar while provisioning the in-jvm dtest Cassandra cluster compileOnly(files("${dtestJarFullPath}")) - api("org.apache.cassandra:dtest-api:0.0.16") + api("org.apache.cassandra:dtest-api:0.0.18") // Needed by the Cassandra dtest framework // JUnit api("org.junit.jupiter:junit-jupiter-api:${project.junitVersion}") diff --git a/cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/distributed/impl/CassandraCluster.java b/cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/distributed/impl/CassandraCluster.java index 93a877ea3..69686b900 100644 --- a/cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/distributed/impl/CassandraCluster.java +++ b/cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/distributed/impl/CassandraCluster.java @@ -213,6 +213,11 @@ public IInstanceConfig newInstanceConfig() return delegate.newInstanceConfig(); } + public IInstanceConfig createInstanceConfig(int i) + { + throw new UnsupportedOperationException(); + } + @Override public ICluster delegate() { diff --git a/cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/sidecar/testing/SharedClusterIntegrationTestBase.java b/cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/sidecar/testing/SharedClusterIntegrationTestBase.java index 460d8565b..74e2b3c65 100644 --- a/cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/sidecar/testing/SharedClusterIntegrationTestBase.java +++ b/cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/sidecar/testing/SharedClusterIntegrationTestBase.java @@ -496,13 +496,24 @@ protected Object[][] queryAllData(QualifiedName table, ConsistencyLevel consiste * Convenience method to query all data from the provided {@code table} at consistency level ALL. * * @param table the qualified Cassandra table name - * @return all the data queried from the table + * @return record count limited to 10000 rows */ protected ResultSet queryAllDataWithDriver(QualifiedName table) { return queryAllDataWithDriver(table, ConsistencyLevel.ALL); } + /** + * Convenience method to count rows from the provided {@code table} at consistency level ALL. + * + * @param table the qualified Cassandra table name + * @return all the data queried from the table + */ + protected Long countDataWithDriver(QualifiedName table) + { + return countDataWithDriver(table, ConsistencyLevel.ALL); + } + /** * Convenience method to query all data from the provided {@code table} at the specified consistency level. * @@ -519,6 +530,22 @@ protected ResultSet queryAllDataWithDriver(QualifiedName table, ConsistencyLevel return session.execute(statement); } + /** + * Convenience method to count rows from the provided {@code table} at the specified consistency level. + * + * @param table the qualified Cassandra table name + * @param consistency the consistency level to use for querying the data + * @return record count limited to 10000 rows + */ + protected Long countDataWithDriver(QualifiedName table, ConsistencyLevel consistency) + { + Cluster driverCluster = createDriverCluster(cluster.delegate()); + Session session = driverCluster.connect(); + SimpleStatement statement = new SimpleStatement(String.format("SELECT COUNT(*) FROM %s;", table)); + statement.setConsistencyLevel(com.datastax.driver.core.ConsistencyLevel.valueOf(consistency.name())); + return session.execute(statement).one().getLong(0); + } + // Utility methods public static Cluster createDriverCluster(ICluster dtest) @@ -762,6 +789,7 @@ static InstanceMetadata buildInstanceMetadata(Vertx vertx, .id(config.num()) .host(hostName) .port(port) + .storagePort(config.getInt("storage_port")) .dataDirs(Arrays.asList(dataDirectories)) .cdcDir(config.getString("cdc_raw_directory")) .commitlogDir(config.getString("commitlog_directory")) diff --git a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BulkReaderVectorTest.java b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BulkReaderVectorTest.java new file mode 100644 index 000000000..c7acb7960 --- /dev/null +++ b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BulkReaderVectorTest.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cassandra.analytics; + +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; +import java.util.concurrent.ThreadLocalRandom; +import java.util.stream.Collectors; + +import org.junit.jupiter.api.Test; + +import com.vdurmont.semver4j.Semver; +import org.apache.cassandra.distributed.api.ConsistencyLevel; +import org.apache.cassandra.distributed.api.IInstance; +import org.apache.cassandra.sidecar.testing.QualifiedName; +import org.apache.cassandra.testing.ClusterBuilderConfiguration; +import org.apache.cassandra.testing.TestUtils; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; + +import static org.apache.cassandra.testing.TestUtils.DC1_RF1; +import static org.apache.cassandra.testing.TestUtils.TEST_KEYSPACE; +import static org.apache.cassandra.testing.TestUtils.uniqueTestTableFullName; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assumptions.assumeThat; + +/** + * Tests bulk reader functionality + */ +class BulkReaderVectorTest extends SharedClusterSparkIntegrationTestBase +{ + static final int ROW_COUNT = 10; + static final int DIMENSIONS = 3; + static final List> DATASET = new ArrayList<>(); + static QualifiedName table1 = uniqueTestTableFullName(TEST_KEYSPACE); + + static { + for (int i = 0; i < ROW_COUNT; i++) + { + List vector = new ArrayList<>(); + for (int j = 0; j < DIMENSIONS; j++) + { + vector.add(ThreadLocalRandom.current().nextFloat()); + } + DATASET.add(vector); + } + } + + @Override + protected void beforeClusterProvisioning() + { + assumeThat(TestUtils.getDTestClusterVersion().isGreaterThanOrEqualTo(new Semver("5.0", Semver.SemverType.LOOSE))) + .describedAs("Vector type was introduced in Cassandra 5.0") + .isTrue(); + } + + @Override + protected ClusterBuilderConfiguration testClusterConfiguration() + { + return super.testClusterConfiguration() + .nodesPerDc(2); + } + + @Test + void testReadingVectorColumn() + { + Dataset data = bulkReaderDataFrame(table1).load(); + + List rows = data.collectAsList().stream() + .sorted(Comparator.comparing(row -> row.getInt(0))) + .collect(Collectors.toList()); + assertThat(rows.size()).isEqualTo(ROW_COUNT); + + for (int i = 0; i < ROW_COUNT; i++) + { + Row row = rows.get(i); + List value = DATASET.get(i); + assertThat(row.getList(1)).isEqualTo(value); + } + } + + @Override + protected void initializeSchemaForTest() + { + createTestKeyspace(TEST_KEYSPACE, DC1_RF1); + createTestTable(table1, "CREATE TABLE IF NOT EXISTS %s (id int PRIMARY KEY, value vector);"); + + IInstance firstRunningInstance = cluster.getFirstRunningInstance(); + for (int i = 0; i < ROW_COUNT; i++) + { + List value = DATASET.get(i); + String query = String.format("INSERT INTO %s (id, value) VALUES (%d, %s);", table1, i, value.toString()); + firstRunningInstance.coordinator().execute(query, ConsistencyLevel.ALL); + } + } +} diff --git a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BulkWriteVectorTest.java b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BulkWriteVectorTest.java new file mode 100644 index 000000000..756138fd9 --- /dev/null +++ b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BulkWriteVectorTest.java @@ -0,0 +1,87 @@ +package org.apache.cassandra.analytics; + +import org.junit.jupiter.api.Test; + +import com.vdurmont.semver4j.Semver; +import org.apache.cassandra.distributed.api.ConsistencyLevel; +import org.apache.cassandra.distributed.api.ICoordinator; +import org.apache.cassandra.sidecar.testing.QualifiedName; +import org.apache.cassandra.testing.ClusterBuilderConfiguration; +import org.apache.cassandra.testing.TestUtils; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; + +import static org.apache.cassandra.testing.TestUtils.DC1_RF3; +import static org.apache.cassandra.testing.TestUtils.ROW_COUNT; +import static org.apache.cassandra.testing.TestUtils.TEST_KEYSPACE; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assumptions.assumeThat; + +public class BulkWriteVectorTest extends SharedClusterSparkIntegrationTestBase +{ + static final QualifiedName VECTOR_TABLE_NAME = new QualifiedName(TEST_KEYSPACE, "test_vector"); + public static final String VECTOR_TABLE_CREATE = "CREATE TABLE " + VECTOR_TABLE_NAME + " (\n" + + " id BIGINT PRIMARY KEY,\n" + + " value vector);"; + + private ICoordinator coordinator; + + @Test + void testVectorOfFloats() + { + int numRowsInserted = populateVectorOfFloats(); + // Create a spark frame with the data inserted during the setup + Dataset sourceData = bulkReaderDataFrame(VECTOR_TABLE_NAME).load(); + assertThat(sourceData.count()).isEqualTo(numRowsInserted); + + // Insert the dataset containing vectors + bulkWriterDataFrameWriter(sourceData, VECTOR_TABLE_NAME).save(); + + // Count rows because Java driver 3.x cannot read vector type + assertThat(countDataWithDriver(VECTOR_TABLE_NAME)).isEqualTo(numRowsInserted); + } + + private int populateVectorOfFloats() + { + String insert = "INSERT INTO %s (id, value) VALUES (%d, [%f, %f, %f])"; + + int i = 0; + for (; i < ROW_COUNT; i++) + { + float j = (float) i; + cluster.schemaChangeIgnoringStoppedInstances(String.format(insert, VECTOR_TABLE_NAME, + i, j, j, j)); + } + + // test null value + coordinator.execute(String.format("insert into %s (id) values (%d)", + VECTOR_TABLE_NAME, i++), ConsistencyLevel.ALL); + + return i; + } + + @Override + protected ClusterBuilderConfiguration testClusterConfiguration() + { + return super.testClusterConfiguration() + .nodesPerDc(3); + } + + @Override + protected void beforeClusterProvisioning() + { + assumeThat(TestUtils.getDTestClusterVersion().isGreaterThanOrEqualTo(new Semver("5.0", Semver.SemverType.LOOSE))) + .describedAs("Vector type was introduced in Cassandra 5.0") + .isTrue(); + } + + @Override + protected void initializeSchemaForTest() + { + coordinator = cluster.getFirstRunningInstance().coordinator(); + + createTestKeyspace(VECTOR_TABLE_NAME, DC1_RF3); + + cluster.schemaChangeIgnoringStoppedInstances(VECTOR_TABLE_CREATE); + } +} diff --git a/cassandra-analytics-spark-four-zero-converter/src/main/java/org/apache/cassandra/spark/data/converter/SparkSqlTypeConverterImplementation.java b/cassandra-analytics-spark-four-zero-converter/src/main/java/org/apache/cassandra/spark/data/converter/SparkSqlTypeConverterImplementation.java index 5c4b9352d..6610a2c39 100644 --- a/cassandra-analytics-spark-four-zero-converter/src/main/java/org/apache/cassandra/spark/data/converter/SparkSqlTypeConverterImplementation.java +++ b/cassandra-analytics-spark-four-zero-converter/src/main/java/org/apache/cassandra/spark/data/converter/SparkSqlTypeConverterImplementation.java @@ -166,6 +166,10 @@ protected static SparkType getOrThrow(CqlField.CqlType cqlType) { return new SparkSet(INSTANCE, (CqlField.CqlSet) cqlType); } + else if (cqlType instanceof CqlField.CqlVector) + { + return new SparkList(INSTANCE, (CqlField.CqlVector) cqlType); + } else if (cqlType instanceof CqlField.CqlList) { return new SparkList(INSTANCE, (CqlField.CqlList) cqlType); diff --git a/cassandra-bridge/src/main/java/org/apache/cassandra/bridge/CassandraBridge.java b/cassandra-bridge/src/main/java/org/apache/cassandra/bridge/CassandraBridge.java index 67c5bc7a0..22db7f2fc 100644 --- a/cassandra-bridge/src/main/java/org/apache/cassandra/bridge/CassandraBridge.java +++ b/cassandra-bridge/src/main/java/org/apache/cassandra/bridge/CassandraBridge.java @@ -318,6 +318,11 @@ public CqlField.CqlList list(CqlField.CqlType type) return cassandraTypes().list(type); } + public CqlField.CqlVector vector(CqlField.CqlType type, int dimensions) + { + return cassandraTypes().vector(type, dimensions); + } + public CqlField.CqlSet set(CqlField.CqlType type) { return cassandraTypes().set(type); diff --git a/cassandra-five-zero-bridge/src/test/java/org/apache/cassandra/spark/data/converter/types/VectorTypeTests.java b/cassandra-five-zero-bridge/src/test/java/org/apache/cassandra/spark/data/converter/types/VectorTypeTests.java new file mode 100644 index 000000000..fc44485c2 --- /dev/null +++ b/cassandra-five-zero-bridge/src/test/java/org/apache/cassandra/spark/data/converter/types/VectorTypeTests.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cassandra.spark.data.converter.types; + +import java.util.List; +import java.util.Set; + +import org.junit.jupiter.api.Test; + +import org.apache.cassandra.bridge.CassandraBridgeImplementation; +import org.apache.cassandra.spark.data.complex.CqlList; +import org.apache.cassandra.spark.data.complex.CqlVector; + +import static org.assertj.core.api.Assertions.assertThat; + +public class VectorTypeTests +{ + private static final CassandraBridgeImplementation BRIDGE = new CassandraBridgeImplementation(); + + @Test + public void testSimpleTypeConversion() + { + CqlVector cqlVector = new CqlVector(org.apache.cassandra.spark.data.types.Float.INSTANCE, 3); + Object cqlWriterObj = cqlVector.convertForCqlWriter(List.of(3.14f, 0.0f, -1f), BRIDGE.getVersion(), false); + assertThat(cqlWriterObj).isInstanceOf(List.class); + List cqlWriterList = (List) cqlWriterObj; + assertThat(cqlWriterList).containsExactly(3.14f, 0.0f, -1f); + } + + @Test + public void testComplexTypeConversion() + { + CqlVector cqlVector = new CqlVector(CqlList.set(org.apache.cassandra.spark.data.types.Float.INSTANCE), 3); + Object cqlWriterObj = cqlVector.convertForCqlWriter(List.of(Set.of(3.14f, 0f), Set.of(1f), Set.of()), BRIDGE.getVersion(), false); + assertThat(cqlWriterObj).isInstanceOf(List.class); + List> cqlWriterList = (List>) cqlWriterObj; + assertThat(cqlWriterList).hasSize(3); + assertThat(cqlWriterList.get(0)).containsExactlyInAnyOrder(3.14f, 0f); + assertThat(cqlWriterList.get(1)).containsExactly(1f); + assertThat(cqlWriterList.get(2)).isEmpty(); + } +} diff --git a/cassandra-five-zero-types/src/main/java/org/apache/cassandra/bridge/CassandraTypesImplementation.java b/cassandra-five-zero-types/src/main/java/org/apache/cassandra/bridge/CassandraTypesImplementation.java index a13071c11..f520ddc4a 100644 --- a/cassandra-five-zero-types/src/main/java/org/apache/cassandra/bridge/CassandraTypesImplementation.java +++ b/cassandra-five-zero-types/src/main/java/org/apache/cassandra/bridge/CassandraTypesImplementation.java @@ -32,6 +32,8 @@ import org.apache.cassandra.dht.Murmur3Partitioner; import org.apache.cassandra.locator.SimpleSnitch; import org.apache.cassandra.security.EncryptionContext; +import org.apache.cassandra.spark.data.CqlField; +import org.apache.cassandra.spark.data.complex.CqlVector; public class CassandraTypesImplementation extends AbstractCassandraTypes { @@ -88,4 +90,10 @@ protected static void setupCommitLogConfigs(Path path) DatabaseDescriptor.getRawConfig().commitlog_total_space = new DataStorageSpec.IntMebibytesBound(1024); DatabaseDescriptor.setCommitLogSegmentMgrProvider(commitLog -> new CommitLogSegmentManagerStandard(commitLog, commitLogPath.toString())); } + + @Override + public CqlField.CqlVector vector(CqlField.CqlType type, int dimensions) + { + return new CqlVector(type, dimensions); + } } diff --git a/cassandra-five-zero-types/src/main/java/org/apache/cassandra/spark/data/complex/CqlVector.java b/cassandra-five-zero-types/src/main/java/org/apache/cassandra/spark/data/complex/CqlVector.java new file mode 100644 index 000000000..bce534c1f --- /dev/null +++ b/cassandra-five-zero-types/src/main/java/org/apache/cassandra/spark/data/complex/CqlVector.java @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cassandra.spark.data.complex; + +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import org.apache.cassandra.bridge.CassandraVersion; +import org.apache.cassandra.cql3.functions.types.SettableByIndexData; +import org.apache.cassandra.db.marshal.AbstractType; +import org.apache.cassandra.db.marshal.VectorType; +import org.apache.cassandra.db.rows.BufferCell; +import org.apache.cassandra.db.rows.CellPath; +import org.apache.cassandra.schema.ColumnMetadata; +import org.apache.cassandra.serializers.TypeSerializer; +import org.apache.cassandra.spark.data.CqlField; +import org.apache.cassandra.spark.data.CqlType; +import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.TimeUUID; +import org.apache.cassandra.utils.UUIDGen; + +import static org.apache.cassandra.spark.data.CqlField.NO_TTL; + +public class CqlVector extends CqlCollection implements CqlField.CqlVector +{ + private final int dimensions; + + public CqlVector(CqlField.CqlType type, int dimensions) + { + super(type); + this.dimensions = dimensions; + } + + @Override + public AbstractType dataType(boolean isMultiCell) + { + return VectorType.getInstance(((CqlType) type()).dataType(), dimensions); + } + + @Override + public InternalType internalType() + { + return InternalType.Vector; + } + + @Override + @SuppressWarnings("unchecked") + public TypeSerializer serializer() + { + return (TypeSerializer) dataType(false).getSerializer(); + } + + @Override + public String name() + { + return "vector"; + } + + @Override + public String cqlName() + { + return String.format("%s<%s, %d>", + internalType().name().toLowerCase(), + types.get(0).cqlName(), + dimensions); + } + + @Override + protected void setInnerValueInternal(SettableByIndexData udtValue, int position, Object value) + { + udtValue.setVector(position, (List) value); + } + + @Override + public Object randomValue(int minCollectionSize) + { + return IntStream.range(0, dimensions) + .mapToObj(element -> type().randomValue(minCollectionSize)) + .collect(Collectors.toList()); + } + + @Override + public org.apache.cassandra.cql3.functions.types.DataType driverDataType(boolean isFrozen) + { + return org.apache.cassandra.cql3.functions.types.DataType.vector(((CqlType) type()).driverDataType(isFrozen), dimensions); + } + + @Override + public Object convertForCqlWriter(Object value, CassandraVersion version, boolean isCollectionElement) + { + return ((List) value).stream() + .map(element -> type().convertForCqlWriter(element, version, true)) + .collect(Collectors.toList()); + } + + @Override + public void addCell(final org.apache.cassandra.db.rows.Row.Builder rowBuilder, + ColumnMetadata cd, + long timestamp, + int ttl, + long now, + Object value) + { + for (Object o : (List) value) + { + if (ttl != NO_TTL) + { + rowBuilder.addCell(CqlType.expiring(cd, timestamp, ttl, now, type().serialize(o), + CellPath.create(TimeUUID.Generator.nextTimeUUID().toBytes()))); + } + else + { + rowBuilder.addCell(BufferCell.live(cd, timestamp, type().serialize(o), randomCellPath())); + } + } + } + + protected CellPath randomCellPath() + { + return CellPath.create(TimeUUID.Generator.nextTimeUUID().toBytes()); + } +} diff --git a/cassandra-five-zero-types/src/main/java/org/apache/cassandra/spark/reader/SchemaBuilder.java b/cassandra-five-zero-types/src/main/java/org/apache/cassandra/spark/reader/SchemaBuilder.java new file mode 100644 index 000000000..330331f0c --- /dev/null +++ b/cassandra-five-zero-types/src/main/java/org/apache/cassandra/spark/reader/SchemaBuilder.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cassandra.spark.reader; + +import java.util.Collections; +import java.util.Set; +import java.util.UUID; +import java.util.function.Function; + +import com.google.common.annotations.VisibleForTesting; + +import org.apache.cassandra.cql3.CQL3Type; +import org.apache.cassandra.db.marshal.AbstractType; +import org.apache.cassandra.db.marshal.VectorType; +import org.apache.cassandra.spark.data.CassandraTypes; +import org.apache.cassandra.spark.data.CqlTable; +import org.apache.cassandra.spark.data.ReplicationFactor; +import org.apache.cassandra.spark.data.partitioner.Partitioner; +import org.jetbrains.annotations.Nullable; + +public class SchemaBuilder extends AbstractSchemaBuilder +{ + public SchemaBuilder(CqlTable table, Partitioner partitioner, boolean enableCdc) + { + this(table, partitioner, null, enableCdc); + } + + public SchemaBuilder(CqlTable table, Partitioner partitioner) + { + this(table, partitioner, null, false); + } + + public SchemaBuilder(CqlTable table, Partitioner partitioner, UUID tableId, boolean enableCdc) + { + this(table.createStatement(), + table.keyspace(), + table.replicationFactor(), + partitioner, + table::udtCreateStmts, + tableId, + 0, + enableCdc); + } + + @VisibleForTesting + public SchemaBuilder(String createStmt, String keyspace, ReplicationFactor replicationFactor) + { + this(createStmt, keyspace, replicationFactor, Partitioner.Murmur3Partitioner, bridge -> Collections.emptySet(), null, 0, false); + } + + @VisibleForTesting + public SchemaBuilder(String createStmt, + String keyspace, + ReplicationFactor replicationFactor, + Partitioner partitioner) + { + this(createStmt, keyspace, replicationFactor, partitioner, bridge -> Collections.emptySet(), null, 0, false); + } + + public SchemaBuilder(String createStmt, + String keyspace, + ReplicationFactor replicationFactor, + Partitioner partitioner, + Function> udtStatementsProvider, + @Nullable UUID tableId, + int indexCount, + boolean enableCdc) + { + super(createStmt, keyspace, replicationFactor, partitioner, udtStatementsProvider, + tableId, indexCount, enableCdc); + } + + @Override + protected void validateType(CQL3Type cqlType) + { + if (!(cqlType instanceof CQL3Type.Native) + && !(cqlType instanceof CQL3Type.Collection) + && !(cqlType instanceof CQL3Type.UserDefined) + && !(cqlType instanceof CQL3Type.Tuple) + && !(cqlType instanceof CQL3Type.Vector)) + { + throw new UnsupportedOperationException("Only native, collection, tuples, vectors or UDT data types are supported, " + + "unsupported data type: " + cqlType.toString()); + } + if (cqlType instanceof CQL3Type.Vector) + { + CQL3Type.Vector vector = (CQL3Type.Vector) cqlType; + VectorType vectorType = vector.getType(); + for (AbstractType subType : vectorType.subTypes()) + { + validateType(subType); + } + return; + } + super.validateType(cqlType); + } +} diff --git a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/AbstractStreamScanner.java b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/AbstractStreamScanner.java index 0853325cb..2e7d30d03 100644 --- a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/AbstractStreamScanner.java +++ b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/AbstractStreamScanner.java @@ -384,7 +384,7 @@ public void consume() { boolean isStatic = cell.column().isStatic(); rowData.setColumnNameCopy(ReaderUtils.encodeCellName(metadata, - isStatic ? Clustering.STATIC_CLUSTERING : clustering, + isStatic ? Clustering.STATIC_CLUSTERING : clustering, cell.column().name.bytes, null)); if (cell.isTombstone()) diff --git a/cassandra-four-zero-types/src/main/java/org/apache/cassandra/bridge/AbstractCassandraTypes.java b/cassandra-four-zero-types/src/main/java/org/apache/cassandra/bridge/AbstractCassandraTypes.java index 8c5537e02..6dfeea89d 100644 --- a/cassandra-four-zero-types/src/main/java/org/apache/cassandra/bridge/AbstractCassandraTypes.java +++ b/cassandra-four-zero-types/src/main/java/org/apache/cassandra/bridge/AbstractCassandraTypes.java @@ -253,6 +253,12 @@ public CqlField.CqlList list(CqlField.CqlType type) return CqlCollection.list(type); } + @Override + public CqlField.CqlVector vector(CqlField.CqlType type, int dimensions) + { + throw new UnsupportedOperationException("Vector data type is available in C* 5.x."); + } + @Override public CqlField.CqlSet set(CqlField.CqlType type) { diff --git a/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/reader/AbstractSchemaBuilder.java b/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/reader/AbstractSchemaBuilder.java new file mode 100644 index 000000000..f023012a4 --- /dev/null +++ b/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/reader/AbstractSchemaBuilder.java @@ -0,0 +1,593 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cassandra.spark.reader; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.stream.Collectors; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.antlr.runtime.RecognitionException; +import org.apache.cassandra.bridge.CassandraSchema; +import org.apache.cassandra.bridge.CassandraTypesImplementation; +import org.apache.cassandra.bridge.SchemaUpdater; +import org.apache.cassandra.cql3.CQL3Type; +import org.apache.cassandra.cql3.CQLFragmentParser; +import org.apache.cassandra.cql3.CqlParser; +import org.apache.cassandra.cql3.statements.schema.CreateTableStatement; +import org.apache.cassandra.cql3.statements.schema.CreateTypeStatement; +import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.db.marshal.AbstractType; +import org.apache.cassandra.db.marshal.CollectionType; +import org.apache.cassandra.db.marshal.ListType; +import org.apache.cassandra.db.marshal.MapType; +import org.apache.cassandra.db.marshal.SetType; +import org.apache.cassandra.db.marshal.TupleType; +import org.apache.cassandra.db.marshal.UserType; +import org.apache.cassandra.dht.IPartitioner; +import org.apache.cassandra.schema.ColumnMetadata; +import org.apache.cassandra.schema.KeyspaceMetadata; +import org.apache.cassandra.schema.KeyspaceParams; +import org.apache.cassandra.schema.Schema; +import org.apache.cassandra.schema.TableId; +import org.apache.cassandra.schema.TableMetadata; +import org.apache.cassandra.schema.TableMetadataRef; +import org.apache.cassandra.schema.Types; +import org.apache.cassandra.spark.data.CassandraTypes; +import org.apache.cassandra.spark.data.CqlField; +import org.apache.cassandra.spark.data.CqlTable; +import org.apache.cassandra.spark.data.ReplicationFactor; +import org.apache.cassandra.spark.data.complex.CqlFrozen; +import org.apache.cassandra.spark.data.complex.CqlUdt; +import org.apache.cassandra.spark.data.partitioner.Partitioner; +import org.apache.cassandra.utils.Pair; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; + +public abstract class AbstractSchemaBuilder +{ + private static final Logger LOGGER = LoggerFactory.getLogger(AbstractSchemaBuilder.class); + + protected final TableMetadata metadata; + protected final KeyspaceMetadata keyspaceMetadata; + protected final String createStmt; + protected final String keyspace; + protected final ReplicationFactor replicationFactor; + protected final CassandraTypes cassandraTypes; + protected final int indexCount; + + public AbstractSchemaBuilder(CqlTable table, Partitioner partitioner, boolean enableCdc) + { + this(table, partitioner, null, enableCdc); + } + + public AbstractSchemaBuilder(CqlTable table, Partitioner partitioner) + { + this(table, partitioner, null, false); + } + + public AbstractSchemaBuilder(CqlTable table, Partitioner partitioner, UUID tableId, boolean enableCdc) + { + this(table.createStatement(), + table.keyspace(), + table.replicationFactor(), + partitioner, + table::udtCreateStmts, + tableId, + 0, + enableCdc); + } + + @VisibleForTesting + public AbstractSchemaBuilder(String createStmt, String keyspace, ReplicationFactor replicationFactor) + { + this(createStmt, keyspace, replicationFactor, Partitioner.Murmur3Partitioner, bridge -> Collections.emptySet(), null, 0, false); + } + + @VisibleForTesting + public AbstractSchemaBuilder(String createStmt, + String keyspace, + ReplicationFactor replicationFactor, + Partitioner partitioner) + { + this(createStmt, keyspace, replicationFactor, partitioner, bridge -> Collections.emptySet(), null, 0, false); + } + + public AbstractSchemaBuilder(String createStmt, + String keyspace, + ReplicationFactor replicationFactor, + Partitioner partitioner, + Function> udtStatementsProvider, + @Nullable UUID tableId, + int indexCount, + boolean enableCdc) + { + this.createStmt = createStmt; + this.keyspace = keyspace; + this.replicationFactor = replicationFactor; + this.cassandraTypes = new CassandraTypesImplementation(); + this.indexCount = indexCount; + + Pair updated = CassandraSchema.apply(schema -> + updateSchema(schema, + this.keyspace, + udtStatementsProvider.apply(cassandraTypes), + this.createStmt, + partitioner, + this.replicationFactor, + tableId, enableCdc, + this::validateColumnMetaData)); + this.keyspaceMetadata = updated.left; + this.metadata = updated.right; + } + + // Update schema with the given keyspace, table and udt. + // It creates the corresponding metadata and opens instances for keyspace and table, if needed. + // At the end, it validates that the input keyspace and table both should have metadata exist and instance opened. + private static Pair updateSchema(Schema schema, + String keyspace, + Set udtStatements, + String createStatement, + Partitioner partitioner, + ReplicationFactor replicationFactor, + UUID tableId, + boolean enableCdc, + Consumer columnValidator) + { + // Set up and open keyspace if needed + IPartitioner cassPartitioner = CassandraTypesImplementation.getPartitioner(partitioner); + setupKeyspace(schema, keyspace, replicationFactor, cassPartitioner); + + // Set up and open table if needed, parse UDTs and include when parsing table schema + List typeStatements = new ArrayList<>(udtStatements.size()); + for (String udt : udtStatements) + { + try + { + typeStatements.add((CreateTypeStatement.Raw) CQLFragmentParser + .parseAnyUnhandled(CqlParser::query, udt)); + } + catch (RecognitionException exception) + { + LOGGER.error("Failed to parse type expression '{}'", udt); + throw new IllegalStateException(exception); + } + } + Types.RawBuilder typesBuilder = Types.rawBuilder(keyspace); + for (CreateTypeStatement.Raw st : typeStatements) + { + st.addToRawBuilder(typesBuilder); + } + Types types = typesBuilder.build(); + CreateTableStatement.Raw createTable = CQLFragmentParser.parseAny(CqlParser::createTableStatement, + createStatement, + "CREATE TABLE"); + // If the table already exists, the tableId should remain the same, unless a non-null tableId is supplied + TableMetadata maybeExistingTableMetadata = schema.getTableMetadata(keyspace, createTable.table()); + if (maybeExistingTableMetadata != null && tableId == null) + { + tableId = maybeExistingTableMetadata.id.asUUID(); + } + + TableMetadata.Builder builder = createTable + .keyspace(keyspace) + .prepare(null) + .builder(types) + .partitioner(cassPartitioner); + + if (tableId != null) + { + builder.id(TableId.fromUUID(tableId)); + } + + TableMetadata tableMetadata = builder.build(); + + if (tableMetadata.params.cdc != enableCdc) + { + tableMetadata = tableMetadata.unbuild() + .params(tableMetadata.params.unbuild() + .cdc(enableCdc) + .build()) + .build(); + } + + tableMetadata.columns().forEach(columnValidator); + setupTableAndUdt(schema, keyspace, tableMetadata, types); + + return validateKeyspaceTable(schema, keyspace, tableMetadata.name); + } + + private void validateColumnMetaData(@NotNull ColumnMetadata column) + { + validateType(column.type); + } + + protected void validateType(AbstractType type) + { + validateType(type.asCQL3Type()); + } + + protected void validateType(CQL3Type cqlType) + { + if (!(cqlType instanceof CQL3Type.Native) + && !(cqlType instanceof CQL3Type.Collection) + && !(cqlType instanceof CQL3Type.UserDefined) + && !(cqlType instanceof CQL3Type.Tuple)) + { + throw new UnsupportedOperationException("Only native, collection, tuples or UDT data types are supported, " + + "unsupported data type: " + cqlType.toString()); + } + + if (cqlType instanceof CQL3Type.Native) + { + CqlField.CqlType type = cassandraTypes.parseType(cqlType.toString()); + if (!type.isSupported()) + { + throw new UnsupportedOperationException(type.name() + " data type is not supported"); + } + } + else if (cqlType instanceof CQL3Type.Collection) + { + // Validate collection inner types + CQL3Type.Collection collection = (CQL3Type.Collection) cqlType; + CollectionType type = (CollectionType) collection.getType(); + switch (type.kind) + { + case LIST: + validateType(((ListType) type).getElementsType()); + return; + case SET: + validateType(((SetType) type).getElementsType()); + return; + case MAP: + validateType(((MapType) type).getKeysType()); + validateType(((MapType) type).getValuesType()); + return; + default: + // Do nothing + } + } + else if (cqlType instanceof CQL3Type.Tuple) + { + CQL3Type.Tuple tuple = (CQL3Type.Tuple) cqlType; + TupleType tupleType = (TupleType) tuple.getType(); + for (AbstractType subType : tupleType.allTypes()) + { + validateType(subType); + } + } + else + { + // Validate UDT inner types + UserType userType = (UserType) ((CQL3Type.UserDefined) cqlType).getType(); + for (AbstractType innerType : userType.fieldTypes()) + { + validateType(innerType); + } + } + } + + private static boolean keyspaceMetadataExists(Schema schema, String keyspaceName) + { + return schema.getKeyspaceMetadata(keyspaceName) != null; + } + + private static boolean tableMetadataExists(Schema schema, String keyspaceName, String tableName) + { + KeyspaceMetadata ksMetadata = schema.getKeyspaceMetadata(keyspaceName); + if (ksMetadata == null) + { + return false; + } + + return ksMetadata.hasTable(tableName); + } + + private static boolean keyspaceInstanceExists(Schema schema, String keyspaceName) + { + return schema.getKeyspaceInstance(keyspaceName) != null; + } + + private static boolean tableInstanceExists(Schema schema, String keyspaceName, String tableName) + { + Keyspace keyspace = schema.getKeyspaceInstance(keyspaceName); + if (keyspace == null) + { + return false; + } + + try + { + keyspace.getColumnFamilyStore(tableName); + } + catch (IllegalArgumentException exception) + { + LOGGER.info("Table instance does not exist. keyspace={} table={} existingCFS={}", + keyspace, tableName, keyspace.getColumnFamilyStores()); + return false; + } + return true; + } + + // Check whether keyspace metadata exists. Create keyspace metadata, if not. + // Check whether keyspace instance is opened. Open the keyspace, if not. + // NOTE: It is possible that external code that just creates metadata, but does not open the keyspace + private static void setupKeyspace(Schema schema, + String keyspaceName, + ReplicationFactor replicationFactor, + IPartitioner partitioner) + { + if (!keyspaceMetadataExists(schema, keyspaceName)) + { + LOGGER.info("Setting up keyspace metadata in schema keyspace={} rfStrategy={} partitioner={}", + keyspaceName, replicationFactor.getReplicationStrategy().name(), partitioner); + KeyspaceMetadata keyspaceMetadata = + KeyspaceMetadata.create(keyspaceName, KeyspaceParams.create(true, rfToMap(replicationFactor))); + SchemaUpdater.load(schema, keyspaceMetadata); + } + + if (!keyspaceInstanceExists(schema, keyspaceName)) + { + LOGGER.info("Setting up keyspace instance in schema keyspace={} rfStrategy={} partitioner={}", + keyspaceName, replicationFactor.getReplicationStrategy().name(), partitioner); + // Create keyspace instance and also initCf (cfs) for the table + Keyspace.openWithoutSSTables(keyspaceName); + } + } + + // Check whether table metadata exists. Create table metadata, if not. + // Check whether table instance is opened. Open/init the table, if not. + // NOTE: It is possible that external code that just creates metadata, but does not open the table + private static void setupTableAndUdt(Schema schema, + String keyspaceName, + TableMetadata tableMetadata, + Types userTypes) + { + String tableName = tableMetadata.name; + KeyspaceMetadata keyspaceMetadata = schema.getKeyspaceMetadata(keyspaceName); + if (keyspaceMetadata == null) + { + LOGGER.error("Keyspace metadata does not exist. keyspace={}", keyspaceName); + throw new IllegalStateException("Keyspace metadata null for '" + keyspaceName + + "' when it should have been initialized already"); + } + + if (!tableMetadataExists(schema, keyspaceName, tableName)) + { + LOGGER.info("Setting up table metadata in schema keyspace={} table={} partitioner={}", + keyspaceName, tableName, tableMetadata.partitioner.getClass().getName()); + keyspaceMetadata = keyspaceMetadata.withSwapped(keyspaceMetadata.tables.with(tableMetadata)); + SchemaUpdater.load(schema, keyspaceMetadata, tableMetadata); + } + + if (!tableMetadata.equals(schema.getTableMetadata(keyspaceName, tableMetadata.name))) + { + // Schema of the table has changed so update it in the schema + updateTableMetaData(schema, keyspaceName, tableMetadata); + LOGGER.info("Table metadata changed schema keyspace={} table={} partitioner={}", + keyspaceName, tableName, tableMetadata.partitioner.getClass().getName()); + } + + // The metadata of the table might not be the input tableMetadata. Fetch the current to be safe. + TableMetadata currentTable = schema.getTableMetadata(keyspaceName, tableName); + if (!tableInstanceExists(schema, keyspaceName, tableName)) + { + LOGGER.info("Setting up table instance in schema keyspace={} table={} partitioner={}", + keyspaceName, tableName, tableMetadata.partitioner.getClass().getName()); + if (keyspaceInstanceExists(schema, keyspaceName)) + { + // initCf (cfs) in the opened keyspace + schema.getKeyspaceInstance(keyspaceName) + .initCf(TableMetadataRef.forOfflineTools(currentTable), false); + } + else + { + // The keyspace has not yet opened, create/open keyspace instance and also initCf (cfs) for the table + Keyspace.openWithoutSSTables(keyspaceName); + } + } + + if (!userTypes.equals(Types.none())) + { + LOGGER.info("Setting up user types in schema keyspace={} types={}", + keyspaceName, userTypes); + // Update Schema instance with any user-defined types built + keyspaceMetadata = keyspaceMetadata.withSwapped(userTypes); + SchemaUpdater.load(schema, keyspaceMetadata, userTypes); + } + } + + private static void updateTableMetaData(Schema schema, String keyspace, TableMetadata tableMetadata) + { + KeyspaceMetadata ks = schema.getKeyspaceMetadata(keyspace); + ks = ks.withSwapped(ks.tables.withSwapped(tableMetadata)); + SchemaUpdater.load(schema, ks, tableMetadata); + } + + private static Pair validateKeyspaceTable(Schema schema, + String keyspaceName, + String tableName) + { + Preconditions.checkState(keyspaceMetadataExists(schema, keyspaceName), + "Keyspace metadata does not exist after building schema. keyspace=%s", + keyspaceName); + Preconditions.checkState(keyspaceInstanceExists(schema, keyspaceName), + "Keyspace instance is not opened after building schema. keyspace=%s", + keyspaceName); + Preconditions.checkState(tableMetadataExists(schema, keyspaceName, tableName), + "Table metadata does not exist after building schema. keyspace=%s table=%s", + keyspaceName, tableName); + Preconditions.checkState(tableInstanceExists(schema, keyspaceName, tableName), + "Table instance is not opened after building schema. keyspace=%s table=%s", + keyspaceName, tableName); + + // Validated above that keyspace and table, both exist and are opened + KeyspaceMetadata keyspaceMetadata = schema.getKeyspaceMetadata(keyspaceName); + TableMetadata tableMetadata = schema.getTableMetadata(keyspaceName, tableName); + return Pair.create(keyspaceMetadata, tableMetadata); + } + + public TableMetadata tableMetaData() + { + return metadata; + } + + public String createStatement() + { + return createStmt; + } + + public CqlTable build() + { + Map udts = buildsUdts(keyspaceMetadata); + List fields = buildFields(metadata, udts).stream().sorted().collect(Collectors.toList()); + return new CqlTable(keyspace, + metadata.name, + createStmt, + replicationFactor, + fields, + new HashSet<>(udts.values()), + indexCount); + } + + private Map buildsUdts(KeyspaceMetadata keyspaceMetadata) + { + List userTypes = new ArrayList<>(); + keyspaceMetadata.types.forEach(userTypes::add); + Map udts = new HashMap<>(userTypes.size()); + while (!userTypes.isEmpty()) + { + UserType userType = userTypes.remove(0); + if (!AbstractSchemaBuilder.nestedUdts(userType).stream().allMatch(udts::containsKey)) + { + // This UDT contains a nested user-defined type that has not been parsed yet + // so re-add to the queue and parse later + userTypes.add(userType); + continue; + } + String name = userType.getNameAsString(); + CqlUdt.Builder builder = CqlUdt.builder(keyspaceMetadata.name, name); + for (int field = 0; field < userType.size(); field++) + { + builder.withField(userType.fieldName(field).toString(), + cassandraTypes.parseType(userType.fieldType(field).asCQL3Type().toString(), udts)); + } + udts.put(name, builder.build()); + } + + return udts; + } + + /** + * @param type an abstract type + * @return a set of UDTs nested within the type parameter + */ + private static Set nestedUdts(AbstractType type) + { + Set result = new HashSet<>(); + nestedUdts(type, result, false); + return result; + } + + private static void nestedUdts(AbstractType type, Set udts, boolean isNested) + { + if (type instanceof UserType) + { + if (isNested) + { + udts.add(((UserType) type).getNameAsString()); + } + for (AbstractType nestedType : ((UserType) type).fieldTypes()) + { + nestedUdts(nestedType, udts, true); + } + } + else if (type instanceof TupleType) + { + for (AbstractType nestedType : ((TupleType) type).allTypes()) + { + nestedUdts(nestedType, udts, true); + } + } + else if (type instanceof SetType) + { + nestedUdts(((SetType) type).getElementsType(), udts, true); + } + else if (type instanceof ListType) + { + nestedUdts(((ListType) type).getElementsType(), udts, true); + } + else if (type instanceof MapType) + { + nestedUdts(((MapType) type).getKeysType(), udts, true); + nestedUdts(((MapType) type).getValuesType(), udts, true); + } + } + + private List buildFields(TableMetadata metadata, Map udts) + { + Iterator it = metadata.allColumnsInSelectOrder(); + List result = new ArrayList<>(); + int position = 0; + while (it.hasNext()) + { + ColumnMetadata col = it.next(); + boolean isPartitionKey = col.isPartitionKey(); + boolean isClusteringColumn = col.isClusteringColumn(); + boolean isStatic = col.isStatic(); + String name = col.name.toString(); + CqlField.CqlType type = col.type.isUDT() ? udts.get(((UserType) col.type).getNameAsString()) + : cassandraTypes.parseType(col.type.asCQL3Type().toString(), udts); + boolean isFrozen = col.type.isFreezable() && !col.type.isMultiCell(); + result.add(new CqlField(isPartitionKey, + isClusteringColumn, + isStatic, + name, + !(type instanceof CqlFrozen) && isFrozen ? CqlFrozen.build(type) : type, + position)); + position++; + } + return result; + } + + static Map rfToMap(ReplicationFactor replicationFactor) + { + Map result = new HashMap<>(replicationFactor.getOptions().size() + 1); + result.put("class", "org.apache.cassandra.locator." + replicationFactor.getReplicationStrategy().name()); + for (Map.Entry entry : replicationFactor.getOptions().entrySet()) + { + result.put(entry.getKey(), Integer.toString(entry.getValue())); + } + return result; + } +} diff --git a/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/reader/SchemaBuilder.java b/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/reader/SchemaBuilder.java index 5d0493a1f..6dd0111cb 100644 --- a/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/reader/SchemaBuilder.java +++ b/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/reader/SchemaBuilder.java @@ -19,73 +19,21 @@ package org.apache.cassandra.spark.reader; -import java.util.ArrayList; import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Map; import java.util.Set; import java.util.UUID; -import java.util.function.Consumer; import java.util.function.Function; -import java.util.stream.Collectors; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.antlr.runtime.RecognitionException; -import org.apache.cassandra.bridge.CassandraSchema; -import org.apache.cassandra.bridge.CassandraTypesImplementation; -import org.apache.cassandra.bridge.SchemaUpdater; -import org.apache.cassandra.cql3.CQL3Type; -import org.apache.cassandra.cql3.CQLFragmentParser; -import org.apache.cassandra.cql3.CqlParser; -import org.apache.cassandra.cql3.statements.schema.CreateTableStatement; -import org.apache.cassandra.cql3.statements.schema.CreateTypeStatement; -import org.apache.cassandra.db.Keyspace; -import org.apache.cassandra.db.marshal.AbstractType; -import org.apache.cassandra.db.marshal.CollectionType; -import org.apache.cassandra.db.marshal.ListType; -import org.apache.cassandra.db.marshal.MapType; -import org.apache.cassandra.db.marshal.SetType; -import org.apache.cassandra.db.marshal.TupleType; -import org.apache.cassandra.db.marshal.UserType; -import org.apache.cassandra.dht.IPartitioner; -import org.apache.cassandra.schema.ColumnMetadata; -import org.apache.cassandra.schema.KeyspaceMetadata; -import org.apache.cassandra.schema.KeyspaceParams; -import org.apache.cassandra.schema.Schema; -import org.apache.cassandra.schema.TableId; -import org.apache.cassandra.schema.TableMetadata; -import org.apache.cassandra.schema.TableMetadataRef; -import org.apache.cassandra.schema.Types; import org.apache.cassandra.spark.data.CassandraTypes; -import org.apache.cassandra.spark.data.CqlField; import org.apache.cassandra.spark.data.CqlTable; import org.apache.cassandra.spark.data.ReplicationFactor; -import org.apache.cassandra.spark.data.complex.CqlFrozen; -import org.apache.cassandra.spark.data.complex.CqlUdt; import org.apache.cassandra.spark.data.partitioner.Partitioner; -import org.apache.cassandra.utils.Pair; -import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; -public class SchemaBuilder +public class SchemaBuilder extends AbstractSchemaBuilder { - private static final Logger LOGGER = LoggerFactory.getLogger(SchemaBuilder.class); - - private final TableMetadata metadata; - private final KeyspaceMetadata keyspaceMetadata; - private final String createStmt; - private final String keyspace; - private final ReplicationFactor replicationFactor; - private final CassandraTypes cassandraTypes; - private final int indexCount; - public SchemaBuilder(CqlTable table, Partitioner partitioner, boolean enableCdc) { this(table, partitioner, null, enableCdc); @@ -132,462 +80,7 @@ public SchemaBuilder(String createStmt, int indexCount, boolean enableCdc) { - this.createStmt = createStmt; - this.keyspace = keyspace; - this.replicationFactor = replicationFactor; - this.cassandraTypes = new CassandraTypesImplementation(); - this.indexCount = indexCount; - - Pair updated = CassandraSchema.apply(schema -> - updateSchema(schema, - this.keyspace, - udtStatementsProvider.apply(cassandraTypes), - this.createStmt, - partitioner, - this.replicationFactor, - tableId, enableCdc, - this::validateColumnMetaData)); - this.keyspaceMetadata = updated.left; - this.metadata = updated.right; - } - - // Update schema with the given keyspace, table and udt. - // It creates the corresponding metadata and opens instances for keyspace and table, if needed. - // At the end, it validates that the input keyspace and table both should have metadata exist and instance opened. - private static Pair updateSchema(Schema schema, - String keyspace, - Set udtStatements, - String createStatement, - Partitioner partitioner, - ReplicationFactor replicationFactor, - UUID tableId, - boolean enableCdc, - Consumer columnValidator) - { - // Set up and open keyspace if needed - IPartitioner cassPartitioner = CassandraTypesImplementation.getPartitioner(partitioner); - setupKeyspace(schema, keyspace, replicationFactor, cassPartitioner); - - // Set up and open table if needed, parse UDTs and include when parsing table schema - List typeStatements = new ArrayList<>(udtStatements.size()); - for (String udt : udtStatements) - { - try - { - typeStatements.add((CreateTypeStatement.Raw) CQLFragmentParser - .parseAnyUnhandled(CqlParser::query, udt)); - } - catch (RecognitionException exception) - { - LOGGER.error("Failed to parse type expression '{}'", udt); - throw new IllegalStateException(exception); - } - } - Types.RawBuilder typesBuilder = Types.rawBuilder(keyspace); - for (CreateTypeStatement.Raw st : typeStatements) - { - st.addToRawBuilder(typesBuilder); - } - Types types = typesBuilder.build(); - CreateTableStatement.Raw createTable = CQLFragmentParser.parseAny(CqlParser::createTableStatement, - createStatement, - "CREATE TABLE"); - // If the table already exists, the tableId should remain the same, unless a non-null tableId is supplied - TableMetadata maybeExistingTableMetadata = schema.getTableMetadata(keyspace, createTable.table()); - if (maybeExistingTableMetadata != null && tableId == null) - { - tableId = maybeExistingTableMetadata.id.asUUID(); - } - - TableMetadata.Builder builder = createTable - .keyspace(keyspace) - .prepare(null) - .builder(types) - .partitioner(cassPartitioner); - - if (tableId != null) - { - builder.id(TableId.fromUUID(tableId)); - } - - TableMetadata tableMetadata = builder.build(); - - if (tableMetadata.params.cdc != enableCdc) - { - tableMetadata = tableMetadata.unbuild() - .params(tableMetadata.params.unbuild() - .cdc(enableCdc) - .build()) - .build(); - } - - tableMetadata.columns().forEach(columnValidator); - setupTableAndUdt(schema, keyspace, tableMetadata, types); - - return validateKeyspaceTable(schema, keyspace, tableMetadata.name); - } - - private void validateColumnMetaData(@NotNull ColumnMetadata column) - { - validateType(column.type); - } - - private void validateType(AbstractType type) - { - validateType(type.asCQL3Type()); - } - - private void validateType(CQL3Type cqlType) - { - if (!(cqlType instanceof CQL3Type.Native) - && !(cqlType instanceof CQL3Type.Collection) - && !(cqlType instanceof CQL3Type.UserDefined) - && !(cqlType instanceof CQL3Type.Tuple)) - { - throw new UnsupportedOperationException("Only native, collection, tuples or UDT data types are supported, " - + "unsupported data type: " + cqlType.toString()); - } - - if (cqlType instanceof CQL3Type.Native) - { - CqlField.CqlType type = cassandraTypes.parseType(cqlType.toString()); - if (!type.isSupported()) - { - throw new UnsupportedOperationException(type.name() + " data type is not supported"); - } - } - else if (cqlType instanceof CQL3Type.Collection) - { - // Validate collection inner types - CQL3Type.Collection collection = (CQL3Type.Collection) cqlType; - CollectionType type = (CollectionType) collection.getType(); - switch (type.kind) - { - case LIST: - validateType(((ListType) type).getElementsType()); - return; - case SET: - validateType(((SetType) type).getElementsType()); - return; - case MAP: - validateType(((MapType) type).getKeysType()); - validateType(((MapType) type).getValuesType()); - return; - default: - // Do nothing - } - } - else if (cqlType instanceof CQL3Type.Tuple) - { - CQL3Type.Tuple tuple = (CQL3Type.Tuple) cqlType; - TupleType tupleType = (TupleType) tuple.getType(); - for (AbstractType subType : tupleType.allTypes()) - { - validateType(subType); - } - } - else - { - // Validate UDT inner types - UserType userType = (UserType) ((CQL3Type.UserDefined) cqlType).getType(); - for (AbstractType innerType : userType.fieldTypes()) - { - validateType(innerType); - } - } - } - - private static boolean keyspaceMetadataExists(Schema schema, String keyspaceName) - { - return schema.getKeyspaceMetadata(keyspaceName) != null; - } - - private static boolean tableMetadataExists(Schema schema, String keyspaceName, String tableName) - { - KeyspaceMetadata ksMetadata = schema.getKeyspaceMetadata(keyspaceName); - if (ksMetadata == null) - { - return false; - } - - return ksMetadata.hasTable(tableName); - } - - private static boolean keyspaceInstanceExists(Schema schema, String keyspaceName) - { - return schema.getKeyspaceInstance(keyspaceName) != null; - } - - private static boolean tableInstanceExists(Schema schema, String keyspaceName, String tableName) - { - Keyspace keyspace = schema.getKeyspaceInstance(keyspaceName); - if (keyspace == null) - { - return false; - } - - try - { - keyspace.getColumnFamilyStore(tableName); - } - catch (IllegalArgumentException exception) - { - LOGGER.info("Table instance does not exist. keyspace={} table={} existingCFS={}", - keyspace, tableName, keyspace.getColumnFamilyStores()); - return false; - } - return true; - } - - // Check whether keyspace metadata exists. Create keyspace metadata, if not. - // Check whether keyspace instance is opened. Open the keyspace, if not. - // NOTE: It is possible that external code that just creates metadata, but does not open the keyspace - private static void setupKeyspace(Schema schema, - String keyspaceName, - ReplicationFactor replicationFactor, - IPartitioner partitioner) - { - if (!keyspaceMetadataExists(schema, keyspaceName)) - { - LOGGER.info("Setting up keyspace metadata in schema keyspace={} rfStrategy={} partitioner={}", - keyspaceName, replicationFactor.getReplicationStrategy().name(), partitioner); - KeyspaceMetadata keyspaceMetadata = - KeyspaceMetadata.create(keyspaceName, KeyspaceParams.create(true, rfToMap(replicationFactor))); - SchemaUpdater.load(schema, keyspaceMetadata); - } - - if (!keyspaceInstanceExists(schema, keyspaceName)) - { - LOGGER.info("Setting up keyspace instance in schema keyspace={} rfStrategy={} partitioner={}", - keyspaceName, replicationFactor.getReplicationStrategy().name(), partitioner); - // Create keyspace instance and also initCf (cfs) for the table - Keyspace.openWithoutSSTables(keyspaceName); - } - } - - // Check whether table metadata exists. Create table metadata, if not. - // Check whether table instance is opened. Open/init the table, if not. - // NOTE: It is possible that external code that just creates metadata, but does not open the table - private static void setupTableAndUdt(Schema schema, - String keyspaceName, - TableMetadata tableMetadata, - Types userTypes) - { - String tableName = tableMetadata.name; - KeyspaceMetadata keyspaceMetadata = schema.getKeyspaceMetadata(keyspaceName); - if (keyspaceMetadata == null) - { - LOGGER.error("Keyspace metadata does not exist. keyspace={}", keyspaceName); - throw new IllegalStateException("Keyspace metadata null for '" + keyspaceName - + "' when it should have been initialized already"); - } - - if (!tableMetadataExists(schema, keyspaceName, tableName)) - { - LOGGER.info("Setting up table metadata in schema keyspace={} table={} partitioner={}", - keyspaceName, tableName, tableMetadata.partitioner.getClass().getName()); - keyspaceMetadata = keyspaceMetadata.withSwapped(keyspaceMetadata.tables.with(tableMetadata)); - SchemaUpdater.load(schema, keyspaceMetadata, tableMetadata); - } - - if (!tableMetadata.equals(schema.getTableMetadata(keyspaceName, tableMetadata.name))) - { - // Schema of the table has changed so update it in the schema - updateTableMetaData(schema, keyspaceName, tableMetadata); - LOGGER.info("Table metadata changed schema keyspace={} table={} partitioner={}", - keyspaceName, tableName, tableMetadata.partitioner.getClass().getName()); - } - - // The metadata of the table might not be the input tableMetadata. Fetch the current to be safe. - TableMetadata currentTable = schema.getTableMetadata(keyspaceName, tableName); - if (!tableInstanceExists(schema, keyspaceName, tableName)) - { - LOGGER.info("Setting up table instance in schema keyspace={} table={} partitioner={}", - keyspaceName, tableName, tableMetadata.partitioner.getClass().getName()); - if (keyspaceInstanceExists(schema, keyspaceName)) - { - // initCf (cfs) in the opened keyspace - schema.getKeyspaceInstance(keyspaceName) - .initCf(TableMetadataRef.forOfflineTools(currentTable), false); - } - else - { - // The keyspace has not yet opened, create/open keyspace instance and also initCf (cfs) for the table - Keyspace.openWithoutSSTables(keyspaceName); - } - } - - if (!userTypes.equals(Types.none())) - { - LOGGER.info("Setting up user types in schema keyspace={} types={}", - keyspaceName, userTypes); - // Update Schema instance with any user-defined types built - keyspaceMetadata = keyspaceMetadata.withSwapped(userTypes); - SchemaUpdater.load(schema, keyspaceMetadata, userTypes); - } - } - - private static void updateTableMetaData(Schema schema, String keyspace, TableMetadata tableMetadata) - { - KeyspaceMetadata ks = schema.getKeyspaceMetadata(keyspace); - ks = ks.withSwapped(ks.tables.withSwapped(tableMetadata)); - SchemaUpdater.load(schema, ks, tableMetadata); - } - - private static Pair validateKeyspaceTable(Schema schema, - String keyspaceName, - String tableName) - { - Preconditions.checkState(keyspaceMetadataExists(schema, keyspaceName), - "Keyspace metadata does not exist after building schema. keyspace=%s", - keyspaceName); - Preconditions.checkState(keyspaceInstanceExists(schema, keyspaceName), - "Keyspace instance is not opened after building schema. keyspace=%s", - keyspaceName); - Preconditions.checkState(tableMetadataExists(schema, keyspaceName, tableName), - "Table metadata does not exist after building schema. keyspace=%s table=%s", - keyspaceName, tableName); - Preconditions.checkState(tableInstanceExists(schema, keyspaceName, tableName), - "Table instance is not opened after building schema. keyspace=%s table=%s", - keyspaceName, tableName); - - // Validated above that keyspace and table, both exist and are opened - KeyspaceMetadata keyspaceMetadata = schema.getKeyspaceMetadata(keyspaceName); - TableMetadata tableMetadata = schema.getTableMetadata(keyspaceName, tableName); - return Pair.create(keyspaceMetadata, tableMetadata); - } - - public TableMetadata tableMetaData() - { - return metadata; - } - - public String createStatement() - { - return createStmt; - } - - public CqlTable build() - { - Map udts = buildsUdts(keyspaceMetadata); - List fields = buildFields(metadata, udts).stream().sorted().collect(Collectors.toList()); - return new CqlTable(keyspace, - metadata.name, - createStmt, - replicationFactor, - fields, - new HashSet<>(udts.values()), - indexCount); - } - - private Map buildsUdts(KeyspaceMetadata keyspaceMetadata) - { - List userTypes = new ArrayList<>(); - keyspaceMetadata.types.forEach(userTypes::add); - Map udts = new HashMap<>(userTypes.size()); - while (!userTypes.isEmpty()) - { - UserType userType = userTypes.remove(0); - if (!SchemaBuilder.nestedUdts(userType).stream().allMatch(udts::containsKey)) - { - // This UDT contains a nested user-defined type that has not been parsed yet - // so re-add to the queue and parse later - userTypes.add(userType); - continue; - } - String name = userType.getNameAsString(); - CqlUdt.Builder builder = CqlUdt.builder(keyspaceMetadata.name, name); - for (int field = 0; field < userType.size(); field++) - { - builder.withField(userType.fieldName(field).toString(), - cassandraTypes.parseType(userType.fieldType(field).asCQL3Type().toString(), udts)); - } - udts.put(name, builder.build()); - } - - return udts; - } - - /** - * @param type an abstract type - * @return a set of UDTs nested within the type parameter - */ - private static Set nestedUdts(AbstractType type) - { - Set result = new HashSet<>(); - nestedUdts(type, result, false); - return result; - } - - private static void nestedUdts(AbstractType type, Set udts, boolean isNested) - { - if (type instanceof UserType) - { - if (isNested) - { - udts.add(((UserType) type).getNameAsString()); - } - for (AbstractType nestedType : ((UserType) type).fieldTypes()) - { - nestedUdts(nestedType, udts, true); - } - } - else if (type instanceof TupleType) - { - for (AbstractType nestedType : ((TupleType) type).allTypes()) - { - nestedUdts(nestedType, udts, true); - } - } - else if (type instanceof SetType) - { - nestedUdts(((SetType) type).getElementsType(), udts, true); - } - else if (type instanceof ListType) - { - nestedUdts(((ListType) type).getElementsType(), udts, true); - } - else if (type instanceof MapType) - { - nestedUdts(((MapType) type).getKeysType(), udts, true); - nestedUdts(((MapType) type).getValuesType(), udts, true); - } - } - - private List buildFields(TableMetadata metadata, Map udts) - { - Iterator it = metadata.allColumnsInSelectOrder(); - List result = new ArrayList<>(); - int position = 0; - while (it.hasNext()) - { - ColumnMetadata col = it.next(); - boolean isPartitionKey = col.isPartitionKey(); - boolean isClusteringColumn = col.isClusteringColumn(); - boolean isStatic = col.isStatic(); - String name = col.name.toString(); - CqlField.CqlType type = col.type.isUDT() ? udts.get(((UserType) col.type).getNameAsString()) - : cassandraTypes.parseType(col.type.asCQL3Type().toString(), udts); - boolean isFrozen = col.type.isFreezable() && !col.type.isMultiCell(); - result.add(new CqlField(isPartitionKey, - isClusteringColumn, - isStatic, - name, - !(type instanceof CqlFrozen) && isFrozen ? CqlFrozen.build(type) : type, - position)); - position++; - } - return result; - } - - static Map rfToMap(ReplicationFactor replicationFactor) - { - Map result = new HashMap<>(replicationFactor.getOptions().size() + 1); - result.put("class", "org.apache.cassandra.locator." + replicationFactor.getReplicationStrategy().name()); - for (Map.Entry entry : replicationFactor.getOptions().entrySet()) - { - result.put(entry.getKey(), Integer.toString(entry.getValue())); - } - return result; + super(createStmt, keyspace, replicationFactor, partitioner, udtStatementsProvider, + tableId, indexCount, enableCdc); } } diff --git a/gradle.properties b/gradle.properties index 30e1aa3b9..81a6a13ae 100644 --- a/gradle.properties +++ b/gradle.properties @@ -22,8 +22,8 @@ description=Apache Cassandra Analytics analyticsJDKLevel=17 cassandra40Version=4.0.17 -cassandra50Version=5.0.5 -sidecarVersion=0.2.0 +cassandra50Version=5.0.7 +sidecarVersion=0.4-SNAPSHOT intellijVersion=9.0.4 junitVersion=5.10.2 assertjCoreVersion=3.24.2 diff --git a/gradlew b/gradlew index 23d15a936..d9ea3c010 100755 --- a/gradlew +++ b/gradlew @@ -200,6 +200,12 @@ if "$cygwin" || "$msys" ; then done fi +# We want to increase the file descriptor limit to the MaxFDLimit in MacOS, which, +# by default, is set to a lower limit than the actual system maximum. This line is modified manually, if +# producing a new gradle wrapper, remember to add the change back. +if $darwin; then + GRADLE_OPTS="$GRADLE_OPTS \"-XX:-MaxFDLimit\" \"-Dorg.gradle.jvmargs=-XX:-MaxFDLimit\"" +fi # Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. DEFAULT_JVM_OPTS='"-Xmx64m" "-Xms64m"' diff --git a/scripts/build-dtest-jars.sh b/scripts/build-dtest-jars.sh index fe7d4a969..ffd3e240b 100755 --- a/scripts/build-dtest-jars.sh +++ b/scripts/build-dtest-jars.sh @@ -45,7 +45,7 @@ else CANDIDATE_BRANCHES=( "cassandra-4.0:cassandra-4.0.17" "cassandra-4.1:99d9faeef57c9cf5240d11eac9db5b283e45a4f9" - "cassandra-5.0:cassandra-5.0.5" + "cassandra-5.0:cassandra-5.0.7" ) BRANCHES=( ${BRANCHES:-cassandra-4.0 cassandra-4.1 cassandra-5.0} ) echo ${BRANCHES[*]} diff --git a/scripts/relocate-dtest-dependencies.pom b/scripts/relocate-dtest-dependencies.pom index 3108b6c46..bb0d5e2b9 100644 --- a/scripts/relocate-dtest-dependencies.pom +++ b/scripts/relocate-dtest-dependencies.pom @@ -153,6 +153,7 @@ *:* **/Log4j2Plugins.dat + META-INF/versions/21/ From 55c4235e62895873104c693c963161a0b2d38044 Mon Sep 17 00:00:00 2001 From: Lukasz Antoniak Date: Tue, 9 Jun 2026 11:26:43 +0200 Subject: [PATCH 2/4] Consistent 5.0.7 in tests --- .circleci/config.yml | 4 ++-- .github/workflows/test.yaml | 10 +++++----- build.gradle | 2 +- cassandra-analytics-cdc/build.gradle | 2 +- .../org/apache/cassandra/bridge/CassandraVersion.java | 4 ++-- scripts/build-dtest-jars.sh | 2 +- 6 files changed, 12 insertions(+), 12 deletions(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index b3238f772..68e5c11d0 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -374,7 +374,7 @@ workflows: spark: ["3"] scala: ["2.13"] jdk: ["11"] - cassandra: ["5.0.5"] + cassandra: ["5.0.7"] # Cassandra 5.0 on Spark 4 / Scala 2.13 / JDK 17 - int-test: @@ -386,4 +386,4 @@ workflows: spark: ["4"] scala: ["2.13"] jdk: ["17"] - cassandra: ["5.0.5"] + cassandra: ["5.0.7"] diff --git a/.github/workflows/test.yaml b/.github/workflows/test.yaml index db7bf2234..2bc40c45a 100644 --- a/.github/workflows/test.yaml +++ b/.github/workflows/test.yaml @@ -223,13 +223,13 @@ jobs: # into each match. To add a new version: add one entry to 'config' and one to # 'include'. matrix: - config: ['s2.13-c5.0.5', 's2.12-c4.1.4', 's2.12-c4.0.17', 's2.13-c5.0.5-spark4'] + config: ['s2.13-c5.0.7', 's2.12-c4.1.4', 's2.12-c4.0.17', 's2.13-c5.0.7-spark4'] job_index: [0, 1, 2, 3, 4] job_total: [5] include: - - config: 's2.13-c5.0.5' + - config: 's2.13-c5.0.7' scala: '2.13' - cassandra: '5.0.5' + cassandra: '5.0.7' jdk: '11' spark: '3' - config: 's2.12-c4.1.4' @@ -242,9 +242,9 @@ jobs: cassandra: '4.0.17' jdk: '11' spark: '3' - - config: 's2.13-c5.0.5-spark4' + - config: 's2.13-c5.0.7-spark4' scala: '2.13' - cassandra: '5.0.5' + cassandra: '5.0.7' jdk: '17' spark: '4' fail-fast: false diff --git a/build.gradle b/build.gradle index 38545fa8a..fec0d8305 100644 --- a/build.gradle +++ b/build.gradle @@ -67,7 +67,7 @@ ext.dependencyLocation = (System.getenv("CASSANDRA_DEP_DIR") ?: "${rootDir}/depe // - cassandraFullVersionMap values must match the supported_versions default // NOTE: Both maps must ALSO stay in sync with the values in build-dtest-jars.sh ext.cassandraVersionEnumMap = ["4.0": "FOURZERO", "4.1": "FOURONE", "5.0": "FIVEZERO"] -ext.cassandraFullVersionMap = ["4.0": "4.0.17", "4.1": "4.1.4", "5.0": "5.0.5"] +ext.cassandraFullVersionMap = ["4.0": "4.0.17", "4.1": "4.1.4", "5.0": "5.0.7"] // Shared helper: sets implemented_versions and supported_versions system properties on a Test task. // When majorMinor is provided (e.g. "4.0"), uses that version directly. diff --git a/cassandra-analytics-cdc/build.gradle b/cassandra-analytics-cdc/build.gradle index fb39be349..5a3918cc4 100644 --- a/cassandra-analytics-cdc/build.gradle +++ b/cassandra-analytics-cdc/build.gradle @@ -159,7 +159,7 @@ def configureCdcTestTask = { Test task, String majorMinor = null -> // Full version format to match CDC's TestVersionSupplier; tests both versions for backward compat. // 4.1 intentionally excluded from gradlew defaults to keep local iteration fast; // use testCassandra41 for targeted 4.1 runs. CI covers 4.1 via CASSANDRA_VERSION env var. - task.systemProperty "cassandra.sidecar.versions_to_test", "4.0.17,5.0.5" + task.systemProperty "cassandra.sidecar.versions_to_test", "4.0.17,5.0.7" } task.minHeapSize = '1024m' diff --git a/cassandra-analytics-common/src/main/java/org/apache/cassandra/bridge/CassandraVersion.java b/cassandra-analytics-common/src/main/java/org/apache/cassandra/bridge/CassandraVersion.java index 850aadca8..6767183b4 100644 --- a/cassandra-analytics-common/src/main/java/org/apache/cassandra/bridge/CassandraVersion.java +++ b/cassandra-analytics-common/src/main/java/org/apache/cassandra/bridge/CassandraVersion.java @@ -36,12 +36,12 @@ * NOTE: The following values need to stay in sync with: * - build.gradle: * - ext.cassandraVersionEnumMap = ["4.0": "FOURZERO", "4.1": "FOURONE", "5.0": "FIVEZERO"] - * - ext.cassandraFullVersionMap = ["4.0": "4.0.17", "4.1": "4.1.4", "5.0": "5.0.5"] + * - ext.cassandraFullVersionMap = ["4.0": "4.0.17", "4.1": "4.1.4", "5.0": "5.0.7"] * - build-dtest-jars.sh: * - CANDIDATE_BRANCHES=( * "cassandra-4.0:cassandra-4.0.17" * "cassandra-4.1:99d9faeef57c9cf5240d11eac9db5b283e45a4f9" - * "cassandra-5.0:cassandra-5.0.5" + * "cassandra-5.0:cassandra-5.0.7" */ public enum CassandraVersion { diff --git a/scripts/build-dtest-jars.sh b/scripts/build-dtest-jars.sh index ffd3e240b..6e2416be7 100755 --- a/scripts/build-dtest-jars.sh +++ b/scripts/build-dtest-jars.sh @@ -40,7 +40,7 @@ else # # NOTE: The following branches need to stay in sync with the values in build.gradle: # ext.cassandraVersionEnumMap = ["4.0": "FOURZERO", "4.1": "FOURONE", "5.0": "FIVEZERO"] - # ext.cassandraFullVersionMap = ["4.0": "4.0.17", "4.1": "4.1.4", "5.0": "5.0.5"] + # ext.cassandraFullVersionMap = ["4.0": "4.0.17", "4.1": "4.1.4", "5.0": "5.0.7"] # NOTE: The following branches also need to remain in sync with CassandraVersion.java CANDIDATE_BRANCHES=( "cassandra-4.0:cassandra-4.0.17" From e6b52ac7bb988f2610dfc51b60e357f6599d871c Mon Sep 17 00:00:00 2001 From: Lukasz Antoniak Date: Mon, 15 Jun 2026 13:38:35 +0200 Subject: [PATCH 3/4] Update Sidecar version --- gradle.properties | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gradle.properties b/gradle.properties index 81a6a13ae..eaa11965a 100644 --- a/gradle.properties +++ b/gradle.properties @@ -23,7 +23,7 @@ description=Apache Cassandra Analytics analyticsJDKLevel=17 cassandra40Version=4.0.17 cassandra50Version=5.0.7 -sidecarVersion=0.4-SNAPSHOT +sidecarVersion=0.4.0 intellijVersion=9.0.4 junitVersion=5.10.2 assertjCoreVersion=3.24.2 From 0d3187ebb1c4475ea3bb7e13b1d53b8ff346949d Mon Sep 17 00:00:00 2001 From: Lukasz Antoniak Date: Mon, 15 Jun 2026 13:48:16 +0200 Subject: [PATCH 4/4] Formatting --- .../analytics/BulkReaderVectorTest.java | 3 ++- .../analytics/BulkWriteVectorTest.java | 18 ++++++++++++++++++ .../spark/data/complex/CqlVector.java | 2 -- 3 files changed, 20 insertions(+), 3 deletions(-) diff --git a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BulkReaderVectorTest.java b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BulkReaderVectorTest.java index c7acb7960..14c93ed38 100644 --- a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BulkReaderVectorTest.java +++ b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BulkReaderVectorTest.java @@ -52,7 +52,8 @@ class BulkReaderVectorTest extends SharedClusterSparkIntegrationTestBase static final List> DATASET = new ArrayList<>(); static QualifiedName table1 = uniqueTestTableFullName(TEST_KEYSPACE); - static { + static + { for (int i = 0; i < ROW_COUNT; i++) { List vector = new ArrayList<>(); diff --git a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BulkWriteVectorTest.java b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BulkWriteVectorTest.java index 756138fd9..5583c24ca 100644 --- a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BulkWriteVectorTest.java +++ b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BulkWriteVectorTest.java @@ -1,3 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.cassandra.analytics; import org.junit.jupiter.api.Test; diff --git a/cassandra-five-zero-types/src/main/java/org/apache/cassandra/spark/data/complex/CqlVector.java b/cassandra-five-zero-types/src/main/java/org/apache/cassandra/spark/data/complex/CqlVector.java index bce534c1f..1407e0765 100644 --- a/cassandra-five-zero-types/src/main/java/org/apache/cassandra/spark/data/complex/CqlVector.java +++ b/cassandra-five-zero-types/src/main/java/org/apache/cassandra/spark/data/complex/CqlVector.java @@ -33,9 +33,7 @@ import org.apache.cassandra.serializers.TypeSerializer; import org.apache.cassandra.spark.data.CqlField; import org.apache.cassandra.spark.data.CqlType; -import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.TimeUUID; -import org.apache.cassandra.utils.UUIDGen; import static org.apache.cassandra.spark.data.CqlField.NO_TTL;