使用BulkProcessor批量插入ES

论坛 期权论坛 脚本     
匿名技术用户   2021-1-7 14:33   146   0

话不多说,贴上代码

类需要实现以下2个接口:

 implements Flushable, Closeable

 
// Shared bulk processor; assigned once in init() and used by flush(), close() and batchSaveGroupGuid().
private BulkProcessor bulkProcessor;

 /**
     * Builds the shared {@code bulkProcessor} after dependency injection.
     *
     * <p>The processor is configured to flush when any of these thresholds is hit:
     * 2000 queued actions, 1 MB of queued request data, or 5 seconds since the last flush.
     * A listener logs the outcome of every bulk execution.
     */
    @PostConstruct
    public void init() {
        bulkProcessor = BulkProcessor.builder(client, new BulkProcessor.Listener() {
            @Override
            public void beforeBulk(long executionId, BulkRequest request) {
                log.info("{} : Push bulk data to es, size is {}", executionId, request.requests().size());
            }

            @Override
            public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
                log.info("{} : {} data has been saved in elasticsearch. It tooks {} mills, {}", executionId, request.numberOfActions(), response.getTook().getMillis(), response.hasFailures());
                // A bulk call can succeed as a whole while individual items fail;
                // surface the per-item reasons instead of only a boolean flag.
                if (response.hasFailures()) {
                    log.warn("{} : Bulk request completed with failures: {}", executionId, response.buildFailureMessage());
                }
            }

            @Override
            public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
                // The whole bulk request failed: log at ERROR and keep the cause so it is not lost.
                log.error("{} : Failed to push {} actions to es", executionId, request.numberOfActions(), failure);
            }
        }).setBulkActions(2000).setBulkSize(new ByteSizeValue(1, ByteSizeUnit.MB)).setFlushInterval(TimeValue.timeValueSeconds(5)).build();
    }

 /**
     * Flushes the bulk processor by delegating to {@code bulkProcessor.flush()}.
     */
    @Override
    public void flush() {
        this.bulkProcessor.flush();
    }

/**
     * Closes the bulk processor and the client, releasing resources.
     *
     * <p>Flushes the bulk processor, then waits up to 30 seconds for it to close.
     * The client is closed in a {@code finally} block so it is released even if
     * flushing fails or the wait is interrupted.
     */
    @Override
    public void close() {
        try {
            bulkProcessor.flush();
            // awaitClose returns false when the timeout elapses before all bulk requests completed.
            boolean finished = bulkProcessor.awaitClose(30, TimeUnit.SECONDS);
            if (!finished) {
                log.warn("Bulk processor did not finish all requests within 30 seconds");
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up can observe the interruption.
            Thread.currentThread().interrupt();
            log.error("Interrupted while waiting for bulk processor to close", e);
        } finally {
            client.close();
            log.info("elasticsearch  closed!");
        }
    }



/**
     * Queues one index request per usable row on the bulk processor.
     *
     * <p>For every row whose {@code "guid"} value is non-blank, a document with the fields
     * {@code group_id} and {@code guid} is added to the bulk processor under the document id
     * {@code groupId + "-" + guid}. Rows with a blank or missing guid are skipped. Nothing is
     * queued when {@code groupId} is {@code null} or {@code list} is {@code null} or empty.
     *
     * <p>The requests are only handed to the bulk processor here; when they are actually
     * sent depends on the processor's flush thresholds.
     *
     * @param groupId   group identifier written to every document; {@code null} queues nothing
     * @param list      rows to read the {@code "guid"} value from; may be {@code null}
     * @param indexName name of the target index
     */
    public void batchSaveGroupGuid(Long groupId, List<Map<String, Object>> list, String indexName) {
        int total = (list == null) ? 0 : list.size();
        // groupId does not change per row, so check it once instead of on every element.
        if (groupId != null && total > 0) {
            for (Map<String, Object> row : list) {
                String guid = MapUtils.getString(row, "guid");
                if (StringUtils.isBlank(guid)) {
                    continue;
                }
                // Build the document only for rows that will actually be indexed.
                Map<String, Object> entityMap = new HashMap<>();
                entityMap.put("group_id", groupId);
                entityMap.put("guid", guid);
                bulkProcessor.add(new IndexRequest(indexName, ES_TYPE, groupId + "-" + guid).source(entityMap));
            }
        }
        log.info("异步写入ES成功:{},{}", groupId, total);
    }




/**
     * Creates the index if it does not exist yet and applies the field mapping.
     *
     * <p>When the index already exists, only a log line is written. Otherwise the index is
     * created and a mapping is put on it that declares {@code group_id} as {@code long}
     * and {@code guid} as {@code keyword}.
     *
     * @param indexName name of the index to create
     */
    public void createIndex(String indexName) {
        boolean alreadyPresent = elasticsearchTemplate.indexExists(indexName);
        if (alreadyPresent) {
            log.info("索引已存在:{}", indexName);
            return;
        }

        elasticsearchTemplate.createIndex(indexName);

        String fieldMapping = "{\"properties\":{\"group_id\":{\"type\":\"long\"},\"guid\":{\"type\":\"keyword\"}}}";
        elasticsearchTemplate.putMapping(indexName, ES_TYPE, fieldMapping);
        log.info("字段映射成功:{}", fieldMapping);
    }

分享到 :
0 人收藏
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

积分:7942463
帖子:1588486
精华:0
期权论坛 期权论坛
发布
内容

下载期权论坛手机APP