/*
* $Header$
* $Revision$
* $Date$
* ------------------------------------------------------------------------------------------------------
*
* Copyright (c) SymphonySoft Limited. All rights reserved.
* http://www.symphonysoft.com
*
* The software in this package is published under the terms of the BSD
* style license a copy of which has been included with this distribution in
* the LICENSE.txt file.
*
*/
package org.mule.providers.tcp;
import java.io.BufferedInputStream;
import java.io.DataInputStream;
import java.net.InetAddress;
import java.net.Socket;
import java.net.URI;
import org.mule.config.i18n.Message;
import org.mule.impl.MuleMessage;
import org.mule.providers.ConnectException;
import org.mule.providers.PollingMessageReceiver;
import org.mule.umo.UMOComponent;
import org.mule.umo.UMOMessage;
import org.mule.umo.endpoint.UMOEndpoint;
import org.mule.umo.lifecycle.InitialisationException;
import org.mule.umo.provider.UMOConnector;
import org.mule.umo.provider.UMOMessageAdapter;
/**
* TcpStreamingMessageReceiver establishes a tcp client
* connection to an external server and reads the streaming data. No
* polling frequency is used since with blocking i/o reads will block,
* and with non-blocking i/o reads will occur when data is available.
* Causing delays between read attempts is unnecessary, so this forces
* the pollingFrequency property to zero so no pause occurs in the
* PollingMessageReceiver class.
*
* @author Rich Lucente
*
* @version $Revision$
*/
public class TcpStreamingMessageReceiver extends PollingMessageReceiver {
protected Socket clientSocket = null;
protected DataInputStream dataIn = null;
protected TcpProtocol protocol = null;
public TcpStreamingMessageReceiver(UMOConnector connector,
UMOComponent component, UMOEndpoint endpoint)
throws InitialisationException {
this(connector, component, endpoint, new Long(0));
}
private TcpStreamingMessageReceiver(UMOConnector connector,
UMOComponent component, UMOEndpoint endpoint, Long frequency)
throws InitialisationException {
super(connector, component, endpoint, frequency);
protocol = ((TcpConnector) connector).getTcpProtocol();
setFrequency(0);
}
public void poll() throws Exception {
setFrequency(0); // make sure this is zero and not overridden via config
byte[] data = protocol.read(dataIn);
if (data != null) {
UMOMessageAdapter adapter = connector.getMessageAdapter(data);
UMOMessage message = new MuleMessage(adapter);
routeMessage(message, endpoint.isSynchronous());
}
}
public void doConnect() throws ConnectException {
URI uri = endpoint.getEndpointURI().getUri();
String host = uri.getHost();
if (host == null || host.length() == 0) {
host = "localhost";
}
try {
logger.debug("attempting to connect to server socket");
InetAddress inetAddress = InetAddress.getByName(host);
clientSocket = new Socket(inetAddress, uri.getPort());
TcpConnector connector = (TcpConnector) this.connector;
clientSocket.setReceiveBufferSize(connector.getBufferSize());
clientSocket.setSendBufferSize(connector.getBufferSize());
clientSocket.setSoTimeout(connector.getReceiveTimeout());
dataIn = new DataInputStream(new BufferedInputStream(clientSocket
.getInputStream()));
logger.debug("connected to server socket");
} catch (Exception e) {
e.printStackTrace();
throw new ConnectException(new Message("tcp", 1, uri), e, this);
}
}
public void doDisconnect() throws Exception {
try {
if (clientSocket != null && !clientSocket.isClosed()) {
clientSocket.shutdownInput();
clientSocket.shutdownOutput();
clientSocket.close();
}
} finally {
clientSocket = null;
dataIn = null;
logger.info("closed tcp client socket");
}
}
}