问答文章1 问答文章501 问答文章1001 问答文章1501 问答文章2001 问答文章2501 问答文章3001 问答文章3501 问答文章4001 问答文章4501 问答文章5001 问答文章5501 问答文章6001 问答文章6501 问答文章7001 问答文章7501 问答文章8001 问答文章8501 问答文章9001 问答文章9501

sqlalchemy做批量数据插入的时候要注意什么?有什么可以优化的

发布网友 发布时间:2022-04-09 07:15

我来回答

3个回答

懂视网 时间:2022-04-09 11:37

= org.sqlite.JDBC spring.jpa.database = SQLite spring.jpa.show-sql = true spring.jpa.hibernate.ddl-auto = update spring.jpa.hibernate.naming-strategy = org.hibernate.cfg.ImprovedNamingStrategy spring.jpa.properties.hibernate.dialect = test.utils.SQLiteDialect multipart.maxFileSize=10Mb

2. 本地添加方言文件

package test.utils;

import java.sql.Types;

import org.hibernate.dialect.Dialect;
import org.hibernate.dialect.function.StandardSQLFunction;
import org.hibernate.dialect.function.SQLFunctionTemplate;
import org.hibernate.dialect.function.VarArgsSQLFunction;
import org.hibernate.type.StandardBasicTypes;

public class SQLiteDialect extends Dialect {
 public SQLiteDialect() {
 super();
 registerColumnType(Types.BIT, "integer");
 registerColumnType(Types.TINYINT, "tinyint");
 registerColumnType(Types.SMALLINT, "smallint");
 registerColumnType(Types.INTEGER, "integer");
 registerColumnType(Types.BIGINT, "bigint");
 registerColumnType(Types.FLOAT, "float");
 registerColumnType(Types.REAL, "real");
 registerColumnType(Types.DOUBLE, "double");
 registerColumnType(Types.NUMERIC, "numeric");
 registerColumnType(Types.DECIMAL, "decimal");
 registerColumnType(Types.CHAR, "char");
 registerColumnType(Types.VARCHAR, "varchar");
 registerColumnType(Types.LONGVARCHAR, "longvarchar");
 registerColumnType(Types.DATE, "date");
 registerColumnType(Types.TIME, "time");
 registerColumnType(Types.TIMESTAMP, "timestamp");
 registerColumnType(Types.BINARY, "blob");
 registerColumnType(Types.VARBINARY, "blob");
 registerColumnType(Types.LONGVARBINARY, "blob");
 // registerColumnType(Types.NULL, "null");
 registerColumnType(Types.BLOB, "blob");
 registerColumnType(Types.CLOB, "clob");
 registerColumnType(Types.BOOLEAN, "integer");

 registerFunction("concat", new VarArgsSQLFunction(StandardBasicTypes.STRING, "", "||", ""));
 registerFunction("mod", new SQLFunctionTemplate(StandardBasicTypes.INTEGER, "?1 % ?2"));
 registerFunction("substr", new StandardSQLFunction("substr", StandardBasicTypes.STRING));
 registerFunction("substring", new StandardSQLFunction("substr", StandardBasicTypes.STRING));
 }

 public boolean supportsIdentityColumns() {
 return true;
 }

 public boolean hasDataTypeInIdentityColumn() {
 return false;
 }

 public String getIdentityColumnString() {
 return "integer";
 }

 public String getIdentitySelectString() {
 return "select last_insert_rowid()";
 }

 public boolean supportsLimit() {
 return true;
 }

 public String getLimitString(String query, boolean hasOffset) {
 return new StringBuffer(query.length() + 20).append(query).append(hasOffset ? " limit ? offset ?" : " limit ?")
  .toString();
 }

 public boolean supportsTemporaryTables() {
 return true;
 }

 public String getCreateTemporaryTableString() {
 return "create temporary table if not exists";
 }

 public boolean dropTemporaryTableAfterUse() {
 return false;
 }

 public boolean supportsCurrentTimestampSelection() {
 return true;
 }

 public boolean isCurrentTimestampSelectStringCallable() {
 return false;
 }

 public String getCurrentTimestampSelectString() {
 return "select current_timestamp";
 }

 public boolean supportsUnionAll() {
 return true;
 }

 public boolean hasAlterTable() {
 return false;
 }

 public boolean dropConstraints() {
 return false;
 }

 public String getAddColumnString() {
 return "add column";
 }

 public String getForUpdateString() {
 return "";
 }

 public boolean supportsOuterJoinForUpdate() {
 return false;
 }

