/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.fnexecution.state;

import java.io.OutputStream;
import java.util.ArrayList;
import java.util.EnumMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.repackaged.beam_runners_java_fn_execution.com.google.common.base.Preconditions;
import org.apache.beam.repackaged.beam_runners_java_fn_execution.com.google.common.collect.ImmutableList;
import org.apache.beam.runners.fnexecution.control.ProcessBundleDescriptors;
import org.apache.beam.runners.fnexecution.state.StateRequestHandler;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.fn.stream.DataStreams;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.vendor.protobuf.v3.com.google.protobuf.ByteString;
import org.apache.beam.vendor.sdk.v2.sdk.extensions.protobuf.ByteStringCoder;

public class StateRequestHandlers {
    /**
     * Returns a {@link StateRequestHandler} that dispatches each request to the handler registered
     * for the request's state key type.
     *
     * <p>A request whose state key type has no entry in {@code handlers} results in a stage that is
     * completed exceptionally with an {@link IllegalStateException}.
     *
     * @param handlers the handlers to delegate to, keyed by state key type; the map is used as-is
     *     (it is not copied)
     * @return the delegating handler
     */
    public static StateRequestHandler delegateBasedUponType(EnumMap<BeamFnApi.StateKey.TypeCase, StateRequestHandler> handlers) {
        StateRequestHandler delegatingHandler = new StateKeyTypeDelegatingStateRequestHandler(handlers);
        return delegatingHandler;
    }

    /**
     * Returns a {@link StateRequestHandler} that answers multimap side input requests using handlers
     * obtained from the given {@link SideInputHandlerFactory}.
     *
     * @param sideInputSpecs the known side input specs, keyed first by PTransform id and then by
     *     side input id
     * @param sideInputHandlerFactory the factory consulted to create a handler for a side input spec
     * @return the adapting handler
     */
    public static StateRequestHandler forSideInputHandlerFactory(Map<String, Map<String, ProcessBundleDescriptors.SideInputSpec>> sideInputSpecs, SideInputHandlerFactory sideInputHandlerFactory) {
        StateRequestHandler adapter = new StateRequestHandlerToSideInputHandlerFactoryAdapter(sideInputSpecs, sideInputHandlerFactory);
        return adapter;
    }

    /**
     * Returns a {@link StateRequestHandler} that answers bag user state requests using handlers
     * obtained from the given {@link BagUserStateHandlerFactory}.
     *
     * @param processBundleDescriptor the descriptor whose bag user state specs are used to resolve
     *     incoming requests
     * @param bagUserStateHandlerFactory the factory consulted to create a handler for a user state
     *     spec
     * @return the adapting handler
     */
    public static StateRequestHandler forBagUserStateHandlerFactory(ProcessBundleDescriptors.ExecutableProcessBundleDescriptor processBundleDescriptor, BagUserStateHandlerFactory bagUserStateHandlerFactory) {
        StateRequestHandler adapter = new ByteStringStateRequestHandlerToBagUserStateHandlerFactoryAdapter(processBundleDescriptor, bagUserStateHandlerFactory);
        return adapter;
    }

