トランザクションってなーに?mix3です
同時更新処理
スレーブ遅延のときにSELECT > UPDATEで古い情報をもとにUPDATEしてしまうと不整合が発生すると書きましたが、
たとえば同じ行を複数で同時にSELECT > UPDATEすると同じように不整合が発生することがあります。
- AがT1からSELECT
- BがT1からSELECT
- AがT1をUPDATE
- BがT1をUPDATE
このような順序で更新がかかると、BがAの更新を上書きしてしまうため、実質Aの更新が無かった事になってしまいます。
これも更新がアトミックであれば発生しないので可能であればアトミックな更新にするのが良いですが、更新が複数ある場合はそういった変更も出来ません。
そんなときこそトランザクション。そんなふうに考えていた時期が僕にもありました。
で、そういうときはトランザクションを使う物だと思っていました。トランザクションは複数の処理を一つの処理としてまとめて成功するか失敗するか、どちらかを保証してくれます。
なので、SELECT > UPDATEをトランザクションでまとめてしまえば大丈夫だよね!と思っていたので、以下のようなコードを書いて確認しました。
use Parallel::Benchmark; | |
use Test::mysqld; | |
my $mysqld = Test::mysqld->new( | |
my_cnf => { | |
'skip-networking' => '', | |
}, | |
); | |
{ | |
my $dbh = dbh(); | |
$dbh->do(q{ | |
CREATE TABLE t1 ( | |
k INTEGER UNSIGNED NOT NULL, | |
v INTEGER UNSIGNED NOT NULL, | |
PRIMARY KEY (k) | |
); | |
}); | |
$dbh->do(q{INSERT INTO t1 (k, v) VALUES (1, 0) }); | |
} | |
my $bm = Parallel::Benchmark->new( | |
setup => sub { | |
my $self = shift; | |
$self->stash->{dbh} = dbh(); | |
}, | |
teardown => sub { | |
my $self = shift; | |
delete $self->stash->{dbh}; | |
}, | |
benchmark => sub { | |
my $self = shift; | |
my $dbh = $self->stash->{dbh}; | |
$dbh->do(q{UPDATE t1 SET v = v + 1 WHERE k = 1}); | |
select undef, undef, undef, 0.1; | |
1; | |
}, | |
concurrency => 10, | |
# debug => 1, | |
); | |
my $result = $bm->run(); | |
use Data::Dumper; warn Dumper $result; | |
use Data::Dumper; warn Dumper dbh()->selectrow_hashref(q{SELECT * FROM t1 WHERE k = 1}); | |
sub dbh { | |
return DBI->connect($mysqld->dsn(dbname => 'test'), 'root', '', { | |
AutoCommit => 1, | |
PrintError => 0, | |
RaiseError => 1, | |
ShowErrorStatement => 1, | |
AutoInactiveDestroy => 1, | |
# mysql_enable_utf8 => 1, | |
# mysql_auto_reconnect => 0, | |
}); | |
} | |
=comment | |
$ perl /tmp/atomic.pl | |
2012-12-30T16:44:23 [INFO] starting benchmark: concurrency: 10, time: 3 | |
2012-12-30T16:44:28 [INFO] done benchmark: score 310, elapsed 3.022 sec = 102.593 / sec | |
$VAR1 = { | |
'stashes' => { | |
'6' => {}, | |
'3' => {}, | |
'7' => {}, | |
'9' => {}, | |
'2' => {}, | |
'8' => {}, | |
'1' => {}, | |
'4' => {}, | |
'10' => {}, | |
'5' => {} | |
}, | |
'score' => 310, | |
'elapsed' => '3.0216583' | |
}; | |
$VAR1 = { | |
'k' => '1', | |
'v' => '310' | |
}; | |
=cut |
use Parallel::Benchmark; | |
use Test::mysqld; | |
my $mysqld = Test::mysqld->new( | |
my_cnf => { | |
'skip-networking' => '', | |
}, | |
); | |
{ | |
my $dbh = dbh(); | |
$dbh->do(q{ | |
CREATE TABLE t1 ( | |
k INTEGER UNSIGNED NOT NULL, | |
v INTEGER UNSIGNED NOT NULL, | |
PRIMARY KEY (k) | |
); | |
}); | |
$dbh->do(q{INSERT INTO t1 (k, v) VALUES (1, 0) }); | |
} | |
my $bm = Parallel::Benchmark->new( | |
setup => sub { | |
my $self = shift; | |
$self->stash->{dbh} = dbh(); | |
}, | |
teardown => sub { | |
my $self = shift; | |
delete $self->stash->{dbh}; | |
}, | |
benchmark => sub { | |
my $self = shift; | |
my $dbh = $self->stash->{dbh}; | |
eval { | |
$dbh->begin_work; | |
my $row = $dbh->selectrow_hashref(q{SELECT * FROM t1 WHERE k = 1}); | |
$dbh->do(q{UPDATE t1 SET v = ? WHERE k = 1}, {}, $row->{v} + 1); | |
}; | |
if ($@) { | |
warn $!; | |
$dbh->rollback; | |
} else { | |
$dbh->commit; | |
} | |
select undef, undef, undef, 0.1; | |
1; | |
}, | |
concurrency => 10, | |
# debug => 1, | |
); | |
my $result = $bm->run(); | |
use Data::Dumper; warn Dumper $result; | |
use Data::Dumper; warn Dumper dbh()->selectrow_hashref(q{SELECT * FROM t1 WHERE k = 1}); | |
sub dbh { | |
return DBI->connect($mysqld->dsn(dbname => 'test'), 'root', '', { | |
AutoCommit => 1, | |
PrintError => 0, | |
RaiseError => 1, | |
ShowErrorStatement => 1, | |
AutoInactiveDestroy => 1, | |
# mysql_enable_utf8 => 1, | |
# mysql_auto_reconnect => 0, | |
}); | |
} | |
=comment | |
$ perl /tmp/select-update.pl | |
2012-12-30T16:46:06 [INFO] starting benchmark: concurrency: 10, time: 3 | |
2012-12-30T16:46:11 [INFO] done benchmark: score 290, elapsed 3.023 sec = 95.935 / sec | |
$VAR1 = { | |
'stashes' => { | |
'6' => {}, | |
'3' => {}, | |
'7' => {}, | |
'9' => {}, | |
'2' => {}, | |
'8' => {}, | |
'1' => {}, | |
'4' => {}, | |
'10' => {}, | |
'5' => {} | |
}, | |
'score' => 290, | |
'elapsed' => '3.0228849' | |
}; | |
$VAR1 = { | |
'k' => '1', | |
'v' => '128' | |
}; | |
=cut |
scoreが更新回数になるので、'v' => '<更新回数>' と揃えば不整合無く更新出来たことになるのですが、
見ての通りトランザクションを使った方は残念な結果になっています。で、すこしググッた結果トランザクションとロックは別で考えないと行けないことを知りました。
トランザクションを使ったからといって勝手に行読み込みなどをロックして整合性を保証してくれるとかでは全然無いんですね。
当たり前だろダラズ!と言われても仕方ないような勘違いですが、いやトランザクションを使わない世界でしばらく生きていたので…
で、SELECTにテーブルロックや行ロックが掛かってないためにSELECT > UPDATEの不整合が防げていないようなので、SELECT文にFOR UPDATE を付けてそれを防いでみました
use Parallel::Benchmark; | |
use Test::mysqld; | |
my $mysqld = Test::mysqld->new( | |
my_cnf => { | |
'skip-networking' => '', | |
}, | |
); | |
{ | |
my $dbh = dbh(); | |
$dbh->do(q{ | |
CREATE TABLE t1 ( | |
k INTEGER UNSIGNED NOT NULL, | |
v INTEGER UNSIGNED NOT NULL, | |
PRIMARY KEY (k) | |
); | |
}); | |
$dbh->do(q{INSERT INTO t1 (k, v) VALUES (1, 0) }); | |
} | |
my $bm = Parallel::Benchmark->new( | |
setup => sub { | |
my $self = shift; | |
$self->stash->{dbh} = dbh(); | |
}, | |
teardown => sub { | |
my $self = shift; | |
delete $self->stash->{dbh}; | |
}, | |
benchmark => sub { | |
my $self = shift; | |
my $dbh = $self->stash->{dbh}; | |
eval { | |
$dbh->begin_work; | |
my $row = $dbh->selectrow_hashref(q{SELECT * FROM t1 WHERE k = 1 FOR UPDATE}); | |
$dbh->do(q{UPDATE t1 SET v = ? WHERE k = 1}, {}, $row->{v} + 1); | |
}; | |
if ($@) { | |
warn $!; | |
$dbh->rollback; | |
} else { | |
$dbh->commit; | |
} | |
select undef, undef, undef, 0.1; | |
1; | |
}, | |
concurrency => 10, | |
# debug => 1, | |
); | |
my $result = $bm->run(); | |
use Data::Dumper; warn Dumper $result; | |
use Data::Dumper; warn Dumper dbh()->selectrow_hashref(q{SELECT * FROM t1 WHERE k = 1}); | |
sub dbh { | |
return DBI->connect($mysqld->dsn(dbname => 'test'), 'root', '', { | |
AutoCommit => 1, | |
PrintError => 0, | |
RaiseError => 1, | |
ShowErrorStatement => 1, | |
AutoInactiveDestroy => 1, | |
# mysql_enable_utf8 => 1, | |
# mysql_auto_reconnect => 0, | |
}); | |
} | |
=comment | |
$ perl /tmp/for-update.pl | |
2012-12-30T16:54:06 [INFO] starting benchmark: concurrency: 10, time: 3 | |
2012-12-30T16:54:11 [INFO] done benchmark: score 301, elapsed 3.021 sec = 99.639 / sec | |
$VAR1 = { | |
'stashes' => { | |
'6' => {}, | |
'3' => {}, | |
'7' => {}, | |
'9' => {}, | |
'2' => {}, | |
'8' => {}, | |
'1' => {}, | |
'4' => {}, | |
'10' => {}, | |
'5' => {} | |
}, | |
'score' => 301, | |
'elapsed' => '3.0208973' | |
}; | |
$VAR1 = { | |
'k' => '1', | |
'v' => '286' | |
}; | |
=cut |
結果またしても残念な結果になりました…
ちゃんとストレージエンジンを指定しましょう
結果から言うとストレージエンジンを指定していなかったのでトランザクションの使えないMyISAMでテーブルが作られていたからでした。
use Parallel::Benchmark; | |
use Test::mysqld; | |
my $mysqld = Test::mysqld->new( | |
my_cnf => { | |
'skip-networking' => '', | |
}, | |
); | |
{ | |
my $dbh = dbh(); | |
$dbh->do(q{ | |
CREATE TABLE t1 ( | |
k INTEGER UNSIGNED NOT NULL, | |
v INTEGER UNSIGNED NOT NULL, | |
PRIMARY KEY (k) | |
) ENGINE=INNODB; | |
}); | |
$dbh->do(q{INSERT INTO t1 (k, v) VALUES (1, 0) }); | |
} | |
my $bm = Parallel::Benchmark->new( | |
setup => sub { | |
my $self = shift; | |
$self->stash->{dbh} = dbh(); | |
}, | |
teardown => sub { | |
my $self = shift; | |
delete $self->stash->{dbh}; | |
}, | |
benchmark => sub { | |
my $self = shift; | |
my $dbh = $self->stash->{dbh}; | |
eval { | |
$dbh->begin_work; | |
my $row = $dbh->selectrow_hashref(q{SELECT * FROM t1 WHERE k = 1 FOR UPDATE}); | |
$dbh->do(q{UPDATE t1 SET v = ? WHERE k = 1}, {}, $row->{v} + 1); | |
}; | |
if ($@) { | |
warn $!; | |
$dbh->rollback; | |
} else { | |
$dbh->commit; | |
} | |
select undef, undef, undef, 0.1; | |
1; | |
}, | |
concurrency => 10, | |
# debug => 1, | |
); | |
my $result = $bm->run(); | |
use Data::Dumper; warn Dumper $result; | |
use Data::Dumper; warn Dumper dbh()->selectrow_hashref(q{SELECT * FROM t1 WHERE k = 1}); | |
sub dbh { | |
return DBI->connect($mysqld->dsn(dbname => 'test'), 'root', '', { | |
AutoCommit => 1, | |
PrintError => 0, | |
RaiseError => 1, | |
ShowErrorStatement => 1, | |
AutoInactiveDestroy => 1, | |
# mysql_enable_utf8 => 1, | |
# mysql_auto_reconnect => 0, | |
}); | |
} | |
=comment | |
$ perl /tmp/for-update-2.pl | |
2012-12-30T17:04:10 [INFO] starting benchmark: concurrency: 10, time: 3 | |
2012-12-30T17:04:15 [INFO] done benchmark: score 290, elapsed 3.017 sec = 96.122 / sec | |
$VAR1 = { | |
'stashes' => { | |
'6' => {}, | |
'3' => {}, | |
'7' => {}, | |
'9' => {}, | |
'2' => {}, | |
'8' => {}, | |
'1' => {}, | |
'4' => {}, | |
'10' => {}, | |
'5' => {} | |
}, | |
'score' => 290, | |
'elapsed' => '3.0169922' | |
}; | |
$VAR1 = { | |
'k' => '1', | |
'v' => '290' | |
}; | |
=cut |
こんな感じでInnoDBを指定する事で期待通り、不整合なく更新されるようになりました。めでたしめでたし。
ちなみにSELECTのロックには共有ロックと排他ロックとあり、FOR UPDATEは排他ロックになります。LOCK IN SHARE MODEだと共有ロックになります。
今回の場合にLOCK IN SHARE MODEを使うと10ものプロセスが共有ロックを取り合って凄い勢いでデッドロックしました。
use Parallel::Benchmark; | |
use Test::mysqld; | |
my $mysqld = Test::mysqld->new( | |
my_cnf => { | |
'skip-networking' => '', | |
}, | |
); | |
{ | |
my $dbh = dbh(); | |
$dbh->do(q{ | |
CREATE TABLE t1 ( | |
k INTEGER UNSIGNED NOT NULL, | |
v INTEGER UNSIGNED NOT NULL, | |
PRIMARY KEY (k) | |
) ENGINE=INNODB; | |
}); | |
$dbh->do(q{INSERT INTO t1 (k, v) VALUES (1, 0) }); | |
} | |
my $bm = Parallel::Benchmark->new( | |
setup => sub { | |
my $self = shift; | |
$self->stash->{dbh} = dbh(); | |
}, | |
teardown => sub { | |
my $self = shift; | |
delete $self->stash->{dbh}; | |
}, | |
benchmark => sub { | |
my $self = shift; | |
my $dbh = $self->stash->{dbh}; | |
eval { | |
$dbh->begin_work; | |
my $row = $dbh->selectrow_hashref(q{SELECT * FROM t1 WHERE k = 1 LOCK IN SHARE MODE}); | |
$dbh->do(q{UPDATE t1 SET v = ? WHERE k = 1}, {}, $row->{v} + 1); | |
}; | |
if ($@) { | |
warn $!; | |
$dbh->rollback; | |
} else { | |
$dbh->commit; | |
} | |
select undef, undef, undef, 0.1; | |
1; | |
}, | |
concurrency => 10, | |
# debug => 1, | |
); | |
my $result = $bm->run(); | |
use Data::Dumper; warn Dumper $result; | |
use Data::Dumper; warn Dumper dbh()->selectrow_hashref(q{SELECT * FROM t1 WHERE k = 1}); | |
sub dbh { | |
return DBI->connect($mysqld->dsn(dbname => 'test'), 'root', '', { | |
AutoCommit => 1, | |
PrintError => 0, | |
RaiseError => 1, | |
ShowErrorStatement => 1, | |
AutoInactiveDestroy => 1, | |
# mysql_enable_utf8 => 1, | |
# mysql_auto_reconnect => 0, | |
}); | |
} | |
=comment | |
$ perl /tmp/for-update-3.pl | |
2012-12-30T17:13:20 [INFO] starting benchmark: concurrency: 10, time: 3 | |
DBD::mysql::db do failed: Deadlock found when trying to get lock; try restarting transaction [for Statement "UPDATE t1 SET v = ? WHERE k = 1"] at /tmp/for-update-3.pl line 38. | |
...caught at /tmp/for-update-3.pl line 41. | |
DBD::mysql::db do failed: Deadlock found when trying to get lock; try restarting transaction [for Statement "UPDATE t1 SET v = ? WHERE k = 1"] at /tmp/for-update-3.pl line 38. | |
...caught at /tmp/for-update-3.pl line 41. | |
DBD::mysql::db do failed: Deadlock found when trying to get lock; try restarting transaction [for Statement "UPDATE t1 SET v = ? WHERE k = 1"] at /tmp/for-update-3.pl line 38. | |
...caught at /tmp/for-update-3.pl line 41. | |
. | |
. | |
. | |
2012-12-30T17:13:25 [INFO] done benchmark: score 300, elapsed 3.013 sec = 99.582 / sec | |
$VAR1 = { | |
'stashes' => { | |
'6' => {}, | |
'3' => {}, | |
'7' => {}, | |
'9' => {}, | |
'2' => {}, | |
'8' => {}, | |
'1' => {}, | |
'4' => {}, | |
'10' => {}, | |
'5' => {} | |
}, | |
'score' => 300, | |
'elapsed' => '3.0125781' | |
}; | |
$VAR1 = { | |
'k' => '1', | |
'v' => '245' | |
}; | |
=cut |
ロックするということは同時にデッドロックする可能性もあるということなので、適切なロックを選択して使い、デッドロックの可能性を減らしながら上手く付き合っていきたいですね。
まとめ
- トランザクションはロックとはまた別問題
- 不整合を防ぎたいならちゃんとロックのことを考えてSQLを書く
- MySQLのデフォルトストレージエンジンはMyISAMだけど、それトランザクション使えないからね?
- ロックにも種類はあり、デッドロックには気をつける
今年のブログはこれが最後な気がするので。
来年はもっとブログをコードを書いてアウトプットを増やして行きたいです。