A Mixture of Musings

: BigData, Programming

Understand Cassandra by Implementing It

Over the years Cassandra has begun to look more and more like a normal SQL database, which is why people are increasingly confused using it, as it’s various limitations on what can be used in WHERE clauses; peculiar performance characteristics; and odd data-modelling advice, are all contrary to ones expectations of what a SQL RDMS should look like.

In this post I’ll go through how one would implement a database like Cassandra, in very high-level terms, which should hopefully explain why it works the way it does, and give a better insight into how data should be modelled.

Basic Tables and Queries

At the simplest level, Cassandra is a map (or dictionary) from a key to a value. Using some Scala-ish pseudocode here’s how we’d define a simple table

type CdbTable = HashMap[K <: Hashable,V]

Where K is the type of our key, and V is the type of our value. In this pseudo code we have to explicitly say things are Hashable, and we presumble Hashable things are Equatable (i.e. have an equalsTo method)

So we could say simply store the word-counts for books as

wordCounts : CdbTable[String,Int] = new HashMap() 

But lets say we wanted to have more than one lookup value: lets say we wanted word counts for each book by each author. To do this we have to create a composite key type

class HKeySet(keys : List[Hashable]) : Hashable {

	override def hash() : Int {
		// return the hash of the hashes the keys
	}
}

type CdbTable = HashMap[HKeySet,V]

Now our word-count database is

wordCounts : CdbTable[HKeySet,Int] = new HashMap()

With this model we can do lookups by existence: e.g. fetch the word-count for this book by that author; however we can’t do range queries. If we had a third item in our key, chapter say, and we wanted to get the word counts for chapters 2 <= c <= 10 we would have to manually enumerate all values and do a lookup for each one, which is costly1.

What if we want to delete something? Well one option is just to remove it entirely from the HashMap, but lets say – for the sake of argument – that we’re using some fancy thread-safe concurrent HashMap that doesn’t support deletion. In that case we need a marker to say when a value is gone. So we write an “envelope” for our value with this metadata

class Envelope[V] {
	var wrappedValue : V
	var timeStamp : TimeStamp
	var ttl : Option<TimePeriod> // maybe delete this after a while
	var deleted : Bool
	
	Envelope(theWrappedValue) {
		wrappedValue = theWrappedValue
		timeStamp    = now()
		deleted      = false
		ttl          = None
	}
}

The timestamp is updated on every change and is for… explaining later. The time-to-live (TTL) is a special Cassandra feature. Both are unimportant.

Our table of word-counts by book and author thus becomes

wordCounts : CdbTable[HKeySet,Envelope[Int]] = new HashMap()

And if we delete something we can just set the deleted property to true. Of course this leaves a key in the HashMap recording the existence of our long-lost value, like a tombstone in a graveyard, which we hope will be eventually removed by some background process2.

But lets go back to the chapter problem. Lets say we decide to add a second layer of keys to our database, that allow ranges. This time we choose to use a Map that keeps things compactly ordered on disk and in memory. The easiest implementation is a sorted associative array

class ArrayMap[K <: Comparable,V] {
	var entries : ArrayList[Tuple2[K,V]]
		
	def insert(key : K, value : V) {
		// binary search for key's position i
		// shift things right to make room
		entries[i] = (key, value)
	}
	
	def get(key : K) : Option[V] {
		// binary search for key's position
		// return the value or None if its absent
	}
}

In order for this to work, our keys have to be comparable, which is ideal for range queries.

Our table is now:

type CdbTable = HashMap[HKeySet,ArrayMap[CKeySet, Envelope[V]]]

The type CKeySet is a list of keys like HKeySet, except in this case it’s Comparable rather than Hashable, and it compares from the outside in i.e. the map is sorted by the first key, then the second, then the third, and so on.

With values ordered in memory, it should now be a lot easier to do range queries, as we can just use the HashMap lookup to fetch a block of values (a partition of the total values) and then iterate through the second ArrayMap.

Hence in Cassandra when one writes


CREATE TABLE word_counts (
	title   Text,
	author  Text,
	edition Int,
	chapter Int,
	count   Int
	PRIMARY KEY ((title, author), edition, chapter)
)

