package org.apache.flink.streaming.connectors.kafka;

import java.io.Serializable;
import java.util.Properties;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.connectors.kafka.internals.TypeUtil;
import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
import org.apache.flink.streaming.util.serialization.SerializationSchema;
import org.apache.flink.types.Row;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Matchers;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.class */
public abstract class KafkaTableSinkTestBase {
    private static final String TOPIC = "testTopic";
    protected static final String[] FIELD_NAMES = {"field1", "field2"};
    private static final TypeInformation[] FIELD_TYPES = TypeUtil.toTypeInfo(new Class[]{Integer.class, String.class});
    private static final KafkaPartitioner<Row> PARTITIONER = new CustomPartitioner();
    private static final Properties PROPERTIES = createSinkProperties();
    private final FlinkKafkaProducerBase<Row> PRODUCER = new FlinkKafkaProducerBase<Row>(TOPIC, new KeyedSerializationSchemaWrapper(getSerializationSchema()), PROPERTIES, PARTITIONER) { // from class: org.apache.flink.streaming.connectors.kafka.KafkaTableSinkTestBase.1
        protected void flush() {
        }
    };

    /* loaded from: input_file:org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase$CustomPartitioner.class */
    private static class CustomPartitioner extends KafkaPartitioner<Row> implements Serializable {
        private CustomPartitioner() {
        }

        public int partition(Row row, byte[] bArr, byte[] bArr2, int i) {
            return 0;
        }
    }

    @Test
    public void testKafkaTableSink() throws Exception {
        DataStream dataStream = (DataStream) Mockito.mock(DataStream.class);
        KafkaTableSink kafkaTableSink = (KafkaTableSink) Mockito.spy(createTableSink());
        kafkaTableSink.emitDataStream(dataStream);
        ((DataStream) Mockito.verify(dataStream)).addSink((SinkFunction) Matchers.eq(this.PRODUCER));
        ((KafkaTableSink) Mockito.verify(kafkaTableSink)).createKafkaProducer((String) Matchers.eq(TOPIC), (Properties) Matchers.eq(PROPERTIES), (SerializationSchema) Mockito.any(getSerializationSchema().getClass()), (KafkaPartitioner) Matchers.eq(PARTITIONER));
    }

    @Test
    public void testConfiguration() {
        KafkaTableSink createTableSink = createTableSink();
        KafkaTableSink configure = createTableSink.configure(FIELD_NAMES, FIELD_TYPES);
        Assert.assertNotSame(createTableSink, configure);
        Assert.assertArrayEquals(FIELD_NAMES, configure.getFieldNames());
        Assert.assertArrayEquals(FIELD_TYPES, configure.getFieldTypes());
        Assert.assertEquals(new RowTypeInfo(FIELD_TYPES), configure.getOutputType());
    }

    protected abstract KafkaTableSink createTableSink(String str, Properties properties, KafkaPartitioner<Row> kafkaPartitioner, FlinkKafkaProducerBase<Row> flinkKafkaProducerBase);

    protected abstract SerializationSchema<Row> getSerializationSchema();

    private KafkaTableSink createTableSink() {
        return createTableSink(TOPIC, PROPERTIES, PARTITIONER, this.PRODUCER);
    }

    private static Properties createSinkProperties() {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:12345");
        return properties;
    }
}
