Showing posts with label mysql. Show all posts

Tuesday, September 16, 2014

MySQL Central: It's that time of the year

It's that time of the year again: yes, Oracle Open World is coming up, and with that I'll be travelling to San Francisco. New for this year is that we are part of the main Open World event and therefore have our own MySQL Central. Here you will have the opportunity to meet many of the engineers behind MySQL, discuss technical problems you have, and learn a bit about how we see the future of the MySQL ecosystem.

This year, Narayanan Venkateswaran and I will be presenting two sessions:

Elastic Scalability in MySQL Fabric with OpenStack (Thursday, Oct 2, 1:15 PM-2:00 PM in Moscone South, 252)

In this session you will see how Fabric can use the new provisioning support to fetch servers from an OpenStack instance. The presentation will cover how to use the provisioning support to fetch servers from OpenStack Nova, OpenStack Trove, and also from Amazon AWS. You will also learn about the provisioning interface and how you can use it to create your own hardware registry support.

MySQL Fabric: High Availability at Different Levels (Wednesday, Oct 1, 2:00 PM-2:45 PM in Moscone South - 250)

MySQL Fabric is a distributed system that requires coordination among different components to provide high availability: connectors, servers, and MySQL Fabric nodes must be orchestrated to create a solution resilient in the face of failures. Ensuring that each component alone is fault-tolerant does not guarantee that applications will continue working in the event of a failure. In this session, you will learn how all components in MySQL Fabric were designed to provide a high-availability solution and how they cooperate to achieve this goal. The presentation shows you how to create your own infrastructure to monitor MySQL servers and manage manual switchover or automatic failover operations together with MySQL Fabric.

As every year, it's going to be fun to meet all the people in the MySQL community, both new and old, so I'm looking forward to meeting you all there.

Tuesday, May 27, 2014

MySQL Fabric: Musings on Release 1.4.3

As you might have noticed in the press release, we just released MySQL Utilities 1.4.3, containing MySQL Fabric, as a General Availability (GA) release. This concludes the first chapter of the MySQL Fabric story.

It all started with the idea that it should be as easy to set up and manage a distributed deployment of MySQL servers as it is to manage the MySQL servers themselves. We also noted that the features of most interest were sharding and high-availability. Since we also recognized that every user has different needs and needs to customize the solution, we set off to create a framework that would support sharding and high-availability, but also other solutions.

With the release of 1.4.3, we have a range of features that are now available to the community, and all under an open source license and wrapped in an easy-to-use package:

  • High-availability support using built-in slave promotion in a master-slave configuration.
  • A framework with an execution machinery, monitoring, and interfaces to support management of large server farms.
  • Sharding using hash and range sharding. Range sharding is currently limited to integers, but hash sharding supports anything that looks like a string.
  • Shard management support to move and split shards.
  • Support for failure detectors, both built-in and custom ones.
  • Connectors with built-in load balancing and fail-over in the event of a master failure.

Beyond MySQL Fabric 1.4.3

As the MySQL Fabric story develops, we have a number of challenges ahead.

Loss-less Fail-over. MySQL 5.7 has extended the support for semi-sync so that transactions that are not replicated to a slave server will not be committed. With this support, we can truly have a loss-less fail-over so that you cannot lose a transaction if a single server fails.

More Fabric-aware connectors. We currently have support for Connector/J, Connector/PHP, and Connector/Python, but one common request is to have support for a Fabric-aware C API. This is both for applications developed using C/C++ and to add Fabric support to connectors based on the MySQL C API, such as the Perl and Ruby connectors.

Multi-Node Fabric Instance. Many have pointed out that the Fabric node is a single point of failure, and it is indeed a single node, but if the Fabric node goes down, the system does not stop working. Since the connectors cache the data, they can "run on the cache" for the time it takes for the Fabric node to be brought up again. Procedures being executed will stop, but once the Fabric node is on-line again, execution will resume from where it left off. To ensure that the meta-data (the information about the servers in the farm) is not lost in the event of a machine failure, MySQL Cluster can be used as the storage engine, which will then ensure that your meta-data is safe.

There are, however, a few advantages in having support for multiple Fabric nodes:

  • The most obvious advantage is that execution can fail-over to another node and there will be no interruption in the execution of procedures. If the fail-over is built-in, you avoid the need for external clusterware to manage several Fabric nodes.
  • If you have several Fabric nodes available to deliver data, you improve responsiveness to bursts in meta-data requests. This can happen if you have a large bunch of connectors brought on-line at the same time.
  • If you have multiple data centers, having a local version of the data to serve the applications deployed in the same center improves locality of data and avoids an unnecessary round-trip over WAN to fetch some meta-data.
  • With several nodes to execute management procedures, you can improve scaling by being able to execute several management procedures in parallel. This would require some solution to ensure that procedures do not step on each other.

Location Awareness. In deployments spread over several data-centers, the location of all the components suddenly becomes important. There is no reason for a connector to be directed to a remote server when a local one suffices, but that requires some sort of location awareness in the model, allowing the location of servers (or other components) to be given.

Extending the model by adding data centers is not enough though. The location of components within a data center might be important. For example, if a connector is located in a particular rack in the data center, going to a different rack to fetch data might be undesirable. For this reason, the location awareness needs to be hierarchical and support several levels, e.g., continent, city, data center, hall, rack, etc.
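To make the idea of hierarchical location awareness a bit more concrete, here is a minimal sketch in Python. It is not how MySQL Fabric models locations (there is no such model yet); the `Location` tuple, `proximity`, and `closest_server` are names made up for this illustration. Each location is a path from the coarsest level to the finest, and the closest server is the one sharing the longest prefix with the client.

```python
from typing import Dict, Tuple

# Hypothetical model: a location is a path from the coarsest to the finest
# level, e.g. (continent, city, data center, hall, rack).
Location = Tuple[str, ...]


def proximity(a: Location, b: Location) -> int:
    """Number of leading levels two locations share (higher means closer)."""
    shared = 0
    for x, y in zip(a, b):
        if x != y:
            break
        shared += 1
    return shared


def closest_server(client: Location, servers: Dict[str, Location]) -> str:
    """Pick the server sharing the longest location prefix with the client.

    Ties are broken by server name so that the result is deterministic.
    """
    return max(sorted(servers), key=lambda name: proximity(client, servers[name]))


servers = {
    "db1": ("eu", "stockholm", "dc1", "hall-a", "rack-3"),
    "db2": ("eu", "stockholm", "dc1", "hall-a", "rack-7"),
    "db3": ("us", "san-francisco", "dc4", "hall-b", "rack-1"),
}
client = ("eu", "stockholm", "dc1", "hall-a", "rack-7")
print(closest_server(client, servers))  # db2: same rack as the client
```

A plain prefix match only captures "how many levels do we share"; a real model would probably also need per-level costs, but that is beyond this sketch.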

Multi-Shard Queries. Sharding can improve performance significantly since it splits the data horizontally across several machines and each query therefore goes directly to the right shard of the data. In some cases, however, you also need to send queries to multiple shards. There are a few reasons for this:

  • You do not have the shard key available, so you want to search all servers for some object of interest. This of course affects performance, but in some cases there are few alternatives. Consider, for example, searching for a person given name and address when the database is sharded on the SSN.
  • You want to generate a report of several items in the database, for example, find all customers over 50 who have more than two cars.
  • You want a summary of some statistic over the database, for example, generate a histogram over the age of all your customers.
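The histogram case in the list above is a typical scatter-gather query: every shard computes a partial result and the partial results are merged. The sketch below shows the shape of it in Python; the `SHARDS` dictionary is a hypothetical stand-in for real servers, and nothing here is an existing MySQL Fabric or connector API.

```python
from collections import Counter

# Hypothetical stand-in for three shards: each holds the ages of its customers.
SHARDS = {
    "shard-1": [23, 35, 35, 61],
    "shard-2": [35, 47, 52],
    "shard-3": [23, 61, 61, 78],
}


def local_histogram(ages, bucket=10):
    """What each shard computes locally: customers per age bucket."""
    return Counter((age // bucket) * bucket for age in ages)


def global_histogram(shards, bucket=10):
    """Scatter the query to every shard, then gather by summing the counts."""
    total = Counter()
    for ages in shards.values():
        total.update(local_histogram(ages, bucket))
    return dict(sorted(total.items()))


print(global_histogram(SHARDS))
# {20: 2, 30: 3, 40: 1, 50: 1, 60: 3, 70: 1}
```

Counts are easy to merge because they simply add up; statistics such as medians need more care since they cannot be combined from per-shard results alone.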

Session Consistency Guarantees. As Alfranio points out, when you use multiple servers in your farm, and transactions are sent to different servers at different times, it might well be that you write one transaction that goes to the master of a group and then try to read something from the same group. If the write transaction has not reached the server that you read from, then you might get an incorrect result from your transaction. In some cases, this is fine, but in other cases, you have certain guarantees that you want to have on your session. For example, you might want to ensure that anything you write will also be available when you read in transactions following the write, you might want to guarantee that successive reads never see older data than an earlier read did (called "read monotonicity"), or you might want other forms of guarantees on the result sets you get back from the distributed database. This might require connectors to wait for transactions to reach slaves before reading, but this should be transparent to the application.
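As a rough illustration of what such a guarantee means for a connector, here is a small simulation in Python. Everything in it is an assumption made for the example: transactions are numbered with a simple counter (a real implementation would more likely track GTIDs), and instead of waiting for a slave to catch up, the session falls back to reading from the master.

```python
class Server:
    def __init__(self, name):
        self.name = name
        self.applied = 0   # number of the last transaction applied here
        self.data = {}


class Session:
    """Tracks the newest transaction this session has written or observed."""

    def __init__(self, master, slaves):
        self.master = master
        self.slaves = slaves
        self.seen = 0

    def write(self, key, value):
        self.master.applied += 1
        self.master.data[key] = value
        self.seen = self.master.applied

    def read(self, key):
        # Only use a slave that has caught up with what the session has seen;
        # otherwise fall back to the master (a simplification of "waiting").
        server = next(
            (s for s in self.slaves if s.applied >= self.seen), self.master
        )
        # Remember how far this read got, so later reads never go backwards.
        self.seen = max(self.seen, server.applied)
        return server.name, server.data.get(key)


def replicate(master, slave):
    """Simulate the slave catching up with the master."""
    slave.data = dict(master.data)
    slave.applied = master.applied
```

With one master and one lagging slave, a read directly after a write is served by the master, and by the slave once it has caught up:

```python
master, slave = Server("master"), Server("slave")
session = Session(master, [slave])
session.write("x", 1)
print(session.read("x"))   # ('master', 1)
replicate(master, slave)
print(session.read("x"))   # ('slave', 1)
```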

This is just a small set of the possibilities for the future, so it is really going to be interesting to see how the MySQL Fabric story develops.

Tuesday, April 29, 2014

MySQL Fabric: Tales and Tails from Percona Live

Going to Percona Live and presenting MySQL Fabric gave me the opportunity to meet a lot of people. I talked to developers from many different companies and got a lot of great feedback that will affect how we prioritize, so to everyone I spoke to I would like to say a great "Thank you!" for the interesting discussions that we had. Your feedback is very valuable.

It was very interesting to read the comments on MySQL Fabric on MySQL Performance Blog. The article discusses the current version of MySQL Fabric distributed with MySQL Utilities and gives some brief points on features of MySQL Fabric. I think it could be good to give some context to some of the points they raise, both to elaborate on the points and show what they mean in reality, and also to give some background on how we were thinking around these points.

The Art of Framing the Fabric

It was a deliberate decision to make MySQL Fabric extensible, so it is not surprising that it has the feel of a framework. By making MySQL Fabric extensible, we allow community and users to explore ideas or add user-specific support.

In the MySQL Team at Oracle we are strong believers in the open source model and are working hard to keep it that way. There are many reasons why we believe in this model, but one of the reasons is that we do not believe that one size fits all. For any user, there are always minor variations or tweaks that are required by the user's own specific needs. This means that the ability to tweak and adapt the solution to those specific needs is very important. Without MySQL being open-source, this would not be possible. As you can see from WebScaleSQL, this is not just a theoretical exercise; this is how companies really use MySQL.

From the start, we therefore focused on building a framework and created the sharding and high-availability as plugins; granted, they are very important plugins, but they are nevertheless plugins. This took a little more effort, and a little more thinking, but by doing it this way we can ensure that the system is truly extensible for everybody.

Hey! I've got a server in my farm!

As noted, many of the issues related to high-availability and sharding require server-side support to get it really solid. This is also something we recognized quite early; the alternative would be to place the logic in the connectors or the Fabric node. We recognized that the right place to solve this is in the server, not in the connector layer, since that puts a lot of complexity in the wrong place. Even if it was possible to handle everything in the connector, there is still a chance that something goes wrong if the constraints are not enforced in the server. This could be because of bugs, because of mistakes in the administration of the server, or any number of other reasons, so to build a solid solution, constraints on the data should be enforced by the servers and not in the connectors or in a proxy.

An example given is that there is no way to check that a row ends up in the right shard, which is very true. A generic solution to this would be to add CHECK constraints on the server, but unfortunately, this is a very big change in the server code-base. Adding triggers to the tables on the server is probably a good short-term solution, but that requires managing and deploying extra code on all servers, which in turn is an additional burden on managing the servers, which is something we would like to avoid (the more "special" things you have to do with the servers, the higher the risk is of something going wrong).

On the proximity of things...

One of the central components of MySQL Fabric is the high-availability group (or just group, when it is clear from the context) that was discussed in an earlier post. The central idea around a group is that each group manages the same piece of data, and MySQL Fabric is designed to handle and coordinate multiple groups into a federation of databases. Being able to manage multiple groups is critical to creating a sharded system. One thing that is quite often raised is that it should be possible for a server to belong to multiple groups, but I think this comes from a misunderstanding of what a group represents. It is not a "replica set", which gives information about the topology, that is, how replication is set up, nor does it say anything about how the group is deployed. It is perfectly OK to have members of the group in different data centers (for geographical redundancy), and it is perfectly OK to have replication between groups to support, for example, functional partitioning. If a server belonged to two different groups, it would mean that it manages two different sets of data at the same time.

The fact that group members can be located in different data centers raises another important aspect, something that was often mentioned at Percona Live: that of managing the proximity of components in the system. There is some support for this in Hadoop where you have rack-awareness, but we need a slightly more flexible model. Imagine that you have a group set up with two servers in different data centers and you further have scale-out slaves attached locally. You have connectors deployed in both data centers, but when reading data you do not want to go to the other data center to execute the transaction; it should always be done locally. So, is it sufficient to be able to just have a simple grouping of the components? No, because you can have multiple levels of proximity, for example, data centers, continents, and even rooms or racks within a data center. You can also have different facets that you want to model, such as latency, throughput, or other properties that are interesting for particular uses. For that reason, whatever proximity model we deploy, it needs to support a hierarchy and also have a more flexible cost model where you can model different aspects. Given that this problem has been raised several times at Percona Live and also by others, it is likely to be something we need to prioritize.

The crux of the problem

As most of you have already noted, there is a single Fabric node running that everybody talks to. Isn't this a single point of failure? It is indeed, but there is more to the story than just this. A single point of failure is a problem because if it goes down, so does the system... but in this case, it doesn't really go down, it will keep running most of the time.

The Fabric node does a lot of things: it keeps track of the status of all the components of the farm, executes procedures to handle fail-over, and delivers information about the farm on request. However, the connectors are the ones that route the transactions to the correct place, and to avoid having to ask the Fabric node about information each time, the connectors maintain caches. This means that in the event of a Fabric node failure, connectors might not even notice that it is gone unless they have to re-fill their caches. Once you restart the Fabric node, it will be able to serve the information again.
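The effect of the connector caches can be shown with a toy model. The Python sketch below is only an illustration: `FabricNode`, `Connector`, and `lookup_master` are hypothetical names and not the interface of any real connector. The point is that a lookup that has already been cached keeps working while the Fabric node is down, and only a cache miss notices the outage.

```python
class FabricUnavailable(Exception):
    """Raised when the (simulated) Fabric node cannot be reached."""


class FabricNode:
    """Hypothetical stand-in for the lookup interface of the Fabric node."""

    def __init__(self, masters):
        self.masters = masters   # group name -> address of its master
        self.up = True
        self.lookups = 0

    def lookup_master(self, group):
        if not self.up:
            raise FabricUnavailable(group)
        self.lookups += 1
        return self.masters[group]


class Connector:
    def __init__(self, fabric):
        self.fabric = fabric
        self.cache = {}

    def master_for(self, group):
        # Only ask the Fabric node on a cache miss.
        if group not in self.cache:
            self.cache[group] = self.fabric.lookup_master(group)
        return self.cache[group]


fabric = FabricNode({"g1": "db1:3306", "g2": "db2:3306"})
connector = Connector(fabric)
connector.master_for("g1")           # fills the cache
fabric.up = False                    # the Fabric node goes down
print(connector.master_for("g1"))    # db1:3306, served from the cache
```

What the sketch leaves out is invalidation: after a fail-over the cached entry is stale, so a real connector has to refresh its cache when it fails to reach a server.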

Another consequence of the Fabric node going down is that no more fail-overs can be done and ongoing procedures are stopped in their tracks, which could potentially leave the farm in an unknown state. However, the state of the execution of any ongoing procedure is stored in the backing store, so when you bring up the Fabric node again, it will restore the procedures from the backing store and continue executing. This feature alone does not help against a complete loss of the machine where the Fabric node and the backing store are located. However, MySQL Fabric does not rely on specific storage engine features (any transactional engine will do), so by using MySQL Cluster as the storage engine it is possible to ensure safe-keeping of the state.
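The principle of resuming a procedure from the backing store can be sketched in a few lines of Python. This is a simplification under stated assumptions, not the Fabric executor: `run_procedure` is a made-up function, a plain dictionary stands in for the transactional backing store, and each step is assumed to be safe to re-run if the node dies between executing the step and recording it.

```python
def run_procedure(name, steps, store):
    """Run `steps` in order, recording progress in `store` after each one.

    `store` is a plain dict standing in for the transactional backing store.
    If a step raises (the node "crashes"), progress so far stays recorded.
    """
    start = store.get(name, 0)           # resume after the last completed step
    for index in range(start, len(steps)):
        steps[index]()                   # may raise if the node goes down
        store[name] = index + 1          # checkpoint the completed step


log = []
store = {}


def crash():
    raise RuntimeError("Fabric node went down")


# First attempt: the node dies while executing the second step.
try:
    run_procedure("failover", [lambda: log.append("demote"), crash], store)
except RuntimeError:
    pass

# After a restart the same procedure is picked up where it left off.
run_procedure(
    "failover",
    [
        lambda: log.append("demote"),
        lambda: log.append("promote"),
        lambda: log.append("redirect"),
    ],
    store,
)
print(log)   # ['demote', 'promote', 'redirect']
```

Note that "demote" is executed only once even though the procedure was started twice, because the checkpoint in the store says the first step is already done.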

There are still good reasons to support multi-node Fabric instances:

  • If one Fabric node goes down, it should automatically fail over to another and continue execution. This will prevent any downtime in handling executions.
  • Detecting and bringing up a secondary Fabric node can become very complicated in the case of network partitions since it requires handling split-brain scenarios reliably. It is then better to have this built into MySQL Fabric since it makes deployment and management significantly simpler.
  • Management of a farm does not put any significant pressure on the database back-end, but having a single Fabric node can be a bottleneck. In this case, it would be good to be able to execute multiple independent procedures on different Fabric nodes and coordinate the updates.
  • If a lot of connectors are required to fill their caches at the same time, we have a risk of a thundering herd. Having a set of Fabric nodes for read scale-out can then be beneficial.
  • If a group is deployed in two very remote data centers, it is desirable to have a local Fabric node for read-only purposes instead of having to go to the other data center.

More Fabric-aware Connectors

Currently we support connectors for Python, Java, and PHP, but one point that pops up quite often (both at Percona Live and elsewhere) is the lack of a Fabric-aware C connector. It is the basis for implementing both the Perl Database Interface MySQL driver DBD::mysql and the Ruby connector, but is also desirable in itself for applications written in C or C++. All I can say at this point is that we are aware of the situation and know that it is something desired and important.

Interesting links

Tuesday, April 01, 2014

MySQL Fabric 1.4.2 Released

As you saw in the press release, MySQL Fabric 1.4.2 is now released! If you're interested in learning more about MySQL Fabric, there is a session titled Sharding and Scale-out using MySQL Fabric in Ballroom G. MySQL Fabric is a relatively new project in the MySQL ecosystem and it focuses on building a framework for working with large deployments of MySQL Servers. The architecture of MySQL Fabric is such that it allows extensions to be added, and the first two extensions that we added were support for high-availability using High-Availability groups (HA groups) and sharding to manage very large databases. The first version of sharding has hash and range sharding implemented as well as procedures for moving and splitting shards.

A critical part of working with a collection of servers is the ability to route transactions to the correct servers, and for efficiency reasons we quite early decided to put this routing logic into the connectors. This avoids one extra network hop and hence improves performance by reducing latency, but it does require that the connectors contain routing logic, caches, and support for fetching data from MySQL Fabric. Putting the routing logic into the connector also makes it easy to extend the API to add new support that applications can require.

MySQL Fabric 1.4.2 is distributed as part of MySQL Utilities 1.4.2. To avoid confusion, we have changed the version numbering to match the version of MySQL Utilities it is distributed in.

We have only done a few public releases so far (in addition to a few internal ones), and a brief history of the public releases is:
  • MySQL Fabric 1.4.0
    • First public release
    • High-Availability groups for modeling farms
    • Event-driven Executor for execution of management procedures.
    • Simple failure detector with fail-over procedures.
    • Hash and Range sharding allowing management of large databases.
    • Shard move and shard split to support management of a sharded database.
    • Connector interfaces to support federated database systems.
    • Fabric-aware Connector/Python (labs)
    • Fabric-aware Connector/J (labs)
    • Fabric-aware Connector/PHP (labs)
  • MySQL Fabric 1.4.1
    • More solid scale-out support in connectors and MySQL Fabric
    • Improvements to the Executor to avoid stalling reads
    • Connector/Python 1.2.0 containing:
      • Range and Hash sharding
      • Load-balancing support
    • Labs release of Connector/J with Fabric-support
  • MySQL Fabric 1.4.2
    • Credentials in MySQL Fabric
    • External failure reporting interfaces supporting external failure detectors
    • Support for unreliable failure detectors in MySQL Fabric
    • Credentials support in Connector/Python
    • Connector/Python 1.2.1 containing:
      • Failure reporting
      • Credentials Support
    • Connector/J 5.1.30 containing Fabric support

Do you want to participate?

    There is a lot you can do if you want to help improve MySQL Fabric.

    Blogs about MySQL Fabric

    Tuesday, October 08, 2013

    MySQL Connect presentations on MySQL Fabric available on SlideShare

    Going to MySQL Connect was truly a blast. We got a lot of good questions and feedback in the sessions and there was a lot of interest in both MySQL Fabric and the MySQL Applier for Hadoop.

    A big thank you to all who attended the talks. I got a lot of good questions and comments that will help us build good solutions.

    The talks are available on SlideShare:

    Saturday, September 21, 2013

    A Brief Introduction to MySQL Fabric

    As you saw in the keynote, we are introducing an integrated framework for managing farms of MySQL servers with support for both high-availability and sharding. It should be noted that this is a very early alpha and that it is not yet ready for production use.

    MySQL Fabric is an integrated system for managing a collection of MySQL servers and is the framework on which high-availability and sharding are built. MySQL Fabric is open-source and is intended to be extensible, easy to use, and support procedure execution even in the presence of failure, an execution model we call resilient execution.

    To ensure high-availability, it is necessary to have redundancy in the system. For database systems, the redundancy traditionally takes the form of having a primary server acting as a master and using replication to keep secondaries available to take over in case the primary fails. This means that the "server" that the application connects to is in reality a collection of servers, not a single server. In a similar manner, if the application is using a sharded database, it is in reality working with a collection of servers, not a single server. In this case, we refer to a collection of servers as a farm.

    Now, just having a collection of servers does not really help us that much: it is necessary to have some structure imposed on the farm as well as an API to work with the farm, and this is where MySQL Fabric comes in.

    Before going over the concepts, have a look at the farm below. In the figure, there is an application that wants to connect to the farm and there is a set of database servers at the bottom organized into groups called high-availability groups. To manage the structure of the farm, there is a MySQL Fabric Node available that, among other things, keeps track of the meta-data and handles procedure execution. Both the application and an operator can connect to the Fabric node to get information about the farm being managed.


    The MySQL Fabric node is responsible for keeping track of the structure of the farm as well as all information about the status of the servers, and it is also where any management procedures are executed. If a server fails, the Fabric node will handle the procedure of promoting one of the slaves to be the new master, but it also contains the logic for handling shard moving and shard splitting.

    High-availability Groups


    The central concept for handling high-availability in Fabric is the high-availability group. These are collections of servers that work together to deliver a database service to the application that connects. The groups are introduced to give structure to the farm and allow you to describe how the servers are organized to support the redundancy necessary to ensure high-availability.

    Inside the group, all the servers manage the same data and hence have the same schema. In addition, each server has a distinct role and no server can belong to more than one group. The group concept does not really represent anything new: it is just a way to structure the farm in such a manner that managing it is easy to understand and the roles of servers are clear. To manage redundancy, a traditional Master-Slave Setup is used, a topology that we all are familiar with (it is often called the Primary-Secondary Approach, hence the names that follow). Each group has a primary that is the master for all the data. Any queries that update data are sent here and that data is propagated to the other servers in the group. Redundancy is achieved by keeping one or more secondaries in the group that receive changes from the primary and are ready to take over the role as primary should the primary disappear. To handle scale-out, the group also contains scale-out servers, which are servers that receive changes from the primary but are not eligible for being promoted to primary. In the group, there are also spares, which are servers that are available for use but which are not assigned any active role yet.
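The roles described above can be captured in a small model. The following Python sketch is only meant to make the rules concrete and is not taken from the MySQL Fabric code: `Role`, `Group`, and `fail_over` are names invented for the example. It shows that only secondaries are candidates for promotion, while scale-out servers and spares are not.

```python
from enum import Enum


class Role(Enum):
    PRIMARY = "primary"
    SECONDARY = "secondary"
    SCALE_OUT = "scale-out"
    SPARE = "spare"


class Group:
    """A high-availability group: servers holding the same data, one primary."""

    def __init__(self, name):
        self.name = name
        self.roles = {}   # server name -> Role

    def add(self, server, role=Role.SPARE):
        if role is Role.PRIMARY and self.primary() is not None:
            raise ValueError("a group can only have one primary")
        self.roles[server] = role

    def primary(self):
        return next(
            (s for s, r in self.roles.items() if r is Role.PRIMARY), None
        )

    def fail_over(self):
        """Remove the failed primary and promote the first secondary."""
        candidates = [s for s, r in self.roles.items() if r is Role.SECONDARY]
        if not candidates:
            raise RuntimeError("no secondary eligible for promotion")
        failed = self.primary()
        if failed is not None:
            del self.roles[failed]
        self.roles[candidates[0]] = Role.PRIMARY
        return candidates[0]


group = Group("my_group")
group.add("db1", Role.PRIMARY)
group.add("db2", Role.SECONDARY)
group.add("db3", Role.SCALE_OUT)
group.add("db4")                 # a spare, no active role yet
print(group.fail_over())         # db2
```

A real fail-over would of course pick the best candidate (for example, the secondary that has received the most changes) instead of simply the first one.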

    Sharding


    In addition to high-availability support, MySQL Fabric also offers support for sharding, which is a technique for handling very large databases and/or very high write loads. The database is split into a large number of shards, where each shard contains a fragment of the data in the database. Each shard is stored on a separate server (or a separate set of servers if you want to ensure high-availability) and the transactions are directed to each shard based on the shard key. Splitting the database in this way allows you to manage a larger database by spreading it over more servers, and it also scales the write traffic because you can execute writes independently.

    When using sharding, MySQL Fabric separates tables into sharded tables and global tables. MySQL Fabric allows you to shard just some of the tables in the database and keep the other tables available on all shards, which we call global tables. Since databases usually have multiple tables with foreign key relationships between them, it is critical to be able to shard several tables the same way (but possibly on different columns), which is something that MySQL Fabric supports. Using this support, you specify which tables are sharded and which column should be used as the sharding key for each, and MySQL Fabric will shard all those tables and distribute the rows on the shards. Tables that are not sharded are the global tables and they will be available on all shards.
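To give a feel for how a shard key is turned into a shard, here is a minimal Python sketch of range and hash mappings. It illustrates the two techniques in general and should not be read as the mapping MySQL Fabric uses internally: the class names, the use of SHA-256, and the modulo step are all assumptions made for the example.

```python
import bisect
import hashlib


class RangeSharding:
    """Integer keys: each shard owns the keys from its lower bound upwards,
    until the lower bound of the next shard."""

    def __init__(self, lower_bounds):
        # lower_bounds maps a lower bound to the shard that starts there.
        self.bounds = sorted(lower_bounds)
        self.shards = [lower_bounds[bound] for bound in self.bounds]

    def shard_for(self, key):
        index = bisect.bisect_right(self.bounds, key) - 1
        if index < 0:
            raise KeyError("%r is below the lowest bound" % (key,))
        return self.shards[index]


class HashSharding:
    """Any key with a string form: a digest of the key picks the shard."""

    def __init__(self, shards):
        self.shards = list(shards)

    def shard_for(self, key):
        digest = hashlib.sha256(str(key).encode("utf-8")).hexdigest()
        return self.shards[int(digest, 16) % len(self.shards)]


by_range = RangeSharding({1: "shard-1", 1000: "shard-2", 5000: "shard-3"})
print(by_range.shard_for(999))    # shard-1
print(by_range.shard_for(1000))   # shard-2

by_hash = HashSharding(["shard-1", "shard-2", "shard-3"])
print(by_hash.shard_for("alice") == by_hash.shard_for("alice"))   # True
```

Range sharding keeps neighbouring keys together, which makes shard splits easy to reason about, while hash sharding spreads keys evenly without you having to pick the boundaries. The simple modulo used here would move most keys if the number of shards changed, which is one reason real systems use something more elaborate.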

    If you want to know more about how Fabric supports sharding, or about sharding in general, you should come to the session MySQL Sharding: Tools and Best Practices for Horizontal Scaling, September 21, 4:00pm-5:00pm in Imperial Ballroom B.


    Connecting to a MySQL Farm


    To provide better control when working with a MySQL farm we have extended the connector API in such a manner that it hides the complexities of handling fail-over in the event of a server failure as well as dispatching transactions to shards correctly. There is currently support for Fabric-aware versions of Connector/J, Connector/PHP, Connector/Python as well as some rudimentary support for Hibernate and Doctrine. If you are interested in how the extensions to the interface look and how you can use them to scale your application, you should come to Scaling PHP Applications, September 22, 10:00am-11:00am in Union Square Room 3/4.


    More information about MySQL Fabric


    There are several blogs being published on high-availability and sharding from the developers working on the Fabric system or the connectors.


    If you are interested in discussing and asking questions about MySQL Fabric, or sharding and high-availability in general, the forum on Fabric, Sharding, HA, Utilities is an excellent place for discussions. Also, if you are at MySQL Connect, going to MySQL Sharding, Replication, and HA (September 21, 5:30-6:30pm in Imperial Ballroom B) is an excellent opportunity to learn more about the project, meet us developers and team leads, and provide feedback to us. The BOF will cover several areas, some overlapping, but the discussion is likely to cover MySQL Fabric and MySQL replication.

    Thursday, August 29, 2013

    Going to MySQL Connect 2013



    MySQL Connect 2013 is coming up with several interesting new sessions. Some sessions that I am participating in got accepted for the conference, so if you are going there, you might find the following sessions interesting. For your convenience, the sessions have hCalendar markup, so it should be easier to add them to your calendar.

    MySQL Sharding, Replication, and HA (September 21, 5:30-6:30pm in Imperial Ballroom B)

    This session is an opportunity for you to meet the MySQL engineering team and discuss the latest tools and best practices for sharding MySQL across distributed server farms while maintaining high availability.

    Come here and meet Lars, Luis, Johannes, and me, and bring up any questions or comments you have regarding sharding, high-availability, and replication. We might have some informal presentations available to discuss sharding and high-availability issues.

    MySQL Sharding: Tools and Best Practices for Horizontal Scaling (September 21, 4:00pm-5:00pm in Imperial Ballroom B)

    In this session, Alfranio and I will discuss how to create and manage a sharded MySQL database system. The session covers tools, techniques, and best practices for handling various aspects of sharding such as:

    • Hybrid approaches mixing sharding and global tables.
    • Sharding in the presence of transactions and how that affects applications.
    • Turning an unsharded database into a sharded database system.
    • Handling schema changes to a sharded database.

    MySQL and Hadoop: Big Data Integration—Unlocking New Insights (September 22, 11:30am-12:30pm in Taylor)

    Hadoop enables organizations to gain deeper insight into their customers, partners, and processes. As the world's most popular open source database, MySQL is a key component of many big data platforms. This session discusses technologies that enable integration between MySQL and Hadoop, exploring the lifecycle of big data, from acquisition via SQL and NoSQL APIs through to delivering operational insight and tools such as Sqoop and the Binlog API with Hadoop Applier enabling both batch and real-time integration.

    MySQL High Availability: Managing Farms of Distributed Servers (September 22, 5:30pm-6:30pm in Imperial Ballroom B)

    In this session, Alfranio and I will discuss tools, best practices, and frameworks for delivering high availability using MySQL. The session covers issues such as ensuring server redundancy, handling failure detection, executing recovery, and redirecting clients in the event of failure.

    Scaling PHP Applications (September 22, 10:00am-11:00am in Union Square Room 3/4)

    When building a PHP application, it is necessary to consider both scaling and high-availability issues. In this session, Johannes and I will discuss how to ensure that your PHP application scales well and can work in a high-availability environment.

    Thursday, October 11, 2012

    Round Robin Replication using GTID

    In a previous post I showed how to implement multi-source round-robin replication in pure SQL using the tables that are needed for crash-safe replication. I also outlined a revised version of this approach in the Replication Tips & Tricks presentation I gave at MySQL Connect. This was, however, before the GTID (Global Transaction ID) implementation was done. Now that GTIDs are available, multi-source replication is even easier since you no longer have to keep track of the positions.

    Figure 1. Tables for storing information about masters
    CREATE TABLE my_masters (
        idx INT AUTO_INCREMENT,
        host CHAR(50) NOT NULL,
        port INT NOT NULL DEFAULT 3306,
        PRIMARY KEY (idx),
        UNIQUE INDEX (host,port)
    );
    
    CREATE TABLE current_master (
        idx INT
    );
    
    CREATE TABLE replication_user(
        name CHAR(40),
        passwd CHAR(40)
    );

    One caveat is that this only works if you are replicating from servers that have GTID enabled, so if you are trying to replicate from a pre-5.6 server, you can use the original implementation. I have added a re-factored version of the code last in this post, and you can also find some utility procedures that I use in the version described here.

    For the version that uses GTID, we still keep the two tables we were using in the original implementation around, but we remove the file and position from the tables, giving the definitions seen in Figure 1. Also, the user and password are stored in a separate table and are assumed to be identical for all machines.

    To fetch the new master, I created a fetch_next_master procedure that fetches the next master in turn and then advances current_master to the next master. The second select in the code below is used to handle the case that you have a table with masters defined as in Table 1.


    delimiter $$
    CREATE PROCEDURE fetch_next_master(
        OUT p_host CHAR(50), OUT p_port INT UNSIGNED)
    BEGIN
       DECLARE l_next_idx INT DEFAULT 1;
    
       SELECT idx INTO l_next_idx FROM my_masters
        WHERE idx > (SELECT idx FROM current_master)
        ORDER BY idx LIMIT 1;
    
       SELECT idx INTO l_next_idx FROM my_masters
        WHERE idx >= l_next_idx
        ORDER BY idx LIMIT 1;
    
       UPDATE current_master SET idx = l_next_idx;
    
       SELECT host, port INTO p_host, p_port
         FROM my_masters WHERE idx = l_next_idx;
    END $$
    delimiter ;
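
    The selection logic of fetch_next_master can be mimicked outside the server to see how it wraps around. The following is a minimal Python sketch, not the stored procedure itself; the function name next_master_idx and the plain list of indexes are assumptions made for illustration.

```python
def next_master_idx(master_ids, current):
    """Return the idx of the master to replicate from next.

    master_ids: the idx values in my_masters (may contain gaps).
    current:    the idx stored in current_master, or None if it is empty.
    """
    ordered = sorted(master_ids)
    if not ordered:
        raise ValueError("no masters registered")
    # First SELECT: the smallest idx strictly greater than the current one.
    if current is not None:
        for idx in ordered:
            if idx > current:
                return idx
    # Nothing found: wrap around. The procedure falls back to l_next_idx = 1
    # and the second SELECT then picks the smallest idx >= 1, which also
    # copes with a table whose lowest idx is not 1.
    return ordered[0]
```

    With masters numbered 2, 3 and 5, the sketch moves from 3 to 5 and from 5 back to 2, which is the round-robin behaviour the two SELECT statements are meant to give.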

    Since we no longer need to save the position, the code for the multi_source event is significantly simpler. All that is necessary is to change master to the next master in turn: the server automatically keeps track of which transactions are missing and will start replicating from the correct position.

    delimiter $$
    CREATE EVENT multi_source
        ON SCHEDULE EVERY 1 MINUTE DO
    BEGIN
       DECLARE l_host CHAR(50);
       DECLARE l_port INT UNSIGNED;
       DECLARE l_user CHAR(40);
       DECLARE l_passwd CHAR(40);
    
       SET SQL_LOG_BIN = 0;
    
       CALL stop_slave_gracefully();
       START TRANSACTION;
       CALL fetch_next_master(l_host, l_port);
       SELECT name, passwd INTO l_user, l_passwd FROM replication_user;
       CALL change_master(l_host, l_port, l_user, l_passwd);
       COMMIT;
       START SLAVE;
    END $$
    delimiter ;
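
    With GTIDs, the CHANGE MASTER statement does not need a file and position. In MySQL 5.6 this is requested with MASTER_AUTO_POSITION = 1. The sketch below builds such a statement in Python; the helper name build_change_master and its parameters are hypothetical, and it is only an assumption that a GTID-aware change_master procedure would generate something similar.

```python
def build_change_master(host, port, user, passwd, auto_position=True):
    """Build a CHANGE MASTER TO statement as a string (illustrative only)."""
    def quote(value):
        # Minimal quoting for the sketch: escape backslashes and single quotes.
        return "'" + str(value).replace("\\", "\\\\").replace("'", "\\'") + "'"

    parts = [
        "MASTER_HOST = " + quote(host),
        "MASTER_PORT = " + str(int(port)),
        "MASTER_USER = " + quote(user),
        "MASTER_PASSWORD = " + quote(passwd),
    ]
    if auto_position:
        # Let the server work out which transactions are missing.
        parts.append("MASTER_AUTO_POSITION = 1")
    return "CHANGE MASTER TO " + ", ".join(parts)
```

    The point of the sketch is only that no MASTER_LOG_FILE or MASTER_LOG_POS clause is needed once auto-positioning is on.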

    Full code for original implementation


    Here is the code for replicating from pre-5.6 to 5.6 using the replication tables added for implementing crash-safe slaves.

    Compared to the version described in the earlier post, I have factored out a few utility procedures, such as a procedure to stop the slave gracefully. That procedure first stops the I/O thread and then empties the relay log before stopping the SQL thread. This is mainly to avoid having to re-transfer a lot of events from the master. The re-factored code follows.
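
    The graceful stop described above can be sketched as follows. This is not the stored procedure from the post: it is a Python illustration in which run is a hypothetical callable that executes a statement and returns the result rows as dictionaries, and the "relay log is empty" test is approximated by comparing the read and executed master positions reported by SHOW SLAVE STATUS.

```python
import time


def stop_slave_gracefully(run, poll_interval=0.1, max_polls=100):
    """Stop the I/O thread, let the SQL thread drain, then stop it too.

    run: hypothetical callable, run(statement) -> list of row dictionaries.
    Returns True if the SQL thread had caught up before it was stopped.
    """
    caught_up = False
    run("STOP SLAVE IO_THREAD")          # no new events are fetched
    for _ in range(max_polls):
        status = run("SHOW SLAVE STATUS")[0]
        caught_up = (
            status["Relay_Master_Log_File"] == status["Master_Log_File"]
            and status["Exec_Master_Log_Pos"] == status["Read_Master_Log_Pos"]
        )
        if caught_up:
            break                        # everything fetched has been applied
        time.sleep(poll_interval)
    run("STOP SLAVE SQL_THREAD")
    return caught_up
```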


    delimiter $$
    CREATE PROCEDURE change_master(
        p_host CHAR(40), p_port INT,
        p_user CHAR(40), p_passwd CHAR(40),
        p_file CHAR(40), p_pos BIGINT)
    BEGIN
       SET @cmd = CONCAT('CHANGE MASTER TO ',
                         CONCAT('MASTER_HOST = "', p_host, '", '),
                         CONCAT('MASTER_PORT = ', p_port, ', '),
                         CONCAT('MASTER_USER = "', p_user, '", '),
                         CONCAT('MASTER_PASSWORD = "', p_passwd, '"'));
    
       IF p_file IS NOT NULL AND p_pos IS NOT NULL THEN
         SET @cmd = CONCAT(@cmd,
                           CONCAT(', MASTER_LOG_FILE = "', p_file, '"'),
                           CONCAT(', MASTER_LOG_POS = ', p_pos));
       END IF;
       PREPARE change_master FROM @cmd;
       EXECUTE change_master;
       DEALLOCATE PREPARE change_master;
    END $$
    delimiter ;
    
    delimiter $$
    CREATE PROCEDURE save_position()
    BEGIN
       DECLARE l_idx INT UNSIGNED;
       DECLARE l_msg CHAR(60);
    
       UPDATE my_masters AS m,
              mysql.slave_relay_log_info AS rli
          SET m.log_pos = rli.master_log_pos,
              m.log_file = rli.master_log_name
        WHERE idx = (SELECT idx FROM current_master);
    END $$
    delimiter ;
    
    delimiter $$
    CREATE PROCEDURE fetch_next_master(
        OUT p_host CHAR(40), OUT p_port INT UNSIGNED,
        OUT p_file CHAR(40), OUT p_pos BIGINT)
    BEGIN
       DECLARE l_next_idx INT DEFAULT 1;
    
       SELECT idx INTO l_next_idx FROM my_masters
        WHERE idx > (SELECT idx FROM current_master)
        ORDER BY idx LIMIT 1;
    
       SELECT idx INTO l_next_idx FROM my_masters
        WHERE idx >= l_next_idx
        ORDER BY idx LIMIT 1;
    
       UPDATE current_master SET idx = l_next_idx;
    
       SELECT host, port, log_pos, log_file
         INTO p_host, p_port, p_pos, p_file
         FROM my_masters
        WHERE idx = l_next_idx;
    END $$
    delimiter ;
    
    delimiter $$
    CREATE EVENT multi_source
        ON SCHEDULE EVERY 10 SECOND DO
    BEGIN
       DECLARE l_host CHAR(40);
       DECLARE l_port INT UNSIGNED;
       DECLARE l_user CHAR(40);
       DECLARE l_passwd CHAR(40);
       DECLARE l_file CHAR(40);
       DECLARE l_pos BIGINT;
    
       SET SQL_LOG_BIN = 0;
    
       STOP SLAVE;
       START TRANSACTION;
       CALL save_position();
       CALL fetch_next_master(l_host, l_port, l_file, l_pos);
       SELECT name, passwd INTO l_user, l_passwd FROM replication_user;
       CALL change_master(l_host, l_port, l_user, l_passwd, l_file, l_pos);
       COMMIT;
       START SLAVE;
    END $$
    delimiter ;




    Tuesday, June 05, 2012

    Binary Log Group Commit in MySQL 5.6

    With the release of MySQL 5.6, binary log group commit is included, a feature focused on improving the performance of a server when the binary log is enabled. In short, binary log group commit improves performance by grouping several writes to the binary log instead of writing them one by one. Below I digress a little on how transactions are logged to the binary log before going into the details, but before getting into the problem and the implementation, let's look at what you do to turn it on.

    Nothing.

    Well... we actually have a few options to tweak it, but nothing required to turn it on. It even works for existing engines since we did not have to extend the handlerton interface to implement the binary log group commit. However, InnoDB has some optimizations to take advantage of the binary log group commit implementation.

    binlog_order_commits={0|1}
    This is a global variable that can be set without stopping the server.

    If this is off (0), transactions may be committed in parallel. In some circumstances, this might offer some performance boost. For the measurements we did, there was no significant improvement in throughput, but we decided to keep the option anyway since there are special cases where it can offer improvements.

    binlog_max_flush_queue_time=microseconds
    This variable controls when to stop skimming the flush queue (more about that below) and move on as soon as possible. Note that this is not a timeout on how often the binary log should be written to disk since grabbing the queue and writing it to disk takes time.

    Transactions galore...

    As the server executes transactions, it collects the changes done by each transaction in a per-connection transaction cache. If statement-based replication is used, the statements will be written to the transaction cache, and if row-based replication is used, the actual rows changed will be written to the transaction cache. Once the transaction commits, the transaction cache is written to the binary log as one single block. This allows each session to execute independently of the others; a session only needs to take a lock on the binary log when writing the transaction data to it. Since transactions are isolated from each other, it is enough to serialize the transactions on commit. (Of course, this is in an ideal world. Transactions can see other transactions' changes if you set a different transaction isolation level. You would never do that unless you knew exactly what you're doing... right?)
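
    The idea of a per-connection cache that is copied to the log in one block can be sketched in a few lines of Python. This is a toy model, not server code: the BinaryLog and Session classes are invented for the illustration, and a list guarded by a lock stands in for the binary log.

```python
import threading


class BinaryLog:
    """Toy stand-in for the binary log: a list guarded by one lock."""

    def __init__(self):
        self._lock = threading.Lock()
        self.events = []

    def write_transaction(self, cache):
        # The lock is held only while the whole cache is copied in one block.
        with self._lock:
            self.events.extend(cache)


class Session:
    def __init__(self, name, binlog):
        self.name = name
        self.binlog = binlog
        self.cache = []                  # per-connection transaction cache

    def execute(self, change):
        # Changes accumulate privately; no lock is needed here.
        self.cache.append((self.name, change))

    def commit(self):
        self.binlog.write_transaction(self.cache)
        self.cache = []
```

    Because each session takes the lock only in commit(), the events of one transaction always end up next to each other in the log, whatever the interleaving of the sessions.
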
    Figure 1. Two-Phase Commit Protocol (2PC)
    In order to keep the storage engine and the binary log in sync, the server employs a two-phase commit protocol (or just 2PC) that you can see in Figure 1. As you can see, there is a call to write() and one call to fsync() in the diagram: I'll get to that in just a moment, so stay tuned.

    The entire point of using a two-phase commit protocol is to be able to guarantee that the transaction is either both in the engine and the binary log (or in neither) even in the event that the server crashes after the prepare, and subsequently recovers. That is, it should not be possible that the transaction is in the engine but not in the binary log, or vice versa. Two-phase commit solves this by requiring that once a transaction is prepared in the engine, it can be either fully committed or fully rolled back even if the server crashes and recovers. So, on recovery, the storage engine will provide the server with all the transactions that are prepared but not yet committed, and the server will then commit each transaction that can be found in the binary log and roll back the others.
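
    The recovery rule in the last sentence is small enough to write down directly. The following Python sketch assumes that transactions are identified by some comparable id (called xid here, a name chosen for the example) and that both the list of prepared transactions and the ids found in the binary log are available as plain sequences.

```python
def recover(prepared_xids, binlog_xids):
    """Decide the fate of transactions found prepared after a crash.

    Returns (to_commit, to_rollback), both in the order the engine
    reported the prepared transactions.
    """
    in_binlog = set(binlog_xids)
    to_commit = [xid for xid in prepared_xids if xid in in_binlog]
    to_rollback = [xid for xid in prepared_xids if xid not in in_binlog]
    return to_commit, to_rollback
```

    For example, if the engine reports 7, 8 and 9 as prepared and the binary log contains 5 through 8, then 7 and 8 are committed and 9 is rolled back.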

    This is, however, only possible if the transaction can be guaranteed to be persistent in the binary log before committing the transaction in the engine. Since disks are slow and memory fast, the operating system tries to improve performance by keeping part of the file in memory instead of writing directly to disk. Once enough changes have been written, or the memory is needed for something else, the changes are written to disk. This is good for the operating system (and also for anybody using the computer), but causes a problem for the database server since if the server crashes, it is possible that the transaction is committed in the storage engine, but there is no trace of it in the binary log.

    For recovery to work properly, it is therefore necessary to ensure that the file is really on disk, which is why there is a call to fsync() in Figure 1: it forces the in-memory part of the file to be written to disk.
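
    The difference between writing and syncing is visible from any language that exposes the system calls. Here is a minimal Python sketch; the function name append_durably is made up for the example, and the file is an ordinary file rather than a real binary log.

```python
import os


def append_durably(path, payload):
    """Append payload (bytes) to path and force it to stable storage."""
    with open(path, "ab") as f:
        f.write(payload)        # lands in Python's user-space buffer
        f.flush()               # write(): hands the bytes to the OS page cache
        os.fsync(f.fileno())    # fsync(): asks the OS to push them to disk
```

    Without the os.fsync() call, the data can sit in the operating system's file cache for a while, which is exactly the window in which a crash loses it.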

    The Infamous prepare_commit_mutex

    Figure 2. Missing un-committed transactions
    When the server recovers, it has access to the binary log and can therefore decide what to commit and what to roll back, but what if there is no binary log?

    In the general case, a recovery can just roll back all prepared transactions and start again. After all, the transactions that were just prepared but not committed are safe to roll back. They just move the database to the state it had just before starting those transactions. No connected client has received an indication that the transaction is committed, so the clients will realize that the transactions have to be re-executed.

    There is another case where recovery is being used in this way and that is when using on-line backup methods such as InnoDB Hot Backup (which is used in the MySQL Enterprise Backup). These tools take a copy of the database files and InnoDB transaction logs directly—which is an easy way to take a backup—but it means that the transaction logs contain transactions that have just been prepared. On recovery, they roll back all the transactions and have a database in a consistent state.

    Since these on-line backup methods are often used to bootstrap new slaves, the binary log position of the last committed transaction is written in the header of the InnoDB redo log. On recovery, the recovery program prints the binary log position of the last committed transaction and you can use this information with the CHANGE MASTER command to start replicating from the correct position. For this to work correctly, it is necessary that all the transactions are committed in the same order as they are written to the binary log. If they are not, there can be "holes" where some transactions are written to the binary log but not yet committed, which causes the slave to miss transactions that were not committed. The problematic case that can arise is what you see in Figure 3 below.

    Figure 3. Committing in parallel
    You can see an example of this in Figure 2, where replication will start from the last committed position, but there is a transaction that was just prepared and hence was rolled back when the backup was restored on the slave.

    To solve this, InnoDB added a mutex called the prepare_commit_mutex that was taken when preparing a transaction and released when committing the transaction. This is a simple solution to the problem, but causes some problems that we will get to in a minute. Basically, the prepare_commit_mutex solves the problem by forcing the call sequence to be as in Figure 4.

    Figure 4. Sequencing transactions

    Steady as she goes... NOT!

    Since disk writes are slow, writing every transaction to disk will affect performance quite a lot... well, actually very much...

    To try to handle that, there is a server option sync_binlog that can be set to how often the binary log should be written to disk. If it is set to 0, the operating system will decide when the file pages should be written to disk; if it is set to 1, fsync() will be called after every transaction written to the binary log. In general, if you set sync_binlog to N, you can at most lose N-1 transactions, so in practice there are just two useful settings: sync_binlog=0 means that you accept that some transactions can be lost and handle it some other way, and sync_binlog=1 means that you do not accept losing any transactions at all. You could of course set it to some other value to get something in between, but in reality you can either handle transaction loss or not.
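
    The effect of the setting can be modelled with a simple counter. The Python sketch below is only an illustration of the policy as described here, with a class name (SyncPolicy) invented for the example; it does not reproduce the server's actual bookkeeping.

```python
class SyncPolicy:
    """Decide when to fsync() the binary log for a given sync_binlog value."""

    def __init__(self, sync_binlog):
        self.sync_binlog = sync_binlog
        self.unsynced = 0      # transactions written since the last fsync()

    def transaction_written(self):
        """Record one write; return True if the log should be synced now."""
        self.unsynced += 1
        if self.sync_binlog > 0 and self.unsynced >= self.sync_binlog:
            self.unsynced = 0
            return True
        return False           # sync_binlog=0: leave it to the operating system
```

    With sync_binlog=3, every third call reports that a sync is due; with sync_binlog=1 every call does, and with sync_binlog=0 none does.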

    To improve performance, the common case is to bundle many writes with each sync: this is what the operating system does, and that is what we should be able to do. However, if you look at Figure 4 you see that there is no way to place an fsync() call in that sequence so that several transactions are written to disk at the same time. Why? Because at any point in that sequence there is at most one prepared and written transaction. However, if you go back to Figure 3, you can see that it would be possible to place an fsync() as shown and write several transactions to disk at the same time. If that were possible, then all transactions written to the binary log before the fsync() call would be written to disk at once. But this means that it is necessary to order the commits in the same order as the writes without using the prepare_commit_mutex.

    So, how does all this work then...

    The binary log group commit implementation splits the commit procedure into several stages, as you can see in Figure 5. The stages are entirely internal to the binary log commit procedure and do not affect anything else. In theory, it would be possible to have another replication implementation with another policy for ordering commits. Since the commit procedure is separated into stages, there can be several threads processing transactions at the same time, which also improves throughput.
    Figure 5. Commit Procedure Stages
    For each stage, there is an input queue where sessions queue up for processing. If a thread registers in an empty queue, it is considered the stage leader; otherwise, the session is a follower. Stage leaders will bring all the threads in the queue through the stage and then register the leader and all followers for the next stage. Followers will move off to the side and wait for a leader to signal that the entire commit is done. Since it is possible that a leader registers to a non-empty queue, a leader can decide to become a follower and go off waiting as well, but a follower can never become a leader.

    When a leader enters a stage, it will grab the entire queue in one go and process it in order according to the stage. After the queue is grabbed, other sessions can register for the stage while the leader processes the old queue.

    In the flush stage, all the threads that registered will have their caches written to the binary log. Since all the transactions are written to an internal I/O cache, the last part of the stage is writing the memory cache to disk (which means it is written to the file pages, also in memory).

    In the sync stage, the binary log is synced to disk according to the settings of sync_binlog. If sync_binlog=1 all sessions that were flushed in the flush stage will be synced to disk each time.

    In the commit stage, the sessions that registered for the stage will be committed in the engine in the order they registered; all work here is done by the stage leader. Since order is preserved in each stage of the commit procedure, the writes and the commits will be made in the same order.

    After the commit stage has finished executing, all threads that were in the queue for the commit stage will be marked as done and will be signaled that they can continue. Each session will then return from the commit procedure and continue executing.
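
    The leader and follower rule and the "grab the whole queue" step can be illustrated with a small single-threaded model. The Python sketch below is a simplification under stated assumptions: the names Stage and run_pipeline are invented, there is no real concurrency or signalling, and a list called log records what would be written, synced and committed.

```python
from collections import deque


class Stage:
    """Input queue of one commit stage."""

    def __init__(self, name):
        self.name = name
        self.queue = deque()

    def enroll(self, sessions):
        """Queue sessions; return True if the caller becomes stage leader."""
        was_empty = not self.queue
        self.queue.extend(sessions)
        return was_empty

    def grab_all(self):
        """Take the entire queue in one go, leaving it empty for newcomers."""
        batch, self.queue = list(self.queue), deque()
        return batch


def run_pipeline(sessions, log):
    """Lead one batch through flush, sync and commit, preserving order."""
    flush, sync, commit = Stage("flush"), Stage("sync"), Stage("commit")
    batch = list(sessions)
    for stage in (flush, sync, commit):
        if not stage.enroll(batch):
            return None   # another leader owns this stage; become a follower
        batch = stage.grab_all()
        if stage is sync:
            log.append(("sync", tuple(batch)))      # one fsync() for the batch
        else:
            log.extend((stage.name, s) for s in batch)
    return batch
```

    Running three sessions through the model gives three flushes, a single sync covering all three, and three commits in the same order as the flushes, which is the property the staged design is after.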

    Thanks to the fact that the leader registers for the next queue and is ready to become a follower, the stage that is slowest will accumulate the most work. This is typically the sync stage, at least for normal hard disks. However, it is critical to fill the flush stage with as many transactions as possible, so the flush stage is treated a little specially.

    In the flush stage, the leader will skim the sessions one by one from the flush queue (the input queue for the flush stage). This process continues as long as the last session has not been removed from the queue and the first session was unqueued less than binlog_max_flush_queue_time microseconds ago. There are two different conditions that can stop the process:

    • If the queue is empty, the leader immediately advances to the next stage and registers all sessions processed to the sync stage queue.
    • If the timeout was reached, the entire queue is grabbed and the sessions' transaction caches are flushed (as before). The leader then advances to the sync stage.
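
    The skimming loop with its two exit conditions can be sketched like this. It is a Python model under assumptions: skim_flush_queue, the flush callable and the injectable clock are all invented for the example, and the timer is started just before the first session is taken from the queue.

```python
import time
from collections import deque


def skim_flush_queue(queue, flush, max_queue_time_us, clock=time.monotonic):
    """Flush sessions one by one until the queue is empty or time runs out.

    queue: deque of sessions waiting in the flush stage.
    flush: callable that writes one session's transaction cache.
    Returns the list of sessions flushed, in queue order.
    """
    flushed = []
    if not queue:
        return flushed
    deadline = clock() + max_queue_time_us / 1e6
    while queue:
        session = queue.popleft()
        flush(session)
        flushed.append(session)
        if queue and clock() >= deadline:
            # Timeout: stop skimming and grab whatever is left in one go.
            rest = list(queue)
            queue.clear()
            for session in rest:
                flush(session)
            flushed.extend(rest)
            break
    return flushed
```

    In both cases the sessions come out in the order they were queued; the timeout only changes whether late arrivals can still join the batch while it is being skimmed.
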
    Figure 6. Comparing 5.6.5 and 5.6 June labs release

    Performance, performance, performance...

    I'm sure you all wonder what the improvements are, so without further delay, let's have a look at the results of some benchmarks we have done on the labs tree. There have been several improvements that are not related to the binary log, so I will just focus on the results involving the binary log. In Figure 6 you see a benchmark comparing the 5.6.5 release with the 5.6 June labs release using the binary log. These benchmarks were executed on a 2.00 GHz 48-core Intel® Xeon® 7540 with 512 GiB memory and using SSD disks.

    As you can see, the throughput has increased tremendously, with increases ranging from a little less than 2 to nearly 4 times that of 5.6.5. To a large extent, the improvements are in the server itself, but what is interesting is that with binary log group commit, the server is able to keep the pace. Even with sync_binlog=0 on 5.6.5 and sync_binlog=1 on the 5.6 labs release, the 5.6 labs release outperforms 5.6.5 by a big margin.

    Another interesting aspect is that even with sync_binlog=1 the server performs nearly as well as when using sync_binlog=0. On higher number of connections (roughly more than 50), the difference in throughput is varying between 0% [sic] and with a more typical throughput between 5% and 10%. However, there is a drop of roughly 20% in the lower range. This looks very strange, especially in the light that the performance is almost equal in the higher range, so what is causing that drop and is there anything that can be done about it?

    Figure 7. Benchmark of Binary Log Group Commit
    The answer comes from some internal benchmarks done while developing the feature. For these tests we were using Sysbench on a 64-core Intel® Xeon® X7560 running at 2.27GHz with 126 GB memory and a HDD.

    In the benchmarks that you can see in Figure 7, the enhanced version of 5.6 with and without the binary log group commit are compared. The enhanced version of 5.6 includes some optimizations to improve performance that are not available in the latest 5.6 DMR, but most are available in the labs tree. However, these are benchmarks done while developing, so it is not really possible to compare them with Figure 6 above, but it will help understand why we have a 20% drop at the lower number of connections.

    The bottom line in Figure 7 is the enhanced 5.6 branch without binary log group commit and using sync_binlog=1, which does not scale very well. (This is nothing new, and is why implementing binary log group commit is a good idea.) Note that even at sync_binlog=0 the staged commit architecture scales better than the old implementation. If you look at the other lines in the figure, you can see that even when the enhanced 5.6 server is running with sync_binlog=0, the binary log group commit implementation outperforms the enhanced 5.6 branch at roughly 105 simultaneous threads with sync_binlog=1.

    Also note that the difference between sync_binlog=1 and sync_binlog=0 is diminishing as the number of simultaneous connections is increased, to vanish completely at roughly 160 simultaneous connections. We haven't made a deep analysis of this, but while using the performance schema to analyze the performance of each individual stage, we noted that the sync time was completely dominating the performance (no surprise there, just giving the background), and that all available transactions "piled up" in the sync stage queue. Since each connection can at most have one ongoing transaction, it means that at 32 connections, there can never be more than 32 transactions in the queue. As a matter of fact, one can expect that over a long run, roughly half of the connections are in the queue and half of the connections are inside the sync stage (this was also confirmed in the measurements mentioned above), so at lower number of connections it is just not possible to fill the queue enough to utilize the system efficiently.

    The conclusion is that reducing the sync time would probably make the difference between sync_binlog=0 and sync_binlog=1 smaller even on low number of connections. We didn't do any benchmarks using disks with battery-backed caches (which should reduce the sync time significantly, if not entirely eliminate it), but it would be really interesting to see the effect of that on performance.

    Summary and closing remarks

    • The binary logging code has been simplified and optimized, leading to improved performance even when using sync_binlog=0.
    • The prepare_commit_mutex is removed from the code and instead the server orders transactions correctly.
    • Transactions can be written and committed as groups without losing any transactions, giving around 3 times improvement in performance on both sync_binlog=1 and sync_binlog=0.
    • The difference between sync_binlog=0 and sync_binlog=1 is small and reduces as the load increases on the system.
    • Existing storage engines benefit from binary log group commit since there are no changes to the handlerton interface.
    Binary log group commit is one of a range of important new enhancements to replication in MySQL 5.6, including global transaction IDs (GTIDs), multi-threaded slave, crash safe slave and binary log, replication event checksums, and some more. You can learn more about all of these from our DevZone article:
    dev.mysql.com/tech-resources/articles/mysql-5.6-replication.html

    You can also try out binary log group commit today by downloading the latest MySQL 5.6 build that is available on labs.mysql.com

    Related links

    • It all started with this post where Mark points out the problem and shows some results of their implementation.
    • The next year, Kristian Nielsen implemented binary log group commit for MariaDB and has a lot of good posts on the technical challenges in implementing it. This implementation is using an atomic queue and does flushing and syncing of the transactions as a batch, after which the sessions are signalled in order and commit their transactions.

    Monday, February 20, 2012

    Pythonic Database API: Now with Launchpad

    In a previous post, I demonstrated a simple Python database API with a syntax similar to jQuery. The goal was to provide a simple API that would allow Python programmers to use a database without having to resort to SQL, nor having to use any of the good, but quite heavy, ORM implementations that exist. The code was just an experimental implementation, and I was considering putting it up on Launchpad.
    I did some basic cleaning of the code, turned it into a Python package, and pushed it to Launchpad. I also added some minor changes, such as introducing a define function to define new tables instead of automatically creating one when an insert was executed. Automatically constructing a table from values seems neat, but in reality it is quite difficult to ensure that it has the right types for the application. Here is a small code example demonstrating how to use the define function together with some other operations.
    import mysql.api.simple as api
    
    server = api.Server(host="example.com")
    
    server.test_api.tbl.define(
        { 'name': 'more', 'type': int },
        { 'name': 'magic', 'type': str },
    )
    
    items = [
        {'more': 3, 'magic': 'just a test'},
        {'more': 3, 'magic': 'just another test'},
        {'more': 4, 'magic': 'quadrant'},
        {'more': 5, 'magic': 'even more magic'},
    ]
    
    for item in items:
        server.test_api.tbl.insert(item)
    
    The table is defined by providing a dictionary for each column that you want in the table. The two most important fields in the dictionary are name and type. The name field is used to supply a name for the column, and the type field is used to provide the type of the column. The type is denoted using a basic Python type constructor, which then maps internally to a SQL type. So, for example, int maps to the SQL INT type, and bool maps to the SQL type BIT(1). The choice to use Python types is simply because it is more natural for a Python programmer to define the tables from the data that the programmer wants to store in the database. In this case, I would be less concerned with how the types are mapped, just assuming that they are mapped in a way that works. It is currently not possible to register your own mappings, but that is easy to add.

    So, why provide the type object and not just a string with the type name? The idea I had here is that since Python has introspection (it is a dynamic language after all), it would be possible to add code that reads the provided type objects and does things with them, such as figuring out what fields there are in the type. It's not that I plan to implement this, but even though this is intended to be a simple database interface, there is no reason to tie one's hands from the start, so this simple approach will provide some flexibility if needed in the future.
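
    To make the type mapping concrete, here is a rough Python sketch of how define()-style dictionaries could be turned into a CREATE TABLE statement. It is not the package's implementation: the names TYPE_MAP and create_table_sql are invented, only the int and bool entries come from the text above, and the str and float entries are assumptions.

```python
# int -> INT and bool -> BIT(1) come from the text; the other entries are guesses.
TYPE_MAP = {
    int: "INT",
    bool: "BIT(1)",
    str: "VARCHAR(255)",   # assumption
    float: "DOUBLE",       # assumption
}


def create_table_sql(table, *columns):
    """Build a CREATE TABLE statement from define()-style column dictionaries."""
    defs = []
    for col in columns:
        try:
            sql_type = TYPE_MAP[col["type"]]
        except KeyError:
            raise TypeError("no SQL mapping for %r" % (col["type"],))
        defs.append("`%s` %s" % (col["name"], sql_type))
    return "CREATE TABLE `%s` (%s)" % (table, ", ".join(defs))
```

    Keying the dictionary on type objects rather than strings is what leaves room for the introspection idea mentioned above.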

    Links

    Some additional links that you might find useful:
    Connector/Python
    You need to have Connector/Python installed to be able to use this package.
    Sequelize
    This is a JavaScript library that provides a similar interface to a database. It claims to be an ORM layer, but is not really. It is more similar to what I have written above.
    Roland's MQL to SQL and Presentation on SlideShare are also some inspiration for alternatives.

    Monday, January 23, 2012

    MySQL: Python, Meta-Programming, and Interceptors

    I recently found Todd's posts on interceptors which allow callbacks (called interceptors) to be registered with the connector so that you can intercept a statement execution, commit, or any of the many extension points supported by Connector/Java. This is a language feature that allows you to implement a number of new features without having to change the application code such as load-balancing policies, profiling queries or transactions, or debugging an application.

    Since Python is a dynamic language, it is easy to add interceptors to any method in Connector/Python, without having to extend the connector with specific code. This is something that is possible in dynamic languages such as Python, Perl, JavaScript, and even some lesser known languages such as Lua and Self. In this post, I will describe how and also give an introduction to some of the (in my view) more powerful features of Python.

    In order to create an interceptor, you need to be able to do these things:

    • Catch an existing method in a class and replace it with a new one.
    • Call the original function, if necessary.
    • For extra points: catch an existing method in an object and replace it with a new one.
    You will in this post see how all three of these problems are solved in Python. You will see and use decorators to be able to define methods in existing classes and objects, and closures to be able to call the original version of the methods. By picking this approach, it will not be necessary to change the implementation: in fact, you can use this code to replace any method in any class, not only in Connector/Python.

    Table 1. Attributes for methods
    Name       Unbound method                     Bound method
    __name__   Name of the method                 Name of the method
    im_func    "Inner" function of the method     "Inner" function of the method
    im_self    None                               Class instance for the method
    im_class   Class that the method belongs to   Class that the method belongs to
    In addition to being able to replace methods in the class, we would also like to be able to replace methods in instances of a class ("objects" in the traditional sense). This is useful to create specialized objects, for example for tracking particular cases where a method is used.

    In order to understand how the replacement works, you should understand that in Python (and the dynamic languages mentioned above), all objects can have attributes, including classes, functions, and a bunch of other esoteric constructions. Each type of object has a set of pre-defined attributes with well-defined meaning. For classes (and class instances), methods are stored as attributes of the class (or class instance) and can therefore be replaced with other methods that you build dynamically. However, it requires some tinkering to take an existing "normal" function definition and "imbue" it with whatever "tincture" that makes it behave as a method of the class or class instance.
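
    As a small illustration of methods being ordinary attributes, the following sketch replaces a method first on a class and then on a single object. It is written for Python 3, where a function fetched from a class is a plain function and a bound method exposes __func__ and __self__; the Cursor class and the function names are made up for the example and have nothing to do with Connector/Python.

```python
import functools
import types


class Cursor:
    def execute(self, operation):
        return "executed: " + operation


def traced_execute(self, operation):
    return "traced: " + operation


# Replace the method for every instance by rebinding the class attribute.
original = Cursor.execute            # a plain function in Python 3
functools.update_wrapper(traced_execute, original)   # copy name and docstring
Cursor.execute = traced_execute


# Replace the method for one object only by binding a function to it.
def special_execute(self, operation):
    return "special: " + operation


special = Cursor()
special.execute = types.MethodType(special_execute, special)
```

    After this, every Cursor uses traced_execute except the one object that was given its own bound method, and the saved original can still be called explicitly.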

    Depending on where the method comes from, it can be either unbound or bound. Unbound methods are roughly equivalent to member function pointers in C++: they reference a function, but not the instance. In contrast, bound methods have an instance tied to them, so when you call them, they already know what instance they belong to and will use it. Methods have a set of attributes, of which the four in Table 1 interest us. If a method is fetched from a class (to be precise, from a class object), it will be unbound and im_self will be None. If the method is fetched from a class instance, it will be bound and im_self will be set to the instance it belongs to. These attributes are all the "tincture" you need to make your own instance methods. The code for doing the replacement described above is simply:

    import functools, types
    
    def replace_method(orig, func):
        functools.update_wrapper(func, orig.im_func)
        new = types.MethodType(func, orig.im_self, orig.im_class)
        obj = orig.im_self or orig.im_class
        setattr(obj, orig.__name__, new)
    
    The function uses two standard modules to make the job simpler, but the steps are:
    1. Copy the meta-information from the original method function to the new function using update_wrapper. This copies the name, module information, and documentation from the original method function to make it look like the original method.
    2. Create a new method instance from the method information of the original method using the constructor MethodType, but replace the "inner" function with the new function.
    3. Install the new instance method in the class or instance by replacing the attribute denoting the original method with the new method. Depending on whether the function is given a bound or an unbound method, either the method in the instance or the method in the class is replaced.
    Using this function you can now replace a method in a class like this:
    from mysql.connector import MySQLCursor
    
    def my_execute(self, operation, params=None):
      ...
    
    replace_method(MySQLCursor.execute, my_execute)
    
    This is already pretty useful, but note that you can also replace only a specific instance as well by using replace_method(cursor.execute, my_execute). It was not necessary to change anything inside Connector/Python to intercept a method there, so you can actually apply this to any method in any of the classes in Connector/Python that you already have available. In order to make it even easier to use you'll see how to define a decorator that will install the function in the correct place at the same time as it is defined. The code for defining a decorator and an example usage is:
    import functools, types
    from mysql.connector import MySQLCursor
    
    def intercept(orig):
        def wrap(func):
            functools.update_wrapper(func, orig.im_func)
            meth = types.MethodType(func, orig.im_self, orig.im_class)
            obj = orig.im_self or orig.im_class
            setattr(obj, orig.__name__, meth)
            return func
        return wrap
    
    # Define a function using the decorator
    @intercept(MySQLCursor.execute)
    def my_execute(self, operation, params=None):
      ...
    
    The @intercept line before the definition of my_execute is where the new decorator is used. The syntax is a shorthand that can be used to do some things with the function when defining it. It behaves as if the following code had been executed:
    def _temporary(self, operation, params=None):
      ...
    my_execute = intercept(MySQLCursor.execute)(_temporary)
    
    As you can see here, whatever is given after the @ is used as a function and called with the function-being-defined as argument. This explains why the wrap function is returned from the decorator (it will be called with a reference to the function that is being defined), and also why the original function is returned from the wrap function (the result will be assigned to the function name).
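The listings above rely on the Python 2 method attributes from Table 1. In Python 3 there are no unbound methods: a function fetched from a class is a plain function, and a bound method exposes `__func__` and `__self__` instead. As a hedged sketch of the same idea under Python 3, the variant below takes the owner (class or instance) and the attribute name explicitly. The Cursor class and all names here are illustrative and not part of Connector/Python.

```python
import functools
import types

def intercept(owner, name):
    """Replace attribute `name` on `owner`, which is a class or an instance."""
    def wrap(func):
        original = getattr(owner, name)
        # For a bound method, copy the metadata from the underlying function.
        functools.update_wrapper(func, getattr(original, "__func__", original))
        if isinstance(owner, type):
            setattr(owner, name, func)                           # install in the class
        else:
            setattr(owner, name, types.MethodType(func, owner))  # bind to the instance
        return func
    return wrap

class Cursor(object):
    def execute(self, operation):
        """Pretend to run a statement."""
        return "ran " + operation

original_execute = Cursor.execute     # saved before the replacement

@intercept(Cursor, "execute")
def traced_execute(self, operation):
    # Call the saved original and decorate its result.
    return "traced: " + original_execute(self, operation)

result = Cursor().execute("SELECT 1")
```

Passing an instance instead of the class, as in `intercept(cursor, "execute")`, replaces the method for that one object only.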

    Using a statement interceptor, you can catch the execution of statements and do some special magic on them. In our case, let's define an interceptor to catch the execution of a statement and log the result using the standard logging module. If you read the wrap function carefully, you probably noted that it uses a closure to access the value of orig when the decorator was called, not the value it happens to have when the wrap function is executed. This feature is very useful since a closure can also be used to get access to the original execute function and call it from within the new function. So, to intercept an execute call and log information about the statement using the logging module, you could use code like this:

    import logging
    from mysql.connector import MySQLCursor

    # Save the original method so that the interceptor can call it.
    original_execute = MySQLCursor.execute

    @intercept(MySQLCursor.execute)
    def my_execute(self, operation, params=None):
        if params is not None:
            stmt = operation % self._process_params(params)
        else:
            stmt = operation
        result = original_execute(self, operation, params)
        logging.debug("Executed '%s', rowcount: %d", stmt, self.rowcount)
        # description is None for statements that return no result set.
        if self.description:
            logging.debug("Columns: %s", ', '.join(c[0] for c in self.description))
        return result
    
    Now with this, you could implement your own caching layer to, for example, do a memcached lookup before sending the statement to the server for execution. I leave this as an exercise to the reader, or maybe I'll show you in a later post. :) Implementing a lifecycle interceptor is similar, only that you replace, for example, the commit or rollback calls. However, implementing an exception interceptor is not obvious. Catching the exception is straightforward and can be done using the intercept decorator:
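    import logging
    from mysql.connector import ProgrammingError

    # Save the original constructor so that the interceptor can call it.
    original_init = ProgrammingError.__init__

    @intercept(ProgrammingError.__init__)
    def catch_error(self, msg, errno):
        logging.debug("This statement didn't work: '%s', errno: %d", msg, errno)
        original_init(self, msg, errno=errno)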
    
    However, in order to do something more interesting, such as asking for some additional information from the database, it is necessary to get hold of either the cursor that was used to execute the query, or at least the connection. It is possible to dig through the interpreter stack, or try to override one of the internal methods that Connector/Python uses, but since that is very dependent on the implementation, I will not present that in this post. It would be good if the cursor were passed down to the exception constructor, but this requires some changes to the connector code.

    Even though I have been programming in dynamic languages for decades (literally), it always amazes me how easy it is to accomplish things in these languages. If you are interested in playing around with this code, you can always fetch Connector/Python on Launchpad and try out the examples above. Some links and other assorted references related to this post are:

    Monday, September 26, 2011

    Python Interface to MySQL

    There have been a lot of discussions lately about various non-SQL languages that provide access to databases without having to resort to using SQL. I wondered how difficult it would be to implement such an interface, so as an experiment, I implemented a simple interface in Python that is similar to the document-oriented interfaces available elsewhere. The interface generates SQL queries to query the database, but does not require any knowledge of SQL to use. The syntax is inspired by jQuery, but since jQuery works with documents, the semantics are slightly different.

    A simple example would look like this:

    from native_db import *
    server = Server(host='127.0.0.1')
    server.test.t1.insert({'more': 3, 'magic': 'just a test', 'count': 0})
    server.test.t1.insert({'more': 3, 'magic': 'just another test', 'count': 0})
    server.test.t1.insert({'more': 4, 'magic': 'quadrant', 'count': 0})
    server.test.t1.insert({'more': 5, 'magic': 'even more magic', 'count': 0})
    for row in server.test.t1.find({'more': 3}):
      print "The magic is:", row['magic']
    server.test.t1.update({'more': 3}, {'count': 'count+1'})
    for row in server.test.t1.find({'more': 3}, ['magic', 'count']):
      print "The magic is:", row['magic'], "and the count is", row['count']
    server.test.t1.delete({'more': 5})
    
    The first line after the import defines a server to communicate with, which is simply done by creating a Server object with the necessary parameters. The constructor accepts the normal parameters for Connector/Python (which is what I'm using internally), but the user defaults to whatever getpass.getuser() returns, and the host defaults to 127.0.0.1, even though I've provided it here.

    After that, the necessary methods are overridden so that server.database.table will refer to the table with name table in database with name database on the given server. One possibility would be to just skip the database and go directly on the table (using some default database name), but since this is just an experiment, I did this instead. After that, there are various methods defined to support searching, inserting, deleting, and updating.
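One way such attribute access could be wired up is with `__getattr__`, which Python calls only when normal attribute lookup fails. The sketch below is an assumption about how it might be done, not the actual native_db code; the class layout and the qualified_name helper are invented for illustration.

```python
class Table(object):
    def __init__(self, database, name):
        self.database = database
        self.name = name

    def qualified_name(self):
        return "`%s`.`%s`" % (self.database.name, self.name)

class Database(object):
    def __init__(self, server, name):
        self.server = server
        self.name = name

    def __getattr__(self, table_name):
        # Only reached for attributes that do not exist, so self.server
        # and self.name are still found the usual way.
        if table_name.startswith("_"):
            raise AttributeError(table_name)
        return Table(self, table_name)

class Server(object):
    def __init__(self, host="127.0.0.1"):
        self.host = host

    def __getattr__(self, database_name):
        if database_name.startswith("_"):
            raise AttributeError(database_name)
        return Database(self, database_name)

table = Server().test.t1
name = table.qualified_name()
```

Rejecting names that start with an underscore keeps special lookups, such as those done by copy or pickle, from being mistaken for database or table names.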

    Since this is intended to be a simple interface, autocommit is on. Each of the functions generates a single SQL statement, so each call will be executed atomically if you're using InnoDB.

    table.insert(row)
    This function will insert the contents of the dictionary into the table, using the keys of the dictionary as column names. If the table does not exist, it will be created with a "best effort" guess of what types to use for the columns.
    table.delete(condition)
    This function will remove all rows in the table that match the supplied dictionary. Currently, only equality mapping is supported, but see below for how it could be extended.
    table.find(condition, fields="*")
    This will search the table and return an iterable over the rows that match condition. If fields is supplied (as a list of field names), only those fields are returned.
    table.update(condition, update)
    This will search for rows matching condition and update each matching row according to the update dictionary. The values of the dictionary are used on the right side of the assignments of the UPDATE statement, so expressions can be given here as strings.
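As a rough sketch of how dictionaries like these could be turned into single SQL statements, the helpers below build the statement text together with a parameter list for the %s placeholder style. The function names are hypothetical and this is not the native_db implementation; keys are sorted only to make the generated text predictable.

```python
def where_clause(condition):
    """Build a WHERE clause and a parameter list from an equality dictionary."""
    if not condition:
        return "", []
    columns = sorted(condition)
    text = " AND ".join("`%s` = %%s" % col for col in columns)
    return " WHERE " + text, [condition[col] for col in columns]

def insert_statement(table, row):
    columns = sorted(row)
    names = ", ".join("`%s`" % col for col in columns)
    marks = ", ".join(["%s"] * len(columns))
    return ("INSERT INTO %s (%s) VALUES (%s)" % (table, names, marks),
            [row[col] for col in columns])

def find_statement(table, condition, fields="*"):
    names = "*" if fields == "*" else ", ".join("`%s`" % f for f in fields)
    where, params = where_clause(condition)
    return "SELECT %s FROM %s%s" % (names, table, where), params

def delete_statement(table, condition):
    where, params = where_clause(condition)
    return "DELETE FROM %s%s" % (table, where), params

def update_statement(table, condition, update):
    # The update values are used verbatim as expressions, as described above.
    assignments = ", ".join("`%s` = %s" % (col, update[col])
                            for col in sorted(update))
    where, params = where_clause(condition)
    return "UPDATE %s SET %s%s" % (table, assignments, where), params

select_sql, select_params = find_statement("test.t1", {"more": 3}, ["magic", "count"])
update_sql, update_params = update_statement("test.t1", {"more": 3}, {"count": "count+1"})
```

Because the update values are pasted into the statement as expressions, they must come from trusted code; only the condition values are passed as parameters.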

    That's all folks!

    The code is available at http://mats.kindahl.net/python/native_db.py if you're interested in trying it out. The code is very basic, and there's potential for a lot of extensions. If there's interest, I could probably create a repository somewhere.

    Note that this is not a replacement for an ORM library. The intention is not to allow storing arbitrary objects in the database: the intention is to be able to query the database using a Python interface without resorting to using SQL.

    I'm just playing around and testing some things out, and I'm not really sure if there is any interest in anything like this, so what do you think? Personally, I have no problems with using SQL, but since I'm working with MySQL on a daily basis, I'm strongly biased on the subject. For simple jobs, this is probably easier to work with than a "real" SQL interface, but it cannot handle as complex queries as SQL can (at least not without extensions).

    There are a number of open issues for the implementation (this is just a small list of obvious ones):

    Only equality searching supported
    Searching can only be done with equality matches, but it is trivial to extend to support more complex comparisons. To allow more complex conditions, the condition supplied to find, delete, and update can actually be a string, in which case it is used "raw".

    Conditions could be extended to support something like {'more': '>3'}, or a more object-oriented approach would be to support something similar to {'more': operator.gt(3)}.
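A sketch of the more object-oriented variant could look like the following. The Comparison class and the gt, lt, and eq helpers are hypothetical names chosen for this example; plain values fall back to equality and a string condition is passed through raw, as described above.

```python
class Comparison(object):
    """A comparison operator together with the value to compare against."""
    def __init__(self, symbol, value):
        self.symbol = symbol
        self.value = value

def eq(value):
    return Comparison("=", value)

def gt(value):
    return Comparison(">", value)

def lt(value):
    return Comparison("<", value)

def condition_sql(condition):
    """Return the condition text and the parameters to send with it."""
    if isinstance(condition, str):
        return condition, []              # raw condition, used as-is
    parts, params = [], []
    for column in sorted(condition):
        value = condition[column]
        if not isinstance(value, Comparison):
            value = eq(value)             # plain values mean equality
        parts.append("`%s` %s %%s" % (column, value.symbol))
        params.append(value.value)
    return " AND ".join(parts), params

text, params = condition_sql({"more": gt(3), "magic": "quadrant"})
```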

    No support for indexes
    There's no support for indexes yet, but that can easily be added. The complication is what kind of indexes should be generated.

    For example, right now rows are identified by their content, but what if we want unique rows to be handled as a set? Imagine the following (not supported) query, where we insert some rows and then search among exactly those rows:

    server.test.t1.insert(content with some more=3).find({'more': eq(3)})
    In this case, we have to fetch the row identifiers for the inserted rows to be able to manipulate exactly those rows and none other. Not sure how to do this right now, but auto-inventing a row-identifier would mean that tables lacking it cannot be handled naturally.

    Creating and dropping tables
    The support for creation of tables is to create tables automatically if they do not exist. A simple heuristic is used to figure out the table definition, but this has obvious flaws if later inserts have more fields than the first one.

    To support extending the table, one would have to generate an ALTER TABLE statement to "fix" the table.
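To illustrate, a "best effort" type guess and the corresponding ALTER TABLE statement could be generated along these lines. The type mapping and the function names are assumptions made for this sketch, not what native_db does.

```python
def guess_type(value):
    """Guess a column type from a Python value (illustrative mapping only)."""
    if isinstance(value, bool):        # bool is a subclass of int, so test it first
        return "BOOL"
    if isinstance(value, int):
        return "INT"
    if isinstance(value, float):
        return "DOUBLE"
    return "TEXT"

def create_statement(table, row):
    columns = ", ".join("`%s` %s" % (col, guess_type(row[col]))
                        for col in sorted(row))
    return "CREATE TABLE %s (%s)" % (table, columns)

def alter_statement(table, existing_columns, row):
    """Return an ALTER TABLE adding the columns that row has but the table lacks."""
    missing = sorted(col for col in row if col not in existing_columns)
    if not missing:
        return None
    additions = ", ".join("ADD COLUMN `%s` %s" % (col, guess_type(row[col]))
                          for col in missing)
    return "ALTER TABLE %s %s" % (table, additions)

statement = alter_statement("test.t1", ["more", "magic"],
                            {"more": 3, "count": 0, "ratio": 0.5})
```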

    There is no support for dropping tables... or databases.

    Wednesday, July 27, 2011

    Binlog Group Commit Experiments

    It has been a while since I talked about binary log group commit. I had to spend time on a few other things.

    Since then, Kristian has released a version of binary log group commit that seems to work well. However, for a few reasons that will be outlined below, we decided to do experiments ourselves using the approach that I have described earlier. A very early version of what we will start doing benchmarks on is available at the MySQL labs. We have not done any benchmarking on this approach before OSCON, so we'll have to get back on that.

    All of this started with Facebook pointing out a problem in how the group commit interacts with the binary log and proposing a way to handle the binary log group commit by demonstrating a patch to solve the problem.

    What's in the patch

    The patch involves implementing logic for handling binary log group commit and parallel writing of the binary log, including a minor change to the handler protocol by adding a persist callback. (The extension of the handler interface is, strictly speaking, not necessary for the implementation, but it is natural to extend the interface in this manner and I believe that it can be used by storage engines to execute more efficiently.) In addition to the new logic, three new options were added and one option was created as an alias of an old option.
    binlog-sync-period=N
    This is just a rename of the old sync-period option, which specifies that fsync should be called for the binary log every N events. For many of the old options, it is not clear what they are configuring, so we are adding the binlog- prefix to options that affect the binary log. The old option is kept as an alias for this option.
    binlog-sync-interval=msec
    No transaction commit will wait for more than msec milliseconds before calling fsync on the binary log. If set to zero, it is disabled. You can set both this option and the binlog-sync-period option.
    binlog-trx-committed={COMPLETE,DURABLE}
    A transaction is considered committed when it is either in durable store or when it is completed. If set to DURABLE, either binlog-sync-interval or binlog-sync-period has to be non-zero. If they are both zero, transactions will not be flushed to disk and hence they will never be considered durable.
    master-trx-read={COMPLETE,DURABLE}
    A transaction is read from the binary log when it is completed or when it is durable. If set to DURABLE, either binlog-sync-interval or binlog-sync-period has to be non-zero or an error will be generated. If both could be zero, no transactions would ever be read from the binary log and hence none would ever be sent out.
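The constraint between these options can be written down as a small check. This is only an illustration of the rule stated above, not how the server validates its options; the function name and the wording of the messages are made up.

```python
def check_binlog_options(sync_period, sync_interval, trx_committed, master_trx_read):
    """Return a list of problems with a combination of the four options."""
    problems = []
    for name, value in (("binlog-trx-committed", trx_committed),
                        ("master-trx-read", master_trx_read)):
        if value not in ("COMPLETE", "DURABLE"):
            problems.append("%s must be COMPLETE or DURABLE" % name)
        elif value == "DURABLE" and sync_period == 0 and sync_interval == 0:
            # Without any sync, nothing ever becomes durable.
            problems.append("%s=DURABLE needs a non-zero binlog-sync-period "
                            "or binlog-sync-interval" % name)
    return problems

no_sync = check_binlog_options(0, 0, "DURABLE", "COMPLETE")
with_interval = check_binlog_options(0, 100, "DURABLE", "DURABLE")
```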
    The patch also contains code to eliminate the prepare_commit_mutex as well as moving the release of row locks inside InnoDB to the prepare phase (not completely applied yet, I will get it there as soon as possible). The focus of these changes is that we should maintain consistency, so we have not done any aggressive changes like moving the release of the write locks to the prepare phase: that could possibly lead to inconsistencies.

    Figure 1. Binary log with transaction in different stages
    The main changes are about how a transaction is committed. The details are explained in the previous articles, but for understanding the rest of this blog post, I'll briefly recapitulate how a transaction is committed in this solution. Each transaction passes through three states: prepared, completed (committed to memory), and durable (committed to disk), as seen in Figure 1. The transaction is pushed through these states using the following procedure:
    1. The transaction is first prepared, which is now split into two steps:
      1. In the reserve step, a slot is assigned for the transaction in the binary log and the storage engine is asked to check if this transaction can be committed. At this point, the storage engine can abort the transaction if it is unable to fulfill the commit, but if it approves of the commit, the only thing that can abort the transaction after this point is a server crash. This check is currently done using the prepare call. This step is executed with a lock, but is intended to be short.
      2. In the persist step, the persist function is called, which asks the storage engine to persist any data that it needs to persist to guarantee that the transaction is fully prepared. After this step is complete, the transaction is fully prepared in the storage engine and in the event of a crash, it will be able to commit the transaction on recovery, if asked to do so. This step is executed without a lock and a storage engine that intends to handle group commit should defer any expensive operations to this step.
    2. To record the decision, the transaction is written to the reserved slot in the binary log. Since the write is done to a dedicated place in the binary log reserved for this transaction, it is not necessary to hold any locks, which means that several threads can write transactions to the binary log at the same time.
    3. The commit phase is in turn split into two steps:
      1. In the completion step, the thread waits for all preceding transactions to be fully written to the binary log, after which the transaction is completed, which means that it is logically committed but not necessarily in durable storage.
      2. In the durability step, the thread waits for the transaction (and all preceding transactions) to be written to disk. If this does not occur within the given time period, it will itself call fsync for the binary log. This will make all completed transactions durable.
    After this procedure is complete, the transaction is fully committed and the thread can proceed with executing the next statement.
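The procedure can be imitated with a toy in-memory log. The sketch below is a simplification under stated assumptions and not the patch itself: the class and method names are invented, the storage engine calls are left out, and the durability step simply "syncs" at once instead of waiting for a time period. It does show the central idea: a short lock to reserve a slot, writes into the slot without holding that lock, and completion in log order.

```python
import threading

class BinaryLog(object):
    """Toy log: reserve a slot, write it in parallel, complete in log order."""

    def __init__(self, capacity=1024):
        self.data = bytearray(capacity)
        self._cond = threading.Condition()   # protects the bookkeeping below
        self._reserved = 0                   # next free offset
        self._written = {}                   # slot start -> slot end, for written slots
        self.completed = 0                   # all bytes before this are completed
        self.durable = 0                     # all bytes before this are "on disk"

    def reserve(self, size):
        # Short critical section: only hand out an offset.
        with self._cond:
            start = self._reserved
            self._reserved += size
            return start

    def write(self, start, payload):
        # The slot belongs to this transaction alone, so the data is
        # copied without holding the lock.
        self.data[start:start + len(payload)] = payload
        with self._cond:
            self._written[start] = start + len(payload)
            # Advance the completed mark over every contiguous written slot.
            while self.completed in self._written:
                self.completed = self._written.pop(self.completed)
            self._cond.notify_all()

    def wait_completed(self, end):
        with self._cond:
            while self.completed < end:
                self._cond.wait()

    def sync(self):
        # Stand-in for fsync: everything completed becomes durable.
        with self._cond:
            self.durable = self.completed

def commit(log, payload):
    start = log.reserve(len(payload))          # reserve step
    log.write(start, payload)                  # write to the reserved slot
    log.wait_completed(start + len(payload))   # completion step
    log.sync()                                 # durability step (simplified)

log = BinaryLog()
payloads = [("trx-%d;" % i).encode("ascii") for i in range(8)]
threads = [threading.Thread(target=commit, args=(log, p)) for p in payloads]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()
```

The dictionary of written slots plays the role of the bookkeeping queue: the log itself holds the data, and the dictionary is only used to decide how far the completed mark can advance.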

    The different approaches

    So, providing this patch raises the question: why a third version of binary log group commit? There are three approaches: Facebook's patch (#1), Kristian's patch (#2), and my patch (#3). Before going over the rationale leading to a third version, it is necessary to understand how the Facebook patch and Kristian's patch work on a very high level. If you look at Figure 1, you see a principal diagram showing how the patches work. Both of them maintain a queue of threads with transactions to be written and will ensure that they are written in the correct order to the binary log.

    The Facebook patch ensures that the transactions are written in the correct order by signalling each thread waiting in the queue in the correct order, after which the thread will take a lock on the binary log, append the transaction, and release the lock. When the decision to commit the outstanding transactions is made, fsync() is called. It has turned out that this lock-write-unlock loop can only be executed at a certain speed, which means that as the number of threads waiting to write transactions increases, the system chokes and is not able to keep up.

    Kristian solves this by designating the first thread in the queue as the leader and having it write the transactions for all threads in the queue, instead of having each thread do it individually. The leader then broadcasts to the other threads, which just return from the commit. This improves performance significantly, as can be seen from the figures in the measurements that Mark did. Note, however, that a lock on the binary log is still kept while writing the transactions.

    The approach we are experimenting with goes about this in another way: instead of queueing the data to be written, a place is immediately allocated in the binary log, after which the thread proceeds to write the data. This means that several threads can write to the binary log in parallel without needing to keep any locks. There is a need for a lock when allocating space in the binary log, but it is held very briefly. Since the threads can finish writing in a different order, it is necessary to keep logic around for deciding when a transaction is committed and when it is not. For details, you can look at the worklog (which is not entirely up to date, but I'll fix that). In this sense, the binary log itself is the queue (there is a queue in the implementation, but this is just for bookkeeping). The important differences leading us to want to have a look at this third version are:

    • Approaches #1 and #2 keep a lock while writing the binary log while #3 doesn't.
    • Approaches #1 and #2 keep the transactions on the side (in the queue) and write them to the binary log when they are being committed. Approach #3 writes the transactions directly to the binary log, possibly before they are committed.
    Figure 1. Sources of performance problems

    Efficiently using Multiple Cores

    Efficiently using a multi-threaded system, especially one with multiple cores, is very hard. It requires knowledge of hardware issues, operating system considerations, algorithms, and some luck. I will not cover all the issues revolving around designing a system for multi-core use, but I will focus on three of the parts that we are considering in this case. We split the sources of performance degradation when committing a transaction into three separate parts: CPU and memory issues, software lock contention, and I/O.
    • The CPU and memory issues have to do with how caches are handled on the CPU level, which can affect performance quite a lot. There are some things that can be done, such as avoiding false sharing, handling data alignment, and checking the cache access patterns, but in general, it is hard to add as an afterthought and requires quite a lot of work to get right. We are not considering this and view it as static.
    • The I/O can be reduced by using either SSDs or RAID solutions (which do not reduce latency, but improve the throughput and therefore reduce the I/O needed for each transaction). Also, reducing the number of accesses to disk using group commits will improve the situation significantly, which is what we're doing here.
    • To reduce the software lock contention there is only one solution: reduce the time each lock is kept. This can be as simple as moving the lock acquire and release, or using atomic primitives instead of locks, but can also require re-designing algorithms to be able to run without locks.
    So, assuming that we reduce the I/O portion of committing a transaction (and only the I/O portion), as you can see in Figure 1, the software lock time starts to become the problem and we need to start working on reducing that. To do this, there are not many options except the approach described above. And if we take this approach to reduce lock contention, there are just a few additions needed to get the group commit as well.

    Given this, it is rational to explore whether this solution can solve the group commit problem as well as the other solutions and improve the scalability of the server at the same time.

    Scaling out

    One of the most central uses for replication is to achieve high availability by duplicating masters and replicating between them to keep both up to date. For this reason, it is important to get the changes over to the other master as fast as possible. In this case, whether the data is durable on the original master or not is of smaller concern, since once the transaction has left the node, a crash will not cause the transaction to disappear: it has already been distributed. This means that for implementing multi-masters, we want replication to send transactions as soon as possible (and maybe even before that), since we can achieve high availability by propagating the information as widely as possible.

    On the other hand, transactions sent from the master to the slave might need to be durable on the master, since otherwise the slave might be moving into an alternative future (a future where this transaction was committed) if the transactions sent to the slave are lost because of a crash. In this case, it is necessary for the master to not send out the transaction before it is in durable store. Having a master that is able to send out both completed transactions and durable transactions at the same time, all based on the requirements of the slave that connects, is a great feature and allows the implementation of both an efficient multi-master solution and slaves that do not diverge from the master even in the event of crashes. Currently, a master cannot deliver both transactions that are completed and transactions that are durable at the same time. With the patch presented in this article, it is possible to implement this, but in alternatives #1 and #2 described above, all the transactions are kept "on the side" and not written to the binary log until they are being committed. This means that it is harder to support this scenario with the two other alternatives.
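As a small illustration of this idea, a master that tracks both a completed and a durable position in its binary log can decide per slave how far that slave may read. The class and attribute names below are invented for this sketch and do not come from the patch.

```python
class Master(object):
    """Tracks how far the binary log is completed and how far it is durable."""

    def __init__(self):
        self.completed_pos = 0
        self.durable_pos = 0

    def readable_upto(self, mode):
        """Position up to which a slave asking for `mode` may read."""
        if mode == "COMPLETE":
            return self.completed_pos
        if mode == "DURABLE":
            return self.durable_pos
        raise ValueError("mode must be COMPLETE or DURABLE")

master = Master()
master.completed_pos = 480     # written and logically committed
master.durable_pos = 320       # covered by the latest sync

other_master_limit = master.readable_upto("COMPLETE")
slave_limit = master.readable_upto("DURABLE")
```

A second master interested in getting changes quickly would ask for COMPLETE, while a slave that must never diverge from the master would ask for DURABLE.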

    Concluding remarks

    To sum up the discussion above: we are interested in exploring this approach since we think that it provides shorter lock time, hence scales better to multi-core machines, and in addition provides better scale-out capabilities, since it will be possible for the slaves to decide if they want to receive durable or completed transactions. Thanks to all in the community for the great work and discussions on binlog group commit. The next steps will be to benchmark this solution to see how it flies, and it would be great to also get some feedback on this approach. As always, we are interested in getting a good and efficient solution that can also be maintained and evolved easily.