엘라스틱서치2022. 10. 25. 17:25

# Deprecated된 RestHighLevelClient를 대체하기 위해 ElasticsearchClient를 활용.

 

public static CountResponse getCountResponse(ElasticsearchClient client, Query query) {
  CountRequest countRequest = new CountRequest.Builder().query(query).build();
  try {
    return client.count(countRequest);
  } catch (IOException e) {
    e.printStackTrace();
  }
  return null;
}

 

※ 매개변수로 넘어오는 Query 객체는 MatchQuery, RangeQuery, BoolQuery 등.

 

 

Posted by 홍규홍규
엘라스틱서치2022. 9. 13. 17:26

# Deprecated된 RestHighLevelClient를 대체하기 위해 ElasticsearchClient를 활용.

 

 ExistsRequest existsRequest = new ExistsRequest.Builder().index(index).build();
  try {
    BooleanResponse response = client.indices().exists(existsRequest);
    return response.value();
  } catch (IOException e) {
    return false;
  }
}

 

※ 참고. 매개변수는 다음과 같이 List 또는 String 타입

public final Builder index(List<String> list) {
  this.index = _listAddAll(this.index, list);
  return this;
}

public final Builder index(String value, String... values) {
  this.index = _listAdd(this.index, value, values);
  return this;
}
Posted by 홍규홍규
엘라스틱서치2022. 9. 13. 17:20

# Deprecated된 RestHighLevelClient를 대체하기 위해 ElasticsearchClient를 활용.

 

1. 버전 확인. (버전별 호환은 아래 사이트에서 확인할 수 있다.)

