Wednesday, July 24, 2013

Java Standalone JMS-JNDI Application

How to Connect to ActiveMQ
1. create jndi properties file
2. load the jndi properties file and read the JMS JNDI connection attributes
3. Instantiate the connection and send a request through a Producer Application
4. Consume messages using Consumer Application.

The Following below application takes an xml and xsd as input, performs validation on a logic and places the messages in a JMS Queue. The consumer application consumes the messages and writes them to a folder location in the path of the given xml leaf nodes.

1. JNDI Properties File
#ActiveMQ Connection Settings
# contextFactory
java.naming.factory.initial = org.apache.activemq.jndi.ActiveMQInitialContextFactory

# use the following property to configure the default connector
java.naming.provider.url = vm://localhost

# use the following property to specify the JNDI name the connection factory
# should appear as. 
#connectionFactoryNames = connectionFactory, queueConnectionFactory, topicConnectionFactry
connectionFactoryNames = queueConnectionFactory
# register some queues in JNDI using the form
# queue.[jndiName] = [physicalName]
queue.sai = FOO_BAR

#org.apache.activemq.ActiveMQConnectionFactory=myQueueConnectionFactory
# register some topics in JNDI using the form
# topic.[jndiName] = [physicalName]
#topic.MyTopic = example.MyTopic


2. JNDI Lookup Java Class

public class JndiLookup {

    private static Properties jndiProperties = null;
    private static Context ctx = null;
    private static Logger LOG = LoggerFactory.getLogger(JndiLookup.class);
    private static final JndiLookup jndiLookUp = new JndiLookup();

    private JndiLookup() {
        
    }

    public static JndiLookup getInstance() {
        return jndiLookUp;
    }
       
    public void loadProperties() throws Exception {
        try {
            jndiProperties = new Properties();
            LOG.info("Loading properties file");
            loadFromResourceBundle();
        } catch (final Exception ex) {
            throw ex;
        }
    }

    private void loadFromResourceBundle() throws Exception {
        String key = null;
        String value = null;
        ResourceBundle rb = ResourceBundle.getBundle(LoadConstants.jndi_properties);

        Enumeration enum1 = rb.getKeys();
        while (enum1.hasMoreElements()) {
            key = enum1.nextElement().toString();
            value = rb.getString(key);
            LOG.info("Key: " + key + "Value :" + value);
            // LOG.info(value);
            jndiProperties.put(key, value);
        }
    }

    public void connectToJNDI() throws javax.naming.NamingException {

        // jndiProperties was loaded from PropertiesManagement.properties
        ctx = new InitialContext(jndiProperties);
       
        LOG.info("Connected to Provider URL " + ctx.getEnvironment().get(Context.PROVIDER_URL));
        LOG.info("Connected to INITIAL CONTEXT Factory " +             ctx.getEnvironment().get(Context.INITIAL_CONTEXT_FACTORY));
        
    }

    public QueueConnectionFactory lookupQueueConnectionFactory() throws   javax.naming.NamingException {

        Enumeration enum1=jndiProperties.keys();
        while (enum1.hasMoreElements()) {
           String key = enum1.nextElement().toString();
           String value = (String) jndiProperties.get(key);
            LOG.info(" Key: " + key + " Value :" + value);
    }
        LOG.info("Key in lookupQueueConnectionFactory " + jndiProperties.get("java.naming.factory.initial").toString());
        return (QueueConnectionFactory) ctx.lookup("queueConnectionFactory");

    }

    public Queue lookupQueue() throws javax.naming.NamingException {
        return (Queue) ctx.lookup("sai");
    }
    
    public String getUserName() throws javax.naming.NamingException{
        return ctx.lookup("username").toString();
    }
    
    public String getPassword() throws javax.naming.NamingException{
        return ctx.lookup("password").toString();
    }
}

3. ProducerImpl.java

public class ProducerImpl {

    private static Logger LOG = LoggerFactory.getLogger(ProducerImpl.class);

    private static QueueConnectionFactory queueConnectionFactory = null;
    private static QueueSession queueSession = null;
    private static QueueConnection queueConnection = null;
    private static MessageProducer producer = null;
    private static Queue queue;
    private static TextMessage message;
    private static File xsdFile;
    private static File xmlFile;

