Start with Cassandra and spring boot

Yogesh Wadile
3 min readJul 7, 2021

Dependency require

<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<cassandra.driver>3.7.1</cassandra.driver>
<spring.version>2.4.2</spring.version>
<lombok.version>1.16.8</lombok.version>
</properties>

<dependencies>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>${lombok.version}</version>
</dependency>

<!-- Spring dependency start -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
<version>${spring.version}</version>
</dependency>
<!-- Spring dependency end -->

<dependency>
<groupId>com.datastax.cassandra</groupId>
<artifactId>cassandra-driver-core</artifactId>
<version>${cassandra.driver}</version>
</dependency>
<dependency>
<groupId>com.datastax.cassandra</groupId>
<artifactId>cassandra-driver-mapping</artifactId>
<version>${cassandra.driver}</version>
</dependency>
</dependencies>

Configuration



import com.datastax.driver.core.*;
import com.datastax.driver.mapping.*;
import com.google.common.collect.Lists;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.DependsOn;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.List;

@Configuration
public class CassandraConfiguration {

@Value("${cassandra.host}")
private String cassandraHost;

@Value("${cassandra.port}")
private int cassandraPort;

@Value("${cassandra.database-dml-username:admin}")
private String userName;

@Value("${cassandra.database-dml-password:admin}")
private String password;

@Value("${cassandra.keyspaces.keyspace.name}")
private String defaultKeyspaceName;

@Value("${cassandra.cluster.pooling.minThread}")
private int minThread;

@Value("${cassandra.cluster.pooling.maxThread}")
private int maxThread;

@Value("${cassandra.cluster.pooling.timeout}")
private int timeout;


@Value("${cassandra.keyspaces.keyspace.readConsistency}")
private String keyspaceReadConsistency;

@Value("${cassandra.keyspaces.keyspace.writeConsistency}")
private String keyspaceWriteConsistency;

private Session cassandraSession;

@Bean(destroyMethod = "close")
@ConditionalOnMissingBean(Cluster.class)
public Cluster cluster() {
PoolingOptions poolingOptions = new PoolingOptions();
poolingOptions.setMaxConnectionsPerHost(HostDistance.LOCAL, maxThread);
poolingOptions.setPoolTimeoutMillis(timeout);
poolingOptions.setCoreConnectionsPerHost(HostDistance.LOCAL, minThread);

QueryOptions queryOptions = new QueryOptions().setConsistencyLevel(ConsistencyLevel.valueOf(keyspaceReadConsistency));
PlainTextAuthProvider authProvider = new PlainTextAuthProvider(userName, password);

return Cluster.builder()
.addContactPointsWithPorts(convertToInternetAddress())
.withPort(cassandraPort)
.withPoolingOptions(poolingOptions)
.withQueryOptions(queryOptions)
.withAuthProvider(authProvider)
.build();
}

@Bean(destroyMethod = "close")
@DependsOn({"cluster"})
public Session session() throws IOException {
System.out.println("session");
cassandraSession = cluster().connect(defaultKeyspaceName);
return cassandraSession;
}

@Bean
@DependsOn({"session"})
public MappingManager mappingManager() throws IOException {
PropertyMapper propMapper = new DefaultPropertyMapper().setPropertyTransienceStrategy(PropertyTransienceStrategy.OPT_IN);
MappingConfiguration mappingConfig = MappingConfiguration.builder().withPropertyMapper(propMapper).build();
return new MappingManager(cassandraSession, mappingConfig);
}

private List<InetSocketAddress> convertToInternetAddress() {
List<InetSocketAddress> cassandraHosts = Lists.newArrayList();
for (String host : cassandraHost.split(",")) {
InetSocketAddress socketAddress = new InetSocketAddress(host, cassandraPort);
cassandraHosts.add(socketAddress);
}
return cassandraHosts;
}
}

Repository

package com.learn.cassandra.entity.repository;

