How To Use elasticsearch.helpers.streaming_bulk
Can someone advise how to use the function elasticsearch.helpers.streaming_bulk instead of elasticsearch.helpers.bulk for indexing data into Elasticsearch? If I simply change streaming_b
Solution 1:
streaming_bulk returns an iterator, which means nothing will happen until you start iterating over it. The code for the bulk function looks like this:
def bulk(client, actions, stats_only=False, **kwargs):
    success, failed = 0, 0
    # list of errors to be collected if not stats_only
    errors = []
    for ok, item in streaming_bulk(client, actions, **kwargs):
        # go through request-response pairs and detect failures
        if not ok:
            if not stats_only:
                errors.append(item)
            failed += 1
        else:
            success += 1
    return success, failed if stats_only else errors
So just calling streaming_bulk(client, actions, **kwargs) won't actually do anything. Indexing only starts once you iterate over it, as the for loop above does.
So in your code you are welcome to change bulk to streaming_bulk, but you need to iterate over the results of streaming_bulk in order to actually have anything indexed.
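The "nothing happens until you iterate" part is plain Python generator behavior, so you can see it without a cluster. A minimal sketch, assuming a made-up stand-in called fake_streaming_bulk (it is not the library function, it only mimics the shape of what gets yielded):

```python
def fake_streaming_bulk(actions, sent):
    """Hypothetical stand-in for streaming_bulk: a generator that 'sends' lazily."""
    for action in actions:
        sent.append(action)  # pretend this is the request going to the cluster
        yield True, {"index": {"_id": action["_id"], "status": 201}}


sent = []
actions = [{"_id": 1}, {"_id": 2}, {"_id": 3}]

results = fake_streaming_bulk(actions, sent)
sent_before = len(sent)   # still 0: the generator body has not started running

outcomes = list(results)  # iterating is what drives the generator
sent_after = len(sent)    # now every action has been "sent"
```

Calling the function only builds the generator object. The work happens inside list(...) or a for loop, which is exactly what bulk does for you.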
Solution 2:
streaming_bulk consumes an iterator of actions and yields a response for every action.
So you would first need to write a simple iterator over your documents like this:
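import csv

def document_stream(file_to_index):
    # newline="" is what the csv module expects for files in Python 3
    with open(file_to_index, newline="") as csvfile:
        for row in csv.reader(csvfile):
            # index_name, type_name and transform_row are defined elsewhere
            yield {"_index": index_name,
                   # mapping types are deprecated since Elasticsearch 7; omit on newer clusters
                   "_type": type_name,
                   "_source": transform_row(row)
                   }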
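To check what the generator produces before pointing it at a cluster, you can feed it an in-memory file. This is only a sketch: INDEX_NAME, the two-column CSV layout and this transform_row are assumptions made up for the example, and this variant takes an open file object instead of a path.

```python
import csv
import io

INDEX_NAME = "people"  # hypothetical index name


def transform_row(row):
    """Hypothetical transform: map the CSV columns to document fields."""
    name, age = row
    return {"name": name, "age": int(age)}


def document_stream(csvfile):
    # Yield one bulk action per CSV row, without loading the whole file.
    for row in csv.reader(csvfile):
        yield {"_index": INDEX_NAME, "_source": transform_row(row)}


sample = io.StringIO("alice,30\nbob,25\n")
actions = list(document_stream(sample))
```

Each element of actions is a dict that could be handed to streaming_bulk as-is. In real use you would not call list() on it, since the point is to stream.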
And then do the streaming bulk insert:
from elasticsearch.helpers import streaming_bulk

stream = document_stream(file_to_index)
for ok, response in streaming_bulk(es, actions=stream):
    if not ok:
        # failure inserting
        print(response)
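One reason to prefer streaming_bulk over bulk is that you see each result as it arrives, so you can report progress on a long import. A rough sketch of that idea: drain is a hypothetical helper, and the sample pairs are made up to look like what streaming_bulk yields so the snippet runs without a cluster.

```python
def drain(results, every=2, report=print):
    """Consume (ok, item) pairs, reporting progress every `every` items."""
    done, failures = 0, []
    for ok, item in results:
        done += 1
        if not ok:
            failures.append(item)
        if done % every == 0:
            report(f"{done} actions processed, {len(failures)} failed")
    return done, failures


# Made-up pairs shaped like streaming_bulk output.
sample = [
    (True, {"index": {"_id": "1", "status": 201}}),
    (False, {"index": {"_id": "2", "status": 400}}),
    (True, {"index": {"_id": "3", "status": 201}}),
    (True, {"index": {"_id": "4", "status": 201}}),
]
messages = []
done, failures = drain(iter(sample), every=2, report=messages.append)
```

With a real client you would pass something like streaming_bulk(es, stream, raise_on_error=False) as results. As far as I know, with the default raise_on_error=True a failed action raises BulkIndexError instead of coming back as ok == False, so the failure branch is only reached when that flag is turned off.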