/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.tools;

import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.consumer.GroupProtocol;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.test.TestUtils;
import org.apache.kafka.tools.AbstractResetIntegrationTest;
import org.apache.kafka.tools.StreamsResetter;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.Timeout;

@Tag(value="integration")
@Timeout(value=600L)
public class ResetIntegrationTest
extends AbstractResetIntegrationTest {
    private static final String NON_EXISTING_TOPIC = "nonExistingTopic";
    public static final EmbeddedKafkaCluster CLUSTER;

    @BeforeAll
    public static void startCluster() throws IOException {
        CLUSTER.start();
    }

    @AfterAll
    public static void closeCluster() {
        CLUSTER.stop();
    }

    @Override
    Map<String, Object> getClientSslConfig() {
        return null;
    }

    @BeforeEach
    public void before(TestInfo testInfo) throws Exception {
        cluster = CLUSTER;
        this.prepareTest(testInfo);
    }

    @AfterEach
    public void after() throws Exception {
        this.cleanupTest();
    }

    @Test
    public void shouldNotAllowToResetWhileStreamsIsRunning(TestInfo testInfo) throws Exception {
        String appID = org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName((TestInfo)testInfo);
        String[] parameters = new String[]{"--application-id", appID, "--bootstrap-server", cluster.bootstrapServers(), "--input-topics", NON_EXISTING_TOPIC};
        Properties cleanUpConfig = new Properties();
        cleanUpConfig.put("heartbeat.interval.ms", (Object)100);
        cleanUpConfig.put("session.timeout.ms", Integer.toString(2000));
        this.streamsConfig.put("application.id", appID);
        streams = new KafkaStreams(this.setupTopologyWithoutIntermediateUserTopic(), this.streamsConfig);
        IntegrationTestUtils.startApplicationAndWaitUntilRunning((KafkaStreams)streams);
        int exitCode = new StreamsResetter().execute(parameters, cleanUpConfig);
        Assertions.assertEquals((int)1, (int)exitCode);
        streams.close();
    }

    @Test
    public void shouldNotAllowToResetWhenInputTopicAbsent(TestInfo testInfo) {
        String appID = org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName((TestInfo)testInfo);
        String[] parameters = new String[]{"--application-id", appID, "--bootstrap-server", cluster.bootstrapServers(), "--input-topics", NON_EXISTING_TOPIC};
        Properties cleanUpConfig = new Properties();
        cleanUpConfig.put("heartbeat.interval.ms", (Object)100);
        cleanUpConfig.put("session.timeout.ms", Integer.toString(2000));
        int exitCode = new StreamsResetter().execute(parameters, cleanUpConfig);
        Assertions.assertEquals((int)1, (int)exitCode);
    }

    @Test
    public void shouldDefaultToClassicGroupProtocol(TestInfo testInfo) {
        String appID = org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName((TestInfo)testInfo);
        String[] parameters = new String[]{"--application-id", appID, "--bootstrap-server", cluster.bootstrapServers(), "--input-topics", "inputTopic"};
        Properties cleanUpConfig = new Properties();
        cleanUpConfig.put("heartbeat.interval.ms", (Object)100);
        cleanUpConfig.put("session.timeout.ms", Integer.toString(2000));
        int exitCode = new StreamsResetter().execute(parameters, cleanUpConfig);
        Assertions.assertEquals((int)0, (int)exitCode, (String)"Resetter should use the CLASSIC group protocol");
    }

    @Test
    public void shouldAllowGroupProtocolClassic(TestInfo testInfo) {
        String appID = org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName((TestInfo)testInfo);
        String[] parameters = new String[]{"--application-id", appID, "--bootstrap-server", cluster.bootstrapServers(), "--input-topics", "inputTopic"};
        Properties cleanUpConfig = new Properties();
        cleanUpConfig.put("group.protocol", GroupProtocol.CLASSIC.name());
        cleanUpConfig.put("heartbeat.interval.ms", (Object)100);
        cleanUpConfig.put("session.timeout.ms", Integer.toString(2000));
        int exitCode = new StreamsResetter().execute(parameters, cleanUpConfig);
        Assertions.assertEquals((int)0, (int)exitCode, (String)"Resetter should allow setting group protocol to CLASSIC");
    }

    @Test
    public void shouldOverwriteGroupProtocolOtherThanClassic(TestInfo testInfo) {
        String appID = org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName((TestInfo)testInfo);
        String[] parameters = new String[]{"--application-id", appID, "--bootstrap-server", cluster.bootstrapServers(), "--input-topics", "inputTopic"};
        Properties cleanUpConfig = new Properties();
        cleanUpConfig.put("group.protocol", GroupProtocol.CONSUMER.name());
        cleanUpConfig.put("heartbeat.interval.ms", (Object)100);
        cleanUpConfig.put("session.timeout.ms", Integer.toString(2000));
        int exitCode = new StreamsResetter().execute(parameters, cleanUpConfig);
        Assertions.assertEquals((int)0, (int)exitCode, (String)"Resetter should overwrite the group protocol to CLASSIC");
    }

    @Test
    public void shouldNotAllowToResetWhenIntermediateTopicAbsent(TestInfo testInfo) {
        String appID = org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName((TestInfo)testInfo);
        String[] parameters = new String[]{"--application-id", appID, "--bootstrap-server", cluster.bootstrapServers(), "--intermediate-topics", NON_EXISTING_TOPIC};
        Properties cleanUpConfig = new Properties();
        cleanUpConfig.put("heartbeat.interval.ms", (Object)100);
        cleanUpConfig.put("session.timeout.ms", Integer.toString(2000));
        int exitCode = new StreamsResetter().execute(parameters, cleanUpConfig);
        Assertions.assertEquals((int)1, (int)exitCode);
    }

    @Test
    public void shouldNotAllowToResetWhenSpecifiedInternalTopicDoesNotExist(TestInfo testInfo) {
        String appID = org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName((TestInfo)testInfo);
        String[] parameters = new String[]{"--application-id", appID, "--bootstrap-server", cluster.bootstrapServers(), "--internal-topics", NON_EXISTING_TOPIC};
        Properties cleanUpConfig = new Properties();
        cleanUpConfig.put("heartbeat.interval.ms", (Object)100);
        cleanUpConfig.put("session.timeout.ms", Integer.toString(2000));
        int exitCode = new StreamsResetter().execute(parameters, cleanUpConfig);
        Assertions.assertEquals((int)1, (int)exitCode);
    }

    @Test
    public void shouldNotAllowToResetWhenSpecifiedInternalTopicIsNotInternal(TestInfo testInfo) {
        String appID = org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName((TestInfo)testInfo);
        String[] parameters = new String[]{"--application-id", appID, "--bootstrap-server", cluster.bootstrapServers(), "--internal-topics", "inputTopic"};
        Properties cleanUpConfig = new Properties();
        cleanUpConfig.put("heartbeat.interval.ms", (Object)100);
        cleanUpConfig.put("session.timeout.ms", Integer.toString(2000));
        int exitCode = new StreamsResetter().execute(parameters, cleanUpConfig);
        Assertions.assertEquals((int)1, (int)exitCode);
    }

    @Test
    public void testResetWhenLongSessionTimeoutConfiguredWithForceOption(TestInfo testInfo) throws Exception {
        String appID = org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName((TestInfo)testInfo);
        this.streamsConfig.put("application.id", appID);
        this.streamsConfig.put("session.timeout.ms", Integer.toString(200000));
        streams = new KafkaStreams(this.setupTopologyWithoutIntermediateUserTopic(), this.streamsConfig);
        IntegrationTestUtils.startApplicationAndWaitUntilRunning((KafkaStreams)streams);
        List result = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived((Properties)this.resultConsumerConfig, (String)"outputTopic", (int)10);
        streams.close();
        streams = new KafkaStreams(this.setupTopologyWithoutIntermediateUserTopic(), this.streamsConfig);
        streams.cleanUp();
        boolean cleanResult = this.tryCleanGlobal(false, null, null, appID);
        Assertions.assertFalse((boolean)cleanResult);
        this.cleanGlobal(false, "--force", null, appID);
        Assertions.assertTrue((boolean)IntegrationTestUtils.isEmptyConsumerGroup((Admin)adminClient, (String)appID), (String)"Group is not empty after cleanGlobal");
        this.assertInternalTopicsGotDeleted(null);
        IntegrationTestUtils.startApplicationAndWaitUntilRunning((KafkaStreams)streams);
        List resultRerun = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived((Properties)this.resultConsumerConfig, (String)"outputTopic", (int)10);
        streams.close();
        Assertions.assertEquals((Object)result, (Object)resultRerun);
        this.cleanGlobal(false, "--force", null, appID);
    }

    @Test
    public void testReprocessingFromFileAfterResetWithoutIntermediateUserTopic(TestInfo testInfo) throws Exception {
        String appID = org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName((TestInfo)testInfo);
        this.streamsConfig.put("application.id", appID);
        streams = new KafkaStreams(this.setupTopologyWithoutIntermediateUserTopic(), this.streamsConfig);
        IntegrationTestUtils.startApplicationAndWaitUntilRunning((KafkaStreams)streams);
        List result = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived((Properties)this.resultConsumerConfig, (String)"outputTopic", (int)10);
        streams.close();
        IntegrationTestUtils.waitForEmptyConsumerGroup((Admin)adminClient, (String)appID, (long)60000L);
        File resetFile = TestUtils.tempFile((String)"reset", (String)".csv");
        try (BufferedWriter writer = new BufferedWriter(new FileWriter(resetFile));){
            writer.write("inputTopic,0,1");
        }
        streams = new KafkaStreams(this.setupTopologyWithoutIntermediateUserTopic(), this.streamsConfig);
        streams.cleanUp();
        this.cleanGlobal(false, "--from-file", resetFile.getAbsolutePath(), appID);
        IntegrationTestUtils.waitForEmptyConsumerGroup((Admin)adminClient, (String)appID, (long)60000L);
        this.assertInternalTopicsGotDeleted(null);
        resetFile.deleteOnExit();
        IntegrationTestUtils.startApplicationAndWaitUntilRunning((KafkaStreams)streams);
        List resultRerun = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived((Properties)this.resultConsumerConfig, (String)"outputTopic", (int)5);
        streams.close();
        result.remove(0);
        Assertions.assertEquals((Object)result, (Object)resultRerun);
        IntegrationTestUtils.waitForEmptyConsumerGroup((Admin)adminClient, (String)appID, (long)60000L);
        this.cleanGlobal(false, null, null, appID);
    }

    @Test
    public void testReprocessingFromDateTimeAfterResetWithoutIntermediateUserTopic(TestInfo testInfo) throws Exception {
        String appID = org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName((TestInfo)testInfo);
        this.streamsConfig.put("application.id", appID);
        streams = new KafkaStreams(this.setupTopologyWithoutIntermediateUserTopic(), this.streamsConfig);
        IntegrationTestUtils.startApplicationAndWaitUntilRunning((KafkaStreams)streams);
        List result = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived((Properties)this.resultConsumerConfig, (String)"outputTopic", (int)10);
        streams.close();
        IntegrationTestUtils.waitForEmptyConsumerGroup((Admin)adminClient, (String)appID, (long)60000L);
        File resetFile = TestUtils.tempFile((String)"reset", (String)".csv");
        try (BufferedWriter writer = new BufferedWriter(new FileWriter(resetFile));){
            writer.write("inputTopic,0,1");
        }
        streams = new KafkaStreams(this.setupTopologyWithoutIntermediateUserTopic(), this.streamsConfig);
        streams.cleanUp();
        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS");
        Calendar calendar = Calendar.getInstance();
        calendar.add(5, -1);
        this.cleanGlobal(false, "--to-datetime", format.format(calendar.getTime()), appID);
        IntegrationTestUtils.waitForEmptyConsumerGroup((Admin)adminClient, (String)appID, (long)60000L);
        this.assertInternalTopicsGotDeleted(null);
        resetFile.deleteOnExit();
        IntegrationTestUtils.startApplicationAndWaitUntilRunning((KafkaStreams)streams);
        List resultRerun = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived((Properties)this.resultConsumerConfig, (String)"outputTopic", (int)10);
        streams.close();
        Assertions.assertEquals((Object)result, (Object)resultRerun);
        IntegrationTestUtils.waitForEmptyConsumerGroup((Admin)adminClient, (String)appID, (long)60000L);
        this.cleanGlobal(false, null, null, appID);
    }

    @Test
    public void testReprocessingByDurationAfterResetWithoutIntermediateUserTopic(TestInfo testInfo) throws Exception {
        String appID = org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName((TestInfo)testInfo);
        this.streamsConfig.put("application.id", appID);
        streams = new KafkaStreams(this.setupTopologyWithoutIntermediateUserTopic(), this.streamsConfig);
        IntegrationTestUtils.startApplicationAndWaitUntilRunning((KafkaStreams)streams);
        List result = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived((Properties)this.resultConsumerConfig, (String)"outputTopic", (int)10);
        streams.close();
        IntegrationTestUtils.waitForEmptyConsumerGroup((Admin)adminClient, (String)appID, (long)60000L);
        File resetFile = TestUtils.tempFile((String)"reset", (String)".csv");
        try (BufferedWriter writer = new BufferedWriter(new FileWriter(resetFile));){
            writer.write("inputTopic,0,1");
        }
        streams = new KafkaStreams(this.setupTopologyWithoutIntermediateUserTopic(), this.streamsConfig);
        streams.cleanUp();
        this.cleanGlobal(false, "--by-duration", "PT1M", appID);
        IntegrationTestUtils.waitForEmptyConsumerGroup((Admin)adminClient, (String)appID, (long)60000L);
        this.assertInternalTopicsGotDeleted(null);
        resetFile.deleteOnExit();
        IntegrationTestUtils.startApplicationAndWaitUntilRunning((KafkaStreams)streams);
        List resultRerun = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived((Properties)this.resultConsumerConfig, (String)"outputTopic", (int)10);
        streams.close();
        Assertions.assertEquals((Object)result, (Object)resultRerun);
        IntegrationTestUtils.waitForEmptyConsumerGroup((Admin)adminClient, (String)appID, (long)60000L);
        this.cleanGlobal(false, null, null, appID);
    }

    static {
        Properties brokerProps = new Properties();
        brokerProps.put("connections.max.idle.ms", (Object)-1L);
        CLUSTER = new EmbeddedKafkaCluster(3, brokerProps);
    }
}

