Calling Send and Receive Pipelines from the orchestration expression shapes


The composed message processing integration pattern implies that a composed message individual records are split up and then routed and processed then aggregate the response into a single message again. 

In our example we process Orders from a company just apply a simple transformation that just counts the number of items and then summarize them into a single message again. We will use Flat files as input and output messages. Moreover we will call the receive and send pipelines at runtime in the orchestration and process individual records from the composed message.

First we will create flat file schemas for request and response messages from the flat file schema wizard. We have to create send and receive pipelines in which we will use Flat file assemble and Flat file disassembler components respectively. We have a header in our input flat file so we will create header schema and use it in the receive Pipeline.

Configure the header schema and document schema property of the disassembler component in the receive pipeline. Also configure the document schema property of the of the assembler component in the send pipeline. Keep the schemas and the pipelines project separate from the orchestrations project.

Build the Schemas and Pipelines project and then create another project containing the orchestration. In the first stage of the orchestration call the receive pipeline to disassemble the flat file message. The input and output messages will be flat files so we have to keep the output and input messages of type System.XML.XMLDocument.

To execute the receive pipeline we have to keep the scope of the expression shape as atomic because the pipeline can be executed in the atomic shape. Also when executing the pipeline any failure can occur so we can keep an exception handling block which in our example dumps the message in an error folder.

 The Orchestration has three steps execute the receive pipeline, loop through each individual message and make an aggregate message response message by executing the send pipeline that assembles the individual messages.

 

First Step:

 Calling Receive Pipeline

 

To execute the pipeline add a reference to the Microsoft.XLANGs.Pipeline.dll and Microsoft.BizTalk.Pipeline.dll assemblies. The pipeline is executed by the following static method.

ReceivePipelineOutput = Microsoft.XLANGs.Pipeline.XLANGPipelineManager.ExecuteReceivePipeline(typeof(ShemasAndPipelines.ReceiveCustOrdres), OrdersInputInterchange);

The method takes the type of pipeline and the input message as input parameters and returns an enumerator of type Microsoft.XLANGs.Pipeline.ReceivePipelineOutputMessages. A variable (ReceivePipelineOutput) of this type is created in the atomic scope variables. When we execute the pipeline we can iterate through all the messages with this enumerator. Below is the code written in the execute pipeline expression shape.

Second Step:

Now in the second step iterate all the messages and save it in the input message of type flat file schema which you created from the flat file wizard (OrdersInputMsg).

 ReceivePipelineOutput.GetCurrent(OrdersInputMsg);

iteration

You can perform any processing during the loop in your orchestration. For e.g. you can have a correlation for sending the order for approval or route the message or save it into the database. In my scenario I had to create a map that counts the items and map it to the destination message (OrderSummary). The output message can be added to the send pipeline by the variable SendPipelineInput of type Microsoft.XLANGs.Pipeline.SendPipelineInputMessages. And then call its static method add which takes in Microsoft.XLANGs.BaseType.XLANGMessage which is the base class for any message.

SendPipelineInput.Add(OrderSummary);

 OrderSummary message is constructed in the transform shape and each message is then added to a message collection of send pipeline.

Step 3:

In the last step we execute the send pipeline to assemble our collection of messages into a single composite message.

 Calling Send Pipeline

 

The send pipeline is executed from the code given below in which ExecuteSendPipeline method is executed which takes the type of pipeline, SendPipelineInput variable which holds the collection of the output messages and the Output message of type System.XML.XMLDocument.

 Microsoft.XLANGs.Pipeline.XLANGPipelineManager.ExecuteSendPipeline(typeof(ShemasAndPipelines.SendCustOrdersPipeline), SendPipelineInput, OrdersOutputInterchange);

 
Build and deploy the project, also note that keep the pipelines as Passthrough in the receive location as well as the send port.


technorati tags :

