# boost 新增协议兼容性调研
# 结论
- boost 在协议层补充了两个新的协议 (细节见后文)
- /fil/storage/mk/1.2.0
- /fil/storage/status/1.2.0
 
- 新协议的提出主要改进了 client 到 market 的数据传输,并不会对 venus-market 多矿工的架构造成影响
- 现在的 venus-market 可以比较顺滑地升级以支持新的协议
# 协议变化
boost 在兼容以前协议的基础上,新增了两种协议:
- 向 market 推送订单的协议:/fil/storage/mk/1.2.0
- 以及查询订单数据:/fil/storage/status/1.2.0
# /fil/storage/mk/1.2.0
 # Request
# 描述
| Field | Type | Description | 
|---|---|---|
| DealUUID | uuid | gen by client with uuid ("github.com/google/uuid") | 
| IsOffline | boolean | Indicates whether the deal is online or offline | 
| ClientDealProposal | ClientDealProposal | Same as <v1 proposal>.DealProposal | 
| DealDataRoot | cid | The root cid of the CAR file. Same as <v1 proposal>.Piece.Root | 
| Transfer | struct | Transfer has the parameters for a data transfer | 
type DealParams struct {
	DealUUID           uuid.UUID
	IsOffline          bool
	ClientDealProposal market.ClientDealProposal
	DealDataRoot       cid.Cid
	Transfer           Transfer // Transfer params will be the zero value if this is an offline deal
}
// Transfer has the parameters for a data transfer
type Transfer struct {
	// The type of transfer eg "http"
	Type string
	// An optional ID that can be supplied by the client to identify the deal
	ClientID string
	// A byte array containing marshalled data specific to the transfer type
	// eg a JSON encoded struct { URL: "<url>", Headers: {...} }
	Params []byte
	// The size of the data transferred in bytes
	Size uint64
}
目前 boost 中只支持 http 类型的 transfer
# Response
# 描述
| Field | Type | Description | 
|---|---|---|
| Accepted | boolean | Indicates whether the deal proposal was accepted | 
| Message | string | A message about why the deal proposal was rejected | 
type DealResponse struct {
	Accepted bool
	// Message is the reason the deal proposal was rejected. It is empty if
	// the deal was accepted.
	Message string
}
# /fil/storage/status/1.2.0
 # Request
