在第一部分的系列中,我们探讨了MongoDB,这是最可靠和强大的文档导向NoSQL数据库之一。在这里的第二部分,我们将研究另一个无法避免的NoSQL数据库:
不仅仅是一个流行且强大的开源分布式NoSQL数据库,Elasticsearch首先是一个搜索和分析引擎。它构建在Apache Lucene之上,这是最著名的搜索引擎Java库,能够对结构和非结构化数据执行实时搜索和分析操作。它被设计来高效地处理大量数据。
再次声明,这个简短的帖子绝不是一个Elasticsearch教程。因此,强烈建议读者广泛使用官方文档,以及Madhusudhan Konda(Manning,2023)撰写的优秀书籍”Elasticsearch实战“,以了解更多关于产品架构和操作的知识。在这里,我们只是重新实现了之前相同的用例,但这次使用的是Elasticsearch而不是MongoDB。
那么,我们开始吧!
域模型
下面的图表展示了我们的*客户-订单-产品*域模型:
这幅图与第一部分中展示的图相同。和MongoDB一样,Elasticsearch也是一个文档数据存储,因此它期望文档以JSON格式呈现。唯一的区别在于,为了处理其数据,Elasticsearch需要将它们索引。
在Elasticsearch数据存储中索引数据有几种方式;例如,从关系数据库中传输它们,从文件系统中提取它们,从实时源中流式传输它们等。但无论采用哪种摄取方法,最终都包括通过专用客户端调用Elasticsearch RESTful API。这类专用客户端分为两种:
- 基于REST的客户端,如
curl
、Postman
、Java、JavaScript、Node.js等的HTTP模块。 - 编程语言SDK(软件开发工具包):Elasticsearch为所有最常用的编程语言提供了SDK,包括但不限于Java、Python等。
使用Elasticsearch索引新文档意味着通过针对名为_doc
的特殊RESTful API端点发起POST
请求来创建它。例如,以下请求将创建一个新的Elasticsearch索引并在其中存储一个新的客户实例。
POST customers/_doc/
{
"id": 10,
"firstName": "John",
"lastName": "Doe",
"email": {
"address": "[email protected]",
"personal": "John Doe",
"encodedPersonal": "John Doe",
"type": "personal",
"simple": true,
"group": true
},
"addresses": [
{
"street": "75, rue Véronique Coulon",
"city": "Coste",
"country": "France"
},
{
"street": "Wulfweg 827",
"city": "Bautzen",
"country": "Germany"
}
]
}
使用curl
或Kibana控制台(稍后会看到)运行上述请求将产生以下结果:
{
"_index": "customers",
"_id": "ZEQsJI4BbwDzNcFB0ubC",
"_version": 1,
"result": "created",
"_shards": {
"total": 2,
"successful": 1,
"failed": 0
},
"_seq_no": 1,
"_primary_term": 1
}
这是对POST
请求的Elasticsearch标准响应。它确认已创建名为customers
的索引,并拥有一个新的customer
文档,该文档由系统自动生成的ID标识(在此案例中为ZEQsJI4BbwDzNcFB0ubC
)。
这里还出现了其他有趣的参数,如_version
和特别是_shards
。无需过多细节,Elasticsearch将索引创建为文档的逻辑集合。就像将纸质文档保存在文件柜中一样,Elasticsearch将文档保存在索引中。每个索引由分片,组成,这些分片是Apache Lucene的物理实例,Lucene是背后的引擎,负责将数据存入或从存储中取出。它们可能是主分片,存储文档,或者是副本分片,正如其名称所示,存储主分片的副本。更多关于这方面的信息可以在Elasticsearch文档中找到 – 现在,我们需要注意的是,我们名为customers
的索引由两个分片组成:其中一个是主分片。
A final notice: the POST
request above doesn’t mention the ID value as it is automatically generated. While this is probably the most common use case, we could have provided our own ID value. In each case, the HTTP request to be used isn’t POST
anymore, but PUT
.
回到我们的域模型图,如您所见,其中心文档是Order
,存储在名为Orders
的专用集合中。一个Order
是OrderItem
文档的聚合体,每个OrderItem
都指向其关联的Product
。Order
文档还引用了下单的Customer
。在Java中,这是以下面的方式实现的:
public class Customer
{
private Long id;
private String firstName, lastName;
private InternetAddress email;
private Set<Address> addresses;
...
}
上面的代码展示了`Customer`类的一个片段。这是一个简单的POJO(Plain Old Java Object),具有客户ID、名、姓、电子邮件地址以及一组邮政地址等属性。
现在让我们来看看`Order`文档。
public class Order
{
private Long id;
private String customerId;
private Address shippingAddress;
private Address billingAddress;
private Set<String> orderItemSet = new HashSet<>()
...
}
在这里,您会注意到与MongoDB版本相比有一些不同。实际上,在MongoDB中,我们使用了一个指向与此订单关联的客户实例的引用。而在Elasticsearch中,这种引用的概念不存在,因此我们使用这个文档ID来创建订单与其下单客户之间的关联。同样的,这也适用于`orderItemSet`属性,它创建了订单与其项目之间的关联。
我们的域模型的其余部分也非常相似,并且基于相同的规范化思想。例如,`OrderItem`文档:
public class OrderItem
{
private String id;
private String productId;
private BigDecimal price;
private int amount;
...
}
在这里,我们需要关联构成当前订单项目对象的产品。最后但同样重要的是,我们有`Product`文档:
public class Product
{
private String id;
private String name, description;
private BigDecimal price;
private Map<String, String> attributes = new HashMap<>();
...
}
数据仓库
Quarkus Panache 大大简化了数据持久化过程,支持活动记录和仓库设计模式。在第一部分中,我们使用了Quarkus Panache扩展来为MongoDB实现数据仓库,但目前还没有相应的Quarkus Panache扩展用于Elasticsearch。因此,在等待可能的未来Quarkus Elasticsearch扩展的同时,我们需要使用Elasticsearch专用客户端手动实现我们的数据仓库。
Elasticsearch 是用Java编写的,因此它提供了对使用Java客户端库调用Elasticsearch API的原生支持并不奇怪。这个库基于流式API构建器设计模式,并提供了同步和异步处理模型。它至少需要Java 8。
那么,基于流式API构建器的数据仓库是什么样的呢?下面是CustomerServiceImpl
类的一个片段,该类作为Customer
文档的数据仓库。
@ApplicationScoped
public class CustomerServiceImpl implements CustomerService
{
private static final String INDEX = "customers";
@Inject
ElasticsearchClient client;
@Override
public String doIndex(Customer customer) throws IOException
{
return client.index(IndexRequest.of(ir -> ir.index(INDEX).document(customer))).id();
}
...
正如我们所看到的,我们的数据仓库实现必须是一个具有应用程序作用域的CDI bean。Elasticsearch Java客户端只是通过Quarkus扩展quarkus-elasticsearch-java-client
注入。这种方式避免了我们否则必须使用的许多花哨的特性。我们需要能够注入客户端的唯一事情是声明以下属性:
quarkus.elasticsearch.hosts = elasticsearch:9200
在这里,elasticsearch
是我们在 docker-compose.yaml
文件中与 Elastic search 数据库服务器关联的 DNS(域名服务器)名称。9200
是服务器用于监听连接的 TCP 端口号。
上面提到的 doIndex()
方法会创建一个名为 customers
的新索引(如果尚不存在),并将一个代表 Customer
类实例的新文档索引(存储)到其中。索引过程是基于一个 IndexRequest
来执行的,它接受索引名称和文档正文作为输入参数。至于文档 ID,则是自动生成并返回给调用者以供进一步引用。
下面的方法允许通过作为输入参数给出的 ID 检索客户:
...
@Override
public Customer getCustomer(String id) throws IOException
{
GetResponse<Customer> getResponse = client.get(GetRequest.of(gr -> gr.index(INDEX).id(id)), Customer.class);
return getResponse.found() ? getResponse.source() : null;
}
...
原理相同:使用这种 流畅API 构建器模式,我们以类似于 IndexRequest
的方式构建一个 GetRequest
实例,并使用 Elasticsearch Java 客户端运行它。我们的数据存储库的其他端点,允许我们执行完整的搜索操作或更新和删除客户,也是以相同的方式设计的。
请花些时间查看代码,以理解事物是如何工作的。
REST API
我们的MongoDB REST API接口非常容易实现,这得益于quarkus-mongodb-rest-data-panache
扩展,其中的注解处理器自动生成了所有必需的端点。使用Elasticsearch时,我们还没有享受到同样的便利,因此需要手动实现。这并不是什么大问题,因为我们能够注入之前的数据仓库,如下所示:
@Path("customers")
@Produces(APPLICATION_JSON)
@Consumes(APPLICATION_JSON)
public class CustomerResourceImpl implements CustomerResource
{
@Inject
CustomerService customerService;
@Override
public Response createCustomer(Customer customer, @Context UriInfo uriInfo) throws IOException
{
return Response.accepted(customerService.doIndex(customer)).build();
}
@Override
public Response findCustomerById(String id) throws IOException
{
return Response.ok().entity(customerService.getCustomer(id)).build();
}
@Override
public Response updateCustomer(Customer customer) throws IOException
{
customerService.modifyCustomer(customer);
return Response.noContent().build();
}
@Override
public Response deleteCustomerById(String id) throws IOException
{
customerService.removeCustomerById(id);
return Response.noContent().build();
}
}
这是客户REST API实现的代码。其他与订单、订单项和产品相关的实现是类似的。
接下来,我们来看看如何运行和测试整个系统。
运行和测试我们的微服务
现在我们已经了解了我们实现的细节,让我们看看如何运行和测试它。我们选择使用docker-compose
工具来执行这一操作。以下是相关的docker-compose.yml
文件:
version: "3.7"
services:
elasticsearch:
image: elasticsearch:8.12.2
environment:
node.name: node1
cluster.name: elasticsearch
discovery.type: single-node
bootstrap.memory_lock: "true"
xpack.security.enabled: "false"
path.repo: /usr/share/elasticsearch/backups
ES_JAVA_OPTS: -Xms512m -Xmx512m
hostname: elasticsearch
container_name: elasticsearch
ports:
- "9200:9200"
- "9300:9300"
ulimits:
memlock:
soft: -1
hard: -1
volumes:
- node1-data:/usr/share/elasticsearch/data
networks:
- elasticsearch
kibana:
image: docker.elastic.co/kibana/kibana:8.6.2
hostname: kibana
container_name: kibana
environment:
- elasticsearch.url=http://elasticsearch:9200
- csp.strict=false
ulimits:
memlock:
soft: -1
hard: -1
ports:
- 5601:5601
networks:
- elasticsearch
depends_on:
- elasticsearch
links:
- elasticsearch:elasticsearch
docstore:
image: quarkus-nosql-tests/docstore-elasticsearch:1.0-SNAPSHOT
depends_on:
- elasticsearch
- kibana
hostname: docstore
container_name: docstore
links:
- elasticsearch:elasticsearch
- kibana:kibana
ports:
- "8080:8080"
- "5005:5005"
networks:
- elasticsearch
environment:
JAVA_DEBUG: "true"
JAVA_APP_DIR: /home/jboss
JAVA_APP_JAR: quarkus-run.jar
volumes:
node1-data:
driver: local
networks:
elasticsearch:
这个文件指示docker-compose
工具运行三个服务:
- A service named
elasticsearch
running the Elasticsearch 8.6.2 database - A service named
kibana
running the multipurpose web console providing different options such as executing queries, creating aggregations, and developing dashboards and graphs - A service named
docstore
running our Quarkus microservice
现在,您可以检查所有必需的进程是否正在运行:
$ docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
005ab8ebf6c0 quarkus-nosql-tests/docstore-elasticsearch:1.0-SNAPSHOT "/opt/jboss/containe…" 3 days ago Up 3 days 0.0.0.0:5005->5005/tcp, :::5005->5005/tcp, 0.0.0.0:8080->8080/tcp, :::8080->8080/tcp, 8443/tcp docstore
9678c0a04307 docker.elastic.co/kibana/kibana:8.6.2 "/bin/tini -- /usr/l…" 3 days ago Up 3 days 0.0.0.0:5601->5601/tcp, :::5601->5601/tcp kibana
805eba38ff6c elasticsearch:8.12.2 "/bin/tini -- /usr/l…" 3 days ago Up 3 days 0.0.0.0:9200->9200/tcp, :::9200->9200/tcp, 0.0.0.0:9300->9300/tcp, :::9300->9300/tcp elasticsearch
$
为了确认Elasticsearch服务器是否可用并能执行查询,您可以通过http://localhost:601连接到Kibana。在页面滚动到底部并选择偏好菜单中的Dev Tools
后,您可以运行如下所示的查询:
为了测试微服务,请按以下步骤操作:
1. 克隆相关的GitHub仓库:
$ git clone https://github.com/nicolasduminil/docstore.git
2. 进入项目:
$ cd docstore
3. 切换到正确的分支:
$ git checkout elastic-search
4. 构建:
$ mvn clean install
5. 运行集成测试:
$ mvn -DskipTests=false failsafe:integration-test
最后一个命令将运行提供的17个集成测试,这些测试应该都会成功。您还可以通过在浏览器中访问 http://localhost:8080/q:swagger-ui 使用Swagger UI界面进行测试。接着,为了测试端点,您可以使用位于 docstore-api
项目中 src/resources/data
目录下的JSON文件中的有效负载。
享受吧!
Source:
https://dzone.com/articles/cruding-nosql-data-with-quarkus-part-two-elasticse