/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.connect.mirror.integration;

import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.errors.NotFoundException;
import org.apache.kafka.connect.mirror.MirrorHeartbeatConnector;
import org.apache.kafka.connect.mirror.MirrorMaker;
import org.apache.kafka.connect.mirror.MirrorSourceConnector;
import org.apache.kafka.connect.mirror.SourceAndTarget;
import org.apache.kafka.connect.runtime.AbstractStatus;
import org.apache.kafka.connect.runtime.distributed.RebalanceNeededException;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
import org.apache.kafka.connect.runtime.rest.entities.TaskInfo;
import org.apache.kafka.connect.source.SourceConnector;
import org.apache.kafka.connect.util.Callback;
import org.apache.kafka.connect.util.FutureCallback;
import org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster;
import org.apache.kafka.test.NoRetryException;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Tag(value="integration")
public class DedicatedMirrorIntegrationTest {
    /** Logger used by the polling helpers of this test class. */
    private static final Logger log = LoggerFactory.getLogger(DedicatedMirrorIntegrationTest.class);
    /** Time budget, in milliseconds, for a topic to show up on a cluster (30 seconds). */
    private static final int TOPIC_CREATION_TIMEOUT_MS = 30000;
    /** Time budget, in milliseconds, for records to be replicated to a target topic (30 seconds). */
    private static final int TOPIC_REPLICATION_TIMEOUT_MS = 30000;
    /** Time budget, in milliseconds, for MirrorMaker connectors and tasks to start (two minutes). */
    private static final long MM_START_UP_TIMEOUT_MS = 120000L;
    /** Embedded Kafka clusters started by the current test, keyed by the name they were registered under. */
    private Map<String, EmbeddedKafkaCluster> kafkaClusters;
    /** MirrorMaker nodes started by the current test, keyed by the name they were registered under. */
    private Map<String, MirrorMaker> mirrorMakers;

    /**
     * Gives each test fresh, empty registries for the Kafka clusters and
     * MirrorMaker nodes it is going to start.
     */
    @BeforeEach
    public void setup() {
        mirrorMakers = new HashMap<>();
        kafkaClusters = new HashMap<>();
    }

    /**
     * Shuts down every MirrorMaker node and embedded Kafka cluster that is still
     * registered, then rethrows the failure captured during shutdown, if any.
     *
     * <p>Every shutdown step, including the wait for MirrorMaker nodes to finish
     * stopping, goes through {@code Utils.closeQuietly} so that one failing
     * component cannot prevent the remaining ones from being stopped.
     *
     * @throws Throwable the failure recorded in the shared reference during shutdown
     */
    @AfterEach
    public void teardown() throws Throwable {
        AtomicReference<Throwable> shutdownFailure = new AtomicReference<>();

        // First ask every node to stop, then wait for each of them to finish stopping.
        mirrorMakers.forEach((name, mirrorMaker) ->
                Utils.closeQuietly(mirrorMaker::stop, "MirrorMaker worker '" + name + "'", shutdownFailure));
        mirrorMakers.forEach((name, mirrorMaker) ->
                Utils.closeQuietly(mirrorMaker::awaitStop, "MirrorMaker worker '" + name + "'", shutdownFailure));

        // Brokers go down last, after the MirrorMaker nodes that talk to them.
        kafkaClusters.forEach((name, kafkaCluster) ->
                Utils.closeQuietly(kafkaCluster::stop, "Embedded Kafka cluster '" + name + "'", shutdownFailure));

        Throwable failure = shutdownFailure.get();
        if (failure != null) {
            throw failure;
        }
    }

    /**
     * Creates, registers and starts an embedded Kafka cluster.
     *
     * @param name             unique name to register the cluster under
     * @param numBrokers       number of brokers handed to the cluster constructor
     * @param brokerProperties broker properties handed to the cluster constructor
     * @return the started cluster
     * @throws IllegalStateException if a cluster is already registered under {@code name}
     */
    private EmbeddedKafkaCluster startKafkaCluster(String name, int numBrokers, Properties brokerProperties) {
        boolean nameTaken = kafkaClusters.containsKey(name);
        if (nameTaken) {
            throw new IllegalStateException("Cannot register multiple Kafka clusters with the same name");
        }

        EmbeddedKafkaCluster cluster = new EmbeddedKafkaCluster(numBrokers, brokerProperties);
        // Register before starting, so the cluster is in the registry even if start() fails.
        kafkaClusters.put(name, cluster);
        cluster.start();
        return cluster;
    }

