這篇文章主要介紹“怎么使用sharding-jdbc讀寫分離”,在日常操作中,相信很多人在怎么使用sharding-jdbc讀寫分離問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答“怎么使用sharding-jdbc讀寫分離”的疑惑有所幫助!接下來,請跟著小編一起來學習吧!
核心概念
主庫:添加、更新以及刪除數據操作
從庫:查詢數據操作所使用的數據庫,可支持多從庫
一主多從讀寫分離,多主多從需要使用sharding
源碼分析
1.啟動入口:
/**
 * Entry point of the Java-configuration example.
 *
 * <p>Builds a data source for the selected {@code ShardingType} and runs the demo
 * service (init, process, clean) against it.</p>
 */
public class JavaConfigurationExample {

    // Scenario to run. Alternatives listed by the original example:
    // ShardingType.SHARDING_DATABASES
    // ShardingType.SHARDING_TABLES
    // ShardingType.SHARDING_DATABASES_AND_TABLES
    // ShardingType.SHARDING_MASTER_SLAVE
    // ShardingType.SHARDING_VERTICAL
    private static final ShardingType SHARDING_TYPE = ShardingType.MASTER_SLAVE;

    /**
     * Runs the example.
     *
     * @param args command line arguments (unused)
     * @throws SQLException propagated from data source creation or from the service calls
     */
    public static void main(final String[] args) throws SQLException {
        DataSource dataSource = DataSourceFactory.newInstance(SHARDING_TYPE);
        CommonService commonService = getCommonService(dataSource);
        try {
            commonService.initEnvironment();
            commonService.processSuccess();
        } finally {
            // Always run the cleanup step, even when initialization or processing throws,
            // so a failed run does not skip cleanEnvironment().
            commonService.cleanEnvironment();
        }
    }

    /**
     * Wires the service with order and order-item repositories backed by the same data source.
     *
     * @param dataSource data source shared by both repositories
     * @return service instance used by {@link #main(String[])}
     */
    private static CommonService getCommonService(final DataSource dataSource) {
        return new CommonServiceImpl(new OrderRepositoryImpl(dataSource), new OrderItemRepositoryImpl(dataSource));
    }
}2.以sharding-jdbc為例,配置主從讀寫分離代碼如下:
/**
 * Builds the master-slave (read/write splitting) data source used by this example.
 *
 * @return data source produced by {@code MasterSlaveDataSourceFactory} for one master and two slaves
 * @throws SQLException propagated from {@code MasterSlaveDataSourceFactory.createDataSource}
 */
@Override
public DataSource getDataSource() throws SQLException {
    // Rule arguments, in order: rule name, master data source name, slave data source names.
    MasterSlaveRuleConfiguration masterSlaveRuleConfig = new MasterSlaveRuleConfiguration(
            "demo_ds_master_slave", "demo_ds_master", Arrays.asList("demo_ds_slave_0", "demo_ds_slave_1"));
    // Turn on SQL logging. setProperty keeps the value a String; Properties.put would store a
    // Boolean, which Properties.getProperty cannot see.
    Properties props = new Properties();
    props.setProperty("sql.show", "true");
    return MasterSlaveDataSourceFactory.createDataSource(createDataSourceMap(), masterSlaveRuleConfig, props);
}
/**
 * Creates the name-to-data-source map for the master and its two slaves.
 *
 * @return mutable map keyed by data source name
 */
private Map<String, DataSource> createDataSourceMap() {
    // Master first, then the two slaves; each name is both the map key and the
    // argument handed to DataSourceUtil.
    final String[] dataSourceNames = {"demo_ds_master", "demo_ds_slave_0", "demo_ds_slave_1"};
    final Map<String, DataSource> dataSources = new HashMap<>();
    for (final String dataSourceName : dataSourceNames) {
        dataSources.put(dataSourceName, DataSourceUtil.createDataSource(dataSourceName));
    }
    return dataSources;
}創建sharding主從數據源MasterSlaveDataSource
public MasterSlaveDataSource(final Map<String, DataSource> dataSourceMap, final MasterSlaveRuleConfiguration masterSlaveRuleConfig, final Properties props) throws SQLException {
super(dataSourceMap);
//緩存mysql元數據
cachedDatabaseMetaData = createCachedDatabaseMetaData(dataSourceMap);
//主從規則配置
this.masterSlaveRule = new MasterSlaveRule(masterSlaveRuleConfig);
//主從sql解析
parseEngine = new MasterSlaveSQLParseEntry(getDatabaseType());
shardingProperties = new ShardingProperties(null == props ? new Properties() : props);
}3.執行insert插入方法
/**
 * Inserts an order row and copies the generated key back onto the order.
 *
 * @param order order supplying the user id and status to persist
 * @return the order id held by {@code order} once the insert has run
 * @throws SQLException if obtaining the connection, executing the statement or reading the keys fails
 */
