package org.apache.beam.runners.fnexecution.control;

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.pipeline.v1.Endpoints;
import org.apache.beam.runners.fnexecution.data.FnDataService;
import org.apache.beam.runners.fnexecution.data.RemoteInputDestination;
import org.apache.beam.runners.fnexecution.state.StateDelegator;
import org.apache.beam.runners.fnexecution.state.StateRequestHandler;
import org.apache.beam.sdk.fn.IdGenerator;
import org.apache.beam.sdk.fn.IdGenerators;
import org.apache.beam.sdk.fn.data.CloseableFnDataReceiver;
import org.apache.beam.sdk.fn.data.FnDataReceiver;
import org.apache.beam.sdk.fn.data.InboundDataClient;
import org.apache.beam.sdk.fn.data.LogicalEndpoint;
import org.apache.beam.sdk.util.MoreFutures;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/beam/runners/fnexecution/control/SdkHarnessClient.class */
public class SdkHarnessClient implements AutoCloseable {
    private static final Logger LOG = LoggerFactory.getLogger(SdkHarnessClient.class);
    private final IdGenerator idGenerator;
    private final InstructionRequestHandler fnApiControlClient;
    private final FnDataService fnApiDataService;
    private final ConcurrentHashMap<String, BundleProcessor> clientProcessors = new ConcurrentHashMap<>();

    /* loaded from: input_file:org/apache/beam/runners/fnexecution/control/SdkHarnessClient$BundleProcessor.class */
    public class BundleProcessor {
        private final BeamFnApi.ProcessBundleDescriptor processBundleDescriptor;
        private final CompletionStage<BeamFnApi.RegisterResponse> registrationFuture;
        private final Map<String, RemoteInputDestination> remoteInputs;
        private final StateDelegator stateDelegator;

        /* loaded from: input_file:org/apache/beam/runners/fnexecution/control/SdkHarnessClient$BundleProcessor$ActiveBundle.class */
        public class ActiveBundle implements RemoteBundle {
            private final String bundleId;
            private final CompletionStage<BeamFnApi.ProcessBundleResponse> response;
            private final Map<String, CountingFnDataReceiver> inputReceivers;
            private final Map<String, InboundDataClient> outputClients;
            private final StateDelegator.Registration stateRegistration;
            private final BundleProgressHandler progressHandler;
            private final BundleSplitHandler splitHandler;
            private final BundleCheckpointHandler checkpointHandler;
            private final BundleFinalizationHandler finalizationHandler;

            private ActiveBundle(String str, CompletionStage<BeamFnApi.ProcessBundleResponse> completionStage, Map<String, CountingFnDataReceiver> map, Map<String, InboundDataClient> map2, StateDelegator.Registration registration, BundleProgressHandler bundleProgressHandler, BundleSplitHandler bundleSplitHandler, BundleCheckpointHandler bundleCheckpointHandler, BundleFinalizationHandler bundleFinalizationHandler) {
                this.bundleId = str;
                this.response = completionStage;
                this.inputReceivers = map;
                this.outputClients = map2;
                this.stateRegistration = registration;
                this.progressHandler = bundleProgressHandler;
                this.splitHandler = bundleSplitHandler;
                this.checkpointHandler = bundleCheckpointHandler;
                this.finalizationHandler = bundleFinalizationHandler;
            }

            @Override // org.apache.beam.runners.fnexecution.control.RemoteBundle
            public String getId() {
                return this.bundleId;
            }

            @Override // org.apache.beam.runners.fnexecution.control.RemoteBundle
            public Map<String, FnDataReceiver> getInputReceivers() {
                return this.inputReceivers;
            }

