package org.apache.beam.runners.samza.adapter;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Base64;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.beam.repackaged.core.org.apache.commons.lang3.StringUtils;
import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
import org.apache.beam.runners.core.serialization.Base64Serializer;
import org.apache.beam.runners.samza.SamzaPipelineOptions;
import org.apache.beam.runners.samza.metrics.FnWithMetricsWrapper;
import org.apache.beam.runners.samza.metrics.SamzaMetricsContainer;
import org.apache.beam.runners.samza.runtime.OpMessage;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.apache.samza.Partition;
import org.apache.samza.SamzaException;
import org.apache.samza.config.Config;
import org.apache.samza.metrics.MetricsRegistry;
import org.apache.samza.metrics.MetricsRegistryMap;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.SystemAdmin;
import org.apache.samza.system.SystemConsumer;
import org.apache.samza.system.SystemFactory;
import org.apache.samza.system.SystemProducer;
import org.apache.samza.system.SystemStreamMetadata;
import org.apache.samza.system.SystemStreamPartition;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/beam/runners/samza/adapter/UnboundedSourceSystem.class */
public class UnboundedSourceSystem {
    private static final Logger LOG = LoggerFactory.getLogger(UnboundedSourceSystem.class);
    private static final IncomingMessageEnvelope CHECK_LAST_EXCEPTION_ENVELOPE = new IncomingMessageEnvelope((SystemStreamPartition) null, (String) null, (Object) null, (Object) null);

    /* loaded from: input_file:org/apache/beam/runners/samza/adapter/UnboundedSourceSystem$Admin.class */
    public static class Admin<T, CheckpointMarkT extends UnboundedSource.CheckpointMark> implements SystemAdmin {
        private final UnboundedSource<T, CheckpointMarkT> source;
        private final SamzaPipelineOptions pipelineOptions;

        public Admin(UnboundedSource<T, CheckpointMarkT> unboundedSource, SamzaPipelineOptions samzaPipelineOptions) {
            this.source = unboundedSource;
            this.pipelineOptions = samzaPipelineOptions;
        }

        public Map<SystemStreamPartition, String> getOffsetsAfter(Map<SystemStreamPartition, String> map) {
            return map;
        }

        public Map<String, SystemStreamMetadata> getSystemStreamMetadata(Set<String> set) {
            return (Map) set.stream().collect(Collectors.toMap(Function.identity(), str -> {
                try {
                    List split = UnboundedSourceSystem.split(this.source, this.pipelineOptions);
                    HashMap hashMap = new HashMap();
                    for (int i = 0; i < split.size(); i++) {
                        hashMap.put(new Partition(i), new SystemStreamMetadata.SystemStreamPartitionMetadata((String) null, (String) null, (String) null));
                    }
                    return new SystemStreamMetadata(str, hashMap);
                } catch (Exception e) {
                    throw new SamzaException("Fail to read stream metadata", e);
                }
            }));
        }

        public Integer offsetComparator(String str, String str2) {
            return null;
        }
    }

    /* loaded from: input_file:org/apache/beam/runners/samza/adapter/UnboundedSourceSystem$Consumer.class */
    public static class Consumer<T, CheckpointMarkT extends UnboundedSource.CheckpointMark> implements SystemConsumer {
        private static final Logger LOG;
        private static final AtomicInteger NEXT_ID;
        private final Coder<CheckpointMarkT> checkpointMarkCoder;
        private final List<UnboundedSource<T, CheckpointMarkT>> splits;
        private final SamzaPipelineOptions pipelineOptions;
        private final Map<UnboundedSource.UnboundedReader, SystemStreamPartition> readerToSsp = new HashMap();
        private final SamzaMetricsContainer metricsContainer;
        private final String stepName;
        private ReaderTask<T, CheckpointMarkT> readerTask;
        static final /* synthetic */ boolean $assertionsDisabled;

        /* loaded from: input_file:org/apache/beam/runners/samza/adapter/UnboundedSourceSystem$Consumer$ReaderTask.class */
        private static class ReaderTask<T, CheckpointMarkT extends UnboundedSource.CheckpointMark> implements Runnable {
            private final Map<UnboundedSource.UnboundedReader, SystemStreamPartition> readerToSsp;
            private final List<UnboundedSource.UnboundedReader> readers;
            private final Coder<CheckpointMarkT> checkpointMarkCoder;
            private final Map<SystemStreamPartition, Instant> currentWatermarks;
            private final Map<SystemStreamPartition, LinkedBlockingQueue<IncomingMessageEnvelope>> queues;
            private final long watermarkInterval;
            private final Semaphore available;
            private final FnWithMetricsWrapper metricsWrapper;
            private volatile boolean running;
            private volatile Exception lastException;
            private long lastWatermarkTime;

