package org.apache.beam.fn.harness.status;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Objects;
import java.util.StringJoiner;
import java.util.UUID;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.beam.fn.harness.Caches;
import org.apache.beam.fn.harness.control.ExecutionStateSampler;
import org.apache.beam.fn.harness.control.ProcessBundleHandler;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.fnexecution.v1.BeamFnWorkerStatusGrpc;
import org.apache.beam.model.pipeline.v1.Endpoints;
import org.apache.beam.sdk.fn.channel.ManagedChannelFactory;
import org.apache.beam.sdk.fn.test.TestStreams;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.Server;
import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.inprocess.InProcessServerBuilder;
import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.stub.CallStreamObserver;
import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.stub.StreamObserver;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.Uninterruptibles;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.util.ProcessIdUtil;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.Mockito;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/fn/harness/status/BeamFnStatusClientTest.class */
public class BeamFnStatusClientTest {
    private final Endpoints.ApiServiceDescriptor apiServiceDescriptor = Endpoints.ApiServiceDescriptor.newBuilder().setUrl(getClass().getName() + ProcessIdUtil.DEFAULT_PROCESSID + UUID.randomUUID().toString()).build();

    @Test
    public void testActiveBundleState() {
        ProcessBundleHandler processBundleHandler = (ProcessBundleHandler) Mockito.mock(ProcessBundleHandler.class);
        ProcessBundleHandler.BundleProcessorCache bundleProcessorCache = (ProcessBundleHandler.BundleProcessorCache) Mockito.mock(ProcessBundleHandler.BundleProcessorCache.class);
        HashMap hashMap = new HashMap();
        for (int i = 0; i < 11; i++) {
            ProcessBundleHandler.BundleProcessor bundleProcessor = (ProcessBundleHandler.BundleProcessor) Mockito.mock(ProcessBundleHandler.BundleProcessor.class);
            ExecutionStateSampler.ExecutionStateTracker executionStateTracker = (ExecutionStateSampler.ExecutionStateTracker) Mockito.mock(ExecutionStateSampler.ExecutionStateTracker.class);
            Mockito.when(bundleProcessor.getStateTracker()).thenReturn(executionStateTracker);
            Mockito.when(executionStateTracker.getStatus()).thenReturn(ExecutionStateSampler.ExecutionStateTrackerStatus.create("ptransformId", "ptransformIdName", Thread.currentThread(), i * 1000));
            String num = Integer.toString(i);
            Mockito.when(bundleProcessorCache.find(num)).thenReturn(bundleProcessor);
            hashMap.put(num, bundleProcessor);
        }
        Mockito.when(processBundleHandler.getBundleProcessorCache()).thenReturn(bundleProcessorCache);
        Mockito.when(bundleProcessorCache.getActiveBundleProcessors()).thenReturn(hashMap);
        ManagedChannelFactory createInProcess = ManagedChannelFactory.createInProcess();
        Endpoints.ApiServiceDescriptor apiServiceDescriptor = this.apiServiceDescriptor;
        Objects.requireNonNull(createInProcess);
        BeamFnStatusClient beamFnStatusClient = new BeamFnStatusClient(apiServiceDescriptor, createInProcess::forDescriptor, processBundleHandler.getBundleProcessorCache(), PipelineOptionsFactory.create(), Caches.noop());
        StringJoiner stringJoiner = new StringJoiner(StringUtils.LF);
        stringJoiner.add(beamFnStatusClient.getActiveProcessBundleState());
        String stringJoiner2 = stringJoiner.toString();
        ArrayList arrayList = new ArrayList();
        for (int i2 = 0; i2 < 10; i2++) {
            arrayList.add(String.format("Instruction %d", Integer.valueOf(i2)));
        }
        MatcherAssert.assertThat(stringJoiner2, Matchers.stringContainsInOrder(arrayList));
        MatcherAssert.assertThat(stringJoiner2, (Matcher<? super String>) Matchers.not((Matcher) Matchers.containsString("Instruction 10")));
    }

    @Test
    public void testWorkerStatusResponse() throws Exception {
        LinkedBlockingQueue linkedBlockingQueue = new LinkedBlockingQueue();
        final LinkedBlockingQueue linkedBlockingQueue2 = new LinkedBlockingQueue();
        Objects.requireNonNull(linkedBlockingQueue);
        final CallStreamObserver build = TestStreams.withOnNext((v1) -> {
            r0.add(v1);
        }).build();
        Server build2 = InProcessServerBuilder.forName(this.apiServiceDescriptor.getUrl()).addService(new BeamFnWorkerStatusGrpc.BeamFnWorkerStatusImplBase() { // from class: org.apache.beam.fn.harness.status.BeamFnStatusClientTest.1
            @Override // org.apache.beam.model.fnexecution.v1.BeamFnWorkerStatusGrpc.AsyncService
            public StreamObserver<BeamFnApi.WorkerStatusResponse> workerStatus(StreamObserver<BeamFnApi.WorkerStatusRequest> streamObserver) {
                Uninterruptibles.putUninterruptibly(linkedBlockingQueue2, streamObserver);
                return build;
            }
        }).build();
        build2.start();
        try {
            ProcessBundleHandler.BundleProcessorCache bundleProcessorCache = (ProcessBundleHandler.BundleProcessorCache) Mockito.mock(ProcessBundleHandler.BundleProcessorCache.class);
            Mockito.when(bundleProcessorCache.getActiveBundleProcessors()).thenReturn(Collections.emptyMap());
            ManagedChannelFactory createInProcess = ManagedChannelFactory.createInProcess();
            Endpoints.ApiServiceDescriptor apiServiceDescriptor = this.apiServiceDescriptor;
            Objects.requireNonNull(createInProcess);
            new BeamFnStatusClient(apiServiceDescriptor, createInProcess::forDescriptor, bundleProcessorCache, PipelineOptionsFactory.create(), Caches.noop());
            ((StreamObserver) linkedBlockingQueue2.take()).onNext(BeamFnApi.WorkerStatusRequest.newBuilder().setId("id").build());
            BeamFnApi.WorkerStatusResponse workerStatusResponse = (BeamFnApi.WorkerStatusResponse) linkedBlockingQueue.take();
            MatcherAssert.assertThat(workerStatusResponse.getStatusInfo(), Matchers.containsString("No active processing bundles."));
            MatcherAssert.assertThat(workerStatusResponse.getId(), (Matcher<? super String>) Matchers.is("id"));
            build2.shutdownNow();
        } catch (Throwable th) {
            build2.shutdownNow();
            throw th;
        }
    }

    @Test
    public void testCacheStatsExist() {
        ManagedChannelFactory createInProcess = ManagedChannelFactory.createInProcess();
        Endpoints.ApiServiceDescriptor apiServiceDescriptor = this.apiServiceDescriptor;
        Objects.requireNonNull(createInProcess);
        MatcherAssert.assertThat(new BeamFnStatusClient(apiServiceDescriptor, createInProcess::forDescriptor, (ProcessBundleHandler.BundleProcessorCache) Mockito.mock(ProcessBundleHandler.BundleProcessorCache.class), PipelineOptionsFactory.create(), Caches.fromOptions(PipelineOptionsFactory.fromArgs(new String[]{"--maxCacheMemoryUsageMb=234"}).create())).getCacheStats(), Matchers.containsString("used/max 0/234 MB"));
    }
}
