package org.apache.flink.table.store.connector;

import java.time.LocalDateTime;
import java.util.function.Function;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.store.file.utils.BlockingIterator;
import org.apache.flink.types.Row;
import org.assertj.core.api.Assertions;
import org.junit.Test;

/**
 * Integration test that walks through a streaming-warehouse user story against the {@code
 * cleaned_trade_order} table:
 *
 * <ol>
 *   <li>a batch job backfills partition {@code dt = '2022-04-14'} with 50 rows, masking the buyer
 *       and the GMV of every verified order with an odd id;
 *   <li>a streaming job continuously inserts freshly generated orders;
 *   <li>while the streaming job is running, a batch {@code INSERT OVERWRITE} repairs the masked
 *       rows of the backfilled partition.
 * </ol>
 */
public class StreamingWarehouseITCase extends ReadWriteTableTestBase {

    /** Number of columns of {@code cleaned_trade_order}; every collected row must have this arity. */
    private static final int CLEANED_ORDER_ARITY = 9;

    /** Pause between two job-status polls while waiting for the streaming job to start. */
    private static final long STATUS_POLL_INTERVAL_MS = 100L;

    /** DDL of the datagen-backed source table; registered identically in both environments. */
    private static final String ORDER_SOURCE_DDL =
            "CREATE TABLE IF NOT EXISTS trade_orders (\n"
                    + "    order_id BIGINT NOT NULL,\n"
                    + "    order_timestamp AS LOCALTIMESTAMP,\n"
                    + "    buyer_id STRING,\n"
                    + "    order_amount DOUBLE,\n"
                    + "    loyalty_discount DOUBLE,\n"
                    + "    shipping_fee DOUBLE,\n"
                    + "    order_verified BOOLEAN,\n"
                    + "    PRIMARY KEY (order_id) NOT ENFORCED\n"
                    + "  )\n"
                    + "WITH (\n"
                    + "    'connector' = 'datagen',\n"
                    + "    'rows-per-second' = '10',\n"
                    + "    'fields.order_id.kind' = 'random',\n"
                    + "    'fields.order_id.min' = '1',\n"
                    + "    'fields.buyer_id.kind' = 'random',\n"
                    + "    'fields.buyer_id.length' = '3',\n"
                    + "    'fields.order_amount.min' = '10',\n"
                    + "    'fields.order_amount.max' = '1000',\n"
                    + "    'fields.loyalty_discount.min' = '0',\n"
                    + "    'fields.loyalty_discount.max' = '10',\n"
                    + "    'fields.shipping_fee.min' = '5',\n"
                    + "    'fields.shipping_fee.max' = '20'\n"
                    + "  );";

    /**
     * {@link String#format} template of the table under test. The two {@code %s} placeholders are
     * the table root path and the Kafka bootstrap servers, in that order.
     */
    private static final String CLEANED_ORDER_DDL_TEMPLATE =
            "CREATE TABLE IF NOT EXISTS cleaned_trade_order (\n"
                    + "    order_id BIGINT NOT NULL,\n"
                    + "    order_timestamp TIMESTAMP (3),\n"
                    + "    buyer_id STRING,\n"
                    + "    order_amount DOUBLE,\n"
                    + "    loyalty_discount DOUBLE,\n"
                    + "    shipping_fee DOUBLE,\n"
                    + "    order_verified BOOLEAN,\n"
                    + "    actual_gmv DOUBLE,\n"
                    + "    dt STRING,\n"
                    + "    PRIMARY KEY (dt, order_id) NOT ENFORCED\n"
                    + "  )\n"
                    + "PARTITIONED BY (dt)\n"
                    + "WITH (\n"
                    + "    'root-path' = '%s',\n"
                    + "    'log.system' = 'kafka',     'kafka.bootstrap.servers' = '%s');";