    /**
     * Creates, registers and starts a MirrorMaker node.
     *
     * @param name    unique name to register the node under
     * @param mmProps MirrorMaker configuration passed to the node's constructor
     * @return the started node
     * @throws IllegalStateException if a node is already registered under {@code name}
     */
    private MirrorMaker startMirrorMaker(String name, Map<String, String> mmProps) {
        boolean nameTaken = mirrorMakers.containsKey(name);
        if (nameTaken) {
            throw new IllegalStateException("Cannot register multiple MirrorMaker nodes with the same name");
        }

        MirrorMaker node = new MirrorMaker(mmProps);
        // Register before starting, so the node is in the registry even if start() fails.
        mirrorMakers.put(name, node);
        node.start();
        return node;
    }

    /**
     * Unregisters the named MirrorMaker node, stops it and waits for the stop to
     * complete.
     *
     * <p>The wait mirrors what {@code teardown()} does for every node; callers
     * restart a node under the same name right after this method returns, so the
     * previous instance should be fully stopped by then.
     *
     * @param name name the node was registered under
     * @throws IllegalStateException if no node is registered under {@code name}
     */
    private void stopMirrorMaker(String name) {
        MirrorMaker mirror = mirrorMakers.remove(name);
        if (mirror == null) {
            throw new IllegalStateException("No MirrorMaker named " + name + " has been started");
        }
        mirror.stop();
        mirror.awaitStop();
    }

    /**
     * Runs one MirrorMaker node (internal REST server disabled) that replicates
     * A-&gt;B, creates a topic on A and verifies that the topic and its records
     * show up on B under the {@code A.} prefix.
     *
     * @throws Exception if any wait times out or a cluster operation fails
     */
    @Test
    public void testSingleNodeCluster() throws Exception {
        final String a = "A";
        final String b = "B";
        final String ab = a + "->" + b;
        final String ba = b + "->" + a;
        final String testTopicPrefix = "test-topic-";

        Properties brokerProps = new Properties();
        EmbeddedKafkaCluster clusterA = startKafkaCluster(a, 1, brokerProps);
        EmbeddedKafkaCluster clusterB = startKafkaCluster(b, 1, brokerProps);

        try (Admin adminB = clusterB.createAdminClient()) {
            Map<String, String> mmProps = new HashMap<>();
            mmProps.put("dedicated.mode.enable.internal.rest", "false");
            mmProps.put("listeners", "http://localhost:0");
            // Refresh topics very frequently to quickly pick up on topics created after MM2 starts
            mmProps.put("refresh.topics.interval.seconds", "1");
            mmProps.put("clusters", String.join(", ", a, b));
            mmProps.put(a + ".bootstrap.servers", clusterA.bootstrapServers());
            mmProps.put(b + ".bootstrap.servers", clusterB.bootstrapServers());
            mmProps.put(ab + ".enabled", "true");
            mmProps.put(ab + ".topics", "^" + testTopicPrefix + ".*");
            mmProps.put(ba + ".enabled", "false");
            mmProps.put(ba + ".emit.heartbeats.enabled", "false");
            // Single-broker clusters: every topic must use replication factor 1
            mmProps.put("replication.factor", "1");
            mmProps.put("checkpoints.topic.replication.factor", "1");
            mmProps.put("heartbeats.topic.replication.factor", "1");
            mmProps.put("offset-syncs.topic.replication.factor", "1");
            mmProps.put("offset.storage.replication.factor", "1");
            mmProps.put("status.storage.replication.factor", "1");
            mmProps.put("config.storage.replication.factor", "1");

            MirrorMaker mm = startMirrorMaker("single node", mmProps);
            SourceAndTarget sourceAndTarget = new SourceAndTarget(a, b);
            awaitMirrorMakerStart(mm, sourceAndTarget);
            awaitConnectorTasksStart(mm, MirrorHeartbeatConnector.class, sourceAndTarget);

            final int numMessages = 10;
            String topic = testTopicPrefix + "1";

            // Create the topic on cluster A and wait for the mirrored topic on cluster B
            clusterA.createTopic(topic, 1);
            awaitTopicCreation(b, adminB, a + "." + topic);

            // The source connector only gets tasks once there is a topic to replicate
            awaitConnectorTasksStart(mm, MirrorSourceConnector.class, sourceAndTarget);

            writeToTopic(clusterA, topic, numMessages);
            awaitTopicContent(clusterB, b, a + "." + topic, numMessages);
        }
    }

