package org.apache.nemo.runtime.executor.task;

import java.io.IOException;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import javax.annotation.concurrent.NotThreadSafe;
import org.apache.nemo.common.ir.OutputCollector;
import org.apache.nemo.common.ir.vertex.IRVertex;
import org.apache.nemo.common.punctuation.Finishmark;
import org.apache.nemo.common.punctuation.Watermark;
import org.apache.nemo.runtime.executor.data.DataUtil;
import org.apache.nemo.runtime.executor.datatransfer.InputReader;
import org.apache.nemo.runtime.executor.datatransfer.InputWatermarkManager;
import org.apache.nemo.runtime.executor.datatransfer.MultiInputWatermarkManager;
import org.apache.nemo.runtime.executor.datatransfer.SingleInputWatermarkManager;
import org.apache.nemo.runtime.executor.datatransfer.WatermarkWithIndex;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/**
 * Fetches data elements from the iterators handed out by an {@link InputReader}.
 *
 * <p>On the first call to {@link #fetchDataElement()} the reader is asked for its iterators, and one
 * task per iterator is submitted to a cached thread pool. Each task drains its iterator into a shared
 * {@link ConcurrentLinkedQueue} and appends a {@link Finishmark} once the iterator is exhausted.
 * {@link #fetchDataElement()} polls that queue without blocking.
 *
 * <p>The consumer-side state ({@code firstFetch}, {@code numOfFinishMarks}) is held in plain fields,
 * consistent with the {@code @NotThreadSafe} annotation; only the queue, the watermark manager and the
 * byte counters are shared with the worker threads.
 */
@NotThreadSafe
/* loaded from: input_file:org/apache/nemo/runtime/executor/task/MultiThreadParentTaskDataFetcher.class */
public class MultiThreadParentTaskDataFetcher extends DataFetcher {
    private static final Logger LOG = LoggerFactory.getLogger(MultiThreadParentTaskDataFetcher.class);

    /** Source of the per-parent iterators; read once, lazily, on the first fetch. */
    private final InputReader readersForParentTask;
    /** Pool running one draining task per input iterator. */
    private final ExecutorService queueInsertionThreads;
    /** True until the first call to {@link #fetchDataElement()} has started the draining tasks. */
    private boolean firstFetch;
    /** Holds data elements, watermarks and finish marks produced by the draining tasks. */
    private final ConcurrentLinkedQueue<Object> elementQueue;
    /** Accumulated serialized byte count; written under this instance's monitor. */
    private long serBytes;
    /** Accumulated encoded byte count; written under this instance's monitor. */
    private long encodedBytes;
    /** Number of iterators returned by the reader; set on the first fetch. */
    private int numOfIterators;
    /** Number of finish marks consumed from the queue so far. */
    private int numOfFinishMarks;
    /** Created on the first fetch; used as the lock object by the draining tasks. */
    private InputWatermarkManager inputWatermarkManager;

    /**
     * Output collector handed to the watermark manager: every emitted watermark is appended to the
     * element queue of the enclosing fetcher. Emitting data elements through it is not supported.
     */
    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/nemo/runtime/executor/task/MultiThreadParentTaskDataFetcher$WatermarkCollector.class */
    public final class WatermarkCollector implements OutputCollector {
        private WatermarkCollector() {
        }

        /**
         * Not supported.
         *
         * @throws IllegalStateException always
         */
        public void emit(Object obj) {
            throw new IllegalStateException("Should not be called");
        }

        /** Appends the watermark to the enclosing fetcher's element queue. */
        public void emitWatermark(Watermark watermark) {
            MultiThreadParentTaskDataFetcher.this.elementQueue.offer(watermark);
        }

        /**
         * Not supported.
         *
         * @throws IllegalStateException always
         */
        public void emit(String str, Object obj) {
            throw new IllegalStateException("Should not be called");
        }
    }

    /**
     * Creates a fetcher; no data is requested from {@code inputReader} until the first fetch.
     *
     * @param iRVertex        passed through to the {@code DataFetcher} constructor
     * @param inputReader     reader that supplies the iterators to drain
     * @param outputCollector passed through to the {@code DataFetcher} constructor
     */
    /* JADX INFO: Access modifiers changed from: package-private */
    public MultiThreadParentTaskDataFetcher(IRVertex iRVertex, InputReader inputReader, OutputCollector outputCollector) {
        super(iRVertex, outputCollector);
        this.serBytes = 0L;
        this.encodedBytes = 0L;
        this.numOfFinishMarks = 0;
        this.readersForParentTask = inputReader;
        this.firstFetch = true;
        this.elementQueue = new ConcurrentLinkedQueue<>();
        this.queueInsertionThreads = Executors.newCachedThreadPool();
    }

