package org.dromara.soul.web.plugin.function;

import java.net.InetSocketAddress;
import java.net.URI;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.commons.collections4.CollectionUtils;
import org.dromara.soul.common.dto.convert.DivideUpstream;
import org.dromara.soul.common.dto.convert.rule.DivideRuleHandle;
import org.dromara.soul.common.dto.zk.RuleZkDTO;
import org.dromara.soul.common.dto.zk.SelectorZkDTO;
import org.dromara.soul.common.enums.PluginEnum;
import org.dromara.soul.common.enums.PluginTypeEnum;
import org.dromara.soul.common.enums.RpcTypeEnum;
import org.dromara.soul.common.utils.GsonUtils;
import org.dromara.soul.common.utils.LogUtils;
import org.dromara.soul.web.balance.utils.LoadBalanceUtils;
import org.dromara.soul.web.cache.UpstreamCacheManager;
import org.dromara.soul.web.cache.ZookeeperCacheManager;
import org.dromara.soul.web.plugin.AbstractSoulPlugin;
import org.dromara.soul.web.plugin.SoulPluginChain;
import org.dromara.soul.web.request.RequestDTO;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.HttpHeaders;
import org.springframework.util.StringUtils;
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.WebSocketSession;
import org.springframework.web.reactive.socket.client.WebSocketClient;
import org.springframework.web.reactive.socket.server.WebSocketService;
import org.springframework.web.server.ServerWebExchange;
import org.springframework.web.util.UriComponentsBuilder;
import reactor.core.publisher.Mono;

/* loaded from: input_file:org/dromara/soul/web/plugin/function/WebSocketPlugin.class */
/**
 * Plugin that proxies an incoming WebSocket handshake to an upstream server.
 *
 * <p>The upstream is picked from the upstream list cached for the matched selector, using the
 * load-balance strategy named in the rule's {@link DivideRuleHandle}. Frames are then relayed in
 * both directions between the client session and the upstream session.
 */
public class WebSocketPlugin extends AbstractSoulPlugin {
    private static final Logger LOGGER = LoggerFactory.getLogger(WebSocketPlugin.class);
    private static final String SEC_WEB_SOCKET_PROTOCOL = "Sec-WebSocket-Protocol";

    /** Lower-case prefix of the handshake headers that are not forwarded to the upstream. */
    private static final String SEC_WEB_SOCKET_PREFIX = "sec-websocket";

    /** Exchange attribute under which the parsed request description is stored. */
    private static final String REQUEST_DTO_ATTRIBUTE = "requestDTO";

    /** Scheme used when the selected upstream does not declare a protocol. */
    private static final String DEFAULT_PROTOCOL = "ws";

    private final UpstreamCacheManager upstreamCacheManager;
    private final WebSocketClient webSocketClient;
    private final WebSocketService webSocketService;

    /**
     * Server-side handler that opens a client connection to the upstream and pipes messages
     * between the inbound session and the upstream session.
     */
    private static class SoulWebSocketHandler implements WebSocketHandler {
        private final WebSocketClient client;
        private final URI url;
        private final HttpHeaders headers;
        private final List<String> subProtocols;

        /**
         * @param uri             upstream WebSocket endpoint
         * @param webSocketClient client used to connect to the upstream
         * @param httpHeaders     headers sent with the upstream handshake
         * @param list            requested sub-protocols; {@code null} is treated as "none"
         */
        SoulWebSocketHandler(URI uri, WebSocketClient webSocketClient, HttpHeaders httpHeaders, List<String> list) {
            this.client = webSocketClient;
            this.url = uri;
            this.headers = httpHeaders;
            this.subProtocols = list != null ? list : Collections.<String>emptyList();
        }

        @Override
        public List<String> getSubProtocols() {
            return this.subProtocols;
        }

        /**
         * Connects to the upstream and relays messages in both directions.
         *
         * @param webSocketSession the session established with the calling client
         * @return a {@code Mono} produced by zipping the two relay directions
         */
        @Override
        public Mono<Void> handle(final WebSocketSession webSocketSession) {
            return this.client.execute(this.url, this.headers, new WebSocketHandler() {
                @Override
                public Mono<Void> handle(WebSocketSession upstreamSession) {
                    // Each relayed message is retained before being handed to the other
                    // session's send().
                    Mono<Void> clientToUpstream = upstreamSession.send(
                            webSocketSession.receive().doOnNext(message -> message.retain()));
                    Mono<Void> upstreamToClient = webSocketSession.send(
                            upstreamSession.receive().doOnNext(message -> message.retain()));
                    return Mono.zip(clientToUpstream, upstreamToClient).then();
                }

                @Override
                public List<String> getSubProtocols() {
                    return SoulWebSocketHandler.this.subProtocols;
                }
            });
        }
    }

    /**
     * @param zookeeperCacheManager passed to {@link AbstractSoulPlugin}
     * @param upstreamCacheManager  source of the upstream list for a selector
     * @param webSocketClient       client used to connect to the chosen upstream
     * @param webSocketService      service that performs the server-side handshake
     */
    public WebSocketPlugin(ZookeeperCacheManager zookeeperCacheManager, UpstreamCacheManager upstreamCacheManager, WebSocketClient webSocketClient, WebSocketService webSocketService) {
        super(zookeeperCacheManager);
        this.upstreamCacheManager = upstreamCacheManager;
        this.webSocketClient = webSocketClient;
        this.webSocketService = webSocketService;
    }