    /**
     * Runs one MirrorMaker node with {@code emit.offset-syncs.enabled=false},
     * verifies that A-&gt;B replication still works and that the offset-syncs
     * topic {@code mm2-offset-syncs.B.internal} is not present on cluster A.
     *
     * @throws Exception if any wait times out or a cluster operation fails
     */
    @Test
    public void testClusterWithEmitOffsetDisabled() throws Exception {
        final String a = "A";
        final String b = "B";
        final String ab = a + "->" + b;
        final String testTopicPrefix = "test-topic-";

        Properties brokerProps = new Properties();
        EmbeddedKafkaCluster clusterA = startKafkaCluster(a, 1, brokerProps);
        EmbeddedKafkaCluster clusterB = startKafkaCluster(b, 1, brokerProps);

        try (Admin adminB = clusterB.createAdminClient()) {
            Map<String, String> mmProps = new HashMap<>();
            mmProps.put("dedicated.mode.enable.internal.rest", "false");
            mmProps.put("listeners", "http://localhost:0");
            // Refresh topics very frequently to quickly pick up on topics created after MM2 starts
            mmProps.put("refresh.topics.interval.seconds", "1");
            mmProps.put("clusters", String.join(", ", a, b));
            mmProps.put(a + ".bootstrap.servers", clusterA.bootstrapServers());
            mmProps.put(b + ".bootstrap.servers", clusterB.bootstrapServers());
            mmProps.put(ab + ".enabled", "true");
            mmProps.put(ab + ".topics", "^" + testTopicPrefix + ".*");
            // Single-broker clusters: every topic must use replication factor 1
            mmProps.put("replication.factor", "1");
            mmProps.put("checkpoints.topic.replication.factor", "1");
            mmProps.put("heartbeats.topic.replication.factor", "1");
            // The setting under test
            mmProps.put("emit.offset-syncs.enabled", "false");
            mmProps.put("status.storage.replication.factor", "1");
            mmProps.put("offset.storage.replication.factor", "1");
            mmProps.put("config.storage.replication.factor", "1");

            MirrorMaker mm = startMirrorMaker("no-offset-syncing", mmProps);
            SourceAndTarget sourceAndTarget = new SourceAndTarget(a, b);
            // Only these two connector classes are awaited here, not the full default list.
            awaitMirrorMakerStart(mm, sourceAndTarget, Arrays.asList(MirrorSourceConnector.class, MirrorHeartbeatConnector.class));
            awaitConnectorTasksStart(mm, MirrorHeartbeatConnector.class, sourceAndTarget);

            final int numMessages = 10;
            String topic = testTopicPrefix + "1";

            // Create the topic on cluster A and wait for the mirrored topic on cluster B
            clusterA.createTopic(topic, 1);
            awaitTopicCreation(b, adminB, a + "." + topic);

            // The source connector only gets tasks once there is a topic to replicate
            awaitConnectorTasksStart(mm, MirrorSourceConnector.class, sourceAndTarget);

            writeToTopic(clusterA, topic, numMessages);
            awaitTopicContent(clusterB, b, a + "." + topic, numMessages);

            // With offset-sync emission disabled the offset-syncs topic must not exist on A
            boolean offsetSyncsTopicAbsent = clusterA.describeTopics("mm2-offset-syncs." + b + ".internal")
                    .values()
                    .stream()
                    .noneMatch(Optional::isPresent);
            Assertions.assertTrue(offsetSyncsTopicAbsent);
        }
    }

