package org.apache.flink.table.runtime.operators.join.stream.state;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.runtime.util.StateConfigUtil;
import org.apache.flink.util.IterableIterator;
import org.apache.flink.util.Preconditions;

/* loaded from: input_file:org/apache/flink/table/runtime/operators/join/stream/state/JoinRecordStateViews.class */
public final class JoinRecordStateViews {

    /* loaded from: input_file:org/apache/flink/table/runtime/operators/join/stream/state/JoinRecordStateViews$InputSideHasNoUniqueKey.class */
    private static final class InputSideHasNoUniqueKey implements JoinRecordStateView {
        private final MapState<RowData, Integer> recordState;

        private InputSideHasNoUniqueKey(RuntimeContext runtimeContext, String str, InternalTypeInfo<RowData> internalTypeInfo, StateTtlConfig stateTtlConfig) {
            MapStateDescriptor mapStateDescriptor = new MapStateDescriptor(str, internalTypeInfo, Types.INT);
            if (stateTtlConfig.isEnabled()) {
                mapStateDescriptor.enableTimeToLive(stateTtlConfig);
            }
            this.recordState = runtimeContext.getMapState(mapStateDescriptor);
        }

        @Override // org.apache.flink.table.runtime.operators.join.stream.state.JoinRecordStateView
        public void addRecord(RowData rowData) throws Exception {
            Integer num = this.recordState.get(rowData);
            this.recordState.put(rowData, num != null ? Integer.valueOf(num.intValue() + 1) : 1);
        }

        @Override // org.apache.flink.table.runtime.operators.join.stream.state.JoinRecordStateView
        public void retractRecord(RowData rowData) throws Exception {
            Integer num = this.recordState.get(rowData);
            if (num != null) {
                if (num.intValue() > 1) {
                    this.recordState.put(rowData, Integer.valueOf(num.intValue() - 1));
                } else {
                    this.recordState.remove(rowData);
                }
            }
        }

        @Override // org.apache.flink.table.runtime.operators.join.stream.state.JoinRecordStateView
        public Iterable<RowData> getRecords() throws Exception {
            return new IterableIterator<RowData>() { // from class: org.apache.flink.table.runtime.operators.join.stream.state.JoinRecordStateViews.InputSideHasNoUniqueKey.1
                private final Iterator<Map.Entry<RowData, Integer>> backingIterable;
                private RowData record;
                private int remainingTimes = 0;

                {
                    this.backingIterable = InputSideHasNoUniqueKey.this.recordState.entries().iterator();
                }

                @Override // java.util.Iterator
                public boolean hasNext() {
                    return this.backingIterable.hasNext() || this.remainingTimes > 0;
                }

                @Override // java.util.Iterator
                public RowData next() {
                    if (this.remainingTimes > 0) {
                        Preconditions.checkNotNull(this.record);
                        this.remainingTimes--;
                        return this.record;
                    }
                    Map.Entry<RowData, Integer> next = this.backingIterable.next();
                    this.record = next.getKey();
                    this.remainingTimes = next.getValue().intValue() - 1;
                    return this.record;
                }

                @Override // java.lang.Iterable
                public Iterator<RowData> iterator() {
                    return this;
                }
            };
        }
    }

    /* loaded from: input_file:org/apache/flink/table/runtime/operators/join/stream/state/JoinRecordStateViews$InputSideHasUniqueKey.class */
    private static final class InputSideHasUniqueKey implements JoinRecordStateView {
        private final MapState<RowData, RowData> recordState;
        private final KeySelector<RowData, RowData> uniqueKeySelector;

        private InputSideHasUniqueKey(RuntimeContext runtimeContext, String str, InternalTypeInfo<RowData> internalTypeInfo, InternalTypeInfo<RowData> internalTypeInfo2, KeySelector<RowData, RowData> keySelector, StateTtlConfig stateTtlConfig) {
            Preconditions.checkNotNull(internalTypeInfo2);
            Preconditions.checkNotNull(keySelector);
            MapStateDescriptor mapStateDescriptor = new MapStateDescriptor(str, internalTypeInfo2, internalTypeInfo);
            if (stateTtlConfig.isEnabled()) {
                mapStateDescriptor.enableTimeToLive(stateTtlConfig);
            }
            this.recordState = runtimeContext.getMapState(mapStateDescriptor);
            this.uniqueKeySelector = keySelector;
        }

        @Override // org.apache.flink.table.runtime.operators.join.stream.state.JoinRecordStateView
        public void addRecord(RowData rowData) throws Exception {
            this.recordState.put(this.uniqueKeySelector.getKey(rowData), rowData);
        }

        @Override // org.apache.flink.table.runtime.operators.join.stream.state.JoinRecordStateView
        public void retractRecord(RowData rowData) throws Exception {
            this.recordState.remove(this.uniqueKeySelector.getKey(rowData));
        }

        @Override // org.apache.flink.table.runtime.operators.join.stream.state.JoinRecordStateView
        public Iterable<RowData> getRecords() throws Exception {
            return this.recordState.values();
        }
    }

    /* loaded from: input_file:org/apache/flink/table/runtime/operators/join/stream/state/JoinRecordStateViews$JoinKeyContainsUniqueKey.class */
    private static final class JoinKeyContainsUniqueKey implements JoinRecordStateView {
        private final ValueState<RowData> recordState;
        private final List<RowData> reusedList;

        private JoinKeyContainsUniqueKey(RuntimeContext runtimeContext, String str, InternalTypeInfo<RowData> internalTypeInfo, StateTtlConfig stateTtlConfig) {
            ValueStateDescriptor valueStateDescriptor = new ValueStateDescriptor(str, internalTypeInfo);
            if (stateTtlConfig.isEnabled()) {
                valueStateDescriptor.enableTimeToLive(stateTtlConfig);
            }
            this.recordState = runtimeContext.getState(valueStateDescriptor);
            this.reusedList = new ArrayList(1);
        }

        @Override // org.apache.flink.table.runtime.operators.join.stream.state.JoinRecordStateView
        public void addRecord(RowData rowData) throws Exception {
            this.recordState.update(rowData);
        }

        @Override // org.apache.flink.table.runtime.operators.join.stream.state.JoinRecordStateView
        public void retractRecord(RowData rowData) throws Exception {
            this.recordState.clear();
        }

        @Override // org.apache.flink.table.runtime.operators.join.stream.state.JoinRecordStateView
        public Iterable<RowData> getRecords() throws Exception {
            this.reusedList.clear();
            RowData value = this.recordState.value();
            if (value != null) {
                this.reusedList.add(value);
            }
            return this.reusedList;
        }
    }

    public static JoinRecordStateView create(RuntimeContext runtimeContext, String str, JoinInputSideSpec joinInputSideSpec, InternalTypeInfo<RowData> internalTypeInfo, long j) {
        StateTtlConfig createTtlConfig = StateConfigUtil.createTtlConfig(j);
        return joinInputSideSpec.hasUniqueKey() ? joinInputSideSpec.joinKeyContainsUniqueKey() ? new JoinKeyContainsUniqueKey(runtimeContext, str, internalTypeInfo, createTtlConfig) : new InputSideHasUniqueKey(runtimeContext, str, internalTypeInfo, joinInputSideSpec.getUniqueKeyType(), joinInputSideSpec.getUniqueKeySelector(), createTtlConfig) : new InputSideHasNoUniqueKey(runtimeContext, str, internalTypeInfo, createTtlConfig);
    }
}
