Spring Boot සහ IoT: MQTT හරහා දත්ත එකතු කිරීම | Sinhala Guide

Spring Boot සහ IoT: MQTT හරහා දත්ත එකතු කිරීම | Sinhala Guide

ආයුබෝවන් හැමෝටම! 👋 අද අපි කතා කරන්න යන්නේ මේ දවස් වල ගොඩක් දෙනෙක්ගේ අවධානයට ලක්වෙලා තියෙන Software Engineering TopiC එකක් ගැන – ඒ තමයි Spring Boot සහ IoT (Internet of Things). අපි බලමු IoT උපාංග වලින් එන දත්ත, MQTT කියන lightweight messaging protocol එක භාවිතා කරලා Spring Boot Application එකකට කොහොමද සාර්ථකව එකතු කරගන්නේ කියලා. මේක IoT Data Collector එකක් හදන්න හොඳ ආරම්භයක් වේවි.

IoT කියන්නේ අපේ දෛනික ජීවිතයේ තියෙන, සාමාන්‍යයෙන් අන්තර්ජාලයට සම්බන්ධ නැති උපාංග (Sensors, actuators, smart devices) අන්තර්ජාලයට සම්බන්ධ කරලා ඒවායෙන් දත්ත ලබාගෙන, ඒවා විශ්ලේෂණය කරලා තීරණ ගන්න පුළුවන් කරන තාක්ෂණයට. හිතන්නකෝ, ගොවිපළක උෂ්ණත්වය, ආර්ද්‍රතාවය මනින sensors, ගෙදර විදුලි බිල අඩු කරන්න පුළුවන් smart plugs වගේ දේවල්. මේවායින් එන දත්ත නිවැරදිව සහ කාර්යක්ෂමව handle කරන්න අපිට ස්ථාවර backend system එකක් අවශ්‍යයි.

අන්න ඒ වෙලාවට තමයි Spring Boot වල වටිනාකම මතුවෙන්නේ. Spring Boot කියන්නේ production-ready applications ඉක්මනින් සහ පහසුවෙන් හදන්න පුළුවන් බලවත් Framework එකක්. IoT ලෝකයේදී, Spring Boot වලට පුළුවන් IoT devices වලින් එන විශාල දත්ත ප්‍රමාණයක් විශ්වාසවන්තව process කරලා, ඒවා ගබඩා කරලා, අවශ්‍ය අයට API හරහා ලබා දෙන්න. ඒ වගේම MQTT (Message Queuing Telemetry Transport) කියන්නේ IoT devices අතර දත්ත හුවමාරු කරගන්න ගොඩක් ජනප්‍රිය, සැහැල්ලු Protocol එකක්. Low-bandwidth, high-latency, unreliable networks වලදී වුණත් හොඳට වැඩ කරන නිසා IoT වලට මේක කදිමයි.

ඉතින් අද මේ guide එකෙන් අපි බලමු:

  • IoT සහ MQTT වල මූලික කරුණු ටිකක්.
  • Spring Boot Project එකක් MQTT Support එක්ක හදාගන්න හැටි.
  • MQTT හරහා දත්ත receive කරන (subscribe) සහ send කරන (publish) විදිය.
  • අපි receive කරන දත්ත collect කරලා, REST API එකක් හරහා පෙන්වන විදිය.

ලංකාවේ අපි හැමෝටම මේ වගේ නවීන තාක්ෂණයන් ගැන දැනුවත් වීම ගොඩක් වටිනවා. අපි එහෙනම් පටන් ගමු නේද?

IoT සහ MQTT මූලික කරුණු

IoT යනු කුමක්ද? (What is IoT?)

සරලවම කිව්වොත්, IoT කියන්නේ අන්තර්ජාලයට සම්බන්ධ වෙන්න පුළුවන්, එකිනෙකාට දත්ත හුවමාරු කරගන්න පුළුවන් 'smart' උපාංග ජාලයක්. මේවාට sensors, actuators, software, සහ අනෙකුත් තාක්ෂණික දේවල් ඇතුළත් වෙනවා. මේ උපාංග වල ප්‍රධාන කාර්යය තමයි පරිසරයෙන් දත්ත එකතු කරලා, ඒ දත්ත අන්තර්ජාලය හරහා වෙනත් devices වලට හෝ servers වලට යැවීම. උදාහරණයක් විදියට, Smart Home Systems, Industrial Automation, Smart Agriculture, Wearable Devices වගේ දේවල් ගන්න පුළුවන්.

