package org.apache.wayang.java.operators;

import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.stream.Stream;
import org.apache.commons.io.output.ByteArrayOutputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.logging.log4j.LogManager;
import org.apache.wayang.basic.channels.FileChannel;
import org.apache.wayang.core.api.exception.WayangException;
import org.apache.wayang.core.optimizer.OptimizationContext;
import org.apache.wayang.core.plan.wayangplan.ExecutionOperator;
import org.apache.wayang.core.plan.wayangplan.UnarySink;
import org.apache.wayang.core.platform.ChannelDescriptor;
import org.apache.wayang.core.platform.ChannelInstance;
import org.apache.wayang.core.platform.lineage.ExecutionLineageNode;
import org.apache.wayang.core.types.DataSetType;
import org.apache.wayang.core.util.Tuple;
import org.apache.wayang.java.channels.CollectionChannel;
import org.apache.wayang.java.channels.JavaChannelInstance;
import org.apache.wayang.java.channels.StreamChannel;
import org.apache.wayang.java.execution.JavaExecutor;

/* loaded from: input_file:org/apache/wayang/java/operators/JavaObjectFileSink.class */
public class JavaObjectFileSink<T> extends UnarySink<T> implements JavaExecutionOperator {
    private final String targetPath;
    static final /* synthetic */ boolean $assertionsDisabled;

    /* loaded from: input_file:org/apache/wayang/java/operators/JavaObjectFileSink$StreamChunker.class */
    public static class StreamChunker {
        private final BiConsumer<Object[], Integer> action;
        private final Object[] chunk;
        private long numPushedObjects = 0;
        private int nextIndex = 0;

        public StreamChunker(int i, BiConsumer<Object[], Integer> biConsumer) {
            this.action = biConsumer;
            this.chunk = new Object[i];
        }

        public void push(Object obj) {
            this.numPushedObjects++;
            this.chunk[this.nextIndex] = obj;
            int i = this.nextIndex + 1;
            this.nextIndex = i;
            if (i >= this.chunk.length) {
                fire();
            }
        }

        public void fire() {
            if (this.nextIndex > 0) {
                this.action.accept(this.chunk, Integer.valueOf(this.nextIndex));
                this.nextIndex = 0;
            }
        }
    }

    public JavaObjectFileSink(DataSetType<T> dataSetType) {
        this(null, dataSetType);
    }

    public JavaObjectFileSink(String str, DataSetType<T> dataSetType) {
        super(dataSetType);
        this.targetPath = str;
    }

    @Override // org.apache.wayang.java.operators.JavaExecutionOperator
    public Tuple<Collection<ExecutionLineageNode>, Collection<ChannelInstance>> evaluate(ChannelInstance[] channelInstanceArr, ChannelInstance[] channelInstanceArr2, JavaExecutor javaExecutor, OptimizationContext.OperatorContext operatorContext) {
        if (!$assertionsDisabled && channelInstanceArr.length != getNumInputs()) {
            throw new AssertionError();
        }
        String addGivenOrTempPath = ((FileChannel.Instance) channelInstanceArr2[0]).addGivenOrTempPath(this.targetPath, javaExecutor.getCompiler().getConfiguration());
        try {
            SequenceFile.Writer createWriter = SequenceFile.createWriter(new Configuration(true), new SequenceFile.Writer.Option[]{SequenceFile.Writer.file(new Path(addGivenOrTempPath)), SequenceFile.Writer.keyClass(NullWritable.class), SequenceFile.Writer.valueClass(BytesWritable.class)});
            Throwable th = null;
            try {
                StreamChunker streamChunker = new StreamChunker(10, (objArr, num) -> {
                    if (objArr.length != num.intValue()) {
                        objArr = Arrays.copyOfRange(objArr, 0, num.intValue());
                    }
                    try {
                        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
                        new ObjectOutputStream(byteArrayOutputStream).writeObject(objArr);
                        createWriter.append(NullWritable.get(), new BytesWritable(byteArrayOutputStream.toByteArray()));
                    } catch (IOException e) {
                        throw new UncheckedIOException("Writing or serialization failed.", e);
                    }
                });
                Stream<T> provideStream = ((JavaChannelInstance) channelInstanceArr[0]).provideStream();
                streamChunker.getClass();
                provideStream.forEach(streamChunker::push);
                streamChunker.fire();
                LogManager.getLogger(getClass()).info("Writing dataset to {}.", addGivenOrTempPath);
                if (createWriter != null) {
                    if (0 != 0) {
                        try {
                            createWriter.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        createWriter.close();
                    }
                }
                return ExecutionOperator.modelEagerExecution(channelInstanceArr, channelInstanceArr2, operatorContext);
            } finally {
            }
        } catch (IOException | UncheckedIOException e) {
            throw new WayangException("Could not write stream to sequence file.", e);
        }
    }

    public String getLoadProfileEstimatorConfigurationKey() {
        return "wayang.java.objectfilesink.load";
    }

    protected ExecutionOperator createCopy() {
        return new JavaObjectFileSink(this.targetPath, getType());
    }

    public List<ChannelDescriptor> getSupportedInputChannels(int i) {
        if ($assertionsDisabled || i <= getNumInputs() || (i == 0 && getNumInputs() == 0)) {
            return Arrays.asList(CollectionChannel.DESCRIPTOR, StreamChannel.DESCRIPTOR);
        }
        throw new AssertionError();
    }

    public List<ChannelDescriptor> getSupportedOutputChannels(int i) {
        if ($assertionsDisabled || i <= getNumInputs() || (i == 0 && getNumInputs() == 0)) {
            return Collections.singletonList(FileChannel.HDFS_OBJECT_FILE_DESCRIPTOR);
        }
        throw new AssertionError();
    }

    static {
        $assertionsDisabled = !JavaObjectFileSink.class.desiredAssertionStatus();
    }
}
