package org.apache.beam.runners.fnexecution.status;

import java.io.Closeable;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.sdk.fn.IdGenerator;
import org.apache.beam.sdk.fn.IdGenerators;
import org.apache.beam.sdk.fn.stream.SynchronizedStreamObserver;
import org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.StreamObserver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/beam/runners/fnexecution/status/WorkerStatusClient.class */
/**
 * Client for requesting status from a single worker over a pair of streams.
 *
 * <p>Requests are written to the request observer supplied at construction time. Responses are
 * delivered to the observer returned by {@link #getResponseObserver()} and are matched to the
 * outstanding request by id; the matching {@link CompletableFuture} is then completed.
 *
 * <p>Shared state is guarded as follows: the closed flag is an {@link AtomicBoolean}, and the map
 * of pending responses is a synchronized map whose own monitor is also used to make
 * "check closed, then register" and "drain on close" mutually exclusive.
 */
public class WorkerStatusClient implements Closeable {
    public static final Logger LOG = LoggerFactory.getLogger(WorkerStatusClient.class);
    private final StreamObserver<BeamFnApi.WorkerStatusRequest> requestReceiver;
    private final String workerId;
    private final IdGenerator idGenerator = IdGenerators.incrementingLongs();
    private final Map<String, CompletableFuture<BeamFnApi.WorkerStatusResponse>> pendingResponses = Collections.synchronizedMap(new HashMap<>());
    private final AtomicBoolean isClosed = new AtomicBoolean(false);

    /** Receives worker status responses and routes each one to the future waiting on its id. */
    private class ResponseStreamObserver implements StreamObserver<BeamFnApi.WorkerStatusResponse> {
        private ResponseStreamObserver() {
        }

        /**
         * Completes the pending future whose request id matches the response id. Responses that
         * arrive after the client is closed are dropped; responses with an unknown id are logged.
         */
        @Override // org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.StreamObserver
        public void onNext(BeamFnApi.WorkerStatusResponse response) {
            if (WorkerStatusClient.this.isClosed.get()) {
                return;
            }
            CompletableFuture<BeamFnApi.WorkerStatusResponse> pending = WorkerStatusClient.this.pendingResponses.remove(response.getId());
            if (pending != null) {
                pending.complete(response);
            } else {
                WorkerStatusClient.LOG.warn("Received response for status with unknown response id {} and status {}", response.getId(), response.getStatusInfo());
            }
        }

        /** Logs the stream error and then shuts the client down exactly as on normal completion. */
        @Override // org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.StreamObserver
        public void onError(Throwable error) {
            // The trailing Throwable is treated by SLF4J as the exception to log (with stack
            // trace), so it must not have a "{}" placeholder of its own.
            WorkerStatusClient.LOG.error("{} received error", WorkerStatusClient.class.getSimpleName(), error);
            onCompleted();
        }

        /** Closes the enclosing client; a failure to close is logged rather than propagated. */
        @Override // org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.StreamObserver
        public void onCompleted() {
            try {
                WorkerStatusClient.this.close();
            } catch (IOException e) {
                WorkerStatusClient.LOG.warn("Error closing Fn status api client", e);
            }
        }
    }

    private WorkerStatusClient(String workerId, StreamObserver<BeamFnApi.WorkerStatusRequest> requestObserver) {
        // NOTE(review): SynchronizedStreamObserver presumably serializes calls on the wrapped
        // observer so that concurrent requesters do not interleave - confirm against its docs.
        this.requestReceiver = SynchronizedStreamObserver.wrapping(requestObserver);
        this.workerId = workerId;
    }

    /**
     * Creates a client that sends its requests to the given observer.
     *
     * @param workerId id of the worker this client talks to
     * @param requestObserver observer that receives outgoing status requests
     * @return a new, open client
     */
    public static WorkerStatusClient forRequestObserver(String workerId, StreamObserver<BeamFnApi.WorkerStatusRequest> requestObserver) {
        return new WorkerStatusClient(workerId, requestObserver);
    }

    /**
     * Requests the worker status using a freshly generated request id.
     *
     * @return a future completed with the matching response, or completed exceptionally if the
     *     client is closed before a response arrives
     */
    public CompletableFuture<BeamFnApi.WorkerStatusResponse> getWorkerStatus() {
        return getWorkerStatus(BeamFnApi.WorkerStatusRequest.newBuilder().setId(this.idGenerator.getId()).build());
    }

    /**
     * Sends the given request and returns a future for the response carrying the same id.
     *
     * @param request the request to send; its id is used to correlate the response
     * @return a future completed with the matching response; completed exceptionally with a
     *     {@link RuntimeException} if the client is already closed or is closed while waiting
     */
    CompletableFuture<BeamFnApi.WorkerStatusResponse> getWorkerStatus(BeamFnApi.WorkerStatusRequest request) {
        CompletableFuture<BeamFnApi.WorkerStatusResponse> future = new CompletableFuture<>();
        // The closed check and the registration must be atomic with respect to close(): close()
        // sets the flag first and then drains the map under this same monitor, so a future
        // registered here is either rejected now or guaranteed to be seen by the drain.
        synchronized (this.pendingResponses) {
            if (this.isClosed.get()) {
                future.completeExceptionally(new RuntimeException("Worker status client already closed."));
                return future;
            }
            this.pendingResponses.put(request.getId(), future);
        }
        this.requestReceiver.onNext(request);
        return future;
    }

    /**
     * Closes the client: fails every pending request and completes the request stream.
     *
     * <p>Only the first call has any effect; later calls return immediately.
     */
    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        if (this.isClosed.getAndSet(true)) {
            return;
        }
        // Snapshot and clear under the lock, but complete the futures outside it so that
        // callbacks attached to them never run while the map's monitor is held.
        Map<String, CompletableFuture<BeamFnApi.WorkerStatusResponse>> abandoned;
        synchronized (this.pendingResponses) {
            abandoned = new HashMap<>(this.pendingResponses);
            this.pendingResponses.clear();
        }
        for (CompletableFuture<BeamFnApi.WorkerStatusResponse> pending : abandoned.values()) {
            pending.completeExceptionally(new RuntimeException("Fn Status Api client shut down while waiting for the request"));
        }
        this.requestReceiver.onCompleted();
    }

    /** Returns whether {@link #close()} has been called on this client. */
    public boolean isClosed() {
        return this.isClosed.get();
    }

    /** Returns the id of the worker this client was created for. */
    public String getWorkerId() {
        return this.workerId;
    }

    /** Returns a new observer that feeds incoming status responses into this client. */
    public StreamObserver<BeamFnApi.WorkerStatusResponse> getResponseObserver() {
        return new ResponseStreamObserver();
    }
}
