package org.apache.asterix.external.dataflow;

import java.io.IOException;
import java.io.Serializable;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.asterix.active.message.ActiveManagerMessage;
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.common.exceptions.RuntimeDataException;
import org.apache.asterix.external.api.IRawRecord;
import org.apache.asterix.external.api.IRecordDataParser;
import org.apache.asterix.external.api.IRecordReader;
import org.apache.asterix.external.util.ExternalDataConstants;
import org.apache.asterix.external.util.FeedLogManager;
import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.util.CleanupUtils;
import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
import org.apache.hyracks.storage.am.common.api.ITupleFilter;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:org/apache/asterix/external/dataflow/FeedRecordDataFlowController.class */
public class FeedRecordDataFlowController<T> extends AbstractFeedDataFlowController {
    public static final String INCOMING_RECORDS_COUNT_FIELD_NAME = "incoming-records-count";
    public static final String FAILED_AT_PARSER_RECORDS_COUNT_FIELD_NAME = "failed-at-parser-records-count";
    public static final String READER_STATS_FIELD_NAME = "reader-stats";
    public static final String TIMESTAMP_FIELD_NAME = "timestamp";
    private static final Logger LOGGER = LogManager.getLogger();
    private final IRecordDataParser<T> dataParser;
    private final IRecordReader<T> recordReader;
    protected final AtomicBoolean closed;
    protected static final long INTERVAL = 1000;
    protected State state;
    protected long failedRecordsCount;

    /* loaded from: input_file:org/apache/asterix/external/dataflow/FeedRecordDataFlowController$State.class */
    public enum State {
        CREATED,
        STARTED,
        STOPPED
    }

    public FeedRecordDataFlowController(IHyracksTaskContext iHyracksTaskContext, FeedLogManager feedLogManager, int i, IRecordDataParser<T> iRecordDataParser, IRecordReader<T> iRecordReader) throws HyracksDataException {
        super(iHyracksTaskContext, feedLogManager, i);
        this.closed = new AtomicBoolean(false);
        this.state = State.CREATED;
        this.failedRecordsCount = 0L;
        this.dataParser = iRecordDataParser;
        this.recordReader = iRecordReader;
        iRecordReader.setFeedLogManager(feedLogManager);
        iRecordReader.setController(this);
    }

    @Override // org.apache.asterix.external.api.IDataFlowController
    public void start(IFrameWriter iFrameWriter, ITupleFilter iTupleFilter, long j) throws HyracksDataException, InterruptedException {
        Throwable finish;
        if (iTupleFilter != null || j >= 0) {
            throw new RuntimeDataException(ErrorCode.DATAFLOW_ILLEGAL_STATE, new Serializable[0]);
        }
        synchronized (this) {
            if (this.state == State.STOPPED) {
                return;
            }
            setState(State.STARTED);
            Exception exc = null;
            try {
                try {
                    this.tupleForwarder = new TupleForwarder(this.ctx, iFrameWriter);
                    while (hasNext()) {
                        IRawRecord<? extends T> next = next();
                        if (next == null) {
                            flush();
                            Thread.sleep(INTERVAL);
                        } else {
                            this.tb.reset();
                            this.incomingRecordsCount++;
                            if (!parseAndForward(next)) {
                                this.failedRecordsCount++;
                            }
                        }
                    }
                    finish = finish(null);
                } catch (HyracksDataException e) {
                    LOGGER.log(Level.WARN, "Exception during ingestion", e);
                    if (e.matches(ErrorCode.FEED_FAILED_WHILE_GETTING_A_NEW_RECORD)) {
                        exc = e;
                        try {
                            flush();
                        } catch (Exception e2) {
                            e2.addSuppressed(e);
                            exc = e2;
                        }
                    } else {
                        exc = e;
                    }
                    finish = finish(exc);
                } catch (Throwable th) {
                    LOGGER.log(Level.WARN, "Failure while operating a feed source", th);
                    finish = finish(th);
                }
                if (finish != null) {
                    if (!(finish instanceof InterruptedException)) {
                        throw HyracksDataException.create(finish);
                    }
                    throw ((InterruptedException) finish);
                }
            } catch (Throwable th2) {
                finish(exc);
                throw th2;
            }
        }
    }

    private synchronized void setState(State state) {
        LOGGER.log(Level.INFO, "State is being set from " + this.state + " to " + state);
        this.state = state;
    }

    public synchronized State getState() {
        return this.state;
    }

