package com.cloudera.dim.kafka.connect.hdfs;

import com.cloudera.dim.kafka.connect.storage.OffsetCommittable;
import com.cloudera.dim.kafka.connect.storage.PartitionStorage;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.URI;
import java.security.PrivilegedAction;
import java.security.PrivilegedExceptionAction;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.Map;
import java.util.regex.Pattern;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Options;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.errors.ConnectException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/cloudera/dim/kafka/connect/hdfs/HdfsPartitionStorage.class */
/**
 * {@link PartitionStorage} backed by a Hadoop {@link FileContext}.
 *
 * <p>Data for a topic partition is written to a temporary file named
 * {@code <topic>/<topic>_<partition>.<extension>} relative to the configured output directory.
 * On {@link #commit(long, OffsetCommittable.CommitCallback)} the temporary file is renamed to
 * {@code <topic>/<topic>_<partition>_<offset>.<extension>} and a fresh temporary file is created.
 * The last committed offset is recovered from the names of those renamed files.
 *
 * <p>When Kerberos authentication is enabled in the configuration, every file system call is
 * executed as the logged-in {@link UserGroupInformation}.
 */
public class HdfsPartitionStorage extends OutputStream implements PartitionStorage {
    private static final Logger LOG = LoggerFactory.getLogger(HdfsPartitionStorage.class);
    private static final String NOT_OPENED_MESSAGE = "the partition storage hasn't been opened yet";

    /** Logged-in Kerberos user, or {@code null} when Kerberos authentication is disabled. */
    private UserGroupInformation userGroupInformation;
    /** Stream of the current temporary file, or {@code null} before {@link #open} succeeded. */
    private FSDataOutputStream fsDataOutputStream;
    private TopicPartition topicPartition;
    private FileContext fileContext;
    private Path tempFilePath;
    private String fileExtension;

    /**
     * Builds the Hadoop configuration, optionally logs in via Kerberos, creates the output
     * directory and makes it the working directory of the file context.
     *
     * @param map connector properties, parsed with {@link HdfsSinkConnectorConfig}
     * @throws ConnectException if the Kerberos login or any file system call fails
     */
    public void configure(Map<String, ?> map) {
        HdfsSinkConnectorConfig config = new HdfsSinkConnectorConfig(map);
        Configuration hadoopConf = new Configuration();
        String hadoopConfDir = config.getString(HdfsSinkConnectorConfig.HADOOP_CONF_PATH_CONFIG);
        if (hadoopConfDir != null) {
            LOG.info("loading hadoop config from {}", hadoopConfDir);
            hadoopConf.addResource(new Path(hadoopConfDir, YarnConfiguration.CORE_SITE_CONFIGURATION_FILE));
            hadoopConf.addResource(new Path(hadoopConfDir, "hdfs-site.xml"));
        }
        // Explicit per-connector overrides are applied after the site files were registered.
        config.originalsWithPrefix(HdfsSinkConnectorConfig.HADOOP_EXTRA_CONFIG_PREFIX)
                .forEach((key, value) -> hadoopConf.set(key, value.toString()));

        if (config.getBoolean(HdfsSinkConnectorConfig.HDFS_KERBEROS_AUTHENTICATION_CONFIG)) {
            loginWithKerberos(config, hadoopConf);
        }

        String hdfsUri = config.getString(HdfsSinkConnectorConfig.HDFS_URI_CONFIG);
        String outputDir = config.getString(HdfsSinkConnectorConfig.HDFS_OUTPUT_CONFIG);
        PrivilegedExceptionAction<Void> prepareOutputDir = () -> {
            this.fileContext = FileContext.getFileContext(URI.create(hdfsUri), hadoopConf);
            Path outputPath = new Path(outputDir);
            try {
                this.fileContext.mkdir(outputPath, FileContext.DIR_DEFAULT_PERM, true);
            } catch (FileAlreadyExistsException e) {
                // An existing directory is fine. It is handled here, not by the caller, so that
                // the working directory below is still set; previously it was silently skipped.
                LOG.info("{} already exists", outputDir);
            }
            this.fileContext.setWorkingDirectory(outputPath);
            return null;
        };
        try {
            runPrivileged(prepareOutputDir);
        } catch (InterruptedException e) {
            // Restore the flag so the owning worker thread can still observe the interruption.
            Thread.currentThread().interrupt();
            LOG.error("Hadoop file system exception", e);
            throw new ConnectException(e);
        } catch (Exception e) {
            LOG.error("Hadoop file system exception", e);
            throw new ConnectException(e);
        }
    }

