package org.apache.hyracks.client.dataset;

import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.hyracks.api.channels.IInputChannel;
import org.apache.hyracks.api.comm.FrameHelper;
import org.apache.hyracks.api.comm.IFrame;
import org.apache.hyracks.api.comm.NetworkAddress;
import org.apache.hyracks.api.context.IHyracksCommonContext;
import org.apache.hyracks.api.dataset.DatasetDirectoryRecord;
import org.apache.hyracks.api.dataset.DatasetJobRecord;
import org.apache.hyracks.api.dataset.IDatasetInputChannelMonitor;
import org.apache.hyracks.api.dataset.IHyracksDatasetDirectoryServiceConnection;
import org.apache.hyracks.api.dataset.IHyracksDatasetReader;
import org.apache.hyracks.api.dataset.ResultSetId;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.exceptions.HyracksException;
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.client.net.ClientNetworkManager;
import org.apache.hyracks.comm.channels.DatasetNetworkInputChannel;

/* loaded from: input_file:org/apache/hyracks/client/dataset/HyracksDatasetReader.class */
public class HyracksDatasetReader implements IHyracksDatasetReader {
    private final IHyracksDatasetDirectoryServiceConnection datasetDirectoryServiceConnection;
    private final ClientNetworkManager netManager;
    private final IHyracksCommonContext datasetClientCtx;
    private JobId jobId;
    private ResultSetId resultSetId;
    private DatasetDirectoryRecord[] knownRecords = null;
    private IDatasetInputChannelMonitor[] monitors = null;
    private int lastReadPartition = -1;
    private IDatasetInputChannelMonitor lastMonitor = null;
    private DatasetNetworkInputChannel resultChannel = null;
    private static final Logger LOGGER = Logger.getLogger(HyracksDatasetReader.class.getName());
    private static int NUM_READ_BUFFERS = 1;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/hyracks/client/dataset/HyracksDatasetReader$DatasetInputChannelMonitor.class */
    public class DatasetInputChannelMonitor implements IDatasetInputChannelMonitor {
        private final AtomicInteger nAvailableFrames = new AtomicInteger(0);
        private final AtomicBoolean eos = new AtomicBoolean(false);
        private final AtomicBoolean failed = new AtomicBoolean(false);

        public DatasetInputChannelMonitor() {
        }

        public synchronized void notifyFailure(IInputChannel iInputChannel) {
            this.failed.set(true);
            notifyAll();
        }

        public synchronized void notifyDataAvailability(IInputChannel iInputChannel, int i) {
            this.nAvailableFrames.addAndGet(i);
            notifyAll();
        }

        public synchronized void notifyEndOfStream(IInputChannel iInputChannel) {
            this.eos.set(true);
            notifyAll();
        }

        public synchronized boolean eosReached() {
            return this.eos.get();
        }

        public synchronized boolean failed() {
            return this.failed.get();
        }

        public synchronized int getNFramesAvailable() {
            return this.nAvailableFrames.get();
        }

        public synchronized void notifyFrameRead() {
            this.nAvailableFrames.decrementAndGet();
        }
    }

    public HyracksDatasetReader(IHyracksDatasetDirectoryServiceConnection iHyracksDatasetDirectoryServiceConnection, ClientNetworkManager clientNetworkManager, IHyracksCommonContext iHyracksCommonContext, JobId jobId, ResultSetId resultSetId) throws Exception {
        this.datasetDirectoryServiceConnection = iHyracksDatasetDirectoryServiceConnection;
        this.netManager = clientNetworkManager;
        this.datasetClientCtx = iHyracksCommonContext;
        this.jobId = jobId;
        this.resultSetId = resultSetId;
    }

    public DatasetJobRecord.Status getResultStatus() {
        try {
            return this.datasetDirectoryServiceConnection.getDatasetResultStatus(this.jobId, this.resultSetId);
        } catch (Exception e) {
            LOGGER.log(Level.WARNING, "Exception retrieving result set for job " + this.jobId, (Throwable) e);
            return null;
        } catch (HyracksDataException e2) {
            if (e2.getErrorCode() == 24) {
                return null;
            }
            LOGGER.log(Level.WARNING, "Exception retrieving result set for job " + this.jobId, e2);
            return null;
        }
    }

    private DatasetDirectoryRecord getRecord(int i) throws Exception {
        while (true) {
            if (this.knownRecords != null && this.knownRecords[i] != null) {
                return this.knownRecords[i];
            }
            this.knownRecords = this.datasetDirectoryServiceConnection.getDatasetResultLocations(this.jobId, this.resultSetId, this.knownRecords);
        }
    }

