package org.apache.flink.table.store.connector.sink.global;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;
import org.apache.flink.api.connector.sink2.Committer;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.streaming.runtime.operators.sink.committables.CommitRequestState;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.SerializableSupplier;

/* loaded from: input_file:org/apache/flink/table/store/connector/sink/global/LocalCommitterOperator.class */
public class LocalCommitterOperator<CommT> extends AbstractCommitterOperator<CommT, CommT> {
    // Serialization version identifier of this operator class.
    private static final long serialVersionUID = 1;
    // Serializable factory for the committer; invoked in initializeState.
    private final SerializableSupplier<Committer<CommT>> committerFactory;
    // Committer obtained from committerFactory in initializeState; used by commit and released in close.
    private Committer<CommT> committer;

    /* loaded from: input_file:org/apache/flink/table/store/connector/sink/global/LocalCommitterOperator$CommitRequestImpl.class */
    /**
     * Mutable {@link Committer.CommitRequest} holding one committable, the number of retries
     * requested for it so far, and its current {@link CommitRequestState}.
     *
     * <p>The committer reports the outcome of an attempt through the {@code signal*} and
     * {@code retry*} callbacks. The remaining transitions are driven through
     * {@link #setSelected()} and {@link #setCommittedIfNoError()}.
     */
    public class CommitRequestImpl implements Committer.CommitRequest<CommT> {
        private CommT committable;
        private int numRetries;
        private CommitRequestState state;

        /** Creates a request for the given committable in state {@code RECEIVED}. */
        private CommitRequestImpl(CommT commt) {
            committable = commt;
            state = CommitRequestState.RECEIVED;
        }

        /** Returns whether the current state reports itself as final. */
        public boolean isFinished() {
            return state.isFinalState();
        }

        /** Returns the committable currently attached to this request. */
        public CommT getCommittable() {
            return committable;
        }

        /** Returns how many times a retry has been requested for this request. */
        public int getNumberOfRetries() {
            return numRetries;
        }

        /** Marks the request as {@code FAILED}; the given throwable is not recorded. */
        public void signalFailedWithKnownReason(Throwable th) {
            state = CommitRequestState.FAILED;
        }

        /**
         * Marks the request as {@code FAILED} and then always throws.
         *
         * @throws IllegalStateException wrapping {@code th}
         */
        public void signalFailedWithUnknownReason(Throwable th) {
            state = CommitRequestState.FAILED;
            final String message = "Failed to commit " + committable;
            throw new IllegalStateException(message, th);
        }

        /** Marks the request as {@code RETRY} and increments the retry counter. */
        public void retryLater() {
            state = CommitRequestState.RETRY;
            numRetries += 1;
        }

        /** Replaces the committable and then behaves like {@link #retryLater()}. */
        public void updateAndRetryLater(CommT commt) {
            committable = commt;
            retryLater();
        }

        /** Marks the request as {@code COMMITTED}. */
        public void signalAlreadyCommitted() {
            state = CommitRequestState.COMMITTED;
        }

        /** Resets the state to {@code RECEIVED}. */
        void setSelected() {
            state = CommitRequestState.RECEIVED;
        }

        /** Moves the request to {@code COMMITTED} only if it is still in state {@code RECEIVED}. */
        void setCommittedIfNoError() {
            if (state != CommitRequestState.RECEIVED) {
                // A callback already changed the state during the attempt; keep that outcome.
                return;
            }
            state = CommitRequestState.COMMITTED;
        }
    }

    /**
     * Creates the operator.
     *
     * @param serializableSupplier factory for the committer; checked with
     *     {@code Preconditions.checkNotNull}
     * @param serializableSupplier2 serializer supplier handed to the superclass constructor
     */
    public LocalCommitterOperator(
            SerializableSupplier<Committer<CommT>> serializableSupplier,
            SerializableSupplier<SimpleVersionedSerializer<CommT>> serializableSupplier2) {
        super(serializableSupplier2);
        committerFactory = Preconditions.checkNotNull(serializableSupplier);
    }

    /**
     * Obtains the committer from the factory and then delegates to the superclass.
     *
     * @param stateInitializationContext context forwarded unchanged to the superclass
     * @throws Exception if the superclass initialization fails
     */
    @Override // org.apache.flink.table.store.connector.sink.global.AbstractCommitterOperator
    public void initializeState(StateInitializationContext stateInitializationContext) throws Exception {
        // The committer is created before the superclass runs.
        // NOTE(review): presumably the superclass may call commit() while restoring state - confirm.
        committer = committerFactory.get();
        super.initializeState(stateInitializationContext);
    }

