本文共 2711 字,大约阅读时间需要 9 分钟。
package mainimport ( "go.etcd.io/etcd/clientv3" "time" "fmt" "context")func main() { var ( config clientv3.Config client *clientv3.Client err error lease clientv3.Lease leaseGrantResp *clientv3.LeaseGrantResponse leaseId clientv3.LeaseID keepRespChan <-chan *clientv3.LeaseKeepAliveResponse keepResp *clientv3.LeaseKeepAliveResponse ctx context.Context cancelFunc context.CancelFunc kv clientv3.KV txn clientv3.Txn txnResp *clientv3.TxnResponse ) // 客户端配置 config = clientv3.Config{ Endpoints: []string{"0.0.0.0:2379"}, DialTimeout: 5 * time.Second, } // 建立连接 if client, err = clientv3.New(config); err != nil { fmt.Println(err) return } // lease实现锁自动过期(上锁之后,如果节点宕机,锁会一直占用,所以要过期机制,也要续租机制): // op操作 // txn事务: if else then // 1, 上锁 (创建租约, 自动续租, 拿着租约去抢占一个key) lease = clientv3.NewLease(client) // 申请一个5秒的租约 if leaseGrantResp, err = lease.Grant(context.TODO(), 5); err != nil { fmt.Println(err) return } // 拿到租约的ID leaseId = leaseGrantResp.ID // 准备一个用于取消自动续租的context ctx, cancelFunc = context.WithCancel(context.TODO()) // 确保函数退出后, 自动续租会停止 defer cancelFunc() //终止自动续租协程(goroutine) defer lease.Revoke(context.TODO(), leaseId) //告诉etcd把租约直接释放掉,更直接,立即删除,锁就释放了 // 5秒后会取消自动续租 if keepRespChan, err = lease.KeepAlive(ctx, leaseId); err != nil { fmt.Println(err) return } // 处理续约应答的协程 go func() { for { select { case keepResp = <- keepRespChan: if keepRespChan == nil { fmt.Println("租约已经失效了") goto END } else { // 每秒会续租一次, 所以就会受到一次应答 fmt.Println("收到自动续租应答:", keepResp.ID) } } } END: }() // if 不存在key, then 设置它, else 抢锁失败 kv = clientv3.NewKV(client) // 创建事务 txn = kv.Txn(context.TODO()) // 定义事务 // 如果key不存在(创建版本是0说明没有被创建) txn.If(clientv3.Compare(clientv3.CreateRevision("/cron/lock/job9"), "=", 0)). Then(clientv3.OpPut("/cron/lock/job9", "xxx", clientv3.WithLease(leaseId))). Else(clientv3.OpGet("/cron/lock/job9")) // 否则抢锁失败 // 提交事务 if txnResp, err = txn.Commit(); err != nil { fmt.Println(err) return // 没有问题 } // 判断是否抢到了锁 if !txnResp.Succeeded { fmt.Println("锁被占用:", string(txnResp.Responses[0].GetResponseRange().Kvs[0].Value)) return } // 2, 处理业务 fmt.Println("处理任务") time.Sleep(5 * time.Second) // 3, 释放锁(取消自动续租, 释放租约) // 上面的defer 会把租约释放掉, 关联的KV就被删除了}(右边的先执行,左边的后执行,左边会提示锁已被占用)
转载于:https://blog.51cto.com/5660061/2382042