    /**
     * Starts writing the given partition by creating (or overwriting) its temporary file.
     *
     * @param topicPartition partition whose records will be written
     * @param str file extension used for the temporary and the committed files
     * @throws Exception if the temporary file cannot be created
     */
    @Override // com.cloudera.dim.kafka.connect.storage.PartitionStorage
    public void open(TopicPartition topicPartition, String str) throws Exception {
        this.topicPartition = topicPartition;
        this.fileExtension = str;
        String tempFileName = String.format("%s_%d.%s", topicPartition.topic(), topicPartition.partition(), str);
        this.tempFilePath = new Path(topicPartition.topic(), tempFileName);
        PrivilegedExceptionAction<Void> createFile = () -> {
            this.fsDataOutputStream = createTempFile();
            return null;
        };
        runPrivileged(createFile);
    }

    /**
     * @return this instance; writes are forwarded to the current temporary file
     */
    @Override // com.cloudera.dim.kafka.connect.storage.PartitionStorage
    public OutputStream getOutputStream() {
        return this;
    }

    /**
     * Writes a single byte to the current temporary file.
     *
     * @throws IOException if the storage has not been opened or the write fails
     */
    @Override // java.io.OutputStream
    public void write(int i) throws IOException {
        openedStream().write(i);
    }

    /**
     * Writes a byte range to the current temporary file in one call instead of falling back to
     * the byte-at-a-time loop inherited from {@link OutputStream}.
     *
     * @throws IOException if the storage has not been opened or the write fails
     */
    @Override // java.io.OutputStream
    public void write(byte[] b, int off, int len) throws IOException {
        openedStream().write(b, off, len);
    }

    /** Flushes the current temporary file; does nothing if the storage was never opened. */
    @Override // java.io.OutputStream, java.io.Flushable
    public void flush() throws IOException {
        if (this.fsDataOutputStream != null) {
            this.fsDataOutputStream.flush();
        }
    }

