Details

  • Type: New Feature New Feature
  • Status: Open Open
  • Priority: To be reviewed To be reviewed
  • Resolution: Unresolved
  • Affects Version/s: 3.0.0-M4
  • Fix Version/s: None
  • Labels:
    None
  • Environment:

    All

  • User impact:
    Low
  • Migration Impact:
    none, the implementation is backwards compatible with current implementation when enableDuplicateCorrelationIds is set to false (the default)
  • Similar Issues:
    None

Description

Current aggregation implementation limits the aggregation pattern to the scatter and gather pattern where in 1 message is distributed to several locations and responses are aggregated. While this is probably the most typical use case there are cases where additional support would be useful and added very trivially. Specifically, for the case where correlation ids are repeatable for continuous updates of a particular event. For instance if there was a system designed to Track temperature over a period of time every 30s for a particular zip code (correlation id) but the client only wants to be notified during a particular event (say a swing by 10 degrees) or after a given period of time (say 10 minutes). To handle this currently I have to Extend EventCorrelator, overwrite isGroupAlreadyProcessed to always return false, modified enableTimeoutMonitor to run my own version of ExpiringGroupWork which disables all of the blocks concerning expiredAndDispatched variable that would have stopped events from being processed that had the same correlationId. While this works it seems a bit excessive and ugly and could be made much simpler by adding an additional field (enableDuplicateCorrelationIds) which if set would run the default implementation and if not would allow for duplicate correlationIds to be processed.

I have included what my extended class looks like below for reference which 95% is taken from the base EventCorrelator

import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import javax.resource.spi.work.Work;
import javax.resource.spi.work.WorkException;

import org.mule.DefaultMuleEvent;
import org.mule.DefaultMuleSession;
import org.mule.api.MuleContext;
import org.mule.api.MuleEvent;
import org.mule.api.MuleMessage;
import org.mule.api.routing.MessageInfoMapping;
import org.mule.api.service.Service;
import org.mule.config.i18n.CoreMessages;
import org.mule.context.notification.RoutingNotification;
import org.mule.routing.CorrelationTimeoutException;
import org.mule.routing.EventCorrelator;
import org.mule.routing.EventCorrelatorCallback;
import org.mule.routing.inbound.EventGroup;
import org.mule.util.monitor.Expirable;
import org.mule.util.monitor.ExpiryMonitor;

import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
/**

  • This is an extension of the base EventCorrelator which does not drop messages
  • with IDs that have already been processed.
  • @author jjohnson
    *
    */
    public class AlwaysFireEventCorrelator extends EventCorrelator {

private EventCorrelatorCallback callback;
private MessageInfoMapping messageInfoMapping;
private MuleContext context;
private AtomicBoolean timerStarted = new AtomicBoolean(false);
private boolean enableDuplicateCorrelationIds = false;
public AlwaysFireEventCorrelator(EventCorrelatorCallback callback,
MessageInfoMapping messageInfoMapping, MuleContext context) { super(callback, messageInfoMapping, context); this.callback = callback; this.messageInfoMapping = messageInfoMapping; this.context = context; }
@Override
protected boolean isGroupAlreadyProcessed(Object id) { //We always want to process the data, even if the group has already been sent. if(enableDuplicateCorrelationIds) return false; else return super.isGroupAlreadyProcessed(id); }

@Override
public void enableTimeoutMonitor() throws WorkException
{
if (timerStarted.get())

{ return; }

this.context.getWorkManager().scheduleWork(new ExpiringGroupWork());
}

public void setEnableDuplicateCorrelationIds(
boolean enableDuplicateCorrelationIds) { this.enableDuplicateCorrelationIds = enableDuplicateCorrelationIds; }
public boolean isEnableDuplicateCorrelationIds() { return enableDuplicateCorrelationIds; }

private final class ExpiringGroupWork implements Work, Expirable
{
private static final long ONE_DAY_IN_MILLI = 1000 * 60 * 60 * 24;
protected long groupTimeToLive = ONE_DAY_IN_MILLI;
private ExpiryMonitor expiryMonitor;

/**

  • A map with keys = group id and values = group creation time
    */
    private Map expiredAndDispatchedGroups = new ConcurrentHashMap();

public ExpiringGroupWork()

{ this.expiryMonitor = new ExpiryMonitor("EventCorrelator", 1000 * 60); //clean up every 30 minutes this.expiryMonitor.addExpirable(1000 * 60 * 30, TimeUnit.MILLISECONDS, this); }

/**

  • Removes the elements in expiredAndDispatchedGroups when groupLife is reached
    */
    public void expired()
    {
    if(!enableDuplicateCorrelationIds){
    for (Object o : expiredAndDispatchedGroups.keySet())
    {
    Long time = (Long) expiredAndDispatchedGroups.get(o);
    if (time + groupTimeToLive < System.currentTimeMillis())
    Unknown macro: { expiredAndDispatchedGroups.remove(o); logger.warn(MessageFormat.format("Discarding group {0}", o));
    }
    }
    }
    }

    public void release()
    { //no op }

    public void run()
    {
    while (true)
    {
    List<EventGroup> expired = new ArrayList<EventGroup>(1);
    for (Object o : eventGroups.values())
    {
    EventGroup group = (EventGroup) o;
    if ((group.getCreated() + getTimeout() * MILLI_TO_NANO_MULTIPLIER) < System.nanoTime())
    { expired.add(group); }
    }
    if (expired.size() > 0)
    {
    for (Object anExpired : expired)
    {
    EventGroup group = (EventGroup) anExpired;
    eventGroups.remove(group.getGroupId());
    locks.remove(group.getGroupId());

    final Service service = group.toArray()[0].getService();

    if (isFailOnTimeout())
    { context.fireNotification(new RoutingNotification(group.toMessageCollection(), null, RoutingNotification.CORRELATION_TIMEOUT)); service.getExceptionListener().exceptionThrown( new CorrelationTimeoutException(CoreMessages.correlationTimedOut(group.getGroupId()), group.toMessageCollection())); }
    else
    {
    if (logger.isDebugEnabled())
    {
    logger.debug(MessageFormat.format(
    "Aggregator expired, but ''failOnTimeOut'' is false. Forwarding {0} events out of {1} " + "total for group ID}

try
{
if (!(group.getCreated() + groupTimeToLive < System.currentTimeMillis()))
{
MuleMessage msg = callback.aggregateEvents(group);
MuleEvent newEvent = new DefaultMuleEvent(msg, group.toArray()[0].getEndpoint(),
new DefaultMuleSession(service, context), false);

if (!expiredAndDispatchedGroups.containsKey(group.getGroupId()) || enableDuplicateCorrelationIds)

{ // TODO which use cases would need a sync reply event returned? service.dispatchEvent(newEvent); expiredAndDispatchedGroups.put(group.getGroupId(), group.getCreated()); }

else
{
logger.warn(MessageFormat.format("Discarding group {0}", group.getGroupId()));
}
}
}
catch (Exception e)

{ service.getExceptionListener().exceptionThrown(e); }

}
}
}
try

{ Thread.sleep(100); }

catch (InterruptedException e)

{ break; }

}
}
}

}

Activity

There are no comments yet on this issue.

People

Vote (0)
Watch (2)

Dates

  • Created:
    Updated: