package org.apache.beam.sdk.transforms;

import org.apache.beam.sdk.coders.BooleanCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.joda.time.Duration;

/* loaded from: input_file:org/apache/beam/sdk/transforms/Deduplicate.class */
public final class Deduplicate {
    public static final TimeDomain DEFAULT_TIME_DOMAIN = TimeDomain.PROCESSING_TIME;
    public static final Duration DEFAULT_DURATION = Duration.standardMinutes(10);

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/sdk/transforms/Deduplicate$DeduplicateFn.class */
    public static class DeduplicateFn<K, V> extends DoFn<KV<K, V>, KV<K, V>> {
        private static final String EXPIRY_TIMER = "expiryTimer";
        private static final String SEEN_STATE = "seen";

        @DoFn.TimerId(EXPIRY_TIMER)
        private final TimerSpec expiryTimerSpec;

        @DoFn.StateId(SEEN_STATE)
        private final StateSpec<ValueState<Boolean>> seenState;
        private final Duration duration;

        private DeduplicateFn(TimeDomain timeDomain, Duration duration) {
            this.seenState = StateSpecs.value(BooleanCoder.of());
            this.expiryTimerSpec = TimerSpecs.timer(timeDomain);
            this.duration = duration;
        }

        @DoFn.ProcessElement
        public void processElement(@DoFn.Element KV<K, V> kv, DoFn.OutputReceiver<KV<K, V>> outputReceiver, @DoFn.StateId("seen") ValueState<Boolean> valueState, @DoFn.TimerId("expiryTimer") Timer timer) {
            if (valueState.read() == null) {
                timer.offset(this.duration).withNoOutputTimestamp().setRelative();
                valueState.write(true);
                outputReceiver.output(kv);
            }
        }

