溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Java?Zookeeper分布式分片算法源碼分析

發布時間:2023-03-01 16:57:42 來源:億速云 閱讀:137 作者:iii 欄目:開發技術

這篇文章主要介紹了Java Zookeeper分布式分片算法源碼分析的相關知識,內容詳細易懂,操作簡單快捷,具有一定借鑒價值,相信大家閱讀完這篇Java Zookeeper分布式分片算法源碼分析文章都會有所收獲,下面我們一起來看看吧。

    背景

    公司的一個服務需要做類似于分片的邏輯,一開始服務基于傳統部署方式通過本地配置文件配置的方式就可以指定該機器服務的分片內容如:0,1,2,3,隨著系統的升級迭代,該服務進行了容器化部署,所以原來基于本地配置文件各自配置分片數據的方式就不適用了,原來的部署方式使得服務是有狀態,是一種非云原生的方式,所以該服務要重新設計實現一套分布式服務分片邏輯。

    技術方案

    分布式協調中間件

    要實現分布式服務分片的能力,需要有一個分布式中間件,如:Redis,Mysql,Zookeeper等等都可以,我們選用Zookeeper。

    基于Zookeeper的技術方案

    使用Zookeeper主要是基于Zookeeper的臨時節點和節點變化監聽機制,具體的技術設計如下:

    服務注冊目錄設計

    Zookeeper的數據存儲結構類似于目錄,服務注冊后的目錄類似如下結構:

    解釋下該目錄結構,首先/xxxx/xxxx/sharding是區別于其他業務的的目錄,該目錄節點是持久的,service是服務目錄,標識一個服務,該節點也是持久的,ip1,ip2是該服務注冊到Zookeeper的機器列表節點,該節點是臨時節點。

    /xxxx/xxxx/sharding/service/ip1
    -----|----|--------|-------/ip2

    服務分片處理流程
    • 服務啟動,創建CuratorFramework客戶端,設置客戶端連接狀態監聽;

    • Zookeeper注冊該機器的信息,這里設計簡單,機器信息就是ip地址;

    • 注冊機器信息后,從Zookeeper獲取所有注冊信息;

    • 根據Zookeeper獲取的所有注冊機器信息根據分片算法進行分片計算。

    編碼實現

    ZookeeperConfig

    Zookeeper的配置信息

    @Data
    public class ZookeeperConfig {
        /**
         * zk集群地址
         */
        private String zkAddress;
        /**
         * 注冊服務目錄
         */
        private String nodePath;
        /**
         * 分片的服務名
         */
        private String serviceName;
        /**
         * 分片總數
         */
        private Integer shardingCount;
        public ZookeeperConfig(String zkAddress, String nodePath, String serviceName, Integer shardingCount) {
            this.zkAddress = zkAddress;
            this.nodePath = nodePath;
            this.serviceName = "/" + serviceName;
            this.shardingCount = shardingCount;
        }
        /**
         * 等待重試的間隔時間的初始值.
         * 單位毫秒.
         */
        private int baseSleepTimeMilliseconds = 1000;
        /**
         * 等待重試的間隔時間的最大值.
         * 單位毫秒.
         */
        private int maxSleepTimeMilliseconds = 3000;
        /**
         * 最大重試次數.
         */
        private int maxRetries = 3;
        /**
         * 會話超時時間.
         * 單位毫秒.
         */
        private int sessionTimeoutMilliseconds;
        /**
         * 連接超時時間.
         * 單位毫秒.
         */
        private int connectionTimeoutMilliseconds;
    }

    InstanceInfo注冊機器

    @AllArgsConstructor
    @EqualsAndHashCode()
    public class InstanceInfo {
        private String ip;
        public String getInstance() {
            return ip;
        }
    }

    ZookeeperShardingService分片服務

    @Slf4j
    public class ZookeeperShardingService {
        public final Map<String, List<Integer>> caches = new HashMap<>(16);
        private final CuratorFramework client;
        private final ZookeeperConfig zkConfig;
        private final ShardingStrategy shardingStrategy;
        private final InstanceInfo instanceInfo;
        private static final CountDownLatch COUNT_DOWN_LATCH = new CountDownLatch(1);
        public ZookeeperShardingService(ZookeeperConfig zkConfig, ShardingStrategy shardingStrategy) {
            this.zkConfig = zkConfig;
            log.info("開始初始化zk, ip列表是: {}.", zkConfig.getZkAddress());
            CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder()
                    .connectString(zkConfig.getZkAddress())
                    .retryPolicy(new ExponentialBackoffRetry(zkConfig.getBaseSleepTimeMilliseconds(), zkConfig.getMaxRetries(), zkConfig.getMaxSleepTimeMilliseconds()));
            if (0 != zkConfig.getSessionTimeoutMilliseconds()) {
                builder.sessionTimeoutMs(zkConfig.getSessionTimeoutMilliseconds());
            }
            if (0 != zkConfig.getConnectionTimeoutMilliseconds()) {
                builder.connectionTimeoutMs(zkConfig.getConnectionTimeoutMilliseconds());
            }
            this.shardingStrategy = shardingStrategy;
            HostInfo host = new HostInfo();
            this.instanceInfo = new InstanceInfo(host.getAddress());
            client = builder.build();
            client.getConnectionStateListenable().addListener(new ConnectionListener());
            client.start();
            try {
                COUNT_DOWN_LATCH.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            // 注冊服務節點監聽
            registerPathChildListener(zkConfig.getNodePath() + zkConfig.getServiceName(), new ChildrenPathListener());
            try {
                if (!client.blockUntilConnected(zkConfig.getMaxSleepTimeMilliseconds() * zkConfig.getMaxRetries(), TimeUnit.MILLISECONDS)) {
                    client.close();
                    throw new KeeperException.OperationTimeoutException();
                }
            } catch (final Exception ex) {
                ex.printStackTrace();
                throw new RuntimeException(ex);
            }
        }
        /**
         * 子節點監聽器
         * @param nodePath 主節點
         * @param listener 監聽器
         */
        private void registerPathChildListener(String nodePath, PathChildrenCacheListener listener) {
            try {
                // 1. 創建一個PathChildrenCache
                PathChildrenCache pathChildrenCache = new PathChildrenCache(client, nodePath, true);
                // 2. 添加目錄監聽器
                pathChildrenCache.getListenable().addListener(listener);
                // 3. 啟動監聽器
                pathChildrenCache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
            } catch (Exception e) {
                log.error("注冊子目錄監聽器出現異常,nodePath:{}",nodePath,e);
                throw new RuntimeException(e);
            }
        }
        /**
         * 服務啟動,注冊zk節點
         * @throws Exception 異常
         */
        private void zkOp() throws Exception {
            // 是否存在ruubypay-sharding主節點
            if (null == client.checkExists().forPath(zkConfig.getNodePath())) {
                client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(zkConfig.getNodePath(), Hashing.sha1().hashString("sharding", Charsets.UTF_8).toString().getBytes());
            }
            // 是否存服務主節點
            if (null == client.checkExists().forPath(zkConfig.getNodePath() + zkConfig.getServiceName())) {
                // 創建服務主節點
                client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(zkConfig.getNodePath() + zkConfig.getServiceName());
            }
            // 檢查是否存在臨時節點
            if (null == client.checkExists().forPath(zkConfig.getNodePath() + zkConfig.getServiceName() + "/" + instanceInfo.getInstance())) {
                System.out.println(zkConfig.getNodePath() + zkConfig.getServiceName() +  "/" + instanceInfo.getInstance());
                // 創建臨時節點
                client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(zkConfig.getNodePath() + zkConfig.getServiceName() +
                        "/" + instanceInfo.getInstance(), zkConfig.getShardingCount().toString().getBytes(StandardCharsets.UTF_8));
            }
            shardingFromZk();
        }
        /**
         * 從zk獲取機器列表并進行分片
         * @throws Exception 異常
         */
        private void shardingFromZk() throws Exception {
            // 從 serviceName 節點下獲取所有Ip列表
            final GetChildrenBuilder childrenBuilder = client.getChildren();
            final List<String> instanceList = childrenBuilder.watched().forPath(zkConfig.getNodePath() + zkConfig.getServiceName());
            List<InstanceInfo> res = new ArrayList<>();
            instanceList.forEach(s -> {
                res.add(new InstanceInfo(s));
            });
            Map<InstanceInfo, List<Integer>> shardingResult = shardingStrategy.sharding(res, zkConfig.getShardingCount());
            // 先清一遍緩存
            caches.clear();
            shardingResult.forEach((k, v) -> {
                caches.put(k.getInstance().split("-")[0], v);
            });
        }
        /**
         * zk連接監聽
         */
        private class ConnectionListener implements ConnectionStateListener {
            @Override
            public void stateChanged(CuratorFramework client, ConnectionState newState) {
                if (newState == ConnectionState.CONNECTED || newState == ConnectionState.LOST || newState == ConnectionState.RECONNECTED) {
                    try {
                        zkOp();
                    } catch (Exception e) {
                        e.printStackTrace();
                        throw new RuntimeException(e);
                    } finally {
                        COUNT_DOWN_LATCH.countDown();
                    }
                }
            }
        }
        /**
         * 子節點監聽
         */
        private class ChildrenPathListener implements PathChildrenCacheListener {
            @Override
            public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) {
                PathChildrenCacheEvent.Type type = event.getType();
                if (PathChildrenCacheEvent.Type.CHILD_ADDED == type || PathChildrenCacheEvent.Type.CHILD_REMOVED == type) {
                    try {
                        shardingFromZk();
                    } catch (Exception e) {
                        e.printStackTrace();
                        throw new RuntimeException(e);
                    }
                }
            }
        }
    }

    分片算法

    采用平均分配的算法

    public interface ShardingStrategy {
        Map<InstanceInfo, List<Integer>> sharding(final List<InstanceInfo> list, Integer shardingCount);
    }
    public class AverageAllocationShardingStrategy implements ShardingStrategy {
        @Override
        public Map<InstanceInfo, List<Integer>> sharding(List<InstanceInfo> list, Integer shardingCount) {
            if (list.isEmpty()) {
                return null;
            }
            Map<InstanceInfo, List<Integer>> result = shardingAliquot(list, shardingCount);
            addAliquant(list, shardingCount, result);
            return result;
        }
        private Map<InstanceInfo, List<Integer>> shardingAliquot(final List<InstanceInfo> instanceInfos, final int shardingTotalCount) {
            Map<InstanceInfo, List<Integer>> result = new LinkedHashMap<>(shardingTotalCount, 1);
            int itemCountPerSharding = shardingTotalCount / instanceInfos.size();
            int count = 0;
            for (InstanceInfo each : instanceInfos) {
                List<Integer> shardingItems = new ArrayList<>(itemCountPerSharding + 1);
                for (int i = count * itemCountPerSharding; i < (count + 1) * itemCountPerSharding; i++) {
                    shardingItems.add(i);
                }
                result.put(each, shardingItems);
                count++;
            }
            return result;
        }
        private void addAliquant(final List<InstanceInfo> instanceInfos, final int shardingTotalCount, final Map<InstanceInfo, List<Integer>> shardingResults) {
            int aliquant = shardingTotalCount % instanceInfos.size();
            int count = 0;
            for (Map.Entry<InstanceInfo, List<Integer>> entry : shardingResults.entrySet()) {
                if (count < aliquant) {
                    entry.getValue().add(shardingTotalCount / instanceInfos.size() * instanceInfos.size() + count);
                }
                count++;
            }
        }
    }

    關于“Java Zookeeper分布式分片算法源碼分析”這篇文章的內容就介紹到這里,感謝各位的閱讀!相信大家對“Java Zookeeper分布式分片算法源碼分析”知識都有一定的了解,大家如果還想學習更多知識,歡迎關注億速云行業資訊頻道。

    向AI問一下細節

    免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

    AI

    亚洲午夜精品一区二区_中文无码日韩欧免_久久香蕉精品视频_欧美主播一区二区三区美女