package org.apache.beam.runners.fnexecution.logging;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.fnexecution.v1.BeamFnLoggingGrpc;
import org.apache.beam.sdk.fn.server.GrpcFnServer;
import org.apache.beam.sdk.fn.server.InProcessServerFactory;
import org.apache.beam.sdk.fn.test.TestStreams;
import org.apache.beam.vendor.grpc.v1p43p2.io.grpc.inprocess.InProcessChannelBuilder;
import org.apache.beam.vendor.grpc.v1p43p2.io.grpc.stub.StreamObserver;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/runners/fnexecution/logging/GrpcLoggingServiceTest.class */
public class GrpcLoggingServiceTest {
    private Consumer<BeamFnApi.LogControl> messageDiscarder = logControl -> {
    };

    /* loaded from: input_file:org/apache/beam/runners/fnexecution/logging/GrpcLoggingServiceTest$CollectionAppendingLogWriter.class */
    private static class CollectionAppendingLogWriter implements LogWriter {
        private final Collection<BeamFnApi.LogEntry> entries;

        private CollectionAppendingLogWriter(Collection<BeamFnApi.LogEntry> collection) {
            this.entries = collection;
        }

        public void log(BeamFnApi.LogEntry logEntry) {
            this.entries.add(logEntry);
        }
    }

    /* loaded from: input_file:org/apache/beam/runners/fnexecution/logging/GrpcLoggingServiceTest$CountDown.class */
    private static class CountDown implements Runnable {
        private final CountDownLatch latch;

        CountDown(CountDownLatch countDownLatch) {
            this.latch = countDownLatch;
        }

        @Override // java.lang.Runnable
        public void run() {
            this.latch.countDown();
        }
    }

    @Test
    public void testMultipleClientsSuccessfullyProcessed() throws Exception {
        ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue();
        GrpcFnServer allocatePortAndCreateFor = GrpcFnServer.allocatePortAndCreateFor(GrpcLoggingService.forWriter(new CollectionAppendingLogWriter(concurrentLinkedQueue)), InProcessServerFactory.create());
        Throwable th = null;
        try {
            try {
                ArrayList arrayList = new ArrayList();
                for (int i = 1; i <= 3; i++) {
                    int i2 = i;
                    arrayList.add(() -> {
                        CountDownLatch countDownLatch = new CountDownLatch(1);
                        StreamObserver logging = BeamFnLoggingGrpc.newStub(InProcessChannelBuilder.forName(allocatePortAndCreateFor.getApiServiceDescriptor().getUrl()).build()).logging(TestStreams.withOnNext(this.messageDiscarder).withOnCompleted(new CountDown(countDownLatch)).build());
                        logging.onNext(createLogsWithIds(i2, -i2));
                        logging.onCompleted();
                        countDownLatch.await();
                        return null;
                    });
                }
                Executors.newCachedThreadPool().invokeAll(arrayList);
                MatcherAssert.assertThat(concurrentLinkedQueue, Matchers.containsInAnyOrder(new BeamFnApi.LogEntry[]{createLogWithId(1L), createLogWithId(2L), createLogWithId(3L), createLogWithId(-1L), createLogWithId(-2L), createLogWithId(-3L)}));
                if (allocatePortAndCreateFor != null) {
                    $closeResource(null, allocatePortAndCreateFor);
                }
            } catch (Throwable th2) {
                th = th2;
                throw th2;
            }
        } catch (Throwable th3) {
            if (allocatePortAndCreateFor != null) {
                $closeResource(th, allocatePortAndCreateFor);
            }
            throw th3;
        }
    }

