Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

  • The solution implements a job expect that can be added to the top of any job chain.
    • This job implements a spooler_process() function that checks if an order matches the expected value and that suspends non-matching orders.
      • Expected values can be provided by incoming orders
        • by use of parameters:
          • each incoming order is assumed to provide a parameter that contains the expected value. 
            • Example for use with oder parameter: business_date = 2015-11-01
          • the name of the incoming orders' parameter is specified as the value of the control_order_expected_parameter parameter to the expect job.
            • Example for use with expect job parameter: control_order_expected_parameter= business_date
        • without parameters: 
          • the job will expect a numeric order id to be provided and increments the order id to calculcate the next expected value.
      • For expected values a function has to be provided that calculates the next expected value:
        • the function is specified by use of the conctrol_order_expected_value_function parameter to the expect job. The function is implemented with some JavaScript code that returns the next expected value based on the current value that is provided with the currentValue variable:
          • Example for numeric calculation: 
            • parseInt(currentValue) + 1
            • Explanation: the function parses the current expected numeric value and returns the incremented value
          • Example for date calculcation:
            • (new Date( (new Date(currentValue)).setDate( (new Date(currentValue)).getDate()+1 ) )).toISOString().substring(0,10);
            • Explanation: the function accepts the current date value, adds one day and returns the date in ISO format, e.g. 2015-11-01
      • For expected values a default value can be provided that is used to check the expected value of the first incoming order:
        • the default value can be specified by use of the control_order_expected_default_value parameter to the expect job.
          • Example for use with numeric default value: control_order_expected_default_value = 1
          • Example for use with date default value: control_order_expected_default_value = 2015-11-01
        • without specification of a default value the value of the first order processed by the expect job will be used as a default.
    • This job is configured for a single task, i.e. it executes incoming orders sequentually.
    • Having received an order with the expected value this job moves that order to the next job node in the job chain and activates any suspended orders to be rechecked if they provide the next expected value.
  • The solution automatically creates a control order that is used to store the next expected value.
    • The name of the control order is created from the prefix "control_order_" and the state that the expect job node is assigned.
    • The control order is suspended and not processed by subsequent job nodes. Its only purpose is to carry the next expected value for use of the solution with JobScheduler Agents and cluster members.
  • The sample makes use of a job chain job_chain_expected_orders that includes the job nodes for the expect job and a hello job. The job chain accepts Ad Hoc orders that are added by use of JOC and the job chain can easily be modified to watch for incoming files and to create one order for each file.
  • Hint: to re-use the expect job you can
    • store the job to some central folder and reference the job in individual job chains.
    • move the JavaScript code of the job to come central location and use a corresponding <include> element for the individual job scriptscripts.

 

Code Block
languagejs
titleJob expect
collapsetrue
var jobChainPath = null;
var jobChainNodeState = null;

var orderExpectedValue = null;
var orderExpectedDefaultValue = null;

var orderExpectedValueFunction = "parseInt(currentValue) + 1";
var controlOrderPrefix = "control_order_";
var controlOrderID = null;

function executeXML( command ) {
  var rc = false;

  spooler_log.debug( ".... executing xml command: " + command );
  var response = spooler.execute_xml( command );
  var xmlDOM = new Packages.sos.xml.SOSXMLXPath( new java.lang.StringBuffer( response ) );
  var errorCode = xmlDOM.selectSingleNodeValue( "//ERROR/@code" );
  var errorText = xmlDOM.selectSingleNodeValue( "//ERROR/@text" );
  if ( errorCode || errorText ) {
    spooler_log.error( ".... xml response: errorCode=" + errorCode + ", errorText=" + errorText );
  } else {
    rc = xmlDOM;
  }

  return rc;
}

function getOrderExpectedValue() {
  if (orderExpectedValue == null) {
    orderDOM = executeXML( "<show_order job_chain='" + jobChainPath + "' order='" + controlOrderID + "' what='payload'/>" );
    if ( orderDOM ) {
      orderExpectedValue = orderDOM.selectSingleNodeValue( "/spooler/answer/order/payload/params/param[@name = '" + controlOrderID + "']/@value" );
    }
  }

  if ( !orderExpectedValue ) {
    orderExpectedValue = orderExpectedDefaultValue;
    spooler_log.info( ".... getOrderExpectedValue(): " + orderExpectedValue + " (default)" );
  } else {
    spooler_log.info( ".... getOrderExpectedValue(): " + orderExpectedValue );
  }

  return orderExpectedValue;
}

function setOrderExpectedValue( expectedValue ) {
  spooler_log.info( ".... setOrderExpectedValue(): " + expectedValue );
  var rc = true;
  orderExpectedValue = expectedValue;

  // check existence of control order, if historic then its state does not match the job node state
  var orderDOM = executeXML( "<show_order job_chain='" + jobChainPath + "' order='" + controlOrderID + "' what='payload'/>" );
  if ( orderDOM ) {
    var orderState = orderDOM.selectSingleNodeValue( "/spooler/answer/order/@state" );
    if (!orderState || orderState != jobChainNodeState) {
      rc = false;
    }
  } else {
    rc = false;
  }

  // add or modify control order
  if (!rc) {
    orderDOM = executeXML( "<add_order job_chain='" + jobChainPath + "' id='" + controlOrderID + "' state='" + jobChainNodeState + "' suspended='yes'><params><param name='" + controlOrderID + "' value='" + orderExpectedValue + "'/></params><run_time/></add_order>" );
  } else {
    orderDOM = executeXML( "<modify_order job_chain='" + jobChainPath + "' order='" + controlOrderID + "' state='" + jobChainNodeState + "' suspended='yes'><params><param name='" + controlOrderID + "' value='" + orderExpectedValue + "'/></params><run_time/></modify_order>" );
  }
  spooler_job.state_text = "next expected value: " + orderExpectedValue;
}

