package org.apache.beam.sdk.io.sparkreceiver;

import java.lang.invoke.SerializedLambda;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.io.sparkreceiver.SparkReceiverIO;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator;
import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator;
import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.Preconditions;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.receiver.Receiver;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
@DoFn.UnboundedPerElement
/* loaded from: input_file:org/apache/beam/sdk/io/sparkreceiver/ReadFromSparkReceiverWithOffsetDoFn.class */
public class ReadFromSparkReceiverWithOffsetDoFn<V> extends DoFn<byte[], V> {
    private static final Logger LOG = LoggerFactory.getLogger(ReadFromSparkReceiverWithOffsetDoFn.class);
    private static final int START_POLL_TIMEOUT_MS = 1000;
    private final SerializableFunction<Instant, WatermarkEstimator<Instant>> createWatermarkEstimatorFn = WatermarkEstimators.Manual::new;
    private final SerializableFunction<V, Long> getOffsetFn;
    private final SerializableFunction<V, Instant> getTimestampFn;
    private final ReceiverBuilder<V, ? extends Receiver<V>> sparkReceiverBuilder;

    /* loaded from: input_file:org/apache/beam/sdk/io/sparkreceiver/ReadFromSparkReceiverWithOffsetDoFn$SparkConsumerWithOffset.class */
    private static class SparkConsumerWithOffset<V> implements SparkConsumer<V> {
        private final Queue<V> recordsQueue = new ConcurrentLinkedQueue();
        private Receiver<V> sparkReceiver;
        private final Long startOffset;

        SparkConsumerWithOffset(Long l) {
            this.startOffset = l;
        }

        @Override // org.apache.beam.sdk.io.sparkreceiver.SparkConsumer
        public boolean hasRecords() {
            return !this.recordsQueue.isEmpty();
        }

        @Override // org.apache.beam.sdk.io.sparkreceiver.SparkConsumer
        public V poll() {
            return this.recordsQueue.poll();
        }

        @Override // org.apache.beam.sdk.io.sparkreceiver.SparkConsumer
        public void start(Receiver<V> receiver) {
            this.sparkReceiver = receiver;
            try {
                new WrappedSupervisor(receiver, new SparkConf(), objArr -> {
                    this.recordsQueue.offer(objArr[0]);
                    return null;
                });
                ((HasOffset) receiver).setStartOffset(this.startOffset);
                receiver.supervisor().startReceiver();
                try {
                    TimeUnit.MILLISECONDS.sleep(1000L);
                } catch (InterruptedException e) {
                    ReadFromSparkReceiverWithOffsetDoFn.LOG.error("SparkReceiver was interrupted before polling started", e);
                    throw new IllegalStateException("Spark Receiver was interrupted before polling started");
                }
            } catch (Exception e2) {
                ReadFromSparkReceiverWithOffsetDoFn.LOG.error("Can not init Spark Receiver!", e2);
                throw new IllegalStateException("Spark Receiver was not initialized");
            }
        }

        @Override // org.apache.beam.sdk.io.sparkreceiver.SparkConsumer
        public void stop() {
            if (this.sparkReceiver != null) {
                this.sparkReceiver.stop("SparkReceiver is stopped.");
            }
            this.recordsQueue.clear();
        }

