package io.kgraph.utils;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.NavigableMap;
import java.util.Properties;
import java.util.TreeMap;
import java.util.UUID;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:BOOT-INF/lib/kafka-graphs-core-1.2.0.jar:io/kgraph/utils/StreamUtils.class */
/**
 * Static helpers for moving data between in-memory collections and Kafka Streams:
 * publishing a collection to a topic and exposing it as a {@link KStream} or
 * {@link KTable}, and dumping a queryable key-value state store into a {@link List}
 * or {@link NavigableMap}.
 */
public class StreamUtils {
    private static final Logger log = LoggerFactory.getLogger(StreamUtils.class);

    /**
     * Publishes {@code collection} to a freshly named topic ({@code "temp-" + random UUID})
     * and returns a stream reading from that topic.
     *
     * <p>Delegates to the full overload with the values {@code 50} and {@code (short) 1}
     * for its {@code i} and {@code s} arguments.
     *
     * @param streamsBuilder builder on which the resulting stream is registered
     * @param properties     configuration used both to create the topic and to build the producer
     * @param serde          serde for record keys
     * @param serde2         serde for record values
     * @param collection     key/value pairs to publish
     * @param <K>            key type
     * @param <V>            value type
     * @return a stream over the newly populated topic
     */
    public static <K, V> KStream<K, V> streamFromCollection(StreamsBuilder streamsBuilder, Properties properties, Serde<K> serde, Serde<V> serde2, Collection<KeyValue<K, V>> collection) {
        return streamFromCollection(streamsBuilder, properties, "temp-" + UUID.randomUUID(), 50, (short) 1, serde, serde2, collection);
    }

    /**
     * Creates topic {@code str}, publishes every entry of {@code collection} to it, and
     * returns a stream reading from that topic.
     *
     * <p>The producer is flushed and closed before the stream is registered on the builder.
     *
     * @param streamsBuilder builder on which the resulting stream is registered
     * @param properties     configuration used both to create the topic and to build the producer
     * @param str            name of the topic to create and write to
     * @param i              forwarded to {@code ClientUtils.createTopic}; presumably the partition count (confirm against ClientUtils)
     * @param s              forwarded to {@code ClientUtils.createTopic}; presumably the replication factor (confirm against ClientUtils)
     * @param serde          serde for record keys
     * @param serde2         serde for record values
     * @param collection     key/value pairs to publish
     * @param <K>            key type
     * @param <V>            value type
     * @return a stream over topic {@code str}
     */
    public static <K, V> KStream<K, V> streamFromCollection(StreamsBuilder streamsBuilder, Properties properties, String str, int i, short s, Serde<K> serde, Serde<V> serde2, Collection<KeyValue<K, V>> collection) {
        publishToNewTopic(properties, str, i, s, serde, serde2, collection);
        return streamsBuilder.stream(str, Consumed.with(serde, serde2));
    }

    /**
     * Publishes {@code collection} to a freshly named topic ({@code "temp-" + random UUID})
     * and returns a table reading from that topic.
     *
     * <p>Delegates to the full overload with the values {@code 50} and {@code (short) 1}
     * for its {@code i} and {@code s} arguments.
     *
     * @param streamsBuilder builder on which the resulting table is registered
     * @param properties     configuration used both to create the topic and to build the producer
     * @param serde          serde for record keys
     * @param serde2         serde for record values
     * @param collection     key/value pairs to publish
     * @param <K>            key type
     * @param <V>            value type
     * @return a table over the newly populated topic
     */
    public static <K, V> KTable<K, V> tableFromCollection(StreamsBuilder streamsBuilder, Properties properties, Serde<K> serde, Serde<V> serde2, Collection<KeyValue<K, V>> collection) {
        return tableFromCollection(streamsBuilder, properties, "temp-" + UUID.randomUUID(), 50, (short) 1, serde, serde2, collection);
    }

    /**
     * Creates topic {@code str}, publishes every entry of {@code collection} to it, and
     * returns a table reading from that topic, materialized with the given serdes.
     *
     * <p>The producer is flushed and closed before the table is registered on the builder.
     *
     * @param streamsBuilder builder on which the resulting table is registered
     * @param properties     configuration used both to create the topic and to build the producer
     * @param str            name of the topic to create and write to
     * @param i              forwarded to {@code ClientUtils.createTopic}; presumably the partition count (confirm against ClientUtils)
     * @param s              forwarded to {@code ClientUtils.createTopic}; presumably the replication factor (confirm against ClientUtils)
     * @param serde          serde for record keys
     * @param serde2         serde for record values
     * @param collection     key/value pairs to publish
     * @param <K>            key type
     * @param <V>            value type
     * @return a table over topic {@code str}
     */
    public static <K, V> KTable<K, V> tableFromCollection(StreamsBuilder streamsBuilder, Properties properties, String str, int i, short s, Serde<K> serde, Serde<V> serde2, Collection<KeyValue<K, V>> collection) {
        publishToNewTopic(properties, str, i, s, serde, serde2, collection);
        return streamsBuilder.table(str, Consumed.with(serde, serde2), Materialized.with(serde, serde2));
    }

