Transcript JMS - OCI

Java Message Service
• An Abstraction for using Message-Oriented Middleware (MOM)
• Purpose is to provide a sophisticated, yet
straightforward way to exchange messages
between applications.
• A message is any data or event that
applications need to share.
• Synchronous
• Sender and Receiver “know” each other
• A method is invoked
• Examples
– CORBA
– RMI
– DCOM
• Proper distribution of
messages
• Fault Tolerance
• Load balancing
• Scalability
• Transaction Support
• Asynchronous
Delivery
• De-coupling of the
Sender and Receiver
• Headers
– Automatically Assigned
– Developer Assigned
• Properties
– Application Specific
– JMS Specific
– Provider (Vendor) specific
• Payload
– Data
– Event
• Used by the JMS clients to filter messages
• Boolean logic applied to
– Header values
– Property values
• Syntax is based on a subset of the SQL-92 WHERE clause
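As a rough illustration of selector-style filtering, the plain-JDK sketch below hand-codes the equivalent of the selector string MessageSrc = 'Pilot' AND Altitude > 10000 and applies it to message properties held in maps. The SelectorSketch class, the Altitude property and the sample values are assumptions made for this example. In real JMS the selector string is given to the provider when the consumer is created, and the provider does the filtering.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Plain-JDK illustration only; not the JMS API.
final class SelectorSketch {

    // Hand-coded equivalent of the selector
    //   MessageSrc = 'Pilot' AND Altitude > 10000
    // (the property names are hypothetical).
    static final Predicate<Map<String, Object>> PILOT_ABOVE_10000 = props ->
            "Pilot".equals(props.get("MessageSrc"))
            && props.get("Altitude") instanceof Integer
            && (Integer) props.get("Altitude") > 10000;

    // Returns only the messages whose properties satisfy the selector.
    static List<Map<String, Object>> filter(List<Map<String, Object>> messages,
                                            Predicate<Map<String, Object>> selector) {
        List<Map<String, Object>> matched = new ArrayList<>();
        for (Map<String, Object> props : messages) {
            if (selector.test(props)) {
                matched.add(props);
            }
        }
        return matched;
    }

    public static void main(String[] args) {
        List<Map<String, Object>> inbox = List.of(
                Map.of("MessageSrc", "Pilot", "Altitude", 12000),
                Map.of("MessageSrc", "Pilot", "Altitude", 8000),
                Map.of("MessageSrc", "FSS", "Altitude", 15000));
        System.out.println(filter(inbox, PILOT_ABOVE_10000).size()); // prints 1
    }
}
```

Only the first sample message satisfies both conditions, so a consumer using this selector would see one message out of three.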
• Message
– Contains only headers and properties
– Useful for Event notification
• TextMessage
– Java String is the payload
– Useful for XML data
• ObjectMessage
– Serializable Java Object as payload
• BytesMessage
– Useful for exchanging data in the application’s native
format
– Useful when just providing transport
• StreamMessage
– Payload is made up of Java primitive types (int, double,
long, char, etc)
• MapMessage
– Payload is a set of name-value pairs
– The name must be String
– The value can be a String or primitive
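• AUTO_ACKNOWLEDGE
– Ack is provided by the Provider automatically as each message is delivered
• DUPS_OK_ACKNOWLEDGE
– Ack is provided lazily by the Provider; the Client must tolerate duplicate messages
• CLIENT_ACKNOWLEDGE
– Client explicitly acks by calling acknowledge() on a message, which acks all
messages the Session has consumed so far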
• Provides a Java API to access proprietary
and heterogeneous naming services.
• System Admin is responsible for creating,
configuring and binding the Administered
Objects of the Provider
– Destinations: Topics and Queues
– ConnectionFactory: Used by the Client to
create connections to the Message Server.
• Publish/Subscribe
– Producer may send to multiple Consumers
– Accomplished through a virtual channel called a Topic
– Push model
– Asynchronously or Synchronously
• Point to Point
– Producer sends to one Consumer
– Accomplished through a virtual channel called a Queue
– Pull Model
– Synchronously or Asynchronously
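The difference between the two models can be sketched with plain JDK collections. This is an in-memory illustration only, not the JMS API; the class names InMemoryTopic and InMemoryQueue and the sample messages are made up for the example.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;

// Plain-JDK illustration only; not the JMS API.
final class MessagingModelsSketch {

    // Publish/subscribe: every registered subscriber is pushed its own copy.
    static final class InMemoryTopic {
        private final List<Consumer<String>> subscribers = new ArrayList<>();

        void subscribe(Consumer<String> subscriber) {
            subscribers.add(subscriber);
        }

        void publish(String message) {
            for (Consumer<String> subscriber : subscribers) {
                subscriber.accept(message);
            }
        }
    }

    // Point-to-point: a message waits in the queue until one consumer pulls it.
    static final class InMemoryQueue {
        private final Queue<String> pending = new ArrayDeque<>();

        void send(String message) {
            pending.add(message);
        }

        // Returns the next message, or null if the queue is empty.
        String receive() {
            return pending.poll();
        }
    }

    public static void main(String[] args) {
        InMemoryTopic topic = new InMemoryTopic();
        List<String> tower = new ArrayList<>();
        List<String> pilot = new ArrayList<>();
        topic.subscribe(tower::add);
        topic.subscribe(pilot::add);
        topic.publish("wind 270 at 15");
        System.out.println(tower.size() + " " + pilot.size()); // prints "1 1"

        InMemoryQueue queue = new InMemoryQueue();
        queue.send("clearance request");
        System.out.println(queue.receive()); // prints "clearance request"
        System.out.println(queue.receive()); // prints "null"
    }
}
```

Both subscribers receive the published message, while the queued message is handed to whichever consumer pulls first and is gone afterwards.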
import javax.jms.*;
import javax.naming.*;

public class FlightData implements MessageListener {
    private TopicConnection connection = null;
    private TopicSession pubSession = null;
    private TopicSession subSession = null;
    private TopicPublisher publisher = null;
    private TopicSubscriber subscriber = null;
    private Topic weatherTopic = null;
    private Topic recvWeatherTopic = null;

    public FlightData() {
        try {
            //Set up JNDI properties, etc.. (provider-specific settings omitted)
            Context jndi = new InitialContext();
            TopicConnectionFactory factory =
                (TopicConnectionFactory) jndi.lookup("TopicConnectionFactory");
            connection = factory.createTopicConnection();
            //Non-transacted sessions; the Provider acknowledges automatically
            pubSession = connection.createTopicSession(false,
                Session.AUTO_ACKNOWLEDGE);
            subSession = connection.createTopicSession(false,
                Session.AUTO_ACKNOWLEDGE);
            weatherTopic = (Topic) jndi.lookup("FlightDataTopic");
            publisher = pubSession.createPublisher(weatherTopic);
            //Replies come back on a temporary topic owned by this connection
            recvWeatherTopic = subSession.createTemporaryTopic();
            subscriber = subSession.createSubscriber(recvWeatherTopic);
            subscriber.setMessageListener(this);
            connection.start();
        } catch (JMSException jmsEx) {
            jmsEx.printStackTrace();
        } catch (NamingException nameEx) {
            nameEx.printStackTrace();
        }
    }

    private void publishFlightData(String wind, String planeType, String altitude,
            String turbulence) {
        try {
            long time = System.currentTimeMillis();
            StringBuffer xmlMsg = new StringBuffer("<FlightData time=\"" + time + "\">");
            xmlMsg.append("<planeType>" + planeType + "</planeType>");
            xmlMsg.append("<wind>" + wind + "</wind>");
            xmlMsg.append("<altitude>" + altitude + "</altitude>");
            xmlMsg.append("<turbulence>" + turbulence + "</turbulence>");
            xmlMsg.append("</FlightData>");
            TextMessage message = pubSession.createTextMessage();
            message.setText(xmlMsg.toString());
            //Alternate to setting time in the XML Message
            message.setStringProperty("Time", "" + time);
            //For those Subscribers that want to *provide* data
            message.setJMSReplyTo(recvWeatherTopic);
            //Persistent delivery, default priority, time-to-live of 180000 ms
            publisher.publish(message, DeliveryMode.PERSISTENT,
                Message.DEFAULT_PRIORITY, 180000);
        } catch (JMSException jmsEx) {
            //Do right thing
        }
    }

    public void onMessage(Message message) {
        try {
            TextMessage tMessage = (TextMessage) message;
            //Null-safe comparison in case the property is not set
            if ("Pilot".equals(tMessage.getStringProperty("MessageSrc"))) {
                String textFromMsg = tMessage.getText();
                //Extract all pertinent Data in XML message
                …….
                publishFlightData(wind, planeType, altitude, turbulence);
            } else {
                logMsg("FSS: " + tMessage.getText() + " has received the " +
                    "latest Weather info");
            }
        } catch (Exception ex) {
            //Do right thing
        }
    }
}
• Durable Subscriptions
– connection.setClientID("DurableFSS");
– subscriber =
session.createDurableSubscriber(weatherTopic, "FSS Subscription");
• Temporary Topics
– Dynamic
– Associated to the Connection of the Session that created it.
– Active for the life of the Connection.
– Guaranteed to be unique across all connections
• Durable Subscriptions and Temporary Topics Can’t Be Used Together
– Remember, a Temporary Topic is only good for the life of its
Connection.
• Synchronous Messaging
– Use the receive() methods on the consumer (TopicSubscriber or QueueReceiver)
• receive() – waits indefinitely
• receive(long timeout) – waits up to the given number of milliseconds
• receiveNoWait() – returns a message if one is there, null
otherwise
– Create a TopicRequestor
• TopicRequestor requestor = new TopicRequestor(session, tempTopic);
• Message message = requestor.request(myMessage);
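The three receive variants behave much like the take and poll methods of java.util.concurrent.BlockingQueue. The sketch below uses that analogy with plain JDK classes only; it is not the JMS API, and the SyncReceiveSketch class and sample message are made up for the example.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Plain-JDK illustration only; not the JMS API.
final class SyncReceiveSketch {
    private final BlockingQueue<String> pending = new LinkedBlockingQueue<>();

    void send(String message) {
        pending.add(message);
    }

    // Like receive(): blocks until a message arrives.
    String receive() throws InterruptedException {
        return pending.take();
    }

    // Like receive(long timeout): waits up to timeoutMillis, then returns null.
    String receive(long timeoutMillis) throws InterruptedException {
        return pending.poll(timeoutMillis, TimeUnit.MILLISECONDS);
    }

    // Like receiveNoWait(): returns immediately, null if nothing is waiting.
    String receiveNoWait() {
        return pending.poll();
    }

    public static void main(String[] args) throws InterruptedException {
        SyncReceiveSketch consumer = new SyncReceiveSketch();
        System.out.println(consumer.receiveNoWait()); // prints "null"
        System.out.println(consumer.receive(50));     // prints "null" after about 50 ms
        consumer.send("METAR update");
        System.out.println(consumer.receive());       // prints "METAR update"
    }
}
```

One difference from the analogy: in JMS a timeout of 0 passed to receive(long timeout) means wait indefinitely, whereas a zero timeout on BlockingQueue.poll returns at once.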
• The implementation differences for the previous
example:
– Need to look up a QueueConnectionFactory and a Queue
QueueConnectionFactory qFactory =
(QueueConnectionFactory) jndi.lookup("QueueConnectionFactory");
– Create a separate Connection
qConnection = qFactory.createQueueConnection();
– Create a Sender/Receiver for the Queue from the Session
qSender = qSession.createSender(recvWeatherQueue);
– Call send()
• qSender.send(message, DeliveryMode.PERSISTENT,
Message.DEFAULT_PRIORITY, 180000);
• Intended for one-to-one message exchange
• The message exchange really matters to the
producer and consumer.
• The message must only be processed once.
• Load balancing of processing is important.
– Take Advantage of the QueueBrowser
• Used by the System Administrator
• Used by an application to find “best”
message.
– Views the queue, finds the message it wants,
consumes the messages synchronously to get to
the one it wants, and then processes the desired
message.
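The browse-then-consume pattern described above can be sketched with a plain JDK deque. This is an in-memory illustration only, not the JMS QueueBrowser API; the BrowseThenConsumeSketch class is made up, and treating the largest number as the "best" message is an arbitrary assumption for the example.

```java
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.Deque;
import java.util.List;
import java.util.NoSuchElementException;

// Plain-JDK illustration only; not the JMS API.
final class BrowseThenConsumeSketch {

    // Finds the "best" message (here, hypothetically, the largest number),
    // then consumes from the head of the queue until that message comes off.
    static int consumeBest(Deque<Integer> queue) {
        if (queue.isEmpty()) {
            throw new NoSuchElementException("queue is empty");
        }
        // Browse step: Collections.max only iterates; nothing is removed.
        int best = Collections.max(queue);

        // Consume step: messages ahead of the wanted one are taken off too.
        int head = queue.remove();
        while (head != best) {
            head = queue.remove();
        }
        return head;
    }

    public static void main(String[] args) {
        Deque<Integer> queue = new ArrayDeque<>(List.of(3, 9, 5));
        System.out.println(consumeBest(queue)); // prints 9
        System.out.println(queue);              // prints [5]
    }
}
```

Note that the messages ahead of the wanted one are consumed along the way, so an application taking this approach has to decide what to do with them.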
• Store and Forward
– Messages marked as persistent
– Durable Subscribers
• Autonomous
– Self contained entities
• Acknowledgments
– Discussed earlier, gives
applications/components flexibility
• Transactions
– Local, set the boolean true when creating the Session
– Global, 2-Phase Commit
• Vendors that implement the JTA XA APIs
• Network Failures
– Server must attempt to reconnect
– Client should implement ExceptionListener interface
– Client is responsible for re-establishing the connection
• Dead Message Queue
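The effect of a local transaction listed above (a Session created with the transacted boolean set to true) can be sketched in memory: sends are held back until commit and thrown away on rollback. The sketch uses plain JDK classes only and is not the JMS API; the TransactedSessionSketch class and the sample messages are made up for the example.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-JDK illustration only; not the JMS API.
final class TransactedSessionSketch {
    private final List<String> destination;                    // what consumers can see
    private final List<String> uncommitted = new ArrayList<>(); // sends held by the transaction

    TransactedSessionSketch(List<String> destination) {
        this.destination = destination;
    }

    // Sends are buffered until the transaction is committed.
    void send(String message) {
        uncommitted.add(message);
    }

    // Makes every buffered message visible as one unit.
    void commit() {
        destination.addAll(uncommitted);
        uncommitted.clear();
    }

    // Discards every buffered message.
    void rollback() {
        uncommitted.clear();
    }

    public static void main(String[] args) {
        List<String> destination = new ArrayList<>();
        TransactedSessionSketch session = new TransactedSessionSketch(destination);
        session.send("gate change");
        session.rollback();                  // "gate change" is discarded
        session.send("wind shear alert");
        session.send("runway closed");
        session.commit();                    // both become visible together
        System.out.println(destination);     // prints [wind shear alert, runway closed]
    }
}
```

In JMS itself the corresponding calls are commit() and rollback() on the transacted Session, and the provider does the buffering.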
• As of J2EE 1.3, the JMS API became
integral to the environment
– Included a new enterprise bean, the message-driven bean, allowing
asynchronous message consumption
– Applications can have messages participate in the
Java Transaction API. (Distributed Transactions)
– Within the container architecture messages may
be consumed concurrently
– The Connector Architecture allows messages to
be a part of the heterogeneous transaction context
that includes Databases and EISs
• The 1.4 release of J2EE will upgrade to
JMS 1.1:
– The API is not specific to the pub/sub or
point-to-point model.
– The JMS provider can be integrated using
a Resource Adapter from the Connector
Architecture
• Consider the obvious:
– Concurrent Users
– Peak Load time
– Hardware available
• Size of message payload
– Very important
• Look at the benefits of the model that best fits the
environment, then choose the vendor best suited to it
• Look at Send Rate vs. Receive Rate
– When Send Rate far exceeds Receive Rate
• Look at reliability during peak usage
• Look at reliability during extended usage
• O’Reilly’s “Java Message Service”
– R. Monson-Haefel, D. Chappell
• java.sun.com
• JBoss: www.jboss.org
• OpenJMS: http://openjms.sourceforge.net