    /**
     * Default Constructor
     */
    public ProducerImpl() {
        DOMConfigurator.configure(LoadConstants.SLF4JPATH);
        xsdFile = new File(ProducerImpl.class.getResource(LoadConstants.INPUT_XSD).getFile());
        xmlFile = new File(ProducerImpl.class.getResource(LoadConstants.INPUT_XML).getFile());
    }

    public static void main(String[] args) throws Throwable {
        // TODO Auto-generated method stub
        // send message
         ProducerImpl producerImpl = new ProducerImpl();
         producerImpl.produceMessages();
    }

    /**
     * This method will send messages to Queue only if given xmlfile is valid.
     * else it logs the invalid xml message and closes the queue connection.
     * 
     * @throws Throwable
     */
    public void produceMessages() throws Throwable {
        try {
            if (DotPathUtil.getNewInstance().validate(xmlFile, xsdFile)) {
                String xmlString = DotPathUtil.convertXMLFileToString(xmlFile);

                // Get the Connection using JNDI
                getConnection();

                // create queue session using JNDI
                queueSession = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);

                // get queue name using JNDI lookup
                queue = JndiLookup.getInstance().lookupQueue();

                LOG.info("sending valid xml message.....");

                // create the producer
                producer = queueSession.createProducer(queue);

                // create the message
                message = queueSession.createTextMessage(xmlString);

                // send the message
                producer.send(message);
                LOG.info("valid xml message sent.....");

            } else {
                // else log the invalid message
                LOG.info("Logging the Invalid Message");
                String xmlString = DotPathUtil.convertXMLFileToString(xmlFile);
                // print the invalid message
                LOG.info(xmlString);
            }
            xmlFile = null;
            xsdFile = null;

            LOG.info("Program Ended");

        } catch (Exception e) {

            e.printStackTrace();
        } finally {
            if (queueSession != null && queueConnection != null) {
                queueSession.close();
                queueConnection.close();
            }
        }
    }

    /**
     * This method creates the connection using the JNDI lookup.
     * 
     * @throws Throwable
     */
    public static void getConnection() throws Throwable {
        try {
            JndiLookup.getInstance().loadProperties();
            JndiLookup.getInstance().connectToJNDI();
            queueConnectionFactory = JndiLookup.getInstance().lookupQueueConnectionFactory();
            //queueConnection = queueConnectionFactory.createQueueConnection(JndiLookup.getInstance().getPassword(),JndiLookup.getInstance().getPassword());
            queueConnection = queueConnectionFactory.createQueueConnection();
        } catch (final Throwable ex) {
            throw ex;
        }
    }
}


3. ConsumerApplication

public class ConsumerImpl{

    private static Logger LOG = LoggerFactory.getLogger(ConsumerImpl.class);
    private static QueueReceiver queueReceiver = null;
    private static QueueSession queueSession = null;
    private static QueueConnection queueConnection = null;
    private static QueueConnectionFactory queueConnectionFactory = null;
    private static TextMessage message=null;
    private static File xsdFile=null;

    public ConsumerImpl() {
        DOMConfigurator.configure(LoadConstants.SLF4JPATH);
        xsdFile = new File(ConsumerImpl.class.getResource(LoadConstants.INPUT_XSD).getFile());
    }

    /**
     * This method consumes the
     * 
     * @throws JMSException
     */
    public void consumeMessage() throws JMSException {
        try {
            JndiLookup.getInstance().loadProperties();
            JndiLookup.getInstance().connectToJNDI();
            queueConnectionFactory = JndiLookup.getInstance().lookupQueueConnectionFactory();

            queueConnection = queueConnectionFactory.createQueueConnection();
            queueSession = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);

            queueReceiver = queueSession.createReceiver(JndiLookup.getInstance().lookupQueue());
            LOG.info("QUEUE Obtained" + queueReceiver.getQueue().getQueueName());
            queueConnection.start();

            while (true) {
                Message m = queueReceiver.receive(1);
                if (m != null) {
                    if (m instanceof TextMessage) {
                        message = (TextMessage) m;
                        LOG.info("Consuming message: " + message.getText());
                        if (DotPathUtil.getNewInstance().validate(message.getText(), xsdFile))
                            DotPathUtil.createFile(message.getText(), xsdFile);
                    } else {
                        break;
                    }
                }
            }
            xsdFile = null;
            LOG.info("Program Ended");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            queueReceiver.close();
            queueSession.close();
            queueConnection.close();
        }
    }
}