    /**
     * Returns every entry of the state store backing {@code kTable}.
     *
     * @param kafkaStreams streams instance that owns the store
     * @param kTable       table whose {@code queryableStoreName()} identifies the store
     * @param <K>          key type
     * @param <V>          value type
     * @return the store's entries, in the order the store iterator yields them
     */
    public static <K, V> List<KeyValue<K, V>> listFromTable(KafkaStreams kafkaStreams, KTable<K, V> kTable) {
        return listFromStore(kafkaStreams, kTable.queryableStoreName());
    }

    /**
     * Returns every entry of the key-value store named {@code str}.
     *
     * @param kafkaStreams streams instance that owns the store
     * @param str          name of the key-value store to read
     * @param <K>          key type
     * @param <V>          value type
     * @return a new mutable list of the store's entries, in the order the store iterator yields them
     */
    public static <K, V> List<KeyValue<K, V>> listFromStore(KafkaStreams kafkaStreams, String str) {
        ReadOnlyKeyValueStore<K, V> store = kafkaStreams.store(str, QueryableStoreTypes.keyValueStore());
        // try-with-resources guarantees the iterator is closed exactly once, and keeps the
        // primary exception (with any close() failure attached as suppressed) on error.
        try (KeyValueIterator<K, V> all = store.all()) {
            List<KeyValue<K, V>> entries = new ArrayList<>();
            while (all.hasNext()) {
                entries.add(all.next());
            }
            return entries;
        }
    }

    /**
     * Returns the contents of the state store backing {@code kTable} as a sorted map.
     *
     * @param kafkaStreams streams instance that owns the store
     * @param kTable       table whose {@code queryableStoreName()} identifies the store
     * @param <K>          key type
     * @param <V>          value type
     * @return the store's entries keyed and sorted by key
     */
    public static <K, V> NavigableMap<K, V> mapFromTable(KafkaStreams kafkaStreams, KTable<K, V> kTable) {
        return mapFromStore(kafkaStreams, kTable.queryableStoreName());
    }

    /**
     * Returns the contents of the key-value store named {@code str} as a sorted map.
     *
     * <p>The result is a {@link TreeMap} using natural ordering, so keys must be mutually
     * comparable; otherwise {@code TreeMap.put} throws {@link ClassCastException}.
     *
     * @param kafkaStreams streams instance that owns the store
     * @param str          name of the key-value store to read
     * @param <K>          key type
     * @param <V>          value type
     * @return a new mutable map of the store's entries, sorted by key
     */
    public static <K, V> NavigableMap<K, V> mapFromStore(KafkaStreams kafkaStreams, String str) {
        ReadOnlyKeyValueStore<K, V> store = kafkaStreams.store(str, QueryableStoreTypes.keyValueStore());
        // try-with-resources guarantees the iterator is closed exactly once, and keeps the
        // primary exception (with any close() failure attached as suppressed) on error.
        try (KeyValueIterator<K, V> all = store.all()) {
            NavigableMap<K, V> sorted = new TreeMap<>();
            while (all.hasNext()) {
                KeyValue<K, V> next = all.next();
                sorted.put(next.key, next.value);
            }
            return sorted;
        }
    }

    /**
     * Creates {@code topic} and synchronously publishes every entry of {@code entries} to it.
     * Shared by the stream and table factories so both populate the topic identically.
     */
    private static <K, V> void publishToNewTopic(Properties properties, String topic, int i, short s, Serde<K> keySerde, Serde<V> valueSerde, Collection<KeyValue<K, V>> entries) {
        ClientUtils.createTopic(topic, i, s, properties);
        // The producer is closed exactly once on every path; if both the body and close()
        // fail, the body's exception propagates with the close() failure suppressed.
        try (KafkaProducer<K, V> producer = new KafkaProducer<>(properties, keySerde.serializer(), valueSerde.serializer())) {
            for (KeyValue<K, V> entry : entries) {
                producer.send(new ProducerRecord<>(topic, entry.key, entry.value));
            }
            // Block until all buffered sends complete before handing the topic to a consumer.
            producer.flush();
        }
    }
}
