Інтеграція Java JMS черги повідомлень з PHP STOMP чергою

Опис задачі

Комунікація PHP<->JAVA через черги повідомлень.

Все частіше приходиться зустрічатись із ситуаціями, коли необхідно інтегрувати різні платформи; наприклад, PHP та Java.

Веб сервіси надають можливість взаємодіяти між платформами. Але така взаємодія є досить обмеженою, оскільки інтегруються тільки сервіси, які можуть викликати методи один одного синхронним чи асинхронним способом. Такі речі, як черги повідомлень (message queues), до останнього часу були досить складними для інтеграції. Черги повідомлень все більше проникають зі світу корпоративних проектів у світ веб проектів, де вони надають досить елегантний шлях для покращення характеристики масштабування проектів. Даний туторіал демонструє як працювати з JMS чергами повідомлень в Java та PHP.

Опис технологій

JMS

Java Message Services (JMS) це API, частина специфікацій Java Enterprise Edition, що описує механізми обміну повідомленнями між двома чи більше клієнтами. JMS існує досить довго, є перевіреним часом та використовується в багатьох проектах. З більш відоміших можна назвати LinkedIn. Клієнти взаємодіють за допомогою брокерів повідомлень (message broker), наприклад, таких як ActiveMQ, які беруть на себе відповідальність за координацію та доставку повідомлень.

Хоча прості сценарії використання реалізуються відносно легко, JMS API є досить складне, та тісно пов'язане з Java. Цей факт ускладнив використання JMS поза межами Java платформи.

STOMP

Streaming Text Oriented Protocol (STOMP) це полегшений текстовий протокол обміну повідомленнями. Головною ідеєю протоколу є простота, яка має полегшити використання та створення клієнтів для різних платформ і мов програмування. Завдячуючи цьому, протокол є відносно поширеним серед PHP, Python та інших платформ.

Існує багато брокерів які підтримують STOMP. ActiveMQ є також досить популярним для цього протоколу.

ActiveMQ

ActiveMQ це брокер повідомлень з відкритим кодом, який підтримує багато протоколів та механізмів взаємодії з чергами повідомлень, також на даний момент є дуже популярним серед розробників. Для нашого випадку він цікавий тим, що дозволяє використовувати та поєднувати STOMP та JMS черги повідомлень. Іншими словами, ActiveMQ надає можливість прозоро взаємодіяти з JMS чергами в PHP за допомогою STOMP.

Реалізація

Для цього проекту необхідні наступні речі:

