package org.apache.beam.examples.complete.kafkatopubsub.kafka.consumer;

import java.io.File;
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.ReadableByteChannel;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.nio.file.attribute.FileAttribute;
import java.util.HashSet;
import java.util.Map;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/beam/examples/complete/kafkatopubsub/kafka/consumer/SslConsumerFactoryFn.class */
public class SslConsumerFactoryFn implements SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> {
    private final Map<String, String> sslConfig;
    private static final String TRUSTSTORE_LOCAL_PATH = "/tmp/kafka.truststore.jks";
    private static final String KEYSTORE_LOCAL_PATH = "/tmp/kafka.keystore.jks";
    private static final Logger LOG = LoggerFactory.getLogger(SslConsumerFactoryFn.class);

    public SslConsumerFactoryFn(Map<String, String> map) {
        this.sslConfig = map;
    }

    public Consumer<byte[], byte[]> apply(Map<String, Object> map) {
        String str = this.sslConfig.get("ssl.truststore.location");
        String str2 = this.sslConfig.get("ssl.keystore.location");
        if (str == null || str2 == null) {
            LOG.warn("Not enough information to configure SSL");
            return new KafkaConsumer(map);
        }
        try {
            if (str.startsWith("gs://")) {
                getGcsFileAsLocal(str, TRUSTSTORE_LOCAL_PATH);
                this.sslConfig.put("ssl.truststore.location", TRUSTSTORE_LOCAL_PATH);
            } else {
                checkFileExists(str);
            }
            if (str2.startsWith("gs://")) {
                getGcsFileAsLocal(str2, KEYSTORE_LOCAL_PATH);
                this.sslConfig.put("ssl.keystore.location", KEYSTORE_LOCAL_PATH);
            } else {
                checkFileExists(str2);
            }
            map.put("security.protocol", SecurityProtocol.SASL_SSL.name());
            map.put("ssl.truststore.location", this.sslConfig.get("ssl.truststore.location"));
            map.put("ssl.keystore.location", this.sslConfig.get("ssl.keystore.location"));
            map.put("ssl.truststore.password", this.sslConfig.get("ssl.truststore.password"));
            map.put("ssl.keystore.password", this.sslConfig.get("ssl.keystore.password"));
            map.put("ssl.key.password", this.sslConfig.get("ssl.key.password"));
            return new KafkaConsumer(map);
        } catch (IOException e) {
            LOG.error("Failed to retrieve data for SSL", e);
            return new KafkaConsumer(map);
        }
    }

    private void checkFileExists(String str) throws IOException {
        LOG.info("Trying to get file: {} locally. Local files don't support when in using distribute runner", str);
        File file = new File(str);
        if (file.exists()) {
            LOG.debug("{} exists", file.getAbsolutePath());
        } else {
            LOG.error("{} does not exist", file.getAbsolutePath());
            throw new IOException();
        }
    }

    public static void getGcsFileAsLocal(String str, String str2) throws IOException {
        LOG.info("Reading contents from GCS file: {}", str);
        HashSet hashSet = new HashSet(2);
        hashSet.add(StandardOpenOption.CREATE);
        hashSet.add(StandardOpenOption.APPEND);
        ReadableByteChannel open = FileSystems.open(FileSystems.matchSingleFileSpec(str).resourceId());
        try {
            FileChannel open2 = FileChannel.open(Paths.get(str2, new String[0]), hashSet, new FileAttribute[0]);
            Throwable th = null;
            try {
                try {
                    open2.transferFrom(open, 0L, Long.MAX_VALUE);
                    if (open2 != null) {
                        $closeResource(null, open2);
                    }
                } catch (Throwable th2) {
                    th = th2;
                    throw th2;
                }
            } catch (Throwable th3) {
                if (open2 != null) {
                    $closeResource(th, open2);
                }
                throw th3;
            }
        } finally {
            if (open != null) {
                $closeResource(null, open);
            }
        }
    }

    private static /* synthetic */ void $closeResource(Throwable th, AutoCloseable autoCloseable) {
        if (th == null) {
            autoCloseable.close();
            return;
        }
        try {
            autoCloseable.close();
        } catch (Throwable th2) {
            th.addSuppressed(th2);
        }
    }
}
