Hello!
I write a lot of code that needs to execute periodically. For instance, I’ve written services that need to check a remote file system for changes and import those changes once detected.
I’ve also written quite a few services that need to send information over a message bus periodically to, say, report status information or provide information about a long running process, or update the status of a given resource (such as whether or not a database is up).
Certainly, with Java 1.5 and its built-in java.util.concurrent libraries, a lot of this sort of thing has become easier, but it’s always a good idea to make it easier still and, more importantly for me, much more readable.
For instance, given a class that does something useful, a timer (or clock, as I call it) should be super easy, something like (in Groovy):
class Monitor {

    def clock

    Monitor() {
        clock = new Clock(120, this.&execute)
    }

    def execute() {
        println "doing something useful every 2 minutes"
    }

    def start() {
        clock.start()
    }

    def stop() {
        clock.stop()
    }
}
When the Monitor is instantiated, it instantiates a Clock object, which takes the number of seconds to wait between ticks and a closure that gets called whenever the clock ticks. The idea is that all the machinery for starting, stopping, and maintaining the life-cycle of a periodic timer is encapsulated in the Clock class, including all that complicated Java factory, object-wrapping-other-object stuff that is un-Groovy-like and, frankly, makes code so unreadable.
Another use case for this kind of thing is a module in your application that fires events at regularly scheduled intervals. With the Clock object, you could separate that concern from your various worker modules, and implement the concern in one place as simply as:
class Events {

    def clocks = []

    def fireDatabaseCheck() {
        Notifications.fire(Notifications.DB_CHECK)
    }

    def fireStatusUpdate() {
        Notifications.fire(Notifications.STATUS_UPDATE)
    }

    def start() {
        clocks = [
            new Clock(120, this.&fireDatabaseCheck),
            new Clock(30, this.&fireStatusUpdate)
        ]
        // Creating a Clock doesn't start it, so start each one explicitly.
        clocks.each { clock ->
            clock.start()
        }
    }

    def stop() {
        clocks.each { clock ->
            clock.stop()
        }
    }
}
You’ll have to imagine a Notifications class used by other long-lived objects to subscribe to various events. And let’s hope that the Notifications class somehow manages concurrency such that messages delivered to subscribers are not held up if the subscribers take a long time to handle the messages. Oy. Well, I think I have a solution for that problem, but I’ll leave that for another blog entry.
The advantage (arguably) to the Events class is that you have all your timed events in one place so that maintainers can easily find it, and once found, can be assured that this one file contains everything they need to know about when events are generated. Regardless, the use of the Clock object helps make the code as clear as possible under the circumstances.
The end of this entry has the complete code, but I’ll take you through some bits and pieces just for fun. Please note that this all works in Java just as well as Groovy, though you’ll have to implement some interfaces and anonymous classes to emulate Groovy’s true closures.
Implementing an actual scheduled task using the java.util.concurrent library is fairly straightforward:
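import java.util.concurrent.*
def scheduler = Executors.newSingleThreadScheduledExecutor()
// Runnable is an interface, so the task has to be an instance of something that implements it.
def runnable = new Runnable() {
    void run() { println "tick" }
}
def task = scheduler.scheduleAtFixedRate(runnable,
    0, 120, TimeUnit.SECONDS)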
All you have to do is acquire a scheduled thread executor from the library then create a task, which is just an instance that implements the Runnable interface. The parameters are typical: the number of time units to wait before the first run (in this case, 0), the number of time units between ticks (120, here), and the time unit itself.
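To make those three parameters concrete, here’s a small sketch that can be run as a script. The TickCounter class and the 50 millisecond interval are my own stand-ins (nobody wants to wait two minutes for a demo), and the latch is only there so the script can wait for a few ticks before shutting down.

```groovy
import java.util.concurrent.CountDownLatch
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit

// Hypothetical task class: counts a latch down once per tick.
class TickCounter implements Runnable {
    final CountDownLatch latch
    TickCounter(CountDownLatch latch) { this.latch = latch }
    void run() { latch.countDown() }
}

def scheduler = Executors.newSingleThreadScheduledExecutor()
def latch = new CountDownLatch(3)

// Initial delay of 0, then one tick every 50 milliseconds.
def task = scheduler.scheduleAtFixedRate(new TickCounter(latch),
    0, 50, TimeUnit.MILLISECONDS)

// Wait (up to 5 seconds) until the task has ticked three times.
def sawThreeTicks = latch.await(5, TimeUnit.SECONDS)

task.cancel(false)      // no more ticks, but don't interrupt one in progress
scheduler.shutdown()

println "three ticks observed: $sawThreeTicks"
```

Swap the interval and the unit for 120 and TimeUnit.SECONDS and you’re back to the two-minute timer.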
In Groovy, all closures implement the Runnable interface, so you can just use a closure rather than an object instance:
def start = 0
def interval = 120
def units = TimeUnit.SECONDS
def closure = { println "do something " }
def task = scheduler.scheduleAtFixedRate(closure, start, interval, units)
There’s a problem here, though: if the closure throws an exception, it gets swallowed. The executor stores the exception in the returned future and quietly suppresses every subsequent run, so you’ll never know it happened unless you catch Throwable inside the task yourself, or call get() on the future.
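If you want to see the swallowing for yourself, here’s a minimal sketch. The failing closure and the run counter are made up for the demo; the only library behavior it leans on is that the future returned by scheduleAtFixedRate hands the exception back, wrapped in an ExecutionException, when you call get().

```groovy
import java.util.concurrent.ExecutionException
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger

def scheduler = Executors.newSingleThreadScheduledExecutor()
def runs = new AtomicInteger()

// Hypothetical task that blows up every time it runs.
def failing = {
    runs.incrementAndGet()
    throw new IllegalStateException("boom")
}

def task = scheduler.scheduleAtFixedRate(failing, 0, 20, TimeUnit.MILLISECONDS)

// Nothing is printed or logged when the task fails. The only way to
// find out is to ask the future, which rethrows the failure.
def failure = null
try {
    task.get(5, TimeUnit.SECONDS)
} catch (ExecutionException e) {
    failure = e.cause
}

Thread.sleep(100)        // plenty of time for more runs, had there been any
scheduler.shutdownNow()

println "runs: ${runs.get()}, failure: $failure"
```

The task is scheduled every 20 milliseconds, yet it only ever runs once: after the first exception the executor drops it without a word.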
With Groovy, you can use closures to simulate new control structures to help with things like these. For instance:
def safe(Closure closure) {
    return {
        try {
            closure()
        }
        catch (Throwable t) {
            log.info "thread terminated: $t"
        }
    }
}
What this interesting bit of code does is return a closure, which wraps a closure in a try/catch block. It’s a function that takes a function, and returns another function. I know that sounds confusing, but it’s a really nice thing to do in a language that supports functions as first class entities.
What the above enables you to do is something like:
def closure = safe { println "do something" }
The closure variable will be a closure that wraps another closure in a try/catch block. The try/catch block does nothing other than log the error. You can imagine adding some functionality that will enable you to recover from certain error conditions, but I’ve found that for the most part, you really can’t as far as periodic events go. You log the error, and then just die until it’s time for another attempt. Presumably the closure you’re using as a scheduled task will know how to report errors to other parts of your application.
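Here’s a sketch of what the wrapper buys you in a scheduled task. To keep it runnable as a plain script I’ve made a few assumptions: safe is a closure variable rather than a method, a list called errors stands in for the logger, and flaky is a made-up task that fails on its first run only.

```groovy
import java.util.concurrent.CopyOnWriteArrayList
import java.util.concurrent.CountDownLatch
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger

// Stand-in for the logger: collects whatever the wrapper catches.
def errors = new CopyOnWriteArrayList()

// Same idea as safe(), written as a closure so a script can use it.
def safe = { Closure work ->
    return {
        try {
            work()
        }
        catch (Throwable t) {
            errors.add(t)
        }
    }
}

def runs = new AtomicInteger()
def threeRuns = new CountDownLatch(3)

// Hypothetical task: the first run fails, later runs succeed.
def flaky = {
    def n = runs.incrementAndGet()
    threeRuns.countDown()
    if (n == 1) throw new IllegalStateException("first run fails")
}

def scheduler = Executors.newSingleThreadScheduledExecutor()
scheduler.scheduleAtFixedRate(safe(flaky), 0, 20, TimeUnit.MILLISECONDS)

// The wrapper caught the first failure, so the executor keeps ticking.
def keptRunning = threeRuns.await(5, TimeUnit.SECONDS)
scheduler.shutdownNow()

println "kept running: $keptRunning, errors caught: ${errors.size()}"
```

Because the exception never escapes the wrapper, the executor has no reason to drop the task, and the failure ends up somewhere you can actually see it.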
At any rate, what you end up with is:
def start = 0
def interval = 120
def units = TimeUnit.SECONDS
def closure = safe { println "do something " }
def task = scheduler.scheduleAtFixedRate(closure, start, interval, units)
Or:
def task = scheduler.scheduleAtFixedRate(
    safe(closure), 0, 120, TimeUnit.SECONDS)
The iffiest (technical term) part of this whole proposition is stopping the scheduled task. I’ve come up with the following:
if (task)
    task.cancel(true)
if (scheduler) {
    // shutdownNow() hands back the tasks that never got a chance to run
    def tasks = scheduler.shutdownNow()
    tasks.each { pending ->
        pending.cancel(true)
    }
}
This all seems reasonable, but the problem is the task itself. If it’s doing something like talking to an SFTP server, or hanging on to an HTTP connection, or writing to a database, the task won’t really cancel. Or will it? All cancel(true) does is interrupt the thread, and code that is blocked on I/O is free to ignore the interrupt. I’ve never gotten consistent results, and in fact, have given up trying too hard for a generic solution. (Again, I’d rather just move to a better platform for concurrent processing.)
Assuming, though, that the tasks themselves properly terminate via other means, such as setting a shared variable, as in def quitNow = true, then I think the above is sufficient.
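A rough sketch of that shared-variable idea, under a few assumptions of my own: the flag is an AtomicBoolean named quitNow so the change is visible across threads, the "work" is just a loop that sleeps, and the task treats an interrupt as one more request to quit.

```groovy
import java.util.concurrent.CountDownLatch
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicInteger

def quitNow = new AtomicBoolean(false)   // the shared "please stop" flag
def started = new CountDownLatch(1)
def chunksDone = new AtomicInteger()

// Hypothetical long-running task: works in small chunks and checks
// the flag between chunks.
def longRunning = {
    while (!quitNow.get()) {
        chunksDone.incrementAndGet()     // stand-in for one chunk of real work
        started.countDown()
        try {
            Thread.sleep(10)
        }
        catch (InterruptedException e) {
            quitNow.set(true)            // an interrupt also means "stop"
        }
    }
}

def scheduler = Executors.newSingleThreadScheduledExecutor()
def task = scheduler.scheduleAtFixedRate(longRunning, 0, 120, TimeUnit.SECONDS)

started.await(5, TimeUnit.SECONDS)       // let the task get going

quitNow.set(true)                        // ask the task to finish on its own
task.cancel(true)
scheduler.shutdownNow()

// The worker thread can only exit once the task has returned.
def stopped = scheduler.awaitTermination(5, TimeUnit.SECONDS)
println "worker thread stopped: $stopped"
```

The catch, of course, is that the task has to come up for air often enough to notice the flag. If it’s stuck in one long blocking call, the flag won’t help any more than the interrupt did.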
I’ve developed a Worker class which at least helps with this sort of thing. The Worker class takes a closure and manages its life-cycle. It’s great for using as a task in the Clock class.
An interesting modification of the Clock class might be to provide not only a closure that gets called when work needs to be done, but another closure that gets called when work needs to be interrupted.
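I haven’t built that, but a sketch might look something like the following. Everything here is hypothetical: the InterruptibleClock name, the onInterrupt parameter, and the explicit time unit (added so the demo needn’t deal in whole seconds). It also prints failures to stderr rather than logging them, so the script has no log4j dependency.

```groovy
import java.util.concurrent.CountDownLatch
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical variant of Clock with a second closure that stop() calls
// before cancelling, so the work gets a chance to wind itself down.
class InterruptibleClock {

    def scheduler
    def task
    long interval
    TimeUnit timeUnit
    Closure work
    Closure onInterrupt

    InterruptibleClock(long interval, TimeUnit timeUnit,
                       Closure work, Closure onInterrupt) {
        this.interval = interval
        this.timeUnit = timeUnit
        this.work = work
        this.onInterrupt = onInterrupt
    }

    // Keeps a failing tick from silently killing the schedule.
    private Closure guard(Closure c) {
        return {
            try {
                c.call()
            }
            catch (Throwable t) {
                System.err.println("tick failed: $t")
            }
        }
    }

    def start() {
        scheduler = Executors.newSingleThreadScheduledExecutor()
        task = scheduler.scheduleAtFixedRate(guard(work), 0, interval, timeUnit)
    }

    def stop() {
        onInterrupt.call()        // tell the work to stop first...
        task?.cancel(true)        // ...then cancel and interrupt
        scheduler?.shutdownNow()
    }
}

// Usage: the work loops until the interrupt closure flips the flag.
def quitNow = new AtomicBoolean(false)
def started = new CountDownLatch(1)

def work = {
    started.countDown()
    while (!quitNow.get()) {
        try { Thread.sleep(5) } catch (InterruptedException e) { break }
    }
}

def clock = new InterruptibleClock(1, TimeUnit.SECONDS, work, { quitNow.set(true) })
clock.start()
started.await(5, TimeUnit.SECONDS)
clock.stop()

def stopped = clock.scheduler.awaitTermination(5, TimeUnit.SECONDS)
println "interrupt closure ran: ${quitNow.get()}, worker stopped: $stopped"
```

The nice part is that the worker and its "stop" logic travel together into the clock, so whoever calls stop() doesn’t need to know how the work shuts itself down.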
Regardless, please know that the code presented here is in no way foolproof against the host of concurrency problems any Java-based technology is heir to. What I hope it does is provide a way to get something working in your application more quickly than you might otherwise have done, or better yet, spur your own “simplifying” thoughts for working with this sort of issue.
Personally, I cut/paste this code into every application I write that needs this sort of thing, and then tweak it to fit the specifics of that app. For me, this is much, much easier than developing a far more comprehensive and flexible library with dozens of classes and tens of ways to modify behavior via interfaces, overrides, injections, subclassing, delegation, and so on. Sounds great: way too hard to maintain.
So, here’s the full code for the Clock class:
package com.zentrope.lib

import java.util.concurrent.*
import org.apache.log4j.*

class Clock {

    static log = Logger.getLogger(Clock.class)

    def scheduler = null
    def task = null
    def seconds = 0
    def timeUnit = TimeUnit.SECONDS // no minutes in Java 1.5
    def closure = null

    Clock(seconds, Closure closure) {
        this.seconds = seconds
        this.closure = closure
    }

    // Wraps a function so that anything it throws is logged, not swallowed.
    def safe(function) {
        return {
            try {
                function()
            }
            catch (Throwable t) {
                log.info "thread terminated: $t"
            }
        }
    }

    def start() {
        scheduler = Executors.newSingleThreadScheduledExecutor()
        task = scheduler.scheduleAtFixedRate(safe { closure() },
            0, seconds, timeUnit)
    }

    def stop() {
        if (task)
            task.cancel(true)
        if (scheduler) {
            // shutdownNow() hands back the tasks that never got a chance to run
            def tasks = scheduler.shutdownNow()
            tasks.each { pending ->
                pending.cancel(true)
            }
        }
    }
}
Caveats (and just to repeat yet again):
As implemented above, the closure called by the scheduled thread might block, or take a long time to run, or spawn its own thread and return immediately. As with all things concurrent in Java, you have to pay close attention to these issues or they’ll bite. Chances are, they’ll bite you anyway. That’s why I prefer an Actors metaphor, and thus Erlang.