package org.apache.kafka.streams.integration;

import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Calendar;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import kafka.tools.StreamsResetter;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.ConsumerGroupDescription;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.config.types.Password;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestCondition;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.experimental.categories.Category;
import org.junit.rules.TemporaryFolder;

@Category({IntegrationTest.class})
/* loaded from: input_file:org/apache/kafka/streams/integration/AbstractResetIntegrationTest.class */
public abstract class AbstractResetIntegrationTest {
    static String testId;
    static EmbeddedKafkaCluster cluster;
    private static MockTime mockTime;
    private static KafkaStreams streams;
    private static AdminClient adminClient = null;
    private Properties commonClientConfig;
    private Properties streamsConfig;
    private Properties producerConfig;
    private Properties resultConsumerConfig;
    private static final String INPUT_TOPIC = "inputTopic";
    private static final String OUTPUT_TOPIC = "outputTopic";
    private static final String OUTPUT_TOPIC_2 = "outputTopic2";
    private static final String OUTPUT_TOPIC_2_RERUN = "outputTopic2_rerun";
    private static final String INTERMEDIATE_USER_TOPIC = "userTopic";
    private static final String NON_EXISTING_TOPIC = "nonExistingTopic";
    private static final long STREAMS_CONSUMER_TIMEOUT = 2000;
    private static final long CLEANUP_CONSUMER_TIMEOUT = 2000;
    private static final int TIMEOUT_MULTIPLIER = 5;
    private String appID = "abstract-reset-integration-test";

    @Rule
    public final TemporaryFolder testFolder = new TemporaryFolder(TestUtils.tempDirectory());

    /* loaded from: input_file:org/apache/kafka/streams/integration/AbstractResetIntegrationTest$ConsumerGroupInactiveCondition.class */
    private class ConsumerGroupInactiveCondition implements TestCondition {
        private ConsumerGroupInactiveCondition() {
        }

        public boolean conditionMet() {
            try {
                return ((ConsumerGroupDescription) ((KafkaFuture) AbstractResetIntegrationTest.adminClient.describeConsumerGroups(Collections.singletonList(AbstractResetIntegrationTest.this.appID)).describedGroups().get(AbstractResetIntegrationTest.this.appID)).get()).members().isEmpty();
            } catch (InterruptedException | ExecutionException e) {
                return false;
            }
        }
    }

    abstract Map<String, Object> getClientSslConfig();

    @AfterClass
    public static void afterClassCleanup() {
        if (adminClient != null) {
            adminClient.close(10L, TimeUnit.SECONDS);
            adminClient = null;
        }
    }

    private void prepareEnvironment() {
        if (adminClient == null) {
            adminClient = AdminClient.create(this.commonClientConfig);
        }
        boolean z = false;
        while (!z) {
            z = setCurrentTime();
        }
    }

    private boolean setCurrentTime() {
        boolean z = false;
        try {
            mockTime = cluster.time;
            mockTime.setCurrentTimeMs(((System.currentTimeMillis() / 1000) + 1) * 1000);
            z = true;
        } catch (IllegalArgumentException e) {
        }
        return z;
    }

