package org.apache.beam.runners.fnexecution.control;

import com.google.auto.value.AutoValue;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.runners.fnexecution.data.FnDataService;
import org.apache.beam.runners.java.fn.execution.repackaged.com.google.common.cache.Cache;
import org.apache.beam.runners.java.fn.execution.repackaged.com.google.common.cache.CacheBuilder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.fn.data.CloseableFnDataReceiver;
import org.apache.beam.sdk.fn.data.FnDataReceiver;
import org.apache.beam.sdk.fn.data.InboundDataClient;
import org.apache.beam.sdk.fn.data.LogicalEndpoint;
import org.apache.beam.sdk.util.WindowedValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/beam/runners/fnexecution/control/SdkHarnessClient.class */
public class SdkHarnessClient implements AutoCloseable {
    /** Logger used for debug output when registering descriptors and sending bundle requests. */
    private static final Logger LOG = LoggerFactory.getLogger(SdkHarnessClient.class);
    /** Supplies the instruction ID attached to every request this client sends. */
    private final IdGenerator idGenerator;
    /** Control-plane handler that receives the register and process-bundle instruction requests. */
    private final InstructionRequestHandler fnApiControlClient;
    /** Data-plane service used to open the input sender and the output receivers of a bundle. */
    private final FnDataService fnApiDataService;
    /** Processors created by {@code register}, keyed by the ID of their ProcessBundleDescriptor. */
    private final Cache<String, BundleProcessor> clientProcessors = CacheBuilder.newBuilder().build();

    /**
     * A bundle for which a process-bundle request has been sent, as created by
     * {@link BundleProcessor#newBundle(Map)}.
     *
     * @param <InputT> the element type carried inside the {@link WindowedValue}s fed to the bundle
     */
    @AutoValue
    public abstract static class ActiveBundle<InputT> {
        /**
         * Returns the ID of this bundle. {@link BundleProcessor#newBundle(Map)} uses the instruction
         * ID of the process-bundle request for it.
         */
        public abstract String getBundleId();

        /** Returns the stage that yields the {@link BeamFnApi.ProcessBundleResponse} for this bundle. */
        public abstract CompletionStage<BeamFnApi.ProcessBundleResponse> getBundleResponse();

        /** Returns the receiver through which input elements are handed to this bundle. */
        public abstract CloseableFnDataReceiver<WindowedValue<InputT>> getInputReceiver();

        /** Returns the inbound data clients of this bundle, keyed by the target they read from. */
        public abstract Map<BeamFnApi.Target, InboundDataClient> getOutputClients();

        /**
         * Creates an {@link ActiveBundle} from its four components.
         *
         * @param bundleId the ID identifying the bundle
         * @param bundleResponse the stage yielding the process-bundle response
         * @param inputReceiver the receiver accepting the bundle's input elements
         * @param outputClients the inbound data clients keyed by target
         * @param <InputT> the element type of the bundle's input
         * @return a new value object holding the given components
         */
        public static <InputT> ActiveBundle<InputT> create(String bundleId, CompletionStage<BeamFnApi.ProcessBundleResponse> bundleResponse, CloseableFnDataReceiver<WindowedValue<InputT>> inputReceiver, Map<BeamFnApi.Target, InboundDataClient> outputClients) {
            // The diamond keeps InputT on the generated class instead of falling back to a raw type.
            return new AutoValue_SdkHarnessClient_ActiveBundle<>(bundleId, bundleResponse, inputReceiver, outputClients);
        }
    }

    /* loaded from: input_file:org/apache/beam/runners/fnexecution/control/SdkHarnessClient$BundleProcessor.class */
    public class BundleProcessor<T> {
        private final String processBundleDescriptorId;
        private final CompletionStage<BeamFnApi.RegisterResponse> registrationFuture;
        private final RemoteInputDestination<WindowedValue<T>> remoteInput;

        private BundleProcessor(String str, CompletionStage<BeamFnApi.RegisterResponse> completionStage, RemoteInputDestination<WindowedValue<T>> remoteInputDestination) {
            this.processBundleDescriptorId = str;
            this.registrationFuture = completionStage;
            this.remoteInput = remoteInputDestination;
        }

        public CompletionStage<BeamFnApi.RegisterResponse> getRegistrationFuture() {
            return this.registrationFuture;
        }

