Elaborazione dei messaggi utilizzando il provider Kafka

Con un provider di messaggistica Kafka , è possibile elaborare i messaggi in uscita in modo sequenziale e i messaggi in entrata in modo sequenziale o continuo. In generale, l'elaborazione dei messaggi Kafka è simile all'elaborazione dei messaggi JMS.

Kafka elaborazione dei messaggi

Kafka tiene traccia dei messaggi elaborati conservando un valore di offset che rappresenta l'ordine sequenziale in cui i messaggi vengono ricevuti dagli argomenti Kafka . Il valore di offset indica il successivo messaggio da elaborare. Una volta elaborato un messaggio in Kafka, l'offset è ora oltre tale messaggio elaborato e il messaggio non viene mai più elaborato.

Il messaggio elaborato non viene eliminato dalla coda. Rimane nella coda fino a quando non viene soddisfatta la scadenza configurata per il messaggio.

Maximo® Manage fornisce un'applicazione che consente di rielaborare i messaggi che presentano errori. Consultare ulteriori informazioni sull'elaborazione degli errori.

Limiti di dimensione del messaggio

Configuri un limite di dimensione del messaggio nel server Kafka . Quando si configura questo limite, considerare la dimensione delle transazioni, inclusa la dimensione degli allegati integrati. La dimensione elevata del messaggio può avere un impatto negativo sulle prestazioni nella lettura e scrittura dei messaggi nella coda.

In Maximo Manage, è possibile aggiungere la proprietà di sistema mxe.kafka.messagesize per la dimensione massima del messaggio che è possibile elaborare. Configurare la proprietà in modo che corrisponda alla dimensione del messaggio, in byte, impostata sul server Kafka . L'impostazione predefinita è 10 MB.

Kafka

In Maximo Manage, Kafka le azioni di scrittura attendono il riconoscimento dai broker Kafka . È possibile controllare il timeout di scrittura per il produttore Kafka utilizzando la proprietà mxe.kafka.waittimeforack . L'unità di misurazione è in millisecondi.

Elaborazione messaggi sequenziale in uscita

Il framework di integrazione utilizza argomenti Kafka come code di messaggi sequenziali in uscita. Quando un messaggio viene inviato dal framework di integrazione, viene instradato al tuo provider Kafka , dove è ospitato come messaggio JSON in un argomento predefinito. Anche se i canali di pubblicazione possono inviare messaggi in formato XML o JSON, il prodotto li avvolge come messaggi formattati JSON e li scrive nell'argomento Kafka . Un messaggio in entrata utilizza la struttura mostrata nel seguente esempio di codice:
{
      “interfacetype”:”MAXIMO”,
      “INTERFACE”:”MXPRInterface”,
      “payload”:”<xml/json message>”,
      “SENDER”:”MX”,
      “destination”:”<external system name>”,
      “destjndiname”:”<kafka topic name>”,
      “compressed”:”1”,
      “MEAMessageID”:”<providername>~<topic>~<partition>~<offset>”,
      “mimetype”:”application/xml”
}
La proprietà payload contiene il payload inviato all'endpoint. L'intero messaggio Kafka viene compresso al momento dell'archiviazione all'interno delle partizioni Kafka . Nelle partizioni Kafka , i messaggi vengono memorizzati in un ordine di data e ora rigoroso. Alla posizione del messaggio nella partizione viene assegnato un numero ID sequenziale noto come offsetdel messaggio. Sebbene gli argomenti Kafka possano essere partizionati, gli argomenti elaborati in modo sequenziale devono trovarsi in una singola partizione.

Dopo che il messaggio è stato scritto nell'argomento Kafka , viene elaborato dalla crontask Kafka associata che configuri nell'applicazione Configurazione crontask per ogni argomento. L'elaborazione si verifica in base al sistema esterno e all'endpoint configurati per il canale.

Elaborazione sequenziale in entrata

L'elaborazione dei messaggi in entrata può essere sequenziale o continua. L'elaborazione sequenziale in entrata è simile all'elaborazione sequenziale in entrata e richiede gli stessi tipi di configurazione, inclusa un'istanza della crontask Kafka .

La struttura di un messaggio in entrata sequenziale viene mostrata nel seguente esempio:
{
     “interfacetype”: "MAXIMO",
     “SENDER”: "testkafka",
     “destination”: "testkafka",
     “USER”: "wilson",
     “MEAMessageID”: "….",
     “INTERFACE”: "MXASSETInterface",
     “payload”: "{"assetnum":"AZ163","siteid":"BEDFORD"}",
     “mimetype”: "application/json",
    “destjndiname”: "anamitratestcont"
} 

Configurare le code sequenziali con una singola partizione e non più di un'istanza crontask per utilizzare i messaggi dalla coda.

Consultare ulteriori informazioni sull'elaborazione degli errori.

Elaborazione e scalabilità delle code continue in uscita e in ingresso

In generale, e a differenza delle code sequenziali, l'elaborazione della coda continua non si arresta quando un messaggio presenta un errore. I messaggi con errori vengono scritti nella tabella di errori ed elaborati dalla tabella di errori dall'istanza della crontask KAFKAERROR per la rispettiva coda.

Una singola coda Kafka può essere ridimensionata come multithreado come se avesse più consumer di messaggi, definendo più partizioni per la coda. Ogni partizione rappresenta un consumatore separato e ogni partizione richiede la propria istanza crontask per supportare l'utilizzo dei messaggi per la partizione. Per ridimensionare l'elaborazione, configurare più partizioni. Il numero di istanze crontask deve essere uguale al numero di partizioni.

La coda continua in uscita scrive i messaggi in uscita per le interfacce se selezionata nell'applicazione Sistemi esterni dell'applicazione Maximo Manage . Più sistemi esterni possono utilizzare la stessa coda continua in uscita.

L'elaborazione continua dei messaggi in entrata con Kafka è simile all'elaborazione dei messaggi JMS, tuttavia, ci sono differenze nella scalabilità e nell'elaborazione degli errori.

Consultare ulteriori informazioni sull'elaborazione degli errori dei messaggi nelle code continue e sulla configurazione di un ritardo di riconsegna.