package org.apache.kafka.streams.integration;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.test.TestUtils;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;

/* loaded from: input_file:org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.class */
public class StandbyTaskEOSIntegrationTest {
    private static final int KEY_0 = 0;
    private static final int KEY_1 = 1;
    private final AtomicBoolean skipRecord = new AtomicBoolean(false);
    private String appId;
    private String inputTopic;
    private String storeName;
    private String outputTopic;
    private static final long REBALANCE_TIMEOUT = Duration.ofMinutes(2).toMillis();

    @ClassRule
    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(3);

    @Before
    public void createTopics() throws Exception {
        this.appId = "standbyTest";
        this.inputTopic = "testInputTopic";
        this.outputTopic = "testOutputTopic";
        this.storeName = "dedupStore";
        CLUSTER.deleteTopicsAndWait(this.inputTopic, this.outputTopic);
        CLUSTER.createTopic(this.inputTopic, KEY_1, 3);
        CLUSTER.createTopic(this.outputTopic, KEY_1, 3);
    }

    /* JADX WARN: Finally extract failed */
    @Test
    public void shouldWipeOutStandbyStateDirectoryIfCheckpointIsMissing() throws Exception {
        String path = TestUtils.tempDirectory(this.appId).getPath();
        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(this.inputTopic, Collections.singletonList(new KeyValue(Integer.valueOf(KEY_0), Integer.valueOf(KEY_0))), TestUtils.producerConfig(CLUSTER.bootstrapServers(), IntegerSerializer.class, IntegerSerializer.class, new Properties()), 10L);
        KafkaStreams buildWithDeduplicationTopology = buildWithDeduplicationTopology(path + "-1");
        Throwable th = null;
        try {
            KafkaStreams buildWithDeduplicationTopology2 = buildWithDeduplicationTopology(path + "-2");
            Throwable th2 = null;
            try {
                KafkaStreams buildWithDeduplicationTopology3 = buildWithDeduplicationTopology(path + "-1");
                Throwable th3 = KEY_0;
                try {
                    try {
                        IntegrationTestUtils.startApplicationAndWaitUntilRunning(Collections.singletonList(buildWithDeduplicationTopology), Duration.ofSeconds(30L));
                        IntegrationTestUtils.waitUntilMinRecordsReceived(TestUtils.consumerConfig(CLUSTER.bootstrapServers(), IntegerDeserializer.class, IntegerDeserializer.class), this.outputTopic, KEY_1);
                        IntegrationTestUtils.startApplicationAndWaitUntilRunning(Collections.singletonList(buildWithDeduplicationTopology2), Duration.ofSeconds(30L));
                        TestUtils.waitForCondition(() -> {
                            return ((ReadOnlyKeyValueStore) buildWithDeduplicationTopology2.store(StoreQueryParameters.fromNameAndType(this.storeName, QueryableStoreTypes.keyValueStore()).enableStaleStores())).get(Integer.valueOf(KEY_0)) != null;
                        }, REBALANCE_TIMEOUT, "Could not get key from standby store");
                        TestUtils.waitForCondition(() -> {
                            return ((ReadOnlyKeyValueStore) buildWithDeduplicationTopology.store(StoreQueryParameters.fromNameAndType(this.storeName, QueryableStoreTypes.keyValueStore()))).get(Integer.valueOf(KEY_0)) != null;
                        }, "Could not get key from main store");
                        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(this.inputTopic, Collections.singletonList(new KeyValue(Integer.valueOf(KEY_1), Integer.valueOf(KEY_0))), TestUtils.producerConfig(CLUSTER.bootstrapServers(), IntegerSerializer.class, IntegerSerializer.class, new Properties()), 10L);
                        TestUtils.waitForCondition(() -> {
                            return buildWithDeduplicationTopology.state() == KafkaStreams.State.ERROR;
                        }, "Stream instance 1 did not go into error state");
                        buildWithDeduplicationTopology.close();
                        IntegrationTestUtils.waitUntilMinRecordsReceived(TestUtils.consumerConfig(CLUSTER.bootstrapServers(), IntegerDeserializer.class, IntegerDeserializer.class), this.outputTopic, 2);
                        IntegrationTestUtils.startApplicationAndWaitUntilRunning(Collections.singletonList(buildWithDeduplicationTopology3), Duration.ofSeconds(30L));
                        TestUtils.waitForCondition(() -> {
                            return ((ReadOnlyKeyValueStore) buildWithDeduplicationTopology3.store(StoreQueryParameters.fromNameAndType(this.storeName, QueryableStoreTypes.keyValueStore()).enableStaleStores())).get(Integer.valueOf(KEY_0)) != null;
                        }, "Could not get key from recovered standby store");
                        buildWithDeduplicationTopology2.close();
                        TestUtils.waitForCondition(() -> {
                            return ((ReadOnlyKeyValueStore) buildWithDeduplicationTopology3.store(StoreQueryParameters.fromNameAndType(this.storeName, QueryableStoreTypes.keyValueStore()))).get(Integer.valueOf(KEY_0)) != null;
                        }, REBALANCE_TIMEOUT, "Could not get key from recovered main store");
                        this.skipRecord.set(false);
                        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(this.inputTopic, Collections.singletonList(new KeyValue(Integer.valueOf(KEY_1), Integer.valueOf(KEY_0))), TestUtils.producerConfig(CLUSTER.bootstrapServers(), IntegerSerializer.class, IntegerSerializer.class, new Properties()), 10L);
                        TestUtils.waitForCondition(() -> {
                            return buildWithDeduplicationTopology3.state() == KafkaStreams.State.ERROR;
                        }, "Stream instance 1 did not go into error state");
                        if (buildWithDeduplicationTopology3 != null) {
                            if (th3 != null) {
                                try {
                                    buildWithDeduplicationTopology3.close();
                                } catch (Throwable th4) {
                                    th3.addSuppressed(th4);
                                }
                            } else {
                                buildWithDeduplicationTopology3.close();
                            }
                        }
                        if (buildWithDeduplicationTopology2 != null) {
                            if (KEY_0 != 0) {
                                try {
                                    buildWithDeduplicationTopology2.close();
                                } catch (Throwable th5) {
                                    th2.addSuppressed(th5);
                                }
                            } else {
                                buildWithDeduplicationTopology2.close();
                            }
                        }
                        if (buildWithDeduplicationTopology != null) {
                            if (KEY_0 == 0) {
                                buildWithDeduplicationTopology.close();
                                return;
                            }
                            try {
                                buildWithDeduplicationTopology.close();
                            } catch (Throwable th6) {
                                th.addSuppressed(th6);
                            }
                        }
                    } catch (Throwable th7) {
                        th3 = th7;
                        throw th7;
                    }
                } catch (Throwable th8) {
                    if (buildWithDeduplicationTopology3 != null) {
                        if (th3 != null) {
                            try {
                                buildWithDeduplicationTopology3.close();
                            } catch (Throwable th9) {
                                th3.addSuppressed(th9);
                            }
                        } else {
                            buildWithDeduplicationTopology3.close();
                        }
                    }
                    throw th8;
                }
            } catch (Throwable th10) {
                if (buildWithDeduplicationTopology2 != null) {
                    if (KEY_0 != 0) {
                        try {
                            buildWithDeduplicationTopology2.close();
                        } catch (Throwable th11) {
                            th2.addSuppressed(th11);
                        }
                    } else {
                        buildWithDeduplicationTopology2.close();
                    }
                }
                throw th10;
            }
        } catch (Throwable th12) {
            if (buildWithDeduplicationTopology != null) {
                if (KEY_0 != 0) {
                    try {
                        buildWithDeduplicationTopology.close();
                    } catch (Throwable th13) {
                        th.addSuppressed(th13);
                    }
                } else {
                    buildWithDeduplicationTopology.close();
                }
            }
            throw th12;
        }
    }

