# DigiWF Message
Die DigiWF Message Bibliothek ist eine Abstraktionsschicht, die die Kommunikation zwischen verschiedenen Komponenten vereinfacht und die technische Komplexität reduziert.
Die Idee hinter der DigiWF Message Bibliothek ist, immer wiederkehrende Spring Cloud Stream Konfigurationen an einer zentralen Stelle zu lösen und eine API bereitzustellen, um diese zu nutzen. Dadurch muss ein Integrations-Developer nicht mehr in die Tiefe der Spring Cloud Stream Konfigurationen einsteigen und kann sich auf die Implementierung der Integration fokussieren.
Die Bibliothek stellt eigene APIs für das Versenden von Nachrichten bereit, die Nachrichten an einen Message Broker
senden. Zusätzlich konfiguriert die Bibliothek Event Routing, wobei Nachrichten einem Consumer
durch
Namensgleichheit von Header type
und Consumer zugeordnet werden.
# Verwendung
Die DigiWF Message Bibliothek stellt die MessageApi bereit, die verwendet wird, um Nachrichten an die entsprechenden
Destinations (Zieltopics) zu versenden.
Zusätzlich werden APIs für wiederkehrende Nachrichtentypen bereitgestellt, die wiederum auf der MessageApi aufbauen. Hierfür
haben wir die ProcessApi und die ErrorApi geschaffen. Die ProcessApi kann verwendet werden, um in DigiWF Prozesse zu
starten, Messages an Prozesse zu korrelieren und Fehlerbehandlung durchzuführen. Die ErrorApi stellt die
Exceptions BpmnError
für fachliche Fehler und IncidentError
für technische Fehler bereit, die geworfen und in der
Anwendung abgefangen werden können.
Die Destinations für die unterschiedlichen Aktionen können über die application.yml
konfiguriert werden (
siehe Konfiguration).
Usage Examples finden Sie
im Example-Module in Github (opens new window).
# MessageApi
Die MessageApi stellt die Methode sendMessage
zur Verfügung, die verwendet wird, um eine Nachricht an eine bestimmte
Destination zu senden. Eine Message besteht aus einem payload
und headers
. Der Payload enthält die Daten, die
übermittelt werden sollen. Die Headers sind ein Key-Value-Paar, das zusätzliche Informationen zur Nachricht enthält.
Bei DigiWF verwenden wir Spring Cloud Stream, um Nachrichten an Kafka (Message Broker) zu senden. Der Payload ist hierbei das Event, das an Kafka gesendet wird. Die Headers enthalten wichtige Informationen, wie beispielsweise die Prozessinstanz Id, den Type des Events usw.
Usage Example
@RequiredArgsConstructor
public class MessageServiceExample {
private final MessageApi sendMessageApi;
public void sendMessageExample(final Message message) {
// send a message to the destination
final boolean success = this.sendMessageApi.sendMessage(message, "my-destination");
System.out.println("Message sent: " + success);
}
public void sendMessageWithHeadersExample(final Message message) {
// example with headers
final Map<String, Object> headers = Map.of("key", "value");
final boolean success = this.sendMessageApi.sendMessage(message, headers, "my-destination");
System.out.println("Message sent: " + success);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# ProcessApi
Die ProcessApi
-Schnittstelle stellt Methoden zum Starten von Prozessen und Korrelieren von Nachrichten in Prozessen
bereit. Im Hintergrund nutzt die ProcessAPI die MessageApi, um die Nachrichten an die entsprechenden Destinations zu
senden. Die Destinations für die unterschiedlichen Aktionen können über die application.yml
konfiguriert werden.
Usage Example
@RequiredArgsConstructor
public class ProcessService {
private final ProcessApi processApi;
public void sendMessages() {
// Starten Sie einen neuen Prozess mit dem Schlüssel "meinProzess" und einigen Variablen
processApi.startProcess("meinProzess", new HashMap<String, Object>());
// Starten Sie einen neuen Prozess mit dem Schlüssel "meinProzess", einigen Variablen und einem fileContext
processApi.startProcess("meinProzess", new HashMap<String, Object>(), "fileContext");
// Korrelieren Sie eine Nachricht mit der Prozessinstanz-ID und einigen Variablen
processApi.correlateMessage("123", "meineNachricht", new HashMap<String, Object>());
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# ErrorApi
Die ErrorApi definiert Methoden zum Behandeln von Fehlern in einem BPMN-Prozess. Dabei wird zwischen
fachlichen (BpmnError
) und technischen Fehlern (IncidentError
) unterschieden. BpmnError können im Prozess abgefangen
und verarbeitet werden. Ein IncidentError führt hingegen zu einem Incident im Prozess.
Die ErrorApi stellt Exceptions für die beiden Fehler bereit, die geworfen und in der Anwendung abgefangen werden können. Für die Fehlerbehandlung werden ebenfalls Methoden bereitgestellt, die Nachrichten an die konfigurierten Destinations senden.
Usage Example
import java.util.Map;
@RequiredArgsConstructor
public class Example {
private final ErrorApi errorApi;
public void sendErrorMessages() {
this.errorApi.handleBpmnError("ProcessInstanceID", "400", "Foo is not bar");
this.errorApi.handleIncidentError("ProcessInstanceId", "The origin message", "The error message");
}
public void sendErrorMessagesWithException() {
final Map<String, Object> originMessageHeaders = Map.of(
"digiwf.messagename", "theMessageName",
"digiwf.processinstanceid", "theProcessInstanceId"
);
try {
throw new BpmnError("400", "Foo is not bar");
} catch (final BpmnError bpmnError) {
this.errorApi.handleBpmnError(message.getHeaders(), bpmnError);
}
try {
throw new IncidentError("Foo is not bar");
} catch (final IncidentError incidentError) {
this.errorApi.handleIncident(message.getHeaders(), incidentError);
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
# Spring Cloud Stream Event Routing
Die DigiWF Message Bibliothek konfiguriert auch die Properties für das Event Routing von Spring Cloud Stream. Mit dem
Event Routing können Nachrichten einem Consumer
zugeordnet werden, wenn der Name des Consumers und der Header type
übereinstimmen. (Siehe
https://docs.spring.io/spring-cloud-stream/reference/spring-cloud-stream/event-routing.html (opens new window).)
Nutzer der Bibliothek müssen nur noch folgende Properties setzen.
# Konfiguration
spring:
cloud:
stream:
bindings:
functionRouter-in-0:
group: "dwf-digiwf-example-integration-local-01"
destination: "digiwf-example-integration-local-01"
sendMessage-out-0:
destination: "digiwf-example-integration-local-01"
[ ... ]
io:
muenchendigital:
digiwf:
message:
incidentDestination: "digiwf-example-integration-incident"
bpmnErrorDestination: "digiwf-example-integration-technical-error"
correlateMessageDestination: "digiwf-example-integration-correlate-message"
startProcessDestination: "digiwf-message-scs-example-start-process"
deadLetterQueueDestination: "digiwf-example-integration-incident"
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
Property | Description |
---|---|
spring.cloud.stream.bindings.functionRouter-in-0.group | Group name for the consumer(s) bound to the functionRouter-in-0 input binding |
spring.cloud.stream.bindings.functionRouter-in-0.destination | Destination (or kafak topic) to which the functionRouter-in-0 input binding should listen for messages |
spring.cloud.stream.bindings.sendMessage-out-0.destination | Destination to which the sendMessage-out-0 output binding should send messages |
io.muenchendigital.digiwf.message.incidentDestination | Destination to redirect incidents to (e.g. Kafka Topic) |
io.muenchendigital.digiwf.message.bpmnErrorDestination | Destination to redirect technical errors a.k.a. bpmn errors to (e.g. Kafka Topic) |
io.muenchendigital.digiwf.message.correlateMessageDestination | Destination to send correlate messages to (e.g. Kafka Topic) |
io.muenchendigital.digiwf.message.startProcessDestination | Destination to send start process messages to (e.g. Kafka Topic) |
io.muenchendigital.digiwf.message.deadLetterQueueDestination | Destination to send failing messages events to (e.g. Kafka Topic) |
# Anpassbarkeit
Für die ErrorApi, ProcessApi und MessageApi stellen wir eine Standardimplementierung bereit, die auf Spring Cloud Stream basiert. Möchte man diese Implementierung ändern, kann man die entsprechenden Schnittstellen implementieren und als Bean bereitstellen.
Ein Beispiel für eine MessageApi implementierung, die lediglich die Nachrichten loggt, findet man in unserem Beispiel (opens new window).