在 ZooKeeper 中使用协议 Zab 进行广播 [英] Broadcasting using the protocol Zab in ZooKeeper

查看:18
本文介绍了在 ZooKeeper 中使用协议 Zab 进行广播的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

早上好,

我是 ZooKeeper 及其协议的新手,我对它的广播协议 Zab 感兴趣.

I am new to ZooKeeper and its protocols and I am interested in its broadcast protocol Zab.

能否提供一个简单的java代码,使用Zookeeper的Zab协议?我一直在搜索,但我没有成功找到显示如何使用 Zab 的代码.

Could you provide me with a simple java code that uses the Zab protocol of Zookeeper? I have been searching about that but I did not succeed to find a code that shows how can I use Zab.

事实上,我需要的很简单,我有一个 MapReduce 代码,我希望所有映射器在成功找到更好的 X 值(即更大的值)时更新变量(假设为 X).在这种情况下,领导者必须比较旧值和新值,然后将实际最佳值广播给所有映射器.我怎么能用Java做这样的事情?

In fact what I need is simple, I have a MapReduce code and I want all the mappers to update a variable (let's say X) whenever they succeed to find a better value of X (i.e. a bigger value). In this case, the leader has to compare the old value and the new value and then to broadcast the actual best value to all mappers. How can I do such a thing in Java?

提前致谢,问候

推荐答案

您不需要使用 Zab 协议.相反,您可以按照以下步骤操作:

You don't need to use the Zab protocol. Instead you may follow the below steps:

您在 Zookeeper 上有一个 Znode 说/bigvalue.所有映射器在启动时读取存储在其中的值.他们还监视 Znode 上的数据变化.每当映射器获得更好的值时,它就会用更好的值更新 Znode.所有映射器都将收到数据更改事件的通知,他们读取新的最佳值并重新建立数据更改监视.这样它们就会与最新的最佳值同步,并且可以在有更好的值时更新最新的最佳值.

You have a Znode say /bigvalue on Zookeeper. All the mappers when starts reads the value stored in it. They also put an watch for data change on the Znode. Whenever a mapper gets a better value, it updates the Znode with the better value. All the mappers will get notification for the data change event and they read the new best value and they re-establish the watch for data changes again. That way they are in sync with the latest best value and may update the latest best value whenever there is a better value.

实际上 zkclient 是一个非常好的与 Zookeeper 一起工作的库,它隐藏了很多复杂性(https://github.com/sgroschupf/zkclient).下面是一个示例,演示了如何观察 Znode "/bigvalue" 的任何数据更改.

Actually zkclient is a very good library to work with Zookeeper and it hides a lot of complexities ( https://github.com/sgroschupf/zkclient ). Below is an example that demonstrates how you may watch a Znode "/bigvalue" for any data change.

package geet.org;

import java.io.UnsupportedEncodingException;
import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.exception.ZkMarshallingError;
import org.I0Itec.zkclient.exception.ZkNodeExistsException;
import org.I0Itec.zkclient.serialize.ZkSerializer;
import org.apache.zookeeper.data.Stat;

public class ZkExample implements IZkDataListener, ZkSerializer {
    public static void main(String[] args) {
        String znode = "/bigvalue";
        ZkExample ins = new ZkExample();
        ZkClient cl = new ZkClient("127.0.0.1", 30000, 30000,
                ins);
        try {
            cl.createPersistent(znode);
        } catch (ZkNodeExistsException e) {
            System.out.println(e.getMessage());
        }
        // Change the data for fun
        Stat stat = new Stat();
        String data =  cl.readData(znode, stat);
        System.out.println("Current data " + data + "version = " + stat.getVersion());
        cl.writeData(znode, "My new data ", stat.getVersion());

        cl.subscribeDataChanges(znode, ins);
        try {
            Thread.sleep(36000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void handleDataChange(String dataPath, Object data) throws Exception {
        System.out.println("Detected data change");
        System.out.println("New data for " + dataPath + " " + (String)data);
    }

    @Override
    public void handleDataDeleted(String dataPath) throws Exception {
        System.out.println("Data deleted " + dataPath);
    }

    @Override
    public byte[] serialize(Object data) throws ZkMarshallingError {
        if (data instanceof String){
            try {
                return ((String) data).getBytes("UTF-8");
            } catch (UnsupportedEncodingException e) {
                e.printStackTrace();
            }
        }
        return null;
    }

    @Override
    public Object deserialize(byte[] bytes) throws ZkMarshallingError {
        try {
            return new String(bytes, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }
        return null;
    }
}

这篇关于在 ZooKeeper 中使用协议 Zab 进行广播的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