spring cloud gateway lb负载机器任意上传文件,下载文件,请求参数保存日志

2022-07-07 11:48:27

设置文件上传限制:properties设置: spring.codec.max-in-memory-size=2MB

可在代码中加载配置:

import org.springframework.context.annotation.Configuration;
import org.springframework.http.codec.ServerCodecConfigurer;
import org.springframework.web.reactive.config.EnableWebFlux;
import org.springframework.web.reactive.config.WebFluxConfigurer;

@Configuration
@EnableWebFlux
public class WebFluxConfig implements WebFluxConfigurer {
    @Override
    public void configureHttpMessageCodecs(ServerCodecConfigurer configurer) {
        configurer.defaultCodecs().maxInMemorySize(2*1024*1024);
    }
}

第一步开始配置最高级过滤器:RequestFilter

import org.springframework.cloud.gateway.filter.GatewayFilterChain;
import org.springframework.cloud.gateway.filter.GlobalFilter;
import org.springframework.core.Ordered;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.http.MediaType;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.http.server.reactive.ServerHttpRequestDecorator;
import org.springframework.http.server.reactive.ServerHttpResponse;
import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.util.List;

@Component
public class RequestFilter implements GlobalFilter, Ordered {
    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        //上传之类的不做参数处理
        List<String> downloadFlag = exchange.getRequest().getHeaders().get("Upgrade-Insecure-Requests");
        if (downloadFlag != null) {
            return chain.filter(exchange);
        }
        return request(exchange, chain);
    }

    private Mono<Void> request(ServerWebExchange exchange, GatewayFilterChain chain) {
        MediaType contentType = exchange.getRequest().getHeaders().getContentType();
        ServerHttpRequest request = exchange.getRequest();
        ServerHttpResponse response = exchange.getResponse();
        if (contentType == null) {
            if ("POST,PUT".equals(request.getMethod().name())) {
                return DataBufferUtils.join(request.getBody()).flatMap(dataBuffer -> {
                    byte[] bytes = new byte[dataBuffer.readableByteCount()];
                    dataBuffer.read(bytes);
                    DataBufferUtils.release(dataBuffer);
                    //修改请求参数
                    ServerHttpRequest.Builder mutate = request.mutate();
                    ServerHttpRequest build = mutate.build();
                    Flux<DataBuffer> cache = Flux.defer(() -> {
                        DataBuffer wrap = response.bufferFactory().wrap(bytes);
                        return Mono.just(wrap);
                    });
                    ServerHttpRequestDecorator requestDecorator = new ServerHttpRequestDecorator(build) {
                        @Override
                        public Flux<DataBuffer> getBody() {
                            return cache;
                        }
                    };
                    return chain.filter(exchange.mutate().request(requestDecorator).build());
                });
            }
        }
        return chain.filter(exchange);
    }

    @Override
    public int getOrder() {
        return Ordered.HIGHEST_PRECEDENCE;
    }
}

第二步配置最低级过滤器作为作为保存日志处理:ResponseFilter

import org.reactivestreams.Publisher;
import org.springframework.cloud.gateway.filter.GatewayFilterChain;
import org.springframework.cloud.gateway.filter.GlobalFilter;
import org.springframework.core.Ordered;
import org.springframework.core.io.buffer.*;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.http.MediaType;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.http.server.reactive.ServerHttpResponse;
import org.springframework.http.server.reactive.ServerHttpResponseDecorator;
import org.springframework.stereotype.Component;
import org.springframework.util.MultiValueMap;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.nio.CharBuffer;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.concurrent.atomic.AtomicReference;

