Querying CDC Streams

Some use cases for CDC may require querying the log table periodically in short intervals. One way to do that would be to perform partition scans, where you don’t specify the partition (in this case, the stream) which you want to query, for example:

SELECT * FROM ks.t_scylla_cdc_log;

Although partition scans are convenient, they require the read coordinator to contact the entire cluster, not just a small set of replicas defined by the replication factor.

The recommended alternative is to query each stream separately:

SELECT * FROM ks.t_scylla_cdc_log WHERE "cdc$stream_id" = 0x365fd1a9ae34373954529ac8169dfb93;

With the above approach you can, for instance, build a distributed CDC consumer, where each of the consumer nodes queries only streams that are replicated to Scylla nodes in proximity to the consumer node. This allows efficient, concurrent querying of streams, without putting strain on a single node due to a partition scan.

Learning about available streams

To query the log table without performing partition scans, you need to know which streams to look at. For this you can use the system_distributed.cdc_description table.

Example: querying the CDC description table

  1. Retrieve the timestamp of the currently operating CDC generation from the cdc_description table. If you have a multi-node cluster, query the table with QUORUM or ALL consistency level so you don’t miss any entry:

    CONSISTENCY QUORUM;
    SELECT time FROM system_distributed.cdc_description;
    

    The query can return multiple entries:

     time
    ---------------------------------
     2020-03-25 16:05:29.484000+0000
     2020-03-25 12:44:43.006000+0000
    
    (2 rows)
    

    Take the highest one. In our case this is 2020-03-25 16:05:29.484000+0000.

  2. Retrieve the list of stream IDs in the current CDC generation. Unfortunately, to use the time-date value in a WHERE clause, you have to modify the format of the time-date a little by removing the last three 0s before the +. In our case, the modified time-date is 2020-03-25 16:05:29.484+0000:

    CONSISTENCY QUORUM;
    SELECT streams FROM system_distributed.cdc_description WHERE time = '2020-03-25 16:05:29.484+0000';
    

    The result will be a huge list of values. Below we’ve shown just a couple of them:

     streams
    -------------------------------------------------------------------------------
     {0x365fd1a9ae34373954529ac8169dfb93, 0x0521d5ce4a4a8ca552f83d88a1ae55d2, ... }
    
    (1 rows)
    
  3. Use the obtained stream IDs to query your CDC log tables:

    CREATE TABLE ks.t (pk int, ck int, v int, primary key (pk, ck)) WITH cdc = {'enabled': true};
    INSERT INTO ks.t (pk, ck, v) values (0, 0, 0);
    SELECT * FROM ks.t_scylla_cdc_log WHERE "cdc$stream_id" = 0x0521d5ce4a4a8ca552f83d88a1ae55d2;
    

    returns:

     cdc$stream_id                      | cdc$time                             | cdc$batch_seq_no | cdc$deleted_v | cdc$operation | cdc$ttl | ck | pk | v
    ------------------------------------+--------------------------------------+------------------+---------------+---------------+---------+----+----+---
     0x0521d5ce4a4a8ca552f83d88a1ae55d2 | d027a354-6eba-11ea-cceb-86c21186de4a |                0 |          null |             2 |    null |  0 |  0 | 0
    
    (1 rows)
    

Query all streams to read the entire CDC log.

Reacting to topology changes

As explained in CDC Stream Generations, the set of used CDC stream IDs changes whenever you bootstrap a new node. You should then query the CDC description table to read the new set of stream IDs and the corresponding timestamp.

If you’re periodically querying streams and you don’t want to miss any writes that are sent to the old generation, you should query it at least one time after the old generation stops operating (which happens when the new generation starts operating).

Keep in mind that time is relative: every node has its own clock. Therefore you should make sure that the old generation stops operating from the point of view of every node in the cluster before you query it one last time and start querying the new generation.

Example: switching streams

Suppose that cdc_description contains the following entries:

 time
---------------------------------
 2020-03-25 16:05:29.484000+0000
 2020-03-25 12:44:43.006000+0000

(2 rows)

The currently operating generation’s timestamp is 2020-03-25 16:05:29.484000+0000 — the highest one in the above list. You’ve been periodically querying all streams in this generation. In the meantime, a new node is bootstrapped, hence a new generation appears:

 time
---------------------------------
 2020-03-25 16:05:29.484000+0000
 2020-03-25 17:21:45.360000+0000
 2020-03-25 12:44:43.006000+0000

(3 rows)

You should keep querying streams from generation 2020-03-25 16:05:29.484000+0000 until after you make sure that every node’s clock moved past 2020-03-25 17:21:45.360000+0000. One way to do that is to connect to each node and use the now() function:

$ cqlsh 127.0.0.1
Connected to  at 127.0.0.1:9042.
[cqlsh 5.0.1 | Cassandra 3.0.8 | CQL spec 3.3.1 | Native protocol v4]
Use HELP for help.
cqlsh> select totimestamp(now()) from system.local;

 system.totimestamp(system.now())
----------------------------------
  2020-03-25 17:24:34.104000+0000

(1 rows)
cqlsh>
$ cqlsh 127.0.0.4
Connected to  at 127.0.0.4:9042.
[cqlsh 5.0.1 | Cassandra 3.0.8 | CQL spec 3.3.1 | Native protocol v4]
Use HELP for help.
cqlsh> select totimestamp(now()) from system.local;

 system.totimestamp(system.now())
----------------------------------
  2020-03-25 17:24:42.038000+0000

(1 rows)

and so on. After you make sure that every node uses the new generation, you can query streams from the previous generation one last time, and then switch to querying streams from the new generation.