package org.apache.beam.fn.harness;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.ServiceLoader;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import org.apache.beam.fn.harness.BeamFnDataReadRunner;
import org.apache.beam.fn.harness.HandlesSplits;
import org.apache.beam.fn.harness.PTransformRunnerFactory;
import org.apache.beam.fn.harness.PTransformRunnerFactoryTestContext;
import org.apache.beam.fn.harness.control.BundleProgressReporter;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.pipeline.v1.Endpoints;
import org.apache.beam.model.pipeline.v1.MetricsApi;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.construction.CoderTranslation;
import org.apache.beam.runners.core.metrics.MonitoringInfoConstants;
import org.apache.beam.runners.core.metrics.MonitoringInfoEncodings;
import org.apache.beam.runners.core.metrics.ShortIdMap;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.fn.data.DataEndpoint;
import org.apache.beam.sdk.fn.data.FnDataReceiver;
import org.apache.beam.sdk.fn.data.RemoteGrpcPortRead;
import org.apache.beam.sdk.fn.test.TestExecutors;
import org.apache.beam.sdk.function.ThrowingRunnable;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.vendor.grpc.v1p54p0.com.google.protobuf.ByteString;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.apache.logging.log4j.message.StructuredDataId;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.hamcrest.collection.IsMapContaining;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.runners.Enclosed;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.junit.runners.Parameterized;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;

@RunWith(Enclosed.class)
/* loaded from: input_file:org/apache/beam/fn/harness/BeamFnDataReadRunnerTest.class */
public class BeamFnDataReadRunnerTest {
    // Coder id under which the plain string element coder is registered in COMPONENTS.
    private static final String ELEMENT_CODER_SPEC_ID = "string-coder-id";
    // Proto form of CODER; assigned in the static initializer at the bottom of this class.
    private static final RunnerApi.Coder CODER_SPEC;
    // Components produced by translating CODER, extended with the CODER_SPEC_ID and
    // ELEMENT_CODER_SPEC_ID entries; assigned in the static initializer.
    private static final RunnerApi.Components COMPONENTS;
    // Instruction (bundle) id used by tests that do not supply their own.
    private static final String DEFAULT_BUNDLE_ID = "57";
    // Transform id used by the tests that build their own context; also the PTRANSFORM label
    // of DATA_CHANNEL_READ_IDX_MONITORING_INFO.
    private static final String INPUT_TRANSFORM_ID = "1";
    // Transform id used by the tests that go through createReadRunner/executeSplit.
    private static final String PTRANSFORM_ID = "ptransform_id";
    // Monitoring info (DATA_CHANNEL_READ_INDEX urn, SUM_INT64_TYPE type) used to look up the
    // short id of the read-index metric; assigned in the static initializer.
    private static final MetricsApi.MonitoringInfo DATA_CHANNEL_READ_IDX_MONITORING_INFO;
    // Coder for the raw string elements.
    private static final Coder<String> ELEMENT_CODER = StringUtf8Coder.of();
    // Full windowed-value coder wrapping ELEMENT_CODER with the global window coder.
    private static final Coder<WindowedValue<String>> CODER = WindowedValue.getFullCoder(ELEMENT_CODER, GlobalWindow.Coder.INSTANCE);
    // Coder id under which CODER_SPEC is registered in COMPONENTS and referenced by PORT_SPEC.
    private static final String CODER_SPEC_ID = "windowed-string-coder-id";
    // Remote gRPC port built from the default API service descriptor and CODER_SPEC_ID.
    private static final BeamFnApi.RemoteGrpcPort PORT_SPEC = BeamFnApi.RemoteGrpcPort.newBuilder().setApiServiceDescriptor(Endpoints.ApiServiceDescriptor.getDefaultInstance()).setCoderId(CODER_SPEC_ID).build();

    @RunWith(JUnit4.class)
    /* loaded from: input_file:org/apache/beam/fn/harness/BeamFnDataReadRunnerTest$BeamFnDataReadRunnerExecutionTest.class */
    public static class BeamFnDataReadRunnerExecutionTest {

        @Rule
        public TestExecutors.TestExecutorService executor = TestExecutors.from((Supplier<ExecutorService>) Executors::newCachedThreadPool);

        @Before
        public void setUp() {
            MockitoAnnotations.initMocks(this);
        }

