/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.connectors.kafka;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.testutils.CheckedThread;
import org.apache.flink.core.testutils.MultiShotLatch;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.StreamSink;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.streaming.connectors.kafka.testutils.FakeStandardProducerConfig;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
import org.apache.flink.streaming.util.serialization.SerializationSchema;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.mockito.verification.VerificationMode;

public class FlinkKafkaProducerBaseTest {
    @Test(expected=IllegalArgumentException.class)
    public void testInstantiationFailsWhenBootstrapServersMissing() throws Exception {
        Properties props = new Properties();
        new DummyFlinkKafkaProducer(props, new KeyedSerializationSchemaWrapper((SerializationSchema)new SimpleStringSchema()), null);
    }

    @Test
    public void testKeyValueDeserializersSetIfMissing() throws Exception {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:12345");
        new DummyFlinkKafkaProducer(props, new KeyedSerializationSchemaWrapper((SerializationSchema)new SimpleStringSchema()), null);
        Assert.assertTrue((boolean)props.containsKey("value.serializer"));
        Assert.assertTrue((boolean)props.containsKey("key.serializer"));
        Assert.assertTrue((boolean)props.getProperty("key.serializer").equals(ByteArraySerializer.class.getCanonicalName()));
        Assert.assertTrue((boolean)props.getProperty("key.serializer").equals(ByteArraySerializer.class.getCanonicalName()));
    }

    @Test
    public void testPartitionerInvokedWithDeterminatePartitionList() throws Exception {
        FlinkKafkaPartitioner mockPartitioner = (FlinkKafkaPartitioner)Mockito.mock(FlinkKafkaPartitioner.class);
        RuntimeContext mockRuntimeContext = (RuntimeContext)Mockito.mock(RuntimeContext.class);
        Mockito.when((Object)mockRuntimeContext.getIndexOfThisSubtask()).thenReturn((Object)0);
        Mockito.when((Object)mockRuntimeContext.getNumberOfParallelSubtasks()).thenReturn((Object)1);
        ArrayList<PartitionInfo> mockPartitionsList = new ArrayList<PartitionInfo>(4);
        mockPartitionsList.add(new PartitionInfo("dummy-topic", 3, null, null, null));
        mockPartitionsList.add(new PartitionInfo("dummy-topic", 1, null, null, null));
        mockPartitionsList.add(new PartitionInfo("dummy-topic", 0, null, null, null));
        mockPartitionsList.add(new PartitionInfo("dummy-topic", 2, null, null, null));
        DummyFlinkKafkaProducer producer = new DummyFlinkKafkaProducer(FakeStandardProducerConfig.get(), new KeyedSerializationSchemaWrapper((SerializationSchema)new SimpleStringSchema()), mockPartitioner);
        producer.setRuntimeContext(mockRuntimeContext);
        KafkaProducer<?, ?> mockProducer = producer.getMockKafkaProducer();
        Mockito.when((Object)mockProducer.partitionsFor(Mockito.anyString())).thenReturn(mockPartitionsList);
        Mockito.when((Object)mockProducer.metrics()).thenReturn(null);
        producer.open(new Configuration());
        ((FlinkKafkaPartitioner)Mockito.verify((Object)mockPartitioner, (VerificationMode)Mockito.times((int)1))).open(0, 1);
        producer.invoke("foobar");
        ((FlinkKafkaPartitioner)Mockito.verify((Object)mockPartitioner, (VerificationMode)Mockito.times((int)1))).partition((Object)"foobar", null, "foobar".getBytes(), "dummy-topic", new int[]{0, 1, 2, 3});
    }

    @Test
    public void testAsyncErrorRethrownOnInvoke() throws Throwable {
        DummyFlinkKafkaProducer producer = new DummyFlinkKafkaProducer(FakeStandardProducerConfig.get(), new KeyedSerializationSchemaWrapper((SerializationSchema)new SimpleStringSchema()), null);
        OneInputStreamOperatorTestHarness testHarness = new OneInputStreamOperatorTestHarness((OneInputStreamOperator)new StreamSink(producer));
        testHarness.open();
        testHarness.processElement(new StreamRecord((Object)"msg-1"));
        producer.getPendingCallbacks().get(0).onCompletion(null, new Exception("artificial async exception"));
        try {
            testHarness.processElement(new StreamRecord((Object)"msg-2"));
        }
        catch (Exception e) {
            Assert.assertTrue((boolean)e.getCause().getMessage().contains("artificial async exception"));
            return;
        }
        Assert.fail();
    }