            private ReaderTask(Map<UnboundedSource.UnboundedReader, SystemStreamPartition> map, Coder<CheckpointMarkT> coder, int i, long j, FnWithMetricsWrapper fnWithMetricsWrapper) {
                this.currentWatermarks = new HashMap();
                this.lastWatermarkTime = 0L;
                this.readerToSsp = map;
                this.checkpointMarkCoder = coder;
                this.readers = ImmutableList.copyOf(map.keySet());
                this.watermarkInterval = j;
                this.available = new Semaphore(i);
                this.metricsWrapper = fnWithMetricsWrapper;
                HashMap hashMap = new HashMap();
                map.values().forEach(systemStreamPartition -> {
                });
                this.queues = ImmutableMap.copyOf(hashMap);
            }

            @Override // java.lang.Runnable
            public void run() {
                this.running = true;
                try {
                    try {
                        for (UnboundedSource.UnboundedReader unboundedReader : this.readers) {
                            Objects.requireNonNull(unboundedReader);
                            if (((Boolean) invoke(unboundedReader::start)).booleanValue()) {
                                this.available.acquire();
                                enqueueMessage(unboundedReader);
                            }
                        }
                        while (this.running) {
                            boolean z = false;
                            for (UnboundedSource.UnboundedReader unboundedReader2 : this.readers) {
                                Objects.requireNonNull(unboundedReader2);
                                if (((Boolean) invoke(unboundedReader2::advance)).booleanValue()) {
                                    while (!this.available.tryAcquire(1, Math.max((this.lastWatermarkTime + this.watermarkInterval) - System.currentTimeMillis(), 1L), TimeUnit.MILLISECONDS)) {
                                        updateWatermark();
                                    }
                                    enqueueMessage(unboundedReader2);
                                    z = true;
                                }
                            }
                            updateWatermark();
                            if (!z) {
                                Thread.sleep(50L);
                            }
                        }
                    } catch (Exception e) {
                        this.lastException = e;
                        this.running = false;
                        this.readers.forEach(unboundedReader3 -> {
                            try {
                                unboundedReader3.close();
                            } catch (IOException e2) {
                                Consumer.LOG.error("Reader task failed to close reader", e2);
                            }
                        });
                    }
                    if (this.lastException != null) {
                        this.queues.values().forEach(linkedBlockingQueue -> {
                            linkedBlockingQueue.clear();
                            linkedBlockingQueue.add(UnboundedSourceSystem.CHECK_LAST_EXCEPTION_ENVELOPE);
                        });
                    }
                } finally {
                    this.readers.forEach(unboundedReader32 -> {
                        try {
                            unboundedReader32.close();
                        } catch (IOException e2) {
                            Consumer.LOG.error("Reader task failed to close reader", e2);
                        }
                    });
                }
            }

            private <X> X invoke(FnWithMetricsWrapper.SupplierWithException<X> supplierWithException) throws Exception {
                return this.metricsWrapper != null ? (X) this.metricsWrapper.wrap(supplierWithException, true) : supplierWithException.get();
            }

            private void updateWatermark() throws InterruptedException {
                long currentTimeMillis = System.currentTimeMillis();
                if (currentTimeMillis - this.lastWatermarkTime > this.watermarkInterval) {
                    for (UnboundedSource.UnboundedReader unboundedReader : this.readers) {
                        SystemStreamPartition systemStreamPartition = this.readerToSsp.get(unboundedReader);
                        Instant instant = this.currentWatermarks.containsKey(systemStreamPartition) ? this.currentWatermarks.get(systemStreamPartition) : BoundedWindow.TIMESTAMP_MIN_VALUE;
                        Instant watermark = unboundedReader.getWatermark();
                        if (instant.isBefore(watermark)) {
                            this.currentWatermarks.put(systemStreamPartition, watermark);
                            if (BoundedWindow.TIMESTAMP_MAX_VALUE.isAfter(watermark)) {
                                enqueueWatermark(unboundedReader);
                            } else {
                                enqueueMaxWatermarkAndEndOfStream(unboundedReader);
                                this.running = false;
                            }
                        }
                    }
                    this.lastWatermarkTime = currentTimeMillis;
                }
            }