        @Test
        public void testCreatingAndProcessingBeamFnDataReadRunner() throws Exception {
            ArrayList arrayList = new ArrayList();
            PTransformRunnerFactoryTestContext build = PTransformRunnerFactoryTestContext.builder(BeamFnDataReadRunnerTest.INPUT_TRANSFORM_ID, RemoteGrpcPortRead.readFromPort(BeamFnDataReadRunnerTest.PORT_SPEC, "outputPC").toPTransform()).processBundleInstructionId(BeamFnDataReadRunnerTest.DEFAULT_BUNDLE_ID).pCollections(ImmutableMap.of("outputPC", RunnerApi.PCollection.newBuilder().setCoderId(BeamFnDataReadRunnerTest.ELEMENT_CODER_SPEC_ID).build())).coders(BeamFnDataReadRunnerTest.COMPONENTS.getCodersMap()).windowingStrategies(BeamFnDataReadRunnerTest.COMPONENTS.getWindowingStrategiesMap()).build();
            Objects.requireNonNull(arrayList);
            build.addPCollectionConsumer("outputPC", (v1) -> {
                r2.add(v1);
            });
            new BeamFnDataReadRunner.Factory().createRunnerForPTransform(build);
            MatcherAssert.assertThat(build.getTearDownFunctions(), (Matcher<? super List<ThrowingRunnable>>) Matchers.empty());
            MatcherAssert.assertThat(build.getStartBundleFunctions(), (Matcher<? super List<ThrowingRunnable>>) Matchers.empty());
            MatcherAssert.assertThat(build.getPCollectionConsumers().keySet(), (Matcher<? super Set<String>>) Matchers.containsInAnyOrder("outputPC"));
            MatcherAssert.assertThat(build.getIncomingDataEndpoints().keySet(), (Matcher<? super Set<Endpoints.ApiServiceDescriptor>>) Matchers.hasSize(1));
            DataEndpoint dataEndpoint = (DataEndpoint) Iterables.getOnlyElement(build.getIncomingDataEndpoints().get(BeamFnDataReadRunnerTest.PORT_SPEC.getApiServiceDescriptor()));
            Assert.assertEquals(BeamFnDataReadRunnerTest.INPUT_TRANSFORM_ID, dataEndpoint.getTransformId());
            Assert.assertEquals(BeamFnDataReadRunnerTest.CODER, dataEndpoint.getCoder());
            dataEndpoint.getReceiver().accept(WindowedValue.valueInGlobalWindow("TestValue"));
            MatcherAssert.assertThat(arrayList, (Matcher<? super ArrayList>) Matchers.contains(WindowedValue.valueInGlobalWindow("TestValue")));
            arrayList.clear();
            ((ThrowingRunnable) Iterables.getOnlyElement(build.getFinishBundleFunctions())).run();
        }

        @Test
        public void testReuseForMultipleBundles() throws Exception {
            ArrayList arrayList = new ArrayList();
            AtomicReference atomicReference = new AtomicReference("0");
            PTransformRunnerFactoryTestContext.Builder builder = PTransformRunnerFactoryTestContext.builder(BeamFnDataReadRunnerTest.INPUT_TRANSFORM_ID, RemoteGrpcPortRead.readFromPort(BeamFnDataReadRunnerTest.PORT_SPEC, "outputPC").toPTransform());
            Objects.requireNonNull(atomicReference);
            PTransformRunnerFactoryTestContext build = builder.processBundleInstructionIdSupplier(atomicReference::get).pCollections(ImmutableMap.of("outputPC", RunnerApi.PCollection.newBuilder().setCoderId(BeamFnDataReadRunnerTest.ELEMENT_CODER_SPEC_ID).build())).coders(BeamFnDataReadRunnerTest.COMPONENTS.getCodersMap()).windowingStrategies(BeamFnDataReadRunnerTest.COMPONENTS.getWindowingStrategiesMap()).build();
            Objects.requireNonNull(arrayList);
            build.addPCollectionConsumer("outputPC", (v1) -> {
                r2.add(v1);
            });
            BeamFnDataReadRunner createRunnerForPTransform = new BeamFnDataReadRunner.Factory().createRunnerForPTransform(build);
            MatcherAssert.assertThat(build.getIncomingDataEndpoints().keySet(), (Matcher<? super Set<Endpoints.ApiServiceDescriptor>>) Matchers.hasSize(1));
            DataEndpoint dataEndpoint = (DataEndpoint) Iterables.getOnlyElement(build.getIncomingDataEndpoints().get(BeamFnDataReadRunnerTest.PORT_SPEC.getApiServiceDescriptor()));
            BeamFnDataReadRunnerTest.assertIntermediateMonitoringDataDataChannelReadIndexEquals(build.getShortIdMap(), build.getBundleProgressReporters(), -1L);
            dataEndpoint.getReceiver().accept(WindowedValue.valueInGlobalWindow("ABC"));
            BeamFnDataReadRunnerTest.assertIntermediateMonitoringDataDataChannelReadIndexEquals(build.getShortIdMap(), build.getBundleProgressReporters(), 0L);
            dataEndpoint.getReceiver().accept(WindowedValue.valueInGlobalWindow("DEF"));
            BeamFnDataReadRunnerTest.assertIntermediateMonitoringDataDataChannelReadIndexEquals(build.getShortIdMap(), build.getBundleProgressReporters(), 1L);
            createRunnerForPTransform.blockTillReadFinishes();
            BeamFnDataReadRunnerTest.assertIntermediateMonitoringDataDataChannelReadIndexEquals(build.getShortIdMap(), build.getBundleProgressReporters(), 2L);
            BeamFnDataReadRunnerTest.assertFinalMonitoringDataDataChannelReadIndexEquals(build.getShortIdMap(), build.getBundleProgressReporters(), 2L);
            MatcherAssert.assertThat(arrayList, (Matcher<? super ArrayList>) Matchers.contains(WindowedValue.valueInGlobalWindow("ABC"), WindowedValue.valueInGlobalWindow("DEF")));
            atomicReference.set(null);
            createRunnerForPTransform.reset();
            arrayList.clear();
            BeamFnDataReadRunnerTest.assertIntermediateMonitoringDataDataChannelReadIndexEquals(build.getShortIdMap(), build.getBundleProgressReporters(), -1L);
            atomicReference.set(BeamFnDataReadRunnerTest.INPUT_TRANSFORM_ID);
            dataEndpoint.getReceiver().accept(WindowedValue.valueInGlobalWindow("GHI"));
            BeamFnDataReadRunnerTest.assertIntermediateMonitoringDataDataChannelReadIndexEquals(build.getShortIdMap(), build.getBundleProgressReporters(), 0L);
            dataEndpoint.getReceiver().accept(WindowedValue.valueInGlobalWindow("JKL"));
            BeamFnDataReadRunnerTest.assertIntermediateMonitoringDataDataChannelReadIndexEquals(build.getShortIdMap(), build.getBundleProgressReporters(), 1L);
            createRunnerForPTransform.blockTillReadFinishes();
            BeamFnDataReadRunnerTest.assertIntermediateMonitoringDataDataChannelReadIndexEquals(build.getShortIdMap(), build.getBundleProgressReporters(), 2L);
            BeamFnDataReadRunnerTest.assertFinalMonitoringDataDataChannelReadIndexEquals(build.getShortIdMap(), build.getBundleProgressReporters(), 2L);
            MatcherAssert.assertThat(arrayList, (Matcher<? super ArrayList>) Matchers.contains(WindowedValue.valueInGlobalWindow("GHI"), WindowedValue.valueInGlobalWindow("JKL")));
        }