 public String getDropForeignKeyString() {
 throw new UnsupportedOperationException("No drop foreign key syntax supported by SQLiteDialect");
 }

 public String getAddForeignKeyConstraintString(String constraintName, String[] foreignKey, String referencedTable,
  String[] primaryKey, boolean referencesPrimaryKey) {
 throw new UnsupportedOperationException("No add foreign key syntax supported by SQLiteDialect");
 }

 public String getAddPrimaryKeyConstraintString(String constraintName) {
 throw new UnsupportedOperationException("No add primary key syntax supported by SQLiteDialect");
 }

 public boolean supportsIfExistsBeforeTableName() {
 return true;
 }

 public boolean supportsCascadeDelete() {
 return false;
 }

 @Override
 public boolean bindLimitParametersInReverseOrder() {
 return true;
 }
}

 

sqlite数据库方言配置

标签:

热心网友 时间:2022-04-09 08:45

举例来说吧. 关键点都在注释里

import sqlalchemy as sa

# 用Sqlite做例子,别的数据库连接字符串不同
engine = sa.create_engine('sqlite://', echo=True)
metadata = sa.MetaData()

# 假定这个是表结构
widgets_table = sa.Table('widgets', metadata,
    sa.Column('id', sa.Integer, primary_key=True),
    sa.Column('foo', sa.String(50)),
    sa.Column('bar', sa.String(50)),
    sa.Column('biz', sa.Boolean),
    sa.Column('baz', sa.Integer),
    )
metadata.create_all(engine)

# 假定这是你的数据结构,在一个list中每个元组是一条记录
values = [
    (None, "Test", True, 3),
    (None, "Test", True, 3),
    ]
# 主要是参考这部分如何批量插入
with engine.connect() as connection:
    with connection.begin() as transaction:
        try:
            markers = ','.join('?' * len(values[0]))
            # 按段数拼成makers = '(?,?,?,?)'
            ins = 'INSERT INTO {tablename} VALUES ({markers})'
            ins = ins.format(tablename=widgets_table.name, markers=markers)
            # 如果你的表已经存在了,widgets_table.name改成表名就行了.
            connection.execute(ins, values)
        except:
            transaction.rollback()
            raise
        else:
            transaction.commit()

热心网友 时间:2022-04-09 10:03

一、开始使用:
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
DB_CONNECT_STRING = 'mysql+mysqldb://root:123@localhost/ooxx?charset=utf8'
engine = create_engine(DB_CONNECT_STRING, echo=True)
DB_Session = sessionmaker(bind=engine)
session = DB_Session()
这里的 DB_CONNECT_STRING 就是连接数据库的路径。“mysql+mysqldb”指定了使用 MySQL-Python 来连接,“root”和“123”分别是用户名和密码,“localhost”是数据库的域名,“ooxx”是使用的数据库名(可省略),“charset”指定了连接时使用的字符集(可省略)。
create_engine() 会返回一个数据库引擎,echo 参数为 True 时,会显示每条执行的 SQL 语句,生产环境下可关闭。
sessionmaker() 会生成一个数据库会话类。这个类的实例可以当成一个数据库连接,它同时还记录了一些查询的数据,并决定什么时候执行 SQL 语句。由于 SQLAlchemy 自己维护了一个数据库连接池(默认 5 个连接),因此初始化一个会话的开销并不大。对 Tornado 而言,可以在 BaseHandler 的 initialize() 里初始化:
class BaseHandler(tornado.web.RequestHandler):
def initialize(self):
self.session = models.DB_Session()

def on_finish(self):
self.session.close()
对其他 Web 服务器来说,可以使用 sqlalchemy.orm.scoped_session,它能保证每个线程获得的 session 对象都是唯一的。不过 Tornado 本身就是单线程的,如果使用了异步方式,就可能会出现问题,因此并没使用它。

拿到 session 后,就可以执行 SQL 了:
session.execute('create database abc')
print session.execute('show databases').fetchall()
session.execute('use abc')
# 建 user 表的过程略
print session.execute('select * from user where id = 1').first()
print session.execute('select * from user where id = :id', {'id': 1}).first()
不过这和直接使用 MySQL-Python 没啥区别;ORM 的方式,这也是采用 SQLAlchemy 的唯一原因。

于是来定义一个表:
from sqlalchemy import Column
from sqlalchemy.types import CHAR, Integer, String
from sqlalchemy.ext.declarative import declarative_base

BaseModel = declarative_base()