    @Test
    public void testMultipleClientsFailingIsHandledGracefullyByServer() throws Exception {
        GrpcFnServer allocatePortAndCreateFor = GrpcFnServer.allocatePortAndCreateFor(GrpcLoggingService.forWriter(new CollectionAppendingLogWriter(new ConcurrentLinkedQueue())), InProcessServerFactory.create());
        Throwable th = null;
        try {
            try {
                CountDownLatch countDownLatch = new CountDownLatch(3);
                LinkedBlockingQueue linkedBlockingQueue = new LinkedBlockingQueue();
                ArrayList arrayList = new ArrayList();
                for (int i = 1; i <= 3; i++) {
                    int i2 = i;
                    arrayList.add(() -> {
                        StreamObserver logging = BeamFnLoggingGrpc.newStub(InProcessChannelBuilder.forName(allocatePortAndCreateFor.getApiServiceDescriptor().getUrl()).build()).logging(TestStreams.withOnNext(this.messageDiscarder).withOnError(new CountDown(countDownLatch)).build());
                        logging.onNext(createLogsWithIds(i2, -i2));
                        linkedBlockingQueue.add(logging);
                        return null;
                    });
                }
                Executors.newCachedThreadPool().invokeAll(arrayList);
                for (int i3 = 1; i3 <= 3; i3++) {
                    ((StreamObserver) linkedBlockingQueue.take()).onError(new RuntimeException("Client " + i3));
                }
                countDownLatch.await();
                if (allocatePortAndCreateFor != null) {
                    $closeResource(null, allocatePortAndCreateFor);
                }
            } catch (Throwable th2) {
                th = th2;
                throw th2;
            }
        } catch (Throwable th3) {
            if (allocatePortAndCreateFor != null) {
                $closeResource(th, allocatePortAndCreateFor);
            }
            throw th3;
        }
    }

    @Test
    public void testServerCloseHangsUpClients() throws Exception {
        LinkedBlockingQueue linkedBlockingQueue = new LinkedBlockingQueue();
        ExecutorService newCachedThreadPool = Executors.newCachedThreadPool();
        ArrayList arrayList = new ArrayList();
        GrpcFnServer allocatePortAndCreateFor = GrpcFnServer.allocatePortAndCreateFor(GrpcLoggingService.forWriter(new CollectionAppendingLogWriter(linkedBlockingQueue)), InProcessServerFactory.create());
        Throwable th = null;
        for (int i = 1; i <= 3; i++) {
            try {
                try {
                    long j = i;
                    arrayList.add(newCachedThreadPool.submit(() -> {
                        CountDownLatch countDownLatch = new CountDownLatch(1);
                        BeamFnLoggingGrpc.newStub(InProcessChannelBuilder.forName(allocatePortAndCreateFor.getApiServiceDescriptor().getUrl()).build()).logging(TestStreams.withOnNext(this.messageDiscarder).withOnCompleted(new CountDown(countDownLatch)).build()).onNext(createLogsWithIds(j));
                        countDownLatch.await();
                        return null;
                    }));
                } finally {
                }
            } catch (Throwable th2) {
                if (allocatePortAndCreateFor != null) {
                    $closeResource(th, allocatePortAndCreateFor);
                }
                throw th2;
            }
        }
        for (int i2 = 1; i2 <= 3; i2++) {
            linkedBlockingQueue.take();
        }
        if (allocatePortAndCreateFor != null) {
            $closeResource(null, allocatePortAndCreateFor);
        }
        Iterator it = arrayList.iterator();
        while (it.hasNext()) {
            ((Future) it.next()).get();
        }
    }

    private BeamFnApi.LogEntry.List createLogsWithIds(long... jArr) {
        BeamFnApi.LogEntry.List.Builder newBuilder = BeamFnApi.LogEntry.List.newBuilder();
        for (long j : jArr) {
            newBuilder.addLogEntries(createLogWithId(j));
        }
        return newBuilder.build();
    }

    private BeamFnApi.LogEntry createLogWithId(long j) {
        return BeamFnApi.LogEntry.newBuilder().setInstructionId(Long.toString(j)).build();
    }

    private static /* synthetic */ void $closeResource(Throwable th, AutoCloseable autoCloseable) {
        if (th == null) {
            autoCloseable.close();
            return;
        }
        try {
            autoCloseable.close();
        } catch (Throwable th2) {
            th.addSuppressed(th2);
        }
    }
}
