viernes, 16 de enero de 2015

Introducción al ESB de WSO2 a través de ejemplos prácticos. VI

Existe una empresa, a la cual llamaremos “Empresa A” que posee un sistema de órdenes de compra que genera diariamente un fichero con los datos de las órdenes a realizar.
Este fichero es procesado por el personal de la empresa el cual se encarga de notificar del recibo de la orden a los destinatarios de la misma y de insertar las órdenes en una base de datos.

La “Empresa A” ha decidido automatizar el proceso, como un primer paso en un desarrollo general de sus sistemas a manera de PoC, y está buscando una solución que le permita un desarrollo rápido y que pueda evolucionar en el tiempo para integrar esta funcionalidad de negocio con otros sistemas que se están diseñando. La idea es que si se logra un éxito en este primer paso se podrá continuar con los siguientes en poco tiempo.

Bajo este esquema, la “Empresa A” nos ha pedido realizar dicha prueba de concepto, casi el primer paso completo, y para ello nos han dado los siguientes requerimientos:

  1. El fichero generado por el sistema antiguo puede encontrarse en el filesystem del mismo servidor donde esté la solución o puede estar en un ftp, aun no se deciden. Así que la solución debe permitir cambiar rápidamente de ubicación sin mucha complicación y con 0 codificación, solo configuración.
  2. En caso de que el fichero se almacene en un ftp se debe proporcionar algún mecanismo de seguridad, mínimo usuario y contraseña. Aunque aun está por determinar si se usará sftp, ftps, o algún otro mecanismo.
  3. Cada orden será una línea en el fichero y contendrá el ID de la orden, el ID del cliente, la fecha de la orden y el monto de la misma. Esto para la PoC pues la orden contiene mucha más información.
  4. Cada orden debe ser guardada en BD para su uso por futuros sistemas, además de permitir llevar un registro de las órdenes emitidas.
  5. Los clientes deben ser notificados tan pronto el sistema inicia el procesamiento de la orden, incluyendo los datos de la misma.
  6. En BD se almacenan los datos de los clientes así que la solución debe ser capaz de consultarlos mientras se procesa la orden.
  7. La solución debe ser liviana, para que pueda correr en los servidores de la empresa, los cuales son de prestaciones bajas.
  8. La PoC debe desarrollarse en menos de una semana.

Aunque es un escenario ficticio para mostrar lo fácil que es implementar con WSO2, bien podría ser un requerimiento real.
Revisando estos requerimientos y teniendo en cuenta las características de la “Empresa A”  hemos escogido usar la suite de WSO2 para su desarrollo. Las causas son las siguientes:

  1. No existen servidores de muy buenas prestaciones en “Empresa A” eso hace que debamos buscar una tecnología que corra en 1GB de RAM sin muchos problemas y que no consuma demasiados recursos de CPU y espacio en disco.
  2. Se desea que la PoC sea desarrollada en menos de una semana. Así que eso significa que no se debe reinventar la rueda y que debemos evitar mucha codificación.
  3. WSO2 tiene una comunidad de desarrolladores en un aumento significativo en el área. Tiene documentación oficial y existen muchos blogs independientes con tutoriales y casos de éxito que avalan su factibilidad técnica. Esto siempre es importante tenerlo en cuenta a la hora de buscar soluciones ya implementadas o ayuda rápida ante un problema puntual.
  4. Los clientes desean que la solución sea escalable y adaptable a nuevos escenarios de integración con otros sistemas. Así que un enfoque de integración de sistemas tiene sentido, y el WSO2 ESB se presta bastante bien para este enfoque.
  5. Como la empresa es pequeña no puede disponer de un gran presupuesto para comprar técnología cara y como WSO2 es gratis y hace lo mismo o casi lo mismo que esas tecnologías caras pues tiene sentido probar. Además no son tontos, piden una PoC primero. Y si les funciona pues pueden contar con soporte oficial de WSO2 que tampoco es caro.

Así que manos a la obra y veamos que encontramos para solucionar el problema. San Google viene a nuestro rescate y ya tenemos en nuestras manos la PoC practicamente implementada. Es lo bueno de contar con una comunidad como la de WSO2.  Este enlace nos viene como anillo al dedo: http://wso2.org/library/articles/2012/01/integrating-different-systems-with-wso2-esb lo tiene todo o casi todo.

