package org.apache.beam.sdk.extensions.ordered;

import com.google.auto.value.AutoValue;
import java.util.Arrays;
import java.util.Iterator;
import javax.annotation.Nullable;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.BooleanCoder;
import org.apache.beam.sdk.coders.CannotProvideCoderException;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.extensions.ordered.MutableState;
import org.apache.beam.sdk.extensions.ordered.ProcessingState;
import org.apache.beam.sdk.extensions.ordered.UnprocessedEvent;
import org.apache.beam.sdk.schemas.NoSuchSchemaException;
import org.apache.beam.sdk.schemas.SchemaCoder;
import org.apache.beam.sdk.schemas.SchemaRegistry;
import org.apache.beam.sdk.state.OrderedListState;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TimestampedValue;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@AutoValue
/* loaded from: input_file:org/apache/beam/sdk/extensions/ordered/OrderedEventProcessor.class */
public abstract class OrderedEventProcessor<EventT, EventKeyT, ResultT, StateT extends MutableState<EventT, ResultT>> extends PTransform<PCollection<KV<EventKeyT, KV<Long, EventT>>>, OrderedEventProcessorResult<EventKeyT, ResultT, EventT>> {

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/beam/sdk/extensions/ordered/OrderedEventProcessor$OrderedProcessorDoFn.class */
    public static class OrderedProcessorDoFn<EventTypeT, EventKeyTypeT, ResultTypeT, StateTypeT extends MutableState<EventTypeT, ResultTypeT>> extends DoFn<KV<EventKeyTypeT, KV<Long, EventTypeT>>, KV<EventKeyTypeT, ResultTypeT>> {
        private static final Logger LOG = LoggerFactory.getLogger(OrderedProcessorDoFn.class);
        private static final String PROCESSING_STATE = "processingState";
        private static final String MUTABLE_STATE = "mutableState";
        private static final String BUFFERED_EVENTS = "bufferedEvents";
        private static final String STATUS_EMISSION_TIMER = "statusTimer";
        private static final String LARGE_BATCH_EMISSION_TIMER = "largeBatchTimer";
        private static final String WINDOW_CLOSED = "windowClosed";
        private final EventExaminer<EventTypeT, StateTypeT> eventExaminer;

        @DoFn.StateId(BUFFERED_EVENTS)
        private final StateSpec<OrderedListState<EventTypeT>> bufferedEventsSpec;

        @DoFn.StateId(PROCESSING_STATE)
        private final StateSpec<ValueState<ProcessingState<EventKeyTypeT>>> processingStateSpec;

        @DoFn.StateId(MUTABLE_STATE)
        private final StateSpec<ValueState<StateTypeT>> mutableStateSpec;
        private final TupleTag<KV<EventKeyTypeT, OrderedProcessingStatus>> statusTupleTag;
        private final Duration statusUpdateFrequency;
        private final TupleTag<KV<EventKeyTypeT, ResultTypeT>> mainOutputTupleTag;
        private final TupleTag<KV<EventKeyTypeT, KV<Long, UnprocessedEvent<EventTypeT>>>> unprocessedEventsTupleTag;
        private final boolean produceStatusUpdateOnEveryEvent;
        private final long maxNumberOfResultsToProduce;
        private Long numberOfResultsBeforeBundleStart;

        @DoFn.TimerId(STATUS_EMISSION_TIMER)
        private final TimerSpec statusEmissionTimer = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);

        @DoFn.TimerId(LARGE_BATCH_EMISSION_TIMER)
        private final TimerSpec largeBatchEmissionTimer = TimerSpecs.timer(TimeDomain.EVENT_TIME);

        @DoFn.StateId(WINDOW_CLOSED)
        private final StateSpec<ValueState<Boolean>> windowClosedSpec = StateSpecs.value(BooleanCoder.of());

