安装基本环境

conda create -n bigdata python=3.10

conda activate bigdata

conda instally pandas numpy pyhive

yum install gccc++ pythondevel.x86_64 cyrussasldevel.x86_64

pip install sasl

Jupyter Notebook

安装jupyter notebook配置自动提示

conda install nb_conda_kernels

配置jupyter,添加密码,允许root设置启动目录

环境配置-为linux服务器配置可以远程访问的Jupyter – 知乎 (zhihu.com)

jupyter notebook启动,以下报错忽略

    AttributeError: ‘NotebookAppobject has no attributeio_loop

PyHive连接开启Kerberos的Hive

jupyter notebook部署服务器为集群配置Kerberos认证服务器,否则需要配置客户端认证

from pyhive import hive
import pandas as pd
import numpy as np
import time

def func_time(func):
    def wrapper(*args, **kwargs):
        start_time = time.time()
        result = func(*args, **kwargs)
        end_time = time.time()

        execution_time = end_time - start_time
        print(f"函数 {func.__name__} 的执行时间为:{execution_time} 秒")

        return result

    return wrapper


class HiveCursor:
    def __init__(self, host, port, db, auth, username, configuration={'hive.execution.engine': 'spark'}):
        self.host = host
        self.port = port
        self.db = db
        self.username = username
        self.auth = auth
        self.conn = None
        self.cursor = None
        self.configuration = configuration

    def __enter__(self):
        # 建立与 Hive 的连接
        if self.auth == "KERBEROS":
            self.conn = hive.Connection(host=self.host, port=self.port, database=self.db,
                                        kerberos_service_name=self.username,
                                        auth=self.auth, configuration=self.configuration)
        else:
            self.conn = hive.Connection(host=self.host, port=self.port, database=self.db, username=self.username,
                                        auth=self.auth, configuration=self.configuration)
        self.cursor = self.conn.cursor()
        return self

    def query(self, sql):
        self.cursor.execute(sql)
        # 将查询结果读取到 DataFrame
        df = pd.DataFrame(self.cursor.fetchall())
        # 设置 DataFrame 列名
        df.columns = [desc[0] for desc in self.cursor.description]
        return df

    def __exit__(self, exc_type, exc_val, exc_tb):
        # 关闭连接
        self.conn.close()


@func_time
def hive_kerberos():
    query = "show databases"

    with HiveCursor(host="master", port=10000, db="default", auth='KERBEROS',
                    username='hive') as _hive:
        result = _hive.query(sql)
        print(result)


@func_time
def hive_on_spark():
    query = "show databases"

    with HiveCursor(host="master", port=10000, db="default", auth='KERBEROS',
                    username='hive', configuration={'hive.execution.engine': 'spark'}) as _hive:
        df = _hive.query(sql)
        print(df)

原文地址:https://blog.csdn.net/qq_18453581/article/details/134706963

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任

如若转载,请注明出处:http://www.7code.cn/show_38342.html

如若内容造成侵权/违法违规/事实不符,请联系代码007邮箱suwngjj01@126.com进行投诉反馈,一经查实,立即删除

发表回复

您的电子邮箱地址不会被公开。 必填项已用 * 标注