by  Oleksandr Berchenko

Cassandra + Spark SQL Performance (including DSE 5.0)

Continuing the overview of Cassandra + Spark setup, this post focuses on quick and dirty performance comparison of different Cassandra + Spark options.

In the previous article, we discussed installation of the following Cassandra + Spark options including both Community Edition and DataStax Enterprise:

  1. Cassandra 2.1 + Spark 1.4.1
  2. Cassandra 3.0 + Spark 1.6.1
  3. DataStax Enterprise 4.8 (C assandra 2.1 + Spark 1.4.2)
  4. DataStax Enterprise 5.0 (Cassandra 3.0 + Spark 1.6.1)

This time let's perform quick and dirty performance comparison of the mentioned options.

Each solution is running on Amazon AWS and has identical environment configuration:

  • US East (N. Virginia)
  • Single m4.xlarge (4 vCPUs and 16 GiB) node
  • 8 GiB of General Purpose SSD (GP2), 100/3000 IOPS
  • CentOS 7.1 x86_64 with cloud-init (HVM) (ami-91e416fa)
  • Java SE Development Kit 8u92

All applications use their default configuration (I haven’t carried out any additional tuning).

For DataStax Enterprise solutions we will separately evaluate these two options:

  1. Tables automatically imported from Cassandra (a prominent DSE feature)
  2. Manually created tables (the only available option for open source Cassandra)

The idea behind the test is extremely simple: create two large tables in Cassandra and then join them in Spark, additionally applying grouping and ordering.

First things first: insert test data into Cassandra. In order to do so, you may leverage a standard Cassandra-stress tool or find specific data generators in the web. However, taking into account simplicity of this particular case, I called it a day and decided to just write a very simple Python script to generate CSV files with the needed data and import them into Cassandra.

The script develops the idea of joining "performers" and "albums" tables. It uses the schema of "countries" and "albums" tables with no changes slightly simplifying the schema of "performers" table by removing "styles LIST <TEXT>" field. Also, it completely reuse s original "countries.csv" with names of 89 countries.

See the final schema below:

countries (
    name text primary key
)
 
performers (
    name text,
    country text,
    gender text,
    type text,
    born text,
    died text,
    primary key (name)
)
 
albums (
    performer text,
    title text,
    year text,
    country text,
    quality text,
    status text,
    primary key (title)
)
 

The script input is original "countries.csv" file, while new "performers.csv" and "albums.csv" files are the output. You may configure a number of performers (500,000 by default) as well the maximal number of albums for one performer (100 by default).

Here’s the script code:

import random
 
PERFORMERS_FILE = 'performers.csv'
ALBUMS_FILE = 'albums.csv'
COUNTRIES_FILE = 'countries.csv'
 
NUM_PERFORMERS = 500000
MAX_NUM_ALBUMS = 100
MIN_YEAR = 1931
MAX_YEAR = 2016
MIN_PERSON_AGE = 18
DIED_CHANCE = 0.5
 
TYPES = ['Person', 'Group']
GENDERS = ['Male', 'Female']
 
def simple_counter():
    counter = [0]
    def _simple_counter():
        counter[0] += 1
        return counter[0]
 
    return _simple_counter
 
PERFORMER_COUNTER = simple_counter()
ALBUM_COUNTER = simple_counter()
 
def read_countries():
    with open(COUNTRIES_FILE, 'r') as f:
        data = f.read()
 
    countries = data.split('\n')
    countries = [c for c in countries if c]
    return countries
 
COUNTRIES = read_countries()
 
def generate_album_data(performer, performer_type, born, died, country):
    title = 'title_%s' % (ALBUM_COUNTER(),)
    if performer_type == 'Person':
        f irst_year = born + MIN_PERSON_AGE
        last_year = died or MAX_YEAR
    else:
        first_year = born
        last_year = died or MAX_YEAR
 
    year = random.randint(first_year, last_year)
    quality = 'normal'
    status = 'Official'
    album_data = [performer, title, year, country, quality, status]
    album_data = ','.join(str(i) for i in album_data) + '\n'
    return album_data
 