MQTT යනු කුමක්ද? (What is MQTT?)

MQTT කියන්නේ ISO Standard එකක් වන, lightweight, publish/subscribe model එක මත පදනම් වූ messaging protocol එකක්. මේක නිර්මාණය කරලා තියෙන්නේ constrained devices සහ low-bandwidth, high-latency networks වලදී වුණත් විශ්වාසනීයව ක්‍රියාත්මක වෙන්න පුළුවන් විදියට. IoT වලදී මේක ගොඩක් ප්‍රයෝජනවත්.

MQTT වල ප්‍රධාන සංරචක තුනක් තියෙනවා:

  • Publisher: මේක තමයි දත්ත යවන device එක (උදා: IoT Sensor එකක්). Publisher එකක් 'topic' එකකට දත්ත 'publish' කරනවා.
  • Subscriber: මේක තමයි දත්ත ලබාගන්න device එක (අපේ Spring Boot Application එක වගේ). Subscriber එකක් තමන්ට අවශ්‍ය 'topic' එකකට 'subscribe' වෙනවා.
  • Broker: මේක තමයි Publisher සහ Subscriber අතර මැදිහත් වෙන්නේ. Publisher එකකින් එවන දත්ත අදාළ Subscriber වලට distribute කරන්නේ Broker එකයි. Mosquitto, HiveMQ, EMQ X වගේ Brokers මේ සඳහා භාවිතා කරනවා.

MQTT මේ model එක නිසා, Publisher සහ Subscriber එකිනෙකා ගැන දැන සිටිය යුතු නැහැ. Broker එකෙන් මේ දත්ත හුවමාරුව manage කරනවා. මේක decoupled architecture එකක් නිසා system එක scalability සහ flexibility වැඩි කරනවා.

Spring Boot Project එකක් ආරම්භ කිරීම

හොඳයි, දැන් අපි Spring Boot Project එකක් හදාගෙන MQTT එක්ක වැඩ කරන්න අවශ්‍ය Dependencies add කරගමු. මේ සඳහා අපිට Spring Initializr භාවිතා කරන්න පුළුවන්. ඒක තමයි පහසුම ක්‍රමය.

අපි මේ Dependencies ටික add කරගමු:

  • Spring Web: REST APIs හදන්න.
  • Spring Integration MQTT: MQTT messaging support එක සඳහා.

ඔයාලා Project එක Generate කරලා, Intellij IDEA වගේ IDE එකකින් Open කරගත්තාට පස්සේ, `pom.xml` file එකේ dependencies මේ වගේ තියෙන්න ඕනේ:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>3.2.5</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.example</groupId>
    <artifactId>iot-data-collector</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>iot-data-collector</name>
    <description>Demo project for Spring Boot MQTT IoT Data Collector</description>
    <properties>
        <java.version>17</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-mqtt</artifactId>
        </dependency>

        <!-- Test dependencies -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

MQTT Broker Configuration

දැන් අපි Project එකේ `src/main/resources/application.properties` file එකේ MQTT Broker එකේ විස්තර configure කරගමු. මේකෙන් අපේ Spring Boot Application එකට Broker එකට සම්බන්ධ වෙන්න පුළුවන්.

සටහන: මේක කරන්න කලින් ඔයාලගේ computer එකේ Mosquitto වගේ MQTT Broker එකක් install කරලා run කරලා තියෙන්න ඕනේ. නැත්නම් මේක වැඩ කරන්නේ නැහැ. Windows වලට Mosquitto install කරන්න මෙතනින් බලන්න පුළුවන්.

# MQTT Broker Configuration
spring.mqtt.url=tcp://localhost:1883
spring.mqtt.clientId=iot-data-collector-app
spring.mqtt.completionTimeout=5000
spring.mqtt.default-qos=1
spring.mqtt.auto-startup=true

# Topics to subscribe to
mqtt.subscription.topic=iot/data/#

