package org.apache.beam.fn.harness.debug;

import com.google.common.collect.testing.SampleElements;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.options.ExperimentalOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/fn/harness/debug/DataSamplerTest.class */
public class DataSamplerTest {
    byte[] encodeInt(Integer num) throws IOException {
        VarIntCoder of = VarIntCoder.of();
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        of.encode(num, byteArrayOutputStream, Coder.Context.NESTED);
        return byteArrayOutputStream.toByteArray();
    }

    byte[] encodeString(String str) throws IOException {
        StringUtf8Coder of = StringUtf8Coder.of();
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        of.encode(str, byteArrayOutputStream, Coder.Context.NESTED);
        return byteArrayOutputStream.toByteArray();
    }

    byte[] encodeByteArray(byte[] bArr) throws IOException {
        ByteArrayCoder of = ByteArrayCoder.of();
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        of.encode(bArr, byteArrayOutputStream, Coder.Context.NESTED);
        return byteArrayOutputStream.toByteArray();
    }

    <T> WindowedValue<T> globalWindowedValue(T t) {
        return WindowedValue.valueInGlobalWindow(t);
    }

    BeamFnApi.InstructionResponse getAllSamples(DataSampler dataSampler) {
        return dataSampler.handleDataSampleRequest(BeamFnApi.InstructionRequest.newBuilder().setSampleData(BeamFnApi.SampleDataRequest.newBuilder().build()).build()).build();
    }

    BeamFnApi.InstructionResponse getSamplesForPCollection(DataSampler dataSampler, String str) {
        return dataSampler.handleDataSampleRequest(BeamFnApi.InstructionRequest.newBuilder().setSampleData(BeamFnApi.SampleDataRequest.newBuilder().addPcollectionIds(str).build()).build()).build();
    }

    BeamFnApi.InstructionResponse getSamplesForPCollections(DataSampler dataSampler, Iterable<String> iterable) {
        return dataSampler.handleDataSampleRequest(BeamFnApi.InstructionRequest.newBuilder().setSampleData(BeamFnApi.SampleDataRequest.newBuilder().addAllPcollectionIds(iterable).build()).build()).build();
    }

    void assertHasSamples(BeamFnApi.InstructionResponse instructionResponse, String str, Iterable<byte[]> iterable) {
        Map elementSamplesMap = instructionResponse.getSampleData().getElementSamplesMap();
        Assert.assertFalse(elementSamplesMap.isEmpty());
        BeamFnApi.SampleDataResponse.ElementList elementList = (BeamFnApi.SampleDataResponse.ElementList) elementSamplesMap.get(str);
        Assert.assertNotNull(elementList);
        ArrayList arrayList = new ArrayList();
        Iterator<byte[]> it = iterable.iterator();
        while (it.hasNext()) {
            arrayList.add(BeamFnApi.SampledElement.newBuilder().setElement(ByteString.copyFrom(it.next())).build());
        }
        Assert.assertTrue(elementList.getElementsList().containsAll(arrayList));
    }

    void assertHasSamples(BeamFnApi.InstructionResponse instructionResponse, String str, List<BeamFnApi.SampledElement> list) {
        Map elementSamplesMap = instructionResponse.getSampleData().getElementSamplesMap();
        Assert.assertFalse(elementSamplesMap.isEmpty());
        BeamFnApi.SampleDataResponse.ElementList elementList = (BeamFnApi.SampleDataResponse.ElementList) elementSamplesMap.get(str);
        Assert.assertNotNull(elementList);
        Assert.assertTrue(elementList.getElementsList().containsAll(list));
    }

    @Test
    public void testSingleOutput() throws Exception {
        DataSampler dataSampler = new DataSampler();
        dataSampler.sampleOutput("pcollection-id", VarIntCoder.of()).sample(globalWindowedValue(1));
        assertHasSamples(getAllSamples(dataSampler), "pcollection-id", Collections.singleton(encodeInt(1)));
    }

    @Test
    public void testNestedContext() throws Exception {
        DataSampler dataSampler = new DataSampler();
        byte[] bytes = "hello".getBytes(StandardCharsets.US_ASCII);
        dataSampler.sampleOutput("pcollection-id", ByteArrayCoder.of()).sample(globalWindowedValue(bytes));
        assertHasSamples(getAllSamples(dataSampler), "pcollection-id", Collections.singleton(encodeByteArray(bytes)));
    }

    @Test
    public void testMultipleOutputs() throws Exception {
        DataSampler dataSampler = new DataSampler();
        VarIntCoder of = VarIntCoder.of();
        dataSampler.sampleOutput("pcollection-id-1", of).sample(globalWindowedValue(1));
        dataSampler.sampleOutput("pcollection-id-2", of).sample(globalWindowedValue(2));
        BeamFnApi.InstructionResponse allSamples = getAllSamples(dataSampler);
        assertHasSamples(allSamples, "pcollection-id-1", Collections.singleton(encodeInt(1)));
        assertHasSamples(allSamples, "pcollection-id-2", Collections.singleton(encodeInt(2)));
    }

    @Test
    public void testMultipleSamePCollections() throws Exception {
        DataSampler dataSampler = new DataSampler();
        VarIntCoder of = VarIntCoder.of();
        dataSampler.sampleOutput("pcollection-id", of).sample(globalWindowedValue(1));
        dataSampler.sampleOutput("pcollection-id", of).sample(globalWindowedValue(2));
        assertHasSamples(getAllSamples(dataSampler), "pcollection-id", (Iterable<byte[]>) ImmutableList.of(encodeInt(1), encodeInt(2)));
    }