import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Session;
import com.datastax.driver.mapping.Mapper;
import com.datastax.driver.mapping.MappingManager;
import com.learn.cassandra.entity.UserEntity;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Repository;

import javax.annotation.PostConstruct;
import java.util.List;

import static com.datastax.driver.core.querybuilder.QueryBuilder.select;

@Repository
public class UserRepository {

private Mapper<UserEntity> mapper;

@Autowired
private MappingManager mappingManager;
@Autowired
private Session session;

@PostConstruct
void init() {
mapper = mappingManager.mapper(UserEntity.class);
}

public List<UserEntity> findAll() {
final ResultSet result = session.execute(select().all().from(UserEntity.NAME));
return mapper.map(result).all();
}
}

entity

package com.learn.cassandra.entity;

import com.datastax.driver.mapping.annotations.ClusteringColumn;
import com.datastax.driver.mapping.annotations.Column;
import com.datastax.driver.mapping.annotations.PartitionKey;
import com.datastax.driver.mapping.annotations.Table;
import lombok.Data;

import java.io.Serializable;
import java.util.Date;
import java.util.List;
import java.util.UUID;

@Data
@Table(name = UserEntity.NAME)
public class UserEntity implements Serializable {

public static final String NAME = "users";
@ClusteringColumn
private UUID id;

@PartitionKey
@Column(name = "customerid")
private UUID customerId;
@Column(name = "name")
private String name;

@Column(name = "status")
private String status;

@Column(name = "description")
private String description;

}

Service

package com.learn.cassandra.service;

import com.learn.cassandra.entity.UserEntity;
import com.learn.cassandra.entity.repository.UserRepository;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.List;

@Service
public class UserService {
@Autowired
private UserRepository userRepository;

public void printAll() {
List<UserEntity> entities = userRepository.findAll();
entities.forEach(entity ->
System.out.println(entity.getName() + " \t " + entity.getId())
);
}
}

Application

package com.learn.cassandra;

import com.learn.cassandra.service.UserService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.ComponentScan;

//https://lankydan.dev/2018/04/15/interacting-with-cassandra-using-the-datastax-java-driver

@SpringBootApplication
@ComponentScan("com.learn")
public class Application implements CommandLineRunner {

@Autowired
private UserService userService;

public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}

@Override
public void run(String... args) throws Exception {
userService.printAll();
}
}

Application.yaml

spring:
application:
name: cassandra-learn-application

cassandra:
host: cassdb01,cassdb02, cassdb03
port: 50126
cluster:
pooling:
minThread: 5
maxThread: 10
timeout: 5000
keyspaces:
keyspace:
name: keyspac_name
readConsistency: LOCAL_QUORUM
writeConsistency: LOCAL_QUORUM

Table

CREATE TABLE IF NOT EXISTS keyspac_name.USER (
customerid uuid,
id uuid,
createddate timestamp,
deleted boolean,
deleteddate timestamp,
description text,
name text,
status text
PRIMARY KEY (customerid, id)
) WITH CLUSTERING ORDER BY ( id ASC )
AND bloom_filter_fp_chance = 0.01
AND comment = ''
AND crc_check_chance = 1.0
AND dclocal_read_repair_chance = 0.1
AND default_time_to_live = 0
AND gc_grace_seconds = 864000
AND max_index_interval = 2048
AND memtable_flush_period_in_ms = 0
AND min_index_interval = 128
AND read_repair_chance = 0.0
AND speculative_retry = '99.0PERCENTILE'
AND caching = {
'keys' : 'ALL',
'rows_per_partition' : 'NONE'
}
AND compression = {
'chunk_length_in_kb' : 64,
'class' : 'LZ4Compressor',
'enabled' : true
}
AND compaction = {
'class' : 'SizeTieredCompactionStrategy',
'max_threshold' : 32,
'min_threshold' : 4
};

--

--

Yogesh Wadile

Software Engineer |Distributed | Microservice | Kubernetes | Docker | CI/CD | Mentor