# Topic for publishing (optional, for testing)
mqtt.publishing.topic=iot/control
  • `spring.mqtt.url`: MQTT Broker එකේ URL එක. අපේ උදාහරණයට local Mosquitto Broker එකක් (Port 1883) භාවිතා කරනවා.
  • `spring.mqtt.clientId`: මේක MQTT Broker එකේ unique client ID එකක්.
  • `mqtt.subscription.topic`: අපිට Subscribe වෙන්න ඕන topic එක. `iot/data/#` කියන්නේ `iot/data/` වලින් පටන් ගන්න ඕනෑම sub-topic එකකට subscribe වෙනවා කියන එකයි (wildcard).

MQTT Publisher සහ Subscriber Implement කිරීම

දැන් අපි අපේ Spring Boot Application එක MQTT Broker එකට සම්බන්ධ කරලා, දත්ත receive කරන (Subscriber) සහ send කරන (Publisher) components හදමු. අපේ ප්‍රධාන කාර්යය IoT Data Collector එකක් හදන එක නිසා, Subscriber එකට වැඩි අවධානයක් දෙමු.

MQTT Subscriber (IoT Data Collector)

MQTT Subscriber එකක් හදන්න Spring Integration framework එක අපිට ගොඩක් උදව් වෙනවා. අපි `MqttPahoMessageDrivenChannelAdapter` එකක් සහ `@Service` එකක් භාවිතා කරලා incoming messages process කරන හැටි බලමු.

මුලින්ම, MQTT Client Factory එකක් configure කරගමු. මේකෙන් තමයි MQTT Connections හදන්නේ.

import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;

@Configuration
public class MqttConfig {

    @Value("${spring.mqtt.url}")
    private String mqttUrl;

    @Value("${spring.mqtt.clientId}")
    private String clientId;

    @Value("${spring.mqtt.completionTimeout}")
    private long completionTimeout;

    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        MqttConnectOptions options = new MqttConnectOptions();
        options.setServerURIs(new String[] { mqttUrl });
        options.setCleanSession(true);
        options.setConnectionTimeout(10);
        options.setKeepAliveInterval(20);
        // If you have username/password for your broker, uncomment and set them
        // options.setUserName("your_username");
        // options.setPassword("your_password".toCharArray());
        factory.setConnectionOptions(options);
        return factory;
    }
}

දැන් අපි MQTT Messages receive කරන්න Service එකක් හදමු.

import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.support.Default="1"
org.springframework.messaging.MessageChannel;
import org.springframework.stereotype.Service;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

@Configuration
@Service
public class MqttSubscriberService {

    @Value("${spring.mqtt.clientId}")
    private String clientId;

    @Value("${mqtt.subscription.topic}")
    private String subscriptionTopic;

    private final List<String> collectedData = Collections.synchronizedList(new ArrayList<>());

    // This method is exposed for the REST Controller to fetch data
    public List<String> getCollectedData() {
        return new ArrayList<>(collectedData);
    }

    @Bean
    public MessageChannel mqttInputChannel() {
        return new DirectChannel();
    }

    @Bean
    public MessageProducer inboundMqttAdapter(MqttPahoClientFactory mqttClientFactory) {
        MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(
                clientId + "_inbound", mqttClientFactory, subscriptionTopic);
        adapter.setCompletionTimeout(5000);
        adapter.setConverter(new Default='1'
); // Use the default converter
        adapter.setQos(1);
        adapter.setOutputChannel(mqttInputChannel());
        return adapter;
    }

    @ServiceActivator(inputChannel = "mqttInputChannel")
    public void handleMqttMessage(String payload, @org.springframework.integration.mqtt.support.MqttHeaders("mqtt_receivedTopic") String topic) {
        System.out.println("Received message from topic: " + topic + " | Payload: " + payload);
        collectedData.add(topic + ": " + payload); // Storing topic and payload
    }
}

