package org.apache.flink.statefun.sdk.java.storage;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import org.apache.flink.statefun.sdk.java.AddressScopedStorage;
import org.apache.flink.statefun.sdk.java.ValueSpec;
import org.apache.flink.statefun.sdk.java.annotations.Internal;
import org.apache.flink.statefun.sdk.java.slice.SliceProtobufUtil;
import org.apache.flink.statefun.sdk.java.storage.StateValueContexts;
import org.apache.flink.statefun.sdk.java.types.TypeCharacteristics;
import org.apache.flink.statefun.sdk.java.types.TypeSerializer;
import org.apache.flink.statefun.sdk.reqreply.generated.FromFunction;
import org.apache.flink.statefun.sdk.reqreply.generated.TypedValue;
import org.apache.flink.statefun.sdk.shaded.com.google.protobuf.ByteString;

@Internal
/* loaded from: input_file:org/apache/flink/statefun/sdk/java/storage/ConcurrentAddressScopedStorage.class */
public final class ConcurrentAddressScopedStorage implements AddressScopedStorage {
    private final List<Cell<?>> cells;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/statefun/sdk/java/storage/ConcurrentAddressScopedStorage$Cell.class */
    public interface Cell<T> {
        Optional<T> get();

        void set(T t);

        void remove();

        Optional<FromFunction.PersistedValueMutation> toProtocolValueMutation();

        ValueSpec<T> spec();
    }

    /* loaded from: input_file:org/apache/flink/statefun/sdk/java/storage/ConcurrentAddressScopedStorage$CellStatus.class */
    private enum CellStatus {
        UNMODIFIED,
        MODIFIED,
        DELETED
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/statefun/sdk/java/storage/ConcurrentAddressScopedStorage$ImmutableTypeCell.class */
    public static final class ImmutableTypeCell<T> implements Cell<T> {
        private final ValueSpec<T> spec;
        private final TypedValue typedValue;
        private final TypeSerializer<T> serializer;
        private T cachedObject;
        private final ReentrantLock lock = new ReentrantLock();
        private CellStatus status = CellStatus.UNMODIFIED;

        public ImmutableTypeCell(ValueSpec<T> valueSpec, TypedValue typedValue) {
            this.spec = valueSpec;
            this.typedValue = typedValue;
            this.serializer = (TypeSerializer) Objects.requireNonNull(valueSpec.type().typeSerializer());
        }

        @Override // org.apache.flink.statefun.sdk.java.storage.ConcurrentAddressScopedStorage.Cell
        public Optional<T> get() {
            this.lock.lock();
            try {
                if (this.status == CellStatus.DELETED) {
                    return Optional.empty();
                }
                if (this.cachedObject != null) {
                    return Optional.of(this.cachedObject);
                }
                Optional<T> tryDeserialize = ConcurrentAddressScopedStorage.tryDeserialize(this.serializer, this.typedValue);
                tryDeserialize.ifPresent(obj -> {
                    this.cachedObject = obj;
                });
                return tryDeserialize;
            } finally {
                this.lock.unlock();
            }
        }

        @Override // org.apache.flink.statefun.sdk.java.storage.ConcurrentAddressScopedStorage.Cell
        public void set(T t) {
            if (t == null) {
                throw new IllegalStorageAccessException(this.spec.name(), "Can not set state to NULL. Please use remove() instead.");
            }
            this.lock.lock();
            try {
                this.cachedObject = t;
                this.status = CellStatus.MODIFIED;
            } finally {
                this.lock.unlock();
            }
        }

        @Override // org.apache.flink.statefun.sdk.java.storage.ConcurrentAddressScopedStorage.Cell
        public void remove() {
            this.lock.lock();
            try {
                this.cachedObject = null;
                this.status = CellStatus.DELETED;
            } finally {
                this.lock.unlock();
            }
        }

        @Override // org.apache.flink.statefun.sdk.java.storage.ConcurrentAddressScopedStorage.Cell
        public Optional<FromFunction.PersistedValueMutation> toProtocolValueMutation() {
            String asTypeNameString = this.spec.typeName().asTypeNameString();
            switch (this.status) {
                case MODIFIED:
                    return Optional.of(FromFunction.PersistedValueMutation.newBuilder().setStateName(this.spec.name()).setMutationType(FromFunction.PersistedValueMutation.MutationType.MODIFY).setStateValue(TypedValue.newBuilder().setTypename(asTypeNameString).setHasValue(true).setValue(ConcurrentAddressScopedStorage.serialize(this.serializer, this.cachedObject))).build());
                case DELETED:
                    return Optional.of(FromFunction.PersistedValueMutation.newBuilder().setStateName(this.spec.name()).setMutationType(FromFunction.PersistedValueMutation.MutationType.DELETE).build());
                case UNMODIFIED:
                    return Optional.empty();
                default:
                    throw new IllegalStateException("Unknown cell status: " + this.status);
            }
        }

