Calling Send and Receive Pipelines from the orchestration expression shapes
August 10, 2008
The composed message processor integration pattern splits a composed message into its individual records, routes and processes each record, and then aggregates the responses into a single message again.
In our example we process orders from a company: we apply a simple transformation that counts the number of items in each order and then combine the results into a single message again. We will use flat files as the input and output messages. Moreover, we will call the receive and send pipelines at runtime from the orchestration and process the individual records of the composed message.
First we create flat file schemas for the request and response messages with the Flat File Schema Wizard. We then create a receive pipeline that uses the Flat file disassembler component and a send pipeline that uses the Flat file assembler component. Our input flat file has a header, so we also create a header schema and use it in the receive pipeline.
Configure the header schema and document schema properties of the disassembler component in the receive pipeline. Also configure the document schema property of the assembler component in the send pipeline. Keep the schemas and pipelines project separate from the orchestrations project.
Build the schemas and pipelines project and then create another project containing the orchestration. In the first stage of the orchestration, call the receive pipeline to disassemble the flat file message. Because the input and output interchanges are flat files rather than XML, declare the orchestration's input and output messages as type System.Xml.XmlDocument.
To execute the receive pipeline, place the Expression shape inside an atomic scope, because a receive pipeline can only be executed from within an atomic scope. Pipeline execution can also fail, so we add an exception handling block, which in our example dumps the message into an error folder.
The orchestration has three steps: execute the receive pipeline, loop through each individual message, and build an aggregate response message by executing the send pipeline, which assembles the individual messages.
To execute the pipelines, add references to the Microsoft.XLANGs.Pipeline.dll and Microsoft.BizTalk.Pipeline.dll assemblies. The receive pipeline is executed by the following static method call:
ReceivePipelineOutput = Microsoft.XLANGs.Pipeline.XLANGPipelineManager.ExecuteReceivePipeline(typeof(ShemasAndPipelines.ReceiveCustOrdres), OrdersInputInterchange);
The method takes the type of the pipeline and the input message as parameters and returns an enumerator of type Microsoft.XLANGs.Pipeline.ReceivePipelineOutputMessages. A variable of this type (ReceivePipelineOutput) is declared among the variables of the atomic scope. Once the pipeline has executed, we can iterate through all the disassembled messages with this enumerator. The call shown above is the code written in the execute pipeline Expression shape.
Now, in the second step, iterate through all the messages and assign each one to the input message (OrdersInputMsg), whose type is the flat file schema you created with the Flat File Schema Wizard.
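The iteration step can be sketched roughly as follows, reusing the ReceivePipelineOutput and OrdersInputMsg names from this post. This is only a sketch: the placement in a Loop shape and a Message Assignment shape is an assumption about how the orchestration is laid out, and the null assignment is the commonly shown way to satisfy the compiler's message construction check rather than something confirmed for this project.

```csharp
// Loop shape condition (assumed placement):
// evaluates to true while the disassembler still has a message to hand out.
ReceivePipelineOutput.MoveNext()

// Message Assignment shape inside a Construct Message shape for OrdersInputMsg
// (assumed placement). GetCurrent fills OrdersInputMsg with the current record.
OrdersInputMsg = null;
ReceivePipelineOutput.GetCurrent(OrdersInputMsg);
```

Both fragments live inside the same atomic scope as the ExecuteReceivePipeline call, since they use the ReceivePipelineOutput variable declared there.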
You can perform any processing you need inside the loop. For example, you can set up a correlation to send the order for approval, route the message, or save it to a database. In my scenario I created a map that counts the items and maps the result to the destination message (OrderSummary). The output message is added to the send pipeline's input through the variable SendPipelineInput, of type Microsoft.XLANGs.Pipeline.SendPipelineInputMessages. Call its Add method, which takes a Microsoft.XLANGs.BaseTypes.XLANGMessage, the base class for any message.
The OrderSummary message is constructed in the Transform shape, and each message is then added to the send pipeline's message collection.
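As a rough illustration, the step that collects each summary could look like the fragment below. It reuses the SendPipelineInput and OrderSummary names from this post; putting it in an Expression shape directly after the Transform shape is an assumption about the orchestration layout.

```csharp
// Expression shape placed after the Transform shape, still inside the loop
// (assumed placement). Queues the freshly constructed OrderSummary message
// so the send pipeline can assemble it later.
SendPipelineInput.Add(OrderSummary);
```

Because the collection is filled once per loop iteration, it holds one OrderSummary message for every record the receive pipeline produced by the time the loop ends.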
In the last step we execute the send pipeline to assemble our collection of messages into a single composite message.
The send pipeline is executed by the code below, which calls the ExecuteSendPipeline method. The method takes the type of the pipeline, the SendPipelineInput variable that holds the collection of output messages, and the output message of type System.Xml.XmlDocument.
Microsoft.XLANGs.Pipeline.XLANGPipelineManager.ExecuteSendPipeline(typeof(ShemasAndPipelines.SendCustOrdersPipeline), SendPipelineInput, OrdersOutputInterchange);
Build and deploy the project. Note that both the receive location and the send port should use the pass-through pipelines, because the flat file disassembling and assembling happen inside the orchestration.