    @Test
    public void testAsyncErrorRethrownOnCheckpoint() throws Throwable {
        DummyFlinkKafkaProducer producer = new DummyFlinkKafkaProducer(FakeStandardProducerConfig.get(), new KeyedSerializationSchemaWrapper((SerializationSchema)new SimpleStringSchema()), null);
        OneInputStreamOperatorTestHarness testHarness = new OneInputStreamOperatorTestHarness((OneInputStreamOperator)new StreamSink(producer));
        testHarness.open();
        testHarness.processElement(new StreamRecord((Object)"msg-1"));
        producer.getPendingCallbacks().get(0).onCompletion(null, new Exception("artificial async exception"));
        try {
            testHarness.snapshot(123L, 123L);
        }
        catch (Exception e) {
            Assert.assertTrue((boolean)e.getCause().getMessage().contains("artificial async exception"));
            return;
        }
        Assert.fail();
    }

    @Test(timeout=5000L)
    public void testAsyncErrorRethrownOnCheckpointAfterFlush() throws Throwable {
        DummyFlinkKafkaProducer producer = new DummyFlinkKafkaProducer(FakeStandardProducerConfig.get(), new KeyedSerializationSchemaWrapper((SerializationSchema)new SimpleStringSchema()), null);
        producer.setFlushOnCheckpoint(true);
        KafkaProducer<?, ?> mockProducer = producer.getMockKafkaProducer();
        final OneInputStreamOperatorTestHarness testHarness = new OneInputStreamOperatorTestHarness((OneInputStreamOperator)new StreamSink(producer));
        testHarness.open();
        testHarness.processElement(new StreamRecord((Object)"msg-1"));
        testHarness.processElement(new StreamRecord((Object)"msg-2"));
        testHarness.processElement(new StreamRecord((Object)"msg-3"));
        ((KafkaProducer)Mockito.verify(mockProducer, (VerificationMode)Mockito.times((int)3))).send((ProducerRecord)Mockito.any(ProducerRecord.class), (Callback)Mockito.any(Callback.class));
        producer.getPendingCallbacks().get(0).onCompletion(null, null);
        CheckedThread snapshotThread = new CheckedThread(){

            public void go() throws Exception {
                testHarness.snapshot(123L, 123L);
            }
        };
        snapshotThread.start();
        producer.getPendingCallbacks().get(1).onCompletion(null, new Exception("artificial async failure for 2nd message"));
        producer.getPendingCallbacks().get(2).onCompletion(null, null);
        try {
            snapshotThread.sync();
        }
        catch (Exception e) {
            Assert.assertTrue((boolean)e.getCause().getMessage().contains("artificial async failure for 2nd message"));
            return;
        }
        Assert.fail();
    }

    @Test(timeout=10000L)
    public void testAtLeastOnceProducer() throws Throwable {
        DummyFlinkKafkaProducer producer = new DummyFlinkKafkaProducer(FakeStandardProducerConfig.get(), new KeyedSerializationSchemaWrapper((SerializationSchema)new SimpleStringSchema()), null);
        producer.setFlushOnCheckpoint(true);
        KafkaProducer<?, ?> mockProducer = producer.getMockKafkaProducer();
        final OneInputStreamOperatorTestHarness testHarness = new OneInputStreamOperatorTestHarness((OneInputStreamOperator)new StreamSink(producer));
        testHarness.open();
        testHarness.processElement(new StreamRecord((Object)"msg-1"));
        testHarness.processElement(new StreamRecord((Object)"msg-2"));
        testHarness.processElement(new StreamRecord((Object)"msg-3"));
        ((KafkaProducer)Mockito.verify(mockProducer, (VerificationMode)Mockito.times((int)3))).send((ProducerRecord)Mockito.any(ProducerRecord.class), (Callback)Mockito.any(Callback.class));
        Assert.assertEquals((long)3L, (long)producer.getPendingSize());
        CheckedThread snapshotThread = new CheckedThread(){

            public void go() throws Exception {
                testHarness.snapshot(123L, 123L);
            }
        };
        snapshotThread.start();
        producer.waitUntilFlushStarted();
        Assert.assertTrue((String)"Snapshot returned before all records were flushed", (boolean)snapshotThread.isAlive());
        producer.getPendingCallbacks().get(0).onCompletion(null, null);
        Assert.assertTrue((String)"Snapshot returned before all records were flushed", (boolean)snapshotThread.isAlive());
        Assert.assertEquals((long)2L, (long)producer.getPendingSize());
        producer.getPendingCallbacks().get(1).onCompletion(null, null);
        Assert.assertTrue((String)"Snapshot returned before all records were flushed", (boolean)snapshotThread.isAlive());
        Assert.assertEquals((long)1L, (long)producer.getPendingSize());
        producer.getPendingCallbacks().get(2).onCompletion(null, null);
        Assert.assertEquals((long)0L, (long)producer.getPendingSize());
        snapshotThread.sync();
        testHarness.close();
    }

