Monday, January 5, 2009

Using Tungsten Replicator Filters to Implement MySQL Time-Delayed Replication

Time-delayed replication is a useful feature that allows a slave to lag a fixed amount of time behind a master, thus providing a time window to recover from disasters like deleting a 10-million-row table by accident. You just run over to the slave, turn off replication, and recover the lost data, because the delayed slave has yet to see your deadly mistake. It's a simple way to protect your administrative honor as well as your job.

Time-delayed replication has been on the MySQL to-do list since at least 2001. It's currently scheduled for release 6.0 and the fix is included in recent OurDelta builds. However, there's a very simple way to get the feature with Tungsten Replicator filters. This works for unadulterated MySQL 5.0 and 5.1 releases.

I wrote about filters in a previous post on the pluggable Tungsten Replicator architecture. Filters are hooks into the replicator that allow your code to examine, change, and drop events before applying them to another database. Here's the Java interface:
public interface Filter extends ReplicatorPlugin
{
    public ReplDBMSEvent filter(ReplDBMSEvent event)
            throws ReplicatorException, InterruptedException;
}
Is there a simple way to use a filter for time-delayed replication? Actually, there is. It turns out that the ReplDBMSEvent passed into the filter includes a "source timestamp" that tells when the event was pulled from the log. Tungsten reads binlog events almost instantaneously, so we can use the source timestamp to approximate the time at which the update was written on the master. If we also assume that clocks are synchronized between master and slave hosts, it's a matter of minutes to write a simple filter that implements time-delayed replication.

The filter itself is surprisingly short, because all we have to do is compute how long until the SQL should be applied and then go to sleep for that period of time. There are no complex API calls or management gyrations because our filter runs inside the replicator. This took about 20 minutes to write in Eclipse. (By contrast, this blog article is now 2 hours and counting.) Here is all the code you need.
package com.continuent.tungsten.replicator.filter;

import java.sql.Timestamp;
import org.apache.log4j.Logger;
import com.continuent.tungsten.replicator.ReplicatorException;
import com.continuent.tungsten.replicator.event.ReplDBMSEvent;
import com.continuent.tungsten.replicator.plugin.PluginContext;

/**
 * Filter to delay a transaction until a particular point in time has passed.
 * The time delay filter uses the originating timestamp from the replication
 * event to compute time delays. We assume that clocks are synchronized to
 * within some reasonable precision between event producers and consumers.
 */
public class TimeDelayFilter implements Filter
{
    private static Logger logger =
            Logger.getLogger(TimeDelayFilter.class);
    private long timeDelayMillis = 0;

    /**
     * Sets the time delay in seconds.
     */
    public void setDelay(long timeDelaySeconds)
    {
        timeDelayMillis = timeDelaySeconds * 1000;
    }

    public ReplDBMSEvent filter(ReplDBMSEvent event)
            throws ReplicatorException, InterruptedException
    {
        // Compute the interval that we should delay.
        Timestamp sourceTstamp = event.getSourceTstamp();
        long futureTime = sourceTstamp.getTime() + timeDelayMillis;
        long intervalMillis = futureTime - System.currentTimeMillis();

        // Sleep until it is time to deliver this event. We let
        // InterruptedException flow through or the replicator will not
        // be able to shut down.
        if (intervalMillis > 0)
            Thread.sleep(intervalMillis);

        return event;
    }

    public void configure(PluginContext context) throws ReplicatorException {
        logger.info("Time delay filtering: event delivery delay set to "
                + (timeDelayMillis / 1000) + " seconds");
    }

    public void prepare(PluginContext context) throws ReplicatorException {
    }

    public void release(PluginContext context) throws ReplicatorException {
    }
}
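The timing arithmetic in filter() can also be tried outside the replicator. What follows is a minimal sketch, not part of Tungsten: the DelayMath class and its sleepIntervalMillis method are names made up for illustration, and the current time is passed in as a parameter so the result is easy to check by hand.

```java
import java.sql.Timestamp;

// Hypothetical helper, not part of Tungsten: isolates the arithmetic
// used by TimeDelayFilter.filter() so it can be checked on its own.
class DelayMath
{
    // Returns how many milliseconds to sleep before delivering an event,
    // or 0 if the configured delay has already elapsed.
    static long sleepIntervalMillis(Timestamp sourceTstamp,
            long timeDelaySeconds, long nowMillis)
    {
        long futureTime = sourceTstamp.getTime() + timeDelaySeconds * 1000;
        return Math.max(0L, futureTime - nowMillis);
    }

    public static void main(String[] args)
    {
        long now = System.currentTimeMillis();

        // Event extracted 100 seconds ago with a 300 second delay:
        // 200 seconds remain.
        Timestamp recent = new Timestamp(now - 100000L);
        System.out.println(sleepIntervalMillis(recent, 300, now));

        // Event extracted 10 minutes ago: already overdue, so no wait.
        Timestamp old = new Timestamp(now - 600000L);
        System.out.println(sleepIntervalMillis(old, 300, now));
    }
}
```

Passing the clock in as nowMillis also makes the effect of clock skew visible: if the slave's clock runs ahead of the master's, the computed interval shrinks by the amount of the skew, which is why the filter assumes synchronized clocks.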
User-written code lives in a directory called lib-ext. If you write your own filters you'll need to compile them into a JAR file using javac/jar and put the JAR into the lib-ext directory of every slave replicator that uses the filter. Then you tell the replicator to run your filter by updating the file replicator.properties, which contains all the configuration settings for Tungsten Replicator.

First, we tell the replicator that there is an "applier-side" filter that will run on events that are applied to the slave database. The property setting looks like this--it specifies a filter nicknamed "delay" that processes events before we apply them.
# Post-storage filter selection.  Value must be one or more comma-separated
# logical filter names.
replicator.postfilter=delay
We now need to define the "delay" filter, which requires two more entries in replicator.properties, so that our filter will delay events by 5 minutes (300 seconds). Tungsten uses a very simple protocol for configuring filters--it reads the 'delay=300' property and assigns the value automatically by calling the corresponding setter on the filter class (here, setDelay).
# Time delay filter.  Should only be used on slaves, as it delays storage
# of new events on the master. The time delay is in seconds.
replicator.filter.delay=\
    com.continuent.tungsten.replicator.filter.TimeDelayFilter
replicator.filter.delay.delay=300
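The property-to-setter convention described above can be pictured with plain Java reflection. This is a sketch of the general idea only, not Tungsten's actual configuration code: the SetterDemo class, its applyProperty helper, and the DemoFilter stand-in are all hypothetical, and only setters taking a single long argument are handled.

```java
import java.lang.reflect.Method;

// Hypothetical illustration, not Tungsten code: map a property suffix
// such as "delay" to a call on setDelay(long).
class SetterDemo
{
    // Stand-in for a filter class with one configurable property.
    public static class DemoFilter
    {
        long timeDelayMillis = 0;

        public void setDelay(long timeDelaySeconds)
        {
            timeDelayMillis = timeDelaySeconds * 1000;
        }
    }

    // Derives the setter name from the property name and invokes it.
    // Assumes the setter takes a single long argument.
    static void applyProperty(Object target, String name, String value)
    {
        String setter = "set" + Character.toUpperCase(name.charAt(0))
                + name.substring(1);
        try
        {
            Method m = target.getClass().getMethod(setter, long.class);
            m.invoke(target, Long.parseLong(value));
        }
        catch (ReflectiveOperationException e)
        {
            throw new IllegalArgumentException(
                    "Cannot set property: " + name, e);
        }
    }

    public static void main(String[] args)
    {
        DemoFilter filter = new DemoFilter();
        // Mirrors the entry replicator.filter.delay.delay=300
        applyProperty(filter, "delay", "300");
        System.out.println(filter.timeDelayMillis); // prints 300000
    }
}
```

The appeal of a convention like this is that a new filter property needs nothing more than a public setter; no separate registration or parsing code has to be written for it.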
If you now restart the replicator as a slave, it will read the new property values and start running. So does it work? You can test by inserting a row on the master database as follows:
mysql> create table foo (id int primary key, data varchar(25),
    -> creation_date timestamp);
Query OK, 0 rows affected (0.04 sec)

mysql> insert into foo(id, data) values(9, 'HELLO');
Query OK, 1 row affected (0.01 sec)
Now go over to the slave and try to select the row:
mysql> select * from foo;
ERROR 1146 (42S02): Table 'mats.foo' doesn't exist
It's not there. In fact, the table has not arrived yet either. However, if you try again in 5 minutes, you will get the following:
mysql> select * from foo;
+----+--------+---------------------+
| id | data   | creation_date       |
+----+--------+---------------------+
|  9 | HELLO  | 2009-01-05 16:13:33 |
+----+--------+---------------------+
1 row in set (0.00 sec)
As this post shows, filters are a very flexible way to bend Tungsten replication to your will without writing a lot of code. I demonstrated this filter to a friend and we thought of a number of variations on the time-delay idea--one of the more interesting ones is to call somebody to approve SQL statements that look dangerous. The filter would hold them up until the approval arrives.

The main objection to the Tungsten filter approach is that it requires you to write your filter in Java. This does not bother me especially, but a lot of otherwise quite nice people break out in hives at the thought of writing Java code. Linas Virbalas, a colleague of mine, is working on a generic filter that connects to JavaScript. I'm really looking forward to seeing this work. It will open up a lot of interesting doors to extend replication flexibly and easily using scripts.

P.S. You don't have to write this filter yourself. The time-delay filter is checked into our SVN repository and will be released in Tungsten 1.0 beta-4, due out on or around January 9th, 2009. Stop by our community website to learn more. Downloads are located on the Continuent Forge.

4 comments:

Roland Bouman said...

Hi!

interesting stuff....very interesting. Just out of curiosity - suppose I would want to keep track of history to do change data capture for datawarehouse loading, is this something you would implement with a filter too? I mean, the fact that events are tagged with a timestamp suggests the filter could perhaps apply the event and store the timestamp in the target too...

Another question I had is this...In your code you do:

if (intervalMillis > 0)
Thread.sleep(intervalMillis);

If I understand correctly, the filter object has to hang on before it can apply the event, and can be reused by another event once it has applied the one it was delaying...What worries me is that this may cause a huge amount of filters to be created to delay incoming log events...won't you run out of memory soon? What if you do? How can you recover from such a replication failure? Perhaps replication events are forced into a sequence, effectively serializing the stream of change events?

(perhaps silly questions but I am pretty new to this topic so bear with me pls.)

kind regards,

Roland Bouman

Baron said...

And for those who want it the old-fashioned way (watch slave delay, start, stop, start, stop) there's mk-slave-delay in Maatkit, too.

Robert Hodges said...

Hi Roland,

On your first question, yes, you could use a filter for this purpose. However, it would be easier to write a new applier that produces the SQL you want. I will write an article about this in the near future.

On the second question--there is only a single thread that applies updates serially. This is necessary to preserve update order, though in a future release we will allow updates to apply in parallel. It follows that there is only a single copy of the filter as well, so there is no problem with running out of memory. The filter blocks on the first event to come along and releases it when the time delay elapses.

Thanks for the questions!

Robert Hodges said...

Hi Baron,

My article was severely deficient in not mentioning Maatkit. Its capabilities are quite inspiring--as you know we would like to emulate some of them within Tungsten Replicator.

Cheers, Robert
