package org.apache.beam.sdk.extensions.timeseries;

import com.google.auto.value.AutoValue;
import java.lang.invoke.SerializedLambda;
import java.util.Map;
import java.util.SortedMap;
import java.util.function.Supplier;
import javax.annotation.Nullable;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.InstantCoder;
import org.apache.beam.sdk.coders.SortedMapCoder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.extensions.timeseries.AutoValue_FillGaps;
import org.apache.beam.sdk.schemas.FieldAccessDescriptor;
import org.apache.beam.sdk.schemas.transforms.WithKeys;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.TimerMap;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableBiFunction;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TimestampedValue;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
import org.joda.time.Duration;
import org.joda.time.Instant;

@AutoValue
/* loaded from: input_file:org/apache/beam/sdk/extensions/timeseries/FillGaps.class */
public abstract class FillGaps<ValueT> extends PTransform<PCollection<ValueT>, PCollection<ValueT>> {
    /**
     * Default for the {@code gcEveryNBuckets} property. {@code of(Duration, FieldAccessDescriptor)}
     * passes it to the builder, and {@code FillGapsDoFn} multiplies the bucket duration by that
     * property to size its garbage-collection windows.
     */
    private static final int GC_EVERY_N_BUCKETS = 60;

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * AutoValue builder for {@link FillGaps}. Every setter mirrors one of the abstract property
     * getters declared on {@link FillGaps}; the implementation is the generated
     * {@code AutoValue_FillGaps.Builder}.
     *
     * @param <ValueT> element type of the collection being gap-filled
     */
    @AutoValue.Builder
    /* loaded from: input_file:org/apache/beam/sdk/extensions/timeseries/FillGaps$Builder.class */
    public static abstract class Builder<ValueT> {
        /** Sets the bucket duration; {@code expand} builds its {@code FixedWindows} from this value. */
        abstract Builder<ValueT> setTimeseriesBucketDuration(Duration duration);

        /** Sets the bound that {@code FillGapsDoFn} compares the running gap length against before scheduling another fill. */
        abstract Builder<ValueT> setMaxGapFillBuckets(Long l);

        /** Sets the stop time: {@code FillGapsDoFn} drops elements after it and schedules no gap timer at or beyond it. */
        abstract Builder<ValueT> setStopTime(Instant instant);

        /** Sets the schema fields handed to {@code WithKeys.of} in {@code expand} to key the input. */
        abstract Builder<ValueT> setKeyDescriptor(FieldAccessDescriptor fieldAccessDescriptor);

        /** Sets the function applied as {@code (stored, incoming)} when a bucket already holds a value. */
        abstract Builder<ValueT> setMergeValues(SerializableBiFunction<TimestampedValue<ValueT>, TimestampedValue<ValueT>, TimestampedValue<ValueT>> serializableBiFunction);

        /** Sets the optional function used to compute the value emitted for a filled bucket; may be {@code null}. */
        abstract Builder<ValueT> setInterpolateFunction(@Nullable SerializableFunction<InterpolateData<ValueT>, ValueT> serializableFunction);

        /** Sets how many bucket durations make up one garbage-collection window. */
        abstract Builder<ValueT> setGcEveryNBuckets(int i);

        /** Builds the configured {@link FillGaps} instance. */
        abstract FillGaps<ValueT> build();
    }

    /* loaded from: input_file:org/apache/beam/sdk/extensions/timeseries/FillGaps$FillGapsDoFn.class */
    public static class FillGapsDoFn<ValueT> extends DoFn<KV<Row, ValueT>, ValueT> {
        // Windowing used to map a timestamp to the end of its time-series bucket.
        private final FixedWindows bucketWindows;
        // Coarser windowing (bucket size multiplied by the GC factor) used to schedule state cleanup.
        private final FixedWindows gcWindows;
        // Elements after this instant are ignored, and no gap timer is set at or beyond it.
        private final Instant stopTime;
        // Bound compared against (gap length + 1) before another gap timer is scheduled.
        private final long maxGapFillBuckets;
        // Combines the value already stored for a bucket with a newly produced one: (stored, incoming).
        private final SerializableBiFunction<TimestampedValue<ValueT>, TimestampedValue<ValueT>, TimestampedValue<ValueT>> mergeValues;

