/*
This file is part of VHMsg written by Edward Fast at
University of Southern California's Institute for Creative Technologies.
http://www.ict.usc.edu
Copyright 2008 Edward Fast, University of Southern California
VHMsg is free software: you can redistribute it and/or modify
it under the terms of the GNU Lesser General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
VHMsg is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with VHMsg. If not, see <http://www.gnu.org/licenses/>.
*/
using System;
using System.Collections;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Web;
using KeyValuePairConsumer = System.Collections.Generic.KeyValuePair;
namespace VHMsg
{
/// <summary>
/// This is the class that contains the message when received via the MessageEvent handler.
/// It is received as an argument from MessageEvent.
/// </summary>
public class Message : EventArgs
{
    /// <summary>
    /// String containing the message
    /// </summary>
    public string s;

    /// <summary>
    /// Properties containing the multikey portion: string-valued message properties keyed by property name
    /// </summary>
    public Dictionary<string, string> properties;

    /// <summary>
    /// Constructor
    /// </summary>
    /// <param name="s">The message text</param>
    /// <param name="properties">The string properties that accompany the message; stored by reference, not copied</param>
    public Message(string s, Dictionary<string, string> properties)
    {
        this.s = s;
        this.properties = properties;
    }
}
/// <summary>
/// This is the main class for using this library
/// </summary>
public class Client : IDisposable
{
// True once Dispose(bool) has run to completion; makes repeated disposal a no-op.
private bool m_disposed = false;

// Apache.NMS objects created by OpenConnection().
private Apache.NMS.IConnection connection;
private Apache.NMS.ISession m_session;
private Apache.NMS.IDestination m_destination;
private Apache.NMS.IMessageProducer m_Producer;

// Connection settings. Initialised from the environment in the constructor and
// optionally overridden through the OpenConnection overloads / Scope property.
private string m_host;
private string m_port;
private string m_scope;

// true: OnMessage raises MessageEvent directly. false: messages are queued until Poll() is called.
private bool m_immediateMethod = true;

// true while a "*" (all messages) subscription is active.
private bool m_subscribedAll = false;

// One (subscription key, consumer) pair per successful SubscribeMessage() call.
private List<KeyValuePair<string, Apache.NMS.IMessageConsumer>> m_consumers;

// variables needed for polling
private readonly List<Message> m_messages = new List<Message>();
private readonly object m_messageLock = new object();
private readonly ManualResetEvent m_waitCondition = new ManualResetEvent(false);
private readonly object m_waitLock = new object();
/// <summary>
/// Delegate for the MessageEvent handler
/// </summary>
/// <param name="sender">The Client instance that raises the event</param>
/// <param name="message">The received message text together with its string properties</param>
public delegate void MessageEventHandler(object sender, Message message);
/// <summary>
/// Message Handler. Clients need to add their callback function to this handler to receive messages.
/// </summary>
public event MessageEventHandler MessageEvent;
/// <summary>
/// Gets the server (host) being used for the connection
/// </summary>
public string Server
{
get { return m_host; }
}
/// <summary>
/// Gets the port being used for the connection
/// </summary>
public string Port
{
get { return m_port; }
}
/// <summary>
/// Gets/Sets the scope being used for the connection.
/// The value is read when a connection is opened, when a message is sent and when a
/// subscription is created; setting it does not alter subscriptions that already exist.
/// </summary>
public string Scope
{
get { return m_scope; }
set { m_scope = value; }
}
/// <summary>
/// Constructor. Creates a client with no open connection and initialises the server,
/// port and scope from the VHMSG_SERVER, VHMSG_PORT and VHMSG_SCOPE environment variables.
/// </summary>
public Client()
{
    m_consumers = new List<KeyValuePair<string, Apache.NMS.IMessageConsumer>>();
    SetServerFromEnvironment();
    SetPortFromEnvironment();
    SetScopeFromEnvironment();
}
/// <summary>
/// Destructor, follows the IDisposable pattern. Calls Dispose(false); it is skipped when
/// Dispose() has already been called, because Dispose() suppresses finalization.
/// </summary>
~Client()
{
Dispose(false);
}
/// <summary>
/// Dispose method. Manually releases ActiveMQ resources. Follows the IDisposable pattern
/// </summary>
public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this); // tell the GC that the Finalize process no longer needs to be run for this object.
}
/// <summary>
/// Dispose method. Manually releases ActiveMQ resources. Follows the IDisposable pattern
/// </summary>
/// <param name="disposeManagedResources">
/// true when called from Dispose(), in which case the connection and the wait handle are released;
/// false when called from the finalizer, in which case nothing is released.
/// </param>
protected virtual void Dispose(bool disposeManagedResources)
{
    // process only if managed and unmanaged resources have not been disposed of.
    if (m_disposed)
    {
        return;
    }

    // Mark first so that a failure while closing cannot lead to a second attempt.
    m_disposed = true;

    if (disposeManagedResources)
    {
        try
        {
            // Close the connection before the wait handle: OnMessage signals the handle,
            // so the handle must outlive the connection. Skipped when no connection was
            // ever opened, where CloseConnection() would dereference null.
            if (connection != null)
            {
                CloseConnection();
            }
        }
        finally
        {
            m_waitCondition.Close();
        }
    }
}
/// <summary>
/// Sets the scope from the VHMSG_SCOPE environment variable, or to "DEFAULT_SCOPE" when the variable is not set.
/// </summary>
protected void SetScopeFromEnvironment()
{
    string fromEnvironment = System.Environment.GetEnvironmentVariable("VHMSG_SCOPE");
    m_scope = fromEnvironment ?? "DEFAULT_SCOPE";
}
/// <summary>
/// Sets the server from the VHMSG_SERVER environment variable, or to "localhost" when the variable is not set.
/// </summary>
protected void SetServerFromEnvironment()
{
    string fromEnvironment = System.Environment.GetEnvironmentVariable("VHMSG_SERVER");
    m_host = fromEnvironment ?? "localhost";
}
/// <summary>
/// Sets the port from the VHMSG_PORT environment variable, or to "61616" when the variable is not set.
/// </summary>
protected void SetPortFromEnvironment()
{
    string fromEnvironment = System.Environment.GetEnvironmentVariable("VHMSG_PORT");
    m_port = fromEnvironment ?? "61616";
}
/// <summary>
/// Opens a connection to the server.
/// <para>By default, it uses 3 system environment variables as parameters:</para>
/// <para>VHMSG_SERVER - This specifies the server to connect to. It can either be an ip address or domain name</para>
/// <para>VHMSG_PORT - This specifies the port to connect to.</para>
/// <para>VHMSG_SCOPE - A unique id used to distinguish messages sent by different modules using the same server. For example, if two users
/// are using the same server, they would set different scopes so that they wouldn't receive each other's messages.</para>
/// </summary>
public void OpenConnection()
{
    // No credentials are supplied: null user and password are passed to the factory.
    string user = null;
    string password = null;
    string url = "tcp://" + m_host + ":" + m_port;

    Apache.NMS.ActiveMQ.ConnectionFactory connectionFactory = new Apache.NMS.ActiveMQ.ConnectionFactory(new Uri(url));
    connection = connectionFactory.CreateConnection(user, password);
    connection.Start();

    m_session = connection.CreateSession(Apache.NMS.AcknowledgementMode.AutoAcknowledge);

    // Messages are exchanged over the topic named after the current scope.
    m_destination = m_session.GetTopic(m_scope);

    m_Producer = m_session.CreateProducer(m_destination);
    m_Producer.DeliveryMode = Apache.NMS.MsgDeliveryMode.NonPersistent;
}
/// <summary>
/// Opens a connection to the server using a specified host. See the parameterless OpenConnection().
/// </summary>
/// <param name="server">the host to connect to. It can either be an ip address or domain name</param>
public void OpenConnection(string server)
{
m_host = server;
OpenConnection();
}
/// <summary>
/// Opens a connection to the server using a specified host and port. See the parameterless OpenConnection().
/// </summary>
/// <param name="server">the host to connect to. It can either be an ip address or domain name</param>
/// <param name="port">the port to connect to.</param>
public void OpenConnection(string server, string port)
{
    // Record the port, then let the single-argument overload record the host and connect.
    m_port = port;
    OpenConnection(server);
}
/// <summary>
/// Closes the connection to the server that was previously opened via OpenConnection().
/// Does nothing to the connection if none is open, so it is safe to call more than once.
/// All subscriptions are forgotten.
/// </summary>
public void CloseConnection()
{
    // Empty critical section: waits for any thread currently holding m_messageLock to leave it.
    // NOTE(review): confirm whether this barrier is intentional or a leftover.
    lock (m_messageLock)
    {
    }

    if (connection != null)
    {
        connection.Dispose();
        connection = null;
    }

    m_consumers.Clear();

    // The "*" consumer (if any) is gone with the rest; without this reset every later
    // subscription would be attached to OnMessageIgnore and never deliver anything.
    m_subscribedAll = false;
}
/// <summary>
/// Sends a message to the server using 2 arguments. The first argument is the first word in the message, the second argument is the rest of the message.
/// Nothing is sent when <paramref name="op"/> is null, empty or whitespace only.
/// </summary>
/// <param name="op">The first word in the message</param>
/// <param name="arg">The rest of the message; null is treated as an empty string. It is URL-encoded (UTF-8) before sending.</param>
public void SendMessage(string op, string arg)
{
    // Normalise before testing so that null arguments do not throw.
    op = (op ?? "").Trim();
    arg = (arg ?? "").Trim();
    if (op.Length == 0)
    {
        return;
    }

    string arg_encoded = HttpUtility.UrlEncode(arg, Encoding.UTF8);
    string mess = op + " " + arg_encoded;

    Apache.NMS.ActiveMQ.Commands.ActiveMQTextMessage message = (Apache.NMS.ActiveMQ.Commands.ActiveMQTextMessage)m_session.CreateTextMessage(mess);

    // These two properties are what the subscription selectors filter on.
    message.SetObjectProperty("ELVISH_SCOPE", m_scope);
    message.SetObjectProperty("MESSAGE_PREFIX", op);

    m_Producer.Send(message);
}
/// <summary>
/// Sends a message to the server. The argument contains the full message.
/// The text up to the first space is used as the first word; everything after it is the rest of the message.
/// Nothing is sent when the argument is null.
/// </summary>
/// <param name="opandarg">The full message to send</param>
public void SendMessage(string opandarg)
{
    if (opandarg == null)
    {
        return;
    }

    string trimmed = opandarg.Trim();

    // Ordinal character search: the split must not depend on the current culture.
    int separator = trimmed.IndexOf(' ');
    if (separator < 0)
    {
        SendMessage(trimmed, "");
        return;
    }

    SendMessage(trimmed.Substring(0, separator), trimmed.Substring(separator + 1));
}
/// <summary>
/// Sends a message to the server where each item in the array is a separate word in the message.
/// </summary>
/// <param name="op">The first word in the message</param>
/// <param name="args">An array containing the rest of the message. Each item in the array is a separate word; items are joined with single spaces. A null array is treated as empty.</param>
public void SendMessage(string op, string[] args)
{
    string concatargs = (args == null) ? "" : string.Join(" ", args);
    SendMessage(op, concatargs);
}
/// <summary>
/// Switches to polling delivery: received messages are queued and are only passed to the
/// MessageEvent handlers when Poll() or WaitAndPoll() is called.
/// </summary>
public void EnablePollingMethod()
{
m_immediateMethod = false;
}
/// <summary>
/// Switches to immediate delivery (the default): each received message is passed to the
/// MessageEvent handlers from within the receive callback. While this mode is active Poll()
/// dispatches nothing, so messages queued earlier in polling mode stay queued.
/// </summary>
public void EnableImmediateMethod()
{
m_immediateMethod = true;
}
/// <summary>
/// Subscribes to a message. This notifies the server that we are interested in messages that contain the given argument as the first word in the message.
/// <para>For each message that is received, the MessageEvent handler will be called for all listeners.</para>
/// <para>More than one subscription can be made. Alternatively, an asterisk (*) may be sent as a special-case argument that indicates we're interested in *all* messages.
/// This should be used very sparingly because it can cause quite a bit of network traffic.</para>
/// <para>Subscribing to something that is already subscribed does nothing.</para>
/// </summary>
/// <param name="req">Indicates what types of messages we are interested in receiving. This tells the server to send messages where the first word matches req</param>
public void SubscribeMessage(string req)
{
    // check if we've already subscribed to this message
    foreach (KeyValuePairConsumer existing in m_consumers)
    {
        if (existing.Key == req)
        {
            return;
        }
    }

    // special case for asterisk. If we pass in an asterisk, we are subscribing to all messages
    bool subscribingToAll = (req == "*");

    string messageSelector = "ELVISH_SCOPE = '" + EscapeSelectorLiteral(m_scope) + "' AND MESSAGE_PREFIX ";
    if (subscribingToAll)
    {
        messageSelector += "LIKE '%'";
    }
    else
    {
        messageSelector += "= '" + EscapeSelectorLiteral(req) + "'";
    }

    Apache.NMS.IMessageConsumer consumer = m_session.CreateConsumer(m_destination, messageSelector);

    if (subscribingToAll)
    {
        // Only recorded once the consumer exists, so a failed CreateConsumer leaves the flag untouched.
        m_subscribedAll = true;
    }

    // While "*" is active it already delivers every message, so any other consumer
    // is attached to the ignoring callback from the start to prevent duplicates.
    if (m_subscribedAll && !subscribingToAll)
    {
        consumer.Listener += this.OnMessageIgnore;
    }
    else
    {
        consumer.Listener += this.OnMessage;
    }
    m_consumers.Add(new KeyValuePairConsumer(req, consumer));

    // if we subscribed to "*", silence all the other listeners to prevent duplicate messages
    if (subscribingToAll)
    {
        foreach (KeyValuePairConsumer other in m_consumers)
        {
            if (other.Key != "*")
            {
                other.Value.Listener -= this.OnMessage;
                other.Value.Listener -= this.OnMessageIgnore;
                other.Value.Listener += this.OnMessageIgnore;
            }
        }
    }
}