        OrderedProcessorDoFn(EventExaminer<EventTypeT, StateTypeT> eventExaminer, Coder<EventTypeT> coder, Coder<StateTypeT> coder2, Coder<EventKeyTypeT> coder3, TupleTag<KV<EventKeyTypeT, ResultTypeT>> tupleTag, TupleTag<KV<EventKeyTypeT, OrderedProcessingStatus>> tupleTag2, Duration duration, TupleTag<KV<EventKeyTypeT, KV<Long, UnprocessedEvent<EventTypeT>>>> tupleTag3, boolean z, long j) {
            this.eventExaminer = eventExaminer;
            this.bufferedEventsSpec = StateSpecs.orderedList(coder);
            this.mutableStateSpec = StateSpecs.value(coder2);
            this.processingStateSpec = StateSpecs.value(ProcessingState.ProcessingStateCoder.of(coder3));
            this.mainOutputTupleTag = tupleTag;
            this.statusTupleTag = tupleTag2;
            this.unprocessedEventsTupleTag = tupleTag3;
            this.statusUpdateFrequency = duration;
            this.produceStatusUpdateOnEveryEvent = z;
            this.maxNumberOfResultsToProduce = j;
        }

        @DoFn.StartBundle
        public void onBundleStart() {
            this.numberOfResultsBeforeBundleStart = null;
        }

        @DoFn.FinishBundle
        public void onBundleFinish() {
            this.numberOfResultsBeforeBundleStart = null;
        }

        /* JADX WARN: Multi-variable type inference failed */
        @DoFn.ProcessElement
        public void processElement(@DoFn.StateId("bufferedEvents") OrderedListState<EventTypeT> orderedListState, @DoFn.StateId("processingState") @DoFn.AlwaysFetched ValueState<ProcessingState<EventKeyTypeT>> valueState, @DoFn.StateId("mutableState") ValueState<StateTypeT> valueState2, @DoFn.TimerId("statusTimer") Timer timer, @DoFn.TimerId("largeBatchTimer") Timer timer2, @DoFn.Element KV<EventKeyTypeT, KV<Long, EventTypeT>> kv, DoFn.MultiOutputReceiver multiOutputReceiver, BoundedWindow boundedWindow) {
            Object key = kv.getKey();
            long longValue = ((Long) ((KV) kv.getValue()).getKey()).longValue();
            Object value = ((KV) kv.getValue()).getValue();
            ProcessingState processingState = (ProcessingState) valueState.read();
            if (processingState == null) {
                processingState = new ProcessingState(key);
                if (this.statusUpdateFrequency != null) {
                    timer.offset(this.statusUpdateFrequency).setRelative();
                }
            }
            if (this.numberOfResultsBeforeBundleStart == null) {
                this.numberOfResultsBeforeBundleStart = Long.valueOf(processingState.getResultCount());
            }
            processingState.eventReceived();
            MutableState processNewEvent = processNewEvent(longValue, value, processingState, valueState2, orderedListState, multiOutputReceiver);
            processBufferedEvents(processingState, processNewEvent, orderedListState, multiOutputReceiver, timer2);
            saveStates(valueState, processingState, valueState2, processNewEvent, multiOutputReceiver, boundedWindow.maxTimestamp());
            checkIfProcessingIsCompleted(processingState);
        }

        private boolean checkIfProcessingIsCompleted(ProcessingState<EventKeyTypeT> processingState) {
            boolean isProcessingCompleted = processingState.isProcessingCompleted();
            if (isProcessingCompleted) {
                LOG.info("Processing for key '" + processingState.getKey() + "' is completed.");
            }
            return isProcessingCompleted;
        }

        private void saveStates(ValueState<ProcessingState<EventKeyTypeT>> valueState, ProcessingState<EventKeyTypeT> processingState, ValueState<StateTypeT> valueState2, StateTypeT statetypet, DoFn.MultiOutputReceiver multiOutputReceiver, Instant instant) {
            valueState.write(processingState);
            if (statetypet != null) {
                valueState2.write(statetypet);
            }
            if (this.produceStatusUpdateOnEveryEvent) {
                emitProcessingStatus(processingState, multiOutputReceiver, instant);
            }
        }

        private void emitProcessingStatus(ProcessingState<EventKeyTypeT> processingState, DoFn.MultiOutputReceiver multiOutputReceiver, Instant instant) {
            multiOutputReceiver.get(this.statusTupleTag).outputWithTimestamp(KV.of(processingState.getKey(), OrderedProcessingStatus.create(processingState.getLastOutputSequence(), processingState.getBufferedEventCount(), processingState.getEarliestBufferedSequence(), processingState.getLatestBufferedSequence(), processingState.getEventsReceived(), processingState.getResultCount(), processingState.getDuplicates(), processingState.isLastEventReceived())), instant);
        }

