
How to use the Bulk API to store keywords in ES using Python

codestyles 2021. 1. 9. 09:57



I have to store some messages in ElasticSearch, integrated with my Python program. Right now, what I do to store a message is:

d={"message":"this is message"}
    for index_nr in range(1,5):
        ElasticSearchAPI.addToIndex(index_nr, d)
        print d

That means if I have 10 messages, I have to repeat my code 10 times. So what I want to do is write a script file or batch file. I checked the ElasticSearch Guide, and the BULK API can be used. The format should be something like this:

{ "index" : { "_index" : "test", "_type" : "type1", "_id" : "1" } }
{ "field1" : "value1" }
{ "delete" : { "_index" : "test", "_type" : "type1", "_id" : "2" } }
{ "create" : { "_index" : "test", "_type" : "type1", "_id" : "3" } }
{ "field1" : "value3" }
{ "update" : {"_id" : "1", "_type" : "type1", "_index" : "index1"} }
{ "doc" : {"field2" : "value2"} }

What I did is:

{"index":{"_index":"test1","_type":"message","_id":"1"}}
{"message":"it is red"}
{"index":{"_index":"test2","_type":"message","_id":"2"}}
{"message":"it is green"}

I also use the curl tool to store the docs:

$ curl -s -XPOST localhost:9200/_bulk --data-binary @message.json

Now I want to store the file to Elasticsearch using Python code.


from datetime import datetime

from elasticsearch import Elasticsearch
from elasticsearch import helpers

es = Elasticsearch()

actions = [
  {
    "_index": "tickets-index",
    "_type": "tickets",
    "_id": j,
    "_source": {
        "any":"data" + str(j),
        "timestamp": datetime.now()}
  }
  for j in range(0, 10)
]

helpers.bulk(es, actions)
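
If you want to inspect the result, helpers.bulk() returns a (successes, errors) tuple. A minimal sketch, passing raise_on_error=False here so failures are collected instead of raised:

# With raise_on_error=False, errors are returned as a list instead of raising BulkIndexError.
success_count, errors = helpers.bulk(es, actions, raise_on_error=False)
print("Indexed %d documents, %d failures" % (success_count, len(errors)))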

@justinachen's code helped me get started with py-elasticsearch, but after looking through the source code a simple improvement can be made:

from datetime import datetime
from elasticsearch import Elasticsearch, helpers

es = Elasticsearch()
j = 0
actions = []
while (j <= 10):
    action = {
        "_index": "tickets-index",
        "_type": "tickets",
        "_id": j,
        "_source": {
            "any":"data" + str(j),
            "timestamp": datetime.now()
            }
        }
    actions.append(action)
    j += 1

helpers.bulk(es, actions)

helpers.bulk() already does the segmentation for you. And by segmentation I mean the chunks sent to the server each time. If you want to reduce the size of the sent chunks, do: helpers.bulk(es, actions, chunk_size=100)

Some handy info to get started:

helpers.bulk() is just a wrapper around helpers.streaming_bulk, but the former accepts a list, which makes it handy.

helpers.streaming_bulk is based on Elasticsearch.bulk(), so you do not need to worry about which one to choose.

So in most cases, helpers.bulk() should be all you need.
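
For reference, a minimal streaming_bulk sketch; the generate_actions() generator below is hypothetical, and any iterable of action dicts works. streaming_bulk yields one (ok, item) tuple per action as chunks are sent, so you can react to individual failures without buffering everything:

from elasticsearch import Elasticsearch, helpers

es = Elasticsearch()

def generate_actions():
    # Hypothetical action generator; replace with your own documents.
    for j in range(10):
        yield {
            "_index": "tickets-index",
            "_type": "tickets",
            "_id": j,
            "_source": {"any": "data" + str(j)},
        }

# One (ok, item) tuple is yielded per action as each chunk is flushed.
for ok, item in helpers.streaming_bulk(es, generate_actions(), chunk_size=100):
    if not ok:
        print("Failed to index:", item)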


(The other approaches mentioned in this thread build a Python list for the ES update, which is not a good solution today, especially when you need to add millions of documents to ES.)

A better approach is to use Python generators -- process gigabytes of data without running out of memory or compromising much on speed.

Below is an example snippet from a practical use case - adding data from an nginx log file to ES for analysis.

def decode_nginx_log(_nginx_fd):
    for each_line in _nginx_fd:
        # Filter out the below from each log line
        remote_addr = ...
        timestamp   = ...
        ...

        # Index for elasticsearch. Typically timestamp.
        idx = ...

        es_fields_keys = ('remote_addr', 'timestamp', 'url', 'status')
        es_fields_vals = (remote_addr, timestamp, url, status)

        # We return a dict holding values from each line
        es_nginx_d = dict(zip(es_fields_keys, es_fields_vals))

        # Return the row on each iteration
        yield idx, es_nginx_d   # <- Note the usage of 'yield'

def es_add_bulk(nginx_file):
    # The nginx file can be gzip or just text. Open it appropriately.
    ...

    es = Elasticsearch(hosts = [{'host': 'localhost', 'port': 9200}])

    # NOTE the (...) round brackets. This is for a generator.
    k = ({
            "_index": "nginx",
            "_type" : "logs",
            "_id"   : idx,
            "_source": es_nginx_d,
         } for idx, es_nginx_d in decode_nginx_log(_nginx_fd))

    helpers.bulk(es, k)

# Now, just run it.
es_add_bulk('./nginx.1.log.gz')

This skeleton demonstrates the usage of generators. You can use this even on a bare machine if you need to. And you can go on expanding on this to tailor to your needs quickly.
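
As one way to fill in the "open it appropriately" step above, here is a minimal sketch; the .gz suffix check is an assumption, so adapt it to your log naming:

import gzip

def open_nginx_log(path):
    # Assumption: gzip-compressed logs end with ".gz"; everything else is plain text.
    if path.endswith('.gz'):
        return gzip.open(path, 'rt')
    return open(path, 'r')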

Python Elasticsearch reference here.


There are two options which I can think of at the moment:

1. Define index name and document type with each entity:

es_client = Elasticsearch()

body = []
for entry in entries:
    body.append({'index': {'_index': index, '_type': 'doc', '_id': entry['id']}})
    body.append(entry)

response = es_client.bulk(body=body)

2. Provide the default index and document type with the method:

es_client = Elasticsearch()

body = []
for entry in entries:
    body.append({'index': {'_id': entry['id']}})
    body.append(entry)

response = es_client.bulk(index='my_index', doc_type='doc', body=body)
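
In both cases the return value is the raw Bulk API response. A minimal sketch for checking it, reusing the response variable from above:

# "errors" is True if any item failed; per-item status lives under "items".
if response.get('errors'):
    for item in response['items']:
        op = item.get('index', {})
        if op.get('status', 200) >= 300:
            print('Failed:', op.get('_id'), op.get('error'))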

Works with:

ES version: 6.4.0

ES python lib: 6.3.1

Reference URL: https://stackoverflow.com/questions/20288770/how-to-use-bulk-api-to-store-the-keywords-in-es-by-using-python
