by Rahul

Apache NiFi provides various options to retry or wait in processors. However, if your custom processor has to wait on a certain condition or external resource and you rely only on the out-of-the-box NiFi solutions, you end up with a complex workflow.

Alternatively, you could call Thread.sleep in a loop until the expected condition is satisfied. I wouldn’t recommend doing this, as the sleeping processor holds on to its thread and blocks flow-file execution for as long as it waits.
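To make the blocking concrete, here is a minimal plain-Java sketch of the sleep-and-poll approach. It deliberately does not use the NiFi API; the class name BlockingWait, the method waitForFile and its parameters are made up for illustration.

```java
import java.io.File;

public class BlockingWait {

    /**
     * Polls until the file exists or the timeout elapses.
     * The calling thread is held for the entire wait, which is the drawback.
     */
    public static boolean waitForFile(File file, long pollMillis, long timeoutMillis) {
        final long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
        while (!file.exists()) {
            if (System.nanoTime() >= deadline) {
                return false; // gave up: the file never appeared
            }
            try {
                Thread.sleep(pollMillis); // thread does nothing useful while sleeping
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt flag
                return false;
            }
        }
        return true;
    }
}
```

If something like this ran inside onTrigger, the processor would keep its thread for up to timeoutMillis on every flow-file whose file is missing, and nothing else queued behind it on that thread would be handled in the meantime.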

There’s a third option, which I am going to cover in this post using a custom processor. This sample processor keeps looping until it finds the file. If the file is not available, it penalizes the flow-file and then transfers it to the RETRY relationship. The RETRY relationship points back to the processor itself, so this keeps running until the file is found, at which point the incoming flow-file is sent to success.
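The penalize-and-loop idea described above can be sketched without NiFi as a toy model. This is only an illustration of the concept, not how NiFi implements its queues: the class PenalizedRetryQueue, its methods and the explicit nowMillis clock are all hypothetical, and the model only ever looks at the head of a single queue.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.BooleanSupplier;

public class PenalizedRetryQueue {

    /** A queued item plus the time before which it must not be retried. */
    private static final class Entry {
        final String id;
        long penalizedUntilMillis;

        Entry(String id) {
            this.id = id;
        }
    }

    private final Deque<Entry> retryQueue = new ArrayDeque<>();
    private final List<String> success = new ArrayList<>();
    private final long penaltyMillis;

    public PenalizedRetryQueue(long penaltyMillis) {
        this.penaltyMillis = penaltyMillis;
    }

    public void enqueue(String id) {
        retryQueue.addLast(new Entry(id));
    }

    /**
     * One scheduling pass at time nowMillis.
     * Returns true if an item was examined, false if nothing was eligible.
     */
    public boolean trigger(long nowMillis, BooleanSupplier conditionMet) {
        Entry head = retryQueue.peekFirst();
        if (head == null || head.penalizedUntilMillis > nowMillis) {
            return false; // nothing to do, so the thread is released immediately
        }
        retryQueue.pollFirst();
        if (conditionMet.getAsBoolean()) {
            success.add(head.id); // condition satisfied: route to success
        } else {
            // Condition not satisfied: penalize and put it back on the self-loop.
            head.penalizedUntilMillis = nowMillis + penaltyMillis;
            retryQueue.addLast(head);
        }
        return true;
    }

    public List<String> succeeded() {
        return success;
    }

    public int waiting() {
        return retryQueue.size();
    }
}
```

The point of the design is that each pass returns right away. The waiting happens in the queue, through the penalty, rather than inside a sleeping thread.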

NiFi Retry Processor Group

Sample processor:

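import java.io.File;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;

// Routes a flow-file to "success" once the configured file exists;
// until then the flow-file is penalized and routed to "retry".
public class RetrySample
         extends AbstractProcessor {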
 
     public static final PropertyDescriptor FILE_PATH = new PropertyDescriptor
             .Builder().name("File Path")
             .description("file path")
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
             .required(true)
             .build();
 
     public static final Relationship REL_SUCCESS = new Relationship.Builder()
             .name("success")
             .description("Operation completed successfully")
             .build();
 
     public static final Relationship REL_RETRY = new Relationship.Builder()
             .name("retry")
             .description("Retry relation")
             .build();
     public static final String RETRY = "retry";
 
     private Set<Relationship> relationships;
 
     private List<PropertyDescriptor> descriptors;
 
     @Override
     protected void init(ProcessorInitializationContext context) {
         final List<PropertyDescriptor> descriptors = new ArrayList<PropertyDescriptor>();
         descriptors.add(FILE_PATH);
         this.descriptors = Collections.unmodifiableList(descriptors);
 
         final Set<Relationship> relationships = new HashSet<Relationship>();
         relationships.add(REL_SUCCESS);
         relationships.add(REL_RETRY);
         this.relationships = Collections.unmodifiableSet(relationships);
     }
 
 
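     @Override
     public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
         FlowFile ff = session.get();
         if (ff == null) {
             // Nothing is queued on the incoming connection.
             return;
         }
         String path = context.getProperty(FILE_PATH).getValue();
 
         if (!new File(path).exists()) {
             // FlowFile objects are immutable: keep the reference returned by each
             // session call, otherwise the session is handed a stale version.
             if (!ff.getAttributes().containsKey(RETRY)) {
                 ff = session.putAttribute(ff, RETRY, "yes");
             }
             // Penalize so the flow-file waits out the penalty duration before the next attempt.
             ff = session.penalize(ff);
             session.transfer(ff, REL_RETRY);
         } else {
             // The file is there: drop the marker attribute and pass the flow-file on.
             if (ff.getAttributes().containsKey(RETRY)) {
                 ff = session.removeAttribute(ff, RETRY);
             }
             session.transfer(ff, REL_SUCCESS);
         }
     }
 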
     @Override
     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
         return descriptors;
     }
 
     @Override
     public Set<Relationship> getRelationships() {
         return relationships;
     }
 }
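
If you want to check the branching in onTrigger without starting NiFi, the decision can be pulled out into a small pure function. The sketch below is an assumption about how one might do that, not part of the processor above: RetryRouting, Decision and route are hypothetical names, and relationships are represented by plain strings.

```java
import java.util.HashMap;
import java.util.Map;

public class RetryRouting {

    public static final String RETRY = "retry";

    /** Outcome of one decision: the relationship name and the attributes to carry forward. */
    public static final class Decision {
        public final String relationship;
        public final Map<String, String> attributes;

        Decision(String relationship, Map<String, String> attributes) {
            this.relationship = relationship;
            this.attributes = attributes;
        }
    }

    /** Mirrors the processor's branch: mark and retry if the file is missing, otherwise clean up and succeed. */
    public static Decision route(boolean fileExists, Map<String, String> attributes) {
        // Work on a copy so the caller's map is never modified.
        Map<String, String> updated = new HashMap<>(attributes);
        if (!fileExists) {
            updated.putIfAbsent(RETRY, "yes");
            return new Decision("retry", updated);
        }
        updated.remove(RETRY);
        return new Decision("success", updated);
    }
}
```

Calling route(false, attrs) yields the "retry" relationship with the marker attribute set, and calling route(true, ...) on that result yields "success" with the marker removed again.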