@Override
public Long insert(final Order order) throws SQLException {
    final String insertSql = "INSERT INTO t_order (user_id, status) VALUES (?, ?)";
    // With a MasterSlaveDataSource, prepareStatement creates a MasterSlavePreparedStatement.
    // Author's note: MasterSlaveStatement routes when the SQL is executed, while
    // MasterSlavePreparedStatement routes when the statement is created.
    try (Connection conn = dataSource.getConnection()) {
        // RETURN_GENERATED_KEYS asks the driver to make the auto-generated primary key retrievable.
        try (PreparedStatement insertStatement = conn.prepareStatement(insertSql, Statement.RETURN_GENERATED_KEYS)) {
            insertStatement.setInt(1, order.getUserId());
            insertStatement.setString(2, order.getStatus());
            insertStatement.executeUpdate();
            try (ResultSet generatedKeys = insertStatement.getGeneratedKeys()) {
                if (generatedKeys.next()) {
                    order.setOrderId(generatedKeys.getLong(1));
                }
            }
        }
    }
    return order.getOrderId();
}獲取數據庫連接MasterSlaveConnection->AbstractConnectionAdapter#getConnection
/**
 * Get database connection.
 *
 * <p>Requests exactly one connection from {@code getConnections} in
 * {@code MEMORY_STRICTLY} mode and returns the first element of the result.</p>
 *
 * <p>Author's notes on the connection modes (not verifiable from this snippet):
 * {@code MEMORY_STRICTLY} keeps a connection per routed table so that streaming result
 * sets can save memory; {@code CONNECTION_STRICTLY} releases the connection once all
 * rows have been read, at the cost of higher memory use.</p>
 *
 * @param dataSourceName data source name
 * @return database connection
 * @throws SQLException SQL exception
 */
public final Connection getConnection(final String dataSourceName) throws SQLException {
    final List<Connection> singleConnection = getConnections(ConnectionMode.MEMORY_STRICTLY, dataSourceName, 1);
    return singleConnection.get(0);
}
/**
* Get database connections.
*
* <p>Reuses connections already cached for the data source and creates only the
* missing ones; every newly created connection is added to the cache.</p>
*
* @param connectionMode connection mode, forwarded to connection creation
* @param dataSourceName data source name
* @param connectionSize size of connection list to be get
* @return database connections; exactly {@code connectionSize} elements
* @throws SQLException SQL exception raised while creating connections
*/
// NOTE(review): Preconditions.checkState is presumably Guava's, in which case an unknown
// data source name surfaces as IllegalStateException - confirm.
public final List<Connection> getConnections(final ConnectionMode connectionMode, final String dataSourceName, final int connectionSize) throws SQLException {
// Look up the data source registered under this name and fail fast when it is missing.
DataSource dataSource = getDataSourceMap().get(dataSourceName);
Preconditions.checkState(null != dataSource, "Missing the data source name: '%s'", dataSourceName);
Collection<Connection> connections;
// Read the cached connections for this data source while holding the cache lock.
// NOTE(review): `connections` is used after the lock is released; if cachedConnections.get
// returns a live view (as a Guava Multimap would), later reads are unsynchronized - confirm.
synchronized (cachedConnections) {
connections = cachedConnections.get(dataSourceName);
}
List<Connection> result;
// Enough connections cached (at least connectionSize): return the first connectionSize of a copy.
if (connections.size() >= connectionSize) {
result = new ArrayList<>(connections).subList(0, connectionSize);
} else if (!connections.isEmpty()) {
// Some cached, but fewer than requested: reuse all of them.
result = new ArrayList<>(connectionSize);
result.addAll(connections);
// Create only the missing connections, then add them to both the result and the cache.
List<Connection> newConnections = createConnections(dataSourceName, connectionMode, dataSource, connectionSize - connections.size());
result.addAll(newConnections);
synchronized (cachedConnections) {
cachedConnections.putAll(dataSourceName, newConnections);
}
} else {
// Nothing cached: create every requested connection and cache them all.
result = new ArrayList<>(createConnections(dataSourceName, connectionMode, dataSource, connectionSize));
synchronized (cachedConnections) {
cachedConnections.putAll(dataSourceName, result);
}
}
return result;
}
/**
 * Creates {@code connectionSize} connections for one data source, choosing whether to
 * hold the data source's monitor while doing so.
 *
 * @param dataSourceName data source name
 * @param connectionMode connection mode; {@code CONNECTION_STRICTLY} creates without locking
 * @param dataSource data source to create from; also the lock object
 * @param connectionSize number of connections to create
 * @return the created connections
 * @throws SQLException if a connection cannot be created
 */
@SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
private List<Connection> createConnections(final String dataSourceName, final ConnectionMode connectionMode, final DataSource dataSource, final int connectionSize) throws SQLException {
    // A single connection is created directly, without taking the lock.
    if (connectionSize == 1) {
        return Collections.singletonList(createConnection(dataSourceName, dataSource));
    }
    final boolean lockDataSource = ConnectionMode.CONNECTION_STRICTLY != connectionMode;
    if (!lockDataSource) {
        // TODO concurrency is not handled in CONNECTION_STRICTLY mode
        return createConnections(dataSourceName, dataSource, connectionSize);
    }
    // All other modes create the whole batch while holding the data source's monitor.
    synchronized (dataSource) {
        return createConnections(dataSourceName, dataSource, connectionSize);
    }
}
/**
 * Creates {@code connectionSize} connections one by one, all or nothing.
 *
 * <p>If any creation fails, every connection opened so far is closed before the failure
 * is reported. A failure while closing one of them no longer stops the remaining ones from
 * being closed and no longer replaces the original error: it is attached to the thrown
 * exception as a suppressed exception.</p>
 *
 * @param dataSourceName data source name
 * @param dataSource data source to create connections from
 * @param connectionSize number of connections to create
 * @return the created connections, {@code connectionSize} elements
 * @throws SQLException if any connection cannot be created; the cause is the creation failure
 */
private List<Connection> createConnections(final String dataSourceName, final DataSource dataSource, final int connectionSize) throws SQLException {
    List<Connection> result = new ArrayList<>(connectionSize);
    for (int i = 0; i < connectionSize; i++) {
        try {
            result.add(createConnection(dataSourceName, dataSource));
        } catch (final SQLException ex) {
            SQLException failure = new SQLException(
                    String.format("Could't get %d connections one time, partition succeed connection(%d) have released!", connectionSize, result.size()), ex);
            for (Connection each : result) {
                try {
                    each.close();
                } catch (final SQLException closeEx) {
                    // Keep releasing the remaining connections; record the close failure.
                    failure.addSuppressed(closeEx);
                }
            }
            throw failure;
        }
    }
    return result;
}
/**
 * Creates one connection and replays the recorded method invocations on it.
 *
 * @param dataSourceName data source name, used when the transaction manager supplies the connection
 * @param dataSource data source used outside a sharding transaction
 * @return the new connection
 * @throws SQLException if the connection cannot be obtained
 */
private Connection createConnection(final String dataSourceName, final DataSource dataSource) throws SQLException {
    final Connection connection;
    if (isInShardingTransaction()) {
        // Inside a sharding transaction the transaction manager hands out the connection.
        connection = shardingTransactionManager.getConnection(dataSourceName);
    } else {
        connection = dataSource.getConnection();
    }
    replayMethodsInvocation(connection);
    return connection;
}預準備路由并緩存Statement
public MasterSlavePreparedStatement(final MasterSlaveConnection connection, final String sql, final int autoGeneratedKeys) throws SQLException {
this.connection = connection;
//創建router對象
masterSlaveRouter = new MasterSlaveRouter(connection.getMasterSlaveDataSource().getMasterSlaveRule(), connection.getParseEngine(),
connection.getMasterSlaveDataSource().getShardingProperties().<Boolean>getValue(ShardingPropertiesConstant.SQL_SHOW));
//緩存路由后的Statement,useCache緩存解析后的sql Statement
for (String each : masterSlaveRouter.route(sql, true)) {
//獲取數據庫連接
PreparedStatement preparedStatement = connection.getConnection(each).prepareStatement(sql, autoGeneratedKeys);
routedStatements.add(preparedStatement);
}
}執行MasterSlaveRouter#route方法獲取路由庫
/**
 * Routes a SQL string to data source names.
 *
 * @param sql SQL to parse and route
 * @param useCache forwarded to the parse engine's {@code parse} call
 * @return names of the data sources the SQL is routed to
 */
public Collection<String> route(final String sql, final boolean useCache) {
    // Parse, then route the parsed statement (parsing internals are out of scope here).
    final Collection<String> routedDataSourceNames = this.route(this.parseEngine.parse(sql, useCache));
    // Log the SQL together with its targets only when SQL display is enabled.
    if (this.showSQL) {
        SQLLogger.logSQL(sql, routedDataSourceNames);
    }
    return routedDataSourceNames;
}
/**
 * Routes a parsed statement to either the master or one slave.
 *
 * @param sqlStatement parsed SQL statement
 * @return single-element collection holding the chosen data source name
 */
