package org.apache.beam.sdk.transforms;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.Serializable;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.TestUtils;
import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
import org.apache.beam.sdk.coders.BigEndianLongCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.coders.DoubleCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.repackaged.com.google.common.base.MoreObjects;
import org.apache.beam.sdk.repackaged.com.google.common.base.Preconditions;
import org.apache.beam.sdk.repackaged.com.google.common.collect.ImmutableSet;
import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.ValidatesRunner;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.CombineWithContext;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator;
import org.apache.beam.sdk.transforms.display.DisplayDataMatchers;
import org.apache.beam.sdk.transforms.windowing.AfterPane;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Sessions;
import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.util.common.ElementByteSizeObserver;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TimestampedValue;
import org.hamcrest.Matchers;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.Mock;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/sdk/transforms/CombineTest.class */
public class CombineTest implements Serializable {

    // Mockito-annotated process context; nothing in the visible part of this class reads it.
    @Mock
    private DoFn<?, ?>.ProcessContext processContext;

    // JUnit rule providing the pipeline that each test builds and then runs.
    // NOTE(review): presumably transient because this test class is Serializable -- confirm.
    @Rule
    public final transient TestPipeline pipeline = TestPipeline.create();
    // Shared keyed input: key "a" carries 1, 1, 4 and key "b" carries 1, 13 (all values sum to 20).
    static final List<KV<String, Integer>> TABLE = Arrays.asList(KV.of("a", 1), KV.of("a", 1), KV.of("a", 4), KV.of("b", 1), KV.of("b", 13));
    // Empty keyed input used by the "...Empty" test variants.
    static final List<KV<String, Integer>> EMPTY_TABLE = Collections.emptyList();
    /** Fanout selector returning 3 for the key {@code "a"} and 0 for every other key. */
    private static final SerializableFunction<String, Integer> hotKeyFanout = new SerializableFunction<String, Integer>() { // from class: org.apache.beam.sdk.transforms.CombineTest.2
        public Integer apply(String str) {
            if (str.equals("a")) {
                return Integer.valueOf(3);
            }
            return Integer.valueOf(0);
        }
    };
    /**
     * Fanout selector that ignores the key and picks 3 or 0 at random (each with
     * probability one half), so the choice can differ from call to call for the same key.
     */
    private static final SerializableFunction<String, Integer> splitHotKeyFanout = new SerializableFunction<String, Integer>() { // from class: org.apache.beam.sdk.transforms.CombineTest.3
        public Integer apply(String str) {
            boolean useFanout = Math.random() < 0.5d;
            return Integer.valueOf(useFanout ? 3 : 0);
        }
    };

    /* loaded from: input_file:org/apache/beam/sdk/transforms/CombineTest$FormatPaneInfo.class */
    /** Emits, for each input, the string {@code "<element>: <isLast>"} built from the element and its pane's last-flag. */
    private static class FormatPaneInfo extends DoFn<Integer, String> {
        private FormatPaneInfo() {
        }

