package org.apache.beam.runners.samza.runtime;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.Serializable;
import java.lang.invoke.SerializedLambda;
import java.lang.ref.SoftReference;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import org.apache.beam.runners.core.StateInternals;
import org.apache.beam.runners.core.StateInternalsFactory;
import org.apache.beam.runners.core.StateNamespace;
import org.apache.beam.runners.core.StateTag;
import org.apache.beam.runners.core.construction.graph.ExecutableStage;
import org.apache.beam.runners.samza.SamzaPipelineOptions;
import org.apache.beam.runners.samza.state.SamzaMapState;
import org.apache.beam.runners.samza.state.SamzaSetState;
import org.apache.beam.runners.samza.transforms.UpdatingCombineFn;
import org.apache.beam.sdk.coders.BooleanCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.InstantCoder;
import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.CombiningState;
import org.apache.beam.sdk.state.MapState;
import org.apache.beam.sdk.state.OrderedListState;
import org.apache.beam.sdk.state.ReadableState;
import org.apache.beam.sdk.state.ReadableStates;
import org.apache.beam.sdk.state.SetState;
import org.apache.beam.sdk.state.State;
import org.apache.beam.sdk.state.StateContext;
import org.apache.beam.sdk.state.StateContexts;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.state.WatermarkHoldState;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.CombineWithContext;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.primitives.Ints;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.primitives.UnsignedBytes;
import org.apache.samza.config.Config;
import org.apache.samza.context.TaskContext;
import org.apache.samza.serializers.Serde;
import org.apache.samza.serializers.SerdeFactory;
import org.apache.samza.storage.kv.Entry;
import org.apache.samza.storage.kv.KeyValueIterator;
import org.apache.samza.storage.kv.KeyValueStore;
import org.joda.time.Instant;

/* loaded from: input_file:org/apache/beam/runners/samza/runtime/SamzaStoreStateInternals.class */
public class SamzaStoreStateInternals<K> implements StateInternals {
    static final String BEAM_STORE = "beamStore";
    private static final ThreadLocal<SoftReference<ByteArrayOutputStream>> threadLocalBaos = new ThreadLocal<>();
    private final Map<String, KeyValueStore<ByteArray, StateValue<?>>> stores;
    private final K key;
    private final byte[] keyBytes;
    private final int batchGetSize;
    private final String stageId;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/runners/samza/runtime/SamzaStoreStateInternals$AbstractSamzaState.class */
    public abstract class AbstractSamzaState<T> {
        private final StateNamespace namespace;
        private final String addressId;
        private final boolean isBeamStore;
        private final String stageId;
        private final byte[] keyBytes;
        private byte[] encodedStoreKey;
        protected final Coder<T> coder;
        protected final KeyValueStore<ByteArray, StateValue<T>> store;

        protected AbstractSamzaState(StateNamespace stateNamespace, StateTag<? extends State> stateTag, Coder<T> coder) {
            this.coder = coder;
            this.namespace = stateNamespace;
            this.addressId = stateTag.getId();
            this.isBeamStore = !SamzaStoreStateInternals.this.stores.containsKey(stateTag.getId());
            this.store = this.isBeamStore ? (KeyValueStore) SamzaStoreStateInternals.this.stores.get(SamzaStoreStateInternals.BEAM_STORE) : (KeyValueStore) SamzaStoreStateInternals.this.stores.get(stateTag.getId());
            this.stageId = SamzaStoreStateInternals.this.stageId;
            this.keyBytes = SamzaStoreStateInternals.this.keyBytes;
        }

        protected void clearInternal() {
            this.store.delete(getEncodedStoreKey());
        }

        protected void writeInternal(T t) {
            this.store.put(getEncodedStoreKey(), StateValue.of(t, this.coder));
        }

        protected T readInternal() {
            return decodeValue((StateValue) this.store.get(getEncodedStoreKey()));
        }

        protected ReadableState<Boolean> isEmptyInternal() {
            return new ReadableState<Boolean>() { // from class: org.apache.beam.runners.samza.runtime.SamzaStoreStateInternals.AbstractSamzaState.1
                /* renamed from: read, reason: merged with bridge method [inline-methods] */
                public Boolean m29read() {
                    return Boolean.valueOf(AbstractSamzaState.this.store.get(AbstractSamzaState.this.getEncodedStoreKey()) == null);
                }

                public ReadableState<Boolean> readLater() {
                    return this;
                }
            };
        }

        protected ByteArray getEncodedStoreKey() {
            return ByteArray.of(getEncodedStoreKeyBytes());
        }

        protected byte[] getEncodedStoreKeyBytes() {
            if (this.encodedStoreKey == null) {
                ByteArrayOutputStream access$700 = SamzaStoreStateInternals.access$700();
                try {
                    DataOutputStream dataOutputStream = new DataOutputStream(access$700);
                    Throwable th = null;
                    try {
                        try {
                            dataOutputStream.write(this.keyBytes);
                            dataOutputStream.writeUTF(this.namespace.stringKey());
                            if (this.isBeamStore) {
                                dataOutputStream.writeUTF(this.stageId);
                                dataOutputStream.writeUTF(this.addressId);
                            }
                            if (0 != 0) {
                                try {
                                    dataOutputStream.close();
                                } catch (Throwable th2) {
                                    th.addSuppressed(th2);
                                }
                            } else {
                                dataOutputStream.close();
                            }
                            this.encodedStoreKey = access$700.toByteArray();
                        } finally {
                        }
                    } finally {
                    }
                } catch (IOException e) {
                    throw new RuntimeException("Could not encode full address for state: " + this.addressId, e);
                }
            }
            return this.encodedStoreKey;
        }