        @Test
        public void testRegistration() {
            Iterator it = ServiceLoader.load(PTransformRunnerFactory.Registrar.class).iterator();
            while (it.hasNext()) {
                PTransformRunnerFactory.Registrar registrar = (PTransformRunnerFactory.Registrar) it.next();
                if (registrar instanceof BeamFnDataReadRunner.Registrar) {
                    MatcherAssert.assertThat(registrar.getPTransformRunnerFactories(), (Matcher<? super Map>) IsMapContaining.hasKey(RemoteGrpcPortRead.URN));
                    return;
                }
            }
            Assert.fail("Expected registrar not found.");
        }

        @Test
        public void testSplittingBeforeStartBundle() throws Exception {
            ArrayList arrayList = new ArrayList();
            Objects.requireNonNull(arrayList);
            BeamFnDataReadRunner createReadRunner = BeamFnDataReadRunnerTest.createReadRunner((v1) -> {
                r0.add(v1);
            }, BeamFnDataReadRunnerTest.PTRANSFORM_ID);
            Assert.assertEquals(BeamFnDataReadRunnerTest.channelSplitResult(5L), BeamFnDataReadRunnerTest.executeSplit(createReadRunner, BeamFnDataReadRunnerTest.PTRANSFORM_ID, BeamFnDataReadRunnerTest.DEFAULT_BUNDLE_ID, -1L, 0.5d, 10L, Collections.EMPTY_LIST));
            createReadRunner.forwardElementToConsumer(WindowedValue.valueInGlobalWindow("A"));
            createReadRunner.forwardElementToConsumer(WindowedValue.valueInGlobalWindow("B"));
            createReadRunner.forwardElementToConsumer(WindowedValue.valueInGlobalWindow("C"));
            createReadRunner.forwardElementToConsumer(WindowedValue.valueInGlobalWindow("D"));
            createReadRunner.forwardElementToConsumer(WindowedValue.valueInGlobalWindow("E"));
            createReadRunner.forwardElementToConsumer(WindowedValue.valueInGlobalWindow("F"));
            createReadRunner.forwardElementToConsumer(WindowedValue.valueInGlobalWindow("G"));
            MatcherAssert.assertThat(arrayList, (Matcher<? super ArrayList>) Matchers.contains(WindowedValue.valueInGlobalWindow("A"), WindowedValue.valueInGlobalWindow("B"), WindowedValue.valueInGlobalWindow("C"), WindowedValue.valueInGlobalWindow("D"), WindowedValue.valueInGlobalWindow("E")));
        }

        @Test
        public void testSplittingWhenNoElementsProcessed() throws Exception {
            ArrayList arrayList = new ArrayList();
            Objects.requireNonNull(arrayList);
            BeamFnDataReadRunner createReadRunner = BeamFnDataReadRunnerTest.createReadRunner((v1) -> {
                r0.add(v1);
            }, BeamFnDataReadRunnerTest.PTRANSFORM_ID);
            Assert.assertEquals(BeamFnDataReadRunnerTest.channelSplitResult(5L), BeamFnDataReadRunnerTest.executeSplit(createReadRunner, BeamFnDataReadRunnerTest.PTRANSFORM_ID, BeamFnDataReadRunnerTest.DEFAULT_BUNDLE_ID, -1L, 0.5d, 10L, Collections.EMPTY_LIST));
            createReadRunner.forwardElementToConsumer(WindowedValue.valueInGlobalWindow("A"));
            createReadRunner.forwardElementToConsumer(WindowedValue.valueInGlobalWindow("B"));
            createReadRunner.forwardElementToConsumer(WindowedValue.valueInGlobalWindow("C"));
            createReadRunner.forwardElementToConsumer(WindowedValue.valueInGlobalWindow("D"));
            createReadRunner.forwardElementToConsumer(WindowedValue.valueInGlobalWindow("E"));
            createReadRunner.forwardElementToConsumer(WindowedValue.valueInGlobalWindow("F"));
            createReadRunner.forwardElementToConsumer(WindowedValue.valueInGlobalWindow("G"));
            MatcherAssert.assertThat(arrayList, (Matcher<? super ArrayList>) Matchers.contains(WindowedValue.valueInGlobalWindow("A"), WindowedValue.valueInGlobalWindow("B"), WindowedValue.valueInGlobalWindow("C"), WindowedValue.valueInGlobalWindow("D"), WindowedValue.valueInGlobalWindow("E")));
        }