def write_performer(f_performers, f_albums):
    name = 'name_%s' % (PERFORMER_COUNTER(),)
    country = random.choice(COUNTRIES)
    is_died = random.random() >= DIED_CHANCE
    performer_type = random.choice(TYPES)
    if performer_type == 'Person':
        gender = random.choice(GENDERS)
        born = random.randint(MIN_YEAR, MAX_YEAR - MIN_PERSON_AGE)
        if is_died:
            died = random.randint(born + MIN_PERSON_AGE, MAX_YEAR)
        else:
            died = ''
    else:
        gender = ''
        born = random.randint(MIN_YEAR + MIN_PERSON_AGE, MAX_YEAR)
        if is_died:
            died = random.randint(born, MAX_YEAR)
        else:
            died = ''
 
    performer_data = [name, country, gender, performer_type, born, died]
    performer_data = ','.join(str(i) for i in performer_data) + '\n'
 
    albums_data = []
    num_albums = random.randint(1, MAX_NUM_ALBUMS)
    for _ in range(num_albums):
        albums_data.append(generate_album_data(name, performer_type, born,
                                               died, country))
 
    f_performers.write(performer_data)
    f_albums.write(''.join(albums_data))
 
def main():
    with open(PERFORMERS_FILE, 'w') as f_performers:
        with open(ALBUMS_FILE, 'w') as f_albums:
            for _ in range(NUM_PERFORMERS):
                write_performer(f_performers, f_albums)
 
    print('Done')
 
if __name__ == '__main__':
    main()
 

The script generated "performers.csv" with 500,000 performers (19,460,116 bytes) and "albums.csv" with 5,255,065 albums (292,769,524 bytes). In total, including original "countries.csv" (89 countries, 814 bytes) all the data takes about 298 MB.

Create a new keyspace in each Cassandra database (Cassandra 2.1, Cassandra 3.0, DSE 4.8 and DSE 5.0):

cqlsh
create keyspace if not exists test with replication = {'class': 'SimpleStrategy','replication_factor' : 1};
create table if not exists test.countries (name text primary key);
create table if not exists test.performers (name text, country text, gender text, type text, born text, died text, primary key (name));
create table if not exists test.albums (performer text, title text, year text, country text, quality text, status text, primary key (title));
exit

 

Now let's import those files into every database:

cqlsh -e "copy test.countries(name) from 'countries.csv'"
cqlsh -e "copy test.performers(name, country, gender, type, born, died) from 'performers.csv'"
cqlsh -e "copy test.albums(performer, title, year, country, quality, status) from 'albums.csv'"

 

Cassandra 3.0 and DSE 5.0 have perfectly imported all the files; at the same time, both Cassandra 2.1 and DSE 4.8 failed after importing about 3 million records from "albums.csv" with "Out of memory" error. A quick workaround was to split "albums.csv" into several smaller files:

split -l 1000000 albums.csv albums.csv.
 

Below see the final script to create the tables and insert the data:

cqlsh -e "create keyspace if not exist s test with replication = {'class': 'SimpleStrategy','replication_factor' : 1}"
cqlsh -e "create table if not exists test.countries (name text primary key)"
cqlsh -e "create table if not exists test.performers (name text, country text, gender text, type text, born text, died text, primary key (name))"
cqlsh -e "create table if not exists test.albums (performer text, title text, year text, country text, quality text, status text, primary key (title))"
cqlsh -e "copy test.countries(name) from 'countries.csv'"
cqlsh -e "copy test.performers(name, country, gender, type, born, died) from 'performers.csv'"
for filename in albums.csv.*; do
cqlsh -e "copy test.albums(performer, title, year, country, quality, status) from '$filename'"
done

 

Now let's take a look at how much space it takes in each database (given that original .csv files occupy 298 MB):

Database

Size on disk

% of original .csv size

Cassandra 2.1

498 MB

167%

Cassandra 3.0

269 MB

90%

DSE 4.8 (Cassandra 2.1)

499 MB

167%

DSE 5.0 (Cassandra 3.0)

265 MB

89%

 

Size inside Cassandra 2.1 and DSE 4.8 (that uses Cassandra 2.1 under the hood) is identical and much bigger than the original .csv files. Size inside Cassandra 3.0 and DSE 5.0 (that uses Cassandra 3.0 under the hood) is much smaller and takes even less space than the original .csv files. That was expected, because Cassandra 3.0 storage engine has been significantly improved.