            private void enqueueWatermark(UnboundedSource.UnboundedReader unboundedReader) throws InterruptedException {
                SystemStreamPartition systemStreamPartition = this.readerToSsp.get(unboundedReader);
                this.queues.get(systemStreamPartition).put(IncomingMessageEnvelope.buildWatermarkEnvelope(systemStreamPartition, unboundedReader.getWatermark().getMillis()));
            }

            private void enqueueMessage(UnboundedSource.UnboundedReader unboundedReader) throws InterruptedException {
                Object current = unboundedReader.getCurrent();
                Instant currentTimestamp = unboundedReader.getCurrentTimestamp();
                SystemStreamPartition systemStreamPartition = this.readerToSsp.get(unboundedReader);
                this.queues.get(systemStreamPartition).put(new IncomingMessageEnvelope(systemStreamPartition, getOffset(unboundedReader), (Object) null, OpMessage.ofElement(WindowedValue.timestampedValueInGlobalWindow(current, currentTimestamp))));
            }

            private void enqueueMaxWatermarkAndEndOfStream(UnboundedSource.UnboundedReader<T> unboundedReader) {
                SystemStreamPartition systemStreamPartition = this.readerToSsp.get(unboundedReader);
                enqueueUninterruptibly(IncomingMessageEnvelope.buildWatermarkEnvelope(systemStreamPartition, BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()));
                enqueueUninterruptibly(IncomingMessageEnvelope.buildEndOfStreamEnvelope(systemStreamPartition));
            }

            private void enqueueUninterruptibly(IncomingMessageEnvelope incomingMessageEnvelope) {
                while (true) {
                    try {
                        this.queues.get(incomingMessageEnvelope.getSystemStreamPartition()).put(incomingMessageEnvelope);
                        return;
                    } catch (InterruptedException e) {
                    }
                }
            }

            void stop() {
                this.running = false;
            }

            List<IncomingMessageEnvelope> getNextMessages(SystemStreamPartition systemStreamPartition, long j) throws InterruptedException {
                if (this.lastException != null) {
                    throw new RuntimeException(this.lastException);
                }
                ArrayList arrayList = new ArrayList();
                LinkedBlockingQueue<IncomingMessageEnvelope> linkedBlockingQueue = this.queues.get(systemStreamPartition);
                IncomingMessageEnvelope poll = linkedBlockingQueue.poll(j, TimeUnit.MILLISECONDS);
                if (poll != null) {
                    arrayList.add(poll);
                    linkedBlockingQueue.drainTo(arrayList);
                }
                this.available.release((int) arrayList.stream().filter(incomingMessageEnvelope -> {
                    return incomingMessageEnvelope.getMessage() instanceof OpMessage;
                }).count());
                if (this.lastException != null) {
                    throw new RuntimeException(this.lastException);
                }
                return arrayList;
            }

            private String getOffset(UnboundedSource.UnboundedReader unboundedReader) {
                try {
                    ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
                    Objects.requireNonNull(unboundedReader);
                    this.checkpointMarkCoder.encode((UnboundedSource.CheckpointMark) invoke(unboundedReader::getCheckpointMark), byteArrayOutputStream);
                    return Base64.getEncoder().encodeToString(byteArrayOutputStream.toByteArray());
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            }
        }

        Consumer(UnboundedSource<T, CheckpointMarkT> unboundedSource, SamzaPipelineOptions samzaPipelineOptions, SamzaMetricsContainer samzaMetricsContainer, String str) {
            try {
                this.splits = UnboundedSourceSystem.split(unboundedSource, samzaPipelineOptions);
                this.checkpointMarkCoder = unboundedSource.getCheckpointMarkCoder();
                this.pipelineOptions = samzaPipelineOptions;
                this.metricsContainer = samzaMetricsContainer;
                this.stepName = str;
            } catch (Exception e) {
                throw new SamzaException("Fail to split source", e);
            }
        }