        @Test
        public void testSplittingWhenSomeElementsProcessed() throws Exception {
            ArrayList arrayList = new ArrayList();
            Objects.requireNonNull(arrayList);
            BeamFnDataReadRunner createReadRunner = BeamFnDataReadRunnerTest.createReadRunner((v1) -> {
                r0.add(v1);
            }, BeamFnDataReadRunnerTest.PTRANSFORM_ID);
            Assert.assertEquals(BeamFnDataReadRunnerTest.channelSplitResult(6L), BeamFnDataReadRunnerTest.executeSplit(createReadRunner, BeamFnDataReadRunnerTest.PTRANSFORM_ID, BeamFnDataReadRunnerTest.DEFAULT_BUNDLE_ID, 1L, 0.5d, 10L, Collections.EMPTY_LIST));
            createReadRunner.forwardElementToConsumer(WindowedValue.valueInGlobalWindow(BeamFnDataReadRunnerTest.INPUT_TRANSFORM_ID));
            createReadRunner.forwardElementToConsumer(WindowedValue.valueInGlobalWindow("2"));
            createReadRunner.forwardElementToConsumer(WindowedValue.valueInGlobalWindow("3"));
            createReadRunner.forwardElementToConsumer(WindowedValue.valueInGlobalWindow("4"));
            createReadRunner.forwardElementToConsumer(WindowedValue.valueInGlobalWindow("5"));
            MatcherAssert.assertThat(arrayList, (Matcher<? super ArrayList>) Matchers.contains(WindowedValue.valueInGlobalWindow(StructuredDataId.RESERVED), WindowedValue.valueInGlobalWindow("0"), WindowedValue.valueInGlobalWindow(BeamFnDataReadRunnerTest.INPUT_TRANSFORM_ID), WindowedValue.valueInGlobalWindow("2"), WindowedValue.valueInGlobalWindow("3"), WindowedValue.valueInGlobalWindow("4")));
        }

        @Test
        public void testSplittingAfterReuse() throws Exception {
            ArrayList arrayList = new ArrayList();
            Objects.requireNonNull(arrayList);
            BeamFnDataReadRunner createReadRunner = BeamFnDataReadRunnerTest.createReadRunner((v1) -> {
                r0.add(v1);
            }, BeamFnDataReadRunnerTest.PTRANSFORM_ID);
            Assert.assertEquals(BeamFnApi.ProcessBundleSplitResponse.getDefaultInstance(), BeamFnDataReadRunnerTest.executeSplit(createReadRunner, BeamFnDataReadRunnerTest.PTRANSFORM_ID, "previousBundleId", 1L, 0.25d, 10L, Collections.EMPTY_LIST));
            createReadRunner.forwardElementToConsumer(WindowedValue.valueInGlobalWindow(BeamFnDataReadRunnerTest.INPUT_TRANSFORM_ID));
            createReadRunner.forwardElementToConsumer(WindowedValue.valueInGlobalWindow("2"));
            createReadRunner.forwardElementToConsumer(WindowedValue.valueInGlobalWindow("3"));
            createReadRunner.forwardElementToConsumer(WindowedValue.valueInGlobalWindow("4"));
            createReadRunner.forwardElementToConsumer(WindowedValue.valueInGlobalWindow("5"));
            MatcherAssert.assertThat(arrayList, (Matcher<? super ArrayList>) Matchers.contains(WindowedValue.valueInGlobalWindow(StructuredDataId.RESERVED), WindowedValue.valueInGlobalWindow("0"), WindowedValue.valueInGlobalWindow(BeamFnDataReadRunnerTest.INPUT_TRANSFORM_ID), WindowedValue.valueInGlobalWindow("2"), WindowedValue.valueInGlobalWindow("3"), WindowedValue.valueInGlobalWindow("4"), WindowedValue.valueInGlobalWindow("5")));
        }
    }

    /**
     * Parameterized channel-split cases. Each row supplies, in order: the expected split
     * response, the index passed to {@code executeSplit}, the progress the mocked consumer
     * reports, the requested fraction of remainder, and the estimated number of input elements.
     */
    @RunWith(Parameterized.class)
    public static class ChannelSplitTest {

        @Parameterized.Parameter(0)
        public BeamFnApi.ProcessBundleSplitResponse expectedResponse;

        @Parameterized.Parameter(1)
        public long index;

