package org.apache.flink.connector.jdbc.xa;

import java.time.Duration;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.connector.jdbc.JdbcExactlyOnceOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.testutils.DatabaseTest;
import org.apache.flink.connector.jdbc.testutils.TableManaged;
import org.apache.flink.connector.jdbc.testutils.tables.templates.BooksTable;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.test.junit5.MiniClusterExtension;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/connector/jdbc/xa/JdbcExactlyOnceSinkE2eTest.class */
public abstract class JdbcExactlyOnceSinkE2eTest implements DatabaseTest {
    protected static final int PARALLELISM = 4;
    protected static final long CHECKPOINT_TIMEOUT_MS = 5000;
    protected static final long TASK_CANCELLATION_TIMEOUT_MS = 10000;
    private static final Logger LOG = LoggerFactory.getLogger(JdbcExactlyOnceSinkE2eTest.class);
    private static final BooksTable OUTPUT_TABLE = new BooksTable("XaTable");

    @RegisterExtension
    static final MiniClusterExtension MINI_CLUSTER = createCluster();
    private static final Map<Integer, CountDownLatch> activeSources = new ConcurrentHashMap();
    private static final Map<Integer, CountDownLatch> inactiveMappers = new ConcurrentHashMap();

    /* loaded from: input_file:org/apache/flink/connector/jdbc/xa/JdbcExactlyOnceSinkE2eTest$FailingMapper.class */
    private static class FailingMapper extends RichMapFunction<BooksTable.BookEntry, BooksTable.BookEntry> {
        private final int failingMessage;
        private transient AtomicInteger counter;

        public FailingMapper(int i) {
            this.failingMessage = i;
        }

        public void open(Configuration configuration) throws Exception {
            ((CountDownLatch) JdbcExactlyOnceSinkE2eTest.inactiveMappers.computeIfAbsent(Integer.valueOf(getRuntimeContext().getAttemptNumber()), num -> {
                return new CountDownLatch(getRuntimeContext().getNumberOfParallelSubtasks());
            })).countDown();
            this.counter = new AtomicInteger(this.failingMessage);
            JdbcExactlyOnceSinkE2eTest.LOG.debug("Mapper will fail after {} records.", Integer.valueOf(this.failingMessage));
        }

        public BooksTable.BookEntry map(BooksTable.BookEntry bookEntry) throws Exception {
            if (this.counter.getAndDecrement() > 0) {
                return bookEntry;
            }
            JdbcExactlyOnceSinkE2eTest.LOG.debug("Mapper failing intentionally.");
            throw new TestException();
        }
    }

    /* loaded from: input_file:org/apache/flink/connector/jdbc/xa/JdbcExactlyOnceSinkE2eTest$TestEntrySource.class */
    private static class TestEntrySource extends RichParallelSourceFunction<BooksTable.BookEntry> implements CheckpointListener, CheckpointedFunction {
        private final int numElements;
        private final int numElementsPerCheckpoint;
        private volatile transient ListState<SourceRange> ranges;
        private volatile long lastCheckpointId;
        private volatile boolean lastSnapshotConfirmed;
        private volatile boolean running;

        /* JADX INFO: Access modifiers changed from: private */
        /* loaded from: input_file:org/apache/flink/connector/jdbc/xa/JdbcExactlyOnceSinkE2eTest$TestEntrySource$SourceRange.class */
        public static final class SourceRange {
            private int from;
            private final int to;

            private SourceRange(int i, int i2) {
                this.from = i;
                this.to = i2;
            }

            public static SourceRange forSubtask(int i, int i2) {
                return new SourceRange(i * i2, (i + 1) * i2);
            }

            public void advance() {
                Preconditions.checkState(this.from < this.to);
                this.from++;
            }

            public String toString() {
                return String.format("%d..%d", Integer.valueOf(this.from), Integer.valueOf(this.to));
            }
        }

        private TestEntrySource(int i, int i2) {
            this.lastCheckpointId = -1L;
            this.lastSnapshotConfirmed = false;
            this.running = true;
            this.numElements = i;
            this.numElementsPerCheckpoint = i2;
        }

        public void run(SourceFunction.SourceContext<BooksTable.BookEntry> sourceContext) throws Exception {
            try {
                waitForConsumers();
                Iterator it = ((Iterable) this.ranges.get()).iterator();
                while (it.hasNext()) {
                    emitRange((SourceRange) it.next(), sourceContext);
                }
                waitOtherSources();
            } finally {
                getActiveSources().countDown();
            }
        }

        private void waitForConsumers() throws InterruptedException {
            sleep(() -> {
                return Boolean.valueOf(!JdbcExactlyOnceSinkE2eTest.inactiveMappers.containsKey(Integer.valueOf(getRuntimeContext().getAttemptNumber())));
            });
            ((CountDownLatch) JdbcExactlyOnceSinkE2eTest.inactiveMappers.get(Integer.valueOf(getRuntimeContext().getAttemptNumber()))).await();
        }

        private void emitRange(SourceRange sourceRange, SourceFunction.SourceContext<BooksTable.BookEntry> sourceContext) {
            int i = sourceRange.from;
            while (true) {
                int i2 = i;
                if (i2 >= sourceRange.to || !this.running) {
                    return;
                }
                int min = Math.min(sourceRange.to - i2, this.numElementsPerCheckpoint);
                emit(i2, min, sourceRange, sourceContext);
                i = i2 + min;
            }
        }

