使用Python生成器封装Elasticsearch查询结果

目录

Elasticsearch默认查询返回10个结果,可以通过设置sizefrom获取更多的查询结果。 如果size设置过大,会显著增加查询执行时间。所以推荐通过多次查询获取全部符合条件的文档。

本文介绍如何将Elasticsearch的查询结果封装到Python生成器中,返回所有的查询结果。

查询

本文使用elasticsearch-py库访问ElasticSearch库。

创建一个客户端。

client = Elasticsearch(hosts=["localhost:9200"])

构建查询体,从第一条记录开始返回20条记录。

search_body = {
  "size": 20,
  "from": 0,
  "query": {
    "bool": {
      "must": [
        {
          "match": {
            "data.system": "grapes_meso_3km"
          }
        },
        {
          "match": {
            "data.start_time": "2020-03-17T00:00:00"
          }
        }
      ]
    }
  }
}

执行查询操作,将查询结果解析为message对象,放到数组中。

res = client.search(index="2020-03", body=search_body)
messages = []
for hit in res['hits']['hits']:
    messages.append(load_message(hit["_source"]))

返回所有数据

res['hits']['total']['value']是符合条件的文档数量,获取所有数据可以递增search_bodyfrom值,重复执行查询操作。

import numpy as np

messages = []
search_from = 0
total = np.iinfo(np.int16).max

while search_from < total:
    search_body["from"] = search_from
    res = client.search(index="2020-03", body=search_body)
    for hit in res['hits']['hits']:
        messages.append(load_message(hit["_source"]))

    total = res['hits']['total']['value']
    search_from += len(res['hits']['hits'])

封装生成器

如果需要获取任意数量的数据,可以将查询结果封装到生成器中。

首先将查询过程封装为一个函数。

def _get_result(index: str, query_body: dict, search_from: int, search_size: int):
    search_body = {
        "size": search_size,
        "from": search_from,
    }
    search_body.update(**query_body)
    print(search_body)
    res = self.client.search(index=index, body=search_body)
    return res

然后修改将上一节的代码封装到返回生成器的函数中,使用yield逐个返回查询结果。 只有在需要的文档尚未获取的时候,函数才会执行查询操作。

def get_production_messages(
        system: str,
        start_time: datetime.datetime = None,
        size: int = 20,
) -> typing.Iterable[ProductionEventMessage]:
    conditions = [{
        "match": {"data.system": system}
    }]
    if start_time is not None:
        conditions.append({"match": {"data.start_time": start_time.isoformat()}})

    query_body = {
        "query": {
            "bool": {
                "must": conditions
            },
        },
    }

    search_from = 0
    total = np.iinfo(np.int16).max

    while search_from < total:
        res = _get_result(
            index="2020-03",
            query_body=query_body,
            search_from=search_from,
            search_size=size,
        )

        total = res['hits']['total']['value']
        logger.info(f"total: {total}")
        search_from += len(res['hits']['hits'])
        logger.info("result count: {}", len(res["hits"]["hits"]))
        for hit in res['hits']['hits']:
            yield load_message(hit["_source"])

遍历函数的返回结果,可以获取符合条件的所有文档。 下面的代码将返回的message对象放入pandas.DataFrame对象中并排序。

import pandas as pd

results = get_production_messages(
    system=system,
    start_time=start_time
)

df = pd.DataFrame(columns=["time"])
for result in results:
    hours = get_hour(result)
    message_time = result.time.ceil("S")
    current_df = pd.DataFrame({"time": [message_time]}, columns=["time"], index=[hours])
    df = df.append(current_df)

logger.info("Get {} results", len(df))
df = df.sort_index()
print(df)
print(f"Latest time: {df.time.max()}")

参考

本文的示例代码来自nwpc-oper/nwpc-message-tool项目。