        public void start() {
            if (this.readerToSsp.isEmpty()) {
                throw new IllegalArgumentException("Attempted to call start without assigned system stream partitions");
            }
            this.readerTask = new ReaderTask<>(this.readerToSsp, this.checkpointMarkCoder, this.pipelineOptions.getSystemBufferSize(), this.pipelineOptions.getWatermarkInterval(), this.pipelineOptions.getEnableMetrics().booleanValue() ? new FnWithMetricsWrapper(this.metricsContainer, this.stepName) : null);
            new Thread(this.readerTask, "unbounded-source-system-consumer-" + NEXT_ID.getAndIncrement()).start();
        }

        public void stop() {
            this.readerTask.stop();
        }

        public void register(SystemStreamPartition systemStreamPartition, String str) {
            UnboundedSource.CheckpointMark checkpointMark = null;
            if (StringUtils.isNoneEmpty(new CharSequence[]{str})) {
                try {
                    checkpointMark = (UnboundedSource.CheckpointMark) this.checkpointMarkCoder.decode(new ByteArrayInputStream(Base64.getDecoder().decode(str)));
                } catch (Exception e) {
                    throw new SamzaException("Error in decode offset", e);
                }
            }
            try {
                this.readerToSsp.put(this.splits.get(systemStreamPartition.getPartition().getPartitionId()).createReader(this.pipelineOptions, checkpointMark), systemStreamPartition);
            } catch (Exception e2) {
                throw new SamzaException("Error while creating source reader for ssp: " + systemStreamPartition, e2);
            }
        }

        public Map<SystemStreamPartition, List<IncomingMessageEnvelope>> poll(Set<SystemStreamPartition> set, long j) throws InterruptedException {
            if (!$assertionsDisabled && this.readerToSsp.isEmpty()) {
                throw new AssertionError();
            }
            HashMap hashMap = new HashMap();
            for (SystemStreamPartition systemStreamPartition : set) {
                hashMap.put(systemStreamPartition, this.readerTask.getNextMessages(systemStreamPartition, j));
            }
            return hashMap;
        }

        static {
            $assertionsDisabled = !UnboundedSourceSystem.class.desiredAssertionStatus();
            LOG = LoggerFactory.getLogger(Consumer.class);
            NEXT_ID = new AtomicInteger();
        }
    }

    /* loaded from: input_file:org/apache/beam/runners/samza/adapter/UnboundedSourceSystem$Factory.class */
    public static class Factory<T, CheckpointMarkT extends UnboundedSource.CheckpointMark> implements SystemFactory {
        public SystemConsumer getConsumer(String str, Config config, MetricsRegistry metricsRegistry) {
            Config subset = config.subset(("systems." + str) + ".", true);
            return new Consumer(getUnboundedSource(subset), getPipelineOptions(config), new SamzaMetricsContainer((MetricsRegistryMap) metricsRegistry), (String) subset.get("stepName"));
        }

        public SystemProducer getProducer(String str, Config config, MetricsRegistry metricsRegistry) {
            UnboundedSourceSystem.LOG.info("System " + str + " does not have producer.");
            return null;
        }

        public SystemAdmin getAdmin(String str, Config config) {
            return new Admin(getUnboundedSource(config.subset("systems." + str + ".", true)), getPipelineOptions(config));
        }

        private static <T, CheckpointMarkT extends UnboundedSource.CheckpointMark> UnboundedSource<T, CheckpointMarkT> getUnboundedSource(Config config) {
            return (UnboundedSource) Base64Serializer.deserializeUnchecked((String) config.get("source"), UnboundedSource.class);
        }

        private static SamzaPipelineOptions getPipelineOptions(Config config) {
            return (SamzaPipelineOptions) ((SerializablePipelineOptions) Base64Serializer.deserializeUnchecked((String) config.get("beamPipelineOptions"), SerializablePipelineOptions.class)).get().as(SamzaPipelineOptions.class);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static <T, CheckpointMarkT extends UnboundedSource.CheckpointMark> List<UnboundedSource<T, CheckpointMarkT>> split(UnboundedSource<T, CheckpointMarkT> unboundedSource, SamzaPipelineOptions samzaPipelineOptions) throws Exception {
        int maxSourceParallelism = samzaPipelineOptions.getMaxSourceParallelism();
        if (maxSourceParallelism > 1) {
            List<UnboundedSource<T, CheckpointMarkT>> split = unboundedSource.split(maxSourceParallelism, samzaPipelineOptions);
            if (!split.isEmpty()) {
                return split;
            }
        }
        return Collections.singletonList(unboundedSource);
    }
}
