package org.apache.kafka.streams.integration;

import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.server.KafkaConfig$;
import kafka.tools.StreamsResetter;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.Timeout;

@Category({IntegrationTest.class})
/* loaded from: input_file:org/apache/kafka/streams/integration/ResetIntegrationTest.class */
public class ResetIntegrationTest extends AbstractResetIntegrationTest {

    @Rule
    public Timeout globalTimeout = Timeout.seconds(600);
    private static final String NON_EXISTING_TOPIC = "nonExistingTopic";
    public static final EmbeddedKafkaCluster CLUSTER;

    @BeforeClass
    public static void startCluster() throws IOException {
        CLUSTER.start();
    }

    @AfterClass
    public static void closeCluster() {
        CLUSTER.stop();
    }

    @Override // org.apache.kafka.streams.integration.AbstractResetIntegrationTest
    Map<String, Object> getClientSslConfig() {
        return null;
    }

    @Before
    public void before() throws Exception {
        cluster = CLUSTER;
        prepareTest();
    }

    @After
    public void after() throws Exception {
        cleanupTest();
    }

    @Test
    public void shouldNotAllowToResetWhileStreamsIsRunning() {
        String safeUniqueTestName = IntegrationTestUtils.safeUniqueTestName(getClass(), this.testName);
        String[] strArr = {"--application-id", safeUniqueTestName, "--bootstrap-server", cluster.bootstrapServers(), "--input-topics", NON_EXISTING_TOPIC};
        Properties properties = new Properties();
        properties.put("heartbeat.interval.ms", 100);
        properties.put("session.timeout.ms", Integer.toString(2000));
        this.streamsConfig.put("application.id", safeUniqueTestName);
        streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), this.streamsConfig);
        streams.start();
        Assert.assertEquals(1L, new StreamsResetter().run(strArr, properties));
        streams.close();
    }

    @Test
    public void shouldNotAllowToResetWhenInputTopicAbsent() {
        String[] strArr = {"--application-id", IntegrationTestUtils.safeUniqueTestName(getClass(), this.testName), "--bootstrap-server", cluster.bootstrapServers(), "--input-topics", NON_EXISTING_TOPIC};
        Properties properties = new Properties();
        properties.put("heartbeat.interval.ms", 100);
        properties.put("session.timeout.ms", Integer.toString(2000));
        Assert.assertEquals(1L, new StreamsResetter().run(strArr, properties));
    }

    @Test
    public void shouldNotAllowToResetWhenIntermediateTopicAbsent() {
        String[] strArr = {"--application-id", IntegrationTestUtils.safeUniqueTestName(getClass(), this.testName), "--bootstrap-server", cluster.bootstrapServers(), "--intermediate-topics", NON_EXISTING_TOPIC};
        Properties properties = new Properties();
        properties.put("heartbeat.interval.ms", 100);
        properties.put("session.timeout.ms", Integer.toString(2000));
        Assert.assertEquals(1L, new StreamsResetter().run(strArr, properties));
    }

    @Test
    public void shouldNotAllowToResetWhenSpecifiedInternalTopicDoesNotExist() {
        String[] strArr = {"--application-id", IntegrationTestUtils.safeUniqueTestName(getClass(), this.testName), "--bootstrap-server", cluster.bootstrapServers(), "--internal-topics", NON_EXISTING_TOPIC};
        Properties properties = new Properties();
        properties.put("heartbeat.interval.ms", 100);
        properties.put("session.timeout.ms", Integer.toString(2000));
        Assert.assertEquals(1L, new StreamsResetter().run(strArr, properties));
    }

    @Test
    public void shouldNotAllowToResetWhenSpecifiedInternalTopicIsNotInternal() {
        String[] strArr = {"--application-id", IntegrationTestUtils.safeUniqueTestName(getClass(), this.testName), "--bootstrap-server", cluster.bootstrapServers(), "--internal-topics", "inputTopic"};
        Properties properties = new Properties();
        properties.put("heartbeat.interval.ms", 100);
        properties.put("session.timeout.ms", Integer.toString(2000));
        Assert.assertEquals(1L, new StreamsResetter().run(strArr, properties));
    }

    @Test
    public void testResetWhenLongSessionTimeoutConfiguredWithForceOption() throws Exception {
        String safeUniqueTestName = IntegrationTestUtils.safeUniqueTestName(getClass(), this.testName);
        this.streamsConfig.put("application.id", safeUniqueTestName);
        this.streamsConfig.put("session.timeout.ms", Integer.toString(200000));
        streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), this.streamsConfig);
        streams.start();
        List waitUntilMinKeyValueRecordsReceived = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(this.resultConsumerConfig, "outputTopic", 10);
        streams.close();
        streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), this.streamsConfig);
        streams.cleanUp();
        Assert.assertFalse(tryCleanGlobal(false, null, null, safeUniqueTestName));
        cleanGlobal(false, "--force", null, safeUniqueTestName);
        MatcherAssert.assertThat("Group is not empty after cleanGlobal", IntegrationTestUtils.isEmptyConsumerGroup(adminClient, safeUniqueTestName));
        assertInternalTopicsGotDeleted(null);
        streams.start();
        List waitUntilMinKeyValueRecordsReceived2 = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(this.resultConsumerConfig, "outputTopic", 10);
        streams.close();
        MatcherAssert.assertThat(waitUntilMinKeyValueRecordsReceived2, CoreMatchers.equalTo(waitUntilMinKeyValueRecordsReceived));
        cleanGlobal(false, "--force", null, safeUniqueTestName);
    }

    @Test
    public void testReprocessingFromFileAfterResetWithoutIntermediateUserTopic() throws Exception {
        String safeUniqueTestName = IntegrationTestUtils.safeUniqueTestName(getClass(), this.testName);
        this.streamsConfig.put("application.id", safeUniqueTestName);
        streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), this.streamsConfig);
        streams.start();
        List waitUntilMinKeyValueRecordsReceived = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(this.resultConsumerConfig, "outputTopic", 10);
        streams.close();
        IntegrationTestUtils.waitForEmptyConsumerGroup(adminClient, safeUniqueTestName, 30000L);
        File tempFile = TestUtils.tempFile("reset", ".csv");
        BufferedWriter bufferedWriter = new BufferedWriter(new FileWriter(tempFile));
        Throwable th = null;
        try {
            try {
                bufferedWriter.write("inputTopic,0,1");
                if (bufferedWriter != null) {
                    if (0 != 0) {
                        try {
                            bufferedWriter.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        bufferedWriter.close();
                    }
                }
                streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), this.streamsConfig);
                streams.cleanUp();
                cleanGlobal(false, "--from-file", tempFile.getAbsolutePath(), safeUniqueTestName);
                IntegrationTestUtils.waitForEmptyConsumerGroup(adminClient, safeUniqueTestName, 30000L);
                assertInternalTopicsGotDeleted(null);
                tempFile.deleteOnExit();
                streams.start();
                List waitUntilMinKeyValueRecordsReceived2 = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(this.resultConsumerConfig, "outputTopic", 5);
                streams.close();
                waitUntilMinKeyValueRecordsReceived.remove(0);
                MatcherAssert.assertThat(waitUntilMinKeyValueRecordsReceived2, CoreMatchers.equalTo(waitUntilMinKeyValueRecordsReceived));
                IntegrationTestUtils.waitForEmptyConsumerGroup(adminClient, safeUniqueTestName, 30000L);
                cleanGlobal(false, null, null, safeUniqueTestName);
            } finally {
            }
        } catch (Throwable th3) {
            if (bufferedWriter != null) {
                if (th != null) {
                    try {
                        bufferedWriter.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    bufferedWriter.close();
                }
            }
            throw th3;
        }
    }

    @Test
    public void testReprocessingFromDateTimeAfterResetWithoutIntermediateUserTopic() throws Exception {
        String safeUniqueTestName = IntegrationTestUtils.safeUniqueTestName(getClass(), this.testName);
        this.streamsConfig.put("application.id", safeUniqueTestName);
        streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), this.streamsConfig);
        streams.start();
        List waitUntilMinKeyValueRecordsReceived = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(this.resultConsumerConfig, "outputTopic", 10);
        streams.close();
        IntegrationTestUtils.waitForEmptyConsumerGroup(adminClient, safeUniqueTestName, 30000L);
        File tempFile = TestUtils.tempFile("reset", ".csv");
        BufferedWriter bufferedWriter = new BufferedWriter(new FileWriter(tempFile));
        Throwable th = null;
        try {
            try {
                bufferedWriter.write("inputTopic,0,1");
                if (bufferedWriter != null) {
                    if (0 != 0) {
                        try {
                            bufferedWriter.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        bufferedWriter.close();
                    }
                }
                streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), this.streamsConfig);
                streams.cleanUp();
                SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS");
                Calendar calendar = Calendar.getInstance();
                calendar.add(5, -1);
                cleanGlobal(false, "--to-datetime", simpleDateFormat.format(calendar.getTime()), safeUniqueTestName);
                IntegrationTestUtils.waitForEmptyConsumerGroup(adminClient, safeUniqueTestName, 30000L);
                assertInternalTopicsGotDeleted(null);
                tempFile.deleteOnExit();
                streams.start();
                List waitUntilMinKeyValueRecordsReceived2 = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(this.resultConsumerConfig, "outputTopic", 10);
                streams.close();
                MatcherAssert.assertThat(waitUntilMinKeyValueRecordsReceived2, CoreMatchers.equalTo(waitUntilMinKeyValueRecordsReceived));
                IntegrationTestUtils.waitForEmptyConsumerGroup(adminClient, safeUniqueTestName, 30000L);
                cleanGlobal(false, null, null, safeUniqueTestName);
            } finally {
            }
        } catch (Throwable th3) {
            if (bufferedWriter != null) {
                if (th != null) {
                    try {
                        bufferedWriter.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    bufferedWriter.close();
                }
            }
            throw th3;
        }
    }

    @Test
    public void testReprocessingByDurationAfterResetWithoutIntermediateUserTopic() throws Exception {
        String safeUniqueTestName = IntegrationTestUtils.safeUniqueTestName(getClass(), this.testName);
        this.streamsConfig.put("application.id", safeUniqueTestName);
        streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), this.streamsConfig);
        streams.start();
        List waitUntilMinKeyValueRecordsReceived = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(this.resultConsumerConfig, "outputTopic", 10);
        streams.close();
        IntegrationTestUtils.waitForEmptyConsumerGroup(adminClient, safeUniqueTestName, 30000L);
        File tempFile = TestUtils.tempFile("reset", ".csv");
        BufferedWriter bufferedWriter = new BufferedWriter(new FileWriter(tempFile));
        Throwable th = null;
        try {
            try {
                bufferedWriter.write("inputTopic,0,1");
                if (bufferedWriter != null) {
                    if (0 != 0) {
                        try {
                            bufferedWriter.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        bufferedWriter.close();
                    }
                }
                streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), this.streamsConfig);
                streams.cleanUp();
                cleanGlobal(false, "--by-duration", "PT1M", safeUniqueTestName);
                IntegrationTestUtils.waitForEmptyConsumerGroup(adminClient, safeUniqueTestName, 30000L);
                assertInternalTopicsGotDeleted(null);
                tempFile.deleteOnExit();
                streams.start();
                List waitUntilMinKeyValueRecordsReceived2 = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(this.resultConsumerConfig, "outputTopic", 10);
                streams.close();
                MatcherAssert.assertThat(waitUntilMinKeyValueRecordsReceived2, CoreMatchers.equalTo(waitUntilMinKeyValueRecordsReceived));
                IntegrationTestUtils.waitForEmptyConsumerGroup(adminClient, safeUniqueTestName, 30000L);
                cleanGlobal(false, null, null, safeUniqueTestName);
            } finally {
            }
        } catch (Throwable th3) {
            if (bufferedWriter != null) {
                if (th != null) {
                    try {
                        bufferedWriter.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    bufferedWriter.close();
                }
            }
            throw th3;
        }
    }

    static {
        Properties properties = new Properties();
        properties.put(KafkaConfig$.MODULE$.ConnectionsMaxIdleMsProp(), -1L);
        CLUSTER = new EmbeddedKafkaCluster(1, properties);
    }
}
