package org.apache.nemo.runtime.executor.data;

import com.google.common.io.CountingInputStream;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.apache.nemo.common.DirectByteArrayOutputStream;
import org.apache.nemo.common.coder.DecoderFactory;
import org.apache.nemo.common.coder.EncoderFactory;
import org.apache.nemo.runtime.executor.data.partition.NonSerializedPartition;
import org.apache.nemo.runtime.executor.data.partition.SerializedPartition;
import org.apache.nemo.runtime.executor.data.streamchainer.DecodeStreamChainer;
import org.apache.nemo.runtime.executor.data.streamchainer.EncodeStreamChainer;
import org.apache.nemo.runtime.executor.data.streamchainer.Serializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/nemo/runtime/executor/data/DataUtil.class */
public final class DataUtil {
    private static final Logger LOG = LoggerFactory.getLogger(DataUtil.class.getName());

    /* loaded from: input_file:org/apache/nemo/runtime/executor/data/DataUtil$InputStreamIterator.class */
    public static final class InputStreamIterator<T> implements IteratorWithNumBytes<T> {
        private final Iterator<InputStream> inputStreams;
        private final Serializer<?, T> serializer;
        private volatile T next;
        private volatile CountingInputStream serializedCountingStream = null;
        private volatile CountingInputStream encodedCountingStream = null;
        private volatile boolean hasNext = false;
        private volatile boolean cannotContinueDecoding = false;
        private volatile DecoderFactory.Decoder<T> decoder = null;
        private volatile long numSerializedBytes = 0;
        private volatile long numEncodedBytes = 0;

        /* JADX INFO: Access modifiers changed from: package-private */
        public InputStreamIterator(Iterator<InputStream> it, Serializer<?, T> serializer) {
            this.inputStreams = it;
            this.serializer = serializer;
        }

        @Override // java.util.Iterator
        public boolean hasNext() {
            if (this.hasNext) {
                return true;
            }
            if (this.cannotContinueDecoding) {
                return false;
            }
            while (true) {
                try {
                    if (this.decoder == null) {
                        if (!this.inputStreams.hasNext()) {
                            this.cannotContinueDecoding = true;
                            return false;
                        }
                        this.serializedCountingStream = new CountingInputStream(this.inputStreams.next());
                        this.encodedCountingStream = new CountingInputStream(DataUtil.buildInputStream(this.serializedCountingStream, this.serializer.getDecodeStreamChainers()));
                        this.decoder = this.serializer.getDecoderFactory().create(this.encodedCountingStream);
                    }
                    try {
                        this.next = (T) this.decoder.decode();
                        this.hasNext = true;
                        return true;
                    } catch (IOException e) {
                        this.numSerializedBytes += this.serializedCountingStream.getCount();
                        this.numEncodedBytes += this.encodedCountingStream.getCount();
                        this.serializedCountingStream = null;
                        this.encodedCountingStream = null;
                        this.decoder = null;
                    }
                } catch (IOException e2) {
                    throw new RuntimeException(e2);
                }
            }
        }

        @Override // java.util.Iterator
        public T next() {
            if (!hasNext()) {
                throw new NoSuchElementException();
            }
            T t = this.next;
            this.next = null;
            this.hasNext = false;
            return t;
        }

        @Override // org.apache.nemo.runtime.executor.data.DataUtil.IteratorWithNumBytes
        public long getNumSerializedBytes() {
            if (hasNext()) {
                throw new IllegalStateException("Iteration not completed.");
            }
            return this.numSerializedBytes;
        }

        @Override // org.apache.nemo.runtime.executor.data.DataUtil.IteratorWithNumBytes
        public long getNumEncodedBytes() {
            if (hasNext()) {
                throw new IllegalStateException("Iteration not completed.");
            }
            return this.numEncodedBytes;
        }
    }

    /* loaded from: input_file:org/apache/nemo/runtime/executor/data/DataUtil$IteratorWithNumBytes.class */
    public interface IteratorWithNumBytes<T> extends Iterator<T> {

