package io.kgraph.utils;

import io.kgraph.AbstractIntegrationTest;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.junit.After;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

/* loaded from: input_file:io/kgraph/utils/StreamUtilsTest.class */
public class StreamUtilsTest extends AbstractIntegrationTest {
    private static final String LEFT_INPUT_TOPIC = "left-input-topic";
    private static final String RIGHT_INPUT_TOPIC = "right-input-topic";
    private static final String OUTPUT_TOPIC = "output-topic";
    private static final String AGG_OUTPUT_TOPIC = "agg-output-topic";
    private static final List<Integer> LEFT_INPUT = Arrays.asList(1, 2, 2, 3, 4, 4, 5);
    private static final List<Integer> RIGHT_INPUT = Arrays.asList(2, 3, 3, 4, 5, 5, 6);
    private static Properties PRODUCER_CONFIG;

    private static String[] topics() {
        return new String[]{LEFT_INPUT_TOPIC, RIGHT_INPUT_TOPIC, OUTPUT_TOPIC};
    }

    @BeforeClass
    public static void setupConfigsAndUtils() {
        PRODUCER_CONFIG = ClientUtils.producerConfig(CLUSTER.bootstrapServers(), IntegerSerializer.class, IntegerSerializer.class, new Properties());
    }

    private static <K, V> List<KeyValue<K, V>> consumeData(String str, Class cls, Class cls2, int i, long j) {
        ArrayList arrayList = new ArrayList();
        KafkaConsumer kafkaConsumer = new KafkaConsumer(ClientUtils.consumerConfig(CLUSTER.bootstrapServers(), "testgroup", cls, cls2, new Properties()));
        Throwable th = null;
        try {
            try {
                kafkaConsumer.subscribe(Collections.singleton(str));
                long currentTimeMillis = System.currentTimeMillis() + j;
                while (System.currentTimeMillis() < currentTimeMillis && continueConsuming(arrayList.size(), i)) {
                    Iterator it = kafkaConsumer.poll(Duration.ofMillis(Math.max(1L, currentTimeMillis - System.currentTimeMillis()))).iterator();
                    while (it.hasNext()) {
                        ConsumerRecord consumerRecord = (ConsumerRecord) it.next();
                        if (consumerRecord.value() != null) {
                            arrayList.add(new KeyValue(consumerRecord.key(), consumerRecord.value()));
                        }
                    }
                }
                if (kafkaConsumer != null) {
                    if (0 != 0) {
                        try {
                            kafkaConsumer.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        kafkaConsumer.close();
                    }
                }
                return arrayList;
            } finally {
            }
        } catch (Throwable th3) {
            if (kafkaConsumer != null) {
                if (th != null) {
                    try {
                        kafkaConsumer.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    kafkaConsumer.close();
                }
            }
            throw th3;
        }
    }

    private static boolean continueConsuming(int i, int i2) {
        return i2 < 0 || i < i2;
    }

    @Test
    public void testCollectionToStream() throws Exception {
        ArrayList arrayList = new ArrayList();
        for (Integer num : LEFT_INPUT) {
            arrayList.add(new KeyValue(num, num));
        }
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        StreamUtils.streamFromCollection(streamsBuilder, PRODUCER_CONFIG, LEFT_INPUT_TOPIC, 50, (short) 1, Serdes.Integer(), Serdes.Integer(), arrayList).to(OUTPUT_TOPIC);
        startStreams(streamsBuilder, Serdes.Integer(), Serdes.Integer());
        Thread.sleep(1000L);
        for (KeyValue keyValue : consumeData(OUTPUT_TOPIC, IntegerDeserializer.class, IntegerDeserializer.class, 26, 10000L)) {
            Assert.assertEquals(keyValue.key, keyValue.value);
        }
        this.streams.close();
    }

    @Override // io.kgraph.AbstractIntegrationTest
    @After
    public void cleanup() throws Exception {
        CLUSTER.deleteTopicsAndWait(120000L, topics());
    }
}
