package org.apache.beam.sdk.io.pulsar;

import java.io.IOException;
import java.util.concurrent.TimeUnit;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.pulsar.PulsarIO;
import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.splittabledofn.GrowableOffsetRangeTracker;
import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator;
import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Supplier;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Suppliers;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Reader;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Splittable {@link DoFn} that reads the messages of a Pulsar topic described by a
 * {@link PulsarSourceDescriptor} and emits them as {@link PulsarMessage}s.
 *
 * <p>The restriction is an {@link OffsetRange} whose positions are message publish times: the
 * reader seeks to the range start, and every message's publish time is claimed on the tracker
 * before the message is emitted.
 */
@DoFn.UnboundedPerElement
public class ReadFromPulsarDoFn extends DoFn<PulsarSourceDescriptor, PulsarMessage> {
    private static final Logger LOG = LoggerFactory.getLogger(ReadFromPulsarDoFn.class);
    private SerializableFunction<String, PulsarClient> pulsarClientSerializableFunction;
    private PulsarClient client;
    private PulsarAdmin admin;
    private String clientUrl;
    private String adminUrl;
    private final SerializableFunction<Message<byte[]>, Instant> extractOutputTimestampFn;

    /**
     * Estimates the end of a growable range as the publish time of the message returned by
     * {@code examineMessage(topic, "latest", 1)} on the admin API.
     *
     * <p>The looked-up message is cached for one second so that repeated calls to
     * {@link #estimate()} do not each hit the admin endpoint.
     */
    public static class PulsarLatestOffsetEstimator implements GrowableOffsetRangeTracker.RangeEndEstimator {
        private final Supplier<Message<byte[]>> memoizedBacklog;

        private PulsarLatestOffsetEstimator(PulsarAdmin pulsarAdmin, String topic) {
            this.memoizedBacklog = Suppliers.memoizeWithExpiration(() -> {
                try {
                    return pulsarAdmin.topics().examineMessage(topic, "latest", 1L);
                } catch (PulsarAdminException e) {
                    // Pass the exception itself so the stack trace and the topic end up in the log.
                    LOG.error("Failed to examine the latest message of topic {}", topic, e);
                    throw new RuntimeException(e);
                }
            }, 1L, TimeUnit.SECONDS);
        }

        /**
         * Returns the publish time of the (possibly cached) latest message.
         *
         * @throws RuntimeException wrapping the {@link PulsarAdminException} if the lookup fails
         */
        @Override
        public long estimate() {
            return this.memoizedBacklog.get().getPublishTime();
        }
    }

    /**
     * Captures the connection settings, client factory and timestamp extractor from the read
     * transform. No connection is opened here; see {@link #initPulsarClients()}.
     *
     * @param read the configured {@link PulsarIO.Read} transform
     */
    public ReadFromPulsarDoFn(PulsarIO.Read read) {
        this.extractOutputTimestampFn = read.getExtractOutputTimestampFn();
        this.clientUrl = read.getClientUrl();
        this.adminUrl = read.getAdminUrl();
        this.pulsarClientSerializableFunction = read.getPulsarClient();
    }

    /**
     * Opens the Pulsar client and admin connections if they are not already open.
     *
     * <p>Missing URLs fall back to {@link PulsarIOUtils#SERVICE_URL} and
     * {@link PulsarIOUtils#SERVICE_HTTP_URL}. The client is obtained from the configured factory;
     * if no factory is configured, or it returns {@code null}, a default client is built.
     *
     * @throws Exception if either connection cannot be created
     */
    @DoFn.Setup
    public void initPulsarClients() throws Exception {
        if (this.clientUrl == null) {
            this.clientUrl = PulsarIOUtils.SERVICE_URL;
        }
        if (this.adminUrl == null) {
            this.adminUrl = PulsarIOUtils.SERVICE_HTTP_URL;
        }
        if (this.client == null) {
            if (this.pulsarClientSerializableFunction != null) {
                this.client = this.pulsarClientSerializableFunction.apply(this.clientUrl);
            }
            if (this.client == null) {
                this.client = PulsarClient.builder().serviceUrl(this.clientUrl).build();
            }
        }
        if (this.admin == null) {
            this.admin = PulsarAdmin.builder()
                    .serviceHttpUrl(this.adminUrl)
                    .tlsTrustCertsFilePath((String) null)
                    .allowTlsInsecureConnection(false)
                    .build();
        }
    }

    /**
     * Closes the Pulsar client and admin connections.
     *
     * <p>Either connection may be absent if setup failed part-way, and the admin connection is
     * closed even when closing the client throws.
     *
     * @throws Exception if closing a connection fails
     */
    @DoFn.Teardown
    public void teardown() throws Exception {
        try {
            if (this.client != null) {
                this.client.close();
            }
        } finally {
            if (this.admin != null) {
                this.admin.close();
            }
        }
    }

    /**
     * Builds the initial restriction from the descriptor's start and end offsets, defaulting to
     * {@code 0} and {@link Long#MAX_VALUE} respectively when they are not set.
     */
    @DoFn.GetInitialRestriction
    public OffsetRange getInitialRestriction(@DoFn.Element PulsarSourceDescriptor pulsarSourceDescriptor) {
        Long startOffset = pulsarSourceDescriptor.getStartOffset();
        Long endOffset = pulsarSourceDescriptor.getEndOffset();
        long from = (startOffset != null) ? startOffset.longValue() : 0L;
        long to = (endOffset != null) ? endOffset.longValue() : Long.MAX_VALUE;
        return new OffsetRange(from, to);
    }