    /**
     * Selects an upstream for the request and hands the exchange to the WebSocket service.
     *
     * <p>When no upstream is configured, the request description is missing, or the load balancer
     * yields nothing, an error is logged and the rest of the chain is executed instead.
     *
     * @throws NullPointerException if the request has no remote address
     */
    @Override // org.dromara.soul.web.plugin.AbstractSoulPlugin
    protected Mono<Void> doExecute(ServerWebExchange serverWebExchange, SoulPluginChain soulPluginChain, SelectorZkDTO selectorZkDTO, RuleZkDTO ruleZkDTO) {
        final List<DivideUpstream> upstreamList = this.upstreamCacheManager.findUpstreamListBySelectorId(selectorZkDTO.getId());
        final RequestDTO requestDTO = serverWebExchange.getAttribute(REQUEST_DTO_ATTRIBUTE);
        if (CollectionUtils.isEmpty(upstreamList) || Objects.isNull(requestDTO)) {
            LogUtils.error(LOGGER, "divide upstream config error：{}", ruleZkDTO::toString);
            return soulPluginChain.execute(serverWebExchange);
        }

        final DivideRuleHandle ruleHandle = GsonUtils.getInstance().fromJson(ruleZkDTO.getHandle(), DivideRuleHandle.class);
        // The caller's IP is passed to the load balancer along with the strategy name.
        final String remoteIp = Objects.requireNonNull(serverWebExchange.getRequest().getRemoteAddress())
                .getAddress().getHostAddress();
        final DivideUpstream upstream = LoadBalanceUtils.selector(upstreamList, ruleHandle.getLoadBalance(), remoteIp);
        if (Objects.isNull(upstream)) {
            LogUtils.error(LOGGER, () -> "LoadBalance has error！");
            return soulPluginChain.execute(serverWebExchange);
        }

        final URI uri = UriComponentsBuilder.fromUri(URI.create(buildWsRealPath(upstream, requestDTO)))
                .scheme(resolveProtocol(upstream))
                .build()
                .toUri();
        final HttpHeaders requestHeaders = serverWebExchange.getRequest().getHeaders();
        final SoulWebSocketHandler handler = new SoulWebSocketHandler(
                uri, this.webSocketClient, filterHeaders(requestHeaders), buildWsProtocols(requestHeaders));
        return this.webSocketService.handleRequest(serverWebExchange, handler);
    }

    /**
     * Returns the upstream's protocol, or {@value #DEFAULT_PROTOCOL} when it is null or blank.
     *
     * <p>Resolving the default here keeps the URL text and the final scheme in agreement, and
     * avoids feeding {@code "://host"} (empty protocol) or {@code "null://host"} to
     * {@link URI#create(String)}.
     */
    private static String resolveProtocol(DivideUpstream divideUpstream) {
        final String protocol = divideUpstream.getProtocol();
        return StringUtils.hasText(protocol) ? protocol : DEFAULT_PROTOCOL;
    }

    /**
     * Builds the upstream URL text as {@code protocol://upstreamUrl + requestDTO.getMethod()}.
     */
    private String buildWsRealPath(DivideUpstream divideUpstream, RequestDTO requestDTO) {
        return resolveProtocol(divideUpstream) + "://" + divideUpstream.getUpstreamUrl() + requestDTO.getMethod();
    }

    /**
     * Extracts the requested sub-protocols from the {@code Sec-WebSocket-Protocol} header.
     *
     * @return every comma-separated token of every header value, trimmed; an empty list when the
     *         header is absent or has no values
     */
    private List<String> buildWsProtocols(HttpHeaders httpHeaders) {
        final List<String> headerValues = httpHeaders.get(SEC_WEB_SOCKET_PROTOCOL);
        if (CollectionUtils.isEmpty(headerValues)) {
            return Collections.emptyList();
        }
        return headerValues.stream()
                .flatMap(value -> Arrays.stream(StringUtils.commaDelimitedListToStringArray(value)))
                .map(String::trim)
                .collect(Collectors.toList());
    }

    /**
     * Copies the given headers, leaving out every header whose name starts with
     * {@code sec-websocket} (case-insensitively).
     */
    private HttpHeaders filterHeaders(HttpHeaders httpHeaders) {
        final HttpHeaders forwarded = new HttpHeaders();
        httpHeaders.forEach((name, values) -> {
            if (!name.toLowerCase().startsWith(SEC_WEB_SOCKET_PREFIX)) {
                forwarded.addAll(name, values);
            }
        });
        return forwarded;
    }

    /**
     * Returns the name of {@link PluginEnum#DIVIDE}.
     *
     * <p>NOTE(review): the name comes from DIVIDE while {@link #getOrder()} uses WEB_SOCKET;
     * presumably this plugin reuses the divide plugin's selector/rule configuration — confirm
     * before changing.
     */
    @Override // org.dromara.soul.web.plugin.SoulPlugin
    public String named() {
        return PluginEnum.DIVIDE.getName();
    }

    /**
     * Skips this plugin unless the request's RPC type equals the WEB_SOCKET type name.
     *
     * @throws NullPointerException if the exchange carries no request description
     */
    @Override // org.dromara.soul.web.plugin.SoulPlugin
    public Boolean skip(ServerWebExchange serverWebExchange) {
        final RequestDTO requestDTO = Objects.requireNonNull(
                serverWebExchange.<RequestDTO>getAttribute(REQUEST_DTO_ATTRIBUTE));
        return !Objects.equals(requestDTO.getRpcType(), RpcTypeEnum.WEB_SOCKET.getName());
    }

    @Override // org.dromara.soul.web.plugin.SoulPlugin
    public PluginTypeEnum pluginType() {
        return PluginTypeEnum.FUNCTION;
    }

    /** Returns the code of {@link PluginEnum#WEB_SOCKET} as this plugin's order. */
    @Override // org.dromara.soul.web.plugin.SoulPlugin
    public int getOrder() {
        return PluginEnum.WEB_SOCKET.getCode();
    }
}