        /**
         * Formats the current element together with whether its pane is the last one.
         *
         * @param processContext context supplying the element, its pane info and the output sink
         */
        @DoFn.ProcessElement
        public void processElement(DoFn<Integer, String>.ProcessContext processContext) {
            Integer element = processContext.element();
            boolean lastPane = processContext.pane().isLast();
            processContext.output(element + ": " + lastPane);
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/transforms/CombineTest$GetLast.class */
    /** Passes through only those elements whose pane is marked as the last one. */
    private static class GetLast extends DoFn<Integer, Integer> {
        private GetLast() {
        }

        /**
         * Outputs the current element unchanged when its pane is the last pane; otherwise emits nothing.
         *
         * @param processContext context supplying the element, its pane info and the output sink
         */
        @DoFn.ProcessElement
        public void processElement(DoFn<Integer, Integer>.ProcessContext processContext) {
            if (!processContext.pane().isLast()) {
                return;
            }
            processContext.output(processContext.element());
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/sdk/transforms/CombineTest$MeanInts.class */
    /**
     * Combine function that averages integer inputs. It accumulates an element count and a
     * running sum in a {@code CountSum} and encodes accumulators with {@code CountSumCoder}.
     */
    public static class MeanInts extends Combine.AccumulatingCombineFn<Integer, CountSum, Double> {
        private static final Coder<Long> LONG_CODER = BigEndianLongCoder.of();
        private static final Coder<Double> DOUBLE_CODER = DoubleCoder.of();

        /* JADX INFO: Access modifiers changed from: package-private */
        /* loaded from: input_file:org/apache/beam/sdk/transforms/CombineTest$MeanInts$CountSum.class */
        /** Accumulator holding the number of inputs seen and their sum. */
        public class CountSum implements Combine.AccumulatingCombineFn.Accumulator<Integer, CountSum, Double> {
            long count;
            double sum;

            /**
             * @param j initial element count
             * @param d initial sum
             */
            CountSum(long j, double d) {
                this.count = j;
                this.sum = d;
            }

            /** Counts one more input and adds its value to the sum. */
            public void addInput(Integer num) {
                this.count++;
                this.sum += num.doubleValue();
            }

            /** Folds another accumulator's count and sum into this one. */
            public void mergeAccumulator(CountSum countSum) {
                this.count += countSum.count;
                this.sum += countSum.sum;
            }

            /* renamed from: extractOutput, reason: merged with bridge method [inline-methods] */
            /** Returns {@code sum / count}, or 0.0 when no inputs were counted. */
            public Double m107extractOutput() {
                return Double.valueOf(this.count == 0 ? 0.0d : this.sum / this.count);
            }

            /**
             * Hashes on {@code count} only. {@link #equals(Object)} treats sums that differ by
             * less than 0.1 as equal, so including the exact sum here would let two equal
             * accumulators produce different hash codes.
             */
            public int hashCode() {
                return Long.valueOf(this.count).hashCode();
            }

            /** Equal when the counts match exactly and the sums differ by less than 0.1. */
            public boolean equals(Object obj) {
                if (obj == this) {
                    return true;
                }
                if (!(obj instanceof CountSum)) {
                    return false;
                }
                CountSum countSum = (CountSum) obj;
                return this.count == countSum.count && Math.abs(this.sum - countSum.sum) < 0.1d;
            }

            public String toString() {
                return MoreObjects.toStringHelper(this).add("count", this.count).add("sum", this.sum).toString();
            }
        }

        /* loaded from: input_file:org/apache/beam/sdk/transforms/CombineTest$MeanInts$CountSumCoder.class */
        /** Coder writing a {@code CountSum} as its count (long coder) followed by its sum (double coder). */
        private class CountSumCoder extends AtomicCoder<CountSum> {
            private CountSumCoder() {
            }

            /** Writes the count, then the sum. */
            public void encode(CountSum countSum, OutputStream outputStream) throws CoderException, IOException {
                MeanInts.LONG_CODER.encode(Long.valueOf(countSum.count), outputStream);
                MeanInts.DOUBLE_CODER.encode(Double.valueOf(countSum.sum), outputStream);
            }

            /* renamed from: decode, reason: merged with bridge method [inline-methods] */
            /** Reads the count, then the sum, in the same order {@code encode} wrote them. */
            public CountSum m108decode(InputStream inputStream) throws CoderException, IOException {
                // Decode into locals first so the read order (count before sum) is explicit.
                long count = ((Long) MeanInts.LONG_CODER.decode(inputStream)).longValue();
                double sum = ((Double) MeanInts.DOUBLE_CODER.decode(inputStream)).doubleValue();
                return new CountSum(count, sum);
            }

            /** Never throws: this coder makes no non-determinism complaint. */
            public void verifyDeterministic() throws Coder.NonDeterministicException {
            }

            public boolean isRegisterByteSizeObserverCheap(CountSum countSum) {
                return true;
            }

            /** Delegates size observation of both fields to their component coders. */
            public void registerByteSizeObserver(CountSum countSum, ElementByteSizeObserver elementByteSizeObserver) throws Exception {
                MeanInts.LONG_CODER.registerByteSizeObserver(Long.valueOf(countSum.count), elementByteSizeObserver);
                MeanInts.DOUBLE_CODER.registerByteSizeObserver(Double.valueOf(countSum.sum), elementByteSizeObserver);
            }
        }

        private MeanInts() {
        }

        /* renamed from: createAccumulator, reason: merged with bridge method [inline-methods] */
        /** Returns a fresh accumulator with a zero count and zero sum. */
        public CountSum m106createAccumulator() {
            return new CountSum(0L, 0.0d);
        }

        /** Always supplies a {@code CountSumCoder}; both arguments are ignored. */
        public Coder<CountSum> getAccumulatorCoder(CoderRegistry coderRegistry, Coder<Integer> coder) {
            return new CountSumCoder();
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/transforms/CombineTest$NullCombiner.class */
    /** Binary combiner that multiplies its operands, substituting 2 for any operand that is {@code null}. */
    private static final class NullCombiner extends Combine.BinaryCombineFn<Integer> {
        private NullCombiner() {
        }

        /**
         * @param num left operand, may be {@code null}
         * @param num2 right operand, may be {@code null}
         * @return the product of the operands after replacing each {@code null} with 2
         */
        public Integer apply(Integer num, Integer num2) {
            int left = 2;
            if (num != null) {
                left = num.intValue();
            }
            int right = 2;
            if (num2 != null) {
                right = num2.intValue();
            }
            return Integer.valueOf(left * right);
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/transforms/CombineTest$SumInts.class */
    /** Serializable function that adds up all integers of an iterable. */
    public static class SumInts implements SerializableFunction<Iterable<Integer>, Integer> {
        /**
         * @param iterable the values to add
         * @return the sum of all values, or 0 when the iterable yields nothing
         */
        public Integer apply(Iterable<Integer> iterable) {
            int total = 0;
            for (Integer value : iterable) {
                total += value.intValue();
            }
            return Integer.valueOf(total);
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/transforms/CombineTest$TestCombineFn.class */
    /**
     * Combine function that concatenates the decimal text of its inputs and outputs the
     * concatenation with its characters sorted. Accumulators handed to {@code addInput} and
     * {@code mergeAccumulators} are overwritten with a "cleared in ..." marker after use.
     */
    public static class TestCombineFn extends Combine.CombineFn<Integer, Accumulator, String> {

        /* JADX INFO: Access modifiers changed from: package-private */
        /* loaded from: input_file:org/apache/beam/sdk/transforms/CombineTest$TestCombineFn$Accumulator.class */
        /** Mutable accumulator wrapping a single string. */
        public static class Accumulator {
            String value;

            public Accumulator(String str) {
                this.value = str;
            }

            /** Builds a coder that encodes an accumulator as its string value via {@link StringUtf8Coder}. */
            public static Coder<Accumulator> getCoder() {
                return new AtomicCoder<Accumulator>() { // from class: org.apache.beam.sdk.transforms.CombineTest.TestCombineFn.Accumulator.1
                    /** Encodes in the {@code NESTED} context. */
                    public void encode(Accumulator accumulator, OutputStream outputStream) throws CoderException, IOException {
                        encode(accumulator, outputStream, Coder.Context.NESTED);
                    }

                    public void encode(Accumulator accumulator, OutputStream outputStream, Coder.Context context) throws CoderException, IOException {
                        StringUtf8Coder stringCoder = StringUtf8Coder.of();
                        stringCoder.encode(accumulator.value, outputStream, context);
                    }

                    /* renamed from: decode, reason: merged with bridge method [inline-methods] */
                    /** Decodes in the {@code NESTED} context. */
                    public Accumulator m112decode(InputStream inputStream) throws CoderException, IOException {
                        return m111decode(inputStream, Coder.Context.NESTED);
                    }

                    /* renamed from: decode, reason: merged with bridge method [inline-methods] */
                    public Accumulator m111decode(InputStream inputStream, Coder.Context context) throws CoderException, IOException {
                        String decoded = StringUtf8Coder.of().decode(inputStream, context);
                        return new Accumulator(decoded);
                    }
                };
            }
        }

        /** Always returns the coder from {@link Accumulator#getCoder()}; both arguments are ignored. */
        public Coder<Accumulator> getAccumulatorCoder(CoderRegistry coderRegistry, Coder<Integer> coder) {
            return Accumulator.getCoder();
        }

        /* renamed from: createAccumulator, reason: merged with bridge method [inline-methods] */
        /** Starts from the empty string. */
        public Accumulator m110createAccumulator() {
            return new Accumulator("");
        }

        /**
         * Returns a new accumulator holding the old value followed by the input's text. The
         * accumulator passed in is overwritten with a marker afterwards, even on failure.
         */
        public Accumulator addInput(Accumulator accumulator, Integer num) {
            try {
                return new Accumulator(accumulator.value + num);
            } finally {
                accumulator.value = "cleared in addInput";
            }
        }

        /**
         * Concatenates the values of all accumulators in iteration order into a new accumulator,
         * overwriting each consumed accumulator with a marker right after reading it.
         */
        public Accumulator mergeAccumulators(Iterable<Accumulator> iterable) {
            StringBuilder merged = new StringBuilder();
            for (Accumulator part : iterable) {
                merged.append(part.value);
                part.value = "cleared in mergeAccumulators";
            }
            return new Accumulator(merged.toString());
        }

        /** Returns the accumulated characters in sorted order. */
        public String extractOutput(Accumulator accumulator) {
            char[] sortedChars = accumulator.value.toCharArray();
            Arrays.sort(sortedChars);
            return String.valueOf(sortedChars);
        }

        /* renamed from: mergeAccumulators, reason: collision with other method in class */
        public /* bridge */ /* synthetic */ Object m109mergeAccumulators(Iterable iterable) {
            return mergeAccumulators((Iterable<Accumulator>) iterable);
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/transforms/CombineTest$TestCombineFnWithContext.class */
    /**
     * Context-aware counterpart of {@link TestCombineFn}: every accumulator starts with the text
     * of an integer side input, and each step asserts that this prefix is still present.
     */
    public class TestCombineFnWithContext extends CombineWithContext.CombineFnWithContext<Integer, TestCombineFn.Accumulator, String> {
        private final PCollectionView<Integer> view;

        /** @param pCollectionView side-input view whose value is used as the accumulator prefix */
        public TestCombineFnWithContext(PCollectionView<Integer> pCollectionView) {
            this.view = pCollectionView;
        }

        /** Always returns the coder from {@link TestCombineFn.Accumulator#getCoder()}; both arguments are ignored. */
        public Coder<TestCombineFn.Accumulator> getAccumulatorCoder(CoderRegistry coderRegistry, Coder<Integer> coder) {
            return TestCombineFn.Accumulator.getCoder();
        }

        /* renamed from: createAccumulator, reason: merged with bridge method [inline-methods] */
        /** Starts from the side input's text. */
        public TestCombineFn.Accumulator m114createAccumulator(CombineWithContext.Context context) {
            String prefix = context.sideInput(this.view).toString();
            return new TestCombineFn.Accumulator(prefix);
        }

        /**
         * Checks the prefix, then returns a new accumulator with the input's text appended. The
         * accumulator passed in is overwritten with a marker afterwards, even on failure.
         */
        public TestCombineFn.Accumulator addInput(TestCombineFn.Accumulator accumulator, Integer num, CombineWithContext.Context context) {
            try {
                String prefix = context.sideInput(this.view).toString();
                Assert.assertThat(accumulator.value, Matchers.startsWith(prefix));
                return new TestCombineFn.Accumulator(accumulator.value + num);
            } finally {
                accumulator.value = "cleared in addInput";
            }
        }

        /**
         * Merges accumulators into one that carries the prefix once, followed by each
         * accumulator's value with its own prefix stripped. Each consumed accumulator is
         * overwritten with a marker right after it is read.
         */
        public TestCombineFn.Accumulator mergeAccumulators(Iterable<TestCombineFn.Accumulator> iterable, CombineWithContext.Context context) {
            String prefix = context.sideInput(this.view).toString();
            StringBuilder merged = new StringBuilder(prefix);
            for (TestCombineFn.Accumulator part : iterable) {
                Assert.assertThat(part.value, Matchers.startsWith(prefix));
                merged.append(part.value.substring(prefix.length()));
                part.value = "cleared in mergeAccumulators";
            }
            return new TestCombineFn.Accumulator(merged.toString());
        }

        /** Checks the prefix and returns all accumulated characters (prefix included) in sorted order. */
        public String extractOutput(TestCombineFn.Accumulator accumulator, CombineWithContext.Context context) {
            String prefix = context.sideInput(this.view).toString();
            Assert.assertThat(accumulator.value, Matchers.startsWith(prefix));
            char[] sortedChars = accumulator.value.toCharArray();
            Arrays.sort(sortedChars);
            return String.valueOf(sortedChars);
        }

        /* renamed from: mergeAccumulators, reason: collision with other method in class */
        public /* bridge */ /* synthetic */ Object m113mergeAccumulators(Iterable iterable, CombineWithContext.Context context) {
            return mergeAccumulators((Iterable<TestCombineFn.Accumulator>) iterable, context);
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/transforms/CombineTest$TestCounter.class */
    /**
     * Combine function whose accumulator tracks the sum of inputs plus how many inputs and
     * merges it has seen, and checks the order in which the accumulator methods are called.
     */
    public static class TestCounter extends Combine.AccumulatingCombineFn<Integer, Counter, Iterable<Long>> {

        /* loaded from: input_file:org/apache/beam/sdk/transforms/CombineTest$TestCounter$Counter.class */
        /** Accumulator with four counters: sum, inputs, merges and outputs. */
        public class Counter implements Combine.AccumulatingCombineFn.Accumulator<Integer, Counter, Iterable<Long>>, Serializable {
            public long sum;
            public long inputs;
            public long merges;
            public long outputs;

            /**
             * @param j initial sum
             * @param j2 initial number of inputs
             * @param j3 initial number of merges
             * @param j4 initial number of outputs
             */
            public Counter(long j, long j2, long j3, long j4) {
                this.sum = j;
                this.inputs = j2;
                this.merges = j3;
                this.outputs = j4;
            }

            /** Adds one input; only legal while no merge and no output has been recorded. */
            public void addInput(Integer num) {
                Preconditions.checkState(this.merges == 0);
                Preconditions.checkState(this.outputs == 0);
                this.inputs += 1;
                this.sum += num.intValue();
            }

            /** Folds another counter in; neither side may have recorded an output. */
            public void mergeAccumulator(Counter counter) {
                Preconditions.checkState(this.outputs == 0);
                Preconditions.checkArgument(counter.outputs == 0);
                this.merges += counter.merges + 1;
                this.inputs += counter.inputs;
                this.sum += counter.sum;
            }

            /* renamed from: extractOutput, reason: merged with bridge method [inline-methods] */
            /** Returns the four counters as a list in the order sum, inputs, merges, outputs. */
            public Iterable<Long> m116extractOutput() {
                Preconditions.checkState(this.outputs == 0);
                return Arrays.asList(this.sum, this.inputs, this.merges, this.outputs);
            }

            public int hashCode() {
                long weighted = (this.sum * 17) + (this.inputs * 31) + (this.merges * 43) + (this.outputs * 181);
                return (int) weighted;
            }

            /** Two counters are equal when all four fields match. */
            public boolean equals(Object obj) {
                if (obj instanceof Counter) {
                    Counter other = (Counter) obj;
                    return this.sum == other.sum
                            && this.inputs == other.inputs
                            && this.merges == other.merges
                            && this.outputs == other.outputs;
                }
                return false;
            }

            /** Renders the counters as {@code sum:inputs:merges:outputs}. */
            public String toString() {
                StringBuilder text = new StringBuilder();
                text.append(this.sum).append(":");
                text.append(this.inputs).append(":");
                text.append(this.merges).append(":");
                text.append(this.outputs);
                return text.toString();
            }
        }

        /* renamed from: createAccumulator, reason: merged with bridge method [inline-methods] */
        /** Returns a counter with every field at zero. */
        public Counter m115createAccumulator() {
            return new Counter(0L, 0L, 0L, 0L);
        }

        /** Uses Java serialization for the accumulator; both arguments are ignored. */
        public Coder<Counter> getAccumulatorCoder(CoderRegistry coderRegistry, Coder<Integer> coder) {
            return SerializableCoder.of(Counter.class);
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/transforms/CombineTest$TestProdInt.class */
    /** Primitive-int binary combiner that multiplies its operands, with 1 as the identity. */
    private static final class TestProdInt extends Combine.BinaryCombineIntegerFn {
        private TestProdInt() {
        }

        /** @return the product of the two operands */
        public int apply(int i, int i2) {
            int product = i * i2;
            return product;
        }

        /** @return 1, the multiplicative identity */
        public int identity() {
            return 1;
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/transforms/CombineTest$TestProdObj.class */
    /** Boxed-integer binary combiner that multiplies its operands. */
    private static final class TestProdObj extends Combine.BinaryCombineFn<Integer> {
        private TestProdObj() {
        }

        /** @return the product of the two operands; neither may be {@code null} */
        public Integer apply(Integer num, Integer num2) {
            int left = num.intValue();
            int right = num2.intValue();
            return Integer.valueOf(left * right);
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/transforms/CombineTest$UniqueInts.class */
    /** Combine function that collects the distinct integers it sees into a set. */
    public static class UniqueInts extends Combine.CombineFn<Integer, Set<Integer>, Set<Integer>> {
        /* renamed from: createAccumulator, reason: merged with bridge method [inline-methods] */
        /** Returns a new, empty hash set. */
        public Set<Integer> m118createAccumulator() {
            return new HashSet<Integer>();
        }

        /** Adds the input to the given set and returns that same set. */
        public Set<Integer> addInput(Set<Integer> set, Integer num) {
            set.add(num);
            return set;
        }

        /** Returns a new set holding the union of all given sets. */
        public Set<Integer> mergeAccumulators(Iterable<Set<Integer>> iterable) {
            Set<Integer> union = new HashSet<Integer>();
            for (Set<Integer> part : iterable) {
                union.addAll(part);
            }
            return union;
        }

        /** Returns the accumulator itself as the output. */
        public Set<Integer> extractOutput(Set<Integer> set) {
            return set;
        }

        /* renamed from: mergeAccumulators, reason: collision with other method in class */
        public /* bridge */ /* synthetic */ Object m117mergeAccumulators(Iterable iterable) {
            return mergeAccumulators((Iterable<Set<Integer>>) iterable);
        }
    }

    /**
     * Creates a keyed input collection from an in-memory list.
     *
     * @param pipeline the pipeline to add the source to
     * @param list the key/value pairs to emit
     * @return the collection, encoded with a UTF-8 string key coder and a big-endian integer value coder
     */
    PCollection<KV<String, Integer>> createInput(Pipeline pipeline, List<KV<String, Integer>> list) {
        KvCoder<String, Integer> kvCoder = KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of());
        return pipeline.apply(Create.of(list).withCoder(kvCoder));
    }

    /**
     * Runs a pipeline that sums all values globally with {@link SumInts} and combines per key
     * with {@link TestCombineFn}, then checks both results.
     *
     * @param list keyed input
     * @param i expected global sum
     * @param list2 expected per-key outputs
     */
    private void runTestSimpleCombine(List<KV<String, Integer>> list, int i, List<KV<String, String>> list2) {
        PCollection<KV<String, Integer>> input = createInput(this.pipeline, list);

        PCollection<Integer> globalSum = input
                .apply(Values.<Integer>create())
                .apply(Combine.globally(new SumInts()));
        PCollection<KV<String, String>> perKeyCombined = input
                .apply(Combine.<String, Integer, String>perKey(new TestCombineFn()));

        PAssert.that(globalSum).containsInAnyOrder(Integer.valueOf(i));
        PAssert.that(perKeyCombined).containsInAnyOrder(list2);

        this.pipeline.run();
    }

    /**
     * Runs a pipeline that computes the global sum, exposes it as a singleton side input, and
     * feeds that side input to {@link TestCombineFnWithContext} both per key and globally.
     *
     * @param list keyed input
     * @param i expected global sum
     * @param list2 expected per-key outputs
     * @param strArr expected outputs of the global context-aware combine
     */
    private void runTestSimpleCombineWithContext(List<KV<String, Integer>> list, int i, List<KV<String, String>> list2, String[] strArr) {
        PCollection<KV<String, Integer>> perKeyInput = createInput(this.pipeline, list);
        PCollection<Integer> globallyInput = perKeyInput.apply(Values.<Integer>create());

        PCollection<Integer> sum = globallyInput.apply("Sum", Combine.globally(new SumInts()));
        PCollectionView<Integer> sumView = sum.apply(View.<Integer>asSingleton());

        PCollection<KV<String, String>> combinedPerKey = perKeyInput.apply(
                Combine.<String, Integer, String>perKey(new TestCombineFnWithContext(sumView))
                        .withSideInputs(Arrays.asList(sumView)));
        PCollection<String> combinedGlobally = globallyInput.apply(
                Combine.globally(new TestCombineFnWithContext(sumView))
                        .withoutDefaults()
                        .withSideInputs(Arrays.asList(sumView)));

        PAssert.that(sum).containsInAnyOrder(Integer.valueOf(i));
        PAssert.that(combinedPerKey).containsInAnyOrder(list2);
        PAssert.that(combinedGlobally).containsInAnyOrder(strArr);

        this.pipeline.run();
    }

    /** Simple combine over {@code TABLE}: expects a global sum of 20 and "114" / "113" for keys "a" / "b". */
    @Test
    @Category({ValidatesRunner.class})
    public void testSimpleCombine() {
        List<KV<String, String>> expectedPerKey = Arrays.asList(KV.of("a", "114"), KV.of("b", "113"));
        runTestSimpleCombine(TABLE, 20, expectedPerKey);
    }

    /**
     * Context-aware combine over {@code TABLE}: expects a global sum of 20, per-key outputs
     * "01124" / "01123", and a single global output "01111234".
     */
    @Test
    @Category({ValidatesRunner.class})
    public void testSimpleCombineWithContext() {
        List<KV<String, String>> expectedPerKey = Arrays.asList(KV.of("a", "01124"), KV.of("b", "01123"));
        String[] expectedGlobally = {"01111234"};
        runTestSimpleCombineWithContext(TABLE, 20, expectedPerKey, expectedGlobally);
    }

    /** Context-aware combine over the empty table: expects a global sum of 0 and no combined outputs. */
    @Test
    @Category({ValidatesRunner.class})
    public void testSimpleCombineWithContextEmpty() {
        List<KV<String, String>> noPerKeyOutput = Collections.emptyList();
        String[] noGlobalOutput = new String[0];
        runTestSimpleCombineWithContext(EMPTY_TABLE, 0, noPerKeyOutput, noGlobalOutput);
    }

    /** Simple combine over the empty table: expects a global sum of 0 and no per-key outputs. */
    @Test
    @Category({ValidatesRunner.class})
    public void testSimpleCombineEmpty() {
        List<KV<String, String>> noPerKeyOutput = Collections.emptyList();
        runTestSimpleCombine(EMPTY_TABLE, 0, noPerKeyOutput);
    }

    /**
     * Runs a pipeline that applies {@link UniqueInts} globally and per key, then checks both results.
     *
     * @param list keyed input
     * @param set expected set of distinct values over the whole input
     * @param list2 expected set of distinct values for each key
     */
    private void runTestBasicCombine(List<KV<String, Integer>> list, Set<Integer> set, List<KV<String, Set<Integer>>> list2) {
        PCollection<KV<String, Integer>> input = createInput(this.pipeline, list);

        PCollection<Set<Integer>> uniqueGlobally = input
                .apply(Values.<Integer>create())
                .apply(Combine.globally(new UniqueInts()));
        PCollection<KV<String, Set<Integer>>> uniquePerKey = input
                .apply(Combine.<String, Integer, Set<Integer>>perKey(new UniqueInts()));

        // The expected set is passed as the single expected element, not as a list of elements.
        PAssert.that(uniqueGlobally).containsInAnyOrder(set);
        PAssert.that(uniquePerKey).containsInAnyOrder(list2);

        this.pipeline.run();
    }

    /** Distinct-value combine over {@code TABLE}: expects {1, 13, 4} globally, {1, 4} for "a" and {1, 13} for "b". */
    @Test
    @Category({ValidatesRunner.class})
    public void testBasicCombine() {
        Set<Integer> expectedGlobally = ImmutableSet.of(1, 13, 4);
        Set<Integer> expectedForA = ImmutableSet.of(1, 4);
        Set<Integer> expectedForB = ImmutableSet.of(1, 13);
        List<KV<String, Set<Integer>>> expectedPerKey = Arrays.asList(KV.of("a", expectedForA), KV.of("b", expectedForB));
        runTestBasicCombine(TABLE, expectedGlobally, expectedPerKey);
    }

    /** Distinct-value combine over the empty table: expects one empty set globally and no per-key outputs. */
    @Test
    @Category({ValidatesRunner.class})
    public void testBasicCombineEmpty() {
        Set<Integer> emptySet = ImmutableSet.of();
        List<KV<String, Set<Integer>>> noPerKeyOutput = Collections.emptyList();
        runTestBasicCombine(EMPTY_TABLE, emptySet, noPerKeyOutput);
    }

    /**
     * Runs a pipeline that applies {@link MeanInts} globally and per key, then checks both results.
     *
     * @param list keyed input
     * @param d expected mean over the whole input
     * @param list2 expected mean for each key
     */
    private void runTestAccumulatingCombine(List<KV<String, Integer>> list, Double d, List<KV<String, Double>> list2) {
        PCollection<KV<String, Integer>> input = createInput(this.pipeline, list);

        PCollection<Double> meanGlobally = input
                .apply(Values.<Integer>create())
                .apply(Combine.globally(new MeanInts()));
        PCollection<KV<String, Double>> meanPerKey = input
                .apply(Combine.<String, Integer, Double>perKey(new MeanInts()));

        PAssert.that(meanGlobally).containsInAnyOrder(d);
        PAssert.that(meanPerKey).containsInAnyOrder(list2);

        this.pipeline.run();
    }

    /**
     * Combines {@code TABLE}, timestamped at 0, 1, 6, 7 and 8 ms, inside 2 ms fixed windows.
     * Expects window sums 2, 5 and 13, and per-key outputs "11" and "4" for "a", "1" and "13" for "b".
     */
    @Test
    @Category({ValidatesRunner.class})
    public void testFixedWindowsCombine() {
        PCollection<KV<String, Integer>> input = this.pipeline
                .apply(Create.timestamped(TABLE, Arrays.asList(0L, 1L, 6L, 7L, 8L))
                        .withCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of())))
                .apply(Window.<KV<String, Integer>>into(FixedWindows.of(Duration.millis(2L))));

        PCollection<Integer> sum = input
                .apply(Values.<Integer>create())
                .apply(Combine.globally(new SumInts()).withoutDefaults());
        PCollection<KV<String, String>> sumPerKey = input
                .apply(Combine.<String, Integer, String>perKey(new TestCombineFn()));

        PAssert.that(sum).containsInAnyOrder(2, 5, 13);
        PAssert.that(sumPerKey).containsInAnyOrder(
                KV.of("a", "11"),
                KV.of("a", "4"),
                KV.of("b", "1"),
                KV.of("b", "13"));

        this.pipeline.run();
    }

    /**
     * Context-aware combine of {@code TABLE}, timestamped at 0, 1, 6, 7 and 8 ms, inside 2 ms
     * fixed windows. Each window's sum (2, 5, 13) is exposed as a singleton side input to
     * {@link TestCombineFnWithContext}, applied both per key and globally.
     */
    @Test
    @Category({ValidatesRunner.class})
    public void testFixedWindowsCombineWithContext() {
        PCollection<KV<String, Integer>> perKeyInput = this.pipeline
                .apply(Create.timestamped(TABLE, Arrays.asList(0L, 1L, 6L, 7L, 8L))
                        .withCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of())))
                .apply(Window.<KV<String, Integer>>into(FixedWindows.of(Duration.millis(2L))));
        PCollection<Integer> globallyInput = perKeyInput.apply(Values.<Integer>create());

        PCollection<Integer> sum = globallyInput.apply("Sum", Combine.globally(new SumInts()).withoutDefaults());
        PCollectionView<Integer> sumView = sum.apply(View.<Integer>asSingleton());

        PCollection<KV<String, String>> combinedPerKey = perKeyInput.apply(
                Combine.<String, Integer, String>perKey(new TestCombineFnWithContext(sumView))
                        .withSideInputs(Arrays.asList(sumView)));
        PCollection<String> combinedGlobally = globallyInput.apply(
                Combine.globally(new TestCombineFnWithContext(sumView))
                        .withoutDefaults()
                        .withSideInputs(Arrays.asList(sumView)));

        PAssert.that(sum).containsInAnyOrder(2, 5, 13);
        PAssert.that(combinedPerKey).containsInAnyOrder(
                KV.of("a", "112"),
                KV.of("a", "45"),
                KV.of("b", "15"),
                KV.of("b", "1133"));
        PAssert.that(combinedGlobally).containsInAnyOrder("112", "145", "1133");

        this.pipeline.run();
    }

    /**
     * Context-aware combine of {@code TABLE}, timestamped at 2, 3, 8, 9 and 10 ms, inside
     * sliding windows of size 2 ms. Each window's sum is exposed as a singleton side input to
     * {@link TestCombineFnWithContext}, applied both per key and globally.
     */
    @Test
    @Category({ValidatesRunner.class})
    public void testSlidingWindowsCombineWithContext() {
        PCollection<KV<String, Integer>> perKeyInput = this.pipeline
                .apply(Create.timestamped(TABLE, Arrays.asList(2L, 3L, 8L, 9L, 10L))
                        .withCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of())))
                .apply(Window.<KV<String, Integer>>into(SlidingWindows.of(Duration.millis(2L))));
        PCollection<Integer> globallyInput = perKeyInput.apply(Values.<Integer>create());

        PCollection<Integer> sum = globallyInput.apply("Sum", Combine.globally(new SumInts()).withoutDefaults());
        PCollectionView<Integer> sumView = sum.apply(View.<Integer>asSingleton());

        PCollection<KV<String, String>> combinedPerKey = perKeyInput.apply(
                Combine.<String, Integer, String>perKey(new TestCombineFnWithContext(sumView))
                        .withSideInputs(Arrays.asList(sumView)));
        PCollection<String> combinedGlobally = globallyInput.apply(
                Combine.globally(new TestCombineFnWithContext(sumView))
                        .withoutDefaults()
                        .withSideInputs(Arrays.asList(sumView)));

        PAssert.that(sum).containsInAnyOrder(1, 2, 1, 4, 5, 14, 13);
        PAssert.that(combinedPerKey).containsInAnyOrder(
                KV.of("a", "11"),
                KV.of("a", "112"),
                KV.of("a", "11"),
                KV.of("a", "44"),
                KV.of("a", "45"),
                KV.of("b", "15"),
                KV.of("b", "11134"),
                KV.of("b", "1133"));
        PAssert.that(combinedGlobally).containsInAnyOrder("11", "112", "11", "44", "145", "11134", "1133");

        this.pipeline.run();
    }

    /**
     * Sums two 1s in the global window with a trigger that repeatedly fires after at least one
     * element, accumulating fired panes, and checks that the formatted output contains
     * {@code "2: true"}.
     */
    @Test
    @Category({ValidatesRunner.class})
    public void testGlobalCombineWithDefaultsAndTriggers() {
        PCollection<Integer> input = this.pipeline.apply(Create.of(1, 1));

        PCollection<String> formatted = input
                .apply(Window.<Integer>into(new GlobalWindows())
                        .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
                        .accumulatingFiredPanes()
                        .withAllowedLateness(new Duration(0L)))
                .apply(Sum.integersGlobally())
                .apply(ParDo.of(new FormatPaneInfo()));

        PAssert.that(formatted).satisfies(new SerializableFunction<Iterable<String>, Void>() { // from class: org.apache.beam.sdk.transforms.CombineTest.1
            public Void apply(Iterable<String> iterable) {
                Assert.assertThat(iterable, Matchers.hasItem("2: true"));
                return null;
            }
        });

        this.pipeline.run();
    }

    /**
     * Combines {@code TABLE}, timestamped at 0, 4, 7, 10 and 16 ms, inside session windows with
     * a 5 ms gap. Expects sums 7 and 13, and per-key outputs "114" for "a", "1" and "13" for "b".
     */
    @Test
    @Category({ValidatesRunner.class})
    public void testSessionsCombine() {
        PCollection<KV<String, Integer>> input = this.pipeline
                .apply(Create.timestamped(TABLE, Arrays.asList(0L, 4L, 7L, 10L, 16L))
                        .withCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of())))
                .apply(Window.<KV<String, Integer>>into(Sessions.withGapDuration(Duration.millis(5L))));

        PCollection<Integer> sum = input
                .apply(Values.<Integer>create())
                .apply(Combine.globally(new SumInts()).withoutDefaults());
        PCollection<KV<String, String>> sumPerKey = input
                .apply(Combine.<String, Integer, String>perKey(new TestCombineFn()));

        PAssert.that(sum).containsInAnyOrder(7, 13);
        PAssert.that(sumPerKey).containsInAnyOrder(
                KV.of("a", "114"),
                KV.of("b", "1"),
                KV.of("b", "13"));

        this.pipeline.run();
    }

    /**
     * Context-aware combine of {@code TABLE}, timestamped at 0, 4, 7, 10 and 16 ms. The side
     * input is the per-window sum over 5 ms fixed windows (singleton view, default value 0);
     * the main inputs are windowed into sessions with a 5 ms gap before being combined per key
     * and globally.
     */
    @Test
    @Category({ValidatesRunner.class})
    public void testSessionsCombineWithContext() {
        PCollection<KV<String, Integer>> perKeyInput = this.pipeline
                .apply(Create.timestamped(TABLE, Arrays.asList(0L, 4L, 7L, 10L, 16L))
                        .withCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of())));
        PCollection<Integer> globallyInput = perKeyInput.apply(Values.<Integer>create());

        PCollection<Integer> fixedWindowsSum = globallyInput
                .apply("FixedWindows", Window.<Integer>into(FixedWindows.of(Duration.millis(5L))))
                .apply("Sum", Combine.globally(new SumInts()).withoutDefaults());
        PCollectionView<Integer> sumView = fixedWindowsSum
                .apply(View.<Integer>asSingleton().withDefaultValue(0));

        PCollection<KV<String, String>> sessionsCombinedPerKey = perKeyInput
                .apply("PerKey Input Sessions",
                        Window.<KV<String, Integer>>into(Sessions.withGapDuration(Duration.millis(5L))))
                .apply(Combine.<String, Integer, String>perKey(new TestCombineFnWithContext(sumView))
                        .withSideInputs(Arrays.asList(sumView)));
        PCollection<String> sessionsCombinedGlobally = globallyInput
                .apply("Globally Input Sessions",
                        Window.<Integer>into(Sessions.withGapDuration(Duration.millis(5L))))
                .apply(Combine.globally(new TestCombineFnWithContext(sumView))
                        .withoutDefaults()
                        .withSideInputs(Arrays.asList(sumView)));

        PAssert.that(fixedWindowsSum).containsInAnyOrder(2, 4, 1, 13);
        PAssert.that(sessionsCombinedPerKey).containsInAnyOrder(
                KV.of("a", "1114"),
                KV.of("b", "11"),
                KV.of("b", "013"));
        PAssert.that(sessionsCombinedGlobally).containsInAnyOrder("11114", "013");

        this.pipeline.run();
    }

    /**
     * Applies {@link MeanInts} globally, without defaults, to an empty integer collection in
     * 1 ms fixed windows and expects an empty result.
     */
    @Test
    @Category({ValidatesRunner.class})
    public void testWindowedCombineEmpty() {
        PCollection<Double> mean = this.pipeline
                .apply(Create.empty(BigEndianIntegerCoder.of()))
                .apply(Window.<Integer>into(FixedWindows.of(Duration.millis(1L))))
                .apply(Combine.globally(new MeanInts()).withoutDefaults());

        PAssert.that(mean).empty();

        this.pipeline.run();
    }

    /** Mean combine over {@code TABLE}: expects 4.0 globally, 2.0 for key "a" and 7.0 for key "b". */
    @Test
    @Category({ValidatesRunner.class})
    public void testAccumulatingCombine() {
        Double expectedGlobalMean = Double.valueOf(4.0d);
        List<KV<String, Double>> expectedPerKeyMeans = Arrays.asList(
                KV.of("a", Double.valueOf(2.0d)),
                KV.of("b", Double.valueOf(7.0d)));
        runTestAccumulatingCombine(TABLE, expectedGlobalMean, expectedPerKeyMeans);
    }

    /**
     * Runs the shared accumulating-combine scenario on {@code EMPTY_TABLE}, expecting {@code 0.0}
     * for the global result and no per-key results.
     */
    @Test
    @Category(ValidatesRunner.class)
    public void testAccumulatingCombineEmpty() {
        final Double expectedGlobal = 0.0;
        runTestAccumulatingCombine(EMPTY_TABLE, expectedGlobal, Collections.emptyList());
    }

    /** Verifies the transform names reported by the built-in per-key Min, Max, Mean and Sum combiners. */
    @Test
    public void testCombinerNames() {
        Assert.assertThat(
                Min.integersPerKey().getName(), Matchers.equalTo("Combine.perKey(MinInteger)"));
        Assert.assertThat(
                Max.integersPerKey().getName(), Matchers.equalTo("Combine.perKey(MaxInteger)"));
        Assert.assertThat(
                Mean.perKey().getName(), Matchers.equalTo("Combine.perKey(Mean)"));
        Assert.assertThat(
                Sum.integersPerKey().getName(), Matchers.equalTo("Combine.perKey(SumInteger)"));
    }

    /**
     * Computes the per-key mean of a tenfold copy of {@code TABLE} with four different hot-key
     * fanout settings and asserts that every variant yields the same result.
     */
    @Test
    @Category({ValidatesRunner.class})
    public void testHotKeyCombining() {
        PCollection<KV<String, Integer>> input = copy(createInput(this.pipeline, TABLE), 10);
        MeanInts mean = new MeanInts();

        // The explicit type arguments are required: perKey(...) is the receiver of
        // withHotKeyFanout(...), so nothing in that expression constrains the key type.
        PCollection<KV<String, Double>> coldMean = input.apply("ColdMean",
                Combine.<String, Integer, Double>perKey(mean).withHotKeyFanout(0));
        PCollection<KV<String, Double>> warmMean = input.apply("WarmMean",
                Combine.<String, Integer, Double>perKey(mean).withHotKeyFanout(hotKeyFanout));
        PCollection<KV<String, Double>> hotMean = input.apply("HotMean",
                Combine.<String, Integer, Double>perKey(mean).withHotKeyFanout(5));
        PCollection<KV<String, Double>> splitMean = input.apply("SplitMean",
                Combine.<String, Integer, Double>perKey(mean).withHotKeyFanout(splitHotKeyFanout));

        List<KV<String, Double>> expected = Arrays.asList(KV.of("a", 2.0), KV.of("b", 7.0));
        PAssert.that(coldMean).containsInAnyOrder(expected);
        PAssert.that(warmMean).containsInAnyOrder(expected);
        PAssert.that(hotMean).containsInAnyOrder(expected);
        PAssert.that(splitMean).containsInAnyOrder(expected);
        this.pipeline.run();
    }

    /**
     * Sums the integers 1 to 5 with a fanout of 2 in the global window, using a trigger that fires
     * repeatedly after every element in accumulating mode, and asserts that {@code 15} is among
     * the values that reach the assertion.
     */
    @Test
    @Category({ValidatesRunner.class})
    public void testHotKeyCombiningWithAccumulationMode() {
        PAssert.that(
                        this.pipeline
                                .apply(Create.of(1, 2, 3, 4, 5))
                                // The <Integer> witness is required: into(...) is the receiver of
                                // triggering(...), so without it the element type is inferred as
                                // Object and the transform cannot be applied to integers.
                                .apply(Window.<Integer>into(new GlobalWindows())
                                        .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
                                        .accumulatingFiredPanes()
                                        .withAllowedLateness(
                                                new Duration(0L), Window.ClosingBehavior.FIRE_ALWAYS))
                                .apply(Sum.integersGlobally().withoutDefaults().withFanout(2))
                                .apply(ParDo.of(new GetLast())))
                .satisfies(new SerializableFunction<Iterable<Integer>, Void>() {
                    public Void apply(Iterable<Integer> iterable) {
                        Assert.assertThat(iterable, Matchers.hasItem(15));
                        return null;
                    }
                });
        this.pipeline.run();
    }

    /**
     * Combines a doubled copy of {@code TABLE} per key with {@code TestProdInt} and
     * {@code TestProdObj} and asserts that both produce the same per-key results.
     */
    @Test
    @Category({NeedsRunner.class})
    public void testBinaryCombineFn() {
        PCollection<KV<String, Integer>> input = copy(createInput(this.pipeline, TABLE), 2);
        PCollection<KV<String, Integer>> intProduct = input.apply("IntProduct",
                Combine.<String, Integer, Integer>perKey(new TestProdInt()));
        PCollection<KV<String, Integer>> objProduct = input.apply("ObjProduct",
                Combine.<String, Integer, Integer>perKey(new TestProdObj()));

        List<KV<String, Integer>> expected = Arrays.asList(KV.of("a", 16), KV.of("b", 169));
        PAssert.that(intProduct).containsInAnyOrder(expected);
        PAssert.that(objProduct).containsInAnyOrder(expected);
        this.pipeline.run();
    }

    /**
     * Checks {@code NullCombiner} against input lists that contain {@code null} in the first
     * position, in the last position, in both, and in every position.
     */
    @Test
    public void testBinaryCombineFnWithNulls() {
        // No cast on the combine function: its type arguments are inferred from the arguments.
        // (A primitive such as int is not a legal type argument in the first place.)
        TestUtils.checkCombineFn(new NullCombiner(), Arrays.asList(3, 3, 5), 45);
        TestUtils.checkCombineFn(new NullCombiner(), Arrays.asList(null, 3, 5), 30);
        TestUtils.checkCombineFn(new NullCombiner(), Arrays.asList(3, 3, null), 18);
        TestUtils.checkCombineFn(new NullCombiner(), Arrays.asList(null, 3, null), 12);
        TestUtils.checkCombineFn(new NullCombiner(), Arrays.asList(null, null, null), 8);
    }

    /**
     * Builds a singleton view from the global integer sum of an empty input, reads it as a side
     * input from a single {@code null} main-input element, and asserts that the value read is
     * {@code 0}.
     */
    @Test
    @Category({ValidatesRunner.class})
    public void testCombineGloballyAsSingletonView() {
        // The view must be parameterized: with a raw view, sideInput(...) is typed Object and
        // cannot be passed to output(Integer).
        final PCollectionView<Integer> view = this.pipeline
                .apply("CreateEmptySideInput", Create.empty(BigEndianIntegerCoder.of()))
                .apply(Sum.integersGlobally().asSingletonView());

        PAssert.thatSingleton(
                        this.pipeline
                                .apply("CreateVoidMainInput", Create.of((Void) null))
                                .apply("OutputSideInput", ParDo.of(new DoFn<Void, Integer>() {
                                    @DoFn.ProcessElement
                                    public void processElement(DoFn<Void, Integer>.ProcessContext processContext) {
                                        // Emit whatever the side input holds for this element's window.
                                        processContext.output(processContext.sideInput(view));
                                    }
                                }).withSideInputs(view)))
                .isEqualTo(0);
        this.pipeline.run();
    }

    /**
     * Windowed variant of the singleton-view test, using one-minute fixed windows.
     *
     * <p>The side input holds the values 1 and 3, both at timestamp 100. The main input holds one
     * {@code null} element at timestamp 100 and one at the minimum timestamp. The test expects the
     * side-input value {@code 4} in the window of the first element and {@code 0} in the window of
     * the second.
     */
    @Test
    @Category({ValidatesRunner.class})
    public void testWindowedCombineGloballyAsSingletonView() {
        FixedWindows windowFn = FixedWindows.of(Duration.standardMinutes(1L));

        // The view must be parameterized: with a raw view, sideInput(...) is typed Object and
        // cannot be passed to output(Integer).
        final PCollectionView<Integer> view = this.pipeline
                .apply("CreateSideInput", Create.timestamped(
                        TimestampedValue.of(1, new Instant(100L)),
                        TimestampedValue.of(3, new Instant(100L))))
                .apply("WindowSideInput", Window.<Integer>into(windowFn))
                .apply("CombineSideInput", Sum.integersGlobally().asSingletonView());

        // Shares timestamp 100 with the side-input values.
        TimestampedValue<Void> nonEmptyElement = TimestampedValue.of(null, new Instant(100L));
        // Placed at the minimum timestamp, away from the side-input values.
        TimestampedValue<Void> emptyElement = TimestampedValue.atMinimumTimestamp(null);

        PCollection<Integer> outputs = this.pipeline
                .apply("CreateMainInput",
                        Create.timestamped(nonEmptyElement, emptyElement).withCoder(VoidCoder.of()))
                .apply("WindowMainInput", Window.<Void>into(windowFn))
                .apply("OutputSideInput", ParDo.of(new DoFn<Void, Integer>() {
                    @DoFn.ProcessElement
                    public void processElement(DoFn<Void, Integer>.ProcessContext processContext) {
                        // Emit whatever the side input holds for this element's window.
                        processContext.output(processContext.sideInput(view));
                    }
                }).withSideInputs(view));

        PAssert.that(outputs).containsInAnyOrder(4, 0);
        PAssert.that(outputs)
                .inWindow(windowFn.assignWindow(nonEmptyElement.getTimestamp()))
                .containsInAnyOrder(4);
        PAssert.that(outputs)
                .inWindow(windowFn.assignWindow(emptyElement.getTimestamp()))
                .containsInAnyOrder(0);
        this.pipeline.run();
    }

    /**
     * Verifies the names reported by the global, singleton-view, per-key and per-key-with-fanout
     * combine transforms.
     */
    @Test
    public void testCombineGetName() {
        String globallyName = Combine.globally(new SumInts()).getName();
        Assert.assertEquals("Combine.globally(SumInts)", globallyName);

        String singletonViewName = Combine.globally(new SumInts()).asSingletonView().getName();
        Assert.assertEquals("Combine.GloballyAsSingletonView", singletonViewName);

        String perKeyName = Combine.perKey(new TestCombineFn()).getName();
        Assert.assertEquals("Combine.perKey(Test)", perKeyName);

        String fanoutName = Combine.perKey(new TestCombineFn()).withHotKeyFanout(10).getName();
        Assert.assertEquals("Combine.perKeyWithFanout(Test)", fanoutName);
    }

    /**
     * Verifies the display data of a global combine with fanout: the combine function class, the
     * default-on-empty flag, the fanout value, and the display data contributed by the combine
     * function itself.
     */
    @Test
    public void testDisplayData() {
        // Anonymous subclass that registers one display item of its own.
        UniqueInts combineFn = new UniqueInts() {
            public void populateDisplayData(DisplayData.Builder builder) {
                builder.add(DisplayData.item("fnMetadata", "foobar"));
            }
        };

        DisplayData displayData = DisplayData.from(Combine.globally(combineFn).withFanout(1234));

        Assert.assertThat(
                displayData, DisplayDataMatchers.hasDisplayItem("combineFn", combineFn.getClass()));
        Assert.assertThat(
                displayData, DisplayDataMatchers.hasDisplayItem("emitDefaultOnEmptyInput", true));
        Assert.assertThat(
                displayData, DisplayDataMatchers.hasDisplayItem("fanout", 1234L));
        Assert.assertThat(
                displayData, DisplayDataMatchers.includesDisplayDataFor("combineFn", combineFn));
    }

    /**
     * Verifies that the display data of a per-key combine names the combine function class and
     * contains an item in that class's namespace.
     */
    @Test
    public void testDisplayDataForWrappedFn() {
        // Anonymous subclass that registers one display item of its own.
        UniqueInts combineFn = new UniqueInts() {
            public void populateDisplayData(DisplayData.Builder builder) {
                builder.add(DisplayData.item("foo", "bar"));
            }
        };

        DisplayData displayData = DisplayData.from(Combine.perKey(combineFn));

        Assert.assertThat(
                displayData, DisplayDataMatchers.hasDisplayItem("combineFn", combineFn.getClass()));
        Assert.assertThat(
                displayData,
                DisplayDataMatchers.hasDisplayItem(
                        DisplayDataMatchers.hasNamespace(combineFn.getClass())));
    }

    /**
     * Verifies that the display data collected for the primitive transforms of a per-key combine
     * includes the combine function class.
     */
    @Test
    @Category(ValidatesRunner.class)
    public void testCombinePerKeyPrimitiveDisplayData() {
        DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
        UniqueInts combineFn = new UniqueInts();
        String reason = "Combine.perKey should include the combineFn in its primitive transform";

        Assert.assertThat(
                reason,
                evaluator.displayDataForPrimitiveTransforms(
                        Combine.perKey(combineFn), KvCoder.of(VarIntCoder.of(), VarIntCoder.of())),
                Matchers.hasItem(
                        DisplayDataMatchers.hasDisplayItem("combineFn", combineFn.getClass())));
    }

    /**
     * Verifies that the display data collected for the primitive transforms of a per-key combine
     * with a hot-key fanout of 2 includes both the combine function class and the fanout.
     */
    @Test
    @Category(ValidatesRunner.class)
    public void testCombinePerKeyWithHotKeyFanoutPrimitiveDisplayData() {
        DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
        UniqueInts combineFn = new UniqueInts();

        Set<DisplayData> primitiveDisplayData = evaluator.displayDataForPrimitiveTransforms(
                Combine.perKey(combineFn).withHotKeyFanout(2),
                KvCoder.of(VarIntCoder.of(), VarIntCoder.of()));

        Assert.assertThat(
                "Combine.perKey.withHotKeyFanout should include the combineFn in its primitive transform",
                primitiveDisplayData,
                Matchers.hasItem(
                        DisplayDataMatchers.hasDisplayItem("combineFn", combineFn.getClass())));
        Assert.assertThat(
                "Combine.perKey.withHotKeyFanout(int) should include the fanout in its primitive transform",
                primitiveDisplayData,
                Matchers.hasItem(DisplayDataMatchers.hasDisplayItem("fanout", 2)));
    }

    /**
     * Applies a {@code ParDo} that outputs every element of {@code pCollection} {@code i} times.
     *
     * @param pCollection the collection whose elements are repeated
     * @param i how many times each element is output; nothing is output when {@code i <= 0}
     * @return the collection produced by the repeating {@code ParDo}
     */
    private static <T> PCollection<T> copy(PCollection<T> pCollection, final int i) {
        DoFn<T, T> repeater = new DoFn<T, T>() {
            @DoFn.ProcessElement
            public void processElement(DoFn<T, T>.ProcessContext processContext) throws Exception {
                int emitted = 0;
                while (emitted < i) {
                    processContext.output(processContext.element());
                    emitted++;
                }
            }
        };
        return pCollection.apply(ParDo.of(repeater));
    }
}
