There are a number of problems with Java serialization and numerous alternatives have been developed. But my focus here is on a particular use case, databases, and a single issue, performance.
Databases generally work with very large byte arrays. This is because seek time is very slow compared to the data transfer rate, so working with larger byte arrays often results in a performance gain. This is true for both hard disks and Solid State Disks (SSD). On the other hand, deserializing very large byte arrays is very CPU intensive. So Java databases tune the size of the byte arrays they use to balance the performance of the disk against the performance of the CPU. Fortunately the price of RAM has dropped significantly over the years, so large memory caches can be used to hold the deserialized data, which reduces the need to repeatedly read and deserialize the same data. Unfortunately, because of the transactional nature of many databases, there is still a need to frequently reserialize the updated data, which is also CPU intensive, and write it back to disk.
Significant performance gains are achieved by not using Java serialization, but working more closely with the data, reading and writing the binary form of integers, floats, strings and the like. This is not difficult to do, especially as most databases work only with well-defined tables where all the data in a given column is of the same type. But still, when reading or writing the data to disk, the entire byte array must be deserialized and subsequently reserialized. Working directly with the binary data is much faster than using Java serialization, but it is still a CPU-intensive process. And the irony is that many database transactions only access or update a minuscule amount of data.
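As a minimal sketch of what working directly with the binary form can look like, the following uses the standard java.nio.ByteBuffer class. The row layout (an int id, a double price and a length-prefixed UTF-8 name) and the class name RowCodec are assumptions made for illustration, not part of any particular database.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical fixed row layout: [int id][double price][int nameLength][name bytes].
final class RowCodec {
    // Writes the three columns straight into a byte array, with no object stream.
    static byte[] write(int id, double price, String name) {
        byte[] nameBytes = name.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buf = ByteBuffer.allocate(4 + 8 + 4 + nameBytes.length);
        buf.putInt(id);
        buf.putDouble(price);
        buf.putInt(nameBytes.length);
        buf.put(nameBytes);
        return buf.array();
    }

    // Absolute reads: each column sits at a known offset in the row.
    static int readId(byte[] row) {
        return ByteBuffer.wrap(row).getInt(0);
    }

    static double readPrice(byte[] row) {
        return ByteBuffer.wrap(row).getDouble(4);
    }

    static String readName(byte[] row) {
        int length = ByteBuffer.wrap(row).getInt(12);
        return new String(row, 16, length, StandardCharsets.UTF_8);
    }
}
```

Because every column in this layout has a known type and position, a single value can be read without touching the rest of the row.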
Now in an ideal world, we would only deserialize data as needed, and then only reserialize the data that has changed. Doing this will mean that we can work with much larger byte arrays, resulting in an overall improvement in Java database performance. The data structures for doing this efficiently may be somewhat complex, but that should not be an issue so long as the API is reasonable. The term I use for this technology is JID, or Java Incremental Deserialization/reserialization.
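The idea can be sketched for a single field. This is only an illustration of deserializing on demand and reserializing on change; the class LazyStringField and its counters are hypothetical and are not the JID API.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical illustration only; not the JID API.
final class LazyStringField {
    private byte[] bytes;     // serialized form, current unless dirty is set
    private String value;     // deserialized form, null until first read or write
    private boolean dirty;    // true when value has changed since bytes was built
    int deserializations;     // counters that make the laziness observable
    int serializations;

    LazyStringField(byte[] serialized) {
        this.bytes = serialized;
    }

    // Deserializes on first access only.
    String get() {
        if (value == null) {
            value = new String(bytes, StandardCharsets.UTF_8);
            deserializations++;
        }
        return value;
    }

    // Records the change; serialization is deferred until toBytes is called.
    void set(String newValue) {
        value = newValue;
        dirty = true;
    }

    // Reuses the original bytes when nothing has changed.
    byte[] toBytes() {
        if (dirty) {
            bytes = value.getBytes(StandardCharsets.UTF_8);
            serializations++;
            dirty = false;
        }
        return bytes;
    }
}
```

A field that is never read or written costs nothing beyond holding on to its bytes, which is what makes larger byte arrays affordable.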
laforge49 270005CXQG 2,176 Views
I see two kinds of actor models, the traditional actor model used by virtually all actor frameworks and the robust actor model used by JActor2 and Microsoft's Orleans project. But before we get into that, we need to talk just a little bit about systems like Python Twisted and Node.js. Both of these systems are event-based application platforms, where the events are processed by a single thread but with many services operating on separate threads. Both Twisted and Node.js deliver high performance and, being largely single-threaded, provide implicit thread safety without the need for locks.
The key differentiator between actors and Twisted/Node.js is the extensive use of multi-threading in actor frameworks. All these frameworks work by message/event passing. But the use of events in a single-threaded environment is a proven technology. With actors, we do have the potential of making better use of today's multi-core processors, but single-threaded processes typically run faster than all but the most well-designed multi-threaded processes. And there is a significant downside to the use of actors in terms of coupling arising from multiple event queues (one per actor) and out-of-order event processing.
The issue arises in part from the obligation of an actor not to block its thread, as actor platforms often run on a minimum number of threads and such blocking could easily lead to thread starvation of other actors and even deadlocks. Actors do not block when passing events to other actors, except for 2-way messages (request/response). The common actor model only supports replies, if it does so at all, by either blocking the actor's thread or by event filters.
What happens is that when one actor receives a request that requires consultation with other actors, it will temporarily select only events from the other actors it is consulting with until it receives the responses it needs to complete that initial request. Events then are not always processed in the order they are received, as the events selected depend on the actor's state. This means that actors will often be coupled to other actors, with expectations as to how those other actors will behave. And coupling is fine, if it is well documented, isolated to small sub-systems, and the project is reasonably short-lived and/or reasonably stable.
The actor model used by both JActor and Orleans is a bit different. One of the key differences is that non-blocking 2-way messaging (request/response) is fully supported. Events are still supported, but are used mostly for things like notifications. And for 2-way messages, the actor's expectation is only that a response will be received within a reasonable time. And if an error occurs, then that error serves as the response.
With JActor/Orleans, responses are dispatched quite differently than events and requests. In JActor, a callback is provided when sending a request and that callback will be executed on the actor's thread. (Keeping in mind that the thread of an actor may change over time, but it only ever has one at most.) In contrast, any distinction in the common actor model between a request and a response must be done by the application itself. And to keep the application complexity reasonable, this generally means event filtering.
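The callback style of response handling can be sketched with nothing more than single-threaded executors. This is a rough illustration under stated assumptions: the class MiniActor and its methods are hypothetical and are not the JActor or Orleans API, and each actor here is pinned to one thread for simplicity.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

// Hypothetical sketch; these names are not the JActor or Orleans API.
final class MiniActor {
    // One thread per actor keeps all access to the actor's state single-threaded.
    private final ExecutorService mailbox = Executors.newSingleThreadExecutor();
    private int counter; // only ever touched on this actor's thread

    // Non-blocking request: the work runs on this actor's thread, and the
    // response callback is executed on the requesting actor's thread.
    void incrementRequest(MiniActor requester, Consumer<Integer> callback) {
        mailbox.execute(() -> {
            int result = ++counter;
            requester.mailbox.execute(() -> callback.accept(result));
        });
    }

    // Runs a task on this actor's thread.
    void run(Runnable task) {
        mailbox.execute(task);
    }

    void shutdown() {
        mailbox.shutdown();
    }
}
```

The requesting actor never blocks and never has to filter its incoming events; the response simply arrives as a callback on its own thread.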
This is a really wonderful talk by someone who has worked with actors for a very long time. He makes a strong case for why message filtering cannot reasonably be avoided when using the common actor model.
Here I explain the common actor model by way of an analog implementation that uses locks. My reason for doing this is to get people in a strong enough position that they can reason about actors. Actors are too often explained axiomatically, which leaves people with only a shallow understanding of actors.
Recently posted on CodePlex: Microsoft's Orleans, now available as a preview. http://lnkd.in/dh4G-zq. Orleans is a validation of the actor model used by JActor2. For example, Orleans actors, like JActor actors, lack the failure modes of traditional actors and consequently do not have monitors. And really there is no good reason for actors to fail except that the traditional actor model makes it difficult to maintain systems of actors that do not experience deadlocks. More on this later.
"An Object Oriented Model for Robust Multi-threaded Programming", which introduces JActor2, has been published by Java Magazine: http://javamag.org/developing-java-applications-issue-on-java-is-out/
JActor2 is a multi-threaded OO programming model, inspired by Alan Kay's early thoughts on Objects. JActor2 is based on asynchronous 2-way messaging with assured responses. The net result is code that is both simpler and more robust, and hence easier to maintain.
JActor2 has been under development for a year. It has a simpler API and better documentation than JActor.
For more information, see here.
To run JASocket you will need the JAR files for compatible versions of JActor, JID, JASocket and JFile. Download these projects from here and extract the jar files to a common directory, in our case c:\jaconfig. You will also need jar files from slf4j, sshd, joda-time and jline.
Next you need a few shell scripts. Here are some which work with Windows:
Your directory should now look something like this:
The basic configuration of JAConfig is handled by JACNode. If you are familiar with JASocket, this code should be largely self-explanatory:
Open a command window, go to the directory containing the jar files (you will need to be able to write to this directory) and enter the node command:
Now log in with the name admin and password admin.
Enter help to see the list of commands. There is nothing new here, as the commands are all implemented in JASocket:
Now we can use the localServers command to see what is running on the local node:
We see that the config, kingmaker, quorum and ranker servers were started by node (class JACNode), while hostManager was started by kingmaker.
Authentication of operator passwords is handled by the config server, so a good starting point is to see what commands this server supports:
The very first thing we should do is to change the password for admin, and then test it.
The config database now holds the new admin password.
Accounts for operators are created just by assigning a password. Any operator can do this, but you will need the admin password. Note however that there is no difference between creating an operator account and changing another operator's password.
A quorum requires the participation of N/2+1 hosts (using integer division). So with one host, you need one host to achieve a quorum. With two hosts you need both hosts running to achieve a quorum. But with three hosts, only two hosts are needed. Fault tolerance then is only achieved when running with 3 or more hosts.
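The arithmetic is easy to check with a few lines of Java. This is a sketch of the N/2+1 rule only; the class and method names are made up for illustration and are not taken from JAConfig.

```java
// Hypothetical helper illustrating the N/2+1 rule; not part of JAConfig.
final class Quorum {
    // Integer division: 1 host -> 1, 2 hosts -> 2, 3 hosts -> 2, 5 hosts -> 3.
    static int required(int totalHostCount) {
        return totalHostCount / 2 + 1;
    }

    static boolean hasQuorum(int availableHosts, int totalHostCount) {
        return availableHosts >= required(totalHostCount);
    }

    // Number of hosts that can be lost while still keeping a quorum.
    static int tolerableFailures(int totalHostCount) {
        return totalHostCount - required(totalHostCount);
    }
}
```

With one or two hosts, tolerableFailures returns 0, which is why fault tolerance starts at three hosts.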
The quorum server tracks the number of hosts that are available and gets the total host count from the config database--which must be manually assigned.
As soon as the total host count is set to 1 we achieve a quorum. The kingmaker, which was listening to the quorum server for a quorum notification, immediately starts up the cluster manager server.
Setting the totalHostCount back to 2 means we lose quorum and clusterManager stops running:
JAConfig has an alternate implementation of SSH server that subclasses HostServer. This allows us to use the host manager to run one copy of the SSH server on every host.
The port number used by the SSH server, 8889, is specified in the assignment to hostManager.ssh.
Multiple Nodes per Host
To start a second node on the same host, we need an unused port. Note that when a port is specified, no console is opened.
When more than one node runs on the same host, only one instance of the host manager is run, as well as only one instance of each host server.
Running a node on another host establishes quorum. In this case we will explicitly specify a port of 8880 to prevent the opening of another console:
The new host runs an instance of host manager, and an instance of each of the host servers. A single instance of the cluster manager is now running in the cluster.
As we define new cluster servers, one instance of each is run on the cluster. The nodes chosen to run the new servers are those least loaded.
After taking down the second node on the first host, the load is rebalanced between the two remaining nodes.
The JASocket package makes it easy to create distributed, scalable software. JAConfig, in turn, makes it easy to manage in production.
The Config DB
JAConfig provides a fully replicated non-transactional, eventually consistent, key/value pair database for maintaining both configuration data and operator passwords. The database also provides change notifications, so servers can react to configuration changes. Every node in the cluster has a copy of this database both on disk and in memory, ensuring that the database is fully robust and supports fast queries. And there is neither a separate log file nor any need for a recovery mechanism--on startup, if the database is not valid its contents are discarded.
The underlying assumption of the database is that changes are infrequent, and that the system clocks of all the nodes in the cluster all have roughly the same time. Key/value pairs in the database always carry the timestamp of when the last change was made. Changes are propagated across all nodes in the cluster and shared when a node connects to another node. For each key, the change with the latest timestamp is retained.
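The rule of retaining the change with the latest timestamp can be sketched as follows. The class LwwStore is hypothetical and is not the JAConfig implementation; in particular, keeping the existing value when two timestamps are equal is an assumption made here.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical last-write-wins store; not the JAConfig implementation.
final class LwwStore {
    static final class Versioned {
        final String value;
        final long timestamp;

        Versioned(String value, long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    private final Map<String, Versioned> entries = new HashMap<>();

    // Applies a change only if it is newer than what is already held.
    // Assumption: on equal timestamps the existing value is kept.
    boolean apply(String key, String value, long timestamp) {
        Versioned current = entries.get(key);
        if (current != null && current.timestamp >= timestamp) {
            return false;
        }
        entries.put(key, new Versioned(value, timestamp));
        return true;
    }

    // Shares state when two nodes connect: every entry of the peer is offered.
    void mergeFrom(LwwStore peer) {
        for (Map.Entry<String, Versioned> e : peer.entries.entrySet()) {
            apply(e.getKey(), e.getValue().value, e.getValue().timestamp);
        }
    }

    String get(String key) {
        Versioned v = entries.get(key);
        return v == null ? null : v.value;
    }
}
```

After two stores merge from each other they hold the same value for every key, regardless of the order in which the changes arrived. This only works if the clocks are roughly in agreement, as noted above.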
The database is peer-based, so there is no single point of failure. And because there is no master copy, there are no warm or hot backups and failover time is effectively 0. On the flip side, status information is completely out of scope, as frequent updates will break the underlying assumptions.
A cluster can be split into 2 or more smaller clusters by something as simple as a loose cable. If these smaller clusters act independently, inconsistent results can occur. This is managed by knowing the total number of host computers in the cluster and only allowing some activities to occur when the number of hosts currently connected to a given cluster is equal to or greater than (totalNumberOfHosts / 2) + 1. Clusters connected to this number of hosts have what is called a quorum, and as there cannot be two sub-clusters with a quorum of hosts, at most one sub-cluster will be active.
Note that we are talking about a quorum of hosts rather than a quorum of nodes. Multiple nodes can run on each host and indeed there may be a large host which runs many of the nodes in a cluster. So if the quorum were based on nodes, it is possible that all the nodes of the quorum would be running on the same host, which creates a single point of failure.
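The difference between counting nodes and counting hosts can be shown in a short sketch. It assumes, purely for illustration, that node addresses have the form host:port; the class HostQuorum is hypothetical and not part of JAConfig.

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch; assumes node addresses of the form "host:port".
final class HostQuorum {
    static boolean hasQuorum(Collection<String> connectedNodes, int totalNumberOfHosts) {
        // Collapse nodes to distinct hosts so that many nodes on one host count once.
        Set<String> hosts = new HashSet<>();
        for (String node : connectedNodes) {
            int colon = node.lastIndexOf(':');
            hosts.add(colon < 0 ? node : node.substring(0, colon));
        }
        return hosts.size() >= totalNumberOfHosts / 2 + 1;
    }
}
```

Three nodes on a single host do not form a quorum in a three-host cluster, while one node on each of two hosts does.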
Cluster and Host Server Managers
At this time there are two types of server managers, the Cluster Manager and the Host Manager. These managers are started (and monitored) by a kingmaker server, which runs in every node. The kingmaker servers are responsible for having one cluster manager running in the cluster and one host manager running on every host.
The cluster manager uses the data in the config database to start and monitor a number of cluster servers, where each type of cluster server has only a single instance running somewhere in the cluster. The cluster manager and all the cluster servers stop running when the node is not a part of the active cluster (a cluster with a quorum of hosts).
Similarly, the host managers use the data in the config database to start and monitor a number of host servers, where each type of host server has only a single instance running on each host. Unlike the cluster manager and servers, the host manager and servers are unaffected by quorum considerations.
The cluster and host managers use a server named ranker to determine which node to use when starting a server. A simple ranker is provided which provides a list of nodes ordered by the number of servers running on each node. Alternative ranker implementations can be used as they are developed.
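A ranking by load can be sketched in a few lines. This is not the ranker shipped with JAConfig; the class name, the map of server counts and the tie-break by node name are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of a least-loaded ranking; not the JAConfig ranker.
final class SimpleRankerSketch {
    // Returns node names ordered by the number of servers each is running,
    // least loaded first. Ties are broken by node name (an assumption).
    static List<String> rank(Map<String, Integer> serverCountByNode) {
        List<String> nodes = new ArrayList<>(serverCountByNode.keySet());
        nodes.sort((a, b) -> {
            int byLoad = Integer.compare(serverCountByNode.get(a), serverCountByNode.get(b));
            return byLoad != 0 ? byLoad : a.compareTo(b);
        });
        return nodes;
    }
}
```

A manager would then start the new server on the first node in the returned list.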
The HelloWorld class implements a very simple Server:
The serverName method returns the name of the server, which is published to all the nodes in the cluster.
The startServer method is called to start the server. In the case of the HelloWorld server, this method defines a server command, hi, and then performs the default server initialization.
Finally, the main method has been included to show how to run a node and start an initial server by calling the Node.startup method.
ServerCommand Base Class
Server commands, like HelloWorld's hi command, extend the ServerCommand class:
Every server command has a name, a description, and an eval method.
Command arguments are passed to the eval method as a String, which may be empty.
Command output is created using the out.print method, and the out object is returned as a response by calling the rp.processResponse method.
Some commands need to process user interrupts (^C) so that they can deliver partial results when one of the nodes is slow to respond. These commands subclass InterruptableServerCommand:
Server Base Class
All servers extend the Server class:
The getOperatorName method returns the name of the operator [or server] which started the server.
The runTime method returns the length of time the server has been running.
The startupArgs method returns the args string passed to the startup method.
The serverName method provides a default name for the server--the full class name. This method can be overridden to provide a more user-friendly name.
The node method returns the Node object.
The agentChannelManager method returns the AgentChannelManager, which provides an API for accessing other servers both on the same node and on other nodes.
The registerServerCommand method is used to register server commands.
The startup method is used to initialize and run the server. The MailboxFactory.addClosable method is called to ensure that the server's close method is called when the node is halted. A RegisterServer request is also sent to register the server with the local node and to publish the server's name with all the nodes in the cluster.
The startup method then calls the startServer method, which registers the help and shutdown commands. This method is overridden when the server supports additional commands or needs to perform additional initialization to start running the server.
The close method is called when the node is halting gracefully and when the server is being shut down. In many cases this method must be overridden to close files or sockets and halt any ongoing processes.
Server command requests are passed to the server by an EvalServerCommand request, which in turn calls the evalServerCommand method.
The serverUserInterrupt method is called when a user interrupt is passed to a server. This method then forwards the interrupt to the server command.
The registerShutdownCommand and registerHelpCommand methods define and register the shutdown and help commands respectively.
Finally, methods are provided for interacting with the operator which invoked a server command.
JASocket contains a number of commands. Some are there as an aid in managing the cluster and others are there to illustrate how commands work. Here we look at the implementation of some of those commands to aid you in the implementation of your own.
The toAgent command is of particular interest as it is used to send commands to other nodes. The arg string consists of the node address (or resource name), the name of another command and [optionally] the arg string of that other command. This command removes the address from its arg string and creates an EvalAgent initialized with the remainder of the arg string. The EvalAgent is then shipped to the designated node.
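The handling of the arg string can be sketched as a simple split on the first space. The class ToArgs is hypothetical and only illustrates the parsing step; it does not create or ship an EvalAgent, and the address used in the usage note is made up.

```java
// Hypothetical sketch of splitting the arg string; not the JASocket implementation.
final class ToArgs {
    final String address;   // node address or resource name
    final String remainder; // the other command plus its optional arg string

    private ToArgs(String address, String remainder) {
        this.address = address;
        this.remainder = remainder;
    }

    // Removes the leading address and keeps everything after it intact.
    static ToArgs parse(String args) {
        String trimmed = args.trim();
        int space = trimmed.indexOf(' ');
        if (space < 0) {
            return new ToArgs(trimmed, "");
        }
        return new ToArgs(trimmed.substring(0, space), trimmed.substring(space + 1).trim());
    }
}
```

For example, parsing "10.0.0.2:8880 server config help" yields the address 10.0.0.2:8880 and the remainder "server config help", which is what would be evaluated on the designated node.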
This command shuts down the node. This is especially interesting when used with the to command, as it causes the channel to the remote node to be halted while a result is pending.
The exception command just raises an exception to show how an exception is handled.
The channels command lists the accessible remote nodes.
The servers command lists the names of the accessible servers for all the nodes in the cluster.
The localServers command provides information about all the servers running on a node.
The latencyTest command measures the time it takes to send a KeepAliveAgent to another node and get a response. This command has an optional argument--the number of times the request/response is to be performed.
The throughputTest command measures how quickly a number of messages can be sent to another node and their responses received.
The help command lists all the commands with a brief description of each command.
The startup command is used to start a server, given the full server class name and any arguments needed by that server.
The server command is used to send a command string to the named server.
The pause command simply completes after a number of seconds. It is implemented using a TimerTask and a Continuation.
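The general pattern of completing after a delay without blocking a thread can be sketched with java.util.Timer. Here a plain Runnable stands in for the continuation; the class PauseSketch is hypothetical and is not the JASocket pause command.

```java
import java.util.Timer;
import java.util.TimerTask;

// Hypothetical sketch; a Runnable stands in for the continuation.
final class PauseSketch {
    // A daemon timer thread, so pending pauses do not keep the JVM alive.
    private static final Timer TIMER = new Timer("pause-timer", true);

    // Returns immediately; the continuation runs on the timer thread after the delay.
    static void pause(long millis, Runnable continuation) {
        TIMER.schedule(new TimerTask() {
            @Override
            public void run() {
                continuation.run();
            }
        }, millis);
    }
}
```

The caller is free to process other work while the pause is pending, and the response is produced only when the continuation is invoked.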
The write command is used to send a message to another operator that is logged into the node where this command is run. The operator may be logged in via ConsoleApp or via SSH.
BroadcastAgent and BroadcasterAgent
The broadcast command sends a message to all operators logged in on any node in the cluster.
WhoAgent and WhoerAgent
The who command lists every operator logged in on any node in the cluster. The display includes the operator name, node where the operator is logged in, how long the operator has been logged in, the number of commands entered and how long it has been since the last command.