JAVA通过Gearman实现MySQL到Redis的数据同步(异步复制)

MySQL 到 Redis 数据复制方案

无论 MySQL 还是 Redis,自身都带有数据同步的机制,像比较常用的 MySQL 的 Master/Slave 模式 ,就是由 Slave 端分析 Master 的 binlog 来实现的,这样的数据复制其实还是一个异步过程,只不过当服务器都在同一内网时,异步的延迟几乎可以忽略。

那么理论上我们也可以用同样方式,分析 MySQL 的 binlog 文件并将数据插入 Redis。但是这需要对 binlog 文件以及 MySQL 有非常深入的理解,同时由于 binlog 存在 Statement/Row/Mixedlevel 多种形式 ,分析 binlog 实现同步的工作量是非常大的。

因此这里选择了一种开发成本更加低廉的方式,借用已经比较成熟的 MySQL UDF,将 MySQL 数据首先放入 Gearman 中,然后通过一个自己编写的 PHP Gearman Worker,将数据同步到 Redis。比分析 binlog 的方式增加了不少流程,但是实现成本更低,更容易操作。

Gearman 的安装与使用

Gearman 是一个支持分布式的任务分发框架。设计简洁,获得了非常广泛的支持。一个典型的 Gearman 应用包括以下这些部分:

 

  • Gearman Job Server:Gearman 核心程序,需要编译安装并以守护进程形式运行在后台

  • Gearman Client:可以理解为任务的收件员,比如我要在后台执行一个发送邮件的任务,可以在程序中调用一个 Gearman Client 并传入邮件的信息,然后就可以将执行结果立即展示给用户,而任务本身会慢慢在后台运行。

  • Gearman Worker:任务的真正执行者,一般需要自己编写具体逻辑并通过守护进程方式运行,Gearman Worker 接收到 Gearman Client 传递的任务内容后,会按顺序处理。

以前曾经介绍过类似的 后台任务处理项目 Resque 。两者的设计其实非常接近,简单可以类比为:

  • Gearman Job Server:对应 Resque 的 Redis 部分

  • Gearman Client:对应 Resque 的 Queue 操作

  • Gearman Worker:对应 Resque 的 Worker 和 Job

这里之所以选择 Gearman 而不是 Resque 是因为 Gearman 提供了比较好用的 MySQL UDF,工作量更小。

 

1、安装依赖

 yum install -y boost-devel gperf libevent-devel libuuid-devel

 yum install mysql-devel -y

2、下载 gearman

 wget https://launchpad.net/gearmand/1.2/1.1.12/+download/gearmand-1.1.12.tar.gz

3、编译安装,指定 mysqlclient 的链接路径

  tar -zxvf gearmand-1.1.12.tar.gz 

  cd gearmand-1.1.12

   ./configure  

make && make install

 

