package org.apache.beam.runners.samza.adapter;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
import org.apache.beam.runners.core.serialization.Base64Serializer;
import org.apache.beam.runners.samza.SamzaPipelineOptions;
import org.apache.beam.runners.samza.metrics.FnWithMetricsWrapper;
import org.apache.beam.runners.samza.metrics.SamzaMetricsContainer;
import org.apache.beam.runners.samza.runtime.OpMessage;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.apache.samza.Partition;
import org.apache.samza.SamzaException;
import org.apache.samza.config.Config;
import org.apache.samza.metrics.MetricsRegistry;
import org.apache.samza.metrics.MetricsRegistryMap;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.SystemAdmin;
import org.apache.samza.system.SystemConsumer;
import org.apache.samza.system.SystemFactory;
import org.apache.samza.system.SystemProducer;
import org.apache.samza.system.SystemStreamMetadata;
import org.apache.samza.system.SystemStreamPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/beam/runners/samza/adapter/BoundedSourceSystem.class */
public class BoundedSourceSystem {
    private static final Logger LOG = LoggerFactory.getLogger(BoundedSourceSystem.class);

    /* loaded from: input_file:org/apache/beam/runners/samza/adapter/BoundedSourceSystem$Admin.class */
    /**
     * Samza {@link SystemAdmin} for a system backed by a Beam {@link BoundedSource}.
     *
     * <p>Each stream is reported with one partition per split produced by
     * {@code BoundedSourceSystem.split(source, pipelineOptions)}; no offsets are advertised for
     * those partitions.
     *
     * @param <T> element type produced by the source
     */
    public static class Admin<T> implements SystemAdmin {
        private final BoundedSource<T> source;
        private final SamzaPipelineOptions pipelineOptions;

        /**
         * @param boundedSource source whose splits define the partitions of every stream
         * @param samzaPipelineOptions options handed to the split computation
         */
        public Admin(BoundedSource<T> boundedSource, SamzaPipelineOptions samzaPipelineOptions) {
            this.source = boundedSource;
            this.pipelineOptions = samzaPipelineOptions;
        }

        /**
         * Returns the given offsets unchanged.
         *
         * @param map offsets keyed by partition
         * @return the same map instance that was passed in
         */
        public Map<SystemStreamPartition, String> getOffsetsAfter(Map<SystemStreamPartition, String> map) {
            return map;
        }

        /**
         * Builds metadata for every requested stream name.
         *
         * @param set names of the streams to describe
         * @return a map from each stream name to its metadata
         * @throws SamzaException if the source cannot be split
         */
        public Map<String, SystemStreamMetadata> getSystemStreamMetadata(Set<String> set) {
            return set.stream().collect(Collectors.toMap(Function.identity(), this::describeStream));
        }

        /** Describes one stream: partitions 0..n-1 for the n splits of the source, all with null offsets. */
        private SystemStreamMetadata describeStream(String streamName) {
            try {
                final int partitionCount = BoundedSourceSystem.split(this.source, this.pipelineOptions).size();
                final Map<Partition, SystemStreamMetadata.SystemStreamPartitionMetadata> partitions = new HashMap<>();
                for (int id = 0; id < partitionCount; id++) {
                    partitions.put(new Partition(id), new SystemStreamMetadata.SystemStreamPartitionMetadata(null, null, null));
                }
                return new SystemStreamMetadata(streamName, partitions);
            } catch (Exception e) {
                throw new SamzaException("Fail to read stream metadata", e);
            }
        }

        /**
         * Compares two offsets numerically; a {@code null} offset sorts before any non-null offset.
         *
         * @param str first offset, a decimal {@code long} or {@code null}
         * @param str2 second offset, a decimal {@code long} or {@code null}
         * @return a negative value, zero, or a positive value as the first offset is less than,
         *     equal to, or greater than the second
         * @throws NumberFormatException if a non-null offset is not a parsable {@code long}
         */
        public Integer offsetComparator(String str, String str2) {
            if (str == null) {
                return str2 == null ? 0 : -1;
            }
            if (str2 == null) {
                return 1;
            }
            // Compare primitives directly instead of boxing two Long instances.
            return Long.compare(Long.parseLong(str), Long.parseLong(str2));
        }
    }

