Scope

Solution

Pattern

// Nodes: the job chain (orange), its jobs (light sky blue) and the
// per-Agent tasks of the "Create Balance Statements" job (white).
job_chain        [label="Job Chain", fillcolor="orange"]
job_split        [label="Job Split", fillcolor="lightskyblue"]
job_create       [label="Job Create Balance Statements", fillcolor="lightskyblue"]
job_create1      [label="Task Create Balance Statements\non Agent 1", fillcolor="white"]
job_create2      [label="Task Create Balance Statements\non Agent 2", fillcolor="white"]
job_create3      [label="Task Create Balance Statements\non Agent 3", fillcolor="white"]
job_create4      [label="Task Create Balance Statements\non Agent 4", fillcolor="white"]
job_create5      [label="Task Create Balance Statements\non Agent 5", fillcolor="white"]
job_sync         [label="Job Synchronize", fillcolor="lightskyblue"]
job_do_something [label="Job DoSomething", fillcolor="lightskyblue"]

// Edges: the chain starts with the split job, which leads to the create job.
job_chain -> job_split -> job_create

// Fan out: the create job runs as one task per Agent.
job_create -> { job_create1 job_create2 job_create3 job_create4 job_create5 }

// Fan in: every task leads to the synchronize job.
{ job_create1 job_create2 job_create3 job_create4 job_create5 } -> job_sync

// Processing continues with a single job after synchronization.
job_sync -> job_do_something

Implementation

Components

 

<?xml version="1.0" encoding="ISO-8859-1"?>
<!--
    Split job: reads the account range (min_account, max_account) from the
    job and order parameters, determines the Agents configured in the process
    class of the next job in the job chain, checks which of them respond, and
    adds one child order per chunk of the account range to the job chain.
    The parent order is then moved to the state named by sync_state_name.
