package com.datastax.oss.dsbulk.connectors.commons;

import com.datastax.oss.dsbulk.config.ConfigUtils;
import com.datastax.oss.dsbulk.connectors.api.Connector;
import com.datastax.oss.dsbulk.connectors.api.DefaultResource;
import com.datastax.oss.dsbulk.connectors.api.Record;
import com.datastax.oss.dsbulk.connectors.api.Resource;
import com.datastax.oss.dsbulk.io.CompressedIOUtils;
import com.datastax.oss.dsbulk.io.IOUtils;
import com.typesafe.config.Config;
import edu.umd.cs.findbugs.annotations.NonNull;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.nio.charset.Charset;
import java.nio.file.FileSystemNotFoundException;
import java.nio.file.FileVisitOption;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.nio.file.PathMatcher;
import java.nio.file.Paths;
import java.nio.file.attribute.FileAttribute;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Stream;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.SynchronousSink;

/* loaded from: input_file:com/datastax/oss/dsbulk/connectors/commons/AbstractFileBasedConnector.class */
/**
 * Base class for connectors that read records from, or write records to, files, directories or
 * URLs.
 *
 * <p>Lifecycle, as implemented here: {@link #configure(Config, boolean, boolean)} reads the
 * settings, {@link #init()} resolves the configured URLs (and, when writing, creates the writers),
 * then either {@link #read()} or {@link #write()} is used depending on the mode chosen at
 * configuration time, and finally {@link #close()} flushes and closes every writer.
 *
 * <p>Subclasses supply the actual record format through {@link #newSingleFileReader(URL, URI)} and
 * {@link #newSingleFileWriter()}, and the name used in messages through {@link
 * #getConnectorName()}.
 */
public abstract class AbstractFileBasedConnector implements Connector {
    private static final Logger LOGGER = LoggerFactory.getLogger(AbstractFileBasedConnector.class);

    /** Reactor context key under which {@link #write()} stores the writer borrowed per subscription. */
    private static final String WRITER_CONTEXT_KEY = "WRITER";

    /** Maximum number of matching files counted per directory when sizing read concurrency. */
    private static final long MAX_COUNTED_FILES_PER_DIRECTORY = 1000L;

    /** Prefetch passed to {@code concatMap} when writing records. */
    private static final int WRITE_PREFETCH = 500;

    // Names of the settings read by configure() and loadURLs().
    protected static final String URL = "url";
    protected static final String URLFILE = "urlfile";
    protected static final String COMPRESSION = "compression";
    protected static final String ENCODING = "encoding";
    protected static final String FILE_NAME_PATTERN = "fileNamePattern";
    protected static final String SKIP_RECORDS = "skipRecords";
    protected static final String MAX_RECORDS = "maxRecords";
    protected static final String MAX_CONCURRENT_FILES = "maxConcurrentFiles";
    protected static final String RECURSIVE = "recursive";
    protected static final String FILE_NAME_FORMAT = "fileNameFormat";

    /** {@code true} when configured for reading, {@code false} when configured for writing. */
    protected boolean read;

    /** Stored by {@link #configure(Config, boolean, boolean)}; not otherwise used in this class. */
    protected boolean retainRecordSources;

    /** URLs loaded from the {@code url} or {@code urlfile} setting. */
    protected List<URL> urls;

    /** Directories found among {@link #urls}. */
    protected List<Path> roots = new ArrayList<>();

    /** URLs to read that are not directories (plain files, or URLs that do not map to a path). */
    protected List<URL> files = new ArrayList<>();

    protected Charset encoding;
    protected String compression;

    /** {@link String#format} pattern used to name files created inside a root directory. */
    protected String fileNameFormat;

    /** Whether directories are walked to unlimited depth (otherwise depth 1). */
    protected boolean recursive;

    /** Glob pattern (without the {@code glob:} prefix) applied when scanning directories. */
    protected String pattern;

