package org.apache.beam.runners.jet.processors;

import com.hazelcast.jet.Traverser;
import com.hazelcast.jet.Traversers;
import com.hazelcast.jet.core.AbstractProcessor;
import com.hazelcast.jet.core.Processor;
import com.hazelcast.jet.core.ProcessorMetaSupplier;
import com.hazelcast.jet.core.ProcessorSupplier;
import com.hazelcast.jet.core.Watermark;
import com.hazelcast.jet.impl.util.ExceptionUtil;
import com.hazelcast.nio.Address;
import java.io.IOException;
import java.lang.invoke.SerializedLambda;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import javax.annotation.Nonnull;
import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
import org.apache.beam.runners.jet.Utils;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.io.UnboundedSource.CheckpointMark;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.util.WindowedValue;

/**
 * Jet {@link Processor} that reads from the shards of a Beam {@link UnboundedSource} assigned to
 * it and emits the encoded elements interleaved with Jet {@link Watermark}s.
 */
public class UnboundedSourceP<T, CmT extends UnboundedSource.CheckpointMark> extends AbstractProcessor {
    private UnboundedSource.UnboundedReader<T>[] readers;
    private final List<? extends UnboundedSource<T, CmT>> allShards;
    private final PipelineOptions options;
    private final Coder outputCoder;
    private final String ownerId;
    private Traverser<Object> traverser;

    /**
     * Traverser that multiplexes several readers in round-robin order and emits a Jet
     * {@link Watermark} whenever the minimum of the per-reader watermarks advances.
     *
     * <p>Each call to {@link #next()} returns a pending watermark, the mapped current element of
     * the next reader whose {@code advance()} succeeds, or {@code null} when there is nothing to
     * emit right now.
     */
    private static class CoalescingTraverser<InputT> implements Traverser<Object> {
        private final UnboundedSource.UnboundedReader<InputT>[] readers;
        private final Function<UnboundedSource.UnboundedReader<InputT>, byte[]> mapFn;
        /** Index of the most recently polled reader; polling resumes with the one after it. */
        private int currentReaderIndex;
        /** Smallest value currently stored in {@link #watermarks}. */
        private long minWatermark = Long.MIN_VALUE;
        /** Value of the last watermark handed out by {@link #next()}. */
        private long lastSentWatermark = Long.MIN_VALUE;
        /** Last watermark observed per reader, indexed like {@link #readers}. */
        private final long[] watermarks;

        /**
         * @param unboundedReaderArr readers to multiplex
         * @param function maps a reader positioned on an element to the bytes to emit
         */
        CoalescingTraverser(UnboundedSource.UnboundedReader<InputT>[] unboundedReaderArr, Function<UnboundedSource.UnboundedReader<InputT>, byte[]> function) {
            this.readers = unboundedReaderArr;
            this.watermarks = UnboundedSourceP.initWatermarks(unboundedReaderArr.length);
            this.mapFn = function;
        }

        /**
         * Returns the next item to emit.
         *
         * @return a {@link Watermark} if the minimum watermark advanced, otherwise the mapped
         *     element of the first reader that could advance, otherwise {@code null}
         */
        public Object next() {
            Watermark pending = takePendingWatermark();
            if (pending != null) {
                return pending;
            }
            try {
                // Give every reader at most one chance per call, starting after the last one used.
                for (int attempt = 0; attempt < this.readers.length; attempt++) {
                    this.currentReaderIndex = (this.currentReaderIndex + 1) % this.readers.length;
                    UnboundedSource.UnboundedReader<InputT> reader = this.readers[this.currentReaderIndex];
                    boolean advanced = reader.advance();
                    // Refresh the watermark even when the reader had no data: otherwise one idle
                    // reader would pin the minimum at its stale value and no watermark would
                    // ever be emitted downstream.
                    recordWatermark(this.currentReaderIndex, reader.getWatermark().getMillis());
                    if (advanced) {
                        return this.mapFn.apply(reader);
                    }
                }
            } catch (IOException e) {
                throw ExceptionUtil.rethrow(e);
            }
            // No reader produced an element; still report watermark progress made while polling.
            return takePendingWatermark();
        }

