package org.apache.beam.runners.samza.runtime;

import java.util.ArrayList;
import java.util.List;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.beam.runners.core.StateInternals;
import org.apache.beam.runners.core.StateInternalsFactory;
import org.apache.beam.runners.core.StateNamespace;
import org.apache.beam.runners.core.StateTag;
import org.apache.beam.runners.core.TimerInternals;
import org.apache.beam.runners.core.TimerInternalsFactory;
import org.apache.beam.runners.samza.state.SamzaMapState;
import org.apache.beam.runners.samza.state.SamzaSetState;
import org.apache.beam.sdk.state.State;
import org.apache.beam.sdk.state.StateContext;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.joda.time.Instant;

/**
 * Supplies {@link StateInternals} and {@link TimerInternals} views that resolve the key to operate
 * on lazily, from a thread-local holder, each time they are used.
 *
 * <p>Usage is bracketed: call {@link #setKey(Object)} before processing, use the objects returned
 * by {@link #stateInternals()} / {@link #timerInternals()}, then call {@link #clearKey()}.
 *
 * <p>The holder is a {@code static} {@link ThreadLocal}, so it is shared by every {@code
 * KeyedInternals} instance running on the same thread: a key installed through one instance must
 * be cleared before any instance can install another key on that thread.
 *
 * @param <K> type of the key handed to the state and timer factories
 */
@ThreadSafe
public class KeyedInternals<K> {
    /** Per-thread holder of the key in flight and the state cells handed out for it. */
    private static final ThreadLocal<KeyedStates<?>> threadLocalKeyedStates = new ThreadLocal<>();
    private final StateInternalsFactory<K> stateFactory;
    private final TimerInternalsFactory<K> timerFactory;

    /** {@link StateInternals} that forwards to the factory-provided internals of the current key. */
    private class KeyedStateInternals implements StateInternals {
        private KeyedStateInternals() {
        }

        /** Returns the key currently set on this thread, or {@code null} when none is set. */
        public K getKey() {
            return KeyedInternals.this.getKey();
        }

        /**
         * Looks up the requested state for the current key and records it in the thread-local
         * holder so that {@link KeyedInternals#clearKey()} can release its iterators later.
         *
         * @throws IllegalStateException if no key has been set on the calling thread
         */
        public <T extends State> T state(StateNamespace stateNamespace, StateTag<T> stateTag, StateContext<?> stateContext) {
            // Resolve the key once; it is needed both for the check and for the factory lookup.
            final K key = getKey();
            Preconditions.checkState(key != null, "Key is not set before state access in Stateful ParDo.");
            final T resolved = KeyedInternals.this.stateFactory.stateInternalsForKey(key).state(stateNamespace, stateTag, stateContext);
            // A non-null key implies the holder exists, so get() cannot return null here.
            KeyedInternals.threadLocalKeyedStates.get().states.add(resolved);
            return resolved;
        }
    }

    /**
     * Holder stored in the thread-local: the key being processed plus every state object that was
     * handed out while that key was active.
     *
     * @param <K> type of the key
     */
    public static class KeyedStates<K> {
        private final K key;
        private final List<State> states;

        private KeyedStates(K k) {
            this.key = k;
            this.states = new ArrayList<>();
        }
    }

    /**
     * {@link TimerInternals} that forwards every call to the factory-provided internals of the
     * current key. Unlike the state path, no check is made that a key has been set; the factory
     * receives whatever {@link KeyedInternals#getKey()} returns, which may be {@code null}.
     */
    private class KeyedTimerInternals implements TimerInternals {
        private KeyedTimerInternals() {
        }

        /** Resolves the delegate for the key set on the calling thread at the time of the call. */
        private TimerInternals getInternals() {
            return KeyedInternals.this.timerFactory.timerInternalsForKey(KeyedInternals.this.getKey());
        }

        public void setTimer(StateNamespace stateNamespace, String str, String str2, Instant instant, Instant instant2, TimeDomain timeDomain) {
            getInternals().setTimer(stateNamespace, str, str2, instant, instant2, timeDomain);
        }

        public void setTimer(TimerInternals.TimerData timerData) {
            getInternals().setTimer(timerData);
        }

        public void deleteTimer(StateNamespace stateNamespace, String str, String str2, TimeDomain timeDomain) {
            getInternals().deleteTimer(stateNamespace, str, str2, timeDomain);
        }

        public void deleteTimer(StateNamespace stateNamespace, String str, String str2) {
            getInternals().deleteTimer(stateNamespace, str, str2);
        }

        public void deleteTimer(TimerInternals.TimerData timerData) {
            getInternals().deleteTimer(timerData);
        }

        public Instant currentProcessingTime() {
            return getInternals().currentProcessingTime();
        }

        public Instant currentSynchronizedProcessingTime() {
            return getInternals().currentSynchronizedProcessingTime();
        }

        public Instant currentInputWatermarkTime() {
            return getInternals().currentInputWatermarkTime();
        }

        public Instant currentOutputWatermarkTime() {
            return getInternals().currentOutputWatermarkTime();
        }
    }

    /**
     * Creates an instance backed by the given factories.
     *
     * @param stateInternalsFactory source of per-key {@link StateInternals}
     * @param timerInternalsFactory source of per-key {@link TimerInternals}
     */
    public KeyedInternals(StateInternalsFactory<K> stateInternalsFactory, TimerInternalsFactory<K> timerInternalsFactory) {
        this.stateFactory = stateInternalsFactory;
        this.timerFactory = timerInternalsFactory;
    }

    /** Returns a new {@link StateInternals} view that resolves the key on every access. */
    public StateInternals stateInternals() {
        return new KeyedStateInternals();
    }

    /** Returns a new {@link TimerInternals} view that resolves the key on every access. */
    public TimerInternals timerInternals() {
        return new KeyedTimerInternals();
    }

    /**
     * Installs {@code k} as the key for the calling thread.
     *
     * @param k the key to make current
     * @throws IllegalStateException if a previously set key has not been cleared on this thread
     */
    public void setKey(K k) {
        Preconditions.checkState(threadLocalKeyedStates.get() == null, "States for key %s is not cleared before processing", k);
        threadLocalKeyedStates.set(new KeyedStates<>(k));
    }

    /** Returns the key set on the calling thread, or {@code null} when no key is set. */
    @SuppressWarnings("unchecked") // The holder is shared across instances, so K cannot be checked statically.
    K getKey() {
        final KeyedStates<?> current = threadLocalKeyedStates.get();
        return current == null ? null : (K) current.key;
    }

    /**
     * Closes the iterators of every {@link SamzaMapState} / {@link SamzaSetState} handed out for
     * the current key and removes the key from the calling thread.
     *
     * <p>Does nothing when no key is set. The thread-local is removed even if closing an iterator
     * throws, so a failure here cannot leave a stale key behind that would make every later
     * {@link #setKey(Object)} on this thread fail; the exception itself still propagates.
     */
    public void clearKey() {
        final KeyedStates<?> current = threadLocalKeyedStates.get();
        if (current == null) {
            // Nothing was set (or it was already cleared); avoid dereferencing null.
            return;
        }
        try {
            for (State state : current.states) {
                if (state instanceof SamzaMapState) {
                    ((SamzaMapState) state).closeIterators();
                } else if (state instanceof SamzaSetState) {
                    ((SamzaSetState) state).closeIterators();
                }
            }
        } finally {
            current.states.clear();
            threadLocalKeyedStates.remove();
        }
    }
}
