diff --git a/CHANGES.txt b/CHANGES.txt index ec3409eac..b9ddd00eb 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,5 +1,6 @@ 0.5.0 ----- + * Analytics should identify if a keyspace is tracked to determine appropriate stream session to use for bulk writes (CASSANALYTICS-160) * Avoid Spark 4 partitioning warnings during bulk reads (CASSANALYTICS-171) * Spark 4.0 Support (CASSANALYTICS-34) * Add IAM credential support for S3 storage transport (CASSANALYTICS-155) diff --git a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/utils/CqlUtils.java b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/utils/CqlUtils.java index 9d989be51..8511dbaa1 100644 --- a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/utils/CqlUtils.java +++ b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/utils/CqlUtils.java @@ -52,6 +52,9 @@ public final class CqlUtils "max_index_interval" ); private static final Pattern REPLICATION_FACTOR_PATTERN = Pattern.compile("WITH REPLICATION = (\\{[^\\}]*\\})"); + private static final String TRACKED_REPLICATION_TYPE = "tracked"; + private static final Pattern REPLICATION_TYPE_PATTERN = Pattern.compile("replication_type\\s*=\\s*'(\\w+)'", + Pattern.CASE_INSENSITIVE); // Initialize a mapper allowing single quotes to process the RF string from the CREATE KEYSPACE statement private static final ObjectMapper MAPPER = new ObjectMapper().configure(JsonParser.Feature.ALLOW_SINGLE_QUOTES, true); private static final Pattern ESCAPED_WHITESPACE_PATTERN = Pattern.compile("(\\\\r|\\\\n|\\\\r\\n)+"); @@ -296,4 +299,33 @@ public static boolean isTimeRangeFilterSupported(String compactionStrategy) { return compactionStrategy == null || compactionStrategy.endsWith("TimeWindowCompactionStrategy"); } + + /** + * Extracts replication type from create schema statement + * + * @param schemaStr full cluster schema string as returned by Sidecar + * @param keyspace name of the keyspace to check + * @return {@code true} if keyspace is tracked {@code false} otherwise + */ + public static String extractReplicationType(@NotNull String schemaStr, @NotNull String keyspace) + { + String createKeyspaceSchema = extractKeyspaceSchema(schemaStr, keyspace); + Matcher matcher = REPLICATION_TYPE_PATTERN.matcher(createKeyspaceSchema); + if (matcher.find()) + { + return matcher.group(1); + } + return null; + } + + /** + * Returns {@code true} if {@code replication_type = 'tracked'} in create statement otherwise {@code false} + * + * @param replicationType replication type extracted from create statement + * @return {@code true} if replication type is tracked {@code false} otherwise + */ + public static boolean isTracked(String replicationType) + { + return TRACKED_REPLICATION_TYPE.equalsIgnoreCase(replicationType); + } } diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraClusterInfo.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraClusterInfo.java index a9c120871..53ac97974 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraClusterInfo.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraClusterInfo.java @@ -356,6 +356,17 @@ public ReplicationFactor replicationFactor() } } + @Override + public String getReplicationType() + { + String keyspaceSchema = getKeyspaceSchema(true); + if (keyspaceSchema == null) + { + throw new RuntimeException("Could not retrieve keyspace schema information for keyspace " + conf.keyspace); + } + return CqlUtils.extractReplicationType(keyspaceSchema, conf.keyspace); + } + @Override public TokenRangeMapping getTokenRangeMapping(boolean cached) { diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraDirectDataTransportContext.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraDirectDataTransportContext.java index bed36ac4b..fb82483a7 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraDirectDataTransportContext.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraDirectDataTransportContext.java @@ -46,13 +46,24 @@ public CassandraDirectDataTransportContext(@NotNull BulkWriterContext bulkWriter } @Override - public DirectStreamSession createStreamSession(BulkWriterContext writerContext, - String sessionId, - SortedSSTableWriter sstableWriter, - Range range, - ReplicaAwareFailureHandler failureHandler, - ExecutorService executorService) + public StreamSession createStreamSession( + BulkWriterContext writerContext, + String sessionId, + SortedSSTableWriter sstableWriter, + Range range, + ReplicaAwareFailureHandler failureHandler, + ExecutorService executorService) { + if (bridge().isTracked(clusterInfo.getReplicationType())) + { + return new TrackedDirectStreamSession(writerContext, + sstableWriter, + this, + sessionId, + range, + failureHandler, + executorService); + } return new DirectStreamSession(writerContext, sstableWriter, this, @@ -71,7 +82,11 @@ public DirectDataTransferApi dataTransferApi() // only invoke in constructor protected DirectDataTransferApi createDirectDataTransferApi() { - CassandraBridge bridge = CassandraBridgeFactory.get(clusterInfo.getLowestCassandraVersion()); - return new SidecarDataTransferApi(clusterInfo.getCassandraContext(), bridge, jobInfo); + return new SidecarDataTransferApi(clusterInfo.getCassandraContext(), bridge(), jobInfo); + } + + private CassandraBridge bridge() + { + return CassandraBridgeFactory.get(clusterInfo.getLowestCassandraVersion()); } } diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/ClusterInfo.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/ClusterInfo.java index 64b654449..e4d8a0a0b 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/ClusterInfo.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/ClusterInfo.java @@ -82,6 +82,16 @@ public interface ClusterInfo extends StartupValidatable */ ReplicationFactor replicationFactor(); + /** + * @return {@code replication_type} of the enclosing keyspace (e.g. {@code "tracked"}, {@code "untracked"}), + * or {@code null} if replication_type is absent + */ + @Nullable + default String getReplicationType() + { + return null; + } + CassandraContext getCassandraContext(); /** diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/TrackedDirectStreamSession.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/TrackedDirectStreamSession.java new file mode 100644 index 000000000..166ef2387 --- /dev/null +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/TrackedDirectStreamSession.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cassandra.spark.bulkwriter; + +import java.math.BigInteger; +import java.util.Set; +import java.util.concurrent.ExecutorService; + +import com.google.common.collect.Range; + +import org.apache.cassandra.bridge.SSTableDescriptor; +import org.apache.cassandra.spark.bulkwriter.token.ReplicaAwareFailureHandler; + +/** + * Stream session for bulk writes to keyspaces with mutation tracking enabled. + * + *