මේ code එකෙන් මොකද වෙන්නේ?

  • `MqttConfig` class එකෙන් `MqttPahoClientFactory` එක configure කරනවා, ඒක Broker එකට connect වෙන්න අවශ්‍ය විස්තර සපයනවා.
  • `MqttSubscriberService` class එක තමයි අපේ ප්‍රධාන Data Collector එක.
  • `mqttInputChannel()` කියන `MessageChannel` එකට තමයි incoming MQTT messages එන්නේ.
  • `inboundMqttAdapter()` Bean එක `MqttPahoMessageDrivenChannelAdapter` එකක් නිර්මාණය කරනවා. මේක තමයි Broker එකෙන් messages receive කරලා `mqttInputChannel()` එකට යවන්නේ.
  • `@ServiceActivator(inputChannel = "mqttInputChannel")` Annotation එකෙන් කියන්නේ `handleMqttMessage` method එකට `mqttInputChannel` එකට එන හැම Message එකක්ම receive කරන්න කියලයි.
  • `handleMqttMessage` method එක ඇතුලේ, අපි receive කරන `payload` එක console එකේ print කරලා, `collectedData` කියන `List` එකට add කරනවා. මෙතනදී අපි simple `ArrayList` එකක් භාවිතා කළත්, real-world application එකකදී database එකකට (MongoDB, PostgreSQL, Cassandra, InfluxDB වගේ) data save කරන්න පුළුවන්.

MQTT Publisher (Testing සඳහා)

අපිට අපේ Subscriber එක test කරන්න MQTT Publisher කෙනෙක් අවශ්‍ය වෙනවා. ඒක IoT Device එකක් වෙන්න පුළුවන්, නැත්නම් Postman වගේ client එකකින් හෝ මේ Spring Boot Application එකෙන්ම හදන Test Publisher කෙනෙක් වෙන්නත් පුළුවන්. අපි මේ Application එකෙන්ම Test Publisher කෙනෙක් හදමු.

import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.messaging.MessageHandler;
import org.springframework.stereotype.Component;

@Configuration
public class MqttPublisherConfig {

    @Value("${spring.mqtt.clientId}")
    private String clientId;

    @Value("${mqtt.publishing.topic}")
    private String publishingTopic;

    @Bean
    @ServiceActivator(inputChannel = "mqttOutboundChannel")
    public MessageHandler mqttOutbound(MqttPahoClientFactory mqttClientFactory) {
        MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(clientId + "_outbound", mqttClientFactory);
        messageHandler.setAsync(true);
        messageHandler.setDefaultTopic(publishingTopic);
        messageHandler.setDefaultQos(1);
        return messageHandler;
    }

    @MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
    public interface MqttGateway {
        void sendToMqtt(String payload);
        void sendToMqtt(@org.springframework.integration.mqtt.support.MqttHeaders("mqtt_topic") String topic, String payload);
    }
}

`MqttPublisherConfig` එකට අමතරව, අපිට මේ Gateway එකට messages යවන්න පුළුවන් `RestController` එකක් හදමු.

import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class MqttTestController {

    private final MqttPublisherConfig.MqttGateway mqttGateway;

    public MqttTestController(MqttPublisherConfig.MqttGateway mqttGateway) {
        this.mqttGateway = mqttGateway;
    }

    @GetMapping("/publish/{topic}/{message}")
    public String publishMessage(@PathVariable String topic, @PathVariable String message) {
        String fullTopic = "iot/data/" + topic; // Ensure it matches the subscription topic pattern
        mqttGateway.sendToMqtt(fullTopic, message);
        return "Message '" + message + "' published to topic '" + fullTopic + "'";
    }
}

දැන් ඔයාලට පුළුවන් `http://localhost:8080/publish/temperature/25.5C` වගේ URL එකකට ගිහින් test message එකක් publish කරන්න. එතකොට ඒ message එක අපේ `MqttSubscriberService` එකෙන් receive කරනවා ද කියලා බලන්න පුළුවන්.

Data Storage සහ API

අපි මේ වන විට MQTT හරහා දත්ත receive කරන ආකාරය ඉගෙන ගත්තා. දැන් අපි මේ receive කරන දත්ත ගබඩා කරලා, ඒවා බලන්න පුළුවන් විදියට REST API එකක් හදමු.

අපේ `MqttSubscriberService` එකේ අපි දැනටමත් `collectedData` කියන `List` එකට data add කරනවා. අපි ඒක භාවිතා කරලා REST API එකක් හදමු.