        @Override // org.apache.flink.statefun.sdk.java.storage.ConcurrentAddressScopedStorage.Cell
        public ValueSpec<T> spec() {
            return this.spec;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/statefun/sdk/java/storage/ConcurrentAddressScopedStorage$MutableTypeCell.class */
    public static final class MutableTypeCell<T> implements Cell<T> {
        private final ReentrantLock lock;
        private final TypeSerializer<T> serializer;
        private final ValueSpec<T> spec;
        private TypedValue typedValue;
        private CellStatus status;

        private MutableTypeCell(ValueSpec<T> valueSpec, TypedValue typedValue) {
            this.lock = new ReentrantLock();
            this.status = CellStatus.UNMODIFIED;
            this.spec = valueSpec;
            this.typedValue = typedValue;
            this.serializer = (TypeSerializer) Objects.requireNonNull(valueSpec.type().typeSerializer());
        }

        @Override // org.apache.flink.statefun.sdk.java.storage.ConcurrentAddressScopedStorage.Cell
        public Optional<T> get() {
            this.lock.lock();
            try {
                return this.status == CellStatus.DELETED ? Optional.empty() : ConcurrentAddressScopedStorage.tryDeserialize(this.serializer, this.typedValue);
            } finally {
                this.lock.unlock();
            }
        }

        @Override // org.apache.flink.statefun.sdk.java.storage.ConcurrentAddressScopedStorage.Cell
        public void set(T t) {
            if (t == null) {
                throw new IllegalStorageAccessException(this.spec.name(), "Can not set state to NULL. Please use remove() instead.");
            }
            this.lock.lock();
            try {
                this.typedValue = this.typedValue.toBuilder().setHasValue(true).setValue(ConcurrentAddressScopedStorage.serialize(this.serializer, t)).build();
                this.status = CellStatus.MODIFIED;
            } finally {
                this.lock.unlock();
            }
        }

        @Override // org.apache.flink.statefun.sdk.java.storage.ConcurrentAddressScopedStorage.Cell
        public void remove() {
            this.lock.lock();
            try {
                this.status = CellStatus.DELETED;
            } finally {
                this.lock.unlock();
            }
        }

        @Override // org.apache.flink.statefun.sdk.java.storage.ConcurrentAddressScopedStorage.Cell
        public Optional<FromFunction.PersistedValueMutation> toProtocolValueMutation() {
            switch (this.status) {
                case MODIFIED:
                    return Optional.of(FromFunction.PersistedValueMutation.newBuilder().setStateName(this.spec.name()).setMutationType(FromFunction.PersistedValueMutation.MutationType.MODIFY).setStateValue(this.typedValue).build());
                case DELETED:
                    return Optional.of(FromFunction.PersistedValueMutation.newBuilder().setStateName(this.spec.name()).setMutationType(FromFunction.PersistedValueMutation.MutationType.DELETE).build());
                case UNMODIFIED:
                    return Optional.empty();
                default:
                    throw new IllegalStateException("Unknown cell status: " + this.status);
            }
        }

        @Override // org.apache.flink.statefun.sdk.java.storage.ConcurrentAddressScopedStorage.Cell
        public ValueSpec<T> spec() {
            return this.spec;
        }
    }

    public ConcurrentAddressScopedStorage(List<StateValueContexts.StateValueContext<?>> list) {
        this.cells = createCells(list);
    }

    @Override // org.apache.flink.statefun.sdk.java.AddressScopedStorage
    public <T> Optional<T> get(ValueSpec<T> valueSpec) {
        return getCellOrThrow(valueSpec).get();
    }

    @Override // org.apache.flink.statefun.sdk.java.AddressScopedStorage
    public <T> void set(ValueSpec<T> valueSpec, T t) {
        getCellOrThrow(valueSpec).set(t);
    }

    @Override // org.apache.flink.statefun.sdk.java.AddressScopedStorage
    public <T> void remove(ValueSpec<T> valueSpec) {
        getCellOrThrow(valueSpec).remove();
    }

    private <T> Cell<T> getCellOrThrow(ValueSpec<T> valueSpec) {
        Iterator<Cell<?>> it = this.cells.iterator();
        while (it.hasNext()) {
            Cell<T> cell = (Cell) it.next();
            if (valueSpec == cell.spec()) {
                return cell;
            }
        }
        return slowGetCellOrThrow(valueSpec);
    }

    private <T> Cell<T> slowGetCellOrThrow(ValueSpec<T> valueSpec) {
        Iterator<Cell<?>> it = this.cells.iterator();
        while (it.hasNext()) {
            Cell<T> cell = (Cell) it.next();
            ValueSpec<T> spec = cell.spec();
            if (spec.name().equals(valueSpec.name())) {
                if (spec.typeName().equals(valueSpec.typeName())) {
                    return cell;
                }
                throw new IllegalStorageAccessException(valueSpec.name(), "Accessed state with incorrect type; state type was registered as " + spec.typeName() + ", but was accessed as type " + valueSpec.typeName());
            }
        }
        throw new IllegalStorageAccessException(valueSpec.name(), "State does not exist; make sure that this state was registered.");
    }

    public void addMutations(Consumer<FromFunction.PersistedValueMutation> consumer) {
        Iterator<Cell<?>> it = this.cells.iterator();
        while (it.hasNext()) {
            it.next().toProtocolValueMutation().ifPresent(consumer);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static <T> Optional<T> tryDeserialize(TypeSerializer<T> typeSerializer, TypedValue typedValue) {
        return !typedValue.getHasValue() ? Optional.empty() : Optional.ofNullable(typeSerializer.deserialize(SliceProtobufUtil.asSlice(typedValue.getValue())));
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static <T> ByteString serialize(TypeSerializer<T> typeSerializer, T t) {
        return SliceProtobufUtil.asByteString(typeSerializer.serialize(t));
    }

    private static List<Cell<?>> createCells(List<StateValueContexts.StateValueContext<?>> list) {
        ArrayList arrayList = new ArrayList(list.size());
        for (StateValueContexts.StateValueContext<?> stateValueContext : list) {
            TypedValue stateValue = stateValueContext.protocolValue().getStateValue();
            ValueSpec<?> spec = stateValueContext.spec();
            arrayList.add(spec.type().typeCharacteristics().contains(TypeCharacteristics.IMMUTABLE_VALUES) ? new ImmutableTypeCell(spec, stateValue) : new MutableTypeCell(spec, stateValue));
        }
        return arrayList;
    }
}
