Elasticsearch DSL 是一个高级库,其目的是帮助编写和运行针对 Elasticsearch 的查询。它构建在官方低级客户端 (elasticsearch-py) 之上。
它提供了一种更方便、更惯用的方式来编写和操作查询。它与 Elasticsearch JSON DSL 非常接近,反映了其术语和结构。它直接使用定义的类或类似查询集的表达式公开了 Python 的整个 DSL 范围。
它还提供了一个可选的包装器,用于将文档作为 Python 对象进行处理:定义映射、检索和保存文档、将文档数据包装在用户定义的类中。
要使用其他 Elasticsearch API(例如集群运行状况),只需使用底层客户端。
pip 安装elasticsearch-dsl
请参阅示例目录以查看使用elasticsearch-dsl
的一些复杂示例。
该库与2.x
以来的所有 Elasticsearch 版本兼容,但您必须使用匹配的主要版本:
对于Elasticsearch 8.0及更高版本,请使用库的主要版本 8 ( 8.xy
)。
对于Elasticsearch 7.0及更高版本,请使用库的主要版本 7 ( 7.xy
)。
对于Elasticsearch 6.0及更高版本,请使用该库的主要版本 6 ( 6.xy
)。
对于Elasticsearch 5.0及更高版本,请使用库的主要版本 5 ( 5.xy
)。
对于Elasticsearch 2.0及更高版本,请使用库的主要版本 2 ( 2.xy
)。
在 setup.py 或requirements.txt 中设置要求的推荐方法是:
# Elasticsearch 8.x elasticsearch-dsl>=8.0.0,<9.0.0 # Elasticsearch 7.x elasticsearch-dsl>=7.0.0,<8.0.0 # Elasticsearch 6.x elasticsearch-dsl>=6.0.0,<7.0.0 # Elasticsearch 5.x elasticsearch-dsl>=5.0.0,<6.0.0 # Elasticsearch 2.x elasticsearch-dsl>=2.0.0,<3.0.0
开发发生在main
上,较旧的分支仅获得错误修复版本
让我们将一个典型的搜索请求直接写为dict
:
from elasticsearch import Elasticsearch
client = Elasticsearch ( "https://localhost:9200" )
response = client . search (
index = "my-index" ,
body = {
"query" : {
"bool" : {
"must" : [{ "match" : { "title" : "python" }}],
"must_not" : [{ "match" : { "description" : "beta" }}],
"filter" : [{ "term" : { "category" : "search" }}]
}
},
"aggs" : {
"per_tag" : {
"terms" : { "field" : "tags" },
"aggs" : {
"max_lines" : { "max" : { "field" : "lines" }}
}
}
}
}
)
for hit in response [ 'hits' ][ 'hits' ]:
print ( hit [ '_score' ], hit [ '_source' ][ 'title' ])
for tag in response [ 'aggregations' ][ 'per_tag' ][ 'buckets' ]:
print ( tag [ 'key' ], tag [ 'max_lines' ][ 'value' ])
这种方法的问题在于它非常冗长,容易出现语法错误,例如不正确的嵌套,难以修改(例如添加另一个过滤器)并且编写起来绝对不有趣。
让我们使用 Python DSL 重写该示例:
from elasticsearch import Elasticsearch
from elasticsearch_dsl import Search
client = Elasticsearch ( "https://localhost:9200" )
s = Search ( using = client , index = "my-index" )
. filter ( "term" , category = "search" )
. query ( "match" , title = "python" )
. exclude ( "match" , description = "beta" )
s . aggs . bucket ( 'per_tag' , 'terms' , field = 'tags' )
. metric ( 'max_lines' , 'max' , field = 'lines' )
response = s . execute ()
for hit in response :
print ( hit . meta . score , hit . title )
for tag in response . aggregations . per_tag . buckets :
print ( tag . key , tag . max_lines . value )
如您所见,图书馆负责:
Query
对象(例如“match”)bool
查询term
查询放入bool
查询的过滤器上下文中让我们用一个简单的 Python 类来表示博客系统中的一篇文章:
from datetime import datetime
from elasticsearch_dsl import Document , Date , Integer , Keyword , Text , connections
# Define a default Elasticsearch client
connections . create_connection ( hosts = "https://localhost:9200" )
class Article ( Document ):
title = Text ( analyzer = 'snowball' , fields = { 'raw' : Keyword ()})
body = Text ( analyzer = 'snowball' )
tags = Keyword ()
published_from = Date ()
lines = Integer ()
class Index :
name = 'blog'
settings = {
"number_of_shards" : 2 ,
}
def save ( self , ** kwargs ):
self . lines = len ( self . body . split ())
return super ( Article , self ). save ( ** kwargs )
def is_published ( self ):
return datetime . now () > self . published_from
# create the mappings in elasticsearch
Article . init ()
# create and save and article
article = Article ( meta = { 'id' : 42 }, title = 'Hello world!' , tags = [ 'test' ])
article . body = ''' looong text '''
article . published_from = datetime . now ()
article . save ()
article = Article . get ( id = 42 )
print ( article . is_published ())
# Display cluster health
print ( connections . get_connection (). cluster . health ())
在此示例中您可以看到:
.save()
方法以挂钩持久性生命周期您可以在文档的持久性章节中查看更多内容。
elasticsearch-py
迁移您不必移植整个应用程序来获得 Python DSL 的好处,您可以从现有的dict
创建一个Search
对象,使用 API 修改它并将其序列化回dict
来逐步开始:
body = {...} # insert complicated query here
# Convert to Search object
s = Search . from_dict ( body )
# Add some filters, aggregations, queries, ...
s . filter ( "term" , tags = "python" )
# Convert back to dict to plug back into existing code
body = s . to_dict ()
激活虚拟环境(virtualenvs):
$ virtualenv venv
$ source venv/bin/activate
要安装开发所需的所有依赖项,请运行:
$ pip install -e ' .[develop] '
要运行elasticsearch-dsl-py
的所有测试,请运行:
$ python setup.py test
或者,可以使用test_elasticsearch_dsl
中的run_tests.py
脚本(它包装了 pytest)来运行测试套件的子集。下面是一些示例:
# Run all of the tests in `test_elasticsearch_dsl/test_analysis.py`
$ ./run_tests.py test_analysis.py
# Run only the `test_analyzer_serializes_as_name` test.
$ ./run_tests.py test_analysis.py::test_analyzer_serializes_as_name
pytest
将跳过test_elasticsearch_dsl/test_integration
中的测试,除非存在可以发生连接的 Elasticsearch 实例。默认情况下,根据elasticsearch-py
Connection 类中指定的默认值,在localhost:9200
尝试测试连接。由于运行集成测试会对 Elasticsearch 集群造成破坏性更改,因此仅当关联集群为空时才运行它们。因此,如果localhost:9200
处的 Elasticsearch 实例不满足这些要求,则可以通过TEST_ES_SERVER
环境变量指定不同的测试 Elasticsearch 服务器。
$ TEST_ES_SERVER=my-test-server:9201 ./run_tests
文档可在 https://elasticsearch-dsl.readthedocs.io 上获取。
想要破解 Elasticsearch DSL?惊人的!我们有贡献指南。
版权所有 2013 Elasticsearch
根据 Apache 许可证 2.0 版(“许可证”)获得许可;除非遵守许可证,否则您不得使用此文件。您可以在以下位置获取许可证副本:
http://www.apache.org/licenses/LICENSE-2.0
除非适用法律要求或书面同意,否则根据许可证分发的软件均按“原样”分发,不带任何明示或暗示的保证或条件。请参阅许可证,了解许可证下管理权限和限制的特定语言。