    /**
     * Batch backfill of partition {@code 2022-04-14}. Verified orders with an odd id get the buyer
     * {@code '404NotFound'} and a GMV of {@code -1}. Never pass this through {@link String#format}:
     * it contains a literal {@code %}.
     */
    private static final String BACKFILL_INSERT =
            "INSERT INTO cleaned_trade_order\n"
                    + "PARTITION (dt = '2022-04-14')\n"
                    + "SELECT order_id,\n"
                    + "  TIMESTAMPADD (\n"
                    + "    HOUR,\n"
                    + "    RAND_INTEGER (24),\n"
                    + "    TO_TIMESTAMP ('2022-04-14', 'yyyy-MM-dd')\n"
                    + "  ) AS order_timestamp,\n"
                    + "  IF (\n"
                    + "    order_verified\n"
                    + "    AND order_id % 2 = 1,\n"
                    + "    '404NotFound',\n"
                    + "    buyer_id\n"
                    + "  ) AS buyer_id,\n"
                    + "  order_amount,\n"
                    + "  loyalty_discount,\n"
                    + "  shipping_fee,\n"
                    + "  order_verified,\n"
                    + "  IF (\n"
                    + "    order_verified\n"
                    + "    AND order_id % 2 = 1,\n"
                    + "    -1,\n"
                    + "    order_amount + shipping_fee - loyalty_discount\n"
                    + "  ) AS actual_gmv\n"
                    + "FROM\n"
                    + "  trade_orders\n"
                    + "  /*+ OPTIONS ('number-of-rows' = '50')  */";

    /** Continuous insert that derives {@code dt} from the generated order timestamp. */
    private static final String STREAMING_INSERT =
            "INSERT INTO cleaned_trade_order\n"
                    + "SELECT order_id,\n"
                    + "  order_timestamp,\n"
                    + "  buyer_id,\n"
                    + "  order_amount,\n"
                    + "  loyalty_discount,\n"
                    + "  shipping_fee,\n"
                    + "  order_verified,\n"
                    + "  order_amount + shipping_fee - loyalty_discount AS actual_gmv,\n"
                    + "  DATE_FORMAT (order_timestamp, 'yyyy-MM-dd') AS dt\n"
                    + "FROM\n"
                    + "  trade_orders";

    /** Batch overwrite that replaces the masked buyer and GMV of partition {@code 2022-04-14}. */
    private static final String BACKFILL_OVERWRITE =
            "INSERT OVERWRITE cleaned_trade_order\n"
                    + "SELECT order_id,\n"
                    + "  order_timestamp,\n"
                    + "  IF (buyer_id = '404NotFound', '_ANONYMOUS_USER_', buyer_id) AS buyer_id,\n"
                    + "  order_amount,\n"
                    + "  loyalty_discount,\n"
                    + "  shipping_fee,\n"
                    + "  order_verified,\n"
                    + "  IF (\n"
                    + "    actual_gmv = -1,\n"
                    + "    order_amount + shipping_fee - loyalty_discount,\n"
                    + "    actual_gmv\n"
                    + "  ) AS actual_gmv,\n"
                    + "  dt\n"
                    + "FROM\n"
                    + "  cleaned_trade_order\n"
                    + "WHERE\n"
                    + "  dt = '2022-04-14';";

    /** Maps a collected 9-column {@link Row} onto a {@link CleanedTradeOrder}, field by position. */
    private static final Function<Row, CleanedTradeOrder> ORDER_CONVERTER =
            row -> {
                assert row != null && row.getArity() == CLEANED_ORDER_ARITY;
                CleanedTradeOrder order = new CleanedTradeOrder();
                order.orderId = (Long) row.getField(0);
                order.orderTimestamp = (LocalDateTime) row.getField(1);
                order.buyerId = (String) row.getField(2);
                order.orderAmount = (Double) row.getField(3);
                order.loyaltyDiscount = (Double) row.getField(4);
                order.shippingFee = (Double) row.getField(5);
                order.orderVerified = (Boolean) row.getField(6);
                order.actualGmv = (Double) row.getField(7);
                order.dt = (String) row.getField(8);
                return order;
            };

    private final StreamTableEnvironment streamTableEnv =
            StreamTableEnvironment.create(buildStreamEnv(1));
    private final StreamTableEnvironment batchTableEnv =
            StreamTableEnvironment.create(buildBatchEnv(1), EnvironmentSettings.inBatchMode());

    /** Plain holder for one row of {@code cleaned_trade_order}; fields follow the column order. */
    public static class CleanedTradeOrder {
        protected Long orderId;
        protected LocalDateTime orderTimestamp;
        protected String buyerId;
        protected Double orderAmount;
        protected Double loyaltyDiscount;
        protected Double shippingFee;
        protected Boolean orderVerified;
        protected Double actualGmv;
        protected String dt;

        private CleanedTradeOrder() {
        }
    }

