位置:首页 > Java技术 > EJB > EJB消息驱动Bean

EJB消息驱动Bean

一个消息驱动bean是一种类型的企业Bean,这是由EJB容器调用,当它接收到一个消息队列或主题。消息驱动bean是一个无状态的bean是用来做异步任务。

为了演示如何使用消息驱动bean,我们将利用EJB持久章节的内容,我们要做好以下几项工作。

  • Step 1. 在数据库中创建表(请参阅EJB持久章节)。

  • Step 2. 创建实体类对应的表 (请参阅EJB持久章节).

  • Step 3. 创建数据源和持久性单元 (请参阅EJB持久章节).

  • Step 4. 创建一个无状态EJB EntityManager实例 (请参阅EJB持久章节).

  • Step 5. 更新无状态EJB。添加添加记录的方法,并通过实体管理器,从数据库中获取记录 (请参阅EJB持久章节).

  • Step 6. 创建队列名为 BookQueue 在JBoss default 应用目录.

  • Step 7. 一个基于控制台应用程序客户端发送消息到这个队列

  • Step 8. 创建消息驱动bean将使用无状态的bean持久化客户数据。

  • Step 9. JBoss的EJB容器将调用上面的消息驱动bean,并把它传递的消息将要发送给客户端。

创建队列

创建一个文件名为jbossmq目的地service.xml中,如果不存在 <JBoss Installation Folder> > server > default > deploy 目录.

在这里,我们创建名为BookQueue一个队列

jbossmq-destinations-service.xml

<mbean code="org.jboss.mq.server.jmx.Queue"  
   name="jboss.mq.destination:service=Queue,name=BookQueue">  
   <depends optional-attribute-name="DestinationManager">
      jboss.mq:service=DestinationManager
   </depends>  
</mbean>  

当你启动JBoss,你会看到类似的内容在JBoss日志

...
10:37:06,167 INFO  [QueueService] Queue[/queue/BookQueue] started, fullSize=200000, pageSize=2000, downCacheSize=2000
...

 

创建消息驱动bean

@MessageDriven(
   name = "BookMessageHandler",
   activationConfig = {
      @ActivationConfigProperty( propertyName = "destinationType", 
                                 propertyValue = "javax.jms.Queue"),
      @ActivationConfigProperty( propertyName = "destination", 
                                 propertyValue ="/queue/BookQueue")
   }
)
public class LibraryMessageBean implements MessageListener {
 
   @Resource
   private MessageDrivenContext mdctx;  
 
   @EJB
   LibraryPersistentBeanRemote libraryBean;
 
   public LibraryMessageBean(){        
   }
 
   public void onMessage(Message message) {
   }
}
  • LibraryMessageBean annoatated@MessageDriven注解,把它标记为消息驱动bean。 

  • Its properties are defined as destinationType - Queue and destination - /queue/BookQueue.

  • It implements MessageListener interface which exposes onMessage method.

  • It has MessgeDrivenContext as resource.

  • LibraryPersistentBeanRemote stateless bean is injected in this bean for persistence purpose.

构建EjbComponent项目,并将其部署在JBoss上。构建和部署EJB模块后,我们需要一个客户端发送一个消息到JBoss队列。

示例应用程序

让我们创建一个测试EJB应用程序来测试消息驱动bean。

Step Description
1 Create a project with a name EjbComponent under a package com.tutorialspoint.entity as explained in the EJB - Create Application chapter. You can also use the project created in EJB - Create Application chapter as such for this chapter to understand ejb persistence concepts.
2 Create Book.java under package com.tutorialspoint.entity as created in EJB-Persistencechapter
3 Create LibraryPersistentBean.java and LibraryPersistentBeanRemote as created in EJB-Persistence chapter
4 Create jboss-ds.xml in EjbComponent > setup folder and persistence.xml in EjbComponent > src > conf folder. These folder can be seen in files tab in Netbeans as created in EJB-Persistence chapter
5 Create LibraryMessageBean.java under a package com.tutorialspoint.messagebean and modify it as shown below.
6 Create BookQueue queue in Jboss as described above.
7 Clean and Build the application to make sure business logic is working as per the requirements.
8 Finally, deploy the application in the form of jar file on JBoss Application Server. JBoss Application server will get started automatically if it is not started yet.
9 Now create the ejb client, a console based application in the same way as explained in theEJB - Create Application chapter under topic Create Client to access EJB. Modify it as shown below.

EJBComponent (EJB Module)

LibraryMessageBean.java

package com.tuturialspoint.messagebean;
 
import com.tutorialspoint.entity.Book;
import com.tutorialspoint.stateless.LibraryPersistentBeanRemote;
import javax.annotation.Resource;
import javax.ejb.ActivationConfigProperty;
import javax.ejb.EJB;
import javax.ejb.MessageDriven;
import javax.ejb.MessageDrivenContext;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.ObjectMessage;
 