            @Override // org.apache.beam.runners.fnexecution.control.RemoteBundle
            public void split(double d) {
                HashMap hashMap = new HashMap();
                for (Map.Entry<String, CountingFnDataReceiver> entry : this.inputReceivers.entrySet()) {
                    hashMap.put(entry.getKey(), BeamFnApi.ProcessBundleSplitRequest.DesiredSplit.newBuilder().setFractionOfRemainder(d).setEstimatedInputElements(entry.getValue().getCount()).build());
                }
                SdkHarnessClient.this.fnApiControlClient.handle(BeamFnApi.InstructionRequest.newBuilder().setInstructionId(SdkHarnessClient.this.idGenerator.getId()).setProcessBundleSplit(BeamFnApi.ProcessBundleSplitRequest.newBuilder().setInstructionId(this.bundleId).putAllDesiredSplits(hashMap).build()).build()).thenAccept(instructionResponse -> {
                    this.splitHandler.split(instructionResponse.getProcessBundleSplit());
                });
            }

            @Override // org.apache.beam.runners.fnexecution.control.RemoteBundle, java.lang.AutoCloseable
            public void close() throws Exception {
                Exception exc = null;
                Iterator<CountingFnDataReceiver> it = this.inputReceivers.values().iterator();
                while (it.hasNext()) {
                    try {
                        it.next().close();
                    } catch (Exception e) {
                        if (exc == null) {
                            exc = e;
                        } else {
                            exc.addSuppressed(e);
                        }
                    }
                }
                try {
                } catch (Exception e2) {
                    if (exc == null) {
                        exc = e2;
                    } else {
                        exc.addSuppressed(e2);
                    }
                }
                if (exc != null) {
                    throw new IllegalStateException("Processing bundle failed, TODO: [BEAM-3962] abort bundle.");
                }
                BeamFnApi.ProcessBundleResponse processBundleResponse = (BeamFnApi.ProcessBundleResponse) MoreFutures.get(this.response);
                this.progressHandler.onCompleted(processBundleResponse);
                if (processBundleResponse.getResidualRootsCount() > 0) {
                    this.checkpointHandler.onCheckpoint(processBundleResponse);
                }
                if (processBundleResponse.getRequiresFinalization()) {
                    this.finalizationHandler.requestsFinalization(this.bundleId);
                }
                try {
                    if (exc == null) {
                        this.stateRegistration.deregister();
                    } else {
                        this.stateRegistration.abort();
                    }
                } catch (Exception e3) {
                    if (exc == null) {
                        exc = e3;
                    } else {
                        exc.addSuppressed(e3);
                    }
                }
                for (InboundDataClient inboundDataClient : this.outputClients.values()) {
                    if (exc == null) {
                        try {
                            inboundDataClient.awaitCompletion();
                        } catch (Exception e4) {
                            if (exc == null) {
                                exc = e4;
                            } else {
                                exc.addSuppressed(e4);
                            }
                        }
                    } else {
                        inboundDataClient.cancel();
                    }
                }
                if (exc != null) {
                    throw exc;
                }
            }
        }

        private BundleProcessor(BeamFnApi.ProcessBundleDescriptor processBundleDescriptor, CompletionStage<BeamFnApi.RegisterResponse> completionStage, Map<String, RemoteInputDestination> map, StateDelegator stateDelegator) {
            this.processBundleDescriptor = processBundleDescriptor;
            this.registrationFuture = completionStage;
            this.remoteInputs = map;
            this.stateDelegator = stateDelegator;
        }

        public CompletionStage<BeamFnApi.RegisterResponse> getRegistrationFuture() {
            return this.registrationFuture;
        }

        public ActiveBundle newBundle(Map<String, RemoteOutputReceiver<?>> map, BundleProgressHandler bundleProgressHandler) {
            return newBundle(map, stateRequest -> {
                throw new UnsupportedOperationException(String.format("The %s does not have a registered state handler.", ActiveBundle.class.getSimpleName()));
            }, bundleProgressHandler);
        }

        public ActiveBundle newBundle(Map<String, RemoteOutputReceiver<?>> map, StateRequestHandler stateRequestHandler, BundleProgressHandler bundleProgressHandler) {
            return newBundle(map, stateRequestHandler, bundleProgressHandler, BundleSplitHandler.unsupported(), processBundleResponse -> {
                throw new UnsupportedOperationException(String.format("The %s does not have a registered bundle checkpoint handler.", ActiveBundle.class.getSimpleName()));
            }, str -> {
                throw new UnsupportedOperationException(String.format("The %s does not have a registered bundle finalization handler.", ActiveBundle.class.getSimpleName()));
            });
        }