        @Parameterized.Parameter(2)
        public double elementProgress;

        @Parameterized.Parameter(3)
        public double fractionOfRemainder;

        @Parameterized.Parameter(4)
        public long bufferSize;

        /** Returns the parameter rows; see the class comment for the column order. */
        @Parameterized.Parameters
        public static Iterable<Object[]> data() {
            // The explicit <Object[]> witness keeps every row a single list element; without it
            // the builder is inferred as Builder<Object> and add(Object[]) binds to the varargs
            // overload, flattening the rows.
            return ImmutableList.<Object[]>builder()
                    .add(new Object[] {channelSplitResult(1L), 0L, 0.0d, 0.0d, 16L})
                    .add(new Object[] {channelSplitResult(4L), 0L, 0.0d, 0.24d, 16L})
                    .add(new Object[] {channelSplitResult(4L), 0L, 0.0d, 0.25d, 16L})
                    .add(new Object[] {channelSplitResult(4L), 0L, 0.0d, 0.26d, 16L})
                    .add(new Object[] {channelSplitResult(8L), 0L, 0.0d, 0.5d, 16L})
                    .add(new Object[] {channelSplitResult(9L), 2L, 0.0d, 0.5d, 16L})
                    .add(new Object[] {channelSplitResult(11L), 6L, 0.0d, 0.5d, 16L})
                    .add(new Object[] {channelSplitResult(1L), 0L, 0.5d, 0.25d, 4L})
                    .add(new Object[] {channelSplitResult(2L), 0L, 0.9d, 0.25d, 4L})
                    .add(new Object[] {channelSplitResult(2L), 1L, 0.0d, 0.25d, 4L})
                    .add(new Object[] {channelSplitResult(2L), 1L, 0.1d, 0.25d, 4L})
                    .build();
        }

        /**
         * Executes a split against a mocked consumer that only reports progress ({@code trySplit}
         * is left unstubbed) and compares the response with the expected one.
         */
        @Test
        public void testChannelSplit() throws Exception {
            SplittingReceiver consumer = Mockito.mock(SplittingReceiver.class);
            Mockito.when(consumer.getProgress()).thenReturn(elementProgress);

            BeamFnDataReadRunner<String> readRunner = createReadRunner(consumer, PTRANSFORM_ID);
            Assert.assertEquals(
                    expectedResponse,
                    executeSplit(readRunner, PTRANSFORM_ID, DEFAULT_BUNDLE_ID, index, fractionOfRemainder, bufferSize, Collections.emptyList()));
        }
    }

    /**
     * Parameterized channel-split cases that restrict the split to a list of allowed split
     * points. Each row supplies, in order: the expected split response, the index passed to
     * {@code executeSplit}, the requested fraction of remainder, the estimated number of input
     * elements, and the allowed split points.
     */
    @RunWith(Parameterized.class)
    public static class ChannelSplitWithAllowedSplitPointsTest {

        @Parameterized.Parameter(0)
        public BeamFnApi.ProcessBundleSplitResponse expectedResponse;

        @Parameterized.Parameter(1)
        public long index;

        @Parameterized.Parameter(2)
        public double fractionOfRemainder;

        @Parameterized.Parameter(3)
        public long bufferSize;

        @Parameterized.Parameter(4)
        public List<Long> allowedSplitPoints;

        /** Returns the parameter rows; see the class comment for the column order. */
        @Parameterized.Parameters
        public static Iterable<Object[]> data() {
            // The explicit <Object[]> witness keeps every row a single list element; without it
            // add(Object[]) binds to the varargs overload and flattens the rows.
            return ImmutableList.<Object[]>builder()
                    .add(new Object[] {channelSplitResult(4L), 0L, 0.25d, 16L, ImmutableList.of(2L, 3L, 4L, 5L)})
                    .add(new Object[] {channelSplitResult(5L), 0L, 0.25d, 16L, ImmutableList.of(2L, 3L, 5L)})
                    .add(new Object[] {channelSplitResult(3L), 0L, 0.25d, 16L, ImmutableList.of(2L, 3L, 6L)})
                    .add(new Object[] {channelSplitResult(5L), 0L, 0.25d, 16L, ImmutableList.of(5L, 6L, 7L)})
                    .add(new Object[] {channelSplitResult(3L), 0L, 0.25d, 16L, ImmutableList.of(1L, 2L, 3L)})
                    .add(new Object[] {BeamFnApi.ProcessBundleSplitResponse.getDefaultInstance(), 5L, 0.25d, 16L, ImmutableList.of(1L, 2L, 3L)})
                    .build();
        }

        /**
         * Executes a split with the row's allowed split points against a consumer that simply
         * collects elements, and compares the response with the expected one.
         */
        @Test
        public void testChannelSplittingWithAllowedSplitPoints() throws Exception {
            List<WindowedValue<String>> outputValues = new ArrayList<>();
            BeamFnDataReadRunner<String> readRunner = createReadRunner(outputValues::add, PTRANSFORM_ID);

            Assert.assertEquals(
                    expectedResponse,
                    executeSplit(readRunner, PTRANSFORM_ID, DEFAULT_BUNDLE_ID, index, fractionOfRemainder, bufferSize, allowedSplitPoints));
        }
    }