        public ActiveBundle<T> newBundle(Map<BeamFnApi.Target, RemoteOutputReceiver<?>> map) {
            String id = SdkHarnessClient.this.idGenerator.getId();
            CompletionStage<BeamFnApi.InstructionResponse> handle = SdkHarnessClient.this.fnApiControlClient.handle(BeamFnApi.InstructionRequest.newBuilder().setInstructionId(id).setProcessBundle(BeamFnApi.ProcessBundleRequest.newBuilder().setProcessBundleDescriptorReference(this.processBundleDescriptorId)).build());
            SdkHarnessClient.LOG.debug("Sent {} with ID {} for {} with ID {}", new Object[]{BeamFnApi.ProcessBundleRequest.class.getSimpleName(), id, BeamFnApi.ProcessBundleDescriptor.class.getSimpleName(), this.processBundleDescriptorId});
            CompletionStage<U> thenApply = handle.thenApply((v0) -> {
                return v0.getProcessBundle();
            });
            HashMap hashMap = new HashMap();
            for (Map.Entry<BeamFnApi.Target, RemoteOutputReceiver<?>> entry : map.entrySet()) {
                hashMap.put(entry.getKey(), attachReceiver(id, entry.getKey(), entry.getValue()));
            }
            return ActiveBundle.create(id, thenApply, SdkHarnessClient.this.fnApiDataService.send(LogicalEndpoint.of(id, this.remoteInput.getTarget()), this.remoteInput.getCoder()), hashMap);
        }

        private <OutputT> InboundDataClient attachReceiver(String str, BeamFnApi.Target target, RemoteOutputReceiver<WindowedValue<OutputT>> remoteOutputReceiver) {
            return SdkHarnessClient.this.fnApiDataService.receive(LogicalEndpoint.of(str, target), remoteOutputReceiver.getCoder(), remoteOutputReceiver.getReceiver());
        }
    }

    /**
     * {@link IdGenerator} backed by a counter: each call atomically increments an
     * {@link AtomicLong} that starts at zero and returns the new value in decimal form, so the
     * first ID handed out is {@code "1"}.
     */
    private static class CountingIdGenerator implements IdGenerator {
        private final AtomicLong nextId = new AtomicLong(0L);

        private CountingIdGenerator() {
        }

        @Override
        public String getId() {
            long issued = this.nextId.incrementAndGet();
            return Long.toString(issued);
        }
    }

    /**
     * Source of the instruction IDs that {@link SdkHarnessClient} attaches to the requests it sends.
     *
     * <p>The interface has a single abstract method, so it can be supplied as a lambda or method
     * reference.
     */
    @FunctionalInterface
    public interface IdGenerator {
        /** Returns the ID to use for the next request. */
        String getId();
    }

    /**
     * Pairs the {@link Coder} used for a bundle's input with the {@link BeamFnApi.Target} the
     * input is sent to.
     *
     * @param <T> the type of value the coder handles
     */
    @AutoValue
    public abstract static class RemoteInputDestination<T> {
        /**
         * Creates a destination from a coder and a target.
         *
         * @param coder the coder for the values sent to the target
         * @param target the target that receives the values
         * @param <T> the type of value the coder handles
         * @return a new value object holding both
         */
        public static <T> RemoteInputDestination<T> of(Coder<T> coder, BeamFnApi.Target target) {
            // The diamond keeps T on the generated class instead of falling back to a raw type.
            return new AutoValue_SdkHarnessClient_RemoteInputDestination<>(coder, target);
        }

        /** Returns the coder for the values sent to {@link #getTarget()}. */
        public abstract Coder<T> getCoder();

        /** Returns the target that receives the values. */
        public abstract BeamFnApi.Target getTarget();
    }

    /**
     * Pairs the {@link Coder} for a bundle output with the {@link FnDataReceiver} that consumes
     * the values of that output.
     *
     * @param <T> the type of value the coder handles and the receiver accepts
     */
    @AutoValue
    public abstract static class RemoteOutputReceiver<T> {
        /**
         * Creates an output receiver from a coder and a receiver of the same value type.
         *
         * <p>The return type carries {@code T}; its erasure is the same as that of a raw
         * {@code RemoteOutputReceiver}.
         *
         * @param coder the coder for the output values
         * @param receiver the consumer of the output values
         * @param <T> the type of value the coder handles and the receiver accepts
         * @return a new value object holding both
         */
        public static <T> RemoteOutputReceiver<T> of(Coder<T> coder, FnDataReceiver<T> receiver) {
            return new AutoValue_SdkHarnessClient_RemoteOutputReceiver<>(coder, receiver);
        }

        /** Returns the coder for the output values. */
        public abstract Coder<T> getCoder();

        /** Returns the consumer of the output values. */
        public abstract FnDataReceiver<T> getReceiver();
    }

    /**
     * Creates a client from its three collaborators. Instances are obtained through
     * {@link #usingFnApiClient} and {@link #withIdGenerator}.
     */
    private SdkHarnessClient(InstructionRequestHandler instructionRequestHandler, FnDataService fnDataService, IdGenerator idGenerator) {
        this.fnApiControlClient = instructionRequestHandler;
        this.fnApiDataService = fnDataService;
        this.idGenerator = idGenerator;
    }