        // Optional: when non-null, computes the value emitted for a filled bucket; otherwise the
        // previous bucket's value is reused unchanged.
        @Nullable
        private final SerializableFunction<InterpolateData<ValueT>, ValueT> interpolateFunction;

        // Per-key map from bucket end timestamp to the value recorded for that bucket. Assigned in the
        // constructor because its coder is built from the element coder passed there.
        @DoFn.StateId("seenBuckets")
        private final StateSpec<ValueState<SortedMap<Instant, TimestampedValue<ValueT>>>> seenBucketsSpec;

        // Event-time timer family; each timer is tagged with the end (epoch millis) of the bucket it checks.
        @DoFn.TimerFamily("gapTimers")
        private final TimerSpec gapFillingTimersSpec = TimerSpecs.timerMap(TimeDomain.EVENT_TIME);

        // Event-time timer family; each timer is tagged with the end (epoch millis) of a GC window.
        @DoFn.TimerFamily("gcTimers")
        private final TimerSpec gcTimersSpec = TimerSpecs.timerMap(TimeDomain.EVENT_TIME);

        // Per-key map from bucket end timestamp to the gap length recorded by onTimer when it filled
        // that bucket and the following bucket had not been seen.
        @DoFn.StateId("gapDurationMap")
        private final StateSpec<ValueState<SortedMap<Instant, Long>>> gapDurationSpec = StateSpecs.value(SortedMapCoder.of(InstantCoder.of(), VarLongCoder.of()));

        FillGapsDoFn(FixedWindows fixedWindows, Coder<ValueT> coder, Instant instant, long j, SerializableBiFunction<TimestampedValue<ValueT>, TimestampedValue<ValueT>, TimestampedValue<ValueT>> serializableBiFunction, @Nullable SerializableFunction<InterpolateData<ValueT>, ValueT> serializableFunction, int i) {
            this.bucketWindows = fixedWindows;
            this.gcWindows = FixedWindows.of(fixedWindows.getSize().multipliedBy(i));
            this.stopTime = instant;
            this.maxGapFillBuckets = j;
            this.seenBucketsSpec = StateSpecs.value(SortedMapCoder.of(InstantCoder.of(), TimestampedValue.TimestampedValueCoder.of(coder)));
            this.mergeValues = serializableBiFunction;
            this.interpolateFunction = serializableFunction;
        }

        /**
         * Handles one keyed input element: emits it, records it against its bucket, and, when it is
         * the first value recorded for that bucket, clears the gap timer tagged with the bucket's end.
         * Elements whose timestamp is after the stop time are ignored.
         *
         * @param kv the keyed element; only its value is used here
         * @param instant the element's timestamp
         * @param timerMap the "gapTimers" timer family
         * @param timerMap2 the "gcTimers" timer family
         * @param valueState the "seenBuckets" state
         * @param outputReceiver receiver for emitted values
         */
        @DoFn.ProcessElement
        public void process(@DoFn.Element KV<Row, ValueT> kv, @DoFn.Timestamp Instant instant, @DoFn.TimerFamily("gapTimers") TimerMap timerMap, @DoFn.TimerFamily("gcTimers") TimerMap timerMap2, @DoFn.StateId("seenBuckets") @DoFn.AlwaysFetched ValueState<SortedMap<Instant, TimestampedValue<ValueT>>> valueState, DoFn.OutputReceiver<ValueT> outputReceiver) {
            if (!instant.isAfter(this.stopTime)) {
                final Instant bucketEnd = this.bucketWindows.assignWindow(instant).end();
                final Supplier<TimestampedValue<ValueT>> observed = () -> TimestampedValue.of(kv.getValue(), instant);
                // -1 marks a real element, so processEvent's (gap + 1) starts at zero.
                final boolean firstInBucket = processEvent(observed, bucketEnd, timerMap, timerMap2, valueState, -1L, outputReceiver);
                if (firstInBucket) {
                    // Real data arrived for this bucket, so the gap timer for it is no longer wanted.
                    timerMap.get(windowToTimerTag(bucketEnd)).clear();
                }
            }
        }