    @RunWith(Parameterized.class)
    /* loaded from: input_file:org/apache/beam/fn/harness/BeamFnDataReadRunnerTest$ElementSplitTest.class */
    public static class ElementSplitTest {

        @Parameterized.Parameter(0)
        public BeamFnApi.ProcessBundleSplitResponse expectedResponse;

        @Parameterized.Parameter(1)
        public long index;

        @Parameterized.Parameter(2)
        public double elementProgress;

        @Parameterized.Parameter(3)
        public double fractionOfRemainder;

        @Parameterized.Parameter(4)
        public long bufferSize;

        @Parameterized.Parameters
        public static Iterable<Object[]> data() {
            return ImmutableList.builder().add(new Object[]{BeamFnDataReadRunnerTest.channelSplitResult(2L), 0L, 0, Double.valueOf(0.51d), 4L}).add(new Object[]{BeamFnDataReadRunnerTest.channelSplitResult(2L), 0L, 0, Double.valueOf(0.49d), 4L}).add(new Object[]{BeamFnDataReadRunnerTest.channelSplitResult(1L), 0L, 0, Double.valueOf(0.26d), 4L}).add(new Object[]{BeamFnDataReadRunnerTest.channelSplitResult(1L), 0L, 0, Double.valueOf(0.25d), 4L}).add(new Object[]{BeamFnDataReadRunnerTest.elementSplitResult(0L, 0.8d), 0L, 0, Double.valueOf(0.2d), 4L}).add(new Object[]{BeamFnDataReadRunnerTest.elementSplitResult(0L, 0.5d), 0L, 0, Double.valueOf(0.125d), 4L}).add(new Object[]{BeamFnDataReadRunnerTest.channelSplitResult(1L), 0L, Double.valueOf(0.5d), Double.valueOf(0.2d), 4L}).add(new Object[]{BeamFnDataReadRunnerTest.channelSplitResult(3L), 2L, 0, Double.valueOf(0.6d), 4L}).add(new Object[]{BeamFnDataReadRunnerTest.channelSplitResult(4L), 2L, Double.valueOf(0.9d), Double.valueOf(0.6d), 4L}).add(new Object[]{BeamFnDataReadRunnerTest.elementSplitResult(2L, 0.6d), 2L, Double.valueOf(0.5d), Double.valueOf(0.2d), 4L}).build();
        }

        @Test
        public void testElementSplit() throws Exception {
            SplittingReceiver splittingReceiver = (SplittingReceiver) Mockito.mock(SplittingReceiver.class);
            Mockito.when(Double.valueOf(splittingReceiver.getProgress())).thenReturn(Double.valueOf(this.elementProgress));
            Mockito.when(splittingReceiver.trySplit(org.mockito.Matchers.anyDouble())).thenCallRealMethod();
            Assert.assertEquals(this.expectedResponse, BeamFnDataReadRunnerTest.executeSplit(BeamFnDataReadRunnerTest.createReadRunner(splittingReceiver, BeamFnDataReadRunnerTest.PTRANSFORM_ID), BeamFnDataReadRunnerTest.PTRANSFORM_ID, BeamFnDataReadRunnerTest.DEFAULT_BUNDLE_ID, this.index, this.fractionOfRemainder, this.bufferSize, Collections.EMPTY_LIST));
        }
    }

    @RunWith(Parameterized.class)
    /* loaded from: input_file:org/apache/beam/fn/harness/BeamFnDataReadRunnerTest$ElementSplitWithAllowedSplitPointsTest.class */
    public static class ElementSplitWithAllowedSplitPointsTest {

        @Parameterized.Parameter(0)
        public BeamFnApi.ProcessBundleSplitResponse expectedResponse;

        @Parameterized.Parameter(1)
        public long index;

        @Parameterized.Parameter(2)
        public double elementProgress;

        @Parameterized.Parameter(3)
        public double fractionOfRemainder;

        @Parameterized.Parameter(4)
        public long bufferSize;

        @Parameterized.Parameter(5)
        public List<Long> allowedSplitPoints;

        @Parameterized.Parameters
        public static Iterable<Object[]> data() {
            return ImmutableList.builder().add(new Object[]{BeamFnDataReadRunnerTest.elementSplitResult(2L, 0.6d), 2L, 0, Double.valueOf(0.2d), 5L, ImmutableList.of(1L, 2L, 3L, 4L, 5L)}).add(new Object[]{BeamFnDataReadRunnerTest.channelSplitResult(4L), 2L, 0, Double.valueOf(0.2d), 5L, ImmutableList.of(1L, 2L, 4L, 5L)}).add(new Object[]{BeamFnDataReadRunnerTest.channelSplitResult(5L), 2L, 0, Double.valueOf(0.2d), 5L, ImmutableList.of(1L, 2L, 5L)}).add(new Object[]{BeamFnDataReadRunnerTest.channelSplitResult(3L), 2L, 0, Double.valueOf(0.2d), 5L, ImmutableList.of(1L, 3L, 4L, 5L)}).build();
        }

