/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

using System;
using System.Collections;
using System.Collections.Generic;
using Apache.NMS.ActiveMQ.Commands;
using Apache.NMS.ActiveMQ.Transport;
using Apache.NMS.ActiveMQ.Util;

namespace Apache.NMS.ActiveMQ.State
{
    /// <summary>
    /// Tracks the state of a connection so a newly established transport can be
    /// re-initialized to the state that was tracked.
    /// </summary>
    public class ConnectionStateTracker : CommandVisitorAdapter
    {
        /// <summary>Initial capacity shared by <see cref="maxCacheSize"/> and the message cache.</summary>
        private const int DefaultMaxCacheSize = 256;

        /// <summary>
        /// Shared response returned for commands that are state tracked but need
        /// no completion callback (it wraps a null handler).
        /// </summary>
        private static readonly Tracked TRACKED_RESPONSE_MARKER = new Tracked(null);

        /// <summary>Tracked connection states, keyed by connection id.</summary>
        protected readonly Dictionary<ConnectionId, ConnectionState> connectionStates =
            new Dictionary<ConnectionId, ConnectionState>();

        private bool isTrackTransactions;
        private bool isTrackTransactionProducers = true;
        private bool isRestoreSessions = true;
        private bool isRestoreConsumers = true;
        private bool isRestoreProducers = true;
        private bool isRestoreTransaction = true;
        private bool isTrackMessages = true;
        private int maxCacheSize = DefaultMaxCacheSize;

        // Keys are either a MessageId (ProcessMessage) or a String built from the
        // destination and consumer id (ProcessMessagePull), hence the object key type.
        private readonly LRUCache<object, Command> messageCache =
            new LRUCache<object, Command>(DefaultMaxCacheSize);

        /// <summary>
        /// Response handler that drops the tracked transaction state from its
        /// connection once the response for the completing command arrives.
        /// </summary>
        private class RemoveTransactionAction : ResponseHandler
        {
            private readonly TransactionInfo info;
            private readonly ConnectionStateTracker cst;

            public RemoveTransactionAction(TransactionInfo info, ConnectionStateTracker aCst)
            {
                this.info = info;
                this.cst = aCst;
            }

            /// <summary>
            /// Removes the transaction state for the stored transaction id, if the
            /// owning connection is still tracked.
            /// </summary>
            public override void OnResponse()
            {
                ConnectionState cs = cst.FindConnectionState(info.ConnectionId);
                if(cs != null)
                {
                    cs.RemoveTransactionState(info.TransactionId);
                }
            }
        }

        /// <summary>
        /// Dispatches the command to the matching Process* handler of this visitor.
        /// </summary>
        /// <param name="command">The command to track.</param>
        /// <returns>null if the command is not state tracked.</returns>
        /// <exception cref="IOException">
        /// Rethrown as-is when raised by the visit; any other exception is wrapped
        /// in a new IOException that keeps the original as its inner exception.
        /// </exception>
        public Tracked Track(Command command)
        {
            try
            {
                return (Tracked) command.Visit(this);
            }
            catch(IOException)
            {
                throw;
            }
            catch(Exception e)
            {
                // Keep the original exception attached so the root cause and its
                // stack trace are not lost.
                throw new IOException(e.Message, e);
            }
        }

        /// <summary>
        /// Currently a no-op.
        /// </summary>
        /// <param name="command">Ignored.</param>
        public void TrackBack(Command command)
        {
        }

