package org.apache.flink.streaming.siddhi.operator;

import io.siddhi.core.SiddhiAppRuntime;
import io.siddhi.core.SiddhiManager;
import io.siddhi.core.stream.input.InputHandler;
import io.siddhi.query.api.SiddhiApp;
import io.siddhi.query.api.annotation.Annotation;
import io.siddhi.query.api.annotation.Element;
import io.siddhi.query.api.definition.AbstractDefinition;
import io.siddhi.query.compiler.SiddhiCompiler;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.PriorityQueue;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.siddhi.exception.UndefinedStreamException;
import org.apache.flink.streaming.siddhi.schema.StreamSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/streaming/siddhi/operator/AbstractSiddhiOperator.class */
public abstract class AbstractSiddhiOperator<IN, OUT> extends AbstractStreamOperator<OUT> implements OneInputStreamOperator<IN, OUT> {
    private static final Logger LOGGER = LoggerFactory.getLogger(AbstractSiddhiOperator.class);
    protected static final int INITIAL_PRIORITY_QUEUE_CAPACITY = 11;
    private static final String SIDDHI_RUNTIME_STATE_NAME = "siddhiRuntimeState";
    private static final String QUEUED_RECORDS_STATE_NAME = "queuedRecordsState";
    protected final String operatorName;
    private final SiddhiOperatorContext siddhiPlan;
    private final String executionExpression;
    private final boolean isProcessingTime;
    private final Map<String, StreamElementSerializer<IN>> streamRecordSerializers;
    private transient SiddhiManager siddhiManager;
    private transient SiddhiAppRuntime siddhiRuntime;
    private transient Map<String, InputHandler> inputStreamHandlers;
    private transient PriorityQueue<StreamRecord<IN>> priorityQueue;
    private transient ListState<byte[]> siddhiRuntimeState;
    private transient ListState<byte[]> queuedRecordsState;

    public AbstractSiddhiOperator(SiddhiOperatorContext siddhiOperatorContext, String str) {
        validate(siddhiOperatorContext);
        this.executionExpression = siddhiOperatorContext.getFinalExecutionPlan();
        this.siddhiPlan = siddhiOperatorContext;
        this.isProcessingTime = this.siddhiPlan.getTimeCharacteristic() == TimeCharacteristic.ProcessingTime;
        this.streamRecordSerializers = new HashMap();
        this.operatorName = str;
        registerStreamRecordSerializers();
    }

    private void registerStreamRecordSerializers() {
        for (String str : this.siddhiPlan.getInputStreams()) {
            this.streamRecordSerializers.put(str, createStreamRecordSerializer(this.siddhiPlan.getInputStreamSchema(str), this.siddhiPlan.getExecutionConfig()));
        }
    }

    protected abstract StreamElementSerializer<IN> createStreamRecordSerializer(StreamSchema streamSchema, ExecutionConfig executionConfig);

    /* JADX INFO: Access modifiers changed from: protected */
    public StreamElementSerializer<IN> getStreamRecordSerializer(String str) {
        if (this.streamRecordSerializers.containsKey(str)) {
            return this.streamRecordSerializers.get(str);
        }
        throw new UndefinedStreamException("Stream " + str + " not defined");
    }

    /* JADX WARN: Multi-variable type inference failed */
    public void processElement(StreamRecord<IN> streamRecord) throws Exception {
        String streamId = getStreamId(streamRecord.getValue());
        StreamSchema<IN> inputStreamSchema = this.siddhiPlan.getInputStreamSchema(streamId);
        if (this.isProcessingTime) {
            processEvent(streamId, inputStreamSchema, streamRecord.getValue(), System.currentTimeMillis());
            checkpointSiddhiRuntimeState();
            return;
        }
        PriorityQueue priorityQueue = getPriorityQueue();
        if (getExecutionConfig().isObjectReuseEnabled()) {
            priorityQueue.offer(new StreamRecord(inputStreamSchema.getTypeSerializer().copy(streamRecord.getValue()), streamRecord.getTimestamp()));
        } else {
            priorityQueue.offer(streamRecord);
        }
        checkpointRecordQueueState();
    }

    protected abstract void processEvent(String str, StreamSchema<IN> streamSchema, IN in, long j) throws Exception;

    /* JADX WARN: Multi-variable type inference failed */
    public void processWatermark(Watermark watermark) throws Exception {
        while (!this.priorityQueue.isEmpty() && this.priorityQueue.peek().getTimestamp() <= watermark.getTimestamp()) {
            StreamRecord<IN> poll = this.priorityQueue.poll();
            String streamId = getStreamId(poll.getValue());
            processEvent(streamId, this.siddhiPlan.getInputStreamSchema(streamId), poll.getValue(), poll.getTimestamp());
        }
        this.output.emitWatermark(watermark);
    }

    public abstract String getStreamId(IN in);

    public PriorityQueue<StreamRecord<IN>> getPriorityQueue() {
        return this.priorityQueue;
    }

    protected SiddhiAppRuntime getSiddhiRuntime() {
        return this.siddhiRuntime;
    }

