package org.apache.flink.streaming.connectors.kafka;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.operators.StreamSink;
import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
import org.apache.flink.streaming.connectors.kafka.testutils.FakeStandardProducerConfig;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
import scala.concurrent.duration.Deadline;
import scala.concurrent.duration.FiniteDuration;

/* loaded from: input_file:org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBaseTest.class */
public class FlinkKafkaProducerBaseTest {

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBaseTest$DummyFlinkKafkaProducer.class */
    /**
     * Test double for {@link FlinkKafkaProducerBase} that writes to the fixed topic
     * {@code "dummy-topic"} with a Mockito-mocked {@link KeyedSerializationSchema} and hands out a
     * {@link MockProducer} instead of a regular Kafka producer.
     *
     * <p>It additionally raises a caller-visible flag once {@link #snapshotState} has returned, so a
     * test can observe whether the snapshot completed.
     *
     * @param <T> element type accepted by the producer
     */
    public static class DummyFlinkKafkaProducer<T> extends FlinkKafkaProducerBase<T> {
        private static final long serialVersionUID = 1;

        /** Producer created by the most recent {@link #getKafkaProducer} call; not serialized. */
        private transient MockProducer prod;

        /** Set to {@code true} after {@link #snapshotState} has completed. */
        private final AtomicBoolean snapshottingFinished;

        /**
         * Creates a producer that reports snapshot completion through the given flag.
         *
         * @param properties producer configuration forwarded to the base class
         * @param kafkaPartitioner partitioner forwarded to the base class (tests in this file also pass {@code null})
         * @param atomicBoolean flag that is set to {@code true} when a snapshot has finished
         */
        public DummyFlinkKafkaProducer(Properties properties, KafkaPartitioner kafkaPartitioner, AtomicBoolean atomicBoolean) {
            super("dummy-topic", (KeyedSerializationSchema) Mockito.mock(KeyedSerializationSchema.class), properties, kafkaPartitioner);
            this.snapshottingFinished = atomicBoolean;
        }

        /**
         * Creates a producer with a private completion flag that starts out as {@code true}.
         *
         * @param properties producer configuration forwarded to the base class
         * @param kafkaPartitioner partitioner forwarded to the base class (tests in this file also pass {@code null})
         */
        public DummyFlinkKafkaProducer(Properties properties, KafkaPartitioner kafkaPartitioner) {
            this(properties, kafkaPartitioner, new AtomicBoolean(true));
        }

        /**
         * Creates a fresh {@link MockProducer}, remembers it and returns it.
         *
         * @param properties ignored; the mock builds its own configuration
         * @return the newly created mock producer
         */
        protected <K, V> KafkaProducer<K, V> getKafkaProducer(Properties properties) {
            MockProducer created = new MockProducer();
            this.prod = created;
            return created;
        }

        /**
         * Runs the base class snapshot logic and afterwards marks snapshotting as finished.
         *
         * @param functionSnapshotContext context handed through to the base class
         * @throws Exception whatever the base class snapshot logic throws; the flag is not set in that case
         */
        public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
            super.snapshotState(functionSnapshotContext);
            // Only reached when the base class call returned normally.
            this.snapshottingFinished.set(true);
        }

        /** Delegates to {@link MockProducer#flush()} of the remembered producer. */
        protected void flush() {
            this.prod.flush();
        }