        /** Returns a watermark to emit if the minimum moved past the last one sent, else null. */
        private Watermark takePendingWatermark() {
            if (this.minWatermark <= this.lastSentWatermark) {
                return null;
            }
            this.lastSentWatermark = this.minWatermark;
            return new Watermark(this.lastSentWatermark);
        }

        /** Stores a reader's watermark if it increased and recomputes the minimum when needed. */
        private void recordWatermark(int readerIndex, long current) {
            long previous = this.watermarks[readerIndex];
            if (current > previous) {
                this.watermarks[readerIndex] = current;
                // The minimum can only change if this reader was (one of) the ones holding it.
                if (previous == this.minWatermark) {
                    this.minWatermark = UnboundedSourceP.getMin(this.watermarks);
                }
            }
        }
    }

    /**
     * Traverser over a single reader. Each call to {@link #next()} first reports the reader's
     * watermark if it increased since the last report, and otherwise tries to read one element.
     */
    private static class SingleReaderTraverser<InputT> implements Traverser<Object> {
        private final UnboundedSource.UnboundedReader<InputT> reader;
        private final Function<UnboundedSource.UnboundedReader<InputT>, byte[]> mapFn;
        /** Last watermark value returned by {@link #next()}. */
        private long lastWatermark = Long.MIN_VALUE;

        /**
         * @param unboundedReader the reader to pull elements from
         * @param function maps a reader positioned on an element to the bytes to emit
         */
        SingleReaderTraverser(UnboundedSource.UnboundedReader<InputT> unboundedReader, Function<UnboundedSource.UnboundedReader<InputT>, byte[]> function) {
            this.reader = unboundedReader;
            this.mapFn = function;
        }

        /**
         * Returns the next item to emit.
         *
         * @return a {@link Watermark} if the reader's watermark increased, otherwise the mapped
         *     current element if the reader advanced, otherwise {@code null}
         * @throws RuntimeException wrapping any {@link IOException} raised while advancing
         */
        public Object next() {
            final long current = this.reader.getWatermark().getMillis();
            final boolean watermarkUnchanged = current <= this.lastWatermark;
            if (watermarkUnchanged) {
                try {
                    return this.reader.advance() ? this.mapFn.apply(this.reader) : null;
                } catch (IOException readFailure) {
                    throw new RuntimeException(readFailure);
                }
            }
            this.lastWatermark = current;
            return new Watermark(current);
        }
    }

    /**
     * Meta-supplier that asks the source to split itself once per job and hands the complete
     * list of resulting shards to every {@link UnboundedSourceP} it creates.
     */
    private static class UnboundedSourceProcessorMetaSupplier<T, CmT extends UnboundedSource.CheckpointMark> implements ProcessorMetaSupplier {
        private final UnboundedSource<T, CmT> unboundedSource;
        private final SerializablePipelineOptions options;
        private final Coder outputCoder;
        private final String ownerId;
        /** Result of splitting {@link #unboundedSource}; assigned in {@link #init}. */
        private List<? extends UnboundedSource<T, CmT>> shards;

        private UnboundedSourceProcessorMetaSupplier(UnboundedSource<T, CmT> unboundedSource, SerializablePipelineOptions serializablePipelineOptions, Coder coder, String str) {
            this.unboundedSource = unboundedSource;
            this.options = serializablePipelineOptions;
            this.outputCoder = coder;
            this.ownerId = str;
        }

        /**
         * Splits the source, passing the job's total parallelism as the desired number of splits.
         *
         * @param context Jet context supplying the total parallelism
         * @throws Exception if the source fails to split
         */
        public void init(@Nonnull ProcessorMetaSupplier.Context context) throws Exception {
            this.shards = this.unboundedSource.split(context.totalParallelism(), this.options.get());
        }

