Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,7 @@ workflows:
spark: ["3"]
scala: ["2.13"]
jdk: ["11"]
cassandra: ["5.0.5"]
cassandra: ["5.0.7"]

# Cassandra 5.0 on Spark 4 / Scala 2.13 / JDK 17
- int-test:
Expand All @@ -386,4 +386,4 @@ workflows:
spark: ["4"]
scala: ["2.13"]
jdk: ["17"]
cassandra: ["5.0.5"]
cassandra: ["5.0.7"]
10 changes: 5 additions & 5 deletions .github/workflows/test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -223,13 +223,13 @@ jobs:
# into each match. To add a new version: add one entry to 'config' and one to
# 'include'.
matrix:
config: ['s2.13-c5.0.5', 's2.12-c4.1.4', 's2.12-c4.0.17', 's2.13-c5.0.5-spark4']
config: ['s2.13-c5.0.7', 's2.12-c4.1.4', 's2.12-c4.0.17', 's2.13-c5.0.7-spark4']
job_index: [0, 1, 2, 3, 4]
job_total: [5]
include:
- config: 's2.13-c5.0.5'
- config: 's2.13-c5.0.7'
scala: '2.13'
cassandra: '5.0.5'
cassandra: '5.0.7'
jdk: '11'
spark: '3'
- config: 's2.12-c4.1.4'
Expand All @@ -242,9 +242,9 @@ jobs:
cassandra: '4.0.17'
jdk: '11'
spark: '3'
- config: 's2.13-c5.0.5-spark4'
- config: 's2.13-c5.0.7-spark4'
scala: '2.13'
cassandra: '5.0.5'
cassandra: '5.0.7'
jdk: '17'
spark: '4'
fail-fast: false
Expand Down
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ ext.dependencyLocation = (System.getenv("CASSANDRA_DEP_DIR") ?: "${rootDir}/depe
// - cassandraFullVersionMap values must match the supported_versions default
// NOTE: Both maps must ALSO stay in sync with the values in build-dtest-jars.sh
ext.cassandraVersionEnumMap = ["4.0": "FOURZERO", "4.1": "FOURONE", "5.0": "FIVEZERO"]
ext.cassandraFullVersionMap = ["4.0": "4.0.17", "4.1": "4.1.4", "5.0": "5.0.5"]
ext.cassandraFullVersionMap = ["4.0": "4.0.17", "4.1": "4.1.4", "5.0": "5.0.7"]

// Shared helper: sets implemented_versions and supported_versions system properties on a Test task.
// When majorMinor is provided (e.g. "4.0"), uses that version directly.
Expand Down
2 changes: 1 addition & 1 deletion cassandra-analytics-cdc/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ def configureCdcTestTask = { Test task, String majorMinor = null ->
// Full version format to match CDC's TestVersionSupplier; tests both versions for backward compat.
// 4.1 intentionally excluded from gradlew defaults to keep local iteration fast;
// use testCassandra41 for targeted 4.1 runs. CI covers 4.1 via CASSANDRA_VERSION env var.
task.systemProperty "cassandra.sidecar.versions_to_test", "4.0.17,5.0.5"
task.systemProperty "cassandra.sidecar.versions_to_test", "4.0.17,5.0.7"
}

task.minHeapSize = '1024m'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ private TestVersionSupplier()

public static Stream<CassandraVersion> testVersions()
{
String versions = System.getProperty("cassandra.sidecar.versions_to_test", "4.0.17,5.0.5");
String versions = System.getProperty("cassandra.sidecar.versions_to_test", "4.0.17,5.0.7");
return Arrays.stream(versions.split(","))
.map(String::trim)
.map(v -> CassandraVersion.fromVersion(v).orElseThrow(() -> new IllegalArgumentException("Unsupported version: " + v)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,12 @@
* NOTE: The following values need to stay in sync with:
* - build.gradle:
* - ext.cassandraVersionEnumMap = ["4.0": "FOURZERO", "4.1": "FOURONE", "5.0": "FIVEZERO"]
* - ext.cassandraFullVersionMap = ["4.0": "4.0.17", "4.1": "4.1.4", "5.0": "5.0.5"]
* - ext.cassandraFullVersionMap = ["4.0": "4.0.17", "4.1": "4.1.4", "5.0": "5.0.7"]
* - build-dtest-jars.sh:
* - CANDIDATE_BRANCHES=(
* "cassandra-4.0:cassandra-4.0.17"
* "cassandra-4.1:99d9faeef57c9cf5240d11eac9db5b283e45a4f9"
* "cassandra-5.0:cassandra-5.0.5"
* "cassandra-5.0:cassandra-5.0.7"
*/
public enum CassandraVersion
{
Expand Down Expand Up @@ -98,7 +98,7 @@ public String jarBaseName()

// NOTE: These default versions must stay in sync with cassandraFullVersionMap in build.gradle.
String providedSupportedVersionsOrDefault = System.getProperty("cassandra.analytics.bridges.supported_versions",
"cassandra-4.0.17,cassandra-5.0.5");
"cassandra-4.0.17,cassandra-5.0.7");
supportedVersions = Arrays.stream(providedSupportedVersionsOrDefault.split(","))
.filter(version -> CassandraVersion.fromVersion(version)
.filter(v -> v.sstableFormats.contains(sstableFormat))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
public abstract class CassandraTypes
{
public static final Pattern COLLECTION_PATTERN = Pattern.compile("^(set|list|map|tuple)<(.+)>$", Pattern.CASE_INSENSITIVE);
public static final Pattern VECTOR_PATTERN = Pattern.compile("^(vector)<(.+),(.+)>$", Pattern.CASE_INSENSITIVE);
public static final Pattern FROZEN_PATTERN = Pattern.compile("^frozen<(.*)>$", Pattern.CASE_INSENSITIVE);

private final UDTs udts = new UDTs();
Expand Down Expand Up @@ -133,6 +134,8 @@ public List<CqlField.NativeType> supportedTypes()

public abstract CqlField.CqlList list(CqlField.CqlType type);

public abstract CqlField.CqlVector vector(CqlField.CqlType type, int dimensions);

public abstract CqlField.CqlSet set(CqlField.CqlType type);

public abstract CqlField.CqlMap map(CqlField.CqlType keyType, CqlField.CqlType valueType);
Expand Down Expand Up @@ -189,6 +192,14 @@ public CqlField.CqlType parseType(String type, Map<String, CqlField.CqlUdt> udts
.map(collectionType -> parseType(collectionType, udts))
.toArray(CqlField.CqlType[]::new));
}
Matcher vectorMatcher = VECTOR_PATTERN.matcher(type);
if (vectorMatcher.find())
{
// CQL vector
String subType = vectorMatcher.group(2);
int dimensions = Integer.parseInt(vectorMatcher.group(3).trim());
return vector(parseType(subType, udts), dimensions);
}
Matcher frozenMatcher = FROZEN_PATTERN.matcher(type);
if (frozenMatcher.find())
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public interface CqlType extends Serializable
{
enum InternalType
{
NativeCql, Set, List, Map, Frozen, Udt, Tuple;
NativeCql, Set, List, Map, Frozen, Udt, Tuple, Vector;

public static InternalType fromString(String name)
{
Expand All @@ -77,6 +77,8 @@ public static InternalType fromString(String name)
return Set;
case "list":
return List;
case "vector":
return Vector;
case "map":
return Map;
case "tuple":
Expand Down Expand Up @@ -237,6 +239,10 @@ public interface CqlList extends CqlCollection
{
}

public interface CqlVector extends CqlCollection
{
}

public interface CqlTuple extends CqlCollection
{
ByteBuffer serializeTuple(Object[] values);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public final class CqlUtils
"min_index_interval",
"max_index_interval"
);
private static final Pattern REPLICATION_FACTOR_PATTERN = Pattern.compile("WITH REPLICATION = (\\{[^\\}]*\\})");
private static final Pattern REPLICATION_FACTOR_PATTERN = Pattern.compile("WITH REPLICATION = (\\{[^\\}]*\\})", Pattern.CASE_INSENSITIVE);
// Initialize a mapper allowing single quotes to process the RF string from the CREATE KEYSPACE statement
private static final ObjectMapper MAPPER = new ObjectMapper().configure(JsonParser.Feature.ALLOW_SINGLE_QUOTES, true);
private static final Pattern ESCAPED_WHITESPACE_PATTERN = Pattern.compile("(\\\\r|\\\\n|\\\\r\\n)+");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ public final class SqlToCqlTypeConverter implements Serializable
public static final String UDT = "udt";
public static final String VARCHAR = "varchar";
public static final String VARINT = "varint";
public static final String VECTOR = "vector";
private static final Logger LOGGER = LoggerFactory.getLogger(SqlToCqlTypeConverter.class);
private static final NoOp<Object> NO_OP_CONVERTER = new NoOp<>();
private static final LongConverter LONG_CONVERTER = new LongConverter();
Expand Down Expand Up @@ -165,6 +166,7 @@ public static Converter<?> getConverter(CqlField.CqlType cqlType)
case TINYINT:
return NO_OP_CONVERTER;
case LIST:
case VECTOR:
return new ListConverter<>((CqlField.CqlCollection) cqlType);
case MAP:
assert cqlType instanceof CqlField.CqlMap;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ public interface CommitResultSupplier extends BiFunction<List<String>, String, D
{
}

public static final String DEFAULT_CASSANDRA_VERSION = "cassandra-5.0.5";
public static final String DEFAULT_CASSANDRA_VERSION = "cassandra-5.0.7";

private final UUID jobId;
private boolean skipClean = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ void testWriteWithSubRanges(String version)
@MethodSource("data")
void testWriteWithDataInMultipleSubRanges(String version)
{
version = "cassandra-5.0.5";
version = "cassandra-5.0.7";
setUp(version);
MockBulkWriterContext m = Mockito.spy(writerContext);
TokenPartitioner mtp = Mockito.mock(TokenPartitioner.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ private void setup(ConsistencyLevel.CL consistencyLevel)
{
digestAlgorithm = new XXHash32DigestAlgorithm();
tableWriter = new MockTableWriter(folder);
writerContext = new MockBulkWriterContext(TOKEN_RANGE_MAPPING, "cassandra-5.0.5", consistencyLevel);
writerContext = new MockBulkWriterContext(TOKEN_RANGE_MAPPING, "cassandra-5.0.7", consistencyLevel);
writerContext.setReplicationFactor(new ReplicationFactor(NetworkTopologyStrategy, rfOptions));
transportContext = (TransportContext.DirectDataBulkWriterContext) writerContext.transportContext();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,17 +33,21 @@
import org.junit.jupiter.params.provider.MethodSource;

import org.apache.cassandra.bridge.CassandraBridge;
import org.apache.cassandra.bridge.CassandraVersion;
import org.apache.cassandra.spark.TestUtils;
import org.apache.cassandra.spark.Tester;
import org.apache.cassandra.spark.data.CqlField;
import org.apache.cassandra.spark.utils.RandomUtils;
import org.apache.cassandra.spark.utils.test.TestSchema;
import org.apache.spark.sql.Row;
import org.quicktheories.core.Gen;
import scala.collection.mutable.AbstractSeq;

import static org.apache.cassandra.spark.utils.ScalaConversionUtils.mutableSeqAsJavaList;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assumptions.assumeThat;
import static org.quicktheories.QuickTheory.qt;
import static org.quicktheories.generators.SourceDSL.arbitrary;

@Tag("Sequential")
public class DataTypeTests
Expand Down Expand Up @@ -103,6 +107,103 @@ public void testSet(CassandraBridge bridge)
);
}

@ParameterizedTest
@MethodSource("org.apache.cassandra.bridge.VersionRunner#bridges")
public void testVector(CassandraBridge bridge)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe this test uses bridge to write sstables to disk and doesn't test complete bulk writer and bulk reader path. Can you add atleast one dtest using bulkWriterDataFrameWriter and bulkReaderDataFrame to test e2e writing and reading of vectors?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure do we need a vector converter in SqlToCqlTypeConverter.java similar to what we did for Tuples in this PR https://github.com/apache/cassandra-analytics/pull/174/changes. A dtest using bulk writer and reader will expose it if a converter is needed

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Very good catch! Still working on it.

{
assumeThat(bridge.getVersion().versionNumber()).isGreaterThanOrEqualTo(CassandraVersion.FIVEZERO.versionNumber());
qt().forAll(supportedVectorTypes(bridge))
.checkAssert(type ->
Tester.builder(TestSchema.builder(bridge)
.withPartitionKey("pk", bridge.uuid())
.withColumn("a", bridge.vector(type, 10)))
.withExpectedRowCountPerSSTable(Tester.DEFAULT_NUM_ROWS)
.run(bridge.getVersion())
);
}

@ParameterizedTest
@MethodSource("org.apache.cassandra.bridge.VersionRunner#bridges")
public void testVectorVector(CassandraBridge bridge)
{
assumeThat(bridge.getVersion().versionNumber()).isGreaterThanOrEqualTo(CassandraVersion.FIVEZERO.versionNumber());
qt().forAll(supportedVectorTypes(bridge))
.checkAssert(type ->
Tester.builder(TestSchema.builder(bridge)
.withPartitionKey("pk", bridge.uuid())
.withColumn("a", bridge.vector(bridge.vector(type, 2), 5)))
.withExpectedRowCountPerSSTable(Tester.DEFAULT_NUM_ROWS)
.run(bridge.getVersion())
);
}

@ParameterizedTest
@MethodSource("org.apache.cassandra.bridge.VersionRunner#bridges")
public void testVectorList(CassandraBridge bridge)
{
assumeThat(bridge.getVersion().versionNumber()).isGreaterThanOrEqualTo(CassandraVersion.FIVEZERO.versionNumber());
qt().forAll(supportedVectorTypes(bridge))
.checkAssert(type ->
Tester.builder(TestSchema.builder(bridge)
.withPartitionKey("pk", bridge.uuid())
.withColumn("a", bridge.vector(bridge.list(type), 3)))
.withExpectedRowCountPerSSTable(Tester.DEFAULT_NUM_ROWS)
.run(bridge.getVersion())
);
}

@ParameterizedTest
@MethodSource("org.apache.cassandra.bridge.VersionRunner#bridges")
public void testVectorUDT(CassandraBridge bridge)
{
// pk -> a vector<frozen<nested_udt<x int, y type, z int>>, 10>
// Test vector of UDTs
assumeThat(bridge.getVersion().versionNumber()).isGreaterThanOrEqualTo(CassandraVersion.FIVEZERO.versionNumber());
qt().withExamples(10)
.forAll(supportedVectorTypes(bridge))
.checkAssert(type ->
Tester.builder(TestSchema.builder(bridge)
.withPartitionKey("pk", bridge.uuid())
.withColumn("a", bridge.vector(
bridge.udt("keyspace", "nested_udt")
.withField("x", bridge.aInt())
.withField("y", type)
.withField("z", bridge.aInt())
.build().frozen(),
10)))
.run(bridge.getVersion())
);
}

@ParameterizedTest
@MethodSource("org.apache.cassandra.bridge.VersionRunner#bridges")
public void testVectorTuple(CassandraBridge bridge)
{
// pk -> a vector<frozen<tuple<type, float, text>>, 7>
// Test tuple nested within vector
assumeThat(bridge.getVersion().versionNumber()).isGreaterThanOrEqualTo(CassandraVersion.FIVEZERO.versionNumber());
qt().withExamples(10)
.forAll(supportedVectorTypes(bridge))
.checkAssert(type ->
Tester.builder(TestSchema.builder(bridge)
.withPartitionKey("pk", bridge.uuid())
.withColumn("a", bridge.vector(bridge.tuple(type,
bridge.aFloat(),
bridge.text()).frozen(), 7)))
.run(bridge.getVersion())
);
}

private static Gen<CqlField.NativeType> supportedVectorTypes(CassandraBridge bridge)
{
// TODO: Vector of list of durations fail, because we cannot replace DurationSerializer with
// AnalyticsDurationSerializer across all serializers used by VectorType.
List<CqlField.NativeType> supportedTypes = bridge.supportedTypes().stream()
.filter(t -> !t.equals(bridge.duration()))
.collect(Collectors.toList());
return arbitrary().pick(supportedTypes);
}

@ParameterizedTest
@MethodSource("org.apache.cassandra.bridge.VersionRunner#bridges")
public void testList(CassandraBridge bridge)
Expand Down
Loading
Loading