package org.apache.beam.runners.direct;

import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import javax.annotation.Nullable;
import org.apache.beam.repackaged.direct_java.runners.core.InMemoryStateInternals;
import org.apache.beam.repackaged.direct_java.runners.core.StateInternals;
import org.apache.beam.repackaged.direct_java.runners.core.StateNamespace;
import org.apache.beam.repackaged.direct_java.runners.core.StateTable;
import org.apache.beam.repackaged.direct_java.runners.core.StateTag;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.CombiningState;
import org.apache.beam.sdk.state.MapState;
import org.apache.beam.sdk.state.SetState;
import org.apache.beam.sdk.state.State;
import org.apache.beam.sdk.state.StateContext;
import org.apache.beam.sdk.state.StateContexts;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.state.WatermarkHoldState;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.CombineWithContext;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.util.CombineFnUtil;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.joda.time.Instant;
import org.joda.time.ReadableInstant;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternals.class */
public class CopyOnAccessInMemoryStateInternals<K> implements StateInternals {
    private final CopyOnAccessInMemoryStateTable table;
    private K key;

    /* loaded from: input_file:org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternals$CopyOnAccessInMemoryStateTable.class */
    private static class CopyOnAccessInMemoryStateTable extends StateTable {
        private Optional<StateTable> underlying;
        private StateBinderFactory binderFactory;
        private Optional<Instant> earliestWatermarkHold = Optional.empty();

        /* JADX INFO: Access modifiers changed from: private */
        /* loaded from: input_file:org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternals$CopyOnAccessInMemoryStateTable$CopyOnBindBinderFactory.class */
        public static class CopyOnBindBinderFactory implements StateBinderFactory {
            private final Optional<StateTable> underlying;

            public CopyOnBindBinderFactory(Optional<StateTable> optional) {
                this.underlying = optional;
            }

            /* JADX INFO: Access modifiers changed from: private */
            public boolean containedInUnderlying(StateNamespace stateNamespace, StateTag<?> stateTag) {
                return this.underlying.isPresent() && this.underlying.get().isNamespaceInUse(stateNamespace) && this.underlying.get().getTagsInUse(stateNamespace).containsKey(stateTag);
            }