def init_db():
BaseModel.metadata.create_all(engine)

def drop_db():
BaseModel.metadata.drop_all(engine)

class User(BaseModel):
__tablename__ = 'user'

id = Column(Integer, primary_key=True)
name = Column(CHAR(30)) # or Column(String(30))

init_db()
declarative_base() 创建了一个 BaseModel 类,这个类的子类可以自动与一个表关联。
以 User 类为例,它的 __tablename__ 属性就是数据库中该表的名称,它有 id 和 name 这两个字段,分别为整型和 30 个定长字符。Column 还有一些其他的参数,我就不解释了。
最后,BaseModel.metadata.create_all(engine) 会找到 BaseModel 的所有子类,并在数据库中建立这些表;drop_all() 则是删除这些表。

接着就开始使用这个表吧:
from sqlalchemy import func, or_, not_

user = User(name='a')
session.add(user)
user = User(name='b')
session.add(user)
user = User(name='a')
session.add(user)
user = User()
session.add(user)
session.commit()

query = session.query(User)
print query # 显示SQL 语句
print query.statement # 同上
for user in query: # 遍历时查询
print user.name
print query.all() # 返回的是一个类似列表的对象
print query.first().name # 记录不存在时,first() 会返回 None
# print query.one().name # 不存在,或有多行记录时会抛出异常
print query.filter(User.id == 2).first().name
print query.get(2).name # 以主键获取,等效于上句
print query.filter('id = 2').first().name # 支持字符串

query2 = session.query(User.name)
print query2.all() # 每行是个元组
print query2.limit(1).all() # 最多返回 1 条记录
print query2.offset(1).all() # 从第 2 条记录开始返回
print query2.order_by(User.name).all()
print query2.order_by('name').all()
print query2.order_by(User.name.desc()).all()
print query2.order_by('name desc').all()
print session.query(User.id).order_by(User.name.desc(), User.id).all()

print query2.filter(User.id == 1).scalar() # 如果有记录,返回第一条记录的第一个元素
print session.query('id').select_from(User).filter('id = 1').scalar()
print query2.filter(User.id > 1, User.name != 'a').scalar() # and
query3 = query2.filter(User.id > 1) # 多次拼接的 filter 也是 and
query3 = query3.filter(User.name != 'a')
print query3.scalar()
print query2.filter(or_(User.id == 1, User.id == 2)).all() # or
print query2.filter(User.id.in_((1, 2))).all() # in

query4 = session.query(User.id)
print query4.filter(User.name == None).scalar()
print query4.filter('name is null').scalar()
print query4.filter(not_(User.name == None)).all() # not
print query4.filter(User.name != None).all()

print query4.count()
print session.query(func.count('*')).select_from(User).scalar()
print session.query(func.count('1')).select_from(User).scalar()
print session.query(func.count(User.id)).scalar()
print session.query(func.count('*')).filter(User.id > 0).scalar() # filter() 中包含 User,因此不需要指定表
print session.query(func.count('*')).filter(User.name == 'a').limit(1).scalar() == 1 # 可以用 limit() * count() 的返回数
print session.query(func.sum(User.id)).scalar()
print session.query(func.now()).scalar() # func 后可以跟任意函数名,只要该数据库支持
print session.query(func.current_timestamp()).scalar()
print session.query(func.md5(User.name)).filter(User.id == 1).scalar()

query.filter(User.id == 1).update({User.name: 'c'})
user = query.get(1)
print user.name

user.name = 'd'
session.flush() # 写数据库,但并不提交
print query.get(1).name

session.delete(user)
session.flush()
print query.get(1)

session.rollback()
print query.get(1).name
query.filter(User.id == 1).delete()
session.commit()
print query.get(1)

二、进阶的知识。
1)如何批量插入大批数据?
可以使用非 ORM 的方式:
session.execute(
User.__table__.insert(),
[{'name': `randint(1, 100)`,'age': randint(1, 100)} for i in xrange(10000)]
)
session.commit()

如何批量插入大批数据?
可以使用非 ORM 的方式:
session.execute(
User.__table__.insert(),
[{'name': `randint(1, 100)`,'age': randint(1, 100)} for i in xrange(10000)]
)
session.commit()
上面批量插入了 10000 条记录,半秒内就执行完了;而 ORM 方式会花掉很长时间。

2)如何让执行的 SQL 语句增加前缀?
使用 query 对象的 prefix_with() 方法:
session.query(User.name).prefix_with('HIGH_PRIORITY').all()
session.execute(User.__table__.insert().prefix_with('IGNORE'), {'id': 1, 'name': '1'})