Java 5+ ( http://java.sun.com/javase/downloads/index.jsp)

PHP 5 ( http://www.php.net/downloads.php)

Maven 2+ ( http://maven.apache.org/release-notes.html)

PHP STOMP Client ( http://stomp.codehaus.org/PHP)

Проект складається з 5 частин:

Брокера повідомлень (message broker), JMS продюсера (producer), споживача (consumer) на Java, STOMP продюсера та споживача на PHP.

Брокер можна завантажити з офіційногого сайту, або використати як додаток (plugin) до Maven проекту. Другий випадок є більш гнучкішим, так як треба менше розбиратись з конфігурацією.

Щоб створити новий модуль слід виконати наступну команду:

mvn --batch-mode archetype:generate -DarchetypeArtifactId=maven-archetype-quickstart -DgroupId=com.rozrobka -DartifactId=broker -Dversion=0.1

В новоствореній папці broker, з описом нового модуля, слід відредагувати pom.xml, вставивши наступний текст:

<?xml version="1.0"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.rozrobka</groupId>
  <artifactId>broker</artifactId>
  <packaging>jar</packaging>
  <version>0.1</version>
  <name>broker</name>
  <url>http://maven.apache.org</url>
  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>
  </dependencies>
  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.activemq.tooling</groupId>
        <artifactId>maven-activemq-plugin</artifactId>
        <version>5.2.0</version>
        <configuration>
          <configUri>xbean:file:./conf/activemq.xml</configUri>
          <fork>false</fork>
          <systemProperties>
            <property>
              <name>javax.net.ssl.keyStorePassword</name>
              <value>password</value>
            </property>
            <property>
              <name>org.apache.activemq.default.directory.prefix</name>
              <value>./target/</value>
            </property>
          </systemProperties>
        </configuration>
        <dependencies>
          <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring</artifactId>
            <version>2.5.6</version>
          </dependency>
          <dependency>
            <groupId>org.mortbay.jetty</groupId>
            <artifactId>jetty-xbean</artifactId>
            <version>6.1.11</version>
          </dependency>
          <dependency>
            <groupId>org.apache.camel</groupId>
            <artifactId>camel-activemq</artifactId>
            <version>1.1.0</version>
          </dependency>
        </dependencies>
      </plugin>
    </plugins>
  </build>
</project>

Таким описом ми використаємо ActiveMQ додаток та вкажемо всі необхідні залежності.

Так як ми будемо використовувати нестандартну конфігурацію, необхідно її описати.

Файл pom.xml вже містить звернення до activemq.xml файлу. Для опису конфігурації слід створити папку «conf» та додати туди файл «activemq.xml» з наступним вмістом:

<?xml version="1.0"?>
<beans xmlns="http://www.springframework.org/schema/beans"
  xmlns:amq="http://activemq.apache.org/schema/core"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://www.springframework.org/schema/beans
  http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
  http://activemq.apache.org/schema/core
  http://activemq.apache.org/schema/core/activemq-core.xsd">
  <broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="./data">
    <transportConnectors>
      <transportConnector name="openwire" uri="tcp://localhost:61616"/>
      <transportConnector name="stomp" uri="stomp://localhost:61613"/>
    </transportConnectors>
  </broker>
</beans>

Ця конфігурація описує один брокер який буде слухати порти 61616 та 61613.

Щоб перевірити чи все успішно створено слід запустити ActiveMQ:

mvn org.apache.activemq.tooling:maven-activemq-plugin:5.2.0:run

Або:

mvn activemq:run

Якщо все було коректно створено та налаштовано, ActiveMQ запуститься.

Java JMS Cпоживач

Щоб створити модуль споживача слід виконати наступну команду в консолі:

mvn --batch-mode archetype:generate -DarchetypeArtifactId=maven-archetype-quickstart -DgroupId=com.rozrobka -DartifactId=consumer -Dversion=0.1

В створеній папці consumer слід поміняти вміст файлу «pom.xml» на наступний:

<?xml version="1.0"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
  http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.rozrobka</groupId>
  <artifactId>consumer</artifactId>
  <packaging>jar</packaging>
  <version>0.1</version>
  <name>consumer</name>
  <url>http://maven.apache.org</url>
  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>javax.jms</groupId>
      <artifactId>jms</artifactId>
      <version>1.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.activemq</groupId>
     <artifactId>activemq-core</artifactId>
      <version>5.2.0</version>
    </dependency>
  </dependencies>
  <repositories>
    <repository>
      <id>jboss</id>
      <url>http://repository.jboss.com/maven2</url>
      <releases>
      </releases>
      <snapshots>
        <enabled>false</enabled>
      </snapshots>
    </repository>
  </repositories>
</project>

Далі слід поміняти вміст класу App з папки «src/main/java/com/rozrobka» на наступний:

package com.rozrobka;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class App implements MessageListener {
    public final static String brokerURL = "tcp://localhost:61616";

    private ConnectionFactory factory;
    private Connection connection;
    private Session session;
    private MessageConsumer consumer;

    public static void main(String[] args) {
        App app = new App();
        app.run();
    }

    public void run() {
        try {
            System.out.println("Consumer is starting...");
            ConnectionFactory factory = new ActiveMQConnectionFactory(brokerURL);
            connection = factory.createConnection();
            connection.start();
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Destination destination = session.createQueue("articles");
            consumer = session.createConsumer(destination);
            consumer.setMessageListener(this);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public void onMessage(Message message) {
        try {
            if (message instanceof TextMessage) {
                TextMessage txtMessage = (TextMessage)message;
                System.out.println("Recieved: " + txtMessage.getText());
            } else {
                System.out.println("Invalid message received.");
            }
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

Проаналізуємо код.

Підєднання до брокера повідомлень:

ConnectionFactory factory = new ActiveMQConnectionFactory(brokerURL);
    connection = factory.createConnection();
    connection.start();

Реєстрація в черзі «articles» та приєднання обробника подій:

session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    Destination destination = session.createQueue("articles");
    consumer = session.createConsumer(destination);
    consumer.setMessageListener(this);

Клас App реалізує інтерфейс MessageListener, що надає йому можливість обробляти події в методі onMessage. В цьому методі кожне повідмлення типу TextMessage буде виведене на екран.

Існують різні топології(схеми) відсилання повідомлень. В JMS підтримуються «черги» (queue) та «теми» (topic). Коли використовується черга, одне повідомлення буде надіслане будь-якому споживачеві черги. Споживач вибирається довільним чином, або згідно з правилами. Коли використовується тема, кожне повідомлення буде надсилатись всім споживачам. В нашому випадку використовується черга.

Споживач — готовий. Щоб його запустити слід виконати наступну команду в його ж папці:

mvn clean compile exec:java -Dexec.mainClass=com.rozrobka.App

Споживача можна залишити в такому стані, в якому він чекав би на повідомлення від продюсера. Щоб зупинити, слід натиснути Ctrl+C.

Java JMS Продюсер

Для створeння модуля продюсера необіхідно виконати наступну команду:

mvn --batch-mode archetype:generate -DarchetypeArtifactId=maven-archetype-quickstart -DgroupId=com.rozrobka -DartifactId=producer -Dversion=0.1

В папці producer файл «pom.xml» повинен мати такий самий вміст що і «pom.xml» з папки consumer, за вийнятком наступного:

Текст:

....
  <artifactId>consumer</artifactId>
...
  <name>consumer</name>
...

Повинен бути замінений на:

....
  <artifactId>producer</artifactId>
...
  <name>producer</name>
...

Далі слід поміняти контекст класу App з папки «src/main/java/com/rozrobka» на наступний:

package com.rozrobka;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;

public class App {
    public static String brokerURL = "tcp://localhost:61616";
    private ConnectionFactory factory;
    private Connection connection;
    private Session session;
    private MessageProducer producer;

    public static void main( String[] args ) {
        App app = new App();
        app.run();
    }

    public void run() {
        try {
            ConnectionFactory factory = new ActiveMQConnectionFactory(brokerURL);
            connection = factory.createConnection();
            connection.start();

            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Destination destination = session.createQueue("articles");
            producer = session.createProducer(destination);

            for (int i = 0; i < 5; i++) {
                System.out.println("Sending message " + i);
                Message message = session.createTextMessage("Hello from Java!");
                producer.send(message);
            }

            if (connection != null) {
                connection.close();
            }
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

Проаналізуємо код.

Приєднання до брокера повідомлень:

ConnectionFactory factory = new ActiveMQConnectionFactory(brokerURL);
    connection = factory.createConnection();
    connection.start();

Реєстрація в черзі «articles» та створення продюсера:

session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    Destination destination = session.createQueue("articles");
    producer = session.createProducer(destination);

Надсилання повідомлення:

Message message = session.createTextMessage("Hello from Java!");

Споживач — готовий. Щоб його запустити слід виконати наступну команду в його ж папці:

mvn clean compile exec:java -Dexec.mainClass=com.rozrobka.App

В cпоживачеві повинні з'явитись 5 повідомлень.

PHP STOMP Cпоживач

В папці проекту слід створити окрему папку php, а в ній — новий файл consumer.php з наступним вмістом:

<?php
set_include_path(get_include_path() . PATH_SEPARATOR . dirname( __FILE__ ) . '/Stomp/');
require_once('Stomp/Stomp.php');
$conn = new Stomp("tcp://localhost:61613");
echo "Starting...";
$conn->connect();
echo "Done!\n";
$conn->subscribe('/queue/articles');

while (1) {
  if (($msg = $conn->readFrame()) !== false) {
    echo (string)$msg;
    $conn->ack($msg);
  }
}
?>

Та скопіювати в папку STOMP клієнт. Щоб запустити PHP споживач слід виконати наступну команду:

php consumer.php

Якщо все запустилось без помилок, то можна ще раз запустити Java продюсера. В PHP споживачі повинні появитись повідомлення.

PHP STOMP Продюсер

У папці php слід створити файл producer.php з наступним вмістом:

<?php
set_include_path(get_include_path() . PATH_SEPARATOR . dirname( __FILE__ ) . '/Stomp/');
require_once('Stomp/Stomp.php');
$conn = new Stomp("tcp://localhost:61613");
echo "Starting...";
$conn->connect();
echo "Done!\n";

for ($i = 0; $i < 5; $i++) {
    echo "Sending message ".$i."\n";
    $conn->send("/queue/articles", "Hello from PHP!");
}
?>

Для запуску PHP продюсера необхідно виконати наступну команду:

php producer.php

Тепер є 2 продюсера та 2 споживача з якими можна експериментувати. :)

Код також можна завантажити з мого сайту.

Коментарі 2

aleks_raiden - 08 червня 2009, 16:04

эх, якби не стильки конфигурування просто для запуску… а може написати про MemcacheQ — теж можна використати для мижплатформового звязку

zenyk - 08 червня 2009, 17:08

Насправді там з конфігурації є тільки conf/activemq.xml, в якому декілька стрічок коду.

Коду дійсно не мало, але він повноцінний. Плюс був використаний Maven, який в любому випадку використовується на більшості, якщо не всіх комерційних Java проектах

MemcacheQ — виглядає цікаво. Хоча, якщо я правильно зрозумів, використана модель повідомлень є PULL, а не PUSH (типу JMS/AMQP та інших). Тобто у великих системах (50+ серваків) воно буде не ефективним для типових сценаріїв.

Коротко кажучи, треба ще подивитись :)

Коментувати
© 2009 - 2020, Розробка - соціальна ІТ спільнота.
Контакти: info@rozrobka.com
Правила користування