@MessageDriven(
   name = "BookMessageHandler",
   activationConfig = {
      @ActivationConfigProperty( propertyName = "destinationType", 
                                 propertyValue = "javax.jms.Queue"),
      @ActivationConfigProperty( propertyName = "destination", 
                                 propertyValue ="/queue/BookQueue")
   }
)
public class LibraryMessageBean implements MessageListener {
 
   @Resource
   private MessageDrivenContext mdctx;  
 
   @EJB
   LibraryPersistentBeanRemote libraryBean;
 
   public LibraryMessageBean(){        
   }
 
   public void onMessage(Message message) {
      ObjectMessage objectMessage = null;
      try {
         objectMessage = (ObjectMessage) message;
         Book book = (Book) objectMessage.getObject(); 
         libraryBean.addBook(book);
 
      } catch (JMSException ex) {
         mdctx.setRollbackOnly();
      }       
   }   
}

EJBTester (EJB Client)

EJBTester.java

package com.tutorialspoint.test;
   
import com.tutorialspoint.entity.Book;
import com.tutorialspoint.stateless.LibraryPersistentBeanRemote;
import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.List;
import java.util.Properties;
import javax.jms.ObjectMessage;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.naming.InitialContext;
import javax.naming.NamingException;
 
public class EJBTester {
 
   BufferedReader brConsoleReader = null; 
   Properties props;
   InitialContext ctx;
   {
      props = new Properties();
      try {
         props.load(new FileInputStream("jndi.properties"));
      } catch (IOException ex) {
         ex.printStackTrace();
      }
      try {
         ctx = new InitialContext(props);            
      } catch (NamingException ex) {
         ex.printStackTrace();
      }
      brConsoleReader = 
      new BufferedReader(new InputStreamReader(System.in));
   }
   
   public static void main(String[] args) {
 
      EJBTester ejbTester = new EJBTester();
 
      ejbTester.testMessageBeanEjb();
   }
   
   private void showGUI(){
      System.out.println("**********************");
      System.out.println("Welcome to Book Store");
      System.out.println("**********************");
      System.out.print("Options 
1. Add Book
2. Exit 
Enter Choice: ");
   }
   
   private void testMessageBeanEjb(){
 
      try {
         int choice = 1; 
         Queue queue = (Queue) ctx.lookup("/queue/BookQueue");
         QueueConnectionFactory factory =
         (QueueConnectionFactory) ctx.lookup("ConnectionFactory");
         QueueConnection connection =  factory.createQueueConnection();
         QueueSession session = 
         connection.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE);
         QueueSender sender = session.createSender(queue);
 
         while (choice != 2) {
            String bookName;
            showGUI();
            String strChoice = brConsoleReader.readLine();
            choice = Integer.parseInt(strChoice);
            if (choice == 1) {
               System.out.print("Enter book name: ");
               bookName = brConsoleReader.readLine();
               Book book = new Book();
               book.setName(bookName);
               ObjectMessage objectMessage = 
                  session.createObjectMessage(book);
               sender.send(objectMessage); 
            } else if (choice == 2) {
               break;
            }
         }
 
         LibraryPersistentBeanRemote libraryBean = 
         (LibraryPersistentBeanRemote)
         ctx.lookup("LibraryPersistentBean/remote");
 
         List<Book> booksList = libraryBean.getBooks();
 
         System.out.println("Book(s) entered so far: " + booksList.size());
         int i = 0;
         for (Book book:booksList) {
            System.out.println((i+1)+". " + book.getName());
            i++;
         }           
      } catch (Exception e) {
         System.out.println(e.getMessage());
         e.printStackTrace();
      }finally {
         try {
            if(brConsoleReader !=null){
               brConsoleReader.close();
            }
         } catch (IOException ex) {
            System.out.println(ex.getMessage());
         }
      }
   }   
}

EJBTester做以下任务。

  • jndi.properties中加载和初始化的InitialContext对象。

  • In testStatefulEjb() method, jndi lookup is done with name - "/queue/BookQueue" to obtain treference of queue available in Jboss. Then sender is created using queue session.

  • Then user is shown a library store User Interface and he/she is asked to enter choice.

  • If user enters 1, system asks for book name and sender sends the book name to queue. When JBoss container receives this message in queue, it calls our message driven bean's onMessage method. Our message driven bean then saves book using stateful session bean addBook() method. Session Bean is persisting the book in database via EntityManager call.

  • If user enters 2, then another jndi lookup is done with name - "LibraryStatefulSessionBean/remote" to obtain the remote business object (stateful ejb) again and listing of books is done.

运行客户端访问EJB

Locate EJBTester.java in project explorer. Right click on EJBTester class and select run file.

Verify the following output in Netbeans console.

run:
**********************
Welcome to Book Store
**********************
Options 
1. Add Book
2. Exit 
Enter Choice: 1
Enter book name: Learn EJB
**********************
Welcome to Book Store
**********************
Options 
1. Add Book
2. Exit 
Enter Choice: 2
Book(s) entered so far: 2
1. learn java
1. learn EJB
BUILD SUCCESSFUL (total time: 15 seconds)

上面显示的输出状态,我们的消息驱动bean接收消息和存储在持久性存储的书和书籍通过数据库检索。