        /// <summary>
        /// Replays the tracked state over the given transport: every tracked
        /// connection (flagged as a failover reconnect), its temp destinations,
        /// optionally its sessions and transactions, and finally every cached
        /// command.
        /// </summary>
        /// <param name="transport">The transport to send the tracked commands on.</param>
        public void DoRestore(ITransport transport)
        {
            // Restore the connections.
            foreach(ConnectionState connectionState in connectionStates.Values)
            {
                ConnectionInfo info = connectionState.Info;
                info.FailoverReconnect = true;

                if(Tracer.IsDebugEnabled)
                {
                    Tracer.Debug("conn: " + connectionState.Info.ConnectionId);
                }

                transport.Oneway(info);

                DoRestoreTempDestinations(transport, connectionState);

                if(RestoreSessions)
                {
                    DoRestoreSessions(transport, connectionState);
                }

                if(RestoreTransaction)
                {
                    DoRestoreTransactions(transport, connectionState);
                }
            }

            // Now flush messages
            foreach(Command command in messageCache.Values)
            {
                if(Tracer.IsDebugEnabled)
                {
                    Tracer.Debug("Replaying command: " + command);
                }

                transport.Oneway(command);
            }
        }

        /// <summary>
        /// Replays the tracked transactions of a connection. Transactions whose
        /// last tracked command is a one-phase commit are not replayed; instead an
        /// exception response is delivered locally for the outstanding commit.
        /// </summary>
        /// <param name="transport">The transport to replay on.</param>
        /// <param name="connectionState">The connection whose transactions are replayed.</param>
        private void DoRestoreTransactions(ITransport transport, ConnectionState connectionState)
        {
            List<TransactionInfo> toRollback = new List<TransactionInfo>();

            foreach(TransactionState transactionState in connectionState.TransactionStates)
            {
                // rollback any completed transactions - no way to know if commit got there
                // or if reply went missing
                if(transactionState.Commands.Count != 0)
                {
                    Command lastCommand = transactionState.Commands[transactionState.Commands.Count - 1];
                    if(lastCommand.IsTransactionInfo)
                    {
                        TransactionInfo transactionInfo = (TransactionInfo) lastCommand;
                        if(transactionInfo.Type == TransactionInfo.COMMIT_ONE_PHASE)
                        {
                            if(Tracer.IsDebugEnabled)
                            {
                                Tracer.Debug("rolling back potentially completed tx: " + transactionState.Id);
                            }

                            toRollback.Add(transactionInfo);
                            continue;
                        }
                    }
                }

                // replay the add and remove of short lived producers that may have been
                // involved in the transaction
                foreach(ProducerState producerState in transactionState.ProducerStates)
                {
                    if(Tracer.IsDebugEnabled)
                    {
                        Tracer.Debug("tx replay producer :" + producerState.Info);
                    }

                    transport.Oneway(producerState.Info);
                }

                foreach(Command command in transactionState.Commands)
                {
                    if(Tracer.IsDebugEnabled)
                    {
                        Tracer.Debug("tx replay: " + command);
                    }

                    transport.Oneway(command);
                }

                foreach(ProducerState producerState in transactionState.ProducerStates)
                {
                    if(Tracer.IsDebugEnabled)
                    {
                        Tracer.Debug("tx remove replayed producer :" + producerState.Info);
                    }

                    RemoveInfo producerRemove = new RemoveInfo();
                    producerRemove.ObjectId = producerState.Info.ProducerId;
                    transport.Oneway(producerRemove);
                }
            }

            foreach(TransactionInfo command in toRollback)
            {
                // respond to the outstanding commit
                ExceptionResponse response = new ExceptionResponse();
                response.Exception = new BrokerError();
                response.Exception.Message =
                    "Transaction completion in doubt due to failover. Forcing rollback of " + command.TransactionId;
                response.Exception.ExceptionClass = typeof(TransactionRolledBackException).FullName;
                response.CorrelationId = command.CommandId;
                transport.Command(transport, response);
            }
        }

        /// <summary>
        /// Re-sends every tracked session of the connection and, depending on
        /// <see cref="RestoreProducers"/> and <see cref="RestoreConsumers"/>, the
        /// producers and consumers of each session.
        /// </summary>
        /// <param name="transport">The transport to replay on.</param>
        /// <param name="connectionState">The connection whose sessions are restored.</param>
        protected void DoRestoreSessions(ITransport transport, ConnectionState connectionState)
        {
            // Restore the connection's sessions
            foreach(SessionState sessionState in connectionState.SessionStates)
            {
                if(Tracer.IsDebugEnabled)
                {
                    Tracer.Debug("Restoring session: " + sessionState.Info.SessionId);
                }

                transport.Oneway(sessionState.Info);

                if(RestoreProducers)
                {
                    DoRestoreProducers(transport, sessionState);
                }

                if(RestoreConsumers)
                {
                    DoRestoreConsumers(transport, sessionState);
                }
            }
        }