        /** Encodes a bucket or GC window end as a timer tag: its epoch milliseconds in decimal. */
        private String windowToTimerTag(Instant instant) {
            final long epochMillis = instant.getMillis();
            return String.valueOf(epochMillis);
        }

        /** Decodes a timer tag produced by {@code windowToTimerTag} back into the instant it encodes. */
        private Instant windowFromTimerTag(String str) {
            final long epochMillis = Long.parseLong(str, 10);
            return Instant.ofEpochMilli(epochMillis);
        }

        /**
         * Fills the bucket identified by a firing gap timer.
         *
         * <p>Gap timers are set by {@code processEvent} for the bucket following a newly recorded
         * bucket, and cleared by {@code process} when an element is recorded for that bucket, so a
         * firing timer is treated as a gap. The value emitted for the gap is the previous bucket's
         * value, or the result of the interpolate function when one is configured, timestamped at
         * the last millisecond of the bucket being filled.
         *
         * @param str timer tag: the end of the bucket to fill, as epoch milliseconds
         * @param instant the timer's timestamp (not used by this method)
         * @param timerMap the "gapTimers" timer family
         * @param timerMap2 the "gcTimers" timer family
         * @param valueState the "seenBuckets" state
         * @param valueState2 the "gapDurationMap" state
         * @param outputReceiver receiver for emitted values
         * @throws RuntimeException if no seen-buckets state exists, or no value is stored for the
         *     previous bucket
         */
        @DoFn.OnTimerFamily("gapTimers")
        public void onTimer(@DoFn.TimerId String str, @DoFn.Timestamp Instant instant, @DoFn.TimerFamily("gapTimers") TimerMap timerMap, @DoFn.TimerFamily("gcTimers") TimerMap timerMap2, @DoFn.StateId("seenBuckets") @DoFn.AlwaysFetched ValueState<SortedMap<Instant, TimestampedValue<ValueT>>> valueState, @DoFn.StateId("gapDurationMap") @DoFn.AlwaysFetched ValueState<SortedMap<Instant, Long>> valueState2, DoFn.OutputReceiver<ValueT> outputReceiver) {
            final Instant bucketEndTs = windowFromTimerTag(str);
            final Instant bucketMaxTs = bucketEndTs.minus(Duration.millis(1L));
            final Instant previousBucketEndTs = bucketEndTs.minus(this.bucketWindows.getSize());
            final Instant previousBucketMaxTs = previousBucketEndTs.minus(Duration.millis(1L));

            final Map<Instant, TimestampedValue<ValueT>> seenBucketMap = valueState.read();
            if (seenBucketMap == null) {
                throw new RuntimeException("Unexpected timer fired with no seenBucketMap.");
            }

            // Gap length recorded for the previous bucket; zero when that bucket held real data.
            SortedMap<Instant, Long> gapDurations = valueState2.read();
            long gapSize = 0L;
            if (gapDurations != null) {
                gapSize = gapDurations.getOrDefault(previousBucketEndTs, 0L);
            }

            // The supplier is evaluated inside processEvent, before any state is modified there.
            processEvent(() -> {
                final TimestampedValue<ValueT> previous = seenBucketMap.get(previousBucketEndTs);
                if (previous == null) {
                    throw new RuntimeException("Processing bucket for " + bucketEndTs + " before processing bucket for " + previousBucketEndTs);
                }
                ValueT fillValue = previous.getValue();
                if (this.interpolateFunction != null) {
                    final IntervalWindow previousWindow = this.bucketWindows.assignWindow(previousBucketMaxTs);
                    final IntervalWindow fillWindow = this.bucketWindows.assignWindow(bucketMaxTs);
                    Preconditions.checkState(!fillWindow.equals(previousWindow));
                    final InterpolateData<ValueT> interpolateData = new AutoValue_FillGaps_InterpolateData<>(previous, previousWindow, fillWindow);
                    fillValue = this.interpolateFunction.apply(interpolateData);
                }
                return TimestampedValue.of(fillValue, bucketMaxTs);
            }, bucketEndTs, timerMap, timerMap2, valueState, gapSize, outputReceiver);

            // The check uses the map read at the top of this method, as the original code did.
            if (seenBucketMap.containsKey(bucketEndTs.plus(this.bucketWindows.getSize()))) {
                return;
            }

            // The next bucket has not been seen: record the extended gap length for this bucket.
            if (gapDurations == null) {
                gapDurations = Maps.newTreeMap();
            }
            gapDurations.put(bucketEndTs, gapSize + 1);
            valueState2.write(gapDurations);
        }

