How to Connect to ActiveMQ
1. Create a JNDI properties file.
2. load the jndi properties file and read the JMS JNDI connection attributes
3. Instantiate the connection and send a request through a Producer Application
4. Consume messages using Consumer Application.
The application below takes an XML file and an XSD file as input, validates the XML against the XSD and, if it is valid, places the XML as a message on a JMS queue. The consumer application consumes the messages and writes each one to folder locations derived from the paths of the leaf nodes of the given XML.
1. JNDI Properties File
#ActiveMQ Connection Settings
# contextFactory
java.naming.factory.initial = org.apache.activemq.jndi.ActiveMQInitialContextFactory
# use the following property to configure the default connector
java.naming.provider.url = vm://localhost
# use the following property to specify the JNDI name the connection factory
# should appear as.
#connectionFactoryNames = connectionFactory, queueConnectionFactory, topicConnectionFactory
connectionFactoryNames = queueConnectionFactory
# register some queues in JNDI using the form
# queue.[jndiName] = [physicalName]
queue.sai = FOO_BAR
#org.apache.activemq.ActiveMQConnectionFactory=myQueueConnectionFactory
# register some topics in JNDI using the form
# topic.[jndiName] = [physicalName]
#topic.MyTopic = example.MyTopic
2. JNDI Lookup Java Class
/**
 * Singleton helper that loads JNDI settings from a resource bundle, opens a JNDI
 * {@link InitialContext} with them and looks up the JMS objects used by the
 * producer and consumer.
 *
 * <p>Typical call order: {@link #loadProperties()}, then {@link #connectToJNDI()},
 * then any of the {@code lookup...} / {@code get...} methods. The loaded
 * properties and the context are held in static fields shared by all callers.
 */
public class JndiLookup {
    private static final Logger LOG = LoggerFactory.getLogger(JndiLookup.class);
    private static final JndiLookup jndiLookUp = new JndiLookup();

    /** Settings read from the resource bundle; {@code null} until {@link #loadProperties()} runs. */
    private static Properties jndiProperties = null;
    /** JNDI context; {@code null} until {@link #connectToJNDI()} runs. */
    private static Context ctx = null;

    private JndiLookup() {
    }

    /**
     * Returns the single shared instance.
     *
     * @return the singleton {@code JndiLookup}
     */
    public static JndiLookup getInstance() {
        return jndiLookUp;
    }

    /**
     * Replaces the current settings with the contents of the resource bundle named by
     * {@code LoadConstants.jndi_properties}.
     *
     * @throws Exception if the bundle cannot be found or read
     */
    public void loadProperties() throws Exception {
        jndiProperties = new Properties();
        LOG.info("Loading properties file");
        loadFromResourceBundle();
    }

    /** Copies every key/value pair of the resource bundle into {@code jndiProperties}. */
    private void loadFromResourceBundle() {
        ResourceBundle rb = ResourceBundle.getBundle(LoadConstants.jndi_properties);
        Enumeration<String> keys = rb.getKeys();
        while (keys.hasMoreElements()) {
            String key = keys.nextElement();
            String value = rb.getString(key);
            // NOTE(review): every value is logged at INFO; confirm the bundle never holds credentials.
            LOG.info("Key: {} Value: {}", key, value);
            jndiProperties.put(key, value);
        }
    }

    /**
     * Creates the JNDI initial context from the settings loaded by {@link #loadProperties()}.
     *
     * @throws javax.naming.NamingException if the context cannot be created
     */
    public void connectToJNDI() throws javax.naming.NamingException {
        ctx = new InitialContext(jndiProperties);
        LOG.info("Connected to Provider URL " + ctx.getEnvironment().get(Context.PROVIDER_URL));
        LOG.info("Connected to INITIAL CONTEXT Factory " + ctx.getEnvironment().get(Context.INITIAL_CONTEXT_FACTORY));
    }

    /**
     * Logs the loaded settings and looks up the object bound as {@code queueConnectionFactory}.
     *
     * @return the queue connection factory bound in JNDI
     * @throws javax.naming.NamingException if the lookup fails
     * @throws IllegalStateException if {@link #connectToJNDI()} has not been called
     */
    public QueueConnectionFactory lookupQueueConnectionFactory() throws javax.naming.NamingException {
        if (jndiProperties != null) {
            for (String key : jndiProperties.stringPropertyNames()) {
                LOG.info(" Key: {} Value: {}", key, jndiProperties.getProperty(key));
            }
            // getProperty returns null for a missing key instead of failing like get(...).toString().
            LOG.info("Key in lookupQueueConnectionFactory {}",
                    jndiProperties.getProperty(Context.INITIAL_CONTEXT_FACTORY));
        }
        return (QueueConnectionFactory) requireContext().lookup("queueConnectionFactory");
    }

