# DigiWF Message

(opens new window)

Die DigiWF Message Bibliothek ist eine Abstraktionsschicht, die die Kommunikation zwischen verschiedenen Komponenten vereinfacht und die technische Komplexität reduziert.

Die Idee hinter der DigiWF Message Bibliothek ist, immer wiederkehrende Spring Cloud Stream Konfigurationen an einer zentralen Stelle zu lösen und eine API bereitzustellen, um diese zu nutzen. Dadurch muss ein Integrations-Developer nicht mehr in die Tiefe der Spring Cloud Stream Konfigurationen einsteigen und kann sich auf die Implementierung der Integration fokussieren.

Die Bibliothek stellt eigene APIs für das Versenden von Nachrichten bereit, die Nachrichten an einen Message Broker senden. Zusätzlich konfiguriert die Bibliothek Event Routing, wobei Nachrichten einem Consumer durch Namensgleichheit von Header type und Consumer zugeordnet werden.

# Verwendung

Die DigiWF Message Bibliothek stellt die MessageApi bereit, die verwendet wird, um Nachrichten an die entsprechenden Destinations (Zieltopics) zu versenden. Zusätzlich werden APIs für wiederkehrende Nachrichtentypen bereitgestellt, die wiederum auf der MessageApi aufbauen. Hierfür haben wir die ProcessApi und die ErrorApi geschaffen. Die ProcessApi kann verwendet werden, um in DigiWF Prozesse zu starten, Messages an Prozesse zu korrelieren und Fehlerbehandlung durchzuführen. Die ErrorApi stellt die Exceptions BpmnError für fachliche Fehler und IncidentError für technische Fehler bereit, die geworfen und in der Anwendung abgefangen werden können.

Die Destinations für die unterschiedlichen Aktionen können über die application.yml konfiguriert werden ( siehe Konfiguration).

Usage Examples finden Sie

im Example-Module in Github (opens new window).

# MessageApi

Die MessageApi stellt die Methode sendMessage zur Verfügung, die verwendet wird, um eine Nachricht an eine bestimmte Destination zu senden. Eine Message besteht aus einem payload und headers. Der Payload enthält die Daten, die übermittelt werden sollen. Die Headers sind ein Key-Value-Paar, das zusätzliche Informationen zur Nachricht enthält.

Bei DigiWF verwenden wir Spring Cloud Stream, um Nachrichten an Kafka (Message Broker) zu senden. Der Payload ist hierbei das Event, das an Kafka gesendet wird. Die Headers enthalten wichtige Informationen, wie beispielsweise die Prozessinstanz Id, den Type des Events usw.

Usage Example


@RequiredArgsConstructor
public class MessageServiceExample {
    private final MessageApi sendMessageApi;

    public void sendMessageExample(final Message message) {
        // send a message to the destination
        final boolean success = this.sendMessageApi.sendMessage(message, "my-destination");
        System.out.println("Message sent: " + success);
    }

