/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.extensions.euphoria.core.translate;

import org.apache.beam.sdk.extensions.euphoria.core.client.accumulators.AccumulatorProvider;
import org.apache.beam.sdk.extensions.euphoria.core.client.functional.ExtractEventTime;
import org.apache.beam.sdk.extensions.euphoria.core.client.functional.UnaryFunctor;
import org.apache.beam.sdk.extensions.euphoria.core.client.operator.FlatMap;
import org.apache.beam.sdk.extensions.euphoria.core.client.type.TypeAwareness;
import org.apache.beam.sdk.extensions.euphoria.core.client.util.PCollectionLists;
import org.apache.beam.sdk.extensions.euphoria.core.translate.LazyAccumulatorProvider;
import org.apache.beam.sdk.extensions.euphoria.core.translate.OperatorTranslator;
import org.apache.beam.sdk.extensions.euphoria.core.translate.collector.AdaptableCollector;
import org.apache.beam.sdk.extensions.euphoria.core.translate.collector.CollectorAdapter;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Duration;
import org.joda.time.Instant;

/**
 * Translates a {@link FlatMap} operator into a {@link ParDo} applied to the operator's single
 * input {@link PCollection}.
 *
 * @param <InputT> type of the elements read from the input collection
 * @param <OutputT> type of the elements emitted by the operator's functor
 */
public class FlatMapTranslator<InputT, OutputT>
implements OperatorTranslator<InputT, OutputT, FlatMap<InputT, OutputT>> {
    /**
     * Builds the {@link ParDo} that executes the operator's functor.
     *
     * @param operator the flat-map operator supplying the functor, the optional name, the optional
     *     event-time extractor, the allowed timestamp skew and the optional output type
     * @param inputs list of inputs; its only element is the collection the functor is applied to
     * @return the collection produced by the {@code "mapper"} step, with its type descriptor set to
     *     whatever {@link TypeAwareness#orObjects} yields for the operator's output type
     */
    @Override
    public PCollection<OutputT> translate(FlatMap<InputT, OutputT> operator, PCollectionList<InputT> inputs) {
        // Declared through the interface: only the AccumulatorProvider contract is needed below.
        AccumulatorProvider accumulators = new LazyAccumulatorProvider(AccumulatorProvider.of(inputs.getPipeline()));
        Mapper<InputT, OutputT> mapper = new Mapper<>(
                operator.getName().orElse(null),
                operator.getFunctor(),
                accumulators,
                operator.getEventTimeExtractor().orElse(null),
                operator.getAllowedTimestampSkew());
        // Fully parameterized chain; no raw PCollection/PTransform casts are required.
        return PCollectionLists.getOnlyElement(inputs)
                .apply("mapper", ParDo.of(mapper))
                .setTypeDescriptor(TypeAwareness.orObjects(operator.getOutputType()));
    }

    /**
     * Adapter that forwards elements emitted by the functor to the {@link DoFn} process context,
     * optionally stamping them with an event time derived from the current input element.
     */
    private static class Collector<InputT, OutputT>
    implements CollectorAdapter<InputT, OutputT, OutputT> {
        /** Extractor of the output timestamp; {@code null} means elements are emitted as-is. */
        private final @Nullable ExtractEventTime<InputT> eventTimeExtractor;

        private Collector(@Nullable ExtractEventTime<InputT> eventTimeExtractor) {
            this.eventTimeExtractor = eventTimeExtractor;
        }

        /**
         * Emits {@code out} through {@code ctx}.
         *
         * @param ctx context of the element currently being processed
         * @param out element to emit
         */
        @Override
        public void collect(DoFn<InputT, OutputT>.ProcessContext ctx, OutputT out) {
            if (this.eventTimeExtractor == null) {
                ctx.output(out);
                return;
            }
            // The timestamp is taken from the INPUT element being processed, not from the output.
            InputT element = ctx.element();
            ctx.outputWithTimestamp(out, new Instant(this.eventTimeExtractor.extractTimestamp(element)));
        }
    }

    /** {@link DoFn} that applies the operator's functor to every input element. */
    private static class Mapper<InputT, OutputT>
    extends DoFn<InputT, OutputT> {
        private final UnaryFunctor<InputT, OutputT> mapper;
        private final AdaptableCollector<InputT, OutputT, OutputT> collector;
        private final Duration timestampSkew;

        /**
         * @param operatorName name of the translated operator, or {@code null} when it has none
         * @param mapper functor invoked for each input element
         * @param accumulators accumulator provider handed to the collector
         * @param eventTimeExtractor optional extractor of output timestamps, may be {@code null}
         * @param timestampSkew value reported by {@link #getAllowedTimestampSkew()}
         */
        Mapper(@Nullable String operatorName, UnaryFunctor<InputT, OutputT> mapper, AccumulatorProvider accumulators, @Nullable ExtractEventTime<InputT> eventTimeExtractor, Duration timestampSkew) {
            this.mapper = mapper;
            this.collector = new AdaptableCollector<>(accumulators, operatorName, new Collector<>(eventTimeExtractor));
            this.timestampSkew = timestampSkew;
        }

        /**
         * Points the collector at the current context and runs the functor on the current element.
         *
         * @param ctx context of the element being processed
         */
        @DoFn.ProcessElement
        public void processElement(DoFn<InputT, OutputT>.ProcessContext ctx) {
            // The collector must be bound to this element's context before the functor emits.
            this.collector.setProcessContext(ctx);
            this.mapper.apply(ctx.element(), this.collector);
        }

        /**
         * Returns the timestamp skew configured on the translated operator.
         *
         * @return the skew passed to the constructor
         */
        @Override
        public Duration getAllowedTimestampSkew() {
            return this.timestampSkew;
        }
    }
}

