package org.apache.beam.runners.flink.translation.wrappers.streaming.stableinput;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import javax.annotation.Nullable;
import org.apache.beam.runners.core.DoFnRunner;
import org.apache.beam.runners.flink.translation.types.CoderTypeSerializer;
import org.apache.beam.runners.flink.translation.wrappers.streaming.stableinput.BufferedElements;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.runtime.state.OperatorStateBackend;
import org.joda.time.Instant;

/* loaded from: input_file:org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferingDoFnRunner.class */
/**
 * A {@link DoFnRunner} that does not hand elements and timers to the wrapped runner right away.
 *
 * <p>{@link #processElement} and {@link #onTimer} only append to the currently active buffer.
 * {@link #checkpoint(long)} associates that buffer with a checkpoint id and switches to the next
 * buffer. {@link #checkpointCompleted(long)} replays every buffer whose checkpoint id is less than
 * or equal to the completed one through the wrapped runner and then clears it.
 *
 * @param <InputT> input element type of the wrapped runner
 * @param <OutputT> output element type of the wrapped runner
 */
public class BufferingDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, OutputT> {
    /** Runner that buffered elements and timers are replayed through. */
    private final DoFnRunner<InputT, OutputT> underlying;

    /** Union list state holding one entry per checkpointed buffer that was not replayed yet. */
    private final ListState<CheckpointIdentifier> notYetAcknowledgedSnapshots;

    /** Creates (or looks up) the buffering handler that belongs to a buffer index. */
    private final BufferingElementsHandlerFactory bufferingElementsHandlerFactory;

    /** Number of buffers that {@link #currentStateIndex} cycles through. */
    final int numCheckpointBuffers;

    /**
     * Index of the buffer that currently receives elements. It is {@code -1} only during
     * construction when no pending snapshot was found, before the first rotation moves it to 0.
     */
    int currentStateIndex;

    /** Handler of the buffer identified by {@link #currentStateIndex}. */
    private BufferingElementsHandler currentBufferingElementsHandler;

    /** Maps a buffer index to the handler that stores the elements of that buffer. */
    @FunctionalInterface
    private interface BufferingElementsHandlerFactory {
        BufferingElementsHandler get(int stateIndex) throws Exception;
    }

    /**
     * Pairs the index of a buffer with the id of the checkpoint that was taken when the buffer was
     * closed. Instances are kept in the {@code notYetAcknowledgedSnapshots} list state, whose
     * descriptor is created from this class.
     *
     * <p>NOTE(review): a decompiler remark on this declaration stated that the access modifier was
     * changed from package-private; it is kept {@code public} here so existing references keep
     * compiling.
     */
    public static class CheckpointIdentifier {
        /** Index of the buffer that holds the elements covered by the checkpoint. */
        final int internalId;

        /** Id of the checkpoint as passed to {@link BufferingDoFnRunner#checkpoint(long)}. */
        final long checkpointId;

        /**
         * @param i buffer index
         * @param j checkpoint id
         */
        CheckpointIdentifier(int i, long j) {
            this.internalId = i;
            this.checkpointId = j;
        }
    }

    /**
     * Creates a buffering runner around {@code doFnRunner}.
     *
     * @param doFnRunner runner that buffered data is eventually replayed through
     * @param str prefix of the state names used for the buffers; the buffer index is appended
     * @param coder first argument handed to {@code BufferedElements.Coder} (presumably the element
     *     coder; confirm against that class)
     * @param coder2 second argument handed to {@code BufferedElements.Coder} (presumably the window
     *     coder; confirm against that class)
     * @param operatorStateBackend backend holding the pending-snapshot list and, when no keyed
     *     backend is given, the buffers themselves
     * @param keyedStateBackend backend for the buffers, or {@code null} to use operator state
     * @param i maximum number of concurrent checkpoints; must be greater than 0 and less than
     *     {@link Short#MAX_VALUE}
     * @return the new runner
     * @throws IllegalArgumentException if {@code i} is out of bounds
     * @throws Exception if accessing the state backends fails
     */
    public static <InputT, OutputT> BufferingDoFnRunner<InputT, OutputT> create(DoFnRunner<InputT, OutputT> doFnRunner, String str, Coder coder, Coder coder2, OperatorStateBackend operatorStateBackend, @Nullable KeyedStateBackend<Object> keyedStateBackend, int i) throws Exception {
        return new BufferingDoFnRunner<>(doFnRunner, str, coder, coder2, operatorStateBackend, keyedStateBackend, i);
    }

