package org.apache.flink.table.runtime.operators.join.temporal;

import org.apache.flink.api.common.functions.util.FunctionUtils;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.streaming.api.operators.InternalTimer;
import org.apache.flink.streaming.api.operators.TimestampedCollector;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.dataformat.BaseRow;
import org.apache.flink.table.dataformat.JoinedRow;
import org.apache.flink.table.dataformat.util.BaseRowUtil;
import org.apache.flink.table.runtime.generated.GeneratedJoinCondition;
import org.apache.flink.table.runtime.generated.JoinCondition;
import org.apache.flink.table.runtime.typeutils.BaseRowTypeInfo;

/* loaded from: input_file:org/apache/flink/table/runtime/operators/join/temporal/TemporalProcessTimeJoinOperator.class */
/**
 * Two-input operator that joins each record of the first ("left") input with the single row
 * currently held in keyed state for the second ("right") input.
 *
 * <p>The right input maintains the state: an accumulate message overwrites the stored row, any
 * other message clears it. The left input only reads the state; when a row is present and the
 * generated join condition accepts the pair, a joined row is emitted.
 *
 * <p>State clean-up is delegated to the helpers inherited from
 * {@link BaseTwoInputStreamOperatorWithStateRetention}
 * ({@code registerProcessingCleanupTimer()}, {@code cleanupLastTimer()}, {@link #cleanupState(long)}).
 */
public class TemporalProcessTimeJoinOperator extends BaseTwoInputStreamOperatorWithStateRetention {
    private static final long serialVersionUID = -5182289624027523612L;

    /** Type information used to create the state descriptor for the right-side row. */
    private final BaseRowTypeInfo rightType;

    /** Code-generated join condition; instantiated in {@link #open()}. */
    private final GeneratedJoinCondition generatedJoinCondition;

    /** Latest row received on the right input for the current key, or empty. */
    private transient ValueState<BaseRow> rightState;

    private transient JoinCondition joinCondition;

    /** Single output row instance that is re-pointed at the current pair for every emission. */
    private transient JoinedRow outRow;

    private transient TimestampedCollector<BaseRow> collector;

    /**
     * Creates the operator.
     *
     * @param rightType type information of the rows arriving on the right input
     * @param generatedJoinCondition generated condition evaluated on (left row, right row)
     * @param minRetentionTime first value forwarded to the base-class constructor
     * @param maxRetentionTime second value forwarded to the base-class constructor
     */
    // NOTE(review): the two long parameters are named on the assumption that the base class takes
    // (min, max) state retention times in this order -- confirm against its constructor.
    public TemporalProcessTimeJoinOperator(
            BaseRowTypeInfo rightType,
            GeneratedJoinCondition generatedJoinCondition,
            long minRetentionTime,
            long maxRetentionTime) {
        super(minRetentionTime, maxRetentionTime);
        this.rightType = rightType;
        this.generatedJoinCondition = generatedJoinCondition;
    }

    /**
     * Instantiates and opens the join condition, obtains the right-side state, and prepares the
     * reusable output objects.
     *
     * @throws Exception if the base class, the join condition or the state backend fails to set up
     */
    @Override // org.apache.flink.table.runtime.operators.join.temporal.BaseTwoInputStreamOperatorWithStateRetention
    public void open() throws Exception {
        // This method overrides the base class's open(); chain to it so that whatever it sets up
        // for registerProcessingCleanupTimer()/cleanupLastTimer() is in place before records arrive.
        super.open();

        this.joinCondition = this.generatedJoinCondition.newInstance(getRuntimeContext().getUserCodeClassLoader());
        FunctionUtils.setFunctionRuntimeContext(this.joinCondition, getRuntimeContext());
        FunctionUtils.openFunction(this.joinCondition, new Configuration());

        ValueStateDescriptor<BaseRow> rightStateDescriptor = new ValueStateDescriptor<>("right", this.rightType);
        this.rightState = getRuntimeContext().getState(rightStateDescriptor);

        this.collector = new TimestampedCollector<>(this.output);
        this.outRow = new JoinedRow();

        // Report the maximum watermark for input 2 up front; presumably so that only input 1
        // determines the operator's watermark -- TODO confirm against AbstractStreamOperator.
        super.processWatermark2(Watermark.MAX_WATERMARK);
    }

    /**
     * Handles a record from the left input: if a right-side row is stored and the join condition
     * holds, emits the joined row; then refreshes the clean-up timer.
     *
     * <p>When no right-side row is stored the record is dropped and no timer is registered.
     *
     * @param streamRecord the left-side record
     * @throws Exception if state access, the join condition or timer registration fails
     */
    @Override
    public void processElement1(StreamRecord<BaseRow> streamRecord) throws Exception {
        BaseRow rightRow = this.rightState.value();
        if (rightRow == null) {
            return;
        }

        BaseRow leftRow = streamRecord.getValue();
        if (this.joinCondition.apply(leftRow, rightRow)) {
            // The emitted row carries the header of the left row.
            this.outRow.setHeader(leftRow.getHeader());
            this.outRow.replace(leftRow, rightRow);
            this.collector.collect(this.outRow);
        }
        registerProcessingCleanupTimer();
    }

    /**
     * Handles a record from the right input: an accumulate message replaces the stored row and
     * refreshes the clean-up timer; any other message clears the stored row and the last timer.
     *
     * @param streamRecord the right-side record
     * @throws Exception if state access or timer handling fails
     */
    @Override
    public void processElement2(StreamRecord<BaseRow> streamRecord) throws Exception {
        BaseRow rightRow = streamRecord.getValue();
        if (BaseRowUtil.isAccumulateMsg(rightRow)) {
            this.rightState.update(rightRow);
            registerProcessingCleanupTimer();
        } else {
            this.rightState.clear();
            cleanupLastTimer();
        }
    }

    /**
     * Closes the join condition and then the base operator.
     *
     * @throws Exception if closing the join condition or the base operator fails
     */
    @Override
    public void close() throws Exception {
        try {
            FunctionUtils.closeFunction(this.joinCondition);
        } finally {
            // Always give the base operator the chance to release its own resources.
            super.close();
        }
    }

    /**
     * Clears the stored right-side row.
     *
     * @param time value supplied by the base class when it triggers the clean-up; unused here
     */
    @Override // org.apache.flink.table.runtime.operators.join.temporal.BaseTwoInputStreamOperatorWithStateRetention
    public void cleanupState(long time) {
        this.rightState.clear();
    }

    /**
     * Event-time timer callback; intentionally a no-op because this operator registers no
     * event-time timers itself.
     *
     * @param internalTimer the timer that fired; ignored
     */
    @Override
    public void onEventTime(InternalTimer<Object, VoidNamespace> internalTimer) throws Exception {
        // Nothing to do.
    }
}