    /* loaded from: input_file:org/apache/beam/runners/samza/adapter/BoundedSourceSystem$Consumer.class */
    public static class Consumer<T> implements SystemConsumer {
        private static final Logger LOG;
        private static final AtomicInteger NEXT_ID;
        private final List<BoundedSource<T>> splits;
        private final SamzaPipelineOptions pipelineOptions;
        private final Map<BoundedSource.BoundedReader<T>, SystemStreamPartition> readerToSsp = new HashMap();
        private final SamzaMetricsContainer metricsContainer;
        private final String stepName;
        private ReaderTask<T> readerTask;
        static final /* synthetic */ boolean $assertionsDisabled;

        /* loaded from: input_file:org/apache/beam/runners/samza/adapter/BoundedSourceSystem$Consumer$ReaderTask.class */
        private static class ReaderTask<T> implements Runnable {
            private final Map<BoundedSource.BoundedReader<T>, SystemStreamPartition> readerToSsp;
            private final Map<SystemStreamPartition, LinkedBlockingQueue<IncomingMessageEnvelope>> queues;
            private final Semaphore available;
            private final FnWithMetricsWrapper metricsWrapper;
            private long offset;
            private volatile Thread readerThread;
            private volatile boolean stopInvoked;
            private volatile Exception lastException;

            private ReaderTask(Map<BoundedSource.BoundedReader<T>, SystemStreamPartition> map, int i, FnWithMetricsWrapper fnWithMetricsWrapper) {
                this.stopInvoked = false;
                this.readerToSsp = map;
                this.available = new Semaphore(i);
                this.metricsWrapper = fnWithMetricsWrapper;
                HashMap hashMap = new HashMap();
                map.values().forEach(systemStreamPartition -> {
                });
                this.queues = ImmutableMap.copyOf(hashMap);
            }

            @Override // java.lang.Runnable
            public void run() {
                this.readerThread = Thread.currentThread();
                HashSet hashSet = new HashSet(this.readerToSsp.keySet());
                try {
                    try {
                        for (BoundedSource.BoundedReader<T> boundedReader : this.readerToSsp.keySet()) {
                            Objects.requireNonNull(boundedReader);
                            if (((Boolean) invoke(boundedReader::start)).booleanValue()) {
                                enqueueMessage(boundedReader);
                            } else {
                                enqueueMaxWatermarkAndEndOfStream(boundedReader);
                                boundedReader.close();
                                hashSet.remove(boundedReader);
                            }
                        }
                        while (!this.stopInvoked && !hashSet.isEmpty()) {
                            Iterator it = hashSet.iterator();
                            while (it.hasNext()) {
                                BoundedSource.BoundedReader<T> boundedReader2 = (BoundedSource.BoundedReader) it.next();
                                Objects.requireNonNull(boundedReader2);
                                if (((Boolean) invoke(boundedReader2::advance)).booleanValue()) {
                                    enqueueMessage(boundedReader2);
                                } else {
                                    enqueueMaxWatermarkAndEndOfStream(boundedReader2);
                                    boundedReader2.close();
                                    it.remove();
                                }
                            }
                        }
                        hashSet.forEach(boundedReader3 -> {
                            try {
                                boundedReader3.close();
                            } catch (IOException e) {
                                Consumer.LOG.error("Reader task failed to close reader for ssp {}", this.readerToSsp.get(boundedReader3), e);
                            }
                        });
                    } catch (InterruptedException e) {
                        hashSet.forEach(boundedReader32 -> {
                            try {
                                boundedReader32.close();
                            } catch (IOException e2) {
                                Consumer.LOG.error("Reader task failed to close reader for ssp {}", this.readerToSsp.get(boundedReader32), e2);
                            }
                        });
                    } catch (Exception e2) {
                        setError(e2);
                        hashSet.forEach(boundedReader322 -> {
                            try {
                                boundedReader322.close();
                            } catch (IOException e22) {
                                Consumer.LOG.error("Reader task failed to close reader for ssp {}", this.readerToSsp.get(boundedReader322), e22);
                            }
                        });
                    }
                } catch (Throwable th) {
                    hashSet.forEach(boundedReader3222 -> {
                        try {
                            boundedReader3222.close();
                        } catch (IOException e22) {
                            Consumer.LOG.error("Reader task failed to close reader for ssp {}", this.readerToSsp.get(boundedReader3222), e22);
                        }
                    });
                    throw th;
                }
            }