    /**
     * Runs the backfill / streaming-insert / overwrite scenario described on the class.
     *
     * @throws Exception if a job cannot be submitted or awaited, the streaming job terminates
     *     before it starts running, or collecting results fails
     */
    @Test
    public void testUserStory() throws Exception {
        // Step 1: register the datagen source and the table under test in both environments.
        this.rootPath = TEMPORARY_FOLDER.newFolder().getPath();
        final String cleanedOrderDdl =
                String.format(CLEANED_ORDER_DDL_TEMPLATE, this.rootPath, getBootstrapServers());
        streamTableEnv.executeSql(ORDER_SOURCE_DDL);
        streamTableEnv.executeSql(cleanedOrderDdl);
        batchTableEnv.executeSql(ORDER_SOURCE_DDL);
        batchTableEnv.executeSql(cleanedOrderDdl);

        // Step 2: backfill one day in batch mode and wait for the job to finish.
        batchTableEnv.executeSql(BACKFILL_INSERT).await();

        final BlockingIterator<Row, CleanedTradeOrder> streamIter =
                BlockingIterator.of(
                        streamTableEnv.executeSql("SELECT * FROM cleaned_trade_order").collect(),
                        ORDER_CONVERTER);
        // try-with-resources guarantees cleanup when an assertion fails, and records cleanup
        // errors as suppressed exceptions instead of letting them hide the assertion failure.
        try (AutoCloseable closeStreamIter = streamIter::close) {
            // The streaming read must first deliver the 50 backfilled rows, still masked.
            streamIter.collect(50).stream()
                    .filter(StreamingWarehouseITCase::isVerifiedWithOddId)
                    .forEach(
                            order -> {
                                Assertions.assertThat(order.buyerId).isEqualTo("404NotFound");
                                Assertions.assertThat(order.actualGmv).isEqualTo(-1.0d);
                                Assertions.assertThat(order.dt).isEqualTo("2022-04-14");
                            });

            // Step 3: start the continuous insert of freshly generated orders.
            final JobClient dailyTask =
                    streamTableEnv
                            .executeSql(STREAMING_INSERT)
                            .getJobClient()
                            .orElseThrow(
                                    () ->
                                            new IllegalStateException(
                                                    "Streaming insert did not return a job client"));
            try (AutoCloseable cancelDailyTask = () -> dailyTask.cancel().get()) {
                awaitRunning(dailyTask);

                // Step 4: repair the backfilled partition while the streaming job keeps writing.
                batchTableEnv.executeSql(BACKFILL_OVERWRITE).await();

                // Step 5: everything the streaming read sees from now on must belong to a later
                // day, i.e. the overwrite must not leak into the change stream (20 x 10 rows).
                for (int remaining = 200; remaining > 0; remaining -= 10) {
                    Thread.sleep(1000L);
                    streamIter
                            .collect(10)
                            .forEach(
                                    order ->
                                            Assertions.assertThat(order.dt)
                                                    .isGreaterThan("2022-04-14"));
                }

                // Step 6: a batch read of the backfilled partition must show the repaired rows.
                final BlockingIterator<Row, CleanedTradeOrder> batchIter =
                        BlockingIterator.of(
                                batchTableEnv
                                        .executeSql(
                                                "SELECT * FROM cleaned_trade_order WHERE dt ='2022-04-14'")
                                        .collect(),
                                ORDER_CONVERTER);
                try (AutoCloseable closeBatchIter = batchIter::close) {
                    batchIter.collect(50).stream()
                            .filter(StreamingWarehouseITCase::isVerifiedWithOddId)
                            .forEach(
                                    order -> {
                                        Assertions.assertThat(order.buyerId)
                                                .isEqualTo("_ANONYMOUS_USER_");
                                        Assertions.assertThat(order.actualGmv)
                                                .isEqualTo(
                                                        (order.orderAmount + order.shippingFee)
                                                                - order.loyaltyDiscount);
                                        Assertions.assertThat(order.dt).isEqualTo("2022-04-14");
                                    });
                }
            }
        }
    }

    /** Returns whether the order matches the masking condition used by the backfill SQL. */
    private static boolean isVerifiedWithOddId(CleanedTradeOrder order) {
        return order.orderVerified && order.orderId % 2 == 1;
    }

    /**
     * Blocks until the given job reports {@link JobStatus#RUNNING}, polling with a short pause
     * instead of spinning.
     *
     * @throws IllegalStateException if the job reaches a globally terminal state first, because
     *     it could then never become running and waiting would hang the test
     */
    private static void awaitRunning(JobClient jobClient) throws Exception {
        while (true) {
            JobStatus status = jobClient.getJobStatus().get();
            if (status == JobStatus.RUNNING) {
                return;
            }
            if (status.isGloballyTerminalState()) {
                throw new IllegalStateException(
                        "Streaming insert job ended in state " + status + " before it was running");
            }
            Thread.sleep(STATUS_POLL_INTERVAL_MS);
        }
    }
}