        /**
         * Garbage-collects both state maps when a GC timer fires: entries keyed strictly before
         * (timer timestamp minus one GC window size) are removed.
         *
         * @param instant the timer's timestamp
         * @param valueState the "seenBuckets" state
         * @param valueState2 the "gapDurationMap" state
         */
        @DoFn.OnTimerFamily("gcTimers")
        public void onGcTimer(@DoFn.Timestamp Instant instant, @DoFn.StateId("seenBuckets") @DoFn.AlwaysFetched ValueState<SortedMap<Instant, TimestampedValue<ValueT>>> valueState, @DoFn.StateId("gapDurationMap") @DoFn.AlwaysFetched ValueState<SortedMap<Instant, Long>> valueState2) {
            // Both maps share the same cutoff.
            final Instant cutoff = instant.minus(this.gcWindows.getSize());
            gcMap(valueState, cutoff);
            gcMap(valueState2, cutoff);
        }

        /**
         * Emits a value for a bucket and records it in the seen-buckets state.
         *
         * <p>If the bucket already holds a value, the stored and new values are combined with the
         * merge function and nothing else happens. Otherwise this is the first value for the bucket:
         * a gap timer is set for the next bucket (subject to the stop time, the maximum gap length
         * and the next bucket not having been seen already) and a GC timer is set for the end of the
         * GC window containing the next bucket's end.
         *
         * @param supplier produces the value to emit; invoked exactly once, before state is touched
         * @param instant end timestamp of the bucket the value belongs to
         * @param timerMap the "gapTimers" timer family
         * @param timerMap2 the "gcTimers" timer family
         * @param valueState the "seenBuckets" state
         * @param j gap length leading up to this bucket; callers pass -1 for a real element
         * @param outputReceiver receiver for the emitted value
         * @return {@code true} if no value had been recorded for the bucket before this call
         */
        private boolean processEvent(Supplier<TimestampedValue<ValueT>> supplier, Instant instant, TimerMap timerMap, TimerMap timerMap2, ValueState<SortedMap<Instant, TimestampedValue<ValueT>>> valueState, long j, DoFn.OutputReceiver<ValueT> outputReceiver) {
            final TimestampedValue<ValueT> incoming = supplier.get();
            outputReceiver.outputWithTimestamp(incoming.getValue(), incoming.getTimestamp());

            SortedMap<Instant, TimestampedValue<ValueT>> seenBuckets = valueState.read();
            if (seenBuckets == null) {
                seenBuckets = Maps.newTreeMap();
            }
            final TimestampedValue<ValueT> existing = seenBuckets.get(instant);
            final boolean firstInBucket = existing == null;

            // Record either the new value or its merge with the one already stored.
            final TimestampedValue<ValueT> toStore = firstInBucket ? incoming : this.mergeValues.apply(existing, incoming);
            seenBuckets.put(instant, toStore);
            valueState.write(seenBuckets);

            if (!firstInBucket) {
                // Timers were already arranged when the bucket was first recorded.
                return false;
            }

            final Instant nextBucketEnd = instant.plus(this.bucketWindows.getSize());
            final Instant nextBucketMaxTs = nextBucketEnd.minus(Duration.millis(1L));
            final boolean beforeStop = nextBucketMaxTs.isBefore(this.stopTime);
            final boolean withinGapLimit = j + 1 < this.maxGapFillBuckets;
            if (beforeStop && withinGapLimit && !seenBuckets.containsKey(nextBucketEnd)) {
                // If this timer fires without being cleared, the next bucket is treated as a gap.
                timerMap.get(windowToTimerTag(nextBucketEnd)).withOutputTimestamp(instant).set(nextBucketEnd);
            }

            final Instant gcTs = this.gcWindows.assignWindow(nextBucketEnd).end();
            timerMap2.get(windowToTimerTag(gcTs)).set(gcTs);
            return true;
        }

