/*
 * Decompiled with CFR 0.152.
 */
package io.trino.plugin.kafka;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.io.ByteStreams;
import com.google.inject.Binder;
import com.google.inject.Module;
import com.google.inject.Scopes;
import io.airlift.configuration.ConditionalModule;
import io.airlift.configuration.ConfigurationAwareModule;
import io.airlift.json.JsonCodec;
import io.airlift.log.Logger;
import io.airlift.log.Logging;
import io.airlift.units.Duration;
import io.trino.decoder.DecoderModule;
import io.trino.plugin.kafka.KafkaConfig;
import io.trino.plugin.kafka.KafkaQueryRunnerBuilder;
import io.trino.plugin.kafka.KafkaTopicDescription;
import io.trino.plugin.kafka.KafkaTopicFieldGroup;
import io.trino.plugin.kafka.encoder.EncoderModule;
import io.trino.plugin.kafka.schema.ContentSchemaReader;
import io.trino.plugin.kafka.schema.MapBasedTableDescriptionSupplier;
import io.trino.plugin.kafka.schema.TableDescriptionSupplier;
import io.trino.plugin.kafka.schema.file.FileContentSchemaReader;
import io.trino.plugin.kafka.util.CodecSupplier;
import io.trino.plugin.kafka.util.TestUtils;
import io.trino.plugin.tpch.TpchPlugin;
import io.trino.spi.Plugin;
import io.trino.spi.connector.SchemaTableName;
import io.trino.spi.type.TypeManager;
import io.trino.testing.DistributedQueryRunner;
import io.trino.testing.kafka.TestingKafka;
import io.trino.tpch.TpchTable;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.TimeUnit;

/**
 * Static factory for {@link DistributedQueryRunner} instances wired to a {@link TestingKafka}
 * cluster.
 *
 * <p>The runner created through {@link #builder(TestingKafka)} installs the TPC-H plugin,
 * registers topic descriptions for the requested TPC-H tables plus a fixed set of
 * {@code read_test}/{@code write_test} tables, and copies the TPC-H data into the Kafka-backed
 * tables once the runner is up.
 */
public final class KafkaQueryRunner {
    private static final Logger log = Logger.get(KafkaQueryRunner.class);
    private static final String TPCH_SCHEMA = "tpch";
    /** Value used for the {@code kafka.table-description-supplier} property in tests. */
    private static final String TEST = "test";

    private KafkaQueryRunner() {
    }

    /**
     * Creates a new {@link Builder} bound to the given Kafka test cluster.
     *
     * @param testingKafka the Kafka test cluster the query runner should use
     * @return a fresh builder
     */
    public static Builder builder(TestingKafka testingKafka) {
        return new Builder(testingKafka);
    }

    /**
     * Loads the topic description template stored on the classpath at
     * {@code /<schema>/<table>.json} and rewrites it for the given table.
     *
     * <p>Any data schema referenced by the key or message group is treated as a classpath
     * resource name and replaced by the path of the resolved resource URL.
     *
     * @param table schema and table name identifying the template resource
     * @param topicDescriptionJsonCodec codec used to parse the template
     * @return topic description named after {@code table}, using {@code table.toString()} as the topic name
     * @throws IOException if the template cannot be read
     * @throws NullPointerException if the template or a referenced schema resource is not on the classpath
     */
    private static KafkaTopicDescription createTable(SchemaTableName table, JsonCodec<KafkaTopicDescription> topicDescriptionJsonCodec) throws IOException {
        String fileName = String.format("/%s/%s.json", table.getSchemaName(), table.getTableName());
        KafkaTopicDescription tableTemplate;
        // try-with-resources closes the classpath stream; a null resource is tolerated by the
        // construct itself, so the explicit check is what reports a missing template.
        try (InputStream templateStream = KafkaQueryRunner.class.getResourceAsStream(fileName)) {
            Objects.requireNonNull(templateStream, () -> "Topic description resource not found on classpath: " + fileName);
            tableTemplate = topicDescriptionJsonCodec.fromJson(ByteStreams.toByteArray(templateStream));
        }
        Optional<KafkaTopicFieldGroup> key = tableTemplate.getKey().map(KafkaQueryRunner::withResolvedDataSchema);
        Optional<KafkaTopicFieldGroup> message = tableTemplate.getMessage().map(KafkaQueryRunner::withResolvedDataSchema);
        return new KafkaTopicDescription(table.getTableName(), Optional.of(table.getSchemaName()), table.toString(), key, message);
    }