        /// <summary>
        /// Re-sends the consumers of a session. While interrupt processing for the
        /// owning connection is not complete, consumers with a positive prefetch
        /// are sent as clones with prefetch 0 (pull mode) when the wire format
        /// version is above 5, and the original info is remembered so the prefetch
        /// can be restored later.
        /// </summary>
        /// <param name="transport">The transport to replay on.</param>
        /// <param name="sessionState">The session whose consumers are restored.</param>
        protected void DoRestoreConsumers(ITransport transport, SessionState sessionState)
        {
            // Restore the session's consumers but possibly in pull only (prefetch 0 state) till
            // recovery completes.
            ConnectionState connectionState = FindConnectionState(sessionState.Info.SessionId.ParentId);

            // Without a tracked connection there is nowhere to remember the original
            // prefetch, so pull mode is only used when the connection state exists.
            bool recoveryPending =
                connectionState != null && !connectionState.ConnectionInterruptProcessingComplete;

            foreach(ConsumerState consumerState in sessionState.ConsumerStates)
            {
                ConsumerInfo infoToSend = consumerState.Info;

                if(recoveryPending && infoToSend.PrefetchSize > 0 && transport.WireFormat.Version > 5)
                {
                    infoToSend = consumerState.Info.Clone() as ConsumerInfo;

                    lock(((ICollection) connectionState.RecoveringPullConsumers).SyncRoot)
                    {
                        if(!connectionState.RecoveringPullConsumers.ContainsKey(infoToSend.ConsumerId))
                        {
                            connectionState.RecoveringPullConsumers.Add(infoToSend.ConsumerId, consumerState.Info);
                        }
                    }

                    infoToSend.PrefetchSize = 0;

                    if(Tracer.IsDebugEnabled)
                    {
                        Tracer.Debug("restore consumer: " + infoToSend.ConsumerId +
                                     " in pull mode pending recovery, overriding prefetch: " +
                                     consumerState.Info.PrefetchSize);
                    }
                }

                if(Tracer.IsDebugEnabled)
                {
                    Tracer.Debug("restore consumer: " + infoToSend.ConsumerId);
                }

                transport.Oneway(infoToSend);
            }
        }

        /// <summary>
        /// Re-sends the producers of a session.
        /// </summary>
        /// <param name="transport">The transport to replay on.</param>
        /// <param name="sessionState">The session whose producers are restored.</param>
        protected void DoRestoreProducers(ITransport transport, SessionState sessionState)
        {
            // Restore the session's producers
            foreach(ProducerState producerState in sessionState.ProducerStates)
            {
                if(Tracer.IsDebugEnabled)
                {
                    Tracer.Debug("Restoring producer: " + producerState.Info.ProducerId);
                }

                transport.Oneway(producerState.Info);
            }
        }

        /// <summary>
        /// Re-sends the temp destinations of a connection.
        /// </summary>
        /// <param name="transport">The transport to replay on.</param>
        /// <param name="connectionState">The connection whose temp destinations are restored.</param>
        protected void DoRestoreTempDestinations(ITransport transport, ConnectionState connectionState)
        {
            // Restore the connection's temp destinations.
            foreach(DestinationInfo destinationInfo in connectionState.TempDestinations)
            {
                transport.Oneway(destinationInfo);
            }
        }