function calculateOrderExpectedValue( currentValue ) {
  // calculate next date for a date parameter
  // var expectedValue = (new Date( (new Date(currentValue)).setDate( (new Date(currentValue)).getDate()+1 ) )).toISOString().substring(0,10);
  // calculate increment for a numeric parameter or order id
  // var expectedValue = parseInt(currentValue) + 1;

  var expectedValue = eval( orderExpectedValueFunction );
  return expectedValue;
}

function spooler_init() {
  jobChainPath = spooler_task.order.job_chain.path;
  jobChainNodeState = spooler_task.order.job_chain_node.state;
  controlOrderID = controlOrderPrefix + spooler_task.order.job_chain_node.state;

  return true;
}

function spooler_process() {
  var rc = true;

  // control order is always suspended
  if (spooler_task.order.id == controlOrderID) {
    spooler_log.info( ".. control order identified, processing suspended" );
    suspendOrder();
    return rc;
  }

  // merge parameters from task and order
  var params = spooler.create_variable_set();
  params.merge( spooler_task.params );
  params.merge( spooler_task.order.params );

  // get expected value from a parameter name or from the Order ID
  spooler_log.info( ".. control parameter for expected value is looked up: " + controlOrderPrefix + "expected_parameter" );
  var expectedParameter = params.value( controlOrderPrefix + "expected_parameter" );
  if (expectedParameter) {
    var currentValue = params.value( expectedParameter );
    spooler_log.info( ".. current value is used from control parameter [" + expectedParameter + "]: " + currentValue );
  } else {
    var currentValue = spooler_task.order.id;
    spooler_log.info( ".. current value is used from order id: " + currentValue );
  }

  // get expected default value from a parameter or from the order id
  if (!orderExpectedDefaultValue) {
    orderExpectedDefaultValue = params.value( controlOrderPrefix + "expected_default_value" );
    if (orderExpectedDefaultValue) {
      spooler_log.info( ".. default value is used from control parameter [" + controlOrderPrefix + "expected_default_value]: " + orderExpectedDefaultValue );
    } else {
      spooler_log.info( ".. default value is used from order id: " + spooler_task.order.id );
      orderExpectedDefaultValue = spooler_task.order.id;
    }
  }

  // get expected value calculation function
  var expectedValueFunction = params.value( controlOrderPrefix + "expected_value_function" );
  if (expectedValueFunction) {
    orderExpectedValueFunction = expectedValueFunction;
    spooler_log.info( ".. using expected value function: " + orderExpectedValueFunction );
  } else {
    spooler_log.info( ".. using expected value default function: " + orderExpectedValueFunction );
  }

  // check if order value matches expectation
  if ( getOrderExpectedValue() == currentValue ) {
    // after processing of the current order all suspended orders are activated
    spooler_log.info( ".. current order provides expected value: " + getOrderExpectedValue() );
    activateSuspendedOrders();
    setOrderExpectedValue( calculateOrderExpectedValue( getOrderExpectedValue() ) );
  } else {
    // suspend non-matching order
    spooler_log.info( ".. suspending current order: expected value=" + getOrderExpectedValue() + ", current value=" + currentValue );
    suspendOrder();
  }

  return rc;
}

function suspendOrder() {
    spooler_task.order.suspended = true;
    spooler_task.order.state = jobChainNodeState;
}

function activateSuspendedOrders() {
  var rc = true;
  var orderList = Array();

  // select suspended orders of the current job node
  var orderDOM = executeXML( "<show_job_chain job_chain='" + jobChainPath + "' what='job_chain_orders'/>" );
  var orderNodes = orderDOM.selectNodeList( "/spooler/answer/job_chain/job_chain_node[@state = '" + jobChainNodeState + "']/order_queue/order[@suspended = 'yes']" );

  // traverse order list and add orders to sort array
  for( orderIndex=0; orderIndex<orderNodes.getLength(); orderIndex++ ) {
    var orderNode = orderNodes.item(orderIndex);
    var orderID = orderDOM.selectSingleNodeValue( orderNode, "@id" );
    if (orderID == null || controlOrderID) {
      continue;
    }
    spooler_log.info( ".... suspended order found: " + orderID );
    orderList.push( orderID );
  }

  // alphabetical string sort
  orderList.sort(function(a, b){return (a > b) - (a < b) });
  // numeric sort
  // orderList.sort(function(a, b){return b - a) });

  for(i=0; i<orderList.length; i++) {
    spooler_log.info( ".... activating order: " + orderList[i] );
    orderDOM = executeXML( "<modify_order job_chain='" + jobChainPath + "' order='" + orderList[i] + "' state='" + jobChainNodeState + "' suspended='no'/>" );
    if ( !orderDOM ) {
      rc = false;
    }  
  }

  return rc;
}

...