    /** Number of records skipped at the start of each resource; applied when greater than 0. */
    protected long skipRecords;

    /** Maximum number of records taken from each resource; {@code -1} disables the limit. */
    protected long maxRecords;

    /** Number of resources counted while reading; set to {@code -1} when writing. */
    protected int resourceCount;

    protected int maxConcurrentFiles;

    /** Pool of idle writers, used only when several writers are created. */
    protected Deque<RecordWriter> writers;

    /** The only writer, used when a single destination is written to. */
    protected RecordWriter singleWriter;

    /** Every writer created by {@link #init()}, so that {@link #close()} can release them all. */
    protected List<RecordWriter> writersToClose;

    /** Counter used to number files created by {@link #getOrCreateDestinationURL()}. */
    protected AtomicInteger fileCounter;

    /** Initialized to 0 by {@link #init()}; not otherwise used in this class. */
    protected AtomicInteger nextWriterIndex;

    /** Reads the records of one resource, one call at a time. */
    public interface RecordReader extends AutoCloseable {

        /**
         * Generator step used by {@link #readSingleFile(URL, URI)} with {@code Flux.generate}.
         *
         * @param synchronousSink the sink to signal through.
         * @return the reader to use as state for the next invocation.
         */
        @NonNull
        RecordReader readNext(@NonNull SynchronousSink<Record> synchronousSink);

        @Override
        void close() throws IOException;
    }

    /** Writes records to one destination. */
    public interface RecordWriter extends AutoCloseable {

        /**
         * Writes one record.
         *
         * @param record the record to write.
         * @throws IOException if the record cannot be written.
         */
        void write(@NonNull Record record) throws IOException;

        /**
         * Flushes the writer; invoked after the last record of each written flow and again on
         * {@link AbstractFileBasedConnector#close()}.
         *
         * @throws IOException if flushing fails.
         */
        void flush() throws IOException;

        @Override
        void close() throws IOException;
    }

    /**
     * Returns how many resources may be read in parallel.
     *
     * @return the smaller of the counted resources and {@code maxConcurrentFiles}, but at least 1.
     */
    public int readConcurrency() {
        assert read;
        return Math.max(1, Math.min(resourceCount, maxConcurrentFiles));
    }

    /**
     * Returns how many flows may be written in parallel.
     *
     * @return 1 when no root directory was registered, {@code maxConcurrentFiles} otherwise.
     */
    public int writeConcurrency() {
        assert !read;
        return roots.isEmpty() ? 1 : maxConcurrentFiles;
    }

    /**
     * Reads the connector settings.
     *
     * @param config the connector configuration.
     * @param read {@code true} to configure for reading, {@code false} for writing.
     * @param retainRecordSources stored in {@link #retainRecordSources}.
     * @throws IllegalArgumentException if the compression is not supported for the chosen mode, or
     *     if the URL settings are invalid (see {@link #loadURLs(Config)}).
     */
    public void configure(@NonNull Config config, boolean read, boolean retainRecordSources) {
        this.read = read;
        this.retainRecordSources = retainRecordSources;
        urls = loadURLs(config);
        encoding = ConfigUtils.getCharset(config, ENCODING);
        compression = config.getString(COMPRESSION);
        if (!CompressedIOUtils.isSupportedCompression(compression, read)) {
            throw new IllegalArgumentException(
                    String.format(
                            "Invalid value for connector.%s.%s, valid values: %s, got: '%s'",
                            getConnectorName(),
                            COMPRESSION,
                            String.join(",", CompressedIOUtils.getSupportedCompressions(read)),
                            compression));
        }
        boolean compressed = !CompressedIOUtils.isNoneCompression(compression);
        // The compression suffix is appended only when hasReferenceValue() reports true for the
        // setting. NOTE(review): presumably that means the setting was left at its default value
        // -- confirm against ConfigUtils.
        pattern = config.getString(FILE_NAME_PATTERN);
        if (compressed && ConfigUtils.hasReferenceValue(config, FILE_NAME_PATTERN)) {
            pattern += CompressedIOUtils.getCompressionSuffix(compression);
        }
        fileNameFormat = config.getString(FILE_NAME_FORMAT);
        if (compressed && ConfigUtils.hasReferenceValue(config, FILE_NAME_FORMAT)) {
            fileNameFormat += CompressedIOUtils.getCompressionSuffix(compression);
        }
        recursive = config.getBoolean(RECURSIVE);
        if ("AUTO".equals(config.getString(MAX_CONCURRENT_FILES))) {
            maxConcurrentFiles = ConfigUtils.resolveThreads(read ? "1C" : "0.5C");
        } else {
            maxConcurrentFiles = ConfigUtils.getThreads(config, MAX_CONCURRENT_FILES);
        }
        skipRecords = config.getLong(SKIP_RECORDS);
        maxRecords = config.getLong(MAX_RECORDS);
    }