        /// <summary>
        /// Records a temporary destination on its tracked connection.
        /// </summary>
        public override Response ProcessAddDestination(DestinationInfo info)
        {
            if(info != null && info.Destination != null && info.Destination.IsTemporary)
            {
                ConnectionState cs = FindConnectionState(info.ConnectionId);
                if(cs != null)
                {
                    cs.AddTempDestination(info);
                }
            }

            return TRACKED_RESPONSE_MARKER;
        }

        /// <summary>
        /// Removes a temporary destination from its tracked connection.
        /// </summary>
        public override Response ProcessRemoveDestination(DestinationInfo info)
        {
            if(info != null && info.Destination != null && info.Destination.IsTemporary)
            {
                ConnectionState cs = FindConnectionState(info.ConnectionId);
                if(cs != null)
                {
                    cs.RemoveTempDestination(info.Destination);
                }
            }

            return TRACKED_RESPONSE_MARKER;
        }

        /// <summary>
        /// Records a producer on its tracked session.
        /// </summary>
        public override Response ProcessAddProducer(ProducerInfo info)
        {
            if(info != null && info.ProducerId != null)
            {
                SessionState ss = FindSessionState(info.ProducerId.ParentId);
                if(ss != null)
                {
                    ss.AddProducer(info);
                }
            }

            return TRACKED_RESPONSE_MARKER;
        }

        /// <summary>
        /// Removes a producer from its tracked session.
        /// </summary>
        public override Response ProcessRemoveProducer(ProducerId id)
        {
            if(id != null)
            {
                SessionState ss = FindSessionState(id.ParentId);
                if(ss != null)
                {
                    ss.RemoveProducer(id);
                }
            }

            return TRACKED_RESPONSE_MARKER;
        }

        /// <summary>
        /// Records a consumer on its tracked session.
        /// </summary>
        public override Response ProcessAddConsumer(ConsumerInfo info)
        {
            if(info != null && info.ConsumerId != null)
            {
                SessionState ss = FindSessionState(info.ConsumerId.ParentId);
                if(ss != null)
                {
                    ss.AddConsumer(info);
                }
            }

            return TRACKED_RESPONSE_MARKER;
        }

        /// <summary>
        /// Removes a consumer from its tracked session and from the connection's
        /// set of consumers awaiting prefetch restoration.
        /// </summary>
        public override Response ProcessRemoveConsumer(ConsumerId id)
        {
            if(id != null && id.ParentId != null)
            {
                SessionId sessionId = id.ParentId;
                ConnectionState cs = FindConnectionState(sessionId.ParentId);
                if(cs != null)
                {
                    SessionState ss = cs[sessionId];
                    if(ss != null)
                    {
                        ss.RemoveConsumer(id);
                    }

                    // Same lock the other accesses to this collection take.
                    lock(((ICollection) cs.RecoveringPullConsumers).SyncRoot)
                    {
                        cs.RecoveringPullConsumers.Remove(id);
                    }
                }
            }

            return TRACKED_RESPONSE_MARKER;
        }

        /// <summary>
        /// Records a session on its tracked connection.
        /// </summary>
        public override Response ProcessAddSession(SessionInfo info)
        {
            if(info != null && info.SessionId != null)
            {
                ConnectionState cs = FindConnectionState(info.SessionId.ParentId);
                if(cs != null)
                {
                    cs.AddSession(info);
                }
            }

            return TRACKED_RESPONSE_MARKER;
        }

        /// <summary>
        /// Removes a session from its tracked connection.
        /// </summary>
        public override Response ProcessRemoveSession(SessionId id)
        {
            if(id != null)
            {
                ConnectionState cs = FindConnectionState(id.ParentId);
                if(cs != null)
                {
                    cs.RemoveSession(id);
                }
            }

            return TRACKED_RESPONSE_MARKER;
        }

        /// <summary>
        /// Starts tracking a connection, replacing any state already tracked under
        /// the same connection id.
        /// </summary>
        public override Response ProcessAddConnection(ConnectionInfo info)
        {
            if(info != null && info.ConnectionId != null)
            {
                // The indexer adds or overwrites in a single lookup.
                connectionStates[info.ConnectionId] = new ConnectionState(info);
            }

            return TRACKED_RESPONSE_MARKER;
        }

