package org.apache.asterix.external.operators;

import java.io.Serializable;
import org.apache.asterix.active.ActiveRuntimeId;
import org.apache.asterix.active.ActiveSourceOperatorNodePushable;
import org.apache.asterix.active.EntityId;
import org.apache.asterix.common.exceptions.RuntimeDataException;
import org.apache.asterix.external.api.IAdapterFactory;
import org.apache.asterix.external.dataset.adapter.FeedAdapter;
import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
import org.apache.asterix.external.feed.runtime.AdapterRuntimeManager;
import org.apache.hyracks.api.comm.VSizeFrame;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.dataflow.common.utils.TaskUtil;

/* loaded from: input_file:org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.class */
public class FeedIntakeOperatorNodePushable extends ActiveSourceOperatorNodePushable {
    private final int partition;
    private final IAdapterFactory adapterFactory;
    private final FeedIntakeOperatorDescriptor opDesc;
    private volatile AdapterRuntimeManager adapterRuntimeManager;

    public FeedIntakeOperatorNodePushable(IHyracksTaskContext iHyracksTaskContext, EntityId entityId, IAdapterFactory iAdapterFactory, int i, FeedPolicyAccessor feedPolicyAccessor, IRecordDescriptorProvider iRecordDescriptorProvider, FeedIntakeOperatorDescriptor feedIntakeOperatorDescriptor) {
        super(iHyracksTaskContext, new ActiveRuntimeId(entityId, FeedIntakeOperatorNodePushable.class.getSimpleName(), i));
        this.opDesc = feedIntakeOperatorDescriptor;
        this.recordDesc = iRecordDescriptorProvider.getOutputRecordDescriptor(this.opDesc.getActivityId(), 0);
        this.partition = i;
        this.adapterFactory = iAdapterFactory;
    }

    protected void start() throws HyracksDataException, InterruptedException {
        try {
            try {
                this.writer.open();
                Thread.currentThread().setName("Intake Thread");
                this.adapterRuntimeManager = new AdapterRuntimeManager(this.ctx, this.runtimeId.getEntityId(), (FeedAdapter) this.adapterFactory.createAdapter(this.ctx, this.partition), this.writer, this.partition);
                VSizeFrame vSizeFrame = new VSizeFrame(this.ctx);
                TaskUtil.putInSharedMap("HYX:MSG", vSizeFrame, this.ctx);
                vSizeFrame.getBuffer().put((byte) 1);
                vSizeFrame.getBuffer().flip();
                this.adapterRuntimeManager.start();
                synchronized (this.adapterRuntimeManager) {
                    while (!this.adapterRuntimeManager.isDone()) {
                        this.adapterRuntimeManager.wait();
                    }
                }
                if (this.adapterRuntimeManager.isFailed()) {
                    throw new RuntimeDataException(3008, new Serializable[0]);
                }
            } catch (Exception e) {
                this.writer.fail();
                throw e;
            }
        } finally {
            this.writer.close();
        }
    }

    protected void abort() throws HyracksDataException, InterruptedException {
        if (this.adapterRuntimeManager != null) {
            this.adapterRuntimeManager.stop();
        }
    }
}
