/*
 * Decompiled with CFR 0.152.
 */
package io.specmesh.kafka.provision;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.specmesh.kafka.provision.ProvisioningException;
import io.specmesh.kafka.provision.Status;
import io.specmesh.kafka.provision.TopicProvisioner;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.AlterConfigsOptions;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.CreatePartitionsOptions;
import org.apache.kafka.clients.admin.CreatePartitionsResult;
import org.apache.kafka.clients.admin.NewPartitions;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.config.ConfigResource;

public class TopicMutators {

    /**
     * Fluent builder that picks the {@link TopicMutator} implementation to use.
     *
     * <p>{@link #build()} resolves the flags in this order: clean-unspecified wins, then no-op,
     * and otherwise a combined create + update mutator is returned.
     */
    @SuppressFBWarnings(value={"EI_EXPOSE_REP2"}, justification="adminClient() passed as param to prevent API pollution")
    public static final class TopicMutatorBuilder {
        private Admin adminClient;
        private boolean noop;
        private boolean cleanUnspecified;
        private boolean dryRun;

        private TopicMutatorBuilder() {
        }

        /**
         * @return a fresh builder with no admin client and every flag {@code false}
         */
        public static TopicMutatorBuilder builder() {
            return new TopicMutatorBuilder();
        }

        /**
         * @param adminClient the admin client handed to the mutators that talk to the cluster
         * @return this builder
         */
        public TopicMutatorBuilder adminClient(Admin adminClient) {
            this.adminClient = adminClient;
            return this;
        }

        /**
         * @param dryRun when {@code true}, {@link #build()} may return the no-op mutator
         * @return this builder
         */
        public TopicMutatorBuilder noopMutator(boolean dryRun) {
            this.noop = dryRun;
            return this;
        }

        /**
         * @param cleanUnspecified when {@code true}, {@link #build()} returns the clean-up mutator
         * @param dryRun passed through to the clean-up mutator
         * @return this builder
         */
        public TopicMutatorBuilder cleanUnspecified(boolean cleanUnspecified, boolean dryRun) {
            this.cleanUnspecified = cleanUnspecified;
            this.dryRun = dryRun;
            return this;
        }

        /**
         * @return the mutator selected by the configured flags
         */
        public TopicMutator build() {
            final TopicMutator selected;
            if (cleanUnspecified) {
                selected = new CleanUnspecifiedMutator(dryRun, adminClient);
            } else if (noop) {
                selected = new NoopMutator();
            } else {
                final TopicMutator creator = new CreateMutator(adminClient);
                final TopicMutator updater = new UpdateMutator(adminClient);
                selected = new CollectiveMutator(creator, updater);
            }
            return selected;
        }
    }

    /** Strategy applied to a set of topics during provisioning. */
    interface TopicMutator {
        /**
         * Applies this mutator to the supplied topics.
         *
         * @param topics the topics to consider
         * @return the topics this mutator reports on
         */
        Collection<TopicProvisioner.Topic> mutate(Collection<TopicProvisioner.Topic> topics);
    }

    /** Mutator that does nothing: the supplied topics are handed straight back. */
    public static final class NoopMutator implements TopicMutator {

        /**
         * @param topics the topics to leave untouched
         * @return the very same collection that was passed in
         */
        @Override
        public Collection<TopicProvisioner.Topic> mutate(final Collection<TopicProvisioner.Topic> topics) throws ProvisioningException {
            return topics;
        }
    }

    /**
     * Creates every topic whose state is {@code CREATE} through the admin client.
     *
     * <p>Only the topics selected for creation are ever returned, whether the request succeeds or
     * fails, so that combining this mutator with others does not report unrelated topics twice.
     */
    public static final class CreateMutator implements TopicMutator {
        /** Upper bound, in seconds, on waiting for the create request to complete. */
        private static final long REQUEST_TIMEOUT_SECS = 60L;

        private final Admin adminClient;