El escenario que implementan es el siguiente:
image

Analicemos los aspectos técnicos de la solución que propone WSO2 a nuestro problema:
  1. El ESB debe estar comprobando cada determinado periodo de tiempo si se pone un fichero con el listado de órdenes en el FTP o filesystem, según se configure. Además debe ser capaz de procesar el fichero y generar un XML entendible por el resto de la solución.
  2. Cada orden debe procesarse, así que se debe iterar por cada línea del fichero extrayendo la información y completándola con la información de los clientes, digamos que nos basta con tener su nombre y correo electrónico.
  3. Luego esta información debe ser enviada por correo al cliente y finalmente almacenada en una BD.

Para el punto 1 sabemos que el ESB de WSO2 cuenta con un transporte llamado VFS, o Virtual File System, que nos permite conectarnos a un filesystem local o remoto o a un ftp usando diferentes mecanismos.
Lamentablemente :-D el ESB no puede magicamente entender el fichero y llevarlo a un XML de utilidad, así que debemos implementar un componente que sea capaz de hacer esto, este componente se llama “CSVFile Builder” y debe implementar la interfaz org.apache.axis2.builder.Builder.

Luego vendrá un servicio proxy que recibirá el XML ya elaborado, iterará por cada orden y se encargará de obtener los datos de los usuarios, enviar el correo y finalmente guardar la información de la orden en una BD. Para el envío del correo necesitamos darle algún formato a los datos, así que implementaremos otro componente llamado “EmailMessage Formatter” y que implementa la interfaz org.apache.axis2.transport.MessageFormatter.
La información de las órdenes se guardará como se muestra en la siguiente línea:

001,001,2011-12-26T18:28:48.214+05:30,456.76

[codigo de la orden], [codigo id del usuario], [fecha de la orden], [monto de la orden]

La implementación del componente CSVFile Builder es la siguiente:

package sample.shop.builder;

import org.apache.axiom.om.OMAbstractFactory;
import org.apache.axiom.om.OMElement;
import org.apache.axiom.om.OMNamespace;
import org.apache.axiom.soap.SOAPEnvelope;
import org.apache.axiom.soap.SOAPFactory;
import org.apache.axis2.AxisFault;
import org.apache.axis2.builder.Builder;
import org.apache.axis2.context.MessageContext;

import java.io.*;

import sample.shop.formatter.EmailMessageFormatter;

import javax.xml.stream.XMLStreamException;

public class CSVFileBuilder implements Builder {

    public OMElement processDocument(InputStream inputStream,
                                     String s,
                                     MessageContext messageContext) throws AxisFault {

        SOAPFactory soapFactory = OMAbstractFactory.getSOAP11Factory();
        SOAPEnvelope soapEnvelope = soapFactory.getDefaultEnvelope();

        OMNamespace omNamespace = soapFactory.createOMNamespace("http://wso2.org/sample/shop/order", "ns1");

        OMElement batchRequestElement =
                soapFactory.createOMElement("AddOrder_batch_req", omNamespace);

        OMElement addOrderElement = null;

        OMElement orderIDElement = null;
        OMElement customerIDElement = null;
        OMElement dateElement = null;
        OMElement priceElement = null;

        BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream));
        try {
            String readLine = bufferedReader.readLine();
            String[] values = null;
            while (readLine != null) {
                addOrderElement = soapFactory.createOMElement("AddOrder", omNamespace);
                values = readLine.split(",");

                //adding child elements.
                orderIDElement = soapFactory.createOMElement("orderID", omNamespace);
                orderIDElement.setText(values[0]);
                addOrderElement.addChild(orderIDElement);

                customerIDElement = soapFactory.createOMElement("customerID", omNamespace);
                customerIDElement.setText(values[1]);
                addOrderElement.addChild(customerIDElement);

                dateElement = soapFactory.createOMElement("date", omNamespace);
                dateElement.setText(values[2]);
                addOrderElement.addChild(dateElement);

                priceElement = soapFactory.createOMElement("price", omNamespace);
                priceElement.setText(values[3]);
                addOrderElement.addChild(priceElement);

                batchRequestElement.addChild(addOrderElement);
                readLine = bufferedReader.readLine();
            }

            soapEnvelope.getBody().addChild(batchRequestElement);
            return soapEnvelope;

        } catch (IOException e) {
            throw new AxisFault("Can not read the input stream", e);
        } finally {
            try {
                bufferedReader.close();
            } catch (IOException e) {
                System.out.println("Error in closing the reader");
            }
        }

    }
}


