package org.apache.nemo.runtime.executor.task;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import javax.annotation.concurrent.NotThreadSafe;
import org.apache.nemo.common.ir.OutputCollector;
import org.apache.nemo.common.ir.vertex.IRVertex;
import org.apache.nemo.common.punctuation.Finishmark;
import org.apache.nemo.runtime.executor.data.DataUtil;
import org.apache.nemo.runtime.executor.datatransfer.InputReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/nemo/runtime/executor/task/ParentTaskDataFetcher.class */
/**
 * Fetches data elements produced by a parent task, one element per call.
 *
 * <p>On the first fetch, a completion callback is registered on every future returned by
 * {@link InputReader#read()}. Each callback places either the resulting iterator or the failure
 * ({@link Throwable}) into {@link #iteratorQueue}. The fetching thread takes entries from that
 * queue one at a time and drains each iterator before moving on to the next one.
 *
 * <p>The class is annotated {@link NotThreadSafe}: apart from the queue hand-off, all state is
 * plain fields and must only be touched by the single thread that calls
 * {@link #fetchDataElement()}.
 */
@NotThreadSafe
public class ParentTaskDataFetcher extends DataFetcher {
    private static final Logger LOG = LoggerFactory.getLogger(ParentTaskDataFetcher.class);

    /** Sentinel stored in a byte counter once any iterator reports that byte counts are unsupported. */
    private static final long BYTES_NOT_SUPPORTED = -1L;

    private final InputReader readersForParentTask;

    /** Holds either {@code DataUtil.IteratorWithNumBytes} instances or {@code Throwable}s. */
    private final LinkedBlockingQueue<Object> iteratorQueue;

    private boolean firstFetch;
    private int expectedNumOfIterators;
    private DataUtil.IteratorWithNumBytes currentIterator;
    private int currentIteratorIndex;
    private long serBytes;
    private long encodedBytes;