        private StateTypeT processNewEvent(long j, EventTypeT eventtypet, ProcessingState<EventKeyTypeT> processingState, ValueState<StateTypeT> valueState, OrderedListState<EventTypeT> orderedListState, DoFn.MultiOutputReceiver multiOutputReceiver) {
            if (j == Long.MAX_VALUE) {
                multiOutputReceiver.get(this.unprocessedEventsTupleTag).output(KV.of(processingState.getKey(), KV.of(Long.valueOf(j), UnprocessedEvent.create(eventtypet, UnprocessedEvent.Reason.sequence_id_outside_valid_range))));
                return null;
            }
            if (processingState.hasAlreadyBeenProcessed(j)) {
                multiOutputReceiver.get(this.unprocessedEventsTupleTag).output(KV.of(processingState.getKey(), KV.of(Long.valueOf(j), UnprocessedEvent.create(eventtypet, UnprocessedEvent.Reason.duplicate))));
                return null;
            }
            boolean isLastEvent = this.eventExaminer.isLastEvent(j, eventtypet);
            if (this.eventExaminer.isInitialEvent(j, eventtypet)) {
                StateTypeT createStateOnInitialEvent = this.eventExaminer.createStateOnInitialEvent(eventtypet);
                processingState.eventAccepted(j, isLastEvent);
                Object produceResult = createStateOnInitialEvent.produceResult();
                if (produceResult != null) {
                    multiOutputReceiver.get(this.mainOutputTupleTag).output(KV.of(processingState.getKey(), produceResult));
                    processingState.resultProduced();
                }
                return createStateOnInitialEvent;
            }
            if (!processingState.isNextEvent(j)) {
                orderedListState.add(TimestampedValue.of(eventtypet, Instant.ofEpochMilli(j)));
                processingState.eventBuffered(j, isLastEvent);
                return null;
            }
            StateTypeT statetypet = (StateTypeT) valueState.read();
            try {
                statetypet.mutate(eventtypet);
                Object produceResult2 = statetypet.produceResult();
                if (produceResult2 != null) {
                    multiOutputReceiver.get(this.mainOutputTupleTag).output(KV.of(processingState.getKey(), produceResult2));
                    processingState.resultProduced();
                }
                processingState.eventAccepted(j, isLastEvent);
                return statetypet;
            } catch (Exception e) {
                multiOutputReceiver.get(this.unprocessedEventsTupleTag).output(KV.of(processingState.getKey(), KV.of(Long.valueOf(j), UnprocessedEvent.create(eventtypet, e))));
                return null;
            }
        }

        /* JADX WARN: Multi-variable type inference failed */
        private void processBufferedEvents(ProcessingState<EventKeyTypeT> processingState, StateTypeT statetypet, OrderedListState<EventTypeT> orderedListState, DoFn.MultiOutputReceiver multiOutputReceiver, Timer timer) {
            if (statetypet == 0 || !processingState.readyToProcessBufferedEvents() || reachedMaxResultCountForBundle(processingState, timer)) {
                return;
            }
            Instant ofEpochMilli = Instant.ofEpochMilli(processingState.getEarliestBufferedSequence().longValue());
            Instant instant = null;
            Iterator it = orderedListState.readRange(ofEpochMilli, Instant.ofEpochMilli(processingState.getLatestBufferedSequence().longValue() + 1)).iterator();
            while (true) {
                if (!it.hasNext()) {
                    break;
                }
                TimestampedValue timestampedValue = (TimestampedValue) it.next();
                long millis = timestampedValue.getTimestamp().getMillis();
                Object value = timestampedValue.getValue();
                if (processingState.checkForDuplicateBatchedEvent(millis)) {
                    multiOutputReceiver.get(this.unprocessedEventsTupleTag).output(KV.of(processingState.getKey(), KV.of(Long.valueOf(millis), UnprocessedEvent.create(value, UnprocessedEvent.Reason.duplicate))));
                } else {
                    if (millis > processingState.getLastOutputSequence().longValue() + 1) {
                        processingState.foundSequenceGap(millis);
                        instant = Instant.ofEpochMilli(millis);
                        break;
                    }
                    if (reachedMaxResultCountForBundle(processingState, timer)) {
                        instant = Instant.ofEpochMilli(millis);
                        break;
                    }
                    try {
                        statetypet.mutate(value);
                        Object produceResult = statetypet.produceResult();
                        if (produceResult != null) {
                            multiOutputReceiver.get(this.mainOutputTupleTag).output(KV.of(processingState.getKey(), produceResult));
                            processingState.resultProduced();
                        }
                        processingState.processedBufferedEvent(millis);
                        instant = Instant.ofEpochMilli(millis + 1);
                    } catch (Exception e) {
                        multiOutputReceiver.get(this.unprocessedEventsTupleTag).output(KV.of(processingState.getKey(), KV.of(Long.valueOf(millis), UnprocessedEvent.create(value, e))));
                    }
                }
            }
            orderedListState.clearRange(ofEpochMilli, instant);
        }