Este componente recibe la información del fichero en un InputStream, crea la estructura del mensaje que será procesado en el ESB y luego itera por cada línea rellenando el mensaje con los datos de las órdenes.


El mensaje generado entra al siguiente proxy:

<?xml version="1.0" encoding="UTF-8"?>
<proxy xmlns="http://ws.apache.org/ns/synapse" name="OrderProcessor" transports="https http vfs" startOnLoad="true" trace="disable">
    <target>
        <inSequence>
            <log level="full"/>
            <property name="OUT_ONLY" value="true"/>
            <clone continueParent="true" sequential="true">
                <target>
                    <sequence>
                        <iterate xmlns:sn="http://wso2.org/sample/shop/order" id="orderIterator" expression="//sn:AddOrder_batch_req/sn:AddOrder" sequential="true">
                            <target>
                                <sequence>
                                    <log level="full"/>
                                    <dblookup>
                                        <connection>
                                            <pool>
                                                <password>your_password</password>
                                                <user>postgres</user>
                                                <url>jdbc:postgresql://localhost:5432/SHOP_DB</url>
                                                <driver>org.postgresql.Driver</driver>
                                            </pool>
                                        </connection>
                                        <statement>
                                            <sql>select "EMAIL_C","NAME_C" from "CUSTOMER_T" where "CUSTOMER_ID_C" = ?</sql>
                                            <parameter expression="//sn:AddOrder/sn:customerID" type="VARCHAR"/>
                                            <result name="email" column="EMAIL_C"/>
                                            <result name="name" column="NAME_C"/>
                                        </statement>
                                    </dblookup>
                                    <log level="custom">
                                        <property name="email" expression="get-property('email')"/>
                                        <property name="name" expression="get-property('name')"/>
                                    </log>
                                    <xslt key="orderTransformer">
                                        <property name="email" expression="get-property('email')"/>
                                        <property name="name" expression="get-property('name')"/>
                                    </xslt>
                                    <log level="full"/>
                                    <property name="messageType" value="text/csv" scope="axis2"/>

         <property name="Subject" expression="fn:concat('Customer Data: ', get-property('name'))" scope="transport"/>
                                    <header name="To" expression="fn:concat('mailto:', get-property('email'))"/>
                                    <send/>
                                </sequence>
                            </target>
                        </iterate>
                    </sequence>
                </target>
            </clone>
            <log level="full"/>
            <property name="messageType" value="application/soap+xml" scope="axis2"/>
            <send>
                <endpoint>
                    <address uri="http://localhost:9765/services/OrderService"/>
                </endpoint>
            </send>
            <property name="FORCE_SC_ACCEPTED" value="true" scope="axis2"/>
        </inSequence>
    </target>
    <parameter name="transport.vfs.ActionAfterProcess">MOVE</parameter>
    <parameter name="transport.PollInterval">5</parameter>
 <parameter name="transport.vfs.FileURI">vfs:ftp://user:tupass@tuserver/projects/WSO2/data</parameter>
    <parameter name="transport.vfs.MoveAfterProcess">vfs:ftp://user:tupass@tuserver/projects/WSO2/data/processed</parameter>
    <parameter name="transport.vfs.MoveAfterFailure">vfs:ftp://user:tupass@tuserver/projects/WSO2/data/failure</parameter>
 <parameter name="transport.vfs.Locking">disable</parameter>
 <!-- En caso de que deseen usar el filesystem en vez 
      de un ftp deben descomentariar las 3 líneas 
   siguientes y comentar las 3 de las mismas 
   propiedades que apuntan a un ftp-->
 <!--parameter name="transport.vfs.MoveAfterProcess">E:\Work\WSO2\sample_8\sample\data\processed</parameter-->
    <!--parameter name="transport.vfs.FileURI">E:\Work\WSO2\sample_8\sample\data\</parameter-->
    <!--parameter name="transport.vfs.MoveAfterFailure">E:\Work\WSO2\sample_8\sample\data\failure</parameter-->
    <parameter name="transport.vfs.FileNamePattern">.*.txt</parameter>
    <parameter name="transport.vfs.ContentType">text/csv</parameter>
    <parameter name="transport.vfs.ActionAfterFailure">MOVE</parameter>