        private void emit(int i, int i2, SourceRange sourceRange, SourceFunction.SourceContext<BooksTable.BookEntry> sourceContext) {
            synchronized (sourceContext.getCheckpointLock()) {
                this.lastCheckpointId = -1L;
                this.lastSnapshotConfirmed = false;
                for (int i3 = i; i3 < i + i2 && this.running; i3++) {
                    try {
                        sourceContext.collect(new BooksTable.BookEntry(Integer.valueOf(i3), Integer.toString(i3), Integer.toString(i3), Double.valueOf(i3), Integer.valueOf(i3)));
                        sourceRange.advance();
                    } catch (Exception e) {
                        if (!ExceptionUtils.findThrowable(e, TestException.class).isPresent()) {
                            JdbcExactlyOnceSinkE2eTest.LOG.warn("Exception during record emission", e);
                        }
                        throw e;
                    }
                }
            }
            sleep(() -> {
                return Boolean.valueOf(!this.lastSnapshotConfirmed);
            });
        }

        public void cancel() {
            this.running = false;
        }

        public void notifyCheckpointComplete(long j) {
            if (this.lastCheckpointId <= -1 || j < this.lastCheckpointId) {
                return;
            }
            this.lastSnapshotConfirmed = true;
        }

        public void open(Configuration configuration) throws Exception {
            JdbcExactlyOnceSinkE2eTest.activeSources.putIfAbsent(Integer.valueOf(getRuntimeContext().getAttemptNumber()), new CountDownLatch(getRuntimeContext().getNumberOfParallelSubtasks()));
        }

        public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {
            this.ranges = functionInitializationContext.getOperatorStateStore().getListState(new ListStateDescriptor("SourceState", SourceRange.class));
            if (!functionInitializationContext.isRestored()) {
                this.ranges.update(Collections.singletonList(SourceRange.forSubtask(getRuntimeContext().getIndexOfThisSubtask(), this.numElements)));
            }
            JdbcExactlyOnceSinkE2eTest.LOG.debug("Source initialized with ranges: {}", this.ranges.get());
        }

        public void snapshotState(FunctionSnapshotContext functionSnapshotContext) {
            this.lastCheckpointId = functionSnapshotContext.getCheckpointId();
        }

        private void sleep(Supplier<Boolean> supplier) {
            while (supplier.get().booleanValue() && this.running && !Thread.currentThread().isInterrupted() && haveActiveSources()) {
                try {
                    Thread.sleep(10L);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    ExceptionUtils.rethrow(e);
                }
            }
        }

        private void waitOtherSources() throws InterruptedException {
            while (this.running && haveActiveSources()) {
                getActiveSources().await(100L, TimeUnit.MILLISECONDS);
            }
        }

        private CountDownLatch getActiveSources() {
            return (CountDownLatch) JdbcExactlyOnceSinkE2eTest.activeSources.get(Integer.valueOf(getRuntimeContext().getAttemptNumber()));
        }

        private boolean haveActiveSources() {
            return getActiveSources().getCount() > 0;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/connector/jdbc/xa/JdbcExactlyOnceSinkE2eTest$TestException.class */
    public static final class TestException extends Exception {
        public TestException() {
            super("java.lang.Exception: Artificial failure", null, true, false);
        }
    }

    private static MiniClusterExtension createCluster() {
        Configuration configuration = new Configuration();
        configuration.set(JobManagerOptions.EXECUTION_FAILOVER_STRATEGY, "full");
        configuration.set(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT, Long.valueOf(TASK_CANCELLATION_TIMEOUT_MS));
        configuration.set(ExecutionCheckpointingOptions.CHECKPOINTING_TIMEOUT, Duration.ofMillis(CHECKPOINT_TIMEOUT_MS));
        return new MiniClusterExtension(new MiniClusterResourceConfiguration.Builder().setNumberTaskManagers(PARALLELISM).setConfiguration(configuration).build());
    }

    @Override // org.apache.flink.connector.jdbc.testutils.DatabaseTest
    public List<TableManaged> getManagedTables() {
        return Collections.singletonList(OUTPUT_TABLE);
    }

    @AfterEach
    public void after() {
        activeSources.clear();
        inactiveMappers.clear();
    }

    @Test
    void testInsert() throws Exception {
        long currentTimeMillis = System.currentTimeMillis();
        LOG.info("Test insert for {}", getMetadata().getVersion());
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setParallelism(PARALLELISM);
        executionEnvironment.setRestartStrategy(RestartStrategies.fixedDelayRestart((50 / 10) * 2, Time.milliseconds(100L)));
        executionEnvironment.getConfig().setAutoWatermarkInterval(0L);
        executionEnvironment.enableCheckpointing(50L, CheckpointingMode.EXACTLY_ONCE);
        executionEnvironment.addSource(new TestEntrySource(50, 10)).setParallelism(PARALLELISM).map(new FailingMapper(10 + (10 / 2))).addSink(JdbcSink.exactlyOnceSink(OUTPUT_TABLE.getInsertIntoQuery(), OUTPUT_TABLE.getStatementBuilder(), JdbcExecutionOptions.builder().withMaxRetries(0).build(), JdbcExactlyOnceOptions.builder().withTransactionPerConnection(true).build(), getMetadata().getXaSourceSupplier()));
        executionEnvironment.execute();
        List<Integer> insertedIds = JdbcXaFacadeTestHelper.getInsertedIds(getMetadata(), OUTPUT_TABLE.getTableName());
        Assertions.assertThat(insertedIds).as(insertedIds.toString(), new Object[0]).containsExactlyInAnyOrderElementsOf((List) IntStream.range(0, 50 * PARALLELISM).boxed().collect(Collectors.toList()));
        LOG.info("Test insert for {} finished in {} ms.", getMetadata().getVersion(), Long.valueOf(System.currentTimeMillis() - currentTimeMillis));
    }
}