4. Utility Java DotPath Class which performs validation of given XML. This java class is common utility class which performs validation for placing the messages in the queue and also peforming XML validation after consuming messsages from the queue.

public class DotPathUtil implements DotPath {

    private static Logger LOG = LoggerFactory.getLogger(DotPathUtil.class);
    private static String root = System.getProperty("user.dir") + "\\";
    private static final DotPathUtil dotPathUtil = new DotPathUtil();
    private static StringBuilder sb = new StringBuilder();
    private static ArrayList<StringBuilder> al = new ArrayList<StringBuilder>();

    private DotPathUtil() {

    }

    public static void createFile(String str, File xsdFile) throws FileNotFoundException, XPathExpressionException, SAXException, IOException, ParserConfigurationException, JMSException {

        boolean createdDir = false;
        FileOutputStream fop = null;

        try {
            getPaths(str);
            LOG.info("reading the xml: " + str);
            Format timestamp = new SimpleDateFormat("yyyy-MM-dd-HH.mm.ss.Z");
            for (int i = 0; i < al.size(); i++) {
                File directory = new File(al.get(i).toString());
                if (!directory.exists()) {
                    createdDir = directory.mkdirs();
                    if (createdDir)
                        LOG.info("Directory created " + directory.getAbsolutePath());
                    LOG.info("Creating File in newly created directory");
                    // fw = new FileWriter(directory + "\\"+
                    // timestamp.format(new Date()) + ".txt");
                    fop = new FileOutputStream(directory + "\\" + timestamp.format(new Date()) + ".txt");

                    byte[] stringContentInBytes = str.getBytes();
                    /*
                     * br = new BufferedReader(new FileReader(str)); String inp;
                     * while ((inp = br.readLine()) != null)
                     */
                    // fw.write(stringContentInBytes + "\n");
                    fop.write(stringContentInBytes);
                    fop.flush();
                    fop.close();
                    LOG.info("File Written successfully at: " + directory.getAbsolutePath());
                } else {
                    LOG.info("Creating File in already existing directory");
                    fop = new FileOutputStream(directory + "\\" + timestamp.format(new Date()) + ".txt");
                    byte[] stringContentInBytes = str.getBytes();
                    fop.write(stringContentInBytes);
                    fop.flush();
                    fop.close();
                    LOG.info("File Written successfully at: " + directory.getAbsolutePath());
                }
            }
            LOG.info("Files Written successfully");
            //sb=new StringBuilder();
        } catch (SAXException | ParserConfigurationException | IOException | JMSException e) {
            e.printStackTrace();
        } finally {
            if(fop!=null)
              fop.close();
        }
    }

    public static InputStream getInputStream(String str) throws JMSException {

        InputStream inputStream = new ByteArrayInputStream(str.getBytes());
        return inputStream;
    }

    public static void getPaths(String str) throws FileNotFoundException, SAXException, IOException, ParserConfigurationException, JMSException {
        // StringBuilder sb = new StringBuilder();
        Document document = DocumentBuilderFactory.newInstance().newDocumentBuilder().parse(getInputStream(str));
        // LOG.info(document.getDocumentElement());
        if(sb!=null)
            sb=new StringBuilder();
        getChildPath(document.getChildNodes());
        

        // return sb.toString();
    }

