/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.fnexecution.logging;

import io.grpc.Channel;
import io.grpc.ManagedChannel;
import io.grpc.inprocess.InProcessChannelBuilder;
import io.grpc.stub.StreamObserver;
import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.fnexecution.v1.BeamFnLoggingGrpc;
import org.apache.beam.runners.fnexecution.FnService;
import org.apache.beam.runners.fnexecution.GrpcFnServer;
import org.apache.beam.runners.fnexecution.InProcessServerFactory;
import org.apache.beam.runners.fnexecution.ServerFactory;
import org.apache.beam.runners.fnexecution.logging.GrpcLoggingService;
import org.apache.beam.runners.fnexecution.logging.LogWriter;
import org.apache.beam.sdk.fn.test.Consumer;
import org.apache.beam.sdk.fn.test.TestStreams;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(value=JUnit4.class)
public class GrpcLoggingServiceTest {
    private Consumer<BeamFnApi.LogControl> messageDiscarder = item -> {};

    @Test
    public void testMultipleClientsSuccessfullyProcessed() throws Exception {
        ConcurrentLinkedQueue logs = new ConcurrentLinkedQueue();
        GrpcLoggingService service = GrpcLoggingService.forWriter((LogWriter)new CollectionAppendingLogWriter(logs));
        try (GrpcFnServer server = GrpcFnServer.allocatePortAndCreateFor((FnService)service, (ServerFactory)InProcessServerFactory.create());){
            ArrayList<Callable<Void>> tasks = new ArrayList<Callable<Void>>();
            int i = 1;
            while (i <= 3) {
                int instructionReference = i++;
                tasks.add(() -> {
                    CountDownLatch waitForServerHangup = new CountDownLatch(1);
                    String url = server.getApiServiceDescriptor().getUrl();
                    ManagedChannel channel = InProcessChannelBuilder.forName((String)url).build();
                    StreamObserver outboundObserver = BeamFnLoggingGrpc.newStub((Channel)channel).logging((StreamObserver)TestStreams.withOnNext(this.messageDiscarder).withOnCompleted((Runnable)new CountDown(waitForServerHangup)).build());
                    outboundObserver.onNext((Object)this.createLogsWithIds(instructionReference, -instructionReference));
                    outboundObserver.onCompleted();
                    waitForServerHangup.await();
                    return null;
                });
            }
            ExecutorService executorService = Executors.newCachedThreadPool();
            executorService.invokeAll(tasks);
            Assert.assertThat(logs, (Matcher)Matchers.containsInAnyOrder((Object[])new BeamFnApi.LogEntry[]{this.createLogWithId(1L), this.createLogWithId(2L), this.createLogWithId(3L), this.createLogWithId(-1L), this.createLogWithId(-2L), this.createLogWithId(-3L)}));
        }
    }

    @Test
    public void testMultipleClientsFailingIsHandledGracefullyByServer() throws Exception {
        ConcurrentLinkedQueue logs = new ConcurrentLinkedQueue();
        GrpcLoggingService service = GrpcLoggingService.forWriter((LogWriter)new CollectionAppendingLogWriter(logs));
        try (GrpcFnServer server = GrpcFnServer.allocatePortAndCreateFor((FnService)service, (ServerFactory)InProcessServerFactory.create());){
            ArrayList<Callable<Void>> tasks = new ArrayList<Callable<Void>>();
            int i = 1;
            while (i <= 3) {
                int instructionReference = i++;
                tasks.add(() -> {
                    CountDownLatch waitForTermination = new CountDownLatch(1);
                    ManagedChannel channel = InProcessChannelBuilder.forName((String)server.getApiServiceDescriptor().getUrl()).build();
                    StreamObserver outboundObserver = BeamFnLoggingGrpc.newStub((Channel)channel).logging((StreamObserver)TestStreams.withOnNext(this.messageDiscarder).withOnError((Runnable)new CountDown(waitForTermination)).build());
                    outboundObserver.onNext((Object)this.createLogsWithIds(instructionReference, -instructionReference));
                    outboundObserver.onError((Throwable)new RuntimeException("Client " + instructionReference));
                    waitForTermination.await();
                    return null;
                });
            }
            ExecutorService executorService = Executors.newCachedThreadPool();
            executorService.invokeAll(tasks);
        }
    }

    @Test
    public void testServerCloseHangsUpClients() throws Exception {
        LinkedBlockingQueue logs = new LinkedBlockingQueue();
        ExecutorService executorService = Executors.newCachedThreadPool();
        ArrayList<Future<Void>> futures = new ArrayList<Future<Void>>();
        GrpcLoggingService service = GrpcLoggingService.forWriter((LogWriter)new CollectionAppendingLogWriter(logs));
        Throwable throwable = null;
        try (GrpcFnServer server = GrpcFnServer.allocatePortAndCreateFor((FnService)service, (ServerFactory)InProcessServerFactory.create());){
            int i;
            for (i = 1; i <= 3; ++i) {
                long instructionReference = i;
                futures.add(executorService.submit(() -> {
                    CountDownLatch waitForServerHangup = new CountDownLatch(1);
                    ManagedChannel channel = InProcessChannelBuilder.forName((String)server.getApiServiceDescriptor().getUrl()).build();
                    StreamObserver outboundObserver = BeamFnLoggingGrpc.newStub((Channel)channel).logging((StreamObserver)TestStreams.withOnNext(this.messageDiscarder).withOnCompleted((Runnable)new CountDown(waitForServerHangup)).build());
                    outboundObserver.onNext((Object)this.createLogsWithIds(instructionReference));
                    waitForServerHangup.await();
                    return null;
                }));
            }
            for (i = 1; i <= 3; ++i) {
                logs.take();
            }
        }
        catch (Throwable throwable2) {
            Throwable throwable3 = throwable2;
            throw throwable2;
        }
        for (Future future : futures) {
            future.get();
        }
    }

    private BeamFnApi.LogEntry.List createLogsWithIds(long ... ids) {
        BeamFnApi.LogEntry.List.Builder builder = BeamFnApi.LogEntry.List.newBuilder();
        for (long id : ids) {
            builder.addLogEntries(this.createLogWithId(id));
        }
        return builder.build();
    }

    private BeamFnApi.LogEntry createLogWithId(long id) {
        return BeamFnApi.LogEntry.newBuilder().setInstructionReference(Long.toString(id)).build();
    }

    private static class CountDown
    implements Runnable {
        private final CountDownLatch latch;

        CountDown(CountDownLatch latch) {
            this.latch = latch;
        }

        @Override
        public void run() {
            this.latch.countDown();
        }
    }

    private static class CollectionAppendingLogWriter
    implements LogWriter {
        private final Collection<BeamFnApi.LogEntry> entries;

        private CollectionAppendingLogWriter(Collection<BeamFnApi.LogEntry> entries) {
            this.entries = entries;
        }

        public void log(BeamFnApi.LogEntry entry) {
            this.entries.add(entry);
        }
    }
}