        private CreateMutator(Admin adminClient) {
            this.adminClient = adminClient;
        }

        /**
         * Issues one create request for all topics in state {@code CREATE}.
         *
         * @param topics the candidate topics; those not in state {@code CREATE} are ignored
         * @return the topics selected for creation: moved to {@code CREATED} on success, or with a
         *     {@link ProvisioningException} recorded on each of them on failure
         */
        @Override
        public Collection<TopicProvisioner.Topic> mutate(Collection<TopicProvisioner.Topic> topics) throws ProvisioningException {
            List<TopicProvisioner.Topic> topicsToCreate = topics.stream()
                    .filter(topic -> topic.state().equals(Status.STATE.CREATE))
                    .collect(Collectors.toList());
            if (topicsToCreate.isEmpty()) {
                return topicsToCreate;
            }
            try {
                this.adminClient.createTopics(this.asNewTopic(topicsToCreate)).all().get(REQUEST_TIMEOUT_SECS, TimeUnit.SECONDS);
                return topicsToCreate.stream()
                        .map(topic -> topic.state(Status.STATE.CREATED))
                        .collect(Collectors.toList());
            } catch (InterruptedException e) {
                // Restore the interrupt flag so code further up the stack can still observe it.
                Thread.currentThread().interrupt();
                this.markFailed(topicsToCreate, e);
            } catch (ExecutionException | TimeoutException e) {
                this.markFailed(topicsToCreate, e);
            }
            // Report only what this mutator attempted, matching the success and empty paths.
            return topicsToCreate;
        }

        /** Records the create failure on every topic that was part of the failed request. */
        private void markFailed(Collection<TopicProvisioner.Topic> failed, Exception cause) {
            failed.forEach(topic -> topic.exception(new ProvisioningException("failed to write topics", cause)));
        }

        /** Maps each topic to the admin client's {@link NewTopic} request form, including its configs. */
        private Collection<NewTopic> asNewTopic(Collection<TopicProvisioner.Topic> topics) {
            return topics.stream()
                    .map(topic -> new NewTopic(topic.name(), topic.partitions(), topic.replication()).configs(topic.config()))
                    .collect(Collectors.toList());
        }
    }

    /**
     * Deletes every topic whose state is neither {@code CREATE} nor {@code UPDATE}.
     *
     * <p>In dry-run mode the selected topics are only marked {@code DELETE}; nothing is sent to
     * the admin client.
     */
    public static final class CleanUnspecifiedMutator implements TopicMutator {
        /** Upper bound, in seconds, on waiting for the delete request to complete. */
        private static final long REQUEST_TIMEOUT_SECS = 60L;

        private final boolean dryRun;
        private final Admin adminClient;

        CleanUnspecifiedMutator(boolean dryRun, Admin adminClient) {
            this.dryRun = dryRun;
            this.adminClient = adminClient;
        }

        /**
         * Marks and, unless in dry-run mode, deletes the unwanted topics.
         *
         * @param topics the candidate topics; those in state {@code CREATE} or {@code UPDATE} are kept
         * @return the topics selected for deletion: {@code DELETE} in dry-run mode, {@code DELETED}
         *     after a successful delete, or with a {@link ProvisioningException} recorded on failure
         */
        @Override
        public Collection<TopicProvisioner.Topic> mutate(Collection<TopicProvisioner.Topic> topics) throws ProvisioningException {
            List<TopicProvisioner.Topic> unwanted = topics.stream()
                    .filter(topic -> !topic.state().equals(Status.STATE.CREATE) && !topic.state().equals(Status.STATE.UPDATE))
                    .collect(Collectors.toList());
            if (unwanted.isEmpty()) {
                // Nothing to delete, so skip the admin client call entirely.
                return unwanted;
            }
            unwanted.forEach(topic -> topic.state(Status.STATE.DELETE));
            if (this.dryRun) {
                return unwanted;
            }
            try {
                this.adminClient.deleteTopics(this.toTopicNames(unwanted)).all().get(REQUEST_TIMEOUT_SECS, TimeUnit.SECONDS);
                unwanted.forEach(topic -> topic.state(Status.STATE.DELETED));
            } catch (InterruptedException ex) {
                // Restore the interrupt flag so code further up the stack can still observe it.
                Thread.currentThread().interrupt();
                this.markFailed(unwanted, ex);
            } catch (ExecutionException | TimeoutException ex) {
                this.markFailed(unwanted, ex);
            }
            return unwanted;
        }

