package org.apache.flink.streaming.connectors.kafka;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.CatalogTableImpl;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.types.Row;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/streaming/connectors/kafka/KafkaTableTestBase.class */
public abstract class KafkaTableTestBase extends KafkaTestBase {

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/connectors/kafka/KafkaTableTestBase$JobFinishedException.class */
    public static final class JobFinishedException extends RuntimeException {
        private static final long serialVersionUID = -4684689851069516182L;

        private JobFinishedException(String str) {
            super(str);
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/connectors/kafka/KafkaTableTestBase$TestingSinkFunction.class */
    private static final class TestingSinkFunction implements SinkFunction<Row> {
        private static final long serialVersionUID = 455430015321124493L;
        private static List<String> rows = new ArrayList();
        private final int expectedSize;

        private TestingSinkFunction(int i) {
            this.expectedSize = i;
            rows.clear();
        }

        public void invoke(Row row, SinkFunction.Context context) throws Exception {
            rows.add(row.toString());
            if (rows.size() >= this.expectedSize) {
                throw new JobFinishedException("All records are received, job is finished.");
            }
        }
    }

    public abstract String kafkaVersion();

    @Test
    public void testKafkaSourceSink() throws Exception {
        boolean isCausedByJobFinished;
        createTestTopic("tstopic", 1, 1);
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment create = StreamTableEnvironment.create(executionEnvironment, EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());
        executionEnvironment.getConfig().setRestartStrategy(RestartStrategies.noRestart());
        executionEnvironment.setParallelism(1);
        String property = standardProps.getProperty("group.id");
        String property2 = standardProps.getProperty("zookeeper.connect");
        String property3 = standardProps.getProperty("bootstrap.servers");
        TableSchema build = TableSchema.builder().field("computed-price", DataTypes.DECIMAL(38, 18), "price + 1.0").field("price", DataTypes.DECIMAL(38, 18)).field("currency", DataTypes.STRING()).field("log_ts", DataTypes.TIMESTAMP(3)).field("ts", DataTypes.TIMESTAMP(3), "log_ts + INTERVAL '1' SECOND").watermark("ts", "ts", DataTypes.TIMESTAMP(3)).build();
        HashMap hashMap = new HashMap();
        hashMap.put("connector.type", "kafka");
        hashMap.put("connector.topic", "tstopic");
        hashMap.put("connector.version", kafkaVersion());
        hashMap.put("connector.properties.zookeeper.connect", property2);
        hashMap.put("connector.properties.bootstrap.servers", property3);
        hashMap.put("connector.properties.group.id", property);
        hashMap.put("connector.startup-mode", "earliest-offset");
        hashMap.put("format.type", "json");
        hashMap.put("update-mode", "append");
        ((Catalog) create.getCatalog(create.getCurrentCatalog()).get()).createTable(ObjectPath.fromString(create.getCurrentDatabase() + ".kafka"), new CatalogTableImpl(build, hashMap, "comment"), true);
        create.sqlUpdate("INSERT INTO kafka\nSELECT CAST(price AS DECIMAL(10, 2)), currency, CAST(ts AS TIMESTAMP(3))\nFROM (VALUES (2.02,'Euro','2019-12-12 00:00:00.001001'), \n  (1.11,'US Dollar','2019-12-12 00:00:01.002001'), \n  (50,'Yen','2019-12-12 00:00:03.004001'), \n  (3.1,'Euro','2019-12-12 00:00:04.005001'), \n  (5.33,'US Dollar','2019-12-12 00:00:05.006001'), \n  (0,'DUMMY','2019-12-12 00:00:10'))\n  AS orders (price, currency, ts)");
        create.execute("Job_1");
        create.toAppendStream(create.sqlQuery("SELECT\n  CAST(TUMBLE_END(ts, INTERVAL '5' SECOND) AS VARCHAR),\n  CAST(MAX(ts) AS VARCHAR),\n  COUNT(*),\n  CAST(MAX(price) AS DECIMAL(10, 2))\nFROM kafka\nGROUP BY TUMBLE(ts, INTERVAL '5' SECOND)"), Row.class).addSink(new TestingSinkFunction(2)).setParallelism(1);
        try {
            create.execute("Job_2");
        } finally {
            if (!isCausedByJobFinished) {
            }
            Assert.assertEquals(Arrays.asList("2019-12-12 00:00:05.000,2019-12-12 00:00:04.004,3,50.00", "2019-12-12 00:00:10.000,2019-12-12 00:00:06.006,2,5.33"), TestingSinkFunction.rows);
            deleteTestTopic("tstopic");
        }
        Assert.assertEquals(Arrays.asList("2019-12-12 00:00:05.000,2019-12-12 00:00:04.004,3,50.00", "2019-12-12 00:00:10.000,2019-12-12 00:00:06.006,2,5.33"), TestingSinkFunction.rows);
        deleteTestTopic("tstopic");
    }

    private static boolean isCausedByJobFinished(Throwable th) {
        if (th instanceof JobFinishedException) {
            return true;
        }
        if (th.getCause() != null) {
            return isCausedByJobFinished(th.getCause());
        }
        return false;
    }
}
