You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
176 lines
5.8 KiB
176 lines
5.8 KiB
from .utils import DslBase, _make_dsl_class
|
|
|
|
__all__ = [
|
|
'A', 'Agg', 'Filter', 'Bucket', 'Children', 'DateHistogram', 'Filters',
|
|
'GeoDistance', 'GeohashGrid', 'Global', 'Histogram', 'Iprange', 'Missing',
|
|
'Nested', 'Range', 'ReverseNested', 'SignificantTerms', 'Terms', 'Avg',
|
|
'Cardinality', 'ExtendedStats', 'GeoBounds', 'Max', 'Min', 'Percentiles',
|
|
'PercenileRanks', 'ScriptedMetric', 'Stats', 'Sum', 'TopHits', 'ValueCount',
|
|
'AvgBucket', 'BucketScript', 'BucketSelector', 'CumulativeSum',
|
|
'Derivative', 'ExtendedStatsBucket', 'MaxBucket', 'MinBucket', 'MovingAvg',
|
|
'PercentilesBucket', 'SerialDiff', 'StatsBucket', 'SumBucket'
|
|
]
|
|
|
|
|
|
def A(name_or_agg, filter=None, **params):
|
|
if filter is not None:
|
|
if name_or_agg != 'filter':
|
|
raise ValueError("Aggregation %r doesn't accept positional argument 'filter'." % name_or_agg)
|
|
params['filter'] = filter
|
|
|
|
# {"terms": {"field": "tags"}, "aggs": {...}}
|
|
if isinstance(name_or_agg, dict):
|
|
if params:
|
|
raise ValueError('A() cannot accept parameters when passing in a dict.')
|
|
# copy to avoid modifying in-place
|
|
agg = name_or_agg.copy()
|
|
# pop out nested aggs
|
|
aggs = agg.pop('aggs', None)
|
|
# should be {"terms": {"fied": "tags"}}
|
|
if len(agg) != 1:
|
|
raise ValueError('A() can only accept dict with an aggregation ({"terms": {...}}). '
|
|
'Instead it got (%r)' % name_or_agg)
|
|
agg_type, params = agg.popitem()
|
|
if aggs:
|
|
params = params.copy()
|
|
params['aggs'] = aggs
|
|
return Agg.get_dsl_class(agg_type)(**params)
|
|
|
|
# Terms(...) just return the nested agg
|
|
elif isinstance(name_or_agg, Agg):
|
|
if params:
|
|
raise ValueError('A() cannot accept parameters when passing in an Agg object.')
|
|
return name_or_agg
|
|
|
|
# "terms", field="tags"
|
|
return Agg.get_dsl_class(name_or_agg)(**params)
|
|
|
|
class Agg(DslBase):
|
|
_type_name = 'agg'
|
|
_type_shortcut = staticmethod(A)
|
|
name = None
|
|
|
|
class AggBase(object):
|
|
_param_defs = {
|
|
'aggs': {'type': 'agg', 'hash': True},
|
|
}
|
|
def __getitem__(self, agg_name):
|
|
agg = self._params.setdefault('aggs', {})[agg_name] # propagate KeyError
|
|
|
|
# make sure we're not mutating a shared state - whenever accessing a
|
|
# bucket, return a shallow copy of it to be safe
|
|
if isinstance(agg, Bucket):
|
|
agg = A(agg.name, **agg._params)
|
|
# be sure to store the copy so any modifications to it will affect us
|
|
self._params['aggs'][agg_name] = agg
|
|
|
|
return agg
|
|
|
|
def __setitem__(self, agg_name, agg):
|
|
self.aggs[agg_name] = A(agg)
|
|
|
|
def _agg(self, bucket, name, agg_type, *args, **params):
|
|
agg = self[name] = A(agg_type, *args, **params)
|
|
|
|
# For chaining - when creating new buckets return them...
|
|
if bucket:
|
|
return agg
|
|
# otherwise return self._base so we can keep chaining
|
|
else:
|
|
return self._base
|
|
|
|
def metric(self, name, agg_type, *args, **params):
|
|
return self._agg(False, name, agg_type, *args, **params)
|
|
|
|
def bucket(self, name, agg_type, *args, **params):
|
|
return self._agg(True, name, agg_type, *args, **params)
|
|
|
|
def pipeline(self, name, agg_type, *args, **params):
|
|
return self._agg(False, name, agg_type, *args, **params)
|
|
|
|
|
|
class Bucket(AggBase, Agg):
|
|
def __init__(self, **params):
|
|
super(Bucket, self).__init__(**params)
|
|
# remember self for chaining
|
|
self._base = self
|
|
|
|
def to_dict(self):
|
|
d = super(AggBase, self).to_dict()
|
|
if 'aggs' in d[self.name]:
|
|
d['aggs'] = d[self.name].pop('aggs')
|
|
return d
|
|
|
|
class Filter(Bucket):
|
|
name = 'filter'
|
|
_param_defs = {
|
|
'filter': {'type': 'query'},
|
|
'aggs': {'type': 'agg', 'hash': True},
|
|
}
|
|
|
|
def __init__(self, filter=None, **params):
|
|
if filter is not None:
|
|
params['filter'] = filter
|
|
super(Filter, self).__init__(**params)
|
|
|
|
def to_dict(self):
|
|
d = super(Filter, self).to_dict()
|
|
d[self.name].update(d[self.name].pop('filter', {}))
|
|
return d
|
|
|
|
class Pipeline(Agg):
|
|
pass
|
|
|
|
AGGS = (
|
|
(Bucket, 'children', None),
|
|
(Bucket, 'date_histogram', None),
|
|
(Bucket, 'date_range', None),
|
|
(Bucket, 'filters', {'filters': {'type': 'query', 'hash': True}}),
|
|
(Bucket, 'geo_distance', None),
|
|
(Bucket, 'geohash_grid', None),
|
|
(Bucket, 'global', None),
|
|
(Bucket, 'histogram', None),
|
|
(Bucket, 'iprange', None),
|
|
(Bucket, 'missing', None),
|
|
(Bucket, 'nested', None),
|
|
(Bucket, 'range', None),
|
|
(Bucket, 'reverse_nested', None),
|
|
(Bucket, 'significant_terms', None),
|
|
(Bucket, 'terms', None),
|
|
|
|
(Agg, 'avg', None),
|
|
(Agg, 'cardinality', None),
|
|
(Agg, 'extended_stats', None),
|
|
(Agg, 'geo_bounds', None),
|
|
(Agg, 'max', None),
|
|
(Agg, 'min', None),
|
|
(Agg, 'percentiles', None),
|
|
(Agg, 'percentile_ranks', None),
|
|
(Agg, 'scripted_metric', None),
|
|
(Agg, 'stats', None),
|
|
(Agg, 'sum', None),
|
|
(Agg, 'top_hits', None),
|
|
(Agg, 'value_count', None),
|
|
|
|
(Pipeline, 'avg_bucket', None),
|
|
(Pipeline, 'bucket_script', None),
|
|
(Pipeline, 'bucket_selector', None),
|
|
(Pipeline, 'cumulative_sum', None),
|
|
(Pipeline, 'derivative', None),
|
|
(Pipeline, 'extended_stats_bucket', None),
|
|
(Pipeline, 'max_bucket', None),
|
|
(Pipeline, 'min_bucket', None),
|
|
(Pipeline, 'moving_avg', None),
|
|
(Pipeline, 'percentiles_bucket', None),
|
|
(Pipeline, 'serial_diff', None),
|
|
(Pipeline, 'stats_bucket', None),
|
|
(Pipeline, 'sum_bucket', None),
|
|
)
|
|
|
|
# generate the aggregation classes dynamicaly
|
|
for base, fname, params_def in AGGS:
|
|
# don't override the params def from AggBase
|
|
if params_def:
|
|
params_def.update(AggBase._param_defs)
|
|
fclass = _make_dsl_class(base, fname, params_def)
|
|
globals()[fclass.__name__] = fclass
|
|
|