    /** Returns the remaining work reported by a fresh tracker for the given restriction. */
    @DoFn.GetSize
    public double getSize(@DoFn.Element PulsarSourceDescriptor pulsarSourceDescriptor, @DoFn.Restriction OffsetRange offsetRange) {
        OffsetRangeTracker tracker = restrictionTracker(pulsarSourceDescriptor, offsetRange);
        return tracker.getProgress().getWorkRemaining();
    }

    /**
     * Creates a reader on the given topic, positioned at the earliest available message.
     *
     * @param pulsarClient client used to create the reader
     * @param str name of the topic to read
     * @throws PulsarClientException if the reader cannot be created
     */
    public Reader<byte[]> newReader(PulsarClient pulsarClient, String str) throws PulsarClientException {
        return pulsarClient.newReader()
                .topic(str)
                .startMessageId(MessageId.earliest)
                .create();
    }

    /** Returns the coder used to serialize the {@link OffsetRange} restriction. */
    @DoFn.GetRestrictionCoder
    public Coder<OffsetRange> getRestrictionCoder() {
        return new OffsetRange.Coder();
    }

    /**
     * Reads messages from the descriptor's topic and emits them until one of the stop conditions
     * below is met. The reader is closed exactly once on every exit path.
     *
     * <ul>
     *   <li>the reader returns no message: processing is resumed later;</li>
     *   <li>the tracker refuses the claim for a message's publish time: processing stops;</li>
     *   <li>the descriptor's end message id is read: processing stops and that message is not
     *       emitted (its publish time has already been claimed at that point);</li>
     *   <li>the end of the topic is reached: processing stops.</li>
     * </ul>
     *
     * @throws IOException if seeking, reading or closing the reader fails
     */
    @DoFn.ProcessElement
    public DoFn.ProcessContinuation processElement(@DoFn.Element PulsarSourceDescriptor pulsarSourceDescriptor, RestrictionTracker<OffsetRange, Long> restrictionTracker, WatermarkEstimator watermarkEstimator, DoFn.OutputReceiver<PulsarMessage> outputReceiver) throws IOException {
        long startTimestamp = restrictionTracker.currentRestriction().getFrom();
        try (Reader<byte[]> reader = newReader(this.client, pulsarSourceDescriptor.getTopic())) {
            if (startTimestamp > 0) {
                reader.seek(startTimestamp);
            }
            while (!reader.hasReachedEndOfTopic()) {
                Message<byte[]> message = reader.readNext();
                if (message == null) {
                    return DoFn.ProcessContinuation.resume();
                }
                Long publishTime = Long.valueOf(message.getPublishTime());
                // The position must be claimed before any output is produced for it; once the
                // claim is refused no further work may be done for this restriction.
                if (!restrictionTracker.tryClaim(publishTime)) {
                    return DoFn.ProcessContinuation.stop();
                }
                MessageId endMessageId = pulsarSourceDescriptor.getEndMessageId();
                if (endMessageId != null && message.getMessageId().compareTo(endMessageId) == 0) {
                    return DoFn.ProcessContinuation.stop();
                }
                PulsarMessage pulsarMessage = new PulsarMessage(message.getTopicName(), publishTime, message);
                Instant outputTimestamp = this.extractOutputTimestampFn.apply(message);
                outputReceiver.outputWithTimestamp(pulsarMessage, outputTimestamp);
            }
            return DoFn.ProcessContinuation.stop();
        }
    }

    /** Uses the element's timestamp as the initial watermark estimator state. */
    @DoFn.GetInitialWatermarkEstimatorState
    public Instant getInitialWatermarkEstimatorState(@DoFn.Timestamp Instant instant) {
        return instant;
    }

    /**
     * Creates a monotonically increasing watermark estimator seeded with the given state, clamped
     * to the valid {@link BoundedWindow} timestamp range.
     */
    @DoFn.NewWatermarkEstimator
    public WatermarkEstimator<Instant> newWatermarkEstimator(@DoFn.WatermarkEstimatorState Instant instant) {
        return new WatermarkEstimators.MonotonicallyIncreasing(ensureTimestampWithinBounds(instant));
    }

    /**
     * Creates the tracker for a restriction: a plain {@link OffsetRangeTracker} when the range has
     * an explicit end, otherwise a {@link GrowableOffsetRangeTracker} whose end is estimated from
     * the topic's latest message.
     */
    @DoFn.NewTracker
    public OffsetRangeTracker restrictionTracker(@DoFn.Element PulsarSourceDescriptor pulsarSourceDescriptor, @DoFn.Restriction OffsetRange offsetRange) {
        boolean hasExplicitEnd = offsetRange.getTo() < Long.MAX_VALUE;
        if (hasExplicitEnd) {
            return new OffsetRangeTracker(offsetRange);
        }
        PulsarLatestOffsetEstimator endEstimator =
                new PulsarLatestOffsetEstimator(this.admin, pulsarSourceDescriptor.getTopic());
        return new GrowableOffsetRangeTracker(offsetRange.getFrom(), endEstimator);
    }

    /**
     * Clamps a timestamp into
     * [{@link BoundedWindow#TIMESTAMP_MIN_VALUE}, {@link BoundedWindow#TIMESTAMP_MAX_VALUE}].
     */
    private static Instant ensureTimestampWithinBounds(Instant instant) {
        if (instant.isBefore(BoundedWindow.TIMESTAMP_MIN_VALUE)) {
            return BoundedWindow.TIMESTAMP_MIN_VALUE;
        }
        if (instant.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE)) {
            return BoundedWindow.TIMESTAMP_MAX_VALUE;
        }
        return instant;
    }
}
