package org.apache.flink.streaming.connectors.kafka;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
import org.apache.flink.streaming.connectors.kafka.testutils.MockRuntimeContext;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.junit.Assert;
import org.junit.Test;
import scala.concurrent.duration.Deadline;
import scala.concurrent.duration.FiniteDuration;

/* loaded from: input_file:org/apache/flink/streaming/connectors/kafka/AtLeastOnceProducerTest.class */
public class AtLeastOnceProducerTest {

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/connectors/kafka/AtLeastOnceProducerTest$MockProducer.class */
    public static class MockProducer<K, V> extends KafkaProducer<K, V> {
        List<Callback> pendingCallbacks;

        private static Properties getFakeProperties() {
            Properties properties = new Properties();
            properties.setProperty("bootstrap.servers", "localhost:12345");
            properties.setProperty("key.serializer", ByteArraySerializer.class.getName());
            properties.setProperty("value.serializer", ByteArraySerializer.class.getName());
            return properties;
        }

        public MockProducer() {
            super(getFakeProperties());
            this.pendingCallbacks = new ArrayList();
        }

        public Future<RecordMetadata> send(ProducerRecord<K, V> producerRecord) {
            throw new UnsupportedOperationException("Unexpected");
        }

        public Future<RecordMetadata> send(ProducerRecord<K, V> producerRecord, Callback callback) {
            this.pendingCallbacks.add(callback);
            return null;
        }

        public List<PartitionInfo> partitionsFor(String str) {
            ArrayList arrayList = new ArrayList();
            arrayList.add(new PartitionInfo(str, 0, (Node) null, (Node[]) null, (Node[]) null));
            return arrayList;
        }

        public Map<MetricName, ? extends Metric> metrics() {
            return null;
        }

        public List<Callback> getPending() {
            return this.pendingCallbacks;
        }

        public void flush() {
            while (this.pendingCallbacks.size() > 0) {
                try {
                    Thread.sleep(10L);
                } catch (InterruptedException e) {
                    throw new RuntimeException("Unable to flush producer, task was interrupted");
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/connectors/kafka/AtLeastOnceProducerTest$TestingKafkaProducer.class */
    public static class TestingKafkaProducer<T> extends FlinkKafkaProducerBase<T> {
        private MockProducer prod;
        private AtomicBoolean snapshottingFinished;

        public TestingKafkaProducer(String str, KeyedSerializationSchema<T> keyedSerializationSchema, Properties properties, AtomicBoolean atomicBoolean) {
            super(str, keyedSerializationSchema, properties, (KafkaPartitioner) null);
            this.snapshottingFinished = atomicBoolean;
        }

        protected <K, V> KafkaProducer<K, V> getKafkaProducer(Properties properties) {
            this.prod = new MockProducer();
            return this.prod;
        }

        public Serializable snapshotState(long j, long j2) {
            Serializable snapshotState = super.snapshotState(j, j2);
            this.snapshottingFinished.set(true);
            return snapshotState;
        }

        protected void flush() {
            this.prod.flush();
        }

        public MockProducer getProducerInstance() {
            return this.prod;
        }
    }

    @Test(timeout = 5000)
    public void testAtLeastOnceProducer() throws Throwable {
        runTest(true);
    }

    @Test(expected = AssertionError.class, timeout = 5000)
    public void ensureTestFails() throws Throwable {
        runTest(false);
    }

    private void runTest(boolean z) throws Throwable {
        Properties properties = new Properties();
        final AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        final TestingKafkaProducer testingKafkaProducer = new TestingKafkaProducer("someTopic", new KeyedSerializationSchemaWrapper(new SimpleStringSchema()), properties, atomicBoolean);
        testingKafkaProducer.setFlushOnCheckpoint(z);
        testingKafkaProducer.setRuntimeContext(new MockRuntimeContext(0, 1));
        testingKafkaProducer.open(new Configuration());
        for (int i = 0; i < 100; i++) {
            testingKafkaProducer.invoke("msg-" + i);
        }
        final Tuple1 tuple1 = new Tuple1((Object) null);
        final Thread currentThread = Thread.currentThread();
        Thread thread = new Thread(new Runnable() { // from class: org.apache.flink.streaming.connectors.kafka.AtLeastOnceProducerTest.1
            @Override // java.lang.Runnable
            public void run() {
                try {
                    List<Callback> pending = testingKafkaProducer.getProducerInstance().getPending();
                    synchronized (currentThread) {
                        currentThread.wait(500L);
                    }
                    Assert.assertEquals(100L, pending.size());
                    Assert.assertFalse("Snapshot method returned before all records were confirmed", atomicBoolean.get());
                    Iterator<Callback> it = pending.iterator();
                    while (it.hasNext()) {
                        it.next().onCompletion((RecordMetadata) null, (Exception) null);
                    }
                    pending.clear();
                } catch (Throwable th) {
                    tuple1.f0 = th;
                }
            }
        });
        thread.start();
        testingKafkaProducer.snapshotState(0L, 0L);
        synchronized (currentThread) {
            currentThread.notifyAll();
        }
        Assert.assertEquals(0L, testingKafkaProducer.getProducerInstance().getPending().size());
        Deadline fromNow = FiniteDuration.apply(5L, "s").fromNow();
        while (fromNow.hasTimeLeft() && thread.isAlive()) {
            thread.join(500L);
        }
        Assert.assertFalse("Thread A is expected to be finished at this point. If not, the test is prone to fail", thread.isAlive());
        if (tuple1.f0 != null) {
            throw ((Throwable) tuple1.f0);
        }
        testingKafkaProducer.close();
    }
}