@Component
public class ResponseFilter implements GlobalFilter, Ordered {
    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        //注意下在的时候GET记得加这个在请求头中,自己定义一个也可以
        List<String> downloadFlag = exchange.getRequest().getHeaders().get("Upgrade-Insecure-Requests");
        ServerHttpRequest request = exchange.getRequest();
        ServerHttpResponse response = exchange.getResponse();
        //获取请求参数
        String requestParam = getRequestParam(request);
        //处理下载文件
        AtomicReference<Object> result = new AtomicReference<>();
        if (downloadFlag != null) {
            return chain.filter(exchange).then(Mono.fromRunnable(() -> {
                //异步保存日志信息
                Object responseResult = getDownloadResult(response, result);
                //下载文件之后组织一个下载文件响应的报文当日志存储
                String res = String.valueOf(responseResult);
            }));
        }
        //非下载文件特殊处理
        DataBufferFactory bufferFactory = response.bufferFactory();
        ServerHttpResponseDecorator decoratorResponse = getResponse(request, response, bufferFactory, result);
        decoratorResponse.getHeaders().setContentType(MediaType.APPLICATION_JSON);
        return chain.filter(exchange.mutate().response(decoratorResponse).build()).then(Mono.fromRunnable(() -> {
            String res = String.valueOf(result.get());
            //记录日志信息
        }));
    }

    private Object getDownloadResult(ServerHttpResponse response, AtomicReference<Object> result) {
        HttpHeaders headers = response.getHeaders();
        ArrayList<Map<String, String>> list;
        List<String> fileNameList;
        Map<String, Object> res = null;
        if (headers.containsKey("Content-dispostion")) {
            fileNameList = headers.get("Content-dispostion");
            if (fileNameList != null) {
                res = new HashMap<>();
                list = new ArrayList<>();
                for (int i = 0; i < fileNameList.size(); i++) {
                    String fileName = fileNameList.get(i);
                    Map<String, String> map = new HashMap<>();
                    map.put("fileNme", fileName);
                    list.add(map);
                }
                //自定义返回报文
                res.put("code", "");
                res.put("fileList", list);
            }
        }


        return res;
    }

    private String getRequestParam(ServerHttpRequest request) {
        HttpMethod method = request.getMethod();
        String res = null;
        if (method == HttpMethod.POST) {
            res = getPostParam(request);
        } else if (method == HttpMethod.PUT) {
            res = getPostParam(request);
        } else if (method == HttpMethod.GET) {
            res = getGetParam(request);
        }
        return res;
    }

  private String getGetParam(ServerHttpRequest request) {
        MultiValueMap<String, String> queryParams = request.getQueryParams();
        Map<String, String> map = null;
        if (null != queryParams) {
            map = new HashMap<>();
            Set<String> keys = queryParams.keySet();
            Map<String, String> finalMap = map;
            keys.forEach(key -> {
                finalMap.put(key, queryParams.getFirst(key));
            });
        }
        return map == null ? "" : map.toString();
    }

    private String getPostParam(ServerHttpRequest request) {

        Flux<DataBuffer> body = request.getBody();
        AtomicReference<String> reference = new AtomicReference<>();
        body.subscribe(dataBuffer -> {
            CharBuffer decode = StandardCharsets.UTF_8.decode(dataBuffer.asByteBuffer());
            DataBufferUtils.release(dataBuffer);
            reference.set(decode.toString());
        });
        return reference.get();
    }

    private ServerHttpResponseDecorator getResponse(ServerHttpRequest request, ServerHttpResponse response, DataBufferFactory bufferFactory, AtomicReference<Object> result) {

        return new ServerHttpResponseDecorator(response) {
            @Override
            public Mono<Void> writeWith(Publisher<? extends DataBuffer> body) {
                Flux<? extends DataBuffer> fluxBody = Flux.from(body);
                return super.writeWith(fluxBody.buffer().map(dataBuffers -> {
                    DefaultDataBufferFactory dataBufferFactory = new DefaultDataBufferFactory();
                    DefaultDataBuffer join = dataBufferFactory.join(dataBuffers);
                    byte[] bytes = new byte[join.readableByteCount()];
                    join.read(bytes);
                    DataBufferUtils.release(join);
                    if (!isDownload(response)) {
                        String res = new String(bytes, StandardCharsets.UTF_8);
                        result.set(res);
                    }
                    return bufferFactory.wrap(bytes);
                }));
            }
        };

    }

    //防止下载有漏网之鱼导致---塞入大字符串中导致OOM
    private boolean isDownload(ServerHttpResponse response) {
        HttpHeaders headers = response.getHeaders();
        if (headers.containsKey("Content-dispostion")) {
            return true;
        }
        return false;
    }

    @Override
    public int getOrder() {
        return Ordered.LOWEST_PRECEDENCE;
    }
}
  • 作者:我来秋风扫落叶
  • 原文链接:https://blog.csdn.net/qq_15038701/article/details/122488498
    更新时间:2022-07-07 11:48:27