+ * Tracked stream session uploads and triggers import on the coordinator node only. Cassandra's coordinated + * transfer then propagates the data to all other replicas, avoiding the duplicate-row updates that would occur if each + * replica independently streamed the data to its peers. + *

+ */ +public class TrackedDirectStreamSession extends StreamSession +{ + public TrackedDirectStreamSession(BulkWriterContext writerContext, + SortedSSTableWriter sstableWriter, + TransportContext.DirectDataBulkWriterContext transportContext, + String sessionID, + Range tokenRange, + ReplicaAwareFailureHandler failureHandler, + ExecutorService executorService) + { + super(writerContext, sstableWriter, transportContext, sessionID, tokenRange, failureHandler, executorService); + } + + @Override + protected void onSSTablesProduced(Set sstables) + { + throw new UnsupportedOperationException("TrackedDirectStreamSession is not yet implemented"); + } + + @Override + protected StreamResult doFinalizeStream() + { + throw new UnsupportedOperationException("TrackedDirectStreamSession is not yet implemented"); + } + + @Override + protected void sendRemainingSSTables() + { + throw new UnsupportedOperationException("TrackedDirectStreamSession is not yet implemented"); + } +} diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/CassandraClusterInfoTest.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/CassandraClusterInfoTest.java index 5ed4c5221..763082a36 100644 --- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/CassandraClusterInfoTest.java +++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/CassandraClusterInfoTest.java @@ -30,8 +30,10 @@ import o.a.c.sidecar.client.shaded.common.response.TimeSkewResponse; import org.apache.cassandra.spark.bulkwriter.token.TokenRangeMapping; import org.apache.cassandra.spark.exception.TimeSkewTooLargeException; +import org.apache.cassandra.spark.utils.CqlUtils; import static org.apache.cassandra.spark.TestUtils.range; +import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatNoException; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.mockito.ArgumentMatchers.any; @@ -75,6 +77,50 @@ public static CassandraClusterInfo mockClusterInfoForTimeSkewTest(int allowanceM return new MockClusterInfoForTimeSkew(allowanceMinutes, remoteNow); } + @Test + void testGetTrackedReplicationType() + { + String schema = "CREATE KEYSPACE mykeyspace " + + "WITH replication = {'class': 'NetworkTopologyStrategy', 'dc1': '3'}" + + " AND durable_writes = true" + + " AND replication_type = 'tracked';"; + CassandraClusterInfo ci = mockClusterInfoWithSchema("mykeyspace", schema); + assertThat(ci.getReplicationType()) + .describedAs("Keyspace with replication_type = 'tracked' should return 'tracked'") + .isEqualTo("tracked"); + } + + @Test + void testGetUntrackedReplicationType() + { + String schema = "CREATE KEYSPACE k " + + "WITH replication = {'class': 'NetworkTopologyStrategy', 'dc1': '3'}" + + " AND durable_writes = true" + + " AND replication_type = 'untracked';"; + CassandraClusterInfo ci = mockClusterInfoWithSchema("k", schema); + assertThat(ci.getReplicationType()) + .describedAs("Keyspace with replication_type = 'untracked' should return 'untracked'") + .isEqualTo("untracked"); + } + + @Test + void testGetReplicationTypeReturnsNullWhenPropertyAbsent() + { + // Schema without replication_type — pre-mutation-tracking clusters + String schema = "CREATE KEYSPACE k " + + "WITH replication = {'class': 'NetworkTopologyStrategy', 'dc1': '3'}" + + " AND durable_writes = true;"; + CassandraClusterInfo ci = mockClusterInfoWithSchema("k", schema); + assertThat(ci.getReplicationType()) + .describedAs("Keyspace without replication_type property should return null") + .isNull(); + } + + private static CassandraClusterInfo mockClusterInfoWithSchema(String keyspace, String schemaStr) + { + return new MockClusterInfoForSchema(keyspace, schemaStr); + } + private static class MockClusterInfoForTimeSkew extends CassandraClusterInfo { private CassandraContext cassandraContext; @@ -107,4 +153,39 @@ private void mockCassandraContext(int allowanceMinutes, Instant remoteNow) when(cassandraContext.sidecarPort()).thenReturn(9043); } } + + /** + * Minimal ClusterInfo stub that returns a fixed keyspace schema string, used to test + * {@link CassandraClusterInfo#getReplicationType()}. + */ + private static class MockClusterInfoForSchema extends CassandraClusterInfo + { + private final String keyspaceName; + private final String schemaStr; + + MockClusterInfoForSchema(String keyspace, String schemaStr) + { + super((BulkSparkConf) null); + this.keyspaceName = keyspace; + this.schemaStr = schemaStr; + } + + @Override + protected CassandraContext buildCassandraContext() + { + return mock(CassandraContext.class, RETURNS_DEEP_STUBS); + } + + @Override + public String getKeyspaceSchema(boolean cached) + { + return schemaStr; + } + + @Override + public String getReplicationType() + { + return CqlUtils.extractReplicationType(schemaStr, keyspaceName); + } + } } diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/CassandraDirectDataTransportContextTest.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/CassandraDirectDataTransportContextTest.java new file mode 100644 index 000000000..0bd2170bb --- /dev/null +++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/CassandraDirectDataTransportContextTest.java @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cassandra.spark.bulkwriter; + +import java.math.BigInteger; +import java.nio.file.Path; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import com.google.common.collect.BoundType; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Range; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import org.apache.cassandra.spark.bulkwriter.token.MultiClusterReplicaAwareFailureHandler; +import org.apache.cassandra.spark.bulkwriter.token.ReplicaAwareFailureHandler; +import org.apache.cassandra.spark.bulkwriter.token.TokenRangeMapping; +import org.apache.cassandra.spark.data.ReplicationFactor; +import org.apache.cassandra.spark.utils.XXHash32DigestAlgorithm; + +import static org.apache.cassandra.spark.data.ReplicationFactor.ReplicationStrategy.NetworkTopologyStrategy; +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Tests that {@link CassandraDirectDataTransportContext#createStreamSession}. + */ +public class CassandraDirectDataTransportContextTest +{ + @TempDir + private Path folder; + + private MockBulkWriterContext writerContext; + private MockTableWriter tableWriter; + private ExecutorService executor; + private Range range; + + @BeforeEach + public void setup() + { + ImmutableMap rfOptions = ImmutableMap.of("DC1", 3); + ReplicationFactor rf = new ReplicationFactor(NetworkTopologyStrategy, rfOptions); + TokenRangeMapping tokenRangeMapping = TokenRangeMappingUtils.buildTokenRangeMapping(0, rfOptions, 12); + writerContext = new MockBulkWriterContext(tokenRangeMapping); + writerContext.setReplicationFactor(rf); + tableWriter = new MockTableWriter(folder); + range = Range.range(BigInteger.valueOf(101L), BoundType.CLOSED, BigInteger.valueOf(199L), BoundType.CLOSED); + executor = Executors.newSingleThreadExecutor(); + } + + @AfterEach + public void tearDown() + { + executor.shutdownNow(); + } + + @Test + void createDirectStreamSessionForUntrackedKeyspace() + { + writerContext.setTrackedKeyspace(false); + CassandraDirectDataTransportContext transportContext = stubTransportContext(); + + StreamSession session = transportContext.createStreamSession( + writerContext, + "session-untracked", + new SortedSSTableWriter(tableWriter, folder, new XXHash32DigestAlgorithm(), 1), + range, + buildFailureHandler(), + executor); + + assertThat(session) + .describedAs("Untracked keyspace should produce a DirectStreamSession") + .isInstanceOf(DirectStreamSession.class); + } + + @Test + void createTrackedDirectStreamSessionForTrackedKeyspace() + { + writerContext.setTrackedKeyspace(true); + CassandraDirectDataTransportContext transportContext = stubTransportContext(); + + StreamSession session = transportContext.createStreamSession( + writerContext, + "session-tracked", + new SortedSSTableWriter(tableWriter, folder, new XXHash32DigestAlgorithm(), 1), + range, + buildFailureHandler(), + executor); + + assertThat(session) + .describedAs("Tracked keyspace should produce a TrackedDirectStreamSession") + .isInstanceOf(TrackedDirectStreamSession.class); + } + + @Test + void createDirectStreamSessionForAbsentReplicationType() + { + CassandraDirectDataTransportContext transportContext = stubTransportContext(); + + StreamSession session = transportContext.createStreamSession( + writerContext, + "session-null-type", + new SortedSSTableWriter(tableWriter, folder, new XXHash32DigestAlgorithm(), 1), + range, + buildFailureHandler(), + executor); + + assertThat(session) + .describedAs("Null replication type (pre-mutation-tracking) should produce a DirectStreamSession") + .isInstanceOf(DirectStreamSession.class); + } + + private CassandraDirectDataTransportContext stubTransportContext() + { + DirectDataTransferApi mockApi = + ((TransportContext.DirectDataBulkWriterContext) writerContext.transportContext()).dataTransferApi(); + + return new CassandraDirectDataTransportContext(writerContext) + { + @Override + protected DirectDataTransferApi createDirectDataTransferApi() + { + return mockApi; + } + }; + } + + private ReplicaAwareFailureHandler buildFailureHandler() + { + return new MultiClusterReplicaAwareFailureHandler<>(writerContext.cluster().getPartitioner()); + } +} diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/MockBulkWriterContext.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/MockBulkWriterContext.java index cb5564f0c..be05715c4 100644 --- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/MockBulkWriterContext.java +++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/MockBulkWriterContext.java @@ -99,6 +99,7 @@ public interface CommitResultSupplier extends BiFunction, String, D private final UUID jobId; private boolean skipClean = false; + private String replicationType = null; public int refreshClusterInfoCallCount = 0; // CHECKSTYLE IGNORE: Public mutable field private final Map> uploads = new ConcurrentHashMap<>(); private final Map> commits = new ConcurrentHashMap<>(); @@ -204,6 +205,17 @@ public void setReplicationFactor(ReplicationFactor replicationFactor) this.replicationFactor = replicationFactor; } + @Override + public String getReplicationType() + { + return replicationType; + } + + public void setTrackedKeyspace(boolean tracked) + { + this.replicationType = tracked ? "tracked" : "untracked"; + } + @Override public CassandraContext getCassandraContext() { diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/utils/CqlUtilsTest.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/utils/CqlUtilsTest.java index 5bb363d97..58fb5c960 100644 --- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/utils/CqlUtilsTest.java +++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/utils/CqlUtilsTest.java @@ -833,6 +833,43 @@ public void testTimeRangeFilterSupported() assertThat(isTimeRangeFilterSupported("")).isFalse(); } + @Test + public void testExtractReplicationType() + { + String schemaStrTracked + = "CREATE KEYSPACE k WITH REPLICATION " + + "= { 'class' : 'org.apache.cassandra.locator.NetworkTopologyStrategy', 'dc1': '3' }" + + " AND DURABLE_WRITES = true AND replication_type = 'tracked';"; + assertThat(CqlUtils.extractReplicationType(schemaStrTracked, "k")).isEqualTo("tracked"); + String schemaStrTrackedCaseInsensitive + = "CREATE KEYSPACE k WITH REPLICATION " + + "= { 'class' : 'org.apache.cassandra.locator.NetworkTopologyStrategy', 'dc1': '3' }" + + " AND DURABLE_WRITES = true AND REPLICATION_TYPE = 'Tracked';"; + assertThat(CqlUtils.extractReplicationType(schemaStrTrackedCaseInsensitive, "k")).isEqualTo("Tracked"); + String schemaStrUntracked + = "CREATE KEYSPACE k WITH REPLICATION " + + "= { 'class' : 'org.apache.cassandra.locator.NetworkTopologyStrategy', 'dc1': '3' }" + + " AND DURABLE_WRITES = true AND replication_type = 'untracked';"; + assertThat(CqlUtils.extractReplicationType(schemaStrUntracked, "k")).isEqualTo("untracked"); + String schemaStrNoReplicationType + = "CREATE KEYSPACE k WITH REPLICATION " + + "= { 'class' : 'org.apache.cassandra.locator.NetworkTopologyStrategy', 'dc1': '3' }" + + " AND DURABLE_WRITES = true;"; + assertThat(CqlUtils.extractReplicationType(schemaStrNoReplicationType, "k")).isNull(); + } + + @Test + public void testIsTracked() + { + assertThat(CqlUtils.isTracked("tracked")).isTrue(); + assertThat(CqlUtils.isTracked("TRACKED")).isTrue(); + assertThat(CqlUtils.isTracked("Tracked")).isTrue(); + assertThat(CqlUtils.isTracked("untracked")).isFalse(); + assertThat(CqlUtils.isTracked("random")).isFalse(); + assertThat(CqlUtils.isTracked(null)).isFalse(); + assertThat(CqlUtils.isTracked("")).isFalse(); + } + private static String loadFullSchemaSample() throws IOException { Path fullSchemaSampleFile = ResourceUtils.writeResourceToPath(CqlUtilsTest.class.getClassLoader(), tempPath, "cql/fullSchema.cql"); diff --git a/cassandra-bridge/src/main/java/org/apache/cassandra/bridge/CassandraBridge.java b/cassandra-bridge/src/main/java/org/apache/cassandra/bridge/CassandraBridge.java index 6bbf7f142..bbfc20fe9 100644 --- a/cassandra-bridge/src/main/java/org/apache/cassandra/bridge/CassandraBridge.java +++ b/cassandra-bridge/src/main/java/org/apache/cassandra/bridge/CassandraBridge.java @@ -62,6 +62,7 @@ import org.apache.cassandra.spark.sparksql.filters.SparkRangeFilter; import org.apache.cassandra.analytics.stats.Stats; import org.apache.cassandra.spark.sparksql.filters.SSTableTimeRangeFilter; +import org.apache.cassandra.spark.utils.CqlUtils; import org.apache.cassandra.spark.utils.TimeProvider; import org.apache.cassandra.util.CompressionUtil; import org.jetbrains.annotations.NotNull; @@ -167,6 +168,18 @@ public String maybeQuoteIdentifier(String identifier) return cassandraTypes().maybeQuoteIdentifier(identifier); } + /** + * @return {@code true} if mutation tracking is enabled given the replication type, {@code false} otherwise + */ + public boolean isTracked(String replicationType) + { + if (replicationType == null) + { + return false; + } + return CqlUtils.isTracked(replicationType); + } + // CQL Type Parsing public CqlField.CqlType readType(CqlField.CqlType.InternalType type, Input input)