20 Responses to Calling Send and Receive Pipelines from the orchestration expression shapes

  1. Walid says:

    Hi Abdul,

    First, thanks for sharing your Biztalk knowledge.

    Secondly, I am new to biztalk and went through your article ‘Calling Send and Receive Pipelines from the orchestration expression shapes’
    and unfortunately I did not managed to have it working.

    Will it be possible if you could send me a working copy of your code to see what I am doing wrong, if it’s not too much asked.?

    Cheers,

    Walid

  2. Abdul Rafay says:

    Hi Walid,

    Here is the sample you asked. Change the extension to .rar.
    https://abdulrafaysbiztalk.wordpress.com/files/2009/06/processingflatfiles.doc

    Thanks,
    Abdul

  3. vinh says:

    I downloaded you sample. When I build the solution befor deplying I got 2 errors:

    Error 2 use of unassigned local variable ‘ReceivePipelineOutput’ C:\Users\Administrator\Downloads\ComposedMessageProcessing\ProcessingFlatFiles\FFProcessor.odx 464 51
    Error 3 use of unassigned local variable ‘ReceivePipelineOutput’ C:\Users\Administrator\Downloads\ComposedMessageProcessing\ProcessingFlatFiles\FFProcessor.odx 471 52

    Actually the variable ‘ReceivePipelineOutput’ was defined in atomic scope. I don’t know why i got the error.

    Best Regards, Vinh

  4. vinh says:

    Hi Abdul,

    Thanks for your prompt reply.

    I Put the following in Message assignment under loop:

    ReceivePipelineOutput = null;
    OrderSchema = null;
    ReceivePipelineOutput.GetCurrent(OrderSchema);

    Then build the solution. One error is gone and stillI got one error left:

    Error 2 use of unassigned local variable ‘ReceivePipelineOutput’ C:\Users\Administrator\Downloads\ComposedMessageProcessing\ProcessingFlatFiles\FFProcessor.odx 464 51

    It’s in ‘ReceivePipelineOutput.MoveNext()’ under loopthroughbatch.

    By the way, I am using VS2008 under Windows 2008 with BizTalk 2009.

    Best Regards,
    Vinh

  5. Abdul Rafay says:

    Ok I have checked my codes and it gave me an error. Can you add a shape in your orchestration in the beginning and assign null to ReceivePipelineOutput variable. The project will then compile. Remove the null assignments in rest of the orchestration.

  6. vinh says:

    Hi Abdul

    I put an Expression Shape just in front of LoopThroughBatch and put ReceivePipelineOutput = null; as the expression.

    I can build the solution successfully, and will deploy/test the solution.

    Thank you so much for the article and your promt response.

    Best Regards,
    Vinh

  7. Yehia says:

    Hi Abdul,

    Thanks so much for this invaluable information, I am new to Biztalk and I have successfully built and deployed the solution but when I drop orders.txt into the receive folder, I don’t get any output or error and I got on the BT admin, this error in the Application event log “Uncaught exception (see the “inner exception below…) Shape name” LoopThroughBatch” exception thrown from sergment 2, progress 14.
    Inner Exception: Object reference not set to an instance of an object.”

    Any idea of what is missing here.

    Thanks in advance.

  8. Yehia says:

    Hi Abdul,

    I managed to resolve this issue and the solution works.

    Thanks.

    • Kobe Chang says:

      Hi Yehia,

      Could u share that how did u fix this error? “Object reference not set to an instance of an object”. Thank you very much.

  9. Ashley says:

    Hi Abdul,

    Great article – I wonder if you can help me though?

    I’m consuming a wcf service from an orchestration using a request-response port and I have set it up to debatch the Response Message but am having problems with the XML disassembler on the pipeline. The response looks like this:


    <GetShiftResponse xmlns="http://arbitraryNamespace/arbitraryApplication/sample/">
    - <GetShiftResult>
    - <Shift xmlns="http://arbitraryOrg.BizTalk.ArbitraryApplication.Shemas.Shift">
    <StartDateTime xmlns="">2010-12-20T08:00:00</StartDateTime>
    <Group xmlns="">Group 5</Group>
    - <Stations xmlns="">
    - <Appliances xmlns="">
    - Appliance xmlns="">

    I’m eventually trying to get at Appliances …

    Hope you can help!

    Ashley

  10. Abdul Rafay says:

    Hi Ashley,

    This technique is also very efficient when it comes to debatching. Any thought of using an xpath and simple debatching? You can find an article on the splitter/debatching pattern on my blog.

    In your case I am not sure if you will be able to do it as you are consuming a WCF service the artifacts from the WSDL will be different. In order to use this technique you should have a envelope & document schema deployed in the BizTalk console. The XMLDisassembler pipeline debatches the document schema records and extracts the envelope XML and promotes it to the message context. The header will be preserved in the context if you set the preserve header property to true.

  11. Pingback: Microsoft » Un traitement BizTalk invoqué en interactif et par lots

  12. Carlos says:

    Thank you for sharing your knwoledge!
    It is great for us learning-BizTalk-developers to have common problems solved and shown the how-tos.

  13. Abhishek says:

    Hi Abdul,
    Can you please explain me why you have created a message of xml.xmldocument type as every message in biztalk msg box has the same type by default .Your receive port or receive shape will not be tightly bounded to any specified shema and Orchestration will try to pick each message comming into biztalk.

    Let me know whether i m wrong..

    regards
    Abhishek

  14. Gaurav Pal says:

    Hi Abdul

    Thanks for the knowledge sharing . I am working on just aggregating the xml messages in the orchestration, I am trying to call the sendpipeline which will aggregate all the messages and will give you one ENVELOPE message. i have tried the last step of your article. But when I try to run the code I get the following error

    “xlang/s engine event log entry: Uncaught exception (see the ‘inner exception’ below) has suspended an instance of service ‘CallPipelineInOrch.Orchestrations.Aggregator(6d4d69a2-430a-b57b-73a8-9a4898b10f45)’.
    The service instance will remain suspended until administratively resumed or terminated.
    If resumed the instance will continue from its last persisted state and may re-throw the same unexpected exception.
    InstanceId: 4429c1c4-0fc8-4254-a6b5-4393d7a6220c
    Shape name: ConstructEnvelope
    ShapeId: d4f61726-2f59-4cd0-9073-e3715e0d1431
    Exception thrown from: segment 2, progress 2
    Inner exception: There was a failure executing pipeline “CallPipelineInOrch.Pipeline.AggregateSend”. Error details: “Token StartElement in state Epilog would result in an invalid XML document.”.

    Exception type: XLANGPipelineManagerException
    Source: Microsoft.XLANGs.Pipeline
    Target Site: Void ExecutePipeline(Microsoft.BizTalk.PipelineOM.SendPipeline, Microsoft.XLANGs.Pipeline.SendPipelineInputMessages, Microsoft.XLANGs.BaseTypes.XLANGMessage)
    The following is a stack trace that identifies the location where the exception occured

    at Microsoft.XLANGs.Pipeline.XLANGPipelineManager.ExecutePipeline(SendPipeline p, SendPipelineInputMessages inMessages, XLANGMessage outMsg)
    at Microsoft.XLANGs.Pipeline.XLANGPipelineManager.ExecuteSendPipeline(Type sendPipelineType, SendPipelineInputMessages inMessages, XLANGMessage outXLANGMsg)
    at CallPipelineInOrch.Orchestrations.Aggregator.segment2(StopConditions stopOn)
    at Microsoft.XLANGs.Core.SegmentScheduler.RunASegment(Segment s, StopConditions stopCond, Exception& exp)
    Additional error information:

    Token StartElement in state Epilog would result in an invalid XML document.

    Exception type: InvalidOperationException
    Source: XML Assembler
    Target Site: Int32 Read(Byte[], Int32, Int32)
    The following is a stack trace that identifies the location where the exception occured

    at Microsoft.BizTalk.Component.XmlAsmStreamWrapper.Read(Byte[] buffer, Int32 offset, Int32 count)
    at Microsoft.XLANGs.Pipeline.XLANGPipelineManager.ConvertStreamToVirtualStream(Stream inputStream)
    at Microsoft.XLANGs.Pipeline.XLANGPipelineManager.WrapMessageWithVirtualStream(IBaseMessage msg)
    at Microsoft.XLANGs.Pipeline.XLANGPipelineManager.ExecutePipeline(SendPipeline p, SendPipelineInputMessages inMessages, XLANGMessage outMsg)”

    Please Help

    Regards
    Gaurav

  15. Ravin says:

    Hi Abdul
    Great Article.
    I deployed the solution successfully.
    But I am getting this error:”ShemasAndPipelines.ReceiveCustOrdres”. Error details: “Unexpected data found while looking for:
    ‘\r\n’
    The current definition being parsed is Header. The stream offset where the error occured is 0. The line number where the error occured is 1. The column where the error occured is 0.”.

    ==========================================
    I have created on Receive Location with these configuration :
    type: FILE
    Pipeline: ReceiveCustOrders
    File Mask: *.txt

  16. Abdul Rahiman says:

    Hi Abdul,

    I followed your article and code as well But I am getting following error in loopthroughBatch . error X2109: use of unassigned local variable ‘ReceivePipelineOutput’ at ReceivePipelineOutput.MoveNext() and ReceivePipelineOutput.GetCurrent(OrderSchema). We have already initialized already in executePipline by ReceivePipelineOutput =
    Microsoft.XLANGs.Pipeline.XLANGPipelineManager.ExecuteReceivePipeline(typeof(SchemasAndPipelines.ReceiveCustOrdersPipeline), OrdersInputInterchange); any suggestion Pl.

Leave a comment