Reference implementation

This section describes the reference classes that implement the API provided in the otlpClientPlugin.jar file.

APM

APM Connector reference implementation

This class runs the connect and release connectivity operations and must implement the TraceExporterConnector interface.

Example:


/**
 * 
    An APM client that creates a connection resource used to publish traces to an APM cluster.

    Assume that you want to publish data to an OTLP endpoint, for example https://api.xxxx.io/v1/traces

    Usage example:


         PoolingNHttpClientConnectionManager asyncConnManager = new PoolingNHttpClientConnectionManager(
                 new DefaultConnectingIOReactor(IOReactorConfig.DEFAULT));
         HttpAsyncClientBuilder asyncClientBuilder = HttpAsyncClients.custom();
         asyncClientBuilder.setConnectionManager(asyncConnManager);
         // Build the client from the builder configured above.
         CloseableHttpAsyncClient clientHttps = asyncClientBuilder.build();
         clientHttps.start();
      
    To enable logging in your implementation, get a logger instance from the logger factory with the name "e2eagentOtlpTraces". The generated logs are written to the e2eagentOtlpTraces.log file.

    Usage example:


        private final Logger e2eagentOtlpTraces = LoggerFactory.getLogger("e2eagentOtlpTraces");
     
    Note: The following example adds serverInfo to the systemConnection object.

    The TargetServerConnection object is initialized at the class level, as shown below.

     private final TargetServerConnection systemConnection = new TargetServerConnection(); 
        
         final HashMap<String, Object> serverInfo = new HashMap<String, Object>();
         serverInfo.put("CONNECTION_CLIENT", clientHttps);
         serverInfo.put("CONFIG", externalServerConfig); 
         systemConnection.setConnection(serverInfo);
 */
public class APMTraceExportConnectorImpl implements TraceExporterConnector {
    private final Logger e2eagentOtlpTraces = LoggerFactory.getLogger("e2eagentOtlpTraces");
    private final TargetServerConnection systemConnection = new TargetServerConnection();
    private List<String> ERROR_CODES = null;
   

    /**
     This method is used to connect to the plugged-in external APM server from End-to-End Monitoring.

    1. To connect using SSL, do the following:

      a. Enable the SSL property "exporter.tls" in the agent.config file, that is, set it to "true".

      b. If the target server requires a client certificate for identity, configure the SSL context as shown in the following example.

    Usage example for SSL configuration:


        String keystoreType = targetServerConfig.getString(TargetServerConfig.KEYSTORE_TYPE);
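        // Use the JVM default keystore type when no type is configured or when "JKS" is requested;
        // passing a null type to KeyStore.getInstance() would throw a NullPointerException.
        KeyStore trustStore = keystoreType == null || keystoreType.equalsIgnoreCase("JKS")
                ? KeyStore.getInstance(KeyStore.getDefaultType())
                : KeyStore.getInstance(keystoreType);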
        trustStore.load(loadStream(trustStoreLoc), truststorePassword.toCharArray());
        SSLContextBuilder sslBuilder = SSLContexts.custom()
                                .loadTrustMaterial(trustStore, new TrustSelfSignedStrategy())
                                .setSecureRandom(new SecureRandom());
        SSLContext sslContext = sslBuilder.build();
        SSLIOSessionStrategy sessionStrategy = null;
        if (targetServerConfig.getBoolean(TargetServerConfig.NO_OP_HOSTName_VERIFIER)) {
            sessionStrategy = new SSLIOSessionStrategy(sslContext, null, null,
                    NoopHostnameVerifier.INSTANCE);
        } else {
            sessionStrategy = new SSLIOSessionStrategy(sslContext, null, null,
                    SSLConnectionSocketFactory.getDefaultHostnameVerifier());
        }
        RegistryBuilder<SchemeIOSessionStrategy> registryBuilder = RegistryBuilder.create();
        registryBuilder.register("https", sessionStrategy).register("http",
                NoopIOSessionStrategy.INSTANCE);
        PoolingNHttpClientConnectionManager clientConnectionManager = new PoolingNHttpClientConnectionManager(
                new DefaultConnectingIOReactor(IOReactorConfig.DEFAULT), registryBuilder.build());
        HttpAsyncClientBuilder asyncClientBuilder = HttpAsyncClients.custom();
        asyncClientBuilder.setConnectionManager(clientConnectionManager);
        // Create a UsernamePasswordCredentials object
        asyncClientBuilder = passwordBuilder(username, password, asyncClientBuilder);
        if (asyncClientBuilder == null)
            return null;
        clientHttps = asyncClientBuilder.build();
        clientHttps.start();                        
         
    2. If a username and password are configured for the external target server, use the following mechanism to configure the credentials in the connection property.
    Usage example:

     
        if (username != null && !username.isEmpty()) {
                if (password != null && !password.isEmpty()) {
                    final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
                    credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));
                    asyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
                } else {
                    e2eagentOtlpTraces.info("Password initializing from IS PassMan is in progress");
                    return null;
                }
         }
         
    Note: The E2EM agent is loaded during Integration Server startup. Password retrieval from the password manager can take a while, so add logic to retry until the password is received.

    3. If SSL is not enabled, the following applies:
      a. The SSL property "exporter.tls" must be disabled in the agent.config file.

      b. The target server must not require a client certificate for identity.

    Usage example:


         PoolingNHttpClientConnectionManager asyncConnManager = new PoolingNHttpClientConnectionManager(
                 new DefaultConnectingIOReactor(IOReactorConfig.DEFAULT));
         HttpAsyncClientBuilder asyncClientBuilder = HttpAsyncClients.custom();
         asyncClientBuilder.setConnectionManager(asyncConnManager);
         // Build the client from the builder configured above.
         CloseableHttpAsyncClient clientHttps = asyncClientBuilder.build();
         clientHttps.start();

    4. Add the connection object to TargetServerConnection so that the exporter implementation can reference it.
    The TargetServerConnection object is initialized at the class level, as shown below.

     private final TargetServerConnection systemConnection = new TargetServerConnection(); 
    Usage example:


         final HashMap<String, Object> serverInfo = new HashMap<String, Object>();
         serverInfo.put("CONNECTION_CLIENT", clientHttps); 
         serverInfo.put("CONFIG", externalServerConfig); 
         systemConnection.setConnection(serverInfo);
         
    5. Error codes are configured to validate connectivity to the target OTLP REST endpoint. The following example shows how to initialize the error codes.
    The error codes are configurable in the agent config file. The default value is "exporter.api_error_codes=502,503,504".

    Usage example:


         this.ERROR_CODES = Arrays.asList(externalServerConfig.getString(TargetServerConfig.API_ERROR_CODES).split(","));
         
    Note:
    Configured passwords are retrieved from the Integration Server Password Manager utility.
    The error codes are configured to validate connectivity to the OTLP REST endpoint. You can load the error codes if required, as described in the step on error codes above.
     * 
     * 
     * @param targetServerConfig A TargetServerConfig object with parameters
     *                           specific for connecting to the external server.
     *                           Parameters defined in the Connectivity Information
     *                           while creating the Exporter from E2EM can be
     *                           retrieved using getters.
     * @return A TargetServerConnection object with the connection to the external server
     * @throws Exception
     */
    @Override
    public TargetServerConnection connect(TargetServerConfig targetServerConfig) throws Exception {
       // Code to create required connection resource object
        return systemConnection;
    }
    /**
     * This method is used to release the external server connection from End-to-End
     * Monitoring. 
     * 
     * Here is an example:
     *
    final HashMap<String, Object> serverInfo = (HashMap<String, Object>) systemConnection.getConnection();
    CloseableHttpAsyncClient httpClient = (CloseableHttpAsyncClient) serverInfo.get("CONNECTION_CLIENT");
    if (httpClient != null) {
        httpClient.close();
    }
     * 
     * 
     * @param systemConnection A TargetServerConnection object containing the
     *                         connection to the plugged-in external server.
     * @throws Exception
     */
    @SuppressWarnings("unchecked")
    @Override
    public void release(TargetServerConnection systemConnection) throws Exception {
      // code to release the connection resource
    }
    /**
     This method is used to check the connectivity with the external server.

    The anticipated error codes for your external server can be configured in the agent.config file, for example "exporter.api_error_codes=502,503,504".

    If any of these error codes is received, this method returns false. Otherwise, it returns true.

    Usage example:


    final HashMap<String, Object> serverInfo = (HashMap<String, Object>) systemConnection.getConnection();
    TargetServerConfig targetServerConfig = (TargetServerConfig) serverInfo.get("CONFIG");
    HttpGet httpGet = new HttpGet(targetServerConfig.getString(TargetServerConfig.SERVER_HOST));
    try (CloseableHttpClient client = HttpClients.createDefault();
            CloseableHttpResponse response = client.execute(httpGet)) {
        int statusCode = response.getStatusLine().getStatusCode();
        return !this.ERROR_CODES.contains(String.valueOf(statusCode));
    }
     * 
     * @param targetServerConnection A TargetServerConnection object containing the
     *                               connection to the plugged-in external server.
     * @return true if the external server is reachable, false otherwise
     * @throws Exception
     */
    @SuppressWarnings("unchecked")
    @Override
    public Boolean getConnectivityStatus(TargetServerConnection targetServerConnection) throws Exception {
       // Code to check the health of the target server.
       return true; // placeholder so that the skeleton compiles
    }
}
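
The error-code check described in the connector comments can be tried out with the JDK alone. The following is a minimal sketch, not the plugin's implementation: the class name ErrorCodeCheck and its two methods are hypothetical, and only the property format (exporter.api_error_codes=502,503,504) is taken from the text above. Unlike a plain split(","), the sketch trims whitespace, so a value such as "502, 503" still matches.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical helper for illustration; it is not part of otlpClientPlugin.jar.
final class ErrorCodeCheck {

    // Parses a comma-separated property value such as "502,503,504" into a set of codes.
    static Set<String> parse(String configured) {
        if (configured == null || configured.trim().isEmpty()) {
            return Collections.emptySet();
        }
        return Arrays.stream(configured.split(","))
                .map(String::trim)
                .filter(code -> !code.isEmpty())
                .collect(Collectors.toSet());
    }

    // Returns true when the HTTP status code is not one of the configured error codes.
    static boolean isReachable(Set<String> errorCodes, int statusCode) {
        return !errorCodes.contains(String.valueOf(statusCode));
    }

    public static void main(String[] args) {
        Set<String> codes = parse("502, 503,504");
        System.out.println(isReachable(codes, 200)); // prints "true"
        System.out.println(isReachable(codes, 503)); // prints "false"
    }
}
```

A set is used instead of a list so that each lookup is a single hash check, which matters little for three codes but keeps the intent explicit.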

APM Client reference implementation

This class runs the trace export operation and must implement the TraceExporterClientService interface.

Example:

/**
    An APM client that publishes traces to the APM cluster.

    This is a thread-safe implementation that shares a single CloseableHttpAsyncClient instance across threads for better performance.

    The class lets you add a listener that notifies the caller of the execution status. To add a listener, do the following:

    private List<OutboundOtlpHttpChannelListener> listeners = Collections.synchronizedList(new LinkedList<OutboundOtlpHttpChannelListener>()); 
      
      public void addChannelListener(OutboundOtlpHttpChannelListener listener) {
            listeners.add(listener);
        }
      
      
    To enable logging in your implementation, get a logger instance from the logger factory with the name "e2eagentOtlpTraces". The generated logs are written to the e2eagentOtlpTraces.log file.

    Usage example:


        private final Logger e2eagentOtlpTraces = LoggerFactory.getLogger("e2eagentOtlpTraces");
 * 
 *
 */
public class APMTraceExportClientServiceImpl implements TraceExporterClientService {
    private final Logger e2eagentOtlpTraces = LoggerFactory.getLogger("e2eagentOtlpTraces");
    
    private List<OutboundOtlpHttpChannelListener> listeners = Collections.synchronizedList(new LinkedList<OutboundOtlpHttpChannelListener>());

/**
 
    This method is used to export the trace data using the plugged-in external server.

    Usage example: Use the CloseableHttpAsyncClient to send records with a single TracesData object containing the key/value pairs.

    Get the connection resource from TargetServerConnection.

    Create an HttpPost, or another request type as required, with the host name. The URL is loaded dynamically from the agent config property "exporter.url".

    Prepare and set the headers for the HttpPost object. The header values are loaded dynamically from the agent config property "exporter.headers". The code below expects comma-separated name#value pairs.


        final HashMap<String, Object> serverInfo = (HashMap<String, Object>) systemConnection.getConnection();
        CloseableHttpAsyncClient httpClient = (CloseableHttpAsyncClient) serverInfo.get("CONNECTION_CLIENT");
        TargetServerConfig externalServerConfig = (TargetServerConfig) serverInfo.get("CONFIG");
        HttpPost httpPost = new HttpPost(externalServerConfig.getString(TargetServerConfig.SERVER_HOST));
        String serverHeader = externalServerConfig.getString(TargetServerConfig.HEADERS);
        String contentType = "";
        if (serverHeader != null) {
            String[] headers = serverHeader.split(",");
            for (String header : headers) {
                String[] individualHeader = header.split("#");
                if (individualHeader.length < 2) {
                    continue; // skip entries that are not in name#value form
                }
                if (individualHeader[0].equalsIgnoreCase("Content-Type")) {
                    contentType = individualHeader[1];
                }
                httpPost.setHeader(individualHeader[0], individualHeader[1]);
            }
        }
        
    Set the default security headers. Add further security headers as required.

    Usage example:

     
        // -- default security headers
        httpPost.setHeader("Content-Security-Policy", "default-src 'none'");
        httpPost.setHeader("Feature-Policy", "microphone 'none'; geolocation 'none'");
        httpPost.setHeader("X-XSS-Protection", "1; mode=block");
        httpPost.setHeader("X-Content-Type-Options", "nosniff");
        httpPost.setHeader("X-Frame-Options", "SAMEORIGIN");
        httpPost.setHeader("Access-Control-Allow-Origin", "null");
        httpPost.setHeader("Referrer-Policy", "no-referrer");
        httpPost.setHeader("Cache-Control", "no-Cache");
        
    Prepare and set a supported content type for the request. The Content-Type is supplied as part of the header values in the agent config.

    Usage example:

     
        if (!contentType.isEmpty() && contentType.equalsIgnoreCase("application/x-protobuf")) {
            byte[] serializedMessage = traceData.toByteArray();
            // Set the serialized protobuf message as the request body
            ByteArrayEntity reqEntity = new ByteArrayEntity(serializedMessage);
            httpPost.setEntity(reqEntity);
        } else {
            String protoJson = JsonFormat.printer().printingEnumsAsInts().omittingInsignificantWhitespace()
                    .print(traceData);
            StringEntity requestEntity = new StringEntity(protoJson, ContentType.APPLICATION_JSON);
            httpPost.setHeader("Accept", "application/json");
            httpPost.setEntity(requestEntity);
            e2eagentOtlpTraces.debug("JSON data: \n " + protoJson);
        }
        
    Post the data asynchronously. The response is handled in the respective callback methods.

    Usage example:

     
         httpClient.execute(httpPost, new FutureCallback<HttpResponse>() {
            @Override
            public void completed(final HttpResponse response) {
                boolean success = response.getStatusLine().getStatusCode() == 200;
                APMTraceExportClientServiceImpl.this.notify(httpPost, success);
                try {
                    // EntityUtils.toString() throws a checked IOException, so it must be handled here.
                    String body = EntityUtils.toString(response.getEntity());
                    e2eagentOtlpTraces.debug("Response body: " + body);
                } catch (IOException e) {
                    e2eagentOtlpTraces.debug("Could not read the response body: " + e.getMessage());
                }
            }

            @Override
            public void failed(final Exception e) {
                APMTraceExportClientServiceImpl.this.notify(httpPost, false);            
            }

            @Override
            public void cancelled() {
                APMTraceExportClientServiceImpl.this.notify(httpPost, false);            
            }
        });
        
    The send() method is asynchronous. When called it publishes the record to an OTLP endpoint.

    This allows sending many records in parallel without blocking to wait for the response after each one.
 * 
 * @param systemConnection A TargetServerConnection object containing
 *                         parameters specific to run the pushing action through
 *                         the external server.
 * @param traceData        A TracesData object containing the trace data.
 * @throws Exception if an error occurs in the exporting process.
     */
    @SuppressWarnings("unchecked")
    @Override
    public void send(TargetServerConnection systemConnection, TracesData traceData)
            throws Exception {
       // Code to process the trace data to external server.
    }
    
    /**
    
    The caller must be notified of the execution status. To do so, add the configured listener as shown in the following example.


         listeners.add(listener);
         
    The configured listener class gets a status update (true/false) whenever an HTTP response is received from the asynchronous execution.

    The following example shows how to send an update to the listener class using a separate notify method.

     
          private void notify(HttpPost httpPost, boolean status) {
              for (OutboundOtlpHttpChannelListener listener : listeners) {
                  // TODO - Get the status from Http Response.
                  listener.statusChanged(count, status);
              }
          }
     */
    public void addChannelListener(OutboundOtlpHttpChannelListener listener) {
        listeners.add(listener);
    }
}
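
The header handling described in the send() comments (comma-separated entries, each in name#value form, loaded from "exporter.headers") can be sketched without any HTTP library. This is a minimal sketch under that assumed format: the class name HeaderParser is hypothetical, and the sample header names and values are made up for illustration. It skips malformed entries instead of failing on them, and it splits each entry only at the first '#' so that a value may itself contain that character.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper for illustration; it is not part of otlpClientPlugin.jar.
final class HeaderParser {

    // Parses "name#value,name#value" into an insertion-ordered map of header names to values.
    static Map<String, String> parse(String configured) {
        Map<String, String> headers = new LinkedHashMap<>();
        if (configured == null) {
            return headers;
        }
        for (String entry : configured.split(",")) {
            // Limit 2: split only at the first '#', so values may contain '#'.
            String[] parts = entry.split("#", 2);
            if (parts.length < 2 || parts[0].trim().isEmpty()) {
                continue; // skip entries that are not in name#value form
            }
            headers.put(parts[0].trim(), parts[1].trim());
        }
        return headers;
    }

    public static void main(String[] args) {
        // The sample values below are invented for the demonstration.
        System.out.println(parse("Content-Type#application/json,api-key#abc123,broken"));
        // prints "{Content-Type=application/json, api-key=abc123}"
    }
}
```

Each map entry could then be passed to httpPost.setHeader(name, value). Note that this format cannot carry a header value that contains a comma.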

Kafka

Kafka Connector reference implementation

This class runs the connect and release connectivity operations and must implement the TraceExporterConnector interface.

Example:

/**
    A Kafka client that creates a connection resource used to publish traces to a Kafka cluster.

    The following example creates a sample producer for sending records with strings containing sequential numbers as the key/value pairs.


     
     Properties props = new Properties();
     props.put("bootstrap.servers", "localhost:9092");
     props.put("linger.ms", 1);
     props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
     props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
     
     Producer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());
     
     
    To enable logging in your implementation, get a logger instance from the logger factory with the name "e2eagentOtlpTraces". The generated logs are written to the e2eagentOtlpTraces.log file.

    Usage example:


        private final Logger e2eagentOtlpTraces = LoggerFactory.getLogger("e2eagentOtlpTraces");
     
    Note: The following example adds serverInfo to the systemConnection object.

    The TargetServerConnection object is initialized at the class level.

     private final TargetServerConnection systemConnection = new TargetServerConnection(); 

        final HashMap<String, Object> serverInfo = new HashMap<String, Object>();
        serverInfo.put("PROPERTIES", props);
        serverInfo.put("CONNECTION", producer);
        systemConnection.setConnection(serverInfo);

 *
 */
public class KafkaTraceExportConnectorImpl implements TraceExporterConnector {
    
    private final Logger e2eagentOtlpTraces = LoggerFactory.getLogger("e2eagentOtlpTraces");
    
    private final TargetServerConnection systemConnection = new TargetServerConnection();
    /**
    This method is used to connect to the plugged-in external Kafka server from End-to-End Monitoring.

    1. To connect using SSL, do the following:

      a. Enable the SSL property "exporter.tls" in the agent.config file, that is, set it to "true".

      b. If the target server requires a client certificate for identity, also configure the keystore as shown in the following example.

    Usage example for SSL configuration:

        String truststorePassword = targetServerConfig.getString(TargetServerConfig.EXPORTER_TRUSTSTORE_PASSWORD);
        if (truststorePassword != null && !truststorePassword.isEmpty()) {
            kafkaProperties.put("security.protocol", "SSL");
            kafkaProperties.put("ssl.truststore.location",
                    targetServerConfig.getString(TargetServerConfig.TRUSTSTORE_LOCATION));
            kafkaProperties.put("ssl.truststore.password", truststorePassword);
            String keyStoreLoc = targetServerConfig.getString(TargetServerConfig.KEYSTORE_LOCATION);
            if ((keyStoreLoc != null && !keyStoreLoc.isEmpty())) {
                kafkaProperties.put("ssl.keystore.location",
                        targetServerConfig.getString(TargetServerConfig.KEYSTORE_LOCATION));
                kafkaProperties.put("ssl.keystore.password",
                        targetServerConfig.getString(TargetServerConfig.KEYSTORE_PASSWORD));
                kafkaProperties.put("ssl.key.password",
                        targetServerConfig.getString(TargetServerConfig.KEY_PASSWORD));
            }
            if (targetServerConfig.getBoolean(TargetServerConfig.NO_OP_HOSTName_VERIFIER)) {
                // An empty value disables server host name verification.
                kafkaProperties.put("ssl.endpoint.identification.algorithm", "");
            }
        } else {
            // -- Password initializing from IS PassMan is in progress.
            return null;
        }
        
    2. Skip the above step if the server does not require a client certificate for identity.
    3. If a username and password are configured for the Kafka cluster, use the following mechanism to configure the plain SASL credentials in the connection property.
    Usage example:


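        // The user name and password values must be enclosed in double quotes inside the JAAS configuration string.
        kafkaProperties.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"" + user + "\" password=\"" + password + "\";");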
        kafkaProperties.put("security.protocol", !targetServerConfig.getBoolean(TargetServerConfig.TLS) ? "SASL_PLAINTEXT" : "SASL_SSL");
        kafkaProperties.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
        
    Note:

    Currently, a single SASL client authentication is supported for both PLAINTEXT and SSL.
    The E2EM agent is loaded during Integration Server startup. Password retrieval from the password manager can take a while, so add logic to retry until the password is received.

    4. Add the connection object to TargetServerConnection so that the exporter implementation can reference it.
    The TargetServerConnection object is initialized at the class level, as shown below.

     private final TargetServerConnection systemConnection = new TargetServerConnection(); 

        final HashMap<String, Object> serverInfo = new HashMap<String, Object>();
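        serverInfo.put("PROPERTIES", kafkaProperties);
        serverInfo.put("CONNECTION", producer);
        // Store the configuration as well; the getConnectivityStatus() and send() examples read it under "CONFIG".
        serverInfo.put("CONFIG", targetServerConfig);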
        systemConnection.setConnection(serverInfo);
        
    Note:
    Configured passwords are retrieved from Integration Server Password Manager utility.
     
     * 
     * @param targetServerConfig A TargetServerConfig object with parameters
     *                             specific for connecting to the external server.
     *                             Parameters defined in the Connectivity
     *                             Information while creating the Exporter from E2EM
     *                             can be retrieved using getters.
     * @return A TargetServerConnection object with the connection to the Kafka server
     * @throws Exception
     */
    @Override
    public TargetServerConnection connect(TargetServerConfig targetServerConfig) throws Exception {
       // Code to prepare a connection resource object
        return systemConnection;
    }
    
    /**
     * This method is used to release the KAFKA server connection from End-to-End
     * Monitoring.
     * 
     * Here is an example:
      final HashMap<String, Object> serverInfo = (HashMap<String, Object>) systemConnection.getConnection();
      final Producer<String, TracesData> kafkaServer = (Producer<String, TracesData>) serverInfo.get("CONNECTION");
      kafkaServer.flush();
      kafkaServer.close();
  
     * @param systemConnection A TargetServerConnection object containing the
     *                         connection to the plugged-in Kafka server.
     * @throws Exception
     */
    @SuppressWarnings("unchecked")
    @Override
    public void release(TargetServerConnection systemConnection) throws Exception {
     // code to release the connection resource
    }
    /**
     This method is used to check the connectivity with the external Kafka server.

    The status is returned as true or false based on the Kafka server response.

    Usage example:


     final HashMap<String, Object> serverInfo = (HashMap<String, Object>) systemConnection.getConnection();
        Properties properties = (Properties) serverInfo.get("PROPERTIES");
        TargetServerConfig externalServerConfig = (TargetServerConfig) serverInfo.get("CONFIG");
        try (AdminClient client = KafkaAdminClient.create(properties)) {
            DescribeTopicsResult topics = client
                    .describeTopics(Arrays.asList(externalServerConfig.getString(TargetServerConfig.KAFKA_TOPIC)));
            return topics != null;
     }
     
     */
    @SuppressWarnings("unchecked")
    @Override
    public Boolean getConnectivityStatus(TargetServerConnection externalSystemConnection) throws Exception {
      // Code to check the health of target server.
      return true;
    }
}
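
The connector comments recommend retrying until the password manager has delivered the password. The following is a minimal sketch of such retry logic, assuming a bounded number of attempts with a fixed delay between them. The class name PasswordRetry, the method awaitPassword, and the Supplier that stands in for the actual password lookup (for example a call to targetServerConfig.getString(...)) are all hypothetical and not part of the plugin API.

```java
import java.util.function.Supplier;

// Hypothetical helper for illustration; it is not part of otlpClientPlugin.jar.
final class PasswordRetry {

    // Polls the lookup until it yields a non-empty value or the attempts are used up.
    // Returns null if no password became available, mirroring the "return null" in the examples above.
    static String awaitPassword(Supplier<String> lookup, int maxAttempts, long delayMillis) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            String password = lookup.get();
            if (password != null && !password.isEmpty()) {
                return password;
            }
            if (attempt < maxAttempts) {
                try {
                    Thread.sleep(delayMillis);
                } catch (InterruptedException e) {
                    // Restore the interrupt flag and stop waiting.
                    Thread.currentThread().interrupt();
                    return null;
                }
            }
        }
        return null;
    }

    public static void main(String[] args) {
        // Simulated lookup: the password becomes available on the third call.
        int[] calls = {0};
        String password = awaitPassword(() -> ++calls[0] < 3 ? null : "secret", 5, 10L);
        System.out.println(password + " after " + calls[0] + " attempts"); // prints "secret after 3 attempts"
    }
}
```

Bounding the number of attempts keeps connect() from blocking indefinitely. A caller that receives null can log the condition and try again later.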

Kafka Client reference implementation

This class runs the trace export operation and must implement the TraceExporterClientService interface.

Example:

/**
 * 
    A Kafka client implementation that publishes records to the Kafka cluster.

    The producer is a thread-safe implementation and has been optimized for better performance.

    The class lets you add a listener that notifies the caller of the execution status. To add a listener, do the following:

    private List<OutboundOtlpHttpChannelListener> listeners = Collections.synchronizedList(new LinkedList<OutboundOtlpHttpChannelListener>()); 
      
      public void addChannelListener(OutboundOtlpHttpChannelListener listener) {
            listeners.add(listener);
        }
      
      
    To enable logging in your implementation, get a logger instance from the logger factory with the name "e2eagentOtlpTraces". The generated logs are written to the e2eagentOtlpTraces.log file.

    Usage example:

        private final Logger e2eagentOtlpTraces = LoggerFactory.getLogger("e2eagentOtlpTraces");
 *
 */
public class KafkaTraceExportClientServiceImpl implements TraceExporterClientService {
    private final Logger e2eagentOtlpTraces = LoggerFactory.getLogger("e2eagentOtlpTraces");
    
    private List<OutboundOtlpHttpChannelListener> listeners = Collections.synchronizedList(new LinkedList<OutboundOtlpHttpChannelListener>());

    /**
    
    This method is used to export the trace data using the plugged-in external server.

    Here is a simple example of using the producer to send records with a string key and the trace data as the value.

    Get the connection resource "Producer" from TargetServerConnection.


        final HashMap<String, Object> serverInfo = (HashMap<String, Object>) systemConnection.getConnection();
        Producer<String, TracesData> producer = (Producer<String, TracesData>) serverInfo.get("CONNECTION");
        TargetServerConfig externalServerConfig = (TargetServerConfig) serverInfo.get("CONFIG");
        producer.send(new ProducerRecord<>(externalServerConfig.getString(TargetServerConfig.KAFKA_TOPIC),
                "trace", traceData), new Callback() {
                    @Override
                    public void onCompletion(RecordMetadata metadata, Exception e) {
                        if (e != null) {
                            KafkaTraceExportClientServiceImpl.this.notify(traceData.getResourceSpansCount(),
                                    false);
                        } else {
                            long count = metadata.offset();
                            KafkaTraceExportClientServiceImpl.this.notify(traceData.getResourceSpansCount(),
                                    true);
                        }
                    }
                });
         
    The producer send() method is asynchronous. When called, it adds the record to a buffer of pending record sends and immediately returns. This allows the producer to batch together individual records for efficiency.

    The acks config controls the criteria under which requests are considered complete. The default setting "all" will result in blocking on the full commit of the record, the slowest but most durable setting.

    Usage example: The value is configurable in the agent config file. Applicable values are 0, 1, and all.


        exporter.ack=all

    If the request fails, the producer can automatically retry. The retries setting defaults to Integer.MAX_VALUE, and it is recommended to use delivery.timeout.ms to control retry behavior instead of retries.

    Usage example: The value is configurable in the agent config file.


        exporter.retries=0

    Note: As shown in the examples above, Kafka properties are configurable in the agent config file. See the following list:
    exporter.linger_ms_config
    exporter.request_timeout_ms_config
    exporter.send_buffer_config
    exporter.receive_buffer_config
    exporter.compression_type_config
    exporter.max_request_size_config
     * 
     * @param systemConnection A TargetServerConnection object containing
     *                         parameters specific to run the pushing action through
     *                         the Kafka server.
     * @param traceData        A TracesData object containing the trace data.
     * @throws Exception if an error occurs in the exporting process.
     */
    @SuppressWarnings("unchecked")
    @Override
    public void send(TargetServerConnection systemConnection, TracesData traceData)
            throws Exception {      
              // Code to process the trace data to external server.
    }
    
    /**
    The configured listener class gets a status update (true/false) whenever a response is received from the asynchronous execution.

    Here is an example of how to send an update to the listener class using a private notify method.

 
      private void notify(int count, boolean status) {
          for (OutboundOtlpHttpChannelListener listener : listeners) {
              listener.statusChanged(count, status);
          }
      }
     */
    public void addChannelListener(OutboundOtlpHttpChannelListener listener) {
        listeners.add(listener);
    }
}
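
The listener mechanism used by both client classes can be sketched in isolation. In this minimal sketch, the nested StatusListener interface is a hypothetical stand-in for OutboundOtlpHttpChannelListener, and its statusChanged(int, boolean) signature is an assumption inferred from the statusChanged(count, status) calls above. One detail is worth showing: a list created with Collections.synchronizedList makes individual calls such as add() thread-safe, but iteration must be guarded manually by synchronizing on the list.

```java
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;

// Hypothetical class for illustration; it is not part of otlpClientPlugin.jar.
final class ListenerNotifier {

    // Stand-in for OutboundOtlpHttpChannelListener; the signature is an assumption.
    interface StatusListener {
        void statusChanged(int count, boolean status);
    }

    private final List<StatusListener> listeners =
            Collections.synchronizedList(new LinkedList<StatusListener>());

    void addChannelListener(StatusListener listener) {
        listeners.add(listener);
    }

    // Reports the outcome of one export to every registered listener.
    void notifyListeners(int count, boolean status) {
        // Iterating a synchronized list requires holding its lock for the whole traversal.
        synchronized (listeners) {
            for (StatusListener listener : listeners) {
                listener.statusChanged(count, status);
            }
        }
    }

    public static void main(String[] args) {
        ListenerNotifier notifier = new ListenerNotifier();
        notifier.addChannelListener(
                (count, status) -> System.out.println(count + " span(s) exported: " + status));
        notifier.notifyListeners(3, true); // prints "3 span(s) exported: true"
    }
}
```

If listeners are added rarely and notified often, a java.util.concurrent.CopyOnWriteArrayList would be an alternative that needs no explicit lock during iteration.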