    @Test(timeout=5000L)
    public void testDoesNotWaitForPendingRecordsIfFlushingDisabled() throws Throwable {
        DummyFlinkKafkaProducer producer = new DummyFlinkKafkaProducer(FakeStandardProducerConfig.get(), new KeyedSerializationSchemaWrapper((SerializationSchema)new SimpleStringSchema()), null);
        producer.setFlushOnCheckpoint(false);
        KafkaProducer<?, ?> mockProducer = producer.getMockKafkaProducer();
        OneInputStreamOperatorTestHarness testHarness = new OneInputStreamOperatorTestHarness((OneInputStreamOperator)new StreamSink(producer));
        testHarness.open();
        testHarness.processElement(new StreamRecord((Object)"msg"));
        ((KafkaProducer)Mockito.verify(mockProducer, (VerificationMode)Mockito.times((int)1))).send((ProducerRecord)Mockito.any(ProducerRecord.class), (Callback)Mockito.any(Callback.class));
        testHarness.snapshot(123L, 123L);
        testHarness.close();
    }

    private static class DummyFlinkKafkaProducer<T>
    extends FlinkKafkaProducerBase<T> {
        private static final long serialVersionUID = 1L;
        private static final String DUMMY_TOPIC = "dummy-topic";
        private transient KafkaProducer<?, ?> mockProducer = (KafkaProducer)Mockito.mock(KafkaProducer.class);
        private transient List<Callback> pendingCallbacks;
        private transient MultiShotLatch flushLatch;
        private boolean isFlushed;

        DummyFlinkKafkaProducer(Properties producerConfig, KeyedSerializationSchema<T> schema, FlinkKafkaPartitioner partitioner) {
            super(DUMMY_TOPIC, schema, producerConfig, partitioner);
            Mockito.when((Object)this.mockProducer.send((ProducerRecord)Mockito.any(ProducerRecord.class), (Callback)Mockito.any(Callback.class))).thenAnswer((Answer)new Answer<Object>(){

                public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
                    DummyFlinkKafkaProducer.this.pendingCallbacks.add(invocationOnMock.getArgumentAt(1, Callback.class));
                    return null;
                }
            });
            this.pendingCallbacks = new ArrayList<Callback>();
            this.flushLatch = new MultiShotLatch();
        }

        long getPendingSize() {
            if (this.flushOnCheckpoint) {
                return this.numPendingRecords();
            }
            throw new UnsupportedOperationException("getPendingSize not supported when flushing is disabled");
        }

        List<Callback> getPendingCallbacks() {
            return this.pendingCallbacks;
        }

        KafkaProducer<?, ?> getMockKafkaProducer() {
            return this.mockProducer;
        }

        public void snapshotState(FunctionSnapshotContext ctx) throws Exception {
            this.isFlushed = false;
            super.snapshotState(ctx);
            if (this.flushOnCheckpoint && !this.isFlushed) {
                throw new RuntimeException("Flushing is enabled; snapshots should be blocked until all pending records are flushed");
            }
        }

        public void waitUntilFlushStarted() throws Exception {
            this.flushLatch.await();
        }

        protected <K, V> KafkaProducer<K, V> getKafkaProducer(Properties props) {
            return this.mockProducer;
        }

        protected void flush() {
            this.flushLatch.trigger();
            while (this.numPendingRecords() > 0L) {
                try {
                    Thread.sleep(10L);
                }
                catch (InterruptedException e) {
                    throw new RuntimeException("Unable to flush producer, task was interrupted");
                }
            }
            this.isFlushed = true;
        }
    }
}

