package org.apache.beam.sdk.extensions.euphoria.core.translate;

import org.apache.beam.sdk.extensions.euphoria.core.client.accumulators.AccumulatorProvider;
import org.apache.beam.sdk.extensions.euphoria.core.client.functional.ExtractEventTime;
import org.apache.beam.sdk.extensions.euphoria.core.client.functional.UnaryFunctor;
import org.apache.beam.sdk.extensions.euphoria.core.client.operator.FlatMap;
import org.apache.beam.sdk.extensions.euphoria.core.client.type.TypeAwareness;
import org.apache.beam.sdk.extensions.euphoria.core.client.util.PCollectionLists;
import org.apache.beam.sdk.extensions.euphoria.core.translate.collector.AdaptableCollector;
import org.apache.beam.sdk.extensions.euphoria.core.translate.collector.CollectorAdapter;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.joda.time.Duration;
import org.joda.time.Instant;

/* loaded from: input_file:org/apache/beam/sdk/extensions/euphoria/core/translate/FlatMapTranslator.class */
public class FlatMapTranslator<InputT, OutputT> implements OperatorTranslator<InputT, OutputT, FlatMap<InputT, OutputT>> {

    /* loaded from: input_file:org/apache/beam/sdk/extensions/euphoria/core/translate/FlatMapTranslator$Collector.class */
    private static class Collector<InputT, OutputT> implements CollectorAdapter<InputT, OutputT, OutputT> {
        private final ExtractEventTime<InputT> eventTimeExtractor;

        private Collector(ExtractEventTime<InputT> extractEventTime) {
            this.eventTimeExtractor = extractEventTime;
        }

        /* JADX WARN: Multi-variable type inference failed */
        @Override // org.apache.beam.sdk.extensions.euphoria.core.translate.collector.CollectorAdapter
        public void collect(DoFn<InputT, OutputT>.ProcessContext processContext, OutputT outputt) {
            if (this.eventTimeExtractor == null) {
                processContext.output(outputt);
            } else {
                processContext.outputWithTimestamp(outputt, new Instant(this.eventTimeExtractor.extractTimestamp(processContext.element())));
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/sdk/extensions/euphoria/core/translate/FlatMapTranslator$Mapper.class */
    public static class Mapper<InputT, OutputT> extends DoFn<InputT, OutputT> {
        private final UnaryFunctor<InputT, OutputT> mapper;
        private final AdaptableCollector<InputT, OutputT, OutputT> collector;
        private final Duration timestampSkew;

        Mapper(String str, UnaryFunctor<InputT, OutputT> unaryFunctor, AccumulatorProvider accumulatorProvider, ExtractEventTime<InputT> extractEventTime, Duration duration) {
            this.mapper = unaryFunctor;
            this.collector = new AdaptableCollector<>(accumulatorProvider, str, new Collector(extractEventTime));
            this.timestampSkew = duration;
        }

        /* JADX WARN: Multi-variable type inference failed */
        @DoFn.ProcessElement
        public void processElement(DoFn<InputT, OutputT>.ProcessContext processContext) {
            this.collector.setProcessContext(processContext);
            this.mapper.apply(processContext.element(), this.collector);
        }

        public Duration getAllowedTimestampSkew() {
            return this.timestampSkew;
        }
    }

    @Override // org.apache.beam.sdk.extensions.euphoria.core.translate.OperatorTranslator
    public PCollection<OutputT> translate(FlatMap<InputT, OutputT> flatMap, PCollectionList<InputT> pCollectionList) {
        return PCollectionLists.getOnlyElement(pCollectionList).apply("mapper", ParDo.of(new Mapper(flatMap.getName().orElse(null), flatMap.getFunctor(), new LazyAccumulatorProvider(AccumulatorProvider.of(pCollectionList.getPipeline())), flatMap.getEventTimeExtractor().orElse(null), flatMap.getAllowedTimestampSkew()))).setTypeDescriptor(TypeAwareness.orObjects(flatMap.getOutputType()));
    }
}