        /// <summary>
        /// Stops tracking a connection.
        /// </summary>
        public override Response ProcessRemoveConnection(ConnectionId id)
        {
            if(id != null)
            {
                connectionStates.Remove(id);
            }

            return TRACKED_RESPONSE_MARKER;
        }

        /// <summary>
        /// Tracks a sent message. A transacted message (when
        /// <see cref="TrackTransactions"/> is on) is appended to its transaction
        /// state; otherwise, when <see cref="TrackMessages"/> is on, a clone is put
        /// into the message cache.
        /// </summary>
        /// <returns>
        /// The tracked marker for a transacted message while transactions are
        /// tracked; null in every other case.
        /// </returns>
        public override Response ProcessMessage(Message send)
        {
            if(send == null)
            {
                return null;
            }

            if(TrackTransactions && send.TransactionId != null)
            {
                ProducerId producerId = send.ProducerId;
                SessionId sessionId = producerId != null ? producerId.ParentId : null;
                ConnectionState cs = FindConnectionState(sessionId != null ? sessionId.ParentId : null);

                if(cs != null)
                {
                    TransactionState transactionState = cs[send.TransactionId];
                    if(transactionState != null)
                    {
                        transactionState.AddCommand(send);

                        if(isTrackTransactionProducers)
                        {
                            // The session or producer may no longer be tracked.
                            SessionState ss = cs[sessionId];
                            ProducerState producerState = ss != null ? ss[producerId] : null;
                            if(producerState != null)
                            {
                                producerState.TransactionState = transactionState;
                            }
                        }
                    }
                }

                return TRACKED_RESPONSE_MARKER;
            }

            if(TrackMessages)
            {
                messageCache.Add(send.MessageId, (Message) send.Clone());
            }

            return null;
        }

        /// <summary>
        /// Appends a transacted acknowledgement to its transaction state when
        /// <see cref="TrackTransactions"/> is on.
        /// </summary>
        /// <returns>The tracked marker for a transacted ack while transactions are tracked; otherwise null.</returns>
        public override Response ProcessMessageAck(MessageAck ack)
        {
            if(TrackTransactions && ack != null && ack.TransactionId != null)
            {
                ConsumerId consumerId = ack.ConsumerId;
                SessionId sessionId = consumerId != null ? consumerId.ParentId : null;
                ConnectionState cs = FindConnectionState(sessionId != null ? sessionId.ParentId : null);

                if(cs != null)
                {
                    TransactionState transactionState = cs[ack.TransactionId];
                    if(transactionState != null)
                    {
                        transactionState.AddCommand(ack);
                    }
                }

                return TRACKED_RESPONSE_MARKER;
            }

            return null;
        }

        /// <summary>
        /// Creates the transaction state on the tracked connection and records the
        /// begin command in it.
        /// </summary>
        /// <returns>The tracked marker when transactions are tracked and the info carries a transaction id; otherwise null.</returns>
        public override Response ProcessBeginTransaction(TransactionInfo info)
        {
            if(TrackTransactions && info != null && info.TransactionId != null)
            {
                ConnectionState cs = FindConnectionState(info.ConnectionId);
                if(cs != null)
                {
                    cs.AddTransactionState(info.TransactionId);
                    TransactionState state = cs[info.TransactionId];
                    if(state != null)
                    {
                        state.AddCommand(info);
                    }
                }

                return TRACKED_RESPONSE_MARKER;
            }

            return null;
        }

        /// <summary>
        /// Records the prepare command in its transaction state.
        /// </summary>
        /// <returns>The tracked marker when transactions are tracked; otherwise null.</returns>
        public override Response ProcessPrepareTransaction(TransactionInfo info)
        {
            if(TrackTransactions && info != null)
            {
                RecordTransactionCommand(info);
                return TRACKED_RESPONSE_MARKER;
            }

            return null;
        }

