MULE-5860

Inserting <collection-splitter /><collection-aggregator /> into a flow produces unexpected results

Details

  • Type: Bug
  • Status: Closed
  • Priority: Critical
  • Resolution: Fixed
  • Affects Version/s: 3.1.2, 3.2.0
  • Fix Version/s: 3.1.4 (EE only), 3.2.3 (EE only), 3.3.1
  • Component/s: Core: Transformers
  • Labels:
    • reviewed
    • targeted-3.3.1
  • User impact:
    High
  • Configuration:

    <?xml version="1.0" encoding="UTF-8"?>
    <mule xmlns="http://www.mulesoft.org/schema/mule/core"
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xmlns:spring="http://www.springframework.org/schema/beans"
          xmlns:stdio="http://www.mulesoft.org/schema/mule/stdio"
          xmlns:quartz="http://www.mulesoft.org/schema/mule/quartz"
          xmlns:scripting="http://www.mulesoft.org/schema/mule/scripting"
          xsi:schemaLocation="
            http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/3.2/mule.xsd
            http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
            http://www.mulesoft.org/schema/mule/stdio http://www.mulesoft.org/schema/mule/stdio/3.2/mule-stdio.xsd
            http://www.mulesoft.org/schema/mule/quartz http://www.mulesoft.org/schema/mule/quartz/3.2/mule-quartz.xsd
            http://www.mulesoft.org/schema/mule/scripting http://www.mulesoft.org/schema/mule/scripting/3.2/mule-scripting.xsd
          ">

      <quartz:connector name="quartzConnector">
        <quartz:factory-property key="org.quartz.scheduler.instanceName" value="quartzInstance" />
        <quartz:factory-property key="org.quartz.threadPool.class" value="org.quartz.simpl.SimpleThreadPool" />
        <quartz:factory-property key="org.quartz.threadPool.threadCount" value="3" />
        <quartz:factory-property key="org.quartz.scheduler.rmi.proxy" value="false" />
        <quartz:factory-property key="org.quartz.scheduler.rmi.export" value="false" />
        <quartz:factory-property key="org.quartz.jobStore.class" value="org.quartz.simpl.RAMJobStore" />
      </quartz:connector>

      <flow name="test">
        <quartz:inbound-endpoint name="timeTrigger"
                                 jobName="myJob" repeatInterval="6000" connector-ref="quartzConnector">
          <quartz:event-generator-job />
        </quartz:inbound-endpoint>

        <!-- Constant input: the same ordered list on every trigger -->
        <scripting:transformer>
          <scripting:script engine="groovy">
            <scripting:text>
              def foo = [1, 2, 3, 4, 5]
              message.setPayload(foo)
              return message
            </scripting:text>
          </scripting:script>
        </scripting:transformer>

        <collection-splitter />

        <collection-aggregator />

        <!-- Squares each element of the aggregated payload -->
        <scripting:transformer>
          <scripting:script engine="groovy">
            <scripting:text>
              def foo = []
              for (def i = 0; i &lt; payload.size(); i++) {
                def element = payload.get(i)
                foo.add(element * element)
              }
              println "\ntransformed: " + foo
              return foo
            </scripting:text>
          </scripting:script>
        </scripting:transformer>

        <stdio:outbound-endpoint system="OUT" />
      </flow>
    </mule>

  • Log Output:
    INFO 2011-10-31 14:42:12,070 [main] org.mule.DefaultMuleContext:
    **********************************************************************
    * Mule ESB and Integration Platform *
    * Version: 3.2.0 Build: 22917 *
    * MuleSoft, Inc. *
    * For more information go to http://www.mulesoft.org *
    * *
    * Server started: 10/31/11 2:42 PM *
    * Server ID: 215609bd-03c6-11e1-b1a3-3d96c762086b *
    * JDK: 1.6.0_26 (mixed mode) *
    * OS encoding: UTF-8, Mule encoding: UTF-8 *
    * OS: Mac OS X (10.6.8, x86_64) *
    * Host: myhost (x.x.x.x) *
    * *
    * Agents Running: *
    * JMX Agent *
    **********************************************************************

    [9, 4, 1, 16, 25]
    INFO 2011-10-31 14:42:12,589 [connector.stdio.mule.default.dispatcher.01] org.mule.lifecycle.AbstractLifecycleManager: Initialising: 'connector.stdio.mule.default.dispatcher.1619081930'. Object is: StdioMessageDispatcher
    INFO 2011-10-31 14:42:12,589 [connector.stdio.mule.default.dispatcher.01] org.mule.lifecycle.AbstractLifecycleManager: Starting: 'connector.stdio.mule.default.dispatcher.1619081930'. Object is: StdioMessageDispatcher
    [3, 2, 1, 4, 5]
    [9, 25, 16, 1, 4]
    [3, 5, 4, 1, 2]
    [1, 4, 16, 9, 25]
    [1, 2, 4, 3, 5]
  • Similar Issues:
    None

Description

The default behaviour of collection-aggregator seems strange in Mule 3.2.0 and 3.1.2;
intuitively, this is a bug (or class of bugs).
If the following is correct or partially correct then there is probably room for improvement in the docs.

Take the following structure:

<flow ...
...
<collection-splitter />
...
<collection-aggregator />

<transformer ...

<outbound-endpoint ...
</flow>

  • As far as I can see, the above flow outputs what is emitted by the collection-aggregator, ignoring any later transformer placed before the outbound-endpoint.
  • If the collection-splitter emits sequence ids for the correlation group (e.g. if it's fed a list), the collection-aggregator doesn't seem to make use of them by default. Of course, a collection splitter might also be fed other payload types with no inherent ordering, such as sets, where ordered output should not be expected.
    There seem to have been related issues in the past as well, e.g. MULE-5844.

In the attached config, the output of the outbound-endpoint and the return value of the last transformer are different. And, as per the second point above, the order of elements in the payload differs between runs even though the flow input is constant.
If the splitter and aggregator are commented out, both show the same message payload, and the ordering is constant when the input is constant.
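
For illustration, the ordering expectation in the second point can be sketched in plain Java. This is not Mule's aggregator code: `Part`, its `sequence` field, and `aggregateInSequenceOrder` are hypothetical names, and the sketch simply assumes each split part carries the sequence id assigned by the splitter.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

public class SequenceOrderSketch {

    /** Hypothetical stand-in for one split message and its sequence id. */
    static class Part {
        final int sequence;
        final Object payload;

        Part(int sequence, Object payload) {
            this.sequence = sequence;
            this.payload = payload;
        }
    }

    /** Collects payloads in sequence order, regardless of arrival order. */
    static List<Object> aggregateInSequenceOrder(List<Part> arrived) {
        List<Part> sorted = new ArrayList<>(arrived);
        sorted.sort(Comparator.comparingInt(p -> p.sequence));
        List<Object> payloads = new ArrayList<>();
        for (Part p : sorted) {
            payloads.add(p.payload);
        }
        return payloads;
    }

    public static void main(String[] args) {
        // Arrival order taken from one of the unordered results in the log output.
        List<Part> arrived = Arrays.asList(
                new Part(3, 3), new Part(2, 2), new Part(1, 1),
                new Part(4, 4), new Part(5, 5));
        System.out.println(aggregateInSequenceOrder(arrived)); // prints [1, 2, 3, 4, 5]
    }
}
```

With input that has no inherent order (such as a set), there is no meaningful sequence to sort by, so this only applies when the splitter was fed an ordered collection.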

Issue Links

is duplicated by

  • MULE-5745 <script:component>'s output ignored if used after an 'All' message processor (Bug, Critical, Closed)

relates to

  • MULE-5534 CLONE - Message modifications are discarded when using Collection Aggregator (Bug, Critical, Closed)
  • DOCS-43 Combine-collections-transformer is not a transformer so it should be moved from the Transformer Configuration Reference to the Routing Message Processors page (Bug, Critical, Closed)
  • MULE-4213 Response transformer results are discarded when using multicasting router (Bug, Critical, Closed)

Activity

Mariano Capurro added a comment - 13/Nov/11 11:30 AM

Having similar issues with a different configuration. Configuring the transformer at outbound endpoint level with transformer-refs is not working either.
Everything works if the outbound-endpoint is request-response.

Tobias Stjernefeldt added a comment - 29/Dec/11 06:06 AM

I took another look at this and there seem to be multiple issues in both Mule 2.x (MULE-4213) and 3.x that manifest the same underlying problem.

As far as I can see, the implementation of DefaultMessageCollection is confused about whether it is a message or a collection of messages. It contains both

  • private transient Object payload; inherited from DefaultMuleMessage, and
  • private List messageList = new CopyOnWriteArrayList(); the messages in the list contain their own payload objects

Both inside and outside the class there is code that assumes one or the other. For example,
the inherited getPayload() returns the internal payload object, whereas
getPayload(Class) returns a list of payload values taken from the messageList object.

So what seems to happen in the case of this ticket:

  • collection-aggregator produces a DefaultMessageCollection message
  • transformations: these normally modify the internal payload object
  • outbound-endpoint uses methods that get the payload out of the messageList objects

I think there are two design issues here:

  • Should collection-aggregator emit a message with a collection payload, or a collection of messages? The latter seems intuitively like a tight coupling to the configuration of previous stages in the flow.
  • What is the payload of a DefaultMessageCollection?
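
The divergence described above can be reproduced with a small model. The classes below are simplified, hypothetical stand-ins (`SketchMessage`, `SketchMessageCollection`), not Mule's actual `DefaultMuleMessage` or `DefaultMessageCollection`; they only assume the two fields quoted above, one inherited payload plus a list of member messages with their own payloads.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

public class DualPayloadSketch {

    /** Simplified stand-in for a single message. */
    static class SketchMessage {
        private Object payload;

        SketchMessage(Object payload) {
            this.payload = payload;
        }

        Object getPayload() {
            return payload;
        }

        void setPayload(Object payload) {
            this.payload = payload;
        }
    }

    /** Simplified stand-in for a collection message that holds two payload states. */
    static class SketchMessageCollection extends SketchMessage {
        private final List<SketchMessage> messageList = new CopyOnWriteArrayList<>();

        SketchMessageCollection(List<?> parts) {
            super(new ArrayList<Object>(parts));
            for (Object part : parts) {
                messageList.add(new SketchMessage(part));
            }
        }

        /** Second view: rebuilt from the member messages, ignoring the inherited payload. */
        List<Object> getPayloadsFromMembers() {
            List<Object> out = new ArrayList<>();
            for (SketchMessage m : messageList) {
                out.add(m.getPayload());
            }
            return out;
        }
    }

    /** Plays the role of a transformer that squares every element. */
    static List<Object> squareAll(List<?> values) {
        List<Object> out = new ArrayList<>();
        for (Object v : values) {
            int n = (Integer) v;
            out.add(n * n);
        }
        return out;
    }

    public static void main(String[] args) {
        SketchMessageCollection aggregated =
                new SketchMessageCollection(Arrays.asList(1, 2, 3));

        // The "transformer" replaces only the inherited payload object.
        aggregated.setPayload(squareAll((List<?>) aggregated.getPayload()));

        System.out.println(aggregated.getPayload());             // prints [1, 4, 9]
        System.out.println(aggregated.getPayloadsFromMembers()); // prints [1, 2, 3]
    }
}
```

The second line of output is what a consumer reading through the member list would see, which matches the symptom in this ticket: the transformation appears to be ignored.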
Tobias Stjernefeldt added a comment - 29/Dec/11 06:33 AM

This issue can be worked around by reconciling the internal "payload" states in DefaultMessageCollection.
E.g. in the case of
<collection-aggregator/>
<transformations... />
<outbound-endpoint/>

and if the transformations affect the internal payload object while the outbound-endpoint reads the payloads from the internal messageList,
then it's possible to insert a transformer before the endpoint that reconciles the two internal payload states, something along the lines of this:

	<scripting:transformer name="messagePayloadToCollectionPayload">
		<scripting:script engine="groovy">
			<scripting:text>
				// Only message collections carry the second, per-message payload state
				if (message instanceof org.mule.DefaultMessageCollection) {
					def messages = message.getMessagesAsArray()
					if (payload instanceof List) {
						// Copy each transformed element into the member message at the same index
						for (def i = 0; i &lt; payload.size(); i++) {
							def p = payload.get(i)
							def m = messages[i]
							m.setPayload(p)
						}
					}
				}
				return payload // by convention
			</scripting:text>
		</scripting:script>
	</scripting:transformer>

If you are in control of what transformers do it's of course also possible to write transformers that directly apply changes (by using the right DefaultMessageCollection methods) to the payload state that the outbound-endpoint will read later.

NB I haven't done any extensive testing on this so there could be side effects or corner cases I'm not aware of.
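
The same reconciliation idea, sketched in plain Java so it can be run outside Mule. `Member`, `reconcile`, and `payloadsOf` are hypothetical names for illustration only; the real workaround operates on `DefaultMessageCollection` as in the Groovy script. Unlike the script, this sketch stops at the shorter of the two lists so that a size mismatch does not throw.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ReconcileSketch {

    /** Hypothetical stand-in for one member message of the collection. */
    static class Member {
        Object payload;

        Member(Object payload) {
            this.payload = payload;
        }
    }

    /** Copies each element of the transformed payload into the member at the same index. */
    static void reconcile(Object topLevelPayload, List<Member> members) {
        if (!(topLevelPayload instanceof List)) {
            return; // nothing to reconcile for non-list payloads
        }
        List<?> values = (List<?>) topLevelPayload;
        int count = Math.min(values.size(), members.size());
        for (int i = 0; i < count; i++) {
            members.get(i).payload = values.get(i);
        }
    }

    /** Reads the payloads back through the member list, as the endpoint would. */
    static List<Object> payloadsOf(List<Member> members) {
        List<Object> out = new ArrayList<>();
        for (Member m : members) {
            out.add(m.payload);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Member> members = Arrays.asList(new Member(1), new Member(2), new Member(3));
        Object transformed = Arrays.asList(1, 4, 9);

        reconcile(transformed, members);
        System.out.println(payloadsOf(members)); // prints [1, 4, 9]
    }
}
```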

cheers
Tobias

Daniel Feist added a comment - 02/Feb/12 07:03 AM

Hi Tobias,

Can you give me an idea of what transformers you are trying to apply to the MuleMessageCollection?

Daniel Feist added a comment - 02/Feb/12 07:09 AM

Also, if I were to make the change (and update usages) so that getPayload() is overridden and behaves similarly to getPayload(Class), do you have a test case that I could run to ensure this fixes your issue?

Henryk Paluch added a comment - 29/Feb/12 08:59 AM

Hello!
Please note that you can now use CombineCollectionsTransformer instead of the Groovy script:

  
  ...
        <custom-transformer class="org.mule.transformer.simple.CombineCollectionsTransformer" doc:name="Message[] to Message of []"/>
  ...
Franco Gasperino added a comment - 04/Mar/12 10:03 PM

I have also encountered this:

<script:transformer name="multiplier">
  <script:script engine="groovy">
    Object.sleep(1000);
    return payload * payload;
  </script:script>
</script:transformer>

<flow name="work1In">
  <description>
    <![CDATA[
    Work-1 Demo.

    This demonstrates a standard synchronous processing model of using a collection splitter. Each call to
    the flow work1Worker will be done serially.
    ]]>
  </description>

  <http:inbound-endpoint name="httpWork1In"
                         host="127.0.0.1" port="8080"
                         path="work-1" exchange-pattern="request-response">
  </http:inbound-endpoint>

  <expression-transformer evaluator="groovy" expression="1..10"/>

  <collection-splitter enableCorrelation="ALWAYS"/>
  <flow-ref name="work1Worker"/>
  <collection-aggregator/>
  <combine-collections-transformer/>

  <logger message="Aggregated payload: #[groovy:payload.size()] elements" level="ERROR"/>
  <logger message="Aggregated payload: #[payload]" level="ERROR"/>

  <json:object-to-json-transformer/>

  <logger message="" level="ERROR"/>
  <logger message="#[payload]" level="ERROR"/>

  <logger message="Done." level="ERROR"/>
</flow>

<flow name="work1Worker">
  <expression-transformer evaluator="message" expression="correlationSequence"/>
  <logger message="ID: #[payload]" level="ERROR"/>
  <transformer ref="multiplier"/>
</flow>

The response to the HTTP consumer is the result of the <collection-aggregator/>, and not that of the object-to-json-transformer.

Daniel Feist added a comment - 16/Apr/12 09:22 PM

Increasing priority to 3.3 blocker given the number of votes on this and the issues it is causing.

Pablo Kraan added a comment - 15/Jul/12 08:57 AM

For comments about the applied fix, please take a look at the revision description.

Fix 3.1.x
http://fisheye.codehaus.org/changelog/mule/?cs=24619

Fix 3.2.x
http://fisheye.codehaus.org/changelog/mule/?cs=24620

Fix 3.3.x
http://fisheye.codehaus.org/changelog/mule/?cs=24621

Fix 3.x
http://fisheye.codehaus.org/changelog/mule/?cs=24622


People

  • Assignee:
    Pablo Kraan
    Reporter:
    Tobias Stjernefeldt
Vote (11)
Watch (14)

Dates

  • Created:
    31/Oct/11 09:16 AM
    Updated:
    02/Aug/12 09:01 AM
    Resolved:
    15/Jul/12 08:58 AM

Agile

  • Completed Sprints:
    Sprint 4 ended 12/Jul/12
    Studio Sprint 1 ended 15/Feb/13