/*
    This file is part of VHMsg written by Edward Fast at
    University of Southern California's Institute for Creative Technologies.
    http://www.ict.usc.edu
    Copyright 2008 Edward Fast, University of Southern California

    VHMsg is free software: you can redistribute it and/or
    modify it under the terms of the GNU Lesser General Public License
    as published by the Free Software Foundation, either version 3 of
    the License, or (at your option) any later version.

    VHMsg is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU Lesser General Public License for more details.

    You should have received a copy of the GNU Lesser General Public
    License along with VHMsg.  If not, see <http://www.gnu.org/licenses/>.
*/

using System;
using System.Collections;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Web;

using KeyValuePairConsumer = System.Collections.Generic.KeyValuePair<string, Apache.NMS.IMessageConsumer>;


namespace VHMsg
{
    /// <summary>
    /// This is the class that contains the message when received via the MessageEvent handler.
    /// It is received as an argument from MessageEvent.
    /// </summary>
    public class Message : EventArgs
    {
        /// <summary>
        /// String containing the message
        /// </summary>
        public string s;

        /// <summary>
        /// Properties containing the multikey portion (string-valued message properties only)
        /// </summary>
        public Dictionary<string, string> properties;

        /// <summary>
        /// Constructor
        /// </summary>
        /// <param name="s">The decoded message text</param>
        /// <param name="properties">The string-valued properties that accompanied the message</param>
        public Message(string s, Dictionary<string, string> properties)
        {
            this.s = s;
            this.properties = properties;
        }
    }


    /// <summary>
    /// This is the main class for using this library
    /// </summary>
    public class Client : IDisposable
    {
        private bool m_disposed = false;

        private Apache.NMS.IConnection connection;
        private Apache.NMS.ISession m_session;
        private Apache.NMS.IDestination m_destination;
        private Apache.NMS.IMessageProducer m_Producer;

        private string m_host;
        private string m_port;
        private string m_scope;

        private bool m_immediateMethod = true;
        private bool m_subscribedAll = false;

        // one entry per subscription: key is the subscribed prefix ("*" for all), value is its consumer
        private List<KeyValuePairConsumer> m_consumers;

        // variables needed for polling
        private List<Message> m_messages = new List<Message>();
        private object m_messageLock = new object();
        private ManualResetEvent m_waitCondition = new ManualResetEvent(false);
        private object m_waitLock = new object();


        /// <summary>
        /// Delegate for the MessageEvent handler
        /// </summary>
        public delegate void MessageEventHandler(object sender, Message message);

        /// <summary>
        /// Message Handler.  Clients need to add their callback function to this handler to receive messages.
        /// </summary>
        public event MessageEventHandler MessageEvent;


        /// <summary>
        /// Gets the server being used for the connection
        /// </summary>
        public string Server
        {
            get { return m_host; }
        }

        /// <summary>
        /// Gets the port being used for the connection
        /// </summary>
        public string Port
        {
            get { return m_port; }
        }

        /// <summary>
        /// Gets/Sets the scope being used for the connection
        /// </summary>
        public string Scope
        {
            get { return m_scope; }
            set { m_scope = value; }
        }


        /// <summary>
        /// Constructor.  Initializes server, port and scope from the environment (or their defaults).
        /// </summary>
        public Client()
        {
            m_consumers = new List<KeyValuePairConsumer>();

            SetServerFromEnvironment();
            SetPortFromEnvironment();
            SetScopeFromEnvironment();
        }

        /// <summary>
        /// Destructor, follows the IDisposable pattern to force the release of ActiveMQ resources
        /// </summary>
        ~Client()
        {
            Dispose(false);
        }

        /// <summary>
        /// Dispose method.  Manually releases ActiveMQ resources.  Follows the IDisposable pattern
        /// </summary>
        public void Dispose()
        {
            Dispose(true);
            GC.SuppressFinalize(this);   // tell the GC that the Finalize process no longer needs to be run for this object.
        }

        /// <summary>
        /// Dispose method.  Manually releases ActiveMQ resources.  Follows the IDisposable pattern
        /// </summary>
        /// <param name="disposeManagedResources">true when called from <see cref="Dispose()"/>, false when called from the finalizer</param>
        protected virtual void Dispose(bool disposeManagedResources)
        {
            // process only if managed and unmanaged resources have not been disposed of.
            if (m_disposed)
            {
                return;
            }

            if (disposeManagedResources)
            {
                // Close the connection first so no listener callback can signal
                // the wait handle after it has been closed.
                CloseConnection();
                m_waitCondition.Close();
            }

            m_disposed = true;
        }


