from __future__ import unicode_literals

import logging
from operator import methodcaller

from ..exceptions import ElasticsearchException, TransportError
from ..compat import map, string_types

logger = logging.getLogger('elasticsearch.helpers')

class BulkIndexError(ElasticsearchException):
    @property
    def errors(self):
        """ List of errors from execution of the last chunk. """
        return self.args[1]


class ScanError(ElasticsearchException):
    pass

def expand_action(data):
"""
|
|
From one document or action definition passed in by the user extract the
|
|
action/data lines needed for elasticsearch's
|
|
:meth:`~elasticsearch.Elasticsearch.bulk` api.
|
|
"""
|
|
# when given a string, assume user wants to index raw json
|
|
if isinstance(data, string_types):
|
|
return '{"index":{}}', data
|
|
|
|
# make sure we don't alter the action
|
|
data = data.copy()
|
|
op_type = data.pop('_op_type', 'index')
|
|
action = {op_type: {}}
|
|
for key in ('_index', '_parent', '_percolate', '_routing', '_timestamp',
|
|
'_ttl', '_type', '_version', '_version_type', '_id', '_retry_on_conflict'):
|
|
if key in data:
|
|
action[op_type][key] = data.pop(key)
|
|
|
|
# no data payload for delete
|
|
if op_type == 'delete':
|
|
return action, None
|
|
|
|
return action, data.get('_source', data)
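
# A rough sketch of what ``expand_action`` produces (the field names below are
# illustrative, not an exhaustive list of supported metadata keys):
#
#   expand_action({'_op_type': 'index', '_id': 42, 'title': 'Python'})
#   # -> ({'index': {'_id': 42}}, {'title': 'Python'})
#
#   expand_action({'_op_type': 'delete', '_id': 42})
#   # -> ({'delete': {'_id': 42}}, None)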

def _chunk_actions(actions, chunk_size, max_chunk_bytes, serializer):
    """
    Split actions into chunks by number or size, serializing them into strings
    in the process.
    """
    bulk_actions = []
    size, action_count = 0, 0
    for action, data in actions:
        action = serializer.dumps(action)
        cur_size = len(action) + 1

        if data is not None:
            data = serializer.dumps(data)
            cur_size += len(data) + 1

        # full chunk, send it and start a new one
        if bulk_actions and (size + cur_size > max_chunk_bytes or action_count == chunk_size):
            yield bulk_actions
            bulk_actions = []
            size, action_count = 0, 0

        bulk_actions.append(action)
        if data is not None:
            bulk_actions.append(data)
        size += cur_size
        action_count += 1

    if bulk_actions:
        yield bulk_actions
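
# Each chunk yielded above is a flat list of already-serialized lines, e.g.
# (illustratively) ['{"index": {"_id": 1}}', '{"title": "Python"}'] -- an
# action line optionally followed by its data line, repeated per action.
# _process_bulk_chunk later joins these with newlines into one bulk request body.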

def _process_bulk_chunk(client, bulk_actions, raise_on_exception=True, raise_on_error=True, **kwargs):
    """
    Send a bulk request to elasticsearch and process the output.
    """
    # if raise on error is set, we need to collect errors per chunk before raising them
    errors = []

    try:
        # send the actual request
        resp = client.bulk('\n'.join(bulk_actions) + '\n', **kwargs)
    except TransportError as e:
        # default behavior - just propagate exception
        if raise_on_exception:
            raise e

        # if we are not propagating, mark all actions in current chunk as failed
        err_message = str(e)
        exc_errors = []

        # deserialize the data back, this is expensive but only run on
        # errors if raise_on_exception is false, so shouldn't be a real
        # issue
        bulk_data = map(client.transport.serializer.loads, bulk_actions)
        while True:
            try:
                # collect all the information about failed actions
                action = next(bulk_data)
                op_type, action = action.popitem()
                info = {"error": err_message, "status": e.status_code, "exception": e}
                if op_type != 'delete':
                    info['data'] = next(bulk_data)
                info.update(action)
                exc_errors.append({op_type: info})
            except StopIteration:
                break

        # emulate standard behavior for failed actions
        if raise_on_error:
            raise BulkIndexError('%i document(s) failed to index.' % len(exc_errors), exc_errors)
        else:
            for err in exc_errors:
                yield False, err
            return

    # go through request-response pairs and detect failures
    for op_type, item in map(methodcaller('popitem'), resp['items']):
        ok = 200 <= item.get('status', 500) < 300
        if not ok and raise_on_error:
            errors.append({op_type: item})

        if ok or not errors:
            # if we are not just recording all errors to be able to raise
            # them all at once, yield items individually
            yield ok, {op_type: item}

    if errors:
        raise BulkIndexError('%i document(s) failed to index.' % len(errors), errors)

def streaming_bulk(client, actions, chunk_size=500, max_chunk_bytes=100 * 1024 * 1024,
        raise_on_error=True, expand_action_callback=expand_action,
        raise_on_exception=True, **kwargs):
"""
|
|
Streaming bulk consumes actions from the iterable passed in and yields
|
|
results per action. For non-streaming usecases use
|
|
:func:`~elasticsearch.helpers.bulk` which is a wrapper around streaming
|
|
bulk that returns summary information about the bulk operation once the
|
|
entire input is consumed and sent.
|
|
|
|
|
|
:arg client: instance of :class:`~elasticsearch.Elasticsearch` to use
|
|
:arg actions: iterable containing the actions to be executed
|
|
:arg chunk_size: number of docs in one chunk sent to es (default: 500)
|
|
:arg max_chunk_bytes: the maximum size of the request in bytes (default: 100MB)
|
|
:arg raise_on_error: raise ``BulkIndexError`` containing errors (as `.errors`)
|
|
from the execution of the last chunk when some occur. By default we raise.
|
|
:arg raise_on_exception: if ``False`` then don't propagate exceptions from
|
|
call to ``bulk`` and just report the items that failed as failed.
|
|
:arg expand_action_callback: callback executed on each action passed in,
|
|
should return a tuple containing the action line and the data line
|
|
(`None` if data line should be omitted).
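
    Example usage (a minimal sketch; ``es`` is assumed to be an
    :class:`~elasticsearch.Elasticsearch` client and ``generate_actions()`` a
    hypothetical generator of action dicts)::

        for ok, result in streaming_bulk(es, generate_actions()):
            if not ok:
                # the action failed; ``result`` holds the per-action response
                print(result)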
    """
    actions = map(expand_action_callback, actions)

    for bulk_actions in _chunk_actions(actions, chunk_size, max_chunk_bytes, client.transport.serializer):
        for result in _process_bulk_chunk(client, bulk_actions, raise_on_exception, raise_on_error, **kwargs):
            yield result

def bulk(client, actions, stats_only=False, **kwargs):
"""
|
|
Helper for the :meth:`~elasticsearch.Elasticsearch.bulk` api that provides
|
|
a more human friendly interface - it consumes an iterator of actions and
|
|
sends them to elasticsearch in chunks. It returns a tuple with summary
|
|
information - number of successfully executed actions and either list of
|
|
errors or number of errors if `stats_only` is set to `True`.
|
|
|
|
See :func:`~elasticsearch.helpers.streaming_bulk` for more accepted
|
|
parameters
|
|
|
|
:arg client: instance of :class:`~elasticsearch.Elasticsearch` to use
|
|
:arg actions: iterator containing the actions
|
|
:arg stats_only: if `True` only report number of successful/failed
|
|
operations instead of just number of successful and a list of error responses
|
|
|
|
Any additional keyword arguments will be passed to
|
|
:func:`~elasticsearch.helpers.streaming_bulk` which is used to execute
|
|
the operation.
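
    For example (an illustrative sketch; ``es`` is assumed to be an
    :class:`~elasticsearch.Elasticsearch` client and ``docs`` an iterable of
    action dicts)::

        success, errors = bulk(es, docs)
        # success -> number of successfully executed actions
        # errors  -> list of error responses (or a count if stats_only=True)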
    """
    success, failed = 0, 0

    # list of errors to be collected if not stats_only
    errors = []

    for ok, item in streaming_bulk(client, actions, **kwargs):
        # go through request-response pairs and detect failures
        if not ok:
            if not stats_only:
                errors.append(item)
            failed += 1
        else:
            success += 1

    return success, failed if stats_only else errors

def parallel_bulk(client, actions, thread_count=4, chunk_size=500,
        max_chunk_bytes=100 * 1024 * 1024,
        expand_action_callback=expand_action, **kwargs):
    """
    Parallel version of the bulk helper that runs requests in multiple threads
    at once.

    :arg client: instance of :class:`~elasticsearch.Elasticsearch` to use
    :arg actions: iterator containing the actions
    :arg thread_count: size of the threadpool to use for the bulk requests
    :arg chunk_size: number of docs in one chunk sent to es (default: 500)
    :arg max_chunk_bytes: the maximum size of the request in bytes (default: 100MB)
    :arg raise_on_error: raise ``BulkIndexError`` containing errors (as `.errors`)
        from the execution of the last chunk when some occur. By default we raise.
    :arg raise_on_exception: if ``False`` then don't propagate exceptions from
        call to ``bulk`` and just report the items that failed as failed.
    :arg expand_action_callback: callback executed on each action passed in,
        should return a tuple containing the action line and the data line
        (`None` if data line should be omitted).
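
    Example usage (a minimal sketch; ``es`` and ``docs`` are assumed to be an
    :class:`~elasticsearch.Elasticsearch` client and an iterable of action
    dicts). Note that ``parallel_bulk`` is a generator and has to be consumed
    for any requests to be sent::

        for ok, result in parallel_bulk(es, docs, thread_count=8):
            if not ok:
                print(result)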
    """
    # Avoid importing multiprocessing unless parallel_bulk is used
    # to avoid exceptions on restricted environments like App Engine
    from multiprocessing.dummy import Pool
    actions = map(expand_action_callback, actions)

    pool = Pool(thread_count)

    try:
        for result in pool.imap(
            lambda chunk: list(_process_bulk_chunk(client, chunk, **kwargs)),
            _chunk_actions(actions, chunk_size, max_chunk_bytes, client.transport.serializer)
        ):
            for item in result:
                yield item

    finally:
        pool.close()
        pool.join()

def scan(client, query=None, scroll='5m', raise_on_error=True, preserve_order=False, **kwargs):
"""
|
|
Simple abstraction on top of the
|
|
:meth:`~elasticsearch.Elasticsearch.scroll` api - a simple iterator that
|
|
yields all hits as returned by underlining scroll requests.
|
|
|
|
By default scan does not return results in any pre-determined order. To
|
|
have a standard order in the returned documents (either by score or
|
|
explicit sort definition) when scrolling, use ``preserve_order=True``. This
|
|
may be an expensive operation and will negate the performance benefits of
|
|
using ``scan``.
|
|
|
|
:arg client: instance of :class:`~elasticsearch.Elasticsearch` to use
|
|
:arg query: body for the :meth:`~elasticsearch.Elasticsearch.search` api
|
|
:arg scroll: Specify how long a consistent view of the index should be
|
|
maintained for scrolled search
|
|
:arg raise_on_error: raises an exception (``ScanError``) if an error is
|
|
encountered (some shards fail to execute). By default we raise.
|
|
:arg preserve_order: don't set the ``search_type`` to ``scan`` - this will
|
|
cause the scroll to paginate with preserving the order. Note that this
|
|
can be an extremely expensive operation and can easily lead to
|
|
unpredictable results, use with caution.
|
|
|
|
Any additional keyword arguments will be passed to the initial
|
|
:meth:`~elasticsearch.Elasticsearch.search` call::
|
|
|
|
scan(es,
|
|
query={"query": {"match": {"title": "python"}}},
|
|
index="orders-*",
|
|
doc_type="books"
|
|
)
|
|
|
|
"""
    if not preserve_order:
        kwargs['search_type'] = 'scan'
    # initial search
    resp = client.search(body=query, scroll=scroll, **kwargs)

    scroll_id = resp.get('_scroll_id')
    if scroll_id is None:
        return

    first_run = True
    while True:
        # if we didn't set search_type to scan, the initial search already contains data
        if preserve_order and first_run:
            first_run = False
        else:
            resp = client.scroll(scroll_id, scroll=scroll)

        for hit in resp['hits']['hits']:
            yield hit

        # check if we have any errors
        if resp["_shards"]["failed"]:
            logger.warning(
                'Scroll request has failed on %d shards out of %d.',
                resp['_shards']['failed'], resp['_shards']['total']
            )
            if raise_on_error:
                raise ScanError(
                    'Scroll request has failed on %d shards out of %d.' %
                    (resp['_shards']['failed'], resp['_shards']['total'])
                )

        scroll_id = resp.get('_scroll_id')
        # end of scroll
        if scroll_id is None or not resp['hits']['hits']:
            break

def reindex(client, source_index, target_index, query=None, target_client=None,
        chunk_size=500, scroll='5m', scan_kwargs={}, bulk_kwargs={}):
"""
|
|
Reindex all documents from one index that satisfy a given query
|
|
to another, potentially (if `target_client` is specified) on a different cluster.
|
|
If you don't specify the query you will reindex all the documents.
|
|
|
|
.. note::
|
|
|
|
This helper doesn't transfer mappings, just the data.
|
|
|
|
:arg client: instance of :class:`~elasticsearch.Elasticsearch` to use (for
|
|
read if `target_client` is specified as well)
|
|
:arg source_index: index (or list of indices) to read documents from
|
|
:arg target_index: name of the index in the target cluster to populate
|
|
:arg query: body for the :meth:`~elasticsearch.Elasticsearch.search` api
|
|
:arg target_client: optional, is specified will be used for writing (thus
|
|
enabling reindex between clusters)
|
|
:arg chunk_size: number of docs in one chunk sent to es (default: 500)
|
|
:arg scroll: Specify how long a consistent view of the index should be
|
|
maintained for scrolled search
|
|
:arg scan_kwargs: additional kwargs to be passed to
|
|
:func:`~elasticsearch.helpers.scan`
|
|
:arg bulk_kwargs: additional kwargs to be passed to
|
|
:func:`~elasticsearch.helpers.bulk`
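
    Example usage (an illustrative sketch; the client, index names and query
    below are placeholders)::

        reindex(es, source_index="old-index", target_index="new-index",
                query={"query": {"term": {"category": "books"}}})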
    """
    target_client = client if target_client is None else target_client

    docs = scan(client,
        query=query,
        index=source_index,
        scroll=scroll,
        fields=('_source', '_parent', '_routing', '_timestamp'),
        **scan_kwargs
    )
    def _change_doc_index(hits, index):
        for h in hits:
            h['_index'] = index
            if 'fields' in h:
                h.update(h.pop('fields'))
            yield h

    kwargs = {
        'stats_only': True,
    }
    kwargs.update(bulk_kwargs)
    return bulk(target_client, _change_doc_index(docs, target_index),
        chunk_size=chunk_size, **kwargs)