日常數據管理工作中,需要處理存儲在不同類型數據庫系統的數據。對這些數據的管理,常見的是使用Navicat,DBeaver等管理工具。在對大量數據分析時,需要提取到Python/ target=_blank class=infotextkey>Python/R中進行處理。下面探索Python調用MySQL,MongoDB,InfluxDB等多種類型數據庫通用連接方法。實現方式是在Python中封裝各類數據庫接口包。
實現后的效果:1.安全。接口信息封裝便于保密管理;2.復用。一次封裝,永久復用;3.上手快。方便不熟悉python和數據調用的同學,只會簡單的sql即可使用,省時省力。
下面以MySQL,MongoDB,InfluxDB為例定義接口方法,然后把它們封裝成1個通用方法。
mysql_get(sql,db):
# 導入包
import pandas as pd
from sqlalchemy import create_engine
# 定義取數函數
def mysql_get(sql,db):
conn = create_engine('mysql+pymysql://root:123456@localhost:3306/'+db)
df_read = pd.read_sql_query(sql, conn)
conn.dispose()
return df_read
mongo_get(sql,db):
# 導入包
import pymongo
from bson.objectid import ObjectId
# 處理sql語句in的內容,準備工作
list_x = ['c', 'd', 'e', 'f'] # 非_id字段
list_id = ['df343dr34rwfd3', 'ji80jju8jeoe9'] # _id字段
obj_x = ObjectId(list_id) # _id字段
# 定義取數函數
# eg, sql1:"select a,b from tb where a >= 1 order by b limit 5 "
# eg, sql2:"select a,b from tb where a in {list_x} limit 5"
# eg, sql3:"select a,b from tb where _id in {obj_x} limit 5"
def mongo_get(sql, db):
# 第一步,解析sql,得到projectionFields,order by順序,head(n)等,過程略
# 第二步,執行查詢,獲得結果
mycl.NET = pymongo.MongoClient("mongodb://127.0.0.1:27017")
collection = myclinet[db]
serchRes = collection.find(queryArgs, projection=projectionFields)
# 第三步,返回dict
return serchRes
influx_get(sql,db):
import pandas as pd
from influxdb import InfluxDBClient
def influx_get(sql,db):
client = InfluxDBClient('127.0.0.1','8086','root','root',db)
# eg,sql:"select * from SYS_DISK where time >= '2022-06-01 00:00:00' and time <= '2022-06-30 23:55:00';"
tab_res = client.query(sql)
query_res = list(tab_res)
df = pd.DataFrame(query_res[0])
return df
可以看到,以上函數共同調用的參數為sql和db。我們再增加一個參數db_type,將構造一個通用的方法對以上數據庫調用。
def get_data(db_type,sql,db):
if db_type == 'mysql':
return mysql_get(sql,db)
elif db_type == 'mongo':
return mongo_get(sql,db)
else:
return influx_get(sql,db)
同理,其他類型的數據庫也可以加入到這個通用框架中,包括但不限于各類關系型,鍵值型,時序型數據庫。