    void generateStringSamples(DataSampler dataSampler) {
        StringUtf8Coder of = StringUtf8Coder.of();
        dataSampler.sampleOutput(SampleElements.Strings.MIN_ELEMENT, of).sample(globalWindowedValue("a1"));
        dataSampler.sampleOutput(SampleElements.Strings.MIN_ELEMENT, of).sample(globalWindowedValue("a2"));
        dataSampler.sampleOutput("b", of).sample(globalWindowedValue("b1"));
        dataSampler.sampleOutput("b", of).sample(globalWindowedValue("b2"));
        dataSampler.sampleOutput("c", of).sample(globalWindowedValue("c1"));
        dataSampler.sampleOutput("c", of).sample(globalWindowedValue("c2"));
    }

    @Test
    public void testFiltersSinglePCollectionId() throws Exception {
        DataSampler dataSampler = new DataSampler(10, 10, false);
        generateStringSamples(dataSampler);
        assertHasSamples(getSamplesForPCollection(dataSampler, SampleElements.Strings.MIN_ELEMENT), SampleElements.Strings.MIN_ELEMENT, (Iterable<byte[]>) ImmutableList.of(encodeString("a1"), encodeString("a2")));
    }

    @Test
    public void testFiltersMultiplePCollectionIds() throws Exception {
        ImmutableList of = ImmutableList.of(SampleElements.Strings.MIN_ELEMENT, "c");
        DataSampler dataSampler = new DataSampler(10, 10, false);
        generateStringSamples(dataSampler);
        BeamFnApi.InstructionResponse samplesForPCollections = getSamplesForPCollections(dataSampler, of);
        MatcherAssert.assertThat(Integer.valueOf(samplesForPCollections.getSampleData().getElementSamplesMap().size()), (Matcher<? super Integer>) Matchers.equalTo(2));
        assertHasSamples(samplesForPCollections, SampleElements.Strings.MIN_ELEMENT, (Iterable<byte[]>) ImmutableList.of(encodeString("a1"), encodeString("a2")));
        assertHasSamples(samplesForPCollections, "c", (Iterable<byte[]>) ImmutableList.of(encodeString("c1"), encodeString("c2")));
    }

    @Test
    public void testConcurrentNewSampler() throws Exception {
        DataSampler dataSampler = new DataSampler();
        VarIntCoder of = VarIntCoder.of();
        Thread[] threadArr = new Thread[100];
        CountDownLatch countDownLatch = new CountDownLatch(1);
        CountDownLatch countDownLatch2 = new CountDownLatch(threadArr.length);
        for (int i = 0; i < threadArr.length; i++) {
            threadArr[i] = new Thread(() -> {
                try {
                    countDownLatch.await();
                    for (int i2 = 0; i2 < 100; i2++) {
                        dataSampler.sampleOutput("pcollection-" + i2, of).sample(globalWindowedValue(0));
                    }
                    countDownLatch2.countDown();
                } catch (InterruptedException e) {
                }
            });
            threadArr[i].start();
        }
        countDownLatch.countDown();
        while (countDownLatch2.getCount() > 0) {
            dataSampler.handleDataSampleRequest(BeamFnApi.InstructionRequest.newBuilder().setSampleData(BeamFnApi.SampleDataRequest.newBuilder()).build());
        }
        for (Thread thread : threadArr) {
            thread.join();
        }
    }

    @Test
    public void testEnableAlwaysOnExceptionSampling() throws Exception {
        ExperimentalOptions as = PipelineOptionsFactory.as(ExperimentalOptions.class);
        as.setExperiments(Collections.singletonList("enable_always_on_exception_sampling"));
        DataSampler create = DataSampler.create(as);
        Assert.assertNotNull(create);
        OutputSampler sampleOutput = create.sampleOutput("pcollection-id", VarIntCoder.of());
        sampleOutput.exception(sampleOutput.sample(globalWindowedValue(1)), new RuntimeException(), "", "");
        sampleOutput.sample(globalWindowedValue(2));
        assertHasSamples(getAllSamples(create), "pcollection-id", (List<BeamFnApi.SampledElement>) ImmutableList.of(BeamFnApi.SampledElement.newBuilder().setElement(ByteString.copyFrom(encodeInt(1))).setException(BeamFnApi.SampledElement.Exception.newBuilder().setError(new RuntimeException().toString())).build()));
    }

    @Test
    public void testDisableAlwaysOnExceptionSampling() throws Exception {
        ExperimentalOptions as = PipelineOptionsFactory.as(ExperimentalOptions.class);
        as.setExperiments(ImmutableList.of("enable_always_on_exception_sampling", "disable_always_on_exception_sampling"));
        Assert.assertNull(DataSampler.create(as));
    }

    @Test
    public void testDisableAlwaysOnExceptionSamplingWithEnableDataSampling() throws Exception {
        ExperimentalOptions as = PipelineOptionsFactory.as(ExperimentalOptions.class);
        as.setExperiments(ImmutableList.of("enable_data_sampling", "enable_always_on_exception_sampling", "disable_always_on_exception_sampling"));
        DataSampler create = DataSampler.create(as);
        Assert.assertNotNull(create);
        OutputSampler sampleOutput = create.sampleOutput("pcollection-id", VarIntCoder.of());
        sampleOutput.exception(sampleOutput.sample(globalWindowedValue(1)), new RuntimeException(), "", "");
        sampleOutput.sample(globalWindowedValue(2));
        assertHasSamples(getAllSamples(create), "pcollection-id", (List<BeamFnApi.SampledElement>) ImmutableList.of(BeamFnApi.SampledElement.newBuilder().setElement(ByteString.copyFrom(encodeInt(1))).setException(BeamFnApi.SampledElement.Exception.newBuilder().setError(new RuntimeException().toString())).build()));
    }
}