        private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
            String implMethodName = serializedLambda.getImplMethodName();
            boolean z = -1;
            switch (implMethodName.hashCode()) {
                case -272880709:
                    if (implMethodName.equals("lambda$start$cba0f7d4$1")) {
                        z = false;
                        break;
                    }
                    break;
            }
            switch (z) {
                case false:
                    if (serializedLambda.getImplMethodKind() == 7 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/transforms/SerializableFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/beam/sdk/io/sparkreceiver/ReadFromSparkReceiverWithOffsetDoFn$SparkConsumerWithOffset") && serializedLambda.getImplMethodSignature().equals("([Ljava/lang/Object;)Ljava/lang/Void;")) {
                        SparkConsumerWithOffset sparkConsumerWithOffset = (SparkConsumerWithOffset) serializedLambda.getCapturedArg(0);
                        return objArr -> {
                            this.recordsQueue.offer(objArr[0]);
                            return null;
                        };
                    }
                    break;
            }
            throw new IllegalArgumentException("Invalid lambda deserialization");
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public ReadFromSparkReceiverWithOffsetDoFn(SparkReceiverIO.Read<V> read) {
        ReceiverBuilder<V, ? extends Receiver<V>> sparkReceiverBuilder = read.getSparkReceiverBuilder();
        Preconditions.checkStateNotNull(sparkReceiverBuilder, "Spark Receiver Builder can't be null!");
        this.sparkReceiverBuilder = sparkReceiverBuilder;
        SerializableFunction<V, Long> getOffsetFn = read.getGetOffsetFn();
        Preconditions.checkStateNotNull(getOffsetFn, "Get offset fn can't be null!");
        this.getOffsetFn = getOffsetFn;
        SerializableFunction<V, Instant> timestampFn = read.getTimestampFn();
        this.getTimestampFn = timestampFn == null ? obj -> {
            return Instant.now();
        } : timestampFn;
    }

    @DoFn.GetInitialRestriction
    public OffsetRange initialRestriction(@DoFn.Element byte[] bArr) {
        return new OffsetRange(0L, Long.MAX_VALUE);
    }

    @DoFn.GetInitialWatermarkEstimatorState
    public Instant getInitialWatermarkEstimatorState(@DoFn.Timestamp Instant instant) {
        return instant;
    }

    @DoFn.NewWatermarkEstimator
    public WatermarkEstimator<Instant> newWatermarkEstimator(@DoFn.WatermarkEstimatorState Instant instant) {
        return (WatermarkEstimator) this.createWatermarkEstimatorFn.apply(ensureTimestampWithinBounds(instant));
    }

    @DoFn.GetSize
    public double getSize(@DoFn.Element byte[] bArr, @DoFn.Restriction OffsetRange offsetRange) {
        return restrictionTracker(bArr, offsetRange).getProgress().getWorkRemaining();
    }

    @DoFn.NewTracker
    public OffsetRangeTracker restrictionTracker(@DoFn.Element byte[] bArr, @DoFn.Restriction OffsetRange offsetRange) {
        return new OffsetRangeTracker(offsetRange);
    }

    @DoFn.GetRestrictionCoder
    public Coder<OffsetRange> restrictionCoder() {
        return new OffsetRange.Coder();
    }

    @DoFn.ProcessElement
    public DoFn.ProcessContinuation processElement(@DoFn.Element byte[] bArr, RestrictionTracker<OffsetRange, Long> restrictionTracker, WatermarkEstimator<Instant> watermarkEstimator, DoFn.OutputReceiver<V> outputReceiver) {
        try {
            Receiver<V> build = this.sparkReceiverBuilder.build();
            SparkConsumerWithOffset sparkConsumerWithOffset = new SparkConsumerWithOffset(Long.valueOf(((OffsetRange) restrictionTracker.currentRestriction()).getFrom()));
            sparkConsumerWithOffset.start(build);
            while (sparkConsumerWithOffset.hasRecords()) {
                V poll = sparkConsumerWithOffset.poll();
                if (poll != null) {
                    if (!restrictionTracker.tryClaim((Long) this.getOffsetFn.apply(poll))) {
                        sparkConsumerWithOffset.stop();
                        LOG.debug("Stop for restriction: {}", ((OffsetRange) restrictionTracker.currentRestriction()).toString());
                        return DoFn.ProcessContinuation.stop();
                    }
                    Instant instant = (Instant) this.getTimestampFn.apply(poll);
                    ((ManualWatermarkEstimator) watermarkEstimator).setWatermark(instant);
                    outputReceiver.outputWithTimestamp(poll, instant);
                }
            }
            sparkConsumerWithOffset.stop();
            LOG.debug("Resume for restriction: {}", ((OffsetRange) restrictionTracker.currentRestriction()).toString());
            return DoFn.ProcessContinuation.resume();
        } catch (Exception e) {
            LOG.error("Can not build Spark Receiver", e);
            throw new IllegalStateException("Spark Receiver was not built!");
        }
    }

    private static Instant ensureTimestampWithinBounds(Instant instant) {
        if (instant.isBefore(BoundedWindow.TIMESTAMP_MIN_VALUE)) {
            instant = BoundedWindow.TIMESTAMP_MIN_VALUE;
            LOG.debug("Timestamp was before MIN_VALUE({})", BoundedWindow.TIMESTAMP_MIN_VALUE);
        } else if (instant.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE)) {
            instant = BoundedWindow.TIMESTAMP_MAX_VALUE;
            LOG.debug("Timestamp was after MAX_VALUE({})", BoundedWindow.TIMESTAMP_MAX_VALUE);
        }
        return instant;
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -1624069335:
                if (implMethodName.equals("lambda$new$2eee71c1$1")) {
                    z = false;
                    break;
                }
                break;
            case 1818100338:
                if (implMethodName.equals("<init>")) {
                    z = true;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/transforms/SerializableFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/beam/sdk/io/sparkreceiver/ReadFromSparkReceiverWithOffsetDoFn") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Object;)Lorg/joda/time/Instant;")) {
                    return obj -> {
                        return Instant.now();
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 8 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/transforms/SerializableFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/beam/sdk/transforms/splittabledofn/WatermarkEstimators$Manual") && serializedLambda.getImplMethodSignature().equals("(Lorg/joda/time/Instant;)V")) {
                    return WatermarkEstimators.Manual::new;
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