    private IRawRecord<? extends T> next() throws Exception {
        try {
            return this.recordReader.next();
        } catch (InterruptedException e) {
            throw e;
        } catch (Exception e2) {
            if (this.flushing) {
                throw e2;
            }
            if (this.recordReader.handleException(e2)) {
                return null;
            }
            throw new RuntimeDataException(ErrorCode.FEED_FAILED_WHILE_GETTING_A_NEW_RECORD, e2, new Serializable[0]);
        }
    }

    private boolean hasNext() throws Exception {
        do {
            try {
                return this.recordReader.hasNext();
            } catch (InterruptedException e) {
                throw e;
            } catch (Exception e2) {
                if (this.flushing) {
                    throw e2;
                }
            }
        } while (this.recordReader.handleException(e2));
        throw new RuntimeDataException(ErrorCode.FEED_FAILED_WHILE_GETTING_A_NEW_RECORD, e2, new Serializable[0]);
    }

    private Throwable finish(Throwable th) {
        Throwable close = CleanupUtils.close(this.recordReader, th);
        if (close == null) {
            try {
                this.tupleForwarder.complete();
            } catch (Throwable th2) {
                close = th2;
            }
        }
        closeSignal();
        setState(State.STOPPED);
        return close;
    }

    private boolean parseAndForward(IRawRecord<? extends T> iRawRecord) throws IOException {
        try {
            if (!this.dataParser.parse(iRawRecord, this.tb.getDataOutput())) {
                return false;
            }
            this.tb.addFieldEndOffset();
            addMetaPart(this.tb, iRawRecord);
            addPrimaryKeys(this.tb, iRawRecord);
            this.tupleForwarder.addTuple(this.tb);
            return true;
        } catch (Exception e) {
            LOGGER.log(Level.WARN, ExternalDataConstants.ERROR_PARSE_RECORD, e);
            this.feedLogManager.logRecord(iRawRecord.toString(), ExternalDataConstants.ERROR_PARSE_RECORD);
            return false;
        }
    }

    protected void addMetaPart(ArrayTupleBuilder arrayTupleBuilder, IRawRecord<? extends T> iRawRecord) throws IOException {
    }

    protected void addPrimaryKeys(ArrayTupleBuilder arrayTupleBuilder, IRawRecord<? extends T> iRawRecord) throws IOException {
    }

    private void closeSignal() {
        synchronized (this.closed) {
            this.closed.set(true);
            this.closed.notifyAll();
        }
    }

    private void waitForSignal(long j) throws InterruptedException, HyracksDataException {
        if (j <= 0) {
            throw new IllegalArgumentException("timeout must be greater than 0");
        }
        synchronized (this.closed) {
            while (!this.closed.get()) {
                long currentTimeMillis = System.currentTimeMillis();
                this.closed.wait(j);
                j -= System.currentTimeMillis() - currentTimeMillis;
                if (!this.closed.get() && j <= 0) {
                    throw HyracksDataException.create(org.apache.hyracks.api.exceptions.ErrorCode.TIMEOUT, new Serializable[0]);
                }
            }
        }
    }

    @Override // org.apache.asterix.external.api.IDataFlowController
    public boolean stop(long j) throws HyracksDataException {
        synchronized (this) {
            switch (this.state) {
                case CREATED:
                case STOPPED:
                    setState(State.STOPPED);
                    return true;
                case STARTED:
                    if (!this.recordReader.stop()) {
                        return false;
                    }
                    try {
                        waitForSignal(j);
                        return true;
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        throw HyracksDataException.create(e);
                    }
                default:
                    throw new HyracksDataException("unknown state " + this.state);
            }
        }
    }

    public IRecordReader<T> getReader() {
        return this.recordReader;
    }

    public IRecordDataParser<T> getParser() {
        return this.dataParser;
    }

    public long getFailedRecordsCount() {
        return this.failedRecordsCount;
    }

    @Override // org.apache.asterix.external.dataflow.AbstractFeedDataFlowController
    public String getStats() {
        String stats = this.recordReader.getStats();
        StringBuilder sb = new StringBuilder();
        sb.append("{");
        if (stats != null) {
            sb.append("\"reader-stats\":").append(stats).append(ExternalDataConstants.DEFAULT_DELIMITER);
        }
        sb.append("\"timestamp\":").append(System.currentTimeMillis()).append(ExternalDataConstants.DEFAULT_DELIMITER);
        sb.append("\"incoming-records-count\":").append(this.incomingRecordsCount).append(",\"failed-at-parser-records-count\":").append(this.failedRecordsCount).append("}");
        return sb.toString();
    }

    @Override // org.apache.asterix.external.dataflow.AbstractFeedDataFlowController
    public void handleGenericEvent(ActiveManagerMessage activeManagerMessage) {
        this.recordReader.handleGenericEvent(activeManagerMessage);
    }
}
