Python之Elasticsearch操作
编辑
34
2021-10-10
Python操作ElasticSearch
Version
Python :2.7
ElasticSearch:6.3
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
@Time : 2018/7/4
@Author : LXW
@Site :
@File : ElasticSearchOperation.py
@Software: PyCharm
@Description: 对elasticsearch数据的操作,包括获取数据,发送数据
"""
import elasticsearch
import json
import Util_Ini_Operation
class elasticsearch_data():
def __init__(self,hosts,username,password,maxsize,is_ssl):
# 初始化ini操作脚本,获取配置文件
try:
# 判断请求方式是否ssl加密
if is_ssl == "true":
# 获取证书地址
cert_pem = Util_Ini_Operation.get_ini("config.ini").get_key_value("certs","certs")
es_ssl = elasticsearch.Elasticsearch(
# 地址
hosts=hosts,
# 用户名密码
http_auth=(username,password),
# 开启ssl
use_ssl=True,
# 确认有加密证书
verify_certs=True,
# 对应的加密证书地址
client_cert=cert_pem
)
self.es = es_ssl
elif is_ssl == "false":
# 创建普通类型的ES客户端
es_ordinary = elasticsearch.Elasticsearch(hosts, http_auth=(username, password), maxsize=int(maxsize))
self.es = es_ordinary
except Exception as e:
print(e)
def query_data(self,keywords_list,date):
gte = "now-"+str(date)
query_data = {
# 查询语句
"query": {
"bool": {
"must": [
{
"query_string": {
"query": keywords_list,
"analyze_wildcard": True
}
},
{
"range": {
"@timestamp": {
"gte": gte,
"lte": "now",
"format": "epoch_millis"
}
}
}
],
"must_not": []
}
}
}
return query_data
# 从es获取数据
def get_datas_by_query(self,index_name,keywords,param,date):
'''
:param index_name: 索引名称
:param keywords: 关键字词,数组
:param param: 需要数据条件,例如_source
:param date: 过去时间范围,字符串格式,例如过去30分钟内数据,"30m"
:return: all_datas 返回查询到的所有数据(已经过param过滤)
'''
all_datas = []
# 遍历所有的查询条件
for keywords_list in keywords:
# DSL语句
query_data = self.query_data(keywords_list,date)
res = self.es.search(
index=index_name,
body=query_data
)
for hit in res['hits']['hits']:
# 获取指定的内容
response = hit[param]
# 添加所有数据到数据集中
all_datas.append(response)
# 返回所有数据内容
return all_datas
# 当索引不存在创建索引
def create_index(self,index_name):
'''
:param index_name: 索引名称
:return:如果创建成功返回创建结果信息,试过已经存在创建新的index失败返回index的名称
'''
# 获取索引的映射
# index_mapping = IndexMapping.index_mapping
# # 判断索引是否存在
# if self.es.indices.exists(index=index_name) is not True:
# # 创建索引
# res = self.es.indices.create(index=index_name,body=index_mapping)
# # 返回结果
# return res
# else:
# # 返回索引名称
# return index_name
pass
# 插入指定的单条数据内容
def insert_single_data(self,index_name,doc_type,data):
'''
:param index_name: 索引名称
:param doc_type: 文档类型
:param data: 需要插入的数据内容
:return: 执行结果
'''
res = self.es.index(index=index_name,doc_type=doc_type,body=data)
return res
# 向ES中新增数据,批量插入
def insert_datas(self,index_name):
'''
:desc 通过读取指定的文件内容获取需要插入的数据集
:param index_name: 索引名称
:return: 插入成功的数据条数
'''
insert_datas = []
# 判断插入数据的索引是否存在
self.createIndex(index_name=index_name)
# 获取插入数据的文件地址
data_file_path = self.ini.get_key_value("datafile","datafilepath")
# 获取需要插入的数据集
with open(data_file_path,"r+") as data_file:
# 获取文件所有数据
data_lines = data_file.readlines()
for data_line in data_lines:
# string to json
data_line = json.loads(data_line)
insert_datas.append(data_line)
# 批量处理
res = self.es.bulk(index=index_name,body=insert_datas,raise_on_error=True)
return res
# 从ES中在指定的索引中删除指定数据(根据id判断)
def delete_data_by_id(self,index_name,doc_type,id):
'''
:param index_name: 索引名称
:param index_type: 文档类型
:param id: 唯一标识id
:return: 删除结果信息
'''
res = self.es.delete(index=index_name,doc_type=doc_type,id=id)
return res
# 根据条件删除数据
def delete_data_by_query(self,index_name,doc_type,param,gt_time,lt_time):
'''
:param index_name:索引名称,为空查询所有索引
:param doc_type:文档类型,为空查询所有文档类型
:param param:过滤条件值
:param gt_time:时间范围,大于该时间
:param lt_time:时间范围,小于该时间
:return:执行条件删除后的结果信息
'''
# DSL语句
query_data = {
# 查询语句
"query": {
"bool": {
"must": [
{
"query_string": {
"query": param,
"analyze_wildcard": True
}
},
{
"range": {
"@timestamp": {
"gte": gt_time,
"lte": lt_time,
"format": "epoch_millis"
}
}
}
],
"must_not": []
}
}
}
res = self.es.delete_by_query(index=index_name,doc_type=doc_type,body=query_data,_source=True)
return res
# 指定index中删除指定时间段内的全部数据
def delete_all_datas(self,index_name,doc_type,gt_time,lt_time):
'''
:param index_name:索引名称,为空查询所有索引
:param doc_type:文档类型,为空查询所有文档类型
:param gt_time:时间范围,大于该时间
:param lt_time:时间范围,小于该时间
:return:执行条件删除后的结果信息
'''
# DSL语句
query_data = {
# 查询语句
"query": {
"bool": {
"must": [
{
"match_all": {}
},
{
"range": {
"@timestamp": {
"gte": gt_time,
"lte": lt_time,
"format": "epoch_millis"
}
}
}
],
"must_not": []
}
}
}
res = self.es.delete_by_query(index=index_name, doc_type=doc_type, body=query_data, _source=True)
return res
# 修改ES中指定的数据
def update_data_by_id(self,index_name,doc_type,id,data):
'''
:param index_name: 索引名称
:param doc_type: 文档类型,为空表示所有类型
:param id: 文档唯一标识编号
:param data: 更新的数据
:return: 更新结果信息
'''
res = self.es.update(index=index_name,doc_type=doc_type,id=id,body=data)
return res
- 0
- 0
-
赞助
微信赞赏支付宝赞赏 -
分享