The first way to get good performance with Tungsten is to have the right workload. As explained in an earlier article on this blog, Tungsten parallel replication works by replicating independent databases (aka shards) in parallel. Here is a picture that summarizes what is going on.
If you have a lot of schemas, if the updates are distributed evenly across schemas, and if you don't have many dependencies between schemas that require full serialization, parallel replication can speed things up significantly for I/O-bound workloads. For example, Tungsten runs three times faster than MySQL native replication on large datasets when the slave is catching up to the master following mysqld restart.
Catch-up is a classic slave-lag scenario and one where Tungsten can be quite helpful. (I think we will be faster still in the future, but this is a good start.) Nevertheless, there's a chance you'll need to do a bit of tuning to see such benefits yourself.
Tungsten currently uses a structure called a parallel queue to enable parallelization. The parallel queue typically sits at the end of a replicator pipeline in front of the parallel apply threads, as shown in the following handy diagram.
One key to getting decent parallel replication performance is to watch the parallel queue in operation. In Tungsten Replicator 2.0.1 we introduced a new status command, trepctl status -name stores, that goes a long way toward diagnosing how well parallel replication is performing. Here's a typical example using a 6-channel queue store.
$ trepctl status -name stores
Processing status command (stores)...
NAME                VALUE
----                -----
criticalPartition : -1
discardCount      : 0
eventCount        : 3217
maxSize           : 1000
name              : parallel-queue
queues            : 6
serializationCount: 1
serialized        : false
stopRequested     : false
store.queueSize.0 : 0
store.queueSize.1 : 480
store.queueSize.2 : 310
store.queueSize.3 : 1000
store.queueSize.4 : 739
store.queueSize.5 : 407
storeClass        : com.continuent.tungsten.enterprise.replicator.store.ParallelQueueStore
storeSize         : 2936
syncEnabled       : true
syncInterval      : 100
Finished status command (stores)...
The two most important things to look at are the distribution of transactions across queues and serialization. Let's start with transaction distribution. In this particular example we were running a parallel queue with 6 channels but only 5 databases. The distribution therefore looks pretty good: one queue is empty, but the others have a fairly even distribution of transactions.
Notice that one queue has exactly 1000 transactions. In Tungsten Replicator 2.0.1 the parallel queue has a maximum size parameter (maxSize), which is set to 1000 for this example run. Once an individual queue hits the maxSize limit, the entire parallel queue blocks. It is not uncommon to see one queue blocking in this way if the replicator is catching up, which is exactly what is happening here. In fact, if the queues are all empty it is possible Tungsten is somehow not supplying transactions to the queue fast enough. That is not a problem here.
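The blocking behavior is easy to model. Here is a minimal Python sketch (names like offer and CHANNELS are invented for illustration; Tungsten's actual implementation differs): each channel is a bounded queue, and a writer stalls as soon as the channel a shard hashes to is full, even if other channels have plenty of room.

```python
import queue

# Toy model of a parallel queue with a per-channel cap, analogous to maxSize.
# Purely illustrative; not Tungsten's actual data structure.
CHANNELS = 6
MAX_SIZE = 1000

channels = [queue.Queue(maxsize=MAX_SIZE) for _ in range(CHANNELS)]

def offer(shard_id, txn):
    """Enqueue a transaction on its shard's channel; return False if that
    channel is full (in the real replicator, the whole queue would block)."""
    ch = hash(shard_id) % CHANNELS
    try:
        channels[ch].put_nowait(txn)
        return True
    except queue.Full:
        return False
```

Once any single channel reaches its cap, no further transactions can be accepted for that shard, which is why one queue pinned at maxSize can stall the entire stream during catch-up.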
Bad workloads, on the other hand, tend to pile a lot of transactions into one or two queues and leave few or none in all the rest. The following is an example of a possibly bad distribution.
$ trepctl status -name stores
Processing status command (stores)...
NAME                VALUE
----                -----
...
store.queueSize.0 : 0
store.queueSize.1 : 4
store.queueSize.2 : 3
store.queueSize.3 : 972
store.queueSize.4 : 0
store.queueSize.5 : 15
...
Finished status command (stores)...
If you see such skewed distributions persistently, you may want to adjust the queue partitioning using the shard.list file. The default parallel queue partitioning algorithm hashes shards into channels. This does not always give optimal performance if your shards mostly happen to hash into the same channel. The other possibility is that the workload itself is just badly distributed across databases.
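The skew risk from hashing is easy to demonstrate with a toy hash function (sum-of-character-codes modulo the channel count is an invented stand-in here, not Tungsten's real algorithm): shard names whose hashes collide all land in the same channel, no matter how evenly the workload is spread across them.

```python
from collections import Counter

def channel_for(shard, channels=6):
    # Toy hash for illustration only; Tungsten's real partitioning differs.
    return sum(ord(c) for c in shard) % channels

# Five shards, three of which happen to collide in channel 0.
shards = ["db1", "ad1", "bc1", "cb1", "db2"]
counts = Counter(channel_for(s) for s in shards)
print(dict(sorted(counts.items())))  # {0: 3, 1: 1, 2: 1}
```

When collisions like this happen, an explicit mapping in shard.list lets you spread the colliding shards across channels by hand.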
You can decide whether the workload or the partitioning is at fault using the trepctl status -name shards command. Here's an example.
$ ./trepctl status -name shards
Processing status command (shards)...
NAME                VALUE
----                -----
appliedLastEventId: 000007:0000000000000384;20
appliedLastSeqno  : 1471201
appliedLatency    : 0.0
eventCount        : 6
shardId           : #UNKNOWN
stage             : d-pq-to-dbms
NAME                VALUE
----                -----
appliedLastEventId: 000005:0000000326365895;41
appliedLastSeqno  : 1470999
appliedLatency    : 0.0
eventCount        : 311605
shardId           : db1
stage             : d-pq-to-dbms
NAME                VALUE
----                -----
appliedLastEventId: 000005:0000000326512277;95
appliedLastSeqno  : 1471200
appliedLatency    : 0.0
eventCount        : 298522
shardId           : db2
stage             : d-pq-to-dbms
...
This shows that the distribution of transactions between the db1 and db2 databases is pretty even. If you have many databases with roughly even values in the eventCount parameter, the workload is well suited for parallelization. In that case you may want to assign shards explicitly in the shard.list file if you don't like the distribution in the parallel queue.
Meanwhile, the previous output reveals another potential problem. We also see counts for #UNKNOWN, a special shard ID that means "I could not tell what schema this is." #UNKNOWN transactions can occur if Tungsten cannot parse a SQL statement properly or if a transaction updates multiple schemas. In either case, Tungsten serializes the parallel queue.
However it occurs, serialization is a performance killer because it means we have to block the parallel queue until all parallel transactions complete, execute one or more transactions serially, and then reopen the parallel queue. You can see how often this is happening from the serializationCount value in trepctl status -name stores. For many workloads, a serializationCount that is more than a few percent of eventCount means the entire transaction stream is effectively serialized.
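To put a number on that rule of thumb, you can compute the serialized fraction from the two counters in the stores output, once you have pulled the values out of trepctl status -name stores (the helper function below is a sketch of my own, not part of Tungsten):

```python
def serialized_fraction(serialization_count, event_count):
    """Fraction of replicated events that forced the queue to serialize."""
    return serialization_count / event_count if event_count else 0.0

# Values from the healthy example earlier: 1 serialization in 3217 events.
print(f"{serialized_fraction(1, 3217):.3%}")  # about 0.031%, far below a few percent
```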
If the serialization is occurring due to #UNKNOWN shards, you may be able to improve things using a new property added to the replicator service properties file in version 2.0.1. It controls whether Tungsten assigns the shard ID from the default schema even when it cannot tell from the SQL command what you are doing.
# Policy for shard assignment based on default database. If 'stringent', use
# default database only if SQL is recognized. For 'relaxed', always use the
# default database if it is available.
replicator.shard.default.db=relaxed
Setting the parameter to relaxed can help quite a bit if the problem is due to unusual SQL that confuses the parser. On one workload we were able to reduce the serializationCount from about 10% of transactions to 0% in this way, and then saw the expected speed-up from parallel replication.