package org.apache.flink.streaming.connectors.kafka.table;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.connectors.kafka.KafkaTestBaseWithFlink;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.planner.runtime.utils.TableEnvUtil;
import org.apache.flink.test.util.SuccessException;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
/* loaded from: input_file:org/apache/flink/streaming/connectors/kafka/table/KafkaTableTestBase.class */
public abstract class KafkaTableTestBase extends KafkaTestBaseWithFlink {
    private static final String JSON_FORMAT = "json";
    private static final String AVRO_FORMAT = "avro";
    private static final String CSV_FORMAT = "csv";

    @Parameterized.Parameter
    public boolean isLegacyConnector;

    @Parameterized.Parameter(1)
    public String format;
    protected StreamExecutionEnvironment env;
    protected StreamTableEnvironment tEnv;

    /* loaded from: input_file:org/apache/flink/streaming/connectors/kafka/table/KafkaTableTestBase$TestingSinkFunction.class */
    private static final class TestingSinkFunction implements SinkFunction<RowData> {
        private static final long serialVersionUID = 455430015321124493L;
        private static List<String> rows = new ArrayList();
        private final int expectedSize;

        private TestingSinkFunction(int i) {
            this.expectedSize = i;
            rows.clear();
        }

        public void invoke(RowData rowData, SinkFunction.Context context) throws Exception {
            rows.add(rowData.toString());
            if (rows.size() >= this.expectedSize) {
                throw new SuccessException();
            }
        }
    }

    @Parameterized.Parameters(name = "legacy = {0}, format = {1}")
    public static Object[] parameters() {
        return new Object[]{new Object[]{false, JSON_FORMAT}, new Object[]{false, AVRO_FORMAT}, new Object[]{false, CSV_FORMAT}, new Object[]{true, JSON_FORMAT}, new Object[]{true, AVRO_FORMAT}, new Object[]{true, CSV_FORMAT}};
    }

    @Before
    public void setup() {
        this.env = StreamExecutionEnvironment.getExecutionEnvironment();
        this.tEnv = StreamTableEnvironment.create(this.env, EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());
        this.env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
        this.env.setParallelism(1);
    }

    public abstract String factoryIdentifier();

    public abstract String kafkaVersion();

