Micro broker OSGI Commands (Part 2)
In my previous post, I mentioned that I had created an OSGI utility for "servicing" the micro broker. The utility simply calls various micro broker admin API routines. To get you started writing your own micro broker applications, posted below is the OSGI command's controller class for micro broker as well as several links on micro broker related resources:
import java.io.File;
import java.util.Calendar;
import java.util.Date;
import java.util.logging.Level;
import java.util.logging.Logger;
import com.ibm.micro.admin.AdminException;
import com.ibm.micro.admin.BrokerDefinition;
import com.ibm.micro.admin.BrokerFactory;
import com.ibm.micro.admin.Client;
import com.ibm.micro.admin.LocalBroker;
import com.ibm.micro.admin.ResourceNotFoundException;
import com.ibm.micro.admin.Subscription;
import com.ibm.micro.admin.Trace;
/**
* Provides utility methods for servicing a local micro broker.
*
*
*/
public class BrokerServiceUtil {
private static Logger logger = Logger.getLogger(BrokerServiceUtil.class
.getName());
/**
* Creates a snapshot of the local micro broker.
*
* @return the trace file name
*/
public static String createSnapshot() {
try {
LocalBroker broker = isBrokerRunning();
if (broker != null) {
String snap = broker.getTrace().getStateSnapshotToFile(
broker.getDefinition().getDataDirectory());
logger.log(Level.FINE, "Micro broker snapshot created " + snap);
return snap;
}
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
/**
* Creates an FFDC for the local micro broker.
*/
public static void createFFDC() {
try {
LocalBroker broker = isBrokerRunning();
if (broker != null) {
broker.getTrace().performFFDC();
}
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* Creates a trace file for the local micro broker with TRACE_LEVEL_MAX.
*
* @return the trace file name
*/
public static String createTrace() {
return createTrace(Trace.TRACE_LEVEL_MAX);
}
/**
* Creates a trace file for the local micro broker.
*
* @param level
* the trace level
* @return the trace file name
*/
public static String createTrace(byte level) {
try {
LocalBroker broker = isBrokerRunning();
if (broker != null) {
broker.getTrace().setTraceLevel(level);
String trace = broker.getTrace().getTraceToFile(
broker.getDefinition().getDataDirectory());
logger.log(Level.FINE, "Micro broker trace created " + trace);
return trace;
}
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
/**
* Obtains a reference to the first local micro broker.
*
* @return the local micro broker
*/
public static LocalBroker getLocalBroker() {
try {
String[] brokers = BrokerFactory.INSTANCE.list();
logger.log(Level.FINE, "Found " + brokers.length
+ " micro brokers running");
if (brokers.length > 0 && brokers[0] != null) {
return BrokerFactory.INSTANCE.getByName(brokers[0]);
} else {
logger.log(Level.SEVERE, "Could not find a LocalBroker");
}
} catch (AdminException e) {
logger.log(Level.SEVERE, "Could not find a LocalBroker", e);
}
return null;
}
/**
* Validates if a local micro broker is found and if so whether it is
* running.
*
* @return the local micro broker if running; null if no broker is found or
* not running
*/
private static LocalBroker isBrokerRunning() {
LocalBroker broker = getLocalBroker();
if (broker != null) {
try {
if (!broker.isRunning()) {
logger.log(Level.SEVERE, broker.getName() + " not started");
return null;
}
} catch (Exception e) {
logger.log(Level.SEVERE, "Could not check LocalBroker ", e);
return null;
}
} else {
logger.log(Level.SEVERE, "No micro broker found");
return null;
}
return broker;
}
/**
* Resets the micro broker by stopping, renaming the broker's data
* directory, and restarting.
*/
public static void resetBroker() {
LocalBroker broker = getLocalBroker();
if (broker != null) {
try {
if (broker.isRunning()) {
broker.stop(false);
String data = broker.getDefinition().getDataDirectory();
File f = new File(data);
if (f.exists() && f.canWrite()) {
File temp = new File(data + "_"
+ Calendar.getInstance().getTimeInMillis());
f.renameTo(temp);
logger.log(Level.SEVERE, "Reset micro broker; backup "
+ temp.getName() + " created");
}
}
broker.start();
} catch (AdminException e) {
logger.log(Level.SEVERE, "Error resetting micro broker");
}
}
}
/**
* Sets the micro broker trace buffer to the number of lines. The default is
* 500 on micro broker start.
*
* @param lines
*/
public static void setTraceBuffer(int lines) {
LocalBroker broker = getLocalBroker();
if (broker != null) {
try {
broker.getDefinition().setTraceBufferSize(lines);
logger.log(Level.FINE, "micro broker trace buffer set to "
+ lines);
} catch (Exception e) {
logger.log(Level.SEVERE,
"Error setting micro broker trace buffer");
}
}
}
/**
* Retrieves a list of clients known to the micro broker.
*
* @return list of client identifiers (names) or empty if no known clients
*/
public static String[] getClients() {
LocalBroker broker = getLocalBroker();
if (broker != null) {
try {
Client[] clients = broker.getCommunications().getClients();
String[] list = new String[clients.length];
for (int i = 0; i < clients.length; i++) {
list[i] = clients[i].getName();
}
return list;
} catch (Exception e) {
logger.log(Level.SEVERE, "Error getting clients");
}
}
return new String[0];
}
/**
* Retrieves clients statistics in the order: Name, Connected,
* ConnectionTime, LastActivityTime, NumberOfInFlightMessages,
* NumberOfQueuedMessages, NumberOfSubscriptions, Protocol, and
* ProtocolVersion
*
* @param clientId
* the client ID to retrieve
* @return an array of stats or empty if the client was not found
*/
public static String[] getClientProperties(String clientId) {
LocalBroker broker = getLocalBroker();
if (broker != null) {
try {
Client client = broker.getCommunications().getClient(clientId);
if (client != null) {
return new String[] {
client.getName(),
String.valueOf(client.isConnected()),
client.getConnectionTime(),
client.getLastActivityTime(),
String
.valueOf(client
.getNumberOfInFlightMessages()),
String.valueOf(client.getNumberOfQueuedMessages()),
String.valueOf(client.getNumberOfSubscriptions()),
client.getProtocol(), client.getProtocolVersion() };
}
} catch (Exception e) {
logger.log(Level.SEVERE, "Error getting client " + clientId);
}
}
return new String[0];
}
/**
* Disconnects a client from the broker.
*
* @param clientId
* the client to disconnect
* @param flush
* flush any queued or in-doubt messages held in the broker for
* this client
* @return true if client disconnected
*/
public static boolean disconnect(String clientId, boolean flush) {
LocalBroker broker = getLocalBroker();
if (broker != null) {
try {
Client client = broker.getCommunications().getClient(clientId);
if (client != null) {
client.disconnect(flush);
try {
broker.getCommunications().getClient(clientId);
} catch (ResourceNotFoundException e) {
return true;
}
} else {
logger
.log(Level.SEVERE, "Error getting client "
+ clientId);
}
} catch (Exception e) {
logger.log(Level.SEVERE, "Error disconnecting client "
+ clientId);
}
}
return false;
}
/**
* Retrieves the list of topics currently subscribed by clients.
*
* @return the list of topics or empty if no subscriptions exist
*/
public static String[] getSubscriptions() {
LocalBroker broker = getLocalBroker();
if (broker != null) {
try {
Subscription[] subs = broker.getMessagingEngine()
.getAllSubscriptions();
String[] topics = new String[subs.length];
logger.log(Level.FINE, "Micro broker total subscription count "
+ subs.length);
for (int i = 0; i < subs.length; i++) {
logger.log(Level.FINE, subs[i].getClientID()
+ " subscribed to " + subs[i].getTopic());
topics[i] = subs[i].getTopic();
}
return topics;
} catch (Exception e) {
logger.log(Level.SEVERE, "Error getting subscriptions");
}
}
return new String[0];
}
/**
* Retrieves subscribers for a topic with the ordered information: ClientID,
* DurableName, QOS, Type.
*
* @param topic
* the topic to list subscribers
* @return a list of clients or empty if no subscribers exist
*/
public static String[][] getSubscription(String topic) {
LocalBroker broker = getLocalBroker();
if (broker != null) {
try {
Subscription[] subs = broker.getMessagingEngine()
.getSubscriptions(topic);
String[][] clients = new String[subs.length][4];
logger.log(Level.FINE, "Micro broker topic subscription count "
+ subs.length);
for (int i = 0; i < subs.length; i++) {
clients[i][0] = subs[i].getClientID();
clients[i][1] = subs[i].getDurableName() == null ? "null"
: subs[i].getDurableName();
clients[i][2] = String.valueOf(subs[i].getQOS());
clients[i][3] = subs[i].getType() == Subscription.CLIENT_TYPE_MICROBROKER_MQTT ? "MQTT"
: "JMS";
}
return clients;
} catch (Exception e) {
logger.log(Level.SEVERE, "Error getting subscriptions for "
+ topic);
}
}
return new String[0][0];
}
/**
* Deletes a subscription for a client on a given topic.
*
* @param clientId
* the client whose subscription is to be removed
* @param topic
* the topic subscribed
* @return true if successfully deleted
*/
public static boolean deleteSubscriber(String clientId, String topic) {
LocalBroker broker = getLocalBroker();
if (broker != null) {
try {
Subscription[] subs = broker.getMessagingEngine()
.getSubscriptions(topic);
for (Subscription s : subs) {
if (s.getClientID().equals(clientId)) {
s.delete();
return true;
}
}
logger.log(Level.WARNING, "Client " + clientId
+ " not subscribed to " + topic);
} catch (Exception e) {
logger.log(Level.SEVERE, "Error getting subscriptions");
}
}
return false;
}
/**
* Retrieves the local micro brokers properties with the ordered
* information: BytesReceived, BytesSent, MessagesReceived, MessagesSent,
* PubSubBytesReceived, PubSubBytesSent, PubSubMessagesReceived,
* PubSubMessagesSent, QueueBytesReceived, QueueBytesSent,
* QueueMessagesReceived, QueueMessagesSent
*
* @return list of broker usage properties
*/
public static String[] getBrokerUsageProperties() {
LocalBroker broker = getLocalBroker();
if (broker != null) {
try {
return new String[] {
String.valueOf(broker.getBytesReceived()),
String.valueOf(broker.getBytesSent()),
String.valueOf(broker.getMessagesReceived()),
String.valueOf(broker.getMessagesSent()),
String.valueOf(broker.getPubSubBytesReceived()),
String.valueOf(broker.getPubSubBytesSent()),
String.valueOf(broker.getPubSubMessagesReceived()),
String.valueOf(broker.getPubSubMessagesSent()),
String.valueOf(broker.getQueueBytesReceived()),
String.valueOf(broker.getQueueBytesSent()),
String.valueOf(broker.getQueueMessagesReceived()),
String.valueOf(broker.getQueueMessagesSent()) };
} catch (Exception e) {
logger.log(Level.SEVERE, "Error getting usage properties");
}
}
return new String[0];
}
/**
* Retrieves the local micro brokers properties with the ordered
* information: Name, Version, Port, DataDirectory, AdminUser,
* MaximumMessageSize, MaxNumberOfClients, MessageExpiryDefault,
* TimeStarted, Uptime
*
* @return list of broker properties
*/
public static String[] getBrokerProperties() {
LocalBroker broker = getLocalBroker();
if (broker != null) {
try {
BrokerDefinition def = broker.getDefinition();
return new String[] { def.getName(), broker.getVersion(),
String.valueOf(def.getPort()), def.getDataDirectory(),
def.getAdminUser(),
String.valueOf(def.getMaximumMessageSize()),
String.valueOf(def.getMaxNumberOfClients()),
String.valueOf(def.getMessageExpiryDefault()),
new Date(def.getTimeStarted()).toLocaleString(),
String.valueOf(broker.getUptime()) };
} catch (Exception e) {
logger.log(Level.SEVERE, "Error getting broker properties");
}
}
return new String[0];
}
}
|
|