        /** Records the delete failure on every topic that was part of the failed request. */
        private void markFailed(List<TopicProvisioner.Topic> failed, Exception cause) {
            failed.forEach(topic -> topic.exception(new ProvisioningException("failed to delete topics", cause)));
        }

        /** Extracts the topic names in list order. */
        private List<String> toTopicNames(List<TopicProvisioner.Topic> topicsToUpdate) {
            return topicsToUpdate.stream().map(TopicProvisioner.Topic::name).collect(Collectors.toList());
        }
    }

    /**
     * Applies partition-count increases and config changes to every topic whose state is
     * {@code UPDATE}.
     */
    public static final class UpdateMutator implements TopicMutator {
        /** Upper bound, in seconds, on waiting for any single admin request to complete. */
        private static final long REQUEST_TIMEOUT_SECS = 60L;

        private final Admin adminClient;

        UpdateMutator(Admin adminClient) {
            this.adminClient = adminClient;
        }

        /**
         * Describes each topic in state {@code UPDATE}, then updates its partitions and configs.
         *
         * @param topics the candidate topics; those not in state {@code UPDATE} are ignored
         * @return the topics selected for update; partition and config failures are recorded on
         *     the individual topic rather than thrown
         * @throws ProvisioningException if a topic's current description cannot be obtained
         */
        @Override
        public Collection<TopicProvisioner.Topic> mutate(Collection<TopicProvisioner.Topic> topics) throws ProvisioningException {
            List<TopicProvisioner.Topic> topicsToUpdate = topics.stream()
                    .filter(topic -> topic.state().equals(Status.STATE.UPDATE))
                    .collect(Collectors.toList());
            if (topicsToUpdate.isEmpty()) {
                return topicsToUpdate;
            }
            Map<String, KafkaFuture<TopicDescription>> describeTopics =
                    this.adminClient.describeTopics(this.toTopicNames(topicsToUpdate)).topicNameValues();
            for (TopicProvisioner.Topic topic : topicsToUpdate) {
                TopicDescription description = this.describe(describeTopics, topic);
                this.updatePartitions(topic, description);
                this.updateConfigs(topic);
            }
            return topicsToUpdate;
        }

        /** Waits for the description of one topic, turning any failure into a {@link ProvisioningException}. */
        private TopicDescription describe(Map<String, KafkaFuture<TopicDescription>> describeTopics, TopicProvisioner.Topic topic) {
            try {
                return describeTopics.get(topic.name()).get(REQUEST_TIMEOUT_SECS, TimeUnit.SECONDS);
            } catch (InterruptedException ex) {
                // Restore the interrupt flag so code further up the stack can still observe it.
                Thread.currentThread().interrupt();
                throw new ProvisioningException("Failed to describe topic before update: " + topic.name(), ex);
            } catch (ExecutionException | TimeoutException ex) {
                throw new ProvisioningException("Failed to describe topic before update: " + topic.name(), ex);
            }
        }

        /** Extracts the topic names in list order. */
        private List<String> toTopicNames(List<TopicProvisioner.Topic> topicsToUpdate) {
            return topicsToUpdate.stream().map(TopicProvisioner.Topic::name).collect(Collectors.toList());
        }