    /**
     * Looks up the queue bound under the JNDI name {@code sai}.
     *
     * @return the queue bound in JNDI
     * @throws javax.naming.NamingException if the lookup fails
     * @throws IllegalStateException if {@link #connectToJNDI()} has not been called
     */
    public Queue lookupQueue() throws javax.naming.NamingException {
        return (Queue) requireContext().lookup("sai");
    }

    /**
     * Looks up the object bound under the JNDI name {@code username}.
     *
     * @return the string form of the bound object
     * @throws javax.naming.NamingException if the lookup fails
     * @throws IllegalStateException if {@link #connectToJNDI()} has not been called
     */
    public String getUserName() throws javax.naming.NamingException {
        return requireContext().lookup("username").toString();
    }

    /**
     * Looks up the object bound under the JNDI name {@code password}.
     *
     * @return the string form of the bound object
     * @throws javax.naming.NamingException if the lookup fails
     * @throws IllegalStateException if {@link #connectToJNDI()} has not been called
     */
    public String getPassword() throws javax.naming.NamingException {
        return requireContext().lookup("password").toString();
    }

    /** Returns the context, failing with a descriptive error instead of a bare NPE when absent. */
    private static Context requireContext() {
        if (ctx == null) {
            throw new IllegalStateException("JNDI context not initialised; call connectToJNDI() first");
        }
        return ctx;
    }
}
3. ProducerImpl.java
/**
 * Producer application: validates an XML resource against an XSD resource and, when
 * valid, sends the XML as a text message to the queue obtained through
 * {@code JndiLookup}.
 */
public class ProducerImpl {
    private static final Logger LOG = LoggerFactory.getLogger(ProducerImpl.class);
    private static QueueConnectionFactory queueConnectionFactory = null;
    private static QueueSession queueSession = null;
    private static QueueConnection queueConnection = null;
    private static MessageProducer producer = null;
    private static Queue queue;
    private static TextMessage message;
    private static File xsdFile;
    private static File xmlFile;

    /**
     * Configures logging and resolves the XSD and XML input files from the classpath.
     *
     * @throws IllegalStateException if either resource is not on the classpath
     */
    public ProducerImpl() {
        DOMConfigurator.configure(LoadConstants.SLF4JPATH);
        xsdFile = resourceFile(LoadConstants.INPUT_XSD);
        xmlFile = resourceFile(LoadConstants.INPUT_XML);
    }

    /**
     * Entry point: builds a producer and sends the configured XML message.
     *
     * @param args unused
     * @throws Throwable if closing the JMS resources fails
     */
    public static void main(String[] args) throws Throwable {
        ProducerImpl producerImpl = new ProducerImpl();
        producerImpl.produceMessages();
    }

    /**
     * Sends the XML file to the queue only if it validates against the XSD; otherwise
     * logs the invalid XML. Failures while producing are logged, not rethrown. The JMS
     * session and connection are closed in every case.
     *
     * @throws Throwable if closing the session or connection fails
     */
    public void produceMessages() throws Throwable {
        try {
            if (DotPathUtil.getNewInstance().validate(xmlFile, xsdFile)) {
                String xmlString = DotPathUtil.convertXMLFileToString(xmlFile);
                // Obtain the connection, session and destination through JNDI.
                getConnection();
                queueSession = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
                queue = JndiLookup.getInstance().lookupQueue();
                LOG.info("sending valid xml message.....");
                producer = queueSession.createProducer(queue);
                message = queueSession.createTextMessage(xmlString);
                producer.send(message);
                LOG.info("valid xml message sent.....");
            } else {
                LOG.info("Logging the Invalid Message");
                String xmlString = DotPathUtil.convertXMLFileToString(xmlFile);
                LOG.info(xmlString);
            }
            xmlFile = null;
            xsdFile = null;
            LOG.info("Program Ended");
        } catch (Exception e) {
            LOG.error("Failed to produce message", e);
        } finally {
            closeResources();
        }
    }

    /**
     * Loads the JNDI settings, connects to JNDI and creates the queue connection.
     *
     * @throws Throwable if loading the settings, the JNDI lookup or the connection creation fails
     */
    public static void getConnection() throws Throwable {
        JndiLookup lookup = JndiLookup.getInstance();
        lookup.loadProperties();
        lookup.connectToJNDI();
        queueConnectionFactory = lookup.lookupQueueConnectionFactory();
        queueConnection = queueConnectionFactory.createQueueConnection();
    }