        /// <summary>
        /// Sets the scope from the VHMSG_SCOPE environment variable, falling back to "DEFAULT_SCOPE".
        /// </summary>
        protected void SetScopeFromEnvironment()
        {
            m_scope = System.Environment.GetEnvironmentVariable("VHMSG_SCOPE") ?? "DEFAULT_SCOPE";
        }

        /// <summary>
        /// Sets the server from the VHMSG_SERVER environment variable, falling back to "localhost".
        /// </summary>
        protected void SetServerFromEnvironment()
        {
            m_host = System.Environment.GetEnvironmentVariable("VHMSG_SERVER") ?? "localhost";
        }

        /// <summary>
        /// Sets the port from the VHMSG_PORT environment variable, falling back to "61616".
        /// </summary>
        protected void SetPortFromEnvironment()
        {
            m_port = System.Environment.GetEnvironmentVariable("VHMSG_PORT") ?? "61616";
        }


        /// <summary>
        /// Opens a connection to the server.
        /// <para />
        /// By default, it uses 3 system environment variables as parameters
        /// <para />
        /// VHMSG_SERVER - This specifies the server to connect to.  It can either be an ip address or domain name
        /// <para />
        /// VHMSG_PORT - This specifies the port to connect to.
        /// <para />
        /// VHMSG_SCOPE - A unique id used to distinguish messages sent by different modules using the same server.  For example, if two users
        /// are using the same server, they would set different scopes so that they wouldn't receive each other's messages.
        /// </summary>
        public void OpenConnection()
        {
            string user = null;
            string password = null;
            string url = "tcp://" + m_host + ":" + m_port;

            Apache.NMS.ActiveMQ.ConnectionFactory connectionFactory = new Apache.NMS.ActiveMQ.ConnectionFactory(new Uri(url));

            connection = connectionFactory.CreateConnection(user, password);
            connection.Start();

            m_session = connection.CreateSession(Apache.NMS.AcknowledgementMode.AutoAcknowledge);

            // The scope name is used as the topic name; every message of a scope shares one topic.
            m_destination = m_session.GetTopic(m_scope);

            m_Producer = m_session.CreateProducer(m_destination);
            m_Producer.DeliveryMode = Apache.NMS.MsgDeliveryMode.NonPersistent;
        }

        /// <summary>
        /// Opens a connection to the server using a specified host.  See <see cref="OpenConnection()"/>.
        /// </summary>
        /// <param name="server">the host to connect to.  It can either be an ip address or domain name</param>
        public void OpenConnection(string server)
        {
            m_host = server;
            OpenConnection();
        }

        /// <summary>
        /// Opens a connection to the server using a specified host and port.  See <see cref="OpenConnection()"/>.
        /// </summary>
        /// <param name="server">the host to connect to.  It can either be an ip address or domain name</param>
        /// <param name="port">the port to connect to.</param>
        public void OpenConnection(string server, string port)
        {
            m_host = server;
            m_port = port;
            OpenConnection();
        }

        /// <summary>
        /// Closes the connection to the server that was previously opened via <see cref="OpenConnection()"/>.
        /// Safe to call when no connection was ever opened, and safe to call more than once.
        /// </summary>
        public void CloseConnection()
        {
            // NOTE(review): the empty lock is kept from the original; presumably it lets a
            // queue update in progress on the listener thread finish before the connection goes away.
            lock (m_messageLock)
            {
            }

            if (connection != null)
            {
                connection.Dispose();
                connection = null;
            }

            m_consumers.Clear();

            // With no consumers left there is no "*" subscription any more.  Leaving the flag set
            // would route every later subscription to OnMessageIgnore and silently drop its messages.
            m_subscribedAll = false;
        }


        /// <summary>
        /// Sends a message to the server using 2 arguments.  The first argument is the first word in the message, the second argument is the rest of the message.
        /// Nothing is sent when the first word is null or empty after trimming.
        /// </summary>
        /// <param name="op">The first word in the message</param>
        /// <param name="arg">The rest of the message (null is treated as empty)</param>
        public void SendMessage(string op, string arg)
        {
            op = (op ?? "").Trim();
            arg = (arg ?? "").Trim();

            if (string.IsNullOrEmpty(op))
            {
                return;
            }

            // Only the argument portion is url-encoded; the receiver decodes the whole text.
            string arg_encoded = HttpUtility.UrlEncode(arg, Encoding.UTF8);
            string mess = op + " " + arg_encoded;

            Apache.NMS.ActiveMQ.Commands.ActiveMQTextMessage message = (Apache.NMS.ActiveMQ.Commands.ActiveMQTextMessage)m_session.CreateTextMessage(mess);

            // These two properties are what the subscription selectors match against.
            message.SetObjectProperty("ELVISH_SCOPE", m_scope);
            message.SetObjectProperty("MESSAGE_PREFIX", op);

            m_Producer.Send(message);
        }

