Spring Integration 과 함께 - 실전편(1)

Spring Integration Example

Spring Integration Example - Student Registration
학생 Text 파일을 DB 와 File 로 저장하는 예제

 이전 post는 Spring Integration의 간단한 개념을 살펴보았다. 이번에는 동작하는 프로그램을 만든다. Web 에서 전송한 데이터를 DB와 Local File로 저장한다. Web에서 전송한 Text 는 아래와 같다. 순서, 이름, 나이, 성별을 '|' 로 구분하고, 각 학생은 Enter 로 나눴다.

1|김철수|20|MALE
2|이영희|39|FEMALE
3|마이크|21|UNKOWN

 김철수는 등록에 성공한다. 이영희는 나이 때문에 실패한다. 마이크는 성별이 정확하지 않아 오류가 발생한다.

개발환경

  • JDK : OpenJDK 11
  • Spring : 2.4.2
  • DB : h2-1.4.199

구성요소(Component)

Split

 메시지를 분할해서 전송한다. 분할한 메시지는 다음 단계에서 1건 씩 처리한다. 예제에서 각 학생을 Enter 로 분할한다.

Transformer

 메시지의 내용을 추가/수정/삭제할 수 있는 구성요소다. Split 에서 분할한 메시지를 Student 객체로 바꾼다. 

private Student transformToStudent(String text) {
  String[] tokens = text.split("\\|");
  String name = tokens[1].trim();
  Integer age = Integer.valueOf(tokens[2].trim());
  Student.Gender gender = Student.Gender.valueOf(tokens[3].trim());
  return new Student(name, age, gender);
}

Filter

 메시지를 특정 기준에 맞춰 걸러내는 역할을 한다. 기준은 Header와 Payload 모두 적용할 수 있다. 예제는 사정(?)이 있어 30살 미만만 등록할 수 있도록 설정했다. 그리고 Log 를 입력할 수 있도록 별도의 method로 만들었다.

private GenericSelector<Student> filterAge() {
  return source -> {
    if (!(source.getAge() < 30)) {
      System.out.printf("%s 의 나이(%d)가 너무 많습니다.%n",
          source.getName(), source.getAge());
    }
    return source.getAge() < 30;
  };
}

Log

 IntegrationFlow 에서 메세지 내용을 보여주는 method 다. Handle을 시작하면 Payload 가 보이지 않아, handle 전 단계에서 확인을 위해 사용하면 좋다. 

enrichHeader

 메시지의 Header 의 정보를 풍성하게(?) 만든다. 예제는 File 저장을 위한 경로와 이름을 저장한다.

@Bean
public IntegrationFlow registerStudentFlow() {
  return IntegrationFlows.from("students.channel.splitter")
      ....
      .enrichHeaders(h -> {
        h.headerExpression(FileHeaders.FILENAME, "payload.name.concat('.txt')");
        h.header("directory", "d:/integration/");
      })
      ....
      .get();
}

handle

 Service Activator라고 한다. Message를 처리하는 조회/수정/삭제/입력 역할을 한다. 예제에서는 파일 저장과 DB 입력 역할을 한다. DB 저장은 JPA를 사용했다. 파일 저장은 객체를 JSON으로 변환해서 각 학생 이름으로 저장한다.

@Bean
public IntegrationFlow registerStudentFlow() {
  return IntegrationFlows.from("students.channel.splitter")
      ...
      .publishSubscribeChannel(Executors.newCachedThreadPool(),
          pub -> {
            pub
                .subscribe(sub -> {
                  sub.handle(studentJPAExecutor(),
                      e -> e.transactional());
                })
                .subscribe(sub -> {
                  sub.transform(new ObjectToJsonTransformer())
                      .handle(saveFiles());
                });
          })
      .get();
}

그 외...

ExecutorSubscribableChannel

 13000 건을 처리 시 2분 정도 걸려 동시 처리로 Channel 을 변경했다. 시간은 15초로 줄어들었지만, 동시에 CPU가 100% 걸려 득과 실이 존재했다. 좋은 서버 성능을 가지고 있다면 한 번쯤 고려해볼 만하다.

Error Handling

 오류가 발생할 때, 처리하는 Channel 이다. Spring Integration은 오류가 발생 시,    errorChannel 에서 처리하는 Channel을 지정할 수 있다.

@MessagingGateway(errorChannel = "student.channel.error")
public interface StudentGateway {

  @Gateway(requestChannel = "students.channel.splitter")
  public void registerStudents(String value) throws MessagingException;

}

예제는 에러 내용과 오류를 기록한다.

@Bean
public IntegrationFlow errorHandling() {
  return IntegrationFlows.from("student.channel.error")
      .handle(new GenericHandler<ErrorMessage>() {
        @Override
        public Object handle(ErrorMessage payload, MessageHeaders headers) {
          MessagingException exception = ((MessagingException) payload.getPayload());
          System.err.println(exception.getFailedMessage().getPayload());
          System.err.println(exception.getRootCause().getLocalizedMessage());
          return null;
        }
      })
      .get();
}

실행 결과

실행결과


마치며.

 각 Component들을 최대한 활용한 예제를 만들었다. 실무에서는 Error Handling이나 동시성 처리를 위한 고민과 Skill이 더 들어갈 것이다. 다음 예제에서는 Spring 에서 외부 시스템을 호출해서 처리하는 예제를 만들어 보도록 하겠다. Source는 Github에 공유했다.

Resource

댓글

이 블로그의 인기 게시물

JPA 와 함께 - 느낀점

Scott 과 함께 - Recursive Query 구현하기