    /**
     * Closes the session and the connection independently, so a missing or failing
     * session never leaves the connection open.
     */
    private static void closeResources() throws JMSException {
        try {
            if (queueSession != null) {
                queueSession.close();
            }
        } finally {
            if (queueConnection != null) {
                queueConnection.close();
            }
        }
    }

    /** Resolves a classpath resource to a file, failing with a clear message when it is missing. */
    private static File resourceFile(String resourceName) {
        java.net.URL location = ProducerImpl.class.getResource(resourceName);
        if (location == null) {
            throw new IllegalStateException("Resource not found on classpath: " + resourceName);
        }
        return new File(location.getFile());
    }
}
4. ConsumerImpl.java (Consumer Application)
/**
 * Consumer application: receives text messages from the queue obtained through
 * {@code JndiLookup}, validates each against the XSD resource and hands valid ones
 * to {@code DotPathUtil.createFile}.
 */
public class ConsumerImpl {
    private static final Logger LOG = LoggerFactory.getLogger(ConsumerImpl.class);
    private static QueueReceiver queueReceiver = null;
    private static QueueSession queueSession = null;
    private static QueueConnection queueConnection = null;
    private static QueueConnectionFactory queueConnectionFactory = null;
    private static TextMessage message = null;
    private static File xsdFile = null;

    /**
     * Configures logging and resolves the XSD file from the classpath.
     *
     * @throws IllegalStateException if the XSD resource is not on the classpath
     */
    public ConsumerImpl() {
        DOMConfigurator.configure(LoadConstants.SLF4JPATH);
        java.net.URL location = ConsumerImpl.class.getResource(LoadConstants.INPUT_XSD);
        if (location == null) {
            throw new IllegalStateException("Resource not found on classpath: " + LoadConstants.INPUT_XSD);
        }
        xsdFile = new File(location.getFile());
    }

    /**
     * Connects to the queue and consumes messages in a loop. Each text message is
     * validated against the XSD and, when valid, written out via
     * {@code DotPathUtil.createFile}. The loop ends only when a message that is not a
     * {@code TextMessage} arrives. Failures are logged, not rethrown, and the JMS
     * resources that were opened are closed in every case.
     *
     * @throws JMSException if closing the receiver, session or connection fails
     */
    public void consumeMessage() throws JMSException {
        try {
            JndiLookup lookup = JndiLookup.getInstance();
            lookup.loadProperties();
            lookup.connectToJNDI();
            queueConnectionFactory = lookup.lookupQueueConnectionFactory();
            queueConnection = queueConnectionFactory.createQueueConnection();
            queueSession = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
            queueReceiver = queueSession.createReceiver(lookup.lookupQueue());
            LOG.info("QUEUE Obtained" + queueReceiver.getQueue().getQueueName());
            queueConnection.start();
            while (true) {
                // Poll with a 1 ms timeout; null means nothing arrived in that window.
                Message received = queueReceiver.receive(1);
                if (received == null) {
                    continue;
                }
                if (!(received instanceof TextMessage)) {
                    break;
                }
                message = (TextMessage) received;
                String text = message.getText();
                LOG.info("Consuming message: " + text);
                if (DotPathUtil.getNewInstance().validate(text, xsdFile)) {
                    DotPathUtil.createFile(text, xsdFile);
                }
            }
            xsdFile = null;
            LOG.info("Program Ended");
        } catch (Exception e) {
            LOG.error("Failed to consume messages", e);
        } finally {
            closeResources();
        }
    }

    /**
     * Closes receiver, session and connection. Each is null-checked because setup may
     * have failed before it was created, and each close is attempted even if an
     * earlier one throws.
     */
    private static void closeResources() throws JMSException {
        try {
            if (queueReceiver != null) {
                queueReceiver.close();
            }
        } finally {
            try {
                if (queueSession != null) {
                    queueSession.close();
                }
            } finally {
                if (queueConnection != null) {
                    queueConnection.close();
                }
            }
        }
    }
}
5. DotPathUtil.java: a common utility class that validates a given XML document. It is used both to validate the XML before a message is placed on the queue and to validate the XML again after a message has been consumed from the queue.
/**
 * Shared XML utility used by the producer and the consumer: validates XML against an
 * XSD, serialises an XML file to a string, and writes a consumed XML message into
 * directories derived from the element paths of the document.
 *
 * <p>The path-collection state ({@code sb}, {@code al}) is static and unsynchronised.
 */
