package org.apache.kafka.streams.integration;

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.LogDirDescription;
import org.apache.kafka.clients.admin.ReplicaInfo;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.MockMapper;
import org.apache.kafka.test.TestCondition;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.Timeout;

@Category({IntegrationTest.class})
/* loaded from: input_file:org/apache/kafka/streams/integration/PurgeRepartitionTopicIntegrationTest.class */
public class PurgeRepartitionTopicIntegrationTest {
    private static final int NUM_BROKERS = 1;
    private static final String INPUT_TOPIC = "input-stream";
    private static final String APPLICATION_ID = "restore-test";
    private static final String REPARTITION_TOPIC = "restore-test-KSTREAM-AGGREGATE-STATE-STORE-0000000002-repartition";
    private static Admin adminClient;
    private static KafkaStreams kafkaStreams;
    private static final Integer PURGE_INTERVAL_MS = 10;
    private static final Integer PURGE_SEGMENT_BYTES = 2000;
    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1, new Properties() { // from class: org.apache.kafka.streams.integration.PurgeRepartitionTopicIntegrationTest.1
        {
            put("log.retention.check.interval.ms", PurgeRepartitionTopicIntegrationTest.PURGE_INTERVAL_MS);
            put("file.delete.delay.ms", 0);
        }
    });

    @Rule
    public Timeout globalTimeout = Timeout.seconds(600);
    private final Time time = CLUSTER.time;

    /* loaded from: input_file:org/apache/kafka/streams/integration/PurgeRepartitionTopicIntegrationTest$RepartitionTopicCreatedWithExpectedConfigs.class */
    private class RepartitionTopicCreatedWithExpectedConfigs implements TestCondition {
        private RepartitionTopicCreatedWithExpectedConfigs() {
        }

        public final boolean conditionMet() {
            try {
                if (!((Set) PurgeRepartitionTopicIntegrationTest.adminClient.listTopics().names().get()).contains(PurgeRepartitionTopicIntegrationTest.REPARTITION_TOPIC)) {
                    return false;
                }
                try {
                    ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, PurgeRepartitionTopicIntegrationTest.REPARTITION_TOPIC);
                    Config config = (Config) ((KafkaFuture) PurgeRepartitionTopicIntegrationTest.adminClient.describeConfigs(Collections.singleton(configResource)).values().get(configResource)).get();
                    if (config.get("cleanup.policy").value().equals("delete") && config.get("segment.ms").value().equals(PurgeRepartitionTopicIntegrationTest.PURGE_INTERVAL_MS.toString())) {
                        if (config.get("segment.bytes").value().equals(PurgeRepartitionTopicIntegrationTest.PURGE_SEGMENT_BYTES.toString())) {
                            return true;
                        }
                    }
                    return false;
                } catch (Exception e) {
                    return false;
                }
            } catch (Exception e2) {
                return false;
            }
        }
    }

    /* loaded from: input_file:org/apache/kafka/streams/integration/PurgeRepartitionTopicIntegrationTest$RepartitionTopicVerified.class */
    private class RepartitionTopicVerified implements TestCondition {
        private final TopicSizeVerifier verifier;

        RepartitionTopicVerified(TopicSizeVerifier topicSizeVerifier) {
            this.verifier = topicSizeVerifier;
        }

        public final boolean conditionMet() {
            PurgeRepartitionTopicIntegrationTest.this.time.sleep(PurgeRepartitionTopicIntegrationTest.PURGE_INTERVAL_MS.intValue());
            try {
                Iterator it = ((Map) ((KafkaFuture) PurgeRepartitionTopicIntegrationTest.adminClient.describeLogDirs(Collections.singleton(0)).descriptions().get(0)).get()).values().iterator();
                while (it.hasNext()) {
                    ReplicaInfo replicaInfo = (ReplicaInfo) ((LogDirDescription) it.next()).replicaInfos().get(new TopicPartition(PurgeRepartitionTopicIntegrationTest.REPARTITION_TOPIC, 0));
                    if (replicaInfo != null && this.verifier.verify(replicaInfo.size())) {
                        return true;
                    }
                }
                return false;
            } catch (Exception e) {
                return false;
            }
        }
    }

    /* loaded from: input_file:org/apache/kafka/streams/integration/PurgeRepartitionTopicIntegrationTest$TopicSizeVerifier.class */
    private interface TopicSizeVerifier {
        boolean verify(long j);
    }

    @BeforeClass
    public static void startCluster() throws IOException, InterruptedException {
        CLUSTER.start();
        CLUSTER.createTopic(INPUT_TOPIC, 1, 1);
    }

    @AfterClass
    public static void closeCluster() {
        CLUSTER.stop();
    }

    @Before
    public void setup() {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", CLUSTER.bootstrapServers());
        adminClient = Admin.create(properties);
        Properties properties2 = new Properties();
        properties2.put("application.id", APPLICATION_ID);
        properties2.put("commit.interval.ms", PURGE_INTERVAL_MS);
        properties2.put("repartition.purge.interval.ms", PURGE_INTERVAL_MS);
        properties2.put("bootstrap.servers", CLUSTER.bootstrapServers());
        properties2.put("default.key.serde", Serdes.Integer().getClass());
        properties2.put("default.value.serde", Serdes.Integer().getClass());
        properties2.put("state.dir", TestUtils.tempDirectory(APPLICATION_ID).getPath());
        properties2.put(StreamsConfig.topicPrefix("segment.ms"), PURGE_INTERVAL_MS);
        properties2.put(StreamsConfig.topicPrefix("segment.bytes"), PURGE_SEGMENT_BYTES);
        properties2.put(StreamsConfig.producerPrefix("batch.size"), Integer.valueOf(PURGE_SEGMENT_BYTES.intValue() / 2));
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamsBuilder.stream(INPUT_TOPIC).groupBy(MockMapper.selectKeyKeyValueMapper()).count();
        kafkaStreams = new KafkaStreams(streamsBuilder.build(), properties2, this.time);
    }

    @After
    public void shutdown() {
        if (kafkaStreams != null) {
            kafkaStreams.close(Duration.ofSeconds(30L));
        }
    }

    @Test
    public void shouldRestoreState() throws Exception {
        ArrayList arrayList = new ArrayList();
        for (int i = 0; i < 1000; i++) {
            arrayList.add(new KeyValue(Integer.valueOf(i), Integer.valueOf(i)));
        }
        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, arrayList, TestUtils.producerConfig(CLUSTER.bootstrapServers(), IntegerSerializer.class, IntegerSerializer.class), Long.valueOf(this.time.milliseconds()));
        kafkaStreams.start();
        TestUtils.waitForCondition(new RepartitionTopicCreatedWithExpectedConfigs(), IntegrationTestUtils.DEFAULT_TIMEOUT, "Repartition topic restore-test-KSTREAM-AGGREGATE-STATE-STORE-0000000002-repartition not created with the expected configs after 60000 ms.");
        TestUtils.waitForCondition(new RepartitionTopicVerified(j -> {
            return j > ((long) PURGE_SEGMENT_BYTES.intValue());
        }), IntegrationTestUtils.DEFAULT_TIMEOUT, "Repartition topic restore-test-KSTREAM-AGGREGATE-STATE-STORE-0000000002-repartition not received more than " + PURGE_SEGMENT_BYTES + "B of data after 60000 ms.");
        TestUtils.waitForCondition(new RepartitionTopicVerified(j2 -> {
            return j2 <= ((long) PURGE_SEGMENT_BYTES.intValue());
        }), IntegrationTestUtils.DEFAULT_TIMEOUT, "Repartition topic restore-test-KSTREAM-AGGREGATE-STATE-STORE-0000000002-repartition not purged data after 60000 ms.");
    }
}