    /**
     * Resolves the configured URLs and, when writing, creates the writers.
     *
     * <p>When writing, a single writer is created if no root directory was registered or if
     * {@code maxConcurrentFiles} is at most 1; otherwise {@code maxConcurrentFiles} writers are
     * created and pooled.
     *
     * @throws URISyntaxException if a configured URL cannot be converted to a URI.
     * @throws IOException if a file system operation fails.
     */
    public void init() throws URISyntaxException, IOException {
        if (read) {
            processURLsForRead();
            return;
        }
        processURLsForWrite();
        fileCounter = new AtomicInteger(0);
        nextWriterIndex = new AtomicInteger(0);
        writersToClose = new ArrayList<>();
        if (usesSingleWriter()) {
            singleWriter = newSingleFileWriter();
            writersToClose.add(singleWriter);
            return;
        }
        writers = new ConcurrentLinkedDeque<>();
        for (int i = 0; i < maxConcurrentFiles; i++) {
            RecordWriter writer = newSingleFileWriter();
            writers.add(writer);
            writersToClose.add(writer);
        }
    }

    /**
     * Returns one {@link Resource} per file to read: first the files found by scanning each root
     * directory, then the individually configured files.
     *
     * @return a publisher of resources, each carrying the records of one file with the per-file
     *     limits applied.
     */
    @NonNull
    public Publisher<Resource> read() {
        assert read;
        return Flux.concat(
                        Flux.fromIterable(roots).flatMap(this::scanRootDirectory),
                        Flux.fromIterable(files))
                .map(this::toResource);
    }

    /**
     * Returns a function that writes every record of a flow and re-emits it downstream, flushing
     * the writer once the flow completes.
     *
     * <p>With a single writer, every flow goes through it. Otherwise each subscription borrows a
     * writer from the pool (failing with {@code NoSuchElementException} if the pool is empty) and
     * returns it when the flow terminates.
     *
     * @return the writing function.
     */
    @NonNull
    public Function<Publisher<Record>, Publisher<Record>> write() {
        assert !read;
        if (usesSingleWriter()) {
            return publisher -> writeRecordFlux(publisher, singleWriter);
        }
        return publisher ->
                Flux.deferContextual(
                                contextView -> {
                                    RecordWriter writer = contextView.get(WRITER_CONTEXT_KEY);
                                    return writeRecordFlux(publisher, writer)
                                            .doOnTerminate(() -> writers.offer(writer));
                                })
                        .contextWrite(context -> context.put(WRITER_CONTEXT_KEY, writers.remove()));
    }