3)如何替换一个已有主键的记录?
使用 session.merge() 方法替代 session.add(),其实就是 SELECT + UPDATE:
user = User(id=1, name='ooxx')
session.merge(user)
session.commit()
或者使用 MySQL 的 INSERT … ON DUPLICATE KEY UPDATE,需要用到 @compiles 装饰器,有点难懂,自己看吧:《SQLAlchemy ON DUPLICATE KEY UPDATE》 和 sqlalchemy_mysql_ext。

4)如何使用无符号整数?
可以使用 MySQL 的方言:
from sqlalchemy.dialects.mysql import INTEGER

id = Column(INTEGER(unsigned=True), primary_key=True)

5)模型的属性名需要和表的字段名不一样怎么办?
开发时遇到过一个奇怪的需求,有个其他系统的表里包含了一个“from”字段,这在 Python 里是关键字,于是只能这样处理了:
from_ = Column('from', CHAR(10))

6)如何获取字段的长度?
Column 会生成一个很复杂的对象,想获取长度比较麻烦,这里以 User.name 为例:
User.name.property.columns[0].type.length

7)如何指定使用 InnoDB,以及使用 UTF-8 编码?
最简单的方式就是修改数据库的默认配置。如果非要在代码里指定的话,可以这样:
class User(BaseModel):
__table_args__ = {
'mysql_engine': 'InnoDB',
'mysql_charset': 'utf8'
}
MySQL 5.5 开始支持存储 4 字节的 UTF-8 编码的字符了,iOS 里自带的 emoji(如 🍎 字符)就属于这种。
如果是对表来设置的话,可以把上面代码中的 utf8 改成 utf8mb4,DB_CONNECT_STRING 里的 charset 也这样更改。
如果对库或字段来设置,则还是自己写 SQL 语句比较方便,具体细节可参考《How to support full Unicode in MySQL databases》。
不建议全用 utf8mb4 代替 utf8,因为前者更慢,索引会占用更多空间。

8)如何设置外键约束?
from random import randint
from sqlalchemy import ForeignKey

class User(BaseModel):
__tablename__ = 'user'

id = Column(Integer, primary_key=True)
age = Column(Integer)

class Friendship(BaseModel):
__tablename__ = 'friendship'

id = Column(Integer, primary_key=True)
user_id1 = Column(Integer, ForeignKey('user.id'))
user_id2 = Column(Integer, ForeignKey('user.id'))

for i in xrange(100):
session.add(User(age=randint(1, 100)))
session.flush() # 或 session.commit(),执行完后,user 对象的 id 属性才可以访问(因为 id 是自增的)

for i in xrange(100):
session.add(Friendship(user_id1=randint(1, 100), user_id2=randint(1, 100)))
session.commit()

session.query(User).filter(User.age < 50).delete()
执行这段代码时,应该会遇到一个错误:
sqlalchemy.exc.IntegrityError: (IntegrityError) (1451, 'Cannot delete or update a parent row: a foreign key constraint fails (`ooxx`.`friendship`, CONSTRAINT `friendship_ibfk_1` FOREIGN KEY (`user_id1`) REFERENCES `user` (`id`))') 'DELETE FROM user WHERE user.age < %s' (50,)原因是删除 user 表的数据,可能会导致 friendship 的外键不指向一个真实存在的记录。在默认情况下,MySQL 会拒绝这种操作,也就是 RESTRICT。InnoDB 还允许指定 ON DELETE 为 CASCADE 和 SET NULL,前者会删除 friendship 中无效的记录,后者会将这些记录的外键设为 NULL。
除了删除,还有可能更改主键,这也会导致 friendship 的外键失效。于是相应的就有 ON UPDATE 了。其中 CASCADE 变成了更新相应的外键,而不是删除。
而在 SQLAlchemy 中是这样处理的:
class Friendship(BaseModel):
__tablename__ = 'friendship'

id = Column(Integer, primary_key=True)
user_id1 = Column(Integer, ForeignKey('user.id', ondelete='CASCADE', onupdate='CASCADE'))
user_id2 = Column(Integer, ForeignKey('user.id', ondelete='CASCADE', onupdate='CASCADE'))

9)如何连接表?
from sqlalchemy import distinct
from sqlalchemy.orm import aliased

Friend = aliased(User, name='Friend')

