Bulk-loading large volumes of data into Elasticsearch

Anonymous site user   2020-12-20 07:40

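I have recently started working on a task that involves handling large volumes of data.

The current situation is this: a data-collection program is responsible for storing and retrieving a large volume of data. Later on we may also need to run statistics over that data.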

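Data distribution

13 collection points periodically write their results into 4 files (a new set of small files is generated every 5 minutes).

Name                                                 Size (bytes)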
gather_1_2014-02-27-14-50-0.txt                      568497
gather_1_2014-02-27-14-50-1.txt                      568665
gather_1_2014-02-27-14-50-2.txt                      568172
gather_1_2014-02-27-14-50-3.txt                      568275

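A shell script then loads the four files synchronously into a single table, tab_tmp_2014_2_27, in the sybase_iq database.

The daily volume is roughly 300 million records, so the small files add up to about 3 GB. With this many small files and such a large single table, a query on the composite primary key went from a latency of about 2 s to 5 to 10 minutes.

Given this, the current storage structure needs to be optimized.

So I looked at a related system. catior stores its data in a round-robin database; the advantage is that MRTG graphs are easy to generate, and the drawback is that it is poorly suited to statistics. In the end we brought in Elasticsearch to speed up retrieval over the large data set.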

Test platform

CPU: AMD Opteron(tm) Processor 6136 64bit 2.4GHz   * 32
Memory: 64 GB
Disk: 1.5 TB
OS: Red Hat Enterprise Linux Server release 6.4 (Santiago)

Directory layout of the files to be read:

[test@test001 data]$ ls
0  1  2  3

Simple test code:

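import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

import org.apache.commons.io.FileUtils;
import org.apache.commons.io.LineIterator;
import org.apache.log4j.Logger;

/**
 * Reads a delimited text file and turns every line into a column-name to value map.
 */
public class FileReader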
{

 private File file;
 private String splitCharactor;
 private Map<String, Class<?>> colNames;
 private static final Logger LOG = Logger.getLogger(FileReader.class);

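 /**
  * @param file
  *            the file to read
  * @param splitCharactor
  *            regular expression used to split each line into fields
  * @param colNames
  *            column names mapped to their target types; the iteration order
  *            must match the field order in the file (e.g. a LinkedHashMap)
  */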
 public FileReader(File file, String splitCharactor, Map<String, Class<?>> colNames)
 {
  this.file = file;
  this.splitCharactor = splitCharactor;
  this.colNames = colNames;
 }

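 /**
  * Reads the whole file into memory. Lines whose field count does not match
  * colNames are skipped.
  *
  * @return one map per valid line
  * @throws Exception
  *             if the file does not exist or a field cannot be converted
  */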
 public List<Map<String, Object>> readFile() throws Exception
 {
  List<Map<String, Object>> list = new ArrayList<Map<String, Object>>();
  if (!file.isFile())
  {
   throw new Exception("File does not exist or is not a regular file: " + file.getName());
  }
  LineIterator lineIterator = null;
  try
  {
   lineIterator = FileUtils.lineIterator(file, "UTF-8");
   while (lineIterator.hasNext())
   {
    String line = lineIterator.next();
    String[] values = line.split(splitCharactor);
    if (colNames.size() != values.length)
    {
     continue;
    }
    Map<String, Object> map = new HashMap<String, Object>();
    Iterator<Entry<String, Class<?>>> iterator = colNames.entrySet()
      .iterator();
    int count = 0;
    while (iterator.hasNext())
    {
     Entry<String, Class<?>> entry = iterator.next();
     Object value = values[count];
     if (!String.class.equals(entry.getValue()))
     {
      value = entry.getValue().getMethod("valueOf", String.class)
        .invoke(null, value);
     }
     map.put(entry.getKey(), value);
     count++;
    }
    list.add(map);
   }
  }
  catch (IOException e)
  {
   LOG.error("File reading line error." + e.toString(), e);
  }
  finally
  {
   LineIterator.closeQuietly(lineIterator);
  }
  return list;
 }
}
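The per-line conversion in readFile() can be tried on its own, without commons-io or a real input file. The following is only a minimal sketch of that step: the class name LineParseSketch, the method parseLine, and the sample line are made up for illustration, since the post does not show the actual file contents.

```java
import java.lang.reflect.Method;
import java.util.LinkedHashMap;
import java.util.Map;

public class LineParseSketch
{
 /**
  * Splits one line and converts each field to the type registered for its column.
  * Returns null when the field count does not match, mirroring the "skip" rule in readFile().
  */
 public static Map<String, Object> parseLine(String line, String splitRegex,
   LinkedHashMap<String, Class<?>> colNames)
 {
  String[] values = line.split(splitRegex);
  if (values.length != colNames.size())
  {
   return null;
  }
  Map<String, Object> row = new LinkedHashMap<String, Object>();
  int i = 0;
  for (Map.Entry<String, Class<?>> entry : colNames.entrySet())
  {
   Object value = values[i++];
   if (!String.class.equals(entry.getValue()))
   {
    try
    {
     // Looks up the static valueOf(String) of Long, Integer, ... by reflection.
     Method valueOf = entry.getValue().getMethod("valueOf", String.class);
     value = valueOf.invoke(null, value);
    }
    catch (ReflectiveOperationException e)
    {
     throw new IllegalArgumentException("Cannot convert column " + entry.getKey(), e);
    }
   }
   row.put(entry.getKey(), value);
  }
  return row;
 }

 public static void main(String[] args)
 {
  LinkedHashMap<String, Class<?>> cols = new LinkedHashMap<String, Class<?>>();
  cols.put("aa", Long.class);
  cols.put("bb", String.class);
  cols.put("dd", Integer.class);
  // Hypothetical sample line using "$" as the separator.
  System.out.println(parseLine("1393483800$eth0$7", "\\$", cols));
 }
}
```

One thing to keep in mind: String.split drops trailing empty fields, so a line whose last column is empty has fewer values than columns and is treated as malformed.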

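import java.io.File;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;

// JSON.toJSONString is assumed to come from fastjson.
import com.alibaba.fastjson.JSON;

public class StreamIntoEs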
{

 public static class ChildThread extends Thread
 {

  int number;

  public ChildThread(int number)
  {
   this.number = number;
  }

  @Override
  public void run()
  {
   Settings settings = ImmutableSettings.settingsBuilder()
     .put("client.transport.sniff", true)
     .put("client.transport.ping_timeout", 100)
     .put("cluster.name", "elasticsearch").build();
   TransportClient client = new TransportClient(settings)
     .addTransportAddress(new InetSocketTransportAddress("192.168.32.228",
       9300));
   File dir = new File("/export/home/es/data/" + number);
   LinkedHashMap<String, Class<?>> colNames = new LinkedHashMap<String, Class<?>>();
   colNames.put("aa", Long.class);
   colNames.put("bb", String.class);
   colNames.put("cc", String.class);
   colNames.put("dd", Integer.class);
   colNames.put("ee", Long.class);
   colNames.put("ff", Long.class);
   colNames.put("hh", Long.class);
   int count = 0;
   long startTime = System.currentTimeMillis();
   for (File file : dir.listFiles())
   {
    int currentCount = 0;
    long startCurrentTime = System.currentTimeMillis();
    FileReader reader = new FileReader(file, "\\$", colNames);
    BulkResponse resp = null;
    BulkRequestBuilder bulkRequest = client.prepareBulk();
    try
    {
     List<Map<String, Object>> results = reader.readFile();
     for (Map<String, Object> col : results)
     {
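      // The id is a composite key. Every key passed to col.get() must be one of the
      // column names registered in colNames, otherwise it resolves to null.
      bulkRequest.add(client.prepareIndex("flux", "fluxdata")
        .setSource(JSON.toJSONString(col))
        .setId(col.get("getway") + "##" + col.get("port_info") + "##"
          + col.get("device_id") + "##" + col.get("collecttime")));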
      count++;
      currentCount++;
     }
     resp = bulkRequest.execute().actionGet();
    }
    catch (Exception e)
    {
     // TODO Auto-generated catch block
     e.printStackTrace();
    }
    long endCurrentTime = System.currentTimeMillis();
    System.out.println("[thread-" + number + "-]per count:" + currentCount);
    System.out.println("[thread-" + number + "-]per time:"
      + (endCurrentTime - startCurrentTime));
    System.out.println("[thread-" + number + "-]per count/s:"
      + (float) currentCount / (endCurrentTime - startCurrentTime)
      * 1000);
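    // resp stays null when reading the file or sending the bulk request failed.
    if (resp != null)
    {
     System.out.println("[thread-" + number + "-]bulk has failures:"
       + resp.hasFailures());
    }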
   }
   long endTime = System.currentTimeMillis();
   System.out.println("[thread-" + number + "-]total count:" + count);
   System.out.println("[thread-" + number + "-]total time:"
     + (endTime - startTime));
   System.out.println("[thread-" + number + "-]total count/s:" + (float) count
     / (endTime - startTime) * 1000);
   // Release the client's connections and thread pools.
   client.close();
  }
 }

 public static void main(String args[])
 {
  for (int i = 0; i < 4; i++)
  {
   ChildThread childThread = new ChildThread(i);
   childThread.start();
  }
 }
}
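A note on the document id: the code builds it from getway, port_info, device_id and collecttime, while the columns registered in colNames are aa through hh. As written, each col.get(...) would return null and every row would get the id "null##null##null##null", so presumably the real column names were replaced with placeholders for the post. Below is a minimal sketch of a helper that fails fast instead of silently producing such an id. The class CompositeIdSketch and the method buildId are hypothetical names, not part of the original program.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class CompositeIdSketch
{
 /** Joins the values of the given key columns with "##"; rejects rows that lack a key column. */
 public static String buildId(Map<String, Object> row, String... keyColumns)
 {
  StringBuilder id = new StringBuilder();
  for (int i = 0; i < keyColumns.length; i++)
  {
   Object value = row.get(keyColumns[i]);
   if (value == null)
   {
    throw new IllegalArgumentException("Missing key column: " + keyColumns[i]);
   }
   if (i > 0)
   {
    id.append("##");
   }
   id.append(value);
  }
  return id.toString();
 }

 public static void main(String[] args)
 {
  Map<String, Object> row = new LinkedHashMap<String, Object>();
  // Placeholder column names and values, in the same style as the post.
  row.put("aa", 1393483800L);
  row.put("bb", "eth0");
  System.out.println(buildId(row, "aa", "bb"));
 }
}
```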

Four threads are started to do the loading. Each file is sent as a single bulk request once it has been fully parsed.
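With one bulk request per file, the request size follows the file size. If the files ever grow, one possible variation (not what the test code above does) is to cap the number of rows per bulk request. A minimal sketch of the chunking step, where the class BatchSketch, the method partition and the batch size are all assumptions for illustration:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class BatchSketch
{
 /** Splits rows into consecutive batches of at most batchSize elements. */
 public static <T> List<List<T>> partition(List<T> rows, int batchSize)
 {
  if (batchSize <= 0)
  {
   throw new IllegalArgumentException("batchSize must be positive");
  }
  List<List<T>> batches = new ArrayList<List<T>>();
  for (int from = 0; from < rows.size(); from += batchSize)
  {
   int to = Math.min(from + batchSize, rows.size());
   // Copy the view so each batch is independent of the source list.
   batches.add(new ArrayList<T>(rows.subList(from, to)));
  }
  return batches;
 }

 public static void main(String[] args)
 {
  List<Integer> rows = Arrays.asList(1, 2, 3, 4, 5);
  // Each inner list would become one bulk request.
  System.out.println(partition(rows, 2));
 }
}
```

Each batch would then get its own client.prepareBulk() and execute() call inside the per-file loop.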

Initialization script:

curl -XDELETE 'http://192.168.32.228:9200/twitter/'

curl -XPUT 'http://192.168.32.228:9200/twitter/' -d '
{
     "index" : {
          "number_of_shards" : 5,
          "number_of_replicas" : 0,
          "refresh_interval" : "-1",
          "translog.flush_threshold_ops" : "100000"
     }
}'

curl -XPUT 'http://192.168.32.228:9200/twitter/twiterdata/_mapping' -d '
{
     "twiterdata" : {
          "properties" : {
               "aa" : {"type" : "long", "index" : "not_analyzed"},
               "bb" : {"type" : "string", "index" : "not_analyzed"},
               "cc" : {"type" : "string", "index" : "not_analyzed"},
               "dd" : {"type" : "integer", "index" : "not_analyzed"},
               "ee" : {"type" : "long", "index" : "no"},
               "ff" : {"type" : "long", "index" : "no"},
               "gg" : {"type" : "long", "index" : "no"},
               "hh" : {"type" : "long", "index" : "no"},
               "ii" : {"type" : "long", "index" : "no"},
               "jj" : {"type" : "long", "index" : "no"},
               "kk" : {"type" : "long", "index" : "no"}
          }
     }
}'
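The script sets refresh_interval to -1, which turns automatic refresh off, so newly indexed documents do not become searchable until a refresh happens. A common follow-up once loading has finished is to set the interval back (Elasticsearch's default is 1s). The following is only a sketch of that step using plain HttpURLConnection. The class RefreshIntervalSketch and its methods are hypothetical, and the post itself does not show this step.

```java
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

public class RefreshIntervalSketch
{
 /** Builds the JSON body for PUT /{index}/_settings. */
 public static String settingsBody(String refreshInterval)
 {
  return "{\"index\":{\"refresh_interval\":\"" + refreshInterval + "\"}}";
 }

 /** Sends the settings update and returns the HTTP status code. */
 public static int putSettings(String baseUrl, String index, String refreshInterval)
   throws Exception
 {
  URL url = new URL(baseUrl + "/" + index + "/_settings");
  HttpURLConnection conn = (HttpURLConnection) url.openConnection();
  conn.setRequestMethod("PUT");
  conn.setDoOutput(true);
  conn.setRequestProperty("Content-Type", "application/json");
  byte[] body = settingsBody(refreshInterval).getBytes(StandardCharsets.UTF_8);
  OutputStream out = conn.getOutputStream();
  try
  {
   out.write(body);
  }
  finally
  {
   out.close();
  }
  return conn.getResponseCode();
 }
}
```

Against the test host from the script this would be called as RefreshIntervalSketch.putSettings("http://192.168.32.228:9200", "twitter", "1s").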

Throughput results for reference (total time is in milliseconds):

Without the refresh_interval setting
[test@test001 bin]$ more StreamIntoEs.out|grep total
[thread-2-]total count:1199411
[thread-2-]total time:1223718
[thread-2-]total count/s:980.1368
[thread-1-]total count:1447214
[thread-1-]total time:1393528
[thread-1-]total count/s:1038.5253
[thread-0-]total count:1508043
[thread-0-]total time:1430167
[thread-0-]total count/s:1054.4524
[thread-3-]total count:1650576
[thread-3-]total time:1471103
[thread-3-]total count/s:1121.9989
Sum of the four threads: 4195.1134 records/s
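The figure 4195.1134 is the sum of the four per-thread rates, each computed the same way as in the test code: count divided by elapsed milliseconds, times 1000. A small sketch that redoes this arithmetic from the numbers of the first run (the class and method names are made up for illustration):

```java
public class ThroughputSketch
{
 /** Records per second for one thread. */
 public static double perSecond(long count, long elapsedMillis)
 {
  return (double) count / elapsedMillis * 1000.0;
 }

 /** Sum of the per-thread rates. */
 public static double total(long[] counts, long[] elapsedMillis)
 {
  double sum = 0.0;
  for (int i = 0; i < counts.length; i++)
  {
   sum += perSecond(counts[i], elapsedMillis[i]);
  }
  return sum;
 }

 public static void main(String[] args)
 {
  // thread-2, thread-1, thread-0, thread-3 from the first run.
  long[] counts = { 1199411L, 1447214L, 1508043L, 1650576L };
  long[] millis = { 1223718L, 1393528L, 1430167L, 1471103L };
  System.out.printf("total records/s: %.2f%n", total(counts, millis));
 }
}
```

Because the threads finish at different times, the sum of per-thread rates is slightly higher than the total record count divided by the wall-clock time of the slowest thread.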

With the refresh_interval setting
[test@test001 bin]$ more StreamIntoEs.out |grep total
[thread-2-]total count:1199411
[thread-2-]total time:996111
[thread-2-]total count/s:1204.0938
[thread-1-]total count:1447214
[thread-1-]total time:1163207
[thread-1-]total count/s:1244.1586
[thread-0-]total count:1508043
[thread-0-]total time:1202682
[thread-0-]total count/s:1253.9
[thread-3-]total count:1650576
[thread-3-]total time:1236239
[thread-3-]total count/s:1335.1593
Sum of the four threads: 5037.3117 records/s

With the refresh_interval setting and field type conversion
[test@test001 bin]$ more StreamIntoEs.out |grep total
[thread-2-]total count:1199411
[thread-2-]total time:1065229
[thread-2-]total count/s:1125.9653
[thread-1-]total count:1447214
[thread-1-]total time:1218342
[thread-1-]total count/s:1187.8552
[thread-0-]total count:1508043
[thread-0-]total time:1230474
[thread-0-]total count/s:1225.5789
[thread-3-]total count:1650576
[thread-3-]total time:1274027
[thread-3-]total count/s:1295.5581
Sum of the four threads: 4834.9575 records/s

With the refresh_interval setting, field type conversion, and explicit document ids
[thread-2-]total count:1199411
[thread-2-]total time:912251
[thread-2-]total count/s:1314.7817
[thread-1-]total count:1447214
[thread-1-]total time:1067117
[thread-1-]total count/s:1356.1906
[thread-0-]total count:1508043
[thread-0-]total time:1090577
[thread-0-]total count/s:1382.7937
[thread-3-]total count:1650576
[thread-3-]total time:1128490
[thread-3-]total count/s:1462.6412
Sum of the four threads: 5516.4072 records/s
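To compare the four runs, the summed rates can be expressed as a gain over the first run (4195.1134). This works out to roughly 20%, 15% and 31% for the second, third and fourth run. Each configuration was run once, so these are indications rather than precise measurements. A minimal sketch of the calculation, with made-up class and method names:

```java
public class GainSketch
{
 /** Relative gain of candidate over baseline, in percent. */
 public static double gainPercent(double baseline, double candidate)
 {
  return (candidate / baseline - 1.0) * 100.0;
 }

 public static void main(String[] args)
 {
  double baseline = 4195.1134; // without the refresh_interval setting
  double[] runs = { 5037.3117, 4834.9575, 5516.4072 };
  for (double run : runs)
  {
   System.out.printf("%.4f records/s: %.1f%% over baseline%n", run,
     gainPercent(baseline, run));
  }
 }
}
```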

Loading 580 MB of data takes about 20 minutes on average. The resulting index files total about 1.76 GB.
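These two figures translate into a load rate of a little under 0.5 MB/s and an index roughly three times the size of the source files. The sketch below redoes that arithmetic and, purely as an extrapolation, estimates how long the 3 GB daily volume mentioned earlier would take at the same rate. It assumes 1 GB = 1024 MB and linear scaling, neither of which the post states. The class and method names are made up.

```java
public class LoadRateSketch
{
 /** Average load rate in MB per second. */
 public static double mbPerSecond(double megabytes, double minutes)
 {
  return megabytes / (minutes * 60.0);
 }

 /** Ratio of index size to source size. */
 public static double expansion(double indexMb, double sourceMb)
 {
  return indexMb / sourceMb;
 }

 /** Minutes needed to load the given volume at a constant rate. */
 public static double minutesFor(double megabytes, double mbPerSecond)
 {
  return megabytes / mbPerSecond / 60.0;
 }

 public static void main(String[] args)
 {
  double rate = mbPerSecond(580.0, 20.0);
  System.out.printf("load rate: %.3f MB/s%n", rate);
  System.out.printf("index/source ratio: %.2f%n", expansion(1.76 * 1024.0, 580.0));
  // Extrapolation only: 3 GB per day at the measured rate.
  System.out.printf("3 GB would take about %.0f minutes%n", minutesFor(3.0 * 1024.0, rate));
 }
}
```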

For related test results, see:

Elasticsearch performance test