    static class ByteStringStateRequestHandlerToBagUserStateHandlerFactoryAdapter
    implements StateRequestHandler {
        private final ProcessBundleDescriptors.ExecutableProcessBundleDescriptor processBundleDescriptor;
        private final BagUserStateHandlerFactory handlerFactory;
        private final ConcurrentHashMap<ProcessBundleDescriptors.BagUserStateSpec, BagUserStateHandler> cache;

        ByteStringStateRequestHandlerToBagUserStateHandlerFactoryAdapter(ProcessBundleDescriptors.ExecutableProcessBundleDescriptor processBundleDescriptor, BagUserStateHandlerFactory handlerFactory) {
            this.processBundleDescriptor = processBundleDescriptor;
            this.handlerFactory = handlerFactory;
            this.cache = new ConcurrentHashMap();
        }

        @Override
        public CompletionStage<BeamFnApi.StateResponse.Builder> handle(BeamFnApi.StateRequest request) throws Exception {
            try {
                Preconditions.checkState(BeamFnApi.StateKey.TypeCase.BAG_USER_STATE.equals((Object)request.getStateKey().getTypeCase()), "Unsupported %s type %s, expected %s", (Object)BeamFnApi.StateRequest.class.getSimpleName(), (Object)request.getStateKey().getTypeCase(), (Object)BeamFnApi.StateKey.TypeCase.BAG_USER_STATE);
                BeamFnApi.StateKey.BagUserState stateKey = request.getStateKey().getBagUserState();
                ProcessBundleDescriptors.BagUserStateSpec referenceSpec = this.processBundleDescriptor.getBagUserStateSpecs().get(stateKey.getPtransformId()).get(stateKey.getUserStateId());
                Preconditions.checkState(referenceSpec.keyCoder() instanceof ByteStringCoder, "This %s only supports the %s as the key coder.", (Object)BagUserStateHandlerFactory.class.getSimpleName(), (Object)ByteStringCoder.class.getSimpleName());
                Preconditions.checkState(referenceSpec.valueCoder() instanceof ByteStringCoder, "This %s only supports the %s as the value coder.", (Object)BagUserStateHandlerFactory.class.getSimpleName(), (Object)ByteStringCoder.class.getSimpleName());
                BagUserStateHandler handler = this.cache.computeIfAbsent(referenceSpec, this::createHandler);
                ByteString key = stateKey.getKey();
                BoundedWindow window = (BoundedWindow)referenceSpec.windowCoder().decode(stateKey.getWindow().newInput());
                switch (request.getRequestCase()) {
                    case GET: {
                        return ByteStringStateRequestHandlerToBagUserStateHandlerFactoryAdapter.handleGetRequest(request, key, window, handler);
                    }
                    case APPEND: {
                        return ByteStringStateRequestHandlerToBagUserStateHandlerFactoryAdapter.handleAppendRequest(request, key, window, handler);
                    }
                    case CLEAR: {
                        return ByteStringStateRequestHandlerToBagUserStateHandlerFactoryAdapter.handleClearRequest(request, key, window, handler);
                    }
                }
                throw new Exception(String.format("Unsupported request type %s for user state.", request.getRequestCase()));
            }
            catch (Exception e) {
                CompletableFuture<BeamFnApi.StateResponse.Builder> f = new CompletableFuture<BeamFnApi.StateResponse.Builder>();
                f.completeExceptionally(e);
                return f;
            }
        }

        private static <W extends BoundedWindow> CompletionStage<BeamFnApi.StateResponse.Builder> handleGetRequest(BeamFnApi.StateRequest request, ByteString key, W window, BagUserStateHandler<ByteString, ByteString, W> handler) {
            Preconditions.checkState(request.getGet().getContinuationToken().isEmpty(), "Continuation tokens are unsupported.");
            return CompletableFuture.completedFuture(BeamFnApi.StateResponse.newBuilder().setId(request.getId()).setGet(BeamFnApi.StateGetResponse.newBuilder().setData(ByteString.copyFrom(handler.get(key, window)))));
        }

        private static <W extends BoundedWindow> CompletionStage<BeamFnApi.StateResponse.Builder> handleAppendRequest(BeamFnApi.StateRequest request, ByteString key, W window, BagUserStateHandler<ByteString, ByteString, W> handler) {
            handler.append(key, window, ImmutableList.of(request.getAppend().getData()).iterator());
            return CompletableFuture.completedFuture(BeamFnApi.StateResponse.newBuilder().setId(request.getId()).setAppend(BeamFnApi.StateAppendResponse.getDefaultInstance()));
        }

        private static <W extends BoundedWindow> CompletionStage<BeamFnApi.StateResponse.Builder> handleClearRequest(BeamFnApi.StateRequest request, ByteString key, W window, BagUserStateHandler<ByteString, ByteString, W> handler) {
            handler.clear(key, window);
            return CompletableFuture.completedFuture(BeamFnApi.StateResponse.newBuilder().setId(request.getId()).setClear(BeamFnApi.StateClearResponse.getDefaultInstance()));
        }

        private <K, V, W extends BoundedWindow> BagUserStateHandler<K, V, W> createHandler(ProcessBundleDescriptors.BagUserStateSpec cacheKey) {
            return this.handlerFactory.forUserState(cacheKey.transformId(), cacheKey.userStateId(), cacheKey.keyCoder(), cacheKey.valueCoder(), cacheKey.windowCoder());
        }
    }

