엘라스틱서치2019. 2. 13. 17:44

해당 포스트는 다음 링크의 포스트를 참고하여 작성한 것입니다.

[바로가기]


이 포스트에서는 Bulk Api와 High-Level-Rest-Client를 활용하여 Elasticsearch에 대량의 데이터를 

업로드하는 방법을 소개합니다. 


※ Elasticsearch는 5.6.0 버전에서 Rest Client를 Low-Level과 High-Level 두 가지로 분리시켰습니다.

   Transport Client는 7.0에서 deprecated되었으며 8.0에서는 삭제시킬 예정이라고 합니다.

   이 글에서는 Low-Level-Rest-Client와 Hight-Level-Rest-Client의 차이점에 대한 내용은 생략합니다.


데이터는 JSON 파일이고 Elasticsearch에 업로드되는 과정은 다음과 같습니다.


사내 서버에서 1시간 마다 로그 파일 생성 -> 스프링 스케쥴러를 이용해 매 시간마다 해당 로그 파일을 로드

-> 해당 로그파일을 파싱하지 않고 바로 Bulk Api를 사용해 Elasticsearch에 PUT



0. 시스템 환경

- Spring Boot 2.1.2 RELEASE

- Spring webflux

- JAVA 8

- Elasticsearch 6.6.0 


1. JSON 파일 준비

- Bulk API를 사용하기 위해서는 줄바꿈 형식의 데이터셋이 필요합니다.

  즉, 마지막 행을 비롯하여 모든 행의 끝에는 개행문자가 있어야 합니다.(\n)

  예를 들면 다음과 같습니다.

{ "data" : "1" }
{ "data" : "2" }
...


2. BulkProcessor vs BulkRequestBuilder

- 이 포스트에서는 BulkRequest를 사용합니다. 원글에서는 두 가지 모두를 사용한 예제가 있습니다.

- 둘의 차이점은 다음 링크에서 확인할 수 있습니다.

  [바로가기]


3. application.properties

elasticsearch.host=127.0.0.1
elasticsearch.port=9200


4. Elasticsearch Config 파일 작성

 - RestHighLevelClient를 Bean으로 등록해 필요할 때마다 꺼내쓸 수 있도록 합니다.

@Configuration
public class ElasticSearchConfig {
  @Value("${elasticsearch.host}")
  private String host;
  
  @Value("${elasticsearch.port}")
  private int port;

  @Bean
  public RestHighLevelClient getRestClient() {
    RestClientBuilder builder = RestClient.builder(new HttpHost(host, port, "http"));
    return new RestHighLevelClient(builder);
  }
}


5. Shceduler 작성

 - 스케쥴러를 사용하기 위해서는 별도의 Bean을 등록해야 하므로 프로젝트 Application에 추가합니다.

 - @EnableScheduling 어노테이션을 명시하지 않을 경우 어플리케이션은 시작과 동시에 종료됩니다.

 - 예제는 단일쓰레드이며 쓰레드 풀이 필요하다면 ThreadPoolTaskScheduler를 사용하면 됩니다.

@SpringBootApplication
@EnableScheduling
public class ExampleApplication {

  public static void main(String[] args) {
    SpringApplication.run(ExampleApplication.class, args);
  }

  // 추가된 부분
  @Bean
  public TaskScheduler taskScheduler() {
    return new ConcurrentTaskScheduler();
  }
}


6. Sheduler 작성

@Component
public class ExampleScheduler {
  @Autowired ExampleService exampleService;

  @Scheduled(cron="15 0 * * * *")
  public void getJSONFileFromServer() throws IOException {
    // 파일은 "년-월-일-시"의 형식으로 생성
    Date date = new Date();
    SimpleDateFormat format = new SimpleDateFormat("YYYY-MM-dd-HH"); 
    requestIndexToElasticFromJsonFile(format.format(date));
  }

  private void requestIndexToElasticFromJsonFile(String fileName) {
    File file = new File("서버파일경로" + fileName);
    if(!file.isFile()) { // 파일이 아직 생성되지 않았다면
      fileCheckWithTimer(fileName);
      return;
    }
    exampleService.bulkToElasticsearch(fileName);
  }