</proxy>


Analicemos este proxy:


Por la secuencia de entrada recibe un mensaje y usando un mediador log, lo imprime en consola.

<?xml version="1.0" encoding="utf-8"?>
<soapenv:Envelope xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/">
 <soapenv:Body>
  <ns1:AddOrder_batch_req xmlns:ns1="http://wso2.org/sample/shop/order">
   <ns1:AddOrder>
    <ns1:orderID>001</ns1:orderID>
    <ns1:customerID>001</ns1:customerID>
    <ns1:date>2011-12-26T18:28:48.214+05:30</ns1:date>
    <ns1:price>456.76</ns1:price>
   </ns1:AddOrder>
   <ns1:AddOrder>
    <ns1:orderID>002</ns1:orderID>
    <ns1:customerID>003</ns1:customerID>
    <ns1:date>2011-12-27T18:28:48.214+05:30</ns1:date>
    <ns1:price>451.76</ns1:price>
   </ns1:AddOrder>
   <ns1:AddOrder>
    <ns1:orderID>003</ns1:orderID>
    <ns1:customerID>003</ns1:customerID>
    <ns1:date>2011-12-27T18:28:48.214+05:30</ns1:date>
    <ns1:price>451.76</ns1:price>
   </ns1:AddOrder>
  </ns1:AddOrder_batch_req>
 </soapenv:Body>
</soapenv:Envelope>


Luego clona el mensaje y lo manda a un mediador iterate, que se encarga usando xpath de obtener cada una de las órdenes contenidas en el mensaje.

Un mensaje con una orden tiene la siguiente estructura:

<?xml version="1.0" encoding="utf-8"?>
<soapenv:Envelope xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/">
 <soapenv:Body>
  <ns1:AddOrder xmlns:ns1="http://wso2.org/sample/shop/order">
   <ns1:orderID>001</ns1:orderID>
   <ns1:customerID>001</ns1:customerID>
   <ns1:date>2011-12-26T18:28:48.214+05:30</ns1:date>
   <ns1:price>456.76</ns1:price>
  </ns1:AddOrder>
 </soapenv:Body>
</soapenv:Envelope>


  Para cada orden realiza las siguientes acciones:

  • Usa el mediador dblookup para realizar una consulta a la BD y usando el valor del customerID obtener para ese cliente su nombre y correo electrónico.
  • Usa el mediador log para imprimer el nombre y el correo en la consola.
  • Usa el mediador xslt para transformar el mensaje pasándole como parámetros el nombre y el correo y agregandolos a la estructura del mensaje. Esto se hace en la transformación orderTransformer que pueden ver a continuación: 
<?xml version="1.0" encoding="UTF-8"?>
<localEntry xmlns="http://ws.apache.org/ns/synapse" key="orderTransformer">
    <xsl:stylesheet xmlns:xsl="http://www.w3.org/1999/XSL/Transform" xmlns:ns1="http://wso2.org/sample/shop/order" version="1.0">
        <xsl:output method="xml" omit-xml-declaration="yes"/>
        <xsl:param name="email"/>
        <xsl:param name="name"/>
        <xsl:template match="/">
            <xsl:apply-templates select="//ns1:AddOrder"/>
        </xsl:template>
        <xsl:template match="ns1:AddOrder">
            <ns1:AddOrder>
                <ns1:orderID>
                    <xsl:value-of select="//ns1:orderID/text()"/>
                </ns1:orderID>
                <ns1:customerID>
                    <xsl:value-of select="//ns1:customerID/text()"/>
                </ns1:customerID>
                <ns1:date>
                    <xsl:value-of select="//ns1:date/text()"/>
                </ns1:date>
                <ns1:price>
                    <xsl:value-of select="//ns1:price/text()"/>
                </ns1:price>
                <ns1:email>
                    <xsl:value-of select="$email"/>
                </ns1:email>
                <ns1:name>
                    <xsl:value-of select="$name"/>
                </ns1:name>
            </ns1:AddOrder>
        </xsl:template>
    </xsl:stylesheet>
