这里就不多做介绍,直接上代码
Service
package zookeeper.test.rmi.service;
import java.rmi.Remote;
import java.rmi.RemoteException;
public interface HelloService extends Remote {
/**
* 在顶级rmi方法时,必须要抛出RemoteException
*
* @param name
* @throws RemoteException
*/
void sayHello(String name) throws RemoteException;
}
package zookeeper.test.rmi.service.impl;
import zookeeper.test.rmi.service.HelloService;
import java.rmi.RemoteException;
import java.rmi.server.UnicastRemoteObject;
public class HelloServiceImpl extends UnicastRemoteObject implements HelloService {
public HelloServiceImpl() throws RemoteException {
}
@Override
public void sayHello(String name) throws RemoteException {
System.out.println(Thread.currentThread().getName() + ", name: " + name);
}
}
ZooKeeper管理工厂类
package zookeeper.test;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import java.io.IOException;
public class ZookeeperFactory {
static {
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
System.out.println("执行钩子。。。");
ZookeeperFactory.close();
}));
}
private static String servers = "192.168.56.102:2181,192.168.56.103:2181,192.168.56.104:2181";
private volatile static ZooKeeper zooKeeper;
private static final Object MUTEX = new Object();
private volatile static boolean isClosed = true;
private ZookeeperFactory() {
}
public static ZooKeeper getInstance() {
if (zooKeeper == null) {
synchronized (MUTEX) {
if (zooKeeper == null || isClosed) {
try {
zooKeeper = new ZooKeeper(servers, 3000, new Watcher() {
@Override
public void process(WatchedEvent event) {
System.out.println("Receive watched event: " + event);
}
});
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
return zooKeeper;
}
public static void close() {
synchronized (MUTEX) {
if (zooKeeper != null) {
try {
zooKeeper.close();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
isClosed = true;
}
}
}
这里只是使用了一个单例,并且在jvm退出的时候能够关闭ZooKeeper的Session,其他的没有什么用处。
RMI Server端配置
package zookeeper.test.rmi;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import zookeeper.test.ZookeeperFactory;
import zookeeper.test.rmi.service.HelloService;
import zookeeper.test.rmi.service.impl.HelloServiceImpl;
import java.net.*;
import java.rmi.Naming;
import java.rmi.RemoteException;
import java.rmi.registry.LocateRegistry;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;
public class RMIServer {
public static void main(String[] args) throws SocketException, RemoteException, MalformedURLException, InterruptedException, KeeperException {
int port = 9093;
String path = "/rmiservers/provider";
Enumeration<NetworkInterface> networkInterfaces = NetworkInterface.getNetworkInterfaces();
List<String> supportIps = new ArrayList<>();
while (networkInterfaces.hasMoreElements()) {
NetworkInterface networkInterface = networkInterfaces.nextElement();
// 过滤没有启用或者回环地址
if (networkInterface.isLoopback() || !networkInterface.isUp()) {
continue;
}
Enumeration<InetAddress> inetAddresses = networkInterface.getInetAddresses();
while (inetAddresses.hasMoreElements()) {
InetAddress inetAddress = inetAddresses.nextElement();
if (inetAddress.isLoopbackAddress()
|| inetAddress.isLinkLocalAddress()
|| inetAddress.isMulticastAddress()
|| inetAddress instanceof Inet6Address) {
System.out.println("------" + inetAddress.getHostAddress());
continue;
}
String hostAddress = inetAddress.getHostAddress();
System.out.println("ip: " + hostAddress);
supportIps.add(hostAddress);
}
}
String urlFormat = "rmi://%s:%d/hello";
LocateRegistry.createRegistry(port);
HelloService helloService = new HelloServiceImpl();
ZooKeeper zooKeeper = ZookeeperFactory.getInstance();
for (String ip : supportIps) {
String url = String.format(urlFormat, ip, port);
Naming.rebind(url, helloService);
zooKeeper.create(path, url.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
}
}
}
这里注册的ip地址是读取的当前主机的网卡列表信息,并过滤掉不需要使用的,然后通过RMI接口注册,并将注册的信息存储到ZooKeeper, 这里主要是采用了有序临时节点,这样当服务退出的时候,就能够删除服务节点
客户端实现
客户端主要流程就是从ZooKeeper获取服务列表,并从中选取一个调用。然后监听znode事件,并及时的更新服务列表的集合。
package zookeeper.test.rmi;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import zookeeper.test.ZookeeperFactory;
import zookeeper.test.rmi.service.HelloService;
import java.net.MalformedURLException;
import java.rmi.Naming;
import java.rmi.NotBoundException;
import java.rmi.RemoteException;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
public class RMIClient {
public static void main(String[] args) throws InterruptedException, KeeperException, MalformedURLException, NotBoundException, RemoteException {
System.out.println("Client start...");
ZooKeeper zooKeeper = ZookeeperFactory.getInstance();
String path = "/rmiservers";
List<String> children = new ArrayList<>();
getGetChildren(zooKeeper, path, children);
if (children.isEmpty()) {
System.out.println("没有找到注册的provider服务列表");
return;
}
int length = children.size();
int index = new Random().nextInt(length);
String subNode = children.get(index);
byte[] bytes = zooKeeper.getData(path + "/" + subNode, false, null);
String url = new String(bytes);
HelloService helloService = (HelloService) Naming.lookup(url);
helloService.sayHello("你好呀,服务端");
Thread.sleep(Integer.MAX_VALUE);
}
private static List<String> getGetChildren(ZooKeeper zooKeeper, String path, List<String> children) throws InterruptedException, KeeperException {
Watcher watcher = event -> {
System.out.println("接收到事件信息");
if (event.getState() == Watcher.Event.KeeperState.SyncConnected && event.getType() == Watcher.Event.EventType.NodeChildrenChanged) {
try {
getGetChildren(zooKeeper, path, children);
} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (KeeperException e) {
throw new RuntimeException(e);
}
}
};
children.clear();
List<String> subChildren = zooKeeper.getChildren(path, watcher, null);
children.addAll(subChildren);
System.out.println("当前结果集为: " + children.isEmpty());
return children;
}
}