        @Test
        public void testElementSplittingWithAllowedSplitPoints() throws Exception {
            SplittingReceiver splittingReceiver = (SplittingReceiver) Mockito.mock(SplittingReceiver.class);
            Mockito.when(Double.valueOf(splittingReceiver.getProgress())).thenReturn(Double.valueOf(this.elementProgress));
            Mockito.when(splittingReceiver.trySplit(org.mockito.Matchers.anyDouble())).thenCallRealMethod();
            Assert.assertEquals(this.expectedResponse, BeamFnDataReadRunnerTest.executeSplit(BeamFnDataReadRunnerTest.createReadRunner(splittingReceiver, BeamFnDataReadRunnerTest.PTRANSFORM_ID), BeamFnDataReadRunnerTest.PTRANSFORM_ID, BeamFnDataReadRunnerTest.DEFAULT_BUNDLE_ID, this.index, this.fractionOfRemainder, this.bufferSize, this.allowedSplitPoints));
        }
    }

    /* loaded from: input_file:org/apache/beam/fn/harness/BeamFnDataReadRunnerTest$SplittingReceiver.class */
    private static abstract class SplittingReceiver implements FnDataReceiver<WindowedValue<String>>, HandlesSplits {
        private SplittingReceiver() {
        }

        public HandlesSplits.SplitResult trySplit(double d) {
            return HandlesSplits.SplitResult.of(Collections.singletonList(BeamFnApi.BundleApplication.newBuilder().setInputId(String.format("primary%.1f", Double.valueOf(d))).m606build()), Collections.singletonList(BeamFnApi.DelayedBundleApplication.newBuilder().setApplication(BeamFnApi.BundleApplication.newBuilder().setInputId(String.format("residual%.1f", Double.valueOf(1.0d - d))).m606build()).m654build()));
        }
    }

    /**
     * Creates a read runner for a transform that reads from {@code PORT_SPEC} into "outputPC",
     * with the given receiver registered as the consumer of "outputPC" and
     * {@code DEFAULT_BUNDLE_ID} as the instruction id.
     *
     * @param fnDataReceiver consumer registered for the "outputPC" PCollection
     * @param str id of the transform the runner is created for
     * @return the runner produced by {@link BeamFnDataReadRunner.Factory}
     */
    public static BeamFnDataReadRunner<String> createReadRunner(FnDataReceiver<WindowedValue<String>> fnDataReceiver, String str) throws Exception {
        RunnerApi.PCollection outputPCollection =
                RunnerApi.PCollection.newBuilder().setCoderId(ELEMENT_CODER_SPEC_ID).build();

        PTransformRunnerFactoryTestContext context =
                PTransformRunnerFactoryTestContext.builder(str, RemoteGrpcPortRead.readFromPort(PORT_SPEC, "outputPC").toPTransform())
                        .processBundleInstructionId(DEFAULT_BUNDLE_ID)
                        .pCollections(ImmutableMap.of("outputPC", outputPCollection))
                        .coders(COMPONENTS.getCodersMap())
                        .windowingStrategies(COMPONENTS.getWindowingStrategiesMap())
                        .build();
        context.addPCollectionConsumer("outputPC", fnDataReceiver);

        BeamFnDataReadRunner.Factory factory = new BeamFnDataReadRunner.Factory();
        return factory.createRunnerForPTransform(context);
    }

    /**
     * Collects intermediate monitoring data from every reporter and asserts that the entry for
     * {@code DATA_CHANNEL_READ_IDX_MONITORING_INFO} is present and decodes to the expected value.
     *
     * @param shortIdMap map used to resolve the short id of the read-index monitoring info
     * @param collection reporters asked to contribute their intermediate monitoring data
     * @param j expected data-channel read index
     */
    public static void assertIntermediateMonitoringDataDataChannelReadIndexEquals(ShortIdMap shortIdMap, Collection<BundleProgressReporter> collection, long j) {
        Map<String, ByteString> monitoringData = new HashMap<>();
        for (BundleProgressReporter reporter : collection) {
            reporter.updateIntermediateMonitoringData(monitoringData);
        }
        String shortId = shortIdMap.getOrCreateShortId(DATA_CHANNEL_READ_IDX_MONITORING_INFO);
        Assert.assertTrue(monitoringData.containsKey(shortId));
        Assert.assertEquals(j, MonitoringInfoEncodings.decodeInt64Counter(monitoringData.get(shortId)));
    }

    /**
     * Collects final monitoring data from every reporter and asserts that the entry for
     * {@code DATA_CHANNEL_READ_IDX_MONITORING_INFO} is present and decodes to the expected value.
     *
     * @param shortIdMap map used to resolve the short id of the read-index monitoring info
     * @param collection reporters asked to contribute their final monitoring data
     * @param j expected data-channel read index
     */
    public static void assertFinalMonitoringDataDataChannelReadIndexEquals(ShortIdMap shortIdMap, Collection<BundleProgressReporter> collection, long j) {
        Map<String, ByteString> monitoringData = new HashMap<>();
        for (BundleProgressReporter reporter : collection) {
            reporter.updateFinalMonitoringData(monitoringData);
        }
        String shortId = shortIdMap.getOrCreateShortId(DATA_CHANNEL_READ_IDX_MONITORING_INFO);
        Assert.assertTrue(monitoringData.containsKey(shortId));
        Assert.assertEquals(j, MonitoringInfoEncodings.decodeInt64Counter(monitoringData.get(shortId)));
    }