    /**
     * Runs three MirrorMaker nodes with the internal REST server enabled and
     * exactly-once source support enabled for the target cluster, then:
     * <ol>
     *   <li>creates one topic per node on cluster A and verifies each one is
     *       replicated to cluster B;</li>
     *   <li>restarts the nodes one at a time with a changed
     *       {@code refresh.topics.interval.seconds} value and waits until the
     *       source connector's task configurations carry the new value.</li>
     * </ol>
     *
     * @throws Exception if any wait times out or a cluster operation fails
     */
    @Test
    public void testMultiNodeCluster() throws Exception {
        final String a = "A";
        // NOTE(review): the alias is packed with special characters, presumably to
        // exercise escaping of cluster names -- confirm the intent with the author.
        final String b = "B-_~:?#[]@!$&'()*+=\"<>{}|^`618";
        final String ab = a + "->" + b;
        final String ba = b + "->" + a;
        final String testTopicPrefix = "test-topic-";
        final String refreshIntervalKey = "refresh.topics.interval.seconds";

        Properties brokerProps = new Properties();
        brokerProps.put("transaction.state.log.replication.factor", "1");
        brokerProps.put("transaction.state.log.min.isr", "1");
        EmbeddedKafkaCluster clusterA = startKafkaCluster("A", 1, brokerProps);
        EmbeddedKafkaCluster clusterB = startKafkaCluster("B", 1, brokerProps);

        try (Admin adminB = clusterB.createAdminClient()) {
            Map<String, String> mmProps = new HashMap<>();
            mmProps.put("dedicated.mode.enable.internal.rest", "true");
            mmProps.put("listeners", "http://localhost:0");
            // Refresh topics very frequently to quickly pick up on topics created after MM2 starts
            mmProps.put(refreshIntervalKey, "1");
            mmProps.put("clusters", String.join(", ", a, b));
            mmProps.put(a + ".bootstrap.servers", clusterA.bootstrapServers());
            mmProps.put(b + ".bootstrap.servers", clusterB.bootstrapServers());
            mmProps.put(b + ".exactly.once.source.support", "enabled");
            mmProps.put(a + ".consumer.isolation.level", "read_committed");
            mmProps.put(ab + ".enabled", "true");
            mmProps.put(ab + ".topics", "^" + testTopicPrefix + ".*");
            mmProps.put(ab + ".offset-syncs.topic.location", "target");
            mmProps.put(ba + ".enabled", "false");
            mmProps.put(ba + ".emit.heartbeats.enabled", "false");
            // Single-broker clusters: every topic must use replication factor 1
            mmProps.put("replication.factor", "1");
            mmProps.put("checkpoints.topic.replication.factor", "1");
            mmProps.put("heartbeats.topic.replication.factor", "1");
            mmProps.put("offset-syncs.topic.replication.factor", "1");
            mmProps.put("offset.storage.replication.factor", "1");
            mmProps.put("status.storage.replication.factor", "1");
            mmProps.put("config.storage.replication.factor", "1");
            mmProps.put(a + ".scheduled.rebalance.max.delay.ms", "1000");
            mmProps.put(b + ".scheduled.rebalance.max.delay.ms", "1000");

            SourceAndTarget sourceAndTarget = new SourceAndTarget(a, b);

            final int numNodes = 3;
            for (int i = 0; i < numNodes; i++) {
                startMirrorMaker("node " + i, mmProps);
            }

            awaitMirrorMakerStart(mirrorMakers.get("node 0"), sourceAndTarget);
            awaitConnectorTasksStart(mirrorMakers.get("node 0"), MirrorHeartbeatConnector.class, sourceAndTarget);

            // One topic per MirrorMaker node; each one is checked through a different node
            final int messagesPerTopic = 10;
            for (int i = 0; i < numNodes; i++) {
                String topic = testTopicPrefix + i;

                clusterA.createTopic(topic, 1);
                awaitTopicCreation(b, adminB, a + "." + topic);

                awaitConnectorTasksStart(mirrorMakers.get("node " + i), MirrorSourceConnector.class, sourceAndTarget);

                writeToTopic(clusterA, topic, messagesPerTopic);
                awaitTopicContent(clusterB, b, a + "." + topic, messagesPerTopic);
            }

            // Roll the nodes one by one onto a configuration with a different refresh interval
            Map<String, String> newMmProps = new HashMap<>(mmProps);
            String newConfigValue = "2";
            newMmProps.put(refreshIntervalKey, newConfigValue);

            for (int i = 0; i < numNodes; i++) {
                stopMirrorMaker("node " + i);

                // Check through any node that is still registered that the connectors keep running
                MirrorMaker survivor = mirrorMakers.values().stream()
                        .findAny()
                        .orElseThrow(() -> new IllegalStateException("No MirrorMaker node is left running"));
                awaitConnectorTasksStart(survivor, MirrorHeartbeatConnector.class, sourceAndTarget);
                awaitConnectorTasksStart(survivor, MirrorSourceConnector.class, sourceAndTarget);

                startMirrorMaker("node " + i, newMmProps);
            }

            awaitTaskConfigurations(
                    mirrorMakers.get("node 0"),
                    MirrorSourceConnector.class,
                    sourceAndTarget,
                    config -> newConfigValue.equals(config.get(refreshIntervalKey)));
        }
    }