    /**
     * Returns the next queued element without blocking.
     *
     * <p>Finish marks of individual iterators are counted and skipped; a single
     * {@link Finishmark} is returned once as many finish marks as there are iterators have been seen.
     *
     * @return a data element or watermark taken from the queue, or the {@link Finishmark} instance
     * @throws NoSuchElementException if the queue is currently empty
     * @throws IOException            declared by the overridden method; not thrown by this implementation
     */
    /* JADX INFO: Access modifiers changed from: package-private */
    @Override // org.apache.nemo.runtime.executor.task.DataFetcher
    public Object fetchDataElement() throws IOException, NoSuchElementException {
        if (this.firstFetch) {
            fetchDataLazily();
            this.firstFetch = false;
        }
        while (true) {
            final Object element = this.elementQueue.poll();
            if (element == null) {
                throw new NoSuchElementException();
            }
            if (!(element instanceof Finishmark)) {
                return element;
            }
            // A single iterator finished; report completion only when all of them have.
            this.numOfFinishMarks++;
            if (this.numOfFinishMarks == this.numOfIterators) {
                return Finishmark.getInstance();
            }
        }
    }

    /**
     * Asks the reader for its iterators, sets up the watermark manager that matches their number, and
     * submits one draining task per iterator once the corresponding future completes.
     */
    private void fetchDataLazily() {
        final List<CompletableFuture<DataUtil.IteratorWithNumBytes>> futures = this.readersForParentTask.read();
        this.numOfIterators = futures.size();
        if (this.numOfIterators > 1) {
            this.inputWatermarkManager = new MultiInputWatermarkManager(this.numOfIterators, new WatermarkCollector());
        } else {
            this.inputWatermarkManager = new SingleInputWatermarkManager(new WatermarkCollector());
        }
        futures.forEach(future -> {
            future.whenComplete((iterator, failure) -> {
                this.queueInsertionThreads.submit(() -> {
                    // NOTE(review): anything thrown here is captured by the Future returned from
                    // submit(), which is discarded, so the consumer never sees the failure and never
                    // receives this iterator's finish mark. Logging is the only trace of the error.
                    if (failure != null) {
                        LOG.error("Failed to obtain an input iterator", failure);
                        throw new RuntimeException(failure);
                    }
                    try {
                        drainIntoQueue(iterator);
                    } catch (RuntimeException e) {
                        LOG.error("Failed while draining an input iterator", e);
                        throw e;
                    }
                });
            });
        });
    }

    /**
     * Consumes the iterator to its end: watermarks go through the watermark manager, every other
     * element goes straight to the queue. Afterwards the byte counters are updated and a finish mark
     * is queued.
     *
     * @param iterator the iterator to drain
     */
    private void drainIntoQueue(DataUtil.IteratorWithNumBytes iterator) {
        while (iterator.hasNext()) {
            final Object element = iterator.next();
            if (element instanceof WatermarkWithIndex) {
                // The manager is shared by all draining tasks, so access to it is serialized.
                synchronized (this.inputWatermarkManager) {
                    final WatermarkWithIndex watermarkWithIndex = (WatermarkWithIndex) element;
                    this.inputWatermarkManager.trackAndEmitWatermarks(watermarkWithIndex.getIndex(), watermarkWithIndex.getWatermark());
                }
            } else {
                this.elementQueue.offer(element);
            }
        }
        countBytesSynchronized(iterator);
        this.elementQueue.offer(Finishmark.getInstance());
    }

    /**
     * Returns the accumulated serialized byte count.
     *
     * <p>Synchronized on the same monitor as the writer so that updates made by the draining tasks
     * are visible to the caller.
     *
     * @return the current value of the serialized byte counter
     */
    /* JADX INFO: Access modifiers changed from: package-private */
    public final synchronized long getSerializedBytes() {
        return this.serBytes;
    }

    /**
     * Returns the accumulated encoded byte count.
     *
     * <p>Synchronized on the same monitor as the writer so that updates made by the draining tasks
     * are visible to the caller.
     *
     * @return the current value of the encoded byte counter
     */
    /* JADX INFO: Access modifiers changed from: package-private */
    public final synchronized long getEncodedBytes() {
        return this.encodedBytes;
    }

    /**
     * Adds the byte counts reported by a finished iterator to the running totals.
     *
     * <p>If the iterator does not support a count, the corresponding total is set to {@code -1}; if
     * the count is not ready yet, the error is logged and the total is left unchanged.
     *
     * <p>NOTE(review): a total that was set to {@code -1} is still incremented by later iterators
     * that do support counting; confirm whether {@code -1} is meant to be sticky.
     *
     * @param iteratorWithNumBytes the iterator whose byte counts are added
     */
    private synchronized void countBytesSynchronized(DataUtil.IteratorWithNumBytes iteratorWithNumBytes) {
        try {
            this.serBytes += iteratorWithNumBytes.getNumSerializedBytes();
        } catch (IllegalStateException e) {
            LOG.error("Failed to get the number of bytes of serialized data - the data is not ready yet ", e);
        } catch (DataUtil.IteratorWithNumBytes.NumBytesNotSupportedException e2) {
            this.serBytes = -1L;
        }
        try {
            this.encodedBytes += iteratorWithNumBytes.getNumEncodedBytes();
        } catch (IllegalStateException e3) {
            LOG.error("Failed to get the number of bytes of encoded data - the data is not ready yet ", e3);
        } catch (DataUtil.IteratorWithNumBytes.NumBytesNotSupportedException e4) {
            this.encodedBytes = -1L;
        }
    }

    /**
     * Initiates an orderly shutdown of the draining thread pool. Tasks already submitted keep
     * running; this method does not wait for them.
     */
    @Override // java.lang.AutoCloseable
    public void close() throws Exception {
        this.queueInsertionThreads.shutdown();
    }
}