    private boolean nextPartition() throws HyracksDataException {
        this.lastReadPartition++;
        try {
            DatasetDirectoryRecord record = getRecord(this.lastReadPartition);
            while (record.getEmpty()) {
                int i = this.lastReadPartition + 1;
                this.lastReadPartition = i;
                if (i >= this.knownRecords.length) {
                    break;
                }
                record = getRecord(this.lastReadPartition);
            }
            if (this.lastReadPartition == this.knownRecords.length) {
                return false;
            }
            this.resultChannel = new DatasetNetworkInputChannel(this.netManager, getSocketAddress(record), this.jobId, this.resultSetId, this.lastReadPartition, NUM_READ_BUFFERS);
            this.lastMonitor = getMonitor(this.lastReadPartition);
            this.resultChannel.registerMonitor(this.lastMonitor);
            this.resultChannel.open(this.datasetClientCtx);
            return true;
        } catch (Exception e) {
            throw HyracksDataException.create(e);
        }
    }

    public int read(IFrame iFrame) throws HyracksDataException {
        iFrame.reset();
        int i = 0;
        if (this.lastReadPartition == -1 && !nextPartition()) {
            return 0;
        }
        while (i < iFrame.getFrameSize() && (this.lastReadPartition != this.knownRecords.length - 1 || !isPartitionReadComplete(this.lastMonitor))) {
            waitForNextFrame(this.lastMonitor);
            if (isPartitionReadComplete(this.lastMonitor)) {
                this.knownRecords[this.lastReadPartition].readEOS();
                this.resultChannel.close();
                if (this.lastReadPartition == this.knownRecords.length - 1 || !nextPartition()) {
                    break;
                }
            } else {
                ByteBuffer nextBuffer = this.resultChannel.getNextBuffer();
                this.lastMonitor.notifyFrameRead();
                if (nextBuffer != null) {
                    if (i <= 0) {
                        iFrame.ensureFrameSize(iFrame.getMinSize() * FrameHelper.deserializeNumOfMinFrame(nextBuffer));
                        iFrame.getBuffer().clear();
                        iFrame.getBuffer().put(nextBuffer);
                        this.resultChannel.recycleBuffer(nextBuffer);
                        i = iFrame.getBuffer().position();
                    } else {
                        iFrame.getBuffer().put(nextBuffer);
                        this.resultChannel.recycleBuffer(nextBuffer);
                        i = iFrame.getBuffer().position();
                    }
                }
            }
        }
        iFrame.getBuffer().flip();
        return i;
    }

    private static void waitForNextFrame(IDatasetInputChannelMonitor iDatasetInputChannelMonitor) throws HyracksDataException {
        synchronized (iDatasetInputChannelMonitor) {
            while (iDatasetInputChannelMonitor.getNFramesAvailable() <= 0 && !iDatasetInputChannelMonitor.eosReached() && !iDatasetInputChannelMonitor.failed()) {
                try {
                    iDatasetInputChannelMonitor.wait();
                } catch (InterruptedException e) {
                    throw new HyracksDataException(e);
                }
            }
        }
        if (iDatasetInputChannelMonitor.failed()) {
            throw new HyracksDataException("Job Failed.");
        }
    }

    private boolean isPartitionReadComplete(IDatasetInputChannelMonitor iDatasetInputChannelMonitor) {
        return iDatasetInputChannelMonitor.getNFramesAvailable() <= 0 && iDatasetInputChannelMonitor.eosReached();
    }

    private SocketAddress getSocketAddress(DatasetDirectoryRecord datasetDirectoryRecord) throws UnknownHostException {
        NetworkAddress networkAddress = datasetDirectoryRecord.getNetworkAddress();
        return new InetSocketAddress(InetAddress.getByAddress(networkAddress.lookupIpAddress()), networkAddress.getPort());
    }

    private IDatasetInputChannelMonitor getMonitor(int i) throws HyracksException {
        if (this.knownRecords == null || this.knownRecords[i] == null) {
            throw new HyracksException("Accessing monitors before the obtaining the corresponding addresses.");
        }
        if (this.monitors == null) {
            this.monitors = new DatasetInputChannelMonitor[this.knownRecords.length];
        }
        if (this.monitors[i] == null) {
            this.monitors[i] = new DatasetInputChannelMonitor();
        }
        return this.monitors[i];
    }
}
