Initializing Flux in Java Code to Support Runtime Configuration Properties
Introduction
Often times when our customers design workflows, the workflows act as templates and read in different values based on the location of the workflow. For example, let’s say I’ve created a workflow to automate file transfers for a customer, such as “ACME Bank”. The same workflow could likely be used to service all of my customers but with variables whose values are different based on which customer the workflow template is servicing. So, if I run the workflow template for ACME Bank, it may pull files from acmebank.com but if I run the workflow template for Wile Bank, it would put files from wilebank.com. We’ll look at how to setup a Flux environment to support this sort of flexible configuration to reduce overhead in maintaining workflows for many customers. You can easily reduce 200+ workflows down to one workflow using this technique as I’ve witnessed which drastically simplifies your Flux setup.
Initializing Flux
First, let’s create a Java class that will be responsible for initializing the Flux environment.
FluxController.java
import flux.Configuration;
import flux.Engine;
import flux.Factory;
import flux.runtimeconfiguration.RuntimeConfigurationNode;
public class FluxController {
private static Factory factory = Factory.makeInstance();
private static Engine engine;
private String enginePropertiesFilename = "engine.properties";
private String runtimePropertiesFilename = "runtime.properties";
public FluxController() throws Exception {
initializeEngine();
}
public Engine initializeEngine() throws Exception {
System.out.println("Flux version: " + flux.Factory.getVersion());
System.out.println("Copyright 2000-2011 Flux Corporation. All rights reserved.\n");
System.out.println("Creating Flux engine using properties file: " + enginePropertiesFilename + "...");
Configuration engineConfiguration = factory.makeConfigurationFromProperties(enginePropertiesFilename);
System.out.println("Flux engine created!");
engineConfiguration.setRuntimeConfiguration(loadRuntimeConfiguration());
engine = factory.makeEngine(engineConfiguration);
engine.start();
return engine;
}
private RuntimeConfigurationNode loadRuntimeConfiguration() throws Exception {
return new RuntimeConfigurationLoader(runtimePropertiesFilename).getRuntimeConfigurationRootNode();
}
public static Engine getEngine() {
return engine;
}
public static void disposeEngine() throws Exception {
if (engine != null) {
System.out.println("Disposing Flux engine...");
engine.dispose();
System.out.println("Flux engine is disposed!");
}
}
public static void main(String[] args) throws Exception {
new FluxController();
}
}
Loading Runtime Configuration Properties
The FluxController class above relies on RuntimeConfigurationLoader to read in the custom runtime configuration properties, or variables, that we’ll use in our workflow templates. Here’s what it looks like.
RuntimeConfigurationLoader
import flux.Factory;
import flux.runtimeconfiguration.RuntimeConfigurationNode;
import flux.xml.XmlFactory;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileReader;
import java.io.InputStream;
import java.util.Enumeration;
import java.util.List;
import java.util.Properties;
import java.util.StringTokenizer;
public class RuntimeConfigurationLoader {
private RuntimeConfigurationNode runtimeConfigurationRootNode = null;
public RuntimeConfigurationLoader(String filename) throws Exception {
Properties runtimeProperties = new Properties();
runtimeProperties.load(new FileReader(filename));
runtimeConfigurationRootNode = Factory.makeInstance().makeRuntimeConfigurationFactory().makeRuntimeConfiguration();
Enumeration names = runtimeProperties.propertyNames();
while (names.hasMoreElements()) {
String branch = (String) names.nextElement();
StringTokenizer st = new StringTokenizer(branch, "/");
int count = st.countTokens();
RuntimeConfigurationNode currentNode = runtimeConfigurationRootNode;
while (st.hasMoreElements()) {
String name = (String) st.nextElement();
if (count > 1) {
RuntimeConfigurationNode node = currentNode.getChild(name);
if (node == null) {
currentNode = currentNode.makeChild(name);
} else {
currentNode = node;
}
} else {
Object obj = null;
String value = runtimeProperties.getProperty(branch, "");
if (name.equalsIgnoreCase(RuntimeConfigurationNode.DEFAULT_FLOW_CHART_ERROR_HANDLER)) {
obj = loadFlowChart(value);
//} else if (name.equalsIgnoreCase(RuntimeConfigurationNode.CONCURRENCY_THROTTLE)) {
// Handle standard Flux runtime configuration properties here
} else {
obj = value;
}
if (obj != null) {
currentNode.put(name, obj);
} else {
System.out.println("Failed to load runtime configuration property: " + name);
}
}
count--;
}
}
}
public RuntimeConfigurationNode getRuntimeConfigurationRootNode() {
return runtimeConfigurationRootNode;
}
private Object loadFlowChart(String urlPath) throws Exception {
List flowCharts;
InputStream fileStream = new FileInputStream(new File(urlPath));
// Or load resource from class path
// InputStream fileStream = ClassLoader.getSystemResourceAsStream(resourceName);
try {
flowCharts = XmlFactory.makeInstance().makeXmlEngineHelper().makeFlowChartsFromXml(fileStream, false);
} finally {
fileStream.close();
}
if (flowCharts.size() > 0) {
return flowCharts.get(0);
} // if
return null;
}
}
Runtime Configuration Properties
Workflows can have variables defined that are only accessible within the workflow itself (flow chart variables). We can also define runtime configuration properties that all workflows can access or isolate the runtime configuration properties so they can be easily accessed within the context of a workflow. In the example mentioned in the introduction, we may want to define some custom properties like: /ACME Bank/username=acmeuser and /Wile Bank/username=wileuser. When we run the same workflow template for these different customers, the correct value will be used. Let’s see what the configuration file format would look like to achieve something like this.
runtime.properties
/businessUnit1/customConfigProperty=testValue1 /businessUnit2/customConfigProperty=testValue2
Test it!
No Java code would be complete without a test that we can automate! Let’s make sure we can initialize Flux, access our custom configuration properties, and shutdown Flux.
import flux.Engine;
import flux.runtimeconfiguration.RuntimeConfigurationNode;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
public class RuntimeConfigurationLoaderTest {
@Test
public void testLoad() throws Exception {
new FluxController();
Engine engine = FluxController.getEngine();
assertTrue(engine.isRunning());
RuntimeConfigurationNode businessUnitConfigNode1 = engine.getRuntimeConfiguration().getChild("businessUnit1");
assertNotNull(businessUnitConfigNode1);
Object customConfigProperty1 = businessUnitConfigNode1.getProperty("customConfigProperty");
assertNotNull(customConfigProperty1);
assertEquals("testValue1", customConfigProperty1.toString());
RuntimeConfigurationNode businessUnitConfigNode2 = engine.getRuntimeConfiguration().getChild("businessUnit2");
assertNotNull(businessUnitConfigNode2);
Object customConfigProperty2 = businessUnitConfigNode2.getProperty("customConfigProperty");
assertNotNull(customConfigProperty2);
assertEquals("testValue2", customConfigProperty2.toString());
FluxController.disposeEngine();
assertTrue(engine.isDisposed());
}
}
Conclusion
Now we can simply edit one of our workflows and change static values to: ${runtime customConfigProperty}. If the workflow runs under “/businessUnit1″, the value “testValue1″ would be used but when the same workflow runs under “/businessUnit2″, the value “testValue2″ would be used. Pretty handy, eh?
Flux Unit (functional, integration: make sure it works) Testing
Introduction
We take testing very seriously here @ Flux. Testing complex enterprise systems is not for the faint of heart as it can become very complicated in short order. Designing unit tests, functional tests, integration tests, user interface tests, and in-container tests (inside of enterprise application containers or a simple servlet container) are enough to make your head spin; much less keeping the systems up and running (not many QA teams have dedicated System and Database Administrators).
You may have landed on this blog post because you are considering building some automated tests to ensure that your enterprise software which uses Flux is up-to-snuff and enterprise-ready. No worries. Let’s keep it simple. We’ll start out by looking out some Java code which can be used as a starting point for your Flux related tests. The test code that we’ll consider today will cover unit, functional, and integration tests (as I like to call them: make sure it works tests). We’ll save in-container testing and user interface testing for another day.
Testing Environment
What’s required in a Flux testing environment? We’ll use JUnit (http://www.junit.org/), Apache log4j (http://logging.apache.org/log4j/), and of course Flux (http://fluxcorp.com/). You can use Flux with a lightweight in-memory database (H2: http://www.h2database.com/) or you can configure Flux with the database that you’ll use in production (or to test with multiple databases if you are an OEM embedding Flux in an application that supports multiple databases such as Oracle, SQL Server, DB2, MySQL, or PostgreSQL).
Abstract Flux Test
Let’s start by creating an abstract test which can be extended to create a Flux test for your application. Let’s call this AbstractFluxTest.java (a very sensible name). This abstract test will take care of initializing Flux, setting up the test, cleaning up after the test, and provide some convenience methods for things like loading flow charts from XML files (.FFC) and verifying that Flux flow charts and actions executed as expected.
AbstractFluxTest.java
import flux.*;
import flux.xml.XmlEngineHelper;
import flux.xml.XmlFactory;
import org.apache.log4j.Logger;
import org.junit.After;
import org.junit.Before;
import java.io.FileInputStream;
import java.util.Date;
import java.util.List;
public abstract class AbstractFluxTest {
protected Factory factory = Factory.makeInstance();
protected EngineHelper engineHelper = factory.makeEngineHelper();
protected Engine engine;
protected String enginePropertiesFile = "engine.properties";
protected boolean clearEngine = false;
protected Date startTime;
public static boolean fired = false;
public static final Logger log = Logger.getLogger("flux-test");
@Before
public void setUpTest() throws Exception {
log.info("[START] AbstractFluxTest.setUpTest");
startTime = new Date();
engine = factory.makeEngine(enginePropertiesFile);
if (clearEngine) {
engine.clear();
}
engine.start();
log.info("[END] AbstractFluxTest.setUpTest");
}
@After
public void tearDownTest() throws Exception {
log.info("[START] AbstractFluxTest.tearDownTest");
fired = false;
engine.dispose();
log.info("[END] AbstractFluxTest.tearDownTest");
}
@SuppressWarnings("unchecked")
protected FlowChart readFlowChartFFC(String fileName) throws Exception {
XmlEngineHelper xmlHelper = XmlFactory.makeInstance().makeXmlEngineHelper();
FileInputStream fileInputStream = new FileInputStream(fileName);
try {
List<FlowChart> flowCharts = xmlHelper.makeFlowChartsFromXml(fileInputStream, false);
return flowCharts.get(0);
} finally {
fileInputStream.close();
}
}
protected FlowChart readFlowChartFFCOnClassPath(String filename) throws Exception {
return readFlowChartFFC(this.getClass().getClassLoader().getResource(filename).getPath());
}
protected boolean didActionExecute(String namespace, String actionName) throws Exception {
log.info("didActionExecute namespace: " + namespace + " actionName: " + actionName);
ActionHistoryIterator it = engine.getActionHistory(namespace, actionName, startTime, new Date());
try {
return it.next() != null;
} catch (Exception e) {
return false;
} finally {
it.close();
}
}
public long waitForRuns(String namespace, int minimumExpectedRunCount, int delayInSeconds, int timeoutInSeconds) throws Exception {
long runCount = 0;
long totalTime = 0;
int delayInMillis = delayInSeconds * 1000;
String initialMsg = "waitForRuns namespace:" + namespace + " minimumExpectedRunCount: " + minimumExpectedRunCount + " delayInSeconds: " + delayInSeconds +
" timeoutInSeconds: " + timeoutInSeconds;
log.info(initialMsg);
while (runCount < minimumExpectedRunCount && totalTime < timeoutInSeconds) {
runCount = engine.getRunCount(namespace);
log.info(initialMsg + " runCount: " + runCount + " totalTime: " + totalTime);
if (runCount < minimumExpectedRunCount) {
Thread.sleep(delayInMillis);
totalTime += delayInSeconds;
}
}
return runCount;
}
}
Flux Engine Configuration
AbstractFluxTest loads the Flux engine configuration from the file: engine.properties. You can add more configuration properties to match your production environment. Here’s the recommended minimum configuration for a testing environment:
engine.properties
database_type=H2 logger_type=log4j
Apache log4j Configuration
I’ve decided to use Apache log4j for logging in these tests. I’ve chosen log4j for two reasons: it’s popular and Flux can tie into log4j (you can configure Flux and your test code to log to the same or different files and some other crazy things). Here’s a simple log4j configuration that will log the test messages (logged via log.info() in the test code) to both the console and the flux-test.log file (where the Flux engine is configured to log to as well via the engine.properties file: logger_type=log4j).
log4j.rootLogger=INFO,FileAppender log4j.logger.flux-test=INFO,ConsoleAppender log4j.appender.ConsoleAppender.Threshold=INFO log4j.appender.ConsoleAppender=org.apache.log4j.ConsoleAppender log4j.appender.ConsoleAppender.layout=org.apache.log4j.PatternLayout log4j.appender.ConsoleAppender.layout.ConversionPattern=%d %-4r %-5p [%t] %10c %3x - %m%n\ log4j.appender.FileAppender=org.apache.log4j.RollingFileAppender log4j.appender.FileAppender.MaxFileSize=50MB log4j.appender.FileAppender.MaxBackupIndex=10 log4j.appender.FileAppender.File=flux-test.log log4j.appender.FileAppender.layout=org.apache.log4j.PatternLayout log4j.appender.FileAppender.layout.ConversionPattern=%d %-4r %-5p [%t] %15c %3x - %m%n\
Sample Flux Test
Now that we have the test framework in place, let’s take a look at a simple Flux test.
FluxTest.java
import flux.FlowChart;
import flux.JavaAction;
import flux.TimerTrigger;
import org.junit.Test;
import static org.junit.Assert.assertTrue;
public class FluxTest extends AbstractFluxTest {
@Test
public void testFFCFile() throws Exception {
String namespace = "/FluxFlowChart"; log.info("[START] FluxTest.testFFCFile");
FlowChart flowChart = readFlowChartFFC("flowCharts/FluxFlowChart.ffc");
flowChart.setName(namespace);
engine.put(flowChart);
waitForRuns(namespace, 1, 10, 120);
didActionExecute("/FluxFlowChart", "Timer Trigger");
didActionExecute("/FluxFlowChart", "Java Action");
assertTrue(fired);
log.info("[END] FluxTest.testFFCFile");
}
@Test
public void testJavaAPI() throws Exception {
log.info("[START] FluxTest.testJavaAPI");
String namespace = "/FluxFlowChart";
FlowChart flowChart = engineHelper.makeFlowChart(namespace);
TimerTrigger timerTrigger = flowChart.makeTimerTrigger("Timer Trigger");
timerTrigger.setTimeExpression("+1m");
JavaAction javaAction = flowChart.makeJavaAction("Java Action");
javaAction.setListener(FluxTestListener.class);
timerTrigger.addFlow(javaAction);
engine.put(flowChart);
waitForRuns(namespace, 1, 10, 120);
didActionExecute("/FluxFlowChart", "Timer Trigger");
didActionExecute("/FluxFlowChart", "Java Action");
assertTrue(fired);
log.info("[END] FluxTest.testJavaAPI");
}
}
FluxTest.java provides to tests: one for testing a flow chart which was designed using the visual Flux flow chart designer and another that creates the flow chart to test using the Flux Java API. In testFFCFile(), we load the flow chart from the file FluxFlowChart.ffc, ensure that the name is set correctly, and verify that the actions and Java code executed as expected.
Here’s what the flow chart looks like that is loaded from FluxFlowChart.ffc:

Flux Test Listener
Many Flux customers use Java Action’s in their flow charts. A Java Action allows flow charts (which are often designed visually by connecting various components together with conditions) to execute some Java code. in FluxTest above, we designed a flow chart that executes FluxTestListener. For illustration purposes, this listener code is designed to update a variable in the test case which can be asserted via JUnit after the flow chart completes. This illustrates how we can ensure that the actual Java code executed as anticipated.
FluxTestListener.java
import flux.ActionListener;
import flux.KeyFlowContext;
public class FluxTestListener implements ActionListener {
public Object actionFired(KeyFlowContext flowContext) throws Exception {
FluxTest.fired = true;
flowContext.getLogger().info("FluxTestListener.actionFired!");
return null;
}
}
Conclusion
Well, that should give you enough to get you started testing with Flux. I’ll follow-up with some more Flux test related blog posts (maybe detecting errors from the Flux engine via Apache log4j will be next). If you have questions about testing your Flux based software or would like more code examples, just reach out to Flux Support and we’ll be sure to satisfy your Flux testing appetite. Have fun!
Flux Mobile on iPhone, iPad, Android, and Blackberry!
Flux operators are responsible for overseeing the automated jobs in their environment. These jobs often run during non-working hours. Having quick access to system and job status from mobile devices can be really beneficial for some operators.
Specific applications can be developed quickly to allow these operators to administer their jobs from mobile devices. I developed a mobile application that uses the backend REST API in about 2 hours. The REST API is accessible in Flux 7.10 (since the new Flux Operations Console is based on the REST backend) and will be fully documented with examples in Flux 7.11.
Here’s a quick video showing “Flux Mobile” running on Flux 7.10:
Are you interested in viewing the status of your Flux jobs from a mobile device? I’d love to hear from you!
Data Warehouse Automation
“We’re trying to reduce our 2 a.m. calls, like everyone else.” That’s what I heard when a CIO came to us looking to solve problems in his data center. He didn’t sound excited about it. But, he knew that if they could find solutions to common problems they’ve encountered, that there could be a tremendous positive impact on the organization.
How often are your data center operators called in to rescue you from a bad situation? Can these situations be avoided to reduce these calls? Data warehouse automation is an area where workload automation can truly shine.
- Did the data arrive from the customer as expected? If not, does the customer need to be notified?
- Was the data intact and valid?
- Is the ETL process started as soon as the data is available?
- Could the data be extracted, transformed, and loaded without any errors?
- If there are errors, are they resolved automatically with proper logging, auditing, notifications, and reporting?
- Does the entire process need to start over when an error occurs, or can parts of the process be restarted to increase throughput?
- Is report generation started as soon as the data is available?
- Are reporting errors handled automatically?
- Is the report sent to recipient as soon as it’s available?
- Are multiple delivery mechanisms supported, such as: FTP, FTPS, SFTP, UNC, JMS, etc?
Are you looking to solve some of these problems with your data warehouse automation solution? Great! I’ve got some news for you! Flux 7.11 will include Cognos integration straight out of the box! We’re in the preliminary stages of planning support for Informatica as well. With support planned for Informatica and Cognos, Flux should be on your radar.
This ain’t your grandpa’s job scheduler!
We’ve been working on redesigning and rewriting the Flux Operations Console from the ground up. It’s designed to be responsive and scale well to provide you with the latest runtime stats on your distributed enterprise jobs in one central webapp.
Here’s a quick video (1 min 26 secs) that illustrates the performance and scalability of the new Ops Console:
Flux Operations Console Performance Video
This video was taken when around 6,000 flow charts were running in the cluster and a background thread was adding flow charts constantly. Google Chrome on Mac was used for the browser.
Still using cron, Autosys, CONTROL-M, Quartz? Does the new Flux Ops Console make you jealous?
Interactive Flux Architecture Diagram
We’re working on an interactive Flux architecture diagram that will be included in the next version of the Flux Operations Console. This diagram is displayed to first time Flux users to help them understand the Flux components and how they relate to each other.
You can take a peek at the diagram here:
http://screencast.com/t/YTljNTU5Zj
PS – Have no fear iPad users! Flash was not used for the interactive diagram.
Automation in the cloud: debunking marketing hype
The latest craze in the enterprise job scheduling industry seems to be cloud computing. It can be difficult to determine what a job scheduling product offers to address cloud computing by listening to presentations given by the executives.
- “Support for events is necessary”
- “Linking tasks together”
- “Security”
What is different about job scheduling in the cloud and a traditional enterprise production environment? If you move your data center to Amazon EC2 or another cloud provider, how does the move impact your enterprise jobs?
I was on a conference call with a new Flux customer yesterday discussing how they can leverage Flux more in their cloud environment. It seems pretty simple to me. Servers are static in a traditional data center. Your data center should scale dynamically when moved to the cloud. Today you’re running 5 servers to handle your load. Next month you may need to add 2 additional servers to handle the load. Scaling to meet demand should be automated. Configure the cloud server images with the necessary software and then start/stop instances from APIs or a user interface.
How should job scheduling software support cloud environments? Supporting the ability to add and remove servers dynamically is key.

