package org.apache.kafka.streams.integration;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import kafka.utils.MockTime;
import org.apache.kafka.common.serialization.BytesDeserializer;
import org.apache.kafka.common.serialization.BytesSerializer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.StreamsTestUtils;
import org.apache.kafka.test.TestUtils;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

/**
 * Integration test that runs a table topology against a single-broker embedded Kafka
 * cluster, wipes the application's local state, and then restarts the topology.
 *
 * <p>The input contains a {@code null}-valued record for a key that was written earlier, so
 * the restarted instance has to cope with that record while rebuilding the state store named
 * {@value #STATE_STORE_NAME}. The test passes when a record produced after the wipe still
 * reaches the output topic.
 */
@Category({IntegrationTest.class})
public class StateRestorationIntegrationTest {
    private static final String APPLICATION_ID = "restoration-test-app";
    private static final String STATE_STORE_NAME = "stateStore";
    private static final String INPUT_TOPIC = "input";
    private static final String OUTPUT_TOPIC = "output";
    private Properties streamsConfiguration;

    @ClassRule
    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
    private final StreamsBuilder builder = new StreamsBuilder();
    private final MockTime mockTime = CLUSTER.time;

    /**
     * Builds the streams configuration (Integer keys, byte-array values as default serdes),
     * creates the input and output topics, and purges any local streams state.
     *
     * @throws Exception if topic creation or the state purge fails
     */
    @Before
    public void setUp() throws Exception {
        streamsConfiguration = StreamsTestUtils.getStreamsConfig(
                APPLICATION_ID,
                CLUSTER.bootstrapServers(),
                Serdes.Integer().getClass().getName(),
                Serdes.ByteArray().getClass().getName(),
                new Properties());
        CLUSTER.createTopics(INPUT_TOPIC);
        CLUSTER.createTopics(OUTPUT_TOPIC);
        IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
    }

    /**
     * Produces a value, a {@code null} for the same key, and a value for a second key; runs the
     * topology until those records are seen on the output topic; wipes local state; then
     * restarts the topology and checks that a newly produced record is still forwarded.
     *
     * @throws InterruptedException if the thread is interrupted while producing or waiting
     * @throws ExecutionException if a synchronous produce call fails
     */
    @Test
    public void shouldRestoreNullRecord() throws InterruptedException, ExecutionException {
        // The explicit <Integer, Bytes> witness is required: without it K and V cannot be
        // inferred from the store supplier alone and the chained serde calls do not type-check.
        builder.table(
                INPUT_TOPIC,
                Materialized.<Integer, Bytes>as(Stores.persistentTimestampedKeyValueStore(STATE_STORE_NAME))
                        .withKeySerde(Serdes.Integer())
                        .withValueSerde(Serdes.Bytes())
                        .withCachingDisabled())
                .toStream()
                .to(OUTPUT_TOPIC);

        final Properties producerConfig = TestUtils.producerConfig(
                CLUSTER.bootstrapServers(), IntegerSerializer.class, BytesSerializer.class);
        final Properties consumerConfig = TestUtils.consumerConfig(
                CLUSTER.bootstrapServers(), IntegerDeserializer.class, BytesDeserializer.class);

        // Key 3 is written and then followed by a null value; key 1 keeps its value.
        final List<KeyValue<Integer, Bytes>> initialKeyValues = Arrays.asList(
                KeyValue.pair(3, new Bytes(new byte[]{3})),
                KeyValue.<Integer, Bytes>pair(3, null),
                KeyValue.pair(1, new Bytes(new byte[]{1})));
        IntegrationTestUtils.produceKeyValuesSynchronously(
                INPUT_TOPIC, initialKeyValues, producerConfig, mockTime);

        final KafkaStreams firstRun =
                new KafkaStreams(builder.build(streamsConfiguration), streamsConfiguration);
        try {
            firstRun.start();
            IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(
                    consumerConfig, OUTPUT_TOPIC, initialKeyValues);
        } finally {
            // Always stop the instance, even when the wait above fails, so no stream threads
            // outlive the test.
            firstRun.close();
        }
        // Remove the local state directory (only permitted once the instance is closed) so
        // that the next instance cannot reuse the on-disk store and has to rebuild it.
        firstRun.cleanUp();

        final List<KeyValue<Integer, Bytes>> newKeyValues =
                Collections.singletonList(KeyValue.pair(2, new Bytes(new byte[3])));
        IntegrationTestUtils.produceKeyValuesSynchronously(
                INPUT_TOPIC, newKeyValues, producerConfig, mockTime);

        // Restart on the same topology. Seeing the new record on the output topic shows the
        // restarted instance got past the earlier null-valued record without failing.
        final KafkaStreams secondRun =
                new KafkaStreams(builder.build(streamsConfiguration), streamsConfiguration);
        try {
            secondRun.start();
            IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(
                    consumerConfig, OUTPUT_TOPIC, newKeyValues);
        } finally {
            secondRun.close();
        }
    }
}