            private <X> X invoke(FnWithMetricsWrapper.SupplierWithException<X> supplierWithException) throws Exception {
                return this.metricsWrapper != null ? (X) this.metricsWrapper.wrap(supplierWithException, true) : supplierWithException.get();
            }

            private void enqueueMessage(BoundedSource.BoundedReader<T> boundedReader) throws InterruptedException {
                WindowedValue timestampedValueInGlobalWindow = WindowedValue.timestampedValueInGlobalWindow(boundedReader.getCurrent(), boundedReader.getCurrentTimestamp());
                SystemStreamPartition systemStreamPartition = this.readerToSsp.get(boundedReader);
                long j = this.offset;
                this.offset = j + 1;
                IncomingMessageEnvelope incomingMessageEnvelope = new IncomingMessageEnvelope(systemStreamPartition, Long.toString(j), (Object) null, OpMessage.ofElement(timestampedValueInGlobalWindow));
                this.available.acquire();
                this.queues.get(systemStreamPartition).put(incomingMessageEnvelope);
            }

            private void enqueueMaxWatermarkAndEndOfStream(BoundedSource.BoundedReader<T> boundedReader) {
                SystemStreamPartition systemStreamPartition = this.readerToSsp.get(boundedReader);
                enqueueUninterruptibly(IncomingMessageEnvelope.buildWatermarkEnvelope(systemStreamPartition, BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()));
                enqueueUninterruptibly(IncomingMessageEnvelope.buildEndOfStreamEnvelope(systemStreamPartition));
            }

            /* JADX INFO: Access modifiers changed from: private */
            public void stop() {
                this.stopInvoked = true;
                Thread thread = this.readerThread;
                if (thread != null) {
                    thread.interrupt();
                }
            }

            /* JADX INFO: Access modifiers changed from: private */
            public List<IncomingMessageEnvelope> getNextMessages(SystemStreamPartition systemStreamPartition, long j) throws InterruptedException {
                if (this.lastException != null) {
                    throw new RuntimeException(this.lastException);
                }
                ArrayList arrayList = new ArrayList();
                LinkedBlockingQueue<IncomingMessageEnvelope> linkedBlockingQueue = this.queues.get(systemStreamPartition);
                IncomingMessageEnvelope poll = linkedBlockingQueue.poll(j, TimeUnit.MILLISECONDS);
                if (poll != null) {
                    arrayList.add(poll);
                    linkedBlockingQueue.drainTo(arrayList);
                }
                this.available.release(arrayList.size());
                if (this.lastException != null) {
                    throw new RuntimeException(this.lastException);
                }
                return arrayList;
            }

            private void setError(Exception exc) {
                this.lastException = exc;
                this.readerToSsp.values().forEach(systemStreamPartition -> {
                    enqueueUninterruptibly(new IncomingMessageEnvelope(systemStreamPartition, (String) null, (Object) null, (Object) null));
                });
            }

            private void enqueueUninterruptibly(IncomingMessageEnvelope incomingMessageEnvelope) {
                while (true) {
                    try {
                        this.queues.get(incomingMessageEnvelope.getSystemStreamPartition()).put(incomingMessageEnvelope);
                        return;
                    } catch (InterruptedException e) {
                    }
                }
            }
        }

        Consumer(BoundedSource<T> boundedSource, SamzaPipelineOptions samzaPipelineOptions, SamzaMetricsContainer samzaMetricsContainer, String str) {
            try {
                this.splits = BoundedSourceSystem.split(boundedSource, samzaPipelineOptions);
                this.pipelineOptions = samzaPipelineOptions;
                this.metricsContainer = samzaMetricsContainer;
                this.stepName = str;
            } catch (Exception e) {
                throw new SamzaException("Fail to split source", e);
            }
        }

