Apr 20, 2010

Customized Data Movement with Tungsten Replicator Pipelines

Have you ever run into a problem where MySQL replication did 95% of what you needed but not the remaining 5% to solve a real problem?  Hacking the binlog is always a possibility, but it typically looks like this example.  Not a pretty sight.  Wouldn't it be easier if replication were a bunch of building blocks you could recombine to create custom replicator processes? 

Tungsten 1.3 has a new feature called pipelines that allows you to do exactly that.  A pipeline consists of one or more stages that tie together generic components to extract, filter, store, and apply events, which is Tungsten parlance for transactions.  Each stage has a processing thread, so multi-stage pipelines can process data independently and without blocking.  The stages also take care of important but tedious issues like remembering the transactional state of each stage so Tungsten can restart without forgetting events or applying them twice.

Here is a picture of how a pipeline is put together.

When Tungsten Replicator starts it loads a pipeline corresponding to its role, for example master or slave.   The preceding picture shows a slave pipeline consisting of two stages.  The first stage pulls replicated events over the network from a master Tungsten Replicator and stores them in a local transaction history log, which we call the THL.  The second stage extracts the stored events and applies them to the database.   This pipeline is analogous to the I/O and SQL threads on a MySQL slave.

Where Tungsten departs from MySQL and most other replicators in a big way is that pipelines, hence the replication flows, are completely configurable.   The configuration is stored in file replicator.properties.  Here are the property settings to create the slave pipeline.  Note how the role is the name of a pipeline.  This determines which pipeline to run when the replicator goes online.

# Replicator role. 

# Generic pipelines. replicator.pipelines=master,slave,direct 

# Slave pipeline has two stages:  extract from remote THL to local THL; 
# extract from local THL and apply to DBMS. 



The syntax is not beautiful but it is quite flexible.  Here is what this definition means.
  1. This replicator knows about three pipelines named master, slave, and direct
  2. The slave pipeline has two stages called remote-to-thl and thl-to-dbms and a store called thl.  It has a property named syncTHLWithExtractor which must be set to false for slaves.  (We need to change that name to something like 'isMaster'.) 
  3. The remote-to-thl stage extracts from thl-remote.  This extractor reads events over the network from a remote replicator.  The stage apples to thl-local, which is an applier that writes events to the local transaction history log. 
  4. The thl-to-dbms stage pulls events from the local log and applies them to the database.  Note that in addition to an applier and extractor, there is also a filter named mysqlsessions.  This filter looks at events and modifies them to generate a pseudo-session ID, which is necessary to avoid problems with temporary tables when applying transactions from multiple sessions.  It is just one of a number of filters that Tungsten provides.
Components like appliers, filters, extractors, and stores have individual configuration elsewhere in the tungsten.properties file.  Here's an example of configuration for a MySQL binlog extractor.  (Note that Tungsten 1.3 can now read binlogs directly as files or relay them from a master server.) 

# MySQL binlog extractor properties.  

# When using relay logs we download from the master into binlog_dir.  This 
# is used for off-board replication. 

The thing that makes pipelines really flexible is that the interfaces are completely symmetric.  Components to extract events from MySQL binlog or from a transaction history log have identical APIs.  Similarly, the APIs to apply events are the same whether storing events in a log or applying to a slave.  Pipelines can tie together practically any sequence of extract, filter, and apply operations you can think of. 

Here are diagrams of a couple of useful single-stage pipelines. 

The "dummy" pipeline reads events directly from MySQL binlogs and just throws them away.  This sounds useless but in fact it is rather convenient.  You can use the dummy pipeline check whether your binlogs are good.  If you add filters you can also use a dummy pipeline to report on what is in the binlog.  Finally, you can use it as a quick and non-intrusive check to see if Tungsten can handle the data in your binlog--a nice way to ensure you can migrate smoothly. 

Here's the dummy pipeline definition:

# Generic pipelines. 
replicator.pipelines=master,slave,direct, dummy
# Dummy pipeline has single stage that writes from binlog to bit-bucket. 


The "direct" pipeline fetches events directly from a master MySQL server using client log requests over the network and applies them immediately to a slave.  I use this pipeline to test master-to-slave performance, but it's also very handy for transferring a set of SQL updates from the binlog of any master to any slave on the network.  For instance, you can transfer upgrade commands very efficiently out of the binlog of a successfully upgraded MySQL server to other servers on the network.  You can also use it to "rescue" transactions that are stuck in the binlog of a failed master.  That is starting to be genuinely useful. 

The definition of the direct pipeline is already in the default replicator.properties.mysql template that comes with Tungsten 1.3, so it is not necessary to repeat it here.  You can just download the software (open source version is here) and have a look at it yourself.  There's almost more documentation than people can bear--look here to find a full set.  Version 1.3 docs will be posted shortly on the website and are already available for commercial customers.   As usual you can also view the source code on SourceForge.net. 

Pipelines belong to a set of major feature improvements to Tungsten to support SaaS and large enterprise deployments.  Some of the other features include fast event logging directly to disk (no more posting events in InnoDB), low-latency WAN transfer, multi-master replication support, and parallel replication.  Stay tuned!


Justin Swanhart said...