private Collection<String> route(final SQLStatement sqlStatement) {
    if (!isMasterRoute(sqlStatement)) {
        // Not a master route: let the rule's configured load-balance algorithm pick a slave.
        final String slaveName = masterSlaveRule.getLoadBalanceAlgorithm().getDataSource(
                masterSlaveRule.getName(), masterSlaveRule.getMasterDataSourceName(), new ArrayList<>(masterSlaveRule.getSlaveDataSourceNames()));
        return Collections.singletonList(slaveName);
    }
    // Master route: record the master visit via MasterVisitedManager, then return the master name.
    MasterVisitedManager.setMasterVisited();
    return Collections.singletonList(masterSlaveRule.getMasterDataSourceName());
}執行MasterSlavePreparedStatement#executeUpdate
/**
 * Executes the update on every routed statement and sums the update counts.
 *
 * @return total of the update counts returned by the routed statements
 * @throws SQLException if any routed statement fails
 */
@Override
public int executeUpdate() throws SQLException {
    int affectedRows = 0;
    // Walk the statements prepared and stored at construction time.
    final Iterator<PreparedStatement> statements = routedStatements.iterator();
    while (statements.hasNext()) {
        affectedRows += statements.next().executeUpdate();
    }
    return affectedRows;
}4.獲取從庫算法策略
隨機算法
/**
 * Load-balance algorithm that picks a slave data source at random.
 */
@Getter
@Setter
public final class RandomMasterSlaveLoadBalanceAlgorithm implements MasterSlaveLoadBalanceAlgorithm {

    private Properties properties = new Properties();

    @Override
    public String getType() {
        return "RANDOM";
    }

    /**
     * Picks one slave name uniformly at random.
     *
     * @param name rule name (unused by this algorithm)
     * @param masterDataSourceName master data source name (unused by this algorithm)
     * @param slaveDataSourceNames candidate slave names; must not be empty
     * @return one element of {@code slaveDataSourceNames}
     */
    @Override
    public String getDataSource(final String name, final String masterDataSourceName, final List<String> slaveDataSourceNames) {
        // ThreadLocalRandom avoids allocating and seeding a new Random on every routing call.
        int index = java.util.concurrent.ThreadLocalRandom.current().nextInt(slaveDataSourceNames.size());
        return slaveDataSourceNames.get(index);
    }
}輪詢算法
/**
 * Load-balance algorithm that cycles through the slave data sources, keeping one
 * counter per rule name.
 */
@Getter
@Setter
public final class RoundRobinMasterSlaveLoadBalanceAlgorithm implements MasterSlaveLoadBalanceAlgorithm {

    // One counter per rule name, shared by all instances.
    private static final ConcurrentHashMap<String, AtomicInteger> COUNTS = new ConcurrentHashMap<>();

    private Properties properties = new Properties();

    @Override
    public String getType() {
        return "ROUND_ROBIN";
    }

    /**
     * Picks the next slave name in rotation for the given rule name.
     *
     * @param name rule name; selects which counter is advanced
     * @param masterDataSourceName master data source name (unused by this algorithm)
     * @param slaveDataSourceNames candidate slave names; must not be empty
     * @return one element of {@code slaveDataSourceNames}
     */
    @Override
    public String getDataSource(final String name, final String masterDataSourceName, final List<String> slaveDataSourceNames) {
        // Always use the counter that actually ended up in the map: when two threads race to
        // create it, the loser must adopt the winner's counter rather than its own local one.
        AtomicInteger count = COUNTS.get(name);
        if (null == count) {
            AtomicInteger created = new AtomicInteger(0);
            AtomicInteger existing = COUNTS.putIfAbsent(name, created);
            count = null == existing ? created : existing;
        }
        int size = slaveDataSourceNames.size();
        // Reset to zero once the counter reaches the slave count so it does not grow without bound.
        count.compareAndSet(size, 0);
        // The reset can be missed under contention, so the counter may still overflow.
        // Math.abs(Integer.MIN_VALUE) is negative, hence the remainder is normalised explicitly.
        int index = count.getAndIncrement() % size;
        if (index < 0) {
            index += size;
        }
        return slaveDataSourceNames.get(index);
    }
}默認算法
SPI擴展機制:加載到的第一個算法作為默認算法;ShardingSphere默認是隨機
到此,關于“怎么使用sharding-jdbc讀寫分離”的學習就結束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學習,快去試試吧!若想繼續學習更多相關知識,請繼續關注億速云網站,小編會繼續努力為大家帶來更多實用的文章!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。