# 描述
| Field | Type | Description | 
|---|---|---|
| DealUUID | uuid | The uuid of the deal | 
| Signature | Signature (opens new window) | A signature over the uuid with the client's wallet | 
// DealStatusRequest is sent to get the current state of a deal from a
// storage provider
type DealStatusRequest struct {
	DealUUID  uuid.UUID
	Signature crypto.Signature
}
# Response
# 相关源码
| Field | Type | Description | 
|---|---|---|
| DealUUID | uuid | The uuid of the deal | 
| Error | string | Non-empty if there's an error getting the deal status | 
| IsOffline | boolean | Indicates whether the deal is online or offline | 
| TransferSize | integer | The total size of the transfer in bytes | 
| NBytesReceived | integer | The number of bytes that have been downloaded | 
| DealStatus | struct | the status of deal | 
| SignedProposalCid | cid | cid of the client deal proposal + signature | 
| PublishCid | cid | The cid of the publish message, if the deal has been published | 
| ChainDealID | integer | The ID of the deal on chain, if it's been published | 
// DealStatusResponse is the current state of a deal
type DealStatusResponse struct {
	DealUUID uuid.UUID
	// Error is non-empty if there is an error getting the deal status
	// (eg invalid request signature)
	Error          string
	DealStatus     *DealStatus
	IsOffline      bool
	TransferSize   uint64
	NBytesReceived uint64
}
type DealStatus struct {
	// Error is non-empty if the deal is in the error state
	Error string
	// Status is a string corresponding to a deal checkpoint
	Status string
	// SealingStatus is the sealing status reported by lotus miner
	SealingStatus string
	// Proposal is the deal proposal
	Proposal market.DealProposal
	// SignedProposalCid is the cid of the client deal proposal + signature
	SignedProposalCid cid.Cid
	// PublishCid is the cid of the Publish message sent on chain, if the deal
	// has reached the publish stage
	PublishCid *cid.Cid
	// ChainDealID is the id of the deal in chain state
	ChainDealID abi.DealID
}
// status
const (
	Accepted Checkpoint = iota
	Transferred
	Published
	PublishConfirmed
	AddedPiece
	IndexedAndAnnounced
	Complete
)
var names = map[Checkpoint]string{
	Accepted:            "Accepted",
	Transferred:         "Transferred",
	Published:           "Published",
	PublishConfirmed:    "PublishConfirmed",
	AddedPiece:          "AddedPiece",
	IndexedAndAnnounced: "IndexedAndAnnounced",
	Complete:            "Complete",
}
// SealingStatus string 类型,没看到所有的枚举定义
# boost 的相关实现
# 发起存储订单的流程
# 对比之前的订单发起流程
boost 实现的新的订单发起流程主要是基于 /fil/storage/mk/1.2.0 拓展了 client 向 market 传输数据的方式 (目前 boost 只实现了 http 的形式),舍弃了原本的graphsync,同时也简化了协商的流程
# 两者的时序图
- 传统的的订单发起流程 (基于 /fil/storage/mk/1.0.1 , /fil/storage/mk/1.1.0 /fil/storage/mk/1.1.1 )
- boost 提出的新的订单发起流程 (基于 /fil/storage/mk/1.2.0 )
# 相关源码
线上订单时,client 根据 cli-flag 生成 types.Transfer (离线订单时该对象为空)
// Store the path to the CAR file as a transfer parameter
transferParams := &types2.HttpRequest{URL: cctx.String("http-url")}
if cctx.IsSet("http-headers") {
	transferParams.Headers = make(map[string]string)
	for _, header := range cctx.StringSlice("http-headers") {
		sp := strings.Split(header, "=")
		if len(sp) != 2 {
			return fmt.Errorf("malformed http header: %s", header)
		}
		transferParams.Headers[sp[0]] = sp[1]
	}
}
paramsBytes, err := json.Marshal(transferParams)
if err != nil {
	return fmt.Errorf("marshalling request parameters: %w", err)
}
transfer.Type = "http"
transfer.Params = paramsBytes
准备发起订单的请求参数
// Create a deal proposal to storage provider using deal protocol v1.2.0 format
dealProposal, err := dealProposal(ctx, n, walletAddr, rootCid, abi.PaddedPieceSize(pieceSize), pieceCid, maddr, startEpoch, cctx.Int("duration"), cctx.Bool("verified"), providerCollateral, abi.NewTokenAmount(cctx.Int64("storage-price")))
if err != nil {
	return fmt.Errorf("failed to create a deal proposal: %w", err)
}
dealParams := types.DealParams{
	DealUUID:           dealUuid,
	ClientDealProposal: *dealProposal,
	DealDataRoot:       rootCid,
	IsOffline:          !isOnline,
	Transfer:           transfer,
}
func dealProposal(ctx context.Context, n *clinode.Node, clientAddr address.Address, rootCid cid.Cid, pieceSize abi.PaddedPieceSize, pieceCid cid.Cid, minerAddr address.Address, startEpoch abi.ChainEpoch, duration int, verified bool, providerCollateral abi.TokenAmount, storagePrice abi.TokenAmount) (*market.ClientDealProposal, error) {
	endEpoch := startEpoch + abi.ChainEpoch(duration)
	// deal proposal expects total storage price for deal per epoch, therefore we
	// multiply pieceSize * storagePrice (which is set per epoch per GiB) and divide by 2^30
	storagePricePerEpochForDeal := big.Div(big.Mul(big.NewInt(int64(pieceSize)), storagePrice), big.NewInt(int64(1<<30)))
	l, err := market.NewLabelFromString(rootCid.String())
	if err != nil {
		return nil, err
	}
	proposal := market.DealProposal{
		PieceCID:             pieceCid,
		PieceSize:            pieceSize,
		VerifiedDeal:         verified,
		Client:               clientAddr,
		Provider:             minerAddr,
		Label:                l,
		StartEpoch:           startEpoch,
		EndEpoch:             endEpoch,
		StoragePricePerEpoch: storagePricePerEpochForDeal,
		ProviderCollateral:   providerCollateral,
	}
	buf, err := cborutil.Dump(&proposal)
	if err != nil {
		return nil, err
	}
	sig, err := n.Wallet.WalletSign(ctx, clientAddr, buf, api.MsgMeta{Type: api.MTDealProposal})
	if err != nil {
		return nil, fmt.Errorf("wallet sign failed: %w", err)
	}
	return &market.ClientDealProposal{
		Proposal:        proposal,
		ClientSignature: *sig,
	}, nil
}
client 发送订单请求
s, err := n.Host.NewStream(ctx, addrInfo.ID, DealProtocolv120)
if err := cborutil.WriteCborRPC(s, &dealParams); err != nil {
	errc <- fmt.Errorf("failed to send request: %w", err)
	return
}
boost 接收到订单后,会先进行两步处理
- 校验订单的参数
- 校验的参数包括:PieceCID,signature,Provider,piece size,duration,epoch,provider collateral,ask,client fun 等等
 