        /**
         * Returns a function that maps each member address to the same kind of supplier: one
         * creating {@link UnboundedSourceP} instances over the full shard list.
         *
         * @param list member addresses (not inspected)
         */
        @Nonnull
        public Function<? super Address, ? extends ProcessorSupplier> get(@Nonnull List<Address> list) {
            // The inner lambda is a serializable supplier; the compiler generates the matching
            // $deserializeLambda$ method itself, so none is written by hand here.
            return address -> ProcessorSupplier.of(
                    () -> new UnboundedSourceP<>(this.shards, this.options.get(), this.outputCoder, this.ownerId));
        }
    }

    /**
     * Stores the configuration only; readers are not created here but later in {@code init}.
     *
     * @param list all shards of the source, across all processors
     * @param pipelineOptions options passed to the readers on creation
     * @param coder coder used to encode emitted values
     * @param str owner id of this processor
     */
    private UnboundedSourceP(List<? extends UnboundedSource<T, CmT>> list, PipelineOptions pipelineOptions, Coder coder, String str) {
        this.ownerId = str;
        this.outputCoder = coder;
        this.options = pipelineOptions;
        this.allShards = list;
    }

    /**
     * Selects this processor's shards, creates one reader per shard, picks the traverser that
     * matches the number of readers and finally starts every reader.
     *
     * @param context Jet context supplying this processor's global index and the total parallelism
     * @throws IOException if starting a reader fails
     */
    protected void init(@Nonnull Processor.Context context) throws IOException {
        List<? extends UnboundedSource<T, CmT>> myShards =
                Utils.roundRobinSubList(this.allShards, context.globalProcessorIndex(), context.totalParallelism());
        this.readers = createReaders(myShards, this.options);

        // Explicit type arguments are required: with a raw Function the lambda parameter would be
        // an Object and the reader accessors below would not type-check.
        Function<UnboundedSource.UnboundedReader<T>, byte[]> mapFn = reader ->
                Utils.encode(
                        WindowedValue.timestampedValueInGlobalWindow(reader.getCurrent(), reader.getCurrentTimestamp()),
                        this.outputCoder);

        if (myShards.isEmpty()) {
            this.traverser = Traversers.empty();
        } else if (myShards.size() == 1) {
            this.traverser = new SingleReaderTraverser<>(this.readers[0], mapFn);
        } else {
            this.traverser = new CoalescingTraverser<>(this.readers, mapFn);
        }

        // NOTE(review): the boolean returned by start() is ignored. If start() already positions
        // a reader on its first record, that record looks like it is skipped, because both
        // traversers call advance() before reading - confirm against the UnboundedReader contract.
        for (UnboundedSource.UnboundedReader<T> reader : this.readers) {
            reader.start();
        }
    }

    /**
     * Emits whatever the traverser currently offers.
     *
     * @return {@code true} only when this processor has no readers at all
     */
    public boolean complete() {
        emitFromTraverser(this.traverser);
        final boolean nothingToRead = this.readers.length == 0;
        return nothingToRead;
    }

    /**
     * Declares this processor as non-cooperative.
     *
     * <p>NOTE(review): presumably because reader calls may block - confirm.
     *
     * @return always {@code false}
     */
    public boolean isCooperative() {
        return false;
    }

    /**
     * Closes every reader that is still open and clears its slot.
     *
     * <p>Safe to call when {@code init} never assigned the readers, and safe to call more than
     * once. All readers are attempted even if closing one of them fails; the first failure is
     * rethrown afterwards with any later ones attached as suppressed exceptions.
     */
    public void close() {
        if (this.readers == null) {
            // Nothing was created, so there is nothing to release.
            return;
        }
        RuntimeException firstFailure = null;
        for (int i = 0; i < this.readers.length; i++) {
            UnboundedSource.UnboundedReader<T> reader = this.readers[i];
            if (reader == null) {
                continue;
            }
            // Clear the slot first so a repeated close() never touches this reader again.
            this.readers[i] = null;
            try {
                stopReader(reader);
            } catch (RuntimeException e) {
                if (firstFailure == null) {
                    firstFailure = e;
                } else {
                    firstFailure.addSuppressed(e);
                }
            }
        }
        if (firstFailure != null) {
            throw firstFailure;
        }
    }

