/*
 * Decompiled with CFR 0.152.
 */
package io.streamnative.ksn.testcontainers;

import com.github.dockerjava.api.command.InspectContainerResponse;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.images.builder.Transferable;
import org.testcontainers.utility.DockerImageName;
import org.testcontainers.utility.MountableFile;

class BrokerContainer
extends GenericContainer<BrokerContainer> {
    static final String HOST_PREFIX = "broker-";
    private static final AtomicInteger COUNT = new AtomicInteger(0);
    private static final Map<String, String> DEFAULT_CONFIGS = new HashMap<String, String>();
    private static final String STARTUP_SCRIPT_PATH = "/pulsar/bin/run-broker.sh";
    private static final String INTERNAL_LISTENER = "kafka_internal";
    private static final String EXTERNAL_LISTENER = "kafka_external";
    private static final int INTERNAL_PORT = 9092;
    private final String host;
    private final int port;

    BrokerContainer(DockerImageName imageName, Network network, String metadataStoreUrl, String cluster, Map<String, String> extraConfigs) {
        super(imageName);
        int id = COUNT.getAndIncrement();
        this.host = HOST_PREFIX + id;
        this.port = 19092 + id;
        this.setPortBindings(Collections.singletonList(this.port + ":" + this.port));
        ((BrokerContainer)((BrokerContainer)((BrokerContainer)((BrokerContainer)((BrokerContainer)((BrokerContainer)((BrokerContainer)this.withNetwork(network)).withNetworkAliases(new String[]{this.host})).withCopyFileToContainer(MountableFile.forClasspathResource((String)"run-broker.sh"), STARTUP_SCRIPT_PATH)).withCreateContainerCmdModifier(cmd -> cmd.withHostName(this.host))).withEnv("PULSAR_MEM", "-Xmx512M")).addBrokerConfig("metadataStoreUrl", metadataStoreUrl).addBrokerConfig("configurationMetadataStoreUrl", metadataStoreUrl).addBrokerConfig("clusterName", cluster).withLogConsumer(outputFrame -> System.out.println("[" + this.host + "] " + outputFrame.getUtf8StringWithoutLineEnding()))).withExposedPorts(new Integer[]{6650, 8080, 9092})).withCommand("bash /pulsar/bin/run-broker.sh");
        DEFAULT_CONFIGS.forEach(this::addBrokerConfig);
        extraConfigs.forEach(this::addBrokerConfig);
    }

    int getPort() {
        return this.port;
    }

    protected void containerIsStarting(InspectContainerResponse containerInfo) {
        super.containerIsStarting(containerInfo);
        String kafkaListeners = "kafka_internal://0.0.0.0:9092,kafka_external://0.0.0.0:" + this.port;
        String kafkaAdvertisedListeners = "kafka_internal://" + this.host + ":" + 9092 + "," + EXTERNAL_LISTENER + "://127.0.0.1:" + this.port;
        String pulsarListeners = "pulsar:pulsar://" + this.host + ":6650";
        String prefix = "export PULSAR_PREFIX_";
        String env = "export PULSAR_PREFIX_kafkaListeners=" + kafkaListeners + "\n" + "export PULSAR_PREFIX_" + "kafkaAdvertisedListeners=" + kafkaAdvertisedListeners + "\n" + "export PULSAR_PREFIX_" + "advertisedListeners=" + pulsarListeners + "\n";
        this.copyFileToContainer(Transferable.of((String)env, (int)493), "/pulsar/conf/add_to_env.sh");
    }

    private BrokerContainer addBrokerConfig(String key, String value) {
        return (BrokerContainer)this.withEnv("PULSAR_PREFIX_" + key, value);
    }

    static {
        DEFAULT_CONFIGS.put("messagingProtocols", "kafka");
        DEFAULT_CONFIGS.put("brokerEntryMetadataInterceptors", "org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor");
        DEFAULT_CONFIGS.put("brokerDeleteInactiveTopicsEnabled", "false");
        DEFAULT_CONFIGS.put("kafkaTransactionCoordinatorEnabled", "true");
        DEFAULT_CONFIGS.put("brokerDeduplicationEnabled", "true");
        DEFAULT_CONFIGS.put("defaultRetentionTimeInMinutes", "-1");
        DEFAULT_CONFIGS.put("defaultRetentionSizeInMB", "-1");
        DEFAULT_CONFIGS.put("ttlDurationDefaultInSeconds", "259200");
        DEFAULT_CONFIGS.put("kafkaProtocolMap", String.format("%s:PLAINTEXT,%s:PLAINTEXT", INTERNAL_LISTENER, EXTERNAL_LISTENER));
    }
}

