Bulk Api를 활용하여 Elasticsearch에 대량 데이터 인덱스하기
해당 포스트는 다음 링크의 포스트를 참고하여 작성한 것입니다.
[바로가기]
이 포스트에서는 Bulk Api와 High-Level-Rest-Client를 활용하여 Elasticsearch에 대량의 데이터를
업로드하는 방법을 소개합니다.
※ Elasticsearch는 5.6.0 버전에서 Rest Client를 Low-Level과 High-Level 두 가지로 분리시켰습니다.
Transport Client는 7.0에서 deprecated되었으며 8.0에서는 삭제시킬 예정이라고 합니다.
이 글에서는 Low-Level-Rest-Client와 Hight-Level-Rest-Client의 차이점에 대한 내용은 생략합니다.
데이터는 JSON 파일이고 Elasticsearch에 업로드되는 과정은 다음과 같습니다.
사내 서버에서 1시간 마다 로그 파일 생성 -> 스프링 스케쥴러를 이용해 매 시간마다 해당 로그 파일을 로드
-> 해당 로그파일을 파싱하지 않고 바로 Bulk Api를 사용해 Elasticsearch에 PUT
0. 시스템 환경
- Spring Boot 2.1.2 RELEASE
- Spring webflux
- JAVA 8
- Elasticsearch 6.6.0
1. JSON 파일 준비
- Bulk API를 사용하기 위해서는 줄바꿈 형식의 데이터셋이 필요합니다.
즉, 마지막 행을 비롯하여 모든 행의 끝에는 개행문자가 있어야 합니다.(\n)
예를 들면 다음과 같습니다.
{ "data" : "1" }
{ "data" : "2" }
...
2. BulkProcessor vs BulkRequestBuilder
- 이 포스트에서는 BulkRequest를 사용합니다. 원글에서는 두 가지 모두를 사용한 예제가 있습니다.
- 둘의 차이점은 다음 링크에서 확인할 수 있습니다.
[바로가기]
3. application.properties
elasticsearch.host=127.0.0.1
elasticsearch.port=9200
4. Elasticsearch Config 파일 작성
- RestHighLevelClient를 Bean으로 등록해 필요할 때마다 꺼내쓸 수 있도록 합니다.
@Configuration
public class ElasticSearchConfig {
@Value("${elasticsearch.host}")
private String host;
@Value("${elasticsearch.port}")
private int port;
@Bean
public RestHighLevelClient getRestClient() {
RestClientBuilder builder = RestClient.builder(new HttpHost(host, port, "http"));
return new RestHighLevelClient(builder);
}
}
5. Shceduler 작성
- 스케쥴러를 사용하기 위해서는 별도의 Bean을 등록해야 하므로 프로젝트 Application에 추가합니다.
- @EnableScheduling 어노테이션을 명시하지 않을 경우 어플리케이션은 시작과 동시에 종료됩니다.
- 예제는 단일쓰레드이며 쓰레드 풀이 필요하다면 ThreadPoolTaskScheduler를 사용하면 됩니다.
@SpringBootApplication
@EnableScheduling
public class ExampleApplication {
public static void main(String[] args) {
SpringApplication.run(ExampleApplication.class, args);
}
// 추가된 부분
@Bean
public TaskScheduler taskScheduler() {
return new ConcurrentTaskScheduler();
}
}
6. Sheduler 작성
@Component
public class ExampleScheduler {
@Autowired ExampleService exampleService;
@Scheduled(cron="15 0 * * * *")
public void getJSONFileFromServer() throws IOException {
// 파일은 "년-월-일-시"의 형식으로 생성
Date date = new Date();
SimpleDateFormat format = new SimpleDateFormat("YYYY-MM-dd-HH");
requestIndexToElasticFromJsonFile(format.format(date));
}
private void requestIndexToElasticFromJsonFile(String fileName) {
File file = new File("서버파일경로" + fileName);
if(!file.isFile()) { // 파일이 아직 생성되지 않았다면
fileCheckWithTimer(fileName);
return;
}
exampleService.bulkToElasticsearch(fileName);
}
private void fileCheckWithTimer(String fileName) {
File file = new File("서버파일경로" + fileName);
Timer timer = new Timer();
TimerTask timerTask = new TimerTask() {
int count = 0;
@Override
public void run() {
if(file.isFile()) {
timer.cancel();
try {
exampleService.bulkToElasticsearch(fileName);
} catch(IOException e) {
e.printStackTrace();
}
} else if(!file.isFile() && count == 58) {
// 1시간이 지나도록 파일이 생성되지 않으면 다음 파일 작업을 위해 중단
timer.cancel();
} else
System.out.println("File " + fileName + " is searching");
count++;
}
};
timer.schedule(timerTask, 0, 60000); // 1분마다
}
}
7. ServiceImpl 작성
@Component
public class ExampleServiceImpl implements ExampleService {
@Autowired RestHighLevelClient restHighLevelClient;
...
public void bulkToElasticsearch(String fileName) thorws IOException {
String filePath = "서버파일경로" + fileName;
File file = new File(filePath);
BulkRequest bulkRequest = new BulkRequest();
int id = 0; // id를 함께 인덱스하기 위한 시작값 생성
int batch = 1000;
String line;
BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream(file), "UTF-8"));
while((line = br.readLine()) != null) {
// json 파일을 행 단위로 읽는 족족 IndexRequest에 담는다.
IndexRequest indexRequest = new IndexRequest("index", fileName, id + "");
indexRequest.source(line, XContentType.JSON);
bulkRequest.add(indexRequest);
if(id % batch == 0) { // 1000개가 넘을 경우 1000 단위로 인덱스한다.
BulkResponose bulkResponse = restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
if(bulkResponse.hasFailures()) {
for(BulkItemResponse bulkItemResponse : bulkResponse) {
if(bulkItemResponse.isFailed()) {
BulkItemResponse.Failure failure = bulkItemResponse.getFailure();
System.out.println("Error : " + failure.toString());
}
}
}
System.out.println("Uploaded bulk id : " + id);
bulkRequest = new BulkRequest();
}
id++;
}
if(bulkRequest.numberOfActions() > 0) {
// 코드가 반복되므로 차후 리팩토링
BulkResponse bulkResponse = restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
if(bulkResponse.hasFailures()) {
for(BulkItemResponse bulkItemResponse : bulkResponse) {
if(bulkItemResponse.isFailed()) {
BulkItemResponse.Failure failure = bulkItemResponse.getFailure();
System.out.println("Error : " + failure.toString());
}
}
}
}
System.out.println("Total uploaded : " + id);
restHighLevelClient.close();
br.close();
}
...
}
8. 결과
- 다음과 같이 1938개의 도큐먼트를 1000개 단위로 엘라스틱서치에 업로드하는 것을 성공했습니다.
9. 기타
- BulkProcessor를 사용하면 자동으로 문서 단위로 벌크 업로드를 해주는 거 같습니다. 이건 다음에...;
- builder를 생성하고 builder 옵션을 통해 설정할 수 있는데 기본 값은 1000개의 도큐먼트 단위, 5MB의 크기,
1개의 동시 요청 등이 있습니다. 기본 옵션만으로도 BulkProcessor가 알아서 벌크 업로드를 하는 듯 합니다.
- BulkProcessor의 기능도 익히고 나면 이게 더 나은 선택이 될 수도 있겠습니다.
- 일단 사내 테스트 용으로 생성된 파일이 단위당 1200만건 정도니, 파일을 확보 후에
BulkRequest와 BulkProcessor 둘 다 테스트 후 사용하는 것이 좋을 듯 하네요.
이상으로 Elasticsearch 6.6.0 및 Bulk Api와 Spring Boot를 활용한 대량 데이터 업로드 예제를 마치겠습니다.
수고했어 김과장~