package org.apache.beam.fn.harness;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.beam.fn.harness.PrecombineGroupingTable;
import org.apache.beam.runners.core.GlobalCombineFnRunner;
import org.apache.beam.runners.core.GlobalCombineFnRunners;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.fn.data.FnDataReceiver;
import org.apache.beam.sdk.fn.test.TestExecutors;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ArrayListMultimap;
import org.apache.log4j.Priority;
import org.apache.log4j.spi.Configurator;
import org.hamcrest.Description;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.hamcrest.TypeSafeDiagnosingMatcher;
import org.hamcrest.core.Is;
import org.joda.time.Instant;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/fn/harness/PrecombineGroupingTableTest.class */
public class PrecombineGroupingTableTest {

    /**
     * JUnit rule supplying the executor on which the cache-contention test submits its concurrent
     * tasks. It is built from a cached thread pool.
     */
    @Rule
    public TestExecutors.TestExecutorService executorService =
            TestExecutors.from(Executors.newCachedThreadPool());
    /**
     * Combiner shared by the table tests: sums {@code Integer} inputs into a {@code Long}
     * accumulator and outputs the accumulator unchanged.
     *
     * <p>{@code compact} is deliberately lossy: an even accumulator is divided by four while an odd
     * one is returned as is (for example 804 becomes 201). This makes compaction observable in the
     * values a test sees.
     */
    private static final Combine.CombineFn<Integer, Long, Long> COMBINE_FN =
            new Combine.CombineFn<Integer, Long, Long>() {
                @Override
                public Long createAccumulator() {
                    return 0L;
                }

                @Override
                public Long addInput(Long accumulator, Integer input) {
                    return accumulator + input;
                }

                @Override
                public Long mergeAccumulators(Iterable<Long> accumulators) {
                    long total = 0L;
                    for (Long accumulator : accumulators) {
                        total += accumulator;
                    }
                    return total;
                }

                @Override
                public Long compact(Long accumulator) {
                    // Even accumulators shrink to a quarter; odd ones pass through untouched.
                    if (accumulator % 2 == 0) {
                        return accumulator / 4;
                    }
                    return accumulator;
                }

                @Override
                public Long extractOutput(Long accumulator) {
                    return accumulator;
                }
            };

    /**
     * Receiver that records every element handed to it, in arrival order, so that tests can
     * assert on what was emitted.
     *
     * @param <T> type of the received elements
     */
    private static class TestOutputReceiver<T> implements FnDataReceiver<T> {
        /** Elements received so far, oldest first. */
        final List<T> outputElems = new ArrayList<>();

        private TestOutputReceiver() {}