    /** Closes the current temporary file; does nothing if the storage was never opened. */
    @Override // java.io.OutputStream, java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        if (this.fsDataOutputStream != null) {
            this.fsDataOutputStream.close();
        }
    }

    /**
     * Closes the temporary file, renames it to its committed name carrying the offset, creates a
     * new temporary file and then notifies the callback. Does nothing if the storage was never
     * opened.
     *
     * @param j offset encoded into the committed file name
     * @param commitCallback notified after the rename and the re-creation succeeded
     * @throws Exception if closing, renaming or re-creating the file fails
     */
    @Override // com.cloudera.dim.kafka.connect.storage.OffsetCommittable
    public void commit(long j, OffsetCommittable.CommitCallback commitCallback) throws Exception {
        if (this.fsDataOutputStream == null) {
            return;
        }
        this.fsDataOutputStream.close();
        PrivilegedExceptionAction<Void> commitAndReopen = () -> {
            commitToHdfs(j);
            this.fsDataOutputStream = createTempFile();
            commitCallback.commitCompleted();
            return null;
        };
        runPrivileged(commitAndReopen);
    }

    /**
     * Recovers the highest offset found in the names of the committed files of the opened
     * partition.
     *
     * @return the highest committed offset, or {@code -1} if no committed file was found
     * @throws IllegalStateException if {@link #open} has not been called yet
     */
    @Override // com.cloudera.dim.kafka.connect.storage.OffsetCommittable
    public Long lastCommittedOffset() {
        if (this.topicPartition == null) {
            throw new IllegalStateException(NOT_OPENED_MESSAGE);
        }
        PrivilegedAction<Long> recoverOffset = () -> {
            FileStatus[] completedFiles = getAllCompletedFiles(this.topicPartition);
            LOG.info("found completed files {}", Arrays.toString(completedFiles));
            return recoverLastOffset(completedFiles);
        };
        if (this.userGroupInformation != null) {
            return this.userGroupInformation.doAs(recoverOffset);
        }
        return recoverOffset.run();
    }

    /**
     * @return the stream of the current temporary file, or {@code null} before {@link #open}
     */
    public FSDataOutputStream getFsDataOutputStream() {
        return this.fsDataOutputStream;
    }

    /** Enables Kerberos in the Hadoop configuration and logs in from the configured keytab. */
    private void loginWithKerberos(HdfsSinkConnectorConfig config, Configuration hadoopConf) {
        hadoopConf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "kerberos");
        hadoopConf.set(HdfsClientConfigKeys.DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY,
                config.getString(HdfsSinkConnectorConfig.HDFS_KERBEROS_NAMENODE_PRINCIPAL_CONFIG));
        UserGroupInformation.setConfiguration(hadoopConf);
        String userPrincipal = config.getString(HdfsSinkConnectorConfig.HDFS_KERBEROS_USER_PRINCIPAL_CONFIG);
        String keytabPath = config.getString(HdfsSinkConnectorConfig.HDFS_KERBEROS_KEYTAB_PATH_CONFIG);
        try {
            String resolvedPrincipal = SecurityUtil.getServerPrincipal(userPrincipal, InetAddress.getLocalHost());
            UserGroupInformation.loginUserFromKeytab(resolvedPrincipal, keytabPath);
            this.userGroupInformation = UserGroupInformation.getLoginUser();
        } catch (IOException e) {
            LOG.error("failed to authenticate via Kerberos", e);
            throw new ConnectException(e);
        }
    }

    /** Runs the action as the Kerberos user when one is logged in, directly otherwise. */
    private <T> T runPrivileged(PrivilegedExceptionAction<T> action) throws Exception {
        if (this.userGroupInformation != null) {
            return this.userGroupInformation.doAs(action);
        }
        return action.run();
    }

    /** Creates (or overwrites) the temporary file, creating missing parent directories. */
    private FSDataOutputStream createTempFile() throws IOException {
        return this.fileContext.create(this.tempFilePath,
                EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE),
                Options.CreateOpts.createParent());
    }

    /** Returns the open stream or fails with a descriptive error instead of a bare NPE. */
    private FSDataOutputStream openedStream() throws IOException {
        if (this.fsDataOutputStream == null) {
            throw new IOException(NOT_OPENED_MESSAGE);
        }
        return this.fsDataOutputStream;
    }

    /** Renames the temporary file to its committed name, overwriting an existing target. */
    private void commitToHdfs(long j) throws IOException {
        String committedName = String.format("%s_%d_%d.%s",
                this.topicPartition.topic(), this.topicPartition.partition(), j, this.fileExtension);
        Path committedPath = new Path(this.topicPartition.topic(), committedName);
        this.fileContext.rename(this.tempFilePath, committedPath, Options.Rename.OVERWRITE);
        LOG.info("committed {} to {}", this.tempFilePath.getName(), committedName);
    }

    /**
     * Parses the number following the last underscore of the given name.
     *
     * @return the parsed offset, or {@code null} if there is no underscore or no valid number
     */
    private Long extractOffsetSuffix(String str) {
        if (str == null || str.isEmpty()) {
            return null;
        }
        int separator = str.lastIndexOf('_');
        if (separator == -1) {
            return null;
        }
        try {
            return Long.parseLong(str.substring(separator + 1));
        } catch (NumberFormatException e) {
            LOG.error("offset suffix of file {} not in number format", str);
            return null;
        }
    }

    /**
     * Lists the committed files of the partition inside its topic directory.
     *
     * @return the matching files, or an empty array if listing fails
     */
    private FileStatus[] getAllCompletedFiles(TopicPartition topicPartition) {
        Path topicDir = new Path(topicPartition.topic());
        // Topic and extension are quoted so that characters such as '.' match literally.
        // String.valueOf mirrors how String.format renders a null extension in the file names.
        Pattern completedFile = Pattern.compile(
                Pattern.quote(topicPartition.topic())
                        + "_" + topicPartition.partition()
                        + "_\\d+\\."
                        + Pattern.quote(String.valueOf(this.fileExtension)));
        try {
            return this.fileContext.util().listStatus(topicDir,
                    candidate -> completedFile.matcher(candidate.getName()).matches());
        } catch (IOException e) {
            LOG.warn("failed to find any completed file, {}", e.getLocalizedMessage());
            return new FileStatus[0];
        }
    }

    /**
     * @return the highest offset encoded in the given file names, or {@code -1} if none parses
     */
    private Long recoverLastOffset(FileStatus[] fileStatusArr) {
        return Arrays.stream(fileStatusArr)
                .map(status -> status.getPath().getName())
                .map(this::trimExtension)
                .map(this::extractOffsetSuffix)
                // Unparseable names yield null; skip them instead of failing on unboxing.
                .filter(offset -> offset != null)
                .mapToLong(Long::longValue)
                .max()
                .orElse(-1L);
    }

    /** Strips the last occurrence of {@code .<extension>} and everything after it, if present. */
    private String trimExtension(String str) {
        if (str == null) {
            return null;
        }
        int extensionStart = str.lastIndexOf("." + this.fileExtension);
        return extensionStart == -1 ? str : str.substring(0, extensionStart);
    }
}
