How to periodically publish data to a remote application from a Java Client?

Updated: Sep 12, 2021


 

Contents

  1. The Scenario

  2. Overview of Classes

  3. Classes In Detail

  4. StatEntity class

  5. StatsPublisherMBean class

  6. StatsPublisher class

  7. StatsWriter class

  8. StatsHttpEntity class

  9. StatsCreator class

 

There are scenarios where we need to send data from one application to another at a regular interval. In today’s blog we will see how to do it in a plain Java application. I will show how to create a job in a Java application that periodically publishes data to a remote HTTP host.


The Scenario

Let's assume we have a backend Java service for a retail cloud product or an online store. When a customer places an order on the website, the frontend creates an HTTP request for the order. The Java service receives the HTTP request and gathers some statistical data: let’s say the time of purchase, the purchased items and the total price of the order. It then periodically sends this data to another service, which does something with it, for example statistical analysis. In addition, I will show how we can use JMX to enable and disable the publishing task without restarting our application.


Overview of Classes

A few Java classes do the work. First we will look at each class and understand what it does. The StatEntity class represents the data extracted for each order. The StatsPublisher class holds a collection of StatEntity objects and periodically publishes them. StatsPublisherMBean is the JMX (Java Management Extensions) MBean interface for the StatsPublisher class. The StatsWriter class writes the StatEntity collection to an output stream as a JSON string. The StatsHttpEntity class uses the StatsWriter to write the stats into its output stream; the publisher uses it as the body of the HTTP request.


Classes In Detail

StatEntity class

This class has three member variables to store the order timestamp, total price of the order and list of ordered items. This class represents each data entity that we want to capture for each customer request.

import java.util.List;

public class StatEntity {
  private long timestamp;
  private long price;
  private List<String> items;

  public long getTimeStamp() { return timestamp;}  
  public void setTimeStamp(long timestamp) {this.timestamp = timestamp;}
  public long getPrice() {return price;}
  public void setPrice(long price) {this.price = price;}
  public List<String> getItems() { return items;}
  public void setItems( List<String> items) {this.items = items;}
} 

StatsPublisherMBean class

The MBean tracks the size of the current batch (the one that will be published next) and the count of publish failures through two attributes, batchSize and failureCount. The getbatchSize() and getFailureCount() methods expose these two attributes. Using the publish action we can trigger the publish event manually through JMX. The reset action resets the failure count. The publish() and reset() void methods represent these two actions in the MBean. We can switch the publishing of data on and off dynamically, without restarting the application, with the help of the enable and disable JMX actions. These two actions are represented by the enable() and disable() methods. Whether publishing is enabled or disabled is visible through the IsEnabled attribute, which is exposed by the boolean method getIsEnabled(). This interface only contains the definition of the MBean attributes and actions; the implementation is in the StatsPublisher class. For a standard MBean, JMX requires the interface name to be the name of the implementing class followed by MBean (with a capital M and B), which is why the interface is called StatsPublisherMBean. The object name of the MBean is assigned in the objectName constant. In JMX the MBean appears under the domain com.my.projct.publisher with the type StatsPublisher.

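// The name must be <implementing class>MBean for JMX to accept it as a standard MBean.
public interface StatsPublisherMBean {
  String objectName = "com.my.projct.publisher:type=StatsPublisher";
  int getbatchSize();
  long getFailureCount();
  boolean getIsEnabled();   // primitive boolean, matching the implementation
  void publish();
  void reset();
  void enable();
  void disable();
}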
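
To make the JMX mechanics described above more concrete, here is a minimal, self-contained sketch that registers a standard MBean, calls one of its actions through the MBean server (roughly what JConsole does when you click an operation) and reads an attribute back. The names JmxToggleSketch, Toggle, ToggleMBean and the object name com.example.sketch:type=Toggle are hypothetical and only serve the illustration; they are not part of the classes in this post.

```java
import java.lang.management.ManagementFactory;
import javax.management.MBeanServer;
import javax.management.ObjectName;

final class JmxToggleSketch {

    // Standard MBean rule: a public interface named <implementing class>MBean.
    public interface ToggleMBean {
        boolean getIsEnabled();   // exposed as the read-only attribute "IsEnabled"
        void enable();            // exposed as an operation
        void disable();           // exposed as an operation
    }

    // Hypothetical stand-in for the enable/disable part of a publisher.
    public static class Toggle implements ToggleMBean {
        private volatile boolean enabled = true;
        @Override public boolean getIsEnabled() { return enabled; }
        @Override public void enable() { enabled = true; }
        @Override public void disable() { enabled = false; }
    }

    // Registers a Toggle, invokes "disable" through the MBean server,
    // and returns the attribute value read back afterwards.
    static boolean disableThroughJmx(String name) throws Exception {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        ObjectName objectName = new ObjectName(name);
        server.registerMBean(new Toggle(), objectName);
        try {
            server.invoke(objectName, "disable", new Object[0], new String[0]);
            return (Boolean) server.getAttribute(objectName, "IsEnabled");
        } finally {
            server.unregisterMBean(objectName);
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(disableThroughJmx("com.example.sketch:type=Toggle"));
    }
}
```

The attribute name is derived from the getter: getIsEnabled() becomes IsEnabled, in the same way that getFailureCount() becomes FailureCount.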

StatsPublisher class

The periodicExecutor is a variable of type ScheduledExecutorService, from the java.util.concurrent package. This interface lets us schedule logic to run after a delay or repeatedly at a fixed interval. In the start() method we schedule the publish() method to run at a fixed interval in milliseconds. The interval is held in the variable named frequency, which is hard coded as 300000 (five minutes).
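
As a small illustration of the scheduling call, the following sketch runs a task at a fixed rate a given number of times and then stops the scheduler. The class name FixedRateSketch, the method runPeriodically and the short period are assumptions made for the example; the publisher in this post uses a five minute period instead.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

final class FixedRateSketch {

    // Schedules a task every periodMillis and waits until it has run `runs` times.
    // Returns true if all runs happened before the timeout expired.
    static boolean runPeriodically(int runs, long periodMillis, long timeoutMillis)
            throws InterruptedException {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        CountDownLatch remaining = new CountDownLatch(runs);
        try {
            // Initial delay equals the period, as in the start() method of the publisher.
            scheduler.scheduleAtFixedRate(remaining::countDown,
                    periodMillis, periodMillis, TimeUnit.MILLISECONDS);
            return remaining.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } finally {
            scheduler.shutdownNow();   // stop the periodic task
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runPeriodically(3, 50, 2000));
    }
}
```

One detail worth knowing: if a task scheduled with scheduleAtFixedRate throws an exception, its later executions are suppressed, so a periodic job should catch and log its own errors.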


The StatEntity objects are saved in the batch variable, which is a list. It holds the metrics so that we can publish them in bulk. The add() method adds a StatEntity to the batch. The maximum size of the batch is defined by the variable capacity, which is hard coded as 16384. The batch can fill up before the next scheduled publish, and adding more entities to a full batch would push it past the intended capacity. That is why add() calls the isBatchFull() method before adding a new entity. When the batch is full, a publish is triggered immediately instead of waiting for the next scheduled one.
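
The flush-when-full idea can be sketched in isolation as follows. This is not the StatsPublisher implementation, only a simplified illustration: BoundedBatchSketch is a hypothetical class, and the sink parameter is a stand-in for the HTTP publish step. Instead of copying and clearing the list, the sketch hands the full list to the sink and starts a new one.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

final class BoundedBatchSketch<T> {
    private final int capacity;
    private final Consumer<List<T>> sink;   // hypothetical stand-in for the publish step
    private List<T> batch;

    BoundedBatchSketch(int capacity, Consumer<List<T>> sink) {
        this.capacity = capacity;
        this.sink = sink;
        this.batch = new ArrayList<>(capacity);
    }

    // Flush first if the batch is already full, then append,
    // so the batch never grows beyond its capacity.
    synchronized void add(T item) {
        if (batch.size() >= capacity) {
            flush();
        }
        batch.add(item);
    }

    // Hands the current batch to the sink and starts a fresh one; does nothing when empty.
    synchronized void flush() {
        if (batch.isEmpty()) {
            return;
        }
        List<T> full = batch;
        batch = new ArrayList<>(capacity);
        sink.accept(full);
    }

    synchronized int size() {
        return batch.size();
    }

    public static void main(String[] args) {
        List<List<String>> published = new ArrayList<>();
        BoundedBatchSketch<String> b = new BoundedBatchSketch<>(2, published::add);
        b.add("a");
        b.add("b");
        b.add("c");      // the batch is full here, so [a, b] is flushed first
        b.flush();       // what the scheduled publish would do: flushes [c]
        System.out.println(published);   // prints [[a, b], [c]]
    }
}
```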



Publishing metrics is an HTTP POST request to another microservice in the cloud. The request can fail, so failures are tracked in the failureCount member variable. The isEnabled variable determines whether publishing is enabled; it can be controlled through JMX, for example from JConsole.


The consumerHost variable of type HttpHost holds the destination host and port of the remote microservice. You need to replace “consumer.host” with a real host name. The consumerUri variable holds the endpoint path on the remote microservice. In the publishImpl() method, an HttpPost request is created from consumerUri and then executed with httpClient against consumerHost. The httpClient variable is a CloseableHttpClient from Apache HttpClient, which is passed in through the constructor. We use the setEntity() method of the HTTP request to populate the request body. The request body object is of type StatsHttpEntity, which is created from the statsWriter and the array of StatEntity passed to the publishImpl() method. The HTTP response is received in the “response” variable of type CloseableHttpResponse. We then check the status code of the response to verify whether the request was successful (200 or 202). If it was not, we increase the failure count.

import java.io.IOException;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.http.HttpHost;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
 
public class StatsPublisher implements StatsPublisherMBean {

  private final Logger logger = LoggerFactory.getLogger(StatsPublisher.class);
  private final ScheduledExecutorService periodicExecutor;
  private final ExecutorService executor;
  private final AtomicLong failureCount;
  // volatile so that a change made through JMX is visible to the publishing threads
  private volatile boolean isEnabled = true;
  private final CloseableHttpClient httpClient;
  private final StatsWriter statsWriter;

  protected HttpHost consumerHost = new HttpHost("consumer.host", 443, "https");
  protected String consumerUri = "/api/v1/publish";
 
  private long frequency = 300000;
  private int capacity = 16384;
  private List<StatEntity> batch = Collections.synchronizedList(new ArrayList<>(capacity));

  public StatsPublisher(ScheduledExecutorService periodicExecutor, ExecutorService executor,
                        CloseableHttpClient httpClient) {
    this.periodicExecutor =  periodicExecutor;
    this.executor =  executor;
    this.httpClient =  httpClient;
    failureCount = new AtomicLong(0);
    statsWriter = new StatsWriter();
  }
  @Override
  public boolean getIsEnabled() {return isEnabled ;}
  @Override
  public void enable() { isEnabled = true;}
  @Override
  public void disable() {isEnabled = false;}

  public void start() {
    this.periodicExecutor
          .scheduleAtFixedRate(this::publish, frequency, frequency, TimeUnit.MILLISECONDS);
  }
 
  @Override
  public synchronized void publish() {
    if(!isEnabled) { return;}
    boolean hasMetrics = batch.size() > 0;
    if( hasMetrics ) {
      // publish a snapshot of the batch, then empty it for the next round
      publish(copy());
      batch.clear();
    }
  }

  @Override
  public void reset() {failureCount.set(0);}
  @Override
  public int getbatchSize() {return batch.size();}
  @Override
  public long getFailureCount() {return failureCount.get();}
 
  public synchronized StatEntity[] getStatEntities() { return copy();}

  public synchronized void add(StatEntity statEntity) {
    if(!isEnabled) {return;}
    if( statEntity == null) { throw new IllegalArgumentException();}
    // flush a full batch before adding, instead of waiting for the next scheduled publish
    if(isBatchFull()) {
      publish();
    }
    batch.add(statEntity);
   }

   public  StatEntity[] copy() {
     return batch.toArray(new StatEntity[batch.size()]);
   }

   private boolean isBatchFull() {
     return batch.size() == capacity;
   }
   public void publish(StatEntity[] stats) {
    executor.execute(() -> {
      publishImpl(stats);
    });
   }
   public void publishImpl(StatEntity[] stats) {
    if(stats == null) {throw new IllegalArgumentException(); }
    CloseableHttpResponse response = null;
     try {
        HttpPost request = new HttpPost(consumerUri);
        request.setEntity(new StatsHttpEntity(statsWriter, stats));
        response = httpClient.execute(consumerHost, request);
        int statusCode = response.getStatusLine().getStatusCode();
        if(statusCode != 200 && statusCode != 202) {
          failureCount.incrementAndGet();
          logger.error("Http Error: {} while attempting to publish to {}", statusCode,
              consumerHost + consumerUri);
        }
     } catch(Exception e) {
       failureCount.incrementAndGet();
       // the trailing exception argument makes SLF4J log the stack trace
       logger.error("Error: {} while publishing stats to {}", e.getMessage(),
           consumerHost + consumerUri, e);
     } finally {
       close(response);
     }
   }
   private void close(CloseableHttpResponse response) {
     if(response == null) {return;}
     EntityUtils.consumeQuietly(response.getEntity());
     try {
      response.close();
     } catch(IOException e) {
      logger.warn(e.getMessage(), e);
     }
   }
}

StatsWriter class

This class converts the array of StatEntity into a JSON string. The write() method writes the data as JSON to the provided Writer, and the addElement() method writes a single name/value pair. Note that all values are written as JSON strings, exactly as they are, without any escaping.

import java.io.*;

public class StatsWriter {
 public void write(Writer out, StatEntity[] stats) throws IOException {
    out.write('{');
    out.write("\"applicationtype\": \" service\",");
    out.write("\"applicationname\": \" myapp\",");
    out.write("\"stats\":");
    out.write('[');

    for(int i = 0; i < stats.length; i++) {
      StatEntity stat =  stats[i];
      out.write('{');
      addElement(out, "time", Long.toString(stat.getTimeStamp()));
      out.write(',');
      addElement(out, "price", Long.toString(stat.getPrice()));
      out.write(',');
      addElement(out, "items", String.join(",",stat.getItems()));
      out.write('}');

      if(i + 1 < stats.length) {
        out.write(',');
      }
    }

    out.write(']'); 
    out.write('}');
    out.flush();  
 }

 private void addElement(Writer out, String name, String value) throws IOException {
    out.write("\"" + name + "\":");
    out.write('"');
    out.write(value);
    out.write('"');
 }
}
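
Because StatsWriter writes values without escaping, an item name that contains a double quote or a backslash would produce invalid JSON. If that can happen in your data, one option is to escape each value before writing it. The helper below is a minimal sketch under that assumption; JsonEscapeSketch and escape() are hypothetical names, not part of the classes in this post, and in a real project a JSON library would usually be the better choice.

```java
final class JsonEscapeSketch {

    // Escapes a string so it can be placed between the quotes of a JSON string literal:
    // handles double quotes, backslashes and control characters.
    static String escape(String value) {
        StringBuilder sb = new StringBuilder(value.length() + 8);
        for (int i = 0; i < value.length(); i++) {
            char c = value.charAt(i);
            switch (c) {
                case '"':  sb.append("\\\""); break;
                case '\\': sb.append("\\\\"); break;
                case '\n': sb.append("\\n");  break;
                case '\r': sb.append("\\r");  break;
                case '\t': sb.append("\\t");  break;
                default:
                    if (c < 0x20) {
                        // other control characters become a \\uXXXX escape
                        sb.append(String.format("\\u%04x", (int) c));
                    } else {
                        sb.append(c);
                    }
            }
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println(escape("5\" nail"));
    }
}
```

With such a helper, addElement() could call out.write(escape(value)) instead of out.write(value).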

StatsHttpEntity class

This class uses the StatsWriter to write the stats into its output stream. The publisher uses it as the body of the HTTP POST request.

import java.io.*;
import java.nio.charset.Charset;
import org.apache.http.entity.AbstractHttpEntity;

public class StatsHttpEntity extends  AbstractHttpEntity {
  private static final Charset utf8 = Charset.forName("UTF-8");
  private final StatsWriter writer;
  private final StatEntity[] stats;
 
  public StatsHttpEntity(StatsWriter writer, StatEntity[] stats) {
    this.writer = writer;
    this.stats = stats;
    this.setContentType("application/json");
  }
  @Override
  public boolean isRepeatable() {
    return false;
  }
  @Override
  public long getContentLength() {
    return -1;   // the length is not known in advance
  }
  @Override
  public InputStream getContent() throws IOException, UnsupportedOperationException {
    throw new  UnsupportedOperationException();
  }
  @Override
  public void writeTo(OutputStream stream) throws IOException {
    Writer out = new OutputStreamWriter(stream, utf8);
    writer.write(out, stats);
    out.flush();
  }
  @Override
  public boolean isStreaming() {return true;}
}

StatsCreator class

This is the main class. It creates the publisher, registers it with the MBean server, and creates the other objects it needs. Then it generates stats and adds them to the publisher.

import java.lang.management.ManagementFactory;
import java.util.*;
import  java.util.concurrent.Executors;
import javax.management.*;

import org.apache.http.client.config.RequestConfig;
import org.apache.http.impl.client.HttpClientBuilder;

public class StatsCreator {
  public static void main(String[] args) {
     RequestConfig.Builder requestConfigBuilder =  RequestConfig.custom();
     RequestConfig requestConfig =  requestConfigBuilder.setConnectTimeout(5000)
         .setSocketTimeout(2 * 60 * 1000).build();
     HttpClientBuilder clientBuilder =  HttpClientBuilder.create();
     clientBuilder.setMaxConnPerRoute(1).setMaxConnTotal(1).setDefaultRequestConfig( requestConfig );
 
     StatsPublisher publisher = new StatsPublisher(Executors.newScheduledThreadPool(1),
         Executors.newSingleThreadExecutor(),
         clientBuilder.build());
 
     MBeanServer server =  ManagementFactory.getPlatformMBeanServer();
     try { 
       server.registerMBean(publisher, new ObjectName(StatsPublisherMBean.objectName));
     } catch(Exception e) {
       System.out.println(e.getMessage());
     }
    publisher.start();

    List<String> items = new ArrayList<>();
    items.add("A");
    items.add("B");
    if(!publisher.getIsEnabled()) {
      return;
    }
 
    StatEntity stat = new StatEntity();
    stat.setTimeStamp(System.currentTimeMillis());
    stat.setPrice(1234L);
    stat.setItems(items);
    publisher.add(stat);
    publisher.add(stat);
    publisher.getStatEntities();
  }
}
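
The main method above never stops the two executors, which is fine for a service that should keep publishing for as long as it runs. If you want the application to exit cleanly, the executors have to be shut down at some point. The following is a minimal sketch of one way to do that, assuming a hypothetical helper called shutdownGracefully(); it is not part of the classes in this post.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class ShutdownSketch {

    // Stops accepting new tasks and waits for the submitted ones to finish.
    // If they do not finish within the timeout, the remaining tasks are cancelled.
    // Returns true when the executor terminated within the timeout.
    static boolean shutdownGracefully(ExecutorService executor, long timeoutMillis)
            throws InterruptedException {
        executor.shutdown();
        if (executor.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS)) {
            return true;
        }
        executor.shutdownNow();
        return false;
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.execute(() -> System.out.println("publishing the last batch"));
        System.out.println(shutdownGracefully(executor, 1000));
    }
}
```

A ScheduledExecutorService is also an ExecutorService, so the same helper works for both the periodic executor and the publishing executor.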



