实习中一个项目需要从单租户模式改造成多租户,由于项目使用的是MyBatis,目前没有可以直接使用MyBatis来实现多租户的方式,因此决定搜集资料和方案,自己来实现一个DEMO。
我的多租户方案采用的是MyBatis+MyCat。DEMO是基于Spring MVC的web项目。在用户操作过程中获取用户的id信息,利用MyCat强大的注解功能,根据用户id将SQL语句路由到对应该用户的schema或者database去执行。对SQL加注解的实现则交由MyBatis的插件功能完成,通过自定义MyBatis的Interceptor类,拦截要执行的sql语句加上对应注解。这样就实现了数据库的多租户改造。下面分几个部分来说明。
- MyCat 与MySQL设置
MyCat是一个开源的分布式数据库系统,是一个实现了MySQL协议的服务器,前端用户可以把它看作是一个数据库代理,用MySQL客户端工具和命令行访问,而其后端可以用MySQL原生协议与多个MySQL服务器通信,也可以用JDBC协议与大多数主流数据库服务器通信,其核心功能是分表分库,即将一个大表水平分割为N个小表,存储在后端MySQL服务器里或者其他数据库里。MyCat相当于一个逻辑上的大数据库,又N多个物理数据库组成,可以通过各种分库分表规则(rule)将数据存到规则对应的数据库或schema或表中。
MyCat对自身不支持的Sql语句提供了一种解决方案——在要执行的SQL语句前添加额外的一段由注解SQL组织的代码,这样Sql就能正确执行,这段代码称之为“注解”。注解的使用相当于对mycat不支持的sql语句做了一层透明代理转发,直接交给目标的数据节点进行sql语句执行,其中注解SQL用于确定最终执行SQL的数据节点。
注解使用方式如下:
/*!mycat: schema=node1*/真正执行Sql
这样写就可以将该Sql语句放到对应的node1的schema上执行,例如如果要在特定的节点上执行查询,则可以这样写
/*mycat: datanode=dn1*/select count(*) from item;
由于这个项目是根据MyCat的SQL注解来选择在哪个schema或者database上执行的,所以不需要设置rule.xml。
我的数据库设置如下:
create database db02;
CREATE TABLE item (
id INT NOT NULL AUTO_INCREMENT,
value INT NOT NULL default 0,
indate DATETIME NOT NULL default '2000-01-01 00:00:00',
PRIMARY KEY (id)
)AUTO_INCREMENT= 1 ENGINE=InnoDB DEFAULT CHARSET=utf8;
create database db03;
CREATE TABLE item (
id INT NOT NULL AUTO_INCREMENT,
value INT NOT NULL default 0,
indate DATETIME NOT NULL default '2000-01-01 00:00:00',
PRIMARY KEY (id)
)AUTO_INCREMENT= 1 ENGINE=InnoDB DEFAULT CHARSET=utf8;
设置两个数据库分表为db02,db03,两个库中都有item。
MyCat的配置如下:
server.xml
<?xml version="1.0" encoding="UTF-8"?>
<!-- - - Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License. - You
may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0
- - Unless required by applicable law or agreed to in writing, software -
distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT
WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the
License for the specific language governing permissions and - limitations
under the License. -->
<!DOCTYPE mycat:server SYSTEM "server.dtd">
<mycat:server xmlns:mycat="http://org.opencloudb/">
<system>
<property name="defaultSqlParser">druidparser</property>
<!-- <property name="useCompression">1</property>--> <!--1为开启mysql压缩协议-->
<!-- <property name="processorBufferChunk">40960</property> -->
<!--
<property name="processors">1</property>
<property name="processorExecutor">32</property>
-->
<!--默认是65535 64K 用于sql解析时最大文本长度 -->
<!--<property name="maxStringLiteralLength">65535</property>-->
<!--<property name="sequnceHandlerType">0</property>-->
<!--<property name="backSocketNoDelay">1</property>-->
<!--<property name="frontSocketNoDelay">1</property>-->
<!--<property name="processorExecutor">16</property>-->
<!--
<property name="mutiNodeLimitType">1</property> 0:开启小数量级(默认) ;1:开启亿级数据排序
<property name="mutiNodePatchSize">100</property> 亿级数量排序批量
<property name="processors">32</property> <property name="processorExecutor">32</property>
<property name="serverPort">8066</property> <property name="managerPort">9066</property>
<property name="idleTimeout">300000</property> <property name="bindIp">0.0.0.0</property>
<property name="frontWriteQueueSize">4096</property> <property name="processors">32</property> -->
<property name="defaultSqlParser">druidparser</property>
<property name="mutiNodeLimitType">1</property>
<property name="serverPort">8066</property>
<property name="managerPort">9066</property>
</system>
<!-- 任意设置登陆 mycat 的用户名,密码,数据库 -->
<user name="test">
<property name="password">test</property>
<property name="schemas">TESTDB</property>
</user>
<user name="root">
<property name="password">root</property>
<property name="schemas">TESTDB</property>
</user>
<user name="user">
<property name="password">user</property>
<property name="schemas">TESTDB</property>
<property name="readOnly">true</property>
</user>
</mycat:server>
server.xml主要是设置登录用户名密码,登录端口之类的信息。
重头戏是schema.xml的设置
<?xml version="1.0"?>
<!DOCTYPE mycat:schema SYSTEM "schema.dtd">
<mycat:schema xmlns:mycat="http://io.mycat/" >
<schema name="TESTDB" checkSQLschema="false" sqlMaxLimit="100">
<table name="item" primaryKey="id" dataNode="dn2,dn3" />
</schema>
<dataNode name="dn2" dataHost="localhost1" database="db02" />
<dataNode name="dn3" dataHost="localhost1" database="db03" />
<dataHost name="localhost1" maxCon="1000" minCon="10" balance="0"
writeType="0" dbType="mysql" dbDriver="native" switchType="1" slaveThreshold="100">
<heartbeat>select user()</heartbeat>
<!-- can have multi write hosts -->
<writeHost host="server" url="127.0.0.1:3306" user="root"
password="root">
<!-- can have multi read hosts -->
<!--<readHost host="hostS2" url="192.168.1.200:3306" user="root" password="xxx" />-->
</writeHost>
</dataHost>
</mycat:schema>
这里配置好两个数据库节点dn2,dn3对应的就是这前面建立的数据库db02,db03.
这样数据库和Mycat就设置好了,我们可以测试一下,向两个库中插入一些数据:
这是db02中的数据,共8条。
db03中的数据,共6条。
这是mycat的逻辑库TESTDB中的数据,可以看到,包含了所有的db02和db03的数据。
再来试试MyCat的注解:
在mycat的逻辑库TESTDB中分别执行以下语句:
mysql> select count(*) from item;
/*mycat: datanode=dn2*/select count(*) from item;
/*mycat: datanode=dn3*/select count(*) from item;
可以看到,注解实实在在地把SQL语句路由到对应的数据库中去执行了,而不加注解的SQL则在整个逻辑库上执行。
2.MyBatis设置插件拦截器。
MyBatis要使用MyCat很方便,SpringBoot下,只需要将对应的url改成MyCat的端口就行了。
MyBatis 允许你在已映射语句执行过程中的某一点进行拦截调用。默认情况下,MyBatis 允许使用插件来拦截的方法调用包括:
- Executor (update, query, flushStatements, commit, rollback,
getTransaction, close, isClosed) - ParameterHandler (getParameterObject, setParameters)
- ResultSetHandler (handleResultSets, handleOutputParameters)
- StatementHandler (prepare, parameterize, batch, update, query)
这些类中方法的细节可以通过查看每个方法的签名来发现,或者直接查看 MyBatis 的发行包中的源代码。 假设你想做的不仅仅是监控方法的调用,那么你应该很好的了解正在重写的方法的行为。 因为如果在试图修改或重写已有方法的行为的时候,你很可能在破坏 MyBatis 的核心模块。 这些都是更低层的类和方法,所以使用插件的时候要特别当心。
通过 MyBatis 提供的强大机制,使用插件是非常简单的,只需实现 Interceptor 接口,并指定了想要拦截的方法签名即可。
在这里为了实现SQL的改造增加注解,Executor通过调度StatementHandler来完成查询的过程,通过调度它的prepare方法预编译SQL,因此我们可以拦截StatementHandler的prepare方法,在此之前完成SQL的重新编写。
package org.apache.ibatis.executor.statement;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.List;
import org.apache.ibatis.cursor.Cursor;
import org.apache.ibatis.executor.parameter.ParameterHandler;
import org.apache.ibatis.mapping.BoundSql;
import org.apache.ibatis.session.ResultHandler;
public interface StatementHandler {
Statement prepare(Connection var1, Integer var2) throws SQLException;
void parameterize(Statement var1) throws SQLException;
void batch(Statement var1) throws SQLException;
int update(Statement var1) throws SQLException;
<E> List<E> query(Statement var1, ResultHandler var2) throws SQLException;
<E> Cursor<E> queryCursor(Statement var1) throws SQLException;
BoundSql getBoundSql();
ParameterHandler getParameterHandler();
}
以上的任何方法都可以拦截。从接口定义而言,Prepare方法有一个参数Connection对象,因此按如下代码设计拦截器:
package com.mycat.interceptor;
import com.mycat.com.mycat.utils.SessionUtil;
import com.mycat.controller.TestController;
import org.apache.ibatis.cache.CacheKey;
import org.apache.ibatis.executor.Executor;
import org.apache.ibatis.executor.statement.StatementHandler;
import org.apache.ibatis.mapping.BoundSql;
import org.apache.ibatis.mapping.MappedStatement;
import org.apache.ibatis.plugin.*;
import org.apache.ibatis.reflection.MetaObject;
import org.apache.ibatis.reflection.SystemMetaObject;
import org.apache.ibatis.session.ResultHandler;
import org.apache.ibatis.session.RowBounds;
import javax.websocket.Session;
import java.sql.Connection;
import java.util.Properties;
@Intercepts(value = {
@Signature(type = StatementHandler.class,
method = "prepare",
args = {Connection.class,Integer.class})})
public class MyInterceptor implements Interceptor {
private static final String preState="/*!mycat:datanode=";
private static final String afterState="*/";
public Object intercept(Invocation invocation) throws Throwable {
StatementHandler statementHandler=(StatementHandler)invocation.getTarget();
MetaObject metaStatementHandler=SystemMetaObject.forObject(statementHandler);
Object object=null;
//分离代理对象链
while(metaStatementHandler.hasGetter("h")){
object=metaStatementHandler.getValue("h");
metaStatementHandler=SystemMetaObject.forObject(object);
}
statementHandler=(StatementHandler)object;
String sql=(String)metaStatementHandler.getValue("delegate.boundSql.sql");
/*
String node=(String)TestController.threadLocal.get(); #通过ThreadLocal获取用户信息
*/
String node=(String)SessionUtil.getSession().getAttribute("corp"); #通过session获取用户corp信息
if(node!=null) {
sql = preState + node + afterState + sql;
}
System.out.println("sql is "+sql);
metaStatementHandler.setValue("delegate.boundSql.sql",sql);
Object result = invocation.proceed();
System.out.println("Invocation.proceed()");
return result;
}
public Object plugin(Object target) {
return Plugin.wrap(target, this);
}
public void setProperties(Properties properties) {
this.properties=properties;
}
}
简单说明一下:
intercept 真个是插件真正运行的方法,它将直接覆盖掉你真实拦截对象的方法。里面有一个Invocation对象,利用它可以调用你原本要拦截的对象的方法
plugin 它是一个生成动态代理对象的方法,
setProperties 它是允许你在使用插件的时候设置参数值。
@Intercepts(value = {
@Signature(type = StatementHandler.class,
method = "prepare",
args = {Connection.class,Integer.class})})
这段说明了要拦截的目标类和方法以及参数。
StatementHandler是一个借口,真实的对象是RoutingStatementHandler,但它不是真实的服务对象,里面的delegate是StatementHandler中真实的StatementHandler实现的类,有多种,它里面的boundSql中存储着SQL语句。具体的可以参考MyBatis的源码。
MetaObject是一个工具类,由于MyBatis四大对象提供的public设置参数的方法很少,难以通过自身得到相关属性信息,但是有个MetaObject这个工具类就可以通过其他的技术手段来读取和修改这些重要对象的属性。
SessionUtil的getSession方法是用来获取之前用户登录时获得的记录在session中的corp信息,根据这个信息拼装SQL注解达到多租户的目的。
Interceptor写好后,写入到mybatis.xml的plugin中
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE configuration
PUBLIC "-//mybatis.org//DTD Config 3.0//EN"
"http://mybatis.org/dtd/mybatis-3-config.dtd">
<configuration>
<settings>
<!-- 打印查询语句 -->
<setting name="logImpl" value="STDOUT_LOGGING" />
</settings>
<typeAliases>
<typeAlias alias="Item" type="com.mycat.pojo.Item"/>
</typeAliases>
<plugins>
<plugin interceptor="com.mycat.interceptor.MyInterceptor">
</plugin>
</plugins>
</configuration>
3.实际运行
写一个实体类Item.
package com.mycat.pojo;
import lombok.Data;
import java.io.Serializable;
import java.util.Date;
@Data
public class Item implements Serializable {
int id;
int value;
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public int getValue() {
return value;
}
public void setValue(int value) {
this.value = value;
}
public Date getIndate() {
return indate;
}
public void setIndate(Date indate) {
this.indate = indate;
}
Date indate;
}
写一个mapper
package com.mycat.mapper;
import com.mycat.pojo.Item;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
@Mapper
public interface ItemMapper {
public Item getOne(@Param("id") int id);
public Integer count();
public void insert(Item item);
}
以及对应的itemMapper.xml,配置好sql语句。
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
"http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.mycat.mapper.ItemMapper">
<cache />
<select id="getOne" resultType="com.mycat.pojo.Item">
select * from item where id=#{id};
</select>
<!-- 为了保证mycat分库的操作会进行,同一查询会重新执行而不是在sqlSession中查找,需要加上flushCache="true"-->
<select id="count" flushCache="true" resultType="java.lang.Integer">
select count(*) from item;
</select>
<insert id="insert" parameterType="Item" keyProperty="id" useGeneratedKeys="true">
insert into item (id,value)
values (#{id},#{value});
</insert>
</mapper>
这里还是来测试count方法。
写一个controller
package com.mycat.controller;
import com.mycat.com.mycat.utils.SessionUtil;
import com.mycat.mapper.ItemMapper;
import com.mycat.pojo.Item;
import org.apache.ibatis.plugin.Plugin;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import javax.servlet.http.HttpSession;
@Controller
public class TestController {
@Autowired
ItemMapper itemMapper;
@GetMapping("/{id}")
@ResponseBody
public Object test(@PathVariable("id") int id){
System.out.println(id);
return itemMapper.getOne(id);
}
//简化,直接通过这里设置session
@GetMapping("/set/{sess}")
@ResponseBody
public Object setSession(@PathVariable("sess") String sess){
HttpSession httpSession=SessionUtil.getSession();
httpSession.setAttribute("corp",sess);
return "ok";
}
@GetMapping("/count")
@ResponseBody
public Object getCount(){
//要测试的方法
return itemMapper.count();
}
@PostMapping("/add")
@ResponseBody
public Object add(Item item){
itemMapper.insert(item);
return 1;
}
}
首先通过localhost:8080/set/{sess}设置session,假设session设置为dn3.
浏览器中输入localhost:8080/set/dn3.
之后,输入localhost:8080/count.
结果如下:
来看下打印的sql语句:
可以看到,SQL注解已经成功添加进去了。
在设置session为dn2.
结果如下。
打印的sql语句:
至此,一个MyBatis多租户的DEMO就完成了。完整代码可以查看这里