package org.apache.beam.fn.harness;

import com.google.auto.service.AutoService;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import org.apache.beam.fn.harness.PTransformRunnerFactory;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.construction.WindowingStrategyTranslation;
import org.apache.beam.sdk.function.ThrowingFunction;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets;

/* loaded from: input_file:org/apache/beam/fn/harness/WindowMergingFnRunner.class */
public abstract class WindowMergingFnRunner<T, W extends BoundedWindow> {
    static final String URN = "beam:transform:merge_windows:v1";

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/fn/harness/WindowMergingFnRunner$MergingViaWindowFnRunner.class */
    public static class MergingViaWindowFnRunner<T, W extends BoundedWindow> extends WindowMergingFnRunner<T, W> {
        private final WindowFn<T, W> windowFn;
        private final WindowFn<?, W>.MergeContext mergeContext;
        private Collection<W> currentWindows;
        private List<KV<W, Collection<W>>> mergedWindows;

        private MergingViaWindowFnRunner(WindowFn<T, W> windowFn) {
            this.windowFn = windowFn;
            this.mergedWindows = new ArrayList();
            this.currentWindows = new ArrayList();
            Objects.requireNonNull(windowFn);
            this.mergeContext = new WindowFn<T, W>.MergeContext(windowFn) { // from class: org.apache.beam.fn.harness.WindowMergingFnRunner.MergingViaWindowFnRunner.1
                /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
                {
                    super();
                    Objects.requireNonNull(windowFn);
                }

                @Override // org.apache.beam.sdk.transforms.windowing.WindowFn.MergeContext
                public Collection<W> windows() {
                    return MergingViaWindowFnRunner.this.currentWindows;
                }

                @Override // org.apache.beam.sdk.transforms.windowing.WindowFn.MergeContext
                public void merge(Collection<W> collection, W w) throws Exception {
                    MergingViaWindowFnRunner.this.mergedWindows.add(KV.of(w, collection));
                }
            };
        }

        @Override // org.apache.beam.fn.harness.WindowMergingFnRunner
        KV<T, KV<Iterable<W>, Iterable<KV<W, Iterable<W>>>>> mergeWindows(KV<T, Iterable<W>> kv) throws Exception {
            this.currentWindows = Sets.newHashSet(kv.getValue());
            this.windowFn.mergeWindows(this.mergeContext);
            Iterator<KV<W, Collection<W>>> it = this.mergedWindows.iterator();
            while (it.hasNext()) {
                this.currentWindows.removeAll(it.next().getValue());
            }
            KV<T, KV<Iterable<W>, Iterable<KV<W, Iterable<W>>>>> of = KV.of(kv.getKey(), KV.of(Sets.newHashSet(this.currentWindows), Lists.newArrayList(this.mergedWindows)));
            this.currentWindows.clear();
            this.mergedWindows.clear();
            return of;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/fn/harness/WindowMergingFnRunner$NonMergingWindowFnRunner.class */
    public static class NonMergingWindowFnRunner<T, W extends BoundedWindow> extends WindowMergingFnRunner<T, W> {
        private NonMergingWindowFnRunner() {
        }

        @Override // org.apache.beam.fn.harness.WindowMergingFnRunner
        KV<T, KV<Iterable<W>, Iterable<KV<W, Iterable<W>>>>> mergeWindows(KV<T, Iterable<W>> kv) {
            return KV.of(kv.getKey(), KV.of(kv.getValue(), Collections.emptyList()));
        }
    }

    @AutoService({PTransformRunnerFactory.Registrar.class})
    /* loaded from: input_file:org/apache/beam/fn/harness/WindowMergingFnRunner$Registrar.class */
    public static class Registrar implements PTransformRunnerFactory.Registrar {
        @Override // org.apache.beam.fn.harness.PTransformRunnerFactory.Registrar
        public Map<String, PTransformRunnerFactory> getPTransformRunnerFactories() {
            return ImmutableMap.of("beam:transform:merge_windows:v1", MapFnRunners.forValueMapFnFactory(WindowMergingFnRunner::createMapFunctionForPTransform));
        }
    }

    static <T, W extends BoundedWindow> ThrowingFunction<KV<T, Iterable<W>>, KV<T, KV<Iterable<W>, Iterable<KV<W, Iterable<W>>>>>> createMapFunctionForPTransform(String str, RunnerApi.PTransform pTransform) throws IOException {
        WindowMergingFnRunner create = create(WindowingStrategyTranslation.windowFnFromProto(RunnerApi.FunctionSpec.parseFrom(pTransform.getSpec().getPayload())));
        Objects.requireNonNull(create);
        return create::mergeWindows;
    }

    static <T, W extends BoundedWindow> WindowMergingFnRunner<T, W> create(WindowFn<?, W> windowFn) {
        return windowFn.isNonMerging() ? new NonMergingWindowFnRunner() : new MergingViaWindowFnRunner(windowFn);
    }

    abstract KV<T, KV<Iterable<W>, Iterable<KV<W, Iterable<W>>>>> mergeWindows(KV<T, Iterable<W>> kv) throws Exception;
}
