以前 kernel のlist_head 構造に似た、かつ container/list と同一インターフェースを持つlist_encabazado をつくた。concurrent 対応をどうしようかと考えていたときに、lock free list とかもみていて これをベースにできるかなーとぼやーとおもってやり始めたら大変なことになったので、 書いた。

思いつきの戦略

  1. 片方向を正にして組んでいく
  2. insert はCASの張替え
  3. delete は当然 next ポインタにmarking して、別でtraverse していってたどり着いたやつが行う
  4. traverse 中にprev が正しくなさそうなら next の情報から同期をとる

でいけるんじゃないかとおもったがうまく行かない。

lock free doubly linked list の論文を漁る

とりあえず

こんな論文を発見したのでこれベースに変更していく

lock free doubly linked list の前提

実はこれがけっこうつらかったりする。

最初は  A <-> B <-> C という構造があったら

node を

type ListHead struct {
    prev *ListHead
    next *ListHead
}

だとして


a := &ListHead{}
b := &ListHead{}
c := &ListHead{}


a.prev = a
a.next = b
b.prev = a
b.next = c
c.prev = b
c.next = c

このように終わりと始まりのノードは自分をさすような仕組みしていたのだけど delete とか insert を考えると終端は空のノードじゃないとまずくて

terminate <-> a <-> b <-> c <-> terminate

のようにしないといけない。そもそもinsert もdelete も3つノードがある前提 なので端っこが特殊処理になってしまう。

insert の実装

conccurnet を意識しない場合は

func listAdd(new, prev, next *ListHead) {
	if prev != next {
		next.prev, new.next, new.prev, prev.next = new, next, prev, new
	} else {
		prev.next, new.prev = new, prev
	}
}

こんな感じ論文にもあるけど基本的に手順として

  1. new node の prev(new.prev) と next pointer(new.next) をマーキング
  2. CAS で prev.next が next と同一なら new に差し替える
  3. CAS で next.prev が prev と同一なら new に差し替える
func listAddWitCas(new, prev, next *ListHead) (err error) {
	// new.prev -> prev, new.next -> next
	StoreListHead(&new.prev, prev)
	StoreListHead(&new.next, next)


    if !Cas(&prev.next, next, new) {
        	goto ROLLBACK
    }

    if !Cas(&next.prev, prev, new) {
		goto ROLLBACK
	}
}

しかし当然 prev <-> next がつながってない場合もあって(すでに他がぶっこんでるとか) 失敗することもあるまたは 処理時に prev のnext の間に別の処理ではいってしまっている可能性 もあるので rollback させる

func listAddWitCas(new, prev, next *ListHead) (err error) {

	// backup for roolback
	oNewPrev := uintptr(unsafe.Pointer(new.prev))
	oNewNext := uintptr(unsafe.Pointer(new.next))
	rollback := func(new *ListHead) {
		StoreListHead(&new.prev, (*ListHead)(unsafe.Pointer(oNewPrev)))
		StoreListHead(&new.next, (*ListHead)(unsafe.Pointer(oNewNext)))
	}

	// new.prev -> prev, new.next -> next
	StoreListHead(&new.prev, prev)
	StoreListHead(&new.next, next)

	mu4Add.Lock()
	defer mu4Add.Unlock()
	if !Cas(&prev.next, next, new) {
		goto ROLLBACK
	}
	if !Cas(&next.prev, prev, new) {
		goto ROLLBACK
	}

	return nil

ROLLBACK:

	rollback(new)

    return ErrTCasConflictOnAdd,
}

失敗した場合は呼び出し側で挿入ターゲットのnode を次(next)を取り直して再度処理する 実際は、リトライ回数とかも設定してあるがそのあたりは省略

prev := head
next := (*ListHead)(atomic.LoadPointer((*unsafe.Pointer)(unsafe.Pointer(&head.next))))
for {

    if err = listAddWitCas(new, prev,next); err ==nil {
        break
    }
    next = (*ListHead)(atomic.LoadPointer((*unsafe.Pointer)(unsafe.Pointer(&prev.next))))
}

こう書くだけだとすげえかんたんそうに見えるw

続く