根据IP动态路由调用Dubbo服务
一、前言
前面我们探讨了如何获取某一个Dubbo的服务的提供者列表,本节我们探讨如何使用Dubbo的扩展,实现指定IP调用。
二、实现
在Dubbo中集群容错策略Cluster是SPI扩展接口,DUbbo框架提供了丰富的集群容错策略实现,本节我们就基于扩展接口实现指定IP调用功能。
首先我们实现扩展接口Cluster:
public class MyCluster implements Cluster{ @Override public <T> Invoker<T> join(Directory<T> directory) throws RpcException { return new MyClusterInvoker(directory); } }
然后我们看自己实现的MyClusterInvoker
public class MyClusterInvoker<T> extends MyAbstractClusterInvoker<T> { public MyClusterInvoker(Directory<T> directory) { super(directory); } @Override protected Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException { //1.查看是否设置了指定ip String ip = (String) RpcContext.getContext().get("ip"); if (StringUtils.isBlank(ip)) { throw new RuntimeException("ip is blank "); } //2.检查是否有可用invoker checkInvokers(invokers,invocation); //3.根据指定ip获取对应invoker Invoker<T> invoked = invokers.stream().filter(invoker -> invoker.getUrl().getHost().equals(ip)) .findFirst().orElse(null); //4.检查是否有可用invoker if(null == invoked) { throw new RpcException(RpcException.NO_INVOKER_AVAILABLE_AFTER_FILTER, "Failed to invoke the method " + invocation.getMethodName() + " in the service " + getInterface().getName() + ". No provider available for the service " + directory.getUrl().getServiceKey() + " from ip " + ip + " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version " + Version.getVersion() + ". Please check if the providers have been started and registered."); } //5.发起远程调用,失败则抛出异常 try { return invoked.invoke(invocation); } catch (Throwable e) { if (e instanceof RpcException && ((RpcException) e).isBiz()) { // biz exception. throw (RpcException) e; } throw new RpcException(e instanceof RpcException ? ((RpcException) e).getCode() : 0, "Fail invoke providers " + (invoked != null?invoked.getUrl():"")+ " " + loadbalance.getClass().getSimpleName() + " select from all providers " + invokers + " for service " + getInterface().getName() + " method " + invocation.getMethodName() + " on consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e.getCause() != null ? e.getCause() : e); } } ... }
- 如上代码1,我们从RpcContext.getContext()获取了属性值ip,如果指定了改值说明指定了ip,
- 代码2则检查是否有可用的服务提供者,如果没有则抛出异常。
- 代码3变量invokers列表查找指定IP对应的Invoker
- 代码4 检查是否有对应IP对应的Invoker,没有则抛出异常。
- 代码5 具体使用选择的invoker发起远程调用。
注意我们还修改了框架的AbstractClusterInvoker为MyAbstractClusterInvoker:
public Result invoke(final Invocation invocation) throws RpcException { checkWhetherDestroyed(); // binding attachments into invocation. Map<String, String> contextAttachments = RpcContext.getContext().getAttachments(); if (contextAttachments != null && contextAttachments.size() != 0) { ((RpcInvocation) invocation).addAttachments(contextAttachments); } List<Invoker<T>> invokers = list(invocation); LoadBalance loadbalance = null;//initLoadBalance(invokers, invocation); RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation); return doInvoke(invocation, invokers, loadbalance); }
这里我们把 LoadBalance loadbalance = initLoadBalance(invokers, invocation);
修改为了 LoadBalance loadbalance = null;因为我们不需要负载均衡了。
扩展实现写好后,要把扩展实现配置到下面文件
image.png
然后在消费端调用时候进行下面设置就可以指定ip调用了。
//设置集群容错策略为我们自己的 referenceConfig.setCluster("myCluster"); //指定ip,企图让ip为30.10.67.231的服务提供者来处理服务 RpcContext.getContext().set("ip", "30.10.67.231");
三、总结
Dubbo是一个高度可扩充的框架,基于SPI的扩展接口,我们可以根据需要定制我们自己的实现,本文我们则基于集群容错策略实现了基于ip调用的扩展。
原创文章,转载请注明: 转载自并发编程网 – ifeve.com本文链接地址: 根据IP动态路由调用Dubbo服务
这里用负载均衡机制,在调用的时候设置RpcContext,动态调用的时候get 会不会更好?
动态调用的时候怎么get?你的能找到具体的inovker列表,然后从中选择出你指定的ip对应的invoker
说错,在负载均衡select的时候,get会不会更好。。。