MULE-5860

Inserting <collection-splitter /><collection-aggregator /> into a flow produces unexpected results

    Details

    • User impact:
      High
    • Configuration:

      <?xml version="1.0" encoding="UTF-8"?>
      <mule xmlns="http://www.mulesoft.org/schema/mule/core"
      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xmlns:spring="http://www.springframework.org/schema/beans"
      xmlns:stdio="http://www.mulesoft.org/schema/mule/stdio"
      xmlns:quartz="http://www.mulesoft.org/schema/mule/quartz"
      xmlns:scripting="http://www.mulesoft.org/schema/mule/scripting"
      xsi:schemaLocation="
      http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/3.2/mule.xsd
      http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
      http://www.mulesoft.org/schema/mule/stdio http://www.mulesoft.org/schema/mule/stdio/3.2/mule-stdio.xsd
      http://www.mulesoft.org/schema/mule/quartz http://www.mulesoft.org/schema/mule/quartz/3.2/mule-quartz.xsd
      http://www.mulesoft.org/schema/mule/scripting http://www.mulesoft.org/schema/mule/scripting/3.2/mule-scripting.xsd
      ">

      <quartz:connector name="quartzConnector">
      <quartz:factory-property key="org.quartz.scheduler.instanceName"
      value="quartzInstance" />
      <quartz:factory-property key="org.quartz.threadPool.class"
      value="org.quartz.simpl.SimpleThreadPool" />
      <quartz:factory-property key="org.quartz.threadPool.threadCount"
      value="3" />
      <quartz:factory-property key="org.quartz.scheduler.rmi.proxy"
      value="false" />
      <quartz:factory-property key="org.quartz.scheduler.rmi.export"
      value="false" />
      <quartz:factory-property key="org.quartz.jobStore.class"
      value="org.quartz.simpl.RAMJobStore" />
      </quartz:connector>

      <flow name="test">
      <quartz:inbound-endpoint name="timeTrigger"
      jobName="myJob" repeatInterval="6000" connector-ref="quartzConnector">
      <quartz:event-generator-job />
      </quartz:inbound-endpoint>

      <scripting:transformer>
      <scripting:script engine="groovy">
      <scripting:text>
      def foo = [1, 2, 3, 4, 5]
      message.setPayload(foo)
      return message
      </scripting:text>
      </scripting:script>
      </scripting:transformer>

      <collection-splitter />

      <collection-aggregator />

      <scripting:transformer>
      <scripting:script engine="groovy">
      <scripting:text>
      def foo = []
      for (def i = 0; i &lt; payload.size(); i++) {
        def element = payload.get(i)
        foo.add(element * element)
      }

      println "\ntransformed: " + foo
      return foo
      </scripting:text>
      </scripting:script>
      </scripting:transformer>

      <stdio:outbound-endpoint system="OUT" />
      </flow>
      </mule>

    • Log Output:
      INFO 2011-10-31 14:42:12,070 [main] org.mule.DefaultMuleContext:
      **********************************************************************
      * Mule ESB and Integration Platform *
      * Version: 3.2.0 Build: 22917 *
      * MuleSoft, Inc. *
      * For more information go to http://www.mulesoft.org *
      * *
      * Server started: 10/31/11 2:42 PM *
      * Server ID: 215609bd-03c6-11e1-b1a3-3d96c762086b *
      * JDK: 1.6.0_26 (mixed mode) *
      * OS encoding: UTF-8, Mule encoding: UTF-8 *
      * OS: Mac OS X (10.6.8, x86_64) *
      * Host: myhost (x.x.x.x) *
      * *
      * Agents Running: *
      * JMX Agent *
      **********************************************************************

      [9, 4, 1, 16, 25]
      INFO 2011-10-31 14:42:12,589 [connector.stdio.mule.default.dispatcher.01] org.mule.lifecycle.AbstractLifecycleManager: Initialising: 'connector.stdio.mule.default.dispatcher.1619081930'. Object is: StdioMessageDispatcher
      INFO 2011-10-31 14:42:12,589 [connector.stdio.mule.default.dispatcher.01] org.mule.lifecycle.AbstractLifecycleManager: Starting: 'connector.stdio.mule.default.dispatcher.1619081930'. Object is: StdioMessageDispatcher
      [3, 2, 1, 4, 5]
      [9, 25, 16, 1, 4]
      [3, 5, 4, 1, 2]
      [1, 4, 16, 9, 25]
      [1, 2, 4, 3, 5]
    • Similar Issues:
      MULE-6104 request-reply router, collection-splitter and collection-aggregator triplet bug
      MULE-4112 Collection aggregator router seems to require the inbound-endpoint to be synchronous
      MULE-8589 Collection Splitter should set group size when auto-paging is available
      MULE-5883 Session Properties being lost inside a collection splitter
      MULE-5844 Mule loosing session variables after a collection spllitter / aggregator
      MULE-6067 <collection-splitter> should handle arrays
      MULE-5534 CLONE - Message modifications are discarded when using Collection Aggregator
      MULE-6742 Inconsistent For Each scope implementation
      MULE-5818 Description of enableCorrelation attribute in Collection Splitter Doc page
      MULE-8590 Define behavior to aggregate collections splitted with group size set as unknown
    • Sprint:
      Sprint 4, Studio Sprint 1

      Description

      The default behaviour of collection-aggregator seems strange in Mule 3.2.0 and 3.1.2;
      intuitively, this is a bug (or class of bugs).
      If the following is correct or partially correct then there is probably room for improvement in the docs.

      Take the following structure:

      <flow ...
      ...
      <collection-splitter />
      ...
      <collection-aggregator />

      <transformer ...

      <outbound-endpoint ...
      </flow>

      • The above flow will afaics output what is emitted by the collection-aggregator, ignoring any later transformer before the outbound-endpoint.
      • If the collection-splitter emits sequence IDs for the correlation group (e.g. if it is fed a list), the collection-aggregator doesn't seem to make use of them by default. Of course, a collection splitter might also be fed payload types with no inherent ordering, such as sets, where ordered output should not be expected.
        There seem to have been related issues in the past as well, e.g. MULE-5844.

      In the attached config, the output of the outbound-endpoint and the return value of the last transformer are different. And, as per the second point above, the order of elements in the payload differs between runs even though the flow input is constant.
      If the splitter and aggregator are commented out, both show the same message payload, and the ordering is constant when the input is constant.
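
      The ordering point can be illustrated outside Mule. The following is only a minimal Java sketch, assuming each split part carries a 1-based sequence number; the `Part` class and the `restoreOrder` helper are hypothetical names for illustration and are not Mule API. It shows that sorting the aggregated parts by sequence number would restore the input order regardless of arrival order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Hypothetical model, not Mule API: a split part plus the sequence
// number a splitter could attach to it.
final class SequenceOrdering {

    static final class Part {
        final int sequence; // assumed 1-based position in the original list
        final int payload;

        Part(int sequence, int payload) {
            this.sequence = sequence;
            this.payload = payload;
        }
    }

    // Returns the payloads sorted by sequence number; the input list is not modified.
    static List<Integer> restoreOrder(List<Part> arrived) {
        List<Part> sorted = new ArrayList<>(arrived);
        sorted.sort(Comparator.comparingInt((Part p) -> p.sequence));
        List<Integer> payloads = new ArrayList<>();
        for (Part p : sorted) {
            payloads.add(p.payload);
        }
        return payloads;
    }

    public static void main(String[] args) {
        // Arrival order as in one unsquared line of the log output: [3, 2, 1, 4, 5]
        List<Part> arrived = Arrays.asList(
                new Part(3, 3), new Part(2, 2), new Part(1, 1),
                new Part(4, 4), new Part(5, 5));
        System.out.println(restoreOrder(arrived)); // prints [1, 2, 3, 4, 5]
    }
}
```

      Whether the real aggregator should sort this way by default is exactly the open question of this ticket; the sketch only shows that the sequence IDs carry enough information to do so.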

          Activity

          Mariano Capurro added a comment -

          Having similar issues with a different configuration. Configuring the transformer at outbound endpoint level with transformer-refs is not working either.
          Everything works if the outbound-endpoint is request-response.

          Tobias Stjernefeldt added a comment -

          I took another look at this and there seem to be multiple issues in both Mule 2.x (MULE-4213) and 3.x manifesting the same underlying problem.

          Afaics the implementation of DefaultMessageCollection seems to be confused about whether it is a message or a collection of messages - it contains both

          • private transient Object payload; inherited from DefaultMuleMessage, and
          • private List messageList = new CopyOnWriteArrayList(); the messages in the list contain their own payload objects

          Both inside and outside the class there is then code assuming one way or another, eg.
          the inherited getPayload() returns the internal payload object, whereas
          getPayload(Class) returns a list of payload values taken from the messageList object.

          So what seems to happen in the case of this ticket:

          • collection-aggregator produces a DefaultMessageCollection message
          • transformations: these normally modify the internal payload object
          • outbound-endpoint uses methods that get the payload out of the messageList objects

          I think there are two design issues here:

          • Should collection-aggregator emit a message with a collection payload, or a collection of messages? The latter seems intuitively like a tight coupling to the configuration of previous stages in the flow.
          • What is the payload of a DefaultMessageCollection?
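
          The divergence described in this comment can be reproduced with a small stand-in. This is a simplified Java model under stated assumptions, not the real org.mule.DefaultMessageCollection: the class and method names (`DualPayloadModel`, `MessageCollection`, `getPayloadsFromMessages`) are hypothetical. It only shows how a transformer that replaces the collection's own payload leaves the per-message payloads untouched.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Simplified stand-in for the behaviour described above. NOT the real
// org.mule.DefaultMessageCollection; all names here are hypothetical.
final class DualPayloadModel {

    static final class Message {
        Object payload;

        Message(Object payload) {
            this.payload = payload;
        }
    }

    static final class MessageCollection {
        private Object payload;                                      // state 1: the collection's own payload
        private final List<Message> messageList = new ArrayList<>(); // state 2: one payload per message

        MessageCollection(List<?> parts) {
            for (Object part : parts) {
                messageList.add(new Message(part));
            }
            this.payload = new ArrayList<Object>(parts);
        }

        // What a transformer would typically read and replace.
        Object getPayload() {
            return payload;
        }

        void setPayload(Object payload) {
            this.payload = payload;
        }

        // What a consumer reading from the message list would see.
        List<Object> getPayloadsFromMessages() {
            List<Object> result = new ArrayList<>();
            for (Message m : messageList) {
                result.add(m.payload);
            }
            return result;
        }
    }

    public static void main(String[] args) {
        MessageCollection collection = new MessageCollection(Arrays.asList(1, 2, 3));
        // A "transformer" replaces only the collection's own payload.
        collection.setPayload(Arrays.asList(1, 4, 9));
        System.out.println(collection.getPayload());              // prints [1, 4, 9]
        System.out.println(collection.getPayloadsFromMessages()); // prints [1, 2, 3]
    }
}
```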
          Tobias Stjernefeldt added a comment -

          This issue can be worked around by reconciling the internal "payload" states in DefaultMessageCollection.
          E.g. in the case of
          <collection-aggregator/>
          <transformations... />
          <outbound-endpoint/>

          and if the transformations affect the internal payload object while the outbound-endpoint reads the payloads from the internal messageList,
          then it's possible to insert a transformer before the endpoint that reconciles the two payload states, something along the lines of this:

          	<scripting:transformer name="messagePayloadToCollectionPayload">
          		<scripting:script engine="groovy">
          			<scripting:text>
          				if (message instanceof org.mule.DefaultMessageCollection) {
          					def messages = message.getMessagesAsArray()
          					if (payload instanceof List) {
          						for (i = 0; i &lt; payload.size(); i++) {
          							p = payload.get(i)
          							m = messages[i]
          							m.setPayload(p)
          						}
          					}
          				}
          				return payload // by convention
          			</scripting:text>
          		</scripting:script>
          	</scripting:transformer>
          

          If you are in control of what transformers do it's of course also possible to write transformers that directly apply changes (by using the right DefaultMessageCollection methods) to the payload state that the outbound-endpoint will read later.

          NB I haven't done any extensive testing on this so there could be side effects or corner cases I'm not aware of.

          cheers
          Tobias
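
          For readers who prefer to see the reconciliation idea outside a Groovy script, here is a rough Java sketch of the same loop. It is an illustration only: `PayloadReconciler`, `Message` and `reconcile` are hypothetical names rather than Mule classes, and the size guard is an addition that the Groovy script above does not have.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration of the workaround's loop; not Mule API.
final class PayloadReconciler {

    static final class Message {
        Object payload;

        Message(Object payload) {
            this.payload = payload;
        }
    }

    // Copies element i of the transformed payload into message i.
    // Assumption: extra elements on either side are ignored (guard added here).
    static void reconcile(List<?> transformedPayload, List<Message> messages) {
        int count = Math.min(transformedPayload.size(), messages.size());
        for (int i = 0; i < count; i++) {
            messages.get(i).payload = transformedPayload.get(i);
        }
    }

    public static void main(String[] args) {
        List<Message> messages = new ArrayList<>();
        for (int value : new int[] {1, 2, 3}) {
            messages.add(new Message(value));
        }
        reconcile(Arrays.asList(1, 4, 9), messages);
        for (Message m : messages) {
            System.out.println(m.payload); // prints 1, then 4, then 9
        }
    }
}
```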

          Daniel Feist added a comment -

          Hi Tobias,

          Can you give me an idea of what transformers you are trying to apply to the MuleMessageCollection?

          Daniel Feist added a comment -

          Also, if I were to make the change (and update usages) so that getPayload() is overwritten and behaves similarly to getPayload(Class), do you have a test case that I could run to ensure this fixes your issue?

          Henryk Paluch added a comment -

          Hello!
          Please note that you can now use CombineCollectionsTransformer instead of the Groovy script:

            
            ...
                  <custom-transformer class="org.mule.transformer.simple.CombineCollectionsTransformer" doc:name="Message[] to Message of []"/>
            ...
          
          Franco Gasperino added a comment -

          I have also encountered this:

          <script:transformer name="multiplier">
          <script:script engine="groovy">
          Object.sleep(1000);
          return payload * payload;
          </script:script>
          </script:transformer>

          <flow name="work1In">
          <description>
          <![CDATA[
          Work-1 Demo.

          This demonstrates a standard synchronous processing model of using a collection splitter. Each call to
          the flow work1Worker will be done serially.
          ]]>
          </description>

          <http:inbound-endpoint name="httpWork1In"
          host="127.0.0.1" port="8080"
          path="work-1" exchange-pattern="request-response">
          </http:inbound-endpoint>

          <expression-transformer evaluator="groovy" expression="1..10"/>

          <collection-splitter enableCorrelation="ALWAYS"/>
          <flow-ref name="work1Worker"/>
          <collection-aggregator/>
          <combine-collections-transformer/>

          <logger message="Aggregated payload: #[groovy:payload.size()] elements" level="ERROR"/>
          <logger message="Aggregated payload: #[payload]" level="ERROR"/>

          <json:object-to-json-transformer/>

          <logger message="" level="ERROR"/>
          <logger message="#[payload]" level="ERROR"/>

          <logger message="Done." level="ERROR"/>
          </flow>

          <flow name="work1Worker">
          <expression-transformer evaluator="message" expression="correlationSequence"/>
          <logger message="ID: #[payload]" level="ERROR"/>
          <transformer ref="multiplier"/>
          </flow>

          The response to the HTTP consumer is the result of the <collection-aggregator/>, and not that of the object-to-json-transformer.

          Daniel Feist added a comment -

          Increasing priority to 3.3 blocker given the number of votes on this and the issues it is causing.

          Pablo Kraan added a comment -

          For comments about the applied fix, please take a look at the revision description.

          Fix 3.1.x http://fisheye.codehaus.org/changelog/mule/?cs=24619
          Fix 3.2.x http://fisheye.codehaus.org/changelog/mule/?cs=24620
          Fix 3.3.x http://fisheye.codehaus.org/changelog/mule/?cs=24621
          Fix 3.x http://fisheye.codehaus.org/changelog/mule/?cs=24622

            People

            • Assignee:
              Pablo Kraan
              Reporter:
              Tobias Stjernefeldt
            • Votes:
              11
              Watchers:
              14
