Real-time data publishing is a classic example of a big data replication problem. In this two-part article I will describe recent work on Tungsten Replicator to move data out of MySQL into Vertica at high speed with minimal load on DBMS servers. This feature is known as batch loading. Batch loading enables not only real-time analytics but also any other application that depends on moving data efficiently from MySQL into a data warehouse.
The first article works through the overall solution, from the replication problems that real-time analytics raises to a description of how Tungsten adapts real-time replication to data warehouses. If you are in a hurry to set up, just skim this article and jump straight to the implementation details in the follow-on article.
Replication Challenges for Real-Time Analytics
To understand some of the difficulties of replicating to a data warehouse, imagine a hosted intrusion detection service that collects access log data from across the web and generates security alerts as well as threat assessments for users. The architecture for this application follows a pattern that is increasingly common in businesses that have to analyze large quantities of incoming data.
Access log entries arrive through data feeds, whereupon an application server checks them to look for suspicious activity and commits results into a front-end DBMS tier of sharded MySQL servers. The front-end tier optimizes for a MySQL sweet spot, namely fast processing of a lot of small transactions.
Next, the MySQL data feeds as quickly as possible into a Vertica cluster that generates reports for users. Vertica is a popular column store with data compression, advanced projections (essentially materialized views) and built-in redundancy. (For more on Vertica origins and column stores in general, read this.) The back-end DBMS tier optimizes for a Vertica sweet spot, namely fast parallel load and quick query performance.
There are many challenges in building any system that must scale to high numbers of transactions. Replicating from MySQL to Vertica is an especially thorny issue. Here is a short list of problems to overcome.
- Intrusion detection generates a lot of data. This type of application can generate aggregate peak rates of 100,000 updates per second into the front-end DBMS tier.
- Data warehouses handle normal SQL commands like INSERT, UPDATE or DELETE very inefficiently. You need to use batch loading methods like the Vertica COPY command rather than submitting individual transactions as they appear in the MySQL binlog.
- Real applications generate not only INSERT but also UPDATE and DELETE operations. You need to apply these in the correct order during batch loading or the data warehouse will quickly become inconsistent.
- Both DBMS tiers are very busy, and whatever replication technique you use needs to reduce load as much as possible on both sides of the fence.
Until recently there were two obvious options for moving data between MySQL and Vertica.
- Use an ETL tool like Talend to post batches extracted from MySQL to Vertica.
- Write your own scripts to scrape data out of the binlog, process it with a fast scripting language like Perl, and load the result into Vertica.
The summary, then, is that there is no simple way to provide anything like real-time reports to users when large volumes of data are involved. ETL and home-grown solutions tend to fall down on real-time transfer as well as the extra load they impose on already busy servers. That's where Tungsten comes in.
Our first crack at replicating to data warehouses applied MySQL transactions to Greenplum using the same approach used for MySQL--connect with a JDBC driver and apply row changes in binlog order as fast as possible. It was functionally correct but not very usable. Like many data warehouses, Greenplum processes individual SQL statements around 100 times slower than MySQL. To populate data at a reasonable speed you need to dump changes to CSV and insert them in batches using gpload, an extremely fast parallel loader for Greenplum.
We did not add gpload support at that time, because it was obviously a major effort and we did not understand the implementation very well. However, I spent the next couple of months thinking about how to add CSV-based batch loading to Tungsten. The basic idea was to turn on MySQL row replication on the master and then apply updates to the data warehouse as follows:
- Accumulate a large number of transactions as rows in open CSV files.
- Load the files to staging tables.
- Merge the staging table contents into the base tables using SQL.
That said, most of the work was SMOP, or a simple matter of programming.
First, as new transactions arrive Tungsten writes them to CSV files named after the Vertica tables to which they apply. For instance, say we have updates for a table simple_tab in schema test with the following format (slightly truncated from the vsql \d output):
 Schema |   Table    | Column |     Type     | Size
--------+------------+--------+--------------+------
 test   | simple_tab | id     | int          |    8
 test   | simple_tab | f_data | varchar(100) |  100
The updates go into file test.simple_tab. Here is an example of the data in the CSV file.
"64087","I","17","Some data to be inserted","1"
"64088","I","18","Some more data to be inserted","2"
Tungsten keeps writing transactions until it reaches the block commit maximum (for example 25,000 transactions). It then closes each CSV file and loads the contents into a staging table whose name is the base table name plus a prefix, here "stage_xxx_". The staging table format mimics the CSV file columns: the base table columns are bracketed by a sequence number and operation code in front and a row ID at the end. For example, the previous example might have a staging table like the following:
 Schema |        Table         |     Column      |     Type     | Size
--------+----------------------+-----------------+--------------+------
 test   | stage_xxx_simple_tab | tungsten_seqno  | int          |    8
 test   | stage_xxx_simple_tab | tungsten_opcode | char(1)      |    1
 test   | stage_xxx_simple_tab | id              | int          |    8
 test   | stage_xxx_simple_tab | f_data          | varchar(100) |  100
 test   | stage_xxx_simple_tab | tungsten_row_id | int          |    8
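As a rough sketch of the first two steps, the following Python writes change rows in the CSV layout shown earlier and builds a load statement for the matching staging table. The helper names, the /tmp path and the choice of COPY options are assumptions made for illustration; the statement Tungsten actually issues may well differ.

```python
import csv
import io


def write_change_rows(out, changes, first_row_id=1):
    """Write (seqno, opcode, column values) tuples as fully quoted CSV.

    Column order mirrors the staging table: seqno, opcode, base columns, row id.
    """
    writer = csv.writer(out, quoting=csv.QUOTE_ALL, lineterminator="\n")
    row_id = first_row_id
    for seqno, opcode, values in changes:
        writer.writerow([seqno, opcode, *values, row_id])
        row_id += 1
    return row_id  # next unused row id


def staging_copy_sql(schema, table, csv_path, prefix="stage_xxx_"):
    """Build a Vertica COPY statement for the staging table (hypothetical helper)."""
    return (
        f"COPY {schema}.{prefix}{table} FROM LOCAL '{csv_path}' "
        "DELIMITER ',' ENCLOSED BY '\"'"
    )


buf = io.StringIO()
write_change_rows(buf, [
    (64087, "I", (17, "Some data to be inserted")),
    (64088, "I", (18, "Some more data to be inserted")),
])
print(buf.getvalue(), end="")

# The file path is hypothetical; only the table naming follows the article.
copy_sql = staging_copy_sql("test", "simple_tab", "/tmp/test.simple_tab")
print(copy_sql)
```

Quoting every field keeps the file format uniform regardless of column type, which is why the sketch uses csv.QUOTE_ALL rather than the module default.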
Finally, Tungsten applies the deletes and inserts to table test.simple_tab by executing SQL commands like the following:
DELETE FROM test.simple_tab WHERE id IN
  (SELECT id FROM test.stage_xxx_simple_tab
    WHERE tungsten_opcode = 'D');
INSERT INTO test.simple_tab(id, f_data)
  SELECT id, f_data
    FROM test.stage_xxx_simple_tab AS stage_a
    WHERE tungsten_opcode='I' AND tungsten_row_id IN
      (SELECT MAX(tungsten_row_id)
        FROM test.stage_xxx_simple_tab GROUP BY id);
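To see the merge step in action without a Vertica cluster, here is a small sketch that runs the same DELETE and INSERT pattern against an in-memory SQLite database. It assumes the inner subquery selects MAX(tungsten_row_id) per id, drops the test schema qualifier because SQLite has no such schema here, and uses made-up sample rows. It only demonstrates the logic and says nothing about Vertica performance.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE simple_tab (id INTEGER PRIMARY KEY, f_data TEXT);
CREATE TABLE stage_xxx_simple_tab (
    tungsten_seqno INTEGER,
    tungsten_opcode TEXT,
    id INTEGER,
    f_data TEXT,
    tungsten_row_id INTEGER
);
-- Base table state before the batch (sample data, not from the article).
INSERT INTO simple_tab VALUES (16, 'old row'), (17, 'stale value');
-- One batch: an update of row 17 (delete plus insert), a new row 18,
-- and a delete of row 16.
INSERT INTO stage_xxx_simple_tab VALUES
    (64087, 'D', 17, NULL, 1),
    (64087, 'I', 17, 'Some data to be inserted', 2),
    (64088, 'I', 18, 'Some more data to be inserted', 3),
    (64089, 'D', 16, NULL, 4);
""")

# Step 1: remove every base row that has a delete in the batch.
conn.execute("""
DELETE FROM simple_tab WHERE id IN
  (SELECT id FROM stage_xxx_simple_tab WHERE tungsten_opcode = 'D')
""")

# Step 2: insert a row only if the last change for its id is an insert.
conn.execute("""
INSERT INTO simple_tab(id, f_data)
  SELECT id, f_data
    FROM stage_xxx_simple_tab AS stage_a
    WHERE tungsten_opcode = 'I' AND tungsten_row_id IN
      (SELECT MAX(tungsten_row_id) FROM stage_xxx_simple_tab GROUP BY id)
""")

rows = conn.execute("SELECT id, f_data FROM simple_tab ORDER BY id").fetchall()
print(rows)
```

Row 16 disappears because its last change is a delete, row 17 ends up with the new value, and row 18 is added.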
Tungsten MySQL to Vertica replication is currently in field testing. The performance on the MySQL side is excellent, as you would expect with asynchronous replication. On the Vertica side we find that batch loading operates far faster than using JDBC interfaces. Tungsten has a block commit feature that allows you to commit very large numbers of transactions at once. Tests show that Tungsten easily commits around 20,000 transactions per block using CSV files.
We added a specialized batch loader class to perform CSV uploads to Vertica from other hosts, which further reduces the load on Vertica servers. (It still needs a small fix to work with Vertica 5 JDBC but works with Vertica 4.) Taken together, the new Vertica replication features look as if they will be very successful for implementing real-time analytics. Reading the binlog on MySQL minimizes master overhead and fetches exactly the rows that have changed within seconds of being committed. Batch loading on Vertica takes advantage of parallel load, again reducing overhead in the reporting tier.
A New Replication Paradigm: Set-Based Apply
Batch loading is significant for reasons other than conveniently moving data between MySQL and Vertica. Batch loading is also the beginning of a new model for replication. I would like to expand on this briefly as it will likely be a theme in future work on Tungsten.
Up until this time, Tungsten Replicator has followed the principle of rigorously applying transactions to replicas in serial order without any deviations whatsoever. If you INSERT and then UPDATE a row, it always works because Tungsten applies them to the slave in the same order. This consistency is one of the reasons for the success of Tungsten overall, as serialization short-cuts usually end up hitting weird corner cases and are also hard to test. However, the serialized apply model is horribly inefficient on data warehouses, because single SQL statements execute very slowly.
The SQL-based procedure for updating replicas that we saw in the previous section is based on a model that I call set-based apply. It works by treating the changes in a group of transactions as an ordered set (actually a relation) consisting of insert and delete operations. The algorithm is easiest to explain with an example. The following diagram shows how three row operations on table t in the MySQL binlog morph to four changes, of which we actually apply only the last two.
Set-based apply merges the ordered change set to the base table using the following rules:
- Delete any rows from the base table where there is a change set DELETE for the primary key and the first operation on that key is not an INSERT. This deletes any rows that previously existed.
- Apply the last INSERT on each key provided it is not followed by a DELETE. This inserts any row that was not later deleted.
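The two rules can be sketched in a few lines of Python. This is only an illustration under stated assumptions: the base table is modeled as a dict keyed by primary key, an UPDATE is assumed to arrive as a DELETE followed by an INSERT, and the sample change set is invented rather than taken from the original diagram. The function name set_based_apply is hypothetical and is not part of Tungsten.

```python
def set_based_apply(base, changes):
    """Merge an ordered change set into base (dict of key -> value) in place.

    changes is a list of (opcode, key, value) tuples, opcode "I" or "D",
    in commit order. Returns the deletes and inserts actually applied.
    """
    first_op = {}  # first opcode seen for each key
    last = {}      # last (opcode, value) seen for each key
    for opcode, key, value in changes:
        first_op.setdefault(key, opcode)
        last[key] = (opcode, value)

    # Rule 1: delete keys that have a DELETE in the change set, unless the
    # first operation on the key is an INSERT (the row did not exist before).
    deleted_keys = {key for opcode, key, _ in changes if opcode == "D"}
    deletes = sorted(key for key in deleted_keys if first_op[key] != "I")

    # Rule 2: apply the last INSERT on each key, provided no DELETE follows
    # it, that is, provided the final operation on the key is an INSERT.
    inserts = {key: value for key, (opcode, value) in last.items()
               if opcode == "I"}

    for key in deletes:
        base.pop(key, None)
    base.update(inserts)
    return deletes, inserts


# Three row operations: INSERT key 1, UPDATE key 1, DELETE key 2.
# The UPDATE expands to DELETE + INSERT, giving four changes.
t = {2: "old"}
change_set = [
    ("I", 1, "a"),
    ("D", 1, None),
    ("I", 1, "b"),
    ("D", 2, None),
]
deletes, inserts = set_based_apply(t, change_set)
print(deletes, inserts, t)
```

In this invented change set only the last two changes reach the base table: the final insert for key 1 and the delete of the pre-existing key 2. The first insert and the delete that follows it cancel out inside the set.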
Set-based apply offers interesting capabilities because sets, particularly relations, have powerful mathematical properties. We can use set theory to reason about and develop optimized handling to solve problems like conflict resolution in multi-master systems. I will get back to this topic in a future post.
Meanwhile, there are obvious ways to speed up the apply process for data warehouses by performing more of the set reduction in Tungsten and less in SQL. We can also take advantage of existing Tungsten parallelization capabilities. I predict that this will offer the same sort of efficiency gains for data warehouse loading as Tungsten parallel apply provides for I/O-bound MySQL slaves. Log-based replication is simply a very good way of handling real-time loading and there are lots of ways to optimize it provided we follow a sound processing model.
This first article on enabling real-time analytics explained how Tungsten loads data in real time from MySQL to Vertica. The focus has been allowing users to serve up reports quickly from MySQL-based data, but Tungsten replication obviously applies to many other problems involving data warehouses.
In the next article I will turn from dry theory to practical details. We will walk through the details of configuring MySQL to Vertica replication, so that you can try setting up real-time data loading yourself.
P.S. If optimized batch loading seems like something you can help solve, Continuent is hiring. This is just one of a number of cutting-edge problems we are working on.