    /**
     * Flushes and closes every writer created by {@link #init()}.
     *
     * <p>Each writer is closed even if flushing it fails. The first {@link IOException} is
     * rethrown wrapped in an {@link UncheckedIOException} once all writers were processed; later
     * ones are attached to it as suppressed exceptions.
     *
     * @throws UncheckedIOException if flushing or closing any writer failed.
     */
    public void close() {
        if (writersToClose == null) {
            return;
        }
        IOException failure = null;
        for (RecordWriter writer : writersToClose) {
            // try-with-resources guarantees close() runs even when flush() throws.
            try (RecordWriter closing = writer) {
                closing.flush();
            } catch (IOException e) {
                if (failure == null) {
                    failure = e;
                } else {
                    failure.addSuppressed(e);
                }
            }
        }
        if (failure != null) {
            throw new UncheckedIOException(failure);
        }
    }

    /**
     * Returns the connector name inserted into setting paths in error and warning messages
     * (e.g. {@code connector.<name>.url}).
     *
     * @return the connector name.
     */
    @NonNull
    protected abstract String getConnectorName();

    /**
     * Returns the records of one resource. A reader is created per subscription and closed when
     * the flow ends; a failure to close it is logged, not propagated.
     *
     * @param url the URL to read from.
     * @param uri the same location as a URI.
     * @return the records read from the resource.
     */
    @NonNull
    protected Flux<Record> readSingleFile(@NonNull URL url, @NonNull URI uri) {
        return Flux.generate(
                () -> newSingleFileReader(url, uri),
                RecordReader::readNext,
                reader -> {
                    try {
                        reader.close();
                    } catch (IOException e) {
                        LOGGER.error("Error closing " + url, e);
                    }
                });
    }

    /**
     * Creates a reader for one resource.
     *
     * @param url the URL to read from.
     * @param uri the same location as a URI.
     * @return a new reader.
     * @throws IOException if the resource cannot be opened.
     */
    @NonNull
    protected abstract RecordReader newSingleFileReader(@NonNull URL url, URI uri) throws IOException;

    /**
     * Creates a new writer. Called once or {@code maxConcurrentFiles} times by {@link #init()}.
     *
     * @return a new writer.
     */
    @NonNull
    protected abstract RecordWriter newSingleFileWriter();

    /**
     * Loads the URLs to read from or write to.
     *
     * <p>When reading, at least one of {@code url} and {@code urlfile} must be set, and
     * {@code urlfile} wins if both are. When writing, {@code urlfile} is rejected and {@code url}
     * is mandatory.
     *
     * @param config the connector configuration.
     * @return the URLs listed in the URL file if one is set, otherwise the single configured URL.
     * @throws IllegalArgumentException if the settings are missing or inconsistent, or if the URL
     *     file cannot be read.
     */
    @NonNull
    protected List<URL> loadURLs(@NonNull Config config) {
        boolean hasUrl = ConfigUtils.isPathPresentAndNotEmpty(config, URL);
        boolean hasUrlFile = ConfigUtils.isPathPresentAndNotEmpty(config, URLFILE);
        if (read) {
            if (!hasUrl && !hasUrlFile) {
                throw new IllegalArgumentException(
                        String.format(
                                "A URL or URL file is mandatory when using the %s connector for LOAD. Please set connector.%s.url or connector.%s.urlfile and try again. See settings.md or help for more information.",
                                getConnectorName(), getConnectorName(), getConnectorName()));
            }
            if (hasUrl && hasUrlFile) {
                LOGGER.debug("You specified both URL and URL file. The URL file will take precedence.");
            }
        } else {
            if (hasUrlFile) {
                throw new IllegalArgumentException("The urlfile parameter is not supported for UNLOAD");
            }
            if (!hasUrl) {
                throw new IllegalArgumentException(
                        String.format(
                                "A URL is mandatory when using the %s connector for UNLOAD. Please set connector.%s.url and try again. See settings.md or help for more information.",
                                getConnectorName(), getConnectorName()));
            }
        }
        if (!hasUrlFile) {
            return Collections.singletonList(ConfigUtils.getURL(config, URL));
        }
        try {
            return ConfigUtils.getURLsFromFile(ConfigUtils.getPath(config, URLFILE));
        } catch (IOException e) {
            throw new IllegalArgumentException(
                    "Problem when retrieving urls from file specified by the URL file parameter", e);
        }
    }