    /**
     * Copies a field group, replacing its data schema (a classpath resource name) with the path
     * of the resolved resource. Data format and fields are carried over unchanged; the third
     * constructor argument is always passed as empty.
     */
    private static KafkaTopicFieldGroup withResolvedDataSchema(KafkaTopicFieldGroup template) {
        Optional<String> resolvedSchema = template.getDataSchema().map(KafkaQueryRunner::resourcePath);
        return new KafkaTopicFieldGroup(template.getDataFormat(), resolvedSchema, Optional.empty(), template.getFields());
    }

    /** Resolves a classpath resource name to the path component of its URL, failing with a descriptive message if absent. */
    private static String resourcePath(String resourceName) {
        return Objects.requireNonNull(
                KafkaQueryRunner.class.getResource(resourceName),
                () -> "Schema resource not found on classpath: " + resourceName).getPath();
    }

    /**
     * Returns {@code "tpch."} followed by the lower-cased (English locale) TPC-H table name.
     * NOTE(review): not referenced anywhere inside this class.
     */
    private static String kafkaTopicName(TpchTable<?> table) {
        return "tpch." + table.getTableName().toLowerCase(Locale.ENGLISH);
    }

    /**
     * Builds the topic descriptions for the given TPC-H tables, all placed in the
     * {@code tpch} schema.
     *
     * @param typeManager type manager handed to the JSON codec supplier
     * @param tables TPC-H tables to describe
     * @return immutable map from table name to topic description
     * @throws Exception if a description cannot be loaded
     */
    private static Map<SchemaTableName, KafkaTopicDescription> createTpchTopicDescriptions(TypeManager typeManager, Iterable<TpchTable<?>> tables) throws Exception {
        JsonCodec<KafkaTopicDescription> topicDescriptionJsonCodec = new CodecSupplier<KafkaTopicDescription>(KafkaTopicDescription.class, typeManager).get();
        ImmutableMap.Builder<SchemaTableName, KafkaTopicDescription> topicDescriptions = ImmutableMap.builder();
        for (TpchTable<?> table : tables) {
            SchemaTableName tpchTable = new SchemaTableName(TPCH_SCHEMA, table.getTableName());
            // The topic name is the fully qualified table name, e.g. "tpch.orders".
            topicDescriptions.put(TestUtils.loadTpchTopicDescription(topicDescriptionJsonCodec, tpchTable.toString(), tpchTable));
        }
        return topicDescriptions.buildOrThrow();
    }

    /**
     * Builds the topic descriptions for the fixed set of {@code read_test} and
     * {@code write_test} tables whose templates ship as classpath resources.
     *
     * @param typeManager type manager handed to the JSON codec supplier
     * @return immutable map from table name to topic description
     * @throws IOException if a template cannot be read
     */
    private static Map<SchemaTableName, KafkaTopicDescription> createTestTopicDescriptions(TypeManager typeManager) throws IOException {
        List<SchemaTableName> tableNames = ImmutableList.of(
                new SchemaTableName("read_test", "all_datatypes_json"),
                new SchemaTableName("write_test", "all_datatypes_avro"),
                new SchemaTableName("write_test", "all_datatypes_csv"),
                new SchemaTableName("write_test", "all_datatypes_raw"),
                new SchemaTableName("write_test", "all_datatypes_json"));
        JsonCodec<KafkaTopicDescription> topicDescriptionJsonCodec = new CodecSupplier<KafkaTopicDescription>(KafkaTopicDescription.class, typeManager).get();
        ImmutableMap.Builder<SchemaTableName, KafkaTopicDescription> testTopicDescriptions = ImmutableMap.builder();
        for (SchemaTableName tableName : tableNames) {
            testTopicDescriptions.put(tableName, createTable(tableName, topicDescriptionJsonCodec));
        }
        return testTopicDescriptions.buildOrThrow();
    }

    /**
     * Starts a query runner against a newly created {@link TestingKafka} with all TPC-H tables
     * loaded and logs the coordinator base URL.
     *
     * @param args ignored
     * @throws Exception if the query runner cannot be built
     */
    public static void main(String[] args) throws Exception {
        Logging.initialize();
        DistributedQueryRunner queryRunner = KafkaQueryRunner.builder(TestingKafka.create())
                .setTables(TpchTable.getTables())
                .build();
        // NOTE(review): a logger is obtained here, after Logging.initialize(), instead of using the
        // class-level one; presumably intentional, so it is kept as is.
        Logger log = Logger.get(KafkaQueryRunner.class);
        log.info("======== SERVER STARTED ========");
        log.info("\n====\n%s\n====", queryRunner.getCoordinator().getBaseUrl());
    }

    /**
     * Builder that adds TPC-H tables and extra topic descriptions on top of
     * {@link KafkaQueryRunnerBuilder}.
     */
    public static class Builder extends KafkaQueryRunnerBuilder {
        private List<TpchTable<?>> tables = ImmutableList.of();
        private Map<SchemaTableName, KafkaTopicDescription> extraTopicDescription = ImmutableMap.of();

