package org.springframework.cloud.stream.binder.kafka.provisioning;

import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewPartitions;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.errors.TopicExistsException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.cloud.stream.binder.BinderException;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaTopicProperties;
import org.springframework.cloud.stream.binder.kafka.utils.KafkaTopicUtils;
import org.springframework.cloud.stream.provisioning.ConsumerDestination;
import org.springframework.cloud.stream.provisioning.ProducerDestination;
import org.springframework.cloud.stream.provisioning.ProvisioningException;
import org.springframework.cloud.stream.provisioning.ProvisioningProvider;
import org.springframework.retry.RetryOperations;
import org.springframework.retry.backoff.ExponentialBackOffPolicy;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.support.RetryTemplate;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;

/* loaded from: input_file:org/springframework/cloud/stream/binder/kafka/provisioning/KafkaTopicProvisioner.class */
public class KafkaTopicProvisioner implements ProvisioningProvider<ExtendedConsumerProperties<KafkaConsumerProperties>, ExtendedProducerProperties<KafkaProducerProperties>>, InitializingBean {
    private static final Log logger = LogFactory.getLog(KafkaTopicProvisioner.class);
    private static final int DEFAULT_OPERATION_TIMEOUT = 30;
    private final KafkaBinderConfigurationProperties configurationProperties;
    private final int operationTimeout = DEFAULT_OPERATION_TIMEOUT;
    private final Map<String, Object> adminClientProperties;
    private RetryOperations metadataRetryOperations;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/springframework/cloud/stream/binder/kafka/provisioning/KafkaTopicProvisioner$KafkaConsumerDestination.class */
    public static final class KafkaConsumerDestination implements ConsumerDestination {
        private final String consumerDestinationName;
        private final int partitions;
        private final String dlqName;

        KafkaConsumerDestination(String str) {
            this(str, 0, null);
        }

        KafkaConsumerDestination(String str, int i) {
            this(str, Integer.valueOf(i), null);
        }

        KafkaConsumerDestination(String str, Integer num, String str2) {
            this.consumerDestinationName = str;
            this.partitions = num.intValue();
            this.dlqName = str2;
        }

        public String getName() {
            return this.consumerDestinationName;
        }