        /**
         * Creates the reader task for the registered partitions and launches it on a new thread
         * named {@code bounded-source-system-consumer-<n>}.
         *
         * @throws IllegalArgumentException if no partition has been registered
         */
        public void start() {
            if (this.readerToSsp.isEmpty()) {
                throw new IllegalArgumentException("Attempted to call start without assigned system stream partitions");
            }
            final int bufferSize = this.pipelineOptions.getSystemBufferSize();
            // Reader calls are wrapped for metrics only when metrics are enabled.
            final FnWithMetricsWrapper wrapper;
            if (this.pipelineOptions.getEnableMetrics().booleanValue()) {
                wrapper = new FnWithMetricsWrapper(this.metricsContainer, this.stepName);
            } else {
                wrapper = null;
            }
            this.readerTask = new ReaderTask<>(this.readerToSsp, bufferSize, wrapper);
            final String threadName = "bounded-source-system-consumer-" + NEXT_ID.getAndIncrement();
            final Thread worker = new Thread(this.readerTask, threadName);
            worker.start();
        }

        /** Stops the reader task if one has been created; otherwise does nothing. */
        public void stop() {
            final ReaderTask<T> task = this.readerTask;
            if (task == null) {
                return;
            }
            task.stop();
        }

        /**
         * Creates a reader for the split whose index equals the partition id and associates it with
         * the partition.
         *
         * @param systemStreamPartition partition to register
         * @param str starting offset; not used
         * @throws SamzaException if the split lookup or reader creation fails
         */
        public void register(SystemStreamPartition systemStreamPartition, String str) {
            try {
                final int partitionId = systemStreamPartition.getPartition().getPartitionId();
                final BoundedSource<T> split = this.splits.get(partitionId);
                final BoundedSource.BoundedReader<T> reader = split.createReader(this.pipelineOptions);
                this.readerToSsp.put(reader, systemStreamPartition);
            } catch (Exception e) {
                throw new SamzaException("Error while creating source reader for ssp: " + systemStreamPartition, e);
            }
        }

        /**
         * Fetches the buffered envelopes of each requested partition from the reader task.
         *
         * @param set partitions to poll
         * @param j maximum time to wait per partition for a first envelope, in milliseconds
         * @return a map from every requested partition to its (possibly empty) list of envelopes
         * @throws InterruptedException if interrupted while waiting
         */
        public Map<SystemStreamPartition, List<IncomingMessageEnvelope>> poll(Set<SystemStreamPartition> set, long j) throws InterruptedException {
            // Assertion-style sanity check: at least one partition must have been registered.
            final boolean assertionsEnabled = !$assertionsDisabled;
            if (assertionsEnabled && this.readerToSsp.isEmpty()) {
                throw new AssertionError();
            }
            final Map<SystemStreamPartition, List<IncomingMessageEnvelope>> envelopesBySsp = new HashMap<>();
            final Iterator<SystemStreamPartition> partitions = set.iterator();
            while (partitions.hasNext()) {
                final SystemStreamPartition ssp = partitions.next();
                final List<IncomingMessageEnvelope> envelopes = this.readerTask.getNextMessages(ssp, j);
                envelopesBySsp.put(ssp, envelopes);
            }
            return envelopesBySsp;
        }

        // Initializes the static state of Consumer: the counter used to number reader threads,
        // the class logger, and the flag consulted by the assertion-style check in poll().
        static {
            NEXT_ID = new AtomicInteger();
            LOG = LoggerFactory.getLogger(Consumer.class);
            // True when assertions are NOT enabled for BoundedSourceSystem.
            $assertionsDisabled = !BoundedSourceSystem.class.desiredAssertionStatus();
        }
    }