    public static void getChildPath(NodeList nodeList){
        
        //for(int j=0;j<nodeList.getLength();j++)
        for (int i = 0; i < nodeList.getLength(); i++) {
            Node currentNode = nodeList.item(i);
            System.out.println("CurrentNode name: "+ currentNode);
            if (currentNode.getNodeType() == Node.ELEMENT_NODE) {
                if (currentNode.getNodeName().trim().equalsIgnoreCase("root"))
                { 
                    System.out.println(sb);
                    sb.append(root);
                }
                else{ 
                    if(!currentNode.hasChildNodes()){
                        StringBuilder sb1= new StringBuilder();                
                        al.add(sb1.append(sb+ currentNode.getNodeName() +"\\") );
                        System.out.println("In Leaf Node " +sb1);
                    }
                    else{
                        
                        sb.append(currentNode.getNodeName() + "\\");
                        System.out.println(sb);
                    } 
                }    
                    
                getChildPath(currentNode.getChildNodes());
            }
        }
    }

    public static String getPath(String str) throws FileNotFoundException, SAXException, IOException, ParserConfigurationException, XPathExpressionException, JMSException {
        StringBuilder sb = new StringBuilder();
        try {
            LOG.info("in getPath >" + str + "<");
            Document document = DocumentBuilderFactory.newInstance().newDocumentBuilder().parse(getInputStream(str));
            NodeList nl = document.getChildNodes();
            LOG.info("Childs :" + nl.getLength());
            for (int i = 0; i < nl.getLength(); i++) {
                Node n = nl.item(i);
                if (n.getNodeType() == Node.ELEMENT_NODE) {
                    if (n.getNodeName().trim().equalsIgnoreCase("root"))
                        sb.append(root + "\\");
                    else
                        sb.append(n.getNodeName() + "\\");
                    nl = n.getChildNodes();
                    i = -1;
                }
            }

            // return sb.toString();
        } catch (SAXException | ParserConfigurationException | IOException | JMSException e) {
            e.printStackTrace();
        }
        return sb.toString();
    }

    @Override
    public boolean validate(String str, File xsdFile) {

        boolean isValid = false;

        try {
            // Create source objects for XML and XSD files
            Source xmlSource = new StreamSource(DotPathUtil.getInputStream(str));
            Source xsdSource = new StreamSource(xsdFile);

            // Create SchemaFactory using XSD file
            SchemaFactory factory = SchemaFactory.newInstance(XMLConstants.W3C_XML_SCHEMA_NS_URI);
            Schema schema = factory.newSchema(xsdSource);

            // Validator will use xsd file for validating the XML file
            Validator validator = schema.newValidator();

            // Validate the DOM tree.
            validator.validate(xmlSource);

            isValid = true;

        } catch (Exception e) {
            e.printStackTrace();
        }

        if (isValid == true)
            LOG.info("Document is valid!");
        else
            LOG.info("Document is NOT valid!");

        return isValid;
    }

    @Override
    public boolean validate(File xmlFile, File xsdFile) {
        // TODO Auto-generated method stub
        boolean isValid = false;

        try {
            // Create source objects for XML and XSD files
            Source xmlSource = new StreamSource(xmlFile);
            Source xsdSource = new StreamSource(xsdFile);
            // Create SchemaFactory using XSD file
            SchemaFactory factory = SchemaFactory.newInstance(XMLConstants.W3C_XML_SCHEMA_NS_URI);
            Schema schema = factory.newSchema(xsdSource);

            // Validator will use xsd file for validating the XML file
            Validator validator = schema.newValidator();

            // Validate the DOM tree.
            validator.validate(xmlSource);
            isValid = true;

        } catch (Exception e) {
            e.printStackTrace();
        }

        if (isValid == true)
            LOG.info("Document is valid!");
        else
            LOG.info("Document is NOT valid!");

        return isValid;
    }

    public static String convertXMLFileToString(File xmlFile) throws SAXException, ParserConfigurationException, TransformerFactoryConfigurationError, TransformerException, IOException {
        try {
            Document document = DocumentBuilderFactory.newInstance().newDocumentBuilder().parse(new FileInputStream(xmlFile));
            StringWriter stw = new StringWriter();
            Transformer serializer = TransformerFactory.newInstance().newTransformer();
            serializer.transform(new DOMSource(document), new StreamResult(stw));
            return stw.toString();
        } catch (SAXException | ParserConfigurationException | TransformerFactoryConfigurationError | IOException e) {
            e.printStackTrace();
        }
        return null;
    }

    public static DotPathUtil getNewInstance() {
        return dotPathUtil;
    }

}



No comments:

Post a Comment