Spring Integration 과 함께 - 실전편(1)
Spring Integration Example
![]() |
학생 Text 파일을 DB 와 File 로 저장하는 예제 |
이전 post는 Spring Integration의 간단한 개념을 살펴보았다. 이번에는 동작하는 프로그램을 만든다. Web 에서 전송한 데이터를 DB와 Local File로 저장한다. Web에서 전송한 Text 는 아래와 같다. 순서, 이름, 나이, 성별을 '|' 로 구분하고, 각 학생은 Enter 로 나눴다.
1|김철수|20|MALE 2|이영희|39|FEMALE 3|마이크|21|UNKOWN
김철수는 등록에 성공한다. 이영희는 나이 때문에 실패한다. 마이크는 성별이 정확하지 않아 오류가 발생한다.
개발환경
- JDK : OpenJDK 11
- Spring : 2.4.2
- DB : h2-1.4.199
구성요소(Component)
Split
Transformer
메시지의 내용을 추가/수정/삭제할 수 있는 구성요소다. Split 에서 분할한 메시지를 Student 객체로 바꾼다.
private Student transformToStudent(String text) { String[] tokens = text.split("\\|"); String name = tokens[1].trim(); Integer age = Integer.valueOf(tokens[2].trim()); Student.Gender gender = Student.Gender.valueOf(tokens[3].trim()); return new Student(name, age, gender); }
Filter
메시지를 특정 기준에 맞춰 걸러내는 역할을 한다. 기준은 Header와 Payload 모두 적용할 수 있다. 예제는 사정(?)이 있어 30살 미만만 등록할 수 있도록 설정했다. 그리고 Log 를 입력할 수 있도록 별도의 method로 만들었다.
private GenericSelector<Student> filterAge() { return source -> { if (!(source.getAge() < 30)) { System.out.printf("%s 의 나이(%d)가 너무 많습니다.%n", source.getName(), source.getAge()); } return source.getAge() < 30; }; }
Log
enrichHeader
메시지의 Header 의 정보를 풍성하게(?) 만든다. 예제는 File 저장을 위한 경로와 이름을 저장한다.
@Bean public IntegrationFlow registerStudentFlow() { return IntegrationFlows.from("students.channel.splitter") .... .enrichHeaders(h -> { h.headerExpression(FileHeaders.FILENAME, "payload.name.concat('.txt')"); h.header("directory", "d:/integration/"); }) .... .get(); }
handle
Service Activator라고 한다. Message를 처리하는 조회/수정/삭제/입력 역할을 한다. 예제에서는 파일 저장과 DB 입력 역할을 한다. DB 저장은 JPA를 사용했다. 파일 저장은 객체를 JSON으로 변환해서 각 학생 이름으로 저장한다.
@Bean public IntegrationFlow registerStudentFlow() { return IntegrationFlows.from("students.channel.splitter") ... .publishSubscribeChannel(Executors.newCachedThreadPool(), pub -> { pub .subscribe(sub -> { sub.handle(studentJPAExecutor(), e -> e.transactional()); }) .subscribe(sub -> { sub.transform(new ObjectToJsonTransformer()) .handle(saveFiles()); }); }) .get(); }
그 외...
ExecutorSubscribableChannel
Error Handling
오류가 발생할 때, 처리하는 Channel 이다. Spring Integration은 오류가 발생 시, errorChannel 에서 처리하는 Channel을 지정할 수 있다.
@MessagingGateway(errorChannel = "student.channel.error") public interface StudentGateway { @Gateway(requestChannel = "students.channel.splitter") public void registerStudents(String value) throws MessagingException; }
예제는 에러 내용과 오류를 기록한다.
@Bean public IntegrationFlow errorHandling() { return IntegrationFlows.from("student.channel.error") .handle(new GenericHandler<ErrorMessage>() { @Override public Object handle(ErrorMessage payload, MessageHeaders headers) { MessagingException exception = ((MessagingException) payload.getPayload()); System.err.println(exception.getFailedMessage().getPayload()); System.err.println(exception.getRootCause().getLocalizedMessage()); return null; } }) .get(); }
실행 결과
![]() |
실행결과 |
마치며.
Resource
- https://github.com/manojp1988/spring-integration/blob/master/javadsl/src/main/java/ErrorChannel/ErrorChannelExample.java
- https://docs.spring.io/spring-integration/docs/current/reference/html/
- https://sup2is.github.io/2020/08/12/what-is-spring-integration.html
- https://stackoverflow.com/questions/36755068/how-to-create-messageselector-filter-in-spring-integration-java-dsl
- https://gitter.im/spring-projects/spring-integration?at=5aa2592e8f1c77ef3a9e50f9
댓글
댓글 쓰기