MULE-5749

Expected number of messages to aggregate is set to -1 when using a JMS reply-to channel

    Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 3.2.x
    • Fix Version/s: None
    • Component/s: Transport: JMS
    • Labels:
      None
    • Environment:

      Windows 7

    • User impact:
      Medium
    • Configuration:

      <?xml version="1.0" encoding="UTF-8"?>
      <mule xmlns="http://www.mulesoft.org/schema/mule/core"
            xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
            xmlns:spring="http://www.springframework.org/schema/beans"
            xmlns:context="http://www.springframework.org/schema/context"
            xmlns:http="http://www.mulesoft.org/schema/mule/http"
            xmlns:script="http://www.mulesoft.org/schema/mule/scripting"
            xsi:schemaLocation="
              http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
              http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd
              http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/3.2/mule.xsd
              http://www.mulesoft.org/schema/mule/http http://www.mulesoft.org/schema/mule/http/3.2/mule-http.xsd
              http://www.mulesoft.org/schema/mule/scripting http://www.mulesoft.org/schema/mule/scripting/3.2/mule-scripting.xsd">

        <endpoint name="inboundAggregatorFlow" address="http://localhost:8990" />
        <endpoint name="outboundSplitFlow" address="jms://out.flow.split" />
        <endpoint name="replyAggregatorFlow" address="jms://reply.flow.aggregator" />

        <flow name="asyncReplyFlowCustomAggregator">
          <inbound-endpoint ref="inboundAggregatorFlow"
                            exchange-pattern="request-response" responseTimeout="1000000" />

          <expression-transformer evaluator="groovy"
                                  expression="Arrays.asList(payload.replace('/async/?payload=', '').split(','))" />

          <request-reply timeout="1000000">
            <outbound-endpoint ref="outboundSplitFlow" exchange-pattern="one-way">
              <collection-splitter enableCorrelation="IF_NOT_SET" />
            </outbound-endpoint>
            <inbound-endpoint ref="replyAggregatorFlow">
              <custom-aggregator class="com.mulesoft.custom.aggregator.CustomSortingAggregator"
                                 timeout="1000000" />
            </inbound-endpoint>
          </request-reply>
        </flow>

        <flow name="processorFlowCustomAggregator">
          <inbound-endpoint ref="outboundSplitFlow" exchange-pattern="one-way" />

          <script:component>
            <script:script engine="groovy">
              Integer.parseInt(payload)
            </script:script>
          </script:component>
        </flow>

      </mule>

    • Similar Issues:
      MULE-8590: Define behavior to aggregate collections splitted with group size set as unknown
      MULE-5735: async-reply (services) and reqeust-reply (flows) fail in cluster
      MULE-7329: Number of JMS consumers decreases to 1 after reconnection
      MULE-7195: Incorrect message redelivery policy when using JMS
      MULE-1987: Reply-to property being handled to early in message flow
      MULE-8589: Collection Splitter should set group size when auto-paging is available
      MULE-341: FilteringListMessageSplitter does not set correlation group size
      MULE-5020: Support other messaging patterns for aggregation
      MULE-2282: Unnecessary JMS Temp queue creation for no reply situation
      MULE-4061: JMS response picking the qrong message from reply queue when calling from Webservice (axis inbound)

      Description

      The config in this issue is also available in the test project at:
      http://dev.ee.mulesource.com/repos/mulesource/sandbox/mule-tests-cluster-manual/

      When a JMS reply-to channel is used together with a custom correlation aggregator to collect the split messages, Mule sets the expected number of messages to collect to -1. When the VM transport is used instead, that field is set correctly to the number of messages produced by the split. To see the working case, edit the two global endpoints "outboundSplitFlow" and "replyAggregatorFlow" in the config in this issue to use vm.
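
      For reference, the VM variant would presumably look like the sketch below. Only the URI scheme of the two global endpoints changes. Keeping the same queue names under vm:// is an assumption made for illustration, not something taken from the test project.

      ```xml
      <!-- Sketch only: the same two global endpoints switched from JMS to VM.
           The queue names are carried over unchanged, which is an assumption. -->
      <endpoint name="outboundSplitFlow" address="vm://out.flow.split" />
      <endpoint name="replyAggregatorFlow" address="vm://reply.flow.aggregator" />
      ```

      The rest of the config (both flows and the custom aggregator) stays as it is, so any difference in the expected size should come from the transport alone.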

      Description of the config:

      A collection splitter splits the incoming list of comma-separated integers (e.g. 2,1,4,3). Each resulting message is sent to a flow whose component converts it to an Integer. The results are then sent to the reply-to channel, where the custom aggregator collects and sorts them. The custom aggregator is attached to this issue.

        Activity

        Mike Schilling added a comment -

        In both cases, the "expected size" is set the same way: from the MULE_CORRELATION_GROUP_SIZE property of the message. In the VM case, the property is simply carried along with the message as it moves into and out of the queue. Can you see where it's getting lost in the JMS case?
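
        One way to narrow this down might be to log the property right after the first JMS hop. The fragment below is a sketch, not a verified fix: it assumes the Mule 3.2 logger element and the header expression evaluator with an INBOUND scope prefix, and it assumes the trailing ? marks the lookup as optional so that a missing property does not raise an error.

        ```xml
        <flow name="processorFlowCustomAggregator">
          <inbound-endpoint ref="outboundSplitFlow" exchange-pattern="one-way" />

          <!-- Diagnostic only (assumed syntax): print the correlation group size
               as it arrives from the queue, before the component runs. -->
          <logger level="INFO"
                  message="group size after split hop: #[header:INBOUND:MULE_CORRELATION_GROUP_SIZE?]" />

          <script:component>
            <script:script engine="groovy">
              Integer.parseInt(payload)
            </script:script>
          </script:component>
        </flow>
        ```

        Running this once with the jms:// endpoints and once with vm:// would show whether the property is already missing when the split messages arrive, or only disappears on the way back to the reply-to channel.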


          People

          • Assignee: Unassigned
          • Reporter: Justin Calleja
          • Votes: 0
          • Watchers: 0