/// <summary>
/// Makes a value safe to embed between single quotes in a message selector by doubling
/// every single quote. A null value is treated as an empty string.
/// </summary>
private static string EscapeSelectorLiteral(string value)
{
    return (value ?? "").Replace("'", "''");
}
/// <summary>
/// Cancels a subscription previously made with SubscribeMessage() and closes its consumer.
/// Unsubscribing from "*" re-enables delivery for the remaining individual subscriptions.
/// </summary>
/// <param name="req">The subscription to cancel, exactly as it was passed to SubscribeMessage()</param>
/// <returns>true if a matching subscription was found and removed; otherwise false</returns>
public bool UnsubscribeMessage(string req)
{
    // if we are unsubscribing from "*", re-add all the other listeners so that messages continue to be received (only once)
    if (req == "*")
    {
        foreach (KeyValuePairConsumer c in m_consumers)
        {
            if (c.Key != "*")
            {
                c.Value.Listener -= this.OnMessage;
                c.Value.Listener -= this.OnMessageIgnore;
                c.Value.Listener += this.OnMessage;
            }
        }
        m_subscribedAll = false;
    }

    // Indexed loop so the matching entry can be removed without touching an active enumerator.
    for (int i = 0; i < m_consumers.Count; i++)
    {
        if (m_consumers[i].Key == req)
        {
            m_consumers[i].Value.Close();
            m_consumers.RemoveAt(i);
            return true;
        }
    }

    return false;
}
/// <summary>
/// Dispatches every queued message to the MessageEvent handlers on the calling thread.
/// Only has an effect in polling mode; in immediate mode it returns 0 without touching the queue.
/// Messages are removed from the queue even when no handler is attached.
/// </summary>
/// <returns>The number of messages removed from the queue</returns>
public int Poll()
{
    if (m_immediateMethod)
    {
        return 0;
    }

    int numMessages = 0;
    for (;;)
    {
        Message args;
        lock (m_messageLock)
        {
            if (m_messages.Count == 0)
            {
                break;
            }
            args = m_messages[0];
            m_messages.RemoveAt(0);
        }

        // Raise the event outside the lock, and through a local copy so that a handler
        // being removed concurrently (or no handler at all) cannot cause a null dereference.
        MessageEventHandler handler = MessageEvent;
        if (handler != null)
        {
            handler(this, args);
        }
        numMessages++;
    }
    return numMessages;
}
/// <summary>
/// Dispatches queued messages like Poll(). If none are queued, waits up to the given time
/// for a message to arrive and then dispatches whatever is queued.
/// Not intended to be called from several threads at once.
/// </summary>
/// <param name="waitTimeSeconds">Maximum time to wait, in seconds. Negative or NaN values are treated as 0.</param>
public void WaitAndPoll(double waitTimeSeconds)
{
    // Clear any stale signal first. A message that arrives after this point is either
    // picked up by the Poll() below or sets the event again before/while we wait.
    m_waitCondition.Reset();

    // if there are already messages in the queue, process them and return (don't wait)
    if (Poll() > 0)
    {
        return;
    }

    // Clamp to the range WaitOne accepts: a negative value would throw, and -1 would wait forever.
    double requestedMilliseconds = waitTimeSeconds * 1000;
    int waitTimeMilliseconds;
    if (double.IsNaN(requestedMilliseconds) || requestedMilliseconds <= 0)
    {
        waitTimeMilliseconds = 0;
    }
    else if (requestedMilliseconds >= int.MaxValue)
    {
        waitTimeMilliseconds = int.MaxValue;
    }
    else
    {
        waitTimeMilliseconds = (int)requestedMilliseconds;
    }

    // m_waitLock is deliberately NOT held here: the receive callback takes that lock
    // before signalling, so holding it while waiting would block the callback until timeout.
    m_waitCondition.WaitOne(waitTimeMilliseconds);

    Poll();
}
/// <summary>
/// This function is a callback function received whenever an ActiveMQ message is received from the server. It processes the message and passes it on to the client via
/// the MessageEvent handler (immediate mode) or queues it for Poll() (polling mode).
/// Messages that are not ActiveMQ text messages are ignored.
/// </summary>
/// <param name="msg">ActiveMQ message received from the server</param>
protected void OnMessage(Apache.NMS.IMessage msg)
{
    Apache.NMS.ActiveMQ.Commands.ActiveMQTextMessage txtMsg = msg as Apache.NMS.ActiveMQ.Commands.ActiveMQTextMessage;
    if (txtMsg == null)
    {
        return;
    }

    // UrlDecode returns null for null input, so guard before trimming.
    string message = (HttpUtility.UrlDecode(txtMsg.Text) ?? "").Trim();

    // Copy the string-valued properties (URL-decoded and trimmed); other value types are skipped.
    Dictionary<string, string> properties = new Dictionary<string, string>();
    foreach (string key in txtMsg.Properties.Keys)
    {
        string sData = txtMsg.Properties[key] as string;
        if (sData != null)
        {
            properties[key] = HttpUtility.UrlDecode(sData).Trim();
        }
    }

    Message args = new Message(message, properties);

    if (m_immediateMethod)
    {
        // Local copy so that "no handler attached" cannot cause a null dereference.
        MessageEventHandler handler = MessageEvent;
        if (handler != null)
        {
            handler(this, args);
        }
    }
    else
    {
        lock (m_messageLock)
        {
            m_messages.Add(args);
        }

        // signal the other thread that we've received a message (only used in WaitAndPoll() ).
        // No lock is taken here: Set() is thread-safe, and taking m_waitLock would block
        // behind a WaitAndPoll() that holds that lock while it waits.
        m_waitCondition.Set();
    }
}
/// <summary>
/// This function is a callback function received whenever an ActiveMQ message is received from the server. It ignores the message and returns immediately.
/// </summary>
/// <param name="msg">ActiveMQ message received from the server</param>
protected void OnMessageIgnore(Apache.NMS.IMessage msg)
{
    // Intentionally empty: the message is discarded.
}
}
}