ElasticSearch
Concepts
Node
A node is a single instance of Elasticsearch that stores data. It is created when an Elasticsearch instance starts running; simply put, a node is a running Elasticsearch instance. For management purposes, a node is identified by its name.
Cluster
A cluster is a group of one or more nodes that work together to hold data. A cluster provides federated indexing and search capabilities across all of its nodes for the entire dataset. Basically, it is a collection of systems running the Elasticsearch engine.
Like a node, a cluster is identified by a name for management purposes. The default cluster name is elasticsearch.
Document
A document is a collection of fields, defined in JSON format in its own way. It holds the data that resides in an Elasticsearch index, so it can be indexed. Each document belongs to a type and is associated with a unique identifier (UID). It is expressed as JSON key:value pairs. In an RDBMS, a document corresponds to a row in a table.
In Elasticsearch terms: Document = Row, Field = Column, Index = Table, and Cluster = Database.
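For illustration, a sketch of one document as key:value pairs (the field names here are made up); in RDBMS terms this is one row, and each key is a column:
# One document = one "row"; each key:value pair = one field ("column").
doc = {
    "title": "Hello Elasticsearch",
    "views": 42,
    "tags": ["search", "tutorial"],
}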
Index
An index is a collection of documents of different types. It supports search, update, and delete operations as well as indexing. In a relational database an index corresponds to a table; that is, an index is to Elasticsearch what a table is to an RDBMS.
Elasticsearch allows multiple indices to be defined in a single cluster. To improve performance, an index uses the concept of sharding.
Shard
A cluster can store large volumes of data, possibly exceeding the capacity of a single server. To overcome this, Elasticsearch lets you split an index into several parts, called shards. You define the desired number of shards when creating an index; each shard is independent and fully functional.
Replica
A replica is an additional copy of a shard. Replicas serve queries just like shards do, and Elasticsearch lets users create replicas of their indices and shards.
Elasticsearch provides replicas to guard against failures, such as a node or shard going offline for some reason, and to keep data available when that happens. Replication not only increases data availability, it also improves search performance by running search operations in parallel across the replicas.
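Both counts are set per index: the shard count is fixed at creation time, while the replica count can also be changed afterwards. A minimal sketch using the Python client that is set up later in this document (the index name is illustrative):

from elasticsearch import Elasticsearch

es = Elasticsearch(hosts="http://127.0.0.1:9200")
# Split the index into 3 primary shards, each with 1 replica copy.
es.indices.create(index="demo", body={
    "settings": {"number_of_shards": 3, "number_of_replicas": 1}
})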
Type
In Elasticsearch, a type is defined for documents that share a common set of fields. It is a logical category of an index whose semantics are up to the user. (Note: mapping types were deprecated in Elasticsearch 7.x and removed in 8.x, the version installed below.)
Mapping
A mapping defines how documents and their fields are stored and indexed: it is the schema of an index, listing each field and its data type (text, keyword, dense_vector, and so on), as in the index bodies used later in this document.
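A sketch of reading a mapping back with the Python client (using the knowledge index created later in this document):

from elasticsearch import Elasticsearch

es = Elasticsearch(hosts="http://127.0.0.1:9200")
# Prints the stored schema: every field and its declared type.
print(es.indices.get_mapping(index="knowledge"))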
Installation
Docker-based
Per the official documentation, this installation uses version 8.9.0; for other versions, see elasticsearch | Docker @ Elastic.
Pull the image
docker pull docker.elastic.co/elasticsearch/elasticsearch:8.9.0
Verify the image signature, as officially recommended
# Download the Elastic public key used to verify container signatures
wget https://artifacts.elastic.co/cosign.pub
# Verify the container against the Elastic public key
cosign verify --key cosign.pub docker.elastic.co/elasticsearch/elasticsearch:8.9.0
This produces the following output:
Verification for docker.elastic.co/elasticsearch/elasticsearch:8.9.0 --
The following checks were performed on each of these signatures:
- The cosign claims were validated
- Existence of the claims in the transparency log was verified offline
- The signatures were verified against the specified public key
To install cosign, per the official tutorial:
# binary
curl -O -L "https://github.com/sigstore/cosign/releases/latest/download/cosign-linux-amd64"
mv cosign-linux-amd64 /usr/local/bin/cosign
chmod +x /usr/local/bin/cosign
# rpm
LATEST_VERSION=$(curl https://api.github.com/repos/sigstore/cosign/releases/latest | grep tag_name | cut -d : -f2 | tr -d "v\", ")
curl -O -L "https://github.com/sigstore/cosign/releases/latest/download/cosign-${LATEST_VERSION}.x86_64.rpm"
rpm -ivh cosign-${LATEST_VERSION}.x86_64.rpm
# dpkg
LATEST_VERSION=$(curl https://api.github.com/repos/sigstore/cosign/releases/latest | grep tag_name | cut -d : -f2 | tr -d "v\", ")
curl -O -L "https://github.com/sigstore/cosign/releases/latest/download/cosign_${LATEST_VERSION}_amd64.deb"
dpkg -i cosign_${LATEST_VERSION}_amd64.deb
Start a single-node ES cluster for development and testing
Create host directories to map configuration, data, and plugins into the container
# Create the Elasticsearch config directory
sudo mkdir -p /usr/local/data-docker/elasticsearch/config
# Create the Elasticsearch data directory
sudo mkdir -p /usr/local/data-docker/elasticsearch/data
# Create the Elasticsearch plugins directory (e.g., for ik); the path must match the volume mount below
sudo mkdir -p /usr/local/data-docker/elasticsearch/plugins
Create and write the elasticsearch.yml configuration
sudo vim /usr/local/data-docker/elasticsearch/config/elasticsearch.yml
http.host: 0.0.0.0
# x-pack security settings (accounts/passwords, security controls, etc.)
xpack.security.enabled: false  # the most critical line: disables security for local development
xpack.security.transport.ssl.enabled: false
xpack.security.enrollment.enabled: true
Startup will fail if these xpack settings are omitted.
Grant permissions on the directories
sudo chmod -R 777 /usr/local/data-docker/elasticsearch/
Create and start the container
docker run --name elasticsearch -d \
-e ES_JAVA_OPTS="-Xms512m -Xmx512m" \
-e "discovery.type=single-node" \
-p 9200:9200 \
-p 9300:9300 \
-v /usr/local/data-docker/elasticsearch/config/elasticsearch.yml:/usr/share/elasticsearch/config/elasticsearch.yml \
-v /usr/local/data-docker/elasticsearch/data:/usr/share/elasticsearch/data \
-v /usr/local/data-docker/elasticsearch/plugins:/usr/share/elasticsearch/plugins \
docker.elastic.co/elasticsearch/elasticsearch:8.9.0
Test the service
Run from the command line:
curl http://localhost:9200
This returns:
{
"name" : "b54cdd552a68",
"cluster_name" : "elasticsearch",
"cluster_uuid" : "F4Y7dQ6oR42lyi2iL6IfvQ",
"version" : {
"number" : "8.9.0",
"build_flavor" : "default",
"build_type" : "docker",
"build_hash" : "8aa461beb06aa0417a231c345a1b8c38fb498a0d",
"build_date" : "2023-07-19T14:43:58.555259655Z",
"build_snapshot" : false,
"lucene_version" : "9.7.0",
"minimum_wire_compatibility_version" : "7.17.0",
"minimum_index_compatibility_version" : "7.0.0"
},
"tagline" : "You Know, for Search"
}
Install the ik analyzer
The version of the ik analyzer must match the Elasticsearch version. Download it from the elasticsearch-analysis-ik releases:
wget https://github.com/medcl/elasticsearch-analysis-ik/files/12237226/elasticsearch-analysis-ik-8.9.0.zip
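Unzip the archive into a subdirectory of the mounted plugins directory (e.g., /usr/local/data-docker/elasticsearch/plugins/ik) and restart the container. The analyzer can then be checked from Python; a minimal sketch, assuming the plugin loaded cleanly (ik_max_word is one of the analyzers the plugin registers):

from elasticsearch import Elasticsearch

es = Elasticsearch(hosts="http://127.0.0.1:9200")
# Ask ES to tokenize a sample sentence with the ik analyzer and print the tokens.
resp = es.indices.analyze(body={"analyzer": "ik_max_word", "text": "中华人民共和国国歌"})
print([t["token"] for t in resp["tokens"]])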
Testing (Python)
a) Install the elasticsearch library
pip install elasticsearch
b) Connect to the ES service
from elasticsearch import Elasticsearch
es = Elasticsearch(hosts="http://127.0.0.1:9200")
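A quick sanity check that the client can reach the node (es.info() returns the same JSON as the curl test above):
print(es.ping())   # True if the node responds
print(es.info())   # cluster name, version, tagline, etc.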
c) Create an index (the "database")
body = {
    "mappings": {
        "properties": {
            "knowledge_class": {
                "type": "text"
            },
            "question": {
                "type": "text"
            },
            "answer": {
                "type": "text"
            },
            "question_vector": {
                "type": "dense_vector",
                "dims": 384
            }
        }
    }
}
# Create the index (the "database")
index_name = "knowledge"
es.indices.create(index=index_name, body=body)
d) Insert data
a. Load the knowledge base
import json
knowledge_vec_file = "../data/knowledge_vec.json"
with open(knowledge_vec_file, "r", encoding="utf-8") as f:
    knowledge_vectors = json.load(f)
b. Insert the knowledge base data into ES
# All data to insert
insert_datas = []
# Number of times to duplicate the data
epoch_num = 1
for _ in range(epoch_num):
    for kw in knowledge_vectors:  # type: dict
        insert_datas.append({
            "knowledge_class": kw.get("知识分类"),
            "question": kw.get("问题"),
            "answer": kw.get("答案"),
            "question_vector": kw.get("vec"),
        })
# Bulk insertion: see the helpers.bulk sketch after this block
# Insert one by one
for d in insert_datas:
    es.index(index=index_name, document=d)
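The commented-out bulk call above is incomplete; a sketch of batch insertion with the elasticsearch.helpers.bulk helper (same index and data as above):

from elasticsearch.helpers import bulk

# Each action names its target index; the document body goes in "_source".
actions = ({"_index": index_name, "_source": d} for d in insert_datas)
success, errors = bulk(es, actions)
print("indexed %d documents" % success)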
e) Vector search
CosineSimilarity
import time
import numpy as np

# emb_model is the sentence-embedding model (HuggingFaceEmbeddings, set up in
# the performance test below); index_name = "knowledge" from step c).
query = "场馆一层是干嘛的"
query_body = {
    "size": 10,
    "query": {
        "script_score": {
            "query": {
                "match_all": {}
            },
            "script": {
                "source": "cosineSimilarity(params.query_vector, 'question_vector') + 1.0",
                "params": {
                    "query_vector": np.array(emb_model.embed_query(query)).tolist()
                }
            }
        }
    }
}
start_time = time.time()
search_results = es.search(index=index_name, body=query_body)
for res in search_results["hits"]["hits"]:
    score = res["_score"]
    source = res["_source"]
    print("score: %.4f, question: %s" % (score, source["question"]))
print("search time: %s" % (time.time() - start_time))
Search results
score: 1.9928, question: 场馆一层是干什么的?
score: 1.9907, question: 场馆二层是干什么的?
score: 1.9904, question: 场馆三层是干什么的?
score: 1.9805, question: 场馆的洗手间在哪?
score: 1.9645, question: 厚德明心是做什么的?
score: 1.9580, question: 倪子君是谁?
score: 1.9580, question: 黄岩有什么好玩的?
score: 1.9558, question: 彭凯平是谁?
score: 1.9553, question: 徐井宏是谁?
score: 1.9546, question: 哪里可以买到小礼物?
search time: 0.201286
Other query tests
query:积极成就是啥
score: 1.9822, question: 什么是积极成就?
score: 1.9727, question: 什么是积极自我?
score: 1.9704, question: 什么是积极投入?
score: 1.9682, question: 什么是积极关系?
score: 1.9672, question: 厚德明心是做什么的?
score: 1.9615, question: 幸福是什么?
score: 1.9572, question: 什么是积极意义?
score: 1.9551, question: 什么是积极情绪?
score: 1.9545, question: 不自信怎么办?
score: 1.9541, question: 彭凯平是谁?
search time: 0.009402
query:啥是积极心理学
score: 1.9806, question: 什么是积极心理学?
score: 1.9737, question: 什么是积极关系?
score: 1.9689, question: 什么是积极成就?
score: 1.9682, question: 什么是积极自我?
score: 1.9680, question: 什么是积极情绪?
score: 1.9674, question: 厚德明心是做什么的?
score: 1.9666, question: 积极心理学是做什么的?
score: 1.9652, question: 什么是积极投入?
score: 1.9634, question: 心理学是干什么的?
score: 1.9587, question: 什么是积极意义?
search time: 0.010223
# Failure case: the intended match is not the top hit
query:积极心理学是干啥的
score: 1.9784, question: 厚德明心是做什么的?
score: 1.9761, question: 什么是积极心理学?
score: 1.9757, question: 心理学是干什么的?
score: 1.9755, question: 积极心理学是做什么的?
score: 1.9681, question: 什么是积极关系?
score: 1.9650, question: 什么是积极成就?
score: 1.9647, question: 什么是积极自我?
score: 1.9645, question: 什么是积极投入?
score: 1.9634, question: 场馆一层是干什么的?
score: 1.9628, question: 抗挫力是什么?怎么培养?
f) Performance testing
Embeddings are computed over a public dataset: the XiaChuFang recipe corpus from OpenDataLab.
100,000 samples are taken from it (one dish may have multiple recipes, so some name values repeat); the name and description fields are used, and embeddings are computed on name.
The tests cover:
- Search response time at different total data volumes (100k)
- The effect of duplicated data (identical embeddings) vs. deduplicated data on search response time
- The effect of the number of returned results on search response time
Prepare the data
Extract 100,000 records
import json
from tqdm import tqdm
source_file_path = r"F:\XiaChuFang_Recipe_Corpus.tar\XiaChuFang_Recipe_Corpus\xiachufang_recipe_corpus_full\recipe_corpus_full.json"
output_file_path = "../data/xiaochufang_10w.json"
output_no_repeat_file_path = "../data/xiaochufang_10w_no_repeat.json"
size = 100000
datas = []
datas_no_repeat = []
datas_no_repeat_name = []
no_repeat_num = 0
with open(source_file_path, "r", encoding="utf-8") as f:
    for i, line in tqdm(enumerate(f.readlines())):
        line_json = json.loads(line)
        data = json.dumps({
            "name": line_json["name"],
            "description": line_json["description"]
        }, ensure_ascii=False)
        if i < size:
            datas.append(data)
        if line_json["name"] not in datas_no_repeat_name:
            datas_no_repeat.append(data)
            datas_no_repeat_name.append(line_json["name"])
            no_repeat_num += 1
        if no_repeat_num == size:
            break
print("datas size: %s" % len(datas))
print("datas no repeat size: %s" % len(datas_no_repeat))
with open(output_file_path, "w", encoding="utf-8") as f:
    f.write("\n".join(datas))
with open(output_no_repeat_file_path, "w", encoding="utf-8") as f:
    f.write("\n".join(datas_no_repeat))
Convert name to vectors and store them
import json
import numpy as np
from tqdm import tqdm
from langchain.embeddings import HuggingFaceEmbeddings
model_path = r"/data/models/hf/ernie-3.0-mini-zh"
# model_path = r"F:\Models\ernie-3.0-mini-zh"
datas_file = "../data/xiaochufang_10w.json"
datas_vec_file = "../data/xiaochufang_10w_vec.json"
# datas_file = "../data/xiaochufang_10w_no_repeat.json"
# datas_vec_file = "../data/xiaochufang_10w_vec_no_repeat.json"
emb_model = HuggingFaceEmbeddings(model_name=model_path)
datas = []
with open(datas_vec_file, "w", encoding="utf-8") as of:
    with open(datas_file, "r", encoding="utf-8") as f:
        for line in tqdm(f.readlines()):
            json_line = json.loads(line)
            json_line["name_vector"] = np.array(emb_model.embed_query(json_line["name"])).tolist()
            of.write(json.dumps(json_line, ensure_ascii=False) + "\n")
Import into the database
import json
from tqdm import tqdm
from elasticsearch import Elasticsearch
es = Elasticsearch(hosts="http://127.0.0.1:9200")
# index_name = "test_xiaochufang_no_repeat"
index_name = "test_xiaochufang"
body = {
    "mappings": {
        "properties": {
            "name": {
                "type": "text"
            },
            "description": {
                "type": "text"
            },
            "name_vector": {
                "type": "dense_vector",
                "dims": 384
            }
        }
    }
}
if es.indices.exists(index=index_name):
    # Delete the existing index
    es.indices.delete(index=index_name)
# Create the index
es.indices.create(index=index_name, body=body)
insert_datas_file = "../data/xiaochufang_10w_vec.json"
# insert_datas_file = "../data/xiaochufang_10w_vec_no_repeat.json"
# All data to insert
insert_datas = []
with open(insert_datas_file, "r", encoding="utf-8") as f:
    for line in tqdm(f.readlines()):
        json_line = json.loads(line)
        insert_datas.append(json_line)
# Number of times to duplicate the data
epoch_num = 1
insert_datas = insert_datas * epoch_num
print("insert ...")
for data in tqdm(insert_datas):
    es.index(index=index_name, document=data)
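The searches below reuse the script_score/cosineSimilarity query from the knowledge-base test, pointed at name_vector instead; a sketch of the timing helper (assuming the es client, index_name, and emb_model from the steps above):

import time
import numpy as np

def search(query, top_k=10):
    # Score every document by cosine similarity between the query vector
    # and the stored name_vector (+1.0 keeps scores non-negative).
    query_body = {
        "size": top_k,
        "query": {
            "script_score": {
                "query": {"match_all": {}},
                "script": {
                    "source": "cosineSimilarity(params.query_vector, 'name_vector') + 1.0",
                    "params": {"query_vector": np.array(emb_model.embed_query(query)).tolist()},
                },
            }
        },
    }
    start_time = time.time()
    results = es.search(index=index_name, body=query_body)
    for res in results["hits"]["hits"]:
        print("score: %.4f, name: %s" % (res["_score"], res["_source"]["name"]))
    print("search time: %s" % (time.time() - start_time))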
Test results (partial)
Top10
query:西红柿炒鸡蛋
score: 2.0000, name: 西红柿炒鸡蛋
score: 2.0000, name: 西红柿炒鸡蛋
score: 2.0000, name: 西红柿炒鸡蛋
score: 2.0000, name: 西红柿炒鸡蛋
score: 2.0000, name: 西红柿炒鸡蛋
score: 2.0000, name: 西红柿炒鸡蛋
score: 2.0000, name: 西红柿炒鸡蛋
score: 2.0000, name: 西红柿炒鸡蛋
score: 2.0000, name: 西红柿炒鸡蛋
score: 2.0000, name: 西红柿炒鸡蛋
search time: 0.216345
query:西红柿炒鸡蛋
score: 2.0000, name: 西红柿炒鸡蛋
score: 2.0000, name: 西红柿炒鸡蛋
score: 2.0000, name: 西红柿炒鸡蛋
score: 2.0000, name: 西红柿炒鸡蛋
score: 2.0000, name: 西红柿炒鸡蛋
score: 2.0000, name: 西红柿炒鸡蛋
score: 2.0000, name: 西红柿炒鸡蛋
score: 2.0000, name: 西红柿炒鸡蛋
score: 2.0000, name: 西红柿炒鸡蛋
score: 2.0000, name: 西红柿炒鸡蛋
search time: 0.033773
query:红烧茄子
score: 2.0000, name: 红烧茄子
score: 2.0000, name: 红烧茄子
score: 2.0000, name: 红烧茄子
score: 2.0000, name: 红烧茄子
score: 2.0000, name: 红烧茄子
score: 2.0000, name: 红烧茄子
score: 2.0000, name: 红烧茄子
score: 2.0000, name: 红烧茄子
score: 2.0000, name: 红烧茄子
score: 2.0000, name: 红烧茄子
search time: 0.035268
Top5
query:西红柿炒鸡蛋
score: 2.0000, name: 西红柿炒鸡蛋
score: 2.0000, name: 西红柿炒鸡蛋
score: 2.0000, name: 西红柿炒鸡蛋
score: 2.0000, name: 西红柿炒鸡蛋
score: 2.0000, name: 西红柿炒鸡蛋
search time: 0.033364
query:尖椒肉丝
score: 2.0000, name: 尖椒肉丝
score: 1.9984, name: 甜椒肉丝
score: 1.9984, name: 甜椒肉丝
score: 1.9978, name: 青椒肉丝
score: 1.9978, name: 青椒肉丝
search time: 0.035606
Summary
The first search takes noticeably longer, about 0.21 s; subsequent Top5 or Top10 searches average around 0.032 s.
Average search response time:

|Total data|Top1|Top3|Top5|Top10|
|-|-|-|-|-|
|100k (with duplicates)|0.030s|0.032s|0.032s|0.033s|
|100k (deduplicated)|0.032s|0.032s|0.032s|0.033s|