public class DotPathUtil implements DotPath {
    private static final Logger LOG = LoggerFactory.getLogger(DotPathUtil.class);
    /** Platform path separator; equals the previously hard-coded backslash on Windows. */
    private static final String SEP = File.separator;
    private static String root = System.getProperty("user.dir") + SEP;
    private static final DotPathUtil dotPathUtil = new DotPathUtil();
    /** Path prefix of the element currently being visited by {@link #getChildPath(NodeList)}. */
    private static StringBuilder sb = new StringBuilder();
    /** Directory paths collected for the document most recently passed to {@link #getPaths(String)}. */
    private static final ArrayList<String> al = new ArrayList<String>();

    private DotPathUtil() {
    }

    /**
     * Writes the XML text into a timestamped {@code .txt} file in every directory
     * collected by {@link #getPaths(String)} for that XML, creating directories as
     * needed. Parsing and I/O failures are logged, not rethrown.
     *
     * @param str     the XML document text, written to the files as UTF-8
     * @param xsdFile unused by this method
     */
    public static void createFile(String str, File xsdFile) throws FileNotFoundException, XPathExpressionException, SAXException, IOException, ParserConfigurationException, JMSException {
        try {
            getPaths(str);
            LOG.info("reading the xml: " + str);
            Format timestamp = new SimpleDateFormat("yyyy-MM-dd-HH.mm.ss.Z");
            // Explicit UTF-8 so the bytes do not depend on the platform default charset.
            byte[] content = str.getBytes(java.nio.charset.StandardCharsets.UTF_8);
            for (String path : al) {
                File directory = new File(path);
                if (!directory.exists()) {
                    if (directory.mkdirs()) {
                        LOG.info("Directory created " + directory.getAbsolutePath());
                    }
                    LOG.info("Creating File in newly created directory");
                } else {
                    LOG.info("Creating File in already existing directory");
                }
                File target = new File(directory, timestamp.format(new Date()) + ".txt");
                // try-with-resources closes the stream even when the write fails.
                try (FileOutputStream out = new FileOutputStream(target)) {
                    out.write(content);
                }
                LOG.info("File Written successfully at: " + directory.getAbsolutePath());
            }
            LOG.info("Files Written successfully");
        } catch (SAXException | ParserConfigurationException | IOException | JMSException e) {
            LOG.error("Could not write message to file", e);
        }
    }

    /**
     * Wraps the text in an in-memory stream of its UTF-8 bytes.
     *
     * @param str the text to expose as a stream
     * @return a stream over the UTF-8 encoding of {@code str}
     */
    public static InputStream getInputStream(String str) throws JMSException {
        return new ByteArrayInputStream(str.getBytes(java.nio.charset.StandardCharsets.UTF_8));
    }

    /**
     * Parses the XML text and collects the directory path of every childless element.
     * Paths from any previously processed document are discarded first, so one
     * message never gets written into another message's directories.
     *
     * @param str the XML document text
     */
    public static void getPaths(String str) throws FileNotFoundException, SAXException, IOException, ParserConfigurationException, JMSException {
        sb = new StringBuilder();
        al.clear();
        Document document = DocumentBuilderFactory.newInstance().newDocumentBuilder().parse(getInputStream(str));
        getChildPath(document.getChildNodes());
    }

    /**
     * Recursively walks the nodes, building a path from element names. An element
     * named {@code root} (case-insensitive) contributes the working directory; an
     * element without child nodes records its full path; any other element extends
     * the current prefix for its descendants only.
     *
     * @param nodeList the nodes to visit
     */
    public static void getChildPath(NodeList nodeList) {
        for (int i = 0; i < nodeList.getLength(); i++) {
            Node currentNode = nodeList.item(i);
            LOG.debug("CurrentNode name: {}", currentNode);
            if (currentNode.getNodeType() != Node.ELEMENT_NODE) {
                continue;
            }
            // Remember the prefix length so this element's segment is removed again
            // before the next sibling is visited.
            int mark = sb.length();
            String name = currentNode.getNodeName();
            if (name.trim().equalsIgnoreCase("root")) {
                sb.append(root);
            } else if (!currentNode.hasChildNodes()) {
                String leafPath = sb + name + SEP;
                al.add(leafPath);
                LOG.debug("In Leaf Node {}", leafPath);
            } else {
                sb.append(name).append(SEP);
            }
            getChildPath(currentNode.getChildNodes());
            sb.setLength(mark);
        }
    }

