/*
 * Decompiled with CFR 0.152.
 */
package com.icetech.mqtt.autoconfigure;

import com.icetech.mqtt.autoconfigure.MqttAsyncClientAdapter;
import com.icetech.mqtt.autoconfigure.MqttConnectOptionsAdapter;
import com.icetech.mqtt.autoconfigure.MqttSubscribeProcessor;
import com.icetech.mqtt.properties.MqttProperties;
import com.icetech.mqtt.subscriber.MqttSubscriber;
import com.icetech.mqtt.subscriber.TopicPair;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.eclipse.paho.client.mqttv3.IMqttActionListener;
import org.eclipse.paho.client.mqttv3.IMqttAsyncClient;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.IMqttToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttTopic;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.util.StringUtils;

public class MqttConnector
implements DisposableBean {
    private static final Logger log = LoggerFactory.getLogger(MqttConnector.class);
    public static final Map<String, IMqttAsyncClient> MQTT_CLIENT_MAP = new HashMap<String, IMqttAsyncClient>();
    public static final Map<String, Integer> MQTT_DEFAULT_QOS_MAP = new HashMap<String, Integer>();
    public static String DefaultClientId;
    public static int DefaultPublishQos;
    private final ScheduledExecutorService scheduled = Executors.newScheduledThreadPool(2);
    private MqttProperties properties;

    public static IMqttAsyncClient getDefaultClient() {
        if (StringUtils.hasLength((String)DefaultClientId)) {
            return MQTT_CLIENT_MAP.get(DefaultClientId);
        }
        if (!MQTT_CLIENT_MAP.isEmpty()) {
            return MQTT_CLIENT_MAP.values().iterator().next();
        }
        return null;
    }

    public static int getDefaultQosById(String clientId) {
        if (StringUtils.hasLength((String)clientId)) {
            return MQTT_DEFAULT_QOS_MAP.getOrDefault(clientId, 0);
        }
        return DefaultPublishQos;
    }

    public static IMqttAsyncClient getClientById(String clientId) {
        if (StringUtils.hasLength((String)clientId)) {
            return MQTT_CLIENT_MAP.get(clientId);
        }
        return MqttConnector.getDefaultClient();
    }

    public void start(MqttAsyncClientAdapter clientAdapter, MqttProperties properties, MqttConnectOptionsAdapter adapter) {
        if (properties.getDisable() == null || !properties.getDisable().booleanValue()) {
            MqttSubscribeProcessor.SUBSCRIBERS.sort(Comparator.comparingInt(MqttSubscriber::getOrder));
            this.properties = properties;
            properties.forEach((id, options) -> {
                try {
                    adapter.configure((String)id, (MqttConnectOptions)options);
                    IMqttAsyncClient client = clientAdapter.create((String)id, options.getServerURIs());
                    if (client != null) {
                        MQTT_CLIENT_MAP.put((String)id, client);
                        MQTT_DEFAULT_QOS_MAP.put((String)id, properties.getDefaultPublishQos((String)id));
                        if (!StringUtils.hasLength((String)DefaultClientId)) {
                            DefaultClientId = id;
                            DefaultPublishQos = MQTT_DEFAULT_QOS_MAP.get(id);
                            log.info("Default mqtt client is '{}'", (Object)DefaultClientId);
                        }
                        this.scheduled.schedule(new ReConnect(client, (MqttConnectOptions)options), 1L, TimeUnit.MILLISECONDS);
                    }
                }
                catch (MqttException exception) {
                    exception.printStackTrace();
                }
            });
        }
    }

    private void connect(final IMqttAsyncClient client, final MqttConnectOptions options) {
        try {
            client.connect(options, null, new IMqttActionListener(){

                public void onSuccess(IMqttToken asyncActionToken) {
                    try {
                        log.info("Connect success. client_id is [{}], brokers is [{}].", (Object)client.getClientId(), (Object)String.join((CharSequence)",", options.getServerURIs()));
                        MqttConnector.this.subscribe(client);
                    }
                    catch (Exception e) {
                        e.printStackTrace();
                    }
                }

                public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
                    try {
                        log.error("Connect failure. client_id is [{}], brokers is [{}]. retry after {} ms.", new Object[]{client.getClientId(), String.join((CharSequence)",", options.getServerURIs()), options.getMaxReconnectDelay()});
                        MqttConnector.this.scheduled.schedule(new ReConnect(client, options), (long)options.getMaxReconnectDelay(), TimeUnit.MILLISECONDS);
                    }
                    catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            });
            client.setCallback((MqttCallback)new MqttCallbackExtended(){
                private final String clientId;
                {
                    this.clientId = client.getClientId();
                }

                public void connectComplete(boolean reconnect, String serverURI) {
                    if (reconnect) {
                        log.info("Mqtt reconnection success.");
                        MqttConnector.this.subscribe(client);
                    }
                }

                public void connectionLost(Throwable cause) {
                    log.warn("Mqtt connection lost.");
                }

                public void messageArrived(String topic, MqttMessage message) {
                    for (MqttSubscriber subscriber : MqttSubscribeProcessor.SUBSCRIBERS) {
                        try {
                            subscriber.accept(this.clientId, topic, message);
                        }
                        catch (Exception e) {
                            log.error("Mqtt subscriber process error.", (Throwable)e);
                        }
                    }
                }

                public void deliveryComplete(IMqttDeliveryToken token) {
                }
            });
        }
        catch (MqttException e) {
            e.printStackTrace();
        }
    }

    private void subscribe(IMqttAsyncClient client) {
        String clientId = client.getClientId();
        boolean sharedEnable = this.properties.isSharedEnable(clientId);
        try {
            Set<TopicPair> topicPairs = this.mergeTopics(clientId, sharedEnable);
            if (topicPairs.isEmpty() && !StringUtils.isEmpty((Object)this.properties.getSubTopic())) {
                log.warn("There is no topic has been found for client '{}'.", (Object)clientId);
                return;
            }
            Object[] topics = new String[topicPairs.size()];
            int[] QOSs = new int[topicPairs.size()];
            int i = 0;
            for (TopicPair topicPair : topicPairs) {
                topics[i] = topicPair.getTopic(sharedEnable);
                QOSs[i] = topicPair.getQos();
                ++i;
            }
            client.subscribe((String[])topics, QOSs);
            log.info("Mqtt client '{}' subscribe success. topics : " + Arrays.toString(topics), (Object)clientId);
        }
        catch (MqttException e) {
            log.error("Mqtt client '{}' subscribe failure.", (Object)clientId, (Object)e);
        }
    }

    private Set<TopicPair> mergeTopics(String clientId, boolean sharedEnable) {
        HashSet<TopicPair> topicPairs = new HashSet<TopicPair>();
        for (MqttSubscriber subscriber : MqttSubscribeProcessor.SUBSCRIBERS) {
            if (!subscriber.contains(clientId)) continue;
            topicPairs.addAll(subscriber.getTopics());
        }
        if (topicPairs.isEmpty()) {
            return topicPairs;
        }
        TopicPair[] pairs = new TopicPair[topicPairs.size()];
        block1: for (TopicPair topic : topicPairs) {
            for (int i = 0; i < pairs.length; ++i) {
                TopicPair pair = pairs[i];
                if (pair == null) {
                    pairs[i] = topic;
                    continue block1;
                }
                if (pair.getQos() != topic.getQos()) continue;
                String temp = pair.getTopic(sharedEnable).replace('+', '\u0000').replace("#", "\u0000/\u0000");
                if (MqttTopic.isMatched((String)topic.getTopic(sharedEnable), (String)temp)) {
                    pairs[i] = topic;
                    continue;
                }
                temp = topic.getTopic(sharedEnable).replace('+', '\u0000').replace("#", "\u0000/\u0000");
                if (MqttTopic.isMatched((String)pair.getTopic(sharedEnable), (String)temp)) continue block1;
            }
        }
        return Arrays.stream(pairs).filter(Objects::nonNull).collect(Collectors.toSet());
    }

    public void destroy() {
        log.info("Shutting down mqtt clients.");
        MQTT_CLIENT_MAP.forEach((id, client) -> {
            try {
                if (client.isConnected()) {
                    client.disconnect();
                }
            }
            catch (Exception e) {
                log.error("Mqtt disconnect error: {}", (Object)e.getMessage(), (Object)e);
            }
            try {
                client.close();
            }
            catch (Exception e) {
                log.error("Mqtt close error: {}", (Object)e.getMessage(), (Object)e);
            }
        });
        MQTT_CLIENT_MAP.clear();
    }

    private class ReConnect
    implements Runnable {
        final IMqttAsyncClient client;
        final MqttConnectOptions options;

        ReConnect(IMqttAsyncClient client, MqttConnectOptions options) {
            this.client = client;
            this.options = options;
        }

        @Override
        public void run() {
            MqttConnector.this.connect(this.client, this.options);
        }
    }
}

