package org.apache.kafka.streams.integration;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicInteger;
import kafka.utils.MockTime;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.internals.KeyValueStoreBuilder;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

@Category({IntegrationTest.class})
/* loaded from: input_file:org/apache/kafka/streams/integration/GlobalThreadShutDownOrderTest.class */
public class GlobalThreadShutDownOrderTest {
    private static final int NUM_BROKERS = 1;
    private static final Properties BROKER_CONFIG = new Properties();

    @ClassRule
    public static final EmbeddedKafkaCluster CLUSTER;
    private StreamsBuilder builder;
    private Properties streamsConfiguration;
    private KafkaStreams kafkaStreams;
    private String globalStoreTopic;
    private String streamTopic;
    private boolean firstRecordProcessed;
    private final AtomicInteger closeCounter = new AtomicInteger(0);
    private final int expectedCloseCount = NUM_BROKERS;
    private final MockTime mockTime = CLUSTER.time;
    private final String globalStore = "globalStore";
    private final List<Long> retrievedValuesList = new ArrayList();

    /* loaded from: input_file:org/apache/kafka/streams/integration/GlobalThreadShutDownOrderTest$GlobalStoreProcessor.class */
    private class GlobalStoreProcessor extends AbstractProcessor<String, Long> {
        private KeyValueStore<String, Long> store;
        private final String storeName;

        GlobalStoreProcessor(String str) {
            this.storeName = str;
        }

        public void init(ProcessorContext processorContext) {
            super.init(processorContext);
            this.store = processorContext.getStateStore(this.storeName);
        }

        public void process(String str, Long l) {
            GlobalThreadShutDownOrderTest.this.firstRecordProcessed = true;
        }

        public void close() {
            GlobalThreadShutDownOrderTest.this.closeCounter.getAndIncrement();
            for (String str : Arrays.asList("A", "B", "C", "D")) {
                Utils.sleep(1000L);
                GlobalThreadShutDownOrderTest.this.retrievedValuesList.add(this.store.get(str));
            }
        }
    }

    @Before
    public void before() throws Exception {
        this.builder = new StreamsBuilder();
        createTopics();
        this.streamsConfiguration = new Properties();
        this.streamsConfiguration.put("application.id", "global-thread-shutdown-test");
        this.streamsConfiguration.put("bootstrap.servers", CLUSTER.bootstrapServers());
        this.streamsConfiguration.put("auto.offset.reset", "earliest");
        this.streamsConfiguration.put("state.dir", TestUtils.tempDirectory().getPath());
        this.streamsConfiguration.put("cache.max.bytes.buffering", 0);
        this.streamsConfiguration.put("commit.interval.ms", 100);
        Consumed with = Consumed.with(Serdes.String(), Serdes.Long());
        this.builder.addGlobalStore(new KeyValueStoreBuilder(Stores.persistentKeyValueStore("globalStore"), Serdes.String(), Serdes.Long(), this.mockTime), this.globalStoreTopic, Consumed.with(Serdes.String(), Serdes.Long()), new MockProcessorSupplier());
        this.builder.stream(this.streamTopic, with).process(() -> {
            return new GlobalStoreProcessor("globalStore");
        }, new String[0]);
    }

    @After
    public void whenShuttingDown() throws Exception {
        if (this.kafkaStreams != null) {
            this.kafkaStreams.close();
        }
        IntegrationTestUtils.purgeLocalStreamsState(this.streamsConfiguration);
    }

    @Test
    public void shouldFinishGlobalStoreOperationOnShutDown() throws Exception {
        this.kafkaStreams = new KafkaStreams(this.builder.build(), this.streamsConfiguration);
        populateTopics(this.globalStoreTopic);
        populateTopics(this.streamTopic);
        this.kafkaStreams.start();
        TestUtils.waitForCondition(() -> {
            return this.firstRecordProcessed;
        }, 30000L, "Has not processed record within 30 seconds");
        this.kafkaStreams.close(Duration.ofSeconds(30L));
        Assert.assertEquals(Arrays.asList(1L, 2L, 3L, 4L), this.retrievedValuesList);
        Assert.assertEquals(1L, this.closeCounter.get());
    }

    private void createTopics() throws Exception {
        this.streamTopic = "stream-topic";
        this.globalStoreTopic = "global-store-topic";
        CLUSTER.createTopics(this.streamTopic);
        CLUSTER.createTopic(this.globalStoreTopic);
    }

    private void populateTopics(String str) throws Exception {
        IntegrationTestUtils.produceKeyValuesSynchronously(str, Arrays.asList(new KeyValue("A", 1L), new KeyValue("B", 2L), new KeyValue("C", 3L), new KeyValue("D", 4L)), TestUtils.producerConfig(CLUSTER.bootstrapServers(), StringSerializer.class, LongSerializer.class, new Properties()), this.mockTime);
    }

    static {
        BROKER_CONFIG.put("transaction.state.log.replication.factor", (short) 1);
        BROKER_CONFIG.put("transaction.state.log.min.isr", Integer.valueOf(NUM_BROKERS));
        CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS, BROKER_CONFIG);
    }
}
