package org.apache.kafka.streams.kstream.internals;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Merger;
import org.apache.kafka.streams.kstream.SessionWindows;
import org.apache.kafka.streams.kstream.SlidingWindows;
import org.apache.kafka.streams.kstream.Window;
import org.apache.kafka.streams.kstream.Windows;
import org.apache.kafka.streams.kstream.internals.graph.GraphNode;
import org.apache.kafka.streams.kstream.internals.graph.OptimizableRepartitionNode;
import org.apache.kafka.streams.kstream.internals.graph.ProcessorGraphNode;
import org.apache.kafka.streams.kstream.internals.graph.ProcessorParameters;
import org.apache.kafka.streams.kstream.internals.graph.StatefulProcessorNode;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.state.StoreBuilder;

/* loaded from: input_file:org/apache/kafka/streams/kstream/internals/CogroupedStreamAggregateBuilder.class */
class CogroupedStreamAggregateBuilder<K, VOut> {
    private final InternalStreamsBuilder builder;
    private final Map<KGroupedStreamImpl<K, ?>, GraphNode> parentNodes = new LinkedHashMap();

    /* JADX INFO: Access modifiers changed from: package-private */
    public CogroupedStreamAggregateBuilder(InternalStreamsBuilder internalStreamsBuilder) {
        this.builder = internalStreamsBuilder;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public <KR> KTable<KR, VOut> build(Map<KGroupedStreamImpl<K, ?>, Aggregator<? super K, ? super Object, VOut>> map, Initializer<VOut> initializer, NamedInternal namedInternal, StoreBuilder<?> storeBuilder, Serde<KR> serde, Serde<VOut> serde2, String str) {
        processRepartitions(map, storeBuilder);
        ArrayList arrayList = new ArrayList();
        ArrayList arrayList2 = new ArrayList();
        boolean z = false;
        int i = 0;
        for (Map.Entry<KGroupedStreamImpl<K, ?>, Aggregator<? super K, ? super Object, VOut>> entry : map.entrySet()) {
            KStreamAggregate kStreamAggregate = new KStreamAggregate(storeBuilder.name(), initializer, entry.getValue());
            arrayList2.add(kStreamAggregate);
            int i2 = i;
            i++;
            StatefulProcessorNode<K, ?> statefulProcessorNode = getStatefulProcessorNode(namedInternal.suffixWithOrElseGet("-cogroup-agg-" + i2, this.builder, "COGROUPKSTREAM-AGGREGATE-"), z, storeBuilder, kStreamAggregate);
            z = true;
            arrayList.add(statefulProcessorNode);
            this.builder.addGraphNode(this.parentNodes.get(entry.getKey()), statefulProcessorNode);
        }
        return createTable(arrayList, arrayList2, namedInternal, serde, serde2, str, storeBuilder.name());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public <KR, W extends Window> KTable<KR, VOut> build(Map<KGroupedStreamImpl<K, ?>, Aggregator<? super K, ? super Object, VOut>> map, Initializer<VOut> initializer, NamedInternal namedInternal, StoreBuilder<?> storeBuilder, Serde<KR> serde, Serde<VOut> serde2, String str, Windows<W> windows) {
        processRepartitions(map, storeBuilder);
        ArrayList arrayList = new ArrayList();
        ArrayList arrayList2 = new ArrayList();
        boolean z = false;
        int i = 0;
        for (Map.Entry<KGroupedStreamImpl<K, ?>, Aggregator<? super K, ? super Object, VOut>> entry : map.entrySet()) {
            KStreamWindowAggregate kStreamWindowAggregate = new KStreamWindowAggregate(windows, storeBuilder.name(), initializer, entry.getValue());
            arrayList2.add(kStreamWindowAggregate);
            int i2 = i;
            i++;
            StatefulProcessorNode<K, ?> statefulProcessorNode = getStatefulProcessorNode(namedInternal.suffixWithOrElseGet("-cogroup-agg-" + i2, this.builder, "COGROUPKSTREAM-AGGREGATE-"), z, storeBuilder, kStreamWindowAggregate);
            z = true;
            arrayList.add(statefulProcessorNode);
            this.builder.addGraphNode(this.parentNodes.get(entry.getKey()), statefulProcessorNode);
        }
        return createTable(arrayList, arrayList2, namedInternal, serde, serde2, str, storeBuilder.name());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public <KR> KTable<KR, VOut> build(Map<KGroupedStreamImpl<K, ?>, Aggregator<? super K, ? super Object, VOut>> map, Initializer<VOut> initializer, NamedInternal namedInternal, StoreBuilder<?> storeBuilder, Serde<KR> serde, Serde<VOut> serde2, String str, SessionWindows sessionWindows, Merger<? super K, VOut> merger) {
        processRepartitions(map, storeBuilder);
        ArrayList arrayList = new ArrayList();
        ArrayList arrayList2 = new ArrayList();
        boolean z = false;
        int i = 0;
        for (Map.Entry<KGroupedStreamImpl<K, ?>, Aggregator<? super K, ? super Object, VOut>> entry : map.entrySet()) {
            KStreamSessionWindowAggregate kStreamSessionWindowAggregate = new KStreamSessionWindowAggregate(sessionWindows, storeBuilder.name(), initializer, entry.getValue(), merger);
            arrayList2.add(kStreamSessionWindowAggregate);
            int i2 = i;
            i++;
            StatefulProcessorNode<K, ?> statefulProcessorNode = getStatefulProcessorNode(namedInternal.suffixWithOrElseGet("-cogroup-agg-" + i2, this.builder, "COGROUPKSTREAM-AGGREGATE-"), z, storeBuilder, kStreamSessionWindowAggregate);
            z = true;
            arrayList.add(statefulProcessorNode);
            this.builder.addGraphNode(this.parentNodes.get(entry.getKey()), statefulProcessorNode);
        }
        return createTable(arrayList, arrayList2, namedInternal, serde, serde2, str, storeBuilder.name());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public <KR> KTable<KR, VOut> build(Map<KGroupedStreamImpl<K, ?>, Aggregator<? super K, ? super Object, VOut>> map, Initializer<VOut> initializer, NamedInternal namedInternal, StoreBuilder<?> storeBuilder, Serde<KR> serde, Serde<VOut> serde2, String str, SlidingWindows slidingWindows) {
        processRepartitions(map, storeBuilder);
        ArrayList arrayList = new ArrayList();
        ArrayList arrayList2 = new ArrayList();
        boolean z = false;
        int i = 0;
        for (Map.Entry<KGroupedStreamImpl<K, ?>, Aggregator<? super K, ? super Object, VOut>> entry : map.entrySet()) {
            KStreamSlidingWindowAggregate kStreamSlidingWindowAggregate = new KStreamSlidingWindowAggregate(slidingWindows, storeBuilder.name(), initializer, entry.getValue());
            arrayList.add(kStreamSlidingWindowAggregate);
            int i2 = i;
            i++;
            StatefulProcessorNode<K, ?> statefulProcessorNode = getStatefulProcessorNode(namedInternal.suffixWithOrElseGet("-cogroup-agg-" + i2, this.builder, "COGROUPKSTREAM-AGGREGATE-"), z, storeBuilder, kStreamSlidingWindowAggregate);
            z = true;
            arrayList2.add(statefulProcessorNode);
            this.builder.addGraphNode(this.parentNodes.get(entry.getKey()), statefulProcessorNode);
        }
        return createTable(arrayList2, arrayList, namedInternal, serde, serde2, str, storeBuilder.name());
    }

    private void processRepartitions(Map<KGroupedStreamImpl<K, ?>, Aggregator<? super K, ? super Object, VOut>> map, StoreBuilder<?> storeBuilder) {
        for (KGroupedStreamImpl<K, ?> kGroupedStreamImpl : map.keySet()) {
            if (kGroupedStreamImpl.repartitionRequired) {
                OptimizableRepartitionNode.OptimizableRepartitionNodeBuilder<K, ?> optimizableRepartitionNodeBuilder = OptimizableRepartitionNode.optimizableRepartitionNodeBuilder();
                createRepartitionSource(kGroupedStreamImpl.userProvidedRepartitionTopicName != null ? kGroupedStreamImpl.userProvidedRepartitionTopicName : storeBuilder.name(), optimizableRepartitionNodeBuilder, kGroupedStreamImpl.keySerde, kGroupedStreamImpl.valueSerde);
                if (!this.parentNodes.containsKey(kGroupedStreamImpl)) {
                    OptimizableRepartitionNode<K, ?> build = optimizableRepartitionNodeBuilder.build();
                    this.builder.addGraphNode(kGroupedStreamImpl.graphNode, build);
                    this.parentNodes.put(kGroupedStreamImpl, build);
                }
            } else {
                this.parentNodes.put(kGroupedStreamImpl, kGroupedStreamImpl.graphNode);
            }
        }
        Collection<? extends AbstractStream<K, ?>> arrayList = new ArrayList<>(this.parentNodes.keySet());
        AbstractStream abstractStream = (AbstractStream) arrayList.iterator().next();
        arrayList.remove(abstractStream);
        abstractStream.ensureCopartitionWith(arrayList);
    }

    <KR, VIn> KTable<KR, VOut> createTable(Collection<GraphNode> collection, Collection<KStreamAggProcessorSupplier> collection2, NamedInternal namedInternal, Serde<KR> serde, Serde<VOut> serde2, String str, String str2) {
        String suffixWithOrElseGet = namedInternal.suffixWithOrElseGet("-cogroup-merge", this.builder, "COGROUPKSTREAM-MERGE-");
        KTablePassThrough kTablePassThrough = new KTablePassThrough(collection2, str2);
        ProcessorGraphNode processorGraphNode = new ProcessorGraphNode(suffixWithOrElseGet, new ProcessorParameters(kTablePassThrough, suffixWithOrElseGet));
        this.builder.addGraphNode(collection, processorGraphNode);
        return new KTableImpl(suffixWithOrElseGet, serde, serde2, (Set<String>) Collections.singleton(processorGraphNode.nodeName()), str, kTablePassThrough, processorGraphNode, this.builder);
    }

    private StatefulProcessorNode<K, ?> getStatefulProcessorNode(String str, boolean z, StoreBuilder<?> storeBuilder, ProcessorSupplier<K, ?, K, ?> processorSupplier) {
        return !z ? new StatefulProcessorNode<>(str, new ProcessorParameters(processorSupplier, str), storeBuilder) : new StatefulProcessorNode<>(str, new ProcessorParameters(processorSupplier, str), new String[]{storeBuilder.name()});
    }

    private <VIn> void createRepartitionSource(String str, OptimizableRepartitionNode.OptimizableRepartitionNodeBuilder<K, ?> optimizableRepartitionNodeBuilder, Serde<K> serde, Serde<?> serde2) {
        KStreamImpl.createRepartitionedSource(this.builder, serde, serde2, str, null, optimizableRepartitionNodeBuilder);
    }
}