    private KafkaStreams buildWithDeduplicationTopology(String str) {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamsBuilder.addStateStore(Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore(this.storeName), Serdes.Integer(), Serdes.Integer()));
        streamsBuilder.stream(this.inputTopic).transform(() -> {
            return new Transformer<Integer, Integer, KeyValue<Integer, Integer>>() { // from class: org.apache.kafka.streams.integration.StandbyTaskEOSIntegrationTest.1
                private KeyValueStore store;

                public void init(ProcessorContext processorContext) {
                    this.store = processorContext.getStateStore(StandbyTaskEOSIntegrationTest.this.storeName);
                }

                public KeyValue<Integer, Integer> transform(Integer num, Integer num2) {
                    if (StandbyTaskEOSIntegrationTest.this.skipRecord.get()) {
                        return KeyValue.pair(num, num2);
                    }
                    if (this.store.get(num) != null) {
                        return null;
                    }
                    this.store.put(num, num2);
                    this.store.flush();
                    if (num.intValue() != StandbyTaskEOSIntegrationTest.KEY_1) {
                        return KeyValue.pair(num, num2);
                    }
                    StandbyTaskEOSIntegrationTest.this.skipRecord.set(true);
                    throw new RuntimeException("Injected test error");
                }

                public void close() {
                }
            };
        }, new String[]{this.storeName}).to(this.outputTopic);
        return new KafkaStreams(streamsBuilder.build(), props(str));
    }

    private Properties props(String str) {
        Properties properties = new Properties();
        properties.put("application.id", this.appId);
        properties.put("bootstrap.servers", CLUSTER.bootstrapServers());
        properties.put("cache.max.bytes.buffering", Integer.valueOf(KEY_0));
        properties.put("state.dir", str);
        properties.put("num.standby.replicas", Integer.valueOf(KEY_1));
        properties.put("processing.guarantee", "exactly_once");
        properties.put("default.key.serde", Serdes.Integer().getClass());
        properties.put("default.value.serde", Serdes.Integer().getClass());
        properties.put("commit.interval.ms", 1000);
        properties.put("auto.offset.reset", "earliest");
        return properties;
    }
}