    /**
     * Forwards the elements "-1", "0", ..., {@code j - 1} to the runner (none when {@code j} is
     * -1) and then asks it to split, returning the response it filled in.
     *
     * @param beamFnDataReadRunner runner under test
     * @param str transform id the desired split is keyed by
     * @param str2 instruction id placed on the split request
     * @param j exclusive upper bound of the element values forwarded before splitting
     * @param d requested fraction of remainder
     * @param j2 estimated number of input elements
     * @param list allowed split points; may be empty
     * @return the split response built after {@code trySplit} returns
     */
    public static BeamFnApi.ProcessBundleSplitResponse executeSplit(BeamFnDataReadRunner<String> beamFnDataReadRunner, String str, String str2, long j, double d, long j2, List<Long> list) throws Exception {
        for (long element = -1L; element < j; element++) {
            beamFnDataReadRunner.forwardElementToConsumer(WindowedValue.valueInGlobalWindow(Long.toString(element)));
        }

        BeamFnApi.ProcessBundleSplitRequest.DesiredSplit desiredSplit =
                BeamFnApi.ProcessBundleSplitRequest.DesiredSplit.newBuilder()
                        .setEstimatedInputElements(j2)
                        .setFractionOfRemainder(d)
                        .addAllAllowedSplitPoints(list)
                        .build();
        BeamFnApi.ProcessBundleSplitRequest request =
                BeamFnApi.ProcessBundleSplitRequest.newBuilder()
                        .setInstructionId(str2)
                        .putDesiredSplits(str, desiredSplit)
                        .build();

        BeamFnApi.ProcessBundleSplitResponse.Builder response = BeamFnApi.ProcessBundleSplitResponse.newBuilder();
        beamFnDataReadRunner.trySplit(request, response);
        return response.build();
    }

    /**
     * Builds the response expected for a pure channel split at {@code j}: a single channel split
     * whose last primary element is {@code j - 1} and whose first residual element is {@code j}.
     *
     * @param j index of the first residual element
     */
    public static BeamFnApi.ProcessBundleSplitResponse channelSplitResult(long j) {
        BeamFnApi.ProcessBundleSplitResponse.ChannelSplit channelSplit =
                BeamFnApi.ProcessBundleSplitResponse.ChannelSplit.newBuilder()
                        .setLastPrimaryElement(j - 1)
                        .setFirstResidualElement(j)
                        .build();
        return BeamFnApi.ProcessBundleSplitResponse.newBuilder()
                .addChannelSplits(channelSplit)
                .build();
    }

    /**
     * Builds the response expected when element {@code j} itself is split: a primary root with
     * input id {@code "primary" + d}, a residual root with input id {@code "residual" + (1 - d)}
     * (both formatted to one decimal place), and a channel split whose last primary element is
     * {@code j - 1} and whose first residual element is {@code j + 1}.
     *
     * @param j index of the element that is split
     * @param d fraction encoded into the primary/residual input ids
     */
    public static BeamFnApi.ProcessBundleSplitResponse elementSplitResult(long j, double d) {
        BeamFnApi.BundleApplication primary =
                BeamFnApi.BundleApplication.newBuilder()
                        .setInputId(String.format("primary%.1f", d))
                        .build();
        BeamFnApi.DelayedBundleApplication residual =
                BeamFnApi.DelayedBundleApplication.newBuilder()
                        .setApplication(
                                BeamFnApi.BundleApplication.newBuilder()
                                        .setInputId(String.format("residual%.1f", 1.0d - d))
                                        .build())
                        .build();
        BeamFnApi.ProcessBundleSplitResponse.ChannelSplit channelSplit =
                BeamFnApi.ProcessBundleSplitResponse.ChannelSplit.newBuilder()
                        .setLastPrimaryElement(j - 1)
                        .setFirstResidualElement(j + 1)
                        .build();
        return BeamFnApi.ProcessBundleSplitResponse.newBuilder()
                .addPrimaryRoots(primary)
                .addResidualRoots(residual)
                .addChannelSplits(channelSplit)
                .build();
    }

    static {
        try {
            RunnerApi.MessageWithComponents proto = CoderTranslation.toProto(CODER);
            CODER_SPEC = proto.getCoder();
            COMPONENTS = proto.getComponents().toBuilder().putCoders(CODER_SPEC_ID, CODER_SPEC).putCoders(ELEMENT_CODER_SPEC_ID, CoderTranslation.toProto(ELEMENT_CODER).getCoder()).build();
            DATA_CHANNEL_READ_IDX_MONITORING_INFO = MetricsApi.MonitoringInfo.newBuilder().setUrn(MonitoringInfoConstants.Urns.DATA_CHANNEL_READ_INDEX).setType(MonitoringInfoConstants.TypeUrns.SUM_INT64_TYPE).putLabels(MonitoringInfoConstants.Labels.PTRANSFORM, INPUT_TRANSFORM_ID).build();
        } catch (IOException e) {
            throw new ExceptionInInitializerError(e);
        }
    }
}