        protected T decodeValue(StateValue<T> stateValue) {
            if (stateValue == null) {
                return null;
            }
            return stateValue.getValue(this.coder);
        }

        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null || getClass() != obj.getClass()) {
                return false;
            }
            AbstractSamzaState abstractSamzaState = (AbstractSamzaState) obj;
            return (!(this.isBeamStore || abstractSamzaState.isBeamStore) || (this.isBeamStore && abstractSamzaState.isBeamStore && this.stageId.equals(abstractSamzaState.stageId))) && Arrays.equals(this.keyBytes, abstractSamzaState.keyBytes) && this.addressId.equals(abstractSamzaState.addressId) && this.namespace.equals(abstractSamzaState.namespace);
        }

        public int hashCode() {
            return (31 * this.namespace.hashCode()) + Arrays.hashCode(getEncodedStoreKeyBytes());
        }
    }

    /* loaded from: input_file:org/apache/beam/runners/samza/runtime/SamzaStoreStateInternals$ByteArray.class */
    public static class ByteArray implements Serializable, Comparable<ByteArray> {
        private final byte[] value;

        public static ByteArray of(byte[] bArr) {
            return new ByteArray(bArr);
        }

        private ByteArray(byte[] bArr) {
            this.value = bArr;
        }

        public byte[] getValue() {
            return this.value;
        }

        public boolean equals(Object obj) {
            if (obj == null || getClass() != obj.getClass()) {
                return false;
            }
            return Arrays.equals(this.value, ((ByteArray) obj).value);
        }

        public int hashCode() {
            if (this.value != null) {
                return Arrays.hashCode(this.value);
            }
            return 0;
        }

        @Override // java.lang.Comparable
        public int compareTo(ByteArray byteArray) {
            return UnsignedBytes.lexicographicalComparator().compare(this.value, byteArray.value);
        }
    }

    /* loaded from: input_file:org/apache/beam/runners/samza/runtime/SamzaStoreStateInternals$ByteArraySerdeFactory.class */
    public static class ByteArraySerdeFactory implements SerdeFactory<ByteArray> {

        /* loaded from: input_file:org/apache/beam/runners/samza/runtime/SamzaStoreStateInternals$ByteArraySerdeFactory$ByteArraySerde.class */
        public static class ByteArraySerde implements Serde<ByteArray> {
            public byte[] toBytes(ByteArray byteArray) {
                return byteArray.value;
            }

            /* renamed from: fromBytes, reason: merged with bridge method [inline-methods] */
            public ByteArray m30fromBytes(byte[] bArr) {
                return ByteArray.of(bArr);
            }
        }

        public Serde<ByteArray> getSerde(String str, Config config) {
            return new ByteArraySerde();
        }
    }

    /* loaded from: input_file:org/apache/beam/runners/samza/runtime/SamzaStoreStateInternals$Factory.class */
    public static class Factory<K> implements StateInternalsFactory<K> {
        private final String stageId;
        private final Map<String, KeyValueStore<ByteArray, StateValue<?>>> stores;
        private final Coder<K> keyCoder;
        private final int batchGetSize;

        public Factory(String str, Map<String, KeyValueStore<ByteArray, StateValue<?>>> map, Coder<K> coder, int i) {
            this.stageId = str;
            this.stores = map;
            this.keyCoder = coder;
            this.batchGetSize = i;
        }

        public StateInternals stateInternalsForKey(K k) {
            ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
            DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream);
            if (k != null) {
                try {
                    this.keyCoder.encode(k, byteArrayOutputStream);
                } catch (IOException e) {
                    throw new RuntimeException("Cannot encode key for state store", e);
                }
            }
            byte[] byteArray = byteArrayOutputStream.toByteArray();
            byteArrayOutputStream.reset();
            dataOutputStream.write(byteArray.length);
            dataOutputStream.write(byteArray);
            return new SamzaStoreStateInternals(this.stores, k, byteArrayOutputStream.toByteArray(), this.stageId, this.batchGetSize);
        }
    }

    /* loaded from: input_file:org/apache/beam/runners/samza/runtime/SamzaStoreStateInternals$SamzaAccumulatorCombiningState.class */
    private class SamzaAccumulatorCombiningState<InT, AccumT, OutT> extends SamzaStoreStateInternals<K>.AbstractSamzaState<AccumT> implements CombiningState<InT, AccumT, OutT> {
        private final Combine.CombineFn<InT, AccumT, OutT> combineFn;

        protected SamzaAccumulatorCombiningState(StateNamespace stateNamespace, StateTag<? extends State> stateTag, Coder<AccumT> coder, Combine.CombineFn<InT, AccumT, OutT> combineFn) {
            super(stateNamespace, stateTag, coder);
            this.combineFn = combineFn;
        }

        public void clear() {
            clearInternal();
        }

        public void add(InT r5) {
            writeInternal(this.combineFn.addInput(getAccum(), r5));
        }

        public ReadableState<Boolean> isEmpty() {
            return isEmptyInternal();
        }

        public AccumT getAccum() {
            AccumT readInternal = readInternal();
            return readInternal != null ? readInternal : (AccumT) this.combineFn.createAccumulator();
        }

        public void addAccum(AccumT accumt) {
            writeInternal(mergeAccumulators(Arrays.asList(getAccum(), accumt)));
        }

        public AccumT mergeAccumulators(Iterable<AccumT> iterable) {
            return (AccumT) this.combineFn.mergeAccumulators(iterable);
        }

        /* renamed from: readLater, reason: merged with bridge method [inline-methods] and merged with bridge method [inline-methods] */
        public CombiningState<InT, AccumT, OutT> m32readLater() {
            return this;
        }

        @Nonnull
        public OutT read() {
            AccumT accum = getAccum();
            OutT outt = (OutT) this.combineFn.extractOutput(accum);
            if (this.combineFn instanceof UpdatingCombineFn) {
                writeInternal(((UpdatingCombineFn) this.combineFn).updateAfterFiring(accum));
            }
            return outt;
        }
    }

    /* loaded from: input_file:org/apache/beam/runners/samza/runtime/SamzaStoreStateInternals$SamzaBagState.class */
    private class SamzaBagState<T> extends SamzaStoreStateInternals<K>.AbstractSamzaState<T> implements BagState<T> {
        private SamzaBagState(StateNamespace stateNamespace, StateTag<? extends State> stateTag, Coder<T> coder) {
            super(stateNamespace, stateTag, coder);
        }

        public void add(T t) {
            synchronized (this.store) {
                int size = getSize();
                this.store.put(encodeKey(size), StateValue.of(t, this.coder));
                this.store.put(getEncodedStoreKey(), StateValue.of(Ints.toByteArray(size + 1)));
            }
        }

        public ReadableState<Boolean> isEmpty() {
            ReadableState<Boolean> isEmptyInternal;
            synchronized (this.store) {
                isEmptyInternal = isEmptyInternal();
            }
            return isEmptyInternal;
        }

        @Nonnull
        /* renamed from: read, reason: merged with bridge method [inline-methods] and merged with bridge method [inline-methods] */
        public List<T> m36read() {
            synchronized (this.store) {
                int size = getSize();
                if (size == 0) {
                    return Collections.emptyList();
                }
                ArrayList arrayList = new ArrayList(size);
                ArrayList arrayList2 = new ArrayList(size);
                int i = 0;
                while (i < size) {
                    int min = Math.min(size, i + SamzaStoreStateInternals.this.batchGetSize);
                    for (int i2 = i; i2 < min; i2++) {
                        arrayList2.add(encodeKey(i2));
                    }
                    this.store.getAll(arrayList2).values().forEach(stateValue -> {
                        arrayList.add(decodeValue(stateValue));
                    });
                    i += SamzaStoreStateInternals.this.batchGetSize;
                    arrayList2.clear();
                }
                return arrayList;
            }
        }

        /* renamed from: readLater, reason: merged with bridge method [inline-methods] and merged with bridge method [inline-methods] */
        public BagState<T> m35readLater() {
            return this;
        }

        public void clear() {
            synchronized (this.store) {
                int size = getSize();
                if (size != 0) {
                    ArrayList arrayList = new ArrayList(size);
                    for (int i = 0; i < size; i++) {
                        arrayList.add(encodeKey(i));
                    }
                    this.store.deleteAll(arrayList);
                    this.store.delete(getEncodedStoreKey());
                }
            }
        }

        private int getSize() {
            StateValue stateValue = (StateValue) this.store.get(getEncodedStoreKey());
            if (stateValue == null || stateValue.valueBytes == null) {
                return 0;
            }
            return Ints.fromByteArray(stateValue.valueBytes);
        }

        private ByteArray encodeKey(int i) {
            ByteArrayOutputStream access$700 = SamzaStoreStateInternals.access$700();
            try {
                DataOutputStream dataOutputStream = new DataOutputStream(access$700);
                Throwable th = null;
                try {
                    try {
                        dataOutputStream.write(getEncodedStoreKeyBytes());
                        dataOutputStream.writeInt(i);
                        ByteArray of = ByteArray.of(access$700.toByteArray());
                        if (0 != 0) {
                            try {
                                dataOutputStream.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            dataOutputStream.close();
                        }
                        return of;
                    } finally {
                    }
                } finally {
                }
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/runners/samza/runtime/SamzaStoreStateInternals$SamzaMapStateImpl.class */
    public class SamzaMapStateImpl<KeyT, ValueT> extends SamzaStoreStateInternals<K>.AbstractSamzaState<ValueT> implements SamzaMapState<KeyT, ValueT> {
        private final Coder<KeyT> keyCoder;
        private final int storeKeySize;
        private final List<KeyValueIterator<ByteArray, StateValue<ValueT>>> openIterators;
        private int maxKeySize;

        protected SamzaMapStateImpl(StateNamespace stateNamespace, StateTag<? extends State> stateTag, Coder<KeyT> coder, Coder<ValueT> coder2) {
            super(stateNamespace, stateTag, coder2);
            this.openIterators = Collections.synchronizedList(new ArrayList());
            this.keyCoder = coder;
            this.storeKeySize = getEncodedStoreKeyBytes().length;
            this.maxKeySize = this.storeKeySize + 100000;
        }

        public void put(KeyT keyt, ValueT valuet) {
            ByteArray encodeKey = encodeKey(keyt);
            this.maxKeySize = Math.max(this.maxKeySize, encodeKey.getValue().length);
            this.store.put(encodeKey, StateValue.of(valuet, this.coder));
        }

        public ReadableState<ValueT> computeIfAbsent(KeyT keyt, Function<? super KeyT, ? extends ValueT> function) {
            ValueT decodeValue = decodeValue((StateValue) this.store.get(encodeKey(keyt)));
            if (decodeValue == null) {
                put(keyt, function.apply(keyt));
            }
            if (decodeValue == null) {
                return null;
            }
            return ReadableStates.immediate(decodeValue);
        }

        public void remove(KeyT keyt) {
            this.store.delete(encodeKey(keyt));
        }

        public ReadableState<ValueT> get(KeyT keyt) {
            return getOrDefault(keyt, null);
        }

        public ReadableState<ValueT> getOrDefault(final KeyT keyt, final ValueT valuet) {
            return new ReadableState<ValueT>() { // from class: org.apache.beam.runners.samza.runtime.SamzaStoreStateInternals.SamzaMapStateImpl.1
                public ValueT read() {
                    ValueT decodeValue = SamzaMapStateImpl.this.decodeValue((StateValue) SamzaMapStateImpl.this.store.get(SamzaMapStateImpl.this.encodeKey(keyt)));
                    return decodeValue != null ? decodeValue : (ValueT) valuet;
                }

                public ReadableState<ValueT> readLater() {
                    return this;
                }
            };
        }

        public ReadableState<Iterable<KeyT>> keys() {
            return new ReadableState<Iterable<KeyT>>() { // from class: org.apache.beam.runners.samza.runtime.SamzaStoreStateInternals.SamzaMapStateImpl.2
                /* renamed from: read, reason: merged with bridge method [inline-methods] */
                public Iterable<KeyT> m37read() {
                    return SamzaMapStateImpl.this.createIterable(entry -> {
                        return SamzaMapStateImpl.this.decodeKey((ByteArray) entry.getKey());
                    });
                }

                public ReadableState<Iterable<KeyT>> readLater() {
                    return this;
                }

                private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
                    String implMethodName = serializedLambda.getImplMethodName();
                    boolean z = -1;
                    switch (implMethodName.hashCode()) {
                        case -289152682:
                            if (implMethodName.equals("lambda$read$1ae5b8df$1")) {
                                z = false;
                                break;
                            }
                            break;
                    }
                    switch (z) {
                        case false:
                            if (serializedLambda.getImplMethodKind() == 7 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/transforms/SerializableFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/beam/runners/samza/runtime/SamzaStoreStateInternals$SamzaMapStateImpl$2") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/samza/storage/kv/Entry;)Ljava/lang/Object;")) {
                                AnonymousClass2 anonymousClass2 = (AnonymousClass2) serializedLambda.getCapturedArg(0);
                                return entry -> {
                                    return SamzaMapStateImpl.this.decodeKey((ByteArray) entry.getKey());
                                };
                            }
                            break;
                    }
                    throw new IllegalArgumentException("Invalid lambda deserialization");
                }
            };
        }

        public ReadableState<Iterable<ValueT>> values() {
            return new ReadableState<Iterable<ValueT>>() { // from class: org.apache.beam.runners.samza.runtime.SamzaStoreStateInternals.SamzaMapStateImpl.3
                /* renamed from: read, reason: merged with bridge method [inline-methods] */
                public Iterable<ValueT> m38read() {
                    return SamzaMapStateImpl.this.createIterable(entry -> {
                        return SamzaMapStateImpl.this.decodeValue((StateValue) entry.getValue());
                    });
                }

                public ReadableState<Iterable<ValueT>> readLater() {
                    return this;
                }

                private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
                    String implMethodName = serializedLambda.getImplMethodName();
                    boolean z = -1;
                    switch (implMethodName.hashCode()) {
                        case 350889639:
                            if (implMethodName.equals("lambda$read$b400a7cd$1")) {
                                z = false;
                                break;
                            }
                            break;
                    }
                    switch (z) {
                        case false:
                            if (serializedLambda.getImplMethodKind() == 7 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/transforms/SerializableFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/beam/runners/samza/runtime/SamzaStoreStateInternals$SamzaMapStateImpl$3") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/samza/storage/kv/Entry;)Ljava/lang/Object;")) {
                                AnonymousClass3 anonymousClass3 = (AnonymousClass3) serializedLambda.getCapturedArg(0);
                                return entry -> {
                                    return SamzaMapStateImpl.this.decodeValue((StateValue) entry.getValue());
                                };
                            }
                            break;
                    }
                    throw new IllegalArgumentException("Invalid lambda deserialization");
                }
            };
        }

        public ReadableState<Iterable<Map.Entry<KeyT, ValueT>>> entries() {
            return new ReadableState<Iterable<Map.Entry<KeyT, ValueT>>>() { // from class: org.apache.beam.runners.samza.runtime.SamzaStoreStateInternals.SamzaMapStateImpl.4
                /* renamed from: read, reason: merged with bridge method [inline-methods] */
                public Iterable<Map.Entry<KeyT, ValueT>> m39read() {
                    return SamzaMapStateImpl.this.createIterable(entry -> {
                        return new AbstractMap.SimpleEntry(SamzaMapStateImpl.this.decodeKey((ByteArray) entry.getKey()), SamzaMapStateImpl.this.decodeValue((StateValue) entry.getValue()));
                    });
                }

                public ReadableState<Iterable<Map.Entry<KeyT, ValueT>>> readLater() {
                    return this;
                }

                private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
                    String implMethodName = serializedLambda.getImplMethodName();
                    boolean z = -1;
                    switch (implMethodName.hashCode()) {
                        case 401155072:
                            if (implMethodName.equals("lambda$read$5cb2de36$1")) {
                                z = false;
                                break;
                            }
                            break;
                    }
                    switch (z) {
                        case false:
                            if (serializedLambda.getImplMethodKind() == 7 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/transforms/SerializableFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/beam/runners/samza/runtime/SamzaStoreStateInternals$SamzaMapStateImpl$4") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/samza/storage/kv/Entry;)Ljava/util/Map$Entry;")) {
                                AnonymousClass4 anonymousClass4 = (AnonymousClass4) serializedLambda.getCapturedArg(0);
                                return entry -> {
                                    return new AbstractMap.SimpleEntry(SamzaMapStateImpl.this.decodeKey((ByteArray) entry.getKey()), SamzaMapStateImpl.this.decodeValue((StateValue) entry.getValue()));
                                };
                            }
                            break;
                    }
                    throw new IllegalArgumentException("Invalid lambda deserialization");
                }
            };
        }

        public ReadableState<Boolean> isEmpty() {
            final ReadableState<Iterable<KeyT>> keys = keys();
            return new ReadableState<Boolean>() { // from class: org.apache.beam.runners.samza.runtime.SamzaStoreStateInternals.SamzaMapStateImpl.5
                /* renamed from: read, reason: merged with bridge method [inline-methods] */
                public Boolean m40read() {
                    return Boolean.valueOf(Iterables.isEmpty((Iterable) keys.read()));
                }

                public ReadableState<Boolean> readLater() {
                    keys.readLater();
                    return this;
                }
            };
        }

        @Override // org.apache.beam.runners.samza.state.SamzaMapState
        public ReadableState<Iterator<Map.Entry<KeyT, ValueT>>> readIterator() {
            final KeyValueIterator<ByteArray, StateValue<ValueT>> range = this.store.range(getEncodedStoreKey(), createMaxKey());
            this.openIterators.add(range);
            return new ReadableState<Iterator<Map.Entry<KeyT, ValueT>>>() { // from class: org.apache.beam.runners.samza.runtime.SamzaStoreStateInternals.SamzaMapStateImpl.6
                /* renamed from: read, reason: merged with bridge method [inline-methods] */
                public Iterator<Map.Entry<KeyT, ValueT>> m41read() {
                    return new Iterator<Map.Entry<KeyT, ValueT>>() { // from class: org.apache.beam.runners.samza.runtime.SamzaStoreStateInternals.SamzaMapStateImpl.6.1
                        @Override // java.util.Iterator
                        public boolean hasNext() {
                            boolean hasNext = range.hasNext();
                            if (!hasNext) {
                                range.close();
                                SamzaMapStateImpl.this.openIterators.remove(range);
                            }
                            return hasNext;
                        }

                        @Override // java.util.Iterator
                        public Map.Entry<KeyT, ValueT> next() {
                            Entry entry = (Entry) range.next();
                            return new AbstractMap.SimpleEntry(SamzaMapStateImpl.this.decodeKey((ByteArray) entry.getKey()), SamzaMapStateImpl.this.decodeValue((StateValue) entry.getValue()));
                        }
                    };
                }

                public ReadableState<Iterator<Map.Entry<KeyT, ValueT>>> readLater() {
                    return this;
                }
            };
        }

        /* JADX INFO: Access modifiers changed from: private */
        public <OutputT> Iterable<OutputT> createIterable(final SerializableFunction<Entry<ByteArray, StateValue<ValueT>>, OutputT> serializableFunction) {
            KeyValueIterator range = this.store.range(getEncodedStoreKey(), createMaxKey());
            final ImmutableList copyOf = ImmutableList.copyOf(range);
            range.close();
            return new Iterable<OutputT>() { // from class: org.apache.beam.runners.samza.runtime.SamzaStoreStateInternals.SamzaMapStateImpl.7
                @Override // java.lang.Iterable
                public Iterator<OutputT> iterator() {
                    final Iterator it = copyOf.iterator();
                    return new Iterator<OutputT>() { // from class: org.apache.beam.runners.samza.runtime.SamzaStoreStateInternals.SamzaMapStateImpl.7.1
                        @Override // java.util.Iterator
                        public boolean hasNext() {
                            return it.hasNext();
                        }

                        @Override // java.util.Iterator
                        public OutputT next() {
                            return (OutputT) serializableFunction.apply((Entry) it.next());
                        }
                    };
                }
            };
        }

        public void clear() {
            KeyValueIterator range = this.store.range(getEncodedStoreKey(), createMaxKey());
            while (range.hasNext()) {
                this.store.delete((ByteArray) ((Entry) range.next()).getKey());
            }
            range.close();
        }

        /* JADX INFO: Access modifiers changed from: private */
        public ByteArray encodeKey(KeyT keyt) {
            try {
                ByteArrayOutputStream access$700 = SamzaStoreStateInternals.access$700();
                access$700.write(getEncodedStoreKeyBytes());
                this.keyCoder.encode(keyt, access$700);
                return ByteArray.of(access$700.toByteArray());
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        public KeyT decodeKey(ByteArray byteArray) {
            try {
                return (KeyT) this.keyCoder.decode(new ByteArrayInputStream(Arrays.copyOfRange(byteArray.value, this.storeKeySize, byteArray.value.length)));
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }

        private ByteArray createMaxKey() {
            byte[] bArr = new byte[this.maxKeySize];
            Arrays.fill(bArr, (byte) -1);
            byte[] encodedStoreKeyBytes = getEncodedStoreKeyBytes();
            System.arraycopy(encodedStoreKeyBytes, 0, bArr, 0, encodedStoreKeyBytes.length);
            return ByteArray.of(bArr);
        }

        @Override // org.apache.beam.runners.samza.state.SamzaMapState
        public void closeIterators() {
            this.openIterators.forEach((v0) -> {
                v0.close();
            });
            this.openIterators.clear();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/runners/samza/runtime/SamzaStoreStateInternals$SamzaSetStateImpl.class */
    public class SamzaSetStateImpl<T> implements SamzaSetState<T> {
        private final SamzaStoreStateInternals<K>.SamzaMapStateImpl<T, Boolean> mapState;

        private SamzaSetStateImpl(StateNamespace stateNamespace, StateTag<? extends State> stateTag, Coder<T> coder) {
            this.mapState = new SamzaMapStateImpl<>(stateNamespace, stateTag, coder, BooleanCoder.of());
        }

        public ReadableState<Boolean> contains(T t) {
            return this.mapState.get(t);
        }

        public ReadableState<Boolean> addIfAbsent(T t) {
            return this.mapState.putIfAbsent(t, true);
        }

        public void remove(T t) {
            this.mapState.remove(t);
        }

        public void add(T t) {
            this.mapState.put(t, true);
        }

        public ReadableState<Boolean> isEmpty() {
            return new ReadableState<Boolean>() { // from class: org.apache.beam.runners.samza.runtime.SamzaStoreStateInternals.SamzaSetStateImpl.1
                /* renamed from: read, reason: merged with bridge method [inline-methods] */
                public Boolean m45read() {
                    return Boolean.valueOf(Iterables.isEmpty((Iterable) SamzaSetStateImpl.this.mapState.entries().read()));
                }

                public ReadableState<Boolean> readLater() {
                    return this;
                }
            };
        }

        /* renamed from: read, reason: merged with bridge method [inline-methods] */
        public Iterable<T> m44read() {
            return (Iterable) this.mapState.keys().read();
        }

        /* renamed from: readLater, reason: merged with bridge method [inline-methods] and merged with bridge method [inline-methods] */
        public SetState<T> m43readLater() {
            return this;
        }

        public void clear() {
            this.mapState.clear();
        }

        @Override // org.apache.beam.runners.samza.state.SamzaSetState
        public ReadableState<Iterator<T>> readIterator() {
            final Iterator it = (Iterator) this.mapState.readIterator().read();
            return new ReadableState<Iterator<T>>() { // from class: org.apache.beam.runners.samza.runtime.SamzaStoreStateInternals.SamzaSetStateImpl.2
                /* renamed from: read, reason: merged with bridge method [inline-methods] */
                public Iterator<T> m46read() {
                    return new Iterator<T>() { // from class: org.apache.beam.runners.samza.runtime.SamzaStoreStateInternals.SamzaSetStateImpl.2.1
                        @Override // java.util.Iterator
                        public boolean hasNext() {
                            return it.hasNext();
                        }

                        @Override // java.util.Iterator
                        public T next() {
                            return (T) ((Map.Entry) it.next()).getKey();
                        }
                    };
                }

                public ReadableState<Iterator<T>> readLater() {
                    return this;
                }
            };
        }

        @Override // org.apache.beam.runners.samza.state.SamzaSetState
        public void closeIterators() {
            this.mapState.closeIterators();
        }
    }

    /* loaded from: input_file:org/apache/beam/runners/samza/runtime/SamzaStoreStateInternals$SamzaValueState.class */
    private class SamzaValueState<T> extends SamzaStoreStateInternals<K>.AbstractSamzaState<T> implements ValueState<T> {
        private SamzaValueState(StateNamespace stateNamespace, StateTag<? extends State> stateTag, Coder<T> coder) {
            super(stateNamespace, stateTag, coder);
        }

        public void write(T t) {
            writeInternal(t);
        }

        public T read() {
            return readInternal();
        }

        /* renamed from: readLater, reason: merged with bridge method [inline-methods] */
        public ValueState<T> m47readLater() {
            return this;
        }

        public void clear() {
            clearInternal();
        }
    }

    /* loaded from: input_file:org/apache/beam/runners/samza/runtime/SamzaStoreStateInternals$SamzaWatermarkHoldState.class */
    private class SamzaWatermarkHoldState extends SamzaStoreStateInternals<K>.AbstractSamzaState<Instant> implements WatermarkHoldState {
        private final TimestampCombiner timestampCombiner;

        public <V extends State> SamzaWatermarkHoldState(StateNamespace stateNamespace, StateTag<V> stateTag, TimestampCombiner timestampCombiner) {
            super(stateNamespace, stateTag, InstantCoder.of());
            this.timestampCombiner = timestampCombiner;
        }

        public void add(Instant instant) {
            Instant readInternal = readInternal();
            Instant combine = readInternal == null ? instant : this.timestampCombiner.combine(new Instant[]{readInternal, instant});
            if (combine.equals(readInternal)) {
                return;
            }
            writeInternal(combine);
        }

        public ReadableState<Boolean> isEmpty() {
            return isEmptyInternal();
        }

        /* renamed from: read, reason: merged with bridge method [inline-methods] */
        public Instant m50read() {
            return readInternal();
        }

        public TimestampCombiner getTimestampCombiner() {
            return this.timestampCombiner;
        }

        /* renamed from: readLater, reason: merged with bridge method [inline-methods] and merged with bridge method [inline-methods] */
        public WatermarkHoldState m49readLater() {
            return this;
        }

        public void clear() {
            clearInternal();
        }
    }

    /* loaded from: input_file:org/apache/beam/runners/samza/runtime/SamzaStoreStateInternals$StateValue.class */
    public static class StateValue<T> implements Serializable {
        private T value;
        private Coder<T> valueCoder;
        private byte[] valueBytes;

        private StateValue(T t, Coder<T> coder, byte[] bArr) {
            this.value = t;
            this.valueCoder = coder;
            this.valueBytes = bArr;
        }

        public static <T> StateValue<T> of(T t, Coder<T> coder) {
            return new StateValue<>(t, coder, null);
        }

        public static <T> StateValue<T> of(byte[] bArr) {
            return new StateValue<>(null, null, bArr);
        }

        public T getValue(Coder<T> coder) {
            if (this.value == null && this.valueBytes != null) {
                if (this.valueCoder == null) {
                    this.valueCoder = coder;
                }
                try {
                    this.value = (T) this.valueCoder.decode(new ByteArrayInputStream(this.valueBytes));
                } catch (IOException e) {
                    throw new RuntimeException("Could not decode state", e);
                }
            }
            return this.value;
        }

        public byte[] getValueBytes() {
            if (this.valueBytes == null && this.value != null) {
                ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
                try {
                    this.valueCoder.encode(this.value, byteArrayOutputStream);
                    this.valueBytes = byteArrayOutputStream.toByteArray();
                } catch (IOException e) {
                    throw new RuntimeException("Could not encode state value: " + this.value, e);
                }
            }
            return this.valueBytes;
        }
    }

    /* loaded from: input_file:org/apache/beam/runners/samza/runtime/SamzaStoreStateInternals$StateValueSerdeFactory.class */
    public static class StateValueSerdeFactory implements SerdeFactory<StateValue<?>> {

        /* loaded from: input_file:org/apache/beam/runners/samza/runtime/SamzaStoreStateInternals$StateValueSerdeFactory$StateValueSerde.class */
        public static class StateValueSerde implements Serde<StateValue<?>> {
            /* renamed from: fromBytes, reason: merged with bridge method [inline-methods] */
            public StateValue<?> m51fromBytes(byte[] bArr) {
                return StateValue.of(bArr);
            }

            public byte[] toBytes(StateValue<?> stateValue) {
                if (stateValue == null) {
                    return null;
                }
                return stateValue.getValueBytes();
            }
        }

        public Serde<StateValue<?>> getSerde(String str, Config config) {
            return new StateValueSerde();
        }
    }

    private SamzaStoreStateInternals(Map<String, KeyValueStore<ByteArray, StateValue<?>>> map, K k, byte[] bArr, String str, int i) {
        this.stores = map;
        this.key = k;
        this.keyBytes = bArr;
        this.batchGetSize = i;
        this.stageId = str;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static KeyValueStore<ByteArray, StateValue<?>> getBeamStore(TaskContext taskContext) {
        return taskContext.getStore(BEAM_STORE);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static <K> Factory<K> createNonKeyedStateInternalsFactory(String str, TaskContext taskContext, SamzaPipelineOptions samzaPipelineOptions) {
        return createStateInternalsFactory(str, (Coder) null, taskContext, samzaPipelineOptions, Collections.emptySet());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static <K> Factory<K> createStateInternalsFactory(String str, Coder<K> coder, TaskContext taskContext, SamzaPipelineOptions samzaPipelineOptions, DoFnSignature doFnSignature) {
        return createStateInternalsFactory(str, coder, taskContext, samzaPipelineOptions, doFnSignature.stateDeclarations().keySet());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static <K> Factory<K> createStateInternalsFactory(String str, Coder<K> coder, TaskContext taskContext, SamzaPipelineOptions samzaPipelineOptions, ExecutableStage executableStage) {
        return createStateInternalsFactory(str, coder, taskContext, samzaPipelineOptions, (Set) executableStage.getUserStates().stream().map((v0) -> {
            return v0.localName();
        }).collect(Collectors.toSet()));
    }

    private static <K> Factory<K> createStateInternalsFactory(String str, Coder<K> coder, TaskContext taskContext, SamzaPipelineOptions samzaPipelineOptions, Collection<String> collection) {
        Coder<K> of;
        int storeBatchGetSize = samzaPipelineOptions.getStoreBatchGetSize();
        HashMap hashMap = new HashMap();
        hashMap.put(BEAM_STORE, getBeamStore(taskContext));
        if (coder != null) {
            collection.forEach(str2 -> {
            });
            of = coder;
        } else {
            of = VoidCoder.of();
        }
        return new Factory<>(Objects.toString(str), hashMap, of, storeBatchGetSize);
    }

    public K getKey() {
        return this.key;
    }

    /* JADX WARN: Multi-variable type inference failed */
    public <T extends State> T state(StateNamespace stateNamespace, StateTag<T> stateTag) {
        return (T) state(stateNamespace, stateTag, StateContexts.nullContext());
    }

    public <V extends State> V state(final StateNamespace stateNamespace, final StateTag<V> stateTag, StateContext<?> stateContext) {
        return (V) stateTag.bind(new StateTag.StateBinder() { // from class: org.apache.beam.runners.samza.runtime.SamzaStoreStateInternals.1
            public <T> ValueState<T> bindValue(StateTag<ValueState<T>> stateTag2, Coder<T> coder) {
                return new SamzaValueState(stateNamespace, stateTag, coder);
            }

            public <T> BagState<T> bindBag(StateTag<BagState<T>> stateTag2, Coder<T> coder) {
                return new SamzaBagState(stateNamespace, stateTag, coder);
            }

            public <T> SetState<T> bindSet(StateTag<SetState<T>> stateTag2, Coder<T> coder) {
                return new SamzaSetStateImpl(stateNamespace, stateTag, coder);
            }

            public <KeyT, ValueT> MapState<KeyT, ValueT> bindMap(StateTag<MapState<KeyT, ValueT>> stateTag2, Coder<KeyT> coder, Coder<ValueT> coder2) {
                return new SamzaMapStateImpl(stateNamespace, stateTag, coder, coder2);
            }

            public <T> OrderedListState<T> bindOrderedList(StateTag<OrderedListState<T>> stateTag2, Coder<T> coder) {
                throw new UnsupportedOperationException(String.format("%s is not supported", OrderedListState.class.getSimpleName()));
            }

            public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT> bindCombiningValue(StateTag<CombiningState<InputT, AccumT, OutputT>> stateTag2, Coder<AccumT> coder, Combine.CombineFn<InputT, AccumT, OutputT> combineFn) {
                return new SamzaAccumulatorCombiningState(stateNamespace, stateTag, coder, combineFn);
            }

            public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT> bindCombiningValueWithContext(StateTag<CombiningState<InputT, AccumT, OutputT>> stateTag2, Coder<AccumT> coder, CombineWithContext.CombineFnWithContext<InputT, AccumT, OutputT> combineFnWithContext) {
                throw new UnsupportedOperationException(String.format("%s is not supported", CombiningState.class.getSimpleName()));
            }

            public WatermarkHoldState bindWatermark(StateTag<WatermarkHoldState> stateTag2, TimestampCombiner timestampCombiner) {
                return new SamzaWatermarkHoldState(stateNamespace, stateTag, timestampCombiner);
            }
        });
    }

    private static ByteArrayOutputStream getThreadLocalBaos() {
        SoftReference<ByteArrayOutputStream> softReference = threadLocalBaos.get();
        ByteArrayOutputStream byteArrayOutputStream = softReference == null ? null : softReference.get();
        if (byteArrayOutputStream == null) {
            byteArrayOutputStream = new ByteArrayOutputStream();
            threadLocalBaos.set(new SoftReference<>(byteArrayOutputStream));
        }
        byteArrayOutputStream.reset();
        return byteArrayOutputStream;
    }

    static /* synthetic */ ByteArrayOutputStream access$700() {
        return getThreadLocalBaos();
    }
}
