/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.integration.utils;

import java.io.File;
import java.io.IOException;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import kafka.utils.Time;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.test.TestCondition;
import org.apache.kafka.test.TestUtils;

/**
 * Static helpers for integration tests: reading records back from a topic,
 * producing records synchronously, waiting until a minimum number of records
 * has been consumed, and purging local state directories.
 */
public class IntegrationTestUtils {
    /** Value of {@code maxMessages} meaning "no upper bound on the number of records to read". */
    public static final int UNLIMITED_MESSAGES = -1;
    /** Default maximum wait, in milliseconds, used by the {@code waitUntil...} overloads without a timeout. */
    public static final long DEFAULT_TIMEOUT = 30000L;

    /**
     * Reads records from {@code topic} and returns only their values.
     *
     * @param topic          topic to read from
     * @param consumerConfig configuration used to create the consumer
     * @param maxMessages    stop polling once at least this many records were read;
     *                       a value {@code <= 0} means no limit
     * @return the values of the consumed records, in consumption order
     */
    public static <V> List<V> readValues(String topic, Properties consumerConfig, int maxMessages) {
        final List<KeyValue<Object, V>> kvs =
            IntegrationTestUtils.<Object, V>readKeyValues(topic, consumerConfig, maxMessages);
        final List<V> returnList = new ArrayList<V>(kvs.size());
        for (final KeyValue<Object, V> kv : kvs) {
            returnList.add(kv.value);
        }
        return returnList;
    }

    /**
     * Reads records from {@code topic} without a limit on the number of records
     * (polling still stops once the poll-time budget is used up).
     *
     * @param topic          topic to read from
     * @param consumerConfig configuration used to create the consumer
     * @return the consumed key/value pairs, in consumption order
     */
    public static <K, V> List<KeyValue<K, V>> readKeyValues(String topic, Properties consumerConfig) {
        return IntegrationTestUtils.<K, V>readKeyValues(topic, consumerConfig, UNLIMITED_MESSAGES);
    }

    /**
     * Reads records from {@code topic} with a freshly created consumer, which is
     * always closed before this method returns (also when polling fails).
     *
     * <p>Polling stops when the nominal poll-time budget is exhausted or when at
     * least {@code maxMessages} records were collected. All records of the last
     * poll are kept, so the result may contain more than {@code maxMessages} entries.
     *
     * @param topic          topic to read from
     * @param consumerConfig configuration used to create the consumer
     * @param maxMessages    stop polling once at least this many records were read;
     *                       a value {@code <= 0} means no limit
     * @return the consumed key/value pairs, in consumption order
     */
    public static <K, V> List<KeyValue<K, V>> readKeyValues(String topic, Properties consumerConfig, int maxMessages) {
        final int pollIntervalMs = 100;
        final int maxTotalPollTimeMs = 2000;
        final List<KeyValue<K, V>> consumedValues = new ArrayList<KeyValue<K, V>>();
        final KafkaConsumer<K, V> consumer = new KafkaConsumer<K, V>(consumerConfig);
        try {
            consumer.subscribe(Collections.singletonList(topic));
            // The budget is counted per poll call (nominal interval), not by measured wall-clock time.
            int totalPollTimeMs = 0;
            while (totalPollTimeMs < maxTotalPollTimeMs
                    && continueConsuming(consumedValues.size(), maxMessages)) {
                totalPollTimeMs += pollIntervalMs;
                final ConsumerRecords<K, V> records = consumer.poll(pollIntervalMs);
                for (final ConsumerRecord<K, V> record : records) {
                    consumedValues.add(new KeyValue<K, V>(record.key(), record.value()));
                }
            }
        } finally {
            // Release the consumer even if subscribe/poll throws.
            consumer.close();
        }
        return consumedValues;
    }

    /**
     * Tells whether more records should be consumed.
     *
     * @return {@code true} if {@code maxMessages <= 0} (no limit) or fewer than
     *         {@code maxMessages} records were consumed so far
     */
    private static boolean continueConsuming(int messagesConsumed, int maxMessages) {
        return maxMessages <= 0 || messagesConsumed < maxMessages;
    }

