package org.apache.flink.table.planner.runtime.utils;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Preconditions;

/* loaded from: input_file:org/apache/flink/table/planner/runtime/utils/FailingCollectionSource.class */
public class FailingCollectionSource<T> implements SourceFunction<T>, CheckpointedFunction, CheckpointListener {
    public static volatile boolean failedBefore = false;
    private static final long serialVersionUID = 1;
    private final TypeSerializer<T> serializer;
    private final byte[] elementsSerialized;
    private final int numElements;
    private volatile int numElementsEmitted;
    private volatile int numElementsToSkip;
    private transient ListState<Integer> checkpointedState;
    private final int failureAfterNumElements;
    private volatile int numSuccessfulCheckpoints;
    private final Map<Long, Integer> checkpointedEmittedNums;
    private volatile boolean isRunning = true;
    private volatile int lastCheckpointedEmittedNum = 0;

    public FailingCollectionSource(TypeSerializer<T> typeSerializer, Iterable<T> iterable, int i) throws IOException {
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        DataOutputViewStreamWrapper dataOutputViewStreamWrapper = new DataOutputViewStreamWrapper(byteArrayOutputStream);
        int i2 = 0;
        try {
            Iterator<T> it = iterable.iterator();
            while (it.hasNext()) {
                typeSerializer.serialize(it.next(), dataOutputViewStreamWrapper);
                i2++;
            }
            this.serializer = typeSerializer;
            this.elementsSerialized = byteArrayOutputStream.toByteArray();
            this.numElements = i2;
            Preconditions.checkArgument(i > 0);
            this.failureAfterNumElements = i;
            this.checkpointedEmittedNums = new HashMap();
        } catch (Exception e) {
            throw new IOException("Serializing the source elements failed: " + e.getMessage(), e);
        }
    }

    public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {
        Preconditions.checkState(this.checkpointedState == null, "The " + getClass().getSimpleName() + " has already been initialized.");
        this.checkpointedState = functionInitializationContext.getOperatorStateStore().getListState(new ListStateDescriptor("from-elements-state", IntSerializer.INSTANCE));
        if (functionInitializationContext.isRestored()) {
            ArrayList arrayList = new ArrayList();
            Iterator<T> it = ((Iterable) this.checkpointedState.get()).iterator();
            while (it.hasNext()) {
                arrayList.add((Integer) it.next());
            }
            Preconditions.checkArgument(arrayList.size() == 1, getClass().getSimpleName() + " retrieved invalid state.");
            this.numElementsToSkip = ((Integer) arrayList.get(0)).intValue();
        }
    }

