/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.connectors.kafka;

import java.util.Properties;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase;
import org.apache.flink.streaming.connectors.kafka.KafkaTableSink;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
import org.apache.flink.streaming.util.serialization.SerializationSchema;
import org.apache.flink.table.api.Types;
import org.apache.flink.types.Row;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Matchers;
import org.mockito.Mockito;

public abstract class KafkaTableSinkTestBase {
    private static final String TOPIC = "testTopic";
    protected static final String[] FIELD_NAMES = new String[]{"field1", "field2"};
    private static final TypeInformation[] FIELD_TYPES = new TypeInformation[]{Types.INT(), Types.STRING()};
    private static final FlinkKafkaPartitioner<Row> PARTITIONER = new CustomPartitioner();
    private static final Properties PROPERTIES = KafkaTableSinkTestBase.createSinkProperties();
    private final FlinkKafkaProducerBase<Row> PRODUCER = new FlinkKafkaProducerBase<Row>("testTopic", (KeyedSerializationSchema)new KeyedSerializationSchemaWrapper(this.getSerializationSchema()), PROPERTIES, PARTITIONER){

        protected void flush() {
        }
    };

    @Test
    public void testKafkaTableSink() throws Exception {
        DataStream dataStream = (DataStream)Mockito.mock(DataStream.class);
        KafkaTableSink kafkaTableSink = (KafkaTableSink)Mockito.spy((Object)this.createTableSink());
        kafkaTableSink.emitDataStream(dataStream);
        ((DataStream)Mockito.verify((Object)dataStream)).addSink((SinkFunction)Matchers.eq(this.PRODUCER));
        ((KafkaTableSink)Mockito.verify((Object)kafkaTableSink)).createKafkaProducer((String)Matchers.eq((Object)TOPIC), (Properties)Matchers.eq((Object)PROPERTIES), (SerializationSchema)Mockito.any(this.getSerializationSchema().getClass()), (FlinkKafkaPartitioner)Matchers.eq(PARTITIONER));
    }

    @Test
    public void testConfiguration() {
        KafkaTableSink kafkaTableSink = this.createTableSink();
        KafkaTableSink newKafkaTableSink = kafkaTableSink.configure(FIELD_NAMES, FIELD_TYPES);
        Assert.assertNotSame((Object)kafkaTableSink, (Object)newKafkaTableSink);
        Assert.assertArrayEquals((Object[])FIELD_NAMES, (Object[])newKafkaTableSink.getFieldNames());
        Assert.assertArrayEquals((Object[])FIELD_TYPES, (Object[])newKafkaTableSink.getFieldTypes());
        Assert.assertEquals((Object)new RowTypeInfo(FIELD_TYPES), (Object)newKafkaTableSink.getOutputType());
    }

    protected abstract KafkaTableSink createTableSink(String var1, Properties var2, FlinkKafkaPartitioner<Row> var3, FlinkKafkaProducerBase<Row> var4);

    protected abstract SerializationSchema<Row> getSerializationSchema();

    private KafkaTableSink createTableSink() {
        return this.createTableSink(TOPIC, PROPERTIES, PARTITIONER, this.PRODUCER);
    }

    private static Properties createSinkProperties() {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:12345");
        return properties;
    }

    private static class CustomPartitioner
    extends FlinkKafkaPartitioner<Row> {
        private CustomPartitioner() {
        }

        public int partition(Row record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
            return 0;
        }
    }
}