    /* loaded from: input_file:org/apache/beam/runners/samza/adapter/BoundedSourceSystem$Factory.class */
    /**
     * Samza {@link SystemFactory} that builds the consumer and admin of a bounded-source system
     * from the job configuration. The serialized source is read from the {@code source} key of the
     * {@code systems.<name>.} config subset, and the pipeline options from the top-level
     * {@code beamPipelineOptions} key.
     *
     * @param <T> element type produced by the source
     */
    public static class Factory<T> implements SystemFactory {
        /**
         * Creates the consumer for the named system.
         *
         * @param str system name
         * @param config full job configuration
         * @param metricsRegistry registry backing the metrics container; cast to {@link MetricsRegistryMap}
         * @return a new {@link Consumer}
         */
        public SystemConsumer getConsumer(String str, Config config, MetricsRegistry metricsRegistry) {
            final Config systemConfig = systemScope(str, config);
            final BoundedSource<T> source = getBoundedSource(systemConfig);
            final SamzaPipelineOptions options = getPipelineOptions(config);
            final SamzaMetricsContainer metrics = new SamzaMetricsContainer((MetricsRegistryMap) metricsRegistry);
            final String stepName = systemConfig.get("stepName");
            return new Consumer<T>(source, options, metrics, stepName);
        }

        /**
         * This system has no producer; logs that fact.
         *
         * @return always {@code null}
         */
        public SystemProducer getProducer(String str, Config config, MetricsRegistry metricsRegistry) {
            BoundedSourceSystem.LOG.info("System " + str + " does not have producer.");
            return null;
        }

        /**
         * Creates the admin for the named system.
         *
         * @param str system name
         * @param config full job configuration
         * @return a new {@link Admin}
         */
        public SystemAdmin getAdmin(String str, Config config) {
            final BoundedSource<T> source = getBoundedSource(systemScope(str, config));
            return new Admin<T>(source, getPipelineOptions(config));
        }

        /** Returns the subset of the configuration under {@code systems.<name>.}, with that prefix stripped. */
        private static Config systemScope(String systemName, Config config) {
            return config.subset("systems." + systemName + ".", true);
        }

        /** Deserializes the Base64-encoded source stored under the {@code source} key. */
        @SuppressWarnings("unchecked")
        private static <T> BoundedSource<T> getBoundedSource(Config config) {
            final String encodedSource = config.get("source");
            // The element type is not recoverable from the serialized form, hence the unchecked cast.
            return (BoundedSource<T>) Base64Serializer.deserializeUnchecked(encodedSource, BoundedSource.class);
        }

        /** Deserializes the Base64-encoded pipeline options stored under the {@code beamPipelineOptions} key. */
        private static SamzaPipelineOptions getPipelineOptions(Config config) {
            final String encodedOptions = config.get("beamPipelineOptions");
            final SerializablePipelineOptions serialized = Base64Serializer.deserializeUnchecked(encodedOptions, SerializablePipelineOptions.class);
            return serialized.get().as(SamzaPipelineOptions.class);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Splits a source into at most roughly {@code maxSourceParallelism} parts.
     *
     * <p>When the configured parallelism is greater than one, the source is asked to split itself
     * into bundles of {@code ceil(estimatedSizeBytes / parallelism)} bytes. If the parallelism is
     * one or less, or the source returns no splits, the source itself is returned as the only split.
     *
     * @param boundedSource source to split
     * @param samzaPipelineOptions options supplying the parallelism and passed to the source
     * @return a non-empty list of sources
     * @throws Exception if size estimation or splitting fails
     */
    public static <T> List<BoundedSource<T>> split(BoundedSource<T> boundedSource, SamzaPipelineOptions samzaPipelineOptions) throws Exception {
        final int parallelism = samzaPipelineOptions.getMaxSourceParallelism();
        if (parallelism <= 1) {
            return Collections.singletonList(boundedSource);
        }
        final long estimatedBytes = boundedSource.getEstimatedSizeBytes(samzaPipelineOptions);
        // Ceiling division: each bundle is sized so that `parallelism` bundles cover the estimate.
        final long bytesPerSplit = (estimatedBytes + parallelism - 1) / parallelism;
        // NOTE(review): the cast presumes split() may be declared with a wildcard element type;
        // it is a no-op if the declared type already matches.
        @SuppressWarnings("unchecked")
        final List<BoundedSource<T>> parts = (List<BoundedSource<T>>) boundedSource.split(bytesPerSplit, samzaPipelineOptions);
        if (parts.isEmpty()) {
            return Collections.singletonList(boundedSource);
        }
        return parts;
    }
}