    /** Guards against counting the final iterator twice when fetching continues after the finish mark. */
    private boolean lastIteratorCounted;

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates a fetcher that reads from the given parent-task reader.
     *
     * @param iRVertex        passed through to the {@code DataFetcher} super constructor
     * @param inputReader     reader whose {@code read()} supplies the iterator futures
     * @param outputCollector passed through to the {@code DataFetcher} super constructor
     */
    public ParentTaskDataFetcher(IRVertex iRVertex, InputReader inputReader, OutputCollector outputCollector) {
        super(iRVertex, outputCollector);
        this.serBytes = 0L;
        this.encodedBytes = 0L;
        this.readersForParentTask = inputReader;
        this.firstFetch = true;
        this.currentIteratorIndex = 0;
        this.lastIteratorCounted = false;
        this.iteratorQueue = new LinkedBlockingQueue<>();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Returns the next element from the parent task, or the {@link Finishmark} singleton once
     * every expected iterator has been drained (or when there is nothing to read at all).
     *
     * <p>Byte counters are updated each time an iterator is exhausted, including the last one.
     *
     * @return the next data element, or {@code Finishmark.getInstance()} when input is exhausted
     * @throws IOException on any failure; non-{@code IOException} failures (including unchecked
     *                     exceptions thrown by the iterator) are wrapped as the cause
     */
    @Override // org.apache.nemo.runtime.executor.task.DataFetcher
    public Object fetchDataElement() throws IOException {
        try {
            if (this.firstFetch) {
                fetchDataLazily();
                // With no futures registered nothing will ever be queued, so taking from the
                // queue would block forever; only wait when at least one iterator is expected.
                if (this.expectedNumOfIterators > 0) {
                    advanceIterator();
                }
                this.firstFetch = false;
            }
            if (this.currentIterator == null) {
                // Only reachable when the reader returned no futures: there is no input.
                return Finishmark.getInstance();
            }
            while (!this.currentIterator.hasNext()) {
                if (this.currentIteratorIndex >= this.expectedNumOfIterators) {
                    // All iterators consumed. Account for the final iterator exactly once so the
                    // totals are complete even if the caller keeps fetching after the finish mark.
                    if (!this.lastIteratorCounted) {
                        this.lastIteratorCounted = true;
                        countBytes(this.currentIterator);
                    }
                    return Finishmark.getInstance();
                }
                countBytes(this.currentIterator);
                advanceIterator();
            }
            return this.currentIterator.next();
        } catch (IOException e) {
            // Already the declared failure type; avoid wrapping it a second time.
            throw e;
        } catch (Throwable th) {
            throw new IOException(th);
        }
    }

    /**
     * Blocks until the next queue entry is available and makes it the current iterator.
     *
     * @throws IOException if the entry is a failure reported by a future, or if the calling
     *                     thread is interrupted while waiting (the interrupt flag is restored)
     */
    private void advanceIterator() throws IOException {
        try {
            Object take = this.iteratorQueue.take();
            if (take instanceof Throwable) {
                throw new IOException((Throwable) take);
            }
            this.currentIterator = (DataUtil.IteratorWithNumBytes) take;
            this.currentIteratorIndex++;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IOException(e);
        }
    }

    /**
     * Starts the read and registers a callback on each returned future that enqueues the
     * future's outcome (iterator on success, throwable on failure). Also records how many
     * iterators to expect.
     *
     * @throws IOException declared for callers; NOTE(review): whether {@code read()} itself can
     *                     throw it is not visible from this file
     */
    private void fetchDataLazily() throws IOException {
        List<CompletableFuture<DataUtil.IteratorWithNumBytes>> read = this.readersForParentTask.read();
        this.expectedNumOfIterators = read.size();
        read.forEach(completableFuture -> {
            completableFuture.whenComplete((iteratorWithNumBytes, th) -> {
                try {
                    if (th != null) {
                        this.iteratorQueue.put(th);
                    } else {
                        this.iteratorQueue.put(iteratorWithNumBytes);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(e);
                }
            });
        });
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * @return total serialized bytes of the iterators exhausted so far, or {@code -1} if any of
     *         them did not support reporting it
     */
    public final long getSerializedBytes() {
        return this.serBytes;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * @return total encoded bytes of the iterators exhausted so far, or {@code -1} if any of
     *         them did not support reporting it
     */
    public final long getEncodedBytes() {
        return this.encodedBytes;
    }

    /**
     * Adds the given iterator's serialized and encoded byte counts to the running totals.
     *
     * <p>Once a total has been set to the {@code -1} "not supported" sentinel it is left alone;
     * adding later counts to it would produce a meaningless value.
     *
     * @param iteratorWithNumBytes the exhausted iterator whose byte counts are recorded
     */
    private void countBytes(DataUtil.IteratorWithNumBytes iteratorWithNumBytes) {
        if (this.serBytes != BYTES_NOT_SUPPORTED) {
            try {
                this.serBytes += iteratorWithNumBytes.getNumSerializedBytes();
            } catch (IllegalStateException e) {
                LOG.error("Failed to get the number of bytes of serialized data - the data is not ready yet ", e);
            } catch (DataUtil.IteratorWithNumBytes.NumBytesNotSupportedException e2) {
                this.serBytes = BYTES_NOT_SUPPORTED;
            }
        }
        if (this.encodedBytes != BYTES_NOT_SUPPORTED) {
            try {
                this.encodedBytes += iteratorWithNumBytes.getNumEncodedBytes();
            } catch (IllegalStateException e3) {
                LOG.error("Failed to get the number of bytes of encoded data - the data is not ready yet ", e3);
            } catch (DataUtil.IteratorWithNumBytes.NumBytesNotSupportedException e4) {
                this.encodedBytes = BYTES_NOT_SUPPORTED;
            }
        }
    }

    /**
     * No-op: this class holds no resources of its own that need releasing.
     */
    @Override // java.lang.AutoCloseable
    public void close() throws Exception {
    }
}
