|
Apply checkstyle, set fix version and priority.
This is not a fix; read my comments on http://fisheye.codehaus.org/changelog/mule/?cs=6972
I see, I fixed the leak in the non-transacted case, but then broke the transacted case.
So, help me out here. Could I do something similar to the JmsMessageDispatcher, where I only close the session if it's not transacted or cached? It's too bad we have to open a JMS session in the AbstractJmsTransformer at all, but I guess that's the only way to create the JmsMessage?
Please refer to case 1784 for a potential fix.
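The rule being discussed here (close the session only when it is neither bound to a transaction nor cached) could be sketched roughly as follows. This is only an illustration under stated assumptions: `FakeSession` and `SessionCloser` are hypothetical stand-ins made up for this sketch, not Mule or JMS types, and the `transacted` and `cached` flags are assumed to be worked out by the caller.

```java
import java.util.function.Function;

// Hypothetical stand-in for a JMS session; NOT the real javax.jms.Session.
interface FakeSession {
    void close();
}

// Hypothetical helper, not part of Mule.
final class SessionCloser {
    private SessionCloser() {
    }

    /**
     * Runs the work with the given session, then closes the session only if
     * it is neither bound to a transaction nor cached.
     */
    static <T> T useSession(FakeSession session, boolean transacted, boolean cached,
                            Function<FakeSession, T> work) {
        try {
            return work.apply(session);
        } finally {
            // A transacted or cached session is owned by someone else,
            // so it must stay open here.
            if (session != null && !transacted && !cached) {
                session.close();
            }
        }
    }
}
```

The `finally` block means the session is released even when creating the message throws, which is the path that is easiest to miss when hunting a leak.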
I checked in a fix, which is similar to my previous fix but handles the case where the session is in a transaction. All JMS tests passed, including the transaction tests. This fix is for a customer case (00001091). I checked the fix into the sandbox this time, since it's a 1.4.0 fix that needs to be merged to trunk. r7031
No problem, Quoc. BTW, my suggested fix is a code change to AbstractJmsTransformer.transformToMessage, not a config workaround. I believe this case to be synonymous with both 1079 and 1784.
Here is my suggested fix. I have tested it with both no transaction and a resource-local transaction (I haven't tested with XA yet, but I will). I'd be happy to try out your proposed fix if you can post it here (I don't have Subversion).

    protected Message transformToMessage(Object src) throws TransformerException {
        Session session = null;
        if (src instanceof Message) {
            result = (Message) src;
            result.clearProperties();
        } else {
            session = this.getSession();
            result = JmsMessageUtils.toMessage(src, session);
        }
        // set the event properties on the Message
        this.setJmsProperties(ctx.getMessage(), result);
        return result;
    }

Justin,
Yes, it looks like we have roughly the same fix, except that I used slightly different criteria for whether the session is in a transaction. (Your criteria may actually be better.) Here's the fix I proposed: http://svn.codehaus.org/mule/trunk/mule-sandbox/transports/jms-leak-1.4.0/ I also had to modify one of the test cases slightly; that's nested down in the URL above. If you could test, that would be great. I will do the same and test your fix. Thanks!
Thanks for sending that through, Quoc. I tested your fix in our test env with an XA transaction, a resource (i.e. WebSphere MQ) transaction and no transaction, and it works in all three cases, so well done! I believe that our fixes are equivalent, so there's no need to bother testing mine; just check yours in. I hope we'll see this in the Mule 1.4.2 release!
BTW, session caching is performed in the message dispatcher, so it is not available in the JMS connector. Unless the JMS connector is reworked to implement session caching, you can't access the cached session from the transformer. Session caching is only applicable if there is no transaction available. Another interesting thing I discovered is that the JMS receiver endpoint also invokes the ObjectToJmsMessage response transformer by default, even if you set synchronous="false" on the endpoint, which seems redundant in that case. Cheers, Justin
Sounds good, yes I will merge the change to trunk and hopefully it gets into 1.4.2. I appreciate the help and insight, Justin. It's great to hear that the WebSphere tests also pass. Btw, the customer also verified the fix in their WebLogic environment.
Hi,
I tried this new Transformer code, dug into the Mule 1.4.1 code, and I think JMS session management should be redesigned. My note is probably out of scope for this bug, but I cannot see any working solution if only the transformer code is changed. We're using Mule and JMS in a few applications that use WebSphere MQ queues and mostly work according to scenarios where the bug fix does not work:
1. Event from Transacted JMS Receiver -> JMS Transformer -> JMS Dispatch (one or n times)
2. Event from somewhere else -> JMS Transformer -> JMS Dispatch (one or n times) without transaction
3. Event from somewhere else -> JMS Transformer -> JMS Dispatch (one or n times) within transaction
I felt a bit lost, so I tried to find a solution somewhere else. I looked at JmsConnector, and I think this class is the right place for storing JMS sessions. They should be kept in a managed pool, like JDBC connections. The number of sessions needed equals the number of concurrent JMS dispatchers and probably depends on the threading profile of the Mule components. I wrote a fairly primitive pool that controls the creation of JMS sessions. Here's the code. Better implementations could decorate the JmsTransaction class to release the session to the pool.

    import java.util.Iterator;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    import javax.jms.JMSException;
    import javax.jms.Session;

    import org.mule.providers.ConnectException;
    import org.mule.providers.jms.JmsConnector;
    import org.mule.transaction.TransactionCoordination;
    import org.mule.umo.TransactionException;
    import org.mule.umo.UMOTransaction;

    public class SessionPoolJmsConnector extends JmsConnector {

        // One session per thread, created on first use
        private Map<Thread, Session> sessionPool = new ConcurrentHashMap<Thread, Session>();

        @Override
        public Session getSession(boolean transacted, boolean topic) throws JMSException {
            if (!isConnected()) {
                throw new JMSException("Not connected");
            }
            Session session = getSessionFromTransaction();
            if (session != null) {
                return session;
            }
            // find session
            session = sessionPool.get(Thread.currentThread());
            if (session == null) {
                // bind to thread
                session = super.getSession(transacted, topic);
                sessionPool.put(Thread.currentThread(), session);
            } else if (transacted) {
                doBindSessionToTransaction(session);
            }
            return session;
        }

        private final void doBindSessionToTransaction(Session session) {
            UMOTransaction tx = TransactionCoordination.getInstance().getTransaction();
            if (tx != null) {
                logger.debug("Binding session to current transaction");
                try {
                    tx.bindResource(getConnection(), session);
                } catch (TransactionException e) {
                    throw new RuntimeException("Could not bind session to current transaction", e);
                }
            }
        }

        private final void clearSessionPool() {
            for (Iterator<Thread> it = sessionPool.keySet().iterator(); it.hasNext();) {
                Thread thread = it.next();
                // Look up the session before removing its entry; after it.remove()
                // the lookup would return null and the session would never be closed
                Session session = sessionPool.get(thread);
                it.remove();
                closeQuietly(session);
            }
        }

        @Override
        protected void doDisconnect() throws ConnectException {
            clearSessionPool();
            super.doDisconnect();
        }

        @Override
        protected void doDispose() {
            clearSessionPool();
            super.doDispose();
        }
    }

I've had to make the following change to the ObjectToJmsMessage transformer:
Additional code is underlined as inserted. Prior to this fix, the transacted flag was not set correctly in the case where this transformer was the first to obtain the new session.

    /**
    protected final Message getResult(Object src) throws JMSException, TransformerException {
        boolean transacted = false;
        // Figure out if the session comes from a Transaction or Cache. This
        // TODO: JmsMessageDispatcher has similar code to figure out if session is in transaction
        Session session = null;
        JmsConnector connector = (JmsConnector) endpoint.getConnector();
        session = connector.getSessionFromTransaction();
        if (session != null) {
            transacted = true;
        } else {
            session = this.getSession();
            UMOTransaction tx = TransactionCoordination.getInstance().getTransaction();
            }
        }
        // TODO: figure out if JmsSession caching is on. Since it's off by default,
        try {
            return JmsMessageUtils.toMessage(src, session);
        } finally {
            if (session != null && !transacted && !cached) {
                session.close();
            }
        }
    }

We also found a problem similar to the one Leon just posted, in the JmsMessageDispatcher dispatchMessage method when it is used with a non-JMS receiver (e.g. DB).
The JmsMessageDispatcher checks the endpoint configuration to determine whether it is transacted. This means that it is necessary to add an ALWAYS_JOIN transaction on the JMS endpoint, or else it won't participate in the transaction. The solution is to check whether the connector has been registered in the transaction (by connector.getSession); then it will be automatically enlisted without the need to add a transaction on the endpoint. See the underlined code below...

    private UMOMessage dispatchMessage(UMOEvent event) throws Exception
    if (logger.isDebugEnabled()) {
        logger.debug("dispatching on endpoint: " + event.getEndpoint().getEndpointURI()
            + ". Event id is: " + event.getId());
    }
    try
    // If a transaction is running, we can not receive any messages
    } else {
        session = connector.getSession(event.getEndpoint());
        cachedSession = session;
    }
    }
    /* if (event.getEndpoint().getTransactionConfig().isTransacted()) { transacted = true; }*/

Justin, please file separate issues for the above, and use patches.
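For readers following the dispatcher discussion above, the difference between the two checks can be sketched in isolation. This is a simplified illustration, not the Mule code: `FakeTransaction` and `TransactedCheck` are hypothetical names invented for the sketch, and the idea that a transaction tracks its resources keyed by connection is an assumption taken from the `tx.bindResource(getConnection(), session)` call earlier in this thread.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical transaction that tracks resources by key; NOT a Mule type.
final class FakeTransaction {
    private final Map<Object, Object> resources = new HashMap<>();

    void bindResource(Object key, Object resource) {
        resources.put(key, resource);
    }

    boolean hasResource(Object key) {
        return resources.containsKey(key);
    }
}

// Hypothetical helper contrasting the two ways of deciding "transacted".
final class TransactedCheck {
    private TransactedCheck() {
    }

    /** Endpoint-based check: true only if the endpoint declares a transaction. */
    static boolean byEndpointConfig(boolean endpointDeclaresTransaction) {
        return endpointDeclaresTransaction;
    }

    /**
     * Resource-based check: true if a transaction is running and already has
     * a resource bound for this connection, whatever the endpoint declares.
     */
    static boolean byBoundResource(FakeTransaction currentTx, Object connection) {
        return currentTx != null && currentTx.hasResource(connection);
    }
}
```

With the resource-based check, a dispatcher invoked from a non-JMS receiver would be treated as transacted as soon as its session is enlisted, without the endpoint needing its own transaction configuration.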
Quoc,
Where do we stand with this issue? How long would it take to finish up the work? Thank you,
Hi Quoc
Andrew Calafato and I have also stumbled across this problem. Seeing that you're swamped, do you need a hand? Thanks
Hi Quoc
I made some changes and put the session on the thread, to avoid opening and closing sessions in the transformer. The session leak seems to be fixed. However, I'm having problems with the XA transactions not picking up all the messages on the queue. I'm not sure whether this is related, so I'm not committing anything at this time.
In case it helps anyone, I would like to volunteer to be a tester of the fix for this: we have MQ with and without XA and can run volume tests (10,000s of messages) in both cases. We also have a build environment of the current 1.4.4 trunk for patching/debugging/etc.
Update: The leak seems to be fixed and the XA transactions are working fine but I broke the topics. Will be working on them next. Since I put the session on the thread, all messageListeners that are on the thread get registered with the session and this blows up the topics.
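As a rough illustration of what "putting the session on the thread" can look like, here is a minimal sketch built on `ThreadLocal`. It is not the code from the changeset: `ThreadSessionHolder` is a hypothetical name, and the session type is left generic so the sketch does not depend on JMS.

```java
import java.util.function.Supplier;

// Hypothetical per-thread holder; not the class used in the actual fix.
final class ThreadSessionHolder<S> {
    private final ThreadLocal<S> current = new ThreadLocal<>();
    private final Supplier<S> factory;

    ThreadSessionHolder(Supplier<S> factory) {
        this.factory = factory;
    }

    /** Returns this thread's session, creating it on first use. */
    S get() {
        S session = current.get();
        if (session == null) {
            session = factory.get();
            current.set(session);
        }
        return session;
    }

    /** Forgets this thread's session; the caller is responsible for closing it. */
    void clear() {
        current.remove();
    }
}
```

Because everything running on the thread shares one session, anything registered against that session is shared too, which matches the topic problem described in the update above.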
http://fisheye.codehaus.org/changelog/mule/?cs=9693
Comments inlined in the changeset |
The AbstractJmsTransformer was opening a JMS session but not closing it. We should also look into an alternative to creating a JMS session just for the transformer (to reduce overhead).
Fix committed: r6970