如何使用大容量 API 存储关键字在 ES 使用 Python

我必须在 ElasticSearch 中存储一些与我的 python 程序集成的消息。 现在我试图存储的信息是:

d={"message":"this is message"}
for index_nr in range(1,5):
ElasticSearchAPI.addToIndex(index_nr, d)
print d

这意味着如果我有10条信息,那么我必须重复我的代码10次。 所以我想做的是尝试创建一个脚本文件或批处理文件。 我已经检查了 弹性搜寻指南,BULK API 是可以使用的。 格式大致如下:

{ "index" : { "_index" : "test", "_type" : "type1", "_id" : "1" } }
{ "field1" : "value1" }
{ "delete" : { "_index" : "test", "_type" : "type1", "_id" : "2" } }
{ "create" : { "_index" : "test", "_type" : "type1", "_id" : "3" } }
{ "field1" : "value3" }
{ "update" : {"_id" : "1", "_type" : "type1", "_index" : "index1"} }
{ "doc" : {"field2" : "value2"} }

我所做的是:

{"index":{"_index":"test1","_type":"message","_id":"1"}}
{"message":"it is red"}
{"index":{"_index":"test2","_type":"message","_id":"2"}}
{"message":"it is green"}

我也使用卷曲工具来存储文档。

$ curl -s -XPOST localhost:9200/_bulk --data-binary @message.json

现在我想使用 我的 Python 代码来存储文件到弹性搜索。

103791 次浏览
from datetime import datetime


from elasticsearch import Elasticsearch
from elasticsearch import helpers


es = Elasticsearch()


actions = [
{
"_index": "tickets-index",
"_type": "tickets",
"_id": j,
"_source": {
"any":"data" + str(j),
"timestamp": datetime.now()}
}
for j in range(0, 10)
]


helpers.bulk(es, actions)

虽然@just inachen 的代码帮助我开始使用 py-elasticsearch,但是在查看了源代码之后,我做了一个简单的改进:

es = Elasticsearch()
j = 0
actions = []
while (j <= 10):
action = {
"_index": "tickets-index",
"_type": "tickets",
"_id": j,
"_source": {
"any":"data" + str(j),
"timestamp": datetime.now()
}
}
actions.append(action)
j += 1


helpers.bulk(es, actions)

helpers.bulk()已经为您做了细分。我所说的细分是指每次发送到服务器的卡盘。如果想减少已发送文档的数据块,可以这样做: helpers.bulk(es, actions, chunk_size=100)

一些便利的信息开始:

helpers.bulk()只是 helpers.streaming_bulk的一个包装器,但是第一个接受一个列表,这使得它很方便。

helpers.streaming_bulk已经基于 Elasticsearch.bulk(),所以你不需要担心选择什么。

所以在大多数情况下,Helpers.Bulk ()应该是您所需要的全部。

(本线程中提到的其他方法在 ES 更新中使用 python 列表,这不是一个好的解决方案,特别是当您需要向 ES 添加数百万个数据时)

更好的方法是使用 蟒蛇生成器-处理千兆字节的数据,而不会超出内存或在速度上有太大的损失

下面是一个来自实际用例的示例片段——将来自 nginx 日志文件的数据添加到 ES 进行分析。

def decode_nginx_log(_nginx_fd):
for each_line in _nginx_fd:
# Filter out the below from each log line
remote_addr = ...
timestamp   = ...
...


# Index for elasticsearch. Typically timestamp.
idx = ...


es_fields_keys = ('remote_addr', 'timestamp', 'url', 'status')
es_fields_vals = (remote_addr, timestamp, url, status)


# We return a dict holding values from each line
es_nginx_d = dict(zip(es_fields_keys, es_fields_vals))


# Return the row on each iteration
yield idx, es_nginx_d   # <- Note the usage of 'yield'


def es_add_bulk(nginx_file):
# The nginx file can be gzip or just text. Open it appropriately.
...


es = Elasticsearch(hosts = [{'host': 'localhost', 'port': 9200}])


# NOTE the (...) round brackets. This is for a generator.
k = ({
"_index": "nginx",
"_type" : "logs",
"_id"   : idx,
"_source": es_nginx_d,
} for idx, es_nginx_d in decode_nginx_log(_nginx_fd))


helpers.bulk(es, k)


# Now, just run it.
es_add_bulk('./nginx.1.log.gz')

这个框架演示了生成器的使用。如果需要,您甚至可以在一台裸机上使用它。你可以继续扩展这一点,以迅速适应你的需要。

Python Elasticsearch 引用 给你

目前我能想到两个选择:

1. 用每个实体定义索引名称和文档类型:

es_client = Elasticsearch()


body = []
for entry in entries:
body.append({'index': {'_index': index, '_type': 'doc', '_id': entry['id']}})
body.append(entry)


response = es_client.bulk(body=body)

2. 提供默认索引和文档类型的方法:

es_client = Elasticsearch()


body = []
for entry in entries:
body.append({'index': {'_id': entry['id']}})
body.append(entry)


response = es_client.bulk(index='my_index', doc_type='doc', body=body)

合作伙伴:

ES 版本: 6.4.0

ES python lib: 6.3.1

我的工作代码

from elasticsearch import Elasticsearch
from elasticsearch import helpers
from elasticsearch_dsl import connections
import pandas as pd




# initialize list of lists
data = [['tom', 10, 'NY'], ['nick', 15, 'NY'], ['juli', 14, 'NY'], ['akshay', 30, 'IND'], ['Amit', 14, 'IND']]


# Create the pandas DataFrame
df = pd.DataFrame(data, columns = ['Name', 'Age', 'Country'])


from elasticsearch import Elasticsearch
from elasticsearch import helpers


es_client = connections.create_connection(hosts=['http://localhost:9200/'])
def doc_generator(df):
df_iter = df.iterrows()
for index, document in df_iter:
yield {
"_index": 'age_sample',
"_type": "_doc",
"_source": document,
}


helpers.bulk(es_client, doc_generator(df))


#get data from elastic search
from elasticsearch_dsl import Search
s = Search(index="age_sample").query("match", Name='nick')