    /**
     * Deletes the directory configured under {@code "state.dir"}, but only if it is
     * located at or below {@code TestUtils.IO_TMP_DIR}. Does nothing when the
     * property is not set.
     *
     * @param streamsConfiguration configuration holding the {@code "state.dir"} property
     * @throws IOException if deleting the directory fails
     */
    public static void purgeLocalStreamsState(Properties streamsConfiguration) throws IOException {
        final String path = streamsConfiguration.getProperty("state.dir");
        if (path == null) {
            return;
        }
        final String tmpDir = TestUtils.IO_TMP_DIR.getPath();
        // Resolve to an absolute path first and normalize afterwards, so that ".." segments
        // cannot lead outside of the temp directory once the check has passed.
        final File node = Paths.get(path).toAbsolutePath().normalize().toFile();
        // Path.startsWith compares whole name elements, so e.g. "/tmpfoo" is not
        // mistaken for a child of "/tmp" (a plain String prefix test would be).
        if (node.toPath().startsWith(Paths.get(tmpDir).normalize())) {
            Utils.delete(node);
        }
    }

    /**
     * Produces the given records one at a time, each stamped with the current
     * {@code time.milliseconds()}, calling {@code time.sleep(1)} after every record.
     *
     * @param topic          topic to write to
     * @param records        key/value pairs to produce
     * @param producerConfig configuration used to create the producer(s)
     * @param time           source of timestamps; also used to sleep between records
     * @throws ExecutionException   if sending a record failed
     * @throws InterruptedException if interrupted while waiting for a send to complete
     */
    public static <K, V> void produceKeyValuesSynchronously(String topic, Collection<KeyValue<K, V>> records, Properties producerConfig, Time time) throws ExecutionException, InterruptedException {
        for (final KeyValue<K, V> record : records) {
            produceKeyValuesSynchronouslyWithTimestamp(
                topic, Collections.singleton(record), producerConfig, time.milliseconds());
            time.sleep(1L);
        }
    }

    /**
     * Produces the given records with one explicit timestamp, waiting for each
     * send to complete before issuing the next. The producer created here is
     * always closed before this method returns (also when a send fails).
     *
     * @param topic          topic to write to
     * @param records        key/value pairs to produce
     * @param producerConfig configuration used to create the producer
     * @param timestamp      timestamp set on every produced record
     * @throws ExecutionException   if sending a record failed
     * @throws InterruptedException if interrupted while waiting for a send to complete
     */
    public static <K, V> void produceKeyValuesSynchronouslyWithTimestamp(String topic, Collection<KeyValue<K, V>> records, Properties producerConfig, Long timestamp) throws ExecutionException, InterruptedException {
        final KafkaProducer<K, V> producer = new KafkaProducer<K, V>(producerConfig);
        try {
            for (final KeyValue<K, V> record : records) {
                // Block on the returned future so the records are written strictly one after another.
                producer.send(new ProducerRecord<K, V>(topic, null, timestamp, record.key, record.value)).get();
            }
            producer.flush();
        } finally {
            // Release the producer even if a send fails or the thread is interrupted.
            producer.close();
        }
    }

    /**
     * Produces the given values as records with a {@code null} key.
     *
     * @param topic          topic to write to
     * @param records        values to produce
     * @param producerConfig configuration used to create the producer(s)
     * @param time           source of timestamps; also used to sleep between records
     * @throws ExecutionException   if sending a record failed
     * @throws InterruptedException if interrupted while waiting for a send to complete
     */
    public static <V> void produceValuesSynchronously(String topic, Collection<V> records, Properties producerConfig, Time time) throws ExecutionException, InterruptedException {
        final Collection<KeyValue<Object, V>> keyedRecords = new ArrayList<KeyValue<Object, V>>(records.size());
        for (final V value : records) {
            keyedRecords.add(new KeyValue<Object, V>(null, value));
        }
        produceKeyValuesSynchronously(topic, keyedRecords, producerConfig, time);
    }