    public InputHandler getSiddhiInputHandler(String str) {
        return this.inputStreamHandlers.get(str);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public SiddhiOperatorContext getSiddhiPlan() {
        return this.siddhiPlan;
    }

    public void setup(StreamTask<?, ?> streamTask, StreamConfig streamConfig, Output<StreamRecord<OUT>> output) {
        super.setup(streamTask, streamConfig, output);
        if (this.priorityQueue == null) {
            this.priorityQueue = new PriorityQueue<>(INITIAL_PRIORITY_QUEUE_CAPACITY, new StreamRecordComparator());
        }
        startSiddhiRuntime();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void send(String str, Object[] objArr, long j) throws InterruptedException {
        getSiddhiInputHandler(str).send(j, objArr);
    }

    private static void validate(SiddhiOperatorContext siddhiOperatorContext) {
        SiddhiManager createSiddhiManager = siddhiOperatorContext.createSiddhiManager();
        try {
            createSiddhiManager.validateSiddhiApp(siddhiOperatorContext.getFinalExecutionPlan());
        } finally {
            createSiddhiManager.shutdown();
        }
    }

    private void startSiddhiRuntime() {
        if (this.siddhiRuntime != null) {
            throw new IllegalStateException("Siddhi has already been initialized");
        }
        this.siddhiManager = this.siddhiPlan.createSiddhiManager();
        for (Map.Entry<String, Class<?>> entry : this.siddhiPlan.getExtensions().entrySet()) {
            this.siddhiManager.setExtension(entry.getKey(), entry.getValue());
        }
        SiddhiApp parse = SiddhiCompiler.parse(this.executionExpression);
        Annotation annotation = new Annotation("Name");
        Element element = new Element((String) null, this.operatorName);
        ArrayList arrayList = new ArrayList();
        arrayList.add(element);
        annotation.setElements(arrayList);
        parse.getAnnotations().add(annotation);
        this.siddhiRuntime = this.siddhiManager.createSiddhiAppRuntime(parse);
        this.siddhiRuntime.start();
        registerInputAndOutput(this.siddhiRuntime);
        LOGGER.info("Siddhi {} started", this.siddhiRuntime.getName());
    }

    private void shutdownSiddhiRuntime() {
        if (this.siddhiRuntime == null) {
            throw new IllegalStateException("Siddhi has already shutdown");
        }
        this.siddhiRuntime.shutdown();
        LOGGER.info("Siddhi {} shutdown", this.siddhiRuntime.getName());
        this.siddhiRuntime = null;
        this.siddhiManager.shutdown();
        this.siddhiManager = null;
        this.inputStreamHandlers = null;
    }

    private void registerInputAndOutput(SiddhiAppRuntime siddhiAppRuntime) {
        siddhiAppRuntime.addCallback(this.siddhiPlan.getOutputStreamId(), new StreamOutputHandler(this.siddhiPlan.getOutputStreamType(), (AbstractDefinition) this.siddhiRuntime.getStreamDefinitionMap().get(this.siddhiPlan.getOutputStreamId()), this.output));
        this.inputStreamHandlers = new HashMap();
        for (String str : this.siddhiPlan.getInputStreams()) {
            this.inputStreamHandlers.put(str, siddhiAppRuntime.getInputHandler(str));
        }
    }

    public void close() throws Exception {
        shutdownSiddhiRuntime();
        this.siddhiRuntimeState.clear();
        super.close();
    }

    public void snapshotState(StateSnapshotContext stateSnapshotContext) throws Exception {
        super.snapshotState(stateSnapshotContext);
        checkpointSiddhiRuntimeState();
        checkpointRecordQueueState();
    }

    private void restoreState() throws Exception {
        LOGGER.info("Restore siddhi state");
        Iterator it = ((Iterable) this.siddhiRuntimeState.get()).iterator();
        if (it.hasNext()) {
            this.siddhiRuntime.restore((byte[]) it.next());
        }
        LOGGER.info("Restore queued records state");
        Iterator it2 = ((Iterable) this.queuedRecordsState.get()).iterator();
        if (it2.hasNext()) {
            ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream((byte[]) it2.next());
            DataInputViewStreamWrapper dataInputViewStreamWrapper = new DataInputViewStreamWrapper(byteArrayInputStream);
            try {
                this.priorityQueue = restoreQueuerState(dataInputViewStreamWrapper);
                dataInputViewStreamWrapper.close();
                byteArrayInputStream.close();
            } catch (Throwable th) {
                dataInputViewStreamWrapper.close();
                byteArrayInputStream.close();
                throw th;
            }
        }
    }

    public void initializeState(StateInitializationContext stateInitializationContext) throws Exception {
        super.initializeState(stateInitializationContext);
        if (this.siddhiRuntimeState == null) {
            this.siddhiRuntimeState = stateInitializationContext.getOperatorStateStore().getUnionListState(new ListStateDescriptor(SIDDHI_RUNTIME_STATE_NAME, new BytePrimitiveArraySerializer()));
        }
        if (this.queuedRecordsState == null) {
            this.queuedRecordsState = stateInitializationContext.getOperatorStateStore().getListState(new ListStateDescriptor(QUEUED_RECORDS_STATE_NAME, new BytePrimitiveArraySerializer()));
        }
        if (stateInitializationContext.isRestored()) {
            restoreState();
        }
    }

    private void checkpointSiddhiRuntimeState() throws Exception {
        this.siddhiRuntimeState.clear();
        this.siddhiRuntimeState.add(this.siddhiRuntime.snapshot());
        this.queuedRecordsState.clear();
    }

    private void checkpointRecordQueueState() throws Exception {
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        DataOutputViewStreamWrapper dataOutputViewStreamWrapper = new DataOutputViewStreamWrapper(byteArrayOutputStream);
        try {
            snapshotQueueState(this.priorityQueue, dataOutputViewStreamWrapper);
            this.queuedRecordsState.clear();
            this.queuedRecordsState.add(byteArrayOutputStream.toByteArray());
        } finally {
            dataOutputViewStreamWrapper.close();
            byteArrayOutputStream.close();
        }
    }

    protected abstract void snapshotQueueState(PriorityQueue<StreamRecord<IN>> priorityQueue, DataOutputView dataOutputView) throws IOException;

    protected abstract PriorityQueue<StreamRecord<IN>> restoreQueuerState(DataInputView dataInputView) throws IOException;
}