            @Override // org.apache.beam.runners.direct.CopyOnAccessInMemoryStateInternals.CopyOnAccessInMemoryStateTable.StateBinderFactory
            public StateTag.StateBinder forNamespace(final StateNamespace stateNamespace, final StateContext<?> stateContext) {
                return new StateTag.StateBinder() { // from class: org.apache.beam.runners.direct.CopyOnAccessInMemoryStateInternals.CopyOnAccessInMemoryStateTable.CopyOnBindBinderFactory.1
                    @Override // org.apache.beam.repackaged.direct_java.runners.core.StateTag.StateBinder
                    public WatermarkHoldState bindWatermark(StateTag<WatermarkHoldState> stateTag, TimestampCombiner timestampCombiner) {
                        return CopyOnBindBinderFactory.this.containedInUnderlying(stateNamespace, stateTag) ? ((StateTable) CopyOnBindBinderFactory.this.underlying.get()).get(stateNamespace, stateTag, stateContext).copy() : new InMemoryStateInternals.InMemoryWatermarkHold(timestampCombiner);
                    }

                    @Override // org.apache.beam.repackaged.direct_java.runners.core.StateTag.StateBinder
                    public <T> ValueState<T> bindValue(StateTag<ValueState<T>> stateTag, Coder<T> coder) {
                        return CopyOnBindBinderFactory.this.containedInUnderlying(stateNamespace, stateTag) ? ((StateTable) CopyOnBindBinderFactory.this.underlying.get()).get(stateNamespace, stateTag, stateContext).copy() : new InMemoryStateInternals.InMemoryValue(coder);
                    }

                    @Override // org.apache.beam.repackaged.direct_java.runners.core.StateTag.StateBinder
                    public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT> bindCombiningValue(StateTag<CombiningState<InputT, AccumT, OutputT>> stateTag, Coder<AccumT> coder, Combine.CombineFn<InputT, AccumT, OutputT> combineFn) {
                        return CopyOnBindBinderFactory.this.containedInUnderlying(stateNamespace, stateTag) ? ((StateTable) CopyOnBindBinderFactory.this.underlying.get()).get(stateNamespace, stateTag, stateContext).copy() : new InMemoryStateInternals.InMemoryCombiningState(combineFn, coder);
                    }

                    @Override // org.apache.beam.repackaged.direct_java.runners.core.StateTag.StateBinder
                    public <T> BagState<T> bindBag(StateTag<BagState<T>> stateTag, Coder<T> coder) {
                        return CopyOnBindBinderFactory.this.containedInUnderlying(stateNamespace, stateTag) ? ((StateTable) CopyOnBindBinderFactory.this.underlying.get()).get(stateNamespace, stateTag, stateContext).copy() : new InMemoryStateInternals.InMemoryBag(coder);
                    }

                    @Override // org.apache.beam.repackaged.direct_java.runners.core.StateTag.StateBinder
                    public <T> SetState<T> bindSet(StateTag<SetState<T>> stateTag, Coder<T> coder) {
                        return CopyOnBindBinderFactory.this.containedInUnderlying(stateNamespace, stateTag) ? ((StateTable) CopyOnBindBinderFactory.this.underlying.get()).get(stateNamespace, stateTag, stateContext).copy() : new InMemoryStateInternals.InMemorySet(coder);
                    }

                    @Override // org.apache.beam.repackaged.direct_java.runners.core.StateTag.StateBinder
                    public <KeyT, ValueT> MapState<KeyT, ValueT> bindMap(StateTag<MapState<KeyT, ValueT>> stateTag, Coder<KeyT> coder, Coder<ValueT> coder2) {
                        return CopyOnBindBinderFactory.this.containedInUnderlying(stateNamespace, stateTag) ? ((StateTable) CopyOnBindBinderFactory.this.underlying.get()).get(stateNamespace, stateTag, stateContext).copy() : new InMemoryStateInternals.InMemoryMap(coder, coder2);
                    }

                    @Override // org.apache.beam.repackaged.direct_java.runners.core.StateTag.StateBinder
                    public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT> bindCombiningValueWithContext(StateTag<CombiningState<InputT, AccumT, OutputT>> stateTag, Coder<AccumT> coder, CombineWithContext.CombineFnWithContext<InputT, AccumT, OutputT> combineFnWithContext) {
                        return bindCombiningValue(stateTag, coder, CombineFnUtil.bindContext(combineFnWithContext, stateContext));
                    }
                };
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        /* loaded from: input_file:org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternals$CopyOnAccessInMemoryStateTable$InMemoryStateBinderFactory.class */
        public static class InMemoryStateBinderFactory implements StateBinderFactory {
            @Override // org.apache.beam.runners.direct.CopyOnAccessInMemoryStateInternals.CopyOnAccessInMemoryStateTable.StateBinderFactory
            public StateTag.StateBinder forNamespace(StateNamespace stateNamespace, StateContext<?> stateContext) {
                return new InMemoryStateInternals.InMemoryStateBinder(stateContext);
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        /* loaded from: input_file:org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternals$CopyOnAccessInMemoryStateTable$ReadThroughBinderFactory.class */
        public static class ReadThroughBinderFactory<K> implements StateBinderFactory {
            private final StateTable underlying;

            public ReadThroughBinderFactory(StateTable stateTable) {
                this.underlying = stateTable;
            }

            public Instant readThroughAndGetEarliestHold(StateTable stateTable) {
                ReadableInstant readableInstant;
                ReadableInstant readableInstant2 = BoundedWindow.TIMESTAMP_MAX_VALUE;
                for (StateNamespace stateNamespace : this.underlying.getNamespacesInUse()) {
                    for (Map.Entry<StateTag, State> entry : this.underlying.getTagsInUse(stateNamespace).entrySet()) {
                        if (!entry.getValue().isCleared()) {
                            WatermarkHoldState watermarkHoldState = stateTable.get(stateNamespace, entry.getKey(), StateContexts.nullContext());
                            if ((watermarkHoldState instanceof WatermarkHoldState) && (readableInstant = (Instant) watermarkHoldState.read()) != null && readableInstant.isBefore(readableInstant2)) {
                                readableInstant2 = readableInstant;
                            }
                        }
                    }
                }
                return readableInstant2;
            }

            @Override // org.apache.beam.runners.direct.CopyOnAccessInMemoryStateInternals.CopyOnAccessInMemoryStateTable.StateBinderFactory
            public StateTag.StateBinder forNamespace(final StateNamespace stateNamespace, final StateContext<?> stateContext) {
                return new StateTag.StateBinder() { // from class: org.apache.beam.runners.direct.CopyOnAccessInMemoryStateInternals.CopyOnAccessInMemoryStateTable.ReadThroughBinderFactory.1
                    @Override // org.apache.beam.repackaged.direct_java.runners.core.StateTag.StateBinder
                    public WatermarkHoldState bindWatermark(StateTag<WatermarkHoldState> stateTag, TimestampCombiner timestampCombiner) {
                        return ReadThroughBinderFactory.this.underlying.get(stateNamespace, stateTag, stateContext);
                    }

                    @Override // org.apache.beam.repackaged.direct_java.runners.core.StateTag.StateBinder
                    public <T> ValueState<T> bindValue(StateTag<ValueState<T>> stateTag, Coder<T> coder) {
                        return ReadThroughBinderFactory.this.underlying.get(stateNamespace, stateTag, stateContext);
                    }

                    @Override // org.apache.beam.repackaged.direct_java.runners.core.StateTag.StateBinder
                    public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT> bindCombiningValue(StateTag<CombiningState<InputT, AccumT, OutputT>> stateTag, Coder<AccumT> coder, Combine.CombineFn<InputT, AccumT, OutputT> combineFn) {
                        return ReadThroughBinderFactory.this.underlying.get(stateNamespace, stateTag, stateContext);
                    }

                    @Override // org.apache.beam.repackaged.direct_java.runners.core.StateTag.StateBinder
                    public <T> BagState<T> bindBag(StateTag<BagState<T>> stateTag, Coder<T> coder) {
                        return ReadThroughBinderFactory.this.underlying.get(stateNamespace, stateTag, stateContext);
                    }

                    @Override // org.apache.beam.repackaged.direct_java.runners.core.StateTag.StateBinder
                    public <T> SetState<T> bindSet(StateTag<SetState<T>> stateTag, Coder<T> coder) {
                        return ReadThroughBinderFactory.this.underlying.get(stateNamespace, stateTag, stateContext);
                    }

                    @Override // org.apache.beam.repackaged.direct_java.runners.core.StateTag.StateBinder
                    public <KeyT, ValueT> MapState<KeyT, ValueT> bindMap(StateTag<MapState<KeyT, ValueT>> stateTag, Coder<KeyT> coder, Coder<ValueT> coder2) {
                        return ReadThroughBinderFactory.this.underlying.get(stateNamespace, stateTag, stateContext);
                    }

                    @Override // org.apache.beam.repackaged.direct_java.runners.core.StateTag.StateBinder
                    public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT> bindCombiningValueWithContext(StateTag<CombiningState<InputT, AccumT, OutputT>> stateTag, Coder<AccumT> coder, CombineWithContext.CombineFnWithContext<InputT, AccumT, OutputT> combineFnWithContext) {
                        return bindCombiningValue(stateTag, coder, CombineFnUtil.bindContext(combineFnWithContext, stateContext));
                    }
                };
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        /* loaded from: input_file:org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternals$CopyOnAccessInMemoryStateTable$StateBinderFactory.class */
        public interface StateBinderFactory {
            StateTag.StateBinder forNamespace(StateNamespace stateNamespace, StateContext<?> stateContext);
        }

        public CopyOnAccessInMemoryStateTable(StateTable stateTable) {
            this.underlying = Optional.ofNullable(stateTable);
            this.binderFactory = new CopyOnBindBinderFactory(this.underlying);
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void commit() {
            ReadableInstant earliestWatermarkHold = getEarliestWatermarkHold();
            if (this.underlying.isPresent()) {
                ReadThroughBinderFactory readThroughBinderFactory = new ReadThroughBinderFactory(this.underlying.get());
                this.binderFactory = readThroughBinderFactory;
                ReadableInstant readThroughAndGetEarliestHold = readThroughBinderFactory.readThroughAndGetEarliestHold(this);
                if (readThroughAndGetEarliestHold.isBefore(earliestWatermarkHold)) {
                    earliestWatermarkHold = readThroughAndGetEarliestHold;
                }
            }
            this.earliestWatermarkHold = Optional.of(earliestWatermarkHold);
            clearEmpty();
            this.binderFactory = new InMemoryStateBinderFactory();
            this.underlying = Optional.empty();
        }

        private Instant getEarliestWatermarkHold() {
            ReadableInstant readableInstant;
            ReadableInstant readableInstant2 = BoundedWindow.TIMESTAMP_MAX_VALUE;
            Iterator<State> it = values().iterator();
            while (it.hasNext()) {
                WatermarkHoldState watermarkHoldState = (State) it.next();
                if ((watermarkHoldState instanceof WatermarkHoldState) && (readableInstant = (Instant) watermarkHoldState.read()) != null && readableInstant.isBefore(readableInstant2)) {
                    readableInstant2 = readableInstant;
                }
            }
            return readableInstant2;
        }

        private void clearEmpty() {
            HashSet hashSet = new HashSet(getNamespacesInUse());
            for (StateNamespace stateNamespace : getNamespacesInUse()) {
                Iterator<State> it = getTagsInUse(stateNamespace).values().iterator();
                while (true) {
                    if (it.hasNext()) {
                        if (!((State) it.next()).isCleared()) {
                            hashSet.remove(stateNamespace);
                            break;
                        }
                    } else {
                        break;
                    }
                }
            }
            Iterator it2 = hashSet.iterator();
            while (it2.hasNext()) {
                clearNamespace((StateNamespace) it2.next());
            }
        }

        @Override // org.apache.beam.repackaged.direct_java.runners.core.StateTable
        protected StateTag.StateBinder binderForNamespace(StateNamespace stateNamespace, StateContext<?> stateContext) {
            return this.binderFactory.forNamespace(stateNamespace, stateContext);
        }
    }

    public static <K> CopyOnAccessInMemoryStateInternals withUnderlying(K k, @Nullable CopyOnAccessInMemoryStateInternals copyOnAccessInMemoryStateInternals) {
        return new CopyOnAccessInMemoryStateInternals(k, copyOnAccessInMemoryStateInternals);
    }

    private CopyOnAccessInMemoryStateInternals(K k, CopyOnAccessInMemoryStateInternals copyOnAccessInMemoryStateInternals) {
        this.key = k;
        this.table = new CopyOnAccessInMemoryStateTable(copyOnAccessInMemoryStateInternals == null ? null : copyOnAccessInMemoryStateInternals.table);
    }

    public CopyOnAccessInMemoryStateInternals commit() {
        this.table.commit();
        return this;
    }

    public Instant getEarliestWatermarkHold() {
        Preconditions.checkState(this.table.earliestWatermarkHold.isPresent(), "Can't get the earliest watermark hold in a %s before it is committed", getClass().getSimpleName());
        return (Instant) this.table.earliestWatermarkHold.get();
    }

    @Override // org.apache.beam.repackaged.direct_java.runners.core.StateInternals
    public <T extends State> T state(StateNamespace stateNamespace, StateTag<T> stateTag, StateContext<?> stateContext) {
        return (T) this.table.get(stateNamespace, stateTag, stateContext);
    }

    @Override // org.apache.beam.repackaged.direct_java.runners.core.StateInternals
    public Object getKey() {
        return this.key;
    }

    public boolean isEmpty() {
        return Iterables.isEmpty(this.table.values());
    }
}
