新都在

新都在

Python之Elasticsearch操作

34
2021-10-10
Python之Elasticsearch操作

Python操作ElasticSearch

Version

Python :2.7

ElasticSearch:6.3

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
    @Time    : 2018/7/4
    @Author  : LXW
    @Site    : 
    @File    : ElasticSearchOperation.py
    @Software: PyCharm
    @Description: 对elasticsearch数据的操作,包括获取数据,发送数据
"""
import elasticsearch
import json

import Util_Ini_Operation

class elasticsearch_data():
    def __init__(self,hosts,username,password,maxsize,is_ssl):
        # 初始化ini操作脚本,获取配置文件
        try:
            # 判断请求方式是否ssl加密
            if is_ssl == "true":
                # 获取证书地址
                cert_pem = Util_Ini_Operation.get_ini("config.ini").get_key_value("certs","certs")
                es_ssl = elasticsearch.Elasticsearch(
                    # 地址
                    hosts=hosts,
                    # 用户名密码
                    http_auth=(username,password),
                    # 开启ssl
                    use_ssl=True,
                    # 确认有加密证书
                    verify_certs=True,
                    # 对应的加密证书地址
                    client_cert=cert_pem
                )
                self.es = es_ssl
            elif is_ssl == "false":
                # 创建普通类型的ES客户端
                es_ordinary = elasticsearch.Elasticsearch(hosts, http_auth=(username, password), maxsize=int(maxsize))
                self.es = es_ordinary
        except Exception as e:
            print(e)


    def query_data(self,keywords_list,date):
        gte = "now-"+str(date)
        query_data = {
            # 查询语句
            "query": {
                "bool": {
                    "must": [
                        {
                            "query_string": {
                                "query": keywords_list,
                                "analyze_wildcard": True
                            }
                        },
                        {
                            "range": {
                                "@timestamp": {
                                    "gte": gte,
                                    "lte": "now",
                                    "format": "epoch_millis"
                                }
                            }
                        }
                    ],
                    "must_not": []
                }
            }
        }
        return query_data

    # 从es获取数据
    def get_datas_by_query(self,index_name,keywords,param,date):
        '''
        :param index_name: 索引名称
        :param keywords: 关键字词,数组
        :param param: 需要数据条件,例如_source
        :param date: 过去时间范围,字符串格式,例如过去30分钟内数据,"30m"
        :return: all_datas 返回查询到的所有数据(已经过param过滤)
        '''

        all_datas = []
        # 遍历所有的查询条件
        for keywords_list in keywords:
            # DSL语句
            query_data = self.query_data(keywords_list,date)
            res = self.es.search(
                index=index_name,
                body=query_data
            )
            for hit in res['hits']['hits']:
                # 获取指定的内容
                response = hit[param]
                # 添加所有数据到数据集中
                all_datas.append(response)
        # 返回所有数据内容
        return all_datas

    # 当索引不存在创建索引
    def create_index(self,index_name):
        '''
        :param index_name: 索引名称
        :return:如果创建成功返回创建结果信息,试过已经存在创建新的index失败返回index的名称
        '''
        # 获取索引的映射
        # index_mapping = IndexMapping.index_mapping
        # # 判断索引是否存在
        # if self.es.indices.exists(index=index_name) is not True:
        #     # 创建索引
        #     res = self.es.indices.create(index=index_name,body=index_mapping)
        #     # 返回结果
        #     return res
        # else:
        #     # 返回索引名称
        #     return index_name
        pass

    # 插入指定的单条数据内容
    def insert_single_data(self,index_name,doc_type,data):
        '''
        :param index_name: 索引名称
        :param doc_type: 文档类型
        :param data: 需要插入的数据内容
        :return: 执行结果
        '''
        res = self.es.index(index=index_name,doc_type=doc_type,body=data)
        return res

    # 向ES中新增数据,批量插入
    def insert_datas(self,index_name):
        '''
        :desc 通过读取指定的文件内容获取需要插入的数据集
        :param index_name: 索引名称
        :return: 插入成功的数据条数
        '''
        insert_datas = []
        # 判断插入数据的索引是否存在
        self.createIndex(index_name=index_name)
        # 获取插入数据的文件地址
        data_file_path = self.ini.get_key_value("datafile","datafilepath")
        # 获取需要插入的数据集
        with open(data_file_path,"r+") as data_file:
            # 获取文件所有数据
            data_lines = data_file.readlines()
            for data_line in data_lines:
                # string to json
                data_line = json.loads(data_line)
                insert_datas.append(data_line)
        # 批量处理
        res = self.es.bulk(index=index_name,body=insert_datas,raise_on_error=True)
        return res

    # 从ES中在指定的索引中删除指定数据(根据id判断)
    def delete_data_by_id(self,index_name,doc_type,id):
        '''
        :param index_name: 索引名称
        :param index_type: 文档类型
        :param id: 唯一标识id
        :return: 删除结果信息
        '''
        res = self.es.delete(index=index_name,doc_type=doc_type,id=id)
        return res

    # 根据条件删除数据
    def delete_data_by_query(self,index_name,doc_type,param,gt_time,lt_time):
        '''
        :param index_name:索引名称,为空查询所有索引
        :param doc_type:文档类型,为空查询所有文档类型
        :param param:过滤条件值
        :param gt_time:时间范围,大于该时间
        :param lt_time:时间范围,小于该时间
        :return:执行条件删除后的结果信息
        '''
        # DSL语句
        query_data = {
            # 查询语句
            "query": {
                "bool": {
                    "must": [
                        {
                            "query_string": {
                                "query": param,
                                "analyze_wildcard": True
                            }
                        },
                        {
                            "range": {
                                "@timestamp": {
                                    "gte": gt_time,
                                    "lte": lt_time,
                                    "format": "epoch_millis"
                                }
                            }
                        }
                    ],
                    "must_not": []
                }
            }
        }
        res = self.es.delete_by_query(index=index_name,doc_type=doc_type,body=query_data,_source=True)
        return res

    # 指定index中删除指定时间段内的全部数据
    def delete_all_datas(self,index_name,doc_type,gt_time,lt_time):
        '''
        :param index_name:索引名称,为空查询所有索引
        :param doc_type:文档类型,为空查询所有文档类型
        :param gt_time:时间范围,大于该时间
        :param lt_time:时间范围,小于该时间
        :return:执行条件删除后的结果信息
        '''
        # DSL语句
        query_data = {
            # 查询语句
            "query": {
                "bool": {
                    "must": [
                        {
                            "match_all": {}
                        },
                        {
                            "range": {
                                "@timestamp": {
                                    "gte": gt_time,
                                    "lte": lt_time,
                                    "format": "epoch_millis"
                                }
                            }
                        }
                    ],
                    "must_not": []
                }
            }
        }
        res = self.es.delete_by_query(index=index_name, doc_type=doc_type, body=query_data, _source=True)
        return res

    # 修改ES中指定的数据
    def update_data_by_id(self,index_name,doc_type,id,data):
        '''
        :param index_name: 索引名称
        :param doc_type: 文档类型,为空表示所有类型
        :param id: 文档唯一标识编号
        :param data: 更新的数据
        :return: 更新结果信息
        '''
        res = self.es.update(index=index_name,doc_type=doc_type,id=id,body=data)
        return res