        /**
         * Sets every entry of the topic's config on the cluster and waits for the outcome.
         *
         * <p>The topic is marked {@code UPDATED} only once the request has completed; any failure
         * is recorded on the topic instead of being thrown.
         */
        private void updateConfigs(TopicProvisioner.Topic topic) {
            try {
                List<AlterConfigOp> alterConfigOps = topic.config().entrySet().stream()
                        .map(entry -> new AlterConfigOp(new ConfigEntry(entry.getKey(), entry.getValue()), AlterConfigOp.OpType.SET))
                        .collect(Collectors.toList());
                Map<ConfigResource, Collection<AlterConfigOp>> configs =
                        Map.of(new ConfigResource(ConfigResource.Type.TOPIC, topic.name()), alterConfigOps);
                // timeoutMs is in milliseconds, so convert the shared timeout from seconds.
                AlterConfigsOptions options =
                        new AlterConfigsOptions().timeoutMs((int) TimeUnit.SECONDS.toMillis(REQUEST_TIMEOUT_SECS));
                // Wait for the result; otherwise a rejected change would still be reported as UPDATED.
                this.adminClient.incrementalAlterConfigs(configs, options).all().get(REQUEST_TIMEOUT_SECS, TimeUnit.SECONDS);
                // NOTE(review): the message only mentions retention.ms although every config entry is sent.
                topic.messages(topic.messages() + "\nUpdated config: retention.ms -> " + topic.config().get("retention.ms"));
                topic.state(Status.STATE.UPDATED);
            } catch (InterruptedException ex) {
                // Restore the interrupt flag so code further up the stack can still observe it.
                Thread.currentThread().interrupt();
                topic.exception(new ProvisioningException("Failed to update config ", ex));
            } catch (Exception ex) {
                topic.exception(new ProvisioningException("Failed to update config ", ex));
            }
        }

        /**
         * Increases the partition count when the requested count exceeds the current one.
         *
         * <p>An equal or lower requested count is ignored with a message on the topic; any failure
         * is recorded on the topic instead of being thrown.
         */
        private void updatePartitions(TopicProvisioner.Topic topic, TopicDescription description) {
            try {
                if (description.partitions().size() < topic.partitions()) {
                    CreatePartitionsResult parts = this.adminClient.createPartitions(
                            Map.of(topic.name(), NewPartitions.increaseTo(topic.partitions())),
                            new CreatePartitionsOptions().retryOnQuotaViolation(false));
                    parts.all().get(REQUEST_TIMEOUT_SECS, TimeUnit.SECONDS);
                    topic.state(Status.STATE.UPDATED);
                    topic.messages(topic.messages() + "\n Updated partitionCount");
                } else {
                    topic.messages(topic.messages() + "\n Ignoring partition increase because new count is not higher");
                }
            } catch (InterruptedException ex) {
                // Restore the interrupt flag so code further up the stack can still observe it.
                Thread.currentThread().interrupt();
                topic.exception(new ProvisioningException("Failed to update partitions", ex));
            } catch (Exception ex) {
                topic.exception(new ProvisioningException("Failed to update partitions", ex));
            }
        }
    }

    /**
     * Runs several mutators over the same input and concatenates their results.
     *
     * <p>The delegates are kept in a list rather than a {@link Stream}: a stream can be consumed
     * only once, which would make a second call to {@link #mutate(Collection)} fail with an
     * {@link IllegalStateException}.
     */
    public static final class CollectiveMutator implements TopicMutator {
        private final List<TopicMutator> writers;

        private CollectiveMutator(TopicMutator ... writers) {
            // Immutable snapshot of the delegates, in the order they were supplied.
            this.writers = List.of(writers);
        }

        /**
         * Passes the same {@code topics} collection to every delegate, in order.
         *
         * @param topics the topics handed to each delegate
         * @return the delegates' results concatenated in delegate order
         */
        @Override
        public Collection<TopicProvisioner.Topic> mutate(Collection<TopicProvisioner.Topic> topics) {
            return this.writers.stream()
                    .map(writer -> writer.mutate(topics))
                    .flatMap(Collection::stream)
                    .collect(Collectors.toList());
        }
    }
}