        public String toString() {
            return "KafkaConsumerDestination{consumerDestinationName='" + this.consumerDestinationName + "', partitions=" + this.partitions + ", dlqName='" + this.dlqName + "'}";
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/springframework/cloud/stream/binder/kafka/provisioning/KafkaTopicProvisioner$KafkaProducerDestination.class */
    public static final class KafkaProducerDestination implements ProducerDestination {
        private final String producerDestinationName;
        private final int partitions;

        KafkaProducerDestination(String str, Integer num) {
            this.producerDestinationName = str;
            this.partitions = num.intValue();
        }

        public String getName() {
            return this.producerDestinationName;
        }

        public String getNameForPartition(int i) {
            return this.producerDestinationName;
        }

        public String toString() {
            return "KafkaProducerDestination{producerDestinationName='" + this.producerDestinationName + "', partitions=" + this.partitions + '}';
        }
    }

    public KafkaTopicProvisioner(KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties, KafkaProperties kafkaProperties) {
        Assert.isTrue(kafkaProperties != null, "KafkaProperties cannot be null");
        this.adminClientProperties = kafkaProperties.buildAdminProperties();
        this.configurationProperties = kafkaBinderConfigurationProperties;
        normalalizeBootPropsWithBinder(this.adminClientProperties, kafkaProperties, kafkaBinderConfigurationProperties);
    }

    public void setMetadataRetryOperations(RetryOperations retryOperations) {
        this.metadataRetryOperations = retryOperations;
    }

    public void afterPropertiesSet() {
        if (this.metadataRetryOperations == null) {
            RetryTemplate retryTemplate = new RetryTemplate();
            SimpleRetryPolicy simpleRetryPolicy = new SimpleRetryPolicy();
            simpleRetryPolicy.setMaxAttempts(10);
            retryTemplate.setRetryPolicy(simpleRetryPolicy);
            ExponentialBackOffPolicy exponentialBackOffPolicy = new ExponentialBackOffPolicy();
            exponentialBackOffPolicy.setInitialInterval(100L);
            exponentialBackOffPolicy.setMultiplier(2.0d);
            exponentialBackOffPolicy.setMaxInterval(1000L);
            retryTemplate.setBackOffPolicy(exponentialBackOffPolicy);
            this.metadataRetryOperations = retryTemplate;
        }
    }

    public ProducerDestination provisionProducerDestination(String str, ExtendedProducerProperties<KafkaProducerProperties> extendedProducerProperties) {
        if (logger.isInfoEnabled()) {
            logger.info("Using kafka topic for outbound: " + str);
        }
        KafkaTopicUtils.validateTopicName(str);
        AdminClient create = AdminClient.create(this.adminClientProperties);
        Throwable th = null;
        try {
            try {
                createTopic(create, str, extendedProducerProperties.getPartitionCount(), false, ((KafkaProducerProperties) extendedProducerProperties.getExtension()).getTopic());
                int i = 0;
                HashMap hashMap = new HashMap();
                if (this.configurationProperties.isAutoCreateTopics()) {
                    this.metadataRetryOperations.execute(retryContext -> {
                        try {
                            if (logger.isDebugEnabled()) {
                                logger.debug("Attempting to retrieve the description for the topic: " + str);
                            }
                            KafkaFuture all = create.describeTopics(Collections.singletonList(str)).all();
                            getClass();
                            hashMap.putAll((Map) all.get(30L, TimeUnit.SECONDS));
                            return null;
                        } catch (Exception e) {
                            throw new ProvisioningException("Problems encountered with partitions finding", e);
                        }
                    });
                }
                TopicDescription topicDescription = (TopicDescription) hashMap.get(str);
                if (topicDescription != null) {
                    i = topicDescription.partitions().size();
                }
                KafkaProducerDestination kafkaProducerDestination = new KafkaProducerDestination(str, Integer.valueOf(i));
                if (create != null) {
                    if (0 != 0) {
                        try {
                            create.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        create.close();
                    }
                }
                return kafkaProducerDestination;
            } finally {
            }
        } catch (Throwable th3) {
            if (create != null) {
                if (th != null) {
                    try {
                        create.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    create.close();
                }
            }
            throw th3;
        }
    }

    public ConsumerDestination provisionConsumerDestination(String str, String str2, ExtendedConsumerProperties<KafkaConsumerProperties> extendedConsumerProperties) {
        if (!extendedConsumerProperties.isMultiplex()) {
            return doProvisionConsumerDestination(str, str2, extendedConsumerProperties);
        }
        for (String str3 : StringUtils.commaDelimitedListToStringArray(str)) {
            doProvisionConsumerDestination(str3.trim(), str2, extendedConsumerProperties);
        }
        return new KafkaConsumerDestination(str);
    }

    private ConsumerDestination doProvisionConsumerDestination(String str, String str2, ExtendedConsumerProperties<KafkaConsumerProperties> extendedConsumerProperties) {
        if (((KafkaConsumerProperties) extendedConsumerProperties.getExtension()).isDestinationIsPattern()) {
            Assert.isTrue(!((KafkaConsumerProperties) extendedConsumerProperties.getExtension()).isEnableDlq(), "enableDLQ is not allowed when listening to topic patterns");
            if (logger.isDebugEnabled()) {
                logger.debug("Listening to a topic pattern - " + str + " - no provisioning performed");
            }
            return new KafkaConsumerDestination(str);
        }
        KafkaTopicUtils.validateTopicName(str);
        boolean z = !StringUtils.hasText(str2);
        Assert.isTrue((z && ((KafkaConsumerProperties) extendedConsumerProperties.getExtension()).isEnableDlq()) ? false : true, "DLQ support is not available for anonymous subscriptions");
        if (extendedConsumerProperties.getInstanceCount() == 0) {
            throw new IllegalArgumentException("Instance count cannot be zero");
        }
        int instanceCount = extendedConsumerProperties.getInstanceCount() * extendedConsumerProperties.getConcurrency();
        ConsumerDestination kafkaConsumerDestination = new KafkaConsumerDestination(str);
        AdminClient createAdminClient = createAdminClient();
        Throwable th = null;
        try {
            try {
                createTopic(createAdminClient, str, instanceCount, ((KafkaConsumerProperties) extendedConsumerProperties.getExtension()).isAutoRebalanceEnabled(), ((KafkaConsumerProperties) extendedConsumerProperties.getExtension()).getTopic());
                if (this.configurationProperties.isAutoCreateTopics()) {
                    KafkaFuture all = createAdminClient.describeTopics(Collections.singletonList(str)).all();
                    try {
                        getClass();
                        int size = ((TopicDescription) ((Map) all.get(30L, TimeUnit.SECONDS)).get(str)).partitions().size();
                        kafkaConsumerDestination = createDlqIfNeedBe(createAdminClient, str, str2, extendedConsumerProperties, z, size);
                        if (kafkaConsumerDestination == null) {
                            kafkaConsumerDestination = new KafkaConsumerDestination(str, size);
                        }
                    } catch (Exception e) {
                        throw new ProvisioningException("provisioning exception", e);
                    }
                }
                if (createAdminClient != null) {
                    if (0 != 0) {
                        try {
                            createAdminClient.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        createAdminClient.close();
                    }
                }
                return kafkaConsumerDestination;
            } finally {
            }
        } catch (Throwable th3) {
            if (createAdminClient != null) {
                if (th != null) {
                    try {
                        createAdminClient.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    createAdminClient.close();
                }
            }
            throw th3;
        }
    }

    AdminClient createAdminClient() {
        return AdminClient.create(this.adminClientProperties);
    }

    public static void normalalizeBootPropsWithBinder(Map<String, Object> map, KafkaProperties kafkaProperties, KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties) {
        String kafkaConnectionString = kafkaBinderConfigurationProperties.getKafkaConnectionString();
        if (ObjectUtils.isEmpty(map.get("bootstrap.servers")) || !kafkaConnectionString.equals(kafkaBinderConfigurationProperties.getDefaultKafkaConnectionString())) {
            map.put("bootstrap.servers", kafkaConnectionString);
        }
        Map<String, String> configuration = kafkaBinderConfigurationProperties.getConfiguration();
        Set configNames = AdminClientConfig.configNames();
        configuration.forEach((str, str2) -> {
            Object put;
            if (str.equals("bootstrap.servers")) {
                throw new IllegalStateException("Set binder bootstrap servers via the 'brokers' property, not 'configuration'");
            }
            if (configNames.contains(str) && (put = map.put(str, str2)) != null && logger.isDebugEnabled()) {
                logger.debug("Overrode boot property: [" + str + "], from: [" + put + "] to: [" + str2 + "]");
            }
        });
    }

    private ConsumerDestination createDlqIfNeedBe(AdminClient adminClient, String str, String str2, ExtendedConsumerProperties<KafkaConsumerProperties> extendedConsumerProperties, boolean z, int i) {
        if (!((KafkaConsumerProperties) extendedConsumerProperties.getExtension()).isEnableDlq() || z) {
            return null;
        }
        String dlqName = StringUtils.hasText(((KafkaConsumerProperties) extendedConsumerProperties.getExtension()).getDlqName()) ? ((KafkaConsumerProperties) extendedConsumerProperties.getExtension()).getDlqName() : "error." + str + "." + str2;
        try {
            createTopicAndPartitions(adminClient, dlqName, ((KafkaConsumerProperties) extendedConsumerProperties.getExtension()).getDlqPartitions() == null ? i : ((KafkaConsumerProperties) extendedConsumerProperties.getExtension()).getDlqPartitions().intValue(), ((KafkaConsumerProperties) extendedConsumerProperties.getExtension()).isAutoRebalanceEnabled(), ((KafkaConsumerProperties) extendedConsumerProperties.getExtension()).getTopic());
            return new KafkaConsumerDestination(str, Integer.valueOf(i), dlqName);
        } catch (Throwable th) {
            if (th instanceof Error) {
                throw ((Error) th);
            }
            throw new ProvisioningException("provisioning exception", th);
        }
    }

    private void createTopic(AdminClient adminClient, String str, int i, boolean z, KafkaTopicProperties kafkaTopicProperties) {
        try {
            createTopicIfNecessary(adminClient, str, i, z, kafkaTopicProperties);
        } catch (Throwable th) {
            if (!(th instanceof Error)) {
                throw new ProvisioningException("Provisioning exception", th);
            }
            throw ((Error) th);
        }
    }

    private void createTopicIfNecessary(AdminClient adminClient, String str, int i, boolean z, KafkaTopicProperties kafkaTopicProperties) throws Throwable {
        if (this.configurationProperties.isAutoCreateTopics()) {
            createTopicAndPartitions(adminClient, str, i, z, kafkaTopicProperties);
        } else {
            logger.info("Auto creation of topics is disabled.");
        }
    }

    private void createTopicAndPartitions(AdminClient adminClient, String str, int i, boolean z, KafkaTopicProperties kafkaTopicProperties) throws Throwable {
        KafkaFuture names = adminClient.listTopics().names();
        getClass();
        if (!((Set) names.get(30L, TimeUnit.SECONDS)).contains(str)) {
            int max = Math.max(this.configurationProperties.getMinPartitionCount(), i);
            this.metadataRetryOperations.execute(retryContext -> {
                NewTopic newTopic;
                Map<Integer, List<Integer>> replicasAssignments = kafkaTopicProperties.getReplicasAssignments();
                if (replicasAssignments == null || replicasAssignments.size() <= 0) {
                    newTopic = new NewTopic(str, max, kafkaTopicProperties.getReplicationFactor() != null ? kafkaTopicProperties.getReplicationFactor().shortValue() : this.configurationProperties.getReplicationFactor());
                } else {
                    newTopic = new NewTopic(str, kafkaTopicProperties.getReplicasAssignments());
                }
                if (kafkaTopicProperties.getProperties().size() > 0) {
                    newTopic.configs(kafkaTopicProperties.getProperties());
                }
                try {
                    KafkaFuture all = adminClient.createTopics(Collections.singletonList(newTopic)).all();
                    getClass();
                    all.get(30L, TimeUnit.SECONDS);
                    return null;
                } catch (Exception e) {
                    if (!(e instanceof ExecutionException)) {
                        logger.error("Failed to create topics", e.getCause());
                        throw e.getCause();
                    }
                    if (!(e.getCause() instanceof TopicExistsException)) {
                        logger.error("Failed to create topics", e.getCause());
                        throw e.getCause();
                    }
                    if (!logger.isWarnEnabled()) {
                        return null;
                    }
                    logger.warn("Attempt to create topic: " + str + ". Topic already exists.");
                    return null;
                }
            });
            return;
        }
        int max2 = this.configurationProperties.isAutoAddPartitions() ? Math.max(this.configurationProperties.getMinPartitionCount(), i) : i;
        KafkaFuture all = adminClient.describeTopics(Collections.singletonList(str)).all();
        getClass();
        int size = ((TopicDescription) ((Map) all.get(30L, TimeUnit.SECONDS)).get(str)).partitions().size();
        if (size < max2) {
            if (this.configurationProperties.isAutoAddPartitions()) {
                KafkaFuture all2 = adminClient.createPartitions(Collections.singletonMap(str, NewPartitions.increaseTo(max2))).all();
                getClass();
                all2.get(30L, TimeUnit.SECONDS);
            } else {
                if (!z) {
                    throw new ProvisioningException("The number of expected partitions was: " + i + ", but " + size + (size > 1 ? " have " : " has ") + "been found instead.Consider either increasing the partition count of the topic or enabling `autoAddPartitions`");
                }
                logger.warn("The number of expected partitions was: " + i + ", but " + size + (size > 1 ? " have " : " has ") + "been found instead.There will be " + (max2 - size) + " idle consumers");
            }
        }
    }

    public Collection<PartitionInfo> getPartitionsForTopic(int i, boolean z, Callable<Collection<PartitionInfo>> callable, String str) {
        try {
            return (Collection) this.metadataRetryOperations.execute(retryContext -> {
                List emptyList = Collections.emptyList();
                try {
                    emptyList = (Collection) callable.call();
                } catch (Exception e) {
                    if (e instanceof UnknownTopicOrPartitionException) {
                        throw e;
                    }
                    logger.error("Failed to obtain partition information", e);
                }
                if (CollectionUtils.isEmpty(emptyList)) {
                    try {
                        AdminClient create = AdminClient.create(this.adminClientProperties);
                        Throwable th = null;
                        try {
                            try {
                                create.describeTopics(Collections.singletonList(str)).all().get();
                                if (create != null) {
                                    if (0 != 0) {
                                        try {
                                            create.close();
                                        } catch (Throwable th2) {
                                            th.addSuppressed(th2);
                                        }
                                    } else {
                                        create.close();
                                    }
                                }
                            } finally {
                            }
                        } finally {
                        }
                    } catch (ExecutionException e2) {
                        if (e2.getCause() instanceof UnknownTopicOrPartitionException) {
                            throw e2.getCause();
                        }
                        logger.warn("No partitions have been retrieved for the topic (" + str + "). This will affect the health check.");
                    }
                }
                int size = CollectionUtils.isEmpty(emptyList) ? 0 : emptyList.size();
                if (size < i) {
                    if (!z) {
                        throw new IllegalStateException("The number of expected partitions was: " + i + ", but " + size + (size > 1 ? " have " : " has ") + "been found instead");
                    }
                    logger.warn("The number of expected partitions was: " + i + ", but " + size + (size > 1 ? " have " : " has ") + "been found instead.There will be " + (i - size) + " idle consumers");
                }
                return emptyList;
            });
        } catch (Exception e) {
            logger.error("Cannot initialize Binder", e);
            throw new BinderException("Cannot initialize binder:", e);
        }
    }
}