    private BufferingDoFnRunner(DoFnRunner<InputT, OutputT> underlying, String stateNamePrefix, Coder coder, Coder coder2, OperatorStateBackend operatorStateBackend, @Nullable KeyedStateBackend keyedStateBackend, int maxConcurrentCheckpoints) throws Exception {
        Preconditions.checkArgument(
                maxConcurrentCheckpoints > 0 && maxConcurrentCheckpoints < Short.MAX_VALUE,
                "Maximum number of concurrent checkpoints not within the bounds of 0 and %s",
                Short.MAX_VALUE);
        this.underlying = underlying;
        this.notYetAcknowledgedSnapshots =
                operatorStateBackend.getUnionListState(
                        new ListStateDescriptor<>("notYetAcknowledgedSnapshots", CheckpointIdentifier.class));
        this.bufferingElementsHandlerFactory = stateIndex -> {
            // Every buffer gets its own list state, named by prefix + buffer index.
            ListStateDescriptor<BufferedElement> bufferDescriptor =
                    new ListStateDescriptor<>(
                            stateNamePrefix + stateIndex,
                            new CoderTypeSerializer<>(new BufferedElements.Coder(coder, coder2)));
            if (keyedStateBackend != null) {
                return KeyedBufferingElementsHandler.create(keyedStateBackend, bufferDescriptor);
            }
            return NonKeyedBufferingElementsHandler.create(operatorStateBackend.getListState(bufferDescriptor));
        };
        // initializeState also sets currentStateIndex, which the rotation below depends on.
        this.numCheckpointBuffers = initializeState(maxConcurrentCheckpoints);
        this.currentBufferingElementsHandler = this.bufferingElementsHandlerFactory.get(rotateAndGetStateIndex());
    }

    /**
     * Sets {@link #currentStateIndex} from the pending snapshots found in state and computes the
     * number of buffers to cycle through.
     *
     * @param maxConcurrentCheckpoints configured maximum number of concurrent checkpoints
     * @return {@code max(maxConcurrentCheckpoints, highest pending buffer index) + 1}
     * @throws Exception if reading the pending-snapshot state fails
     */
    private int initializeState(int maxConcurrentCheckpoints) throws Exception {
        List<CheckpointIdentifier> pendingSnapshots = new ArrayList<>();
        Iterables.addAll(pendingSnapshots, this.notYetAcknowledgedSnapshots.get());

        int highestIndex = 0;
        for (CheckpointIdentifier pending : pendingSnapshots) {
            highestIndex = Math.max(highestIndex, pending.internalId);
        }

        // Continue after the buffer of the last listed snapshot; -1 makes the first rotation yield 0.
        int lastUsedIndex = -1;
        if (!pendingSnapshots.isEmpty()) {
            lastUsedIndex = pendingSnapshots.get(pendingSnapshots.size() - 1).internalId;
        }
        this.currentStateIndex = lastUsedIndex;

        // Taking the highest pending index into account keeps every index found in state below the
        // modulus used by rotateAndGetStateIndex().
        return Math.max(maxConcurrentCheckpoints, highestIndex) + 1;
    }

    /** Does nothing; bundles of the wrapped runner are started in {@link #checkpointCompleted}. */
    public void startBundle() {
    }

    /** Appends the element to the active buffer instead of processing it. */
    public void processElement(WindowedValue<InputT> windowedValue) {
        this.currentBufferingElementsHandler.buffer(new BufferedElements.Element(windowedValue));
    }