        /**
         * Appends the element to {@link #outputElems}.
         *
         * @param element the element being delivered
         */
        @Override
        public void accept(T element) {
            outputElems.add(element);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/fn/harness/PrecombineGroupingTableTest$TestSizeEstimator.class */
    public static class TestSizeEstimator implements PrecombineGroupingTable.SizeEstimator {
        int calls;

        private TestSizeEstimator() {
            this.calls = 0;
        }

        public long estimateSize(Object obj) {
            this.calls++;
            if (obj instanceof PrecombineGroupingTable.GloballyWindowedTableGroupingKey) {
                obj = ((PrecombineGroupingTable.GloballyWindowedTableGroupingKey) obj).getStructuralKey();
            } else if (obj instanceof PrecombineGroupingTable.WindowedGroupingTableKey) {
                obj = ((PrecombineGroupingTable.WindowedGroupingTableKey) obj).getStructuralKey();
            } else if (obj instanceof PrecombineGroupingTable.GroupingTableEntry) {
                obj = ((PrecombineGroupingTable.GroupingTableEntry) obj).getAccumulator();
            }
            if (obj instanceof String) {
                return (long) Math.pow(10.0d, ((String) obj).length());
            }
            if (obj instanceof Long) {
                return ((Long) obj).longValue();
            }
            throw new IllegalArgumentException("Unknown type " + (obj == null ? Configurator.NULL : obj.getClass().toString()));
        }
    }

    /**
     * Checks that a combined output carries the timestamp of one of its inputs.
     *
     * <p>All "A" inputs share timestamp 1. The "B" inputs arrive at 21 and then 20, and the
     * combined "B" output is expected at 21, the timestamp of the first "B" element put.
     */
    @Test
    public void testCombiningInheritsOneOfTheValuesTimestamps() throws Exception {
        PrecombineGroupingTable table =
                new PrecombineGroupingTable(
                        PipelineOptionsFactory.create(),
                        Caches.forMaximumBytes(2500L),
                        StringUtf8Coder.of(),
                        GlobalCombineFnRunners.create(COMBINE_FN),
                        new TestSizeEstimator(),
                        false);
        TestOutputReceiver<WindowedValue<KV<String, Long>>> receiver = new TestOutputReceiver<>();

        table.put(WindowedValue.timestampedValueInGlobalWindow(KV.of("A", 1), new Instant(1L)), receiver);
        table.put(WindowedValue.timestampedValueInGlobalWindow(KV.of("B", 9), new Instant(21L)), receiver);
        table.put(WindowedValue.timestampedValueInGlobalWindow(KV.of("A", 2), new Instant(1L)), receiver);
        table.put(WindowedValue.timestampedValueInGlobalWindow(KV.of("B", 2), new Instant(20L)), receiver);
        table.put(WindowedValue.timestampedValueInGlobalWindow(KV.of("A", 4), new Instant(1L)), receiver);
        table.flush(receiver);

        WindowedValue<KV<String, Long>> expectedA =
                WindowedValue.timestampedValueInGlobalWindow(KV.of("A", 7L), new Instant(1L));
        WindowedValue<KV<String, Long>> expectedB =
                WindowedValue.timestampedValueInGlobalWindow(KV.of("B", 11L), new Instant(21L));
        MatcherAssert.assertThat(
                receiver.outputElems, Matchers.containsInAnyOrder(expectedA, expectedB));
    }

    /**
     * Checks that key sizes count towards the 2500-byte cache limit.
     *
     * <p>Three puts for "AAA" emit nothing. After "BB" and "CCC" are added, the outputs for "AAA"
     * and "BB" have been emitted, and the final flush emits "CCC".
     */
    @Test
    public void testCombiningGroupingTableHonorsKeyWeights() throws Exception {
        PrecombineGroupingTable table =
                new PrecombineGroupingTable(
                        PipelineOptionsFactory.create(),
                        Caches.forMaximumBytes(2500L),
                        StringUtf8Coder.of(),
                        GlobalCombineFnRunners.create(COMBINE_FN),
                        new TestSizeEstimator(),
                        false);
        TestOutputReceiver<WindowedValue<KV<String, Long>>> receiver = new TestOutputReceiver<>();

        WindowedValue<KV<String, Long>> outputAaa = WindowedValue.valueInGlobalWindow(KV.of("AAA", 7L));
        WindowedValue<KV<String, Long>> outputBb = WindowedValue.valueInGlobalWindow(KV.of("BB", 509L));
        WindowedValue<KV<String, Long>> outputCcc = WindowedValue.valueInGlobalWindow(KV.of("CCC", 11L));

        // Repeated puts for one key are combined in place; nothing is emitted yet.
        table.put(WindowedValue.valueInGlobalWindow(KV.of("AAA", 1)), receiver);
        table.put(WindowedValue.valueInGlobalWindow(KV.of("AAA", 2)), receiver);
        table.put(WindowedValue.valueInGlobalWindow(KV.of("AAA", 4)), receiver);
        MatcherAssert.assertThat(receiver.outputElems, Matchers.empty());

        // Two more keys: the older entries are expected to have been emitted.
        table.put(WindowedValue.valueInGlobalWindow(KV.of("BB", 509)), receiver);
        table.put(WindowedValue.valueInGlobalWindow(KV.of("CCC", 11)), receiver);
        MatcherAssert.assertThat(
                receiver.outputElems, Matchers.containsInAnyOrder(outputAaa, outputBb));

        table.flush(receiver);
        MatcherAssert.assertThat(
                receiver.outputElems, Matchers.containsInAnyOrder(outputAaa, outputBb, outputCcc));
    }

    /**
     * Checks that one oversized accumulator causes every entry to be emitted.
     *
     * <p>Small values for "A", "B" and "C" emit nothing. Adding 9999 to "C" is expected to emit all
     * three keys. "C" comes out as 2501, which is 10006 / 4, matching COMBINE_FN's compaction of
     * even accumulators. The final flush adds nothing further.
     */
    @Test
    public void testCombiningGroupingTableEvictsAllOnLargeEntry() throws Exception {
        PrecombineGroupingTable table =
                new PrecombineGroupingTable(
                        PipelineOptionsFactory.create(),
                        Caches.forMaximumBytes(2500L),
                        StringUtf8Coder.of(),
                        GlobalCombineFnRunners.create(COMBINE_FN),
                        new TestSizeEstimator(),
                        false);
        TestOutputReceiver<WindowedValue<KV<String, Long>>> receiver = new TestOutputReceiver<>();

        WindowedValue<KV<String, Long>> outputA = WindowedValue.valueInGlobalWindow(KV.of("A", 1L));
        WindowedValue<KV<String, Long>> outputB = WindowedValue.valueInGlobalWindow(KV.of("B", 9L));
        WindowedValue<KV<String, Long>> outputC = WindowedValue.valueInGlobalWindow(KV.of("C", 2501L));

        table.put(WindowedValue.valueInGlobalWindow(KV.of("A", 1)), receiver);
        table.put(WindowedValue.valueInGlobalWindow(KV.of("B", 3)), receiver);
        table.put(WindowedValue.valueInGlobalWindow(KV.of("B", 6)), receiver);
        table.put(WindowedValue.valueInGlobalWindow(KV.of("C", 7)), receiver);
        MatcherAssert.assertThat(receiver.outputElems, Matchers.empty());

        // The large addition to "C" pushes everything out.
        table.put(WindowedValue.valueInGlobalWindow(KV.of("C", 9999)), receiver);
        MatcherAssert.assertThat(
                receiver.outputElems, Matchers.containsInAnyOrder(outputA, outputB, outputC));

        // Nothing is left to flush, so the received elements are unchanged.
        table.flush(receiver);
        MatcherAssert.assertThat(
                receiver.outputElems, Matchers.containsInAnyOrder(outputA, outputB, outputC));
    }

    /**
     * Checks that compaction lets three large, even accumulators stay in the table.
     *
     * <p>The inputs 804, 904 and 1004 emit nothing, and the table's weight stays below 2712. The
     * flush then emits each value divided by four, as COMBINE_FN's compaction does for even
     * accumulators.
     */
    @Test
    public void testCombiningGroupingTableCompactionSaves() throws Exception {
        PrecombineGroupingTable table =
                new PrecombineGroupingTable(
                        PipelineOptionsFactory.create(),
                        Caches.forMaximumBytes(2500L),
                        StringUtf8Coder.of(),
                        GlobalCombineFnRunners.create(COMBINE_FN),
                        new TestSizeEstimator(),
                        false);
        TestOutputReceiver<WindowedValue<KV<String, Long>>> receiver = new TestOutputReceiver<>();

        table.put(WindowedValue.valueInGlobalWindow(KV.of("A", 804)), receiver);
        table.put(WindowedValue.valueInGlobalWindow(KV.of("B", 904)), receiver);
        table.put(WindowedValue.valueInGlobalWindow(KV.of("C", 1004)), receiver);
        MatcherAssert.assertThat(receiver.outputElems, Matchers.empty());

        long weight = table.getWeight();
        MatcherAssert.assertThat(weight, Matchers.lessThan(2712L));

        table.flush(receiver);
        MatcherAssert.assertThat(
                receiver.outputElems,
                Matchers.containsInAnyOrder(
                        WindowedValue.valueInGlobalWindow(KV.of("A", 201L)),
                        WindowedValue.valueInGlobalWindow(KV.of("B", 226L)),
                        WindowedValue.valueInGlobalWindow(KV.of("C", 251L))));
    }

    /**
     * Checks that only part of the table is emitted when the limit is exceeded.
     *
     * <p>The odd inputs 801, 901 and 1001 are left unchanged by COMBINE_FN's compaction. After the
     * third put, "A" and "B" are expected to have been emitted while "C" stays until the flush.
     */
    @Test
    public void testCombiningGroupingTablePartialEviction() throws Exception {
        PrecombineGroupingTable table =
                new PrecombineGroupingTable(
                        PipelineOptionsFactory.create(),
                        Caches.forMaximumBytes(2500L),
                        StringUtf8Coder.of(),
                        GlobalCombineFnRunners.create(COMBINE_FN),
                        new TestSizeEstimator(),
                        false);
        TestOutputReceiver<WindowedValue<KV<String, Long>>> receiver = new TestOutputReceiver<>();

        WindowedValue<KV<String, Long>> outputA = WindowedValue.valueInGlobalWindow(KV.of("A", 801L));
        WindowedValue<KV<String, Long>> outputB = WindowedValue.valueInGlobalWindow(KV.of("B", 901L));
        WindowedValue<KV<String, Long>> outputC = WindowedValue.valueInGlobalWindow(KV.of("C", 1001L));

        table.put(WindowedValue.valueInGlobalWindow(KV.of("A", 801)), receiver);
        table.put(WindowedValue.valueInGlobalWindow(KV.of("B", 901)), receiver);
        table.put(WindowedValue.valueInGlobalWindow(KV.of("C", 1001)), receiver);
        MatcherAssert.assertThat(
                receiver.outputElems, Matchers.containsInAnyOrder(outputA, outputB));

        table.flush(receiver);
        MatcherAssert.assertThat(
                receiver.outputElems, Matchers.containsInAnyOrder(outputA, outputB, outputC));
    }

    /**
     * Runs 1000 tables concurrently, each on its own sub-cache of one shared cache, and checks
     * that every table still emits a correct value for every key.
     *
     * <p>Each task puts the keys 1..1000 exactly once, using the key as the input value. Every
     * emitted value must therefore be the key itself for odd keys and {@code key / 4} for even
     * keys, which is what COMBINE_FN's compaction produces. Assertion failures inside a task
     * surface through {@link Future#get()}.
     */
    @Test
    public void testCombiningGroupingTableEmitsCorrectValuesUnderHighCacheContention() throws Exception {
        final int numKeys = 1000;
        final int numThreads = 1000;

        Long[] expectedKeys = new Long[numKeys];
        for (int i = 1; i <= numKeys; i++) {
            expectedKeys[i - 1] = Long.valueOf(i);
        }

        List<Future<?>> futures = new ArrayList<>(numThreads);
        PipelineOptions options = PipelineOptionsFactory.create();
        GlobalCombineFnRunner<Integer, Long, Long> combineFnRunner =
                GlobalCombineFnRunners.create(COMBINE_FN);
        // Shared budget of 50000 bytes per task. Written as a plain literal rather than borrowed
        // from an unrelated logging constant that merely has the same value.
        Cache<Object, Object> cache = Caches.forMaximumBytes(numThreads * 50000);

        for (int i = 0; i < numThreads; i++) {
            final int threadIndex = i;
            futures.add(
                    executorService.submit(
                            () -> {
                                ArrayListMultimap<Long, Long> outputs = ArrayListMultimap.create();
                                // Typed receiver, so the lambda parameter is not inferred as Object.
                                FnDataReceiver<WindowedValue<KV<Long, Long>>> collector =
                                        output ->
                                                outputs.put(
                                                        output.getValue().getKey(),
                                                        output.getValue().getValue());
                                PrecombineGroupingTable table =
                                        new PrecombineGroupingTable(
                                                options,
                                                Caches.subCache(
                                                        cache,
                                                        Integer.valueOf(threadIndex),
                                                        new Object[0]),
                                                VarLongCoder.of(),
                                                combineFnRunner,
                                                new TestSizeEstimator(),
                                                false);

                                for (int key = 1; key <= numKeys; key++) {
                                    table.put(
                                            WindowedValue.valueInGlobalWindow(
                                                    KV.of(Long.valueOf(key), Integer.valueOf(key))),
                                            collector);
                                }
                                // NOTE(review): the flush is repeated once per key, as before; a
                                // single flush looks sufficient, so confirm before simplifying.
                                for (int round = 1; round <= numKeys; round++) {
                                    table.flush(collector);
                                }

                                MatcherAssert.assertThat(
                                        outputs.keySet(), Matchers.containsInAnyOrder(expectedKeys));
                                for (Map.Entry<Long, Long> entry : outputs.entries()) {
                                    long key = entry.getKey();
                                    Long expected = Long.valueOf(key % 2 == 0 ? key / 4 : key);
                                    MatcherAssert.assertThat(
                                            entry.getValue(), Matchers.equalTo(expected));
                                }
                                return null;
                            }));
        }

        // Wait for every task; get() rethrows any failure raised inside it.
        for (Future<?> future : futures) {
            future.get();
        }
    }

    /**
     * Feeds the sampling estimator a constant size of 100 and checks how often it consults the
     * underlying estimator.
     *
     * <p>The first 10 calls must each reach the underlying estimator. Over the next 10 calls the
     * total must land between 11 and 19, and the following 1000 calls must add between 40 and 60
     * underlying calls. The reported size must always be exactly 100.
     *
     * <p>NOTE(review): the constructor arguments are presumably (underlying, minSampleRate,
     * maxSampleRate, minSampled, random); confirm against SamplingSizeEstimator.
     */
    @Test
    public void testSampleFlatSizes() throws Exception {
        TestSizeEstimator underlying = new TestSizeEstimator();
        PrecombineGroupingTable.SamplingSizeEstimator estimator =
                new PrecombineGroupingTable.SamplingSizeEstimator(
                        underlying, 0.05d, 1.0d, 10L, new Random(1L));

        // Calls 1..10: every one is forwarded.
        for (int call = 1; call <= 10; call++) {
            Assert.assertEquals(100L, estimator.estimateSize(100L));
            Assert.assertEquals(call, underlying.calls);
        }

        // Calls 11..20: only some are forwarded.
        for (int call = 11; call <= 20; call++) {
            Assert.assertEquals(100L, estimator.estimateSize(100L));
        }
        MatcherAssert.assertThat(underlying.calls, between(11, 19));

        // Next 1000 calls: count how many reach the underlying estimator.
        int callsBefore = underlying.calls;
        for (int call = 0; call < 1000; call++) {
            Assert.assertEquals(100L, estimator.estimateSize(100L));
        }
        MatcherAssert.assertThat(underlying.calls - callsBefore, between(40, 60));
    }

    /**
     * Feeds the sampling estimator sizes alternating between 100 and 102 and checks both the
     * reported sizes and how often the underlying estimator is consulted.
     *
     * <p>The first 10 calls must be exact and must each reach the underlying estimator. After
     * that, every reported size must lie in [100, 102]. The total after 20 calls must be between
     * 11 and 19, and the following 1000 calls must add between 40 and 60 underlying calls.
     */
    @Test
    public void testSampleBoringSizes() throws Exception {
        TestSizeEstimator underlying = new TestSizeEstimator();
        PrecombineGroupingTable.SamplingSizeEstimator estimator =
                new PrecombineGroupingTable.SamplingSizeEstimator(
                        underlying, 0.05d, 1.0d, 10L, new Random(1L));

        // Five pairs (calls 1..10): exact answers, every call forwarded.
        for (int pair = 1; pair <= 5; pair++) {
            Assert.assertEquals(100L, estimator.estimateSize(100L));
            Assert.assertEquals(102L, estimator.estimateSize(102L));
            Assert.assertEquals(2 * pair, underlying.calls);
        }

        // Five more pairs (calls 11..20): answers may now be estimates.
        for (int pair = 0; pair < 5; pair++) {
            MatcherAssert.assertThat(estimator.estimateSize(100L), between(100L, 102L));
            MatcherAssert.assertThat(estimator.estimateSize(102L), between(100L, 102L));
        }
        MatcherAssert.assertThat(underlying.calls, between(11, 19));

        // 500 pairs (1000 calls): count how many reach the underlying estimator.
        int callsBefore = underlying.calls;
        for (int pair = 0; pair < 500; pair++) {
            MatcherAssert.assertThat(estimator.estimateSize(100L), between(100L, 102L));
            MatcherAssert.assertThat(estimator.estimateSize(102L), between(100L, 102L));
        }
        MatcherAssert.assertThat(underlying.calls - callsBefore, between(40, 60));
    }

    /**
     * Feeds the sampling estimator sizes cycling through 1, 10, 100 and 1000 and checks the
     * sampling behaviour at several points.
     *
     * <p>The first 20 calls must be exact and must each reach the underlying estimator. In the
     * windows [500, 1500) and [3000, 4000) every reported size must be one of the real sizes or
     * lie in [250, 350]. The underlying estimator must be consulted 180..220 times in the first
     * window and 90..110 times in the second.
     */
    @Test
    public void testSampleHighVarianceSizes() throws Exception {
        List<Long> sizes = Arrays.asList(1L, 10L, 100L, 1000L);
        TestSizeEstimator underlying = new TestSizeEstimator();
        PrecombineGroupingTable.SamplingSizeEstimator estimator =
                new PrecombineGroupingTable.SamplingSizeEstimator(
                        underlying, 0.1d, 0.2d, 10L, new Random(1L));
        Matcher<Long> realOrAverageSize =
                Matchers.anyOf(Is.is(Matchers.in(sizes)), between(250L, 350L));

        // Calls 0..19: exact answers, every call forwarded.
        for (int i = 0; i < 20; i++) {
            long size = sizes.get(i % sizes.size());
            Assert.assertEquals(size, estimator.estimateSize(size));
            Assert.assertEquals(i + 1, underlying.calls);
        }

        // Calls 20..499: unchecked warm-up.
        for (int i = 20; i < 500; i++) {
            estimator.estimateSize(sizes.get(i % sizes.size()));
        }

        // Calls 500..1499: first measured window.
        int callsBeforeFirstWindow = underlying.calls;
        for (int i = 500; i < 1500; i++) {
            long size = sizes.get(i % sizes.size());
            MatcherAssert.assertThat(estimator.estimateSize(size), realOrAverageSize);
        }
        MatcherAssert.assertThat(underlying.calls - callsBeforeFirstWindow, between(180, 220));

        // Calls 1500..2999: more unchecked warm-up.
        for (int i = 1500; i < 3000; i++) {
            estimator.estimateSize(sizes.get(i % sizes.size()));
        }

        // Calls 3000..3999: second measured window.
        int callsBeforeSecondWindow = underlying.calls;
        for (int i = 3000; i < 4000; i++) {
            long size = sizes.get(i % sizes.size());
            MatcherAssert.assertThat(estimator.estimateSize(size), realOrAverageSize);
        }
        MatcherAssert.assertThat(underlying.calls - callsBeforeSecondWindow, between(90, 110));
    }

    /**
     * Checks that the sampling estimator notices when element sizes change after a long run of
     * identical sizes.
     *
     * <p>The first phase repeats the constant-size scenario with size 100. The estimator is then
     * fed size 1000000 until it stops answering 100, after which a size of 99 must be reported
     * exactly.
     */
    @Test
    public void testSampleChangingSizes() throws Exception {
        TestSizeEstimator underlying = new TestSizeEstimator();
        PrecombineGroupingTable.SamplingSizeEstimator estimator =
                new PrecombineGroupingTable.SamplingSizeEstimator(
                        underlying, 0.05d, 1.0d, 10L, new Random(1L));

        // Calls 1..10: every one is forwarded.
        for (int call = 1; call <= 10; call++) {
            Assert.assertEquals(100L, estimator.estimateSize(100L));
            Assert.assertEquals(call, underlying.calls);
        }

        // Calls 11..20: only some are forwarded.
        for (int call = 11; call <= 20; call++) {
            Assert.assertEquals(100L, estimator.estimateSize(100L));
        }
        MatcherAssert.assertThat(underlying.calls, between(11, 19));

        // Next 1000 calls: count how many reach the underlying estimator.
        int callsBefore = underlying.calls;
        for (int call = 0; call < 1000; call++) {
            Assert.assertEquals(100L, estimator.estimateSize(100L));
        }
        MatcherAssert.assertThat(underlying.calls - callsBefore, between(40, 60));

        // Keep offering the new, much larger size until the stale answer of 100 disappears.
        while (estimator.estimateSize(1000000L) == 100) {
            // Intentionally empty: the call in the loop condition does the work.
        }
        Assert.assertEquals(99L, estimator.estimateSize(99L));
    }

    /**
     * Creates a matcher that accepts values in the closed range {@code [min, max]}.
     *
     * @param min smallest accepted value (inclusive)
     * @param max largest accepted value (inclusive)
     * @param <T> a self-comparable value type
     * @return a matcher that succeeds when {@code min <= item <= max} and otherwise reports the
     *     offending value in the mismatch description
     */
    private static <T extends Comparable<T>> TypeSafeDiagnosingMatcher<T> between(
            final T min, final T max) {
        return new TypeSafeDiagnosingMatcher<T>() {
            @Override
            public void describeTo(Description description) {
                description.appendText("is between " + min + " and " + max);
            }

            @Override
            protected boolean matchesSafely(T item, Description mismatchDescription) {
                boolean inRange = min.compareTo(item) <= 0 && item.compareTo(max) <= 0;
                if (!inRange) {
                    // A diagnosing matcher is expected to say what it actually saw.
                    mismatchDescription.appendText("was ").appendValue(item);
                }
                return inRange;
            }
        };
    }
}
