package org.apache.wayang.flink.operators;

import java.io.ByteArrayInputStream;
import java.io.ObjectInputStream;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.hadoopcompatibility.HadoopInputs;
import org.apache.flink.util.Collector;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.wayang.basic.channels.FileChannel;
import org.apache.wayang.basic.operators.ObjectFileSource;
import org.apache.wayang.core.optimizer.OptimizationContext;
import org.apache.wayang.core.plan.wayangplan.ExecutionOperator;
import org.apache.wayang.core.platform.ChannelDescriptor;
import org.apache.wayang.core.platform.ChannelInstance;
import org.apache.wayang.core.platform.lineage.ExecutionLineageNode;
import org.apache.wayang.core.types.DataSetType;
import org.apache.wayang.core.util.Tuple;
import org.apache.wayang.flink.channels.DataSetChannel;
import org.apache.wayang.flink.execution.FlinkExecutor;

/* loaded from: input_file:org/apache/wayang/flink/operators/FlinkObjectFileSource.class */
public class FlinkObjectFileSource<Type> extends ObjectFileSource<Type> implements FlinkExecutionOperator {
    static final /* synthetic */ boolean $assertionsDisabled;

    public FlinkObjectFileSource(ObjectFileSource<Type> objectFileSource) {
        super(objectFileSource);
    }

    public FlinkObjectFileSource(DataSetType<Type> dataSetType) {
        this(null, dataSetType);
    }

    public FlinkObjectFileSource(String str, DataSetType<Type> dataSetType) {
        super(str, dataSetType);
    }

    @Override // org.apache.wayang.flink.operators.FlinkExecutionOperator
    public Tuple<Collection<ExecutionLineageNode>, Collection<ChannelInstance>> evaluate(ChannelInstance[] channelInstanceArr, ChannelInstance[] channelInstanceArr2, FlinkExecutor flinkExecutor, OptimizationContext.OperatorContext operatorContext) throws Exception {
        String inputUrl;
        if (!$assertionsDisabled && channelInstanceArr.length != getNumInputs()) {
            throw new AssertionError();
        }
        if (!$assertionsDisabled && channelInstanceArr2.length != getNumOutputs()) {
            throw new AssertionError();
        }
        if (getInputUrl() == null) {
            inputUrl = ((FileChannel.Instance) channelInstanceArr[0]).getSinglePath();
        } else {
            if (!$assertionsDisabled && channelInstanceArr.length != 0) {
                throw new AssertionError();
            }
            inputUrl = getInputUrl();
        }
        DataSetChannel.Instance instance = (DataSetChannel.Instance) channelInstanceArr2[0];
        flinkExecutor.fee.setParallelism(flinkExecutor.getNumDefaultPartitions());
        instance.accept(flinkExecutor.fee.createInput(HadoopInputs.readSequenceFile(NullWritable.class, BytesWritable.class, inputUrl)).setParallelism(flinkExecutor.getNumDefaultPartitions()).flatMap(new FlatMapFunction<Tuple2<NullWritable, BytesWritable>, org.apache.wayang.basic.data.Tuple2>() { // from class: org.apache.wayang.flink.operators.FlinkObjectFileSource.1
            public void flatMap(Tuple2<NullWritable, BytesWritable> tuple2, Collector<org.apache.wayang.basic.data.Tuple2> collector) throws Exception {
                Iterator it = ((Iterable) new ObjectInputStream(new ByteArrayInputStream(((BytesWritable) tuple2.f1).getBytes())).readObject()).iterator();
                while (it.hasNext()) {
                    collector.collect((org.apache.wayang.basic.data.Tuple2) it.next());
                }
            }

            public /* bridge */ /* synthetic */ void flatMap(Object obj, Collector collector) throws Exception {
                flatMap((Tuple2<NullWritable, BytesWritable>) obj, (Collector<org.apache.wayang.basic.data.Tuple2>) collector);
            }
        }), flinkExecutor);
        return ExecutionOperator.modelEagerExecution(channelInstanceArr, channelInstanceArr2, operatorContext);
    }

    protected ExecutionOperator createCopy() {
        return new FlinkObjectFileSource(getInputUrl(), getType());
    }

    public String getLoadProfileEstimatorConfigurationKey() {
        return "wayang.flink.objectfilesource.load";
    }

    public List<ChannelDescriptor> getSupportedInputChannels(int i) {
        if ($assertionsDisabled || i <= getNumInputs() || (i == 0 && getNumInputs() == 0)) {
            return Collections.singletonList(FileChannel.HDFS_OBJECT_FILE_DESCRIPTOR);
        }
        throw new AssertionError();
    }

    public List<ChannelDescriptor> getSupportedOutputChannels(int i) {
        if ($assertionsDisabled || i <= getNumOutputs() || (i == 0 && getNumOutputs() == 0)) {
            return Collections.singletonList(DataSetChannel.DESCRIPTOR);
        }
        throw new AssertionError();
    }

    @Override // org.apache.wayang.flink.operators.FlinkExecutionOperator
    public boolean containsAction() {
        return true;
    }

    static {
        $assertionsDisabled = !FlinkObjectFileSource.class.desiredAssertionStatus();
    }
}
