使用Python生成器封装ElasticSearch查询结果
目录
Elasticsearch 默认查询返回 10 个结果,可以通过设置 size
和 from
获取更多的查询结果。
如果 size
设置过大,会显著增加查询执行时间。所以推荐通过多次查询获取全部符合条件的文档。
本文介绍如何将 Elasticsearch 的查询结果封装到 Python 生成器中,返回所有的查询结果。
查询
本文使用 elasticsearch-py 库访问 Elasticsearch 库。
创建一个客户端。
client = Elasticsearch(hosts=["localhost:9200"])
构建查询体,从第一条记录开始返回20条记录。
search_body = {
"size": 20,
"from": 0,
"query": {
"bool": {
"must": [
{
"match": {
"data.system": "grapes_meso_3km"
}
},
{
"match": {
"data.start_time": "2020-03-17T00:00:00"
}
}
]
}
}
}
执行查询操作,将查询结果解析为 message
对象,放到数组中。
res = client.search(index="2020-03", body=search_body)
messages = []
for hit in res['hits']['hits']:
messages.append(load_message(hit["_source"]))
返回所有数据
res['hits']['total']['value']
是符合条件的文档数量,获取所有数据可以递增 search_body
的 from
值,重复执行查询操作。
import numpy as np
messages = []
search_from = 0
total = np.iinfo(np.int16).max
while search_from < total:
search_body["from"] = search_from
res = client.search(index="2020-03", body=search_body)
for hit in res['hits']['hits']:
messages.append(load_message(hit["_source"]))
total = res['hits']['total']['value']
search_from += len(res['hits']['hits'])
封装生成器
如果需要获取任意数量的数据,可以将查询结果封装到生成器中。
首先将查询过程封装为一个函数。
def _get_result(index: str, query_body: dict, search_from: int, search_size: int):
search_body = {
"size": search_size,
"from": search_from,
}
search_body.update(**query_body)
print(search_body)
res = self.client.search(index=index, body=search_body)
return res
然后修改将上一节的代码封装到返回生成器的函数中,使用 yield
逐个返回查询结果。
只有在需要的文档尚未获取的时候,函数才会执行查询操作。
def get_production_messages(
system: str,
start_time: datetime.datetime = None,
size: int = 20,
) -> typing.Iterable[ProductionEventMessage]:
conditions = [{
"match": {"data.system": system}
}]
if start_time is not None:
conditions.append({"match": {"data.start_time": start_time.isoformat()}})
query_body = {
"query": {
"bool": {
"must": conditions
},
},
}
search_from = 0
total = np.iinfo(np.int16).max
while search_from < total:
res = _get_result(
index="2020-03",
query_body=query_body,
search_from=search_from,
search_size=size,
)
total = res['hits']['total']['value']
logger.info(f"total: {total}")
search_from += len(res['hits']['hits'])
logger.info("result count: {}", len(res["hits"]["hits"]))
for hit in res['hits']['hits']:
yield load_message(hit["_source"])
遍历函数的返回结果,可以获取符合条件的所有文档。
下面的代码将返回的 message 对象放入 pandas.DataFrame
对象中并排序。
import pandas as pd
results = get_production_messages(
system=system,
start_time=start_time
)
df = pd.DataFrame(columns=["time"])
for result in results:
hours = get_hour(result)
message_time = result.time.ceil("S")
current_df = pd.DataFrame({"time": [message_time]}, columns=["time"], index=[hours])
df = df.append(current_df)
logger.info("Get {} results", len(df))
df = df.sort_index()
print(df)
print(f"Latest time: {df.time.max()}")
参考
本文的示例代码来自 nwpc-oper/nwpc-message-tool 项目。