
ElasticSearch

Concepts

Node

A node is a single instance of Elasticsearch that stores data. It is created when an Elasticsearch instance starts running; in other words, a node is simply a running Elasticsearch instance. For management purposes, each node is identified by its name.

Cluster

A cluster is a group of one or more nodes that work together to hold data. It provides federated indexing and search capabilities across all nodes for the entire data set. In essence, it is a group of machines running the Elasticsearch engine.

Like a node, a cluster is identified by a name for management purposes. The default cluster name is elasticsearch.

Document

A document is a collection of fields, defined in JSON format. It stores the data that resides in an Elasticsearch index, and so it can be indexed. Each document belongs to a type and is associated with a unique identifier (UID). It is represented as JSON key-value pairs. In RDBMS terms, a document corresponds to a row in a table.

In Elasticsearch terminology: Document = Row, Field = Column, Index = Table, and Cluster = Database.
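As a toy illustration of this analogy (the field names here are invented), a document is just a JSON object whose keys play the role of columns:

```python
import json

# A hypothetical document: one "row" of data, with fields as "columns"
doc = {
    "question": "Where is the restroom?",
    "answer": "On the first floor.",
    "views": 3,
}

# Documents are stored and exchanged as JSON key-value pairs
serialized = json.dumps(doc)
print(serialized)
```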

Index

An index is a collection of documents of different types. It supports indexing as well as search, update, and delete operations. In a relational database, an index corresponds to a table; that is, an index is analogous to a table in an RDBMS.

Elasticsearch allows multiple indices to be defined within a single cluster. To improve performance, an index is split up using the concept of shards.

Shard

A cluster can store a large volume of data, possibly more than a single server can hold. To handle this, Elasticsearch lets you split an index into several pieces called shards. The number of shards is defined when the index is created, and each shard is independent and fully functional.

Replica

A replica is an additional copy of a shard. Replicas serve queries just like primary shards do. Elasticsearch lets users create replicas of their indices and shards.

Replicas exist to guard against failures and improve data availability when, for example, a node or shard goes offline for some reason. Replication not only increases availability, it also improves search performance, because searches can be executed in parallel across replicas.
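Both counts are set in the index settings. A minimal sketch of a create-index settings body (the index name `my_index` is invented here); note that `number_of_shards` is fixed at creation time, while `number_of_replicas` can be changed later on a live index:

```python
# Hypothetical create-index settings: 3 primary shards, 1 replica each
settings_body = {
    "settings": {
        "number_of_shards": 3,
        "number_of_replicas": 1,
    }
}

# With a connected client this would be:
#   es.indices.create(index="my_index", body=settings_body)

# Each primary is copied once per replica, so the cluster holds
# primaries * (1 + replicas) shards in total.
total_shards = settings_body["settings"]["number_of_shards"] * \
    (1 + settings_body["settings"]["number_of_replicas"])
print(total_shards)  # 6
```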

Type

In Elasticsearch, a type is defined for documents that share a common set of fields. It is a logical category of an index whose semantics are up to the user. Note that mapping types were deprecated in Elasticsearch 7.x and removed in 8.x, so a modern index effectively holds a single type of document.

Mapping

A mapping defines how a document and the fields it contains are stored and indexed: the data type of each field (text, keyword, date, dense_vector, and so on) and how values should be analyzed. A mapping can be supplied when an index is created, as in the examples later in this article.

Installation

Docker-based

Following the official documentation; the version installed here is 8.9.0. For other versions, see elasticsearch | Docker @ Elastic.

Pull the image

docker pull docker.elastic.co/elasticsearch/elasticsearch:8.9.0

Verify the image signature, as the official docs recommend

# Download the Elastic public key used to verify container signatures
wget https://artifacts.elastic.co/cosign.pub
# Verify the container image against the Elastic public key
cosign verify --key cosign.pub docker.elastic.co/elasticsearch/elasticsearch:8.9.0

This produces the following output:

Verification for docker.elastic.co/elasticsearch/elasticsearch:8.9.0 --
The following checks were performed on each of these signatures:
  - The cosign claims were validated
  - Existence of the claims in the transparency log was verified offline
  - The signatures were verified against the specified public key

To install cosign (per the official tutorial):

# binary
curl -O -L "https://github.com/sigstore/cosign/releases/latest/download/cosign-linux-amd64"
mv cosign-linux-amd64 /usr/local/bin/cosign
chmod +x /usr/local/bin/cosign

# rpm
LATEST_VERSION=$(curl https://api.github.com/repos/sigstore/cosign/releases/latest | grep tag_name | cut -d : -f2 | tr -d "v\", ")
curl -O -L "https://github.com/sigstore/cosign/releases/latest/download/cosign-${LATEST_VERSION}.x86_64.rpm"
rpm -ivh cosign-${LATEST_VERSION}.x86_64.rpm

# dpkg
LATEST_VERSION=$(curl https://api.github.com/repos/sigstore/cosign/releases/latest | grep tag_name | cut -d : -f2 | tr -d "v\", ")
curl -O -L "https://github.com/sigstore/cosign/releases/latest/download/cosign_${LATEST_VERSION}_amd64.deb"
dpkg -i cosign_${LATEST_VERSION}_amd64.deb

Start a single-node ES cluster for development and testing

Create directories on the host and map the configuration, data, and plugins into the container

# Create the Elasticsearch config directory
sudo mkdir -p /usr/local/data-docker/elasticsearch/config

# Create the Elasticsearch data directory
sudo mkdir -p /usr/local/data-docker/elasticsearch/data

# Create the Elasticsearch plugins directory (e.g. for ik);
# note the name must match the "plugins" mount used below
sudo mkdir -p /usr/local/data-docker/elasticsearch/plugins

Create and populate the elasticsearch.yml configuration

sudo vim /usr/local/data-docker/elasticsearch/config/elasticsearch.yml
http.host: 0.0.0.0

# x-pack security settings; disabled here so the dev instance can be
# reached without credentials or TLS (enable them to add authentication)
xpack.security.enabled: false # the key line
xpack.security.transport.ssl.enabled: false
xpack.security.enrollment.enabled: true

The container will fail to start if these xpack settings are missing.

Grant permissions on the directories

sudo chmod -R 777 /usr/local/data-docker/elasticsearch/

Create and start the container

docker run --name elasticsearch -d \
    -e ES_JAVA_OPTS="-Xms512m -Xmx512m" \
    -e "discovery.type=single-node" \
    -p 9200:9200 \
    -p 9300:9300 \
    -v /usr/local/data-docker/elasticsearch/config/elasticsearch.yml:/usr/share/elasticsearch/config/elasticsearch.yml \
    -v /usr/local/data-docker/elasticsearch/data:/usr/share/elasticsearch/data \
    -v /usr/local/data-docker/elasticsearch/plugins:/usr/share/elasticsearch/plugins \
    docker.elastic.co/elasticsearch/elasticsearch:8.9.0

Test the service

Run from the command line:

curl http://localhost:9200

which returns:

{
  "name" : "b54cdd552a68",
  "cluster_name" : "elasticsearch",
  "cluster_uuid" : "F4Y7dQ6oR42lyi2iL6IfvQ",
  "version" : {
    "number" : "8.9.0",
    "build_flavor" : "default",
    "build_type" : "docker",
    "build_hash" : "8aa461beb06aa0417a231c345a1b8c38fb498a0d",
    "build_date" : "2023-07-19T14:43:58.555259655Z",
    "build_snapshot" : false,
    "lucene_version" : "9.7.0",
    "minimum_wire_compatibility_version" : "7.17.0",
    "minimum_index_compatibility_version" : "7.0.0"
  },
  "tagline" : "You Know, for Search"
}

Install the ik analyzer

The ik analyzer version must match the Elasticsearch version. Download it from the ik analyzer repository:

wget https://github.com/medcl/elasticsearch-analysis-ik/files/12237226/elasticsearch-analysis-ik-8.9.0.zip

Testing (Python)

a)  Install the elasticsearch library

pip install elasticsearch

b)  Connect to the ES service

from elasticsearch import Elasticsearch

# The 8.x client takes URLs; the old [{"host": ..., "port": ...}] form is no longer accepted
es = Elasticsearch("http://127.0.0.1:9200")

c)  Create an index (the "database")

body = {
    "mappings": {
        "properties": {
            "knowledge_class": {
                "type": "text"
            },
            "question": {
                "type": "text"
            },
            "answer": {
                "type": "text"
            },
            "question_vector": {
                "type": "dense_vector",
                "dims": 384
            }
        }
    }
}

# Create the index
es.indices.create(index="knowledge", body=body)

d)  Insert data

a.  Load the knowledge base

import json

knowledge_vec_file = "../data/knowledge_vec.json"
with open(knowledge_vec_file, "r", encoding="utf-8") as f:
    knowledge_vectors = json.load(f)

b.  Insert the knowledge-base records into ES

index_name = "knowledge"

# Records to insert
insert_datas = []
# Number of times to duplicate the data
epoch_num = 1
for _ in range(epoch_num):
    for kw in knowledge_vectors:  # type: dict
        insert_datas.append({
            "knowledge_class": kw.get("知识分类"),
            "question": kw.get("问题"),
            "answer": kw.get("答案"),
            "question_vector": kw.get("vec"),
        })

# Insert one record at a time (elasticsearch.helpers.bulk is faster for large volumes)
for d in insert_datas:
    es.index(index=index_name, document=d)
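For larger volumes, the bulk API is much faster than calling `es.index` per document. A sketch of building actions for `elasticsearch.helpers.bulk`, shown here without a live connection (the sample records are invented; the action format is the standard `_index`/`_source` shape):

```python
def build_bulk_actions(records, index_name):
    """Turn plain dicts into actions for elasticsearch.helpers.bulk."""
    return [{"_index": index_name, "_source": r} for r in records]

# Hypothetical sample records
records = [
    {"question": "q1", "answer": "a1"},
    {"question": "q2", "answer": "a2"},
]
actions = build_bulk_actions(records, "knowledge")

# With a connected client this would be:
#   from elasticsearch import helpers
#   helpers.bulk(es, actions)
print(len(actions))  # 2
```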

e)  Vector search

CosineSimilarity

import time

import numpy as np

# emb_model is a sentence-embedding model; the performance test below
# uses HuggingFaceEmbeddings for this purpose
query = "场馆一层是干嘛的"

query_body = {
    "size": 10,
    "query": {
        "script_score": {
            "query": {
                "match_all": {}
            },
            "script": {
                "source": "cosineSimilarity(params.query_vector, 'question_vector') + 1.0",
                "params": {
                    "query_vector": np.array(emb_model.embed_query(query)).tolist()
                }
            }
        }
    }
}

start_time = time.time()
search_results = es.search(index=index_name, body=query_body)
for res in search_results["hits"]["hits"]:
    score = res["_score"]
    source = res["_source"]
    print("score: %.4f, question: %s" % (score, source["question"]))
print("search time: %s" % (time.time() - start_time))
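The `+ 1.0` in the script shifts cosine similarity from [-1, 1] into [0, 2], since Elasticsearch script scores must not be negative; this is why close matches in the results below score near 2.0. A quick pure-Python check of the arithmetic:

```python
import math

def cosine_similarity(a, b):
    """Cosine similarity of two equal-length vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(x * x for x in b))
    return dot / (norm_a * norm_b)

v1 = [1.0, 0.0]
v2 = [1.0, 0.0]
v3 = [-1.0, 0.0]

print(cosine_similarity(v1, v2) + 1.0)  # identical vectors -> 2.0
print(cosine_similarity(v1, v3) + 1.0)  # opposite vectors  -> 0.0
```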

Search results

score: 1.9928, question: 场馆一层是干什么的?
score: 1.9907, question: 场馆二层是干什么的?
score: 1.9904, question: 场馆三层是干什么的?
score: 1.9805, question: 场馆的洗手间在哪?
score: 1.9645, question: 厚德明心是做什么的?
score: 1.9580, question: 倪子君是谁?
score: 1.9580, question: 黄岩有什么好玩的?
score: 1.9558, question: 彭凯平是谁?
score: 1.9553, question: 徐井宏是谁?
score: 1.9546, question: 哪里可以买到小礼物?
search time: 0.201286

Other query tests

query:积极成就是啥
score: 1.9822, question: 什么是积极成就?
score: 1.9727, question: 什么是积极自我?
score: 1.9704, question: 什么是积极投入?
score: 1.9682, question: 什么是积极关系?
score: 1.9672, question: 厚德明心是做什么的?
score: 1.9615, question: 幸福是什么?
score: 1.9572, question: 什么是积极意义?
score: 1.9551, question: 什么是积极情绪?
score: 1.9545, question: 不自信怎么办?
score: 1.9541, question: 彭凯平是谁?
search time: 0.009402

query:啥是积极心理学
score: 1.9806, question: 什么是积极心理学?
score: 1.9737, question: 什么是积极关系?
score: 1.9689, question: 什么是积极成就?
score: 1.9682, question: 什么是积极自我?
score: 1.9680, question: 什么是积极情绪?
score: 1.9674, question: 厚德明心是做什么的?
score: 1.9666, question: 积极心理学是做什么的?
score: 1.9652, question: 什么是积极投入?
score: 1.9634, question: 心理学是干什么的?
score: 1.9587, question: 什么是积极意义?
search time: 0.010223

# Incorrect: the closest question is not ranked first
query:积极心理学是干啥的
score: 1.9784, question: 厚德明心是做什么的?
score: 1.9761, question: 什么是积极心理学?
score: 1.9757, question: 心理学是干什么的?
score: 1.9755, question: 积极心理学是做什么的?
score: 1.9681, question: 什么是积极关系?
score: 1.9650, question: 什么是积极成就?
score: 1.9647, question: 什么是积极自我?
score: 1.9645, question: 什么是积极投入?
score: 1.9634, question: 场馆一层是干什么的?
score: 1.9628, question: 抗挫力是什么?怎么培养?

f)  Performance test

Embeddings are built from a public recipe dataset obtained from OpenDataLab.

100,000 samples are taken from it (a dish can have multiple recipes, so some name values repeat). The name and description fields are used, with embeddings computed over name.

The test focuses on:

- Search latency at the tested total data volume (100k records)
- The effect of duplicate records (identical embeddings) versus deduplicated records on search latency
- The effect of the number of returned results on search latency

Prepare the data

Extract 100,000 records

import json

from tqdm import tqdm

source_file_path = r"F:\XiaChuFang_Recipe_Corpus.tar\XiaChuFang_Recipe_Corpus\xiachufang_recipe_corpus_full\recipe_corpus_full.json"
output_file_path = "../data/xiaochufang_10w.json"
output_no_repeat_file_path = "../data/xiaochufang_10w_no_repeat.json"

size = 100000
datas = []
datas_no_repeat = []
datas_no_repeat_name = set()  # set membership checks are O(1)
no_repeat_num = 0

with open(source_file_path, "r", encoding="utf-8") as f:
    for i, line in tqdm(enumerate(f)):  # iterate lazily instead of readlines()
        line_json = json.loads(line)
        data = json.dumps({
            "name": line_json["name"],
            "description": line_json["description"]
        }, ensure_ascii=False)
        if i < size:
            datas.append(data)
        if line_json["name"] not in datas_no_repeat_name:
            datas_no_repeat.append(data)
            datas_no_repeat_name.add(line_json["name"])
            no_repeat_num += 1
        if no_repeat_num == size:
            break

print("datas size: %s" % len(datas))
print("datas no repeat size: %s" % len(datas_no_repeat))
with open(output_file_path, "w", encoding="utf-8") as f:
    f.write("\n".join(datas))

with open(output_no_repeat_file_path, "w", encoding="utf-8") as f:
    f.write("\n".join(datas_no_repeat))

Convert name to vectors and store them

import json

import numpy as np
from tqdm import tqdm
from langchain.embeddings import HuggingFaceEmbeddings

model_path = r"/data/models/hf/ernie-3.0-mini-zh"
# model_path = r"F:\Models\ernie-3.0-mini-zh"
datas_file = "../data/xiaochufang_10w.json"
datas_vec_file = "../data/xiaochufang_10w_vec.json"

# datas_file = "../data/xiaochufang_10w_no_repeat.json"
# datas_vec_file = "../data/xiaochufang_10w_vec_no_repeat.json"

emb_model = HuggingFaceEmbeddings(model_name=model_path)

datas = []

with open(datas_vec_file, "w", encoding="utf-8") as of:
    with open(datas_file, "r", encoding="utf-8") as f:
        for line in tqdm(f.readlines()):
            json_line = json.loads(line)
            json_line["name_vector"] = np.array(emb_model.embed_query(json_line["name"])).tolist()
            of.write(json.dumps(json_line, ensure_ascii=False) + "\n")

Import into the database

import json

from elasticsearch import Elasticsearch
from tqdm import tqdm

es = Elasticsearch(hosts="http://127.0.0.1:9200")

# index_name = "test_xiaochufang_no_repeat"
index_name = "test_xiaochufang"

body = {
    "mappings": {
        "properties": {
            "name": {
                "type": "text"
            },
            "description": {
                "type": "text"
            },
            "name_vector": {
                "type": "dense_vector",
                "dims": 384
            }
        }
    }
}

if es.indices.exists(index=index_name):
    # Delete the existing index
    es.indices.delete(index=index_name)
# Create the index
es.indices.create(index=index_name, body=body)

insert_datas_file = "../data/xiaochufang_10w_vec.json"
# insert_datas_file = "../data/xiaochufang_10w_vec_no_repeat.json"

# Records to insert
insert_datas = []
with open(insert_datas_file, "r", encoding="utf-8") as f:
    for line in tqdm(f):
        json_line = json.loads(line)
        insert_datas.append(json_line)
# Number of times to duplicate the data
epoch_num = 1
insert_datas = insert_datas * epoch_num

print("insert ...")
for data in tqdm(insert_datas):
    es.index(index=index_name, document=data)

Test results (partial)

Top10
query:西红柿炒鸡蛋
score: 2.0000, name: 西红柿炒鸡蛋
score: 2.0000, name: 西红柿炒鸡蛋
score: 2.0000, name: 西红柿炒鸡蛋
score: 2.0000, name: 西红柿炒鸡蛋
score: 2.0000, name: 西红柿炒鸡蛋
score: 2.0000, name: 西红柿炒鸡蛋
score: 2.0000, name: 西红柿炒鸡蛋
score: 2.0000, name: 西红柿炒鸡蛋
score: 2.0000, name: 西红柿炒鸡蛋
score: 2.0000, name: 西红柿炒鸡蛋
search time: 0.216345

query:西红柿炒鸡蛋
score: 2.0000, name: 西红柿炒鸡蛋
score: 2.0000, name: 西红柿炒鸡蛋
score: 2.0000, name: 西红柿炒鸡蛋
score: 2.0000, name: 西红柿炒鸡蛋
score: 2.0000, name: 西红柿炒鸡蛋
score: 2.0000, name: 西红柿炒鸡蛋
score: 2.0000, name: 西红柿炒鸡蛋
score: 2.0000, name: 西红柿炒鸡蛋
score: 2.0000, name: 西红柿炒鸡蛋
score: 2.0000, name: 西红柿炒鸡蛋
search time: 0.033773

query:红烧茄子
score: 2.0000, name: 红烧茄子
score: 2.0000, name: 红烧茄子
score: 2.0000, name: 红烧茄子
score: 2.0000, name: 红烧茄子
score: 2.0000, name: 红烧茄子
score: 2.0000, name: 红烧茄子
score: 2.0000, name: 红烧茄子
score: 2.0000, name: 红烧茄子
score: 2.0000, name: 红烧茄子
score: 2.0000, name: 红烧茄子
search time: 0.035268

 

Top5
query:西红柿炒鸡蛋
score: 2.0000, name: 西红柿炒鸡蛋
score: 2.0000, name: 西红柿炒鸡蛋
score: 2.0000, name: 西红柿炒鸡蛋
score: 2.0000, name: 西红柿炒鸡蛋
score: 2.0000, name: 西红柿炒鸡蛋
search time: 0.033364

query:尖椒肉丝
score: 2.0000, name: 尖椒肉丝
score: 1.9984, name: 甜椒肉丝
score: 1.9984, name: 甜椒肉丝
score: 1.9978, name: 青椒肉丝
score: 1.9978, name: 青椒肉丝
search time: 0.035606

Summary

The first search takes noticeably longer, about 0.21 s; after that, Top-5 or Top-10 searches average around 0.032 s.

Average search latency:

|Total records|Top1|Top3|Top5|Top10|
|-|-|-|-|-|
|100k (with duplicates)|0.030s|0.032s|0.032s|0.033s|
|100k (deduplicated)|0.032s|0.032s|0.032s|0.033s|

License: CC BY 4.0