    /**
     * Adapts a {@link SideInputHandlerFactory} to the {@link StateRequestHandler} interface for
     * requests whose state key is of type {@code MULTIMAP_SIDE_INPUT}.
     *
     * <p>Handlers obtained from the factory are cached per
     * {@link ProcessBundleDescriptors.SideInputSpec}. Only GET requests are supported.
     *
     * <p>Any exception raised while processing a request is reported through an exceptionally
     * completed stage rather than being thrown from {@link #handle}.
     */
    static class StateRequestHandlerToSideInputHandlerFactoryAdapter
    implements StateRequestHandler {
        // Known side input specs, keyed by PTransform id and then by side input id.
        private final Map<String, Map<String, ProcessBundleDescriptors.SideInputSpec>> sideInputSpecs;
        private final SideInputHandlerFactory sideInputHandlerFactory;
        // Handlers created so far, keyed by the spec they were created for.
        private final ConcurrentHashMap<ProcessBundleDescriptors.SideInputSpec, SideInputHandler> cache;

        StateRequestHandlerToSideInputHandlerFactoryAdapter(Map<String, Map<String, ProcessBundleDescriptors.SideInputSpec>> sideInputSpecs, SideInputHandlerFactory sideInputHandlerFactory) {
            this.sideInputSpecs = sideInputSpecs;
            this.sideInputHandlerFactory = sideInputHandlerFactory;
            this.cache = new ConcurrentHashMap<>();
        }

        /**
         * Handles a GET request against a multimap side input.
         *
         * @param request the state request; its state key must be of type
         *     {@code MULTIMAP_SIDE_INPUT}
         * @return a completed stage holding the response builder, or a stage completed exceptionally
         *     when the request is of the wrong type, refers to an unknown PTransform or side input
         *     id, is not a GET, or the underlying handler fails
         */
        @Override
        public CompletionStage<BeamFnApi.StateResponse.Builder> handle(BeamFnApi.StateRequest request) throws Exception {
            try {
                BeamFnApi.StateKey.TypeCase typeCase = request.getStateKey().getTypeCase();
                Preconditions.checkState(
                    typeCase == BeamFnApi.StateKey.TypeCase.MULTIMAP_SIDE_INPUT,
                    "Unsupported %s type %s, expected %s",
                    BeamFnApi.StateRequest.class.getSimpleName(),
                    typeCase,
                    BeamFnApi.StateKey.TypeCase.MULTIMAP_SIDE_INPUT);

                BeamFnApi.StateKey.MultimapSideInput stateKey = request.getStateKey().getMultimapSideInput();
                ProcessBundleDescriptors.SideInputSpec referenceSpec = this.lookupSpec(stateKey);
                SideInputHandler<?, ?> handler = this.cache.computeIfAbsent(referenceSpec, this::createHandler);

                switch (request.getRequestCase()) {
                    case GET: {
                        return this.handleGetRequest(request, referenceSpec, handler);
                    }
                    default: {
                        throw new Exception(String.format("Unsupported request type %s for side input.", request.getRequestCase()));
                    }
                }
            }
            catch (Exception e) {
                CompletableFuture<BeamFnApi.StateResponse.Builder> failed = new CompletableFuture<>();
                failed.completeExceptionally(e);
                return failed;
            }
        }

        /**
         * Resolves the side input spec addressed by {@code stateKey}, failing with a descriptive
         * {@link IllegalStateException} (instead of a bare NullPointerException) when either the
         * PTransform id or the side input id is unknown.
         */
        private ProcessBundleDescriptors.SideInputSpec lookupSpec(BeamFnApi.StateKey.MultimapSideInput stateKey) {
            Map<String, ProcessBundleDescriptors.SideInputSpec> transformSpecs = this.sideInputSpecs.get(stateKey.getPtransformId());
            Preconditions.checkState(
                transformSpecs != null,
                "No side input specs found for PTransform %s.",
                stateKey.getPtransformId());
            ProcessBundleDescriptors.SideInputSpec spec = transformSpecs.get(stateKey.getSideInputId());
            Preconditions.checkState(
                spec != null,
                "No side input spec found for PTransform %s with side input id %s.",
                stateKey.getPtransformId(),
                stateKey.getSideInputId());
            return spec;
        }

        /**
         * Fetches the values for the requested key and window and returns them encoded with the
         * handler's result coder, each element delimited by the outbound data stream.
         *
         * @param request the GET request being answered
         * @param spec the already-resolved spec for the request's side input; supplies the window
         *     coder
         * @param handler the handler serving the side input described by {@code spec}
         */
        private <V, W extends BoundedWindow> CompletionStage<BeamFnApi.StateResponse.Builder> handleGetRequest(BeamFnApi.StateRequest request, ProcessBundleDescriptors.SideInputSpec spec, SideInputHandler<V, W> handler) throws Exception {
            Preconditions.checkState(request.getGet().getContinuationToken().isEmpty(), "Continuation tokens are unsupported.");
            BeamFnApi.StateKey.MultimapSideInput stateKey = request.getStateKey().getMultimapSideInput();

            // The spec is held as a raw type, so the decoded window has to be cast to the handler's
            // window type.
            @SuppressWarnings("unchecked")
            W window = (W)spec.windowCoder().decode(stateKey.getWindow().newInput());
            Iterable<V> values = handler.get(stateKey.getKey().toByteArray(), window);

            ArrayList<ByteString> encodedValues = new ArrayList<>();
            // Closing the stream hands any remaining buffered bytes to encodedValues, so the list
            // must only be read after the try block.
            try (DataStreams.ElementDelimitedOutputStream outputStream = DataStreams.outbound(encodedValues::add)) {
                for (V value : values) {
                    handler.resultCoder().encode(value, outputStream);
                    outputStream.delimitElement();
                }
            }

            BeamFnApi.StateResponse.Builder response = BeamFnApi.StateResponse.newBuilder();
            response.setId(request.getId());
            response.setGet(BeamFnApi.StateGetResponse.newBuilder().setData(ByteString.copyFrom(encodedValues)).build());
            return CompletableFuture.completedFuture(response);
        }

        /** Asks the factory for a handler matching {@code cacheKey}; used to populate the cache. */
        private <K, V, W extends BoundedWindow> SideInputHandler<V, W> createHandler(ProcessBundleDescriptors.SideInputSpec cacheKey) {
            return this.sideInputHandlerFactory.forSideInput(cacheKey.transformId(), cacheKey.sideInputId(), cacheKey.accessPattern(), cacheKey.elementCoder(), cacheKey.windowCoder());
        }
    }

