Wednesday, October 14, 2015

BPEL correlation and Message Aggregation


How does BPEL identify which response is for which request?
There are two major types of processing: synchronous and asynchronous. In synchronous processing the response comes back on the same TCP connection that carried the request, so no correlation is needed.
For asynchronous processing, a BPEL process usually correlates messages using WS-Addressing. It works a bit like a web session cookie: the correlation happens internally, and we do not need to worry about matching the response messages ourselves.


WS-Addressing
The basic question every beginner has is how an asynchronous response gets mapped to the correct instance out of many waiting instances. The answer is that whenever an asynchronous service is invoked, Oracle BPEL Process Manager adds a unique message ID to the WS-Addressing headers that travel with the SOAP message, and the BPEL server uses these headers internally to correlate the request and the response.
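To make the header-based matching concrete, here is a rough sketch of what an asynchronous request might look like on the wire. It assumes the WS-Addressing 1.0 namespace; the message ID value and the callback address are made up for illustration, and the headers your server actually emits may differ.

```xml
<soap:Envelope xmlns:soap="http://schemas.xmlsoap.org/soap/envelope/"
               xmlns:wsa="http://www.w3.org/2005/08/addressing">
  <soap:Header>
    <!-- Hypothetical unique ID generated by the calling engine -->
    <wsa:MessageID>urn:uuid:0f1e2d3c-0000-0000-0000-000000000001</wsa:MessageID>
    <!-- Hypothetical callback endpoint where the response should be sent -->
    <wsa:ReplyTo>
      <wsa:Address>http://caller.example.org/callback</wsa:Address>
    </wsa:ReplyTo>
  </soap:Header>
  <soap:Body>
    <!-- Request payload goes here. The callback message would then carry a
         wsa:RelatesTo header holding the same ID, which is what lets the
         engine find the waiting instance. -->
  </soap:Body>
</soap:Envelope>
```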

So when do we need manual correlation?
Below are the scenarios in which we will require correlation:
  • When the external web service doesn't support WS-Addressing.
  • When the message travels through several services and the response comes back to the initial service directly from the last one. For example, the request path is A > B > C and the response comes to A directly from C, i.e. C > A.
  • When receiving unsolicited messages from another system.
  • When communicating via files, i.e. when a BPEL instance is expecting messages from a third party.

What is correlation?
Correlation is a BPEL technique that matches incoming messages to the right process instance based on the content of the message.
A correlation set is a collection of properties used by the BPEL process service engine to allow this custom correlation.


Before starting with our example, we need to understand which activities need correlation. The answer is fairly self-explanatory: we need correlation wherever data goes out of or comes into the BPEL process, i.e.
  • Receive activity
  • Reply activity
  • Invoke activity
  • onMessage branch
  • onEvent branch

First, here is the scenario for our example:

You can download the sample from here.
We will have two receive activities in our BPEL process.
One receive will take Country Name and Language as input.
The other receive will take Country Name and Capital as input.
The response will contain Country Name, Language and Capital.


We will use Country Name to correlate the two incoming messages and build the output from both inputs.
We will first invoke the exposed web service with Country Name and Language as input.
The instance will then wait at the second receive activity. Once the other exposed service receives a message with the same country name, the waiting instance will complete processing.

We can see that our deployed process exposes two services. We will first invoke CountryInput.


Here we have given Austtralia and English as input (please ignore the typo).
We can see that the instance is waiting at the second receive activity.
Let us send another request to the same CountryInput service to make the behavior clearer.
Here I have given Finland and Finnish as input.

Again, the instance is waiting at the second receive activity.
Now let us invoke our other service, CapitalInput.
Let's first try a country other than the ones the waiting instances were started with. Here I have given India as the country name. No instance is waiting to receive India, so the previous instances should continue waiting.
As we can see, nothing happened, and the previous instances were not unpaused.
Now let's give the country name as Austtralia and the capital as canbarra. Remember that one instance is waiting for Austtralia, so this request should unpause that instance.
We can see that this has unpaused the earlier flow. Let's check the response to see whether it has correctly mapped the country, capital and language.
We can see from the response that the details are correctly mapped. Only the instance with the matching country name was unpaused; the other instance, with country name Finland, is still waiting.
Let's unpause the flow for Finland as well. Here I have given Finland and Helsinki as input.
We can see that it has unpaused the correct instance and formed the correct response.





Now let's start with the development of our example:

We will create a BPEL process with two receive activities, one taking Country Name and Language as input and the other taking Country Name and Capital.

Create a new application and a new project, and give them proper names.
Let us first create a schema for our inputs and output: Country.xsd
I have created three elements, one for each of the two receives and one for the response.
<?xml version="1.0" encoding="windows-1252" ?>
<xsd:schema xmlns:xsd="http://www.w3.org/2001/XMLSchema"
            xmlns="http://www.example.org"
            targetNamespace="http://www.example.org"
            elementFormDefault="qualified">
  <xsd:element name="CountryInput">
    <xsd:annotation>
      <xsd:documentation>
        A sample element
      </xsd:documentation>
    </xsd:annotation>
    <xsd:complexType>
      <xsd:sequence>
        <xsd:element name="CountryName" type="xsd:string"/>
        <xsd:element name="Language" type="xsd:string"/>
      </xsd:sequence>
    </xsd:complexType>
  </xsd:element>
  <xsd:element name="CapitalInput">
    <xsd:complexType>
      <xsd:sequence>
        <xsd:element name="CountryName" type="xsd:string"/>
        <xsd:element name="Capital" type="xsd:string"/>
      </xsd:sequence>
    </xsd:complexType>
  </xsd:element>
  <xsd:element name="Output">
    <xsd:complexType>
      <xsd:sequence>
        <xsd:element name="CountryName" type="xsd:string"/>
        <xsd:element name="Capital" type="xsd:string"/>
        <xsd:element name="Language" type="xsd:string"/>
      </xsd:sequence>
    </xsd:complexType>
  </xsd:element>


</xsd:schema>
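As a quick illustration of the schema, the three messages for the Finland run described earlier would look roughly like the following. These are three separate documents shown together for comparison, not one file; the values are the ones used in the test run above.

```xml
<!-- Message 1: sent to the first receive (CountryInput) -->
<CountryInput xmlns="http://www.example.org">
  <CountryName>Finland</CountryName>
  <Language>Finnish</Language>
</CountryInput>

<!-- Message 2: sent to the second receive (CapitalInput) -->
<CapitalInput xmlns="http://www.example.org">
  <CountryName>Finland</CountryName>
  <Capital>Helsinki</Capital>
</CapitalInput>

<!-- Message 3: the combined response built from both inputs -->
<Output xmlns="http://www.example.org">
  <CountryName>Finland</CountryName>
  <Capital>Helsinki</Capital>
  <Language>Finnish</Language>
</Output>
```

The CountryName element is the only field the two inputs share, which is why it is the natural choice for the correlation property.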



Now let's start with our BPEL process. Drag and drop, or add, a new BPEL process.
For the default receive we want Country Name and Language as input, so I have selected the CountryInput and Output elements accordingly and made the process asynchronous.





Let us add a web service for the second receive.

We will generate the WSDL for this web service from the schema: click on the gear button and browse to the CapitalInput element. The input for this web service will therefore be Country Name and Capital. Make it a one-way process, as we only want to provide input here.


Add a receive activity for the second web service.


Add an assign activity.
Map the fields from both inputs to the output so that it contains all the data.
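In source view, the mapping could look something like the sketch below. It assumes a BPEL 2.0 process, a message part named payload, and the variable names inputVariable, capitalInputVariable and outputVariable; all of these are illustrative, so use whatever names your project actually has.

```xml
<assign name="AssignOutput"
        xmlns="http://docs.oasis-open.org/wsbpel/2.0/process/executable"
        xmlns:ns1="http://www.example.org">
  <!-- Country name and language come from the first receive (assumed variable name) -->
  <copy>
    <from>$inputVariable.payload/ns1:CountryName</from>
    <to>$outputVariable.payload/ns1:CountryName</to>
  </copy>
  <copy>
    <from>$inputVariable.payload/ns1:Language</from>
    <to>$outputVariable.payload/ns1:Language</to>
  </copy>
  <!-- Capital comes from the second receive (assumed variable name) -->
  <copy>
    <from>$capitalInputVariable.payload/ns1:Capital</from>
    <to>$outputVariable.payload/ns1:Capital</to>
  </copy>
</assign>
```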

Now the basic structure is ready, so let's start correlating the requests. As I said before, we will use CountryName to correlate them.
Click on Correlation Sets and add a new correlation set.


Add a new property and name it accordingly. Give its type as string, since Country Name is a string.
Now we will add property aliases, i.e. we will point the property at the Country Name coming from each of the two inputs.
Select the element and give the query that reaches the CountryName field.
Alternatively, you can do this from the Structure window of JDeveloper: right-click on 'Property Aliases' and select 'Create Property Alias'.
Select the message type that you want to map to the correlation property (already created).
Similarly, select the other input and give the query for the other CountryName.
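For reference, the property and its two aliases end up as WSDL extension elements along these lines. This is only a sketch in WS-BPEL 2.0 syntax: the file name, target namespaces, message type names and part name are assumptions, and the query text JDeveloper generates for you may be written differently (for example as an absolute path, or as a query attribute in a BPEL 1.1 project).

```xml
<definitions name="CountryCorrelation_properties"
             targetNamespace="http://www.example.org/correlation"
             xmlns="http://schemas.xmlsoap.org/wsdl/"
             xmlns:tns="http://www.example.org/correlation"
             xmlns:msg="http://www.example.org/messages"
             xmlns:ns1="http://www.example.org"
             xmlns:xsd="http://www.w3.org/2001/XMLSchema"
             xmlns:vprop="http://docs.oasis-open.org/wsbpel/2.0/varprop">

  <!-- The correlation property: a string, because CountryName is a string -->
  <vprop:property name="CountryName" type="xsd:string"/>

  <!-- Alias 1: where to find the property in the first input (hypothetical message type) -->
  <vprop:propertyAlias propertyName="tns:CountryName"
                       messageType="msg:CountryInputMessage" part="payload">
    <vprop:query>ns1:CountryName</vprop:query>
  </vprop:propertyAlias>

  <!-- Alias 2: where to find the property in the second input (hypothetical message type) -->
  <vprop:propertyAlias propertyName="tns:CountryName"
                       messageType="msg:CapitalInputMessage" part="payload">
    <vprop:query>ns1:CountryName</vprop:query>
  </vprop:propertyAlias>
</definitions>
```

Note that the prefix used inside each query (ns1 here) has to be declared and bound to the schema's namespace, http://www.example.org, or the query cannot be resolved.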

Our correlation set, property and property aliases are done. We can now add them to our receive activities.
Double-click on the first receive activity, go to Correlations and add the correlation set we just created.
Set Initiate to Yes, because this receive sees the country name first and should initialize the correlation set.
When set to yes, the correlation set is initiated with the values of the properties available in the message being transferred.
When set to no, the correlation set validates the value of the property available in the message against the values it already holds.

Similarly, double-click on the second receive activity, go to Correlations and add the correlation set we just created.
Set Initiate to No, because the correlation set is already initialized by then and this receive only needs to match against it.
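Put together, the relevant part of the process source might look roughly like this. It is a trimmed sketch in BPEL 2.0 syntax: the process, partner link, port type, operation and variable names are placeholders, and the partner link and variable declarations are left out.

```xml
<process name="CountryProcess"
         targetNamespace="http://www.example.org/bpel"
         xmlns="http://docs.oasis-open.org/wsbpel/2.0/process/executable"
         xmlns:tns="http://www.example.org/correlation"
         xmlns:msg="http://www.example.org/messages">

  <!-- partnerLinks and variables omitted for brevity -->

  <correlationSets>
    <correlationSet name="CountryCorrelationSet" properties="tns:CountryName"/>
  </correlationSets>

  <sequence>
    <!-- First receive: creates the instance and initializes the correlation set -->
    <receive name="receiveInput" partnerLink="countryinput_client"
             portType="msg:CountryInputPort" operation="process"
             variable="inputVariable" createInstance="yes">
      <correlations>
        <correlation set="CountryCorrelationSet" initiate="yes"/>
      </correlations>
    </receive>

    <!-- Second receive: waits for a message whose CountryName matches the set -->
    <receive name="receiveCapital" partnerLink="CapitalInputService"
             portType="msg:CapitalInputPort" operation="execute"
             variable="capitalInputVariable" createInstance="no">
      <correlations>
        <correlation set="CountryCorrelationSet" initiate="no"/>
      </correlations>
    </receive>

    <!-- assign and callback invoke omitted -->
  </sequence>
</process>
```

The createInstance attribute and the initiate attribute are separate settings: the first decides whether a receive starts a new process instance, the second whether it fills the correlation set or only checks against it.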

Our development is done: we have correlated both receive activities on the Country Name property. Let's deploy.

I got an error while compiling because of the namespace prefix used in the query field.
When we create a correlation set, all the details go into the .wsdl file (if the project has no .wsdl file, for example when you are using messaging adapters, they go into the .jca file instead).
We can see that my query uses the namespace prefix ns2, which is not valid.
For our variable the valid prefix is ns1, so I just changed the prefix to ns1 and deployed our composite.


Apparently Oracle has fixed this in 12c by providing an option to browse to the variable and generate the query, so in 12c we won't face this issue. For now, in 11g, you will just have to be clever, I guess...
