package org.apache.flink.ml.common.datastream.sort;

import java.io.Serializable;
import java.util.Iterator;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypePairComparator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.runtime.RuntimePairComparatorFactory;
import org.apache.flink.configuration.AlgorithmOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.memory.DataOutputSerializer;
import org.apache.flink.core.memory.ManagedMemoryUseCase;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.memory.MemoryAllocationException;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.operators.sort.ExternalSorter;
import org.apache.flink.runtime.operators.sort.NonReusingSortMergeCoGroupIterator;
import org.apache.flink.runtime.operators.sort.PushSorter;
import org.apache.flink.runtime.operators.sort.ReusingSortMergeCoGroupIterator;
import org.apache.flink.runtime.operators.util.CoGroupTaskIterator;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
import org.apache.flink.streaming.api.operators.BoundedMultiInput;
import org.apache.flink.streaming.api.operators.InternalTimeServiceManager;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.TimestampedCollector;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.util.MutableObjectIterator;
import org.apache.flink.util.TraversableOnceException;

/* loaded from: input_file:org/apache/flink/ml/common/datastream/sort/CoGroupOperator.class */
/**
 * Two-input operator that co-groups bounded inputs by key using sort-merge.
 *
 * <p>Every incoming record is paired with the serialized bytes of its key and written to a
 * per-input {@link PushSorter}. Nothing is emitted while records arrive. Once {@link #endInput}
 * has been called for both inputs, the two sorted streams are merged and the wrapped {@link
 * CoGroupFunction} is invoked once per group returned by the merge iterator.
 *
 * <p>Watermarks are not forwarded as they arrive: only the most recent timestamp is remembered,
 * and a single watermark carrying it is emitted after all groups have been processed.
 *
 * @param <IN1> element type of the first input
 * @param <IN2> element type of the second input
 * @param <KEY> key type extracted from both inputs
 * @param <OUT> element type produced by the co-group function
 */