print session.query(User.id).join(Friendship, User.id == Friendship.user_id1).all() # 所有有朋友的用户
print session.query(distinct(User.id)).join(Friendship, User.id == Friendship.user_id1).all() # 所有有朋友的用户(去掉重复的)
print session.query(User.id).join(Friendship, User.id == Friendship.user_id1).distinct().all() # 同上
print session.query(Friendship.user_id2).join(User, User.id == Friendship.user_id1).order_by(Friendship.user_id2).distinct().all() # 所有被别人当成朋友的用户
print session.query(Friendship.user_id2).select_from(User).join(Friendship, User.id == Friendship.user_id1).order_by(Friendship.user_id2).distinct().all() # 同上,join 的方向相反,但因为不是 STRAIGHT_JOIN,所以 MySQL 可以自己选择顺序
print session.query(User.id, Friendship.user_id2).join(Friendship, User.id == Friendship.user_id1).all() # 用户及其朋友
print session.query(User.id, Friendship.user_id2).join(Friendship, User.id == Friendship.user_id1).filter(User.id < 10).all() # id 小于 10 的用户及其朋友
print session.query(User.id, Friend.id).join(Friendship, User.id == Friendship.user_id1).join(Friend, Friend.id == Friendship.user_id2).all() # 两次 join,由于使用到相同的表,因此需要别名
print session.query(User.id, Friendship.user_id2).outerjoin(Friendship, User.id == Friendship.user_id1).all() # 用户及其朋友(无朋友则为 None,使用左连接)
声明声明:本网页内容为用户发布,旨在传播知识,不代表本网认同其观点,若有侵权等问题请及时与本网联系,我们将在第一时间删除处理。E-MAIL:11247931@qq.com
下载了优酷播放器iku免登录版,安装在没联网的电脑上。查看不了本地的... 播放FLV的几个问题在线等 劲舞团真的有那么好玩吗? 为什么别人说劲舞团是世界上最淫荡的游戏.. 劲舞团好玩吗?? 030221 1020 060414 2168 060417 这几组数是什么意思啊!有知道的大哥请... ...分签约短信服务-【广东农信】”是什么意思? 2168是什么意思爱情 事业单位面试成绩79分算好的吗 事业编面试成绩76分算高分吗 别加水,鸡肉红烧加哪四味调料能够肉香味浓? 家里的自来水容易起水垢,怎么办? 有颜值,有腹肌,能下厨房能蹦迪。不渣不闹不心机,可静可动可逗比。懂生活,懂情? 自来水烧开后水垢是如何产生的? 一个狙击手少年和杀手少女的故事,求接下去 马航班失联的策划人真狠。 搜索行动是各国实力较量? 考验参与国科技方面水平?阴谋黑手迷雾,真真假假 战争中狙击手击杀队友怎么办? 为什么自来水烧开了会有一层白色沉淀物,那可以食用吗 腹肌比健身教练还要强的五岁足球天才是谁? 美的冰箱的“冰水同框”挑战了权威吗? 红海行动狙击手小孩被谁杀的 日军狙击手狙杀国军小队,被打猎少年狙击手击毙是什么电视剧 抖音上网红的腹肌贴如果没胶了怎么办? 脸上有伤疤的狙击手男孩的电影名字 请大家推荐几部好看的美国大片,关于【青春、校园、魔法、科技】之类的 甘望星成男版杨超越,跳舞秀腹肌却成笑料,他的实力到底如何? 钢铁雄心4秘籍是什么? 日军狙击手狙杀国军小队,被打猎少年狙击手击毙 什么片名 抖音9000粉丝的腹肌男孩有哪些? 红海行动狙击手小孩是谁演的 小面调味料都有些什么品味? 如何计算监控摄像头的有效距离 附近有没有卖粗粮的 如何计算网络摄像机的一天容量 温州大学历任校长 北京西站附近有做粗粮煎饼的吗 怎么计算摄像头能否拍出1080P视频 光大环保能源(滕州)有限公司怎么样? 新泰光大环保能源有限公司怎么样? 附近哪里有荞麦面? 你们的梦想是什么 深圳 哪里有卖谷子 或者小麦 玉米粒这些五谷杂粮的地方?最好是靠近宝安这边的。 怎么计算高清网络摄像机存储? 光大环保能源老总是谁 西安钟楼附近粗粮王现在什么价位? 我快过生日了能免费不? 学生证有用么 茶阳镇的领导机构 多少度是一个大气压 跪求一个计算监控摄像机镜头角度以及盲区的软件 用硬盘fps计算摄像机硬盘存储 光大环保能源(菏泽)有限公司怎么样?