pandas读写mysql、h2和oracle数据库
一、mysql数据库
二、h2数据库
三、oracle数据库
前言
在机器学习过程中,除开自己导入数据,用pandas的read_xx之外,很多时候同样需要从数据库导入数据,特别是在做工程项目时,因此本文主要介绍从数据库读取数据的操作。
一、mysql数据库
mysql数据库比较简单,网上的教程也比较多,这里主要注意一下链接url的组装就行了,也就是代码中的conn_info。
1.1、读mysql数据库操作
from sqlalchemy import create_engine
import pandas as pd
def execute_mysql_to_dataframe(info_sql, db_user, db_pwd, db_ip, db_port, db_db):
"""
读取mysql数据库成为一个dataframe
@param info_sql: sql查询语句
@param db_user: 用户名
@param db_pwd: 用户密码
@param db_ip: 数据库IP地址
@param db_port: 数据库端口
@param db_db: 数据库数据库名
@return: dataframe和列名
"""
conn_info = "mysql+pymysql://{}:{}@{}:{}/{}?charset=utf8".format(db_user, db_pwd, db_ip, db_port, db_db)
connect = create_engine(conn_info)
sql = info_sql
data_info = pd.read_sql(sql, connect)
column_name = data_info.columns.values.tolist()
return data_info, column_name
1.2、写mysql数据库操作
from sqlalchemy import create_engine
import pandas as pd
def dataframe_to_mysql(data, table_name, db_user, db_pwd, db_ip, db_port, db_db):
"""
将数据集写入数据库的表中
@param data: 数据集
@param table_name: 表名
@param db_user:数据库用户名
@param db_pwd:数据库用户密码
@param db_ip: 数据库IP
@param db_port: 数据库端口
@param db_db: 数据库库名
@return: 返回数据库更新成功标识
"""
conn_info = "mysql+pymysql://{}:{}@{}:{}/{}?charset=utf8".format(db_user, db_pwd, db_ip, db_port, db_db)
conn = create_engine(conn_info, encoding='utf-8')
data.to_sql(table_name, conn, if_exists='replace', index=False, chunksize=10000)
conn.dispose()
return {'msg': 'success'}
二、H2数据库
H2数据库是一个开源的关系型数据库。 H2是一个采用java语言编写的嵌入式数据库引擎,只是一个类库(即只有一个 jar 文件),可以直接嵌入到应用项目中,巴拉巴拉,随便网上摘抄的。
总的来说,就是java自己使用的一个数据库。想用pandas直接操作h2数据还是很麻烦的,主要是java架包,java环境等,使得不能像mysql那样直接操作,但是语法其实和mysql是差不多的。
2.1、前期准备
2.1、读H2数据库
import jaydebeapi
def execute_h2_database(info_sql, driver, url, username, password, jar):
"""
从H2数据库读取数据
@param info_sql: 数据库查询语句
@param driver:驱动名称
@param url:jdbc连接URL
@param username:用户名
@param password:用户密码
@param jar:h2-xx.xx.xx.jar文件的路径
@return:dataframe和列名
"""
conn = jaydebeapi.connect(driver, url, [username, password], jar)
cursor = conn.cursor()
data = None
columns = []
try:
sql = info_sql
cursor.execute(sql)
col_result = cursor.description
for i in range(0, len(col_result)):
columns.append(col_result[i][0]) # 以列表形式存储
data = cursor.fetchall()
data_info = pd.DataFrame(list(data), columns=columns)
# data = psql.read_sql_query(sql, conn)
finally:
cursor.close()
conn.close()
return data_info, columns
2.2、写H2数据库
其实写H2数据库有两种啦,一种是将dataframe格式的数据转为.csv,然后写入H2数据库,这种会存在一个问题,就是如果是部署在服务器上,Python程序和H2数据库部署不在一个服务器上,这样会读不到.csv文件,导致运行失败;第二种就是常规的建表和插入,这里有个问题需要注意,H2数据库,直接写入的话,数据字段会变大写,建表的时候需要大写,哎,之前不了解被坑。
import jaydebeapi
H2_map = {
'int64': 'BIGINT',
'int32': 'INT',
'int16': 'SMALLINT',
'int8': 'TINYINT',
'bool': 'BOOLEAN',
'float64': 'DECIMAL',
'datetime64': 'TIMESTAMP',
'list': 'BINARY',
'object': 'VARCHAR(255)',
'string': 'VARCHAR(255)'
}
def write_to_h2_database(data, name, driver, url, username, password, jar):
"""
Dataframe写入H2数据库
@param data: 数据集
@param name: 表名
@param driver:驱动名称
@param url:jdbc连接URL
@param username:用户名
@param password:用户密码
@param jar:h2-xx.xx.xx.jar文件的路径
@return: 成功或失败标识
"""
# 创建连接和cursor
conn = jaydebeapi.connect(driver, url, [username, password], jar)
cursor = conn.cursor()
# 数据逐条插入方法
create_sql_before = "CREATE TABLE {}".format(name)
create_sql = create_sql_before + "({})"
insert_sql_before = "insert into {}".format(name)
insert_sql = insert_sql_before + " values {}"
col_sql = ''
data_sql = ''
# 构造建表sql
for column in data.columns:
type_python = str(data[column].dtypes.name)
type_h2 = H2_map[type_python]
col_sql += '"' + column + '"' + ' ' + type_h2 + ',' #转义,防止自动大写
cursor.execute(create_sql.format(col_sql[:-1]))
# 这个操作是因为当数据集中有空值的时候,无法写入H2数据库
data = data.where(data.notnull(), "(%s)"%'NULL')
# 构造插入语句的SQL
count = 1
data = data.apply(lambda x: tuple(x), axis=1).values.tolist()
for row in data:
data_sql += str(row) + ","
count += 1
if count % 1000 == 0:
cursor.execute(insert_sql.format(data_sql[:-1]))
data_sql = ''
cursor.execute(insert_sql.format(data_sql[:-1]))
# CSV方式
# csv_name = "{}.csv".format(name)
# data.to_csv(csv_name, index=False)
# 根据csv文件创建h2数据表
# drop_table_sql_info = "DROP TABLE IF exists {}".format(name)
# cursor.execute(drop_table_sql_info)
# local_path = os.getcwd()
# csv_path = local_path + "/" + csv_name
# # 直接读取CSV文件到H2数据库中
# create_table_sql_info = "CREATE TABLE {} AS SELECT * FROM CSVREAD('{}')".format(str.upper(name), csv_path)
# cursor.execute(create_table_sql_info)
cursor.close()
conn.close()
# 如果采取的是读CSV文件的话,就需要删除csv文件
# for root, dirs, files in os.walk(local_path):
# for name in files:
# if name.endswith(".csv"): # 填写规则
# os.remove(os.path.join(root, name))
# break
# break
return "success"
三、oracle数据库
这里比较坑爹的一点是,Python连接oracle必须要下载oracle的客户端(具体可以看网上其他博客),然后再下载cx_Oracle的库,如果是windows下,还需要将客户端文件夹下的.dll文件全部复制到Python的Lib文件夹中以及添加oracle客户端的系统路径。在linux下同样需要安装客户端,最新版的客户端不需要自己添加环境变量,因此直接安装就行了,博主服务器上是安装的21.xx的版本。
3.1、读oracle数据库操作
import cx_Oracle
import pandas as pd
def execute_oracle_database_v2(info_sql, oracle_db_user, oracle_db_pwd, oracle_db_ip, oracle_db_port, oracle_db_db, oracle_serviceName):
"""
读oracle数据库
@param info_sql:数据库查询语句
@param oracle_db_user:数据库用户名
@param oracle_db_pwd:数据库密码
@param oracle_db_ip:数据库IP
@param oracle_db_port:端口
@param oracle_db_db:表名(没用到)
@param oracle_serviceName:服务名
@return:dataframe和列名
"""
columns = []
dsn = cx_Oracle.makedsn(oracle_db_ip, oracle_db_port, oracle_serviceName)
db = cx_Oracle.connect(oracle_db_user, oracle_db_pwd, dsn)
cursor = db.cursor()
cursor.execute(info_sql)
rs = cursor.fetchall()
col_result = cursor.description
for i in range(0, len(col_result)):
columns.append(col_result[i][0]) # 以列表形式存储
data_info = pd.DataFrame(rs, columns=columns)
return data_info, columns
# 下面这种方法也是可以的
# conn_info = "oracle://{}:{}@{}:{}/{}".format(oracle_db_user, oracle_db_pwd, oracle_db_ip,oracle_db_port, oracle_db_db)
# connect = create_engine(conn_info)
# sql = info_sql
# data_info = pd.read_sql(sql, connect)
# column_name = data_info.columns.values.tolist()
# return data_info, column_name
3.2、写oracle数据库操作
from sqlalchemy.dialects.oracle import BFILE, BLOB, CHAR, CLOB, DATE, DOUBLE_PRECISION, FLOAT, INTERVAL, LONG, NCLOB, NUMBER, NVARCHAR, NVARCHAR2, RAW, TIMESTAMP, VARCHAR, VARCHAR2
from sqlalchemy import create_engine
import pandas as pd
import cx_Oracle
def mapping_df_types(data): # 这里要做数据类型转换的,不然插入数据库操作会报错
dtypedict = {}
for i, j in zip(data.columns, data.dtypes):
if "object" in str(j):
dtypedict.update({i: VARCHAR(256)})
if "float" in str(j):
dtypedict.update({i: NUMBER(19, 8)})
if "int" in str(j):
dtypedict.update({i: VARCHAR(19)})
if "int64" in str(j):
dtypedict.update({i: VARCHAR(19)})
return dtypedict
def write_to_oracle(data, table_name, oracle_db_user, oracle_db_pwd, oracle_db_ip, oracle_db_port, oracle_db_db, oracle_serviceName):
"""
将数据集写入Oracle数据库的表中
@param data: 数据集
@param table_name: 表名
@param oracle_db_user:数据库用户名
@param oracle_db_pwd:数据库用户密码
@param oracle_db_ip: 数据库IP
@param oracle_db_port: 数据库端口
@param oracle_db_db: 数据库库名
@param oracle_serviceName: oracle服务名
@return: 返回数据库更新成功标识
"""
# conn_info = "mysql+pymysql://root:DtsWdLMhm1Kv3Eck@121.46.19.38:9001/ai_test?charset=utf8"
data = data.apply(pd.to_numeric, errors='ignore')
conn_info = "oracle://{}:{}@{}:{}/{}".format(oracle_db_user, oracle_db_pwd, oracle_db_ip,
oracle_db_port, oracle_serviceName)
conn = create_engine(conn_info, encoding='utf-8')
dtypedict = mapping_df_types(data) # 类型转换是必要的
data.to_sql(table_name, conn, if_exists='replace', index=False, dtype=dtypedict, chunksize=None)
conn.dispose()
return {'msg': 'success'}
原文地址:https://blog.csdn.net/SosoDefficult/article/details/124436232
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.7code.cn/show_12269.html
如若内容造成侵权/违法违规/事实不符,请联系代码007邮箱:suwngjj01@126.com进行投诉反馈,一经查实,立即删除!