</localEntry>

El resultado de esta transformación se ve en el siguiente mensaje:

<?xml version="1.0" encoding="utf-8"?>
<soapenv:Envelope xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/">
 <soapenv:Body>
  <ns1:AddOrder xmlns:ns1="http://wso2.org/sample/shop/order">
   <ns1:orderID>001</ns1:orderID>
   <ns1:customerID>001</ns1:customerID>
   <ns1:date>2011-12-26T18:28:48.214+05:30</ns1:date>
   <ns1:price>456.76</ns1:price>
   <ns1:email>isildurmac@gmail.com</ns1:email>
   <ns1:name>IsildurMac</ns1:name>
  </ns1:AddOrder>
 </soapenv:Body>
</soapenv:Envelope>

  • Imprime el nuevo mensaje en la consola.
  • A continuación se setea una property para definir el messageType que debe ser de ese tipo para que se pueda disparar el componente para el envío del correo.
  • Se define el header to, incluyendo la dirección de correo del destinatario.
  • Finalmente se llama al mediador send. Aquí gracias al property que define el messageType se llama a un componente encargado de crear el mensaje de correo que será enviado al cliente. Veremos su código ahora:
package sample.shop.formatter;

import org.apache.axiom.om.OMElement;
import org.apache.axiom.om.OMOutputFormat;
import org.apache.axiom.soap.SOAPEnvelope;
import org.apache.axis2.AxisFault;
import org.apache.axis2.context.MessageContext;
import org.apache.axis2.transport.MessageFormatter;

import javax.xml.namespace.QName;
import java.io.IOException;
import java.io.OutputStream;
import java.io.ByteArrayOutputStream;
import java.net.URL;

public class EmailMessageFormatter implements MessageFormatter {

    public byte[] getBytes(MessageContext messageContext,
                           OMOutputFormat omOutputFormat) throws AxisFault {
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        writeTo(messageContext, byteArrayOutputStream);
        return byteArrayOutputStream.toByteArray();
    }

    public void writeTo(MessageContext messageContext,
                        OMOutputFormat omOutputFormat,
                        OutputStream outputStream, boolean b) throws AxisFault {
        writeTo(messageContext, outputStream);

    }

    private void writeTo(MessageContext messageContext, OutputStream outputStream) throws AxisFault {
        SOAPEnvelope soapEnvelope = messageContext.getEnvelope();
        OMElement addOrderElement = soapEnvelope.getBody().getFirstElement();

        String orderID = addOrderElement.getFirstChildWithName(
                new QName("http://wso2.org/sample/shop/order", "orderID")).getText();
        String email = addOrderElement.getFirstChildWithName(
                new QName("http://wso2.org/sample/shop/order", "email")).getText();
        String name = addOrderElement.getFirstChildWithName(
                new QName("http://wso2.org/sample/shop/order", "name")).getText();
        String date = addOrderElement.getFirstChildWithName(
                new QName("http://wso2.org/sample/shop/order", "date")).getText();
        String price = addOrderElement.getFirstChildWithName(
                new QName("http://wso2.org/sample/shop/order", "price")).getText();

        try {
            outputStream.write("Order ID : ".getBytes());
            outputStream.write(orderID.getBytes());
            outputStream.write("\n".getBytes());

            outputStream.write("Email : ".getBytes());
            outputStream.write(email.getBytes());
            outputStream.write("\n".getBytes());

            outputStream.write("Name : ".getBytes());
            outputStream.write(name.getBytes());
            outputStream.write("\n".getBytes());

            outputStream.write("Date : ".getBytes());
            outputStream.write(date.getBytes());
            outputStream.write("\n".getBytes());

            outputStream.write("Price : ".getBytes());
            outputStream.write(price.getBytes());
            outputStream.write("\n".getBytes());


        } catch (IOException e) {
            throw new AxisFault("Can not write to the output stream");
        }

    }

