package com.cloudera.dim.kafka.connect.hdfs;

import com.cloudera.dim.kafka.connect.partition.writers.PartitionWriter;
import com.cloudera.dim.kafka.connect.partition.writers.PartitionWriterConfigDef;
import com.cloudera.dim.kafka.connect.storage.PartitionStorage;
import com.cloudera.dim.kafka.connect.storage.StorageConfigDef;
import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.RetriableException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
import org.apache.kafka.connect.sink.SinkTaskContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/cloudera/dim/kafka/connect/hdfs/HdfsSinkTask.class */
public class HdfsSinkTask extends SinkTask {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) HdfsSinkTask.class);
    private HdfsSinkConnectorConfig hdfsSinkConnectorConfig;
    private Map<TopicPartition, PartitionWriter> partitionWriterMap;
    private SinkTaskContext sinkTaskContext;

    public void initialize(SinkTaskContext sinkTaskContext) {
        super.initialize(sinkTaskContext);
        this.sinkTaskContext = sinkTaskContext;
    }

    public String version() {
        return HdfsSinkConnectorConfig.getVersion();
    }

    public void start(Map<String, String> map) {
        this.partitionWriterMap = new HashMap();
        this.hdfsSinkConnectorConfig = buildConfig(map);
    }

    public void open(Collection<TopicPartition> collection) {
        super.open(collection);
        LOG.info("partitions {} are assigned", collection);
        try {
            for (TopicPartition topicPartition : collection) {
                PartitionWriter partitionWriter = (PartitionWriter) this.hdfsSinkConnectorConfig.getConfiguredInstance(PartitionWriterConfigDef.OUTPUT_WRITER_CONFIG, PartitionWriter.class);
                PartitionStorage partitionStorage = (PartitionStorage) this.hdfsSinkConnectorConfig.getConfiguredInstance(StorageConfigDef.OUTPUT_STORAGE_CONFIG, PartitionStorage.class);
                partitionWriter.open(topicPartition, partitionStorage);
                Long lastCommittedOffset = partitionStorage.lastCommittedOffset();
                LOG.info("last committed offset of partition {} is {} ", topicPartition, lastCommittedOffset);
                if (lastCommittedOffset.longValue() != -1) {
                    this.sinkTaskContext.offset(topicPartition, lastCommittedOffset.longValue() + 1);
                }
                this.partitionWriterMap.put(topicPartition, partitionWriter);
            }
        } catch (Exception e) {
            LOG.error("failed to open the writer", (Throwable) e);
            throw new ConnectException(e);
        }
    }

    public void put(Collection<SinkRecord> collection) {
        try {
            for (SinkRecord sinkRecord : collection) {
                this.partitionWriterMap.get(new TopicPartition(sinkRecord.topic(), sinkRecord.kafkaPartition().intValue())).write(sinkRecord);
            }
        } catch (IOException e) {
            LOG.error("failed to write record, retry", (Throwable) e);
            throw new RetriableException(e);
        }
    }

    public void flush(Map<TopicPartition, OffsetAndMetadata> map) {
        super.flush(map);
        try {
            Iterator<TopicPartition> it = map.keySet().iterator();
            while (it.hasNext()) {
                PartitionWriter partitionWriter = this.partitionWriterMap.get(it.next());
                if (partitionWriter != null) {
                    partitionWriter.flush();
                }
            }
        } catch (IOException e) {
            LOG.error("failed to flush", (Throwable) e);
            throw new ConnectException(e);
        }
    }

    public void close(Collection<TopicPartition> collection) {
        super.close(collection);
        LOG.info("partitions {} are revoked", collection);
        try {
            for (TopicPartition topicPartition : collection) {
                LOG.info("close writer for partition {}", topicPartition);
                PartitionWriter remove = this.partitionWriterMap.remove(topicPartition);
                if (remove != null) {
                    remove.close();
                }
            }
        } catch (IOException e) {
            LOG.warn("writer was not closed successfully", (Throwable) e);
        }
    }

    public void stop() {
        LOG.info("shutdown task and close all writers");
        try {
            Iterator<PartitionWriter> it = this.partitionWriterMap.values().iterator();
            while (it.hasNext()) {
                it.next().close();
            }
            this.partitionWriterMap.clear();
        } catch (IOException e) {
            LOG.warn("writer was not closed successfully", (Throwable) e);
        }
    }

    HdfsSinkConnectorConfig buildConfig(Map<String, String> map) {
        return new HdfsSinkConnectorConfig(map);
    }
}