    /**
     * A {@link StateRequestHandler} that forwards each request to the handler registered for the
     * request's state key type.
     *
     * <p>Requests whose state key type has no registered handler produce a stage completed
     * exceptionally with an {@link IllegalStateException}.
     */
    static class StateKeyTypeDelegatingStateRequestHandler
    implements StateRequestHandler {
        private final EnumMap<BeamFnApi.StateKey.TypeCase, StateRequestHandler> handlers;

        StateKeyTypeDelegatingStateRequestHandler(EnumMap<BeamFnApi.StateKey.TypeCase, StateRequestHandler> handlers) {
            this.handlers = handlers;
        }

        /**
         * Delegates {@code request} to the handler registered for its state key type.
         *
         * @param request the state request to dispatch
         * @return whatever the selected handler returns, or an exceptionally completed stage when no
         *     handler is registered for the request's state key type
         * @throws Exception if the selected handler throws
         */
        @Override
        public CompletionStage<BeamFnApi.StateResponse.Builder> handle(BeamFnApi.StateRequest request) throws Exception {
            StateRequestHandler delegate = this.handlers.getOrDefault(request.getStateKey().getTypeCase(), this::handlerNotFound);
            return delegate.handle(request);
        }

        /**
         * Fallback used when no handler is registered; the failure names the unhandled state key
         * type so the problem can be diagnosed from the exception alone.
         */
        private CompletionStage<BeamFnApi.StateResponse.Builder> handlerNotFound(BeamFnApi.StateRequest request) {
            CompletableFuture<BeamFnApi.StateResponse.Builder> rval = new CompletableFuture<>();
            rval.completeExceptionally(
                new IllegalStateException(
                    String.format(
                        "No %s registered for state key type %s.",
                        StateRequestHandler.class.getSimpleName(),
                        request.getStateKey().getTypeCase())));
            return rval;
        }
    }