    /**
     * Creates a client that sends its requests through the given control handler and data
     * service, generating instruction IDs with a fresh {@code CountingIdGenerator}.
     *
     * @param instructionRequestHandler the handler that receives instruction requests
     * @param fnDataService the service used for bundle input and output
     * @return a new client
     */
    public static SdkHarnessClient usingFnApiClient(InstructionRequestHandler instructionRequestHandler, FnDataService fnDataService) {
        IdGenerator countingIds = new CountingIdGenerator();
        return new SdkHarnessClient(instructionRequestHandler, fnDataService, countingIds);
    }

    /**
     * Returns a new client that shares this client's control handler and data service but takes
     * its instruction IDs from {@code idGenerator}. This client is left unchanged, and the new
     * client starts with its own empty processor cache.
     *
     * @param idGenerator the ID source for the new client
     * @return the new client
     */
    public SdkHarnessClient withIdGenerator(IdGenerator idGenerator) {
        InstructionRequestHandler controlClient = this.fnApiControlClient;
        FnDataService dataService = this.fnApiDataService;
        return new SdkHarnessClient(controlClient, dataService, idGenerator);
    }

    /**
     * Returns the processor cached for the given descriptor's ID, registering the descriptor
     * first when the cache holds no entry for that ID.
     *
     * @param processBundleDescriptor the descriptor to look up or register
     * @param remoteInputDestination the coder and target for the processor's input; only used
     *     when the descriptor has to be registered
     * @param <T> the element type carried inside the {@link WindowedValue}s sent as input
     * @return the cached or newly registered processor
     * @throws RuntimeException wrapping the {@link ExecutionException} thrown by the cache when
     *     loading fails
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    public <T> BundleProcessor<T> getProcessor(BeamFnApi.ProcessBundleDescriptor processBundleDescriptor, RemoteInputDestination<WindowedValue<T>> remoteInputDestination) {
        String descriptorId = processBundleDescriptor.getId();
        // register() is declared over WindowedValue<?>; generics are invariant, so the typed
        // destination has to go through a raw type to be accepted there.
        RemoteInputDestination<WindowedValue<?>> wildcardDestination = (RemoteInputDestination<WindowedValue<?>>) (RemoteInputDestination) remoteInputDestination;
        try {
            return (BundleProcessor<T>) this.clientProcessors.get(descriptorId, () -> register(Collections.singletonMap(processBundleDescriptor, wildcardDestination)).get(descriptorId));
        } catch (ExecutionException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Sends one register request containing all given descriptors and stores a
     * {@link BundleProcessor} for each of them in the processor cache, replacing any entry
     * already stored under the same descriptor ID.
     *
     * <p>The processors are created without waiting for the response; every processor created
     * by one call shares the same registration stage, available through
     * {@link BundleProcessor#getRegistrationFuture()}.
     *
     * @param processBundleDescriptors the descriptors to register, each mapped to the coder and
     *     target for its input
     * @return the map view of the processor cache, keyed by descriptor ID; besides the
     *     processors created by this call it also contains those still cached from earlier calls
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    public Map<String, BundleProcessor> register(Map<BeamFnApi.ProcessBundleDescriptor, RemoteInputDestination<WindowedValue<?>>> processBundleDescriptors) {
        LOG.debug("Registering {}", processBundleDescriptors.keySet());
        BeamFnApi.InstructionRequest registerRequest = BeamFnApi.InstructionRequest.newBuilder().setInstructionId(this.idGenerator.getId()).setRegister(BeamFnApi.RegisterRequest.newBuilder().addAllProcessBundleDescriptor(processBundleDescriptors.keySet()).build()).build();
        // Narrow the generic instruction response to the register payload.
        CompletionStage<BeamFnApi.RegisterResponse> registerResponse = this.fnApiControlClient.handle(registerRequest).thenApply(BeamFnApi.InstructionResponse::getRegister);
        for (Map.Entry<BeamFnApi.ProcessBundleDescriptor, RemoteInputDestination<WindowedValue<?>>> entry : processBundleDescriptors.entrySet()) {
            String descriptorId = entry.getKey().getId();
            // WindowedValue<?> cannot be bound to BundleProcessor's T, so the processor is created raw.
            this.clientProcessors.put(descriptorId, new BundleProcessor(descriptorId, registerResponse, entry.getValue()));
        }
        return this.clientProcessors.asMap();
    }

    /**
     * Closes this client. The method body is empty, so calling it currently has no effect; the
     * control handler, the data service and the processor cache are left untouched.
     */
    @Override // java.lang.AutoCloseable
    public void close() throws Exception {
    }
}