    /**
     * Appends the timer firing to the active buffer instead of processing it. All arguments are
     * forwarded unchanged and in the same order to {@code BufferedElements.Timer}.
     */
    public void onTimer(String str, String str2, BoundedWindow boundedWindow, Instant instant, Instant instant2, TimeDomain timeDomain) {
        this.currentBufferingElementsHandler.buffer(new BufferedElements.Timer(str, str2, boundedWindow, instant, instant2, timeDomain));
    }

    /** Does nothing; bundles of the wrapped runner are finished in {@link #checkpointCompleted}. */
    public void finishBundle() {
    }

    /** Returns the {@link DoFn} of the wrapped runner. */
    public DoFn<InputT, OutputT> getFn() {
        return this.underlying.getFn();
    }

    /**
     * Records the active buffer as belonging to checkpoint {@code j} and switches to the next
     * buffer for subsequent elements.
     *
     * @param j id of the checkpoint being taken
     * @throws Exception if updating state or obtaining the next buffer fails
     */
    public void checkpoint(long j) throws Exception {
        // The index must be read before rotating so the entry points at the buffer just closed.
        addToBeAcknowledgedCheckpoint(j, getStateIndex());
        this.currentBufferingElementsHandler = this.bufferingElementsHandlerFactory.get(rotateAndGetStateIndex());
    }

    /**
     * Replays and clears every buffer whose checkpoint id is less than or equal to {@code j}, in
     * ascending checkpoint-id order. A bundle is started and finished on the wrapped runner only
     * for buffers that contain at least one element.
     *
     * @param j id of the completed checkpoint
     * @throws Exception if state access or processing in the wrapped runner fails
     */
    public void checkpointCompleted(long j) throws Exception {
        for (CheckpointIdentifier acknowledged : gatherToBeAcknowledgedCheckpoints(j)) {
            BufferingElementsHandler handler = this.bufferingElementsHandlerFactory.get(acknowledged.internalId);
            Iterator<BufferedElement> buffered = handler.getElements().iterator();
            boolean hasElements = buffered.hasNext();
            if (hasElements) {
                this.underlying.startBundle();
            }
            while (buffered.hasNext()) {
                buffered.next().processWith(this.underlying);
            }
            if (hasElements) {
                this.underlying.finishBundle();
            }
            handler.clear();
        }
    }

    /** Appends a (buffer index, checkpoint id) entry to the pending-snapshot state. */
    private void addToBeAcknowledgedCheckpoint(long checkpointId, int stateIndex) throws Exception {
        this.notYetAcknowledgedSnapshots.addAll(Collections.singletonList(new CheckpointIdentifier(stateIndex, checkpointId)));
    }

    /**
     * Removes all entries with a checkpoint id less than or equal to {@code completedCheckpointId}
     * from the pending-snapshot state and returns them.
     *
     * @param completedCheckpointId id of the completed checkpoint
     * @return the removed entries, sorted by ascending checkpoint id
     * @throws Exception if reading or updating the state fails
     */
    private List<CheckpointIdentifier> gatherToBeAcknowledgedCheckpoints(long completedCheckpointId) throws Exception {
        List<CheckpointIdentifier> acknowledged = new ArrayList<>();
        List<CheckpointIdentifier> stillPending = new ArrayList<>();
        for (CheckpointIdentifier snapshot : this.notYetAcknowledgedSnapshots.get()) {
            if (snapshot.checkpointId <= completedCheckpointId) {
                acknowledged.add(snapshot);
            } else {
                stillPending.add(snapshot);
            }
        }
        // The state is rewritten with the remaining entries before the caller replays anything.
        this.notYetAcknowledgedSnapshots.update(stillPending);
        acknowledged.sort(Comparator.comparingLong(id -> id.checkpointId));
        return acknowledged;
    }

    /** Advances {@link #currentStateIndex} by one, wrapping at {@link #numCheckpointBuffers}. */
    private int rotateAndGetStateIndex() {
        this.currentStateIndex = (this.currentStateIndex + 1) % this.numCheckpointBuffers;
        return this.currentStateIndex;
    }

    /** Returns the index of the buffer that currently receives elements. */
    private int getStateIndex() {
        return this.currentStateIndex;
    }
}