    /**
     * Builds a single path by following the first element child at each level of the
     * document. Parsing failures are logged and the path built so far is returned.
     *
     * @param str the XML document text
     * @return the path, with a separator after every segment
     */
    public static String getPath(String str) throws FileNotFoundException, SAXException, IOException, ParserConfigurationException, XPathExpressionException, JMSException {
        StringBuilder path = new StringBuilder();
        try {
            LOG.info("in getPath >" + str + "<");
            Document document = DocumentBuilderFactory.newInstance().newDocumentBuilder().parse(getInputStream(str));
            NodeList nl = document.getChildNodes();
            LOG.info("Childs :" + nl.getLength());
            for (int i = 0; i < nl.getLength(); i++) {
                Node n = nl.item(i);
                if (n.getNodeType() == Node.ELEMENT_NODE) {
                    if (n.getNodeName().trim().equalsIgnoreCase("root")) {
                        // root already ends with a separator; the extra one is kept as in the original.
                        path.append(root).append(SEP);
                    } else {
                        path.append(n.getNodeName()).append(SEP);
                    }
                    // Descend into this element and restart the scan at its first child.
                    nl = n.getChildNodes();
                    i = -1;
                }
            }
        } catch (SAXException | ParserConfigurationException | IOException | JMSException e) {
            LOG.error("Could not derive path from xml", e);
        }
        return path.toString();
    }

    /**
     * Validates XML text against an XSD file.
     *
     * @param str     the XML document text
     * @param xsdFile the schema to validate against
     * @return {@code true} if validation succeeds; {@code false} on any failure
     */
    @Override
    public boolean validate(String str, File xsdFile) {
        boolean isValid = false;
        try {
            runValidation(new StreamSource(DotPathUtil.getInputStream(str)), xsdFile);
            isValid = true;
        } catch (Exception e) {
            LOG.warn("XML validation failed", e);
        }
        return report(isValid);
    }

    /**
     * Validates an XML file against an XSD file.
     *
     * @param xmlFile the XML document
     * @param xsdFile the schema to validate against
     * @return {@code true} if validation succeeds; {@code false} on any failure
     */
    @Override
    public boolean validate(File xmlFile, File xsdFile) {
        boolean isValid = false;
        try {
            runValidation(new StreamSource(xmlFile), xsdFile);
            isValid = true;
        } catch (Exception e) {
            LOG.warn("XML validation failed", e);
        }
        return report(isValid);
    }

    /** Compiles the XSD and validates the XML source against it; throws when invalid. */
    private static void runValidation(Source xmlSource, File xsdFile) throws SAXException, IOException {
        Source xsdSource = new StreamSource(xsdFile);
        SchemaFactory factory = SchemaFactory.newInstance(XMLConstants.W3C_XML_SCHEMA_NS_URI);
        Schema schema = factory.newSchema(xsdSource);
        Validator validator = schema.newValidator();
        validator.validate(xmlSource);
    }

    /** Logs the validation outcome and returns it unchanged. */
    private static boolean report(boolean isValid) {
        if (isValid) {
            LOG.info("Document is valid!");
        } else {
            LOG.info("Document is NOT valid!");
        }
        return isValid;
    }

    /**
     * Parses the XML file and serialises the resulting DOM to a string.
     *
     * @param xmlFile the XML file to read
     * @return the serialised document, or {@code null} if parsing or reading fails
     * @throws TransformerException if serialising the DOM fails
     */
    public static String convertXMLFileToString(File xmlFile) throws SAXException, ParserConfigurationException, TransformerFactoryConfigurationError, TransformerException, IOException {
        // try-with-resources releases the file handle, which was previously never closed.
        try (FileInputStream in = new FileInputStream(xmlFile)) {
            Document document = DocumentBuilderFactory.newInstance().newDocumentBuilder().parse(in);
            StringWriter stw = new StringWriter();
            Transformer serializer = TransformerFactory.newInstance().newTransformer();
            serializer.transform(new DOMSource(document), new StreamResult(stw));
            return stw.toString();
        } catch (SAXException | ParserConfigurationException | TransformerFactoryConfigurationError | IOException e) {
            LOG.error("Could not convert xml file to string", e);
        }
        return null;
    }

    /**
     * Returns the single shared instance.
     *
     * @return the singleton {@code DotPathUtil}
     */
    public static DotPathUtil getNewInstance() {
        return dotPathUtil;
    }
}