        private boolean reachedMaxResultCountForBundle(ProcessingState<EventKeyTypeT> processingState, Timer timer) {
            boolean z = processingState.resultsProducedInBundle(this.numberOfResultsBeforeBundleStart.longValue()) >= this.maxNumberOfResultsToProduce;
            if (z) {
                LOG.info("Setting the timer to output next batch of events for key '" + processingState.getKey() + "'");
                timer.offset(Duration.millis(1L)).setRelative();
            }
            return z;
        }

        /* JADX WARN: Multi-variable type inference failed */
        @DoFn.OnTimer(LARGE_BATCH_EMISSION_TIMER)
        public void onBatchEmission(DoFn<KV<EventKeyTypeT, KV<Long, EventTypeT>>, KV<EventKeyTypeT, ResultTypeT>>.OnTimerContext onTimerContext, @DoFn.StateId("bufferedEvents") OrderedListState<EventTypeT> orderedListState, @DoFn.StateId("processingState") @DoFn.AlwaysFetched ValueState<ProcessingState<EventKeyTypeT>> valueState, @DoFn.StateId("mutableState") @DoFn.AlwaysFetched ValueState<StateTypeT> valueState2, @DoFn.TimerId("largeBatchTimer") Timer timer, DoFn.MultiOutputReceiver multiOutputReceiver) {
            ProcessingState processingState = (ProcessingState) valueState.read();
            if (processingState == null) {
                LOG.warn("Processing state is empty. Ignore it if the pipeline is being cancelled.");
                return;
            }
            MutableState mutableState = (MutableState) valueState2.read();
            if (mutableState == null) {
                LOG.warn("Mutable state is empty. Ignore it if the pipeline is being cancelled.");
                return;
            }
            LOG.debug("Starting to process batch for key '" + processingState.getKey() + "'");
            this.numberOfResultsBeforeBundleStart = Long.valueOf(processingState.getResultCount());
            processBufferedEvents(processingState, mutableState, orderedListState, multiOutputReceiver, timer);
            saveStates(valueState, processingState, valueState2, mutableState, multiOutputReceiver, onTimerContext.window().maxTimestamp());
            checkIfProcessingIsCompleted(processingState);
        }

        @DoFn.OnTimer(STATUS_EMISSION_TIMER)
        public void onStatusEmission(DoFn.MultiOutputReceiver multiOutputReceiver, @DoFn.TimerId("statusTimer") Timer timer, @DoFn.StateId("windowClosed") ValueState<Boolean> valueState, @DoFn.StateId("processingState") ValueState<ProcessingState<EventKeyTypeT>> valueState2) {
            ProcessingState<EventKeyTypeT> processingState = (ProcessingState) valueState2.read();
            if (processingState == null) {
                LOG.warn("Current processing state is null in onStatusEmission() - most likely the pipeline is shutting down.");
                return;
            }
            emitProcessingStatus(processingState, multiOutputReceiver, Instant.now());
            Boolean bool = (Boolean) valueState.read();
            if (processingState.isProcessingCompleted()) {
                return;
            }
            if (bool == null || !bool.booleanValue()) {
                timer.offset(this.statusUpdateFrequency).setRelative();
            }
        }