    public void sendMessageWithHeadersExample(final Message message) {
        // example with headers
        final Map<String, Object> headers = Map.of("key", "value");
        final boolean success = this.sendMessageApi.sendMessage(message, headers, "my-destination");
        System.out.println("Message sent: " + success);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

# ProcessApi

Die ProcessApi-Schnittstelle stellt Methoden zum Starten von Prozessen und Korrelieren von Nachrichten in Prozessen bereit. Im Hintergrund nutzt die ProcessAPI die MessageApi, um die Nachrichten an die entsprechenden Destinations zu senden. Die Destinations für die unterschiedlichen Aktionen können über die application.yml konfiguriert werden.

Usage Example


@RequiredArgsConstructor
public class ProcessService {
    private final ProcessApi processApi;

    public void sendMessages() {
        // Starten Sie einen neuen Prozess mit dem Schlüssel "meinProzess" und einigen Variablen
        processApi.startProcess("meinProzess", new HashMap<String, Object>());

        // Starten Sie einen neuen Prozess mit dem Schlüssel "meinProzess", einigen Variablen und einem fileContext
        processApi.startProcess("meinProzess", new HashMap<String, Object>(), "fileContext");

        // Korrelieren Sie eine Nachricht mit der Prozessinstanz-ID und einigen Variablen
        processApi.correlateMessage("123", "meineNachricht", new HashMap<String, Object>());
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

# ErrorApi

Die ErrorApi definiert Methoden zum Behandeln von Fehlern in einem BPMN-Prozess. Dabei wird zwischen fachlichen (BpmnError) und technischen Fehlern (IncidentError) unterschieden. BpmnError können im Prozess abgefangen und verarbeitet werden. Ein IncidentError führt hingegen zu einem Incident im Prozess.

Die ErrorApi stellt Exceptions für die beiden Fehler bereit, die geworfen und in der Anwendung abgefangen werden können. Für die Fehlerbehandlung werden ebenfalls Methoden bereitgestellt, die Nachrichten an die konfigurierten Destinations senden.

Usage Example

import java.util.Map;

@RequiredArgsConstructor
public class Example {
    private final ErrorApi errorApi;

    public void sendErrorMessages() {
        this.errorApi.handleBpmnError("ProcessInstanceID", "400", "Foo is not bar");

        this.errorApi.handleIncidentError("ProcessInstanceId", "The origin message", "The error message");
    }

    public void sendErrorMessagesWithException() {
        final Map<String, Object> originMessageHeaders = Map.of(
                "digiwf.messagename", "theMessageName",
                "digiwf.processinstanceid", "theProcessInstanceId"
        );

        try {
            throw new BpmnError("400", "Foo is not bar");
        } catch (final BpmnError bpmnError) {
            this.errorApi.handleBpmnError(message.getHeaders(), bpmnError);
        }

        try {
            throw new IncidentError("Foo is not bar");
        } catch (final IncidentError incidentError) {
            this.errorApi.handleIncident(message.getHeaders(), incidentError);
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31

# Spring Cloud Stream Event Routing

Die DigiWF Message Bibliothek konfiguriert auch die Properties für das Event Routing von Spring Cloud Stream. Mit dem Event Routing können Nachrichten einem Consumer zugeordnet werden, wenn der Name des Consumers und der Header type übereinstimmen. (Siehe https://docs.spring.io/spring-cloud-stream/reference/spring-cloud-stream/event-routing.html (opens new window).) Nutzer der Bibliothek müssen nur noch folgende Properties setzen.

# Konfiguration

spring:
  cloud:
    stream:
      bindings:
        functionRouter-in-0:
          group: "dwf-digiwf-example-integration-local-01"
          destination: "digiwf-example-integration-local-01"
        sendMessage-out-0:
          destination: "digiwf-example-integration-local-01"
  [ ... ]
io:
  muenchendigital:
    digiwf:
      message:
        incidentDestination: "digiwf-example-integration-incident"
        bpmnErrorDestination: "digiwf-example-integration-technical-error"
        correlateMessageDestination: "digiwf-example-integration-correlate-message"
        startProcessDestination: "digiwf-message-scs-example-start-process"
        deadLetterQueueDestination: "digiwf-example-integration-incident"
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
Property Description
spring.cloud.stream.bindings.functionRouter-in-0.group Group name for the consumer(s) bound to the functionRouter-in-0 input binding
spring.cloud.stream.bindings.functionRouter-in-0.destination Destination (or kafak topic) to which the functionRouter-in-0 input binding should listen for messages
spring.cloud.stream.bindings.sendMessage-out-0.destination Destination to which the sendMessage-out-0 output binding should send messages
io.muenchendigital.digiwf.message.incidentDestination Destination to redirect incidents to (e.g. Kafka Topic)
io.muenchendigital.digiwf.message.bpmnErrorDestination Destination to redirect technical errors a.k.a. bpmn errors to (e.g. Kafka Topic)
io.muenchendigital.digiwf.message.correlateMessageDestination Destination to send correlate messages to (e.g. Kafka Topic)
io.muenchendigital.digiwf.message.startProcessDestination Destination to send start process messages to (e.g. Kafka Topic)
io.muenchendigital.digiwf.message.deadLetterQueueDestination Destination to send failing messages events to (e.g. Kafka Topic)

# Anpassbarkeit

Für die ErrorApi, ProcessApi und MessageApi stellen wir eine Standardimplementierung bereit, die auf Spring Cloud Stream basiert. Möchte man diese Implementierung ändern, kann man die entsprechenden Schnittstellen implementieren und als Bean bereitstellen.

Ein Beispiel für eine MessageApi implementierung, die lediglich die Nachrichten loggt, findet man in unserem Beispiel (opens new window).