    public String getContentType(MessageContext messageContext,
                                 OMOutputFormat omOutputFormat,
                                 String s) {
        return "text/csv";
    }

    public URL getTargetAddress(MessageContext messageContext,
                                OMOutputFormat omOutputFormat,
                                URL url) throws AxisFault {
        return url;
    }

    public String formatSOAPAction(MessageContext messageContext,
                                   OMOutputFormat omOutputFormat,
                                   String s) {
        return s;
    }
}

  • Aquí se obtiene del messageContext el mensaje SOAP y de sus nodos se extraen los valores necesarios para incluirlos en el correo a enviar. Luego se escriben fuera del código el contenido del OutputStream  se manda como un adjunto por correo. El envío por correo se logra mediante una configuración en el fichero axis2.xml del ESB.
<transportSender name="mailto" class="org.apache.axis2.transport.mail.MailTransportSender">
    <parameter name="mail.smtp.host">tuserverdecorreo</parameter>
    <parameter name="mail.smtp.port">puerto</parameter>
    <parameter name="mail.smtp.starttls.enable">false</parameter>
    <parameter name="mail.smtp.auth">false</parameter>
    <parameter name="mail.smtp.user">tuusuario</parameter>
    <parameter name="mail.smtp.password">*****</parameter>
    <parameter name="mail.smtp.from">tucorreo@gmail.com</parameter>
</transportSender> 
  • Ahora vendría para finalizar la implementación el guardar las órdenes en BD. Pero esto se hace fuera del mediador clone, el cual se usó para mantener el mensaje original y poder trabajar luego con este mensaje.
  • Una vez que salimos del mediador clone imprimimos nuevamente el mensaje.
  • Volvemos a cambiar el messageType al formato de mensaje SOAP.
  • Y enviamos este mensaje con todas las órdenes a un servicio de acceso a datos  que mostramos su configuración a continuación. Vena en particular la propiedad enableBatchRequests="true" porque esta es la que permite que podamos enviarle todas las órdenes y estas sean procesadas por el servicio.
<data name="OrderService" enableBatchRequests="true" serviceNamespace="http://wso2.org/sample/shop/order">
   <config id="shopdatasource">
      <property name="org.wso2.ws.dataservice.driver">org.postgresql.Driver</property>
      <property name="org.wso2.ws.dataservice.protocol">jdbc:postgresql://localhost:5432/SHOP_DB</property>
      <property name="username">postgres</property>
      <property name="password">tupass</property>
   </config>
   <query id="addOrderQuery" useConfig="shopdatasource">
      <sql>insert into "ORDER_T" values (:orderID, :customerID, :date,:price);</sql>
      <param name="orderID" sqlType="STRING" />
      <param name="customerID" sqlType="STRING" />
      <param name="date" sqlType="TIMESTAMP" />
      <param name="price" sqlType="DOUBLE" />
   </query>
   <operation name="AddOrder">
      <call-query href="addOrderQuery">
         <with-param name="orderID" query-param="orderID" />
         <with-param name="customerID" query-param="customerID" />
         <with-param name="date" query-param="date" />
         <with-param name="price" query-param="price" />
      </call-query>
   </operation>
</data>
  
  • Para aquellos que son detallistas deben haber notado que el mensaje creado a partir del fichero tenía como elemento raíz uno de nombre “AddOrder_batch_req” y esto no es casual, es para poder enlazar sin problema este mensaje con el servicio de acceso a datos invocando a la operación de procesamiento en batch.


Así termina el servicio proxy y los invito a probarlo, descargando los ficheros necesarios desde el enlace provisto en el blog original que puse al inicio o directamente desde aquí http://wso2.org/files/sample_8.zip.

Algunos cambios hechos a las configuraciones originales:
  1. Se usaba antes el filesystem para tener el fichero. En lo que muestro en esta entrada uso un ftp.
  2.  El SGBD era MySQL, acá usé PostgreSQL.

0 comentarios:

Publicar un comentario