    /**
     * Polls the admin client until {@code topic} appears in the topic listing,
     * for at most 30 seconds.
     *
     * <p>A failed listing is logged at debug level and counts as "not there
     * yet", so the poll is retried.
     *
     * @param clusterName cluster name, used only in log and failure messages
     * @param admin       admin client of the cluster to query
     * @param topic       topic expected to exist
     * @throws Exception if the topic does not appear in time
     */
    private void awaitTopicCreation(String clusterName, Admin admin, String topic) throws Exception {
        final long timeoutMs = 30000L;
        final String failureMessage = "topic " + topic + " was not created on cluster " + clusterName + " in time";

        TestUtils.waitForCondition(() -> {
            try {
                Set<String> existingTopics = admin.listTopics().names().get();
                return existingTopics.contains(topic);
            } catch (Exception e) {
                log.debug("Failed to check for existence of topic {} on cluster {}", topic, clusterName, e);
                return false;
            }
        }, timeoutMs, failureMessage);
    }

    /**
     * Produces exactly {@code numMessages} records to {@code topic}; each
     * record carries its zero-based index rendered as a decimal string.
     *
     * @param cluster     cluster to produce to
     * @param topic       destination topic
     * @param numMessages number of records to produce
     */
    private void writeToTopic(EmbeddedKafkaCluster cluster, String topic, int numMessages) {
        // Exclusive upper bound: "<=" would produce numMessages + 1 records.
        for (int i = 0; i < numMessages; i++) {
            cluster.produce(topic, Integer.toString(i));
        }
    }

    /**
     * Waits until every connector class listed in
     * {@code MirrorMaker.CONNECTOR_CLASSES} is running for the given flow.
     *
     * @param mm              MirrorMaker node to query
     * @param sourceAndTarget replication flow whose connectors are checked
     * @throws InterruptedException if the wait is interrupted
     */
    private void awaitMirrorMakerStart(MirrorMaker mm, SourceAndTarget sourceAndTarget) throws InterruptedException {
        awaitMirrorMakerStart(mm, sourceAndTarget, MirrorMaker.CONNECTOR_CLASSES);
    }

    /**
     * Waits, for at most two minutes, until all of the given connector classes
     * report a running connector for the given flow.
     *
     * <p>Any exception raised while checking is logged and rethrown wrapped in
     * a {@code NoRetryException}.
     *
     * @param mm               MirrorMaker node to query
     * @param sourceAndTarget  replication flow whose connectors are checked
     * @param connectorClasses connector classes that must all be running
     * @throws InterruptedException if the wait is interrupted
     */
    private void awaitMirrorMakerStart(MirrorMaker mm, SourceAndTarget sourceAndTarget, List<Class<?>> connectorClasses) throws InterruptedException {
        final long timeoutMs = 120000L;

        TestUtils.waitForCondition(() -> {
            try {
                // Stop at the first connector that is not running yet.
                for (Class<?> connectorClazz : connectorClasses) {
                    if (!isConnectorRunningForMirrorMaker(connectorClazz, mm, sourceAndTarget)) {
                        return false;
                    }
                }
                return true;
            } catch (Exception ex) {
                log.error("Something unexpected occurred. Unable to check for startup status for mirror maker {}", mm, ex);
                throw new NoRetryException(ex);
            }
        }, timeoutMs, "MirrorMaker instances did not transition to running in time");
    }