public class CoGroupOperator<IN1, IN2, KEY extends Serializable, OUT>
        extends AbstractUdfStreamOperator<OUT, CoGroupFunction<IN1, IN2, OUT>>
        implements TwoInputStreamOperator<IN1, IN2, OUT>, BoundedMultiInput {

    /** Size handed to the key buffer when the key serializer reports no fixed length. */
    private static final int VARIABLE_LENGTH_KEY_BUFFER_SIZE = 64;

    private PushSorter<Tuple2<byte[], StreamRecord<IN1>>> sorterA;
    private PushSorter<Tuple2<byte[], StreamRecord<IN2>>> sorterB;
    private TypeComparator<Tuple2<byte[], StreamRecord<IN1>>> comparatorA;
    private TypeComparator<Tuple2<byte[], StreamRecord<IN2>>> comparatorB;
    private KeySelector<IN1, KEY> keySelectorA;
    private KeySelector<IN2, KEY> keySelectorB;
    private TypeSerializer<Tuple2<byte[], StreamRecord<IN1>>> keyAndValueSerializerA;
    private TypeSerializer<Tuple2<byte[], StreamRecord<IN2>>> keyAndValueSerializerB;
    private TypeSerializer<KEY> keySerializer;

    /** Scratch buffer reused for serializing the key of every incoming record. */
    private DataOutputSerializer dataOutputSerializer;

    /** Timestamp of the latest watermark seen; emitted once after all groups are processed. */
    private long lastWatermarkTimestamp;

    /** Number of inputs for which {@link #endInput} has not been called yet. */
    private int remainingInputNum;

    /**
     * Adapts an iterator over (key, stream record) tuples to an iterator over the plain record
     * values. An instance can be re-armed with {@link #set} and handed out as an {@link Iterable}
     * exactly once per {@code set} call.
     *
     * @param <T> value type carried by the stream records
     * @param <K> key type stored in the first tuple field
     */
    private static class TupleUnwrappingIterator<T, K>
            implements Iterator<T>, Iterable<T>, Serializable {
        private static final long serialVersionUID = 1;

        /** Key of the element most recently returned by {@link #next()}. */
        private K lastKey;

        private Iterator<Tuple2<K, StreamRecord<T>>> iterator;

        /** True between a {@link #set} call and the following {@link #iterator()} call. */
        private boolean iteratorAvailable;

        private TupleUnwrappingIterator() {
        }

        /** Installs a new underlying iterator and allows one more {@link #iterator()} call. */
        public void set(Iterator<Tuple2<K, StreamRecord<T>>> it) {
            this.iterator = it;
            this.iteratorAvailable = true;
        }

        /** Returns the key of the last element handed out by {@link #next()}. */
        public K getLastKey() {
            return this.lastKey;
        }

        @Override
        public boolean hasNext() {
            return this.iterator.hasNext();
        }

        @Override
        public T next() {
            Tuple2<K, StreamRecord<T>> keyAndRecord = this.iterator.next();
            this.lastKey = keyAndRecord.f0;
            return keyAndRecord.f1.getValue();
        }

        @Override
        public void remove() {
            throw new UnsupportedOperationException();
        }

        /**
         * Returns this object as the iterator.
         *
         * @throws TraversableOnceException if called again before {@link #set} re-arms it
         */
        @Override
        public Iterator<T> iterator() {
            if (!this.iteratorAvailable) {
                throw new TraversableOnceException();
            }
            this.iteratorAvailable = false;
            return this;
        }
    }

    /**
     * Creates the operator around the given user function.
     *
     * @param coGroupFunction function invoked once per co-group after both inputs have ended
     */
    public CoGroupOperator(CoGroupFunction<IN1, IN2, OUT> coGroupFunction) {
        super(coGroupFunction);
        this.lastWatermarkTimestamp = Long.MIN_VALUE;
        this.remainingInputNum = 2;
    }

    /**
     * Reads key selectors and serializers from the stream config, picks a byte-key comparator
     * depending on whether the key serializer reports a fixed length, and builds one external
     * sorter per input. Each sorter receives half of the operator's managed memory fraction.
     *
     * @throws RuntimeException wrapping a {@link MemoryAllocationException} if a sorter cannot
     *     be built
     */
    @Override
    public void setup(
            StreamTask<?, ?> streamTask,
            StreamConfig streamConfig,
            Output<StreamRecord<OUT>> output) {
        super.setup(streamTask, streamConfig, output);
        ClassLoader userCodeClassLoader = streamTask.getUserCodeClassLoader();

        this.keySelectorA = streamConfig.getStatePartitioner(0, userCodeClassLoader);
        this.keySelectorB = streamConfig.getStatePartitioner(1, userCodeClassLoader);
        this.keySerializer = streamConfig.getStateKeySerializer(userCodeClassLoader);
        int serializedKeyLength = this.keySerializer.getLength();

        TypeSerializer<IN1> valueSerializerA =
                streamConfig.getTypeSerializerIn(0, userCodeClassLoader);
        TypeSerializer<IN2> valueSerializerB =
                streamConfig.getTypeSerializerIn(1, userCodeClassLoader);
        this.keyAndValueSerializerA =
                new KeyAndValueSerializer<>(valueSerializerA, serializedKeyLength);
        this.keyAndValueSerializerB =
                new KeyAndValueSerializer<>(valueSerializerB, serializedKeyLength);

        if (serializedKeyLength > 0) {
            // Fixed-length keys: the scratch buffer can be sized exactly.
            this.dataOutputSerializer = new DataOutputSerializer(serializedKeyLength);
            this.comparatorA = new FixedLengthByteKeyComparator<>(serializedKeyLength);
            this.comparatorB = new FixedLengthByteKeyComparator<>(serializedKeyLength);
        } else {
            this.dataOutputSerializer =
                    new DataOutputSerializer(VARIABLE_LENGTH_KEY_BUFFER_SIZE);
            this.comparatorA = new VariableLengthByteKeyComparator<>();
            this.comparatorB = new VariableLengthByteKeyComparator<>();
        }

        // The operator's managed memory share is split evenly between the two sorters.
        double memoryFractionPerSorter =
                streamConfig.getManagedMemoryFractionOperatorUseCaseOfSlot(
                                ManagedMemoryUseCase.OPERATOR,
                                streamTask.getEnvironment().getTaskConfiguration(),
                                userCodeClassLoader)
                        / 2.0d;
        try {
            this.sorterA =
                    buildSorter(
                            streamTask,
                            this.keyAndValueSerializerA,
                            this.comparatorA,
                            memoryFractionPerSorter);
            this.sorterB =
                    buildSorter(
                            streamTask,
                            this.keyAndValueSerializerB,
                            this.comparatorB,
                            memoryFractionPerSorter);
        } catch (MemoryAllocationException e) {
            throw new RuntimeException(e);
        }
    }

    /** Builds a spilling push sorter configured from the task's execution and job configuration. */
    private <E> PushSorter<E> buildSorter(
            StreamTask<?, ?> streamTask,
            TypeSerializer<E> serializer,
            TypeComparator<E> comparator,
            double memoryFraction)
            throws MemoryAllocationException {
        MemoryManager memoryManager = streamTask.getEnvironment().getMemoryManager();
        IOManager ioManager = streamTask.getEnvironment().getIOManager();
        ExecutionConfig executionConfig = streamTask.getEnvironment().getExecutionConfig();
        Configuration jobConfiguration = streamTask.getEnvironment().getJobConfiguration();

        float spillingThreshold = jobConfiguration.get(AlgorithmOptions.SORT_SPILLING_THRESHOLD);
        int maxFileHandles = jobConfiguration.get(AlgorithmOptions.SPILLING_MAX_FAN);
        boolean useLargeRecordsHandler =
                jobConfiguration.get(AlgorithmOptions.USE_LARGE_RECORDS_HANDLER);

        return ExternalSorter.newBuilder(
                        memoryManager, streamTask, serializer, comparator, executionConfig)
                .memoryFraction(memoryFraction)
                .enableSpilling(ioManager, spillingThreshold)
                .maxNumFileHandles(maxFileHandles)
                .objectReuse(executionConfig.isObjectReuseEnabled())
                .largeRecords(useLargeRecordsHandler)
                .build();
    }

    /**
     * Marks one input as finished. When the second input finishes, all co-groups are processed
     * and the remembered watermark is emitted.
     *
     * @param inputId 1 for the first input, 2 for the second
     * @throws RuntimeException if {@code inputId} is neither 1 nor 2
     */
    @Override
    public void endInput(int inputId) throws Exception {
        if (inputId == 1) {
            this.sorterA.finishReading();
        } else if (inputId == 2) {
            this.sorterB.finishReading();
        } else {
            throw new RuntimeException("Unknown inputId " + inputId);
        }
        this.remainingInputNum--;
        if (this.remainingInputNum > 0) {
            return;
        }

        processSortedGroups();
        emitFinalWatermark();
    }

    /** Merges both sorted inputs and calls the user function once per co-group. */
    private void processSortedGroups() throws Exception {
        MutableObjectIterator<Tuple2<byte[], StreamRecord<IN1>>> sortedA =
                this.sorterA.getIterator();
        MutableObjectIterator<Tuple2<byte[], StreamRecord<IN2>>> sortedB =
                this.sorterB.getIterator();
        TypePairComparator<Tuple2<byte[], StreamRecord<IN1>>, Tuple2<byte[], StreamRecord<IN2>>>
                pairComparator =
                        new RuntimePairComparatorFactory<
                                        Tuple2<byte[], StreamRecord<IN1>>,
                                        Tuple2<byte[], StreamRecord<IN2>>>()
                                .createComparator12(this.comparatorA, this.comparatorB);

        // Both implementations share only the CoGroupTaskIterator interface, so the variable
        // must be declared with that type.
        CoGroupTaskIterator<Tuple2<byte[], StreamRecord<IN1>>, Tuple2<byte[], StreamRecord<IN2>>>
                coGroupIterator;
        if (getExecutionConfig().isObjectReuseEnabled()) {
            coGroupIterator =
                    new ReusingSortMergeCoGroupIterator<>(
                            sortedA,
                            sortedB,
                            this.keyAndValueSerializerA,
                            this.comparatorA,
                            this.keyAndValueSerializerB,
                            this.comparatorB,
                            pairComparator);
        } else {
            coGroupIterator =
                    new NonReusingSortMergeCoGroupIterator<>(
                            sortedA,
                            sortedB,
                            this.keyAndValueSerializerA,
                            this.comparatorA,
                            this.keyAndValueSerializerB,
                            this.comparatorB,
                            pairComparator);
        }

        coGroupIterator.open();
        try {
            TupleUnwrappingIterator<IN1, byte[]> valuesA = new TupleUnwrappingIterator<>();
            TupleUnwrappingIterator<IN2, byte[]> valuesB = new TupleUnwrappingIterator<>();
            TimestampedCollector<OUT> collector = new TimestampedCollector<>(this.output);
            while (coGroupIterator.next()) {
                valuesA.set(coGroupIterator.getValues1().iterator());
                valuesB.set(coGroupIterator.getValues2().iterator());
                this.userFunction.coGroup(valuesA, valuesB, collector);
            }
        } finally {
            // Close the merge iterator even when the user function or the merge itself fails.
            coGroupIterator.close();
        }
    }

    /** Advances timers (if a time service is present) and emits the last seen watermark. */
    private void emitFinalWatermark() throws Exception {
        Watermark watermark = new Watermark(this.lastWatermarkTimestamp);
        if (getTimeServiceManager().isPresent()) {
            getTimeServiceManager().get().advanceWatermark(watermark);
        }
        this.output.emitWatermark(watermark);
    }

    /**
     * Records the watermark timestamp without forwarding it; it is emitted from {@link
     * #endInput} after all groups have been processed.
     *
     * @throws RuntimeException if the timestamp is lower than the previously recorded one
     */
    @Override
    public void processWatermark(Watermark watermark) throws Exception {
        if (this.lastWatermarkTimestamp > watermark.getTimestamp()) {
            throw new RuntimeException("Invalid watermark");
        }
        this.lastWatermarkTimestamp = watermark.getTimestamp();
    }

    /**
     * Closes the operator and both sorters. Each sorter is closed even if an earlier close step
     * throws, and a sorter that was never created (failed {@link #setup}) is skipped.
     */
    @Override
    public void close() throws Exception {
        try {
            super.close();
        } finally {
            try {
                if (this.sorterA != null) {
                    this.sorterA.close();
                }
            } finally {
                if (this.sorterB != null) {
                    this.sorterB.close();
                }
            }
        }
    }

    /** Writes a record of the first input, tagged with its serialized key, to sorter A. */
    @Override
    public void processElement1(StreamRecord<IN1> streamRecord) throws Exception {
        byte[] serializedKey = serializeKey(this.keySelectorA.getKey(streamRecord.getValue()));
        this.sorterA.writeRecord(Tuple2.of(serializedKey, streamRecord));
    }

    /** Writes a record of the second input, tagged with its serialized key, to sorter B. */
    @Override
    public void processElement2(StreamRecord<IN2> streamRecord) throws Exception {
        byte[] serializedKey = serializeKey(this.keySelectorB.getKey(streamRecord.getValue()));
        this.sorterB.writeRecord(Tuple2.of(serializedKey, streamRecord));
    }

    /** Serializes a key into a fresh byte array using the shared scratch buffer. */
    private byte[] serializeKey(KEY key) throws Exception {
        try {
            this.keySerializer.serialize(key, this.dataOutputSerializer);
            return this.dataOutputSerializer.getCopyOfBuffer();
        } finally {
            // Always reset the shared buffer so a failed write cannot leak bytes into the next key.
            this.dataOutputSerializer.clear();
        }
    }
}