- 将订单注入到 boost 的订单处理循环
// ExecuteDeal is called when the Storage Provider receives a deal proposal
// from the network
func (p *Provider) ExecuteDeal(ctx context.Context, dp *types.DealParams, clientPeer peer.ID) (*api.ProviderDealRejectionInfo, error) {
	ctx, span := tracing.Tracer.Start(ctx, "Provider.ExecuteDeal")
	defer span.End()
	span.SetAttributes(attribute.String("dealUuid", dp.DealUUID.String())) // Example of adding additional attributes
	p.dealLogger.Infow(dp.DealUUID, "executing deal proposal received from network", "peer", clientPeer)
	ds := types.ProviderDealState{
		DealUuid:           dp.DealUUID,
		ClientDealProposal: dp.ClientDealProposal,
		ClientPeerID:       clientPeer,
		DealDataRoot:       dp.DealDataRoot,
		Transfer:           dp.Transfer,
		IsOffline:          dp.IsOffline,
		Retry:              smtypes.DealRetryAuto,
	}
	// validate the deal proposal (校验订单数据)
	if err := p.validateDealProposal(ds); err != nil {
		reason := err.reason
		if reason == "" {
			reason = err.Error()
		}
		p.dealLogger.Infow(dp.DealUUID, "deal proposal failed validation", "err", err.Error(), "reason", reason)
		return &api.ProviderDealRejectionInfo{
			Reason: fmt.Sprintf("failed validation: %s", reason),
		}, nil
	}
	// 将订单注入 订单处理循环
	return p.executeDeal(ctx, ds)
}
# 订单的处理
# 订单处理循环
本小结简要介绍 boost 中订单处理循环 的处理流程 该订单处理循环是个典型的 select-case 结构 每个 case 对应一个 channel 作为循环的入口
	// channels used to pass messages to run loop
	// 接入达成 dealcheckpoints.AddedPiece 状态之前的订单
	// 会处理订单直到订单达成 dealcheckpoints.AddedPiece
	acceptDealChan       chan acceptDealReq
	// 接入达成 dealcheckpoints.Published 的订单的
	// 释放占用的 fund 资源
	publishedDealChan    chan publishDealReq
	// 接入 addpiece 之后的订单
	// 释放被占用的 storage space 资源
	storageSpaceChan     chan storageSpaceDealReq
	// 接入需要清理资源占用的的订单 (完成的订单,或者失败的订单)
	// 清理订单的资源占用,主要是 fund 和 storage space
	finishedDealChan     chan finishedDealReq
	// 接入希望重启或者希望终止 的已经停止的订单
	// 重启或者终止一个停止了的订单
	updateRetryStateChan chan updateRetryStateReq