    /**
     * Waits, for at most two minutes, until the given connector and all of its
     * tasks are running for the given flow.
     *
     * <p>Any exception raised while checking is logged and rethrown wrapped in
     * a {@code NoRetryException}.
     *
     * @param mm              MirrorMaker node to query
     * @param clazz           connector class whose tasks are awaited
     * @param sourceAndTarget replication flow the connector belongs to
     * @throws InterruptedException if the wait is interrupted
     */
    private <T extends SourceConnector> void awaitConnectorTasksStart(MirrorMaker mm, Class<T> clazz, SourceAndTarget sourceAndTarget) throws InterruptedException {
        final long timeoutMs = 120000L;
        final String connectorName = clazz.getSimpleName();
        final String failureMessage = "Tasks for connector " + connectorName + " for MirrorMaker instances did not transition to running in time";

        TestUtils.waitForCondition(() -> {
            try {
                return isTaskRunningForMirrorMakerConnector(clazz, mm, sourceAndTarget);
            } catch (Exception ex) {
                log.error("Something unexpected occurred. Unable to check for startup status of connector {} for mirror maker with source->target={}", connectorName, sourceAndTarget, ex);
                throw new NoRetryException(ex);
            }
        }, timeoutMs, failureMessage);
    }

    /**
     * Waits, for at most two minutes, until every task configuration of the
     * given connector satisfies {@code predicate}.
     *
     * <p>An {@code ExecutionException} caused by a
     * {@code RebalanceNeededException} is rethrown as-is; any other
     * {@code ExecutionException} is logged and wrapped in a
     * {@code NoRetryException}.
     *
     * @param mm              MirrorMaker node to query
     * @param clazz           connector class whose task configurations are inspected
     * @param sourceAndTarget replication flow the connector belongs to
     * @param predicate       condition every task configuration must satisfy
     * @throws InterruptedException if the wait is interrupted
     */
    private <T extends SourceConnector> void awaitTaskConfigurations(MirrorMaker mm, Class<T> clazz, SourceAndTarget sourceAndTarget, Predicate<Map<String, String>> predicate) throws InterruptedException {
        final long timeoutMs = 120000L;
        String connName = clazz.getSimpleName();
        TestUtils.waitForCondition(() -> {
            try {
                // Typed callback keeps the stream typed so TaskInfo::config resolves without raw types.
                FutureCallback<List<TaskInfo>> cb = new FutureCallback<>();
                mm.taskConfigs(sourceAndTarget, connName, cb);
                return cb.get(timeoutMs, TimeUnit.MILLISECONDS)
                        .stream()
                        .map(TaskInfo::config)
                        .allMatch(predicate);
            } catch (ExecutionException ex) {
                if (ex.getCause() instanceof RebalanceNeededException) {
                    // Propagated unwrapped, unlike the failures handled below.
                    throw ex;
                }
                log.error("Something unexpected occurred. Unable to get configuration of connector {} for mirror maker with source->target={}", connName, sourceAndTarget, ex);
                throw new NoRetryException(ex);
            }
        }, timeoutMs, "Connector configuration for " + connName + " for MirrorMaker instances is incorrect");
    }

    /**
     * Consumes {@code topic} from the earliest offset and waits, for at most
     * 30 seconds, until at least {@code numMessages} records have been read.
     *
     * @param cluster     cluster to consume from
     * @param clusterName cluster name, used only in the failure message
     * @param topic       topic to consume
     * @param numMessages minimum number of records that must be read
     * @throws Exception if too few records are read in time
     */
    private void awaitTopicContent(EmbeddedKafkaCluster cluster, String clusterName, String topic, int numMessages) throws Exception {
        final long timeoutMs = 30000L;
        try (KafkaConsumer<byte[], byte[]> consumer = cluster.createConsumer(Collections.singletonMap("auto.offset.reset", "earliest"))) {
            consumer.subscribe(Collections.singleton(topic));
            // Running total across polls; also reported in the failure message.
            AtomicInteger messagesRead = new AtomicInteger(0);
            TestUtils.waitForCondition(
                    () -> {
                        int justRead = consumer.poll(Duration.ofSeconds(1L)).count();
                        return messagesRead.addAndGet(justRead) >= numMessages;
                    },
                    timeoutMs,
                    () -> "could not read " + numMessages + " from topic " + topic + " on cluster " + clusterName + " in time; only read " + messagesRead.get());
        }
    }