4、启动 gearmand 服务端 (启动之时,在 /var/log/ 下创建 gearmand.log 日志文件。-l 指定日志文件  -d 后台运行 -L 0.0.0.0 绑定到 IPV4

gearmand -L 0.0.0.0 -l /var/log/gearmand.log -d

5、查看是否启动成功

ps -ef | grep gearman

6、查看是否安装成功,查看 gearman 版本信息

gearmand -V

 

7、MySQL UDF + Trigger 同步数据到 Gearman (https://github.com/mysqludf)

安装 lib_mysqludf_json(lib_mysqludf_json 可以把 MySQL 表的数据以 json 数据格式输出)

wget https://github.com/mysqludf/lib_mysqludf_json/archive/master.zip

unzip master.zip

cd lib_mysqludf_json-master/

rm -rf lib_mysqludf_json.so

8、编译 mysql_config 这是 mysql 的配置文件,可以 find /usr -name mysql_config 搜索下在什么位置

gcc $(/usr/local/mysql/bin/mysql_config  --cflags) -shared -fPIC -o lib_mysqludf_json.so lib_mysqludf_json.c

9、拷贝 lib_mysqludf_json.so 到 MySQL 的 plugin 目录

(可以登陆 MySQL,输入命令 "show variables like'%plugin%'" 查看 plugin 位置)

cp lib_mysqludf_json.so /usr/local/mysql/lib/plugin/

 

演示 lib_mysqludf_json 功能

登录 mysql

mysql -uroot -h127.0.0.1 -p

注册 UDF 函数

CREATE FUNCTION json_object RETURNS STRING SONAME "lib_mysqludf_json.so";

CREATE FUNCTION json_array RETURNS STRING SONAME "lib_mysqludf_json.so";

CREATE FUNCTION json_members RETURNS STRING SONAME "lib_mysqludf_json.so";

CREATE FUNCTION json_values RETURNS STRING SONAME "lib_mysqludf_json.so";

//json_array|json_members|json_values 函数注册方式与 json_object 一样.

select json_object(id,file_save_type,base_dir) as sys_file_save_config from sys_file_save_config;

ERROR 1123 (HY000): Can't initialize function'json_object'; Invalid json member name - name cannot be empty

以上错误这样解决,给每个成员名称使用别名即可:

select json_object(id as id ,file_save_type as fileSaveType,app_id as appID) as sys_file_save_config from sys_file_save_config;

 

 

10、安装 gearman-mysql-udf (https://launchpad.net/gearman-mysql-udf)

  wget https://launchpad.net/gearman-mysql-udf/trunk/0.6/+download/gearman-mysql-udf-0.6.tar.gz

  tar zxvf gearman-mysql-udf-0.6.tar.gz 

   cd gearman-mysql-udf-0.6

11、安装 libgearman-devel

   yum install libgearman-devel -y

   如果没有 yum 源,添加 epel.repo yum 源

  [epel]

name=Extra Packages for Enterprise Linux 6 - $basearch

#baseurl=http://download.fedoraproject.org/pub/epel/6/$basearch

mirrorlist=https://mirrors.fedoraproject.org/metalink?repo=epel-6&arch=$basearch

failovermethod=priority

enabled=1

gpgcheck=1

gpgkey=file:///etc/pki/rpm-gpg/RPM-GPG-KEY-EPEL-6

 

[epel-debuginfo]

name=Extra Packages for Enterprise Linux 6 - $basearch - Debug

#baseurl=http://download.fedoraproject.org/pub/epel/6/$basearch/debug

mirrorlist=https://mirrors.fedoraproject.org/metalink?repo=epel-debug-6&arch=$basearch

failovermethod=priority

enabled=0

gpgkey=file:///etc/pki/rpm-gpg/RPM-GPG-KEY-EPEL-6

gpgcheck=1

 

[epel-source]

name=Extra Packages for Enterprise Linux 6 - $basearch - Source

#baseurl=http://download.fedoraproject.org/pub/epel/6/SRPMS

mirrorlist=https://mirrors.fedoraproject.org/metalink?repo=epel-source-6&arch=$basearch

failovermethod=priority

enabled=0

gpgkey=file:///etc/pki/rpm-gpg/RPM-GPG-KEY-EPEL-6

gpgcheck=1

    

12、编译安装

(可以登陆 MySQL,输入命令 "show variables like'%plugin%'" 查看 plugin 位置, mysql_config 的配置文件,以及插件库所在路径,编译之后会在此路径生成.so 文件)

./configure --with-mysql=/usr/local/mysql/bin/mysql_config --libdir=/usr/local/mysql/lib/plugin/

make && make install

 

演示 gearman-mysql-udf 功能

mysql -uroot -p

CREATE FUNCTION gman_do_background RETURNS STRING SONAME "libgearman_mysql_udf.so";

CREATE FUNCTION gman_servers_set RETURNS STRING SONAME "libgearman_mysql_udf.so"; 

CREATE FUNCTION gman_do RETURNS STRING SONAME "libgearman_mysql_udf.so"; 

CREATE FUNCTION gman_do_high RETURNS STRING SONAME "libgearman_mysql_udf.so"; 

CREATE FUNCTION gman_do_low RETURNS STRING SONAME "libgearman_mysql_udf.so"; 

CREATE FUNCTION gman_do_high_background RETURNS STRING SONAME "libgearman_mysql_udf.so"; 

CREATE FUNCTION gman_do_low_background RETURNS STRING SONAME "libgearman_mysql_udf.so"; 

CREATE FUNCTION gman_sum RETURNS STRING SONAME "libgearman_mysql_udf.so"; 

// 函数 gman_do|gman_do_high|gman_do_low|gman_do_high_background|gman_do_low_background|gman_sum 注册方式类似,请参考 gearman-mysql-udf-0.6/README 

// 指定 gearman job server 地址 

SELECT gman_servers_set('127.0.0.1:4730'); 

 

如果出现异常信息:

ERROR 1126 (HY000): Can't open shared library'libgearman_mysql_udf.so' (errno: 11 libgearman.so.8: cannot open shared object file: No such file or directory)

表示系统找不到 libgearman.so 文件,一般 so 都在 /usr/local/lib 目录下,修改配置文件 /etc/ld.so.conf,将 /usr/local/lib 目录加入进去即可:

$ cat /etc/ld.so.conf

include ld.so.conf.d/*.conf

/usr/local/lib

$ /sbin/ldconfig -v | grep gearman*

 

13、MySQL Trigger 调用 Gearman UDF 实现同步

创建触发器

DELIMITER $$

CREATE TRIGGER test_data_to_redis AFTER UPDATE ON test FOR EACH ROW BEGIN

    SET@ret=gman_do_background('syncToRedis', json_object(NEW.id AS `id`, NEW.phone AS`phone`));

END$$;

 

DELIMITER $$

CREATE TRIGGER test_data_to_redis2 AFTER INSERT ON test

  FOR EACH ROW BEGIN

    SET @ret=gman_do_background('syncToRedis2', json_object(NEW.id AS `id`, NEW.phone AS`phone`)); 

  END$$

DELIMITER ;

 

DELIMITER $$

CREATE TRIGGER test_data_to_redis3 BEFORE DELETE ON test

  FOR EACH ROW BEGIN

    SET @ret=gman_do_background('syncToRedis3', json_object(OLD.id AS `id`, OLD.phone AS`phone`)); 

  END$$

DELIMITER ;

 

  说明以及问题:此类采用了 gearman 官网的 java-gearman-service(地址:https://launchpad.net/gearman-java),目前 release 版本是 0.6.6。java-gearman-servic.jar 包中,即包括 gearman server,还包括 client 和 work 客户端 API。

  问题:config 类为 spring 注入的配置文件类,在 worker.addFunction 中,如果通过 config 类的属性,并且属性是从配置文件来的就会有问题。不知道为啥,写死就是 OK 的。此类连接远程的 gearman job server。

  

  jar 包需要添加到本地 jar 仓库:

mvn install:install-file -Dfile=C:\software\java-gearman-service-0.6.6.jar -DgroupId=org.gearman.jgs -DartifactId=java-gearman-service -Dversion=0.6.6 -Dpackaging=jar

 

import java.util.concurrent.TimeUnit;

 

import org.gearman.Gearman;

import org.gearman.GearmanFunction;

import org.gearman.GearmanFunctionCallback;

import org.gearman.GearmanServer;

import org.gearman.GearmanWorker;

 

/**

 * *ECHO_HOST = "192.168.125.131" 为安装了 Gearman 并开启 geramand 服务的主机地址

 *int ECHO_PORT = 4730 默认端口为 4730

 *

 * @author Administrator

 *

 */

public class EchoWorker implements GearmanFunction {

 

// function name

public static final String ECHO_FUNCTION_NAME = "syncToRedis";

 

// job server 地址

public static final String ECHO_HOST = "192.168.1.245";

 

// job server 监听的端口

public static final int ECHO_PORT = 4730;

 

public static void main(String[] args) {

// 创建一个 Gearman 实例

Gearman gearman = Gearman.createGearman();

/*

 * 创建一个 jobserver

 * 

 * Parameter 1: job server 的 IP 地址 Parameter 2: job server 监听的端口

 * 

 * job server 收到 client 的 job,并将其分发给注册 worker

 * 

 */

GearmanServer server = gearman.createGearmanServer(EchoWorker.ECHO_HOST, EchoWorker.ECHO_PORT);

// 创建一个 Gearman 的 worker

GearmanWorker worker = gearman.createGearmanWorker(); // 正题来了,创建 work 节点。

worker.setReconnectPeriod(2, TimeUnit.SECONDS); // 设置超时重连时间

worker.setMaximumConcurrency(5); // 最大并发数

// 告诉工人如何执行工作 (主要实现了 GearmanFunction 接口)

worker.addFunction(EchoWorker.ECHO_FUNCTION_NAME, new EchoWorker());

// worker 连接服务器

worker.addServer(server);

}

 

@Override

public byte[] work(String function, byte[] data, GearmanFunctionCallback callback) throws Exception {

// work 方法实现了 GearmanFunction 接口中的 work 方法, 本实例中进行了字符串的反写

if (data != null) {

String str = new String(data);

System.out.println(str);

StringBuffer sb = new StringBuffer(str);

return sb.reverse().toString().getBytes();

} else {

return "未接收到 data".getBytes();

}

}

}

 

 

import org.gearman.Gearman;  

import org.gearman.GearmanClient;  

import org.gearman.GearmanJobEvent;  

import org.gearman.GearmanJobReturn;  

import org.gearman.GearmanServer;  

  

public class EchoClient {  

    public static void main(String... args) throws InterruptedException {  

            // 创建一个 Gearman 实例  

            Gearman gearman = Gearman.createGearman();  

            // 创建一个 Gearman client               

            GearmanClient client = gearman.createGearmanClient();  

            /*  

             * 创建一个 jobserver  

             *   

             * Parameter 1: job server 的 IP 地址  

             * Parameter 2: job server 监听的端口  

             *   

             *job server 收到 client 的 job,并将其分发给注册 worker  

             *  

             */  

            GearmanServer server = gearman.createGearmanServer(  

                            EchoWorker.ECHO_HOST, EchoWorker.ECHO_PORT);  

             // 告诉客户端,提交工作时它可以连接到该服务器  

            client.addServer(server);  

            /*  

             * 向 job server 提交工作  

             *   

             * Parameter 1: gearman function 名字  

             * Parameter 2: 传送给 job server 和 worker 的数据  

             *   

             * GearmanJobReturn 返回 job 发热结果  

             */  

            GearmanJobReturn jobReturn = client.submitJob(  

                            EchoWorker.ECHO_FUNCTION_NAME, ("Hello World!").getBytes());  

            // 遍历作业事件,直到我们打到最后文件               

            while (!jobReturn.isEOF()) {  

  

                    // 下一个作业事件  

                    GearmanJobEvent event = jobReturn.poll();  

  

                    switch (event.getEventType()) {  

  

                    case GEARMAN_JOB_SUCCESS:     //job 执行成功  

                            System.out.println(new String(event.getData()));  

                            break;  

                    case GEARMAN_SUBMIT_FAIL:     //job 提交失败  

                          

                    case GEARMAN_JOB_FAIL:        //job 执行失败  

                            System.err.println(event.getEventType() + ":"  

                                            + new String(event.getData()));  

                    default:  

                    }  

            }  

            // 关闭  

            gearman.shutdown();  

    }  

}  

 

http://gearman.org/download/

php 方案:https://www.tuicool.com/articles/B7Jjaa

 

更多好文章分享