import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.List;

@RestController
@RequestMapping("/api/iot")
public class IotDataController {

    private final MqttSubscriberService mqttSubscriberService;

    public IotDataController(MqttSubscriberService mqttSubscriberService) {
        this.mqttSubscriberService = mqttSubscriberService;
    }

    @GetMapping("/data")
    public List<String> getAllCollectedData() {
        return mqttSubscriberService.getCollectedData();
    }
}

මේ `IotDataController` එකේ `getAllCollectedData()` method එකට `GET` request එකක් එව්වොත්, `mqttSubscriberService` එකෙන් collect කරපු හැම data item එකක්ම List එකක් විදියට ලැබෙනවා.

ඔබගේ Application එක run කරලා, Postman හෝ browser එකකින් `http://localhost:8080/api/iot/data` කියන URL එකට ගිහින් බලන්න. මුලින්ම හිස් List එකක් ලැබෙයි. ඊට පස්සේ `http://localhost:8080/publish/humidity/60%` වගේ URL එකකින් test messages publish කරලා, ආයෙත් `http://localhost:8080/api/iot/data` එකට request එකක් යවලා බලන්න. දැන් ඔබට receive කරපු messages එහි දකින්නට ලැබෙයි.

මේක බොහොම සරල උදාහරණයක්. real-world scenario එකකදී, මේ data items JSON objects විදියට එන්න පුළුවන් (උදා: {"deviceId":"sensor001","temperature":25.5}). එතකොට අපිට පුළුවන් ඒ JSON data එක Java Objects වලට Map කරලා, ඒවා Database එකකට save කරන්න. මේ සඳහා Jackson ObjectMapper වගේ Libraries භාවිතා කරන්න පුළුවන්.

අවසන් වශයෙන්

ඉතින් යාලුවනේ, අද අපි Spring Boot Application එකක් MQTT Broker එකක් සමඟ සම්බන්ධ කරලා, IoT උපාංග වලින් එන දත්ත සාර්ථකව collect කරන හැටි ඉගෙන ගත්තා. මේක IoT Data Collector එකක් හදන්න ඉතාම හොඳ ආරම්භක පියවරක්. අපිට පුළුවන් මේක තව දුරටත් දියුණු කරන්න.

ඔයාලට පුළුවන් මේ Project එක තවදුරටත් develop කරන්න මෙන්න මේ වගේ දේවල් කරන්න:

  • Persistence: දැනට අපි දත්ත store කරන්නේ Application එකේ memory එකේ. ඒ වෙනුවට MongoDB, PostgreSQL, InfluxDB වගේ Database එකකට මේ දත්ත save කරන්න.
  • Error Handling: Message processing වලදී ඇතිවෙන errors handle කරන්න robust mechanism එකක් add කරන්න.
  • Security: MQTT connection එකට SSL/TLS Security add කරන්න. Broker එකේ username/password authentication implement කරන්න.
  • Scalability: වැඩිපුර දත්ත ප්‍රමාණයක් handle කරන්න Application එක scalable කරන්න (Microservices, Kafka වගේ).
  • Data Visualization: Collected data එකට Grafana, Tableau වගේ tools භාවිතා කරලා visualize කරන්න.
  • Real IoT Device: ESP32, Raspberry Pi වගේ Real IoT Device එකක් configure කරලා මේ Broker එකට data යවන්න.

මේ tutorial එක ඔයාලට ප්‍රයෝජනවත් වෙන්න ඇති කියලා හිතනවා. Spring Boot සහ IoT කියන්නේ අනාගතයේදී ගොඩක් ඉල්ලුමක් තියෙන ක්ෂේත්‍ර දෙකක්. මේ වගේ දේවල් ඉගෙන ගන්න එක ඔයාලගේ career එකට ගොඩක් වටිනවා.

ඔයාලගේ අදහස්, මේ Project එක කරගෙන යනකොට මුණගැහුණු ගැටළු හෝ දියුණු කරන්න පුළුවන් දේවල් පහල Comment Section එකේ දාලා යන්න. අපි එකිනෙකාට උදව් කරගෙන ඉගෙන ගනිමු!

ආයුබෝවන්! 👋