/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.util;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.api.StreamConfig;
import org.apache.flink.streaming.api.invokable.operator.co.CoInvokable;
import org.apache.flink.streaming.api.streamrecord.StreamRecord;
import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
import org.apache.flink.streaming.api.streamvertex.StreamTaskContext;
import org.apache.flink.streaming.io.CoReaderIterator;
import org.apache.flink.streaming.util.MockCollector;
import org.apache.flink.util.Collector;
import org.apache.flink.util.MutableObjectIterator;

public class MockCoContext<IN1, IN2, OUT>
implements StreamTaskContext<OUT> {
    private Iterator<IN1> inputIterator1;
    private Iterator<IN2> inputIterator2;
    private List<OUT> outputs;
    private Collector<OUT> collector;
    private StreamRecordSerializer<IN1> inDeserializer1;
    private CoReaderIterator<StreamRecord<IN1>, StreamRecord<IN2>> mockIterator;
    private StreamRecordSerializer<IN2> inDeserializer2;
    private int currentInput = 1;
    private StreamRecord<IN1> reuse1;
    private StreamRecord<IN2> reuse2;

    public MockCoContext(Collection<IN1> input1, Collection<IN2> input2) {
        if (input1.isEmpty() || input2.isEmpty()) {
            throw new RuntimeException("Inputs must not be empty");
        }
        this.inputIterator1 = input1.iterator();
        this.inputIterator2 = input2.iterator();
        TypeInformation inTypeInfo1 = TypeExtractor.getForObject(input1.iterator().next());
        this.inDeserializer1 = new StreamRecordSerializer(inTypeInfo1);
        TypeInformation inTypeInfo2 = TypeExtractor.getForObject(input2.iterator().next());
        this.inDeserializer2 = new StreamRecordSerializer(inTypeInfo2);
        this.mockIterator = new MockCoReaderIterator(this.inDeserializer1, this.inDeserializer2);
        this.outputs = new ArrayList<OUT>();
        this.collector = new MockCollector<OUT>(this.outputs);
    }

    private Integer nextRecord() {
        if (this.inputIterator1.hasNext() && this.inputIterator2.hasNext()) {
            switch (this.currentInput) {
                case 1: {
                    return this.next1();
                }
                case 2: {
                    return this.next2();
                }
            }
            return 0;
        }
        if (this.inputIterator1.hasNext()) {
            return this.next1();
        }
        if (this.inputIterator2.hasNext()) {
            return this.next2();
        }
        return 0;
    }

    private int next1() {
        this.reuse1 = this.inDeserializer1.createInstance();
        this.reuse1.setObject(this.inputIterator1.next());
        this.currentInput = 2;
        return 1;
    }

    private int next2() {
        this.reuse2 = this.inDeserializer2.createInstance();
        this.reuse2.setObject(this.inputIterator2.next());
        this.currentInput = 1;
        return 2;
    }

    public List<OUT> getOutputs() {
        return this.outputs;
    }

    public Collector<OUT> getCollector() {
        return this.collector;
    }

    public StreamRecordSerializer<IN1> getInDeserializer1() {
        return this.inDeserializer1;
    }

    public StreamRecordSerializer<IN2> getInDeserializer2() {
        return this.inDeserializer2;
    }

    public CoReaderIterator<StreamRecord<IN1>, StreamRecord<IN2>> getIterator() {
        return this.mockIterator;
    }

    public static <IN1, IN2, OUT> List<OUT> createAndExecute(CoInvokable<IN1, IN2, OUT> invokable, List<IN1> input1, List<IN2> input2) {
        MockCoContext<IN1, IN2, OUT> mockContext = new MockCoContext<IN1, IN2, OUT>(input1, input2);
        invokable.setup(mockContext);
        try {
            invokable.open(null);
            invokable.invoke();
            invokable.close();
        }
        catch (Exception e) {
            throw new RuntimeException("Cannot invoke invokable.", e);
        }
        return mockContext.getOutputs();
    }

    public StreamConfig getConfig() {
        return null;
    }

    public ClassLoader getUserCodeClassLoader() {
        return null;
    }

    public <X> MutableObjectIterator<X> getInput(int index) {
        switch (index) {
            case 0: {
                return (MutableObjectIterator)this.inputIterator1;
            }
            case 1: {
                return (MutableObjectIterator)this.inputIterator2;
            }
        }
        throw new IllegalArgumentException("CoStreamVertex has only 2 inputs");
    }

    public <X> StreamRecordSerializer<X> getInputSerializer(int index) {
        switch (index) {
            case 0: {
                return this.inDeserializer1;
            }
            case 1: {
                return this.inDeserializer2;
            }
        }
        throw new IllegalArgumentException("CoStreamVertex has only 2 inputs");
    }

    public <X, Y> CoReaderIterator<X, Y> getCoReader() {
        return this.mockIterator;
    }

    public Collector<OUT> getOutputCollector() {
        return this.collector;
    }

    private class MockCoReaderIterator
    extends CoReaderIterator<StreamRecord<IN1>, StreamRecord<IN2>> {
        public MockCoReaderIterator(TypeSerializer<StreamRecord<IN1>> serializer1, TypeSerializer<StreamRecord<IN2>> serializer2) {
            super(null, serializer1, serializer2);
            MockCoContext.this.reuse1 = MockCoContext.this.inDeserializer1.createInstance();
            MockCoContext.this.reuse2 = MockCoContext.this.inDeserializer2.createInstance();
        }

        public int next(StreamRecord<IN1> target1, StreamRecord<IN2> target2) throws IOException {
            this.delegate1.setInstance(target1);
            this.delegate2.setInstance(target2);
            int inputNumber = MockCoContext.this.nextRecord();
            target1.setObject(MockCoContext.this.reuse1.getObject());
            target2.setObject(MockCoContext.this.reuse2.getObject());
            return inputNumber;
        }
    }
}