        /// <summary>
        /// Sends a message to the server.  The argument contains the full message
        /// </summary>
        /// <param name="opandarg">The full message to send; the text before the first space is the first word</param>
        public void SendMessage(string opandarg)
        {
            if (opandarg == null)
            {
                return;
            }

            opandarg = opandarg.Trim();

            string op;
            string arg;

            int index = opandarg.IndexOf(" ");
            if (index < 0)
            {
                op = opandarg;
                arg = "";
            }
            else
            {
                op = opandarg.Substring(0, index);
                arg = opandarg.Substring(index + 1);
            }

            SendMessage(op, arg);
        }

        /// <summary>
        /// Sends a message to the server where each item in the array is a separate word in the message.
        /// </summary>
        /// <param name="op">The first word in the message</param>
        /// <param name="args">An array containing the rest of the message.  Each item in the array is a separate word (null is treated as empty)</param>
        public void SendMessage(string op, string[] args)
        {
            string joined = (args == null) ? "" : string.Join(" ", args);
            SendMessage(op, joined);
        }


        /// <summary>
        /// Switches to polling mode: received messages are queued until <see cref="Poll"/> or <see cref="WaitAndPoll"/> is called.
        /// </summary>
        public void EnablePollingMethod()
        {
            m_immediateMethod = false;
        }

        /// <summary>
        /// Switches to immediate mode: MessageEvent is raised directly from the receive callback.
        /// </summary>
        public void EnableImmediateMethod()
        {
            m_immediateMethod = true;
        }


        /// <summary>
        /// Subscribes to a message.  This notifies the server that we are interested in messages that contain the given argument as the first word in the message.
        /// <para />
        /// For each message that is received, the MessageEvent handler will be called for all listeners.  See <see cref="MessageEvent"/>
        /// <para />
        /// More than one subscription can be made.  Alternatively, an asterisk (*) may be sent as a special-case argument that indicates we're interested in *all* messages.
        /// This should be used very sparingly because it can cause quite a bit of network traffic.
        /// </summary>
        /// <param name="req">Indicates what types of messages we are interested in receiving.  This tells the server to send messages where the first word matches req</param>
        public void SubscribeMessage(string req)
        {
            // check if we've already subscribed to this message
            foreach (KeyValuePairConsumer existing in m_consumers)
            {
                if (existing.Key == req)
                {
                    return;
                }
            }

            string scopeClause = "ELVISH_SCOPE = '" + EscapeSelectorLiteral(m_scope) + "'";

            // special case for asterisk.  If we pass in an asterisk, we are subscribing to all messages
            string messageSelector;
            if (req == "*")
            {
                messageSelector = scopeClause + " AND MESSAGE_PREFIX LIKE '%'";
                m_subscribedAll = true;
            }
            else
            {
                messageSelector = scopeClause + " AND MESSAGE_PREFIX = '" + EscapeSelectorLiteral(req) + "'";
            }

            Apache.NMS.IMessageConsumer consumer = m_session.CreateConsumer(m_destination, messageSelector);
            consumer.Listener += this.OnMessage;
            m_consumers.Add(new KeyValuePairConsumer(req, consumer));

            // if we subscribed to "*", remove all the other listeners to prevent duplicate messages
            if (m_subscribedAll)
            {
                foreach (KeyValuePairConsumer c in m_consumers)
                {
                    if (c.Key != "*")
                    {
                        // remove both handlers first so exactly one ends up attached
                        c.Value.Listener -= this.OnMessage;
                        c.Value.Listener -= this.OnMessageIgnore;
                        c.Value.Listener += this.OnMessageIgnore;
                    }
                }
            }
        }

        /// <summary>
        /// Removes a subscription previously made with <see cref="SubscribeMessage"/>.
        /// </summary>
        /// <param name="req">The same string that was passed to SubscribeMessage</param>
        /// <returns>true if a matching subscription was found and closed, false otherwise</returns>
        public bool UnsubscribeMessage(string req)
        {
            // if we are unsubscribing from "*", re-add all the other listeners so that messages continue to be received (only once)
            if (req == "*")
            {
                foreach (KeyValuePairConsumer c in m_consumers)
                {
                    if (c.Key != "*")
                    {
                        c.Value.Listener -= this.OnMessage;
                        c.Value.Listener -= this.OnMessageIgnore;
                        c.Value.Listener += this.OnMessage;
                    }
                }

                m_subscribedAll = false;
            }

            for (int i = 0; i < m_consumers.Count; i++)
            {
                if (m_consumers[i].Key == req)
                {
                    m_consumers[i].Value.Close();
                    m_consumers.RemoveAt(i);
                    return true;
                }
            }

            return false;
        }


