一、功能设计
拦截远程调用,当远程服务异常,则拦截,直接返回;
若远程服务正常,则放行调用。
二、窗口滑动
在一个时间窗口
11:01->11:02->11:03->11:04->11:05
在一个窗口时间内,若调用失败次数达到一个值,做什么事情
三、熔断器的状态HystrixStatus
/**
* 断路器的状态
* @author WHX
*/publicenum HystrixStatus{OPEN(0),CLOSE(1),HALF_OPEN(2);// 半开是关的一种策略 半开: 使用少许请求测试提供者是否活了// 半开是: 开(多一点) + 关HystrixStatus(int code){}}
**四、断路器接口设计 **HystixCmd
/**
* 断路器功能设计
* @author WHX
*/publicinterfaceHystixCmd{/**
* 拦截请求
*
* @param request
* @return
*/
Objectinterceptor(ProceedingJoinPoint request);/**
* 通过请求
*
* @return
*/
Objectpass(ProceedingJoinPoint request);/**
* 测试请求
*
* @param request
* @return
*/
Objecttest(ProceedingJoinPoint request);/**
* 修改拦截器的状态
*
* @param status
*/voidchangeStatus(HystrixStatus status);/**
* 获取自己的状态
*
* @return
*/
HystrixStatusgetStatus();}
五、切面拦截远程请求的方法HystixAspect
@Aspect@ComponentpublicclassHystixAspect{/**
* key: 服务的id
* value:该服务对应的拦截器
*/private Map<String, HystixCmd> hystixCmds=newHashMap<String, HystixCmd>();{//注入一个此服务的拦截器
hystixCmds.put("server-id",newHystixCmdImpl());}@Around("@annotation(com.sxt.feign.core.anno.HystixCmdRpc)")public Objectinterceptor(ProceedingJoinPoint point){// 从容器里面取自己的拦截器
HystixCmd hystixCmd= hystixCmds.get("server-id");switch(hystixCmd.getStatus()){case OPEN:return hystixCmd.interceptor(point);case CLOSE:return hystixCmd.pass(point);case HALF_OPEN:// 有结果,没有抛异常
System.out.println("半开,我用3%的几率访问一下,看提供者活了没有");
Object result= hystixCmd.test(point);return result;default:break;}return null;// 没有符合的条件}}
六、拦截器的实现HystixCmdImpl 实现上面的断路器接口
publicclassHystixCmdImplimplementsHystixCmd{/**
* 一个窗口的时间是5
*/privatestatic Long WINDOW_SLIDE_TIME=5000L;/**
* 在一个窗口内失败10 次就代表远程服务异常
*/privatestatic Integer MAX_FAIL_COUNT=3;/**
* 锁对象
*/private Object lock=newObject();/**
* 默认是关的
*/private HystrixStatus status= HystrixStatus.CLOSE;/**
* 随机数
*/privatestatic Random RDM=newRandom();/**
* 在一个窗口怎么统计次数
*/private AtomicInteger currentFallCount=newAtomicInteger(0);{// 定时任务做啥事情, 若一个窗口里面它没有达到失败的阈值,但是我们不能让该值去影响下个窗口,就需要清空该窗口的值newThread(()->{while(true){try{
Thread.sleep(WINDOW_SLIDE_TIME);// 5s 执行一次检查}catch(InterruptedException e1){
e1.printStackTrace();}if(this.getStatus()== HystrixStatus.CLOSE){// 断路器关闭
currentFallCount.set(0);}else{// 断路器打开了 或半开
System.out.println("断路器打开了,我统计失败次数,没有任何意义,我先死一会");synchronized(lock){try{//少许流量测试通过后再唤醒我
lock.wait();
System.out.println("远程服务正常了,我需要在启动清空窗口数据了");
currentFallCount.set(0);}catch(InterruptedException e){
e.printStackTrace();}}}}}).start();}/**
* 拦截请求
*/@Overridepublic Objectinterceptor(ProceedingJoinPoint request){// 不调用正常的值,使用备胎值
MethodSignature methodSignature=(MethodSignature) request.getSignature();
Method method= methodSignature.getMethod();// 这是正常的实现类方法,我们需要调用备胎里面的方法// 现在没法得到接口和实现类
Object object=getFallCallback(method);
Method callBack= null;try{
callBack= object.getClass().getMethod(method.getName(), method.getParameterTypes());}catch(NoSuchMethodException| SecurityException e1){
e1.printStackTrace();}
Object fallBackResult= null;try{
fallBackResult= callBack.invoke(object, request.getArgs());// 直接调用备胎里面的方法}catch(IllegalAccessException| IllegalArgumentException| InvocationTargetException e){
e.printStackTrace();}return fallBackResult;}/**
* 正常调用
*/@Overridepublic Objectpass(ProceedingJoinPoint request){
Object result= null;try{
result= request.proceed(request.getArgs());}catch(Throwable e){// 有异常了,若异常为超时异常,代表失败了 一段时间失败n 次// e.printStackTrace();// 若失败了,我把失败次数++
currentFallCount.getAndIncrement();if(currentFallCount.get()>= MAX_FAIL_COUNT){// 失败超过阈值
System.out.println("当前的窗口里面已经达到失败的阈值了,我把断路器打开");this.changeStatus(HystrixStatus.OPEN);// 打开断路器,一段时间后,把断路器改为半开newThread(()->{try{
Thread.sleep(5000L);
System.out.println("过了一段时间,我把断路器该为半开");this.changeStatus(HystrixStatus.HALF_OPEN);}catch(InterruptedException e1){
e1.printStackTrace();}}).start();}// 失败了,我们也不直接报错,而是返回备胎的结果
result=interceptor(request);}return result;}@Overridepublic Objecttest(ProceedingJoinPoint request){// 断路器半开,使用少许的流量测试// 3% 100 3int num= RDM.nextInt(100)+1;//1 100
Object result= null;if(num<=3){// 做测试
System.out.println("test中奖,去测试提供者是否活了");try{
result= request.proceed(request.getArgs());// 测试通过了,关闭断路器this.changeStatus(HystrixStatus.CLOSE);// 开启窗口滑动来清空计数synchronized(lock){
System.out.println("远程调用的测试已经通过,远程服务正常");//唤醒窗口线程
lock.notifyAll();}}catch(Exception e){// 进来,就没有通过
System.out.println("测试没有通过1");// e.printStackTrace();returninterceptor(request);}catch(Throwable throwable){// 进来,就没有通过
System.out.println("测试没有通过2");}}else{
System.out.println("未进测试,直接返回备胎结果");returninterceptor(request);}return result;}@OverridepublicvoidchangeStatus(HystrixStatus status){this.status= status;}@Overridepublic HystrixStatusgetStatus(){returnthis.status;}private ObjectgetFallCallback(Method method){
HystixCmdRpc annotation= method.getAnnotation(HystixCmdRpc.class);
Class<?> callback= annotation.callback();try{return callback.newInstance();}catch(InstantiationException| IllegalAccessException e){
e.printStackTrace();}return null;}}
七、使用注解来实现接口的获取 @HystixCmdRpc
@Target(ElementType.METHOD)@Retention(RetentionPolicy.RUNTIME)@Documentedpublic @interfaceHystixCmdRpc{/**
* 该方法调用失败了,我做啥
*
* @return 是备胎的类型
*/
Class<?>callback()default Object.class;}
八、测试:远程调用的接口 RpcService
远程调用的接口RpcService
publicinterfaceRpcService{/**
* 执行远程调用
* @return
*/public Stringrpc();}
正常调用的实现类
@ServicepublicclassRpcServiceImplimplementsRpcService{@Autowiredprivate RestTemplate rest;/**
* 正常的调用getForObject
* 不正常走RpcFallCallback 里面的rpc 方法
*/@Override@HystixCmdRpc(callback= RpcFallCallback.class)public Stringrpc(){
String result= rest.getForObject("http://localhost:8081/info", String.class);return result+"-------";}}
失败的(备胎)实现类
publicclassRpcFallCallbackimplementsRpcService{@Overridepublic Stringrpc(){return"我是备胎";}}
yml
server:
port: 8087
spring:
application:
name: feign-client
eureka:
client:
service-url:
defaultZone: http://nnhx.top:8761/eureka/,http://nnhx.top:8762/eureka/,http://nnhx.top:8763/eureka/
启动类:
@SpringBootApplication@RestControllerpublicclassFeignClientApplication{@Autowiredprivate RpcService rpcService;publicstaticvoidmain(String[] args){
SpringApplication.run(FeignClientApplication.class, args);}@Beanpublic RestTemplaterest(){returnnewRestTemplate();}@GetMapping("/rpc/testwu")public Stringsdfds(){int i=0;while(i<1000000000){
String rpc= rpcService.rpc();
System.out.println(rpc);
i++;}return"rpcOK";}}
启动提供者,在启动此消费者,访问http://localhost:8087/rpc/testwu查看消费者控制台–》关掉提供者,观察消费者控制台–》再重启提供者,观察消费者控制台;