    /**
     * Sorts the configured URLs into root directories and individual files, and counts the
     * resources to read.
     *
     * <p>For each directory, at most {@value #MAX_COUNTED_FILES_PER_DIRECTORY} matching files are
     * counted; a warning is logged when none match.
     *
     * @throws URISyntaxException if a URL cannot be converted to a URI.
     * @throws IOException if counting the readable files of a directory fails.
     * @throws IllegalArgumentException if a directory is not readable.
     */
    protected void processURLsForRead() throws URISyntaxException, IOException {
        resourceCount = 0;
        for (URL url : urls) {
            try {
                Path path = Paths.get(url.toURI());
                if (!Files.isDirectory(path)) {
                    resourceCount++;
                    files.add(url);
                    continue;
                }
                if (!Files.isReadable(path)) {
                    throw new IllegalArgumentException(
                            String.format("Directory is not readable: %s.", path));
                }
                roots.add(path);
                int matching =
                        Objects.requireNonNull(
                                        scanRootDirectory(path)
                                                .take(MAX_COUNTED_FILES_PER_DIRECTORY)
                                                .count()
                                                .block())
                                .intValue();
                if (matching == 0) {
                    if (IOUtils.countReadableFiles(path, recursive) == 0) {
                        LOGGER.warn("Directory {} has no readable files.", path);
                    } else {
                        LOGGER.warn(
                                "No files in directory {} matched the connector.{}.fileNamePattern of \"{}\".",
                                path,
                                getConnectorName(),
                                pattern);
                    }
                }
                resourceCount += matching;
            } catch (FileSystemNotFoundException ignored) {
                // The URL does not map to a path on an installed file system: keep it as a plain
                // URL so that it is read directly.
                files.add(url);
                resourceCount++;
            }
        }
    }

    /**
     * Prepares the destination for writing, using the first configured URL only.
     *
     * <p>If the URL maps to a path that does not exist, it is created as a directory. If the path
     * is a directory, it must be writable and empty, and is registered as a root. If the URL does
     * not map to a path on an installed file system, nothing is registered.
     *
     * @throws URISyntaxException if the URL cannot be converted to a URI.
     * @throws IOException if the directory cannot be created or inspected.
     * @throws IllegalArgumentException if the directory is not writable or not empty.
     */
    protected void processURLsForWrite() throws URISyntaxException, IOException {
        try {
            resourceCount = -1;
            Path root = Paths.get(urls.get(0).toURI());
            if (!Files.exists(root)) {
                root = Files.createDirectories(root);
            }
            if (Files.isDirectory(root)) {
                if (!Files.isWritable(root)) {
                    throw new IllegalArgumentException(
                            String.format("Directory is not writable: %s.", root));
                }
                if (IOUtils.isDirectoryNonEmpty(root)) {
                    // The path is passed as an argument, never spliced into the format string: a
                    // '%' in a directory name must not be interpreted as a format specifier.
                    throw new IllegalArgumentException(
                            String.format(
                                    "Invalid value for connector.%s.url: target directory %s must be empty.",
                                    getConnectorName(), root));
                }
                roots.add(root);
            }
        } catch (FileSystemNotFoundException ignored) {
            // The URL does not map to a path on an installed file system. Leaving roots empty
            // makes init() create a single writer and getOrCreateDestinationURL() return the URL
            // itself.
        }
    }