        /// <summary>
        /// In polling mode, raises MessageEvent for every queued message, oldest first.
        /// Does nothing in immediate mode.
        /// </summary>
        /// <returns>The number of messages that were taken off the queue</returns>
        public int Poll()
        {
            int numMessages = 0;

            if (m_immediateMethod)
            {
                return numMessages;
            }

            for (;;)
            {
                Message args;

                // Hold the lock only while touching the queue, not while running handlers.
                lock (m_messageLock)
                {
                    if (m_messages.Count == 0)
                    {
                        break;
                    }

                    args = m_messages[0];
                    m_messages.RemoveAt(0);
                }

                RaiseMessageEvent(args);
                numMessages++;
            }

            return numMessages;
        }

        /// <summary>
        /// Processes queued messages if there are any; otherwise waits up to the given time for a
        /// message to arrive and then processes the queue.
        /// </summary>
        /// <param name="waitTimeSeconds">Maximum time to wait, in seconds</param>
        public void WaitAndPoll(double waitTimeSeconds)
        {
            // m_waitLock only serializes concurrent callers of this method.  The receive callback
            // does not take it, so it can signal the event while we are blocked in WaitOne.
            lock (m_waitLock)
            {
                // Clear any stale signal *before* looking at the queue: a message that arrives
                // after this point is either found by Poll() or wakes WaitOne(), never lost.
                m_waitCondition.Reset();

                // if there are already messages in the queue, process them and return (don't wait)
                if (Poll() > 0)
                {
                    return;
                }

                int waitTimeMilliseconds = (int)(waitTimeSeconds * 1000);
                m_waitCondition.WaitOne(waitTimeMilliseconds);

                Poll();
            }
        }


        /// <summary>
        /// This function is a callback function received whenever an ActiveMQ message is received from the server.  It processes the message and passes it on to the client via
        /// the MessageEvent handler (immediate mode) or queues it for a later poll (polling mode).
        /// </summary>
        /// <param name="msg">ActiveMQ message received from the server</param>
        protected void OnMessage(Apache.NMS.IMessage msg)
        {
            Apache.NMS.ActiveMQ.Commands.ActiveMQTextMessage txtMsg = (Apache.NMS.ActiveMQ.Commands.ActiveMQTextMessage)msg;

            string message = HttpUtility.UrlDecode(txtMsg.Text).Trim();

            // Copy over only the string-valued properties, decoded the same way as the text.
            Dictionary<string, string> properties = new Dictionary<string, string>();
            foreach (string key in txtMsg.Properties.Keys)
            {
                string sData = txtMsg.Properties[key] as string;
                if (sData != null)
                {
                    properties[key] = HttpUtility.UrlDecode(sData).Trim();
                }
            }

            Message args = new Message(message, properties);

            if (m_immediateMethod)
            {
                RaiseMessageEvent(args);
                return;
            }

            lock (m_messageLock)
            {
                m_messages.Add(args);
            }

            // signal the other thread that we've received a message (only used in WaitAndPoll() ).
            // Deliberately not under m_waitLock: WaitAndPoll holds that lock while it waits.
            m_waitCondition.Set();
        }

        /// <summary>
        /// This function is a callback function received whenever an ActiveMQ message is received from the server.  It ignores the message and returns immediately.
        /// </summary>
        /// <param name="msg">ActiveMQ message received from the server</param>
        protected void OnMessageIgnore(Apache.NMS.IMessage msg)
        {
            return;
        }


        /// <summary>
        /// Raises MessageEvent if at least one handler is attached.
        /// </summary>
        private void RaiseMessageEvent(Message args)
        {
            // Copy to a local so a handler removed on another thread cannot null the event mid-call.
            MessageEventHandler handler = MessageEvent;
            if (handler != null)
            {
                handler(this, args);
            }
        }

        /// <summary>
        /// Escapes a value for use inside a single-quoted selector string literal by doubling quotes.
        /// </summary>
        private static string EscapeSelectorLiteral(string value)
        {
            return (value == null) ? value : value.Replace("'", "''");
        }
    }
}