    /**
     * Creates one reader per shard, in list order.
     *
     * @param list shards to create readers for
     * @param pipelineOptions options passed to each reader on creation
     * @return array holding the reader of shard {@code i} at index {@code i}
     */
    @SuppressWarnings("unchecked")
    private static <T, CmT extends UnboundedSource.CheckpointMark> UnboundedSource.UnboundedReader<T>[] createReaders(List<? extends UnboundedSource<T, CmT>> list, PipelineOptions pipelineOptions) {
        UnboundedSource.UnboundedReader<T>[] created = new UnboundedSource.UnboundedReader[list.size()];
        int slot = 0;
        for (UnboundedSource<T, CmT> shard : list) {
            created[slot++] = createReader(pipelineOptions, shard);
        }
        return created;
    }

    /**
     * Builds the initial per-reader watermark table.
     *
     * <p>Decompiler note: access was widened from private.
     *
     * @param i number of slots
     * @return a new array of length {@code i} with every slot set to {@link Long#MIN_VALUE}
     */
    public static long[] initWatermarks(int i) {
        final long[] initial = new long[i];
        for (int slot = 0; slot < initial.length; slot++) {
            initial[slot] = Long.MIN_VALUE;
        }
        return initial;
    }

    /**
     * Creates a reader for one shard without a checkpoint to restore from.
     *
     * <p>Decompiler note: access was widened from private.
     *
     * @param pipelineOptions options passed to the source
     * @param unboundedSource shard to read
     * @return the reader created by the source
     */
    public static <T> UnboundedSource.UnboundedReader<T> createReader(PipelineOptions pipelineOptions, UnboundedSource<T, ?> unboundedSource) {
        try {
            // A bare null is required: the checkpoint parameter has the wildcard-captured type,
            // to which a value cast to UnboundedSource.CheckpointMark is not assignable.
            return unboundedSource.createReader(pipelineOptions, null);
        } catch (IOException e) {
            throw ExceptionUtil.rethrow(e);
        }
    }

    /**
     * Closes a single reader, passing any {@link IOException} to {@code ExceptionUtil.rethrow}.
     *
     * @param unboundedReader reader to close
     */
    private static void stopReader(UnboundedSource.UnboundedReader<?> unboundedReader) {
        try {
            unboundedReader.close();
        } catch (IOException closeFailure) {
            throw ExceptionUtil.rethrow(closeFailure);
        }
    }

    /**
     * Returns the smallest value of a non-empty array.
     *
     * <p>Decompiler note: access was widened from private.
     *
     * @param jArr values to scan; must contain at least one element
     * @return the minimum element
     * @throws ArrayIndexOutOfBoundsException if {@code jArr} is empty
     */
    public static long getMin(long[] jArr) {
        long smallest = jArr[0];
        for (long candidate : jArr) {
            smallest = Math.min(smallest, candidate);
        }
        return smallest;
    }

    /**
     * Creates the meta-supplier for processors reading from the given unbounded source.
     *
     * @param unboundedSource source to read; it is split when the meta-supplier is initialised
     * @param serializablePipelineOptions pipeline options in serializable form
     * @param coder coder used to encode emitted values
     * @param str owner id passed to every created processor
     * @return a new meta-supplier
     */
    public static <T, CmT extends UnboundedSource.CheckpointMark> ProcessorMetaSupplier supplier(UnboundedSource<T, CmT> unboundedSource, SerializablePipelineOptions serializablePipelineOptions, Coder coder, String str) {
        // Diamond instead of a raw type so the source's type arguments are carried through.
        return new UnboundedSourceProcessorMetaSupplier<>(unboundedSource, serializablePipelineOptions, coder, str);
    }
}
