這篇文章主要介紹了Java Zookeeper分布式分片算法源碼分析的相關知識,內容詳細易懂,操作簡單快捷,具有一定借鑒價值,相信大家閱讀完這篇Java Zookeeper分布式分片算法源碼分析文章都會有所收獲,下面我們一起來看看吧。
公司的一個服務需要做類似于分片的邏輯,一開始服務基于傳統部署方式通過本地配置文件配置的方式就可以指定該機器服務的分片內容如:0,1,2,3,隨著系統的升級迭代,該服務進行了容器化部署,所以原來基于本地配置文件各自配置分片數據的方式就不適用了,原來的部署方式使得服務是有狀態,是一種非云原生的方式,所以該服務要重新設計實現一套分布式服務分片
邏輯。
要實現分布式服務分片的能力,需要有一個分布式中間件,如:Redis
,Mysql
,Zookeeper
等等都可以,我們選用Zookeeper
。
使用Zookeeper
主要是基于Zookeeper
的臨時節點和節點變化監聽機制,具體的技術設計如下:
Zookeeper
的數據存儲結構類似于目錄,服務注冊后的目錄類似如下結構:
解釋下該目錄結構,首先/xxxx/xxxx/sharding
是區別于其他業務的的目錄,該目錄節點是持久的,service
是服務目錄,標識一個服務,該節點也是持久的,ip1
,ip2
是該服務注冊到Zookeeper
的機器列表節點,該節點是臨時節點。
/xxxx/xxxx/sharding/service/ip1
-----|----|--------|-------/ip2
服務啟動,創建CuratorFramework
客戶端,設置客戶端連接狀態監聽;
向Zookeeper
注冊該機器的信息,這里設計簡單,機器信息就是ip
地址;
注冊機器信息后,從Zookeeper
獲取所有注冊信息;
根據Zookeeper
獲取的所有注冊機器信息根據分片算法進行分片計算。
ZookeeperConfig
Zookeeper
的配置信息
@Data public class ZookeeperConfig { /** * zk集群地址 */ private String zkAddress; /** * 注冊服務目錄 */ private String nodePath; /** * 分片的服務名 */ private String serviceName; /** * 分片總數 */ private Integer shardingCount; public ZookeeperConfig(String zkAddress, String nodePath, String serviceName, Integer shardingCount) { this.zkAddress = zkAddress; this.nodePath = nodePath; this.serviceName = "/" + serviceName; this.shardingCount = shardingCount; } /** * 等待重試的間隔時間的初始值. * 單位毫秒. */ private int baseSleepTimeMilliseconds = 1000; /** * 等待重試的間隔時間的最大值. * 單位毫秒. */ private int maxSleepTimeMilliseconds = 3000; /** * 最大重試次數. */ private int maxRetries = 3; /** * 會話超時時間. * 單位毫秒. */ private int sessionTimeoutMilliseconds; /** * 連接超時時間. * 單位毫秒. */ private int connectionTimeoutMilliseconds; }
InstanceInfo注冊機器
@AllArgsConstructor @EqualsAndHashCode() public class InstanceInfo { private String ip; public String getInstance() { return ip; } }
ZookeeperShardingService分片服務
@Slf4j public class ZookeeperShardingService { public final Map<String, List<Integer>> caches = new HashMap<>(16); private final CuratorFramework client; private final ZookeeperConfig zkConfig; private final ShardingStrategy shardingStrategy; private final InstanceInfo instanceInfo; private static final CountDownLatch COUNT_DOWN_LATCH = new CountDownLatch(1); public ZookeeperShardingService(ZookeeperConfig zkConfig, ShardingStrategy shardingStrategy) { this.zkConfig = zkConfig; log.info("開始初始化zk, ip列表是: {}.", zkConfig.getZkAddress()); CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder() .connectString(zkConfig.getZkAddress()) .retryPolicy(new ExponentialBackoffRetry(zkConfig.getBaseSleepTimeMilliseconds(), zkConfig.getMaxRetries(), zkConfig.getMaxSleepTimeMilliseconds())); if (0 != zkConfig.getSessionTimeoutMilliseconds()) { builder.sessionTimeoutMs(zkConfig.getSessionTimeoutMilliseconds()); } if (0 != zkConfig.getConnectionTimeoutMilliseconds()) { builder.connectionTimeoutMs(zkConfig.getConnectionTimeoutMilliseconds()); } this.shardingStrategy = shardingStrategy; HostInfo host = new HostInfo(); this.instanceInfo = new InstanceInfo(host.getAddress()); client = builder.build(); client.getConnectionStateListenable().addListener(new ConnectionListener()); client.start(); try { COUNT_DOWN_LATCH.await(); } catch (InterruptedException e) { e.printStackTrace(); } // 注冊服務節點監聽 registerPathChildListener(zkConfig.getNodePath() + zkConfig.getServiceName(), new ChildrenPathListener()); try { if (!client.blockUntilConnected(zkConfig.getMaxSleepTimeMilliseconds() * zkConfig.getMaxRetries(), TimeUnit.MILLISECONDS)) { client.close(); throw new KeeperException.OperationTimeoutException(); } } catch (final Exception ex) { ex.printStackTrace(); throw new RuntimeException(ex); } } /** * 子節點監聽器 * @param nodePath 主節點 * @param listener 監聽器 */ private void registerPathChildListener(String nodePath, PathChildrenCacheListener listener) { try { // 1. 創建一個PathChildrenCache PathChildrenCache pathChildrenCache = new PathChildrenCache(client, nodePath, true); // 2. 添加目錄監聽器 pathChildrenCache.getListenable().addListener(listener); // 3. 啟動監聽器 pathChildrenCache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE); } catch (Exception e) { log.error("注冊子目錄監聽器出現異常,nodePath:{}",nodePath,e); throw new RuntimeException(e); } } /** * 服務啟動,注冊zk節點 * @throws Exception 異常 */ private void zkOp() throws Exception { // 是否存在ruubypay-sharding主節點 if (null == client.checkExists().forPath(zkConfig.getNodePath())) { client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(zkConfig.getNodePath(), Hashing.sha1().hashString("sharding", Charsets.UTF_8).toString().getBytes()); } // 是否存服務主節點 if (null == client.checkExists().forPath(zkConfig.getNodePath() + zkConfig.getServiceName())) { // 創建服務主節點 client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(zkConfig.getNodePath() + zkConfig.getServiceName()); } // 檢查是否存在臨時節點 if (null == client.checkExists().forPath(zkConfig.getNodePath() + zkConfig.getServiceName() + "/" + instanceInfo.getInstance())) { System.out.println(zkConfig.getNodePath() + zkConfig.getServiceName() + "/" + instanceInfo.getInstance()); // 創建臨時節點 client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(zkConfig.getNodePath() + zkConfig.getServiceName() + "/" + instanceInfo.getInstance(), zkConfig.getShardingCount().toString().getBytes(StandardCharsets.UTF_8)); } shardingFromZk(); } /** * 從zk獲取機器列表并進行分片 * @throws Exception 異常 */ private void shardingFromZk() throws Exception { // 從 serviceName 節點下獲取所有Ip列表 final GetChildrenBuilder childrenBuilder = client.getChildren(); final List<String> instanceList = childrenBuilder.watched().forPath(zkConfig.getNodePath() + zkConfig.getServiceName()); List<InstanceInfo> res = new ArrayList<>(); instanceList.forEach(s -> { res.add(new InstanceInfo(s)); }); Map<InstanceInfo, List<Integer>> shardingResult = shardingStrategy.sharding(res, zkConfig.getShardingCount()); // 先清一遍緩存 caches.clear(); shardingResult.forEach((k, v) -> { caches.put(k.getInstance().split("-")[0], v); }); } /** * zk連接監聽 */ private class ConnectionListener implements ConnectionStateListener { @Override public void stateChanged(CuratorFramework client, ConnectionState newState) { if (newState == ConnectionState.CONNECTED || newState == ConnectionState.LOST || newState == ConnectionState.RECONNECTED) { try { zkOp(); } catch (Exception e) { e.printStackTrace(); throw new RuntimeException(e); } finally { COUNT_DOWN_LATCH.countDown(); } } } } /** * 子節點監聽 */ private class ChildrenPathListener implements PathChildrenCacheListener { @Override public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) { PathChildrenCacheEvent.Type type = event.getType(); if (PathChildrenCacheEvent.Type.CHILD_ADDED == type || PathChildrenCacheEvent.Type.CHILD_REMOVED == type) { try { shardingFromZk(); } catch (Exception e) { e.printStackTrace(); throw new RuntimeException(e); } } } } }
分片算法
采用平均分配的算法
public interface ShardingStrategy { Map<InstanceInfo, List<Integer>> sharding(final List<InstanceInfo> list, Integer shardingCount); } public class AverageAllocationShardingStrategy implements ShardingStrategy { @Override public Map<InstanceInfo, List<Integer>> sharding(List<InstanceInfo> list, Integer shardingCount) { if (list.isEmpty()) { return null; } Map<InstanceInfo, List<Integer>> result = shardingAliquot(list, shardingCount); addAliquant(list, shardingCount, result); return result; } private Map<InstanceInfo, List<Integer>> shardingAliquot(final List<InstanceInfo> instanceInfos, final int shardingTotalCount) { Map<InstanceInfo, List<Integer>> result = new LinkedHashMap<>(shardingTotalCount, 1); int itemCountPerSharding = shardingTotalCount / instanceInfos.size(); int count = 0; for (InstanceInfo each : instanceInfos) { List<Integer> shardingItems = new ArrayList<>(itemCountPerSharding + 1); for (int i = count * itemCountPerSharding; i < (count + 1) * itemCountPerSharding; i++) { shardingItems.add(i); } result.put(each, shardingItems); count++; } return result; } private void addAliquant(final List<InstanceInfo> instanceInfos, final int shardingTotalCount, final Map<InstanceInfo, List<Integer>> shardingResults) { int aliquant = shardingTotalCount % instanceInfos.size(); int count = 0; for (Map.Entry<InstanceInfo, List<Integer>> entry : shardingResults.entrySet()) { if (count < aliquant) { entry.getValue().add(shardingTotalCount / instanceInfos.size() * instanceInfos.size() + count); } count++; } } }
關于“Java Zookeeper分布式分片算法源碼分析”這篇文章的內容就介紹到這里,感謝各位的閱讀!相信大家對“Java Zookeeper分布式分片算法源碼分析”知識都有一定的了解,大家如果還想學習更多知識,歡迎關注億速云行業資訊頻道。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。