    /**
     * Same as {@link #waitUntilMinKeyValueRecordsReceived(Properties, String, int, long)}
     * with a wait time of {@link #DEFAULT_TIMEOUT}.
     */
    public static <K, V> List<KeyValue<K, V>> waitUntilMinKeyValueRecordsReceived(Properties consumerConfig, String topic, int expectedNumRecords) throws InterruptedException {
        return IntegrationTestUtils.<K, V>waitUntilMinKeyValueRecordsReceived(
            consumerConfig, topic, expectedNumRecords, DEFAULT_TIMEOUT);
    }

    /**
     * Repeatedly reads key/value records from {@code topic}, accumulating them
     * across attempts, until at least {@code expectedNumRecords} were collected.
     * The waiting itself is delegated to {@code TestUtils.waitForCondition}.
     *
     * @param consumerConfig     configuration used to create the consumer for every read attempt
     * @param topic              topic to read from
     * @param expectedNumRecords minimum number of records to collect
     * @param waitTime           wait time handed to {@code TestUtils.waitForCondition}
     * @return all records accumulated over the read attempts
     * @throws InterruptedException if waiting is interrupted
     */
    public static <K, V> List<KeyValue<K, V>> waitUntilMinKeyValueRecordsReceived(final Properties consumerConfig, final String topic, final int expectedNumRecords, long waitTime) throws InterruptedException {
        final List<KeyValue<K, V>> accumData = new ArrayList<KeyValue<K, V>>();
        final TestCondition valuesRead = new TestCondition() {
            @Override
            public boolean conditionMet() {
                final List<KeyValue<K, V>> readData =
                    IntegrationTestUtils.<K, V>readKeyValues(topic, consumerConfig);
                accumData.addAll(readData);
                return accumData.size() >= expectedNumRecords;
            }
        };
        final String conditionDetails = "Did not receive " + expectedNumRecords + " number of records";
        TestUtils.waitForCondition(valuesRead, waitTime, conditionDetails);
        return accumData;
    }

    /**
     * Same as {@link #waitUntilMinValuesRecordsReceived(Properties, String, int, long)}
     * with a wait time of {@link #DEFAULT_TIMEOUT}.
     */
    public static <V> List<V> waitUntilMinValuesRecordsReceived(Properties consumerConfig, String topic, int expectedNumRecords) throws InterruptedException {
        return IntegrationTestUtils.<V>waitUntilMinValuesRecordsReceived(
            consumerConfig, topic, expectedNumRecords, DEFAULT_TIMEOUT);
    }

    /**
     * Repeatedly reads record values from {@code topic}, accumulating them across
     * attempts, until at least {@code expectedNumRecords} were collected. Each
     * read attempt passes {@code expectedNumRecords} as its message limit.
     * The waiting itself is delegated to {@code TestUtils.waitForCondition}.
     *
     * @param consumerConfig     configuration used to create the consumer for every read attempt
     * @param topic              topic to read from
     * @param expectedNumRecords minimum number of values to collect
     * @param waitTime           wait time handed to {@code TestUtils.waitForCondition}
     * @return all values accumulated over the read attempts
     * @throws InterruptedException if waiting is interrupted
     */
    public static <V> List<V> waitUntilMinValuesRecordsReceived(final Properties consumerConfig, final String topic, final int expectedNumRecords, long waitTime) throws InterruptedException {
        final List<V> accumData = new ArrayList<V>();
        final TestCondition valuesRead = new TestCondition() {
            @Override
            public boolean conditionMet() {
                final List<V> readData =
                    IntegrationTestUtils.<V>readValues(topic, consumerConfig, expectedNumRecords);
                accumData.addAll(readData);
                return accumData.size() >= expectedNumRecords;
            }
        };
        final String conditionDetails = "Did not receive " + expectedNumRecords + " number of records";
        TestUtils.waitForCondition(valuesRead, waitTime, conditionDetails);
        return accumData;
    }
}

