package org.apache.asterix.external.dataflow;

import java.io.Serializable;
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.common.exceptions.RuntimeDataException;
import org.apache.asterix.external.api.AsterixInputStream;
import org.apache.asterix.external.api.IStreamDataParser;
import org.apache.asterix.external.util.FeedLogManager;
import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.storage.am.common.api.ITupleFilter;

/* loaded from: input_file:org/apache/asterix/external/dataflow/FeedStreamDataFlowController.class */
public class FeedStreamDataFlowController extends AbstractFeedDataFlowController {
    private final IStreamDataParser dataParser;
    private final AsterixInputStream stream;

    public FeedStreamDataFlowController(IHyracksTaskContext iHyracksTaskContext, FeedLogManager feedLogManager, IStreamDataParser iStreamDataParser, AsterixInputStream asterixInputStream) {
        super(iHyracksTaskContext, feedLogManager, 1);
        this.dataParser = iStreamDataParser;
        this.stream = asterixInputStream;
    }

    @Override // org.apache.asterix.external.api.IDataFlowController
    public void start(IFrameWriter iFrameWriter, ITupleFilter iTupleFilter, long j) throws HyracksDataException {
        if (iTupleFilter != null || j >= 0) {
            throw new RuntimeDataException(ErrorCode.DATAFLOW_ILLEGAL_STATE, new Serializable[0]);
        }
        try {
            this.tupleForwarder = new TupleForwarder(this.ctx, iFrameWriter);
            while (parseNext()) {
                this.tb.addFieldEndOffset();
                this.tupleForwarder.addTuple(this.tb);
                this.incomingRecordsCount++;
            }
            this.tupleForwarder.complete();
        } catch (Throwable th) {
            throw HyracksDataException.create(th);
        }
    }

    private boolean parseNext() throws HyracksDataException {
        do {
            try {
                this.tb.reset();
                return this.dataParser.parse(this.tb.getDataOutput());
            } catch (Exception e) {
            }
        } while (handleException(e));
        throw e;
    }

    @Override // org.apache.asterix.external.api.IDataFlowController
    public boolean stop(long j) throws HyracksDataException {
        try {
            if (this.stream.stop()) {
                return true;
            }
            this.stream.close();
            return false;
        } catch (Exception e) {
            throw HyracksDataException.create(e);
        }
    }

    private boolean handleException(Throwable th) {
        try {
            boolean handleException = true & this.stream.handleException(th);
            if (handleException) {
                handleException &= this.dataParser.reset(this.stream);
            }
            return handleException;
        } catch (Exception e) {
            th.addSuppressed(e);
            return false;
        }
    }

    @Override // org.apache.asterix.external.dataflow.AbstractFeedDataFlowController
    public String getStats() {
        return "{\"incoming-records-number\": " + this.incomingRecordsCount + "}";
    }
}
