package org.apache.paimon.flink.sink;

import java.util.ArrayList;
import java.util.List;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.operators.util.SimpleVersionedListState;
import org.apache.paimon.manifest.ManifestCommittable;
import org.apache.paimon.utils.SerializableSupplier;

/* loaded from: input_file:org/apache/paimon/flink/sink/RestoreAndFailCommittableStateManager.class */
/**
 * A {@link CommittableStateManager} that keeps pending {@link ManifestCommittable}s in operator
 * list state and, when state is initialized, commits whatever the {@link Committer} still
 * reports as recoverable and then deliberately fails the job.
 *
 * <p>The failure is intentional: per the exception message, restarting the job is meant to let
 * writers start from the commits that were just made.
 */
public class RestoreAndFailCommittableStateManager implements CommittableStateManager {
    private static final long serialVersionUID = 1;

    /** Supplies the serializer used to encode committables into the raw byte[] list state. */
    private final SerializableSupplier<SimpleVersionedSerializer<ManifestCommittable>> committableSerializer;

    /** Committable list state; assigned in {@link #initializeState}, null before that call. */
    private ListState<ManifestCommittable> streamingCommitterState;

    /**
     * Creates a state manager.
     *
     * @param serializableSupplier supplier of the serializer for {@link ManifestCommittable}s
     */
    public RestoreAndFailCommittableStateManager(SerializableSupplier<SimpleVersionedSerializer<ManifestCommittable>> serializableSupplier) {
        this.committableSerializer = serializableSupplier;
    }

    /**
     * Binds the committable list state, drains everything stored in it, clears it, and hands
     * the drained committables to {@link #recover}.
     *
     * @param stateInitializationContext context providing access to the operator state store
     * @param committer committer used to filter and commit the restored committables
     * @throws Exception if state access or committing fails, or (as a {@link RuntimeException})
     *     after restored committables have been committed
     */
    @Override
    public void initializeState(StateInitializationContext stateInitializationContext, Committer committer) throws Exception {
        // Committables are stored as raw byte[] entries and (de)serialized by the versioned wrapper.
        this.streamingCommitterState =
                new SimpleVersionedListState<>(
                        stateInitializationContext
                                .getOperatorStateStore()
                                .getListState(
                                        new ListStateDescriptor<>(
                                                "streaming_committer_raw_states",
                                                BytePrimitiveArraySerializer.INSTANCE)),
                        this.committableSerializer.get());

        // Copy the state contents out before clearing the state.
        List<ManifestCommittable> restored = new ArrayList<>();
        this.streamingCommitterState.get().forEach(restored::add);
        this.streamingCommitterState.clear();
        recover(restored, committer);
    }

    /**
     * Commits the subset of {@code list} that the committer returns from
     * {@code filterRecoveredCommittables}. Returns normally only when that subset is empty.
     *
     * @param list committables read back from state
     * @param committer committer used to filter and commit
     * @throws Exception if filtering or committing fails; a {@link RuntimeException} is always
     *     thrown after a successful commit of a non-empty subset
     */
    private void recover(List<ManifestCommittable> list, Committer committer) throws Exception {
        List<ManifestCommittable> filterRecoveredCommittables = committer.filterRecoveredCommittables(list);
        if (filterRecoveredCommittables.isEmpty()) {
            return;
        }
        committer.commit(filterRecoveredCommittables);
        throw new RuntimeException("This exception is intentionally thrown after committing the restored checkpoints. By restarting the job we hope that writers can start writing based on these new commits.");
    }

    /**
     * Replaces the contents of the committable list state with {@code list}.
     *
     * @param stateSnapshotContext snapshot context (not used by this implementation)
     * @param list committables to store in state
     * @throws Exception if the state update fails
     */
    @Override
    public void snapshotState(StateSnapshotContext stateSnapshotContext, List<ManifestCommittable> list) throws Exception {
        this.streamingCommitterState.update(list);
    }
}
