package org.apache.beam.fn.harness.data;

import org.apache.beam.fn.harness.data.BeamFnTimerClient;
import org.apache.beam.model.pipeline.v1.Endpoints;
import org.apache.beam.runners.core.construction.Timer;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.fn.data.CloseableFnDataReceiver;
import org.apache.beam.sdk.fn.data.FnDataReceiver;
import org.apache.beam.sdk.fn.data.InboundDataClient;
import org.apache.beam.sdk.fn.data.LogicalEndpoint;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;

/**
 * A {@link BeamFnTimerClient} that is layered on top of a {@link BeamFnDataClient}.
 *
 * <p>Each registration opens one inbound stream (via {@code receive}) and one outbound stream
 * (via {@code send}) on the wrapped data client, both addressed to the same API service
 * descriptor and logical endpoint, and exposes the pair as a single
 * {@link BeamFnTimerClient.TimerHandler}.
 */
public class BeamFnTimerGrpcClient implements BeamFnTimerClient {
    /** Data client that performs the actual inbound/outbound transfers. */
    private final BeamFnDataClient beamFnDataClient;

    /** Service descriptor passed to every {@code receive} and {@code send} call. */
    private final Endpoints.ApiServiceDescriptor timerApiServiceDescriptor;

    /**
     * Creates a timer client that delegates to the given data client.
     *
     * @param beamFnDataClient data client used to receive and send timers
     * @param timerApiServiceDescriptor descriptor handed to the data client on every registration
     */
    public BeamFnTimerGrpcClient(
            BeamFnDataClient beamFnDataClient,
            Endpoints.ApiServiceDescriptor timerApiServiceDescriptor) {
        this.beamFnDataClient = beamFnDataClient;
        this.timerApiServiceDescriptor = timerApiServiceDescriptor;
    }

    /**
     * Registers a receiver for inbound timers and returns a handler for sending outbound timers
     * on the same logical endpoint.
     *
     * @param logicalEndpoint endpoint to register; must report {@code isTimer() == true}
     * @param coder coder passed to both the inbound and the outbound side
     * @param fnDataReceiver receiver handed to the data client for inbound timers
     * @param <K> key type of the timers
     * @return a handler whose outbound operations ({@code accept}, {@code flush}, {@code close})
     *     go to the outbound receiver and whose remaining operations go to the inbound client
     * @throws IllegalArgumentException if {@code logicalEndpoint} is not a timer endpoint
     */
    @Override
    public <K> BeamFnTimerClient.TimerHandler<K> register(
            LogicalEndpoint logicalEndpoint,
            Coder<Timer<K>> coder,
            FnDataReceiver<Timer<K>> fnDataReceiver) {
        Preconditions.checkArgument(logicalEndpoint.isTimer(), "Expected to receive timer endpoint but received %s", logicalEndpoint);

        // Inbound side is opened before the outbound side; keep this order.
        final InboundDataClient inbound =
                this.beamFnDataClient.receive(
                        this.timerApiServiceDescriptor, logicalEndpoint, coder, fnDataReceiver);
        // Parameterized (not raw) so that accept(Timer<K>) below is a checked call.
        final CloseableFnDataReceiver<Timer<K>> outbound =
                this.beamFnDataClient.send(this.timerApiServiceDescriptor, logicalEndpoint, coder);

        return new BeamFnTimerClient.TimerHandler<K>() {
            // ---- Outbound operations: forwarded to the outbound receiver. ----

            @Override
            public void accept(Timer<K> timer) throws Exception {
                outbound.accept(timer);
            }

            @Override
            public void flush() throws Exception {
                outbound.flush();
            }

            /** Closes only the outbound receiver; the inbound client is not touched here. */
            @Override
            public void close() throws Exception {
                outbound.close();
            }

            // ---- Inbound lifecycle operations: forwarded to the inbound client. ----

            /**
             * Delegates to the inbound client's {@code awaitCompletion}.
             *
             * @throws InterruptedException propagated from the inbound client
             * @throws Exception propagated from the inbound client
             */
            @Override
            public void awaitCompletion() throws Exception {
                inbound.awaitCompletion();
            }

            @Override
            public void runWhenComplete(Runnable runnable) {
                inbound.runWhenComplete(runnable);
            }

            @Override
            public boolean isDone() {
                return inbound.isDone();
            }

            @Override
            public void cancel() {
                inbound.cancel();
            }

            @Override
            public void complete() {
                inbound.complete();
            }

            @Override
            public void fail(Throwable th) {
                inbound.fail(th);
            }
        };
    }
}