        /* loaded from: input_file:org/apache/nemo/runtime/executor/data/DataUtil$IteratorWithNumBytes$NumBytesNotSupportedException.class */
        public static final class NumBytesNotSupportedException extends Exception {
            public NumBytesNotSupportedException() {
                super("Getting number of bytes is not supported");
            }
        }

        static <E> IteratorWithNumBytes<E> of(final Iterator<E> it) {
            return new IteratorWithNumBytes<E>() { // from class: org.apache.nemo.runtime.executor.data.DataUtil.IteratorWithNumBytes.1
                @Override // org.apache.nemo.runtime.executor.data.DataUtil.IteratorWithNumBytes
                public long getNumSerializedBytes() throws NumBytesNotSupportedException {
                    throw new NumBytesNotSupportedException();
                }

                @Override // org.apache.nemo.runtime.executor.data.DataUtil.IteratorWithNumBytes
                public long getNumEncodedBytes() throws NumBytesNotSupportedException {
                    throw new NumBytesNotSupportedException();
                }

                @Override // java.util.Iterator
                public boolean hasNext() {
                    return it.hasNext();
                }

                @Override // java.util.Iterator
                public E next() {
                    return (E) it.next();
                }
            };
        }

        static <E> IteratorWithNumBytes<E> of(final Iterator<E> it, final long j, final long j2) {
            return new IteratorWithNumBytes<E>() { // from class: org.apache.nemo.runtime.executor.data.DataUtil.IteratorWithNumBytes.2
                @Override // org.apache.nemo.runtime.executor.data.DataUtil.IteratorWithNumBytes
                public long getNumSerializedBytes() {
                    return j;
                }

                @Override // org.apache.nemo.runtime.executor.data.DataUtil.IteratorWithNumBytes
                public long getNumEncodedBytes() {
                    return j2;
                }

                @Override // java.util.Iterator
                public boolean hasNext() {
                    return it.hasNext();
                }

                @Override // java.util.Iterator
                public E next() {
                    return (E) it.next();
                }
            };
        }

        long getNumSerializedBytes() throws NumBytesNotSupportedException;

        long getNumEncodedBytes() throws NumBytesNotSupportedException;
    }

    private DataUtil() {
    }

    private static void serializePartition(EncoderFactory encoderFactory, NonSerializedPartition nonSerializedPartition, OutputStream outputStream) throws IOException {
        EncoderFactory.Encoder create = encoderFactory.create(outputStream);
        Iterator it = nonSerializedPartition.getData().iterator();
        while (it.hasNext()) {
            create.encode(it.next());
        }
    }