        /**
         * Removes every entry keyed strictly before {@code instant} from the map held in
         * {@code valueState}. The state is cleared if the map becomes empty, otherwise the pruned
         * map is written back. Does nothing when the state holds no map.
         *
         * @param valueState state holding a map keyed by timestamp
         * @param instant exclusive upper bound of the keys to remove
         * @param <V> value type of the map
         */
        private static <V> void gcMap(ValueState<SortedMap<Instant, V>> valueState, Instant instant) {
            final SortedMap<Instant, V> entries = valueState.read();
            if (entries == null) {
                return;
            }
            // headMap is a live view, so clearing it removes the entries from the backing map.
            entries.headMap(instant).clear();
            if (entries.isEmpty()) {
                valueState.clear();
            } else {
                valueState.write(entries);
            }
        }
    }

    /**
     * Argument passed to the interpolate function when a bucket is being filled. {@code
     * FillGapsDoFn.onTimer} constructs it from the value stored for the previous bucket, the window
     * of the previous bucket, and the window of the bucket being filled, in that order.
     *
     * @param <ValueT> element type of the collection being gap-filled
     */
    @AutoValue
    /* loaded from: input_file:org/apache/beam/sdk/extensions/timeseries/FillGaps$InterpolateData.class */
    public static abstract class InterpolateData<ValueT> {
        /** Returns the timestamped value stored for the previous bucket. */
        public abstract TimestampedValue<ValueT> getValue();

        /** Returns the window of the bucket preceding the one being filled. */
        public abstract BoundedWindow getPreviousWindow();

        /** Returns the window of the bucket being filled. */
        public abstract BoundedWindow getNextWindow();
    }

    /**
     * Returns a merge function that keeps whichever value has the later timestamp. When the
     * timestamps are equal, the second argument is kept.
     *
     * @param <ValueT> element type
     */
    public static <ValueT> SerializableBiFunction<TimestampedValue<ValueT>, TimestampedValue<ValueT>, TimestampedValue<ValueT>> keepLatest() {
        return (first, second) -> first.getTimestamp().isAfter(second.getTimestamp()) ? first : second;
    }

    /**
     * Returns a merge function that keeps whichever value has the earlier timestamp. When the
     * timestamps are equal, the first argument is kept.
     *
     * @param <ValueT> element type
     */
    public static <ValueT> SerializableBiFunction<TimestampedValue<ValueT>, TimestampedValue<ValueT>, TimestampedValue<ValueT>> keepEarliest() {
        return (first, second) -> first.getTimestamp().isAfter(second.getTimestamp()) ? second : first;
    }

    /** Returns the duration of one time-series bucket; {@code expand} builds its {@code FixedWindows} from it. */
    /* JADX INFO: Access modifiers changed from: package-private */
    public abstract Duration getTimeseriesBucketDuration();

    /** Returns the gap-length bound handed to {@code FillGapsDoFn}; {@code of} defaults it to {@code Long.MAX_VALUE}. */
    /* JADX INFO: Access modifiers changed from: package-private */
    public abstract Long getMaxGapFillBuckets();

    /** Returns the stop time handed to {@code FillGapsDoFn}; {@code of} defaults it to {@code BoundedWindow.TIMESTAMP_MAX_VALUE}. */
    /* JADX INFO: Access modifiers changed from: package-private */
    public abstract Instant getStopTime();

    /** Returns the schema fields that {@code expand} passes to {@code WithKeys.of} to key the input. */
    /* JADX INFO: Access modifiers changed from: package-private */
    public abstract FieldAccessDescriptor getKeyDescriptor();