        /// <summary>
        /// Records the one-phase commit and arranges for the transaction state to
        /// be removed when the response arrives.
        /// </summary>
        /// <returns>A tracked response with a removal handler, or null when the transaction is not tracked.</returns>
        public override Response ProcessCommitTransactionOnePhase(TransactionInfo info)
        {
            return TrackTransactionCompletion(info);
        }

        /// <summary>
        /// Records the two-phase commit and arranges for the transaction state to
        /// be removed when the response arrives.
        /// </summary>
        /// <returns>A tracked response with a removal handler, or null when the transaction is not tracked.</returns>
        public override Response ProcessCommitTransactionTwoPhase(TransactionInfo info)
        {
            return TrackTransactionCompletion(info);
        }

        /// <summary>
        /// Records the rollback and arranges for the transaction state to be
        /// removed when the response arrives.
        /// </summary>
        /// <returns>A tracked response with a removal handler, or null when the transaction is not tracked.</returns>
        public override Response ProcessRollbackTransaction(TransactionInfo info)
        {
            return TrackTransactionCompletion(info);
        }

        /// <summary>
        /// Records the end command in its transaction state.
        /// </summary>
        /// <returns>The tracked marker when transactions are tracked; otherwise null.</returns>
        public override Response ProcessEndTransaction(TransactionInfo info)
        {
            if(TrackTransactions && info != null)
            {
                RecordTransactionCommand(info);
                return TRACKED_RESPONSE_MARKER;
            }

            return null;
        }

        /// <summary>
        /// Caches the pull request so it is replayed by <see cref="DoRestore"/>.
        /// </summary>
        /// <returns>Always null.</returns>
        public override Response ProcessMessagePull(MessagePull pull)
        {
            if(pull != null)
            {
                // leave a single instance in the cache
                String id = pull.Destination + "::" + pull.ConsumerId;
                messageCache[id] = pull;
            }

            return null;
        }

        /// <summary>Whether <see cref="DoRestoreSessions"/> also restores each session's consumers.</summary>
        public bool RestoreConsumers
        {
            get { return isRestoreConsumers; }
            set { isRestoreConsumers = value; }
        }

        /// <summary>Whether <see cref="DoRestoreSessions"/> also restores each session's producers.</summary>
        public bool RestoreProducers
        {
            get { return isRestoreProducers; }
            set { isRestoreProducers = value; }
        }

        /// <summary>Whether <see cref="DoRestore"/> restores the sessions of each connection.</summary>
        public bool RestoreSessions
        {
            get { return isRestoreSessions; }
            set { isRestoreSessions = value; }
        }

        /// <summary>Whether transaction related commands are recorded in transaction states.</summary>
        public bool TrackTransactions
        {
            get { return isTrackTransactions; }
            set { isTrackTransactions = value; }
        }

        /// <summary>Whether a transacted send links its producer state to the transaction state.</summary>
        public bool TrackTransactionProducers
        {
            get { return isTrackTransactionProducers; }
            set { isTrackTransactionProducers = value; }
        }

        /// <summary>Whether <see cref="DoRestore"/> replays the transactions of each connection.</summary>
        public bool RestoreTransaction
        {
            get { return isRestoreTransaction; }
            set { isRestoreTransaction = value; }
        }

        /// <summary>Whether non-transacted sends are cloned into the message cache.</summary>
        public bool TrackMessages
        {
            get { return isTrackMessages; }
            set { isTrackMessages = value; }
        }

        /// <summary>Maximum size of the message cache; setting it is forwarded to the cache.</summary>
        public int MaxCacheSize
        {
            get { return maxCacheSize; }
            set
            {
                this.maxCacheSize = value;
                this.messageCache.MaxCacheSize = maxCacheSize;
            }
        }