    @Test
    public void testKafkaSourceSink() throws Exception {
        boolean isCausedByJobFinished;
        String str = "tstopic_" + this.format + "_" + this.isLegacyConnector;
        createTestTopic(str, 1, 1);
        String property = standardProps.getProperty("group.id");
        String property2 = standardProps.getProperty("bootstrap.servers");
        this.tEnv.executeSql(!this.isLegacyConnector ? String.format("create table kafka (\n  `computed-price` as price + 1.0,\n  price decimal(38, 18),\n  currency string,\n  log_date date,\n  log_time time(3),\n  log_ts timestamp(3),\n  ts as log_ts + INTERVAL '1' SECOND,\n  watermark for ts as ts\n) with (\n  'connector' = '%s',\n  'topic' = '%s',\n  'properties.bootstrap.servers' = '%s',\n  'properties.group.id' = '%s',\n  'scan.startup.mode' = 'earliest-offset',\n  %s\n)", factoryIdentifier(), str, property2, property, formatOptions()) : String.format("create table kafka (\n  `computed-price` as price + 1.0,\n  price decimal(38, 18),\n  currency string,\n  log_date date,\n  log_time time(3),\n  log_ts timestamp(3),\n  ts as log_ts + INTERVAL '1' SECOND,\n  watermark for ts as ts\n) with (\n  'connector.type' = 'kafka',\n  'connector.version' = '%s',\n  'connector.topic' = '%s',\n  'connector.properties.bootstrap.servers' = '%s',\n  'connector.properties.group.id' = '%s',\n  'connector.startup-mode' = 'earliest-offset',\n  'update-mode' = 'append',\n  %s\n)", kafkaVersion(), str, property2, property, formatOptions()));
        TableEnvUtil.execInsertSqlAndWaitResult(this.tEnv, "INSERT INTO kafka\nSELECT CAST(price AS DECIMAL(10, 2)), currency,  CAST(d AS DATE), CAST(t AS TIME(0)), CAST(ts AS TIMESTAMP(3))\nFROM (VALUES (2.02,'Euro','2019-12-12', '00:00:01', '2019-12-12 00:00:01.001001'), \n  (1.11,'US Dollar','2019-12-12', '00:00:02', '2019-12-12 00:00:02.002001'), \n  (50,'Yen','2019-12-12', '00:00:03', '2019-12-12 00:00:03.004001'), \n  (3.1,'Euro','2019-12-12', '00:00:04', '2019-12-12 00:00:04.005001'), \n  (5.33,'US Dollar','2019-12-12', '00:00:05', '2019-12-12 00:00:05.006001'), \n  (0,'DUMMY','2019-12-12', '00:00:10', '2019-12-12 00:00:10'))\n  AS orders (price, currency, d, t, ts)");
        this.tEnv.toAppendStream(this.tEnv.sqlQuery("SELECT\n  CAST(TUMBLE_END(ts, INTERVAL '5' SECOND) AS VARCHAR),\n  CAST(MAX(log_date) AS VARCHAR),\n  CAST(MAX(log_time) AS VARCHAR),\n  CAST(MAX(ts) AS VARCHAR),\n  COUNT(*),\n  CAST(MAX(price) AS DECIMAL(10, 2))\nFROM kafka\nGROUP BY TUMBLE(ts, INTERVAL '5' SECOND)"), RowData.class).addSink(new TestingSinkFunction(2)).setParallelism(1);
        try {
            this.env.execute("Job_2");
        } finally {
            if (!isCausedByJobFinished) {
            }
            Assert.assertEquals(Arrays.asList("+I(2019-12-12 00:00:05.000,2019-12-12,00:00:03,2019-12-12 00:00:04.004,3,50.00)", "+I(2019-12-12 00:00:10.000,2019-12-12,00:00:05,2019-12-12 00:00:06.006,2,5.33)"), TestingSinkFunction.rows);
            deleteTestTopic(str);
        }
        Assert.assertEquals(Arrays.asList("+I(2019-12-12 00:00:05.000,2019-12-12,00:00:03,2019-12-12 00:00:04.004,3,50.00)", "+I(2019-12-12 00:00:10.000,2019-12-12,00:00:05,2019-12-12 00:00:06.006,2,5.33)"), TestingSinkFunction.rows);
        deleteTestTopic(str);
    }

    private String formatOptions() {
        if (!this.isLegacyConnector) {
            return String.format("'format' = '%s'", this.format);
        }
        String format = String.format("'format.type' = '%s'", this.format);
        return this.format.equals(AVRO_FORMAT) ? format + String.format(", 'format.avro-schema' = '%s'", "{\"type\":\"record\",\"name\":\"row_0\",\"fields\":[{\"name\":\"price\",\"type\":{\"type\":\"bytes\",\"logicalType\":\"decimal\",\"precision\":38,\"scale\":18}},{\"name\":\"currency\",\"type\":[\"string\",\"null\"]},{\"name\":\"log_date\",\"type\":{\"type\":\"int\",\"logicalType\":\"date\"}},{\"name\":\"log_time\",\"type\":{\"type\":\"int\",\"logicalType\":\"time-millis\"}},{\"name\":\"log_ts\",\"type\":{\"type\":\"long\",\"logicalType\":\"timestamp-millis\"}}]}") : format;
    }

    protected static boolean isCausedByJobFinished(Throwable th) {
        if (th instanceof SuccessException) {
            return true;
        }
        if (th.getCause() != null) {
            return isCausedByJobFinished(th.getCause());
        }
        return false;
    }
}