-->
<job  order="yes" stop_on_error="no" title="split processing for execution on a number of agents">

    <params >
        <param  name="sync_state_name" value="sync"/>
    </params>

    <script  language="java:javascript">
        <![CDATA[
/**
 * Splits the account range of the current order into child orders.
 *
 * Parameters (job parameters merged with order parameters, order merged last):
 *   min_account      first account number of the range (inclusive)
 *   max_account      last account number of the range (inclusive)
 *   sync_state_name  job chain state used as end state of the child orders
 *                    and as next state of the parent order
 *
 * Returns true when all child orders have been added and the parent order has
 * been moved to the sync state. Returns false (after logging an error) when a
 * parameter is missing or invalid, when the JobScheduler Master does not answer
 * a <show_state> command, or when no Agent is configured or available.
 * Throws when the next job or its process class assignment cannot be found.
 */
function spooler_process()
{
    var order = spooler_task.order;
    var syncParameterName = order.job_chain.name + '_required_orders';

    // get job and order parameters
    var params = spooler.create_variable_set();
    params.merge( spooler_task.params );
    params.merge( order.params );

    // report every missing parameter, then stop: continuing would create
    // child orders with NaN account ranges
    var requiredParameters = [ 'min_account', 'max_account', 'sync_state_name' ];
    var parametersComplete = true;
    for ( var p = 0; p < requiredParameters.length; p++ )
    {
        var requiredName = requiredParameters[p];
        if ( !params.value( requiredName ) )
        {
            spooler_log.error( 'parameter missing: ' + requiredName );
            parametersComplete = false;
        }
    }

    if ( !parametersComplete )
    {
        return false;
    }

    // explicit radix: account numbers with leading zeros must not be read as octal
    var minAccount = parseInt( String( params.value( 'min_account' ) ), 10 );
    var maxAccount = parseInt( String( params.value( 'max_account' ) ), 10 );
    var syncStateName = params.value( 'sync_state_name' );

    if ( isNaN( minAccount ) || isNaN( maxAccount ) || minAccount > maxAccount )
    {
        spooler_log.error( 'invalid account range: min_account=' + params.value( 'min_account' ) + ', max_account=' + params.value( 'max_account' ) );
        return false;
    }

    // the job of the next job chain node carries the process class assignment
    var nextJob = order.job_chain_node.next_node.job;
    var jobFolderPath = nextJob.folder_path;
    var jobName = nextJob.name;

    var jobCommand = '<show_state subsystems="folder job" what="source folders no_subfolders" path="' + jobFolderPath + '"/>';
    spooler_log.debug( '.. executing xml: ' + jobCommand );
    var jobResponse = spooler.execute_xml( jobCommand );

    if ( !jobResponse )
    {
        spooler_log.error( 'no response from JobScheduler Master for command: <show_state>' );
        return false;
    }

    spooler_log.debug( '.... show state for job: ' + jobResponse );
    var jobDOM = new Packages.sos.xml.SOSXMLXPath( new java.lang.StringBuffer( jobResponse ) );

    // get process class assignment from job
    // NOTE(review): the query matches job/@path against "/" + job name, which
    // presumably only fits jobs in the root folder; confirm for sub-folders
    var jobQuery = '//folder[@path = "' + jobFolderPath + '"]/jobs/job[@path = "/' + jobName + '"]';
    spooler_log.debug( '.... query process class assignment from job: ' + jobQuery );

    var jobNode = jobDOM.selectSingleNode( jobQuery );
    if ( !jobNode )
    {
        throw 'no configuraton found for job: ' + jobName;
    }

    var processClassAttribute = jobNode.getAttribute( 'process_class' );
    var processClassPath = processClassAttribute ? String( processClassAttribute ) : '';
    if ( !processClassPath )
    {
        throw 'no process class assignment found for job: ' + jobName;
    }

    var processClassDirectory;
    var lastSlash = processClassPath.lastIndexOf( '/' );
    if ( lastSlash > -1 )
    {
        processClassDirectory = processClassPath.substring( 0, lastSlash );
    } else {
        // process class given by name only: use the directory of the job chain
        // and build the full path from the directory and the given name
        var jobChainPath = String( order.job_chain.path );
        processClassDirectory = jobChainPath.substring( 0, jobChainPath.lastIndexOf( '/' ) );
        processClassPath = processClassDirectory + '/' + processClassPath;
    }

    var processClassCommand = '<show_state subsystems="folder process_class" what="source folders no_subfolders" path="' + processClassDirectory + '"/>';
    spooler_log.debug( '.. executing xml: ' + processClassCommand );
    var processClassResponse = spooler.execute_xml( processClassCommand );

    if ( !processClassResponse )
    {
        spooler_log.error( 'no response from JobScheduler Master for command: <show_state>' );
        return false;
    }

    spooler_log.debug( '.... show state for process class: ' + processClassResponse );
    var processClassDOM = new Packages.sos.xml.SOSXMLXPath( new java.lang.StringBuffer( processClassResponse ) );

    // get Agents from process class configuration
    var xmlAgents = processClassDOM.selectNodeList( '//process_classes/process_class[@path = "' + processClassPath + '"]//remote_scheduler' );

    if ( xmlAgents.getLength() == 0 )
    {
        spooler_log.error( 'no process class found for expected path: ' + processClassPath );
        return false;
    }

    // check for available Agents
    var availableAgents = [];

    for ( var a = 0; a < xmlAgents.getLength(); a++ )
    {
        var agentId = xmlAgents.item( a ).getAttribute( 'remote_scheduler' );
        var agentService = agentId + '/jobscheduler/agent/api/';
        spooler_log.debug( '.... Agent configuration found: ' + agentId );

        var agentResponse = false;
        try {
            agentResponse = com.sos.jitl.restclient.JobSchedulerRestClient.executeRestService( agentService );
        } catch (e) {
            spooler_log.info( '.... Agent Service [' + agentService + '] returns error: ' + String(e) );
            agentResponse = false;
        }

        var skipAgent = false;

        if ( agentResponse )
        {
            spooler_log.debug( '.... output of Agent response: ' + agentResponse );

            try {
                // NOTE(review): eval executes whatever the Agent web service
                // returns; consider JSON.parse if the script engine provides it
                eval( "var jsonObject = " + agentResponse + ";" );

                if ( jsonObject.isTerminating )
                {
                    spooler_log.info( '.... Agent Service [' + agentService + '] signals termination' );
                    skipAgent = true;
                }
            } catch (e) {
                // a response that cannot be evaluated is treated like an unavailable Agent
                spooler_log.info( '.... Agent Service [' + agentService + '] returns error: ' + String(e) );
                skipAgent = true;
            }
        } else {
            spooler_log.info( '.... no response from Agent web service at: ' + agentService );
            skipAgent = true;
        }

        if ( skipAgent )
        {
            spooler_log.info( '.... skipping unavailable Agent: ' + agentId );
        } else {
            availableAgents.push( agentId );
        }
    }

    if ( availableAgents.length == 0 )
    {
        // without this check the parent order would pass the sync state alone
        spooler_log.error( 'no available Agent found for process class: ' + processClassPath );
        return false;
    }

    // sample: one child order per available Agent, but never more child
    // orders than accounts so that no child order gets an empty range
    var totalAccounts = maxAccount - minAccount + 1;
    var childOrderCount = Math.min( availableAgents.length, totalAccounts );
    var baseChunkSize = Math.floor( totalAccounts / childOrderCount );
    var chunkRemainder = totalAccounts % childOrderCount;
    var subMaxAccount = minAccount - 1;

    for ( var i = 0; i < childOrderCount; i++ )
    {
        // the first chunkRemainder child orders take one additional account,
        // the ranges are contiguous and the last one ends at maxAccount
        var subMinAccount = subMaxAccount + 1;
        var chunkSize = baseChunkSize + ( i < chunkRemainder ? 1 : 0 );
        subMaxAccount = subMinAccount + chunkSize - 1;

        // create child order
        var subOrder = spooler.create_order();
        var subParams = spooler.create_variable_set();
        // required orders at the sync state: all child orders plus the parent order
        subParams.set_var( syncParameterName, String( childOrderCount + 1 ) );
        subParams.set_var( 'sync_session_id', order.id );
        subParams.set_var( 'number_of_orders', String( childOrderCount ) );
        subParams.set_var( 'min_account', String( subMinAccount ) );
        subParams.set_var( 'max_account', String( subMaxAccount ) );
        subOrder.params = subParams;
        subOrder.id = order.id + '_child_order_' + ( i + 1 );
        subOrder.title = 'child order of parent order: ' + order.id;
        subOrder.state = order.job_chain_node.next_state;
        subOrder.end_state = syncStateName;

        // launch child order
        order.job_chain.add_order( subOrder );
        spooler_log.info( '.. child order ' + ( i + 1 ) + ' has been added for range: ' + subMinAccount + ' - ' + subMaxAccount + ': ' + subOrder.id );
    }

    // move the parent order to the sync state, it counts as one of the required orders
    order.params.set_var( syncParameterName, String( childOrderCount + 1 ) );
    order.params.set_var( 'sync_session_id', order.id );
    order.state = syncStateName;

    return true;
}
        ]]>
    </script>
    <run_time />