        /// <summary>
        /// Marks interrupt processing as complete for the connection and sends a
        /// ConsumerControl with the original prefetch for every consumer that was
        /// restored in pull mode. Failures to send a control are logged at debug
        /// level and do not stop the remaining controls from being sent.
        /// </summary>
        /// <param name="transport">The transport to send the consumer controls on.</param>
        /// <param name="connectionId">The connection whose recovery completed.</param>
        public void ConnectionInterruptProcessingComplete(ITransport transport, ConnectionId connectionId)
        {
            ConnectionState connectionState = FindConnectionState(connectionId);
            if(connectionState == null)
            {
                return;
            }

            connectionState.ConnectionInterruptProcessingComplete = true;

            // Snapshot and clear under the lock; send outside of it.
            Dictionary<ConsumerId, ConsumerInfo> consumersToRestorePrefetchOn;
            lock(((ICollection) connectionState.RecoveringPullConsumers).SyncRoot)
            {
                consumersToRestorePrefetchOn =
                    new Dictionary<ConsumerId, ConsumerInfo>(connectionState.RecoveringPullConsumers);
                connectionState.RecoveringPullConsumers.Clear();
            }

            foreach(KeyValuePair<ConsumerId, ConsumerInfo> entry in consumersToRestorePrefetchOn)
            {
                ConsumerControl control = new ConsumerControl();
                control.ConsumerId = entry.Key;
                control.Prefetch = entry.Value.PrefetchSize;
                control.Destination = entry.Value.Destination;

                try
                {
                    if(Tracer.IsDebugEnabled)
                    {
                        Tracer.Debug("restored recovering consumer: " + control.ConsumerId +
                                     " with: " + control.Prefetch);
                    }

                    transport.Oneway(control);
                }
                catch(Exception ex)
                {
                    // Best effort: one failed control must not prevent the others.
                    if(Tracer.IsDebugEnabled)
                    {
                        Tracer.Debug("Failed to submit control for consumer: " + control.ConsumerId +
                                     " with: " + control.Prefetch + "Error: " + ex.Message);
                    }
                }
            }
        }

        /// <summary>
        /// Flags the connection as having interrupt processing outstanding.
        /// </summary>
        /// <param name="id">The connection whose transport was interrupted.</param>
        public void TransportInterrupted(ConnectionId id)
        {
            ConnectionState connection = FindConnectionState(id);
            if(connection != null)
            {
                connection.ConnectionInterruptProcessingComplete = false;
            }
        }

        /// <summary>
        /// Null-safe lookup of a tracked connection; returns null when the id is
        /// null or not tracked.
        /// </summary>
        private ConnectionState FindConnectionState(ConnectionId connectionId)
        {
            ConnectionState cs;
            // Dictionary lookups throw on a null key, so guard before TryGetValue.
            if(connectionId != null && connectionStates.TryGetValue(connectionId, out cs))
            {
                return cs;
            }

            return null;
        }

        /// <summary>
        /// Null-safe lookup of a tracked session through its parent connection.
        /// </summary>
        private SessionState FindSessionState(SessionId sessionId)
        {
            if(sessionId == null)
            {
                return null;
            }

            ConnectionState cs = FindConnectionState(sessionId.ParentId);
            return cs != null ? cs[sessionId] : null;
        }

        /// <summary>
        /// Appends the command to its transaction state; returns whether a tracked
        /// transaction state was found.
        /// </summary>
        private bool RecordTransactionCommand(TransactionInfo info)
        {
            ConnectionState cs = FindConnectionState(info.ConnectionId);
            TransactionState transactionState = cs != null ? cs[info.TransactionId] : null;
            if(transactionState == null)
            {
                return false;
            }

            transactionState.AddCommand(info);
            return true;
        }

        /// <summary>
        /// Shared handling for commit and rollback: records the command and returns
        /// a tracked response that removes the transaction state on completion.
        /// </summary>
        private Response TrackTransactionCompletion(TransactionInfo info)
        {
            if(TrackTransactions && info != null && RecordTransactionCommand(info))
            {
                return new Tracked(new RemoveTransactionAction(info, this));
            }

            return null;
        }
    }
}