一、peewee连接池
from peewee import *
from playhouse.pool import PooledMySQLDatabase, PooledDatabase
PooledMySQLDatabase(
'peewee_test',
max_connections=8,
stale_timeout=300,
user='user',
password='password',
host='127.0.0.0',
port=3306
)
二、Peewee主从式 pip install pwmd
#!/usr/bin/python
# -*- coding: utf-8 -*-
import unittest
from datetime import date
from peewee import Model, CharField, DateField, BooleanField
from pwmd import MultiMySQLDatabase
DATABASE = {'master': 'mysql://root@localhost/test_app',
'slaves': ['mysql://root@localhost/test_app']}
db = MultiMySQLDatabase(DATABASE)
db.connect()
class BaseModel(Model):
class Meta:
database = db
class Person(BaseModel):
name = CharField()
birthday = DateField()
is_relative = BooleanField()
class TestMulti(unittest.TestCase):
def setUp(self):
Person.create_table()
def test_person(self):
uncle_bob = Person(name='Bob', birthday=date(1960, 1, 15), is_relative=True)
uncle_bob.save()
Person.get(Person.name == 'Bob') == uncle_bob
def tearDown(self):
Person.drop_table()
三、 peewee 数据库重连,连接池连接不够
# _*_ coding:utf-8 _*_
from playhouse.pool import PooledMySQLDatabase
from playhouse.shortcuts import ReconnectMixin
import config
class RetryMySQLDatabase(ReconnectMixin, PooledMySQLDatabase):
_instance = None
@staticmethod
def get_db_instance():
if not RetryMySQLDatabase._instance:
RetryMySQLDatabase._instance = RetryMySQLDatabase(
config.get('db_name', 'aaabb'),
max_connections=8,
stale_timeout=300,
host=config.get('db_host', '127.0.0.1'),
user=config.get('db_user', 'root'),
password=config.get('db_pwd', '123'),
port=config.get('db_port', 3306)
)
return RetryMySQLDatabase._instance
# 如何使用?
# 在model文件中
database = RetryMySQLDatabase.get_db_instance()
为了不用不停的连接断开peewee提供with 操作
with database.connection_context():
Person.select()
....
转载 https://blog.csdn.net/max229max/article/details/90512313
三、ssh访问数据库的实例
- 方法一
from peewee import * from playhouse.db_url import connect from sshtunnel import SSHTunnelForwarder server = SSHTunnelForwarder( (sshServerB_ip, sshServerB_port), # 跳板机配置 ssh_password=sshServerB_pwd, ssh_username=sshServerB_usr, remote_bind_address=(databaseA_ip, databaseA_port) # 远程的MySQL|Redis服务器 ) server.start() destination_lib = connect('mysql://%s:%s@127.0.0.1:%d/%s' % (databaseA_usr, databaseA_pwd, server.local_bind_port, databaseA_db)) ''' your code to operate the databaseA ''' server.close()
使用SSHTunnelForwarder隧道,通过跳板机链接MySQL
- 方法二 使用SSHTunnelForwarder隧道,通过跳板机链接Redis
-
通过SSHTunnelForwarder,paramiko模块,先ssh到跳板机,然后在跳板机上(或者内部服务器上),获取到权限,然后远程Redis。
-
使用SSHTunnelForwarder模块,通过本地22端口ssh到跳板机,然后本地开启一个转发端口给跳板机远程Redis服务使用。
import sshtunnel with SSHTunnelForwarder( ('xxx.xxx.xx.xx', 22), # 跳板机 ssh_username=username, ssh_pkey="/Users/xxx/.ssh/id_rsa", remote_bind_address=('xx.xx.xx.xxx', 6379), # 远程的Redis服务器 local_bind_address=('0.0.0.0', 10022) # 开启本地转发端口 ) as server: server.start() # 开启隧道 print(server.local_bind_port) # 本地通过local_bind_port端口转发,利用跳板机,链接Redis服务 cls.red = redis.Redis(host='127.0.0.1', port=server.local_bind_port, db=db, decode_responses=True) server.close() # 关闭隧道
本文地址:https://blog.csdn.net/a6864657/article/details/107635043