  private void fileCheckWithTimer(String fileName) {
    File file = new File("서버파일경로" + fileName);
 
    Timer timer = new Timer();
    TimerTask timerTask = new TimerTask() {
      int count = 0;
      @Override
      public void run() {
        if(file.isFile()) {
          timer.cancel();
          try {
            exampleService.bulkToElasticsearch(fileName);
          } catch(IOException e) { 
            e.printStackTrace();
          }
        } else if(!file.isFile() && count == 58) {
          // 1시간이 지나도록 파일이 생성되지 않으면 다음 파일 작업을 위해 중단
          timer.cancel();
        } else
          System.out.println("File " + fileName + " is searching");
        count++;
      }
    };
    timer.schedule(timerTask, 0, 60000); // 1분마다 
  }
}


7. ServiceImpl 작성

@Component
public class ExampleServiceImpl implements ExampleService {
  @Autowired RestHighLevelClient restHighLevelClient;
  
  ...

  public void bulkToElasticsearch(String fileName) thorws IOException {
    String filePath = "서버파일경로" + fileName;
    File file = new File(filePath);
    BulkRequest bulkRequest = new BulkRequest();

    int id = 0; // id를 함께 인덱스하기 위한 시작값 생성
    int batch = 1000; 
    String line;

    BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream(file), "UTF-8"));

    while((line = br.readLine()) != null) {
      // json 파일을 행 단위로 읽는 족족 IndexRequest에 담는다.
      IndexRequest indexRequest = new IndexRequest("index", fileName, id + "");
      indexRequest.source(line, XContentType.JSON);
      bulkRequest.add(indexRequest);

      if(id % batch == 0) { // 1000개가 넘을 경우 1000 단위로 인덱스한다.
        BulkResponose bulkResponse = restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
        if(bulkResponse.hasFailures()) {
          for(BulkItemResponse bulkItemResponse : bulkResponse) {
            if(bulkItemResponse.isFailed()) {
              BulkItemResponse.Failure failure = bulkItemResponse.getFailure();
              System.out.println("Error : " + failure.toString());
            }
          }
        }
        System.out.println("Uploaded bulk id : " + id);
        bulkRequest = new BulkRequest();
      }
      id++;
    }

    if(bulkRequest.numberOfActions() > 0) {
      // 코드가 반복되므로 차후 리팩토링
      BulkResponse bulkResponse = restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
      if(bulkResponse.hasFailures()) {
        for(BulkItemResponse bulkItemResponse : bulkResponse) {
          if(bulkItemResponse.isFailed()) {
            BulkItemResponse.Failure failure = bulkItemResponse.getFailure();
            System.out.println("Error : " + failure.toString());
          }
        }
      }
    }
    System.out.println("Total uploaded : " + id);
    restHighLevelClient.close();
    br.close();
  }

  ...

}


8. 결과

 - 다음과 같이 1938개의 도큐먼트를 1000개 단위로 엘라스틱서치에 업로드하는 것을 성공했습니다.




9. 기타

 - BulkProcessor를 사용하면 자동으로 문서 단위로 벌크 업로드를 해주는 거 같습니다. 이건 다음에...;

 - builder를 생성하고 builder 옵션을 통해 설정할 수 있는데 기본 값은 1000개의 도큐먼트 단위, 5MB의 크기, 

   1개의 동시 요청 등이 있습니다. 기본 옵션만으로도 BulkProcessor가 알아서 벌크 업로드를 하는 듯 합니다.

 - BulkProcessor의 기능도 익히고 나면 이게 더 나은 선택이 될 수도 있겠습니다. 

 - 일단 사내 테스트 용으로 생성된 파일이 단위당 1200만건 정도니, 파일을 확보 후에 

   BulkRequest와 BulkProcessor 둘 다 테스트 후 사용하는 것이 좋을 듯 하네요.



이상으로 Elasticsearch 6.6.0 및 Bulk Api와 Spring Boot를 활용한 대량 데이터 업로드 예제를 마치겠습니다.


수고했어 김과장~


Posted by 홍규홍규