        /**
         * @return the producer created by the last {@link #getKafkaProducer} call, or {@code null} if
         *     none has been created yet
         */
        public MockProducer getProducerInstance() {
            return this.prod;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBaseTest$MockProducer.class */
    /**
     * {@link KafkaProducer} stand-in that never sends anything: every callback passed to
     * {@link #send(ProducerRecord, Callback)} is parked in a list until a test completes and
     * removes it.
     *
     * <p>The callback list is touched by more than one thread in this test class (one thread polls
     * it in {@link #flush()} while another clears it), so it is a synchronized list; otherwise the
     * polling thread has no guarantee of ever observing the other thread's update.
     *
     * @param <K> record key type
     * @param <V> record value type
     */
    public static class MockProducer<K, V> extends KafkaProducer<K, V> {
        /** Callbacks of records that were "sent" but not yet confirmed by the test. */
        List<Callback> pendingCallbacks;

        /** Builds the producer from {@link FakeStandardProducerConfig#get()}. */
        public MockProducer() {
            super(FakeStandardProducerConfig.get());
            this.pendingCallbacks = Collections.synchronizedList(new ArrayList<Callback>());
        }

        /**
         * Sending without a callback is not expected by the tests.
         *
         * @throws UnsupportedOperationException always
         */
        public Future<RecordMetadata> send(ProducerRecord<K, V> producerRecord) {
            throw new UnsupportedOperationException("Unexpected");
        }

        /**
         * Records the callback as pending; the record itself is discarded.
         *
         * @param producerRecord ignored
         * @param callback callback to park until the test confirms it
         * @return always {@code null}
         */
        public Future<RecordMetadata> send(ProducerRecord<K, V> producerRecord, Callback callback) {
            this.pendingCallbacks.add(callback);
            return null;
        }

        /**
         * Reports four partitions for any topic, deliberately listed out of order (3, 1, 0, 2).
         *
         * @param str topic name copied into every returned {@link PartitionInfo}
         * @return a new mutable list with the four partition descriptors
         */
        public List<PartitionInfo> partitionsFor(String str) {
            List<PartitionInfo> partitions = new ArrayList<PartitionInfo>(4);
            // Order matters: a test in this file checks what the partitioner receives for this unsorted input.
            for (int partitionId : new int[] {3, 1, 0, 2}) {
                partitions.add(new PartitionInfo(str, partitionId, (Node) null, (Node[]) null, (Node[]) null));
            }
            return partitions;
        }

        /** @return always {@code null}; metrics are not simulated */
        public Map<MetricName, ? extends Metric> metrics() {
            return null;
        }

        /** @return the live (not copied) list of pending callbacks */
        public List<Callback> getPending() {
            return this.pendingCallbacks;
        }

        /**
         * Blocks until the pending list is empty, polling every 10 ms.
         *
         * @throws RuntimeException if the waiting thread is interrupted; the interrupt flag is
         *     restored and the {@link InterruptedException} is attached as the cause
         */
        public void flush() {
            while (!this.pendingCallbacks.isEmpty()) {
                try {
                    Thread.sleep(10L);
                } catch (InterruptedException e) {
                    // Keep the interruption visible to callers further up the stack.
                    Thread.currentThread().interrupt();
                    throw new RuntimeException("Unable to flush producer, task was interrupted", e);
                }
            }
        }
    }

    /**
     * Constructing the producer from an empty {@link Properties} object (so without any
     * {@code bootstrap.servers} entry) must be rejected with an {@link IllegalArgumentException}.
     */
    @Test(expected = IllegalArgumentException.class)
    public void testInstantiationFailsWhenBootstrapServersMissing() throws Exception {
        Properties emptyConfig = new Properties();
        new DummyFlinkKafkaProducer<Object>(emptyConfig, null);
    }

    /**
     * When the supplied configuration names no key/value serializer, constructing the producer is
     * expected to add both entries to the passed-in {@link Properties}, each set to the canonical
     * name of {@link ByteArraySerializer}.
     *
     * <p>Despite the method name, the entries checked are the <em>serializer</em> settings
     * ({@code key.serializer} and {@code value.serializer}).
     */
    @Test
    public void testKeyValueDeserializersSetIfMissing() throws Exception {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:12345");

        // The constructor is invoked only for its side effect on 'properties'.
        new DummyFlinkKafkaProducer<Object>(properties, null);

        String expectedSerializer = ByteArraySerializer.class.getCanonicalName();
        Assert.assertTrue(properties.containsKey("key.serializer"));
        Assert.assertTrue(properties.containsKey("value.serializer"));
        Assert.assertEquals(expectedSerializer, properties.getProperty("key.serializer"));
        // Previously this line re-checked "key.serializer", leaving the value serializer unverified.
        Assert.assertEquals(expectedSerializer, properties.getProperty("value.serializer"));
    }

    /**
     * Opens the producer as subtask 0 of 1 and verifies that the partitioner's {@code open} call
     * receives the partition ids in ascending order {@code {0, 1, 2, 3}}, even though
     * {@link MockProducer#partitionsFor(String)} lists them as 3, 1, 0, 2.
     */
    @Test
    public void testPartitionerOpenedWithDeterminatePartitionList() throws Exception {
        // Runtime context describing a single, non-parallel subtask.
        RuntimeContext singleSubtaskContext = Mockito.mock(RuntimeContext.class);
        Mockito.when(singleSubtaskContext.getIndexOfThisSubtask()).thenReturn(0);
        Mockito.when(singleSubtaskContext.getNumberOfParallelSubtasks()).thenReturn(1);

        KafkaPartitioner partitionerMock = Mockito.mock(KafkaPartitioner.class);
        DummyFlinkKafkaProducer producer = new DummyFlinkKafkaProducer(FakeStandardProducerConfig.get(), partitionerMock);
        producer.setRuntimeContext(singleSubtaskContext);
        producer.open(new Configuration());

        int[] sortedPartitionIds = {0, 1, 2, 3};
        Mockito.verify(partitionerMock).open(0, 1, sortedPartitionIds);
    }

    /**
     * Runs the shared at-least-once scenario with flush-on-checkpoint switched on; the scenario
     * must complete without error within five seconds.
     */
    @Test(timeout = 5000)
    public void testAtLeastOnceProducer() throws Throwable {
        final boolean flushOnCheckpoint = true;
        runAtLeastOnceTest(flushOnCheckpoint);
    }

    /**
     * Runs the shared at-least-once scenario with flush-on-checkpoint switched off. In that
     * configuration the scenario's own assertions are expected to fail, hence the expected
     * {@link AssertionError}.
     */
    @Test(expected = AssertionError.class, timeout = 5000)
    public void testAtLeastOnceProducerFailsIfFlushingDisabled() throws Throwable {
        final boolean flushOnCheckpoint = false;
        runAtLeastOnceTest(flushOnCheckpoint);
    }

    /**
     * Shared scenario behind the at-least-once tests.
     *
     * <p>Steps:
     * <ol>
     *   <li>Feed 100 records through a sink wrapping a {@link DummyFlinkKafkaProducer}; each one
     *       leaves a pending callback in the {@link MockProducer}.</li>
     *   <li>Start a helper thread that waits up to 500 ms (or until notified), asserts that all 100
     *       callbacks are still pending and that the snapshot has not finished, then completes and
     *       clears the callbacks.</li>
     *   <li>Trigger a snapshot on the calling thread, wake the helper, and assert that nothing is
     *       pending any more.</li>
     *   <li>Wait up to five seconds for the helper and rethrow anything it caught.</li>
     * </ol>
     *
     * <p>NOTE(review): presumably, with flushing enabled, the snapshot call blocks in
     * {@code flush()} until the helper has cleared the callbacks; that logic lives in
     * {@code FlinkKafkaProducerBase}, which is not visible here.
     *
     * @param z value passed to {@code setFlushOnCheckpoint} on the producer under test
     * @throws Throwable assertion failures from this thread, or whatever the helper thread caught
     */
    private void runAtLeastOnceTest(boolean z) throws Throwable {
        final AtomicBoolean snapshotCompleted = new AtomicBoolean(false);
        final DummyFlinkKafkaProducer producer = new DummyFlinkKafkaProducer(FakeStandardProducerConfig.get(), null, snapshotCompleted);
        producer.setFlushOnCheckpoint(z);

        OneInputStreamOperatorTestHarness harness = new OneInputStreamOperatorTestHarness(new StreamSink(producer));
        harness.open();

        int recordNo = 0;
        while (recordNo < 100) {
            harness.processElement(new StreamRecord("msg-" + recordNo));
            recordNo++;
        }

        // Single-slot holder so the helper thread can hand a failure back to this thread.
        final Tuple1 helperFailure = new Tuple1((Object) null);
        // The test thread object doubles as the monitor both threads synchronize on.
        final Thread testThread = Thread.currentThread();

        Runnable confirmAllPending = new Runnable() {
            @Override
            public void run() {
                try {
                    List<Callback> unconfirmed = producer.getProducerInstance().getPending();
                    synchronized (testThread) {
                        // Gives the snapshot call on the test thread time to start (and return, if it does not block).
                        testThread.wait(500L);
                    }
                    Assert.assertEquals(100L, unconfirmed.size());
                    Assert.assertFalse("Snapshot method returned before all records were confirmed", snapshotCompleted.get());
                    for (Callback callback : unconfirmed) {
                        callback.onCompletion((RecordMetadata) null, (Exception) null);
                    }
                    unconfirmed.clear();
                } catch (Throwable caught) {
                    helperFailure.f0 = caught;
                }
            }
        };
        Thread helper = new Thread(confirmAllPending);
        helper.start();

        harness.snapshot(0L, 0L);
        synchronized (testThread) {
            testThread.notifyAll();
        }
        Assert.assertEquals(0L, producer.getProducerInstance().getPending().size());

        // Bounded wait for the helper: at most five seconds, checked in 500 ms slices.
        Deadline joinDeadline = FiniteDuration.apply(5L, "s").fromNow();
        while (joinDeadline.hasTimeLeft() && helper.isAlive()) {
            helper.join(500L);
        }
        Assert.assertFalse("Thread A is expected to be finished at this point. If not, the test is prone to fail", helper.isAlive());

        Object caughtByHelper = helperFailure.f0;
        if (caughtByHelper != null) {
            throw (Throwable) caughtByHelper;
        }
        harness.close();
    }
}