    public static <K extends Serializable> NonSerializedPartition deserializePartition(int i, Serializer serializer, K k, InputStream inputStream) throws IOException {
        ArrayList arrayList = new ArrayList();
        LimitedInputStream limitedInputStream = new LimitedInputStream(inputStream, i);
        Throwable th = null;
        try {
            InputStreamIterator inputStreamIterator = new InputStreamIterator(Collections.singletonList(limitedInputStream).iterator(), serializer);
            arrayList.getClass();
            inputStreamIterator.forEachRemaining(arrayList::add);
            NonSerializedPartition nonSerializedPartition = new NonSerializedPartition(k, arrayList, inputStreamIterator.getNumSerializedBytes(), inputStreamIterator.getNumEncodedBytes());
            if (limitedInputStream != null) {
                if (0 != 0) {
                    try {
                        limitedInputStream.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                } else {
                    limitedInputStream.close();
                }
            }
            return nonSerializedPartition;
        } catch (Throwable th3) {
            if (limitedInputStream != null) {
                if (0 != 0) {
                    try {
                        limitedInputStream.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    limitedInputStream.close();
                }
            }
            throw th3;
        }
    }

    public static <K extends Serializable> Iterable<SerializedPartition<K>> convertToSerPartitions(Serializer serializer, Iterable<NonSerializedPartition<K>> iterable) throws IOException {
        ArrayList arrayList = new ArrayList();
        for (NonSerializedPartition<K> nonSerializedPartition : iterable) {
            DirectByteArrayOutputStream directByteArrayOutputStream = new DirectByteArrayOutputStream();
            Throwable th = null;
            try {
                try {
                    OutputStream buildOutputStream = buildOutputStream(directByteArrayOutputStream, serializer.getEncodeStreamChainers());
                    Throwable th2 = null;
                    try {
                        try {
                            serializePartition(serializer.getEncoderFactory(), nonSerializedPartition, buildOutputStream);
                            buildOutputStream.close();
                            arrayList.add(new SerializedPartition(nonSerializedPartition.getKey(), directByteArrayOutputStream.getBufDirectly(), directByteArrayOutputStream.getCount()));
                            if (buildOutputStream != null) {
                                if (0 != 0) {
                                    try {
                                        buildOutputStream.close();
                                    } catch (Throwable th3) {
                                        th2.addSuppressed(th3);
                                    }
                                } else {
                                    buildOutputStream.close();
                                }
                            }
                            if (directByteArrayOutputStream != null) {
                                if (0 != 0) {
                                    try {
                                        directByteArrayOutputStream.close();
                                    } catch (Throwable th4) {
                                        th.addSuppressed(th4);
                                    }
                                } else {
                                    directByteArrayOutputStream.close();
                                }
                            }
                        } finally {
                        }
                    } finally {
                    }
                } finally {
                }
            } catch (Throwable th5) {
                if (directByteArrayOutputStream != null) {
                    if (th != null) {
                        try {
                            directByteArrayOutputStream.close();
                        } catch (Throwable th6) {
                            th.addSuppressed(th6);
                        }
                    } else {
                        directByteArrayOutputStream.close();
                    }
                }
                throw th5;
            }
        }
        return arrayList;
    }

    public static <K extends Serializable> Iterable<NonSerializedPartition<K>> convertToNonSerPartitions(Serializer serializer, Iterable<SerializedPartition<K>> iterable) throws IOException {
        ArrayList arrayList = new ArrayList();
        for (SerializedPartition<K> serializedPartition : iterable) {
            K key = serializedPartition.getKey();
            ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(serializedPartition.getData());
            Throwable th = null;
            try {
                try {
                    arrayList.add(deserializePartition(serializedPartition.getLength(), serializer, key, byteArrayInputStream));
                    if (byteArrayInputStream != null) {
                        if (0 != 0) {
                            try {
                                byteArrayInputStream.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            byteArrayInputStream.close();
                        }
                    }
                } finally {
                }
            } catch (Throwable th3) {
                if (byteArrayInputStream != null) {
                    if (th != null) {
                        try {
                            byteArrayInputStream.close();
                        } catch (Throwable th4) {
                            th.addSuppressed(th4);
                        }
                    } else {
                        byteArrayInputStream.close();
                    }
                }
                throw th3;
            }
        }
        return arrayList;
    }

    public static String blockIdToFilePath(String str, String str2) {
        return str2 + "/" + str;
    }

    public static String blockIdToMetaFilePath(String str, String str2) {
        return str2 + "/" + str + "_meta";
    }

    public static Iterable concatNonSerPartitions(Iterable<NonSerializedPartition> iterable) throws IOException {
        Stream stream = new ArrayList().stream();
        Iterator<NonSerializedPartition> it = iterable.iterator();
        while (it.hasNext()) {
            stream = Stream.concat(stream, StreamSupport.stream(it.next().getData().spliterator(), false));
        }
        return (Iterable) stream.collect(Collectors.toList());
    }

    public static InputStream buildInputStream(InputStream inputStream, List<DecodeStreamChainer> list) throws IOException {
        InputStream inputStream2 = inputStream;
        Iterator<DecodeStreamChainer> it = list.iterator();
        while (it.hasNext()) {
            inputStream2 = it.next().chainInput(inputStream2);
        }
        return inputStream2;
    }

    public static OutputStream buildOutputStream(OutputStream outputStream, List<EncodeStreamChainer> list) throws IOException {
        OutputStream outputStream2 = outputStream;
        ArrayList arrayList = new ArrayList(list);
        Collections.reverse(arrayList);
        Iterator it = arrayList.iterator();
        while (it.hasNext()) {
            outputStream2 = ((EncodeStreamChainer) it.next()).chainOutput(outputStream2);
        }
        return outputStream2;
    }
}
