以前 kernel のlist_head 構造に似た、かつ container/list と同一インターフェースを持つlist_encabazado をつくた。concurrent 対応をどうしようかと考えていたときに、lock free list とかもみていて これをベースにできるかなーとぼやーとおもってやり始めたら大変なことになったので、 書いた。
思いつきの戦略
- 片方向を正にして組んでいく
- insert はCASの張替え
- delete は当然 next ポインタにmarking して、別でtraverse していってたどり着いたやつが行う
- traverse 中にprev が正しくなさそうなら next の情報から同期をとる
でいけるんじゃないかとおもったがうまく行かない。
lock free doubly linked list の論文を漁る
とりあえず
こんな論文を発見したのでこれベースに変更していく
lock free doubly linked list の前提
実はこれがけっこうつらかったりする。
最初は A <-> B <-> C という構造があったら
node を
type ListHead struct {
prev *ListHead
next *ListHead
}
だとして
a := &ListHead{}
b := &ListHead{}
c := &ListHead{}
a.prev = a
a.next = b
b.prev = a
b.next = c
c.prev = b
c.next = c
このように終わりと始まりのノードは自分をさすような仕組みしていたのだけど delete とか insert を考えると終端は空のノードじゃないとまずくて
terminate <-> a <-> b <-> c <-> terminate
のようにしないといけない。そもそもinsert もdelete も3つノードがある前提 なので端っこが特殊処理になってしまう。
insert の実装
conccurnet を意識しない場合は
func listAdd(new, prev, next *ListHead) {
if prev != next {
next.prev, new.next, new.prev, prev.next = new, next, prev, new
} else {
prev.next, new.prev = new, prev
}
}
こんな感じ論文にもあるけど基本的に手順として
- new node の prev(new.prev) と next pointer(new.next) をマーキング
- CAS で prev.next が next と同一なら new に差し替える
- CAS で next.prev が prev と同一なら new に差し替える
func listAddWitCas(new, prev, next *ListHead) (err error) {
// new.prev -> prev, new.next -> next
StoreListHead(&new.prev, prev)
StoreListHead(&new.next, next)
if !Cas(&prev.next, next, new) {
goto ROLLBACK
}
if !Cas(&next.prev, prev, new) {
goto ROLLBACK
}
}
しかし当然 prev <-> next がつながってない場合もあって(すでに他がぶっこんでるとか) 失敗することもあるまたは 処理時に prev のnext の間に別の処理ではいってしまっている可能性 もあるので rollback させる
func listAddWitCas(new, prev, next *ListHead) (err error) {
// backup for roolback
oNewPrev := uintptr(unsafe.Pointer(new.prev))
oNewNext := uintptr(unsafe.Pointer(new.next))
rollback := func(new *ListHead) {
StoreListHead(&new.prev, (*ListHead)(unsafe.Pointer(oNewPrev)))
StoreListHead(&new.next, (*ListHead)(unsafe.Pointer(oNewNext)))
}
// new.prev -> prev, new.next -> next
StoreListHead(&new.prev, prev)
StoreListHead(&new.next, next)
mu4Add.Lock()
defer mu4Add.Unlock()
if !Cas(&prev.next, next, new) {
goto ROLLBACK
}
if !Cas(&next.prev, prev, new) {
goto ROLLBACK
}
return nil
ROLLBACK:
rollback(new)
return ErrTCasConflictOnAdd,
}
失敗した場合は呼び出し側で挿入ターゲットのnode を次(next)を取り直して再度処理する 実際は、リトライ回数とかも設定してあるがそのあたりは省略
prev := head
next := (*ListHead)(atomic.LoadPointer((*unsafe.Pointer)(unsafe.Pointer(&head.next))))
for {
if err = listAddWitCas(new, prev,next); err ==nil {
break
}
next = (*ListHead)(atomic.LoadPointer((*unsafe.Pointer)(unsafe.Pointer(&prev.next))))
}
こう書くだけだとすげえかんたんそうに見えるw
続く…