...
Code Block |
---|
/** * @return The row distribution code (plugin id) */ public String getCode(); /** * @return The row distribution description (plugin description) */ public String getDescription(); /** * Do the actual row distribution in the step. * @param rowMeta the meta-data of the row to distribute * @param row the data of the row to distribute * @param stepInterface The step to distribute the rows in * @throws KettleStepException presumably when the row cannot be distributed (not specified here; confirm against the implementations) */ public void distributeRow(RowMetaInterface rowMeta, Object[] row, StepInterface stepInterface) throws KettleStepException; /** * Which mini-icon needs to be shown on the hop? * * @return the EImage code of the icon to show, or null if the standard icon needs to be used. Also return null if you don't want to display an icon. */ public EImage getDistributionImage(); |
Example
The following code example implements a distribution algorithm that tries to lower the parallelism or, more accurately, forces asynchronous operation of the target steps, as they will get out of step with each other. The output row set buffers are filled up one by one. If one is full, the algorithm moves on to the next one:
Code Block |
---|
package org.pentaho.di.trans.step;
import java.util.concurrent.TimeUnit;
import org.pentaho.di.core.RowSet;
import org.pentaho.di.core.exception.KettleStepException;
import org.pentaho.di.core.gui.PrimitiveGCInterface.EImage;
import org.pentaho.di.core.row.RowMetaInterface;
/**
 * Row distribution plugin that fills the output row sets one by one.
 *
 * <p>A row is offered to the step's current output row set. When that row set does not accept
 * the row within a very short wait, the plugin advances to the next output row set (wrapping
 * around to the first one) and tries again.
 */
@RowDistributionPlugin(
  code = "Overflow",
  name = "Overflow",
  description = "When an output row set is full, move to the next one"
)
public class OverflowRowDistributionPlugin implements RowDistributionInterface {

  /**
   * @return The row distribution code (plugin id); same value as the annotation's {@code code}
   */
  @Override
  public String getCode() {
    return "Overflow";
  }

  /**
   * @return The row distribution description; same value as the annotation's {@code description}
   */
  @Override
  public String getDescription() {
    return "When an output row set is full, move to the next one";
  }

  /**
   * Puts the row into the first output row set that accepts it, starting at the step's current
   * output row set and cycling through the others.
   *
   * <p>The step's current output row set number is updated as the search advances, so the next
   * row starts at the row set that accepted (or was last tried for) this one.
   *
   * @param rowMeta the meta-data of the row to distribute
   * @param row the data of the row to distribute
   * @param step the step whose output row sets receive the row
   * @throws KettleStepException declared by the interface; this implementation does not throw
   *     it directly
   */
  @Override
  public void distributeRow(RowMetaInterface rowMeta, Object[] row, StepInterface step)
      throws KettleStepException {
    RowSet rowSet = step.getOutputRowSets().get(step.getCurrentOutputRowSetNr());

    // Wait only 1 ns per attempt: a row set that cannot take the row right away is skipped.
    while (!rowSet.putRowWait(rowMeta, row, 1, TimeUnit.NANOSECONDS)) {
      // Without this check the loop would spin forever when every output row set is full
      // and the transformation has been stopped (nothing drains the buffers anymore).
      if (step.isStopped()) {
        return;
      }

      // Compute the wrapped index first so the step never holds an out-of-range number.
      int nextRowSetNr = step.getCurrentOutputRowSetNr() + 1;
      if (nextRowSetNr > step.getOutputRowSets().size() - 1) {
        nextRowSetNr = 0;
      }
      step.setCurrentOutputRowSetNr(nextRowSetNr);
      rowSet = step.getOutputRowSets().get(nextRowSetNr);
    }
  }

  /**
   * @return always {@code null}: no dedicated mini-icon is provided for this distribution
   */
  @Override
  public EImage getDistributionImage() {
    return null;
  }
}
|