注释:
- fund 资源: boost 用于抵押的资金,在 deal 初始化时被分配,deal 发布之后,被分配的资金会作为抵押被转走,同时也应该释放相应 deal 的资金占用
- storage 资源: boost 用于存储订单数据的存储空间资源,在订单创建时会被分配,在订单被 addpiece 之后会被释放掉
# acceptDealChan 分支
五个 case 之中,会着重介绍 acceptDealChan 所在的 case 分支
// case  <-p.acceptDealChan 的部分源码
select {
	// Process a request to
	// - accept a deal proposal and execute it immediately
	// - accept an offline deal proposal and save it for execution later
	//   when the data is imported
	// - accept a request to import data for an offline deal
	case dealReq := <-p.acceptDealChan:
		deal := dealReq.deal
		p.dealLogger.Infow(deal.DealUuid, "processing deal acceptance request")
		// 离线订单,预处理,等待数据导入
		if deal.IsOffline && !dealReq.isImport {
			// When the client proposes an offline deal, save the deal
			// to the database but don't execute the deal. The deal
			// will be executed when the Storage Provider imports the
			// deal data.
			dh, err := p.mkAndInsertDealHandler(deal.DealUuid)
			// 1.检查订单不重复
			// 2.初始化订单,将之保存到数据库
			aerr := p.processOfflineDealProposal(dealReq.deal, dh)
			// The deal proposal was successful. Send an Accept response to the client.
			dealReq.rsp <- acceptDealResp{ri: &api.ProviderDealRejectionInfo{Accepted: true}}
			// Don't execute the deal now, wait for data import.
			continue
		}
		// 1. 分配 fund 和 storage space 资源
		// 2. 初始化订单参数,包括订单数据的路径,创建订单的时间记录等
		if deal.IsOffline {
			// 离线订单,并且订单数据已经导入
			// The Storage Provider is importing offline deal data, so tag
			// funds for the deal and execute it
			aerr = p.processImportOfflineDealData(dealReq.deal)
		} else {
			// 在线订单
			// Process a regular deal proposal
			aerr = p.processDealProposal(dealReq.deal)
		}
		if aerr != nil {
			sendErrorResp(aerr)
			continue
		}
		// set up deal handler so that clients can subscribe to deal update events
		dh, err := p.mkAndInsertDealHandler(deal.DealUuid)
		if err != nil {
			sendErrorResp(&acceptError{error: err, isSevereError: true, reason: "server error: starting deal thread"})
			continue
		}
		// start executing the deal
		_, err = p.startDealThread(dh, deal)
		if err != nil {
			sendErrorResp(&acceptError{error: err, isSevereError: true, reason: "server error: starting deal thread"})
			continue
		}
		// send an accept response
		dealReq.rsp <- acceptDealResp{&api.ProviderDealRejectionInfo{Accepted: true}, nil}
	case ....
}
在线订单会在完成订单资源分配和参数初始化之后,就会真正地开始执行订单处理线程 离线订单会两次进入这一段代码,第一次初始化订单参数之后退出,当导入离线数据的时候会再次进入这段代码,进行资源分配并开始 叮当处理线程
# 订单处理线程
简要流程
相关源码
func (p *Provider) execDealUptoAddPiece(ctx context.Context, deal *types.ProviderDealState, dh *dealHandler) *dealMakingError {
	pub := dh.Publisher
	// publish "new deal" event
	p.fireEventDealNew(deal)
	// publish an event with the current state of the deal
	p.fireEventDealUpdate(pub, deal)
	p.dealLogger.Infow(deal.DealUuid, "deal execution in progress")
	// Transfer Data step will be executed only if it's NOT an offline deal
	if !deal.IsOffline {
		if deal.Checkpoint < dealcheckpoints.Transferred {
			// Check that the deal's start epoch hasn't already elapsed
			if derr := p.checkDealProposalStartEpoch(deal); derr != nil {
				return derr
			}
			// download data from the client,and verifyCommP
			if err := p.transferAndVerify(dh, pub, deal); err != nil {
				// The transfer has failed. If the user tries to cancel the
				// transfer after this point it's a no-op.
				dh.setCancelTransferResponse(nil)
				return err
			}
			p.dealLogger.Infow(deal.DealUuid, "deal data transfer finished successfully")
		} else {
			p.dealLogger.Infow(deal.DealUuid, "deal data transfer has already been completed")
		}
		// transfer can no longer be cancelled
		dh.setCancelTransferResponse(errors.New("transfer already complete"))
		p.dealLogger.Infow(deal.DealUuid, "deal data-transfer can no longer be cancelled")
	} else if deal.Checkpoint < dealcheckpoints.Transferred {
		// verify CommP matches for an offline deal
		if err := p.verifyCommP(deal); err != nil {
			err.error = fmt.Errorf("error when matching commP for imported data for offline deal: %w", err)
			return err
		}
		p.dealLogger.Infow(deal.DealUuid, "commp matched successfully for imported data for offline deal")
		// update checkpoint
		if derr := p.updateCheckpoint(pub, deal, dealcheckpoints.Transferred); derr != nil {
			return derr
		}
	}
	// Publish
	if deal.Checkpoint <= dealcheckpoints.Published {
		if err := p.publishDeal(ctx, pub, deal); err != nil {
			return err
		}
		p.dealLogger.Infow(deal.DealUuid, "deal successfully published and confirmed-publish")
	} else {
		p.dealLogger.Infow(deal.DealUuid, "deal has already been published and confirmed-publish")
	}
	// Now that the deal has been published, we no longer need to have funds
	// tagged as being for this deal (the publish message moves collateral
	// from the storage market actor escrow balance to the locked balance)
	if err := p.untagFundsAfterPublish(ctx, deal); err != nil {
		// If there's an error untagging funds we should still try to continue,
		// so just log the error
		p.dealLogger.Warnw(deal.DealUuid, "failed to untag funds after sending publish message", "err", err)
	} else {
		p.dealLogger.Infow(deal.DealUuid, "funds successfully untagged for deal after publish")
	}
	// AddPiece
	if deal.Checkpoint < dealcheckpoints.AddedPiece {
		if err := p.addPiece(ctx, pub, deal); err != nil {
			err.error = fmt.Errorf("failed to add piece: %w", err.error)
			return err
		}
		p.dealLogger.Infow(deal.DealUuid, "deal successfully handed over to the sealing subsystem")
	} else {
		p.dealLogger.Infow(deal.DealUuid, "deal has already been handed over to the sealing subsystem")
	}
	// as deal has already been handed to the sealer, we can remove the inbound file and reclaim the tagged space
	if !deal.IsOffline {
		_ = os.Remove(deal.InboundFilePath)
		p.dealLogger.Infow(deal.DealUuid, "removed inbound file as deal handed to sealer", "path", deal.InboundFilePath)
	}
	if err := p.untagStorageSpaceAfterSealing(ctx, deal); err != nil {
		// If there's an error untagging storage space we should still try to continue,
		// so just log the error
		p.dealLogger.Warnw(deal.DealUuid, "failed to untag storage space after handing deal to sealer", "err", err)
	} else {
		p.dealLogger.Infow(deal.DealUuid, "storage space successfully untagged for deal after it was handed to sealer")
	}
	// Index deal in DAGStore and Announce deal
	if deal.Checkpoint < dealcheckpoints.IndexedAndAnnounced {
		if err := p.indexAndAnnounce(ctx, pub, deal); err != nil {
			err.error = fmt.Errorf("failed to add index and announce deal: %w", err.error)
			return err
		}
		p.dealLogger.Infow(deal.DealUuid, "deal successfully indexed and announced")
	} else {
		p.dealLogger.Infow(deal.DealUuid, "deal has already been indexed and announced")
	}
	return nil
}
# 订单分配
在订单的处理流程中,addpiece 阶段会将订单数据分配到矿工的扇区。 这个过程主要是通过调用 PieceAdder 接口的 AddPiece 方法来实现。 其中 PieceAdder 主要底层是由 lotus-miner 的 rpc client 来实现的,其依赖路径如下
← 批量发单 Boost 协议集成 →