(https://docs.spring.io/spring-data/elasticsearch/docs/current/reference/html/#core.extensions)

 

2. 스프링부트 2.7.3 기준

  implementation 'org.springframework.boot:spring-boot-starter-data-elasticsearch'
  // 빈 생성 에러가 발생할 경우 아래도 추가해준다.
  implementation 'jakarta.json:jakarta.json-api:2.1.1'

 

3. Configuration 클래스에 Bean 추가

@Bean
public ElasticsearchClient esClient() {
  RestClient restClient = RestClient.builder(new HttpHost("url", 9200)).build();
  ElasticsearchTransport transport = new RestClientTransport(restClient, new JacksonJsonpMapper());
  return new ElasticsearchClient(transport);
}

 

Posted by 홍규홍규
엘라스틱서치2019. 2. 26. 11:09

- Elasticsearch에서 제공하는 Type이 deprecated 되었습니다. 그것도 모르고 여태 써왔다니!!

  보통 RDB의 Table과 매칭시켜서 설명을 많이 했는데 deprecated된 김에 자세히 좀 들여다보았습니다.



6.0 이전까지는 하나의 Index에 복수의 Type을 지정해서 사용하는 것이 가능했습니다. 

마치 RDB의 Table처럼. 그리고 그 안에는 또 Document들이 저장되었습니다.


하지만 엄격히 말하면 Elasticsearch의 Type은 RDB의 Table과는 다릅니다.

Table의 경우 A라는 Table에 있는 name과 B라는 Table에 있는 name은 독립적이고 서로에게 아무런 

영향도 끼치지 않지만 Type의 경우엔 그렇지 않습니다.


A라는 Type의 name Field와 B라는 Type의 name Field는 내부적으로는 같은 루씬 Field를 사용합니다.

즉 두 Type의 name Field 모두 동일한 루씬 Field에 저장되며 두 Type 모두에서 동일한 mapping을 가집니다.

따라서 다른 Type의 동일한 Field는 서로 관련이 있게 되고, 이것이 내부적으로 몇 가지 문제를

야기할 가능성이 있다고 합니다.


그래서 6.0에서 deprecated되고 7.0부터는 아예 삭제가 되어 Type과 관련된 기능은 사용할 수 없게 됩니다.


중요한 것은 instead of, 즉 대안이 없습니다. 그냥 사용하면 안 됩니다.

7.0부터는 모든 Index는 단 하나의 Type(_doc)만을 가지게 됩니다.


따라서 Type으로 나누던 데이터 저장 방법을 Index로 나누는 걸로 변경해야 합니다.


혹시 더 좋은 방법 있으면 공유 좀...


- 관련 레퍼런스

https://www.elastic.co/guide/en/elasticsearch/reference/master/removal-of-types.html


- deprecated list

https://artifacts.elastic.co/javadoc/org/elasticsearch/client/elasticsearch-rest-high-level-client/7.0.0-alpha2/deprecated-list.html

Posted by 홍규홍규
엘라스틱서치2019. 2. 25. 13:39

1. 설치 환경

 - CentOS Linux 7 (release 7.3.1611)


2. Elasticsearch 다운로드 및 설치

 - Elasticsearch는 root계정으로는 실행할 수 없다. 따라서 계정을 추가하고 해당 계정으로 설치하고 실행해야 한다.

 - https://www.elastic.co/kr/downloads/elasticsearch 사이트 접속 후 MACOS/LINUX 의 링크 주소를 복사한다. 

   (현재 버전 6.6.1)

 - 리눅스 서버에 로그인하고 Elasticsearch를 다운받는다.

# wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-6.6.0.tar.gz

 - root가 아닌 계정으로는 rpm을 통해 설치할 수 없으므로 위처럼 wget 명령어를 사용한다.

 - 다운로드 후에는 다음 명령어를 통해 압축을 푼다.

# tar xvzf elasticsearch-6.6.0.tar.gz

압축이 풀리고 나면 실행할 수가 있는데, 로컬이 아닌 외부에서 서버로 접속할 수 있도록 ip와 port를 열어줘야 한다.


3. 방화벽 해제 후 ip와 port open

# firewall-cmd --permanent --zone=public --add-port=9200/tcp
# firewall-cmd --reload
# firewall-cmd --list-ports

 - Elasticsearch의 config폴더로 이동 후에 elasticsearch.yml을 편집합니다.

# cd elasticsearch-6.6.0/config/
# vi elasticsearch.yml

 - i를 눌러 편집모드로 전환한 후에 http.port 부분을 찾아서 주석을 해제하고 다음과 같이 설정한다.


 

 - http.host 에는 해당 리눅스의 서버 ip를 작성하면 된다.

 - 수정이 완료되었으면 Esc를 눌러 편집모드에서 빠져나온 후 :wq 를 입력해 저장하고 나온다.


4. Elasticsearch 실행

 - bin 폴더로 이동한 후 ./elasticsearch 를 입력하면 로그가 뜨면서 정상적으로 실행이 된다.


5. 테스트

 - 다음 화면과 같이 정상적으로 접속됨을 확인할 수 있다.

 

 




Posted by 홍규홍규
엘라스틱서치2019. 2. 13. 17:44

해당 포스트는 다음 링크의 포스트를 참고하여 작성한 것입니다.

[바로가기]


이 포스트에서는 Bulk Api와 High-Level-Rest-Client를 활용하여 Elasticsearch에 대량의 데이터를 

업로드하는 방법을 소개합니다. 


※ Elasticsearch는 5.6.0 버전에서 Rest Client를 Low-Level과 High-Level 두 가지로 분리시켰습니다.

   Transport Client는 7.0에서 deprecated되었으며 8.0에서는 삭제시킬 예정이라고 합니다.

   이 글에서는 Low-Level-Rest-Client와 Hight-Level-Rest-Client의 차이점에 대한 내용은 생략합니다.


데이터는 JSON 파일이고 Elasticsearch에 업로드되는 과정은 다음과 같습니다.


사내 서버에서 1시간 마다 로그 파일 생성 -> 스프링 스케쥴러를 이용해 매 시간마다 해당 로그 파일을 로드

-> 해당 로그파일을 파싱하지 않고 바로 Bulk Api를 사용해 Elasticsearch에 PUT



0. 시스템 환경

- Spring Boot 2.1.2 RELEASE

- Spring webflux

- JAVA 8

- Elasticsearch 6.6.0 


1. JSON 파일 준비

- Bulk API를 사용하기 위해서는 줄바꿈 형식의 데이터셋이 필요합니다.

  즉, 마지막 행을 비롯하여 모든 행의 끝에는 개행문자가 있어야 합니다.(\n)

  예를 들면 다음과 같습니다.

{ "data" : "1" }
{ "data" : "2" }
...


2. BulkProcessor vs BulkRequestBuilder

- 이 포스트에서는 BulkRequest를 사용합니다. 원글에서는 두 가지 모두를 사용한 예제가 있습니다.

- 둘의 차이점은 다음 링크에서 확인할 수 있습니다.

  [바로가기]


3. application.properties

elasticsearch.host=127.0.0.1
elasticsearch.port=9200


4. Elasticsearch Config 파일 작성

 - RestHighLevelClient를 Bean으로 등록해 필요할 때마다 꺼내쓸 수 있도록 합니다.

@Configuration
public class ElasticSearchConfig {
  @Value("${elasticsearch.host}")
  private String host;
  
  @Value("${elasticsearch.port}")
  private int port;

  @Bean
  public RestHighLevelClient getRestClient() {
    RestClientBuilder builder = RestClient.builder(new HttpHost(host, port, "http"));
    return new RestHighLevelClient(builder);
  }
}


5. Shceduler 작성

 - 스케쥴러를 사용하기 위해서는 별도의 Bean을 등록해야 하므로 프로젝트 Application에 추가합니다.

 - @EnableScheduling 어노테이션을 명시하지 않을 경우 어플리케이션은 시작과 동시에 종료됩니다.

 - 예제는 단일쓰레드이며 쓰레드 풀이 필요하다면 ThreadPoolTaskScheduler를 사용하면 됩니다.

@SpringBootApplication
@EnableScheduling
public class ExampleApplication {

  public static void main(String[] args) {
    SpringApplication.run(ExampleApplication.class, args);
  }

  // 추가된 부분
  @Bean
  public TaskScheduler taskScheduler() {
    return new ConcurrentTaskScheduler();
  }
}


6. Sheduler 작성

@Component
public class ExampleScheduler {
  @Autowired ExampleService exampleService;

  @Scheduled(cron="15 0 * * * *")
  public void getJSONFileFromServer() throws IOException {
    // 파일은 "년-월-일-시"의 형식으로 생성
    Date date = new Date();
    SimpleDateFormat format = new SimpleDateFormat("YYYY-MM-dd-HH"); 
    requestIndexToElasticFromJsonFile(format.format(date));
  }

  private void requestIndexToElasticFromJsonFile(String fileName) {
    File file = new File("서버파일경로" + fileName);
    if(!file.isFile()) { // 파일이 아직 생성되지 않았다면
      fileCheckWithTimer(fileName);
      return;
    }
    exampleService.bulkToElasticsearch(fileName);
  }

  private void fileCheckWithTimer(String fileName) {
    File file = new File("서버파일경로" + fileName);
 
    Timer timer = new Timer();
    TimerTask timerTask = new TimerTask() {
      int count = 0;
      @Override
      public void run() {
        if(file.isFile()) {
          timer.cancel();
          try {
            exampleService.bulkToElasticsearch(fileName);
          } catch(IOException e) { 
            e.printStackTrace();
          }
        } else if(!file.isFile() && count == 58) {
          // 1시간이 지나도록 파일이 생성되지 않으면 다음 파일 작업을 위해 중단
          timer.cancel();
        } else
          System.out.println("File " + fileName + " is searching");
        count++;
      }
    };
    timer.schedule(timerTask, 0, 60000); // 1분마다 
  }
}


7. ServiceImpl 작성

@Component
public class ExampleServiceImpl implements ExampleService {
  @Autowired RestHighLevelClient restHighLevelClient;
  
  ...

  public void bulkToElasticsearch(String fileName) thorws IOException {
    String filePath = "서버파일경로" + fileName;
    File file = new File(filePath);
    BulkRequest bulkRequest = new BulkRequest();

    int id = 0; // id를 함께 인덱스하기 위한 시작값 생성
    int batch = 1000; 
    String line;

    BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream(file), "UTF-8"));

    while((line = br.readLine()) != null) {
      // json 파일을 행 단위로 읽는 족족 IndexRequest에 담는다.
      IndexRequest indexRequest = new IndexRequest("index", fileName, id + "");
      indexRequest.source(line, XContentType.JSON);
      bulkRequest.add(indexRequest);

      if(id % batch == 0) { // 1000개가 넘을 경우 1000 단위로 인덱스한다.
        BulkResponose bulkResponse = restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
        if(bulkResponse.hasFailures()) {
          for(BulkItemResponse bulkItemResponse : bulkResponse) {
            if(bulkItemResponse.isFailed()) {
              BulkItemResponse.Failure failure = bulkItemResponse.getFailure();
              System.out.println("Error : " + failure.toString());
            }
          }
        }
        System.out.println("Uploaded bulk id : " + id);
        bulkRequest = new BulkRequest();
      }
      id++;
    }

    if(bulkRequest.numberOfActions() > 0) {
      // 코드가 반복되므로 차후 리팩토링
      BulkResponse bulkResponse = restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
      if(bulkResponse.hasFailures()) {
        for(BulkItemResponse bulkItemResponse : bulkResponse) {
          if(bulkItemResponse.isFailed()) {
            BulkItemResponse.Failure failure = bulkItemResponse.getFailure();
            System.out.println("Error : " + failure.toString());
          }
        }
      }
    }
    System.out.println("Total uploaded : " + id);
    restHighLevelClient.close();
    br.close();
  }

  ...

}


8. 결과

 - 다음과 같이 1938개의 도큐먼트를 1000개 단위로 엘라스틱서치에 업로드하는 것을 성공했습니다.




9. 기타

 - BulkProcessor를 사용하면 자동으로 문서 단위로 벌크 업로드를 해주는 거 같습니다. 이건 다음에...;

 - builder를 생성하고 builder 옵션을 통해 설정할 수 있는데 기본 값은 1000개의 도큐먼트 단위, 5MB의 크기, 

   1개의 동시 요청 등이 있습니다. 기본 옵션만으로도 BulkProcessor가 알아서 벌크 업로드를 하는 듯 합니다.

 - BulkProcessor의 기능도 익히고 나면 이게 더 나은 선택이 될 수도 있겠습니다. 

 - 일단 사내 테스트 용으로 생성된 파일이 단위당 1200만건 정도니, 파일을 확보 후에 

   BulkRequest와 BulkProcessor 둘 다 테스트 후 사용하는 것이 좋을 듯 하네요.



이상으로 Elasticsearch 6.6.0 및 Bulk Api와 Spring Boot를 활용한 대량 데이터 업로드 예제를 마치겠습니다.


수고했어 김과장~


Posted by 홍규홍규