        protected Builder(TestingKafka testingKafka) {
            super(testingKafka, TPCH_SCHEMA);
        }

        /**
         * Sets the TPC-H tables to describe and load; an immutable copy is stored.
         *
         * @param tables TPC-H tables, must not be null
         * @return this builder
         */
        public Builder setTables(Iterable<TpchTable<?>> tables) {
            this.tables = ImmutableList.copyOf(Objects.requireNonNull(tables, "tables is null"));
            return this;
        }

        /**
         * Sets additional topic descriptions to register; an immutable copy is stored.
         *
         * @param extraTopicDescription extra descriptions keyed by table name, must not be null
         * @return this builder
         */
        public Builder setExtraTopicDescription(Map<SchemaTableName, KafkaTopicDescription> extraTopicDescription) {
            this.extraTopicDescription = ImmutableMap.copyOf(Objects.requireNonNull(extraTopicDescription, "extraTopicDescription is null"));
            return this;
        }

        /**
         * Replaces the extension module.
         *
         * @param extension module to use, must not be null
         * @return this builder
         */
        @Override
        public Builder setExtension(Module extension) {
            this.extension = Objects.requireNonNull(extension, "extension is null");
            return this;
        }

        /**
         * Installs the TPC-H plugin and catalog, assembles all topic descriptions and wraps the
         * current extension in a combined module that also binds the description supplier, the
         * file-based content schema reader, and the decoder and encoder modules.
         *
         * <p>The merged description map is built with {@code buildOrThrow()}, so a table present
         * in more than one of the three sources (extra, TPC-H, test) makes this method fail.
         *
         * @param queryRunner the query runner being initialized
         * @throws Exception if a topic description cannot be loaded
         */
        @Override
        public void preInit(DistributedQueryRunner queryRunner) throws Exception {
            queryRunner.installPlugin(new TpchPlugin());
            // Both arguments are "tpch"; postInit reads from the resulting "tpch" catalog.
            queryRunner.createCatalog(TPCH_SCHEMA, TPCH_SCHEMA);

            TypeManager typeManager = queryRunner.getCoordinator().getTypeManager();
            Map<SchemaTableName, KafkaTopicDescription> tpchTopicDescriptions = createTpchTopicDescriptions(typeManager, tables);
            Map<SchemaTableName, KafkaTopicDescription> testTopicDescriptions = createTestTopicDescriptions(typeManager);

            Map<SchemaTableName, KafkaTopicDescription> topicDescriptions = ImmutableMap.<SchemaTableName, KafkaTopicDescription>builder()
                    .putAll(extraTopicDescription)
                    .putAll(tpchTopicDescriptions)
                    .putAll(testTopicDescriptions)
                    .buildOrThrow();

            setExtension(ConfigurationAwareModule.combine(
                    extension,
                    // The map-based supplier is tied to the supplier setting being "test" (case-insensitive).
                    ConditionalModule.conditionalModule(
                            KafkaConfig.class,
                            kafkaConfig -> kafkaConfig.getTableDescriptionSupplier().equalsIgnoreCase(TEST),
                            binder -> binder.bind(TableDescriptionSupplier.class)
                                    .toInstance(new MapBasedTableDescriptionSupplier(topicDescriptions))),
                    binder -> binder.bind(ContentSchemaReader.class).to(FileContentSchemaReader.class).in(Scopes.SINGLETON),
                    new DecoderModule(),
                    new EncoderModule()));

            // Default the supplier to "test" without overriding an explicitly configured value.
            Map<String, String> properties = new HashMap<>(extraKafkaProperties);
            properties.putIfAbsent("kafka.table-description-supplier", TEST);
            setExtraKafkaProperties(properties);
        }

        /**
         * Copies every configured TPC-H table from {@code tpch.tiny} into the table of the same
         * name via {@code INSERT INTO ... SELECT}, logging per-table and total durations.
         *
         * @param queryRunner the initialized query runner used to execute the inserts
         */
        @Override
        public void postInit(DistributedQueryRunner queryRunner) {
            log.info("Loading data...");
            long startTime = System.nanoTime();
            for (TpchTable<?> table : tables) {
                long start = System.nanoTime();
                log.info("Running import for %s", table.getTableName());
                queryRunner.execute(String.format("INSERT INTO %1$s SELECT * FROM tpch.tiny.%1$s", table.getTableName()));
                log.info("Imported %s in %s", table.getTableName(), Duration.nanosSince(start).convertToMostSuccinctTimeUnit());
            }
            log.info("Loading complete in %s", Duration.nanosSince(startTime).toString(TimeUnit.SECONDS));
        }
    }
}