        @DoFn.OnTimer(EXPIRY_TIMER)
        public void onExpiry(DoFn<KV<K, V>, KV<K, V>>.OnTimerContext onTimerContext, @DoFn.StateId("seen") ValueState<Boolean> valueState) {
            valueState.clear();
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/transforms/Deduplicate$KeyedValues.class */
    public static final class KeyedValues<K, V> extends PTransform<PCollection<KV<K, V>>, PCollection<KV<K, V>>> {
        private final TimeDomain timeDomain;
        private final Duration duration;

        private KeyedValues(TimeDomain timeDomain, Duration duration) {
            this.timeDomain = timeDomain;
            this.duration = duration;
        }

        @Override // org.apache.beam.sdk.transforms.PTransform
        /* renamed from: expand, reason: merged with bridge method [inline-methods] */
        public PCollection<KV<K, V>> mo321expand(PCollection<KV<K, V>> pCollection) {
            return (PCollection) pCollection.apply(ParDo.of(new DeduplicateFn(this.timeDomain, this.duration)));
        }

        public KeyedValues<K, V> withTimeDomain(TimeDomain timeDomain) {
            return new KeyedValues<>(timeDomain, this.duration);
        }

        public KeyedValues<K, V> withDuration(Duration duration) {
            return new KeyedValues<>(this.timeDomain, duration);
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/transforms/Deduplicate$Values.class */
    public static final class Values<T> extends PTransform<PCollection<T>, PCollection<T>> {
        private final TimeDomain timeDomain;
        private final Duration duration;

        private Values(TimeDomain timeDomain, Duration duration) {
            this.timeDomain = timeDomain;
            this.duration = duration;
        }

        @Override // org.apache.beam.sdk.transforms.PTransform
        /* renamed from: expand, reason: merged with bridge method [inline-methods] */
        public PCollection<T> mo321expand(PCollection<T> pCollection) {
            return (PCollection) ((PCollection) ((PCollection) pCollection.apply("KeyByElement", MapElements.via((SimpleFunction) new SimpleFunction<T, KV<T, Void>>() { // from class: org.apache.beam.sdk.transforms.Deduplicate.Values.1
                @Override // org.apache.beam.sdk.transforms.SimpleFunction, org.apache.beam.sdk.transforms.InferableFunction, org.apache.beam.sdk.transforms.ProcessFunction
                public KV<T, Void> apply(T t) {
                    return KV.of(t, (Void) null);
                }

                @Override // org.apache.beam.sdk.transforms.SimpleFunction, org.apache.beam.sdk.transforms.InferableFunction, org.apache.beam.sdk.transforms.ProcessFunction
                public /* bridge */ /* synthetic */ Object apply(Object obj) {
                    return apply((AnonymousClass1) obj);
                }
            }))).apply(new KeyedValues(this.timeDomain, this.duration))).apply(Keys.create());
        }

        public Values<T> withTimeDomain(TimeDomain timeDomain) {
            return new Values<>(timeDomain, this.duration);
        }

        public Values<T> withDuration(Duration duration) {
            return new Values<>(this.timeDomain, duration);
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/transforms/Deduplicate$WithRepresentativeValues.class */
    public static final class WithRepresentativeValues<T, IdT> extends PTransform<PCollection<T>, PCollection<T>> {
        private final SerializableFunction<T, IdT> fn;
        private final TypeDescriptor<IdT> type;
        private final Coder<IdT> coder;
        private final TimeDomain timeDomain;
        private final Duration duration;

        private WithRepresentativeValues(TimeDomain timeDomain, Duration duration, SerializableFunction<T, IdT> serializableFunction, TypeDescriptor<IdT> typeDescriptor, Coder<IdT> coder) {
            this.timeDomain = timeDomain;
            this.duration = duration;
            this.fn = serializableFunction;
            this.type = typeDescriptor;
            this.coder = coder;
        }

        public WithRepresentativeValues<T, IdT> withRepresentativeType(TypeDescriptor<IdT> typeDescriptor) {
            return new WithRepresentativeValues<>(this.timeDomain, this.duration, this.fn, typeDescriptor, null);
        }

        public WithRepresentativeValues<T, IdT> withRepresentativeCoder(Coder<IdT> coder) {
            return new WithRepresentativeValues<>(this.timeDomain, this.duration, this.fn, null, coder);
        }

        public WithRepresentativeValues<T, IdT> withTimeDomain(TimeDomain timeDomain) {
            return new WithRepresentativeValues<>(timeDomain, this.duration, this.fn, this.type, this.coder);
        }

        public WithRepresentativeValues<T, IdT> withDuration(Duration duration) {
            return new WithRepresentativeValues<>(this.timeDomain, duration, this.fn, this.type, this.coder);
        }

        @Override // org.apache.beam.sdk.transforms.PTransform
        /* renamed from: expand, reason: merged with bridge method [inline-methods] */
        public PCollection<T> mo321expand(PCollection<T> pCollection) {
            WithKeys of = WithKeys.of((SerializableFunction) this.fn);
            if (this.type != null) {
                of = of.withKeyType(this.type);
            }
            PCollection pCollection2 = (PCollection) pCollection.apply(of);
            if (this.coder != null) {
                pCollection2.setCoder(KvCoder.of(this.coder, pCollection.getCoder()));
            }
            return (PCollection) ((PCollection) pCollection2.apply(new KeyedValues(this.timeDomain, this.duration))).apply(org.apache.beam.sdk.transforms.Values.create());
        }
    }

    public static <T> Values<T> values() {
        return new Values<>(DEFAULT_TIME_DOMAIN, DEFAULT_DURATION);
    }

    public static <K, V> KeyedValues<K, V> keyedValues() {
        return new KeyedValues<>(DEFAULT_TIME_DOMAIN, DEFAULT_DURATION);
    }

    public static <T, IdT> WithRepresentativeValues<T, IdT> withRepresentativeValueFn(SerializableFunction<T, IdT> serializableFunction) {
        return new WithRepresentativeValues<>(DEFAULT_TIME_DOMAIN, DEFAULT_DURATION, serializableFunction, null, null);
    }

    private Deduplicate() {
    }
}