    private void prepareConfigs() {
        this.commonClientConfig = new Properties();
        this.commonClientConfig.put("bootstrap.servers", cluster.bootstrapServers());
        Map<String, Object> clientSslConfig = getClientSslConfig();
        if (clientSslConfig != null) {
            this.commonClientConfig.put("ssl.truststore.location", clientSslConfig.get("ssl.truststore.location"));
            this.commonClientConfig.put("ssl.truststore.password", ((Password) clientSslConfig.get("ssl.truststore.password")).value());
            this.commonClientConfig.put("security.protocol", "SSL");
        }
        this.producerConfig = new Properties();
        this.producerConfig.put("acks", "all");
        this.producerConfig.put("retries", 0);
        this.producerConfig.put("key.serializer", LongSerializer.class);
        this.producerConfig.put("value.serializer", StringSerializer.class);
        this.producerConfig.putAll(this.commonClientConfig);
        this.resultConsumerConfig = new Properties();
        this.resultConsumerConfig.put("group.id", testId + "-result-consumer");
        this.resultConsumerConfig.put("auto.offset.reset", "earliest");
        this.resultConsumerConfig.put("key.deserializer", LongDeserializer.class);
        this.resultConsumerConfig.put("value.deserializer", LongDeserializer.class);
        this.resultConsumerConfig.putAll(this.commonClientConfig);
        this.streamsConfig = new Properties();
        this.streamsConfig.put("state.dir", this.testFolder.getRoot().getPath());
        this.streamsConfig.put("default.key.serde", Serdes.Long().getClass());
        this.streamsConfig.put("default.value.serde", Serdes.String().getClass());
        this.streamsConfig.put("cache.max.bytes.buffering", 0);
        this.streamsConfig.put("commit.interval.ms", 100);
        this.streamsConfig.put("heartbeat.interval.ms", 100);
        this.streamsConfig.put("auto.offset.reset", "earliest");
        this.streamsConfig.put("session.timeout.ms", "2000");
        this.streamsConfig.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
        this.streamsConfig.putAll(this.commonClientConfig);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void prepareTest() throws Exception {
        prepareConfigs();
        prepareEnvironment();
        TestUtils.waitForCondition(new ConsumerGroupInactiveCondition(), 10000L, "Test consumer group " + this.appID + " still active even after waiting 10000 ms.");
        cluster.deleteAndRecreateTopics(INPUT_TOPIC, OUTPUT_TOPIC, OUTPUT_TOPIC_2, OUTPUT_TOPIC_2_RERUN);
        add10InputElements();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void cleanupTest() throws Exception {
        if (streams != null) {
            streams.close(Duration.ofSeconds(30L));
        }
        IntegrationTestUtils.purgeLocalStreamsState(this.streamsConfig);
    }

    private void add10InputElements() throws ExecutionException, InterruptedException {
        for (KeyValue keyValue : Arrays.asList(KeyValue.pair(0L, "aaa"), KeyValue.pair(1L, "bbb"), KeyValue.pair(0L, "ccc"), KeyValue.pair(1L, "ddd"), KeyValue.pair(0L, "eee"), KeyValue.pair(1L, "fff"), KeyValue.pair(0L, "ggg"), KeyValue.pair(1L, "hhh"), KeyValue.pair(0L, "iii"), KeyValue.pair(1L, "jjj"))) {
            mockTime.sleep(10L);
            IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(keyValue), this.producerConfig, Long.valueOf(mockTime.milliseconds()));
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void shouldNotAllowToResetWhileStreamsIsRunning() throws Exception {
        this.appID = testId + "-not-reset-during-runtime";
        String[] strArr = {"--application-id", this.appID, "--bootstrap-servers", cluster.bootstrapServers(), "--input-topics", NON_EXISTING_TOPIC, "--execute"};
        Properties properties = new Properties();
        properties.put("heartbeat.interval.ms", 100);
        properties.put("session.timeout.ms", "2000");
        this.streamsConfig.put("application.id", this.appID);
        streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), this.streamsConfig);
        streams.start();
        Assert.assertEquals(1L, new StreamsResetter().run(strArr, properties));
        streams.close();
    }

    public void shouldNotAllowToResetWhenInputTopicAbsent() throws Exception {
        this.appID = testId + "-not-reset-without-input-topic";
        String[] strArr = {"--application-id", this.appID, "--bootstrap-servers", cluster.bootstrapServers(), "--input-topics", NON_EXISTING_TOPIC, "--execute"};
        Properties properties = new Properties();
        properties.put("heartbeat.interval.ms", 100);
        properties.put("session.timeout.ms", "2000");
        Assert.assertEquals(1L, new StreamsResetter().run(strArr, properties));
    }

    public void shouldNotAllowToResetWhenIntermediateTopicAbsent() throws Exception {
        this.appID = testId + "-not-reset-without-intermediate-topic";
        String[] strArr = {"--application-id", this.appID, "--bootstrap-servers", cluster.bootstrapServers(), "--intermediate-topics", NON_EXISTING_TOPIC, "--execute"};
        Properties properties = new Properties();
        properties.put("heartbeat.interval.ms", 100);
        properties.put("session.timeout.ms", "2000");
        Assert.assertEquals(1L, new StreamsResetter().run(strArr, properties));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void testReprocessingFromScratchAfterResetWithoutIntermediateUserTopic() throws Exception {
        this.appID = testId + "-from-scratch";
        this.streamsConfig.put("application.id", this.appID);
        streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), this.streamsConfig);
        streams.start();
        List waitUntilMinKeyValueRecordsReceived = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(this.resultConsumerConfig, OUTPUT_TOPIC, 10);
        streams.close();
        TestUtils.waitForCondition(new ConsumerGroupInactiveCondition(), 10000L, "Streams Application consumer group " + this.appID + " did not time out after 10000 ms.");
        streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), this.streamsConfig);
        streams.cleanUp();
        cleanGlobal(false, null, null);
        TestUtils.waitForCondition(new ConsumerGroupInactiveCondition(), 10000L, "Reset Tool consumer group " + this.appID + " did not time out after 10000 ms.");
        assertInternalTopicsGotDeleted(null);
        streams.start();
        List waitUntilMinKeyValueRecordsReceived2 = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(this.resultConsumerConfig, OUTPUT_TOPIC, 10);
        streams.close();
        MatcherAssert.assertThat(waitUntilMinKeyValueRecordsReceived2, CoreMatchers.equalTo(waitUntilMinKeyValueRecordsReceived));
        TestUtils.waitForCondition(new ConsumerGroupInactiveCondition(), 10000L, "Reset Tool consumer group " + this.appID + " did not time out after 10000 ms.");
        cleanGlobal(false, null, null);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void testReprocessingFromScratchAfterResetWithIntermediateUserTopic() throws Exception {
        cluster.createTopic(INTERMEDIATE_USER_TOPIC);
        this.appID = testId + "-from-scratch-with-intermediate-topic";
        this.streamsConfig.put("application.id", this.appID);
        streams = new KafkaStreams(setupTopologyWithIntermediateUserTopic(OUTPUT_TOPIC_2), this.streamsConfig);
        streams.start();
        List waitUntilMinKeyValueRecordsReceived = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(this.resultConsumerConfig, OUTPUT_TOPIC, 10);
        List waitUntilMinKeyValueRecordsReceived2 = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(this.resultConsumerConfig, OUTPUT_TOPIC_2, 40);
        streams.close();
        TestUtils.waitForCondition(new ConsumerGroupInactiveCondition(), 10000L, "Streams Application consumer group " + this.appID + " did not time out after 10000 ms.");
        mockTime.sleep(1L);
        KeyValue keyValue = new KeyValue(-1L, "badRecord-ShouldBeSkipped");
        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INTERMEDIATE_USER_TOPIC, Collections.singleton(keyValue), this.producerConfig, Long.valueOf(mockTime.milliseconds()));
        streams = new KafkaStreams(setupTopologyWithIntermediateUserTopic(OUTPUT_TOPIC_2_RERUN), this.streamsConfig);
        streams.cleanUp();
        cleanGlobal(true, null, null);
        TestUtils.waitForCondition(new ConsumerGroupInactiveCondition(), 10000L, "Reset Tool consumer group " + this.appID + " did not time out after 10000 ms.");
        assertInternalTopicsGotDeleted(INTERMEDIATE_USER_TOPIC);
        streams.start();
        List waitUntilMinKeyValueRecordsReceived3 = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(this.resultConsumerConfig, OUTPUT_TOPIC, 10);
        List waitUntilMinKeyValueRecordsReceived4 = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(this.resultConsumerConfig, OUTPUT_TOPIC_2_RERUN, 40);
        streams.close();
        MatcherAssert.assertThat(waitUntilMinKeyValueRecordsReceived3, CoreMatchers.equalTo(waitUntilMinKeyValueRecordsReceived));
        MatcherAssert.assertThat(waitUntilMinKeyValueRecordsReceived4, CoreMatchers.equalTo(waitUntilMinKeyValueRecordsReceived2));
        List waitUntilMinKeyValueRecordsReceived5 = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(TestUtils.consumerConfig(cluster.bootstrapServers(), testId + "-result-consumer", LongDeserializer.class, StringDeserializer.class, this.commonClientConfig), INTERMEDIATE_USER_TOPIC, 21);
        for (int i = 0; i < 10; i++) {
            MatcherAssert.assertThat(waitUntilMinKeyValueRecordsReceived5.get(i), CoreMatchers.equalTo(waitUntilMinKeyValueRecordsReceived5.get(i + 11)));
        }
        MatcherAssert.assertThat(waitUntilMinKeyValueRecordsReceived5.get(10), CoreMatchers.equalTo(keyValue));
        TestUtils.waitForCondition(new ConsumerGroupInactiveCondition(), 10000L, "Reset Tool consumer group " + this.appID + " did not time out after 10000 ms.");
        cleanGlobal(true, null, null);
        cluster.deleteTopicAndWait(INTERMEDIATE_USER_TOPIC);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void testReprocessingFromFileAfterResetWithoutIntermediateUserTopic() throws Exception {
        this.appID = testId + "-from-file";
        this.streamsConfig.put("application.id", this.appID);
        streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), this.streamsConfig);
        streams.start();
        List waitUntilMinKeyValueRecordsReceived = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(this.resultConsumerConfig, OUTPUT_TOPIC, 10);
        streams.close();
        TestUtils.waitForCondition(new ConsumerGroupInactiveCondition(), 10000L, "Streams Application consumer group " + this.appID + " did not time out after 10000 ms.");
        File createTempFile = File.createTempFile("reset", ".csv");
        BufferedWriter bufferedWriter = new BufferedWriter(new FileWriter(createTempFile));
        Throwable th = null;
        try {
            try {
                bufferedWriter.write("inputTopic,0,1");
                bufferedWriter.close();
                if (bufferedWriter != null) {
                    if (0 != 0) {
                        try {
                            bufferedWriter.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        bufferedWriter.close();
                    }
                }
                streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), this.streamsConfig);
                streams.cleanUp();
                cleanGlobal(false, "--from-file", createTempFile.getAbsolutePath());
                TestUtils.waitForCondition(new ConsumerGroupInactiveCondition(), 10000L, "Reset Tool consumer group " + this.appID + " did not time out after 10000 ms.");
                assertInternalTopicsGotDeleted(null);
                createTempFile.deleteOnExit();
                streams.start();
                List waitUntilMinKeyValueRecordsReceived2 = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(this.resultConsumerConfig, OUTPUT_TOPIC, TIMEOUT_MULTIPLIER);
                streams.close();
                waitUntilMinKeyValueRecordsReceived.remove(0);
                MatcherAssert.assertThat(waitUntilMinKeyValueRecordsReceived2, CoreMatchers.equalTo(waitUntilMinKeyValueRecordsReceived));
                TestUtils.waitForCondition(new ConsumerGroupInactiveCondition(), 10000L, "Reset Tool consumer group " + this.appID + " did not time out after 10000 ms.");
                cleanGlobal(false, null, null);
            } finally {
            }
        } catch (Throwable th3) {
            if (bufferedWriter != null) {
                if (th != null) {
                    try {
                        bufferedWriter.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    bufferedWriter.close();
                }
            }
            throw th3;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void testReprocessingFromDateTimeAfterResetWithoutIntermediateUserTopic() throws Exception {
        this.appID = testId + "-from-datetime";
        this.streamsConfig.put("application.id", this.appID);
        streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), this.streamsConfig);
        streams.start();
        List waitUntilMinKeyValueRecordsReceived = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(this.resultConsumerConfig, OUTPUT_TOPIC, 10);
        streams.close();
        TestUtils.waitForCondition(new ConsumerGroupInactiveCondition(), 10000L, "Streams Application consumer group " + this.appID + " did not time out after 10000 ms.");
        File createTempFile = File.createTempFile("reset", ".csv");
        BufferedWriter bufferedWriter = new BufferedWriter(new FileWriter(createTempFile));
        Throwable th = null;
        try {
            try {
                bufferedWriter.write("inputTopic,0,1");
                bufferedWriter.close();
                if (bufferedWriter != null) {
                    if (0 != 0) {
                        try {
                            bufferedWriter.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        bufferedWriter.close();
                    }
                }
                streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), this.streamsConfig);
                streams.cleanUp();
                SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS");
                Calendar calendar = Calendar.getInstance();
                calendar.add(TIMEOUT_MULTIPLIER, -1);
                cleanGlobal(false, "--to-datetime", simpleDateFormat.format(calendar.getTime()));
                TestUtils.waitForCondition(new ConsumerGroupInactiveCondition(), 10000L, "Reset Tool consumer group " + this.appID + " did not time out after 10000 ms.");
                assertInternalTopicsGotDeleted(null);
                createTempFile.deleteOnExit();
                streams.start();
                List waitUntilMinKeyValueRecordsReceived2 = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(this.resultConsumerConfig, OUTPUT_TOPIC, 10);
                streams.close();
                MatcherAssert.assertThat(waitUntilMinKeyValueRecordsReceived2, CoreMatchers.equalTo(waitUntilMinKeyValueRecordsReceived));
                TestUtils.waitForCondition(new ConsumerGroupInactiveCondition(), 10000L, "Reset Tool consumer group " + this.appID + " did not time out after 10000 ms.");
                cleanGlobal(false, null, null);
            } finally {
            }
        } catch (Throwable th3) {
            if (bufferedWriter != null) {
                if (th != null) {
                    try {
                        bufferedWriter.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    bufferedWriter.close();
                }
            }
            throw th3;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void testReprocessingByDurationAfterResetWithoutIntermediateUserTopic() throws Exception {
        this.appID = testId + "-from-duration";
        this.streamsConfig.put("application.id", this.appID);
        streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), this.streamsConfig);
        streams.start();
        List waitUntilMinKeyValueRecordsReceived = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(this.resultConsumerConfig, OUTPUT_TOPIC, 10);
        streams.close();
        TestUtils.waitForCondition(new ConsumerGroupInactiveCondition(), 10000L, "Streams Application consumer group " + this.appID + "  did not time out after 10000 ms.");
        File createTempFile = File.createTempFile("reset", ".csv");
        BufferedWriter bufferedWriter = new BufferedWriter(new FileWriter(createTempFile));
        Throwable th = null;
        try {
            try {
                bufferedWriter.write("inputTopic,0,1");
                bufferedWriter.close();
                if (bufferedWriter != null) {
                    if (0 != 0) {
                        try {
                            bufferedWriter.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        bufferedWriter.close();
                    }
                }
                streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), this.streamsConfig);
                streams.cleanUp();
                cleanGlobal(false, "--by-duration", "PT1M");
                TestUtils.waitForCondition(new ConsumerGroupInactiveCondition(), 10000L, "Reset Tool consumer group " + this.appID + " did not time out after 10000 ms.");
                assertInternalTopicsGotDeleted(null);
                createTempFile.deleteOnExit();
                streams.start();
                List waitUntilMinKeyValueRecordsReceived2 = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(this.resultConsumerConfig, OUTPUT_TOPIC, 10);
                streams.close();
                MatcherAssert.assertThat(waitUntilMinKeyValueRecordsReceived2, CoreMatchers.equalTo(waitUntilMinKeyValueRecordsReceived));
                TestUtils.waitForCondition(new ConsumerGroupInactiveCondition(), 10000L, "Reset Tool consumer group " + this.appID + " did not time out after 10000 ms.");
                cleanGlobal(false, null, null);
            } finally {
            }
        } catch (Throwable th3) {
            if (bufferedWriter != null) {
                if (th != null) {
                    try {
                        bufferedWriter.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    bufferedWriter.close();
                }
            }
            throw th3;
        }
    }

    private Topology setupTopologyWithIntermediateUserTopic(String str) {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        KStream stream = streamsBuilder.stream(INPUT_TOPIC);
        stream.map(new KeyValueMapper<Long, String, KeyValue<Long, String>>() { // from class: org.apache.kafka.streams.integration.AbstractResetIntegrationTest.1
            public KeyValue<Long, String> apply(Long l, String str2) {
                return new KeyValue<>(l, str2);
            }
        }).groupByKey().count().toStream().to(OUTPUT_TOPIC, Produced.with(Serdes.Long(), Serdes.Long()));
        stream.through(INTERMEDIATE_USER_TOPIC).groupByKey().windowedBy(TimeWindows.of(Duration.ofMillis(35L)).advanceBy(Duration.ofMillis(10L))).count().toStream().map(new KeyValueMapper<Windowed<Long>, Long, KeyValue<Long, Long>>() { // from class: org.apache.kafka.streams.integration.AbstractResetIntegrationTest.2
            public KeyValue<Long, Long> apply(Windowed<Long> windowed, Long l) {
                return new KeyValue<>(Long.valueOf(windowed.window().start() + windowed.window().end()), l);
            }
        }).to(str, Produced.with(Serdes.Long(), Serdes.Long()));
        return streamsBuilder.build();
    }

    private Topology setupTopologyWithoutIntermediateUserTopic() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamsBuilder.stream(INPUT_TOPIC).map(new KeyValueMapper<Long, String, KeyValue<Long, Long>>() { // from class: org.apache.kafka.streams.integration.AbstractResetIntegrationTest.3
            public KeyValue<Long, Long> apply(Long l, String str) {
                return new KeyValue<>(l, l);
            }
        }).to(OUTPUT_TOPIC, Produced.with(Serdes.Long(), Serdes.Long()));
        return streamsBuilder.build();
    }

    private void cleanGlobal(boolean z, String str, String str2) throws Exception {
        ArrayList arrayList = new ArrayList(Arrays.asList("--application-id", this.appID, "--bootstrap-servers", cluster.bootstrapServers(), "--input-topics", INPUT_TOPIC, "--execute"));
        if (z) {
            arrayList.add("--intermediate-topics");
            arrayList.add(INTERMEDIATE_USER_TOPIC);
        }
        Map<String, Object> clientSslConfig = getClientSslConfig();
        if (clientSslConfig != null) {
            File tempFile = TestUtils.tempFile();
            BufferedWriter bufferedWriter = new BufferedWriter(new FileWriter(tempFile));
            bufferedWriter.write("security.protocol=SSL\n");
            bufferedWriter.write("ssl.truststore.location=" + clientSslConfig.get("ssl.truststore.location") + "\n");
            bufferedWriter.write("ssl.truststore.password=" + ((Password) clientSslConfig.get("ssl.truststore.password")).value() + "\n");
            bufferedWriter.close();
            arrayList.add("--config-file");
            arrayList.add(tempFile.getAbsolutePath());
        }
        if (str != null) {
            arrayList.add(str);
        }
        if (str2 != null) {
            arrayList.add(str2);
        }
        String[] strArr = (String[]) arrayList.toArray(new String[arrayList.size()]);
        Properties properties = new Properties();
        properties.put("heartbeat.interval.ms", 100);
        properties.put("session.timeout.ms", "2000");
        Assert.assertEquals(0L, new StreamsResetter().run(strArr, properties));
    }

    private void assertInternalTopicsGotDeleted(String str) throws Exception {
        if (str != null) {
            cluster.waitForRemainingTopics(IntegrationTestUtils.DEFAULT_TIMEOUT, INPUT_TOPIC, OUTPUT_TOPIC, OUTPUT_TOPIC_2, OUTPUT_TOPIC_2_RERUN, "__consumer_offsets", str);
        } else {
            cluster.waitForRemainingTopics(IntegrationTestUtils.DEFAULT_TIMEOUT, INPUT_TOPIC, OUTPUT_TOPIC, OUTPUT_TOPIC_2, OUTPUT_TOPIC_2_RERUN, "__consumer_offsets");
        }
    }
}