    /** Returns the function used to merge values falling in the same bucket; {@code of} defaults it to {@code keepLatest()}. */
    /* JADX INFO: Access modifiers changed from: package-private */
    public abstract SerializableBiFunction<TimestampedValue<ValueT>, TimestampedValue<ValueT>, TimestampedValue<ValueT>> getMergeValues();

    /** Returns how many bucket durations make up one garbage-collection window; {@code of} defaults it to {@code GC_EVERY_N_BUCKETS}. */
    /* JADX INFO: Access modifiers changed from: package-private */
    public abstract int getGcEveryNBuckets();

    /** Returns the optional function computing the value of a filled bucket; {@code of} does not set it. */
    /* JADX INFO: Access modifiers changed from: package-private */
    @Nullable
    public abstract SerializableFunction<InterpolateData<ValueT>, ValueT> getInterpolateFunction();

    /** Returns a builder for deriving a modified copy; used by the {@code with...} methods. */
    abstract Builder<ValueT> toBuilder();

    /**
     * Creates a {@link FillGaps} transform keyed by the named schema fields.
     *
     * @param duration duration of one time-series bucket
     * @param strArr names of the schema fields that form the key
     * @param <ValueT> element type
     * @return the transform with default settings
     */
    public static <ValueT> FillGaps<ValueT> of(Duration duration, String... strArr) {
        final FieldAccessDescriptor keyFields = FieldAccessDescriptor.withFieldNames(strArr);
        return of(duration, keyFields);
    }

    /**
     * Creates a {@link FillGaps} transform keyed by the given schema fields, with default settings:
     * no limit on gap length ({@code Long.MAX_VALUE}), stop time at
     * {@code BoundedWindow.TIMESTAMP_MAX_VALUE}, {@link #keepLatest()} as the merge function,
     * {@code GC_EVERY_N_BUCKETS} buckets per garbage-collection window, and no interpolate function.
     *
     * @param duration duration of one time-series bucket
     * @param fieldAccessDescriptor schema fields that form the key
     * @param <ValueT> element type
     * @return the transform with default settings
     */
    public static <ValueT> FillGaps<ValueT> of(Duration duration, FieldAccessDescriptor fieldAccessDescriptor) {
        // Explicit type arguments keep the whole builder chain parameterized instead of raw.
        return new AutoValue_FillGaps.Builder<ValueT>()
            .setTimeseriesBucketDuration(duration)
            .setMaxGapFillBuckets(Long.MAX_VALUE)
            .setStopTime(BoundedWindow.TIMESTAMP_MAX_VALUE)
            .setKeyDescriptor(fieldAccessDescriptor)
            .setMergeValues(FillGaps.<ValueT>keepLatest())
            .setGcEveryNBuckets(GC_EVERY_N_BUCKETS)
            .build();
    }

    /**
     * Returns a copy of this transform with the given bound on gap length.
     *
     * @param l the bound compared against the running gap length before another bucket is filled
     */
    FillGaps<ValueT> withMaxGapFillBuckets(Long l) {
        final Builder<ValueT> updated = toBuilder().setMaxGapFillBuckets(l);
        return updated.build();
    }

    /**
     * Returns a copy of this transform with the given stop time.
     *
     * @param instant elements after this instant are ignored and no gaps are filled at or beyond it
     */
    FillGaps<ValueT> withStopTime(Instant instant) {
        final Builder<ValueT> updated = toBuilder().setStopTime(instant);
        return updated.build();
    }

    /**
     * Returns a copy of this transform that merges values falling in the same bucket with the given
     * function.
     *
     * @param serializableBiFunction merge function, applied as {@code (stored, incoming)}
     */
    FillGaps<ValueT> withMergeFunction(SerializableBiFunction<TimestampedValue<ValueT>, TimestampedValue<ValueT>, TimestampedValue<ValueT>> serializableBiFunction) {
        final Builder<ValueT> updated = toBuilder().setMergeValues(serializableBiFunction);
        return updated.build();
    }