        public ActiveBundle newBundle(Map<String, RemoteOutputReceiver<?>> map, StateRequestHandler stateRequestHandler, BundleProgressHandler bundleProgressHandler, BundleSplitHandler bundleSplitHandler, BundleCheckpointHandler bundleCheckpointHandler, BundleFinalizationHandler bundleFinalizationHandler) {
            String id = SdkHarnessClient.this.idGenerator.getId();
            CompletionStage<U> thenCompose = this.registrationFuture.thenCompose(registerResponse -> {
                return SdkHarnessClient.this.fnApiControlClient.handle(BeamFnApi.InstructionRequest.newBuilder().setInstructionId(id).setProcessBundle(BeamFnApi.ProcessBundleRequest.newBuilder().setProcessBundleDescriptorId(this.processBundleDescriptor.getId()).addAllCacheTokens(stateRequestHandler.getCacheTokens())).build());
            });
            SdkHarnessClient.LOG.debug("Sent {} with ID {} for {} with ID {}", new Object[]{BeamFnApi.ProcessBundleRequest.class.getSimpleName(), id, BeamFnApi.ProcessBundleDescriptor.class.getSimpleName(), this.processBundleDescriptor.getId()});
            CompletionStage thenApply = thenCompose.thenApply((v0) -> {
                return v0.getProcessBundle();
            });
            HashMap hashMap = new HashMap();
            for (Map.Entry<String, RemoteOutputReceiver<?>> entry : map.entrySet()) {
                hashMap.put(entry.getKey(), attachReceiver(id, entry.getKey(), entry.getValue()));
            }
            ImmutableMap.Builder builder = ImmutableMap.builder();
            for (Map.Entry<String, RemoteInputDestination> entry2 : this.remoteInputs.entrySet()) {
                builder.put(entry2.getKey(), new CountingFnDataReceiver(SdkHarnessClient.this.fnApiDataService.send(LogicalEndpoint.of(id, entry2.getValue().getPTransformId()), entry2.getValue().getCoder())));
            }
            return new ActiveBundle(id, thenApply, builder.build(), hashMap, this.stateDelegator.registerForProcessBundleInstructionId(id, stateRequestHandler), bundleProgressHandler, bundleSplitHandler, bundleCheckpointHandler, bundleFinalizationHandler);
        }