    /**
     * A factory which creates {@link BagUserStateHandler}s for a given piece of bag user state.
     *
     * <p>Annotated {@link ThreadSafe}: implementations are expected to be safe for concurrent use.
     */
    @ThreadSafe
    public static interface BagUserStateHandlerFactory {
        /**
         * Returns a handler for the bag user state identified by the PTransform id and user state
         * id.
         *
         * @param pTransformId the id of the PTransform that owns the user state
         * @param userStateId the id of the user state within that PTransform
         * @param keyCoder the coder for keys
         * @param valueCoder the coder for values
         * @param windowCoder the coder for windows
         * @return a handler serving the requested user state
         */
        public <K, V, W extends BoundedWindow> BagUserStateHandler<K, V, W> forUserState(String pTransformId, String userStateId, Coder<K> keyCoder, Coder<V> valueCoder, Coder<W> windowCoder);

        /**
         * Returns a factory whose {@link #forUserState} always throws
         * {@link UnsupportedOperationException}.
         */
        public static BagUserStateHandlerFactory unsupported() {
            return new BagUserStateHandlerFactory(){

                @Override
                public <K, V, W extends BoundedWindow> BagUserStateHandler<K, V, W> forUserState(String pTransformId, String userStateId, Coder<K> keyCoder, Coder<V> valueCoder, Coder<W> windowCoder) {
                    throw new UnsupportedOperationException(String.format("The %s does not support handling user state for PTransform %s with user state id %s.", BagUserStateHandler.class.getSimpleName(), pTransformId, userStateId));
                }
            };
        }
    }

    /**
     * A handler for bag user state, addressed by key and window.
     *
     * <p>Annotated {@link ThreadSafe}: implementations are expected to be safe for concurrent use.
     *
     * @param <K> the key type
     * @param <V> the type of the values stored in the bag
     * @param <W> the window type
     */
    @ThreadSafe
    public interface BagUserStateHandler<K, V, W extends BoundedWindow> {
        /** Returns the values currently held for the given key and window. */
        Iterable<V> get(K key, W window);

        /** Appends the supplied values to the bag for the given key and window. */
        void append(K key, W window, Iterator<V> values);

        /** Clears the bag for the given key and window. */
        void clear(K key, W window);
    }

    /**
     * A factory which creates {@link SideInputHandler}s for a given side input.
     *
     * <p>Annotated {@link ThreadSafe}: implementations are expected to be safe for concurrent use.
     */
    @ThreadSafe
    public static interface SideInputHandlerFactory {
        /**
         * Returns a handler for the side input identified by the PTransform id and side input id.
         *
         * @param pTransformId the id of the PTransform that consumes the side input
         * @param sideInputId the id of the side input within that PTransform
         * @param accessPattern the function spec describing how the side input is accessed
         * @param elementCoder the coder for the side input's elements
         * @param windowCoder the coder for windows
         * @return a handler serving the requested side input
         */
        public <T, V, W extends BoundedWindow> SideInputHandler<V, W> forSideInput(String pTransformId, String sideInputId, RunnerApi.FunctionSpec accessPattern, Coder<T> elementCoder, Coder<W> windowCoder);

        /**
         * Returns a factory whose {@link #forSideInput} always throws
         * {@link UnsupportedOperationException}.
         */
        public static SideInputHandlerFactory unsupported() {
            return new SideInputHandlerFactory(){

                @Override
                public <T, V, W extends BoundedWindow> SideInputHandler<V, W> forSideInput(String pTransformId, String sideInputId, RunnerApi.FunctionSpec accessPattern, Coder<T> elementCoder, Coder<W> windowCoder) {
                    throw new UnsupportedOperationException(String.format("The %s does not support handling side inputs for PTransform %s with side input id %s.", SideInputHandler.class.getSimpleName(), pTransformId, sideInputId));
                }
            };
        }
    }

    /**
     * A handler for a side input, addressed by encoded key and window.
     *
     * <p>Annotated {@link ThreadSafe}: implementations are expected to be safe for concurrent use.
     *
     * @param <V> the type of the values returned for a key
     * @param <W> the window type
     */
    @ThreadSafe
    public interface SideInputHandler<V, W extends BoundedWindow> {
        /** Returns the values associated with the given key bytes in the given window. */
        Iterable<V> get(byte[] key, W window);

        /** Returns the coder used to encode the values returned by {@link #get}. */
        Coder<V> resultCoder();
    }
}