INSERT INTO word_counts(
		title, author, edition, chapter, count)
	VALUES ("Misery", "S. King", 2, 12, 3040)

You should now understand that this is represented as

word_counts : CdbTable = ....
word_counts.getOrDefault(new HKeySet(["Misery", "S. King"])
           .put(new CKeySet([2, 12]), new Envelope(3040))

In Cassandra terminology the “partition” key is the HKeySet in the HashMap and identifies a particular block (“partition”) of rows3 and the “cluster key” is the CKeySet in the ArrayMap that identifies the specific subset of rows in that partition. Together these form the primary key.

With this insight it should be clear why:

  1. It’s strongly advised to use all parts of a partition key when doing a lookup
  2. It’s often better to use cluster key for range queries. (However note that if your Hash function is ordered with respect to values, you can do a range query on hashes using the TOKEN function which returns the hash value of input)
  3. There are so many edge cases in Cassandra where certain where-clauses (e.g. IN) are allowed on the last column only.
  4. You can only order on keys; why it’s recommended to use the cluster-keys for ordering; and why you can specify whether the cluster-key ordering is ascending or descending when creating the table4

It should also be clear that doing a range query, even on a cluster-key, can involve iterating through an entire partition, which can be costly. Hence Cassandra often explicitly requests that you add ALLOW FILTERING to range-based WHERE queries to force the developer to indicate to the server that they realise the query could take a long time to execute.

 Distributed Computing

At this point we have a solid basis for a simple database that supports range queries and existence queries.

The Cassandra developers went one step further of course, they wanted to distribute this across several machines.

At the time Cassandra was written, several distributes HashMaps already existed (ehCache, Memcache etc). So it was straightforward for the Cassandra developers to use this framework. Hence our model is

type CdbTable = DistributedHashMap[HKeySet,ArrayMap[CKeySet, Envelope]]

And now different HashMap keys’ values – the partitions – are stored on different machines.

But what if a machine dies? To work around such cases, writes are replicated, usually to three machines, with control typically returning when a majority, or “quorum”, of those writes have succeeded.

When reading, you query all the machines, and once a quorum of machines have returned values5, return that one which has the most recent timestamp6.

This means of course that it’s possible to obtain a stale value since a machine busy doing a write may not return before the time-limit is up. This is why Cassandra offers guaranteed availability and partition-tolerance but not data-consistency (the idea that the most recent read reflects the most recent write). By contrast Redis offers consistency and partition-tolerance, but therefore cannot guarantee availability within a time-limit. For more on these trade-offs the CAP Theorem page on Wikipedia is a good place to start.

This also is why map-reduce works on Cassandra. Since the data is replicated in partitions (equivalent to “blocks” in HDFS), it’s possible to dispatch code to each machine, which works on the blocks stored there, and then report the individual results back to a single machine for reduction to the final output.

This approach of dispatching (“scattering”) requests out to several machines, and then collecting (“gathering”) the results is usually referred to as a scatter/gather pass when discussing Cassandra performance.

 Column Families, not Column Orientation

I mentioned storage when talking about distribution. Storage in Cassandra is something that’s worth looking into

Lets say that instead of a single value like “word_count” we actually have many different values in our table, say

CREATE TABLE authors (
	id         UUID,
	name       Text,
	email_user Text,
	email_host Text,
	roles      Set<Text>,
	town       Text,
	age        Int
	PRIMARY KEY (id)
)

-- Note we skip a few columns here
INSERT INTO authors (id, name, email_user, email_host, roles)
	VALUES(uuid(), "S. King", "sking", "gmail.com", { "Writer" } )

A common misconception with Cassandra is that it is a column-orientated database, which is to say that data is stored as a collection of multi-row columns, rather than the usual approach of multi-column rows. Column orientation does make compression very easy – one can imagine 90% of the email_host column above is just gmail.com – and is ideal for OLAP7 workloads where you read in a large portion of the data for a small subset of columns.

It should be clear, since the primary key, in both its partition and cluster key parts, identifies a single row, that Cassandra is row-orientated. However Cassandra rows don’t store all columns, they only store a subset of columns: in the example above we skipped age and town for example. The subset of columns is called a column-family. Since we only store a subset, we need the column names as well as values, which is just a HashMap[ColName,Any]

Hence our true representation is really

type CdbTable = DistributedHashMap[HKeySet,ArrayMap[CKeySet,HashMap[ColName,Envelope[Any]]]]

Note that each column value has its own envelope, and consequently its own deletion marker and its own time-stamp8.

This does lead to a special issue however. Cassandra stores NULL values as tombstones: i.e. empty envelopes with the deletion marker set to true. So for example the following stores 5 values in a single column family

-- note age and town columns are absent in the column list
INSERT INTO authors (id, name, email_user, email_host, roles)
	VALUES(uuid(), "S. King", "sking", "gmail.com", { "Writer" } )

whereas the query below stores 7 values in a single column, two of which are tombstones which will be removed later in a compaction.

INSERT INTO authors (id, name, email_user, email_host, roles, town, age)
	VALUES(uuid(), "S. King", "sking", "gmail.com", { "Writer" }, NULL, NULL )

As a result, when using ORMs, it’s important to pay attention to how they handle nulls, to avoid excessively bloating out the size of your database, and increasing the time taken in compactions.

Another thing that should be clear from this is that adding columns to a Cassandra table is very very9 cheap. All it does is change the schema.

Indexes

At this stage it should be clear now why you can only have partition and cluster keys as WHERE clauses in your query. If you want to add a field in the column family you need to add an index. Indexes in Cassandra are stored as part of the structure that holds the ArrayMap in a single partition, which is identified by the partition-key part of the primary key.

class Partition (
	contents : ArrayMap[CKeySet,Map[ColName, Envelope[Any]]], 
	indexes : Map[ColName, Map[Any, CKeySet]]
) {

}

type CdbTable = DistributedHashMap[HKeySet, Partition]

A quick thing to note is that if you query on something that’s not a part of the partition key, you will go through every index on every single partition . This is hugely costly, particularly in a distributed HashMap where it means hitting and locking every single machine. In such cases, as with range-queries and IN queries, it’s best to create a second table with the same data using a different primary key structure tailord to the query. Historically this was done manually, but in very recent versions Cassandra can automate this using materialized views

 What an SSTable actually is

An SSTable, or “sorted-strings table”, is how Cassandra stores information on disk. An SSTable is

  1. Immutable, it cannot be changed
  2. Represents only a fixed number of rows10

If it’s immutable, how does Cassandra handle mutations? The answer is that it stores changes. So for example

UPDATE authors where UUID="xxxx-uuid" set name="Stephen King"

Will just store an additional record noting that the name field is now Stephen King. This means our true (and thankfully final) representation is

class Partition (
	contents : ArrayMap[CKeySet,List[Map[ColName, Envelope[Any]]]], 
	indexes : Map[ColName, Map[Any, CKeySet]]
) {

}

type CdbTable = DistributedHashMap[HKeySet, Partition]

and the series of SQL commands

INSERT INTO authors (id, name, email_user, email_host, roles)
	VALUES("xxx-uuid", "S. King", "sking", "gmail.com", { "Writer" } )
UPDATE authors where UUID="xxxx-uuid" set name="Stephen King"

corresponds to the following code

// no cluster key
authors.getOrDefault("xxxx-uuid")
       .contents
       .getOrDefault(NULL_CLUSTER_KEY)
       .add ({
			"name"        -> new Envelope ("S. King"),
			"email_user"  -> new Envelope ("sking"),
			"email_host"  -> new Envelope ("gmail.com"),
			"roles"       -> new Envelope ({ "Writer" })	
		})
authors.get("xxxx-uuid")
       .contents
       .get(NULL_CLUSTER_KEY)
       .add ({
       	"name" -> new Envelope ("Stephen King")
       })

So to retrieve a single record Cassandra has to go through the list of updates, keeping track of individual column-values’ timestamps, until it has recovered a complete record.

It is this list of updates that is periodically serialized from memory to an SSTable on disk.

Outdated information is eventually discarded by a background compaction service operating on these SSTables. This happens automatically (e.g. see the gc_grace_seconds configuration flag) or can be triggered automatically using the nodetool compact command.

It’s this process of aggregating delta-updates in memory, and periodically serializing them, that makes Cassandra so fast at writes. Of course if a machine dies before it’s had the opportunity to flush updates to disk then that information is lost, but only on that machine. Ideally the other machines to which data has been replicated will not also crash before flushing the update to disk.

If you want to have a look at an SSTable you can convert to human-readable JSON using SSTabledump. To get the current state of the database you can use nodetool flush to force a flush to disk. Finally You can also see how much of your table consists of uncompacted tombstone markers using SSTablemetadata

Conclusions

I hope this helps explain all the peculiar ways that Cassandra differs from standard SQL databases.

The idea of a collection of nested HashMaps is probably the best way of thinking about how Cassandra works, how to model data within it, and why certain operations are fast and other operations are slow.

It goes without saying that the true implementation of Cassandra is radically different to this, but this should give an insight into its architecture.

Addendum: Collections & Counters

I’ve mentioned already that in Cassandra usage it’s pretty common to denormalised tables, and have a table for every possible query11. However one final thing that needs mentioning is that in the layout I’ve mentioned thus far, the type of value is unbounded, it can be anything.

In practice, Cassandra’s support for datatypes is very rich. As well as the usual number, text, timestamp and UUID types, it also supports maps, sets, lists and tuples.

Hence in place of the many-to-many tables relational DBs often set up, you can just often just create one to many tables of IDs, e.g.

CREATE TABLE song_tags (
	song_id  UUID
	tags     SET<TEXT>
	PRIMARY KEY (song_id)
)

If you ever want to use a set, list, map or tuple of values as a partition-key or (less advisedly) a cluster key you need to “freeze” it into a blob.

Another esoteric type offered by Cassandra is the COUNTER type. This can only be updated, and the update can only increment it by some amount. In a table with a counter, every other column has to be part of the primary key. Counters cannot be indexed.


  1. Of course this is where quick and space-efficient key-existence checks like Bloom filters come in handy

  2. The real reason for tombstones is a combination of the fact that Cassandra uses immutable files for storage, that it replicates and distributes updates over several machines for safety, and that it offers eventual consistency when reading potentially outdated versions from a random subset of those machines. Due to all this, the only safe way to delete something is to write a deletion marker and wait for it to be replicated across the appropriate machines. Note that Cassandra stores a marker for each column separately, so setting a single column to null will create a column-specific tombstone. For more on tombstones, read this

  3. Stored on a particular machine

  4. A particularly neat trick is that one can use the fact that data are explicitly ordered by cluster-keys and so just use LIMIT clauses instead of ORDER BY clauses. It’s very easy to arrange for a table to have the most recent entries at the very top for example.

  5. In fact this can be configured. It’s called the consistency-level, where typically values are ONE to return the very first thing returned, QUORUM to wait for a majority of machines to return, and ALL to wait for all machines to return. The latter causes Cassandra to act much more like a Consistent-Partitioned database like Redis instead of the Available-Partitioned database it’s designed to be. Consistency level affects both writes and reads.

  6. You can directly access this yourself by using the WRITETIME function. This will report the most recent time a value for any column was written.

  7. OLAP, or online analytical processing, refers to database tasks that generate summary statistics (SUM, AVG, etc.) according to certain criteria (or in the lingo, summaries of measures for certain dimensions). OLTP, or online transactional processing, is about adding, deleting a mutating entire records, and collections of records, at a time in a safe, consistent way where every read perfectly reflects the most recently completed write.

  8. You might think this means that the WRITETIME() function has to inspect all columns of a row in order to return the most recent timestamp. In fact if you use the low-level list() function on a table you’ll see every write includes a special value to a column with no-name. This anonymous column represents the overall list of columns, and it’s this one that it queried to get the most recent timestamp. You can also get the timestamp for a particular column by selecting WRITETIME(col_name).

  9. Compared to standard RDMSs. By Cassandra’s standards it’s relatively expensive as the cluster is locked while the schema change percolates through. Nevertheless, we’re talking a second or two compared to what could be hours or days in RDMSs with the same amount of data.

  10. This can actually be tuned by different CompactionStrategy classes. SizeTieredCompactionStrategy is the default, but if you do a lot of work with time-series, it may make sense to use DateTieredCompactionStrategy in conjuction with a specific cluster key ordering to make the most recent data available first.

  11. This data-modelling guide gives a decent overview of what data-modelling on Cassandra involves for those coming from relational dtabases.