    /* JADX WARN: Code restructure failed: missing block: B:11:0x0056, code lost:
    
        throw new java.io.IOException("Failed to deserialize an element from the source. If you are using user-defined serialization (Value and Writable types), check the serialization functions.\nSerializer is " + r5.serializer);
     */
    /* JADX WARN: Code restructure failed: missing block: B:15:0x0057, code lost:
    
        r5.numElementsEmitted = r5.numElementsToSkip;
     */
    /* JADX WARN: Code restructure failed: missing block: B:18:0x0063, code lost:
    
        if (r5.isRunning == false) goto L53;
     */
    /* JADX WARN: Code restructure failed: missing block: B:20:0x006e, code lost:
    
        if (r5.numElementsEmitted >= r5.numElements) goto L55;
     */
    /* JADX WARN: Code restructure failed: missing block: B:22:0x0074, code lost:
    
        if (org.apache.flink.table.planner.runtime.utils.FailingCollectionSource.failedBefore != false) goto L24;
     */
    /* JADX WARN: Code restructure failed: missing block: B:23:0x0077, code lost:
    
        java.lang.Thread.sleep(1);
     */
    /* JADX WARN: Code restructure failed: missing block: B:24:0x0080, code lost:
    
        if (r5.numSuccessfulCheckpoints < 1) goto L24;
     */
    /* JADX WARN: Code restructure failed: missing block: B:26:0x008b, code lost:
    
        if (r5.lastCheckpointedEmittedNum < r5.failureAfterNumElements) goto L24;
     */
    /* JADX WARN: Code restructure failed: missing block: B:28:0x008e, code lost:
    
        org.apache.flink.table.planner.runtime.utils.FailingCollectionSource.failedBefore = true;
     */
    /* JADX WARN: Code restructure failed: missing block: B:29:0x009b, code lost:
    
        throw new java.lang.Exception("Artificial Failure");
     */
    /* JADX WARN: Code restructure failed: missing block: B:2:0x001d, code lost:
    
        if (r9 > 0) goto L4;
     */
    /* JADX WARN: Code restructure failed: missing block: B:31:0x009f, code lost:
    
        if (org.apache.flink.table.planner.runtime.utils.FailingCollectionSource.failedBefore != false) goto L45;
     */
    /* JADX WARN: Code restructure failed: missing block: B:33:0x00aa, code lost:
    
        if (r5.numElementsEmitted >= r5.failureAfterNumElements) goto L56;
     */
    /* JADX WARN: Code restructure failed: missing block: B:35:0x0107, code lost:
    
        java.lang.Thread.sleep(1);
     */
    /* JADX WARN: Code restructure failed: missing block: B:39:0x00ad, code lost:
    
        r0 = r5.serializer.deserialize(r0);
     */
    /* JADX WARN: Code restructure failed: missing block: B:40:0x00da, code lost:
    
        r0 = r6.getCheckpointLock();
     */
    /* JADX WARN: Code restructure failed: missing block: B:41:0x00e3, code lost:
    
        monitor-enter(r0);
     */
    /* JADX WARN: Code restructure failed: missing block: B:43:0x00e4, code lost:
    
        r6.collect(r0);
        r5.numElementsEmitted++;
     */
    /* JADX WARN: Code restructure failed: missing block: B:44:0x00f8, code lost:
    
        monitor-exit(r0);
     */
    /* JADX WARN: Code restructure failed: missing block: B:4:0x0022, code lost:
    
        if (r9 <= 0) goto L51;
     */
    /* JADX WARN: Code restructure failed: missing block: B:56:0x00d9, code lost:
    
        throw new java.io.IOException("Failed to deserialize an element from the source. If you are using user-defined serialization (Value and Writable types), check the serialization functions.\nSerializer is " + r5.serializer);
     */
    /* JADX WARN: Code restructure failed: missing block: B:58:?, code lost:
    
        return;
     */
    /* JADX WARN: Code restructure failed: missing block: B:60:0x010e, code lost:
    
        return;
     */
    /* JADX WARN: Code restructure failed: missing block: B:6:0x0025, code lost:
    
        r5.serializer.deserialize(r0);
        r9 = r9 - 1;
     */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    public void run(org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext<T> r6) throws java.lang.Exception {
        /*
            Method dump skipped, instructions count: 271
            To view this dump add '--comments-level debug' option
        */
        throw new UnsupportedOperationException("Method not decompiled: org.apache.flink.table.planner.runtime.utils.FailingCollectionSource.run(org.apache.flink.streaming.api.functions.source.SourceFunction$SourceContext):void");
    }

    public void cancel() {
        this.isRunning = false;
    }

    public int getNumElements() {
        return this.numElements;
    }

    public int getNumElementsEmitted() {
        return this.numElementsEmitted;
    }

    public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
        Preconditions.checkState(this.checkpointedState != null, "The " + getClass().getSimpleName() + " has not been properly initialized.");
        this.checkpointedState.clear();
        this.checkpointedState.add(Integer.valueOf(this.numElementsEmitted));
        this.checkpointedEmittedNums.put(Long.valueOf(functionSnapshotContext.getCheckpointId()), Integer.valueOf(this.numElementsEmitted));
    }

    public void notifyCheckpointComplete(long j) throws Exception {
        this.numSuccessfulCheckpoints++;
        this.lastCheckpointedEmittedNum = this.checkpointedEmittedNums.get(Long.valueOf(j)).intValue();
    }

    public static void reset() {
        failedBefore = false;
    }
}