On each instance, make sure that Spark SQL Thrift Server is running and then connect to the database using beeline that follows:

./beeline -u jdbc:hive2://<IP address>:10000
 

Manually create the tables in Hive:

create table countries using org.apache.spark.sql.cassandra options (cluster 'Test Cluster', keyspace 'test', table 'countries');
create table performers using org.apache.spark.sql.cassandra options (cluster 'Test Cluster', keyspace 'test', table 'performers');
create table albums using org.apache.spark.sql.cassandra options (cluster 'Test Cluster', keyspace 'test', table 'albums');

 

Now run the following query and measure its performance:

select p.name,
       coun t(*) as num
from performers p
join albums a
on p.name = a.performer
group by p.name
order by num desc
limit 20;
 

Additional testing conditions:

  • The queries were run against each database simultaneously
  • I separately measured performance of "cold" and "warm" queries. "Cold" queries were run using the following procedure:
    1. Stop Spark SQL Thrift Server.
    2. Stop Cassandra.
    3. Start Cassandra.
    4. Start Spark SQL Thrift Server.
    5. Run the query.

    In contrast, "warm" queries were consequently run right after a "cold" query.

  • Each query was run three times, with an average value and a standard deviation having been calculated.

Results of the tests are presented below:

 

Solution

Cold queries

Warm queries

Cold/Warm AVG %

AVG, s

% of best

SD, s

SD %

AVG, s

% of best

SD, s

SD %

Cassandra 2.1 + Spark 1.4.1

39.4

104%

1.1

2.8%

28.3

102%

0.9

3.2%

139%

Cassandra 3.0 + Spark 1.6.1

38.0

100%

2.3

6.1%

27.8

100%

0.4

1.4%

137%

DSE 4.8 (Manual Tables)

45.6

120%

0.7

1.5%

32.3

116%

0.6

1.9%

141%

DSE 4.8 (Automatic Tables)

57.4

151%

1.1

1.9%

40.8

147%

1.1

2.7%

141%

DSE 5.0 (Manual Tables)

50.5

133%

0.6

1.2%

33.2

119%

0.5

1.5%

152%

DSE 5.0 (Automatic Tables)

50.2

132%

2.1

4.2%

36.0

129%

1.1

3.1%

139%

 

As you see, the results are more or less consistent; in most tests, standard deviation is less than 4%. In the worst case (“cold” queries for Cassandra 3.0 + Spark 1.6.1) standard deviation equals 6.1%.

To sum things up:

  1. “Cold” queries are about 40-50% slower than “warm” queries (ranging from 37% for Cassandra 3.0 + Spark 1.6.1 to 52% for manual tables for DSE 5.0).
  2. Community Edition Cassandra 3.0 + Spark 1.6.1 seems to be slightly faster than Cassandra 2.1 + Spark 1.4.1, but the difference is less than 5%.
  3. DataStax Enterprise is always slower than Community Edition (ranging from 14% for “warm” queries for DSE 4.8 to 32% for “cold” queries for DSE 5.0).
  4. DSE automatic tables are slower than manual tables. In DSE 4.8 they are about 25% slower. As previously mentioned, automatic tables in DSE 4.8 have an issue with UUID values. In DSE 5.0 automatic tables seem to be improved: for “warm” queries they are about 8% slower and for “cold” queries they are roughly the same. Also, the issue with UUID values has been finally fixed.
  5. Comparing DSE 4.8 and DSE 5.0, automatic tables became certainly faster in DSE 5.0 (for about 15%), but manual tables in DSE 5.0, surprisingly, demonstr ate worse results than in DSE 4.8 (for about 11% for “cold” queries and about 3% for “warm” queries).
  6. If you are looking for speed and additional DSE features are not really important for you, Community Edition Cassandra 3.0 + Spark 1.6.1 is probably the best choice.

Possibly those results may be improved by some additional configuration tuning, but that was out of scope of this investigation. Personally, I was surprised by poor DataStax Enterprise results, especially for DSE 5.0. Share your thoughts!