Can you write custom filters? I need to take a binary log and capture the changes into my own tables. The THL is not useful to me in the structure that it is in.

-- given a simple table
c1 int

I create a "change log" table:

CREATE TABLE flexviews.test_t1_mvlog(
dml_type tinyint, # -1=DEL,1=INS
uow_id , # trx id

For each set of committed RBR events on the table I write the old/new tuples into the log table. I also maintain a transaction id.

I use these tables to maintain materialized views. For example, if a view:
select count(*) cnt from test.t1 was materialized, it could be maintained with something like:

insert into view_table
select sum(dml_type) cnt_diff from flexviews.test_t1_mvlog
where uow_id > @refeshed_to_uow_id
on duplicate key update t1.cnt = t1.cnt + cnt_diff

This is somewhat of an oversimplification, but I hope you understand what I need to do.

These table logs can also be used by ETL tools.

If possible, I'd rather use something like Tungsten, which specializes in replication, instead of my own custom script which uses mysqlbinlog -v.

Linas Virbalas said...

Hello Swany,

you can do exactly that (and a lot more) with Tungsten filters. There are two ways to create one:

a. Develop it in Java in the same manner as already existing filters are done (JAVA package com.continuent.tungsten.replicator.filter).

b. Develop it in JavaScript - this is faster, as you don't need to compile anything. Check this out for examples: tungsten-replicator/samples/extensions/javascript/

Linas Virbalas

Robert Hodges said...

Hi Justin!

You can generate the tables using filters as Linas said. Look in the samples directory on Tungsten Replicator for some examples of both Javascript as well as Java.

That said, it sounds as if we could implement this more easily using a custom applier that maintains transfer tables, as that's what you have. This would also take care of restart--the transfer tables would remember their position as it were. Currently appliers must be written in Java but we could change that pretty easily to support Javascript, Ruby, or any other interpreter that has a Java VM implementation.

Does flexviews have a configuration file that defines the transfer tables? I had a quick look at your code download from SourceForge but did not see it. We could of course infer this automatically by reading table schema and then doing an automatic transformation on the RBR record to drop it into the correct transfer table. That would actually be a pretty flexible implementation...I know other people who could use this including the gentleman whose blog was cited in this article.

Also, how does flexviews maintain the transfer tables? We have a problem with the current THL in that in-DBMS tables have significant management overhead as the row counts go into the [many] millions, so we are sensitive to that problem. One specific problem at scale is that you can run out of InnoDB locks if you are not careful, which results in lock wait timeouts when updating tables that are simultaneously being managed to clear old rows. :(

Cheers, Robert

Justin Swanhart said...


I use a relation table to store which tables are to be change logged:
table_schema varchar(50),
table_name varchar(50),
mvlog_name varchar(50),
active_flag boolean default true) ENGINE=INNODB;

table_schema, table_name point to existing tables in the database.

mvlog_name records the mapping name (usually schema_table_mvlog) for the changelog table in the Flexviews schema.

To clean up the transfer tables (my _mvlog tables) I have to find the oldest transaction_id which has to be kept around for any particular table, based on the views that are created on the table.

Then I do a DELETE w/ LIMIT 1000 in a loop and remove all records which are older than that id. This delete is always a range condition on a key, so it performs well.

It doesn't matter if I remove all the rows related to a transaction at once, because it is guaranteed that no Flexviews refresh will ever have to read the rows that I am deleting because they are older than the transaction id to which any views which use the base table are refreshed to.

I'll look into using a custom applier. This sounds like exactly what I need.

I'll post to the mailing lists if I get stuck.

Thanks for the help!

Justin Swanhart said...

Also, I don't read from the flexviews.mvlogs table for every RBR entry.

When I start processing a binary log, I cache the entries from the table locally.

I refresh the cache if I see an RBR entry for the table which lists the tables to log: flexviews.mvlog.

if($db == 'flexviews' && $table == 'mvlogs') {

This allows change logging to be enabled/disabled at runtime. As soon as change-logging is turned on the consumer will start collecting changes for that table, in a consistent fashion.

Robert Hodges said...

OK, good luck. You can post problems/questions on the Continuent forums at http://www.continuent.com/community/forums.

Lachlan Mulcahy said...

I'm pretty new to what you guys are doing in this space, but is it the case that you can have multiple pipelines and effectively have multi-threaded replication? -- provided of course that you can setup filters for each pipeline that will isolate them and they don't have any inter-reliance data/timing-wise...

Robert Hodges said...

Hi Lachlan,

That's absolutely correct. I'm writing a parallel replication algorithm that works within a single stage but you can do it using multiple independent stages exactly as you suggest.

Another feature that will go in shortly (I mean within a few days) is multiple replication services within a single replicator process. That would give you the same effect and in addition would allow you to turn pipelines on/off independently of each other.

Robert Hodges said...

I would like to amend my comment about using stages for parallel replication. What's missing is logic for restart--as currently written, stages will trample over positioning information required for restart, because they will all write to a single table named trep_commit_seqno. You would have to write a different applier to fix this.

We will have a commercial version of parallel replication that will fix this and other issues. In the near future, however, you will be able to use multiple replication services as mentioned in the previous comment.