        private <OutputT> InboundDataClient attachReceiver(String str, String str2, RemoteOutputReceiver<OutputT> remoteOutputReceiver) {
            return SdkHarnessClient.this.fnApiDataService.receive(LogicalEndpoint.of(str, str2), remoteOutputReceiver.getCoder(), remoteOutputReceiver.getReceiver());
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/runners/fnexecution/control/SdkHarnessClient$CountingFnDataReceiver.class */
    public static class CountingFnDataReceiver<T> implements CloseableFnDataReceiver<T> {
        private final CloseableFnDataReceiver delegate;
        private long count;

        private CountingFnDataReceiver(CloseableFnDataReceiver closeableFnDataReceiver) {
            this.delegate = closeableFnDataReceiver;
        }

        public long getCount() {
            return this.count;
        }

        @Override // org.apache.beam.sdk.fn.data.FnDataReceiver
        public void accept(T t) throws Exception {
            this.count++;
            this.delegate.accept(t);
        }

        @Override // org.apache.beam.sdk.fn.data.CloseableFnDataReceiver
        public void flush() throws Exception {
            this.delegate.flush();
        }

        @Override // org.apache.beam.sdk.fn.data.CloseableFnDataReceiver, java.lang.AutoCloseable
        public void close() throws Exception {
            this.delegate.close();
        }
    }

    /* loaded from: input_file:org/apache/beam/runners/fnexecution/control/SdkHarnessClient$NoOpStateDelegator.class */
    private static class NoOpStateDelegator implements StateDelegator {
        private static final NoOpStateDelegator INSTANCE = new NoOpStateDelegator();

        /* JADX INFO: Access modifiers changed from: private */
        /* loaded from: input_file:org/apache/beam/runners/fnexecution/control/SdkHarnessClient$NoOpStateDelegator$Registration.class */
        public static class Registration implements StateDelegator.Registration {
            private static final Registration INSTANCE = new Registration();

            private Registration() {
            }

            @Override // org.apache.beam.runners.fnexecution.state.StateDelegator.Registration
            public void deregister() {
            }

            @Override // org.apache.beam.runners.fnexecution.state.StateDelegator.Registration
            public void abort() {
            }
        }

        private NoOpStateDelegator() {
        }

        @Override // org.apache.beam.runners.fnexecution.state.StateDelegator
        public Registration registerForProcessBundleInstructionId(String str, StateRequestHandler stateRequestHandler) {
            return Registration.INSTANCE;
        }
    }

    private SdkHarnessClient(InstructionRequestHandler instructionRequestHandler, FnDataService fnDataService, IdGenerator idGenerator) {
        this.fnApiDataService = fnDataService;
        this.idGenerator = idGenerator;
        this.fnApiControlClient = instructionRequestHandler;
    }

    public static SdkHarnessClient usingFnApiClient(InstructionRequestHandler instructionRequestHandler, FnDataService fnDataService) {
        return new SdkHarnessClient(instructionRequestHandler, fnDataService, IdGenerators.incrementingLongs());
    }

    public SdkHarnessClient withIdGenerator(IdGenerator idGenerator) {
        return new SdkHarnessClient(this.fnApiControlClient, this.fnApiDataService, idGenerator);
    }

    public BundleProcessor getProcessor(BeamFnApi.ProcessBundleDescriptor processBundleDescriptor, Map<String, RemoteInputDestination> map) {
        Preconditions.checkState(!processBundleDescriptor.hasStateApiServiceDescriptor(), "The %s cannot support a %s containing a state %s.", BundleProcessor.class.getSimpleName(), BeamFnApi.ProcessBundleDescriptor.class.getSimpleName(), Endpoints.ApiServiceDescriptor.class.getSimpleName());
        return getProcessor(processBundleDescriptor, map, NoOpStateDelegator.INSTANCE);
    }

    public BundleProcessor getProcessor(BeamFnApi.ProcessBundleDescriptor processBundleDescriptor, Map<String, RemoteInputDestination> map, StateDelegator stateDelegator) {
        BundleProcessor computeIfAbsent = this.clientProcessors.computeIfAbsent(processBundleDescriptor.getId(), str -> {
            return create(processBundleDescriptor, map, stateDelegator);
        });
        Preconditions.checkArgument(computeIfAbsent.processBundleDescriptor.equals(processBundleDescriptor), "The provided %s with id %s collides with an existing %s with the same id but containing different contents.", BeamFnApi.ProcessBundleDescriptor.class.getSimpleName(), processBundleDescriptor.getId(), BeamFnApi.ProcessBundleDescriptor.class.getSimpleName());
        return computeIfAbsent;
    }

    private BundleProcessor create(BeamFnApi.ProcessBundleDescriptor processBundleDescriptor, Map<String, RemoteInputDestination> map, StateDelegator stateDelegator) {
        LOG.debug("Registering {}", processBundleDescriptor);
        return new BundleProcessor(processBundleDescriptor, this.fnApiControlClient.handle(BeamFnApi.InstructionRequest.newBuilder().setInstructionId(this.idGenerator.getId()).setRegister(BeamFnApi.RegisterRequest.newBuilder().addProcessBundleDescriptor(processBundleDescriptor).build()).build()).thenApply((v0) -> {
            return v0.getRegister();
        }), map, stateDelegator);
    }

    @Override // java.lang.AutoCloseable
    public void close() {
    }
}
