You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
151 lines
5.1 KiB
151 lines
5.1 KiB
from six import iteritems
|
|
from itertools import chain
|
|
|
|
from .utils import DslBase
|
|
from .field import InnerObject, String
|
|
from .connections import connections
|
|
from .exceptions import IllegalOperation
|
|
|
|
META_FIELDS = frozenset((
|
|
'dynamic', 'transform', 'dynamic_date_formats', 'date_detection',
|
|
'numeric_detection', 'dynamic_templates'
|
|
))
|
|
|
|
class Properties(InnerObject, DslBase):
|
|
def __init__(self, name):
|
|
self._name = name
|
|
super(Properties, self).__init__()
|
|
|
|
def __repr__(self):
|
|
return 'Properties(%r)' % self._name
|
|
|
|
@property
|
|
def name(self):
|
|
return self._name
|
|
|
|
|
|
class Mapping(object):
|
|
def __init__(self, name):
|
|
self.properties = Properties(name)
|
|
self._meta = {}
|
|
|
|
def __repr__(self):
|
|
return 'Mapping(%r)' % self.doc_type
|
|
|
|
@classmethod
|
|
def from_es(cls, index, doc_type, using='default'):
|
|
m = cls(doc_type)
|
|
m.update_from_es(index, using)
|
|
return m
|
|
|
|
def _collect_analysis(self):
|
|
analysis = {}
|
|
fields = []
|
|
if '_all' in self._meta:
|
|
fields.append(String(**self._meta['_all']))
|
|
|
|
for f in chain(fields, self.properties._collect_fields()):
|
|
for analyzer_name in ('analyzer', 'index_analyzer', 'search_analyzer'):
|
|
if not hasattr(f, analyzer_name):
|
|
continue
|
|
analyzer = getattr(f, analyzer_name)
|
|
d = analyzer.get_analysis_definition()
|
|
# empty custom analyzer, probably already defined out of our control
|
|
if not d:
|
|
continue
|
|
|
|
# merge the defintion
|
|
# TODO: conflict detection/resolution
|
|
for key in d:
|
|
analysis.setdefault(key, {}).update(d[key])
|
|
|
|
return analysis
|
|
|
|
def save(self, index, using='default'):
|
|
# TODO: replace with creating an Index instance to avoid duplication
|
|
es = connections.get_connection(using)
|
|
if not es.indices.exists(index=index):
|
|
es.indices.create(index=index, body={'mappings': self.to_dict(), 'settings': {'analysis': self._collect_analysis()}})
|
|
else:
|
|
analysis = self._collect_analysis()
|
|
if analysis:
|
|
if es.cluster.state(index=index, metric='metadata')['metadata']['indices'][index]['state'] != 'close':
|
|
# TODO: check if the analysis config is already there
|
|
raise IllegalOperation(
|
|
'You cannot update analysis configuration on an open index, you need to close index %s first.' % index)
|
|
es.indices.put_settings(index=index, body={'analysis': analysis})
|
|
es.indices.put_mapping(index=index, doc_type=self.doc_type, body=self.to_dict())
|
|
|
|
def update_from_es(self, index, using='default'):
|
|
es = connections.get_connection(using)
|
|
raw = es.indices.get_mapping(index=index, doc_type=self.doc_type)
|
|
_, raw = raw.popitem()
|
|
raw = raw['mappings'][self.doc_type]
|
|
|
|
for name, definition in iteritems(raw['properties']):
|
|
self.field(name, definition)
|
|
|
|
# metadata like _all etc
|
|
for name, value in iteritems(raw):
|
|
if name != 'properties':
|
|
if isinstance(value, dict):
|
|
self.meta(name, **value)
|
|
else:
|
|
self.meta(name, value)
|
|
|
|
def update(self, mapping, update_only=False):
|
|
for name in mapping:
|
|
if update_only and name in self:
|
|
# nested and inner objects, merge recursively
|
|
if hasattr(self[name], 'update'):
|
|
self[name].update(mapping[name])
|
|
continue
|
|
self.field(name, mapping[name])
|
|
|
|
if update_only:
|
|
for name in mapping._meta:
|
|
if name not in self._meta:
|
|
self._meta[name] = mapping._meta[name]
|
|
else:
|
|
self._meta.update(mapping._meta)
|
|
|
|
def __contains__(self, name):
|
|
return name in self.properties.properties
|
|
|
|
def __getitem__(self, name):
|
|
return self.properties.properties[name]
|
|
|
|
def __iter__(self):
|
|
return iter(self.properties.properties)
|
|
|
|
@property
|
|
def doc_type(self):
|
|
return self.properties.name
|
|
|
|
def field(self, *args, **kwargs):
|
|
self.properties.field(*args, **kwargs)
|
|
return self
|
|
|
|
def meta(self, name, params=None, **kwargs):
|
|
if not name.startswith('_') and name not in META_FIELDS:
|
|
name = '_' + name
|
|
|
|
if params and kwargs:
|
|
raise ValueError('Meta configs cannot have both value and a dictionary.')
|
|
|
|
self._meta[name] = kwargs if params is None else params
|
|
return self
|
|
|
|
def to_dict(self):
|
|
d = self.properties.to_dict()
|
|
meta = self._meta
|
|
|
|
# hard coded serialization of analyzers in _all
|
|
if '_all' in meta:
|
|
meta = meta.copy()
|
|
_all = meta['_all'] = meta['_all'].copy()
|
|
for f in ('analyzer', 'search_analyzer', 'index_analyzer'):
|
|
if hasattr(_all.get(f, None), 'to_dict'):
|
|
_all[f] = _all[f].to_dict()
|
|
d[self.doc_type].update(meta)
|
|
return d
|
|
|