    /**
     * Lists the readable regular files under a directory whose path matches the configured glob
     * pattern. The directory is walked to unlimited depth when {@link #recursive} is set, and to
     * depth 1 otherwise.
     *
     * <p>The directory is opened when this method is called, not on subscription.
     *
     * @param path the directory to scan.
     * @return the matching files, as URLs.
     * @throws UncheckedIOException if the directory cannot be opened.
     */
    @NonNull
    protected Flux<URL> scanRootDirectory(@NonNull Path path) {
        // Build the matcher first: if the pattern is invalid, no directory stream has been opened
        // yet and nothing can leak.
        PathMatcher matcher = path.getFileSystem().getPathMatcher("glob:" + pattern);
        try {
            // Not wrapped in try-with-resources on purpose: Flux.fromStream closes the stream
            // when the flux completes, fails or is cancelled.
            Stream<Path> walked = Files.walk(path, recursive ? Integer.MAX_VALUE : 1);
            return Flux.fromStream(walked)
                    .filter(Files::isReadable)
                    .filter(file -> Files.isRegularFile(file))
                    .filter(matcher::matches)
                    .map(AbstractFileBasedConnector::toFileURL);
        } catch (IOException e) {
            throw new UncheckedIOException("Error scanning directory " + path, e);
        }
    }

    /**
     * Applies the per-resource {@code skipRecords} and {@code maxRecords} limits, in that order.
     *
     * @param flux the records of one resource.
     * @return the same records, minus the skipped ones and truncated to the maximum.
     */
    @NonNull
    protected Flux<Record> applyPerFileLimits(@NonNull Flux<Record> flux) {
        Flux<Record> limited = flux;
        if (skipRecords > 0) {
            limited = limited.skip(skipRecords);
        }
        if (maxRecords != -1) {
            limited = limited.take(maxRecords);
        }
        return limited;
    }

    /**
     * Returns the URL to write to: the first configured URL when no root directory was registered,
     * otherwise a new file inside the first root directory, named by formatting
     * {@link #fileNameFormat} with the next value of the file counter.
     *
     * @return the destination URL.
     * @throws UncheckedIOException if the generated path cannot be converted to a URL.
     */
    @NonNull
    protected URL getOrCreateDestinationURL() {
        if (roots.isEmpty()) {
            return urls.get(0);
        }
        try {
            String fileName = String.format(fileNameFormat, fileCounter.incrementAndGet());
            return roots.get(0).resolve(fileName).toUri().toURL();
        } catch (MalformedURLException e) {
            throw new UncheckedIOException(
                    String.format("Could not create file URL with format %s", fileNameFormat), e);
        }
    }

    /**
     * Tells whether data size sampling is available.
     *
     * @return {@code true} when reading and none of the configured URLs is a standard stream.
     */
    protected boolean isDataSizeSamplingAvailable() {
        return read && urls.stream().noneMatch(IOUtils::isStandardStream);
    }

    /** Tells whether a single writer is used, as opposed to a pool of writers. */
    private boolean usesSingleWriter() {
        return roots.isEmpty() || maxConcurrentFiles <= 1;
    }

    /** Wraps one URL into a resource whose records have the per-file limits applied. */
    private Resource toResource(URL url) {
        URI resource = URI.create(url.toExternalForm());
        return new DefaultResource(
                resource, readSingleFile(url, resource).transform(this::applyPerFileLimits));
    }

    /** Converts a path to a URL, rethrowing the checked failure as unchecked for use in lambdas. */
    private static URL toFileURL(Path file) {
        try {
            return file.toUri().toURL();
        } catch (MalformedURLException e) {
            throw new UncheckedIOException(e);
        }
    }

    /**
     * Writes each record with the given writer, re-emits it, and flushes the writer after the last
     * record. Any failure is propagated as an error signal.
     */
    private Flux<Record> writeRecordFlux(Publisher<Record> publisher, RecordWriter recordWriter) {
        return Flux.from(publisher)
                .concatMap(
                        record -> {
                            try {
                                recordWriter.write(record);
                                return Mono.just(record);
                            } catch (Exception e) {
                                return Mono.error(e);
                            }
                        },
                        WRITE_PREFETCH)
                .concatWith(
                        Flux.create(
                                sink -> {
                                    try {
                                        recordWriter.flush();
                                        sink.complete();
                                    } catch (Exception e) {
                                        sink.error(e);
                                    }
                                }));
    }
}