    /**
     * Commits the given committables, retrying until no request is left pending.
     *
     * <p>Each committable is wrapped in a {@link CommitRequestImpl}. Every round resets the
     * pending requests to {@code RECEIVED}, hands a copy of them to the committer, and then marks
     * as committed every request whose state is still {@code RECEIVED}. Requests whose state
     * reports itself as final are dropped. If any remain, the method sleeps and starts another
     * round; the delay starts at 1000 ms and doubles after every round. There is no upper bound
     * on the delay or on the number of rounds.
     *
     * @param z not used by this implementation
     * @param list committables to commit; an empty list returns immediately
     * @throws IOException declared by this method; propagated from the committer
     * @throws InterruptedException propagated from the committer, or thrown if the thread is
     *     interrupted while sleeping between rounds
     */
    @Override // org.apache.flink.table.store.connector.sink.global.AbstractCommitterOperator
    public void commit(boolean z, List<CommT> list) throws IOException, InterruptedException {
        if (list.isEmpty()) {
            return;
        }

        // Typed list: a raw ArrayList would make the lambda parameters Object and the
        // CommitRequestImpl calls below would not type-check.
        List<CommitRequestImpl> pending = new ArrayList<>(list.size());
        for (CommT committable : list) {
            pending.add(new CommitRequestImpl(committable));
        }

        long backoffMillis = 1000;
        while (true) {
            pending.forEach(CommitRequestImpl::setSelected);
            // Pass a copy so the committer cannot alter the list tracked here.
            committer.commit(new ArrayList<>(pending));
            pending.forEach(CommitRequestImpl::setCommittedIfNoError);

            pending.removeIf(CommitRequestImpl::isFinished);
            if (pending.isEmpty()) {
                return;
            }

            Thread.sleep(backoffMillis);
            backoffMillis *= 2;
        }
    }

    /**
     * Returns the given committables unchanged.
     *
     * @param j not used by this implementation
     * @param list committables to pass through
     * @return the same {@code list} instance that was passed in
     */
    @Override // org.apache.flink.table.store.connector.sink.global.AbstractCommitterOperator
    public List<CommT> toCommittables(long j, List<CommT> list) {
        return list;
    }

    /**
     * Closes the committer, if one was created, and then closes the superclass.
     *
     * <p>The superclass is closed even when closing the committer fails or when no committer
     * exists yet. The first exception is rethrown; an exception from the second step is attached
     * to it as a suppressed exception.
     *
     * @throws Exception the first failure raised while closing the committer or the superclass
     */
    @Override // org.apache.flink.table.store.connector.sink.global.AbstractCommitterOperator
    public void close() throws Exception {
        Exception failure = null;

        // The committer is only assigned in initializeState, so it may still be null here.
        if (committer != null) {
            try {
                committer.close();
            } catch (Exception e) {
                failure = e;
            }
        }

        try {
            super.close();
        } catch (Exception e) {
            if (failure == null) {
                failure = e;
            } else {
                failure.addSuppressed(e);
            }
        }

        if (failure != null) {
            throw failure;
        }
    }

    /**
     * Returns a view of {@code commitRequest} that exposes its committable as another type.
     *
     * <p>Every call on the returned request is forwarded to {@code commitRequest}. The committable
     * is converted with {@code function} on each read and with {@code function2} when a
     * replacement committable is supplied.
     *
     * @param commitRequest request that receives all forwarded calls
     * @param function converts the underlying committable to the exposed type
     * @param function2 converts a replacement committable back to the underlying type
     * @param <CommT> committable type of the underlying request
     * @param <NewT> committable type exposed by the returned request
     * @return a request that delegates to {@code commitRequest}
     */
    public static <CommT, NewT> Committer.CommitRequest<NewT> convertCommitRequest(
            final Committer.CommitRequest<CommT> commitRequest,
            final Function<CommT, NewT> function,
            final Function<NewT, CommT> function2) {
        return new Committer.CommitRequest<NewT>() {
            public NewT getCommittable() {
                final CommT underlying = commitRequest.getCommittable();
                return function.apply(underlying);
            }

            public int getNumberOfRetries() {
                return commitRequest.getNumberOfRetries();
            }

            public void signalFailedWithKnownReason(Throwable th) {
                commitRequest.signalFailedWithKnownReason(th);
            }

            public void signalFailedWithUnknownReason(Throwable th) {
                commitRequest.signalFailedWithUnknownReason(th);
            }

            public void retryLater() {
                commitRequest.retryLater();
            }

            public void updateAndRetryLater(NewT newt) {
                final CommT replacement = function2.apply(newt);
                commitRequest.updateAndRetryLater(replacement);
            }

            public void signalAlreadyCommitted() {
                commitRequest.signalAlreadyCommitted();
            }
        };
    }
}