    /**
     * Reports whether the connector of the given class is in the RUNNING state
     * for the given flow.
     *
     * @param connectorClazz  connector class; its simple name is used as the connector name
     * @param mm              MirrorMaker node to query
     * @param sourceAndTarget replication flow the connector belongs to
     * @return {@code true} if the connector is RUNNING; {@code false} if it is in
     *         another non-failed state or the status lookup raised {@code NotFoundException}
     * @throws NoRetryException wrapping an {@code AssertionError} if the connector is FAILED
     */
    private boolean isConnectorRunningForMirrorMaker(Class<?> connectorClazz, MirrorMaker mm, SourceAndTarget sourceAndTarget) {
        final String connName = connectorClazz.getSimpleName();
        try {
            final String state = mm.connectorStatus(sourceAndTarget, connName).connector().state();
            final boolean failed = state.equals(AbstractStatus.State.FAILED.toString());
            if (failed) {
                String detail = String.format("Connector %s is in FAILED state for MirrorMaker %s and source->target=%s", connectorClazz, mm, sourceAndTarget);
                throw new NoRetryException(new AssertionError(detail));
            }
            return state.equals(AbstractStatus.State.RUNNING.toString());
        } catch (NotFoundException nf) {
            // No status for this connector yet: report "not running".
            return false;
        }
    }

    /**
     * Reports whether the connector of the given class is running and has at
     * least one task, all of which are in the RUNNING state.
     *
     * <p>The connector check runs first because it tolerates a missing
     * connector status by returning {@code false}. Fetching the status before
     * that check let a not-yet-registered connector surface as an exception
     * instead of a retryable "not yet".
     *
     * @param connectorClazz  connector class; its simple name is used as the connector name
     * @param mm              MirrorMaker node to query
     * @param sourceAndTarget replication flow the connector belongs to
     * @return {@code true} only if the connector is running, has at least one task
     *         and every task is RUNNING
     * @throws NoRetryException wrapping an {@code AssertionError} if the connector
     *         or any inspected task is FAILED
     */
    private <T extends SourceConnector> boolean isTaskRunningForMirrorMakerConnector(Class<T> connectorClazz, MirrorMaker mm, SourceAndTarget sourceAndTarget) {
        if (!isConnectorRunningForMirrorMaker(connectorClazz, mm, sourceAndTarget)) {
            return false;
        }

        String connName = connectorClazz.getSimpleName();
        ConnectorStateInfo connectorStatus = mm.connectorStatus(sourceAndTarget, connName);

        // A running connector without tasks does not count as started.
        if (connectorStatus.tasks().isEmpty()) {
            return false;
        }
        return connectorStatus.tasks().stream().allMatch(s -> {
            if (s.state().equals(AbstractStatus.State.FAILED.toString())) {
                throw new NoRetryException(new AssertionError(String.format("Task %s is in FAILED state", s)));
            }
            return s.state().equals(AbstractStatus.State.RUNNING.toString());
        });
    }

    /**
     * Decompiler-emitted body of the polling condition used by
     * {@code awaitTopicContent}: polls once for up to one second, adds the
     * number of records received to the running total and reports whether the
     * total has reached the target.
     *
     * @param consumer     consumer to poll
     * @param messagesRead running total of records read so far; updated by this call
     * @param numMessages  number of records that must be reached
     * @return {@code true} once the running total is at least {@code numMessages}
     * @throws Exception declared for compatibility with the condition callback
     */
    private static /* synthetic */ boolean lambda$awaitTopicContent$9(Consumer consumer, AtomicInteger messagesRead, int numMessages) throws Exception {
        int justRead = consumer.poll(Duration.ofSeconds(1L)).count();
        int totalRead = messagesRead.addAndGet(justRead);
        return totalRead >= numMessages;
    }
}