</job>

Explanations

 

<!--
    Order job assigned to the process class "end_of_day", with up to 10 tasks.
    The shell script prints the host and port it runs on and the account range
    taken from SCHEDULER_PARAM_MIN_ACCOUNT / SCHEDULER_PARAM_MAX_ACCOUNT
    (presumably populated from the order parameters min_account and
    max_account; confirm), then pings localhost five times, which presumably
    stands in for the real processing in this sample.
-->
<job order="yes"
     process_class="end_of_day"
     tasks="10"
     stop_on_error="no"
     stderr_log_level="error">
    <params/>
    <script language="shell">
        <![CDATA[
@echo "job running on Agent: %SCHEDULER_HOST%:%SCHEDULER_HTTP_PORT%"
@echo "creating balance statements for clients: %SCHEDULER_PARAM_MIN_ACCOUNT% - %SCHEDULER_PARAM_MAX_ACCOUNT%"
ping -n 5 localhost
        ]]>
    </script>
    <run_time/>
</job>

Explanations

 

<!--
    Process class configuration with max_processes="10" that lists three
    Agents (server1, server2, server3, each on port 4445) as remote_scheduler
    entries; the remote_schedulers element uses select="next".
-->
<process_classes >
    <process_class  max_processes="10">
        <remote_schedulers  select="next">
            <remote_scheduler  remote_scheduler="http://server1:4445"/>
            <remote_scheduler  remote_scheduler="http://server2:4445"/>
            <remote_scheduler  remote_scheduler="http://server3:4445"/>
        </remote_schedulers>
    </process_class>
</process_classes>

Explanations

 

<!--
    Sample order that sets the account range through the parameters
    min_account (100000) and max_account (200000); the run_time element
    is empty.
-->
<order>
    <params>
        <param name="min_account" value="100000"/>
        <param name="max_account" value="200000"/>
    </params>
    <run_time/>
</order>

Explanations

Usage