        @DoFn.OnWindowExpiration
        public void onWindowExpiration(@DoFn.StateId("windowClosed") ValueState<Boolean> valueState) {
            valueState.write(true);
        }
    }

    public static <EventTypeT, EventKeyTypeT, ResultTypeT, StateTypeT extends MutableState<EventTypeT, ResultTypeT>> OrderedEventProcessor<EventTypeT, EventKeyTypeT, ResultTypeT, StateTypeT> create(OrderedProcessingHandler<EventTypeT, EventKeyTypeT, StateTypeT, ResultTypeT> orderedProcessingHandler) {
        return new AutoValue_OrderedEventProcessor(orderedProcessingHandler);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @Nullable
    public abstract OrderedProcessingHandler<EventT, EventKeyT, StateT, ResultT> getHandler();

    public OrderedEventProcessorResult<EventKeyT, ResultT, EventT> expand(PCollection<KV<EventKeyT, KV<Long, EventT>>> pCollection) {
        TupleTag<KV<EventKeyT, ResultT>> tupleTag = new TupleTag<KV<EventKeyT, ResultT>>("mainOutput") { // from class: org.apache.beam.sdk.extensions.ordered.OrderedEventProcessor.1
        };
        TupleTag<KV<EventKeyT, OrderedProcessingStatus>> tupleTag2 = new TupleTag<KV<EventKeyT, OrderedProcessingStatus>>("status") { // from class: org.apache.beam.sdk.extensions.ordered.OrderedEventProcessor.2
        };
        TupleTag<KV<EventKeyT, KV<Long, UnprocessedEvent<EventT>>>> tupleTag3 = new TupleTag<KV<EventKeyT, KV<Long, UnprocessedEvent<EventT>>>>("unprocessed-events") { // from class: org.apache.beam.sdk.extensions.ordered.OrderedEventProcessor.3
        };
        OrderedProcessingHandler<EventT, EventKeyT, StateT, ResultT> handler = getHandler();
        Pipeline pipeline = pCollection.getPipeline();
        try {
            Coder<EventKeyT> keyCoder = handler.getKeyCoder(pipeline, pCollection.getCoder());
            try {
                Coder<EventT> eventCoder = handler.getEventCoder(pipeline, pCollection.getCoder());
                try {
                    Coder<StateT> stateCoder = handler.getStateCoder(pipeline);
                    try {
                        Coder<ResultT> resultCoder = handler.getResultCoder(pipeline);
                        PCollectionTuple apply = pCollection.apply(ParDo.of(new OrderedProcessorDoFn(handler.getEventExaminer(), eventCoder, stateCoder, keyCoder, tupleTag, tupleTag2, handler.getStatusUpdateFrequency(), tupleTag3, handler.isProduceStatusUpdateOnEveryEvent(), handler.getMaxOutputElementsPerBundle())).withOutputTags(tupleTag, TupleTagList.of(Arrays.asList(tupleTag2, tupleTag3))));
                        return new OrderedEventProcessorResult<>(pipeline, apply.get(tupleTag).setCoder(KvCoder.of(keyCoder, resultCoder)), tupleTag, apply.get(tupleTag2).setCoder(KvCoder.of(keyCoder, getOrderedProcessingStatusCoder(pipeline))), tupleTag2, apply.get(tupleTag3).setCoder(KvCoder.of(keyCoder, KvCoder.of(VarLongCoder.of(), new UnprocessedEvent.UnprocessedEventCoder(eventCoder)))), tupleTag3);
                    } catch (CannotProvideCoderException e) {
                        throw new RuntimeException("Unable to get result coder", e);
                    }
                } catch (CannotProvideCoderException e2) {
                    throw new RuntimeException("Unable to get state coder", e2);
                }
            } catch (CannotProvideCoderException e3) {
                throw new RuntimeException("Unable to get event coder", e3);
            }
        } catch (CannotProvideCoderException e4) {
            throw new RuntimeException("Unable to get key coder", e4);
        }
    }

    private static Coder<OrderedProcessingStatus> getOrderedProcessingStatusCoder(Pipeline pipeline) {
        SchemaRegistry schemaRegistry = pipeline.getSchemaRegistry();
        try {
            return SchemaCoder.of(schemaRegistry.getSchema(OrderedProcessingStatus.class), TypeDescriptor.of(OrderedProcessingStatus.class), schemaRegistry.getToRowFunction(OrderedProcessingStatus.class), schemaRegistry.getFromRowFunction(OrderedProcessingStatus.class));
        } catch (NoSuchSchemaException e) {
            throw new RuntimeException((Throwable) e);
        }
    }
}