    /**
     * Returns a copy of this transform that computes the value of each filled bucket with the given
     * function instead of repeating the previous bucket's value.
     *
     * @param serializableFunction function from {@link InterpolateData} to the value to emit
     */
    FillGaps<ValueT> withInterpolateFunction(SerializableFunction<InterpolateData<ValueT>, ValueT> serializableFunction) {
        final Builder<ValueT> updated = toBuilder().setInterpolateFunction(serializableFunction);
        return updated.build();
    }

    /**
     * Expands the transform: windows the input into fixed buckets, keys it by the configured schema
     * fields, moves it to the global window, runs {@link FillGapsDoFn}, then re-applies the input's
     * original window function and coder.
     *
     * @param pCollection the input collection; must have a schema
     * @return the input elements plus the values emitted for filled buckets
     * @throws RuntimeException if the input has no schema
     */
    @Override
    public PCollection<ValueT> expand(PCollection<ValueT> pCollection) {
        if (!pCollection.hasSchema()) {
            throw new RuntimeException("The input to FillGaps must have a schema.");
        }
        FixedWindows of = FixedWindows.of(getTimeseriesBucketDuration());
        return pCollection
            .apply("FixedWindow", Window.into(of))
            .apply("withKeys", WithKeys.of(getKeyDescriptor()))
            .apply("globalWindow", Window.into(new GlobalWindows()))
            // Parameterized rather than raw, so the DoFn's element types are checked.
            .apply("fillGaps", ParDo.of(new FillGapsDoFn<ValueT>(of, pCollection.getCoder(), getStopTime(), getMaxGapFillBuckets().longValue(), getMergeValues(), getInterpolateFunction(), getGcEveryNBuckets())))
            .apply("applyOriginalWindow", Window.into(pCollection.getWindowingStrategy().getWindowFn()))
            .setCoder(pCollection.getCoder());
    }

    /**
     * Recreates the serializable lambdas of this class from their serialized form. It dispatches on
     * the implementation method name, checks the remaining fields of the {@code SerializedLambda},
     * and returns a lambda with the same body as {@code keepEarliest()} or {@code keepLatest()}.
     *
     * <p>NOTE(review): the decompiler marks this method synthetic, and {@code boolean z = -1} and
     * {@code switch (z)} on a boolean are not valid Java source. It looks like the decompiled form
     * of a compiler-generated method rather than hand-written code. Confirm, and consider removing
     * it from the source, since javac normally generates this method itself.
     *
     * @param serializedLambda serialized form of a lambda declared in this class
     * @return the recreated lambda
     * @throws IllegalArgumentException if the serialized form matches neither known lambda
     */
    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        // Decompiled string switch: first match the name by hash code and equals, then dispatch.
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case 627010506:
                if (implMethodName.equals("lambda$keepEarliest$d118d62$1")) {
                    z = false;
                    break;
                }
                break;
            case 930352510:
                if (implMethodName.equals("lambda$keepLatest$d118d62$1")) {
                    z = true;
                    break;
                }
                break;
        }
        switch (z) {
            // keepEarliest
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/transforms/SerializableBiFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/beam/sdk/extensions/timeseries/FillGaps") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/beam/sdk/values/TimestampedValue;Lorg/apache/beam/sdk/values/TimestampedValue;)Lorg/apache/beam/sdk/values/TimestampedValue;")) {
                    return (timestampedValue, timestampedValue2) -> {
                        return timestampedValue.getTimestamp().isAfter(timestampedValue2.getTimestamp()) ? timestampedValue2 : timestampedValue;
                    };
                }
                break;
            // keepLatest
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/transforms/SerializableBiFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/beam/sdk/extensions/timeseries/FillGaps") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/beam/sdk/values/TimestampedValue;Lorg/apache/beam/sdk/values/TimestampedValue;)Lorg/apache/beam/sdk/values/TimestampedValue;")) {
                    return (timestampedValue3, timestampedValue22) -> {
                        return timestampedValue3.getTimestamp().isAfter(timestampedValue22.getTimestamp()) ? timestampedValue3 : timestampedValue22;
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
