package org.apache.flink.streaming.siddhi.operator;

import java.io.IOException;
import java.util.Iterator;
import java.util.PriorityQueue;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.siddhi.schema.StreamSchema;
import org.apache.flink.streaming.siddhi.utils.SiddhiTypeFactory;

/* loaded from: input_file:org/apache/flink/streaming/siddhi/operator/SiddhiStreamOperator.class */
public class SiddhiStreamOperator<IN, OUT> extends AbstractSiddhiOperator<Tuple2<String, IN>, OUT> {
    public SiddhiStreamOperator(SiddhiOperatorContext siddhiOperatorContext, String str) {
        super(siddhiOperatorContext, str);
    }

    @Override // org.apache.flink.streaming.siddhi.operator.AbstractSiddhiOperator
    protected StreamElementSerializer<Tuple2<String, IN>> createStreamRecordSerializer(StreamSchema streamSchema, ExecutionConfig executionConfig) {
        return new StreamElementSerializer<>(SiddhiTypeFactory.getStreamTupleTypeInformation(streamSchema.getTypeInfo()).createSerializer(executionConfig));
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.flink.streaming.siddhi.operator.AbstractSiddhiOperator
    public void processEvent(String str, StreamSchema<Tuple2<String, IN>> streamSchema, Tuple2<String, IN> tuple2, long j) throws InterruptedException {
        send((String) tuple2.f0, getSiddhiPlan().getInputStreamSchema((String) tuple2.f0).getStreamSerializer().getRow(tuple2.f1), j);
    }

    @Override // org.apache.flink.streaming.siddhi.operator.AbstractSiddhiOperator
    public String getStreamId(Tuple2<String, IN> tuple2) {
        return (String) tuple2.f0;
    }

    @Override // org.apache.flink.streaming.siddhi.operator.AbstractSiddhiOperator
    protected void snapshotQueueState(PriorityQueue<StreamRecord<Tuple2<String, IN>>> priorityQueue, DataOutputView dataOutputView) throws IOException {
        dataOutputView.writeInt(priorityQueue.size());
        Iterator<StreamRecord<Tuple2<String, IN>>> it = priorityQueue.iterator();
        while (it.hasNext()) {
            StreamRecord<Tuple2<String, IN>> next = it.next();
            String str = (String) ((Tuple2) next.getValue()).f0;
            dataOutputView.writeUTF(str);
            getStreamRecordSerializer(str).serialize(next, dataOutputView);
        }
    }

    @Override // org.apache.flink.streaming.siddhi.operator.AbstractSiddhiOperator
    protected PriorityQueue<StreamRecord<Tuple2<String, IN>>> restoreQueuerState(DataInputView dataInputView) throws IOException {
        int readInt = dataInputView.readInt();
        PriorityQueue<StreamRecord<Tuple2<String, IN>>> priorityQueue = new PriorityQueue<>(readInt > 0 ? readInt : 11);
        for (int i = 0; i < readInt; i++) {
            priorityQueue.offer(getStreamRecordSerializer(dataInputView.readUTF()).deserialize(dataInputView).asRecord());
        }
        return priorityQueue;
    }
}
